blob: f5001763abec8d990b9b72a6095f54018cd6a4f8 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Generic utilities for working with text protobufs (without schema).
"""
import argparse
import enum
import collections
import dataclasses
import re
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, Sequence
@enum.unique
class TokenType(enum.Enum):
    """Lexical categories produced when tokenizing text proto input."""

    # Identifier immediately followed by ':' (the ':' is part of the token).
    FIELD_NAME = 0
    # Opening delimiter of a nested message: '<' or '{'.
    START_BLOCK = 1
    # Closing delimiter of a nested message: '>' or '}'.
    END_BLOCK = 2
    # Double-quoted text, e.g. "string".
    STRING_VALUE = 3
    # Any value that is not a quoted string.
    OTHER_VALUE = 4
    # A run of one or more spaces/tabs.
    SPACE = 5
    # A line break: "\n" or "\r\n".
    NEWLINE = 6
@dataclasses.dataclass
class Token:
    """One lexical unit of text proto input."""

    # The exact source characters that make up this token.
    text: str
    # The lexical category assigned to those characters.
    type: TokenType
# Lexer patterns.  Each one is applied with .match(), i.e. anchored at the
# start of the text that remains to be tokenized.
_FIELD_NAME_RE = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*:")
_SPACE_RE = re.compile(r"[ \t]+")
_NEWLINE_RE = re.compile(r"\r?\n")
_STRING_RE = re.compile(r'"([^\\"]|\\.)*"')  # Allow escaped quotes inside
# Any run of text up to the next space, line break, or block delimiter.
# Block delimiters are excluded so that compact input such as "{a: 1}"
# yields the closing '}' as its own token instead of gluing it to the value.
_VALUE_RE = re.compile(r"[^ \t\r\n<>{}]+")
def _lex_line(line: str) -> Iterable[Token]:
    """Split one line of text proto into tokens.

    The line is scanned left to right by offset (no re-slicing of the
    remaining text), so the cost is linear in the length of the line.

    Args:
      line: text to tokenize; may include its trailing line break.

    Yields:
      Tokens in source order, including SPACE and NEWLINE tokens.

    Raises:
      ValueError: if the text at some position matches no token pattern.
    """
    # Order matters: the first pattern that matches at the current offset
    # wins, e.g. a field name must be tried before the catch-all value.
    ordered_patterns = (
        (_FIELD_NAME_RE, TokenType.FIELD_NAME),
        (_STRING_RE, TokenType.STRING_VALUE),
        (_VALUE_RE, TokenType.OTHER_VALUE),
        (_SPACE_RE, TokenType.SPACE),
        (_NEWLINE_RE, TokenType.NEWLINE),
    )
    pos = 0
    end = len(line)
    while pos < end:
        next_char = line[pos]
        # Block delimiters are always single-character tokens.
        if next_char in {"<", "{"}:
            yield Token(text=next_char, type=TokenType.START_BLOCK)
            pos += 1
            continue
        if next_char in {">", "}"}:
            yield Token(text=next_char, type=TokenType.END_BLOCK)
            pos += 1
            continue

        for pattern, token_type in ordered_patterns:
            matched = pattern.match(line, pos)
            if matched:
                yield Token(text=matched.group(0), type=token_type)
                pos = matched.end()
                break
        else:
            # Nothing matched: report everything that is still unconsumed.
            raise ValueError(
                f'[textpb.lex] Unrecognized text: "{line[pos:]}"'
            )
def yield_verbose(items: Iterable[Any]) -> Iterable[Any]:
    """Pass items through unchanged, printing each one as it is consumed.

    Args:
      items: any iterable.

    Yields:
      Each element of `items`, right after it has been printed to stdout.
    """
    for element in items:
        rendered = f"{element}"
        print(rendered)
        yield element
def _lex(lines: Iterable[str]) -> Iterable[Token]:
    """Tokenize proto text, one line at a time.

    Args:
      lines: lines of text proto.

    Yields:
      The tokens of every line, in order.
    """
    for text in lines:
        for token in _lex_line(text):
            yield token
class ParseError(ValueError):
    """Raised when a token stream is not a well-formed text proto."""

    def __init__(self, msg: str) -> None:
        """Create the error.

        Args:
          msg: human-readable description of the problem.
        """
        super().__init__(msg)
def _auto_dict(values: Sequence[Any]):
    """Convert sequences of key-value pairs to dictionaries.

    A sequence qualifies when every element is a dict with exactly the
    fields 'key' and 'value' and every 'key' holds a scalar token.

    Args:
      values: the (possibly repeated) values collected for one field.

    Returns:
      A dict mapping each unquoted key text to its value when `values`
      qualifies; otherwise `values` itself, unchanged.  Keys are assumed
      to be unique; a later duplicate replaces an earlier one.
    """
    if not values:
        return values

    mapping = {}
    for entry in values:
        if not (isinstance(entry, dict) and entry.keys() == {"key", "value"}):
            return values

        key_field = entry["key"]
        # 'key' must be a scalar token.  A nested block in that position
        # (possibly already collapsed into a dict) has no text to use as a
        # dictionary key, so leave the whole sequence as it is.
        if isinstance(key_field, dict) or not key_field:
            return values
        key_token = key_field[0]
        if not hasattr(key_token, "text"):
            return values

        value_field = entry["value"]
        # 'key' and 'value' should not be repeated fields, so take the
        # first element.  If 'value' was a block containing only key/value
        # fields it has already been collapsed into a dict; use it whole
        # rather than indexing it with [0].
        if isinstance(value_field, dict):
            mapped = value_field
        else:
            mapped = value_field[0]

        # assume keys are unique quoted strings
        mapping[key_token.text.strip('"')] = mapped
    return mapping
def _parse_block(
    tokens: Iterable[Token], top: bool
) -> Dict[str, Sequence[Any]]:
    """Parse text proto tokens into a structure.

    Args:
      tokens: lexical tokens, without any spaces/newlines.  Any iterable
        is accepted; nested blocks consume from the same iterator.
      top: True for the outermost scope, which ends at EOF.  False inside
        a '<' or '{' block, which ends at the next end-of-block token at
        this nesting level.

    Returns:
      dictionary representation of text proto.

    Raises:
      ParseError: on premature EOF, an end-of-block token at top level,
        a missing field name, or a token that cannot follow a field name.
    """
    # next() needs an iterator; iter() returns an iterator unchanged, so
    # recursive calls keep sharing the same underlying stream.
    tokens = iter(tokens)

    # Without a schema, we cannot deduce whether a field is scalar or
    # repeated, so treat them as repeated (maybe singleton).
    result = collections.defaultdict(list)
    while True:
        try:
            field = next(tokens)
        except StopIteration:
            if top:
                break
            # 'from None': the StopIteration is an implementation detail.
            raise ParseError(
                "Unexpected EOF, missing '>' or '}' end-of-block"
            ) from None

        if field.type == TokenType.END_BLOCK:
            if top:
                raise ParseError(
                    "Unexpected end-of-block at top-level before EOF."
                )
            break

        if field.type != TokenType.FIELD_NAME:
            raise ParseError(f"Expected a field name, but got {field}.")

        key = field.text[:-1]  # removes trailing ':'
        try:
            value_or_block = next(tokens)
        except StopIteration:
            raise ParseError(
                "Unexpected EOF, expecting a value or start-of-block."
            ) from None

        if value_or_block.type == TokenType.START_BLOCK:
            value = _parse_block(tokens, top=False)
        elif value_or_block.type in {
            TokenType.STRING_VALUE,
            TokenType.OTHER_VALUE,
        }:
            value = value_or_block  # a Token
        else:
            raise ParseError(f"Unexpected token: {value_or_block}")

        result[key].append(value)

    # End of block, post-process key-value pairs into dictionaries.
    return {k: _auto_dict(v) for k, v in result.items()}
def _parse_tokens(tokens: Iterable[Token]) -> Dict[str, Sequence[Any]]:
    """Parse a token stream as the top-level scope of a text proto.

    Args:
      tokens: lexical tokens, without any spaces/newlines.

    Returns:
      dictionary representation of the text proto.
    """
    parsed = _parse_block(tokens, top=True)
    return parsed
def parse(lines: Iterable[str]) -> Dict[str, Sequence[Any]]:
    """Parse a text protobuf into a recursive dictionary.

    Args:
      lines: lines of text proto (spaces and line breaks are ignored)

    Returns:
      Structured representation of the proto.
      Fields are treated as either repeated (even if the original
      schema was scalar) or as key-value dictionaries.
    """
    ignorable = (TokenType.SPACE, TokenType.NEWLINE)

    def significant(tokens: Iterable[Token]) -> Iterable[Token]:
        # Drop whitespace tokens; the parser only wants meaningful ones.
        for candidate in tokens:
            if candidate.type in ignorable:
                continue
            yield candidate

    return _parse_tokens(significant(_lex(lines)))
def _main_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser used when this file runs as a script.

    Returns:
      A parser that accepts one positional FILE argument, stored as a
      Path in the `input` attribute.
    """
    cli = argparse.ArgumentParser(
        argument_default=None,
        description="Parse any text proto and show its representation.",
    )
    input_options = dict(
        type=Path,
        metavar="FILE",
        help="The text proto file to parse",
    )
    cli.add_argument("input", **input_options)
    return cli
# Built once at import time; main() parses its argv with this instance.
_MAIN_ARG_PARSER = _main_arg_parser()
def main(argv: Sequence[str]) -> int:
    """Parse the text proto file named on the command line and print it.

    Args:
      argv: command-line arguments, without the program name.

    Returns:
      0 on success.  Errors from opening or parsing the file propagate
      as exceptions.
    """
    args = _MAIN_ARG_PARSER.parse_args(argv)
    # Text protos are UTF-8; do not depend on the locale's default encoding.
    with open(args.input, encoding="utf-8") as f:
        data = parse(f)
    print(data)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))