|  | #!/usr/bin/env python3 | 
|  | # Copyright 2022 The Fuchsia Authors. All rights reserved. | 
|  | # Use of this source code is governed by a BSD-style license that can be | 
|  | # found in the LICENSE file. | 
|  |  | 
|  | """Generate a content hash file from one or more source repository content. | 
|  |  | 
|  | By default, scan all files in the source paths (file or directory) and hash | 
|  | their content. | 
|  |  | 
|  | - For symlinks, the link value is used as input (not the target file). | 
|  |  | 
|  | - For regular files, the sha1 digest of their content is used as input. | 
|  |  | 
|  | - For git directories (which have a .git entry), use the HEAD commit as input | 
|  | to speed things up dramatically. NOTE: This ignores changes to the index | 
|  | during development. | 
|  |  | 
|  | - If --cipd-name=NAME is set, and <source_path>/.versions/NAME.cipd_version | 
|  | exists, its content will be used as input. | 
|  |  | 
|  | - Otherwise, for directories, all files in it are found recursively, | 
|  | and used as hash input independently. | 
|  | """ | 
|  |  | 
|  | import argparse | 
|  | import hashlib | 
|  | import os | 
|  | import sys | 
|  | import typing as T | 
|  | from pathlib import Path | 
|  |  | 
# Name of the hashlib algorithm used for both the per-file digests and the
# final accumulated content hash.
_HASH = "sha1"

# Put the directory containing this script first on the module search path so
# that the get_git_head_commit module imported below can be found there
# (presumably a sibling script that lives next to this file).
sys.path.insert(0, str(Path(__file__).parent))
import get_git_head_commit as gghc
|  |  | 
|  |  | 
|  | def _depfile_quote(p: Path | str) -> str: | 
|  | """Quote a Path value for depfile output.""" | 
|  | return str(p).replace("\\", "\\\\").replace(" ", "\\ ") | 
|  |  | 
|  |  | 
class FileState(object):
    """State object used to hash one or more source paths.

    Usage is:
      - Create instance
      - Call hash_source_path() as many times as needed.
      - Use content_hash property to get final result.
      - Use sorted_input_files to get list of input files.
    """

    def __init__(
        self,
        cipd_names: T.Sequence[str] = (),
        exclude_suffixes: T.Sequence[str] = (),
        git_binary: Path = Path("git"),
    ) -> None:
        """Create new instance.

        Args:
            cipd_names: A sequence of cipd names for prebuilt directories.
            exclude_suffixes: A sequence of filename suffixes to exclude from hashing.
            git_binary: Path to the git binary to use for .git repositories.
        """
        # Immutable copies: the defaults are never shared mutable objects and
        # later changes to the caller's sequences do not affect this instance.
        self._cipd_names = tuple(cipd_names)
        # Must be a tuple because it is passed directly to str.endswith().
        self._exclude_suffixes = tuple(exclude_suffixes)
        self._git_binary = git_binary
        self._input_files: set[Path] = set()
        self._hstate = hashlib.new(_HASH)

    def hash_source_path(self, source_path: Path) -> None:
        """Process and hash a given source path, updating internal state.

        Args:
            source_path: File, symlink or directory to add to the hash.

        Raises:
            ValueError: If source_path does not exist.
        """
        descriptor = self.process_source_path(source_path)
        self._hstate.update(descriptor.encode())

    def _find_cipd_version_file(self, source_path: Path) -> T.Optional[Path]:
        """Return the first existing .versions/<name>.cipd_version file, if any."""
        for cipd_name in self._cipd_names:
            version_file = (
                source_path / ".versions" / f"{cipd_name}.cipd_version"
            )
            if version_file.exists():
                return version_file
        return None

    def _walk_directory_files(self, source_path: Path) -> set[Path]:
        """Return every non-excluded file found by walking source_path."""
        dir_files: set[Path] = set()
        for dirpath, _dirnames, filenames in os.walk(source_path):
            for filename in filenames:
                # str.endswith() accepts a tuple; an empty tuple never matches.
                if filename.endswith(self._exclude_suffixes):
                    continue
                dir_files.add(Path(os.path.join(dirpath, filename)))
        return dir_files

    def find_directory_files(self, source_path: Path) -> set[Path]:
        """Return the set of files that represent the content of a directory.

        If one of the configured cipd names has a matching
        <source_path>/.versions/<name>.cipd_version file, only that file is
        returned. Otherwise the directory is walked recursively and every
        file whose name does not end with an excluded suffix is returned.

        Args:
            source_path: Directory to inspect.

        Returns:
            A set of file paths, each prefixed with source_path.

        Raises:
            AssertionError: If source_path is not a directory.
        """
        assert (
            source_path.is_dir()
        ), f"Input source path is not a directory: {source_path}"

        version_file = self._find_cipd_version_file(source_path)
        if version_file is not None:
            return {version_file}

        # Find all files in directory.
        return self._walk_directory_files(source_path)

    def process_source_path(self, source_path: Path) -> str:
        """Process a given source path, and return a string descriptor for it.

        The first letter of the result corresponds to the type of the source
        path: "G" for a git directory (followed by the HEAD commit), "S" for a
        symlink (followed by the link value), "F" for a regular file (followed
        by its hex digest) and "D" for any other directory (followed by one
        line per contained file).

        This function is useful for unit-testing the implementation and verify
        that different types of source paths are handled correctly. Apart from
        that, consider this as an implementation detail.

        Args:
            source_path: File, symlink or directory to describe.

        Returns:
            The descriptor string for source_path.

        Raises:
            ValueError: If source_path does not exist.
        """
        # NOTE: Path.exists() follows symlinks, so a dangling symlink is
        # reported as a missing path here.
        if not source_path.exists():
            raise ValueError(f"Path does not exist: {source_path}")

        # Git checkouts are summarized by their HEAD commit instead of being
        # walked. This check comes first, so it also applies to a symlink that
        # points at a git directory.
        if source_path.is_dir() and (source_path / ".git").exists():
            head_commit = gghc.get_git_head_commit(
                source_path, self._git_binary
            )
            self._input_files.update(gghc.find_git_head_inputs(source_path))
            return "G" + head_commit

        if source_path.is_symlink():
            self._input_files.add(source_path)
            return "S" + str(source_path.readlink())

        if source_path.is_file():
            self._input_files.add(source_path)
            with source_path.open("rb") as f:
                digest = hashlib.file_digest(f, _HASH)
            return "F" + digest.hexdigest()

        assert source_path.is_dir(), f"Unexpected file type for {source_path}"

        # A CIPD version file, when present, stands in for the whole directory.
        version_file = self._find_cipd_version_file(source_path)
        if version_file is not None:
            return self.process_source_path(version_file)

        # Get the list of files relative to the source directory, sorted so
        # that the description does not depend on directory walk order.
        relative_files = sorted(
            os.path.relpath(f, source_path)
            for f in self._walk_directory_files(source_path)
        )

        # Process them recursively to build a directory description text
        # where each line looks like: <file> <type><digest>
        lines = ["D\n"]
        for relative_file in relative_files:
            file_hash = self.process_source_path(source_path / relative_file)
            lines.append(f" {relative_file} {file_hash}\n")
        return "".join(lines)

    @property
    def content_hash(self) -> str:
        """Return final content hash as hexadecimal string."""
        return self._hstate.hexdigest()

    @property
    def sorted_input_files(self) -> list[str]:
        """Return the sorted list of input files used by this instance.

        The list is rebuilt on every access so that it always reflects paths
        processed after an earlier access.
        """
        return sorted(str(p) for p in self._input_files)

    def get_input_file_paths(self) -> set[Path]:
        """Return the set of input file Path values used by this instance."""
        return self._input_files
|  |  | 
|  |  | 
def main() -> int:
    """Command-line entry point.

    Parses the command line, hashes every positional source path in order
    with a FileState instance, then:
      - writes the content hash to --output (only when the file content would
        change), or prints it to stdout when --output is not given;
      - optionally writes the sorted list of input files to --inputs-list;
      - optionally writes a Ninja depfile to --depfile (requires --output).

    Returns:
        0 on success. Invalid argument combinations exit through
        parser.error().
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "--cipd-name",
        action="append",
        default=[],
        help="Provide name for optional CIPD version file. Can be used multiple times.",
    )
    parser.add_argument(
        "--exclude-suffix",
        action="append",
        default=[],
        help='Exclude directory entries with given suffix (e.g. ".pyc").\n'
        + "Can be used multiple times.",
    )
    parser.add_argument(
        "--git-binary",
        type=Path,
        default=Path("git"),
        help="Specify git binary to use for git repositories.",
    )
    parser.add_argument(
        "source_path",
        type=Path,
        nargs="+",
        help="Source file or directory path.",
    )
    parser.add_argument(
        "--output", type=Path, help="Optional output file path."
    )
    parser.add_argument(
        "--depfile", type=Path, help="Optional Ninja depfile output file path."
    )
    parser.add_argument(
        "--inputs-list",
        type=Path,
        help="Write list of inputs to file, one path per line.",
    )
    args = parser.parse_args()

    # The depfile names the output file as its target, so it is meaningless
    # without one.
    if args.depfile and not args.output:
        parser.error("--depfile option requires --output.")

    fstate = FileState(args.cipd_name, args.exclude_suffix, args.git_binary)
    for source_path in args.source_path:
        fstate.hash_source_path(source_path)

    content_hash = fstate.content_hash
    if args.output:
        # Do not modify existing output if it has the same content.
        previous_content = (
            args.output.read_text() if args.output.exists() else None
        )
        if previous_content != content_hash:
            args.output.write_text(content_hash)
    else:
        print(content_hash)

    input_files = fstate.sorted_input_files

    if args.inputs_list:
        args.inputs_list.write_text("\n".join(input_files) + "\n")

    if args.depfile:
        # Escape the target the same way as the dependencies so that an output
        # path containing spaces or backslashes still forms one depfile entry.
        target = _depfile_quote(args.output)
        dependencies = " \\\n  ".join(_depfile_quote(f) for f in input_files)
        args.depfile.write_text(f"{target}: \\\n  {dependencies}\n")

    return 0
|  |  | 
|  |  | 
if __name__ == "__main__":
    # Run the command-line entry point and use its integer result as the
    # process exit status.
    exit_status = main()
    sys.exit(exit_status)