blob: f737a2f0d8e2d9119e53fc02dfa506bb4e082e60 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
A hacky script to clean up recipe DEPS:
- Delete unused recipe and recipe module DEPS, and automatically add entries to
DEPS for potential dependencies that aren't yet in DEPS. Whether a module is
used is determined based on AST analysis.
- Ensure all deps are prefixed with a repo name (e.g. "fuchsia").
- Sort all DEPS lists alphabetically.
"""
# pylint: disable=unspecified-encoding
import argparse
import ast
import difflib
import functools
import json
import os
import re
import sys
import typing
def main():
    """Clean up DEPS in all recipe and recipe-module files under the cwd.

    In --check mode, prints diffs and exits non-zero if any file needs
    fixing; otherwise rewrites the offending files in place. Optionally
    writes a JSON list of bad files' relative paths to --json-output.
    """
    parser = argparse.ArgumentParser(description="Sort and filter recipe DEPS")
    parser.add_argument(
        "--check",
        action="store_true",
        help=(
            "Instead of writing changes, print diff to stdout and exit with retcode 1 "
            "if cleanup is needed."
        ),
    )
    parser.add_argument(
        "--json-output",
        type=str,
        help=(
            "Write a JSON list of relative paths of badly formatted files to this file."
        ),
    )
    args = parser.parse_args()
    cwd = os.getcwd()

    # Get the name of this recipes repo to append to any DEPS that don't
    # specify a repo.
    with open(os.path.join(cwd, "infra", "config", "recipes.cfg")) as f:
        cfg = json.load(f)
    repo_name = cfg["repo_name"]

    files_with_deps = []

    # Collect a mapping of upstream repo to list of module names, so we can
    # infer the source repo for `api.foo` references that don't yet have an
    # entry in DEPS.
    upstream_modules = {}
    recipe_deps_dir = os.path.join(cwd, ".recipe_deps")
    if os.path.isdir(recipe_deps_dir):
        for upstream_repo in os.listdir(recipe_deps_dir):
            repo_dir = os.path.join(recipe_deps_dir, upstream_repo)
            config_path = os.path.join(repo_dir, "infra", "config", "recipes.cfg")
            if not os.path.exists(config_path):
                continue
            modules_path = os.path.join(repo_dir, "recipe_modules")
            # Not every upstream recipes repo exposes recipe modules;
            # listing a nonexistent directory would raise.
            if not os.path.isdir(modules_path):
                continue
            for module in os.listdir(modules_path):
                if os.path.exists(os.path.join(modules_path, module, "api.py")):
                    upstream_modules.setdefault(upstream_repo, set()).add(module)

    recipes_dir = os.path.join(cwd, "recipes")
    modules_dir = os.path.join(cwd, "recipe_modules")
    for directory in [recipes_dir, modules_dir]:
        for subdir, _, files in os.walk(directory):
            for relpath in files:
                _, ext = os.path.splitext(relpath)
                if ext != ".py":
                    continue
                path = os.path.join(subdir, relpath)
                # __init__.py files are handled separately, since they contain
                # DEPS entries but usages of those deps may be in other files
                # in the directory.
                if relpath == "__init__.py":
                    continue
                files_with_deps.append(analyze_recipe(path, repo_name))

    for relpath in os.listdir(modules_dir):
        path = os.path.join(modules_dir, relpath)
        if os.path.exists(os.path.join(path, "api.py")):
            files_with_deps.append(analyze_module(path, repo_name))

    # Now that all upstream modules are known, let each file resolve
    # repo-less `api.foo` references against them.
    for f in files_with_deps:
        f.upstream_modules = upstream_modules

    files_to_fix = [f for f in files_with_deps if f.diff]

    if args.check and files_to_fix and not args.json_output:
        # Print the how-to-fix banner once, not once per bad file.
        script_relpath = os.path.relpath(__file__, cwd)
        print(
            f"Some recipe files' DEPS are malformatted.\nRun ./{script_relpath} to fix:\n"
        )
    for f in files_to_fix:
        if args.check:
            if not args.json_output:
                print(f.diff)
        else:
            f.rewrite()

    if args.json_output:
        j = [os.path.relpath(f.path, cwd) for f in files_to_fix]
        if args.json_output == "-":
            print(json.dumps(j))
        else:
            with open(args.json_output, "w") as outfile:
                json.dump(j, outfile)

    if args.check and files_to_fix:
        sys.exit(os.EX_DATAERR)
class FileWithDEPS:
    """A recipe-related Python file along with its parsed DEPS entries."""

    def __init__(self, path, repo_name):
        """Read the file and its DEPS.

        Args:
            path (str): Absolute path to the file to parse.
            repo_name (str): Name of the current recipes repo, used to
                qualify any DEPS entries that don't specify a repo.
        """
        self.path = path
        self._repo_name = repo_name
        # Mapping of upstream repo name -> set of module names it provides.
        # To be updated by the caller after static analysis.
        self.upstream_modules = {}
        # Module names referenced by the file's code. To be updated by the
        # caller after static analysis.
        self.used_deps = set()
        # A mapping from full dep name (e.g. "fuchsia/foo") to the lines
        # that correspond to that dep - the actual import line that names the
        # dep, along with any comments preceding the import line. All lines
        # include any trailing newline.
        self.original_deps = {}
        # The original lines of the file, including trailing newlines.
        self.original_lines = []
        # The line number of the first dep (-1 if no DEPS list was found).
        self._deps_start_line = -1
        # The line number of the line after the last dep (-1 if not found).
        self._deps_end_line = -1

        with open(self.path) as f:
            self.original_lines = f.readlines()

        single_quote_count = double_quote_count = 0
        # The lines associated with the dep currently being parsed, including
        # any comment lines preceding the dep.
        current_dep_lines = []
        for i, original_line in enumerate(self.original_lines):
            line = original_line.strip()
            if line == "DEPS = [":
                self._deps_start_line = i + 1
                continue
            elif self._deps_start_line == -1:
                # Haven't reached the DEPS list yet.
                continue
            elif line == "]":
                self._deps_end_line = i
                break
            if line.startswith("#"):
                current_dep_lines.append(original_line)
                continue
            # Match a quoted dep name with an optional trailing comment. The
            # closing quote must match the opening one. (The original char
            # class ['|"] also treated a literal pipe as a quote character.)
            match = re.search(
                r'(?P<quote>[\'"])(?P<dep>\S+)(?P=quote),?\s*(?P<comment>\#.*)?$',
                line,
            )
            if not match:
                current_dep_lines.append(original_line)
                continue
            # Track the dominant quote style so new entries match it.
            if match.group("quote") == "'":
                single_quote_count += 1
            elif match.group("quote") == '"':
                double_quote_count += 1
            dep = match.group("dep")
            dep_line = original_line
            # Prepend the recipe repo name (assumed to be repo_name) if it's
            # missing.
            if "/" not in dep:
                old_dep = dep
                dep = f"{self._repo_name}/{old_dep}"
                dep_line = dep_line.replace(old_dep, dep, 1)
            current_dep_lines.append(dep_line)
            self.original_deps[dep] = "".join(current_dep_lines)
            current_dep_lines = []

        self._quote = '"'
        if single_quote_count > double_quote_count:
            self._quote = "'"

    # cached_property instead of @property + @lru_cache: lru_cache on a
    # method keys on `self` and keeps every instance alive for the cache's
    # lifetime.
    @functools.cached_property
    def new_lines(self):
        """Returns a list of the lines of the file with formatting applied."""
        # If no well-formed DEPS list was found there's nothing to rewrite;
        # slicing with -1 sentinels below would corrupt the file.
        if self._deps_start_line == -1 or self._deps_end_line == -1:
            return self.original_lines[:]
        new_deps = {}
        used_deps = self.used_deps.copy()
        # Keep only the declared deps that are actually referenced, whether
        # by full name ("fuchsia/foo") or bare module name ("foo").
        for dep, lines in self.original_deps.items():
            dep_basename = dep.split("/")[-1]
            if dep in used_deps or dep_basename in used_deps:
                new_deps[dep] = lines
                used_deps.difference_update({dep, dep_basename})
        # For any module reference that *doesn't* correspond to a declared dep,
        # add it to DEPS. This isn't absolutely necessary, but it's a nice
        # feature to automate the process of populating DEPS.
        for dep in used_deps:
            full_dep = dep
            if "/" not in dep:
                full_dep = f"{self._repo_name}/{dep}"
                # Prefer an upstream repo if one provides this module.
                for upstream_repo, mods in self.upstream_modules.items():
                    if dep in mods:
                        full_dep = f"{upstream_repo}/{dep}"
                        break
            # NOTE(review): inserted deps get a single-space indent here —
            # confirm this matches the repo's DEPS list style.
            new_deps[full_dep] = f" {self._quote}{full_dep}{self._quote},\n"
        sorted_dep_lines = [text for _, text in sorted(new_deps.items())]
        new_lines = self.original_lines[:]
        new_lines[self._deps_start_line : self._deps_end_line] = sorted_dep_lines
        return new_lines

    @functools.cached_property
    def diff(self):
        """Returns a git-style rendering of the diff that would be produced."""
        return "".join(
            difflib.unified_diff(
                self.original_lines, self.new_lines, "a" + self.path, "b" + self.path
            )
        ).rstrip()

    def rewrite(self):
        """Write the cleaned-up contents back to disk."""
        print(f"rewriting {os.path.relpath(self.path, os.getcwd())}")
        with open(self.path, "w") as f:
            f.writelines(self.new_lines)
def analyze_recipe(path, repo_name):
    """Check for unused DEPS in a recipe file.

    Args:
        path (str): The absolute path to the recipe Python file.
        repo_name (str): The name of the current recipes repo.

    Returns:
        A FileWithDEPS whose `used_deps` reflects the modules the recipe's
        source references.
    """
    result = FileWithDEPS(path, repo_name)
    # A file declaring no deps probably isn't actually a recipe, so don't
    # bother analyzing its source.
    if result.original_deps:
        with open(path) as f:
            source = f.read()
        result.used_deps.update(find_usages(ast.parse(source)))
    return result
def analyze_module(module_dir, repo_name):
    """Check for unused DEPS in a recipe module's __init__.py file.

    Args:
        module_dir (str): The absolute path to the root of the recipe module.
        repo_name (str): The name of the current recipes repo.

    Returns:
        A FileWithDEPS for the module's __init__.py whose `used_deps`
        reflects modules referenced anywhere in the module's own code.

    Raises:
        Exception: If the module has no __init__.py file.
    """
    # AST-analyze each of this module's Python files to determine which
    # dependencies the module uses. (Previously described as a regex search;
    # the analysis is actually done on the parsed AST via find_usages().)
    init_path = os.path.join(module_dir, "__init__.py")
    module_name = os.path.basename(module_dir)
    if not os.path.exists(init_path):
        raise Exception(f"recipe module {module_name} has no __init__.py file")
    init_file = FileWithDEPS(init_path, repo_name)
    for subdir, subdirs, files in os.walk(module_dir, topdown=True):
        # The "examples" directory contains standalone recipes that don't
        # relate to the recipe module's DEPS and shouldn't be taken into
        # account when computing the recipe module's unused DEPS. Likewise,
        # the "resources" directory contains standalone scripts that don't
        # use recipe DEPS at all.
        if subdir == module_dir:
            # Prune in place so os.walk() doesn't enter these directories.
            subdirs[:] = [
                d for d in subdirs if d not in ("examples", "tests", "resources")
            ]
        for relpath in files:
            _, ext = os.path.splitext(relpath)
            if ext != ".py":
                continue
            path = os.path.join(subdir, relpath)
            with open(path) as f:
                tree = ast.parse(f.read())
            usages = set(find_usages(tree))
            # References to the module itself (e.g. from its own api.py)
            # don't count as dependencies.
            usages.difference_update({module_name, f"{repo_name}/{module_name}"})
            init_file.used_deps.update(usages)
    return init_file
# RecipeApi attributes that should be ignored because they are methods
# provided by the recipe engine internals, rather than imported modules.
_IGNORE_ATTRS = (
    "test",
    "step_data",
    "post_process",
    "resource",
    "expect_exception",
    "override_step_data",
)

# Potential references to a RecipeApi object that dependencies are attached to.
# TODO(olivernewman): Using `ast.dump()` is a hacky way to check for AST
# equality. Use `ast.unparse()` instead once Python 3.9 is everywhere.
_API_REFS = [
    ast.dump(ast.parse(r).body[0].value)
    for r in ("self.api", "self._api", "self.m", "api")
]


class FindDepsVisitor(ast.NodeTransformer):
    """Visit all the nodes in the tree looking for recipe dep usages."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Dep names found so far, in visitation order; may contain
        # duplicates and both "foo" and "repo/foo" forms.
        self._deps: typing.List[str] = []

    def accept(self, node: ast.AST):
        """Visit `node` and return the list of dep usages found."""
        self._deps = []
        self.visit(node)
        return self._deps

    def _is_type_checking(self, node: ast.AST):
        """Returns whether `node` is a `TYPE_CHECKING` reference."""
        if isinstance(node, ast.Name):
            return node.id == "TYPE_CHECKING"
        if isinstance(node, ast.Attribute):
            if isinstance(node.value, ast.Name):
                return node.value.id == "typing" and node.attr == "TYPE_CHECKING"
        return False

    def visit_If(self, node: ast.If) -> ast.AST:
        """Delete the body from the tree if the condition is "TYPE_CHECKING".

        The "typing.TYPE_CHECKING" constant is always False at runtime (but static
        analyzers may make it True). If it's always False at runtime, then any nested
        nodes that would otherwise be interpreted as deps aren't really dependencies
        that the recipe engine or recipe testing needs to know about, so they can be
        ignored. Do that by removing the body of the if statement before the visitor
        recurses.
        """
        if self._is_type_checking(node.test):
            # `If.body` must be a *list* of statements; assigning a bare
            # node would produce an invalid AST.
            node.body = [ast.Pass()]
        return self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom) -> ast.AST:
        """Find explicit RECIPE_MODULES imports."""
        if node.module:
            parts = node.module.split(".")
            # Require both a repo and a module segment; a bare
            # `from RECIPE_MODULES import x` would otherwise raise IndexError.
            if parts[0] == "RECIPE_MODULES" and len(parts) >= 3:
                self._deps.append(f"{parts[1]}/{parts[2]}")
        return self.generic_visit(node)

    def visit_Attribute(self, node: ast.Attribute) -> ast.AST:
        """Find deps that look like "api.file" or "self.m.file"."""
        if (
            ast.dump(node.value) in _API_REFS
            and node.attr not in _IGNORE_ATTRS
            # Recipe module names are lowercase-with-underscores.
            and re.match(r"[a-z][_a-z]*", node.attr)
        ):
            self._deps.append(node.attr)
        return self.generic_visit(node)
def find_usages(tree: ast.AST) -> typing.List[str]:
    """Given an AST of a Python file, find all recipe dep usages.

    Scans the AST for references like `api.foo.func()`, `self.m.foo.func()`, and
    `from RECIPE_MODULES.repo.foo.api import bar`.

    Returns a sequence of module names. Modules accessed via `import
    RECIPE_MODULES.repo.foo` imports will be of the form "repo/foo" because the
    repository can be resolved, whereas modules accessed via `api.foo` will
    correspond to returned values of the form "foo", since the source module
    can't be determined definitively.
    """
    return FindDepsVisitor().accept(tree)
# Standard script entry-point guard so importing this module has no side
# effects.
if __name__ == "__main__":
    main()