# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import contextlib
import datetime
import itertools
from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2
from PB.recipe_engine import result as result_pb2
from recipe_engine import recipe_api
from RECIPE_MODULES.fuchsia.utils import pluralize
# Timeout for Gerrit API operations.
GERRIT_TIMEOUT = datetime.timedelta(seconds=120)
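# Gerrit message tags used by CQ/CV for autogenerated comments; messages
# with these tags are ignored when counting a change's auto-submit attempts.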
CQ_MESSAGE_TAGS = (
"autogenerated:cv",
# TODO(olivernewman): Delete these entries once LUCI CV is used everywhere
# and we no longer need to support using the Commit Queue service.
"autogenerated:cq:dry-run",
"autogenerated:cq:full-run",
)
class GerritAutoSubmitApi(recipe_api.RecipeApi):
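"""API for automatically sending labeled Gerrit changes through the commit queue."""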
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._mergeable_cache = {}
def _get_change_url(self, gerrit_host, change):
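"""Return the Gerrit web UI URL for `change`."""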
return f"https://{gerrit_host}/c/{change['project']}/+/{change['_number']}"
def _is_auto_submit_service_account(self, email):
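"""Return True if `email` appears to belong to the auto-submit service account."""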
try:
# The current implementation of get_email() raises NotImplementedError,
# but this lets us switch over automatically once it is implemented.
return email == self.m.service_account.default().get_email()
except NotImplementedError:
# Most service accounts doing auto-submit have "auto-submit" in
# their service account email.
# TODO(crbug.com/1275620) Remove this except block.
return "auto-submit" in email
def get_eligible_changes(
self,
gerrit_host,
auto_submit_label,
max_attempts,
):
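"""Query Gerrit for open, submittable changes labeled for auto-submit.
Changes are skipped if their most recent updates all come from the
auto-submit account (attempts exhausted) or if they would be submitted
together with other, possibly non-visible, changes.
"""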
raw_query = " ".join(
[
"is:submittable",
"is:open",
"-is:wip",
"-(label:Commit-Queue+1 OR label:Commit-Queue+2)",
f"label:{auto_submit_label}+1",
# Ignore changes that haven't been updated in a while to keep
# response sizes small. We look back at least a few days to ensure
# that we can recover missed CLs in case the auto-submit builder
# breaks for a while.
"-age:10d",
# Ignore changes with unresolved comments so that changes don't
# get submitted until all reviewer feedback has been addressed.
"-has:unresolved",
]
)
changes = self.m.gerrit.change_query(
name="get changes",
query_string=raw_query,
query_params=["CURRENT_REVISION"],
host=gerrit_host,
max_attempts=3,
timeout=GERRIT_TIMEOUT,
test_data=self.m.json.test_api.output([]),
).json.output
eligible_changes = []
for change in changes:
with self.m.step.nest(f"get details for {change['_number']}"):
change_id = self.m.url.unquote(change["id"])
try:
messages = self.m.gerrit.change_details(
"get messages",
change_id,
host=gerrit_host,
# Only attempt one request; if it fails, we'll retry on
# a subsequent auto-submit iteration.
max_attempts=1,
timeout=GERRIT_TIMEOUT,
test_data=self.m.json.test_api.output({"messages": []}),
).json.output["messages"]
except self.m.step.StepFailure: # pragma: no cover
# Gerrit backend bugs/quirks can sometimes make it
# impossible to successfully query a change's details. In
# this case we should just skip the change and continue
# processing any remaining changes.
continue
# Some prod accounts don't have emails so we need to use .get().
# Tags used when posting results have an identifier appended, so we
# need to check if the tag starts with a CQ tag (example:
# "autogenerated:cq:full-run:1675352225"). Also needed to include a
# default value that is a string but isn't a prefix of any of the CQ
# tags.
updates = [
msg["author"].get("email", "")
for msg in messages
if not msg.get("tag", "=other=").startswith(CQ_MESSAGE_TAGS)
]
# Stop retrying if the last `max_attempts` comments are all
# from the auto-submit service account.
if len(updates) >= max_attempts and all(
self._is_auto_submit_service_account(email)
for email in updates[-max_attempts:]
):
continue
other_changes_info = self.m.gerrit.changes_submitted_together(
"find dependent changes",
change_id,
query_params=["NON_VISIBLE_CHANGES"],
host=gerrit_host,
max_attempts=3,
timeout=GERRIT_TIMEOUT,
).json.output
if (
len(other_changes_info["changes"]) > 1
or other_changes_info.get("non_visible_changes", 0) != 0
):
continue
eligible_changes.append(change)
return eligible_changes
def set_commit_queue(self, gerrit_host, change):
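"""Vote Commit-Queue+2 on the change without sending notifications.
Returns True if the vote was applied successfully.
"""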
step = self.m.gerrit.set_review(
name="set CQ+2",
change_id=self.m.url.unquote(change["id"]),
labels={"Commit-Queue": 2},
host=gerrit_host,
notify="NONE",
ok_ret="any",
timeout=GERRIT_TIMEOUT,
)
return step.retcode == 0
def add_host_result(self, gerrit_host, changes, results):
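"""Append the URLs of `changes` to the results for `gerrit_host`."""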
if changes:
results[gerrit_host].extend(
self._get_change_url(gerrit_host, change) for change in changes
)
def filter_state(self, now, state):
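"""Drop state entries with no recorded action time or not acted on in 24 hours."""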
delete = []
for k, v in state.items():
if "last_action_time_secs" not in v:
delete.append(k) # pragma: no cover
elif now - v["last_action_time_secs"] > 24 * 60 * 60:
delete.append(k)
for k in delete:
del state[k]
return state
def can_try_submit(self, now, change_state, tree_status, max_attempts):
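"""Return whether it's time to attempt (or re-attempt) submitting a change.
Retries are rate-limited with exponential backoff (2**attempts * 30
minutes), capped at `max_attempts` attempts, and skipped while the tree
is closed.
"""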
# Never seen this change before.
if not change_state:
return True
elif tree_status and not tree_status.open:
# Don't retry CQ if the tree is closed, to avoid overwhelming CQ during
# widespread breakages.
return False
# Check if we attempted to submit too recently.
if (
now - change_state["last_action_time_secs"]
> 2 ** change_state["attempts"] * 30 * 60
):
if change_state["attempts"] < max_attempts:
return True
return False
def _attempt_conflict_resolution(self, change, gerrit_host):
"""Try to resolve trivial merge conflicts, if any.
Returns True if the change was already mergeable or becomes mergeable
after conflict resolution, False if it's not mergeable.
"""
# Cache mergeability by revision so we don't need to redo these
# operations over and over for the same revision. The revision will
# change if conflicts are resolved manually.
current_revision = change["current_revision"]
if current_revision not in self._mergeable_cache:
self._mergeable_cache[
current_revision
] = self._attempt_conflict_resolution_impl(change, gerrit_host)
return self._mergeable_cache[current_revision]
def _attempt_conflict_resolution_impl(self, change, gerrit_host):
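"""Check mergeability and, if needed, attempt a no-op rebase to clear a
false-positive merge conflict.
"""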
change_id = self.m.url.unquote(change["id"])
try:
change_mergeability = self.m.gerrit.get_mergeable(
"get mergeable",
change_id,
host=gerrit_host,
max_attempts=3,
timeout=GERRIT_TIMEOUT,
).json.output
except self.m.step.StepFailure: # pragma: no cover
# If this query fails we shouldn't fail the entire auto-submit
# run; instead, skip this change and continue trying to process
# other changes.
return False
if change_mergeability["mergeable"]:
return True
# Gerrit sometimes detects false positive merge conflicts in
# changes that are part of a stack where previous changes have
# already been submitted. These merge conflicts can be resolved
# trivially by doing a no-op rebase.
rebase_step = self.m.gerrit.rebase(
"rebase",
change_id,
host=gerrit_host,
ok_ret="any",
timeout=GERRIT_TIMEOUT,
# Don't transfer ownership of the change to the auto-submit
# service account, as that would remove any implicit OWNERS
# approval from the change author.
on_behalf_of_uploader=True,
)
return rebase_step.retcode == 0
def submit(
self,
gerrit_host,
now,
state,
changes,
tree_status,
max_attempts,
):
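"""Vote CQ+2 on each change that is due for an attempt and free of merge
conflicts, recording the attempt in `state`.
Returns the changes for which CQ+2 was set in this iteration.
"""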
submitted_changes = []
for change in changes:
with self.m.step.nest(str(change["_number"])) as presentation:
current_revision = change["current_revision"]
change_state = state.get(current_revision, {})
if not self.can_try_submit(
now, change_state, tree_status, max_attempts
):
presentation.step_text = "waiting for next retry"
continue
if not self._attempt_conflict_resolution(change, gerrit_host):
# If automatic merge conflict resolution fails, that means
# there's a true merge conflict that must be resolved
# manually by the change owner.
presentation.step_text = "merge conflict"
continue
if self.set_commit_queue(gerrit_host, change):
change_state["attempts"] = change_state.get("attempts", 0) + 1
change_state["last_action_time_secs"] = now
state[current_revision] = change_state
submitted_changes.append(change)
return submitted_changes
# TODO(nmulcahey): Add a Change class to the GerritApi to simplify this, and
# other logic in recipes.
def process_host(
self,
tree_status_host,
gerrit_host,
auto_submit_label,
max_attempts,
dry_run,
state,
results,
):
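"""Find eligible changes on a single Gerrit host and attempt to submit them.
In dry-run mode the eligible changes are only linked in the step
presentation instead of being sent to CQ.
Returns the list of changes processed on this host.
"""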
tree_status = None
if tree_status_host:
tree_status = self.m.tree_status.get(tree_status_host)
changes = None
with self.m.step.nest("get eligible") as presentation:
changes = self.get_eligible_changes(
gerrit_host, auto_submit_label, max_attempts
)
if not changes:
presentation.step_text = "\nno eligible changes."
if changes:
with self.m.step.nest("cq") as presentation:
if dry_run:
for change in [
self._get_change_url(gerrit_host, change) for change in changes
]:
presentation.links[change] = change
else:
now = int(self.m.time.time())
# Drop all state for changes that are no longer eligible
# (e.g. because they've been submitted).
state = self.filter_state(now, state)
# Attempt to submit all the changes we found.
changes = self.submit(
gerrit_host, now, state, changes, tree_status, max_attempts
)
self.add_host_result(gerrit_host, changes, results)
return changes
def summary(self, opts, results):
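"""Build summary text listing, per host, either a CL count or the change URLs."""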
summary = []
for host, change_results in results.items():
summary.append(host)
if opts.repeat_duration_seconds:
summary.append(f" {len(change_results)} CLs")
else:
summary.extend(f" {x}" for x in change_results)
summary.append("")
return "\n".join(summary)
def __call__(self, opts, dry_run):
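"""Run the auto-submit loop, persisting retry state between builds.
Iterates until `opts.repeat_duration_seconds` has elapsed (a single
iteration if it's unset), sleeping 30 seconds between iterations, and
saves the accumulated state before returning.
"""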
state = self.m.builder_state.fetch_previous_state()
results = collections.defaultdict(list)
finish_time = self.m.time.time() + opts.repeat_duration_seconds
try:
for i in itertools.count():
if opts.repeat_duration_seconds:
ctx = self.m.step.nest(str(i))
else:
ctx = contextlib.nullcontext()
with ctx as presentation:
submitted_changes = self._one_iteration(
opts, state, dry_run, results
)
if presentation and submitted_changes:
presentation.step_text = (
f"{pluralize('change', submitted_changes)} submitted"
)
if not opts.repeat_duration_seconds or self.m.time.time() > finish_time:
break
# Sleep between iterations to avoid overwhelming Gerrit with
# requests.
self.m.time.sleep(30)
finally:
self.m.builder_state.save(state)
return result_pb2.RawResult(
summary_markdown=self.summary(opts, results),
status=common_pb2.SUCCESS,
)
def _one_iteration(self, opts, state, dry_run, results):
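"""Process every configured Gerrit host once, in random order.
Returns the list of changes processed across all hosts.
"""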
max_attempts = opts.max_attempts or 4
# Process hosts in a random order so that a problem with ACLs for one of
# them won't starve the other hosts.
host_configs = list(opts.host_configs)
self.m.random.shuffle(host_configs)
submitted_changes = []
for host_config in host_configs:
gerrit_host = host_config.gerrit_host or "fuchsia-review.googlesource.com"
auto_submit_label = host_config.auto_submit_label or "Fuchsia-Auto-Submit"
if len(opts.host_configs) == 1:
ctx = contextlib.nullcontext()
else:
ctx = self.m.step.nest(gerrit_host.split("-review")[0])
with ctx:
host_state = state.get(gerrit_host, {})
submitted_changes.extend(
self.process_host(
host_config.tree_status_host,
gerrit_host,
auto_submit_label,
max_attempts,
dry_run,
host_state,
results,
)
)
state[gerrit_host] = host_state.copy()
return submitted_changes