Add the sp_rand_init and sp_rand_cleanup tools.

These will be used on the worker node to set up a random volume name
prefix for the StorPool volumes and snapshots created during
the OpenStack CI run, and detach and remove them all later.

Change-Id: Ic558e2183db8068545f7454f956dc8bc740959c6
diff --git a/tools/bin/sp_rand_cleanup b/tools/bin/sp_rand_cleanup
new file mode 100755
index 0000000..f92b85a
--- /dev/null
+++ b/tools/bin/sp_rand_cleanup
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+set -e
+
+pylib="$(dirname -- "$(dirname -- "$(readlink -f -- "$0")")")/sp-rand/src"
+pypath="${PYTHONPATH:+$PYTHONPATH:}$pylib"
+
+env PYTHONPATH="$pypath" /opt/storpool/python3/bin/python3 -B -u -m sp_rand.cleanup "$@"
diff --git a/tools/bin/sp_rand_init b/tools/bin/sp_rand_init
new file mode 100755
index 0000000..abbcfb1
--- /dev/null
+++ b/tools/bin/sp_rand_init
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+set -e
+
+pylib="$(dirname -- "$(dirname -- "$(readlink -f -- "$0")")")/sp-rand/src"
+pypath="${PYTHONPATH:+$PYTHONPATH:}$pylib"
+
+env PYTHONPATH="$pypath" /opt/storpool/python3/bin/python3 -B -u -m sp_rand.init "$@"
diff --git a/tools/sp-rand/.pylintrc b/tools/sp-rand/.pylintrc
new file mode 100644
index 0000000..64521d5
--- /dev/null
+++ b/tools/sp-rand/.pylintrc
@@ -0,0 +1,2 @@
+[MESSAGES CONTROL]
+disable=duplicate-code
diff --git a/tools/sp-rand/MANIFEST.in b/tools/sp-rand/MANIFEST.in
new file mode 100644
index 0000000..0647fb4
--- /dev/null
+++ b/tools/sp-rand/MANIFEST.in
@@ -0,0 +1,2 @@
+include tox.ini
+recursive-include unit_tests *.py
diff --git a/tools/sp-rand/pyproject.toml b/tools/sp-rand/pyproject.toml
new file mode 100644
index 0000000..9787c3b
--- /dev/null
+++ b/tools/sp-rand/pyproject.toml
@@ -0,0 +1,3 @@
+[build-system]
+requires = ["setuptools", "wheel"]
+build-backend = "setuptools.build_meta"
diff --git a/tools/sp-rand/setup.cfg b/tools/sp-rand/setup.cfg
new file mode 100644
index 0000000..1273ae8
--- /dev/null
+++ b/tools/sp-rand/setup.cfg
@@ -0,0 +1,33 @@
+[metadata]
+name = sp_rand
+version = attr: sp_rand.defs.VERSION
+author = StorPool
+author_email = openstack-dev@storpool.com
+description = Set up and clean up volumes with random name prefixes.
+
+[options]
+zip_safe = True
+package_dir =
+    = src
+packages =
+    sp_rand
+setup_requires =
+install_requires =
+    confget >= 4
+    storpool >= 6
+
+[options.package_data]
+sp_rand =
+    py.typed
+
+[options.entry_points]
+console_scripts =
+    sp_rand_cleanup = sp_rand.cleanup:main
+    sp_rand_init = sp_rand.init:main
+
+[flake8]
+max_line_length = 88
+
+[mypy]
+python_version = 3.8
+strict = True
diff --git a/tools/sp-rand/src/sp_rand/__init__.py b/tools/sp-rand/src/sp_rand/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/sp-rand/src/sp_rand/__init__.py
diff --git a/tools/sp-rand/src/sp_rand/cleanup.py b/tools/sp-rand/src/sp_rand/cleanup.py
new file mode 100644
index 0000000..3e1a464
--- /dev/null
+++ b/tools/sp-rand/src/sp_rand/cleanup.py
@@ -0,0 +1,147 @@
+"""Clean up: remove volumes, snapshots, and the file itself."""
+
+from __future__ import annotations
+
+import argparse
+import errno
+import pathlib
+
+from typing import NamedTuple, Tuple  # noqa: H301
+
+import confget
+
+from storpool import spapi  # type: ignore
+from storpool import sptypes
+
+from . import defs
+
+
+class Config(NamedTuple):
+    """Runtime configuration for the sp_rand_cleanup tool."""
+
+    confdir: pathlib.Path
+    noop: bool
+    skip_api: bool
+
+    @staticmethod
+    def default() -> Config:
+        """Build a configuration object with default values."""
+        return Config(confdir=defs.DEFAULT_CONFDIR, noop=False, skip_api=False)
+
+
+def parse_file(cfg: Config) -> Tuple[pathlib.Path, str]:
+    """Parse the StorPool configuration file."""
+    conffile = cfg.confdir / defs.FILENAME
+    print(f"Parsing {conffile}")
+    data = confget.read_ini_file(confget.Config([], filename=str(conffile)))
+
+    assert isinstance(data, dict)
+    assert sorted(data) == [""]
+    assert sorted(data[""]) == [defs.PREFIX_VAR]
+    prefix = data[""][defs.PREFIX_VAR]
+    print(f"- got prefix {prefix!r}")
+
+    return conffile, prefix
+
+
+def remove_volumes(cfg: Config, api: spapi.Api, prefix: str) -> None:
+    """Detach and remove any pertinent StorPool volumes and snapshots."""
+    print("Querying the StorPool API for attached volumes and snapshots")
+    to_detach = []
+    for desc in api.attachmentsList():  # pylint: disable=not-callable
+        if not desc.volume.startswith(prefix):
+            continue
+        if desc.snapshot:
+            print(f"Will detach snapshot {desc.volume} from client {desc.client}")
+            to_detach.append(
+                sptypes.SnapshotReassignDesc(
+                    detach=[desc.client], force=True, snapshot=desc.volume
+                )
+            )
+        else:
+            print(f"Will detach volume {desc.volume} from client {desc.client}")
+            to_detach.append(
+                sptypes.VolumeReassignDesc(
+                    detach=[desc.client], force=True, volume=desc.volume
+                )
+            )
+
+    if to_detach:
+        print(f"Detaching {len(to_detach)} volumes/snapshots")
+        if not cfg.noop:
+            api.volumesReassignWait(  # pylint: disable=not-callable
+                sptypes.VolumesReassignWaitDesc(reassign=to_detach)
+            )
+
+    print("Querying the StorPool API for existing volumes")
+    for name in [vol.name for vol in api.volumesList()]:
+        if not name.startswith(prefix):
+            continue
+        print(f"Removing volume {name}")
+        if not cfg.noop:
+            api.volumeDelete(name)
+
+    print("Querying the StorPool API for existing snapshots")
+    for name in [snap.name for snap in api.snapshotsList()]:
+        if not name.startswith(prefix):
+            continue
+        print(f"Removing snapshot {name}")
+        if not cfg.noop:
+            api.snapshotDelete(name)
+
+
+def remove_file(cfg: Config, conffile: pathlib.Path) -> None:
+    """Remove the StorPool configuration file."""
+    print(f"Removing {conffile}")
+    if not cfg.noop:
+        try:
+            conffile.unlink()
+        except OSError as err:
+            if err.errno != errno.ENOENT:
+                raise
+
+
+def parse_args() -> Config:
+    """Parse the command-line arguments."""
+    parser = argparse.ArgumentParser(prog="sp_rand_cleanup")
+    parser.add_argument(
+        "-d",
+        "--confdir",
+        type=pathlib.Path,
+        default=defs.DEFAULT_CONFDIR,
+        help="The directory to look for the configuration file in",
+    )
+    parser.add_argument(
+        "-N",
+        "--noop",
+        action="store_true",
+        help="No-operation mode; display what would have been done",
+    )
+    parser.add_argument(
+        "--skip-api",
+        type=str,
+        help=(
+            "If explicitly given the 'yes' value, do not "
+            "delete any volumes or snapshots"
+        ),
+    )
+
+    args = parser.parse_args()
+
+    return Config(confdir=args.confdir, noop=args.noop, skip_api=args.skip_api == "yes")
+
+
+def main() -> None:
+    """Main program: parse command-line options, handle them."""
+    cfg = parse_args()
+    conffile, prefix = parse_file(cfg)
+
+    if not cfg.skip_api:
+        api = spapi.Api.fromConfig()
+        remove_volumes(cfg, api, prefix)
+
+    remove_file(cfg, conffile)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/sp-rand/src/sp_rand/defs.py b/tools/sp-rand/src/sp_rand/defs.py
new file mode 100644
index 0000000..bf2ad8a
--- /dev/null
+++ b/tools/sp-rand/src/sp_rand/defs.py
@@ -0,0 +1,11 @@
+"""Common definitions for the StorPool random prefix toolset."""
+
+import pathlib
+
+
+VERSION = "0.1.0"
+
+FILENAME = "sp-ostack-random.conf"
+PREFIX_VAR = "SP_OPENSTACK_VOLUME_PREFIX"
+
+DEFAULT_CONFDIR = pathlib.Path("/etc/storpool.conf.d")
diff --git a/tools/sp-rand/src/sp_rand/init.py b/tools/sp-rand/src/sp_rand/init.py
new file mode 100644
index 0000000..60baa88
--- /dev/null
+++ b/tools/sp-rand/src/sp_rand/init.py
@@ -0,0 +1,100 @@
+"""Set up the random volume name prefix."""
+
+from __future__ import annotations
+
+import argparse
+import datetime
+import pathlib
+import random
+
+from typing import NamedTuple, Tuple  # noqa: H301
+
+from . import defs
+
+DEFAULT_TAG = "osci-1-date"
+
+
+class Config(NamedTuple):
+    """Runtime configuration for the sp_rand_init tool."""
+
+    confdir: pathlib.Path
+    noop: bool
+    prefix_var: str
+    tag: str
+
+    @staticmethod
+    def default() -> Config:
+        """Build a configuration object with the default values."""
+        return Config(
+            confdir=defs.DEFAULT_CONFDIR,
+            noop=False,
+            prefix_var=defs.PREFIX_VAR,
+            tag=DEFAULT_TAG,
+        )
+
+
+def generate_prefix(cfg: Config) -> str:
+    """Generate a random name prefix."""
+    now = datetime.datetime.now()
+    rval = random.randint(1000, 9999)
+    tdate = f"{now.year:04}{now.month:02}{now.day:02}"
+    ttime = f"{now.hour:02}{now.minute:02}"
+    return f"{cfg.tag}-{tdate}-{ttime}-{rval}-"
+
+
+def store_prefix(cfg: Config) -> Tuple[pathlib.Path, str]:
+    """Generate and store the random name prefix into a file."""
+    prefix = generate_prefix(cfg)
+    conffile = cfg.confdir / defs.FILENAME
+    if not cfg.noop:
+        conffile.write_text(f"{cfg.prefix_var}={prefix}\n", encoding="UTF-8")
+    return conffile, prefix
+
+
+def parse_args() -> Config:
+    """Parse the command-line arguments."""
+    parser = argparse.ArgumentParser(prog="sp_rand_init")
+    parser.add_argument(
+        "-d",
+        "--confdir",
+        type=pathlib.Path,
+        default=defs.DEFAULT_CONFDIR,
+        help="The directory to create the configuration file in",
+    )
+    parser.add_argument(
+        "-N",
+        "--noop",
+        action="store_true",
+        help="No-operation mode; display what would have been done",
+    )
+    parser.add_argument(
+        "-p",
+        "--prefix-var",
+        type=str,
+        default=defs.PREFIX_VAR,
+        help="The name of the variable to store into the configuration file",
+    )
+    parser.add_argument(
+        "-t",
+        "--tag",
+        type=str,
+        default=DEFAULT_TAG,
+        help="The tag that the prefix should start with",
+    )
+
+    args = parser.parse_args()
+
+    return Config(
+        confdir=args.confdir, noop=args.noop, prefix_var=args.prefix_var, tag=args.tag
+    )
+
+
+def main() -> None:
+    """Main program: parse options, generate a prefix, store it."""
+    cfg = parse_args()
+    conffile, prefix = store_prefix(cfg)
+    print(f"{prefix}\n{conffile}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/sp-rand/src/sp_rand/py.typed b/tools/sp-rand/src/sp_rand/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/sp-rand/src/sp_rand/py.typed
diff --git a/tools/sp-rand/test_functional.sh b/tools/sp-rand/test_functional.sh
new file mode 100755
index 0000000..a03b01d
--- /dev/null
+++ b/tools/sp-rand/test_functional.sh
@@ -0,0 +1,41 @@
+#!/bin/sh
+
+set -e
+set -x
+
+tempd="$(mktemp -d)"
+trap "rm -rf -- '$tempd'" EXIT HUP INT TERM QUIT
+
+conffile="$tempd/sp-ostack-random.conf"
+
+rm -rf -- "$tempd"
+test ! -e "$tempd"
+test ! -h "$tempd"
+
+sp_rand_init -d "$tempd" -N
+sp_rand_init -d "$tempd" -N | grep -Fqxe "$conffile"
+sp_rand_init -d "$tempd" -N | grep -Eqe '^osci-1-date-[0-9]{8}-[0-9]{4}-[0-9]{4}-$'
+
+mkdir -- "$tempd"
+test -d "$tempd"
+test ! -e "$conffile"
+test ! -h "$conffile"
+
+sp_rand_init -d "$tempd" -N
+test ! -e "$conffile"
+test ! -h "$conffile"
+
+sp_rand_init -d "$tempd"
+test -f "$conffile"
+
+cat -- "$conffile"
+grep -Eqe '^SP_OPENSTACK_VOLUME_PREFIX=osci-1-date-[0-9]{8}-[0-9]{4}-[0-9]{4}-$' -- "$conffile"
+
+sp_rand_cleanup -d "$tempd" --skip-api yes -N
+test -f "$conffile"
+
+sp_rand_cleanup -d "$tempd" --skip-api yes
+test ! -e "$conffile"
+test ! -h "$conffile"
+
+rmdir -- "$tempd"
diff --git a/tools/sp-rand/tox.ini b/tools/sp-rand/tox.ini
new file mode 100644
index 0000000..81307c5
--- /dev/null
+++ b/tools/sp-rand/tox.ini
@@ -0,0 +1,74 @@
+[tox]
+envlist =
+  black
+  pep8
+  pep8h
+  mypy
+  pylint
+  unit-tests
+  functional-test
+isolated_build = True
+
+
+[defs]
+pyfiles =
+  src/sp_rand
+  unit_tests
+
+[testenv:black]
+skip_install = True
+deps =
+  black >= 21b0, < 22b0
+commands =
+  black --check {[defs]pyfiles}
+
+[testenv:black-reformat]
+skip_install = True
+deps =
+  black >= 21b0, < 22b0
+commands =
+  black {[defs]pyfiles}
+
+[testenv:pep8]
+skip_install = True
+deps =
+  flake8
+commands =
+  flake8 {[defs]pyfiles}
+
+[testenv:pep8h]
+skip_install = True
+deps =
+  flake8
+  hacking >= 4
+commands =
+  flake8 {[defs]pyfiles}
+
+[testenv:mypy]
+skip_install = True
+deps =
+  confget >= 4
+  mypy
+  pytest >= 6
+commands =
+  mypy {[defs]pyfiles}
+
+[testenv:pylint]
+skip_install = True
+deps =
+  confget >= 4
+  pylint
+  pytest >= 6
+  storpool >= 6
+commands =
+  pylint {[defs]pyfiles}
+
+[testenv:unit-tests]
+deps =
+  pytest >= 6
+commands =
+  pytest unit_tests
+
+[testenv:functional-test]
+commands =
+  {toxinidir}/test_functional.sh
diff --git a/tools/sp-rand/unit_tests/__init__.py b/tools/sp-rand/unit_tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tools/sp-rand/unit_tests/__init__.py
diff --git a/tools/sp-rand/unit_tests/test_cleanup.py b/tools/sp-rand/unit_tests/test_cleanup.py
new file mode 100644
index 0000000..7c90d08
--- /dev/null
+++ b/tools/sp-rand/unit_tests/test_cleanup.py
@@ -0,0 +1,232 @@
+"""Test the sp_rand_cleanup tool."""
+
+import collections
+import errno
+import pathlib
+import tempfile
+
+from typing import DefaultDict, Dict, List, NamedTuple  # noqa: H301
+
+import pytest
+
+from storpool import sptypes  # type: ignore
+
+from sp_rand import cleanup as r_cleanup
+from sp_rand import defs as r_defs
+
+
+def test_parse_file() -> None:
+    """Make sure the name prefix file will be parsed correctly."""
+    with tempfile.TemporaryDirectory() as tempd_obj:
+        tempd = pathlib.Path(tempd_obj)
+        cfg = r_cleanup.Config.default()._replace(confdir=tempd)
+
+        conffile = tempd / r_defs.FILENAME
+        conffile.write_text(f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8")
+
+        parsed_conffile, prefix = r_cleanup.parse_file(cfg)
+        assert (parsed_conffile, prefix) == (conffile, "testvol-")
+
+
+def do_test_remove_file(noop: bool) -> None:
+    """Make sure the name prefix file will be removed... or not."""
+    with tempfile.TemporaryDirectory() as tempd_obj:
+        tempd = pathlib.Path(tempd_obj)
+        cfg = r_cleanup.Config.default()._replace(confdir=tempd, noop=noop)
+
+        conffile = tempd / r_defs.FILENAME
+        otherfile = tempd / "something.conf"
+
+        otherfile.write_text("Can't touch this!\n", encoding="UTF-8")
+        assert sorted(tempd.iterdir()) == [otherfile]
+
+        # Make sure it does not complain if the file does not exist.
+        r_cleanup.remove_file(cfg, conffile)
+        assert sorted(tempd.iterdir()) == [otherfile]
+
+        conffile.write_text("hello\n", encoding="UTF-8")
+        assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
+
+        # Make sure it raises an error if it cannot remove the file.
+        tempd.chmod((tempd.stat().st_mode & 0o7777) & 0o7577)
+        try:
+            if noop:
+                r_cleanup.remove_file(cfg, conffile)
+            else:
+                with pytest.raises(OSError) as exc_info:
+                    r_cleanup.remove_file(cfg, conffile)
+                assert exc_info.value.errno == errno.EACCES
+        finally:
+            tempd.chmod((tempd.stat().st_mode & 0o7777) | 0o0200)
+        assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
+
+        # Make sure it removes the file.
+        r_cleanup.remove_file(cfg, conffile)
+        if noop:
+            assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
+        else:
+            assert sorted(tempd.iterdir()) == [otherfile]
+
+
+def test_remove_file() -> None:
+    """Make sure the name prefix file will be removed."""
+    do_test_remove_file(False)
+
+
+def test_remove_file_noop() -> None:
+    """Make sure the name prefix file will not be removed."""
+    do_test_remove_file(True)
+
+
+class MockVolumeSummary(NamedTuple):
+    """Mock a volume or snapshot description."""
+
+    name: str
+
+
+class MockApi:
+    """Mock the StorPool API."""
+
+    attached: Dict[bool, List[str]]
+    invoked: DefaultDict[str, int]
+    vols: Dict[bool, List[str]]
+
+    def __init__(
+        self, attached: Dict[bool, List[str]], vols: Dict[bool, List[str]]
+    ) -> None:
+        """Store deep copies of the passed definitions."""
+        self.attached = {key: list(value) for key, value in attached.items()}
+        self.vols = {key: list(value) for key, value in vols.items()}
+        self.invoked = collections.defaultdict(lambda: 0)
+
+    # pylint: disable=invalid-name
+
+    def attachmentsList(self) -> List[sptypes.AttachmentDesc]:
+        """Return the current view of what is attached."""
+        self.invoked["attachmentsList"] += 1
+        return [
+            sptypes.AttachmentDesc(
+                volume=name, snapshot=False, client=11, rights="rw", pos=0
+            )
+            for name in self.attached[False]
+        ] + [
+            sptypes.AttachmentDesc(
+                volume=name, snapshot=True, client=11, rights="ro", pos=0
+            )
+            for name in self.attached[True]
+        ]
+
+    def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None:
+        """Update the current view of what is attached."""
+        self.invoked["volumesReassignWait"] += 1
+
+        for item in req.reassign:
+            if isinstance(item, sptypes.VolumeReassignDesc):
+                assert item.volume.startswith("testvol-")
+                self.attached[False].remove(item.volume)
+            else:
+                assert isinstance(item, sptypes.SnapshotReassignDesc)
+                assert item.snapshot.startswith("testvol-")
+                self.attached[True].remove(item.snapshot)
+
+            assert item.detach == [11]
+
+    def volumesList(self) -> List[MockVolumeSummary]:
+        """Return the current view of the available volumes."""
+        self.invoked["volumesList"] += 1
+        return [MockVolumeSummary(name=name) for name in self.vols[False]]
+
+    def snapshotsList(self) -> List[MockVolumeSummary]:
+        """Return the current view of the available snapshots."""
+        self.invoked["snapshotsList"] += 1
+        return [MockVolumeSummary(name=name) for name in self.vols[True]]
+
+    def _volumeDelete(self, name: str, snapshot: bool) -> None:
+        """Remove a snapshot or a volume from our view."""
+        assert name not in self.attached[snapshot]
+        self.vols[snapshot].remove(name)
+
+    def volumeDelete(self, name: str) -> None:
+        """Remove a volume from our view."""
+        self.invoked["volumeDelete"] += 1
+        self._volumeDelete(name, False)
+
+    def snapshotDelete(self, name: str) -> None:
+        """Remove a snapshot from our view."""
+        self.invoked["snapshotDelete"] += 1
+        self._volumeDelete(name, True)
+
+
+def test_remove_volumes() -> None:
+    """Make sure the StorPool volumes and snapshots will be removed."""
+    cfg = r_cleanup.Config.default()
+    attached = {
+        False: ["something-else", "testvol-42"],
+        True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
+    }
+    vols = {
+        False: ["something-else", "testvol-42", "testvol-451"],
+        True: [
+            "another.snapshot",
+            "testvol-616",
+            "testvol-9000",
+            "some-testvol-12",
+            "yet.another.thing",
+        ],
+    }
+    mock_api = MockApi(attached=attached, vols=vols)
+
+    r_cleanup.remove_volumes(cfg, mock_api, "testvol-")
+
+    assert mock_api.attached == {
+        key: [name for name in value if not name.startswith("testvol-")]
+        for key, value in attached.items()
+    }
+    assert mock_api.vols == {
+        key: [name for name in value if not name.startswith("testvol-")]
+        for key, value in vols.items()
+    }
+
+    assert mock_api.invoked == {
+        "attachmentsList": 1,
+        "volumesReassignWait": 1,
+        "volumesList": 1,
+        "snapshotsList": 1,
+        "volumeDelete": len(
+            [name for name in vols[False] if name.startswith("testvol-")]
+        ),
+        "snapshotDelete": len(
+            [name for name in vols[True] if name.startswith("testvol-")]
+        ),
+    }
+
+
+def test_remove_volumes_noop() -> None:
+    """Make sure the StorPool volumes and snapshots will be removed."""
+    cfg = r_cleanup.Config.default()._replace(noop=True)
+    attached = {
+        False: ["something-else", "testvol-42"],
+        True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
+    }
+    vols = {
+        False: ["something-else", "testvol-42", "testvol-451"],
+        True: [
+            "another.snapshot",
+            "testvol-616",
+            "testvol-9000",
+            "some-testvol-12",
+            "yet.another.thing",
+        ],
+    }
+    mock_api = MockApi(attached=attached, vols=vols)
+
+    r_cleanup.remove_volumes(cfg, mock_api, "testvol-")
+
+    assert mock_api.attached == attached
+    assert mock_api.vols == vols
+
+    assert mock_api.invoked == {
+        "attachmentsList": 1,
+        "volumesList": 1,
+        "snapshotsList": 1,
+    }
diff --git a/tools/sp-rand/unit_tests/test_init.py b/tools/sp-rand/unit_tests/test_init.py
new file mode 100644
index 0000000..121bfa9
--- /dev/null
+++ b/tools/sp-rand/unit_tests/test_init.py
@@ -0,0 +1,53 @@
+"""Test the creation of the StorPool volume name random seed."""
+
+import pathlib
+import re
+import tempfile
+
+from sp_rand import defs as r_defs
+from sp_rand import init as r_init
+
+TAG = "testvol"
+RE_PREFIX = re.compile(r"^ testvol-\d{8}-\d{4}-\d{4}- $", re.X)
+
+
+def test_generate() -> None:
+    """Make sure the prefix is generated correctly."""
+    cfg = r_init.Config.default()._replace(tag=TAG)
+    prefix = r_init.generate_prefix(cfg)
+    assert RE_PREFIX.match(prefix)
+
+
+def do_test_store(noop: bool) -> None:
+    """Make sure the prefix is stored into the config file... or not."""
+    with tempfile.TemporaryDirectory() as tempd_obj:
+        tempd = pathlib.Path(tempd_obj)
+        assert sorted(tempd.iterdir()) == []
+
+        cfg = r_init.Config.default()._replace(confdir=tempd, noop=noop, tag=TAG)
+        conffile, prefix = r_init.store_prefix(cfg)
+
+        if noop:
+            assert sorted(tempd.iterdir()) == []
+        else:
+            files = sorted(tempd.iterdir())
+            assert files == [conffile]
+            assert conffile.is_file()
+            assert conffile.name.endswith(".conf") and not conffile.name.startswith(".")
+
+            lines = conffile.read_text(encoding="UTF-8").splitlines()
+            assert len(lines) == 1
+            fields = lines[0].split("=", maxsplit=1)
+            assert fields == [r_defs.PREFIX_VAR, prefix]
+
+        assert RE_PREFIX.match(prefix)
+
+
+def test_store() -> None:
+    """Make sure the prefix is stored into the config file."""
+    do_test_store(False)
+
+
+def test_store_noop() -> None:
+    """Make sure the prefix is not stored into the config file."""
+    do_test_store(True)