# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
from contextlib import contextmanager
from enum import Enum

from recipe_engine import recipe_api

from google.protobuf import json_format
from PB.recipe_modules.fuchsia.rbe import rbe_metrics
from PB.go.fuchsia.dev.foundry_x.re_client.api.stats import stats as stats_pb2
from PB.go.fuchsia.dev.foundry_x.re_client.api.proxy import log as log_pb2
from RECIPE_MODULES.fuchsia.utils import pluralize

RECLIENT_CXX_WRAPPER = "reclient-cxx-wrapper.sh"

# For builds using the goma input processor, the deps cache file is sometimes
# too big for the default setting, so raise the maximum permitted file size
# to something large enough to accommodate it.
_DEPS_CACHE_MAX_MB = "512"


# Helper function to serialize protos to the dictionary form expected by BQ.
#
# json_format.MessageToDict converts proto maps to dicts, which are
# incompatible with the corresponding BQ schema, so they are manually
# converted to lists.
def record_to_bq_dict(record):
    def map_to_list(m):
        return [{"key": k, "value": v} for k, v in sorted(m.items())]

    def map_to_list_proto_val(m):
        return [
            {"key": k, "value": json_format.MessageToDict(v)}
            for k, v in sorted(m.items())
        ]

    def rerun_metadata_map_to_list(dct, metadata):
        for elm, data in zip(dct, metadata):
            elm["output_file_digests"] = map_to_list(data.output_file_digests)
            elm["output_directory_digests"] = map_to_list(
                data.output_directory_digests
            )
            elm["event_times"] = map_to_list(data.event_times)

    record_dict = json_format.MessageToDict(record, preserving_proto_field_name=True)
    record_dict["command"]["platform"] = map_to_list(record.command.platform)
    if "input" in record_dict["command"]:
        record_dict["command"]["input"]["environment_variables"] = map_to_list(
            record.command.input.environment_variables
        )
    if "remote_metadata" in record_dict:
        record_dict["remote_metadata"]["event_times"] = map_to_list_proto_val(
            record.remote_metadata.event_times
        )
        record_dict["remote_metadata"]["output_file_digests"] = map_to_list(
            record.remote_metadata.output_file_digests
        )
        record_dict["remote_metadata"]["output_directory_digests"] = map_to_list(
            record.remote_metadata.output_directory_digests
        )
        if "rerun_metadata" in record_dict["remote_metadata"]:
            rerun_metadata_map_to_list(
                record_dict["remote_metadata"]["rerun_metadata"],
                record.remote_metadata.rerun_metadata,
            )
    if "local_metadata" in record_dict:
        record_dict["local_metadata"]["event_times"] = map_to_list_proto_val(
            record.local_metadata.event_times
        )
        record_dict["local_metadata"]["environment"] = map_to_list(
            record.local_metadata.environment
        )
        record_dict["local_metadata"]["labels"] = map_to_list(
            record.local_metadata.labels
        )
        if "rerun_metadata" in record_dict["local_metadata"]:
            rerun_metadata_map_to_list(
                record_dict["local_metadata"]["rerun_metadata"],
                record.local_metadata.rerun_metadata,
            )
    return record_dict
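

# A minimal sketch of the map-to-list reshaping record_to_bq_dict performs
# (the environment values below are made up for illustration): a proto map
# serializes via MessageToDict to a JSON object, while the BQ schema models
# it as a repeated key/value field.
#
#   env = {"PATH": "/usr/bin", "TERM": "dumb"}
#   assert [{"key": k, "value": v} for k, v in sorted(env.items())] == [
#       {"key": "PATH", "value": "/usr/bin"},
#       {"key": "TERM", "value": "dumb"},
#   ]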


class RbeApi(recipe_api.RecipeApi):
    """RemoteExecutionApi contains helper functions for using remote execution
    services via re-client/re-proxy."""

    class AbsolutePathPolicy(Enum):
        """This controls how absolute paths are to be treated.

        The choice impacts how reproxy and rewrapper are invoked.

        Choices:
          REJECT: remote commands using local absolute paths will fail.
            rewrapper: --canonicalize_working_dir=true.
              This allows cache sharing between different build output
              directories (under exec_root) at the same depth.
            reproxy: no InputPathAbsoluteRoot.
          RELATIVIZE: rewrite commands using relative paths, using a wrapper.
            Relative paths are remote-execution friendly, while absolute paths
            will likely fail. cmake builds are known to use absolute paths.
            Relativized commands are better for caching across build
            environments, but the wrapper script incurs some overhead.
            rewrapper: --canonicalize_working_dir=true.
            reproxy: no InputPathAbsoluteRoot.
          ALLOW: force the remote environment to mimic local paths.
            This allows commands with absolute paths to work,
            at the expense of being able to cache across build environments.
            This option can help cmake builds work remotely.
            rewrapper: --canonicalize_working_dir=false.
            reproxy: --platform InputPathAbsoluteRoot=exec_root.
        """

        REJECT = 1
        RELATIVIZE = 2
        ALLOW = 3

    def __init__(self, props, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._reclient_path = None
        self._platform = props.platform
        self._instance = props.instance

        # Default: let commands that use absolute paths fail remote execution.
        # For best caching performance, restrict remote execution commands
        # to use only relative paths.
        self._absolute_path_policy = self.AbsolutePathPolicy.REJECT

        if not self._platform and self._test_data.enabled:
            self._platform = "fake_rbe_platform"
        if not self._instance and self._test_data.enabled:
            self._instance = "fake_rbe_instance"
        self._log_format = props.log_format or "reducedtext"
        self._started = False

    @contextmanager
    def __call__(
        self,
        reclient_path=None,
        config_path=None,
        absolute_path_policy=AbsolutePathPolicy.REJECT,
    ):
        """Make context wrapping reproxy start/stop.

        Args:
          reclient_path (Path): if set, use this Path to the reclient tools;
            otherwise, automatically use the Path to a loaded CIPD package.
          config_path (Path): the config file within the checkout.
            In the case of a Fuchsia checkout, this should be set to the path
            referenced by $FUCHSIA_OUT_DIR/rbe_config.json as reported by
            `gn gen`.
          absolute_path_policy (AbsolutePathPolicy): see enum definition.

        Raises:
          StepFailure or InfraFailure if it fails to start/stop.
        """
        if reclient_path:
            self._reclient_path = reclient_path
        else:
            self._reclient_path = self._ensure_reclient_path
        assert self._reclient_path

        # If we do not override this value, then it is user-controlled and a
        # malicious user could manipulate the value in the fuchsia.git
        # config file to send requests to a compromised backend (leak).
        assert self._instance, "No RBE backend in builder properties."

        # Save the current value of infra_step so we can restore it when we
        # yield back.
        is_infra_step = self.m.context.infra_step

        # Separate invocations of RBE tools should use unique paths to avoid
        # conflicts between log/metric files.
        working_dir = self.m.path.mkdtemp(prefix="rbe")

        saved_absolute_path_policy = self._absolute_path_policy
        self._absolute_path_policy = absolute_path_policy
        with self.m.context(env=self._environment(working_dir), infra_steps=True):
            try:
                self._start(config_path=config_path)
                with self.m.context(infra_steps=is_infra_step):
                    yield
            finally:
                if not self.m.runtime.in_global_shutdown:
                    self._stop(working_dir=working_dir, config_path=config_path)
                self._absolute_path_policy = saved_absolute_path_policy
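
    # A minimal sketch of how a recipe might use this context manager (the
    # `api.rbe` handle and checkout path below are illustrative, not taken
    # from this file):
    #
    #   with api.rbe(
    #       config_path=checkout_root.join("rbe_config.json"),
    #       absolute_path_policy=api.rbe.AbsolutePathPolicy.RELATIVIZE,
    #   ):
    #       # Steps in here run with reproxy up and RBE_* env vars set.
    #       api.step("build", [...])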

    @property
    def _ensure_reclient_path(self):
        return self.m.ensure_tool(
            "reclient", self.resource("tool_manifest.json"), executable_path=""
        )

    @property
    def _exec_root(self):
        """Path that contains all files needed for remote execution."""
        return os.path.commonpath(
            [
                str(self.m.path["start_dir"]),
                str(self.m.path["cache"]),
            ]
        )

    @property
    def _bootstrap_path(self):
        assert self._reclient_path
        return self._reclient_path.join("bootstrap")

    @property
    def _rewrapper_path(self):
        assert self._reclient_path
        return self._reclient_path.join("rewrapper")

    def cxx_compiler_wrapper_command(self):
        command = []
        # Path-relativization is done with a wrapper script.
        # Once reclient supports internal path relativization
        # (b/232261587) we can drop the python wrapper.
        if self._absolute_path_policy == self.AbsolutePathPolicy.RELATIVIZE:
            command += [
                "vpython3",
                "-u",  # unbuffered stdout/stderr
                str(self.resource("relativize_args.py")),
                "--",
            ]
        # Note: the flags here should closely track those used in
        # the Fuchsia project's cxx-remote-wrapper.sh.
        command += [
            str(self._rewrapper_path),
            "--labels=type=compile,compiler=clang,lang=cpp",
            "--exec_strategy=remote_local_fallback",  # better diagnostics
            f"--exec_root={self._exec_root}",
        ]
        # Setting remote mounting paths with the ALLOW option is
        # incompatible with --canonicalize_working_dir.
        if self._absolute_path_policy in {
            self.AbsolutePathPolicy.REJECT,
            self.AbsolutePathPolicy.RELATIVIZE,
        }:
            command += ["--canonicalize_working_dir=true"]
        return command + ["--"]
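
    # For illustration, under the default REJECT policy the returned prefix
    # looks roughly like (the exec_root path is made up):
    #
    #   [".../rewrapper",
    #    "--labels=type=compile,compiler=clang,lang=cpp",
    #    "--exec_strategy=remote_local_fallback",
    #    "--exec_root=/b/s/w",
    #    "--canonicalize_working_dir=true",
    #    "--"]
    #
    # so an actual compile line becomes `<prefix> clang++ ...`.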

    def cxx_compiler_wrapper(self):
        # TODO(http://fxbug.dev/107610): cmake has an issue with handling
        # a multi-token command prefix with semicolons. To work around this,
        # we stuff the multi-token command prefix into a single shell script.
        generated_script_dir = self.m.path.mkdtemp("cxx-rbe")
        wrapped_command = " ".join(self.cxx_compiler_wrapper_command())
        cxx_wrapper_script_path = generated_script_dir.join(RECLIENT_CXX_WRAPPER)
        wrapper_script_text = f"""#!/bin/sh
exec {wrapped_command} "$@"
"""
        self.m.file.write_text(
            f"write {RECLIENT_CXX_WRAPPER} script",
            cxx_wrapper_script_path,
            wrapper_script_text,
        )
        self.m.step(
            f"make {RECLIENT_CXX_WRAPPER} executable",
            ["chmod", "+x", cxx_wrapper_script_path],
        )
        return cxx_wrapper_script_path
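
    # The generated wrapper is a one-line shim; with RELATIVIZE it looks
    # roughly like (paths abbreviated for illustration):
    #
    #   #!/bin/sh
    #   exec vpython3 -u .../relativize_args.py -- .../rewrapper <flags> -- "$@"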

    @property
    def _policy_platform(self):
        if self._absolute_path_policy == self.AbsolutePathPolicy.ALLOW:
            return f"{self._platform},InputRootAbsolutePath={self._exec_root}"
        return self._platform

    def _environment(self, working_dir):
        cache_dir = self.m.path["cache"].join("rbe")
        deps_cache_dir = cache_dir.join("deps")
        self.m.file.ensure_directory("create rbe cache dir", deps_cache_dir)

        # Environment. The source of truth for remote execution configuration
        # is the Fuchsia tree (see $FUCHSIA_OUT_DIR/rbe_config.json). These
        # values are used to modify the configuration in Infrastructure when
        # appropriate. These should not be used to modify the behavior of the
        # build in a meaningful way.
        return {
            "RBE_service": "remotebuildexecution.googleapis.com:443",
            # TODO(fangism): sync docker image with that used in Fuchsia
            "RBE_platform": self._policy_platform,
            # Override the default instance. Infrastructure uses different RBE
            # backends for different environments.
            "RBE_instance": self._instance,
            # Set deps cache path.
            "RBE_enable_deps_cache": "true",
            "RBE_cache_dir": deps_cache_dir,
            "RBE_deps_cache_max_mb": _DEPS_CACHE_MAX_MB,
            # Set the preferred log format for reproxy.
            "RBE_log_format": self._log_format,
            # Set log paths within the task working directory.
            "RBE_log_dir": working_dir,
            "RBE_output_dir": working_dir,
            "RBE_proxy_log_dir": working_dir,
            "RBE_server_address": f"unix://{working_dir.join('reproxy.sock')}",
            "RBE_socket_path": working_dir.join("reproxy.sock"),
            # Use GCE credentials by default. Infrastructure presents an
            # emulated GCE metadata server in all environments for uniformity.
            "RBE_use_application_default_credentials": "false",
            "RBE_use_gce_credentials": "true",
        }
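
    # Reclient tools read their flags from `RBE_`-prefixed environment
    # variables, so each entry above corresponds to a reproxy/rewrapper flag
    # of the same name (e.g. RBE_instance ~ --instance). A sketch of how
    # these values reach build steps (the step is illustrative):
    #
    #   with self.m.context(env=self._environment(working_dir)):
    #       self.m.step("compile", compile_cmd)  # inherits RBE_* settings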

    @property
    def _reproxy_path(self):
        assert self._reclient_path
        return self._reclient_path.join("reproxy")

    def _start(self, config_path):
        """Start reproxy."""
        assert not self._started
        with self.m.step.nest("setup remote execution"):
            cmd = [self._bootstrap_path, f"--re_proxy={self._reproxy_path}"]
            if config_path:
                cmd += [f"--cfg={config_path}"]
            self.m.step("start reproxy", cmd)
            self._started = True

    def _stop(self, working_dir, config_path):
        """Stop reproxy."""
        with self.m.step.nest("teardown remote execution"):
            cmd = [self._bootstrap_path, "--shutdown"]
            if config_path:
                cmd += [f"--cfg={config_path}"]
            try:
                self.m.step("stop reproxy", cmd)
                self._started = False
            finally:
                # reproxy/rewrapper/bootstrap record various log information
                # in a number of locations. At the time of this
                # implementation, the following log files are used:
                # 1. bootstrap.<INFO|WARNING|ERROR|FATAL> is standard logging
                #    for `bootstrap`. Each log file includes more severe
                #    logging levels, e.g. bootstrap.WARNING includes WARNING,
                #    ERROR & FATAL log messages.
                # 2. rbe_metrics.txt is the text representation of a proto
                #    message that describes metrics related to the rbe
                #    execution.
                # 3. reproxy.<INFO|WARNING|ERROR|FATAL> is standard logging
                #    for `reproxy`. See notes in #1 for more details.
                # 4. reproxy_log.txt is the log file that records all info
                #    about all actions that are processed through reproxy.
                # 5. reproxy_outerr.log is merged stderr/stdout of `reproxy`.
                # 6. rewrapper.<INFO|WARNING|ERROR|FATAL> is standard logging
                #    for `rewrapper`. See notes in #1 for more details.
                # 7. reproxy-gomaip.<INFO|WARNING|ERROR|FATAL> is logging
                #    for `gomaip`, which is the input processor used by
                #    `reclient` for finding dependencies of `clang` compile
                #    invocations.
                #
                # By default we extract the WARNING log messages for each
                # portion of the local rbe client, as well as reproxy
                # stdout/stderr and metrics from the build. If further
                # debugging is required, you could increase the verbosity of
                # log messages that we retain in logdog or add the full
                # reproxy_log.txt log file to the list of outputs.
                diagnostic_outputs = [
                    "bootstrap.WARNING",
                    "rbe_metrics.txt",
                    "reproxy.WARNING",
                    "reproxy-gomaip.WARNING",
                    "reproxy_outerr.log",
                    "rewrapper.WARNING",
                ]

                for output in diagnostic_outputs:
                    path = working_dir.join(output)
                    # Not all builds use rbe, so the file might not exist.
                    self.m.path.mock_add_paths(path)
                    if self.m.path.exists(path):
                        # Read the log so it shows up in Milo for debugging.
                        self.m.file.read_text(f"read {output}", path)

                # reproxy also produces a log file of all the actions it
                # handles, including more detailed information useful for
                # debugging.
                rpl_ext = {
                    "text": "rpl",
                    "reducedtext": "rrpl",
                }[self._log_format]
                rpl_file_glob = f"*.{rpl_ext}"
                rpl_paths = self.m.file.glob_paths(
                    name=f"find {rpl_ext} files",
                    source=working_dir,
                    pattern=rpl_file_glob,
                    test_data=[
                        f"reproxy_2021-10-16_22_52_23.{rpl_ext}",
                    ],
                )

                # More than one rpl file is likely a bug, but we can punt
                # until that breaks someone.
                for p in rpl_paths:
                    self.m.path.mock_add_paths(p)
                    # Not all builds use rbe, so the file might not exist.
                    if self.m.path.exists(p):
                        # Read the log so it shows up in Milo for debugging.
                        self.m.file.read_text(f"read {self.m.path.basename(p)}", p)

                self._upload_metrics(working_dir=working_dir)
                try:
                    self._upload_logs(working_dir=working_dir)
                except Exception:
                    lines = self.m.utils.traceback_format_exc().splitlines()
                    self.m.step.empty("rbe log upload failure").presentation.logs[
                        "exception"
                    ] = lines

    def _upload_metrics(self, working_dir):
        if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id):
            # Skip the upload if the build lacks input information.
            return
        bq_pb = rbe_metrics.RbeMetrics()
        bq_pb.build_id = self.m.buildbucket_util.id
        bq_pb.builder_name = self.m.buildbucket.builder_name
        bq_pb.created_at.FromDatetime(self.m.time.utcnow())
        bq_pb.instance = self._instance

        path = self.m.path.join(working_dir, "rbe_metrics.pb")
        self.m.path.mock_add_paths(path)
        if not self.m.path.exists(path):  # pragma: no cover
            return
        stats = self.m.file.read_proto(
            "read rbe_metrics.pb",
            path,
            stats_pb2.Stats,
            codec="BINARY",
            include_log=False,
            test_proto=stats_pb2.Stats(
                environment=dict(
                    foo="false",
                    bar="42",
                )
            ),
        )
        bq_pb.stats.CopyFrom(stats)
        bq_json_dict = json_format.MessageToDict(
            message=bq_pb, preserving_proto_field_name=True
        )

        # "environment" is a map field and gets serialized to a JSON map.
        # Unfortunately, this is incompatible with the corresponding BQ schema,
        # which is a repeated field and thus expects a JSON array.
        envs = bq_pb.stats.environment
        bq_json_dict["stats"]["environment"] = [
            {"key": k, "value": v} for k, v in sorted(envs.items())
        ]

        step_result = self.m.bqupload.insert(
            step_name="upload metrics",
            project="fuchsia-engprod-metrics-prod",
            dataset="metrics",
            table="rbe_client_metrics_v2",
            rows=[bq_json_dict],
            # TODO(fxbug.dev/114570): Send alerts to the build team.
            alert_emails=["olivernewman@google.com"],
        )
        step_result.presentation.logs["json.output"] = self.m.json.dumps(
            bq_json_dict["stats"], indent=4
        ).splitlines()

    def _upload_logs(self, working_dir):
        if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id):
            # Skip the upload if the build lacks input information.
            return
        cmd = [
            self._reclient_path.join("logdump"),
            "--proxy_log_dir",
            working_dir,
            "--output_dir",
            working_dir,
        ]
        self.m.step("convert reproxy command log to binary proto", cmd)

        logs_bin_proto = self.m.path.join(working_dir, "reproxy_log.pb")
        log_dump = self.m.file.read_proto(
            f"read {self.m.path.basename(logs_bin_proto)}",
            logs_bin_proto,
            log_pb2.LogDump,
            codec="BINARY",
            include_log=False,
            test_proto=log_pb2.LogDump(),
        )
        if not log_dump.records:
            return

        rows = [
            {
                "build_id": self.m.buildbucket_util.id,
                "log": record_to_bq_dict(record),
            }
            for record in log_dump.records
        ]
        self.m.bqupload.insert(
            step_name="upload logs",
            project="fuchsia-engprod-metrics-prod",
            dataset="metrics",
            table="rbe_client_command_logs_v2",
            rows=rows,
        ).presentation.step_text = pluralize("row", rows)