# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import copy
import attr
from recipe_engine import recipe_api
from recipe_engine.config_types import Path
from RECIPE_MODULES.fuchsia.testsharder import api as testsharder_api
# List of available targets.
TARGETS = ['x64', 'arm64']
# The PCI address to use for the block device to contain test results.
TEST_FS_PCI_ADDR = '06.0'
# The path in the BootFS manifest that we want runcmds to show up at.
RUNCMDS_BOOTFS_PATH = 'infra/runcmds'
SECRETSHIM_CIPD_VERSION = 'git_revision:63ab3ac613fceb52ac49b63b43fce841a2585645'
# Name of BigQuery project and table for uploading artifacts.
BIGQUERY_PROJECT = 'fuchsia-infra'
BIGQUERY_ARTIFACTS_DATASET = 'artifacts'
# Image and manifest names produced by the build.
IMAGES_JSON = 'images.json'
STORAGE_FULL = 'storage-full'
TEST_RESULTS_ARCHIVE_NAME = 'out.tar'
TEST_RESULTS_MINFS_NAME = 'output.fs'
SERIAL_LOG_NAME = 'serial.txt'
SYSLOG_NAME = 'syslog.txt'
TEST_SUMMARY_JSON = 'summary.json'
KERNEL_LOG = 'kernel_log.txt'
COVARGS_LOG_LEVEL = 'debug'
COVARGS_OUTPUT_JSON = 'covargs-output.json'
# The log level to use for botanist invocations in test tasks. Can be one of
# "fatal", "error", "warning", "info", "debug", or "trace", where "trace" is
# the most verbose and "fatal" is the least.
BOTANIST_LOG_LEVEL = 'debug'
# The path to the botanist config for devices preprovisioned into containers.
BOTANIST_DEVICE_CONFIG = '/etc/botanist/config.json'
# System path at which authorized SSH keys are stored.
AUTHORIZED_KEY_PATH = 'data/ssh/authorized_keys'
@attr.s
class FuchsiaTestResults(object):
"""Represents the result of testing of a Fuchsia build.
Attributes:
from_fuchsia (bool): Whether the tests ran on Fuchsia.
results_dir (Path): The directory that the test results archive has
been unpacked into.
output_dir (Path): A directory containing the outputs of the swarming
task that ran these tests. Anything that's in this directory will be
uploaded to GCS when upload_results() is called.
outputs (dict[str]str): A mapping from relative paths of files containing
stdout+stderr data to strings containing those contents.
env_name (str): The name of the task that ran these tests.
tests (seq(testsharder.Test)): The tests that this task was instructed
to run (as opposed to the results of the tests that this task *did*
run, which are enumerated in `summary`).
legacy_qemu (bool): Whether these tests were run using QEMU with
runtests (no ssh).
api (RecipeApi): The api to use for accessing recipe modules from this
object.
symbolizer_output (Path|None): The path to the symbolized log file
produced by running these tests.
overwrite_summary (bool): Whether to set the "name" and "gn_label" fields
in the summary.json produced by these tests using the corresponding
values from the input tests.json. Only affects legacy QEMU tests.
(Solely for backwards compatibility with fuchsia_perf.)
"""
from_fuchsia = attr.ib(type=bool)
results_dir = attr.ib(type=Path)
output_dir = attr.ib(type=Path)
outputs = attr.ib(type=dict)
_env_name = attr.ib(type=str)
_tests = attr.ib(type=testsharder_api.Test)
_legacy_qemu = attr.ib(type=bool)
_api = attr.ib(type=recipe_api.RecipeApi)
_symbolizer_output = attr.ib(None, type=Path)
# TODO(fxb/10410): Get rid of overwrite_summary after fuchsia_perf is dead.
_overwrite_summary = attr.ib(True, type=bool)
# Set lazily by the `summary` property, not a parameter to __init__.
_summary = attr.ib(None, init=False)
# Constants representing the result of running a test. These enumerate the
# values of the 'results' field of the entries in the summary.json file
# obtained from the target device.
_TEST_RESULT_PASS = 'PASS'
_TEST_RESULT_FAIL = 'FAIL'
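# For reference, the parsed summary.json consumed by this class looks
# roughly like the following (an illustrative sketch inferred from the
# accesses below, not an exhaustive schema):
# {
#   "tests": [
#     {
#       "name": "...",
#       "result": "PASS" or "FAIL",
#       "output_file": "relative/path/to/stdio",
#       "gn_label": "//src/foo:test",
#     },
#   ],
#   "outputs": {"<log name>": "relative/path/in/archive"},
# }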
@property
def summary(self):
"""The parsed summary file as a Dict or {} if missing."""
if self._summary is None:
self._summary = self._parse_summary()
return self._summary
@property
def summary_lines(self):
"""Returns a list of the lines of the summary.json file."""
return self._api.json.dumps(self.summary, indent=2).splitlines()
@property
def passed(self):
"""Whether all the tests passed."""
tests = self.summary.get('tests', [])
return all(test['result'] == self._TEST_RESULT_PASS for test in tests)
@property
def passed_test_outputs(self):
"""All entries in |self.outputs| for tests that passed."""
return self._filter_outputs_by_test_result(self._TEST_RESULT_PASS)
@property
def failed_test_outputs(self):
"""All entries in |self.outputs| for tests that failed."""
return self._filter_outputs_by_test_result(self._TEST_RESULT_FAIL)
def _filter_outputs_by_test_result(self, result):
"""Returns all entries in |self.outputs| whose result is |result|.
Args:
result (String): one of the _TEST_RESULT_* constants from this class.
Returns:
A dict mapping the name of each matching test to a string containing
that test's stdout+stderr contents.
"""
matches = collections.OrderedDict()
# TODO(kjharland): Sort test names first.
for test in self.summary.get('tests', ()):
if test['result'] == result:
# The 'output_file' field is a path to the file containing the
# stderr+stdout data for the test, and we inline the contents of that
# file as the value in the returned dict.
matches[test['name']] = self.outputs[test['output_file']]
return matches
def _parse_summary(self):
raw_summary = self.outputs.get(TEST_SUMMARY_JSON, '')
if not raw_summary:
return {}
try:
summary = self._api.json.loads(raw_summary)
except ValueError as e: # pragma: no cover
# TODO(olivernewman): JSONDecodeError in python >=3.5
raise self._api.step.StepFailure('Invalid %s: %s' %
(TEST_SUMMARY_JSON, e.args[0]))
if not self._overwrite_summary or not self._legacy_qemu:
return summary
# We want all Fuchsia tests to have the package URL in the name field. But
# QEMU tests set "name" to be the test install path (since the test list
# sent to QEMU is a list of paths). So overwrite the "name" field to be the
# package URL instead.
# Also set "gn_label", which doesn't automatically get passed through from
# tests.json.
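# (Illustratively, a QEMU test named "/boot/test/foo" in summary.json would
# have its "name" replaced with the matching test's package URL from
# tests.json, e.g. "fuchsia-pkg://fuchsia.com/foo#meta/foo.cmx".)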
tests_by_path = {test.path: test for test in self._tests}
for summary_test in summary['tests']:
path = summary_test['name']
# Some zircon tests get run even though they don't show up in tests.json.
# TODO(olivernewman): After build unification is complete we can assume
# that every test in summary.json will have a corresponding entry in
# tests.json, so get rid of this check and update every summary test.
if path in tests_by_path:
test = tests_by_path[path]
assert test.package_url
summary_test.update(
name=test.package_url,
gn_label=test.label,
)
return summary
def upload_results(self, gcs_bucket, upload_to_catapult):
"""Upload select test results (e.g., coverage data) to a given GCS bucket."""
assert gcs_bucket
with self._api.step.nest('upload %s test results' %
self._env_name) as presentation:
if self.summary:
# Save the summary JSON to the test shard output dir so it gets
# uploaded to GCS for easy access by e.g. Dachsiaboard.
summary_path = self.output_dir.join(TEST_SUMMARY_JSON)
assert not self._api.path.exists(summary_path), (
'test output files should not be named %s' % TEST_SUMMARY_JSON)
self._api.file.write_json('write %s' % TEST_SUMMARY_JSON, summary_path,
self.summary)
self._upload_outputs(gcs_bucket)
link = 'go/fuchsia-result-store/bid:%s' % self._api.buildbucket_util.id
presentation.links[link] = link.replace('go/', 'https://goto.google.com/')
if upload_to_catapult:
self._api.upload.test_outputs_to_catapult(self.output_dir)
def _upload_outputs(self, gcs_bucket):
self._api.upload.directory_to_gcs(
source=self.output_dir,
bucket=gcs_bucket,
# Namespace with the test environment name to avoid collision
# of artifacts across shards.
subpath=self._env_name,
)
def raise_failures(self):
"""Raises a step failure if there were test failures."""
if not self.summary:
# Halt with step failure if summary file is missing.
raise self._api.step.StepFailure(
'Test summary JSON not found, see symbolized log for details')
failed_tests = self.failed_test_outputs.keys()
if failed_tests:
# Halt with a step failure.
raise self._api.step.StepFailure('Test failure(s): ' +
', '.join(failed_tests))
# Check serial log for failure messages
# TODO(9936): Replace with running binary tool once created.
fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
log_path = self.output_dir.join(SERIAL_LOG_NAME)
self._api.path.mock_add_paths(log_path)
if self._api.path.exists(log_path):
self._check_log_for_failures(log_path, fail_strings)
def _check_log_for_failures(self, log_path, fail_strings):
"""Checks for fail strings in log and fails accordingly."""
log_name = self._api.path.basename(log_path)
with self._api.step.nest('check log %s:%s' %
(self._env_name, log_name)) as check_log_step:
contents = self._api.file.read_text(
'read %s' % log_name,
log_path,
test_data='extra log contents',
).get_result()
for fail_str in fail_strings:
if fail_str in contents:
check_log_step.presentation.logs[log_name] = contents.splitlines()
raise self._api.step.StepFailure(
'Found failure string in log %s: %s' % (log_name, fail_str))
def create_task(api, *args, **kwargs):
"""Create a Task object.
The Task class's base class lives inside the api object, so Task can't be
defined at the top level (i.e., at module load time); instead it is
defined inside this function.
For the full argument list, see Task.__init__ a few lines down.
"""
class Task(api.swarming_retry.TriggeredTask):
def __init__(self, api, name, request, uses_legacy_qemu, targets_fuchsia,
symbolize_tool, llvm_symbolizer, tests,
debug_symbol_gcs_bucket, *args, **kwargs):
super(Task, self).__init__(
api=api, name=name, request=request, *args, **kwargs)
self._uses_legacy_qemu = uses_legacy_qemu
self._targets_fuchsia = targets_fuchsia
self._symbolize_tool = symbolize_tool
self._llvm_symbolizer = llvm_symbolizer
self._tests = tests
self._debug_symbol_gcs_bucket = debug_symbol_gcs_bucket
# Test shards with the 'multiplied:' prefix come from
# tools/integration/testsharder/shard.go in fuchsia.git. They were
# specifically created to run a test or set of tests many times to look
# for flakes. It doesn't make sense to retry these when they fail -- the
# goal is to see whether they fail, not to get them to pass.
if name.startswith('multiplied:'):
self.max_attempts = 1
def process_result(self):
"""Unpacks the results archive produced by a test shard."""
attempt = self.attempts[-1]
assert attempt.result
result = attempt.result
if result.isolated_outputs:
attempt.task_outputs_link = result.isolated_outputs.url
if result.state == self._api.swarming.TaskState.TIMED_OUT:
attempt.failure_reason = 'timed out'
attempt.test_results = None
with self._api.step.nest(result.name):
attempt.symbolizer_output = result.output_dir.join(
self._api.symbolize.LOG)
# Figure out what happened to the swarming task.
if result.output:
# Always symbolize the result output if present in this case.
attempt.logs['symbolized log'] = self._api.symbolize(
symbolize_tool=self._symbolize_tool,
debug_symbol_gcs_bucket=self._debug_symbol_gcs_bucket,
llvm_symbolizer=self._llvm_symbolizer,
data=result.output,
symbolizer_output=attempt.symbolizer_output,
) # yapf:disable
if 'KERNEL PANIC' in result.output:
attempt.failure_reason = 'KERNEL PANIC' # pragma: no cover
self._check_logs_for_failures(attempt)
if result.success:
self._process_outputs(attempt)
def _process_outputs(self, attempt):
"""Reads the test results and output files of a swarming TaskResult.
Sets attempt.test_results if successful.
Args:
attempt (swarming_retry.Attempt): the attempt to process
"""
assert attempt.result
result = attempt.result
# Extract results if the task was not subject to an infra failure;
# otherwise, a step failure will be raised on exiting the
# defer_results() scope.
attempt.test_results_archive = None
for relative_path, absolute_path in sorted(result.outputs.iteritems()):
if relative_path in [
TEST_RESULTS_ARCHIVE_NAME, TEST_RESULTS_MINFS_NAME
]:
attempt.test_results_archive = absolute_path
assert attempt.test_results_archive, (
'test archive not found amongst outputs of task %s' % result.name)
self._parse_test_results(attempt)
attempt.logs[TEST_SUMMARY_JSON] = attempt.test_results.summary_lines
# Delete the archive so it doesn't get uploaded with the other files in
# the swarming task's output directory.
self._api.file.remove(
'remove %s' % self._api.path.basename(attempt.test_results_archive),
attempt.test_results_archive)
def _parse_test_results(self, attempt):
"""Parse test results from attempt into a FuchsiaTestResults object.
Args:
attempt (swarming_retry.Attempt): the attempt to parse
"""
assert attempt.result
result = attempt.result
results_dir = self._api.testing.results_dir_on_host.join(result.id)
# pylint: disable=protected-access
test_results_map = self._api.testing._extract_test_results_archive(
step_name='extract',
archive_path=attempt.test_results_archive,
leak_to=results_dir,
is_minfs=self._uses_legacy_qemu,
)
# pylint: enable=protected-access
attempt.test_results = FuchsiaTestResults(
from_fuchsia=self._targets_fuchsia,
results_dir=results_dir,
outputs=test_results_map,
env_name=result.name,
tests=self._tests,
legacy_qemu=self._uses_legacy_qemu,
api=api,
symbolizer_output=attempt.symbolizer_output,
output_dir=result.output_dir,
)
failed_tests = attempt.test_results.failed_test_outputs
if failed_tests:
attempt.failure_reason = '%d test(s) failed' % len(failed_tests)
def _check_logs_for_failures(self, attempt):
"""Check for failure strings in logs.
Args:
attempt (swarming_retry.Attempt): the attempt to check for logs in
"""
# Check serial log for failure messages
# TODO(9936): Replace with running binary tool once created.
fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
log_path = attempt.result.output_dir.join(SERIAL_LOG_NAME)
self._api.path.mock_add_paths(log_path)
if self._api.path.exists(log_path):
log_name = self._api.path.basename(log_path)
with self._api.step.nest('check log %s' % log_name) as presentation:
contents = self._api.file.read_text('read', log_path)
for fail_str in fail_strings:
if fail_str in contents:
presentation.logs[log_name] = contents.splitlines()
presentation.status = self._api.step.FAILURE
presentation.step_summary_text = 'found "%s"' % fail_str
attempt.failure_reason = ('found "%s" in %s' %
(fail_str, log_name))
def present_status(self, parent_step, attempt, **kwargs):
"""Present an Attempt while showing progress in launch/collect step.
Args:
parent_step (Step): will always be 'passed tasks' or 'failed tasks'
attempt (Attempt): the Attempt to present
"""
del kwargs, parent_step # Unused.
with api.step.nest('%s (%s)' % (self.name, attempt.name)) as step:
self._present(
step, attempt, show_failures_in_red=False, show_passed=False)
def present_attempt(self, task_step, attempt, category=None):
"""Present an Attempt when summarizing results at the end of the run.
Args:
task_step (Step): assuming present() was not overridden, this will
always be a step titled after the current task
attempt (Attempt): the Attempt to present
"""
del task_step # Unused.
show_failures_in_red = True
# The 'passes' category includes all attempts of all tasks that
# eventually passed, so it includes some failures. Show those in
# green so people don't get confused and think the overall task
# failed.
# TODO(fxb/36647) after this bug is fixed show these steps in
# red, but show parent steps of those in green.
if category == 'passes':
show_failures_in_red = False
with api.step.nest(
'%s (%s)' %
(attempt.name, 'pass' if attempt.success else 'fail')) as step:
if show_failures_in_red and not attempt.success:
step.status = self._api.step.FAILURE
self._present(
step,
attempt,
show_failures_in_red=show_failures_in_red,
show_passed=True,
)
def _present(self, step, attempt, show_failures_in_red, show_passed):
"""Present an Attempt.
This does largely the same thing for both kinds of presentations.
Args:
step (Step): parent step
attempt (api.swarming_retry.Attempt): object to present
show_failures_in_red (bool): show failures in red (for final
'flakes' and 'failures' steps) or not (for 'launch/collect'
progress and 'passes' steps)
show_passed (bool): show the names of passed tests (only done for
the end)
Note: the 'passes' step can have failures underneath it because a
first attempt can fail even though a later retry passed.
"""
step.presentation.links['swarming task'] = attempt.task_ui_link
if attempt.task_outputs_link:
step.presentation.links['task outputs'] = attempt.task_outputs_link
if attempt.failure_reason:
step.presentation.step_summary_text = attempt.failure_reason
for log, data in attempt.logs.iteritems():
step.presentation.logs[log] = data
if attempt.test_results:
test_results = attempt.test_results
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for name, path in test_results.summary.get('outputs', {}).iteritems():
step.presentation.logs[name] = test_results.outputs[path].split('\n')
for test, output in test_results.failed_test_outputs.iteritems():
self._report_test_result(
test,
output,
passed=False,
show_failures_in_red=show_failures_in_red,
)
with self._api.step.nest('all passed tests') as passed_tests_step:
passed_tests = test_results.passed_test_outputs
passed_tests_step.presentation.step_summary_text = (
'%d passed tests' % len(passed_tests))
if show_passed:
# Start with a newline to prevent the first test from showing up on
# the same line as the step name.
passed_tests_step.presentation.step_text = ''.join(
'\n' + test_name for test_name in passed_tests)
for log_name in [SYSLOG_NAME, SERIAL_LOG_NAME]:
if log_name in attempt.result.outputs:
self._present_output_file(
name=log_name, path=attempt.result.outputs[log_name], step=step)
def _report_test_result(self,
test,
output,
passed,
show_failures_in_red=True):
if not passed:
test = 'failed: %s' % test
step_result = self._api.step(test, None)
if not passed:
step_result.presentation.logs['stdio'] = output.split('\n')
if show_failures_in_red:
step_result.presentation.status = self._api.step.FAILURE
def _present_output_file(self, name, path, step):
"""Records file contents to the test results step's presentation."""
contents = self._api.file.read_text(
'read %s' % name,
path,
test_data='extra log contents',
)
step.presentation.logs[name] = contents.splitlines()
return Task(*args, api=api, **kwargs)
class _TaskRequester(object):
"""Creates requests for swarming tasks that run tests."""
def __init__(self, api, buildbucket_build, per_test_timeout_secs, pool,
swarming_expiration_timeout_secs, swarming_io_timeout_secs,
timeout_secs, use_runtests):
self._api = api
self._buildbucket_build = buildbucket_build
self._per_test_timeout_secs = per_test_timeout_secs
self._pool = pool
self._swarming_expiration_timeout_secs = swarming_expiration_timeout_secs
self._swarming_io_timeout_secs = swarming_io_timeout_secs
self._timeout_secs = timeout_secs
self._use_runtests = use_runtests
def request(self, shard, build_artifacts):
# Copy the build_artifacts object to be modified for each shard.
build_artifacts = copy.deepcopy(build_artifacts)
if self._api.testing._uses_legacy_qemu(shard):
task_request = self._api.testing._construct_legacy_qemu_task_request(
task_name=shard.name,
pool=self._pool,
build_artifacts=build_artifacts,
timeout_secs=self._timeout_secs,
swarming_io_timeout_secs=self._swarming_io_timeout_secs,
swarming_expiration_timeout_secs=(
self._swarming_expiration_timeout_secs),
# TODO(IN-654): Add support for secret_bytes.
secret_bytes='',
qemu_type=shard.device_type,
shard=shard,
)
else:
task_request = self._construct_test_task_request(
build_artifacts=build_artifacts, shard=shard)
return self._api.build.ShardTaskRequest(shard, task_request)
def _construct_test_task_request(self, build_artifacts, shard):
"""Constructs a Swarming task request to run a shard of Fuchsia tests.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
shard (api.testsharder.Shard): A shard of tests.
Returns:
An api.swarming.TaskRequest representing the swarming task request.
"""
# To freely archive files from the build directory, the source, and those we
# dynamically create, we create a tree of symlinks in a fresh directory and
# isolate that. This solves the problems of (a) finding a root directory
# that works for all artifacts, (b) being able to create files in that
# directory without fear of collision, and (c) not having to isolate
# extraneous files.
isolate_tree = self._api.file.symlink_tree(
root=self._api.path.mkdtemp('isolate'))
test_manifest = 'tests.json'
self._api.file.write_json(
'write test manifest',
isolate_tree.root.join(test_manifest),
[test.render_to_jsonish() for test in shard.tests],
indent=2)
cmd = []
outputs = []
ensure_file = self._api.cipd.EnsureFile()
dimensions = {'pool': self._pool}
test_bot_cpu = 'x64'
is_emu_type = self._api.emu.is_emulator_type(shard.device_type)
# This command spins up a metadata server that allows its subcommands to
# automagically authenticate with LUCI auth, provided the sub-exec'ed tool
# was written in Go or Dart and respectively makes use of the standard
# cloud.google.com/go/compute/metadata or
# github.com/dart-lang/googleapis_auth authentication libraries. Such
# libraries look for a metadata server under environment variables
# like $GCE_METADATA_HOST, which LUCI emulates.
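# When a service account is set, the net effect is to prefix the task
# command, roughly (sketch):
#   ./luci-auth context -- ./botanist ... ./testrunner ...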
if shard.service_account:
# TODO(fxbug.dev/37142): Find a way to use the version that LUCI is
# currently using, instead of 'latest'.
ensure_file.add_package('infra/tools/luci-auth/${platform}', 'latest')
cmd.extend(['./luci-auth', 'context', '--'])
if is_emu_type:
dimensions.update(os='Debian', cpu=build_artifacts.target, kvm='1')
# To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
test_bot_cpu = build_artifacts.target
else:
dimensions.update(shard.dimensions)
if shard.targets_fuchsia:
botanist_cmd = [
'./botanist',
'-level', BOTANIST_LOG_LEVEL,
'run',
'-images', IMAGES_JSON,
'-timeout', '%ds' % self._timeout_secs,
'-syslog', SYSLOG_NAME,
'-serial-log', SERIAL_LOG_NAME,
] # yapf: disable
outputs.append(SYSLOG_NAME)
outputs.append(SERIAL_LOG_NAME)
# TODO(fxbug.dev/40840): Once we can scope the proxy server to an
# individual task, we can make free use of it in the emulator case.
if not is_emu_type:
botanist_cmd.extend([
# For container networking and authentication reasons, we access GCS
# via a proxy server running on the controller.
'-repo', self._api.artifacts.package_repo_url(host='$GCS_PROXY_HOST'),
'-blobs', self._api.artifacts.package_blob_url(host='$GCS_PROXY_HOST'),
]) # yapf: disable
config = BOTANIST_DEVICE_CONFIG
if is_emu_type:
config = './qemu.json'
botanist_cmd.extend(
['-ssh', build_artifacts.DEFAULT_ISOLATED_LAYOUT.private_key])
qemu_config = [{
'type': shard.device_type.lower(),
'path': './%s/bin' % shard.device_type.lower(),
'target': build_artifacts.target,
'cpu': 8,
'memory': 8192,
'kvm': True,
}]
if shard.device_type == 'AEMU':
self._api.emu.add_aemu_to_ensure_file(ensure_file, subdir='aemu/bin')
elif shard.device_type == 'QEMU':
self._api.emu.add_qemu_to_ensure_file(ensure_file, subdir='qemu')
self._api.file.write_json(
'write qemu config',
isolate_tree.root.join('qemu.json'),
qemu_config,
indent=2)
elif shard.netboot:
botanist_cmd.append('-netboot')
botanist_cmd.extend(['-config', config])
cmd.extend(botanist_cmd)
cmd.extend([
'./testrunner',
'-archive',
TEST_RESULTS_ARCHIVE_NAME,
])
if self._use_runtests:
cmd.append('-use-runtests')
if self._per_test_timeout_secs:
cmd.extend(['-per-test-timeout', '%ds' % self._per_test_timeout_secs])
cmd.append(test_manifest)
outputs.append(TEST_RESULTS_ARCHIVE_NAME)
isolated_hash = self._api.testing._isolate_build_artifacts(
isolate_tree, build_artifacts, shard=shard, test_bot_cpu=test_bot_cpu)
env_name = '%s-%s' % (shard.device_type or shard.os, build_artifacts.target)
tags = {'test_environment_name': [env_name]}
request = (self._api.swarming.task_request().
with_name(shard.name).
with_service_account(shard.service_account).
with_tags(tags)
) #yapf: disable
return request.with_slice(0, request[0].
with_command(cmd).
with_isolated(isolated_hash).
with_dimensions(**dimensions).
with_expiration_secs(self._swarming_expiration_timeout_secs).
with_io_timeout_secs(self._swarming_io_timeout_secs).
with_execution_timeout_secs(self._timeout_secs).
with_outputs(outputs).
with_cipd_ensure_file(ensure_file).
with_env_vars(**self._test_task_env_vars(shard, build_artifacts))
) #yapf: disable
def _test_task_env_vars(self, shard, build_artifacts):
"""Returns the environment variables to be set for the test task.
Returns:
A dict mapping string env var names to string values.
"""
build = self._buildbucket_build
commit = build.input.gitiles_commit
llvm_symbolizer = self._api.path.basename(build_artifacts.llvm_symbolizer)
env_vars = dict(
# `${ISOLATED_OUTDIR}` is a magic string that Swarming will replace
# with a temporary directory into which files will be automatically
# collected upon exit of a task.
FUCHSIA_TEST_OUTDIR='${ISOLATED_OUTDIR}',
BUILDBUCKET_ID=str(build.id),
BUILD_BOARD=build_artifacts.board,
BUILD_TYPE=build_artifacts.build_type,
BUILD_PRODUCT=build_artifacts.product,
BUILD_TARGET=build_artifacts.target,
BUILDBUCKET_BUCKET=build.builder.bucket,
# Used for symbolization:
ASAN_SYMBOLIZER_PATH=llvm_symbolizer,
UBSAN_SYMBOLIZER_PATH=llvm_symbolizer,
LSAN_SYMBOLIZER_PATH=llvm_symbolizer,
# Used by the catapult converter
BUILD_CREATE_TIME=str(build.create_time.seconds),
BUILDER_NAME=build.builder.builder,
FUCHSIA_DEVICE_TYPE=shard.device_type,
INPUT_COMMIT_HOST=commit.host,
INPUT_COMMIT_PROJECT=commit.project,
INPUT_COMMIT_REF=commit.ref,
)
# For some reason, empty string environment variables sent to the swarming
# API get interpreted as null and rejected. So don't bother sending them to
# avoid breaking the task request.
# TODO(olivernewman): Figure out whether this logic should be moved into
# the upstream swarming module (or obviated by fixing the "" -> null
# behavior).
return {k: v for k, v in env_vars.iteritems() if v}
class _ShardedTestRunner(object):
"""Handles running and analyzing tests that have been split into shards."""
def __init__(self, api, collect_timeout, debug_symbol_gcs_bucket,
llvm_symbolizer, max_attempts, swarming_output_dir,
symbolize_tool, shard_requests):
self._api = api
self._swarming_output_dir = swarming_output_dir
self._max_attempts = max_attempts
self._collect_timeout = collect_timeout
self.tasks = []
for shard_request in shard_requests:
uses_legacy_qemu = any(tag.lower() == 'uses_legacy_qemu:true'
for tag in shard_request.task_request.tags)
targets_fuchsia = shard_request.task_request[0].dimensions.get(
'os', '').lower() not in ('linux', 'mac')
self.tasks.append(
create_task(
api=self._api,
name=shard_request.task_request.name,
request=shard_request.task_request,
symbolize_tool=symbolize_tool,
llvm_symbolizer=llvm_symbolizer,
tests=shard_request.shard.tests,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
uses_legacy_qemu=uses_legacy_qemu,
targets_fuchsia=targets_fuchsia,
))
def run_tests(self):
"""Runs all test shards and outputs FuchsiaTestResults object for each."""
# TODO(fxb/35021) use context manager.
self._api.swarming_retry.run_tasks(
tasks=self.tasks,
collect_output_dir=self._swarming_output_dir,
max_attempts=self._max_attempts,
collect_timeout=self._collect_timeout,
)
self._api.swarming_retry.present_tasks(tasks=self.tasks)
test_results = [
x.attempts[-1].test_results
for x in self.tasks
if x.attempts[-1].test_results
]
return test_results
def raise_failures(self):
self._api.swarming_retry.raise_failures(self.tasks)
class FuchsiaTestApi(recipe_api.RecipeApi):
"""An abstraction over how Jiri checkouts are created during Fuchsia CI/CQ builds."""
FuchsiaTestResults = FuchsiaTestResults
def __init__(self, *args, **kwargs):
super(FuchsiaTestApi, self).__init__(*args, **kwargs)
self._test_runner = None
def deprecated_shard_requests(self,
build_artifacts,
test_cmds,
device_type,
pool,
timeout_secs,
pave,
requires_secrets=False,
swarming_expiration_timeout_secs=18000,
swarming_io_timeout_secs=5 * 60):
"""Returns a swarming task request for testing in the deprecated way.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
test_cmds (list[str]): Commands for Fuchsia to run on boot.
device_type (str): The type of device on which to test.
pool (str): Swarming pool from which the test task will be drawn.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
pave (bool): Whether to pave the image to disk. Ignored if device_type ==
'QEMU'.
requires_secrets (bool): Whether tests require plaintext secrets; ignored
if device_type != 'QEMU'.
swarming_expiration_timeout_secs (int): Maximum run time for the swarming
task, once scheduled (enforced by swarming).
swarming_io_timeout_secs (int): The swarming task will be killed if it does
not produce any output for this long.
Returns:
A list containing a single ShardTaskRequest.
"""
assert test_cmds
assert device_type
self.m.minfs.minfs_path = build_artifacts.minfs
self.m.zbi.zbi_path = build_artifacts.zbi
# Copy build_artifacts because we modify its contents below.
build_artifacts = copy.deepcopy(build_artifacts)
self._install_runcmds_files(
build_artifacts,
device_type=device_type,
pave=pave,
test_cmds=test_cmds,
)
if self.m.emu.is_emulator_type(device_type):
secret_bytes = ''
if requires_secrets:
secret_bytes = self.m.json.dumps(self._decrypt_secrets(build_artifacts))
task = self._construct_legacy_qemu_task_request(
task_name='all tests',
build_artifacts=build_artifacts,
pool=pool,
timeout_secs=timeout_secs,
swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
swarming_io_timeout_secs=swarming_io_timeout_secs,
secret_bytes=secret_bytes,
qemu_type=device_type,
)
else:
task = self._construct_device_task_request(
task_name='all tests',
device_type=device_type,
build_artifacts=build_artifacts,
pool=pool,
pave=pave,
timeout_secs=timeout_secs,
swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
swarming_io_timeout_secs=swarming_io_timeout_secs,
)
# In the deprecated testing code paths, shards are not used, but it makes
# other code simpler to have a valid shard here.
dummy_shard = self.m.testsharder.Shard('dummy', (), {})
return [self.m.build.ShardTaskRequest(dummy_shard, task)]
def deprecated_test_cmds(self, spec):
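# Builds a command of the form (sketch):
#   runtests -o /tmp/infra-test-output [-i <per_test_timeout_secs>] <runtests_args>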
runtests_cmd_parts = ['runtests', '-o', self.results_dir_on_target]
if spec.test.per_test_timeout_secs:
runtests_cmd_parts.extend(['-i', '%d' % spec.test.per_test_timeout_secs])
runtests_cmd_parts.append(spec.test.runtests_args)
return [' '.join(runtests_cmd_parts)]
def _analyze_test_results(self, test_results, presentation=None):
"""Analyzes test results represented by FuchsiaTestResults objects
Logs individual test results in separate steps.
Args:
test_results (FuchsiaTestResults): Fuchsia test result object
presentation (dict|None): A particular step's presentation on which to log
test result outputs; if not provided, that of the active result will be
used.
"""
if not test_results.summary:
return
presentation = presentation or self.m.step.active_result.presentation
# Log the summary file's contents.
presentation.logs[TEST_SUMMARY_JSON] = test_results.summary_lines
# Log the contents of each output file mentioned in the summary.
# Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
for output_name, output_path in test_results.summary.get('outputs',
{}).iteritems():
output_str = test_results.outputs[output_path]
presentation.logs[output_name] = output_str.split('\n')
for test, output in test_results.failed_test_outputs.iteritems():
self._report_test_result(test, output, passed=False)
with self.m.step.nest('all passed tests'):
for test, output in test_results.passed_test_outputs.iteritems():
self._report_test_result(test, output, passed=True)
def _report_test_result(self, test, output, passed):
name = test
if not passed:
# FlakeFetcher searches for the prefix "failed: " to find failed tests.
name = 'failed: ' + name
step_result = self.m.step(name, None)
step_result.presentation.logs['stdio'] = output.split('\n')
if not passed:
step_result.presentation.status = self.m.step.FAILURE
def process_coverage(self, covargs_path, test_results, ids_txt, llvm_profdata,
llvm_cov, gcs_bucket):
output_dir = self.m.path['cleanup'].join('coverage')
cmd = [
covargs_path,
'-level',
COVARGS_LOG_LEVEL,
'-json-output',
self.m.json.output(name=COVARGS_OUTPUT_JSON),
'-output-dir',
output_dir,
'-llvm-profdata',
llvm_profdata,
'-llvm-cov',
llvm_cov,
'-ids',
ids_txt,
]
for result in test_results:
cmd.extend(['-summary', result.results_dir.join(TEST_SUMMARY_JSON)])
self.m.step('covargs', cmd)
# TODO: move this into gsutil module/deduplicate this with other GCS logic
dst = 'builds/%s/coverage' % self.m.buildbucket.build_id
step_result = self.m.gsutil.rsync(
name='upload coverage',
src=output_dir,
bucket=gcs_bucket,
dst=dst,
recursive=True,
gzip_exts=['html'],
options={
'parallel_process_count': self.m.platform.cpu_count,
'parallel_thread_count': 1,
},
multithreaded=True)
step_result.presentation.links['index.html'] = self.m.gsutil._http_url(
gcs_bucket, self.m.gsutil.join(dst, 'index.html'), True)
def _isolate_build_artifacts(self,
isolate_tree,
build_artifacts,
shard=None,
test_bot_cpu='x64',
legacy_qemu=False):
"""Populates a tree with build artifacts and isolates it.
Specifically, the following is linked into or created within the tree:
- The images in the build are linked in and a manifest of them is created
in the root, if targeting a Fuchsia device;
- The Linux/Mac tests in the shard and their runtime dependencies.
Args:
isolate_tree (api.file.SymlinkTree): A tree into which artifacts may be
linked.
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to isolate.
shard (api.testsharder.Shard|None): A test shard.
test_bot_cpu (str|None): The host cpu of the bot running the test task.
legacy_qemu (bool): Whether to isolate only the images needed to run
QEMU.
Returns:
The isolated hash that may be used to reference and download the
artifacts.
"""
def register_link(relpath):
"""Prepares a symlink of a relative path within the build directory to the tree."""
isolate_tree.register_link(
target=build_artifacts.fuchsia_build_dir.join(relpath),
linkname=isolate_tree.root.join(relpath),
)
# TODO(IN-931): Remove `shard is None` condition once device and QEMU
# codepaths are passing shard and using _construct_test_task_request().
no_shard = shard is None
if no_shard or shard.targets_fuchsia:
image_list = build_artifacts.images.values()
# In the case of an emulated target, we restrict what we isolate to the
# bare essentials to avoid the needless downloading of several gigabytes
# of images on the other end.
is_emulated_target = (
(no_shard and legacy_qemu) or
(shard and self.m.emu.is_emulator_type(shard.device_type))
) # yapf: disable
if is_emulated_target:
image_list = [
img for img in image_list
if img['name'] in ['qemu-kernel', 'zircon-a', 'storage-full']
] # yapf: disable
image_manifest_path = isolate_tree.root.join(IMAGES_JSON)
self.m.file.write_json(
'write image manifest', image_manifest_path, image_list, indent=2)
for image in image_list:
register_link(image['path'])
if shard:
for test in shard.tests:
if test.os in ['linux', 'mac']:
register_link(test.path)
for dep in shard.deps:
register_link(dep)
# If targeting QEMU we include a private key corresponding to an authorized
# key already in the boot image; this is needed as we do not pave QEMU.
if shard and self.m.emu.is_emulator_type(shard.device_type):
isolate_tree.register_link(
target=build_artifacts.private_key,
linkname=isolate_tree.root.join(
build_artifacts.DEFAULT_ISOLATED_LAYOUT.private_key,),
)
for tool in [
build_artifacts.botanist(test_bot_cpu),
build_artifacts.testrunner(test_bot_cpu),
build_artifacts.llvm_symbolizer,
build_artifacts.bootserver,
]:
tool_name = self.m.path.basename(tool)
isolate_tree.register_link(
target=tool, linkname=isolate_tree.root.join(tool_name))
isolate_tree.create_links('create tree of build artifacts')
isolated = self.m.isolated.isolated(isolate_tree.root)
isolated.add_dir(isolate_tree.root)
return isolated.archive('isolate build artifacts')
@property
def results_dir_on_target(self):
"""The directory on target to which target test results will be written."""
return '/tmp/infra-test-output'
@property
def results_dir_on_host(self):
"""The directory on host to which host and target test results will be written.
Target test results will be copied over to this location and host test
results will be written here. Host and target tests should write to
separate subdirectories so as not to collide.
"""
return self.m.path['cleanup'].join('test_results')
def _create_runcmds_script(self, device_type, test_cmds, output_path):
"""Creates a script for running tests on boot."""
# The device topological path is the topological path to the block device
# which will contain test output.
device_topological_path = '/dev/sys/pci/00:%s/virtio-block/block' % (
TEST_FS_PCI_ADDR)
# Script that mounts the block device that will contain test output, runs
# the tests, and drops their output into the block device.
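# For an emulator device the generated script looks roughly like this
# (illustrative, with the default results dir and PCI address):
#   mkdir /tmp/infra-test-output
#   waitfor class=block topo=/dev/sys/pci/00:06.0/virtio-block/block timeout=60000
#   mount /dev/sys/pci/00:06.0/virtio-block/block /tmp/infra-test-output
#   <test_cmds>
#   umount /tmp/infra-test-output
#   dm poweroff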
results_dir = self.results_dir_on_target
runcmds = [
'mkdir %s' % results_dir,
]
if self.m.emu.is_emulator_type(device_type):
runcmds.extend([
# Wait until the MinFS test image shows up (max <timeout> ms).
'waitfor class=block topo=%s timeout=60000' % device_topological_path,
'mount %s %s' % (device_topological_path, results_dir),
] + test_cmds + [
'umount %s' % results_dir,
'dm poweroff',
])
else:
runcmds.extend(test_cmds)
runcmds_bytes = []
for line in runcmds:
if isinstance(line, unicode):
runcmds_bytes.append(line.encode('utf-8'))
elif isinstance(line, str):
runcmds_bytes.append(line)
else: # pragma: no cover
assert False, 'line is not unicode or a str: %s, %s' % (line,
type(line))
self.m.file.write_text('write runcmds', output_path,
'\n'.join(runcmds_bytes))
def _construct_legacy_qemu_task_request(self,
task_name,
build_artifacts,
pool,
timeout_secs,
swarming_expiration_timeout_secs,
swarming_io_timeout_secs,
secret_bytes,
qemu_type,
shard=None):
"""Constructs a Swarming task request which runs Fuchsia tests inside QEMU.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
pool (str): Swarming pool from which the test task will be drawn.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
secret_bytes (str): secret bytes to pass to the QEMU task.
qemu_type (str): type of qemu, either QEMU or AEMU.
shard (api.testsharder.Shard|None): The shard associated with the task, or
None if the task does not correspond to a shard.
Returns:
An api.swarming.TaskRequest representing the swarming task request.
"""
# To freely archive files from the build directory, the source, and those we
# dynamically create, we create a tree of symlinks in a fresh directory and
# isolate that. This solves the problems of (a) finding a root directory
# that works for all artifacts, (b) being able to create files in that
# directory without fear of collision, and (c) not having to isolate
# extraneous files.
isolate_tree = self.m.file.symlink_tree(root=self.m.path.mkdtemp('isolate'))
# As part of running tests, we'll send a MinFS image over to another machine
# where it will be declared as a block device in QEMU; Fuchsia will then
# mount it and write test output to it. We choose 3.5G for the MinFS image
# arbitrarily, as it appears it can hold our test output comfortably without
# going overboard on size.
minfs_image_path = isolate_tree.root.join(TEST_RESULTS_MINFS_NAME)
self.m.minfs.create(minfs_image_path, '3584M', name='create test image')
ensure_file = self.m.cipd.EnsureFile()
botanist_cmd = [
'./botanist',
'-level', BOTANIST_LOG_LEVEL,
'qemu',
'-type', '%s' % qemu_type.lower(),
'-qemu-dir', './%s/bin' % qemu_type.lower(),
'-images', IMAGES_JSON,
'-arch', build_artifacts.target,
'-minfs', TEST_RESULTS_MINFS_NAME,
'-pci-addr', TEST_FS_PCI_ADDR,
'-use-kvm'
] # yapf: disable
if secret_bytes:
# Wrap botanist command with secretshim which starts the secrets server
# before running the following command.
botanist_cmd = ['./secretshim'] + botanist_cmd
ensure_file.add_package('fuchsia/infra/secretshim/${platform}',
SECRETSHIM_CIPD_VERSION)
if any(v in build_artifacts.variants for v in ('asan', 'profile')):
botanist_cmd.extend([
'-cpu',
str(8),
'-memory',
str(8192),
])
# storage-full not being present signifies the exclusion of the system
# partition, which means `boot` (i.e. running on boot) must be used instead
# of `system` (i.e., running after the system partition is mounted).
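# E.g., on a storage-free build with no shard this yields the kernel
# command-line argument:
#   zircon.autorun.boot=/boot/bin/sh+/boot/infra/runcmds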
storage_free_build = STORAGE_FULL not in build_artifacts.images
arg_key = 'zircon.autorun.%s' % ('boot' if storage_free_build else 'system')
botanist_cmd.append('%s=/boot/bin/sh+/boot/%s' %
(arg_key, self._get_runcmds_path_per_shard(shard)))
isolated_hash = self._isolate_build_artifacts(
isolate_tree,
build_artifacts,
# To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
test_bot_cpu=build_artifacts.target,
legacy_qemu=True,
)
if qemu_type == 'AEMU':
self.m.emu.add_aemu_to_ensure_file(ensure_file, subdir='aemu/bin')
elif qemu_type == 'QEMU':
self.m.emu.add_qemu_to_ensure_file(ensure_file, subdir='qemu')
env_name = '%s-%s' % (qemu_type, build_artifacts.target)
tags = {
# consumed by google3 results uploader
'test_environment_name': [env_name],
# consumed by this recipe module
'uses_legacy_qemu': ['true']
}
request = self.m.swarming.task_request().with_name(task_name).with_tags(
tags)
return (request.with_slice(0, request[0].
with_command(botanist_cmd).
with_isolated(isolated_hash).
with_dimensions(pool=pool, os='Debian', cpu=build_artifacts.target, kvm='1').
with_io_timeout_secs(swarming_io_timeout_secs).
with_execution_timeout_secs(timeout_secs).
with_expiration_secs(swarming_expiration_timeout_secs).
with_secret_bytes(secret_bytes).
with_outputs([TEST_RESULTS_MINFS_NAME]).
with_cipd_ensure_file(ensure_file)
)) #yapf: disable
def _construct_device_task_request(self, task_name, device_type,
build_artifacts, pool, pave, timeout_secs,
swarming_expiration_timeout_secs,
swarming_io_timeout_secs):
"""Constructs a Swarming task request to run Fuchsia tests on a device.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
pool (str): Swarming pool from which the test task will be drawn.
pave (bool): Whether or not the build artifacts should be paved.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
Returns:
An api.swarming.TaskRequest representing the swarming task request.
"""
# Construct the botanist command.
botanist_cmd = [
'./botanist',
'-level', BOTANIST_LOG_LEVEL,
'zedboot',
'-config', BOTANIST_DEVICE_CONFIG,
'-images', IMAGES_JSON,
'-results-dir', self.results_dir_on_target,
'-out', TEST_RESULTS_ARCHIVE_NAME,
'-serial-log', SERIAL_LOG_NAME,
] # yapf: disable
if not pave:
botanist_cmd.append('-netboot')
# storage-full not being present signifies the exclusion of the system
# partition, which means `boot` (i.e. running on boot) must be used instead
# of `system` (i.e., running after the system partition is mounted).
storage_free_build = STORAGE_FULL not in build_artifacts.images
arg_key = 'zircon.autorun.%s' % ('boot' if storage_free_build else 'system')
botanist_cmd.append('%s=/boot/bin/sh+/boot/%s' %
(arg_key, RUNCMDS_BOOTFS_PATH))
# To freely archive files from the build directory, the source, and those we
# dynamically create, we create a tree of symlinks in a fresh directory and
# isolate that. This solves the problems of (a) finding a root directory
# that works for all artifacts, (b) being able to create files in that
# directory without fear of collision, and (c) not having to isolate
# extraneous files.
isolate_tree = self.m.file.symlink_tree(root=self.m.path.mkdtemp('isolate'))
isolated_hash = self._isolate_build_artifacts(isolate_tree, build_artifacts)
dimensions = {
'pool': pool,
'device_type': device_type,
}
env_name = '%s-%s' % (device_type, build_artifacts.target)
tags = {'test_environment_name': [env_name]}
request = self.m.swarming.task_request().with_name(task_name).with_tags(
tags)
return (request.with_slice(0, request[0].
with_command(botanist_cmd).
with_isolated(isolated_hash).
with_dimensions(**dimensions).
with_expiration_secs(swarming_expiration_timeout_secs).
with_io_timeout_secs(swarming_io_timeout_secs).
with_execution_timeout_secs(timeout_secs).
with_outputs([TEST_RESULTS_ARCHIVE_NAME, SERIAL_LOG_NAME])
)) #yapf: disable
def _extract_test_results_archive(self,
step_name,
archive_path,
is_minfs=False,
leak_to=None):
"""Extracts test results from an archive.
Args:
step_name (str): The name of the step.
archive_path (Path): The path to the archive which contains test results.
is_minfs (bool): Whether the archive in question is a minfs image
containing QEMU test results. If false, then the archive is assumed to
be a tar file.
leak_to (Path): Optionally leak the contents of the archive to a
directory.
Returns:
A dict mapping a filepath relative to the root of the archive to the
contents of that file in the archive.
"""
if is_minfs:
return self.m.minfs.copy_image(
step_name=step_name,
image_path=archive_path,
out_dir=leak_to,
).raw_io.output_dir
return self.m.tar.extract(
step_name=step_name,
path=archive_path,
directory=self.m.raw_io.output_dir(leak_to=leak_to),
).raw_io.output_dir
def _decrypt_secrets(self, build_artifacts):
"""Decrypts the secrets included in the build.
Args:
build_artifacts (BuildArtifacts): The build artifacts for which secret
specs were generated.
Returns:
The dictionary that maps secret spec name to the corresponding plaintext.
"""
self.m.cloudkms.ensure()
secret_spec_dir = build_artifacts.secret_specs
secrets_map = {}
with self.m.step.nest('process secret specs'):
secret_spec_files = self.m.file.listdir('list', secret_spec_dir)
for secret_spec_file in secret_spec_files:
basename = self.m.path.basename(secret_spec_file)
# Skip the 'ciphertext' subdirectory.
if basename == 'ciphertext':
continue
secret_name, _ = basename.split('.json', 1)
secret_spec = self.m.json.read('read spec for %s' % secret_name,
secret_spec_file).json.output
# For each secret spec file <name>.json in this directory, there is an
# associated ciphertext file at ciphertext/<name>.ciphertext.
ciphertext_file = secret_spec_dir.join('ciphertext',
'%s.ciphertext' % secret_name)
key_path = secret_spec['cloudkms_key_path']
secrets_map[secret_name] = self.m.cloudkms.decrypt(
'decrypt secret for %s' % secret_name, key_path, ciphertext_file,
self.m.raw_io.output()).raw_io.output
return secrets_map
def deprecated_test(self, *args, **kwargs):
"""Tests a Fuchsia build on the specified device with retries.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args (see _launch_collect_process_funcs for other args):
max_attempts (int): The tests will be run repeatedly until either
max_attempts is hit or all tests pass.
Returns:
A `FuchsiaTestResults` object corresponding to the last test attempt.
"""
# Ideally this method's arguments would look like
# (self, *args, max_attempts=0, **kwargs)
# but Python 2 doesn't allow default keyword args after variable-length
# positional *args :(
max_attempts = kwargs.pop('max_attempts', 0)
if not max_attempts:
max_attempts = self.m.swarming_retry.DEFAULT_MAX_ATTEMPTS
launch, collect, process = self._launch_collect_process_funcs(
*args, **kwargs)
test_results = None
final_exception = None
# TODO(olivernewman): status='last' should cause this step to turn green as
# long as the *last* test attempt is green, but this isn't working, at
# least not for led jobs (if the first attempt fails and the second passes,
# the build is marked as a failure). Figure out whether this will be
# resolved by using luci_runner.
with self.m.step.nest('run tests', status='last'):
for i in range(max_attempts):
with self.m.step.nest('attempt %d' % i) as attempt_presentation:
task_result = collect(launch())
try:
test_results = process(
task_result, presentation=attempt_presentation)
except self.m.step.StepFailure as e:
final_exception = e
else:
final_exception = None
if test_results.passed:
attempt_presentation.step_text = 'passed'
break
else:
failed_count = len(test_results.failed_test_outputs)
attempt_presentation.step_text = ('%d test(s) failed' %
failed_count)
if final_exception:
raise final_exception # pylint: disable=raising-bad-type
return test_results
def deprecated_test_async(self, *args, **kwargs):
"""Launches a swarming task to run Fuchsia tests.
Returns:
A function that, when invoked, waits for the tests to complete and
returns a `FuchsiaTestResults` object representing the completed test.
"""
launch, collect, process = self._launch_collect_process_funcs(
*args, **kwargs)
request_metadata = launch()
return lambda: process(collect(request_metadata))
def _launch_collect_process_funcs(
self,
debug_symbol_gcs_bucket,
device_type,
orchestration_inputs,
overwrite_summary=True,
):
"""Returns 3-tuple of functions to launch Fuchsia tests, wait for them to
complete, and process the results.
Args:
debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
device_type (str): Used as swarming device_type dimension.
orchestration_inputs (TestOrchestrationInputs): the bits of data
needed to orchestrate testing.
overwrite_summary (bool): Whether to overwrite the name and label
fields in summary.json based on tests.json. This should *only* be
used by fuchsia_perf; do NOT add any new dependencies on this.
TODO(fxb/10410): remove this entirely after fuchsia_perf is dead.
Returns:
A tuple of functions:
- `launch`, which takes no arguments and launches a swarming task to
run tests against the given build artifacts. Returns a
`TaskRequestMetadata` object.
- `collect`, which takes the `TaskRequestMetadata` object returned by
`launch` (and, optionally, a `StepPresentation` object to add logs
to). It blocks until the task is complete and returns a swarming
`TaskResult`.
- `process`, which processes the results and returns a
`FuchsiaTestResults` object representing the completed tests.
"""
task = orchestration_inputs.shard_requests[0].task_request
# This directory gets passed into `collect()`, but unfortunately the
# `output_dir` attribute of the `TaskResult` returned by `collect()` is
# a subdirectory of this output dir (to ensure that different tasks'
# outputs do not collide when calling `api.swarming.collect()` with many
# tasks). So we make this variable in-scope for all three functions so that
# `process()` can use it as the output dir for the test results object.
output_dir = self.m.path.mkdtemp('swarming')
def launch():
with self.m.context(infra_steps=True):
return self.m.swarming.trigger(
'trigger 1 task', [task], cancel_extra_tasks=True)
def collect(request_metadata):
with self.m.context(infra_steps=True):
results = self.m.swarming.collect(
'collect', tasks=request_metadata, output_dir=output_dir)
assert len(results) == 1, 'len(%s) != 1' % repr(results)
return results[0]
def process(task_result, presentation=None):
symbolizer_output = output_dir.join(self.m.symbolize.LOG)
with self.m.step.nest('task results'):
self._analyze_task_result(
result=task_result,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
symbolize_tool=orchestration_inputs.symbolize_tool,
llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
symbolizer_output=symbolizer_output,
presentation=presentation,
)
with self.m.context(infra_steps=True):
# result.outputs contains the file outputs produced by the Swarming
# task, returned via isolate. It's a mapping of the 'name' of the
# output, represented as its relative path within the isolated it
# was returned in, to a Path object pointing to its location on the
# local disk. For each of the above tasks, there should be exactly
# one output.
if SERIAL_LOG_NAME in task_result.outputs:
serial_log = task_result.outputs.pop(SERIAL_LOG_NAME)
serial_log_contents = self.m.file.read_text(
'read serial.txt', serial_log, test_data=[])
serial_presentation = (
presentation or self.m.step.active_result.presentation)
serial_presentation.logs[SERIAL_LOG_NAME] = (
serial_log_contents.splitlines())
assert len(task_result.outputs) == 1, 'len(%s) != 1' % repr(
task_result.outputs)
archive_name, archive_path = task_result.outputs.items()[0]
test_results_dir = self.results_dir_on_host.join(
'target', task_result.id)
# _extract_test_results_archive needs minfs_path to be set.
# This is kinda ugly. It'd be better to pass this in as an argument.
self.m.minfs.minfs_path = orchestration_inputs.minfs
test_results_map = self._extract_test_results_archive(
step_name='extract results',
is_minfs=self.m.emu.is_emulator_type(device_type),
archive_path=archive_path,
# Write test results to a subdirectory of |results_dir_on_host|
# so as not to collide with host test results.
leak_to=test_results_dir,
)
# Remove the archive file so it doesn't get uploaded to GCS.
self.m.file.remove('remove %s' % archive_name, archive_path)
test_list = self.m.file.read_json(
'read tests.json', orchestration_inputs.tests_file, test_data=[])
tests = [
self.m.testsharder.Test.from_jsonish(t['test']) for t in test_list
]
with self.m.step.nest('all test results'):
test_results = self.FuchsiaTestResults(
from_fuchsia=True,
results_dir=test_results_dir,
outputs=test_results_map,
env_name=task_result.name,
tests=tests,
legacy_qemu=self.m.emu.is_emulator_type(device_type),
api=self.m,
symbolizer_output=symbolizer_output,
output_dir=output_dir,
overwrite_summary=overwrite_summary,
)
self._analyze_test_results(test_results, presentation=presentation)
return test_results
return launch, collect, process
def shard_requests(
self,
build_artifacts,
buildbucket_build,
per_test_timeout_secs,
pool,
swarming_expiration_timeout_secs,
swarming_io_timeout_secs,
use_runtests,
# TODO(garymm): Remove default value.
# We should always get this from a spec.
timeout_secs=40 * 60):
"""Returns a _ShardTaskRequest for each shard in build_artifact.shards.
Args:
build_artifacts (BuildArtifacts): The Fuchsia build artifacts to test.
buildbucket_build (build_pb2.Build): The buildbucket build that is going
to orchestrate testing.
per_test_timeout_secs (int): Any test that executes for longer than this
will be considered failed.
pool (str): The Swarming pool to schedule test tasks in.
use_runtests (bool): Whether to use runtests (or else run_test_component)
when executing tests on target.
timeout_secs (int): The amount of seconds to wait for the tests to execute
before giving up.
"""
self.m.minfs.minfs_path = build_artifacts.minfs
self.m.zbi.zbi_path = build_artifacts.zbi
# This modifies the build artifacts so must be done before calling
# task_requester.request().
self._install_runcmds_files(
build_artifacts,
test_in_shards=True,
per_test_timeout_secs=per_test_timeout_secs,
in_place=True)
task_requester = _TaskRequester(
self.m,
buildbucket_build=buildbucket_build,
per_test_timeout_secs=per_test_timeout_secs,
pool=pool,
swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
swarming_io_timeout_secs=swarming_io_timeout_secs,
timeout_secs=timeout_secs,
use_runtests=use_runtests,
)
shard_requests = []
for s in build_artifacts.shards:
with self.m.step.nest('shard %s' % s.name):
shard_requests.append(task_requester.request(s, build_artifacts))
return shard_requests
def test_in_shards(self, collect_timeout_secs, debug_symbol_gcs_bucket,
max_attempts, orchestration_inputs):
"""Tests a Fuchsia build by sharding.
Expects the build and artifacts to be at the same place they were at
the end of the build.
Args:
debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
max_attempts (int): Maximum number of attempts before marking a shard
as failed.
collect_timeout_secs (int): Amount of time to wait for tasks to complete.
Returns:
A list of FuchsiaTestResults objects representing the completed test
tasks that were not subject to an infra failure.
"""
# If no shards have been provided, then we have successfully run the empty
# set of tests.
if not orchestration_inputs.shard_requests:
return []
self.m.minfs.minfs_path = orchestration_inputs.minfs
collect_timeout = None
if collect_timeout_secs:
collect_timeout = '%ds' % collect_timeout_secs
self._test_runner = _ShardedTestRunner(
self.m,
collect_timeout=collect_timeout,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
max_attempts=max_attempts,
swarming_output_dir=self.m.path.mkdtemp('swarming'),
symbolize_tool=orchestration_inputs.symbolize_tool,
shard_requests=orchestration_inputs.shard_requests,
)
return self._test_runner.run_tests()
def raise_failures(self):
if self._test_runner:
self._test_runner.raise_failures()
def _analyze_task_result(
self,
result,
symbolize_tool,
llvm_symbolizer,
debug_symbol_gcs_bucket,
symbolizer_output=None,
presentation=None,
):
"""Analyzes a swarming.TaskResult and reports results as a step.
Args:
result (api.swarming.TaskResult): The swarming task result to analyze.
symbolize_tool (Path): The path to the symbolize tool.
llvm_symbolizer (Path): The path to the llvm_symbolizer tool.
debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
symbolizer_output (Path|None): A path to a file to write the symbolizer's
stdout.
presentation (StepPresentation|None): The step presentation to attach
logs to. Defaults to `active_result.presentation`.
Raises:
A StepFailure if a kernel panic is detected or an InfraFailure if the
swarming task failed for a different reason.
"""
presentation = presentation or self.m.step.active_result.presentation
if result.output:
# Always symbolize the result output if present in this case.
presentation.logs['symbolized log'] = self.m.symbolize(
symbolize_tool=symbolize_tool,
debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
llvm_symbolizer=llvm_symbolizer,
data=result.output,
symbolizer_output=symbolizer_output,
presentation=presentation)
# A kernel panic may be present in the logs even if the task timed out, so
# check for that first.
if 'KERNEL PANIC' in result.output:
presentation.step_text = 'kernel panic'
presentation.status = self.m.step.FAILURE
raise self.m.step.StepFailure(
'Found kernel panic. See symbolized output for details.')
if result.isolated_outputs:
presentation.links['test outputs'] = result.isolated_outputs.url
try:
result.analyze()
except self.m.step.StepFailure:
self._present_task_errors(result, presentation)
raise
def _present_task_errors(self, task_result, presentation):
"""Updates text and status of the given step to reflect test task errors."""
# If the task is in an unknown state, or completed but the executed command
# returned a non-zero exit code, this points to a tooling failure.
if task_result.state is None or task_result.state == self.m.swarming.TaskState.COMPLETED:
text = 'tooling failure' # pragma: no cover
else:
text = (task_result.state.name).replace('_', ' ').lower()
presentation.step_text = text
# Report timeouts as red, not purple, as it is likelier that the task is
# timing out due to a bug in the system under test.
if task_result.state == self.m.swarming.TaskState.TIMED_OUT:
status = self.m.step.FAILURE # pragma: no cover
else:
status = self.m.step.EXCEPTION
presentation.status = status
def _create_test_list(self, shard):
test_locations = []
for test in shard.tests:
test_locations.append(test.path)
test_list_path = self.m.path['cleanup'].join('tests-%s' %
self._normalize(shard.name))
self.m.file.write_text(
name='write test list',
dest=test_list_path,
text_data='\n'.join(test_locations) + '\n',
)
return test_list_path
def _normalize(self, name):
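# E.g., 'QEMU (arm64)' -> 'QEMU_arm64'.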
return name.replace(' ', '_').replace('(', '').replace(')', '')
def _get_runcmds_path_per_shard(self, shard=None):
if not shard:
return RUNCMDS_BOOTFS_PATH
return '%s-%s' % (RUNCMDS_BOOTFS_PATH, self._normalize(shard.name))
def _uses_legacy_qemu(self, shard):
return (not self.m.experimental.ssh_into_qemu and
self.m.emu.is_emulator_type(shard.device_type))
def _install_runcmds_files(self,
build_artifacts,
test_in_shards=False,
per_test_timeout_secs=None,
device_type=None,
pave=False,
test_cmds=None,
in_place=False):
"""Creates the files used to invoke runtests on boot.
This is only necessary for QEMU shards, which are the only shards that
use runcmds, and the non-sharding codepath.
"""
self.m.zbi.zbi_path = build_artifacts.zbi
manifest = {}
zbi_name = 'zircon-a'
new_zbi_filename = None
new_zbi_path = None
if test_in_shards:
# If testing in shards, the ZBI should be modified once for all shards, in
# place, before being uploaded through artifactory.
assert in_place
needs_key = False
for shard in build_artifacts.shards:
if self.m.emu.is_emulator_type(shard.device_type):
if self._uses_legacy_qemu(shard):
test_list_path = self._create_test_list(shard)
runtests_file_bootfs_path = 'infra/shard-%s.run' % self._normalize(
shard.name)
runcmds_path = self.m.path['cleanup'].join(
'runcmds-%s' % self._normalize(shard.name))
runtests_cmd_parts = [
'runtests', '-o', self.results_dir_on_target, '-f',
'/boot/%s' % runtests_file_bootfs_path
]
if per_test_timeout_secs:
runtests_cmd_parts.extend(['-i', '%d' % per_test_timeout_secs])
self._create_runcmds_script(
device_type=shard.device_type,
test_cmds=[' '.join(runtests_cmd_parts)],
output_path=runcmds_path,
)
manifest[self._get_runcmds_path_per_shard(shard)] = runcmds_path
manifest[runtests_file_bootfs_path] = test_list_path
else:
needs_key = True
if needs_key:
manifest[AUTHORIZED_KEY_PATH] = build_artifacts.authorized_key
else:
assert device_type and test_cmds
if not in_place:
new_zbi_filename = 'test-infra.zbi'
new_zbi_path = build_artifacts.fuchsia_build_dir.join(new_zbi_filename)
if not self.m.emu.is_emulator_type(device_type):
zbi_name = next(
(image['name']
for image in build_artifacts.images.values()
if '--boot' in image.get(
'bootserver_%s' % ('netboot' if not pave else 'pave'), [])),
None)
assert zbi_name, 'Could not find kernel image to boot.'
runcmds_path = self.m.path['cleanup'].join('runcmds')
self._create_runcmds_script(device_type, test_cmds, runcmds_path)
manifest[RUNCMDS_BOOTFS_PATH] = runcmds_path
# Inject the runcmds script and/or authorized key into the bootfs image.
if manifest and zbi_name in build_artifacts.images:
self.m.zbi.copy_and_extend(
step_name='create zbi',
input_image=build_artifacts.fuchsia_build_dir.join(
build_artifacts.images[zbi_name]['path']),
output_image=new_zbi_path or build_artifacts.fuchsia_build_dir.join(
build_artifacts.images[zbi_name]['path']),
manifest=manifest,
)
if new_zbi_filename:
build_artifacts.images[zbi_name]['path'] = new_zbi_filename