#!/usr/bin/env python3
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Outputs artifacts to a lock file based on a general specification.
Updates the artifacts by matching the available artifacts in an
artifact store against the constraints in a specification
(artifact_spec.json).
"""
import argparse
import hashlib
import json
import json5
import os
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request

# URL template for artifact_groups.json in a TUF artifact store.
TUF_ARTIFACTS_JSON_URL = 'https://%s/targets/artifact_groups.json'
BLOB_SERVER = 'fuchsia-blobs.googleusercontent.com'
CONTENTS_FILE_RELATIVE_PATH = 'meta/contents'
# The block size of reading a file. The value is arbitrarily chosen to avoid
# loading the whole file.
_BLOCK_SIZE = 4096
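

# Helpers for describing a TUF package artifact: its meta.far is downloaded
# by merkle from BLOB_SERVER and the 'far' host tool ('far cat') is used to
# read the blob list out of it.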
def read_file_from_meta_far(far_tool, meta_far_path, file):
    """Invokes the far tool to read a single file from a meta.far archive."""
args = [far_tool, 'cat', '--archive=' + meta_far_path, '--file=' + file]
return subprocess.run(
args, check=True, capture_output=True, text=True).stdout


def _calculate_sha512_of_file(path):
    """Returns the sha512 hex digest of the file at path."""
hash_object = hashlib.sha512()
with open(path, 'rb') as f:
for byte_block in iter(lambda: f.read(_BLOCK_SIZE), b''):
hash_object.update(byte_block)
return hash_object.hexdigest()


def get_blob_list_and_sha(merkle, far_tool):
    """Fetches a meta.far by merkle and returns its blobs, sha512 and size."""
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'meta.far')
url = 'https://%s/%s' % (BLOB_SERVER, merkle)
try:
urllib.request.urlretrieve(url, path)
blobs = [merkle]
meta_content = read_file_from_meta_far(far_tool, path,
CONTENTS_FILE_RELATIVE_PATH)
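            # Each meta/contents line is expected to map a path in the package
            # to a blob merkle (path=merkle), which the split below relies on.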
for content in meta_content.split('\n'):
content = content.strip()
                if not content:
continue
blobs.append(content.split('=')[1])
return {
'blobs': blobs,
'sha512': _calculate_sha512_of_file(path),
'size': os.path.getsize(path)
}
except urllib.error.HTTPError as ex:
raise ValueError('Failed to download meta far or blob. Url is: ' +
url) from ex


def match(repo, spec):
    """Returns True if every key/value constraint in spec matches repo."""
for key, value in spec.items():
repo_value = repo.get(key)
if repo_value is None:
return False
if isinstance(value, (str, int, bool, float)):
if value not in ('$min', '$max') and value != repo_value:
return False
elif isinstance(value, dict):
if not match(repo_value, value):
return False
else:
raise ValueError('Unsupported value in match: {} ({})'.format(
value, type(value)))
return True


def dotted_get(dict_, keyname):
    """Given keyname in dotted notation (a or a.b), returns the value or None."""
    if '.' not in keyname:
        return dict_.get(keyname)
for key in keyname.split('.'):
dict_ = dict_.get(key)
if dict_ is None:
return None
return dict_


def get_artifact(artifact_group, name):
    """Returns the artifact entry matched by name."""
    for artifact in artifact_group.get('artifacts', []):
if artifact.get('name') == name:
return artifact
raise ValueError('Missing artifact with name ' + name)


def get_min_max_keys(spec, key_path):
    """Produces a list of [(key, reverse), ...] for any $min/$max keys."""
sort_keys = []
for key, value in spec.items():
if value in ('$min', '$max'):
sort_keys += [('.'.join(key_path + [key]), not value == '$min')]
elif isinstance(value, dict):
sort_keys += get_min_max_keys(value, key_path + [key])
return sort_keys


def sort_by_min_max(repos, spec):
"""Sorts by any $min/$max key."""
sort_keys = get_min_max_keys(spec, [])
if len(sort_keys) > 1:
raise ValueError(
'Only one $min/$max allowed, found: {}'.format(sort_keys))
if len(sort_keys) == 1:
repos.sort(
key=lambda x: dotted_get(x.get('attributes'), sort_keys[0][0]),
reverse=sort_keys[0][1])
return repos


def get_json(store, args):
"""Return artifact_groups.json for different types of stores."""
print('Loading store ' + str(store))
store_type = store.get('type')
if store_type == 'tuf':
url = TUF_ARTIFACTS_JSON_URL % store.get('repo')
return json.load(urllib.request.urlopen(url))
if store_type == 'local' and args.artifacts_root:
path = os.path.join(args.artifacts_root, store.get('path'))
        with open(path, 'r') as f:
            return json.load(f)
    raise ValueError(
        'Unsupported store type or missing --artifacts-root: {}'.format(store))


def match_artifacts(artifact_store_json, attributes):
"""Match artifacts from the artifact store file and spec attributes."""
groups = artifact_store_json.get('artifact_groups')
matches = [
artifact_group for artifact_group in groups
if match(artifact_group.get('attributes'), attributes)
]
matches = sort_by_min_max(matches, attributes)
if matches:
return matches[0]
raise ValueError(
'Failed to find artifact in artifact store matching: {}'.format(
attributes))


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--far-tool', help='Path to far tool', required=False)
parser.add_argument(
'--spec-file',
help='File that lists the specifications for prebuilt artifacts',
default='example/tuf/artifact_spec.json')
parser.add_argument(
'--output-file',
help='File to write the concrete artifacts and merkle keys',
default='example/tuf/artifact_lock.json')
parser.add_argument(
'--artifacts-root',
help='Path to root dir for artifacts_groups.json',
required=False)
return parser.parse_args()


def _migrate_deprecated_field(obj, from_key, to_key):
    """Renames a deprecated key in obj to its current name, if present."""
if from_key in obj:
obj[to_key] = obj.pop(from_key)


def main():
"""Updates the artifact merkles based on specification constraints."""
args = parse_args()
if not os.path.isfile(args.spec_file):
        sys.stderr.write('{} does not exist.\n'.format(args.spec_file))
sys.exit(1)
artifact_lock = {}
with open(args.spec_file, 'r') as spec_file:
spec = json5.load(spec_file)
artifacts = []
for artifact_group in spec['artifact_groups']:
store = artifact_group['artifact_store']
artifact_store_json = get_json(store, args)
attributes = artifact_group['attributes']
for artifact in artifact_group['artifacts']:
name = artifact['name']
artifact_attributes = artifact.get('attributes', {})
artifact_attributes.update(attributes)
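            # Group-level attributes take precedence over any per-artifact
            # attributes in the merged constraint set.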
matched_artifact_group = match_artifacts(
                artifact_store_json, artifact_attributes)
matched_artifact = get_artifact(matched_artifact_group, name)
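            # Normalize field names from older artifact stores: 'directory'
            # becomes 'name' and, below, 'id' becomes 'merkle'.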
_migrate_deprecated_field(matched_artifact_group, 'directory',
'name')
store['artifact_group_name'] = matched_artifact_group['name']
matched_artifact['artifact_store'] = store
matched_artifact['attributes'] = matched_artifact_group[
'attributes']
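            # For TUF stores, pin the full blob set in the lock file: a
            # package is expanded via its meta.far; any other artifact type
            # contributes just its own merkle.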
if store['type'] == 'tuf' and args.far_tool:
_migrate_deprecated_field(matched_artifact, 'id', 'merkle')
if matched_artifact['type'] == 'package':
merkle = matched_artifact['merkle']
matched_artifact.update(
get_blob_list_and_sha(merkle, args.far_tool))
else:
matched_artifact['blobs'] = [matched_artifact['merkle']]
artifacts.append(matched_artifact)
artifact_lock['artifacts'] = artifacts
print('Writing to {}'.format(args.output_file))
with open(args.output_file, 'w') as lock_file:
json.dump(artifact_lock, lock_file, indent=2)


if __name__ == '__main__':
sys.exit(main())