blob: ede0fd77da9e8337fdb76a8d3858f22258a86dce [file] [log] [blame]
# Copyright 2016 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from urllib.parse import urlparse
import collections
import datetime
import hashlib
import re
from recipe_engine import recipe_api
class GitApi(recipe_api.RecipeApi):
"""GitApi provides support for Git."""
LOCAL_OPERATION_TIMEOUT = datetime.timedelta(minutes=1)
REMOTE_OPERATION_TIMEOUT = datetime.timedelta(minutes=10)
class RebaseError(recipe_api.StepFailure):
pass
_GIT_HASH_RE = re.compile("[0-9a-fA-F]{40}")
def __init__(self, props, *args, **kwargs):
super(GitApi, self).__init__(*args, **kwargs)
self._trace = props.trace
def __call__(self, step_name, *args, **kwargs):
"""Return a git command step.
Args:
step_name (str): Name of the step.
"""
trace = kwargs.pop("trace", self._trace)
env = {}
logs = []
if trace:
variables = [
"GIT_TRACE",
"GIT_TRACE2_EVENT",
"GIT_DAPPER_TRACE",
"GIT_TRACE_CURL",
"GIT_TRACE_CURL_NO_DATA",
"GIT_TRACE_PACKET",
"GIT_BACKENDINFO",
]
trace_dir = self.m.path.mkdtemp()
env[
"GIT_SSH_COMMAND"
] = "ssh -o SendEnv=GIT_DAPPER_TRACE -o SendEnv=GIT_BACKENDINFO"
for var in variables:
log = trace_dir.join(f"{var}.log")
env[var] = log
logs.append(log)
cmd = ["git"]
for k, v in sorted(kwargs.pop("config", {}).items()):
cmd.extend(["-c", f"{k}={v}"])
cmd += list(args)
# Default timeout for generic git operations.
kwargs.setdefault("timeout", datetime.timedelta(minutes=10))
with self.m.context(env=env):
if not trace:
return self.m.step(step_name, cmd, **kwargs)
with self.m.step.nest(step_name):
try:
return self.m.step(step_name, cmd, **kwargs)
finally:
self.m.path.mock_add_file(logs[0])
for log in logs:
if self.m.path.isfile(log):
self.m.file.read_text(self.m.path.basename(log), log)
def apply(self, patch, **kwargs):
"""Apply the specified git patch file.
Args:
patch (str): Path to git patch file.
"""
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self("apply patch", "apply", patch, **kwargs)
def branch(self, contains=None, remotes=False, step_name="git branch"):
"""Run git branch.
Args:
contains (str): Return local branches which contain this ref.
remotes (bool): Return only remote branches.
"""
args = ["branch"]
if remotes:
args.append("--remotes")
if contains:
args += ["--contains", contains]
timeout = (
self.REMOTE_OPERATION_TIMEOUT if remotes else self.LOCAL_OPERATION_TIMEOUT
)
step = self(
step_name,
*args,
timeout=timeout,
trace=False,
stdout=self.m.raw_io.output_text(),
)
step.presentation.logs["stdout"] = step.stdout
lines = step.stdout.strip().split("\n")
branches = []
for line in lines:
branch = line.strip()
if remotes:
prefix = "origin/"
if branch.startswith(prefix):
branch = branch[len(prefix) :]
branches.append(branch)
return branches
def init(self, bare=False, **kwargs):
"""Create an empty git repository or re-initialize an existing one.
Args:
bare (bool): Whether to create a bare repository.
"""
args = ["init"]
if bare:
args.append("--bare")
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self("git init", *args, **kwargs)
def config(self, *args, **kwargs):
"""Set a config.
Args:
args (list of str): Command-line arguments to 'git config'.
"""
step_name = kwargs.pop("step_name", "config")
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(step_name, "config", *args, **kwargs)
def config_remove_section(self, section, step_name="remove section", **kwargs):
"""Remove section from config.
Args:
step_name (str): Name of the step.
section (str): Section to remove.
"""
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(step_name, "config", "--remove-section", section, **kwargs)
def config_replace_all(
self,
name,
value,
value_regex=None,
step_name="git config replace all",
**kwargs,
):
"""Replace all matching config lines.
Args:
name (str): Name of config to replace.
value (str): Value to replace config with.
value_regex (str): Regex for multiline replacement.
"""
args = ["config", "--replace-all", name, value]
if value_regex:
args.append(value_regex)
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(step_name, *args, **kwargs)
def fetch(
self,
repository=None,
refspec=None,
all_remotes=False,
prune=False,
tags=False,
recurse_submodules=None,
depth=None,
filters=False,
step_name="git fetch",
**kwargs,
):
"""Fetch a ref.
Args:
repository (str): Repository to fetch. Must be set if `all_remotes` is
not set.
refspec (str): Refspec to fetch.
all_remotes (bool): Whether to fetch all remotes. Must be set if
`repository` is not set.
prune (bool): Whether to prune stale refs.
tags (bool): Whether to fetch tags.
recurse_submodules (bool|str|None): Whether to recurse submodules.
Default in Git is "on-demand" unless git config is different.
None means pass no options into git to use the default.
depth (int or None): History fetch depth. If None, do a full fetch.
filters (bool or str or seq[str]): Use git's --filter option. If
True, uses "blob:none".
step_name (str): Passed through to __call__().
"""
assert (repository and not all_remotes) or (
all_remotes and not repository
), "only one of `repository` or `all_remotes` can be set"
args = ["fetch"]
if all_remotes:
args.append("--all")
if prune:
args.append("--prune")
if tags:
args.append("--tags")
args.extend(("--jobs", str(min(16, max(2, self.m.platform.cpu_count // 2)))))
if repository:
args.append(repository)
if refspec:
args.append(refspec)
if filters:
if filters is True:
args.append("--filter=blob:none")
else:
if isinstance(filters, str):
filters = (filters,) # pragma: no cover
args.extend(f"--filter={x}" for x in filters)
# The default value for recurse-submodules is usually "on-demand"
# (unless set otherwise in git config). If not specified by the caller
# don't pass any --recurse-submodules argument.
assert recurse_submodules in (True, False, None, "on-demand")
if recurse_submodules == True:
args.append("--recurse-submodules")
elif recurse_submodules == False:
args.append("--no-recurse-submodules")
elif recurse_submodules == "on-demand":
args.append("--recurse-submodules=on-demand") # pragma: no cover
if depth:
args.extend(["--depth", depth])
# Fetch can be slow, and we want to allow time for retries by git-retry.
kwargs.setdefault("timeout", datetime.timedelta(minutes=20))
return self(step_name, *args, **kwargs)
def remote_add(self, remote, url, **kwargs):
"""Add a remote.
Args:
remote (str): Remote to add.
url (str): URL of the remote.
"""
return self("git remote", "remote", "add", remote, url, **kwargs)
def rm(self, pathspec, recursive=False, step_name="git rm", **kwargs):
"""
Remove files matching pathspec from the index.
Args:
pathspec (str): Pathspec to remove.
recursive (bool): Allow recursive removal.
"""
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
args = ["rm"]
if recursive:
args.append("-r")
args.append(pathspec)
return self(step_name, *args, **kwargs)
def rev_parse(self, ref, step_name="git rev-parse", **kwargs):
"""Parse a rev into a revision.
Args:
ref (str): Git ref to parse into revision.
"""
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(
step_name, "rev-parse", ref, stdout=self.m.raw_io.output_text(), **kwargs
).stdout.strip()
def rev_list_count(
self, rev1, rev2=None, step_name="git rev-list --count", test_data=None
):
"""Return the number of revisions between rev1 and rev2.
Args:
rev1 (str): The first git revision.
rev2 (str): The second git revision.
"""
if rev2 is None:
rev_string = rev1
else:
rev_string = f"{rev1}..{rev2}"
num_revisions = self(
step_name,
"rev-list",
"--count",
rev_string,
timeout=self.LOCAL_OPERATION_TIMEOUT,
stdout=self.m.raw_io.output_text(),
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(test_data),
).stdout.strip()
return int(num_revisions)
def hash_object(self, hash_input, step_name="git hash-object", step_test_data=None):
"""Compute object ID.
Args:
step_name (str): Name of the step.
hash_input (str): Hash input.
"""
if not step_test_data:
mock_hash = hashlib.sha1(hash_input.encode()).hexdigest()
step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
mock_hash
)
return self(
step_name,
"hash-object",
self.m.raw_io.input(hash_input),
timeout=self.LOCAL_OPERATION_TIMEOUT,
stdout=self.m.raw_io.output_text(),
step_test_data=step_test_data,
)
def remote_alias(self, remote):
"""Convert a git remote URL to a short distinctive string.
The result is based on the prefix of the remote's hostname as well as
the path.
E.g. "https://fuchsia.googlesource.com/foo/bar.git" becomes
"fuchsia-foo-bar".
"""
url = urlparse(remote)
path = url.path
if path.endswith(".git"): # https://host/foobar.git
path = path[: -len(".git")]
path = path.strip("/")
host = url.netloc.split(".")[0]
return host + "-" + path.replace("/", "-")
def clean(
self,
step_name="git clean",
force=False,
recursive=False,
ignore_rules=False,
**kwargs,
):
"""Remove untracked files from working tree.
Args:
force (bool): Clean with force.
recursive (bool): Recurse into directories.
ignore_rules (bool): -x option: use ignore rules from -e.
"""
args = ["clean"]
if force:
args.append("-f")
if recursive:
args.append("-d")
if ignore_rules:
args.append("-x")
return self(step_name, *args, **kwargs)
def raw_checkout(
self, step_name="git checkout", ref=None, force=False, branch=None, **kwargs
):
"""Run a git checkout.
Args:
step_name (str): Name of the step.
ref (str or None): Ref to checkout.
force (bool): Checkout with force.
branch (str): Checkout new branch with this name.
"""
args = []
args.append("checkout")
if force:
args.append("-f")
if branch:
args += ["-b", branch]
if ref:
args.append(ref)
return self(step_name, *args, **kwargs)
def sync_submodule(self, step_name="git submodule sync", recursive=False, **kwargs):
"""Sync submodule.
Args:
step_name (str): Name of the step.
recursive (bool): Recursively sync submodules.
"""
args = ["submodule", "sync"]
if recursive:
args.append("--recursive")
return self(step_name, *args, **kwargs)
def update_submodule(
self,
step_name="git submodule update",
recursive=False,
force=False,
init=True,
paths=None,
remote=False,
filters=False,
fetch=True,
**kwargs,
):
"""Update submodule.
Args:
step_name (str): Name of the step.
recursive (bool): Recursively update submodule.
force (bool): Force-update submodule.
init (bool): Initialize uninitialized submodules.
paths (str): Path(s) to submodule(s).
remote (bool): Update to remote tracking branch.
filters (bool or str or seq[str]): Use git's --filter option. If
True, uses "blob:none".
fetch (bool): Whether to fetch when doing the submodule update.
"""
args = ["submodule", "update"]
if init:
args.append("--init")
if recursive:
args.append("--recursive")
if force:
args.append("--force")
if remote:
args.append("--remote")
if not fetch:
args.append("--no-fetch")
args.extend(("--jobs", str(min(16, max(2, self.m.platform.cpu_count // 2)))))
if filters:
if filters is True:
args.append("--filter=blob:none")
else:
if isinstance(filters, str):
filters = (filters,) # pragma: no cover
args.extend(f"--filter={x}" for x in filters)
if paths:
args.extend(paths)
return self(step_name, *args, **kwargs)
def commit(
self,
message=None,
no_edit=False,
step_name="git commit",
files=(),
amend=False,
allow_empty=False,
all_tracked=False,
all_files=False,
author_override=None,
**kwargs,
):
"""Run git commit in the current working directory.
Args:
message (str): The message to attach to the commit. Mutually exclusive
with `no_edit`.
no_edit (bool): Do not edit commit message. Mutually exclusive with
`message`.
files (seq[Path]): The set of files containing changes to commit.
amend (bool): Whether to amend the previous commit.
allow_empty (bool): Whether to allow an empty commit.
all_tracked (bool): Stage all tracked files before committing. If True,
files must be empty and all_files must be False.
all_files (bool): Stage all files (even untracked) before committing. If
True, files must be empty and all_tracked must be False.
author_override (dict): Dict representation of a git.Commit_User
containing name and email keys. If specified, override the author of
the commit.
"""
assert (message and not no_edit) or (not message and no_edit)
cmd = ["commit"]
if message:
cmd += ["-m", message]
if no_edit:
cmd.append("--no-edit")
if amend:
cmd.append("--amend")
if allow_empty:
cmd.append("--allow-empty")
if all_tracked:
assert not all_files
assert not files
cmd.append("-a")
elif all_files:
assert not files
self.add(add_all=True)
else:
cmd.extend(files)
if author_override:
cmd += [
"--author",
f"{author_override['name']} <{author_override['email']}>",
]
return self(step_name, *cmd, **kwargs)
def tag(self, name, step_name="git tag"):
"""Create a tag.
Args:
name (str): Name of tag.
"""
return self(step_name, "tag", name, timeout=self.LOCAL_OPERATION_TIMEOUT)
def add(
self,
step_name="git add",
pathspec=None,
add_all=False,
intent_to_add=False,
only_tracked=False,
force=False,
**kwargs,
):
"""Add file contents to index.
Args:
step_name (str): Name of the step.
pathspec (str): Add pathspec to index.
add_all (bool): Make index match the working tree.
intent_to_add (bool): Move any new files into tracked state.
only_tracked (bool): Add all tracked files to the index, and
no untracked files implies add_all=False.
force (bool): Allow adding otherwise ignored files.
"""
args = ["add"]
if only_tracked:
assert not add_all, "only_tracked and add_all cannot be set together"
args.append("--update")
if add_all:
args.append("--all")
if intent_to_add:
args.append("--intent-to-add")
if force:
args.append("--force")
if pathspec:
args.append(pathspec)
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(step_name, *args, **kwargs)
def status(self, step_name="git status", **kwargs):
"""
Displays the state of the working directory and staging area.
Args:
step_name (str): Name of the step.
"""
args = ["status"]
return self(step_name, *args, **kwargs)
def ls_files(
self,
step_name="git ls-files",
modified=False,
deleted=False,
others=False,
exclude_standard=False,
line_termination=False,
file=None,
test_data="hello",
**kwargs,
):
"""
List files in index.
Args:
step_name (str): Name of the step.
modified (bool): Include modified files.
deleted (bool): Include deleted files.
others (bool): Include untracked files.
exclude_standard (bool): Add standard Git exclusions.
line_termination (bool): -z option: null byte termination on output.
file (str): Filepath or regex to filter for.
test_data (str): Test data stdout.
"""
args = ["ls-files"]
if modified:
args.append("--modified")
if deleted:
args.append("--deleted")
if exclude_standard:
args.append("--exclude-standard")
if others:
args.append("--others")
if line_termination:
args.append("-z")
if file:
args += ["--", file]
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(
step_name,
*args,
stdout=self.m.raw_io.output_text(),
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(test_data),
**kwargs,
)
def diff(
self,
step_name="git diff",
ref_base="HEAD",
ref=None,
patch_file=None,
unified=None,
cached=False,
exit_code=False,
step_test_data=None,
**kwargs,
):
"""Run git diff in the current working directory.
Args:
step_name (str): Name of the step.
ref_base (str or None): Base ref for diff.
ref (str or None): Secondary ref for diff.
patch_file (str): Path to patch file.
unified (int): Generate diff with `unified` lines of context.
cached (bool): Get staged files.
exit_code (bool): Exit with codes similar to diff(1).
"""
if step_test_data is None:
step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
"""
diff --git a/recipe_modules/fuchsia/api.py b/recipe_modules/fuchsia/api.py
index 429e4612..83925ff3 100644
--- a/recipe_modules/fuchsia/api.py
+++ b/recipe_modules/fuchsia/api.py
@@ -562,15 +562,6 @@ class FuchsiaApi(recipe_api.RecipeApi):
- # arm64 hardware targets have a stable version of Zedboot on their
- # recovery partition, we do not want to send the build Zedboot to
- # overwrite it.
- if build.target == 'arm64' and device_type != 'QEMU':
- for image in build.images.keys():
- # Matches both 'zircon-r' and 'zircon-r.signed'
- if 'zircon-r' in image:
- del build.images[image]
"""
)
args = ["diff"]
if ref_base:
args.append(ref_base)
if ref:
args.append(ref)
if unified is not None:
args.append(f"--unified={int(unified)}")
if cached:
args.append("--cached")
if exit_code:
args.append("--exit-code")
# Raise the timeout if it takes more time, commits that "rewrite the
# world" could take more time.
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(
step_name,
*args,
step_test_data=step_test_data,
stdout=self.m.raw_io.output_text(leak_to=patch_file),
**kwargs,
)
def log(
self, step_name="git log", depth=10, fmt=None, test_data="aabbccdd", **kwargs
):
"""Get git log.
Args:
step_name (str): Name of the step.
depth (int): Commit depth of log.
fmt (string): Format string to pass to '--pretty'.
"""
args = ["log", "-n", depth]
if fmt:
args += [f"--pretty={fmt}"]
# Raise the timeout if it takes more time, very long log requests could
# take a while.
return self(
step_name,
*args,
timeout=self.LOCAL_OPERATION_TIMEOUT,
stdout=self.m.raw_io.output_text(),
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(test_data),
**kwargs,
)
def upload_cl(
self,
upstream_ref="main",
labels=None,
cl_notify_option=None,
cc_emails=None,
reviewer_emails=None,
**kwargs,
):
"""Upload a CL to Gerrit via `git push`.
Args:
upstream_ref (str): Upstream ref to upload CL to.
labels (dict): Mapping of Gerrit labels and scores to apply.
cl_notify_option (str): One of "NONE", "OWNER", "OWNER_REVIEWERS",
or "ALL" specifying whom to send CL notifications to.
cc_emails (seq(str)): Emails to CC on the CL.
reviewer_emails (seq(str)): Emails to set as reviewers on the CL.
kwargs (dict): Passed through to `self.push()`.
"""
labels = labels or {}
# Apply labels at push time rather than using `api.gerrit.set_review()`
# to apply them post-push in order to avoid extra Gerrit API requests as
# well as a possible race condition when applying labels to a very
# recently pushed change. See
# https://gerrit-review.googlesource.com/Documentation/user-upload.html#review_labels.
gerrit_options = []
for label, score in sorted(labels.items()):
if score != 0:
# Label push options look like "l=Commit-Queue+2".
gerrit_options.append("l=%s%+d" % (label, score))
if cl_notify_option:
gerrit_options.append(f"notify={cl_notify_option}")
if cc_emails:
gerrit_options.extend(f"cc={c}" for c in cc_emails)
if reviewer_emails:
gerrit_options.extend(f"r={r}" for r in reviewer_emails)
push_ref = f"HEAD:refs/for/{upstream_ref}"
if gerrit_options:
push_ref += "%" + ",".join(gerrit_options)
return self.push(push_ref, **kwargs)
def push(
self,
refs,
step_name="git push",
remote="origin",
atomic=False,
options=(),
dryrun=False,
**kwargs,
):
"""Run a git push for a ref or sequence of refs.
Args:
refs (str or seq(str)): Ref or sequence of refs to push to remote.
remote (str): Remote repository to push to.
atomic (bool): Request atomic transaction on remote side.
NOTE: This option is ignored by Gerrit.
options (seq(str)): Server-specific options passed to the --push-option
flag.
dryrun (bool): Push in dryrun mode.
"""
args = ["push"]
if atomic:
args.append("--atomic")
if dryrun:
args.append("--dry-run")
for option in options:
args.extend(["--push-option", option])
args.append(remote)
if isinstance(refs, str):
args.append(refs)
else:
args.extend(refs)
return self(step_name, *args, **kwargs)
def cherry_pick(self, commit, strategy_option=None, step_name="git cherry-pick"):
"""Cherry-pick a commit.
Args:
step_name (str): Name of the step.
commit (str): Commit to cherry-pick as sha1.
strategy (str|None): Use merge strategy option.
"""
args = ["cherry-pick", commit]
if strategy_option:
args += ["--strategy-option", strategy_option]
# Raise the timeout if it takes more time.
return self(step_name, *args, timeout=self.LOCAL_OPERATION_TIMEOUT)
def revert(self, commit, strategy_option=None, step_name="git revert"):
"""Revert a commit.
Args:
step_name (str): Name of the step.
commit (str): Commit to revert as sha1.
strategy (str|None): Use merge strategy option.
"""
args = ["revert", commit]
if strategy_option:
args += ["--strategy-option", strategy_option]
# Raise the timeout if it takes more time.
return self(step_name, *args, timeout=self.LOCAL_OPERATION_TIMEOUT)
def rebase(self, ref, rebase_merges=False, **kwargs):
"""Run rebase with the given arguments.
If a StepFailure occurs, abort the rebase and re-raise the
error.
"""
args = ["rebase"]
if rebase_merges:
args.append("--rebase-merges")
args.append(ref)
# Rebases may take a long time for large merge commits, so use a
# relatively high timeout.
kwargs.setdefault("timeout", datetime.timedelta(minutes=10))
try:
self("git rebase", *args, **kwargs)
except self.m.step.StepFailure: # pragma: no cover
self("git rebase abort", "rebase", "--abort", **kwargs)
raise self.RebaseError("Failed to rebase")
def get_hash(self, commit="HEAD", **kwargs):
"""Find and return the hash of the given commit."""
step_test_data = kwargs.pop(
"step_test_data",
lambda: self.m.raw_io.test_api.stream_output_text("deadbeef"),
)
return self.rev_parse(commit, step_test_data=step_test_data)
def describe(
self,
step_name="git describe",
commit="HEAD",
tags=True,
contains=False,
exact_match=True,
expected_num=None,
test_data=None,
**kwargs,
):
"""Describe a commit by its tag(s).
Args:
step_name (str): Name of the step.
commit (str): Ref to describe.
tags (bool): If True, use any tag in refs/tags namespace. Otherwise,
only use annotated tags.
contains (bool): If True, find the tag that comes after the given commit.
Automatically implies 'tags' flag.
exact_match (bool): If True, only output exact tag matches to the given
commit.
expected_num (int): If specified, raise exception if expected number
of tags which describe the given commit is not equal to actual.
test_data (func): If specified, overwrite step test data.
Returns:
list: tag(s) describing the commit.
"""
if test_data is None:
test_data = lambda: self.m.raw_io.test_api.stream_output_text(
"testtag1\ntesttag2"
)
cmd = ["describe"]
if tags is True:
cmd.append("--tags")
if contains is True:
cmd.append("--contains")
if exact_match is True:
cmd.append("--exact-match")
cmd.append(commit)
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
step = self(
step_name,
*cmd,
stdout=self.m.raw_io.output_text(),
step_test_data=test_data,
ok_ret="any",
**kwargs,
)
# An error will be raised if the commit has no tags to describe it.
tags = [] if step.retcode else step.stdout.strip().split("\n")
if expected_num is not None and len(tags) != expected_num:
raise self.m.step.StepFailure(
f"Expected number of matching tags ({expected_num}) does not match actual ({len(tags)})."
)
return tags
def get_remote_url(self, name, step_test_data=None, **kwargs):
"""Find and return the URL of given remote."""
if not step_test_data:
step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
"https://fuchsia.googlesource.com/fuchsia"
)
return self.config(
f"remote.{name}.url",
stdout=self.m.raw_io.output_text(),
step_test_data=step_test_data,
**kwargs,
).stdout.strip()
def get_timestamp(self, commit="HEAD", step_name="get commit ts", **kwargs):
"""Find and return the timestamp of the given commit.
Args:
step_name (str): Name of the step.
commit (str): Ref to get timestamp for.
"""
step_test_data = kwargs.pop(
"step_test_data",
lambda: self.m.raw_io.test_api.stream_output_text("1473312770"),
)
args = ["show", commit, "--format=%at", "--no-patch"]
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(
step_name,
*args,
step_test_data=step_test_data,
stdout=self.m.raw_io.output_text(),
**kwargs,
).stdout.strip()
def get_author_name(
self, step_name="get commit author name", commit="HEAD", **kwargs
):
"""Find and return the author name of the given commit.
Args:
step_name (str): Name of the step.
commit (str): Ref to get author name for.
"""
step_test_data = kwargs.pop(
"step_test_data",
lambda: self.m.raw_io.test_api.stream_output_text(
"global-integration-roller"
),
)
args = [
"show",
commit,
"-s",
"--format=%an",
]
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(
step_name,
*args,
step_test_data=step_test_data,
stdout=self.m.raw_io.output_text(),
**kwargs,
).stdout.strip()
def get_author_email(
self, step_name="get commit author email", commit="HEAD", **kwargs
):
"""Find and return the author email of the given commit.
Args:
step_name (str): Name of the step.
commit (str): Ref to get author email for.
"""
step_test_data = kwargs.pop(
"step_test_data",
lambda: self.m.raw_io.test_api.stream_output_text(
"global-integration-roller@fuchsia.infra.roller.fuchsia-infra.iam.gserviceaccount.com"
),
)
args = [
"show",
commit,
"-s",
"--format=%ae",
]
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
return self(
step_name,
*args,
step_test_data=step_test_data,
stdout=self.m.raw_io.output_text(),
**kwargs,
).stdout.strip()
def get_commit_message(
self, step_name="get commit msg", commit="HEAD", oneline=False, **kwargs
):
"""Get message of the given commit.
Args:
step_name (str): Name of the step.
commit (str): Git ref to get message for.
oneline (bool): Whether to shorten message to one line.
"""
step_test_data = kwargs.pop(
"step_test_data",
lambda: self.m.raw_io.test_api.stream_output_text("[foo] bar\nbaz\n"),
)
args = [
"show",
commit,
"--format=%B",
"--no-patch",
]
if oneline is True:
args.append("--oneline")
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
step = self(
step_name,
*args,
step_test_data=step_test_data,
stdout=self.m.raw_io.output_text(),
trace=False,
**kwargs,
)
commit_msg = step.stdout.strip()
step.presentation.logs["commit message"] = commit_msg.splitlines()
return commit_msg
def get_changed_files(
self,
step_name="git diff-tree",
commit="HEAD",
deleted=True,
ignore_submodules=False,
test_data=("foo/bar/baz", "foo/baz"),
**kwargs,
):
"""Return a list of files changed/added/removed in a CL.
Args:
step_name (str): Name of the step.
commit (str): Git ref to get the files for.
deleted (bool): Whether to include deleted files.
ignore_submodules (bool): Whether to include submodule paths. If
True, the list may contain paths to submodule directories, but
not paths to changed files within submodules.
test_data (seq of str): Mock list of changed files.
**kwargs (dict): Passed through to __call__().
"""
args = [
"diff-tree",
"--no-commit-id",
"--name-only",
"-r", # Recurse into subrepos.
# Separate results with nuls - otherwise git will do really ugly
# escaping of non-ASCII characters in file names.
"-z",
]
if not deleted:
args.append("--diff-filter=d")
if ignore_submodules:
args.append("--ignore-submodules=all")
args.append(commit)
kwargs.setdefault("timeout", self.LOCAL_OPERATION_TIMEOUT)
step = self(
step_name,
*args,
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
"".join(f"{f}\x00" for f in test_data)
),
stdout=self.m.raw_io.output_text(),
trace=False,
**kwargs,
)
stdout = step.stdout.strip("\0")
files = stdout.split("\0") if stdout else []
step.presentation.logs["files"] = self.m.json.dumps(files, indent=2)
return files
def snap_branch(
self,
snap_ref,
branch,
message,
path=None,
push=False,
tmp_branch="tmp_branch",
step_name="snap branch",
):
"""Snap a target branch to specified ref, i.e. overwrite target branch
to be equal to specified ref without losing previous history.
Args:
snap_ref (str): commit hash ref to snap to.
branch (str): target branch to apply snap.
message (str): snap commit message.
path (Path): directory to checkout in.
push (bool): push snap to target branch.
tmp_branch (str): temporary branch name.
"""
path = path or self.m.path.mkdtemp("snap_branch")
with self.m.step.nest(step_name), self.m.context(cwd=path):
self.fetch(repository="origin", refspec=branch)
self.raw_checkout(ref=branch)
# If branch HEAD is already at snap ref, no further action needed.
if self.get_hash() == snap_ref:
return
remote_branch = f"origin/{branch}"
# Create temporary branch off target branch.
self.raw_checkout(ref=remote_branch, branch=tmp_branch)
# Reset temporary branch to snap ref, preserving history.
self.reset(snap_ref, hard=True)
self.merge(remote_branch, strategy="ours", message=message)
if push:
# Push snapped temporary branch to target branch.
self.push(f"{tmp_branch}:refs/heads/{branch}")
def merge(self, ref, step_name="git merge", strategy=None, message=None):
"""Merge ref into current branch.
Args:
step_name (str): Name of the step.
strategy (str or None): Merge strategy. See git-merge docs.
message (str or None): Commit message for merge commit in case one
is created.
"""
args = ["merge"]
if strategy:
args += ["--strategy", strategy]
args.append(ref)
if message:
args += ["--message", message]
self(step_name, *args, timeout=self.REMOTE_OPERATION_TIMEOUT)
def is_ancestor(self, step_name="is ancestor", commit_1="HEAD", commit_2="HEAD"):
"""Check if commit_1 is an ancestor of commit_2.
Args:
step_name (str): Name of the step.
commit_1 (str): A commit that should be an ancestor of commit_2.
commit_2 (str): A commit we are checking ancestor against.
Returns:
bool: True if commit_1 is ancestor of commit_2, otherwise False.
Raises:
StepFailure: git command returns exit code >=2.
"""
args = ["merge-base", "--is-ancestor", commit_1, commit_2]
return (
self(
step_name, *args, timeout=self.LOCAL_OPERATION_TIMEOUT, ok_ret=(0, 1)
).retcode
== 0
)
def reset(self, ref, step_name="git reset", hard=False):
"""Reset current HEAD to specified ref.
Args:
step_name (str): Name of the step.
hard (bool): Reset index and working tree.
"""
args = ["reset"]
if hard:
args.append("--hard")
args.append(ref)
self(step_name, *args, timeout=self.REMOTE_OPERATION_TIMEOUT)
def restore(self, pathspec, staged=True, step_name="git restore"):
"""Restore working tree files.
Args:
pathspec (str): Pathspec to restore.
staged (bool): Whether to restore the index.
"""
args = ["restore"]
if staged:
args.append("--staged")
args.append(pathspec)
self(step_name, *args, timeout=self.REMOTE_OPERATION_TIMEOUT)
def get_default_remote_branch(
self, url, step_name="git ls-remote --symref HEAD", step_test_data=None
):
"""Get default remote branch.
Args:
step_name (str): Name of the step.
url (str): remote repository URL.
Returns:
str: Full name of ref, often including "refs/heads/".
"""
if not step_test_data:
step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
"ref: refs/heads/main HEAD\nh3ll0 HEAD"
)
step = self(
step_name,
"ls-remote",
"--symref",
url,
"HEAD",
timeout=self.LOCAL_OPERATION_TIMEOUT,
stdout=self.m.raw_io.output_text(),
step_test_data=step_test_data,
)
for line in step.stdout.splitlines():
line = line.strip()
if line.startswith("ref:"):
return line.split()[1]
raise ValueError(f"expected 'ref:' line in {step.stdout!r}") # pragma: no cover
def get_all_remote_branch_heads(
self, url, branch=None, step_name="git ls-remote", step_test_data=None
):
"""Get remote branch head(s).
If branch is specified, the number of entries returned will be either
one or zero, depending if a branch with the name `branch` is found.
Args:
step_name (str): Name of the step.
url (str): remote repository URL.
branch (str or None): The branch name to pass into `git ls-remote`.
Returns:
dict: A mapping between branche(s) and their HEAD commit SHA1s.
"""
if not step_test_data:
step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
f"h3ll0\trefs/heads/{branch}"
if branch
else "h3ll0\trefs/heads/main\nh3ll01\trefs/heads/custom_branch"
)
args = ["ls-remote", "--heads", url] + ([branch] if branch else [])
step = self(
step_name,
*args,
timeout=self.REMOTE_OPERATION_TIMEOUT,
stdout=self.m.raw_io.output_text(),
step_test_data=step_test_data,
trace=False,
)
stdout = step.stdout.strip()
step.presentation.logs["stdout"] = stdout
heads = [line.split() for line in stdout.split("\n")] if stdout else []
# Build a dictionary of names to commit SHA1s.
return {name: rev for rev, name in heads}
def get_remote_branch_head(
self, url, branch, step_name="git ls-remote", step_test_data=None
):
"""Get remote branch head.
If the branch does not exist remotely, return empty string.
Args:
step_name (str): Name of the step.
url (str): remote repository URL.
branch (str): branch name.
Returns:
str: HEAD commit SHA1 if it exists, otherwise empty string.
"""
# Query branch name to head.
heads = self.get_all_remote_branch_heads(
url, branch=branch, step_name=step_name, step_test_data=step_test_data
)
# Get branch hash or empty string if it doesn't exist. This assumes that
# heads has a maxiumum of 1 entry since `branch` is passed in.
return next(iter(heads.values()), "")
def get_remote_tag(self, url, tag, step_name="get remote tag", step_test_data=None):
"""Get remote tag commit.
If the tag does not exist remotely, return empty string.
Args:
step_name (str): Name of the step.
url (str): Remote repository URL.
tag (str): Git tag.
Returns:
str: tag's commit SHA1 if it exists, otherwise empty string.
"""
if not step_test_data:
step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
f"tagcommit {tag}"
)
tag_output = self(
step_name,
"ls-remote",
"--tags",
url,
tag,
timeout=self.REMOTE_OPERATION_TIMEOUT,
stdout=self.m.raw_io.output_text(),
step_test_data=step_test_data,
).stdout.strip()
# Parse out the commit SHA1.
return tag_output.split()[0] if tag_output else tag_output
def read_commit_footer(self, commit_msg, footer, sep=": "):
"""Given a commit message, parse a footer value from it.
Args:
commit_msg (str): The commit message to parse.
footer (str): The name of the footer to parse, e.g. "Change-Id".
sep (str or seq(str)): The separator between footer name and footer value.
If multiple separators are allowed (e.g. "=" and ": "), they
can be passed as a list.
Returns (str): The footer value if the footer was found in the commit
message, otherwise None.
"""
if isinstance(sep, str):
sep = [sep]
possible_prefixes = [footer + s for s in sep]
for line in reversed(commit_msg.splitlines()):
for prefix in possible_prefixes:
if line.startswith(prefix):
return line[len(prefix) :].strip()
return None
def get_remotes(self):
"""Get the remotes configured for a repository."""
Remote = collections.namedtuple("Remote", "name url")
for line in self.config(
"--get-regexp",
"^remote.*",
step_name="get remotes",
stdout=self.m.raw_io.output_text(),
step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
"remote.origin.url https://fuchsia.googlesource.com/fuchsia\n"
"remote.origin.url https://fuchsia.googlesource.com/fuchsia\n"
"remote.mirror.url https://fuchsia.googlesource.com/mirror\n"
"remote.mirror.url https://fuchsia.googlesource.com/mirror\n"
),
).stdout.splitlines():
match = re.search(r"^remote.([^.]*).url\s+(.+)\s*$", line)
if match:
yield Remote(name=match.group(1), url=match.group(2))