| #!/usr/bin/env python3 |
| # Copyright 2020 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """ |
| A hacky script to clean up recipe DEPS: |
| - Delete unused recipe and recipe module DEPS, and automatically add entries to |
| DEPS for potential dependencies that aren't yet in DEPS. Whether a module is |
| used is determined based on AST analysis. |
| - Ensure all deps are prefixed with a repo name (e.g. "fuchsia"). |
| - Sort all DEPS lists alphabetically. |
| """ |
| # pylint: disable=unspecified-encoding |
| |
| import argparse |
| import ast |
| import difflib |
| import functools |
| import json |
| import os |
| import re |
| import sys |
| import typing |
| |
| |
def main():
    """Clean up DEPS lists in all recipes and recipe modules under cwd.

    Scans every recipe and recipe module file, computes a cleaned-up DEPS
    list for each, and either rewrites files in place or (with --check)
    prints diffs and exits nonzero if any file needs fixing.
    """
    parser = argparse.ArgumentParser(description="Sort and filter recipe DEPS")
    parser.add_argument(
        "--check",
        action="store_true",
        help=(
            "Instead of writing changes, print diff to stdout and exit with retcode 1 "
            "if cleanup is needed."
        ),
    )
    parser.add_argument(
        "--json-output",
        type=str,
        help=(
            "Write a JSON list of relative paths of badly formatted files to this file."
        ),
    )
    args = parser.parse_args()

    cwd = os.getcwd()

    # Get the name of this recipes repo to append to any DEPS that don't
    # specify a repo.
    with open(os.path.join(cwd, "infra", "config", "recipes.cfg")) as f:
        cfg = json.load(f)
    repo_name = cfg["repo_name"]

    files_with_deps = []

    # Collect a mapping of upstream repo to set of module names, so we can
    # infer the source repo for `api.foo` references that don't yet have an
    # entry in DEPS.
    upstream_modules = {}
    recipe_deps_dir = os.path.join(cwd, ".recipe_deps")
    if os.path.isdir(recipe_deps_dir):
        for upstream_repo in os.listdir(recipe_deps_dir):
            repo_dir = os.path.join(recipe_deps_dir, upstream_repo)
            config_path = os.path.join(repo_dir, "infra", "config", "recipes.cfg")
            if not os.path.exists(config_path):
                continue
            upstream_modules_dir = os.path.join(repo_dir, "recipe_modules")
            # Fix: an upstream repo need not contain any recipe modules, in
            # which case os.listdir() would raise FileNotFoundError.
            if not os.path.isdir(upstream_modules_dir):
                continue
            for module in os.listdir(upstream_modules_dir):
                # Only directories containing an api.py are real modules.
                if os.path.exists(
                    os.path.join(upstream_modules_dir, module, "api.py")
                ):
                    upstream_modules.setdefault(upstream_repo, set()).add(module)

    recipes_dir = os.path.join(cwd, "recipes")
    modules_dir = os.path.join(cwd, "recipe_modules")
    for directory in [recipes_dir, modules_dir]:
        for subdir, _, files in os.walk(directory):
            for relpath in files:
                _, ext = os.path.splitext(relpath)
                if ext != ".py":
                    continue
                path = os.path.join(subdir, relpath)
                # __init__.py files are handled separately, since they contain
                # DEPS entries but usages of those deps may be in other files
                # in the directory.
                if relpath == "__init__.py":
                    continue
                files_with_deps.append(analyze_recipe(path, repo_name))

    for relpath in os.listdir(modules_dir):
        path = os.path.join(modules_dir, relpath)
        if os.path.exists(os.path.join(path, "api.py")):
            files_with_deps.append(analyze_module(path, repo_name))

    # Attach the upstream module map before any `diff` property is computed,
    # so unresolved `api.foo` references can be attributed to the right repo.
    for f in files_with_deps:
        f.upstream_modules = upstream_modules

    files_to_fix = [f for f in files_with_deps if f.diff]
    for f in files_to_fix:
        if args.check:
            if not args.json_output:
                script_relpath = os.path.relpath(__file__, cwd)
                print(
                    f"Some recipe files' DEPS are malformatted.\nRun ./{script_relpath} to fix:\n"
                )
                print(f.diff)
        else:
            f.rewrite()

    if args.json_output:
        j = [os.path.relpath(f.path, cwd) for f in files_to_fix]
        if args.json_output == "-":
            # "-" conventionally means "write to stdout".
            print(json.dumps(j))
        else:
            with open(args.json_output, "w") as outfile:
                json.dump(j, outfile)

    if args.check and files_to_fix:
        sys.exit(os.EX_DATAERR)
| |
| |
class FileWithDEPS:
    """A recipe or recipe module file along with its parsed DEPS entries."""

    def __init__(self, path, repo_name):
        """Read the file and its DEPS.

        Args:
          path (str): Absolute path to the Python file containing DEPS.
          repo_name (str): Name of the current recipes repo, used to qualify
            any dep that doesn't already specify a repo.
        """
        self.path = path
        self._repo_name = repo_name
        # Mapping from upstream repo name to a set of its module names; to be
        # populated by the caller.
        self.upstream_modules = {}

        # To be updated by the caller after static analysis.
        self.used_deps = set()

        # A mapping from full dep name (e.g. "fuchsia/foo") to the text
        # corresponding to that dep - the actual import line that names the
        # dep, along with any comments preceding the import line, joined into
        # one string. All lines include any trailing newline.
        self.original_deps = {}
        # The original lines of the file, including trailing newlines.
        self.original_lines = []

        # The line number of the first dep.
        self._deps_start_line = -1
        # The line number of the line after the last dep.
        self._deps_end_line = -1

        with open(self.path) as f:
            self.original_lines = f.readlines()

        # Track the file's preferred quote character so that any deps we add
        # match the existing style.
        single_quote_count = double_quote_count = 0

        # The lines associated with the dep currently being parsed, including any
        # comment lines preceding the dep.
        current_dep_lines = []

        for i, original_line in enumerate(self.original_lines):
            line = original_line.strip()
            if line == "DEPS = [":
                self._deps_start_line = i + 1
                continue
            elif self._deps_start_line == -1:
                # Haven't reached the DEPS list yet.
                continue
            elif line == "]":
                self._deps_end_line = i
                break

            if line.startswith("#"):
                current_dep_lines.append(original_line)
                continue

            # Match a quoted dep name, an optional trailing comma, and an
            # optional trailing comment. Fix: the quote character class was
            # `[\'|"]`, which also (incorrectly) matched a literal `|`.
            match = re.search(
                r'(?P<quote>[\'"])(?P<dep>\S+)(?P=quote),?\s*(?P<comment>\#.*)?$',
                line,
            )
            if not match:
                current_dep_lines.append(original_line)
                continue

            if match.group("quote") == "'":
                single_quote_count += 1
            elif match.group("quote") == '"':
                double_quote_count += 1

            dep = match.group("dep")

            dep_line = original_line
            # Prepend the recipe repo name (assumed to be repo_name) if it's
            # missing.
            if "/" not in dep:
                old_dep = dep
                dep = f"{self._repo_name}/{old_dep}"
                dep_line = dep_line.replace(old_dep, dep, 1)

            current_dep_lines.append(dep_line)
            self.original_deps[dep] = "".join(current_dep_lines)
            current_dep_lines = []

        self._quote = '"'
        if single_quote_count > double_quote_count:
            self._quote = "'"

    # Fix: the original `@property` stacked on `@functools.lru_cache` keys
    # the cache on `self`, keeping every instance alive for the life of the
    # process (ruff B019). `cached_property` caches per-instance without the
    # leak and preserves attribute-style access.
    @functools.cached_property
    def new_lines(self):
        """Returns a list of the lines of the file with formatting applied."""
        new_deps = {}
        used_deps = self.used_deps.copy()
        for dep, lines in self.original_deps.items():
            dep_basename = dep.split("/")[-1]
            # Keep any declared dep that's referenced by either its full name
            # or its bare module name.
            if dep in used_deps or dep_basename in used_deps:
                new_deps[dep] = lines
                used_deps.difference_update({dep, dep_basename})

        # For any module reference that *doesn't* correspond to a declared dep,
        # add it to DEPS. This isn't absolutely necessary, but it's a nice
        # feature to automate the process of populating DEPS.
        for dep in used_deps:
            full_dep = dep
            if "/" not in dep:
                full_dep = f"{self._repo_name}/{dep}"
            # If an upstream repo provides a module with this bare name,
            # prefer attributing the dep to that repo.
            for upstream_repo, mods in self.upstream_modules.items():
                if dep in mods:
                    full_dep = f"{upstream_repo}/{dep}"
                    break
            new_deps[full_dep] = f"    {self._quote}{full_dep}{self._quote},\n"

        # Sort deps alphabetically by full name and splice them back into the
        # original file between "DEPS = [" and "]".
        sorted_dep_lines = [text for _, text in sorted(new_deps.items())]
        new_lines = self.original_lines[:]
        new_lines[self._deps_start_line : self._deps_end_line] = sorted_dep_lines
        return new_lines

    @functools.cached_property
    def diff(self):
        """Returns a git-style rendering of the diff that would be produced."""
        return "".join(
            difflib.unified_diff(
                self.original_lines, self.new_lines, "a" + self.path, "b" + self.path
            )
        ).rstrip()

    def rewrite(self):
        """Write the cleaned-up contents back to the file in place."""
        print(f"rewriting {os.path.relpath(self.path, os.getcwd())}")
        with open(self.path, "w") as f:
            f.writelines(self.new_lines)
| |
| |
def analyze_recipe(path, repo_name):
    """Determine which DEPS a single recipe file actually uses.

    Args:
      path (str): The absolute path to the recipe Python file.
      repo_name (str): The name of the current recipes repo.

    Returns:
      A FileWithDEPS whose `used_deps` reflects the recipe's module usages.
    """
    result = FileWithDEPS(path, repo_name)
    # A file with an empty DEPS list may not actually be a recipe at all, so
    # don't bother analyzing it.
    if result.original_deps:
        with open(path) as f:
            source = f.read()
        result.used_deps.update(find_usages(ast.parse(source)))
    return result
| |
| |
def analyze_module(module_dir, repo_name):
    """Determine which DEPS a recipe module's __init__.py actually uses.

    Walks every Python file in the module (except standalone examples, tests,
    and resources) and records the recipe deps they reference.

    Args:
      module_dir (str): The absolute path to the root of the recipe module.
      repo_name (str): The name of the current recipes repo.

    Returns:
      A FileWithDEPS for the module's __init__.py.
    """
    init_path = os.path.join(module_dir, "__init__.py")
    module_name = os.path.basename(module_dir)
    if not os.path.exists(init_path):
        raise Exception(f"recipe module {module_name} has no __init__.py file")
    init_file = FileWithDEPS(init_path, repo_name)

    # References to the module itself never need a DEPS entry.
    self_refs = {module_name, f"{repo_name}/{module_name}"}

    for subdir, subdirs, files in os.walk(module_dir, topdown=True):
        if subdir == module_dir:
            # "examples" and "tests" contain standalone recipes with their
            # own DEPS, and "resources" contains standalone scripts that use
            # no recipe deps at all, so none of them should influence this
            # module's DEPS. Pruning the list in place tells os.walk() not to
            # descend into them.
            subdirs[:] = [
                d for d in subdirs if d not in ("examples", "tests", "resources")
            ]

        for filename in files:
            if os.path.splitext(filename)[1] != ".py":
                continue
            with open(os.path.join(subdir, filename)) as f:
                tree = ast.parse(f.read())
            init_file.used_deps.update(set(find_usages(tree)) - self_refs)

    return init_file
| |
| |
# RecipeApi attributes that should be ignored because they are methods
# provided by the recipe engine internals, rather than imported modules.
_IGNORE_ATTRS = (
    "test",
    "step_data",
    "post_process",
    "resource",
    "expect_exception",
    "override_step_data",
)

# Potential references to a RecipeApi object that dependencies are attached to.
# TODO(olivernewman): Using `ast.dump()` is a hacky way to check for AST
# equality. Use `ast.unparse()` instead once Python 3.9 is everywhere.
_API_REFS = [
    ast.dump(ast.parse(r).body[0].value)
    for r in ("self.api", "self._api", "self.m", "api")
]


class FindDepsVisitor(ast.NodeTransformer):
    """Visit all the nodes in the tree looking for recipe dep usages."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Dep names accumulated during the current `accept()` call.
        self._deps: typing.List[str] = []

    def accept(self, node: ast.AST):
        """Visits `node` and returns the list of dep names found, in visit order."""
        self._deps = []
        self.visit(node)
        return self._deps

    def _is_type_checking(self, node: ast.AST):
        """Returns whether `node` refers to `TYPE_CHECKING` or `typing.TYPE_CHECKING`."""
        if isinstance(node, ast.Name):
            return node.id == "TYPE_CHECKING"

        if isinstance(node, ast.Attribute):
            if isinstance(node.value, ast.Name):
                return node.value.id == "typing" and node.attr == "TYPE_CHECKING"

        return False

    def visit_If(self, node: ast.If) -> ast.AST:
        """Delete the body from the tree if the condition is "TYPE_CHECKING".

        The "typing.TYPE_CHECKING" constant is always False at runtime (but static
        analyzers may make it True). If it's always False at runtime, then any nested
        nodes that would otherwise be interpreted as deps aren't really dependencies
        that the recipe engine or recipe testing needs to know about, so they can be
        ignored. Do that by removing the body of the if statement before the visitor
        recurses.
        """
        if self._is_type_checking(node.test):
            # Fix: `ast.If.body` must be a *list* of statement nodes.
            # Assigning a bare `ast.Pass()` leaves the tree malformed for any
            # later consumer (e.g. ast.unparse or compile).
            node.body = [ast.Pass()]

        return self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom) -> ast.AST:
        """Find explicit RECIPE_MODULES imports."""
        if node.module:
            parts = node.module.split(".")
            # Fix: require "RECIPE_MODULES.<repo>.<module>" to have all three
            # components, otherwise indexing parts[1]/parts[2] would raise
            # IndexError on e.g. `from RECIPE_MODULES import x`.
            if parts[0] == "RECIPE_MODULES" and len(parts) >= 3:
                self._deps.append(f"{parts[1]}/{parts[2]}")

        return self.generic_visit(node)

    def visit_Attribute(self, node: ast.Attribute) -> ast.AST:
        """Find deps that look like "api.file" or "self.m.file"."""
        if (
            ast.dump(node.value) in _API_REFS
            and node.attr not in _IGNORE_ATTRS
            # NOTE(review): re.match only anchors at the start, so any attr
            # beginning with a lowercase letter matches (e.g. "foo2",
            # "fooBar") — presumably intentional to be permissive; confirm.
            and re.match(r"[a-z][_a-z]*", node.attr)
        ):
            self._deps.append(node.attr)

        return self.generic_visit(node)
| |
| |
def find_usages(tree: ast.AST) -> typing.List[str]:
    """Find all recipe dep usages in the AST of a Python file.

    Detects references of the forms `api.foo.func()`, `self.m.foo.func()`,
    and `from RECIPE_MODULES.repo.foo.api import bar`.

    Returns:
      A sequence of module names. Modules imported via `RECIPE_MODULES.repo.foo`
      are returned as "repo/foo", since the repository is explicit; modules
      referenced via `api.foo` are returned as just "foo", because the source
      repo can't be determined definitively from the reference alone.
    """
    return FindDepsVisitor().accept(tree)
| |
| |
# Entry-point guard: allows importing this module (e.g. from tests) without
# triggering the cleanup as a side effect.
if __name__ == "__main__":
    main()