blob: d066e3c4ee538269131b0d5161db24c84fa6099f [file] [log] [blame]
# Copyright 2016 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from urlparse import urlparse
import re
from recipe_engine import recipe_api
class GitApi(recipe_api.RecipeApi):
    """GitApi provides support for Git."""

    # Matches a full 40-hex-digit SHA-1 (case-insensitive). The pattern is
    # used with `.match()`, which only anchors the start of the string, so the
    # trailing `$` is required to keep a longer ref that merely begins with 40
    # hex digits from being mistaken for a commit hash.
    _GIT_HASH_RE = re.compile(r'[0-9a-f]{40}$', re.IGNORECASE)
def __init__(self, git_properties, *args, **kwargs):
    """Set up the module from its recipe properties.

    Args:
      git_properties: mapping of module properties. Its optional
        'cache_path' entry is stored as `_cache_path` and defaults to 'git'.
      *args, **kwargs: passed through unchanged to the base initializer.
    """
    super(GitApi, self).__init__(*args, **kwargs)
    cache_dir_name = git_properties.get('cache_path', 'git')
    self._cache_path = cache_dir_name
def __call__(self, *args, **kwargs):
    """Run `git <args>` as a step and return that step.

    Args:
      *args: the git sub-command followed by its arguments.
      **kwargs: forwarded to `self.m.step`, except for:
        name (str): step name; defaults to 'git <sub-command>' (or plain
          'git' when no positional arguments are given).
        config (dict): git configuration overrides; each entry is emitted as
          `-c key=value` before the sub-command, in sorted key order.

    Returns:
      Whatever `self.m.step` returns for the constructed command.
    """
    config = kwargs.pop('config', None) or {}

    # Only build the default name when it is actually needed, so an explicit
    # name works even when there are no positional arguments.
    name = kwargs.pop('name', None)
    if name is None:
        name = 'git ' + args[0] if args else 'git'

    git_cmd = ['git']
    # `items()` exists on both Python 2 and 3 dicts; `sorted` materializes it
    # either way, so nothing is lost by not using `iteritems()`.
    for key, value in sorted(config.items()):
        git_cmd.extend(['-c', '%s=%s' % (key, value)])
    return self.m.step(name, git_cmd + list(args), **kwargs)
def apply(self, patch, **kwargs):
    """Run `git apply` on the given patch file.

    Args:
      patch: path of the patch file to apply.
      **kwargs: forwarded to the git step.

    Returns:
      The step produced by the `git apply` invocation.
    """
    apply_args = ('apply', patch)
    return self(*apply_args, **kwargs)
def checkout(self,
             url,
             path=None,
             ref=None,
             recursive=False,
             submodules=True,
             submodule_force=False,
             remote='origin',
             checkout_file=None,
             cache=True,
             **kwargs):
    """Checkout a given ref and return the checked out revision.

    Args:
      url (str): url of remote repo to use as upstream
      path (Path): directory to clone into; when omitted, a directory under
        `start_dir` named after the last component of `url` is used
      ref (str): ref to fetch and check out; 'master' is fetched when empty
      recursive (bool): whether to recursively fetch submodules or not
      submodules (bool): whether to sync and update submodules or not
      submodule_force (bool): whether to update submodules with --force
      remote (str): name of the remote to use; a falsy value means 'origin'
      checkout_file (str): optional path to a single file to checkout
      cache (bool): whether to use the reference cache
      **kwargs: forwarded to the git steps run here, except `remote add` and
        `rev-parse`. A 'step_test_data' entry, if present, is consumed as
        the test data of the `rev-parse` step.

    Returns:
      str: stripped stdout of `git rev-parse HEAD` after the checkout.
    """
    # Resolve the fallback once so every command below agrees on the remote
    # name (previously some call sites applied the fallback and some did not).
    remote = remote or 'origin'

    if not path:
        trimmed_url = url.rstrip('/')
        dirname = trimmed_url.rsplit('/', 1)[-1]
        if dirname.endswith('.git'):  # https://host/foobar.git
            dirname = dirname[:-len('.git')]
        if not dirname:  # ssh://host:repo/foobar/.git
            # The last component was exactly '.git'; the repository name is
            # the component before it.
            parts = trimmed_url.rsplit('/', 2)
            dirname = parts[-2] if len(parts) >= 2 else ''
        path = self.m.path['start_dir'].join(dirname)

    self.m.file.ensure_directory('makedirs', path)

    with self.m.context(cwd=path):
        if self.m.path.exists(path.join('.git')):  # pragma: no cover
            # Existing checkout: drop the old remote so it can be re-added.
            self('config', '--remove-section', 'remote.%s' % remote, **kwargs)
        else:
            self('init', **kwargs)
        self('remote', 'add', remote, url)

        if cache:
            with self.m.step.nest('cache'):
                parsed = urlparse(url)
                # One cache directory per repository: '-' is doubled before
                # '/' is turned into '-' so distinct paths stay distinct.
                cache_dirname = parsed.hostname + parsed.path.replace(
                    '-', '--').replace('/', '-')
                cache_path = self.m.path['cache'].join(self._cache_path,
                                                       cache_dirname)
                self.m.file.ensure_directory('makedirs', cache_path)

                with self.m.context(cwd=cache_path):
                    objects_path = cache_path.join('objects')
                    if self.m.path.exists(objects_path):  # pragma: no cover
                        self('config', '--remove-section',
                             'remote.%s' % remote, **kwargs)
                    else:
                        self('init', '--bare', **kwargs)
                    self(
                        'config',
                        'remote.%s.url' % remote,
                        url,
                        name='remote set-url',
                        **kwargs)
                    self('config', '--replace-all',
                         'remote.%s.fetch' % remote,
                         '+refs/heads/*:refs/heads/*',
                         r'\+refs/heads/\*:.*', **kwargs)
                    self('fetch', '--prune', '--tags', remote, **kwargs)

                # Point the checkout at the cache's object store.
                info = path.join('.git', 'objects', 'info')
                self.m.file.ensure_directory('makedirs object/info', info)
                self.m.file.write_text('alternates', info.join('alternates'),
                                       '%s\n' % objects_path)

        if not ref:
            fetch_ref = 'master'
            checkout_ref = 'FETCH_HEAD'
        elif self._GIT_HASH_RE.match(ref):
            # A commit hash is not passed to fetch as a refspec; fetch the
            # remote's defaults and check the hash out directly.
            fetch_ref = ''
            checkout_ref = ref
        elif ref.startswith('refs/heads/'):
            fetch_ref = ref[len('refs/heads/'):]
            checkout_ref = 'FETCH_HEAD'
        else:
            fetch_ref = ref
            checkout_ref = 'FETCH_HEAD'

        fetch_args = [x for x in (remote, fetch_ref) if x]
        if recursive:
            fetch_args.append('--recurse-submodules')
        self('fetch', '--tags', *fetch_args, **kwargs)

        if checkout_file:
            self('checkout', '-f', checkout_ref, '--', checkout_file, **kwargs)
        else:
            self('checkout', '-f', checkout_ref, **kwargs)

        step_test_data = kwargs.pop(
            'step_test_data', lambda: self.m.raw_io.test_api.stream_output(
                'deadbeef'))
        step = self(
            'rev-parse',
            'HEAD',
            stdout=self.m.raw_io.output(),
            step_test_data=step_test_data)
        sha = step.stdout.strip()

        self('clean', '-f', '-d', '-x', **kwargs)

        if submodules:
            with self.m.step.nest('submodule'):
                self('submodule', 'sync', name='git submodule sync', **kwargs)
                submodule_update_args = ['--init']
                if recursive:
                    submodule_update_args.append('--recursive')
                if submodule_force:
                    submodule_update_args.append('--force')
                self(
                    'submodule',
                    'update',
                    *submodule_update_args,
                    name='git submodule update',
                    **kwargs)

    return sha
def checkout_commit(self, commit, **kwargs):
    """Check out the revision identified by a gitiles commit.

    Args (see `checkout()` for others):
      commit (common_pb2.GitilesCommit): the commit to check out; its `host`
        and `project` form the repository URL and its `id` is used as the ref.

    Returns:
      The value returned by `checkout()`.
    """
    location = (commit.host, commit.project)
    repo_url = 'https://%s/%s' % location
    return self.checkout(repo_url, ref=commit.id, **kwargs)
def checkout_cl(self, cl, path, **kwargs):
    """Check out a Gerrit change and rebase it onto master.

    The repository URL is the change's host with '-review' removed, plus its
    project. Master is checked out first (into `checkout()`'s default
    directory) to learn its revision; the change ref is then checked out
    into `path` and rebased onto that revision.

    Args (see `checkout()` for others):
      cl (common_pb2.GerritChange): the CL to check out.
      path (Path): directory to check the CL out into.

    Returns:
      The value returned by `checkout()` for the CL ref.
    """
    gitiles_host = cl.host.replace('-review', '')
    url = 'https://%s/%s' % (gitiles_host, cl.project)
    number = cl.change
    ref = 'refs/changes/%02d/%d/%d' % (number % 100, number, cl.patchset)

    with self.m.step.nest('fetch master'):
        master_ref = self.checkout(url)

    result = self.checkout(url, path=path, ref=ref, **kwargs)
    with self.m.context(cwd=path):
        self.rebase(master_ref)
    return result
def commit(self,
           message,
           files=(),
           all_tracked=False,
           all_files=False,
           **kwargs):
    """Run `git commit` in the current working directory.

    Exactly one staging mode applies: `all_tracked` commits with `-a`,
    `all_files` runs `git add -A` first, otherwise `files` are listed on the
    commit command line.

    Args:
      message (str): The message to attach to the commit.
      files (seq[Path]): The set of files containing changes to commit.
      all_tracked (bool): Stage all tracked files before committing. If True,
        files must be empty and all_files must be False.
      all_files (bool): Stage all files (even untracked) before committing. If
        True, files must be empty and all_tracked must be False.
      **kwargs: forwarded to the `git commit` step.

    Returns:
      The step produced by the `git commit` invocation.
    """
    commit_args = ['commit', '-m', message]
    if all_tracked:
        assert not all_files
        assert not files
        commit_args.append('-a')
    elif all_files:
        assert not files
        self('add', '-A')
    else:
        commit_args.extend(files)
    return self(*commit_args, **kwargs)
def diff(self, ref_base='HEAD', ref=None, patch_file=None, **kwargs):
    """Run `git diff` in the current working directory.

    Args:
      ref_base (str): first revision handed to `git diff`.
      ref (str): optional second revision; omitted from the command when None.
      patch_file: passed as `leak_to` for the captured stdout.
      **kwargs: forwarded to the git step; 'step_test_data' overrides the
        built-in sample diff used as test data.

    Returns:
      str: the stripped stdout of the diff step.
    """
    step_test_data = kwargs.pop(
        'step_test_data', lambda: self.m.raw_io.test_api.stream_output("""
diff --git a/recipe_modules/fuchsia/api.py b/recipe_modules/fuchsia/api.py
index 429e4612..83925ff3 100644
--- a/recipe_modules/fuchsia/api.py
+++ b/recipe_modules/fuchsia/api.py
@@ -562,15 +562,6 @@ class FuchsiaApi(recipe_api.RecipeApi):
- # arm64 hardware targets have a stable version of Zedboot on their
- # recovery partition, we do not want to send the build Zedboot to
- # overwrite it.
- if build.target == 'arm64' and device_type != 'QEMU':
- for image in build.images.keys():
- # Matches both 'zircon-r' and 'zircon-r.signed'
- if 'zircon-r' in image:
- del build.images[image]
"""))
    # One or two revisions, depending on whether `ref` was supplied.
    revisions = [ref_base] if ref is None else [ref_base, ref]
    step = self(
        'diff',
        *revisions,
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output(leak_to=patch_file),
        **kwargs)
    return step.stdout.strip()
def push(self, ref, remote='origin', dryrun=False, **kwargs):
    """Run `git push <remote> <ref>`.

    Args:
      ref (str): ref or refspec to push.
      remote (str): name of the remote to push to.
      dryrun (bool): when true, `--dry-run` is added to the command.
      **kwargs: forwarded to the git step.

    Returns:
      The step produced by the `git push` invocation.
    """
    flags = ['--dry-run'] if dryrun else []
    push_args = ['push'] + flags + [remote, ref]
    return self(*push_args, **kwargs)
def rebase(self, *args, **kwargs):
    """Run `git rebase` with the given arguments.

    When the rebase step raises a StepFailure, `git rebase --abort` is run
    (with the same keyword arguments) and the original failure is re-raised.

    Args:
      *args: arguments placed after `rebase` on the command line.
      **kwargs: forwarded to the git step(s).
    """
    rebase_cmd = ('rebase',) + args
    try:
        self(*rebase_cmd, **kwargs)
    except self.m.step.StepFailure:  # pragma: no cover
        # Abort the in-progress rebase before propagating the failure.
        self('rebase', '--abort', **kwargs)
        raise
def get_hash(self, commit='HEAD', **kwargs):
    """Find and return the hash of the given commit.

    Args:
      commit (str): revision handed to `git rev-parse`.
      **kwargs: forwarded to the git step; 'step_test_data' overrides the
        default test output ('deadbeef').

    Returns:
      str: the stripped stdout of `git rev-parse <commit>`.
    """
    # Reach the raw_io test API through `self.m`, the same way the other
    # methods of this module build their default test data.
    step_test_data = kwargs.pop(
        'step_test_data', lambda: self.m.raw_io.test_api.stream_output(
            'deadbeef'))
    step = self(
        'rev-parse',
        commit,
        stdout=self.m.raw_io.output(),
        step_test_data=step_test_data,
        **kwargs)
    return step.stdout.strip()
def describe(self,
             commit='HEAD',
             tags=True,
             contains=False,
             exact_match=True,
             expected_num=None,
             test_data=None,
             **kwargs):
    """Describe a commit by its tag(s).

    Args:
      commit (str): revision handed to `git describe`.
      tags (bool): If True, use any tag in refs/tags namespace. Otherwise,
        only use annotated tags.
      contains (bool): If True, find the tag that comes after the given commit.
        Automatically implies 'tags' flag.
      exact_match (bool): If True, only output exact tag matches to the given
        commit.
      expected_num (int): If specified, raise exception if expected number
        of tags which describe the given commit is not equal to actual.
      test_data (func): If specified, overwrite step test data.
      **kwargs: forwarded to the git step.

    Returns:
      list: tag(s) describing the commit, one per non-empty output line;
        empty when the command printed nothing.

    Raises:
      StepFailure: if `expected_num` is given and differs from the number of
        tags found.
    """
    if test_data is None:
        test_data = lambda: self.m.raw_io.test_api.stream_output(
            'testtag1\ntesttag2')

    cmd = ['describe']
    if tags:
        cmd.append('--tags')
    if contains:
        cmd.append('--contains')
    if exact_match:
        cmd.append('--exact-match')
    cmd.append(commit)

    output = self(
        *cmd, stdout=self.m.raw_io.output(), step_test_data=test_data,
        **kwargs).stdout.strip()

    # Drop empty lines so that empty output means "no tags" rather than a
    # single empty tag name.
    found_tags = [line for line in output.split('\n') if line]

    if expected_num is not None and len(found_tags) != expected_num:
        raise self.m.step.StepFailure(
            'Expected number of matching tags ({expected_num}) does not match '
            'actual ({actual_num}).'.format(
                expected_num=expected_num,
                actual_num=len(found_tags),
            ))
    return found_tags
def get_remote_url(self, name, **kwargs):
    """Find and return the URL of given remote.

    Args:
      name (str): name of the remote whose `remote.<name>.url` config value
        is read.
      **kwargs: forwarded to the git step; 'step_test_data' overrides the
        default test output.

    Returns:
      str: the stripped stdout of `git config remote.<name>.url`.
    """
    # The default test URL previously lacked the ':' after the scheme, and the
    # test API was reached differently from the rest of this module.
    step_test_data = kwargs.pop(
        'step_test_data', lambda: self.m.raw_io.test_api.stream_output(
            'https://fuchsia.googlesource.com/fuchsia'))
    step = self(
        'config',
        'remote.%s.url' % name,
        stdout=self.m.raw_io.output(),
        step_test_data=step_test_data,
        **kwargs)
    return step.stdout.strip()
def get_timestamp(self, commit='HEAD', **kwargs):
    """Find and return the timestamp of the given commit.

    Args:
      commit (str): revision handed to `git show`.
      **kwargs: forwarded to the git step; 'step_test_data' overrides the
        default test output ('1473312770').

    Returns:
      str: the stripped stdout of `git show <commit> --format=%at -s`.
    """
    default_test_data = lambda: self.m.raw_io.test_api.stream_output(
        '1473312770')
    step_test_data = kwargs.pop('step_test_data', default_test_data)
    # TODO(garymm): Use --no-patch instead of -s
    show_args = ('show', commit, '--format=%at', '-s')
    step = self(
        *show_args,
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output(),
        **kwargs)
    return step.stdout.strip()
def get_commit_message(self, commit='HEAD', oneline=False, **kwargs):
    """Return the message of the given commit.

    The message is also attached to the step as a 'commit message' log, one
    entry per line.

    Args:
      commit (str): git ref to get message for.
      oneline (bool): when True, `--oneline` is appended to the command.
      **kwargs: forwarded to the git step; 'step_test_data' overrides the
        default test output.

    Returns:
      str: the stripped stdout of the `git show` step.
    """
    default_test_data = lambda: self.m.raw_io.test_api.stream_output(
        '[foo] bar\nbaz\n')
    step_test_data = kwargs.pop('step_test_data', default_test_data)

    show_args = ['show', commit, '--format=%B', '--no-patch']
    if oneline is True:
        show_args += ['--oneline']

    step = self(
        *show_args,
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output(),
        **kwargs)
    message = step.stdout.strip()
    step.presentation.logs['commit message'] = message.splitlines()
    return message
def get_changed_files(self, commit='HEAD', **kwargs):
    """Return a list of files changed/added/removed in a commit.

    The file list is also attached to the step as a 'files' log.

    Args:
      commit (str): git ref to get the files for.
      **kwargs: forwarded to the git step, except for:
        test_data (str or seq[str]): test output for the step; either the
          raw NUL-separated string or a list/tuple of file names.

    Returns:
      list[str]: the changed paths; empty when the command printed nothing.
    """
    test_data = kwargs.pop('test_data', 'foo/bar/baz\0foo/baz\0')
    if isinstance(test_data, (list, tuple)):
        test_data = ''.join('{}\0'.format(x) for x in test_data)
    step_test_data = lambda: self.m.raw_io.test_api.stream_output(test_data)

    args = [
        'diff-tree',
        '--no-commit-id',
        '--name-only',
        '-r',  # Recurse into sub-trees.
        commit,  # Previously hard-coded to 'HEAD', ignoring the parameter.
        '-z',  # Separate results with nuls.
    ]
    step = self(
        *args,
        step_test_data=step_test_data,
        stdout=self.m.raw_io.output(),
        **kwargs)

    # Filtering empty fields handles both the trailing NUL and completely
    # empty output (which would otherwise yield ['']).
    files = [name for name in step.stdout.split('\0') if name]
    step.presentation.logs['files'] = files
    return files
def snap_branch(self,
                url,
                snap_ref,
                branch,
                message,
                tmp_branch='tmp_branch',
                path=None,
                checkout=True,
                push=False):
    """Snap a target branch to a specified ref.

    The target branch is made to have the content of `snap_ref` without
    losing its previous history: a temporary branch is created from
    `origin/<branch>`, hard-reset to `snap_ref`, and then merged with
    `origin/<branch>` using the 'ours' strategy. Nothing is done when HEAD
    already equals `snap_ref`.

    Args:
      url (str): url of remote repo to use as upstream.
      snap_ref (str): commit hash ref to snap to.
      branch (str): target branch to apply snap.
      message (str): snap commit message.
      tmp_branch (str): temporary branch name.
      path (Path): directory to checkout in; a temporary directory is created
        when omitted.
      checkout (bool): if False, use the existing checkout at path.
      push (bool): push snap to target branch.
    """
    assert checkout or path, \
        'Cannot attempt snap with neither a checkout nor a path.'

    with self.m.step.nest('snap branch'):
        if not path:
            path = self.m.path.mkdtemp()

        with self.m.context(cwd=path):
            if checkout:
                self.checkout(url=url, ref=branch, path=path)

            # Nothing to do when the branch head already is the snap ref.
            if self.get_hash() == snap_ref:
                return

            upstream = 'origin/{branch}'.format(branch=branch)
            # Start a temporary branch from the target branch ...
            self('checkout', '-b', tmp_branch, upstream)
            # ... move it to the snap ref ...
            self('reset', '--hard', snap_ref)
            # ... and record the old history as a parent while keeping the
            # snap ref's content.
            self('merge', '--strategy', 'ours', upstream, '--message', message)

            if push:
                # Publish the temporary branch as the target branch.
                refspec = '{tmp_branch}:refs/heads/{branch}'.format(
                    tmp_branch=tmp_branch,
                    branch=branch,
                )
                self.push(refspec)
def get_remote_branch_head(self, url, branch):
    """Get the head of a remote branch.

    Runs `git ls-remote --heads <url> <branch>` and returns the first
    whitespace-separated field of its output.

    Args:
      url (str): remote repository URL.
      branch (str): branch name.

    Returns:
      str: HEAD commit SHA1 if it exists, otherwise empty string.
    """
    default_test_data = lambda: self.m.raw_io.test_api.stream_output(
        'h3ll0 {branch}'.format(branch=branch))
    step = self(
        'ls-remote',
        '--heads',
        url,
        branch,
        stdout=self.m.raw_io.output(),
        step_test_data=default_test_data,
    )
    listing = step.stdout.strip()
    if not listing:
        # No matching ref was printed.
        return listing
    # The SHA1 is the first column of the listing.
    return listing.split()[0]
def get_remote_tag(self, url, tag):
    """Get the SHA1 listed for a remote tag.

    Runs `git ls-remote --tags <url> <tag>` and returns the first
    whitespace-separated field of its output.

    Args:
      url (str): remote repository URL.
      tag (str): git tag.

    Returns:
      str: the SHA1 listed for the tag if it exists, otherwise empty string.
    """
    default_test_data = lambda: self.m.raw_io.test_api.stream_output(
        'tagcommit {tag}'.format(tag=tag))
    step = self(
        'ls-remote',
        '--tags',
        url,
        tag,
        stdout=self.m.raw_io.output(),
        step_test_data=default_test_data,
    )
    listing = step.stdout.strip()
    if not listing:
        # No matching tag was printed.
        return listing
    # The SHA1 is the first column of the listing.
    return listing.split()[0]