# blob: c3efb2d1ac4ef7df1b5dca608a1efe32dc2f0123 [file] [log] [blame]
"""Test the sp_rand_cleanup tool."""
import collections
import errno
import pathlib
import tempfile
from typing import DefaultDict, Dict, List, NamedTuple # noqa: H301
import pytest
from storpool import sptypes # type: ignore
from sp_rand import cleanup as r_cleanup
from sp_rand import defs as r_defs
class VolSnapData(NamedTuple):
    """Test data: a list of volume names and a list of snapshot names.

    The tests use it both for what the mock API reports as attached and
    for what it reports as existing.
    """

    # The names of the volumes.
    volumes: List[str]
    # The names of the snapshots.
    snapshots: List[str]
class ExportsData(NamedTuple):
    """Test data: one export of a volume through a portal group."""

    # The name of the portal group that the volume is exported through.
    portal_group: str
    # The name of the exported volume.
    volume_name: str
class TargetData(NamedTuple):
    """Test data: an iSCSI target and the volume that it exposes."""

    # The name of the iSCSI target.
    name: str
    # The name of the volume behind the target.
    volume: str
# The fixtures below deliberately mix names that start with the "testvol-"
# prefix used by the tests with names that do not ("something-else",
# "some-testvol-12", ...), so that the tests can tell the two groups apart.

# What the mock API initially reports as attached.
T_ATTACHED: VolSnapData = VolSnapData(
    volumes=["something-else", "testvol-42"],
    snapshots=["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
)

# What the mock API initially reports as exported, keyed by initiator name.
T_EXPORTED: Dict[str, List[ExportsData]] = {
    "beleriand": [
        ExportsData(portal_group="neighbors", volume_name="testvol-451"),
        ExportsData(portal_group="ours", volume_name="something-else"),
        ExportsData(portal_group="ours", volume_name="testvol-616"),
    ],
    "gondor": [
        ExportsData(portal_group="ours", volume_name="testvol-12"),
    ],
}

# The iSCSI targets that the mock API initially knows about, keyed by ID;
# there is one target for each volume mentioned in T_EXPORTED.
T_TARGETS: Dict[int, TargetData] = {
    1: TargetData(name="ouriqn:target-something-else", volume="something-else"),
    2: TargetData(name="ouriqn:target-testvol-12", volume="testvol-12"),
    3: TargetData(name="ouriqn:target-testvol-616", volume="testvol-616"),
    4: TargetData(name="neiqn:whee-testvol-451", volume="testvol-451"),
}

# What the mock API initially reports as existing volumes and snapshots.
T_VOLS: VolSnapData = VolSnapData(
    volumes=["something-else", "testvol-42", "testvol-451"],
    snapshots=[
        "another.snapshot",
        "testvol-616",
        "testvol-9000",
        "some-testvol-12",
        "yet.another.thing",
    ],
)
def test_parse_file() -> None:
    """Check that parse_file() locates the prefix file and reads the prefix."""
    with tempfile.TemporaryDirectory() as tempd_obj:
        confdir = pathlib.Path(tempd_obj)
        cfg = r_cleanup.Config.default()._replace(confdir=confdir)

        # Drop a single "variable=value" line into the expected location.
        expected_path = confdir / r_defs.FILENAME
        expected_path.write_text(f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8")

        found_path, found_prefix = r_cleanup.parse_file(cfg)
        assert found_path == expected_path
        assert found_prefix == "testvol-"
def do_test_remove_file(noop: bool) -> None:
    """Exercise remove_file() in either real or no-operation mode.

    The scenario, run in a scratch directory that also holds an unrelated
    file which must never be touched:

    - removing a file that does not exist is not an error;
    - with the directory made read-only, a real removal is expected to
      raise an OSError with EACCES, while a no-op removal raises nothing;
    - with the directory writable again, a real removal deletes the file,
      while a no-op removal leaves it in place.
    """
    with tempfile.TemporaryDirectory() as tempd_obj:
        tempd = pathlib.Path(tempd_obj)
        cfg = r_cleanup.Config.default()._replace(confdir=tempd, noop=noop)
        conffile = tempd / r_defs.FILENAME
        otherfile = tempd / "something.conf"
        otherfile.write_text("Can't touch this!\n", encoding="UTF-8")

        def listing() -> List[pathlib.Path]:
            """Return the current contents of the scratch directory."""
            return sorted(tempd.iterdir())

        only_other = [otherfile]
        both = sorted([conffile, otherfile])
        assert listing() == only_other

        # A missing file must be silently ignored.
        r_cleanup.remove_file(cfg, conffile)
        assert listing() == only_other

        conffile.write_text("hello\n", encoding="UTF-8")
        assert listing() == both

        # Clear the owner's write bit on the directory so that unlinking
        # the file is refused, and always restore it afterwards.
        # NOTE(review): presumably this cannot fail for the superuser, so
        # the real-removal variant likely needs a non-root user - confirm.
        tempd.chmod(tempd.stat().st_mode & 0o7577)
        try:
            if not noop:
                with pytest.raises(OSError) as exc_info:
                    r_cleanup.remove_file(cfg, conffile)
                assert exc_info.value.errno == errno.EACCES
            else:
                r_cleanup.remove_file(cfg, conffile)
        finally:
            tempd.chmod((tempd.stat().st_mode & 0o7777) | 0o0200)
        assert listing() == both

        # With the directory writable again the removal goes through,
        # unless we were asked not to do anything.
        r_cleanup.remove_file(cfg, conffile)
        assert listing() == (both if noop else only_other)
def test_remove_file() -> None:
    """Run the file removal scenario for real: the file must go away."""
    do_test_remove_file(noop=False)
def test_remove_file_noop() -> None:
    """Run the file removal scenario in no-op mode: the file must stay."""
    do_test_remove_file(noop=True)
class MockVolumeSummary(NamedTuple):
    """A minimal stand-in for a volume or snapshot description."""

    # The name of the volume or snapshot; the only field the mock provides.
    name: str
class MockApi:
    """An in-memory stand-in for the StorPool API.

    It implements a handful of API methods on top of plain lists and
    dictionaries.  Every method increments its own entry in `invoked`,
    so a test can check exactly which calls were made and how often.
    """

    # The volumes and snapshots currently reported as attached.
    attached: VolSnapData
    # The current exports, keyed by initiator name.
    exported: Dict[str, List[ExportsData]]
    # The number of calls made so far, keyed by method name.
    invoked: DefaultDict[str, int]
    # The current iSCSI targets, keyed by target ID.
    targets: Dict[int, TargetData]
    # The volumes and snapshots currently reported as existing.
    vols: VolSnapData

    def __init__(
        self,
        attached: VolSnapData,
        exported: Dict[str, List[ExportsData]],
        targets: Dict[int, TargetData],
        vols: VolSnapData,
    ) -> None:
        """Store private copies of the passed definitions.

        All the lists and dictionaries are copied, so the methods that
        modify the mock's state never touch the objects passed in.
        """
        self.attached = VolSnapData(
            volumes=[*attached.volumes], snapshots=[*attached.snapshots]
        )
        self.exported = {name: [*exports] for name, exports in exported.items()}
        self.targets = dict(targets)
        self.vols = VolSnapData(volumes=[*vols.volumes], snapshots=[*vols.snapshots])
        self.invoked = collections.defaultdict(int)

    # pylint: disable=invalid-name

    def attachmentsList(self) -> List[sptypes.AttachmentDesc]:
        """Describe everything that is currently attached.

        The volumes come first (read-write), then the snapshots
        (read-only); all of them are reported as attached to client 11.
        """
        self.invoked["attachmentsList"] += 1

        vol_descs = [
            sptypes.AttachmentDesc(
                volume=name, snapshot=False, client=11, rights="rw", pos=0
            )
            for name in self.attached.volumes
        ]
        snap_descs = [
            sptypes.AttachmentDesc(
                volume=name, snapshot=True, client=11, rights="ro", pos=0
            )
            for name in self.attached.snapshots
        ]
        return vol_descs + snap_descs

    def iSCSIConfig(self) -> sptypes.iSCSIConfig:
        """Build an iSCSI configuration out of the current exports and targets.

        The initiators are numbered from zero in the order of their names.
        """
        self.invoked["iSCSIConfig"] += 1

        # The exports refer to targets by name, we track them by volume.
        name_by_volume = {tgt.volume: tgt.name for tgt in self.targets.values()}

        initiators = {
            idx: sptypes.iSCSIInitiator(
                name=name,
                username="",
                secret="",
                nets=[],
                exports=[
                    sptypes.iSCSIExport(
                        portalGroup=exp.portal_group,
                        target=name_by_volume[exp.volume_name],
                    )
                    for exp in explist
                ],
            )
            for idx, (name, explist) in enumerate(sorted(self.exported.items()))
        }
        targets = {
            tid: sptypes.iSCSITarget(
                currentControllerId=65535,
                name=tgt.name,
                volume=tgt.volume,
            )
            for tid, tgt in self.targets.items()
        }

        return sptypes.iSCSIConfig(
            iscsi=sptypes.iSCSIConfigData(
                baseName="just-us-here",
                initiators=initiators,
                portalGroups={},
                targets=targets,
            )
        )

    def iSCSIConfigChange(self, change: sptypes.iSCSIConfigChange) -> None:
        """Apply the "delete export" and "remove target" commands.

        Raises a ValueError for any other kind of command.
        """
        self.invoked["iSCSIConfigChange"] += 1

        for cmd in change.commands:
            expdel = cmd.exportDelete
            tgtdel = cmd.deleteTarget

            if expdel is not None:
                # The export to delete must be one that we know about.
                exports = self.exported[expdel.initiator]
                wanted = ExportsData(
                    portal_group=expdel.portalGroup, volume_name=expdel.volumeName
                )
                assert wanted in exports
                exports.remove(wanted)
                continue

            if tgtdel is None:
                raise ValueError(repr(cmd))

            # There must be exactly one target for the specified volume.
            matching = [
                tid
                for tid, tgt in self.targets.items()
                if tgt.volume == tgtdel.volumeName
            ]
            assert len(matching) == 1
            del self.targets[matching[0]]

    def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None:
        """Detach the specified volumes and snapshots.

        Only "testvol-" names may be detached, and only from client 11.
        """
        self.invoked["volumesReassignWait"] += 1

        for item in req.reassign:
            if isinstance(item, sptypes.VolumeReassignDesc):
                name, attached = item.volume, self.attached.volumes
            else:
                assert isinstance(item, sptypes.SnapshotReassignDesc)
                name, attached = item.snapshot, self.attached.snapshots

            assert name.startswith("testvol-")
            attached.remove(name)
            assert item.detach == [11]

    def volumesList(self) -> List[MockVolumeSummary]:
        """Describe the volumes that currently exist."""
        self.invoked["volumesList"] += 1
        return [MockVolumeSummary(name=volname) for volname in self.vols.volumes]

    def snapshotsList(self) -> List[MockVolumeSummary]:
        """Describe the snapshots that currently exist."""
        self.invoked["snapshotsList"] += 1
        return [MockVolumeSummary(name=snapname) for snapname in self.vols.snapshots]

    def volumeDelete(self, name: str) -> None:
        """Forget about a volume; it must have been detached already."""
        self.invoked["volumeDelete"] += 1
        assert name not in self.attached.volumes
        self.vols.volumes.remove(name)

    def snapshotDelete(self, name: str) -> None:
        """Forget about a snapshot; it must have been detached already."""
        self.invoked["snapshotDelete"] += 1
        assert name not in self.attached.snapshots
        self.vols.snapshots.remove(name)
def test_remove_volumes() -> None:
    """Make sure the StorPool volumes and snapshots will be removed.

    Run remove_volumes() against a freshly populated mock API and check
    that everything with a name starting with the prefix is gone, that
    everything else is still there, and that the expected API methods
    were invoked the expected number of times.
    """
    prefix = "testvol-"

    cfg = r_cleanup.Config.default()
    mock_api = MockApi(
        attached=T_ATTACHED, exported=T_EXPORTED, targets=T_TARGETS, vols=T_VOLS
    )
    r_cleanup.remove_volumes(cfg, mock_api, prefix)

    # Only the names that do not start with the prefix must be left.
    assert mock_api.attached == VolSnapData(
        volumes=[name for name in T_ATTACHED.volumes if not name.startswith(prefix)],
        snapshots=[
            name for name in T_ATTACHED.snapshots if not name.startswith(prefix)
        ],
    )
    assert mock_api.exported == {
        initiator: [
            item for item in exports if not item.volume_name.startswith(prefix)
        ]
        for initiator, exports in T_EXPORTED.items()
    }
    assert mock_api.targets == {
        tid: tgt for tid, tgt in T_TARGETS.items() if not tgt.volume.startswith(prefix)
    }
    assert mock_api.vols == VolSnapData(
        volumes=[name for name in T_VOLS.volumes if not name.startswith(prefix)],
        snapshots=[name for name in T_VOLS.snapshots if not name.startswith(prefix)],
    )

    # The delete methods must have been invoked once for each matching
    # volume or snapshot, the rest a fixed number of times.
    assert mock_api.invoked == {
        "attachmentsList": 1,
        "iSCSIConfig": 1,
        "iSCSIConfigChange": 2,
        "volumesReassignWait": 1,
        "volumesList": 1,
        "snapshotsList": 1,
        "volumeDelete": len(
            [name for name in T_VOLS.volumes if name.startswith(prefix)]
        ),
        "snapshotDelete": len(
            [name for name in T_VOLS.snapshots if name.startswith(prefix)]
        ),
    }
def test_remove_volumes_noop() -> None:
    """Make sure the StorPool volumes and snapshots will not be removed.

    Run remove_volumes() in no-operation mode against a freshly populated
    mock API and check that its state is exactly the same afterwards.
    """
    cfg = r_cleanup.Config.default()._replace(noop=True)
    mock_api = MockApi(
        attached=T_ATTACHED, exported=T_EXPORTED, targets=T_TARGETS, vols=T_VOLS
    )
    r_cleanup.remove_volumes(cfg, mock_api, "testvol-")

    # Nothing may have been detached, unexported, or deleted.
    assert mock_api.attached == T_ATTACHED
    assert mock_api.exported == T_EXPORTED
    assert mock_api.targets == T_TARGETS
    assert mock_api.vols == T_VOLS

    # Only the methods that query the current state may have been invoked.
    assert mock_api.invoked == {
        "attachmentsList": 1,
        "iSCSIConfig": 1,
        "volumesList": 1,
        "snapshotsList": 1,
    }