| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from recipe_engine import recipe_test_api |
| |
| from RECIPE_MODULES.fuchsia.testing_requests.api import TestingRequestsApi |
| |
| |
| class FuchsiaTestTestApi(recipe_test_api.RecipeTestApi): |
| def task_request_jsonish(self, legacy_qemu, name="QEMU"): |
| ret = self.m.swarming.example_task_request_jsonish() |
| assert "name" in ret |
| ret["name"] = name |
| tags = { |
| "uses_legacy_qemu": str(legacy_qemu).lower(), |
| TestingRequestsApi.TEST_ENVIRONMENT_TAG_NAME: "env-name", |
| } |
| ret["tags"] = ["%s:%s" % item for item in tags.items()] |
| return ret |
| |
| def test( |
| self, |
| name, |
| clear_default_steps=False, |
| tryjob=False, |
| expect_failure=False, |
| properties=None, |
| status=None, |
| steps=(), |
| shard_name="all tests", |
| ): # pragma: no cover |
| """Returns a test case appropriate for yielding from GenTests(). |
| |
| Provides default property values for the common cases. |
| |
| Args: |
| name: Test name. |
| clear_default_steps: If true, does not provide default values. |
| However, setting tryjob=True does still add the tryjob-related |
| properties. Buildbucket properties are always added. |
| tryjob: If true, adds tryjob-related properties. |
| expect_failure: If true, the test is expected to fail before |
| completion, so certain common steps shouldn't be expected to happen. |
| properties: A required dict of properties to override for this test. |
| status: One of 'success', 'failure', or 'infra_failure'. The result of |
| the test case will be required to match this. If None, will use |
| expect_failure to assume either 'success' or 'failure' |
| steps: An optional sequence of RecipeTestApi.step_data objects to append |
| to the output of this function. |
| |
| Returns: |
| TestData object. |
| """ |
| properties = properties or {} |
| |
| # Add implicit steps. |
| extra_steps = [] |
| if not clear_default_steps: |
| on_device = properties.get("device_type", "QEMU") != "QEMU" |
| |
| # Don't add these if the test is expected to raise an exception; |
| # the recipes engine will complain that these steps aren't consumed. |
| if not expect_failure: |
| task_result = self.task_result(on_device, shard_name=shard_name) |
| extra_steps.append(self.task_retry_step_data([task_result])) |
| extra_steps.append(self.test_step_data(shard_name=shard_name)) |
| |
| # Assemble the return value. |
| if status is None: |
| status = "success" |
| if expect_failure: |
| status = "failure" |
| ret = self.m.status_check.test(name, status=status) |
| |
| ret += self.m.properties(**properties) |
| git_repo = "https://fuchsia.googlesource.com/fuchsia" |
| if tryjob: |
| ret += self.m.buildbucket.try_build(project="fuchsia", git_repo=git_repo) |
| else: |
| ret += self.m.buildbucket.ci_build(project="fuchsia", git_repo=git_repo) |
| |
| for s in extra_steps: |
| ret += s |
| for s in steps: |
| # Provided steps override implicit steps. |
| ret += s |
| return ret |
| |
| def task_result(self, on_device, shard_name="all tests"): |
| outputs = ( |
| ["out/path/to/test_output_file.txt", "serial_log.txt"] |
| if on_device |
| else ["output.fs"] |
| ) |
| task_result = self.m.swarming.task_result( |
| id="0", name=shard_name, outputs=outputs |
| ) |
| # needed for checking serial log |
| task_result["output_dir"] = "output_dir" |
| return task_result |
| |
| def task_requests_step_data(self, task_requests, step_name): |
| """Returns mock step data for swarming task requests. |
| |
| This should be used by any test which calls api.fuchsia.test*() and expects |
| to shard tests. |
| |
| Args: |
| shards (seq[dict]): A set of example shards which should |
| be used as step data for the result of invoking the testsharder. |
| step_name (str): name to use for step data |
| |
| Returns: |
| RecipeTestApi.step_data for the extract_results step. |
| """ |
| return self.step_data(step_name, self.m.json.output(task_requests)) |
| |
| def task_retry_step_data( |
| self, task_results, iteration=0, collect_iteration=0, attempt=None |
| ): |
| """Returns mock step data for collecting Swarming test tasks. |
| |
| This should be used by any test which calls api.fuchsia.test*(). |
| |
| Args: |
| task_results (seq[StepTestData]): step test data to be used for the |
| collect step. |
| iteration (int): iteration of swarming_retry. If negative, don't launch. |
| collect_iteration (int): iteration to collect results on. If negative, |
| there will be no collect step. If 0, it will be equal to the non-negative |
| iteration or 0. |
| attempt (int or None): index of attempt (if None, swarming_retry will |
| make an educated guess) |
| |
| Returns: |
| RecipeTestApi.step_data for the collect step. |
| """ |
| step_data = None |
| if iteration >= 0: |
| for task in task_results: |
| results = task["results"] |
| launch_step_data = self.m.swarming_retry.trigger_data( |
| name=results["name"], |
| task_id=results["task_id"], |
| attempt=attempt, |
| iteration=iteration, |
| ) |
| if step_data: |
| step_data += launch_step_data |
| else: |
| step_data = launch_step_data |
| |
| if collect_iteration >= 0: |
| collect_step_data = self.m.swarming_retry.collect_data( |
| task_results, iteration=max(collect_iteration, iteration) |
| ) |
| if step_data: |
| step_data += collect_step_data |
| else: |
| step_data = collect_step_data |
| |
| return step_data |
| |
| def test_step_data( |
| self, |
| base_name=None, |
| failure=False, |
| tefmocheck_failure=False, |
| shard_name=None, |
| iteration=0, |
| tests_json=None, |
| legacy_qemu=True, |
| output_dir_contents=None, |
| run_count_per_test=1, |
| ): |
| """Returns mock step data for test results. |
| |
| This should be used by any test which calls api.fuchsia.test*() and expects |
| it to make it to the tests analysis phase. |
| |
| Args: |
| failure (bool): Whether a test failed or not. |
| tefmocheck_failure (bool): Whether a test failed for a testing_failure_mode. |
| shard_name (str): name of the shard for step name |
| iteration (int): iteration of launch/collect step for step name |
| tests_json (seq(dict)): The tests.json to mock. Defaults to |
| `testing_requests.EXAMPLE_TESTS_JSON`. |
| legacy_qemu (bool): Whether the tests are being run on QEMU using |
| runtests (not testrunner). |
| run_count_per_test (int): How many times to present each test in the |
| summary.json. This represents the number of times the test was |
| attempted. All attempts except the last is a flake. |
| |
| Returns: |
| RecipeTestApi.step_data for the extract_results step and related steps. |
| """ |
| if base_name is None: |
| base_name = "launch/collect.%d.process results" % iteration |
| if shard_name: |
| base_name = "%s.%s" % (base_name, shard_name) |
| summary_step_name = base_name + ".tefmocheck" |
| extract_step_name = base_name + ".get extracted files" |
| if output_dir_contents is None: |
| output_dir_contents = self._output_dir_contents( |
| failure=failure, |
| tefmocheck_failure=tefmocheck_failure, |
| tests_json=tests_json, |
| legacy_qemu=legacy_qemu, |
| run_count_per_test=run_count_per_test, |
| ) |
| steps = self.step_data( |
| extract_step_name, self.m.file.listdir(output_dir_contents.keys()) |
| ) |
| if "summary.json" in output_dir_contents: |
| steps += self.step_data( |
| summary_step_name, |
| stdout=self.m.raw_io.output(output_dir_contents["summary.json"]), |
| ) |
| return steps |
| |
| def _output_dir_contents( |
| self, |
| failure=False, |
| tefmocheck_failure=False, |
| tests_json=None, |
| legacy_qemu=True, |
| run_count_per_test=1, |
| ): |
| """Returns mock data for the test results directory. |
| |
| Returns: |
| A map of files in the test results dir to their contents. |
| """ |
| result = "FAIL" if failure else "PASS" |
| |
| output_dir_contents = { |
| "goodbye.txt": "goodbye", |
| "benchmark.catapult_json": '["dummy_catapult_data"]', |
| } |
| summary_tests = [] |
| if tests_json is None: |
| tests_json = self.m.testing_requests.EXAMPLE_TESTS_JSON |
| for item in tests_json: |
| test = item["test"] |
| name = test["name"] |
| output_file = name if "." in name else "%s.out" % name |
| for attempt_index in range(run_count_per_test): |
| summary_test = { |
| # QEMU outputs path as the name field. |
| "name": test["path"] if legacy_qemu else name, |
| "result": result, |
| "output_file": output_file, |
| "is_testing_failure_mode": tefmocheck_failure, |
| } |
| if not legacy_qemu: |
| summary_test["gn_label"] = "//path/to/%s:%s(//toolchain)" % ( |
| name, |
| name, |
| ) |
| if attempt_index + 1 < run_count_per_test: |
| summary_test["result"] = "FAIL" |
| summary_tests.append(summary_test) |
| # Use the test name as the mocked contents of the test's output file, for |
| # lack of a better option. |
| output_dir_contents[output_file] = name |
| |
| output_dir_contents["summary.json"] = self.m.json.dumps( |
| {"tests": summary_tests, "outputs": {"goodbye-txt": "goodbye.txt"}} |
| ) |
| return output_dir_contents |