| #!/usr/bin/env fuchsia-vendored-python |
| # Copyright 2024 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Process a GN --ninja-outputs-file=FILE into a faster database file.""" |
| |
| import argparse |
| import difflib |
| import marshal |
| import sqlite3 |
| import json |
| import sys |
| from typing import Dict, List, Sequence |
| from pathlib import Path |
| |
| |
def levenshtein_distance(str1: str, str2: str) -> int:
    """Return an edit distance between two strings, derived from difflib.

    The character-level delta produced by difflib.ndiff() is split into runs
    of consecutive edits separated by unchanged characters. Each run costs
    the larger of its deletion count and its insertion count, so a deletion
    paired with an insertion is counted as a single substitution.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        The sum of the costs of all edit runs (0 when no edit is reported).
    """
    total = 0
    removed = 0
    added = 0
    for entry in difflib.ndiff(str1, str2):
        marker = entry[0]
        if marker == " ":
            # An unchanged character closes the current run of edits.
            total += max(removed, added)
            removed = added = 0
        elif marker == "+":
            added += 1
        else:
            removed += 1
    # Account for a run of edits that reaches the end of the delta.
    return total + max(removed, added)
| |
| |
def closest_match(
    input: str, references: Sequence[str], max_distance: int
) -> str:
    """Find the closest match between an input string and a sequence of references.

    Args:
        input: Input string.
        references: Sequence of reference strings to compare to.
        max_distance: Max levenshtein distance allowed in result.

    Returns:
        Closest match, or an empty string if there is no matching result.
        When several references are equally close, the first one wins.
    """
    result = ""
    # Only scores strictly below this bound can become the new best match.
    best_score = max_distance + 1
    for ref in references:
        # The distance is never smaller than the difference in lengths, so
        # such a reference cannot beat the current best: skip the costly
        # distance computation.
        if abs(len(ref) - len(input)) >= best_score:
            continue
        score = levenshtein_distance(ref, input)
        if score < best_score:
            best_score = score
            result = ref
            if score == 0:
                # Exact match, nothing can be closer.
                break

    return result
| |
| |
class NinjaOutputsBase(object):
    """Common interface shared by every database implementation.

    Instances can be used as context managers. The default query methods
    behave like an empty database; the load/save methods must be provided
    by subclasses.
    """

    def load_from_json(self, input_path: Path):
        """Populate the in-memory database from the JSON file generated by GN.

        Args:
            input_path: Path to the JSON file generated by GN.
        """
        raise NotImplementedError

    def load_from_file(self, input_path: Path):
        """Populate the database from a file written by save_to_file()."""
        raise NotImplementedError

    def save_to_file(self, output_path: Path):
        """Write the database to the given file."""
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Never suppress an exception raised inside the `with` body.
        return False

    def get_labels(self, sort_result=True) -> List[str]:
        """Return all GN labels in the database, sorted unless told otherwise."""
        labels = self.get_labels_internal()
        if sort_result:
            return sorted(labels)
        return labels

    def get_paths(self, sort_result=True) -> List[str]:
        """Return all Ninja output paths in the database, sorted unless told otherwise."""
        paths = self.get_paths_internal()
        if sort_result:
            return sorted(paths)
        return paths

    def gn_label_to_paths(self, label: str) -> List[str]:
        """Return the output paths generated by a given GN label.

        Args:
            label: GN label, as it appears in the ninja_outputs.json file.
        Returns:
            The Ninja output file paths, relative to the build directory,
            as they appear in the ninja_outputs.json file. An empty list
            means the label is unknown.
        """
        return []

    def path_to_gn_label(self, path: str) -> str:
        """Return the GN label of the target generating a Ninja output path.

        Args:
            path: Ninja target path, as it appears in ninja_outputs.json.
                Must be relative to the Ninja build directory.
        Returns:
            The GN label string, as it appears in the ninja_outputs.json
            file, or an empty string if the path is unknown.
        """
        return ""

    def get_labels_internal(self) -> List[str]:
        """Return all labels in unspecified order; this default has none."""
        return []

    def get_paths_internal(self) -> List[str]:
        """Return all paths in unspecified order; this default has none."""
        return []
| |
| |
class NinjaOutputsJSON(NinjaOutputsBase):
    """A NinjaOutputsBase implementation that uses an un-prettified JSON file.

    Profiling shows that queries are about the same speed as using the original ninja_outputs.json
    file, so this does not save time.

    The generated file is about 45 MiB for a 47 MiB input file.
    """

    def __init__(self):
        super().__init__()
        # Maps each GN label to the list of its Ninja output paths.
        self._label_to_paths: Dict[str, List[str]] = {}
        # Reverse mapping, rebuilt each time _label_to_paths is loaded.
        self._path_to_label: Dict[str, str] = {}

    def load_from_json(self, input_path: Path):
        """Load the label -> paths mapping from the JSON file generated by GN."""
        self._load_label_map(input_path)

    def save_to_file(self, output_path: Path):
        """Write the label -> paths mapping as compact JSON."""
        with output_path.open("w") as f:
            json.dump(self._label_to_paths, f)

    def load_from_file(self, input_path: Path):
        """Load a file written by save_to_file()."""
        # The saved file has the same schema as the GN-generated one, so
        # both loaders share a single implementation.
        self._load_label_map(input_path)

    def _load_label_map(self, input_path: Path):
        """Read a label -> paths JSON object and rebuild the reverse map."""
        with input_path.open() as f:
            self._label_to_paths = json.load(f)
        self._compute_path_map()

    def _compute_path_map(self):
        """Rebuild the path -> label map from the label -> paths map."""
        # If a path is listed under several labels, the last label wins.
        self._path_to_label = {
            path: label
            for label, paths in self._label_to_paths.items()
            for path in paths
        }

    def get_labels_internal(self) -> List[str]:
        # Return a real list, as annotated, rather than a dict key view.
        return list(self._label_to_paths)

    def get_paths_internal(self) -> List[str]:
        # Return a real list, as annotated, rather than a dict key view.
        return list(self._path_to_label)

    def gn_label_to_paths(self, label: str) -> List[str]:
        return self._label_to_paths.get(label, [])

    def path_to_gn_label(self, path: str) -> str:
        return self._path_to_label.get(path, "")
| |
| |
class NinjaOutputsMarshal(NinjaOutputsBase):
    """A NinjaOutputsBase implementation that uses marshalled Python data structures.

    Profiling shows that queries are about 2x slower compared to NinjaOutputsJSON due
    to the cost of unmarshaling the data structures at load time.
    """

    def __init__(self):
        super().__init__()
        # Maps each GN label to the list of its Ninja output paths.
        self._label_to_paths: Dict[str, List[str]] = {}
        # Reverse mapping from Ninja output path to GN label.
        self._path_to_label: Dict[str, str] = {}

    def load_from_json(self, input_path: Path):
        """Load the label -> paths mapping from the JSON file generated by GN."""
        with input_path.open() as f:
            self._label_to_paths = json.load(f)
        # If a path is listed under several labels, the last label wins.
        self._path_to_label = {
            path: label
            for label, paths in self._label_to_paths.items()
            for path in paths
        }

    def save_to_file(self, output_path: Path):
        """Write both maps, one after the other, in marshal format."""
        with output_path.open("wb") as f:
            marshal.dump(self._label_to_paths, f)
            marshal.dump(self._path_to_label, f)

    def load_from_file(self, input_path: Path):
        """Load the two maps written by save_to_file(), in the same order."""
        # NOTE: the marshal module is not safe against malformed or
        # malicious data; only load files produced by save_to_file().
        with input_path.open("rb") as f:
            self._label_to_paths = marshal.load(f)
            self._path_to_label = marshal.load(f)

    def get_labels_internal(self) -> List[str]:
        # Return a real list, as annotated, rather than a dict key view.
        return list(self._label_to_paths)

    def get_paths_internal(self) -> List[str]:
        # Return a real list, as annotated, rather than a dict key view.
        return list(self._path_to_label)

    def gn_label_to_paths(self, label: str) -> List[str]:
        return self._label_to_paths.get(label, [])

    def path_to_gn_label(self, path: str) -> str:
        return self._path_to_label.get(path, "")
| |
| |
class NinjaOutputsTabular(NinjaOutputsBase):
    """A NinjaOutputsBase implementation that uses a simple tabular text file.

    Profiling shows that this is fast to generate, and that queries are
    even slightly faster than the NinjaOutputsSqlite3 implementation.

    The generated file is about 43 MiB for a 47 MiB input file.
    """

    # Four bytes at the start of the file to recognize its format.
    # This is followed by newline-separated rows, where each row follows
    # the format of:
    #
    #   LABEL TAB PATH1 [TAB PATH2 ...] NEWLINE
    #
    # Where LABEL is a GN label, and PATH1, PATH2, ... are corresponding
    # output file paths. Rows are sorted by label, and the final row always
    # includes a trailing newline.
    INDEX_MAGIC: bytes = b"\x08IDX"

    def __init__(self):
        super().__init__()
        # Raw rows, without the magic header. The integer literals 9 and 10
        # used throughout this class are the byte values of tab and newline.
        self._content = b""

    def load_from_json(self, input_path: Path):
        """Build the tabular content from the JSON file generated by GN."""
        with input_path.open() as f:
            input_json = json.load(f)
        # NOTE: Optimized for speed, modify with caution!
        content = "\n".join(
            f"{label}\t" + "\t".join(paths)
            for label, paths in sorted(input_json.items())
        )
        # Terminate the last row, but do not create a bogus empty row when
        # the input has no labels at all.
        if content:
            content += "\n"
        # Queries encode keys and decode results as UTF-8, so the content
        # must use the same encoding. UTF-8 byte order matches code point
        # order, which keeps the rows sorted for the bisection lookup.
        self._content = content.encode("utf-8")

    def load_from_file(self, input_path: Path):
        """Load a file written by save_to_file().

        Returns:
            True on success, False if the file does not start with
            INDEX_MAGIC (in which case the current content is unchanged).
        """
        with input_path.open("rb") as f:
            magic = f.read(len(self.INDEX_MAGIC))
            if magic != self.INDEX_MAGIC:
                return False
            self._content = f.read()
        return True

    def save_to_file(self, output_path: Path):
        """Write the magic header followed by the rows."""
        with output_path.open("wb") as f:
            f.write(self.INDEX_MAGIC)
            f.write(self._content)

    def gn_label_to_paths(self, label: str) -> List[str]:
        line = self._find_row_starting_with(label.encode())
        if not line:
            return []
        items = line.split(b"\t")
        assert len(items) >= 2, f"Invalid input file format, line: [{line}]"
        # A label without outputs is stored as "LABEL TAB", which splits
        # into an empty trailing item: do not report it as a path.
        return [p.decode() for p in items[1:] if p]

    def path_to_gn_label(self, path: str) -> str:
        for line in self._find_rows_containing_value(path.encode()):
            items = line.split(b"\t")
            assert len(items) >= 2, f"Invalid input file format, line [{line}]"
            # Only the first matching row is reported.
            return items[0].decode()

        return ""

    def get_labels_internal(self) -> List[str]:
        # The label is everything before the first tab of each row.
        return [
            row.partition(b"\t")[0].decode() for row in self._parse_rows()
        ]

    def get_paths_internal(self) -> List[str]:
        result = []
        content = self._content
        for row_start, row_end in self._parse_row_bounds():
            tab_pos = content.find(9, row_start, row_end)
            if tab_pos < 0:
                # Row without any tab: it holds no path.
                continue
            pos = tab_pos + 1
            while pos < row_end:
                tab_pos = content.find(9, pos, row_end)
                if tab_pos < 0:
                    tab_pos = row_end
                result.append(content[pos:tab_pos].decode())
                pos = tab_pos + 1
        return result

    def _parse_rows(self):
        """Yield each row as bytes, without its newline terminator."""
        content = self._content
        for row_start, row_end in self._parse_row_bounds():
            yield content[row_start:row_end]

    def _parse_row_bounds(self):
        """Yield (start, end) offsets of each row, excluding the newline."""
        content = self._content
        content_len = len(content)
        line_start = 0
        while line_start < content_len:
            line_end = content.find(10, line_start)
            # Fails on an empty row or a missing final newline.
            assert line_end > line_start, "Malformed tabular file"
            yield line_start, line_end
            line_start = line_end + 1

    def _find_row_starting_with(self, key: bytes) -> bytes:
        """Find the row in a index input file with a given key.

        Args:
            key: first item to look for in input rows.

        Returns:
            On success, a byte string corresponding to the row, without the
            newline terminator. On failure, an empty string.
        """
        # Use local variable to avoid self -> _content dereference on each use.
        # Also use integer values for find() as this is faster in practice than
        # using b"\t" and b"\n".
        content = self._content

        # Bisection, since the lines are sorted by key (i.e. first column).
        # NOTE: This is faster than bisecting with a table of line start offsets.
        # Both `lo` and `hi` always sit on a row boundary.
        lo = 0
        hi = len(content)
        while lo < hi:
            mid = (lo + hi) // 2
            # Extend `mid` to the bounds of the row that contains it.
            cur_line_start = content.rfind(10, lo, mid)
            if cur_line_start < 0:
                cur_line_start = lo
            else:
                cur_line_start += 1
            cur_line_end = content.find(10, cur_line_start, hi)
            if cur_line_end < 0:
                cur_line_end = hi
            first_tab = content.find(9, cur_line_start, cur_line_end)
            if first_tab < 0:
                # Row without a tab: the whole row is the key.
                first_tab = cur_line_end
            line_key = content[cur_line_start:first_tab]
            if line_key == key:
                return content[cur_line_start:cur_line_end]
            elif line_key < key:
                lo = cur_line_end + 1
            else:
                hi = cur_line_start

        return b""

    def _find_row_starting_with_linear(self, key: bytes) -> bytes:
        """Linear-scan equivalent of _find_row_starting_with().

        Not used by queries; kept for debugging the bisection. Benchmarking
        showed it is only 10%-20% slower than bisection on 40 MiB input
        files from Fuchsia build plans.
        """
        content = self._content
        needle = key + b"\t"
        needle_len = len(needle)
        search_pos = 0
        while True:
            pos = content.find(needle, search_pos)
            if pos < 0:
                return b""
            if pos == 0 or content[pos - 1] == 10:
                # The needle starts a row: return that row.
                line_end = content.find(10, pos + needle_len)
                if line_end < 0:
                    line_end = len(content)
                return content[pos:line_end]
            # Mismatch: the needle was found in the middle of a row.
            search_pos = pos + needle_len

    def _find_rows_containing_value(self, value: bytes) -> List[bytes]:
        """Find all rows in a index file which contain a given value.

        Args:
            value: value to look for in each row, past the first key of each row.

        Returns:
            A list of strings, one per matching row, without any newline
            terminators. On failure, an empty list is returned.
        """
        # An empty value matches at every offset without ever advancing the
        # scan below, and can never be a valid path anyway.
        if not value:
            return []

        # Use local variable for performance.
        content = self._content
        result = []
        value_len = len(value)
        content_len = len(content)
        content_pos = 0
        while content_pos < content_len:
            pos = content.find(value, content_pos)
            if pos < 0:
                break

            # Found it, check boundaries: the value must be a whole item,
            # i.e. preceded by a tab and followed by a tab, a newline, or
            # the end of the content.
            value_end = pos + value_len
            if (
                pos == 0
                or content[pos - 1] != 9
                or (value_end < content_len and content[value_end] not in (9, 10))
            ):
                # Mismatch.
                content_pos = value_end
                continue

            # Find line start and end.
            line_start = content.rfind(10, 0, pos)
            if line_start < 0:
                line_start = 0
            else:
                line_start += 1
            line_end = content.find(10, pos, content_len)
            if line_end < 0:
                line_end = content_len

            result.append(content[line_start:line_end])
            content_pos = line_end

        return result
| |
| |
class NinjaOutputsSqlite3(NinjaOutputsBase):
    """A NinjaOutputsBase implementation that uses an in-memory sqlite3 database.

    Profiling shows that queries are slightly faster than with the
    NinjaOutputsTabular implementation (46.5ms versus 57.3ms), but that
    generation takes a lot more time (4.7s versus 443ms).

    The generated file is about 109 MiB for a 47 MiB input file.
    """

    def __init__(self):
        super().__init__()
        # sqlite3 connection, or None until one of the load methods is called.
        self._db = None

    def __exit__(self, exc_type, exc_value, exc_tb):
        if self._db:
            self._db.close()
            # Queries on a released database report "unknown" results
            # instead of failing on a closed connection.
            self._db = None
        return False

    def load_from_json(self, input_path: Path):
        """Build an in-memory database from the JSON file generated by GN."""
        with input_path.open() as f:
            input_json = json.load(f)
        # Release any previously loaded database, as load_from_file() does.
        if self._db:
            self._db.close()
        self._db = sqlite3.connect(":memory:")

        # Create the three tables; the indices are created further below.
        c = self._db.cursor()
        c.execute(
            "CREATE TABLE labels (id integer PRIMARY KEY, label text NOT NULL)"
        )
        c.execute(
            "CREATE TABLE paths (id integer PRIMARY KEY, path text NOT NULL)"
        )
        c.execute("CREATE TABLE outputs (label_id integer, path_id integer)")

        # Populate the tables. Values are always bound through placeholders
        # so that quotes in labels or paths cannot alter the statements.
        for label, paths in input_json.items():
            c.execute("INSERT INTO labels(label) VALUES(?)", (label,))
            label_id = c.lastrowid
            for path in paths:
                c.execute("INSERT INTO paths(path) VALUES(?)", (path,))
                path_id = c.lastrowid
                c.execute(
                    "INSERT INTO outputs(label_id, path_id) VALUES(?, ?)",
                    (label_id, path_id),
                )

        # Create the indices after the tables are populated, for better performance.
        c.execute("CREATE UNIQUE INDEX idx_labels ON labels(label)")
        c.execute("CREATE UNIQUE INDEX idx_paths ON paths(path)")
        c.execute("CREATE INDEX idx_output_labels ON outputs(label_id)")
        c.execute("CREATE UNIQUE INDEX idx_output_paths ON outputs(path_id)")

        self._db.commit()

    def load_from_file(self, input_path: Path):
        """Open a database file written by save_to_file()."""
        if self._db:
            self._db.close()
        self._db = sqlite3.connect(input_path)

    def save_to_file(self, output_path: Path):
        """Copy the current database into a new file at output_path.

        Any existing file is replaced, and missing parent directories are
        created.

        Returns:
            True once the copy is complete.
        """
        assert (
            self._db
        ), "No database, please call load_from_json() or load_from_file() first!"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if output_path.exists():
            output_path.unlink()
        out_db = sqlite3.connect(output_path)
        try:
            self._db.backup(out_db)
        finally:
            # Release the output file even if the copy fails.
            out_db.close()
        return True

    def get_labels_internal(self) -> List[str]:
        if not self._db:
            return []

        c = self._db.cursor()
        c.execute("""SELECT l.label FROM labels l""")
        return [x[0] for x in c.fetchall()]

    def get_paths_internal(self) -> List[str]:
        if not self._db:
            return []

        c = self._db.cursor()
        c.execute("""SELECT p.path FROM paths p""")
        return sorted(x[0] for x in c.fetchall())

    def gn_label_to_paths(self, label: str) -> List[str]:
        if not self._db:
            return []

        c = self._db.cursor()
        c.execute(
            """SELECT p.path FROM paths p WHERE p.id IN (
            SELECT o.path_id FROM outputs o WHERE o.label_id IN (
            SELECT l.id FROM labels l WHERE l.label = ?))""",
            (label,),
        )
        return [x[0] for x in c.fetchall()]

    def path_to_gn_label(self, path: str) -> str:
        if not self._db:
            return ""

        c = self._db.cursor()
        c.execute(
            """SELECT l.label FROM labels l WHERE l.id IN (
            SELECT o.label_id FROM outputs o WHERE o.path_id IN (
            SELECT p.id FROM paths p WHERE p.path = ?))""",
            (path,),
        )
        labels = c.fetchall()
        if not labels:
            return ""
        # Only the first matching label is reported.
        return labels[0][0]
| |
| |
# Maps each supported format name to the class implementing it. The names
# double as the file extensions recognized by format auto-detection.
_FORMAT_TO_CLASS_MAP = dict(
    json=NinjaOutputsJSON,
    sqlite=NinjaOutputsSqlite3,
    marshal=NinjaOutputsMarshal,
    tabular=NinjaOutputsTabular,
)

# Values accepted by the --format and --output-format options.
_VALID_FORMATS = ["auto", *_FORMAT_TO_CLASS_MAP]
| |
| |
def _autodetect_format(path: Path):
    """Guess a database format from the extension of a file name.

    Args:
        path: Path of the database file.

    Returns:
        The first key of _FORMAT_TO_CLASS_MAP that, prefixed with a dot,
        ends the file name, or None when no key does.
    """
    filename = path.name
    return next(
        (fmt for fmt in _FORMAT_TO_CLASS_MAP if filename.endswith(f".{fmt}")),
        None,
    )
| |
| |
def main():
    """Parse the command line and run the selected subcommand.

    The `generate` subcommand converts the JSON file generated by GN into a
    database file; every other subcommand queries an existing database.

    Returns:
        The process exit code: 0 on success. Usage errors are reported
        through parser.error(), which exits the process directly.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("input_file", type=Path, help="Input JSON file.")
    parser.add_argument(
        "--format",
        choices=_VALID_FORMATS,
        default="auto",
        help="Select database format.",
    )

    subparsers = parser.add_subparsers(title="subcommands", dest="cmd")

    generate_parser = subparsers.add_parser(
        "generate", help="Generate database file"
    )
    generate_parser.add_argument(
        "output_file", type=Path, help="Output database file."
    )
    generate_parser.add_argument(
        "--output-format",
        choices=_VALID_FORMATS,
        default="auto",
        help="Output database format.",
    )

    query_paths_parser = subparsers.add_parser(
        "query_paths", help="Convert a GN label into a list of paths"
    )
    query_paths_parser.add_argument(
        "label", metavar="gn_label", help="GN label"
    )

    query_label_parser = subparsers.add_parser(
        "query_label", help="Convert Ninja output path into GN label"
    )
    query_label_parser.add_argument(
        "path", metavar="ninja_path", help="Ninja path"
    )

    subparsers.add_parser("list_labels", help="List of GN labels in database.")

    subparsers.add_parser(
        "list_paths", help="List all Ninja paths in database."
    )

    closest_label_parser = subparsers.add_parser(
        "closest_label", help="Find closest label matching input value."
    )
    closest_label_parser.add_argument("label", help="Input GN label value.")
    # type=int is required: without it, a user-provided value stays a
    # string and the distance arithmetic in closest_match() fails.
    closest_label_parser.add_argument(
        "--max-distance", type=int, default=2, help="Maximum distance (2)."
    )

    closest_path_parser = subparsers.add_parser(
        "closest_path", help="Find closest Ninja path matching input value."
    )
    closest_path_parser.add_argument("path", help="Input Ninja path value.")
    closest_path_parser.add_argument(
        "--max-distance", type=int, default=2, help="Maximum distance (2)."
    )

    args = parser.parse_args()

    if args.format == "auto":
        args.format = _autodetect_format(args.input_file)
        if not args.format:
            parser.error(
                "Could not determine format based on file extension, use --format=FORMAT."
            )

    if args.cmd == "generate":
        if args.format != "json":
            parser.error("Command only accepts json format as input!")

        if args.output_format == "auto":
            args.output_format = _autodetect_format(args.output_file)
            if not args.output_format:
                parser.error(
                    "Could not determine output format based on file extension, use --output-format=FORMAT."
                )

        # The context manager releases database resources on every exit path.
        with _FORMAT_TO_CLASS_MAP[args.output_format]() as database:
            database.load_from_json(args.input_file)
            database.save_to_file(args.output_file)
        return 0

    # All other commands can read from an input database in any format.
    input_class = _FORMAT_TO_CLASS_MAP[args.format]
    with input_class() as database:
        # Only some implementations report a load failure, by returning
        # False; the others return None on success.
        if database.load_from_file(args.input_file) is False:
            parser.error(
                f"Could not load {args.format} database from {args.input_file}"
            )

        if args.cmd == "query_paths":
            for path in database.gn_label_to_paths(args.label):
                print(path)
            return 0

        if args.cmd == "query_label":
            print(database.path_to_gn_label(args.path))
            return 0

        if args.cmd == "list_labels":
            for label in database.get_labels():
                print(label)
            return 0

        if args.cmd == "list_paths":
            for path in database.get_paths():
                print(path)
            return 0

        if args.cmd == "closest_label":
            label = closest_match(
                args.label,
                database.get_labels(sort_result=False),
                args.max_distance,
            )
            if label:
                print(label)
            return 0

        if args.cmd == "closest_path":
            path = closest_match(
                args.path,
                database.get_paths(sort_result=False),
                args.max_distance,
            )
            if path:
                print(path)
            return 0

    parser.error("Invalid command, see --help.")
    return 1
| |
| |
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)