| # Copyright 2021 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import contextlib |
| import datetime |
| import itertools |
| |
| from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2 |
| from PB.recipe_engine import result as result_pb2 |
| from recipe_engine import recipe_api |
| |
| from RECIPE_MODULES.fuchsia.utils import pluralize |
| |
| # Timeout for Gerrit API operations. |
| GERRIT_TIMEOUT = datetime.timedelta(seconds=120) |
| |
| CQ_MESSAGE_TAGS = ( |
| "autogenerated:cv", |
| # TODO(olivernewman): Delete these entries one LUCI CV is used everywhere |
| # and we no longer need to support using the Commit Queue service. |
| "autogenerated:cq:dry-run", |
| "autogenerated:cq:full-run", |
| ) |
| |
| |
| class GerritAutoSubmitApi(recipe_api.RecipeApi): |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
| self._mergeable_cache = {} |
| |
| def _get_change_url(self, gerrit_host, change): |
| return f"https://{gerrit_host}/c/{change['project']}/+/{change['_number']}" |
| |
| def _is_auto_submit_service_account(self, email): |
| try: |
| # The current implementation of get_email() raises |
| # NotImplementedError but this lets us switch when it is |
| # implemented. |
| return email == self.m.service_account.default().get_email() |
| except NotImplementedError: |
| # Most service accounts doing auto-submit have "auto-submit" in |
| # their service account email. |
| # TODO(crbug.com/1275620) Remove this except block. |
| return "auto-submit" in email |
| |
| def get_eligible_changes( |
| self, |
| gerrit_host, |
| auto_submit_label, |
| max_attempts, |
| ): |
| raw_query = " ".join( |
| [ |
| "is:submittable", |
| "is:open", |
| "-is:wip", |
| "-(label:Commit-Queue+1 OR label:Commit-Queue+2)", |
| f"label:{auto_submit_label}+1", |
| # Ignore changes that haven't been updated in a while to keep |
| # response sizes small. We look back at least a few days to ensure |
| # that we can recover missed CLs in case the auto-submit builder |
| # breaks for a while. |
| "-age:10d", |
| # Ignore changes with unresolved comments so that changes don't |
| # get submitted until all reviewer feedback has been addressed. |
| "-has:unresolved", |
| ] |
| ) |
| changes = self.m.gerrit.change_query( |
| name="get changes", |
| query_string=raw_query, |
| query_params=["CURRENT_REVISION"], |
| host=gerrit_host, |
| max_attempts=3, |
| timeout=GERRIT_TIMEOUT, |
| test_data=self.m.json.test_api.output([]), |
| ).json.output |
| |
| eligible_changes = [] |
| for change in changes: |
| with self.m.step.nest(f"get details for {change['_number']}"): |
| change_id = self.m.url.unquote(change["id"]) |
| |
| try: |
| messages = self.m.gerrit.change_details( |
| "get messages", |
| change_id, |
| host=gerrit_host, |
| # Only attempt one request; if it fails, we'll retry on |
| # a subsequent auto-submit iteration. |
| max_attempts=1, |
| timeout=GERRIT_TIMEOUT, |
| test_data=self.m.json.test_api.output({"messages": []}), |
| ).json.output["messages"] |
| except self.m.step.StepFailure: # pragma: no cover |
| # Gerrit backend bugs/quirks can sometimes make it |
| # impossible to successfully query a change's details. In |
| # this case we should just skip the change and continue |
| # processing any remaining changes. |
| continue |
| |
| # Some prod accounts don't have emails so we need to use .get(). |
| # Tags used when posting results have an identifier appended, so we |
| # need to check if the tag starts with a CQ tag (example: |
| # "autogenerated:cq:full-run:1675352225"). Also needed to include a |
| # default value that is a string but isn't a prefix of any of the CQ |
| # tags. |
| updates = [ |
| msg["author"].get("email", "") |
| for msg in messages |
| if not msg.get("tag", "=other=").startswith(CQ_MESSAGE_TAGS) |
| ] |
| |
| # Stop retrying if the last `max_attempts` comments are all |
| # from the auto-submit service account. |
| if len(updates) >= max_attempts and all( |
| self._is_auto_submit_service_account(email) |
| for email in updates[-max_attempts:] |
| ): |
| continue |
| |
| other_changes_info = self.m.gerrit.changes_submitted_together( |
| "find dependent changes", |
| change_id, |
| query_params=["NON_VISIBLE_CHANGES"], |
| host=gerrit_host, |
| max_attempts=3, |
| timeout=GERRIT_TIMEOUT, |
| ).json.output |
| |
| if ( |
| len(other_changes_info["changes"]) > 1 |
| or other_changes_info.get("non_visible_changes", 0) != 0 |
| ): |
| continue |
| eligible_changes.append(change) |
| return eligible_changes |
| |
| def set_commit_queue(self, gerrit_host, change): |
| step = self.m.gerrit.set_review( |
| name="set CQ+2", |
| change_id=self.m.url.unquote(change["id"]), |
| labels={"Commit-Queue": 2}, |
| host=gerrit_host, |
| notify="NONE", |
| ok_ret="any", |
| timeout=GERRIT_TIMEOUT, |
| ) |
| return step.retcode == 0 |
| |
| def add_host_result(self, gerrit_host, changes, results): |
| if changes: |
| results[gerrit_host].extend( |
| self._get_change_url(gerrit_host, change) for change in changes |
| ) |
| |
| def filter_state(self, now, state): |
| delete = [] |
| for k, v in state.items(): |
| if "last_action_time_secs" not in v: |
| delete.append(k) # pragma: no cover |
| elif now - v["last_action_time_secs"] > 24 * 60 * 60: |
| delete.append(k) |
| for k in delete: |
| del state[k] |
| return state |
| |
| def can_try_submit(self, now, change_state, tree_status, max_attempts): |
| # Never seen this change before. |
| if not change_state: |
| return True |
| elif tree_status and not tree_status.open: |
| # Don't retry CQ if the tree is closed, to avoid overwhelming CQ during |
| # widespread breakages. |
| return False |
| |
| # Check if we attempted to submit too recently. |
| if ( |
| now - change_state["last_action_time_secs"] |
| > 2 ** change_state["attempts"] * 30 * 60 |
| ): |
| if change_state["attempts"] < max_attempts: |
| return True |
| return False |
| |
| def _attempt_conflict_resolution(self, change, gerrit_host): |
| """Try to resolve trivial merge conflicts, if any. |
| |
| Returns True if the change was already mergeable or becomes mergeable |
| after conflict resolution, false if it's not mergeable. |
| """ |
| # Cache mergeability by revision so we don't need to redo these |
| # operations over and over for the same revision. The revision will |
| # change if conflicts are resolved manually. |
| current_revision = change["current_revision"] |
| if current_revision not in self._mergeable_cache: |
| self._mergeable_cache[ |
| current_revision |
| ] = self._attempt_conflict_resolution_impl(change, gerrit_host) |
| return self._mergeable_cache[current_revision] |
| |
| def _attempt_conflict_resolution_impl(self, change, gerrit_host): |
| change_id = self.m.url.unquote(change["id"]) |
| try: |
| change_mergeability = self.m.gerrit.get_mergeable( |
| "get mergeable", |
| change_id, |
| host=gerrit_host, |
| max_attempts=3, |
| timeout=GERRIT_TIMEOUT, |
| ).json.output |
| except self.m.step.StepFailure: # pragma: no cover |
| # If this query fails we shouldn't fail the entire |
| # auto-submit run, and instead skip this change and continue |
| # trying to process other changes. |
| return False |
| |
| if change_mergeability["mergeable"]: |
| return True |
| |
| # Gerrit sometimes detects false positive merge conflicts in |
| # changes that are part of a stack where previous changes have |
| # already been submitted. These merge conflicts can be resolved |
| # trivially by doing a no-op rebase. |
| rebase_step = self.m.gerrit.rebase( |
| "rebase", |
| change_id, |
| host=gerrit_host, |
| ok_ret="any", |
| timeout=GERRIT_TIMEOUT, |
| # Don't transfer ownership of the change to the auto-submit |
| # service account, as that would remove any implicit OWNERS |
| # approval from the change author. |
| on_behalf_of_uploader=True, |
| ) |
| return rebase_step.retcode == 0 |
| |
| def submit( |
| self, |
| gerrit_host, |
| now, |
| state, |
| changes, |
| tree_status, |
| max_attempts, |
| ): |
| submitted_changes = [] |
| for change in changes: |
| with self.m.step.nest(str(change["_number"])) as presentation: |
| current_revision = change["current_revision"] |
| change_state = state.get(current_revision, {}) |
| if not self.can_try_submit( |
| now, change_state, tree_status, max_attempts |
| ): |
| presentation.step_text = "waiting for next retry" |
| continue |
| if not self._attempt_conflict_resolution(change, gerrit_host): |
| # If automatic merge conflict resolution fails, that means |
| # there's a true merge conflict that must be resolved |
| # manually by the change owner. |
| presentation.step_text = "merge conflict" |
| continue |
| if self.set_commit_queue(gerrit_host, change): |
| change_state["attempts"] = change_state.get("attempts", 0) + 1 |
| change_state["last_action_time_secs"] = now |
| state[current_revision] = change_state |
| submitted_changes.append(change) |
| return submitted_changes |
| |
| # TODO(nmulcahey): Add a Change class to the GerritApi to simplify this, and |
| # other logic in recipes. |
| def process_host( |
| self, |
| tree_status_host, |
| gerrit_host, |
| auto_submit_label, |
| max_attempts, |
| dry_run, |
| state, |
| results, |
| ): |
| tree_status = None |
| if tree_status_host: |
| tree_status = self.m.tree_status.get(tree_status_host) |
| |
| changes = None |
| with self.m.step.nest("get eligible") as presentation: |
| changes = self.get_eligible_changes( |
| gerrit_host, auto_submit_label, max_attempts |
| ) |
| if not changes: |
| presentation.step_text = "\nno eligible changes." |
| |
| if changes: |
| with self.m.step.nest("cq") as presentation: |
| if dry_run: |
| for change in [ |
| self._get_change_url(gerrit_host, change) for change in changes |
| ]: |
| presentation.links[change] = change |
| else: |
| now = int(self.m.time.time()) |
| # Drop all state for changes that are no longer eligible |
| # (e.g. because they've been submitted). |
| state = self.filter_state(now, state) |
| # Attempt to submit all the changes we found. |
| changes = self.submit( |
| gerrit_host, now, state, changes, tree_status, max_attempts |
| ) |
| |
| self.add_host_result(gerrit_host, changes, results) |
| return changes |
| |
| def summary(self, opts, results): |
| summary = [] |
| for host, change_results in results.items(): |
| summary.append(host) |
| if opts.repeat_duration_seconds: |
| summary.append(f" {len(change_results)} CLs") |
| else: |
| summary.extend(f" {x}" for x in change_results) |
| summary.append("") |
| |
| return "\n".join(summary) |
| |
| def __call__(self, opts, dry_run): |
| state = self.m.builder_state.fetch_previous_state() |
| results = collections.defaultdict(list) |
| |
| finish_time = self.m.time.time() + opts.repeat_duration_seconds |
| |
| try: |
| for i in itertools.count(): |
| if opts.repeat_duration_seconds: |
| ctx = self.m.step.nest(str(i)) |
| else: |
| ctx = contextlib.nullcontext() |
| with ctx as presentation: |
| submitted_changes = self._one_iteration( |
| opts, state, dry_run, results |
| ) |
| if presentation and submitted_changes: |
| presentation.step_text = ( |
| f"{pluralize('change', submitted_changes)} submitted" |
| ) |
| |
| if not opts.repeat_duration_seconds or self.m.time.time() > finish_time: |
| break |
| # Sleep between iterations to avoid overwhelming Gerrit with |
| # requests. |
| self.m.time.sleep(30) |
| finally: |
| self.m.builder_state.save(state) |
| |
| return result_pb2.RawResult( |
| summary_markdown=self.summary(opts, results), |
| status=common_pb2.SUCCESS, |
| ) |
| |
| def _one_iteration(self, opts, state, dry_run, results): |
| max_attempts = opts.max_attempts or 4 |
| |
| # Process hosts in a random order so if there's a problem with ACLs for one of |
| # them we won't starve the other hosts. |
| host_configs = list(opts.host_configs) |
| self.m.random.shuffle(host_configs) |
| |
| submitted_changes = [] |
| for host_config in host_configs: |
| gerrit_host = host_config.gerrit_host or "fuchsia-review.googlesource.com" |
| auto_submit_label = host_config.auto_submit_label or "Fuchsia-Auto-Submit" |
| |
| if len(opts.host_configs) == 1: |
| ctx = contextlib.nullcontext() |
| else: |
| ctx = self.m.step.nest(gerrit_host.split("-review")[0]) |
| |
| with ctx: |
| host_state = state.get(gerrit_host, {}) |
| submitted_changes.extend( |
| self.process_host( |
| host_config.tree_status_host, |
| gerrit_host, |
| auto_submit_label, |
| max_attempts, |
| dry_run, |
| host_state, |
| results, |
| ) |
| ) |
| state[gerrit_host] = host_state.copy() |
| return submitted_changes |