blob: f9e82831db76433ebd6d5357bfc45e397327ec8b [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_test_api
class FuchsiaTestTestApi(recipe_test_api.RecipeTestApi):
def task_request_jsonish(self, legacy_qemu):
ret = self.m.swarming.example_task_request_jsonish()
ret['tags'] = ['uses_legacy_qemu:%s' % str(legacy_qemu).lower()]
return ret
def test(self,
name,
clear_default_steps=False,
tryjob=False,
expect_failure=False,
properties=None,
status=None,
enable_retries=True,
steps=()): # pragma: no cover
"""Returns a test case appropriate for yielding from GenTests().
Provides default property values for the common cases.
Args:
name: Test name.
clear_default_steps: If true, does not provide default values.
However, setting tryjob=True does still add the tryjob-related
properties. Buildbucket properties are always added.
tryjob: If true, adds tryjob-related properties.
expect_failure: If true, the test is expected to fail before
completion, so certain common steps shouldn't be expected to happen.
properties: A required dict of properties to override for this test.
status: One of 'success', 'failure', or 'infra_failure'. The result of
the test case will be required to match this. If None, will use
expect_failure to assume either 'success' or 'failure'
steps: An optional sequence of RecipeTestApi.step_data objects to append
to the output of this function.
Returns:
TestData object.
"""
properties = properties or {}
# Add implicit steps.
extra_steps = []
if not clear_default_steps:
# Don't add these if the test is expected to raise an exception;
# the recipes engine will complain that these steps aren't consumed.
on_device = properties.get('device_type', 'QEMU') != 'QEMU'
if not expect_failure:
outputs = ['out.tar', 'serial.txt'] if on_device else ['output.fs']
task_result = self.m.swarming.task_result(
id='1', name='test', outputs=outputs)
# needed for checking serial log
task_result['output_dir'] = 'output_dir'
extra_steps.append(
self.task_step_data([task_result], enable_retries=enable_retries))
extra_steps.append(self.tests_json_data(enable_retries=enable_retries))
extra_steps.append(self.test_step_data(enable_retries=enable_retries))
# Assemble the return value.
if status is None:
status = 'success'
if expect_failure:
status = 'failure'
ret = self.m.status_check.test(name, status=status)
ret += self.m.properties(**properties)
git_repo = 'https://fuchsia.googlesource.com/fuchsia'
if tryjob:
ret += self.m.buildbucket.try_build(project='fuchsia', git_repo=git_repo)
else:
ret += self.m.buildbucket.ci_build(project='fuchsia', git_repo=git_repo)
for s in extra_steps:
ret += s
for s in steps:
# Provided steps override implicit steps.
ret += s
return ret
def task_requests_step_data(self, task_requests, step_name):
"""Returns mock step data for swarming task requests.
This should be used by any test which calls api.fuchsia.test*() and expects
to shard tests.
Args:
shards (seq[dict]): A set of example shards which should
be used as step data for the result of invoking the testsharder.
step_name (str): name to use for step data
Returns:
RecipeTestApi.step_data for the extract_results step.
"""
return self.step_data(step_name, self.m.json.output(task_requests))
def task_step_data(self, task_results, iteration=0, enable_retries=True):
"""Returns mock step data for collecting Swarming test tasks.
This should be only be used when not using sharding.
Args:
task_results (seq[StepTestData]): step test data to be used for the
collect step.
iteration (int): The retry index of this task.
enable_retries (bool): Whether this is one of potentially several test
task attempts.
Returns:
RecipeTestApi.step_data for the collect step.
"""
step_name = 'collect'
if enable_retries:
step_name = 'run tests.attempt %d.%s' % (iteration, step_name)
return self.step_data(step_name, self.m.swarming.collect(task_results))
def task_retry_step_data(self, task_results, iteration=0, attempt=None):
"""Returns mock step data for collecting Swarming test tasks.
This should be used by any test which calls api.fuchsia.test*().
Args:
task_results (seq[StepTestData]): step test data to be used for the
collect step.
iteration (int): iteration of swarming_retry
attempt (int or None): index of attempt (if None, swarming_retry will
make an educated guess)
Returns:
RecipeTestApi.step_data for the collect step.
"""
launch_step_data = []
for task in task_results:
results = task['results']
launch_step_data.append(
self.m.swarming_retry.trigger_data(
name=results['name'],
task_id=results['task_id'],
attempt=attempt,
iteration=iteration))
collect_step_data = self.m.swarming_retry.collect_data(
task_results, iteration=iteration)
return sum(launch_step_data, collect_step_data)
# pylint: disable=unused-argument
def task_retry_log_data(self, iteration, task_name, log_name, log_contents):
return self.step_data(
'launch/collect.{iteration}.process results.{task_name}.'
'check log {log_name}.read'.format(**locals()),
self.m.file.read_text(log_contents))
# pylint: enable=unused-argument
def test_step_data(
self,
failure=False,
shard_name='',
iteration=0,
enable_retries=True,
tests_json=None,
qemu=True,
output_dir_contents=None,
):
"""Returns mock step data for test results.
This should be used by any test which calls api.fuchsia.test*() and expects
it to make it to the tests analysis phase.
Args:
failure (bool): Whether a test failed or not.
shard_name (str): name of the shard for step name
iteration (int): iteration of launch/collect step for step name
tests_json (seq(dict)): The tests.json to mock. Defaults to
`testing_requests.EXAMPLE_TESTS_JSON`.
qemu (bool): Whether the tests are being run on QEMU.
Returns:
RecipeTestApi.step_data for the extract_results step and related steps.
"""
if shard_name:
base_name = 'launch/collect.%d.process results.%s' % (iteration,
shard_name)
step_name = base_name + '.extract'
summary_name = base_name + '.read summary.json'
extract_name = base_name + '.get extracted files'
else:
step_name = 'extract results'
summary_name = 'all test results.read summary.json'
extract_name = 'get extracted files'
if enable_retries:
step_name = 'run tests.attempt %d.%s' % (iteration, step_name)
summary_name = 'run tests.attempt %d.%s' % (iteration, summary_name)
extract_name = 'run tests.attempt %d.%s' % (iteration, extract_name)
if output_dir_contents is None:
output_dir_contents = self._output_dir_contents(
failure=failure, tests_json=tests_json, qemu=qemu)
steps = self.step_data(extract_name,
self.m.file.listdir(output_dir_contents.keys()))
if 'summary.json' in output_dir_contents:
steps += self.step_data(
summary_name,
self.m.file.read_text(output_dir_contents['summary.json']))
return steps
def _output_dir_contents(
self,
failure=False,
tests_json=None,
qemu=True,
):
"""Returns mock data for the test results directory.
Returns:
A map of files in the test results dir to their contents.
"""
result = 'FAIL' if failure else 'PASS'
output_dir_contents = {
'goodbye.txt': 'goodbye',
'benchmark.catapult_json': '["dummy_catapult_data"]',
}
summary_tests = []
if tests_json is None:
tests_json = self.m.testing_requests.EXAMPLE_TESTS_JSON
for item in tests_json:
test = item['test']
name = test['name']
output_file = name if '.' in name else '%s.out' % name
summary_test = {
# QEMU outputs path as the name field.
'name': test['path'] if qemu else name,
'result': result,
'output_file': output_file
}
if not qemu:
summary_test['gn_label'] = '//path/to/%s:%s(//toolchain)' % (name, name)
summary_tests.append(summary_test)
# Use the test name as the mocked contents of the test's output file, for
# lack of a better option.
output_dir_contents[output_file] = name
output_dir_contents['summary.json'] = self.m.json.dumps({
'tests': summary_tests,
'outputs': {
'goodbye-txt': 'goodbye.txt'
}
})
return output_dir_contents
def tests_json_data(self, enable_retries=True, iteration=0, tests=None):
"""Mock the contents of the tests.json file.
This is only valid for non-sharded tests, since sharded tests don't read
the tests.json file directly.
"""
step_name = 'read tests.json'
if enable_retries:
step_name = 'run tests.attempt %d.%s' % (iteration, step_name)
if tests is None:
tests = self.m.testing_requests.EXAMPLE_TESTS_JSON
return self.step_data(step_name, self.m.file.read_json(tests))