# blob: 0ff088a385eac80827ed80c5c75440fd019c0e4a [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from recipe_engine import recipe_test_api
class FuchsiaSwarmingRetryTestApi(recipe_test_api.RecipeTestApi):
    """Mock step data helpers for swarming launch/collect steps."""

    def _attempt(self, attempt, iteration):
        """Return the attempt number, inferring it from `iteration` if unset."""
        # An explicit attempt number always wins; passing one is how a caller
        # opts out of the guesses made below.
        if attempt is not None:  # pragma: no cover
            return attempt

        # Without an explicit value, iteration 0 is taken to be the first
        # attempt (0). Any other iteration is taken to be a relaunch, and
        # since the max attempts is currently 2 that must be attempt 1.
        return 0 if iteration == 0 else 1

    def trigger_data(self, name, task_id, iteration=0, attempt=None):
        """Mock data for the api.swarming.trigger call that launches a task.

        Args:
          name (str): name of task
          task_id (str): id of task; converted with int() before use
          iteration (int): launch/collect iteration used in the step name
          attempt (int or None): attempt number used in the step name; when
            None it is inferred from `iteration`

        Returns:
          Step data attached to the task's trigger step.
        """
        resolved_attempt = self._attempt(attempt=attempt, iteration=iteration)
        step_template = (
            "launch/collect.{iteration}.launch.{name} (attempt {attempt})."
            "trigger"
        )
        trigger_step = step_template.format(
            iteration=iteration, name=name, attempt=resolved_attempt
        )
        mock_trigger = self.m.swarming.trigger(
            task_names=[name], initial_id=int(task_id)
        )
        return self.step_data(trigger_step, mock_trigger)

    def task_result(
        self, name, task_id, failed=False, incomplete=False, timed_out=False, **kwargs
    ):
        """Build a mock task result for api.swarming.collect().

        Args:
          name (str): name of task
          task_id (str): id of task
          failed (bool): if the task failed
          incomplete (bool): if the task is incomplete; cannot be combined
            with `failed` or `timed_out`
          timed_out (bool): if the task timed out (implies failed)
          **kwargs (dict): additional args to pass to swarming.task_result()

        Returns:
          Whatever swarming.task_result() returns for these arguments.
        """
        # An incomplete task can be neither failed nor timed out.
        assert not (failed and incomplete)
        assert not (timed_out and incomplete)

        task_states = self.m.swarming.TaskState
        if incomplete:
            # Incomplete results carry neither a state nor a name.
            state, name = None, None
        elif timed_out:
            state = task_states.TIMED_OUT
        else:
            state = task_states.COMPLETED

        # A timeout is reported as a failure as well.
        return self.m.swarming.task_result(
            id=str(task_id),
            name=name,
            state=state,
            failure=failed or timed_out,
            **kwargs
        )

    # Thin wrappers around task_result() so that tests read more naturally.

    def incomplete_task(self, name, task_id, **kwargs):
        """Shorthand for task_result(..., incomplete=True)."""
        return self.task_result(name, task_id, incomplete=True, **kwargs)

    def failed_task(self, name, task_id, **kwargs):
        """Shorthand for task_result(..., failed=True)."""
        return self.task_result(name, task_id, failed=True, **kwargs)

    def timed_out_task(self, name, task_id, **kwargs):
        """Shorthand for task_result(..., timed_out=True)."""
        return self.task_result(name, task_id, timed_out=True, **kwargs)

    def passed_task(self, name, task_id, **kwargs):
        """Shorthand for task_result() with no failure flags set."""
        return self.task_result(name, task_id, **kwargs)

    def collect_data(self, results, iteration=0):
        """Override the step data of the collect step for `iteration`.

        Args:
          results: passed straight to swarming.collect(); presumably the
            task results built by task_result() (confirm against callers)
          iteration (int): launch/collect iteration used in the step name

        Returns:
          Overriding step data for the iteration's collect step.
        """
        collect_step = "launch/collect.%d.collect" % iteration
        mock_collect = self.m.swarming.collect(results)
        return self.override_step_data(collect_step, mock_collect)