| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from recipe_engine import recipe_test_api |
| |
| from RECIPE_MODULES.fuchsia.testing_requests.api import TestingRequestsApi |
| |
| |
class FuchsiaTestTestApi(recipe_test_api.RecipeTestApi):
    """Test API providing mock step data for Fuchsia testing recipes.

    Produces mock Swarming task requests, launch/collect step data for
    swarming_retry, and mock test-result directory contents.
    """

    def task_request_jsonish(self, name="QEMU"):
        """Returns a mock Swarming task request dict.

        Args:
            name (str): Name to set on the mock task request.

        Returns:
            A JSON-ish dict modeled on the swarming module's example task
            request, with its name overridden and a test-environment tag
            attached.
        """
        ret = self.m.swarming.example_task_request_jsonish()
        # The example request must already carry a name for us to override.
        assert "name" in ret
        ret["name"] = name
        tags = {
            TestingRequestsApi.TEST_ENVIRONMENT_TAG_NAME: "env-name",
        }
        # Swarming encodes tags as "key:value" strings.
        ret["tags"] = [f"{key}:{value}" for key, value in tags.items()]
        return ret

    def task_requests_step_data(self, task_requests, step_name):
        """Returns mock step data for swarming task requests.

        Args:
            task_requests (seq[dict]): Mock task requests produced by a
                subbuild.
            step_name (str): Name to use for step data.

        Returns:
            RecipeTestApi.step_data for the extract_results step.
        """
        return self.step_data(step_name, self.m.file.read_json(task_requests))

    def task_retry_step_data(
        self, task_results, iteration=0, collect_iteration=0, attempt=None
    ):
        """Returns mock step data for collecting Swarming test tasks.

        Args:
            task_results (seq[StepTestData]): step test data to be used for the
                collect step.
            iteration (int): iteration of swarming_retry. If negative, don't
                launch.
            collect_iteration (int): iteration to collect results on. If
                negative, there will be no collect step. If 0, it will be equal
                to the non-negative iteration or 0.
            attempt (int or None): index of attempt (if None, swarming_retry
                will make an educated guess)

        Returns:
            RecipeTestApi.step_data for the launch and/or collect steps, or
            None if both `iteration` and `collect_iteration` are negative.
        """
        step_data = None
        if iteration >= 0:
            # Accumulate one trigger step per task into a single StepTestData.
            for task in task_results:
                results = task["results"]
                launch_step_data = self.m.swarming_retry.trigger_data(
                    name=results["name"],
                    task_id=results["task_id"],
                    attempt=attempt,
                    iteration=iteration,
                )
                if step_data:
                    step_data += launch_step_data
                else:
                    step_data = launch_step_data

        if collect_iteration >= 0:
            # When collect_iteration is 0, max() collapses it to the launch
            # iteration (or 0 when the launch iteration is negative).
            collect_step_data = self.m.swarming_retry.collect_data(
                task_results, iteration=max(collect_iteration, iteration)
            )
            if step_data:
                step_data += collect_step_data
            else:
                step_data = collect_step_data

        return step_data

    def test_step_data(
        self,
        base_name=None,
        failure=False,
        test_status=None,
        tefmocheck_failure=False,
        shard_name=None,
        iteration=0,
        tests=None,
        output_dir_contents=None,
        run_count_per_test=1,
    ):
        """Returns mock step data for test results.

        Args:
            base_name (str or None): Base step name; defaults to the
                launch/collect "process results" step for `iteration`.
            failure (bool): Whether a test failed or not.
            test_status (string): Status to set for each test in summary.json.
                By default, determined based on `failure`.
            tefmocheck_failure (bool): Whether a test failed for a
                testing_failure_mode.
            shard_name (str): name of the shard for step name
            iteration (int): iteration of launch/collect step for step name
            tests (seq(str)): Names of tests to mock.
            output_dir_contents (dict or None): Map of output-dir file names to
                contents; generated from the other arguments by default.
            run_count_per_test (int): How many times to present each test in
                the summary.json. This represents the number of times the test
                was attempted. All attempts except the last is a flake.

        Returns:
            RecipeTestApi.step_data for the extract_results step and related
            steps.
        """
        if test_status is None:
            test_status = "FAIL" if failure else "PASS"

        if base_name is None:
            base_name = f"launch/collect.{int(iteration)}.process results"
        if shard_name:
            base_name = f"{base_name}.{shard_name}"
        summary_step_name = base_name + ".tefmocheck"
        extract_step_name = base_name + ".get extracted files"
        if output_dir_contents is None:
            output_dir_contents = self._output_dir_contents(
                test_status=test_status,
                tefmocheck_failure=tefmocheck_failure,
                tests=tests,
                run_count_per_test=run_count_per_test,
            )
        steps = self.step_data(
            extract_step_name, self.m.file.listdir(output_dir_contents.keys())
        )
        # Only mock the tefmocheck step if a summary.json would be present.
        if "summary.json" in output_dir_contents:
            steps += self.step_data(
                summary_step_name,
                self.m.raw_io.output_text(output_dir_contents["summary.json"]),
            )
        return steps

    def _output_dir_contents(
        self,
        test_status,
        tefmocheck_failure,
        tests,
        run_count_per_test,
    ):
        """Returns mock data for the test results directory.

        Args:
            test_status (str): Result to report for each test's final attempt.
            tefmocheck_failure (bool): Value for each test's
                is_testing_failure_mode field.
            tests (seq(str) or None): Names of tests to mock; defaults to
                ["hello", "goodbye"].
            run_count_per_test (int): Number of attempts to record per test;
                all but the last attempt are reported as FAIL (flakes).

        Returns:
            A map of files in the test results dir to their contents.
        """
        output_dir_contents = {
            "goodbye.txt": "goodbye",
            "benchmark.catapult_json": '["mock_catapult_data"]',
        }
        summary_tests = []
        tests = tests or ["hello", "goodbye"]
        for name in tests:
            output_file = f"{name}/stdio.txt"
            for attempt_index in range(run_count_per_test):
                summary_test = {
                    "name": name,
                    "result": test_status,
                    "output_files": [output_file],
                    "is_testing_failure_mode": tefmocheck_failure,
                    "duration_milliseconds": 1000,
                }
                summary_test["gn_label"] = f"//path/to/{name}:{name}(//toolchain)"
                # Every attempt except the last is presented as a flaky
                # failure regardless of test_status.
                if attempt_index + 1 < run_count_per_test:
                    summary_test["result"] = "FAIL"
                summary_tests.append(summary_test)
            # Use the test name as the mocked contents of the test's output
            # file, for lack of a better option.
            output_dir_contents[output_file] = name

        output_dir_contents["summary.json"] = self.m.json.dumps(
            {"tests": summary_tests, "outputs": {"goodbye-txt": "goodbye.txt"}}
        )
        return output_dir_contents