| # Copyright 2017 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from recipe_engine import recipe_api |
| |
| |
class CollectResult(object):
  """Wraps one raw entry from the swarming collect summary.

  An entry without a 'results' key is treated as an error response; its
  output is read from the 'Body' key. Otherwise the output is read from the
  'output' key.
  """

  def __init__(self, raw_results):
    self._raw_results = raw_results
    # Error responses carry no 'results' key.
    self._is_error = 'results' not in raw_results
    output_key = 'Body' if self._is_error else 'output'
    self._output = raw_results[output_key]

  def is_task_failure(self):
    """Returns True if a non-error entry's results contain 'failure'."""
    if self._is_error:
      return False
    return 'failure' in self._raw_results['results']

  def is_infra_failure(self):
    """Returns True for error entries or results with 'internal_failure'."""
    if self._is_error:
      return True
    return 'internal_failure' in self._raw_results['results']

  @property
  def output(self):
    """The output text extracted from the raw entry."""
    return self._output
| |
| |
class SwarmingApi(recipe_api.RecipeApi):
  """APIs for interacting with swarming.

  The swarming client binary must be installed with ensure_swarming() before
  calling the module directly, trigger() or collect().
  """

  def __init__(self, swarming_server, *args, **kwargs):
    super(SwarmingApi, self).__init__(*args, **kwargs)
    self._swarming_server = swarming_server
    # Path to the swarming client binary; set by ensure_swarming().
    self._swarming_client = None

  def __call__(self, *args, **kwargs):
    """Return a swarming command step.

    Args:
      *args: arguments passed to the swarming client; args[0] is used to build
        the default step name.
      **kwargs: forwarded to the step call. An optional 'name' entry overrides
        the default step name 'swarming <args[0]>'.
    """
    assert self._swarming_client, 'ensure_swarming() must be called first'
    # Only build the default name when the caller did not supply one, so an
    # explicit name never depends on the shape of args.
    if 'name' in kwargs:
      name = kwargs.pop('name')
    else:
      name = 'swarming ' + args[0]
    return self.m.step(name, [self._swarming_client] + list(args), **kwargs)

  def ensure_swarming(self, version=None):
    """Ensures that swarming client is installed.

    Args:
      version: CIPD version of the swarming package; 'release' when unset.

    Returns:
      Path to the installed swarming client.
    """
    with self.m.step.nest('ensure_swarming'):
      with self.m.context(infra_steps=True):
        swarming_package = ('infra/tools/luci/swarming/%s' %
                            self.m.cipd.platform_suffix())
        luci_dir = self.m.path['start_dir'].join('cipd', 'luci', 'swarming')

        self.m.cipd.ensure(luci_dir,
                           {swarming_package: version or 'release'})
        self._swarming_client = luci_dir.join('swarming')

        return self._swarming_client

  @property
  def swarming_client(self):
    """Path to the swarming client, or None before ensure_swarming()."""
    return self._swarming_client

  @property
  def swarming_server(self):
    """URL of Swarming server to use (initially the constructor value)."""
    return self._swarming_server

  @swarming_server.setter
  def swarming_server(self, value):
    """Changes URL of Swarming server to use."""
    self._swarming_server = value

  def trigger(self, name, raw_cmd, isolated=None, dump_json=None,
              dimensions=None, expiration=None, io_timeout=None,
              idempotent=False, cipd_packages=None):
    """Triggers a Swarming task.

    Args:
      name: name of the task.
      raw_cmd: task command, as a sequence of arguments.
      isolated: hash of isolated test on isolate server.
      dump_json: dump details about the triggered task(s).
      dimensions: dict of dimensions to filter bots on.
      expiration: seconds before this task request expires.
      io_timeout: seconds to allow the task to be silent.
      idempotent: whether this task is considered idempotent.
      cipd_packages: list of 3-tuples corresponding to CIPD packages needed for
        the task: ('path', 'package_name', 'version'), defined as follows:
          path: Path relative to the Swarming root dir in which to install
            the package.
          package_name: Name of the package to install,
            eg. "infra/tools/authutil/${platform}"
          version: Version of the package, either a package instance ID,
            ref, or tag key/value pair.

    Returns:
      The step result of the 'trigger <name>' step.
    """
    assert self._swarming_client, 'ensure_swarming() must be called first'
    cmd = [
      self._swarming_client,
      'trigger',
      '-isolate-server', self.m.isolate.isolate_server,
      '-server', self.swarming_server,
      '-task-name', name,
      '-namespace', 'default-gzip',
      '-dump-json', self.m.json.output(leak_to=dump_json),
    ]
    if isolated:
      cmd.extend(['-isolated', isolated])
    if dimensions:
      # Sorted so the generated command line is deterministic. items() is
      # used instead of iteritems() so this works on Python 2 and 3.
      for k, v in sorted(dimensions.items()):
        cmd.extend(['-dimension', '%s=%s' % (k, v)])
    if expiration:
      cmd.extend(['-expiration', str(expiration)])
    if io_timeout:
      cmd.extend(['-io-timeout', str(io_timeout)])
    if idempotent:
      cmd.append('-idempotent')
    if cipd_packages:
      for path, pkg, version in cipd_packages:
        cmd.extend(['-cipd-package', '%s:%s=%s' % (path, pkg, version)])
    # list() lets callers pass a tuple (or any sequence) as raw_cmd.
    cmd.extend(['-raw-cmd', '--'] + list(raw_cmd))
    return self.m.step(
        'trigger %s' % name,
        cmd,
        step_test_data=lambda: self.test_api.trigger(
            name, raw_cmd, dimensions=dimensions, cipd_packages=cipd_packages)
    )

  def collect(self, timeout, requests_json=None, tasks=None):
    """Waits on a set of Swarming tasks.

    Each task's output is attached to the step presentation as a
    'stdout.<index>' log.

    Args:
      timeout: timeout to wait for result.
      requests_json: load details about the task(s) from the json file.
      tasks: list of task ids to wait on.

    Exactly one of requests_json and tasks must be provided.

    Returns:
      The step result of the 'collect' step.

    Raises:
      StepFailure: if any collected task reports a failure.
      InfraFailure: if no task failed, but any result is an error response or
        reports an internal failure.
    """
    assert self._swarming_client, 'ensure_swarming() must be called first'
    assert (requests_json and not tasks) or (not requests_json and tasks)
    cmd = [
      self._swarming_client,
      'collect',
      '-server', self.swarming_server,
      '-task-summary-json', self.m.json.output(),
      '-task-output-stdout', 'json',
      '-timeout', timeout,
    ]
    if requests_json:
      cmd.extend(['-requests-json', requests_json])
    if tasks:
      cmd.extend(tasks)
    step_result = self.m.step(
        'collect',
        cmd,
        infra_step=True,
        step_test_data=lambda: self.test_api.collect()
    )
    parsed_results = [
        CollectResult(task) for task in step_result.json.output['tasks']
    ]

    # Fix presentation on collect to reflect bot results.
    for i, result in enumerate(parsed_results):
      # TODO(mknyszek): add task IDs to error results, so we can replace 'i'
      # with a task ID.
      step_result.presentation.logs['stdout.%s' % i] = (
          result.output.split('\n'))

    # TODO(mknyszek): add task IDs to error results so this information can be
    # more detailed.
    # Task failures take precedence over infra failures.
    if any(result.is_task_failure() for result in parsed_results):
      raise self.m.step.StepFailure('Test task failed. See logs.')
    elif any(result.is_infra_failure() for result in parsed_results):
      raise self.m.step.InfraFailure(
          'Received failure from server when trying to collect.')

    return step_result