blob: f0c27910c576a6ba87ddf5e9de783b024bf60524 [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import copy
from recipe_engine import recipe_api
# List of available targets.
TARGETS = ['x64', 'arm64']
# The PCI address to use for the block device to contain test results.
TEST_FS_PCI_ADDR = '06.0'
# The path in the BootFS manifest that we want runcmds to show up at.
RUNCMDS_BOOTFS_PATH = 'infra/runcmds'
SECRETSHIM_CIPD_VERSION = 'git_revision:63ab3ac613fceb52ac49b63b43fce841a2585645'
# Name of BigQuery project and table for uploading artifacts.
BIGQUERY_PROJECT = 'fuchsia-infra'
BIGQUERY_ARTIFACTS_DATASET = 'artifacts'
# Image and manifest names produced by the build.
IMAGES_JSON = 'images.json'
STORAGE_FULL = 'storage-full'
TEST_RESULTS_ARCHIVE_NAME = 'out.tar'
TEST_RESULTS_MINFS_NAME = 'output.fs'
SERIAL_LOG_NAME = 'serial.txt'
SYSLOG_NAME = 'syslog.txt'
TEST_SUMMARY_JSON = 'summary.json'
KERNEL_LOG = 'kernel_log.txt'
COVARGS_LOG_LEVEL = 'debug'
COVARGS_OUTPUT_JSON = 'covargs-output.json'
# The log level to use for botanist invocations in test tasks. Can be one of
# "fatal", "error", "warning", "info", "debug", or "trace", where "trace" is
# the most verbose, and fatal is the least.
BOTANIST_LOG_LEVEL = 'debug'
# The path to the botanist config for devices preprovisioned into containers.
BOTANIST_DEVICE_CONFIG = '/etc/botanist/config.json'
# System path at which authorized SSH keys are stored.
AUTHORIZED_KEY_PATH = 'data/ssh/authorized_keys'
class FuchsiaTestResults(object):
"""Represents the result of testing of a Fuchsia build."""
# Constants representing the result of running a test. These enumerate the
# values of the 'results' field of the entries in the summary.json file
# obtained from the target device.
_TEST_RESULT_PASS = 'PASS'
_TEST_RESULT_FAIL = 'FAIL'
def __init__(self,
env_name,
from_fuchsia,
results_dir,
zircon_kernel_log,
outputs,
api,
output_dir=None,
symbolizer_output=None):
self._env_name = env_name
self._from_fuchsia = from_fuchsia
self._results_dir = results_dir
self._zircon_kernel_log = zircon_kernel_log
self._outputs = outputs
self._api = api
self._output_dir = output_dir
self._symbolizer_output = symbolizer_output
# Default to empty values if the summary file is missing.
if TEST_SUMMARY_JSON in outputs:
# Cache the summary file if present.
self._raw_summary = outputs[TEST_SUMMARY_JSON]
# TODO(kjharland): Raise explicit step failure when parsing fails so
# that it's clear that the summary file is malformed.
self._summary = api.json.loads(outputs[TEST_SUMMARY_JSON])
else:
self._raw_summary = ''
self._summary = {}
@property
def results_dir(self):
"""A path to the directory that contains test results."""
return self._results_dir
@property
def output_dir(self):
"""A path to the directory containing files produced by the tests.
If it's set, anything that's in this directory will be uploaded to GCS when
upload_results() is called.
"""
return self._output_dir
@property
def outputs(self):
"""A dict that maps test output file paths to their contents.
The output paths are relative paths to files containing stdout+stderr data
and the values are strings containing those contents.
"""
return self._outputs
@property
def raw_summary(self):
"""The raw contents of the JSON summary file or "" if missing."""
return self._raw_summary
@property
def summary(self):
"""The parsed summary file as a Dict or {} if missing."""
return self._summary
@property
def passed_test_outputs(self):
"""All entries in |self.outputs| for tests that passed."""
return self._filter_outputs_by_test_result(self._TEST_RESULT_PASS)
@property
def failed_test_outputs(self):
"""All entries in |self.outputs| for tests that failed."""
return self._filter_outputs_by_test_result(self._TEST_RESULT_FAIL)
@property
def env_name(self):
"""The display name of the environment for these tests"""
return self._env_name
@property
def from_fuchsia(self):
"""Whether the results came from Fuchsia target."""
return self._from_fuchsia
def _filter_outputs_by_test_result(self, result):
"""Returns all entries in |self.outputs| whose result is |result|.
Args:
result (String): one of the _TEST_RESULT_* constants from this class.
Returns:
A dict whose keys are paths to the files containing each test's
stderr+stdout data and whose values are strings containing those
contents.
"""
matches = collections.OrderedDict()
# TODO(kjharland): Sort test names first.
for test in self.summary.get('tests', ()):
if test['result'] == result:
# The 'output_file' field is a path to the file containing the
# stderr+stdout data for the test, and we inline the contents of that
# file as the value in the returned dict.
matches[test['name']] = self.outputs[test['output_file']]
return matches
def upload_results(self, gcs_bucket, upload_to_catapult):
"""Upload select test results (e.g., coverage data) to a given GCS bucket."""
assert gcs_bucket
with self._api.step.nest('upload %s test results' % self._env_name):
if self._api.experimental.upload_test_outputs:
self._api.artifacts.store_test_outputs(
gcs_bucket=gcs_bucket,
test_env=self.env_name,
test_outputs=self.summary.get('tests', ()),
)
if self.output_dir:
if self._raw_summary:
# Save the summary JSON to the test shard output dir so it gets
# uploaded to GCS for easy access by e.g. Dachsiaboard.
summary_path = self.output_dir.join(TEST_SUMMARY_JSON)
assert not self._api.path.exists(summary_path), (
'test output files should not be named %s' % TEST_SUMMARY_JSON)
self._api.file.write_text('write %s' % TEST_SUMMARY_JSON,
summary_path, self._raw_summary)
# Also upload symbolized logs and kernel output.
if self._zircon_kernel_log:
kernel_log_path = self.output_dir.join(KERNEL_LOG)
self._api.file.write_text('write %s' % KERNEL_LOG, kernel_log_path,
self._zircon_kernel_log)
self._upload_outputs(gcs_bucket)
if upload_to_catapult:
self._api.upload.test_outputs_to_catapult(self._output_dir)
def _upload_outputs(self, gcs_bucket):
self._api.upload.directory_to_gcs(
source=self.output_dir,
bucket=gcs_bucket,
# Namespace with the test environment name to avoid collision
# of artifacts across shards.
subpath=self._env_name,
)
def raise_failures(self):
"""Raises a step failure if there were test failures."""
if not self._summary:
# Halt with step failure if summary file is missing.
raise self._api.step.StepFailure(
'Test summary JSON not found, see kernel log for details')
failed_tests = self.failed_test_outputs.keys()
if failed_tests:
# Halt with a step failure.
raise self._api.step.StepFailure('Test failure(s): ' +
', '.join(failed_tests))
# TODO(9936): Replace with running binary tool once created.
if self._output_dir:
# Check serial log for failure messages
fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
log_path = self.output_dir.join(SERIAL_LOG_NAME)
self._api.path.mock_add_paths(log_path)
if self._api.path.exists(log_path):
self._check_log_for_failures(log_path, fail_strings)
def _check_log_for_failures(self, log_path, fail_strings):
"""Checks for fail strings in log and fails accordingly."""
log_name = self._api.path.basename(log_path)
with self._api.step.nest('check log %s:%s' %
(self._env_name, log_name)) as check_log_step:
contents = self._api.file.read_text(
'read %s' % log_name,
log_path,
test_data='extra log contents',
).get_result()
for fail_str in fail_strings:
if fail_str in contents:
check_log_step.presentation.logs[log_name] = contents.splitlines()
raise self._api.step.StepFailure(
'Found failure string in log %s: %s' % (log_name, fail_str))
def create_task(api, *args, **kwargs):
"""Create a Task object.
The base class of the class is inside the api object, so it can't be
top-level or otherwise defined at module load time. Defining it in this
function as an alternative.
For full args list see Task.__init__ a few lines down.
"""
class Task(api.swarming_retry.TriggeredTask):
def __init__(self, api, name, request, uses_legacy_qemu, targets_fuchsia,
original_build_artifacts, *args, **kwargs):
super(Task, self).__init__(
api=api, name=name, request=request, *args, **kwargs)
self._uses_legacy_qemu = uses_legacy_qemu
self._targets_fuchsia = targets_fuchsia
self._original_build_artifacts = original_build_artifacts
# Test shards with the 'multiplied:' prefix come from
# tools/integration/testsharder/shard.go in fuchsia.git. They were
# specifically created to run a test or set of tests many times to look
# for flakes. It doesn't make sense to retry these when they fail--the
# goal is to see if they fail not to get them to pass.
if name.startswith('multiplied:'):
self.max_attempts = 1
def process_result(self):
"""Unpacks the results archive produced by a test shard."""
attempt = self.attempts[-1]
assert attempt.result
result = attempt.result
if result.isolated_outputs:
attempt.task_outputs_link = result.isolated_outputs.url
if result.state == self._api.swarming.TaskState.TIMED_OUT:
attempt.failure_reason = 'timed out'
attempt.test_results = None
with self._api.step.nest(result.name):
attempt.symbolizer_output = result.output_dir.join(
self._api.symbolize.LOG)
# Figure out what happened to the swarming task.
if result.output:
# Always symbolize the result output if present in this case.
with self._api.context(
cwd=self._original_build_artifacts.fuchsia_build_dir):
attempt.logs['symbolized log'] = self._api.symbolize(
symbolize_tool=self._original_build_artifacts.symbolize_tool,
ids_txt=self._original_build_artifacts.ids,
llvm_symbolizer=self._original_build_artifacts.llvm_symbolizer,
data=result.output,
symbolizer_output=attempt.symbolizer_output,
) # yapf:disable
attempt.logs['kernel log'] = result.output.split('\n')
if 'KERNEL PANIC' in result.output:
attempt.failure_reason = 'KERNEL PANIC' # pragma: no cover
self._check_logs_for_failures(attempt)
if result.success:
self._process_outputs(attempt)
def _process_outputs(self, attempt):
"""Reads the test results and output files of a swarming TaskResult.
Sets attempt.test_results if successful.
Args:
attempt (swarming_retry.Attempt): the attempt to process
"""
assert attempt.result
result = attempt.result
# Extract results if the task was not subject to an infra failure;
# otherwise, a step failure will be raised on exiting the
# defer_results() scope.
attempt.test_results_archive = None
for relative_path, absolute_path in sorted(result.outputs.iteritems()):
if relative_path in [
TEST_RESULTS_ARCHIVE_NAME, TEST_RESULTS_MINFS_NAME
]:
attempt.test_results_archive = absolute_path
assert attempt.test_results_archive, (
'test archive not found amongst outputs of task %s' % result.name)
self._parse_test_results(attempt)
attempt.logs[TEST_SUMMARY_JSON] = (
attempt.test_results.raw_summary.split('\n'))
# Delete the archive so it doesn't get uploaded with the other files in
# the swarming task's output directory.
self._api.file.remove(
'remove %s' % self._api.path.basename(attempt.test_results_archive),
attempt.test_results_archive)
def _parse_test_results(self, attempt):
"""Parse test results from attempt into a FuchsiaTestResults object.
Args:
attempt (swarming_retry.Attempt): the attempt to parse
"""
assert attempt.result
result = attempt.result
results_dir = self._api.testing.results_dir_on_host.join(result.id)
# pylint: disable=protected-access
test_results_map = self._api.testing._extract_test_results_archive(
step_name='extract',
archive_path=attempt.test_results_archive,
leak_to=results_dir,
is_minfs=self._uses_legacy_qemu,
)
# pylint: enable=protected-access
attempt.test_results = FuchsiaTestResults(
env_name=result.name,
from_fuchsia=self._targets_fuchsia,
results_dir=results_dir,
zircon_kernel_log=result.output,
outputs=test_results_map,
api=api,
symbolizer_output=attempt.symbolizer_output,
output_dir=result.output_dir,
)
failed_tests = attempt.test_results.failed_test_outputs
if failed_tests:
attempt.failure_reason = '%d test(s) failed' % len(failed_tests)
def _check_logs_for_failures(self, attempt):
"""Check for failure strings in logs.
Args:
attempt (swarming_retry.Attempt): the attempt to check for logs in
"""
# TODO(9936): Replace with running binary tool once created.
if attempt.result.output_dir:
# Check serial log for failure messages
fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
log_path = attempt.result.output_dir.join(SERIAL_LOG_NAME)
self._api.path.mock_add_paths(log_path)
if self._api.path.exists(log_path):
log_name = self._api.path.basename(log_path)
with self._api.step.nest('check log %s' % log_name) as presentation:
contents = self._api.file.read_text('read', log_path)
for fail_str in fail_strings:
if fail_str in contents:
presentation.logs[log_name] = contents.splitlines()
presentation.status = self._api.step.FAILURE
presentation.step_summary_text = 'found "%s"' % fail_str
attempt.failure_reason = ('found "%s" in %s' %
(fail_str, log_name))
def present_status(self, parent_step, attempt, **kwargs):
"""Present an Attempt while showing progress in launch/collect step.
Args:
parent_step (Step): will always be 'passed tasks' or 'failed tasks'
attempt (Attempt): the Attempt to present
"""
del kwargs, parent_step # Unused.
with api.step.nest('%s (%s)' % (self.name, attempt.name)) as step:
self._present(
step, attempt, show_failures_in_red=False, show_passed=False)
def present_attempt(self, task_step, attempt, category=None):
"""Present an Attempt when summarizing results at the end of the run.
Args:
task_step (Step): assuming present() was not overridden, this will
always be a step titled after the current task
attempt (Attempt): the Attempt to present
"""
del task_step # Unused.
show_failures_in_red = True
# The 'passes' category includes all attempts of all tasks that
# eventually passed, so it includes some failures. Show those in
# green so people don't get confused and think the overall task
# failed.
# TODO(fxb/36647) after this bug is fixed show these steps in
# red, but show parent steps of those in green.
if category == 'passes':
show_failures_in_red = False
with api.step.nest(
'%s (%s)' %
(attempt.name, 'pass' if attempt.success else 'fail')) as step:
if show_failures_in_red and not attempt.success:
step.status = self._api.step.FAILURE
self._present(
step,
attempt,
show_failures_in_red=show_failures_in_red,
show_passed=True,
)
def _present(self, step, attempt, show_failures_in_red, show_passed):
"""Present an Attempt.
Choosing to do largely the same thing for both kinds of presentations.
Args:
step (Step): parent step
attempt (api.swarming_retry.Attempt): object to present
show_failures_in_red (bool): show failures in red (for final
'flakes' and 'failures' steps) or not (for 'launch/collect'
progress and 'passes' steps)
show_passed (bool): show the names of passed tests (only done for
the end)
Note: the 'passes' step can have failures underneath it because the
first attempt can fail but the retry passed.
"""
step.presentation.links['swarming task'] = attempt.task_ui_link
if attempt.task_outputs_link:
step.presentation.links['task outputs'] = attempt.task_outputs_link
if attempt.failure_reason:
step.presentation.step_summary_text = attempt.failure_reason
for log, data in attempt.logs.iteritems():
step.presentation.logs[log] = data
if attempt.test_results:
test_results = attempt.test_results
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for name, path in test_results.summary.get('outputs', {}).iteritems():
step.presentation.logs[name] = test_results.outputs[path].split('\n')
for test, output in test_results.failed_test_outputs.iteritems():
self._report_test_result(
test,
output,
passed=False,
show_failures_in_red=show_failures_in_red,
)
with self._api.step.nest('all passed tests') as passed_tests_step:
passed_tests = test_results.passed_test_outputs
passed_tests_step.presentation.step_summary_text = (
'%d passed tests' % len(passed_tests))
if show_passed:
# Start with a newline to prevent the first test from showing up on
# the same line as the step name.
passed_tests_step.presentation.step_text = ''.join(
'\n' + test_name for test_name in passed_tests)
for log_name in [SYSLOG_NAME, SERIAL_LOG_NAME]:
if log_name in attempt.result.outputs:
self._present_output_file(
name=log_name, path=attempt.result.outputs[log_name], step=step)
def _report_test_result(self,
test,
output,
passed,
show_failures_in_red=True):
if not passed:
test = 'failed: %s' % test
step_result = self._api.step(test, None)
if not passed:
step_result.presentation.logs['stdio'] = output.split('\n')
if show_failures_in_red:
step_result.presentation.status = self._api.step.FAILURE
def _present_output_file(self, name, path, step):
"""Records file contents to the test results step's presentation."""
contents = self._api.file.read_text(
'read %s' % name,
path,
test_data='extra log contents',
)
step.presentation.logs[name] = contents.splitlines()
return Task(*args, api=api, **kwargs)
class _ShardedTestRunner(object):
"""Handles running and analyzing tests that have been split into shards."""
def __init__(self, api, shards, build_artifacts, pool, swarming_output_dir,
timeout_secs, swarming_expiration_timeout_secs,
swarming_io_timeout_secs, max_attempts, collect_timeout):
self._api = api
self._pool = pool
self._original_build_artifacts = build_artifacts
self._shards = shards
self._swarming_output_dir = swarming_output_dir
self._timeout_secs = timeout_secs
self._swarming_expiration_timeout_secs = swarming_expiration_timeout_secs
self._swarming_io_timeout_secs = swarming_io_timeout_secs
self._max_attempts = max_attempts
self._collect_timeout = collect_timeout
self._uses_legacy_qemu = {
shard.name: (not self._api.experimental.ssh_into_qemu
and shard.device_type == 'QEMU')
for shard in self._shards
} # yapf: disable
self._targets_fuchsia = {
shard.name: shard.targets_fuchsia for shard in self._shards
}
self.tasks = []
def run_tests(self):
"""Runs all test shards and outputs FuchsiaTestResults object for each."""
self._generate_swarming_tasks()
# TODO(fxb/35021) use context manager.
self._api.swarming_retry.run_tasks(
tasks=self.tasks,
collect_output_dir=self._swarming_output_dir,
max_attempts=self._max_attempts,
collect_timeout=self._collect_timeout,
)
self._api.swarming_retry.present_tasks(tasks=self.tasks)
test_results = [
x.attempts[-1].test_results
for x in self.tasks
if x.attempts[-1].test_results
]
return test_results
def raise_failures(self):
self._api.swarming_retry.raise_failures(self.tasks)
def _generate_swarming_tasks(self):
self.tasks = []
for shard in self._shards:
with self._api.step.nest('shard %s' % shard.name):
# Copy the build_artifacts object to be modified for each shard.
build_artifacts = copy.deepcopy(self._original_build_artifacts)
self._install_test_files(shard, build_artifacts)
self.tasks.append(
create_task(
api=self._api,
name=shard.name,
request=self._construct_task_request(shard, build_artifacts),
original_build_artifacts=self._original_build_artifacts,
uses_legacy_qemu=self._uses_legacy_qemu[shard.name],
targets_fuchsia=self._targets_fuchsia[shard.name],
))
def _install_test_files(self, shard, build_artifacts):
test_list_path = self._create_test_list(shard)
runtests_file_bootfs_path = 'infra/shard.run'
runcmds_path = self._api.path['cleanup'].join('runcmds-%s' % shard.name)
self._api.testing._create_runcmds_script(
device_type=shard.device_type,
test_cmds=[
'runtests -o %s -f /boot/%s' % (
self._api.testing.results_dir_on_target,
runtests_file_bootfs_path,
)
],
output_path=runcmds_path,
)
# Create new zbi image for shard and update the associated image path.
if 'zircon-a' in build_artifacts.images and shard.device_type == 'QEMU':
shard_zbi_filename = 'fuchsia-%s.zbi' % shard.name
shard_zbi_path = build_artifacts.fuchsia_build_dir.join(
shard_zbi_filename)
self._api.zbi.copy_and_extend(
step_name='create zbi',
# TODO(IN-655): Add support for using the netboot image in non-paving
# cases.
input_image=build_artifacts.fuchsia_build_dir.join(
build_artifacts.images['zircon-a']['path']),
output_image=shard_zbi_path,
manifest={
RUNCMDS_BOOTFS_PATH: runcmds_path,
runtests_file_bootfs_path: test_list_path,
},
)
build_artifacts.images['zircon-a']['path'] = shard_zbi_filename
def _create_test_list(self, shard):
test_locations = []
for test in shard.tests:
test_locations.append(test.install_path)
test_list_path = self._api.path['cleanup'].join('tests-%s' % shard.name)
self._api.file.write_text(
name='write test list',
dest=test_list_path,
text_data='\n'.join(test_locations) + '\n',
)
return test_list_path
def _construct_task_request(self, shard, build_artifacts):
if self._uses_legacy_qemu[shard.name]:
return self._api.testing._construct_legacy_qemu_task_request(
task_name=shard.name,
pool=self._pool,
build_artifacts=build_artifacts,
timeout_secs=self._timeout_secs,
swarming_io_timeout_secs=self._swarming_io_timeout_secs,
swarming_expiration_timeout_secs=(
self._swarming_expiration_timeout_secs),
# TODO(IN-654): Add support for secret_bytes.
secret_bytes='',
)
else:
return self._construct_test_task_request(
build_artifacts=build_artifacts, shard=shard)
def _construct_test_task_request(self, build_artifacts, shard):
"""Constructs a Swarming task request to run a shard of Fuchsia tests.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
shard (api.testsharder.Shard): A shard of tests.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
Returns:
An api.swarming.TaskRequest representing the swarming task request.
"""
# To freely archive files from the build directory, the source, and those we
# dynamically create, we create a tree of symlinks in a fresh directory and
# isolate that. This solves the problems of (a) finding a root directory
# that works for all artifacts, (b) being able to create files in that
# directory without fear of collision, and (c) not having to isolate
# extraneous files.
isolate_tree = self._api.file.symlink_tree(
root=self._api.path.mkdtemp('isolate'))
test_manifest = 'tests.json'
self._api.file.write_json(
'write test manifest',
isolate_tree.root.join(test_manifest),
[test.render_to_jsonish() for test in shard.tests],
indent=2)
cmd = []
outputs = []
ensure_file = self._api.cipd.EnsureFile()
dimensions = {'pool': 'fuchsia.tests'}
test_bot_cpu = 'x64'
# This command spins up a metadata server that allows its subcommands to
# automagically authenticate with LUCI auth, provided the sub-exec'ed tool
# was written in go or dart and respectively makes use of the standard
# cloud.google.com/go/compute/metadata or
# github.com/dart-lang/googleapis_auth authentication libraries. Such
# libraries look for a metadata server under environment variables
# like $GCE_METADATA_HOST, which LUCI emulates.
if shard.service_account:
# TODO(fxbug.dev/37142): Find a way to use the version that LUCI is
# currently using, instead of 'latest'.
ensure_file.add_package('infra/tools/luci-auth/${platform}', 'latest')
cmd.extend(['./luci-auth', 'context', '--'])
if shard.device_type == 'QEMU':
dimensions.update(os='Debian', cpu=build_artifacts.target, kvm='1')
# To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
test_bot_cpu = build_artifacts.target
else:
dimensions.update(shard.dimensions)
if shard.targets_fuchsia:
botanist_cmd = [
'./botanist',
'-level', BOTANIST_LOG_LEVEL,
'run',
'-images', IMAGES_JSON,
'-timeout', '%ds' % self._timeout_secs,
'-syslog', SYSLOG_NAME,
'-serial-log', SERIAL_LOG_NAME,
] #yapf: disable
outputs.append(SYSLOG_NAME)
outputs.append(SERIAL_LOG_NAME)
config = BOTANIST_DEVICE_CONFIG
if shard.device_type == 'QEMU':
config = './qemu.json'
self._api.zbi.zbi_path = build_artifacts.zbi
self._api.zbi.copy_and_extend(
'embed authorized key',
input_image=build_artifacts.fuchsia_build_dir.join(
build_artifacts.images['zircon-a']['path']),
output_image=build_artifacts.fuchsia_build_dir.join(
build_artifacts.images['zircon-a']['path']),
manifest={
AUTHORIZED_KEY_PATH: build_artifacts.authorized_key,
},
)
botanist_cmd.extend(
['-ssh', build_artifacts.DEFAULT_ISOLATED_LAYOUT.private_key])
qemu_config = [{
'type': 'qemu',
'path': './qemu/bin',
'target': build_artifacts.target,
'cpu': 8,
'memory': 8192,
'kvm': True,
}]
self._api.file.write_json(
'write qemu config',
isolate_tree.root.join('qemu.json'),
qemu_config,
indent=2)
ensure_file.add_package(
'fuchsia/third_party/qemu/${platform}',
self._api.qemu.version,
subdir='qemu')
elif shard.netboot:
botanist_cmd.append('-netboot')
botanist_cmd.extend(['-config', config])
cmd.extend(botanist_cmd)
cmd.extend([
'./testrunner',
'-archive', TEST_RESULTS_ARCHIVE_NAME,
test_manifest,
]) #yapf: disable
outputs.append(TEST_RESULTS_ARCHIVE_NAME)
isolated_hash = self._api.testing._isolate_build_artifacts(
isolate_tree, build_artifacts, shard=shard, test_bot_cpu=test_bot_cpu)
request = (self._api.swarming.task_request().
with_name(shard.name).
with_service_account(shard.service_account)
) #yapf: disable
return request.with_slice(0, request[0].
with_command(cmd).
with_isolated(isolated_hash).
with_dimensions(**dimensions).
with_expiration_secs(self._swarming_expiration_timeout_secs).
with_io_timeout_secs(self._swarming_io_timeout_secs).
with_execution_timeout_secs(self._timeout_secs).
with_outputs(outputs).
with_cipd_ensure_file(ensure_file).
with_env_vars(**self._test_task_env_vars(shard, build_artifacts))
) #yapf: disable
def _test_task_env_vars(self, shard, build_artifacts):
"""Returns the environment variables to be set for the test task.
Returns:
A dict mapping string env var names to string values.
"""
build = self._api.buildbucket.build
commit = build.input.gitiles_commit
env_vars = dict(
# `${ISOLATED_OUTDIR}` is a magic string that Swarming will replace
# with a temporary directory into which files will be automatically
# collected upon exit of a task.
FUCHSIA_TEST_OUTDIR='${ISOLATED_OUTDIR}',
BUILDBUCKET_ID=str(build.id),
BUILD_BOARD=build_artifacts.board,
BUILD_TYPE=build_artifacts.build_type,
BUILD_PRODUCT=build_artifacts.product,
BUILD_TARGET=build_artifacts.target,
BUILDBUCKET_BUCKET=build.builder.bucket,
# Used by the catapult converter
BUILD_CREATE_TIME=str(build.create_time.seconds),
BUILDER_NAME=self._api.buildbucket.builder_name,
FUCHSIA_DEVICE_TYPE=shard.device_type,
INPUT_COMMIT_HOST=commit.host,
INPUT_COMMIT_PROJECT=commit.project,
INPUT_COMMIT_REF=commit.ref,
)
# For some reason, empty string environment variables sent to the swarming
# API get interpreted as null and rejected. So don't bother sending them to
# avoid breaking the task request.
# TODO(olivernewman): Figure out whether this logic should be moved into
# the upstream swarming module (or obviated by fixing the "" -> null
# behavior).
return {k: v for k, v in env_vars.iteritems() if v}
class FuchsiaTestApi(recipe_api.RecipeApi):
"""An abstraction over how Jiri checkouts are created during Fuchsia CI/CQ builds."""
FuchsiaTestResults = FuchsiaTestResults
def test_from_spec(self, test_spec, build_artifacts):
"""Runs all tests against the build artifacts according to the input spec.
Args:
test_spec (fuchsia_pb2.Fuchsia.Test): The input spec test properties.
build_artifacts (api.build.BuildArtifacts): The build artifacts to test
against.
Returns:
A list of FuchsiaTestResults objects representing the completed test
tasks that were not subject to an infra failure.
"""
if test_spec.test_in_shards:
all_results = self._test_in_shards(
pool=test_spec.pool,
build_artifacts=build_artifacts,
shards=build_artifacts.shards,
timeout_secs=test_spec.timeout_secs,
swarming_io_timeout_secs=test_spec.swarming_io_timeout_secs,
swarming_expiration_timeout_secs=(
test_spec.swarming_expiration_timeout_secs),
max_attempts=test_spec.max_attempts,
collect_timeout_secs=test_spec.collect_timeout_secs,
)
else:
all_results = [
self.deprecated_test(
build_artifacts=build_artifacts,
pool=test_spec.pool,
timeout_secs=test_spec.timeout_secs,
swarming_io_timeout_secs=test_spec.swarming_io_timeout_secs,
swarming_expiration_timeout_secs=(
test_spec.swarming_expiration_timeout_secs),
pave=test_spec.pave,
test_cmds=[
'runtests -o %s %s' % (
self.results_dir_on_target,
test_spec.runtests_args,
),
],
device_type=test_spec.device_type,
requires_secrets=test_spec.requires_secrets,
)
]
return all_results
def analyze_test_results(self, test_results, presentation=None):
"""Analyzes test results represented by FuchsiaTestResults objects
Logs individual test results in separate steps.
Args:
test_results (FuchsiaTestResults): Fuchsia test result object
presentation (dict|None): A particular step's presentation on which to log
test result outputs; if not provided, that of the active result will be
used.
"""
if not test_results.summary:
return
presentation = presentation or self.m.step.active_result.presentation
# Log the summary file's contents.
raw_summary_log = test_results.raw_summary.split('\n')
presentation.logs[TEST_SUMMARY_JSON] = raw_summary_log
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid unicode (See IN-656).
for output_name, output_path in test_results.summary.get('outputs',
{}).iteritems():
output_str = test_results.outputs[output_path]
presentation.logs[output_name] = output_str.split('\n')
for test, output in test_results.failed_test_outputs.iteritems():
self._report_test_result(test, output, passed=False)
with self.m.step.nest('all passed tests'):
for test, output in test_results.passed_test_outputs.iteritems():
self._report_test_result(test, output, passed=True)
def _report_test_result(self, test, output, passed):
step_result = self.m.step(test, None)
step_result.presentation.logs['stdio'] = output.split('\n')
if not passed:
step_result.presentation.status = self.m.step.FAILURE
def process_coverage(self, covargs_path, test_results, ids_txt, llvm_profdata,
llvm_cov, gcs_bucket):
output_dir = self.m.path['cleanup'].join('coverage')
cmd = [
covargs_path,
'-level',
COVARGS_LOG_LEVEL,
'-json-output',
self.m.json.output(name=COVARGS_OUTPUT_JSON),
'-output-dir',
output_dir,
'-llvm-profdata',
llvm_profdata,
'-llvm-cov',
llvm_cov,
'-ids',
ids_txt,
]
for result in test_results:
cmd.extend(['-summary', result.results_dir.join(TEST_SUMMARY_JSON)])
self.m.step('covargs', cmd)
# TODO: move this into gsutil module/deduplicate this with other GCS logic
dst = 'builds/%s/coverage' % self.m.buildbucket.build_id
step_result = self.m.gsutil.rsync(
name='upload coverage',
src=output_dir,
bucket=gcs_bucket,
dst=dst,
recursive=True,
gzip_exts=['html'],
options={
'parallel_process_count': self.m.platform.cpu_count,
'parallel_thread_count': 1,
},
multithreaded=True)
step_result.presentation.links['index.html'] = self.m.gsutil._http_url(
gcs_bucket, self.m.gsutil.join(dst, 'index.html'), True)
def _isolate_build_artifacts(self,
isolate_tree,
build_artifacts,
shard=None,
test_bot_cpu='x64'):
"""Populates a tree with build artifacts and isolates it.
Specifically, the following is linked into or created within the tree:
- The images in the build are linked in and manifest of them is created
in the root, if targeting a fuchsia device;
- The Linux/Mac tests in the shard and their runtime dependencies.
Args:
isolate_tree (api.file.SymlinkTree): A tree into which artifacts may be
linked.
build (FuchsiaBuildResults): The result of a fuchsia build.
shard (api.testsharder.Shard|None): A test shard.
test_bot_cpu (str|None): The host cpu of the bot running the test task.
Returns:
The isolated hash that may be used to reference and download the
artifacts.
"""
def register_link(relpath):
"""Prepares a symlink of a relative path within the build directory to the tree."""
isolate_tree.register_link(
target=build_artifacts.fuchsia_build_dir.join(relpath),
linkname=isolate_tree.root.join(relpath),
)
# TODO(IN-931): Remove `shard is None` condition once device and QEMU
# codepaths are passing shard and using _construct_test_task_request().
if shard is None or shard.targets_fuchsia:
image_list = build_artifacts.images.values()
image_manifest_path = isolate_tree.root.join(IMAGES_JSON)
self.m.file.write_json(
'write image manifest', image_manifest_path, image_list, indent=2)
for image in image_list:
register_link(image['path'])
if shard:
for test in shard.tests:
if test.os in ['linux', 'mac']:
register_link(test.install_path)
for dep in shard.deps:
register_link(dep)
# If targeting QEMU we include a private key corresponding to an authorized
# key already in the boot image; this is needed as we do not pave QEMU.
if shard and shard.device_type == 'QEMU':
isolate_tree.register_link(
target=build_artifacts.private_key,
linkname=isolate_tree.root.join(
build_artifacts.DEFAULT_ISOLATED_LAYOUT.private_key,),
)
for tool in [
build_artifacts.botanist(test_bot_cpu),
build_artifacts.testrunner(test_bot_cpu)
]:
tool_name = self.m.path.basename(tool)
isolate_tree.register_link(
target=tool, linkname=isolate_tree.root.join(tool_name))
isolate_tree.create_links('create tree of build artifacts')
isolated = self.m.isolated.isolated(isolate_tree.root)
isolated.add_dir(isolate_tree.root)
return isolated.archive('isolate build artifacts')
@property
def results_dir_on_target(self):
"""The directory on target to which target test results will be written."""
return '/tmp/infra-test-output'
@property
def results_dir_on_host(self):
"""The directory on host to which host and target test results will be written.
Target test results will be copied over to this location and host test
results will be written here. Host and target tests on should write to
separate subdirectories so as not to collide.
"""
return self.m.path['cleanup'].join('test_results')
def _create_runcmds_script(self, device_type, test_cmds, output_path):
"""Creates a script for running tests on boot."""
# The device topological path is the toplogical path to the block device
# which will contain test output.
device_topological_path = '/dev/sys/pci/00:%s/virtio-block/block' % (
TEST_FS_PCI_ADDR)
# Script that mounts the block device to contain test output and runs tests,
# dropping test output into the block device.
results_dir = self.results_dir_on_target
runcmds = [
'mkdir %s' % results_dir,
]
if device_type == 'QEMU':
runcmds.extend([
# Wait until the MinFS test image shows up (max <timeout> ms).
'waitfor class=block topo=%s timeout=60000' % device_topological_path,
'mount %s %s' % (device_topological_path, results_dir),
] + test_cmds + [
'umount %s' % results_dir,
'dm poweroff',
])
else:
runcmds.extend(test_cmds)
runcmds_bytes = []
for line in runcmds:
if isinstance(line, unicode):
runcmds_bytes.append(line.encode('utf-8'))
elif isinstance(line, str):
runcmds_bytes.append(line)
else: # pragma: no cover
assert False, 'line is not unicode or a str: %s, %s' % (line,
type(line))
self.m.file.write_text('write runcmds', output_path,
'\n'.join(runcmds_bytes))
def _construct_legacy_qemu_task_request(self, task_name, build_artifacts,
pool, timeout_secs,
swarming_expiration_timeout_secs,
swarming_io_timeout_secs,
secret_bytes):
"""Constructs a Swarming task request which runs Fuchsia tests inside QEMU.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
pool (str): Swarming pool from which the test task will be drawn.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
secret_bytes (str): secret bytes to pass to the QEMU task.
Returns:
An api.swarming.TaskRequest representing the swarming task request.
"""
# To freely archive files from the build directory, the source, and those we
# dynamically create, we create a tree of symlinks in a fresh directory and
# isolate that. This solves the problems of (a) finding a root directory
# that works for all artifacts, (b) being able to create files in that
# directory without fear of collision, and (c) not having to isolate
# extraneous files.
isolate_tree = self.m.file.symlink_tree(root=self.m.path.mkdtemp('isolate'))
# As part of running tests, we'll send a MinFS image over to another machine
# which will be declared as a block device in QEMU, at which point
# Fuchsia will mount it and write test output to. We choose 4G for the
# MinFS image arbitrarily, as it appears it can hold our test output
# comfortably without going overboard on size.
#
minfs_image_path = isolate_tree.root.join(TEST_RESULTS_MINFS_NAME)
self.m.minfs.create(minfs_image_path, '4G', name='create test image')
ensure_file = self.m.cipd.EnsureFile()
botanist_cmd = [
'./botanist',
'-level', BOTANIST_LOG_LEVEL,
'qemu',
'-qemu-dir', './qemu/bin',
'-images', IMAGES_JSON,
'-arch', build_artifacts.target,
'-minfs', TEST_RESULTS_MINFS_NAME,
'-pci-addr', TEST_FS_PCI_ADDR,
'-use-kvm'
] # yapf: disable
if secret_bytes:
# Wrap botanist command with secretshim which starts the secrets server
# before running the following command.
botanist_cmd = ['./secretshim'] + botanist_cmd
ensure_file.add_package('fuchsia/infra/secretshim/${platform}',
SECRETSHIM_CIPD_VERSION)
if [v for v in ['asan', 'profile'] if v in build_artifacts.variants]:
botanist_cmd.extend([
'-cpu',
str(8),
'-memory',
str(8192),
])
# storage-full not being present signifies the exclusion of the system
# partition, which means `boot` (i.e. running on boot) must be used instead
# of `system` (i.e., running after the system partition is mounted).
storage_free_build = STORAGE_FULL not in build_artifacts.images
arg_key = 'zircon.autorun.%s' % ('boot' if storage_free_build else 'system')
botanist_cmd.append('%s=/boot/bin/sh+/boot/%s' %
(arg_key, RUNCMDS_BOOTFS_PATH))
# To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
isolated_hash = self._isolate_build_artifacts(
isolate_tree, build_artifacts, test_bot_cpu=build_artifacts.target)
ensure_file.add_package(
'fuchsia/third_party/qemu/${platform}',
self.m.qemu.version,
subdir='qemu')
request = self.m.swarming.task_request().with_name(task_name)
return (request.with_slice(0, request[0].
with_command(botanist_cmd).
with_isolated(isolated_hash).
with_dimensions(pool=pool, os='Debian', cpu=build_artifacts.target, kvm='1').
with_io_timeout_secs(swarming_io_timeout_secs).
with_execution_timeout_secs(timeout_secs).
with_expiration_secs(swarming_expiration_timeout_secs).
with_secret_bytes(secret_bytes).
with_outputs([TEST_RESULTS_MINFS_NAME]).
with_cipd_ensure_file(ensure_file)
)) #yapf: disable
def _construct_device_task_request(self,
task_name,
device_type,
build_artifacts,
pool,
pave,
timeout_secs,
swarming_expiration_timeout_secs,
swarming_io_timeout_secs,
extra_dimensions=None):
"""Constructs a Swarming task request to run Fuchsia tests on a device.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
pool (str): Swarming pool from which the test task will be drawn.
pave (bool): Whether or not the build artifacts should be paved.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
extra_dimensions (dict): Additional dimensions for swarming allocation
Returns:
An api.swarming.TaskRequest representing the swarming task request.
"""
# TODO(BLD-253): images in doc string should point to the schema once there is one.
# TODO(fxb/22980): Remove "extra_dimensions" once garnet-perf doesn't need
# it anymore.
extra_dimensions = extra_dimensions or {}
# Construct the botanist command.
botanist_cmd = [
'./botanist',
'-level', BOTANIST_LOG_LEVEL,
'zedboot',
'-config', BOTANIST_DEVICE_CONFIG,
'-images', IMAGES_JSON,
'-results-dir', self.results_dir_on_target,
'-out', TEST_RESULTS_ARCHIVE_NAME,
] # yapf: disable
if not pave:
botanist_cmd.append('-netboot')
# storage-full not being present signifies the exclusion of the system
# partition, which means `boot` (i.e. running on boot) must be used instead
# of `system` (i.e., running after the system partition is mounted).
storage_free_build = STORAGE_FULL not in build_artifacts.images
arg_key = 'zircon.autorun.%s' % ('boot' if storage_free_build else 'system')
botanist_cmd.append('%s=/boot/bin/sh+/boot/%s' %
(arg_key, RUNCMDS_BOOTFS_PATH))
# To freely archive files from the build directory, the source, and those we
# dynamically create, we create a tree of symlinks in a fresh directory and
# isolate that. This solves the problems of (a) finding a root directory
# that works for all artifacts, (b) being able to create files in that
# directory without fear of collision, and (c) not having to isolate
# extraneous files.
isolate_tree = self.m.file.symlink_tree(root=self.m.path.mkdtemp('isolate'))
isolated_hash = self._isolate_build_artifacts(isolate_tree, build_artifacts)
dimensions = {
'pool': pool,
'device_type': device_type,
}
dimensions.update(extra_dimensions)
request = self.m.swarming.task_request().with_name(task_name)
return (request.with_slice(0, request[0].
with_command(botanist_cmd).
with_isolated(isolated_hash).
with_dimensions(**dimensions).
with_expiration_secs(swarming_expiration_timeout_secs).
with_io_timeout_secs(swarming_io_timeout_secs).
with_execution_timeout_secs(timeout_secs).
with_outputs([TEST_RESULTS_ARCHIVE_NAME])
)) #yapf: disable
def _extract_test_results_archive(self,
step_name,
archive_path,
is_minfs=False,
leak_to=None):
"""Extracts test results from an archive.
Args:
step_name (str): The name of the step.
archive_path (Path): The path to the archive which contains test results.
is_minfs (bool): Whether the archive in question is a minfs image
containing QEMU test results. If false, then the archive is assumed to
be a tar file.
leak_to (Path): Optionally leak the contents of the archive to a
directory.
Returns:
A dict mapping a filepath relative to the root of the archive to the
contents of that file in the archive.
"""
if is_minfs:
return self.m.minfs.copy_image(
step_name=step_name,
image_path=archive_path,
out_dir=leak_to,
).raw_io.output_dir
self.m.tar.ensure()
return self.m.tar.extract(
step_name=step_name,
path=archive_path,
directory=self.m.raw_io.output_dir(leak_to=leak_to),
).raw_io.output_dir
def _decrypt_secrets(self, build_artifacts):
"""Decrypts the secrets included in the build.
Args:
build (FuchsiaBuildResults): The build for which secret specs were
generated.
Returns:
The dictionary that maps secret spec name to the corresponding plaintext.
"""
self.m.cloudkms.ensure()
secret_spec_dir = build_artifacts.secret_specs
secrets_map = {}
with self.m.step.nest('process secret specs'):
secret_spec_files = self.m.file.listdir('list', secret_spec_dir)
for secret_spec_file in secret_spec_files:
basename = self.m.path.basename(secret_spec_file)
# Skip the 'ciphertext' subdirectory.
if basename == 'ciphertext':
continue
secret_name, _ = basename.split('.json', 1)
secret_spec = self.m.json.read('read spec for %s' % secret_name,
secret_spec_file).json.output
# For each test spec file <name>.json in this directory, there is an
# associated ciphertext file at ciphertext/<name>.ciphertext.
ciphertext_file = secret_spec_dir.join('ciphertext',
'%s.ciphertext' % secret_name)
key_path = secret_spec['cloudkms_key_path']
secrets_map[secret_name] = self.m.cloudkms.decrypt(
'decrypt secret for %s' % secret_name, key_path, ciphertext_file,
self.m.raw_io.output()).raw_io.output
return secrets_map
def deprecated_test_async(self,
build_artifacts,
pool,
test_cmds,
device_type,
timeout_secs=40 * 60,
swarming_expiration_timeout_secs=18000,
swarming_io_timeout_secs=5 * 60,
pave=True,
requires_secrets=False,
extra_dimensions=None):
"""Tests a Fuchsia build on the specified device.
This can be done in two parts: It kicks off a swarming task and returns
a function that, when called, waits for the swarming task to complete
and returns the results from the task.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
pool (str): Swarming pool from which the test task will be drawn.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
requires_secrets (bool): Whether tests require plaintext secrets; ignored
if device_type != 'QEMU'.
pave (bool): Whether to pave the image to disk. Ignored if device_type ==
'QEMU'.
extra_dimensions (dict): Additional dimensions for swarming allocation
Returns:
A function that, when invoked, waits for the tests to complete and
returns a FuchsiaTestResults object representing the completed test.
"""
assert test_cmds
assert device_type
# TODO(fxb/22980): Remove "extra_dimensions" once garnet-perf doesn't need
# it anymore.
extra_dimensions = extra_dimensions or {}
self.m.minfs.minfs_path = build_artifacts.minfs
self.m.zbi.zbi_path = build_artifacts.zbi
runcmds_path = self.m.path['cleanup'].join('runcmds')
self._create_runcmds_script(device_type, test_cmds, runcmds_path)
# Copy build_artifacts because we modify its contents below.
build_artifacts = copy.deepcopy(build_artifacts)
# Inject the runcmds script into the bootfs image.
if device_type == 'QEMU':
zbi_name = 'zircon-a'
else:
zbi_name = next(
(image['name']
for image in build_artifacts.images.values()
if '--boot' in image.get(
'bootserver_%s' % ('netboot' if not pave else 'pave'), [])),
None)
assert zbi_name, 'Could not find kernel image to boot.'
new_zbi_filename = 'test-infra.zbi'
new_zbi_path = build_artifacts.fuchsia_build_dir.join(new_zbi_filename)
self.m.zbi.copy_and_extend(
step_name='create test zbi',
input_image=build_artifacts.fuchsia_build_dir.join(
build_artifacts.images[zbi_name]['path']),
output_image=new_zbi_path,
manifest={RUNCMDS_BOOTFS_PATH: runcmds_path},
)
build_artifacts.images[zbi_name]['path'] = new_zbi_filename
if device_type == 'QEMU':
secret_bytes = ''
if requires_secrets:
secret_bytes = self.m.json.dumps(self._decrypt_secrets(build_artifacts))
task = self._construct_legacy_qemu_task_request(
task_name='all tests',
build_artifacts=build_artifacts,
pool=pool,
timeout_secs=timeout_secs,
swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
swarming_io_timeout_secs=swarming_io_timeout_secs,
secret_bytes=secret_bytes,
)
else:
task = self._construct_device_task_request(
task_name='all tests',
device_type=device_type,
build_artifacts=build_artifacts,
pool=pool,
pave=pave,
timeout_secs=timeout_secs,
swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
swarming_io_timeout_secs=swarming_io_timeout_secs,
extra_dimensions=extra_dimensions,
)
with self.m.context(infra_steps=True):
request_metadata = self.m.swarming.trigger(
'trigger 1 task', [task], cancel_extra_tasks=True)
def collect_and_process_func():
with self.m.context(infra_steps=True):
output_dir = self.m.path.mkdtemp('swarming')
results = self.m.swarming.collect(
'collect', tasks=request_metadata, output_dir=output_dir)
assert len(results) == 1, 'len(%s) != 1' % repr(results)
result = results[0]
symbolizer_output = output_dir.join(self.m.symbolize.LOG)
with self.m.step.nest('task results') as task_result_step:
self.analyze_task_result(
result=result,
ids_txt=build_artifacts.ids,
fuchsia_build_dir=build_artifacts.fuchsia_build_dir,
symbolize_tool=build_artifacts.symbolize_tool,
llvm_symbolizer=build_artifacts.llvm_symbolizer,
symbolizer_output=symbolizer_output,
parent_step=task_result_step,
)
with self.m.context(infra_steps=True):
# result.outputs contains the file outputs produced by the Swarming
# task, returned via isolate. It's a mapping of the 'name' of the
# output, represented as its relative path within the isolated it
# was returned in, to a Path object pointing to its location on the
# local disk. For each of the above tasks, there should be exactly
# one output.
assert len(result.outputs) == 1, 'len(%s) != 1' % repr(result.outputs)
archive_name, archive_path = result.outputs.items()[0]
test_results_dir = self.results_dir_on_host.join('target', result.id)
test_results_map = self._extract_test_results_archive(
step_name='extract results',
is_minfs=device_type == 'QEMU',
archive_path=archive_path,
# Write test results to a subdirectory of |results_dir_on_host|
# so as not to collide with host test results.
leak_to=test_results_dir,
)
# Remove the archive file so it doesn't get uploaded to GCS.
self.m.file.remove('remove %s' % archive_name, archive_path)
with self.m.step.nest('all test results'):
test_results = self.FuchsiaTestResults(
env_name=result.name,
from_fuchsia=True,
results_dir=test_results_dir,
zircon_kernel_log=result.output,
outputs=test_results_map,
api=self.m,
symbolizer_output=symbolizer_output,
output_dir=output_dir,
)
self.analyze_test_results(test_results)
return test_results
return collect_and_process_func
def deprecated_test(self, *args, **kwargs):
return self.deprecated_test_async(*args, **kwargs)()
def _test_in_shards(
self,
pool,
build_artifacts,
shards,
swarming_expiration_timeout_secs,
swarming_io_timeout_secs,
max_attempts,
collect_timeout_secs,
timeout_secs=40 * 60,
):
"""Tests a Fuchsia build by sharding.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
pool (str): The Swarming pool to schedule test tasks in.
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
shards (List): A list of Shards, each representing one test shard.
max_attempts (int): Maximum number of attempts before marking a shard
as failed.
collect_timeout_secs (int): Amount of time to wait for tasks to complete.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
Returns:
A list of FuchsiaTestResults objects representing the completed test
tasks that were not subject to an infra failure.
"""
# If no shards have been provided, then we have successfully run the empty
# set of tests.
if not shards:
return []
collect_timeout = None
if collect_timeout_secs:
collect_timeout = '%ds' % collect_timeout_secs
self.m.minfs.minfs_path = build_artifacts.minfs
self.m.zbi.zbi_path = build_artifacts.zbi
self._test_runner = _ShardedTestRunner(
self.m,
shards,
build_artifacts,
pool=pool,
swarming_output_dir=self.m.path.mkdtemp('swarming'),
timeout_secs=timeout_secs,
swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
swarming_io_timeout_secs=swarming_io_timeout_secs,
max_attempts=max_attempts,
collect_timeout=collect_timeout,
)
return self._test_runner.run_tests()
def raise_failures(self):
test_runner = getattr(self, '_test_runner', None)
if test_runner:
test_runner.raise_failures()
def analyze_task_result(self,
result,
symbolize_tool,
llvm_symbolizer,
build_id_dirs=(),
ids_txt=None,
fuchsia_build_dir=None,
symbolizer_output=None,
parent_step=None):
"""Analyzes a swarming.TaskResult and reports results as a step.
Args:
task_result (api.swarming.TaskResult): The swarming task result to
analyze.
symbolize_tool (Path): The path to the symbolize tool.
llvm_symbolizer (Path): The path to the llvm_symbolizer tool.
build_id_dirs (seq(Path)): A list of build-id directories.
ids_txt (Path|None): The path to the ids.txt file, which maps ELF
build-ids, to file paths.
fuchsia_build_dir (Path|None): The path to run symbolizer in, which
matches relative pathing in ids.txt.
symbolizer_output (Path|None): A path to a file to write the symbolizer's
stdout.
parent_step (StepData|None): The step to attach logs to. Defaults to
`active_result`.
Raises:
A StepFailure if a kernel panic is detected or an InfraFailure if the
swarming task failed for a different reason.
"""
parent_step = parent_step or self.m.step.active_result
if result.output:
# Always symbolize the result output if present in this case.
with self.m.context(cwd=fuchsia_build_dir):
self.m.symbolize(
symbolize_tool=symbolize_tool,
ids_txt=ids_txt,
build_id_dirs=build_id_dirs,
llvm_symbolizer=llvm_symbolizer,
data=result.output,
symbolizer_output=symbolizer_output,
parent_step=parent_step)
kernel_output_lines = result.output.split('\n')
parent_step.presentation.logs['kernel log'] = kernel_output_lines
# A kernel panic may be present in the logs even if the task timed out, so
# check for that first.
if 'KERNEL PANIC' in result.output:
parent_step.presentation.step_text = 'kernel panic'
parent_step.presentation.status = self.m.step.FAILURE
raise self.m.step.StepFailure(
'Found kernel panic. See symbolized output for details.')
if result.isolated_outputs:
parent_step.presentation.links[
'test_outputs'] = result.isolated_outputs.url
try:
result.analyze()
except:
self._present_task_errors(result, parent_step)
raise
def _present_task_errors(self, task_result, step):
"""Updates the current step to reflect test task errors"""
# If the task is in an unknown state or completed, but the executed command returned
# a non-zero exit code, this points to a tooling failure.
if task_result.state is None or task_result.state == self.m.swarming.TaskState.COMPLETED:
text = 'tooling failure' # pragma: no cover
else:
text = (task_result.state.name).replace('_', ' ').lower()
step.presentation.step_text = text
# Report timeouts as red, not purple, as it is likelier that the task is
# timing out due to a bug in the system under test.
if task_result.state == self.m.swarming.TaskState.TIMED_OUT:
status = self.m.step.FAILURE # pragma: no cover
else:
status = self.m.step.EXCEPTION
step.presentation.status = status