blob: be237b549fe4c8092c5c0d9a0adc37e6e83b364e [file] [log] [blame]
#!/usr/bin/env python3.8
# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import errno
import json
import os
import re
import subprocess
from collections import defaultdict
from .fuzzer import Fuzzer
from .process import Process
class BuildEnv(object):
    """Represents a local build environment for Fuchsia.

    This class abstracts various repository, tool, and build details.

    Attributes:
      fuchsia_dir:      Path to Fuchsia source checkout.
      cli:              Associated CLI object.
      build_dir:        Path to the Fuchsia build output.
      symbolizer_exec:  Path to the Fuchsia symbolizer executable.
      build_id_dirs:    List of paths to symbolizer debug symbols.
      gsutil:           Path to the Google Cloud Storage utility.
      llvm_cov:         Path to the LLVM/Clang coverage tool.
      llvm_profdata:    Path to the LLVM/Clang profile data tool.
    """
# Constructor: keeps a reference to the factory, records the Fuchsia source
# directory, and initializes every lazily-configured tool path to None and the
# fuzzer list to empty.
# NOTE(review): this copy of the block has lost its indentation and parts of
# two statements (flagged below); restore them from the upstream file.
def __init__(self, factory):
assert factory, 'Factory not set.'
self._factory = factory
# NOTE(review): the right-hand side is truncated. Presumably it looks up the
# FUCHSIA_DIR environment variable through the host object -- TODO confirm.
fuchsia_dir ='FUCHSIA_DIR')
if not fuchsia_dir:
# NOTE(review): the callee that receives these messages is missing, and the
# script name after "scripts/" looks truncated as well -- TODO confirm.
'FUCHSIA_DIR not set.', 'Have you sourced "scripts/"?')
self._fuchsia_dir = fuchsia_dir
# The remaining paths are filled in later (see configure() and the setters).
self._build_dir = None
self._symbolizer_exec = None
self._build_id_dirs = None
self._gsutil = None
self._llvm_cov = None
self._llvm_profdata = None
self._fuzzers = []
@property
def fuchsia_dir(self):
    """Path to the Fuchsia source checkout recorded by the constructor.

    Exposed as a read-only property because the rest of the class reads it as
    a plain attribute (e.g. `os.path.join(self.fuchsia_dir, ...)`).
    """
    return self._fuchsia_dir
# Accessor for the host object used by the rest of this class.
# NOTE(review): the body of this method (and any decorator) is missing from
# this copy of the file. Presumably it returns a host object obtained from
# self._factory -- TODO restore from upstream.
def host(self):
@property
def build_dir(self):
    """Absolute path to the build output directory.

    Exposed as a read-only property because other methods read it as a plain
    attribute (e.g. `os.path.join(self.build_dir, 'host_x64', ...)`).

    Raises:
        AssertionError: if the build directory has not been set yet.
    """
    assert self._build_dir, 'Build directory not set'
    return self._build_dir
@property
def symbolizer_exec(self):
    """Path to the symbolizer executable.

    Exposed as a property because callers read it as a plain attribute
    (e.g. `cmd = [self.symbolizer_exec]`).

    Raises:
        AssertionError: if no symbolizer executable has been set yet.
    """
    assert self._symbolizer_exec, 'Symbolizer executable not set.'
    return self._symbolizer_exec
# Setter counterpart of symbolizer_exec: resolves the given path with
# abspath() and stores the result in self._symbolizer_exec.
# NOTE(review): configure() assigns to `self.symbolizer_exec`, so this is
# presumably a property setter whose decorator was lost -- TODO confirm.
def symbolizer_exec(self, symbolizer_exec):
symbolizer_exec = self.abspath(symbolizer_exec)
# NOTE(review): the condition after `if not` and the callee that receives the
# message below are missing from this copy -- TODO restore from upstream.
if not
'Invalid symbolizer executable: {}'.format(symbolizer_exec))
self._symbolizer_exec = symbolizer_exec
@property
def build_id_dirs(self):
    """List of directories holding debug symbols for the symbolizer.

    Exposed as a property because callers iterate it as a plain attribute
    (e.g. `for build_id_dir in self.build_id_dirs`).

    Raises:
        AssertionError: if the build ID directories have not been set yet.
    """
    assert self._build_id_dirs, 'Build ID directories not set.'
    return self._build_id_dirs
# Setter counterpart of build_id_dirs: resolves each given directory and stores
# the resulting list in self._build_id_dirs.
# NOTE(review): configure() assigns to `self.build_id_dirs`, so this is
# presumably a property setter whose decorator was lost -- TODO confirm.
def build_id_dirs(self, build_id_dirs):
abspaths = []
for build_id_dir in build_id_dirs:
# abspath is the filesystem form; srcpath is the '//'-style form used in the
# error message.
abspath = self.abspath(build_id_dir)
srcpath = self.srcpath(build_id_dir)
# NOTE(review): the condition after `if not` and the callee that receives the
# message below are missing from this copy -- TODO restore from upstream.
if not
'Invalid build ID directory: {}'.format(srcpath))
# NOTE(review): nothing visible ever adds `abspath` to `abspaths`; a line that
# appends it inside the loop appears to have been lost -- TODO confirm.
self._build_id_dirs = abspaths
# Returns the path to gsutil, looking it up with `which` on first use and
# caching the result in self._gsutil.
# NOTE(review): presumably a property whose decorator was lost -- TODO confirm.
def gsutil(self):
if not self._gsutil:
# NOTE(review): the `try:` line, the remainder of the `which` argument list and
# whatever is called on the created process are missing from this copy.
self._gsutil = self.create_process(['which',
except subprocess.CalledProcessError:
# NOTE(review): the callee that receives these two messages is missing.
'Unable to find gsutil.',
'Try installing the Google Cloud SDK.')
return self._gsutil
# Setter counterpart of gsutil: resolves the given path with abspath() and
# stores it in self._gsutil.
# NOTE(review): presumably a property setter whose decorator was lost.
def gsutil(self, gsutil):
abspath = self.abspath(gsutil)
# NOTE(review): two lines appear fused here; the condition after `if not` and
# the callee that receives the message are missing -- TODO restore from upstream.
if not'Invalid GS utility: {}'.format(abspath))
self._gsutil = abspath
@property
def llvm_cov(self):
    """Path to the llvm-cov tool.

    Exposed as a property because callers read it as a plain attribute
    (e.g. `['-llvm-cov', self.llvm_cov]`).

    Raises:
        AssertionError: if the llvm-cov path has not been set yet.
    """
    assert self._llvm_cov, 'LLVM cov not set.'
    return self._llvm_cov
# Setter counterpart of llvm_cov: resolves the given path with abspath() and
# stores it in self._llvm_cov.
# NOTE(review): configure() assigns to `self.llvm_cov`, so this is presumably a
# property setter whose decorator was lost -- TODO confirm.
def llvm_cov(self, llvm_cov):
llvm_cov = self.abspath(llvm_cov)
# NOTE(review): two lines appear fused here; the condition after `if not` and
# the callee that receives the message are missing -- TODO restore from upstream.
if not'Invalid LLVM cov: {}'.format(llvm_cov))
self._llvm_cov = llvm_cov
@property
def llvm_profdata(self):
    """Path to the llvm-profdata tool.

    Exposed as a property because callers read it as a plain attribute
    (e.g. `['-llvm-profdata', self.llvm_profdata]`).

    Raises:
        AssertionError: if the llvm-profdata path has not been set yet.
    """
    assert self._llvm_profdata, 'LLVM profdata not set.'
    return self._llvm_profdata
# Setter counterpart of llvm_profdata: resolves the given path with abspath()
# and stores it in self._llvm_profdata.
# NOTE(review): configure() assigns to `self.llvm_profdata`, so this is
# presumably a property setter whose decorator was lost -- TODO confirm.
def llvm_profdata(self, llvm_profdata):
llvm_profdata = self.abspath(llvm_profdata)
# NOTE(review): two lines appear fused here; the condition after `if not` and
# the callee that receives the message are missing -- TODO restore from upstream.
if not'Invalid LLVM profdata: {}'.format(llvm_profdata))
self._llvm_profdata = llvm_profdata
# Initialization routines
# Derives the symbolizer, build-ID directories and LLVM tool paths from a build
# directory and the prebuilt clang directory, assigning each through the
# corresponding attribute of this object.
def configure(self, build_dir):
"""Sets multiple properties based on the given build directory."""
self._build_dir = self.abspath(build_dir)
# NOTE(review): the operand after `+` is missing; presumably a host-platform
# specific subdirectory name -- TODO restore from upstream.
clang_dir = '//prebuilt/third_party/clang/' +
self.symbolizer_exec = build_dir + '/host_x64/symbolizer'
# NOTE(review): the closing bracket of this list is missing from this copy.
self.build_id_dirs = [
clang_dir + '/lib/debug/.build-id',
build_dir + '/.build-id',
self.llvm_cov = clang_dir + '/bin/llvm-cov'
self.llvm_profdata = clang_dir + '/bin/llvm-profdata'
# Loads fuzzer metadata from a JSON file and rebuilds self._fuzzers from it,
# creating one Fuzzer per resulting spec dict.
# NOTE(review): this block is heavily truncated in this copy; several statements
# and closing brackets are missing (flagged below). Restore from upstream.
def read_fuzzers(self, pathname):
"""Parses the available fuzzers from a fuzzers.json pathname."""
# NOTE(review): the expression that opens `pathname` is missing before
# `, on_error=[`.
with, on_error=[
'Failed to read fuzzers from {}.'.format(pathname),
'Have you run "fx set ... --fuzz-with <sanitizer>"?'
]) as opened:
metadata = json.load(opened)
fuzz_specs = []
# by_label collects v2 entries keyed by their 'label'.
by_label = defaultdict(dict)
for entry in metadata:
# Try v2 metadata first.
label = entry.get('label')
# NOTE(review): the body of this `if` is missing; presumably it records the
# entry under by_label[label] and skips the v1 path -- TODO confirm.
if label:
# Fallback to v1 metadata.
package = entry['fuzzers_package']
package_url = 'fuchsia-pkg://{}'.format(package)
for fuzzer in entry['fuzzers']:
# NOTE(review): the opening and closing of this dict, and the call that adds
# it to fuzz_specs, are missing from this copy.
'package': package,
'package_url': package_url,
'fuzzer': fuzzer,
'manifest': '{}.cmx'.format(fuzzer),
'label': '//generated/{}:{}'.format(package, fuzzer),
fuzz_specs += list(by_label.values())
# NOTE(review): the closing bracket of this list comprehension is missing.
self._fuzzers = [
Fuzzer(self._factory, fuzz_spec) for fuzz_spec in fuzz_specs
def fuzzers(self, name=None, include_tests=False):
    """Returns a (possibly filtered) list of fuzzers.

    Matches the given name against the fuzzers previously instantiated by `read_fuzzers`.
    Filtering itself is delegated to each fuzzer's `matches` method.

    Parameters:
        name            A name to filter on. If the name is an exact match, it will return a
                        list containing the matching fuzzer. If the name is of the form 'x/y',
                        the filtered list will include all the fuzzers where 'x' is a substring
                        of `package` and y is a substring of `executable`; otherwise it includes
                        all the fuzzers where `name` is a substring of either `package` or
                        `executable`. If blank or omitted, all fuzzers are returned.
        include_tests   A boolean flag indicating whether to include fuzzer tests as fuzzers.
                        This can be useful for commands which only act on the source tree
                        without regard to a fuzzer being deployed on a device.

    Returns:
        A list of fuzzers matching the given name.

    Raises:
        ValueError: Name is malformed, e.g. of the form 'x/y/z'.
    """
    matched = [
        fuzzer for fuzzer in self._fuzzers
        if fuzzer.matches(name) and (include_tests or not fuzzer.is_test)
    ]
    # An exact match on the fuzzer's string form wins over substring matches.
    for fuzzer in matched:
        if name == str(fuzzer):
            return [fuzzer]
    return matched
def fuzzer_tests(self, name=None):
    """Returns a (possibly filtered) list of fuzzer tests.

    Like fuzzers(), but returns uninstrumented fuzzer_tests instead of instrumented fuzzers.
    Filtering itself is delegated to each fuzzer's `matches` method.

    Parameters:
        name    A name to filter on. If the name is an exact match, it will return a list
                containing the matching fuzzer test. If the name is of the form 'x/y', the
                filtered list will include all the fuzzer tests where 'x' is a substring of
                `package` and y is a substring of `executable` + '_test'; otherwise it includes
                all the fuzzer tests where `name` is a substring of either `package` or
                `executable` + '_test'. If blank or omitted, all fuzzer tests are returned.

    Returns:
        A list of fuzzer tests matching the given name.

    Raises:
        ValueError: Name is malformed, e.g. of the form 'x/y/z'.
    """
    matched = [
        fuzzer for fuzzer in self._fuzzers
        if fuzzer.matches(name) and fuzzer.is_test
    ]
    # An exact match on the fuzzer's string form wins over substring matches.
    for fuzzer in matched:
        if name == str(fuzzer):
            return [fuzzer]
    return matched
# Other routines
def abspath(self, *segments):
    """Returns absolute path to a path in the build environment.

    This method can handle three types of path inputs:
      1. GN source-absolute paths, as identified by a leading '//', e.g. '//some/src/path'.
      2. Paths that are already valid absolute paths, e.g. '/an/absolute/path'.
      3. Paths interpreted as relative to the current working directory. e.g. 'a/relative/path'.
         See also Host.getcwd().

    This method normalizes the path using os.path.normpath, so forward slashes are automatically
    converted on Windows and callers should simply use POSIX-style paths.

    Parameters:
        segments    A list of path segments.

    Returns:
        A string containing the absolute path.

    Raises:
        AssertionError: if no segments are given.
    """
    assert segments, 'No path segments provided.'
    if segments[0].startswith('//'):
        # Source-absolute: drop the leading '//' and root at the checkout.
        joined = os.path.join(
            self.fuchsia_dir, segments[0][2:], *segments[1:])
    else:
        joined = os.path.join(*segments)
    if not os.path.isabs(joined):
        # Relative paths are anchored at the host's working directory rather
        # than os.getcwd(), as documented in case 3 above.
        joined = os.path.join(self.host.getcwd(), joined)
    return os.path.normpath(joined)
# Converts a filesystem path or GN label into a '//'-prefixed path relative to
# the Fuchsia source directory.
# NOTE(review): the docstring below has lost its closing quotes and section
# headers in this copy, and one call expression is missing (flagged below).
def srcpath(self, label_or_path):
"""Returns a GN source-absolute path for a label, like GN's `get_label_info(..., "dir")`.
This method can handle the same types of path inputs as `abspath`, plus one more:
4. GN relative labels, i.e. labels without a leading '//'. The specific GN target is removed
and the remainder is treated as relative to the working directory, similar to how GN
As with `abspath`, callers should simply use forward slashes, as used by both POSIX-style
and GN-style paths.
label_or_path A string containing either a filesystem path or GN label.
A string containing the source-absolute path, that is one beginning with '//'.
# Everything after the first ':' (the GN target name) is discarded before
# resolving the path.
joined = self.abspath(label_or_path.split(':')[0])
# NOTE(review): this is a plain string-prefix test, so a sibling directory whose
# name merely starts with fuchsia_dir would also pass -- verify that is intended.
if not joined.startswith(self.fuchsia_dir):
# NOTE(review): the callee that receives this message is missing from this copy.
'{} is not a path in the source tree.'.format(joined))
# OS-specific separators are converted back to '/' for the GN-style result.
return '//' + os.path.relpath(joined, self.fuchsia_dir).replace(
os.sep, '/')
# Builds an `fx ffx target list --format a` command (optionally narrowed to
# device_name) and returns the single address it reports; on
# CalledProcessError it falls back to _find_device_by_list_devices().
# NOTE(review): several pieces of this block are missing in this copy (flagged
# below); restore them from upstream.
def find_device(self, device_name=None):
"""Returns the IPv6 address for a device."""
# NOTE(review): the closing bracket of this list is missing.
cmd = [
self.abspath(self.fuchsia_dir, '.jiri_root/bin/fx'), 'ffx',
'target', 'list', '--format', 'a'
if device_name:
cmd += [device_name]
# NOTE(review): the `try:` line and the expression that runs `cmd` and captures
# its output are missing here.
addrs =
# NOTE(review): `err` is bound but not used in the visible code.
except subprocess.CalledProcessError as err:
return self._find_device_by_list_devices(device_name)
# NOTE(review): two lines appear fused below; the callee that receives the
# messages is missing in both places.
if not addrs:'Unable to find device.', 'Try "fx set-device".')
addrs = addrs.split('\n')
if len(addrs) != 1:'Multiple devices found.', 'Try "fx set-device".')
return addrs[0]
# Fallback device lookup: parses the output of `fx list-devices`, where each
# line is split on the first space into an address and a name.
# NOTE(review): the docstring below has lost its closing quotes and section
# headers in this copy, and several call expressions are missing (flagged below).
def _find_device_by_list_devices(self, device_name=None):
""" Find a device address by using fx list-devices
device_name: The name of the device to match to. If not present,
will only succeed if there is exactly 1 device.
The device address.
# NOTE(review): the closing bracket of this list is missing.
cmd = [
self.abspath(self.fuchsia_dir, '.jiri_root/bin/fx'), 'list-devices'
# NOTE(review): the expression that runs `cmd` and captures its output is missing.
list_devices_out =
# NOTE(review): two lines appear fused; the callee receiving the messages is missing.
if not list_devices_out:'Unable to find device.', 'Try "fx set-device".')
# Try parsing the returned values
devices = list_devices_out.split('\n')
if devices:
if not device_name:
if len(devices) != 1:
# NOTE(review): the callee that receives these messages is missing.
'Multiple devices found.', 'Try "fx set-device".')
# With no name given and exactly one device, return its address field.
return devices[0].split(' ', 1)[0]
# Find the matching device name
for device in devices:
addr, name = device.split(' ', 1)
if name == device_name:
# NOTE(review): `return addr` appears fused with a following error call whose
# callee is missing -- TODO restore from upstream.
return addr'Unable to find device.', 'Try "fx set-device".')
def symbolize(self, raw, json_output=None):
    """Symbolizes backtraces in a log file using the current build.

    Runs the symbolizer executable with every configured build ID directory,
    feeds `raw` to its standard input, and strips kernel-log prefixes from the
    output.

    Parameters:
        raw: Bytes representing unsymbolized lines.
        json_output: If present, outputs trigger information to the specified file.

    Returns:
        Bytes representing symbolized lines, or an empty string if the
        symbolizer exits with a non-zero status.

    NOTE(review): the parameter and result are documented as bytes, but the
    failure value and the regular expression below are `str`; confirm that the
    process wrapper decodes the symbolizer output.
    """
    cmd = [self.symbolizer_exec]
    for build_id_dir in self.build_id_dirs:
        cmd += ['--build-id-dir', build_id_dir]
    if json_output:
        cmd += ['--json-output', json_output]
    # NOTE(review): the process is created via self.create_process, matching
    # how the gsutil lookup launches commands -- confirm against upstream.
    process = self.create_process(cmd)
    process.stdin = subprocess.PIPE
    process.stdout = subprocess.PIPE
    popened = process.popen()
    out, _ = popened.communicate(raw)
    if popened.returncode != 0:
        # A failed symbolizer run yields no output rather than an error.
        out = ''
    # Drop the "[timestamp][klog] INFO: " prefix the kernel log adds to lines.
    return re.sub(r'[0-9\[\]\.]*\[klog\] INFO: ', '', out)
# Builds a testsharder command limited to one shard per environment, reads the
# resulting JSON, and writes the tests from the AEMU shard whose 'name' equals
# executable_url to a per-shard JSON file whose path is returned.
# NOTE(review): the docstring below has lost its closing quotes and section
# headers in this copy, and several statements are truncated (flagged below).
def testsharder(self, executable_url, out_dir, realm_label=None):
"""Shards the available tests into _one_ test shard per environment for
use by testrunner.
executable_url: The fuchsia pkg url of the test to generate coverage for.
out_dir: The output directory into which to write the sharded tests file.
The path to a generated file containing exactly _one_ test definition
as extracted from the tests.json file by testsharder.
sharder_out = os.path.join(out_dir, 'testsharder_out.json')
# Generate the tests but force 1 shard maximum
cmd = [os.path.join(self.build_dir, 'host_x64', 'testsharder')] \
+ ['-build-dir', self.build_dir] \
+ ['-max-shards-per-env', '1'] \
+ ['-output-file', sharder_out]
if realm_label:
cmd = cmd + ['-realm-label', realm_label]
# NOTE(review): no visible statement runs `cmd`, and the expression that opens
# sharder_out is missing from the `with` below -- TODO restore from upstream.
with as f:
sharded = json.load(f)
# Depending on the set of tests included we could get multiple shards.
# One per environment, but we should only have 1 AEMU* shard.
found_aemu = False
all_tests = []
for shard in sharded:
if shard['name'].startswith('AEMU'):
if found_aemu:
# NOTE(review): the callee that receives this message is missing.
'Expected a single AEMU shard, but got more than one.')
found_aemu = True
test_out = os.path.join(
out_dir, 'shard_{}_tests.json'.format(shard['name']))
# NOTE(review): the closing bracket of this list comprehension is missing.
all_tests = [
t for t in shard['tests'] if t.get('name') == executable_url
# NOTE(review): in the next two lines the condition appears fused with an error
# call whose callee is missing.
if not found_aemu:'Unable to find any tests for AEMU shards.')
if not all_tests:'Found no matching tests to run.')
# NOTE(review): the callee and the format argument of this message are missing.
'Found {} tests to generate coverage report for.'.format(
# NOTE(review): the expression that opens test_out for writing is missing.
with, 'w') as out_file:
json.dump(all_tests, out_file)
return test_out
# Builds a testrunner command for the given shard file, scans its output for
# the 'Fuzzer built as test' marker to find a pid, dumps that pid's device log,
# and returns the testrunner output directory and the log dump path.
# NOTE(review): the docstring below has lost its closing quotes and section
# headers in this copy, and several statements are truncated (flagged below).
def testrunner(self, shard_file, out_dir, device):
"""Runs testrunner over a file of tests generated by testsharder which
generates the artifacts used for SBCC.
Additionally, dumps logs of associated pids to an output file for symbolizing.
shard_file: A fully qualified path to the sharded tests file.
out_dir: The output directory into which to write the log dump.
device: Reference to the device to get logs from.
The path to the testrunner output.
The path to the log dump file.
# NOTE(review): the condition after `if not` and the callee that receives the
# message are missing.
if not
'Unable to find sharded test file at {}.'.format(shard_file))
runner_out_dir = os.path.join(out_dir, 'testrunner_out')
cmd = [os.path.join(self.build_dir, 'host_x64', 'testrunner')] \
+ ['-out-dir', runner_out_dir] \
+ ['-use-runtests', '-per-test-timeout', '600s'] \
+ [shard_file]
# NOTE(review): the expression that runs `cmd` and captures its output is missing.
out =
# fxb/64774 Catch a timeout error and check that the device addr is correct
# Look for the marker log in the testrunner output
pids = []
for line in out.splitlines():
if 'Fuzzer built as test' in line:
parts = line.split('][')
# NOTE(review): the body of this `if` is missing; presumably it extracts a pid
# from `parts` and appends it to `pids` -- TODO confirm.
if len(parts) > 2:
# NOTE(review): condition fused with an error call whose callee is missing.
if len(pids) < 1:'Unable to find a matching test fuzzer pid.')
# NOTE(review): two statements appear fused on the next line: the log dump for
# the first pid, and the tail of a call involving a 'log_dumps' path.
raw = device.dump_log('--pid', str(pids[0])), 'log_dumps'))
# Strip the .json suffix from the end of the shard_file name.
shard_file_basename = os.path.basename(shard_file)[:-5]
log_dump_out = os.path.join(
out_dir, 'log_dumps', 'dump_{}'.format(shard_file_basename))
# NOTE(review): the expression that opens log_dump_out and the body that writes
# `raw` to it are missing.
with, 'w') as out_file:
return runner_out_dir, log_dump_out
# Builds a covargs command from the testrunner summary, the symbolize dump, the
# LLVM tool paths and every build ID directory, and returns the coverage output
# directory under out_dir.
# NOTE(review): the docstring below has lost its closing quotes and section
# headers in this copy, and several statements are truncated (flagged below).
def covargs(self, runner_dir, symbolize_file, out_dir):
"""Runs covargs to generate a SBCC given a testrunner output and symbolize file.
runner_dir: The path to the testrunner output which contains a summary.json file.
symbolize_file: The path to the symbolized profile json output.
out_dir: The output directory into which to write the coverage report
The path to the generated coverage report.
summary_json_file = os.path.join(runner_dir, 'summary.json')
# NOTE(review): in both checks below the condition after `if not` and the callee
# that receives the message are missing; the first message's format argument is
# truncated as well.
if not
'Unable to find summary.json file at {}.'.format(
if not
'Unable to find symbolize file at {}.'.format(symbolize_file))
coverage_path = os.path.join(out_dir, 'covargs_out')
cmd = [os.path.join(self.build_dir, 'host_x64', 'covargs')] \
+ ['-llvm-cov', self.llvm_cov] \
+ ['-llvm-profdata', self.llvm_profdata] \
+ ['-summary', summary_json_file] \
+ ['-symbolize-dump', symbolize_file] \
+ ['-output-dir', coverage_path]
for build_id_dir in self.build_id_dirs:
cmd += ['-build-id-dir', build_id_dir]
# NOTE(review): no visible statement runs `cmd` before returning; the invocation
# appears to have been lost -- TODO restore from upstream.
return coverage_path