# blob: 7c90d08c850ec65a0dc23a3aaa5d37c60794aaf7 [file] [log] [blame]
# Peter Pentchev d00519f 2021-11-08 01:17:13 +0200
"""Test the sp_rand_cleanup tool."""
2
3import collections
4import errno
5import pathlib
6import tempfile
7
8from typing import DefaultDict, Dict, List, NamedTuple # noqa: H301
9
10import pytest
11
12from storpool import sptypes # type: ignore
13
14from sp_rand import cleanup as r_cleanup
15from sp_rand import defs as r_defs
16
17
def test_parse_file() -> None:
    """Check that parse_file() finds the prefix file and reads the prefix from it.

    A single `<PREFIX_VAR>=testvol-` line is written to the expected file in
    a scratch configuration directory; the parser must hand back both the
    path of that file and the prefix value.
    """
    with tempfile.TemporaryDirectory() as scratch:
        confdir = pathlib.Path(scratch)
        cfg = r_cleanup.Config.default()._replace(confdir=confdir)

        expected_path = confdir / r_defs.FILENAME
        expected_path.write_text(
            f"{r_defs.PREFIX_VAR}=testvol-\n", encoding="UTF-8"
        )

        found_path, found_prefix = r_cleanup.parse_file(cfg)
        assert found_path == expected_path
        assert found_prefix == "testvol-"
29
30
def do_test_remove_file(noop: bool) -> None:
    """Exercise remove_file() in a scratch directory, with or without no-op mode.

    Three situations are walked through in turn: the file is missing, the
    file exists in a directory that has lost its owner write bit, and the
    file exists in a writable directory.  With `noop` set, the file must
    survive every call; otherwise the middle call must fail with EACCES and
    the last one must delete the file.  An unrelated file in the same
    directory must be left alone throughout.
    """
    with tempfile.TemporaryDirectory() as scratch:
        workdir = pathlib.Path(scratch)
        cfg = r_cleanup.Config.default()._replace(confdir=workdir, noop=noop)

        target = workdir / r_defs.FILENAME
        bystander = workdir / "something.conf"
        both = sorted([target, bystander])

        def listing() -> List[pathlib.Path]:
            """Return the sorted contents of the scratch directory."""
            return sorted(workdir.iterdir())

        bystander.write_text("Can't touch this!\n", encoding="UTF-8")
        assert listing() == [bystander]

        # A file that is not there must be accepted without complaint.
        r_cleanup.remove_file(cfg, target)
        assert listing() == [bystander]

        target.write_text("hello\n", encoding="UTF-8")
        assert listing() == both

        # Clear the owner write bit on the directory so that removing an
        # entry from it is refused.
        # NOTE(review): presumably the tests never run as a user that
        # bypasses directory permissions (e.g. root) - confirm.
        workdir.chmod(workdir.stat().st_mode & 0o7577)
        try:
            if not noop:
                with pytest.raises(OSError) as exc_info:
                    r_cleanup.remove_file(cfg, target)
                assert exc_info.value.errno == errno.EACCES
            else:
                r_cleanup.remove_file(cfg, target)
        finally:
            # Give the write bit back even on failure, otherwise the
            # temporary directory could not be cleaned up afterwards.
            workdir.chmod((workdir.stat().st_mode & 0o7777) | 0o0200)
        assert listing() == both

        # With the directory writable again, only no-op mode keeps the file.
        r_cleanup.remove_file(cfg, target)
        expected = both if noop else [bystander]
        assert listing() == expected
69
70
def test_remove_file() -> None:
    """Run the remove_file() scenario with no-op mode switched off."""
    do_test_remove_file(noop=False)
74
75
def test_remove_file_noop() -> None:
    """Run the remove_file() scenario with no-op mode on: the file must stay."""
    do_test_remove_file(noop=True)
79
80
class MockVolumeSummary(NamedTuple):
    """Stand-in for a volume or snapshot summary record; it only carries a name."""

    # The name of the volume or snapshot being described.
    name: str
85
86
class MockApi:
    """An in-memory stand-in for the StorPool API client used by the tests.

    Both `attached` and `vols` map a "this is a snapshot" flag (False for
    volumes, True for snapshots) to a list of names.  Every public method
    bumps its own counter in `invoked`, so that a test can check exactly
    which API calls were made and how many times.
    """

    attached: Dict[bool, List[str]]
    invoked: DefaultDict[str, int]
    vols: Dict[bool, List[str]]

    def __init__(
        self, attached: Dict[bool, List[str]], vols: Dict[bool, List[str]]
    ) -> None:
        """Take private copies of the definitions and reset the call counters."""
        # Copy each list so that the caller's data stays intact for the
        # comparisons made after the code under test has run.
        self.attached = {is_snap: [*names] for is_snap, names in attached.items()}
        self.vols = {is_snap: [*names] for is_snap, names in vols.items()}
        self.invoked = collections.defaultdict(int)

    # pylint: disable=invalid-name

    def attachmentsList(self) -> List[sptypes.AttachmentDesc]:
        """Describe the attachments: volumes first (rw), then snapshots (ro)."""
        self.invoked["attachmentsList"] += 1
        return [
            sptypes.AttachmentDesc(
                volume=name, snapshot=is_snap, client=11, rights=rights, pos=0
            )
            for is_snap, rights in ((False, "rw"), (True, "ro"))
            for name in self.attached[is_snap]
        ]

    def volumesReassignWait(self, req: sptypes.VolumesReassignWaitDesc) -> None:
        """Drop each volume or snapshot named in the request from `attached`.

        Every item must refer to a "testvol-" name and must ask for a
        detach from exactly client 11, the client that attachmentsList()
        reports.
        """
        self.invoked["volumesReassignWait"] += 1

        for item in req.reassign:
            if isinstance(item, sptypes.VolumeReassignDesc):
                name, is_snap = item.volume, False
            else:
                assert isinstance(item, sptypes.SnapshotReassignDesc)
                name, is_snap = item.snapshot, True

            assert name.startswith("testvol-")
            self.attached[is_snap].remove(name)

            assert item.detach == [11]

    def volumesList(self) -> List[MockVolumeSummary]:
        """Describe the volumes that still exist."""
        self.invoked["volumesList"] += 1
        return [MockVolumeSummary(name=name) for name in self.vols[False]]

    def snapshotsList(self) -> List[MockVolumeSummary]:
        """Describe the snapshots that still exist."""
        self.invoked["snapshotsList"] += 1
        return [MockVolumeSummary(name=name) for name in self.vols[True]]

    def _volumeDelete(self, name: str, snapshot: bool) -> None:
        """Forget a volume or snapshot; it must have been detached first."""
        assert name not in self.attached[snapshot]
        self.vols[snapshot].remove(name)

    def volumeDelete(self, name: str) -> None:
        """Forget the volume with the specified name."""
        self.invoked["volumeDelete"] += 1
        self._volumeDelete(name, snapshot=False)

    def snapshotDelete(self, name: str) -> None:
        """Forget the snapshot with the specified name."""
        self.invoked["snapshotDelete"] += 1
        self._volumeDelete(name, snapshot=True)
158
159
def test_remove_volumes() -> None:
    """Check that remove_volumes() detaches and deletes what matches the prefix.

    The mock API starts out with a mix of names; afterwards only the ones
    that do not start with "testvol-" may be left, both in the attachment
    lists and in the volume/snapshot lists, and each API method must have
    been invoked the expected number of times.
    """
    prefix = "testvol-"

    def unrelated(names: List[str]) -> List[str]:
        """Keep only the names that do not start with the prefix."""
        return [name for name in names if not name.startswith(prefix)]

    def doomed(names: List[str]) -> int:
        """Count the names that do start with the prefix."""
        return sum(1 for name in names if name.startswith(prefix))

    cfg = r_cleanup.Config.default()
    attached = {
        False: ["something-else", "testvol-42"],
        True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
    }
    vols = {
        False: ["something-else", "testvol-42", "testvol-451"],
        True: [
            "another.snapshot",
            "testvol-616",
            "testvol-9000",
            "some-testvol-12",
            "yet.another.thing",
        ],
    }
    mock_api = MockApi(attached=attached, vols=vols)

    r_cleanup.remove_volumes(cfg, mock_api, prefix)

    # "some-testvol-12" merely contains the prefix, so it has to survive.
    assert mock_api.attached == {
        is_snap: unrelated(names) for is_snap, names in attached.items()
    }
    assert mock_api.vols == {
        is_snap: unrelated(names) for is_snap, names in vols.items()
    }

    # One call for each listing and for the detach, one delete per match.
    assert mock_api.invoked == {
        "attachmentsList": 1,
        "volumesReassignWait": 1,
        "volumesList": 1,
        "snapshotsList": 1,
        "volumeDelete": doomed(vols[False]),
        "snapshotDelete": doomed(vols[True]),
    }
202
203
def test_remove_volumes_noop() -> None:
    """Make sure the StorPool volumes and snapshots will not be removed.

    In no-op mode remove_volumes() may only query the API: the attachment
    and volume/snapshot lists of the mock must be left exactly as they
    were, and no detach or delete call may be recorded.
    """
    cfg = r_cleanup.Config.default()._replace(noop=True)
    attached = {
        False: ["something-else", "testvol-42"],
        True: ["testvol-9000", "another.snapshot", "testvol-616", "some-testvol-12"],
    }
    vols = {
        False: ["something-else", "testvol-42", "testvol-451"],
        True: [
            "another.snapshot",
            "testvol-616",
            "testvol-9000",
            "some-testvol-12",
            "yet.another.thing",
        ],
    }
    mock_api = MockApi(attached=attached, vols=vols)

    r_cleanup.remove_volumes(cfg, mock_api, "testvol-")

    # Nothing may have been detached or deleted, matching names included.
    assert mock_api.attached == attached
    assert mock_api.vols == vols

    # Only the three listing calls are expected; the absence of the
    # "volumesReassignWait", "volumeDelete", and "snapshotDelete" keys shows
    # that no modifying call was made.
    assert mock_api.invoked == {
        "attachmentsList": 1,
        "volumesList": 1,
        "snapshotsList": 1,
    }
232 }