Merge "Refactored to use argsparse to enforce command-lined option requirements."
diff --git a/Android.bp b/Android.bp
index 5226edd..6bc28d1 100644
--- a/Android.bp
+++ b/Android.bp
@@ -97,12 +97,32 @@
cmd: "cp $(in) $(out)",
}
+python_library_host {
+ name: "aftl_proto",
+ srcs: [
+ "proto/**/*.py",
+ ],
+ version: {
+ py2: {
+ enabled: true,
+ // This is needs to be false due to b/146057182#comment5.
+ embedded_launcher: false,
+ },
+ py3: {
+ enabled: false,
+ },
+ },
+}
+
python_binary_host {
name: "aftltool",
srcs: [
":aftltool_py",
":avbtool_py",
],
+ libs: [
+ "aftl_proto",
+ ],
main: "aftltool.py",
required: ["fec"],
version: {
@@ -124,11 +144,17 @@
":avbtool_py",
"aftltool_test.py",
],
+ libs: [
+ "aftl_proto",
+ ],
+ data: [
+ "test/data/testkey_rsa4096.pem",
+ ],
test_suites: ["general-tests"],
version: {
py2: {
enabled: true,
- //TODO: This is needed due to b/146057182#comment5.
+ // This is needs to be false due to b/146057182#comment5.
embedded_launcher: false,
},
py3: {
diff --git a/aftltool b/aftltool
index c60d8f4..b63217e 100755
--- a/aftltool
+++ b/aftltool
@@ -40,6 +40,9 @@
import avbtool
+import proto.aftl_pb2
+import proto.api_pb2
+import proto.crypto.sigpb
# Android Firmware Transparency Log Data Structures
@@ -773,10 +776,11 @@
if self.metadata_size < 0 or self.metadata_size > 65535:
sys.stderr.write('{}: Bad metadatasize value {}.'.format(
cls, self.metadata_size))
+ return False
if len(self.metadata) != self.metadata_size:
sys.stderr.write('{}: metadata_size {} does not match with length of'
'metadata {}'.format(cls, self.metadata_size,
- len(self.metadata_size)))
+ len(self.metadata)))
return False
return True
@@ -910,6 +914,78 @@
return True
+class AftlCommunication(object):
+ """Class to abstract the communication layer with the transparency log."""
+
+ def __init__(self, transparency_log):
+ """Initializes the object.
+
+ Arguments:
+ transparency_log: String containing the URL of a transparency log server.
+
+ """
+ self.transparency_log = transparency_log
+
+ def AddFirmwareInfo(self, request):
+ """Calls the AddFirmwareInfo RPC on the AFTL server.
+
+ Arguments:
+ request: A AddFirmwareInfoRequest message.
+
+ Returns:
+ An AddFirmwareInfoReponse message.
+
+ Raises:
+ AftlError: If grpc or the proto modules cannot be loaded, if there is an
+ error communicating with the log.
+ """
+ raise NotImplementedError(
+ 'AddFirmwareInfo() needs to be implemented by subclass.')
+
+
+class AftlGrpcCommunication(AftlCommunication):
+ """Class that implements GRPC communication to the AFTL server."""
+
+ def AddFirmwareInfo(self, request):
+ """Calls the AddFirmwareInfo RPC on the AFTL server
+
+ Arguments:
+ request: A AddFirmwareInfoRequest message.
+
+ Returns:
+ An AddFirmwareInfoReponse message.
+
+ Raises:
+ AftlError: If grpc or the proto modules cannot be loaded, if there is an
+ error communicating with the log.
+ """
+ # Import grpc now to avoid global dependencies as it otherwise breakes
+ # running unittest with atest.
+ try:
+ import grpc
+ import proto.api_pb2_grpc
+ except ImportError as e:
+ err_str = 'grpc can be installed with python pip install grpcio.\n'
+ raise AftlError('Failed to import module: ({}).\n{}'.format(e, err_str))
+
+ # Set up the gRPC channel with the transparency log.
+ sys.stdout.write('Preparing to request inclusion proof from {}. This could '
+ 'take ~30 seconds for the process to complete.\n'.format(
+ self.transparency_log))
+ channel = grpc.insecure_channel(self.transparency_log)
+ stub = proto.api_pb2_grpc.AFTLogStub(channel)
+
+ # Attempt to transmit to the transparency log.
+ sys.stdout.write('ICP is about to be requested from transparency log '
+ 'with domain {}.\n'.format(self.transparency_log))
+ try:
+ # TODO(danielaustin): Set a reasonable timeout deadline here.
+ response = stub.AddFirmwareInfo(request)
+ except grpc.RpcError as e:
+ raise AftlError('Error: grpc failure ({})'.format(e))
+ return response
+
+
class Aftl(avbtool.Avb):
"""Business logic for aftltool command-line tool."""
@@ -956,10 +1032,10 @@
o.write(' Log Root Descriptor:\n')
icp_entry.log_root_descriptor.print_desc(o)
- # TODO(danielaustin): Add unit tests.
def request_inclusion_proof(self, transparency_log, vbmeta_descriptor,
version_inc, manufacturer_key_path,
- signing_helper, signing_helper_with_files):
+ signing_helper, signing_helper_with_files,
+ aftl_comms=None):
"""Packages and sends a request to the specified transparency log.
Arguments:
@@ -967,9 +1043,12 @@
vbmeta_descriptor: A bytearray with the vbmeta descriptor.
version_inc: Subcomponent of the build fingerprint.
manufacturer_key_path: Path to key used to sign messages sent to the
- transparency log servers.
+ transparency log servers.
signing_helper: Program which signs a hash and returns a signature.
signing_helper_with_files: Same as signing_helper but uses files instead.
+ aftl_comms: A subclass of the AftlCommunication class. The default is
+ to use AftlGrpcCommunication.
+
Returns:
An AftlIcpEntry with the inclusion proof for the log entry.
@@ -978,22 +1057,6 @@
error communicating with the log, if the manufacturer_key_path
cannot be decoded, or if the log submission cannot be signed.
"""
- # Import grpc and proto.api_pb2_grpc now to avoid global dependencies.
- try:
- import grpc
- import proto.api_pb2_grpc
- import proto.crypto.sigpb
- except ImportError as e:
- err_str = 'grpc can be installed with python pip install grpcio.\n'
- raise AftlError('Failed to import module: ({}).\n{}'.format(e, err_str))
-
- # Set up the gRPC channel with the transparency log.
- sys.stdout.write('Preparing to request inclusion proof from {}. This could '
- 'take ~30 seconds for the process to complete.\n'.format(
- transparency_log))
- channel = grpc.insecure_channel(transparency_log)
- stub = proto.api_pb2_grpc.AFTLogStub(channel)
-
# Calculate the hash of the vbmeta image.
hasher = hashlib.sha256()
hasher.update(vbmeta_descriptor)
@@ -1010,7 +1073,7 @@
version_incremental=version_inc,
manufacturer_key_hash=m_key_hash)
signed_fw_info = bytearray()
- # AFTL supports SHA256_SA4096 for now, more will be available.
+ # AFTL supports SHA256_RSA4096 for now, more will be available.
algorithm_name = 'SHA256_RSA4096'
sig_num_bytes = 0
alg_padding = ''
@@ -1044,14 +1107,12 @@
info_signature=fw_info_sig)
request = proto.api_pb2.AddFirmwareInfoRequest(vbmeta=bytes(
str(vbmeta_descriptor)), fw_info=sfw_info)
- # Attempt to transmit to the transparency log.
- try:
- # TODO(danielaustin): Set a reasonable timeout deadline here.
- sys.stdout.write('ICP is about to be requested from transparency log '
- 'with domain {}.\n'.format(transparency_log))
- response = stub.AddFirmwareInfo(request)
- except grpc.RpcError as e:
- raise AftlError('Error: grpc failure ({})'.format(e))
+
+ # Submit signed FirmwareInfo to the server.
+ if not aftl_comms:
+ aftl_comms = AftlGrpcCommunication(transparency_log)
+ response = aftl_comms.AddFirmwareInfo(request)
+
# Return an AftlIcpEntry representing this response.
icp_entry = AftlIcpEntry()
icp_entry.fw_info_leaf = fw_info
diff --git a/aftltool_test.py b/aftltool_test.py
index 538a12c..24e960f 100755
--- a/aftltool_test.py
+++ b/aftltool_test.py
@@ -28,11 +28,17 @@
from __future__ import print_function
import binascii
+import io
import os
import sys
import unittest
import aftltool
+import avbtool
+import proto.aftl_pb2
+import proto.api_pb2
+import proto.trillian_pb2
+
class AftltoolTestCase(unittest.TestCase):
@@ -47,6 +53,53 @@
self.null = open(os.devnull, 'wb')
sys.stderr = self.null
+ # Sets up test data.
+ # pylint: disable=no-member
+ self.test_afi_resp = proto.api_pb2.AddFirmwareInfoResponse()
+ self.test_afi_resp.fw_info_proof.proof.leaf_index = 6263
+ hashes = [
+ '3ad99869646980c0a51d637a9791f892d12e0bc83f6bac5d305a9e289e7f7e8b',
+ '2e5c664d2aee64f71cb4d292e787d0eae7ca9ed80d1e08abb41d26baca386c05',
+ 'a671dd99f8d97e9155cc2f0a9dc776a112a5ec5b821ec71571bb258ac790717a',
+ '78046b839595e4e49ad4b0c73f92bf4803aacd4a3351181086509d057ef0d7a9',
+ 'c0a7e013f03e7c69e9402070e113dadb345868cf144ccb174fabc384b5605abf',
+ 'dc36e5dbe36abe9f4ad10f14170aa0148b6fe3fcaba9df43deaf4dede01b02e8',
+ 'b063e7fb665370a361718208756c363dc5206e2e9af9b4d847d81289cdae30de',
+ 'a69ea5ba88a221103636d3f4245c800570eb86ad9276121481521f97d0a04a81']
+ for h in hashes:
+ self.test_afi_resp.fw_info_proof.proof.hashes.append(
+ binascii.unhexlify(h))
+ self.test_afi_resp.fw_info_proof.sth.key_hint = binascii.unhexlify(
+ '5af859abce8fe1ea')
+ self.test_afi_resp.fw_info_proof.sth.log_root = binascii.unhexlify(
+ '000100000000000018782053b182b55dc1377197c938637f50093131daea4'
+ 'd0696b1eae5b8a014bfde884a15edb28f1fc7954400000000000013a50000'
+ )
+ self.test_afi_resp.vbmeta_proof.sth.log_root_signature = binascii.unhexlify(
+ 'c264bc7986a1cf56364ca4dd04989f45515cb8764d05b4fb2b880172585ea404'
+ '2105f95a0e0471fb6e0f8c762b14b2e526fb78eaddcc61484917795a12f6ab3b'
+ '557b5571d492d07d7950595f9ad8647a606c7c633f4697c5eb59c272aeca0419'
+ '397c70a3b9b51537537c4ea6b49d356110e70a9286902f814cc6afbeafe612e4'
+ '9e180146140e902bdd9e9dae66b37b4943150a9571949027a648db88a4eea3ad'
+ 'f930b4fa6a183e97b762ab0e55a3a26aa6b0fd44d30531e2541ecb94bf645e62'
+ '59e8e3151e7c3b51a09fe24557ce2fd2c0ecdada7ce99c390d2ef10e5d075801'
+ '7c10d49c55cdee930959cc35f0104e04f296591eeb5defbc9ebb237da7b204ca'
+ 'a4608cb98d6bc3a01f18585a04441caf8ec7a35aa2d35f7483b92b14fd0f4a41'
+ '3a91133545579309adc593222ca5032a103b00d8fcaea911936dbec11349e4dd'
+ '419b091ea7d1130570d70e2589dd9445fd77fd7492507e1c87736847b9741cc6'
+ '236868af42558ff6e833e12010c8ede786e43ada40ff488f5f1870d1619887d7'
+ '66a24ad0a06a47cc14e2f7db07361be191172adf3155f49713807c7c265f5a84'
+ '040fc84246ccf7913e44721f0043cea05ee774e457e13206775eee992620c3f9'
+ 'd2b2584f58aac19e4afe35f0a17df699c45729f94101083f9fc4302659a7e6e0'
+ 'e7eb36f8d1ca0be2c9010160d329bd2d17bb707b010fdd63c30b667a0b886cf9'
+ )
+ self.test_afi_resp.fw_info_leaf = (
+ '{\"timestamp\":{\"seconds\":1580115370,\"nanos\":621454825},\"Va'
+ 'lue\":{\"FwInfo\":{\"info\":{\"info\":{\"vbmeta_hash\":\"ViNzEQS'
+ '/oc/bJ13yl40fk/cvXw90bxHQbzCRxgHDIGc=\",\"version_incremental\":'
+ '\"1\",\"manufacturer_key_hash\":\"yBCrUOdjvaAh4git5EgqWa5neegUao'
+ 'XeLlB67+N8ObY=\"}}}}}')
+
def tearDown(self):
"""Tears down the test bed for the unit tests."""
# Reconnects stderr back to the normal stderr; see setUp() for details.
@@ -70,21 +123,6 @@
self.test_sth.log_root_sig = bytearray('root_sig' * 64)
self.test_proofs = 'proofs'
- def _validate_icp_header(self, aftl_descriptor_size, icp_count):
- """Validate an ICP header structure and attempt to validate it.
-
- Arguments:
- aftl_descriptor_size: Total size of the AftlDescriptor.
- icp_count: Number of ICPs that follow the ICP header.
-
- Returns:
- True if the ICP header validates; otherwise False.
- """
- icp_header = aftltool.AftlIcpHeader()
- icp_header.aftl_descriptor_size = aftl_descriptor_size
- icp_header.icp_count = icp_count
- return icp_header.is_valid()
-
def _validate_icp_entry_with_setters(
self, log_url, leaf_index, log_root_descriptor, proofs):
"""Create an ICP entry structure and attempt to validate it.
@@ -118,23 +156,6 @@
icp_entry.inc_proof_size = inc_proof_size
return icp_entry.is_valid()
- def test_default_icp_header(self):
- """Tests default ICP header structure."""
- icp_header = aftltool.AftlIcpHeader()
- self.assertTrue(icp_header.is_valid())
-
- def test_valid_icp_header(self):
- """Tests valid ICP header structures."""
- self.assertTrue(self._validate_icp_header(icp_count=4,
- aftl_descriptor_size=18))
-
- def test_invalid_icp_header(self):
- """Tests invalid ICP header structures."""
- self.assertFalse(self._validate_icp_header(icp_count=-34,
- aftl_descriptor_size=18))
- self.assertFalse(self._validate_icp_header(icp_count=3,
- aftl_descriptor_size=10))
-
def test_default_icp_entry(self):
"""Tests default ICP entry structure."""
icp_entry = aftltool.AftlIcpEntry()
@@ -256,18 +277,6 @@
def test_generate_icp_images(self):
"""Test cases for full AFTL ICP structure generation."""
- icp_header = aftltool.AftlIcpHeader()
- icp_header.icp_count = 1
-
- # Tests ICP header encoding.
- expected_header_bytes = bytearray(b'\x41\x46\x54\x4c\x00\x00\x00\x01\x00'
- '\x00\x00\x01\x00\x00\x00\x12\x00\x01')
- icp_header_bytes = icp_header.encode()
- self.assertEqual(icp_header_bytes, expected_header_bytes)
-
- # Tests ICP header decoding.
- icp_header = aftltool.AftlIcpHeader(expected_header_bytes)
- self.assertTrue(icp_header.is_valid())
tl_url = 'aftl-test-server.google.com'
sth = aftltool.TrillianLogRootDescriptor()
@@ -494,11 +503,112 @@
self.assertEqual(root_hash, roots[icp[1] -1])
+class AftlIcpHeaderTest(AftltoolTestCase):
+ """Test suite for testing the AftlIcpHeader descriptor."""
+
+ def setUp(self):
+ """Sets up the test bed for the unit tests."""
+ super(AftlIcpHeaderTest, self).setUp()
+
+ self.test_header_valid = aftltool.AftlIcpHeader()
+ self.test_header_valid.icp_count = 1
+
+ self.test_header_invalid = aftltool.AftlIcpHeader()
+ self.test_header_invalid.icp_count = -34
+
+ self.test_header_bytes = bytearray(b'\x41\x46\x54\x4c\x00\x00\x00\x01'
+ '\x00\x00\x00\x01\x00\x00\x00\x12'
+ '\x00\x01')
+
+ def test__init__(self):
+ """Tests default ICP header structure."""
+
+ # Calls constructor without data.
+ header = aftltool.AftlIcpHeader()
+ self.assertEqual(header.magic, 'AFTL')
+ self.assertEqual(header.required_icp_version_major,
+ avbtool.AVB_VERSION_MAJOR)
+ self.assertEqual(header.required_icp_version_minor,
+ avbtool.AVB_VERSION_MINOR)
+ self.assertEqual(header.aftl_descriptor_size, aftltool.AftlIcpHeader.SIZE)
+ self.assertEqual(header.icp_count, 0)
+ self.assertTrue(header.is_valid())
+
+ # Calls constructor with data.
+ header = aftltool.AftlIcpHeader(self.test_header_bytes)
+ self.assertEqual(header.magic, 'AFTL')
+ self.assertEqual(header.required_icp_version_major, 1)
+ self.assertEqual(header.required_icp_version_minor, 1)
+ self.assertEqual(header.aftl_descriptor_size, aftltool.AftlIcpHeader.SIZE)
+ self.assertTrue(header.icp_count, 1)
+ self.assertTrue(header.is_valid())
+
+ def test_save(self):
+ """Tests ICP header save method."""
+ buf = io.BytesIO()
+ self.test_header_valid.save(buf)
+ self.assertEqual(buf.getvalue(), self.test_header_bytes)
+
+ def test_encode(self):
+ """Tests ICP header encoding."""
+ # Valid header.
+ header_bytes = self.test_header_valid.encode()
+ self.assertEqual(header_bytes, self.test_header_bytes)
+
+ # Invalid header
+ with self.assertRaises(aftltool.AftlError):
+ header_bytes = self.test_header_invalid.encode()
+
+ def test_is_valid(self):
+ """Tests valid ICP header structures."""
+ # Invalid magic.
+ header = aftltool.AftlIcpHeader()
+ self.assertTrue(header.is_valid())
+
+ # Invalid magic.
+ header = aftltool.AftlIcpHeader()
+ header.magic = 'YOLO'
+ self.assertFalse(header.is_valid())
+
+ # Valid ICP count.
+ self.assertTrue(self.test_header_valid.is_valid())
+
+ # Invalid ICP count.
+ self.assertFalse(self.test_header_invalid.is_valid())
+
+ header = aftltool.AftlIcpHeader()
+ header.icp_count = 10000000
+ self.assertFalse(header.is_valid())
+
+ # Invalid ICP major version.
+ header = aftltool.AftlIcpHeader()
+ header.required_icp_version_major = avbtool.AVB_VERSION_MAJOR + 1
+ self.assertFalse(header.is_valid())
+
+ # Invalid ICP minor version.
+ header = aftltool.AftlIcpHeader()
+ header.required_icp_version_minor = avbtool.AVB_VERSION_MINOR + 1
+ self.assertFalse(header.is_valid())
+
+ def test_print_desc(self):
+ """Tests print_desc method."""
+ buf = io.BytesIO()
+ self.test_header_valid.print_desc(buf)
+ desc = buf.getvalue()
+
+ # Cursory whether the printed description contains something useful.
+ self.assertGreater(len(desc), 0)
+ self.assertTrue('Major version:' in desc)
+
+
class TrillianLogRootDescriptorTest(AftltoolTestCase):
+ """Test suite for testing the TrillianLogRootDescriptorTest descriptor."""
def setUp(self):
"""Sets up the test bed for the unit tests."""
super(TrillianLogRootDescriptorTest, self).setUp()
+
+ # Creates basic log root without metadata fields.
base_log_root = (
'0001' # version
'00000000000002e5' # tree_size
@@ -508,38 +618,241 @@
'15e1c97e3b4bd239' # timestamp
'00000000000002e4' # revision
)
- self.test_log_root_without_metadata = binascii.unhexlify(
+
+ # Create valid log roots with metadata fields w/ and w/o metadata.
+ self.test_log_root_bytes_wo_metadata = binascii.unhexlify(
base_log_root + '0000')
- self.test_log_root_with_metadata = binascii.unhexlify(
+ self.test_log_root_bytes_with_metadata = binascii.unhexlify(
base_log_root + '00023132')
- def test_valid_empty_descriptor(self):
- """Tests behavior of instance creation without data."""
+ def test__init__(self):
+ """Tests constructor."""
+ # Calls constructor without data.
d = aftltool.TrillianLogRootDescriptor()
self.assertTrue(d.is_valid())
+ self.assertEqual(d.version, 1)
+ self.assertEqual(d.tree_size, 0)
+ self.assertEqual(d.root_hash_size, 0)
+ self.assertEqual(d.root_hash, bytearray())
+ self.assertEqual(d.timestamp, 0)
+ self.assertEqual(d.revision, 0)
+ self.assertEqual(d.metadata_size, 0)
+ self.assertEqual(d.metadata, bytearray())
- def test_valid_parsed_descriptor_without_metadata(self):
- """Tests parsing of a Trillian log_root structure."""
- d = aftltool.TrillianLogRootDescriptor(self.test_log_root_without_metadata)
+ # Calls constructor with log_root w/o metadata
+ d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata)
self.assertTrue(d.is_valid())
self.assertEqual(d.version, 1)
self.assertEqual(d.tree_size, 741)
self.assertEqual(d.root_hash_size, 32)
- self.assertEqual(binascii.hexlify(d.root_hash),
- '2d614759ad408a111a3351c0cb33c099'
- '422c30a5c5104788a343332bde2b387b')
+ self.assertEqual(d.root_hash,
+ binascii.unhexlify('2d614759ad408a111a3351c0cb33c099'
+ '422c30a5c5104788a343332bde2b387b'))
self.assertEqual(d.timestamp, 1576762888554271289)
self.assertEqual(d.revision, 740)
self.assertEqual(d.metadata_size, 0)
self.assertEqual(d.metadata, bytearray())
- def test_valid_parsed_descriptor_with_metadata(self):
- """Tests parsing of a Trillian log_root structure with metadata field."""
- d = aftltool.TrillianLogRootDescriptor(self.test_log_root_with_metadata)
- self.assertTrue(d.is_valid())
+ # Calls constructor with log_root with metadata
+ d = aftltool.TrillianLogRootDescriptor(
+ self.test_log_root_bytes_with_metadata)
self.assertEqual(d.metadata_size, 2)
self.assertEqual(d.metadata, bytearray('12'))
+ def test_get_expected_size(self):
+ """Tests get_expected_size method."""
+ # Default constructor.
+ d = aftltool.TrillianLogRootDescriptor()
+ self.assertEqual(d.get_expected_size(), 11 + 18)
+
+ # Log root without metadata.
+ d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata)
+ self.assertEqual(d.get_expected_size(), 11 + 18 + 32)
+
+ # Log root with metadata.
+ d = aftltool.TrillianLogRootDescriptor(
+ self.test_log_root_bytes_with_metadata)
+ self.assertEqual(d.get_expected_size(), 11 + 18 + 32 + 2)
+
+ def test_encode(self):
+ """Tests encode method."""
+ # Log root from default constructor.
+ d = aftltool.TrillianLogRootDescriptor()
+ expected_bytes = (
+ '0001' # version
+ '0000000000000000' # tree_size
+ '00' # root_hash_size
+ '' # root_hash (empty)
+ '0000000000000000' # timestamp
+ '0000000000000000' # revision
+ '0000' # metadata size
+ '' # metadata (empty)
+ )
+ self.assertEqual(d.encode(), binascii.unhexlify(expected_bytes))
+
+ # Log root without metadata.
+ d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata)
+ self.assertEqual(d.encode(), self.test_log_root_bytes_wo_metadata)
+
+ # Log root with metadata.
+ d = aftltool.TrillianLogRootDescriptor(
+ self.test_log_root_bytes_with_metadata)
+ self.assertEqual(d.encode(), self.test_log_root_bytes_with_metadata)
+
+ def test_is_valid(self):
+ """Tests the is_valid method."""
+ d = aftltool.TrillianLogRootDescriptor()
+ self.assertTrue(d.is_valid())
+
+ # Invalid version.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.version = 2
+ self.assertFalse(d.is_valid())
+
+ # Invalid tree_size.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.tree_size = -1
+ self.assertFalse(d.is_valid())
+
+ # Invalid root_hash_size.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.root_hash_size = -1
+ self.assertFalse(d.is_valid())
+ d.root_hash_size = 300
+ self.assertFalse(d.is_valid())
+
+ # Invalid/valid root_hash_size / root_hash combination.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.root_hash_size = 4
+ d.root_hash = '123'
+ self.assertFalse(d.is_valid())
+ d.root_hash = '1234'
+ self.assertTrue(d.is_valid())
+
+ # Invalid timestamp.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.timestamp = -1
+ self.assertFalse(d.is_valid())
+
+ # Invalid revision.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.revision = -1
+ self.assertFalse(d.is_valid())
+
+ # Invalid metadata_size.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.metadata_size = -1
+ self.assertFalse(d.is_valid())
+ d.metadata_size = 70000
+ self.assertFalse(d.is_valid())
+
+ # Invalid/valid metadata_size / metadata combination.
+ d = aftltool.TrillianLogRootDescriptor()
+ d.metadata_size = 4
+ d.metadata = '123'
+ self.assertFalse(d.is_valid())
+ d.metadata = '1234'
+ self.assertTrue(d.is_valid())
+
+ def test_print_desc(self):
+ """Tests print_desc method."""
+ # Log root without metadata
+ buf = io.BytesIO()
+ d = aftltool.TrillianLogRootDescriptor(self.test_log_root_bytes_wo_metadata)
+ d.print_desc(buf)
+ desc = buf.getvalue()
+
+ # Cursory whether the printed description contains something useful.
+ self.assertGreater(len(desc), 0)
+ self.assertTrue('Version:' in desc)
+ self.assertFalse('Metadata:' in desc)
+
+ # Log root with metadata
+ buf = io.BytesIO()
+ d = aftltool.TrillianLogRootDescriptor(
+ self.test_log_root_bytes_with_metadata)
+ d.print_desc(buf)
+ desc = buf.getvalue()
+
+ # Cursory whether the printed description contains something useful.
+ self.assertGreater(len(desc), 0)
+ self.assertTrue('Version:' in desc)
+ self.assertTrue('Metadata:' in desc)
+
+
+class AftlMockCommunication(aftltool.AftlCommunication):
+ """Testing Mock implementation of AftlCommunication."""
+
+ def __init__(self, transparency_log, canned_response):
+ """Initializes the object.
+
+ Arguments:
+ transparency_log: String containing the URL of a transparency log server.
+ canned_response: AddFirmwareInfoResponse to return or the Exception to
+ raise.
+ """
+ super(AftlMockCommunication, self).__init__(transparency_log)
+ self.request = None
+ self.canned_response = canned_response
+
+ def AddFirmwareInfo(self, request):
+ """Records the request and returns the canned response."""
+ self.request = request
+
+ if isinstance(self.canned_response, aftltool.AftlError):
+ raise self.canned_response
+ return self.canned_response
+
+
+class AftlTest(AftltoolTestCase):
+
+ def setUp(self):
+ """Sets up the test bed for the unit tests."""
+ super(AftlTest, self).setUp()
+ self.mock_aftl_host = 'test.foo.bar:9000'
+
+ # pylint: disable=no-member
+ def test_request_inclusion_proof(self):
+ """Tests the request_inclusion_proof method."""
+ aftl_comms = AftlMockCommunication(self.mock_aftl_host, self.test_afi_resp)
+ aftl = aftltool.Aftl()
+ icp = aftl.request_inclusion_proof(self.mock_aftl_host,
+ 'a'*1024, 'version_inc',
+ 'test/data/testkey_rsa4096.pem',
+ None, None,
+ aftl_comms=aftl_comms)
+ self.assertEqual(icp.leaf_index,
+ self.test_afi_resp.fw_info_proof.proof.leaf_index)
+ self.assertEqual(icp.proof_hash_count,
+ len(self.test_afi_resp.fw_info_proof.proof.hashes))
+ self.assertEqual(icp.log_url, self.mock_aftl_host)
+ self.assertEqual(
+ icp.log_root_descriptor.root_hash, binascii.unhexlify(
+ '53b182b55dc1377197c938637f50093131daea4d0696b1eae5b8a014bfde884a'))
+
+ self.assertEqual(icp.fw_info_leaf.version_incremental, 'version_inc')
+ # To calculate the hash of the a RSA key use the following command:
+ # openssl rsa -in test/data/testkey_rsa4096.pem -pubout \
+ # -outform DER | sha256sum
+ self.assertEqual(icp.fw_info_leaf.manufacturer_key_hash, binascii.unhexlify(
+ '9841073d16a7abbe21059e026da71976373d8f74fdb91cc46aa0a7d622b925b9'))
+
+ self.assertEqual(icp.log_root_signature,
+ self.test_afi_resp.fw_info_proof.sth.log_root_signature)
+ self.assertEqual(icp.proofs, self.test_afi_resp.fw_info_proof.proof.hashes)
+
+ # pylint: disable=no-member
+ def test_request_inclusion_proof_failure(self):
+ """Tests the request_inclusion_proof_method in case of a comms problem."""
+ aftl_comms = AftlMockCommunication(self.mock_aftl_host,
+ aftltool.AftlError('Comms error'))
+ aftl = aftltool.Aftl()
+ with self.assertRaises(aftltool.AftlError):
+ aftl.request_inclusion_proof(self.mock_aftl_host,
+ 'a'*1024, 'version_inc',
+ 'test/data/testkey_rsa4096.pem',
+ None, None,
+ aftl_comms=aftl_comms)
if __name__ == '__main__':
unittest.main(verbosity=2)