# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import operator
import attr
from recipe_engine import recipe_api
from recipe_engine.config_types import Path
from RECIPE_MODULES.fuchsia.testsharder import api as testsharder_api
# List of available targets.
TARGETS = ['x64', 'arm64']
# Name of BigQuery project and table for uploading artifacts.
BIGQUERY_PROJECT = 'fuchsia-infra'
BIGQUERY_ARTIFACTS_DATASET = 'artifacts'
TEST_SUMMARY_JSON = 'summary.json'
KERNEL_LOG = 'kernel_log.txt'
COVARGS_LOG_LEVEL = 'debug'
COVARGS_OUTPUT_JSON = 'covargs-output.json'
@attr.s
class FuchsiaTestResults(object):
"""Represents the result of testing of a Fuchsia build.
Attributes:
from_fuchsia (bool): Whether the tests ran on Fuchsia.
results_dir (Path): The directory that the test results archive has
been unpacked into.
output_dir (Path): A directory containing the outputs of the swarming
task that ran these tests. Anything that's in this directory will be
uploaded to GCS when upload_results() is called.
outputs (dict[str]str): A mapping from relative paths of files
containing stdout+stderr data to strings containing those contents.
swarming_task_id (str): The ID of the task that ran these tests.
symbolizer_json_output (Path or None): The path to the json trigger
information produced by the symbolizer.
env_name (str): The name of the task that ran these tests.
tests (seq(testsharder.Test)): The tests that this task was instructed
to run (as opposed to the results of the tests that this task *did*
run, which are enumerated in `summary`).
legacy_qemu (bool): Whether these tests were run using QEMU with
runtests (no ssh).
api (RecipeApi): The api to use for accessing recipe modules from this
object.
symbolizer_output (Path or None): The path to the symbolized log file
produced by running these tests.
overwrite_summary (bool): Whether to set the "name" and "gn_label" fields
in the summary.json produced by these tests using the corresponding
values from the input tests.json. Only affects legacy QEMU tests.
(Solely for backwards compatibility with fuchsia_perf.)
"""
from_fuchsia = attr.ib(type=bool)
results_dir = attr.ib(type=Path)
output_dir = attr.ib(type=Path)
outputs = attr.ib(type=dict)
swarming_task_id = attr.ib(type=str)
symbolizer_json_output = attr.ib(type=Path)
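# Note: attrs strips the leading underscore when generating __init__ keyword
# arguments, so e.g. `_env_name` is set via `env_name=`.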
_env_name = attr.ib(type=str)
_tests = attr.ib(type=testsharder_api.Test)
_legacy_qemu = attr.ib(type=bool)
_api = attr.ib(type=recipe_api.RecipeApi)
_symbolizer_output = attr.ib(None, type=Path)
# TODO(fxb/10410): Get rid of overwrite_summary after fuchsia_perf is dead.
_overwrite_summary = attr.ib(True, type=bool)
# Set lazily by the `summary` property, not a parameter to __init__.
_summary = attr.ib(None, init=False)
# Constants representing the result of running a test. These enumerate the
# values of the 'results' field of the entries in the summary.json file
# obtained from the target device.
_TEST_RESULT_PASS = 'PASS'
_TEST_RESULT_FAIL = 'FAIL'
@property
def summary(self):
"""The parsed summary file as a Dict or {} if missing."""
if self._summary is None:
self._summary = self._parse_summary()
return self._summary
@property
def summary_lines(self):
"""Returns a list of the lines of the summary.json file."""
return self._api.json.dumps(self.summary, indent=2).splitlines()
@property
def passed(self):
"""Whether all the tests passed."""
tests = self.summary.get('tests', [])
return all(test['result'] == self._TEST_RESULT_PASS for test in tests)
@property
def passed_test_outputs(self):
"""All entries in |self.outputs| for tests that passed."""
return self._filter_outputs_by_test_result(self._TEST_RESULT_PASS)
@property
def failed_test_outputs(self):
"""All entries in |self.outputs| for tests that failed."""
return self._filter_outputs_by_test_result(self._TEST_RESULT_FAIL)
def _filter_outputs_by_test_result(self, result):
"""Returns all entries in |self.outputs| whose result is |result|.
Args:
result (String): one of the _TEST_RESULT_* constants from this class.
Returns:
A dict whose keys are paths to the files containing each test's
stderr+stdout data and whose values are strings containing those
contents.
"""
matches = collections.OrderedDict()
# TODO(kjharland): Sort test names first.
for test in self.summary.get('tests', ()):
if test['result'] == result:
# The 'output_file' field is a path to the file containing the
# stderr+stdout data for the test, and we inline the contents of that
# file as the value in the returned dict.
matches[test['name']] = self.outputs[test['output_file']]
return matches
def _parse_summary(self):
raw_summary = self.outputs.get(TEST_SUMMARY_JSON, '')
if not raw_summary:
return {}
try:
summary = self._api.json.loads(raw_summary)
except ValueError as e: # pragma: no cover
# TODO(olivernewman): JSONDecodeError in python >=3.5
raise self._api.step.StepFailure('Invalid %s: %s' %
(TEST_SUMMARY_JSON, e.args[0]))
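# Only legacy QEMU summaries need their "name" and "gn_label" fields
# rewritten; anything else is returned unmodified.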
if not self._overwrite_summary or not self._legacy_qemu:
return summary
# We want all Fuchsia tests to have the package URL in the name field. But
# legacy QEMU tests set "name" to be the test install path (since the test
# list sent to QEMU is a list of paths). So overwrite the "name" field to
# be the package URL instead.
# Also set "gn_label", which doesn't automatically get passed through from
# tests.json.
# TODO(garymm,joshuaseaton): Once deprecated testing code path is removed,
# self._legacy_qemu will always be False and we can remove this logic.
tests_by_path = {test.path: test for test in self._tests}
for summary_test in summary['tests']:
path = summary_test['name']
# Some zircon tests get run even though they don't show up in tests.json.
# TODO(olivernewman): After build unification is complete we can assume
# that every test in summary.json will have a corresponding entry in
# tests.json, so get rid of this check and update every summary test.
if path in tests_by_path:
test = tests_by_path[path]
assert test.package_url
summary_test.update(
name=test.package_url,
gn_label=test.label,
)
return summary
def present_tests(self, show_failures_in_red, show_passed):
for test, stdio in self.failed_test_outputs.iteritems():
step = self._api.step('failed: %s' % test, None)
step.presentation.logs['stdio'] = stdio.split('\n')
if show_failures_in_red:
step.presentation.status = self._api.step.FAILURE
# There's recipe overhead that makes step creation slow, which we mitigate
# by cramming all the passed tests into a single step. We also skip
# presenting stdio since it's generally only useful if the test failed.
with self._api.step.nest('all passed tests') as passed_tests_step:
passed_tests = self.passed_test_outputs
passed_tests_step.presentation.step_summary_text = ('%d passed tests' %
len(passed_tests))
if show_passed:
# Start with a newline to prevent the first test from showing up on
# the same line as the step name.
passed_tests_step.presentation.step_text = ''.join(
'\n' + test_name for test_name in passed_tests)
def upload_results(self, gcs_bucket, upload_to_catapult):
"""Upload select test results (e.g., coverage data) to a given GCS bucket."""
assert gcs_bucket
with self._api.step.nest('upload %s test results' % self._env_name):
if self.summary:
# Save the summary JSON to the test shard output dir so it gets
# uploaded to GCS for easy access by e.g. Dachsiaboard.
summary_path = self.output_dir.join(TEST_SUMMARY_JSON)
assert not self._api.path.exists(summary_path), (
'test output files should not be named %s' % TEST_SUMMARY_JSON)
self._api.file.write_json('write %s' % TEST_SUMMARY_JSON, summary_path,
self.summary)
self._upload_outputs(gcs_bucket)
if upload_to_catapult:
self._api.upload.test_outputs_to_catapult(self.output_dir)
def _upload_outputs(self, gcs_bucket):
self._api.upload.directory_to_gcs(
source=self.output_dir,
bucket=gcs_bucket,
# Namespace to avoid collision across shards and attempts.
subpath='%s/%s' % (self._env_name, self.swarming_task_id),
)
def raise_failures(self):
"""Raises a step failure if there were test failures."""
if not self.summary:
# Halt with step failure if summary file is missing.
raise self._api.step.StepFailure(
'Test summary JSON not found, see symbolized log for details')
failed_tests = self.failed_test_outputs.keys()
if failed_tests:
# Halt with a step failure.
raise self._api.step.StepFailure('Test failure(s): ' +
', '.join(failed_tests))
# Check serial log for failure messages
# TODO(9936): Replace with running binary tool once created.
fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
log_path = self.output_dir.join(self._api.testing_requests.SERIAL_LOG_NAME)
self._api.path.mock_add_paths(log_path)
if self._api.path.exists(log_path):
self._check_log_for_failures(log_path, fail_strings)
def _check_log_for_failures(self, log_path, fail_strings):
"""Checks for fail strings in log and fails accordingly."""
log_name = self._api.path.basename(log_path)
with self._api.step.nest('check log %s:%s' %
(self._env_name, log_name)) as check_log_step:
contents = self._api.file.read_text(
'read %s' % log_name,
log_path,
test_data='extra log contents',
).get_result()
for fail_str in fail_strings:
if fail_str in contents:
check_log_step.presentation.logs[log_name] = contents.splitlines()
raise self._api.step.StepFailure(
'Found failure string in log %s: %s' % (log_name, fail_str))
def create_task(api, *args, **kwargs):
"""Create a Task object.
The Task class's base class lives inside the `api` object, so Task can't
be defined at the top level or otherwise at module load time; instead it
is defined inside this function.
For the full argument list, see Task.__init__ a few lines down.
"""
class Task(api.swarming_retry.TriggeredTask):
def __init__(self, api, name, request, uses_legacy_qemu, targets_fuchsia,
symbolize_tool, llvm_symbolizer, tests,
debug_symbol_gcs_bucket, *args, **kwargs):
super(Task, self).__init__(
api=api, name=name, request=request, *args, **kwargs)
self._uses_legacy_qemu = uses_legacy_qemu
self._targets_fuchsia = targets_fuchsia
self._symbolize_tool = symbolize_tool
self._llvm_symbolizer = llvm_symbolizer
self._tests = tests
self._debug_symbol_gcs_bucket = debug_symbol_gcs_bucket
# Test shards with the 'multiplied:' prefix come from
# tools/integration/testsharder/shard.go in fuchsia.git. They were
# specifically created to run a test or set of tests many times to look
for flakes. It doesn't make sense to retry these when they fail; the
goal is to see whether they fail, not to get them to pass.
if name.startswith('multiplied:'):
self.max_attempts = 1
def process_result(self):
"""Unpacks the results archive produced by a test shard."""
attempt = self.attempts[-1]
assert attempt.result
result = attempt.result
if result.isolated_outputs:
attempt.task_outputs_link = result.isolated_outputs.url
if result.state == self._api.swarming.TaskState.TIMED_OUT:
attempt.failure_reason = 'timed out'
attempt.test_results = None
with self._api.step.nest(result.name):
attempt.symbolizer_output = result.output_dir.join(
self._api.symbolize.LOG)
attempt.symbolizer_json_output = self._api.path['cleanup'].join(
'%s-%d-%s' %
(self.name, attempt.index, self._api.symbolize.OUTPUT_JSON))
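# Including the attempt index in the path keeps retries of the same task
# from overwriting each other's symbolizer JSON output.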
# Figure out what happened to the swarming task.
if result.output:
# Fuchsia crashes have to be symbolized on the host.
if self._targets_fuchsia:
attempt.logs['symbolized log'] = self._api.symbolize(
symbolize_tool=self._symbolize_tool,
debug_symbol_gcs_bucket=self._debug_symbol_gcs_bucket,
llvm_symbolizer=self._llvm_symbolizer,
data=result.output,
symbolizer_output=attempt.symbolizer_output,
json_output=attempt.symbolizer_json_output,
)
# Non-Fuchsia should already be symbolized, and attempting to use
# the symbolizer may fail, if e.g. it was built on Mac and this is
# running on Linux.
else:
attempt.logs['symbolized log'] = result.output
if 'KERNEL PANIC' in result.output:
attempt.failure_reason = 'KERNEL PANIC' # pragma: no cover
self._check_logs_for_failures(attempt)
if result.success:
self._process_outputs(attempt)
def _process_outputs(self, attempt):
"""Reads the test results and output files of a swarming TaskResult.
Sets attempt.test_results if successful.
Args:
attempt (swarming_retry.Attempt): the attempt to process
"""
assert attempt.result
result = attempt.result
# Extract results if the task was not subject to an infra failure;
# otherwise, a step failure will be raised on exiting the
# defer_results() scope.
attempt.test_results_archive = None
for relative_path, absolute_path in sorted(result.outputs.iteritems()):
if relative_path in [
self._api.testing_requests.TEST_RESULTS_ARCHIVE_NAME,
self._api.testing_requests.TEST_RESULTS_MINFS_NAME
]:
attempt.test_results_archive = absolute_path
assert attempt.test_results_archive, (
'test archive not found amongst outputs of task %s' % result.name)
self._parse_test_results(attempt)
attempt.logs[TEST_SUMMARY_JSON] = attempt.test_results.summary_lines
# Delete the archive so it doesn't get uploaded with the other files in
# the swarming task's output directory.
self._api.file.remove(
'remove %s' % self._api.path.basename(attempt.test_results_archive),
attempt.test_results_archive)
def _parse_test_results(self, attempt):
"""Parse test results from attempt into a FuchsiaTestResults object.
Args:
attempt (swarming_retry.Attempt): the attempt to parse
"""
assert attempt.result
result = attempt.result
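# Namespace extracted results by swarming task ID so different shards'
# results don't collide on the host.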
results_dir = self._api.testing.results_dir_on_host.join(result.id)
# pylint: disable=protected-access
test_results_map = self._api.testing._extract_test_results_archive(
step_name='extract',
archive_path=attempt.test_results_archive,
leak_to=results_dir,
is_minfs=self._uses_legacy_qemu,
)
# pylint: enable=protected-access
attempt.test_results = FuchsiaTestResults(
from_fuchsia=self._targets_fuchsia,
results_dir=results_dir,
outputs=test_results_map,
swarming_task_id=attempt.task_id,
symbolizer_json_output=attempt.symbolizer_json_output,
env_name=result.name,
tests=self._tests,
legacy_qemu=self._uses_legacy_qemu,
api=api,
symbolizer_output=attempt.symbolizer_output,
output_dir=result.output_dir,
)
failed_tests = attempt.test_results.failed_test_outputs
if failed_tests:
attempt.failure_reason = '%d test(s) failed' % len(failed_tests)
def _check_logs_for_failures(self, attempt):
"""Check for failure strings in logs.
Args:
attempt (swarming_retry.Attempt): the attempt to check for logs in
"""
# Check serial log for failure messages
# TODO(9936): Replace with running binary tool once created.
fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
log_path = attempt.result.output_dir.join(
self._api.testing_requests.SERIAL_LOG_NAME)
self._api.path.mock_add_paths(log_path)
if self._api.path.exists(log_path):
log_name = self._api.path.basename(log_path)
with self._api.step.nest('check log %s' % log_name) as presentation:
contents = self._api.file.read_text('read', log_path)
for fail_str in fail_strings:
if fail_str in contents:
presentation.logs[log_name] = contents.splitlines()
presentation.status = self._api.step.FAILURE
presentation.step_summary_text = 'found "%s"' % fail_str
attempt.failure_reason = ('found "%s" in %s' %
(fail_str, log_name))
def present_status(self, parent_step, attempt, **kwargs):
"""Present an Attempt while showing progress in launch/collect step.
Args:
parent_step (Step): will always be 'passed tasks' or 'failed tasks'
attempt (Attempt): the Attempt to present
"""
del kwargs, parent_step # Unused.
with api.step.nest('%s (%s)' % (self.name, attempt.name)) as presentation:
self._present(
presentation,
attempt,
show_failures_in_red=False,
show_passed=False)
def present_attempt(self, _, attempt, category=None):
"""Present an Attempt when summarizing results at the end of the run.
Args:
attempt (Attempt): the Attempt to present
category (str): the group of tasks ('passes', 'failures', or
'flakes') that this attempt should be presented under
"""
show_failures_in_red = True
# The 'passes' category includes all attempts of all tasks that
# eventually passed, so it includes some failures. Show those in
# green so people don't get confused and think the overall task
# failed.
# TODO(fxb/36647) after this bug is fixed show these steps in
# red, but show parent steps of those in green.
if category == 'passes':
show_failures_in_red = False
name = '%s (%s)' % (attempt.name, 'pass' if attempt.success else 'fail')
with api.step.nest(name) as presentation:
if show_failures_in_red and not attempt.success:
presentation.status = self._api.step.FAILURE
self._present(
presentation,
attempt,
show_failures_in_red=show_failures_in_red,
show_passed=True,
)
def _present(self, presentation, attempt, show_failures_in_red,
show_passed):
"""Present an Attempt.
We do largely the same thing for both kinds of presentations.
Args:
presentation (StepPresentation): where to present the attempt info
attempt (api.swarming_retry.Attempt): object to present
show_failures_in_red (bool): show failures in red (for final
'flakes' and 'failures' steps) or not (for 'launch/collect'
progress and 'passes' steps)
show_passed (bool): show the names of passed tests (only done for
the end)
Note: the 'passes' step can have failures underneath it because the
first attempt can fail while a later attempt passes.
"""
if attempt.result.duration_secs:
presentation.step_text = nice_time(attempt.result.duration_secs)
presentation.presentation.links['swarming task'] = attempt.task_ui_link
if attempt.task_outputs_link:
presentation.links['task outputs'] = attempt.task_outputs_link
if attempt.failure_reason:
presentation.step_summary_text = attempt.failure_reason
for log, data in attempt.logs.iteritems():
presentation.logs[log] = data
if attempt.test_results:
test_results = attempt.test_results
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for name, path in test_results.summary.get('outputs', {}).iteritems():
presentation.logs[name] = test_results.outputs[path].split('\n')
test_results.present_tests(
show_failures_in_red=show_failures_in_red, show_passed=show_passed)
for log_name in [
self._api.testing_requests.SYSLOG_NAME,
self._api.testing_requests.SERIAL_LOG_NAME
]:
if log_name in attempt.result.outputs:
self._present_output_file(
name=log_name,
path=attempt.result.outputs[log_name],
step=presentation)
def _present_output_file(self, name, path, step):
"""Records file contents to the test results step's presentation."""
contents = self._api.file.read_text(
'read %s' % name,
path,
test_data='extra log contents',
)
step.presentation.logs[name] = contents.splitlines()
return Task(*args, api=api, **kwargs)
class _ShardedTestRunner(object):
"""Handles running and analyzing tests that have been split into shards."""
def __init__(self, api, collect_timeout, debug_symbol_gcs_bucket,
llvm_symbolizer, max_attempts, rerun_budget_secs,
swarming_output_dir, symbolize_tool, shard_requests):
self._api = api
self._collect_timeout = collect_timeout
self._max_attempts = max_attempts
launch_deadline_time = None
self._present = True
if rerun_budget_secs:
assert max_attempts == 1, (
'If rerun_budget_secs is set, max_attempts should be set to 1')
# Presenting results is currently slow due to inefficiencies in
# the recipe engine. For builds that have rerun_budget_secs enabled,
# we only care about consuming the data after it's been uploaded, not
# from the Milo build page, so skip presentation.
self._present = False
launch_deadline_time = self._api.time.time() + rerun_budget_secs
self._swarming_output_dir = swarming_output_dir
self.tasks = []
for shard_request in shard_requests:
uses_legacy_qemu = any(tag.lower() == 'uses_legacy_qemu:true'
for tag in shard_request.task_request.tags)
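# A shard targets Fuchsia unless its first task slice requests a Linux
# or Mac bot.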
targets_fuchsia = shard_request.task_request[0].dimensions.get(
'os', '').lower() not in ('linux', 'mac')
self.tasks.append(
create_task(
api=self._api,
name=shard_request.task_request.name,
request=shard_request.task_request,
symbolize_tool=symbolize_tool,
llvm_symbolizer=llvm_symbolizer,
tests=shard_request.shard.tests,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
uses_legacy_qemu=uses_legacy_qemu,
targets_fuchsia=targets_fuchsia,
launch_deadline_time=launch_deadline_time,
))
def run_tests(self):
"""Runs all test shards and returns a Task object for each."""
# TODO(fxb/35021) use context manager.
self._api.swarming_retry.run_tasks(
tasks=self.tasks,
collect_output_dir=self._swarming_output_dir,
max_attempts=self._max_attempts,
collect_timeout=self._collect_timeout,
)
if self._present:
self._api.swarming_retry.present_tasks(tasks=self.tasks)
return self.tasks
def raise_failures(self):
self._api.swarming_retry.raise_failures(self.tasks)
class FuchsiaTestApi(recipe_api.RecipeApi):
"""API for running tests and processing test results."""
FuchsiaTestResults = FuchsiaTestResults
def __init__(self, *args, **kwargs):
super(FuchsiaTestApi, self).__init__(*args, **kwargs)
self._test_runner = None
def _analyze_test_results(self, test_results, presentation=None):
"""Analyzes test results represented by FuchsiaTestResults objects
Logs individual test results in separate steps.
Args:
test_results (FuchsiaTestResults): Fuchsia test result object
presentation (dict or None): A particular step's presentation on which to log
test result outputs; if not provided, that of the active result will be
used.
"""
if not test_results.summary:
return
presentation = presentation or self.m.step.active_result.presentation
# Log the summary file's contents.
presentation.logs[TEST_SUMMARY_JSON] = test_results.summary_lines
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for output_name, output_path in test_results.summary.get('outputs',
{}).iteritems():
output_str = test_results.outputs[output_path]
presentation.logs[output_name] = output_str.split('\n')
test_results.present_tests(show_failures_in_red=True, show_passed=True)
def process_coverage(self, covargs_path, test_results, ids_txt, llvm_profdata,
llvm_cov, gcs_bucket):
output_dir = self.m.path['cleanup'].join('coverage')
cmd = [
covargs_path,
'-level',
COVARGS_LOG_LEVEL,
'-json-output',
self.m.json.output(name=COVARGS_OUTPUT_JSON),
'-output-dir',
output_dir,
'-llvm-profdata',
llvm_profdata,
'-llvm-cov',
llvm_cov,
'-ids',
ids_txt,
]
for result in test_results:
cmd.extend([
'-summary', result.results_dir.join(TEST_SUMMARY_JSON),
'-symbolize-dump', result.symbolizer_json_output,
]) # yapf: disable
self.m.step('covargs', cmd)
# TODO: move this into gsutil module/deduplicate this with other GCS logic
dst = 'builds/%s/coverage' % self.m.buildbucket_util.id
step_result = self.m.gsutil.rsync(
name='upload coverage',
src=output_dir,
bucket=gcs_bucket,
dst=dst,
recursive=True,
gzip_exts=['html'],
options={
'parallel_process_count': self.m.platform.cpu_count,
'parallel_thread_count': 1,
},
multithreaded=True)
step_result.presentation.links['index.html'] = self.m.gsutil._http_url(
gcs_bucket, self.m.gsutil.join(dst, 'index.html'), True)
@property
def results_dir_on_host(self):
"""The directory on host to which host and target test results will be written.
Target test results will be copied over to this location and host test
results will be written here. Host and target tests should write to
separate subdirectories so as not to collide.
"""
return self.m.path['cleanup'].join('test_results')
def _extract_test_results_archive(self,
step_name,
archive_path,
is_minfs=False,
leak_to=None):
"""Extracts test results from an archive.
Args:
step_name (str): The name of the step.
archive_path (Path): The path to the archive which contains test results.
is_minfs (bool): Whether the archive in question is a minfs image
containing QEMU test results. If false, then the archive is assumed to
be a tar file.
leak_to (Path): Optionally leak the contents of the archive to a
directory.
Returns:
A dict mapping a filepath relative to the root of the archive to the
contents of that file in the archive.
"""
if is_minfs:
return self.m.minfs.copy_image(
step_name=step_name,
image_path=archive_path,
out_dir=leak_to,
).raw_io.output_dir
return self.m.tar.extract(
step_name=step_name,
path=archive_path,
directory=self.m.raw_io.output_dir(leak_to=leak_to),
).raw_io.output_dir
def deprecated_test(self, *args, **kwargs):
"""Tests a Fuchsia build on the specified device with retries.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args (see _launch_collect_process_funcs for other args):
max_attempts (int): The tests will be run repeatedly until either
max_attempts is hit or all tests pass.
Returns:
A `FuchsiaTestResults` object corresponding to the last test attempt.
"""
# Ideally this method's arguments would look like
# (self, *args, max_attempts=0, **kwargs)
# but Python 2 doesn't allow default keyword args after variable-length
# positional *args :(
max_attempts = kwargs.pop('max_attempts', 0)
if not max_attempts:
max_attempts = self.m.swarming_retry.DEFAULT_MAX_ATTEMPTS
launch, collect, process = self._launch_collect_process_funcs(
*args, **kwargs)
test_results = None
final_exception = None
# TODO(olivernewman): status='last' should cause this step to turn green as
# long as the *last* test attempt is green, but this isn't working, at
# least not for led jobs (if the first attempt fails at the second passes,
# the build is marked as a failure). Figure out whether this will be
# resolved by using luci_runner.
with self.m.step.nest('run tests', status='last'):
for i in range(max_attempts):
with self.m.step.nest('attempt %d' % i) as attempt_presentation:
task_result = collect(launch())
try:
test_results = process(
task_result, presentation=attempt_presentation)
except self.m.step.StepFailure as e:
final_exception = e
else:
final_exception = None
if test_results.passed:
attempt_presentation.step_text = 'passed'
break
else:
failed_count = len(test_results.failed_test_outputs)
attempt_presentation.step_text = ('%d test(s) failed' %
failed_count)
if final_exception:
raise final_exception # pylint: disable=raising-bad-type
return test_results
def deprecated_test_async(self, *args, **kwargs):
"""Launches a swarming task to run Fuchsia tests.
Returns:
A function that, when invoked, waits for the tests to complete and
returns a `FuchsiaTestResults` object representing the completed test.
"""
launch, collect, process = self._launch_collect_process_funcs(
*args, **kwargs)
request_metadata = launch()
return lambda: process(collect(request_metadata))
def _launch_collect_process_funcs(
self,
debug_symbol_gcs_bucket,
device_type,
orchestration_inputs,
overwrite_summary=True,
):
"""Returns 3-tuple of functions to launch Fuchsia tests, wait for them to
complete, and process the results.
Args:
debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
device_type (str): Used as swarming device_type dimension.
orchestration_inputs (TestOrchestrationInputs): the bits of data
needed to orchestrate testing.
overwrite_summary (bool): Whether to overwrite the name and label
fields in summary.json based on tests.json. This should *only* be
used by fuchsia_perf; do NOT add any new dependencies on this.
TODO(fxb/10410): remove this entirely after fuchsia_perf is dead.
Returns:
A tuple of functions:
- `launch`, which takes no arguments and launches a swarming task to
run tests against the given build artifacts. Returns a
`TaskRequestMetadata` object.
- `collect`, which takes the `TaskRequestMetadata` object returned by
`launch`. It blocks until the task is complete and returns a swarming
`TaskResult`.
- `process`, which takes that `TaskResult` (and, optionally, a
`StepPresentation` object to add logs to), processes the results, and
returns a `FuchsiaTestResults` object representing the completed tests.
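Example (this mirrors how `deprecated_test_async` uses the closures):
launch, collect, process = self._launch_collect_process_funcs(
debug_symbol_gcs_bucket, device_type, orchestration_inputs)
test_results = process(collect(launch()))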
"""
task = orchestration_inputs.shard_requests[0].task_request
# This directory gets passed into `collect()`, but unfortunately the
# `output_dir` attribute of the `TaskResult` returned by `collect()` is
# a subdirectory of this output dir (to ensure that different tasks'
# outputs do not collide when calling `api.swarming.collect()` with many
# tasks). So we make this variable in-scope for all three functions so that
# `process()` can use it as the output dir for the test results object.
output_dir = self.m.path.mkdtemp('swarming')
def launch():
with self.m.context(infra_steps=True):
return self.m.swarming.trigger(
'trigger 1 task', [task])
def collect(request_metadata):
with self.m.context(infra_steps=True):
results = self.m.swarming.collect(
'collect', tasks=request_metadata, output_dir=output_dir)
assert len(results) == 1, 'len(%s) != 1' % repr(results)
return results[0]
def process(task_result, presentation=None):
symbolizer_output = output_dir.join(self.m.symbolize.LOG)
symbolizer_json_output = self.m.path['cleanup'].join(
self.m.symbolize.OUTPUT_JSON)
with self.m.step.nest('task results'):
self._analyze_task_result(
result=task_result,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
symbolize_tool=orchestration_inputs.symbolize_tool,
llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
symbolizer_output=symbolizer_output,
symbolizer_json_output=symbolizer_json_output,
presentation=presentation,
)
with self.m.context(infra_steps=True):
# result.outputs contains the file outputs produced by the Swarming
# task, returned via isolate. It's a mapping of the 'name' of the
# output, represented as its relative path within the isolated it
# was returned in, to a Path object pointing to its location on the
# local disk. For each of the above tasks, there should be exactly
# one output.
serial_log_name = self.m.testing_requests.SERIAL_LOG_NAME
if serial_log_name in task_result.outputs:
serial_log = task_result.outputs.pop(serial_log_name)
serial_log_contents = self.m.file.read_text(
'read serial.txt', serial_log, test_data=[])
serial_presentation = (
presentation or self.m.step.active_result.presentation)
serial_presentation.logs[serial_log_name] = (
serial_log_contents.splitlines())
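# After popping the serial log, the only remaining output should be the
# test results archive.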
assert len(task_result.outputs) == 1, 'len(%s) != 1' % repr(
task_result.outputs)
archive_name, archive_path = task_result.outputs.items()[0]
test_results_dir = self.results_dir_on_host.join(
'target', task_result.id)
# _extract_test_results_archive needs minfs_path to be set.
# This is kinda ugly. It'd be better to pass this in as an argument.
self.m.minfs.minfs_path = orchestration_inputs.minfs
test_results_map = self._extract_test_results_archive(
step_name='extract results',
is_minfs=self.m.emu.is_emulator_type(device_type),
archive_path=archive_path,
# Write test results to a subdirectory of |results_dir_on_host|
# so as not to collide with host test results.
leak_to=test_results_dir,
)
# Remove the archive file so it doesn't get uploaded to GCS.
self.m.file.remove('remove %s' % archive_name, archive_path)
test_list = self.m.file.read_json(
'read tests.json', orchestration_inputs.tests_file, test_data=[])
tests = [
self.m.testsharder.Test.from_jsonish(t['test']) for t in test_list
]
with self.m.step.nest('all test results'):
test_results = self.FuchsiaTestResults(
from_fuchsia=True,
results_dir=test_results_dir,
outputs=test_results_map,
swarming_task_id=task_result.id,
symbolizer_json_output=symbolizer_json_output,
env_name=task_result.name,
tests=tests,
legacy_qemu=self.m.emu.is_emulator_type(device_type),
api=self.m,
symbolizer_output=symbolizer_output,
output_dir=output_dir,
overwrite_summary=overwrite_summary,
)
self._analyze_test_results(test_results, presentation=presentation)
return test_results
return launch, collect, process
def final_results(self, tasks):
"""Returns the FuchsiaTestResults from the final attempt of each Task."""
return [
t.attempts[-1].test_results
for t in tasks
if t.attempts[-1].test_results
]
def test_in_shards(self,
collect_timeout_secs,
debug_symbol_gcs_bucket,
max_attempts,
orchestration_inputs,
rerun_budget_secs=None):
"""Tests a Fuchsia build by sharding.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
collect_timeout_secs (int): Amount of time to wait for tasks to complete.
debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
max_attempts (int): Maximum number of attempts before marking a shard
as failed.
orchestration_inputs (TestOrchestrationInputs): Build artifacts needed
for testing.
rerun_budget_secs (int): Will run tests repeatedly until this budget is consumed.
If set, max_attempts is ignored.
Returns:
A list of FuchsiaTestResults objects representing the completed test
tasks that were not subject to an infra failure.
"""
# If no shards have been provided, then we have successfully run the empty
# set of tests.
if not orchestration_inputs.shard_requests:
return []
self.m.minfs.minfs_path = orchestration_inputs.minfs
collect_timeout = None
if collect_timeout_secs:
collect_timeout = '%ds' % collect_timeout_secs
self._test_runner = _ShardedTestRunner(
self.m,
collect_timeout=collect_timeout,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
max_attempts=max_attempts,
rerun_budget_secs=rerun_budget_secs,
swarming_output_dir=self.m.path.mkdtemp('swarming'),
symbolize_tool=orchestration_inputs.symbolize_tool,
shard_requests=orchestration_inputs.shard_requests,
)
return self._test_runner.run_tests()
def raise_failures(self):
if self._test_runner:
self._test_runner.raise_failures()
def _analyze_task_result(
self,
result,
symbolize_tool,
llvm_symbolizer,
debug_symbol_gcs_bucket,
symbolizer_output,
symbolizer_json_output,
presentation=None,
):
"""Analyzes a swarming.TaskResult and reports results as a step.
Args:
result (api.swarming.TaskResult): The swarming task result to analyze.
symbolize_tool (Path): The path to the symbolize tool.
llvm_symbolizer (Path): The path to the llvm_symbolizer tool.
debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
symbolizer_output (Path): A path to a file to write the symbolizer's
stdout.
symbolizer_json_output (Path): A path to a file to write the
symbolizer's JSON output.
presentation (StepPresentation or None): The step presentation to
attach logs to. Defaults to `active_result.presentation`.
Raises:
A StepFailure if a kernel panic is detected or an InfraFailure if the
swarming task failed for a different reason.
"""
presentation = presentation or self.m.step.active_result.presentation
if result.output:
# Always symbolize the result output if present in this case.
presentation.logs['symbolized log'] = self.m.symbolize(
symbolize_tool=symbolize_tool,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
llvm_symbolizer=llvm_symbolizer,
data=result.output,
symbolizer_output=symbolizer_output,
json_output=symbolizer_json_output,
presentation=presentation)
# A kernel panic may be present in the logs even if the task timed out, so
# check for that first.
if 'KERNEL PANIC' in result.output:
presentation.step_text = 'kernel panic'
presentation.status = self.m.step.FAILURE
raise self.m.step.StepFailure(
'Found kernel panic. See symbolized output for details.')
if result.isolated_outputs:
presentation.links['test outputs'] = result.isolated_outputs.url
try:
result.analyze()
except self.m.step.StepFailure:
self._present_task_errors(result, presentation)
raise
def _present_task_errors(self, task_result, presentation):
"""Updates text and status of the given step to reflect test task errors."""
# If the task is in an unknown state or completed, but the executed command returned
# a non-zero exit code, this points to a tooling failure.
if task_result.state is None or task_result.state == self.m.swarming.TaskState.COMPLETED:
text = 'tooling failure' # pragma: no cover
else:
text = (task_result.state.name).replace('_', ' ').lower()
presentation.step_text = text
# Report timeouts as red, not purple, as it is likelier that the task is
# timing out due to a bug in the system under test.
if task_result.state == self.m.swarming.TaskState.TIMED_OUT:
status = self.m.step.FAILURE # pragma: no cover
else:
status = self.m.step.EXCEPTION
presentation.status = status
def product(nums):
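"""Returns the product of the numbers in `nums`."""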
return reduce(operator.mul, nums)
def nice_time(seconds):
"""Returns a human-readable duration given a number of seconds.
For example: 3605 seconds => '1h 5s'
"""
units = [
('d', 24),
('h', 60),
('m', 60),
('s', 1),
]
result_parts = []
divisor = product(unit_size for _, unit_size in units)
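# divisor starts at the number of seconds in a day (24*60*60) and is
# reduced by each unit's size as the loop descends to smaller units.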
for unit_abbrev, unit_size in units:
count, seconds = divmod(seconds, divisor)
if count > 0:
result_parts.append('%d%s' % (count, unit_abbrev))
divisor /= unit_size
return ' '.join(result_parts)