# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import functools
import os
from contextlib import contextmanager
from recipe_engine import recipe_api
from google.protobuf import json_format
from PB.recipe_modules.fuchsia.rbe import rbe_metrics
from PB.go.fuchsia.dev.foundry_x.re_client.api.stats import stats as stats_pb2
from PB.go.fuchsia.dev.foundry_x.re_client.api.proxy import log as log_pb2
from RECIPE_MODULES.fuchsia.utils import pluralize
RECLIENT_CXX_WRAPPER = "reclient-cxx-wrapper.sh"
# For builds using the goma input processor, the deps cache file is
# sometimes too big for the default setting, so raise the maximum
# permitted file size.
_DEPS_CACHE_MAX_MB = "512"
# Helper function to serialize protos to the dictionary format expected by BQ.
#
# json_format.MessageToDict converts proto maps to dicts, which are
# incompatible with the corresponding BQ schema, so they are manually
# converted to lists.
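#
# Example (hypothetical values): a proto map field {"OSFamily": "Linux"}
# becomes [{"key": "OSFamily", "value": "Linux"}] in the returned dict.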
def record_to_bq_dict(record):
def map_to_list(m):
return [{"key": k, "value": v} for k, v in sorted(m.items())]
def map_to_list_proto_val(m):
return [
{"key": k, "value": json_format.MessageToDict(v)}
for k, v in sorted(m.items())
]
def rerun_metadata_map_to_list(dct, metadata):
for elm, data in zip(dct, metadata):
elm["output_file_digests"] = map_to_list(data.output_file_digests)
elm["output_directory_digests"] = map_to_list(data.output_directory_digests)
elm["event_times"] = map_to_list(data.event_times)
record_dict = json_format.MessageToDict(record, preserving_proto_field_name=True)
record_dict["command"]["platform"] = map_to_list(record.command.platform)
if "input" in record_dict["command"]:
record_dict["command"]["input"]["environment_variables"] = map_to_list(
record.command.input.environment_variables
)
if "remote_metadata" in record_dict:
record_dict["remote_metadata"]["event_times"] = map_to_list_proto_val(
record.remote_metadata.event_times
)
record_dict["remote_metadata"]["output_file_digests"] = map_to_list(
record.remote_metadata.output_file_digests
)
record_dict["remote_metadata"]["output_directory_digests"] = map_to_list(
record.remote_metadata.output_directory_digests
)
if "rerun_metadata" in record_dict["remote_metadata"]:
rerun_metadata_map_to_list(
record_dict["remote_metadata"]["rerun_metadata"],
record.remote_metadata.rerun_metadata,
)
if "local_metadata" in record_dict:
record_dict["local_metadata"]["event_times"] = map_to_list_proto_val(
record.local_metadata.event_times
)
record_dict["local_metadata"]["environment"] = map_to_list(
record.local_metadata.environment
)
record_dict["local_metadata"]["labels"] = map_to_list(
record.local_metadata.labels
)
if "rerun_metadata" in record_dict["local_metadata"]:
rerun_metadata_map_to_list(
record_dict["local_metadata"]["rerun_metadata"],
record.local_metadata.rerun_metadata,
)
return record_dict
class RbeApi(recipe_api.RecipeApi):
"""RemoteExecutionApi contains helper functions for using remote execution
services via re-client/re-proxy."""
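
    A minimal usage sketch (hypothetical recipe code, assuming this module
    is reachable as `api.rbe`):

        api.rbe.ensure()
        with api.rbe():
            ...  # Build steps here run with reproxy started and RBE_* set.
    """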
def __init__(self, props, *args, **kwargs):
super().__init__(*args, **kwargs)
self._rbe_path = None
self._config_path = None
self._platform = props.platform
self._instance = props.instance
self._reclient_version = props.reclient_version
if not self._platform and self._test_data.enabled:
self._platform = "fake_rbe_platform"
if not self._instance and self._test_data.enabled:
self._instance = "fake_rbe_instance"
self._log_format = props.log_format or "reducedtext"
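        # The chosen format determines the reproxy log extension handled in
        # _stop() ("text" -> .rpl, "reducedtext" -> .rrpl).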
self._started = False
@contextmanager
def __call__(self):
"""Make context wrapping reproxy start/stop.
Raises:
StepFailure or InfraFailure if it fails to start/stop.
"""
        # If we do not override this value, then it is user-controlled and a
        # malicious user could manipulate the value in the fuchsia.git
        # config file to send requests to a compromised backend (leaking data).
assert self._instance, "No RBE backend in builder properties."
        # Save the current value of infra_step so we can restore it when we
        # yield back.
is_infra_step = self.m.context.infra_step
# Separate invocations of RBE tools should use unique paths to avoid
# conflicts between log/metric files.
working_dir = self.m.path.mkdtemp(prefix="rbe")
with self.m.context(env=self._environment(working_dir), infra_steps=True):
try:
self._start()
with self.m.context(infra_steps=is_infra_step):
yield
finally:
if not self.m.runtime.in_global_shutdown:
self._stop(working_dir)
def ensure(self):
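        """Installs the reclient CIPD package into the cache and returns
        its directory."""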
with self.m.context(infra_steps=True):
pkgs = self.m.cipd.EnsureFile()
pkgs.add_package("infra/rbe/client/${platform}", self._reclient_version)
self._rbe_path = self.m.path["cache"].join("reclient")
self.m.cipd.ensure(self._rbe_path, pkgs)
return self._rbe_path
@property
def _bootstrap_path(self):
assert self._rbe_path
return self._rbe_path.join("bootstrap")
@property
def _rewrapper_path(self):
assert self._rbe_path
return self._rbe_path.join("rewrapper")
@property
def cxx_compiler_wrapper_command(self):
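        """Returns the argv prefix that runs C++ compiles through rewrapper,
        relativizing absolute paths first (see b/232261587)."""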
start_dir = self.m.path["start_dir"]
# Note: the flags here should closely track those used in
# the Fuchsia project's cxx-remote-wrapper.sh.
# cmake generates commands with absolute paths, which can be a problem
# for reclient. Once reclient supports internal path relativization
# (b/232261587) we can drop the python wrapper.
return [
"vpython3",
"-u", # unbuffered stdout/stderr
str(self.resource("relativize_args.py")),
"--",
str(self._rewrapper_path),
"--labels=type=compile,compiler=clang,lang=cpp",
"--canonicalize_working_dir=true",
"--exec_strategy=remote_local_fallback", # better diagnostics
"--exec_root="
+ os.path.commonpath([str(start_dir), str(self.m.path["cache"])]),
]
@functools.cached_property
def cxx_compiler_wrapper(self):
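        """Writes a shell script wrapping C++ compiles with rewrapper and
        returns its path (computed once, then cached)."""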
generated_script_dir = self.m.path.mkdtemp("cxx-rbe")
wrapped_command = " ".join(self.cxx_compiler_wrapper_command)
cxx_wrapper_script_path = generated_script_dir.join(RECLIENT_CXX_WRAPPER)
wrapper_script_text = """#!/bin/sh
exec {wrapper} "$@"
""".format(
wrapper=wrapped_command
)
self.m.file.write_text(
"write %s script" % RECLIENT_CXX_WRAPPER,
cxx_wrapper_script_path,
wrapper_script_text,
)
self.m.step(
"make %s executable" % RECLIENT_CXX_WRAPPER,
["chmod", "+x", cxx_wrapper_script_path],
)
return cxx_wrapper_script_path
def _environment(self, working_dir):
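        """Returns the RBE_* environment variables for reproxy/rewrapper,
        placing logs, metrics, and the reproxy socket under working_dir."""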
cache_dir = self.m.path["cache"].join("rbe")
deps_cache_dir = cache_dir.join("deps")
self.m.file.ensure_directory("create rbe cache dir", deps_cache_dir)
# Environment. The source of truth for remote execution configuration
# is the Fuchsia tree (see $FUCHSIA_OUT_DIR/rbe_config.json). These
# values are used to modify the configuration in Infrastructure when
# appropriate. These should not be used to modify the behavior of the
# build in a meaningful way.
return {
"RBE_service": "remotebuildexecution.googleapis.com:443",
# TODO(fangism): sync docker image with that used in Fuchsia
"RBE_platform": self._platform,
# Override default instance. Infrastructure uses different RBE
# backends for different environments.
"RBE_instance": self._instance,
# Set deps cache path.
"RBE_deps_cache_dir": deps_cache_dir,
"RBE_deps_cache_max_mb": _DEPS_CACHE_MAX_MB,
# Set preferred log format for reproxy.
"RBE_log_format": self._log_format,
# Set log paths within the task working directory.
"RBE_log_dir": working_dir,
"RBE_output_dir": working_dir,
"RBE_proxy_log_dir": working_dir,
"RBE_server_address": "unix://{}".format(working_dir.join("reproxy.sock")),
"RBE_socket_path": working_dir.join("reproxy.sock"),
# Use GCE credentials by default. Infrastructure presents an
# emulated GCE metadata server in all environments for uniformity.
"RBE_use_application_default_credentials": "False",
"RBE_use_gce_credentials": "True",
}
@property
def _reproxy_path(self):
assert self._rbe_path
return self._rbe_path.join("reproxy")
def set_path(self, path):
"""Path to the reproxy/bootstrap binary directory."""
self._rbe_path = path
def set_config_path(self, config_path):
"""Path to the config file for the repository being built.
In the case of Fuchsia, this should be set to the path referenced by
$FUCHSIA_OUT_DIR/rbe_config.json as reported by `gn gen`.
"""
self._config_path = config_path
def _start(self):
"""Start reproxy."""
assert not self._started
with self.m.step.nest("setup remote execution"):
cmd = [self._bootstrap_path, "--re_proxy={}".format(self._reproxy_path)]
if self._config_path:
cmd += ["--cfg={}".format(self._config_path)]
self.m.step("start reproxy", cmd)
self._started = True
def _stop(self, working_dir):
"""Stop reproxy."""
with self.m.step.nest("teardown remote execution"):
cmd = [self._bootstrap_path, "--shutdown"]
if self._config_path:
cmd += ["--cfg={}".format(self._config_path)]
try:
self.m.step("stop reproxy", cmd)
self._started = False
finally:
# reproxy/rewrapper/bootstrap record various log information in
# a number of locations. At the time of this implementation,
# the following log files are used:
# 1. bootstrap.<INFO|WARNING|ERROR|FATAL> is standard logging
# for `bootstrap`. Each log file includes more severe logging
# levels, e.g. bootstrap.WARNING includes WARNING, ERROR & FATAL
# log messages.
# 2. rbe_metrics.txt is the text representation of a proto
# message that describes metrics related to the rbe execution.
# 3. reproxy.<INFO|WARNING|ERROR|FATAL> is standard logging for
# `reproxy`. See notes in #1 for more details.
# 4. reproxy_log.txt is the log file that records all info
# about all actions that are processed through reproxy.
# 5. reproxy_outerr.log is merged stderr/stdout of `reproxy`.
# 6. rewrapper.<INFO|WARNING|ERROR|FATAL> is standard logging
# for `rewrapper`. See notes in #1 for more details.
                # 7. reproxy-gomaip.<INFO|WARNING|ERROR|FATAL> is logging
                # for `gomaip`, the input processor used by `reclient` to
                # find dependencies of `clang` compile invocations.
#
# We extract the WARNING log messages for each portion of the
# local rbe client as well as reproxy stdout/stderr and metrics
# from the build by default. If further debugging is required,
# you could increase the verbosity of log messages that we
# retain in logdog or add the full reproxy_log.txt log file to
# the list of outputs.
diagnostic_outputs = [
"bootstrap.WARNING",
"rbe_metrics.txt",
"reproxy.WARNING",
"reproxy-gomaip.WARNING",
"reproxy_outerr.log",
"rewrapper.WARNING",
]
for output in diagnostic_outputs:
path = working_dir.join(output)
# Not all builds use rbe, so it might not exist.
self.m.path.mock_add_paths(path)
if self.m.path.exists(path):
self.m.file.read_text(
"read {}".format(output),
path,
test_data="test log",
include_log=False,
)
                # reproxy also produces a log file of all the actions it
                # handles, including more detailed information useful for
                # debugging.
rpl_ext = {
"text": "rpl",
"reducedtext": "rrpl",
}[self._log_format]
rpl_file_glob = "*.{}".format(rpl_ext)
rpl_paths = self.m.file.glob_paths(
name="find {} files".format(rpl_ext),
source=working_dir,
pattern=rpl_file_glob,
test_data=[
"reproxy_2021-10-16_22_52_23.{}".format(rpl_ext),
],
)
                # More than one rpl file likely indicates a bug, but we can
                # punt until that breaks someone.
for p in rpl_paths:
self.m.path.mock_add_paths(p)
# Not all builds use rbe, so it might not exist.
if self.m.path.exists(p):
self.m.file.read_text(
"read {}".format(self.m.path.basename(p)),
p,
test_data="test log",
include_log=False,
)
self._upload_metrics(working_dir=working_dir)
try:
self._upload_logs(working_dir=working_dir)
except Exception:
lines = self.m.utils.traceback_format_exc().splitlines()
self.m.step.empty("rbe log upload failure").presentation.logs[
"exception"
] = lines
def _upload_metrics(self, working_dir):
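        """Reads rbe_metrics.pb from working_dir and uploads its stats to
        BigQuery."""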
if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id):
            # Skip the upload if build input information is unavailable.
return
bq_pb = rbe_metrics.RbeMetrics()
bq_pb.build_id = self.m.buildbucket_util.id
bq_pb.builder_name = self.m.buildbucket.builder_name
bq_pb.created_at.FromDatetime(self.m.time.utcnow())
bq_pb.instance = self._instance
path = self.m.path.join(working_dir, "rbe_metrics.pb")
self.m.path.mock_add_paths(path)
if not self.m.path.exists(path): # pragma: no cover
return
stats = self.m.file.read_proto(
"read rbe_metrics.pb",
path,
stats_pb2.Stats,
codec="BINARY",
include_log=False,
test_proto=stats_pb2.Stats(
environment=dict(
foo="false",
bar="42",
)
),
)
bq_pb.stats.CopyFrom(stats)
bq_json_dict = json_format.MessageToDict(
message=bq_pb, preserving_proto_field_name=True
)
# "environment" is a map field and gets serialized to a JSON map.
# Unfortunately, this is incompatible with the corresponding BQ schema,
# which is a repeated field and thus expects a JSON array.
envs = bq_pb.stats.environment
bq_json_dict["stats"]["environment"] = [
{"key": k, "value": v} for k, v in sorted(envs.items())
]
step_result = self.m.bqupload.insert(
step_name="upload metrics",
project="fuchsia-engprod-metrics-prod",
dataset="metrics",
table="rbe_client_metrics_v2",
rows=[bq_json_dict],
# TODO(fxbug.dev/114570): Send alerts to the build team.
alert_emails=["olivernewman@google.com"],
)
step_result.presentation.logs["json.output"] = self.m.json.dumps(
bq_json_dict["stats"], indent=4
).splitlines()
def _upload_logs(self, working_dir):
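        """Converts the reproxy command log to a binary proto and uploads
        one BigQuery row per log record."""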
if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id):
            # Skip the upload if build input information is unavailable.
return
cmd = [
self._rbe_path.join("logdump"),
"--proxy_log_dir",
working_dir,
"--output_dir",
working_dir,
]
self.m.step("convert reproxy command log to binary proto", cmd)
logs_bin_proto = self.m.path.join(working_dir, "reproxy_log.pb")
log_dump = self.m.file.read_proto(
"read {}".format(self.m.path.basename(logs_bin_proto)),
logs_bin_proto,
log_pb2.LogDump,
codec="BINARY",
include_log=False,
test_proto=log_pb2.LogDump(),
)
if not log_dump.records:
return
rows = [
{
"build_id": self.m.buildbucket_util.id,
"log": record_to_bq_dict(record),
}
for record in log_dump.records
]
self.m.bqupload.insert(
step_name="upload logs",
project="fuchsia-engprod-metrics-prod",
dataset="metrics",
table="rbe_client_command_logs_v2",
rows=rows,
).presentation.step_text = pluralize("row", rows)