blob: aeb9e2833595c8f00908bae2b859fe821e0ac746 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019 Google LLC.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# @file
# Tool for factory provisioning of devices running OpenWeave firmware.
#
import os
import sys
import re
import datetime
import binascii
import base64
import struct
import tempfile
import subprocess
import urllib.request
import ssl
import argparse
import functools
import WeaveTLV
import hashlib
import csv
import plistlib
from functools import reduce
_verbosity = 0
class _UsageError(Exception):
pass
def _applyDefault(val, defaultVal):
return val if val is not None else defaultVal
def _prefixLines(str, prefix):
return re.sub(r'.*', prefix + '\g<0>', str)
def _hexDumpFile(file, prefix):
any(map(print, (prefix + (' '.join("0x%02X," % b for b in c)) for c in iter(lambda: file.read(16), b""))))
def _parseIntArg(val, min=None, max=None, argDesc='integer argument value', base=0):
try:
res = int(val, base=base)
if min != None and res < min:
res = None
elif max != None and res > max:
res = None
except ValueError:
res = None
if res == None:
raise _UsageError('Invalid %s: %s' % (argDesc, val))
return res
def _readDataOrFileArg(argName, argValue, isBase64Funct=None):
if os.path.exists(argValue):
try:
with open(argValue, mode='rb') as f:
argValue = f.read()
if isBase64Funct != None and isBase64Funct(argValue):
argValue = base64.b64decode(argValue, validate=True)
except binascii.Error as ex:
raise _UsageError('Contents of %s file invalid: %s\nFile name: %s' % (argName, str(ex), deviceCert))
except IOError as ex:
raise _UsageError('Unable to read %s file: %s\nFile name: %s' % (argName, str(ex), deviceCert))
else:
try:
argValue = base64.b64decode(deviceCert, validate=True)
except binascii.Error as ex:
raise _UsageError('Invalid value specified for %s: %s' % (argName, str(ex)))
return argValue
class _WipedNamedTemporaryFile(object):
def __init__(self, mode='w+b', suffix="", prefix=tempfile.template, dir=None):
if dir is None and os.path.isdir('/dev/shm'):
dir = '/dev/shm'
self.file = tempfile.NamedTemporaryFile(mode=mode, suffix=suffix, prefix=prefix, dir=dir, delete=True)
def __getattr__(self, name):
file = self.__dict__['file']
return getattr(file, name)
def __enter__(self):
self.file.__enter__()
return self
def close(self):
self._wipe()
self.file.close()
def __del__(self):
self._wipe()
self.file.__del__()
def __exit__(self, exc, value, tb):
self._wipe()
return self.file.__exit__(exc, value, tb)
def _wipe(self):
if os.path.isfile(self.file.name):
fileLen = os.stat(self.file.name).st_size
with open(self.file.name, mode='wb') as f:
lenWiped = 0
while lenWiped < fileLen:
f.write(b'\0' * 16)
lenWiped += 16
f.truncate()
def isBase64WeaveCert(s):
return s.startswith(b"1QAABAAB")
def isBase64WeavePrivateKey(s):
return s.startswith(b"1QAABAAC") or s.startswith(b"1QAABAAD")
def encodeESP32FirmwareImage(segments, entryPoint=0, flashMode=0, flashFreq=0, flashSize=0):
'''Constructs an ESP32 firmware image.
This format is used by the ESP32 ROM bootloader for flashing firmware image
and for loading code and data into RAM.
The image format is described here: https://github.com/espressif/esptool/wiki/Firmware-Image-Format.
'''
ESP_IMAGE_MAGIC = 0xe9
ESP_CHECKSUM_MAGIC = 0xef
# Encode the image header
# (magic, # segments, flash mode, flash size/freq, entry point)
encodedImage = struct.pack('<BBBBI', ESP_IMAGE_MAGIC, len(segments), flashMode, flashSize<<4|flashFreq, entryPoint)
# Encode all zeros for the ESP32 extended header
encodedImage += b'\0' * 16
# Encode each of the segments, with their offsets and sizes.
for (segmentOffset, segmentData) in segments:
encodedImage += struct.pack('<II', segmentOffset, len(segmentData))
encodedImage += segmentData
# Compute a checksum over all the segment data.
checksumData = (b for (segmentOffset, segmentData) in segments for b in segmentData)
checksum = reduce(lambda a,b: a^b, checksumData, ESP_CHECKSUM_MAGIC)
# Encode trailing padding and checksum byte
encodedImage += b'\0' * (15 - (len(encodedImage) % 16))
encodedImage += struct.pack('B', checksum)
return encodedImage
class ProvisioningData(object):
'''Container for factory provisioning data
Supports reading provisioning data from command line arguments, a provisioning data
CSV file and/or an HTTP-based provisioning server.
Provides method for encoding provisioning data in a self-identifying format that
can be detected and consumed by firmware running on the device.
'''
_marker = '^OW-PROV-DATA^'
_tag_SerialNumber = 0 # Serial Number (string)
_tag_DeviceId = 1 # Manufacturer-assigned Device Id (unsigned int)
_tag_DeviceCert = 2 # Manufacturer-assigned Device Cert (byte string)
_tag_DevicePrivateKey = 3 # Manufacturer-assigned Device Key (byte string)
_tag_PairingCode = 4 # Pairing Code (string)
_tag_ProductRev = 5 # Product Revision (unsigned int)
_tag_MfgDate = 6 # Manufacturing Date (string)
def __init__(self):
self._fields = { }
@property
def serialNum(self):
return self._fields.get(ProvisioningData._tag_SerialNumber, None)
@serialNum.setter
def serialNum(self, value):
if value != None and not isinstance(value, str):
raise ValueError('Invalid value specified for serial number')
self._setField(ProvisioningData._tag_SerialNumber, value)
@property
def deviceId(self):
return self._fields.get(ProvisioningData._tag_DeviceId, None)
@deviceId.setter
def deviceId(self, value):
if value != None and (not isinstance(value, int) or value < 0 or value > 0xFFFFFFFFFFFFFFFF):
raise ValueError('Invalid value specified for manufacturer-assigned device id')
self._setField(ProvisioningData._tag_DeviceId, value)
@property
def deviceCert(self):
return self._fields.get(ProvisioningData._tag_DeviceCert, None)
@deviceCert.setter
def deviceCert(self, value):
if value != None and not isinstance(value, bytes) and not isinstance(value, bytearray):
raise ValueError('Invalid value specified for manufacturer-assigned device certificate')
self._setField(ProvisioningData._tag_DeviceCert, value)
@property
def devicePrivateKey(self):
return self._fields.get(ProvisioningData._tag_DevicePrivateKey, None)
@devicePrivateKey.setter
def devicePrivateKey(self, value):
if value != None and not isinstance(value, bytes) and not isinstance(value, bytearray):
raise ValueError('Invalid value specified for manufacturer-assigned device private key')
self._setField(ProvisioningData._tag_DevicePrivateKey, value)
@property
def pairingCode(self):
return self._fields.get(ProvisioningData._tag_PairingCode, None)
@pairingCode.setter
def pairingCode(self, value):
if value != None and not isinstance(value, str):
raise ValueError('Invalid value specified for pairing code')
self._setField(ProvisioningData._tag_PairingCode, value)
@property
def productRev(self):
return self._fields.get(ProvisioningData._tag_ProductRev, None)
@productRev.setter
def productRev(self, value):
if value != None and (not isinstance(value, int) or value < 0 or value > 0xFFFF):
raise ValueError('Invalid value specified for product revision')
self._setField(ProvisioningData._tag_ProductRev, value)
@property
def mfgDate(self):
return self._fields.get(ProvisioningData._tag_MfgDate, None)
@mfgDate.setter
def mfgDate(self, value):
if value != None and not isinstance(value, str):
raise ValueError('Invalid value specified for manufacturing date')
try:
datetime.datetime.strptime(value, '%Y/%m/%d')
except ValueError:
raise ValueError('Invalid value specified for manufacturing date: %s' % value)
self._setField(ProvisioningData._tag_MfgDate, value)
def encode(self):
encodedProvData = bytearray()
# Pre-encode the data fields in Weave TLV format
tlvWriter = WeaveTLV.TLVWriter()
tlvWriter.put(None, self._fields)
encodedDataFields = tlvWriter.encoding
# Encode the provData marker
encodedProvData.extend(ProvisioningData._marker.encode('utf-8'))
# Encode the length of the data fields in 32-bit little-endian format.
encodedProvData.extend(struct.pack('<L', len(encodedDataFields)))
# Append the encoded data fields.
encodedProvData.extend(encodedDataFields)
# Compute a SHA256 hash of the marker, length and data fields and append
# it to the encoded provisioning data.
encodedProvData.extend(hashlib.sha256(encodedProvData).digest())
return encodedProvData
def __str__(self):
return self.encode()
@classmethod
def addArguments(cls, argParser):
argParser.add_argument('--serial-num', metavar='<string>', help='Set the device serial number.')
argParser.add_argument('--device-id', metavar='<hex-digits>', help='Set the manufacturer-assigned device id.',
type=functools.partial(_parseIntArg, min=0, max=0xFFFFFFFFFFFFFFFF, argDesc='device id', base=16))
argParser.add_argument('--device-cert', metavar='<base-64>|<file-name>', help='Set the manufacturer-assigned Weave device certificate.')
argParser.add_argument('--device-key', metavar='<base-64>|<file-name>', help='Set the manufacturer-assigned Weave device private key.')
argParser.add_argument('--pairing-code', metavar='<string>', help='Set the device pairing code.')
argParser.add_argument('--product-rev', metavar='<int>', help='Set the product revision for the device.',
type=functools.partial(_parseIntArg, min=0, max=0xFFFF, argDesc='product revision'))
argParser.add_argument('--mfg-date', metavar='<YYYY/MM/DD>|today|now',
help='Set the device\'s manufacturing date.')
def configFromArguments(self, argValues):
self.serialNum = argValues.serial_num
self.deviceId = argValues.device_id
self.pairingCode = argValues.pairing_code
self.productRev = argValues.product_rev
mfgDate = argValues.mfg_date
if mfgDate != None:
mfgDate = mfgDate.lower()
if mfgDate == 'today' or mfgDate == 'now':
mfgDate = datetime.datetime.now().strftime('%Y/%m/%d')
else:
mfgDate = mfgDate.replace('-', '/')
mfgDate = mfgDate.replace('.', '/')
try:
self.mfgDate = mfgDate
except ValueError as ex:
raise _UsageError(str(ex))
# Accept device certificate as base-64 string or name of file
# containing raw or base-64 encoded TLV.
deviceCert = argValues.device_cert
if deviceCert != None:
self.deviceCert = _readDataOrFileArg('device certificate', deviceCert, isBase64WeaveCert)
# Accept device private key as base-64 string or name of file
# containing raw or base-64 encoded TLV.
devicePrivateKey = argValues.device_key
if devicePrivateKey != None:
self.devicePrivateKey = _readDataOrFileArg('device private key', devicePrivateKey, isBase64WeaveCert)
def configFromProvisioningCSVFile(self, deviceId, fileName):
deviceIdStr = "%016X" % deviceId
with open(fileName) as file:
reader = csv.DictReader(file, dialect=csv.unix_dialect, skipinitialspace=True)
for row in reader:
curDeviceId = row.get('Device_Id', None)
if curDeviceId == None:
curDeviceId = row.get('MAC', None)
if curDeviceId == None:
raise Exception('"Device_Id" column not found in provisioning csv file')
if curDeviceId == deviceIdStr:
try:
self.deviceId = deviceId
self._configFromProvisioningCSVRow(row)
except binascii.Error as ex:
raise binascii.Error('Invalid base-64 data in provisioning data CSV file: %s\nFile %s, line %d' % (str(ex), fileName, reader.line_num))
except ValueError as ex:
raise ValueError('Error in provisioning data CSV file: %s\nFile %s, line %d' % (str(ex), fileName, reader.line_num))
return True
return False
def configFromProvisioningServer(self, serverBaseURL, deviceId, disableServerValidation=False):
if disableServerValidation:
ssl._create_default_https_context = ssl._create_unverified_context
url = serverBaseURL + '/GetWeaveProvisioningInfo?macAddr=%016X' % deviceId
with urllib.request.urlopen(url) as resp:
respData = resp.read()
respPlist = plistlib.loads(respData, fmt=plistlib.FMT_XML)
try:
if self.deviceCert == None:
self.deviceCert = base64.b64decode(respPlist['nlWeaveCertificate'], validate=True)
if self.devicePrivateKey == None:
self.devicePrivateKey = base64.b64decode(respPlist['nlWeavePrivateKey'], validate=True)
if self.pairingCode == None:
self.pairingCode = respPlist['nlWeavePairingCode']
except binascii.Error as ex:
raise binascii.Error('Invalid base-64 data returned by provisioning server: %s' % (str(ex)))
def _configFromProvisioningCSVRow(self, row):
if self.serialNum == None:
serialNum = row.get('Serial_Num', None)
if serialNum != None:
self.serialNum = serialNum
if self.deviceCert == None:
deviceCert = row.get('Certificate', None)
if deviceCert != None:
self.deviceCert = base64.b64decode(deviceCert, validate=True)
if self.devicePrivateKey == None:
devicePrivateKey = row.get('Private_Key', None)
if devicePrivateKey == None:
devicePrivateKey = row.get('Private Key', None)
if devicePrivateKey != None:
self.devicePrivateKey = base64.b64decode(devicePrivateKey, validate=True)
if self.pairingCode == None:
pairingCode = row.get('Pairing_Code', None)
if pairingCode == None:
pairingCode = row.get('Pairing Code', None)
if pairingCode != None:
self.pairingCode = pairingCode
if self.productRev == None:
productRev = row.get('Product_Rev', None)
if productRev != None:
self.productRev = int(productRev, base=0)
if self.mfgDate == None:
mfgDate = row.get('Mfg_Date', None)
if mfgDate != None:
self.mfgDate = mfgDate
def _setField(self, tag, value):
if value != None:
self._fields[tag] = value
else:
self._fields.pop(tag, None)
class ESPSerialDeviceControler(object):
'''Supports provisioning ESP32 devices using the ESP32 serial bootloader protocol and
Espressif's esptool.py command.'''
def __init__(self, target, loadAddr=None):
self._target = target
self._esptoolCmd = 'esptool.py'
self._comPort = target.comPort
self._comSpeed = target.comSpeed
self._loadAddr = _applyDefault(loadAddr, target.defaultLoadAddr)
def provisionDevice(self, provData):
# Encode the provisioning data
encodedProvData = provData.encode()
# Verify that the load address is valid and that no part of the provisioning data will
# fall outside the valid address range for the target device.
self._target.validateLoadAddress(self._loadAddr, len(encodedProvData))
# Create an ESP32 firmware image consisting of two segments:
# - a data segment containing the provisioning data.
# - an executable segment containing a small program to reboot the device.
provImage = encodeESP32FirmwareImage(
entryPoint=ESPSerialDeviceControler._rebootProgEntryPoint,
segments=[
(ESPSerialDeviceControler._rebootProgLoadAddr, ESPSerialDeviceControler._rebootProg),
(self._loadAddr, encodedProvData)
])
print('Writing provisioning data to device');
with _WipedNamedTemporaryFile(suffix='.bin') as imageFile:
# Write the provisioning image into a temporary file
imageFile.write(provImage)
imageFile.flush()
# Construct a command to invoke the 'load_ram' operation of the esptool, passing the name of
# of the temporary file as the input.
esptoolCmd = [
self._esptoolCmd,
'--no-stub',
'--chip', 'esp32',
'--port', self._comPort,
'--baud', self._comSpeed,
'load_ram', imageFile.name
]
if _verbosity > 0:
print(' Running "%s"' % ' '.join(esptoolCmd))
if _verbosity > 1:
print(' ESP32 Image file (%s):' % imageFile.name)
imageFile.seek(0)
_hexDumpFile(imageFile, ' ')
# Invoke the esptool command and capture its output.
esptoolProc = subprocess.Popen(esptoolCmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
(esptoolOut, esptoolErr) = esptoolProc.communicate()
esptoolOut = esptoolOut.decode('utf-8')
esptoolOut = esptoolOut.strip()
if esptoolProc.returncode != 0 or _verbosity > 0:
print(' esptool command exited with %d' % esptoolProc.returncode)
print(_prefixLines(esptoolOut, ' | '))
return esptoolProc.returncode == 0
@classmethod
def addArguments(cls, argParser):
argParser.add_argument('--esptool-cmd', metavar='<path-name>', help='Path to esptool command. Defaults to \'esptool.py\'.')
argParser.add_argument('--port', metavar='<path-name>', help='COM port device name for ESP32. Defaults to /tty/USB0.')
argParser.add_argument('--speed', metavar='<int>', help='Baud rate for COM port. Defaults to 115200.')
def configFromArguments(self, argValues):
self._esptoolCmd = _applyDefault(argValues.esptool_cmd, self._esptoolCmd)
self._comPort = _applyDefault(argValues.port, self._comPort)
self._comSpeed = _applyDefault(argValues.speed, self._comSpeed)
# Small ESP32 program to initiate a reboot of the device.
_rebootProg = bytearray([
0x70, 0x80, 0xF4, 0x3F, 0xFF, 0xFF, 0xFF, 0x3F, 0xA4, 0x80, 0xF4, 0x3F, 0xA1, 0x3A, 0xD8, 0x50,
0x8C, 0x80, 0xF4, 0x3F, 0x00, 0x0C, 0x00, 0xC0, 0x90, 0x80, 0xF4, 0x3F, 0xF0, 0x49, 0x02, 0x00,
0xA0, 0x80, 0xF4, 0x3F, 0x00, 0x00, 0x00, 0x80, 0x36, 0x41, 0x00, 0x91, 0xF5, 0xFF, 0x81, 0xF5,
0xFF, 0xC0, 0x20, 0x00, 0xA8, 0x09, 0x80, 0x8A, 0x10, 0xC0, 0x20, 0x00, 0x89, 0x09, 0x91, 0xF3,
0xFF, 0x81, 0xF1, 0xFF, 0xC0, 0x20, 0x00, 0x99, 0x08, 0x91, 0xF2, 0xFF, 0x81, 0xF1, 0xFF, 0xC0,
0x20, 0x00, 0x99, 0x08, 0x91, 0xF2, 0xFF, 0x81, 0xF0, 0xFF, 0xC0, 0x20, 0x00, 0x99, 0x08, 0x91,
0xF1, 0xFF, 0x81, 0xEF, 0xFF, 0xC0, 0x20, 0x00, 0x99, 0x08, 0x06, 0xFF, 0xFF,
])
_rebootProgLoadAddr = 0x40080000
_rebootProgEntryPoint = 0x40080028
class JLinkDeviceControler(object):
'''Supports provisioning devices using a SEGGER J-Link debug probe and SEGGER's JLinkExe command.'''
def __init__(self, target, loadAddr=None):
self._target = target
self._jlinkCmd = 'JLinkExe'
self._jLinkDeviceName = target.jlinkDeviceName
self._jlinkInterface = target.jlinkInterface
self._jlinkSpeed = target.jlinkSpeed
self._jlinkSN = None
self._loadAddr = _applyDefault(loadAddr, target.defaultLoadAddr)
def provisionDevice(self, provData):
# Encode the provisioning data
encodedProvData = provData.encode()
# Verify that the load address is valid and that no part of the provisioning data will
# fall outside the valid address range for the target device.
self._target.validateLoadAddress(self._loadAddr, len(encodedProvData))
print('Writing provisioning data to device');
with tempfile.NamedTemporaryFile(mode='w+') as scriptFile:
with _WipedNamedTemporaryFile(suffix='.bin') as provDataFile:
# Write the provisioning data set into a temporary file
provDataFile.write(provData.encode())
provDataFile.flush()
# Write a command script containing the commands:
# r
# loadfile <data-set-file> <data-set-address>
# go
# q
scriptFile.write('r\nloadfile %s 0x%08X\ngo\nq\n' % (provDataFile.name, self._loadAddr))
scriptFile.flush()
jlinkCmd = self._FormJLinkCommand(scriptFile.name)
if _verbosity > 0:
print(' Running "%s"' % ' '.join(jlinkCmd))
if _verbosity > 1:
print(' Script file (%s):' % scriptFile.name)
scriptFile.seek(0)
print(_prefixLines(scriptFile.read().strip(), ' | '))
print(' Provisioning data file (%s):' % provDataFile.name)
provDataFile.seek(0)
_hexDumpFile(provDataFile, ' ')
# Run the jlink command and collect its output.
jlinkProc = subprocess.Popen(jlinkCmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
(jlinkOut, jlinkErr) = jlinkProc.communicate()
jlinkOut = jlinkOut.decode('utf-8')
jlinkOut = jlinkOut.strip()
if jlinkProc.returncode != 0 or _verbosity > 0:
print(' J-Link command exited with %d' % jlinkProc.returncode)
print(_prefixLines(jlinkOut, ' | '))
return jlinkProc.returncode == 0
def _FormJLinkCommand(self, scriptFileName):
cmd = [
self._jlinkCmd,
'-Device', self._jLinkDeviceName,
'-If', self._jlinkInterface,
'-Speed', self._jlinkSpeed,
'-ExitOnError', '1',
'-AutoConnect', '1'
]
if self._jlinkSN != None:
cmd += [ '-SelectEmuBySn', self._jlinkSN ]
cmd += [ '-CommandFile', scriptFileName ]
return cmd
@classmethod
def addArguments(cls, argParser):
argParser.add_argument('--jlink-cmd', metavar='<path-name>', help='Path to JLink command. Defaults to \'JLinkExe\'.')
argParser.add_argument('--jlink-if', metavar='SWD|JTAG', help='J-Link interface type. Defaults to SWD.',
choices=[ 'SWD', 'JTAG'])
argParser.add_argument('--jlink-speed', metavar='<int>|adaptive|auto', help='J-Link interface speed.')
argParser.add_argument('--jlink-sn', metavar='<string>', help='J-Link probe serial number.')
def configFromArguments(self, argValues):
self._jlinkCmd = _applyDefault(argValues.jlink_cmd, self._jlinkCmd)
self._jlinkInterface = _applyDefault(argValues.jlink_if, self._jlinkInterface)
self._jlinkSpeed = _applyDefault(argValues.jlink_speed, self._jlinkSpeed)
self._jlinkSN = _applyDefault(argValues.jlink_sn, self._jlinkSN)
class Target(object):
'''Describes a class of target devices and the parameters needed to control or communicate with them.'''
def __init__(self, controllerClass,
jlinkDeviceName=None, jlinkInterface=None, jlinkSpeed=None,
comPort = None, comSpeed=None,
defaultLoadAddr=None, validAddrRanges=None):
self.controllerClass = controllerClass
self.jlinkDeviceName = jlinkDeviceName
self.jlinkInterface = jlinkInterface
self.jlinkSpeed = jlinkSpeed
self.comPort = comPort
self.comSpeed = comSpeed
self.defaultLoadAddr = defaultLoadAddr
self.validAddrRanges = validAddrRanges
def isValidLoadAddress(self, addr, dataLen):
if self.validAddrRanges != None:
addrEnd = addr + dataLen
for (addrRangeStart, addrRangeEnd) in self.validAddrRanges:
if addr >= addrRangeStart and addr <= addrRangeEnd and addrEnd >= addrRangeStart and addrEnd <= addrRangeEnd:
return True
return False
def validateLoadAddress(self, addr, dataLen):
if not self.isValidLoadAddress(addr, dataLen):
raise _UsageError('ERROR: Invalid provisioning data load address\nSome or all of the data would fall outside of the valid memory ranges for the target device');
Target_nrf52840 = Target(controllerClass=JLinkDeviceControler,
jlinkDeviceName='NRF52840_XXAA',
jlinkInterface='SWD',
jlinkSpeed='8000',
defaultLoadAddr=0x2003E000,
validAddrRanges=[
(0x20000000, 0x2003FFFF) # Data RAM region
])
Target_esp32 = Target(controllerClass=ESPSerialDeviceControler,
comPort='/dev/ttyUSB0',
comSpeed='115200',
defaultLoadAddr=(0x40000000 - 0x400),
validAddrRanges=[
(0x3FFAE000, 0x3FFFFFFF) # Internal SRAM 1 + Internal SRAM 2
])
Targets = {
'nrf52840' : Target_nrf52840,
'esp32' : Target_esp32
}
def main():
try:
class CustomArgumentParser(argparse.ArgumentParser):
def error(self, message):
raise _UsageError(message)
argParser = CustomArgumentParser(description='Tool for factory provisioning of devices running OpenWeave firmware.')
argParser.add_argument('--target', metavar='<string>', help='Target device type. Choices are: %s' % (', '.join(Targets.keys())))
argParser.add_argument('--load-addr', metavar='<hex-string>', help='Address in device memory at which provisioning data will be written.',
type=functools.partial(_parseIntArg, min=0, max=0xFFFFFFFF, argDesc='provisioning data address'))
argParser.add_argument('--verbose', '-v', action='count', help='Adjust output verbosity level. Use multiple arguments to increase verbosity.')
ProvisioningData.addArguments(argParser)
JLinkDeviceControler.addArguments(argParser)
ESPSerialDeviceControler.addArguments(argParser)
argParser.add_argument('--prov-csv-file', metavar='<file-name>', help='Read device provisioning data from a provisioning CSV file.')
argParser.add_argument('--prov-server', metavar='<url>', help='Read device provisioning data from a provisioning server.')
argParser.add_argument('--disable-server-validation', action='store_true', help='When using HTTPS, disable validation of the certificate presented by the provisioning server.')
argValues = argParser.parse_args()
if argValues.target == None:
raise _UsageError('Please specify a target device type using the --target argument.')
if argValues.target not in Targets:
raise _UsageError('Unrecognized target device type: %s\nValid device types are: %s' % (argValues.target, ', '.join(Targets.keys())))
if argValues.verbose != None:
global _verbosity
_verbosity = argValues.verbose
# Lookup the target definition object
target = Targets[argValues.target]
# Create a ProvisioningData object and initialize it from the command line arguments.
provData = ProvisioningData()
provData.configFromArguments(argValues)
# If a provisioning CSV file was given, search the file for an entry matching the given
# device id. Set the device certificate, private key and pairing code from the data in
# the CSV file *unless* any of these values were set on the command line.
if argValues.prov_csv_file != None:
if provData.deviceId is None:
raise _UsageError('Please specify the id for the device')
try:
print("Reading provisioning data from CSV file: %s" % argValues.prov_csv_file)
if not provData.configFromProvisioningCSVFile(provData.deviceId, argValues.prov_csv_file):
raise _UsageError('Unable to find provisioning data for device id %016X' % provData.deviceId)
except IOError as ex:
raise _UsageError('Unable to read provisioning data CSV file\n%s' % (str(ex)))
except binascii.Error as ex:
raise _UsageError(str(ex))
# If a provisioning server was given, connect to the server and request the device certificate,
# private key and pairing code. Use these value unless any of the values were set on the
# command line.
if argValues.prov_server != None:
if provData.deviceId is None:
raise _UsageError('Please specify the id for the device')
print("Fetching provisioning data from provisioning server: %s" % argValues.prov_server)
provData.configFromProvisioningServer(argValues.prov_server, provData.deviceId,
disableServerValidation=argValues.disable_server_validation)
# Instantiate the appropriate type of device controller based on the selected target.
devController = target.controllerClass(target=target, loadAddr=argValues.load_addr)
devController.configFromArguments(argValues)
# Invoke the device controller to provision the device with the given provisioning data.
if not devController.provisionDevice(provData=provData):
print('Failed to provision device.', file=sys.stderr)
return -1
return 0
except _UsageError as ex:
print(str(ex), file=sys.stderr)
return -1
if __name__ == '__main__':
sys.exit(main())