| #!/usr/bin/env fuchsia-vendored-python |
| # Copyright 2024 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Process a GN --ninja-outputs-file=FILE into a faster database file.""" |
| |
| import argparse |
| import difflib |
| import json |
| import sys |
| from pathlib import Path |
| from typing import Dict, List, Sequence |
| |
| |
| def levenshtein_distance(str1: str, str2: str) -> int: |
| """Compute the levenshtein distance between two strings.""" |
| result = 0 |
| counters = [0, 0] |
| for delta in difflib.ndiff(str1, str2): |
| op = delta[0] |
| if op == " ": |
| result += max(counters) |
| counters = [0, 0] |
| else: |
| counters[1 if op == "+" else 0] += 1 |
| result += max(counters) |
| return result |
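
# Illustrative check (not executed here): with the ndiff-based computation
# above, levenshtein_distance("kitten", "sitting") evaluates to 3, the
# classic edit distance for this pair.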
| |
| |
| def closest_match( |
| input: str, references: Sequence[str], max_distance: int |
| ) -> str: |
| """Find the closest match between an input string and a sequence of references. |
| |
| Args: |
| input: Input string |
| references: Sequence of reference strings to compare to. |
| max_distance: Max levenshtein distance allowed in result. |
| |
| Returns: |
| Closest match, or an empty string if there is no matching result. |
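
    Example (illustrative):
        closest_match("foo_bar", ["foo_baz", "other"], max_distance=2)
        would return "foo_baz" (edit distance 1).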
| """ |
| result = "" |
| best_score = max_distance + 1 |
| for ref in references: |
        if abs(len(ref) - len(input)) >= best_score:
| continue |
| score = levenshtein_distance(ref, input) |
| if score < best_score: |
| best_score = score |
| result = ref |
| if score == 0: |
| break |
| |
| return result |
| |
| |
| class NinjaOutputsBase(object): |
| """Base class for all implementations.""" |
| |
| def load_from_json(self, input_path: Path): |
| """Load original ninja_inputs.json file into in-memory database. |
| |
| Args: |
| input_path: Path to ninja_inputs.json file generated by GN. |
| """ |
| raise NotImplementedError |
| |
| def load_from_file(self, input_path: Path): |
| """Load the database saved to a file through a previous save_to_file() call.""" |
| raise NotImplementedError |
| |
| def save_to_file(self, output_path: Path): |
| """Save database to a given file.""" |
| raise NotImplementedError |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_value, exc_tb): |
| return False |
| |
    def get_labels(self, sort_result: bool = True) -> List[str]:
| """Return the list of all GN labels in the database.""" |
| result = self.get_labels_internal() |
| return sorted(result) if sort_result else result |
| |
    def get_paths(self, sort_result: bool = True) -> List[str]:
| """Return the list of all Ninja output paths in the database.""" |
| result = self.get_paths_internal() |
| return sorted(result) if sort_result else result |
| |
| def gn_label_to_paths(self, label: str) -> List[str]: |
| """Return the list of output paths for a given GN label. |
| |
| Args: |
| label: GN label, as it appears in the ninja_outputs.json file. |
| Returns: |
| A list of Ninja output file paths, relative to the build |
| directory, as they appear in the ninja_outputs.json file. |
| An empty list means the label is unknown. |
| """ |
| return [] |
| |
| def path_to_gn_label(self, path: str) -> str: |
| """Return the GN label whose target generates a given Ninja output path. |
| |
| Args: |
| path: Ninja target path, as it appears in ninja_outputs.json. |
| Must be relative to the Ninja build directory. |
| Returns: |
| A GN label string, as it appears in the ninja_outputs.json file, |
| or an empty string if the path is unknown. |
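
        Example (illustrative; assumes the database contains such an entry):
            path_to_gn_label("obj/src/lib/foo.o") -> "//src/lib:foo"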
| """ |
| return "" |
| |
| @staticmethod |
| def is_valid_target_name(target: str) -> bool: |
| """Returns true if input is a valid target file name. |
| |
| Args: |
| target: An input string. |
| Returns: |
| True if the input if a file name, i.e. not a file path that |
| contains a directory separator, or a GN or Bazel target that |
| begins with @ or //. |
| """ |
| return all(target.find(ch) < 0 for ch in "@/:\\") |
| |
| def target_name_to_gn_labels(self, target: str) -> List[str]: |
| """Return the GN labels that matches a given target name if possible. |
| |
| This operation is slow and should only be performed as a fallback. |
| |
| Args: |
            target: A target name, i.e. one that contains no separator
                (colon, slash or backslash) and no @ character.
| Returns: |
| A list of GN labels, each one having a Ninja output path whose |
| basename matches |target|. |
| """ |
| if not self.is_valid_target_name(target): |
| raise ValueError(f"Malformed target name: {target}") |
| return self.get_target_labels_internal(target) |
| |
| def get_labels_internal(self) -> List[str]: |
| return [] |
| |
| def get_paths_internal(self) -> List[str]: |
| return [] |
| |
| def get_target_labels_internal(self, target: str) -> List[str]: |
| return [] |
| |
| |
| class NinjaOutputsJSON(NinjaOutputsBase): |
| """A NinjaOutputsBase implementation that uses an un-prettified JSON file. |
| |
| Profiling shows that queries are about the same speed as using the original ninja_outputs.json |
| file, so this does not save time. |
| |
| The generated file is about 45 MiB for a 47 MiB input file. |
| """ |
| |
| def __init__(self): |
| super().__init__() |
| self._label_to_paths: Dict[str, List[str]] = {} |
| self._path_to_label: Dict[str, str] = {} |
| |
| def load_from_json(self, input_path: Path): |
| with input_path.open() as f: |
| self._label_to_paths = json.load(f) |
| self._compute_path_map() |
| |
| def save_to_file(self, output_path: Path): |
| with output_path.open("w") as f: |
| json.dump(self._label_to_paths, f) |
| |
| def load_from_file(self, input_path: Path): |
| with input_path.open() as f: |
| self._label_to_paths = json.load(f) |
| self._compute_path_map() |
| |
| def _compute_path_map(self): |
| self._path_to_label = {} |
| for label, paths in self._label_to_paths.items(): |
| for path in paths: |
| self._path_to_label[path] = label |
| |
    def get_labels_internal(self) -> List[str]:
        return list(self._label_to_paths.keys())

    def get_paths_internal(self) -> List[str]:
        return list(self._path_to_label.keys())
| |
| def gn_label_to_paths(self, label: str) -> List[str]: |
| return self._label_to_paths.get(label, []) |
| |
| def path_to_gn_label(self, path: str) -> str: |
| return self._path_to_label.get(path, "") |
| |
| def get_target_labels_internal(self, target: str) -> List[str]: |
| candidate_paths = [ |
| p |
| for p in self._path_to_label.keys() |
| if p.rpartition("/")[2] == target |
| ] |
| return sorted({self._path_to_label[p] for p in candidate_paths}) |
| |
| |
| class NinjaOutputsTabular(NinjaOutputsBase): |
| """A NinjaOutputsBase implementation that uses a simple tabular text file. |
| |
| Profiling shows that this is fast to generate, and that queries are |
| even slightly faster than the NinjaOutputsSqlite3 implementation. |
| |
| The generated file is about 43 MiB for a 47 MiB input file. |
| """ |
| |
| # Four bytes at the start of the file to recognize its format. |
| # This is followed by newline-separated rows, where each row follows |
| # the format of: |
| # |
| # <label> <tab> <path1> [<tab> <path2> ...] <newline> |
| # |
| # Where <label> is a GN label, and <path1>, <path2>, ... are corresponding output |
| # file paths. The final row always includes a trailing newline. |
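    #
    # Rows are written sorted by <label> (see load_from_json), which lets
    # _find_row_starting_with() locate a label by bisecting the raw bytes.
    #
    # Illustrative example of a single row (label and paths are made up):
    #
    #   //src/lib:foo <tab> obj/lib/foo.o <tab> obj/lib/libfoo.a <newline>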
    INDEX_MAGIC: bytes = b"\x08IDX"
| |
| def __init__(self): |
| super().__init__() |
| self._content = b"" |
| |
| def load_from_json(self, input_path: Path): |
| with input_path.open() as f: |
| input_json = json.load(f) |
| # NOTE: Optimized for speed, modify with caution! |
| content = "\n".join( |
| f"{label}\t" + "\t".join(paths) |
| for label, paths in sorted(input_json.items()) |
| ) |
| content += "\n" |
| self._content = content.encode("latin1") |
| |
| def load_from_file(self, input_path: Path): |
| with input_path.open("rb") as f: |
| magic = f.read(4) |
| if magic != self.INDEX_MAGIC: |
| return False |
| self._content = f.read() |
| return True |
| |
| def save_to_file(self, output_path: Path): |
| with output_path.open("wb") as f: |
| f.write(self.INDEX_MAGIC) |
| f.write(self._content) |
| |
| def gn_label_to_paths(self, label: str) -> List[str]: |
| paths = [] |
| line = self._find_row_starting_with(label.encode()) |
| if line: |
| items = line.split(b"\t") |
| assert len(items) >= 2, f"Invalid input file format, line: [{line}]" |
| paths = [p.decode() for p in items[1:]] |
| |
| return paths |
| |
| def path_to_gn_label(self, path: str) -> str: |
| lines = self._find_rows_containing_value(path.encode()) |
| for line in lines: |
| items = line.split(b"\t") |
| assert len(items) >= 2, f"Invalid input file format, line [{line}]" |
| return items[0].decode() |
| |
| return "" |
| |
| def get_target_labels_internal(self, target: str) -> List[str]: |
| labels = set() |
| for line in self._find_rows_containing_target(target.encode()): |
| items = line.split(b"\t") |
| for item in items[1:]: |
| if item.decode().rpartition("/")[2] == target: |
| labels.add(items[0].decode()) |
| break |
| |
| return sorted(labels) |
| |
| def get_labels_internal(self) -> List[str]: |
| result = [] |
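        # 9 is the byte value of "\t"; per the note in
        # _find_row_starting_with(), passing an integer to find() is faster
        # in practice than passing a one-byte pattern like b"\t".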
| for row in self._parse_rows(): |
| tab_pos = row.find(9) |
| result.append(row[0:tab_pos].decode()) |
| return result |
| |
| def get_paths_internal(self) -> List[str]: |
| result = [] |
| content = self._content |
| for row_start, row_end in self._parse_row_bounds(): |
| tab_pos = content.find(9, row_start, row_end) |
| pos = tab_pos + 1 |
| while pos < row_end: |
| tab_pos = content.find(9, pos, row_end) |
| if tab_pos < 0: |
| tab_pos = row_end |
| result.append(content[pos:tab_pos].decode()) |
| pos = tab_pos + 1 |
| return result |
| |
| def _parse_rows(self): |
| content = self._content |
| content_len = len(content) |
| line_start = 0 |
| while line_start < content_len: |
| line_end = content.find(10, line_start) |
| assert line_end > line_start, "Malformed tabular file" |
| yield content[line_start:line_end] |
| line_start = line_end + 1 |
| |
| def _parse_row_bounds(self): |
| content = self._content |
| content_len = len(content) |
| line_start = 0 |
| while line_start < content_len: |
| line_end = content.find(10, line_start) |
| assert line_end > line_start, "Malformed tabular file" |
| yield line_start, line_end |
| line_start = line_end + 1 |
| |
| def _find_row_starting_with(self, key: bytes) -> bytes: |
| """Find the row in a index input file with a given key. |
| |
| Args: |
| key: first item to look for in input rows. |
| |
| Returns: |
| On success, a byte string corresponding to the row, without the |
| newline terminator. On failure, an empty string. |
| """ |
| # Use local variable to avoid self -> _content dereference on each use. |
| # Also use integer values for find() as this is faster in practice than |
| # using b"\t" and b"\n". |
| content = self._content |
| if False: |
| # Linear scan. Keep this code here for debugging. |
| # Benchmarking shows that this is only 10%-20% slower than bisection |
| # on 40 MiB input files from Fuchsia build plans. |
| content_len = len(content) |
| content_pos = 0 |
            needle = key + b"\t"
| needle_len = len(needle) |
| while content_pos < content_len: |
| pos = content.find(needle, content_pos) |
| if pos < 0: |
| return b"" |
                if pos > 0 and content[pos - 1] != 10:
| # Mismatch |
| content_pos = pos + needle_len |
| continue |
| |
| line_start = pos |
| line_end = content.find(10, pos + needle_len, content_len) |
| if line_end < 0: |
| line_end = content_len |
| return content[line_start:line_end] |
| |
| else: |
| # Bisection, since the lines are sorted by key (i.e. first column). |
| # NOTE: This is faster than bisecting with a table of line start offsets. |
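            #
            # Invariant sketch: [lo, hi) always brackets the byte range that
            # may still contain the key. Each step picks the middle byte,
            # snaps it outward to the enclosing full line, compares that
            # line's first column to the key, and then discards the half of
            # the range that cannot contain it.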
| lo = 0 |
| hi = len(content) |
| while lo < hi: |
| mid = (lo + hi) // 2 |
| cur_line_start = content.rfind(10, lo, mid) |
| if cur_line_start < 0: |
| cur_line_start = lo |
| else: |
| cur_line_start += 1 |
| cur_line_end = content.find(10, cur_line_start, hi) |
| if cur_line_end < 0: |
| cur_line_end = hi |
| first_tab = content.find(9, cur_line_start, cur_line_end) |
| line_key = content[cur_line_start:first_tab] |
| if line_key == key: |
| return content[cur_line_start:cur_line_end] |
| elif line_key < key: |
| lo = cur_line_end + 1 |
| else: |
| hi = cur_line_start |
| |
| return b"" |
| |
| def _find_rows_containing_value(self, value: bytes) -> List[bytes]: |
| """Find all rows in a index file which contain a given value. |
| |
| Args: |
| value: value to look for in each row, past the first key of each row. |
| |
| Returns: |
| A list of strings, one per matching row, without any newline |
| terminators. On failure, an empty list is returned. |
| """ |
| # Use local variable for performance. |
| content = self._content |
| result = [] |
| value_len = len(value) |
| content_len = len(content) |
| content_pos = 0 |
| while content_pos < content_len: |
| pos = content.find(value, content_pos) |
| if pos < 0: |
| break |
| |
| # Found it, check boundaries. |
| if ( |
| pos == 0 |
| or content[pos - 1] != 9 |
| or content[pos + value_len] not in (9, 10) |
| ): |
| # Mismatch. |
| content_pos = pos + value_len |
| continue |
| |
| # Find line start and end. |
| line_start = content.rfind(10, 0, pos) |
| if line_start < 0: |
| line_start = 0 |
| else: |
| line_start += 1 |
| line_end = content.find(10, pos, content_len) |
| if line_end < 0: |
| line_end = content_len |
| |
| result.append(content[line_start:line_end]) |
| content_pos = line_end |
| |
| return result |
| |
| def _find_rows_containing_target(self, target: bytes) -> List[bytes]: |
| content = self._content |
| result = [] |
| target_len = len(target) |
| content_len = len(content) |
| content_pos = 0 |
| while content_pos < content_len: |
| pos = content.find(target, content_pos) |
| if pos < 0: |
| break |
| |
| # Found it, check boundaries. |
| if ( |
| pos == 0 |
| or content[pos - 1] not in (9, ord("/")) |
| or content[pos + target_len] not in (9, 10) |
| ): |
| # Mismatch. |
| content_pos = pos + target_len |
| continue |
| |
| # Find line start and end. |
| line_start = content.rfind(10, 0, pos) |
| if line_start < 0: |
| line_start = 0 |
| else: |
| line_start += 1 |
| line_end = content.find(10, pos, content_len) |
| if line_end < 0: |
| line_end = content_len |
| |
| result.append(content[line_start:line_end]) |
| content_pos = line_end |
| |
| return result |
| |
| |
| _FORMAT_TO_CLASS_MAP = { |
| "json": NinjaOutputsJSON, |
| "tabular": NinjaOutputsTabular, |
| } |
| |
| _VALID_FORMATS = ["auto"] + list(_FORMAT_TO_CLASS_MAP.keys()) |
| |
| |
| def _autodetect_format(path: Path): |
| name = path.name |
| for f in _FORMAT_TO_CLASS_MAP.keys(): |
| if name.endswith("." + f): |
| return f |
| return None |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument("input_file", type=Path, help="Input JSON file.") |
| parser.add_argument( |
| "--format", |
| choices=_VALID_FORMATS, |
| default="auto", |
| help="Select database format.", |
| ) |
| |
| subparsers = parser.add_subparsers(title="subcommands", dest="cmd") |
| |
| generate_parser = subparsers.add_parser( |
| "generate", help="Generate database file" |
| ) |
| generate_parser.add_argument( |
| "output_file", type=Path, help="Output database file." |
| ) |
| generate_parser.add_argument( |
| "--output-format", |
| choices=_VALID_FORMATS, |
| default="auto", |
| help="Output database format.", |
| ) |
| |
| query_paths_parser = subparsers.add_parser( |
| "query_paths", help="Convert a GN label into a list of paths" |
| ) |
| query_paths_parser.add_argument( |
| "label", metavar="gn_label", help="GN label" |
| ) |
| |
| query_label_parser = subparsers.add_parser( |
| "query_label", help="Convert Ninja output path into GN label" |
| ) |
| query_label_parser.add_argument( |
| "path", metavar="ninja_path", help="Ninja path" |
| ) |
| |
    subparsers.add_parser(
        "list_labels", help="List all GN labels in the database."
    )

    subparsers.add_parser(
        "list_paths", help="List all Ninja paths in the database."
    )
| |
| closest_label_parser = subparsers.add_parser( |
| "closest_label", help="Find closest label matching input value." |
| ) |
| closest_label_parser.add_argument("label", help="Input GN label value.") |
    closest_label_parser.add_argument(
        "--max-distance",
        type=int,
        default=2,
        help="Maximum allowed edit distance (default 2).",
    )
| |
| closest_path_parser = subparsers.add_parser( |
| "closest_path", help="Find closest Ninja path matching input value." |
| ) |
| closest_path_parser.add_argument("path", help="Input Ninja path value.") |
    closest_path_parser.add_argument(
        "--max-distance",
        type=int,
        default=2,
        help="Maximum allowed edit distance (default 2).",
    )
| |
| args = parser.parse_args() |
| |
| if args.format == "auto": |
| args.format = _autodetect_format(args.input_file) |
| if not args.format: |
| parser.error( |
| "Could not determine format based on file extension, use --format=FORMAT." |
| ) |
| |
| if args.cmd == "generate": |
| if args.format != "json": |
| parser.error("Command only accepts json format as input!") |
| |
| if args.output_format == "auto": |
| args.output_format = _autodetect_format(args.output_file) |
| if not args.output_format: |
| parser.error( |
| "Could not determine output format based on file extension, use --output-format=FORMAT." |
| ) |
| |
| database = _FORMAT_TO_CLASS_MAP[args.output_format]() |
| database.load_from_json(args.input_file) |
| database.save_to_file(args.output_file) |
| return 0 |
| |
| # All other commands can read from an input database in any format. |
| input_class = _FORMAT_TO_CLASS_MAP[args.format] |
| database = input_class() |
| database.load_from_file(args.input_file) |
| |
| if args.cmd == "query_paths": |
| for path in database.gn_label_to_paths(args.label): |
| print(path) |
| return 0 |
| |
| if args.cmd == "query_label": |
| print(database.path_to_gn_label(args.path)) |
| return 0 |
| |
| if args.cmd == "list_labels": |
| for label in database.get_labels(): |
| print(label) |
| return 0 |
| |
| if args.cmd == "list_paths": |
| for path in database.get_paths(): |
| print(path) |
| return 0 |
| |
| if args.cmd == "closest_label": |
| label = closest_match( |
| args.label, |
| database.get_labels(sort_result=False), |
| args.max_distance, |
| ) |
| if label: |
| print(label) |
| return 0 |
| |
| if args.cmd == "closest_path": |
| path = closest_match( |
| args.path, database.get_paths(sort_result=False), args.max_distance |
| ) |
| if path: |
| print(path) |
| return 0 |
| |
| parser.error("Invalid command, see --help.") |
| return 1 |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |