| """Test the sp_rand_cleanup tool.""" |
| |
| import collections |
| import errno |
| import pathlib |
| import tempfile |
| |
| from typing import DefaultDict, Dict, List, NamedTuple, Tuple # noqa: H301 |
| |
| import pytest |
| |
| from storpool import sptypes # type: ignore |
| |
| from sp_rand import cleanup as r_cleanup |
| from sp_rand import defs as r_defs |
| |
| |
# Fixture data fed to MockApi.  The names deliberately mix ones that start
# with the "testvol-" prefix used by the tests below, ones that merely
# contain it (e.g. "some-testvol-12"), and completely unrelated ones.

# Names of the attached objects, keyed by the "is this a snapshot" flag:
# False holds volume names, True holds snapshot names.
T_ATTACHED = {
    False: ["something-else", "testvol-42"],
    True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
}

# iSCSI exports: initiator name -> list of (portal group, volume name) pairs.
T_EXPORTED = {
    "beleriand": [
        ("neighbors", "testvol-451"),
        ("ours", "something-else"),
        ("ours", "testvol-616"),
    ],
    "gondor": [("ours", "testvol-12")],
}

# iSCSI targets: target ID -> the target's name and the volume it exposes.
# Every volume mentioned in T_EXPORTED has a target here, since the mock
# looks the target name up by volume when building the export list.
T_TARGETS = {
    1: {"name": "ouriqn:target-something-else", "volume": "something-else"},
    2: {"name": "ouriqn:target-testvol-12", "volume": "testvol-12"},
    3: {"name": "ouriqn:target-testvol-616", "volume": "testvol-616"},
    4: {"name": "neiqn:whee-testvol-451", "volume": "testvol-451"},
}

# Names of the existing objects, keyed the same way as T_ATTACHED:
# False holds volume names, True holds snapshot names.
T_VOLS = {
    False: ["something-else", "testvol-42", "testvol-451"],
    True: [
        "another.snapshot",
        "testvol-616",
        "testvol-9000",
        "some-testvol-12",
        "yet.another.thing",
    ],
}
| |
| |
def test_parse_file() -> None:
    """Check that the path and prefix are read back from the name prefix file."""
    with tempfile.TemporaryDirectory() as scratch_name:
        scratch = pathlib.Path(scratch_name)
        cfg = r_cleanup.Config.default()._replace(confdir=scratch)

        # Write a single "variable=value" line for the parser to pick up.
        expected_path = scratch / r_defs.FILENAME
        expected_path.write_text(
            f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8"
        )

        found_path, found_prefix = r_cleanup.parse_file(cfg)
        assert found_path == expected_path
        assert found_prefix == "testvol-"
| |
| |
def do_test_remove_file(noop: bool) -> None:
    """Make sure the name prefix file will be removed... or not.

    Three scenarios are exercised in order within a scratch directory:
    removing a file that does not exist, trying to remove a file from
    a directory that is not writable, and removing the file for real.
    An unrelated file in the same directory must survive all of them.

    Args:
        noop: run the cleanup tool in no-op mode; the file must then be
            left in place and no error may be raised.
    """
    # Imported locally so that this helper does not depend on a change to
    # the module-level import block.
    import os  # pylint: disable=import-outside-toplevel

    with tempfile.TemporaryDirectory() as tempd_obj:
        tempd = pathlib.Path(tempd_obj)
        cfg = r_cleanup.Config.default()._replace(confdir=tempd, noop=noop)

        conffile = tempd / r_defs.FILENAME
        otherfile = tempd / "something.conf"

        otherfile.write_text("Can't touch this!\n", encoding="UTF-8")
        assert sorted(tempd.iterdir()) == [otherfile]

        # Make sure it does not complain if the file does not exist.
        r_cleanup.remove_file(cfg, conffile)
        assert sorted(tempd.iterdir()) == [otherfile]

        conffile.write_text("hello\n", encoding="UTF-8")
        assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])

        # Make sure it raises an error if it cannot remove the file.
        # A superuser bypasses directory permission checks, so dropping the
        # write bit would not make the removal fail; skip this step then.
        is_superuser = hasattr(os, "geteuid") and os.geteuid() == 0
        if not is_superuser:
            # 0o7577 clears the owner's write bit (0o0200) only.
            tempd.chmod((tempd.stat().st_mode & 0o7777) & 0o7577)
            try:
                if noop:
                    r_cleanup.remove_file(cfg, conffile)
                else:
                    with pytest.raises(OSError) as exc_info:
                        r_cleanup.remove_file(cfg, conffile)
                    # Checked after the "with" block: anything following
                    # the raising call inside it would never be executed.
                    assert exc_info.value.errno == errno.EACCES
            finally:
                # Always restore write access, otherwise neither the rest
                # of the test nor the directory cleanup can proceed.
                tempd.chmod((tempd.stat().st_mode & 0o7777) | 0o0200)
        assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])

        # Make sure it removes the file.
        r_cleanup.remove_file(cfg, conffile)
        if noop:
            assert sorted(tempd.iterdir()) == sorted([conffile, otherfile])
        else:
            assert sorted(tempd.iterdir()) == [otherfile]
| |
| |
def test_remove_file() -> None:
    """Run the removal scenarios with no-op mode off: the file must go away."""
    do_test_remove_file(noop=False)
| |
| |
def test_remove_file_noop() -> None:
    """Run the removal scenarios with no-op mode on: the file must stay."""
    do_test_remove_file(noop=True)
| |
| |
class MockVolumeSummary(NamedTuple):
    """Mock a volume or snapshot description.

    This is what MockApi.volumesList() and MockApi.snapshotsList() return
    for each object; only the name is provided.
    """

    # The name of the volume or snapshot.
    name: str
| |
| |
class MockApi:
    """Stand in for the StorPool API, keeping all the state in memory.

    Each public method bumps its own entry in `invoked`, so that a test
    can check which API calls were made and how many times.
    """

    attached: Dict[bool, List[str]]
    exported: Dict[str, List[Tuple[str, str]]]
    invoked: DefaultDict[str, int]
    targets: Dict[int, Dict[str, str]]
    vols: Dict[bool, List[str]]

    def __init__(
        self,
        attached: Dict[bool, List[str]],
        exported: Dict[str, List[Tuple[str, str]]],
        targets: Dict[int, Dict[str, str]],
        vols: Dict[bool, List[str]],
    ) -> None:
        """Copy the passed definitions so the caller's data is never modified."""
        self.invoked = collections.defaultdict(int)
        self.attached = {snap: list(names) for snap, names in attached.items()}
        self.exported = {init: list(exps) for init, exps in exported.items()}
        self.targets = {tid: dict(tgt) for tid, tgt in targets.items()}
        self.vols = {snap: list(names) for snap, names in vols.items()}

    # pylint: disable=invalid-name

    def _attachments(
        self, snapshot: bool, rights: str
    ) -> List[sptypes.AttachmentDesc]:
        """Describe the attached volumes or the attached snapshots."""
        return [
            sptypes.AttachmentDesc(
                volume=name, snapshot=snapshot, client=11, rights=rights, pos=0
            )
            for name in self.attached[snapshot]
        ]

    def _summaries(self, snapshot: bool) -> List[MockVolumeSummary]:
        """Describe the existing volumes or the existing snapshots."""
        return [MockVolumeSummary(name=name) for name in self.vols[snapshot]]

    def attachmentsList(self) -> List[sptypes.AttachmentDesc]:
        """List the attachments: read-write volumes first, then read-only snapshots."""
        self.invoked["attachmentsList"] += 1
        result = self._attachments(False, "rw")
        result.extend(self._attachments(True, "ro"))
        return result

    def iSCSIConfig(self) -> sptypes.iSCSIConfig:
        """Build an iSCSI configuration out of the stored exports and targets."""
        self.invoked["iSCSIConfig"] += 1

        # The exports are stored by volume name, the API wants the target name.
        name_by_volume = {
            tgt["volume"]: tgt["name"] for tgt in self.targets.values()
        }

        # The initiators are numbered in the sort order of their names.
        initiators = {}
        for idx, (initiator, exports) in enumerate(sorted(self.exported.items())):
            initiators[idx] = sptypes.iSCSIInitiator(
                name=initiator,
                username="",
                secret="",
                nets=[],
                exports=[
                    sptypes.iSCSIExport(
                        portalGroup=group, target=name_by_volume[volume]
                    )
                    for group, volume in exports
                ],
            )

        targets = {}
        for tid, tgt in self.targets.items():
            targets[tid] = sptypes.iSCSITarget(
                currentControllerId=65535,
                name=tgt["name"],
                volume=tgt["volume"],
            )

        data = sptypes.iSCSIConfigData(
            baseName="just-us-here",
            initiators=initiators,
            portalGroups={},
            targets=targets,
        )
        return sptypes.iSCSIConfig(iscsi=data)

    def iSCSIConfigChange(self, change: sptypes.iSCSIConfigChange) -> None:
        """Carry out the "delete export" and "remove target" commands.

        Any other kind of command is rejected with a ValueError.
        """
        self.invoked["iSCSIConfigChange"] += 1

        for cmd in change.commands:
            export_del = cmd.exportDelete
            target_del = cmd.deleteTarget

            if export_del is not None:
                # The export must be there for it to be deleted.
                exports = self.exported[export_del.initiator]
                entry = (export_del.portalGroup, export_del.volumeName)
                assert entry in exports
                exports.remove(entry)
                continue

            if target_del is None:
                raise ValueError(repr(cmd))

            # Exactly one target must expose the specified volume.
            matching = [
                tid
                for tid, tgt in self.targets.items()
                if tgt["volume"] == target_del.volumeName
            ]
            assert len(matching) == 1
            del self.targets[matching[0]]

    def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None:
        """Detach the specified volumes and snapshots from client 11."""
        self.invoked["volumesReassignWait"] += 1

        for item in req.reassign:
            if isinstance(item, sptypes.VolumeReassignDesc):
                is_snapshot, name = False, item.volume
            else:
                assert isinstance(item, sptypes.SnapshotReassignDesc)
                is_snapshot, name = True, item.snapshot

            # Only the objects with the test prefix may ever be touched.
            assert name.startswith("testvol-")
            self.attached[is_snapshot].remove(name)

            assert item.detach == [11]

    def volumesList(self) -> List[MockVolumeSummary]:
        """List the volumes that still exist."""
        self.invoked["volumesList"] += 1
        return self._summaries(False)

    def snapshotsList(self) -> List[MockVolumeSummary]:
        """List the snapshots that still exist."""
        self.invoked["snapshotsList"] += 1
        return self._summaries(True)

    def _volumeDelete(self, name: str, snapshot: bool) -> None:
        """Forget about a volume or a snapshot; it must not be attached."""
        assert name not in self.attached[snapshot]
        self.vols[snapshot].remove(name)

    def volumeDelete(self, name: str) -> None:
        """Forget about a volume."""
        self.invoked["volumeDelete"] += 1
        self._volumeDelete(name, snapshot=False)

    def snapshotDelete(self, name: str) -> None:
        """Forget about a snapshot."""
        self.invoked["snapshotDelete"] += 1
        self._volumeDelete(name, snapshot=True)
| |
| |
def test_remove_volumes() -> None:
    """Check that exactly the objects with the test prefix are cleaned up."""
    prefix = "testvol-"

    def survives(name: str) -> bool:
        """Tell whether an object with this name must be left alone."""
        return not name.startswith(prefix)

    cfg = r_cleanup.Config.default()
    api = MockApi(
        attached=T_ATTACHED, exported=T_EXPORTED, targets=T_TARGETS, vols=T_VOLS
    )

    r_cleanup.remove_volumes(cfg, api, prefix)

    # Whatever does not start with the prefix must still be there...
    expected_attached = {
        snap: list(filter(survives, names)) for snap, names in T_ATTACHED.items()
    }
    assert api.attached == expected_attached

    expected_exported = {
        initiator: [exp for exp in exports if survives(exp[1])]
        for initiator, exports in T_EXPORTED.items()
    }
    assert api.exported == expected_exported

    expected_targets = {
        tid: tgt for tid, tgt in T_TARGETS.items() if survives(tgt["volume"])
    }
    assert api.targets == expected_targets

    expected_vols = {
        snap: list(filter(survives, names)) for snap, names in T_VOLS.items()
    }
    assert api.vols == expected_vols

    # ...and there must be one "delete" call per removed volume or snapshot.
    deleted_volumes = sum(1 for name in T_VOLS[False] if name.startswith(prefix))
    deleted_snapshots = sum(1 for name in T_VOLS[True] if name.startswith(prefix))
    assert api.invoked == {
        "attachmentsList": 1,
        "iSCSIConfig": 1,
        "iSCSIConfigChange": 2,
        "volumesReassignWait": 1,
        "volumesList": 1,
        "snapshotsList": 1,
        "volumeDelete": deleted_volumes,
        "snapshotDelete": deleted_snapshots,
    }
| |
| |
def test_remove_volumes_noop() -> None:
    """Make sure that nothing is modified or removed in no-op mode."""
    cfg = r_cleanup.Config.default()._replace(noop=True)
    api = MockApi(
        attached=T_ATTACHED, exported=T_EXPORTED, targets=T_TARGETS, vols=T_VOLS
    )

    r_cleanup.remove_volumes(cfg, api, "testvol-")

    # The mock's state must be exactly what it started out with.
    assert api.attached == T_ATTACHED
    assert api.exported == T_EXPORTED
    assert api.targets == T_TARGETS
    assert api.vols == T_VOLS

    # Only the read-only queries may have been made, once each.
    queries = ("attachmentsList", "iSCSIConfig", "volumesList", "snapshotsList")
    assert api.invoked == dict.fromkeys(queries, 1)