| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2020, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| import struct |
| |
| from enum import IntEnum |
| from typing import List |
| |
| import common |
| import ipaddress |
| import mle |
| |
| |
class TlvType(IntEnum):
    """Integer codes identifying each TLV type known to this module."""

    EXT_ADDRESS = 0
    ADDRESS16 = 1
    MODE = 2
    POLLING_PERIOD = 3
    CONNECTIVITY = 4
    ROUTE64 = 5
    LEADER_DATA = 6
    NETWORK_DATA = 7
    IPV6_ADDRESS_LIST = 8
    MAC_COUNTERS = 9
    # Codes 10 through 13 have no member in this enumeration.
    BATTERY_LEVEL = 14
    SUPPLY_VOLTAGE = 15
    CHILD_TABLE = 16
    CHANNEL_PAGES = 17
    TYPE_LIST = 18
    MAX_CHILD_TIMEOUT = 19
| |
| |
class Ipv6AddressList:
    """Value object wrapping the list of IPv6 addresses it was built with."""

    def __init__(self, addresses: List[ipaddress.IPv6Address]):
        # The list is stored as given; no copy is made.
        self._addresses = addresses

    @property
    def addresses(self):
        """The address list supplied to the constructor."""
        return self._addresses

    def __eq__(self, other):
        """Return whether both objects hold equal address lists.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.addresses, other.addresses
        return mine == theirs

    def __repr__(self):
        return 'Ipv6AddressList({})'.format(self.addresses)
| |
| |
class Ipv6AddressListFactory:
    """Builds ``Ipv6AddressList`` objects from a readable stream."""

    def parse(self, data, message_info):
        """Read 16-byte addresses until the stream position reaches the length.

        Args:
            data: Stream providing ``tell()`` and ``read()``.
            message_info: Object whose ``length`` attribute bounds the read
                loop; it is compared against ``data.tell()``.

        Returns:
            An ``Ipv6AddressList`` with one address per 16-byte read.
        """
        parsed = []
        while True:
            if not data.tell() < message_info.length:
                break
            raw_address = data.read(16)
            parsed.append(ipaddress.IPv6Address(raw_address))
        return Ipv6AddressList(parsed)
| |
| |
class MacCounters:
    """Container for MAC counter values, exposed by name and as a list.

    The counters are addressed positionally: index 0 is
    ``if_in_unknown_protos`` and index 8 is ``if_out_discards`` (see
    ``_FIELD_NAMES`` for the complete order).
    """

    # Counter names in storage order; also the order used by ``__repr__``.
    _FIELD_NAMES = (
        'if_in_unknown_protos',
        'if_in_errors',
        'if_out_errors',
        'if_in_ucast_pkts',
        'if_in_broadcast_pkts',
        'if_in_discards',
        'if_out_ucast_pkts',
        'if_out_broadcast_pkts',
        'if_out_discards',
    )

    def __init__(self, counters: List[int]):
        """Store the counter values.

        Args:
            counters: Counter values in ``_FIELD_NAMES`` order. Any iterable
                is accepted and copied into a list, so that an instance built
                from the tuple returned by ``struct.unpack`` compares equal to
                one built from a list holding the same values (a tuple never
                equals a list in Python).
        """
        self._counters = list(counters)

    @property
    def if_in_unknown_protos(self):
        return self._counters[0]

    @property
    def if_in_errors(self):
        return self._counters[1]

    @property
    def if_out_errors(self):
        return self._counters[2]

    @property
    def if_in_ucast_pkts(self):
        return self._counters[3]

    @property
    def if_in_broadcast_pkts(self):
        return self._counters[4]

    @property
    def if_in_discards(self):
        return self._counters[5]

    @property
    def if_out_ucast_pkts(self):
        return self._counters[6]

    @property
    def if_out_broadcast_pkts(self):
        return self._counters[7]

    @property
    def if_out_discards(self):
        return self._counters[8]

    @property
    def counters(self):
        """All counter values as a list, in ``_FIELD_NAMES`` order."""
        return self._counters

    def __eq__(self, other):
        """Return whether both objects hold equal counter lists.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)

        return self.counters == other.counters

    def __repr__(self):
        fields = ', '.join(f'{name}={getattr(self, name)}' for name in self._FIELD_NAMES)
        return f'MacCounters({fields})'
| |
| |
class MacCountersFactory:
    """Builds ``MacCounters`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode nine big-endian unsigned 32-bit integers read from ``data``.

        Args:
            data: Stream providing ``read()``; 36 bytes are requested.
            message_info: Unused.

        Returns:
            A ``MacCounters`` built from the unpacked values.
        """
        raw = data.read(4 * 9)
        values = struct.unpack('>9I', raw)
        return MacCounters(values)
| |
| |
class BatteryLevel:
    """Value object holding a single battery level number."""

    def __init__(self, battery_level: int):
        self._battery_level = battery_level

    @property
    def battery_level(self):
        """The value supplied to the constructor."""
        return self._battery_level

    def __eq__(self, other):
        """Return whether both objects hold the same battery level.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.battery_level, other.battery_level
        return mine == theirs

    def __repr__(self):
        return 'BatteryLevel(battery_level={})'.format(self.battery_level)
| |
| |
class BatteryLevelFactory:
    """Builds ``BatteryLevel`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode one unsigned byte read from ``data``.

        Args:
            data: Stream providing ``read()``; 1 byte is requested.
            message_info: Unused.

        Returns:
            A ``BatteryLevel`` holding the decoded integer.
        """
        raw = data.read(1)
        decoded = struct.unpack('>B', raw)
        return BatteryLevel(decoded[0])
| |
| |
class SupplyVoltage:
    """Value object holding a single supply voltage number."""

    def __init__(self, supply_voltage: int):
        self._supply_voltage = supply_voltage

    @property
    def supply_voltage(self):
        """The value supplied to the constructor."""
        return self._supply_voltage

    def __eq__(self, other):
        """Return whether both objects hold the same supply voltage.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.supply_voltage, other.supply_voltage
        return mine == theirs

    def __repr__(self):
        return 'SupplyVoltage(supply_voltage={})'.format(self.supply_voltage)
| |
| |
class SupplyVoltageFactory:
    """Builds ``SupplyVoltage`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode one big-endian unsigned 16-bit integer read from ``data``.

        Args:
            data: Stream providing ``read()``; 2 bytes are requested.
            message_info: Unused.

        Returns:
            A ``SupplyVoltage`` holding the decoded integer.
        """
        raw = data.read(2)
        decoded = struct.unpack('>H', raw)
        return SupplyVoltage(decoded[0])
| |
| |
class ChildTableEntry:
    """One child table row: a timeout, a child id and a mode."""

    def __init__(self, timeout: int, child_id: int, mode: mle.Mode):
        self._timeout, self._child_id, self._mode = timeout, child_id, mode

    @property
    def timeout(self):
        """The timeout value supplied to the constructor."""
        return self._timeout

    @property
    def child_id(self):
        """The child id supplied to the constructor."""
        return self._child_id

    @property
    def mode(self):
        """The mode object supplied to the constructor."""
        return self._mode

    def __eq__(self, other):
        """Return whether timeout, child id and mode all match.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file. Fields are compared in the
        order timeout, child id, mode, stopping at the first mismatch.
        """
        common.expect_the_same_class(self, other)

        return (
            self.timeout == other.timeout
            and self.child_id == other.child_id
            and self.mode == other.mode
        )

    def __repr__(self):
        return 'ChildTableEntry(timeout={}, child_id={}, mode={})'.format(
            self.timeout, self.child_id, self.mode)
| |
| |
class ChildTable:
    """Collection of ``ChildTableEntry`` objects kept ordered by child id."""

    def __init__(self, children: List[ChildTableEntry]):
        # Work on a copy so the caller's list is left untouched, then order
        # it by child id (the sort is stable, as with ``sorted``).
        ordered = list(children)
        ordered.sort(key=lambda entry: entry.child_id)
        self._children = ordered

    @property
    def children(self):
        """The entries, sorted by ascending ``child_id``."""
        return self._children

    def __eq__(self, other):
        """Return whether both tables hold equal sorted entry lists.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.children, other.children
        return mine == theirs

    def __repr__(self):
        return 'ChildTable({})'.format(self.children)
| |
| |
class ChildTableFactory:
    """Builds ``ChildTable`` objects from a readable byte stream."""

    # Bit layout of the 16-bit word that starts each entry.
    _TIMEOUT_MASK = 0xf800  # bits 11-15
    _TIMEOUT_SHIFT = 11
    # Low 9 bits, per the Thread Child Table TLV entry layout. The previous
    # mask (0x1fff, bits 0-12) overlapped bits 11-12 of the timeout field, so
    # timeout bits leaked into the child id.
    _CHILD_ID_MASK = 0x01ff

    def parse(self, data, message_info):
        """Decode child entries until ``message_info.length`` is used up.

        Each entry is a big-endian 16-bit word (timeout and child id)
        followed by a mode parsed by ``mle.ModeFactory``; 3 is subtracted
        from the remaining length per entry.

        Args:
            data: Stream providing ``read()``.
            message_info: Object whose ``length`` attribute is the number of
                bytes left to decode. NOTE: it is decremented in place, so it
                is no longer positive when this method returns.

        Returns:
            A ``ChildTable`` holding one ``ChildTableEntry`` per decoded entry.
        """
        children = []
        while message_info.length > 0:
            (timeout_and_id,) = struct.unpack('>H', data.read(2))
            message_info.length -= 2

            timeout = (timeout_and_id & self._TIMEOUT_MASK) >> self._TIMEOUT_SHIFT
            child_id = timeout_and_id & self._CHILD_ID_MASK

            mode = mle.ModeFactory().parse(data, message_info)
            # Accounts for the mode; presumably the mode parser reads exactly
            # one byte (confirm in ``mle``).
            message_info.length -= 1

            children.append(ChildTableEntry(timeout, child_id, mode))
        return ChildTable(children)
| |
| |
class ChannelPages:
    """Value object holding the raw channel pages bytes."""

    def __init__(self, channel_pages: bytes):
        self._channel_pages = channel_pages

    @property
    def channel_pages(self):
        """The value supplied to the constructor."""
        return self._channel_pages

    def __eq__(self, other):
        """Return whether both objects hold equal channel pages.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.channel_pages, other.channel_pages
        return mine == theirs

    def __repr__(self):
        return 'ChannelPages(channel_pages={})'.format(self.channel_pages)
| |
| |
class ChannelPagesFactory:
    """Builds ``ChannelPages`` objects from a stream exposing ``getvalue()``."""

    def parse(self, data, message_info):
        """Wrap the result of ``data.getvalue()`` in a ``ChannelPages``.

        Args:
            data: Object with a ``getvalue()`` method (presumably an
                ``io.BytesIO`` holding the TLV value; confirm with callers).
            message_info: Unused.

        Returns:
            A ``ChannelPages`` holding whatever ``getvalue()`` returned.
        """
        raw_pages = data.getvalue()
        return ChannelPages(raw_pages)
| |
| |
class TypeList:
    """Value object holding a list of integer TLV type codes."""

    def __init__(self, tlv_types: List[int]):
        # The list is stored as given; no copy is made.
        self._tlv_types = tlv_types

    @property
    def tlv_types(self):
        """The list supplied to the constructor."""
        return self._tlv_types

    def __eq__(self, other):
        """Return whether both objects hold equal type lists.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.tlv_types, other.tlv_types
        return mine == theirs

    def __repr__(self):
        return 'TypeList(tlv_types={})'.format(self.tlv_types)
| |
| |
class TypeListFactory:
    """Builds ``TypeList`` objects from a stream exposing ``getvalue()``."""

    def parse(self, data, message_info):
        """Convert every element of ``data.getvalue()`` into an integer.

        Args:
            data: Object with a ``getvalue()`` method (presumably an
                ``io.BytesIO`` holding the TLV value; confirm with callers).
            message_info: Unused.

        Returns:
            A ``TypeList`` holding one integer per element of the buffer.
        """
        # Iterating ``bytes`` in Python 3 already yields ints, and ``ord()``
        # raises TypeError when given an int. Only one-character strings
        # (e.g. from a text buffer) still need ``ord()``.
        tlv_types = [item if isinstance(item, int) else ord(item) for item in data.getvalue()]
        return TypeList(tlv_types)
| |
| |
class MaxChildTimeout:
    """Value object holding a single maximum child timeout number."""

    def __init__(self, max_child_timeout: int):
        self._max_child_timeout = max_child_timeout

    @property
    def max_child_timeout(self):
        """The value supplied to the constructor."""
        return self._max_child_timeout

    def __eq__(self, other):
        """Return whether both objects hold the same maximum child timeout.

        ``common.expect_the_same_class(self, other)`` is called first; that
        helper is defined outside this file.
        """
        common.expect_the_same_class(self, other)
        mine, theirs = self.max_child_timeout, other.max_child_timeout
        return mine == theirs

    def __repr__(self):
        return 'MaxChildTimeout(max_child_timeout={})'.format(self.max_child_timeout)
| |
| |
class MaxChildTimeoutFactory:
    """Builds ``MaxChildTimeout`` objects from a readable byte stream."""

    def parse(self, data, message_info):
        """Decode one big-endian unsigned 32-bit integer read from ``data``.

        Args:
            data: Stream providing ``read()``; 4 bytes are requested.
            message_info: Unused.

        Returns:
            A ``MaxChildTimeout`` holding the decoded integer.
        """
        raw = data.read(4)
        decoded = struct.unpack('>I', raw)
        return MaxChildTimeout(decoded[0])