| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2019, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| import datetime |
| import sys |
| import time |
| from typing import Any, Union |
| |
| from pyshark.packet.fields import LayerFieldsContainer, LayerField |
| from pyshark.packet.packet import Packet as RawPacket |
| |
| from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr |
| from pktverify.bytes import Bytes |
| from pktverify.consts import VALID_LAYER_NAMES |
| from pktverify.null_field import nullField |
| |
| |
| def _auto(v: Union[LayerFieldsContainer, LayerField]): |
| """parse the layer field automatically according to its format""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 or v.get_default_value() is not None, v.fields |
| dv = v.get_default_value() |
| rv = v.raw_value |
| |
| if dv.startswith('0x'): |
| return int(dv, 16) |
| |
| try: |
| if dv == rv: |
| return int(dv) |
| elif int(dv) == int(rv, 16): |
| return int(dv) |
| except (ValueError, TypeError): |
| pass |
| |
| if rv is None: |
| try: |
| return int(dv) |
| except (ValueError, TypeError): |
| pass |
| |
| if ':' in dv and '::' not in dv and dv.replace(':', '') == rv: # '88:00', '8800' |
| return int(rv, 16) |
| |
| # timestamp: 'Jan 1, 1970 08:00:00.000000000 CST', '0000000000000000' |
| # convert to seconds from 1970, ignore the nanosecond for now since |
| # there are integer seconds applied in the test cases |
| try: |
| time_str = datetime.datetime.strptime(dv, "%b %d, %Y %H:%M:%S.%f000 %Z") |
| time_in_sec = time.mktime(time_str.utctimetuple()) |
| return int(time_in_sec) |
| except (ValueError, TypeError): |
| pass |
| |
| try: |
| int(rv, 16) |
| return int(dv) |
| except Exception: |
| pass |
| |
| raise ValueError((v, v.get_default_value(), v.raw_value)) |
| |
| |
| def _payload(v: Union[LayerFieldsContainer, LayerField]) -> bytearray: |
| """parse the layer field as a bytearray""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| hex_value = v.raw_value |
| assert len(hex_value) % 2 == 0 |
| s = bytearray() |
| for i in range(0, len(hex_value), 2): |
| s.append(int(hex_value[i:i + 2], 16)) |
| |
| return s |
| |
| |
| def _hex(v: Union[LayerFieldsContainer, LayerField]) -> int: |
| """parse the layer field as a hex string""" |
| # split v into octets and reverse the order |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return int(v.get_default_value(), 16) |
| |
| |
| def _raw_hex(v: Union[LayerFieldsContainer, LayerField]) -> int: |
| """parse the layer field as a raw hex string""" |
| # split v into octets and reverse the order |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| iv = v.hex_value |
| |
| try: |
| int(v.get_default_value()) |
| assert int(v.get_default_value()) == iv, (v.get_default_value(), v.raw_value) |
| except ValueError: |
| pass |
| |
| try: |
| int(v.get_default_value(), 16) |
| assert int(v.get_default_value(), 16) == iv, (v.get_default_value(), v.raw_value) |
| except ValueError: |
| pass |
| |
| return iv |
| |
| |
| def _raw_hex_rev(v: Union[LayerFieldsContainer, LayerField]) -> int: |
| """parse the layer field as a reversed raw hex string""" |
| # split v into octets and reverse the order |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| rv = v.raw_value |
| octets = [rv[i:i + 2] for i in range(0, len(rv), 2)] |
| |
| iv = int(''.join(reversed(octets)), 16) |
| |
| try: |
| int(v.get_default_value()) |
| assert int(v.get_default_value()) == iv, (v.get_default_value(), v.raw_value) |
| except ValueError: |
| pass |
| |
| try: |
| int(v.get_default_value(), 16) |
| assert int(v.get_default_value(), 16) == iv, (v.get_default_value(), v.raw_value) |
| except ValueError: |
| pass |
| |
| return iv |
| |
| |
| def _dec(v: Union[LayerFieldsContainer, LayerField]) -> int: |
| """parse the layer field as a decimal""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return int(v.get_default_value()) |
| |
| |
| def _float(v: Union[LayerFieldsContainer, LayerField]) -> float: |
| """parse the layer field as a float""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return float(v.get_default_value()) |
| |
| |
| def _str(v: Union[LayerFieldsContainer, LayerField]) -> str: |
| """parse the layer field as a string""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return str(v.get_default_value()) |
| |
| |
| def _bytes(v: Union[LayerFieldsContainer, LayerField]) -> Bytes: |
| """parse the layer field as raw bytes""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return Bytes(v.raw_value) |
| |
| |
| def _ext_addr(v: Union[LayerFieldsContainer, LayerField]) -> ExtAddr: |
| """parse the layer field as an extended address""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return ExtAddr(v.get_default_value()) |
| |
| |
| def _ipv6_addr(v: Union[LayerFieldsContainer, LayerField]) -> Ipv6Addr: |
| """parse the layer field as an IPv6 address""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| return Ipv6Addr(v.get_default_value()) |
| |
| |
| def _eth_addr(v: Union[LayerFieldsContainer, LayerField]) -> EthAddr: |
| """parse the layer field as an Ethernet MAC address""" |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1, v.fields |
| return EthAddr(v.get_default_value()) |
| |
| |
| def _routerid_set(v: Union[LayerFieldsContainer, LayerField]) -> set: |
| """parse the layer field as a set of router ids |
| |
| Notes: the router ID mask in wireshark is a |
| hexadecimal string separated by ':' |
| """ |
| assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 |
| |
| try: |
| ridmask = str(v.get_default_value()) |
| assert isinstance(ridmask, str), ridmask |
| ridmask_int = int(ridmask.replace(':', ''), base=16) |
| rid_set = set() |
| count = 0 |
| while ridmask_int: |
| count += 1 |
| if ridmask_int & 1: |
| rid_set.add(64 - count) |
| ridmask_int = ridmask_int >> 1 |
| except ValueError: |
| pass |
| |
| return rid_set |
| |
| |
| class _first(object): |
| """parse the first layer field""" |
| |
| def __init__(self, sub_parse): |
| self._sub_parse = sub_parse |
| |
| def __call__(self, v: Union[LayerFieldsContainer, LayerField]): |
| return self._sub_parse(v.fields[0]) |
| |
| |
| class _list(object): |
| """parse all layer fields into a list""" |
| |
| def __init__(self, sub_parse): |
| self._sub_parse = sub_parse |
| |
| def __call__(self, v: Union[LayerFieldsContainer, LayerField]): |
| return [self._sub_parse(f) for f in v.fields] |
| |
| |
| _LAYER_FIELDS = { |
| # WPAN |
| 'wpan.fcf': _raw_hex_rev, |
| 'wpan.cmd': _auto, |
| 'wpan.security': _auto, |
| 'wpan.frame_type': _auto, |
| 'wpan.pending': _auto, |
| 'wpan.ack_request': _auto, |
| 'wpan.pan_id_compression': _auto, |
| 'wpan.seqno_suppression': _auto, |
| 'wpan.ie_present': _auto, |
| 'wpan.dst_addr_mode': _auto, |
| 'wpan.version': _auto, |
| 'wpan.src_addr_mode': _auto, |
| 'wpan.dst_pan': _auto, |
| 'wpan.seq_no': _auto, |
| 'wpan.src16': _auto, |
| 'wpan.dst16': _auto, |
| 'wpan.src64': _ext_addr, |
| 'wpan.dst64': _ext_addr, |
| 'wpan.fcs': _raw_hex_rev, |
| 'wpan.fcs_ok': _auto, |
| 'wpan.frame_length': _dec, |
| 'wpan.key_number': _auto, |
| 'wpan.aux_sec.sec_suite': _auto, |
| 'wpan.aux_sec.security_control_field': _auto, |
| 'wpan.aux_sec.sec_level': _auto, |
| 'wpan.aux_sec.key_id_mode': _auto, |
| 'wpan.aux_sec.frame_counter_suppression': _auto, |
| 'wpan.aux_sec.asn_in_nonce': _auto, |
| 'wpan.aux_sec.reserved': _auto, |
| 'wpan.aux_sec.frame_counter': _auto, |
| 'wpan.aux_sec.key_source': _auto, |
| 'wpan.aux_sec.key_index': _auto, |
| 'wpan.aux_sec.hdr': _str, |
| 'wpan.mic': _auto, |
| 'wpan.channel': _auto, |
| 'wpan.header_ie.id': _list(_auto), |
| 'wpan.header_ie.csl.period': _auto, |
| 'wpan.payload_ie.vendor.oui': _auto, |
| |
| # MLE |
| 'mle.cmd': _auto, |
| 'mle.sec_suite': _hex, |
| 'mle.tlv.type': _list(_dec), |
| 'mle.tlv.len': _list(_dec), |
| 'mle.tlv.mode.receiver_on_idle': _auto, |
| 'mle.tlv.mode.reserved1': _auto, |
| 'mle.tlv.mode.reserved2': _auto, |
| 'mle.tlv.mode.device_type_bit': _auto, |
| 'mle.tlv.mode.network_data': _auto, |
| 'mle.tlv.challenge': _bytes, |
| 'mle.tlv.scan_mask.r': _auto, |
| 'mle.tlv.scan_mask.e': _auto, |
| 'mle.tlv.version': _auto, |
| 'mle.tlv.source_addr': _auto, |
| 'mle.tlv.active_tstamp': _auto, |
| 'mle.tlv.pending_tstamp': _auto, |
| 'mle.tlv.leader_data.partition_id': _auto, |
| 'mle.tlv.leader_data.weighting': _auto, |
| 'mle.tlv.leader_data.data_version': _auto, |
| 'mle.tlv.leader_data.stable_data_version': _auto, |
| 'mle.tlv.leader_data.router_id': _auto, |
| 'mle.tlv.route64.nbr_out': _list(_auto), |
| 'mle.tlv.route64.nbr_in': _list(_auto), |
| 'mle.tlv.route64.id_seq': _auto, |
| 'mle.tlv.route64.id_mask': _routerid_set, |
| 'mle.tlv.route64.cost': _list(_auto), |
| 'mle.tlv.response': _bytes, |
| 'mle.tlv.mle_frm_cntr': _auto, |
| 'mle.tlv.ll_frm_cntr': _auto, |
| 'mle.tlv.link_margin': _auto, |
| 'mle.tlv.conn.sed_dgram_cnt': _auto, |
| 'mle.tlv.conn.sed_buf_size': _auto, |
| 'mle.tlv.conn.lq3': _auto, |
| 'mle.tlv.conn.lq2': _auto, |
| 'mle.tlv.conn.lq1': _auto, |
| 'mle.tlv.conn.leader_cost': _auto, |
| 'mle.tlv.conn.id_seq': _auto, |
| 'mle.tlv.conn.flags.pp': _auto, |
| 'mle.tlv.conn.active_rtrs': _auto, |
| 'mle.tlv.timeout': _auto, |
| 'mle.tlv.addr16': _auto, |
| 'mle.tlv.channel': _auto, |
| 'mle.tlv.addr_reg_iid': _list(_auto), |
| 'mle.tlv.link_enh_ack_flags': _auto, |
| 'mle.tlv.link_forward_series': _list(_auto), |
| 'mle.tlv.link_requested_type_id_flags': _list(_hex), |
| 'mle.tlv.link_sub_tlv': _auto, |
| 'mle.tlv.link_status_sub_tlv': _auto, |
| 'mle.tlv.query_id': _auto, |
| 'mle.tlv.metric_type_id_flags.type': _list(_hex), |
| 'mle.tlv.metric_type_id_flags.metric': _list(_hex), |
| 'mle.tlv.metric_type_id_flags.l': _list(_hex), |
| 'mle.tlv.link_requested_type_id_flags': _bytes, |
| |
| # IP |
| 'ip.version': _auto, |
| 'ip.src': _str, |
| 'ip.src_host': _str, |
| 'ip.dst': _str, |
| 'ip.dst_host': _str, |
| 'ip.ttl': _auto, |
| 'ip.proto': _auto, |
| 'ip.len': _auto, |
| 'ip.id': _auto, |
| 'ip.host': _list(_str), |
| 'ip.hdr_len': _dec, |
| 'ip.frag_offset': _auto, |
| 'ip.flags.rb': _auto, |
| 'ip.flags.mf': _auto, |
| 'ip.flags.df': _auto, |
| 'ip.dsfield.ecn': _auto, |
| 'ip.dsfield.dscp': _auto, |
| 'ip.checksum.status': _auto, |
| 'ip.addr': _list(_str), |
| 'ip.options.routeralert': _bytes, |
| 'ip.opt.type.number': _auto, |
| 'ip.opt.type.copy': _auto, |
| 'ip.opt.type.class': _auto, |
| 'ip.opt.ra': _auto, |
| 'ip.opt.len': _auto, |
| # UDP |
| 'udp.stream': _auto, |
| 'udp.srcport': _auto, |
| 'udp.dstport': _auto, |
| 'udp.length': _auto, |
| 'udp.port': _list(_dec), |
| 'udp.checksum.status': _auto, |
| |
| # IPv6 |
| 'ipv6.version': _auto, |
| 'ipv6.src': _ipv6_addr, |
| 'ipv6.src_host': _ipv6_addr, |
| 'ipv6.dst': _ipv6_addr, |
| 'ipv6.dst_host': _ipv6_addr, |
| 'ipv6.addr': _list(_ipv6_addr), |
| 'ipv6.tclass.dscp': _auto, |
| 'ipv6.tclass.ecn': _auto, |
| 'ipv6.flow': _auto, |
| 'ipv6.hlim': _auto, |
| 'ipv6.nxt': _auto, |
| 'ipv6.hopopts.len': _auto, |
| 'ipv6.hopopts.nxt': _auto, |
| 'ipv6.hopopts.len_oct': _dec, |
| 'ipv6.host': _list(_ipv6_addr), |
| 'ipv6.plen': _auto, |
| 'ipv6.opt.type.rest': _list(_auto), |
| 'ipv6.opt.type.change': _list(_auto), |
| 'ipv6.opt.type.action': _list(_auto), |
| 'ipv6.opt.router_alert': _auto, |
| 'ipv6.opt.padn': _str, |
| 'ipv6.opt.length': _list(_auto), |
| 'ipv6.opt.mpl.seed_id': _bytes, |
| 'ipv6.opt.mpl.sequence': _auto, |
| 'ipv6.opt.mpl.flag.v': _auto, |
| 'ipv6.opt.mpl.flag.s': _auto, |
| 'ipv6.opt.mpl.flag.rsv': _auto, |
| 'ipv6.opt.mpl.flag.m': _auto, |
| |
| # Eth |
| 'eth.src': _eth_addr, |
| 'eth.src_resolved': _eth_addr, |
| 'eth.dst': _eth_addr, |
| 'eth.dst_resolved': _eth_addr, |
| 'eth.type': _auto, |
| 'eth.addr': _list(_eth_addr), |
| 'eth.addr_resolved': _list(_eth_addr), |
| 'eth.ig': _list(_auto), |
| 'eth.lg': _list(_auto), |
| # 6LOWPAN |
| '6lowpan.src': _ipv6_addr, |
| '6lowpan.dst': _ipv6_addr, |
| '6lowpan.udp.src': _auto, |
| '6lowpan.udp.dst': _auto, |
| '6lowpan.udp.checksum': _auto, |
| '6lowpan.frag.offset': _auto, |
| '6lowpan.frag.tag': _auto, |
| '6lowpan.frag.size': _auto, |
| '6lowpan.pattern': _list(_auto), |
| '6lowpan.hops': _auto, |
| '6lowpan.padding': _auto, |
| '6lowpan.next': _auto, |
| '6lowpan.flow': _auto, |
| '6lowpan.ecn': _auto, |
| '6lowpan.iphc.tf': _auto, |
| '6lowpan.iphc.m': _auto, |
| '6lowpan.iphc.nh': _auto, |
| '6lowpan.iphc.hlim': _auto, |
| '6lowpan.iphc.cid': _auto, |
| '6lowpan.iphc.sac': _auto, |
| '6lowpan.iphc.sam': _auto, |
| '6lowpan.iphc.dac': _auto, |
| '6lowpan.iphc.dam': _auto, |
| '6lowpan.iphc.sci': _auto, |
| '6lowpan.iphc.dci': _auto, |
| '6lowpan.iphc.sctx.prefix': _ipv6_addr, |
| '6lowpan.iphc.dctx.prefix': _ipv6_addr, |
| '6lowpan.mesh.v': _auto, |
| '6lowpan.nhc.pattern': _list(_auto), |
| '6lowpan.nhc.udp.checksum': _auto, |
| '6lowpan.nhc.udp.ports': _auto, |
| '6lowpan.nhc.ext.nh': _auto, |
| '6lowpan.nhc.ext.length': _auto, |
| '6lowpan.nhc.ext.eid': _auto, |
| '6lowpan.reassembled.length': _auto, |
| '6lowpan.fragments': _str, |
| '6lowpan.fragment.count': _auto, |
| '6lowpan.mesh.orig16': _auto, |
| '6lowpan.mesh.hops8': _auto, |
| '6lowpan.mesh.hops': _auto, |
| '6lowpan.mesh.f': _auto, |
| '6lowpan.mesh.dest16': _auto, |
| |
| # ICMPv6 |
| 'icmpv6.type': _first(_auto), |
| 'icmpv6.code': _first(_auto), |
| 'icmpv6.checksum': _first(_auto), |
| 'icmpv6.reserved': _raw_hex, |
| 'icmpv6.resptime': _float, |
| 'icmpv6.resp_to': _auto, |
| 'icmpv6.mldr.nb_mcast_records': _auto, |
| 'icmpv6.nd.ra.cur_hop_limit': _auto, |
| 'icmpv6.nd.ns.target_address': _ipv6_addr, |
| 'icmpv6.nd.na.target_address': _ipv6_addr, |
| 'icmpv6.nd.na.flag.s': _auto, |
| 'icmpv6.nd.na.flag.o': _auto, |
| 'icmpv6.nd.na.flag.r': _auto, |
| 'icmpv6.nd.na.flag.rsv': _auto, |
| 'icmpv6.mldr.mar.record_type': _list(_auto), |
| 'icmpv6.mldr.mar.aux_data_len': _list(_auto), |
| 'icmpv6.mldr.mar.nb_sources': _list(_auto), |
| 'icmpv6.mldr.mar.multicast_address': _list(_ipv6_addr), |
| 'icmpv6.opt.type': _list(_auto), |
| 'icmpv6.opt.nonce': _bytes, |
| 'icmpv6.opt.linkaddr': _eth_addr, |
| 'icmpv6.opt.src_linkaddr': _eth_addr, |
| 'icmpv6.opt.target_linkaddr': _eth_addr, |
| 'icmpv6.opt.route_lifetime': _auto, |
| 'icmpv6.opt.route_info.flag.route_preference': _auto, |
| 'icmpv6.opt.route_info.flag.reserved': _auto, |
| 'icmpv6.opt.prefix.valid_lifetime': _auto, |
| 'icmpv6.opt.prefix.preferred_lifetime': _auto, |
| 'icmpv6.opt.prefix.length': _list(_auto), |
| 'icmpv6.opt.prefix.flag.reserved': _auto, |
| 'icmpv6.opt.prefix.flag.r': _auto, |
| 'icmpv6.opt.prefix.flag.l': _auto, |
| 'icmpv6.opt.prefix.flag.a': _auto, |
| 'icmpv6.opt.length': _list(_auto), |
| 'icmpv6.opt.reserved': _str, |
| 'icmpv6.nd.ra.router_lifetime': _auto, |
| 'icmpv6.nd.ra.retrans_timer': _auto, |
| 'icmpv6.nd.ra.reachable_time': _auto, |
| 'icmpv6.nd.ra.flag.rsv': _auto, |
| 'icmpv6.nd.ra.flag.prf': _auto, |
| 'icmpv6.nd.ra.flag.p': _auto, |
| 'icmpv6.nd.ra.flag.o': _auto, |
| 'icmpv6.nd.ra.flag.m': _auto, |
| 'icmpv6.nd.ra.flag.h': _auto, |
| 'icmpv6.echo.sequence_number': _auto, |
| 'icmpv6.echo.identifier': _auto, |
| 'icmpv6.data.len': _auto, |
| |
| # COAP |
| 'coap.code': _auto, |
| 'coap.version': _auto, |
| 'coap.type': _auto, |
| 'coap.mid': _auto, |
| 'coap.token_len': _auto, |
| 'coap.token': _auto, |
| 'coap.opt.uri_path': _list(_str), |
| 'coap.opt.name': _list(_str), |
| 'coap.opt.length': _list(_auto), |
| 'coap.opt.uri_path_recon': _str, |
| 'coap.payload': _payload, |
| 'coap.payload_length': _auto, |
| 'coap.payload_desc': _str, |
| 'coap.opt.end_marker': _auto, |
| 'coap.opt.desc': _list(_str), |
| 'coap.opt.delta': _list(_auto), |
| 'coap.response_to': _auto, |
| 'coap.response_time': _float, |
| # COAP TLVS |
| 'coap.tlv.type': _list(_auto), |
| 'coap.tlv.status': _auto, |
| 'coap.tlv.target_eid': _ipv6_addr, |
| 'coap.tlv.ml_eid': _ext_addr, |
| 'coap.tlv.last_transaction_time': _auto, |
| 'coap.tlv.rloc16': _auto, |
| 'coap.tlv.net_name': _str, |
| 'coap.tlv.ext_mac_addr': _ext_addr, |
| 'coap.tlv.router_mask_assigned': _auto, |
| 'coap.tlv.router_mask_id_seq': _auto, |
| |
| # dtls |
| 'dtls.handshake.type': _list(_auto), |
| 'dtls.handshake.cookie': _auto, |
| 'dtls.record.content_type': _list(_auto), |
| 'dtls.alert_message.desc': _auto, |
| |
| # thread beacon |
| 'thread_bcn.protocol': _auto, |
| 'thread_bcn.version': _auto, |
| 'thread_bcn.network_name': _str, |
| 'thread_bcn.epid': _ext_addr, |
| |
| # thread_address |
| 'thread_address.tlv.len': _list(_auto), |
| 'thread_address.tlv.type': _list(_auto), |
| 'thread_address.tlv.status': _auto, |
| 'thread_address.tlv.target_eid': _ipv6_addr, |
| 'thread_address.tlv.ext_mac_addr': _ext_addr, |
| 'thread_address.tlv.router_mask_id_seq': _auto, |
| 'thread_address.tlv.router_mask_assigned': _bytes, |
| 'thread_address.tlv.rloc16': _hex, |
| 'thread_address.tlv.target_eid': _ipv6_addr, |
| 'thread_address.tlv.ml_eid': _ext_addr, |
| |
| # thread bl |
| 'thread_bl.tlv.type': _list(_auto), |
| 'thread_bl.tlv.len': _list(_auto), |
| 'thread_bl.tlv.target_eid': _ipv6_addr, |
| 'thread_bl.tlv.ml_eid': _ext_addr, |
| 'thread_bl.tlv.last_transaction_time': _auto, |
| 'thread_bl.tlv.timeout': _auto, |
| # THEAD NM |
| 'thread_nm.tlv.type': _list(_auto), |
| 'thread_nm.tlv.ml_eid': _ext_addr, |
| 'thread_nm.tlv.target_eid': _ipv6_addr, |
| 'thread_nm.tlv.status': _auto, |
| 'thread_nm.tlv.timeout': _auto, |
| # thread_meshcop is not a real layer |
| 'thread_meshcop.len_size_mismatch': _str, |
| 'thread_meshcop.tlv.type': _list(_auto), |
| 'thread_meshcop.tlv.len8': _list(_auto), |
| 'thread_meshcop.tlv.net_name': _list(_str), # from thread_bl |
| 'thread_meshcop.tlv.commissioner_id': _str, |
| 'thread_meshcop.tlv.commissioner_sess_id': _auto, # from mle |
| "thread_meshcop.tlv.channel_page": _auto, # from ble |
| "thread_meshcop.tlv.channel": _list(_auto), # from ble |
| "thread_meshcop.tlv.chan_mask": _str, # from ble |
| 'thread_meshcop.tlv.chan_mask_page': _auto, |
| 'thread_meshcop.tlv.chan_mask_len': _auto, |
| 'thread_meshcop.tlv.chan_mask_mask': _bytes, |
| 'thread_meshcop.tlv.discovery_req_ver': _auto, |
| 'thread_meshcop.tlv.discovery_rsp_ver': _auto, |
| 'thread_meshcop.tlv.discovery_rsp_n': _auto, |
| 'thread_meshcop.tlv.energy_list': _list(_auto), |
| 'thread_meshcop.tlv.pan_id': _list(_auto), |
| 'thread_meshcop.tlv.xpan_id': _bytes, |
| 'thread_meshcop.tlv.ml_prefix': _bytes, |
| 'thread_meshcop.tlv.master_key': _bytes, |
| 'thread_meshcop.tlv.pskc': _bytes, |
| 'thread_meshcop.tlv.sec_policy_rot': _auto, |
| 'thread_meshcop.tlv.sec_policy_o': _auto, |
| 'thread_meshcop.tlv.sec_policy_n': _auto, |
| 'thread_meshcop.tlv.sec_policy_r': _auto, |
| 'thread_meshcop.tlv.sec_policy_c': _auto, |
| 'thread_meshcop.tlv.sec_policy_b': _auto, |
| 'thread_meshcop.tlv.state': _auto, |
| 'thread_meshcop.tlv.steering_data': _bytes, |
| 'thread_meshcop.tlv.unknown': _bytes, |
| 'thread_meshcop.tlv.udp_port': _list(_auto), |
| 'thread_meshcop.tlv.ba_locator': _auto, |
| 'thread_meshcop.tlv.jr_locator': _auto, |
| 'thread_meshcop.tlv.active_tstamp': _auto, |
| 'thread_meshcop.tlv.pending_tstamp': _auto, |
| 'thread_meshcop.tlv.delay_timer': _auto, |
| 'thread_meshcop.tlv.ipv6_addr': _list(_ipv6_addr), |
| |
| # THREAD NWD |
| 'thread_nwd.tlv.type': _list(_auto), |
| 'thread_nwd.tlv.len': _list(_auto), |
| 'thread_nwd.tlv.stable': _list(_auto), |
| 'thread_nwd.tlv.service.t': _auto, |
| 'thread_nwd.tlv.service.s_id': _auto, |
| 'thread_nwd.tlv.service.s_data_len': _auto, |
| 'thread_nwd.tlv.service.s_data.seqno': _auto, |
| 'thread_nwd.tlv.service.s_data.rrdelay': _auto, |
| 'thread_nwd.tlv.service.s_data.mlrtimeout': _auto, |
| 'thread_nwd.tlv.server_16': _list(_auto), |
| 'thread_nwd.tlv.border_router_16': _list(_auto), |
| 'thread_nwd.tlv.sub_tlvs': _list(_str), |
| # TODO: support thread_nwd.tlv.prefix.length and thread_nwd.tlv.prefix.domain_id |
| 'thread_nwd.tlv.prefix': _list(_ipv6_addr), |
| 'thread_nwd.tlv.border_router.pref': _auto, |
| 'thread_nwd.tlv.border_router.flag.s': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.r': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.p': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.o': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.n': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.dp': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.d': _list(_auto), |
| 'thread_nwd.tlv.border_router.flag.c': _list(_auto), |
| 'thread_nwd.tlv.6co.flag.reserved': _auto, |
| 'thread_nwd.tlv.6co.flag.cid': _auto, |
| 'thread_nwd.tlv.6co.flag.c': _list(_auto), |
| 'thread_nwd.tlv.6co.context_length': _auto, |
| |
| # Thread Diagnostic |
| 'thread_diagnostic.tlv.type': _list(_auto), |
| 'thread_diagnostic.tlv.len8': _list(_auto), |
| 'thread_diagnostic.tlv.general': _list(_str), |
| |
| # DNS |
| 'dns.resp.ttl': _auto, |
| 'dns.flags.response': _auto, |
| } |
| |
| _layer_containers = set() |
| |
| for key in _LAYER_FIELDS: |
| assert key.strip() == key and ' ' not in key, key |
| secs = key.split('.') |
| assert len(secs) >= 2 |
| assert secs[0] in VALID_LAYER_NAMES, secs[0] |
| for i in range(len(secs) - 2): |
| path = secs[0] + '.' + '.'.join(secs[1:i + 2]) |
| assert path not in _LAYER_FIELDS, '%s can not be both field and path' % path |
| _layer_containers.add(path) |
| |
| |
| def is_layer_field(uri: str) -> bool: |
| """ |
| Returns if the URI is a valid layer field. |
| |
| :param uri: The layer field URI. |
| """ |
| return uri in _LAYER_FIELDS |
| |
| |
| def is_layer_field_container(uri: str) -> bool: |
| """ |
| Returns if the URI is a valid layer field container. |
| |
| :param uri: The layer field container URI. |
| """ |
| return uri in _layer_containers |
| |
| |
| def get_layer_field(packet: RawPacket, field_uri: str) -> Any: |
| """ |
| Get a given layer field from the packet. |
| |
| :param packet: The packet. |
| :param field_uri: The layer field URI. |
| |
| :return: The specified layer field. |
| """ |
| assert isinstance(packet, RawPacket) |
| secs = field_uri.split('.') |
| layer_depth = 0 |
| layer_name = secs[0] |
| if layer_name.endswith('inner'): |
| layer_name = layer_name[:-len('inner')] |
| field_uri = '.'.join([layer_name] + secs[1:]) |
| layer_depth = 1 |
| |
| if is_layer_field(field_uri): |
| candidate_layers = _get_candidate_layers(packet, layer_name) |
| for layers in candidate_layers: |
| if layer_depth >= len(layers): |
| continue |
| layer = layers[layer_depth] |
| v = layer.get_field(field_uri) |
| if v is not None: |
| try: |
| v = _LAYER_FIELDS[field_uri](v) |
| print("[%s = %r] " % (field_uri, v), file=sys.stderr) |
| return v |
| except Exception as ex: |
| raise ValueError('can not parse field %s = %r' % (field_uri, |
| (v.get_default_value(), v.raw_value))) from ex |
| |
| print("[%s = %s] " % (field_uri, "null"), file=sys.stderr) |
| return nullField |
| |
| elif is_layer_field_container(field_uri): |
| from pktverify.layer_fields_container import LayerFieldsContainer |
| return LayerFieldsContainer(packet, field_uri) |
| else: |
| raise NotImplementedError('Field %s is not valid, please add it to `_LAYER_FIELDS`' % field_uri) |
| |
| |
| def check_layer_field_exists(packet, field_uri): |
| """ |
| Check if a given layer field URI exists in the packet. |
| |
| :param packet: The packet to check. |
| :param field_uri: The layer field URI. |
| :return: Whether the layer field URI exists in the packet. |
| """ |
| assert isinstance(packet, RawPacket) |
| secs = field_uri.split('.') |
| layer_name = secs[0] |
| |
| if not is_layer_field(field_uri) and not is_layer_field_container(field_uri): |
| raise NotImplementedError('%s is neither a field or field container' % field_uri) |
| |
| candidate_layers = _get_candidate_layers(packet, layer_name) |
| for layers in candidate_layers: |
| for layer in layers: |
| for k, v in layer._all_fields.items(): |
| if k == field_uri or k.startswith(field_uri + '.'): |
| return True |
| |
| return False |
| |
| |
| def _get_candidate_layers(packet, layer_name): |
| if layer_name == 'thread_meshcop': |
| candidate_layer_names = ['thread_meshcop', 'mle', 'coap', 'thread_bl', 'thread_nm'] |
| elif layer_name == 'thread_nwd': |
| candidate_layer_names = ['mle', 'thread_address', 'thread_diagnostic'] |
| elif layer_name == 'wpan': |
| candidate_layer_names = ['wpan', 'mle'] |
| elif layer_name == 'ip': |
| candidate_layer_names = ['ip', 'ipv6'] |
| elif layer_name == 'thread_bcn': |
| candidate_layer_names = ['thread_bcn'] |
| else: |
| candidate_layer_names = [layer_name] |
| |
| layers = [] |
| for ln in candidate_layer_names: |
| if hasattr(packet, ln): |
| layers.append(packet.get_multiple_layers(ln)) |
| |
| return layers |