blob: b781018717e4868ff55783b751d121cc30a5fc06 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2013-2017 Nest Labs, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# @file
# Unit tests for the weave command line tool.
#
import os
import sys
import unittest
import tempfile
import shutil
import subprocess
import base64
import binascii
import re
import hashlib
import argparse
import itertools
# ================================================================================
# Test Data
# ================================================================================
args = None
allCurves = [ 'prime192v1', 'secp224r1', 'prime256v1' ]
allSignatureHashes = [ 'sha1', 'sha256' ]
deviceCert_Weave = base64.b64decode('''
1QAABAABADABCEiqbwLPvS7MJAIFNwMnEwIAAO7uMLQYGCYEABhzGiYFfxKaLTcGJxEBAAAAADC0
GBgkBwImCCUAWiMwCjkE72edUwyZ/51yQrH5tmAgjiWfNXLwo+eD5lYUk/loRWWLJDFeh4xkNSWH
GQOZzUWhJPp2CxKeOX41gykBGDWCKQEkAgUYNYQpATYCBAIEARgYNYEwAghO/0dR5MZjmxg1gDAC
CETjQDip1LWnGDUMMAEYILdaKd7BcqnLAeFhSRoELsUBOTW7DAUYMAIZAKKWYMG5uOD6TfpmKbEQ
qlVFjrRd9t+4lhgY
''')
deviceCert_PEM = '''\
-----BEGIN CERTIFICATE-----
MIIBiTCCAT+gAwIBAgIISKpvAs+9LswwCgYIKoZIzj0EAwIwIjEgMB4GCisGAQQB
gsMrAQMMEDE4QjQzMEVFRUUwMDAwMDIwHhcNMTMxMDIyMDAwMDAwWhcNMjMxMDIw
MjM1OTU5WjAiMSAwHgYKKwYBBAGCwysBAQwQMThCNDMwMDAwMDAwMDAwMTBOMBAG
ByqGSM49AgEGBSuBBAAhAzoABO9nnVMMmf+dckKx+bZgII4lnzVy8KPng+ZWFJP5
aEVliyQxXoeMZDUlhxkDmc1FoST6dgsSnjl+o2owaDAMBgNVHRMBAf8EAjAAMA4G
A1UdDwEB/wQEAwIFoDAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEw
EQYDVR0OBAoECE7/R1HkxmObMBMGA1UdIwQMMAqACETjQDip1LWnMAoGCCqGSM49
BAMCAzgAMDUCGCC3WinewXKpywHhYUkaBC7FATk1uwwFGAIZAKKWYMG5uOD6Tfpm
KbEQqlVFjrRd9t+4lg==
-----END CERTIFICATE-----
'''
deviceCert_DER = base64.b64decode('''
MIIBiTCCAT+gAwIBAgIISKpvAs+9LswwCgYIKoZIzj0EAwIwIjEgMB4GCisGAQQBgsMrAQMMEDE4
QjQzMEVFRUUwMDAwMDIwHhcNMTMxMDIyMDAwMDAwWhcNMjMxMDIwMjM1OTU5WjAiMSAwHgYKKwYB
BAGCwysBAQwQMThCNDMwMDAwMDAwMDAwMTBOMBAGByqGSM49AgEGBSuBBAAhAzoABO9nnVMMmf+d
ckKx+bZgII4lnzVy8KPng+ZWFJP5aEVliyQxXoeMZDUlhxkDmc1FoST6dgsSnjl+o2owaDAMBgNV
HRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIFoDAgBgNVHSUBAf8EFjAUBggrBgEFBQcDAgYIKwYBBQUH
AwEwEQYDVR0OBAoECE7/R1HkxmObMBMGA1UdIwQMMAqACETjQDip1LWnMAoGCCqGSM49BAMCAzgA
MDUCGCC3WinewXKpywHhYUkaBC7FATk1uwwFGAIZAKKWYMG5uOD6TfpmKbEQqlVFjrRd9t+4lg==
''')
deviceKey_Weave = base64.b64decode('''
1QAABAACACQBJTACHKmH22bJe2mLomgimLLt70dguW98M6EsKajVuO4wAzkE72edUwyZ/51yQrH5
tmAgjiWfNXLwo+eD5lYUk/loRWWLJDFeh4xkNSWHGQOZzUWhJPp2CxKeOX4Y
''')
deviceKey_PKCS8_PEM = '''\
-----BEGIN PRIVATE KEY-----
MHgCAQAwEAYHKoZIzj0CAQYFK4EEACEEYTBfAgEBBByph9tmyXtpi6JoIpiy7e9H
YLlvfDOhLCmo1bjuoTwDOgAE72edUwyZ/51yQrH5tmAgjiWfNXLwo+eD5lYUk/lo
RWWLJDFeh4xkNSWHGQOZzUWhJPp2CxKeOX4=
-----END PRIVATE KEY-----
'''
deviceKey_PKCS8_DER = base64.b64decode('''
MHgCAQAwEAYHKoZIzj0CAQYFK4EEACEEYTBfAgEBBByph9tmyXtpi6JoIpiy7e9H
YLlvfDOhLCmo1bjuoTwDOgAE72edUwyZ/51yQrH5tmAgjiWfNXLwo+eD5lYUk/lo
RWWLJDFeh4xkNSWHGQOZzUWhJPp2CxKeOX4=
''')
deviceKey_SEC1_PEM = '''\
-----BEGIN EC PRIVATE KEY-----
MGgCAQEEHKmH22bJe2mLomgimLLt70dguW98M6EsKajVuO6gBwYFK4EEACGhPAM6
AATvZ51TDJn/nXJCsfm2YCCOJZ81cvCj54PmVhST+WhFZYskMV6HjGQ1JYcZA5nN
RaEk+nYLEp45fg==
-----END EC PRIVATE KEY-----
'''
deviceKey_SEC1_DER = base64.b64decode('''
MGgCAQEEHKmH22bJe2mLomgimLLt70dguW98M6EsKajVuO6gBwYFK4EEACGhPAM6
AATvZ51TDJn/nXJCsfm2YCCOJZ81cvCj54PmVhST+WhFZYskMV6HjGQ1JYcZA5nN
RaEk+nYLEp45fg==
''')
ecPrivKey_prime192v1 = '''\
-----BEGIN EC PRIVATE KEY-----
MF8CAQEEGCRNKztjGPy1ZaOBN0PNPhZWNYzwG+jGHKAKBggqhkjOPQMBAaE0AzIA
BB0B2y4GG4vi83oeDCKEH5dRBsfrDJIN8eOK99tW1APHjAlD+dk6bjd87hAWbYtB
0A==
-----END EC PRIVATE KEY-----
'''
ecPrivKey_secp224r1 = '''\
-----BEGIN EC PRIVATE KEY-----
MGgCAQEEHJwfkRY712nw9ZorpyuD0BWWVJE4qDZTH8gJWoKgBwYFK4EEACGhPAM6
AATZb9qPAwHhPQYog7jzwcBJ+jeItUwc+gWy8y4h3ElQftCIt+E5vLgZ/eJtDBE2
nLM8N1XVSXbdyA==
-----END EC PRIVATE KEY-----
'''
ecPrivKey_prime256v1 = '''\
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIIa9Aa2nUwi5evHrP6k6wtW2Ej202GOPjmbqPjozbnlZoAoGCCqGSM49
AwEHoUQDQgAE5T4DC5tEevCM02PZRQeh+e9DEAQzXoz97xN8Lx9eHq4EE7XHhqf9
cKxcN1Wowx24MlwHnqycOTOvPnLq2lQ0eA==
-----END EC PRIVATE KEY-----
'''
ecPrivKeys = {
'prime192v1': ecPrivKey_prime192v1,
'secp224r1': ecPrivKey_secp224r1,
'prime256v1': ecPrivKey_prime256v1,
}
rootCACert_prime256v1_sha256 = '''\
-----BEGIN CERTIFICATE-----
MIIBhTCCASugAwIBAgIIVNWe4R2MEuwwCgYIKoZIzj0EAwIwIjEgMB4GCisGAQQB
gsMrAQMMEDE4QjQzMEVFRUUwMDAwMDEwHhcNMTgwMTAxMDAwMDAwWhcNMTgxMjMx
MjM1OTU5WjAiMSAwHgYKKwYBBAGCwysBAwwQMThCNDMwRUVFRTAwMDAwMTBZMBMG
ByqGSM49AgEGCCqGSM49AwEHA0IABHRsXKwVm44lx1XonvFe6NN2FC0EHxrURPkF
UD4HITXZvCJuvuhts9omej+dK5xKykeeboFg+nFfigXBWe+6q8CjSzBJMA8GA1Ud
EwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMBEGA1UdDgQKBAhDIkjqBK5b0jAT
BgNVHSMEDDAKgAhDIkjqBK5b0jAKBggqhkjOPQQDAgNIADBFAiEAyepSVsXgc3YS
v9UrGrlV5yRh/YKehKkd/qqtXIoHtdYCIHTccVIhRP2luMS74x2P7yeBTB9m3/Fi
+AVuB/DSuE+K
-----END CERTIFICATE-----
'''
rootCAKey_prime256v1 = '''\
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIBKjAhVUhrZv9epNU/TOgrMnidr6kQTrIzc5zdmRbcNzoAoGCCqGSM49
AwEHoUQDQgAEdGxcrBWbjiXHVeie8V7o03YULQQfGtRE+QVQPgchNdm8Im6+6G2z
2iZ6P50rnErKR55ugWD6cV+KBcFZ77qrwA==
-----END EC PRIVATE KEY-----
'''
rootCAKeyId_prime256v1 = binascii.unhexlify('432248EA04AE5BD2')
weaveSig_ecdsaWithSHA1_signingCert = base64.b64decode('''\
1QAABAAFADUBMAEdAMaLcZBPltcc7efJQrbZUGPj1D73DWaf+scPKd0wAh0AizhyI08p7rmHuodn
LJtP/0eswUpU5QulTAXl2Rg2BBUwAQgaUkJcquuMVCQCBFcDACcTBAAA7u4wtBgYJgQIrWUaJgWI
oxgtVwYAJxQBAADuAzC0GBgkBwIkCCUwCjkEyGxXmW/tdZwqQFBDdK6rV0JuWRhMM4W4kEteNaVG
+pYEetbpTVk/xQOGR8GTiHO5zUzGBqORpxk1gykBGDWCKQEkAgEYNYQpATYCBAMYGDWBMAIIRybd
iJ776L8YNYAwAghMjpcZLrz47Rg1DDABHCPtQBA1kYR/qhLlvZ/88vkCFo/aB6yZS4O6cekwAh0A
qcjqr71PG/EoC0rjT8jK+tEw07cLJ8/d59szuhgYFTABCGRW+nvGNJkUJAIEVwMAJxMBAADu7jC0
GBgmBOGIZRomBWEE5VNXBgAnEwQAAO7uMLQYGCQHAiQIJTAKOQQ3boDGKBoAVSfJn1CGq3F6mWzd
25VCwiQ3fHabganwrjBOEGLnWBxz0o5nrEG15D0ZBlBYhwFVzDWDKQEpAhg1gikBJAJgGDWBMAII
TI6XGS68+O0YNYAwAghKqnukemFLLRg1DDABHQCrMcDH5OYW1me01XfsZwTG3igFS/XJKlTtetvA
MAIdAPC4MHMAwN3fk0W17E0aeFrtovEgcsJ8GrfNDAAYGBgY
''')
weaveSig_ecdsaWithSHA256_certRef = base64.b64decode('''\
1QAABAAFACUFBQI1ATABHE0Px2EANL9tD9G4K82MeSUHihoqi9nhqJxa0JwwAhwtgeTZTnZpiX/+
eZ71UhRhnjKfRp/8bH+ipYZMGDUDMAIIRybdiJ776L8YGA==
''')
# ================================================================================
# Utility Classes and Functions
# ================================================================================
def computeTruncatedKeyId(pubKey):
# Generate "truncated" SHA-1 hash. Per RFC5280:
#
# "(2) The keyIdentifier is composed of a four-bit type field with the value 0100 followed
# by the least significant 60 bits of the SHA-1 hash of the value of the BIT STRING
# subjectPublicKey (excluding the tag, length, and number of unused bits)."
#
pubKeyHash = hashlib.sha1(pubKey).digest()
keyId = chr(0x40 | ord(pubKeyHash[12]) & 0xF) + pubKeyHash[13:]
return keyId
def splitSimpleCSV(line):
return [ v.strip() for v in line.split(',') ]
def parseOpenSSLTextField(text, startMarker):
m = re.search(r'''^\s+%s(.*)$''' % startMarker, text, re.MULTILINE)
if m:
return m.group(1).strip()
else:
return None
def parseOpenSSLDNField(text, startMarker):
v = parseOpenSSLTextField(text, startMarker)
if v != None:
v = re.sub(r'''\s+=\s+''', '=', v)
return v
def parseOpenSSLHexField(text, startMarker):
m = re.search(r'''^\s*%s\s*(([0-9A-Fa-f]{2}[\s:]+)+)''' % (startMarker), text, re.MULTILINE)
if m:
valueText = m.group(1)
valueHex = re.sub(r'''[^0-9A-Fa-f]''', '', valueText)
return binascii.unhexlify(valueHex)
else:
return None
def prefixLines(text, prefix):
return re.sub(r'''^''', prefix, text, flags=re.MULTILINE)
def isPrintable(s):
return re.search(r'''[^\x09-\x0d\x20-\x7f]''', s) == None
class FileArg:
def __init__(self, name):
self.name = name
self.baseName = name
def prepare(self, dirName):
self.name = os.path.join(dirName, self.baseName)
def __str__(self):
return self.name
class InFileArg(FileArg):
def __init__(self, name, contents):
FileArg.__init__(self, name)
self.contents = contents
def prepare(self, dirName):
FileArg.prepare(self, dirName)
with open(self.name, 'w+') as f:
f.write(self.contents)
class OutFileArg(FileArg):
def __init__(self, name):
self.name = name
self.baseName = name
@property
def contents(self):
if os.access(self.name, os.R_OK):
with open(self.name, 'rb') as f:
self.contents = f.read()
return self.contents
else:
return None
class CommandRecord:
def __init__(self, cmd, args, files, res):
self.cmd = cmd
self.args = args
self.files = files
self.res = res
def __str__(self):
s = '''Command: %s %s\n Exit Code: %d''' % (self.cmd, ' '.join(self.args), self.res)
for name, contents in self.files:
if contents != None:
if not isPrintable(contents):
contents = contents.encode('string-escape')
s += '\n %s:\n%s' % (name, prefixLines(contents, ' |'))
return s
class WeaveToolTestCase(unittest.TestCase):
'''Base class for all weave tool test cases'''
def id(self):
testName = re.sub(r'''^TEST\d\d_?''', '', type(self).__name__)
methodName = re.sub(r'''^test_?''', '', self._testMethodName)
if methodName != '':
return "%s[%s]" % (testName, methodName)
else:
return testName
def setUp(self):
unittest.TestCase.setUp(self)
# Create a temporary directory to hold test input/output files.
parentDir = '/dev/shm'
if not os.path.isdir(parentDir):
parentDir = None
self.tmpDir = tempfile.mkdtemp(dir=parentDir)
self.clearTestContext()
print '- %s' % (self.id())
def tearDown(self):
unittest.TestCase.tearDown(self)
# Remove the temporary directory, and any files it contains.
if os.path.isdir(self.tmpDir):
shutil.rmtree(self.tmpDir)
def getCommandSummary(self):
return 'Commands executed: %d\n\n%s' % (
len(self.commandRecords),
'\n\n'.join((str(cmdRec) for cmdRec in self.commandRecords))
)
def clearTestContext(self):
self.commandRecords = []
def clearTmpFiles(self):
if os.path.isdir(self.tmpDir):
for name in os.listdir(self.tmpDir):
pathName = os.path.join(self.tmpDir, name)
if os.path.isfile(pathName):
os.unlink(pathName)
def runCommand(self, cmd, args, stdin=None):
fileArgs = [ arg for arg in args if isinstance(arg, FileArg) ]
for fileArg in fileArgs:
fileArg.prepare(self.tmpDir)
args = [ str(arg) for arg in args ]
proc = subprocess.Popen([ cmd ] + args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate(stdin)
res = proc.wait()
self.commandRecords.append(
CommandRecord(
cmd=cmd,
args=args,
files=[ ('stdin', stdin), ('stdout', stdout), ('stderr', stderr) ] + [ (fileArg.name, fileArg.contents) for fileArg in fileArgs ],
res=res
)
)
return (res, stdout, stderr)
def parseWeaveCert(self, cert):
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[ 'convert-cert', '--x509', '-', '-' ],
stdin=cert)
self.assertEqual(res, 0, 'Failed to convert weave certificate')
return self.parseX509Cert(stdout)
def parseX509Cert(self, cert):
if re.match(r'''(-+)[\w\s]+\1''', cert):
inForm = 'pem'
else:
inForm = 'der'
(res, certText, stderr) = self.runCommand(args.opensslTool,
args=[ 'x509', '-text', '-noout', '-inform', inForm ],
stdin=cert)
self.assertEqual(res, 0, 'Failed to dump X.509 certificate')
certProps = { }
certProps['Text'] = certText
certProps['Version'] = parseOpenSSLTextField(certText, 'Version:')
certProps['SerialNumber'] = parseOpenSSLTextField(certText, 'Serial Number:')
certProps['SignatureAlgorithm'] = parseOpenSSLTextField(certText, 'Signature Algorithm:')
certProps['Issuer'] = parseOpenSSLDNField(certText, 'Issuer:')
certProps['NotBefore'] = parseOpenSSLTextField(certText, 'Not Before:')
certProps['NotAfter'] = parseOpenSSLTextField(certText, 'Not After :')
certProps['Subject'] = parseOpenSSLDNField(certText, 'Subject:')
certProps['Curve'] = parseOpenSSLTextField(certText, 'ASN1 OID:')
certProps['PublicKeyAlgorithm'] = parseOpenSSLTextField(certText, 'Public Key Algorithm:')
certProps['PublicKey'] = parseOpenSSLHexField(certText, 'pub:')
certProps['SubjectKeyId'] = parseOpenSSLHexField(certText, 'X509v3 Subject Key Identifier:')
certProps['AuthorityKeyId'] = parseOpenSSLHexField(certText, r'''X509v3 Authority Key Identifier:\s+keyid:''')
m = re.search(r'''^\s+X509v3 Basic Constraints: critical\n([^\n]+)$''', certText, re.MULTILINE)
if m:
certProps['BasicConstraints'] = [ v.strip() for v in m.group(1).split(',') ]
else:
certProps['BasicConstraints'] = None
m = re.search(r'''^\s+X509v3 Key Usage: critical\n([^\n]+)$''', certText, re.MULTILINE)
if m:
certProps['KeyUsage'] = [ v.strip() for v in m.group(1).split(',') ]
else:
certProps['KeyUsage'] = None
m = re.search(r'''^\s+X509v3 Extended Key Usage: critical\n([^\n]+)$''', certText, re.MULTILINE)
if m:
certProps['ExtendedKeyUsage'] = [ v.strip() for v in m.group(1).split(',') ]
else:
certProps['ExtendedKeyUsage'] = None
return certProps
def privateKeyToPublicKey(self, privKey, keyFormat):
if keyFormat == 'weave':
(res, privKeyPEM, stderr) = self.runCommand(args.weaveTool,
args=[ 'convert-key', '--pem', '-', '-' ],
stdin=privKey)
self.assertEqual(res, 0, 'Failed to convert Weave private key to PEM format')
privKey = privKeyPEM
keyFormat = 'pem'
if keyFormat == 'pkcs8':
(res, privKeyPEM, stderr) = self.runCommand(args.opensslTool,
args=[ 'pkcs8', '-nocrypt', '-inform', 'der', '-outform', 'pem' ],
stdin=privKey)
self.assertEqual(res, 0, 'Failed to convert PKCS#8 private key to PEM format')
privKey = privKeyPEM
keyFormat = 'pem'
(res, privKeyText, stderr) = self.runCommand(args.opensslTool,
args=[ 'ec', '-text', '-noout', '-inform', keyFormat ],
stdin=privKey)
self.assertEqual(res, 0, 'Failed to extract public key from private key')
return parseOpenSSLHexField(privKeyText, 'pub:')
def verifyProvisioningDataFile(self, fileName, deviceCount, curve, hash, certFormat, keyFormat):
# Read the provisioning data file...
with open(fileName, 'r') as provData:
# Read and parse the header line
columnNames = splitSimpleCSV(provData.readline())
certColName = 'Certificate' + (' ' + certFormat.upper() if certFormat != 'weave' else '')
privKeyColName = 'Private Key' + (' ' + keyFormat.upper() if keyFormat != 'weave' else '')
# Verify the correct columns are present
expectedColumnNames = [
'MAC',
certColName,
privKeyColName,
'Permissions',
'Pairing Code',
'Certificate Type'
]
self.assertItemsEqual(columnNames, expectedColumnNames, 'Invalid column names in provisioning data file')
macCol = columnNames.index('MAC')
certCol = columnNames.index(certColName)
privKeyCol = columnNames.index(privKeyColName)
pairingCodeCol = columnNames.index('Pairing Code')
certTypeCol = columnNames.index('Certificate Type')
count = 0
# Read the provisioning data lines...
for line in provData:
# Parse the provisioning data
values = splitSimpleCSV(line)
mac = values[macCol]
cert = values[certCol]
privKey = values[privKeyCol]
pairingCode = values[pairingCodeCol]
certType = values[certTypeCol]
# Verify the MAC
expectedMAC = "%16X" % (0x18B4300000000001 + count)
self.assertEqual(mac, expectedMAC, 'Invalid MAC value')
# Verify the certificate's construction
if certFormat == 'weave':
certProps = self.parseWeaveCert(cert)
else:
cert = base64.b64decode(cert)
certProps = self.parseX509Cert(cert)
self.verifyDeviceCert(certProps, mac, curve, hash)
# Verify the CertificateType
self.assertEqual(certType, 'ECDSAWith' + hash.upper(), 'Invalid Certificate Type value')
# Verify the private key matches the public key in the certificate.
privKey = base64.b64decode(privKey)
expectedPubKey = self.privateKeyToPublicKey(privKey, keyFormat)
self.assertEqual(certProps['PublicKey'], expectedPubKey, 'Public key in certificate does not match generated private key')
# Verify the pairing code
self.verifyPairingCode(pairingCode)
count += 1
self.assertEqual(count, deviceCount, 'Incorrect number of lines in generate provisioning data file')
def verifyPairingCode(self, pairingCode):
for ch in pairingCode:
self.assertTrue(ch in Verhoeff.CharSet_Base32, 'Invalid character in pairing code')
self.assertTrue(Verhoeff.VerifyCheckChar32(pairingCode), 'Invalid check character in pairing code')
def verifyDeviceCert(self, certProps, mac, curve, hash):
self.assertEqual(certProps['Version'], '3 (0x2)', 'Invalid certificate version')
self.assertEqual(certProps['Subject'], '1.3.6.1.4.1.41387.1.1=%s' % mac, 'Invalid certificate subject')
self.assertEqual(certProps['Issuer'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000001', 'Invalid certificate issuer')
self.assertEqual(certProps['Curve'], curve, 'Invalid public key curve');
self.assertEqual(certProps['SignatureAlgorithm'], 'ecdsa-with-%s' % (hash.upper()), 'Invalid signature algo');
self.assertEqual(certProps['NotBefore'], 'Jan 1 00:00:00 2018 GMT', 'Invalid NotBefore date');
self.assertEqual(certProps['NotAfter'], 'Dec 31 23:59:59 2018 GMT', 'Invalid NotAfter date');
self.assertIn('CA:FALSE', certProps['BasicConstraints'], 'Invalid CA constraint')
self.assertItemsEqual(['Digital Signature', 'Key Encipherment' ], certProps['KeyUsage'], 'Invalid Key Usage constraint')
self.assertItemsEqual(['TLS Web Client Authentication', 'TLS Web Server Authentication' ], certProps['ExtendedKeyUsage'], 'Invalid Extended Key Usage constraint')
self.assertEqual(len(certProps['SubjectKeyId']), 8, 'Invalid Subject Key Id length (expected 8)');
self.assertEqual(certProps['SubjectKeyId'], computeTruncatedKeyId(certProps['PublicKey']), 'SubjectKeyId does not match public key');
self.assertEqual(rootCAKeyId_prime256v1, certProps['AuthorityKeyId'], 'AuthorityKeyId does not match root certificate');
class WeaveToolTestResult(unittest.TestResult):
def _exc_info_to_string(self, err, test):
return "%s\n%s\n" % (
unittest.TestResult._exc_info_to_string(self, err, test),
test.getCommandSummary()
)
# ================================================================================
# Test Cases
# ================================================================================
class TEST01_PrintCert(WeaveToolTestCase):
def test(self):
'''Test the weave print-cert command'''
inFile = InFileArg('cert.weave', deviceCert_Weave)
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'print-cert', inFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertRegexpMatches(stdout, r'''Subject:\s+WeaveDeviceId=18B4300000000001''')
self.assertRegexpMatches(stdout, r'''Issuer:\s+WeaveCAId=18B430EEEE000002''')
self.assertRegexpMatches(stdout, r'''Subject\s+Key\s+Id:\s+4EFF4751E4C6639B''')
self.assertRegexpMatches(stdout, r'''Authority\s+Key\s+Id:\s+44E34038A9D4B5A7''')
self.assertRegexpMatches(stdout, r'''Not\s+Before:\s+2013/10/22''')
self.assertRegexpMatches(stdout, r'''Not\s+After:\s+2023/10/20''')
self.assertRegexpMatches(stdout, r'''Key\s+Usage:\s+DigitalSignature\s+KeyEncipherment''')
self.assertRegexpMatches(stdout, r'''Key\s+Purpose:\s+ServerAuth\s+ClientAuth''')
self.assertRegexpMatches(stdout, r'''Public\s+Key\s+Algorithm:\s+ECPublicKey''')
self.assertRegexpMatches(stdout, r'''Signature\s+Algorithm:\s+ECDSAWithSHA256''')
self.assertRegexpMatches(stdout, r'''Curve\s+Id:\s+secp224r1''')
self.assertRegexpMatches(stdout, r'''Public\s+Key:\s+04 EF 67 9D 53 0C 99 FF 9D 72 42 B1 F9 B6 60 20''')
self.assertRegexpMatches(stdout, r'''r:\s+20 B7 5A 29 DE C1 72 A9 CB 01 E1 61 49 1A 04 2E''')
self.assertRegexpMatches(stdout, r'''s:\s+00 A2 96 60 C1 B9 B8 E0 FA 4D FA 66 29 B1 10 AA''')
class TEST02_ConvertCert(WeaveToolTestCase):
'''Test the weave convert-cert command'''
def test_WeaveToPEM(self):
'''Test certificate conversion: Weave to PEM'''
inFile = InFileArg('cert.weave', deviceCert_Weave)
outFile = OutFileArg('cert.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-cert', '--x509', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceCert_PEM)
def test_WeaveToDER(self):
'''Test certificate conversion: Weave to DER'''
inFile = InFileArg('cert.weave', deviceCert_Weave)
outFile = OutFileArg('cert.der')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-cert', '--x509-der', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceCert_DER)
def test_PEMToWeave(self):
'''Test certificate conversion: PEM to Weave'''
inFile = InFileArg('cert.pem', deviceCert_PEM)
outFile = OutFileArg('cert.weave')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-cert', '--weave', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceCert_Weave)
def test_DERToWeave(self):
'''Test certificate conversion: DER to Weave'''
inFile = InFileArg('cert.pem', deviceCert_DER)
outFile = OutFileArg('cert.weave')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-cert', '--weave', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceCert_Weave)
class TEST03_ConvertKey(WeaveToolTestCase):
'''Test the weave convert-key command'''
def test_WeaveToPKCS8PEM(self):
'''Test private key conversion: Weave to PKCS8 PEM'''
inFile = InFileArg('key.weave', deviceKey_Weave)
outFile = OutFileArg('key.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-key', '--pkcs8-pem', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceKey_PKCS8_PEM)
def test_WeaveToPKCS8DER(self):
'''Test private key conversion: Weave to PKCS8 DER'''
inFile = InFileArg('key.weave', deviceKey_Weave)
outFile = OutFileArg('key.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-key', '--pkcs8-der', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceKey_PKCS8_DER)
def test_WeaveToSEC1PEM(self):
'''Test private key conversion: Weave to SEC1 PEM'''
inFile = InFileArg('key.weave', deviceKey_Weave)
outFile = OutFileArg('key.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-key', '--pem', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceKey_SEC1_PEM)
def test_WeaveToSEC1DER(self):
'''Test private key conversion: Weave to SEC1 DER'''
inFile = InFileArg('key.weave', deviceKey_Weave)
outFile = OutFileArg('key.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'convert-key', '--der', inFile, outFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
self.assertEqual(outFile.contents, deviceKey_SEC1_DER)
class TEST04_GenCACert(WeaveToolTestCase):
'''Test the weave gen-ca-cert command'''
def test_SelfSigned(self):
'''Test generation of self-signed CA certificates'''
for curve, hash in itertools.product(allCurves, allSignatureHashes):
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-ca-cert command to generate a self signed CA cert
keyFile = InFileArg('key.pem', ecPrivKeys[curve])
certFile = OutFileArg('cert.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-ca-cert',
'--id', '18B430EEEE000001',
'--self',
'--' + hash,
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--key', keyFile,
'--out', certFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Decode the certificate
certProps = self.parseX509Cert(certFile.contents)
# Verify the certificate's construction
self.assertEqual(certProps['Version'], '3 (0x2)', 'Invalid certificate version')
self.assertEqual(certProps['Subject'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000001', 'Invalid certificate subject')
self.assertEqual(certProps['Issuer'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000001', 'Invalid certificate issuer')
self.assertEqual(certProps['Curve'], curve, 'Invalid public key curve');
self.assertEqual(certProps['SignatureAlgorithm'], 'ecdsa-with-%s' % (hash.upper()), 'Invalid signature algo');
self.assertEqual(certProps['NotBefore'], 'Jan 1 00:00:00 2018 GMT', 'Invalid NotBefore date');
self.assertEqual(certProps['NotAfter'], 'Dec 31 23:59:59 2018 GMT', 'Invalid NotAfter date');
self.assertIn('CA:TRUE', certProps['BasicConstraints'], 'Invalid CA constraint')
self.assertItemsEqual(['Certificate Sign', 'CRL Sign' ], certProps['KeyUsage'], 'Invalid Key Usage constraint')
self.assertIsNone(certProps['ExtendedKeyUsage'], 'Unexpected Extended Key Usage constraint')
self.assertEqual(len(certProps['SubjectKeyId']), 8, 'Invalid Subject Key Id length (expected 8)');
self.assertEqual(certProps['SubjectKeyId'], computeTruncatedKeyId(certProps['PublicKey']), 'SubjectKeyId does not match public key');
self.assertEqual(certProps['SubjectKeyId'], certProps['AuthorityKeyId'], 'SubjectKeyId does not match AuthorityKeyId');
def test_RootSigned(self):
'''Test generation of CA certificates signed by a root certificate'''
for curve, hash in itertools.product(allCurves, allSignatureHashes):
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-ca-cert command to generate a CA cert signed by a root cert/key
caCertFile = InFileArg('ca-cert.pem', rootCACert_prime256v1_sha256)
caKeyFile = InFileArg('ca-key.pem', rootCAKey_prime256v1)
keyFile = InFileArg('key.pem', ecPrivKeys[curve])
certFile = OutFileArg('cert.pem')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-ca-cert',
'--id', '18B430EEEE000002',
'--ca-cert', caCertFile,
'--ca-key', caKeyFile,
'--key', keyFile,
'--' + hash,
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--out', certFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Decode the certificate
certProps = self.parseX509Cert(certFile.contents)
# Verify the certificate's construction
self.assertEqual(certProps['Version'], '3 (0x2)', 'Invalid certificate version')
self.assertEqual(certProps['Subject'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000002', 'Invalid certificate subject')
self.assertEqual(certProps['Issuer'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000001', 'Invalid certificate issuer')
self.assertEqual(certProps['Curve'], curve, 'Invalid public key curve');
self.assertEqual(certProps['SignatureAlgorithm'], 'ecdsa-with-%s' % (hash.upper()), 'Invalid signature algo');
self.assertEqual(certProps['NotBefore'], 'Jan 1 00:00:00 2018 GMT', 'Invalid NotBefore date');
self.assertEqual(certProps['NotAfter'], 'Dec 31 23:59:59 2018 GMT', 'Invalid NotAfter date');
self.assertIn('CA:TRUE', certProps['BasicConstraints'], 'Invalid CA constraint')
self.assertItemsEqual(['Certificate Sign', 'CRL Sign' ], certProps['KeyUsage'], 'Invalid Key Usage constraint')
self.assertIsNone(certProps['ExtendedKeyUsage'], 'Unexpected Extended Key Usage constraint')
self.assertEqual(len(certProps['SubjectKeyId']), 8, 'Invalid Subject Key Id length (expected 8)');
self.assertEqual(certProps['SubjectKeyId'], computeTruncatedKeyId(certProps['PublicKey']), 'SubjectKeyId does not match public key');
self.assertEqual(rootCAKeyId_prime256v1, certProps['AuthorityKeyId'], 'AuthorityKeyId does not match root certificate');
class TEST05_GetDeviceCert(WeaveToolTestCase):
'''Test the weave gen-device-cert command'''
def test_ExistingKey(self):
'''Test generation of a device certificate using an existing private key file'''
for curve, hash in itertools.product(allCurves, allSignatureHashes):
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-device-cert command to generate a device certificate
caCertFile = InFileArg('ca-cert.pem', rootCACert_prime256v1_sha256)
caKeyFile = InFileArg('ca-key.pem', rootCAKey_prime256v1)
keyFile = InFileArg('key.pem', ecPrivKeys[curve])
certFile = OutFileArg('cert.weave.b64')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-device-cert',
'--dev-id', '18B430EEEE000002',
'--ca-cert', caCertFile,
'--ca-key', caKeyFile,
'--' + hash,
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--key', keyFile,
'--out', certFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Decode the certificate
certProps = self.parseWeaveCert(certFile.contents)
# Verify the certificate's construction
self.assertEqual(certProps['Version'], '3 (0x2)', 'Invalid certificate version')
self.assertEqual(certProps['Subject'], '1.3.6.1.4.1.41387.1.1=18B430EEEE000002', 'Invalid certificate subject')
self.assertEqual(certProps['Issuer'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000001', 'Invalid certificate issuer')
self.assertEqual(certProps['Curve'], curve, 'Invalid public key curve');
self.assertEqual(certProps['SignatureAlgorithm'], 'ecdsa-with-%s' % (hash.upper()), 'Invalid signature algo');
self.assertEqual(certProps['NotBefore'], 'Jan 1 00:00:00 2018 GMT', 'Invalid NotBefore date');
self.assertEqual(certProps['NotAfter'], 'Dec 31 23:59:59 2018 GMT', 'Invalid NotAfter date');
self.assertIn('CA:FALSE', certProps['BasicConstraints'], 'Invalid CA constraint')
self.assertItemsEqual(['Digital Signature', 'Key Encipherment' ], certProps['KeyUsage'], 'Invalid Key Usage constraint')
self.assertItemsEqual(['TLS Web Client Authentication', 'TLS Web Server Authentication' ], certProps['ExtendedKeyUsage'], 'Invalid Extended Key Usage constraint')
self.assertEqual(len(certProps['SubjectKeyId']), 8, 'Invalid Subject Key Id length (expected 8)');
self.assertEqual(certProps['SubjectKeyId'], computeTruncatedKeyId(certProps['PublicKey']), 'SubjectKeyId does not match public key');
self.assertEqual(rootCAKeyId_prime256v1, certProps['AuthorityKeyId'], 'AuthorityKeyId does not match root certificate');
def test_GenerateKey(self):
'''Test generation of a device certificate and a corresponding private key'''
for curve, hash in itertools.product(allCurves, allSignatureHashes):
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-device-cert command to generate a device certificate
caCertFile = InFileArg('ca-cert.pem', rootCACert_prime256v1_sha256)
caKeyFile = InFileArg('ca-key.pem', rootCAKey_prime256v1)
certFile = OutFileArg('cert.weave.b64')
keyFile = OutFileArg('key.weave.b64')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-device-cert',
'--dev-id', '18B430EEEE000002',
'--ca-cert', caCertFile,
'--ca-key', caKeyFile,
'--curve', curve,
'--' + hash,
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--out-key', keyFile,
'--out', certFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Decode the certificate
certProps = self.parseWeaveCert(certFile.contents)
# Verify the certificate's construction
self.assertEqual(certProps['Version'], '3 (0x2)', 'Invalid certificate version')
self.assertEqual(certProps['Subject'], '1.3.6.1.4.1.41387.1.1=18B430EEEE000002', 'Invalid certificate subject')
self.assertEqual(certProps['Issuer'], '1.3.6.1.4.1.41387.1.3=18B430EEEE000001', 'Invalid certificate issuer')
self.assertEqual(certProps['Curve'], curve, 'Invalid public key curve');
self.assertEqual(certProps['SignatureAlgorithm'], 'ecdsa-with-%s' % (hash.upper()), 'Invalid signature algo');
self.assertEqual(certProps['NotBefore'], 'Jan 1 00:00:00 2018 GMT', 'Invalid NotBefore date');
self.assertEqual(certProps['NotAfter'], 'Dec 31 23:59:59 2018 GMT', 'Invalid NotAfter date');
self.assertIn('CA:FALSE', certProps['BasicConstraints'], 'Invalid CA constraint')
self.assertItemsEqual(['Digital Signature', 'Key Encipherment' ], certProps['KeyUsage'], 'Invalid Key Usage constraint')
self.assertItemsEqual(['TLS Web Client Authentication', 'TLS Web Server Authentication' ], certProps['ExtendedKeyUsage'], 'Invalid Extended Key Usage constraint')
self.assertEqual(len(certProps['SubjectKeyId']), 8, 'Invalid Subject Key Id length (expected 8)');
self.assertEqual(certProps['SubjectKeyId'], computeTruncatedKeyId(certProps['PublicKey']), 'SubjectKeyId does not match public key');
self.assertEqual(rootCAKeyId_prime256v1, certProps['AuthorityKeyId'], 'AuthorityKeyId does not match root certificate');
expectedPubKey = self.privateKeyToPublicKey(keyFile.contents, 'weave')
self.assertEqual(certProps['PublicKey'], expectedPubKey, 'Public Key in certificate does not match generated private key')
class TEST06_GenProvisioningData(WeaveToolTestCase):
'''Test the weave gen-provisioning-data command'''
def test(self):
'''Test generation of device provisioning data'''
allCertFormats = [ 'weave', 'der' ]
allKeyFormats = [ 'weave', 'der', 'pkcs8' ]
deviceCount = 5
for curve, hash, certFormat, keyFormat in itertools.product(allCurves, allSignatureHashes, allCertFormats, allKeyFormats):
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-provisioning-data command
caCertFile = InFileArg('ca-cert.pem', rootCACert_prime256v1_sha256)
caKeyFile = InFileArg('ca-key.pem', rootCAKey_prime256v1)
provDataFile = OutFileArg('prov-data.csv')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-provisioning-data',
'--dev-id', '18B4300000000001',
'--count', deviceCount,
'--ca-cert', caCertFile,
'--ca-key', caKeyFile,
'--curve', curve,
'--' + hash,
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--' + certFormat + '-cert',
'--' + keyFormat + '-key',
'--out', provDataFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Verify the contents of the generated provisioning data file
self.verifyProvisioningDataFile(provDataFile.name, deviceCount, curve, hash, certFormat, keyFormat)
class TEST07_ConvertProvisioningData(WeaveToolTestCase):
'''Test the weave convert-provisioning-data command'''
def test_CertificateConversion(self):
'''Test conversion of certificate formats in a device provisioning data file'''
allCertFormats = [ 'weave', 'der' ]
deviceCount = 5
curve = 'prime256v1'
hash = 'sha256'
keyFormat = 'weave'
for fromKeyFormat in allCertFormats:
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-provisioning-data command
caCertFile = InFileArg('ca-cert.pem', rootCACert_prime256v1_sha256)
caKeyFile = InFileArg('ca-key.pem', rootCAKey_prime256v1)
fromProvDataFile = OutFileArg('from-prov-data.csv')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-provisioning-data',
'--dev-id', '18B4300000000001',
'--count', deviceCount,
'--ca-cert', caCertFile,
'--ca-key', caKeyFile,
'--curve', curve,
'--sha256',
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--' + fromKeyFormat + '-cert',
'--' + keyFormat + '-key',
'--out', fromProvDataFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
for toKeyFormat in allCertFormats:
# Invoke weave convert-provisioning-data command
toProvDataFile = OutFileArg('to-prov-data.csv')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'convert-provisioning-data',
'--' + toKeyFormat + '-cert',
fromProvDataFile,
toProvDataFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Verify the contents of the generated provisioning data file
self.verifyProvisioningDataFile(toProvDataFile.name, deviceCount, curve, hash, toKeyFormat, keyFormat)
def test_KeyConversion(self):
'''Test conversion of private key formats in a device provisioning data file'''
allKeyFormats = [ 'weave', 'der', 'pkcs8' ]
deviceCount = 5
curve = 'prime256v1'
hash = 'sha256'
certFormat = 'weave'
for fromKeyFormat in allKeyFormats:
self.clearTestContext()
self.clearTmpFiles()
# Invoke weave gen-provisioning-data command
caCertFile = InFileArg('ca-cert.pem', rootCACert_prime256v1_sha256)
caKeyFile = InFileArg('ca-key.pem', rootCAKey_prime256v1)
fromProvDataFile = OutFileArg('from-prov-data.csv')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'gen-provisioning-data',
'--dev-id', '18B4300000000001',
'--count', deviceCount,
'--ca-cert', caCertFile,
'--ca-key', caKeyFile,
'--curve', curve,
'--sha256',
'--valid-from', '2018/01/01',
'--lifetime', '365',
'--' + certFormat + '-cert',
'--' + fromKeyFormat + '-key',
'--out', fromProvDataFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
for toKeyFormat in allKeyFormats:
# Invoke weave convert-provisioning-data command
toProvDataFile = OutFileArg('to-prov-data.csv')
(res, stdout, stderr) = self.runCommand(args.weaveTool,
args=[
'convert-provisioning-data',
'--' + toKeyFormat + '-key',
fromProvDataFile,
toProvDataFile
])
# Check for errors or unexpected output.
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stdout')
self.assertEqual(stderr, '', 'Text in stderr')
# Verify the contents of the generated provisioning data file
self.verifyProvisioningDataFile(toProvDataFile.name, deviceCount, curve, hash, certFormat, toKeyFormat)
class TEST08_PrintSig(WeaveToolTestCase):
def findField(self, fieldNamePattern, inStr, searchStart=0, searchEnd=None):
if searchEnd == None:
searchEnd = len(inStr)
fieldHeaderRE = re.compile(r'''^(\s+)''' + fieldNamePattern + r''':\s*''', re.MULTILINE)
fieldHeader = fieldHeaderRE.search(inStr, searchStart, searchEnd)
if fieldHeader:
fieldIndent = str(fieldHeader.group(1))
fieldEndRE = re.compile(r'''^''' + fieldIndent + r'''[^\s]''', re.MULTILINE)
fieldEnd = fieldEndRE.search(inStr, fieldHeader.end())
if fieldEnd:
return (fieldHeader.start(), fieldEnd.start())
else:
return (fieldHeader.start(), searchEnd)
else:
return (None, None)
def extractRelatedCerts(self, output):
(relatedCertsStart, relatedCertsEnd) = self.findField('Related Certificates', output)
if relatedCertsStart != None:
remainingOutput = output[:relatedCertsStart] + output[relatedCertsEnd:]
relatedCerts = []
nextSearchStart = relatedCertsStart
while True:
(certStart, certEnd) = self.findField('Certificate \d+', output, nextSearchStart, relatedCertsEnd)
if certStart != None:
relatedCerts.append(output[certStart:certEnd])
nextSearchStart = certEnd
else:
break
else:
remainingOutput = output
relatedCerts = None
return (relatedCerts, remainingOutput)
def test_SigningCert(self):
'''Test the weave print-sig command on signature object that includes a set of related signing certificates'''
inFormats = [ 'tlv', 'base64' ]
for inFormat in inFormats:
if inFormat == 'base64':
inFile = InFileArg('sig.b64', base64.b64encode(weaveSig_ecdsaWithSHA1_signingCert))
else:
inFile = InFileArg('sig.weave', weaveSig_ecdsaWithSHA1_signingCert)
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'print-sig', inFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
(relatedCerts, remainingOutput) = self.extractRelatedCerts(stdout)
self.assertRegexpMatches(remainingOutput, r'''(?m)^\s+Signature\s+Algorithm:\s+ECDSAWithSHA1\s+\(implicit\)\s*$''')
self.assertRegexpMatches(remainingOutput, r'''(?m)^\s+r:\s+00 C6 8B 71 90 4F 96 D7 1C ED E7 C9 42 B6 D9 50\s+63 E3 D4 3E F7 0D 66 9F FA C7 0F 29 DD\s*$''')
self.assertRegexpMatches(remainingOutput, r'''(?m)^\s+s:\s+00 8B 38 72 23 4F 29 EE B9 87 BA 87 67 2C 9B 4F\s+FF 47 AC C1 4A 54 E5 0B A5 4C 05 E5 D9\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Subject:\s+WeaveSoftwarePublisherId=18B43003EE000001''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Issuer:\s+WeaveCAId=18B430EEEE000004''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Subject\s+Key\s+Id:\s+4726DD889EFBE8BF\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Authority\s+Key\s+Id:\s+4C8E97192EBCF8ED\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Not\s+Before:\s+2013/10/11\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Not\s+After:\s+2023/07/15\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Key\s+Usage:\s+DigitalSignature\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Key\s+Purpose:\s+CodeSigning\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Public\s+Key\s+Algorithm:\s+ECPublicKey\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Signature\s+Algorithm:\s+ECDSAWithSHA1\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Curve\s+Id:\s+secp224r1\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+Public\s+Key:\s+04 C8 6C 57 99 6F ED 75 9C 2A 40 50 43 74 AE AB\s*57 42 6E 59 18 4C 33 85 B8 90 4B 5E 35 A5 46 FA\s*96 04 7A D6 E9 4D 59 3F C5 03 86 47 C1 93 88 73\s*B9 CD 4C C6 06 A3 91 A7 19\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+r:\s+23 ED 40 10 35 91 84 7F AA 12 E5 BD 9F FC F2 F9\s*02 16 8F DA 07 AC 99 4B 83 BA 71 E9\s*$''')
self.assertRegexpMatches(relatedCerts[0], r'''(?m)^\s+s:\s+00 A9 C8 EA AF BD 4F 1B F1 28 0B 4A E3 4F C8 CA\s*FA D1 30 D3 B7 0B 27 CF DD E7 DB 33 BA\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Subject:\s+WeaveCAId=18B430EEEE000004''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Issuer:\s+WeaveCAId=18B430EEEE000001''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Subject\s+Key\s+Id:\s+4C8E97192EBCF8ED\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Authority\s+Key\s+Id:\s+4AAA7BA47A614B2D\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Not\s+Before:\s+2013/10/11\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Not\s+After:\s+2043/10/16\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Is\s+CA:\s+true\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Key\s+Usage:\s+KeyCertSign\s+CRLSign\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Public\s+Key\s+Algorithm:\s+ECPublicKey\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Signature\s+Algorithm:\s+ECDSAWithSHA1\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Curve\s+Id:\s+secp224r1\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+Public\s+Key:\s+04 37 6E 80 C6 28 1A 00 55 27 C9 9F 50 86 AB 71\s*7A 99 6C DD DB 95 42 C2 24 37 7C 76 9B 81 A9 F0\s*AE 30 4E 10 62 E7 58 1C 73 D2 8E 67 AC 41 B5 E4\s*3D 19 06 50 58 87 01 55 CC\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+r:\s+00 AB 31 C0 C7 E4 E6 16 D6 67 B4 D5 77 EC 67 04\s*C6 DE 28 05 4B F5 C9 2A 54 ED 7A DB C0\s*$''')
self.assertRegexpMatches(relatedCerts[1], r'''(?m)^\s+s:\s+00 F0 B8 30 73 00 C0 DD DF 93 45 B5 EC 4D 1A 78\s*5A ED A2 F1 20 72 C2 7C 1A B7 CD 0C 00\s*$''')
def test_CertRef(self):
'''Test the weave print-sig command on signature object that includes a certificate reference'''
inFormats = [ 'tlv', 'base64' ]
for inFormat in inFormats:
if inFormat == 'base64':
inFile = InFileArg('sig.b64', base64.b64encode(weaveSig_ecdsaWithSHA256_certRef))
else:
inFile = InFileArg('sig.weave', weaveSig_ecdsaWithSHA256_certRef)
(res, stdout, stderr) = self.runCommand(args.weaveTool, [ 'print-sig', inFile ])
self.assertEqual(res, 0, 'Command returned %d' % res)
self.assertEqual(stderr, '', 'Text in stderr')
(relatedCerts, remainingOutput) = self.extractRelatedCerts(stdout)
self.assertRegexpMatches(remainingOutput, r'''(?m)^\s+Signature\s+Algorithm:\s+ECDSAWithSHA256\s*$''')
self.assertRegexpMatches(remainingOutput, r'''(?m)^\s+r:\s+4D 0F C7 61 00 34 BF 6D 0F D1 B8 2B CD 8C 79 25\s*07 8A 1A 2A 8B D9 E1 A8 9C 5A D0 9C\s*$''')
self.assertRegexpMatches(remainingOutput, r'''(?m)^\s+s:\s+2D 81 E4 D9 4E 76 69 89 7F FE 79 9E F5 52 14 61\s*9E 32 9F 46 9F FC 6C 7F A2 A5 86 4C\s*$''')
if __name__ == '__main__':
argParser = argparse.ArgumentParser(description='Script for testing the weave tool')
argParser.add_argument('--run', required=False, dest='runTests', help='List of tests to be run, separated by commas. Defaults to all.')
argParser.add_argument('--weave-root', required=False, dest='weaveRoot', help='Path to the root of the weave source tree')
argParser.add_argument('--weave-tool', required=False, dest='weaveTool', help='Path to the weave tool executable to be tested')
argParser.add_argument('--openssl', required=False, dest='opensslTool', help='Path to the openssl executable.')
args = argParser.parse_args()
# Locate the root of the Weave source tree.
# If specified on the command line, use that.
# If not, look for the env var 'abs_top_srcdir', passed by the Weave test-apps
# Makefile.
# Finally, infer the location of the source directory based on the location
# of the TestWeaveTool script.
if not args.weaveRoot:
args.weaveRoot = os.environ.get('abs_top_srcdir', None)
if not args.weaveRoot:
scriptDir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
args.weaveRoot = os.path.abspath(os.path.join(scriptDir, '../..'))
# Locate the target-specific Weave build directory.
buildDir = os.environ.get('abs_top_builddir', None)
if not buildDir:
configGuessTool = os.path.join(args.weaveRoot, './third_party/nlbuild-autotools/repo/third_party/autoconf/config.guess')
targetDirName = subprocess.check_output([ configGuessTool ]).strip()
buildDir = os.path.join(args.weaveRoot, 'build', targetDirName)
# Locate the weave tool executable. If not specified on the command line,
# look for it in the target-specific Weave build directory.
if not args.weaveTool:
args.weaveTool = os.path.join(buildDir, 'src/tools/weave/weave')
if not os.path.isfile(args.weaveTool) or not os.access(args.weaveTool, os.X_OK):
sys.stderr.write('ERROR: weave tool not found\n')
sys.stderr.write('Expected location: %s\n' % args.weaveTool)
sys.exit(-1)
# If not specified on the command line, locate the openssl tool in the user's PATH.
if not args.opensslTool:
args.opensslTool = next(
(fileName for fileName in
(os.path.join(path, 'openssl') for path in os.environ["PATH"].split(os.pathsep))
if os.path.isfile(fileName) and os.access(fileName, os.X_OK)),
None)
if not args.opensslTool or not os.path.isfile(args.opensslTool) or not os.access(args.opensslTool, os.X_OK):
sys.stderr.write('ERROR: openssl tool not found\n')
sys.exit(-1)
# Import the Verhoeff module from the Weave source tree
verhoeffDir = os.path.join(args.weaveRoot, 'src/lib/support/verhoeff')
if os.path.isdir(verhoeffDir):
sys.path.append(verhoeffDir)
import Verhoeff
print 'TestWeaveTool'
tests = unittest.defaultTestLoader.loadTestsFromModule(sys.modules[__name__])
testResult = WeaveToolTestResult()
testResult.failfast = True
testResult.startTestRun()
try:
tests(testResult)
finally:
testResult.stopTestRun()
for test, error in testResult.errors:
sys.stderr.write('TEST ERROR: %s\n%s' % (test, error))
for test, error in testResult.failures:
sys.stderr.write('TEST FAILED: %s (%s)\n%s' % (test.id(), test.shortDescription(), error))
sys.exit(0 if testResult.wasSuccessful() else 1)