blob: 767762b251ce0d9b17d4a7b26aff4689bde13072 [file] [log] [blame]
"""Clean up: remove volumes, snapshots, and the file itself."""
from __future__ import annotations
import argparse
import errno
import itertools
import pathlib
from typing import NamedTuple, Tuple # noqa: H301
import confget
from storpool import spapi # type: ignore
from storpool import sptypes
from . import defs
class Config(NamedTuple):
"""Runtime configuration for the sp_rand_cleanup tool."""
confdir: pathlib.Path
noop: bool
skip_api: bool
@staticmethod
def default() -> Config:
"""Build a configuration object with default values."""
return Config(confdir=defs.DEFAULT_CONFDIR, noop=False, skip_api=False)
def parse_file(cfg: Config) -> Tuple[pathlib.Path, str]:
"""Parse the StorPool configuration file."""
conffile = cfg.confdir / defs.FILENAME
print(f"Parsing {conffile}")
data = confget.read_ini_file(confget.Config([], filename=str(conffile)))
assert isinstance(data, dict)
assert sorted(data) == [""]
assert sorted(data[""]) == [defs.PREFIX_VAR]
prefix = data[""][defs.PREFIX_VAR]
print(f"- got prefix {prefix!r}")
return conffile, prefix
def remove_iscsi_targets_and_exports(cfg: Config, api: spapi.Api, prefix: str) -> None:
"""Unexport volumes, remove the iSCSI target records."""
print("Querying the StorPool API for the iSCSI configuration")
iscsi = api.iSCSIConfig().iscsi
print("Looking for target records for our volumes")
targets = {
tgt.name: tgt.volume
for tgt in iscsi.targets.values()
if tgt.volume.startswith(prefix)
}
if targets:
print(f"Found {len(targets)} target records, looking for exports")
exports = list(
itertools.chain(
*(
[
(idata.name, exp.portalGroup, targets[exp.target])
for exp in idata.exports
if exp.target in targets
]
for idata in iscsi.initiators.values()
)
)
)
if exports:
to_remove = []
for exp_name, exp_pg, exp_vol in exports:
print(f"Will remove the export for {exp_vol} to {exp_name} in {exp_pg}")
to_remove.append(
sptypes.iSCSIConfigCommand(
exportDelete=sptypes.iSCSICommandExportDelete(
initiator=exp_name, portalGroup=exp_pg, volumeName=exp_vol
)
)
)
print(f"Prepared for removing {len(to_remove)} exports")
if not cfg.noop:
api.iSCSIConfigChange(sptypes.iSCSIConfigChange(commands=to_remove))
else:
print("No iSCSI exports to remove")
to_remove = []
for vol in sorted(targets.values()):
print(f"Will remove the target record for volume {vol}")
to_remove.append(
sptypes.iSCSIConfigCommand(
deleteTarget=sptypes.iSCSICommandDeleteTarget(volumeName=vol)
)
)
print(f"Prepared for removing {len(to_remove)} targets")
if not cfg.noop:
api.iSCSIConfigChange(sptypes.iSCSIConfigChange(commands=to_remove))
else:
print("No iSCSI targets to remove, not looking for exports at all")
def remove_attachments(cfg: Config, api: spapi.Api, prefix: str) -> None:
"""Detach StorPool volumes and snapshots from the test hosts."""
print("Querying the StorPool API for attached volumes and snapshots")
to_detach = []
for desc in api.attachmentsList(): # pylint: disable=not-callable
if not desc.volume.startswith(prefix):
continue
if desc.snapshot:
print(f"Will detach snapshot {desc.volume} from client {desc.client}")
to_detach.append(
sptypes.SnapshotReassignDesc(
detach=[desc.client], force=True, snapshot=desc.volume
)
)
else:
print(f"Will detach volume {desc.volume} from client {desc.client}")
to_detach.append(
sptypes.VolumeReassignDesc(
detach=[desc.client], force=True, volume=desc.volume
)
)
if to_detach:
print(f"Detaching {len(to_detach)} volumes/snapshots")
if not cfg.noop:
api.volumesReassignWait( # pylint: disable=not-callable
sptypes.VolumesReassignWaitDesc(reassign=to_detach)
)
def remove_volumes_and_snapshots(cfg: Config, api: spapi.Api, prefix: str) -> None:
"""Actually delete the StorPool volumes and snapshots."""
print("Querying the StorPool API for existing volumes")
for name in [vol.name for vol in api.volumesList()]:
if not name.startswith(prefix):
continue
print(f"Removing volume {name}")
if not cfg.noop:
api.volumeDelete(name)
print("Querying the StorPool API for existing snapshots")
for name in [snap.name for snap in api.snapshotsList()]:
if not name.startswith(prefix):
continue
print(f"Removing snapshot {name}")
if not cfg.noop:
api.snapshotDelete(name)
def remove_volumes(cfg: Config, api: spapi.Api, prefix: str) -> None:
"""Detach and remove any pertinent StorPool volumes and snapshots."""
remove_attachments(cfg, api, prefix)
remove_iscsi_targets_and_exports(cfg, api, prefix)
remove_volumes_and_snapshots(cfg, api, prefix)
def remove_file(cfg: Config, conffile: pathlib.Path) -> None:
"""Remove the StorPool configuration file."""
print(f"Removing {conffile}")
if not cfg.noop:
try:
conffile.unlink()
except OSError as err:
if err.errno != errno.ENOENT:
raise
def parse_args() -> Config:
"""Parse the command-line arguments."""
parser = argparse.ArgumentParser(prog="sp_rand_cleanup")
parser.add_argument(
"-d",
"--confdir",
type=pathlib.Path,
default=defs.DEFAULT_CONFDIR,
help="The directory to look for the configuration file in",
)
parser.add_argument(
"-N",
"--noop",
action="store_true",
help="No-operation mode; display what would have been done",
)
parser.add_argument(
"--skip-api",
type=str,
help=(
"If explicitly given the 'yes' value, do not "
"delete any volumes or snapshots"
),
)
args = parser.parse_args()
return Config(confdir=args.confdir, noop=args.noop, skip_api=args.skip_api == "yes")
def main() -> None:
"""Main program: parse command-line options, handle them."""
cfg = parse_args()
conffile, prefix = parse_file(cfg)
if not cfg.skip_api:
api = spapi.Api.fromConfig()
remove_volumes(cfg, api, prefix)
remove_file(cfg, conffile)
if __name__ == "__main__":
main()