| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import functools |
| |
| from recipe_engine import recipe_api |
| |
| from PB.go.chromium.org.luci.buildbucket.proto import ( |
| builds_service as builds_service_pb2, |
| ) |
| from PB.go.chromium.org.luci.buildbucket.proto import common as common_pb2 |
| |
| from RECIPE_MODULES.fuchsia.utils import pluralize |
| |
| |
class BuildbucketUtilApi(recipe_api.RecipeApi):
    """Module for high-level buildbucket utilities specific to Fuchsia infra."""

    # The maximum number of bytes allowed in a Buildbucket build's summary
    # markdown.
    MAX_SUMMARY_SIZE = 4000

    @property
    def id(self):
        """A unique string identifier for the current build.

        This is the led run ID when the build was launched by led, otherwise
        the Buildbucket build ID converted to a string.
        """
        if self.m.led.launched_by_led:
            return self.m.led.run_id
        return str(self.m.buildbucket.build.id)

    @property
    def is_tryjob(self):
        """Whether the current build is running as a tryjob.

        A build counts as a tryjob when its bucket name ends with "try".
        """
        bucket_name = self.m.buildbucket.build.builder.bucket
        return bucket_name.endswith("try")

    @property
    def build_url(self):
        """A URL at which the current build can be viewed.

        Builds launched by led get a link to their Swarming task; all other
        builds get a link keyed by their Buildbucket build ID.
        """
        if self.m.led.launched_by_led:
            return "https://ci.chromium.org/swarming/task/%s?server=%s" % (
                self.m.swarming.task_id,
                self.m.buildbucket.build.infra.swarming.hostname,
            )
        return "https://ci.chromium.org/b/%d" % self.m.buildbucket.build.id

    @functools.cached_property
    def triggering_ref(self):
        """Determine the git ref that triggered this build.

        The result is computed on first access and then cached on this API
        object, so the nested step is emitted at most once.

        - If this is a tryjob, returns the target branch for the triggering CL,
          e.g. "refs/heads/main".
        - If this is a CI build, returns the ref of the commit that triggered
          the build, e.g. "refs/heads/main" or
          "refs/tags/releases/6.20211025.2.1".
        - If the build input is empty, returns None.
        """
        with self.m.step.nest("resolve triggering ref") as presentation:
            bb_input = self.m.buildbucket.build.input
            if bb_input.gerrit_changes:
                # Only the first change is consulted to find the target
                # branch.
                change = bb_input.gerrit_changes[0]
                branch = self.m.gerrit.change_details(
                    name="get change details",
                    change_id="%s~%s" % (change.project, change.change),
                    host=change.host,
                    test_data=self.m.json.test_api.output({"branch": "main"}),
                ).json.output["branch"]
                ref = "refs/heads/%s" % branch
            elif bb_input.gitiles_commit.ref:
                ref = bb_input.gitiles_commit.ref
            else:
                ref = None

            if ref:
                presentation.step_text = ref

            return ref

    @property
    def triggering_branch(self):
        """Return the name of the git branch that triggered this build.

        Returns None if the build has no build input, or if it is a CI build
        triggered by a non-branch ref, e.g. a tag.
        """
        ref = self.triggering_ref
        branch_prefix = "refs/heads/"
        if ref and ref.startswith(branch_prefix):
            return ref[len(branch_prefix) :]
        return None

    def full_builder_name(self, builder=None):
        """Return a builder's name in "<project>/<bucket>/<builder>" form.

        Args:
          builder: Object with `project`, `bucket` and `builder` attributes.
            If not set, defaults to the current build's builder.
        """
        builder = builder or self.m.buildbucket.build.builder
        return "/".join((builder.project, builder.bucket, builder.builder))

    def build_status(self, exc):
        """Get corresponding build status for an exception.

        Args:
          exc (Exception): The exception to classify.

        Returns:
          common_pb2.INFRA_FAILURE for an InfraFailure, common_pb2.FAILURE
          for any other StepFailure, and common_pb2.INFRA_FAILURE for every
          other exception type.
        """
        # InfraFailure must be tested before StepFailure so that it is not
        # reported as a plain FAILURE.
        if isinstance(exc, self.m.step.InfraFailure):
            return common_pb2.INFRA_FAILURE
        if isinstance(exc, self.m.step.StepFailure):
            return common_pb2.FAILURE
        return common_pb2.INFRA_FAILURE

    def last_build(
        self, project=None, bucket=None, builder=None, fields=None, status=None
    ):
        """Returns the build proto for a builder's most recent build.

        The search is only restricted to a particular status (e.g.
        successful builds) when `status` is given.

        Returns None if no matching builds could be found.

        Args:
          project (str): LUCI project. If None, defaults to current build's
            project.
          bucket (str): BuildBucket bucket. If None, defaults to current build's
            bucket.
          builder (str): BuildBucket builder. If None, defaults to current
            build's builder name.
          fields (seq of str): BuildBucket Build proto message fields to include in
            the result. If None, defaults to api.buildbucket.DEFAULT_FIELDS.
          status (common_pb2.Status enum member): If set, will return the
            last build with this status.
        """
        current_builder = self.m.buildbucket.build.builder
        predicate = builds_service_pb2.BuildPredicate()
        predicate.builder.project = project or current_builder.project
        predicate.builder.bucket = bucket or current_builder.bucket
        predicate.builder.builder = builder or current_builder.builder
        if status is not None:
            predicate.status = status

        fields = fields or self.m.buildbucket.DEFAULT_FIELDS
        builds = self.m.buildbucket.search(predicate, limit=1, fields=fields)
        if not builds:
            return None

        assert len(builds) == 1
        return builds[0]

    def display_builds(self, step_name, builds, raise_on_failure=False):
        """Display build links and status for each input build.

        Optionally raise on build failure(s).

        Args:
          step_name (str): Name of build group to display in step name.
          builds (seq(buildbucket.v2.Build)): buildbucket Build objects.
            See recipe_engine/buildbucket recipe module for more info.
          raise_on_failure (bool): Raise InfraFailure or StepFailure on
            failure.

        Raises:
          InfraFailure: One or more input builds had infra failure. Takes
            priority over step failures.
          StepFailure: One or more of input builds failed.
        """
        with self.m.step.nest(step_name) as presentation:
            # Sort first by builder name, then by build ID.
            builds = sorted(builds, key=lambda b: (b.builder.builder, b.id))

            failures = []
            infra_failures = []
            for b in builds:
                with self.m.step.nest(b.builder.builder) as display_step:
                    display_step.presentation.links[
                        str(b.id)
                    ] = self.m.buildbucket.build_url(build_id=b.id)
                    if b.status == common_pb2.INFRA_FAILURE:
                        display_step.presentation.status = self.m.step.EXCEPTION
                        infra_failures.append(b)
                    elif b.status == common_pb2.FAILURE:
                        display_step.presentation.status = self.m.step.FAILURE
                        failures.append(b)
                    elif b.status != common_pb2.SUCCESS:
                        # For any other status, use warning color.
                        display_step.presentation.status = self.m.step.WARNING

            num_failed = len(failures) + len(infra_failures)
            if num_failed == 0:
                return

            # Nothing below has side effects other than raising, so skip
            # assembling the failure message when the caller does not want
            # an exception.
            if not raise_on_failure:  # pragma: no cover
                return

            # If the build failed, include it in the final message, truncating
            # its summary_markdown. This is a very conservative estimate about
            # how much of each build's summary we should truncate. May not work
            # well with large numbers of failed builds.
            # Integer division keeps the budget a whole number of bytes.
            truncate_length = self.MAX_SUMMARY_SIZE // (2 * num_failed)
            failure_message_parts = [
                self._summary_section(b, truncate_length)
                for b in infra_failures + failures
            ]

            # If there were any infra failures, raise purple.
            if infra_failures:
                presentation.status = self.m.step.EXCEPTION
                exception_type = self.m.step.InfraFailure
            # Otherwise if there were any step failures, raise red.
            else:
                presentation.status = self.m.step.FAILURE
                exception_type = self.m.step.StepFailure

            raise exception_type(
                "%s failed:\n\n%s"
                % (pluralize("build", num_failed), "\n\n".join(failure_message_parts))
            )

    def _summary_section(self, build, truncate_length):
        """Return the markdown section describing one failed build.

        The section is a link to the build labelled with the builder name,
        followed by the build's (possibly truncated) summary markdown when
        that summary is non-empty and the budget is large enough.

        Args:
          build (buildbucket.v2.Build): The failed build to describe.
          truncate_length (int): Byte budget for this build's summary.
        """
        url = self.m.buildbucket.build_url(build_id=build.id)
        failure_header = "[%s](%s)" % (build.builder.builder, url)
        if build.status == common_pb2.INFRA_FAILURE:
            failure_header += " (infra failure)"

        # Don't include any summaries if we'll have to truncate them by so
        # much that they won't contain much useful information.
        if truncate_length < 75:
            return failure_header

        summary = self.summary_message(
            build.summary_markdown.strip(),
            truncation_message="(truncated)",
            escape_markdown=False,
            truncate_length=truncate_length,
        )
        # Don't include an empty summary.
        if not summary:
            return failure_header
        return failure_header + ":\n\n%s" % summary

    def summary_message(
        self,
        raw_text,
        truncation_message,
        max_line_length=200,
        truncate_length=None,
        escape_markdown=True,
    ):
        """Return a `summary_markdown`-friendly message from raw text.

        All size limits are measured in UTF-8 encoded bytes, and truncation
        never splits a character.

        Args:
          raw_text (str): Raw text to convert to summary message.
          truncation_message (str): Message to attach to end of summary message
            if the message was truncated.
          max_line_length (int): Truncate lines containing more than this
            number of bytes.
          truncate_length (int): Maximum length in bytes of the summary
            message. If unset (or zero), defaults to MAX_SUMMARY_SIZE minus
            buffers reserved for text added by other tooling.
          escape_markdown (bool): Whether to surround the summary in triple
            backticks, causing it to render as raw monospace text in the
            LUCI UI.

        Returns:
          str: The processed summary, followed by a blank line and
          `truncation_message` if any line or the whole summary was
          truncated.
        """
        if not truncate_length:
            truncate_length = (
                self.MAX_SUMMARY_SIZE
                # Include an extra buffer so that a parent build can expose its
                # child's summary as its own summary, with a prefix.
                - 150
                # Include an extra buffer for potential autocorrelator messages.
                - 750
            )
        # Make space for the truncation message plus two newline characters.
        # Measured in bytes because the summary limit is a byte limit.
        truncate_length -= len(truncation_message.encode("utf-8")) + 2
        dots = "..."
        truncated_line = False
        failure_lines = []
        for line in raw_text.rstrip().split("\n"):
            if len(line.encode("utf-8")) > max_line_length:
                line = unicode_truncate(line, max_line_length - len(dots)) + dots
                truncated_line = True
            failure_lines.append(line)
        final_summary = "\n".join(failure_lines)
        # Compare the encoded size, not the character count: multi-byte
        # characters would otherwise let the summary exceed the byte budget.
        if len(final_summary.encode("utf-8")) > truncate_length:
            final_summary = (
                unicode_truncate(final_summary, truncate_length - len(dots)) + dots
            )
        elif not truncated_line:
            # No need to include a truncation warning if we didn't truncate
            # any lines or the entire failure summary.
            truncation_message = ""

        # Render the error details as code in the Milo build summary so
        # that whitespace is respected; otherwise it would be ignored
        # by Milo's markdown renderer.
        # NOTE: the fence characters are not counted against
        # truncate_length.
        if escape_markdown:
            final_summary = "```\n%s\n```" % final_summary

        return final_summary + (
            "\n\n%s" % truncation_message if truncation_message else ""
        )
| |
| |
def unicode_truncate(string, max_bytes):
    """Return the longest prefix of `string` that fits in `max_bytes` bytes.

    Size is measured on the encoded form of each character (`str.encode()`,
    i.e. UTF-8 by default). Characters are kept or dropped whole, so the
    result never ends in a partially encoded character.

    Args:
      string (str): Text to shorten.
      max_bytes (int): Maximum encoded size of the result. Zero or a
        negative value yields an empty string.

    Returns:
      str: `string` itself if it already fits, otherwise a prefix of it.
    """
    used = 0
    for index, char in enumerate(string):
        used += len(char.encode())
        if used > max_bytes:
            # `char` is the first character that does not fit; keep
            # everything before it.
            return string[:index]
    return string