blob: 3f2d1dfa95099f3144c1803f4a003e28a028d86a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from binascii import hexlify
from enum import IntEnum
import io
import logging
import struct
from network_data import SubTlvsFactory
from tlvs_parsing import UnknownTlvFactory
import common
class TlvType(IntEnum):
CHANNEL = 0
PAN_ID = 1
EXTENDED_PANID = 2
NETWORK_NAME = 3
PSKC = 4
NETWORK_MASTER_KEY = 5
NETWORK_KEY_SEQUENCE_COUNTER = 6
NETWORK_MESH_LOCAL_PREFIX = 7
STEERING_DATA = 8
BORDER_AGENT_LOCATOR = 9
COMMISSIONER_ID = 10
COMMISSIONER_SESSION_ID = 11
SECURITY_POLICY = 12
GET = 13
ACTIVE_TIMESTAMP = 14
COMMISSIONER_UDP_PORT = 15
STATE = 16
JOINER_DTLS_ENCAPSULATION = 17
JOINER_UDP_PORT = 18
JOINER_IID = 19
JOINER_ROUTER_LOCATOR = 20
JOINER_ROUTER_KEK = 21
PROVISIONING_URL = 32
VENDOR_NAME = 33
VENDOR_MODEL = 34
VENDOR_SW_VERSION = 35
VENDOR_DATA = 36
VENDOR_STACK_VERSION = 37
UDP_ENCAPSULATION = 48
IPV6_ADDRESS = 49
PENDING_TIMESTAMP = 51
DELAY_TIMER = 52
CHANNEL_MASK = 53
COUNT = 54
PERIOD = 55
SCAN_DURATION = 56
ENERGY_LIST = 57
DISCOVERY_REQUEST = 128
DISCOVERY_RESPONSE = 129
class MeshCopState(IntEnum):
ACCEPT = 0x1
REJECT = 0xff
class MeshCopMessageType(IntEnum):
JOIN_FIN_REQ = (1,)
JOIN_FIN_RSP = (2,)
JOIN_ENT_NTF = (3,)
JOIN_ENT_RSP = 4
def create_mesh_cop_message_type_set():
return [
MeshCopMessageType.JOIN_FIN_REQ,
MeshCopMessageType.JOIN_FIN_RSP,
MeshCopMessageType.JOIN_ENT_NTF,
MeshCopMessageType.JOIN_ENT_RSP,
]
# Channel TLV (0)
class Channel(object):
def __init__(self, channel_page, channel):
self._channel_page = channel_page
self._channel = channel
@property
def channel_page(self):
return self._channel_page
@property
def channel(self):
return self._channel
def __eq__(self, other):
common.expect_the_same_class(self, other)
return (self._channel_page == other._channel_page and
self._channel == other.__channel)
def __repr__(self):
return 'Channel(channel_page={},channel={})'.format(
self._channel_page, self._channel)
def to_hex(self):
return struct.pack('>BBBH', TlvType.CHANNEL, 3, self.channel_page,
self.channel)
class ChannelFactory(object):
def parse(self, data, message_info):
data_tp = struct.unpack('>BH', data.read(3))
channel_page = data_tp[0]
channel = data_tp[1]
return Channel(channel_page, channel)
# PanId TLV (1)
class Panid(object):
# TODO: Not implemented yet
pass
class PanidFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# ExtendedPanid TLV (2)
class ExtendedPanid(object):
def __init__(self, extended_panid):
self._extended_panid = extended_panid
@property
def extended_panid(self):
return self._extended_panid
def __eq__(self, other):
return (isinstance(self, type(other)) and
self.extended_panid == other.extended_panid)
def __repr__(self):
return "ExtendedPanid(extended_panid={})".format(self.extended_panid)
class ExtendedPanidFactory(object):
def parse(self, data, message_info):
extended_panid = struct.unpack(">Q", data.read(8))[0]
return ExtendedPanid(extended_panid)
# NetworkName TLV (3)
class NetworkName(object):
def __init__(self, network_name):
self._network_name = network_name
@property
def network_name(self):
return self._network_name
def __eq__(self, other):
return (isinstance(self, type(other)) and
self.network_name == other.network_name)
def __repr__(self):
return "NetworkName(network_name={})".format(self.network_name)
class NetworkNameFactory(object):
def parse(self, data, message_info):
len = message_info.length
network_name = struct.unpack("{}s".format(10), data.read(len))[0]
return NetworkName(network_name)
# PSKc TLV (4)
class PSKc(object):
# TODO: Not implemented yet
pass
class PSKcFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# NetworkMasterKey TLV (5)
class NetworkMasterKey(object):
# TODO: Not implemented yet
pass
class NetworkMasterKeyFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# NetworkKeySequenceCounter TLV (6)
class NetworkKeySequenceCounter(object):
# TODO: Not implemented yet
pass
class NetworkKeySequenceCounterFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# NetworkMeshLocalPrefix TLV (7)
class NetworkMeshLocalPrefix(object):
# TODO: Not implemented yet
pass
class NetworkMeshLocalPrefixFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Steering Data TLV (8)
class SteeringData(object):
def __init__(self, bloom_filter):
self._bloom_filter = bloom_filter
@property
def bloom_filter(self):
return self._bloom_filter
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._bloom_filter == other._bloom_filter
def __repr__(self):
return "SteeringData(bloom_filter={})".format(
hexlify(self._bloom_filter))
def to_hex(self):
bloom_filter_len = len(self.bloom_filter)
return (struct.pack('>BB', TlvType.STEERING_DATA, bloom_filter_len) +
self.bloom_filter)
class SteeringDataFactory:
def parse(self, data, message_info):
bloom_filter = data.read(message_info.length)
return SteeringData(bloom_filter)
# Border Agent Locator TLV (9)
class BorderAgentLocator(object):
def __init__(self, address):
self._border_agent_locator = address
@property
def border_agent_locator(self):
return self._border_agent_locator
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._border_agent_locator == other._border_agent_locator
def __repr__(self):
return "BorderAgentLocator(rloc16={})".format(
hex(self._border_agent_locator))
def to_hex(self):
return struct.pack('>BBH', TlvType.BORDER_AGENT_LOCATOR, 2,
self.border_agent_locator)
class BorderAgentLocatorFactory:
def parse(self, data, message_info):
border_agent_locator = struct.unpack(">H", data.read(2))[0]
return BorderAgentLocator(border_agent_locator)
# CommissionerId TLV (10)
class CommissionerId(object):
def __init__(self, commissioner_id):
self._commissioner_id = commissioner_id
@property
def commissioner_id(self):
return self._commissioner_id
def __eq__(self, other):
return self.commissioner_id == other.commissioner_id
def __repr__(self):
return "CommissionerId(commissioner_id={})".format(self.commissioner_id)
class CommissionerIdFactory(object):
def parse(self, data, message_info):
commissioner_id = data.getvalue().decode('utf-8')
return CommissionerId(commissioner_id)
# Commissioner Session ID TLV (11)
class CommissionerSessionId(object):
def __init__(self, commissioner_session_id):
self._commissioner_session_id = commissioner_session_id
@property
def commissioner_session_id(self):
return self._commissioner_session_id
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._commissioner_session_id == other._commissioner_session_id
def __repr__(self):
return "CommissionerSessionId(commissioner_session_id={})".format(
self._commissioner_session_id)
def to_hex(self):
return struct.pack(
'>BBH',
TlvType.COMMISSIONER_SESSION_ID,
2,
self.commissioner_session_id,
)
class CommissionerSessionIdFactory:
def parse(self, data, message_info):
session_id = struct.unpack(">H", data.read(2))[0]
return CommissionerSessionId(session_id)
# SecurityPolicy TLV (12)
class SecurityPolicy(object):
# TODO: Not implemented yet
pass
class SecurityPolicyFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Get TLV (13)
class Get(object):
# TODO: Not implemented yet
pass
class GetFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# ActiveTimestamp TLV (14)
class ActiveTimestamp(object):
# TODO: Not implemented yet
pass
class ActiveTimestampFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Commissioner UDP Port TLV (15)
class CommissionerUdpPort(object):
def __init__(self, udp_port):
self._udp_port = udp_port
@property
def udp_port(self):
return self._udp_port
def __eq__(self, other):
common.expect_the_same_class(self, other)
return self._udp_port == other._udp_port
def __repr__(self):
return "CommissionerUdpPort(udp_port={})".format(self._udp_port)
class CommissionerUdpPortFactory:
def parse(self, data, message_info):
udp_port = struct.unpack(">H", data.read(2))[0]
return CommissionerUdpPort(udp_port)
# State TLV (16)
class State(object):
def __init__(self, state):
self._state = state
@property
def state(self):
return self._state
def __eq__(self, other):
return self.state == other.state
def __repr__(self):
return "State(state={})".format(self.state)
class StateFactory:
def parse(self, data, message_info):
state = ord(data.read(1))
return State(state)
# JoinerDtlsEncapsulation TLV (17)
class JoinerDtlsEncapsulation(object):
# TODO: Not implemented yet
pass
class JoinerDtlsEncapsulationFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# JoinerUdpPort TLV (18)
class JoinerUdpPort(object):
def __init__(self, udp_port):
self._udp_port = udp_port
@property
def udp_port(self):
return self._udp_port
def __eq__(self, other):
return (isinstance(self, type(other)) and
self.udp_port == other.udp_port)
def __repr__(self):
return "JoinerUdpPort(udp_port={})".format(self.udp_port)
class JoinerUdpPortFactory(object):
def parse(self, data, message_info):
udp_port = struct.unpack(">H", data.read(2))[0]
return JoinerUdpPort(udp_port)
# JoinerIID TLV (19)
class JoinerIID(object):
# TODO: Not implemented yet
pass
class JoinerIIDFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# JoinerRouterLocator TLV (20)
class JoinerRouterLocator(object):
# TODO: Not implemented yet
pass
class JoinerRouterLocatorFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# JoinerRouterKEK TLV (21)
class JoinerRouterKEK(object):
# TODO: Not implemented yet
pass
class JoinerRouterKEKFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# ProvisioningURL TLV (32)
class ProvisioningUrl(object):
def __init__(self, url):
self._url = url
@property
def url(self):
return self._url
def __repr__(self):
return "ProvisioningUrl(url={})".format(self.url)
class ProvisioningUrlFactory:
def parse(self, data, message_info):
url = data.decode('utf-8')
return ProvisioningUrl(url)
# VendorName TLV (33)
class VendorName(object):
def __init__(self, vendor_name):
self._vendor_name = vendor_name
@property
def vendor_name(self):
return self._vendor_name
def __eq__(self, other):
return self.vendor_name == other.vendor_name
def __repr__(self):
return "VendorName(vendor_name={})".format(self.vendor_name)
class VendorNameFactory:
def parse(self, data, message_info):
vendor_name = data.getvalue().decode('utf-8')
return VendorName(vendor_name)
# VendorModel TLV (34)
class VendorModel(object):
def __init__(self, vendor_model):
self._vendor_model = vendor_model
@property
def vendor_model(self):
return self._vendor_model
def __eq__(self, other):
return self.vendor_model == other.vendor_model
def __repr__(self):
return "VendorModel(vendor_model={})".format(self.vendor_model)
class VendorModelFactory:
def parse(self, data, message_info):
vendor_model = data.getvalue().decode('utf-8')
return VendorModel(vendor_model)
# VendorSWVersion TLV (35)
class VendorSWVersion(object):
def __init__(self, vendor_sw_version):
self._vendor_sw_version = vendor_sw_version
@property
def vendor_sw_version(self):
return self._vendor_sw_version
def __eq__(self, other):
return self.vendor_sw_version == other.vendor_sw_version
def __repr__(self):
return "VendorName(vendor_sw_version={})".format(self.vendor_sw_version)
class VendorSWVersionFactory:
def parse(self, data, message_info):
vendor_sw_version = data.getvalue()
return VendorSWVersion(vendor_sw_version)
# VendorData TLV (36)
class VendorData(object):
def __init__(self, data):
self._vendor_data = data
@property
def vendor_data(self):
return self._vendor_data
def __repr__(self):
return "Vendor(url={})".format(self.vendor_data)
class VendorDataFactory(object):
def parse(self, data, message_info):
return VendorData(data)
# VendorStackVersion TLV (37)
class VendorStackVersion(object):
def __init__(self, stack_vendor_oui, build, rev, minor, major):
self._stack_vendor_oui = stack_vendor_oui
self._build = build
self._rev = rev
self._minor = minor
self._major = major
return
@property
def stack_vendor_oui(self):
return self._stack_vendor_oui
@property
def build(self):
return self._build
@property
def rev(self):
return self._rev
@property
def minor(self):
return self._minor
@property
def major(self):
return self._major
def __repr__(self):
return "VendorStackVersion(vendor_stack_version={}, build={}, rev={}, minor={}, major={})".format(
self.stack_vendor_oui, self.build, self.rev, self.minor, self.major)
class VendorStackVersionFactory:
def parse(self, data, message_info):
stack_vendor_oui = struct.unpack(">H", data.read(2))[0]
rest = struct.unpack(">BBBB", data.read(4))
build = rest[1] << 4 | (0xf0 & rest[2])
rev = 0xF & rest[2]
minor = rest[3] & 0xf0
major = rest[3] & 0xF
return VendorStackVersion(stack_vendor_oui, build, rev, minor, major)
# UdpEncapsulation TLV (48)
class UdpEncapsulation(object):
# TODO: Not implemented yet
pass
class UdpEncapsulationFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Ipv6Address TLV (49)
class Ipv6Address(object):
# TODO: Not implemented yet
pass
class Ipv6AddressFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# PendingTimestamp TLV (51)
class PendingTimestamp(object):
# TODO: Not implemented yet
pass
class PendingTimestampFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# DelayTimer TLV (52)
class DelayTimer(object):
# TODO: Not implemented yet
pass
class DelayTimerFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# ChannelMask TLV (53)
class ChannelMask(object):
# TODO: Not implemented yet
pass
class ChannelMaskFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Count TLV (54)
class Count(object):
# TODO: Not implemented yet
pass
class CountFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Period TLV (55)
class Period(object):
# TODO: Not implemented yet
pass
class PeriodFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# ScanDuration TLV (56)
class ScanDuration(object):
# TODO: Not implemented yet
pass
class ScanDurationFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# EnergyList TLV (57)
class EnergyList(object):
# TODO: Not implemented yet
pass
class EnergyListFactory(object):
# TODO: Not implemented yet
def parse(self, data, message_info):
raise NotImplementedError("TODO: Not implemented yet")
# Discovery Request TLV (128)
class DiscoveryRequest(object):
def __init__(self, version, joiner_flag):
self._version = version
self._joiner_flag = joiner_flag
@property
def version(self):
return self._version
@property
def joiner_flag(self):
return self._joiner_flag
def __eq__(self, other):
return (isinstance(self, type(other)) and
self.version == other.version and
self.joiner_flag == other.joiner_flag)
def __repr__(self):
return "DiscoveryRequest(version={}, joiner_flag={})".format(
self.version, self.joiner_flag)
class DiscoveryRequestFactory(object):
def parse(self, data, message_info):
data_byte = struct.unpack(">B", data.read(1))[0]
version = (data_byte & 0xf0) >> 4
joiner_flag = (data_byte & 0x08) >> 3
return DiscoveryRequest(version, joiner_flag)
# Discovery Response TLV (128)
class DiscoveryResponse(object):
def __init__(self, version, native_flag):
self._version = version
self._native_flag = native_flag
@property
def version(self):
return self._version
@property
def native_flag(self):
return self._native_flag
def __eq__(self, other):
return (isinstance(self, type(other)) and
self.version == other.version and
self.native_flag == other.native_flag)
def __repr__(self):
return "DiscoveryResponse(version={}, native_flag={})".format(
self.version, self.native_flag)
class DiscoveryResponseFactory(object):
def parse(self, data, message_info):
data_byte = struct.unpack(">B", data.read(1))[0]
version = (data_byte & 0xf0) >> 4
native_flag = (data_byte & 0x08) >> 3
return DiscoveryResponse(version, native_flag)
class MeshCopCommand(object):
def __init__(self, _type, tlvs):
self._type = _type
self._tlvs = tlvs
@property
def type(self):
return self._type
@property
def tlvs(self):
return self._tlvs
def __repr__(self):
tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs])
return "MeshCopCommand(type={}, tlvs=[{}])".format(self.type, tlvs_str)
def create_deault_mesh_cop_msg_type_map():
return {
'JOIN_FIN.req': MeshCopMessageType.JOIN_FIN_REQ,
'JOIN_FIN.rsp': MeshCopMessageType.JOIN_FIN_RSP,
'JOIN_ENT.ntf': MeshCopMessageType.JOIN_ENT_NTF,
'JOIN_ENT.rsp': MeshCopMessageType.JOIN_ENT_RSP,
}
class MeshCopCommandFactory:
def __init__(self, tlvs_factories):
self._tlvs_factories = tlvs_factories
self._mesh_cop_msg_type_map = create_deault_mesh_cop_msg_type_map()
def _get_length(self, data):
return ord(data.read(1))
def _get_tlv_factory(self, _type):
try:
return self._tlvs_factories[_type]
except KeyError:
logging.error(
'Could not find TLV factory. Unsupported TLV type: {}'.format(
_type))
return UnknownTlvFactory(_type)
def _parse_tlv(self, data):
_type = TlvType(ord(data.read(1)))
length = self._get_length(data)
value = data.read(length)
factory = self._get_tlv_factory(_type)
return factory.parse(io.BytesIO(value),
None) # message_info not needed here
def _get_mesh_cop_msg_type(self, msg_type_str):
try:
return self._mesh_cop_msg_type_map[msg_type_str]
except KeyError:
raise KeyError(
'Mesh cop message type not found: {}'.format(msg_type_str))
def parse(self, cmd_type_str, data):
cmd_type = self._get_mesh_cop_msg_type(cmd_type_str)
tlvs = []
while data.tell() < len(data.getvalue()):
tlv = self._parse_tlv(data)
tlvs.append(tlv)
return MeshCopCommand(cmd_type, tlvs)
def create_default_mesh_cop_tlv_factories():
return {
TlvType.STATE: StateFactory(),
TlvType.PROVISIONING_URL: ProvisioningUrlFactory(),
TlvType.VENDOR_NAME: VendorNameFactory(),
TlvType.VENDOR_MODEL: VendorModelFactory(),
TlvType.VENDOR_SW_VERSION: VendorSWVersionFactory(),
TlvType.VENDOR_DATA: VendorDataFactory(),
TlvType.VENDOR_STACK_VERSION: VendorStackVersionFactory(),
}
class ThreadDiscoveryTlvsFactory(SubTlvsFactory):
def __init__(self, sub_tlvs_factories):
super(ThreadDiscoveryTlvsFactory, self).__init__(sub_tlvs_factories)