blob: c54c53e17d4c6330cf7248d636b3be39fd1cef00 [file] [log] [blame]
"""Test the sp_rand_cleanup tool."""
import collections
import errno
import pathlib
import tempfile
from typing import DefaultDict, Dict, List, NamedTuple, Tuple # noqa: H301
import pytest
from storpool import sptypes # type: ignore
from sp_rand import cleanup as r_cleanup
from sp_rand import defs as r_defs
# The names reported as attached, keyed by "is this a snapshot?".
# Only some of them start with the "testvol-" prefix used by the tests below.
T_ATTACHED: Dict[bool, List[str]] = {
    False: [
        "something-else",
        "testvol-42",
    ],
    True: [
        "testvol-9000",
        "another.snapshot",
        "testvol-616",
        "some-testvol-12",
    ],
}
# The iSCSI exports, keyed by initiator name; each export is
# a (portal group name, volume name) pair.
T_EXPORTED: Dict[str, List[Tuple[str, str]]] = {
    "beleriand": [
        ("neighbors", "testvol-451"),
        ("ours", "something-else"),
        ("ours", "testvol-616"),
    ],
    "gondor": [
        ("ours", "testvol-12"),
    ],
}
# The iSCSI targets, keyed by a numeric target ID; each one records
# the target name and the name of the volume it refers to.
T_TARGETS: Dict[int, Dict[str, str]] = {
    1: {
        "name": "ouriqn:target-something-else",
        "volume": "something-else",
    },
    2: {
        "name": "ouriqn:target-testvol-12",
        "volume": "testvol-12",
    },
    3: {
        "name": "ouriqn:target-testvol-616",
        "volume": "testvol-616",
    },
    4: {
        "name": "neiqn:whee-testvol-451",
        "volume": "testvol-451",
    },
}
# The names of the existing volumes (False) and snapshots (True).
T_VOLS: Dict[bool, List[str]] = {
    False: [
        "something-else",
        "testvol-42",
        "testvol-451",
    ],
    True: [
        "another.snapshot",
        "testvol-616",
        "testvol-9000",
        "some-testvol-12",
        "yet.another.thing",
    ],
}
def test_parse_file() -> None:
    """Check that parse_file() returns the config file path and the stored prefix."""
    with tempfile.TemporaryDirectory() as tmp_name:
        confdir = pathlib.Path(tmp_name)
        cfg = r_cleanup.Config.default()._replace(confdir=confdir)

        # Store a single "variable=value" line for the parser to pick up.
        expected_path = confdir / r_defs.FILENAME
        expected_path.write_text(
            f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8"
        )

        found_path, found_prefix = r_cleanup.parse_file(cfg)
        assert found_path == expected_path
        assert found_prefix == "testvol-"
def do_test_remove_file(noop: bool) -> None:
    """Run remove_file() through its paces in a scratch directory.

    Three situations are checked in turn: the file does not exist, the file
    exists in a directory that has had its owner write bit cleared, and
    the file exists in a writable directory.  If `noop` is true, the file is
    expected to be left in place and no error is expected at any point.
    """

    with tempfile.TemporaryDirectory() as tmp_name:
        workdir = pathlib.Path(tmp_name)
        cfg = r_cleanup.Config.default()._replace(confdir=workdir, noop=noop)
        target = workdir / r_defs.FILENAME

        # A file that must survive whatever happens to the target one.
        bystander = workdir / "something.conf"
        bystander.write_text("Can't touch this!\n", encoding="UTF-8")

        def listing() -> List[pathlib.Path]:
            """Return the sorted current contents of the scratch directory."""
            return sorted(workdir.iterdir())

        only_bystander = [bystander]
        both_files = sorted([target, bystander])
        assert listing() == only_bystander

        # A missing file must not be treated as an error.
        r_cleanup.remove_file(cfg, target)
        assert listing() == only_bystander

        target.write_text("hello\n", encoding="UTF-8")
        assert listing() == both_files

        # Clear the owner write bit on the directory so that removal fails.
        # NOTE(review): presumably this relies on the tests not being run by
        # a user that may bypass directory permissions - confirm.
        workdir.chmod(workdir.stat().st_mode & 0o7577)
        try:
            if not noop:
                with pytest.raises(OSError) as exc_info:
                    r_cleanup.remove_file(cfg, target)
                assert exc_info.value.errno == errno.EACCES
            else:
                r_cleanup.remove_file(cfg, target)
        finally:
            # Restore the write bit, otherwise the directory cannot be cleaned up.
            workdir.chmod((workdir.stat().st_mode & 0o7777) | 0o0200)
        assert listing() == both_files

        # With a writable directory, the file goes away unless in no-op mode.
        r_cleanup.remove_file(cfg, target)
        expected = both_files if noop else only_bystander
        assert listing() == expected
def test_remove_file() -> None:
    """Run the file removal scenario with no-op mode turned off."""
    do_test_remove_file(noop=False)
def test_remove_file_noop() -> None:
    """Run the file removal scenario with no-op mode turned on."""
    do_test_remove_file(noop=True)
class MockVolumeSummary(NamedTuple):
    """A stand-in for a volume or snapshot summary: only the name is kept."""

    # The name of the volume or snapshot.
    name: str
class MockApi:
    """An in-memory stand-in for the StorPool API client used by the tests.

    The object keeps its own copies of the attachments, iSCSI exports,
    iSCSI targets, and volume/snapshot names that it was created with,
    modifies them when the "change something" methods are invoked, and
    counts the number of times each public method was invoked.
    """

    # The attached volumes (False) and snapshots (True).
    attached: Dict[bool, List[str]]
    # The (portal group, volume name) exports, keyed by initiator name.
    exported: Dict[str, List[Tuple[str, str]]]
    # The number of invocations, keyed by method name.
    invoked: DefaultDict[str, int]
    # The iSCSI targets ("name" and "volume"), keyed by target ID.
    targets: Dict[int, Dict[str, str]]
    # The existing volumes (False) and snapshots (True).
    vols: Dict[bool, List[str]]

    def __init__(
        self,
        attached: Dict[bool, List[str]],
        exported: Dict[str, List[Tuple[str, str]]],
        targets: Dict[int, Dict[str, str]],
        vols: Dict[bool, List[str]],
    ) -> None:
        """Copy the passed definitions so that the caller's data is never modified."""
        self.attached = {snap: names[:] for snap, names in attached.items()}
        self.exported = {init: exports[:] for init, exports in exported.items()}
        self.targets = {tid: tgt.copy() for tid, tgt in targets.items()}
        self.vols = {snap: names[:] for snap, names in vols.items()}
        self.invoked = collections.defaultdict(int)

    # pylint: disable=invalid-name

    def attachmentsList(self) -> List[sptypes.AttachmentDesc]:
        """Describe the attached volumes first, then the attached snapshots."""
        self.invoked["attachmentsList"] += 1
        return [
            sptypes.AttachmentDesc(
                volume=name,
                snapshot=is_snapshot,
                client=11,
                # Volumes are reported as read/write, snapshots as read-only.
                rights="ro" if is_snapshot else "rw",
                pos=0,
            )
            for is_snapshot in (False, True)
            for name in self.attached[is_snapshot]
        ]

    def iSCSIConfig(self) -> sptypes.iSCSIConfig:
        """Build an iSCSI configuration out of the stored exports and targets."""
        self.invoked["iSCSIConfig"] += 1

        # The exports are stored by volume name, but refer to the target name.
        tgt_names = {tgt["volume"]: tgt["name"] for tgt in self.targets.values()}

        # The initiators are numbered in the order of their sorted names.
        initiators = {
            idx: sptypes.iSCSIInitiator(
                name=init_name,
                username="",
                secret="",
                nets=[],
                exports=[
                    sptypes.iSCSIExport(portalGroup=group, target=tgt_names[volume])
                    for group, volume in exports
                ],
            )
            for idx, (init_name, exports) in enumerate(sorted(self.exported.items()))
        }
        targets = {
            tid: sptypes.iSCSITarget(
                currentControllerId=65535, name=tgt["name"], volume=tgt["volume"]
            )
            for tid, tgt in self.targets.items()
        }

        data = sptypes.iSCSIConfigData(
            baseName="just-us-here",
            initiators=initiators,
            portalGroups={},
            targets=targets,
        )
        return sptypes.iSCSIConfig(iscsi=data)

    def iSCSIConfigChange(self, change: sptypes.iSCSIConfigChange) -> None:
        """Carry out the "delete export" and "remove target" commands.

        Raise a ValueError for a command that is neither of these.
        """
        self.invoked["iSCSIConfigChange"] += 1
        for cmd in change.commands:
            expdel = cmd.exportDelete
            tgtdel = cmd.deleteTarget

            if expdel is not None:
                # The export must be there for it to be removed.
                exports = self.exported[expdel.initiator]
                wanted = (expdel.portalGroup, expdel.volumeName)
                assert wanted in exports
                exports.remove(wanted)
                continue

            if tgtdel is None:
                raise ValueError(repr(cmd))

            # Exactly one target must refer to this volume.
            matching = [
                tid
                for tid, tgt in self.targets.items()
                if tgt["volume"] == tgtdel.volumeName
            ]
            assert len(matching) == 1
            del self.targets[matching[0]]

    def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None:
        """Drop the specified volumes and snapshots from the attached ones."""
        self.invoked["volumesReassignWait"] += 1
        for item in req.reassign:
            if isinstance(item, sptypes.VolumeReassignDesc):
                is_snapshot, name = False, item.volume
            else:
                assert isinstance(item, sptypes.SnapshotReassignDesc)
                is_snapshot, name = True, item.snapshot

            # Only the test volumes and snapshots may be detached...
            assert name.startswith("testvol-")
            self.attached[is_snapshot].remove(name)

            # ...and only from the single client that we report.
            assert item.detach == [11]

    def _summaries(self, snapshot: bool) -> List[MockVolumeSummary]:
        """Wrap the stored volume or snapshot names into summary objects."""
        return [MockVolumeSummary(name=name) for name in self.vols[snapshot]]

    def volumesList(self) -> List[MockVolumeSummary]:
        """List the volumes that have not been deleted yet."""
        self.invoked["volumesList"] += 1
        return self._summaries(False)

    def snapshotsList(self) -> List[MockVolumeSummary]:
        """List the snapshots that have not been deleted yet."""
        self.invoked["snapshotsList"] += 1
        return self._summaries(True)

    def _volumeDelete(self, name: str, snapshot: bool) -> None:
        """Forget about a volume or a snapshot; it must not be attached."""
        assert name not in self.attached[snapshot]
        self.vols[snapshot].remove(name)

    def volumeDelete(self, name: str) -> None:
        """Forget about the specified volume."""
        self.invoked["volumeDelete"] += 1
        self._volumeDelete(name, snapshot=False)

    def snapshotDelete(self, name: str) -> None:
        """Forget about the specified snapshot."""
        self.invoked["snapshotDelete"] += 1
        self._volumeDelete(name, snapshot=True)
def test_remove_volumes() -> None:
    """Check that whatever bears the name prefix is detached, unexported, and removed."""
    prefix = "testvol-"

    def ours(name: str) -> bool:
        """Check whether the name starts with the prefix under test."""
        return name.startswith(prefix)

    cfg = r_cleanup.Config.default()
    mock_api = MockApi(
        attached=T_ATTACHED,
        exported=T_EXPORTED,
        targets=T_TARGETS,
        vols=T_VOLS,
    )
    r_cleanup.remove_volumes(cfg, mock_api, prefix)

    # Only the things that do not bear the prefix should be left.
    assert mock_api.attached == {
        snapshot: [name for name in names if not ours(name)]
        for snapshot, names in T_ATTACHED.items()
    }
    assert mock_api.exported == {
        initiator: [exp for exp in exports if not ours(exp[1])]
        for initiator, exports in T_EXPORTED.items()
    }
    assert mock_api.targets == {
        tid: tgt for tid, tgt in T_TARGETS.items() if not ours(tgt["volume"])
    }
    assert mock_api.vols == {
        snapshot: [name for name in names if not ours(name)]
        for snapshot, names in T_VOLS.items()
    }

    # One delete request is expected per prefixed volume or snapshot.
    assert mock_api.invoked == {
        "attachmentsList": 1,
        "iSCSIConfig": 1,
        "iSCSIConfigChange": 2,
        "volumesReassignWait": 1,
        "volumesList": 1,
        "snapshotsList": 1,
        "volumeDelete": sum(1 for name in T_VOLS[False] if ours(name)),
        "snapshotDelete": sum(1 for name in T_VOLS[True] if ours(name)),
    }
def test_remove_volumes_noop() -> None:
    """Make sure nothing is detached, unexported, or removed in no-op mode.

    The mock API's view of the attachments, exports, targets, volumes, and
    snapshots must stay exactly as it was set up, and only the methods that
    query the current state are expected to have been invoked.
    """
    cfg = r_cleanup.Config.default()._replace(noop=True)
    mock_api = MockApi(
        attached=T_ATTACHED, exported=T_EXPORTED, targets=T_TARGETS, vols=T_VOLS
    )
    r_cleanup.remove_volumes(cfg, mock_api, "testvol-")

    # The state must be the same as the one the mock API started with.
    assert mock_api.attached == T_ATTACHED
    assert mock_api.exported == T_EXPORTED
    assert mock_api.targets == T_TARGETS
    assert mock_api.vols == T_VOLS

    # No "change something" method may appear here, only the queries.
    assert mock_api.invoked == {
        "attachmentsList": 1,
        "iSCSIConfig": 1,
        "volumesList": 1,
        "snapshotsList": 1,
    }