blob: 7c90d08c850ec65a0dc23a3aaa5d37c60794aaf7 [file] [log] [blame]
"""Test the sp_rand_cleanup tool."""
import collections
import errno
import pathlib
import tempfile
from typing import DefaultDict, Dict, List, NamedTuple # noqa: H301
import pytest
from storpool import sptypes # type: ignore
from sp_rand import cleanup as r_cleanup
from sp_rand import defs as r_defs
def test_parse_file() -> None:
"""Make sure the name prefix file will be parsed correctly."""
with tempfile.TemporaryDirectory() as tempd_obj:
tempd = pathlib.Path(tempd_obj)
cfg = r_cleanup.Config.default()._replace(confdir=tempd)
conffile = tempd / r_defs.FILENAME
conffile.write_text(f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8")
parsed_conffile, prefix = r_cleanup.parse_file(cfg)
assert (parsed_conffile, prefix) == (conffile, "testvol-")
def do_test_remove_file(noop: bool) -> None:
"""Make sure the name prefix file will be removed... or not."""
with tempfile.TemporaryDirectory() as tempd_obj:
tempd = pathlib.Path(tempd_obj)
cfg = r_cleanup.Config.default()._replace(confdir=tempd, noop=noop)
conffile = tempd / r_defs.FILENAME
otherfile = tempd / "something.conf"
otherfile.write_text("Can't touch this!\n", encoding="UTF-8")
assert sorted(tempd.iterdir()) == [otherfile]
# Make sure it does not complain if the file does not exist.
r_cleanup.remove_file(cfg, conffile)
assert sorted(tempd.iterdir()) == [otherfile]
conffile.write_text("hello\n", encoding="UTF-8")
assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
# Make sure it raises an error if it cannot remove the file.
tempd.chmod((tempd.stat().st_mode & 0o7777) & 0o7577)
try:
if noop:
r_cleanup.remove_file(cfg, conffile)
else:
with pytest.raises(OSError) as exc_info:
r_cleanup.remove_file(cfg, conffile)
assert exc_info.value.errno == errno.EACCES
finally:
tempd.chmod((tempd.stat().st_mode & 0o7777) | 0o0200)
assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
# Make sure it removes the file.
r_cleanup.remove_file(cfg, conffile)
if noop:
assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
else:
assert sorted(tempd.iterdir()) == [otherfile]
def test_remove_file() -> None:
"""Make sure the name prefix file will be removed."""
do_test_remove_file(False)
def test_remove_file_noop() -> None:
"""Make sure the name prefix file will not be removed."""
do_test_remove_file(True)
class MockVolumeSummary(NamedTuple):
"""Mock a volume or snapshot description."""
name: str
class MockApi:
"""Mock the StorPool API."""
attached: Dict[bool, List[str]]
invoked: DefaultDict[str, int]
vols: Dict[bool, List[str]]
def __init__(
self, attached: Dict[bool, List[str]], vols: Dict[bool, List[str]]
) -> None:
"""Store deep copies of the passed definitions."""
self.attached = {key: list(value) for key, value in attached.items()}
self.vols = {key: list(value) for key, value in vols.items()}
self.invoked = collections.defaultdict(lambda: 0)
# pylint: disable=invalid-name
def attachmentsList(self) -> List[sptypes.AttachmentDesc]:
"""Return the current view of what is attached."""
self.invoked["attachmentsList"] += 1
return [
sptypes.AttachmentDesc(
volume=name, snapshot=False, client=11, rights="rw", pos=0
)
for name in self.attached[False]
] + [
sptypes.AttachmentDesc(
volume=name, snapshot=True, client=11, rights="ro", pos=0
)
for name in self.attached[True]
]
def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None:
"""Update the current view of what is attached."""
self.invoked["volumesReassignWait"] += 1
for item in req.reassign:
if isinstance(item, sptypes.VolumeReassignDesc):
assert item.volume.startswith("testvol-")
self.attached[False].remove(item.volume)
else:
assert isinstance(item, sptypes.SnapshotReassignDesc)
assert item.snapshot.startswith("testvol-")
self.attached[True].remove(item.snapshot)
assert item.detach == [11]
def volumesList(self) -> List[MockVolumeSummary]:
"""Return the current view of the available volumes."""
self.invoked["volumesList"] += 1
return [MockVolumeSummary(name=name) for name in self.vols[False]]
def snapshotsList(self) -> List[MockVolumeSummary]:
"""Return the current view of the available snapshots."""
self.invoked["snapshotsList"] += 1
return [MockVolumeSummary(name=name) for name in self.vols[True]]
def _volumeDelete(self, name: str, snapshot: bool) -> None:
"""Remove a snapshot or a volume from our view."""
assert name not in self.attached[snapshot]
self.vols[snapshot].remove(name)
def volumeDelete(self, name: str) -> None:
"""Remove a volume from our view."""
self.invoked["volumeDelete"] += 1
self._volumeDelete(name, False)
def snapshotDelete(self, name: str) -> None:
"""Remove a snapshot from our view."""
self.invoked["snapshotDelete"] += 1
self._volumeDelete(name, True)
def test_remove_volumes() -> None:
"""Make sure the StorPool volumes and snapshots will be removed."""
cfg = r_cleanup.Config.default()
attached = {
False: ["something-else", "testvol-42"],
True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
}
vols = {
False: ["something-else", "testvol-42", "testvol-451"],
True: [
"another.snapshot",
"testvol-616",
"testvol-9000",
"some-testvol-12",
"yet.another.thing",
],
}
mock_api = MockApi(attached=attached, vols=vols)
r_cleanup.remove_volumes(cfg, mock_api, "testvol-")
assert mock_api.attached == {
key: [name for name in value if not name.startswith("testvol-")]
for key, value in attached.items()
}
assert mock_api.vols == {
key: [name for name in value if not name.startswith("testvol-")]
for key, value in vols.items()
}
assert mock_api.invoked == {
"attachmentsList": 1,
"volumesReassignWait": 1,
"volumesList": 1,
"snapshotsList": 1,
"volumeDelete": len(
[name for name in vols[False] if name.startswith("testvol-")]
),
"snapshotDelete": len(
[name for name in vols[True] if name.startswith("testvol-")]
),
}
def test_remove_volumes_noop() -> None:
"""Make sure the StorPool volumes and snapshots will be removed."""
cfg = r_cleanup.Config.default()._replace(noop=True)
attached = {
False: ["something-else", "testvol-42"],
True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
}
vols = {
False: ["something-else", "testvol-42", "testvol-451"],
True: [
"another.snapshot",
"testvol-616",
"testvol-9000",
"some-testvol-12",
"yet.another.thing",
],
}
mock_api = MockApi(attached=attached, vols=vols)
r_cleanup.remove_volumes(cfg, mock_api, "testvol-")
assert mock_api.attached == attached
assert mock_api.vols == vols
assert mock_api.invoked == {
"attachmentsList": 1,
"volumesList": 1,
"snapshotsList": 1,
}