# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import copy

from recipe_engine import recipe_api

# System path at which authorized SSH keys are stored.
AUTHORIZED_KEY_PATH = 'data/ssh/authorized_keys'
# The path to the botanist config on the host.
BOTANIST_DEVICE_CONFIG = '/etc/botanist/config.json'
# The log level to use for botanist invocations in test tasks. Can be one of
# "fatal", "error", "warning", "info", "debug", or "trace", where "trace" is
# the most verbose, and fatal is the least.
BOTANIST_LOG_LEVEL = 'debug'
# Name of image manifest produced by the build.
IMAGES_JSON = 'images.json'
# The path in the BootFS manifest that we want runcmds to show up at.
RUNCMDS_BOOTFS_PATH = 'infra/runcmds'

STORAGE_FULL = 'storage-full'

# The PCI address to use for the block device to contain test results.
TEST_FS_PCI_ADDR = '06.0'

_PRIVATE_KEY_PATH = 'private_key'


class _TaskRequester(object):
  """Creates requests for swarming tasks that run tests."""

  def __init__(self, api, buildbucket_build, per_test_timeout_secs, pool,
               swarming_expiration_timeout_secs, swarming_io_timeout_secs,
               timeout_secs, use_runtests, default_service_account):
    self._api = api
    self._buildbucket_build = buildbucket_build
    self._per_test_timeout_secs = per_test_timeout_secs
    self._pool = pool
    self._swarming_expiration_timeout_secs = swarming_expiration_timeout_secs
    self._swarming_io_timeout_secs = swarming_io_timeout_secs
    self._timeout_secs = timeout_secs
    self._use_runtests = use_runtests
    self._default_service_account = default_service_account

  def request(self, shard, build_results):
    # Copy since we modify it for each shard.
    build_results = copy.deepcopy(build_results)
    task_request = self._construct_test_task_request(
        build_results=build_results, shard=shard)

    return self._api.build.ShardTaskRequest(shard, task_request)

  def _construct_test_task_request(self, build_results, shard):
    """Constructs a Swarming task request to run a shard of Fuchsia tests.

    Args:
      build_results (FuchsiaBuildResults): The Fuchsia build to test.
      shard (api.testsharder.Shard): A shard of tests.

    Returns:
      An api.swarming.TaskRequest representing the swarming task request.
    """
    # To freely archive files from the build directory, the source, and those we
    # dynamically create, we create a tree of symlinks in a fresh directory and
    # isolate that. This solves the problems of (a) finding a root directory
    # that works for all artifacts, (b) being able to create files in that
    # directory without fear of collision, and (c) not having to isolate
    # extraneous files.
    isolate_tree = self._api.file.symlink_tree(
        root=self._api.path.mkdtemp('isolate'))

    test_manifest = 'tests.json'
    self._api.file.write_json(
        'write test manifest',
        isolate_tree.root.join(test_manifest),
        [test.render_to_jsonish() for test in shard.tests],
        indent=2)

    cmd = []
    outputs = []
    ensure_file = self._api.cipd.EnsureFile()
    dimensions = {'pool': self._pool}
    test_bot_cpu = 'x64'
    is_emu_type = self._api.emu.is_emulator_type(shard.device_type)

    # This command spins up a metadata server that allows its subcommands to
    # automagically authenticate with LUCI auth, provided the sub-exec'ed tool
    # was written in go or dart and respectively makes use of the standard
    # cloud.google.com/go/compute/metadata or
    # github.com/dart-lang/googleapis_auth authentication libraries. Such
    # libraries look for a metadata server under environment variables
    # like $GCE_METADATA_HOST, which LUCI emulates.
    service_account = shard.service_account or self._default_service_account
    if service_account:
      # TODO(fxbug.dev/37142): Find a way to use the version that LUCI is
      # currently using, instead of 'latest'.
      ensure_file.add_package('infra/tools/luci-auth/${platform}', 'latest')
      cmd.extend(['./luci-auth', 'context', '--'])

    if is_emu_type:
      dimensions.update(os='Debian', cpu=build_results.target, kvm='1')
      # To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
      test_bot_cpu = build_results.target
    else:
      dimensions.update(shard.dimensions)

    image_manifest = '%s/%s' % (self._api.artifacts.image_url(), IMAGES_JSON)

    if shard.targets_fuchsia:
      botanist_cmd = [
          './botanist',
          '-level', BOTANIST_LOG_LEVEL,
          'run',
          '-images', image_manifest,
          '-timeout', '%ds' % self._timeout_secs,
          '-syslog', self._api.testing_requests.SYSLOG_NAME,
          '-serial-log', self._api.testing_requests.SERIAL_LOG_NAME,
      ] # yapf: disable
      outputs.append(self._api.testing_requests.SYSLOG_NAME)
      outputs.append(self._api.testing_requests.SERIAL_LOG_NAME)

      # TODO(fxbug.dev/40840): Once we can scope the proxy server to a
      # an individual task, we can make free use of it in the emulator case.
      if not is_emu_type:
        botanist_cmd.extend([
          # For container networking and authentication reasons, we access GCS
          # via a proxy server running on the controller.
          '-repo', self._api.artifacts.package_repo_url(host='$GCS_PROXY_HOST'),
          '-blobs', self._api.artifacts.package_blob_url(host='$GCS_PROXY_HOST'),
        ]) # yapf: disable

      config = BOTANIST_DEVICE_CONFIG
      if self._api.emu.is_emulator_type(shard.device_type):
        config = './qemu.json'
        botanist_cmd.extend(['-ssh', _PRIVATE_KEY_PATH])
        qemu_config = [{
            'type': shard.device_type.lower(),
            'path': './%s/bin' % shard.device_type.lower(),
            'target': build_results.target,
            'cpu': 8,
            'memory': 8192,
            'kvm': True,
        }]

        if shard.device_type == 'AEMU':
          self._api.emu.add_aemu_to_ensure_file(ensure_file, subdir='aemu/bin')
        elif shard.device_type == 'QEMU':
          self._api.emu.add_qemu_to_ensure_file(ensure_file, subdir='qemu')

        self._api.file.write_json(
            'write qemu config',
            isolate_tree.root.join('qemu.json'),
            qemu_config,
            indent=2)
      elif shard.netboot:
        botanist_cmd.append('-netboot')

      botanist_cmd.extend(['-config', config])

      cmd.extend(botanist_cmd)

    cmd.extend([
        './testrunner',
        '-archive',
        self._api.testing_requests.TEST_RESULTS_ARCHIVE_NAME,
    ])
    if self._use_runtests:
      cmd.append('-use-runtests')
    if self._per_test_timeout_secs:
      cmd.extend(['-per-test-timeout', '%ds' % self._per_test_timeout_secs])
    cmd.append(test_manifest)

    outputs.append(self._api.testing_requests.TEST_RESULTS_ARCHIVE_NAME)

    isolated_hash = self._api.testing_requests._isolate_build_artifacts(
        isolate_tree,
        build_results,
        shard=shard,
        test_bot_cpu=test_bot_cpu,
    )

    env_name = '%s-%s' % (shard.device_type or shard.os, build_results.target)
    tags = {'test_environment_name': [env_name]}

    request = (self._api.swarming.task_request().
        with_name(shard.name).
        with_tags(tags)
    ) #yapf: disable
    if service_account:
      request = request.with_service_account(service_account)
    return request.with_slice(0, request[0].
      with_command(cmd).
      with_isolated(isolated_hash).
      with_dimensions(**dimensions).
      with_expiration_secs(self._swarming_expiration_timeout_secs).
      with_io_timeout_secs(self._swarming_io_timeout_secs).
      with_execution_timeout_secs(self._timeout_secs).
      with_outputs(outputs).
      with_cipd_ensure_file(ensure_file).
      with_env_vars(**self._test_task_env_vars(shard, build_results))
    ) #yapf: disable

  def _test_task_env_vars(self, shard, build_results):
    """Returns the environment variables to be set for the test task.

    Returns:
      A dict mapping string env var names to string values.
    """
    build = self._buildbucket_build
    commit = build.input.gitiles_commit
    llvm_symbolizer = self._api.path.basename(build_results.llvm_symbolizer)
    env_vars = dict(
        # `${ISOLATED_OUTDIR}` is a magic string that Swarming will replace
        # with a temporary directory into which files will be automatically
        # collected upon exit of a task.
        FUCHSIA_TEST_OUTDIR='${ISOLATED_OUTDIR}',
        BUILDBUCKET_ID=str(build.id),
        BUILD_BOARD=build_results.board,
        BUILD_TYPE=build_results.build_type,
        BUILD_PRODUCT=build_results.product,
        BUILD_TARGET=build_results.target,
        BUILDBUCKET_BUCKET=build.builder.bucket,

        # Used for symbolization:
        ASAN_SYMBOLIZER_PATH=llvm_symbolizer,
        UBSAN_SYMBOLIZER_PATH=llvm_symbolizer,
        LSAN_SYMBOLIZER_PATH=llvm_symbolizer,

        # Used by the catapult converter
        BUILD_CREATE_TIME=str(build.create_time.seconds),
        BUILDER_NAME=build.builder.builder,
        FUCHSIA_DEVICE_TYPE=shard.device_type,
        INPUT_COMMIT_HOST=commit.host,
        INPUT_COMMIT_PROJECT=commit.project,
        INPUT_COMMIT_REF=commit.ref,
    )
    # For some reason, empty string environment variables sent to the swarming
    # API get interpreted as null and rejected. So don't bother sending them to
    # avoid breaking the task request.
    # TODO(olivernewman): Figure out whether this logic should be moved into
    # the upstream swarming module (or obviated by fixing the "" -> null
    # behavior).
    return {k: v for k, v in env_vars.iteritems() if v}


class TestingRequestsApi(recipe_api.RecipeApi):
  """APIs for constructing Swarming task requests to test Fuchsia."""

  SERIAL_LOG_NAME = 'serial.txt'
  SYSLOG_NAME = 'syslog.txt'
  TEST_RESULTS_ARCHIVE_NAME = 'out.tar'
  TEST_RESULTS_MINFS_NAME = 'output.fs'

  def shard_requests(
      self,
      build_results,
      buildbucket_build,
      per_test_timeout_secs,
      pool,
      shards,
      swarming_expiration_timeout_secs,
      swarming_io_timeout_secs,
      use_runtests,
      default_service_account=None,
      # TODO(garymm): Remove default value.
      # We should always get this from a spec.
      timeout_secs=40 * 60):
    """Returns a _ShardTaskRequest for each shard in build_artifact.shards.

    Args:
      build_results (FuchsiaBuildResults): The Fuchsia build results to test.
      buildbucket_build (build_pb2.Build): The buildbucket build that is going
        to orchestrate testing.
      per_test_timeout_secs (int): Any test that executes for longer than this
        will be considered failed.
      pool (str): The Swarming pool to schedule test tasks in.
      shards (list of testsharder.Shard): Test shards.
      use_runtests (bool): Whether to use runtests (or else run_test_component)
        when executing tests on target.
      default_service_account (str|None): The default service account to run the
        test task with. This is required for fetching images from GCS.
      timeout_secs (int): The amount of seconds to wait for the tests to execute
        before giving up.
    """
    # Embed the authorized key into the appropriate ZBI. This enabled us to SSH
    # into QEMU, in which case we are unable to supply the key at pave-time (as
    # QEMU instances are not paved.)
    has_emu_shard = any(
        self.m.emu.is_emulator_type(shard.device_type) for shard in shards)
    if has_emu_shard:
      self.m.zbi.zbi_path = build_results.zbi
      zbi_name = self._zbi_name(build_results)
      zbi_path = build_results.fuchsia_build_dir.join(
          build_results.images[zbi_name]['path'])

      self.m.zbi.copy_and_extend(
          step_name='embed authorized key',
          input_image=zbi_path,
          output_image=zbi_path,
          manifest={AUTHORIZED_KEY_PATH: build_results.authorized_key},
      )

    task_requester = _TaskRequester(
        self.m,
        buildbucket_build=buildbucket_build,
        per_test_timeout_secs=per_test_timeout_secs,
        pool=pool,
        swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
        swarming_io_timeout_secs=swarming_io_timeout_secs,
        timeout_secs=timeout_secs,
        use_runtests=use_runtests,
        default_service_account=default_service_account,
    )
    shard_requests = []
    for s in shards:
      with self.m.step.nest('shard %s' % s.name):
        shard_requests.append(task_requester.request(s, build_results))
    return shard_requests

  def deprecated_shard_requests(self,
                                build_results,
                                test_cmds,
                                device_type,
                                pool,
                                timeout_secs,
                                pave,
                                swarming_expiration_timeout_secs=18000,
                                swarming_io_timeout_secs=5 * 60,
                                default_service_account=None):
    """Returns a swarming task request for testing in the deprecated way.

    Args:
      build_results (FuchsiaBuildResults): The Fuchsia build to test.
      test_cmds (list[str]): Command to have Fuchsia run on boot.
      pool (str): Swarming pool from which the test task will be drawn.
      timeout_secs (int): The amount of seconds to wait for the tests to execute
        before giving up.
      pave (bool): Whether to pave the image to disk. Ignored if device_type ==
        'QEMU'.
      swarming_expiration_timeout_secs (int): Maximum run time for the swarming
        task, once scheduled (enforced by swarming).
      swarming_io_timeout_secs (int): The swarming task will be killed if it does
        not produce any output for this long.
      default_service_account (str|None): The default service account to run the
        task with.

    Returns:
      A list of a single ShardTaskRequest.
    """
    assert test_cmds
    assert device_type

    self.m.minfs.minfs_path = build_results.minfs
    self.m.zbi.zbi_path = build_results.zbi

    # Copy build_results because we modify its contents below.
    build_results = copy.deepcopy(build_results)
    self._install_runcmds_files(
        build_results,
        device_type=device_type,
        pave=pave,
        test_cmds=test_cmds,
    )

    if self.m.emu.is_emulator_type(device_type):
      task = self._construct_legacy_qemu_task_request(
          task_name='all tests',
          build_results=build_results,
          pool=pool,
          timeout_secs=timeout_secs,
          swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
          swarming_io_timeout_secs=swarming_io_timeout_secs,
          qemu_type=device_type,
          service_account=default_service_account,
      )
    else:
      task = self._construct_device_task_request(
          task_name='all tests',
          device_type=device_type,
          build_results=build_results,
          pool=pool,
          pave=pave,
          timeout_secs=timeout_secs,
          swarming_expiration_timeout_secs=swarming_expiration_timeout_secs,
          swarming_io_timeout_secs=swarming_io_timeout_secs,
          service_account=default_service_account,
      )
    # In the deprecated testing code paths, shards are not used, but it makes
    # other code simpler to have a valid shard here.
    dummy_shard = self.m.testsharder.Shard('dummy', (), {})
    return [self.m.build.ShardTaskRequest(dummy_shard, task)]

  def deprecated_test_cmds(self, test_spec):
    runtests_cmd_parts = ['runtests', '-o', self.results_dir_on_target]
    if test_spec.per_test_timeout_secs:
      runtests_cmd_parts.extend(['-i', '%d' % test_spec.per_test_timeout_secs])
    runtests_cmd_parts.append(test_spec.runtests_args)
    return [' '.join(runtests_cmd_parts)]

  @property
  def results_dir_on_target(self):
    """The directory on target to which target test results will be written."""
    return '/tmp/infra-test-output'

  def _construct_legacy_qemu_task_request(self,
                                          task_name,
                                          build_results,
                                          pool,
                                          timeout_secs,
                                          swarming_expiration_timeout_secs,
                                          swarming_io_timeout_secs,
                                          qemu_type,
                                          service_account,
                                          shard=None):
    """Constructs a Swarming task request which runs Fuchsia tests inside QEMU.

    Expects the build and artifacts to be at the same place they were at
    the end of the build.

    Args:
      build_results (FuchsiaBuildResults): The Fuchsia build to test.
      pool (str): Swarming pool from which the test task will be drawn.
      timeout_secs (int): The amount of seconds to wait for the tests to execute
        before giving up.
      qemu_type (str): type of qemu, either QEMU or AEMU.
      service_account (str|None): The service account to run the task with.
      shard (api.testsharder.Shard|None): The shard associated with the task or
        None if it's not a shard.

    Returns:
      An api.swarming.TaskRequest representing the swarming task request.
    """
    # To freely archive files from the build directory, the source, and those we
    # dynamically create, we create a tree of symlinks in a fresh directory and
    # isolate that. This solves the problems of (a) finding a root directory
    # that works for all artifacts, (b) being able to create files in that
    # directory without fear of collision, and (c) not having to isolate
    # extraneous files.
    isolate_tree = self.m.file.symlink_tree(root=self.m.path.mkdtemp('isolate'))

    # As part of running tests, we'll send a MinFS image over to another machine
    # which will be declared as a block device in QEMU, at which point
    # Fuchsia will mount it and write test output to. We choose 3.5G for the
    # MinFS image arbitrarily, as it appears it can hold our test output
    # comfortably without going overboard on size.
    #
    minfs_image_path = isolate_tree.root.join(self.TEST_RESULTS_MINFS_NAME)
    self.m.minfs.create(minfs_image_path, '3584M', name='create test image')

    cmd = []
    ensure_file = self.m.cipd.EnsureFile()
    if service_account:
      # TODO(fxbug.dev/37142): Find a way to use the version that LUCI is
      # currently using, instead of 'latest'.
      ensure_file.add_package('infra/tools/luci-auth/${platform}', 'latest')
      cmd.extend(['./luci-auth', 'context', '--'])

    image_manifest = '%s/%s' % (self.m.artifacts.image_url(), IMAGES_JSON)

    cmd.extend([
      './botanist',
      '-level', BOTANIST_LOG_LEVEL,
      'qemu',
      '-type', '%s' % qemu_type.lower(),
      '-qemu-dir', './%s/bin' % qemu_type.lower(),
      '-images', image_manifest,
      '-arch', build_results.target,
      '-minfs', self.TEST_RESULTS_MINFS_NAME,
      '-pci-addr', TEST_FS_PCI_ADDR,
      '-use-kvm'
    ]) # yapf: disable

    variants = ('asan', 'asan-ubsan', 'profile')
    if [v for v in variants if v in build_results.variants]:
      cmd.extend([
          '-cpu',
          str(8),
          '-memory',
          str(8192),
      ])

    # storage-full not being present signifies the exclusion of the system
    # partition, which means `boot` (i.e. running on boot) must be used instead
    # of `system` (i.e., running after the system partition is mounted).
    storage_free_build = STORAGE_FULL not in build_results.images
    arg_key = 'zircon.autorun.%s' % ('boot' if storage_free_build else 'system')
    cmd.append('%s=/boot/bin/sh+/boot/%s' % (arg_key, RUNCMDS_BOOTFS_PATH))

    isolated_hash = self._isolate_build_artifacts(
        isolate_tree,
        build_results,
        # To take advantage of KVM, we execute QEMU-arm tasks on arm hardware.
        test_bot_cpu=build_results.target,
        legacy_qemu=True,
    )

    if qemu_type == 'AEMU':
      self.m.emu.add_aemu_to_ensure_file(ensure_file, subdir='aemu/bin')
    elif qemu_type == 'QEMU':
      self.m.emu.add_qemu_to_ensure_file(ensure_file, subdir='qemu')

    env_name = '%s-%s' % (qemu_type, build_results.target)
    tags = {
        # consumed by google3 results uploader
        'test_environment_name': [env_name],
    }
    request = self.m.swarming.task_request().with_name(task_name).with_tags(
        tags)
    if service_account:
      request = request.with_service_account(service_account)
    return (request.with_slice(0, request[0].
      with_command(cmd).
      with_isolated(isolated_hash).
      with_dimensions(pool=pool, os='Debian', cpu=build_results.target, kvm='1').
      with_io_timeout_secs(swarming_io_timeout_secs).
      with_execution_timeout_secs(timeout_secs).
      with_expiration_secs(swarming_expiration_timeout_secs).
      with_outputs([self.TEST_RESULTS_MINFS_NAME]).
      with_cipd_ensure_file(ensure_file)
    )) #yapf: disable

  def _construct_device_task_request(self, task_name, device_type,
                                     build_results, pool, pave, timeout_secs,
                                     swarming_expiration_timeout_secs,
                                     swarming_io_timeout_secs, service_account):
    """Constructs a Swarming task request to run Fuchsia tests on a device.

    Expects the build and artifacts to be at the same place they were at
    the end of the build.

    Args:
      build_results (FuchsiaBuildResults): The Fuchsia build to test.
      pool (str): Swarming pool from which the test task will be drawn.
      pave (bool): Whether or not the build artifacts should be paved.
      timeout_secs (int): The amount of seconds to wait for the tests to execute
        before giving up.
      service_account (str|None): The service account to run the task with.

    Returns:
      An api.swarming.TaskRequest representing the swarming task request.
    """
    cmd = []
    ensure_file = self.m.cipd.EnsureFile()
    if service_account:
      # TODO(fxbug.dev/37142): Find a way to use the version that LUCI is
      # currently using, instead of 'latest'.
      ensure_file.add_package('infra/tools/luci-auth/${platform}', 'latest')
      cmd.extend(['./luci-auth', 'context', '--'])

    image_manifest = '%s/%s' % (self.m.artifacts.image_url(), IMAGES_JSON)

    # Construct the botanist command.
    cmd.extend([
      './botanist',
      '-level', BOTANIST_LOG_LEVEL,
      'zedboot',
      '-config', BOTANIST_DEVICE_CONFIG,
      '-images', image_manifest,
      '-results-dir', self.results_dir_on_target,
      '-out', self.TEST_RESULTS_ARCHIVE_NAME,
      '-serial-log', self.SERIAL_LOG_NAME,
    ]) # yapf: disable

    if not pave:
      cmd.append('-netboot')

    # storage-full not being present signifies the exclusion of the system
    # partition, which means `boot` (i.e. running on boot) must be used instead
    # of `system` (i.e., running after the system partition is mounted).
    storage_free_build = STORAGE_FULL not in build_results.images
    arg_key = 'zircon.autorun.%s' % ('boot' if storage_free_build else 'system')
    cmd.append('%s=/boot/bin/sh+/boot/%s' % (arg_key, RUNCMDS_BOOTFS_PATH))

    # To freely archive files from the build directory, the source, and those we
    # dynamically create, we create a tree of symlinks in a fresh directory and
    # isolate that. This solves the problems of (a) finding a root directory
    # that works for all artifacts, (b) being able to create files in that
    # directory without fear of collision, and (c) not having to isolate
    # extraneous files.
    isolate_tree = self.m.file.symlink_tree(root=self.m.path.mkdtemp('isolate'))
    isolated_hash = self._isolate_build_artifacts(isolate_tree, build_results)

    dimensions = {
        'pool': pool,
        'device_type': device_type,
    }

    env_name = '%s-%s' % (device_type, build_results.target)
    tags = {'test_environment_name': [env_name]}
    request = self.m.swarming.task_request().with_name(task_name).with_tags(
        tags)
    if service_account:
      request = request.with_service_account(service_account)
    return (request.with_slice(0, request[0].
      with_command(cmd).
      with_isolated(isolated_hash).
      with_dimensions(**dimensions).
      with_expiration_secs(swarming_expiration_timeout_secs).
      with_io_timeout_secs(swarming_io_timeout_secs).
      with_execution_timeout_secs(timeout_secs).
      with_outputs([self.TEST_RESULTS_ARCHIVE_NAME, self.SERIAL_LOG_NAME]).
      with_cipd_ensure_file(ensure_file)
    )) #yapf: disable

  def _create_runcmds_script(self, device_type, test_cmds, output_path):
    """Creates a script for running tests on boot."""
    # The device topological path is the toplogical path to the block device
    # which will contain test output.
    device_topological_path = '/dev/sys/pci/00:%s/virtio-block/block' % (
        TEST_FS_PCI_ADDR)

    # Script that mounts the block device to contain test output and runs tests,
    # dropping test output into the block device.
    results_dir = self.results_dir_on_target
    runcmds = [
        'mkdir %s' % results_dir,
    ]
    if self.m.emu.is_emulator_type(device_type):
      runcmds.extend([
          # Wait until the MinFS test image shows up (max <timeout> ms).
          'waitfor class=block topo=%s timeout=60000' % device_topological_path,
          'mount %s %s' % (device_topological_path, results_dir),
      ] + test_cmds + [
          'umount %s' % results_dir,
          'dm poweroff',
      ])
    else:
      runcmds.extend(test_cmds)
    runcmds_bytes = []
    for line in runcmds:
      if isinstance(line, unicode):
        runcmds_bytes.append(line.encode('utf-8'))
      elif isinstance(line, str):
        runcmds_bytes.append(line)
      else:  # pragma: no cover
        assert False, 'line is not unicode or a str: %s, %s' % (line,
                                                                type(line))
    self.m.file.write_text('write runcmds', output_path,
                           '\n'.join(runcmds_bytes))

  def _isolate_build_artifacts(
      self,
      isolate_tree,
      build_results,
      shard=None,
      test_bot_cpu='x64',
      legacy_qemu=False,
  ):
    """Populates a tree with build artifacts and isolates it.

    Specifically, the following is linked into or created within the tree:
      - The images in the build are linked in and manifest of them is created
        in the root, if targeting a fuchsia device;
      - The Linux/Mac tests in the shard and their runtime dependencies.

    Args:
      isolate_tree (api.file.SymlinkTree): A tree into which artifacts may be
        linked.
      build_results (FuchsiaBuildResults): The result of a fuchsia build.
      shard (api.testsharder.Shard|None): A test shard.
      test_bot_cpu (str|None): The host cpu of the bot running the test task.
      legacy_qemu (bool): Whether to only isolate the images needed to run QEMU
        alone.

    Returns:
      The isolated hash that may be used to reference and download the
      artifacts.
    """

    def register_link(relpath):
      """Prepares a symlink of a relative path within the build directory to the tree."""
      isolate_tree.register_link(
          target=build_results.fuchsia_build_dir.join(relpath),
          linkname=isolate_tree.root.join(relpath),
      )

    if shard:
      for test in shard.tests:
        if test.os in ['linux', 'mac']:
          register_link(test.path)
      for dep in shard.deps:
        register_link(dep)

    # If targeting QEMU we include a private key corresponding to an authorized
    # key already in the boot image; this is needed as we do not pave QEMU.
    if shard and self.m.emu.is_emulator_type(shard.device_type):
      isolate_tree.register_link(
          target=build_results.private_key,
          linkname=isolate_tree.root.join(_PRIVATE_KEY_PATH),
      )

    for tool in [
        build_results.tool('botanist', test_bot_cpu),
        build_results.tool('testrunner', test_bot_cpu),
        build_results.llvm_symbolizer,
        # TODO(fxb/38517): Replace with bootserver after there is only one.
        build_results.tool('bootserver_new'),
    ]:
      tool_name = self.m.path.basename(tool)
      isolate_tree.register_link(
          target=tool, linkname=isolate_tree.root.join(tool_name))

    isolate_tree.create_links('create tree of build artifacts')
    isolated = self.m.isolated.isolated(isolate_tree.root)
    isolated.add_dir(isolate_tree.root)
    return isolated.archive('isolate build artifacts')

  def _zbi_name(self, build_results, pave=True):
    zbi_name = next(
        (image['name'] for image in build_results.images.values() if '--boot' in
         image.get('bootserver_%s' % ('pave' if pave else 'netboot'), [])),
        'zircon-a')
    return zbi_name

  def _install_runcmds_files(self,
                             build_results,
                             per_test_timeout_secs=None,
                             device_type=None,
                             pave=False,
                             test_cmds=None):
    """Creates the files used to invoke runtests on boot.

    This is only necessary for QEMU shards, which are the only shards that
    use runcmds, and the non-sharding codepath.
    """
    assert device_type and test_cmds
    self.m.zbi.zbi_path = build_results.zbi

    # TODO(fxbug.dev/41930): The deprecated QEMU codepath explicitly looks for
    # a zircon-a. The attached bug will enable us to stop making these sorts of
    # assumptions.
    if self.m.emu.is_emulator_type(device_type):
      zbi_name = 'zircon-a'
    else:
      zbi_name = self._zbi_name(build_results, pave)
    runcmds_path = self.m.path['cleanup'].join('runcmds')
    self._create_runcmds_script(device_type, test_cmds, runcmds_path)

    zbi_path = build_results.fuchsia_build_dir.join(
        build_results.images[zbi_name]['path'])

    # Inject the runcmds script and/or authorized key into the bootfs image.
    self.m.zbi.copy_and_extend(
        step_name='create zbi',
        input_image=zbi_path,
        output_image=zbi_path,
        manifest={RUNCMDS_BOOTFS_PATH: runcmds_path},
    )
