blob: fbdf2389dbeb39145c414ea108809ad7c317656b [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
A hacky script to clean up recipe DEPS:
- Delete unused recipe and recipe module DEPS, and automatically add entries to
DEPS for potential dependencies that aren't yet in DEPS. Whether a module is
used is determined based on AST analysis.
- Ensure all deps are prefixed with a repo name (e.g. "fuchsia").
- Sort all DEPS lists alphabetically.
"""
# pylint: disable=unspecified-encoding
import argparse
import ast
import difflib
import functools
import json
import os
import re
import sys
def _collect_upstream_modules(recipe_deps_dir):
    """Map each upstream recipe repo to the set of modules it provides.

    Args:
        recipe_deps_dir (str): Path to the `.recipe_deps` checkout directory.
            It may not exist, in which case the mapping is empty.

    Returns:
        dict: upstream repo directory name -> set of module names, where a
        module is any `recipe_modules/<name>` directory containing `api.py`.
    """
    upstream_modules = {}
    if not os.path.isdir(recipe_deps_dir):
        return upstream_modules
    # Sorted so that the repo chosen for a module name that exists in several
    # upstream repos does not depend on filesystem listing order.
    for upstream_repo in sorted(os.listdir(recipe_deps_dir)):
        repo_dir = os.path.join(recipe_deps_dir, upstream_repo)
        config_path = os.path.join(repo_dir, "infra", "config", "recipes.cfg")
        if not os.path.exists(config_path):
            continue
        upstream_modules_dir = os.path.join(repo_dir, "recipe_modules")
        # A repo can have a recipes.cfg without shipping any modules.
        if not os.path.isdir(upstream_modules_dir):
            continue
        for module in sorted(os.listdir(upstream_modules_dir)):
            if os.path.exists(os.path.join(upstream_modules_dir, module, "api.py")):
                upstream_modules.setdefault(upstream_repo, set()).add(module)
    return upstream_modules


def main():
    """Clean up (or, with --check, just report on) the DEPS of every recipe
    file and recipe module under the current working directory.

    Reads the repo name from `infra/config/recipes.cfg`, analyzes all `.py`
    files under `recipes/` and `recipe_modules/`, then either rewrites the
    files whose DEPS need changes or, with `--check`, prints the diffs and
    exits with status 1 if any file needs changes. With `--json-output`, the
    relative paths of the files needing changes are written as a JSON list.
    """
    parser = argparse.ArgumentParser(description="Sort and filter recipe DEPS")
    parser.add_argument(
        "--check",
        action="store_true",
        help=(
            "Instead of writing changes, print diff to stdout and exit with retcode 1 "
            "if cleanup is needed."
        ),
    )
    parser.add_argument(
        "--json-output",
        type=str,
        help=(
            "Write a JSON list of relative paths of badly formatted files to this file."
        ),
    )
    args = parser.parse_args()

    cwd = os.getcwd()

    # Get the name of this recipes repo to prepend to any DEPS that don't
    # specify a repo.
    with open(os.path.join(cwd, "infra", "config", "recipes.cfg")) as f:
        cfg = json.load(f)
    repo_name = cfg["repo_name"]

    # Collect a mapping of upstream repo to set of module names, so we can
    # infer the source repo for `api.foo` references that don't yet have an
    # entry in DEPS.
    upstream_modules = _collect_upstream_modules(os.path.join(cwd, ".recipe_deps"))

    files_with_deps = []
    recipes_dir = os.path.join(cwd, "recipes")
    modules_dir = os.path.join(cwd, "recipe_modules")
    for directory in [recipes_dir, modules_dir]:
        for subdir, _, files in os.walk(directory):
            for relpath in files:
                _, ext = os.path.splitext(relpath)
                if ext != ".py":
                    continue
                # __init__.py files are handled separately, since they contain
                # DEPS entries but usages of those deps may be in other files
                # in the directory.
                if relpath == "__init__.py":
                    continue
                path = os.path.join(subdir, relpath)
                files_with_deps.append(analyze_recipe(path, repo_name))

    # A repo without any recipe modules has no recipe_modules directory.
    if os.path.isdir(modules_dir):
        for relpath in os.listdir(modules_dir):
            path = os.path.join(modules_dir, relpath)
            if os.path.exists(os.path.join(path, "api.py")):
                files_with_deps.append(analyze_module(path, repo_name))

    # Must be set before `diff` is read: the diff is computed once and cached.
    for f in files_with_deps:
        f.upstream_modules = upstream_modules

    files_to_fix = [f for f in files_with_deps if f.diff]
    for f in files_to_fix:
        if args.check:
            script_relpath = os.path.relpath(__file__, cwd)
            print(
                "Some recipe files' DEPS are malformatted.\n"
                "Run ./%s to fix:\n" % script_relpath
            )
            print(f.diff)
        else:
            f.rewrite()

    if args.json_output:
        with open(args.json_output, "w") as outfile:
            json.dump([os.path.relpath(f.path, cwd) for f in files_to_fix], outfile)

    if args.check and files_to_fix:
        sys.exit(1)
class FileWithDEPS:
    """A Python file together with the parsed contents of its `DEPS = [...]`
    list.

    The constructor reads the file and parses its DEPS. The caller then fills
    in `used_deps` (and optionally `upstream_modules`) before reading
    `new_lines` or `diff`, both of which are computed once and cached.
    """

    # Matches one quoted DEPS entry, optionally followed by a comma and a
    # trailing comment. The closing quote must match the opening one.
    _DEP_RE = re.compile(
        r'(?P<quote>[\'"])(?P<dep>\S+)(?P=quote),?\s*(?P<comment>\#.*)?$'
    )

    def __init__(self, path, repo_name):
        """Read the file and its DEPS.

        Args:
            path (str): Path of the file to read.
            repo_name (str): Repo name prepended to any dep that has no
                "repo/" prefix.
        """
        self.path = path
        self._repo_name = repo_name
        self.upstream_modules = {}
        # To be updated by the caller after static analysis.
        self.used_deps = set()
        # A mapping from full dep name (e.g. "fuchsia/foo") to the text of the
        # lines that correspond to that dep - the actual import line that
        # names the dep, along with any comments preceding the import line.
        # All lines include any trailing newline.
        self.original_deps = {}
        # The original lines of the file, including trailing newlines.
        self.original_lines = []
        # The line number of the first dep, or -1 if no DEPS list was found.
        self._deps_start_line = -1
        # The line number of the line after the last dep, or -1 if the closing
        # bracket was not found.
        self._deps_end_line = -1

        with open(self.path) as f:
            self.original_lines = f.readlines()

        single_quote_count = double_quote_count = 0
        # The lines associated with the dep currently being parsed, including
        # any comment lines preceding the dep.
        current_dep_lines = []
        for i, original_line in enumerate(self.original_lines):
            line = original_line.strip()
            if line == "DEPS = [":
                self._deps_start_line = i + 1
                continue
            if self._deps_start_line == -1:
                continue
            if line == "]":
                self._deps_end_line = i
                break

            if line.startswith("#"):
                current_dep_lines.append(original_line)
                continue

            match = self._DEP_RE.search(line)
            if not match:
                current_dep_lines.append(original_line)
                continue

            if match.group("quote") == "'":
                single_quote_count += 1
            else:
                double_quote_count += 1

            dep = match.group("dep")
            dep_line = original_line
            # Prepend the recipe repo name (assumed to be repo_name) if it's
            # missing.
            if "/" not in dep:
                old_dep = dep
                dep = "%s/%s" % (self._repo_name, old_dep)
                dep_line = dep_line.replace(old_dep, dep, 1)
            current_dep_lines.append(dep_line)
            self.original_deps[dep] = "".join(current_dep_lines)
            current_dep_lines = []

        # New entries use whichever quote style is more common in the file,
        # with double quotes winning ties.
        self._quote = '"'
        if single_quote_count > double_quote_count:
            self._quote = "'"

    @functools.cached_property
    def new_lines(self):
        """Returns a list of the lines of the file with formatting applied.

        If the file has no complete `DEPS = [` ... `]` block, the original
        lines are returned unchanged; there is nowhere to put DEPS entries.
        """
        if self._deps_start_line == -1 or self._deps_end_line == -1:
            return self.original_lines[:]

        new_deps = {}
        used_deps = self.used_deps.copy()
        for dep, lines in self.original_deps.items():
            dep_basename = dep.split("/")[-1]
            if dep in used_deps or dep_basename in used_deps:
                new_deps[dep] = lines
                used_deps.difference_update({dep, dep_basename})

        # Indent added entries like the existing ones so they line up; fall
        # back to a fixed indent when the DEPS list has no entries yet.
        indent = " "
        for text in self.original_deps.values():
            entry = text.splitlines()[-1]
            indent = entry[: len(entry) - len(entry.lstrip())]
            break

        # For any module reference that *doesn't* correspond to a declared dep,
        # add it to DEPS. This isn't absolutely necessary, but it's a nice
        # feature to automate the process of populating DEPS.
        for dep in used_deps:
            full_dep = dep
            if "/" not in dep:
                # Default to this repo unless an upstream repo provides a
                # module with this name.
                full_dep = f"{self._repo_name}/{dep}"
                for upstream_repo, mods in self.upstream_modules.items():
                    if dep in mods:
                        full_dep = f"{upstream_repo}/{dep}"
                        break
            new_deps[full_dep] = f"{indent}{self._quote}{full_dep}{self._quote},\n"

        sorted_dep_lines = [text for _, text in sorted(new_deps.items())]
        new_lines = self.original_lines[:]
        new_lines[self._deps_start_line : self._deps_end_line] = sorted_dep_lines
        return new_lines

    @functools.cached_property
    def diff(self):
        """Returns a git-style rendering of the diff that would be produced.

        The result is an empty string when the file needs no changes.
        """
        return "".join(
            difflib.unified_diff(
                self.original_lines, self.new_lines, "a" + self.path, "b" + self.path
            )
        ).rstrip()

    def rewrite(self):
        """Overwrite the file on disk with `new_lines`."""
        print("rewriting %s" % os.path.relpath(self.path, os.getcwd()))
        with open(self.path, "w") as f:
            f.writelines(self.new_lines)
def analyze_recipe(path, repo_name):
    """Load a recipe file and record which recipe modules it references.

    Args:
        path (str): The absolute path to the recipe Python file.
        repo_name (str): The name of the current recipes repo.

    Returns:
        FileWithDEPS: The parsed file. Its `used_deps` is populated only if
        the file declares at least one dep.
    """
    analyzed = FileWithDEPS(path, repo_name)
    # A file with no declared deps may not be a recipe at all, so usage
    # analysis only runs when some deps were found.
    if analyzed.original_deps:
        with open(path) as source:
            parsed = ast.parse(source.read())
        analyzed.used_deps.update(find_usages(parsed))
    return analyzed
def analyze_module(module_dir, repo_name):
    """Load a recipe module's __init__.py and record the deps the module uses.

    Every `.py` file in the module (outside the skipped top-level
    subdirectories) is parsed, and the modules it references are added to the
    `used_deps` of the module's `__init__.py`.

    Args:
        module_dir (str): The absolute path to the root of the recipe module.
        repo_name (str): The name of the current recipes repo.

    Returns:
        FileWithDEPS: The module's `__init__.py` with `used_deps` populated.

    Raises:
        Exception: If the module directory has no `__init__.py`.
    """
    module_name = os.path.basename(module_dir)
    init_path = os.path.join(module_dir, "__init__.py")
    if not os.path.exists(init_path):
        raise Exception("recipe module %s has no __init__.py file" % module_name)

    init_file = FileWithDEPS(init_path, repo_name)
    # A module referring to itself does not count as a dependency.
    own_names = {module_name, f"{repo_name}/{module_name}"}
    # The "examples" directory contains standalone recipes whose usages don't
    # relate to the recipe module's DEPS, and the "resources" directory
    # contains standalone scripts that don't use recipe DEPS at all. "tests"
    # is skipped in the same way.
    skipped_subdirs = ("examples", "tests", "resources")

    for current_dir, child_dirs, filenames in os.walk(module_dir, topdown=True):
        if current_dir == module_dir:
            # Pruning the list in place tells os.walk() not to descend into
            # the skipped subdirectories.
            child_dirs[:] = [d for d in child_dirs if d not in skipped_subdirs]
        for filename in filenames:
            if os.path.splitext(filename)[1] != ".py":
                continue
            with open(os.path.join(current_dir, filename)) as source:
                parsed = ast.parse(source.read())
            init_file.used_deps.update(set(find_usages(parsed)) - own_names)
    return init_file
def find_usages(tree: ast.AST):
    """Given an AST of a Python file, find all recipe dep usages.

    Scans the AST for references like `api.foo.func()`, `self.m.foo.func()`,
    `self._api.foo.func()` and `from RECIPE_MODULES.repo.foo.api import bar`.

    Yields module names, possibly with repeats. Modules found via
    `from RECIPE_MODULES.repo.foo... import ...` are of the form "repo/foo"
    because the repository can be resolved, whereas modules accessed via
    `api.foo` are yielded as "foo", since the source repo can't be determined
    definitively.

    Args:
        tree (ast.AST): The parsed file to scan.
    """
    # Potential references to a RecipeApi object that dependencies are attached
    # to.
    # TODO(olivernewman): Using `ast.dump()` is a hacky way to check for AST
    # equality. Use `ast.unparse()` instead once Python 3.9 is everywhere.
    api_refs = {
        ast.dump(ast.parse(r).body[0].value) for r in ("self._api", "self.m", "api")
    }
    # RecipeApi attributes that should be ignored because they are methods
    # provided by the recipe engine internals, rather than imported modules.
    ignore_attrs = frozenset(
        (
            "test",
            "step_data",
            "post_process",
            "resource",
            "expect_exception",
            "override_step_data",
        )
    )
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom):
            # `node.module` is None for `from . import x`.
            if not node.module:
                continue
            parts = node.module.split(".")
            # Both the repo and the module name must be in the dotted path;
            # shorter imports like `from RECIPE_MODULES import x` can't be
            # resolved to a "repo/module" dep and are skipped.
            if parts[0] == "RECIPE_MODULES" and len(parts) >= 3:
                yield f"{parts[1]}/{parts[2]}"
        elif isinstance(node, ast.Attribute):
            # `re.match` only anchors at the start, so this check effectively
            # requires the attribute to begin with a lowercase letter.
            if (
                ast.dump(node.value) in api_refs
                and node.attr not in ignore_attrs
                and re.match(r"[a-z][_a-z]*", node.attr)
            ):
                yield node.attr
# Run the cleanup only when this file is executed as a script, not when it is
# imported.
if __name__ == "__main__":
    main()