blob: 0e96b53e0ca2b461aa890b64182cbf321ecb4e84 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019 Google LLC.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# @file
# This file contains definitions for working with data encoded in Weave TLV format.
#
import sys
import struct
from collections import Mapping, Sequence, OrderedDict
TLVType_SignedInteger = 0x00
TLVType_UnsignedInteger = 0x04
TLVType_Boolean = 0x08
TLVType_FloatingPointNumber = 0x0A
TLVType_UTF8String = 0x0C
TLVType_ByteString = 0x10
TLVType_Null = 0x14
TLVType_Structure = 0x15
TLVType_Array = 0x16
TLVType_Path = 0x17
TLVTagControl_Anonymous = 0x00
TLVTagControl_ContextSpecific = 0x20
TLVTagControl_CommonProfile_2Bytes = 0x40
TLVTagControl_CommonProfile_4Bytes = 0x60
TLVTagControl_ImplicitProfile_2Bytes = 0x80
TLVTagControl_ImplicitProfile_4Bytes = 0xA0
TLVTagControl_FullyQualified_6Bytes = 0xC0
TLVTagControl_FullyQualified_8Bytes = 0xE0
TLVBoolean_False = TLVType_Boolean
TLVBoolean_True = TLVType_Boolean + 1
TLVEndOfContainer = 0x18
INT8_MIN = -128
INT16_MIN = -32768
INT32_MIN = -2147483648
INT64_MIN = -9223372036854775808
INT8_MAX = 127
INT16_MAX = 32767
INT32_MAX = 2147483647
INT64_MAX = 9223372036854775807
UINT8_MAX = 255
UINT16_MAX = 65535
UINT32_MAX = 4294967295
UINT64_MAX = 18446744073709551615
class TLVWriter(object):
def __init__(self, encoding=bytearray(), implicitProfile=None):
self._encoding = encoding
self._implicitProfile = implicitProfile
self._containerStack = []
def __del__(self):
del self._encoding [:]
self._implicitProfile = None
self._containerStack = None
@property
def encoding(self):
'''The object into which encoded TLV data is written.
By default this is a bytearray object.
'''
return self._encoding
@encoding.setter
def encoding(self, val):
self._encoding = val
@property
def implicitProfile(self):
'''The Weave profile id used when encoding implicit profile tags.
Setting this value will result in an implicit profile tag being encoded
whenever the profile of the tag to be encoded matches the specified implicit
profile id.
Setting this value to None (the default) disabled encoding of implicit
profile tags.
'''
return self._implicitProfile
@implicitProfile.setter
def implicitProfile(self, val):
self._implicitProfile = val
def put(self, tag, val):
'''Write a value in TLV format with the specified TLV tag.
val can be a Python object which will be encoded as follows:
- Python bools, floats and strings are encoded as their respective TLV types.
- Python ints are encoded as unsigned TLV integers if zero or positive; signed TLV
integers if negative.
- None is encoded as a TLV Null.
- bytes and bytearray objects are encoded as TVL byte strings.
- Mapping-like objects (e.g. dict) are encoded as TLV structures. The keys of the
map object are expected to be tag values, as described below for the tag argument.
Map values are encoded recursively, using the same rules as defined for the val
argument. The encoding order of elements depends on the type of the map object.
Elements within a dict are automatically encoded tag numerical order. Elements
within other forms of mapping object (e.g. OrderedDict) are encoded in the
object's natural iteration order.
- Sequence-like objects (e.g. arrays) are written as TLV arrays. Elements within
the array are encoded recursively, using the same rules as defined for the val
argument.
tag can be a small int (0-255), a tuple of two integers, or None.
If tag is an integer, it is encoded as a TLV context-specific tag.
If tag is a two-integer tuple, it is encoded as a TLV profile-specific tag, with
the first integer encoded as the profile id and the second as the tag number.
If tag is None, it is encoded as a TLV anonymous tag.
'''
if val == None:
self.putNull(tag)
elif isinstance(val, bool):
self.putBool(tag, val)
elif isinstance(val, int):
if val < 0:
self.putSignedInt(tag, val)
else:
self.putUnsignedInt(tag, val)
elif isinstance(val, float):
self.putFloat(tag, val)
elif isinstance(val, str):
self.putString(tag, val)
elif isinstance(val, bytes) or isinstance(val, bytearray):
self.putBytes(tag, val)
elif isinstance(val, Mapping):
self.startStructure(tag)
if type(val) == dict:
val = OrderedDict(sorted(val.items(), key=lambda item: tlvTagToSortKey(item[0])))
for containedTag, containedVal in val.items():
self.put(containedTag, containedVal)
self.endContainer()
elif isinstance(val, Sequence):
self.startArray(tag)
for containedVal in val:
self.put(None, containedVal)
self.endContainer()
else:
raise ValueError('Attempt to TLV encode unsupported value')
def putSignedInt(self, tag, val):
'''Write a value as a TLV signed integer with the specified TLV tag.'''
if val >= INT8_MIN and val <= INT8_MAX:
format = '<b'
elif val >= INT16_MIN and val <= INT16_MAX:
format = '<h'
elif val >= INT32_MIN and val <= INT32_MAX:
format = '<l'
elif val >= INT64_MIN and val <= INT64_MAX:
format = '<q'
else:
raise ValueError('Integer value out of range')
val = struct.pack(format, val)
controlAndTag = self._encodeControlAndTag(TLVType_SignedInteger, tag, lenOfLenOrVal=len(val))
self._encoding.extend(controlAndTag)
self._encoding.extend(val)
def putUnsignedInt(self, tag, val):
'''Write a value as a TLV unsigned integer with the specified TLV tag.'''
val = self._encodeUnsignedInt(val)
controlAndTag = self._encodeControlAndTag(TLVType_UnsignedInteger, tag, lenOfLenOrVal=len(val))
self._encoding.extend(controlAndTag)
self._encoding.extend(val)
def putFloat(self, tag, val):
'''Write a value as a TLV float with the specified TLV tag.'''
val = struct.pack('d', val)
controlAndTag = self._encodeControlAndTag(TLVType_FloatingPointNumber, tag, lenOfLenOrVal=len(val))
self._encoding.extend(controlAndTag)
self._encoding.extend(val)
def putString(self, tag, val):
'''Write a value as a TLV string with the specified TLV tag.'''
val = val.encode('utf-8')
valLen = self._encodeUnsignedInt(len(val))
controlAndTag = self._encodeControlAndTag(TLVType_UTF8String, tag, lenOfLenOrVal=len(valLen))
self._encoding.extend(controlAndTag)
self._encoding.extend(valLen)
self._encoding.extend(val)
def putBytes(self, tag, val):
'''Write a value as a TLV byte string with the specified TLV tag.'''
valLen = self._encodeUnsignedInt(len(val))
controlAndTag = self._encodeControlAndTag(TLVType_ByteString, tag, lenOfLenOrVal=len(valLen))
self._encoding.extend(controlAndTag)
self._encoding.extend(valLen)
self._encoding.extend(val)
def putBool(self, tag, val):
'''Write a value as a TLV boolean with the specified TLV tag.'''
if val:
type = TLVBoolean_True
else:
type = TLVBoolean_False
controlAndTag = self._encodeControlAndTag(type, tag)
self._encoding.extend(controlAndTag)
def putNull(self, tag):
'''Write a TLV null with the specified TLV tag.'''
controlAndTag = self._encodeControlAndTag(TLVType_Null, tag)
self._encoding.extend(controlAndTag)
def startContainer(self, tag, containerType):
'''Start writing a TLV container with the specified TLV tag.
containerType can be one of TLVType_Structure, TLVType_Array or
TLVType_Path.
'''
self._verifyValidContainerType(containerType)
controlAndTag = self._encodeControlAndTag(containerType, tag)
self._encoding.extend(controlAndTag)
self._containerStack.insert(0, containerType)
def startStructure(self, tag):
'''Start writing a TLV structure with the specified TLV tag.'''
self.startContainer(tag, containerType=TLVType_Structure)
def startArray(self, tag):
'''Start writing a TLV array with the specified TLV tag.'''
self.startContainer(tag, containerType=TLVType_Array)
def startPath(self, tag):
'''Start writing a TLV path with the specified TLV tag.'''
self.startContainer(tag, containerType=TLVType_Path)
def endContainer(self):
'''End writing the current TLV container.'''
self._containerStack.pop(0)
controlAndTag = self._encodeControlAndTag(TLVEndOfContainer, None)
self._encoding.extend(controlAndTag)
def _encodeControlAndTag(self, type, tag, lenOfLenOrVal=0):
controlByte = type
if lenOfLenOrVal == 2:
controlByte |= 1
elif lenOfLenOrVal == 4:
controlByte |= 2
elif lenOfLenOrVal == 8:
controlByte |= 3
if tag == None:
if type != TLVEndOfContainer and len(self._containerStack) != 0 and self._containerStack[0] == TLVType_Structure:
raise ValueError('Attempt to encode anonymous tag within TLV structure')
controlByte |= TLVTagControl_Anonymous
return struct.pack('<B', controlByte)
if isinstance(tag, int):
if tag < 0 or tag > UINT8_MAX:
ValueError('Context-specific TLV tag number out of range')
if len(self._containerStack) == 0:
raise ValueError('Attempt to encode context-specific TLV tag at top level')
if self._containerStack[0] == TLVType_Array:
raise ValueError('Attempt to encode context-specific tag within TLV array')
controlByte |= TLVTagControl_ContextSpecific
return struct.pack('<BB', controlByte, tag)
if isinstance(tag, tuple):
(profile, tagNum) = tag
if not isinstance(tagNum, int):
raise ValueError('Invalid object given for TLV tag')
if tagNum < 0 or tagNum > UINT32_MAX:
raise ValueError('TLV tag number out of range')
if profile != None:
if not isinstance(profile, int):
raise ValueError('Invalid object given for TLV profile id')
if profile < 0 or profile > UINT32_MAX:
raise ValueError('TLV profile id value out of range')
if len(self._containerStack) != 0 and self._containerStack[0] == TLVType_Array:
raise ValueError('Attempt to encode profile-specific tag within TLV array')
if profile == None or profile == self._implicitProfile:
if tagNum <= UINT16_MAX:
controlByte |= TLVTagControl_ImplicitProfile_2Bytes
return struct.pack('<BH', controlByte, tagNum)
else:
controlByte |= TLVTagControl_ImplicitProfile_4Bytes
return struct.pack('<BL', controlByte, tagNum)
elif profile == 0:
if tagNum <= UINT16_MAX:
controlByte |= TLVTagControl_CommonProfile_2Bytes
return struct.pack('<BH', controlByte, tagNum)
else:
controlByte |= TLVTagControl_CommonProfile_4Bytes
return struct.pack('<BL', controlByte, tagNum)
else:
if tagNum <= UINT16_MAX:
controlByte |= TLVTagControl_FullyQualified_6Bytes
return struct.pack('<BLH', controlByte, profile, tagNum)
else:
controlByte |= TLVTagControl_FullyQualified_8Bytes
return struct.pack('<BLL', controlByte, profile, tagNum)
raise ValueError('Invalid object given for TLV tag')
@staticmethod
def _encodeUnsignedInt(val):
if val < 0:
raise ValueError('Integer value out of range')
if val <= UINT8_MAX:
format = '<B'
elif val <= UINT16_MAX:
format = '<H'
elif val <= UINT32_MAX:
format = '<L'
elif val <= UINT64_MAX:
format = '<Q'
else:
raise ValueError('Integer value out of range')
return struct.pack(format, val)
@staticmethod
def _verifyValidContainerType(containerType):
if containerType != TLVType_Structure and containerType != TLVType_Array and containerType != TLVType_Path:
raise ValueError('Invalid TLV container type')
def tlvTagToSortKey(tag):
if tag == None:
return -1
if isinstance(tag, int):
majorOrder = 0;
elif isinstance(tag, tuple):
(profileId, tag) = tag
if profileId == None:
majorOrder = 1
else:
majorOrder = profileId + 2
else:
raise ValueError('Invalid TLV tag')
return (majorOrder << 32) + tag
if __name__ == '__main__':
val = dict([
(1, 0),
(2, 65536),
(3, True),
(4, None),
(5, "Hello!"),
(6, bytearray([ 0xDE, 0xAD, 0xBE, 0xEF ])),
(7, [
"Goodbye!",
71024724507,
False
]),
((0x235A0000, 42), "FOO"),
((None, 42), "BAR"),
])
writer = TLVWriter()
encodedVal = writer.put(None, val)
sys.stdout.write(writer.encoding)