#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Generic utilities for working with command lines and argparse.
"""
import argparse
import asyncio
import collections
import contextlib
import dataclasses
import datetime
import fcntl
import filecmp
import io
import os
import platform
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    FrozenSet,
    Iterable,
    List,
    Optional,
    Sequence,
    Tuple,
)
_SCRIPT_BASENAME = Path(__file__).name
# Local subprocess and remote environment calls need this when a
# command is prefixed with an X=Y environment variable.
_ENV = "/usr/bin/env"
def msg(text: str):
print(f"[{_SCRIPT_BASENAME}] {text}")
def tfmt(t: datetime.datetime) -> str:
    """Formats a time as HH:MM:SS.microseconds."""
    return t.strftime("%H:%M:%S.%f")
def tmsg(dt: datetime.datetime, text: str):
print(f"[{tfmt(dt)}] {text}")
# Global control switch for time profiling using timer_cm.
_ENABLE_TIMERS = False
@contextlib.contextmanager
def timer_cm(text: str):
"""Times execution of a body of code for profiling performance.
Globally enabled/disabled with _ENABLE_TIMERS.
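    Example (illustrative):
        with timer_cm("expensive step"):
            ...  # body to be timed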
"""
try:
if _ENABLE_TIMERS:
start_time = datetime.datetime.now()
tmsg(start_time, "start: " + text)
yield
finally:
if _ENABLE_TIMERS:
end_time = datetime.datetime.now()
elapsed = end_time - start_time
tmsg(end_time, "end : " + text + f" (elapsed: {elapsed})")
def auto_env_prefix_command(command: Sequence[str]) -> Sequence[str]:
    """Prefixes a command with 'env' if it starts with an X=Y assignment."""
if not command:
return []
if "=" in command[0]:
# Commands that start with X=Y local environment variables
# need to be run with 'env'.
return [_ENV] + command
return command
def bool_golang_flag(value: str) -> bool:
"""Interpret a Go-lang flag style boolean.
See https://pkg.go.dev/flag
This can be used as a 'type' parameter to 'ArgumentParser.add_argument()'
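    Example:
        bool_golang_flag("T") -> True
        bool_golang_flag("0") -> False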
"""
    try:
        return {
            "1": True,
            "0": False,
            "t": True,
            "f": False,
            "true": True,
            "false": False,
        }[value.lower()]
    except KeyError:
        # Raise ValueError so argparse reports a clean usage error
        # instead of an unhandled KeyError.
        raise ValueError(f"Invalid boolean flag value: {value}")
def copy_preserve_subpath(src: Path, dest_dir: Path):
"""Like copy(), but preserves the relative path of src in the destination.
Example: src='foo/bar/baz.txt', dest_dir='save/stuff/here' will result in
'save/stuff/here/foo/bar/baz.txt'
Args:
src: path to file, relative to working dir.
dest_dir: root directory to copy to (can be absolute or relative to
working dir).
"""
assert (
not src.is_absolute()
), f"source file to be copied should be relative, but got: {src}"
dest_subdir = dest_dir / src.parent
dest_subdir.mkdir(parents=True, exist_ok=True)
dest_file = dest_subdir / src.name
if dest_file.exists():
# If files are identical, don't copy again.
        # This helps if dest_file is not writable.
if not filecmp.cmp(src, dest_file, shallow=True):
# files are different, keep the existing copy.
msg(
f"[copy_preserve_subpath()] Warning: Files {src} and {dest_file} are different. Not copying to the latter."
)
return
shutil.copy2(src, dest_subdir)
# TODO: move this to library for abstract data operations
def partition_sequence(
seq: Sequence[Any], sep: Any
) -> Tuple[Sequence[Any], Any, Sequence[Any]]:
"""Similar to string.partition, but for arbitrary sequences.
Args:
seq: sequence of values.
sep: a value to be sought as the separator
Returns:
if sep is not found, returns (the original sequence, None, [])
otherwise returns a triple:
            the subsequence before the first occurrence of sep,
sep (the separator),
the subsequence after the first occurrence of sep.
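    Example:
        partition_sequence([1, 2, 3, 2], 2) -> ([1], 2, [3, 2])
        partition_sequence([1, 3], 2) -> ([1, 3], None, [])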
"""
try:
sep_position = seq.index(sep)
except ValueError:
return seq, None, []
left = seq[:sep_position]
remainder = seq[sep_position + 1 :]
return left, sep, remainder
# TODO: move this to library for abstract data operations
def split_into_subsequences(
seq: Iterable[Any], sep: Any
) -> Iterable[Sequence[Any]]:
"""Similar to string.split, but for arbitrary sequences.
Args:
seq: sequence of values.
sep: a value to be sought as the separator
Returns:
sequence of subsequences between occurrences of the separator.
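    Example:
        list(split_into_subsequences([1, 0, 2, 3, 0], 0)) -> [[1], [2, 3], []]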
"""
subseq = []
for elem in seq:
if elem == sep:
yield subseq
subseq = []
else:
subseq.append(elem)
yield subseq
# TODO: move this to library for abstract data operations
def match_prefix_transform_suffix(
text: str, prefix: str, transform: Callable[[str], str]
) -> Optional[str]:
"""If text matches prefix, transform the text after the prefix.
This can be useful for transforming command flags.
Args:
text: string to match and possibly transform.
prefix: check if text starts with this string
transform: function to apply to remainder of text after the
matched prefix.
Returns:
Transformed text if prefix was matched, else None.
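    Example:
        match_prefix_transform_suffix("-I/foo", "-I", lambda s: s + "/bar")
        -> "-I/foo/bar"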
"""
if not text.startswith(prefix):
return None
suffix = text[len(prefix) :]
transformed = transform(suffix)
return prefix + transformed
def command_quoted_str(command: Iterable[str]) -> str:
return " ".join(shlex.quote(t) for t in command)
def flatten_comma_list(items: Iterable[str]) -> Iterable[str]:
"""Flatten ["a,b", "c,d"] -> ["a", "b", "c", "d"].
This is useful for merging repeatable flags, which also
have comma-separated values.
Yields:
Elements that were separated by commas, flattened over
        the original sequence.
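    Example:
        list(flatten_comma_list(["a,b", "c"])) -> ["a", "b", "c"]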
"""
for item in items:
yield from item.split(",")
def remove_hash_comments(lines: Iterable[str]) -> Iterable[str]:
    """Filters out lines that start with '#'."""
for line in lines:
if not line.startswith("#"):
yield line
def expand_response_files(
command: Iterable[str], rspfiles: Sequence[Path]
) -> Iterable[str]:
"""Expand response files in a command into tokens contained therein.
Args:
command: sequence of command-line tokens, may contain @response_files.
rspfiles: (modify-by-reference) list of response files encountered during expansion.
Yields:
tokens, possibly expanded from response files.
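    Example (hypothetical response file): if 'args.rsp' contains '-O2 -g',
        then expand_response_files(["cc", "@args.rsp"], rsps) yields
        "cc", "-O2", "-g", and appends Path("args.rsp") to rsps.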
"""
for tok in command:
if tok.startswith("@"):
# rustc supports an alternative syntax: @shell:RSPFILE
rspfile = Path(tok[1:].removeprefix("shell:"))
rspfiles.append(rspfile)
rsp_lines = rspfile.read_text().splitlines()
# remove blanks and comments
rsp_lines_stripped = (line.strip() for line in rsp_lines)
filtered_lines = remove_hash_comments(
line for line in rsp_lines_stripped if line
)
for line in filtered_lines:
yield from expand_response_files(shlex.split(line), rspfiles)
else:
yield tok
def expand_fused_flags(
command: Iterable[str], flags: Sequence[str]
) -> Iterable[str]:
"""Expand "fused" flags like '-I/foo/bar' into ('-I', '/foo/bar').
argparse.ArgumentParser does not handle fused flags well,
so expanding them first makes it easier to parse.
    Do not expect the wrapped tool itself to accept these expanded flags.
The reverse transformation is `fuse_expanded_flags()`.
Args:
command: sequence of command tokens.
flags: flag prefixes that are to be separated from their values.
Yields:
command tokens, possibly expanded.
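    Example:
        list(expand_fused_flags(["-I/foo", "-c"], ["-I"])) -> ["-I", "/foo", "-c"]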
"""
for tok in command:
matched = False
for prefix in flags:
if tok.startswith(prefix) and len(tok) > len(prefix):
# Separate value from flag to make it easier for argparse.
yield prefix
yield tok[len(prefix) :]
matched = True
break
if not matched:
yield tok
def fuse_expanded_flags(
command: Iterable[str], flags: FrozenSet[str]
) -> Iterable[str]:
"""Turns flags like ('-D' 'foo') into '-Dfoo'.
Reverse transformation of `expand_fused_flags()`.
Args:
command: sequence of command tokens.
flags: flag prefixes that are to be joined with their values.
Yields:
command tokens, possibly fused.
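    Example:
        list(fuse_expanded_flags(["-D", "foo", "-c"], frozenset({"-D"})))
        -> ["-Dfoo", "-c"]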
"""
prefix = None
for tok in command:
if prefix:
yield prefix + tok
prefix = None
continue
if tok in flags:
# defer to next iteration to fuse
prefix = tok
continue
yield tok
class StringSetAdd(argparse.Action):
"""An argparse.Action for adding an element to a set of strings."""
def __init__(self, option_strings, dest, nargs=None, **kwargs):
if nargs is not None:
raise ValueError("nargs not allowed")
super().__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
collection = getattr(namespace, self.dest).copy()
collection.add(values)
setattr(namespace, self.dest, collection)
class StringSetRemove(argparse.Action):
"""An argparse.Action for removing an element from a set of strings."""
def __init__(self, option_strings, dest, nargs=None, **kwargs):
if nargs is not None:
raise ValueError("nargs not allowed")
super().__init__(option_strings, dest, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
collection = getattr(namespace, self.dest).copy()
# special case: values == "all", clear entire set
if values == "all":
setattr(namespace, self.dest, set())
return
try:
collection.remove(values)
except KeyError:
pass
setattr(namespace, self.dest, collection)
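# Example argparse usage (illustrative):
#   parser.add_argument("--enable", action=StringSetAdd, dest="features", default=set())
#   parser.add_argument("--disable", action=StringSetRemove, dest="features")
# Note: passing "--disable=all" clears the entire set.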
def expand_paths_from_files(files: Iterable[Path]) -> Iterable[Path]:
"""Expand paths from files that list other paths."""
for path_list in files:
with open(path_list) as f:
for line in f:
stripped = line.strip()
if stripped:
# Response files can list more than one path per line,
# separated by un-escaped whitespace.
for p in shlex.split(stripped):
yield Path(p.replace(" ", "\\ ")) # preserve escape
def keyed_flags_to_values_dict(
    flags: Iterable[str], convert_type: Optional[Callable[[str], Any]] = None
) -> Dict[str, Sequence[str]]:
"""Convert a series of key[=value]s into a dictionary.
All dictionary values are accumulated sequences of 'value's,
so repeated keys like 'k=x' and 'k=y' will result in k:[x,y].
It is up to the caller to interpret the values.
This is useful for parsing and organizing tool flags like
'-Cfoo=bar', '-Cbaz', '-Cquux=foo'.
Args:
flags: strings with the following forms:
'key' -> key: (no value)
'key=' -> key: "" (empty string)
'key=value' -> key: value
convert_type: type to convert string to, e.g. int, Path
Returns:
Strings dictionary of key and (possibly multiple) values.
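    Example:
        keyed_flags_to_values_dict(["k=x", "k=y", "j"]) -> {"k": ["x", "y"], "j": []}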
"""
partitions = (f.partition("=") for f in flags)
# each partition is a tuple (left, sep, right)
d = collections.defaultdict(list)
for key, sep, value in partitions:
if sep == "=":
d[key].append(convert_type(value) if convert_type else value)
        else:
            # Touch the key so its presence is recorded;
            # defaultdict creates an empty value list for it.
            d[key]
return d
def last_value_or_default(values: Sequence[str], default: str) -> str:
if values:
return values[-1]
return default
def last_value_of_dict_flag(
d: Dict[str, Sequence[str]], key: str, default: str = ""
) -> str:
"""This selects the last value among repeated occurrences of a flag as a winner."""
return last_value_or_default(d.get(key, []), default)
def filter_out_option_with_arg(
args: Iterable[str], prefix_flag: str
) -> Iterable[str]:
"""Remove option and arguments that start with a given prefix.
Example (with prefix_flag="--foo"):
removes "--foo=bar" and ("--foo" "bar") from the token sequence.
This also removes "--foo" at the end of the sequence.
Args:
args: command tokens
prefix_flag: the flag that should be filtered out (along with its argument).
Yields:
a filtered command
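    Example:
        list(filter_out_option_with_arg(["--foo=a", "--foo", "b", "c"], "--foo"))
        -> ["c"]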
"""
fused_prefix_flag = prefix_flag + "="
delete_optarg = False
for arg in args:
if delete_optarg:
delete_optarg = False
continue
if arg == prefix_flag:
delete_optarg = True
continue
if arg.startswith(fused_prefix_flag):
continue
yield arg
def strip_option_prefix(args: Iterable[str], prefix_flag: str) -> Iterable[str]:
"""Copy over flags, but strip away the prefix from a specific flag.
Example (with prefix_flag="--foo"):
Both "--foo=bar" and ("--foo" "bar") become just "bar".
This also removes "--foo" at the end of the sequence.
Args:
args: command tokens
prefix_flag: the flag that should be stripped away while preserving
its argument.
Yields:
a filtered command
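    Example:
        list(strip_option_prefix(["--foo=a", "--foo", "b", "c"], "--foo"))
        -> ["a", "b", "c"]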
"""
fused_prefix_flag = prefix_flag + "="
keep_optarg = False
for arg in args:
if keep_optarg:
keep_optarg = False
yield arg
continue
if arg == prefix_flag:
keep_optarg = True
continue
if arg.startswith(fused_prefix_flag):
yield arg.removeprefix(fused_prefix_flag)
continue
yield arg
@dataclasses.dataclass
class ForwardedFlag(object):
# The original name of the flag to match (including leading '-' or '--').
name: str
# If true, expect the following token to be the VALUE of --flag VALUE.
has_optarg: bool
# Substitute the original flag name with this name.
# If this is "", then delete the flag name (still forward its optarg if
# applicable)
mapped_name: str
class FlagForwarder(object):
"""Separate and transform (rename) a set of flags and their values.
Unlike using argparse.ArgumentParser, this forwarding approach
preserves the left-to-right order in which flags appear.
"""
def __init__(self, flag_mappings: Iterable[ForwardedFlag]):
self._map = {m.name: m for m in flag_mappings}
def sift(self, argv: Iterable[str]) -> Tuple[Sequence[str], Sequence[str]]:
"""Sifts out known flags while transforming them.
Args:
argv: command tokens
Returns:
1) forwarded and transformed flags and args
2) filtered out copy of argv
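        Example (illustrative):
            fwd = FlagForwarder([ForwardedFlag("--remote-flag", True, "--flag")])
            fwd.sift(["--remote-flag", "x", "y"]) -> (["--flag", "x"], ["y"])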
"""
forwarded_flags = []
filtered_argv = []
next_token_is_optarg = False
for tok in argv:
if next_token_is_optarg:
forwarded_flags.append(tok)
next_token_is_optarg = False
continue
# match --flag without optarg
flag = self._map.get(tok, None)
if flag:
if flag.mapped_name:
forwarded_flags.append(flag.mapped_name)
next_token_is_optarg = flag.has_optarg
continue
# check for --flag=optarg
left, sep, right = tok.partition("=")
if sep == "=":
left_flag = self._map.get(left, None)
if left_flag:
prefix = (
left_flag.mapped_name + "="
if left_flag.mapped_name
else ""
)
forwarded_flags.append(prefix + right)
continue
filtered_argv.append(tok)
return forwarded_flags, filtered_argv
@contextlib.contextmanager
def chdir_cm(d: Path):
"""FIXME: replace with contextlib.chdir(), once Python 3.11 is default."""
save_dir = os.getcwd()
os.chdir(d) # could raise OSError
try:
yield
finally:
os.chdir(save_dir)
def relpath(path: Path, start: Path) -> Path:
"""Relative path (using Path objects).
Path.relative_to() requires self to be a subpath of the argument,
but here, the argument is often the subpath of self.
Hence, we need os.path.relpath() in the general case.
Args:
path (Path): target path
start (Path): starting directory (required, unlike os.path.relpath)
Returns:
relative path
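    Example:
        relpath(Path("/a/b"), start=Path("/a/c/d")) -> Path("../../b")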
"""
return Path(os.path.relpath(path, start=start))
def symlink_relative(dest: Path, src: Path):
"""Create a relative-path symlink from src to dest.
Like os.symlink(), but using relative path.
Any intermediate directories to src are automatically created.
This is done without any os.chdir(), and can be done in parallel.
Args:
dest: target to link-to (not required to exist)
src: new symlink path pointing to dest
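    Example:
        symlink_relative(Path("out/data.txt"), Path("links/data.txt"))
        creates 'links/data.txt' pointing to '../out/data.txt'.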
"""
    # Check is_symlink() first: is_dir() follows symlinks, and
    # rmdir() would fail on a symlink that points to a directory.
    if src.is_symlink() or src.is_file():
        src.unlink()
    elif src.is_dir():
        src.rmdir()
src.parent.mkdir(parents=True, exist_ok=True)
src.symlink_to(relpath(dest, start=src.parent))
def qualify_tool_path(path: Path) -> str:
"""Automatically prepend ./ to a tool path if needed.
The relative path to a tool can sometimes be unqualified
if it is in the current working directory, but to execute it
it either needs to be in PATH or be relative-path qualified.
Prepending './' makes it relative-path qualified.
Args:
path: path to a tool to be used in a subprocess.Popen.
    Returns:
        Relative-path qualified string. A string (not a Path) is returned
        because Path('./foo') normalizes to Path('foo'), losing the './'.
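    Example:
        qualify_tool_path(Path("tool")) -> "./tool"
        qualify_tool_path(Path("bin/tool")) -> "bin/tool"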
"""
if path.is_absolute():
# absolute, no need to adjust
return str(path)
if path.parent == Path("."):
# unqualified, needs './' prepended
return os.path.join(".", str(path))
# else already relative-path qualified
return str(path)
def exec_relaunch(command: Sequence[str]) -> None:
"""Re-launches a command without returning.
Works like an os.exec*() call, by replacing the current process with
a new one.
Tip: When mocking this function, give it a side-effect that
raises a test-only Exception to quickly simulate an exit, without
having to mock any other code that would normally not be reached.
Args:
command: command to execute. Must start with an executable, specified
with a relative or absolute path.
Returns: (it does not return)
"""
# TODO(https://fxbug.dev/42076617): use os.execv(), but figure out
# how to get in-python print() of the new process to appear.
# os.execv(command[0], command[1:])
# Workaround: fork a subprocess
sys.exit(subprocess.call(command))
assert False, "exec_relaunch() should never return"
#####################################################################
class BlockingFileLock(object):
"""A minimal mutual exclusion lock (blocking)."""
def __init__(self, lockfile: Path):
self._lockfile: Path = lockfile
self._lockfile_fd = None
def _acquire(self):
lockfile_fd = os.open(self._lockfile, os.O_RDWR | os.O_CREAT, 0o644)
fcntl.flock(lockfile_fd, fcntl.LOCK_EX) # Blocking
self._lockfile_fd = lockfile_fd
def _release(self):
lockfile_fd = self._lockfile_fd
self._lockfile_fd = None
fcntl.flock(lockfile_fd, fcntl.LOCK_UN)
os.close(lockfile_fd)
# Do not remove the lock file:
# https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
def __enter__(self) -> "BlockingFileLock":
self._acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
self._release()
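# Example usage (illustrative): serialize a critical section across processes.
#   with BlockingFileLock(Path("out/.my.lock")):
#       ...  # exclusive access here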
#####################################################################
# The following code implements subprocess 'tee' behavior based on:
# https://stackoverflow.com/questions/2996887/how-to-replicate-tee-behavior-in-python-when-using-subprocess
class SubprocessResult(object):
def __init__(
self,
returncode: int,
        stdout: Optional[Sequence[str]] = None,  # lines
        stderr: Optional[Sequence[str]] = None,  # lines
        # The process id may come in handy when looking for logs.
        pid: Optional[int] = None,
):
self.returncode = returncode
self.stdout = stdout or []
self.stderr = stderr or []
self.pid = pid if pid is not None else -1
@property
def stdout_text(self) -> str:
return "\n".join(self.stdout)
@property
def stderr_text(self) -> str:
return "\n".join(self.stderr)
async def _read_stream(
    stream: asyncio.StreamReader, callback: Callable[[bytes], None]
):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break
async def _stream_subprocess(
cmd: Sequence[str],
    stdin: Optional[io.TextIOBase] = None,
    stdout: Optional[io.TextIOBase] = None,
    stderr: Optional[io.TextIOBase] = None,
quiet: bool = False,
**kwargs,
) -> SubprocessResult:
popen_kwargs = {}
if platform.system() == "Windows":
platform_settings = {"env": os.environ}
else:
platform_settings = {}
# default interpreter is sufficient: {"executable": "/bin/sh"}
popen_kwargs.update(platform_settings)
popen_kwargs.update(kwargs)
cmd_str = command_quoted_str(cmd)
p = await asyncio.create_subprocess_shell(
cmd_str,
stdin=stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=1024 * 1024, # 1MB buffer for stdout/stderr StreamReaders
**popen_kwargs,
)
pid = p.pid
out_text = []
err_text = []
    def tee(line: bytes, sink: List[str], pipe: io.TextIOBase):
        text = line.decode("utf-8").rstrip()
        sink.append(text)
        if not quiet:
            print(text, file=pipe)
out_task = asyncio.create_task(
_read_stream(p.stdout, lambda l: tee(l, out_text, stdout or sys.stdout))
)
err_task = asyncio.create_task(
_read_stream(p.stderr, lambda l: tee(l, err_text, stderr or sys.stderr))
)
# Forward stdout, stderr while capturing them.
await asyncio.wait([out_task, err_task])
return SubprocessResult(
returncode=await p.wait(),
stdout=out_text,
stderr=err_text,
pid=pid,
)
def subprocess_call(
cmd: Sequence[str],
    stdin: Optional[io.TextIOBase] = None,
    stdout: Optional[io.TextIOBase] = None,
    stderr: Optional[io.TextIOBase] = None,
quiet: bool = False,
**kwargs,
) -> SubprocessResult:
"""Similar to subprocess.call(), but records stdout/stderr.
Use this when interested in stdout/stderr.
Args:
cmd: command to execute
stdin: input stream
stdout: output stream
stderr: error stream
quiet: if True, suppress forwarding to sys.stdout/stderr.
**kwargs: forwarded subprocess.Popen arguments.
    Returns:
        SubprocessResult with returncode and captured stdout/stderr lines.
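    Example (illustrative):
        result = subprocess_call(["echo", "hello"], quiet=True)
        result.returncode -> 0, result.stdout -> ["hello"]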
"""
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
_stream_subprocess(
cmd=cmd,
stdin=stdin,
stdout=stdout,
stderr=stderr,
quiet=quiet,
**kwargs,
)
)
return result
# end of subprocess_call section
#####################################################################
def subprocess_communicate(
cmd: Sequence[str], input_text: str, **kwargs
) -> SubprocessResult:
"""Pipes text through an external program.
Unlike subprocess_call, this does not come with `tee` capability.
Args:
cmd: command to run (operates as a pipe).
input_text: text to feed to command's stdin.
kwargs: additional Popen() kwargs.
Returns:
SubprocessResult containing output and exit code.
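    Example (illustrative):
        subprocess_communicate(["grep", "b"], "a\nb\n").stdout -> ["b"]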
"""
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
**kwargs,
)
out, err = p.communicate(input=input_text)
return SubprocessResult(
returncode=p.returncode,
        stdout=out.splitlines(keepends=False),
        stderr=err.splitlines(keepends=False),
pid=p.pid,
)