blob: 95431ce1f98eaf62af3c1ea26f3387e89381e026 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Check a command and its outputs for leaks of the output directory.

This script is both a library and a standalone binary.

The build output directory is inferred as the relative path from
the project root to the working directory.

Reject any occurrences of the output dir:
  1) in the command's tokens
  2) in the output files' paths
  3) in the output files' contents

Usage:
  $0 [options...] [outputs...] -- command...
"""
import argparse
import io
import mmap
import os
import re
import subprocess
import sys
import fuchsia
import cl_utils
from pathlib import Path
from typing import Any, Iterable, Sequence
# File name of this script; error_msg() prints it as the message prefix.
_SCRIPT_BASENAME = Path(__file__).name
# Project root as reported by the `fuchsia` helper module.
# NOTE(review): presumably an absolute path to the source checkout -- confirm
# against fuchsia.project_root_dir().
PROJECT_ROOT = fuchsia.project_root_dir()
# This is a known path where remote execution occurs.
# This should only be used for workarounds as a last resort.
_REMOTE_PROJECT_ROOT = Path("/b/f/w")
def error_msg(text: str, label: str | None = None) -> None:
    """Print an error message prefixed with this script's name.

    Args:
      text: the message to report.
      label: optional identifier of the action being checked; when given
        (and non-empty) it is printed in its own bracketed tag after the
        script name.
    """
    if label:
        label_text = f"[{label}]"
    else:
        label_text = ""
    print(f"[{_SCRIPT_BASENAME}]{label_text}: Error: {text}")
def _main_arg_parser() -> argparse.ArgumentParser:
# Builds the argparse parser for the script's own arguments (the ones that
# precede '--'). scan_leaks() reads .label, .execute and .outputs from the
# namespace this parser produces.
# NOTE(review): this definition looks truncated in this copy of the file.
# The three help= lines below are orphaned keyword arguments with no
# enclosing parser.add_argument(...) call, and the ArgumentParser( call is
# never closed. The option names, actions and defaults cannot be determined
# from here -- restore the missing lines from version control.
parser = argparse.ArgumentParser(
"Scan a command for output-dir leaks.",
help="Build system identifier for a particular action.",
help="Execute the command and scan its outputs.",
# Positional args are the outputs of the command to scan.
help="Outputs to scan for leaks after execution.",
return parser
# Module-level parser instance, built once at import time; scan_leaks() uses
# it to parse the arguments that precede '--'.
_MAIN_ARG_PARSER = _main_arg_parser()
def _whole_word_pattern(pattern: str) -> str:
    r"""Anchor a regex on both sides with the word-boundary token `\b`.

    An anchor is only added on a side where the pattern text does not
    already begin (or end) with `\b`, so applying this twice is harmless.

    Args:
      pattern: regular expression source text.

    Returns:
      The pattern text with `\b` at its start and end.
    """
    word_edge = r"\b"
    pieces = []
    if not pattern.startswith(word_edge):
        pieces.append(word_edge)
    pieces.append(pattern)
    if not pattern.endswith(word_edge):
        pieces.append(word_edge)
    return "".join(pieces)
def _literal_dot_pattern(pattern: str) -> str:
    r"""Escape every '.' so a regex matches it as a literal dot.

    Only dots are escaped; all other characters are passed through as-is.

    Args:
      pattern: text in which '.' should not act as the regex wildcard.

    Returns:
      The text with each '.' replaced by `\.`.
    """
    escaped_dot = r"\."
    return escaped_dot.join(pattern.split("."))
class PathPattern:
    """Represents a path that is used for pattern searching.

    The path text is compiled once into a whole-word regular expression, in
    both str and bytes form, so the same pattern can be applied to strings
    (command tokens, path names, text file lines) and to binary data.
    """

    def __init__(self, path: Path):
        """Compile the search patterns for a path.

        Args:
          path: path, which may be relative or absolute, but must not be '.'.

        Raises:
          ValueError: if path is '.'; callers are expected to skip the
            checks entirely in that case.
        """
        self._text = str(path)
        if self._text == ".":
            raise ValueError(
                f'You should skip PathPattern checks when path is just "{self._text}"'
            )
        # match whole-word only
        pattern = _whole_word_pattern(_literal_dot_pattern(self._text))
        self._re_text = re.compile(pattern)
        self._re_bin = re.compile(pattern.encode())

    # The accessors below are properties because the rest of the file uses
    # them as plain attributes (e.g. `pattern.re_text` is passed where a
    # compiled regex is expected, and `pattern.text` is formatted into
    # messages).

    @property
    def text(self) -> str:
        """The path as plain text, exactly as str(path) produced it."""
        return self._text

    @property
    def re_text(self) -> re.Pattern[str]:
        """Compiled whole-word pattern for searching str data."""
        return self._re_text

    @property
    def re_bin(self) -> re.Pattern[bytes]:
        """Compiled whole-word pattern for searching bytes-like data."""
        return self._re_bin

    def __eq__(self, other: object) -> bool:
        """Two patterns are equal when they were built from the same text."""
        if not isinstance(other, PathPattern):
            return False
        # equivalence of compiled re is implied
        return self.text == other.text

    def __hash__(self) -> int:
        # Keep hashing consistent with __eq__, which compares only the text.
        return hash(self._text)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._text!r})"
# define for easy mocking
def _open_read_text(f: Path) -> io.TextIOBase:
    """Open a file for reading in text mode.

    Kept as a separate function so tests can substitute it.
    """
    return open(f, mode="rt")
def _open_read_binary(f: Path) -> io.RawIOBase:
    """Open a file for unbuffered reading in binary mode.

    Kept as a separate function so tests can substitute it.
    """
    return open(f, mode="rb", buffering=0)
def file_contains_subpath(
    f: Path,
    subpath: PathPattern,
) -> bool:
    """Detect if a subpath string appears in a file.

    Args:
      f: file to scan
      subpath: path pattern to match

    Returns:
      True if the file's contents contains the subpath.
      False if there is no match, or if the file does not exist or is a
      directory.
    """
    if not f.exists():
        return False

    # Ignore directory outputs for now.
    if f.is_dir():
        return False

    # Don't know whether file is binary or text.
    try:  # Try text first.
        with _open_read_text(f) as lines:
            for line in lines:  # read one line at a time
                if subpath.re_text.search(line):
                    return True  # stop at first match
    except UnicodeDecodeError:
        # Open as binary.
        # In case file is large, use mmap() to avoid loading the entire
        # contents into memory.
        with _open_read_binary(f) as binary_file:
            # Use the mapping as a context manager so it is released even
            # when returning early on a match.
            with mmap.mmap(
                binary_file.fileno(), 0, access=mmap.ACCESS_READ
            ) as s:
                # re_bin is the bytes form of the same whole-word pattern
                # used for text above.
                if subpath.re_bin.search(s):
                    return True

    return False
def paths_with_build_dir_leaks(
    paths: Iterable[Path], pattern: re.Pattern[str]
) -> Iterable[Path]:
    """Yield the paths whose text matches the build-dir pattern.

    Args:
      paths: paths to examine.
      pattern: compiled regex to search for in each path's string form.

    Yields:
      Each path, in input order, for which the pattern matches somewhere
      in str(path).
    """
    for path in paths:
        if pattern.search(str(path)):
            yield path
def _c_compiler_flag_expects_abspath(tok: str) -> bool:
"""The following gcc/clang flags remap paths, and expect an absolute
self-path as option arguments. Commands will still work remotely,
but won't cache across build environments."""
# Split a 'FLAG=VALUE' token at the first '='; only the FLAG part is tested.
flag, sep, value = tok.partition("=")
# A token without '=' cannot be one of the FLAG=VALUE options.
if sep != "=":
return False
# NOTE(review): the set literal below is unterminated and its members are
# missing from this copy of the file. Presumably it lists the names of the
# path-remapping flags the docstring refers to -- restore it from version
# control.
return flag in {
def tokens_with_build_dir_leaks(
    command: Iterable[str], pattern: re.Pattern[str]
) -> Iterable[str]:
    """Yield the command tokens that mention the build dir.

    Tokens for compiler flags that legitimately take an absolute path
    (see _c_compiler_flag_expects_abspath) are exempt and never reported.

    Args:
      command: the command's tokens.
      pattern: compiled regex to search for in each token.

    Yields:
      Each non-exempt token, in input order, in which the pattern matches.
    """
    # TODO: lex --KEY=VALUE tokens into parts and match against the parts
    for token in command:
        if _c_compiler_flag_expects_abspath(token):
            continue  # allowed to reference the build dir
        if pattern.search(token):
            yield token
def preflight_checks(
    paths: Iterable[Path],
    command: Iterable[str],
    pattern: PathPattern,
    label: str | None = None,
) -> int:
    """Checks output paths and command for build dir leaks.

    Every offending output path and command token is reported through
    error_msg(); nothing is executed and no file is read.

    Args:
      paths: declared output paths of the command.
      command: the command's tokens.
      pattern: the build-dir pattern to look for.
      label: optional action identifier included in error messages.

    Returns:
      0 if no leaks were found, 1 otherwise.
    """
    exit_code = 0
    output_path_leaks = list(paths_with_build_dir_leaks(paths, pattern.re_text))
    if output_path_leaks:
        for f in output_path_leaks:
            error_msg(
                f"""Output path '{f}' contains '{pattern.text}'.
Adding rebase_path(..., root_build_dir) in GN may fix this to be relative.
If this command requires an absolute path, mark this action in GN with
'no_output_dir_leaks = false'.""",
                label=label,
            )
        exit_code = 1

    token_path_leaks = list(
        tokens_with_build_dir_leaks(command, pattern.re_text)
    )
    if token_path_leaks:
        for tok in token_path_leaks:
            error_msg(
                f"""Command token '{tok}' contains '{pattern.text}'.
Adding rebase_path(..., root_build_dir) in GN may fix this to be relative.
If this command requires an absolute path, mark this action in GN with
'no_output_dir_leaks = false'.""",
                label=label,
            )
        exit_code = 1

    return exit_code
def postflight_checks(
    outputs: Iterable[Path],
    subpath: PathPattern,
    label: str | None = None,
) -> int:
    """Scan the contents of output files for build dir leaks.

    Each offending file is reported through error_msg().

    Args:
      outputs: output files to scan.
      subpath: the build-dir pattern to look for.
      label: optional action identifier included in error messages.

    Returns:
      0 if no output file contains the pattern, 1 otherwise.
    """
    exit_code = 0
    # Command succeeded, scan its declared outputs.
    for f in outputs:
        if file_contains_subpath(f, subpath):
            error_msg(
                f"""Output file {f} contains '{subpath.text}'.
If this cannot be fixed in the tool, mark this action in GN with
'no_output_dir_leaks = false'.""",
                label=label,
            )
            exit_code = 1
    return exit_code
def scan_leaks(argv: Sequence[str], exec_root: Path, working_dir: Path) -> int:
    """Scan a command and its parameters for leaks of the build dir.

    Leaks of the build-dir in commands and their output artifacts
    are harmful to action caching.

    TODO: accept additional patterns to scan,
    e.g. 'set_by_reclient/...'

    Args:
      argv: script args, '--', then command
      exec_root: The path to the project root, inside which all build
        command are invoked.
      working_dir: The working dir where a command is to be run.

    Returns:
      0 for success, non-zero if any errors (including the command) occurred.
    """
    try:
        ddash = argv.index("--")
    except ValueError:
        error_msg("Required '--' is missing.")
        return 1

    script_args = argv[:ddash]
    command = argv[ddash + 1 :]
    main_args = _MAIN_ARG_PARSER.parse_args(script_args)
    label = main_args.label

    # The build dir is the working dir expressed relative to the project
    # root. When the two coincide there is no distinctive subpath to search
    # for, so all scans are skipped (PathPattern rejects ".").
    build_subdir = cl_utils.relpath(working_dir, start=exec_root)
    pre_scan_exit_code = 0
    if str(build_subdir) != ".":
        path_pattern = PathPattern(build_subdir)
        pre_scan_exit_code = preflight_checks(
            paths=main_args.outputs,
            command=command,
            pattern=path_pattern,
            label=label,
        )

    if not main_args.execute:
        return pre_scan_exit_code

    # Invoke the original command.
    command_exit_code = subprocess.call(
        cl_utils.auto_env_prefix_command(list(command)), cwd=working_dir
    )
    if command_exit_code != 0:
        return command_exit_code

    if str(build_subdir) == ".":  # nothing to check
        return command_exit_code

    # Command succeeded, scan its declared outputs.
    post_scan_exit_code = postflight_checks(
        main_args.outputs, path_pattern, label=label
    )

    # return code reflects success of command and success of scans
    scan_exit_code = pre_scan_exit_code or post_scan_exit_code
    if scan_exit_code != 0:
        error_msg(
            "(See http://go/remotely-cacheable for more information.)",
            label=label,
        )
    return scan_exit_code
def main(argv: Sequence[str]) -> int:
    """Run the leak scan for a command line.

    Args:
      argv: script args, '--', then command (without the program name).

    Returns:
      The exit code from scan_leaks().
    """
    # NOTE(review): the argument list was missing from this copy of the file.
    # It is reconstructed from scan_leaks()' signature and the module
    # docstring (build dir = working directory relative to the project
    # root) -- confirm against version control.
    return scan_leaks(
        argv,
        exec_root=PROJECT_ROOT,
        working_dir=Path.cwd(),
    )
if __name__ == "__main__":