| #!/usr/bin/env python3.8 |
| # Copyright 2019 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import errno |
| import json |
| import os |
| import re |
| import subprocess |
| from collections import defaultdict |
| |
| from .fuzzer import Fuzzer |
| from .process import Process |
| |
| |
| class BuildEnv(object): |
| """Represents a local build environment for Fuchsia. |
| |
| This class abstracts various repository, tool, and build details. |
| |
| Attributes: |
| fuchsia_dir: Path to Fuchsia source checkout. |
| host: Associated Host object. |
| build_dir: Path to the Fuchsia build output. |
| symbolizer_exec: Path to the Fuchsia symbolizer executable. |
| build_id_dirs: List of paths to symbolizer debug symbols. |
| gsutil: Path to the Google Cloud Storage utility. |
| llvm_cov: Path to the LLVM/Clang coverage tool. |
| llvm_profdata: Path to the LLVM/Clang profile data tool. |
| """ |
| |
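| # Illustrative usage sketch (comments only, not executed): a BuildEnv is built |
| # from a Factory and configured with a build output directory before use. The |
| # '//out/default' directory and the 'fuzzers.json' filename below are assumed |
| # example values, not requirements of this class. |
| # |
| #   build_env = BuildEnv(factory) |
| #   build_env.configure('//out/default') |
| #   build_env.read_fuzzers( |
| #       build_env.abspath(build_env.build_dir, 'fuzzers.json')) |
| #   for fuzzer in build_env.fuzzers('example'): |
| #       print(str(fuzzer)) |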
| def __init__(self, factory): |
| assert factory, 'Factory not set.' |
| self._factory = factory |
| fuchsia_dir = self.host.getenv('FUCHSIA_DIR') |
| if not fuchsia_dir: |
| self.host.error( |
| 'FUCHSIA_DIR not set.', 'Have you sourced "scripts/fx-env.sh"?') |
| self._fuchsia_dir = fuchsia_dir |
| self._build_dir = None |
| self._symbolizer_exec = None |
| self._build_id_dirs = None |
| self._gsutil = None |
| self._llvm_cov = None |
| self._llvm_profdata = None |
| self._fuzzers = [] |
| |
| @property |
| def fuchsia_dir(self): |
| return self._fuchsia_dir |
| |
| @property |
| def host(self): |
| return self._factory.host |
| |
| @property |
| def build_dir(self): |
| assert self._build_dir, 'Build directory not set' |
| return self._build_dir |
| |
| @property |
| def symbolizer_exec(self): |
| assert self._symbolizer_exec, 'Symbolizer executable not set.' |
| return self._symbolizer_exec |
| |
| @symbolizer_exec.setter |
| def symbolizer_exec(self, symbolizer_exec): |
| symbolizer_exec = self.abspath(symbolizer_exec) |
| if not self.host.isfile(symbolizer_exec): |
| self.host.error( |
| 'Invalid symbolizer executable: {}'.format(symbolizer_exec)) |
| self._symbolizer_exec = symbolizer_exec |
| |
| @property |
| def build_id_dirs(self): |
| assert self._build_id_dirs, 'Build ID directories not set.' |
| return self._build_id_dirs |
| |
| @build_id_dirs.setter |
| def build_id_dirs(self, build_id_dirs): |
| abspaths = [] |
| for build_id_dir in build_id_dirs: |
| abspath = self.abspath(build_id_dir) |
| srcpath = self.srcpath(build_id_dir) |
| if not self.host.isdir(abspath): |
| self.host.error( |
| 'Invalid build ID directory: {}'.format(srcpath)) |
| abspaths.append(abspath) |
| self._build_id_dirs = abspaths |
| |
| @property |
| def gsutil(self): |
| if not self._gsutil: |
| try: |
| self._gsutil = self.host.create_process(['which', |
| 'gsutil']).check_output() |
| except subprocess.CalledProcessError: |
| self.host.error( |
| 'Unable to find gsutil.', |
| 'Try installing the Google Cloud SDK.') |
| return self._gsutil |
| |
| @gsutil.setter |
| def gsutil(self, gsutil): |
| abspath = self.abspath(gsutil) |
| if not self.host.isfile(abspath): |
| self.host.error('Invalid GS utility: {}'.format(abspath)) |
| self._gsutil = abspath |
| |
| @property |
| def llvm_cov(self): |
| assert self._llvm_cov, 'LLVM cov not set.' |
| return self._llvm_cov |
| |
| @llvm_cov.setter |
| def llvm_cov(self, llvm_cov): |
| llvm_cov = self.abspath(llvm_cov) |
| if not self.host.isfile(llvm_cov): |
| self.host.error('Invalid LLVM cov: {}'.format(llvm_cov)) |
| self._llvm_cov = llvm_cov |
| |
| @property |
| def llvm_profdata(self): |
| assert self._llvm_profdata, 'LLVM profdata not set.' |
| return self._llvm_profdata |
| |
| @llvm_profdata.setter |
| def llvm_profdata(self, llvm_profdata): |
| llvm_profdata = self.abspath(llvm_profdata) |
| if not self.host.isfile(llvm_profdata): |
| self.host.error('Invalid LLVM profdata: {}'.format(llvm_profdata)) |
| self._llvm_profdata = llvm_profdata |
| |
| # Initialization routines |
| |
| def configure(self, build_dir): |
| """Sets multiple properties based on the given build directory.""" |
| self._build_dir = self.abspath(build_dir) |
| clang_dir = '//prebuilt/third_party/clang/' + self.host.platform |
| self.symbolizer_exec = build_dir + '/host_x64/symbolizer' |
| self.build_id_dirs = [ |
| clang_dir + '/lib/debug/.build-id', |
| build_dir + '/.build-id', |
| ] |
| self.llvm_cov = clang_dir + '/bin/llvm-cov' |
| self.llvm_profdata = clang_dir + '/bin/llvm-profdata' |
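| # Sketch of the paths derived by configure(), assuming the prebuilt clang |
| # layout referenced above; '//out/default' is an assumed example build dir: |
| # |
| #   build_env.configure('//out/default') |
| #   build_env.symbolizer_exec   # -> <fuchsia_dir>/out/default/host_x64/symbolizer |
| #   build_env.build_id_dirs     # -> [<clang>/lib/debug/.build-id, <build_dir>/.build-id] |
| #   build_env.llvm_cov          # -> <clang>/bin/llvm-cov |
| #   build_env.llvm_profdata     # -> <clang>/bin/llvm-profdata |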
| |
| def read_fuzzers(self, pathname): |
| """Parses the available fuzzers from a fuzzers.json file at the given pathname.""" |
| with self.host.open(pathname, on_error=[ |
| 'Failed to read fuzzers from {}.'.format(pathname), |
| 'Have you run "fx set ... --fuzz-with <sanitizer>"?' |
| ]) as opened: |
| metadata = json.load(opened) |
| |
| fuzz_specs = [] |
| by_label = defaultdict(dict) |
| for entry in metadata: |
| # Try v2 metadata first. |
| label = entry.get('label') |
| if label: |
| by_label[label].update(entry) |
| continue |
| # Fallback to v1 metadata. |
| package = entry['fuzzers_package'] |
| package_url = 'fuchsia-pkg://fuchsia.com/{}'.format(package) |
| for fuzzer in entry['fuzzers']: |
| fuzz_specs.append( |
| { |
| 'package': package, |
| 'package_url': package_url, |
| 'fuzzer': fuzzer, |
| 'manifest': '{}.cmx'.format(fuzzer), |
| 'label': '//generated/{}:{}'.format(package, fuzzer), |
| }) |
| fuzz_specs += list(by_label.values()) |
| self._fuzzers = [ |
| Fuzzer(self._factory, fuzz_spec) for fuzz_spec in fuzz_specs |
| ] |
| self._fuzzers.sort() |
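| # The fuzzers.json metadata handled above comes in two shapes; a minimal |
| # sketch (field values are illustrative, and only the fields read here are |
| # shown): |
| # |
| #   [ |
| #     {"label": "//src/examples:my-fuzzer", ...},          # v2: merged by label |
| #     {"fuzzers_package": "example-fuzzers",               # v1: expanded into |
| #      "fuzzers": ["crash_fuzzer", "overflow_fuzzer"]}     #     one spec per fuzzer |
| #   ] |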
| |
| def fuzzers(self, name=None, include_tests=False): |
| """Returns a (possibly filtered) list of fuzzers. |
| |
| Matches the given name against the fuzzers previously instantiated by `read_fuzzers`. |
| |
| Parameters: |
| name A name to filter on. If the name is an exact match, it will return a |
| list containing the matching fuzzer. If the name is of the form 'x/y', |
| the filtered list will include all the fuzzers where 'x' is a substring |
| of `package` and 'y' is a substring of `executable`; otherwise it includes |
| all the fuzzers where `name` is a substring of either `package` or |
| `executable`. If blank or omitted, all fuzzers are returned. |
| include_tests A boolean flag indicating whether to include fuzzer tests as fuzzers. |
| This can be useful for commands which only act on the source tree |
| without regard to a fuzzer being deployed on a device. |
| |
| Returns: |
| A list of fuzzers matching the given name. |
| |
| Raises: |
| ValueError: Name is malformed, e.g. of the form 'x/y/z'. |
| """ |
| fuzzers = [ |
| fuzzer for fuzzer in self._fuzzers |
| if fuzzer.matches(name) and (include_tests or not fuzzer.is_test) |
| ] |
| for fuzzer in fuzzers: |
| if name == str(fuzzer): |
| return [fuzzer] |
| return fuzzers |
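| # Matching sketch (assumes read_fuzzers() already ran, that str(fuzzer) |
| # renders as 'package/executable', and that the names below exist): |
| # |
| #   fuzzers('example-fuzzers/crash_fuzzer')  # exact match -> that single fuzzer |
| #   fuzzers('example/crash')                 # 'x/y' -> 'x' in package, 'y' in executable |
| #   fuzzers('crash')                         # substring of package or executable |
| #   fuzzers()                                # every (non-test) fuzzer |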
| |
| def fuzzer_tests(self, name=None): |
| """Returns a (possibly filtered) list of fuzzer tests. |
| |
| Like fuzzers(), but returns uninstrumented fuzzer_tests instead of instrumented fuzzers. |
| |
| Parameters: |
| name A name to filter on. If the name is an exact match, it will return a list |
| containing the matching fuzzer test. If the name is of the form 'x/y', the |
| filtered list will include all the fuzzer tests where 'x' is a substring of |
| `package` and 'y' is a substring of `executable` + '_test'; otherwise it includes |
| all the fuzzer tests where `name` is a substring of either `package` or |
| `executable` + '_test'. If blank or omitted, all fuzzer tests are returned. |
| |
| Returns: |
| A list of fuzzer tests matching the given name. |
| |
| Raises: |
| ValueError: Name is malformed, e.g. of the form 'x/y/z'. |
| """ |
| fuzzer_tests = [ |
| fuzzer for fuzzer in self._fuzzers |
| if fuzzer.matches(name) and fuzzer.is_test |
| ] |
| for fuzzer in fuzzer_tests: |
| if name == str(fuzzer): |
| return [fuzzer] |
| return fuzzer_tests |
| |
| # Other routines |
| |
| def abspath(self, *segments): |
| """Returns absolute path to a path in the build environment. |
| |
| This method can handle three types of path inputs: |
| 1. GN source-absolute paths, as identified by a leading '//', e.g. '//some/src/path'. |
| 2. Paths that are already valid absolute paths, e.g. '/an/absolute/path'. |
| 3. Paths interpreted as relative to the current working directory, e.g. 'a/relative/path'. |
| See also Host.getcwd(). |
| |
| This method normalizes the path using os.path.normpath, so forward slashes are automatically |
| converted on Windows and callers should simply use POSIX-style paths. |
| |
| Parameters: |
| segments A list of path segments. |
| |
| Returns: |
| A string containing the absolute path. |
| """ |
| assert segments, 'No path segments provided.' |
| if segments[0].startswith('//'): |
| joined = os.path.join( |
| self.fuchsia_dir, segments[0][2:], *segments[1:]) |
| else: |
| joined = os.path.join(*segments) |
| if not os.path.isabs(joined): |
| joined = os.path.join(self.host.getcwd(), joined) |
| return os.path.normpath(joined) |
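| # Examples of the three accepted forms (the paths themselves are illustrative): |
| # |
| #   abspath('//src/lib')          # -> <fuchsia_dir>/src/lib |
| #   abspath('/tmp/corpus')        # -> /tmp/corpus |
| #   abspath('relative/path')      # -> <cwd>/relative/path |
| #   abspath('//out', 'default')   # segments are joined before normalizing |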
| |
| def srcpath(self, label_or_path): |
| """Returns a GN source-absolute path for a label, like GN's `get_label_info(..., "dir")`. |
| |
| This method can handle the same types of path inputs as `abspath`, plus one more: |
| 4. GN relative labels, i.e. labels without a leading '//'. The specific GN target is removed |
| and the remainder is treated as relative to the working directory, similar to how GN |
| behaves. |
| |
| As with `abspath`, callers should simply use forward slashes, as used by both POSIX-style |
| and GN-style paths. |
| |
| Parameters: |
| label_or_path A string containing either a filesystem path or GN label. |
| |
| Returns: |
| A string containing the source-absolute path, i.e. one beginning with '//'. |
| """ |
| joined = self.abspath(label_or_path.split(':')[0]) |
| if not joined.startswith(self.fuchsia_dir): |
| self.host.error( |
| '{} is not a path in the source tree.'.format(joined)) |
| return '//' + os.path.relpath(joined, self.fuchsia_dir).replace( |
| os.sep, '/') |
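| # Example (the label is illustrative): a target suffix is stripped and the |
| # result is rebased onto the source root: |
| # |
| #   srcpath('//src/examples/fuzzers:all')        # -> '//src/examples/fuzzers' |
| #   srcpath('<fuchsia_dir>/src/lib/foo.cc')      # -> '//src/lib/foo.cc' |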
| |
| def find_device(self, device_name=None): |
| """Returns the IPv6 address for a device.""" |
| cmd = [ |
| self.abspath(self.fuchsia_dir, '.jiri_root/bin/fx'), 'ffx', |
| 'target', 'list', '--format', 'a' |
| ] |
| if device_name: |
| cmd += [device_name] |
| try: |
| addrs = self.host.create_process(cmd).check_output().strip() |
| except subprocess.CalledProcessError: |
| # `fx ffx target list` failed; fall back to parsing `fx list-devices`. |
| return self._find_device_by_list_devices(device_name) |
| |
| if not addrs: |
| self.host.error('Unable to find device.', 'Try "fx set-device".') |
| |
| addrs = addrs.split('\n') |
| if len(addrs) != 1: |
| self.host.error('Multiple devices found.', 'Try "fx set-device".') |
| return addrs[0] |
| |
| def _find_device_by_list_devices(self, device_name=None): |
| """Finds a device address using `fx list-devices`. |
| |
| Parameters: |
| device_name: The name of the device to match. If omitted, the lookup |
| only succeeds when exactly one device is found. |
| |
| Returns: |
| The device address. |
| """ |
| cmd = [ |
| self.abspath(self.fuchsia_dir, '.jiri_root/bin/fx'), 'list-devices' |
| ] |
| list_devices_out = self.host.create_process(cmd).check_output().strip() |
| if not list_devices_out: |
| self.host.error('Unable to find device.', 'Try "fx set-device".') |
| |
| # Try parsing the returned values |
| devices = list_devices_out.split('\n') |
| |
| if devices: |
| if not device_name: |
| if len(devices) != 1: |
| self.host.error( |
| 'Multiple devices found.', 'Try "fx set-device".') |
| |
| return devices[0].split(' ', 1)[0] |
| |
| # Find the matching device name |
| for device in devices: |
| addr, name = device.split(' ', 1) |
| if name == device_name: |
| return addr |
| |
| self.host.error('Unable to find device.', 'Try "fx set-device".') |
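| # Sketch of the `fx list-devices` output this parser expects: one device per |
| # line, with the address first and the name after the first space (the values |
| # below are made up): |
| # |
| #   fe80::abcd:1234%eth0 my-device |
| #   fe80::beef:5678%eth0 other-device |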
| |
| def symbolize(self, raw, json_output=None): |
| """Symbolizes backtraces in a log file using the current build. |
| |
| Parameters: |
| raw: Bytes representing unsymbolized lines. |
| json_output: If present, outputs trigger information to the specified file. |
| |
| Returns: |
| Bytes representing symbolized lines. |
| """ |
| cmd = [self.symbolizer_exec] |
| for build_id_dir in self.build_id_dirs: |
| cmd += ['--build-id-dir', build_id_dir] |
| if json_output: |
| cmd += ['--json-output', json_output] |
| process = self.host.create_process(cmd) |
| process.stdin = subprocess.PIPE |
| process.stdout = subprocess.PIPE |
| popened = process.popen() |
| out, _ = popened.communicate(raw) |
| if popened.returncode != 0: |
| out = '' |
| return re.sub(r'[0-9\[\]\.]*\[klog\] INFO: ', '', out) |
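| # Usage sketch (assumes a configured BuildEnv; `raw_log` is illustrative): |
| # |
| #   symbolized = build_env.symbolize(raw_log) |
| #   symbolized = build_env.symbolize(raw_log, json_output='triggers.json') |
| # |
| # which invokes, roughly: |
| #   symbolizer --build-id-dir <dir> ... [--json-output triggers.json] < raw_log |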
| |
| def testsharder(self, executable_url, out_dir, realm_label=None): |
| """Shards the available tests into _one_ test shard per environment for |
| use by testrunner. |
| |
| Parameters: |
| executable_url: The fuchsia-pkg URL of the test to generate coverage for. |
| out_dir: The output directory into which to write the sharded tests file. |
| realm_label: Optional realm label to pass through to testsharder. |
| |
| Returns: |
| The path to a generated file containing the test definitions that match |
| `executable_url`, as extracted from the testsharder output. |
| """ |
| sharder_out = os.path.join(out_dir, 'testsharder_out.json') |
| # Generate the test shards, forcing at most one shard per environment. |
| cmd = [os.path.join(self.build_dir, 'host_x64', 'testsharder')] \ |
| + ['-build-dir', self.build_dir] \ |
| + ['-max-shards-per-env', '1'] \ |
| + ['-output-file', sharder_out] |
| if realm_label: |
| cmd = cmd + ['-realm-label', realm_label] |
| self.host.create_process(cmd).check_call() |
| with self.host.open(sharder_out) as f: |
| sharded = json.load(f) |
| |
| # Depending on the set of tests included, we may get multiple shards |
| # (one per environment), but there should be exactly one AEMU* shard. |
| found_aemu = False |
| all_tests = [] |
| for shard in sharded: |
| if shard['name'].startswith('AEMU'): |
| if found_aemu: |
| self.host.error( |
| 'Expected a single AEMU shard, but got more than one.') |
| found_aemu = True |
| |
| test_out = os.path.join( |
| out_dir, 'shard_{}_tests.json'.format(shard['name'])) |
| all_tests = [ |
| t for t in shard['tests'] if t.get('name') == executable_url |
| ] |
| |
| if not found_aemu: |
| self.host.error('Unable to find any tests for AEMU shards.') |
| |
| if not all_tests: |
| self.host.error('Found no matching tests to run.') |
| |
| self.host.echo( |
| 'Found {} tests to generate coverage report for.'.format( |
| len(all_tests))) |
| |
| with self.host.open(test_out, 'w') as out_file: |
| json.dump(all_tests, out_file) |
| return test_out |
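| # Minimal sketch of the testsharder output consumed above (field values are |
| # illustrative; only the fields read by this method are shown): |
| # |
| #   [ |
| #     {"name": "AEMU", |
| #      "tests": [{"name": "fuchsia-pkg://fuchsia.com/example#meta/foo_test.cmx", ...}]}, |
| #     {"name": "Linux", "tests": [...]} |
| #   ] |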
| |
| def testrunner(self, shard_file, out_dir, device): |
| """Runs testrunner over a file of tests generated by testsharder, producing |
| the artifacts used for source-based code coverage (SBCC). |
| Additionally, dumps the logs of the associated PIDs to an output file for symbolizing. |
| |
| Parameters: |
| shard_file: A fully qualified path to the sharded tests file. |
| out_dir: The output directory into which to write the log dump. |
| device: Reference to the device to get logs from. |
| |
| Returns: |
| The path to the testrunner output. |
| The path to the log dump file. |
| """ |
| if not self.host.isfile(shard_file): |
| self.host.error( |
| 'Unable to find sharded test file at {}.'.format(shard_file)) |
| |
| runner_out_dir = os.path.join(out_dir, 'testrunner_out') |
| cmd = [os.path.join(self.build_dir, 'host_x64', 'testrunner')] \ |
| + ['-out-dir', runner_out_dir] \ |
| + ['-use-runtests', '-per-test-timeout', '600s'] \ |
| + [shard_file] |
| out = self.host.create_process(cmd).check_output() |
| # fxb/64774: Catch a timeout error and check that the device address is correct. |
| |
| # Look for the marker log in the testrunner output |
| pids = [] |
| for line in out.splitlines(): |
| if 'Fuzzer built as test' in line: |
| parts = line.split('][') |
| if len(parts) > 2: |
| pids.append(int(parts[1])) |
| if len(pids) < 1: |
| self.host.error('Unable to find a matching test fuzzer pid.') |
| raw = device.dump_log('--pid', str(pids[0])) |
| |
| self.host.mkdir(os.path.join(out_dir, 'log_dumps')) |
| # Strip the .json suffix from the end of the shard_file name. |
| shard_file_basename = os.path.basename(shard_file)[:-5] |
| log_dump_out = os.path.join( |
| out_dir, 'log_dumps', 'dump_{}'.format(shard_file_basename)) |
| with self.host.open(log_dump_out, 'w') as out_file: |
| out_file.write(raw) |
| |
| return runner_out_dir, log_dump_out |
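| # Sketch of the marker log line parsed above (timestamps and PIDs are made |
| # up); the component between the first and second '][' is taken as the PID: |
| # |
| #   [1234.567][54321][54322][foo] INFO: Fuzzer built as test example_fuzzer_test |
| # |
| # -> parts[1] == '54321', so device.dump_log('--pid', '54321') is used. |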
| |
| def covargs(self, runner_dir, symbolize_file, out_dir): |
| """Runs covargs to generate an SBCC report from a testrunner output and a symbolize file. |
| |
| Parameters: |
| runner_dir: The path to the testrunner output, which contains a summary.json file. |
| symbolize_file: The path to the symbolized profile JSON output. |
| out_dir: The output directory into which to write the coverage report. |
| |
| Returns: |
| The path to the generated coverage report. |
| """ |
| summary_json_file = os.path.join(runner_dir, 'summary.json') |
| if not self.host.isfile(summary_json_file): |
| self.host.error( |
| 'Unable to find summary.json file at {}.'.format( |
| summary_json_file)) |
| if not self.host.isfile(symbolize_file): |
| self.host.error( |
| 'Unable to find symbolize file at {}.'.format(symbolize_file)) |
| |
| coverage_path = os.path.join(out_dir, 'covargs_out') |
| cmd = [os.path.join(self.build_dir, 'host_x64', 'covargs')] \ |
| + ['-llvm-cov', self.llvm_cov] \ |
| + ['-llvm-profdata', self.llvm_profdata] \ |
| + ['-summary', summary_json_file] \ |
| + ['-symbolize-dump', symbolize_file] \ |
| + ['-output-dir', coverage_path] |
| for build_id_dir in self.build_id_dirs: |
| cmd += ['-build-id-dir', build_id_dir] |
| |
| self.host.create_process(cmd).check_call() |
| |
| return coverage_path |
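| # End-to-end coverage flow sketch using the methods above (the paths, the |
| # `device` object, and `symbolize_file` are assumed to come from the caller): |
| # |
| #   shard_file = build_env.testsharder(executable_url, out_dir) |
| #   runner_dir, log_dump = build_env.testrunner(shard_file, out_dir, device) |
| #   with host.open(log_dump) as f: |
| #       build_env.symbolize(f.read(), json_output=symbolize_file) |
| #   report_dir = build_env.covargs(runner_dir, symbolize_file, out_dir) |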