blob: 05594bac371a27dce60152fbb03d88bdb870411a [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from google.protobuf import json_format
from recipe_engine.recipe_api import Property
from PB.go.chromium.org.luci.led.job import job as job_pb2
from RECIPE_MODULES.fuchsia.swarming_retry import api as swarming_retry_api
# Recipe modules this test recipe depends on.
DEPS = [
    "fuchsia/status_check",
    "fuchsia/swarming_retry",
    "recipe_engine/buildbucket",
    "recipe_engine/led",
    "recipe_engine/properties",
    "recipe_engine/step",
    "recipe_engine/swarming",
    "recipe_engine/time",
]
# Input properties that select which Task subclass RunSteps instantiates
# and how the swarming_retry loop is configured.
PROPERTIES = {
    "full": Property(
        kind=bool, default=False, help="Whether to run six tasks or just one.",
    ),
    "run_count": Property(
        kind=int, default=1, help="Number of successful runs we want for each task.",
    ),
    "task_type": Property(
        kind=str,
        default="test",
        help="Type of tasks to create. Options: "
        '"test", "internal_failure", "raising", "led", "triggered".',
    ),
    "max_attempts": Property(kind=int, default=2, help="Overall max attempts.",),
    "last_task_max_attempts": Property(
        kind=int,
        default=None,
        help="Override the overall max attempts by setting on "
        "Task.max_attempts. Only set on last task.",
    ),
    "launch_deadline_time": Property(
        kind=float,
        default=None,
        help="Passed through to swarming_retry.Task.__init__().",
    ),
    "abort_early": Property(
        kind=bool,
        default=False,
        help="Whether to run a task that will fail and abort early",
    ),
}
class Task(swarming_retry_api.Task):
    """Required subclass for testing swarming_retry.

    Provides a fake launch() implementation so the retry machinery can be
    exercised without talking to a real swarming service.
    """

    def __init__(self, initial_task_id, *args, **kwargs):
        """Construct a Task object.

        Args:
          initial_task_id (int or str): integer decimal value (since this needs
            to be incremented but is then used as a str later this method
            accepts both int and str types to minimize confusion, so long as
            int(initial_task_id) works)
        """
        # Consume "abort_early" before delegating so the base __init__
        # (which does not accept it) never sees the keyword.
        abort_early = kwargs.pop("abort_early", False)
        super(Task, self).__init__(*args, **kwargs)
        self._next_task_id = int(initial_task_id)
        self.abort_early_if_failed = abort_early

    def launch(self, priority_boost_amount):
        """Create and present a fake Attempt with a fresh task id.

        Args:
          priority_boost_amount: accepted to satisfy the launch() interface;
            unused by this fake implementation.

        Returns:
          A swarming_retry.Attempt whose task_id increases by one per launch.
        """
        kwargs = {
            "task_id": str(self._next_task_id),
        }
        self._next_task_id += 1
        # This looks funny but it's needed to ensure coverage of
        # Attempt.task_ui_link.
        if self._next_task_id % 2 == 0:
            kwargs["host"] = "testhost"
        else:
            kwargs["task_ui_link"] = "https://testhost/task?id=%s" % kwargs["task_id"]
        attempt = self._api.swarming_retry.Attempt(**kwargs)
        step = self._api.step("launch %s" % self.name, None)
        step.presentation.step_summary_text = attempt.task_id
        return attempt
class InternalFailureTask(Task):
    """Task whose result processing always reports an internal failure."""

    def process_result(self, attempt):
        # Unconditionally mark the attempt as an internal failure.
        reason = "internal failure"
        attempt.failure_reason = reason
class RaisingTask(Task):
    """Task whose result processing raises from inside a nested log step."""

    def process_result(self, attempt):
        # Raising from within a nested recipe log step exercises how the
        # software-under-test's exception handling interacts with failure
        # reporting in the log steps.
        #
        # The recipe framework marks the failing "nested" step red and, by
        # default, propagates that failure state to every parent and
        # ancestor step (even when the exception is caught and handled).
        # The software-under-test takes measures to prevent that, and this
        # nested step is what gets those measures tested.
        nest = self._api.step.nest
        with nest("nested"):
            raise self._api.step.StepFailure("something failed")
class LedTask(swarming_retry_api.LedTask):
    """led-based task that seeds and auto-increments its swarming task id."""

    def __init__(self, initial_task_id, api, **kwargs):
        led_data = api.led("get-builder", "project/bucket:builder")
        build = led_data.result.buildbucket.bbagent_args.build
        build.infra.swarming.task_id = str(initial_task_id)
        super(LedTask, self).__init__(led_data, api=api, **kwargs)

    def launch(self, priority_boost_amount):
        attempt = super(LedTask, self).launch(priority_boost_amount)
        # Bump the mocked swarming task id so the next launch presents as
        # a distinct task.
        build = self._led_data.result.buildbucket.bbagent_args.build
        build.infra.swarming.task_id = str(int(build.infra.swarming.task_id) + 1)
        return attempt
class TriggeredTask(swarming_retry_api.TriggeredTask):
    """Task built from a raw swarming task request with fixed dimensions."""

    def __init__(self, api, name, initial_task_id, **kwargs):
        del initial_task_id  # Unused.
        request = api.swarming.task_request().with_name(name)
        # Pin the request's first slice to a fixed pool and device type.
        first_slice = request[0].with_dimensions(
            pool="pool", device_type="device_type"
        )
        request = request.with_slice(0, first_slice)
        super(TriggeredTask, self).__init__(request, api, **kwargs)
# Test that the interface of traceback_format_exc() stays the same
# whether or not its test mode is enabled. As a side effect, this
# enables its test mode (prune_tracebacks_for_testing()).
def test_traceback_format_exc(api):
    """Exercise traceback_format_exc() before and after pruning is enabled.

    Args:
      api: recipe API object; only api.swarming_retry is used.
    """
    try:
        raise AssertionError()
    # Catch only the exception raised above; the original bare `except:`
    # would also swallow KeyboardInterrupt/SystemExit and mask unrelated
    # bugs in the code under test.
    except AssertionError:
        text = api.swarming_retry.traceback_format_exc()
        assert isinstance(text, str)
        assert "line " in text, repr(text)
        assert "raise AssertionError()" in text, repr(text)
    api.swarming_retry.prune_tracebacks_for_testing()
    try:
        raise AssertionError()
    except AssertionError:
        text = api.swarming_retry.traceback_format_exc()
        assert isinstance(text, str)
        # Pruned tracebacks omit "line N" location details.
        assert "line " not in text, repr(text)
# pylint: disable=invalid-name
def RunSteps(
    api,
    full,
    task_type,
    max_attempts,
    last_task_max_attempts,
    launch_deadline_time,
    run_count,
    abort_early,
):
    """Drive swarming_retry with a task configuration chosen by properties."""
    test_traceback_format_exc(api)

    # Pick the Task subclass under test from the task_type property.
    _create_task = {
        "test": Task,
        "internal_failure": InternalFailureTask,
        "raising": RaisingTask,
        "led": LedTask,
        "triggered": TriggeredTask,
    }[task_type]  # pylint: disable=invalid-name

    if full:
        names = ["pass", "flake", "fail", "pass_long", "flake_long", "fail_long"]
        tasks = [
            _create_task(api=api, name=task_name, initial_task_id=(i + 1) * 100)
            for i, task_name in enumerate(names)
        ]
    else:
        tasks = [_create_task(api=api, name="task", initial_task_id=100)]

    if abort_early:
        tasks.append(
            _create_task(
                api=api, name="abort_early_task", initial_task_id=700, abort_early=True,
            )
        )
    if last_task_max_attempts:
        tasks[-1].max_attempts = last_task_max_attempts

    if not launch_deadline_time:
        api.swarming_retry.run_and_present_tasks(
            tasks, max_attempts=max_attempts, run_count=run_count
        )
        return

    # Avoid using run_and_present_tasks() when testing
    # launch_deadline_time. Although it works, its present_tasks()
    # step will misleadingly present tasks as flakes if there are
    # multiple runs, even if they are all successful.
    assert run_count == 1, run_count
    tasks = api.swarming_retry.run_tasks(
        tasks, launch_deadline_time=launch_deadline_time
    )
    for task in tasks:
        # Call these methods to satisfy code coverage checks.
        assert isinstance(task.get_all_attempts(), list)
        assert isinstance(task.get_successful_attempts(), list)
        assert isinstance(task.get_failed_attempts(), list)
def GenTests(api):  # pylint: disable=invalid-name
    """Yield test cases covering swarming_retry's retry/multirun behaviors."""
    test_api = api.swarming_retry

    def led_build_data(priority=100):
        # Mock the `led get-builder` result with a CI build proto at the
        # given priority, for task_type="led" tests.
        build = api.buildbucket.ci_build_message(priority=priority)
        job_def = job_pb2.Definition()
        job_def.buildbucket.bbagent_args.build.CopyFrom(build)
        return api.led.mock_get_builder(job_def)

    # Six tasks: "flake" passes on retry, "fail" exhausts its attempts.
    yield (
        api.status_check.test("full_test", status="failure")
        + api.properties(full=True)
        + test_api.collect_data(
            [
                test_api.passed_task("pass", 100),
                test_api.failed_task("flake", 200),
                test_api.failed_task("fail", 300),
            ],
            iteration=0,
        )
        + test_api.collect_data(
            [test_api.passed_task("flake", 201), test_api.failed_task("fail", 301),],
            iteration=1,
        )
        +
        # `fail` task failed max times so remaining long tasks should only be run
        # once.
        test_api.collect_data(
            [
                test_api.incomplete_task("pass_long", 400),
                test_api.incomplete_task("flake_long", 500),
                test_api.incomplete_task("fail_long", 600),
            ],
            iteration=2,
        )
        + test_api.collect_data([], iteration=3)
        + test_api.collect_data([test_api.passed_task("pass_long", 400),], iteration=4)
        + test_api.collect_data([test_api.failed_task("flake_long", 500),], iteration=5)
        + test_api.collect_data([test_api.failed_task("fail_long", 600),], iteration=6)
    )

    # A timeout counts as a failed attempt but the retry may still pass.
    yield (
        api.status_check.test("timeout_then_pass")
        + api.properties(full=False)
        + test_api.collect_data([test_api.timed_out_task("task", 100)])
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
    )

    # process_result() forces failure_reason, so even passed collects fail.
    yield (
        api.status_check.test("internal_failure", status="failure")
        + api.properties(full=False, task_type="internal_failure")
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
    )

    # process_result() raises; the overall run should be a failure.
    yield (
        api.status_check.test("raising_process_results", status="failure")
        + api.properties(full=False, task_type="raising")
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
    )

    yield (
        api.status_check.test("led_task")
        + api.properties(full=False, task_type="led")
        + led_build_data()
        + test_api.collect_data([test_api.failed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
    )

    yield (
        api.status_check.test("led_task_hardcoded_attempt")
        + api.properties(full=False, task_type="led")
        + led_build_data()
    )

    yield (
        api.status_check.test("triggered_task")
        + api.properties(full=False, task_type="triggered")
        + api.swarming_retry.trigger_data("task", 1, iteration=0)
        + test_api.collect_data([test_api.failed_task("task", 1)], iteration=0)
        + api.swarming_retry.trigger_data("task", 2, iteration=1)
        + test_api.collect_data([test_api.passed_task("task", 2)], iteration=1)
    )

    # Raising task with max_attempts=3: all three attempts are consumed.
    yield (
        api.status_check.test("max_attempts_three", status="failure")
        + api.properties(full=False, task_type="raising", max_attempts=3)
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
        + test_api.collect_data([test_api.passed_task("task", 102)], iteration=2)
    )

    # Per-task max_attempts=1 overrides the overall max_attempts=3 downward.
    yield (
        api.status_check.test("last_task_max_attempts_low", status="failure")
        + api.properties(
            full=False, task_type="raising", max_attempts=3, last_task_max_attempts=1
        )
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
    )

    # Per-task max_attempts=5 overrides the overall max_attempts=3 upward.
    yield (
        api.status_check.test("last_task_max_attempts_high", status="failure")
        + api.properties(
            full=False, task_type="raising", max_attempts=3, last_task_max_attempts=5
        )
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
        + test_api.collect_data([test_api.passed_task("task", 102)], iteration=2)
        + test_api.collect_data([test_api.passed_task("task", 103)], iteration=3)
        + test_api.collect_data([test_api.passed_task("task", 104)], iteration=4)
    )

    # Test the launch_deadline_time parameter, used for implementing
    # rerun_budget_secs for the rerun builders. Test that the task gets
    # run multiple times (more than the default max_attempts) even when it
    # passes.
    #
    # The test version of api.time.time() advances the clock by the amount
    # set by api.time.step() on every call, which is not very helpful
    # because it makes the test quite dependent on how often the
    # software-under-test calls api.time.time(). We pick the values so
    # that the task gets run 3 times.
    yield (
        api.status_check.test("launch_deadline_time")
        + api.properties(full=False, task_type="triggered", launch_deadline_time=103.4)
        + api.time.seed(100.0)
        + api.time.step(0.5)
        + api.swarming_retry.trigger_data("task", 1, iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 1)], iteration=0)
        + api.swarming_retry.trigger_data("task", 2, iteration=1)
        + test_api.collect_data([test_api.passed_task("task", 2)], iteration=1)
        + api.swarming_retry.trigger_data("task", 3, iteration=2, attempt=2)
        + test_api.collect_data([test_api.passed_task("task", 3)], iteration=2)
    )

    # Test case where we want to get multiple successful runs of the same
    # task (run_count > 1).
    # Test the simple case where there are no failures of the task.
    yield (
        api.status_check.test("multirun_without_failures", status="success")
        + api.properties(run_count=2, task_type="triggered")
        # Enforce that both of these task attempt are launched in the first
        # iteration. (This requires using task_type="triggered".)
        + api.swarming_retry.trigger_data("task", 100, iteration=0, attempt=0)
        + api.swarming_retry.trigger_data("task", 101, iteration=0, attempt=1)
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=1)
    )

    # Test the case where a task must be retried (multiple times) but
    # eventually passes.
    yield (
        api.status_check.test("multirun_retry_overall_pass", status="success")
        + api.properties(run_count=2, task_type="triggered")
        + api.swarming_retry.trigger_data("task", 100, iteration=0, attempt=0)
        + api.swarming_retry.trigger_data("task", 101, iteration=0, attempt=1)
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.failed_task("task", 101)], iteration=1)
        + api.swarming_retry.trigger_data("task", 102, iteration=2, attempt=2)
        + test_api.collect_data([test_api.failed_task("task", 102)], iteration=2)
        + api.swarming_retry.trigger_data("task", 103, iteration=3, attempt=3)
        + test_api.collect_data([test_api.passed_task("task", 103)], iteration=3)
    )

    # Test the case where a task is retried, but ultimately we do not get
    # enough passes within the max_attempts retry limit.
    yield (
        api.status_check.test("multirun_retry_overall_fail", status="failure")
        + api.properties(run_count=2, task_type="triggered")
        + api.swarming_retry.trigger_data("task", 100, iteration=0, attempt=0)
        + api.swarming_retry.trigger_data("task", 101, iteration=0, attempt=1)
        + test_api.collect_data([test_api.passed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.failed_task("task", 101)], iteration=1)
        + api.swarming_retry.trigger_data("task", 102, iteration=2, attempt=2)
        + test_api.collect_data([test_api.failed_task("task", 102)], iteration=2)
        + api.swarming_retry.trigger_data("task", 103, iteration=3, attempt=3)
        + test_api.collect_data([test_api.failed_task("task", 103)], iteration=3)
    )

    # If the last attempt in the list passed but the task failed overall,
    # it should not be treated as passed.
    #
    # Also, if the last attempt in the list completes before other attempts
    # have completed, the task should be treated as in-progress not
    # finished.
    yield (
        api.status_check.test("multirun_fail_pass", status="failure")
        + api.properties(run_count=2, max_attempts=1, task_type="triggered")
        + api.swarming_retry.trigger_data("task", 100, iteration=0, attempt=0)
        + api.swarming_retry.trigger_data("task", 101, iteration=0, attempt=1)
        # The second attempt passes first.
        + test_api.collect_data([test_api.passed_task("task", 101)], iteration=0)
        + test_api.collect_data([test_api.failed_task("task", 100)], iteration=1)
    )

    # Test that the "no futile retries" strategy is applied: If all of the
    # attempts in the first batch fail, there should be no retries.
    yield (
        api.status_check.test("multirun_no_futile_retries", status="failure")
        + api.properties(run_count=2, task_type="triggered")
        + api.swarming_retry.trigger_data("task", 100, iteration=0, attempt=0)
        + api.swarming_retry.trigger_data("task", 101, iteration=0, attempt=1)
        + test_api.collect_data([test_api.failed_task("task", 100)], iteration=0)
        + test_api.collect_data([test_api.failed_task("task", 101)], iteration=1)
    )

    # After the abort-early task fails, no further collects should happen
    # for the remaining tasks.
    yield (
        api.status_check.test(
            "no_collect_after_failed_abort_early_task", status="failure"
        )
        + api.properties(full=True, abort_early=True)
        + test_api.collect_data(
            [
                test_api.failed_task("abort_early_task", 700),
                test_api.passed_task("pass", 100),
                test_api.failed_task("flake", 200),
                test_api.failed_task("fail", 300),
            ],
            iteration=0,
        )
        + test_api.collect_data(
            [test_api.failed_task("abort_early_task", 701),], iteration=1,
        )
    )