| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2019, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| from binascii import hexlify |
| from enum import IntEnum |
| import io |
| import logging |
| import struct |
| |
| from network_data import SubTlvsFactory |
| from tlvs_parsing import UnknownTlvFactory |
| import common |
| |
| |
| class TlvType(IntEnum): |
| CHANNEL = 0 |
| PAN_ID = 1 |
| EXTENDED_PANID = 2 |
| NETWORK_NAME = 3 |
| PSKC = 4 |
| NETWORK_KEY = 5 |
| NETWORK_KEY_SEQUENCE_COUNTER = 6 |
| NETWORK_MESH_LOCAL_PREFIX = 7 |
| STEERING_DATA = 8 |
| BORDER_AGENT_LOCATOR = 9 |
| COMMISSIONER_ID = 10 |
| COMMISSIONER_SESSION_ID = 11 |
| SECURITY_POLICY = 12 |
| GET = 13 |
| ACTIVE_TIMESTAMP = 14 |
| COMMISSIONER_UDP_PORT = 15 |
| STATE = 16 |
| JOINER_DTLS_ENCAPSULATION = 17 |
| JOINER_UDP_PORT = 18 |
| JOINER_IID = 19 |
| JOINER_ROUTER_LOCATOR = 20 |
| JOINER_ROUTER_KEK = 21 |
| PROVISIONING_URL = 32 |
| VENDOR_NAME = 33 |
| VENDOR_MODEL = 34 |
| VENDOR_SW_VERSION = 35 |
| VENDOR_DATA = 36 |
| VENDOR_STACK_VERSION = 37 |
| UDP_ENCAPSULATION = 48 |
| IPV6_ADDRESS = 49 |
| PENDING_TIMESTAMP = 51 |
| DELAY_TIMER = 52 |
| CHANNEL_MASK = 53 |
| COUNT = 54 |
| PERIOD = 55 |
| SCAN_DURATION = 56 |
| ENERGY_LIST = 57 |
| CSL_SYNCHRONIZED_TIMEOUT = 85 |
| CSL_CLOCK_ACCURACY = 86 |
| DISCOVERY_REQUEST = 128 |
| DISCOVERY_RESPONSE = 129 |
| |
| |
| class MeshCopState(IntEnum): |
| ACCEPT = 0x1 |
| REJECT = 0xff |
| |
| |
| class MeshCopMessageType(IntEnum): |
| JOIN_FIN_REQ = (1,) |
| JOIN_FIN_RSP = (2,) |
| JOIN_ENT_NTF = (3,) |
| JOIN_ENT_RSP = 4 |
| |
| |
| def create_mesh_cop_message_type_set(): |
| return [ |
| MeshCopMessageType.JOIN_FIN_REQ, |
| MeshCopMessageType.JOIN_FIN_RSP, |
| MeshCopMessageType.JOIN_ENT_NTF, |
| MeshCopMessageType.JOIN_ENT_RSP, |
| ] |
| |
| |
| # Channel TLV (0) |
| class Channel(object): |
| |
| def __init__(self, channel_page, channel): |
| self._channel_page = channel_page |
| self._channel = channel |
| |
| @property |
| def channel_page(self): |
| return self._channel_page |
| |
| @property |
| def channel(self): |
| return self._channel |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return (self._channel_page == other._channel_page and self._channel == other.__channel) |
| |
| def __repr__(self): |
| return 'Channel(channel_page={},channel={})'.format(self._channel_page, self._channel) |
| |
| def to_hex(self): |
| return struct.pack('>BBBH', TlvType.CHANNEL, 3, self.channel_page, self.channel) |
| |
| |
| class ChannelFactory(object): |
| |
| def parse(self, data, message_info): |
| data_tp = struct.unpack('>BH', data.read(3)) |
| channel_page = data_tp[0] |
| channel = data_tp[1] |
| return Channel(channel_page, channel) |
| |
| |
| # PanId TLV (1) |
| class Panid(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class PanidFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # ExtendedPanid TLV (2) |
| class ExtendedPanid(object): |
| |
| def __init__(self, extended_panid): |
| self._extended_panid = extended_panid |
| |
| @property |
| def extended_panid(self): |
| return self._extended_panid |
| |
| def __eq__(self, other): |
| return (isinstance(self, type(other)) and self.extended_panid == other.extended_panid) |
| |
| def __repr__(self): |
| return "ExtendedPanid(extended_panid={})".format(self.extended_panid) |
| |
| |
| class ExtendedPanidFactory(object): |
| |
| def parse(self, data, message_info): |
| extended_panid = struct.unpack(">Q", data.read(8))[0] |
| return ExtendedPanid(extended_panid) |
| |
| |
| # NetworkName TLV (3) |
| class NetworkName(object): |
| |
| def __init__(self, network_name): |
| self._network_name = network_name |
| |
| @property |
| def network_name(self): |
| return self._network_name |
| |
| def __eq__(self, other): |
| return (isinstance(self, type(other)) and self.network_name == other.network_name) |
| |
| def __repr__(self): |
| return "NetworkName(network_name={})".format(self.network_name) |
| |
| |
| class NetworkNameFactory(object): |
| |
| def parse(self, data, message_info): |
| len = message_info.length |
| network_name = struct.unpack("{}s".format(10), data.read(len))[0] |
| return NetworkName(network_name) |
| |
| |
| # PSKc TLV (4) |
| class PSKc(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class PSKcFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # NetworkKey TLV (5) |
| class NetworkKey(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class NetworkKeyFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # NetworkKeySequenceCounter TLV (6) |
| class NetworkKeySequenceCounter(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class NetworkKeySequenceCounterFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # NetworkMeshLocalPrefix TLV (7) |
| class NetworkMeshLocalPrefix(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class NetworkMeshLocalPrefixFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Steering Data TLV (8) |
| class SteeringData(object): |
| |
| def __init__(self, bloom_filter): |
| self._bloom_filter = bloom_filter |
| |
| @property |
| def bloom_filter(self): |
| return self._bloom_filter |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._bloom_filter == other._bloom_filter |
| |
| def __repr__(self): |
| return "SteeringData(bloom_filter={})".format(hexlify(self._bloom_filter)) |
| |
| def to_hex(self): |
| bloom_filter_len = len(self.bloom_filter) |
| return (struct.pack('>BB', TlvType.STEERING_DATA, bloom_filter_len) + self.bloom_filter) |
| |
| |
| class SteeringDataFactory: |
| |
| def parse(self, data, message_info): |
| bloom_filter = data.read(message_info.length) |
| return SteeringData(bloom_filter) |
| |
| |
| # Border Agent Locator TLV (9) |
| class BorderAgentLocator(object): |
| |
| def __init__(self, address): |
| self._border_agent_locator = address |
| |
| @property |
| def border_agent_locator(self): |
| return self._border_agent_locator |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._border_agent_locator == other._border_agent_locator |
| |
| def __repr__(self): |
| return "BorderAgentLocator(rloc16={})".format(hex(self._border_agent_locator)) |
| |
| def to_hex(self): |
| return struct.pack('>BBH', TlvType.BORDER_AGENT_LOCATOR, 2, self.border_agent_locator) |
| |
| |
| class BorderAgentLocatorFactory: |
| |
| def parse(self, data, message_info): |
| border_agent_locator = struct.unpack(">H", data.read(2))[0] |
| return BorderAgentLocator(border_agent_locator) |
| |
| |
| # CommissionerId TLV (10) |
| class CommissionerId(object): |
| |
| def __init__(self, commissioner_id): |
| self._commissioner_id = commissioner_id |
| |
| @property |
| def commissioner_id(self): |
| return self._commissioner_id |
| |
| def __eq__(self, other): |
| return self.commissioner_id == other.commissioner_id |
| |
| def __repr__(self): |
| return "CommissionerId(commissioner_id={})".format(self.commissioner_id) |
| |
| |
| class CommissionerIdFactory(object): |
| |
| def parse(self, data, message_info): |
| commissioner_id = data.getvalue().decode('utf-8') |
| return CommissionerId(commissioner_id) |
| |
| |
| # Commissioner Session ID TLV (11) |
| class CommissionerSessionId(object): |
| |
| def __init__(self, commissioner_session_id): |
| self._commissioner_session_id = commissioner_session_id |
| |
| @property |
| def commissioner_session_id(self): |
| return self._commissioner_session_id |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._commissioner_session_id == other._commissioner_session_id |
| |
| def __repr__(self): |
| return "CommissionerSessionId(commissioner_session_id={})".format(self._commissioner_session_id) |
| |
| def to_hex(self): |
| return struct.pack( |
| '>BBH', |
| TlvType.COMMISSIONER_SESSION_ID, |
| 2, |
| self.commissioner_session_id, |
| ) |
| |
| |
| class CommissionerSessionIdFactory: |
| |
| def parse(self, data, message_info): |
| session_id = struct.unpack(">H", data.read(2))[0] |
| return CommissionerSessionId(session_id) |
| |
| |
| # SecurityPolicy TLV (12) |
| class SecurityPolicy(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class SecurityPolicyFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Get TLV (13) |
| class Get(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class GetFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # ActiveTimestamp TLV (14) |
| class ActiveTimestamp(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class ActiveTimestampFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Commissioner UDP Port TLV (15) |
| class CommissionerUdpPort(object): |
| |
| def __init__(self, udp_port): |
| self._udp_port = udp_port |
| |
| @property |
| def udp_port(self): |
| return self._udp_port |
| |
| def __eq__(self, other): |
| common.expect_the_same_class(self, other) |
| |
| return self._udp_port == other._udp_port |
| |
| def __repr__(self): |
| return "CommissionerUdpPort(udp_port={})".format(self._udp_port) |
| |
| |
| class CommissionerUdpPortFactory: |
| |
| def parse(self, data, message_info): |
| udp_port = struct.unpack(">H", data.read(2))[0] |
| return CommissionerUdpPort(udp_port) |
| |
| |
| # State TLV (16) |
| class State(object): |
| |
| def __init__(self, state): |
| self._state = state |
| |
| @property |
| def state(self): |
| return self._state |
| |
| def __eq__(self, other): |
| return self.state == other.state |
| |
| def __repr__(self): |
| return "State(state={})".format(self.state) |
| |
| |
| class StateFactory: |
| |
| def parse(self, data, message_info): |
| state = ord(data.read(1)) |
| return State(state) |
| |
| |
| # JoinerDtlsEncapsulation TLV (17) |
| class JoinerDtlsEncapsulation(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class JoinerDtlsEncapsulationFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # JoinerUdpPort TLV (18) |
| class JoinerUdpPort(object): |
| |
| def __init__(self, udp_port): |
| self._udp_port = udp_port |
| |
| @property |
| def udp_port(self): |
| return self._udp_port |
| |
| def __eq__(self, other): |
| return (isinstance(self, type(other)) and self.udp_port == other.udp_port) |
| |
| def __repr__(self): |
| return "JoinerUdpPort(udp_port={})".format(self.udp_port) |
| |
| |
| class JoinerUdpPortFactory(object): |
| |
| def parse(self, data, message_info): |
| udp_port = struct.unpack(">H", data.read(2))[0] |
| return JoinerUdpPort(udp_port) |
| |
| |
| # JoinerIID TLV (19) |
| class JoinerIID(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class JoinerIIDFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # JoinerRouterLocator TLV (20) |
| class JoinerRouterLocator(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class JoinerRouterLocatorFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # JoinerRouterKEK TLV (21) |
| class JoinerRouterKEK(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class JoinerRouterKEKFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # ProvisioningURL TLV (32) |
| class ProvisioningUrl(object): |
| |
| def __init__(self, url): |
| self._url = url |
| |
| @property |
| def url(self): |
| return self._url |
| |
| def __repr__(self): |
| return "ProvisioningUrl(url={})".format(self.url) |
| |
| |
| class ProvisioningUrlFactory: |
| |
| def parse(self, data, message_info): |
| url = data.getvalue().decode('utf-8') |
| return ProvisioningUrl(url) |
| |
| |
| # VendorName TLV (33) |
| class VendorName(object): |
| |
| def __init__(self, vendor_name): |
| self._vendor_name = vendor_name |
| |
| @property |
| def vendor_name(self): |
| return self._vendor_name |
| |
| def __eq__(self, other): |
| return self.vendor_name == other.vendor_name |
| |
| def __repr__(self): |
| return "VendorName(vendor_name={})".format(self.vendor_name) |
| |
| |
| class VendorNameFactory: |
| |
| def parse(self, data, message_info): |
| vendor_name = data.getvalue().decode('utf-8') |
| return VendorName(vendor_name) |
| |
| |
| # VendorModel TLV (34) |
| class VendorModel(object): |
| |
| def __init__(self, vendor_model): |
| self._vendor_model = vendor_model |
| |
| @property |
| def vendor_model(self): |
| return self._vendor_model |
| |
| def __eq__(self, other): |
| return self.vendor_model == other.vendor_model |
| |
| def __repr__(self): |
| return "VendorModel(vendor_model={})".format(self.vendor_model) |
| |
| |
| class VendorModelFactory: |
| |
| def parse(self, data, message_info): |
| vendor_model = data.getvalue().decode('utf-8') |
| return VendorModel(vendor_model) |
| |
| |
| # VendorSWVersion TLV (35) |
| class VendorSWVersion(object): |
| |
| def __init__(self, vendor_sw_version): |
| self._vendor_sw_version = vendor_sw_version |
| |
| @property |
| def vendor_sw_version(self): |
| return self._vendor_sw_version |
| |
| def __eq__(self, other): |
| return self.vendor_sw_version == other.vendor_sw_version |
| |
| def __repr__(self): |
| return "VendorName(vendor_sw_version={})".format(self.vendor_sw_version) |
| |
| |
| class VendorSWVersionFactory: |
| |
| def parse(self, data, message_info): |
| vendor_sw_version = data.getvalue() |
| return VendorSWVersion(vendor_sw_version) |
| |
| |
| # VendorData TLV (36) |
| class VendorData(object): |
| |
| def __init__(self, data): |
| self._vendor_data = data |
| |
| @property |
| def vendor_data(self): |
| return self._vendor_data |
| |
| def __repr__(self): |
| return "Vendor(url={})".format(self.vendor_data) |
| |
| |
| class VendorDataFactory(object): |
| |
| def parse(self, data, message_info): |
| return VendorData(data) |
| |
| |
| # VendorStackVersion TLV (37) |
| class VendorStackVersion(object): |
| |
| def __init__(self, stack_vendor_oui, build, rev, minor, major): |
| self._stack_vendor_oui = stack_vendor_oui |
| self._build = build |
| self._rev = rev |
| self._minor = minor |
| self._major = major |
| return |
| |
| @property |
| def stack_vendor_oui(self): |
| return self._stack_vendor_oui |
| |
| @property |
| def build(self): |
| return self._build |
| |
| @property |
| def rev(self): |
| return self._rev |
| |
| @property |
| def minor(self): |
| return self._minor |
| |
| @property |
| def major(self): |
| return self._major |
| |
| def __repr__(self): |
| return "VendorStackVersion(vendor_stack_version={}, build={}, rev={}, minor={}, major={})".format( |
| self.stack_vendor_oui, self.build, self.rev, self.minor, self.major) |
| |
| |
| class VendorStackVersionFactory: |
| |
| def parse(self, data, message_info): |
| stack_vendor_oui = struct.unpack(">H", data.read(2))[0] |
| rest = struct.unpack(">BBBB", data.read(4)) |
| build = rest[1] << 4 | (0xf0 & rest[2]) |
| rev = 0xF & rest[2] |
| minor = rest[3] & 0xf0 |
| major = rest[3] & 0xF |
| return VendorStackVersion(stack_vendor_oui, build, rev, minor, major) |
| |
| |
| # UdpEncapsulation TLV (48) |
| class UdpEncapsulation(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class UdpEncapsulationFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Ipv6Address TLV (49) |
| class Ipv6Address(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class Ipv6AddressFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # PendingTimestamp TLV (51) |
| class PendingTimestamp(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class PendingTimestampFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # DelayTimer TLV (52) |
| class DelayTimer(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class DelayTimerFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # ChannelMask TLV (53) |
| class ChannelMask(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class ChannelMaskFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Count TLV (54) |
| class Count(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class CountFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Period TLV (55) |
| class Period(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class PeriodFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # ScanDuration TLV (56) |
| class ScanDuration(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class ScanDurationFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # EnergyList TLV (57) |
| class EnergyList(object): |
| # TODO: Not implemented yet |
| pass |
| |
| |
| class EnergyListFactory(object): |
| # TODO: Not implemented yet |
| |
| def parse(self, data, message_info): |
| raise NotImplementedError("TODO: Not implemented yet") |
| |
| |
| # Discovery Request TLV (128) |
| class DiscoveryRequest(object): |
| |
| def __init__(self, version, joiner_flag): |
| self._version = version |
| self._joiner_flag = joiner_flag |
| |
| @property |
| def version(self): |
| return self._version |
| |
| @property |
| def joiner_flag(self): |
| return self._joiner_flag |
| |
| def __eq__(self, other): |
| return (isinstance(self, type(other)) and self.version == other.version and |
| self.joiner_flag == other.joiner_flag) |
| |
| def __repr__(self): |
| return "DiscoveryRequest(version={}, joiner_flag={})".format(self.version, self.joiner_flag) |
| |
| |
| class DiscoveryRequestFactory(object): |
| |
| def parse(self, data, message_info): |
| data_byte = struct.unpack(">B", data.read(1))[0] |
| version = (data_byte & 0xf0) >> 4 |
| joiner_flag = (data_byte & 0x08) >> 3 |
| |
| return DiscoveryRequest(version, joiner_flag) |
| |
| |
| # Discovery Response TLV (128) |
| class DiscoveryResponse(object): |
| |
| def __init__(self, version, native_flag): |
| self._version = version |
| self._native_flag = native_flag |
| |
| @property |
| def version(self): |
| return self._version |
| |
| @property |
| def native_flag(self): |
| return self._native_flag |
| |
| def __eq__(self, other): |
| return (isinstance(self, type(other)) and self.version == other.version and |
| self.native_flag == other.native_flag) |
| |
| def __repr__(self): |
| return "DiscoveryResponse(version={}, native_flag={})".format(self.version, self.native_flag) |
| |
| |
| class DiscoveryResponseFactory(object): |
| |
| def parse(self, data, message_info): |
| data_byte = struct.unpack(">B", data.read(1))[0] |
| version = (data_byte & 0xf0) >> 4 |
| native_flag = (data_byte & 0x08) >> 3 |
| |
| return DiscoveryResponse(version, native_flag) |
| |
| |
| class MeshCopCommand(object): |
| |
| def __init__(self, _type, tlvs): |
| self._type = _type |
| self._tlvs = tlvs |
| |
| @property |
| def type(self): |
| return self._type |
| |
| @property |
| def tlvs(self): |
| return self._tlvs |
| |
| def __repr__(self): |
| tlvs_str = ", ".join(["{}".format(tlv) for tlv in self.tlvs]) |
| return "MeshCopCommand(type={}, tlvs=[{}])".format(self.type, tlvs_str) |
| |
| |
| def create_deault_mesh_cop_msg_type_map(): |
| return { |
| 'JOIN_FIN.req': MeshCopMessageType.JOIN_FIN_REQ, |
| 'JOIN_FIN.rsp': MeshCopMessageType.JOIN_FIN_RSP, |
| 'JOIN_ENT.ntf': MeshCopMessageType.JOIN_ENT_NTF, |
| 'JOIN_ENT.rsp': MeshCopMessageType.JOIN_ENT_RSP, |
| } |
| |
| |
| class MeshCopCommandFactory: |
| |
| def __init__(self, tlvs_factories): |
| self._tlvs_factories = tlvs_factories |
| self._mesh_cop_msg_type_map = create_deault_mesh_cop_msg_type_map() |
| |
| def _get_length(self, data): |
| return ord(data.read(1)) |
| |
| def _get_tlv_factory(self, _type): |
| try: |
| return self._tlvs_factories[_type] |
| except KeyError: |
| logging.error('Could not find TLV factory. Unsupported TLV type: {}'.format(_type)) |
| return UnknownTlvFactory(_type) |
| |
| def _parse_tlv(self, data): |
| _type = TlvType(ord(data.read(1))) |
| length = self._get_length(data) |
| value = data.read(length) |
| factory = self._get_tlv_factory(_type) |
| return factory.parse(io.BytesIO(value), None) # message_info not needed here |
| |
| def _get_mesh_cop_msg_type(self, msg_type_str): |
| try: |
| return self._mesh_cop_msg_type_map[msg_type_str] |
| except KeyError: |
| raise KeyError('Mesh cop message type not found: {}'.format(msg_type_str)) |
| |
| def parse(self, cmd_type_str, data): |
| cmd_type = self._get_mesh_cop_msg_type(cmd_type_str) |
| |
| tlvs = [] |
| while data.tell() < len(data.getvalue()): |
| tlv = self._parse_tlv(data) |
| tlvs.append(tlv) |
| |
| return MeshCopCommand(cmd_type, tlvs) |
| |
| |
| def create_default_mesh_cop_tlv_factories(): |
| return { |
| TlvType.STATE: StateFactory(), |
| TlvType.PROVISIONING_URL: ProvisioningUrlFactory(), |
| TlvType.VENDOR_NAME: VendorNameFactory(), |
| TlvType.VENDOR_MODEL: VendorModelFactory(), |
| TlvType.VENDOR_SW_VERSION: VendorSWVersionFactory(), |
| TlvType.VENDOR_DATA: VendorDataFactory(), |
| TlvType.VENDOR_STACK_VERSION: VendorStackVersionFactory(), |
| } |
| |
| |
| class ThreadDiscoveryTlvsFactory(SubTlvsFactory): |
| |
| def __init__(self, sub_tlvs_factories): |
| super(ThreadDiscoveryTlvsFactory, self).__init__(sub_tlvs_factories) |