blob: c519355ccc3740d53a7962a00d414ac9fbfde476 [file] [log] [blame]
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from contextlib import contextmanager
from recipe_engine import recipe_api
from google.protobuf import json_format
from PB.recipe_modules.fuchsia.rbe import rbe_metrics
from PB.go.fuchsia.dev.foundry_x.re_client.api.stats import stats as stats_pb2
class RbeApi(recipe_api.RecipeApi):
    """RemoteExecutionApi contains helper functions for using remote execution
    services via re-client/re-proxy.

    Typical usage:
        api.rbe.set_path(reclient_dir)
        api.rbe.set_config_path(reproxy_cfg)
        with api.rbe():
            ...  # build steps that use rewrapper

    Entering the context starts reproxy; exiting it shuts reproxy down,
    surfaces its logs, and uploads client metrics to BigQuery.
    """

    # Maps an RBE_log_format value to the file extension reproxy uses for
    # its per-action log file.
    _LOG_FORMAT_EXTENSIONS = {
        "text": "rpl",
        "reducedtext": "rrpl",
    }

    def __init__(self, props, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Populated later via set_path() / set_config_path().
        self._rbe_path = None
        self._config_path = None
        self._instance = props.instance
        # Default to the compact reproxy log format unless overridden.
        self._log_format = props.log_format or "reducedtext"
        # True while reproxy is believed to be running.
        self._started = False

    @contextmanager
    def __call__(self):
        """Make context wrapping reproxy start/stop.

        Raises:
            StepFailure or InfraFailure if it fails to start/stop.
        """
        # Save current value of infra_step so we can reset it when we
        # yield back.
        is_infra_step = self.m.context.infra_step
        # Separate invocations of RBE tools should use unique paths to avoid
        # conflicts between log/metric files.
        working_dir = self.m.path.mkdtemp(prefix="rbe")
        with self.m.context(env=self._environment(working_dir), infra_steps=True):
            try:
                self._start()
                with self.m.context(infra_steps=is_infra_step):
                    yield
            finally:
                # Always run teardown (shutdown + log collection), even if
                # startup or the wrapped steps failed.
                self._stop(working_dir)

    @property
    def _bootstrap_path(self):
        """Path to the `bootstrap` binary; requires set_path() first."""
        assert self._rbe_path
        return self._rbe_path.join("bootstrap")

    @property
    def _reproxy_path(self):
        """Path to the `reproxy` binary; requires set_path() first."""
        assert self._rbe_path
        return self._rbe_path.join("reproxy")

    def _environment(self, working_dir):
        """Return the RBE_* environment for reproxy/rewrapper invocations.

        Args:
            working_dir: per-invocation scratch dir for logs/metrics/socket.
        """
        cache_dir = self.m.path["cache"].join("rbe")
        # Environment. The source of truth for remote execution configuration
        # is the Fuchsia tree (see $FUCHSIA_OUT_DIR/rbe_config.json). These
        # values are used to modify the configuration in Infrastructure when
        # appropriate. These should not be used to modify the behavior of the
        # build in a meaningful way.
        return {
            # Override default instance. Infrastructure uses different RBE
            # backends for different environments.
            "RBE_instance": self._instance,
            # Set deps cache path.
            "RBE_deps_cache_dir": cache_dir.join("deps"),
            # Set preferred log format for reproxy.
            "RBE_log_format": self._log_format,
            # Set log paths within the task working directory.
            "RBE_log_dir": working_dir,
            "RBE_output_dir": working_dir,
            "RBE_proxy_log_dir": working_dir,
            "RBE_server_address": "unix://{}".format(working_dir.join("reproxy.sock")),
            # Use GCE credentials by default. Infrastructure presents an
            # emulated GCE metadata server in all environments for uniformity.
            "RBE_use_application_default_credentials": "False",
            "RBE_use_gce_credentials": "True",
        }

    def _log_format_to_ext(self, format):  # pragma: no cover
        """Return the action-log file extension for a reproxy log format.

        Previously an unrecognized format silently returned None, which
        produced a bogus "*.None" glob downstream in _stop(); now it
        raises ValueError so misconfiguration is caught immediately.
        """
        try:
            return self._LOG_FORMAT_EXTENSIONS[format]
        except KeyError:
            raise ValueError("unsupported RBE log format: {}".format(format))

    def set_path(self, path):
        """Path to the reproxy/bootstrap binary directory."""
        self._rbe_path = path

    def set_config_path(self, config_path):
        """Path to the config file for the repository being built.

        In the case of Fuchsia, this should be set to the path referenced by
        $FUCHSIA_OUT_DIR/rbe_config.json as reported by `gn gen`.
        """
        self._config_path = config_path

    def _start(self):
        """Start reproxy via bootstrap.

        Raises:
            StepFailure/InfraFailure if bootstrap fails.
        """
        assert not self._started
        with self.m.step.nest("setup remote execution"):
            cmd = [self._bootstrap_path, "--re_proxy={}".format(self._reproxy_path)]
            if self._config_path:
                cmd += ["--cfg={}".format(self._config_path)]
            self.m.step("start reproxy", cmd)
            self._started = True

    def _stop(self, working_dir):
        """Stop reproxy, then surface its logs and upload metrics.

        Log collection and metrics upload run even if the shutdown step
        fails, so diagnostics are never lost.
        """
        with self.m.step.nest("teardown remote execution"):
            cmd = [self._bootstrap_path, "--shutdown"]
            if self._config_path:
                cmd += ["--cfg={}".format(self._config_path)]
            try:
                self.m.step("stop reproxy", cmd)
                self._started = False
            finally:
                self._collect_logs(working_dir)
                self._upload_metrics(working_dir=working_dir)

    def _collect_logs(self, working_dir):
        """Read client log files from `working_dir` into step logs.

        reproxy/rewrapper/bootstrap record various log information in
        a number of locations. At the time of this implementation,
        the following log files are used:
        1. bootstrap.<INFO|WARNING|ERROR|FATAL> is standard logging
           for `bootstrap`. Each log file includes more severe logging
           levels, e.g. bootstrap.WARNING includes WARNING, ERROR & FATAL
           log messages.
        2. rbe_metrics.txt is the text representation of a proto
           message that describes metrics related to the rbe execution.
        3. reproxy.<INFO|WARNING|ERROR|FATAL> is standard logging for
           `reproxy`. See notes in #1 for more details.
        4. reproxy_log.txt is the log file that records all info
           about all actions that are processed through reproxy.
        5. reproxy_outerr.log is merged stderr/stdout of `reproxy`.
        6. rewrapper.<INFO|WARNING|ERROR|FATAL> is standard logging
           for `rewrapper`. See notes in #1 for more details.

        We extract the WARNING log messages for each portion of the
        local rbe client as well as reproxy stdout/stderr and metrics
        from the build by default. If further debugging is required,
        you could increase the verbosity of log messages that we
        retain in logdog or add the full reproxy_log.txt log file to
        the list of outputs.
        """
        diagnostic_outputs = [
            "bootstrap.WARNING",
            "rbe_metrics.txt",
            "reproxy.WARNING",
            "reproxy_outerr.log",
            "rewrapper.WARNING",
        ]
        for output in diagnostic_outputs:
            path = working_dir.join(output)
            # Not all builds use rbe, so it might not exist.
            self.m.path.mock_add_paths(path)
            if self.m.path.exists(path):
                self.m.file.read_text(
                    "read {}".format(output.replace(".", "_")),
                    path,
                    test_data="test log",
                )
        # reproxy also produces a log file of all the actions which
        # it handles including more detailed debugging information
        # useful for debugging.
        rpl_ext = self._log_format_to_ext(self._log_format)
        rpl_file_glob = "*.{}".format(rpl_ext)
        rpl_paths = self.m.file.glob_paths(
            name="find {} files".format(rpl_ext),
            source=working_dir,
            pattern=rpl_file_glob,
            test_data=[
                "reproxy_2021-10-16_22_52_23.{}".format(rpl_ext),
            ],
        )
        # More than 1 rpl file is likely a bug but we can punt until
        # that breaks someone.
        for p in rpl_paths:
            self.m.path.mock_add_paths(p)
            # Not all builds use rbe, so it might not exist.
            if self.m.path.exists(p):
                self.m.file.read_text(
                    "read {}".format(self.m.path.basename(p).replace(".", "_")),
                    p,
                    test_data="test log",
                )

    def _upload_metrics(self, working_dir):
        """Upload reproxy client metrics for this build to BigQuery.

        Reads the binary Stats proto that reproxy wrote to
        `working_dir`/rbe_metrics.pb, wraps it with build identification,
        and inserts one row into the rbe_client_metrics table. Silently
        skips when build input information or the metrics file is absent.
        """
        if not (self.m.buildbucket.builder_name and self.m.buildbucket.build.id):
            # Skip the upload if it does not have build input information.
            return
        bq_pb = rbe_metrics.RbeMetrics()
        bq_pb.build_id = self.m.buildbucket.build.id
        bq_pb.builder_name = self.m.buildbucket.builder_name
        bq_pb.created_at.FromDatetime(self.m.time.utcnow())
        bq_pb.instance = self._instance
        # Use working_dir.join() for consistency with the other path
        # construction in this module.
        path = working_dir.join("rbe_metrics.pb")
        self.m.path.mock_add_paths(path)
        if not self.m.path.exists(path):  # pragma: no cover
            return
        stats = self.m.file.read_proto(
            "read rbe_metrics.pb",
            path,
            stats_pb2.Stats,
            codec="BINARY",
            test_proto=stats_pb2.Stats(
                environment=dict(
                    foo="false",
                    bar="42",
                )
            ),
        )
        bq_pb.stats.CopyFrom(stats)
        bq_json_dict = json_format.MessageToDict(
            message=bq_pb, preserving_proto_field_name=True
        )
        # "environment" is a map field and gets serialized to a JSON map.
        # Unfortunately, this is incompatible with the corresponding BQ schema,
        # which is a repeated field and thus expects a JSON array.
        envs = bq_pb.stats.environment
        bq_json_dict["stats"]["environment"] = [
            {"key": k, "value": v} for k, v in sorted(envs.items())
        ]
        step_result = self.m.bqupload.insert(
            step_name="upload metrics",
            project="fuchsia-engprod-metrics-prod",
            dataset="metrics",
            table="rbe_client_metrics",
            rows=[bq_json_dict],
        )
        step_result.presentation.logs["json.output"] = self.m.json.dumps(
            bq_json_dict["stats"], indent=4
        ).splitlines()