blob: 67ef374364f55c9dc65f9f1e3459a152d8d8ee1f [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_api
from PB.go.chromium.org.luci.buildbucket.proto import (
builds_service as builds_service_pb2,
)
# The maximum number of bytes allowed in a Buildbucket build's summary markdown.
# `BuildbucketUtilApi.summary_message` starts from this value when working out
# how much of the raw text it is allowed to keep.
BUILDBUCKET_MAX_SUMMARY_SIZE = 4000
class BuildbucketUtilApi(recipe_api.RecipeApi):
    """High-level Buildbucket helpers specific to Fuchsia infra."""

    @property
    def id(self):
        """A string identifying the current build.

        This is the led run ID when the build was launched by led, and the
        stringified Buildbucket build ID otherwise.
        """
        if not self.m.led.launched_by_led:
            return str(self.m.buildbucket.build.id)
        return self.m.led.run_id

    @property
    def is_tryjob(self):
        """True when the current build's bucket name ends with "try"."""
        return self.m.buildbucket.build.builder.bucket.endswith("try")

    @property
    def build_url(self):
        """A ci.chromium.org URL for the current build.

        Builds launched by led are addressed by their swarming task ID and
        swarming hostname; all other builds by their Buildbucket build ID.
        """
        if not self.m.led.launched_by_led:
            return "https://ci.chromium.org/b/%d" % self.m.buildbucket.build.id
        task_id = self.m.swarming.task_id
        swarming_host = self.m.buildbucket.build.infra.swarming.hostname
        return "https://ci.chromium.org/swarming/task/%s?server=%s" % (
            task_id,
            swarming_host,
        )

    def full_builder_name(self, builder=None):
        """Returns "<project>/<bucket>/<builder>" for a builder.

        Args:
          builder: Object with `project`, `bucket` and `builder` attributes.
            When falsy, the current build's builder is used.
        """
        if not builder:
            builder = self.m.buildbucket.build.builder
        name_parts = [builder.project, builder.bucket, builder.builder]
        return "/".join(name_parts)

    def last_build(self, project, bucket, builder, fields=None, status=None):
        """Returns the first build found by a Buildbucket search of a builder.

        The search is limited to a single result; presumably Buildbucket
        lists newest builds first, making this the builder's most recent
        build. No status filter is applied unless `status` is given.

        Returns None if the search finds no builds.

        Args:
          project (str): LUCI project.
          bucket (str): BuildBucket bucket.
          builder (str): BuildBucket builder.
          fields (seq of str): BuildBucket Build proto message fields to
            include in the result. If falsy, defaults to
            api.buildbucket.DEFAULT_FIELDS.
          status (common_pb2.Status enum member): If set, only builds with
            this status are considered.
        """
        query = builds_service_pb2.BuildPredicate()
        query.builder.project = project
        query.builder.bucket = bucket
        query.builder.builder = builder
        if status is not None:
            query.status = status

        if not fields:
            fields = self.m.buildbucket.DEFAULT_FIELDS

        found = self.m.buildbucket.search(query, limit=1, fields=fields)
        if found:
            # The search was limited to one result.
            assert len(found) == 1
            return found[0]
        return None

    def summary_message(self, raw_text, truncation_message, max_line_length=200):
        """Builds a `summary_markdown`-friendly message out of raw text.

        Over-long lines and an over-long total are cut short and marked with
        an ellipsis. The text is wrapped in a markdown code fence, followed by
        `truncation_message` if anything was cut.

        Args:
          raw_text (str): Raw text to convert to summary message.
          truncation_message (str): Message appended after the code fence if
            any line, or the text as a whole, had to be truncated.
          max_line_length (int): Lines longer than this are truncated.

        Returns:
          The fenced summary text, optionally followed by a blank line and
          the truncation message.
        """
        # Budget for the text itself: the overall limit, minus the truncation
        # message and the two newlines that precede it, minus an extra 150
        # so that a parent build can expose its child's summary as its own
        # summary, with a prefix.
        text_budget = BUILDBUCKET_MAX_SUMMARY_SIZE - len(truncation_message) - 2 - 150

        # TODO(olivernewman): In Python 3 len() of a unicode character is
        # always one no matter how many bytes it encodes to, so
        # max_line_length will need adjusting by `len(dots.encode())` to stay
        # correct there.
        ellipsis = "…"

        any_line_cut = False
        kept_lines = []
        for raw_line in raw_text.split("\n"):
            if len(raw_line) <= max_line_length:
                kept_lines.append(raw_line)
                continue
            shortened = unicode_truncate(raw_line, max_line_length - len(ellipsis))
            kept_lines.append(shortened + ellipsis)
            any_line_cut = True

        body = "\n".join(kept_lines)
        if len(body) > text_budget:
            body = unicode_truncate(body, text_budget - len(ellipsis)) + ellipsis
        elif not any_line_cut:
            # Nothing was cut, neither a single line nor the text as a whole,
            # so there is no reason to show a truncation warning.
            truncation_message = ""

        suffix = ""
        if truncation_message:
            suffix = "\n\n%s" % truncation_message

        # Emit the details as a code block: Milo's markdown renderer would
        # otherwise ignore the whitespace.
        return "```\n%s\n```%s" % (body, suffix)
def unicode_truncate(string, max_bytes):
    """Truncates a string while avoiding splitting unicode characters.

    Accepts either a UTF-8 encoded byte string or a text string. Text strings
    have no `decode()` method on Python 3, so the input is only decoded when
    it actually is a byte string.

    Args:
        string (bytes or text): The string to truncate. Byte strings must
            contain valid UTF-8.
        max_bytes (int): Maximum size of the result, measured in UTF-8
            encoded bytes. A value of zero or less yields an empty string.

    Returns:
        A prefix of `string`, of the same type as `string`, whose UTF-8
        encoding is at most `max_bytes` bytes long and which never ends
        partway through a multi-byte character.
    """
    is_bytes = isinstance(string, bytes)
    text = string.decode("utf-8") if is_bytes else string

    encoded_chars = []
    byte_count = 0
    for unicode_char in text:
        encoded = unicode_char.encode("utf-8")
        # Stop at the first character that does not fit, so the result is
        # always a contiguous prefix of the input.
        if byte_count + len(encoded) > max_bytes:
            break
        encoded_chars.append(encoded)
        byte_count += len(encoded)

    truncated = b"".join(encoded_chars)
    # Hand back the same kind of string the caller passed in.
    return truncated if is_bytes else truncated.decode("utf-8")