#!/usr/bin/env fuchsia-vendored-python
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions for working with reproxy logs.
This requires python protobufs for reproxy LogDump.
"""
import argparse
import hashlib
import multiprocessing
import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Callable, Dict, Iterable, Sequence, Tuple

from api.log import log_pb2

import fuchsia

_SCRIPT_BASENAME = Path(__file__).name


def msg(text: str):
    print(f'[{_SCRIPT_BASENAME}] {text}')


class ReproxyLog(object):
    """Contains a LogDump proto and indexes information internally.

    The output_file paths used are relative to the working dir.
    """

    def __init__(self, log_dump: log_pb2.LogDump):
        self._proto: log_pb2.LogDump = log_dump

        # Index in a few ways for easy lookup.
        self._records_by_output_file = self._index_records_by_output_file()
        self._records_by_action_digest = self._index_records_by_action_digest()

    def _index_records_by_output_file(self) -> Dict[Path, log_pb2.LogRecord]:
        """Map files to the records of the actions that created them."""
        records: Dict[Path, log_pb2.LogRecord] = dict()
        for record in self.proto.records:
            for output_file in record.command.output.output_files:
                records[Path(output_file)] = record
        return records

    def _index_records_by_action_digest(self) -> Dict[str, log_pb2.LogRecord]:
        """Map action records by their action digest (hash/size)."""
        return {
            record.remote_metadata.action_digest: record
            for record in self.proto.records
            # Not all actions have remote_metadata, due to local/racing
            # execution. If there happen to be duplicates (due to retries),
            # just keep the last one.
            if record.HasField('remote_metadata') and
            record.remote_metadata.action_digest
        }

    @property
    def proto(self) -> log_pb2.LogDump:
        return self._proto

    @property
    def records_by_output_file(self) -> Dict[Path, log_pb2.LogRecord]:
        return self._records_by_output_file

    @property
    def records_by_action_digest(self) -> Dict[str, log_pb2.LogRecord]:
        return self._records_by_action_digest

    def diff_by_outputs(
        self, other: 'ReproxyLog',
        eq_comparator: Callable[[log_pb2.LogRecord, log_pb2.LogRecord], bool]
    ) -> Iterable[Tuple[Path, log_pb2.LogRecord, log_pb2.LogRecord]]:
        """Compare actions output-by-output.

        Args:
          other: the other log to compare.
          eq_comparator: compare function for LogRecord.
            When this evaluates to False, report a difference.

        Yields:
          (path, left record, right record) tuples where the records differ.
        """
        # Find output files common to both logs.
        # Non-common files are ignored.
        left_keys = set(self.records_by_output_file.keys())
        right_keys = set(other.records_by_output_file.keys())
        common_keys = left_keys & right_keys
        visited_outputs = set()
        for k in common_keys:
            if k in visited_outputs:
                continue
            left_record = self.records_by_output_file[k]
            right_record = other.records_by_output_file[k]
            if not eq_comparator(left_record, right_record):
                yield (k, left_record, right_record)
            # Mark all shared outputs of this action pair as visited,
            # so each differing action is reported only once.
            left_outputs = set(
                Path(p) for p in left_record.command.output.output_files)
            right_outputs = set(
                Path(p) for p in right_record.command.output.output_files)
            common_outputs = left_outputs & right_outputs
            for p in common_outputs:
                visited_outputs.add(p)


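# A minimal usage sketch of the diffing API above, assuming two hypothetical
# log files; not called by this tool. parse_logs() and _action_digest_eq()
# are defined later in this file and are resolved at call time.
def _example_diff_by_outputs() -> None:
    left, right = parse_logs(
        [Path('out/left.rrpl'), Path('out/right.rrpl')])  # hypothetical paths
    for path, left_record, right_record in left.diff_by_outputs(
            right, _action_digest_eq):
        # Each yielded tuple names an output whose action records differ.
        print(path)

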
def setup_logdir_for_logdump(path: Path, verbose: bool = False) -> Path:
    """Automatically set up a log dir for logdump, if needed.

    The `logdump` tool expects a log directory, but sometimes we are only
    given a log file. When we are given a log file, copy it to a directory.
    The created directory is named after the hash of the log file's
    contents, so that it can be re-used for multiple queries.

    Args:
      path: path to either a .rrpl/.rpl log, or a reproxy log dir
        containing a .rrpl/.rpl log.

    Returns:
      The original path, or the path to a newly created cached directory
      containing a copy of the log.
    """
    if path.is_dir():
        # This is a directory containing a .rrpl or .rpl file.
        return path

    # Otherwise, this is assumed to be a .rrpl or .rpl file.
    # Copy it to a cached location.
    tmpdir = Path(os.environ.get('TMPDIR', '/tmp'))
    stdin_path = Path('-')
    if path == stdin_path:
        log_contents = sys.stdin.read().encode()
    else:
        log_contents = path.read_bytes()
    readable_hash = hashlib.sha256(log_contents).hexdigest()
    cached_log_dir = tmpdir / 'reproxy_logs_py' / 'cache' / readable_hash
    if verbose:
        msg(f'Copying log to {cached_log_dir} for logdump processing.')
    cached_log_dir.mkdir(parents=True, exist_ok=True)
    if path == stdin_path:
        (cached_log_dir / 'reproxy_from_stdin.rrpl').write_bytes(log_contents)
    else:
        shutil.copy2(path, cached_log_dir)
    return cached_log_dir


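# A small sketch of the caching behavior above, assuming a hypothetical
# 'out/reproxy.rrpl' log file: the returned directory is
# $TMPDIR/reproxy_logs_py/cache/<sha256-of-contents>, holding a copy of the log.
def _example_logdir_setup() -> Path:
    return setup_logdir_for_logdump(Path('out/reproxy.rrpl'), verbose=True)

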
def _log_dump_from_pb(log_pb_file: Path) -> log_pb2.LogDump:
    log_dump = log_pb2.LogDump()
    with open(log_pb_file, mode='rb') as logf:
        log_dump.ParseFromString(logf.read())
    return log_dump


def convert_reproxy_actions_log(
        reproxy_logdir: Path,
        reclient_bindir: Path,
        verbose: bool = False) -> log_pb2.LogDump:
    log_pb_file = reproxy_logdir / "reproxy_log.pb"
    if not log_pb_file.exists():
        # Convert reproxy text logs to binary proto.
        logdump_cmd = [
            str(reclient_bindir / "logdump"),
            "--proxy_log_dir",
            str(reproxy_logdir),
            "--output_dir",
            str(reproxy_logdir),  # Use the same log dir; it must be writeable.
        ]
        if verbose:
            msg(f"Ingesting logs from {reproxy_logdir}.")
        # logdump could take a few minutes on large reproxy logs.
        # This produces "reproxy_log.pb" in reproxy_logdir.
        subprocess.check_call(logdump_cmd)

    if verbose:
        msg(f"Loading log records from {log_pb_file}.")
    return _log_dump_from_pb(log_pb_file)


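# For reference, the conversion above is equivalent to running the reclient
# `logdump` tool by hand (directory paths are hypothetical):
#
#   <reclient_bindir>/logdump \
#     --proxy_log_dir out/.reproxy_logs \
#     --output_dir out/.reproxy_logs
#
# which leaves "reproxy_log.pb" in that directory for later runs to reuse.

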
def parse_log(
        log_path: Path,
        reclient_bindir: Path,
        verbose: bool = False) -> ReproxyLog:
    """Prepare and parse reproxy logs."""
    return ReproxyLog(
        convert_reproxy_actions_log(
            reproxy_logdir=setup_logdir_for_logdump(log_path, verbose=verbose),
            reclient_bindir=reclient_bindir,
            verbose=verbose,
        ))


def _action_digest_eq(
        left: log_pb2.LogRecord, right: log_pb2.LogRecord) -> bool:
    return (left.remote_metadata.action_digest ==
            right.remote_metadata.action_digest)


def _diff_printer(log: log_pb2.LogRecord) -> str:
    lines = [f'action_digest: {log.remote_metadata.action_digest}']
    for file, digest in log.remote_metadata.output_file_digests.items():
        lines.extend([
            f'file: {file}',
            f'  digest: {digest}',
        ])
    return '\n'.join(lines)


# Defined at the module level for multiprocessing to be able to serialize.
def _process_log_mp(log_path: Path) -> ReproxyLog:
    """Prepare and parse reproxy logs (for parallel processing)."""
    return parse_log(
        log_path=log_path,
        reclient_bindir=fuchsia.RECLIENT_BINDIR,
        verbose=True,
    )


def parse_logs(reproxy_logs: Sequence[Path]) -> Sequence[ReproxyLog]:
    """Parse reproxy logs, maybe in parallel."""
    try:
        with multiprocessing.Pool() as pool:
            return pool.map(_process_log_mp, reproxy_logs)
    except OSError:  # in case /dev/shm is not writeable (required by Pool)
        if len(reproxy_logs) > 1:
            msg("Warning: parsing logs sequentially instead of in parallel.")
        return list(map(_process_log_mp, reproxy_logs))


def diff_logs(args: argparse.Namespace) -> int:
    logs = parse_logs(args.reproxy_logs)

    # len(logs) == 2, enforced by _MAIN_ARG_PARSER.
    # TODO: argparse options to customize:
    #   different comparators
    #   different result printers
    # TODO: use a structured proto difference representation
    #   to avoid showing fields that match.
    left, right = logs

    diffs = list(left.diff_by_outputs(right, _action_digest_eq))
    if diffs:
        print('Action differences found.')
        for path, d_left, d_right in diffs:
            left_text = _diff_printer(d_left)
            right_text = _diff_printer(d_right)
            print('---------------------------------')
            print(f'{path}: (left)\n{left_text}\n')
            print(f'{path}: (right)\n{right_text}\n')
    return 0


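# Example invocation of the `diff` subcommand (hypothetical paths):
#
#   reproxy_logs.py diff out/left.rrpl out/right.rrpl
#
# Each reported difference prints both actions' digests and per-file
# output digests, as formatted by _diff_printer().

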
def lookup_output_file_digest(args: argparse.Namespace) -> int:
    log = parse_log(
        log_path=args.log,
        reclient_bindir=fuchsia.RECLIENT_BINDIR,
        verbose=False,
    )
    path = args.path
    try:
        action_record = log.records_by_output_file[path]
    except KeyError:
        msg(f"No record with output {path} found.")
        return 1

    # This lookup must succeed by construction: the record was found
    # via this same output path above.
    digest = action_record.remote_metadata.output_file_digests[str(path)]
    print(digest)
    return 0


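# Example invocation of the `output_file_digest` subcommand (hypothetical
# paths); the printed digest has the "hash/size" form noted earlier:
#
#   reproxy_logs.py output_file_digest --log out/reproxy.rrpl --path obj/foo.o

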
def _main_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()

    # universal options
    parser.add_argument(
        "--verbose",
        action="store_true",
        default=False,
        help="Print additional debug information while running.",
    )
    subparsers = parser.add_subparsers(required=True)

    # command: diff
    diff_parser = subparsers.add_parser(
        'diff', help='compare two reproxy logs output-by-output')
    diff_parser.set_defaults(func=diff_logs)
    diff_parser.add_argument(
        "reproxy_logs",
        type=Path,
        help="reproxy logs (.rrpl or .rpl) or the directories containing them",
        metavar="PATH",
        nargs=2,
    )

    # command: output_file_digest
    output_file_digest_parser = subparsers.add_parser(
        'output_file_digest',
        help='prints the digest of a remote action output file',
    )
    output_file_digest_parser.set_defaults(func=lookup_output_file_digest)
    output_file_digest_parser.add_argument(
        "--log",
        type=Path,
        help="reproxy log (.rpl or .rrpl)",
        metavar="REPROXY_LOG",
        required=True,
    )
    output_file_digest_parser.add_argument(
        "--path",
        type=Path,
        help="Path under the build output directory to look up.",
        metavar="OUTPUT_PATH",
        required=True,
    )
    return parser


_MAIN_ARG_PARSER = _main_arg_parser()


def main(argv: Sequence[str]) -> int:
    args = _MAIN_ARG_PARSER.parse_args(argv)
    return args.func(args)


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))