# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from recipe_engine import recipe_test_api

from RECIPE_MODULES.fuchsia.testing_requests.api import TestingRequestsApi


class FuchsiaTestTestApi(recipe_test_api.RecipeTestApi):
    def task_request_jsonish(self, legacy_qemu, name="QEMU"):
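        """Returns an example swarming task request as a JSON-ish dict.

        The request is named `name` and tagged with test-environment tags;
        for example, legacy_qemu=True produces a "uses_legacy_qemu:true" tag.
        """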
        ret = self.m.swarming.example_task_request_jsonish()
        assert "name" in ret
        ret["name"] = name
        tags = {
            "uses_legacy_qemu": str(legacy_qemu).lower(),
            TestingRequestsApi.TEST_ENVIRONMENT_TAG_NAME: "env-name",
        }
        ret["tags"] = ["%s:%s" % item for item in tags.items()]
        return ret

    def test(
        self,
        name,
        clear_default_steps=False,
        tryjob=False,
        expect_failure=False,
        properties=None,
        status=None,
        steps=(),
        shard_name="all tests",
    ):  # pragma: no cover
"""Returns a test case appropriate for yielding from GenTests().
Provides default property values for the common cases.
Args:
name: Test name.
clear_default_steps: If true, does not provide default values.
However, setting tryjob=True does still add the tryjob-related
properties. Buildbucket properties are always added.
tryjob: If true, adds tryjob-related properties.
expect_failure: If true, the test is expected to fail before
completion, so certain common steps shouldn't be expected to happen.
properties: A required dict of properties to override for this test.
status: One of 'success', 'failure', or 'infra_failure'. The result of
the test case will be required to match this. If None, will use
expect_failure to assume either 'success' or 'failure'
steps: An optional sequence of RecipeTestApi.step_data objects to append
to the output of this function.
Returns:
TestData object.
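
        Example (a sketch; assumes this test API is reachable in GenTests as
        `api.fuchsia_test`):

            def GenTests(api):
                yield api.fuchsia_test.test(
                    "basic_ci",
                    properties={"device_type": "QEMU"},
                )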
"""
        properties = properties or {}

        # Add implicit steps.
        extra_steps = []
        if not clear_default_steps:
            on_device = properties.get("device_type", "QEMU") != "QEMU"
            # Don't add these if the test is expected to raise an exception;
            # the recipes engine will complain that these steps aren't
            # consumed.
            if not expect_failure:
                task_result = self.task_result(on_device, shard_name=shard_name)
                extra_steps.append(self.task_retry_step_data([task_result]))
                extra_steps.append(self.test_step_data(shard_name=shard_name))

        # Assemble the return value.
        if status is None:
            status = "success"
            if expect_failure:
                status = "failure"
        ret = self.m.status_check.test(name, status=status)
        ret += self.m.properties(**properties)
        git_repo = "https://fuchsia.googlesource.com/fuchsia"
        if tryjob:
            ret += self.m.buildbucket.try_build(project="fuchsia", git_repo=git_repo)
        else:
            ret += self.m.buildbucket.ci_build(project="fuchsia", git_repo=git_repo)
        for s in extra_steps:
            ret += s
        for s in steps:
            # Provided steps override implicit steps.
            ret += s
        return ret

    def task_result(self, on_device, shard_name="all tests"):
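        """Returns mock swarming task result data for one test shard.

        The mocked output files differ depending on whether the shard ran on
        real hardware (on_device=True) or in QEMU.
        """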
        outputs = (
            ["out/path/to/test_output_file.txt", "serial_log.txt"]
            if on_device
            else ["output.fs"]
        )
        task_result = self.m.swarming.task_result(
            id="0", name=shard_name, outputs=outputs
        )
        # Needed for checking the serial log.
        task_result["output_dir"] = "output_dir"
        return task_result

    def task_requests_step_data(self, task_requests, step_name):
        """Returns mock step data for swarming task requests.

        This should be used by any test which calls api.fuchsia.test*() and
        expects to shard tests.

        Args:
            task_requests (seq[dict]): A set of example task requests which
                should be used as step data for the result of invoking the
                testsharder.
            step_name (str): Name to use for the step data.

        Returns:
            RecipeTestApi.step_data for the step named `step_name`.
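
        Example (a sketch; assumes the module name "fuchsia_test" and a
        hypothetical "create test shards" step name):

            request = api.fuchsia_test.task_request_jsonish(legacy_qemu=False)
            yield api.fuchsia_test.test(
                "sharded",
                steps=[
                    api.fuchsia_test.task_requests_step_data(
                        [request], "create test shards"
                    )
                ],
            )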
"""
return self.step_data(step_name, self.m.json.output(task_requests))

    def task_retry_step_data(
        self, task_results, iteration=0, collect_iteration=0, attempt=None
    ):
        """Returns mock step data for collecting Swarming test tasks.

        This should be used by any test which calls api.fuchsia.test*().

        Args:
            task_results (seq[StepTestData]): Step test data to be used for
                the collect step.
            iteration (int): Iteration of swarming_retry. If negative, no
                launch steps are mocked.
            collect_iteration (int): Iteration to collect results on. If
                negative, there will be no collect step. If 0, it will be
                equal to `iteration` (or 0 if `iteration` is negative).
            attempt (int or None): Index of the attempt. If None,
                swarming_retry will make an educated guess.

        Returns:
            RecipeTestApi.step_data for the launch and collect steps.
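
        Example (a sketch; mocks one launch/collect cycle for a QEMU shard):

            result = api.fuchsia_test.task_result(on_device=False)
            yield api.fuchsia_test.test(
                "retries",
                steps=[api.fuchsia_test.task_retry_step_data([result])],
            )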
"""
step_data = None
if iteration >= 0:
for task in task_results:
results = task["results"]
launch_step_data = self.m.swarming_retry.trigger_data(
name=results["name"],
task_id=results["task_id"],
attempt=attempt,
iteration=iteration,
)
if step_data:
step_data += launch_step_data
else:
step_data = launch_step_data
if collect_iteration >= 0:
collect_step_data = self.m.swarming_retry.collect_data(
task_results, iteration=max(collect_iteration, iteration)
)
if step_data:
step_data += collect_step_data
else:
step_data = collect_step_data
return step_data

    def test_step_data(
        self,
        base_name=None,
        failure=False,
        tefmocheck_failure=False,
        shard_name=None,
        iteration=0,
        tests_json=None,
        legacy_qemu=True,
        output_dir_contents=None,
        run_count_per_test=1,
    ):
"""Returns mock step data for test results.
This should be used by any test which calls api.fuchsia.test*() and expects
it to make it to the tests analysis phase.
Args:
failure (bool): Whether a test failed or not.
tefmocheck_failure (bool): Whether a test failed for a testing_failure_mode.
shard_name (str): name of the shard for step name
iteration (int): iteration of launch/collect step for step name
tests_json (seq(dict)): The tests.json to mock. Defaults to
`testing_requests.EXAMPLE_TESTS_JSON`.
legacy_qemu (bool): Whether the tests are being run on QEMU using
runtests (not testrunner).
run_count_per_test (int): How many times to present each test in the
summary.json. This represents the number of times the test was
attempted. All attempts except the last is a flake.
Returns:
RecipeTestApi.step_data for the extract_results step and related steps.
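
        Example (a sketch; mocks a failing shard):

            yield api.fuchsia_test.test(
                "failed_tests",
                status="failure",
                steps=[
                    api.fuchsia_test.test_step_data(
                        failure=True, shard_name="all tests"
                    )
                ],
            )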
"""
        if base_name is None:
            base_name = "launch/collect.%d.process results" % iteration
        if shard_name:
            base_name = "%s.%s" % (base_name, shard_name)
        summary_step_name = base_name + ".tefmocheck"
        extract_step_name = base_name + ".get extracted files"
        if output_dir_contents is None:
            output_dir_contents = self._output_dir_contents(
                failure=failure,
                tefmocheck_failure=tefmocheck_failure,
                tests_json=tests_json,
                legacy_qemu=legacy_qemu,
                run_count_per_test=run_count_per_test,
            )
        steps = self.step_data(
            extract_step_name, self.m.file.listdir(output_dir_contents.keys())
        )
        if "summary.json" in output_dir_contents:
            steps += self.step_data(
                summary_step_name,
                stdout=self.m.raw_io.output(output_dir_contents["summary.json"]),
            )
        return steps

    def _output_dir_contents(
        self,
        failure=False,
        tefmocheck_failure=False,
        tests_json=None,
        legacy_qemu=True,
        run_count_per_test=1,
    ):
        """Returns mock data for the test results directory.

        Returns:
            A map of file names in the test results dir to their contents.
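
        For example (a sketch; "hello.out" is a hypothetical test output
        file, and the exact entries depend on tests_json):

            {
                "goodbye.txt": "goodbye",
                "benchmark.catapult_json": '["dummy_catapult_data"]',
                "hello.out": "hello",
                "summary.json": '{"tests": [...], "outputs": {...}}',
            }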
"""
result = "FAIL" if failure else "PASS"
output_dir_contents = {
"goodbye.txt": "goodbye",
"benchmark.catapult_json": '["dummy_catapult_data"]',
}
summary_tests = []
if tests_json is None:
tests_json = self.m.testing_requests.EXAMPLE_TESTS_JSON
for item in tests_json:
test = item["test"]
name = test["name"]
output_file = name if "." in name else "%s.out" % name
for attempt_index in range(run_count_per_test):
summary_test = {
# QEMU outputs path as the name field.
"name": test["path"] if legacy_qemu else name,
"result": result,
"output_file": output_file,
"is_testing_failure_mode": tefmocheck_failure,
}
if not legacy_qemu:
summary_test["gn_label"] = "//path/to/%s:%s(//toolchain)" % (
name,
name,
)
if attempt_index + 1 < run_count_per_test:
summary_test["result"] = "FAIL"
summary_tests.append(summary_test)
# Use the test name as the mocked contents of the test's output file, for
# lack of a better option.
output_dir_contents[output_file] = name
output_dir_contents["summary.json"] = self.m.json.dumps(
{"tests": summary_tests, "outputs": {"goodbye-txt": "goodbye.txt"}}
)
return output_dir_contents