# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import functools
import os
from contextlib import contextmanager

from recipe_engine import recipe_api
from google.protobuf import json_format
from PB.recipe_modules.fuchsia.rbe import rbe_metrics
from PB.go.fuchsia.dev.foundry_x.re_client.api.stats import stats as stats_pb2
from PB.go.fuchsia.dev.foundry_x.re_client.api.proxy import log as log_pb2

from RECIPE_MODULES.fuchsia.utils import pluralize

RECLIENT_CXX_WRAPPER = "reclient-cxx-wrapper.sh"

# For builds using the goma input processor, the deps cache file can exceed
# the default size limit, so raise the maximum permitted cache file size to a
# value that is large enough.
_DEPS_CACHE_MAX_MB = "512"

def record_to_bq_dict(record):
    """Serializes a proto record to the dictionary format expected by BQ.

    json_format.MessageToDict converts proto maps to dicts, which are
    incompatible with the corresponding BQ schema, so they are manually
    converted to lists.
    """
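    # E.g. the proto map {"foo": "1"} is converted to the repeated-field form
    # [{"key": "foo", "value": "1"}] that the BQ schema expects.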
    def map_to_list(m):
        return [{"key": k, "value": v} for k, v in sorted(m.items())]

    def map_to_list_proto_val(m):
        return [
            {"key": k, "value": json_format.MessageToDict(v)}
            for k, v in sorted(m.items())
        ]

    def rerun_metadata_map_to_list(dct, metadata):
        for elm, data in zip(dct, metadata):
            elm["output_file_digests"] = map_to_list(data.output_file_digests)
            elm["output_directory_digests"] = map_to_list(
                data.output_directory_digests
            )
            elm["event_times"] = map_to_list(data.event_times)

    record_dict = json_format.MessageToDict(record, preserving_proto_field_name=True)

    record_dict["command"]["platform"] = map_to_list(record.command.platform)
    if "input" in record_dict["command"]:
        record_dict["command"]["input"]["environment_variables"] = map_to_list(
            record.command.input.environment_variables
        )

    if "remote_metadata" in record_dict:
        record_dict["remote_metadata"]["event_times"] = map_to_list_proto_val(
            record.remote_metadata.event_times
        )
        record_dict["remote_metadata"]["output_file_digests"] = map_to_list(
            record.remote_metadata.output_file_digests
        )
        record_dict["remote_metadata"]["output_directory_digests"] = map_to_list(
            record.remote_metadata.output_directory_digests
        )
        if "rerun_metadata" in record_dict["remote_metadata"]:
            rerun_metadata_map_to_list(
                record_dict["remote_metadata"]["rerun_metadata"],
                record.remote_metadata.rerun_metadata,
            )

    if "local_metadata" in record_dict:
        record_dict["local_metadata"]["event_times"] = map_to_list_proto_val(
            record.local_metadata.event_times
        )
        record_dict["local_metadata"]["environment"] = map_to_list(
            record.local_metadata.environment
        )
        record_dict["local_metadata"]["labels"] = map_to_list(
            record.local_metadata.labels
        )
        if "rerun_metadata" in record_dict["local_metadata"]:
            rerun_metadata_map_to_list(
                record_dict["local_metadata"]["rerun_metadata"],
                record.local_metadata.rerun_metadata,
            )

    return record_dict


class RbeApi(recipe_api.RecipeApi):
    """RbeApi contains helper functions for using remote execution
    services via re-client/re-proxy."""

    def __init__(self, props, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._rbe_path = None
        self._config_path = None
        self._platform = props.platform
        self._instance = props.instance
        self._reclient_version = props.reclient_version

        if not self._platform and self._test_data.enabled:
            self._platform = "fake_rbe_platform"
        if not self._instance and self._test_data.enabled:
            self._instance = "fake_rbe_instance"
        self._log_format = props.log_format or "reducedtext"
        self._started = False

    @contextmanager
    def __call__(self):
        """Context manager wrapping reproxy start/stop.

        Raises:
            StepFailure or InfraFailure if it fails to start/stop.
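
        Example (illustrative; assumes this module is bound as `api.rbe`):

            api.rbe.ensure()
            with api.rbe():
                ...  # build steps run here inherit the reproxy environment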
| """ |
| |
| # If we do not override this value, then it user-controlled and a |
| # malicious user could manipulate the value in the fuchsia.git |
| # config file to send requests to a compromised backend (leak). |
| assert self._instance, "No RBE backend in builder properties." |
| |
| # Save current value of infra_step so we can reset it when we |
| # yield back. |
| is_infra_step = self.m.context.infra_step |
| |
| # Separate invocations of RBE tools should use unique paths to avoid |
| # conflicts between log/metric files. |
| working_dir = self.m.path.mkdtemp(prefix="rbe") |
| |
| with self.m.context(env=self._environment(working_dir), infra_steps=True): |
| try: |
| self._start() |
| with self.m.context(infra_steps=is_infra_step): |
| yield |
| finally: |
| if not self.m.runtime.in_global_shutdown: |
| self._stop(working_dir) |
| |
| def ensure(self): |
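        """Ensures the reclient CIPD package is installed.

        Returns the path to the directory containing the reclient binaries
        (bootstrap, reproxy, rewrapper, logdump).
        """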
        with self.m.context(infra_steps=True):
            pkgs = self.m.cipd.EnsureFile()
            pkgs.add_package("infra/rbe/client/${platform}", self._reclient_version)
            self._rbe_path = self.m.path["cache"].join("reclient")
            self.m.cipd.ensure(self._rbe_path, pkgs)
            return self._rbe_path

    @property
    def _bootstrap_path(self):
        assert self._rbe_path
        return self._rbe_path.join("bootstrap")

    @property
    def _rewrapper_path(self):
        assert self._rbe_path
        return self._rbe_path.join("rewrapper")

    @property
    def cxx_compiler_wrapper_command(self):
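        """Command prefix that remotely executes C++ compiles via rewrapper."""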
        start_dir = self.m.path["start_dir"]
        # Note: the flags here should closely track those used in
        # the Fuchsia project's cxx-remote-wrapper.sh.
        # cmake generates commands with absolute paths, which can be a problem
        # for reclient. Once reclient supports internal path relativization
        # (b/232261587) we can drop the python wrapper.
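        # The generated wrapper ends up invoking, roughly (paths illustrative):
        #   vpython3 -u relativize_args.py -- rewrapper <flags...> <compile command>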
        return [
            "vpython3",
            "-u",  # unbuffered stdout/stderr
            str(self.resource("relativize_args.py")),
            "--",
            str(self._rewrapper_path),
            "--labels=type=compile,compiler=clang,lang=cpp",
            "--canonicalize_working_dir=true",
            "--exec_strategy=remote_local_fallback",  # better diagnostics
            "--exec_root="
            + os.path.commonpath([str(start_dir), str(self.m.path["cache"])]),
        ]

    @functools.cached_property
    def cxx_compiler_wrapper(self):
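        """Writes an executable shell wrapper around rewrapper and returns its path."""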
        generated_script_dir = self.m.path.mkdtemp("cxx-rbe")
        wrapped_command = " ".join(self.cxx_compiler_wrapper_command)
        cxx_wrapper_script_path = generated_script_dir.join(RECLIENT_CXX_WRAPPER)
        wrapper_script_text = """#!/bin/sh
exec {wrapper} "$@"
""".format(
            wrapper=wrapped_command
        )
        self.m.file.write_text(
            "write %s script" % RECLIENT_CXX_WRAPPER,
            cxx_wrapper_script_path,
            wrapper_script_text,
        )
        self.m.step(
            "make %s executable" % RECLIENT_CXX_WRAPPER,
            ["chmod", "+x", cxx_wrapper_script_path],
        )
        return cxx_wrapper_script_path

    def _environment(self, working_dir):
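        """Returns the RBE_* environment variables for reproxy/rewrapper."""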
        cache_dir = self.m.path["cache"].join("rbe")
        deps_cache_dir = cache_dir.join("deps")
        self.m.file.ensure_directory("create rbe cache dir", deps_cache_dir)
        # Environment. The source of truth for remote execution configuration
        # is the Fuchsia tree (see $FUCHSIA_OUT_DIR/rbe_config.json). These
        # values are used to modify the configuration in Infrastructure when
        # appropriate. These should not be used to modify the behavior of the
        # build in a meaningful way.
        return {
            "RBE_service": "remotebuildexecution.googleapis.com:443",
            # TODO(fangism): sync docker image with that used in Fuchsia
            "RBE_platform": self._platform,
            # Override default instance. Infrastructure uses different RBE
            # backends for different environments.
            "RBE_instance": self._instance,
            # Set deps cache path.
            "RBE_deps_cache_dir": deps_cache_dir,
            "RBE_deps_cache_max_mb": _DEPS_CACHE_MAX_MB,
            # Set preferred log format for reproxy.
            "RBE_log_format": self._log_format,
            # Set log paths within the task working directory.
            "RBE_log_dir": working_dir,
            "RBE_output_dir": working_dir,
            "RBE_proxy_log_dir": working_dir,
            "RBE_server_address": "unix://{}".format(working_dir.join("reproxy.sock")),
            "RBE_socket_path": working_dir.join("reproxy.sock"),
            # Use GCE credentials by default. Infrastructure presents an
            # emulated GCE metadata server in all environments for uniformity.
            "RBE_use_application_default_credentials": "False",
            "RBE_use_gce_credentials": "True",
        }

    @property
    def _reproxy_path(self):
        assert self._rbe_path
        return self._rbe_path.join("reproxy")

    def set_path(self, path):
        """Sets the path to the reproxy/bootstrap binary directory."""
        self._rbe_path = path

    def set_config_path(self, config_path):
        """Sets the path to the config file for the repository being built.

        In the case of Fuchsia, this should be set to the path referenced by
        $FUCHSIA_OUT_DIR/rbe_config.json as reported by `gn gen`.
        """
        self._config_path = config_path

    def _start(self):
        """Start reproxy."""
        assert not self._started

        with self.m.step.nest("setup remote execution"):
            cmd = [self._bootstrap_path, "--re_proxy={}".format(self._reproxy_path)]
            if self._config_path:
                cmd += ["--cfg={}".format(self._config_path)]
            self.m.step("start reproxy", cmd)
            self._started = True

    def _stop(self, working_dir):
        """Stop reproxy."""
        with self.m.step.nest("teardown remote execution"):
            cmd = [self._bootstrap_path, "--shutdown"]
            if self._config_path:
                cmd += ["--cfg={}".format(self._config_path)]
            try:
                self.m.step("stop reproxy", cmd)
                self._started = False
            finally:
                # reproxy/rewrapper/bootstrap record various log information in
                # a number of locations. At the time of this implementation,
                # the following log files are used:
                # 1. bootstrap.<INFO|WARNING|ERROR|FATAL> is standard logging
                #    for `bootstrap`. Each log file includes more severe logging
                #    levels, e.g. bootstrap.WARNING includes WARNING, ERROR &
                #    FATAL log messages.
                # 2. rbe_metrics.txt is the text representation of a proto
                #    message that describes metrics related to the rbe execution.
                # 3. reproxy.<INFO|WARNING|ERROR|FATAL> is standard logging for
                #    `reproxy`. See notes in #1 for more details.
                # 4. reproxy_log.txt is the log file that records all info
                #    about all actions that are processed through reproxy.
                # 5. reproxy_outerr.log is merged stderr/stdout of `reproxy`.
                # 6. rewrapper.<INFO|WARNING|ERROR|FATAL> is standard logging
                #    for `rewrapper`. See notes in #1 for more details.
                # 7. reproxy-gomaip.<INFO|WARNING|ERROR|FATAL> is logging
                #    for `gomaip` which is the input processor used by `reclient`
                #    for finding dependencies of `clang` compile invocations.
                #
                # We extract the WARNING log messages for each portion of the
                # local rbe client as well as reproxy stdout/stderr and metrics
                # from the build by default. If further debugging is required,
                # you could increase the verbosity of log messages that we
                # retain in logdog or add the full reproxy_log.txt log file to
                # the list of outputs.
                diagnostic_outputs = [
                    "bootstrap.WARNING",
                    "rbe_metrics.txt",
                    "reproxy.WARNING",
                    "reproxy-gomaip.WARNING",
                    "reproxy_outerr.log",
                    "rewrapper.WARNING",
                ]

                for output in diagnostic_outputs:
                    path = working_dir.join(output)
                    # Not all builds use rbe, so it might not exist.
                    self.m.path.mock_add_paths(path)
                    if self.m.path.exists(path):
                        self.m.file.read_text(
                            "read {}".format(output),
                            path,
                            test_data="test log",
                            include_log=False,
                        )

                # reproxy also produces a log file of all the actions it
                # handles, including more detailed information useful for
                # debugging.
                rpl_ext = {
                    "text": "rpl",
                    "reducedtext": "rrpl",
                }[self._log_format]
                rpl_file_glob = "*.{}".format(rpl_ext)
                rpl_paths = self.m.file.glob_paths(
                    name="find {} files".format(rpl_ext),
                    source=working_dir,
                    pattern=rpl_file_glob,
                    test_data=[
                        "reproxy_2021-10-16_22_52_23.{}".format(rpl_ext),
                    ],
                )

                # More than one rpl file likely indicates a bug, but we can
                # punt until that breaks someone.
                for p in rpl_paths:
                    self.m.path.mock_add_paths(p)
                    # Not all builds use rbe, so it might not exist.
                    if self.m.path.exists(p):
                        self.m.file.read_text(
                            "read {}".format(self.m.path.basename(p)),
                            p,
                            test_data="test log",
                            include_log=False,
                        )

                self._upload_metrics(working_dir=working_dir)
                try:
                    self._upload_logs(working_dir=working_dir)
                except Exception:
                    lines = self.m.utils.traceback_format_exc().splitlines()
                    self.m.step.empty("rbe log upload failure").presentation.logs[
                        "exception"
                    ] = lines

    def _upload_metrics(self, working_dir):
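        """Uploads reproxy metrics (rbe_metrics.pb) to BigQuery."""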
        if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id):
            # Skip the upload if it does not have build input information.
            return

        bq_pb = rbe_metrics.RbeMetrics()
        bq_pb.build_id = self.m.buildbucket_util.id
        bq_pb.builder_name = self.m.buildbucket.builder_name
        bq_pb.created_at.FromDatetime(self.m.time.utcnow())
        bq_pb.instance = self._instance

        path = self.m.path.join(working_dir, "rbe_metrics.pb")
        self.m.path.mock_add_paths(path)
        if not self.m.path.exists(path):  # pragma: no cover
            return

        stats = self.m.file.read_proto(
            "read rbe_metrics.pb",
            path,
            stats_pb2.Stats,
            codec="BINARY",
            include_log=False,
            test_proto=stats_pb2.Stats(
                environment=dict(
                    foo="false",
                    bar="42",
                )
            ),
        )

        bq_pb.stats.CopyFrom(stats)

        bq_json_dict = json_format.MessageToDict(
            message=bq_pb, preserving_proto_field_name=True
        )

        # "environment" is a map field and gets serialized to a JSON map.
        # Unfortunately, this is incompatible with the corresponding BQ schema,
        # which is a repeated field and thus expects a JSON array.
        envs = bq_pb.stats.environment
        bq_json_dict["stats"]["environment"] = [
            {"key": k, "value": v} for k, v in sorted(envs.items())
        ]

        step_result = self.m.bqupload.insert(
            step_name="upload metrics",
            project="fuchsia-engprod-metrics-prod",
            dataset="metrics",
            table="rbe_client_metrics_v2",
            rows=[bq_json_dict],
            # TODO(fxbug.dev/114570): Send alerts to the build team.
            alert_emails=["olivernewman@google.com"],
        )

        step_result.presentation.logs["json.output"] = self.m.json.dumps(
            bq_json_dict["stats"], indent=4
        ).splitlines()

    def _upload_logs(self, working_dir):
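        """Uploads per-action reproxy log records to BigQuery."""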
        if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id):
            # Skip the upload if it does not have build input information.
            return

        cmd = [
            self._rbe_path.join("logdump"),
            "--proxy_log_dir",
            working_dir,
            "--output_dir",
            working_dir,
        ]
        self.m.step("convert reproxy command log to binary proto", cmd)

        logs_bin_proto = self.m.path.join(working_dir, "reproxy_log.pb")
        log_dump = self.m.file.read_proto(
            "read {}".format(self.m.path.basename(logs_bin_proto)),
            logs_bin_proto,
            log_pb2.LogDump,
            codec="BINARY",
            include_log=False,
            test_proto=log_pb2.LogDump(),
        )
        if not log_dump.records:
            return

        rows = [
            {
                "build_id": self.m.buildbucket_util.id,
                "log": record_to_bq_dict(record),
            }
            for record in log_dump.records
        ]
        self.m.bqupload.insert(
            step_name="upload logs",
            project="fuchsia-engprod-metrics-prod",
            dataset="metrics",
            table="rbe_client_command_logs_v2",
            rows=rows,
        ).presentation.step_text = pluralize("row", rows)