blob: efbfeafdddf2450f9574b34dde9bc307c2c6621f [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Generate a content hash file from one or more source repository content.
By default, scan all files in the source paths (file or directory) and hash
their content.
- For symlinks, the link value is used as input (not the target file).
- For regular files, its sha1 digest is used as input.
- For git directories (which have a .git entry), use the HEAD commit as input
to speed things up dramatically. NOTE: This ignores changes to the index
made during development.
- If --cipd-name=NAME is set, and <source_path>/.versions/NAME.cipd_version
exists, its content will be used as input.
- Otherwise, for directories, all files in it are found recursively,
and used as hash input independently.
"""
import argparse
import hashlib
import os
import sys
import typing as T
from pathlib import Path
# Name of the hashlib algorithm used both for per-file digests and for the
# combined content hash accumulated by FileState.
_HASH = "sha1"
# Put this script's own directory at the front of the module search path
# before the import below, which depends on it.
# NOTE(review): presumably get_git_head_commit.py lives next to this script
# -- confirm.
sys.path.insert(0, str(Path(__file__).parent))
import get_git_head_commit as gghc
def _depfile_quote(p: Path | str) -> str:
"""Quote a Path value for depfile output."""
return str(p).replace("\\", "\\\\").replace(" ", "\\ ")
class FileState(object):
    """State object used to hash one or more source paths.

    Usage is:
      - Create instance.
      - Call hash_source_path() as many times as needed.
      - Use content_hash property to get final result.
      - Use sorted_input_files to get list of input files.
    """

    def __init__(
        self,
        cipd_names: T.Sequence[str] = (),
        exclude_suffixes: T.Sequence[str] = (),
        git_binary: Path = Path("git"),
    ) -> None:
        """Create new instance.

        Args:
            cipd_names: A sequence of cipd names for prebuilt directories.
            exclude_suffixes: A sequence of filename suffixes to exclude from
                hashing.
            git_binary: Path to the git binary to use for .git repositories.
        """
        # Copy into tuples so the instance never shares (or aliases) a
        # caller-owned or default mutable list. A falsy value such as None
        # is still accepted and treated as "no cipd names".
        self._cipd_names = tuple(cipd_names or ())
        # str.endswith() requires a tuple when testing several suffixes.
        self._exclude_suffixes = tuple(exclude_suffixes)
        self._git_binary = git_binary
        self._input_files: set[Path] = set()
        # Cache for sorted_input_files; reset whenever inputs are recorded.
        self._sorted_input_files: T.Optional[list[str]] = None
        self._hstate = hashlib.new(_HASH)

    def hash_source_path(self, source_path: Path) -> None:
        """Process and hash a given source file, updating internal state."""
        descriptor = self.process_source_path(source_path)
        self._hstate.update(descriptor.encode())

    def _record_inputs(self, paths: T.Iterable[Path]) -> None:
        """Add `paths` to the input set and drop the stale sorted cache."""
        self._input_files.update(paths)
        self._sorted_input_files = None

    def _find_cipd_version_file(self, source_path: Path) -> T.Optional[Path]:
        """Return the first existing CIPD version file, or None.

        Looks for <source_path>/.versions/<name>.cipd_version for each
        configured cipd name, in the order the names were given.
        """
        for cipd_name in self._cipd_names:
            version_file = (
                source_path / ".versions" / f"{cipd_name}.cipd_version"
            )
            if version_file.exists():
                return version_file
        return None

    def find_directory_files(self, source_path: Path) -> set[Path]:
        """Return the set of files used as hash inputs for a directory.

        If a CIPD version file matching one of the configured cipd names
        exists, it is the only entry returned. Otherwise the directory is
        walked recursively and every file whose name does not end with an
        excluded suffix is returned.

        Args:
            source_path: Path to an existing directory.

        Returns:
            A set of file paths, each prefixed with `source_path`.

        Raises:
            ValueError: if `source_path` is not a directory.
        """
        if not source_path.is_dir():
            raise ValueError(
                f"Input source path is not a directory: {source_path}"
            )

        cipd_version_file = self._find_cipd_version_file(source_path)
        if cipd_version_file is not None:
            return {cipd_version_file}

        # Find all files in directory.
        dir_files: set[Path] = set()
        for dirpath, _dirnames, filenames in os.walk(source_path):
            for filename in filenames:
                if filename.endswith(self._exclude_suffixes):
                    continue
                dir_files.add(Path(os.path.join(dirpath, filename)))
        return dir_files

    def process_source_path(self, source_path: Path) -> str:
        """Process a given source file, and return a string descriptor for it.

        The first letter of the result corresponds to the type of the source
        path:
          - "G" + HEAD commit for a directory that has a .git entry.
          - "S" + link value for a symlink.
          - "F" + hexadecimal digest for a regular file.
          - "D\\n" followed by one " <file> <descriptor>\\n" line per file
            (sorted by relative path) for any other directory, unless a CIPD
            version file is found, in which case that file's descriptor is
            returned instead.

        Every path used as an input is recorded, see sorted_input_files.

        This function is useful for unit-testing the implementation and verify
        that different types of source paths are handled correctly. Apart from
        that, consider this as an implementation detail.

        Raises:
            ValueError: if `source_path` does not exist.
        """
        if not source_path.exists():
            raise ValueError(f"Path does not exist: {source_path}")

        if source_path.is_dir() and (source_path / ".git").exists():
            head_commit = gghc.get_git_head_commit(
                source_path, self._git_binary
            )
            self._record_inputs(gghc.find_git_head_inputs(source_path))
            return "G" + head_commit

        if source_path.is_symlink():
            self._record_inputs([source_path])
            return "S" + str(source_path.readlink())

        if source_path.is_file():
            self._record_inputs([source_path])
            with source_path.open("rb") as f:
                digest = hashlib.file_digest(f, _HASH)
            return "F" + digest.hexdigest()

        assert source_path.is_dir(), f"Unexpected file type for {source_path}"

        cipd_version_file = self._find_cipd_version_file(source_path)
        if cipd_version_file is not None:
            return self.process_source_path(cipd_version_file)

        # Get the list of files relative to the source directory, sorted so
        # the description does not depend on directory traversal order.
        relative_files = sorted(
            os.path.relpath(f, source_path)
            for f in self.find_directory_files(source_path)
        )

        # Process them recursively to build a directory description text
        # where each line looks like: <file> <type><digest>
        lines = ["D\n"]
        for relative_file in relative_files:
            file_hash = self.process_source_path(source_path / relative_file)
            lines.append(f" {relative_file} {file_hash}\n")
        return "".join(lines)

    @property
    def content_hash(self) -> str:
        """Return final content hash as hexadecimal string."""
        return self._hstate.hexdigest()

    @property
    def sorted_input_files(self) -> list[str]:
        """Return the sorted list of input files used by this instance.

        The list is cached, and the cache is dropped each time new inputs are
        recorded, so the result always reflects every path processed so far.
        """
        if self._sorted_input_files is None:
            self._sorted_input_files = sorted(
                str(p) for p in self._input_files
            )
        return self._sorted_input_files

    def get_input_file_paths(self) -> set[Path]:
        """Return the set of input file Path values used by this instance.

        This is the instance's own set, not a copy.
        """
        return self._input_files
def main() -> int:
    """Parse the command line, hash all source paths and write the results.

    The content hash is written to --output when given (the file is left
    untouched if it already holds the same hash), otherwise it is printed to
    stdout. Optionally writes the list of inputs (--inputs-list) and a Ninja
    depfile (--depfile, which requires --output).

    Returns:
        0 on success. Argument errors exit through argparse.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        "--cipd-name",
        action="append",
        default=[],
        help="Provide name for optional CIPD version file. Can be used multiple times.",
    )
    parser.add_argument(
        "--exclude-suffix",
        action="append",
        default=[],
        help='Exclude directory entries with given suffix (e.g. ".pyc").\n'
        + "Can be used multiple times.",
    )
    parser.add_argument(
        "--git-binary",
        type=Path,
        default=Path("git"),
        help="Specify git binary to use for git repositories.",
    )
    parser.add_argument(
        "source_path",
        type=Path,
        nargs="+",
        help="Source file or directory path.",
    )
    parser.add_argument(
        "--output", type=Path, help="Optional output file path."
    )
    parser.add_argument(
        "--depfile", type=Path, help="Optional Ninja depfile output file path."
    )
    parser.add_argument(
        "--inputs-list",
        type=Path,
        help="Write list of inputs to file, one path per line.",
    )
    args = parser.parse_args()

    # The depfile names the output file as its target, so it cannot be
    # written without one.
    if args.depfile and not args.output:
        parser.error("--depfile option requires --output.")

    fstate = FileState(args.cipd_name, args.exclude_suffix, args.git_binary)
    for source_path in args.source_path:
        fstate.hash_source_path(source_path)

    if args.output:
        # Do not modify existing output if it has the same content.
        # "~~~" can never equal a hexadecimal digest, so a missing output
        # file is always (re)written.
        current_content = "~~~"
        if args.output.exists():
            current_content = args.output.read_text()
        if current_content != fstate.content_hash:
            args.output.write_text(fstate.content_hash)
    else:
        print(fstate.content_hash)

    if args.inputs_list:
        args.inputs_list.write_text("\n".join(fstate.sorted_input_files) + "\n")

    if args.depfile:
        # Quote the target the same way as the inputs: an output path that
        # contains a space would otherwise produce a malformed depfile.
        depfile_target = _depfile_quote(args.output)
        depfile_inputs = " \\\n ".join(
            _depfile_quote(f) for f in fstate.sorted_input_files
        )
        args.depfile.write_text(f"{depfile_target}: \\\n {depfile_inputs}\n")

    return 0
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())