| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import attr |
| import collections |
| |
| from recipe_engine import recipe_api |
| from recipe_engine.config_types import Path |
| |
| from RECIPE_MODULES.fuchsia.swarming_retry import api as swarming_retry_api |
| from RECIPE_MODULES.fuchsia.utils import cached_property, nice_duration, pluralize |
| |
| # Name of the file produced by a testing task that contains results and outputs |
| # of the tests that were run. |
| TEST_SUMMARY_JSON = "summary.json" |
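|
| # The fields read from summary.json elsewhere in this module suggest a shape
| # like the following (illustrative only; the authoritative schema lives in
| # the testrunner tooling in fuchsia.git):
| # {
| #   "tests": [
| #     {
| #       "name": "foo_test",
| #       "result": "PASS",
| #       "output_files": ["foo_test/stdout-and-stderr.txt"],
| #       "gn_label": "//src/foo:foo_test"
| #     }
| #   ],
| #   "outputs": {"syslog": "syslog.txt"}
| # }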
| |
| |
| @attr.s |
| class FuchsiaTestResults(object): |
| """Represents the result of testing of a Fuchsia build. |
| |
| Attributes: |
| from_fuchsia (bool): Whether the tests ran on Fuchsia. |
| results_dir (Path): The directory that the test results have been moved |
| into (out of task_output_dir). |
| task_output_dir (Path): A directory containing the outputs of the swarming |
| task that ran these tests. Anything that's in this directory will be |
| uploaded to GCS when upload_results() is called. |
| swarming_bot_id (str): The ID of the swarming bot that ran the task. |
| swarming_task_id (str): The ID of the task that ran these tests. |
| env_name (str): The name of the environment that these tests ran in.
| shard_name (str): The name of the task that ran these tests. |
| api (RecipeApi): The api to use for accessing recipe modules from this |
| object. |
| summary_bytes (bytes): The contents of TEST_SUMMARY_JSON. |
| is_multiplied (bool): Whether the test results are for a multiplier |
| shard. |
| """ |
| |
| from_fuchsia = attr.ib(type=bool) |
| results_dir = attr.ib(type=Path) |
| task_output_dir = attr.ib(type=Path) |
| swarming_bot_id = attr.ib(type=str) |
| swarming_task_id = attr.ib(type=str) |
| env_name = attr.ib(type=str) |
| _shard_name = attr.ib(type=str) |
| _api = attr.ib(type=recipe_api.RecipeApi) |
| _summary_bytes = attr.ib(type=bytes) |
| _is_multiplied = attr.ib(type=bool) |
| # A list of files to upload to resultdb as invocation-level artifacts. |
| # These should be paths relative to results_dir.
| _resultdb_artifacts = attr.ib(type=list) |
| |
| # A mapping from paths (relative to results_dir) of files containing
| # stdout+stderr data to strings containing those files' contents.
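| # Example entry (illustrative path):
| #   {"foo_test/stdout-and-stderr.txt": "<combined stdout and stderr>"}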
| _outputs = attr.ib(factory=dict, init=False) |
| |
| # True if this set of TestResults corresponds to a skipped test shard. |
| skipped = attr.ib(type=bool, default=False) |
| |
| # Set lazily by `_separate_tests_by_result()`, not a parameter to __init__. |
| _pass_name_to_tests = attr.ib(factory=dict, init=False) |
| _fail_name_to_tests = attr.ib(factory=dict, init=False) |
| |
| # Constants representing the result of running a test. These enumerate the |
| # values of the 'result' field of the entries in the summary.json file
| # obtained from the target device. Any other value is considered to |
| # represent a failure. |
| _TEST_RESULT_PASS = "PASS" |
| _TEST_RESULT_FAIL = "FAIL" |
| _TEST_RESULT_TIMEOUT = "ABORT" |
| _TEST_RESULT_SKIP = "SKIP" |
| |
| @classmethod |
| def from_skipped_shard(cls, api, shard): |
| """Generate a FuchsiaTestResults from a skipped test shard. |
| |
| Args: |
| api (RecipeApi): The api to use for accessing recipe modules from this |
| object. |
| shard (orchestration_inputs.SkippedShard): The test shard to generate results for. |
| |
| Returns:
| A FuchsiaTestResults object representing the skipped shard.
| """ |
| return cls( |
| from_fuchsia=shard.from_fuchsia, |
| results_dir="", |
| task_output_dir="", |
| swarming_bot_id="", |
| swarming_task_id="", |
| env_name=shard.env_name, |
| shard_name=shard.name, |
| api=api, |
| skipped=True, |
| summary_bytes=api.json.dumps(shard.summary), |
| is_multiplied=False, # A skipped shard is never multiplied. |
| resultdb_artifacts=[], |
| ) |
| |
| @cached_property |
| def summary(self): |
| """The parsed summary file as a Dict or {} if missing.""" |
| if not self._summary_bytes: |
| return {} |
| |
| try: |
| return self._api.json.loads(self._summary_bytes) |
| except ValueError as e: # pragma: no cover |
| # TODO(olivernewman): JSONDecodeError in python >=3.5 |
| raise self._api.step.StepFailure( |
| "Invalid %s: %s" % (TEST_SUMMARY_JSON, e.args[0]) |
| ) |
| |
| @property |
| def passed_tests(self): |
| """All entries in |self._outputs| for tests that passed.""" |
| self._separate_tests_by_result() |
| passed_tests = [] |
| for tests in self._pass_name_to_tests.values(): |
| passed_tests.extend(tests) |
| return passed_tests |
| |
| @property |
| def failed_tests(self): |
| """All entries in |self._outputs| for tests that failed.""" |
| self._separate_tests_by_result() |
| failed_tests = [] |
| for name, tests in self._fail_name_to_tests.items(): |
| if self._is_multiplied or name not in self._pass_name_to_tests: |
| failed_tests.extend(tests) |
| return failed_tests |
| |
| @property |
| def flaked_tests(self): |
| """All entries in |self._outputs| for tests that flaked.""" |
| self._separate_tests_by_result() |
| flaked_tests = [] |
| for name, tests in self._fail_name_to_tests.items(): |
| if not self._is_multiplied and name in self._pass_name_to_tests: |
| flaked_tests.extend(tests) |
| return flaked_tests |
| |
| @property |
| def snapshot_path(self): |
| """Returns the snapshot.zip path.""" |
| snapshot_zip = self.results_dir.join(self._api.testing_requests.SNAPSHOT_NAME) |
| if self._api.path.exists(snapshot_zip): |
| return snapshot_zip |
| return None # pragma: no cover |
| |
| @property |
| def gcs_namespace_path(self): |
| """Subpath to which test output files will be uploaded in GCS.""" |
| return "%s/%s" % (self._shard_name, self.swarming_task_id) |
| |
| def _separate_tests_by_result(self): |
| """Separates entries in |self._outputs| into maps based on test result. |
| |
| Passed tests will be stored in self._pass_name_to_tests and failed |
| tests will be stored in self._fail_name_to_tests. |
| """ |
| if not self._pass_name_to_tests or not self._fail_name_to_tests: |
| # Using OrderedDict() will keep the tests in the same order as they |
| # ran in. This doesn't really matter for displaying results, but it |
| # keeps the expectation files from changing by maintaining the same |
| # order of tests. |
| self._pass_name_to_tests = collections.OrderedDict() |
| self._fail_name_to_tests = collections.OrderedDict() |
| for test in self.summary.get("tests") or (): |
| if test["result"] == self._TEST_RESULT_PASS: |
| tests = self._pass_name_to_tests.get(test["name"], []) |
| tests.append(test) |
| self._pass_name_to_tests[test["name"]] = tests |
| else: |
| tests = self._fail_name_to_tests.get(test["name"], []) |
| tests.append(test) |
| self._fail_name_to_tests[test["name"]] = tests |
| |
| def present_tests(self, show_failures_in_red, show_passed):
| """Presents failed, flaked, and passed tests as steps in Milo.
|
| Args:
| show_failures_in_red (bool): Whether to give failed-test steps a
| red (failure) status.
| show_passed (bool): Whether to list the names of passed tests.
| """
|
| def show_failed_tests(tests): |
| for test in tests: |
| if test["result"] == self._TEST_RESULT_TIMEOUT: |
| status = "timed out" |
| elif test["result"] == self._TEST_RESULT_SKIP: |
| status = "skipped" |
| else: |
| status = "failed" |
| |
| step = self._api.step.empty("%s: %s" % (status, test["name"])) |
| for output_file in test["output_files"]: |
| # Failed test output files should have been read into |
| # self._outputs by Task.process_result(). |
| output = self._outputs[output_file] |
| log_name = self._api.path.basename(output_file) |
| if not output.strip(): |
| log_name += " (empty)" |
| step.presentation.logs[log_name] = output.splitlines() |
| |
| # If present, show the GN label to aid debugging. |
| label = test.get("gn_label") |
| if label: |
| step.presentation.step_summary_text = label |
| if show_failures_in_red: |
| step.presentation.status = self._api.step.FAILURE |
| |
| show_failed_tests(self.failed_tests) |
| show_failed_tests(self.flaked_tests) |
| |
| # There's recipe overhead that makes step creation slow, which we mitigate |
| # by cramming all the passed tests into a single step. We also skip |
| # presenting stdio since it's generally only useful if the test failed. |
| with self._api.step.nest("all passed tests") as passed_tests_step: |
| passed_tests_step.presentation.step_summary_text = pluralize( |
| "passed test", self.passed_tests |
| ) |
| if show_passed: |
| passed_tests_step.presentation.step_text = self._passed_tests_text( |
| self.passed_tests |
| ) |
| |
| def _passed_tests_text(self, tests): |
| """Format test names for presentation in Milo. |
| |
| If a single test ran many times in a row, we'll compress those runs |
| into a single line with a run count, to improve readability. |
| """ |
| test_names = [t["name"] for t in tests] |
| lines = [] |
| current_test_count = 0 |
| for i, test_name in enumerate(test_names): |
| current_test_count += 1 |
| # If we've reached the last test OR the next test has a different |
| # name, then we've reached the end of a repeated streak of running a
| # single test, so add a line of text for that streak. |
| if i + 1 >= len(test_names) or test_names[i + 1] != test_name: |
| # Surround test names in backticks so that Milo's markdown |
| # renderer doesn't try to parse them as markdown. |
| line = "`%s`" % test_name |
| if current_test_count > 1: |
| line += " (%d runs)" % current_test_count |
| lines.append(line) |
| current_test_count = 0 |
| |
| # Start with a newline to prevent the first test from showing up on |
| # the same line as the step name. |
| return "".join("\n" + l for l in lines) |
| |
| def get_output(self, output_path): |
| """Returns the contents of the file at output_path. |
| |
| The output_path should be a path relative to results_dir.
| """ |
| if output_path not in self._outputs: |
| self._outputs[output_path] = self._api.file.read_text( |
| "read %s" % output_path, |
| self.results_dir.join(output_path), |
| # We're returning the log contents to be attached to other |
| # steps, so no need to log them here. |
| include_log=False, |
| ) |
| return self._outputs[output_path] |
| |
| def upload_results( |
| self, |
| gcs_bucket, |
| upload_to_catapult, |
| orchestration_inputs, |
| resultdb_base_variant, |
| resultdb_tags, |
| ): |
| """Upload select test results (e.g., coverage data) to a given GCS bucket.""" |
| assert gcs_bucket |
| with self._api.context(infra_steps=True), self._api.step.nest( |
| "upload %s test results" % self._shard_name |
| ): |
| upload_dir = self.task_output_dir or self._api.path.mkdtemp( |
| self._shard_name |
| ) |
| if self.summary: |
| # Save the summary JSON to the test shard output dir so it gets |
| # uploaded to GCS for easy access by e.g. Dachsiaboard. |
| summary_path = upload_dir.join(TEST_SUMMARY_JSON) |
| assert not self._api.path.exists(summary_path), ( |
| "test output files should not be named %s" % TEST_SUMMARY_JSON |
| ) |
| self._api.file.write_json( |
| "write %s" % TEST_SUMMARY_JSON, summary_path, self.summary |
| ) |
| if self._api.resultdb.enabled and orchestration_inputs.resultdb: |
| self._upload_to_resultdb( |
| summary_path, |
| orchestration_inputs, |
| resultdb_base_variant, |
| resultdb_tags, |
| ) |
| snapshot_zip = self.results_dir.join( |
| self._api.testing_requests.SNAPSHOT_NAME |
| ) |
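| # mock_add_paths() only marks the path as existing in recipe simulation
| # tests; it has no effect on real builds.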
| self._api.path.mock_add_paths(snapshot_zip) |
| if self._api.path.exists(snapshot_zip): |
| self._api.file.move( |
| "move snapshot to output dir", |
| snapshot_zip, |
| upload_dir.join(self._api.testing_requests.SNAPSHOT_NAME), |
| ) |
| |
| # If an output was important enough to present (meaning it's in _outputs), |
| # we should upload it so it can be shown in Sponge, etc. |
| with self._api.step.nest("move test outputs"): |
| for output in sorted(self._outputs): |
| dest = upload_dir.join(output) |
| self._api.file.ensure_directory( |
| "mkdir", self._api.path.dirname(dest) |
| ) |
| self._api.file.move( |
| "mv", |
| self.results_dir.join(output), |
| dest, |
| ) |
| self._upload_outputs(gcs_bucket, upload_dir) |
| |
| if upload_to_catapult and self.results_dir: |
| self._api.catapult.upload_test_outputs(self.results_dir) |
| |
| def _upload_to_resultdb( |
| self, summary_path, orchestration_inputs, base_variant, tags |
| ): |
| cmd = [orchestration_inputs.resultdb] |
| base_variant = base_variant.copy() |
| base_variant.update( |
| { |
| "bucket": self._api.buildbucket.build.builder.bucket, |
| "builder": self._api.buildbucket.build.builder.builder, |
| } |
| ) |
| if self.results_dir: |
| cmd.append("--output=%s" % self.results_dir) |
| cmd.append("--summary=%s" % summary_path) |
| for tag in tags: |
| cmd.append("--tag=%s" % tag) |
| for artifact in self._resultdb_artifacts: |
| cmd.append("--invocation-artifact=%s" % artifact) |
| try: |
| self._api.step( |
| "resultdb", |
| self._api.resultdb.wrap(cmd, base_variant=base_variant, include=True), |
| ) |
| except self._api.step.InfraFailure: # pragma: no cover # pylint: disable=try-except-raise |
| # If ResultDB has reliability issues, change the following statement
| # from 'raise' to 'pass' so the step stays an infra failure but the
| # build is not affected:
| raise |
| |
| def _upload_outputs(self, gcs_bucket, upload_dir): |
| self._api.gsutil.upload_namespaced_directory( |
| source=upload_dir, |
| bucket=gcs_bucket, |
| # Namespace to avoid collision across shards and attempts. |
| subpath=self.gcs_namespace_path, |
| # Internal gsutil retries of transient failures can cause rsync to |
| # fail when run by a service account that doesn't have permission |
| # to delete objects. |
| rsync=False, |
| ) |
| |
| def run_triage(self, triage_tool, triage_sources):
| """Runs the triage tool over the extracted snapshot, if present.
|
| Args:
| triage_tool (Path): Path to the triage tool.
| triage_sources (list(Path)): Triage config files, passed via --config.
|
| Returns:
| The triage step, or None if no snapshot was found.
| """
| snapshot_zip = self.results_dir.join(self._api.testing_requests.SNAPSHOT_NAME) |
| self._api.path.mock_add_paths(snapshot_zip) |
| if not self._api.path.exists(snapshot_zip): # pragma: no cover |
| return |
| with self._api.step.nest("run triage"): |
| snapshot_dir = self._api.path.mkdtemp("snapshot") |
| self._api.tar.extract( |
| "extract snapshot", path=snapshot_zip, directory=snapshot_dir |
| ) |
| cmd = ( |
| [triage_tool] |
| + ["--config=%s" % f for f in triage_sources] |
| + ["--data", snapshot_dir] |
| ) |
| output_file = "triage_output" |
| self._resultdb_artifacts.append(output_file) |
| return self._api.step( |
| "triage", |
| cmd, |
| # The triage tool returns a non-zero exit code when it detects |
| # any violations or errors. We don't want to fail in this case, |
| # but just write the output to a file for analysis. |
| ok_ret="any", |
| stdout=self._api.raw_io.output_text( |
| # Write to the results_dir so it gets uploaded to ResultDB. |
| leak_to=self.results_dir.join(output_file) |
| ), |
| step_test_data=lambda: self._api.raw_io.test_api.stream_output_text( |
| "triage info" |
| ), |
| ) |
| |
| |
| class Task(swarming_retry_api.TriggeredTask): |
| """Task processes and presents results of testing Swarming tasks.""" |
| |
| def __init__( |
| self, |
| request, |
| api, |
| orchestration_inputs, |
| debug_symbol_url, |
| retry_task_on_test_failure=False, |
| **kwargs |
| ): |
| super().__init__(request, api, **kwargs) |
| self._orchestration_inputs = orchestration_inputs |
| self._debug_symbol_url = debug_symbol_url |
| self._retry_task_on_test_failure = retry_task_on_test_failure |
| # Test shards with the 'multiplied:' prefix come from |
| # tools/integration/testsharder/shard.go in fuchsia.git. They were |
| # specifically created to run a test or set of tests many times to look |
| # for flakes. It doesn't make sense to retry these when they fail; the
| # goal is to see whether they fail, not to get them to pass.
| if self.name.startswith("multiplied:"): |
| self.max_attempts = 1 |
| self._is_multiplied = True |
| else: |
| self._is_multiplied = False |
| # Abort early on test failures that are likely caused by the change under
| # test, to decrease true rejection time.
| self.abort_early_if_failed = ( |
| self.name.startswith("affected:") or self._is_multiplied |
| ) |
| |
| def launch(self, priority_boost_amount): |
| """Launches a swarming task attempt. |
| |
| It also initializes the test_results field on the returned |
| swarming_retry.Attempt to None so that the field will be set even if |
| the attempt is not completed or the result processing fails. |
| """ |
| attempt = super().launch(priority_boost_amount) |
| attempt.test_results = None |
| return attempt |
| |
| def process_result(self, attempt): |
| """Processes the results produced by a test shard.""" |
| assert attempt.result |
| result = attempt.result |
| |
| if result.cas_outputs: |
| attempt.task_outputs_link = result.cas_outputs.url |
| |
| @attr.s |
| class _LogToProcess(object): |
| name = attr.ib(type=str) |
| data = attr.ib(type=str) |
| path = attr.ib(type=Path) |
| |
| swarming_summary_path = self._api.path.mkstemp(result.id) |
| self._api.file.write_raw( |
| "write swarming summary JSON", |
| swarming_summary_path, |
| self._api.json.dumps(result.raw), |
| ) |
| |
| tefmocheck_cmd = [ |
| self._orchestration_inputs.tefmocheck, |
| "-swarming-summary-json", |
| swarming_summary_path, |
| "-swarming-host", |
| attempt.host, |
| ] |
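|
| # With every input available, the final command resembles (illustrative
| # paths):
| #   tefmocheck -swarming-summary-json <path> -swarming-host <host>
| #     -swarming-output <path> -syslog <path> -serial-log <path>
| #     -outputs-dir <dir> -test-summary-json <path> -json-output <path>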
| |
| to_process = [ |
| _LogToProcess( |
| name=self._api.testing_requests.TEST_TASK_OUTPUT_FILE, |
| path=result.output_dir.join( |
| self._api.testing_requests.TEST_TASK_OUTPUT_FILE |
| ), |
| data=result.output, |
| ) |
| ] |
| |
| # Flag names from |
| # https://fuchsia.googlesource.com/fuchsia/+/main/tools/testing/tefmocheck/cmd/main.go |
| logs_to_tefmo_flags = { |
| self._api.testing_requests.TEST_TASK_OUTPUT_FILE: "-swarming-output", |
| } |
| |
| for log_dir, flag_name in ( |
| (self._api.testing_requests.SYSLOG_DIR, "-syslog"), |
| (self._api.testing_requests.SERIAL_LOG_DIR, "-serial-log"), |
| ): |
| for log_name, log_path in result.outputs.items(): |
| if not log_name.startswith(log_dir): |
| continue |
| self._api.path.mock_add_paths(log_path) |
| log_basename = self._api.path.basename(log_path) |
| logs_to_tefmo_flags[log_basename] = flag_name |
| to_process.append( |
| _LogToProcess( |
| name=log_basename, |
| path=log_path, |
| data=self._api.file.read_text( |
| "read %s" % log_basename, |
| log_path, |
| test_data="extra log contents", |
| ), |
| ) |
| ) |
| |
| for log in to_process: |
| tefmocheck_flag_name = logs_to_tefmo_flags.get(log.name) |
| if tefmocheck_flag_name: |
| tefmocheck_cmd.extend((tefmocheck_flag_name, log.path)) |
| # Symbolize and overwrite so that tefmocheck and logdog see the |
| # symbolized versions. |
| if self._api.testing.task_targets_fuchsia(result): |
| # Non-Fuchsia should already be symbolized, and attempting to use |
| # the symbolizer may fail, if e.g. it was built on Mac and this is |
| # running on Linux. |
| attempt.logs[log.name] = self._api.symbolize( |
| symbolizer_tool=self._orchestration_inputs.symbolizer_tool, |
| data=log.data, |
| name="symbolize %s" % log.name, |
| debug_symbol_url=self._debug_symbol_url, |
| symbolizer_output=log.path, |
| ) |
| else: |
| attempt.logs[log.name] = log.data |
| if not self._api.path.exists(log.path): |
| # Ensure it exists on file system even if we didn't symbolize |
| # so that it gets uploaded to GCS later. |
| self._api.file.write_raw("write %s" % log.name, log.path, log.data) |
| self._api.path.mock_add_paths(log.path) |
| |
| test_results_dir = self._api.testing.extract_test_results( |
| step_name="extract results", task_result=result |
| ) |
| |
| # Copy syslogs, serial logs and infra_and_test_std_and_klog.txt to test_results_dir. |
| # We look for artifacts from test_results_dir to upload to ResultDB. |
| for log in to_process: |
| self._api.file.copy( |
| "copy %s" % log.name, log.path, test_results_dir.join(log.name) |
| ) |
| |
| tefmocheck_cmd.extend(("-outputs-dir", test_results_dir)) |
| summary_json_path = test_results_dir.join(TEST_SUMMARY_JSON) |
| |
| if self._api.path.exists(summary_json_path): |
| tefmocheck_cmd.extend(("-test-summary-json", summary_json_path)) |
| |
| tefmocheck_cmd.extend( |
| ("-json-output", self._api.raw_io.output_text(suffix="json")) |
| ) |
| summary_bytes = self._api.step("tefmocheck", tefmocheck_cmd).raw_io.output_text |
| attempt.logs[TEST_SUMMARY_JSON] = summary_bytes |
| |
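| # Swarming tags are "key:value" strings; split each on the first colon so
| # values may themselves contain colons (e.g., illustratively,
| # "test_environment_name:emu-x64").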
| tags = {} |
| for tag in self._request.tags: |
| k, v = tag.split(":", 1) |
| tags[k] = v |
| |
| test_results = FuchsiaTestResults( |
| from_fuchsia=self._api.testing.task_targets_fuchsia(result), |
| results_dir=test_results_dir, |
| swarming_task_id=result.id, |
| swarming_bot_id=result.bot_id, |
| shard_name=result.name, |
| env_name=tags[self._api.testing_requests.TEST_ENVIRONMENT_TAG_NAME], |
| api=self._api, |
| task_output_dir=result.output_dir, |
| summary_bytes=summary_bytes, |
| is_multiplied=self._is_multiplied, |
| resultdb_artifacts=[log.name for log in to_process], |
| ) |
| attempt.test_results = test_results |
| flaked_tests = test_results.flaked_tests |
| failed_tests = test_results.failed_tests |
| if not result.success: |
| if result.state == self._api.swarming.TaskState.COMPLETED: |
| # The task will have state COMPLETED even if it had a non-zero |
| # exit code, so show a more helpful message than "completed". |
| attempt.failure_reason = "task failed" |
| else: |
| # "BOT_DIED" -> "bot died" |
| attempt.failure_reason = result.state.name.lower().replace("_", " ") |
| else: |
| if flaked_tests: |
| attempt.has_flakes = True |
| if failed_tests: |
| attempt.failure_reason = "%s failed" % pluralize("test", failed_tests) |
| # Only retry if all the failures are testing_failure_mode checks. |
| # Sometimes, a problem can occur in the system that causes all tests |
| # to fail after that point, in which case we should also retry. |
| # The number of failed tests to check for in this case should match |
| # that of the mass_test_failure_check here: |
| # https://fuchsia.googlesource.com/fuchsia/+/e2f3065a52c5bd69090d0c2dcd8cbc76a75d601c/tools/testing/tefmocheck/cmd/main.go#125 |
| should_retry = ( |
| all(t.get("is_testing_failure_mode") for t in failed_tests) |
| or len(failed_tests) > 5 |
| ) |
| if not self._retry_task_on_test_failure and not should_retry: |
| self.max_attempts = 1 |
| # Log the contents of each output file mentioned in the summary. |
| # Note this assumes the outputs are all valid UTF-8 (See fxb/9500). |
| for output_name, output_path in test_results.summary.get("outputs", {}).items(): |
| attempt.logs[output_name] = test_results.get_output(output_path) |
| for tests in (failed_tests, flaked_tests):
| for test in tests:
| for output_file in test["output_files"]: |
| # Proactively read the log so that we can use an empty step |
| # that doesn't have "execution details" or "stdout" logs to |
| # present the failed test more clearly. |
| test_results.get_output(output_file) |
| |
| triage_step = test_results.run_triage( |
| self._orchestration_inputs.triage, |
| self._orchestration_inputs.triage_sources, |
| ) |
| if triage_step: |
| attempt.logs["triage_output"] = triage_step.stdout.splitlines() |
| |
| def present_attempt(self, _, attempt, category=None): |
| """Present an Attempt when summarizing results at the end of the run. |
| |
| Args: |
| attempt (Attempt): the Attempt to present |
| category (str): the group of tasks ('passes', 'failures', or |
| 'flakes') that this attempt should be presented under |
| """ |
| show_failures_in_red = True |
| # The 'passes' category includes all attempts of all tasks that |
| # eventually passed, so it includes some failures. Show those in |
| # green so people don't get confused and think the overall task |
| # failed. |
| # TODO(fxb/36647) after this bug is fixed show these steps in |
| # red, but show parent steps of those in green. |
| if category == "passes" or category == "incomplete": |
| show_failures_in_red = False |
| |
| attempt_status = "fail" |
| if attempt.success: |
| if attempt.has_flakes: |
| attempt_status = "flake" |
| else: |
| attempt_status = "pass" |
| name = "%s (%s)" % (attempt.name, attempt_status) |
| with self._api.step.nest(name) as presentation: |
| if show_failures_in_red and (not attempt.success or attempt.has_flakes): |
| presentation.status = self._api.step.FAILURE |
| |
| if attempt.result and attempt.result.duration_secs: |
| presentation.step_text = nice_duration(attempt.result.duration_secs) |
| |
| presentation.links["swarming task"] = attempt.task_ui_link |
| if attempt.task_outputs_link: |
| presentation.links["task outputs"] = attempt.task_outputs_link |
| if attempt.bot_ui_link: |
| presentation.links["bot %s" % attempt.bot_id] = attempt.bot_ui_link |
| |
| if attempt.test_results and attempt.test_results.snapshot_path: |
| snapshot_gcs_url = self._api.gsutil.http_url( |
| # TODO(olivernewman): Plumb through `artifact_gcs_bucket` |
| # instead of assuming the snapshot is uploaded to the same |
| # bucket as `api.artifacts.gcs_bucket`. |
| bucket=self._api.artifacts.gcs_bucket, |
| dest=self._api.gsutil.namespaced_gcs_path( |
| "%s/%s" |
| % ( |
| attempt.test_results.gcs_namespace_path, |
| self._api.testing_requests.SNAPSHOT_NAME, |
| ), |
| ), |
| ) |
| presentation.links["view snapshot in fsv"] = ( |
| "http://go/fsv?from=%s" % snapshot_gcs_url |
| ) |
| |
| if attempt.failure_reason: |
| presentation.step_summary_text = attempt.failure_reason |
| |
| for log, data in sorted(attempt.logs.items()): |
| presentation.logs[log] = data |
| |
| if attempt.test_results: |
| test_results = attempt.test_results |
| |
| # Log the contents of each output file mentioned in the summary. |
| # Note this assumes the outputs are all valid UTF-8 (See fxb/9500). |
| for name, path in test_results.summary.get("outputs", {}).items(): |
| presentation.logs[name] = test_results.get_output(path).split("\n") |
| |
| test_results.present_tests( |
| show_failures_in_red=show_failures_in_red, show_passed=True |
| ) |
| |
| |
| class FuchsiaTestApi(recipe_api.RecipeApi): |
| """API for running tests and processing test results.""" |
| |
| FuchsiaTestResults = FuchsiaTestResults |
| |
| TEST_SUMMARY_JSON = TEST_SUMMARY_JSON |
| |
| def task_targets_fuchsia(self, task_result): |
| # Sharded tasks will have the OS environment in the task name.
| # Deprecated tasks do not have the OS in the name, but they always
| # target Fuchsia, so this function returns True for them.
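| # For example (illustrative task names), "fuchsia-emu shard 1" targets
| # Fuchsia, while "linux-x64 host tests" does not.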
| name = task_result.name.lower() |
| return "linux" not in name and "mac" not in name |
| |
| def extract_test_results(self, step_name, task_result): |
| """Extracts test results from task_result. |
| |
| Moves test results from the task_result's outputs into a new |
| directory. We do this because we later want to be able to archive |
| everything in the swarming task result's outputs, but we don't want |
| to archive most of the test result files. |
| |
| Args: |
| step_name (str): The name of the step. |
| task_result (api.swarming.TaskResult): The task result from which to extract |
| the test results. |
| |
| Returns: |
| The Path to the directory into which test results were extracted. |
| """ |
| # Use a directory that doesn't yet exist so that `api.file.move()` |
| # moves the source directory to the location of this directory, instead |
| # of *into* this directory. |
| test_results_dir = self.m.path.mkdtemp(task_result.id).join("test_results") |
| |
| # The outputs should be in TEST_RESULTS_DIR_NAME. |
| test_results_path = None |
| for relative_path in sorted(task_result.outputs.keys()): |
| if relative_path.startswith( |
| self.m.testing_requests.TEST_RESULTS_DIR_NAME + "/" |
| ): |
| test_results_path = str( |
| task_result.output_dir.join( |
| self.m.testing_requests.TEST_RESULTS_DIR_NAME |
| ) |
| ) |
| break |
| |
| # If the swarming task failed, there may not be any test results. |
| if test_results_path is None and not task_result.success: |
| self.m.file.ensure_directory("create test results dir", test_results_dir) |
| return test_results_dir |
| |
| assert ( |
| test_results_path |
| ), "test results not found amongst outputs of task %s: %s" % ( |
| task_result.name, |
| task_result.outputs, |
| ) |
| self.m.file.move(step_name, source=test_results_path, dest=test_results_dir) |
| |
| # This is only needed for the recipe tests. file.listdir() doesn't mock the |
| # existence of the paths it returns, so we must add it separately. |
| # We add summary_path because we check for its existence in
| # Task.process_result().
| summary_path = test_results_dir.join(TEST_SUMMARY_JSON) |
| outputs = self.m.file.listdir("get extracted files", test_results_dir) |
| if summary_path in outputs: |
| self.m.path.mock_add_paths(summary_path) |
| return test_results_dir |
| |
| def run_test_tasks( |
| self, |
| debug_symbol_url, |
| max_attempts, |
| orchestration_inputs, |
| rerun_budget_secs=None, |
| runs_per_shard=1, |
| retry_task_on_test_failure=False, |
| ): |
| """Tests a Fuchsia build by sharding. |
| |
| Expects the build and artifacts to be at the same place they were at |
| the end of the build. |
| |
| Args: |
| debug_symbol_url (str): A GCS URL hosting debug symbols. |
| max_attempts (int): Maximum number of attempts before marking a |
| shard as failed. |
| orchestration_inputs (TestOrchestrationInputs): Build artifacts |
| needed for testing. |
| rerun_budget_secs (int): Will run tests repeatedly until this
| budget is consumed. If set, max_attempts is ignored.
| runs_per_shard (int): Number of times to run each shard. |
| retry_task_on_test_failure (bool): Retry tasks on all test |
| failures. If false, retry whole tasks on tefmocheck failures |
| but not on regular test failures. |
| |
| Returns: |
| A list of swarming_retry.Tasks representing the completed test |
| tasks that were not subject to an infra failure. |
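|
| Example (illustrative):
| tasks = api.testing.run_test_tasks(
| debug_symbol_url=debug_symbol_url,
| max_attempts=2,
| orchestration_inputs=orchestration_inputs,
| )
| api.testing.raise_failures(tasks)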
| """ |
| |
| # If no shards have been provided, then we have successfully run the empty |
| # set of tests. |
| if not orchestration_inputs.task_requests: |
| self.m.step.empty("no tests to run") |
| return [] |
| |
| launch_deadline_time = None |
| if rerun_budget_secs: |
| # If we have a rerun deadline, we want to run the tests as many times as |
| # possible within that time regardless of whether the tasks pass or fail, |
| # so we don't care about setting a max number of attempts. |
| max_attempts = float("inf") |
| launch_deadline_time = self.m.time.time() + rerun_budget_secs |
| |
| tasks = [] |
| for task_request in orchestration_inputs.task_requests: |
| if rerun_budget_secs: |
| # Rerun tasks should not block other tasks. |
| task_request = task_request.with_priority(task_request.priority + 1) |
| tasks.append( |
| Task( |
| request=task_request, |
| api=self.m, |
| orchestration_inputs=orchestration_inputs, |
| debug_symbol_url=debug_symbol_url, |
| retry_task_on_test_failure=retry_task_on_test_failure, |
| ) |
| ) |
| |
| tasks = self.m.swarming_retry.run_tasks( |
| tasks=tasks, |
| collect_output_dir=self.m.path.mkdtemp("swarming"), |
| max_attempts=max_attempts, |
| launch_deadline_time=launch_deadline_time, |
| run_count=runs_per_shard, |
| ) |
| |
| self.m.swarming_retry.present_tasks(tasks=tasks) |
| |
| return tasks |
| |
| def raise_failures(self, tasks):
| """Raises a StepFailure describing any failed tasks or tests."""
| failed_tasks = [] |
| failed_tests = [] |
| failed_tests_no_tefmo = [] |
| for task in tasks: |
| if task.failed: |
| failed_tasks.append(task.name) |
| # Report the test failures from the last failed attempt. |
| attempt = task.get_failed_attempts()[-1] |
| if attempt.test_results: |
| failed_tests.extend( |
| t["name"] for t in attempt.test_results.failed_tests |
| ) |
| failed_tests_no_tefmo.extend( |
| t["name"] |
| for t in attempt.test_results.failed_tests |
| if not t.get("is_testing_failure_mode") |
| ) |
| |
| # Expose these so tools can be used to re-run these failed tests. |
| with self.m.step.nest("record failed_test_names") as presentation: |
| presentation.properties["failed_test_names"] = sorted( |
| set(failed_tests_no_tefmo) |
| ) |
| |
| if not failed_tests: |
| if failed_tasks: |
| raise self.m.step.StepFailure( |
| "failed to process test results for %s: %s" |
| % ( |
| pluralize("task", failed_tasks), |
| ", ".join(sorted(failed_tasks)), |
| ) |
| ) |
| return |
| |
| def _test_description(test, failure_count): |
| count_info = "" |
| if failure_count > 1: |
| count_info = " (%d failures)" % failure_count |
| return "%s%s" % (test, count_info) |
| |
| failure_counts = collections.Counter(failed_tests) |
| header = "%s failed:" % pluralize("test", failure_counts) |
| if len(failure_counts) == 1: |
| test, failure_count = failure_counts.popitem() |
| raise self.m.step.StepFailure( |
| "%s %s" % (header, _test_description(test, failure_count)) |
| ) |
| |
| error_lines = [ |
| header, |
| # Milo's markdown renderer requires a blank line before the |
| # start of a bulleted list. Otherwise the list lines will all |
| # be rendered on one hyphen-separated line. |
| "", |
| ] |
| |
| # Present up to 10 tests and show a count of the rest. But if there are |
| # 11-15 failed tests, still show all of the tests because it's not |
| # worth saving lines by hiding only a couple tests. |
| max_tests_to_present = 15 |
| min_tests_to_hide = 5 |
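| # e.g. with 15 distinct failed tests all 15 are listed; with 16, only
| # 10 are listed followed by "(6 more)".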
| if len(failure_counts) > max_tests_to_present: |
| tests_to_present = max_tests_to_present - min_tests_to_hide |
| else: |
| tests_to_present = len(failure_counts) |
| |
| counts_to_present = sorted(failure_counts.items())[:tests_to_present] |
| for test, failure_count in counts_to_present: |
| error_lines.append("- %s" % _test_description(test, failure_count)) |
| |
| hidden_tests = len(failure_counts) - tests_to_present |
| if hidden_tests > 0: |
| error_lines.append("- (%d more)" % hidden_tests) |
| |
| raise self.m.step.StepFailure("\n".join(error_lines)) |
| |
| def test_results_from_skipped_shards(self, shards): |
| """Converts the given set of skipped shards to a set of test results. |
| |
| Args: |
| shards (list(orchestration_inputs.SkippedShard)): The list of skipped shards. |
| |
| Returns: |
| A list of FuchsiaTestResults. |
| """ |
| return [ |
| FuchsiaTestResults.from_skipped_shard(self.m, shard) for shard in shards |
| ] |