#!/usr/bin/env fuchsia-vendored-python
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Build packages.archive.tgz."""
import argparse
import filecmp
import json
import os
import pathlib
import subprocess
import sys

from assembly import PackageManifest
from serialization import json_load
from typing import Dict, Set


def add_content_entry(
    content: Dict[str, str],
    dst: str,
    src: str,
    src_dir: str,
    depfile_inputs: Set[str],
):
"""Add new entry to |content|, checking for duplicates."""
    # Make the source path relative, because the build still
    # puts absolute paths in a small number of package manifests :-/
    src_path = os.path.relpath(os.path.join(src_dir, src))
    depfile_inputs.add(src_path)
    if dst in content:
        cur_src_path = content[dst]
        if cur_src_path != src_path and not filecmp.cmp(cur_src_path, src_path):
            raise ValueError(
                "Duplicate entries for destination %s:\n  %s\n  %s\n"
                % (dst, cur_src_path, src_path)
            )
        return
    content[dst] = src_path


def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--files",
        metavar="FILESPEC",
        nargs="*",
        default=[],
        help="Add one or more files to the archive; each FILESPEC should be DST=SRC.",
    )
    parser.add_argument(
        "--src-dir",
        default="",
        help="Specify source directory. Default is the current one.",
    )
    parser.add_argument(
        "--tuf-repo-name", required=True, help="Specify TUF repository name."
    )
    parser.add_argument(
        "--tarmaker",
        required=True,
        help="Path to the 'tarmaker' host tool to use.",
    )
    parser.add_argument(
        "--tarmaker-manifest",
        required=True,
        help="Path to output tarmaker manifest file.",
    )
    parser.add_argument(
        "--package-manifests-list",
        required=True,
        help="Path to file listing all package manifest files.",
    )
    parser.add_argument(
        "--tuf-timestamp-json", required=True, help="Path to timestamp.json."
    )
    parser.add_argument(
        "--tuf-targets-json",
        nargs="*",
        default=[],
        help="Path(s) to targets.json file(s).",
    )
    parser.add_argument(
        "--output", required=True, help="Path to output .tar.gz archive."
    )
    parser.add_argument("--depfile", help="Path to output Ninja depfile.")
    args = parser.parse_args()
    # Map destination path -> source path
    content: Dict[str, str] = {}

    # Map merkle root string -> source path
    merkle_to_source: Dict[str, str] = {}

    # Set of input paths for the depfile.
    depfile_inputs: Set[str] = set()

    # Parse the --files argument first.
    for filespec in args.files:
        dst, sep, src = filespec.strip().partition("=")
        if sep != "=":
            raise ValueError("Unexpected file spec: " + filespec)
        add_content_entry(content, dst, src, args.src_dir, depfile_inputs)

    # Parse --package-manifests-list to populate the blobs directory
    # in the archive.
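    # The list file is JSON; the shape relied on below is, e.g.:
    #   {"content": {"manifests": ["obj/foo/package_manifest.json", ...]}}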
    blobs_dst_dir = args.tuf_repo_name + "/repository/blobs"
    with open(args.package_manifests_list) as f:
        package_manifest_list_json = json.load(f)
    package_manifest_paths = [
        pathlib.Path(p)
        for p in package_manifest_list_json["content"]["manifests"]
    ]

    # Use package_manifest_paths as a worklist that expands as paths are
    # traversed.
    while package_manifest_paths:
        package_manifest_path = package_manifest_paths.pop()
        depfile_inputs.add(str(package_manifest_path))
        with package_manifest_path.open() as f:
            package_manifest = json_load(PackageManifest, f)
        # TODO(https://fxbug.dev/42180820) extract this into a constructor for PackageManifest?
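        # Each blob entry carries a source_path and a merkle root; when the
        # manifest declares blob_sources_relative == "file", source paths are
        # interpreted relative to the manifest's own directory.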
        for blob_entry in package_manifest.blobs:
            src = blob_entry.source_path
            if package_manifest.blob_sources_relative == "file":
                src = package_manifest_path.parent / src
            merkle = blob_entry.merkle
            merkle_to_source[merkle] = src
            dst = os.path.join(blobs_dst_dir, merkle)
            add_content_entry(content, dst, src, args.src_dir, depfile_inputs)

        for subpackage in package_manifest.subpackages:
            package_manifest_paths.append(
                package_manifest_path.parent / subpackage.manifest_path
            )

    # Copy timestamp.json into the archive, and parse it to extract the
    # version of the snapshot.json file to use.
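    # The field read below looks like, e.g.:
    #   {"signed": {"meta": {"snapshot.json": {"version": 42, ...}, ...}, ...}}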
    depfile_inputs.add(args.tuf_timestamp_json)
    with open(args.tuf_timestamp_json) as f:
        timestamp_json = json.load(f)

    tuf_repo_dir = os.path.join(args.src_dir, args.tuf_repo_name, "repository")
    add_content_entry(
        content,
        os.path.join(tuf_repo_dir, "timestamp.json"),
        args.tuf_timestamp_json,
        "",
        depfile_inputs,
    )

    # Install <version>.snapshot.json as-is, then as 'snapshot.json'.
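    # (This mirrors TUF's consistent-snapshot layout, in which metadata is
    # also published under a version-prefixed name such as "2.snapshot.json".)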
    snapshot_json_version = timestamp_json["signed"]["meta"]["snapshot.json"][
        "version"
    ]
    snapshot_json_file = os.path.join(
        tuf_repo_dir, "%d.snapshot.json" % snapshot_json_version
    )
    snapshot_json_src_path = os.path.join(args.src_dir, snapshot_json_file)
    add_content_entry(
        content, snapshot_json_file, snapshot_json_src_path, "", depfile_inputs
    )
    add_content_entry(
        content,
        os.path.join(tuf_repo_dir, "snapshot.json"),
        snapshot_json_src_path,
        "",
        depfile_inputs,
    )

    # Parse the snapshot.json file to grab the version of targets.json.
    # Install <version>.targets.json as-is, then as 'targets.json' in the
    # archive.
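    # As with timestamp.json, the access below assumes a shape like:
    #   {"signed": {"meta": {"targets.json": {"version": 3, ...}, ...}, ...}}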
    with open(snapshot_json_src_path) as f:
        snapshot_json = json.load(f)

    targets_json_version = snapshot_json["signed"]["meta"]["targets.json"][
        "version"
    ]
    targets_json_file = os.path.join(
        tuf_repo_dir, "%d.targets.json" % targets_json_version
    )
    targets_json_src_path = os.path.join(args.src_dir, targets_json_file)
    add_content_entry(
        content, targets_json_file, targets_json_src_path, "", depfile_inputs
    )
    add_content_entry(
        content,
        os.path.join(tuf_repo_dir, "targets.json"),
        targets_json_src_path,
        "",
        depfile_inputs,
    )

    # Parse the targets.json file to populate the targets directory
    # in the archive.
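    # Each target entry provides a sha512 hash and a Fuchsia-specific merkle
    # root under "custom", e.g.:
    #   "foo/bar": {"hashes": {"sha512": "..."}, "custom": {"merkle": "..."}}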
    with open(targets_json_src_path) as f:
        targets_json = json.load(f)
    for target, entry in targets_json["signed"]["targets"].items():
        target_hash = entry["hashes"]["sha512"]
        merkle = entry["custom"]["merkle"]
        # The target name is <dirname>/<basename> or just <basename>,
        # while the installation file name is
        # ${tuf_repo_name}/repository/targets/<dirname>/<hash>.<basename>,
        # or ${tuf_repo_name}/repository/targets/<hash>.<basename> if there
        # is no <dirname> in the target name.
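        # For example, target "foo/bar" with sha512 hash <H> is installed as
        # ${tuf_repo_name}/repository/targets/foo/<H>.bar.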
        dirname, basename = os.path.split(target)
        dst = args.tuf_repo_name + "/repository/targets"
        if dirname:
            dst = os.path.join(dst, dirname)
        dst = os.path.join(dst, f"{target_hash}.{basename}")
        src = merkle_to_source[merkle]
        add_content_entry(content, dst, src, args.src_dir, depfile_inputs)

    # Verify that all content source files are available, or print an error
    # message describing what's missing.
    missing_files = []
    for dst, src in content.items():
        if not os.path.exists(src):
            missing_files.append(src)

    if missing_files:
        print(
            "ERROR: Source files required to build the archive are missing:\n",
            file=sys.stderr,
        )
        for path in missing_files[:16]:
            print("  " + path, file=sys.stderr)
        if len(missing_files) > 16:
            print("  ...", file=sys.stderr)
        return 1

    # Generate a tarmaker manifest file listing how to construct the archive,
    # then call the tool to build it.
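    # Each manifest line is DST=SRC, matching the format accepted by --files.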
    with open(args.tarmaker_manifest, "w") as tarmaker_manifest:
        for dst, src in sorted(content.items()):
            tarmaker_manifest.write("%s=%s\n" % (dst, src))

    cmd = [
        args.tarmaker,
        "-output",
        args.output,
        "-manifest",
        args.tarmaker_manifest,
    ]
    subprocess.run(cmd, check=True)

    # Generate the depfile if needed.
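    # The depfile uses the Ninja format: "<output>: <input1> <input2> ...".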
    if args.depfile:
        with open(args.depfile, "w") as f:
            f.write(args.output + ":")
            for input_path in sorted(depfile_inputs):
                f.write(" " + input_path)
            f.write("\n")

    return 0


if __name__ == "__main__":
    sys.exit(main())