| #!/usr/bin/env fuchsia-vendored-python |
| # Copyright 2023 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Generic utilities for working with text protobufs (without schema). |
| """ |
| |
| import argparse |
| import enum |
| import collections |
| import dataclasses |
| import re |
| import sys |
| |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, Sequence |
| |
| |
| class TokenType(enum.Enum): |
| FIELD_NAME = 0 # includes trailing ':' |
| START_BLOCK = 1 # '<' or '{' |
| END_BLOCK = 2 # '>' or '}' |
| STRING_VALUE = 3 # quoted text, e.g. "string" |
| OTHER_VALUE = 4 # non-string value |
| SPACE = 5 # [ \t]* |
| NEWLINE = 6 # [\n\r] |
| |
| |
| @dataclasses.dataclass |
| class Token(object): |
| text: str |
| type: TokenType |
| |
| |
| _FIELD_NAME_RE = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*:") |
| _SPACE_RE = re.compile(r"[ \t]+") |
| _NEWLINE_RE = re.compile(r"\r?\n") |
| _STRING_RE = re.compile(r'"([^\\"]|\\.)*"') # Allow escaped quotes inside |
| _VALUE_RE = re.compile(r"[^ \t\r\n]+") # Anything text that is not space |
| |
| |
| def _lex_line(line: str) -> Iterable[Token]: |
| while line: # is not empty |
| next_char = line[0] |
| |
| if next_char in {"<", "{"}: |
| yield Token(text=next_char, type=TokenType.START_BLOCK) |
| line = line[1:] |
| continue |
| |
| if next_char in {">", "}"}: |
| yield Token(text=next_char, type=TokenType.END_BLOCK) |
| line = line[1:] |
| continue |
| |
| field_match = _FIELD_NAME_RE.match(line) |
| if field_match: |
| field_name = field_match.group(0) |
| yield Token(text=field_name, type=TokenType.FIELD_NAME) |
| line = line[len(field_name) :] |
| continue |
| |
| string_match = _STRING_RE.match(line) |
| if string_match: |
| string = string_match.group(0) |
| yield Token(text=string, type=TokenType.STRING_VALUE) |
| line = line[len(string) :] |
| continue |
| |
| value_match = _VALUE_RE.match(line) |
| if value_match: |
| value = value_match.group(0) |
| yield Token(text=value, type=TokenType.OTHER_VALUE) |
| line = line[len(value) :] |
| continue |
| |
| space_match = _SPACE_RE.match(line) |
| if space_match: |
| space = space_match.group(0) |
| yield Token(text=space, type=TokenType.SPACE) |
| line = line[len(space) :] |
| continue |
| |
| newline_match = _NEWLINE_RE.match(line) |
| if newline_match: |
| newline = newline_match.group(0) |
| yield Token(text=newline, type=TokenType.NEWLINE) |
| line = line[len(newline) :] |
| continue |
| |
| raise ValueError(f'[textpb.lex] Unrecognized text: "{line}"') |
| |
| |
| def yield_verbose(items: Iterable[Any]) -> Iterable[Any]: |
| for item in items: |
| print(f"{item}") |
| yield item |
| |
| |
| def _lex(lines: Iterable[str]) -> Iterable[Token]: |
| """Divides proto text into tokens.""" |
| for line in lines: |
| yield from _lex_line(line) |
| |
| |
| class ParseError(ValueError): |
| def __init__(self, msg: str): |
| super().__init__(msg) |
| |
| |
| def _auto_dict(values: Sequence[Any]): |
| """Convert sequences of key-value pairs to dictionaries.""" |
| if len(values) == 0: |
| return values |
| |
| if all( |
| isinstance(v, dict) and v.keys() == {"key", "value"} for v in values |
| ): |
| # assume keys are unique quoted strings |
| # 'key' and 'value' should not be repeated fields |
| return {v["key"][0].text.strip('"'): v["value"][0] for v in values} |
| |
| return values |
| |
| |
| def _parse_block( |
| tokens: Iterable[Token], top: bool |
| ) -> Dict[str, Sequence[Any]]: |
| """Parse text proto tokens into a structure. |
| |
| Args: |
| tokens: lexical tokens, without any spaces/newlines. |
| |
| Returns: |
| dictionary representation of text proto. |
| """ |
| # Without a schema, we cannot deduce whether a field is scalar or |
| # repeated, so treat them as repeated (maybe singleton). |
| result = collections.defaultdict(list) |
| |
| while True: |
| try: |
| field = next(tokens) |
| except StopIteration: |
| if top: |
| break |
| else: |
| raise ParseError( |
| "Unexpected EOF, missing '>' or '}' end-of-block" |
| ) |
| |
| if field.type == TokenType.END_BLOCK: |
| if top: |
| raise ParseError( |
| "Unexpected end-of-block at top-level before EOF." |
| ) |
| break |
| |
| if field.type != TokenType.FIELD_NAME: |
| raise ParseError(f"Expected a field name, but got {field}.") |
| |
| key = field.text[:-1] # removes trailing ':' |
| try: |
| value_or_block = next(tokens) |
| except StopIteration: |
| raise ParseError( |
| "Unexpected EOF, expecting a value or start-of-block." |
| ) |
| |
| if value_or_block.type == TokenType.START_BLOCK: |
| value = _parse_block(tokens, top=False) |
| elif value_or_block.type in { |
| TokenType.STRING_VALUE, |
| TokenType.OTHER_VALUE, |
| }: |
| value = value_or_block # a Token |
| else: |
| raise ParseError(f"Unexpected token: {value_or_block}") |
| |
| result[key].append(value) |
| |
| # End of block, post-process key-value pairs into dictionaries. |
| return {k: _auto_dict(v) for k, v in result.items()} |
| |
| |
| def _parse_tokens(tokens: Iterable[Token]) -> Dict[str, Sequence[Any]]: |
| return _parse_block(tokens, top=True) |
| |
| |
| def parse(lines: Iterable[str]) -> Dict[str, Sequence[Any]]: |
| """Parse a text protobuf into a recursive dictionary. |
| |
| Args: |
| lines: lines of text proto (spaces and line breaks are ignored) |
| |
| Returns: |
| Structured representation of the proto. |
| Fields are treated as either repeated (even if the original |
| schema was scalar) or as key-value dictionaries. |
| """ |
| # ignore spaces |
| return _parse_tokens( |
| token |
| for token in _lex(lines) |
| if token.type not in {TokenType.SPACE, TokenType.NEWLINE} |
| ) |
| |
| |
| def _main_arg_parser() -> argparse.ArgumentParser: |
| parser = argparse.ArgumentParser( |
| description="Parse any text proto and show its representation.", |
| argument_default=None, |
| ) |
| parser.add_argument( |
| "input", |
| type=Path, |
| metavar="FILE", |
| help="The text proto file to parse", |
| ) |
| return parser |
| |
| |
| _MAIN_ARG_PARSER = _main_arg_parser() |
| |
| |
| def main(argv: Sequence[str]) -> int: |
| args = _MAIN_ARG_PARSER.parse_args(argv) |
| with open(args.input) as f: |
| data = parse(f) |
| print(data) |
| return 0 |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main(sys.argv[1:])) |