blob: 632e82c1188c50661ed86cdec1e4bfca1e6efda1 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""General tool to manage sync-branches as described in Fuchsia RFC-0153.
This tool provides several commands that simplify creation and management
of sync branches. A typical use case would be:
1) Use the `create` command to create a new local sync-branch, with
the right cherry-picked commits in it.
2) Optional: resolve any conflicts that happen during the cherry-pick
operation launched by the `create` command. Alternatively, it is possible
to rebase the sync branch, for example to re-order commits or update
documentation.
3) Use the `rebase` command to rebase the current sync-branch on top of
the current upstream reference.
4) Optional: resolve any conflicts that happen during the rebase operation.
5) Use the `merge` command to merge the sync branch into the
main development branch.
Use `<command> --help` for command-specific details.
"""
import argparse
import json
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, Iterable, List, Sequence, TypeAlias
_DEFAULT_UPSTREAM_REF = "origin/upstream/master"
_DEFAULT_DEV_BRANCH = "main"
_DEFAULT_SYNC_BRANCH_NAME = "sync-branch-" + time.strftime("%Y-%m-%d", time.gmtime())
_VERBOSE = False
# A type used to represent a sequence of command arguments
# Using List[str | Path] or Iterable[str | Path] would be more appropriate,
# but results in numerous mypy errors that require far too much noise to fix.
CmdArgsList: TypeAlias = List[Any]
def log(msg: str):
    """Print `msg` on stderr with a "LOG: " prefix, but only in verbose mode."""
    if not _VERBOSE:
        return
    print("LOG: " + msg, file=sys.stderr)
def cmd_args_to_list(cmd_args: CmdArgsList) -> Sequence[str]:
    """Return a new list holding the str() conversion of every argument."""
    return list(map(str, cmd_args))
def cmd_quote(cmd_args: Iterable[str | Path]) -> str:
    """Return the arguments as one space-separated, shell-quoted string."""
    quoted_args = [shlex.quote(str(arg)) for arg in cmd_args]
    return " ".join(quoted_args)
def run_command(cmd_args: CmdArgsList, **kwargs) -> subprocess.CompletedProcess:
    """Run a command and return its subprocess.CompletedProcess result.

    The quoted command line is logged first (visible in verbose mode only).
    Any extra keyword argument is forwarded unchanged to subprocess.run().
    """
    log("CMD: " + cmd_quote(cmd_args))
    argv = cmd_args_to_list(cmd_args)
    return subprocess.run(argv, **kwargs)
def get_command_output(cmd_args: CmdArgsList, **kwargs) -> str:
    """Run a command and return its standard output, stripped of whitespace.

    Args:
      cmd_args: The command and its arguments.
      **kwargs: Forwarded to run_command(); must not contain `capture_output`
        or `text`, which are set by this function.

    Raises:
      subprocess.CalledProcessError: if the command exits with a non-zero
        status.
    """
    assert "capture_output" not in kwargs
    assert "text" not in kwargs
    kwargs.update(capture_output=True, text=True)
    result = run_command(cmd_args, **kwargs)
    result.check_returncode()
    return result.stdout.strip()
def write_file(path: Path, content: str):
    """Write `content` to `path`, creating missing parent directories first."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(content)
class CommandError(Exception):
    """Error raised by the commands; main() prints its message and returns 1."""
class GitDirectory(object):
    """A Git directory.

    Provides helpers to run git commands inside the directory, and keeps a
    JSON log of the commands run through cmd(). The log and the sync-branch
    configuration live under `<git_dir>/.git/sync_branch/`.
    """

    def __init__(self, git_dir: Path):
        assert git_dir.is_dir(), "Not a directory: %s" % git_dir
        assert (git_dir / ".git").is_dir(), (
            "Not a git directory (missing .git/): %s" % git_dir
        )
        self._git_dir = git_dir
        self._cfg_dir = git_dir / ".git" / "sync_branch"
        # None means "not loaded yet"; see append_log().
        self._log: List | None = None

    @property
    def git_dir(self) -> Path:
        """Path of the git directory itself."""
        return self._git_dir

    @property
    def cfg_dir(self) -> Path:
        """Directory holding the sync-branch config and log files."""
        return self._cfg_dir

    @property
    def cfg_file_path(self) -> Path:
        """Path of the JSON sync-branch configuration file."""
        return self._cfg_dir / "config.json"

    @property
    def log_file_path(self) -> Path:
        """Path of the JSON command log file."""
        return self._cfg_dir / "log.json"

    def append_log(self, item: Any) -> None:
        """Append `item` to the in-memory log.

        On first use, the in-memory log is initialized from the log file if
        it exists, so that new entries extend the previous ones.
        """
        if self._log is None:
            if self.log_file_path.exists():
                # Use a context manager so the file handle is always closed.
                with self.log_file_path.open() as f:
                    self._log = json.load(f)
            else:
                self._log = []
        self._log.append(item)

    def write_log(self) -> None:
        """Write the in-memory log to the log file; no-op if it is empty."""
        if not self._log:
            return
        # The config directory may not exist yet. Create it so that a missing
        # directory cannot turn a git failure into a FileNotFoundError.
        self._cfg_dir.mkdir(parents=True, exist_ok=True)
        with self.log_file_path.open("w") as f:
            json.dump(self._log, f)

    def cmd(self, args: CmdArgsList, env: dict | None = None) -> None:
        """Run `git -C <git_dir> <args...>` and record it in the log.

        Args:
          args: Git arguments, without the leading `git -C <git_dir>`.
          env: Optional environment for the git process.

        Raises:
          subprocess.CalledProcessError: if git exits with a non-zero status.
            The log is written to disk before the exception is raised.
        """
        cmd_args = ["git", "-C", self._git_dir] + args
        ret = run_command(cmd_args, env=env)
        self.append_log(
            {"command": [str(a) for a in cmd_args], "status": ret.returncode}
        )
        if ret.returncode != 0:
            self.write_log()
            ret.check_returncode()

    def cmd_output(self, args: CmdArgsList) -> str:
        """Run `git -C <git_dir> <args...>` and return its stripped stdout.

        The command is not recorded in the log.
        """
        return get_command_output(["git", "-C", self._git_dir] + args)
class PrintingGitDirectory(GitDirectory):
    """A GitDirectory subclass that prints commands instead of running them."""

    def __init__(self, parent: GitDirectory):
        # Initialize the base class so that inherited properties and
        # cmd_output() work on this object too.
        super().__init__(parent.git_dir)
        self._parent = parent

    def append_log(self, item: Any) -> None:
        """Do nothing: printed commands are not logged."""
        pass

    def write_log(self) -> None:
        """Do nothing: there is no log to write."""
        pass

    def cmd(self, args: CmdArgsList, env: dict | None = None) -> None:
        """Print the git command line that would be run; `env` is ignored."""
        # Arguments may be Path values, which shlex.quote() rejects, so
        # convert each one to a string before quoting.
        print(
            "git -C %s %s"
            % (self._parent.git_dir, " ".join(shlex.quote(str(a)) for a in args))
        )
class SyncBranchConfig(object):
    """Parameters describing the current sync branch.

    Values are either computed from the `create` command arguments (see
    init_from_create_args) or loaded from the JSON configuration file that
    lives in the git directory (see read_config_file / write_config_file).
    """

    def __init__(self, git_dir: GitDirectory):
        self.git_dir = git_dir
        self.upstream_ref: str = _DEFAULT_UPSTREAM_REF
        self.dev_branch_name: str = _DEFAULT_DEV_BRANCH
        self.sync_branch_name: str = _DEFAULT_SYNC_BRANCH_NAME
        self.stem_commit: str = ""
        # Only set by init_from_create_args(); not stored in the config file.
        self.upstream_commit: str = ""
        # One `git rev-list --pretty=oneline` output line per source commit,
        # i.e. the commit hash followed by the commit subject.
        self.src_commits: List[str] = []

    def init_from_create_args(self, args: argparse.Namespace) -> None:
        """Initialize this instance from the `create` command arguments.

        Reads `args.name`, `args.upstream_ref`, `args.dev_branch` and
        `args.force`, then computes the upstream commit, the stem commit and
        the list of source commits to cherry-pick.

        Side effect: if the sync branch already exists and `args.force` is
        set, the branch is deleted (after checking out the development branch
        when the sync branch is the current one).

        Raises:
          CommandError: if the sync branch exists and --force is not used, if
            there is nothing to sync and --force is not used, or if the merge
            history cannot be interpreted.
          subprocess.CalledProcessError: if a git command fails.
        """
        if args.name:
            self.sync_branch_name = args.name
        if args.upstream_ref:
            self.upstream_ref = args.upstream_ref
        if args.dev_branch:
            self.dev_branch_name = args.dev_branch

        # Verify that the branch does not exist yet.
        g = self.git_dir
        has_sync_branch = False
        on_sync_branch = False
        current_branch = g.cmd_output(["branch", "--show-current"])
        if current_branch and current_branch == self.sync_branch_name:
            has_sync_branch = True
            on_sync_branch = True
        else:
            has_sync_branch = bool(
                g.cmd_output(["branch", "--list", self.sync_branch_name])
            )

        if has_sync_branch:
            if not args.force:
                raise CommandError(
                    "Cannot create new sync-branch over current one, use --force to override!"
                )
            if on_sync_branch:
                # The current branch cannot be deleted, so leave it first.
                g.cmd(["checkout", "--force", self.dev_branch_name])
            g.cmd(["branch", "-D", self.sync_branch_name])

        self.upstream_commit = g.cmd_output(["rev-parse", self.upstream_ref])

        # Compute the stem commit, this is the first common ancestor of
        # the upstream and development branches. For example in:
        #
        #   upstream --A1-------A2---A3
        #               \        \
        #                \        +--B1"--B2"--C2"
        #                 \        \              \
        #                  \        B1'--B2'       \
        #                   \               \       \
        #   dev -------------B1---B2---------C1--C2--D1--D2
        #
        # The stem commit is A2, since the merges C1 and D1 made it
        # reachable from the development branch.
        #
        # NOTE: cmd_output() already strips the command output.
        self.stem_commit = g.cmd_output(
            ["merge-base", self.upstream_ref, self.dev_branch_name]
        )
        if self.stem_commit == self.upstream_commit and not args.force:
            raise CommandError(
                "All upstream commits in current branch, no sync branch needed!"
            )

        # Compute source commits.
        #
        # Using --ancestry-path is required to limit the result to the
        # commits that belong only to the most recent sync branch.
        #
        # For example, without --ancestry-path, a command like
        # `git rev-list STEM..dev` would return the following commits
        # from the previous example history, which contains multiple
        # sync branches originating from the same stem:
        #
        #   upstream            A2
        #                        \
        #                         +--B1"--B2"--C2"
        #                          \              \
        #                           B1'--B2'       \
        #                                   \       \
        #   dev                              C1--C2--D1--D2
        #
        # By using --ancestry-path=C2", the result is
        # limited to only the commits from the most recent sync branch, i.e.:
        #
        #   upstream            A2
        #                        \
        #                         +--B1"--B2"--C2"
        #                                         \
        #                                          \
        #                                           \
        #   dev                                      D1--D2
        #
        # The commit to pass to --ancestry-path is the _second_
        # parent of the most recent merge in the dev branch.
        #
        # To find it, the command below uses `rev-list --merges --format=%P`
        # to print two lines per merge commit that is reachable from the
        # dev branch but not from the upstream reference.
        #
        # In the example above, the stem is A2, the HEAD is D2, and
        # they share two merge commits C1 and D1, and the command
        # prints the following:
        #
        #    commit <D1>
        #    <C2> <C2">
        #    commit <C1>
        #    <B2> <B2'>
        #
        # Taking the last hash of the second line of output gives us
        # the commit value for --ancestry-path.
        merge_commits_lines = g.cmd_output(
            [
                "rev-list",
                "%s..%s" % (self.upstream_ref, self.dev_branch_name),
                "--merges",
                "--format=%P",
            ]
        ).splitlines()

        ancestry_path_args: List[str] = []
        if merge_commits_lines:
            if len(merge_commits_lines) < 2:
                raise CommandError(
                    "Invalid git rev-list output!\n  %s\n"
                    % "  \n".join(merge_commits_lines)
                )
            parents = merge_commits_lines[1].split(" ")
            if len(parents) != 2:
                # We really don't know how to handle merges with more
                # than 2 parents. These were not created by the sync-branch tool.
                raise CommandError(
                    "Most recent merge has more than 2 parents!\n  %s\n"
                    % "  \n".join(merge_commits_lines)
                )
            ancestry_path_args = ["--ancestry-path=" + parents[1]]
        # Otherwise: no merge commit means no sync branch was ever created
        # in the past. In that case, do not use --ancestry-path.

        # Now list all the source commits, oldest first.
        self.src_commits = g.cmd_output(
            [
                "rev-list",
                "--reverse",
                "--no-merges",
                "--pretty=oneline",
                "%s..%s" % (self.stem_commit, self.dev_branch_name),
            ]
            + ancestry_path_args
        ).splitlines()

    def has_config_file(self) -> bool:
        """Return True if the configuration file exists."""
        return self.git_dir.cfg_file_path.exists()

    def read_config_file(self):
        """Load all persisted values from the configuration file.

        Raises:
          KeyError: if a required key is missing from the file.
        """
        # Use a context manager so the file handle is always closed.
        with self.git_dir.cfg_file_path.open() as f:
            c = json.load(f)
        self.upstream_ref = c["upstream_ref"]
        self.dev_branch_name = c["dev_branch_name"]
        self.sync_branch_name = c["sync_branch_name"]
        self.stem_commit = c["stem_commit"]
        self.src_commits = c["src_commits"]

    def write_config_file(self) -> None:
        """Save the persisted values to the configuration file as JSON."""
        c = {
            "upstream_ref": self.upstream_ref,
            "dev_branch_name": self.dev_branch_name,
            "sync_branch_name": self.sync_branch_name,
            "stem_commit": self.stem_commit,
            "src_commits": self.src_commits,
        }
        write_file(self.git_dir.cfg_file_path, json.dumps(c, sort_keys=True, indent=2))

    def clear_config_file(self) -> None:
        """Remove the whole configuration directory, if it exists."""
        cfg_dir = self.git_dir.cfg_dir
        if cfg_dir.exists():
            shutil.rmtree(cfg_dir)
def get_git_directory(args: argparse.Namespace) -> GitDirectory:
    """Return the GitDirectory selected by `args`.

    Uses `args.git_dir` when it is set, and the current working directory
    otherwise.

    Raises:
      CommandError: if the path is not a directory, or has no `.git`
        sub-directory.
    """
    git_dir = Path(args.git_dir) if args.git_dir else Path.cwd()
    if not git_dir.is_dir():
        raise CommandError("Not a directory: %s" % git_dir)
    dot_git = git_dir / ".git"
    if not dot_git.is_dir():
        raise CommandError("Not a git directory (missing .git): %s" % git_dir)
    return GitDirectory(git_dir)
def command_create(args: argparse.Namespace) -> None:
    """Implement the `create` command.

    Computes and saves the sync-branch configuration, then tags the stem
    commit, creates the sync branch from it, cherry-picks the source commits
    and finally cleans up the commit messages.

    NOTE: the configuration is computed and written before --print-only is
    taken into account, so the git commands issued by
    SyncBranchConfig.init_from_create_args() always run for real.

    Raises:
      CommandError: if the sync branch cannot be created.
      subprocess.CalledProcessError: if a git command fails.
    """
    real_git_dir = get_git_directory(args)
    sbc = SyncBranchConfig(real_git_dir)
    sbc.init_from_create_args(args)
    sbc.write_config_file()

    git_dir = real_git_dir
    if args.print_only:
        git_dir = PrintingGitDirectory(real_git_dir)

    # Create a tag pointing to the stem commit, for debugging.
    git_dir.cmd(["tag", "-f", "SYNC_BRANCH_STEM", sbc.stem_commit])
    git_dir.cmd(["checkout", "-b", sbc.sync_branch_name, sbc.stem_commit])
    # Each source commit line starts with the commit hash.
    git_dir.cmd(["cherry-pick"] + [c.split(" ")[0] for c in sbc.src_commits])

    # Run the cleanup command now. This will write the log too.
    # Pass the GitDirectory used above, so that the log entries recorded for
    # the commands above are written together with the cleanup one instead
    # of being dropped with this object.
    command_cleanup_history(args, git_dir=real_git_dir)
def command_cleanup_history(args: argparse.Namespace, git_dir=None) -> None:
    """Implement the `cleanup_history` command.

    Runs `git filter-branch --msg-filter` over the commits of the current
    sync branch (the `<stem>..<sync-branch>` range from the config file) with
    an embedded bash script that rewrites Gerrit-specific commit message
    lines, then writes the command log.

    Args:
      args: Parsed command-line arguments; `args.print_only` is honored.
      git_dir: Optional GitDirectory to use. When None, it is derived from
        `args`.

    Raises:
      CommandError: if there is no sync-branch configuration file.
      subprocess.CalledProcessError: if git filter-branch fails.
    """
    if git_dir is None:
        git_dir = get_git_directory(args)
    sbc = SyncBranchConfig(git_dir)
    if not sbc.has_config_file():
        raise CommandError(
            "No current sync branch, please use `create` command first!"
        )
    sbc.read_config_file()
    if args.print_only:
        git_dir = PrintingGitDirectory(git_dir)

    # Embed the cleanup script in this one for easier distribution
    # and development, in particular when making changes to this script,
    # it is easier to copy it as a single file to a different location
    # and invoke it from there, to avoid running commands from a previous
    # version by mistake during sync-rebases.
    #
    # The `#!` line must be the very first line of the script, otherwise it
    # is not honored and the script may be interpreted by a shell that does
    # not support the bash-only `<<<` here-strings used below.
    CLEANUP_SCRIPT = r"""#!/bin/bash
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# This script is used as a message filter to git filter-branch
# If the current commit was not submitted through Gerrit, do
# not do anything.
#
# Otherwise, cleanup the message by doing the following:
# - Remove previous Original-Change-Id: lines
# - Convert 'Changed-Id: XXXX' into 'Original-Change-Id: XXX'
# - Add a new 'Change-Id:' line with a random value that mimics
# the Gerrit commit hook logic.
# - Remove Gerrit-specific lines such as 'Reviewed-on:'.
input="$(cat -)"
if ! grep -q '^Commit-Queue: ' <<< "$input"; then
# Just pass stdin to stdout unmodified.
printf "%s\n" "${input}"
exit 0
fi
# Pass the processed input.
sed \
-e '/^Original-Change-Id:/d' \
-e 's/^Change-Id:/Original-Change-Id:/' \
-e '/^Reviewed-on:/d' \
-e '/^Reviewed-by:/d' \
-e '/^Fuchsia-Auto-Submit:/d' \
-e '/^Commit-Queue:/d' \
<<< "$input"
# Add new random Change-Id
random=$({ echo "$GIT_COMMITTER_NAME $GIT_COMMITTER_EMAIL"; echo "$GIT_COMMIT"; } | git hash-object --stdin)
echo "Change-Id: I${random}"
"""
    with tempfile.TemporaryDirectory(prefix="sync-branch-cleanup-") as temp_cleanup_dir:
        cleanup_script = Path(temp_cleanup_dir) / "cleanup.sh"
        cleanup_script.write_text(CLEANUP_SCRIPT)
        cleanup_script.chmod(0o755)
        cmd_args = [
            "filter-branch",
            "--msg-filter",
            # Pass a plain string, so that the argument list can also be
            # quoted and printed in --print-only mode.
            str(cleanup_script),
            "--force",
            "--",
            "%s..%s" % (sbc.stem_commit, sbc.sync_branch_name),
        ]
        # Running git filter-branch prints a warning and waits for several seconds before
        # proceeding, unless this environment variable is defined. There is no command-line
        # option to do the same.
        env = os.environ.copy()
        env["FILTER_BRANCH_SQUELCH_WARNING"] = "1"
        git_dir.cmd(cmd_args, env=env)
        git_dir.write_log()
def command_rebase(args: argparse.Namespace) -> None:
    """Implement the `rebase` command.

    Rebases the sync branch recorded in the config file, from its stem
    commit, onto the recorded upstream reference, then writes the log.

    Raises:
      CommandError: if there is no sync-branch configuration file.
      subprocess.CalledProcessError: if git rebase fails.
    """
    real_git_dir = get_git_directory(args)
    sbc = SyncBranchConfig(real_git_dir)
    if not sbc.has_config_file():
        raise CommandError("No current sync branch, please use `create` command first!")
    sbc.read_config_file()

    runner = PrintingGitDirectory(real_git_dir) if args.print_only else real_git_dir
    # git rebase --onto <upstream-ref> <stem-commit> <sync-branch>
    runner.cmd(
        ["rebase", "--onto", sbc.upstream_ref, sbc.stem_commit, sbc.sync_branch_name]
    )
    runner.write_log()
def command_merge(args: argparse.Namespace) -> None:
    """Implement the `merge` command.

    Checks out the development branch, then merges the sync branch into it
    with `--no-ff -X theirs` (plus `--no-commit` when requested), and writes
    the log.

    Raises:
      CommandError: if there is no sync-branch configuration file.
      subprocess.CalledProcessError: if a git command fails.
    """
    real_git_dir = get_git_directory(args)
    sbc = SyncBranchConfig(real_git_dir)
    if not sbc.has_config_file():
        raise CommandError("No current sync branch, please use `create` command first!")
    sbc.read_config_file()

    runner = PrintingGitDirectory(real_git_dir) if args.print_only else real_git_dir
    runner.cmd(["checkout", sbc.dev_branch_name])

    extra_options = ["--no-commit"] if args.no_commit else []
    runner.cmd(
        ["merge", "--no-ff", "-X", "theirs", sbc.sync_branch_name] + extra_options
    )
    runner.write_log()
def main() -> int:
    """Parse the command line and run the selected command.

    Returns:
      The process exit status: 0 on success, 1 after a CommandError, or the
      exit status of the failing command after a CalledProcessError.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("--verbose", action="store_true", help="Enable verbose mode")
    parser.add_argument("--git-dir", help="Specify git directory.")
    parser.add_argument(
        "--print-only",
        action="store_true",
        help="Only print the git commands, do not run them.",
    )
    subparsers = parser.add_subparsers(required=True, help="Available commands")

    parser_create = subparsers.add_parser(
        "create",
        help="Create new local sync-branch",
        formatter_class=argparse.RawTextHelpFormatter,
        description=r"""
Create a new local sync branch, which will start at the 'stem' commit that
is common to both the upstream and development branches, and will include
all commits between them (as cherry picks).
For example, consider the following initial state:
upstream ---A1--A2--A3
\
dev - - - - - B1--B2--B3
The command will create a new branch starting from A2 that contains
cherry picks of B1..B3, as in:
sync-branch B1'--B2'--B3'
/
upstream ---A1--A2--A3
\
dev - - - - - B1--B2--B3
Note that the cherry-pick command launched by `create` may fail in case of
rare conflicts. If this happens, just fix the issue manually then use
`git cherry-pick --continue` to complete the operation,
After the cherry pick has completed, the user is free to rebase the branch,
for example to reorder commits, or update documentation, before invoking
the `rebase` command.
Note that `create` will not work if a local sync branch already exists, or
when upstream commits are already reachable in the developement branch.
These conditions can be overriden by using the `--force` option.
""",
    )
    parser_create.add_argument(
        "--force", action="store_true", help="Discard any existing sync branch."
    )
    parser_create.add_argument(
        "--name",
        default=_DEFAULT_SYNC_BRANCH_NAME,
        help=f"Specify sync branch name (default {_DEFAULT_SYNC_BRANCH_NAME})",
    )
    parser_create.add_argument(
        "--upstream-ref",
        default=_DEFAULT_UPSTREAM_REF,
        help=f"Specify upstream branch reference (default {_DEFAULT_UPSTREAM_REF})",
    )
    parser_create.add_argument(
        "--dev-branch",
        default=_DEFAULT_DEV_BRANCH,
        help=f"Specify development branch (default {_DEFAULT_DEV_BRANCH})",
    )
    parser_create.set_defaults(func=command_create)

    parser_cleanup_history = subparsers.add_parser(
        "cleanup_history",
        help="Cleanup the sync-branch commit messages.",
        formatter_class=argparse.RawTextHelpFormatter,
        description=r"""
Cleanup the git commit messages of the current sync-branch.
For each commit in the sync-branch, look at its commit message
for Gerrit-specific lines (e.g. 'Commit-Queue:'). If some are
found, remove them, and create a new Change-Id value for the
commit (while saving the previous one as Original-Change-Id
for later reference).
Normally, one shouldn't need to call this command, as this
operation is performed implicitly by the 'create' command.
However, if the latter fails during the 'cherry-pick' phase,
(which should happen rarely), the developer is expected to
fix the conflicts and finish the cherry-pick, then invoke
this command.
Finally, calling this command several times is safe, as it
will not modify commits that have already been cleaned up.
""",
    )
    parser_cleanup_history.set_defaults(func=command_cleanup_history)

    parser_rebase = subparsers.add_parser(
        "rebase",
        help="Rebase current sync-branch on top of upstream.",
        formatter_class=argparse.RawTextHelpFormatter,
        description=r"""
Rebase the current sync branch on top of the current upstream commit.
This must be called after the completion of a `create` operation.
For example consider the following state resulting from a previous
`create` operation:
sync-branch B1'--B2'--B3'
/
upstream ---A1--A2--A3
\
dev - - - - - B1--B2--B3
After the rebase, the sync branch should be something like:
sync-branch B1''--B2''--B3''
/
upstream ---A1--A2--A3
\
dev - - - - - B1--B2--B3
Where both B3 and B3'' correspond to the new desired sources.
After the rebase, the user is free to inspect the branch, rebase it or do
any necessary cleanups, and verify that everything still works as expected.
The user will then launch the `merge` command to merge the result into the
main development branch.
""",
    )
    parser_rebase.set_defaults(func=command_rebase)

    parser_merge = subparsers.add_parser(
        "merge",
        help="Merge sync-branch to main development branch.",
        formatter_class=argparse.RawTextHelpFormatter,
        description=r"""
Merge the current sync branch into the main development branch. This should
only happen after a successful `rebase` command invocation.
For example consider the following state resulting from a previous
`rebase` operation:
sync-branch B1''--B2''--B3''
/
upstream ---A1--A2--A3
\
dev - - - - - B1--B2--B3
The end result will be:
upstream ---A1--A2--A3
\
sync-branch B1''--B2''--B3''
\
dev - - - - - B1--B2--B3-----------C1
Where C1 is the new merge commit on the main development branch.
Note that A3 will become the stem commit for future 'create'
operations.
""",
    )
    parser_merge.add_argument(
        "--no-commit", action="store_true", help="Do not commit the merge."
    )
    parser_merge.set_defaults(func=command_merge)

    args = parser.parse_args()
    if args.verbose:
        global _VERBOSE
        _VERBOSE = True

    try:
        # Each sub-parser stores its implementation function in `func`.
        args.func(args)
    except CommandError as e:
        print(str(e), file=sys.stderr)
        return 1
    except subprocess.CalledProcessError as e:
        # `e.stderr` is only set when the command output was captured.
        # Otherwise the command already wrote to the terminal, so avoid
        # printing a meaningless "None".
        if e.stderr:
            print("%s:\n%s\n" % (e, e.stderr), file=sys.stderr)
        else:
            print(str(e), file=sys.stderr)
        return e.returncode
    return 0
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())