| #!/usr/bin/env fuchsia-vendored-python |
| # Copyright 2021 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Build packages.archive.tgz.""" |
| |
| import argparse |
| import filecmp |
| import json |
| import os |
| import pathlib |
| import subprocess |
| import sys |
| |
| from assembly import PackageManifest |
| from serialization import json_load |
| from typing import Dict, Set |
| |
| |
| def add_content_entry( |
| content: Dict[str, str], |
| dst: str, |
| src: str, |
| src_dir: str, |
| depfile_inputs: Set[str], |
| ): |
| """Add new entry to |content|, checking for duplicates.""" |
| # Make the source path relative, because the build still |
| # puts absolute one in a small number of package manifests :-/ |
| src_path = os.path.relpath(os.path.join(src_dir, src)) |
| depfile_inputs.add(src_path) |
| if dst in content: |
| cur_src_path = content[dst] |
| if cur_src_path != src_path and not filecmp.cmp(cur_src_path, src_path): |
| raise ValueError( |
| "Duplicate entries for destination %s:\n %s\n %s\n" |
| % (dst, cur_src_path, src_path) |
| ) |
| return |
| content[dst] = src_path |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser(description=__doc__) |
| |
| parser.add_argument( |
| "--files", |
| metavar="FILESPEC", |
| nargs="*", |
| default=[], |
| help="Add one or more file to the archive, each FILESPEC should be DST=SRC", |
| ) |
| |
| parser.add_argument( |
| "--src-dir", |
| default="", |
| help="Specify source directory. Default is current one.", |
| ) |
| |
| parser.add_argument( |
| "--tuf-repo-name", required=True, help="Specify TUF repository name." |
| ) |
| |
| parser.add_argument( |
| "--tarmaker", |
| required=True, |
| help="Path to the 'tarmaker' host tool to use", |
| ) |
| |
| parser.add_argument( |
| "--tarmaker-manifest", |
| required=True, |
| help="Path to output tarmaker manifest file.", |
| ) |
| |
| parser.add_argument( |
| "--package-manifests-list", |
| required=True, |
| help="Path to file listing all package manifest files.", |
| ) |
| |
| parser.add_argument( |
| "--tuf-timestamp-json", required=True, help="Path to timestamp.json" |
| ) |
| |
| parser.add_argument( |
| "--tuf-targets-json", |
| nargs="*", |
| default=[], |
| help="Path to targets.json file", |
| ) |
| |
| parser.add_argument( |
| "--output", required=True, help="Path to output .tar.gz archive." |
| ) |
| |
| parser.add_argument("--depfile", help="Path to output Ninja depfile.") |
| |
| args = parser.parse_args() |
| |
| # Map destination path -> source path |
| content: Dict[str, str] = {} |
| |
| # Map merkle root string -> source path |
| merkle_to_source: Dict[str, str] = {} |
| |
| # List of input paths for the depfile. |
| depfile_inputs: Set[str] = set() |
| |
| # Parse the --files argument first. |
| for filespec in args.files: |
| dst, sep, src = filespec.strip().partition("=") |
| if sep != "=": |
| raise ValueError("Unexpected file spec: " + filespec) |
| add_content_entry(content, dst, src, args.src_dir, depfile_inputs) |
| |
| # Parse --package-manifests-list if provided, parse it to |
| # populate the blobs directory in the archive. |
| blobs_dst_dir = args.tuf_repo_name + "/repository/blobs" |
| with open(args.package_manifests_list) as f: |
| package_manifest_list_json = json.load(f) |
| package_manifest_paths = [ |
| pathlib.Path(p) |
| for p in package_manifest_list_json["content"]["manifests"] |
| ] |
| |
| # use package_manifest_paths as a worklist that expands as paths are traversed |
| while len(package_manifest_paths) > 0: |
| package_manifest_path = package_manifest_paths.pop() |
| depfile_inputs.add(str(package_manifest_path)) |
| with package_manifest_path.open() as f: |
| package_manifest = json_load(PackageManifest, f) |
| # TODO(https://fxbug.dev/42180820) extract this into a constructor for PackageManifest? |
| for blob_entry in package_manifest.blobs: |
| src = blob_entry.source_path |
| if package_manifest.blob_sources_relative == "file": |
| src = package_manifest_path.parent / src |
| merkle = blob_entry.merkle |
| merkle_to_source[merkle] = src |
| dst = os.path.join(blobs_dst_dir, merkle) |
| add_content_entry(content, dst, src, args.src_dir, depfile_inputs) |
| for subpackage in package_manifest.subpackages: |
| package_manifest_paths.append( |
| package_manifest_path.parent / subpackage.manifest_path |
| ) |
| |
| # Parse the timestamp.json file to extract the version of the |
| # snapshot.json file to use, copy it to the archive, then parse it. |
| depfile_inputs.add(args.tuf_timestamp_json) |
| with open(args.tuf_timestamp_json) as f: |
| timestamp_json = json.load(f) |
| |
| tuf_repo_dir = os.path.join(args.src_dir, args.tuf_repo_name, "repository") |
| add_content_entry( |
| content, |
| os.path.join(tuf_repo_dir, "timestamp.json"), |
| args.tuf_timestamp_json, |
| "", |
| depfile_inputs, |
| ) |
| |
| # Install <version>.snapshot.json as is, then as 'snapshot.json'. |
| snapshot_json_version = timestamp_json["signed"]["meta"]["snapshot.json"][ |
| "version" |
| ] |
| snapshot_json_file = os.path.join( |
| tuf_repo_dir, "%d.snapshot.json" % snapshot_json_version |
| ) |
| snapshot_json_src_path = os.path.join(args.src_dir, snapshot_json_file) |
| add_content_entry( |
| content, snapshot_json_file, snapshot_json_src_path, "", depfile_inputs |
| ) |
| add_content_entry( |
| content, |
| os.path.join(tuf_repo_dir, "snapshot.json"), |
| snapshot_json_src_path, |
| "", |
| depfile_inputs, |
| ) |
| |
| # Parse the snapshots.json file to grab the version targets.json. |
| # Install <version>.snapshot.json as-is, then as snapshot.json in the |
| # archive. |
| # |
| with open(snapshot_json_src_path) as f: |
| snapshot_json = json.load(f) |
| |
| targets_json_version = snapshot_json["signed"]["meta"]["targets.json"][ |
| "version" |
| ] |
| targets_json_file = os.path.join( |
| tuf_repo_dir, "%d.targets.json" % targets_json_version |
| ) |
| targets_json_src_path = os.path.join(args.src_dir, targets_json_file) |
| add_content_entry( |
| content, targets_json_file, targets_json_src_path, "", depfile_inputs |
| ) |
| add_content_entry( |
| content, |
| os.path.join(tuf_repo_dir, "targets.json"), |
| targets_json_file, |
| "", |
| depfile_inputs, |
| ) |
| |
| # Parse the targets.json file to populate the targets directory |
| # in the archive. |
| with open(targets_json_src_path) as f: |
| targets_json = json.load(f) |
| for target, entry in targets_json["signed"]["targets"].items(): |
| hash = entry["hashes"]["sha512"] |
| merkle = entry["custom"]["merkle"] |
| # The target name is <dirname>/<basename> or just <basename>. |
| # While the installation file name is |
| # ${tuf_repo_name}/repository/targets/<dirname>/<hash>.<basename> |
| # or ${tuf_repo_name}/repository/targets/<hash>.<basename> is there |
| # is no <dirname> in the target name, |
| dirname, basename = os.path.split(target) |
| dst = args.tuf_repo_name + "/repository/targets" |
| if dirname: |
| dst = os.path.join(dst, dirname) |
| dst = os.path.join(dst, f"{hash}.{basename}") |
| src = merkle_to_source[merkle] |
| add_content_entry(content, dst, src, args.src_dir, depfile_inputs) |
| |
| # Verify that all content source files are available, or print an error |
| # message describing what's missing. |
| missing_files = [] |
| for dst, src in content.items(): |
| if not os.path.exists(src): |
| missing_files.append(src) |
| |
| if missing_files: |
| print( |
| "ERROR: Source files required to build the archive are missing!:\n", |
| file=sys.stderr, |
| ) |
| for path in missing_files[:16]: |
| print(" " + path, file=sys.stderr) |
| if len(missing_files > 16): |
| print(" ...", file=sys.stderr) |
| return 1 |
| |
| # Generate a tarmaker manifest file listing how to construct the archive, |
| # then call the tool to build it. |
| with open(args.tarmaker_manifest, "w") as tarmaker_manifest: |
| for dst, src in sorted(content.items()): |
| tarmaker_manifest.write("%s=%s\n" % (dst, src)) |
| |
| cmd = [ |
| args.tarmaker, |
| "-output", |
| args.output, |
| "-manifest", |
| args.tarmaker_manifest, |
| ] |
| subprocess.run(cmd) |
| |
| # Generate the depfile if needed |
| if args.depfile: |
| with open(args.depfile, "w") as f: |
| f.write(args.output + ":") |
| for input in sorted(depfile_inputs): |
| f.write(" " + input) |
| f.write("\n") |
| |
| return 0 |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |