| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import copy |
| |
| import attr |
| |
| from recipe_engine import recipe_api |
| from recipe_engine.config_types import Path |
| |
| from RECIPE_MODULES.fuchsia.testsharder import api as testsharder_api |
| |
# List of available targets.
TARGETS = ['x64', 'arm64']

# The PCI address to use for the block device to contain test results.
TEST_FS_PCI_ADDR = '06.0'

# The path in the BootFS manifest that we want runcmds to show up at.
RUNCMDS_BOOTFS_PATH = 'infra/runcmds'

# Pinned CIPD version, presumably of the secretshim package; confirm against
# the code that consumes it (not visible in this part of the file).
SECRETSHIM_CIPD_VERSION = 'git_revision:63ab3ac613fceb52ac49b63b43fce841a2585645'

# Name of BigQuery project and table for uploading artifacts.
BIGQUERY_PROJECT = 'fuchsia-infra'
BIGQUERY_ARTIFACTS_DATASET = 'artifacts'

# Image and manifest names produced by the build.
IMAGES_JSON = 'images.json'
STORAGE_FULL = 'storage-full'

# Names of files that test tasks produce as outputs.
# The archive name is what testrunner is told to write via `-archive`; either
# the archive or the minfs image is accepted as the test results archive when
# a task's outputs are processed.
TEST_RESULTS_ARCHIVE_NAME = 'out.tar'
TEST_RESULTS_MINFS_NAME = 'output.fs'
# Passed to botanist as the `-serial-log` and `-syslog` destinations.
SERIAL_LOG_NAME = 'serial.txt'
SYSLOG_NAME = 'syslog.txt'
# Key under which the test summary is looked up in a task's extracted outputs.
TEST_SUMMARY_JSON = 'summary.json'
# NOTE(review): the three names below are not referenced in this part of the
# file; their consumers should be confirmed before changing them.
KERNEL_LOG = 'kernel_log.txt'
COVARGS_LOG_LEVEL = 'debug'
COVARGS_OUTPUT_JSON = 'covargs-output.json'

# The log level to use for botanist invocations in test tasks. Can be one of
# "fatal", "error", "warning", "info", "debug", or "trace", where "trace" is
# the most verbose, and fatal is the least.
BOTANIST_LOG_LEVEL = 'debug'

# The path to the botanist config for devices preprovisioned into containers.
BOTANIST_DEVICE_CONFIG = '/etc/botanist/config.json'

# System path at which authorized SSH keys are stored.
AUTHORIZED_KEY_PATH = 'data/ssh/authorized_keys'
| |
| |
@attr.s
class FuchsiaTestResults(object):
  """Represents the result of testing of a Fuchsia build.

  Attributes:
    from_fuchsia (bool): Whether the tests ran on Fuchsia.
    results_dir (Path): The directory that the test results archive has
      been unpacked into.
    output_dir (Path): A directory containing the outputs of the swarming
      task that ran these tests. Anything that's in this directory will be
      uploaded to GCS when upload_results() is called.
    outputs (dict[str]str): A mapping from relative paths of files
      containing stdout+stderr data to strings containing those contents.
    env_name (str): The name of the task that ran these tests.
    tests (seq(testsharder.Test)): The tests that this task was instructed
      to run (as opposed to the results of the tests that this task *did*
      run, which are enumerated in `summary`).
    legacy_qemu (bool): Whether these tests were run using QEMU with
      runtests (no ssh).
    api (RecipeApi): The api to use for accessing recipe modules from this
      object.
    symbolizer_output (Path|None): The path to the symbolized log file
      produced by running these tests.
    overwrite_summary (bool): Whether to set the "name" and "gn_label" fields
      in the summary.json produced by these tests using the corresponding
      values from the input tests.json. Only affects legacy QEMU tests.
      (Solely for backwards compatibility with fuchsia_perf.)
  """

  from_fuchsia = attr.ib(type=bool)
  results_dir = attr.ib(type=Path)
  output_dir = attr.ib(type=Path)
  outputs = attr.ib(type=dict)
  _env_name = attr.ib(type=str)
  _tests = attr.ib(type=testsharder_api.Test)
  _legacy_qemu = attr.ib(type=bool)
  _api = attr.ib(type=recipe_api.RecipeApi)
  _symbolizer_output = attr.ib(None, type=Path)
  # TODO(fxb/10410): Get rid of overwrite_summary after fuchsia_perf is dead.
  _overwrite_summary = attr.ib(True, type=bool)

  # Set lazily by the `summary` property, not a parameter to __init__.
  _summary = attr.ib(None, init=False)

  # Constants representing the result of running a test. These enumerate the
  # values of the 'results' field of the entries in the summary.json file
  # obtained from the target device.
  _TEST_RESULT_PASS = 'PASS'
  _TEST_RESULT_FAIL = 'FAIL'

  @property
  def summary(self):
    """The parsed summary file as a dict, or {} if it is missing.

    The summary is parsed on first access and cached afterwards.
    """
    if self._summary is None:
      self._summary = self._parse_summary()
    return self._summary

  @property
  def summary_lines(self):
    """Returns a list of the lines of the summary.json file."""
    return self._api.json.dumps(self.summary, indent=2).splitlines()

  @property
  def passed(self):
    """Whether all the tests passed.

    Also True when the summary lists no tests at all.
    """
    tests = self.summary.get('tests', [])
    return all(test['result'] == self._TEST_RESULT_PASS for test in tests)

  @property
  def passed_test_outputs(self):
    """All entries in |self.outputs| for tests that passed."""
    return self._filter_outputs_by_test_result(self._TEST_RESULT_PASS)

  @property
  def failed_test_outputs(self):
    """All entries in |self.outputs| for tests that failed."""
    return self._filter_outputs_by_test_result(self._TEST_RESULT_FAIL)

  def _filter_outputs_by_test_result(self, result):
    """Returns all entries in |self.outputs| whose result is |result|.

    Args:
      result (String): one of the _TEST_RESULT_* constants from this class.

    Returns:
      An ordered dict, in summary order, whose keys are the names of the
      matching tests and whose values are strings containing each test's
      stderr+stdout data.
    """
    matches = collections.OrderedDict()
    # TODO(kjharland): Sort test names first.
    for test in self.summary.get('tests', ()):
      if test['result'] == result:
        # The 'output_file' field is a path to the file containing the
        # stderr+stdout data for the test, and we inline the contents of that
        # file as the value in the returned dict.
        matches[test['name']] = self.outputs[test['output_file']]

    return matches

  def _parse_summary(self):
    """Parses the summary JSON found in |self.outputs|.

    Returns:
      The parsed summary, or {} if the summary file is absent or empty. For
      legacy QEMU runs with overwrite_summary set, the "name" and "gn_label"
      fields of known tests are rewritten from the input tests.

    Raises:
      StepFailure: if the summary file is not valid JSON.
    """
    raw_summary = self.outputs.get(TEST_SUMMARY_JSON, '')
    if not raw_summary:
      return {}

    try:
      summary = self._api.json.loads(raw_summary)
    except ValueError as e:  # pragma: no cover
      # TODO(olivernewman): JSONDecodeError in python >=3.5
      raise self._api.step.StepFailure('Invalid %s: %s' %
                                       (TEST_SUMMARY_JSON, e.args[0]))

    if not self._overwrite_summary or not self._legacy_qemu:
      return summary
    # We want all Fuchsia tests to have the package URL in the name field. But
    # QEMU tests set "name" to be the test install path (since the test list
    # sent to QEMU is a list of paths). So overwrite the "name" field to be the
    # package URL instead.
    # Also set "gn_label", which doesn't automatically get passed through from
    # tests.json.
    tests_by_path = {test.path: test for test in self._tests}
    # A summary without a 'tests' entry is tolerated here, matching every
    # other reader of the summary in this class.
    for summary_test in summary.get('tests', ()):
      path = summary_test['name']
      # Some zircon tests get run even though they don't show up in tests.json.
      # TODO(olivernewman): After build unification is complete we can assume
      # that every test in summary.json will have a corresponding entry in
      # tests.json, so get rid of this check and update every summary test.
      if path in tests_by_path:
        test = tests_by_path[path]
        assert test.package_url
        summary_test.update(
            name=test.package_url,
            gn_label=test.label,
        )
    return summary

  def upload_results(self, gcs_bucket, upload_to_catapult):
    """Upload select test results (e.g., coverage data) to a given GCS bucket.

    Args:
      gcs_bucket (str): The bucket to upload to; must be non-empty.
      upload_to_catapult (bool): Whether to also hand the output directory to
        the catapult uploader.
    """
    assert gcs_bucket
    with self._api.step.nest('upload %s test results' %
                             self._env_name) as presentation:
      if self.summary:
        # Save the summary JSON to the test shard output dir so it gets
        # uploaded to GCS for easy access by e.g. Dachsiaboard.
        summary_path = self.output_dir.join(TEST_SUMMARY_JSON)
        assert not self._api.path.exists(summary_path), (
            'test output files should not be named %s' % TEST_SUMMARY_JSON)
        self._api.file.write_json('write %s' % TEST_SUMMARY_JSON, summary_path,
                                  self.summary)

      self._upload_outputs(gcs_bucket)
      link = 'go/fuchsia-result-store/bid:%s' % self._api.buildbucket_util.id
      presentation.links[link] = link.replace('go/', 'https://goto.google.com/')

      if upload_to_catapult:
        self._api.upload.test_outputs_to_catapult(self.output_dir)

  def _upload_outputs(self, gcs_bucket):
    """Uploads the whole output directory to |gcs_bucket|."""
    self._api.upload.directory_to_gcs(
        source=self.output_dir,
        bucket=gcs_bucket,
        # Namespace with the test environment name to avoid collision
        # of artifacts across shards.
        subpath=self._env_name,
    )

  def raise_failures(self):
    """Raises a step failure if there were test failures.

    Raises:
      StepFailure: if the summary is missing, if any test failed, or if the
        serial log contains one of the known failure strings.
    """
    if not self.summary:
      # Halt with step failure if summary file is missing.
      raise self._api.step.StepFailure(
          'Test summary JSON not found, see symbolized log for details')

    failed_tests = self.failed_test_outputs
    if failed_tests:
      # Halt with a step failure.
      raise self._api.step.StepFailure('Test failure(s): ' +
                                       ', '.join(failed_tests))

    # Check serial log for failure messages
    # TODO(9936): Replace with running binary tool once created.
    fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
    log_path = self.output_dir.join(SERIAL_LOG_NAME)
    self._api.path.mock_add_paths(log_path)
    if self._api.path.exists(log_path):
      self._check_log_for_failures(log_path, fail_strings)

  def _check_log_for_failures(self, log_path, fail_strings):
    """Checks for fail strings in log and fails accordingly.

    Args:
      log_path (Path): The log file to read.
      fail_strings (seq(str)): Substrings whose presence marks a failure.

    Raises:
      StepFailure: on the first entry of |fail_strings| found in the log.
    """
    log_name = self._api.path.basename(log_path)
    with self._api.step.nest('check log %s:%s' %
                             (self._env_name, log_name)) as check_log_step:
      contents = self._api.file.read_text(
          'read %s' % log_name,
          log_path,
          test_data='extra log contents',
      ).get_result()
      for fail_str in fail_strings:
        if fail_str in contents:
          # Attach the full log so the failure can be inspected in the UI.
          check_log_step.presentation.logs[log_name] = contents.splitlines()
          raise self._api.step.StepFailure(
              'Found failure string in log %s: %s' % (log_name, fail_str))
| |
| |
def create_task(api, *args, **kwargs):
  """Builds and returns a Task for a single test shard.

  Task derives from a class that only exists on the api object, so it cannot
  be declared at module scope; it is declared here on every call instead.

  The accepted arguments are those of Task.__init__ below (minus `api`,
  which is supplied from this function's own `api` argument).
  """

  class Task(api.swarming_retry.TriggeredTask):
    """A retryable swarming task that runs one shard of tests."""

    def __init__(self, api, name, request, uses_legacy_qemu, targets_fuchsia,
                 symbolize_tool, llvm_symbolizer, tests,
                 debug_symbol_gcs_bucket, *args, **kwargs):
      super(Task, self).__init__(
          api=api, name=name, request=request, *args, **kwargs)
      self._tests = tests
      self._targets_fuchsia = targets_fuchsia
      self._uses_legacy_qemu = uses_legacy_qemu
      self._symbolize_tool = symbolize_tool
      self._llvm_symbolizer = llvm_symbolizer
      self._debug_symbol_gcs_bucket = debug_symbol_gcs_bucket

      # Shards prefixed with 'multiplied:' are produced by
      # tools/integration/testsharder/shard.go in fuchsia.git to run a test
      # (or set of tests) many times while hunting for flakes. Retrying a
      # failure would defeat the purpose: we want to see whether they fail,
      # not coax them into passing.
      if name.startswith('multiplied:'):
        self.max_attempts = 1

    def process_result(self):
      """Unpacks the results archive produced by a test shard."""
      latest = self.attempts[-1]
      assert latest.result
      task_result = latest.result

      if task_result.isolated_outputs:
        latest.task_outputs_link = task_result.isolated_outputs.url

      timed_out = task_result.state == self._api.swarming.TaskState.TIMED_OUT
      if timed_out:
        latest.failure_reason = 'timed out'

      latest.test_results = None

      with self._api.step.nest(task_result.name):
        latest.symbolizer_output = task_result.output_dir.join(
            self._api.symbolize.LOG)
        # Work out what became of the swarming task.
        if task_result.output:
          # Whenever there is output, symbolize it.
          symbolized = self._api.symbolize(
              symbolize_tool=self._symbolize_tool,
              debug_symbol_gcs_bucket=self._debug_symbol_gcs_bucket,
              llvm_symbolizer=self._llvm_symbolizer,
              data=task_result.output,
              symbolizer_output=latest.symbolizer_output,
          )  # yapf:disable
          latest.logs['symbolized log'] = symbolized

          if 'KERNEL PANIC' in task_result.output:
            latest.failure_reason = 'KERNEL PANIC'  # pragma: no cover

        self._check_logs_for_failures(latest)

        if task_result.success:
          self._process_outputs(latest)

    def _process_outputs(self, attempt):
      """Reads the test results and output files of a swarming TaskResult.

      On success, attempt.test_results is populated.

      Args:
        attempt (swarming_retry.Attempt): the attempt to process
      """
      assert attempt.result
      task_result = attempt.result

      # Results are only extracted when the task did not hit an infra
      # failure; otherwise a step failure is raised on leaving the
      # defer_results() scope.
      archive_names = [TEST_RESULTS_ARCHIVE_NAME, TEST_RESULTS_MINFS_NAME]
      attempt.test_results_archive = None
      # Walk in sorted order so that the last matching name wins.
      for rel_path, abs_path in sorted(task_result.outputs.iteritems()):
        if rel_path in archive_names:
          attempt.test_results_archive = abs_path

      assert attempt.test_results_archive, (
          'test archive not found amongst outputs of task %s' %
          task_result.name)
      self._parse_test_results(attempt)

      attempt.logs[TEST_SUMMARY_JSON] = attempt.test_results.summary_lines

      # Remove the archive so that it is not uploaded along with everything
      # else in the swarming task's output directory.
      archive_basename = self._api.path.basename(attempt.test_results_archive)
      self._api.file.remove('remove %s' % archive_basename,
                            attempt.test_results_archive)

    def _parse_test_results(self, attempt):
      """Parse test results from attempt into a FuchsiaTestResults object.

      Args:
        attempt (swarming_retry.Attempt): the attempt to parse
      """
      assert attempt.result
      task_result = attempt.result

      host_results_dir = self._api.testing.results_dir_on_host.join(
          task_result.id)
      # pylint: disable=protected-access
      extracted = self._api.testing._extract_test_results_archive(
          step_name='extract',
          archive_path=attempt.test_results_archive,
          leak_to=host_results_dir,
          is_minfs=self._uses_legacy_qemu,
      )
      # pylint: enable=protected-access

      attempt.test_results = FuchsiaTestResults(
          from_fuchsia=self._targets_fuchsia,
          results_dir=host_results_dir,
          outputs=extracted,
          env_name=task_result.name,
          tests=self._tests,
          legacy_qemu=self._uses_legacy_qemu,
          api=api,
          symbolizer_output=attempt.symbolizer_output,
          output_dir=task_result.output_dir,
      )

      failures = attempt.test_results.failed_test_outputs
      if failures:
        attempt.failure_reason = '%d test(s) failed' % len(failures)

    def _check_logs_for_failures(self, attempt):
      """Check for failure strings in logs.

      Args:
        attempt (swarming_retry.Attempt): the attempt to check for logs in
      """
      # Look through the serial log for known failure messages.
      # TODO(9936): Replace with running binary tool once created.
      serial_log = attempt.result.output_dir.join(SERIAL_LOG_NAME)
      self._api.path.mock_add_paths(serial_log)
      if not self._api.path.exists(serial_log):
        return

      log_name = self._api.path.basename(serial_log)
      with self._api.step.nest('check log %s' % log_name) as presentation:
        contents = self._api.file.read_text('read', serial_log)
        # Every string is checked; when several match, the last one listed
        # determines the reported reason.
        for needle in ('DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED'):
          if needle not in contents:
            continue
          presentation.logs[log_name] = contents.splitlines()
          presentation.status = self._api.step.FAILURE
          presentation.step_summary_text = 'found "%s"' % needle
          attempt.failure_reason = ('found "%s" in %s' % (needle, log_name))

    def present_status(self, parent_step, attempt, **kwargs):
      """Present an Attempt while showing progress in launch/collect step.

      Args:
        parent_step (Step): will always be 'passed tasks' or 'failed tasks'
        attempt (Attempt): the Attempt to present
      """
      del kwargs, parent_step  # Unused.
      step_name = '%s (%s)' % (self.name, attempt.name)
      with api.step.nest(step_name) as step:
        self._present(
            step, attempt, show_failures_in_red=False, show_passed=False)

    def present_attempt(self, task_step, attempt, category=None):
      """Present an Attempt when summarizing results at the end of the run.

      Args:
        task_step (Step): assuming present() was not overridden, this will
          always be a step titled after the current task
        attempt (Attempt): the Attempt to present
      """
      del task_step  # Unused.

      # The 'passes' category covers every attempt of every task that
      # eventually passed, so some of those attempts are failures. They are
      # shown in green so nobody mistakes them for an overall task failure.
      # TODO(fxb/36647) after this bug is fixed show these steps in
      # red, but show parent steps of those in green.
      show_failures_in_red = category != 'passes'

      outcome = 'pass' if attempt.success else 'fail'
      with api.step.nest('%s (%s)' % (attempt.name, outcome)) as step:
        if show_failures_in_red and not attempt.success:
          step.status = self._api.step.FAILURE
        self._present(
            step,
            attempt,
            show_failures_in_red=show_failures_in_red,
            show_passed=True,
        )

    def _present(self, step, attempt, show_failures_in_red, show_passed):
      """Present an Attempt.

      Both kinds of presentation are handled in largely the same way.

      Args:
        step (Step): parent step
        attempt (api.swarming_retry.Attempt): object to present
        show_failures_in_red (bool): show failures in red (for final
            'flakes' and 'failures' steps) or not (for 'launch/collect'
            progress and 'passes' steps)
        show_passed (bool): show the names of passed tests (only done for
            the end)

      Note: the 'passes' step can have failures underneath it because the
      first attempt can fail but the retry passed.
      """
      presentation = step.presentation
      presentation.links['swarming task'] = attempt.task_ui_link
      if attempt.task_outputs_link:
        presentation.links['task outputs'] = attempt.task_outputs_link

      if attempt.failure_reason:
        presentation.step_summary_text = attempt.failure_reason

      for log_name, log_lines in attempt.logs.iteritems():
        step.presentation.logs[log_name] = log_lines

      results = attempt.test_results
      if results:
        # Attach the contents of every output file named in the summary.
        # Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
        summary_outputs = results.summary.get('outputs', {})
        for name, path in summary_outputs.iteritems():
          step.presentation.logs[name] = results.outputs[path].split('\n')

        for test_name, test_output in results.failed_test_outputs.iteritems():
          self._report_test_result(
              test_name,
              test_output,
              passed=False,
              show_failures_in_red=show_failures_in_red,
          )

        with self._api.step.nest('all passed tests') as passed_tests_step:
          passed = results.passed_test_outputs
          passed_tests_step.presentation.step_summary_text = (
              '%d passed tests' % len(passed))
          if show_passed:
            # A leading newline keeps the first test from being rendered on
            # the same line as the step name.
            passed_tests_step.presentation.step_text = ''.join(
                '\n' + test_name for test_name in passed)

      task_outputs = attempt.result.outputs
      for file_name in (SYSLOG_NAME, SERIAL_LOG_NAME):
        if file_name in task_outputs:
          self._present_output_file(
              name=file_name, path=task_outputs[file_name], step=step)

    def _report_test_result(self,
                            test,
                            output,
                            passed,
                            show_failures_in_red=True):
      """Emits an empty step named after a test to report its result.

      Failed tests get a 'failed: ' name prefix, their output attached as
      the 'stdio' log and, if requested, a failure status.
      """
      step_name = test if passed else 'failed: %s' % test
      step_result = self._api.step(step_name, None)
      if passed:
        return
      step_result.presentation.logs['stdio'] = output.split('\n')
      if show_failures_in_red:
        step_result.presentation.status = self._api.step.FAILURE

    def _present_output_file(self, name, path, step):
      """Records file contents to the test results step's presentation."""
      text = self._api.file.read_text(
          'read %s' % name,
          path,
          test_data='extra log contents',
      )
      step.presentation.logs[name] = text.splitlines()

  return Task(*args, api=api, **kwargs)
| |
| |
class _TaskRequester(object):
  """Creates requests for swarming tasks that run tests.

  Args:
    api (RecipeApi): The api used to access recipe modules.
    buildbucket_build: The buildbucket build whose metadata is exported to
      test tasks as environment variables.
    per_test_timeout_secs (int): If truthy, passed to testrunner as its
      per-test timeout.
    pool (str): Swarming pool from which test tasks are drawn.
    swarming_expiration_timeout_secs (int): Expiration set on each task.
    swarming_io_timeout_secs (int): I/O timeout set on each task.
    timeout_secs (int): Execution timeout for each task; also given to
      botanist as its own timeout.
    use_runtests (bool): Whether to pass `-use-runtests` to testrunner.
  """

  def __init__(self, api, buildbucket_build, per_test_timeout_secs, pool,
               swarming_expiration_timeout_secs, swarming_io_timeout_secs,
               timeout_secs, use_runtests):
    self._api = api
    self._buildbucket_build = buildbucket_build
    self._per_test_timeout_secs = per_test_timeout_secs
    self._pool = pool
    self._swarming_expiration_timeout_secs = swarming_expiration_timeout_secs
    self._swarming_io_timeout_secs = swarming_io_timeout_secs
    self._timeout_secs = timeout_secs
    self._use_runtests = use_runtests

  def request(self, shard, build_artifacts):
    """Creates the swarming task request for one shard.

    Args:
      shard (api.testsharder.Shard): A shard of tests.
      build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
        The caller's object is left untouched; a deep copy is used.

    Returns:
      An api.build.ShardTaskRequest pairing |shard| with its task request.
    """
    # Copy the build_artifacts object to be modified for each shard.
    build_artifacts = copy.deepcopy(build_artifacts)

    # pylint: disable=protected-access
    if self._api.testing._uses_legacy_qemu(shard):
      task_request = self._api.testing._construct_legacy_qemu_task_request(
          task_name=shard.name,
          pool=self._pool,
          build_artifacts=build_artifacts,
          timeout_secs=self._timeout_secs,
          swarming_io_timeout_secs=self._swarming_io_timeout_secs,
          swarming_expiration_timeout_secs=(
              self._swarming_expiration_timeout_secs),
          # TODO(IN-654): Add support for secret_bytes.
          secret_bytes='',
          qemu_type=shard.device_type,
          shard=shard,
      )
    else:
      task_request = self._construct_test_task_request(
          build_artifacts=build_artifacts, shard=shard)
    # pylint: enable=protected-access

    return self._api.build.ShardTaskRequest(shard, task_request)

  def _construct_test_task_request(self, build_artifacts, shard):
    """Constructs a Swarming task request to run a shard of Fuchsia tests.

    Args:
      build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
      shard (api.testsharder.Shard): A shard of tests.

    Returns:
      An api.swarming.TaskRequest representing the swarming task request.
    """
    # To freely archive files from the build directory, the source, and those we
    # dynamically create, we create a tree of symlinks in a fresh directory and
    # isolate that. This solves the problems of (a) finding a root directory
    # that works for all artifacts, (b) being able to create files in that
    # directory without fear of collision, and (c) not having to isolate
    # extraneous files.
    isolate_tree = self._api.file.symlink_tree(
        root=self._api.path.mkdtemp('isolate'))

    test_manifest = 'tests.json'
    self._api.file.write_json(
        'write test manifest',
        isolate_tree.root.join(test_manifest),
        [test.render_to_jsonish() for test in shard.tests],
        indent=2)

    cmd = []
    outputs = []
    ensure_file = self._api.cipd.EnsureFile()
    dimensions = {'pool': self._pool}
    test_bot_cpu = 'x64'
    is_emu_type = self._api.emu.is_emulator_type(shard.device_type)

    # This command spins up a metadata server that allows its subcommands to
    # automagically authenticate with LUCI auth, provided the sub-exec'ed tool
    # was written in go or dart and respectively makes use of the standard
    # cloud.google.com/go/compute/metadata or
    # github.com/dart-lang/googleapis_auth authentication libraries. Such
    # libraries look for a metadata server under environment variables
    # like $GCE_METADATA_HOST, which LUCI emulates.
    if shard.service_account:
      # TODO(fxbug.dev/37142): Find a way to use the version that LUCI is
      # currently using, instead of 'latest'.
      ensure_file.add_package('infra/tools/luci-auth/${platform}', 'latest')
      cmd.extend(['./luci-auth', 'context', '--'])

    if is_emu_type:
      dimensions.update(os='Debian', cpu=build_artifacts.target, kvm='1')
      # To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
      test_bot_cpu = build_artifacts.target
    else:
      dimensions.update(shard.dimensions)

    if shard.targets_fuchsia:
      botanist_cmd = [
          './botanist',
          '-level', BOTANIST_LOG_LEVEL,
          'run',
          '-images', IMAGES_JSON,
          '-timeout', '%ds' % self._timeout_secs,
          '-syslog', SYSLOG_NAME,
          '-serial-log', SERIAL_LOG_NAME,
      ]  # yapf: disable
      outputs.append(SYSLOG_NAME)
      outputs.append(SERIAL_LOG_NAME)

      # TODO(fxbug.dev/40840): Once we can scope the proxy server to a
      # an individual task, we can make free use of it in the emulator case.
      if not is_emu_type:
        botanist_cmd.extend([
            # For container networking and authentication reasons, we access GCS
            # via a proxy server running on the controller.
            '-repo', self._api.artifacts.package_repo_url(host='$GCS_PROXY_HOST'),
            '-blobs', self._api.artifacts.package_blob_url(host='$GCS_PROXY_HOST'),
        ])  # yapf: disable

      config = BOTANIST_DEVICE_CONFIG
      if is_emu_type:
        # Emulators get a config generated here instead of the one
        # preprovisioned on device bots.
        config = './qemu.json'
        botanist_cmd.extend(
            ['-ssh', build_artifacts.DEFAULT_ISOLATED_LAYOUT.private_key])
        emu_dir = shard.device_type.lower()
        qemu_config = [{
            'type': emu_dir,
            'path': './%s/bin' % emu_dir,
            'target': build_artifacts.target,
            'cpu': 8,
            'memory': 8192,
            'kvm': True,
        }]

        if shard.device_type == 'AEMU':
          self._api.emu.add_aemu_to_ensure_file(ensure_file, subdir='aemu/bin')
        elif shard.device_type == 'QEMU':
          self._api.emu.add_qemu_to_ensure_file(ensure_file, subdir='qemu')

        self._api.file.write_json(
            'write qemu config',
            isolate_tree.root.join('qemu.json'),
            qemu_config,
            indent=2)
      elif shard.netboot:
        botanist_cmd.append('-netboot')

      botanist_cmd.extend(['-config', config])

      cmd.extend(botanist_cmd)

    cmd.extend([
        './testrunner',
        '-archive',
        TEST_RESULTS_ARCHIVE_NAME,
    ])
    if self._use_runtests:
      cmd.append('-use-runtests')
    if self._per_test_timeout_secs:
      cmd.extend(['-per-test-timeout', '%ds' % self._per_test_timeout_secs])
    cmd.append(test_manifest)

    outputs.append(TEST_RESULTS_ARCHIVE_NAME)

    # pylint: disable=protected-access
    isolated_hash = self._api.testing._isolate_build_artifacts(
        isolate_tree, build_artifacts, shard=shard, test_bot_cpu=test_bot_cpu)
    # pylint: enable=protected-access

    env_name = '%s-%s' % (shard.device_type or shard.os, build_artifacts.target)
    tags = {'test_environment_name': [env_name]}

    request = (self._api.swarming.task_request().
      with_name(shard.name).
      with_service_account(shard.service_account).
      with_tags(tags)
    ) #yapf: disable
    return request.with_slice(0, request[0].
      with_command(cmd).
      with_isolated(isolated_hash).
      with_dimensions(**dimensions).
      with_expiration_secs(self._swarming_expiration_timeout_secs).
      with_io_timeout_secs(self._swarming_io_timeout_secs).
      with_execution_timeout_secs(self._timeout_secs).
      with_outputs(outputs).
      with_cipd_ensure_file(ensure_file).
      with_env_vars(**self._test_task_env_vars(shard, build_artifacts))
    ) #yapf: disable

  def _test_task_env_vars(self, shard, build_artifacts):
    """Returns the environment variables to be set for the test task.

    Args:
      shard (api.testsharder.Shard): The shard the task will run.
      build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.

    Returns:
      A dict mapping string env var names to string values. Variables whose
      value is empty are left out.
    """
    build = self._buildbucket_build
    commit = build.input.gitiles_commit
    llvm_symbolizer = self._api.path.basename(build_artifacts.llvm_symbolizer)
    env_vars = dict(
        # `${ISOLATED_OUTDIR}` is a magic string that Swarming will replace
        # with a temporary directory into which files will be automatically
        # collected upon exit of a task.
        FUCHSIA_TEST_OUTDIR='${ISOLATED_OUTDIR}',
        BUILDBUCKET_ID=str(build.id),
        BUILD_BOARD=build_artifacts.board,
        BUILD_TYPE=build_artifacts.build_type,
        BUILD_PRODUCT=build_artifacts.product,
        BUILD_TARGET=build_artifacts.target,
        BUILDBUCKET_BUCKET=build.builder.bucket,

        # Used for symbolization:
        ASAN_SYMBOLIZER_PATH=llvm_symbolizer,
        UBSAN_SYMBOLIZER_PATH=llvm_symbolizer,
        LSAN_SYMBOLIZER_PATH=llvm_symbolizer,

        # Used by the catapult converter
        BUILD_CREATE_TIME=str(build.create_time.seconds),
        BUILDER_NAME=build.builder.builder,
        FUCHSIA_DEVICE_TYPE=shard.device_type,
        INPUT_COMMIT_HOST=commit.host,
        INPUT_COMMIT_PROJECT=commit.project,
        INPUT_COMMIT_REF=commit.ref,
    )
    # For some reason, empty string environment variables sent to the swarming
    # API get interpreted as null and rejected. So don't bother sending them to
    # avoid breaking the task request.
    # TODO(olivernewman): Figure out whether this logic should be moved into
    # the upstream swarming module (or obviated by fixing the "" -> null
    # behavior).
    return {k: v for k, v in env_vars.iteritems() if v}
| |
| |
class _ShardedTestRunner(object):
  """Runs sharded tests as swarming tasks and collects their results."""

  def __init__(self, api, collect_timeout, debug_symbol_gcs_bucket,
               llvm_symbolizer, max_attempts, swarming_output_dir,
               symbolize_tool, shard_requests):
    """Creates one task per entry of |shard_requests|.

    Each task's `uses_legacy_qemu` flag is read from the tags of its task
    request, and `targets_fuchsia` is derived from the 'os' dimension of
    the request's first slice.
    """
    self._api = api
    self._collect_timeout = collect_timeout
    self._max_attempts = max_attempts
    self._swarming_output_dir = swarming_output_dir

    self.tasks = []
    for shard_request in shard_requests:
      task_request = shard_request.task_request
      legacy_qemu = any(
          tag.lower() == 'uses_legacy_qemu:true' for tag in task_request.tags)
      # Any host OS other than linux or mac is treated as Fuchsia, including
      # a missing 'os' dimension.
      host_os = task_request[0].dimensions.get('os', '').lower()
      on_fuchsia = host_os not in ('linux', 'mac')
      task = create_task(
          api=self._api,
          name=shard_request.task_request.name,
          request=shard_request.task_request,
          symbolize_tool=symbolize_tool,
          llvm_symbolizer=llvm_symbolizer,
          tests=shard_request.shard.tests,
          debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
          uses_legacy_qemu=legacy_qemu,
          targets_fuchsia=on_fuchsia,
      )
      self.tasks.append(task)

  def run_tests(self):
    """Runs every shard and returns the test results that were produced.

    Returns:
      A list holding the test results of each task's final attempt; tasks
      whose final attempt has no test results are left out.
    """
    retry = self._api.swarming_retry
    # TODO(fxb/35021) use context manager.
    retry.run_tasks(
        tasks=self.tasks,
        collect_output_dir=self._swarming_output_dir,
        max_attempts=self._max_attempts,
        collect_timeout=self._collect_timeout,
    )

    retry.present_tasks(tasks=self.tasks)

    collected = []
    for task in self.tasks:
      final_results = task.attempts[-1].test_results
      if final_results:
        collected.append(final_results)
    return collected

  def raise_failures(self):
    """Delegates failure reporting for all tasks to swarming_retry."""
    self._api.swarming_retry.raise_failures(self.tasks)
| |
| |
class FuchsiaTestApi(recipe_api.RecipeApi):
  """Recipe API for testing Fuchsia builds.

  Among other things it constructs the swarming task requests that run
  tests. NOTE(review): the class continues beyond the part of the file
  reviewed here, so this summary may be incomplete.
  """

  # Re-exported so that the results class is reachable through the API object.
  FuchsiaTestResults = FuchsiaTestResults
| |
def __init__(self, *args, **kwargs):
  """Initializes the API; all arguments are forwarded to the base class."""
  super(FuchsiaTestApi, self).__init__(*args, **kwargs)
  # No test runner exists yet. Presumably assigned later by code outside
  # this part of the file; confirm before relying on it.
  self._test_runner = None
| |
def deprecated_shard_requests(self,
                              build_artifacts,
                              test_cmds,
                              device_type,
                              pool,
                              timeout_secs,
                              pave,
                              requires_secrets=False,
                              swarming_expiration_timeout_secs=18000,
                              swarming_io_timeout_secs=5 * 60):
  """Builds the single swarming task request used by the deprecated flow.

  Args:
    build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test. A
      deep copy is modified; the caller's object is left untouched.
    test_cmds (list[str]): Commands to have Fuchsia run on boot.
    device_type (str): The type of target to test on; emulator types are
      routed to the legacy QEMU task, everything else to a device task.
    pool (str): Swarming pool from which the test task will be drawn.
    timeout_secs (int): The amount of seconds to wait for the tests to execute
      before giving up.
    pave (bool): Whether to pave the image to disk. Only forwarded on the
      non-emulator path.
    requires_secrets (bool): Whether tests require plaintext secrets; only
      consulted on the emulator path.
    swarming_expiration_timeout_secs (int): Expiration, in seconds, applied to
      the swarming task request.
    swarming_io_timeout_secs (int): The swarming task will be killed if it does
      not produce any output for this long.

  Returns:
    A list of a single ShardTaskRequest.
  """
  assert test_cmds
  assert device_type

  self.m.minfs.minfs_path = build_artifacts.minfs
  self.m.zbi.zbi_path = build_artifacts.zbi

  # Work on a private copy because installing the runcmds files modifies the
  # artifacts.
  artifacts = copy.deepcopy(build_artifacts)
  self._install_runcmds_files(
      artifacts,
      device_type=device_type,
      pave=pave,
      test_cmds=test_cmds,
  )

  # Arguments shared by both kinds of task request.
  shared_kwargs = dict(
      task_name='all tests',
      build_artifacts=artifacts,
      pool=pool,
      timeout_secs=timeout_secs,
      swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
      swarming_io_timeout_secs=swarming_io_timeout_secs,
  )
  if not self.m.emu.is_emulator_type(device_type):
    task = self._construct_device_task_request(
        device_type=device_type, pave=pave, **shared_kwargs)
  else:
    if requires_secrets:
      secret_bytes = self.m.json.dumps(self._decrypt_secrets(artifacts))
    else:
      secret_bytes = ''
    task = self._construct_legacy_qemu_task_request(
        secret_bytes=secret_bytes, qemu_type=device_type, **shared_kwargs)

  # The deprecated testing code paths do not use shards, but handing back a
  # valid one keeps other code simpler.
  placeholder_shard = self.m.testsharder.Shard('dummy', (), {})
  return [self.m.build.ShardTaskRequest(placeholder_shard, task)]
| |
def deprecated_test_cmds(self, spec):
  """Builds the `runtests` command line for the deprecated testing flow.

  Args:
    spec: An object whose `test` attribute provides `per_test_timeout_secs`
      and `runtests_args`.

  Returns:
    A one-element list holding the space-joined command. `-i <secs>` is only
    included when `per_test_timeout_secs` is truthy.
  """
  test_spec = spec.test
  parts = ['runtests', '-o', self.results_dir_on_target]
  per_test_timeout = test_spec.per_test_timeout_secs
  if per_test_timeout:
    parts += ['-i', '%d' % per_test_timeout]
  parts.append(test_spec.runtests_args)
  return [' '.join(parts)]
| |
def _analyze_test_results(self, test_results, presentation=None):
  """Logs the summary, its output files and every test of a result object.

  Does nothing when `test_results.summary` is falsy. Failed tests are reported
  as individual steps at the current nesting level; passed tests are grouped
  under an 'all passed tests' step.

  Args:
    test_results (FuchsiaTestResults): Fuchsia test result object.
    presentation (StepPresentation|None): The step presentation on which to
      log the summary and output files; if not provided, that of the active
      result will be used.
  """
  summary = test_results.summary
  if not summary:
    return
  if not presentation:
    presentation = self.m.step.active_result.presentation
  logs = presentation.logs

  # The summary file itself.
  logs[TEST_SUMMARY_JSON] = test_results.summary_lines

  # Every output file the summary mentions.
  # Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
  summary_outputs = summary.get('outputs', {})
  for log_name, path in summary_outputs.iteritems():
    logs[log_name] = test_results.outputs[path].split('\n')

  for name, text in test_results.failed_test_outputs.iteritems():
    self._report_test_result(name, text, passed=False)
  with self.m.step.nest('all passed tests'):
    for name, text in test_results.passed_test_outputs.iteritems():
      self._report_test_result(name, text, passed=True)
| |
def _report_test_result(self, test, output, passed):
  """Emits one step for a test and attaches its output as the 'stdio' log.

  Args:
    test (str): Name of the test; used as the step name.
    output (str): The test's output; split on newlines for the log.
    passed (bool): Whether the test passed. A failed test gets a 'failed: '
      name prefix and a FAILURE step status.
  """
  # FlakeFetcher searches for the prefix "failed: " to find failed tests.
  step_name = test if passed else 'failed: ' + test
  result = self.m.step(step_name, None)
  step_presentation = result.presentation
  step_presentation.logs['stdio'] = output.split('\n')
  if not passed:
    step_presentation.status = self.m.step.FAILURE
| |
def process_coverage(self, covargs_path, test_results, ids_txt, llvm_profdata,
                     llvm_cov, gcs_bucket):
  """Runs covargs over the test summaries and uploads its output to GCS.

  Args:
    covargs_path: The covargs executable to invoke.
    test_results: Iterable of result objects; each one's
      `results_dir/summary.json` is passed to covargs via `-summary`.
    ids_txt: Value passed to covargs as `-ids`.
    llvm_profdata: Value passed to covargs as `-llvm-profdata`.
    llvm_cov: Value passed to covargs as `-llvm-cov`.
    gcs_bucket: Bucket that receives the generated coverage directory.
  """
  coverage_dir = self.m.path['cleanup'].join('coverage')

  covargs_cmd = [covargs_path]
  covargs_cmd += ['-level', COVARGS_LOG_LEVEL]
  covargs_cmd += [
      '-json-output',
      self.m.json.output(name=COVARGS_OUTPUT_JSON),
  ]
  covargs_cmd += ['-output-dir', coverage_dir]
  covargs_cmd += ['-llvm-profdata', llvm_profdata]
  covargs_cmd += ['-llvm-cov', llvm_cov]
  covargs_cmd += ['-ids', ids_txt]
  for shard_results in test_results:
    summary_path = shard_results.results_dir.join(TEST_SUMMARY_JSON)
    covargs_cmd += ['-summary', summary_path]

  self.m.step('covargs', covargs_cmd)

  # TODO: move this into gsutil module/deduplicate this with other GCS logic
  gcs_dir = 'builds/%s/coverage' % self.m.buildbucket.build_id
  rsync_options = {
      'parallel_process_count': self.m.platform.cpu_count,
      'parallel_thread_count': 1,
  }
  upload = self.m.gsutil.rsync(
      name='upload coverage',
      src=coverage_dir,
      bucket=gcs_bucket,
      dst=gcs_dir,
      recursive=True,
      gzip_exts=['html'],
      options=rsync_options,
      multithreaded=True)
  index_url = self.m.gsutil._http_url(
      gcs_bucket, self.m.gsutil.join(gcs_dir, 'index.html'), True)
  upload.presentation.links['index.html'] = index_url
| |
def _isolate_build_artifacts(self,
                             isolate_tree,
                             build_artifacts,
                             shard=None,
                             test_bot_cpu='x64',
                             legacy_qemu=False):
  """Fills a symlink tree with build artifacts and isolates it.

  The tree receives:
    - when targeting a fuchsia device, the build's images plus a manifest of
      them written at the root;
    - the Linux/Mac tests of the shard and the shard's dependencies;
    - for emulator shards, the private key;
    - the botanist, testrunner, llvm-symbolizer and bootserver tools.

  Args:
    isolate_tree (api.file.SymlinkTree): A tree into which artifacts may be
      linked.
    build_artifacts (BuildArtifacts): The Fuchsia build artifacts to isolate.
    shard (api.testsharder.Shard|None): A test shard.
    test_bot_cpu (str|None): The host cpu of the bot running the test task.
    legacy_qemu (bool): Whether to only isolate the images needed to run QEMU
      alone. Only consulted when no shard is given.

  Returns:
    The isolated hash that may be used to reference and download the
    artifacts.
  """
  tree_root = isolate_tree.root
  emulator_images = ('qemu-kernel', 'zircon-a', 'storage-full')
  host_oses = ('linux', 'mac')

  def link_from_build_dir(relpath):
    """Registers a link to `relpath` of the build directory in the tree."""
    isolate_tree.register_link(
        target=build_artifacts.fuchsia_build_dir.join(relpath),
        linkname=tree_root.join(relpath),
    )

  # TODO(IN-931): Remove `shard is None` condition once device and QEMU
  # codepaths are passing shard and using _construct_test_task_request().
  no_shard = shard is None
  if no_shard or shard.targets_fuchsia:
    images = build_artifacts.images.values()
    if no_shard:
      emulated = legacy_qemu
    else:
      emulated = shard and self.m.emu.is_emulator_type(shard.device_type)
    if emulated:
      # Emulated targets only get the bare essentials so the other end does
      # not needlessly download several gigabytes of images.
      images = [img for img in images if img['name'] in emulator_images]
    manifest_path = tree_root.join(IMAGES_JSON)
    self.m.file.write_json(
        'write image manifest', manifest_path, images, indent=2)
    for img in images:
      link_from_build_dir(img['path'])

  if shard:
    for test in shard.tests:
      if test.os in host_oses:
        link_from_build_dir(test.path)
    for dep in shard.deps:
      link_from_build_dir(dep)

  # If targeting QEMU we include a private key corresponding to an authorized
  # key already in the boot image; this is needed as we do not pave QEMU.
  if shard and self.m.emu.is_emulator_type(shard.device_type):
    key_relpath = build_artifacts.DEFAULT_ISOLATED_LAYOUT.private_key
    isolate_tree.register_link(
        target=build_artifacts.private_key,
        linkname=tree_root.join(key_relpath),
    )

  host_tools = [
      build_artifacts.botanist(test_bot_cpu),
      build_artifacts.testrunner(test_bot_cpu),
      build_artifacts.llvm_symbolizer,
      build_artifacts.bootserver,
  ]
  for tool_path in host_tools:
    # Tools are linked at the root of the tree under their basename.
    isolate_tree.register_link(
        target=tool_path,
        linkname=tree_root.join(self.m.path.basename(tool_path)))

  isolate_tree.create_links('create tree of build artifacts')
  isolated = self.m.isolated.isolated(isolate_tree.root)
  isolated.add_dir(isolate_tree.root)
  return isolated.archive('isolate build artifacts')
| |
@property
def results_dir_on_target(self):
  """Path on the target under which target test results are written."""
  target_results_dir = '/tmp/infra-test-output'
  return target_results_dir
| |
@property
def results_dir_on_host(self):
  """Path on the host under which host and target test results are written.

  Target test results are copied over to this location and host test results
  are written here. Host and target tests should write to separate
  subdirectories so as not to collide.
  """
  cleanup_dir = self.m.path['cleanup']
  return cleanup_dir.join('test_results')
| |
def _create_runcmds_script(self, device_type, test_cmds, output_path):
  """Writes the script that runs the tests on boot.

  Args:
    device_type (str): Target type; emulator types additionally mount the
      results block device before the tests, then unmount it and power off.
    test_cmds (list[str]): The test commands to place in the script.
    output_path (Path): Where the script is written.
  """
  # Topological path of the block device that will contain test output.
  block_device = '/dev/sys/pci/00:%s/virtio-block/block' % (TEST_FS_PCI_ADDR)

  results_dir = self.results_dir_on_target
  script_lines = ['mkdir %s' % results_dir]
  if self.m.emu.is_emulator_type(device_type):
    # Mount the block device over the results directory so that test output
    # is dropped into it.
    prologue = [
        # Wait until the MinFS test image shows up (max <timeout> ms).
        'waitfor class=block topo=%s timeout=60000' % block_device,
        'mount %s %s' % (block_device, results_dir),
    ]
    epilogue = [
        'umount %s' % results_dir,
        'dm poweroff',
    ]
    script_lines += prologue + test_cmds + epilogue
  else:
    script_lines += test_cmds

  # Normalize every line to a byte string before joining.
  encoded_lines = []
  for cmd in script_lines:
    if isinstance(cmd, unicode):
      encoded_lines.append(cmd.encode('utf-8'))
    elif isinstance(cmd, str):
      encoded_lines.append(cmd)
    else:  # pragma: no cover
      assert False, 'line is not unicode or a str: %s, %s' % (cmd, type(cmd))
  script_text = '\n'.join(encoded_lines)
  self.m.file.write_text('write runcmds', output_path, script_text)
| |
def _construct_legacy_qemu_task_request(self,
                                        task_name,
                                        build_artifacts,
                                        pool,
                                        timeout_secs,
                                        swarming_expiration_timeout_secs,
                                        swarming_io_timeout_secs,
                                        secret_bytes,
                                        qemu_type,
                                        shard=None):
  """Builds a Swarming task request that runs Fuchsia tests in an emulator.

  Expects the build and artifacts to be at the same place they were at
  the end of the build.

  Args:
    task_name (str): Name given to the swarming task request.
    build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
    pool (str): Swarming pool from which the test task will be drawn.
    timeout_secs (int): The amount of seconds to wait for the tests to execute
      before giving up.
    swarming_expiration_timeout_secs (int): Expiration, in seconds, of the
      task request.
    swarming_io_timeout_secs (int): I/O timeout, in seconds, of the task.
    secret_bytes (str): secret bytes to pass to the QEMU task. When non-empty
      the botanist command is wrapped with secretshim.
    qemu_type (str): type of qemu, either QEMU or AEMU.
    shard (api.testsharder.Shard|None): The shard associated with the task or
      None if it's not a shard.

  Returns:
    An api.swarming.TaskRequest representing the swarming task request.
  """
  # To freely archive files from the build directory, the source, and those we
  # dynamically create, we create a tree of symlinks in a fresh directory and
  # isolate that. This solves the problems of (a) finding a root directory
  # that works for all artifacts, (b) being able to create files in that
  # directory without fear of collision, and (c) not having to isolate
  # extraneous files.
  tree_root = self.m.path.mkdtemp('isolate')
  isolate_tree = self.m.file.symlink_tree(root=tree_root)

  # A MinFS image is sent along with the task and declared as a block device
  # in QEMU; Fuchsia mounts it and writes test output to it. 3.5G is an
  # arbitrary size that appears to hold our test output comfortably without
  # going overboard.
  results_image = isolate_tree.root.join(TEST_RESULTS_MINFS_NAME)
  self.m.minfs.create(results_image, '3584M', name='create test image')

  ensure_file = self.m.cipd.EnsureFile()

  emulator = qemu_type.lower()
  botanist_cmd = [
      './botanist',
      '-level', BOTANIST_LOG_LEVEL,
      'qemu',
      '-type', '%s' % emulator,
      '-qemu-dir', './%s/bin' % emulator,
      '-images', IMAGES_JSON,
      '-arch', build_artifacts.target,
      '-minfs', TEST_RESULTS_MINFS_NAME,
      '-pci-addr', TEST_FS_PCI_ADDR,
      '-use-kvm'
  ]  # yapf: disable

  if secret_bytes:
    # secretshim starts the secrets server before running the command that
    # follows it, so it goes in front of the botanist invocation.
    botanist_cmd.insert(0, './secretshim')
    ensure_file.add_package('fuchsia/infra/secretshim/${platform}',
                            SECRETSHIM_CIPD_VERSION)

  # asan and profile builds get explicit cpu and memory settings.
  if any(v in build_artifacts.variants for v in ('asan', 'profile')):
    botanist_cmd += ['-cpu', str(8), '-memory', str(8192)]

  # storage-full not being present signifies the exclusion of the system
  # partition, which means `boot` (i.e. running on boot) must be used instead
  # of `system` (i.e., running after the system partition is mounted).
  if STORAGE_FULL in build_artifacts.images:
    autorun_phase = 'system'
  else:
    autorun_phase = 'boot'
  arg_key = 'zircon.autorun.%s' % autorun_phase
  runcmds_path = self._get_runcmds_path_per_shard(shard)
  botanist_cmd.append('%s=/boot/bin/sh+/boot/%s' % (arg_key, runcmds_path))

  isolated_hash = self._isolate_build_artifacts(
      isolate_tree,
      build_artifacts,
      # To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
      test_bot_cpu=build_artifacts.target,
      legacy_qemu=True,
  )

  if qemu_type == 'AEMU':
    self.m.emu.add_aemu_to_ensure_file(ensure_file, subdir='aemu/bin')
  elif qemu_type == 'QEMU':
    self.m.emu.add_qemu_to_ensure_file(ensure_file, subdir='qemu')

  tags = {
      # consumed by google3 results uploader
      'test_environment_name': [
          '%s-%s' % (qemu_type, build_artifacts.target)
      ],
      # consumed by this recipe module
      'uses_legacy_qemu': ['true']
  }
  request = self.m.swarming.task_request()
  request = request.with_name(task_name).with_tags(tags)

  task_slice = request[0]
  task_slice = task_slice.with_command(botanist_cmd)
  task_slice = task_slice.with_isolated(isolated_hash)
  task_slice = task_slice.with_dimensions(
      pool=pool, os='Debian', cpu=build_artifacts.target, kvm='1')
  task_slice = task_slice.with_io_timeout_secs(swarming_io_timeout_secs)
  task_slice = task_slice.with_execution_timeout_secs(timeout_secs)
  task_slice = task_slice.with_expiration_secs(
      swarming_expiration_timeout_secs)
  task_slice = task_slice.with_secret_bytes(secret_bytes)
  task_slice = task_slice.with_outputs([TEST_RESULTS_MINFS_NAME])
  task_slice = task_slice.with_cipd_ensure_file(ensure_file)
  return request.with_slice(0, task_slice)
| |
def _construct_device_task_request(self, task_name, device_type,
                                   build_artifacts, pool, pave, timeout_secs,
                                   swarming_expiration_timeout_secs,
                                   swarming_io_timeout_secs):
  """Builds a Swarming task request that runs Fuchsia tests on a device.

  Expects the build and artifacts to be at the same place they were at
  the end of the build.

  Args:
    task_name (str): Name given to the swarming task request.
    device_type (str): Value of the `device_type` swarming dimension.
    build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
    pool (str): Swarming pool from which the test task will be drawn.
    pave (bool): Whether or not the build artifacts should be paved. When
      false, botanist is told to netboot.
    timeout_secs (int): The amount of seconds to wait for the tests to execute
      before giving up.
    swarming_expiration_timeout_secs (int): Expiration, in seconds, of the
      task request.
    swarming_io_timeout_secs (int): I/O timeout, in seconds, of the task.

  Returns:
    An api.swarming.TaskRequest representing the swarming task request.
  """
  botanist_cmd = [
      './botanist',
      '-level', BOTANIST_LOG_LEVEL,
      'zedboot',
      '-config', BOTANIST_DEVICE_CONFIG,
      '-images', IMAGES_JSON,
      '-results-dir', self.results_dir_on_target,
      '-out', TEST_RESULTS_ARCHIVE_NAME,
      '-serial-log', SERIAL_LOG_NAME,
  ]  # yapf: disable
  if not pave:
    botanist_cmd += ['-netboot']

  # storage-full not being present signifies the exclusion of the system
  # partition, which means `boot` (i.e. running on boot) must be used instead
  # of `system` (i.e., running after the system partition is mounted).
  if STORAGE_FULL in build_artifacts.images:
    autorun_phase = 'system'
  else:
    autorun_phase = 'boot'
  arg_key = 'zircon.autorun.%s' % autorun_phase
  botanist_cmd.append('%s=/boot/bin/sh+/boot/%s' %
                      (arg_key, RUNCMDS_BOOTFS_PATH))

  # To freely archive files from the build directory, the source, and those we
  # dynamically create, we create a tree of symlinks in a fresh directory and
  # isolate that. This solves the problems of (a) finding a root directory
  # that works for all artifacts, (b) being able to create files in that
  # directory without fear of collision, and (c) not having to isolate
  # extraneous files.
  tree_root = self.m.path.mkdtemp('isolate')
  isolate_tree = self.m.file.symlink_tree(root=tree_root)
  isolated_hash = self._isolate_build_artifacts(isolate_tree, build_artifacts)

  tags = {
      'test_environment_name': [
          '%s-%s' % (device_type, build_artifacts.target)
      ]
  }
  request = self.m.swarming.task_request()
  request = request.with_name(task_name).with_tags(tags)

  task_slice = request[0]
  task_slice = task_slice.with_command(botanist_cmd)
  task_slice = task_slice.with_isolated(isolated_hash)
  task_slice = task_slice.with_dimensions(pool=pool, device_type=device_type)
  task_slice = task_slice.with_expiration_secs(
      swarming_expiration_timeout_secs)
  task_slice = task_slice.with_io_timeout_secs(swarming_io_timeout_secs)
  task_slice = task_slice.with_execution_timeout_secs(timeout_secs)
  task_slice = task_slice.with_outputs(
      [TEST_RESULTS_ARCHIVE_NAME, SERIAL_LOG_NAME])
  return request.with_slice(0, task_slice)
| |
def _extract_test_results_archive(self,
                                  step_name,
                                  archive_path,
                                  is_minfs=False,
                                  leak_to=None):
  """Unpacks a test results archive and returns its contents.

  Args:
    step_name (str): The name of the step.
    archive_path (Path): The path to the archive which contains test results.
    is_minfs (bool): Whether the archive in question is a minfs image
      containing QEMU test results. If false, then the archive is assumed to
      be a tar file.
    leak_to (Path): Optionally leak the contents of the archive to a
      directory.

  Returns:
    A dict mapping a filepath relative to the root of the archive to the
    contents of that file in the archive.
  """
  if is_minfs:
    extraction = self.m.minfs.copy_image(
        step_name=step_name,
        image_path=archive_path,
        out_dir=leak_to,
    )
  else:
    extraction = self.m.tar.extract(
        step_name=step_name,
        path=archive_path,
        directory=self.m.raw_io.output_dir(leak_to=leak_to),
    )
  return extraction.raw_io.output_dir
| |
def _decrypt_secrets(self, build_artifacts):
  """Decrypts the secrets included in the build.

  Every `<name>.json` entry of `build_artifacts.secret_specs` is read as a
  secret spec and the matching `ciphertext/<name>.ciphertext` file is
  decrypted with the Cloud KMS key named by the spec's `cloudkms_key_path`.

  Args:
    build_artifacts (BuildArtifacts): The build artifacts whose `secret_specs`
      directory holds the generated secret specs.

  Returns:
    The dictionary that maps secret spec name to the corresponding plaintext.
  """
  self.m.cloudkms.ensure()

  spec_suffix = '.json'
  secret_spec_dir = build_artifacts.secret_specs
  secrets_map = {}
  with self.m.step.nest('process secret specs'):
    secret_spec_files = self.m.file.listdir('list', secret_spec_dir)
    for secret_spec_file in secret_spec_files:
      basename = self.m.path.basename(secret_spec_file)
      # Only `<name>.json` entries are secret specs. This check also skips the
      # 'ciphertext' subdirectory. Matching on the suffix (rather than
      # splitting on the first '.json') keeps names that contain '.json' in
      # the middle intact and avoids an unpacking error on unrelated entries.
      if not basename.endswith(spec_suffix):
        continue
      secret_name = basename[:-len(spec_suffix)]

      secret_spec = self.m.json.read('read spec for %s' % secret_name,
                                     secret_spec_file).json.output

      # For each test spec file <name>.json in this directory, there is an
      # associated ciphertext file at ciphertext/<name>.ciphertext.
      ciphertext_file = secret_spec_dir.join('ciphertext',
                                             '%s.ciphertext' % secret_name)

      key_path = secret_spec['cloudkms_key_path']
      secrets_map[secret_name] = self.m.cloudkms.decrypt(
          'decrypt secret for %s' % secret_name, key_path, ciphertext_file,
          self.m.raw_io.output()).raw_io.output
  return secrets_map
| |
def deprecated_test(self, *args, **kwargs):
  """Tests a Fuchsia build on the specified device with retries.

  Expects the build and artifacts to be at the same place they were at
  the end of the build.

  Args (see _launch_collect_process_funcs for other args):
    max_attempts (int): Keyword-only. The tests will be run repeatedly until
      either max_attempts is hit or all tests pass. A falsy value selects
      `swarming_retry.DEFAULT_MAX_ATTEMPTS`.

  Returns:
    A `FuchsiaTestResults` object corresponding to the last test attempt
    whose results could be processed.

  Raises:
    The StepFailure raised while processing the final attempt, if any.
  """
  # `max_attempts` is pulled out of kwargs because Python 2 doesn't allow
  # default keyword args after variable-length positional *args.
  max_attempts = (
      kwargs.pop('max_attempts', 0) or
      self.m.swarming_retry.DEFAULT_MAX_ATTEMPTS)

  launch, collect, process = self._launch_collect_process_funcs(
      *args, **kwargs)

  test_results = None
  final_exception = None

  # TODO(olivernewman): status='last' should cause this step to turn green as
  # long as the *last* test attempt is green, but this isn't working, at
  # least not for led jobs (if the first attempt fails and the second passes,
  # the build is marked as a failure). Figure out whether this will be
  # resolved by using luci_runner.
  with self.m.step.nest('run tests', status='last'):
    for attempt in range(max_attempts):
      with self.m.step.nest('attempt %d' % attempt) as attempt_presentation:
        task_result = collect(launch())
        try:
          test_results = process(
              task_result, presentation=attempt_presentation)
        except self.m.step.StepFailure as e:
          # Remember the failure and move on to the next attempt.
          final_exception = e
          continue

        final_exception = None
        if test_results.passed:
          attempt_presentation.step_text = 'passed'
          break
        failed_count = len(test_results.failed_test_outputs)
        attempt_presentation.step_text = '%d test(s) failed' % failed_count

  if final_exception:
    raise final_exception  # pylint: disable=raising-bad-type

  return test_results
| |
def deprecated_test_async(self, *args, **kwargs):
  """Launches a swarming task to run Fuchsia tests.

  All arguments are forwarded to _launch_collect_process_funcs. The task is
  launched before this method returns.

  Returns:
    A function that, when invoked, waits for the tests to complete and
    returns a `FuchsiaTestResults` object representing the completed test.
  """
  launch, collect, process = self._launch_collect_process_funcs(
      *args, **kwargs)
  pending = launch()

  def wait_for_results():
    return process(collect(pending))

  return wait_for_results
| |
def _launch_collect_process_funcs(
    self,
    debug_symbol_gcs_bucket,
    device_type,
    orchestration_inputs,
    overwrite_summary=True,
):
  """Returns 3-tuple of functions to launch Fuchsia tests, wait for them to
  complete, and process the results.

  Only the first entry of `orchestration_inputs.shard_requests` is used.

  Args:
    debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
    device_type (str): The type of target the tests ran on; decides whether
      the results archive is treated as a minfs image (emulator types) or a
      tar file.
    orchestration_inputs (TestOrchestrationInputs): the bits of data
      needed to orchestrate testing.
    overwrite_summary (bool): Whether to overwrite the name and label
      fields in summary.json based on tests.json. This should *only* be
      used by fuchsia_perf; do NOT add any new dependencies on this.
      TODO(fxb/10410): remove this entirely after fuchsia_perf is dead.

  Returns:
    A tuple of functions:
      - `launch`, which takes no arguments and launches a swarming task to
        run tests against the given build artifacts. Returns a
        `TaskRequestMetadata` object.
      - `collect`, which takes the `TaskRequestMetadata` object returned by
        `launch`. It blocks until the task is complete and returns a swarming
        `TaskResult`.
      - `process`, which takes that `TaskResult` (and, optionally, a
        `StepPresentation` object to add logs to), processes the results and
        returns a `FuchsiaTestResults` object representing the completed
        tests.
  """
  task = orchestration_inputs.shard_requests[0].task_request
  # This directory gets passed into `collect()`, but unfortunately the
  # `output_dir` attribute of the `TaskResult` returned by `collect()` is
  # a subdirectory of this output dir (to ensure that different tasks'
  # outputs do not collide when calling `api.swarming.collect()` with many
  # tasks). So we make this variable in-scope for all three functions so that
  # `process()` can use it as the output dir for the test results object.
  output_dir = self.m.path.mkdtemp('swarming')

  def launch():
    # Triggers the single task as an infra step.
    with self.m.context(infra_steps=True):
      return self.m.swarming.trigger(
          'trigger 1 task', [task], cancel_extra_tasks=True)

  def collect(request_metadata):
    # Collects the task into `output_dir`; exactly one result is expected.
    with self.m.context(infra_steps=True):
      results = self.m.swarming.collect(
          'collect', tasks=request_metadata, output_dir=output_dir)
      assert len(results) == 1, 'len(%s) != 1' % repr(results)
      return results[0]

  def process(task_result, presentation=None):
    symbolizer_output = output_dir.join(self.m.symbolize.LOG)
    with self.m.step.nest('task results'):
      self._analyze_task_result(
          result=task_result,
          debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
          symbolize_tool=orchestration_inputs.symbolize_tool,
          llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
          symbolizer_output=symbolizer_output,
          presentation=presentation,
      )

    with self.m.context(infra_steps=True):
      # result.outputs contains the file outputs produced by the Swarming
      # task, returned via isolate. It's a mapping of the 'name' of the
      # output, represented as its relative path within the isolated it
      # was returned in, to a Path object pointing to its location on the
      # local disk. For each of the above tasks, there should be exactly
      # one output.
      if SERIAL_LOG_NAME in task_result.outputs:
        # The serial log is removed from the outputs (mutating
        # `task_result.outputs`) so that only the results archive remains.
        serial_log = task_result.outputs.pop(SERIAL_LOG_NAME)

        serial_log_contents = self.m.file.read_text(
            'read serial.txt', serial_log, test_data=[])
        serial_presentation = (
            presentation or self.m.step.active_result.presentation)
        serial_presentation.logs[SERIAL_LOG_NAME] = (
            serial_log_contents.splitlines())
      assert len(task_result.outputs) == 1, 'len(%s) != 1' % repr(
          task_result.outputs)
      # NOTE: indexing `.items()` relies on Python 2, where it returns a list.
      archive_name, archive_path = task_result.outputs.items()[0]

      test_results_dir = self.results_dir_on_host.join(
          'target', task_result.id)
      # _extract_test_results_archive needs minfs_path to be set.
      # This is kinda ugly. It'd be better to pass this in as an argument.
      self.m.minfs.minfs_path = orchestration_inputs.minfs
      test_results_map = self._extract_test_results_archive(
          step_name='extract results',
          is_minfs=self.m.emu.is_emulator_type(device_type),
          archive_path=archive_path,
          # Write test results to a subdirectory of |results_dir_on_host|
          # so as not to collide with host test results.
          leak_to=test_results_dir,
      )

      # Remove the archive file so it doesn't get uploaded to GCS.
      self.m.file.remove('remove %s' % archive_name, archive_path)

      test_list = self.m.file.read_json(
          'read tests.json', orchestration_inputs.tests_file, test_data=[])
      tests = [
          self.m.testsharder.Test.from_jsonish(t['test']) for t in test_list
      ]

    with self.m.step.nest('all test results'):
      test_results = self.FuchsiaTestResults(
          from_fuchsia=True,
          results_dir=test_results_dir,
          outputs=test_results_map,
          env_name=task_result.name,
          tests=tests,
          legacy_qemu=self.m.emu.is_emulator_type(device_type),
          api=self.m,
          symbolizer_output=symbolizer_output,
          output_dir=output_dir,
          overwrite_summary=overwrite_summary,
      )
      self._analyze_test_results(test_results, presentation=presentation)
    return test_results

  return launch, collect, process
| |
def shard_requests(
    self,
    build_artifacts,
    buildbucket_build,
    per_test_timeout_secs,
    pool,
    swarming_expiration_timeout_secs,
    swarming_io_timeout_secs,
    use_runtests,
    # TODO(garymm): Remove default value.
    # We should always get this from a spec.
    timeout_secs=40 * 60):
  """Returns a _ShardTaskRequest for each shard in build_artifact.shards.

  Note that `build_artifacts` is modified in place when the runcmds files are
  installed.

  Args:
    build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
    buildbucket_build (build_pb2.Build): The buildbucket build that is going
      to orchestrate testing.
    per_test_timeout_secs (int): Any test that executes for longer than this
      will be considered failed.
    pool (str): The Swarming pool to schedule test tasks in.
    swarming_expiration_timeout_secs (int): Forwarded to the task requester.
    swarming_io_timeout_secs (int): Forwarded to the task requester.
    use_runtests (bool): Whether to use runtests (or else run_test_component)
      when executing tests on target.
    timeout_secs (int): The amount of seconds to wait for the tests to execute
      before giving up.
  """
  self.m.minfs.minfs_path = build_artifacts.minfs
  self.m.zbi.zbi_path = build_artifacts.zbi

  # Installing the runcmds files modifies the build artifacts, so it has to
  # happen before any call to the requester's request().
  self._install_runcmds_files(
      build_artifacts,
      test_in_shards=True,
      per_test_timeout_secs=per_test_timeout_secs,
      in_place=True)

  requester = _TaskRequester(
      self.m,
      buildbucket_build=buildbucket_build,
      per_test_timeout_secs=per_test_timeout_secs,
      pool=pool,
      swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
      swarming_io_timeout_secs=swarming_io_timeout_secs,
      timeout_secs=timeout_secs,
      use_runtests=use_runtests,
  )

  requests = []
  for shard in build_artifacts.shards:
    # Each shard's request is built under its own nested step.
    with self.m.step.nest('shard %s' % shard.name):
      request = requester.request(shard, build_artifacts)
    requests.append(request)
  return requests
| |
def test_in_shards(self, collect_timeout_secs, debug_symbol_gcs_bucket,
                   max_attempts, orchestration_inputs):
  """Tests a Fuchsia build by sharding.

  Expects the build and artifacts to be at the same place they were at
  the end of the build.

  Args:
    collect_timeout_secs (int): Amount of time to wait for tasks to complete.
      A falsy value means no collect timeout is passed on.
    debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
    max_attempts (int): Maximum number of attempts before marking a shard
      as failed.
    orchestration_inputs (TestOrchestrationInputs): Provides the shard
      requests, the minfs path and the symbolizer tools.

  Returns:
    A list of FuchsiaTestResults objects representing the completed test
    tasks that were not subject to an infra failure.
  """
  shard_requests = orchestration_inputs.shard_requests
  # With no shards there is nothing to do: the empty set of tests has been
  # run successfully.
  if not shard_requests:
    return []

  self.m.minfs.minfs_path = orchestration_inputs.minfs
  if collect_timeout_secs:
    collect_timeout = '%ds' % collect_timeout_secs
  else:
    collect_timeout = None

  runner = _ShardedTestRunner(
      self.m,
      collect_timeout=collect_timeout,
      debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
      llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
      max_attempts=max_attempts,
      swarming_output_dir=self.m.path.mkdtemp('swarming'),
      symbolize_tool=orchestration_inputs.symbolize_tool,
      shard_requests=orchestration_inputs.shard_requests,
  )
  # Kept on the API object so that raise_failures() can reach it later.
  self._test_runner = runner
  return runner.run_tests()
| |
def raise_failures(self):
  """Asks the sharded test runner, if one exists, to raise its failures."""
  runner = self._test_runner
  if not runner:
    return
  runner.raise_failures()
| |
def _analyze_task_result(
    self,
    result,
    symbolize_tool,
    llvm_symbolizer,
    debug_symbol_gcs_bucket,
    symbolizer_output=None,
    presentation=None,
):
  """Analyzes a swarming.TaskResult and reports results as a step.

  Args:
    result (api.swarming.TaskResult): The swarming task result to analyze.
    symbolize_tool (Path): The path to the symbolize tool.
    llvm_symbolizer (Path): The path to the llvm_symbolizer tool.
    debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
    symbolizer_output (Path|None): A path to a file to write the symbolizer's
      stdout.
    presentation (StepPresentation|None): The step presentation to attach
      logs to. Defaults to `active_result.presentation`.

  Raises:
    A StepFailure if a kernel panic is detected, or whatever StepFailure
    `result.analyze()` raises if the swarming task failed for a different
    reason.
  """
  if not presentation:
    presentation = self.m.step.active_result.presentation

  task_output = result.output
  if task_output:
    # Always symbolize the result output if present in this case.
    symbolized = self.m.symbolize(
        symbolize_tool=symbolize_tool,
        debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
        llvm_symbolizer=llvm_symbolizer,
        data=task_output,
        symbolizer_output=symbolizer_output,
        presentation=presentation)
    presentation.logs['symbolized log'] = symbolized

    # A kernel panic may be present in the logs even if the task timed out, so
    # check for that first.
    if 'KERNEL PANIC' in task_output:
      presentation.step_text = 'kernel panic'
      presentation.status = self.m.step.FAILURE
      raise self.m.step.StepFailure(
          'Found kernel panic. See symbolized output for details.')

  isolated_outputs = result.isolated_outputs
  if isolated_outputs:
    presentation.links['test outputs'] = isolated_outputs.url

  try:
    result.analyze()
  except self.m.step.StepFailure:
    # Reflect the task error on the step before letting the failure continue.
    self._present_task_errors(result, presentation)
    raise
| |
| def _present_task_errors(self, task_result, presentation): |
| """Updates text and status of the given step to reflect test task errors.""" |
| # If the task is in an unknown state or completed, but the executed command returned |
| # a non-zero exit code, this points to a tooling failure. |
| if task_result.state is None or task_result.state == self.m.swarming.TaskState.COMPLETED: |
| text = 'tooling failure' # pragma: no cover |
| else: |
| text = (task_result.state.name).replace('_', ' ').lower() |
| presentation.step_text = text |
| |
| # Report timeouts as red, not purple, as it is likelier that the task is |
| # timing out due to a bug in the system under test. |
| if task_result.state == self.m.swarming.TaskState.TIMED_OUT: |
| status = self.m.step.FAILURE # pragma: no cover |
| else: |
| status = self.m.step.EXCEPTION |
| presentation.status = status |
| |
| def _create_test_list(self, shard): |
| test_locations = [] |
| for test in shard.tests: |
| test_locations.append(test.path) |
| test_list_path = self.m.path['cleanup'].join('tests-%s' % |
| self._normalize(shard.name)) |
| self.m.file.write_text( |
| name='write test list', |
| dest=test_list_path, |
| text_data='\n'.join(test_locations) + '\n', |
| ) |
| return test_list_path |
| |
| def _normalize(self, name): |
| return name.replace(' ', '_').replace('(', '').replace(')', '') |
| |
def _get_runcmds_path_per_shard(self, shard=None):
  """Returns the bootfs path of the runcmds script, optionally per shard.

  Args:
    shard: If given (and truthy), its normalized name is appended to the
      base runcmds path after a '-' separator.

  Returns:
    RUNCMDS_BOOTFS_PATH itself when no shard is given, otherwise the
    shard-specific variant of it.
  """
  if shard:
    shard_suffix = self._normalize(shard.name)
    return '%s-%s' % (RUNCMDS_BOOTFS_PATH, shard_suffix)
  return RUNCMDS_BOOTFS_PATH
| |
| def _uses_legacy_qemu(self, shard): |
| return (not self.m.experimental.ssh_into_qemu and |
| self.m.emu.is_emulator_type(shard.device_type)) |
| |
def _install_runcmds_files(self,
                           build_artifacts,
                           test_in_shards=False,
                           per_test_timeout_secs=None,
                           device_type=None,
                           pave=False,
                           test_cmds=None,
                           in_place=False):
  """Creates the files used to invoke runtests on boot.

  This is only necessary for QEMU shards, which are the only shards that
  use runcmds, and the non-sharding codepath.

  The files are collected into a manifest (bootfs path -> local file) and
  then added to a ZBI image via `zbi.copy_and_extend`.

  Args:
    build_artifacts: The build artifacts. This method reads its `zbi`,
      `shards`, `images`, `fuchsia_build_dir` and `authorized_key`
      attributes, and may rewrite the 'path' of one `images` entry.
    test_in_shards (bool): Whether to prepare files for every shard in
      `build_artifacts.shards`. Requires `in_place`.
    per_test_timeout_secs (int|None): If set, passed to runtests via `-i`.
      Only used when `test_in_shards` is set.
    device_type (str|None): The device type to create the runcmds script
      for. Required when `test_in_shards` is not set.
    pave (bool): For non-emulator device types, selects whether the image to
      extend is looked up through its 'bootserver_pave' or
      'bootserver_netboot' arguments.
    test_cmds (list|None): The commands handed to the runcmds script.
      Required when `test_in_shards` is not set.
    in_place (bool): Whether to overwrite the existing image. When unset
      (non-sharded path only), the result is written to 'test-infra.zbi' in
      the build directory and the image entry is pointed at it.
  """
  # NOTE(review): presumably the path to the zbi host tool - confirm.
  self.m.zbi.zbi_path = build_artifacts.zbi

  # Maps paths inside the bootfs to the local files to place there.
  manifest = {}
  # Name of the entry in build_artifacts.images to extend.
  zbi_name = 'zircon-a'
  # Both stay None unless a separate output image is requested below.
  new_zbi_filename = None
  new_zbi_path = None
  if test_in_shards:
    # if testing in shards, zbi file should be modified once for all shards in
    # place before uploading through artifactory.
    assert in_place
    needs_key = False
    for shard in build_artifacts.shards:
      if self.m.emu.is_emulator_type(shard.device_type):
        if self._uses_legacy_qemu(shard):
          # Legacy QEMU shards get a test list plus a runcmds script that
          # runs runtests against that list.
          test_list_path = self._create_test_list(shard)
          runtests_file_bootfs_path = 'infra/shard-%s.run' % self._normalize(
              shard.name)
          runcmds_path = self.m.path['cleanup'].join(
              'runcmds-%s' % self._normalize(shard.name))
          # The test list is referenced through its location under /boot.
          runtests_cmd_parts = [
              'runtests', '-o', self.results_dir_on_target, '-f',
              '/boot/%s' % runtests_file_bootfs_path
          ]
          if per_test_timeout_secs:
            runtests_cmd_parts.extend(['-i', '%d' % per_test_timeout_secs])
          self._create_runcmds_script(
              device_type=shard.device_type,
              test_cmds=[' '.join(runtests_cmd_parts)],
              output_path=runcmds_path,
          )
          manifest[self._get_runcmds_path_per_shard(shard)] = runcmds_path
          manifest[runtests_file_bootfs_path] = test_list_path
        else:
          # Emulator shards that do not take the legacy path need the
          # authorized SSH key in the image instead.
          # NOTE(review): pairing of this `else` with the legacy-QEMU check
          # is reconstructed - confirm against the original indentation.
          needs_key = True
    if needs_key:
      manifest[AUTHORIZED_KEY_PATH] = build_artifacts.authorized_key

  else:
    assert device_type and test_cmds
    if not in_place:
      # Write the extended image next to the original instead of over it.
      new_zbi_filename = 'test-infra.zbi'
      new_zbi_path = build_artifacts.fuchsia_build_dir.join(new_zbi_filename)
    if not self.m.emu.is_emulator_type(device_type):
      # For non-emulator devices, extend the first image whose bootserver
      # arguments (netboot or pave, per `pave`) contain '--boot'.
      zbi_name = next(
          (image['name']
           for image in build_artifacts.images.values()
           if '--boot' in image.get(
               'bootserver_%s' % ('netboot' if not pave else 'pave'), [])),
          None)
      assert zbi_name, 'Could not find kernel image to boot.'
    runcmds_path = self.m.path['cleanup'].join('runcmds')
    self._create_runcmds_script(device_type, test_cmds, runcmds_path)
    manifest[RUNCMDS_BOOTFS_PATH] = runcmds_path

  # Inject the runcmds script and/or authorized key into the bootfs image.
  if manifest and zbi_name in build_artifacts.images:
    self.m.zbi.copy_and_extend(
        step_name='create zbi',
        input_image=build_artifacts.fuchsia_build_dir.join(
            build_artifacts.images[zbi_name]['path']),
        # Without a separate output path the input image is overwritten.
        output_image=new_zbi_path or build_artifacts.fuchsia_build_dir.join(
            build_artifacts.images[zbi_name]['path']),
        manifest=manifest,
    )
    if new_zbi_filename:
      # Point the image entry at the newly created file.
      build_artifacts.images[zbi_name]['path'] = new_zbi_filename