| """Test the sp_rand_cleanup tool.""" |
| |
| import collections |
| import errno |
| import pathlib |
| import tempfile |
| |
| from typing import DefaultDict, Dict, List, NamedTuple # noqa: H301 |
| |
| import pytest |
| |
| from storpool import sptypes # type: ignore |
| |
| from sp_rand import cleanup as r_cleanup |
| from sp_rand import defs as r_defs |
| |
| |
| def test_parse_file() -> None: |
| """Make sure the name prefix file will be parsed correctly.""" |
| with tempfile.TemporaryDirectory() as tempd_obj: |
| tempd = pathlib.Path(tempd_obj) |
| cfg = r_cleanup.Config.default()._replace(confdir=tempd) |
| |
| conffile = tempd / r_defs.FILENAME |
| conffile.write_text(f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8") |
| |
| parsed_conffile, prefix = r_cleanup.parse_file(cfg) |
| assert (parsed_conffile, prefix) == (conffile, "testvol-") |
| |
| |
| def do_test_remove_file(noop: bool) -> None: |
| """Make sure the name prefix file will be removed... or not.""" |
| with tempfile.TemporaryDirectory() as tempd_obj: |
| tempd = pathlib.Path(tempd_obj) |
| cfg = r_cleanup.Config.default()._replace(confdir=tempd, noop=noop) |
| |
| conffile = tempd / r_defs.FILENAME |
| otherfile = tempd / "something.conf" |
| |
| otherfile.write_text("Can't touch this!\n", encoding="UTF-8") |
| assert sorted(tempd.iterdir()) == [otherfile] |
| |
| # Make sure it does not complain if the file does not exist. |
| r_cleanup.remove_file(cfg, conffile) |
| assert sorted(tempd.iterdir()) == [otherfile] |
| |
| conffile.write_text("hello\n", encoding="UTF-8") |
| assert sorted(tempd.iterdir()) == sorted([conffile, otherfile]) |
| |
| # Make sure it raises an error if it cannot remove the file. |
| tempd.chmod((tempd.stat().st_mode & 0o7777) & 0o7577) |
| try: |
| if noop: |
| r_cleanup.remove_file(cfg, conffile) |
| else: |
| with pytest.raises(OSError) as exc_info: |
| r_cleanup.remove_file(cfg, conffile) |
| assert exc_info.value.errno == errno.EACCES |
| finally: |
| tempd.chmod((tempd.stat().st_mode & 0o7777) | 0o0200) |
| assert sorted(tempd.iterdir()) == sorted([conffile, otherfile]) |
| |
| # Make sure it removes the file. |
| r_cleanup.remove_file(cfg, conffile) |
| if noop: |
| assert sorted(tempd.iterdir()) == sorted([conffile, otherfile]) |
| else: |
| assert sorted(tempd.iterdir()) == [otherfile] |
| |
| |
| def test_remove_file() -> None: |
| """Make sure the name prefix file will be removed.""" |
| do_test_remove_file(False) |
| |
| |
| def test_remove_file_noop() -> None: |
| """Make sure the name prefix file will not be removed.""" |
| do_test_remove_file(True) |
| |
| |
| class MockVolumeSummary(NamedTuple): |
| """Mock a volume or snapshot description.""" |
| |
| name: str |
| |
| |
| class MockApi: |
| """Mock the StorPool API.""" |
| |
| attached: Dict[bool, List[str]] |
| invoked: DefaultDict[str, int] |
| vols: Dict[bool, List[str]] |
| |
| def __init__( |
| self, attached: Dict[bool, List[str]], vols: Dict[bool, List[str]] |
| ) -> None: |
| """Store deep copies of the passed definitions.""" |
| self.attached = {key: list(value) for key, value in attached.items()} |
| self.vols = {key: list(value) for key, value in vols.items()} |
| self.invoked = collections.defaultdict(lambda: 0) |
| |
| # pylint: disable=invalid-name |
| |
| def attachmentsList(self) -> List[sptypes.AttachmentDesc]: |
| """Return the current view of what is attached.""" |
| self.invoked["attachmentsList"] += 1 |
| return [ |
| sptypes.AttachmentDesc( |
| volume=name, snapshot=False, client=11, rights="rw", pos=0 |
| ) |
| for name in self.attached[False] |
| ] + [ |
| sptypes.AttachmentDesc( |
| volume=name, snapshot=True, client=11, rights="ro", pos=0 |
| ) |
| for name in self.attached[True] |
| ] |
| |
| def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None: |
| """Update the current view of what is attached.""" |
| self.invoked["volumesReassignWait"] += 1 |
| |
| for item in req.reassign: |
| if isinstance(item, sptypes.VolumeReassignDesc): |
| assert item.volume.startswith("testvol-") |
| self.attached[False].remove(item.volume) |
| else: |
| assert isinstance(item, sptypes.SnapshotReassignDesc) |
| assert item.snapshot.startswith("testvol-") |
| self.attached[True].remove(item.snapshot) |
| |
| assert item.detach == [11] |
| |
| def volumesList(self) -> List[MockVolumeSummary]: |
| """Return the current view of the available volumes.""" |
| self.invoked["volumesList"] += 1 |
| return [MockVolumeSummary(name=name) for name in self.vols[False]] |
| |
| def snapshotsList(self) -> List[MockVolumeSummary]: |
| """Return the current view of the available snapshots.""" |
| self.invoked["snapshotsList"] += 1 |
| return [MockVolumeSummary(name=name) for name in self.vols[True]] |
| |
| def _volumeDelete(self, name: str, snapshot: bool) -> None: |
| """Remove a snapshot or a volume from our view.""" |
| assert name not in self.attached[snapshot] |
| self.vols[snapshot].remove(name) |
| |
| def volumeDelete(self, name: str) -> None: |
| """Remove a volume from our view.""" |
| self.invoked["volumeDelete"] += 1 |
| self._volumeDelete(name, False) |
| |
| def snapshotDelete(self, name: str) -> None: |
| """Remove a snapshot from our view.""" |
| self.invoked["snapshotDelete"] += 1 |
| self._volumeDelete(name, True) |
| |
| |
| def test_remove_volumes() -> None: |
| """Make sure the StorPool volumes and snapshots will be removed.""" |
| cfg = r_cleanup.Config.default() |
| attached = { |
| False: ["something-else", "testvol-42"], |
| True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"], |
| } |
| vols = { |
| False: ["something-else", "testvol-42", "testvol-451"], |
| True: [ |
| "another.snapshot", |
| "testvol-616", |
| "testvol-9000", |
| "some-testvol-12", |
| "yet.another.thing", |
| ], |
| } |
| mock_api = MockApi(attached=attached, vols=vols) |
| |
| r_cleanup.remove_volumes(cfg, mock_api, "testvol-") |
| |
| assert mock_api.attached == { |
| key: [name for name in value if not name.startswith("testvol-")] |
| for key, value in attached.items() |
| } |
| assert mock_api.vols == { |
| key: [name for name in value if not name.startswith("testvol-")] |
| for key, value in vols.items() |
| } |
| |
| assert mock_api.invoked == { |
| "attachmentsList": 1, |
| "volumesReassignWait": 1, |
| "volumesList": 1, |
| "snapshotsList": 1, |
| "volumeDelete": len( |
| [name for name in vols[False] if name.startswith("testvol-")] |
| ), |
| "snapshotDelete": len( |
| [name for name in vols[True] if name.startswith("testvol-")] |
| ), |
| } |
| |
| |
| def test_remove_volumes_noop() -> None: |
| """Make sure the StorPool volumes and snapshots will be removed.""" |
| cfg = r_cleanup.Config.default()._replace(noop=True) |
| attached = { |
| False: ["something-else", "testvol-42"], |
| True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"], |
| } |
| vols = { |
| False: ["something-else", "testvol-42", "testvol-451"], |
| True: [ |
| "another.snapshot", |
| "testvol-616", |
| "testvol-9000", |
| "some-testvol-12", |
| "yet.another.thing", |
| ], |
| } |
| mock_api = MockApi(attached=attached, vols=vols) |
| |
| r_cleanup.remove_volumes(cfg, mock_api, "testvol-") |
| |
| assert mock_api.attached == attached |
| assert mock_api.vols == vols |
| |
| assert mock_api.invoked == { |
| "attachmentsList": 1, |
| "volumesList": 1, |
| "snapshotsList": 1, |
| } |