| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from recipe_engine import recipe_test_api |
| |
| |
class FuchsiaTestTestApi(recipe_test_api.RecipeTestApi):
  """Test API providing mock step data for recipes that run Fuchsia tests.

  The helpers here fabricate the swarming task results, tests.json
  contents, and extracted test-output directories that the recipe under
  test expects to see, so GenTests() cases can be written concisely.
  """

  def task_request_jsonish(self, legacy_qemu):
    """Returns an example swarming task request dict.

    Args:
      legacy_qemu (bool): Lowercased into the request's single
        'uses_legacy_qemu:<bool>' tag.

    Returns:
      A task-request dict based on the swarming module's example request.
    """
    ret = self.m.swarming.example_task_request_jsonish()
    ret['tags'] = ['uses_legacy_qemu:%s' % str(legacy_qemu).lower()]
    return ret

  def test(self,
           name,
           clear_default_steps=False,
           tryjob=False,
           expect_failure=False,
           properties=None,
           status=None,
           enable_retries=True,
           steps=()):  # pragma: no cover
    """Returns a test case appropriate for yielding from GenTests().

    Provides default property values for the common cases.

    Args:
      name: Test name.
      clear_default_steps: If true, does not provide default step data.
        However, setting tryjob=True does still add the tryjob-related
        properties. Buildbucket properties are always added.
      tryjob: If true, adds tryjob-related properties.
      expect_failure: If true, the test is expected to fail before
        completion, so certain common steps shouldn't be expected to happen.
      properties: An optional dict of properties to override for this test.
      status: One of 'success', 'failure', or 'infra_failure'. The result of
        the test case will be required to match this. If None, will use
        expect_failure to assume either 'success' or 'failure'.
      enable_retries: Whether the recipe under test runs tests inside the
        'run tests.attempt N' retry wrapper; forwarded to the step-data
        helpers so the mocked step names line up.
      steps: An optional sequence of RecipeTestApi.step_data objects to append
        to the output of this function.

    Returns:
      TestData object.
    """
    properties = properties or {}

    # Add implicit steps.
    extra_steps = []
    if not clear_default_steps:
      # Don't add these if the test is expected to raise an exception;
      # the recipes engine will complain that these steps aren't consumed.
      # Anything other than the default 'QEMU' device_type counts as
      # running on hardware.
      on_device = properties.get('device_type', 'QEMU') != 'QEMU'

      if not expect_failure:
        # Hardware runs produce an archive plus a serial log; QEMU runs
        # produce a single filesystem image.
        outputs = ['out.tar', 'serial.txt'] if on_device else ['output.fs']
        task_result = self.m.swarming.task_result(
            id='1', name='test', outputs=outputs)
        # needed for checking serial log
        task_result['output_dir'] = 'output_dir'
        extra_steps.append(
            self.task_step_data([task_result], enable_retries=enable_retries))
        extra_steps.append(self.tests_json_data(enable_retries=enable_retries))
        extra_steps.append(self.test_step_data(enable_retries=enable_retries))
        extra_steps.append(self.m.testing_requests.args_test_data())

    # Assemble the return value.
    # An explicit status wins; otherwise derive it from expect_failure as
    # documented above.
    if status is None:
      status = 'success'
      if expect_failure:
        status = 'failure'
    ret = self.m.status_check.test(name, status=status)

    ret += self.m.properties(**properties)
    git_repo = 'https://fuchsia.googlesource.com/fuchsia'
    if tryjob:
      ret += self.m.buildbucket.try_build(project='fuchsia', git_repo=git_repo)
    else:
      ret += self.m.buildbucket.ci_build(project='fuchsia', git_repo=git_repo)

    for s in extra_steps:
      ret += s
    for s in steps:
      # Provided steps override implicit steps.
      ret += s
    return ret

  def task_requests_step_data(self, task_requests, step_name):
    """Returns mock step data for swarming task requests.

    This should be used by any test which calls api.fuchsia.test*() and expects
    to shard tests.

    Args:
      task_requests (seq[dict]): Example task requests which should be
        used as step data for the result of invoking the testsharder.
      step_name (str): name to use for step data

    Returns:
      RecipeTestApi.step_data wrapping task_requests as JSON output.
    """
    return self.step_data(step_name, self.m.json.output(task_requests))

  def task_step_data(self, task_results, iteration=0, enable_retries=True):
    """Returns mock step data for collecting Swarming test tasks.

    This should only be used when not using sharding.

    Args:
      task_results (seq[StepTestData]): step test data to be used for the
        collect step.
      iteration (int): The retry index of this task.
      enable_retries (bool): Whether this is one of potentially several test
        task attempts; when true, the step is nested under
        'run tests.attempt <iteration>'.

    Returns:
      RecipeTestApi.step_data for the collect step.
    """
    step_name = 'collect'
    if enable_retries:
      step_name = 'run tests.attempt %d.%s' % (iteration, step_name)
    return self.step_data(step_name, self.m.swarming.collect(task_results))

  def task_retry_step_data(self, task_results, iteration=0, attempt=None):
    """Returns mock step data for collecting Swarming test tasks.

    This should be used by any test which calls api.fuchsia.test*().

    Args:
      task_results (seq[StepTestData]): step test data to be used for the
        collect step. Each entry must carry a 'results' dict with 'name'
        and 'task_id' keys.
      iteration (int): iteration of swarming_retry
      attempt (int|None): index of attempt (if None, swarming_retry will
        make an educated guess)

    Returns:
      RecipeTestApi.step_data for the collect step.
    """
    # One trigger mock per task, keyed off the task's name and ID.
    launch_step_data = []
    for task in task_results:
      results = task['results']
      launch_step_data.append(
          self.m.swarming_retry.trigger_data(
              name=results['name'],
              task_id=results['task_id'],
              attempt=attempt,
              iteration=iteration))

    collect_step_data = self.m.swarming_retry.collect_data(
        task_results, iteration=iteration)

    # Fold all trigger data into the collect data (TestData supports +).
    return sum(launch_step_data, collect_step_data)

  # pylint: disable=unused-argument
  def task_retry_log_data(self, iteration, task_name, log_name, log_contents):
    """Returns mock step data for reading a per-task log file.

    All arguments except log_contents are interpolated into the step name
    (hence the unused-argument suppression above).
    """
    return self.step_data(
        'launch/collect.{iteration}.process results.{task_name}.'
        'check log {log_name}.read'.format(**locals()),
        self.m.file.read_text(log_contents))

  # pylint: enable=unused-argument

  def test_step_data(
      self,
      failure=False,
      shard_name='',
      iteration=0,
      enable_retries=True,
      tests_json=None,
      qemu=True,
  ):
    """Returns mock step data for test results.

    This should be used by any test which calls api.fuchsia.test*() and expects
    it to make it to the tests analysis phase.

    Args:
      failure (bool): Whether a test failed or not.
      shard_name (str): name of the shard for step name
      iteration (int): iteration of launch/collect step for step name
      enable_retries (bool): Whether the non-sharded extract step is nested
        under 'run tests.attempt <iteration>'; ignored when shard_name is
        given.
      tests_json (seq(dict)): The tests.json to mock. Defaults to
        `testing_requests.EXAMPLE_TESTS_JSON`.
      qemu (bool): Whether the tests are being run on QEMU.

    Returns:
      RecipeTestApi.step_data for the extract_results step.
    """
    # Every mocked test shares the same pass/fail result.
    result = 'FAIL' if failure else 'PASS'

    if shard_name:
      step_name = 'launch/collect.%d.process results.%s.extract' % (iteration,
                                                                    shard_name)
    else:
      step_name = 'extract results'
      if enable_retries:
        step_name = 'run tests.attempt %d.%s' % (iteration, step_name)

    output_dir_contents = {
        'goodbye.txt': 'goodbye',
        'benchmark.catapult_json': '["dummy_catapult_data"]',
    }
    summary_tests = []
    if tests_json is None:
      tests_json = self.m.testing_requests.EXAMPLE_TESTS_JSON
    for item in tests_json:
      test = item['test']
      name = test['name']
      # Names without an extension get a synthetic .out file.
      output_file = name if '.' in name else '%s.out' % name
      summary_test = {
          # QEMU outputs path as the name field.
          'name': test['path'] if qemu else name,
          'result': result,
          'output_file': output_file
      }
      if not qemu:
        summary_test['gn_label'] = '//path/to/%s:%s(//toolchain)' % (name, name)
      summary_tests.append(summary_test)
      # Use the test name as the mocked contents of the test's output file, for
      # lack of a better option.
      output_dir_contents[output_file] = name

    output_dir_contents['summary.json'] = self.m.json.dumps({
        'tests': summary_tests,
        'outputs': {
            'goodbye-txt': 'goodbye.txt'
        }
    })
    return self.step_data(step_name,
                          self.m.raw_io.output_dir(output_dir_contents))

  def tests_json_data(self, enable_retries=True, iteration=0, tests=None):
    """Mock the contents of the tests.json file.

    This is only valid for non-sharded tests, since sharded tests don't read
    the tests.json file directly.

    Args:
      enable_retries (bool): Whether the read step is nested under
        'run tests.attempt <iteration>'.
      iteration (int): retry index used in the nested step name.
      tests (seq(dict)): The tests.json contents to mock. Defaults to
        `testing_requests.EXAMPLE_TESTS_JSON`.

    Returns:
      RecipeTestApi.step_data for the 'read tests.json' step.
    """
    step_name = 'read tests.json'
    if enable_retries:
      step_name = 'run tests.attempt %d.%s' % (iteration, step_name)
    if tests is None:
      tests = self.m.testing_requests.EXAMPLE_TESTS_JSON
    return self.step_data(step_name, self.m.file.read_json(tests))