| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from recipe_engine import recipe_test_api |
| |
| |
| class FuchsiaSwarmingRetryTestApi(recipe_test_api.RecipeTestApi): |
| |
| def _attempt(self, attempt, iteration): |
| """If not given guess the attempt number from context.""" |
| |
| # First, if the attempt number is given use it. The next couple |
| # statements make some assumptions but this is the way to ignore |
| # those assumptions. |
| if attempt is not None: |
| return attempt |
| |
| # If no attempt number is given assume attempts starting at iteration |
| # 0 have attempt number 0. |
| if iteration == 0: |
| return 0 |
| |
| # If not at iteration 0 assume we're relaunching and since the max |
| # attempts is currently 2 this has to be attempt 1. |
| return 1 |
| |
| def led_data(self, name, task_id, iteration=0, attempt=None): |
| """Mock data for LedResult.then('launch'). |
| |
| Args: |
| name (str): name of the task |
| task_id (str): task id returned by mock call |
| iteration (int): iteration of 'launch/collect' step |
| attempt (int or None): index number of attempt |
| |
| See FuchsiaSwarmingRetryTestApi._attempt() for details in case |
| attempt is None. |
| """ |
| |
| attempt = self._attempt(attempt=attempt, iteration=iteration) |
| |
| step_name = ('launch/collect.{iteration}.launch.{name} (attempt {attempt}).' |
| 'led launch'.format( |
| iteration=iteration, name=name, attempt=attempt)) |
| launch_data = self.m.json.output({ |
| 'swarming': { |
| 'host_name': 'chromium-swarm.appspot.com', |
| 'task_id': str(task_id), |
| } |
| }) |
| return self.step_data(step_name, stdout=launch_data) |
| |
| def trigger_data(self, name, task_id, iteration=0, attempt=None): |
| """Like led_data() above, but for mocking api.swarming.trigger.""" |
| |
| attempt = self._attempt(attempt=attempt, iteration=iteration) |
| |
| step_name = ('launch/collect.{iteration}.launch.{name} (attempt {attempt}).' |
| 'trigger'.format( |
| iteration=iteration, name=name, attempt=attempt)) |
| launch_data = self.m.swarming.trigger( |
| task_names=[name], initial_id=int(task_id)) |
| return self.step_data(step_name, launch_data) |
| |
| def task_result(self, |
| name, |
| task_id, |
| failed=False, |
| incomplete=False, |
| timed_out=False, |
| **kwargs): |
| """Mock data for call to api.swarming.collect(). |
| |
| Args: |
| name (str): name of task |
| task_id (str): id of task |
| failed (bool): if the task failed |
| incomplete (bool): if the task is incomplete |
| timed_out (bool): if the task timed out (implies failed) |
| **kwargs (dict): additional args to pass to swarming.task_result() |
| """ |
| assert not (failed and incomplete) |
| assert not (timed_out and incomplete) |
| |
| failed = failed or timed_out |
| |
| state = self.m.swarming.TaskState.COMPLETED |
| if incomplete: |
| state = None |
| name = None |
| elif timed_out: |
| state = self.m.swarming.TaskState.TIMED_OUT |
| |
| return self.m.swarming.task_result( |
| id=str(task_id), name=name, state=state, failure=failed, **kwargs) |
| |
| # These methods are just for convenience to make tests more readable. |
| def incomplete_task(self, name, task_id, **kwargs): |
| return self.task_result(name, task_id, incomplete=True, **kwargs) |
| |
| def failed_task(self, name, task_id, **kwargs): |
| return self.task_result(name, task_id, failed=True, **kwargs) |
| |
| def timed_out_task(self, name, task_id, **kwargs): |
| return self.task_result(name, task_id, timed_out=True, **kwargs) |
| |
| def passed_task(self, name, task_id, **kwargs): |
| return self.task_result(name, task_id, **kwargs) |
| |
| def collect_data(self, results, iteration=0): |
| return self.override_step_data('launch/collect.%d.collect' % iteration, |
| self.m.swarming.collect(results)) |