| # SPDX-FileCopyrightText: StorPool <support@storpool.com> |
| # SPDX-License-Identifier: BSD-2-Clause |
| """Clone a Git repository.""" |
| |
| from __future__ import annotations |
| |
| import subprocess |
| |
| from typing import TYPE_CHECKING |
| |
| from . import defs |
| |
| if TYPE_CHECKING: |
| import pathlib |
| |
| from typing import Final |
| |
| |
class GitError(defs.GApplyError):
    """A Git-related operation could not be completed."""
| |
| |
def repo_clone(cfg: defs.Config, repo: defs.Repo, tempd: pathlib.Path) -> pathlib.Path:
    """Clone the `master` branch of a Git repository into a fresh directory.

    The base URL is looked up in `cfg.repo_urls` using the upper-cased `repo.origin`,
    `repo.repo` is appended to the path of that URL, and the result is cloned into
    `tempd / repo.origin / repo.repo`.

    Return the path to the directory that `git clone` created.

    Raise `GitError` if the origin is not configured, if the destination already
    exists, if `git clone` could not be started or exited with an error, or if it
    did not create the expected directory.
    """
    loc: Final = cfg.repo_urls.get(repo.origin.upper())
    if loc is None:
        raise GitError(f"Unknown repository origin {repo.origin!r}")
    cfg.log.info(
        "Cloning the %(origin)s %(repo)s repository from %(loc)s",
        {"origin": repo.origin, "repo": repo.repo, "loc": loc.url.geturl()},
    )

    # Make sure there is exactly one slash between the base path and the repository name.
    separator: Final = "" if loc.url.path.endswith("/") else "/"
    repo_url: Final = loc.url._replace(path=loc.url.path + separator + repo.repo)

    repo_dir: Final = tempd / repo.origin / repo.repo
    # A dangling symlink does not "exist", so check for it separately.
    if repo_dir.exists() or repo_dir.is_symlink():
        raise GitError(f"Did not expect {repo_dir} to exist")
    clone_cwd: Final = repo_dir.parent
    clone_cwd.mkdir(mode=0o755, exist_ok=True, parents=True)

    cfg.log.debug(
        "About to clone %(repo_url)s into %(repo_dir)s",
        {"repo_url": repo_url.geturl(), "repo_dir": repo_dir},
    )
    # Options go first and are terminated by "--" so that neither the URL nor
    # the directory name can be mistaken for an option.
    # The destination is the last path component of `repo_dir`, relative to its
    # parent, so that the clone ends up exactly at `repo_dir` even if
    # the repository name contains slashes.
    cmd: Final = ["git", "clone", "-b", "master", "--", repo_url.geturl(), repo_dir.name]
    try:
        subprocess.run(cmd, check=True, cwd=clone_cwd)
    except (OSError, subprocess.CalledProcessError) as err:
        cmd_str: Final = " ".join(cmd)
        raise GitError(f"Could not run `{cmd_str}` in {clone_cwd}: {err}") from err
    if not repo_dir.is_dir():
        raise GitError(f"`git clone` did not create {repo_dir}")

    return repo_dir
| |
| |
def _parse_change_id_line(line: str) -> str:
    """Extract the change ID from a single "Change-Id: Ixxx" line.

    Raise `GitError` unless the line consists of exactly two whitespace-separated
    fields: the literal "Change-Id:" key and a value that starts with "I".
    """
    fields: Final = line.split()
    # The magic value will go away once we can use structural pattern matching
    if (
        len(fields) != 2  # noqa: PLR2004  # pylint: disable=magic-value-comparison
        or fields[0] != "Change-Id:"  # pylint: disable=magic-value-comparison
        or not fields[1].startswith("I")
    ):
        raise GitError(f"Unexpected `git log` output for change IDs: {line!r}")
    return fields[1]


def list_change_ids(cfg: defs.Config, repo_dir: pathlib.Path) -> list[str]:
    """Get the Change-Id fields of all the commits reachable from the current head.

    Run `git log --reverse` in `repo_dir`, asking it to output only the `Change-Id`
    trailers, and return the change IDs in the order in which Git reported them.
    Empty output lines (presumably commits with no such trailer) are skipped.

    Raise `GitError` if `git log` could not be started or exited with an error,
    if its output could not be decoded as UTF-8, or if a non-empty output line
    is not a well-formed "Change-Id: Ixxx" line.
    """
    cfg.log.info("Getting the change IDs in %(repo_dir)s", {"repo_dir": repo_dir})
    # Only the invocation of Git itself is guarded; parsing errors are reported below.
    try:
        output: Final = subprocess.check_output(
            ["git", "log", "--pretty=%(trailers:key=Change-Id)", "--reverse"],
            cwd=repo_dir,
            encoding="UTF-8",
            shell=False,
        )
    except (OSError, subprocess.CalledProcessError) as err:
        raise GitError(f"Could not run `git log` for change IDs in {repo_dir}: {err}") from err
    except ValueError as err:
        # UnicodeDecodeError is a subclass of ValueError
        raise GitError(
            f"Could not decode the output of `git log` in {repo_dir} into UTF-8 change IDs: {err}"
        ) from err

    return [_parse_change_id_line(line) for line in output.splitlines() if line]