| # Copyright 2025 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Common utilities supporting Bazel query and build actions. |
| |
| This module will be imported directly from other scripts, including some in |
| this directory, as well as ones in //build/api/. Its purpose is to provide |
| utility functions to perform Bazel operations in the Fuchsia workspace. It should |
| not depend on any other non-standard module, and the logic to create that workspace |
| should stay in workspace_utils.bzl instead. |
| """ |
| |
| import dataclasses |
| import errno |
| import hashlib |
| import json |
| import os |
| import shlex |
| import subprocess |
| import sys |
| import time |
| import typing as T |
| from collections.abc import Sequence |
| from pathlib import Path |
| |
| # A type that describes either a path string of a Path instance. |
| FilePath: T.TypeAlias = str | os.PathLike[T.Any] |
| |
| |
| # LINT.IfChange(bazel_topdir_config_file) |
| _BAZEL_TOPDIR_CONFIG_FROM_FUCHSIA_DIR = "build/bazel/config/bazel_top_dir" |
| # LINT.ThenChange(//build/bazel/bazel_workspace.gni:bazel_topdir_config_file) |
| |
| |
| def get_host_platform() -> str: |
| """Return host platform name, following Fuchsia conventions.""" |
| if sys.platform == "linux": |
| return "linux" |
| elif sys.platform == "darwin": |
| return "mac" |
| else: |
| return os.uname().sysname |
| |
| |
| def get_host_arch() -> str: |
| """Return host CPU architecture, following Fuchsia conventions.""" |
| host_arch = os.uname().machine |
| if host_arch == "x86_64": |
| return "x64" |
| elif host_arch.startswith(("armv8", "aarch64")): |
| return "arm64" |
| else: |
| return host_arch |
| |
| |
| def get_host_tag() -> str: |
| """Return host tag, following Fuchsia conventions.""" |
| return "%s-%s" % (get_host_platform(), get_host_arch()) |
| |
| |
| def find_fuchsia_dir(from_path: T.Optional[FilePath] = None) -> Path: |
| """Find the Fuchsia checkout from a specific path. |
| |
| Args: |
| from_path: Optional starting path for search. Defaults to the current directory. |
| Returns: |
| Path to the Fuchsia checkout directory (absolute). |
| Raises: |
| ValueError if the path could not be found. |
| """ |
| start_path = Path(from_path).resolve() if from_path else Path.cwd() |
| cur_path = start_path |
| while True: |
| if (cur_path / ".jiri_manifest").exists(): |
| return cur_path |
| prev_path = cur_path |
| cur_path = cur_path.parent |
| if cur_path == prev_path: |
| raise ValueError( |
| f"Could not find Fuchsia checkout directory from: {start_path}" |
| ) |
| |
| |
| def find_fx_build_dir(fuchsia_dir: FilePath) -> T.Optional[Path]: |
| """Find the build directory set through 'fx set' or 'fx use'. |
| |
| Args: |
| fuchsia_dir: Path to Fuchsia checkout directory. |
| Returns: |
| Path to build directory if found, of None if none |
| is available (e.g. fresh checkout or infra build). |
| """ |
| fuchsia_dir_path = Path(fuchsia_dir) |
| fx_build_dir_file = fuchsia_dir_path / ".fx-build-dir" |
| if fx_build_dir_file.exists(): |
| build_dir_relative = fx_build_dir_file.read_text().strip() |
| if build_dir_relative: |
| build_dir = fuchsia_dir_path / build_dir_relative |
| if build_dir.exists(): |
| return build_dir |
| return None |
| |
| |
| def find_host_binary_path(program: str) -> T.Optional[Path]: |
| """Find the absolute path of a given program. Like the UNIX `which` command. |
| |
| Args: |
| program: Program name. |
| Returns: |
| program's absolute path, found by parsing the content of $PATH. |
| Or None if nothing is found. |
| """ |
| for path in os.environ.get("PATH", "").split(":"): |
| # According to Posix, an empty path component is equivalent to ยด.'. |
| if path == "" or path == ".": |
| path = os.getcwd() |
| candidate = os.path.realpath(os.path.join(path, program)) |
| if os.path.isfile(candidate) and os.access( |
| candidate, os.R_OK | os.X_OK |
| ): |
| return Path(candidate) |
| |
| return None |
| |
| |
| def get_bazel_relative_topdir(fuchsia_dir: FilePath) -> tuple[str, set[Path]]: |
| """Return Bazel topdir, relative to Ninja output dir. |
| |
| Args: |
| fuchsia_dir: Fuchsia source directory path. |
| Returns: |
| A (topdir, input_files) pair, where input_files is a set of Path |
| values corresponding to the file(s) read by this function. |
| """ |
| input_file = Path(fuchsia_dir) / _BAZEL_TOPDIR_CONFIG_FROM_FUCHSIA_DIR |
| assert input_file.exists(), f"Missing input file: {input_file}" |
| return input_file.read_text().strip(), {input_file} |
| |
| |
| def find_bazel_launcher_path( |
| fuchsia_dir: FilePath, build_dir: FilePath |
| ) -> T.Optional[Path]: |
| """Find the path of the Bazel launcher script. |
| |
| Args: |
| fuchsia_dir: Path to Fuchsia checkout directory. |
| build_dir: Path to Fuchsia build directory. |
| |
| Returns: |
| Path to bazel launcher script, or empty Path() value if the file |
| does not exist. |
| """ |
| bazel_topdir, _ = get_bazel_relative_topdir(fuchsia_dir) |
| result = Path(build_dir) / bazel_topdir / "bazel" |
| return result if result.exists() else None |
| |
| |
| def find_bazel_workspace_path( |
| fuchsia_dir: FilePath, build_dir: FilePath |
| ) -> T.Optional[Path]: |
| """Find the path of the Bazel workspace. |
| |
| Args: |
| fuchsia_dir: Path to Fuchsia checkout directory. |
| build_dir: Path to Fuchsia build directory. |
| |
| Returns: |
| Path to bazel workspace, or None if the directory does not exists. |
| """ |
| bazel_topdir, _ = get_bazel_relative_topdir(fuchsia_dir) |
| result = Path(build_dir) / bazel_topdir / "workspace" |
| return result if result.exists() else None |
| |
| |
| def force_symlink(dst_path: FilePath, target_path: FilePath) -> None: |
| """Create a symlink at |dst_path| that points to |target_path|. |
| |
| The generated symlink target will always be a relative path. |
| |
| Args: |
| dst_path: path to symlink file to write or update. |
| target_path: path to actual symlink target. |
| """ |
| dst_dir = os.path.dirname(dst_path) |
| target_path = os.path.relpath(target_path, dst_dir) |
| return force_raw_symlink(dst_path, target_path) |
| |
| |
| def force_raw_symlink(dst_path: FilePath, target_path: FilePath) -> None: |
| """Create a symlink at |dst_path| that points to |target_path|. |
| |
| The generated symlink target will always be the unmodified |target_path| |
| value, which can be absolute or relative, and/or point to a |
| non-existing file. |
| |
| Args: |
| dst_path: path to symlink file to write or update. |
| target_path: raw symlink target path. |
| """ |
| if os.path.lexists(dst_path): |
| os.remove(dst_path) # Remove previous symlink if it exists. |
| dst_dir = os.path.dirname(dst_path) |
| os.makedirs(dst_dir, exist_ok=True) |
| try: |
| os.symlink(target_path, dst_path) |
| except OSError as e: |
| if e.errno == errno.EEXIST: |
| os.remove(dst_path) |
| os.symlink(target_path, dst_path) |
| else: |
| raise |
| |
| |
| def force_symlink_to_ninja_artifact( |
| link_path: FilePath, target_path: FilePath |
| ) -> None: |
| """Create a symlink pointing to a Ninja artifact. |
| |
| If the target path doesn't exist, create it with a timestamp of 0. |
| This ensures that Bazel doesn't error when performing a query over the |
| repository, even if the target artifact hasn't been built yet. |
| |
| Bazel builds still require a Ninja invocation, which will overwrite the |
| target with an up-to-date artifact on the next invocation. |
| |
| Args: |
| link_path: Path to the symlink to create. |
| target_path: Path to the target artifact. |
| """ |
| force_symlink(link_path, target_path) |
| if not os.path.exists(target_path): |
| os.makedirs(os.path.dirname(target_path), exist_ok=True) |
| with open(target_path, "w") as f: |
| f.write("") |
| os.utime(target_path, (0, 0)) |
| |
| |
| _HEXADECIMAL_SET = set("0123456789ABCDEFabcdef") |
| |
| |
| def is_hexadecimal_string(s: str) -> bool: |
| """Return True if input string only contains hexadecimal characters.""" |
| return bool(s) and all([c in _HEXADECIMAL_SET for c in s]) |
| |
| |
| _BUILD_ID_PREFIX = ".build-id/" |
| |
| |
| def is_likely_build_id_path(path: str) -> bool: |
| """Return True if path is a .build-id/xx/yyyy* file name.""" |
| # Look for .build-id/XX/ where XX is an hexadecimal string. |
| pos = path.find(_BUILD_ID_PREFIX) |
| if pos < 0: |
| return False |
| |
| if pos > 0 and path[pos - 1] != "/": |
| return False |
| |
| path = path[pos + len(_BUILD_ID_PREFIX) :] |
| if len(path) < 3 or path[2] != "/": |
| return False |
| |
| return is_hexadecimal_string(path[0:2]) |
| |
| |
| def is_likely_content_hash_path(path: str) -> bool: |
| """Return True if file path is likely based on a content hash. |
| |
| Args: |
| path: File path |
| Returns: |
| True if the path is a likely content-based file path, False otherwise. |
| """ |
| # Look for .build-id/XX/ where XX is an hexadecimal string. |
| if is_likely_build_id_path(path): |
| return True |
| |
| # Look for a long hexadecimal sequence filename. |
| filename = os.path.basename(path) |
| return len(filename) >= 16 and is_hexadecimal_string(filename) |
| |
| |
| assert is_likely_content_hash_path("/src/.build-id/ae/23094.so") |
| assert not is_likely_content_hash_path("/src/.build-id/log.txt") |
| |
| |
| @dataclasses.dataclass(frozen=True) |
| class BazelBuildInvocation: |
| """Models a single `bazel build` invocation from the Fuchsia build.""" |
| |
| bazel_targets: list[str] # Bazel target labels, cannot be empty. |
| |
| # Command-line arguments for build configuration. Can be empty. |
| build_args: list[str] = dataclasses.field(default_factory=list) |
| |
| gn_label: str | None = None # Optional GN label for the corresponding bazel_action() target, if any. |
| |
| gn_targets_dir: str | None = None |
| # Optional path to @gn_targets repository content |
| |
| bazel_action_timings: dict[str, float] | None = None |
| # Optional, dictionary describing the duration of bazel_gn_target_action.py steps |
| # keys are step names, values are durations in seconds. |
| |
| def __post_init__(self) -> None: |
| if not self.bazel_targets: |
| raise ValueError(f"Empty bazel_targets list") |
| |
| def to_json(self) -> dict[str, T.Any]: |
| """Convert instance to JSON object. |
| |
| Returns: |
| JSON object as a dictionary. |
| """ |
| result: dict[str, T.Any] = { |
| "bazel_targets": self.bazel_targets, |
| "build_args": self.build_args, |
| } |
| if self.gn_label: |
| result["gn_label"] = self.gn_label |
| if self.gn_targets_dir: |
| result["gn_targets_dir"] = self.gn_targets_dir |
| if self.bazel_action_timings: |
| result["bazel_action_timings"] = self.bazel_action_timings |
| return result |
| |
| @staticmethod |
| def from_json(d: dict[str, T.Any]) -> "BazelBuildInvocation": |
| """Convert JSON object into new instance value. |
| |
| Args: |
| json: A JSON object as a Python dictionary. |
| Returns: |
| new instance value. |
| Raises: |
| ValueError if the input is malformed. |
| """ |
| if not isinstance(d, dict): |
| raise ValueError(f"Input JSON is not an object: {d}") |
| try: |
| return BazelBuildInvocation( |
| bazel_targets=d["bazel_targets"], |
| build_args=d["build_args"], |
| gn_label=d.get("gn_label"), |
| gn_targets_dir=d.get("gn_targets_dir"), |
| bazel_action_timings=d.get("bazel_action_timings"), |
| ) |
| except KeyError as e: |
| raise ValueError(f"Missing JSON object key {e}") |
| except TypeError as e: |
| raise ValueError(f"Missing JSON object key {e}") |
| |
| |
| class LastBazelBuildInvocations(object): |
| """Models the list of last Bazel build invocations from the Fuchsia build. |
| |
| An `fx build` or `fint build` command will populate the file |
| at $BUILD_DIR/{LastBazelBuildInvocations.FILENAME} with details about |
| each `bazel build` command that was invoked through `bazel_action()` |
| targets. This can later be used by //build/api/client to perform |
| queries to extract related information, such as debug symbols. |
| |
| Its content is a JSON array of objects describing BazelBuildInvocation |
| instances. |
| """ |
| |
| # Name of the file created / updated by `fx build` and `fint build`. |
| # LINT.IfChange(last_bazel_build_invocations_file) |
| FILENAME = "last_bazel_build_invocations.json" |
| # LINT.ThenChange(//tools/devshell/build:last_bazel_build_invocations_file,//tools/integration/fint/build.go:last_bazel_build_invocations_file,//tools/artifactory/cmd/up.go:last_bazel_build_invocations_file) |
| |
| def __init__(self, invocations: list[BazelBuildInvocation] = []): |
| """Constructor. Do not use directly, see new_from_xxxx() methods.""" |
| self._invocations = invocations |
| |
| @property |
| def invocations(self) -> list[BazelBuildInvocation]: |
| return self._invocations |
| |
| def append(self, invocation: BazelBuildInvocation) -> None: |
| """Append new BazelBuildInvocation instance.""" |
| self._invocations.append(invocation) |
| |
| @staticmethod |
| def get_build_file_path(build_dir: FilePath) -> Path: |
| """Retrieve the path of the corresponding file in the Ninja build directory.""" |
| return Path(build_dir) / LastBazelBuildInvocations.FILENAME |
| |
| @staticmethod |
| def new_from_build(build_dir: FilePath) -> "LastBazelBuildInvocations": |
| """Create new instance from the Ninja build directory. |
| |
| Args: |
| build_dir: Path to Ninja build directory. |
| Returns: |
| new instance value. |
| Raises: |
| ValueError if file is malformed |
| """ |
| ( |
| file_path, |
| last_invocations_json, |
| ) = LastBazelBuildInvocations._load_json_from_build_dir(build_dir) |
| |
| return LastBazelBuildInvocations.new_from_json(last_invocations_json) |
| |
| @staticmethod |
| def append_to_build_dir( |
| build_dir: FilePath, invocation: BazelBuildInvocation |
| ) -> None: |
| """Append a new BazelBuildInvocation instance to the Ninja build directory |
| |
| Args: |
| build_dir: Path to the Ninja build directory. |
| invocation: BazelBuildInvocation value to add. |
| """ |
| ( |
| file_path, |
| last_invocations_json, |
| ) = LastBazelBuildInvocations._load_json_from_build_dir(build_dir) |
| |
| last_invocations_json.append(invocation.to_json()) |
| |
| with open(file_path, "wt") as f: |
| json.dump(last_invocations_json, f) |
| |
| @staticmethod |
| def _load_json_from_build_dir( |
| build_dir: FilePath, |
| ) -> tuple[Path, list[dict[str, T.Any]]]: |
| """Load JSON data from the build directory.""" |
| file_path = LastBazelBuildInvocations.get_build_file_path(build_dir) |
| with open(file_path, "rt") as f: |
| last_invocations_json = json.load(f) |
| return file_path, last_invocations_json |
| |
| def write_to_build_dir(self, build_dir: FilePath) -> None: |
| """Write instance to the Ninja build directory. |
| |
| Args: |
| build_dir: Path to Ninja build directory. |
| """ |
| file_path = self.get_build_file_path(build_dir) |
| with open(file_path, "wt") as f: |
| json.dump([i.to_json() for i in self._invocations], f) |
| |
| @staticmethod |
| def new_from_json( |
| json: list[dict[str, T.Any]], |
| ) -> "LastBazelBuildInvocations": |
| """Create new instance from JSON array |
| |
| Args: |
| json: A JSON array as a Python list. |
| Returns: |
| new instance value. |
| Raises: |
| ValueError if input is malformed. |
| """ |
| if not isinstance(json, list): |
| raise ValueError( |
| f"Input is not a JSON array, got {type(json)} instead!" |
| ) |
| |
| invocations: list[BazelBuildInvocation] = [ |
| BazelBuildInvocation.from_json(item) for item in json |
| ] |
| return LastBazelBuildInvocations(invocations) |
| |
| def to_json(self) -> list[dict[str, T.Any]]: |
| """Convert instance to JSON array value.""" |
| return [invocation.to_json() for invocation in self._invocations] |
| |
| |
| # Callable object that takes log messages as input. |
| LogFunc = T.Callable[[str], None] |
| |
| |
| class TimeProfile(object): |
| """Track duration of generation/build steps' start and end times. |
| |
| Usage is: |
| 1) Create instance. |
| |
| 2) Call start() when starting a new step. Repeat as many times as needed. |
| |
| 3) Optionally call stop() when a step has completed. Useful if some |
| unrelated work needs to happen after the next start() call. |
| |
| 4) Call print() to print a table detailing the timings of all |
| steps over a given threshold. |
| """ |
| |
| def __init__( |
| self, |
| log: None | LogFunc = None, |
| now: None | T.Callable[[], float] = None, |
| ) -> None: |
| """Constructor. |
| |
| Args: |
| log: An optional callable that can be used to print step descriptions |
| when start() is called. |
| now: An optional callable that can be used to return the current time |
| in seconds. Default is to use time.time. Only used for tests. |
| """ |
| self._now = now if now else time.time |
| self._start_time = self._now() |
| self._steps: list[tuple[float, float, str]] = [] |
| self._log = log |
| |
| def start(self, name: str, description: str = "") -> None: |
| """Start a new regeneration step (and stop the current one if any) |
| |
| Args: |
| name: Step name (used in final print() output) |
| description: Optional step description. Will be sent to the log |
| if one was provided in the constructor. |
| """ |
| if description and self._log: |
| self._log(description) |
| cur_time = self._close_last_step() |
| self._steps.append((cur_time, 0, name)) |
| |
| def stop(self) -> None: |
| """Stop the current step (record its end time).""" |
| self._close_last_step() |
| |
| def _close_last_step(self) -> float: |
| cur_time = self._now() |
| if self._steps: |
| start_time, end_time, name = self._steps[-1] |
| if end_time == 0: |
| end_time = cur_time |
| self._steps[-1] = (start_time, end_time, name) |
| return cur_time |
| |
| def to_json_timings(self) -> dict[str, float]: |
| """Generate a JSON object detailing the durtaion of each step. |
| |
| Returns: |
| A dictionary mapping step names to durations in seconds. |
| Keys are ordered according to step execution. |
| """ |
| self._close_last_step() |
| return { |
| name: end_time - start_time |
| for start_time, end_time, name in self._steps |
| } |
| |
| def print(self, short_step_threshold: float = 0.0) -> None: |
| """Print timings results for all recorded steps. |
| |
| Args: |
| short_step_threshold: A threshold in seconds. Any step |
| that was faster than this will be omitted from the |
| output. |
| """ |
| self._close_last_step() |
| if short_step_threshold: |
| print( |
| "Timing results for regeneration steps slower than %.3f seconds:" |
| % short_step_threshold |
| ) |
| else: |
| print("Timing results for all regeneration steps:") |
| for step in self._steps: |
| start_time, end_time, name = step |
| duration = end_time - start_time |
| if duration < short_step_threshold: |
| continue |
| print("%5.3fs %s" % (end_time - start_time, name)) |
| |
| |
| def log_stderr(msg: str) -> None: |
| """A LogFunc implementation that prints messages to stderr.""" |
| print(msg, file=sys.stderr) |
| |
| |
| def cmd_args_to_string(cmd_args: Sequence[FilePath]) -> str: |
| """Convert a list of command arguments to a printable string. |
| |
| Args: |
| cmd_args: A list of either strings or file paths. |
| Returns: |
| A single string representing the shell-quoted command. |
| """ |
| return " ".join(shlex.quote(str(c)) for c in cmd_args) |
| |
| |
| class BazelPaths(object): |
| """Convenience class used to access important Bazel-related paths.""" |
| |
| WORKSPACE_FROM_TOP_DIR = "workspace" |
| OUTPUT_BASE_FROM_TOP_DIR = "output_base" |
| OUTPUT_USER_ROOT_FROM_TOP_DIR = "output_user_root" |
| LAUNCHER_FROM_TOP_DIR = "bazel" |
| |
| # The name of the root Bazel workspace as it appears in ${output_base}/execroot/. |
| # When BzlMod is enabled, this is always "_main", independent from the |
| # name of the root workspace / module name. |
| WORKSPACE_NAME_FROM_EXECROOT = "_main" |
| |
| @staticmethod |
| def new( |
| fuchsia_dir: T.Optional[Path] = None, build_dir: T.Optional[Path] = None |
| ) -> "BazelPaths": |
| """Create new instance. |
| |
| Args: |
| fuchsia_dir: Optional Path to the Fuchsia source directory. |
| build_dir: Optional Path to the Ninja build directory. |
| Returns: |
| A new BazelPaths instance. |
| Raises: |
| ValueError if the fuchsia or build directories cannot be |
| properly auto-detected or do not exist. |
| """ |
| if fuchsia_dir: |
| fuchsia_dir = fuchsia_dir.resolve() |
| else: |
| fuchsia_dir = find_fuchsia_dir(build_dir) |
| |
| if build_dir: |
| build_dir = build_dir.resolve() |
| else: |
| build_dir = find_fx_build_dir(fuchsia_dir) |
| if not build_dir: |
| raise ValueError( |
| f"Could not detect current build-directory from Fuchsia directory: {fuchsia_dir}" |
| ) |
| |
| return BazelPaths(fuchsia_dir, build_dir) |
| |
| def __init__(self, fuchsia_dir: Path, build_dir: Path) -> None: |
| """Construct new instance. Requires explicit fuchsia_dir and build_dir paths.""" |
| bazel_topdir, self._input_files = get_bazel_relative_topdir(fuchsia_dir) |
| self._build_dir = build_dir.resolve() |
| self._top_dir = self._build_dir / bazel_topdir |
| self._fuchsia_dir = fuchsia_dir.resolve() |
| |
| @property |
| def top_dir(self) -> Path: |
| """Return the Bazel top directory.""" |
| return self._top_dir |
| |
| @staticmethod |
| def write_topdir_config_for_test(fuchsia_dir: Path, content: str) -> Path: |
| """Write a Bazel topdir configuration file. Useful for tests. |
| |
| Args: |
| fuchsia_dir: Path to Fuchsia source tree. |
| content: Config file content, i.e. path of the Bazel topdir relative |
| to the Ninja build directory. |
| Returns: |
| Path of the config file that was written. |
| """ |
| config_file = fuchsia_dir / _BAZEL_TOPDIR_CONFIG_FROM_FUCHSIA_DIR |
| config_file.parent.mkdir(parents=True, exist_ok=True) |
| config_file.write_text(content) |
| return config_file |
| |
| @property |
| def fuchsia_dir(self) -> Path: |
| """Return Path to the Fuchsia source directory.""" |
| return self._fuchsia_dir |
| |
| @property |
| def ninja_build_dir(self) -> Path: |
| """Return Path to the Ninja build directory.""" |
| return self._build_dir |
| |
| @property |
| def execroot(self) -> Path: |
| """Return the Bazel execroot path.""" |
| return ( |
| self._top_dir |
| / self.OUTPUT_BASE_FROM_TOP_DIR |
| / "execroot" |
| / self.WORKSPACE_NAME_FROM_EXECROOT |
| ) |
| |
| @property |
| def output_base(self) -> Path: |
| """Return Path to the Bazel output base.""" |
| return self._top_dir / self.OUTPUT_BASE_FROM_TOP_DIR |
| |
| @property |
| def workspace(self) -> Path: |
| """Return Path to the Bazel workspace root directory.""" |
| return self._top_dir / self.WORKSPACE_FROM_TOP_DIR |
| |
| @property |
| def launcher(self) -> Path: |
| """Return Path to the Bazel launcher script.""" |
| return self._top_dir / self.LAUNCHER_FROM_TOP_DIR |
| |
| @property |
| def input_files(self) -> set[Path]: |
| """Return list of input file paths that were read in the constructor. |
| |
| Useful for Ninja depfiles or //build/regenerator.py. |
| """ |
| return self._input_files |
| |
| |
| @dataclasses.dataclass |
| class CommandResult: |
| """The result of invoking CommandRunner.run_command(). |
| |
| This is similar to subprocess.CompletedProcess[str], but it doesn't |
| hold file descriptors open, and can be trivially instantiated for tests |
| or mock CommandRunner instances. |
| """ |
| |
| returncode: int = 0 |
| stdout: str = "" |
| stderr: str = "" |
| args: list[str] = dataclasses.field(default=list) # type: ignore |
| |
| |
| class CommandRunner(object): |
| """Convenience class to run commands. |
| |
| A small wrapper around subprocess.run(), which allows logging invocations |
| commands and results as well as allow mock implementations for tests to |
| override the run_command_internal() method in derived classes. |
| """ |
| |
| # The following constants are convenience argument keyword dictionaries |
| # that can be expanded in run_command() calls to specify how to capture outputs. |
| # This also prevents the caller from depending on the subprocess module. |
| # Capture both stdout and stderr separately. |
| CAPTURE_KWARGS = { |
| "stdout": subprocess.PIPE, |
| "stderr": subprocess.PIPE, |
| } |
| |
| # Capture both stdout and stderr into ret.stdout, while ret.stderr will be empty. |
| CAPTURE_COMBINED_KWARGS = { |
| "stdout": subprocess.PIPE, |
| "stderr": subprocess.STDOUT, |
| } |
| |
| # Capture neither stdout nor stderr. Both ret.stdout and ret.stderr will be empty. |
| CAPTURE_NONE_KWARGS = { |
| "stdout": None, |
| "stderr": None, |
| } |
| |
| def __init__( |
| self, |
| log_cmd: None | LogFunc = None, |
| log_result: None | T.Callable[[CommandResult], None] = None, |
| ) -> None: |
| """Create instance. |
| |
| Args: |
| log_cmd: Optional LogFunc to record each invocation. |
| log_result: Optional callable used to log invocation results. |
| """ |
| self._log_cmd = log_cmd |
| self._log_result = log_result |
| |
| def run_command_internal( |
| self, cmd_args: Sequence[FilePath], **subprocess_run_kwargs: T.Any |
| ) -> CommandResult: |
| """Internal implementation for run_command(). |
| |
| Mock implementations can override this to avoid calling |
| external commands during unit-tests. |
| """ |
| # Enforce text mode by default. |
| subprocess_run_kwargs.setdefault("text", True) |
| subprocess_run_kwargs.setdefault("encoding", "utf-8") |
| ret = subprocess.run( |
| [str(c) for c in cmd_args], **subprocess_run_kwargs |
| ) |
| return CommandResult(ret.returncode, ret.stdout, ret.stderr, ret.args) |
| |
| def run_command( |
| self, |
| cmd_args: Sequence[FilePath], |
| **subprocess_run_kwargs: T.Any, |
| ) -> CommandResult: |
| """Run a command. |
| |
| Note that this sets text=True and encoding="UTF-8" by default for |
| convenience, though it can be overriden when calling the method. |
| |
| Args: |
| cmd_args: Command as a list of string or Path items. |
| **kwargs: Other arguments for subprocess.run(). |
| Returns: |
| A CommandResult value. |
| """ |
| if self._log_cmd: |
| self._log_cmd(cmd_args_to_string(cmd_args)) |
| |
| try: |
| ret = self.run_command_internal(cmd_args, **subprocess_run_kwargs) |
| except KeyboardInterrupt: |
| # As a special case, handle Ctrl-C as command failure, |
| # Using exit code 130 (i.e. 128 + SIGSGEV). This is consistent |
| # with what Ninja and other tools to, and avoids printing |
| # a huge Python stack trace when the user interrupted the |
| # build manually. |
| ret = CommandResult( |
| 130, "", "KeyboardInterrupt", [str(c) for c in cmd_args] |
| ) |
| |
| if self._log_result: |
| self._log_result(ret) |
| |
| return ret |
| |
| |
| # A MockCommandFilter is a callable that takes a list of command arguments |
| # as input, and returns a CommandResult as output. It may also raise |
| # ValueError if the input command arguments are invalid / unexpected. |
| # |
| # Note that the input is a list of strings, while CommandRunner.run_command() |
| # takes a list of FilePath values. |
| MockCommandFilter: T.TypeAlias = T.Callable[[list[str]], CommandResult] |
| |
| |
| class MockCommandRunner(CommandRunner): |
| """A mock CommandRunner that can be used in unit-tests. |
| |
| Usage is: |
| 1) Create instance. |
| |
| 2) Before running the test that uses the MockCommandRunner instance, |
| push one or more CommandResult with push_result(). At least one per |
| expected call to CommandRunner.run_command(). |
| |
| 3) After the test completes, look at self.commands and self.results to |
| see what commands and results were actually recorded. |
| """ |
| |
| def __init__(self) -> None: |
| """Create instance.""" |
| |
| self.commands: list[str] = [] |
| |
| def _log_command(cmd: str) -> None: |
| self.commands.append(cmd) |
| |
| self.results: list[CommandResult] = [] |
| |
| def _log_result(result: CommandResult) -> None: |
| self.results.append(result) |
| |
| super().__init__(log_cmd=_log_command, log_result=_log_result) |
| self._result_queue: list[CommandResult] = [] |
| self._command_filter: T.Optional[MockCommandFilter] = None |
| |
| def set_command_filter(self, command_filter: MockCommandFilter) -> None: |
| """Set a command filter. |
| |
| The command filter is called automatically by run_command_internal() |
| if there are no more results in the CommandResult FIFO (which were |
| previously added with push_result()). |
| |
| Args: |
| command_filter: A MockCommandFilter values. |
| """ |
| self._command_filter = command_filter |
| |
| @staticmethod |
| def new_command_filter_from_list( |
| cmd_list: list[tuple[str, str]], |
| ) -> MockCommandFilter: |
| """Create a command filter value that will return the given results in order. |
| |
| Args: |
| cmd_list: A list of (command, stdout) pairs, where |command| is the |
| space-separated command string, and |stdout| is the corresponding |
| stdout. |
| Returns: |
| A new MockCommandFilter value. |
| """ |
| command_index = 0 |
| |
| def _command_filter(cmd_args: list[str]) -> CommandResult: |
| nonlocal command_index |
| if command_index >= len(cmd_list): |
| raise ValueError("Too many command invocations") |
| actual_args = " ".join(arg for arg in cmd_args) |
| expected_cmd_args, stdout = cmd_list[command_index] |
| if actual_args != expected_cmd_args: |
| raise ValueError( |
| f"Unexpected command arguments (command index {command_index}:\n Actual: {actual_args}\n Expected: {expected_cmd_args}" |
| ) |
| command_index += 1 |
| return CommandResult(0, stdout, "") |
| |
| return _command_filter |
| |
| def push_result( |
| self, |
| returncode: int = 0, |
| stdout: str = "", |
| stderr: str = "", |
| ) -> None: |
| """Add one result value to the CommandResult FIFO. |
| |
| Args: |
| returncode: Optional process return code. default to 0. |
| stdout: Optional process stdout, as a string, default to empty. |
| stderr: Optional process stderr, as a string, default to empty. |
| args: Optional list of command arguments. |
| """ |
| self._result_queue.append( |
| CommandResult( |
| returncode=returncode, |
| stdout=stdout, |
| stderr=stderr, |
| ) |
| ) |
| |
| def _pop_result(self) -> CommandResult: |
| """Pop one result value from the CommandResult FIFO.""" |
| assert ( |
| self._result_queue |
| ), f"Result queue is empty, did you forget to call MockCommandRunner.push_result()" |
| result = self._result_queue[0] |
| self._result_queue = self._result_queue[1:] |
| return result |
| |
| def run_command_internal( |
| self, cmd_args: Sequence[FilePath], **subprocess_run_kwargs: T.Any |
| ) -> CommandResult: |
| """Simulate command invocation by popping one value from the FIFO. |
| |
| Args: |
| cmd_args: Command arguments, these are simply saved into |
| self.commands for later inspection. |
| **subprocess_run_kwargs: Extra subprocess.run() arguments (ignored). |
| Returns: |
| The CommandResult at the start of the FIFO. |
| Raises: |
| AssertionError if there are no results in the FIFO. |
| """ |
| result_args = [str(c) for c in cmd_args] |
| if self._command_filter: |
| result = self._command_filter(result_args) |
| else: |
| result = self._pop_result() |
| result.args = result_args |
| return result |
| |
| |
| class BazelLauncher(object): |
| """Convenience class to launch Bazel invocations. |
| |
| A small wrapper around subprocess.run(), which allows tests to override |
| the run_command() method in derived classes. |
| """ |
| |
| # Forward these for caller's convenience. |
| CAPTURE_KWARGS = CommandRunner.CAPTURE_KWARGS |
| CAPTURE_COMBINED_KWARGS = CommandRunner.CAPTURE_COMBINED_KWARGS |
| CAPTURE_NONE_KWARGS = CommandRunner.CAPTURE_NONE_KWARGS |
| |
| def __init__( |
| self, |
| launcher_script: FilePath, |
| bazel_prefix_args: Sequence[FilePath] = [], |
| runner: None | CommandRunner = None, |
| log_err: None | LogFunc = None, |
| ) -> None: |
| """Create BazelLauncher instance. |
| |
| Args: |
| launcher_script: Path to the bazel launcher script. |
| bazel_prefix_args: Optional list of command arguments that may appear before the |
| launcher script during invocation. |
| runner: An optional CommandRunner instance. If None, one will be created automatically. |
| log_err: Optional LogFunc function used to record error messages in run_xxx() method calls. |
| """ |
| self._launcher_script = launcher_script |
| self._prefix_args = bazel_prefix_args |
| self._runner = runner if runner else CommandRunner() |
| self._log_err = log_err |
| |
| @property |
| def script(self) -> Path: |
| """Return path to Bazel launcher script.""" |
| return Path(self._launcher_script) |
| |
| def run_bazel_command( |
| self, |
| bazel_args: T.Iterable[FilePath], |
| **subprocess_kwargs: T.Any, |
| ) -> CommandResult: |
| """Run a Bazel command. |
| |
| Note that this captures both standard and error outputs by default. |
| Set stdout and stderr explicitly to change this behavior. |
| |
| Args: |
| bazel_args: Bazel command-line arguments. |
| |
| **subprocess_kwargs: Extra arguments passed to subprocess.run(), see |
| CommandRunner.run_command() documentation. |
| Returns: |
| A CommandResult value. |
| """ |
| # Capture outputs by default. |
| subprocess_kwargs.setdefault("stdout", subprocess.PIPE) |
| subprocess_kwargs.setdefault("stderr", subprocess.PIPE) |
| |
| cmd_args: Sequence[FilePath] = [ |
| *self._prefix_args, |
| self.script, |
| *bazel_args, |
| ] |
| |
| ret = self._runner.run_command(cmd_args, **subprocess_kwargs) |
| if ret.returncode != 0 and self._log_err: |
| self._log_err( |
| "Error when invoking command: %s\n%s\n%s\n" |
| % (cmd_args_to_string(cmd_args), ret.stderr, ret.stdout) |
| ) |
| |
| return ret |
| |
| def run_query( |
| self, query_type: str, query_args: list[str], ignore_errors: bool |
| ) -> CommandResult: |
| """Run a Bazel query, potentially ignoring errors. |
| |
| Args: |
| query_type: Type of query ("query", "cquery" or "aquery"). |
| query_args: Other query arguments. |
| ignore_errors: Set to True to allow queries that ignore errors. |
| This adds "--keep_going" to the launched command. |
| Returns: |
| A CommandResult value. |
| """ |
| query_cmd: list[FilePath] = [] |
| query_cmd.append(query_type) |
| query_cmd.extend(query_args) |
| stderr = subprocess.PIPE |
| if ignore_errors: |
| query_cmd += ["--keep_going"] |
| stderr = subprocess.DEVNULL |
| |
| return self.run_bazel_command(query_cmd, stderr=stderr) |
| |
| |
| class MockBazelLauncher(BazelLauncher): |
| """A mock BazelLauncher instance that can be used in tests.""" |
| |
| def __init__(self) -> None: |
| """Create new instance.""" |
| self.command_runner = MockCommandRunner() |
| super().__init__(launcher_script="bazel", runner=self.command_runner) |
| |
| def push_expected_outputs(self, outputs: list[str]) -> None: |
| """Push a sequence of expected command outputs to the mock command runner.""" |
| for output in outputs: |
| self.command_runner.push_result(0, output, "") |
| |
| @staticmethod |
| def new_with_empty_outputs() -> "MockBazelLauncher": |
| """Create a new MockBazelLauncher instance with empty outputs by default.""" |
| launcher = MockBazelLauncher() |
| launcher.command_runner.set_command_filter( |
| lambda args: CommandResult(0, "", "") |
| ) |
| return launcher |
| |
| |
| class BazelQueryCache(object): |
| def __init__( |
| self, |
| cache_dir: FilePath, |
| ) -> None: |
| self._cache_dir = cache_dir |
| |
| def get_query_output( |
| self, |
| query_type: str, |
| query_args: list[str], |
| launcher: BazelLauncher, |
| log: None | LogFunc = None, |
| ) -> T.Optional[list[str]]: |
| """Run a bazel query and return its output as a series of lines. |
| |
| Args: |
| query_type: One of 'query', 'cquery' or 'aquery' |
| query_args: Extra query arguments. |
| launcher: A BazelLauncher instance. |
| log: Optional LogFunc value. If not provided, uses launcher.log. |
| |
| Returns: |
| On success, a list of output lines. On failure return None. |
| """ |
| # The result of queries does not change between incremental builds, |
| # as their outputs only depend on the shape of the Bazel graph, not |
| # the content of the artifacts. Due to this, it is possible to cache |
| # the outputs to save several seconds per bazel_action() invocation. |
| # |
| # The data is simply stored under $WORKSPACE/fuchsia_build_generated/bazel_query_cache/ |
| # which will be removed by each regenerator call, since it may change the Bazel |
| # graph dependencies, and thus the query results. |
| |
| cache_key, cache_key_args = self.compute_cache_key_and_args( |
| query_type, query_args |
| ) |
| cache_file = os.path.join(self._cache_dir, f"{cache_key}.json") |
| if os.path.exists(cache_file): |
| try: |
| with open(cache_file, "rt") as f: |
| cache_value = json.load(f) |
| assert cache_value["key_args"] == cache_key_args |
| if log: |
| log( |
| f"Found cached values for query {cache_key}: {cache_key_args}" |
| ) |
| return cache_value["output_lines"] |
| except Exception as e: |
| print( |
| f"WARNING: Error when reading cached values for query {cache_key}: {cache_key_args}:\n{e}", |
| file=sys.stderr, |
| ) |
| |
| if log: |
| query_start_time = time.time() |
| |
| ret = launcher.run_query(query_type, query_args, ignore_errors=False) |
| # Log any query errors (which may have a return code of 0), if logging enabled. |
| if log and ret.stderr: |
| log(f"Query error:\n{ret.stderr}") |
| if ret.returncode != 0: |
| return None |
| |
| result = ret.stdout.splitlines() |
| |
| # Write the result to the cache. |
| new_cache_value = { |
| "key_args": cache_key_args, |
| "output_lines": result, |
| } |
| if log: |
| log( |
| "Query took %.1f seconds for query %s" |
| % ( |
| time.time() |
| - query_start_time, # pyright: ignore[reportPossiblyUnboundVariable] |
| cache_key_args, |
| ) |
| ) |
| log(f"Writing query values to cache for query {cache_key}\n") |
| |
| os.makedirs(os.path.dirname(cache_file), exist_ok=True) |
| with open(cache_file, "wt") as f: |
| json.dump(new_cache_value, f) |
| |
| return result |
| |
| @staticmethod |
| def compute_cache_key_and_args( |
| query_type: str, query_args: list[str] |
| ) -> tuple[str, list[str]]: |
| """Compute the cache key and arguments. Exposed for tests. |
| |
| Args: |
| query_type: query type (e.g. "query", "cquery" or "aquery") |
| query_args: query arguments. |
| Returns: |
| a (cache_key, cache_key_args), where cache_key is a unique |
| hexadecimal cache key value, and cache_key_args encode the |
| input query in a human readable way. This will be stored in |
| the cache value for debugging. |
| """ |
| cache_key_args = [query_type] + query_args |
| cache_key_inputs = cache_key_args[:] |
| |
| # If "--starlark:file=FILE" or "--starlark:file FILE" is used, add the |
| # content of FILE to compute the cache key. This ensure stale cache |
| # entries are not reused when only this file changes during development. |
| starlark_file_option = "--starlark:file" |
| for n, arg in enumerate(query_args): |
| input_path = None |
| if arg == starlark_file_option and n + 1 < len(query_args): |
| # For --starlark:file FILE |
| input_path = query_args[n + 1] |
| elif arg.startswith(f"{starlark_file_option}="): |
| # For --starlark:file=FILE |
| input_path = arg[len(starlark_file_option) + 1 :] |
| if input_path: |
| with open(input_path, "rt") as f: |
| cache_key_inputs.append(f.read()) |
| |
| cache_key = hashlib.sha256( |
| repr(cache_key_inputs).encode("utf-8") |
| ).hexdigest() |
| |
| return (cache_key, cache_key_args) |