blob: ea8691fc6c94da4aa297cff363b42069e694d46f [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import functools
from typing import List, Optional, Sequence, Union
from recipe_engine import recipe_api
from PB.go.chromium.org.luci.buildbucket.proto import (
build as build_pb2,
builds_service as builds_service_pb2,
common as common_pb2,
)
from RECIPE_MODULES.fuchsia.utils import pluralize
DEFAULT_SEARCH_TIMEOUT = datetime.timedelta(minutes=5)
class BuildbucketUtilApi(recipe_api.RecipeApi):
"""Module for high-level buildbucket utilities specific to Fuchsia infra."""
# The maximum number of bytes allowed in a Buildbucket build's summary
# markdown.
MAX_SUMMARY_SIZE = 4000
DEFAULT_SEARCH_TIMEOUT = DEFAULT_SEARCH_TIMEOUT
@property
def id(self):
"""A unique string identifier for the current build."""
if self.m.led.launched_by_led and not self.m.led.led_build:
return self.m.led.run_id
else:
return str(self.m.buildbucket.build.id)
@property
def bucket(self):
if self.m.led.shadowed_bucket:
return self.m.led.shadowed_bucket
return self.m.buildbucket.build.builder.bucket
@property
def is_dev_environment(self):
"""Whether the current build is running in dev environment."""
bucket_name = self.m.buildbucket.build.builder.bucket
return ".dev." in bucket_name
@property
def is_shadowed(self):
"""Whether the current builder is in a shadow bucket."""
bucket_name = self.m.buildbucket.build.builder.bucket
return "shadow" in bucket_name.split(".")
@property
def is_tryjob(self):
"""Whether the current build is running as a tryjob."""
bucket_name = self.m.buildbucket.build.builder.bucket
return bucket_name.endswith(("try", "try.shadow"))
@property
def is_dev_or_try(self):
"""Whether the current build is running as a tryjob or in dev environments."""
return self.is_dev_environment or self.is_tryjob
@property
def build_url(self):
if self.m.led.launched_by_led:
return f"https://ci.chromium.org/swarming/task/{self.m.swarming.task_id}?server={self.m.buildbucket.backend_hostname}"
else:
return f"https://ci.chromium.org/b/{int(self.m.buildbucket.build.id)}"
@functools.cached_property
def triggering_ref(self):
"""Determine the git ref that triggered this build.
- If this is a tryjob, returns the target branch for the triggering CL,
e.g. "refs/heads/main".
- If this is a CI build, returns the ref of the commit that triggered
the build, e.g. "refs/heads/main" or
"refs/tags/releases/6.20211025.2.1".
- If the build input is empty, returns None.
"""
with self.m.step.nest("resolve triggering ref") as presentation:
bb_input = self.m.buildbucket.build.input
if bb_input.gerrit_changes:
change = bb_input.gerrit_changes[0]
branch = self.m.gerrit.change_details(
name="get change details",
change_id=f"{change.project}~{change.change}",
host=change.host,
test_data=self.m.json.test_api.output({"branch": "main"}),
).json.output["branch"]
ref = f"refs/heads/{branch}"
elif bb_input.gitiles_commit.ref:
ref = bb_input.gitiles_commit.ref
else:
ref = None
if ref:
presentation.step_text = ref
return ref
@property
def triggering_branch(self):
"""Return the name of the git branch that triggered this build.
Returns None if the build has no build input, or if it is a CI build
triggered by a non-branch ref, e.g. a tag.
"""
ref = self.triggering_ref
branch_prefix = "refs/heads/"
if ref and ref.startswith(branch_prefix):
return ref.removeprefix(branch_prefix)
return None
def full_builder_name(self, builder=None, remove_shadow=False):
"""Retrieve the full name of the current builder.
Args:
builder (BuilderID proto): proto to use in place of
self.m.buildbucket.build.builder
remove_shadow (bool): Whether to remove "shadow" from bucket names
(e.g., "foo.ci.shadow" to "foo.ci")
"""
builder = builder or self.m.buildbucket.build.builder
bucket = builder.bucket
if remove_shadow:
bucket = ".".join(part for part in bucket.split(".") if part != "shadow")
return f"{builder.project}/{bucket}/{builder.builder}"
def build_status(self, exc):
"""Get corresponding build status for an exception."""
if isinstance(exc, self.m.step.InfraFailure):
return common_pb2.INFRA_FAILURE
if isinstance(exc, self.m.step.StepFailure):
return common_pb2.FAILURE
return common_pb2.INFRA_FAILURE
def last_n_builds(
self,
project: Optional[str] = None,
bucket: Optional[str] = None,
builder: Optional[str] = None,
fields: Optional[Sequence[str]] = None,
status: Optional[common_pb2.Status] = None,
max_age: Optional[datetime.timedelta] = None,
n: int = 1,
timeout: Union[datetime.timedelta, int, float] = DEFAULT_SEARCH_TIMEOUT,
) -> List[build_pb2.Build]:
"""Returns the build protos for a builder's most recent builds.
Args:
project: LUCI project. If None, defaults to current build's project.
bucket: BuildBucket bucket. If None, defaults to current build's
bucket.
builder: BuildBucket builder. If None, defaults to current build's
builder name.
fields: BuildBucket Build proto message fields to include in the
result. If None, defaults to api.buildbucket.DEFAULT_FIELDS.
status: If set, will return the last build with this status.
max_age: Max age of builds to return.
n: Number of builds to retrieve.
timeout: Timeout for buildbucket query.
"""
predicate = builds_service_pb2.BuildPredicate()
predicate.builder.project = project or self.m.buildbucket.build.builder.project
predicate.builder.bucket = bucket or self.m.buildbucket.build.builder.bucket
predicate.builder.builder = builder or self.m.buildbucket.build.builder.builder
if status is not None:
predicate.status = status
if max_age is not None:
predicate.create_time.start_time.FromDatetime(
self.m.time.utcnow() - max_age
)
fields = fields or self.m.buildbucket.DEFAULT_FIELDS
return self.m.buildbucket.search(
predicate,
limit=n,
fields=fields,
timeout=timeout,
)
def last_build(
self,
project: Optional[str] = None,
bucket: Optional[str] = None,
builder: Optional[str] = None,
fields: Optional[Sequence[str]] = None,
status: Optional[common_pb2.Status] = None,
max_age: Optional[datetime.timedelta] = None,
timeout: Union[datetime.timedelta, int, float] = DEFAULT_SEARCH_TIMEOUT,
) -> Optional[build_pb2.Build]:
"""Returns the build proto for a builder's most recent build.
Args:
project: LUCI project. If None, defaults to current build's project.
bucket: BuildBucket bucket. If None, defaults to current build's
bucket.
builder: BuildBucket builder. If None, defaults to current build's
builder name.
fields: BuildBucket Build proto message fields to include in the
result. If None, defaults to api.buildbucket.DEFAULT_FIELDS.
status: If set, will return the last build with this status.
max_age: Max age of builds to return.
timeout: Timeout for buildbucket query.
"""
builds = self.last_n_builds(
project=project,
bucket=bucket,
builder=builder,
fields=fields,
status=status,
max_age=max_age,
n=1,
timeout=timeout,
)
if not builds:
return None
assert len(builds) == 1
return builds[0]
def display_builds(self, step_name, builds, raise_on_failure=False):
"""Display build links and status for each input build.
Optionally raise on build failure(s).
Args:
step_name (str): Name of build group to display in step name.
builds (seq(buildbucket.v2.Build)): buildbucket Build objects.
See recipe_engine/buildbucket recipe module for more info.
raise_on_failure (bool): Raise InfraFailure or StepFailure on
failure.
Raises:
InfraFailure: One or more input builds had infra failure. Takes
priority over step failures.
StepFailure: One or more of input builds failed.
"""
with self.m.step.nest(step_name) as presentation:
builds = sorted(
builds,
# Sort first by builder name, then by build ID.
key=lambda b: (b.builder.builder, b.id),
)
failures = []
infra_failures = []
for b in builds:
with self.m.step.nest(b.builder.builder) as pres:
pres.links[str(b.id)] = self.m.buildbucket.build_url(build_id=b.id)
if b.status == common_pb2.INFRA_FAILURE:
pres.status = self.m.step.EXCEPTION
infra_failures.append(b)
elif b.status == common_pb2.FAILURE:
pres.status = self.m.step.FAILURE
failures.append(b)
else:
if b.status != common_pb2.SUCCESS:
# For any other status, use warning color.
pres.status = self.m.step.WARNING
continue
num_failed = len(failures) + len(infra_failures)
if num_failed == 0:
return
failure_message_parts = []
# If the build failed, include it in the final message, truncating
# its summary_markdown. This is a very conservative estimate about
# how much of each build's summary we should truncate.
truncate_length = self.MAX_SUMMARY_SIZE / (2 * num_failed)
current_length = 0
for b in infra_failures + failures:
failure_message_part = self._summary_section(b, truncate_length)
# Don't add messages which would exceed the max summary size.
# Include a small buffer for the truncation message and final
# message wrapper.
if (
len(failure_message_part) + current_length
> self.MAX_SUMMARY_SIZE - 100
):
failure_message_parts.append(
"(truncated, see individual subbuilds for more info)"
)
break
failure_message_parts.append(failure_message_part)
current_length += len(failure_message_part)
if not raise_on_failure: # pragma: no cover
return
# If there were any infra failures, raise purple.
if infra_failures:
presentation.status = self.m.step.EXCEPTION
exception_type = self.m.step.InfraFailure
# Otherwise if there were any step failures, raise red.
else:
presentation.status = self.m.step.FAILURE
exception_type = self.m.step.StepFailure
failure_message = "\n\n".join(failure_message_parts)
raise exception_type(
f"{pluralize('build', num_failed)} failed:\n\n{failure_message}",
)
def _summary_section(self, build, truncate_length):
url = self.m.buildbucket.build_url(build_id=build.id)
failure_header = f"[{build.builder.builder}]({url})"
if build.status == common_pb2.INFRA_FAILURE:
failure_header += " (infra failure)"
summary = self.summary_message(
build.summary_markdown.strip(),
truncation_message="(truncated)",
escape_markdown=False,
truncate_length=truncate_length,
)
# Don't include an empty summary, or any summaries if we'll
# have to truncate them by so much that they won't contain much
# useful information.
if truncate_length < 75 or not summary:
return failure_header
return failure_header + f":\n\n{summary}"
def summary_message(
self,
raw_text,
truncation_message,
max_line_length=400,
truncate_length=None,
escape_markdown=True,
):
"""Return a `summary_markdown`-friendly message from raw text.
Args:
raw_text (str): Raw text to convert to summary message.
truncation_message (str): Message to attach to end of summary message
if the message was truncated.
max_line_length (int): Truncate lines containing more than this
number of bytes.
truncate_length (int): Maximum length of the summary message.
escape_markdown (bool): Whether to surround the summary in triple
backticks, causing it to render as raw monospace text in the
LUCI UI.
"""
if not truncate_length:
truncate_length = (
self.MAX_SUMMARY_SIZE
# Include an extra buffer so that a parent build can expose its
# child's summary as its own summary, with a prefix.
- 150
# Include an extra buffer for potential autocorrelator messages.
- 750
)
# Make space for the truncation message plus two newline characters.
truncate_length -= len(truncation_message) + 2
dots = "..."
truncated_line = False
failure_lines = []
for line in raw_text.rstrip().split("\n"):
if len(line.encode("utf-8")) > max_line_length:
line = unicode_truncate(line, max_line_length - len(dots)) + dots
truncated_line = True
failure_lines.append(line)
final_summary = "\n".join(failure_lines)
if len(final_summary) > truncate_length:
final_summary = (
unicode_truncate(final_summary, truncate_length - len(dots)) + dots
)
elif not truncated_line:
# No need to include a truncation warning if we didn't truncate
# any lines or the entire failure summary.
truncation_message = ""
# Render the error details as code in the Milo build summary so
# that whitespace is respected; otherwise it would be ignored
# by Milo's markdown renderer.
if escape_markdown:
final_summary = f"```\n{final_summary}\n```"
return final_summary + (
f"\n\n{truncation_message}" if truncation_message else ""
)
def unicode_truncate(string, max_bytes):
"""Truncates a string while avoiding splitting unicode characters.
The resulting string will have have len() <= max_bytes and will contain
valid UTF-8.
"""
chars = []
byte_count = 0
for unicode_char in string:
byte_count += len(unicode_char.encode())
if byte_count > max_bytes:
break
chars.append(unicode_char)
return "".join(chars)