blob: 931e9309b4ea998e9a670dcb769107ce0ed84f0b [file] [log] [blame]
# SPDX-FileCopyrightText: StorPool <support@storpool.com>
# SPDX-License-Identifier: BSD-2-Clause
"""Clone a Git repository."""
from __future__ import annotations
import subprocess # noqa: S404
from typing import TYPE_CHECKING
from . import defs
if TYPE_CHECKING:
import pathlib
from typing import Final
class GitError(defs.GApplyError):
    """Signal that running Git or interpreting its output has failed.

    Raised by `repo_clone()` and `list_change_ids()` in this module.
    """
def repo_clone(
    cfg: defs.Config,
    repo: defs.Repo,
    tempd: pathlib.Path,
    *,
    branch: str = "master",
) -> pathlib.Path:
    """Clone a Git repository into a new directory below `tempd`.

    The base URL is looked up in `cfg.repo_urls` using the upper-cased
    `repo.origin`, and `repo.repo` is appended to its path component.
    The clone is placed in `tempd / repo.origin / repo.repo`; that path must
    not exist beforehand.

    :param cfg: the runtime configuration (its `repo_urls` and `log` are used)
    :param repo: the repository to clone (its `origin` and `repo` are used)
    :param tempd: the directory below which the clone is created
    :param branch: the branch passed to `git clone -b`
    :returns: the path to the newly cloned repository
    :raises GitError: if the origin is unknown, the target path already exists,
        `git clone` cannot be run or fails, or it leaves no directory behind
    """
    loc: Final = cfg.repo_urls.get(repo.origin.upper())
    if loc is None:
        raise GitError(f"Unknown repository origin {repo.origin!r}")

    cfg.log.info(
        "Cloning the %(origin)s %(repo)s repository from %(loc)s",
        {"origin": repo.origin, "repo": repo.repo, "loc": loc.url.geturl()},
    )

    # Make sure there is exactly one slash between the base path and the repo name.
    separator: Final = "" if loc.url.path.endswith("/") else "/"
    repo_url: Final = loc.url._replace(path=loc.url.path + separator + repo.repo)
    clone_url: Final = repo_url.geturl()

    repo_dir: Final = tempd / repo.origin / repo.repo
    # is_symlink() also catches a dangling symlink, for which exists() is false.
    if repo_dir.exists() or repo_dir.is_symlink():
        raise GitError(f"Did not expect {repo_dir} to exist")
    repo_dir.parent.mkdir(mode=0o755, exist_ok=True, parents=True)

    cfg.log.debug(
        "About to clone %(repo_url)s into %(repo_dir)s",
        {"repo_url": clone_url, "repo_dir": repo_dir},
    )
    try:
        subprocess.run(
            ["git", "clone", clone_url, repo.repo, "-b", branch],  # noqa: S603,S607
            check=True,
            cwd=repo_dir.parent,
        )
    except (OSError, subprocess.CalledProcessError) as err:
        # Report the command and the directory exactly as they were used above.
        raise GitError(
            f"Could not run `git clone {clone_url} {repo.repo} -b {branch}` "
            f"in {repo_dir.parent}: {err}"
        ) from err

    if not repo_dir.is_dir():
        raise GitError(f"`git clone` did not create {repo_dir}")
    return repo_dir
def list_change_ids(cfg: defs.Config, repo_dir: pathlib.Path) -> list[str]:
    """Get the Change-Id fields of all the commits reachable from the current head.

    Runs `git log --reverse` in `repo_dir` with a format that only outputs
    the `Change-Id` trailers, skips the empty lines, and validates the rest.

    :param cfg: the runtime configuration (only its `log` is used)
    :param repo_dir: the directory to run `git log` in
    :returns: the change IDs in the order `git log --reverse` reported them
    :raises GitError: if `git log` cannot be run or fails, if its output is not
        valid UTF-8, or if a non-empty line is not of the form "Change-Id: Ixxx"
    """
    cfg.log.info("Getting the change IDs in %(repo_dir)s", {"repo_dir": repo_dir})

    # Only the subprocess call is guarded; parsing errors are raised separately below.
    try:
        output = subprocess.check_output(
            ["git", "log", "--pretty=%(trailers:key=Change-Id)", "--reverse"],  # noqa: S607
            cwd=repo_dir,
            encoding="UTF-8",
            shell=False,  # noqa: S603
        )
    except (OSError, subprocess.CalledProcessError) as err:
        raise GitError(f"Could not run `git log` for change IDs in {repo_dir}: {err}") from err
    except ValueError as err:
        # UnicodeDecodeError is a ValueError; raised when the output is not valid UTF-8.
        raise GitError(
            f"Could not decode the output of `git log` in {repo_dir} into UTF-8 change IDs: {err}"
        ) from err

    def parse_line(line: str) -> str:
        """Parse a "Change-Id: Ixxx" line and return the "Ixxx" part."""
        fields: Final = line.split()
        # The magic value will go away once we can use structural pattern matching
        if (
            len(fields) != 2  # noqa: PLR2004
            or fields[0] != "Change-Id:"
            or not fields[1].startswith("I")
        ):
            raise GitError(f"Unexpected `git log` output for change IDs: {line!r}")
        return fields[1]

    # Empty lines are skipped rather than treated as malformed output.
    return [parse_line(line) for line in output.splitlines() if line]