| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import operator |
| |
| import attr |
| |
| from recipe_engine import recipe_api |
| from recipe_engine.config_types import Path |
| |
| from RECIPE_MODULES.fuchsia.testsharder import api as testsharder_api |
| |
| # List of available targets. |
| TARGETS = ['x64', 'arm64'] |
| |
| # Name of BigQuery project and table for uploading artifacts. |
| BIGQUERY_PROJECT = 'fuchsia-infra' |
| BIGQUERY_ARTIFACTS_DATASET = 'artifacts' |
| TEST_SUMMARY_JSON = 'summary.json' |
| KERNEL_LOG = 'kernel_log.txt' |
| COVARGS_LOG_LEVEL = 'debug' |
| COVARGS_OUTPUT_JSON = 'covargs-output.json' |
| |
| |
| @attr.s |
| class FuchsiaTestResults(object): |
| """Represents the result of testing of a Fuchsia build. |
| |
| Attributes: |
| from_fuchsia (bool): Whether the tests ran on Fuchsia. |
| results_dir (Path): The directory that the test results archive has |
| been unpacked into. |
| output_dir (Path): A directory containing the outputs of the swarming |
| task that ran these tests. Anything that's in this directory will be |
| uploaded to GCS when upload_results() is called. |
| outputs (dict[str]str): A mapping from of relative paths to files |
| containing stdout+stderr data to strings containing those contents. |
| swarming_task_id (str): The ID of the task that ran these tests. |
| symbolizer_json_output (Path or None): The path to the json trigger |
| information produced by the symbolizer. |
| env_name (str): The name of the task that ran these tests. |
| tests (seq(testsharder.Test)): The tests that this task was instructed |
| to run (as opposed to the results of the tests that this task *did* |
| run, which are enumerated in `summary`). |
| legacy_qemu (bool): Whether these tests were run using QEMU with |
| runtests (no ssh). |
| api (RecipeApi): The api to use for accessing recipe modules from this |
| object. |
| symbolizer_output (Path or None): The path to the symbolized log file |
| produced by running these tests. |
| overwrite_summary (bool): Whether to set the "name" and "gn_label" fields |
| in the summary.json produced by these tests using the corresponding |
| values from the input tests.json. Only affects legacy QEMU tests. |
| (Solely for backwards compatibility with fuchsia_perf.) |
| """ |
| |
| from_fuchsia = attr.ib(type=bool) |
| results_dir = attr.ib(type=Path) |
| output_dir = attr.ib(type=Path) |
| outputs = attr.ib(type=dict) |
| swarming_task_id = attr.ib(type=str) |
| symbolizer_json_output = attr.ib(type=Path) |
| _env_name = attr.ib(type=str) |
| _tests = attr.ib(type=testsharder_api.Test) |
| _legacy_qemu = attr.ib(type=bool) |
| _api = attr.ib(type=recipe_api.RecipeApi) |
| _symbolizer_output = attr.ib(None, type=Path) |
| # TODO(fxb/10410): Get rid of overwrite_summary after fuchsia_perf is dead. |
| _overwrite_summary = attr.ib(True, type=bool) |
| |
| # Set lazily by the `summary` property, not a parameter to __init__. |
| _summary = attr.ib(None, init=False) |
| |
| # Constants representing the result of running a test. These enumerate the |
| # values of the 'results' field of the entries in the summary.json file |
| # obtained from the target device. |
| _TEST_RESULT_PASS = 'PASS' |
| _TEST_RESULT_FAIL = 'FAIL' |
| |
| @property |
| def summary(self): |
| """The parsed summary file as a Dict or {} if missing.""" |
| if self._summary is None: |
| self._summary = self._parse_summary() |
| return self._summary |
| |
| @property |
| def summary_lines(self): |
| """Returns a list of the lines of the summary.json file.""" |
| return self._api.json.dumps(self.summary, indent=2).splitlines() |
| |
| @property |
| def passed(self): |
| """Whether all the tests passed.""" |
| tests = self.summary.get('tests', []) |
| return all(test['result'] == self._TEST_RESULT_PASS for test in tests) |
| |
| @property |
| def passed_test_outputs(self): |
| """All entries in |self.outputs| for tests that passed.""" |
| return self._filter_outputs_by_test_result(self._TEST_RESULT_PASS) |
| |
| @property |
| def failed_test_outputs(self): |
| """All entries in |self.outputs| for tests that failed.""" |
| return self._filter_outputs_by_test_result(self._TEST_RESULT_FAIL) |
| |
| def _filter_outputs_by_test_result(self, result): |
| """Returns all entries in |self.outputs| whose result is |result|. |
| |
| Args: |
| result (String): one of the _TEST_RESULT_* constants from this class. |
| |
| Returns: |
| A dict whose keys are paths to the files containing each test's |
| stderr+stdout data and whose values are strings containing those |
| contents. |
| """ |
| matches = collections.OrderedDict() |
| # TODO(kjharland): Sort test names first. |
| for test in self.summary.get('tests', ()): |
| if test['result'] == result: |
| # The 'output_file' field is a path to the file containing the |
| # stderr+stdout data for the test, and we inline the contents of that |
| # file as the value in the returned dict. |
| matches[test['name']] = self.outputs[test['output_file']] |
| |
| return matches |
| |
| def _parse_summary(self): |
| raw_summary = self.outputs.get(TEST_SUMMARY_JSON, '') |
| if not raw_summary: |
| return {} |
| |
| try: |
| summary = self._api.json.loads(raw_summary) |
| except ValueError as e: # pragma: no cover |
| # TODO(olivernewman): JSONDecodeError in python >=3.5 |
| raise self._api.step.StepFailure('Invalid %s: %s' % |
| (TEST_SUMMARY_JSON, e.args[0])) |
| |
| if not self._overwrite_summary or not self._legacy_qemu: |
| return summary |
| # We want all Fuchsia tests to have the package URL in the name field. But |
| # legacy QEMU tests set "name" to be the test install path (since the test |
| # list sent to QEMU is a list of paths). So overwrite the "name" field to |
| # be the package URL instead. |
| # Also set "gn_label", which doesn't automatically get passed through from |
| # tests.json. |
| # TODO(garymm,joshuaseaton): Once deprecated testing code path is removed, |
| # self._legacy_qemu will always be False and we can remove this logic. |
| tests_by_path = {test.path: test for test in self._tests} |
| for summary_test in summary['tests']: |
| path = summary_test['name'] |
| # Some zircon tests get run even though they don't show up in tests.json. |
| # TODO(olivernewman): After build unification is complete we can assume |
| # that every test in summary.json will have a corresponding entry in |
| # tests.json, so get rid of this check and update every summary test. |
| if path in tests_by_path: |
| test = tests_by_path[path] |
| assert test.package_url |
| summary_test.update( |
| name=test.package_url, |
| gn_label=test.label, |
| ) |
| return summary |
| |
| def present_tests(self, show_failures_in_red, show_passed): |
| for test, stdio in self.failed_test_outputs.iteritems(): |
| step = self._api.step('failed: %s' % test, None) |
| step.presentation.logs['stdio'] = stdio.split('\n') |
| if show_failures_in_red: |
| step.presentation.status = self._api.step.FAILURE |
| |
| # There's recipe overhead that makes step creation slow, which we mitigate |
| # by cramming all the passed tests into a single step. We also skip |
| # presenting stdio since it's generally only useful if the test failed. |
| with self._api.step.nest('all passed tests') as passed_tests_step: |
| passed_tests = self.passed_test_outputs |
| passed_tests_step.presentation.step_summary_text = ('%d passed tests' % |
| len(passed_tests)) |
| if show_passed: |
| # Start with a newline to prevent the first test from showing up on |
| # the same line as the step name. |
| passed_tests_step.presentation.step_text = ''.join( |
| '\n' + test_name for test_name in passed_tests) |
| |
| def upload_results(self, gcs_bucket, upload_to_catapult): |
| """Upload select test results (e.g., coverage data) to a given GCS bucket.""" |
| assert gcs_bucket |
| with self._api.step.nest('upload %s test results' % self._env_name): |
| if self.summary: |
| # Save the summary JSON to the test shard output dir so it gets |
| # uploaded to GCS for easy access by e.g. Dachsiaboard. |
| summary_path = self.output_dir.join(TEST_SUMMARY_JSON) |
| assert not self._api.path.exists(summary_path), ( |
| 'test output files should not be named %s' % TEST_SUMMARY_JSON) |
| self._api.file.write_json('write %s' % TEST_SUMMARY_JSON, summary_path, |
| self.summary) |
| |
| self._upload_outputs(gcs_bucket) |
| |
| if upload_to_catapult: |
| self._api.upload.test_outputs_to_catapult(self.output_dir) |
| |
| def _upload_outputs(self, gcs_bucket): |
| self._api.upload.directory_to_gcs( |
| source=self.output_dir, |
| bucket=gcs_bucket, |
| # Namespace to avoid collision across shards and attempts. |
| subpath='%s/%s' % (self._env_name, self.swarming_task_id), |
| ) |
| |
| def raise_failures(self): |
| """Raises a step failure if there were test failures.""" |
| if not self.summary: |
| # Halt with step failure if summary file is missing. |
| raise self._api.step.StepFailure( |
| 'Test summary JSON not found, see symbolized log for details') |
| |
| failed_tests = self.failed_test_outputs.keys() |
| if failed_tests: |
| # Halt with a step failure. |
| raise self._api.step.StepFailure('Test failure(s): ' + |
| ', '.join(failed_tests)) |
| |
| # Check serial log for failure messages |
| # TODO(9936): Replace with running binary tool once created. |
| fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED'] |
| log_path = self.output_dir.join(self._api.testing_requests.SERIAL_LOG_NAME) |
| self._api.path.mock_add_paths(log_path) |
| if self._api.path.exists(log_path): |
| self._check_log_for_failures(log_path, fail_strings) |
| |
| def _check_log_for_failures(self, log_path, fail_strings): |
| """Checks for fail strings in log and fails accordingly.""" |
| log_name = self._api.path.basename(log_path) |
| with self._api.step.nest('check log %s:%s' % |
| (self._env_name, log_name)) as check_log_step: |
| contents = self._api.file.read_text( |
| 'read %s' % log_name, |
| log_path, |
| test_data='extra log contents', |
| ).get_result() |
| for fail_str in fail_strings: |
| if fail_str in contents: |
| check_log_step.presentation.logs[log_name] = contents.splitlines() |
| raise self._api.step.StepFailure( |
| 'Found failure string in log %s: %s' % (log_name, fail_str)) |
| |
| |
| def create_task(api, *args, **kwargs): |
| """Create a Task object. |
| |
| The base class of the class is inside the api object, so it can't be |
| top-level or otherwise defined at module load time. Defining it in this |
| function as an alternative. |
| |
| For full args list see Task.__init__ a few lines down. |
| """ |
| |
| class Task(api.swarming_retry.TriggeredTask): |
| |
| def __init__(self, api, name, request, uses_legacy_qemu, targets_fuchsia, |
| symbolize_tool, llvm_symbolizer, tests, |
| debug_symbol_gcs_bucket, *args, **kwargs): |
| super(Task, self).__init__( |
| api=api, name=name, request=request, *args, **kwargs) |
| self._uses_legacy_qemu = uses_legacy_qemu |
| self._targets_fuchsia = targets_fuchsia |
| self._symbolize_tool = symbolize_tool |
| self._llvm_symbolizer = llvm_symbolizer |
| self._tests = tests |
| self._debug_symbol_gcs_bucket = debug_symbol_gcs_bucket |
| |
| # Test shards with the 'multiplied:' prefix come from |
| # tools/integration/testsharder/shard.go in fuchsia.git. They were |
| # specifically created to run a test or set of tests many times to look |
| # for flakes. It doesn't make sense to retry these when they fail--the |
| # goal is to see if they fail not to get them to pass. |
| if name.startswith('multiplied:'): |
| self.max_attempts = 1 |
| |
| def process_result(self): |
| """Unpacks the results archive produced by a test shard.""" |
| |
| attempt = self.attempts[-1] |
| assert attempt.result |
| result = attempt.result |
| |
| if result.isolated_outputs: |
| attempt.task_outputs_link = result.isolated_outputs.url |
| |
| if result.state == self._api.swarming.TaskState.TIMED_OUT: |
| attempt.failure_reason = 'timed out' |
| |
| attempt.test_results = None |
| |
| with self._api.step.nest(result.name): |
| attempt.symbolizer_output = result.output_dir.join( |
| self._api.symbolize.LOG) |
| attempt.symbolizer_json_output = self._api.path['cleanup'].join( |
| '%s-%d-%s' % |
| (self.name, attempt.index, self._api.symbolize.OUTPUT_JSON)) |
| # Figure out what happened to the swarming task. |
| if result.output: |
| # Fuchsia crashes have to be symbolized on the host. |
| if self._targets_fuchsia: |
| attempt.logs['symbolized log'] = self._api.symbolize( |
| symbolize_tool=self._symbolize_tool, |
| debug_symbol_gcs_bucket=self._debug_symbol_gcs_bucket, |
| llvm_symbolizer=self._llvm_symbolizer, |
| data=result.output, |
| symbolizer_output=attempt.symbolizer_output, |
| json_output=attempt.symbolizer_json_output, |
| ) |
| # Non-Fuchsia should already be symbolized, and attempting to use |
| # the symbolizer may fail, if e.g. it was built on Mac and this is |
| # running on Linux. |
| else: |
| attempt.logs['symbolized log'] = result.output |
| |
| if 'KERNEL PANIC' in result.output: |
| attempt.failure_reason = 'KERNEL PANIC' # pragma: no cover |
| |
| self._check_logs_for_failures(attempt) |
| |
| if result.success: |
| self._process_outputs(attempt) |
| |
| def _process_outputs(self, attempt): |
| """Reads the test results and output files of a swarming TaskResult. |
| |
| Sets attempt.test_results if successful. |
| |
| Args: |
| attempt (swarming_retry.Attempt): the attempt to process |
| """ |
| |
| assert attempt.result |
| result = attempt.result |
| |
| # Extract results if the task was not subject to an infra failure; |
| # otherwise, a step failure will be raised on exiting the |
| # defer_results() scope. |
| attempt.test_results_archive = None |
| for relative_path, absolute_path in sorted(result.outputs.iteritems()): |
| if relative_path in [ |
| self._api.testing_requests.TEST_RESULTS_ARCHIVE_NAME, |
| self._api.testing_requests.TEST_RESULTS_MINFS_NAME |
| ]: |
| attempt.test_results_archive = absolute_path |
| |
| assert attempt.test_results_archive, ( |
| 'test archive not found amongst outputs of task %s' % result.name) |
| self._parse_test_results(attempt) |
| |
| attempt.logs[TEST_SUMMARY_JSON] = attempt.test_results.summary_lines |
| |
| # Delete the archive so it doesn't get uploaded with the other files in |
| # the swarming task's output directory. |
| self._api.file.remove( |
| 'remove %s' % self._api.path.basename(attempt.test_results_archive), |
| attempt.test_results_archive) |
| |
| def _parse_test_results(self, attempt): |
| """Parse test results from attempt into a FuchsiaTestResults object. |
| |
| Args: |
| attempt (swarming_retry.Attempt): the attempt to parse |
| """ |
| assert attempt.result |
| result = attempt.result |
| |
| results_dir = self._api.testing.results_dir_on_host.join(result.id) |
| # pylint: disable=protected-access |
| test_results_map = self._api.testing._extract_test_results_archive( |
| step_name='extract', |
| archive_path=attempt.test_results_archive, |
| leak_to=results_dir, |
| is_minfs=self._uses_legacy_qemu, |
| ) |
| # pylint: enable=protected-access |
| |
| attempt.test_results = FuchsiaTestResults( |
| from_fuchsia=self._targets_fuchsia, |
| results_dir=results_dir, |
| outputs=test_results_map, |
| swarming_task_id=attempt.task_id, |
| symbolizer_json_output=attempt.symbolizer_json_output, |
| env_name=result.name, |
| tests=self._tests, |
| legacy_qemu=self._uses_legacy_qemu, |
| api=api, |
| symbolizer_output=attempt.symbolizer_output, |
| output_dir=result.output_dir, |
| ) |
| |
| failed_tests = attempt.test_results.failed_test_outputs |
| if failed_tests: |
| attempt.failure_reason = '%d test(s) failed' % len(failed_tests) |
| |
| def _check_logs_for_failures(self, attempt): |
| """Check for failure strings in logs. |
| |
| Args: |
| attempt (swarming_retry.Attempt): the attempt to check for logs in |
| """ |
| # Check serial log for failure messages |
| # TODO(9936): Replace with running binary tool once created. |
| fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED'] |
| log_path = attempt.result.output_dir.join( |
| self._api.testing_requests.SERIAL_LOG_NAME) |
| self._api.path.mock_add_paths(log_path) |
| if self._api.path.exists(log_path): |
| log_name = self._api.path.basename(log_path) |
| with self._api.step.nest('check log %s' % log_name) as presentation: |
| contents = self._api.file.read_text('read', log_path) |
| for fail_str in fail_strings: |
| if fail_str in contents: |
| presentation.logs[log_name] = contents.splitlines() |
| presentation.status = self._api.step.FAILURE |
| presentation.step_summary_text = 'found "%s"' % fail_str |
| attempt.failure_reason = ('found "%s" in %s' % |
| (fail_str, log_name)) |
| |
| def present_status(self, parent_step, attempt, **kwargs): |
| """Present an Attempt while showing progress in launch/collect step. |
| |
| Args: |
| parent_step (Step): will always be 'passed tasks' or 'failed tasks' |
| attempt (Attempt): the Attempt to present |
| """ |
| del kwargs, parent_step # Unused. |
| with api.step.nest('%s (%s)' % (self.name, attempt.name)) as presentation: |
| self._present( |
| presentation, |
| attempt, |
| show_failures_in_red=False, |
| show_passed=False) |
| |
| def present_attempt(self, _, attempt, category=None): |
| """Present an Attempt when summarizing results at the end of the run. |
| |
| Args: |
| attempt (Attempt): the Attempt to present |
| category (str): the group of tasks ('passes', 'failures', or |
| 'flakes') that this attempt should be presented under |
| """ |
| show_failures_in_red = True |
| # The 'passes' category includes all attempts of all tasks that |
| # eventually passed, so it includes some failures. Show those in |
| # green so people don't get confused and think the overall task |
| # failed. |
| # TODO(fxb/36647) after this bug is fixed show these steps in |
| # red, but show parent steps of those in green. |
| if category == 'passes': |
| show_failures_in_red = False |
| |
| name = '%s (%s)' % (attempt.name, 'pass' if attempt.success else 'fail') |
| with api.step.nest(name) as presentation: |
| if show_failures_in_red and not attempt.success: |
| presentation.status = self._api.step.FAILURE |
| self._present( |
| presentation, |
| attempt, |
| show_failures_in_red=show_failures_in_red, |
| show_passed=True, |
| ) |
| |
| def _present(self, presentation, attempt, show_failures_in_red, |
| show_passed): |
| """Present an Attempt. |
| |
| Choosing to do largely the same thing for both kinds of presentations. |
| |
| Args: |
| presentation (StepPresentation): where to present the attempt info |
| attempt (api.swarming_retry.Attempt): object to present |
| show_failures_in_red (bool): show failures in red (for final |
| 'flakes' and 'failures' steps) or not (for 'launch/collect' |
| progress and 'passes' steps) |
| show_passed (bool): show the names of passed tests (only done for |
| the end) |
| |
| Note: the 'passes' step can have failures underneath it because the |
| first attempt can fail but the retry passed. |
| """ |
| if attempt.result.duration_secs: |
| presentation.step_text = nice_time(attempt.result.duration_secs) |
| |
| presentation.presentation.links['swarming task'] = attempt.task_ui_link |
| if attempt.task_outputs_link: |
| presentation.links['task outputs'] = attempt.task_outputs_link |
| |
| if attempt.failure_reason: |
| presentation.step_summary_text = attempt.failure_reason |
| |
| for log, data in attempt.logs.iteritems(): |
| presentation.logs[log] = data |
| |
| if attempt.test_results: |
| test_results = attempt.test_results |
| |
| # Log the contents of each output file mentioned in the summary. |
| # Note this assumes the outputs are all valid UTF-8 (See fxb/9500). |
| for name, path in test_results.summary.get('outputs', {}).iteritems(): |
| presentation.logs[name] = test_results.outputs[path].split('\n') |
| |
| test_results.present_tests( |
| show_failures_in_red=show_failures_in_red, show_passed=show_passed) |
| |
| for log_name in [ |
| self._api.testing_requests.SYSLOG_NAME, |
| self._api.testing_requests.SERIAL_LOG_NAME |
| ]: |
| if log_name in attempt.result.outputs: |
| self._present_output_file( |
| name=log_name, |
| path=attempt.result.outputs[log_name], |
| step=presentation) |
| |
| def _present_output_file(self, name, path, step): |
| """Records file contents to the test results step's presentation.""" |
| contents = self._api.file.read_text( |
| 'read %s' % name, |
| path, |
| test_data='extra log contents', |
| ) |
| step.presentation.logs[name] = contents.splitlines() |
| |
| return Task(*args, api=api, **kwargs) |
| |
| |
| class _ShardedTestRunner(object): |
| """Handles running and analyzing tests that have been split into shards.""" |
| |
| def __init__(self, api, collect_timeout, debug_symbol_gcs_bucket, |
| llvm_symbolizer, max_attempts, rerun_budget_secs, |
| swarming_output_dir, symbolize_tool, shard_requests): |
| self._api = api |
| self._collect_timeout = collect_timeout |
| self._max_attempts = max_attempts |
| launch_deadline_time = None |
| self._present = True |
| if rerun_budget_secs: |
| assert max_attempts == 1, ( |
| 'If rerun_budget_secs is set, max_attempts should be set to 1') |
| # Presenting results is currently slow due to inefficiencies in |
| # the recipe engine. For builds that have rerun_budget_secs enabled, |
| # we only care about consuming the data after it's been uploaded, not |
| # from the Milo build page, so skip presentation. |
| self._present = False |
| launch_deadline_time = self._api.time.time() + rerun_budget_secs |
| self._swarming_output_dir = swarming_output_dir |
| |
| self.tasks = [] |
| for shard_request in shard_requests: |
| uses_legacy_qemu = any(tag.lower() == 'uses_legacy_qemu:true' |
| for tag in shard_request.task_request.tags) |
| targets_fuchsia = shard_request.task_request[0].dimensions.get( |
| 'os', '').lower() not in ('linux', 'mac') |
| self.tasks.append( |
| create_task( |
| api=self._api, |
| name=shard_request.task_request.name, |
| request=shard_request.task_request, |
| symbolize_tool=symbolize_tool, |
| llvm_symbolizer=llvm_symbolizer, |
| tests=shard_request.shard.tests, |
| debug_symbol_gcs_bucket=debug_symbol_gcs_bucket, |
| uses_legacy_qemu=uses_legacy_qemu, |
| targets_fuchsia=targets_fuchsia, |
| launch_deadline_time=launch_deadline_time, |
| )) |
| |
| def run_tests(self): |
| """Runs all test shards and returns a Task object for each.""" |
| # TODO(fxb/35021) use context manager. |
| self._api.swarming_retry.run_tasks( |
| tasks=self.tasks, |
| collect_output_dir=self._swarming_output_dir, |
| max_attempts=self._max_attempts, |
| collect_timeout=self._collect_timeout, |
| ) |
| |
| if self._present: |
| self._api.swarming_retry.present_tasks(tasks=self.tasks) |
| |
| return self.tasks |
| |
| def raise_failures(self): |
| self._api.swarming_retry.raise_failures(self.tasks) |
| |
| |
| class FuchsiaTestApi(recipe_api.RecipeApi): |
| """API for running tests and processing test results.""" |
| |
| FuchsiaTestResults = FuchsiaTestResults |
| |
| def __init__(self, *args, **kwargs): |
| super(FuchsiaTestApi, self).__init__(*args, **kwargs) |
| self._test_runner = None |
| |
| def _analyze_test_results(self, test_results, presentation=None): |
| """Analyzes test results represented by FuchsiaTestResults objects |
| |
| Logs individual test results in separate steps. |
| |
| Args: |
| test_results (FuchsiaTestResults): Fuchsia test result object |
| presentation (dict or None): A particular step's presentation on which to log |
| test result outputs; if not provided, that of the active result will be |
| used. |
| """ |
| if not test_results.summary: |
| return |
| presentation = presentation or self.m.step.active_result.presentation |
| |
| # Log the summary file's contents. |
| presentation.logs[TEST_SUMMARY_JSON] = test_results.summary_lines |
| |
| # Log the contents of each output file mentioned in the summary. |
| # Note this assumes the outputs are all valid UTF-8 (See fxb/9500). |
| for output_name, output_path in test_results.summary.get('outputs', |
| {}).iteritems(): |
| output_str = test_results.outputs[output_path] |
| presentation.logs[output_name] = output_str.split('\n') |
| |
| test_results.present_tests(show_failures_in_red=True, show_passed=True) |
| |
| def process_coverage(self, covargs_path, test_results, ids_txt, llvm_profdata, |
| llvm_cov, gcs_bucket): |
| output_dir = self.m.path['cleanup'].join('coverage') |
| |
| cmd = [ |
| covargs_path, |
| '-level', |
| COVARGS_LOG_LEVEL, |
| '-json-output', |
| self.m.json.output(name=COVARGS_OUTPUT_JSON), |
| '-output-dir', |
| output_dir, |
| '-llvm-profdata', |
| llvm_profdata, |
| '-llvm-cov', |
| llvm_cov, |
| '-ids', |
| ids_txt, |
| ] |
| |
| for result in test_results: |
| cmd.extend([ |
| '-summary', result.results_dir.join(TEST_SUMMARY_JSON), |
| '-symbolize-dump', result.symbolizer_json_output, |
| ]) # yapf: disable |
| |
| self.m.step('covargs', cmd) |
| |
| # TODO: move this into gsutil module/deduplicate this with other GCS logic |
| dst = 'builds/%s/coverage' % self.m.buildbucket_util.id |
| step_result = self.m.gsutil.rsync( |
| name='upload coverage', |
| src=output_dir, |
| bucket=gcs_bucket, |
| dst=dst, |
| recursive=True, |
| gzip_exts=['html'], |
| options={ |
| 'parallel_process_count': self.m.platform.cpu_count, |
| 'parallel_thread_count': 1, |
| }, |
| multithreaded=True) |
| step_result.presentation.links['index.html'] = self.m.gsutil._http_url( |
| gcs_bucket, self.m.gsutil.join(dst, 'index.html'), True) |
| |
| @property |
| def results_dir_on_host(self): |
| """The directory on host to which host and target test results will be written. |
| |
| Target test results will be copied over to this location and host test |
| results will be written here. Host and target tests on should write to |
| separate subdirectories so as not to collide. |
| """ |
| return self.m.path['cleanup'].join('test_results') |
| |
| def _extract_test_results_archive(self, |
| step_name, |
| archive_path, |
| is_minfs=False, |
| leak_to=None): |
| """Extracts test results from an archive. |
| |
| Args: |
| step_name (str): The name of the step. |
| archive_path (Path): The path to the archive which contains test results. |
| is_minfs (bool): Whether the archive in question is a minfs image |
| containing QEMU test results. If false, then the archive is assumed to |
| be a tar file. |
| leak_to (Path): Optionally leak the contents of the archive to a |
| directory. |
| |
| Returns: |
| A dict mapping a filepath relative to the root of the archive to the |
| contents of that file in the archive. |
| """ |
| if is_minfs: |
| return self.m.minfs.copy_image( |
| step_name=step_name, |
| image_path=archive_path, |
| out_dir=leak_to, |
| ).raw_io.output_dir |
| |
| return self.m.tar.extract( |
| step_name=step_name, |
| path=archive_path, |
| directory=self.m.raw_io.output_dir(leak_to=leak_to), |
| ).raw_io.output_dir |
| |
| def deprecated_test(self, *args, **kwargs): |
| """Tests a Fuchsia build on the specified device with retries. |
| |
| Expects the build and artifacts to be at the same place they were at |
| the end of the build. |
| |
| Args (see _launch_collect_process_funcs for other args): |
| max_attempts (int): The tests will be run repeatedly until either |
| max_attempts is hit or all tests pass. |
| |
| Returns: |
| A `FuchsiaTestResults` object corresponding to the last test attempt. |
| """ |
| # Ideally this method's arguments would look like |
| # (self, *args, max_attempts=0, **kwargs) |
| # but Python 2 doesn't allow default keyword args after variable-length |
| # positional *args :( |
| max_attempts = kwargs.pop('max_attempts', 0) |
| if not max_attempts: |
| max_attempts = self.m.swarming_retry.DEFAULT_MAX_ATTEMPTS |
| |
| launch, collect, process = self._launch_collect_process_funcs( |
| *args, **kwargs) |
| |
| test_results = None |
| final_exception = None |
| |
| # TODO(olivernewman): status='last' should cause this step to turn green as |
| # long as the *last* test attempt is green, but this isn't working, at |
| # least not for led jobs (if the first attempt fails at the second passes, |
| # the build is marked as a failure). Figure out whether this will be |
| # resolved by using luci_runner. |
| with self.m.step.nest('run tests', status='last'): |
| for i in range(max_attempts): |
| with self.m.step.nest('attempt %d' % i) as attempt_presentation: |
| task_result = collect(launch()) |
| try: |
| test_results = process( |
| task_result, presentation=attempt_presentation) |
| except self.m.step.StepFailure as e: |
| final_exception = e |
| else: |
| final_exception = None |
| if test_results.passed: |
| attempt_presentation.step_text = 'passed' |
| break |
| else: |
| failed_count = len(test_results.failed_test_outputs) |
| attempt_presentation.step_text = ('%d test(s) failed' % |
| failed_count) |
| |
| if final_exception: |
| raise final_exception # pylint: disable=raising-bad-type |
| |
| return test_results |
| |
| def deprecated_test_async(self, *args, **kwargs): |
| """Launches a swarming task to run Fuchsia tests. |
| |
| Returns: |
| A function that, when invoked, waits for the tests to complete and |
| returns a `FuchsiaTestResults` object representing the completed test. |
| """ |
| launch, collect, process = self._launch_collect_process_funcs( |
| *args, **kwargs) |
| request_metadata = launch() |
| return lambda: process(collect(request_metadata)) |
| |
| def _launch_collect_process_funcs( |
| self, |
| debug_symbol_gcs_bucket, |
| device_type, |
| orchestration_inputs, |
| overwrite_summary=True, |
| ): |
| """Returns 3-tuple of functions to launch Fuchsia tests, wait for them to |
| complete, and process the results. |
| |
| Args: |
| debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols. |
| device_type (str): Used as swarming device_type dimension. |
| orchestration_inputs (TestOrchestrationInputs): the bits of data |
| needed to orchestrate testing. |
| overwrite_summary (bool): Whether to overwrite the name and label |
| fields in summary.json based on tests.json. This should *only* be |
| used by fuchsia_perf; do NOT add any new dependencies on this. |
| TODO(fxb/10410): remove this entirely after fuchsia_perf is dead. |
| |
| Returns: |
| A tuple of functions: |
| - `launch`, which takes no arguments and launches a swarming task to |
| run tests against the given build artifacts. Returns a |
| `TaskRequestMetadata` object. |
| - `collect`, which takes the `TaskRequestMetadata` object returned by |
| launch` (and, optionally, a `StepPresentation` object to add logs |
| to). It blocks until the task is complete and returns a swarming |
| `TaskResult`. |
| - `process`, which processes the results and returns a |
| `FuchsiaTestResults` object representing the completed tests. |
| """ |
| task = orchestration_inputs.shard_requests[0].task_request |
| # This directory gets passed into `collect()`, but unfortunately the |
| # `output_dir` attribute of the `TaskResult` returned by `collect()` is |
| # a subdirectory of this output dir (to ensure that different tasks' |
| # outputs do not collide when calling `api.swarming.collect()` with many |
| # tasks). So we make this variable in-scope for all three functions so that |
| # `process()` can use it as the output dir for the test results object. |
| output_dir = self.m.path.mkdtemp('swarming') |
| |
| def launch(): |
| with self.m.context(infra_steps=True): |
| return self.m.swarming.trigger( |
| 'trigger 1 task', [task], cancel_extra_tasks=True) |
| |
| def collect(request_metadata): |
| with self.m.context(infra_steps=True): |
| results = self.m.swarming.collect( |
| 'collect', tasks=request_metadata, output_dir=output_dir) |
| assert len(results) == 1, 'len(%s) != 1' % repr(results) |
| return results[0] |
| |
| def process(task_result, presentation=None): |
| symbolizer_output = output_dir.join(self.m.symbolize.LOG) |
| symbolizer_json_output = self.m.path['cleanup'].join( |
| self.m.symbolize.OUTPUT_JSON) |
| with self.m.step.nest('task results'): |
| self._analyze_task_result( |
| result=task_result, |
| debug_symbol_gcs_bucket=debug_symbol_gcs_bucket, |
| symbolize_tool=orchestration_inputs.symbolize_tool, |
| llvm_symbolizer=orchestration_inputs.llvm_symbolizer, |
| symbolizer_output=symbolizer_output, |
| symbolizer_json_output=symbolizer_json_output, |
| presentation=presentation, |
| ) |
| |
| with self.m.context(infra_steps=True): |
| # result.outputs contains the file outputs produced by the Swarming |
| # task, returned via isolate. It's a mapping of the 'name' of the |
| # output, represented as its relative path within the isolated it |
| # was returned in, to a Path object pointing to its location on the |
| # local disk. For each of the above tasks, there should be exactly |
| # one output. |
| serial_log_name = self.m.testing_requests.SERIAL_LOG_NAME |
| if serial_log_name in task_result.outputs: |
| serial_log = task_result.outputs.pop(serial_log_name) |
| |
| serial_log_contents = self.m.file.read_text( |
| 'read serial.txt', serial_log, test_data=[]) |
| serial_presentation = ( |
| presentation or self.m.step.active_result.presentation) |
| serial_presentation.logs[serial_log_name] = ( |
| serial_log_contents.splitlines()) |
| assert len(task_result.outputs) == 1, 'len(%s) != 1' % repr( |
| task_result.outputs) |
| archive_name, archive_path = task_result.outputs.items()[0] |
| |
| test_results_dir = self.results_dir_on_host.join( |
| 'target', task_result.id) |
| # _extract_test_results_archive needs minfs_path to be set. |
| # This is kinda ugly. It'd be better to pass this in as an argument. |
| self.m.minfs.minfs_path = orchestration_inputs.minfs |
| test_results_map = self._extract_test_results_archive( |
| step_name='extract results', |
| is_minfs=self.m.emu.is_emulator_type(device_type), |
| archive_path=archive_path, |
| # Write test results to a subdirectory of |results_dir_on_host| |
| # so as not to collide with host test results. |
| leak_to=test_results_dir, |
| ) |
| |
| # Remove the archive file so it doesn't get uploaded to GCS. |
| self.m.file.remove('remove %s' % archive_name, archive_path) |
| |
| test_list = self.m.file.read_json( |
| 'read tests.json', orchestration_inputs.tests_file, test_data=[]) |
| tests = [ |
| self.m.testsharder.Test.from_jsonish(t['test']) for t in test_list |
| ] |
| |
| with self.m.step.nest('all test results'): |
| test_results = self.FuchsiaTestResults( |
| from_fuchsia=True, |
| results_dir=test_results_dir, |
| outputs=test_results_map, |
| swarming_task_id=task_result.id, |
| symbolizer_json_output=symbolizer_json_output, |
| env_name=task_result.name, |
| tests=tests, |
| legacy_qemu=self.m.emu.is_emulator_type(device_type), |
| api=self.m, |
| symbolizer_output=symbolizer_output, |
| output_dir=output_dir, |
| overwrite_summary=overwrite_summary, |
| ) |
| self._analyze_test_results(test_results, presentation=presentation) |
| return test_results |
| |
| return launch, collect, process |
| |
| def final_results(self, tasks): |
| """Returns the FuchsiaTestResults from the final attempt of each Task.""" |
| return [ |
| t.attempts[-1].test_results |
| for t in tasks |
| if t.attempts[-1].test_results |
| ] |
| |
| def test_in_shards(self, |
| collect_timeout_secs, |
| debug_symbol_gcs_bucket, |
| max_attempts, |
| orchestration_inputs, |
| rerun_budget_secs=None): |
| """Tests a Fuchsia build by sharding. |
| |
| Expects the build and artifacts to be at the same place they were at |
| the end of the build. |
| |
| Args: |
| collect_timeout_secs (int): Amount of time to wait for tasks to complete. |
| debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols. |
| max_attempts (int): Maximum number of attempts before marking a shard |
| as failed. |
| orchestration_inputs (TestOrchestrationInputs): Build artifacts needed |
| for testing. |
| rerun_budget_secs (int): Will run tests repeatedly until this budget is consumed. |
| If set, max_attempts is ignored. |
| |
| |
| Returns: |
| A list of FuchsiaTestResults objects representing the completed test |
| tasks that were not subject to an infra failure. |
| """ |
| # If no shards have been provided, then we have successfully run the empty |
| # set of tests. |
| if not orchestration_inputs.shard_requests: |
| return [] |
| |
| self.m.minfs.minfs_path = orchestration_inputs.minfs |
| collect_timeout = None |
| if collect_timeout_secs: |
| collect_timeout = '%ds' % collect_timeout_secs |
| |
| self._test_runner = _ShardedTestRunner( |
| self.m, |
| collect_timeout=collect_timeout, |
| debug_symbol_gcs_bucket=debug_symbol_gcs_bucket, |
| llvm_symbolizer=orchestration_inputs.llvm_symbolizer, |
| max_attempts=max_attempts, |
| rerun_budget_secs=rerun_budget_secs, |
| swarming_output_dir=self.m.path.mkdtemp('swarming'), |
| symbolize_tool=orchestration_inputs.symbolize_tool, |
| shard_requests=orchestration_inputs.shard_requests, |
| ) |
| return self._test_runner.run_tests() |
| |
| def raise_failures(self): |
| if self._test_runner: |
| self._test_runner.raise_failures() |
| |
| def _analyze_task_result( |
| self, |
| result, |
| symbolize_tool, |
| llvm_symbolizer, |
| debug_symbol_gcs_bucket, |
| symbolizer_output, |
| symbolizer_json_output, |
| presentation=None, |
| ): |
| """Analyzes a swarming.TaskResult and reports results as a step. |
| |
| Args: |
| result (api.swarming.TaskResult): The swarming task result to analyze. |
| symbolize_tool (Path): The path to the symbolize tool. |
| llvm_symbolizer (Path): The path to the llvm_symbolizer tool. |
| debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols. |
| symbolizer_output (Path): A path to a file to write the symbolizer's |
| stdout. |
| symbolizer_json_output (Path): A path to a file to write the |
| symbolizer's JSON output. |
| presentation (StepPresentation or None): The step presentation to |
| attach logs to. Defaults to `active_result.presentation`. |
| |
| Raises: |
| A StepFailure if a kernel panic is detected or an InfraFailure if the |
| swarming task failed for a different reason. |
| """ |
| presentation = presentation or self.m.step.active_result.presentation |
| if result.output: |
| # Always symbolize the result output if present in this case. |
| presentation.logs['symbolized log'] = self.m.symbolize( |
| symbolize_tool=symbolize_tool, |
| debug_symbol_gcs_bucket=debug_symbol_gcs_bucket, |
| llvm_symbolizer=llvm_symbolizer, |
| data=result.output, |
| symbolizer_output=symbolizer_output, |
| json_output=symbolizer_json_output, |
| presentation=presentation) |
| |
| # A kernel panic may be present in the logs even if the task timed out, so |
| # check for that first. |
| if 'KERNEL PANIC' in result.output: |
| presentation.step_text = 'kernel panic' |
| presentation.status = self.m.step.FAILURE |
| raise self.m.step.StepFailure( |
| 'Found kernel panic. See symbolized output for details.') |
| |
| if result.isolated_outputs: |
| presentation.links['test outputs'] = result.isolated_outputs.url |
| |
| try: |
| result.analyze() |
| except self.m.step.StepFailure: |
| self._present_task_errors(result, presentation) |
| raise |
| |
| def _present_task_errors(self, task_result, presentation): |
| """Updates text and status of the given step to reflect test task errors.""" |
| # If the task is in an unknown state or completed, but the executed command returned |
| # a non-zero exit code, this points to a tooling failure. |
| if task_result.state is None or task_result.state == self.m.swarming.TaskState.COMPLETED: |
| text = 'tooling failure' # pragma: no cover |
| else: |
| text = (task_result.state.name).replace('_', ' ').lower() |
| presentation.step_text = text |
| |
| # Report timeouts as red, not purple, as it is likelier that the task is |
| # timing out due to a bug in the system under test. |
| if task_result.state == self.m.swarming.TaskState.TIMED_OUT: |
| status = self.m.step.FAILURE # pragma: no cover |
| else: |
| status = self.m.step.EXCEPTION |
| presentation.status = status |
| |
| |
| def product(nums): |
| return reduce(operator.mul, nums) |
| |
| |
| def nice_time(seconds): |
| """Returns a human-readable duration given a number of seconds. |
| |
| For example: 3605 seconds => '1h 5s' |
| """ |
| units = [ |
| ('d', 24), |
| ('h', 60), |
| ('m', 60), |
| ('s', 1), |
| ] |
| |
| result_parts = [] |
| divisor = product(unit_size for _, unit_size in units) |
| for unit_abbrev, unit_size in units: |
| count, seconds = divmod(seconds, divisor) |
| if count > 0: |
| result_parts.append('%d%s' % (count, unit_abbrev)) |
| divisor /= unit_size |
| |
| return ' '.join(result_parts) |