blob: d26170e7617b9aef65f1c47e84f5219beaf8c3c9 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions for working with remotetool.
This is a library and standalone executable script.
"""
import dataclasses
import difflib
import os
import shlex
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, Sequence, Tuple
import fuchsia
import cl_utils
_SCRIPT_BASENAME = Path(__file__).name
_SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = fuchsia.project_root_dir()
PROJECT_ROOT_REL = cl_utils.relpath(PROJECT_ROOT, start=os.curdir)
_REPROXY_CFG = _SCRIPT_DIR / "fuchsia-reproxy.cfg"
_HOST_REMOTETOOL = fuchsia.RECLIENT_BINDIR / "remotetool"
class ParseError(ValueError):
def __init__(self, msg: str):
super().__init__(msg)
def _must_partition_string(text: str, sep: str) -> Tuple[str, str]:
before, found_sep, after = text.partition(sep)
if found_sep != sep:
raise ParseError(
f'Expected but failed to find "{sep}" in text "{text}".'
)
return before, after
def _must_partition_sequence(
seq: Sequence[str], sep: str
) -> Tuple[Sequence[str], Sequence[str]]:
before, found_sep, after = cl_utils.partition_sequence(seq, sep)
if found_sep != sep:
raise ParseError(f'Expected but failed to find line == "{sep}".')
return before, after
def _parse_input_digest(line: str) -> Tuple[Path, str]:
path, right = _must_partition_string(line, ":")
ignored, right = _must_partition_string(right, ":")
digest = right.lstrip().rstrip("]")
return Path(path), digest
def _parse_output_digest(line: str) -> Tuple[Path, str]:
path, right = _must_partition_string(line, ",")
ignored, right = _must_partition_string(right, ":")
digest = right.lstrip()
return Path(path), digest
# TODO: move this to a library for abstract data operations
class DictionaryDiff(object):
"""Representation of a difference between dictionaries."""
def __init__(self, left: Dict[Any, Any], right: Dict[Any, Any]):
left_keys = set(left.keys())
right_keys = set(right.keys())
self._left_only = {k: left[k] for k in left_keys - right_keys}
self._right_only = {k: right[k] for k in right_keys - left_keys}
common_keys = left_keys.intersection(right_keys)
paired = [(k, (left[k], right[k])) for k in common_keys]
self._value_diffs = {k: (v[0], v[1]) for k, v in paired if v[0] != v[1]}
self._matches = {k: v[0] for k, v in paired if v[0] == v[1]}
@property
def left_only(self) -> Dict[Any, Any]:
return self._left_only
@property
def right_only(self) -> Dict[Any, Any]:
return self._right_only
@property
def value_diffs(self) -> Dict[Any, Tuple[Any, Any]]:
return self._value_diffs
@property
def matches(self) -> Dict[Any, Any]:
return self._matches
def report(self) -> Iterable[str]:
yield from (f"left only: {k}: {v}" for k, v in self.left_only.items())
yield from (f"right only: {k}: {v}" for k, v in self.right_only.items())
yield from (
f"different values: {k}: {v1} vs. {v2}"
for k, (v1, v2) in self.value_diffs.items()
)
@dataclasses.dataclass
class ActionDifferences(object):
command_unified_diffs: Sequence[str]
input_diffs: DictionaryDiff
platform_diffs: DictionaryDiff
@dataclasses.dataclass
class ShowActionResult(object):
"""Structured representation of `remotetool --operation show_action`."""
command: Sequence[str]
platform: Dict[str, str]
inputs: Dict[Path, str]
output_files: Dict[Path, str]
def __eq__(self, other) -> bool:
return (
self.command == other.command
and self.platform == other.platform
and self.inputs == other.inputs
and self.output_files == other.output_files
)
def diff(self, other: "ShowActionResult") -> ActionDifferences:
return ActionDifferences(
command_unified_diffs=list(
difflib.unified_diff(
self.command,
other.command,
fromfile="left action",
tofile="right action",
)
),
input_diffs=DictionaryDiff(self.inputs, other.inputs),
platform_diffs=DictionaryDiff(self.platform, other.platform),
)
def parse_show_action_output(lines: Iterable[str]) -> ShowActionResult:
"""Parse the results of `remotetool --operation show_action`.
Returns:
ShowActionResult, containing command, inputs, outputs.
Raises:
ParseError if there are any parsing errors.
"""
stripped_lines = [line.rstrip() for line in lines]
preamble, remainder = _must_partition_sequence(stripped_lines, "Command")
command_section, remainder = _must_partition_sequence(remainder, "Platform")
platform_section, remainder = _must_partition_sequence(remainder, "Inputs")
# Could see 'Action Result' or 'No action result in cache.'.
inputs_section, result_sep, remainder = cl_utils.partition_sequence(
remainder, "Action Result"
)
# command_section has:
# [0] '======='
# [1] 'Command Digest: ...'
# [2] <the command in one long line>
# [3] <blank line>
command = shlex.split(command_section[2].lstrip())
# platform_section has:
# [0] '======='
# [1:] key=value (repeated)
platform_parameters = {
k: v
for k, _, v in (
line.lstrip().partition("=") for line in platform_section[1:-1]
)
}
# inputs_section has:
# [0] '======='
# [1] '[Root directory digest: ...]
# [2:] <path>: [File digest: <digest>]
# [-2] <blank line>
# [-1] '------------------------------------------------------------------------'
# Input paths are relative to the exec_root.
# Inputs that come from other remote actions, might start with
# 'set_by_reclient/a' (relative to exec_root, canonicalized).
inputs = {
k: v
for k, v in (_parse_input_digest(line) for line in inputs_section[2:-2])
}
# result_section:
# (not used)
# This could also be absent with: 'No action result in cache.'
output_files = dict()
if result_sep:
result_section, remainder = _must_partition_sequence(
remainder, "Output Files"
)
output_files_section, remainder = _must_partition_sequence(
remainder, "Output Files From Directories"
)
# output_files_section has:
# [0] '======='
# [1:] <path>: [File digest: <digest>]
# [-1] <blank line>
# Output file paths are relative to the working_dir (not exec_root).
output_files = {
k: v
for k, v in (
_parse_output_digest(line)
for line in output_files_section[1:-1]
)
}
return ShowActionResult(
command=command,
inputs=inputs,
output_files=output_files,
platform=platform_parameters,
)
def read_config_file_lines(lines: Iterable[str]) -> Dict[str, str]:
"""Parser for reading RBE config files.
RBE config files are text files with lines of "VAR=VALUE"
(ignoring whole-line #-comments and blank lines).
Spec details can be found at:
https://github.com/bazelbuild/reclient/blob/main/internal/pkg/rbeflag/rbeflag.go
Args:
lines: lines of config from a file
Returns:
dictionary of key-value pairs read from the config file.
"""
result = {}
for line in lines:
stripped = line.strip()
if stripped and not stripped.startswith("#"):
key, sep, value = stripped.partition("=")
if sep == "=":
result[key] = value
return result
class RemoteTool(object):
def __init__(self, reproxy_cfg: Path):
self._reproxy_cfg = reproxy_cfg
def __eq__(self, other) -> bool:
return self.config == other.config
@property
def config(self) -> Dict[str, str]:
return self._reproxy_cfg
def run(
self,
args: Sequence[str],
show_command: bool = False,
**kwargs,
) -> cl_utils.SubprocessResult:
"""Runs `remotetool` using the given reproxy configuration.
Args:
args: remotetool command-line arguments
show_command: if True, print the full remotetool command before executing.
**kwargs: additional arguments forwarded to cl_utils.subprocess_call.
Returns:
SubprocessResult with exit code and captured stdout.
"""
service = self.config["service"]
instance = self.config["instance"]
auto_args = [
f"--service={service}",
f"--instance={instance}",
]
# Infra builds use GCE credentials instead of ADC.
gce_creds = os.environ.get("RBE_use_gce_credentials", None)
if gce_creds:
auto_args.append(f"--use_gce_credentials={gce_creds}")
else:
# Developers will use application-default-credentials for now.
# This could change to a different credential helper in the future.
auto_args.append("--use_application_default_credentials=true")
command = (
[
str(PROJECT_ROOT_REL / _HOST_REMOTETOOL),
]
+ auto_args
+ args
)
command_str = cl_utils.command_quoted_str(command)
if show_command:
print(command_str)
# TODO: cache 'show_action' results from this execution path
# This requires parsing the args before passing them straight through.
result = cl_utils.subprocess_call(command, **kwargs)
if result.returncode != 0:
for line in result.stderr:
print(line)
raise subprocess.CalledProcessError(result.returncode, command)
return result
def _show_action(self, digest: str, **kwargs) -> cl_utils.SubprocessResult:
args = ["--operation", "show_action", "--digest", digest]
final_kwargs = kwargs
final_kwargs["quiet"] = True
return self.run(args, **final_kwargs)
def show_action(self, digest: str, **kwargs) -> ShowActionResult:
"""Reads parameters of a remote action using `remotetool`.
Results of querying 'show_action' are cached.
Args:
digest: the hash/size of the action to lookup.
**kwargs: arguments forwarded to cl_utils.subprocess_call.
Returns:
ShowActionResult describing command, inputs, outputs.
"""
hash, sep, size = digest.partition(
"/"
) # actions all have the same "size" 147
tempdir = Path(tempfile.gettempdir())
cache_dir = tempdir / _SCRIPT_BASENAME / "show_action_cache"
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / (hash + ".stdout")
if cache_file.exists():
show_action_output = cache_file.read_text().splitlines()
else:
result = self._show_action(digest, **kwargs)
show_action_output = result.stdout
cache_file.write_text(
"".join(line + "\n" for line in show_action_output)
)
return parse_show_action_output(show_action_output)
def download_blob(
self, path: Path, digest: str, **kwargs
) -> cl_utils.SubprocessResult:
"""Downloads a remote artifact.
Args:
path: location of output to download.
digest: the hash/size of the blob to retrieve.
**kwargs: arguments forwarded to cl_utils.subprocess_call.
Returns:
return code of remotetool
"""
# TODO: use filelock.FileLock to guard against concurrent conflicts
args = [
"--operation",
"download_blob",
"--digest",
digest,
"--path",
str(path),
]
final_kwargs = kwargs
final_kwargs["quiet"] = True
return self.run(args, **final_kwargs)
def download_dir(
self, path: Path, digest: str, **kwargs
) -> cl_utils.SubprocessResult:
"""Downloads a remote directory of artifacts.
Args:
path: location of output to download.
digest: the hash/size of the dir to retrieve.
**kwargs: arguments forwarded to cl_utils.subprocess_call.
Returns:
return code of remotetool
"""
# TODO: use filelock.FileLock to guard against concurrent conflicts
args = [
"--operation",
"download_dir",
"--digest",
digest,
"--path",
str(path),
]
final_kwargs = kwargs
final_kwargs["quiet"] = True
return self.run(args, **final_kwargs)
def configure_remotetool(cfg: Path) -> RemoteTool:
return RemoteTool(read_config_file_lines(cfg.read_text().splitlines()))
def main(argv: Sequence[str]) -> int:
rtool = configure_remotetool(_REPROXY_CFG)
result = rtool.run(argv, show_command=True, quiet=False)
return result.returncode
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))