#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Generic utilities for working with command lines and argparse.
"""
import argparse
import asyncio
import collections
import dataclasses
import io
import os
import shlex
import subprocess
import sys
import platform
from pathlib import Path
from typing import Any, Callable, Dict, FrozenSet, Iterable, Optional, Sequence, Tuple
# Local subprocess and remote environment calls need this when a
# command is prefixed with an X=Y environment variable.
_ENV = '/usr/bin/env'
def auto_env_prefix_command(command: Sequence[str]) -> Sequence[str]:
if not command:
return []
if '=' in command[0]:
# Commands that start with X=Y local environment variables
# need to be run with 'env'.
return [_ENV] + command
return command
def bool_golang_flag(value: str) -> bool:
"""Interpret a Go-lang flag style boolean.
See https://pkg.go.dev/flag
This can be used as a 'type' parameter to 'ArgumentParser.add_argument()'
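Example (illustrative):
bool_golang_flag('TRUE') -> True
bool_golang_flag('0') -> False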
"""
return {
'1': True,
'0': False,
't': True,
'f': False,
'true': True,
'false': False,
}[value.lower()]
# TODO: move this to library for abstract data operations
def partition_sequence(seq: Sequence[Any],
sep: Any) -> Tuple[Sequence[Any], Any, Sequence[Any]]:
"""Similar to string.partition, but for arbitrary sequences.
Args:
seq: sequence of values.
sep: a value to be sought as the separator
Returns:
If sep is not found, returns (the original sequence, None, []).
Otherwise returns a triple:
the subsequence before the first occurrence of sep,
sep (the separator),
the subsequence after the first occurrence of sep.
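Example (illustrative):
partition_sequence(['a', '--', 'b', 'c'], '--') -> (['a'], '--', ['b', 'c'])
partition_sequence(['a', 'b'], '--') -> (['a', 'b'], None, [])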
"""
try:
sep_position = seq.index(sep)
except ValueError:
return seq, None, []
left = seq[:sep_position]
remainder = seq[sep_position + 1:]
return left, sep, remainder
# TODO: move this to library for abstract data operations
def split_into_subsequences(seq: Iterable[Any],
sep: Any) -> Iterable[Sequence[Any]]:
"""Similar to string.split, but for arbitrary sequences.
Args:
seq: sequence of values.
sep: a value to be sought as the separator
Returns:
sequence of subsequences between occurrences of the separator.
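Example (illustrative):
list(split_into_subsequences(['a', '--', 'b', 'c'], '--')) -> [['a'], ['b', 'c']]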
"""
subseq = []
for elem in seq:
if elem == sep:
yield subseq
subseq = []
else:
subseq.append(elem)
yield subseq
# TODO: move this to library for abstract data operations
def match_prefix_transform_suffix(
text: str, prefix: str, transform: Callable[[str],
str]) -> Optional[str]:
"""If text matches prefix, transform the text after the prefix.
This can be useful for transforming command flags.
Args:
text: string to match and possibly transform.
prefix: check if text starts with this string
transform: function to apply to remainder of text after the
matched prefix.
Returns:
Transformed text if prefix was matched, else None.
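Example (illustrative):
match_prefix_transform_suffix('-I/foo', '-I', lambda s: s.upper()) -> '-I/FOO'
match_prefix_transform_suffix('-L/foo', '-I', lambda s: s.upper()) -> None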
"""
if not text.startswith(prefix):
return None
suffix = text[len(prefix):]
transformed = transform(suffix)
return prefix + transformed
def command_quoted_str(command: Iterable[str]) -> str:
"""Renders a command as a single shell-quoted string."""
return ' '.join(shlex.quote(t) for t in command)
def flatten_comma_list(items: Iterable[str]) -> Iterable[str]:
"""Flatten ["a,b", "c,d"] -> ["a", "b", "c", "d"].
This is useful for merging repeatable flags, which also
have comma-separated values.
Yields:
Elements that were separated by commas, flattened over
the original sequence.
"""
for item in items:
yield from item.split(',')
def expand_fused_flags(command: Iterable[str],
flags: Sequence[str]) -> Iterable[str]:
"""Expand "fused" flags like '-I/foo/bar' into ('-I', '/foo/bar').
argparse.ArgumentParser does not handle fused flags well,
so expanding them first makes it easier to parse.
The tool being invoked may not accept these expanded flags, so fuse them
back with `fuse_expanded_flags()` (the reverse transformation) before use.
Args:
command: sequence of command tokens.
flags: flag prefixes that are to be separated from their values.
Yields:
command tokens, possibly expanded.
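Example (illustrative):
list(expand_fused_flags(['-I/foo', '-Dbar', 'x.c'], ['-I', '-D'])) ->
['-I', '/foo', '-D', 'bar', 'x.c']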
"""
for tok in command:
matched = False
for prefix in flags:
if tok.startswith(prefix) and len(tok) > len(prefix):
# Separate value from flag to make it easier for argparse.
yield prefix
yield tok[len(prefix):]
matched = True
break
if not matched:
yield tok
def fuse_expanded_flags(command: Iterable[str],
flags: FrozenSet[str]) -> Iterable[str]:
"""Turns flags like ('-D' 'foo') into '-Dfoo'.
Reverse transformation of `expand_fused_flags()`.
Args:
command: sequence of command tokens.
flags: flag prefixes that are to be joined with their values.
Yields:
command tokens, possibly fused.
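Example (illustrative):
list(fuse_expanded_flags(['-D', 'foo', 'x.c'], frozenset({'-D'}))) ->
['-Dfoo', 'x.c']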
"""
prefix = None
for tok in command:
if prefix:
yield prefix + tok
prefix = None
continue
if tok in flags:
# defer to next iteration to fuse
prefix = tok
continue
yield tok
def keyed_flags_to_values_dict(
flags: Iterable[str],
convert_type: Optional[Callable[[str], Any]] = None) -> Dict[str, Sequence[str]]:
"""Convert a series of key[=value]s into a dictionary.
All dictionary values are accumulated sequences of 'value's,
so repeated keys like 'k=x' and 'k=y' will result in k:[x,y].
It is up to the caller to interpret the values.
This is useful for parsing and organizing tool flags like
'-Cfoo=bar', '-Cbaz', '-Cquux=foo'.
Args:
flags: strings with the following forms:
'key' -> key: (no value)
'key=' -> key: "" (empty string)
'key=value' -> key: value
convert_type: type to convert string to, e.g. int, Path
Returns:
Dictionary mapping each key to its (possibly multiple) string values.
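Example (illustrative):
keyed_flags_to_values_dict(['opt=1', 'lto', 'opt=2']) ->
{'opt': ['1', '2'], 'lto': []}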
"""
partitions = (f.partition('=') for f in flags)
# each partition is a tuple (left, sep, right)
d = collections.defaultdict(list)
for (key, sep, value) in partitions:
if sep == '=':
d[key].append(convert_type(value) if convert_type else value)
else:
d[key]  # touch the key so it exists with an empty value list
return d
def last_value_or_default(values: Sequence[str], default: str) -> str:
if values:
return values[-1]
return default
def last_value_of_dict_flag(
d: Dict[str, Sequence[str]], key: str, default: str = '') -> str:
"""This selects the last value among repeated occurrences of a flag as a winner."""
return last_value_or_default(d.get(key, []), default)
@dataclasses.dataclass
class ForwardedFlag(object):
# The original name of the flag to match (including leading '-' or '--').
name: str
# If true, expect the following token to be the VALUE of --flag VALUE.
has_optarg: bool
# Substitute the original flag name with this name.
# If this is "", then delete the flag name (still forward its optarg if
# applicable)
mapped_name: str
class FlagForwarder(object):
"""Separate and transform (rename) a set of flags and their values.
Unlike using argparse.ArgumentParser, this forwarding approach
preserves the left-to-right order in which flags appear.
"""
def __init__(self, flag_mappings: Iterable[ForwardedFlag]):
self._map = {m.name: m for m in flag_mappings}
def sift(self, argv: Iterable[str]) -> Tuple[Sequence[str], Sequence[str]]:
"""Sifts out known flags while transforming them.
Args:
argv: command tokens
Returns:
1) forwarded and transformed flags and args
2) filtered out copy of argv
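Example (illustrative flag names):
fwd = FlagForwarder([ForwardedFlag('--remote-flag', True, '--flag')])
fwd.sift(['--remote-flag', 'x', '--local']) ->
(['--flag', 'x'], ['--local'])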
"""
forwarded_flags = []
filtered_argv = []
next_token_is_optarg = False
for tok in argv:
if next_token_is_optarg:
forwarded_flags.append(tok)
next_token_is_optarg = False
continue
# match --flag without optarg
flag = self._map.get(tok, None)
if flag:
if flag.mapped_name:
forwarded_flags.append(flag.mapped_name)
next_token_is_optarg = flag.has_optarg
continue
# check for --flag=optarg
left, sep, right = tok.partition('=')
if sep == '=':
left_flag = self._map.get(left, None)
if left_flag:
prefix = left_flag.mapped_name + '=' if left_flag.mapped_name else ''
forwarded_flags.append(prefix + right)
continue
filtered_argv.append(tok)
return forwarded_flags, filtered_argv
def relpath(path: Path, start: Path) -> Path:
"""Relative path (using Path objects).
Path.relative_to() only works when the path lies beneath the start
directory, but here, start is often not an ancestor of path.
Hence, we use os.path.relpath(), which handles the general case
(including '..' components).
Args:
path (Path): target path
start (Path): starting directory (required, unlike os.path.relpath)
Returns:
relative path
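Example (illustrative):
relpath(Path('/a/b/c'), start=Path('/a/d')) -> Path('../b/c')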
"""
return Path(os.path.relpath(path, start=start))
def symlink_relative(dest: Path, src: Path):
"""Create a relative-path symlink from src to dest.
Like os.symlink(), but using relative path.
Any missing parent directories of src are created automatically.
This is done without any os.chdir(), and can be done in parallel.
Args:
dest: target to link-to (not required to exist)
src: new symlink path pointing to dest
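Example (illustrative paths):
symlink_relative(Path('out/data/foo.txt'), Path('out/links/foo.txt'))
creates symlink 'out/links/foo.txt' -> '../data/foo.txt'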
"""
if src.is_dir():
src.rmdir()
elif src.is_symlink() or src.is_file():
src.unlink()
src.parent.mkdir(parents=True, exist_ok=True)
src.symlink_to(relpath(dest, start=src.parent))
def exec_relaunch(command: Sequence[str]) -> None:
"""Re-launches a command without returning.
Works like an os.exec*() call, by replacing the current process with
a new one.
Tip: When mocking this function, give it a side-effect that
raises a test-only Exception to quickly simulate an exit, without
having to mock any other code that would normally not be reached.
Args:
command: command to execute. Must start with an executable, specified
with a relative or absolute path.
Returns: (it does not return)
"""
# TODO(http://fxbug.dev/125841): use os.execv(), but figure out
# how to get in-python print() of the new process to appear.
# os.execv(command[0], command[1:])
# Workaround: fork a subprocess
sys.exit(subprocess.call(command))
assert False, "exec_relaunch() should never return"
#####################################################################
# The following code implements subprocess 'tee' behavior based on:
# https://stackoverflow.com/questions/2996887/how-to-replicate-tee-behavior-in-python-when-using-subprocess
class SubprocessResult(object):
def __init__(self,
returncode: int,
stdout: Optional[Sequence[str]] = None,  # lines
stderr: Optional[Sequence[str]] = None,  # lines
# The process id may come in handy when looking for logs.
pid: Optional[int] = None,
):
self.returncode = returncode
self.stdout = stdout or []
self.stderr = stderr or []
self.pid = pid if pid is not None else -1
async def _read_stream(stream: asyncio.StreamReader, callback: Callable[[bytes], None]):
while True:
line = await stream.readline()
if line:
callback(line)
else:
break
async def _stream_subprocess(
cmd: Sequence[str],
stdin: Optional[io.TextIOBase] = None,
stdout: Optional[io.TextIOBase] = None,
stderr: Optional[io.TextIOBase] = None,
quiet: bool = False,
**kwargs,
) -> SubprocessResult:
popen_kwargs = {}
if platform.system() == 'Windows':
platform_settings = {"env": os.environ}
else:
platform_settings = {}
# default interpreter is sufficient: {"executable": "/bin/sh"}
popen_kwargs.update(platform_settings)
popen_kwargs.update(kwargs)
cmd_str = command_quoted_str(cmd)
p = await asyncio.create_subprocess_shell(
cmd_str,
stdin=stdin,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**popen_kwargs)
pid = p.pid
out_text = []
err_text = []
def tee(line: bytes, sink: Sequence[str], pipe: io.TextIOBase):
line = line.decode("utf-8").rstrip()
sink.append(line)
if not quiet:
print(line, file=pipe)
await asyncio.wait(
[ # Forward stdout, stderr while capturing them.
_read_stream(p.stdout, lambda l: tee(l, out_text, stdout or sys.stdout)),
_read_stream(p.stderr, lambda l: tee(l, err_text, stderr or sys.stderr)),
])
return SubprocessResult(
returncode=await p.wait(),
stdout=out_text,
stderr=err_text,
pid=pid,
)
def subprocess_call(
cmd: Sequence[str],
stdin: Optional[io.TextIOBase] = None,
stdout: Optional[io.TextIOBase] = None,
stderr: Optional[io.TextIOBase] = None,
quiet: bool = False,
**kwargs,
) -> SubprocessResult:
"""Similar to subprocess.call(), but records stdout/stderr.
Use this when interested in stdout/stderr.
Args:
cmd: command to execute
stdin: input stream
stdout: output stream
stderr: error stream
quiet: if True, suppress forwarding to sys.stdout/stderr.
**kwargs: forwarded subprocess.Popen arguments.
Returns:
A SubprocessResult with the returncode, captured stdout lines, and
captured stderr lines.
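Example (illustrative; assumes a POSIX shell where 'echo' is available):
result = subprocess_call(['echo', 'hello'], quiet=True)
result.returncode -> 0
result.stdout -> ['hello']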
"""
loop = asyncio.get_event_loop()
result = loop.run_until_complete(
_stream_subprocess(
cmd=cmd,
stdin=stdin,
stdout=stdout,
stderr=stderr,
quiet=quiet,
**kwargs,
))
return result
# end of subprocess_call section
#####################################################################