blob: 4822f8be339e64c7e16b38b265450f9f1ad33cf0 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Process a GN --ninja-outputs-file=FILE into a faster database file."""
import argparse
import difflib
import marshal
import sqlite3
import json
import sys
from typing import Dict, List, Sequence
from pathlib import Path
def levenshtein_distance(str1: str, str2: str) -> int:
"""Compute the levenshtein distance between two strings."""
result = 0
counters = [0, 0]
for delta in difflib.ndiff(str1, str2):
op = delta[0]
if op == " ":
result += max(counters)
counters = [0, 0]
else:
counters[1 if op == "+" else 0] += 1
result += max(counters)
return result
def closest_match(
input: str, references: Sequence[str], max_distance: int
) -> str:
"""Find the closest match between an input string and a sequence of references.
Args:
input: Input string
references: Sequence of reference strings to compare to.
max_distance: Max levenshtein distance allowed in result.
Returns:
Closest match, or an empty string if there is no matching result.
"""
result = ""
best_score = max_distance + 1
for ref in references:
if abs(len(ref) - len(input)) >= best_scope:
continue
score = levenshtein_distance(ref, input)
if score < best_score:
best_score = score
result = ref
if score == 0:
break
return result
class NinjaOutputsBase(object):
"""Base class for all implementations."""
def load_from_json(self, input_path: Path):
"""Load original ninja_inputs.json file into in-memory database.
Args:
input_path: Path to ninja_inputs.json file generated by GN.
"""
raise NotImplementedError
def load_from_file(self, input_path: Path):
"""Load the database saved to a file through a previous save_to_file() call."""
raise NotImplementedError
def save_to_file(self, output_path: Path):
"""Save database to a given file."""
raise NotImplementedError
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
return False
def get_labels(self, sort_result=True) -> List[str]:
"""Return the list of all GN labels in the database."""
result = self.get_labels_internal()
return sorted(result) if sort_result else result
def get_paths(self, sort_result=True) -> List[str]:
"""Return the list of all Ninja output paths in the database."""
result = self.get_paths_internal()
return sorted(result) if sort_result else result
def gn_label_to_paths(self, label: str) -> List[str]:
"""Return the list of output paths for a given GN label.
Args:
label: GN label, as it appears in the ninja_outputs.json file.
Returns:
A list of Ninja output file paths, relative to the build
directory, as they appear in the ninja_outputs.json file.
An empty list means the label is unknown.
"""
return []
def path_to_gn_label(self, path: str) -> str:
"""Return the GN label whose target generates a given Ninja output path.
Args:
path: Ninja target path, as it appears in ninja_outputs.json.
Must be relative to the Ninja build directory.
Returns:
A GN label string, as it appears in the ninja_outputs.json file,
or an empty string if the path is unknown.
"""
return ""
def get_labels_internal(self) -> List[str]:
return []
def get_paths_internal(self) -> List[str]:
return []
class NinjaOutputsJSON(NinjaOutputsBase):
"""A NinjaOutputsBase implementation that uses an un-prettified JSON file.
Profiling shows that queries are about the same speed as using the original ninja_outputs.json
file, so this does not save time.
The generated file is about 45 MiB for a 47 MiB input file.
"""
def __init__(self):
super().__init__()
self._label_to_paths: Dict[str, List[str]] = {}
self._path_to_label: Dict[str, str] = {}
def load_from_json(self, input_path: Path):
with input_path.open() as f:
self._label_to_paths = json.load(f)
self._compute_path_map()
def save_to_file(self, output_path: Path):
with output_path.open("w") as f:
json.dump(self._label_to_paths, f)
def load_from_file(self, input_path: Path):
with input_path.open() as f:
self._label_to_paths = json.load(f)
self._compute_path_map()
def _compute_path_map(self):
self._path_to_label = {}
for label, paths in self._label_to_paths.items():
for path in paths:
self._path_to_label[path] = label
def get_labels_internal(self) -> List[str]:
return self._label_to_paths.keys()
def get_paths_internal(self) -> List[str]:
return self._path_to_label.keys()
def gn_label_to_paths(self, label: str) -> List[str]:
return self._label_to_paths.get(label, [])
def path_to_gn_label(self, path: str) -> str:
return self._path_to_label.get(path, "")
class NinjaOutputsMarshal(NinjaOutputsBase):
"""A NinjaOutputsBase implementation that uses marshalled Python data structures.
Profiling shows that queries are about 2x slower compared to NinjaOutputsJSON due
to the cost of unmarshaling the data structures at load time.
"""
def __init__(self):
super().__init__()
self._label_to_paths: Dict[str, List[str]] = {}
self._path_to_label: Dict[str, str] = {}
def load_from_json(self, input_path: Path):
with input_path.open() as f:
self._label_to_paths = json.load(f)
self._path_to_label = {}
for label, paths in self._label_to_paths.items():
for path in paths:
self._path_to_label[path] = label
def save_to_file(self, output_path: Path):
with output_path.open("wb") as f:
marshal.dump(self._label_to_paths, f)
marshal.dump(self._path_to_label, f)
def load_from_file(self, input_path: Path):
with input_path.open("rb") as f:
self._label_to_paths = marshal.load(f)
self._path_to_label = marshal.load(f)
def get_labels_internal(self) -> List[str]:
return self._label_to_paths.keys()
def get_paths_internal(self) -> List[str]:
return self._path_to_label.keys()
def gn_label_to_paths(self, label: str) -> List[str]:
return self._label_to_paths.get(label, [])
def path_to_gn_label(self, path: str) -> str:
return self._path_to_label.get(path, "")
class NinjaOutputsTabular(NinjaOutputsBase):
"""A NinjaOutputsBase implementation that uses a simple tabular text file.
Profiling shows that this is fast to generate, and that queries are
even slightly faster than the NinjaOutputsSqlite3 implementation.
The generated file is about 43 MiB for a 47 MiB input file.
"""
# Four bytes at the start of the file to recognize its format.
# This is followed by newline-separated rows, where each row follows
# the format of:
#
# <label> <tab> <path1> [<tab> <path2> ...] <newline>
#
# Where <label> is a GN label, and <path1>, <path2>, ... are corresponding output
# file paths. The final row always includes a trailing newline.
INDEX_MAGIC: str = b"\x08IDX"
def __init__(self):
super().__init__()
self._content = b""
def load_from_json(self, input_path: Path):
with input_path.open() as f:
input_json = json.load(f)
# NOTE: Optimized for speed, modify with caution!
content = "\n".join(
f"{label}\t" + "\t".join(paths)
for label, paths in sorted(input_json.items())
)
content += "\n"
self._content = content.encode("latin1")
def load_from_file(self, input_path: Path):
with input_path.open("rb") as f:
magic = f.read(4)
if magic != self.INDEX_MAGIC:
return False
self._content = f.read()
return True
def save_to_file(self, output_path: Path):
with output_path.open("wb") as f:
f.write(self.INDEX_MAGIC)
f.write(self._content)
def gn_label_to_paths(self, label: str) -> List[str]:
paths = []
line = self._find_row_starting_with(label.encode())
if line:
items = line.split(b"\t")
assert len(items) >= 2, f"Invalid input file format, line: [{line}]"
paths = [p.decode() for p in items[1:]]
return paths
def path_to_gn_label(self, path: str) -> str:
labels = []
lines = self._find_rows_containing_value(path.encode())
for line in lines:
items = line.split(b"\t")
assert len(items) >= 2, f"Invalid input file format, line [{line}]"
return items[0].decode()
return ""
def get_labels_internal(self) -> List[str]:
result = []
for row in self._parse_rows():
tab_pos = row.find(9)
result.append(row[0:tab_pos].decode())
return result
def get_paths_internal(self) -> List[str]:
result = []
content = self._content
for row_start, row_end in self._parse_row_bounds():
tab_pos = content.find(9, row_start, row_end)
pos = tab_pos + 1
while pos < row_end:
tab_pos = content.find(9, pos, row_end)
if tab_pos < 0:
tab_pos = row_end
result.append(content[pos:tab_pos].decode())
pos = tab_pos + 1
return result
def _parse_rows(self):
content = self._content
content_len = len(content)
line_start = 0
while line_start < content_len:
line_end = content.find(10, line_start)
assert line_end > line_start, "Malformed tabular file"
yield content[line_start:line_end]
line_start = line_end + 1
def _parse_row_bounds(self):
content = self._content
content_len = len(content)
line_start = 0
while line_start < content_len:
line_end = content.find(10, line_start)
assert line_end > line_start, "Malformed tabular file"
yield line_start, line_end
line_start = line_end + 1
def _find_row_starting_with(self, key: bytes) -> bytes:
"""Find the row in a index input file with a given key.
Args:
key: first item to look for in input rows.
Returns:
On success, a byte string corresponding to the row, without the
newline terminator. On failure, an empty string.
"""
# Use local variable to avoid self -> _content dereference on each use.
# Also use integer values for find() as this is faster in practice than
# using b"\t" and b"\n".
content = self._content
if False:
# Linear scan. Keep this code here for debugging.
# Benchmarking shows that this is only 10%-20% slower than bisection
# on 40 MiB input files from Fuchsia build plans.
content_len = len(content)
content_pos = 0
needle = key + 9
needle_len = len(needle)
while content_pos < content_len:
pos = content.find(needle, content_pos)
if pos < 0:
return b""
if pos > 0 and self._content[pos - 1] != 10:
# Mismatch
content_pos = pos + needle_len
continue
line_start = pos
line_end = content.find(10, pos + needle_len, content_len)
if line_end < 0:
line_end = content_len
return content[line_start:line_end]
else:
# Bisection, since the lines are sorted by key (i.e. first column).
# NOTE: This is faster than bisecting with a table of line start offsets.
lo = 0
hi = len(content)
while lo < hi:
mid = (lo + hi) // 2
cur_line_start = content.rfind(10, lo, mid)
if cur_line_start < 0:
cur_line_start = lo
else:
cur_line_start += 1
cur_line_end = content.find(10, cur_line_start, hi)
if cur_line_end < 0:
cur_line_end = hi
first_tab = content.find(9, cur_line_start, cur_line_end)
line_key = content[cur_line_start:first_tab]
if line_key == key:
return content[cur_line_start:cur_line_end]
elif line_key < key:
lo = cur_line_end + 1
else:
hi = cur_line_start
return b""
def _find_rows_containing_value(self, value: bytes) -> List[bytes]:
"""Find all rows in a index file which contain a given value.
Args:
value: value to look for in each row, past the first key of each row.
Returns:
A list of strings, one per matching row, without any newline
terminators. On failure, an empty list is returned.
"""
# Use local variable for performance.
content = self._content
result = []
value_len = len(value)
content_len = len(content)
content_pos = 0
while content_pos < content_len:
pos = content.find(value, content_pos)
if pos < 0:
break
# Found it, check boundaries.
if (
pos == 0
or content[pos - 1] != 9
or content[pos + value_len] not in (9, 10)
):
# Mismatch.
content_pos = pos + value_len
continue
# Find line start and end.
line_start = content.rfind(10, 0, pos)
if line_start < 0:
line_start = 0
else:
line_start += 1
line_end = content.find(10, pos, content_len)
if line_end < 0:
line_end = content_len
result.append(content[line_start:line_end])
content_pos = line_end
return result
class NinjaOutputsSqlite3(NinjaOutputsBase):
"""A NinjaOutputsBase implementation that uses an in-memory sqlite3 database.
Profiling that queries are slightly faster than the NinjaOutputsTabular implementation
(46.5ms versus 57.3ms). But generation takes a lot more time (4.7s vs 443ms).
over NinjaOutputsTabular.
The generated file is about 109 MiB for a 47 MiB input file.
"""
def __init__(self):
super().__init__()
self._db = None
def __exit__(self, exc_type, exc_value, exc_tb):
if self._db:
self._db.close()
return False
def load_from_json(self, input_path: Path):
with input_path.open() as f:
input_json = json.load(f)
self._db = sqlite3.connect(":memory:")
# Create the three tables + two indices.
c = self._db.cursor()
c.execute(
"CREATE TABLE labels (id integer PRIMARY KEY, label text NOT NULL)"
)
c.execute(
"CREATE TABLE paths (id integer PRIMARY KEY, path text NOT NULL)"
)
c.execute("CREATE TABLE outputs (label_id integer, path_id integer)")
# Populate the tables
for label, paths in input_json.items():
c.execute(f"INSERT INTO labels(label) VALUES('{label}')")
label_id = c.lastrowid
for path in paths:
c.execute(f'INSERT INTO paths(path) VALUES("{path}")')
path_id = c.lastrowid
c.execute(
f"INSERT INTO outputs(label_id, path_id) VALUES({label_id}, {path_id});"
)
# Create the indices after the tables are created, for better performance.
c.execute("CREATE UNIQUE INDEX idx_labels ON labels(label)")
c.execute("CREATE UNIQUE INDEX idx_paths ON paths(path)")
c.execute("CREATE INDEX idx_output_labels ON outputs(label_id)")
c.execute("CREATE UNIQUE INDEX idx_output_paths ON outputs(path_id)")
self._db.commit()
def load_from_file(self, input_path: Path):
if self._db:
self._db.close()
self._db = sqlite3.connect(input_path)
def save_to_file(self, output_path: Path):
assert (
self._db
), "No database, please call load_from_json() or load_from_file() first!"
output_path.parent.mkdir(parents=True, exist_ok=True)
if output_path.exists():
output_path.unlink()
out_db = sqlite3.connect(output_path)
self._db.backup(out_db)
out_db.close()
return True
def get_labels_internal(self) -> List[str]:
if not self._db:
return []
c = self._db.cursor()
c.execute("""SELECT l.label FROM labels l""")
return [x[0] for x in c.fetchall()]
def get_paths_internal(self) -> List[str]:
if not self._db:
return []
c = self._db.cursor()
c.execute("""SELECT p.path FROM paths p""")
return sorted(x[0] for x in c.fetchall())
def gn_label_to_paths(self, label: str) -> List[str]:
if not self._db:
return []
c = self._db.cursor()
c.execute(
"""SELECT p.path FROM paths p WHERE p.id IN (
SELECT o.path_id FROM outputs o WHERE o.label_id IN (
SELECT l.id FROM labels l WHERE l.label = ?))""",
(label,),
)
return [x[0] for x in c.fetchall()]
def path_to_gn_label(self, path: str) -> str:
if not self._db:
return ""
c = self._db.cursor()
c.execute(
"""SELECT l.label FROM labels l WHERE l.id IN (
SELECT o.label_id FROM outputs o WHERE o.path_id IN (
SELECT p.id FROM paths p WHERE p.path = ?))""",
(path,),
)
labels = c.fetchall()
if not labels:
return ""
return labels[0][0]
_FORMAT_TO_CLASS_MAP = {
"json": NinjaOutputsJSON,
"sqlite": NinjaOutputsSqlite3,
"marshal": NinjaOutputsMarshal,
"tabular": NinjaOutputsTabular,
}
_VALID_FORMATS = ["auto"] + list(_FORMAT_TO_CLASS_MAP.keys())
def _autodetect_format(path: Path):
name = path.name
for f in _FORMAT_TO_CLASS_MAP.keys():
if name.endswith("." + f):
return f
return None
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("input_file", type=Path, help="Input JSON file.")
parser.add_argument(
"--format",
choices=_VALID_FORMATS,
default="auto",
help="Select database format.",
)
subparsers = parser.add_subparsers(title="subcommands", dest="cmd")
generate_parser = subparsers.add_parser(
"generate", help="Generate database file"
)
generate_parser.add_argument(
"output_file", type=Path, help="Output database file."
)
generate_parser.add_argument(
"--output-format",
choices=_VALID_FORMATS,
default="auto",
help="Output database format.",
)
query_paths_parser = subparsers.add_parser(
"query_paths", help="Convert a GN label into a list of paths"
)
query_paths_parser.add_argument(
"label", metavar="gn_label", help="GN label"
)
query_label_parser = subparsers.add_parser(
"query_label", help="Convert Ninja output path into GN label"
)
query_label_parser.add_argument(
"path", metavar="ninja_path", help="Ninja path"
)
list_labels_parser = subparsers.add_parser(
"list_labels", help="List of GN labels in database."
)
list_paths_parser = subparsers.add_parser(
"list_paths", help="List all Ninja paths in database."
)
closest_label_parser = subparsers.add_parser(
"closest_label", help="Find closest label matching input value."
)
closest_label_parser.add_argument("label", help="Input GN label value.")
closest_label_parser.add_argument(
"--max-distance", default=2, help="Maximum distance (2)."
)
closest_path_parser = subparsers.add_parser(
"closest_path", help="Find closest Ninja path matching input value."
)
closest_path_parser.add_argument("path", help="Input Ninja path value.")
closest_path_parser.add_argument(
"--max-distance", default=2, help="Maximum distance (2)."
)
args = parser.parse_args()
if args.format == "auto":
args.format = _autodetect_format(args.input_file)
if not args.format:
parser.error(
"Could not determine format based on file extension, use --format=FORMAT."
)
if args.cmd == "generate":
if args.format != "json":
parser.error("Command only accepts json format as input!")
if args.output_format == "auto":
args.output_format = _autodetect_format(args.output_file)
if not args.output_format:
parser.error(
"Could not determine output format based on file extension, use --output-format=FORMAT."
)
database = _FORMAT_TO_CLASS_MAP[args.output_format]()
database.load_from_json(args.input_file)
database.save_to_file(args.output_file)
return 0
# All other commands can read from an input database in any format.
input_class = _FORMAT_TO_CLASS_MAP[args.format]
database = input_class()
database.load_from_file(args.input_file)
if args.cmd == "query_paths":
for path in database.gn_label_to_paths(args.label):
print(path)
return 0
if args.cmd == "query_label":
print(database.path_to_gn_label(args.path))
return 0
if args.cmd == "list_labels":
for label in database.get_labels():
print(label)
return 0
if args.cmd == "list_paths":
for path in database.get_paths():
print(path)
return 0
if args.cmd == "closest_label":
label = closest_match(
args.label,
database.get_labels(sort_result=False),
args.max_distance,
)
if label:
print(label)
return 0
if args.cmd == "closest_path":
path = closest_match(
args.path, database.get_paths(sort_result=False), args.max_distance
)
if path:
print(path)
return 0
parser.error("Invalid command, see --help.")
return 1
if __name__ == "__main__":
sys.exit(main())