| # Copyright 2021 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from contextlib import contextmanager |
| |
| from recipe_engine import recipe_api |
| from google.protobuf import json_format |
| from PB.recipe_modules.fuchsia.rbe import rbe_metrics |
| from PB.go.fuchsia.dev.foundry_x.re_client.api.stats import stats as stats_pb2 |
| |
| |
class RbeApi(recipe_api.RecipeApi):
    """RemoteExecutionApi contains helper functions for using remote execution
    services via re-client/re-proxy."""

    # Maps a reproxy log format name to the extension of the log file that
    # reproxy emits for that format.
    _LOG_FORMAT_TO_EXT = {
        "text": "rpl",
        "reducedtext": "rrpl",
    }

    def __init__(self, props, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Directory containing the reproxy/bootstrap binaries; populated via
        # set_path() before the context manager is entered.
        self._rbe_path = None
        # Optional reproxy config file; populated via set_config_path().
        self._config_path = None
        self._instance = props.instance
        # Default to the compact log format unless overridden by properties.
        self._log_format = props.log_format or "reducedtext"
        # Tracks whether reproxy has been started (guards double-start).
        self._started = False

    @contextmanager
    def __call__(self):
        """Make context wrapping reproxy start/stop.

        Raises:
            StepFailure or InfraFailure if it fails to start/stop.
        """

        # Save current value of infra_step so we can reset it when we
        # yield back.
        is_infra_step = self.m.context.infra_step

        # Separate invocations of RBE tools should use unique paths to avoid
        # conflicts between log/metric files.
        working_dir = self.m.path.mkdtemp(prefix="rbe")

        with self.m.context(env=self._environment(working_dir), infra_steps=True):
            try:
                self._start()
                with self.m.context(infra_steps=is_infra_step):
                    yield
            finally:
                # Always tear down and collect logs/metrics, even when
                # _start() or the wrapped body raised.
                self._stop(working_dir)

    @property
    def _bootstrap_path(self):
        """Path to the `bootstrap` binary; set_path() must be called first."""
        assert self._rbe_path
        return self._rbe_path.join("bootstrap")

    def _environment(self, working_dir):
        """Return the RBE_* environment variables for reproxy/rewrapper.

        Args:
            working_dir: unique per-invocation directory that receives the
                reproxy logs, metrics, and unix socket.
        """
        cache_dir = self.m.path["cache"].join("rbe")

        # Environment. The source of truth for remote execution configuration
        # is the Fuchsia tree (see $FUCHSIA_OUT_DIR/rbe_config.json). These
        # values are used to modify the configuration in Infrastructure when
        # appropriate. These should not be used to modify the behavior of the
        # build in a meaningful way.
        return {
            # Override default instance. Infrastructure uses different RBE
            # backends for different environments.
            "RBE_instance": self._instance,
            # Set deps cache path.
            "RBE_deps_cache_dir": cache_dir.join("deps"),
            # Set preferred log format for reproxy.
            "RBE_log_format": self._log_format,
            # Set log paths within the task working directory.
            "RBE_log_dir": working_dir,
            "RBE_output_dir": working_dir,
            "RBE_proxy_log_dir": working_dir,
            "RBE_server_address": "unix://{}".format(working_dir.join("reproxy.sock")),
            # Use GCE credentials by default. Infrastructure presents an
            # emulated GCE metadata server in all environments for uniformity.
            "RBE_use_application_default_credentials": "False",
            "RBE_use_gce_credentials": "True",
        }

    def _log_format_to_ext(self, log_format):  # pragma: no cover
        """Return the reproxy log file extension for `log_format`.

        Returns None for unrecognized formats (preserving the previous
        if/elif fall-through behavior); only "text" and "reducedtext" are
        expected in practice.
        """
        # Table lookup instead of an if/elif chain; also avoids shadowing
        # the `format` builtin as the previous parameter name did.
        return self._LOG_FORMAT_TO_EXT.get(log_format)

    @property
    def _reproxy_path(self):
        """Path to the `reproxy` binary; set_path() must be called first."""
        assert self._rbe_path
        return self._rbe_path.join("reproxy")

    def set_path(self, path):
        """Path to the reproxy/bootstrap binary directory."""
        self._rbe_path = path

    def set_config_path(self, config_path):
        """Path to the config file for the repository being built.

        In the case of Fuchsia, this should be set to the path referenced by
        $FUCHSIA_OUT_DIR/rbe_config.json as reported by `gn gen`.
        """
        self._config_path = config_path

    def _start(self):
        """Start reproxy."""
        assert not self._started

        with self.m.step.nest("setup remote execution"):
            cmd = [self._bootstrap_path, "--re_proxy={}".format(self._reproxy_path)]
            if self._config_path:
                cmd += ["--cfg={}".format(self._config_path)]
            self.m.step("start reproxy", cmd)
            self._started = True

    def _stop(self, working_dir):
        """Stop reproxy and collect its diagnostic logs and metrics.

        Args:
            working_dir: the directory that was used for RBE_log_dir et al.
                when reproxy was started.
        """
        with self.m.step.nest("teardown remote execution"):
            cmd = [self._bootstrap_path, "--shutdown"]
            if self._config_path:
                cmd += ["--cfg={}".format(self._config_path)]
            try:
                self.m.step("stop reproxy", cmd)
                self._started = False
            finally:
                # reproxy/rewrapper/bootstrap record various log information in
                # a number of locations. At the time of this implementation,
                # the following log files are used:
                # 1. bootstrap.<INFO|WARNING|ERROR|FATAL> is standard logging
                # for `bootstrap`. Each log file includes more severe logging
                # levels, e.g. bootstrap.WARNING includes WARNING, ERROR &
                # FATAL log messages.
                # 2. rbe_metrics.txt is the text representation of a proto
                # message that describes metrics related to the rbe execution.
                # 3. reproxy.<INFO|WARNING|ERROR|FATAL> is standard logging for
                # `reproxy`. See notes in #1 for more details.
                # 4. reproxy_log.txt is the log file that records all info
                # about all actions that are processed through reproxy.
                # 5. reproxy_outerr.log is merged stderr/stdout of `reproxy`.
                # 6. rewrapper.<INFO|WARNING|ERROR|FATAL> is standard logging
                # for `rewrapper`. See notes in #1 for more details.
                #
                # We extract the WARNING log messages for each portion of the
                # local rbe client as well as reproxy stdout/stderr and metrics
                # from the build by default. If further debugging is required,
                # you could increase the verbosity of log messages that we
                # retain in logdog or add the full reproxy_log.txt log file to
                # the list of outputs.
                diagnostic_outputs = [
                    "bootstrap.WARNING",
                    "rbe_metrics.txt",
                    "reproxy.WARNING",
                    "reproxy_outerr.log",
                    "rewrapper.WARNING",
                ]

                for output in diagnostic_outputs:
                    path = working_dir.join(output)
                    # Not all builds use rbe, so it might not exist.
                    self.m.path.mock_add_paths(path)
                    if self.m.path.exists(path):
                        self.m.file.read_text(
                            "read {}".format(output.replace(".", "_")),
                            path,
                            test_data="test log",
                        )

                # reproxy also produces a log file of all the actions which
                # it handles including more detailed debugging information
                # useful for debugging.
                rpl_ext = self._log_format_to_ext(self._log_format)
                rpl_file_glob = "*.{}".format(rpl_ext)
                rpl_paths = self.m.file.glob_paths(
                    name="find {} files".format(rpl_ext),
                    source=working_dir,
                    pattern=rpl_file_glob,
                    test_data=[
                        "reproxy_2021-10-16_22_52_23.{}".format(rpl_ext),
                    ],
                )

                # More than 1 rpl file is likely a bug but we can punt until
                # that breaks someone.
                for p in rpl_paths:
                    self.m.path.mock_add_paths(p)
                    # Not all builds use rbe, so it might not exist.
                    if self.m.path.exists(p):
                        self.m.file.read_text(
                            "read {}".format(self.m.path.basename(p).replace(".", "_")),
                            p,
                            test_data="test log",
                        )

                self._upload_metrics(working_dir=working_dir)

    def _upload_metrics(self, working_dir):
        """Parse rbe_metrics.pb from `working_dir` and upload it to BigQuery.

        Silently returns when buildbucket build input is missing or when the
        metrics file was not produced (e.g. builds that do not use rbe).
        """
        if not (self.m.buildbucket.builder_name and self.m.buildbucket.build.id):
            # Skip the upload if it does not have build input information.
            return

        bq_pb = rbe_metrics.RbeMetrics()
        bq_pb.build_id = self.m.buildbucket.build.id
        bq_pb.builder_name = self.m.buildbucket.builder_name
        bq_pb.created_at.FromDatetime(self.m.time.utcnow())
        bq_pb.instance = self._instance

        path = self.m.path.join(working_dir, "rbe_metrics.pb")
        self.m.path.mock_add_paths(path)
        if not self.m.path.exists(path):  # pragma: no cover
            return

        stats = self.m.file.read_proto(
            "read rbe_metrics.pb",
            path,
            stats_pb2.Stats,
            codec="BINARY",
            test_proto=stats_pb2.Stats(
                environment=dict(
                    foo="false",
                    bar="42",
                )
            ),
        )

        bq_pb.stats.CopyFrom(stats)

        bq_json_dict = json_format.MessageToDict(
            message=bq_pb, preserving_proto_field_name=True
        )

        # "environment" is a map field and gets serialized to a JSON map.
        # Unfortunately, this is incompatible with the corresponding BQ schema,
        # which is a repeated field and thus expects a JSON array.
        envs = bq_pb.stats.environment
        bq_json_dict["stats"]["environment"] = [
            {"key": k, "value": v} for k, v in sorted(envs.items())
        ]

        step_result = self.m.bqupload.insert(
            step_name="upload metrics",
            project="fuchsia-engprod-metrics-prod",
            dataset="metrics",
            table="rbe_client_metrics",
            rows=[bq_json_dict],
        )

        step_result.presentation.logs["json.output"] = self.m.json.dumps(
            bq_json_dict["stats"], indent=4
        ).splitlines()