blob: 20d470e36d6e12e18f54bc2f1abcfa1f225fd00c [file] [log] [blame]
"""
Copyright (c) 2023 Nordic Semiconductor ASA
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import struct
import inspect
from typing import List
from abc import ABC, abstractmethod
from tlv.dataset_tlv import MeshcopTlvType
from tlv.tlv import TLV
class DatasetEntry(ABC):
def __init__(self, type: MeshcopTlvType):
self.type = type
self.length = None
self.maxlen = None
def print_content(self, indent: int = 0, excluded_fields: List[str] = []):
excluded_fields += ['length', 'maxlen', 'type']
indentation = " " * 4 * indent
for attr_name in dir(self):
if not attr_name.startswith('_') and attr_name not in excluded_fields:
value = getattr(self, attr_name)
if not inspect.ismethod(value):
if isinstance(value, bytes):
value = value.hex()
print(f'{indentation}{attr_name}: {value}')
@abstractmethod
def to_tlv(self) -> TLV:
pass
@abstractmethod
def set_from_tlv(self, tlv: TLV):
pass
@abstractmethod
def set(self, args: List[str]):
pass
class ActiveTimestamp(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.ACTIVETIMESTAMP)
self.length = 8 # spec defined
self.seconds = 0
self.ubit = 0
self.ticks = 0
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for ActiveTimestamp')
self._seconds = int(args[0])
def set_from_tlv(self, tlv: TLV):
(value,) = struct.unpack('>Q', tlv.value)
self.ubit = value & 0x1
self.ticks = (value >> 1) & 0x7FFF
self.seconds = (value >> 16) & 0xFFFF
def to_tlv(self):
value = (self.seconds << 16) | (self.ticks << 1) | self.ubit
tlv = struct.pack('>BBQ', self.type.value, self.length, value)
return TLV.from_bytes(tlv)
class PendingTimestamp(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.PENDINGTIMESTAMP)
self.length = 8 # spec defined
self.seconds = 0
self.ubit = 0
self.ticks = 0
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for PendingTimestamp')
self._seconds = int(args[0])
def set_from_tlv(self, tlv: TLV):
(value,) = struct.unpack('>Q', tlv.value)
self.ubit = value & 0x1
self.ticks = (value >> 1) & 0x7FFF
self.seconds = (value >> 16) & 0xFFFF
def to_tlv(self):
value = (self.seconds << 16) | (self.ticks << 1) | self.ubit
tlv = struct.pack('>BBQ', self.type.value, self.length, value)
return TLV.from_bytes(tlv)
class NetworkKey(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.NETWORKKEY)
self.length = 16 # spec defined
self.data: str = ''
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for NetworkKey')
if args[0].startswith('0x'):
args[0] = args[0][2:]
nk = args[0]
if len(nk) != self.length * 2: # need length * 2 hex characters
raise ValueError('Invalid length of NetworkKey')
self.data = nk
def set_from_tlv(self, tlv: TLV):
self.data = tlv.value.hex()
def to_tlv(self):
if len(self.data) != self.length * 2: # need length * 2 hex characters
raise ValueError('Invalid length of NetworkKey')
value = bytes.fromhex(self.data)
tlv = struct.pack('>BB', self.type.value, self.length) + value
return TLV.from_bytes(tlv)
class NetworkName(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.NETWORKNAME)
self.maxlen = 16
self.data: str = ''
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for NetworkName')
nn = args[0]
if len(nn) > self.maxlen:
raise ValueError('Invalid length of NetworkName')
self.data = nn
def set_from_tlv(self, tlv: TLV):
self.data = tlv.value.decode('utf-8')
def to_tlv(self):
length_value = len(self.data)
value = self.data.encode('utf-8')
tlv = struct.pack('>BB', self.type.value, length_value) + value
return TLV.from_bytes(tlv)
class ExtPanID(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.EXTPANID)
self.length = 8 # spec defined
self.data: str = ''
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for ExtPanID')
if args[0].startswith('0x'):
args[0] = args[0][2:]
epid = args[0]
if len(epid) != self.length * 2: # need length*2 hex characters
raise ValueError('Invalid length of ExtPanID')
self.data = epid
def set_from_tlv(self, tlv: TLV):
self.data = tlv.value.hex()
def to_tlv(self):
if len(self.data) != self.length * 2: # need length*2 hex characters
raise ValueError('Invalid length of ExtPanID')
value = bytes.fromhex(self.data)
tlv = struct.pack('>BB', self.type.value, self.length) + value
return TLV.from_bytes(tlv)
class MeshLocalPrefix(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.MESHLOCALPREFIX)
self.length = 8 # spec defined
self.data = ''
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for MeshLocalPrefix')
if args[0].startswith('0x'):
args[0] = args[0][2:]
mlp = args[0]
if len(mlp) != self.length * 2: # need length*2 hex characters
raise ValueError('Invalid length of MeshLocalPrefix')
self.data = mlp
def set_from_tlv(self, tlv: TLV):
self.data = tlv.value.hex()
def to_tlv(self):
if len(self.data) != self.length * 2: # need length*2 hex characters
raise ValueError('Invalid length of MeshLocalPrefix')
value = bytes.fromhex(self.data)
tlv = struct.pack('>BB', self.type.value, self.length) + value
return TLV.from_bytes(tlv)
class DelayTimer(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.DELAYTIMER)
self.length = 4 # spec defined
self.time_remaining = 0
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for DelayTimer')
dt = int(args[0])
self.time_remaining = dt
def set_from_tlv(self, tlv: TLV):
self.time_remaining = tlv.value
def to_tlv(self):
value = self.time_remaining
tlv = struct.pack('>BBI', self.type.value, self.length, value)
return TLV.from_bytes(tlv)
class PanID(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.PANID)
self.length = 2 # spec defined
self.data: str = ''
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for PanID')
if args[0].startswith('0x'):
args[0] = args[0][2:]
pid = args[0]
if len(pid) != self.length * 2: # need length*2 hex characters
raise ValueError('Invalid length of PanID')
self.data = pid
def set_from_tlv(self, tlv: TLV):
self.data = tlv.value.hex()
def to_tlv(self):
if len(self.data) != self.length * 2: # need length*2 hex characters
raise ValueError('Invalid length of PanID')
value = bytes.fromhex(self.data)
tlv = struct.pack('>BB', self.type.value, self.length) + value
return TLV.from_bytes(tlv)
class Channel(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.CHANNEL)
self.length = 3 # spec defined
self.channel_page = 0
self.channel = 0
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for Channel')
channel = int(args[0])
self.channel = channel
def set_from_tlv(self, tlv: TLV):
self.channel = int.from_bytes(tlv.value[1:3], byteorder='big')
self.channel_page = tlv.value[0]
def to_tlv(self):
tlv = struct.pack('>BBB', self.type.value, self.length, self.channel_page)
tlv += struct.pack('>H', self.channel)
return TLV.from_bytes(tlv)
class Pskc(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.PSKC)
self.maxlen = 16
self.data = ''
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for Pskc')
if args[0].startswith('0x'):
args[0] = args[0][2:]
pskc = args[0]
if (len(pskc) > self.maxlen * 2):
raise ValueError('Invalid length of Pskc. Can be max ' f'{self.length * 2} hex characters.')
self.data = pskc
def set_from_tlv(self, tlv: TLV):
self.data = tlv.value.hex()
def to_tlv(self):
# should not exceed max length*2 hex characters
if (len(self.data) > self.maxlen * 2):
raise ValueError('Invalid length of Pskc')
length_value = len(self.data) // 2
value = bytes.fromhex(self.data)
tlv = struct.pack('>BB', self.type.value, length_value) + value
return TLV.from_bytes(tlv)
class SecurityPolicy(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.SECURITYPOLICY)
self.length = 4 # spec defined
self.rotation_time = 0
self.out_of_band = 0 # o
self.native = 0 # n
self.routers_1_2 = 0 # r
self.external_commissioners = 0 # c
self.reserved = 0
self.commercial_commissioning_off = 0 # C
self.autonomous_enrollment_off = 0 # e
self.networkkey_provisioning_off = 0 # p
self.thread_over_ble = 0
self.non_ccm_routers_off = 0 # R
self.rsv = 0b111
self.version_threshold = 0
def set(self, args: List[str]):
if len(args) == 0:
raise ValueError('No argument for SecurityPolicy')
rotation_time, flags, version_threshold = args + [None] * (3 - len(args))
self.rotation_time = int(rotation_time) & 0xffff
if flags:
self.out_of_band = 1 if 'o' in flags else 0
self.native = 1 if 'n' in flags else 0
self.routers_1_2 = 1 if 'r' in flags else 0
self.external_commissioners = 1 if 'c' in flags else 0
self.commercial_commissioning_off = 0 if 'C' in flags else 1
self.autonomous_enrollment_off = 0 if 'e' in flags else 1
self.networkkey_provisioning_off = 0 if 'p' in flags else 1
self.non_ccm_routers_off = 0 if 'R' in flags else 1
if version_threshold:
self.version_threshold = int(version_threshold) & 0b111
def set_from_tlv(self, tlv: TLV):
value = int.from_bytes(tlv.value, byteorder='big')
self.rotation_time = (value >> 16) & 0xFFFF
self.out_of_band = (value >> 15) & 0x1
self.native = (value >> 14) & 0x1
self.routers_1_2 = (value >> 13) & 0x1
self.external_commissioners = (value >> 12) & 0x1
self.reserved = (value >> 11) & 0x1
self.commercial_commissioning_off = (value >> 10) & 0x1
self.autonomous_enrollment_off = (value >> 9) & 0x1
self.networkkey_provisioning_off = (value >> 8) & 0x1
self.thread_over_ble = (value >> 7) & 0x1
self.non_ccm_routers_off = (value >> 6) & 0x1
self.rsv = (value >> 3) & 0x7
self.version_threshold = value & 0x7
def to_tlv(self):
value = self.rotation_time << 16
value |= self.out_of_band << 15
value |= self.native << 14
value |= self.routers_1_2 << 13
value |= self.external_commissioners << 12
value |= self.reserved << 11
value |= self.commercial_commissioning_off << 10
value |= self.autonomous_enrollment_off << 9
value |= self.networkkey_provisioning_off << 8
value |= self.thread_over_ble << 7
value |= self.non_ccm_routers_off << 6
value |= self.rsv << 3
value |= self.version_threshold
tlv = struct.pack('>BBI', self.type.value, self.length, value)
return TLV.from_bytes(tlv)
def print_content(self, indent: int = 0):
flags = ''
if self.out_of_band:
flags += 'o'
if self.native:
flags += 'n'
if self.routers_1_2:
flags += 'r'
if self.external_commissioners:
flags += 'c'
if not self.commercial_commissioning_off:
flags += 'C'
if not self.autonomous_enrollment_off:
flags += 'e'
if not self.networkkey_provisioning_off:
flags += 'p'
if not self.non_ccm_routers_off:
flags += 'R'
indentation = " " * 4 * indent
print(f'{indentation}rotation_time: {self.rotation_time}')
print(f'{indentation}flags: {flags}')
print(f'{indentation}version_threshold: {self.version_threshold}')
class ChannelMask(DatasetEntry):
def __init__(self):
super().__init__(MeshcopTlvType.CHANNELMASK)
self.entries: List[ChannelMaskEntry] = []
def set(self, args: List[str]):
# to remain consistent with the OpenThread CLI API,
# provided hex string is value of the first channel mask entry
if len(args) == 0:
raise ValueError('No argument for ChannelMask')
if args[0].startswith('0x'):
args[0] = args[0][2:]
channelmsk = bytes.fromhex(args[0])
self.entries = [ChannelMaskEntry()]
self.entries[0].channel_mask = channelmsk
def print_content(self, indent: int = 0):
super().print_content(indent=indent, excluded_fields=['entries'])
indentation = " " * 4 * indent
for i, entry in enumerate(self.entries):
print(f'{indentation}ChannelMaskEntry {i}')
entry.print_content(indent=indent + 1)
def set_from_tlv(self, tlv: TLV):
self.entries = []
for mask_entry_tlv in TLV.parse_tlvs(tlv.value):
new_entry = ChannelMaskEntry()
new_entry.set_from_tlv(mask_entry_tlv)
self.entries.append(new_entry)
def to_tlv(self):
tlv_value = b''.join(mask_entry.to_tlv().to_bytes() for mask_entry in self.entries)
tlv = struct.pack('>BB', self.type.value, len(tlv_value)) + tlv_value
return TLV.from_bytes(tlv)
class ChannelMaskEntry(DatasetEntry):
def __init__(self):
self.channel_page = 0
self.channel_mask: bytes = None
def set(self, args: List[str]):
pass
def set_from_tlv(self, tlv: TLV):
self.channel_page = tlv.type
self.mask_length = len(tlv.value)
self.channel_mask = tlv.value
def to_tlv(self):
mask_len = len(self.channel_mask)
tlv = struct.pack('>BB', self.channel_page, mask_len) + self.channel_mask
return TLV.from_bytes(tlv)
def create_dataset_entry(type: MeshcopTlvType, args=None):
entry_classes = {
MeshcopTlvType.ACTIVETIMESTAMP: ActiveTimestamp,
MeshcopTlvType.PENDINGTIMESTAMP: PendingTimestamp,
MeshcopTlvType.NETWORKKEY: NetworkKey,
MeshcopTlvType.NETWORKNAME: NetworkName,
MeshcopTlvType.EXTPANID: ExtPanID,
MeshcopTlvType.MESHLOCALPREFIX: MeshLocalPrefix,
MeshcopTlvType.DELAYTIMER: DelayTimer,
MeshcopTlvType.PANID: PanID,
MeshcopTlvType.CHANNEL: Channel,
MeshcopTlvType.PSKC: Pskc,
MeshcopTlvType.SECURITYPOLICY: SecurityPolicy,
MeshcopTlvType.CHANNELMASK: ChannelMask
}
entry_class = entry_classes.get(type)
if not entry_class:
raise ValueError(f"Invalid configuration type: {type}")
res = entry_class()
if args:
res.set(args)
return res