| # Copyright 2021 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from enum import Enum |
| import os |
| from contextlib import contextmanager |
| |
| from recipe_engine import recipe_api |
| from google.protobuf import json_format |
| from PB.recipe_modules.fuchsia.rbe import rbe_metrics |
| from PB.go.fuchsia.dev.foundry_x.re_client.api.stats import stats as stats_pb2 |
| from PB.go.fuchsia.dev.foundry_x.re_client.api.proxy import log as log_pb2 |
| |
| from RECIPE_MODULES.fuchsia.utils import pluralize |
| |
| RECLIENT_CXX_WRAPPER = "reclient-cxx-wrapper.sh" |
| |
| # For builds using the goma input processor, sometimes the deps cache file is |
| # too big for the default setting. So just set the max file size permitted to |
| # be large enough. |
| _DEPS_CACHE_MAX_MB = "512" |
| |
| |
def record_to_bq_dict(record):
    """Serialize a reproxy log record proto to a BigQuery-compatible dict.

    json_format.MessageToDict converts proto map fields to JSON objects,
    which are incompatible with the corresponding BQ schema (repeated
    {key, value} rows), so every map field is manually rewritten as a
    list sorted by key.
    """

    def as_kv_list(mapping):
        # Plain map field -> sorted repeated {key, value}.
        return [{"key": key, "value": val} for key, val in sorted(mapping.items())]

    def as_kv_list_of_protos(mapping):
        # Map field whose values are protos -> serialize each value too.
        return [
            {"key": key, "value": json_format.MessageToDict(val)}
            for key, val in sorted(mapping.items())
        ]

    def fix_rerun_metadata(entries, metadata_protos):
        # Rewrite the map fields inside each rerun_metadata entry in place.
        for entry, proto in zip(entries, metadata_protos):
            entry["output_file_digests"] = as_kv_list(proto.output_file_digests)
            entry["output_directory_digests"] = as_kv_list(
                proto.output_directory_digests
            )
            entry["event_times"] = as_kv_list(proto.event_times)

    bq_dict = json_format.MessageToDict(record, preserving_proto_field_name=True)

    bq_dict["command"]["platform"] = as_kv_list(record.command.platform)
    if "input" in bq_dict["command"]:
        bq_dict["command"]["input"]["environment_variables"] = as_kv_list(
            record.command.input.environment_variables
        )

    if "remote_metadata" in bq_dict:
        remote = bq_dict["remote_metadata"]
        remote["event_times"] = as_kv_list_of_protos(
            record.remote_metadata.event_times
        )
        remote["output_file_digests"] = as_kv_list(
            record.remote_metadata.output_file_digests
        )
        remote["output_directory_digests"] = as_kv_list(
            record.remote_metadata.output_directory_digests
        )
        if "rerun_metadata" in remote:
            fix_rerun_metadata(
                remote["rerun_metadata"],
                record.remote_metadata.rerun_metadata,
            )

    if "local_metadata" in bq_dict:
        local = bq_dict["local_metadata"]
        local["event_times"] = as_kv_list_of_protos(record.local_metadata.event_times)
        local["environment"] = as_kv_list(record.local_metadata.environment)
        local["labels"] = as_kv_list(record.local_metadata.labels)
        if "rerun_metadata" in local:
            fix_rerun_metadata(
                local["rerun_metadata"],
                record.local_metadata.rerun_metadata,
            )

    return bq_dict
| |
| |
| class RbeApi(recipe_api.RecipeApi): |
| """RemoteExecutionApi contains helper functions for using remote execution |
| services via re-client/re-proxy.""" |
| |
    class AbsolutePathPolicy(Enum):
        """This controls how absolute paths are to be treated.

        The choice impacts how reproxy and rewrapper are invoked.

        Choices:
            REJECT: remote commands using local absolute paths will fail.
                rewrapper --canonicalize_working_dir=true.
                This allows cache sharing between different build output
                directories (under exec_root) at the same depth.
                reproxy: no InputPathAbsoluteRoot

            RELATIVIZE: rewrite commands using relative paths, using a wrapper.
                Relative paths are remote-execution friendly, while absolute paths
                will likely fail. cmake builds are known to use absolute paths.
                Relativized commands are better for caching across build
                environments, but the wrapper script incurs some overhead.
                rewrapper --canonicalize_working_dir=true.
                reproxy: no InputPathAbsoluteRoot

            ALLOW: Force the remote environment to mimic local paths.
                This allows commands with absolute paths to work,
                at the expense of being able to cache across build environments.
                This option can help cmake builds work remotely.
                rewrapper --canonicalize_working_dir=false.
                reproxy: --platform InputPathAbsoluteRoot=exec_root
        """

        REJECT = 1  # Default policy; best for cross-build caching.
        RELATIVIZE = 2  # Uses the relativize_args.py wrapper script.
        ALLOW = 3  # Mounts exec_root at its local absolute path remotely.
| |
| def __init__(self, props, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
| |
| self._reclient_path = None |
| self._platform = props.platform |
| self._instance = props.instance |
| |
| # Default: let commands that use absolute paths fail remote execution. |
| # For best caching performance, restrict remote execution commands |
| # to use only relative paths. |
| self._absolute_path_policy = self.AbsolutePathPolicy.REJECT |
| |
| if not self._platform and self._test_data.enabled: |
| self._platform = "fake_rbe_platform" |
| if not self._instance and self._test_data.enabled: |
| self._instance = "fake_rbe_instance" |
| self._log_format = props.log_format or "reducedtext" |
| self._started = False |
| |
    @contextmanager
    def __call__(
        self,
        reclient_path=None,
        config_path=None,
        absolute_path_policy=AbsolutePathPolicy.REJECT,
    ):
        """Make context wrapping reproxy start/stop.

        Args:
            reclient_path (Path): if set, use this Path to reclient tools,
                otherwise, automatically use the Path to a loaded CIPD package.
            config_path (Path): The config file within the checkout.
                In the case of a Fuchsia checkout, this should be set to the path
                referenced by $FUCHSIA_OUT_DIR/rbe_config.json as reported by
                `gn gen`.
            absolute_path_policy (AbsolutePathPolicy): See enum definition.

        Yields:
            None, with reproxy running and RBE_* environment variables set.

        Raises:
            StepFailure or InfraFailure if it fails to start/stop.
        """
        if reclient_path:
            self._reclient_path = reclient_path
        else:
            self._reclient_path = self._ensure_reclient_path

        assert self._reclient_path

        # If we do not override this value, then it is user-controlled and a
        # malicious user could manipulate the value in the fuchsia.git
        # config file to send requests to a compromised backend (leak).
        assert self._instance, "No RBE backend in builder properties."

        # Save current value of infra_step so we can reset it when we
        # yield back.
        is_infra_step = self.m.context.infra_step

        # Separate invocations of RBE tools should use unique paths to avoid
        # conflicts between log/metric files.
        working_dir = self.m.path.mkdtemp(prefix="rbe")

        # Remember the policy in effect so it can be restored on exit.
        saved_absolute_path_policy = self._absolute_path_policy
        self._absolute_path_policy = absolute_path_policy

        with self.m.context(env=self._environment(working_dir), infra_steps=True):
            try:
                self._start(config_path=config_path)
                with self.m.context(infra_steps=is_infra_step):
                    yield
            finally:
                # NOTE(review): teardown is skipped during global shutdown,
                # presumably because new steps cannot run then — confirm.
                if not self.m.runtime.in_global_shutdown:
                    self._stop(working_dir=working_dir, config_path=config_path)
                # Always restore the previous policy, even on failure.
                self._absolute_path_policy = saved_absolute_path_policy
| |
| @property |
| def _ensure_reclient_path(self): |
| return self.m.ensure_tool( |
| "reclient", self.resource("tool_manifest.json"), executable_path="" |
| ) |
| |
| @property |
| def _exec_root(self): |
| """Path that contains all files needed for remote execution.""" |
| return os.path.commonpath( |
| [ |
| str(self.m.path["start_dir"]), |
| str(self.m.path["cache"]), |
| ] |
| ) |
| |
| @property |
| def _bootstrap_path(self): |
| assert self._reclient_path |
| return self._reclient_path.join("bootstrap") |
| |
| @property |
| def _rewrapper_path(self): |
| assert self._reclient_path |
| return self._reclient_path.join("rewrapper") |
| |
| def cxx_compiler_wrapper_command(self): |
| command = [] |
| |
| # Path-relativization is done with a wrapper script. |
| # Once reclient supports internal path relativization |
| # (b/232261587) we can drop the python wrapper. |
| if self._absolute_path_policy == self.AbsolutePathPolicy.RELATIVIZE: |
| command += [ |
| "vpython3", |
| "-u", # unbuffered stdout/stderr |
| str(self.resource("relativize_args.py")), |
| "--", |
| ] |
| |
| # Note: the flags here should closely track those used in |
| # the Fuchsia project's cxx-remote-wrapper.sh. |
| command += [ |
| str(self._rewrapper_path), |
| "--labels=type=compile,compiler=clang,lang=cpp", |
| "--exec_strategy=remote_local_fallback", # better diagnostics |
| f"--exec_root={self._exec_root}", |
| ] |
| |
| # Setting remote mounting paths with the ALLOW option is |
| # incompatible with --canonicalize_working_dir. |
| if self._absolute_path_policy in { |
| self.AbsolutePathPolicy.REJECT, |
| self.AbsolutePathPolicy.RELATIVIZE, |
| }: |
| command += ["--canonicalize_working_dir=true"] |
| |
| return command + ["--"] |
| |
| def cxx_compiler_wrapper(self): |
| # TODO(http://fxbug.dev/107610): cmake has an issue with handling |
| # a multi-token command prefix with semicolons. To workaround this, |
| # we stuff a multi-token command prefix into a single shell script. |
| generated_script_dir = self.m.path.mkdtemp("cxx-rbe") |
| wrapped_command = " ".join(self.cxx_compiler_wrapper_command()) |
| cxx_wrapper_script_path = generated_script_dir.join(RECLIENT_CXX_WRAPPER) |
| wrapper_script_text = f"""#!/bin/sh |
| exec {wrapped_command} "$@\" |
| """ |
| self.m.file.write_text( |
| f"write {RECLIENT_CXX_WRAPPER} script", |
| cxx_wrapper_script_path, |
| wrapper_script_text, |
| ) |
| self.m.step( |
| f"make {RECLIENT_CXX_WRAPPER} executable", |
| ["chmod", "+x", cxx_wrapper_script_path], |
| ) |
| return cxx_wrapper_script_path |
| |
| @property |
| def _policy_platform(self): |
| if self._absolute_path_policy == self.AbsolutePathPolicy.ALLOW: |
| return f"{self._platform},InputRootAbsolutePath={self._exec_root}" |
| return self._platform |
| |
| def _environment(self, working_dir): |
| cache_dir = self.m.path["cache"].join("rbe") |
| deps_cache_dir = cache_dir.join("deps") |
| self.m.file.ensure_directory("create rbe cache dir", deps_cache_dir) |
| # Environment. The source of truth for remote execution configuration |
| # is the Fuchsia tree (see $FUCHSIA_OUT_DIR/rbe_config.json). These |
| # values are used to modify the configuration in Infrastructure when |
| # appropriate. These should not be used to modify the behavior of the |
| # build in a meaningful way. |
| return { |
| "RBE_service": "remotebuildexecution.googleapis.com:443", |
| # TODO(fangism): sync docker image with that used in Fuchsia |
| "RBE_platform": self._policy_platform, |
| # Override default instance. Infrastructure uses different RBE |
| # backends for different environments. |
| "RBE_instance": self._instance, |
| # Set deps cache path. |
| "RBE_enable_deps_cache": "true", |
| "RBE_cache_dir": deps_cache_dir, |
| "RBE_deps_cache_max_mb": _DEPS_CACHE_MAX_MB, |
| # Set preferred log format for reproxy. |
| "RBE_log_format": self._log_format, |
| # Set log paths within the task working directory. |
| "RBE_log_dir": working_dir, |
| "RBE_output_dir": working_dir, |
| "RBE_proxy_log_dir": working_dir, |
| "RBE_server_address": f"unix://{working_dir.join('reproxy.sock')}", |
| "RBE_socket_path": working_dir.join("reproxy.sock"), |
| # Use GCE credentials by default. Infrastructure presents an |
| # emulated GCE metadata server in all environments for uniformity. |
| "RBE_use_application_default_credentials": "false", |
| "RBE_use_gce_credentials": "true", |
| } |
| |
| @property |
| def _reproxy_path(self): |
| assert self._reclient_path |
| return self._reclient_path.join("reproxy") |
| |
| def _start(self, config_path): |
| """Start reproxy.""" |
| assert not self._started |
| |
| with self.m.step.nest("setup remote execution"): |
| cmd = [self._bootstrap_path, f"--re_proxy={self._reproxy_path}"] |
| if config_path: |
| cmd += [f"--cfg={config_path}"] |
| self.m.step("start reproxy", cmd) |
| self._started = True |
| |
    def _stop(self, working_dir, config_path):
        """Stop reproxy.

        Regardless of whether shutdown succeeds, surfaces the various
        reclient diagnostic logs as step logs and uploads metrics/logs.

        Args:
            working_dir (Path): directory holding reproxy logs and metrics;
                must be the same one passed to _environment().
            config_path (Path): optional reproxy config file; should match
                the one passed to _start().
        """
        with self.m.step.nest("teardown remote execution"):
            cmd = [self._bootstrap_path, "--shutdown"]
            if config_path:
                cmd += [f"--cfg={config_path}"]
            try:
                self.m.step("stop reproxy", cmd)
                # Only marked stopped after a successful shutdown step.
                self._started = False
            finally:
                # reproxy/rewrapper/bootstrap record various log information in
                # a number of locations. At the time of this implementation,
                # the following log files are used:
                # 1. bootstrap.<INFO|WARNING|ERROR|FATAL> is standard logging
                # for `bootstrap`. Each log file includes more severe logging
                # levels, e.g. bootstrap.WARNING includes WARNING, ERROR & FATAL
                # log messages.
                # 2. rbe_metrics.txt is the text representation of a proto
                # message that describes metrics related to the rbe execution.
                # 3. reproxy.<INFO|WARNING|ERROR|FATAL> is standard logging for
                # `reproxy`. See notes in #1 for more details.
                # 4. reproxy_log.txt is the log file that records all info
                # about all actions that are processed through reproxy.
                # 5. reproxy_outerr.log is merged stderr/stdout of `reproxy`.
                # 6. rewrapper.<INFO|WARNING|ERROR|FATAL> is standard logging
                # for `rewrapper`. See notes in #1 for more details.
                # 7. reproxy-gomaip.<INFO|WARNING|ERROR|FATAL> is logging
                # for `gomaip` which is the input processor used by `reclient`
                # for finding dependencies of `clang` compile invocations.
                #
                # We extract the WARNING log messages for each portion of the
                # local rbe client as well as reproxy stdout/stderr and metrics
                # from the build by default. If further debugging is required,
                # you could increase the verbosity of log messages that we
                # retain in logdog or add the full reproxy_log.txt log file to
                # the list of outputs.
                diagnostic_outputs = [
                    "bootstrap.WARNING",
                    "rbe_metrics.txt",
                    "reproxy.WARNING",
                    "reproxy-gomaip.WARNING",
                    "reproxy_outerr.log",
                    "rewrapper.WARNING",
                ]

                for output in diagnostic_outputs:
                    path = working_dir.join(output)
                    # Not all builds use rbe, so it might not exist.
                    self.m.path.mock_add_paths(path)
                    if self.m.path.exists(path):
                        # Read the log so it shows up in Milo for debugging.
                        self.m.file.read_text(f"read {output}", path)

                # reproxy also produces a log file of all the actions which
                # it handles including more detailed debugging information
                # useful for debugging.
                rpl_ext = {
                    "text": "rpl",
                    "reducedtext": "rrpl",
                }[self._log_format]
                rpl_file_glob = f"*.{rpl_ext}"
                rpl_paths = self.m.file.glob_paths(
                    name=f"find {rpl_ext} files",
                    source=working_dir,
                    pattern=rpl_file_glob,
                    test_data=[
                        f"reproxy_2021-10-16_22_52_23.{rpl_ext}",
                    ],
                )

                # More than 1 rpl file is likely a bug but we can punt until
                # that breaks someone.
                for p in rpl_paths:
                    self.m.path.mock_add_paths(p)
                    # Not all builds use rbe, so it might not exist.
                    if self.m.path.exists(p):
                        # Read the log so it shows up in Milo for debugging.
                        self.m.file.read_text(f"read {self.m.path.basename(p)}", p)

                # Metrics upload failures propagate; log upload failures are
                # only reported as a step with the traceback attached.
                self._upload_metrics(working_dir=working_dir)
                try:
                    self._upload_logs(working_dir=working_dir)
                except Exception:
                    lines = self.m.utils.traceback_format_exc().splitlines()
                    self.m.step.empty("rbe log upload failure").presentation.logs[
                        "exception"
                    ] = lines
| |
| def _upload_metrics(self, working_dir): |
| if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id): |
| # Skip the upload if it does not have build input information. |
| return |
| |
| bq_pb = rbe_metrics.RbeMetrics() |
| bq_pb.build_id = self.m.buildbucket_util.id |
| bq_pb.builder_name = self.m.buildbucket.builder_name |
| bq_pb.created_at.FromDatetime(self.m.time.utcnow()) |
| bq_pb.instance = self._instance |
| |
| path = self.m.path.join(working_dir, "rbe_metrics.pb") |
| self.m.path.mock_add_paths(path) |
| if not self.m.path.exists(path): # pragma: no cover |
| return |
| |
| stats = self.m.file.read_proto( |
| "read rbe_metrics.pb", |
| path, |
| stats_pb2.Stats, |
| codec="BINARY", |
| include_log=False, |
| test_proto=stats_pb2.Stats( |
| environment=dict( |
| foo="false", |
| bar="42", |
| ) |
| ), |
| ) |
| |
| bq_pb.stats.CopyFrom(stats) |
| |
| bq_json_dict = json_format.MessageToDict( |
| message=bq_pb, preserving_proto_field_name=True |
| ) |
| |
| # "environment" is a map field and gets serialized to a JSON map. |
| # Unfortunately, this is incompatible with the corresponding BQ schema, |
| # which is a repeated field and thus expects a JSON array. |
| envs = bq_pb.stats.environment |
| bq_json_dict["stats"]["environment"] = [ |
| {"key": k, "value": v} for k, v in sorted(envs.items()) |
| ] |
| |
| step_result = self.m.bqupload.insert( |
| step_name="upload metrics", |
| project="fuchsia-engprod-metrics-prod", |
| dataset="metrics", |
| table="rbe_client_metrics_v2", |
| rows=[bq_json_dict], |
| # TODO(fxbug.dev/114570): Send alerts to the build team. |
| alert_emails=["olivernewman@google.com"], |
| ) |
| |
| step_result.presentation.logs["json.output"] = self.m.json.dumps( |
| bq_json_dict["stats"], indent=4 |
| ).splitlines() |
| |
| def _upload_logs(self, working_dir): |
| if not (self.m.buildbucket.builder_name and self.m.buildbucket_util.id): |
| # Skip the upload if it does not have build input information. |
| return |
| |
| cmd = [ |
| self._reclient_path.join("logdump"), |
| "--proxy_log_dir", |
| working_dir, |
| "--output_dir", |
| working_dir, |
| ] |
| self.m.step("convert reproxy command log to binary proto", cmd) |
| |
| logs_bin_proto = self.m.path.join(working_dir, "reproxy_log.pb") |
| log_dump = self.m.file.read_proto( |
| f"read {self.m.path.basename(logs_bin_proto)}", |
| logs_bin_proto, |
| log_pb2.LogDump, |
| codec="BINARY", |
| include_log=False, |
| test_proto=log_pb2.LogDump(), |
| ) |
| if not log_dump.records: |
| return |
| |
| rows = [ |
| { |
| "build_id": self.m.buildbucket_util.id, |
| "log": record_to_bq_dict(record), |
| } |
| for record in log_dump.records |
| ] |
| self.m.bqupload.insert( |
| step_name="upload logs", |
| project="fuchsia-engprod-metrics-prod", |
| dataset="metrics", |
| table="rbe_client_command_logs_v2", |
| rows=rows, |
| ).presentation.step_text = pluralize("row", rows) |