blob: 842e3c6bd9c6a1394e0524d30c38801d84902d8c [file] [log] [blame]
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Launch and retry swarming jobs until they pass or we hit max attempts."""
import itertools
import traceback
import attr
from recipe_engine import recipe_api
from RECIPE_MODULES.fuchsia.utils import pluralize
DEFAULT_MAX_ATTEMPTS = 2
@attr.s
class Attempt(object):
"""References a specific attempt of a task."""
task_id = attr.ib(type=str)
index = attr.ib(type=int, default=None) # Number of prior attempts.
host = attr.ib(type=str, default=None)
task_ui_link = attr.ib(type=str, default=None)
# api.swarming.TaskResult from api.swarming.collect() call.
result = attr.ib(default=None)
# This attribute should be set by overrides of Task.process_result(). It
# indicates that even though at the swarming level the task may have
# passed something failed inside that larger task.
failure_reason = attr.ib(type=str, default="")
task_outputs_link = attr.ib(type=str, default=None)
logs = attr.ib(type=dict, default=attr.Factory(dict))
def __attrs_post_init__(self):
# The led module gives the host and the id, but the swarming module
# gives the link and the id. Require the id (since it has no default
# above) and require either the host or task_ui_link attributes.
assert self.host or self.task_ui_link
if not self.task_ui_link:
self.task_ui_link = "https://%s/task?id=%s" % (self.host, self.task_id)
@property
def name(self):
return "attempt %d" % self.index
# TODO(mohrr) add hook for pass/fail beyond swarming task level.
# In some cases may need to examine isolated output to determine pass/fail.
@property
def success(self):
if self.failure_reason:
return False
if not self.result: # pragma: nocover
return False
try:
self.result.analyze()
return True
except recipe_api.StepFailure:
return False
class TaskTracker(object):
"""TaskTracker tracks state about attempts to run a task.
This has two modes:
* Run the task until we get run_count successes. Usually run_count is
1, for running regular tests, but run_count may be >1 when gathering
results of performance tests.
* Run the task repeatedly (regardless of whether it passes or fails),
until a deadline occurs. This is used for gathering data on test
flakiness.
"""
# States returned by _get_state()
_LAUNCH_MORE = "launch_more"
_IN_PROGRESS = "in_progress"
_OVERALL_SUCCESS = "overall_success"
_OVERALL_FAILURE = "overall_failure"
def __init__(self, api, task, launch_deadline_time, run_count):
"""
Args:
api: recipe_api.RecipeApiPlain object.
task: Task object.
launch_deadline_time: float or int. If set, will keep launching until
this time, ignoring max_attempts and run_count.
run_count: number of successful runs we want to get for the task.
"""
self._api = api
self._task = task
self._launch_deadline_time = launch_deadline_time
self._current_time = 0
self._max_attempts = task.max_attempts
self._attempts = []
self._in_progress_attempts = []
self._successes_required = run_count
self._successes_got = 0
self._failures_got = 0
@property
def name(self):
return self._task.name
@property
def abort_early_if_failed(self):
return self._task.abort_early_if_failed
def set_current_time(self, current_time):
self._current_time = current_time
# Returns a pair (state, number_to_launch), where number_to_launch
# is the number of new task attempts to be launched.
def _get_state(self):
if len(self._in_progress_attempts) != 0:
return self._IN_PROGRESS, 0
if self._launch_deadline_time:
if self._current_time < self._launch_deadline_time:
return self._LAUNCH_MORE, 1
if self._successes_got >= 1:
return self._OVERALL_SUCCESS, 0
return self._OVERALL_FAILURE, 0 # pragma: nocover
if self._successes_got >= self._successes_required:
return self._OVERALL_SUCCESS, 0
# We treat the max_attempts parameter as a multipler, basically
# "max attempts per successful run needed", so that the same
# max_attempts value can be used for both perfcompare and regular
# builders.
attempts_allowed = self._max_attempts * self._successes_required
remaining_needed = self._successes_required - self._successes_got
remaining_allowed = attempts_allowed - len(self._attempts)
if remaining_needed > remaining_allowed:
return self._OVERALL_FAILURE, 0
# Apply the "no futile retries" strategy: If we need multiple
# successful runs but we see no successes in the first batch of
# attempts, don't do any retries, on the grounds that the build
# we're testing is probably bad (i.e. it won't pass if retried).
# This is intended to avoid wasting time and infra capacity.
if (
self._successes_required > 1
and self._successes_got == 0
and len(self._attempts) >= self._successes_required
):
return self._OVERALL_FAILURE, 0
return self._LAUNCH_MORE, remaining_needed
def should_launch(self):
_, number_to_launch = self._get_state()
return number_to_launch > 0
# Launch one or more task attempts. This assumes that should_launch()
# was previously called and returned True.
def launch(self):
state, number_to_launch = self._get_state()
assert state == self._LAUNCH_MORE, state
assert number_to_launch > 0
# Don't increase the priority:
# * if we are retrying until a deadline, or
# * if we need multiple successful runs (used for perfcompare mode).
if self._launch_deadline_time or self._successes_required > 1:
priority_boost_amount = 0
else:
# Boost the priority by the number of previous attempts. This means
# that second attempts will take priority over first attempts, third
# attempts will take priority over second attempts, etc.
#
# This means that if there is a long queue for Swarming tasks to run,
# only the first attempts should wait. Subsequent attempts should
# jump ahead in the queue.
priority_boost_amount = len(self._attempts)
task_ids = []
for _ in xrange(number_to_launch):
attempt_index = len(self._attempts)
task_name = "%s (attempt %d)" % (self.name, attempt_index)
with self._api.step.nest(task_name) as presentation:
attempt = self._task.launch(priority_boost_amount)
attempt.index = attempt_index
self._attempts.append(attempt)
self._in_progress_attempts.append(attempt)
task_ids.append(attempt.task_id)
presentation.links["Swarming task"] = attempt.task_ui_link
return task_ids
def get_successful_attempts(self):
return [attempt for attempt in self._attempts if attempt.success]
def get_failed_attempts(self):
attempts = []
if not self.in_progress:
attempts = [attempt for attempt in self._attempts if not attempt.success]
return attempts
def get_in_progress_attempts(self):
return self._in_progress_attempts[:]
def get_all_attempts(self):
return self._attempts[:]
@property
def in_progress(self):
state, _ = self._get_state()
return state == self._IN_PROGRESS
@property
def success(self):
state, _ = self._get_state()
return state == self._OVERALL_SUCCESS
@property
def failed(self):
return not self.success and not self.in_progress
def failed_after_max_attempts(self):
state, _ = self._get_state()
return state == self._OVERALL_FAILURE
def has_flakes(self):
return self._successes_got > 0 and self._failures_got > 0
def process_result(self, attempt, result, traceback_format_exc):
with self._api.step.nest(result.name):
self._in_progress_attempts.remove(attempt)
attempt.result = result
try:
self._task.process_result(attempt)
except recipe_api.StepFailure as e:
error_step = self._api.step("exception", None)
error_step.presentation.step_summary_text = str(e)
error_step.presentation.logs[
"exception"
] = traceback_format_exc().splitlines()
attempt.failure_reason = "exception during result processing"
if attempt.success:
self._successes_got += 1
else:
self._failures_got += 1
def present_status(self, list_step, attempt):
return self._task.present_status(list_step, attempt)
def present(self, **kwargs):
"""Present this task when summarizing results at the end of the run.
Args:
**kwargs (Dict): passed through to present_attempt()
Returns:
None
"""
with self._api.step.nest(self.name) as task_step_presentation:
for attempt in self._attempts:
self._task.present_attempt(task_step_presentation, attempt, **kwargs)
# Show incomplete tasks in green so as not to be confused with
# actual failures.
if self.success or self.in_progress:
task_step_presentation.status = self._api.step.SUCCESS
else:
task_step_presentation.status = self._api.step.FAILURE
class Task(object):
"""A Task object describes:
* How to launch a task.
* How to process and present the results from a task.
This class is meant to be subclassed. Subclasses must define a launch()
method.
In most cases Task.max_attempts should be left alone. If the caller wants
to ensure a task has a larger or smaller number of max attempts than the
default for other tasks, set max_attempts to that number.
Task objects do not have any state that changes as attempts are launched
and produce results.
"""
def __init__(self, api, name):
"""Initializer.
Args:
api: recipe_api.RecipeApiPlain object.
name: str, human readable name of this task
"""
self._api = api
self.name = name
self.max_attempts = None
self.abort_early_if_failed = False
def process_result(self, attempt):
"""Examine the result in the given attempt for failures.
Subclasses can set attempt.failure_reason if they find a failure inside
attempt.result. failure_reason should be a short summary of the failure
(< 50 chars).
This is invoked shortly after api.swarming.collect() returns that a
task completed. It cannot assume the swarming task completed
successfully.
This is a no-op here but can be overridden by subclasses.
Returns:
None
"""
pass
def present_status(self, parent_step_presentation, attempt):
"""Present an Attempt while showing progress in launch/collect step.
Args:
parent_step_presentation (StepPresentation): will always be for
'passed tasks' or 'failed tasks'
attempt (Attempt): the Attempt to present
This method will be invoked to show details of an Attempt. This base
class method just creates a link to the swarming results from the task,
but subclasses are free to create a step with much more elaborate
details of results.
This is only invoked for completed tasks. Identical code is used for
incomplete tasks, except it's not in Task so it can't be overridden, so
subclasses don't need to handle incomplete tasks.
Returns:
None
"""
name = "%s (%s)" % (self.name, attempt.name)
parent_step_presentation.links[name] = attempt.task_ui_link
def present_attempt(self, task_step_presentation, attempt, **kwargs):
"""Present an Attempt when summarizing results at the end of the run.
Args:
task_step_presentation (StepPresentation): assuming present() was not
overridden, this will always be for a step titled after the current
task
attempt (Attempt): the Attempt to present
**kwargs (Dict): pass-through arguments for subclasses
This method will be invoked to show details of an Attempt. This base
class method just creates a link to the swarming results from the task,
but subclasses are free to create a step with much more elaborate
details of results.
Returns:
None
"""
del kwargs # Unused.
name = "%s (%s)" % (attempt.name, "pass" if attempt.success else "fail")
task_step_presentation.links[name] = attempt.task_ui_link
def launch(self, priority_boost_amount):
"""Launch the task (using Swarming, led, or something else).
Args:
priority_boost_amount (int): Non-negative integer specifying how much
the priority of the task should be increased from the default.
Returns:
Attempt object, with the task_id or host property filled out from
from the Swarming or led result.
"""
assert False, "Subclasses must define launch() method." # pragma: no cover
class TriggeredTask(Task):
def __init__(self, request, api, **kwargs):
super(TriggeredTask, self).__init__(api, request.name, **kwargs)
self._request = request
def launch(self, priority_boost_amount):
assert self._request
request = self._request
if priority_boost_amount != 0:
old_priority = request.priority
# For Swarming tasks, numerically lower priority values are logically
# higher priorities, so use subtraction here.
request = request.with_priority(old_priority - priority_boost_amount)
with self._api.step.nest("increase priority") as pres:
pres.step_summary_text = "from %d to %d" % (
old_priority,
request.priority,
)
metadata = self._api.swarming.trigger("trigger", [request])
assert len(metadata) == 1
metadata = metadata[0]
return self._api.swarming_retry.Attempt(
task_ui_link=metadata.task_ui_link, task_id=metadata.id,
)
class LedTask(Task):
def __init__(self, led_data, *args, **kwargs):
super(LedTask, self).__init__(*args, **kwargs)
self._led_data = led_data
build = led_data.result.buildbucket.bbagent_args.build
self._swarming_pb = build.infra.swarming
self._original_priority = self._swarming_pb.priority
def launch(self, priority_boost_amount):
assert self._led_data
# For Swarming tasks, numerically lower priority values are logically
# higher priorities, so use subtraction here.
new_priority = self._original_priority - priority_boost_amount
# Although we modify this data structure in place, one launch()
# invocation should not affect later launch() invocations because this
# 'priority' field is always overwritten.
self._swarming_pb.priority = new_priority
if priority_boost_amount != 0:
with self._api.step.nest("increase priority") as pres:
pres.step_summary_text = "from %d to %d" % (
self._original_priority,
new_priority,
)
res = self._led_data.then("launch", "-modernize")
host = res.launch_result.swarming_hostname
task_id = res.launch_result.task_id
return self._api.swarming_retry.Attempt(
host=host,
task_id=task_id,
# Use Milo since this task is running a recipe.
task_ui_link="https://ci.chromium.org/swarming/task/%s?server=%s"
% (task_id, host),
)
class RetrySwarmingApi(recipe_api.RecipeApi):
"""Launch and retry swarming jobs until they pass or we hit max attempts."""
Task = Task # pylint: disable=invalid-name
LedTask = LedTask # pylint: disable=invalid-name
TriggeredTask = TriggeredTask # pylint: disable=invalid-name
Attempt = Attempt # pylint: disable=invalid-name
def __init__(self, *args, **kwargs):
super(RetrySwarmingApi, self).__init__(*args, **kwargs)
self._task_ids_seen = set()
self._prune_tracebacks = False
def prune_tracebacks_for_testing(self):
assert self._test_data.enabled
self._prune_tracebacks = True
def run_and_present_tasks(self, tasks, **kwargs):
tasks = self.run_tasks(tasks, **kwargs)
self.present_tasks(tasks)
self.raise_failures(tasks)
def _is_complete(self, result):
# At the moment results have a bunch of fields set to None if incomplete.
# On the assumption this will be changed at some point I'm also checking
# the state explicitly.
if result.name is None:
return False
return result.state not in {
self.m.swarming.TaskState.RUNNING,
self.m.swarming.TaskState.PENDING,
}
def _get_tasks_to_launch(self, tasks):
# The test version of self.m.time.time() advances the fake time
# every time it is called, which is somewhat unhelpful because it
# would make the test outcomes very dependent on implementation
# details of how often the time is sampled. To avoid that, we
# explicitly sample the time once and pass the value in to the
# TaskTracker instances.
current_time = self.m.time.time()
for task in tasks:
task.set_current_time(current_time)
if any(task.failed_after_max_attempts() for task in tasks):
# The build has failed overall, so disable launching any
# further task attempts.
return []
return [task for task in tasks if task.should_launch()]
def _launch(self, tasks):
for task in tasks:
task_ids = task.launch()
# Check whether we got any duplicate task IDs. This is just a
# sanity check for testing. With the current testing
# framework, it is easy for multiple launch attempts to return
# the same ID, because the default testing output always
# returns the same task ID value.
for task_id in task_ids:
assert (
task_id not in self._task_ids_seen
), "Duplicate task ID seen: %s" % repr(task_id)
self._task_ids_seen.add(task_id)
def traceback_format_exc(self):
text = traceback.format_exc()
if self._prune_tracebacks:
# Prune the traceback so that the checked-in test expectations
# don't contain source code line numbers, which often change.
lines = text.splitlines()
text = (
"%s\n... <traceback text omitted to simplify test expectations>\n%s"
% (lines[0], lines[-1])
)
return text
def _launch_and_collect(
self, tasks, collect_output_dir, summary_presentation, present
):
"""Launch necessary tasks and process those that complete.
Launch any tasks that are not currently running, have not passed,
and have not exceeded max_attempts.
After launching tasks, wait for the first task to complete (of the tasks
just launched as well as those that have been running for awhile).
Summarize the jobs that have just passed or failed as well as those still
running (with swarming links).
This function is mostly stateless. The caller must pass in the
same arguments for each invocation, and state is kept inside the
tasks themselves.
Args:
tasks (list[Task]): tasks to execute
collect_output_dir (Path or None): output directory to pass to
api.swarming.collect()
summary_presentation (StepPresentation): where to attach the
summary for this round of launch/collect.
present (bool): Whether or not to present detailed progress.
Returns:
Number of jobs still running or to be relaunched. As long as this
is positive the caller should continue calling this method.
"""
summary = []
def summary_entry(list_name, task_list):
if len(task_list) == 1:
count_or_name = task_list[0].name
else:
count_or_name = len(task_list)
return "%s %s" % (count_or_name, list_name)
to_launch = self._get_tasks_to_launch(tasks)
if to_launch:
with self.m.step.nest("launch"):
self._launch(to_launch)
# Wait on tasks that are in-progress.
tasks_by_id = {}
for task in tasks:
for attempt in task.get_in_progress_attempts():
assert attempt.task_id not in tasks_by_id
tasks_by_id[attempt.task_id] = (task, attempt)
results = []
if tasks_by_id:
results = self.m.swarming.collect(
"collect",
sorted(tasks_by_id.iterkeys()),
output_dir=collect_output_dir,
eager=True,
verbose=True,
)
# 'collect' takes a list of task IDs and returns a list specifying
# whether each task has completed or is still running. However,
# sometimes the list it returns misses tasks that haven't
# completed. That makes no difference here because we only look at
# the completed tasks.
completed_results = [result for result in results if self._is_complete(result)]
passed_tasks = []
failed_tasks = []
if completed_results:
with self.m.step.nest("process results") as process_results_presentation:
for result in completed_results:
task, attempt = tasks_by_id[result.id]
task.process_result(attempt, result, self.traceback_format_exc)
if attempt.success:
passed_tasks.append((task, attempt))
else:
failed_tasks.append((task, attempt))
# Prevent failure states from the child log steps being
# propagated up the log step tree by the recipe log step
# system. This is desirable because although
# task.process_result() may internally catch and suppress
# an exception, the exception will still be reported
# through recipe log step system.
process_results_presentation.status = self.m.step.SUCCESS
for list_name, task_list in [
("passed", passed_tasks),
("failed", failed_tasks),
]:
if task_list and present:
with self.m.step.nest("%s tasks" % list_name) as list_step:
for task, attempt in task_list:
task.present_status(list_step, attempt)
summary.append(
summary_entry(list_name, [task for task, _ in task_list])
)
incomplete_tasks = [task for task in tasks if task.in_progress]
if present:
# Do minimal presentation of all in-progress Attempts.
links = []
for task in tasks:
for attempt in task.get_in_progress_attempts():
name = "%s (%s)" % (task.name, attempt.name)
links.append((name, attempt.task_ui_link))
if links:
with self.m.step.nest("incomplete tasks") as list_step_presentation:
list_step_presentation.links.update(links)
summary.append(summary_entry("incomplete", incomplete_tasks))
to_be_relaunched = self._get_tasks_to_launch(tasks)
failed_after_max_attempts = [
task for task in tasks if task.failed_after_max_attempts()
]
if failed_after_max_attempts:
summary.append(
summary_entry("failed after max attempts", failed_after_max_attempts)
)
if present:
summary_presentation.step_summary_text = ", ".join(summary)
# Check if all abort_early_if_failed tasks are finished. If one or more
# fail, don't wait on remaining tasks. They will be killed when the
# build is complete.
abort_early_tasks = [task for task in tasks if task.abort_early_if_failed]
abort_early_tasks_in_progress = [
task for task in abort_early_tasks if task.in_progress
]
if not abort_early_tasks_in_progress:
failed_abort_early_tasks = [
task for task in abort_early_tasks if task.failed_after_max_attempts()
]
if failed_abort_early_tasks:
return 0
return len(to_be_relaunched) + len(incomplete_tasks)
def run_tasks(
self,
tasks,
max_attempts=0,
launch_deadline_time=None,
collect_output_dir=None,
present=True,
run_count=1,
):
"""Launch all tasks, retry until max_attempts reached.
Args:
tasks (seq[Task]): tasks to execute
max_attempts (int): maximum number of attempts per task (0 means
DEFAULT_MAX_ATTEMPTS)
launch_deadline_time: see TaskTracker constructor.
collect_output_dir (Path or None): output directory to pass to
api.swarming.collect()
present (bool): Whether or not to present progress info.
Setting to False may reduce running time and memory usage.
"""
max_attempts = max_attempts or DEFAULT_MAX_ATTEMPTS
for task in tasks:
if not task.max_attempts:
task.max_attempts = max_attempts
tasks = [
TaskTracker(self.m, task, launch_deadline_time, run_count) for task in tasks
]
with self.m.step.nest("launch/collect"), self.m.context(infra_steps=True):
for i in itertools.count(0):
with self.m.step.nest(str(i)) as presentation:
if not self._launch_and_collect(
tasks=tasks,
collect_output_dir=collect_output_dir,
summary_presentation=presentation,
present=present,
):
break
return tasks
def present_tasks(self, tasks):
"""Present results as steps.
Examine tasks for pass/fail status and create step data for displaying
that status. Group all passes under one step and all failures under
another step. Passes that failed at least once are also listed as
flakes.
Args:
tasks (seq[Task]): tasks to examine
"""
# TODO(mohrr) add hooks to include task-specific data beyond pass/fail.
passed_tasks = [x for x in tasks if x.success]
# Some tasks may be incomplete if the launch_and_collect loop exited
# early due to failures.
incomplete_tasks = [x for x in tasks if x.in_progress]
failed_tasks = [x for x in tasks if x.failed]
flaked_tasks = [x for x in tasks if x.has_flakes()]
with self.m.step.nest("passes") as step_presentation:
for task in passed_tasks:
task.present(category="passes")
step_presentation.step_summary_text = "%d passed" % len(passed_tasks)
with self.m.step.nest("flakes") as step_presentation:
for task in flaked_tasks:
task.present(category="flakes")
step_presentation.step_summary_text = "%d flaked" % len(flaked_tasks)
with self.m.step.nest("failures") as step_presentation:
for task in failed_tasks:
task.present(category="failures")
step_presentation.step_summary_text = "%d failed" % len(failed_tasks)
if incomplete_tasks:
with self.m.step.nest("incomplete") as step_presentation:
for task in incomplete_tasks:
task.present(category="incomplete")
step_presentation.step_summary_text = "%d incomplete" % len(
incomplete_tasks
)
if not failed_tasks and not incomplete_tasks:
self.m.step("all tasks passed", None) # pragma: no cover
def raise_failures(self, tasks):
"""Raise an exception if any tasks failed.
Examine tasks for pass/fail status. If any failed, raise a StepFailure.
Args:
tasks (seq[Task]): tasks to examine
"""
failed = [x for x in tasks if x.failed]
if failed:
raise self.m.step.StepFailure(
"%s failed: %s"
% (pluralize("task", failed), ", ".join(x.name for x in failed),)
)