| #!/usr/bin/env python |
| |
| # Copyright 2020, The Android Open Source Project |
| # |
| # Permission is hereby granted, free of charge, to any person |
| # obtaining a copy of this software and associated documentation |
| # files (the "Software"), to deal in the Software without |
| # restriction, including without limitation the rights to use, copy, |
| # modify, merge, publish, distribute, sublicense, and/or sell copies |
| # of the Software, and to permit persons to whom the Software is |
| # furnished to do so, subject to the following conditions: |
| # |
| # The above copyright notice and this permission notice shall be |
| # included in all copies or substantial portions of the Software. |
| # |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS |
| # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
| # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
| # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| # SOFTWARE. |
| # |
| """Command-line tool for AFTL support for Android Verified Boot images.""" |
| |
| from __future__ import division |
| |
| import argparse |
| import binascii |
| import hashlib |
| import multiprocessing |
| import os |
| import Queue # pylint: disable=bad-python3-import |
| import struct |
| import subprocess |
| import sys |
| import tempfile |
| import time |
| |
| |
| import avbtool |
| import proto.aftl_pb2 |
| import proto.api_pb2 |
| import proto.crypto.sigpb |
| |
| # Android Firmware Transparency Log Data Structures |
| |
| |
class AftlError(Exception):
  """Application-specific error raised by the AFTL tooling.

  This exception type is meant for problems that should be reported to the
  user as a plain message rather than with a stack trace.

  The message handed to the constructor is forwarded unchanged to the
  Exception base class, so it is available through str(error) and
  error.args[0].
  """

  def __init__(self, message):
    """Initializes the error.

    Arguments:
      message: Error message.
    """
    super(AftlError, self).__init__(message)
| |
| |
def rsa_key_read_pem_bytes(key_path):
  """Converts the RSA key in a PEM file to DER bytes using openssl.

  Runs 'openssl rsa -in <key_path> -pubout -outform DER' and collects what
  the command prints on stdout.

  Arguments:
    key_path: A string containing the path to the PEM file.

  Returns:
    A bytearray holding the DER encoded output produced by openssl.

  Raises:
    AftlError: If openssl exits with a non-zero status.
  """
  command = ['openssl', 'rsa', '-in', key_path, '-pubout', '-outform', 'DER']
  openssl = subprocess.Popen(command,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
  der_output, error_output = openssl.communicate()
  # communicate() has already reaped the process; wait() only fetches the
  # exit status.
  if openssl.wait() != 0:
    raise AftlError('Error decoding: {}'.format(error_output))
  return bytearray(der_output)
| |
| |
def check_signature(log_root, log_root_sig,
                    transparency_log_pub_key):
  """Validates the signature provided by the transparency log.

  The log root and its signature are written to two temporary files which
  are handed to 'openssl dgst -sha256 -verify'. The exit status of openssl
  decides the result.

  Arguments:
    log_root: The transparency log_root data structure; written verbatim to
      a temporary file.
    log_root_sig: The signature of the transparency log_root data structure;
      written verbatim to a temporary file.
    transparency_log_pub_key: The file path to the transparency log public key.

  Returns:
    True if the signature check passes, otherwise False. On failure the
    stderr output of openssl is reported on sys.stderr.
  """
  # The context managers close both temporary files on every exit path
  # (including exceptions). NamedTemporaryFile deletes the file on close, so
  # nothing is left behind and no file handle is leaked.
  with tempfile.NamedTemporaryFile() as logsig_tmp, \
      tempfile.NamedTemporaryFile() as logroot_tmp:
    logsig_tmp.write(log_root_sig)
    # Flush so that openssl, which opens the file by name, sees the content.
    logsig_tmp.flush()
    logroot_tmp.write(log_root)
    logroot_tmp.flush()

    p = subprocess.Popen(['openssl', 'dgst', '-sha256', '-verify',
                          transparency_log_pub_key,
                          '-signature', logsig_tmp.name, logroot_tmp.name],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    # The files must stay open until openssl has finished reading them.
    (_, openssl_err) = p.communicate()
    retcode = p.returncode

  if not retcode:
    return True
  sys.stderr.write('Error validating log_root signature with openssl {}'.
                   format(openssl_err))
  return False
| |
| |
| # AFTL Merkle Tree Functionality |
def rfc6962_hash_leaf(leaf):
  """Hashes a Merkle tree leaf the way RFC6962 prescribes.

  Arguments:
    leaf: The bytes of the Merkle tree leaf to be hashed.

  Returns:
    The SHA256 digest of a '0' byte followed by the leaf data.
  """
  # Leaves are prefixed with a '0' byte and inner nodes with a '1' byte so
  # that a leaf hash can never be confused with an inner node hash (2nd
  # preimage attack resistance).
  leaf_hasher = hashlib.sha256(b'\x00')
  leaf_hasher.update(leaf)
  return leaf_hasher.digest()
| |
| |
def rfc6962_hash_children(l, r):
  """Computes the RFC6962 hash of an inner Merkle tree node.

  Arguments:
    l: The bytes of the left child node hash.
    r: The bytes of the right child node hash.

  Returns:
    The SHA256 digest of 1|l|r.
  """
  # Inner nodes are prefixed with a '1' byte and leaves with a '0' byte so
  # that the two kinds of hashes cannot be confused (2nd preimage attack
  # resistance).
  node_hasher = hashlib.sha256(b'\x01')
  for child in (l, r):
    node_hasher.update(child)
  return node_hasher.digest()
| |
| |
def chain_border_right(seed, proof):
  """Folds proof hashes into a subtree hash, always from the left.

  Every hash in the proof is used as the left sibling of the running hash,
  i.e. the running hash is always the right child of the next node.

  Arguments:
    seed: The bytes of the starting hash.
    proof: A list of hashes from the inclusion proof.

  Returns:
    The resulting subtree hash; the unchanged seed if proof is empty.
  """
  node_hash = seed
  for left_sibling in proof:
    node_hash = rfc6962_hash_children(left_sibling, node_hash)
  return node_hash
| |
| |
def chain_inner(seed, proof, leaf_index):
  """Folds proof hashes into a subtree hash, guided by the leaf index bits.

  Bit i of leaf_index decides on which side the i-th proof hash is combined
  with the running hash.

  Arguments:
    seed: The bytes of the starting hash.
    proof: A list of hashes from the inclusion proof.
    leaf_index: The current leaf index.

  Returns:
    The resulting subtree hash; the unchanged seed if proof is empty.
  """
  node_hash = seed
  for level, sibling in enumerate(proof):
    if (leaf_index >> level) & 1:
      # Bit set: the running hash is the right child at this level.
      node_hash = rfc6962_hash_children(sibling, node_hash)
    else:
      # Bit clear: the running hash is the left child at this level.
      node_hash = rfc6962_hash_children(node_hash, sibling)
  return node_hash
| |
| |
def root_from_icp(leaf_index, tree_size, proof, leaf_hash):
  """Recomputes the Merkle tree root hash from an inclusion proof.

  Arguments:
    leaf_index: The index of the leaf the proof is for.
    tree_size: The size of the Merkle tree; must be larger than leaf_index.
    proof: A list of hashes making up the inclusion proof.
    leaf_hash: The hash of the leaf the proof starts from.

  Returns:
    The calculated Merkle tree root hash.

  Raises:
    AftlError: If invalid parameters are passed in.
  """
  if leaf_index < 0:
    raise AftlError('Invalid leaf_index value: {}'.format(leaf_index))
  if tree_size < 0:
    raise AftlError('Invalid tree_size value: {}'.format(tree_size))
  if leaf_index >= tree_size:
    raise AftlError(
        'leaf_index cannot be equal or larger than tree_size: {}, {}'.format(
            leaf_index, tree_size))
  if proof is None:
    raise AftlError('Inclusion proof not provided.')
  if leaf_hash is None:
    raise AftlError('No leaf hash provided.')

  # The proof is split at the level where the path to this leaf and the path
  # to the last leaf (tree_size - 1) diverge. The hashes below the split are
  # placed according to the bits of leaf_index; the remaining ones are all
  # combined from the left.
  split = (leaf_index ^ (tree_size - 1)).bit_length()
  return chain_border_right(
      chain_inner(leaf_hash, proof[:split], leaf_index), proof[split:])
| |
| |
class AftlIcpHeader(object):
  """Fixed-size header of the transparency log inclusion proof section.

  Attributes:
    magic: Magic for identifying the ICP header.
    required_icp_version_major: The major version of AVB that wrote the entry.
    required_icp_version_minor: The minor version of AVB that wrote the entry.
    aftl_descriptor_size: Total size of the header's AftlDescriptor.
    icp_count: Number of inclusion proofs represented in this structure.
  """

  SIZE = 18  # Size of the packed header, in bytes.
  MAGIC = 'AFTL'
  # '!' selects network byte order without padding.
  FORMAT_STRING = ('!4s2L'  # magic, major & minor version
                   'L'      # descriptor size
                   'H')     # number of inclusion proof entries

  def __init__(self, data=None):
    """Creates a header, either parsed from data or with default values.

    Arguments:
      data: If not None, must be a bytearray of size |SIZE|.

    Raises:
      AftlError: If the resulting header does not pass is_valid().
    """
    # Guards against FORMAT_STRING and SIZE drifting apart.
    assert struct.calcsize(self.FORMAT_STRING) == self.SIZE

    if data:
      unpacked = struct.unpack(self.FORMAT_STRING, data)
      (self.magic,
       self.required_icp_version_major,
       self.required_icp_version_minor,
       self.aftl_descriptor_size,
       self.icp_count) = unpacked
    else:
      # A fresh header describes a descriptor without any entries.
      self.magic = self.MAGIC
      self.required_icp_version_major = avbtool.AVB_VERSION_MAJOR
      self.required_icp_version_minor = avbtool.AVB_VERSION_MINOR
      self.aftl_descriptor_size = self.SIZE
      self.icp_count = 0

    if not self.is_valid():
      raise AftlError('Invalid structure for AftlIcpHeader')

  def save(self, output):
    """Writes the encoded header to output.

    Arguments:
      output: The object to write the header to.

    Raises:
      AftlError: If invalid structure for AftlIcpHeader.
    """
    encoded_header = self.encode()
    output.write(encoded_header)

  def encode(self):
    """Packs the header fields according to FORMAT_STRING.

    Returns:
      The packed header, |SIZE| bytes long.

    Raises:
      AftlError: If invalid structure for AftlIcpHeader.
    """
    if not self.is_valid():
      raise AftlError('Invalid structure for AftlIcpHeader')
    field_values = (self.magic,
                    self.required_icp_version_major,
                    self.required_icp_version_minor,
                    self.aftl_descriptor_size,
                    self.icp_count)
    return struct.pack(self.FORMAT_STRING, *field_values)

  def is_valid(self):
    """Checks the header fields; the first problem found goes to stderr.

    Returns:
      True if the values in the AftlIcpHeader are sane, False otherwise.
    """
    report = sys.stderr.write

    if self.magic != AftlIcpHeader.MAGIC:
      report('ICP Header: magic value mismatch: {}\n'.format(self.magic))
      return False

    # The header must not require a newer version than this tool provides.
    major = self.required_icp_version_major
    if major > avbtool.AVB_VERSION_MAJOR:
      report('ICP header: major version mismatch: {}\n'.format(major))
      return False

    minor = self.required_icp_version_minor
    if minor > avbtool.AVB_VERSION_MINOR:
      report('ICP header: minor version mismatch: {}\n'.format(minor))
      return False

    # The descriptor contains at least this header.
    if self.aftl_descriptor_size < self.SIZE:
      report('ICP Header: Invalid descriptor size: {}\n'.format(
          self.aftl_descriptor_size))
      return False

    # icp_count is packed as an unsigned 16 bit value.
    if not 0 <= self.icp_count <= 65535:
      report('ICP header: ICP entry count out of range: {}\n'.format(
          self.icp_count))
      return False

    return True

  def print_desc(self, o):
    """Writes a human-readable description of the header.

    Arguments:
      o: The object to write the output to.
    """
    rows = (
        (' Major version: {}\n', self.required_icp_version_major),
        (' Minor version: {}\n', self.required_icp_version_minor),
        (' Descriptor size: {}\n', self.aftl_descriptor_size),
        (' ICP entries count: {}\n', self.icp_count),
    )
    for template, value in rows:
      o.write(template.format(value))
| |
class AftlIcpEntry(object):
  """A class for the transparency log inclusion proof entries.

  The data that represents each of the components of the ICP entry are stored
  immediately following the ICP entry header. The format is log_url,
  SignedLogRoot, FirmwareInfo leaf, log root signature and inclusion proof
  hashes.

  Attributes:
    log_url_size: Length of the string representing the transparency log URL.
    leaf_index: Leaf index in the transparency log representing this entry.
    log_root_descriptor_size: Size of the transparency log's SignedLogRoot.
    fw_info_leaf_size: Size of the FirmwareInfo leaf passed to the log.
    log_root_sig_size: Size in bytes of the log_root_signature
    proof_hash_count: Number of hashes comprising the inclusion proof.
    inc_proof_size: The total size of the inclusion proof, in bytes.
    log_url: The URL for the transparency log that generated this inclusion
      proof.
    log_root_descriptor: The data comprising the signed tree head structure.
    fw_info_leaf: The data comprising the FirmwareInfo leaf.
    log_root_signature: The data comprising the log root signature.
    proofs: The hashes comprising the inclusion proof.
  """
  SIZE = 27  # The size of the fixed part of the structure, in bytes
  FORMAT_STRING = ('!L'   # transparency log server url size
                   'Q'    # leaf index
                   'L'    # log root descriptor size
                   'L'    # firmware info leaf size
                   'H'    # log root signature size
                   'B'    # number of hashes in the inclusion proof
                   'L')   # size of the inclusion proof in bytes
  # The variable-length components (log_url, log_root_descriptor,
  # fw_info_leaf, log_root_signature and the proofs) follow the fixed-size
  # part. Their format string is derived from the size fields whenever an
  # entry is parsed or encoded.

  def __init__(self, data=None):
    """Initializes a new ICP entry object.

    Arguments:
      data: If not None, must be a bytearray of size >= |SIZE|. Bytes beyond
        the size announced by the entry's own header are ignored.

    Raises:
      AftlError: If data does not represent a well-formed AftlIcpEntry.
    """
    # Assert the header structure is of a sane size.
    assert struct.calcsize(self.FORMAT_STRING) == self.SIZE

    if data:
      # Deserialize the header from the data descriptor.
      (self.log_url_size, self.leaf_index, self.log_root_descriptor_size,
       self.fw_info_leaf_size, self.log_root_sig_size, self.proof_hash_count,
       self.inc_proof_size) = struct.unpack(
           self.FORMAT_STRING, data[0:self.SIZE])

      # Deserialize ICP entry components from the data descriptor.
      expected_format_string = '{}s{}s{}s{}s{}s'.format(
          self.log_url_size,
          self.log_root_descriptor_size,
          self.fw_info_leaf_size,
          self.log_root_sig_size,
          self.inc_proof_size)

      (self.log_url, log_root_descriptor_bytes, self.fw_info_leaf,
       self.log_root_signature, proof_bytes) = struct.unpack(
           expected_format_string, data[self.SIZE:self.get_expected_size()])
      self.log_root_descriptor = TrillianLogRootDescriptor(
          log_root_descriptor_bytes)

      # All hashes of a proof have the same size, so the concatenated proof
      # bytes are cut into proof_hash_count equally sized pieces. A size that
      # does not divide evenly is caught by is_valid() below.
      self.proofs = []
      if self.proof_hash_count > 0:
        hash_size = self.inc_proof_size // self.proof_hash_count
        for hash_idx in range(self.proof_hash_count):
          start = hash_idx * hash_size
          self.proofs.append(proof_bytes[start:start + hash_size])
    else:
      self.log_url_size = 0
      self.leaf_index = 0
      self.fw_info_leaf_size = 0
      self.log_root_sig_size = 0
      self.proof_hash_count = 0
      self.inc_proof_size = 0
      self.log_url = ''
      self.log_root_descriptor = TrillianLogRootDescriptor()
      self.log_root_descriptor_size = (
          self.log_root_descriptor.get_expected_size())
      self.fw_info_leaf = ''
      self.log_root_signature = ''
      self.proofs = []
    if not self.is_valid():
      raise AftlError('Invalid structure for AftlIcpEntry')

  def set_log_url(self, log_url):
    """Sets the log_url and log_url_size elements in the AftlIcpEntry.

    Arguments:
      log_url: The string representing the transparency log URL.
    """
    self.log_url = log_url
    self.log_url_size = len(log_url)

  def set_log_root_descriptor(self, log_root_descriptor):
    """Sets log_root_descriptor and log_root_descriptor_size.

    Arguments:
      log_root_descriptor: A TrillianLogRootDescriptor containing the
        log_root for the transparency log.

    Raises:
      AftlError: If log_root_descriptor is not a TrillianLogRootDescriptor.
    """
    if not isinstance(log_root_descriptor, TrillianLogRootDescriptor):
      raise AftlError('Invalid data type passed to set_log_root_descriptor: '
                      'Received {}.\n'.format(type(log_root_descriptor)))
    self.log_root_descriptor = log_root_descriptor
    self.log_root_descriptor_size = log_root_descriptor.get_expected_size()

  def set_proofs(self, proofs):
    """Sets the proof_hash_count, proofs, and inc_proof_size.

    Arguments:
      proofs: A list of hashes comprising the inclusion proof.
    """
    self.proofs = proofs
    self.proof_hash_count = 0
    inc_proof_size = 0
    for proof in proofs:
      inc_proof_size += len(proof)
      self.proof_hash_count += 1
    self.inc_proof_size = inc_proof_size

  def verify_icp(self, transparency_log_pub_key):
    """Verifies the contained inclusion proof given the public log key.

    The root hash is recalculated from the FirmwareInfo leaf and the
    inclusion proof and compared with the root hash of the log root
    descriptor. Only if they match is the log root signature checked.

    Arguments:
      transparency_log_pub_key: The trusted public key for the log.

    Returns:
      True if the recalculated root hash matches the one in the log root
      descriptor and the log root signature verifies. False otherwise.

    Raises:
      AftlError: If the entry's leaf_index or tree_size are not usable for
        an inclusion proof.
    """
    leaf_hash = rfc6962_hash_leaf(self.fw_info_leaf)
    calc_root = root_from_icp(self.leaf_index,
                              self.log_root_descriptor.tree_size,
                              self.proofs,
                              leaf_hash)
    if calc_root != self.log_root_descriptor.root_hash:
      return False
    # The signature is checked against the serialized log root. The
    # descriptor object does not keep the raw bytes it was parsed from, so
    # they are produced with encode().
    return bool(check_signature(self.log_root_descriptor.encode(),
                                self.log_root_signature,
                                transparency_log_pub_key))

  def save(self, output):
    """Serializes the transparency header |SIZE| and data to disk.

    Arguments:
      output: The object to write the header to.

    Raises:
      AftlError: If invalid entry structure.
    """
    output.write(self.encode())

  def encode(self):
    """Serializes the header |SIZE| and data.

    Returns:
      The packed bytes of the fixed-size part followed by log_url, the
      encoded log root descriptor, fw_info_leaf, log_root_signature and the
      concatenated proof hashes.

    Raises:
      AftlError: If invalid entry structure.
    """
    if not self.is_valid():
      raise AftlError('Invalid AftlIcpEntry structure')

    expected_format_string = '{}{}s{}s{}s{}s{}s'.format(
        self.FORMAT_STRING,
        self.log_url_size,
        self.log_root_descriptor_size,
        self.fw_info_leaf_size,
        self.log_root_sig_size,
        self.inc_proof_size)

    proof_bytes = bytearray()
    for proof in self.proofs:
      proof_bytes.extend(proof)

    # bytes() instead of str(): on Python 2 both name the same type; on
    # Python 3 only bytes() turns a bytearray into the byte string that
    # struct.pack() requires.
    return struct.pack(expected_format_string,
                       self.log_url_size, self.leaf_index,
                       self.log_root_descriptor_size, self.fw_info_leaf_size,
                       self.log_root_sig_size, self.proof_hash_count,
                       self.inc_proof_size, self.log_url,
                       self.log_root_descriptor.encode(),
                       bytes(self.fw_info_leaf),
                       bytes(self.log_root_signature),
                       bytes(proof_bytes))

  # TODO(danielaustin): Add unit test.
  def translate_response(self, transparency_log, afi_response):
    """Translates an AddFirmwareInfoResponse object to an AftlIcpEntry.

    Sets the log URL, leaf index, log root descriptor, log root signature
    and the inclusion proof hashes. fw_info_leaf and fw_info_leaf_size are
    not touched by this method.

    Arguments:
      transparency_log: String representing the transparency log URL.
      afi_response: The AddFirmwareResponse object to translate.
    """
    fw_info_proof = afi_response.fw_info_proof
    self.set_log_url(transparency_log)
    self.leaf_index = fw_info_proof.proof.leaf_index
    self.log_root_descriptor = TrillianLogRootDescriptor(
        fw_info_proof.sth.log_root)
    self.log_root_signature = fw_info_proof.sth.log_root_signature
    self.log_root_sig_size = len(self.log_root_signature)
    self.log_root_descriptor_size = self.log_root_descriptor.get_expected_size()
    self.set_proofs(fw_info_proof.proof.hashes)

  def get_expected_size(self):
    """Gets the expected size of the full entry out of the header.

    Returns:
      The expected size of the AftlIcpEntry from the header, i.e. |SIZE| plus
      the sizes of all variable-length components.
    """
    return (self.SIZE + self.log_url_size + self.log_root_descriptor_size +
            self.fw_info_leaf_size + self.log_root_sig_size +
            self.inc_proof_size)

  def is_valid(self):
    """Ensures that values in an AftlIcpEntry structure are sane.

    The first problem found is reported on stderr.

    Returns:
      True if the values in the AftlIcpEntry are sane, False otherwise.
    """
    if ((self.log_url and self.log_url_size != len(self.log_url))
        or (not self.log_url and self.log_url_size != 0)):
      sys.stderr.write('ICP entry: invalid URL size: {}\n'
                       .format(self.log_url_size))
      return False

    if self.leaf_index < 0:
      sys.stderr.write('ICP entry: leaf index out of range: '
                       '{}\n'.format(self.leaf_index))
      return False

    if (not self.log_root_descriptor or
        not self.log_root_descriptor.is_valid()):
      sys.stderr.write('ICP entry: invalid TrillianLogRootDescriptor\n')
      return False

    if (self.log_root_descriptor_size !=
        self.log_root_descriptor.get_expected_size()):
      sys.stderr.write('ICP entry: invalid signed root descriptor size: '
                       '{}, should be {}\n'.format(
                           self.log_root_descriptor_size,
                           self.log_root_descriptor.get_expected_size()))
      return False

    if ((self.fw_info_leaf and self.fw_info_leaf_size != len(self.fw_info_leaf))
        or (not self.fw_info_leaf and self.fw_info_leaf_size != 0)):
      sys.stderr.write('ICP entry: invalid FirmwareInfo size: {}\n'
                       .format(self.fw_info_leaf_size))
      return False

    if self.proof_hash_count < 0:
      sys.stderr.write('ICP entry: invalid proof count: {}\n'.format(
          self.proof_hash_count))
      return False

    # inc_proof_size has to equal the summed length of all proof hashes.
    inc_proof_size = 0
    if self.proofs:
      for proof in self.proofs:
        inc_proof_size += len(proof)
      if self.inc_proof_size != inc_proof_size:
        sys.stderr.write('ICP entry: invalid transparency log proof size: ')
        sys.stderr.write('{}, calculated {}\n'.format(self.inc_proof_size,
                                                      inc_proof_size))
        return False
    elif self.inc_proof_size != 0:
      sys.stderr.write('ICP entry: invalid transparency log proof size '
                       '(should be 0): {}'.format(self.inc_proof_size))
      return False

    return True

  def print_desc(self, o):
    """Print the descriptor.

    Arguments:
      o: The object to write the output to.
    """
    o.write(' Transparency Log: {}\n'.format(self.log_url))
    o.write(' Leaf index: {}\n'.format(self.leaf_index))
    o.write(' ICP hashes: ')
    for i, proof_hash in enumerate(self.proofs):
      # Every hash after the first one starts on a fresh line and is
      # prefixed with padding.
      if i != 0:
        o.write(' ' * 24)
      o.write('{}\n'.format(binascii.hexlify(proof_hash)))
| |
| |
class TrillianLogRootDescriptor(object):
  """A class representing the Trillian log_root descriptor.

  Taken from Trillian definitions:
  https://github.com/google/trillian/blob/master/trillian.proto#L255

  The serialized form is: version, tree_size, root_hash_size, root_hash,
  timestamp, revision, metadata_size, metadata. All integers are in network
  byte order.

  Attributes:
    version: The version number of the descriptor. Currently only version=1 is
      supported.
    tree_size: The size of the tree.
    root_hash_size: The size of the root hash in bytes. Valid values are between
      0 and 128.
    root_hash: The root hash as bytearray().
    timestamp: The timestamp in nanoseconds.
    revision: The revision number as long.
    metadata_size: The size of the metadata in bytes. Valid values are between
      0 and 65535.
    metadata: The metadata as bytearray().
  """
  FORMAT_STRING_PART_1 = ('!H'  # version
                          'Q'   # tree_size
                          'B'   # root_hash_size
                         )

  FORMAT_STRING_PART_2 = ('!Q'  # timestamp
                          'Q'   # revision
                          'H'   # metadata_size
                         )

  def __init__(self, data=None):
    """Initializes a new TrillianLogRoot descriptor.

    Arguments:
      data: If not None (and not empty), the serialized descriptor to parse.
        Otherwise an empty version 1 descriptor is created.

    Raises:
      AftlError: If the resulting descriptor does not pass is_valid().
    """
    if data:
      # Parses first part of the log_root descriptor.
      data_length = struct.calcsize(self.FORMAT_STRING_PART_1)
      (self.version, self.tree_size, self.root_hash_size) = struct.unpack(
          self.FORMAT_STRING_PART_1, data[0:data_length])
      data = data[data_length:]

      # Parses the root_hash bytes if the size indicates existence.
      if self.root_hash_size > 0:
        self.root_hash = data[0:self.root_hash_size]
        data = data[self.root_hash_size:]
      else:
        self.root_hash = bytearray()

      # Parses second part of the log_root descriptor.
      data_length = struct.calcsize(self.FORMAT_STRING_PART_2)
      (self.timestamp, self.revision, self.metadata_size) = struct.unpack(
          self.FORMAT_STRING_PART_2, data[0:data_length])
      data = data[data_length:]

      # Parses the metadata if the size indicates existence.
      if self.metadata_size > 0:
        self.metadata = data[0:self.metadata_size]
      else:
        self.metadata = bytearray()
    else:
      self.version = 1
      self.tree_size = 0
      self.root_hash_size = 0
      self.root_hash = bytearray()
      self.timestamp = 0
      self.revision = 0
      self.metadata_size = 0
      self.metadata = bytearray()

    # Truncated root_hash or metadata shows up here as a length mismatch.
    if not self.is_valid():
      raise AftlError('Invalid structure for TrillianLogRootDescriptor.')

  def get_expected_size(self):
    """Calculates the expected size of the TrillianLogRootDescriptor.

    Returns:
      The expected size of the TrillianLogRootDescriptor: both fixed-size
      parts plus root_hash_size and metadata_size.
    """
    return (struct.calcsize(self.FORMAT_STRING_PART_1) + self.root_hash_size +
            struct.calcsize(self.FORMAT_STRING_PART_2) + self.metadata_size)

  def encode(self):
    """Serializes the TrillianLogDescriptor.

    Returns:
      The packed bytes of the descriptor.

    Raises:
      AftlError: If invalid entry structure.
    """
    if not self.is_valid():
      raise AftlError('Invalid structure for TrillianLogRootDescriptor.')

    # The byte order marker may only appear once at the start of a format
    # string, hence the leading '!' of the second part is dropped.
    expected_format_string = '{}{}s{}{}s'.format(
        self.FORMAT_STRING_PART_1,
        self.root_hash_size,
        self.FORMAT_STRING_PART_2[1:],
        self.metadata_size)

    # bytes() instead of str(): on Python 2 both name the same type; on
    # Python 3 only bytes() turns a bytearray into the byte string that
    # struct.pack() requires.
    return struct.pack(expected_format_string,
                       self.version, self.tree_size, self.root_hash_size,
                       bytes(self.root_hash), self.timestamp, self.revision,
                       self.metadata_size, bytes(self.metadata))

  def is_valid(self):
    """Ensures that values in the descriptor are sane.

    The first problem found is reported on stderr.

    Returns:
      True if the values are sane; otherwise False.
    """
    cls = self.__class__.__name__
    if self.version != 1:
      sys.stderr.write('{}: Bad version value {}.\n'.format(cls, self.version))
      return False
    if self.tree_size < 0:
      sys.stderr.write('{}: Bad tree_size value {}.\n'.format(cls,
                                                              self.tree_size))
      return False
    if self.root_hash_size < 0 or self.root_hash_size > 128:
      sys.stderr.write('{}: Bad root_hash_size value {}.\n'.format(
          cls, self.root_hash_size))
      return False
    if len(self.root_hash) != self.root_hash_size:
      sys.stderr.write('{}: root_hash_size {} does not match with length of '
                       'root_hash {}.\n'.format(cls, self.root_hash_size,
                                                len(self.root_hash)))
      return False
    if self.timestamp < 0:
      sys.stderr.write('{}: Bad timestamp value {}.\n'.format(cls,
                                                              self.timestamp))
      return False
    if self.revision < 0:
      sys.stderr.write('{}: Bad revision value {}.\n'.format(cls,
                                                             self.revision))
      return False
    if self.metadata_size < 0 or self.metadata_size > 65535:
      sys.stderr.write('{}: Bad metadatasize value {}.\n'.format(
          cls, self.metadata_size))
      return False
    if len(self.metadata) != self.metadata_size:
      sys.stderr.write('{}: metadata_size {} does not match with length of '
                       'metadata {}.\n'.format(cls, self.metadata_size,
                                               len(self.metadata)))
      return False
    return True

  def print_desc(self, o):
    """Print the descriptor.

    Arguments:
      o: The object to write the output to.
    """
    i = ' ' * 8
    o.write('{}Version: {}\n'.format(i, self.version))
    o.write('{}Tree size: {}\n'.format(i, self.tree_size))
    o.write('{}Root hash size: {}\n'.format(i, self.root_hash_size))
    # Root hash and metadata are only printed when present.
    if self.root_hash_size > 0:
      o.write('{}Root hash: {}\n'.format(
          i, binascii.hexlify(self.root_hash)))
    o.write('{}Timestamp (ns): {}\n'.format(i, self.timestamp))
    o.write('{}Revision: {}\n'.format(i, self.revision))
    o.write('{}Metadata size: {}\n'.format(i, self.metadata_size))
    if self.metadata_size > 0:
      o.write('{}Metadata: {}\n'.format(i, self.metadata))
| |
| |
class AftlDescriptor(object):
  """Container for an AFTL inclusion proof section.

  Bundles the section header with all inclusion proof entries, i.e. the
  information needed to validate the inclusion proofs.

  Attributes:
    icp_header: A header for the section.
    icp_entries: A list of AftlIcpEntry objects representing the inclusion
      proofs.
  """

  def __init__(self, data=None):
    """Creates a descriptor, either parsed from data or empty.

    Arguments:
      data: If not None, must be a bytearray representing an AftlDescriptor.

    Raises:
      AftlError: If the data does not represent a well-formed AftlDescriptor.
    """
    if data:
      self.icp_header = AftlIcpHeader(data[0:AftlIcpHeader.SIZE])
      if not self.icp_header.is_valid():
        raise AftlError('Invalid ICP header.')

      # add_icp_entry() bumps the header's entry count for each entry, so
      # remember the parsed count and restart the header at zero.
      announced_entries = self.icp_header.icp_count
      self.icp_header.icp_count = 0
      self.icp_entries = []

      # The entries are stored back to back right behind the header.
      offset = AftlIcpHeader.SIZE
      for entry_number in range(announced_entries):
        entry = AftlIcpEntry(data[offset:])
        entry_size = entry.get_expected_size()
        if not entry.is_valid():
          raise AftlError(
              'Validation of ICP entry {} failed.'.format(entry_number))
        self.add_icp_entry(entry)
        offset += entry_size
    else:
      self.icp_header = AftlIcpHeader()
      self.icp_entries = []

    if not self.is_valid():
      raise AftlError('Malformed AFTLDescriptor')

  def add_icp_entry(self, avb_icp_entry):
    """Appends an entry and increments the entry count in the header.

    Arguments:
      avb_icp_entry: An AftlIcpEntry structure.
    """
    self.icp_entries.append(avb_icp_entry)
    self.icp_header.icp_count += 1

  def save(self, output):
    """Writes the encoded descriptor to output.

    Arguments:
      output: The object to write the descriptor to.

    Raises:
      AftlError: If invalid descriptor structure.
    """
    encoded_descriptor = self.encode()
    output.write(encoded_descriptor)

  def encode(self):
    """Serializes the header followed by all entries.

    Returns:
      A bytearray() with the encoded header and entries.

    Raises:
      AftlError: If invalid descriptor structure.
    """
    # Header and entries validate themselves when they are encoded; this
    # checks the structure as a whole.
    if not self.is_valid():
      raise AftlError('Invalid AftlDescriptor structure.')

    encoded = bytearray()
    encoded.extend(self.icp_header.encode())
    for entry in self.icp_entries:
      encoded.extend(entry.encode())
    return encoded

  def is_valid(self):
    """Checks the header, the entry count and every entry.

    Returns:
      True if the values in the AftlDescriptor are sane, False otherwise.
    """
    if not self.icp_header.is_valid():
      return False
    # The header has to announce exactly the entries that are present.
    if self.icp_header.icp_count != len(self.icp_entries):
      return False
    return all(entry.is_valid() for entry in self.icp_entries)
| |
| |
class AftlCommunication(object):
  """Base class for the communication layer with the transparency log.

  Subclasses provide the actual transport by overriding AddFirmwareInfo().
  """

  def __init__(self, transparency_log):
    """Stores the address of the transparency log.

    Arguments:
      transparency_log: String containing the URL of a transparency log server.
    """
    self.transparency_log = transparency_log

  def AddFirmwareInfo(self, request):
    """Calls the AddFirmwareInfo RPC on the AFTL server.

    This base implementation has no transport and always raises.

    Arguments:
      request: A AddFirmwareInfoRequest message.

    Returns:
      An AddFirmwareInfoResponse message (in subclasses).

    Raises:
      NotImplementedError: Always, as subclasses have to override this method.
    """
    message = 'AddFirmwareInfo() needs to be implemented by subclass.'
    raise NotImplementedError(message)
| |
| |
class AftlGrpcCommunication(AftlCommunication):
  """Talks to the AFTL server through gRPC."""

  def AddFirmwareInfo(self, request):
    """Sends the AddFirmwareInfo RPC to the AFTL server over gRPC.

    Arguments:
      request: A AddFirmwareInfoRequest message.

    Returns:
      An AddFirmwareInfoResponse message.

    Raises:
      AftlError: If grpc or the proto modules cannot be loaded, if there is an
        error communicating with the log.
    """
    # grpc is imported here rather than at module level to avoid a global
    # dependency, which would otherwise break running the unit tests with
    # atest.
    try:
      import grpc
      import proto.api_pb2_grpc
    except ImportError as import_error:
      install_hint = 'grpc can be installed with python pip install grpcio.\n'
      raise AftlError('Failed to import module: ({}).\n{}'.format(
          import_error, install_hint))

    log_address = self.transparency_log

    # Set up the gRPC channel with the transparency log.
    sys.stdout.write('Preparing to request inclusion proof from {}. This could '
                     'take ~30 seconds for the process to complete.\n'.format(
                         log_address))
    log_stub = proto.api_pb2_grpc.AFTLogStub(
        grpc.insecure_channel(log_address))

    # Attempt to transmit to the transparency log.
    sys.stdout.write('ICP is about to be requested from transparency log '
                     'with domain {}.\n'.format(log_address))
    try:
      # TODO(danielaustin): Set a reasonable timeout deadline here.
      return log_stub.AddFirmwareInfo(request)
    except grpc.RpcError as rpc_error:
      raise AftlError('Error: grpc failure ({})'.format(rpc_error))
| |
| |
class Aftl(avbtool.Avb):
  """Business logic for aftltool command-line tool."""

  def info_image_icp(self, image_filename, output):
    """Implements the 'info_image_icp' command.

    Reads the AftlDescriptor that is stored directly behind the vbmeta
    structure of the image and writes a human-readable description of its
    header and entries to output. If no valid AftlIcpHeader can be parsed at
    that position, a message is written to stderr and nothing is written to
    output.

    Arguments:
      image_filename: Image file to get information from.
      output: Output file to write human-readable information to (file object).
    """
    image = avbtool.ImageHandler(image_filename)
    (footer, header, _, _) = self._parse_image(image)

    # The AftlDescriptor follows the vbmeta header, authentication block and
    # auxiliary block. Images with a footer keep vbmeta at a non-zero offset.
    vbmeta_offset = footer.vbmeta_offset if footer else 0
    descriptor_offset = (vbmeta_offset +
                         header.SIZE +
                         header.authentication_data_block_size +
                         header.auxiliary_data_block_size)

    # Parse the header out to get the AftlDescriptor size.
    image.seek(descriptor_offset)
    icp_header_bytes = image.read(AftlIcpHeader.SIZE)
    try:
      icp_header = AftlIcpHeader(icp_header_bytes)
    except AftlError:
      sys.stderr.write('This image does not contain a valid AftlDescriptor.\n')
      return

    # Reset to the beginning of the AftlDescriptor and read all of it.
    image.seek(descriptor_offset)
    icp_bytes = image.read(icp_header.aftl_descriptor_size)
    icp_descriptor = AftlDescriptor(icp_bytes)

    output.write('Android Firmware Transparency Descriptor:\n')
    output.write(' Header:\n')
    icp_descriptor.icp_header.print_desc(output)
    for i, icp_entry in enumerate(icp_descriptor.icp_entries):
      output.write(' Entry #{}:\n'.format(i + 1))
      icp_entry.print_desc(output)
      output.write(' Log Root Descriptor:\n')
      icp_entry.log_root_descriptor.print_desc(output)

  def request_inclusion_proof(self, transparency_log, vbmeta_descriptor,
                              version_inc, manufacturer_key_path,
                              signing_helper, signing_helper_with_files,
                              aftl_comms=None):
    """Packages and sends a request to the specified transparency log.

    A FirmwareInfo message is built from the SHA256 of the vbmeta descriptor,
    version_inc and the SHA256 of the manufacturer key data. It is signed
    with the manufacturer key using SHA256_RSA4096 and submitted to the log
    together with the vbmeta descriptor.

    Arguments:
      transparency_log: String containing the URL of a transparency log server.
      vbmeta_descriptor: A bytearray with the vbmeta descriptor.
      version_inc: Subcomponent of the build fingerprint.
      manufacturer_key_path: Path to key used to sign messages sent to the
        transparency log servers.
      signing_helper: Program which signs a hash and returns a signature.
      signing_helper_with_files: Same as signing_helper but uses files instead.
      aftl_comms: A subclass of the AftlCommunication class. The default is
        to use AftlGrpcCommunication.

    Returns:
      An AftlIcpEntry with the inclusion proof for the log entry.

    Raises:
      AftlError: If grpc or the proto modules cannot be loaded, if there is an
         error communicating with the log, if the manufacturer_key_path
         cannot be decoded, or if the log submission cannot be signed.
    """
    # Hash the vbmeta image and the key data extracted from the PEM file.
    vbmeta_hash = hashlib.sha256(vbmeta_descriptor).digest()
    manufacturer_key_data = rsa_key_read_pem_bytes(manufacturer_key_path)
    m_key_hash = hashlib.sha256(manufacturer_key_data).digest()

    # Create the FirmwareInfo protobuf that is going to be signed and sent to
    # the transparency log.
    fw_info = proto.aftl_pb2.FirmwareInfo(vbmeta_hash=vbmeta_hash,
                                          version_incremental=version_inc,
                                          manufacturer_key_hash=m_key_hash)

    # AFTL supports SHA256_RSA4096 for now, more will be available.
    algorithm_name = 'SHA256_RSA4096'
    try:
      alg = avbtool.ALGORITHMS[algorithm_name]
    except KeyError:
      raise AftlError('Unknown algorithm with name {}'.format(algorithm_name))

    # The data to sign is the algorithm's padding followed by the hash of the
    # serialized FirmwareInfo.
    fw_info_hash = hashlib.sha256(fw_info.SerializeToString()).digest()
    padding_and_hash = str(bytearray(alg.padding)) + fw_info_hash
    try:
      signed_fw_info = avbtool.raw_sign(signing_helper,
                                        signing_helper_with_files,
                                        algorithm_name,
                                        alg.signature_num_bytes,
                                        manufacturer_key_path,
                                        padding_and_hash)
    except avbtool.AvbError as e:
      raise AftlError('Failed to sign FirmwareInfo with '
                      '--manufacturer_key: {}'.format(e))
    fw_info_sig = proto.crypto.sigpb.sigpb_pb2.DigitallySigned(
        hash_algorithm='SHA256',
        signature_algorithm='RSA',
        signature=str(signed_fw_info))

    sfw_info = proto.aftl_pb2.SignedFirmwareInfo(info=fw_info,
                                                 info_signature=fw_info_sig)
    request = proto.api_pb2.AddFirmwareInfoRequest(vbmeta=bytes(
        str(vbmeta_descriptor)), fw_info=sfw_info)

    # Submit signed FirmwareInfo to the server.
    if not aftl_comms:
      aftl_comms = AftlGrpcCommunication(transparency_log)
    response = aftl_comms.AddFirmwareInfo(request)

    # Return an AftlIcpEntry representing this response.
    icp_entry = AftlIcpEntry()
    icp_entry.fw_info_leaf = fw_info
    icp_entry.translate_response(transparency_log, response)
    return icp_entry

  def make_icp_from_vbmeta(self, vbmeta_image_path, output,
                           signing_helper, signing_helper_with_files,
                           version_incremental, transparency_log_servers,
                           transparency_log_pub_keys, manufacturer_key,
                           padding_size):
    """Generates a vbmeta image with inclusion proof given a vbmeta image.

    The descriptor (struct AftlDescriptor) contains the information required to
    validate an inclusion proof for a specific vbmeta image. It consists
    of a header (struct AftlIcpHeader) and zero or more entry structures
    (struct AftlIcpEntry) that contain the vbmeta leaf hash, tree size,
    root hash, inclusion proof hashes, and the signature for the root hash.

    The vbmeta image, its hash, the version_incremental part of the build
    fingerprint, and the hash of the manufacturer key are sent to the
    transparency log, with the message signed by the manufacturer key.
    An inclusion proof is calculated and returned. This inclusion proof is
    then packaged in a AftlDescriptor structure. The existing vbmeta data is
    copied to a new file, appended with the AftlDescriptor data, and written to
    output. Validation of the inclusion proof does not require
    communication with the transparency log.

    Arguments:
      vbmeta_image_path: Path to a vbmeta image file.
      output: File to write the results to.
      signing_helper: Program which signs a hash and returns a signature.
      signing_helper_with_files: Same as signing_helper but uses files instead.
      version_incremental: A string representing the subcomponent of the
        build fingerprint used to identify the vbmeta in the transparency log.
      transparency_log_servers: List of strings containing URLs of transparency
        log servers where inclusion proofs are requested from.
      transparency_log_pub_keys: List of paths to PEM files containing trusted
        public keys that correspond with the transparency_logs. There must be
        the same number of keys as log servers and they must be in the same
        order, that is, transparency_log_pub_keys[n] corresponds to
        transparency_log_servers[n].
      manufacturer_key: Path to PEM file containing the key file used to sign
        messages sent to the transparency log servers.
      padding_size: If not 0, pads output so size is a multiple of the number.

    Returns:
      True if the inclusion proofs could be fetched from the transparency log
      servers and could be successfully validated, False otherwise.

    Raises:
      AftlError: If any parameters are invalid, communication with the log
        fails or the structures are malformed.
    """
    # TODO(danielaustin): Determine the best way to handle chained vbmeta
    # structures. Currently, we only put the main one in the transparency
    # log.

    # Validates command line parameters.
    if len(transparency_log_servers) != len(transparency_log_pub_keys):
      raise AftlError('Transparency log count and public key count mismatch: '
                      '{} servers and {} public keys'.format(
                          len(transparency_log_servers),
                          len(transparency_log_pub_keys)))

    # Retrieves vbmeta structure from given partition image.
    image = avbtool.ImageHandler(vbmeta_image_path)
    (footer, header, _, _) = self._parse_image(image)
    offset = footer.vbmeta_offset if footer else 0
    image.seek(offset)
    vbmeta_image = image.read(header.SIZE +
                              header.authentication_data_block_size +
                              header.auxiliary_data_block_size)

    # Fetches inclusion proofs for vbmeta structure from all transparency logs.
    # The two lists were checked to have the same length above, so zip() pairs
    # every log server with its public key.
    icp_entries = []
    for transparency_log, pub_key in zip(transparency_log_servers,
                                         transparency_log_pub_keys):
      try:
        icp_entry = self.request_inclusion_proof(transparency_log, vbmeta_image,
                                                 version_incremental,
                                                 manufacturer_key,
                                                 signing_helper,
                                                 signing_helper_with_files)
        # TODO(danielaustin): Update icp_entry to validate if the vbmeta image
        # matches with the ICP stored data, and store the correct ICP
        # in the icp_entry.
        if not icp_entry.verify_icp(pub_key):
          sys.stderr.write('The ICP from {} could not be verified\n'.format(
              transparency_log))
        # NOTE(review): the entry is kept even if verification failed above;
        # confirm whether unverified entries should be dropped instead.
        icp_entries.append(icp_entry)
      except AftlError as e:
        # The inclusion proof request failed.
        # Continue and see if another will succeed.
        sys.stderr.write('AftlError: {}\n'.format(e))
        continue
    if not icp_entries:
      sys.stderr.write('No inclusion proofs could be validated from any log.\n')
      return False

    # Prepares the AFTL descriptor to be appended to the vbmeta image.
    aftl_descriptor = AftlDescriptor()
    for icp_entry in icp_entries:
      aftl_descriptor.add_icp_entry(icp_entry)
    if not aftl_descriptor.is_valid():
      sys.stderr.write('Resulting AftlDescriptor structure is malformed.\n')
      return False

    # Write the original vbmeta descriptor, followed by the AftlDescriptor.
    if footer:  # Checks if it is a chained partition.
      # TODO(danielaustin): Add support for chained partitions like system.img
      # using similar functionality as implemented in append_vbmeta_image().
      sys.stderr.write('Image has a footer and ICP for this format is not '
                       'implemented.\n')
      return False

    # Writes vbmeta image with inclusion proof into a new vbmeta image.
    output.seek(0)
    output.write(vbmeta_image)
    encoded_aftl_descriptor = aftl_descriptor.encode()
    output.write(encoded_aftl_descriptor)

    if padding_size > 0:
      descriptor_size = len(vbmeta_image) + len(encoded_aftl_descriptor)
      padded_size = avbtool.round_to_multiple(descriptor_size, padding_size)
      padding_needed = padded_size - descriptor_size
      output.write('\0' * padding_needed)

    return True

  def _load_test_process_function(self, vbmeta_image_path,
                                  transparency_log_server,
                                  transparency_log_pub_key, manufacturer_key,
                                  process_number, submission_count,
                                  preserve_icp_images, result_queue):
    """Function to be used by multiprocessing.Process.

    Performs submission_count submissions one after the other. Each one
    writes its image to '<version_incremental>_icp.img' in the current
    working directory and posts a tuple (start_time, end_time, execution_time,
    version_incremental, result) to result_queue.

    Arguments:
      vbmeta_image_path: Path to a vbmeta image file.
      transparency_log_server: A string in host:port format of transparency log
        servers where a inclusion proof is requested from.
      transparency_log_pub_key: Path to PEM file containing trusted
        public keys that corresponds with the transparency_log_server.
      manufacturer_key: Path to PEM file containing the key file used to sign
        messages sent to the transparency log servers.
      process_number: The number of the processes executing the function.
      submission_count: Number of total submissions to perform per
        process_count.
      preserve_icp_images: Boolean to indicate if the generated vbmeta
        image files with inclusion proofs should preserved.
      result_queue: Multiprocessing.Queue object for posting execution results.
    """
    for count in range(submission_count):
      version_incremental = 'aftl_load_testing_{}_{}'.format(process_number,
                                                             count)
      output_file = '{}_icp.img'.format(version_incremental)
      output = open(output_file, 'wb')
      try:
        # Instrumented section.
        start_time = time.time()
        result = self.make_icp_from_vbmeta(
            vbmeta_image_path=vbmeta_image_path,
            output=output,
            signing_helper=None,
            signing_helper_with_files=None,
            version_incremental=version_incremental,
            transparency_log_servers=[transparency_log_server],
            transparency_log_pub_keys=[transparency_log_pub_key],
            manufacturer_key=manufacturer_key,
            padding_size=0)
        end_time = time.time()
      finally:
        # Clean up even if the submission raised, so that no open file handle
        # or stray image file is left behind.
        output.close()
        if not preserve_icp_images:
          os.unlink(output_file)

      # Puts the result onto the result queue.
      execution_time = end_time - start_time
      result_queue.put((start_time, end_time, execution_time,
                        version_incremental, result))

  def _write_load_test_report(self, output, process_count, submission_count,
                              execution_times, results):
    """Writes the human-readable load test report.

    Arguments:
      output: File object to write the report to.
      process_count: Number of processes that were used for the load test.
      submission_count: Number of submissions performed per process.
      execution_times: Non-empty list with the duration of every submission
        in seconds.
      results: List with the boolean result of every submission.
    """
    executions = sorted(execution_times)
    execution_count = len(executions)
    middle = execution_count // 2
    if execution_count % 2 == 0:
      median = (executions[middle - 1] + executions[middle]) / 2
    else:
      median = executions[middle]

    o = output
    o.write('Load testing results:\n')
    o.write(' Processes: {}\n'.format(process_count))
    o.write(' Submissions per process: {}\n'.format(submission_count))
    o.write('\n')
    o.write(' Submissions:\n')
    o.write(' Total: {}\n'.format(execution_count))
    o.write(' Succeeded: {}\n'.format(results.count(True)))
    o.write(' Failed: {}\n'.format(results.count(False)))
    o.write('\n')
    o.write(' Submission execution durations:\n')
    o.write(' Average: {:.2f} sec\n'.format(
        sum(executions) / execution_count))
    o.write(' Median: {:.2f} sec\n'.format(median))
    o.write(' Min: {:.2f} sec\n'.format(min(executions)))
    o.write(' Max: {:.2f} sec\n'.format(max(executions)))

  def load_test_aftl(self, vbmeta_image_path, output, transparency_log_server,
                     transparency_log_pub_key, manufacturer_key,
                     process_count, submission_count, stats_filename,
                     preserve_icp_images):
    """Performs multi-process load test on a given AFTL and prints stats.

    Starts process_count worker processes which each perform submission_count
    submissions. The raw timing data of every submission is streamed to the
    stats file as CSV while the workers are running and a summary report is
    written to output once all of them have finished.

    Arguments:
      vbmeta_image_path: Path to a vbmeta image file.
      output: File to write the report to.
      transparency_log_server: A string in host:port format of transparency log
        servers where a inclusion proof is requested from.
      transparency_log_pub_key: Path to PEM file containing trusted
        public keys that corresponds with the transparency_log_server.
      manufacturer_key: Path to PEM file containing the key file used to sign
        messages sent to the transparency log servers.
      process_count: Number of processes used for parallel testing.
      submission_count: Number of total submissions to perform per
        process_count.
      stats_filename: Path to the stats file to write the raw execution data to.
        If empty, 'load_test_p<process_count>_s<submission_count>.csv' is used.
      preserve_icp_images: Boolean to indicate if the generated vbmeta
        image files with inclusion proofs should preserved.

    Returns:
      True if the load test could be executed and the report was written;
      False if the parameters are invalid, the stats file cannot be written
      or no submission result was collected.
    """
    if process_count < 1 or submission_count < 1:
      sys.stderr.write('Values for --processes/--submissions '
                       'must be at least 1.\n')
      return False

    if not stats_filename:
      stats_filename = 'load_test_p{}_s{}.csv'.format(process_count,
                                                      submission_count)
    stats_file = None
    try:
      stats_file = open(stats_filename, 'w')
      stats_file.write('start_time,end_time,execution_time,version_incremental,'
                       'result\n')
    except IOError as e:
      if stats_file:
        stats_file.close()
      # Report the file name here; the file object is unbound if open() failed.
      sys.stderr.write('Could not open stats file {}: {}\n'.format(
          stats_filename, e))
      return False

    result_queue = multiprocessing.Queue()
    execution_times = []
    results = []

    def consume_result(timeout):
      """Moves one queued result into the stats; False if none arrived."""
      try:
        (start_time, end_time, execution_time, version_incremental,
         result) = result_queue.get(timeout=timeout)
      except Queue.Empty:
        return False
      stats_file.write('{},{},{},{},{}\n'.format(start_time, end_time,
                                                 execution_time,
                                                 version_incremental, result))
      execution_times.append(execution_time)
      results.append(result)
      return True

    try:
      # Launch all the processes with their workloads.
      processes = set()
      for i in range(process_count):
        p = multiprocessing.Process(
            target=self._load_test_process_function,
            args=(vbmeta_image_path, transparency_log_server,
                  transparency_log_pub_key, manufacturer_key, i,
                  submission_count, preserve_icp_images, result_queue))
        p.start()
        processes.add(p)

      while processes:
        # Processes the results queue and writes these to a stats file.
        consume_result(1)

        # Checks if processes are still alive; if not clean them up. join()
        # would have been nicer but we want to continuously stream out the
        # stats to file.
        for p in processes.copy():
          if not p.is_alive():
            processes.remove(p)

      # Results posted right before the last processes exited may still be
      # queued; collect them so that no submission is lost from the stats.
      while consume_result(0.1):
        pass
    finally:
      stats_file.close()

    if not execution_times:
      # Happens if every worker died before posting a result. There is
      # nothing to compute statistics from.
      sys.stderr.write('No load test results were collected.\n')
      return False

    # Outputs the stats report.
    self._write_load_test_report(output, process_count, submission_count,
                                 execution_times, results)
    return True
| |
| |
class AftlTool(avbtool.AvbTool):
  """Object for aftltool command-line tool."""

  def __init__(self):
    """Initializer method."""
    self.aftl = Aftl()
    super(AftlTool, self).__init__()

  def make_icp_from_vbmeta(self, args):
    """Implements the 'make_icp_from_vbmeta' sub-command.

    Arguments:
      args: The parsed command-line arguments.

    Returns:
      The result of Aftl.make_icp_from_vbmeta().
    """
    args = self._fixup_common_args(args)
    return self.aftl.make_icp_from_vbmeta(args.vbmeta_image_path,
                                          args.output,
                                          args.signing_helper,
                                          args.signing_helper_with_files,
                                          args.version_incremental,
                                          args.transparency_log_servers,
                                          args.transparency_log_pub_keys,
                                          args.manufacturer_key,
                                          args.padding_size)

  def info_image_icp(self, args):
    """Implements the 'info_image_icp' sub-command.

    Arguments:
      args: The parsed command-line arguments.

    Returns:
      The result of Aftl.info_image_icp().
    """
    # argparse already opened the image; only its file name is passed on.
    return self.aftl.info_image_icp(args.vbmeta_image_path.name, args.output)

  def load_test_aftl(self, args):
    """Implements the 'load_test_aftl' sub-command.

    Arguments:
      args: The parsed command-line arguments.

    Returns:
      The result of Aftl.load_test_aftl().
    """
    return self.aftl.load_test_aftl(args.vbmeta_image_path,
                                    args.output,
                                    args.transparency_log_server,
                                    args.transparency_log_pub_key,
                                    args.manufacturer_key,
                                    args.processes,
                                    args.submissions,
                                    args.stats_file,
                                    args.preserve_icp_images)

  def run(self, argv):
    """Command-line processor.

    Parses argv and dispatches to the selected sub-command. The process exits
    with status 1 if the sub-command raises an AftlError or reports failure
    by returning False.

    Arguments:
      argv: Pass sys.argv from main.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title='subcommands')

    # Command: make_icp_from_vbmeta
    sub_parser = subparsers.add_parser('make_icp_from_vbmeta',
                                       help='Makes an ICP enhanced vbmeta image'
                                       ' from an existing vbmeta image.')
    sub_parser.add_argument('--output',
                            help='Output file name.',
                            type=argparse.FileType('wb'),
                            default=sys.stdout)
    sub_parser.add_argument('--vbmeta_image_path',
                            help='Path to a generate vbmeta image file.',
                            required=True)
    sub_parser.add_argument('--version_incremental',
                            help='Current build ID.',
                            required=True)
    sub_parser.add_argument('--manufacturer_key',
                            help='Path to the PEM file containing the '
                            'manufacturer key for use with the log.',
                            required=True)
    sub_parser.add_argument('--transparency_log_servers',
                            help='List of transparency log servers in '
                            'host:port format. This must not be None and must '
                            'be the same size as transparency_log_pub_keys. '
                            'Also, transparency_log_servers[n] must correspond '
                            'to transparency_log_pub_keys[n] for all values n.',
                            nargs='*',
                            required=True)
    sub_parser.add_argument('--transparency_log_pub_keys',
                            help='Paths to PEM files containing transparency '
                            'log server key(s). This must not be None and must '
                            'be the same size as transparency_log_servers. '
                            'Also, transparency_log_pub_keys[n] must '
                            'correspond to transparency_log_servers[n] for all '
                            'values n.',
                            nargs='*',
                            required=True)
    sub_parser.add_argument('--padding_size',
                            metavar='NUMBER',
                            help='If non-zero, pads output with NUL bytes so '
                            'its size is a multiple of NUMBER '
                            '(default: 0)',
                            type=avbtool.parse_number,
                            default=0)
    self._add_common_args(sub_parser)
    sub_parser.set_defaults(func=self.make_icp_from_vbmeta)

    # Command: info_image_icp
    sub_parser = subparsers.add_parser(
        'info_image_icp',
        help='Show information about AFTL ICPs in vbmeta or footer.')
    sub_parser.add_argument('--vbmeta_image_path',
                            help='Path to vbmeta image for AFTL information.',
                            type=argparse.FileType('rb'),
                            required=True)
    sub_parser.add_argument('--output',
                            help='Write info to file',
                            type=argparse.FileType('wt'),
                            default=sys.stdout)
    sub_parser.set_defaults(func=self.info_image_icp)

    # Command: load_test_aftl
    sub_parser = subparsers.add_parser(
        'load_test_aftl',
        help='Perform load testing against one AFTL log server. Note: This MUST'
        ' not be performed against a production system.')
    sub_parser.add_argument('--vbmeta_image_path',
                            help='Path to a generate vbmeta image file.',
                            required=True)
    sub_parser.add_argument('--output',
                            help='Write report to file.',
                            type=argparse.FileType('wt'),
                            default=sys.stdout)
    sub_parser.add_argument('--manufacturer_key',
                            help='Path to the PEM file containing the '
                            'manufacturer key for use with the log.',
                            required=True)
    sub_parser.add_argument('--transparency_log_server',
                            help='Transparency log servers to test against in '
                            'host:port format.',
                            required=True)
    sub_parser.add_argument('--transparency_log_pub_key',
                            help='Paths to PEM file containing transparency '
                            'log server key.',
                            required=True)
    sub_parser.add_argument('--processes',
                            help='Number of parallel processes to use for '
                            'testing (default: 1).',
                            type=avbtool.parse_number,
                            default=1)
    sub_parser.add_argument('--submissions',
                            help='Number of submissions to perform against the '
                            'log per process (default: 1).',
                            type=avbtool.parse_number,
                            default=1)
    sub_parser.add_argument('--stats_file',
                            help='Path to the stats file to write the raw '
                            'execution data to (Default: '
                            'load_test_p[processes]_s[submissions].csv.')
    sub_parser.add_argument('--preserve_icp_images',
                            help='Boolean flag to indicate if the generated '
                            'vbmeta image files with inclusion proofs should '
                            'preserved.',
                            action='store_true')
    sub_parser.set_defaults(func=self.load_test_aftl)

    args = parser.parse_args(argv[1:])
    try:
      result = args.func(args)
    except AftlError as e:
      # Indicate failure to signal to calling tools, and tell the user why.
      sys.stderr.write('{}: {}\n'.format(argv[0], e))
      sys.exit(1)

    # Sub-commands report failure by returning False. Commands without a
    # result return None, which must not be treated as a failure.
    if result is False:
      sys.exit(1)
| |
if __name__ == '__main__':
  # Script entry point: hand the raw command line to the tool.
  AftlTool().run(sys.argv)