blob: 64bc5201cf801326fe215b18a5001c4afe89202e [file] [log] [blame]
# Copyright 2016 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from future.moves.urllib.parse import urlparse
from past.builtins import basestring
import hashlib
import re
from recipe_engine import recipe_api
class GitApi(recipe_api.RecipeApi):
"""GitApi provides support for Git."""
class RebaseError(recipe_api.StepFailure):
    """Step failure raised by `rebase()` after a failed rebase was aborted."""
    pass
# Matches a complete Git object name: 40 hex digits (SHA-1) or 64 hex digits
# (SHA-256). The pattern is anchored at both ends so that a longer ref name
# which merely starts with 40 hex characters is not mistaken for a revision.
_GIT_HASH_RE = re.compile(r"^(?:[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$")
def __init__(self, git_properties, *args, **kwargs):
    """Initialize the module from its properties.

    Args:
      git_properties: Module properties; must support `.get()`. The
        optional "cache_path" entry is stored for use by `checkout()` and
        defaults to "git".
      *args, **kwargs: Forwarded to the base class constructor.
    """
    super(GitApi, self).__init__(*args, **kwargs)
    cache_path = git_properties.get("cache_path", "git")
    self._cache_path = cache_path
def __call__(self, step_name, *args, **kwargs):
    """Run `git` with the given arguments as a step.

    Args:
      step_name (str): Name of the step.
      *args: Arguments placed after `git` and any `-c` options.
      **kwargs: Passed through to `self.m.step()`. The `config` key (dict),
        if present, is removed first and rendered as `-c key=value` options
        in sorted order.

    Returns:
      Whatever `self.m.step()` returns.
    """
    config = kwargs.pop("config", {})
    command = ["git"]
    for key, value in sorted(config.items()):
        command += ["-c", "%s=%s" % (key, value)]
    command.extend(args)
    return self.m.step(step_name, command, **kwargs)
def apply(self, patch, **kwargs):
    """Run `git apply` on a patch file in a step named "apply patch".

    Args:
      patch (str): Path to git patch file.
      **kwargs: Passed through to `__call__()`.
    """
    git_args = ("apply", patch)
    return self("apply patch", *git_args, **kwargs)
def branch(self, contains=None, remotes=False, step_name="git branch"):
    """Run git branch and return the listed branch names.

    Args:
      contains (str): If set, list only branches which contain this ref.
      remotes (bool): List remote-tracking branches. A leading "origin/"
        is removed from each returned name.
      step_name (str): Name of the step.

    Returns:
      list(str): One entry per non-empty output line; empty if git printed
        nothing.
    """
    args = ["branch"]
    if remotes:
        args.append("--remotes")
    if contains:
        args += ["--contains", contains]
    step = self(step_name, stdout=self.m.raw_io.output_text(), *args)
    step.presentation.logs["stdout"] = step.stdout

    prefix = "origin/"
    branches = []
    for line in step.stdout.splitlines():
        branch = line.strip()
        # `git branch` marks the checked-out branch with "* " and a branch
        # checked out in a linked worktree with "+ "; the marker is not
        # part of the branch name.
        if branch[:2] in ("* ", "+ "):
            branch = branch[2:].strip()
        # Skip blank lines so that empty output yields [] rather than [""].
        if not branch:
            continue
        if remotes and branch.startswith(prefix):
            branch = branch[len(prefix):]
        branches.append(branch)
    return branches
def init(self, bare=False, **kwargs):
    """Run `git init` in a step named "git init".

    Args:
      bare (bool): Pass `--bare` to create a bare repository.
      **kwargs: Passed through to `__call__()`.
    """
    git_args = ["init", "--bare"] if bare else ["init"]
    return self("git init", *git_args, **kwargs)
def set_config(self, key, value=None, step_name="set config", **kwargs):
    """Run `git config <key> [<value>]`.

    Args:
      key (str): Config name.
      value (str or None): Optional config value. It is only appended to
        the command when truthy.
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`.
    """
    config_args = [key, value] if value else [key]
    return self(step_name, "config", *config_args, **kwargs)
def config_remove_section(self, section, step_name="remove section", **kwargs):
    """Run `git config --remove-section <section>`.

    Args:
      section (str): Section to remove.
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`.
    """
    git_args = ["config", "--remove-section", section]
    return self(step_name, *git_args, **kwargs)
def config_replace_all(
    self,
    name,
    value,
    value_regex=None,
    step_name="git config replace all",
    **kwargs
):
    """Run `git config --replace-all <name> <value> [<value_regex>]`.

    Args:
      name (str): Name of config to replace.
      value (str): Value to replace config with.
      value_regex (str): Optional regex appended after the value; omitted
        when falsy.
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`.
    """
    trailing = [value_regex] if value_regex else []
    git_args = ["config", "--replace-all", name, value] + trailing
    return self(step_name, *git_args, **kwargs)
def fetch(
    self,
    repository=None,
    refspec=None,
    all_remotes=False,
    prune=False,
    tags=False,
    recurse_submodules=None,
    depth=None,
    step_name="git fetch",
    **kwargs
):
    """Run `git fetch`.

    Exactly one of `repository` and `all_remotes` must be set.

    Args:
      repository (str): Repository to fetch.
      refspec (str): Refspec to fetch.
      all_remotes (bool): Pass `--all`.
      prune (bool): Pass `--prune`.
      tags (bool): Pass `--tags`.
      recurse_submodules (bool|str|None): True adds `--recurse-submodules`,
        False adds `--no-recurse-submodules`, "on-demand" adds
        `--recurse-submodules=on-demand`, and None adds nothing so that
        git's own default applies.
      depth (int or None): If truthy, pass `--depth <depth>`.
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`.
    """
    assert (repository and not all_remotes) or (
        all_remotes and not repository
    ), "only one of `repository` or `all_remotes` can be set"
    assert recurse_submodules in (True, False, None, "on-demand")

    args = ["fetch"]
    switches = (
        (all_remotes, "--all"),
        (prune, "--prune"),
        (tags, "--tags"),
    )
    for enabled, flag in switches:
        if enabled:
            args.append(flag)
    for positional in (repository, refspec):
        if positional:
            args.append(positional)

    # Only an explicit caller choice produces a submodule option; None
    # matches no entry below and therefore adds nothing.
    recurse_options = (
        (True, "--recurse-submodules"),
        (False, "--no-recurse-submodules"),
        ("on-demand", "--recurse-submodules=on-demand"),
    )
    for candidate, option in recurse_options:
        if recurse_submodules == candidate:
            args.append(option)
            break

    if depth:
        args += ["--depth", depth]
    return self(step_name, *args, **kwargs)
def remote_add(self, remote, url, **kwargs):
    """Run `git remote add <remote> <url>` in a step named "git remote".

    Args:
      remote (str): Remote to add.
      url (str): URL of the remote.
      **kwargs: Passed through to `__call__()`.
    """
    git_args = ("remote", "add", remote, url)
    return self("git remote", *git_args, **kwargs)
def rev_parse(self, ref, step_name="git rev-parse", **kwargs):
    """Run `git rev-parse <ref>` and return its stripped stdout.

    Args:
      ref (str): Git ref to parse into revision.
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`.
    """
    step = self(
        step_name, "rev-parse", ref, stdout=self.m.raw_io.output_text(), **kwargs
    )
    return step.stdout.strip()
def rev_list_count(
    self, rev1, rev2, step_name="git rev-list --count", test_data=None
):
    """Return the integer printed by `git rev-list --count rev1..rev2`.

    Args:
      rev1 (str): The first git revision.
      rev2 (str): The second git revision.
      step_name (str): Name of the step.
      test_data: Mock stdout used as the step's test data.
    """
    rev_range = "{rev1}..{rev2}".format(rev1=rev1, rev2=rev2)
    step = self(
        step_name,
        "rev-list",
        "--count",
        rev_range,
        stdout=self.m.raw_io.output_text(),
        step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(test_data),
    )
    return int(step.stdout.strip())
def hash_object(self, hash_input, step_name="git hash-object", step_test_data=None):
    """Run `git hash-object` on the given input.

    Args:
      hash_input (str): Hash input, handed to git via `raw_io.input`.
      step_name (str): Name of the step.
      step_test_data (func): Optional test data override. When not given,
        the mock stdout is the SHA-1 hex digest of the encoded input.

    Returns:
      The step result; the command's stdout is captured as text.
    """
    if not step_test_data:
        fake_digest = hashlib.sha1(hash_input.encode()).hexdigest()

        def step_test_data():
            return self.m.raw_io.test_api.stream_output_text(fake_digest)

    git_args = ("hash-object", self.m.raw_io.input(hash_input))
    return self(
        step_name,
        *git_args,
        stdout=self.m.raw_io.output_text(),
        step_test_data=step_test_data
    )
def checkout(
    self,
    url,
    path=None,
    ref=None,
    recursive=False,
    submodules=True,
    submodule_force=False,
    submodule_paths=None,
    remote="origin",
    checkout_file=None,
    cache=True,
    depth=None,
    **kwargs
):
    """Checkout a given ref and return the checked out revision.

    Initializes (or reuses) a repository at `path`, optionally populates a
    shared bare cache repository and wires it in through
    `.git/objects/info/alternates`, then fetches and checks out `ref`,
    cleans the working tree and updates submodules.

    Args:
      url (str): URL of remote repo to use as upstream.
      path (Path): Directory to clone into. If unset, a directory under
        `start_dir` named after the last component of `url` is used.
      ref (str): Ref to fetch and checkout. If unset, "main" is fetched.
        A value matching `_GIT_HASH_RE` is checked out directly.
      recursive (bool): Whether to recursively fetch submodules.
      submodules (bool): Whether to sync and update submodules.
      submodule_force (bool): Whether to update submodules with --force.
      submodule_paths (list): List of path(s) to submodule(s).
      remote (str): Name of the remote to use. A falsy value means
        "origin".
      checkout_file (str): Optional path to a single file to checkout.
      cache (bool): Whether to use the reference cache.
      depth (int or None): Depth in history of checkout. If None, do a full
        checkout.
      **kwargs: Passed through to the individual git steps.
        `step_test_data`, if present, is used for the final rev-parse.

    Returns:
      str: Output of `git rev-parse HEAD` after the checkout.
    """
    # Normalize once so every config key and command below agrees on the
    # remote name (formatting with "%" binds tighter than "or").
    remote = remote or "origin"

    if not path:
        path = url.rsplit("/", 1)[-1]
        if path.endswith(".git"):  # https://host/foobar.git
            path = path[: -len(".git")]
        if not path:  # ssh://host:repo/foobar/.git
            # Nothing is left after stripping ".git"; use the name of the
            # directory that contains it instead.
            parts = url.rsplit("/", 2)
            path = parts[-2] if len(parts) >= 2 else path
        path = self.m.path["start_dir"].join(path)
    self.m.file.ensure_directory("makedirs", path)

    with self.m.context(cwd=path):
        if self.m.path.exists(path.join(".git")):  # pragma: no cover
            # Drop the stale remote so it can be re-added below.
            self.config_remove_section("remote.%s" % remote, **kwargs)
        else:
            self.init(**kwargs)
        self.remote_add(remote, url)
        self.set_config(
            "fetch.uriprotocols",
            "https",
            step_name="set fetch.uriprotocols",
            **kwargs
        )

        if cache:
            with self.m.step.nest("cache"), self.m.cache.guard(
                cache=self._cache_path
            ):
                o = urlparse(url)
                # Flatten host + path into one directory name; "-" is
                # doubled first so the "/" -> "-" mapping stays unambiguous.
                dirname = o.hostname + o.path.replace("-", "--").replace("/", "-")
                cache_path = self.m.path["cache"].join(self._cache_path, dirname)
                self.m.file.ensure_directory("makedirs", cache_path)

                with self.m.context(cwd=cache_path):
                    objects_path = cache_path.join("objects")
                    if self.m.path.exists(objects_path):  # pragma: no cover
                        self.config_remove_section("remote.%s" % remote, **kwargs)
                    else:
                        self.init(bare=True, **kwargs)
                    self.set_config(
                        "remote.%s.url" % remote,
                        url,
                        step_name="remote set-url",
                        **kwargs
                    )
                    self.set_config(
                        "fetch.uriprotocols",
                        "https",
                        step_name="set fetch.uriprotocols",
                        **kwargs
                    )
                    self.config_replace_all(
                        "remote.%s.fetch" % remote,
                        "+refs/heads/*:refs/heads/*",
                        value_regex=r"\+refs/heads/\*:.*",
                        step_name="replace fetch configs",
                        **kwargs
                    )
                    self.fetch(
                        repository=remote,
                        prune=True,
                        tags=True,
                        depth=depth,
                        **kwargs
                    )

                # Point the checkout at the cache's object store.
                info = path.join(".git", "objects", "info")
                self.m.file.ensure_directory("makedirs object/info", info)
                self.m.file.write_text(
                    "alternates", info.join("alternates"), "%s\n" % objects_path
                )

        if not ref:
            fetch_ref = "main"
            checkout_ref = "FETCH_HEAD"
        elif self._GIT_HASH_RE.match(ref):
            # A revision is checked out directly; fetch with no refspec.
            fetch_ref = ""
            checkout_ref = ref
        elif ref.startswith("refs/heads/"):
            fetch_ref = ref[len("refs/heads/"):]
            checkout_ref = "FETCH_HEAD"
        else:
            fetch_ref = ref
            checkout_ref = "FETCH_HEAD"

        self.fetch(
            repository=remote,
            refspec=fetch_ref,
            tags=True,
            recurse_submodules=recursive or None,
            depth=depth,
            **kwargs
        )
        self.raw_checkout(
            ref=checkout_ref, force=True, checkout_file=checkout_file, **kwargs
        )

        step_test_data = kwargs.pop(
            "step_test_data",
            lambda: self.m.raw_io.test_api.stream_output_text("deadbeef"),
        )
        sha = self.rev_parse("HEAD", step_test_data=step_test_data)
        self.clean(force=True, recursive=True, ignore_rules=True, **kwargs)

        if submodules:
            with self.m.step.nest("submodule"):
                self.sync_submodule(**kwargs)
                self.update_submodule(
                    recursive=recursive,
                    force=submodule_force,
                    paths=submodule_paths,
                    **kwargs
                )
    return sha
def clean(
    self,
    step_name="git clean",
    force=False,
    recursive=False,
    ignore_rules=False,
    **kwargs
):
    """Run `git clean` to remove untracked files from the working tree.

    Args:
      step_name (str): Name of the step.
      force (bool): Pass `-f`.
      recursive (bool): Pass `-d` to recurse into directories.
      ignore_rules (bool): Pass `-x`.
      **kwargs: Passed through to `__call__()`.
    """
    args = ["clean"]
    switches = ((force, "-f"), (recursive, "-d"), (ignore_rules, "-x"))
    for enabled, flag in switches:
        if enabled:
            args.append(flag)
    return self(step_name, *args, **kwargs)
def raw_checkout(
    self,
    step_name="git checkout",
    ref=None,
    checkout_file=None,
    force=False,
    branch=None,
    directory=None,
    **kwargs
):
    """Run `git checkout`.

    Args:
      step_name (str): Name of the step.
      ref (str or None): Ref to checkout.
      checkout_file (str or None): Optional path to a single file to
        checkout; passed after `--`.
      force (bool): Pass `-f`.
      branch (str): Create and checkout a new branch with this name (`-b`).
      directory (Path): Run git in this directory via `-C`.
        TODO(atyfto): Move this to __call__.
      **kwargs: Passed through to `__call__()`.
    """
    args = ["-C", directory] if directory else []
    args.append("checkout")
    if force:
        args.append("-f")
    if branch:
        args.extend(["-b", branch])
    if ref:
        args.append(ref)
    if checkout_file:
        args.extend(["--", checkout_file])
    return self(step_name, *args, **kwargs)
def sync_submodule(self, step_name="git submodule sync", **kwargs):
    """Run `git submodule sync`.

    Args:
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`.
    """
    git_args = ("submodule", "sync")
    return self(step_name, *git_args, **kwargs)
def update_submodule(
    self,
    step_name="git submodule update",
    recursive=False,
    force=False,
    paths=None,
    **kwargs
):
    """Run `git submodule update --init`.

    Args:
      step_name (str): Name of the step.
      recursive (bool): Pass `--recursive`.
      force (bool): Pass `--force`.
      paths (list or None): Path(s) to submodule(s), appended last.
      **kwargs: Passed through to `__call__()`.
    """
    # Half the CPUs, clamped to the range [2, 16].
    jobs = min(16, max(2, self.m.platform.cpu_count // 2))

    args = ["submodule", "update", "--init"]
    for enabled, flag in ((recursive, "--recursive"), (force, "--force")):
        if enabled:
            args.append(flag)
    args += ["--jobs", str(jobs)]
    if paths:
        args.extend(paths)
    return self(step_name, *args, **kwargs)
def checkout_commit(self, commit, path, **kwargs):
    """Checkout a gitiles commit.

    Args:
      (see `checkout()` for others)
      commit (common_pb2.GitilesCommit): The commit to check out. Its
        `host` and `project` form the repo URL and its `id` is the ref.

    Returns:
      The return value of `checkout()`.
    """
    host, project = commit.host, commit.project
    repo_url = "https://%s/%s" % (host, project)
    return self.checkout(repo_url, path=path, ref=commit.id, **kwargs)
def checkout_cl(
    self, cl, path, rebase_merges=False, onto="refs/heads/main", **kwargs
):
    """Checkout a CL and rebase it onto ref.

    Args:
      (see `checkout()` for others)
      cl (common_pb2.GerritChange): The CL to check out.
      rebase_merges (bool): Forwarded to `rebase()`.
      onto (str): The ref to rebase the CL onto.

    Returns:
      The revision returned by `checkout()` for the CL's patchset ref,
      i.e. as resolved before the rebase runs.
    """
    gitiles_host = cl.host.replace("-review", "")
    url = "https://%s/%s" % (gitiles_host, cl.project)
    change_number = cl.change
    cl_ref = "refs/changes/%02d/%d/%d" % (
        change_number % 100,
        change_number,
        cl.patchset,
    )
    # First resolve `onto` to a revision, then check out the patchset.
    with self.m.step.nest("fetch %s" % onto):
        onto_revision = self.checkout(url, path=path, ref=onto, **kwargs)
    cl_revision = self.checkout(url, path=path, ref=cl_ref, **kwargs)
    with self.m.context(cwd=path):
        self.rebase(onto_revision, rebase_merges=rebase_merges)
    return cl_revision
def checkout_from_build_input(
    self, path, repo, rebase_merges=False, fallback_ref=None, **kwargs
):
    """Use the Buildbucket input to checkout the triggering revision.

    If the build has Gerrit changes and the first one belongs to `repo`,
    check out that CL. Otherwise, if the build's gitiles commit belongs to
    `repo`, check out that commit. In every other case check out `repo` at
    `fallback_ref` (the `checkout()` default when None).

    Args:
      path (Path): Directory to check out into.
      repo (str): Repository URL, compared against
        "https://<host>/<project>" from the build input.
      rebase_merges (bool): Forwarded to `checkout_cl()`.
      fallback_ref (str or None): Ref used when the build input does not
        match `repo`.
      **kwargs: Passed through to the selected checkout method.
    """
    with self.m.step.nest("checkout"), self.m.context(infra_steps=True):
        build_input = self.m.buildbucket.build.input
        changes = build_input.gerrit_changes
        if changes:
            change = changes[0]
            change_repo = "https://%s/%s" % (
                change.host.replace("-review", ""),
                change.project,
            )
            if change_repo == repo:
                return self.checkout_cl(
                    change, path=path, rebase_merges=rebase_merges, **kwargs
                )
        elif build_input.gitiles_commit.project:
            gitiles_commit = build_input.gitiles_commit
            commit_repo = "https://%s/%s" % (
                gitiles_commit.host,
                gitiles_commit.project,
            )
            if commit_repo == repo:
                return self.checkout_commit(
                    build_input.gitiles_commit, path=path, **kwargs
                )
        return self.checkout(repo, path=path, ref=fallback_ref, **kwargs)
def commit(
    self,
    message=None,
    no_edit=False,
    step_name="git commit",
    files=(),
    amend=False,
    allow_empty=False,
    all_tracked=False,
    all_files=False,
    author_override=None,
    **kwargs
):
    """Run git commit in the current working directory.

    Args:
      message (str): The message to attach to the commit. Mutually exclusive
        with `no_edit`; exactly one of the two must be given.
      no_edit (bool): Pass `--no-edit`. Mutually exclusive with `message`.
      step_name (str): Name of the step.
      files (seq[Path]): The set of files containing changes to commit.
      amend (bool): Pass `--amend`.
      allow_empty (bool): Pass `--allow-empty`.
      all_tracked (bool): Pass `-a`. If True, files must be empty and
        all_files must be False.
      all_files (bool): Run `add(add_all=True)` before committing. If True,
        files must be empty and all_tracked must be False.
      author_override (dict): Dict with "name" and "email" keys. If
        specified, passed as `--author "name <email>"`.
      **kwargs: Passed through to `__call__()`.
    """
    assert (message and not no_edit) or (not message and no_edit)

    args = ["commit"]
    if message:
        args.extend(["-m", message])
    switches = (
        (no_edit, "--no-edit"),
        (amend, "--amend"),
        (allow_empty, "--allow-empty"),
    )
    for enabled, flag in switches:
        if enabled:
            args.append(flag)

    if all_tracked:
        assert not all_files
        assert not files
        args.append("-a")
    elif all_files:
        assert not files
        self.add(add_all=True)
    else:
        args.extend(files)

    if author_override:
        author = "%s <%s>" % (author_override["name"], author_override["email"])
        args.extend(["--author", author])
    return self(step_name, *args, **kwargs)
def tag(self, name, step_name="git tag"):
    """Run `git tag <name>`.

    Args:
      name (str): Name of tag.
      step_name (str): Name of the step.
    """
    git_args = ("tag", name)
    return self(step_name, *git_args)
def add(
    self,
    step_name="git add",
    pathspec=None,
    add_all=False,
    intent_to_add=False,
    only_tracked=False,
    force=False,
    **kwargs
):
    """Run `git add`.

    Args:
      step_name (str): Name of the step.
      pathspec (str): Pathspec to add, appended last.
      add_all (bool): Pass `--all`.
      intent_to_add (bool): Pass `--intent-to-add`.
      only_tracked (bool): Pass `--update`. Cannot be combined with
        `add_all`.
      force (bool): Pass `--force`.
      **kwargs: Passed through to `__call__()`.
    """
    if only_tracked:
        assert not add_all, "only_tracked and add_all cannot be set together"

    args = ["add"]
    switches = (
        (only_tracked, "--update"),
        (add_all, "--all"),
        (intent_to_add, "--intent-to-add"),
        (force, "--force"),
    )
    for enabled, flag in switches:
        if enabled:
            args.append(flag)
    if pathspec:
        args.append(pathspec)
    return self(step_name, *args, **kwargs)
def ls_files(
    self,
    step_name="git ls-files",
    modified=False,
    deleted=False,
    others=False,
    exclude_standard=False,
    line_termination=False,
    file=None,
    test_data="hello",
    **kwargs
):
    """Run `git ls-files` with stdout captured as text.

    Args:
      step_name (str): Name of the step.
      modified (bool): Pass `--modified`.
      deleted (bool): Pass `--deleted`.
      others (bool): Pass `--others`.
      exclude_standard (bool): Pass `--exclude-standard`.
      line_termination (bool): Pass `-z`.
      file (str): Optional filter, passed after `--`.
      test_data (str): Test data stdout.
      **kwargs: Passed through to `__call__()`.

    Returns:
      The step result.
    """
    args = ["ls-files"]
    switches = (
        (modified, "--modified"),
        (deleted, "--deleted"),
        (exclude_standard, "--exclude-standard"),
        (others, "--others"),
        (line_termination, "-z"),
    )
    for enabled, flag in switches:
        if enabled:
            args.append(flag)
    if file:
        args.extend(["--", file])

    def mock_stdout():
        return self.m.raw_io.test_api.stream_output_text(test_data)

    return self(
        step_name,
        *args,
        stdout=self.m.raw_io.output_text(),
        step_test_data=mock_stdout,
        **kwargs
    )
def diff(
    self,
    step_name="git diff",
    ref_base="HEAD",
    ref=None,
    patch_file=None,
    unified=None,
    cached=False,
    exit_code=False,
    step_test_data=None,
    **kwargs
):
    """Run git diff in the current working directory.

    Args:
      step_name (str): Name of the step.
      ref_base (str or None): Base ref for diff.
      ref (str or None): Secondary ref for diff.
      patch_file (str): Path the captured stdout is leaked to.
      unified (int): Pass `--unified=<unified>`.
      cached (bool): Pass `--cached`.
      exit_code (bool): Pass `--exit-code`.
      step_test_data (func): Optional test data override; a canned diff is
        used when None.
      **kwargs: Passed through to `__call__()`.

    Returns:
      The step result, with stdout captured as text.
    """
    if step_test_data is None:
        step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
            """
diff --git a/recipe_modules/fuchsia/api.py b/recipe_modules/fuchsia/api.py
index 429e4612..83925ff3 100644
--- a/recipe_modules/fuchsia/api.py
+++ b/recipe_modules/fuchsia/api.py
@@ -562,15 +562,6 @@ class FuchsiaApi(recipe_api.RecipeApi):
- # arm64 hardware targets have a stable version of Zedboot on their
- # recovery partition, we do not want to send the build Zedboot to
- # overwrite it.
- if build.target == 'arm64' and device_type != 'QEMU':
- for image in build.images.keys():
- # Matches both 'zircon-r' and 'zircon-r.signed'
- if 'zircon-r' in image:
- del build.images[image]
"""
        )

    args = ["diff"] + [rev for rev in (ref_base, ref) if rev]
    if unified is not None:
        args.append("--unified=%d" % unified)
    for enabled, flag in ((cached, "--cached"), (exit_code, "--exit-code")):
        if enabled:
            args.append(flag)
    return self(
        step_name,
        *args,
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output_text(leak_to=patch_file),
        **kwargs
    )
def log(self, step_name="git log", directory=None, depth=10):
    """Run `git log -n <depth>`.

    Args:
      step_name (str): Name of the step.
      directory (Path or None): Directory to run git log in via `-C`.
        Defaults to cwd. TODO(atyfto): Move this to __call__.
      depth (int): Commit depth of log.
    """
    location = ["-C", directory] if directory else []
    git_args = location + ["log", "-n", depth]
    return self(step_name, *git_args)
def push(
    self,
    refs,
    step_name="git push",
    remote="origin",
    atomic=False,
    options=(),
    dryrun=False,
    **kwargs
):
    """Run a git push for a ref or sequence of refs.

    Args:
      refs (str or seq(str)): Ref or sequence of refs to push to remote.
      step_name (str): Name of the step.
      remote (str): Remote repository to push to.
      atomic (bool): Pass `--atomic`.
        NOTE: This option is ignored by Gerrit.
      options (seq(str)): Server-specific options, each passed with its own
        `--push-option` flag.
      dryrun (bool): Pass `--dry-run`.
      **kwargs: Passed through to `__call__()`.
    """
    args = ["push"]
    for enabled, flag in ((atomic, "--atomic"), (dryrun, "--dry-run")):
        if enabled:
            args.append(flag)
    for push_option in options:
        args += ["--push-option", push_option]
    args.append(remote)
    # A lone string is one ref, not a sequence of characters.
    ref_list = [refs] if isinstance(refs, basestring) else refs
    args.extend(ref_list)
    return self(step_name, *args, **kwargs)
def cherry_pick(self, commit, strategy_option=None, step_name="git cherry-pick"):
    """Run `git cherry-pick <commit>`.

    Args:
      commit (str): Commit to cherry-pick as sha1.
      strategy_option (str|None): If set, passed via `--strategy-option`.
      step_name (str): Name of the step.
    """
    extra = ["--strategy-option", strategy_option] if strategy_option else []
    git_args = ["cherry-pick", commit] + extra
    return self(step_name, *git_args)
def revert(self, commit, strategy_option=None, step_name="git revert"):
    """Run `git revert <commit>`.

    Args:
      commit (str): Commit to revert as sha1.
      strategy_option (str|None): If set, passed via `--strategy-option`.
      step_name (str): Name of the step.
    """
    extra = ["--strategy-option", strategy_option] if strategy_option else []
    git_args = ["revert", commit] + extra
    return self(step_name, *git_args)
def rebase(self, ref, rebase_merges=False, **kwargs):
    """Run `git rebase <ref>`.

    If a StepFailure occurs, run `git rebase --abort` and raise
    `RebaseError`.

    Args:
      ref (str): Ref to rebase onto.
      rebase_merges (bool): Pass `--rebase-merges`.
      **kwargs: Passed through to `__call__()` for both the rebase and the
        abort step.
    """
    if rebase_merges:
        git_args = ["rebase", "--rebase-merges", ref]
    else:
        git_args = ["rebase", ref]
    try:
        self("git rebase", *git_args, **kwargs)
    except self.m.step.StepFailure:  # pragma: no cover
        self("git rebase abort", "rebase", "--abort", **kwargs)
        raise self.RebaseError("Failed to rebase")
def get_hash(self, commit="HEAD", **kwargs):
    """Find and return the hash of the given commit.

    Args:
      commit (str): Ref to resolve.
      **kwargs: Passed through to `rev_parse()`. `step_test_data` defaults
        to mock output "deadbeef".

    Returns:
      str: Stripped output of `git rev-parse <commit>`.
    """
    # Forward every keyword argument instead of silently dropping all but
    # `step_test_data`.
    kwargs.setdefault(
        "step_test_data",
        lambda: self.m.raw_io.test_api.stream_output_text("deadbeef"),
    )
    return self.rev_parse(commit, **kwargs)
def describe(
    self,
    step_name="git describe",
    commit="HEAD",
    tags=True,
    contains=False,
    exact_match=True,
    expected_num=None,
    test_data=None,
    **kwargs
):
    """Describe a commit by its tag(s).

    Args:
      step_name (str): Name of the step.
      commit (str): Ref to describe.
      tags (bool): If True, pass `--tags` to use any tag in the refs/tags
        namespace. Otherwise, only use annotated tags.
      contains (bool): If True, pass `--contains` to find the tag that
        comes after the given commit.
      exact_match (bool): If True, pass `--exact-match`.
      expected_num (int): If specified, raise exception if expected number
        of tags which describe the given commit is not equal to actual.
      test_data (func): If specified, overwrite step test data.
      **kwargs: Passed through to `__call__()`.

    Returns:
      list: tag(s) describing the commit; empty when git exits non-zero.

    Raises:
      StepFailure: The number of tags differs from `expected_num`.
    """
    if test_data is None:
        test_data = lambda: self.m.raw_io.test_api.stream_output_text(
            "testtag1\ntesttag2"
        )

    git_args = ["describe"]
    switches = (
        (tags, "--tags"),
        (contains, "--contains"),
        (exact_match, "--exact-match"),
    )
    for value, flag in switches:
        if value is True:
            git_args.append(flag)
    git_args.append(commit)

    step = self(
        step_name,
        *git_args,
        stdout=self.m.raw_io.output_text(),
        step_test_data=test_data,
        ok_ret="any",
        **kwargs
    )
    # Any return code is accepted above; a non-zero one is treated as
    # "no tags describe this commit".
    if step.retcode:
        found = []
    else:
        found = step.stdout.strip().split("\n")

    if expected_num is not None and len(found) != expected_num:
        raise self.m.step.StepFailure(
            "Expected number of matching tags ({expected_num}) does not match "
            "actual ({actual_num}).".format(
                expected_num=expected_num,
                actual_num=len(found),
            )
        )
    return found
def get_remote_url(self, name, step_test_data=None, **kwargs):
    """Find and return the URL of given remote.

    Reads `remote.<name>.url` by calling `set_config()` without a value.

    Args:
      name (str): Name of the remote.
      step_test_data (func): Optional test data override.
      **kwargs: Passed through to `set_config()`.

    Returns:
      str: Stripped stdout of the config step.
    """
    if not step_test_data:
        step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
            "https://fuchsia.googlesource.com/fuchsia"
        )
    config_key = "remote.%s.url" % name
    step = self.set_config(
        config_key,
        stdout=self.m.raw_io.output_text(),
        step_test_data=step_test_data,
        **kwargs
    )
    return step.stdout.strip()
def get_timestamp(self, commit="HEAD", step_name="get commit ts", **kwargs):
    """Find and return the timestamp of the given commit.

    Runs `git show <commit> --format=%at --no-patch`.

    Args:
      commit (str): Ref to get timestamp for.
      step_name (str): Name of the step.
      **kwargs: Passed through to `__call__()`. `step_test_data` defaults
        to mock output "1473312770".

    Returns:
      str: Stripped stdout of the step.
    """
    step_test_data = kwargs.pop(
        "step_test_data",
        lambda: self.m.raw_io.test_api.stream_output_text("1473312770"),
    )
    step = self(
        step_name,
        "show",
        commit,
        "--format=%at",
        "--no-patch",
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output_text(),
        **kwargs
    )
    return step.stdout.strip()
def get_commit_message(
    self, step_name="get commit msg", commit="HEAD", oneline=False, **kwargs
):
    """Get message of the given commit.

    Runs `git show <commit> --format=%B --no-patch` and records the result
    in the step's "commit message" log.

    Args:
      step_name (str): Name of the step.
      commit (str): Git ref to get message for.
      oneline (bool): If True, additionally pass `--oneline`.
      **kwargs: Passed through to `__call__()`. `step_test_data` defaults
        to a mock message.

    Returns:
      str: Stripped stdout of the step.
    """
    step_test_data = kwargs.pop(
        "step_test_data",
        lambda: self.m.raw_io.test_api.stream_output_text("[foo] bar\nbaz\n"),
    )
    git_args = ["show", commit, "--format=%B", "--no-patch"]
    if oneline is True:
        git_args.append("--oneline")
    step = self(
        step_name,
        *git_args,
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output_text(),
        **kwargs
    )
    message = step.stdout.strip()
    step.presentation.logs["commit message"] = message.splitlines()
    return message
def get_changed_files(
    self,
    step_name="git diff-tree",
    commit="HEAD",
    deleted=True,
    ignore_submodules=False,
    test_data=("foo/bar/baz", "foo/baz"),
    **kwargs
):
    """Return a list of files changed/added/removed in a CL.

    Args:
      step_name (str): Name of the step.
      commit (str): Git ref to get the files for.
      deleted (bool): Whether to include deleted files. If False,
        `--diff-filter=d` is passed.
      ignore_submodules (bool): Whether to pass `--ignore-submodules=all`.
        If False, the list may contain paths to submodule directories, but
        not paths to changed files within submodules.
      test_data (seq of str): Mock list of changed files.
      **kwargs (dict): Passed through to __call__().

    Returns:
      list(str): Changed paths; empty if git printed nothing.
    """
    args = [
        "diff-tree",
        "--no-commit-id",
        "--name-only",
        "-r",  # Recurse into subrepos.
        # Separate results with nuls - otherwise git will do really ugly
        # escaping of non-ASCII characters in file names.
        "-z",
    ]
    if not deleted:
        args.append("--diff-filter=d")
    if ignore_submodules:
        args.append("--ignore-submodules=all")
    args.append(commit)
    step = self(
        step_name,
        *args,
        step_test_data=lambda: self.m.raw_io.test_api.stream_output_text(
            "".join("{}\0".format(f) for f in test_data)
        ),
        stdout=self.m.raw_io.output_text(),
        **kwargs
    )
    # Splitting an empty string would yield [""]; report no files instead.
    raw = step.stdout.strip("\0")
    files = raw.split("\0") if raw else []
    step.presentation.logs["files"] = files
    return files
def snap_branch(
    self,
    url,
    snap_ref,
    branch,
    message,
    tmp_branch="tmp_branch",
    path=None,
    checkout=True,
    push=False,
    step_name="snap branch",
):
    """Snap a target branch to specified ref, i.e. overwrite target branch
    to be equal to specified ref without losing previous history.

    All git steps run nested under `step_name` with `path` as the working
    directory.

    Args:
      url (str): url of remote repo to use as upstream.
      snap_ref (str): commit hash ref to snap to.
      branch (str): target branch to apply snap.
      message (str): snap commit message.
      tmp_branch (str): temporary branch name.
      path (Path): directory to checkout in. A temporary directory is
        created when unset.
      checkout (bool): if False, use the existing checkout at path.
      push (bool): push snap to target branch.
      step_name (str): Name of the enclosing nest step.

    Returns:
      None. Returns early, before any branch is created, if the current
      hash already equals `snap_ref`.
    """
    # An existing checkout can only be reused if the caller says where it is.
    assert not (
        not checkout and not path
    ), "Cannot attempt snap with neither a checkout nor a path."
    with self.m.step.nest(step_name):
        path = path or self.m.path.mkdtemp("snap_branch")
        with self.m.context(cwd=path):
            if checkout:
                self.checkout(url=url, ref=branch, path=path)
            # If branch HEAD is already at snap ref, no further action needed.
            if self.get_hash() == snap_ref:
                return
            remote_branch = "origin/{branch}".format(branch=branch)
            # Create temporary branch off target branch.
            self.raw_checkout(ref=remote_branch, branch=tmp_branch)
            # Reset temporary branch to snap ref, preserving history.
            self.reset(snap_ref, hard=True)
            # Merging with the "ours" strategy records the target branch as
            # a parent of the new commit.
            self.merge(remote_branch, strategy="ours", message=message)
            if push:
                # Push snapped temporary branch to target branch.
                self.push(
                    "{tmp_branch}:refs/heads/{branch}".format(
                        tmp_branch=tmp_branch,
                        branch=branch,
                    )
                )
def merge(self, ref, step_name="git merge", strategy=None, message=None):
    """Merge ref into current branch.

    Args:
      ref (str): Ref to merge.
      step_name (str): Name of the step.
      strategy (str or None): Merge strategy, passed via `--strategy`. See
        git-merge docs.
      message (str or None): Commit message for merge commit in case one
        is created, passed via `--message`.
    """
    strategy_args = ["--strategy", strategy] if strategy else []
    message_args = ["--message", message] if message else []
    git_args = ["merge"] + strategy_args + [ref] + message_args
    self(step_name, *git_args)
def is_ancestor(self, step_name="is ancestor", commit_1="HEAD", commit_2="HEAD"):
    """Check if commit_1 is an ancestor of commit_2.

    Runs `git merge-base --is-ancestor`, accepting exit codes 0 and 1.

    Args:
      step_name (str): Name of the step.
      commit_1 (str): A commit that should be an ancestor of commit_2.
      commit_2 (str): A commit we are checking ancestor against.

    Returns:
      bool: True if commit_1 is ancestor of commit_2, otherwise False.

    Raises:
      StepFailure: git command returns exit code >=2.
    """
    step = self(
        step_name,
        "merge-base",
        "--is-ancestor",
        commit_1,
        commit_2,
        ok_ret=(0, 1),
    )
    return step.retcode == 0
def reset(self, ref, step_name="git reset", hard=False):
    """Reset current HEAD to specified ref.

    Args:
      ref (str): Ref to reset to.
      step_name (str): Name of the step.
      hard (bool): Pass `--hard` to reset index and working tree.
    """
    git_args = ["reset", "--hard", ref] if hard else ["reset", ref]
    self(step_name, *git_args)
def get_default_remote_branch(
    self, url, step_name="git ls-remote --symref HEAD", step_test_data=None
):
    """Get default remote branch.

    Runs `git ls-remote --symref <url> HEAD` and returns the second
    whitespace-separated field of the first line starting with "ref:".

    Args:
      url (str): remote repository URL.
      step_name (str): Name of the step.
      step_test_data (func): Optional test data override.

    Returns:
      str: Full name of ref, often including "refs/heads/".

    Raises:
      ValueError: No "ref:" line was found in the output.
    """
    if not step_test_data:
        step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
            "ref: refs/heads/main HEAD\nh3ll0 HEAD"
        )
    step = self(
        step_name,
        "ls-remote",
        "--symref",
        url,
        "HEAD",
        stdout=self.m.raw_io.output_text(),
        step_test_data=step_test_data,
    )
    for raw_line in step.stdout.splitlines():
        stripped = raw_line.strip()
        if not stripped.startswith("ref:"):
            continue
        return stripped.split()[1]
    raise ValueError(  # pragma: no cover
        "expected 'ref:' line in {!r}".format(step.stdout)
    )
def get_all_remote_branch_heads(
    self, url, branch=None, step_name="git ls-remote", step_test_data=None
):
    """Get remote branch head(s).

    Runs `git ls-remote --heads <url> [<branch>]` and parses each output
    line as "<revision> <ref name>".

    Args:
      url (str): remote repository URL.
      branch (str or None): The branch name to pass into `git ls-remote`.
      step_name (str): Name of the step.
      step_test_data (func): Optional test data override.

    Returns:
      dict: A mapping between branch(es) and their HEAD commit SHA1s;
        empty if the command printed nothing.
    """
    if not step_test_data:

        def step_test_data():
            if branch:
                mock_stdout = "h3ll0\trefs/heads/{branch}".format(branch=branch)
            else:
                mock_stdout = (
                    "h3ll0\trefs/heads/main\nh3ll01\trefs/heads/custom_branch"
                )
            return self.m.raw_io.test_api.stream_output_text(mock_stdout)

    git_args = ["ls-remote", "--heads", url]
    if branch:
        git_args.append(branch)
    step = self(
        step_name,
        *git_args,
        stdout=self.m.raw_io.output_text(),
        step_test_data=step_test_data
    )
    stdout = step.stdout.strip()
    # Build a dictionary of names to commit SHA1s.
    heads = {}
    if stdout:
        for line in stdout.split("\n"):
            rev, name = line.split()
            heads[name] = rev
    return heads
def get_remote_branch_head(
    self, url, branch, step_name="git ls-remote", step_test_data=None
):
    """Get remote branch head.

    Args:
      url (str): remote repository URL.
      branch (str): branch name.
      step_name (str): Name of the step.
      step_test_data (func): Optional test data override.

    Returns:
      str: HEAD commit SHA1 if it exists, otherwise empty string.
    """
    heads = self.get_all_remote_branch_heads(
        url, branch=branch, step_name=step_name, step_test_data=step_test_data
    )
    # Because `branch` is passed in, `heads` is assumed to hold at most one
    # entry; return its revision, or "" when there is none.
    for revision in heads.values():
        return revision
    return ""
def get_remote_tag(self, url, tag, step_name="get remote tag", step_test_data=None):
    """Get remote tag commit.

    Runs `git ls-remote --tags <url> <tag>` and returns the first
    whitespace-separated field of its output.

    Args:
      url (str): Remote repository URL.
      tag (str): Git tag.
      step_name (str): Name of the step.
      step_test_data (func): Optional test data override.

    Returns:
      str: tag's commit SHA1 if it exists, otherwise empty string.
    """
    if not step_test_data:
        step_test_data = lambda: self.m.raw_io.test_api.stream_output_text(
            "tagcommit {tag}".format(tag=tag)
        )
    step = self(
        step_name,
        "ls-remote",
        "--tags",
        url,
        tag,
        stdout=self.m.raw_io.output_text(),
        step_test_data=step_test_data,
    )
    tag_output = step.stdout.strip()
    if not tag_output:
        return tag_output
    # Parse out the commit SHA1.
    return tag_output.split()[0]
def read_commit_footer(self, commit_msg, footer, sep=": "):
    """Given a commit message, parse a footer value from it.

    Lines are scanned from the bottom of the message upwards and the first
    line starting with `footer` followed by a separator wins.

    Args:
      commit_msg (str): The commit message to parse.
      footer (str): The name of the footer to parse, e.g. "Change-Id".
      sep (str or seq(str)): The separator between footer name and footer
        value. If multiple separators are allowed (e.g. "=" and ": "),
        they can be passed as a list; they are tried in the given order.

    Returns (str): The stripped footer value if the footer was found in the
      commit message, otherwise None.
    """
    separators = [sep] if isinstance(sep, str) else sep
    prefixes = [footer + separator for separator in separators]
    for line in reversed(commit_msg.splitlines()):
        matched = next((p for p in prefixes if line.startswith(p)), None)
        if matched is not None:
            return line[len(matched):].strip()
    return None