blob: 82d986581846431b2b8af17e6306233d955c2d1e [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import attr
import collections
from recipe_engine import recipe_api
from recipe_engine.config_types import Path
from RECIPE_MODULES.fuchsia.swarming_retry import api as swarming_retry_api
from RECIPE_MODULES.fuchsia.testing_requests import api as testing_requests_api
from RECIPE_MODULES.fuchsia.utils import cached_property, nice_duration, pluralize
# Name of the file produced by a testing task that contains results and outputs
# of the tests that were run.
TEST_SUMMARY_JSON = "summary.json"
# What to name the file that contains the swarming task output.
TEST_TASK_OUTPUT_FILE = "infra_and_test_std_and_klog.txt"
# Flag names from
# https://fuchsia.googlesource.com/fuchsia/+/master/tools/testing/tefmocheck/cmd/main.go
LOG_NAME_TO_TEFMOCHECK_FLAG = {
TEST_TASK_OUTPUT_FILE: "-swarming-output",
testing_requests_api.SERIAL_LOG_NAME: "-serial-log",
testing_requests_api.SYSLOG_NAME: "-syslog",
}
@attr.s
class FuchsiaTestResults(object):
"""Represents the result of testing of a Fuchsia build.
Attributes:
from_fuchsia (bool): Whether the tests ran on Fuchsia.
results_dir (Path): The directory that the test results have been moved
into (out of task_output_dir).
task_output_dir (Path): A directory containing the outputs of the swarming
task that ran these tests. Anything that's in this directory will be
uploaded to GCS when upload_results() is called.
swarming_task_id (str): The ID of the task that ran these tests.
env_name (str): The name of the environment that these tags ran in.
shard_name (str): The name of the task that ran these tests.
api (RecipeApi): The api to use for accessing recipe modules from this
object.
summary_bytes (bytes): The contents of TEST_SUMMARY_JSON.
is_multiplied (bool): Whether the test results are for a multiplier
shard.
"""
from_fuchsia = attr.ib(type=bool)
results_dir = attr.ib(type=Path)
task_output_dir = attr.ib(type=Path)
swarming_task_id = attr.ib(type=str)
env_name = attr.ib(type=str)
_shard_name = attr.ib(type=str)
_api = attr.ib(type=recipe_api.RecipeApi)
_summary_bytes = attr.ib(type=bytes)
_is_multiplied = attr.ib(type=bool)
# A mapping of relative paths to files in the results_dir containing
# stdout+stderr data to strings containing those contents.
_outputs = attr.ib(factory=dict, init=False)
# Set lazily by `_separate_tests_by_result()`, not a parameter to __init__.
_pass_name_to_tests = attr.ib(factory=dict, init=False)
_fail_name_to_tests = attr.ib(factory=dict, init=False)
# Constants representing the result of running a test. These enumerate the
# values of the 'results' field of the entries in the summary.json file
# obtained from the target device.
_TEST_RESULT_PASS = "PASS"
_TEST_RESULT_FAIL = "FAIL"
@cached_property
def summary(self):
"""The parsed summary file as a Dict or {} if missing."""
if not self._summary_bytes:
return {}
try:
return self._api.json.loads(self._summary_bytes)
except ValueError as e: # pragma: no cover
# TODO(olivernewman): JSONDecodeError in python >=3.5
raise self._api.step.StepFailure(
"Invalid %s: %s" % (TEST_SUMMARY_JSON, e.args[0])
)
@property
def passed_tests(self):
"""All entries in |self._outputs| for tests that passed."""
self._separate_tests_by_result()
passed_tests = []
for tests in self._pass_name_to_tests.itervalues():
passed_tests.extend(tests)
return passed_tests
@property
def failed_tests(self):
"""All entries in |self._outputs| for tests that failed."""
self._separate_tests_by_result()
failed_tests = []
for name, tests in self._fail_name_to_tests.iteritems():
if self._is_multiplied or name not in self._pass_name_to_tests:
failed_tests.extend(tests)
return failed_tests
@property
def flaked_tests(self):
"""All entries in |self._outputs| for tests that flaked."""
self._separate_tests_by_result()
flaked_tests = []
for name, tests in self._fail_name_to_tests.iteritems():
if not self._is_multiplied and name in self._pass_name_to_tests:
flaked_tests.extend(tests)
return flaked_tests
def _separate_tests_by_result(self):
"""Separates entries in |self._outputs| into maps based on test result.
Passed tests will be stored in self._pass_name_to_tests and failed
tests will be stored in self._fail_name_to_tests.
"""
if not self._pass_name_to_tests or not self._fail_name_to_tests:
# Using OrderedDict() will keep the tests in the same order as they
# ran in. This doesn't really matter for displaying results, but it
# keeps the expectation files from changing by maintaining the same
# order of tests.
self._pass_name_to_tests = collections.OrderedDict()
self._fail_name_to_tests = collections.OrderedDict()
for test in self.summary.get("tests") or ():
if test["result"] == self._TEST_RESULT_PASS:
tests = self._pass_name_to_tests.get(test["name"], [])
tests.append(test)
self._pass_name_to_tests[test["name"]] = tests
else:
tests = self._fail_name_to_tests.get(test["name"], [])
tests.append(test)
self._fail_name_to_tests[test["name"]] = tests
def present_tests(self, show_failures_in_red, show_passed):
def show_tests(tests, status):
for test in tests:
name = "%s: %s" % (status, test["name"])
stdio_file = test["output_file"]
if stdio_file and stdio_file not in self._outputs:
self._outputs[stdio_file] = self._api.file.read_text(
name, self.results_dir.join(stdio_file), test_data="output"
)
step = self._api.step.active_result
# file.read_text creates a step that displays a log of the file contents
# with the name as the basename of the file read. We want to remove this
# log and rename it to `stdio`.
step.presentation.logs.pop(self._api.path.basename(stdio_file))
else:
step = self._api.step(name, None)
if stdio_file:
step.presentation.logs["stdio"] = self._outputs[
stdio_file
].splitlines()
if show_failures_in_red:
step.presentation.status = self._api.step.FAILURE
show_tests(self.failed_tests, "failed")
show_tests(self.flaked_tests, "failed")
# There's recipe overhead that makes step creation slow, which we mitigate
# by cramming all the passed tests into a single step. We also skip
# presenting stdio since it's generally only useful if the test failed.
with self._api.step.nest("all passed tests") as passed_tests_step:
passed_tests_step.presentation.step_summary_text = pluralize(
"passed test", self.passed_tests
)
if show_passed:
# Start with a newline to prevent the first test from showing up on
# the same line as the step name.
passed_tests_step.presentation.step_text = "".join(
"\n" + test["name"] for test in self.passed_tests
)
def get_output(self, output_path):
"""Returns the contents of the file at output_path.
The output_path should be a relative path to the results_dir.
"""
if output_path not in self._outputs:
self._outputs[output_path] = self._api.file.read_text(
"read %s" % output_path,
self.results_dir.join(output_path),
test_data="output",
)
return self._outputs[output_path]
def upload_results(
self,
gcs_bucket,
upload_to_catapult,
orchestration_inputs,
resultdb_base_variant,
):
"""Upload select test results (e.g., coverage data) to a given GCS bucket."""
assert gcs_bucket
with self._api.step.nest("upload %s test results" % self._shard_name):
if self.summary:
# Save the summary JSON to the test shard output dir so it gets
# uploaded to GCS for easy access by e.g. Dachsiaboard.
summary_path = self.task_output_dir.join(TEST_SUMMARY_JSON)
assert not self._api.path.exists(summary_path), (
"test output files should not be named %s" % TEST_SUMMARY_JSON
)
self._api.file.write_json(
"write %s" % TEST_SUMMARY_JSON, summary_path, self.summary
)
self._api.upload.test_results_to_resultdb(
resultdb_bin_path=orchestration_inputs.resultdb,
outputs_dir=self.results_dir,
summary_filepaths=[summary_path],
base_variant=resultdb_base_variant,
)
snapshot_zip = self.results_dir.join(testing_requests_api.SNAPSHOT_NAME)
self._api.path.mock_add_paths(snapshot_zip)
if self._api.path.exists(snapshot_zip):
self._api.file.move(
"move snapshot to output dir",
snapshot_zip,
self.task_output_dir.join(testing_requests_api.SNAPSHOT_NAME),
)
# If an output was important enough to present (meaning it's in _outputs),
# we should upload it so it can be shown in Sponge, etc.
with self._api.step.nest("move test outputs"):
for output in self._outputs:
dest = self.task_output_dir.join(output)
self._api.file.ensure_directory(
"mkdir", self._api.path.dirname(dest)
)
self._api.file.move(
"mv", self.results_dir.join(output), dest,
)
self._upload_outputs(gcs_bucket)
if upload_to_catapult:
self._api.upload.test_outputs_to_catapult(self.results_dir)
def _upload_outputs(self, gcs_bucket):
self._api.upload.directory_to_gcs(
source=self.task_output_dir,
bucket=gcs_bucket,
# Namespace to avoid collision across shards and attempts.
subpath="%s/%s" % (self._shard_name, self.swarming_task_id),
# Internal gsutil retries of transient failures can cause rsync to
# fail when run by a service account that doesn't have permission
# to delete objects.
rsync=False,
)
def run_triage(self, triage_tool, triage_sources):
snapshot_zip = self.results_dir.join(testing_requests_api.SNAPSHOT_NAME)
self._api.path.mock_add_paths(snapshot_zip)
if not self._api.path.exists(snapshot_zip): # pragma: no cover
return
with self._api.step.nest(
"run triage for %s" % self._shard_name
) as presentation:
snapshot_dir = self._api.path.mkdtemp("snapshot")
self._api.tar.extract(
"extract snapshot", path=snapshot_zip, directory=snapshot_dir
)
cmd = (
[triage_tool,]
+ ["--config=%s" % f for f in triage_sources]
+ ["--data", snapshot_dir]
)
triage_output = self._api.path.mkstemp()
triage_step = self._api.step(
"triage",
cmd,
# The triage tool returns a non-zero exit code when it detects
# any violations or errors. We don't want to fail in this case,
# but just write the output to a file for analysis.
ok_ret="any",
stdout=self._api.raw_io.output(leak_to=triage_output),
)
presentation.logs["triage_output"] = (
triage_step.stdout.splitlines() if triage_step.stdout else []
)
self._api.file.move(
"move triage output file to output dir",
triage_output,
self.task_output_dir.join("triage_output"),
)
class Task(swarming_retry_api.TriggeredTask):
"""Task processes and presents results of testing Swarming tasks."""
def __init__(
self,
request,
api,
symbolize_tool,
llvm_symbolizer,
debug_symbol_url,
tefmocheck,
minfs=None,
retry_task_on_test_failure=False,
**kwargs
):
super(Task, self).__init__(request, api, **kwargs)
self._symbolize_tool = symbolize_tool
self._llvm_symbolizer = llvm_symbolizer
self._debug_symbol_url = debug_symbol_url
self._tefmocheck = tefmocheck
self._minfs = minfs
self._retry_task_on_test_failure = retry_task_on_test_failure
# Test shards with the 'multiplied:' prefix come from
# tools/integration/testsharder/shard.go in fuchsia.git. They were
# specifically created to run a test or set of tests many times to look
# for flakes. It doesn't make sense to retry these when they fail--the
# goal is to see if they fail not to get them to pass.
if self.name.startswith("multiplied:"):
self.max_attempts = 1
self._is_multiplied = True
else:
self._is_multiplied = False
# Test shards with the 'affected:' prefix run affected tests.
# If one of these tasks fail, we should abort early without waiting for
# the rest of the tasks to finish.
self.abort_early_if_failed = self.name.startswith("affected:")
def launch(self, priority_boost_amount):
"""Launches a swarming task attempt.
It also initializes the test_results field on the returned
swarming_retry.Attempt to None so that the field will be set even if
the attempt is not completed or the result processing fails.
"""
attempt = super(Task, self).launch(priority_boost_amount)
attempt.test_results = None
return attempt
def process_result(self, attempt):
"""Processes the results produced by a test shard."""
assert attempt.result
result = attempt.result
if result.isolated_outputs:
attempt.task_outputs_link = result.isolated_outputs.url
@attr.s
class _LogToProcess(object):
name = attr.ib(type=str)
data = attr.ib(type=str)
path = attr.ib(type=Path)
swarming_summary_path = self._api.path.mkstemp()
self._api.file.write_raw(
"write swarming summary JSON",
swarming_summary_path,
self._api.json.dumps(result.raw),
)
tefmocheck_cmd = [
self._tefmocheck,
"-swarming-summary-json",
swarming_summary_path,
"-swarming-host",
attempt.host,
]
to_process = [
_LogToProcess(
name=TEST_TASK_OUTPUT_FILE,
path=result.output_dir.join(TEST_TASK_OUTPUT_FILE),
data=result.output,
)
]
for log_name in (
testing_requests_api.SYSLOG_NAME,
testing_requests_api.SERIAL_LOG_NAME,
):
if log_name in result.outputs:
to_process.append(
_LogToProcess(
name=log_name,
path=result.outputs[log_name],
data=self._api.file.read_text(
"read %s" % log_name,
result.outputs[log_name],
test_data="extra log contents",
),
)
)
for log in to_process:
tefmocheck_flag_name = LOG_NAME_TO_TEFMOCHECK_FLAG.get(log.name)
if tefmocheck_flag_name:
tefmocheck_cmd.extend((tefmocheck_flag_name, log.path))
# Symbolize and overwrite so that tefmocheck and logdog see the
# symbolized versions.
if self._api.testing.task_targets_fuchsia(result):
# Non-Fuchsia should already be symbolized, and attempting to use
# the symbolizer may fail, if e.g. it was built on Mac and this is
# running on Linux.
attempt.logs[log.name] = self._api.symbolize(
symbolize_tool=self._symbolize_tool,
llvm_symbolizer=self._llvm_symbolizer,
data=log.data,
name="symbolize %s" % log.name,
debug_symbol_url=self._debug_symbol_url,
symbolizer_output=log.path,
)
else:
attempt.logs[log.name] = log.data
if not self._api.path.exists(log.path):
# Ensure it exists on file system even if we didn't symbolize
# so that it gets uploaded to GCS later.
self._api.file.write_raw(
"write %s" % log.name, log.path, log.data, include_log=False
)
self._api.path.mock_add_paths(log.path)
test_results_dir = self._api.testing.results_dir_on_host.join(result.id)
tefmocheck_cmd.extend(("-outputs-dir", test_results_dir))
summary_json_path = test_results_dir.join(TEST_SUMMARY_JSON)
# If the swarming task failed, extracting test results will likely fail.
if result.success:
self._api.testing.extract_test_results(
step_name="extract results",
task_result=result,
directory=test_results_dir,
minfs_path=self._minfs,
)
if self._api.path.exists(summary_json_path):
tefmocheck_cmd.extend(("-test-summary-json", summary_json_path))
summary_bytes = self._api.step(
"tefmocheck", tefmocheck_cmd, stdout=self._api.raw_io.output()
).stdout
attempt.logs[TEST_SUMMARY_JSON] = summary_bytes
tags = {}
for tag in self._request.tags:
k, v = tag.split(":", 1)
tags[k] = v
test_results = FuchsiaTestResults(
from_fuchsia=self._api.testing.task_targets_fuchsia(result),
results_dir=test_results_dir,
swarming_task_id=result.id,
shard_name=result.name,
env_name=tags[self._api.testing_requests.TEST_ENVIRONMENT_TAG_NAME],
api=self._api,
task_output_dir=result.output_dir,
summary_bytes=summary_bytes,
is_multiplied=self._is_multiplied,
)
attempt.test_results = test_results
flaked_tests = test_results.flaked_tests
failed_tests = test_results.failed_tests
if not result.success:
if result.state == self._api.swarming.TaskState.COMPLETED:
# The task will have state COMPLETED even if it had a non-zero
# exit code, so show a more helpful message than "completed".
attempt.failure_reason = "task failed"
else:
# "BOT_DIED" -> "bot died"
attempt.failure_reason = result.state.name.lower().replace("_", " ")
else:
if flaked_tests:
attempt.has_flakes = True
if failed_tests:
attempt.failure_reason = "%s failed" % pluralize("test", failed_tests)
# Only retry if all the failures are testing_failure_mode checks.
# Sometimes, a problem can occur in the system that causes all tests
# to fail after that point, in which case we should also retry.
# The number of failed tests to check for in this case should match
# that of the mass_test_failure_check here:
# https://fuchsia.googlesource.com/fuchsia/+/e2f3065a52c5bd69090d0c2dcd8cbc76a75d601c/tools/testing/tefmocheck/cmd/main.go#125
should_retry = (
all(t.get("is_testing_failure_mode") for t in failed_tests)
or len(failed_tests) > 5
)
if not self._retry_task_on_test_failure and not should_retry:
self.max_attempts = 1
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for output_name, output_path in test_results.summary.get(
"outputs", {}
).iteritems():
attempt.logs[output_name] = test_results.get_output(output_path)
def present_attempt(self, _, attempt, category=None):
"""Present an Attempt when summarizing results at the end of the run.
Args:
attempt (Attempt): the Attempt to present
category (str): the group of tasks ('passes', 'failures', or
'flakes') that this attempt should be presented under
"""
show_failures_in_red = True
# The 'passes' category includes all attempts of all tasks that
# eventually passed, so it includes some failures. Show those in
# green so people don't get confused and think the overall task
# failed.
# TODO(fxb/36647) after this bug is fixed show these steps in
# red, but show parent steps of those in green.
if category == "passes" or category == "incomplete":
show_failures_in_red = False
attempt_status = "fail"
if attempt.success:
if attempt.has_flakes:
attempt_status = "flake"
else:
attempt_status = "pass"
name = "%s (%s)" % (attempt.name, attempt_status)
with self._api.step.nest(name) as presentation:
if show_failures_in_red and (not attempt.success or attempt.has_flakes):
presentation.status = self._api.step.FAILURE
self._present(
presentation,
attempt,
show_failures_in_red=show_failures_in_red,
show_passed=True,
)
def present_status(self, parent_step, attempt):
"""Present an Attempt while showing progress in launch/collect step.
Args:
parent_step (Step): will always be 'passed tasks' or 'failed tasks'
attempt (Attempt): the Attempt to present
"""
del parent_step # Unused.
with self._api.step.nest("%s (%s)" % (self.name, attempt.name)) as presentation:
self._present(
presentation, attempt, show_failures_in_red=False, show_passed=False
)
def _present(self, presentation, attempt, show_failures_in_red, show_passed):
"""Present an Attempt.
Choosing to do largely the same thing for both kinds of presentations.
Args:
presentation (StepPresentation): where to present the attempt info
attempt (api.swarming_retry.Attempt): object to present
show_failures_in_red (bool): show failures in red (for final
'flakes' and 'failures' steps) or not (for 'launch/collect'
progress and 'passes' steps)
show_passed (bool): show the names of passed tests (only done for
the end)
Note: the 'passes' step can have failures underneath it because the
first attempt can fail but the retry passed.
"""
if attempt.result and attempt.result.duration_secs:
presentation.step_text = nice_duration(attempt.result.duration_secs)
presentation.presentation.links["swarming task"] = attempt.task_ui_link
if attempt.task_outputs_link:
presentation.links["task outputs"] = attempt.task_outputs_link
if attempt.failure_reason:
presentation.step_summary_text = attempt.failure_reason
for log, data in attempt.logs.iteritems():
presentation.logs[log] = data
if attempt.test_results:
test_results = attempt.test_results
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for name, path in test_results.summary.get("outputs", {}).iteritems():
presentation.logs[name] = test_results.get_output(path).split("\n")
test_results.present_tests(
show_failures_in_red=show_failures_in_red, show_passed=show_passed
)
class FuchsiaTestApi(recipe_api.RecipeApi):
"""API for running tests and processing test results."""
FuchsiaTestResults = FuchsiaTestResults
TEST_SUMMARY_JSON = TEST_SUMMARY_JSON
def task_targets_fuchsia(self, task_result):
# Sharded tasks will have the OS environment in the task name.
# Deprecated tasks do not have the OS in the name, but they always target
# fuchsia, so this function should always return true.
name = task_result.name.lower()
return "linux" not in name and "mac" not in name
@property
def results_dir_on_host(self):
"""The directory on host to which host and target test results will be written.
Target test results will be copied over to this location and
host test results will be written here. Host and target tests on
should write to separate subdirectories so as not to collide.
"""
return self.m.path["cleanup"].join("test_results")
def extract_test_results(self, step_name, task_result, directory, minfs_path=None):
"""Extracts test results from task_result.
Moves test results from the task_result's outputs into directory.
We do this because we later want to be able to archive everything in the
swarming task result's outputs, but we don't want to archive most of the
test result files.
Args:
step_name (str): The name of the step.
task_result (api.swarming.TaskResult): The task result from which to extract
the test results.
directory (Path): The directory to extract the test results into.
minfs_path (Path): The path to the minfs tool.
"""
if self.m.testing_requests.TEST_RESULTS_MINFS_NAME in task_result.outputs:
assert minfs_path, "minfs path not set"
self.m.minfs.minfs_path = minfs_path
test_results_path = task_result.outputs[
self.m.testing_requests.TEST_RESULTS_MINFS_NAME
]
self.m.file.ensure_directory("create test results dir", directory)
self.m.minfs.copy_image(
step_name=step_name, image_path=test_results_path, out_dir=directory,
)
# Delete the archive so it doesn't get uploaded with the other files in
# the swarming task's output directory.
self.m.file.remove(
"remove %s" % self.m.path.basename(test_results_path), test_results_path
)
else:
# The outputs should be in an out directory defined by
# TEST_RESULTS_DIR_NAME.
test_results_path = None
for relative_path in sorted(task_result.outputs.keys()):
if relative_path.startswith(
self.m.testing_requests.TEST_RESULTS_DIR_NAME + "/"
):
test_results_path = str(
task_result.output_dir.join(
self.m.testing_requests.TEST_RESULTS_DIR_NAME
)
)
break
assert test_results_path, (
"test results not found amongst outputs of task %s: %s"
% (task_result.name, task_result.outputs)
)
self.m.file.move(step_name, source=test_results_path, dest=directory)
# This is only needed for the recipe tests. file.listdir() doesn't mock the
# existence of the paths it returns, so we must add it separately.
# We add summary_path because we check for its existence in
# FuchsiaTestResults._parse_summary().
summary_path = directory.join(TEST_SUMMARY_JSON)
outputs = self.m.file.listdir("get extracted files", directory)
if summary_path in outputs:
self.m.path.mock_add_paths(summary_path)
def run_test_tasks(
self,
debug_symbol_url,
max_attempts,
orchestration_inputs,
rerun_budget_secs=None,
runs_per_shard=1,
retry_task_on_test_failure=False,
):
"""Tests a Fuchsia build by sharding.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
debug_symbol_url (str): A GCS URL hosting debug symbols.
max_attempts (int): Maximum number of attempts before marking a shard
as failed.
orchestration_inputs (TestOrchestrationInputs): Build artifacts needed
for testing.
rerun_budget_secs (int): Will run tests repeatedly until this budget is.
consumed. If set, max_attempts is ignored.
runs_per_shard (int): Number of times to run each shard.
retry_task_on_test_failure (bool): Retry tasks on all test failures.
If false, retry whole tasks on tefmocheck failures but not on
regular test failures.
Returns:
A list of swarming_retry.Tasks representing the completed test
tasks that were not subject to an infra failure.
"""
# If no shards have been provided, then we have successfully run the empty
# set of tests.
if not orchestration_inputs.task_requests:
return []
launch_deadline_time = None
present = True
if rerun_budget_secs:
# If we have a rerun deadline, we want to run the tests as many times as
# possible within that time regardless of whether the tasks pass or fail,
# so we don't care about setting a max number of attempts.
max_attempts = float("inf")
# Presenting results is currently slow due to inefficiencies in
# the recipe engine. For builds that have rerun_budget_secs enabled,
# we only care about consuming the data after it's been uploaded, not
# from the Milo build page, so skip presentation.
present = False
launch_deadline_time = self.m.time.time() + rerun_budget_secs
tasks = []
for task_request in orchestration_inputs.task_requests:
if rerun_budget_secs:
# Rerun tasks should not block other tasks.
task_request = task_request.with_priority(task_request.priority + 1)
tasks.append(
Task(
request=task_request,
api=self.m,
symbolize_tool=orchestration_inputs.symbolize_tool,
llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
debug_symbol_url=debug_symbol_url,
tefmocheck=orchestration_inputs.tefmocheck,
minfs=orchestration_inputs.minfs,
retry_task_on_test_failure=retry_task_on_test_failure,
)
)
tasks = self.m.swarming_retry.run_tasks(
tasks=tasks,
collect_output_dir=self.m.path.mkdtemp("swarming"),
max_attempts=max_attempts,
launch_deadline_time=launch_deadline_time,
present=present,
run_count=runs_per_shard,
)
if present:
self.m.swarming_retry.present_tasks(tasks=tasks)
return tasks
def raise_failures(self, tasks):
failed_tests = []
failed_tests_no_tefmo = []
tasks_without_results = []
for task in tasks:
if task.failed:
# Report the test failures from the last failed attempt.
attempt = task.get_failed_attempts()[-1]
if attempt.test_results:
failed_tests.extend(
t["name"] for t in attempt.test_results.failed_tests
)
failed_tests_no_tefmo.extend(
t["name"]
for t in attempt.test_results.failed_tests
if not t.get("is_testing_failure_mode")
)
else:
tasks_without_results.append(task.name)
if tasks_without_results:
raise self.m.step.StepFailure(
"failed to retrieve test results for %s: %s"
% (
pluralize("task", tasks_without_results),
", ".join(sorted(tasks_without_results)),
)
)
# Expose these so tools can be used to re-run these failed tests.
with self.m.step.nest("record failed_test_names") as presentation:
presentation.properties["failed_test_names"] = failed_tests_no_tefmo
if not failed_tests:
return
def _test_description(test, failure_count):
count_info = ""
if failure_count > 1:
count_info = " (%d failures)" % failure_count
return "%s%s" % (test, count_info)
failure_counts = collections.Counter(failed_tests)
header = "%s failed:" % pluralize("test", failure_counts)
if len(failure_counts) == 1:
test, failure_count = failure_counts.popitem()
raise self.m.step.StepFailure(
"%s %s" % (header, _test_description(test, failure_count))
)
error_lines = [
header,
# Milo's markdown renderer requires a blank line before the
# start of a bulleted list. Otherwise the list lines will all
# be rendered on one hyphen-separated line.
"",
]
# Present up to 10 tests and show a count of the rest. But if there are
# 11-15 failed tests, still show all of the tests because it's not
# worth saving lines by hiding only a couple tests.
max_tests_to_present = 15
min_tests_to_hide = 5
if len(failure_counts) > max_tests_to_present:
tests_to_present = max_tests_to_present - min_tests_to_hide
else:
tests_to_present = len(failure_counts)
counts_to_present = sorted(failure_counts.items())[:tests_to_present]
for test, failure_count in counts_to_present:
error_lines.append("- %s" % _test_description(test, failure_count))
hidden_tests = len(failure_counts) - tests_to_present
if hidden_tests > 0:
error_lines.append("- (%d more)" % hidden_tests)
raise self.m.step.StepFailure("\n".join(error_lines))