blob: b409de7e7cb82d7e5e164c8cbe40420873dee5fd [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import attr
import collections
from recipe_engine import recipe_api
from recipe_engine.config_types import Path
from RECIPE_MODULES.fuchsia.swarming_retry import api as swarming_retry_api
from RECIPE_MODULES.fuchsia.utils import cached_property, nice_duration, pluralize
# Name of the file produced by a testing task that contains results and outputs
# of the tests that were run.
TEST_SUMMARY_JSON = "summary.json"
@attr.s
class FuchsiaTestResults(object):
"""Represents the result of testing of a Fuchsia build.
Attributes:
from_fuchsia (bool): Whether the tests ran on Fuchsia.
results_dir (Path): The directory that the test results have been moved
into (out of task_output_dir).
task_output_dir (Path): A directory containing the outputs of the swarming
task that ran these tests. Anything that's in this directory will be
uploaded to GCS when upload_results() is called.
swarming_bot_id (str): The ID of the swarming bot that ran the task.
swarming_task_id (str): The ID of the task that ran these tests.
env_name (str): The name of the environment that these tags ran in.
shard_name (str): The name of the task that ran these tests.
api (RecipeApi): The api to use for accessing recipe modules from this
object.
summary_bytes (bytes): The contents of TEST_SUMMARY_JSON.
is_multiplied (bool): Whether the test results are for a multiplier
shard.
"""
from_fuchsia = attr.ib(type=bool)
results_dir = attr.ib(type=Path)
task_output_dir = attr.ib(type=Path)
swarming_bot_id = attr.ib(type=str)
swarming_task_id = attr.ib(type=str)
env_name = attr.ib(type=str)
_shard_name = attr.ib(type=str)
_api = attr.ib(type=recipe_api.RecipeApi)
_summary_bytes = attr.ib(type=bytes)
_is_multiplied = attr.ib(type=bool)
# A mapping of relative paths to files in the results_dir containing
# stdout+stderr data to strings containing those contents.
_outputs = attr.ib(factory=dict, init=False)
# Set lazily by `_separate_tests_by_result()`, not a parameter to __init__.
_pass_name_to_tests = attr.ib(factory=dict, init=False)
_fail_name_to_tests = attr.ib(factory=dict, init=False)
# Constants representing the result of running a test. These enumerate the
# values of the 'results' field of the entries in the summary.json file
# obtained from the target device. Any other value is considered to
# represent a failure.
_TEST_RESULT_PASS = "PASS"
_TEST_RESULT_FAIL = "FAIL"
_TEST_RESULT_TIMEOUT = "ABORT"
_TEST_RESULT_SKIP = "SKIP"
@cached_property
def summary(self):
"""The parsed summary file as a Dict or {} if missing."""
if not self._summary_bytes:
return {}
try:
return self._api.json.loads(self._summary_bytes)
except ValueError as e: # pragma: no cover
# TODO(olivernewman): JSONDecodeError in python >=3.5
raise self._api.step.StepFailure(
"Invalid %s: %s" % (TEST_SUMMARY_JSON, e.args[0])
)
@property
def passed_tests(self):
"""All entries in |self._outputs| for tests that passed."""
self._separate_tests_by_result()
passed_tests = []
for tests in self._pass_name_to_tests.values():
passed_tests.extend(tests)
return passed_tests
@property
def failed_tests(self):
"""All entries in |self._outputs| for tests that failed."""
self._separate_tests_by_result()
failed_tests = []
for name, tests in self._fail_name_to_tests.items():
if self._is_multiplied or name not in self._pass_name_to_tests:
failed_tests.extend(tests)
return failed_tests
@property
def flaked_tests(self):
"""All entries in |self._outputs| for tests that flaked."""
self._separate_tests_by_result()
flaked_tests = []
for name, tests in self._fail_name_to_tests.items():
if not self._is_multiplied and name in self._pass_name_to_tests:
flaked_tests.extend(tests)
return flaked_tests
def _separate_tests_by_result(self):
"""Separates entries in |self._outputs| into maps based on test result.
Passed tests will be stored in self._pass_name_to_tests and failed
tests will be stored in self._fail_name_to_tests.
"""
if not self._pass_name_to_tests or not self._fail_name_to_tests:
# Using OrderedDict() will keep the tests in the same order as they
# ran in. This doesn't really matter for displaying results, but it
# keeps the expectation files from changing by maintaining the same
# order of tests.
self._pass_name_to_tests = collections.OrderedDict()
self._fail_name_to_tests = collections.OrderedDict()
for test in self.summary.get("tests") or ():
if test["result"] == self._TEST_RESULT_PASS:
tests = self._pass_name_to_tests.get(test["name"], [])
tests.append(test)
self._pass_name_to_tests[test["name"]] = tests
else:
tests = self._fail_name_to_tests.get(test["name"], [])
tests.append(test)
self._fail_name_to_tests[test["name"]] = tests
def present_tests(self, show_failures_in_red, show_passed):
def show_failed_tests(tests):
for test in tests:
if test["result"] == self._TEST_RESULT_TIMEOUT:
status = "timed out"
elif test["result"] == self._TEST_RESULT_SKIP:
status = "skipped"
else:
status = "failed"
step = self._api.step.empty("%s: %s" % (status, test["name"]))
for output_file in test["output_files"]:
# Failed test output files should have been read into
# self._outputs by Task.process_result().
output = self._outputs[output_file]
log_name = self._api.path.basename(output_file)
if not output.strip():
log_name += " (empty)"
step.presentation.logs[log_name] = output.splitlines()
# If present, show the GN label to aid debugging.
label = test.get("gn_label")
if label:
step.presentation.step_summary_text = label
if show_failures_in_red:
step.presentation.status = self._api.step.FAILURE
show_failed_tests(self.failed_tests)
show_failed_tests(self.flaked_tests)
# There's recipe overhead that makes step creation slow, which we mitigate
# by cramming all the passed tests into a single step. We also skip
# presenting stdio since it's generally only useful if the test failed.
with self._api.step.nest("all passed tests") as passed_tests_step:
passed_tests_step.presentation.step_summary_text = pluralize(
"passed test", self.passed_tests
)
if show_passed:
passed_tests_step.presentation.step_text = self._passed_tests_text(
self.passed_tests
)
def _passed_tests_text(self, tests):
"""Format test names for presentation in Milo.
If a single test ran many times in a row, we'll compress those runs
into a single line with a run count, to improve readability.
"""
test_names = [t["name"] for t in tests]
lines = []
current_test_count = 0
for i, test_name in enumerate(test_names):
current_test_count += 1
# If we've reached the last test OR the next test has a different
# name then we've reached the end of a repeated streak of running a
# single test, so add a line of text for that streak.
if i + 1 >= len(test_names) or test_names[i + 1] != test_name:
# Surround test names in backticks so that Milo's markdown
# renderer doesn't try to parse them as markdown.
line = "`%s`" % test_name
if current_test_count > 1:
line += " (%d runs)" % current_test_count
lines.append(line)
current_test_count = 0
# Start with a newline to prevent the first test from showing up on
# the same line as the step name.
return "".join("\n" + l for l in lines)
def get_output(self, output_path):
"""Returns the contents of the file at output_path.
The output_path should be a relative path to the results_dir.
"""
if output_path not in self._outputs:
self._outputs[output_path] = self._api.file.read_text(
"read %s" % output_path,
self.results_dir.join(output_path),
# We're returning the log contents to be attached to other
# steps, so no need to log them here.
include_log=False,
)
return self._outputs[output_path]
def upload_results(
self,
gcs_bucket,
upload_to_catapult,
orchestration_inputs,
resultdb_base_variant,
resultdb_tags,
):
"""Upload select test results (e.g., coverage data) to a given GCS bucket."""
assert gcs_bucket
with self._api.context(infra_steps=True), self._api.step.nest(
"upload %s test results" % self._shard_name
):
if self.summary:
# Save the summary JSON to the test shard output dir so it gets
# uploaded to GCS for easy access by e.g. Dachsiaboard.
summary_path = self.task_output_dir.join(TEST_SUMMARY_JSON)
assert not self._api.path.exists(summary_path), (
"test output files should not be named %s" % TEST_SUMMARY_JSON
)
self._api.file.write_json(
"write %s" % TEST_SUMMARY_JSON, summary_path, self.summary
)
if self._api.resultdb.enabled and orchestration_inputs.resultdb:
self._upload_to_resultdb(
summary_path,
orchestration_inputs,
resultdb_base_variant,
resultdb_tags,
)
snapshot_zip = self.results_dir.join(
self._api.testing_requests.SNAPSHOT_NAME
)
self._api.path.mock_add_paths(snapshot_zip)
if self._api.path.exists(snapshot_zip):
self._api.file.move(
"move snapshot to output dir",
snapshot_zip,
self.task_output_dir.join(self._api.testing_requests.SNAPSHOT_NAME),
)
# If an output was important enough to present (meaning it's in _outputs),
# we should upload it so it can be shown in Sponge, etc.
with self._api.step.nest("move test outputs"):
for output in sorted(self._outputs):
dest = self.task_output_dir.join(output)
self._api.file.ensure_directory(
"mkdir", self._api.path.dirname(dest)
)
self._api.file.move(
"mv",
self.results_dir.join(output),
dest,
)
self._upload_outputs(gcs_bucket)
if upload_to_catapult:
self._api.catapult.upload_test_outputs(self.results_dir)
def _upload_to_resultdb(
self, summary_path, orchestration_inputs, base_variant, tags
):
cmd = [orchestration_inputs.resultdb]
base_variant = base_variant.copy()
base_variant.update(
{
"bucket": self._api.buildbucket.build.builder.bucket,
"builder": self._api.buildbucket.build.builder.builder,
}
)
if self.results_dir:
cmd.append("--output=%s" % self.results_dir)
cmd.append("--summary=%s" % summary_path)
for tag in tags:
cmd.append("--tag=%s" % tag)
try:
self._api.step(
"resultdb",
self._api.resultdb.wrap(cmd, base_variant=base_variant, include=True),
)
except self._api.step.InfraFailure: # pragma: no cover # pylint: disable=try-except-raise
# When there is a reliability issues with ResultDB, replace the
# following statement from 'raise' to 'pass' so the step stays an
# infra failure but the build is not affected:
raise
def _upload_outputs(self, gcs_bucket):
self._api.gsutil.upload_namespaced_directory(
source=self.task_output_dir,
bucket=gcs_bucket,
# Namespace to avoid collision across shards and attempts.
subpath="%s/%s" % (self._shard_name, self.swarming_task_id),
# Internal gsutil retries of transient failures can cause rsync to
# fail when run by a service account that doesn't have permission
# to delete objects.
rsync=False,
)
def run_triage(self, triage_tool, triage_sources):
snapshot_zip = self.results_dir.join(self._api.testing_requests.SNAPSHOT_NAME)
self._api.path.mock_add_paths(snapshot_zip)
if not self._api.path.exists(snapshot_zip): # pragma: no cover
return
with self._api.step.nest("run triage"):
snapshot_dir = self._api.path.mkdtemp("snapshot")
self._api.tar.extract(
"extract snapshot", path=snapshot_zip, directory=snapshot_dir
)
cmd = (
[triage_tool]
+ ["--config=%s" % f for f in triage_sources]
+ ["--data", snapshot_dir]
)
return self._api.step(
"triage",
cmd,
# The triage tool returns a non-zero exit code when it detects
# any violations or errors. We don't want to fail in this case,
# but just write the output to a file for analysis.
ok_ret="any",
stdout=self._api.raw_io.output_text(
# Write to the results_dir so it gets uploaded to ResultDB.
leak_to=self.results_dir.join("triage_output")
),
step_test_data=lambda: self._api.raw_io.test_api.stream_output_text(
"triage info"
),
)
class Task(swarming_retry_api.TriggeredTask):
"""Task processes and presents results of testing Swarming tasks."""
def __init__(
self,
request,
api,
orchestration_inputs,
debug_symbol_url,
retry_task_on_test_failure=False,
**kwargs
):
super().__init__(request, api, **kwargs)
self._orchestration_inputs = orchestration_inputs
self._debug_symbol_url = debug_symbol_url
self._retry_task_on_test_failure = retry_task_on_test_failure
# Test shards with the 'multiplied:' prefix come from
# tools/integration/testsharder/shard.go in fuchsia.git. They were
# specifically created to run a test or set of tests many times to look
# for flakes. It doesn't make sense to retry these when they fail--the
# goal is to see if they fail not to get them to pass.
if self.name.startswith("multiplied:"):
self.max_attempts = 1
self._is_multiplied = True
else:
self._is_multiplied = False
# Abort on test failures that are likely due to the change being tested
# to decrease true rejection time.
self.abort_early_if_failed = (
self.name.startswith("affected:") or self._is_multiplied
)
def launch(self, priority_boost_amount):
"""Launches a swarming task attempt.
It also initializes the test_results field on the returned
swarming_retry.Attempt to None so that the field will be set even if
the attempt is not completed or the result processing fails.
"""
attempt = super().launch(priority_boost_amount)
attempt.test_results = None
return attempt
def process_result(self, attempt):
"""Processes the results produced by a test shard."""
assert attempt.result
result = attempt.result
if result.cas_outputs:
attempt.task_outputs_link = result.cas_outputs.url
@attr.s
class _LogToProcess(object):
name = attr.ib(type=str)
data = attr.ib(type=str)
path = attr.ib(type=Path)
swarming_summary_path = self._api.path.mkstemp(result.id)
self._api.file.write_raw(
"write swarming summary JSON",
swarming_summary_path,
self._api.json.dumps(result.raw),
)
tefmocheck_cmd = [
self._orchestration_inputs.tefmocheck,
"-swarming-summary-json",
swarming_summary_path,
"-swarming-host",
attempt.host,
]
to_process = [
_LogToProcess(
name=self._api.testing_requests.TEST_TASK_OUTPUT_FILE,
path=result.output_dir.join(
self._api.testing_requests.TEST_TASK_OUTPUT_FILE
),
data=result.output,
)
]
for log_name in (
self._api.testing_requests.SYSLOG_NAME,
self._api.testing_requests.SERIAL_LOG_NAME,
):
if log_name in result.outputs:
log_path = result.outputs[log_name]
# TODO(crbug.com/1222835): Remove this check when bug is fixed.
self._api.path.mock_add_paths(log_path)
if self._api.path.isfile(log_path):
to_process.append(
_LogToProcess(
name=log_name,
path=log_path,
data=self._api.file.read_text(
"read %s" % log_name,
log_path,
test_data="extra log contents",
),
)
)
for log in to_process:
# Flag names from
# https://fuchsia.googlesource.com/fuchsia/+/main/tools/testing/tefmocheck/cmd/main.go
tefmocheck_flag_name = {
self._api.testing_requests.TEST_TASK_OUTPUT_FILE: "-swarming-output",
self._api.testing_requests.SERIAL_LOG_NAME: "-serial-log",
self._api.testing_requests.SYSLOG_NAME: "-syslog",
}.get(log.name)
if tefmocheck_flag_name:
tefmocheck_cmd.extend((tefmocheck_flag_name, log.path))
# Symbolize and overwrite so that tefmocheck and logdog see the
# symbolized versions.
if self._api.testing.task_targets_fuchsia(result):
# Non-Fuchsia should already be symbolized, and attempting to use
# the symbolizer may fail, if e.g. it was built on Mac and this is
# running on Linux.
attempt.logs[log.name] = self._api.symbolize(
symbolizer_tool=self._orchestration_inputs.symbolizer_tool,
data=log.data,
name="symbolize %s" % log.name,
debug_symbol_url=self._debug_symbol_url,
symbolizer_output=log.path,
)
else:
attempt.logs[log.name] = log.data
if not self._api.path.exists(log.path):
# Ensure it exists on file system even if we didn't symbolize
# so that it gets uploaded to GCS later.
self._api.file.write_raw("write %s" % log.name, log.path, log.data)
self._api.path.mock_add_paths(log.path)
test_results_dir = self._api.testing.extract_test_results(
step_name="extract results", task_result=result
)
# Copy syslog.txt, serial.test and infra_and_test_std_and_klog.txt to test_results_dir.
# We look for artifacts from test_results_dir to upload to ResultDB.
self._api.file.copy(
"copy %s" % self._api.testing_requests.TEST_TASK_OUTPUT_FILE,
result.output_dir.join(self._api.testing_requests.TEST_TASK_OUTPUT_FILE),
test_results_dir.join(self._api.testing_requests.TEST_TASK_OUTPUT_FILE),
)
for log_name in (
self._api.testing_requests.SYSLOG_NAME,
self._api.testing_requests.SERIAL_LOG_NAME,
):
# TODO(crbug.com/1222835): Remove the isfile check when bug is fixed.
if log_name in result.outputs and self._api.path.isfile(
result.outputs[log_name]
):
self._api.file.copy(
"copy %s" % log_name,
result.outputs[log_name],
test_results_dir.join(log_name),
)
tefmocheck_cmd.extend(("-outputs-dir", test_results_dir))
summary_json_path = test_results_dir.join(TEST_SUMMARY_JSON)
if self._api.path.exists(summary_json_path):
tefmocheck_cmd.extend(("-test-summary-json", summary_json_path))
tefmocheck_cmd.extend(
("-json-output", self._api.raw_io.output_text(suffix="json"))
)
summary_bytes = self._api.step("tefmocheck", tefmocheck_cmd).raw_io.output_text
attempt.logs[TEST_SUMMARY_JSON] = summary_bytes
tags = {}
for tag in self._request.tags:
k, v = tag.split(":", 1)
tags[k] = v
test_results = FuchsiaTestResults(
from_fuchsia=self._api.testing.task_targets_fuchsia(result),
results_dir=test_results_dir,
swarming_task_id=result.id,
swarming_bot_id=result.bot_id,
shard_name=result.name,
env_name=tags[self._api.testing_requests.TEST_ENVIRONMENT_TAG_NAME],
api=self._api,
task_output_dir=result.output_dir,
summary_bytes=summary_bytes,
is_multiplied=self._is_multiplied,
)
attempt.test_results = test_results
flaked_tests = test_results.flaked_tests
failed_tests = test_results.failed_tests
if not result.success:
if result.state == self._api.swarming.TaskState.COMPLETED:
# The task will have state COMPLETED even if it had a non-zero
# exit code, so show a more helpful message than "completed".
attempt.failure_reason = "task failed"
else:
# "BOT_DIED" -> "bot died"
attempt.failure_reason = result.state.name.lower().replace("_", " ")
else:
if flaked_tests:
attempt.has_flakes = True
if failed_tests:
attempt.failure_reason = "%s failed" % pluralize("test", failed_tests)
# Only retry if all the failures are testing_failure_mode checks.
# Sometimes, a problem can occur in the system that causes all tests
# to fail after that point, in which case we should also retry.
# The number of failed tests to check for in this case should match
# that of the mass_test_failure_check here:
# https://fuchsia.googlesource.com/fuchsia/+/e2f3065a52c5bd69090d0c2dcd8cbc76a75d601c/tools/testing/tefmocheck/cmd/main.go#125
should_retry = (
all(t.get("is_testing_failure_mode") for t in failed_tests)
or len(failed_tests) > 5
)
if not self._retry_task_on_test_failure and not should_retry:
self.max_attempts = 1
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for output_name, output_path in test_results.summary.get("outputs", {}).items():
attempt.logs[output_name] = test_results.get_output(output_path)
for tests_by_name in [failed_tests] + [flaked_tests]:
for test in tests_by_name:
for output_file in test["output_files"]:
# Proactively read the log so that we can use an empty step
# that doesn't have "execution details" or "stdout" logs to
# present the failed test more clearly.
test_results.get_output(output_file)
triage_step = test_results.run_triage(
self._orchestration_inputs.triage,
self._orchestration_inputs.triage_sources,
)
if triage_step:
attempt.logs["triage_output"] = triage_step.stdout.splitlines()
def present_attempt(self, _, attempt, category=None):
"""Present an Attempt when summarizing results at the end of the run.
Args:
attempt (Attempt): the Attempt to present
category (str): the group of tasks ('passes', 'failures', or
'flakes') that this attempt should be presented under
"""
show_failures_in_red = True
# The 'passes' category includes all attempts of all tasks that
# eventually passed, so it includes some failures. Show those in
# green so people don't get confused and think the overall task
# failed.
# TODO(fxb/36647) after this bug is fixed show these steps in
# red, but show parent steps of those in green.
if category == "passes" or category == "incomplete":
show_failures_in_red = False
attempt_status = "fail"
if attempt.success:
if attempt.has_flakes:
attempt_status = "flake"
else:
attempt_status = "pass"
name = "%s (%s)" % (attempt.name, attempt_status)
with self._api.step.nest(name) as presentation:
if show_failures_in_red and (not attempt.success or attempt.has_flakes):
presentation.status = self._api.step.FAILURE
if attempt.result and attempt.result.duration_secs:
presentation.step_text = nice_duration(attempt.result.duration_secs)
presentation.presentation.links["swarming task"] = attempt.task_ui_link
if attempt.task_outputs_link:
presentation.links["task outputs"] = attempt.task_outputs_link
if attempt.bot_ui_link:
presentation.presentation.links[
"bot %s" % attempt.bot_id
] = attempt.bot_ui_link
if attempt.failure_reason:
presentation.step_summary_text = attempt.failure_reason
for log, data in sorted(attempt.logs.items()):
presentation.logs[log] = data
if attempt.test_results:
test_results = attempt.test_results
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for name, path in test_results.summary.get("outputs", {}).items():
presentation.logs[name] = test_results.get_output(path).split("\n")
test_results.present_tests(
show_failures_in_red=show_failures_in_red, show_passed=True
)
class FuchsiaTestApi(recipe_api.RecipeApi):
"""API for running tests and processing test results."""
FuchsiaTestResults = FuchsiaTestResults
TEST_SUMMARY_JSON = TEST_SUMMARY_JSON
def task_targets_fuchsia(self, task_result):
# Sharded tasks will have the OS environment in the task name.
# Deprecated tasks do not have the OS in the name, but they always target
# fuchsia, so this function should always return true.
name = task_result.name.lower()
return "linux" not in name and "mac" not in name
def extract_test_results(self, step_name, task_result):
"""Extracts test results from task_result.
Moves test results from the task_result's outputs into a new
directory. We do this because we later want to be able to archive
everything in the swarming task result's outputs, but we don't want
to archive most of the test result files.
Args:
step_name (str): The name of the step.
task_result (api.swarming.TaskResult): The task result from which to extract
the test results.
Returns:
The Path to the directory into which test results were extracted.
"""
# Use a directory that doesn't yet exist so that `api.file.move()`
# moves the source directory to the location of this directory, instead
# of *into* this directory.
test_results_dir = self.m.path.mkdtemp(task_result.id).join("test_results")
# The outputs should be in TEST_RESULTS_DIR_NAME.
test_results_path = None
for relative_path in sorted(task_result.outputs.keys()):
if relative_path.startswith(
self.m.testing_requests.TEST_RESULTS_DIR_NAME + "/"
):
test_results_path = str(
task_result.output_dir.join(
self.m.testing_requests.TEST_RESULTS_DIR_NAME
)
)
break
# If the swarming task failed, there may not be any test results.
if test_results_path is None and not task_result.success:
self.m.file.ensure_directory("create test results dir", test_results_dir)
return test_results_dir
assert (
test_results_path
), "test results not found amongst outputs of task %s: %s" % (
task_result.name,
task_result.outputs,
)
self.m.file.move(step_name, source=test_results_path, dest=test_results_dir)
# This is only needed for the recipe tests. file.listdir() doesn't mock the
# existence of the paths it returns, so we must add it separately.
# We add summary_path because we check for its existence in
# FuchsiaTestResults._parse_summary().
summary_path = test_results_dir.join(TEST_SUMMARY_JSON)
outputs = self.m.file.listdir("get extracted files", test_results_dir)
if summary_path in outputs:
self.m.path.mock_add_paths(summary_path)
return test_results_dir
def run_test_tasks(
self,
debug_symbol_url,
max_attempts,
orchestration_inputs,
rerun_budget_secs=None,
runs_per_shard=1,
retry_task_on_test_failure=False,
):
"""Tests a Fuchsia build by sharding.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
debug_symbol_url (str): A GCS URL hosting debug symbols.
max_attempts (int): Maximum number of attempts before marking a
shard as failed.
orchestration_inputs (TestOrchestrationInputs): Build artifacts
needed for testing.
rerun_budget_secs (int): Will run tests repeatedly until this
budget is. consumed. If set, max_attempts is ignored.
runs_per_shard (int): Number of times to run each shard.
retry_task_on_test_failure (bool): Retry tasks on all test
failures. If false, retry whole tasks on tefmocheck failures
but not on regular test failures.
Returns:
A list of swarming_retry.Tasks representing the completed test
tasks that were not subject to an infra failure.
"""
# If no shards have been provided, then we have successfully run the empty
# set of tests.
if not orchestration_inputs.task_requests:
self.m.step.empty("no tests to run")
return []
launch_deadline_time = None
if rerun_budget_secs:
# If we have a rerun deadline, we want to run the tests as many times as
# possible within that time regardless of whether the tasks pass or fail,
# so we don't care about setting a max number of attempts.
max_attempts = float("inf")
launch_deadline_time = self.m.time.time() + rerun_budget_secs
tasks = []
for task_request in orchestration_inputs.task_requests:
if rerun_budget_secs:
# Rerun tasks should not block other tasks.
task_request = task_request.with_priority(task_request.priority + 1)
tasks.append(
Task(
request=task_request,
api=self.m,
orchestration_inputs=orchestration_inputs,
debug_symbol_url=debug_symbol_url,
retry_task_on_test_failure=retry_task_on_test_failure,
)
)
tasks = self.m.swarming_retry.run_tasks(
tasks=tasks,
collect_output_dir=self.m.path.mkdtemp("swarming"),
max_attempts=max_attempts,
launch_deadline_time=launch_deadline_time,
run_count=runs_per_shard,
)
self.m.swarming_retry.present_tasks(tasks=tasks)
return tasks
def raise_failures(self, tasks):
failed_tasks = []
failed_tests = []
failed_tests_no_tefmo = []
for task in tasks:
if task.failed:
failed_tasks.append(task.name)
# Report the test failures from the last failed attempt.
attempt = task.get_failed_attempts()[-1]
if attempt.test_results:
failed_tests.extend(
t["name"] for t in attempt.test_results.failed_tests
)
failed_tests_no_tefmo.extend(
t["name"]
for t in attempt.test_results.failed_tests
if not t.get("is_testing_failure_mode")
)
# Expose these so tools can be used to re-run these failed tests.
with self.m.step.nest("record failed_test_names") as presentation:
presentation.properties["failed_test_names"] = sorted(
set(failed_tests_no_tefmo)
)
if not failed_tests:
if failed_tasks:
raise self.m.step.StepFailure(
"failed to process test results for %s: %s"
% (
pluralize("task", failed_tasks),
", ".join(sorted(failed_tasks)),
)
)
return
def _test_description(test, failure_count):
count_info = ""
if failure_count > 1:
count_info = " (%d failures)" % failure_count
return "%s%s" % (test, count_info)
failure_counts = collections.Counter(failed_tests)
header = "%s failed:" % pluralize("test", failure_counts)
if len(failure_counts) == 1:
test, failure_count = failure_counts.popitem()
raise self.m.step.StepFailure(
"%s %s" % (header, _test_description(test, failure_count))
)
error_lines = [
header,
# Milo's markdown renderer requires a blank line before the
# start of a bulleted list. Otherwise the list lines will all
# be rendered on one hyphen-separated line.
"",
]
# Present up to 10 tests and show a count of the rest. But if there are
# 11-15 failed tests, still show all of the tests because it's not
# worth saving lines by hiding only a couple tests.
max_tests_to_present = 15
min_tests_to_hide = 5
if len(failure_counts) > max_tests_to_present:
tests_to_present = max_tests_to_present - min_tests_to_hide
else:
tests_to_present = len(failure_counts)
counts_to_present = sorted(failure_counts.items())[:tests_to_present]
for test, failure_count in counts_to_present:
error_lines.append("- %s" % _test_description(test, failure_count))
hidden_tests = len(failure_counts) - tests_to_present
if hidden_tests > 0:
error_lines.append("- (%d more)" % hidden_tests)
raise self.m.step.StepFailure("\n".join(error_lines))