blob: 1487d30e9424a44daf4978c7f783131e64823eb1 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Check a command and its outputs for leaks of the output directory.
This script is both a library and standalone binary.
The build output directory is inferred as the relative path from
the project root to the working directory.
Reject any occurrences of the output dir:
1) in the command's tokens
2) in the output files' paths
3) in the output files' contents
Usage:
$0 [options...] [outputs...] -- command...
"""
import argparse
import io
import mmap
import os
import re
import subprocess
import sys
import fuchsia
import cl_utils
from pathlib import Path
from typing import Iterable, Sequence
_SCRIPT_BASENAME = Path(__file__).name
PROJECT_ROOT = fuchsia.project_root_dir()
# This is a known path where remote execution occurs.
# This should only be used for workarounds as a last resort.
_REMOTE_PROJECT_ROOT = Path('/b/f/w')
def error_msg(text: str, label: str = None):
label_text = f'[{label}]' if label else ''
print(f'[{_SCRIPT_BASENAME}]{label_text}: Error: {text}')
def _main_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
"Scan a command for output-dir leaks.",
argument_default=[],
)
parser.add_argument(
"--label",
type=str,
default=None,
help="Build system identifier for a particular action.",
)
parser.add_argument(
"--execute",
default=True,
action="store_true",
help="Execute the command and scan its outputs.",
)
parser.add_argument(
"--no-execute",
dest="execute",
action="store_false",
help="Run only pre-flight checks, and do not execute.",
)
# Positional args are the outputs of the command to scan.
parser.add_argument(
"outputs",
nargs="*",
type=Path,
help="Outputs to scan for leaks after execution.",
)
return parser
_MAIN_ARG_PARSER = _main_arg_parser()
def _whole_word_pattern(pattern: str) -> str:
boundary = r'\b'
left = '' if pattern.startswith(boundary) else boundary
right = '' if pattern.endswith(boundary) else boundary
return left + pattern + right
def _literal_dot_pattern(pattern: str) -> str:
return pattern.replace('.', r'\.')
class PathPattern(object):
"""Represents a path that is used for pattern searching."""
def __init__(self, path: Path):
"""Constructor.
Args:
path: path, which may be relative or absolute, but not != '.'.
"""
self._text = str(path)
if self._text == '.':
raise ValueError(
f'You should skip PathPattern checks when path is just "{self._text}"'
)
# match whole-word only
pattern = _whole_word_pattern(_literal_dot_pattern(self._text))
self._re_text = re.compile(pattern)
self._re_bin = re.compile(pattern.encode())
@property
def text(self) -> str:
return self._text
@property
def re_text(self) -> re.Pattern:
return self._re_text
@property
def re_bin(self) -> re.Pattern:
return self._re_bin
def __eq__(self, other) -> bool:
# equivalence of compiled re is implied
return self.text == other.text
# define for easy mocking
def _open_read_text(f) -> io.TextIOBase:
return open(f, 'rt')
def _open_read_binary(f) -> io.RawIOBase:
return open(f, 'rb', 0)
def file_contains_subpath(
f: Path,
subpath: PathPattern,
) -> bool:
"""Detect if a subpath string appears in a file.
Args:
f: file to scan
subpath: path pattern to match
Returns:
True if the file's contents contains the subpath.
"""
if not f.exists():
return False
# Ignore directory outputs for now.
if f.is_dir():
return False
# Don't know whether file is binary or text.
try: # Try text first.
with _open_read_text(f) as lines:
for line in lines: # read one line at a time
if subpath.re_text.search(line):
return True # stop at first match
except UnicodeDecodeError:
# Open as binary.
# In case file is large, use mmap() to avoid loading the entire
# contents into memory.
with _open_read_binary(f) as binary_file:
s = mmap.mmap(binary_file.fileno(), 0, access=mmap.ACCESS_READ)
# Note: This matches partial words, which risks flagging
# false positives.
if subpath.re_bin.search(s):
return True
return False
def paths_with_build_dir_leaks(paths: Iterable[Path],
pattern: re.Pattern) -> Iterable[Path]:
for path in paths:
if pattern.search(str(path)):
yield path
def _c_compiler_flag_expects_abspath(tok: str) -> bool:
"""The following gcc/clang flags remap paths, and expect an absolute
self-path as option arguments. Commands will still work remotely,
but won't cache across build environments."""
flag, sep, value = tok.partition('=')
if sep != '=':
return False
return flag in {
'-fdebug-prefix-map',
'-ffile-prefix-map',
'-fmacro-prefix-map',
'-fcoverage-prefix-map',
}
def tokens_with_build_dir_leaks(command: Iterable[str],
pattern: re.Pattern) -> Iterable[str]:
# TODO: lex --KEY=VALUE tokens into parts and match against the parts
for token in command:
if pattern.search(token):
if _c_compiler_flag_expects_abspath(token):
continue
yield token
def preflight_checks(
paths: Iterable[Path],
command: Iterable[str],
pattern: PathPattern,
label: str = None,
) -> int:
"""Checks output paths and command for build dir leaks."""
exit_code = 0
output_path_leaks = list(paths_with_build_dir_leaks(paths, pattern.re_text))
if output_path_leaks:
for f in output_path_leaks:
error_msg(
f"""Output path '{f}' contains '{pattern.text}'.
Adding rebase_path(..., root_build_dir) in GN may fix this to be relative.
If this command requires an absolute path, mark this action in GN with
'no_output_dir_leaks = false'.""",
label=label)
exit_code = 1
token_path_leaks = list(
tokens_with_build_dir_leaks(command, pattern.re_text))
for tok in token_path_leaks:
error_msg(
f"""Command token '{tok}' contains '{pattern.text}'.
Adding rebase_path(..., root_build_dir) in GN may fix this to be relative.
If this command requires an absolute path, mark this action in GN with
'no_output_dir_leaks = false'.""",
label=label)
exit_code = 1
return exit_code
def postflight_checks(
outputs: Iterable[Path],
subpath: PathPattern,
label: str = None,
):
exit_code = 0
# Command succeeded, scan its declared outputs.
for f in outputs:
if file_contains_subpath(f, subpath):
error_msg(
f"""Output file {f} contains '{subpath.text}'.
If this cannot be fixed in the tool, mark this action in GN with
'no_output_dir_leaks = false'.""",
label=label)
exit_code = 1
return exit_code
def scan_leaks(argv: Sequence[str], exec_root: Path, working_dir: Path) -> int:
"""Scan a command and its parameters for leaks of the build dir.
Leaks of the build-dir in commands and their output artifacts
are harmful to action caching.
TODO(http://fxbug.dev/92670): accept additional patterns to scan,
e.g. 'set_by_reclient/...'
Args:
argv: script args, '--', then command
exec_root: The path to the project root, inside which all build
command are invoked.
working_dir: The working dir where a command is to be run.
Returns:
0 for success, non-zero if any errors (including the command) occurred.
"""
try:
ddash = argv.index('--')
except ValueError:
error_msg("Required '--' is missing.")
_MAIN_ARG_PARSER.parse_args(['--help'])
return 1
script_args = argv[:ddash]
command = argv[ddash + 1:]
main_args = _MAIN_ARG_PARSER.parse_args(script_args)
label = main_args.label
build_subdir = cl_utils.relpath(working_dir, start=exec_root)
pre_scan_exit_code = 0
if str(build_subdir) != '.':
path_pattern = PathPattern(build_subdir)
pre_scan_exit_code = preflight_checks(
paths=main_args.outputs,
command=command,
pattern=path_pattern,
label=label)
if not main_args.execute:
return pre_scan_exit_code
# Invoke the original command.
command_exit_code = subprocess.call(
cl_utils.auto_env_prefix_command(command), cwd=working_dir)
if command_exit_code != 0:
return command_exit_code
if str(build_subdir) == '.': # nothing to check
return command_exit_code
# Command succeeded, scan its declared outputs.
post_scan_exit_code = postflight_checks(
main_args.outputs, path_pattern, label=label)
# return code reflects success of command and success of scans
scan_exit_code = pre_scan_exit_code or post_scan_exit_code
if scan_exit_code != 0:
error_msg(
"(See http://go/remotely-cacheable for more information.)",
label=label)
return scan_exit_code
def main(argv: Sequence[str]) -> int:
return scan_leaks(
argv,
exec_root=PROJECT_ROOT,
working_dir=Path(os.curdir).absolute(),
)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))