blob: 6b869c6149a2d95ec2f5fc8abd716e1c8b6cc41e [file] [log] [blame]
# Copyright 2017 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import base64
from state import TaskState
from recipe_engine import recipe_api
class CollectResult(object):
  """Wrapper object for the collect result of a single Swarming task.

  Translates one entry of the `swarming collect` summary JSON into a
  TaskState plus the task's textual output and the local paths of any
  output files.
  """

  def __init__(self, m, id, raw_results, outdir):
    """Parses the raw collect result for one task.

    Args:
      m: Recipe module dependencies; only `m.path.join` is used here.
      id (str): Swarming task ID. (The parameter name shadows the builtin
        but is kept so existing keyword callers continue to work.)
      raw_results (dict): The summary entry for this task. Either contains
        an 'error' key (the RPC failed) or 'results' and 'output' keys,
        optionally with 'outputs'.
      outdir (str): Directory passed to `swarming collect -output-dir`;
        output files are expected under `<outdir>/<id>/`.

    Raises:
      ValueError: If the task is in a state this wrapper has no TaskState
        for. Previously this surfaced as an AttributeError on `_state`.
    """
    self._id = id
    self._raw_results = raw_results
    self._outputs = {}
    self._state = self._parse_state(id, raw_results)

    if self._state == TaskState.RPC_FAILURE:
      # There is no task output when the RPC failed; surface the error text.
      self._output = raw_results['error']
      return

    self._output = raw_results['output']
    if raw_results.get('outputs'):
      self._outputs = {
          output: m.path.join(outdir, id, output)
          for output in raw_results['outputs']
      }

  @staticmethod
  def _parse_state(task_id, raw_results):
    """Maps a raw collect result onto a TaskState value."""
    if 'error' in raw_results:
      return TaskState.RPC_FAILURE

    results = raw_results['results']
    state = results['state']
    if state == 'COMPLETED':
      # A completed task is only a success if it did not report a failure.
      if results.get('failure', False):
        return TaskState.TASK_FAILURE
      return TaskState.SUCCESS

    # Built lazily so that TaskState is only touched at call time.
    terminal_states = {
        'TIMED_OUT': TaskState.TIMED_OUT,
        'EXPIRED': TaskState.EXPIRED,
        'NO_RESOURCE': TaskState.NO_RESOURCE,
        'BOT_DIED': TaskState.BOT_DIED,
        'CANCELED': TaskState.CANCELED,
        'KILLED': TaskState.KILLED,
    }
    if state not in terminal_states:
      raise ValueError(
          'Unrecognized Swarming task state %r for task %s' % (state, task_id))
    return terminal_states[state]

  @property
  def name(self):
    """Name of the task, or None if the collect RPC failed."""
    if self._state != TaskState.RPC_FAILURE:
      return self._raw_results['results']['name']
    return None

  @property
  def id(self):
    """Swarming task ID this result belongs to."""
    return self._id

  @property
  def state(self):
    """TaskState value describing how the task ended."""
    return self._state

  @property
  def output(self):
    """Task output, or the error message if the collect RPC failed."""
    return self._output

  @property
  def outputs(self):
    """Dict mapping each output name to its path under the output dir."""
    return self._outputs
class TaskRequest(object):
  """Wrapper object for constructing a Swarming task request."""

  def __init__(self, name, cmd, dimensions, isolated='', isolate_server='',
               expiration_secs=300, io_timeout_secs=60, hard_timeout_secs=1200,
               idempotent=False, secret_bytes='', cipd_packages=(), outputs=()):
    """Creates a Swarming task request object.

    For more details on what goes into a Swarming task, see the user guide:
    https://github.com/luci/luci-py/blob/master/appengine/swarming/doc/User-Guide.md#task

    Args:
      name (str): Name of the task.
      cmd (list[str]): The command that will be executed by the swarming_bot in
        the task.
      dimensions (dict[str]str): Dimensions to filter swarming bots on. Must
        contain a non-empty 'pool' entry.
      isolated (str): Hash of isolated on isolate server.
      isolate_server (str): Base URL of the isolate server.
      expiration_secs (int): Seconds before this task request expires.
      io_timeout_secs (int): Seconds to allow the task to be silent.
      hard_timeout_secs (int): Seconds before swarming should kill the task.
      idempotent (bool): Whether this task is considered idempotent. Idempotent
        tasks are such that if another task is executed with identical
        properties, we can short-circuit execution and just return the other
        task's results.
      secret_bytes (str): Data that can securely be passed to the task. It is
        stored base64-encoded on the request object.
      cipd_packages (list[(str,str,str)]): List of 3-tuples corresponding to
        CIPD packages needed for the task: ('path', 'package_name', 'version'),
        defined as follows:
          path: Path relative to the Swarming root dir in which to install
            the package.
          package_name: Name of the package to install,
            eg. "infra/tools/authutil/${platform}"
          version: Version of the package, either a package instance ID,
            ref, or tag key/value pair.
      outputs (list[str]): List of paths to files which can be downloaded via
        collect().
    """
    # .get() so that a missing 'pool' fails this assertion with a readable
    # message instead of escaping as a bare KeyError.
    assert dimensions and dimensions.get('pool'), (
        'dimensions must include a non-empty "pool" entry')
    self.name = name
    self.cmd = cmd
    self.isolate_server = isolate_server
    self.isolated = isolated
    self.dimensions = dimensions
    self.expiration_secs = expiration_secs
    self.io_timeout_secs = io_timeout_secs
    self.hard_timeout_secs = hard_timeout_secs
    self.idempotent = idempotent
    self.secret_bytes = self._encode_secret(secret_bytes)
    self.cipd_packages = cipd_packages
    self.outputs = outputs

  @staticmethod
  def _encode_secret(secret):
    """Base64-encodes `secret` into a native, JSON-serializable string."""
    if not secret:
      return ''
    if not isinstance(secret, bytes):
      # Text input must become bytes before it can be base64-encoded.
      secret = secret.encode('utf-8')
    encoded = base64.b64encode(secret)
    if not isinstance(encoded, str):
      # On Python 3 b64encode() returns bytes, which json cannot serialize.
      encoded = encoded.decode('ascii')
    return encoded

  def render_to_json(self):
    """Renders the task request as a JSON-serializable dict.

    The format follows the Swarming task request API, which may be found here:
    https://chromium.googlesource.com/infra/luci/luci-go/+/819bad947699d6a3168d476281528b73abfe32d0/common/api/swarming/swarming/v1/swarming-api.json#1313

    Returns:
      A dict with 'name', 'expiration_secs', 'priority' and 'properties' keys.
      Numeric fields are rendered as strings.
    """
    properties = {
        'command': self.cmd,
        # Sorted so the rendered request does not depend on dict ordering.
        'dimensions': [
            {
                'key': key,
                'value': value,
            }
            for key, value in sorted(self.dimensions.items())
        ],
        'execution_timeout_secs': str(self.hard_timeout_secs),
        'io_timeout_secs': str(self.io_timeout_secs),
        # When a Swarming task is killed, the grace period is the amount of time
        # to wait before a SIGKILL is issued to the process, allowing it to
        # perform any clean-up operations.
        'grace_period_secs': str(30),
        'idempotent': self.idempotent,
        'outputs': self.outputs,
    }

    # Optional sections are only emitted when they carry data.
    if self.isolate_server and self.isolated:
      properties['inputs_ref'] = {
          'isolated': self.isolated,
          'namespace': 'default-gzip',
          'isolatedserver': self.isolate_server,
      }
    if self.secret_bytes:
      properties['secret_bytes'] = self.secret_bytes
    if self.cipd_packages:
      properties['cipd_input'] = {
          'packages': [
              {
                  'package_name': package_name,
                  'path': path,
                  'version': version,
              }
              for path, package_name, version in self.cipd_packages
          ],
      }

    return {
        'name': self.name,
        'expiration_secs': str(self.expiration_secs),
        # Priority is a numerical priority between 0 and 255 where a higher
        # number corresponds to a lower priority. Tasks are scheduled by swarming
        # in order of their priority (e.g. if both a task of priority 1 and a task
        # of priority 2 are waiting for resources to free up for execution, the
        # task with priority 1 will take precedence).
        'priority': str(200),
        'properties': properties,
    }
class SwarmingApi(recipe_api.RecipeApi):
  """APIs for interacting with swarming."""

  # Re-exported so recipes can refer to task states through the module API.
  TaskState = TaskState

  def __init__(self, swarming_server, *args, **kwargs):
    """Stores the Swarming server URL; the client is installed lazily."""
    super(SwarmingApi, self).__init__(*args, **kwargs)
    self._swarming_server = swarming_server
    # Set by ensure_swarming(); steps that run the client assert on it.
    self._swarming_client = None

  def __call__(self, *args, **kwargs):
    """Return a swarming command step.

    Args:
      *args: Arguments passed to the swarming client.
      **kwargs: Forwarded to the step; an optional 'name' overrides the
        default step name of 'swarming <first argument>'.
    """
    assert self._swarming_client
    name = kwargs.pop('name', None)
    if name is None:
      # Only look at args[0] when it is actually needed for the default name.
      name = 'swarming ' + args[0]
    return self.m.step(name, [self._swarming_client] + list(args), **kwargs)

  def ensure_swarming(self, version=None):
    """Ensures that swarming client is installed.

    Args:
      version: CIPD version of the client package; defaults to 'release'.

    Returns:
      Path to the swarming client binary.
    """
    if self._swarming_client:
      return self._swarming_client

    with self.m.step.nest('ensure_swarming'):
      with self.m.context(infra_steps=True):
        cipd_dir = self.m.path['start_dir'].join('cipd', 'swarming')
        pkgs = self.m.cipd.EnsureFile()
        pkgs.add_package(
            'infra/tools/luci/swarming/${platform}', version or 'release')
        self.m.cipd.ensure(cipd_dir, pkgs)
        self._swarming_client = cipd_dir.join('swarming')
        return self._swarming_client

  @property
  def swarming_client(self):
    """Path to the swarming client, or None before ensure_swarming()."""
    return self._swarming_client

  @property
  def swarming_server(self):
    """URL of the Swarming server to use."""
    return self._swarming_server

  @swarming_server.setter
  def swarming_server(self, value):
    """Changes URL of Swarming server to use."""
    self._swarming_server = value

  def trigger(self, name, raw_cmd, isolated=None, dump_json=None,
              dimensions=None, expiration=None, io_timeout=None,
              hard_timeout=None, idempotent=False, cipd_packages=None,
              outputs=None):
    """Triggers a Swarming task.

    Args:
      name: name of the task.
      raw_cmd: task command.
      isolated: hash of isolated test on isolate server.
      dump_json: dump details about the triggered task(s).
      dimensions: dimensions to filter bots on.
      expiration: seconds before this task request expires.
      io_timeout: seconds to allow the task to be silent.
      hard_timeout: seconds before swarming should kill the task.
      idempotent: whether this task is considered idempotent.
      cipd_packages: list of 3-tuples corresponding to CIPD packages needed for
        the task: ('path', 'package_name', 'version'), defined as follows:
          path: Path relative to the Swarming root dir in which to install
            the package.
          package_name: Name of the package to install,
            eg. "infra/tools/authutil/${platform}"
          version: Version of the package, either a package instance ID,
            ref, or tag key/value pair.
      outputs: list of paths to files which can be downloaded via collect.

    Returns:
      The step result of the trigger step.
    """
    assert self._swarming_client
    cmd = [
        self._swarming_client,
        'trigger',
        '-isolate-server', self.m.isolated.isolate_server,
        '-server', self.swarming_server,
        '-task-name', name,
        '-namespace', 'default-gzip',
        '-dump-json', self.m.json.output(leak_to=dump_json),
    ]
    if isolated:
      cmd.extend(['-isolated', isolated])
    if dimensions:
      # Sorted so the command line does not depend on dict ordering.
      for key, value in sorted(dimensions.items()):
        cmd.extend(['-dimension', '%s=%s' % (key, value)])
    if expiration:
      cmd.extend(['-expiration', str(expiration)])
    if io_timeout:
      cmd.extend(['-io-timeout', str(io_timeout)])
    if hard_timeout:
      cmd.extend(['-hard-timeout', str(hard_timeout)])
    if idempotent:
      cmd.append('-idempotent')
    if cipd_packages:
      for path, pkg, version in cipd_packages:
        cmd.extend(['-cipd-package', '%s:%s=%s' % (path, pkg, version)])
    if outputs:
      for output in outputs:
        cmd.extend(['-output', output])
    cmd.extend(['-raw-cmd', '--'] + raw_cmd)
    return self.m.step(
        'trigger %s' % name,
        cmd,
        step_test_data=lambda: self.test_api.trigger(
            name, raw_cmd, dimensions=dimensions, cipd_packages=cipd_packages),
    )

  def task_request(self, *args, **kwargs):
    """Creates a new TaskRequest object.

    Passes down all arguments to the TaskRequest constructor with the exception
    of isolate_server, which is provided by the isolated recipe module.
    """
    return TaskRequest(*args, **dict(
        kwargs, isolate_server=self.m.isolated.isolate_server))

  def spawn_tasks(self, tasks=(), json_output=None):
    """Spawns a set of Swarming tasks.

    Args:
      tasks (seq[TaskRequest]): A sequence of task request objects representing
        the tasks we want to spawn.
      json_output (Path): Optional filepath to leak a JSON file containing
        the return value of this method.

    Returns:
      A Python dict representing the JSON spawn response that may be passed into
      collect().
    """
    # Same precondition as trigger() and collect(): the client must have been
    # installed via ensure_swarming() before it can be run.
    assert self._swarming_client
    assert len(tasks) > 0
    requests = [task.render_to_json() for task in tasks]
    spawn_resp = self.m.step(
        'spawn %d tasks' % len(tasks),
        [
            self._swarming_client,
            'spawn-tasks',
            '-server', self.swarming_server,
            '-json-input', self.m.json.input({'requests': requests}),
            '-json-output', self.m.json.output(leak_to=json_output),
        ],
        step_test_data=lambda: self.test_api.spawn_tasks(tasks),
    ).json.output

    # Attach a link to each spawned task to the spawn step's presentation.
    presented_links = self.m.step.active_result.presentation.links
    for task in spawn_resp['tasks']:
      task_id = task['task_id']
      task_name = task['request']['name']
      presented_links['Swarming task: %s' % task_name] = (
          '%s/task?id=%s' % (self.swarming_server, task_id)
      )
    return spawn_resp

  def collect(self, timeout=None, tasks_json=None, tasks=()):
    """Waits on a set of Swarming tasks.

    Exactly one of `tasks_json` and `tasks` must be provided.

    Args:
      timeout: timeout to wait for result.
      tasks_json: load details about the task(s) from the json file.
      tasks: list of task ids to wait on.

    Returns:
      A list of CollectResult objects, one per task in the collect summary.
    """
    assert self._swarming_client
    assert (tasks_json and not tasks) or (not tasks_json and tasks)
    outdir = str(self.m.path.mkdtemp("swarming"))
    cmd = [
        self._swarming_client,
        'collect',
        '-server', self.swarming_server,
        '-task-summary-json', self.m.json.output(),
        '-task-output-stdout', 'json',
        '-output-dir', outdir,
    ]
    if timeout:
      cmd.extend(['-timeout', timeout])
    if tasks_json:
      cmd.extend(['-requests-json', tasks_json])
    if tasks:
      cmd.extend(tasks)
    step_result = self.m.step(
        'collect',
        cmd,
        infra_step=True,
        step_test_data=lambda: self.test_api.collect()
    )
    parsed_results = [
        CollectResult(self.m, task_id, task, outdir)
        for task_id, task in step_result.json.output.items()
    ]

    # Fix presentation on collect to reflect bot results.
    for result in parsed_results:
      if result.output:
        # Results whose RPC failed have no name; fall back to the task id so
        # several failures do not overwrite each other under the same log key.
        log_name = 'Swarming task output: %s' % (result.name or result.id)
        step_result.presentation.logs[log_name] = result.output.split('\n')
    return parsed_results