# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import operator

import attr

from recipe_engine import recipe_api
from recipe_engine.config_types import Path

from RECIPE_MODULES.fuchsia.testsharder import api as testsharder_api

# List of available targets.
TARGETS = ['x64', 'arm64']

# Names of the BigQuery project and dataset for uploading artifacts.
# NOTE(review): neither constant is referenced in the visible part of this
# file; presumably they are consumed elsewhere -- confirm before changing.
BIGQUERY_PROJECT = 'fuchsia-infra'
BIGQUERY_ARTIFACTS_DATASET = 'artifacts'
# Name of the test summary file: used as the key under which the summary is
# looked up in a shard's extracted outputs, and as the name of the file the
# parsed summary is written to in the shard's output dir before upload.
TEST_SUMMARY_JSON = 'summary.json'
KERNEL_LOG = 'kernel_log.txt'
# Value passed to the covargs tool's `-level` flag.
COVARGS_LOG_LEVEL = 'debug'
# Name given to the JSON output placeholder of the covargs step.
COVARGS_OUTPUT_JSON = 'covargs-output.json'
# Output property accumulating the swarming task IDs of the analyzed test
# results (consumed by the google3 results uploader).
SWARMING_TASK_IDS_PROPERTY = 'testing-swarming-task-ids'


@attr.s
class FuchsiaTestResults(object):
  """Represents the result of testing of a Fuchsia build.

  Attributes:
    from_fuchsia (bool): Whether the tests ran on Fuchsia.
    results_dir (Path): The directory that the test results archive has
      been unpacked into.
    output_dir (Path): A directory containing the outputs of the swarming
      task that ran these tests. Anything that's in this directory will be
      uploaded to GCS when upload_results() is called.
    outputs (dict[str]str): A mapping from relative paths of files
      containing stdout+stderr data to strings containing those contents.
    swarming_task_id (str): The ID of the task that ran these tests.
    env_name (str): The name of the task that ran these tests.
    tests (seq(testsharder.Test)): The tests that this task was instructed
      to run (as opposed to the results of the tests that this task *did*
      run, which are enumerated in `summary`).
    legacy_qemu (bool): Whether these tests were run using QEMU with
      runtests (no ssh).
    api (RecipeApi): The api to use for accessing recipe modules from this
      object.
    symbolizer_output (Path|None): The path to the symbolized log file
      produced by running these tests.
    overwrite_summary (bool): Whether to set the "name" and "gn_label" fields
      in the summary.json produced by these tests using the corresponding
      values from the input tests.json. Only affects legacy QEMU tests.
      (Solely for backwards compatibility with fuchsia_perf.)
  """

  from_fuchsia = attr.ib(type=bool)
  results_dir = attr.ib(type=Path)
  output_dir = attr.ib(type=Path)
  outputs = attr.ib(type=dict)
  swarming_task_id = attr.ib(type=str)
  _env_name = attr.ib(type=str)
  _tests = attr.ib(type=testsharder_api.Test)
  _legacy_qemu = attr.ib(type=bool)
  _api = attr.ib(type=recipe_api.RecipeApi)
  _symbolizer_output = attr.ib(None, type=Path)
  # TODO(fxb/10410): Get rid of overwrite_summary after fuchsia_perf is dead.
  _overwrite_summary = attr.ib(True, type=bool)

  # Set lazily by the `summary` property, not a parameter to __init__.
  _summary = attr.ib(None, init=False)

  # Constants representing the result of running a test. These enumerate the
  # values of the 'results' field of the entries in the summary.json file
  # obtained from the target device.
  _TEST_RESULT_PASS = 'PASS'
  _TEST_RESULT_FAIL = 'FAIL'

  @property
  def summary(self):
    """The parsed summary file as a Dict or {} if missing."""
    # Parsed on first access only; later accesses reuse the cached value.
    if self._summary is None:
      self._summary = self._parse_summary()
    return self._summary

  @property
  def summary_lines(self):
    """Returns a list of the lines of the summary.json file."""
    return self._api.json.dumps(self.summary, indent=2).splitlines()

  @property
  def passed(self):
    """Whether all the tests passed.

    Also True when the summary lists no tests at all (including when the
    summary is missing).
    """
    tests = self.summary.get('tests', [])
    return all(test['result'] == self._TEST_RESULT_PASS for test in tests)

  @property
  def passed_test_outputs(self):
    """All entries in |self.outputs| for tests that passed."""
    return self._filter_outputs_by_test_result(self._TEST_RESULT_PASS)

  @property
  def failed_test_outputs(self):
    """All entries in |self.outputs| for tests that failed."""
    return self._filter_outputs_by_test_result(self._TEST_RESULT_FAIL)

  def _filter_outputs_by_test_result(self, result):
    """Returns all entries in |self.outputs| whose result is |result|.

    Args:
      result (String): one of the _TEST_RESULT_* constants from this class.

    Returns:
      An ordered dict, in summary order, whose keys are the names of the
      matching tests and whose values are strings containing each test's
      stderr+stdout data.
    """
    matches = collections.OrderedDict()
    # TODO(kjharland): Sort test names first.
    for test in self.summary.get('tests', ()):
      if test['result'] == result:
        # The 'output_file' field is a path to the file containing the
        # stderr+stdout data for the test, and we inline the contents of that
        # file as the value in the returned dict.
        matches[test['name']] = self.outputs[test['output_file']]

    return matches

  def _parse_summary(self):
    """Parses the summary.json entry of |self.outputs|.

    For legacy QEMU tests (unless overwrite_summary is off) the "name" and
    "gn_label" fields of each summary test that has a matching entry in the
    input tests are overwritten with that test's package URL and label.

    Returns:
      The parsed summary as a dict, or {} if the summary is missing or empty.

    Raises:
      StepFailure: if the summary is not valid JSON.
    """
    raw_summary = self.outputs.get(TEST_SUMMARY_JSON, '')
    if not raw_summary:
      return {}

    try:
      summary = self._api.json.loads(raw_summary)
    except ValueError as e:  # pragma: no cover
      # TODO(olivernewman): JSONDecodeError in python >=3.5
      raise self._api.step.StepFailure('Invalid %s: %s' %
                                       (TEST_SUMMARY_JSON, e.args[0]))

    if not self._overwrite_summary or not self._legacy_qemu:
      return summary
    # We want all Fuchsia tests to have the package URL in the name field. But
    # QEMU tests set "name" to be the test install path (since the test list
    # sent to QEMU is a list of paths). So overwrite the "name" field to be the
    # package URL instead.
    # Also set "gn_label", which doesn't automatically get passed through from
    # tests.json.
    tests_by_path = {test.path: test for test in self._tests}
    # A summary without a 'tests' entry is treated as listing no tests, the
    # same way the other accessors of this class treat it.
    for summary_test in summary.get('tests', ()):
      path = summary_test['name']
      # Some zircon tests get run even though they don't show up in tests.json.
      # TODO(olivernewman): After build unification is complete we can assume
      # that every test in summary.json will have a corresponding entry in
      # tests.json, so get rid of this check and update every summary test.
      if path in tests_by_path:
        test = tests_by_path[path]
        assert test.package_url
        summary_test.update(
            name=test.package_url,
            gn_label=test.label,
        )
    return summary

  def upload_results(self, gcs_bucket, upload_to_catapult):
    """Upload select test results (e.g., coverage data) to a given GCS bucket.

    Args:
      gcs_bucket (str): The bucket to upload |self.output_dir| to. Must be
        non-empty.
      upload_to_catapult (bool): Whether to also upload the test outputs in
        |self.output_dir| to catapult.
    """
    assert gcs_bucket
    with self._api.step.nest('upload %s test results' % self._env_name):
      if self.summary:
        # Save the summary JSON to the test shard output dir so it gets
        # uploaded to GCS for easy access by e.g. Dachsiaboard.
        summary_path = self.output_dir.join(TEST_SUMMARY_JSON)
        assert not self._api.path.exists(summary_path), (
            'test output files should not be named %s' % TEST_SUMMARY_JSON)
        self._api.file.write_json('write %s' % TEST_SUMMARY_JSON, summary_path,
                                  self.summary)

      self._upload_outputs(gcs_bucket)

      if upload_to_catapult:
        self._api.upload.test_outputs_to_catapult(self.output_dir)

  def _upload_outputs(self, gcs_bucket):
    """Uploads everything in |self.output_dir| to |gcs_bucket|."""
    self._api.upload.directory_to_gcs(
        source=self.output_dir,
        bucket=gcs_bucket,
        # Namespace to avoid collision across shards and attempts.
        subpath='%s/%s' % (self._env_name, self.swarming_task_id),
    )

  def raise_failures(self):
    """Raises a step failure if there were test failures.

    Raises:
      StepFailure: if the summary is missing, if any test failed, or if the
        serial log (when present) contains a known failure string.
    """
    if not self.summary:
      # Halt with step failure if summary file is missing.
      raise self._api.step.StepFailure(
          'Test summary JSON not found, see symbolized log for details')

    failed_tests = list(self.failed_test_outputs)
    if failed_tests:
      # Halt with a step failure.
      raise self._api.step.StepFailure('Test failure(s): ' +
                                       ', '.join(failed_tests))

    # Check serial log for failure messages
    # TODO(9936): Replace with running binary tool once created.
    fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
    log_path = self.output_dir.join(self._api.testing_requests.SERIAL_LOG_NAME)
    self._api.path.mock_add_paths(log_path)
    if self._api.path.exists(log_path):
      self._check_log_for_failures(log_path, fail_strings)

  def _check_log_for_failures(self, log_path, fail_strings):
    """Checks for fail strings in log and fails accordingly.

    Args:
      log_path (Path): The log file to read.
      fail_strings (seq(str)): Strings whose presence in the log is a failure.

    Raises:
      StepFailure: for the first entry of |fail_strings| found in the log; the
        log contents are attached to the nesting step before raising.
    """
    log_name = self._api.path.basename(log_path)
    with self._api.step.nest('check log %s:%s' %
                             (self._env_name, log_name)) as check_log_step:
      raw_contents = self._api.file.read_text(
          'read %s' % log_name,
          log_path,
          test_data='extra log contents',
      )
      # NOTE(review): the other read_text() call sites in this file use the
      # return value directly as a string, whereas this one used to call
      # .get_result() on it unconditionally. Accept both shapes so a plain
      # string does not raise AttributeError -- confirm which one read_text()
      # really returns and drop the other branch.
      contents = (
          raw_contents.get_result()
          if hasattr(raw_contents, 'get_result') else raw_contents)
      for fail_str in fail_strings:
        if fail_str in contents:
          check_log_step.presentation.logs[log_name] = contents.splitlines()
          raise self._api.step.StepFailure(
              'Found failure string in log %s: %s' % (log_name, fail_str))


def create_task(api, *args, **kwargs):
  """Create a Task object.

  The base class of the class is inside the api object, so it can't be
  top-level or otherwise defined at module load time. Defining it in this
  function as an alternative.

  For full args list see Task.__init__ a few lines down.

  Args:
    api (RecipeApi): The api used to reach other recipe modules; it provides
      the Task base class and is also forwarded to Task.__init__.
    *args: Forwarded unchanged to Task.__init__.
    **kwargs: Forwarded unchanged to Task.__init__.

  Returns:
    A new Task instance.
  """

  class Task(api.swarming_retry.TriggeredTask):
    """A swarming_retry.TriggeredTask that runs one shard of tests."""

    def __init__(self, api, name, request, uses_legacy_qemu, targets_fuchsia,
                 symbolize_tool, llvm_symbolizer, tests,
                 debug_symbol_gcs_bucket, *args, **kwargs):
      """Initializes the task.

      Args:
        api (RecipeApi): Forwarded to the base class.
        name (str): The task name, forwarded to the base class. A
          'multiplied:' prefix disables retries.
        request: The swarming task request, forwarded to the base class.
        uses_legacy_qemu (bool): Whether the results archive is a minfs image
          (as opposed to a tar file); also passed on to FuchsiaTestResults.
        targets_fuchsia (bool): Whether the task output has to be symbolized
          on the host.
        symbolize_tool: Passed to api.symbolize when symbolizing output.
        llvm_symbolizer: Passed to api.symbolize when symbolizing output.
        tests: The tests of this shard, passed on to FuchsiaTestResults.
        debug_symbol_gcs_bucket: Passed to api.symbolize when symbolizing
          output.
        *args: Forwarded to the base class.
        **kwargs: Forwarded to the base class.
      """
      super(Task, self).__init__(
          api=api, name=name, request=request, *args, **kwargs)
      self._uses_legacy_qemu = uses_legacy_qemu
      self._targets_fuchsia = targets_fuchsia
      self._symbolize_tool = symbolize_tool
      self._llvm_symbolizer = llvm_symbolizer
      self._tests = tests
      self._debug_symbol_gcs_bucket = debug_symbol_gcs_bucket

      # Test shards with the 'multiplied:' prefix come from
      # tools/integration/testsharder/shard.go in fuchsia.git. They were
      # specifically created to run a test or set of tests many times to look
      # for flakes. It doesn't make sense to retry these when they fail--the
      # goal is to see if they fail not to get them to pass.
      if name.startswith('multiplied:'):
        self.max_attempts = 1

    def process_result(self):
      """Unpacks the results archive produced by a test shard.

      Operates on the most recent attempt: records its outputs link and
      symbolized log, sets `failure_reason` for timeouts, kernel panics and
      failure strings found in the serial log, and, if the swarming task
      succeeded, parses its test results.
      """

      attempt = self.attempts[-1]
      assert attempt.result
      result = attempt.result

      if result.isolated_outputs:
        attempt.task_outputs_link = result.isolated_outputs.url

      if result.state == self._api.swarming.TaskState.TIMED_OUT:
        attempt.failure_reason = 'timed out'

      # Stays None unless _process_outputs() below manages to parse results.
      attempt.test_results = None

      with self._api.step.nest(result.name):
        attempt.symbolizer_output = result.output_dir.join(
            self._api.symbolize.LOG)
        # Figure out what happened to the swarming task.
        if result.output:
          # Fuchsia crashes have to be symbolized on the host.
          if self._targets_fuchsia:
            attempt.logs['symbolized log'] = self._api.symbolize(
                symbolize_tool=self._symbolize_tool,
                debug_symbol_gcs_bucket=self._debug_symbol_gcs_bucket,
                llvm_symbolizer=self._llvm_symbolizer,
                data=result.output,
                symbolizer_output=attempt.symbolizer_output,
            )  # yapf:disable
          # Non-Fuchsia should already be symbolized, and attempting to use
          # the symbolizer may fail, if e.g. it was built on Mac and this is
          # running on Linux.
          else:
            attempt.logs['symbolized log'] = result.output

          # Only scan output that actually exists: a substring test against a
          # missing (None) output would raise TypeError.
          if 'KERNEL PANIC' in result.output:
            attempt.failure_reason = 'KERNEL PANIC'  # pragma: no cover

        self._check_logs_for_failures(attempt)

        if result.success:
          self._process_outputs(attempt)

    def _process_outputs(self, attempt):
      """Reads the test results and output files of a swarming TaskResult.

      Sets attempt.test_results if successful, logs the summary lines on the
      attempt and removes the results archive from the task's outputs.

      Args:
        attempt (swarming_retry.Attempt): the attempt to process
      """

      assert attempt.result
      result = attempt.result

      # Extract results if the task was not subject to an infra failure;
      # otherwise, a step failure will be raised on exiting the
      # defer_results() scope.
      attempt.test_results_archive = None
      # If both archive names are present, the one sorting last wins.
      for relative_path, absolute_path in sorted(result.outputs.iteritems()):
        if relative_path in [
            self._api.testing_requests.TEST_RESULTS_ARCHIVE_NAME,
            self._api.testing_requests.TEST_RESULTS_MINFS_NAME
        ]:
          attempt.test_results_archive = absolute_path

      assert attempt.test_results_archive, (
          'test archive not found amongst outputs of task %s' % result.name)
      self._parse_test_results(attempt)

      attempt.logs[TEST_SUMMARY_JSON] = attempt.test_results.summary_lines

      # Delete the archive so it doesn't get uploaded with the other files in
      # the swarming task's output directory.
      self._api.file.remove(
          'remove %s' % self._api.path.basename(attempt.test_results_archive),
          attempt.test_results_archive)

    def _parse_test_results(self, attempt):
      """Parse test results from attempt into a FuchsiaTestResults object.

      Stores the object in attempt.test_results and sets
      attempt.failure_reason if any test failed.

      Args:
        attempt (swarming_retry.Attempt): the attempt to parse
      """
      assert attempt.result
      result = attempt.result

      # Namespace the extracted results by task ID.
      results_dir = self._api.testing.results_dir_on_host.join(result.id)
      # pylint: disable=protected-access
      test_results_map = self._api.testing._extract_test_results_archive(
          step_name='extract',
          archive_path=attempt.test_results_archive,
          leak_to=results_dir,
          is_minfs=self._uses_legacy_qemu,
      )
      # pylint: enable=protected-access

      attempt.test_results = FuchsiaTestResults(
          from_fuchsia=self._targets_fuchsia,
          results_dir=results_dir,
          outputs=test_results_map,
          swarming_task_id=attempt.task_id,
          env_name=result.name,
          tests=self._tests,
          legacy_qemu=self._uses_legacy_qemu,
          api=api,
          symbolizer_output=attempt.symbolizer_output,
          output_dir=result.output_dir,
      )

      failed_tests = attempt.test_results.failed_test_outputs
      if failed_tests:
        attempt.failure_reason = '%d test(s) failed' % len(failed_tests)

    def _check_logs_for_failures(self, attempt):
      """Check for failure strings in logs.

      If the serial log exists and contains a failure string, the nesting
      step is marked as failed and attempt.failure_reason is set. The loop
      does not stop at the first hit, so when several failure strings are
      present the last one in the list is the one reported.

      Args:
        attempt (swarming_retry.Attempt): the attempt to check for logs in
      """
      # Check serial log for failure messages
      # TODO(9936): Replace with running binary tool once created.
      fail_strings = ['DEVICE SUSPEND TIMED OUT', 'ASSERT FAILED']
      log_path = attempt.result.output_dir.join(
          self._api.testing_requests.SERIAL_LOG_NAME)
      self._api.path.mock_add_paths(log_path)
      if self._api.path.exists(log_path):
        log_name = self._api.path.basename(log_path)
        with self._api.step.nest('check log %s' % log_name) as presentation:
          contents = self._api.file.read_text('read', log_path)
          for fail_str in fail_strings:
            if fail_str in contents:
              presentation.logs[log_name] = contents.splitlines()
              presentation.status = self._api.step.FAILURE
              presentation.step_summary_text = 'found "%s"' % fail_str
              attempt.failure_reason = ('found "%s" in %s' %
                                        (fail_str, log_name))

    def present_status(self, parent_step, attempt, **kwargs):
      """Present an Attempt while showing progress in launch/collect step.

      Failures are not shown in red and passed test names are not listed.

      Args:
        parent_step (Step): will always be 'passed tasks' or 'failed tasks'
        attempt (Attempt): the Attempt to present
      """
      del kwargs, parent_step  # Unused.
      with api.step.nest('%s (%s)' % (self.name, attempt.name)) as presentation:
        self._present(
            presentation,
            attempt,
            show_failures_in_red=False,
            show_passed=False)

    def present_attempt(self, _, attempt, category=None):
      """Present an Attempt when summarizing results at the end of the run.

      Args:
        attempt (Attempt): the Attempt to present
        category (str): the group of tasks ('passes', 'failures', or
          'flakes') that this attempt should be presented under
      """
      show_failures_in_red = True
      # The 'passes' category includes all attempts of all tasks that
      # eventually passed, so it includes some failures. Show those in
      # green so people don't get confused and think the overall task
      # failed.
      # TODO(fxb/36647) after this bug is fixed show these steps in
      # red, but show parent steps of those in green.
      if category == 'passes':
        show_failures_in_red = False

      name = '%s (%s)' % (attempt.name, 'pass' if attempt.success else 'fail')
      with api.step.nest(name) as presentation:
        if show_failures_in_red and not attempt.success:
          presentation.status = self._api.step.FAILURE
        self._present(
            presentation,
            attempt,
            show_failures_in_red=show_failures_in_red,
            show_passed=True,
        )

    def _present(self, presentation, attempt, show_failures_in_red,
                 show_passed):
      """Present an Attempt.

      Choosing to do largely the same thing for both kinds of presentations.

      Args:
        presentation (StepPresentation): where to present the attempt info
        attempt (api.swarming_retry.Attempt): object to present
        show_failures_in_red (bool): show failures in red (for final
            'flakes' and 'failures' steps) or not (for 'launch/collect'
            progress and 'passes' steps)
        show_passed (bool): show the names of passed tests (only done for
            the end)

      Note: the 'passes' step can have failures underneath it because the
      first attempt can fail but the retry passed.
      """
      if attempt.result.duration_secs:
        presentation.step_text = nice_time(attempt.result.duration_secs)

      # Accessed directly on |presentation|, like every other field set here.
      presentation.links['swarming task'] = attempt.task_ui_link
      if attempt.task_outputs_link:
        presentation.links['task outputs'] = attempt.task_outputs_link

      if attempt.failure_reason:
        presentation.step_summary_text = attempt.failure_reason

      for log, data in attempt.logs.iteritems():
        presentation.logs[log] = data

      if attempt.test_results:
        test_results = attempt.test_results

        # Log the contents of each output file mentioned in the summary.
        # Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
        for name, path in test_results.summary.get('outputs', {}).iteritems():
          presentation.logs[name] = test_results.outputs[path].split('\n')

        for test, output in test_results.failed_test_outputs.iteritems():
          self._report_test_result(
              test,
              output,
              passed=False,
              show_failures_in_red=show_failures_in_red,
          )

        with self._api.step.nest('all passed tests') as passed_tests_step:
          passed_tests = test_results.passed_test_outputs
          passed_tests_step.presentation.step_summary_text = (
              '%d passed tests' % len(passed_tests))
          if show_passed:
            # Start with a newline to prevent the first test from showing up on
            # the same line as the step name.
            passed_tests_step.presentation.step_text = ''.join(
                '\n' + test_name for test_name in passed_tests)

      for log_name in [
          self._api.testing_requests.SYSLOG_NAME,
          self._api.testing_requests.SERIAL_LOG_NAME
      ]:
        if log_name in attempt.result.outputs:
          self._present_output_file(
              name=log_name,
              path=attempt.result.outputs[log_name],
              step=presentation)

    def _report_test_result(self,
                            test,
                            output,
                            passed,
                            show_failures_in_red=True):
      """Emits an empty step named after a test to report its result.

      Args:
        test (str): The test name; prefixed with 'failed: ' if it failed.
        output (str): The test's output, attached as 'stdio' if it failed.
        passed (bool): Whether the test passed.
        show_failures_in_red (bool): Whether to mark the step of a failed
          test as a failure.
      """
      if not passed:
        test = 'failed: %s' % test
      step_result = self._api.step(test, None)
      if not passed:
        step_result.presentation.logs['stdio'] = output.split('\n')
        if show_failures_in_red:
          step_result.presentation.status = self._api.step.FAILURE

    def _present_output_file(self, name, path, step):
      """Records file contents to the test results step's presentation.

      Args:
        name (str): The log name to record the contents under.
        path (Path): The file to read.
        step: The object whose `presentation.logs` receives the contents.
      """
      contents = self._api.file.read_text(
          'read %s' % name,
          path,
          test_data='extra log contents',
      )
      step.presentation.logs[name] = contents.splitlines()

  return Task(*args, api=api, **kwargs)


class _ShardedTestRunner(object):
  """Runs sharded tests as swarming tasks and reports on their outcome."""

  def __init__(self, api, collect_timeout, debug_symbol_gcs_bucket,
               llvm_symbolizer, max_attempts, swarming_output_dir,
               symbolize_tool, shard_requests):
    """Creates one task per entry of |shard_requests|.

    Args:
      api (RecipeApi): The api used to reach other recipe modules.
      collect_timeout: Passed to swarming_retry.run_tasks() by run_tests().
      debug_symbol_gcs_bucket: Forwarded to every created task.
      llvm_symbolizer: Forwarded to every created task.
      max_attempts: Passed to swarming_retry.run_tasks() by run_tests().
      swarming_output_dir: The output dir run_tests() collects results into.
      symbolize_tool: Forwarded to every created task.
      shard_requests: The shard requests to create tasks for; each one
        provides a `task_request` and a `shard` with its `tests`.
    """
    self._api = api
    self._swarming_output_dir = swarming_output_dir
    self._max_attempts = max_attempts
    self._collect_timeout = collect_timeout
    self.tasks = []
    for shard_request in shard_requests:
      task_request = shard_request.task_request
      # Legacy QEMU shards are marked by a (case-insensitive) tag.
      is_legacy_qemu = 'uses_legacy_qemu:true' in (
          tag.lower() for tag in task_request.tags)
      # Anything whose 'os' dimension is neither Linux nor Mac, including a
      # missing dimension, counts as targeting Fuchsia.
      os_dimension = task_request[0].dimensions.get('os', '').lower()
      is_fuchsia = os_dimension != 'linux' and os_dimension != 'mac'
      task = create_task(
          api=self._api,
          name=task_request.name,
          request=task_request,
          symbolize_tool=symbolize_tool,
          llvm_symbolizer=llvm_symbolizer,
          tests=shard_request.shard.tests,
          debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
          uses_legacy_qemu=is_legacy_qemu,
          targets_fuchsia=is_fuchsia,
      )
      self.tasks.append(task)

  def run_tests(self):
    """Runs every shard, presents the tasks and returns the Task objects."""
    # TODO(fxb/35021) use context manager.
    retry_api = self._api.swarming_retry
    retry_api.run_tasks(
        tasks=self.tasks,
        collect_output_dir=self._swarming_output_dir,
        max_attempts=self._max_attempts,
        collect_timeout=self._collect_timeout,
    )
    retry_api.present_tasks(tasks=self.tasks)
    return self.tasks

  def raise_failures(self):
    """Delegates to swarming_retry.raise_failures() for this runner's tasks."""
    self._api.swarming_retry.raise_failures(self.tasks)


class FuchsiaTestApi(recipe_api.RecipeApi):
  """API for running tests and processing test results."""

  FuchsiaTestResults = FuchsiaTestResults

  def __init__(self, *args, **kwargs):
    """Initializes the API, forwarding all arguments to the base class."""
    super(FuchsiaTestApi, self).__init__(*args, **kwargs)
    # NOTE(review): not assigned anywhere else in the visible part of this
    # file; presumably set by a method further down -- confirm.
    self._test_runner = None

  def _analyze_test_results(self, test_results, presentation=None):
    """Surfaces the contents of a FuchsiaTestResults object in the build UI.

    Attaches the summary and every output file it mentions as logs, emits
    one step per test, and records the swarming task ID in an output
    property. Does nothing when the results have no summary.

    Args:
      test_results (FuchsiaTestResults): Fuchsia test result object
      presentation (StepPresentation|None): The step presentation to attach
        logs and properties to; defaults to that of the active result.
    """
    summary = test_results.summary
    if not summary:
      return
    if not presentation:
      presentation = self.m.step.active_result.presentation
    logs = presentation.logs

    # The summary file itself comes first.
    logs[TEST_SUMMARY_JSON] = test_results.summary_lines

    # Then every output file listed in the summary.
    # Note this assumes the outputs are all valid UTF-8 (See fxb/9500).
    listed_outputs = summary.get('outputs', {})
    for log_name, relative_path in listed_outputs.iteritems():
      logs[log_name] = test_results.outputs[relative_path].split('\n')

    # One step per test: failures at the current nesting level, passes
    # grouped under a dedicated step.
    for test_name, test_output in test_results.failed_test_outputs.iteritems():
      self._report_test_result(test_name, test_output, passed=False)
    with self.m.step.nest('all passed tests'):
      for test_name, test_output in (
          test_results.passed_test_outputs.iteritems()):
        self._report_test_result(test_name, test_output, passed=True)

    # Consumed by the google3 results uploader.
    properties = presentation.properties
    task_ids = properties.get(SWARMING_TASK_IDS_PROPERTY, [])
    task_ids.append(test_results.swarming_task_id)
    properties[SWARMING_TASK_IDS_PROPERTY] = task_ids

  def _report_test_result(self, test, output, passed):
    """Emits an empty step that reports the result of a single test.

    Args:
      test (str): The name of the test.
      output (str): The test's output, attached to the step as 'stdio'.
      passed (bool): Whether the test passed; a failed test gets a
        'failed: ' name prefix and a failed step status.
    """
    # FlakeFetcher searches for the prefix "failed: " to find failed tests.
    step_name = test if passed else 'failed: ' + test
    test_presentation = self.m.step(step_name, None).presentation
    test_presentation.logs['stdio'] = output.split('\n')
    if passed:
      return
    test_presentation.status = self.m.step.FAILURE

  def process_coverage(self, covargs_path, test_results, ids_txt, llvm_profdata,
                       llvm_cov, gcs_bucket):
    """Runs covargs over the test summaries and uploads its output to GCS.

    Args:
      covargs_path: The covargs executable to run.
      test_results (seq(FuchsiaTestResults)): Results whose summary.json
        files (inside each `results_dir`) are passed to covargs.
      ids_txt: Value of the `-ids` flag.
      llvm_profdata: Value of the `-llvm-profdata` flag.
      llvm_cov: Value of the `-llvm-cov` flag.
      gcs_bucket (str): The bucket to upload the coverage output to.
    """
    coverage_dir = self.m.path['cleanup'].join('coverage')

    covargs_cmd = [covargs_path]
    covargs_cmd += ['-level', COVARGS_LOG_LEVEL]
    covargs_cmd += [
        '-json-output',
        self.m.json.output(name=COVARGS_OUTPUT_JSON),
    ]
    covargs_cmd += ['-output-dir', coverage_dir]
    covargs_cmd += ['-llvm-profdata', llvm_profdata]
    covargs_cmd += ['-llvm-cov', llvm_cov]
    covargs_cmd += ['-ids', ids_txt]
    # One -summary flag per set of test results.
    for shard_results in test_results:
      summary_path = shard_results.results_dir.join(TEST_SUMMARY_JSON)
      covargs_cmd += ['-summary', summary_path]

    self.m.step('covargs', covargs_cmd)

    # TODO: move this into gsutil module/deduplicate this with other GCS logic
    gcs_prefix = 'builds/%s/coverage' % self.m.buildbucket_util.id
    rsync_options = {
        'parallel_process_count': self.m.platform.cpu_count,
        'parallel_thread_count': 1,
    }
    upload_step = self.m.gsutil.rsync(
        name='upload coverage',
        src=coverage_dir,
        bucket=gcs_bucket,
        dst=gcs_prefix,
        recursive=True,
        gzip_exts=['html'],
        options=rsync_options,
        multithreaded=True)
    # Link straight to the uploaded report's landing page.
    index_url = self.m.gsutil._http_url(
        gcs_bucket, self.m.gsutil.join(gcs_prefix, 'index.html'), True)
    upload_step.presentation.links['index.html'] = index_url

  @property
  def results_dir_on_host(self):
    """The host directory that host and target test results are written to.

    Target test results get copied over to this location, while host test
    results are written here directly. Host and target tests should use
    separate subdirectories so that they do not collide.
    """
    cleanup_dir = self.m.path['cleanup']
    return cleanup_dir.join('test_results')

  def _extract_test_results_archive(self,
                                    step_name,
                                    archive_path,
                                    is_minfs=False,
                                    leak_to=None):
    """Unpacks a test results archive and returns its contents.

    Args:
      step_name (str): The name of the extraction step.
      archive_path (Path): The archive holding the test results.
      is_minfs (bool): True if the archive is a minfs image containing QEMU
        test results; otherwise it is assumed to be a tar file.
      leak_to (Path): Optional directory to leak the archive's contents to.

    Returns:
      A dict mapping each file's path, relative to the root of the archive,
      to the contents of that file.
    """
    if not is_minfs:
      # Tar archives are unpacked into a raw_io output dir placeholder.
      extract_step = self.m.tar.extract(
          step_name=step_name,
          path=archive_path,
          directory=self.m.raw_io.output_dir(leak_to=leak_to),
      )
      return extract_step.raw_io.output_dir

    copy_step = self.m.minfs.copy_image(
        step_name=step_name,
        image_path=archive_path,
        out_dir=leak_to,
    )
    return copy_step.raw_io.output_dir

  def deprecated_test(self, *args, **kwargs):
    """Runs the tests of a Fuchsia build, retrying until they pass.

    The build and its artifacts are expected to still be where they were at
    the end of the build.

    Args (see _launch_collect_process_funcs for other args):
      max_attempts (int): Upper bound on the number of test runs; testing
        stops early as soon as all tests pass. A missing or falsy value
        selects swarming_retry.DEFAULT_MAX_ATTEMPTS.

    Returns:
      A `FuchsiaTestResults` object corresponding to the last test attempt.

    Raises:
      StepFailure: if processing the results of the last attempt raised one.
    """
    # `max_attempts` would ideally be a keyword argument declared after
    # *args, i.e. (self, *args, max_attempts=0, **kwargs), but Python 2 has
    # no syntax for that, so it is fished out of kwargs instead.
    attempt_limit = (
        kwargs.pop('max_attempts', 0) or
        self.m.swarming_retry.DEFAULT_MAX_ATTEMPTS)

    launch, collect, process = self._launch_collect_process_funcs(
        *args, **kwargs)

    latest_results = None
    pending_failure = None

    # TODO(olivernewman): status='last' should cause this step to turn green as
    # long as the *last* test attempt is green, but this isn't working, at
    # least not for led jobs (if the first attempt fails at the second passes,
    # the build is marked as a failure). Figure out whether this will be
    # resolved by using luci_runner.
    with self.m.step.nest('run tests', status='last'):
      for attempt_index in range(attempt_limit):
        with self.m.step.nest('attempt %d' % attempt_index) as attempt_step:
          swarming_result = collect(launch())
          try:
            latest_results = process(swarming_result, presentation=attempt_step)
          except self.m.step.StepFailure as failure:
            # Remember the failure; it only matters if no later attempt
            # gets processed cleanly.
            pending_failure = failure
            continue

          pending_failure = None
          if latest_results.passed:
            attempt_step.step_text = 'passed'
            break
          attempt_step.step_text = (
              '%d test(s) failed' % len(latest_results.failed_test_outputs))

      if pending_failure:
        raise pending_failure  # pylint: disable=raising-bad-type

      return latest_results

  def deprecated_test_async(self, *args, **kwargs):
    """Triggers a swarming task that runs Fuchsia tests, without waiting.

    Takes the same arguments as _launch_collect_process_funcs.

    Returns:
      A function taking no arguments that, when invoked, waits for the tests
      to complete and returns a `FuchsiaTestResults` object representing the
      completed test.
    """
    launch, collect, process = self._launch_collect_process_funcs(
        *args, **kwargs)
    # The task is triggered right away; only collection is deferred.
    request_metadata = launch()

    def wait_for_results():
      return process(collect(request_metadata))

    return wait_for_results

  def _launch_collect_process_funcs(
      self,
      debug_symbol_gcs_bucket,
      device_type,
      orchestration_inputs,
      overwrite_summary=True,
  ):
    """Returns 3-tuple of functions to launch Fuchsia tests, wait for them to
    complete, and process the results.

    Only the first entry of `orchestration_inputs.shard_requests` is used;
    its task request is the single task that gets triggered.

    Args:
      debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
      device_type (str): Used as swarming device_type dimension.
      orchestration_inputs (TestOrchestrationInputs): the bits of data
          needed to orchestrate testing.
      overwrite_summary (bool): Whether to overwrite the name and label
        fields in summary.json based on tests.json. This should *only* be
        used by fuchsia_perf; do NOT add any new dependencies on this.
        TODO(fxb/10410): remove this entirely after fuchsia_perf is dead.

    Returns:
      A tuple of functions:
      - `launch`, which takes no arguments and launches a swarming task to
        run tests against the given build artifacts. Returns a
        `TaskRequestMetadata` object.
      - `collect`, which takes the `TaskRequestMetadata` object returned by
        `launch`. It blocks until the task is complete and returns a swarming
        `TaskResult`.
      - `process`, which takes the `TaskResult` returned by `collect` (and,
        optionally, a `StepPresentation` object to add logs to). It processes
        the results and returns a `FuchsiaTestResults` object representing
        the completed tests.
    """
    task = orchestration_inputs.shard_requests[0].task_request
    # This directory gets passed into `collect()`, but unfortunately the
    # `output_dir` attribute of the `TaskResult` returned by `collect()` is
    # a subdirectory of this output dir (to ensure that different tasks'
    # outputs do not collide when calling `api.swarming.collect()` with many
    # tasks). So we make this variable in-scope for all three functions so that
    # `process()` can use it as the output dir for the test results object.
    output_dir = self.m.path.mkdtemp('swarming')

    def launch():
      # Triggers the single task as an infra step and returns the trigger
      # metadata that `collect()` expects.
      with self.m.context(infra_steps=True):
        return self.m.swarming.trigger(
            'trigger 1 task', [task], cancel_extra_tasks=True)

    def collect(request_metadata):
      # Waits for the triggered task and returns its one and only result.
      with self.m.context(infra_steps=True):
        results = self.m.swarming.collect(
            'collect', tasks=request_metadata, output_dir=output_dir)
        assert len(results) == 1, 'len(%s) != 1' % repr(results)
        return results[0]

    def process(task_result, presentation=None):
      symbolizer_output = output_dir.join(self.m.symbolize.LOG)
      # Surface task-level problems (kernel panic, swarming failure) before
      # looking at any test outputs; this raises if the task did not succeed.
      with self.m.step.nest('task results'):
        self._analyze_task_result(
            result=task_result,
            debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
            symbolize_tool=orchestration_inputs.symbolize_tool,
            llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
            symbolizer_output=symbolizer_output,
            presentation=presentation,
        )

      with self.m.context(infra_steps=True):
        # result.outputs contains the file outputs produced by the Swarming
        # task, returned via isolate. It's a mapping of the 'name' of the
        # output, represented as its relative path within the isolated it
        # was returned in, to a Path object pointing to its location on the
        # local disk. For each of the above tasks, there should be exactly
        # one output.
        serial_log_name = self.m.testing_requests.SERIAL_LOG_NAME
        if serial_log_name in task_result.outputs:
          # The serial log is popped so that only the test results archive
          # remains in `outputs` for the single-output assertion below.
          serial_log = task_result.outputs.pop(serial_log_name)

          serial_log_contents = self.m.file.read_text(
              'read serial.txt', serial_log, test_data=[])
          serial_presentation = (
              presentation or self.m.step.active_result.presentation)
          serial_presentation.logs[serial_log_name] = (
              serial_log_contents.splitlines())
        assert len(task_result.outputs) == 1, 'len(%s) != 1' % repr(
            task_result.outputs)
        # `items()` is a non-indexable view on Python 3, so materialize it
        # before taking the single (name, path) pair.
        archive_name, archive_path = list(task_result.outputs.items())[0]

        test_results_dir = self.results_dir_on_host.join(
            'target', task_result.id)
        # _extract_test_results_archive needs minfs_path to be set.
        # This is kinda ugly. It'd be better to pass this in as an argument.
        self.m.minfs.minfs_path = orchestration_inputs.minfs
        test_results_map = self._extract_test_results_archive(
            step_name='extract results',
            is_minfs=self.m.emu.is_emulator_type(device_type),
            archive_path=archive_path,
            # Write test results to a subdirectory of |results_dir_on_host|
            # so as not to collide with host test results.
            leak_to=test_results_dir,
        )

        # Remove the archive file so it doesn't get uploaded to GCS.
        self.m.file.remove('remove %s' % archive_name, archive_path)

        test_list = self.m.file.read_json(
            'read tests.json', orchestration_inputs.tests_file, test_data=[])
        tests = [
            self.m.testsharder.Test.from_jsonish(t['test']) for t in test_list
        ]

      with self.m.step.nest('all test results'):
        test_results = self.FuchsiaTestResults(
            from_fuchsia=True,
            results_dir=test_results_dir,
            outputs=test_results_map,
            swarming_task_id=task_result.id,
            env_name=task_result.name,
            tests=tests,
            legacy_qemu=self.m.emu.is_emulator_type(device_type),
            api=self.m,
            symbolizer_output=symbolizer_output,
            output_dir=output_dir,
            overwrite_summary=overwrite_summary,
        )
        self._analyze_test_results(test_results, presentation=presentation)
        return test_results

    return launch, collect, process

  def final_results(self, tasks):
    """Returns the FuchsiaTestResults from the final attempt of each Task.

    Tasks whose final attempt has no (falsy) `test_results` are left out. The
    remaining results keep the order of `tasks`.
    """
    results = []
    for task in tasks:
      last_attempt_results = task.attempts[-1].test_results
      if last_attempt_results:
        results.append(last_attempt_results)
    return results

  def test_in_shards(self, collect_timeout_secs, debug_symbol_gcs_bucket,
                     max_attempts, orchestration_inputs):
    """Tests a Fuchsia build by sharding.

    Expects the build and artifacts to be at the same place they were at
    the end of the build.

    Args:
      debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
      max_attempts (int): Maximum number of attempts before marking a shard
        as failed.
      collect_timeout_secs (int): Amount of time to wait for tasks to complete.


    Returns:
      A list of FuchsiaTestResults objects representing the completed test
      tasks that were not subject to an infra failure.
    """
    # If no shards have been provided, then we have successfully run the empty
    # set of tests.
    if not orchestration_inputs.shard_requests:
      return []

    self.m.minfs.minfs_path = orchestration_inputs.minfs
    collect_timeout = None
    if collect_timeout_secs:
      collect_timeout = '%ds' % collect_timeout_secs

    self._test_runner = _ShardedTestRunner(
        self.m,
        collect_timeout=collect_timeout,
        debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
        llvm_symbolizer=orchestration_inputs.llvm_symbolizer,
        max_attempts=max_attempts,
        swarming_output_dir=self.m.path.mkdtemp('swarming'),
        symbolize_tool=orchestration_inputs.symbolize_tool,
        shard_requests=orchestration_inputs.shard_requests,
    )
    return self._test_runner.run_tests()

  def raise_failures(self):
    """Delegates to the stored sharded test runner's `raise_failures()`.

    Does nothing when `self._test_runner` is falsy.
    """
    runner = self._test_runner
    if not runner:
      return
    runner.raise_failures()

  def _analyze_task_result(
      self,
      result,
      symbolize_tool,
      llvm_symbolizer,
      debug_symbol_gcs_bucket,
      symbolizer_output=None,
      presentation=None,
  ):
    """Analyzes a swarming.TaskResult and reports results as a step.

    Args:
      task_result (api.swarming.TaskResult): The swarming task result to
        analyze.
      symbolize_tool (Path): The path to the symbolize tool.
      llvm_symbolizer (Path): The path to the llvm_symbolizer tool.
      debug_symbol_gcs_bucket (str): A GCS bucket hosting debug symbols.
      symbolizer_output (Path|None): A path to a file to write the symbolizer's
        stdout.
      presentation (StepPresentation|None): The step presentation to attach
        logs to. Defaults to `active_result.presentation`.

    Raises:
      A StepFailure if a kernel panic is detected or an InfraFailure if the
      swarming task failed for a different reason.
    """
    presentation = presentation or self.m.step.active_result.presentation
    if result.output:
      # Always symbolize the result output if present in this case.
      presentation.logs['symbolized log'] = self.m.symbolize(
          symbolize_tool=symbolize_tool,
          debug_symbol_gcs_bucket=debug_symbol_gcs_bucket,
          llvm_symbolizer=llvm_symbolizer,
          data=result.output,
          symbolizer_output=symbolizer_output,
          presentation=presentation)

    # A kernel panic may be present in the logs even if the task timed out, so
    # check for that first.
    if 'KERNEL PANIC' in result.output:
      presentation.step_text = 'kernel panic'
      presentation.status = self.m.step.FAILURE
      raise self.m.step.StepFailure(
          'Found kernel panic. See symbolized output for details.')

    if result.isolated_outputs:
      presentation.links['test outputs'] = result.isolated_outputs.url

    try:
      result.analyze()
    except self.m.step.StepFailure:
      self._present_task_errors(result, presentation)
      raise

  def _present_task_errors(self, task_result, presentation):
    """Updates text and status of the given step to reflect test task errors."""
    # If the task is in an unknown state or completed, but the executed command returned
    # a non-zero exit code, this points to a tooling failure.
    if task_result.state is None or task_result.state == self.m.swarming.TaskState.COMPLETED:
      text = 'tooling failure'  # pragma: no cover
    else:
      text = (task_result.state.name).replace('_', ' ').lower()
    presentation.step_text = text

    # Report timeouts as red, not purple, as it is likelier that the task is
    # timing out due to a bug in the system under test.
    if task_result.state == self.m.swarming.TaskState.TIMED_OUT:
      status = self.m.step.FAILURE  # pragma: no cover
    else:
      status = self.m.step.EXCEPTION
    presentation.status = status


def product(nums):
  """Returns the product of all the numbers in `nums`.

  Args:
    nums (iterable): The values to multiply together.

  Returns:
    The product of the values, or 1 (the multiplicative identity) if `nums`
    is empty.
  """
  # `reduce` is only a builtin on Python 2; functools provides it on both
  # Python 2 and Python 3.
  from functools import reduce
  return reduce(operator.mul, nums, 1)


def nice_time(seconds):
  """Returns a human-readable duration given a number of seconds.

  Units whose count is zero are omitted, so a duration shorter than one
  second yields an empty string. Any fractional part of a second is dropped.

  For example: 3605 seconds => '1h 5s'

  Args:
    seconds (int|float): The duration to format; expected to be
      non-negative.

  Returns:
    A string of space-separated '<count><unit>' parts using the units
    d, h, m, and s, largest unit first.
  """
  # Size of each unit in seconds, largest first. Spelling the sizes out keeps
  # the arithmetic integral on both Python 2 and Python 3, rather than
  # relying on `/` performing floor division.
  units = [
      ('d', 24 * 60 * 60),
      ('h', 60 * 60),
      ('m', 60),
      ('s', 1),
  ]

  result_parts = []
  remaining = seconds
  for unit_abbrev, unit_seconds in units:
    count, remaining = divmod(remaining, unit_seconds)
    if count > 0:
      result_parts.append('%d%s' % (count, unit_abbrev))

  return ' '.join(result_parts)
