blob: bd69d352d42a49006b3c3068976cdec8d8a0ae60 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import datetime
import sys
import time
from typing import Any, Union
from pyshark.packet.fields import LayerFieldsContainer, LayerField
from pyshark.packet.packet import Packet as RawPacket
from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
from pktverify.bytes import Bytes
from pktverify.consts import VALID_LAYER_NAMES
from pktverify.null_field import nullField
def _auto(v: Union[LayerFieldsContainer, LayerField]):
    """Parse the layer field automatically according to its format.

    The displayed (default) value and the raw hex value of the field are checked
    against a series of known formats, tried in order; the first format that
    matches decides the result.

    :param v: The layer field to parse.
    :return: The field value as an int.
    :raises ValueError: If none of the known formats matches.
    """
    assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1 or v.get_default_value() is not None, v.fields
    dv = v.get_default_value()
    rv = v.raw_value

    # a '0x'-prefixed displayed value is hexadecimal
    if dv.startswith('0x'):
        return int(dv, 16)

    # displayed value is decimal and agrees with the raw value, either textually
    # or numerically once the raw value is read as hex
    try:
        if dv == rv:
            return int(dv)
        elif int(dv) == int(rv, 16):
            return int(dv)
    except (ValueError, TypeError):
        # dv is not decimal, or rv is None / not hex: fall through to the next format
        pass

    # no raw value to cross-check against: accept a plain decimal displayed value
    if rv is None:
        try:
            return int(dv)
        except (ValueError, TypeError):
            pass

    if ':' in dv and '::' not in dv and dv.replace(':', '') == rv:  # '88:00', '8800'
        return int(rv, 16)

    # timestamp: 'Jan 1, 1970 08:00:00.000000000 CST', '0000000000000000'
    # convert to seconds from 1970, ignore the nanosecond for now since
    # there are integer seconds applied in the test cases
    # NOTE(review): time.mktime interprets the tuple as local time -- confirm
    # the displayed timestamp is always in the local timezone of the test host
    try:
        time_str = datetime.datetime.strptime(dv, "%b %d, %Y %H:%M:%S.%f000 %Z")
        time_in_sec = time.mktime(time_str.utctimetuple())
        return int(time_in_sec)
    except (ValueError, TypeError):
        pass

    # last resort: the raw value is valid hex and the displayed value is decimal;
    # the two are not required to agree here
    try:
        int(rv, 16)
        return int(dv)
    except Exception:
        pass

    raise ValueError((v, v.get_default_value(), v.raw_value))
def _payload(v: Union[LayerFieldsContainer, LayerField]) -> bytearray:
    """Decode the raw hex string of the layer field into a bytearray.

    :param v: The layer field; its raw value must hold an even number of hex digits.
    :return: One byte per pair of hex digits, in order.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    digits = v.raw_value
    assert len(digits) % 2 == 0

    # each two-character slice is one octet
    return bytearray(int(digits[pos:pos + 2], 16) for pos in range(0, len(digits), 2))
def _hex(v: Union[LayerFieldsContainer, LayerField]) -> int:
    """Interpret the displayed value of the layer field as a hexadecimal integer.

    :param v: The layer field to parse.
    :return: The displayed value converted with base 16.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    shown = v.get_default_value()
    return int(shown, 16)
def _raw_hex(v: Union[LayerFieldsContainer, LayerField]) -> int:
    """Return the raw hex value of the layer field as an integer.

    Whenever the displayed value reads as a decimal or as a hexadecimal number,
    it is asserted to agree with the raw value.

    :param v: The layer field to parse.
    :return: The field's `hex_value`.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    raw_int = v.hex_value

    # cross-check against the displayed value read as decimal, if it is one
    try:
        shown_dec = int(v.get_default_value())
    except ValueError:
        pass
    else:
        assert shown_dec == raw_int, (v.get_default_value(), v.raw_value)

    # cross-check against the displayed value read as hexadecimal, if it is one
    try:
        shown_hex = int(v.get_default_value(), 16)
    except ValueError:
        pass
    else:
        assert shown_hex == raw_int, (v.get_default_value(), v.raw_value)

    return raw_int
def _raw_hex_rev(v: Union[LayerFieldsContainer, LayerField]) -> int:
    """Return the raw hex value of the layer field with its octet order reversed.

    Whenever the displayed value reads as a decimal or as a hexadecimal number,
    it is asserted to agree with the reversed raw value.

    :param v: The layer field to parse.
    :return: The integer formed by the raw octets in reverse order.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    raw = v.raw_value
    # cut the raw string into two-character octets, then read them back to front
    pairs = [raw[pos:pos + 2] for pos in range(0, len(raw), 2)]
    swapped = int(''.join(pairs[::-1]), 16)

    try:
        shown_dec = int(v.get_default_value())
    except ValueError:
        pass
    else:
        assert shown_dec == swapped, (v.get_default_value(), v.raw_value)

    try:
        shown_hex = int(v.get_default_value(), 16)
    except ValueError:
        pass
    else:
        assert shown_hex == swapped, (v.get_default_value(), v.raw_value)

    return swapped
def _dec(v: Union[LayerFieldsContainer, LayerField]) -> int:
    """Interpret the displayed value of the layer field as a decimal integer.

    :param v: The layer field to parse.
    :return: The displayed value converted with `int`.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    shown = v.get_default_value()
    return int(shown)
def _float(v: Union[LayerFieldsContainer, LayerField]) -> float:
    """Interpret the displayed value of the layer field as a float.

    :param v: The layer field to parse.
    :return: The displayed value converted with `float`.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    shown = v.get_default_value()
    return float(shown)
def _str(v: Union[LayerFieldsContainer, LayerField]) -> str:
    """Return the displayed value of the layer field as a string.

    :param v: The layer field to parse.
    :return: The displayed value converted with `str`.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    shown = v.get_default_value()
    return str(shown)
def _bytes(v: Union[LayerFieldsContainer, LayerField]) -> Bytes:
    """Wrap the raw value of the layer field in a `Bytes` object.

    :param v: The layer field to parse.
    :return: `Bytes` built from the field's raw value.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    raw = v.raw_value
    return Bytes(raw)
def _ext_addr(v: Union[LayerFieldsContainer, LayerField]) -> ExtAddr:
    """Build an extended address from the displayed value of the layer field.

    :param v: The layer field to parse.
    :return: `ExtAddr` built from the displayed value.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    shown = v.get_default_value()
    return ExtAddr(shown)
def _ipv6_addr(v: Union[LayerFieldsContainer, LayerField]) -> Ipv6Addr:
    """Build an IPv6 address from the displayed value of the layer field.

    :param v: The layer field to parse.
    :return: `Ipv6Addr` built from the displayed value.
    """
    if isinstance(v, LayerFieldsContainer):
        assert len(v.fields) == 1

    shown = v.get_default_value()
    return Ipv6Addr(shown)
def _eth_addr(v: Union[LayerFieldsContainer, LayerField]) -> EthAddr:
    """Build an Ethernet MAC address from the displayed value of the layer field.

    :param v: The layer field to parse.
    :return: `EthAddr` built from the displayed value.
    """
    if isinstance(v, LayerFieldsContainer):
        # report the offending fields when more than one is present
        assert len(v.fields) == 1, v.fields

    shown = v.get_default_value()
    return EthAddr(shown)
def _routerid_set(v: Union[LayerFieldsContainer, LayerField]) -> set:
    """Parse the layer field as a set of router ids.

    Notes: the router ID mask in wireshark is a
           hexadecimal string separated by ':'

    Bit ``k`` of the mask (counting from the least-significant bit at 0) maps to
    router id ``63 - k``.

    :param v: The layer field holding the router ID mask.
    :return: The set of router ids whose bit is set; an empty set when the
        displayed value is not a valid hexadecimal mask.
    """
    assert not isinstance(v, LayerFieldsContainer) or len(v.fields) == 1

    # bound before the try so the unparsable-mask path has something to return
    # (it was previously unbound there and raised UnboundLocalError)
    rid_set = set()

    try:
        ridmask_int = int(str(v.get_default_value()).replace(':', ''), base=16)
    except ValueError:
        return rid_set

    # bit_length() bounds the walk, so it terminates for any parsed integer
    for bit in range(ridmask_int.bit_length()):
        if (ridmask_int >> bit) & 1:
            rid_set.add(63 - bit)

    return rid_set
class _first:
    """Parser adapter that applies a sub-parser to the first layer field only."""

    def __init__(self, sub_parse):
        # callable used to parse the selected field
        self._sub_parse = sub_parse

    def __call__(self, v: Union[LayerFieldsContainer, LayerField]):
        head = v.fields[0]
        return self._sub_parse(head)
class _list:
    """Parser adapter that applies a sub-parser to every layer field and collects a list."""

    def __init__(self, sub_parse):
        # callable used to parse each individual field
        self._sub_parse = sub_parse

    def __call__(self, v: Union[LayerFieldsContainer, LayerField]):
        return list(map(self._sub_parse, v.fields))
# Maps every supported layer field URI ('<layer>.<path>') to the parser that
# converts the pyshark field into a Python value.
# Each URI must appear only once: in a dict display a repeated key silently
# replaces the earlier value.
_LAYER_FIELDS = {
    # WPAN
    'wpan.fcf': _raw_hex_rev,
    'wpan.cmd': _auto,
    'wpan.security': _auto,
    'wpan.frame_type': _auto,
    'wpan.pending': _auto,
    'wpan.ack_request': _auto,
    'wpan.pan_id_compression': _auto,
    'wpan.seqno_suppression': _auto,
    'wpan.ie_present': _auto,
    'wpan.dst_addr_mode': _auto,
    'wpan.version': _auto,
    'wpan.src_addr_mode': _auto,
    'wpan.dst_pan': _auto,
    'wpan.seq_no': _auto,
    'wpan.src16': _auto,
    'wpan.dst16': _auto,
    'wpan.src64': _ext_addr,
    'wpan.dst64': _ext_addr,
    'wpan.fcs': _raw_hex_rev,
    'wpan.fcs_ok': _auto,
    'wpan.frame_length': _dec,
    'wpan.key_number': _auto,
    'wpan.aux_sec.sec_suite': _auto,
    'wpan.aux_sec.security_control_field': _auto,
    'wpan.aux_sec.sec_level': _auto,
    'wpan.aux_sec.key_id_mode': _auto,
    'wpan.aux_sec.frame_counter_suppression': _auto,
    'wpan.aux_sec.asn_in_nonce': _auto,
    'wpan.aux_sec.reserved': _auto,
    'wpan.aux_sec.frame_counter': _auto,
    'wpan.aux_sec.key_source': _auto,
    'wpan.aux_sec.key_index': _auto,
    'wpan.aux_sec.hdr': _str,
    'wpan.mic': _auto,
    'wpan.channel': _auto,
    'wpan.header_ie.id': _list(_auto),
    'wpan.header_ie.csl.period': _auto,
    'wpan.payload_ie.vendor.oui': _auto,

    # MLE
    'mle.cmd': _auto,
    'mle.sec_suite': _hex,
    'mle.tlv.type': _list(_dec),
    'mle.tlv.len': _list(_dec),
    'mle.tlv.mode.receiver_on_idle': _auto,
    'mle.tlv.mode.reserved1': _auto,
    'mle.tlv.mode.reserved2': _auto,
    'mle.tlv.mode.device_type_bit': _auto,
    'mle.tlv.mode.network_data': _auto,
    'mle.tlv.challenge': _bytes,
    'mle.tlv.scan_mask.r': _auto,
    'mle.tlv.scan_mask.e': _auto,
    'mle.tlv.version': _auto,
    'mle.tlv.source_addr': _auto,
    'mle.tlv.active_tstamp': _auto,
    'mle.tlv.pending_tstamp': _auto,
    'mle.tlv.leader_data.partition_id': _auto,
    'mle.tlv.leader_data.weighting': _auto,
    'mle.tlv.leader_data.data_version': _auto,
    'mle.tlv.leader_data.stable_data_version': _auto,
    'mle.tlv.leader_data.router_id': _auto,
    'mle.tlv.route64.nbr_out': _list(_auto),
    'mle.tlv.route64.nbr_in': _list(_auto),
    'mle.tlv.route64.id_seq': _auto,
    'mle.tlv.route64.id_mask': _routerid_set,
    'mle.tlv.route64.cost': _list(_auto),
    'mle.tlv.response': _bytes,
    'mle.tlv.mle_frm_cntr': _auto,
    'mle.tlv.ll_frm_cntr': _auto,
    'mle.tlv.link_margin': _auto,
    'mle.tlv.conn.sed_dgram_cnt': _auto,
    'mle.tlv.conn.sed_buf_size': _auto,
    'mle.tlv.conn.lq3': _auto,
    'mle.tlv.conn.lq2': _auto,
    'mle.tlv.conn.lq1': _auto,
    'mle.tlv.conn.leader_cost': _auto,
    'mle.tlv.conn.id_seq': _auto,
    'mle.tlv.conn.flags.pp': _auto,
    'mle.tlv.conn.active_rtrs': _auto,
    'mle.tlv.timeout': _auto,
    'mle.tlv.addr16': _auto,
    'mle.tlv.channel': _auto,
    'mle.tlv.addr_reg_iid': _list(_auto),
    'mle.tlv.link_enh_ack_flags': _auto,
    'mle.tlv.link_forward_series': _list(_auto),
    # parsed as raw bytes; this key used to be listed twice and the `_bytes`
    # entry was the one in effect
    'mle.tlv.link_requested_type_id_flags': _bytes,
    'mle.tlv.link_sub_tlv': _auto,
    'mle.tlv.link_status_sub_tlv': _auto,
    'mle.tlv.query_id': _auto,
    'mle.tlv.metric_type_id_flags.type': _list(_hex),
    'mle.tlv.metric_type_id_flags.metric': _list(_hex),
    'mle.tlv.metric_type_id_flags.l': _list(_hex),

    # IP
    'ip.version': _auto,
    'ip.src': _str,
    'ip.src_host': _str,
    'ip.dst': _str,
    'ip.dst_host': _str,
    'ip.ttl': _auto,
    'ip.proto': _auto,
    'ip.len': _auto,
    'ip.id': _auto,
    'ip.host': _list(_str),
    'ip.hdr_len': _dec,
    'ip.frag_offset': _auto,
    'ip.flags.rb': _auto,
    'ip.flags.mf': _auto,
    'ip.flags.df': _auto,
    'ip.dsfield.ecn': _auto,
    'ip.dsfield.dscp': _auto,
    'ip.checksum.status': _auto,
    'ip.addr': _list(_str),
    'ip.options.routeralert': _bytes,
    'ip.opt.type.number': _auto,
    'ip.opt.type.copy': _auto,
    'ip.opt.type.class': _auto,
    'ip.opt.ra': _auto,
    'ip.opt.len': _auto,

    # UDP
    'udp.stream': _auto,
    'udp.srcport': _auto,
    'udp.dstport': _auto,
    'udp.length': _auto,
    'udp.port': _list(_dec),
    'udp.checksum.status': _auto,

    # IPv6
    'ipv6.version': _auto,
    'ipv6.src': _ipv6_addr,
    'ipv6.src_host': _ipv6_addr,
    'ipv6.dst': _ipv6_addr,
    'ipv6.dst_host': _ipv6_addr,
    'ipv6.addr': _list(_ipv6_addr),
    'ipv6.tclass.dscp': _auto,
    'ipv6.tclass.ecn': _auto,
    'ipv6.flow': _auto,
    'ipv6.hlim': _auto,
    'ipv6.nxt': _auto,
    'ipv6.hopopts.len': _auto,
    'ipv6.hopopts.nxt': _auto,
    'ipv6.hopopts.len_oct': _dec,
    'ipv6.host': _list(_ipv6_addr),
    'ipv6.plen': _auto,
    'ipv6.opt.type.rest': _list(_auto),
    'ipv6.opt.type.change': _list(_auto),
    'ipv6.opt.type.action': _list(_auto),
    'ipv6.opt.router_alert': _auto,
    'ipv6.opt.padn': _str,
    'ipv6.opt.length': _list(_auto),
    'ipv6.opt.mpl.seed_id': _bytes,
    'ipv6.opt.mpl.sequence': _auto,
    'ipv6.opt.mpl.flag.v': _auto,
    'ipv6.opt.mpl.flag.s': _auto,
    'ipv6.opt.mpl.flag.rsv': _auto,
    'ipv6.opt.mpl.flag.m': _auto,

    # Eth
    'eth.src': _eth_addr,
    'eth.src_resolved': _eth_addr,
    'eth.dst': _eth_addr,
    'eth.dst_resolved': _eth_addr,
    'eth.type': _auto,
    'eth.addr': _list(_eth_addr),
    'eth.addr_resolved': _list(_eth_addr),
    'eth.ig': _list(_auto),
    'eth.lg': _list(_auto),

    # 6LOWPAN
    '6lowpan.src': _ipv6_addr,
    '6lowpan.dst': _ipv6_addr,
    '6lowpan.udp.src': _auto,
    '6lowpan.udp.dst': _auto,
    '6lowpan.udp.checksum': _auto,
    '6lowpan.frag.offset': _auto,
    '6lowpan.frag.tag': _auto,
    '6lowpan.frag.size': _auto,
    '6lowpan.pattern': _list(_auto),
    '6lowpan.hops': _auto,
    '6lowpan.padding': _auto,
    '6lowpan.next': _auto,
    '6lowpan.flow': _auto,
    '6lowpan.ecn': _auto,
    '6lowpan.iphc.tf': _auto,
    '6lowpan.iphc.m': _auto,
    '6lowpan.iphc.nh': _auto,
    '6lowpan.iphc.hlim': _auto,
    '6lowpan.iphc.cid': _auto,
    '6lowpan.iphc.sac': _auto,
    '6lowpan.iphc.sam': _auto,
    '6lowpan.iphc.dac': _auto,
    '6lowpan.iphc.dam': _auto,
    '6lowpan.iphc.sci': _auto,
    '6lowpan.iphc.dci': _auto,
    '6lowpan.iphc.sctx.prefix': _ipv6_addr,
    '6lowpan.iphc.dctx.prefix': _ipv6_addr,
    '6lowpan.mesh.v': _auto,
    '6lowpan.nhc.pattern': _list(_auto),
    '6lowpan.nhc.udp.checksum': _auto,
    '6lowpan.nhc.udp.ports': _auto,
    '6lowpan.nhc.ext.nh': _auto,
    '6lowpan.nhc.ext.length': _auto,
    '6lowpan.nhc.ext.eid': _auto,
    '6lowpan.reassembled.length': _auto,
    '6lowpan.fragments': _str,
    '6lowpan.fragment.count': _auto,
    '6lowpan.mesh.orig16': _auto,
    '6lowpan.mesh.hops8': _auto,
    '6lowpan.mesh.hops': _auto,
    '6lowpan.mesh.f': _auto,
    '6lowpan.mesh.dest16': _auto,

    # ICMPv6
    'icmpv6.type': _first(_auto),
    'icmpv6.code': _first(_auto),
    'icmpv6.checksum': _first(_auto),
    'icmpv6.reserved': _raw_hex,
    'icmpv6.resptime': _float,
    'icmpv6.resp_to': _auto,
    'icmpv6.mldr.nb_mcast_records': _auto,
    'icmpv6.nd.ra.cur_hop_limit': _auto,
    'icmpv6.nd.ns.target_address': _ipv6_addr,
    'icmpv6.nd.na.target_address': _ipv6_addr,
    'icmpv6.nd.na.flag.s': _auto,
    'icmpv6.nd.na.flag.o': _auto,
    'icmpv6.nd.na.flag.r': _auto,
    'icmpv6.nd.na.flag.rsv': _auto,
    'icmpv6.mldr.mar.record_type': _list(_auto),
    'icmpv6.mldr.mar.aux_data_len': _list(_auto),
    'icmpv6.mldr.mar.nb_sources': _list(_auto),
    'icmpv6.mldr.mar.multicast_address': _list(_ipv6_addr),
    'icmpv6.opt.type': _list(_auto),
    'icmpv6.opt.nonce': _bytes,
    'icmpv6.opt.linkaddr': _eth_addr,
    'icmpv6.opt.src_linkaddr': _eth_addr,
    'icmpv6.opt.target_linkaddr': _eth_addr,
    'icmpv6.opt.route_lifetime': _auto,
    'icmpv6.opt.route_info.flag.route_preference': _auto,
    'icmpv6.opt.route_info.flag.reserved': _auto,
    'icmpv6.opt.prefix.valid_lifetime': _auto,
    'icmpv6.opt.prefix.preferred_lifetime': _auto,
    'icmpv6.opt.prefix.length': _list(_auto),
    'icmpv6.opt.prefix.flag.reserved': _auto,
    'icmpv6.opt.prefix.flag.r': _auto,
    'icmpv6.opt.prefix.flag.l': _auto,
    'icmpv6.opt.prefix.flag.a': _auto,
    'icmpv6.opt.length': _list(_auto),
    'icmpv6.opt.reserved': _str,
    'icmpv6.nd.ra.router_lifetime': _auto,
    'icmpv6.nd.ra.retrans_timer': _auto,
    'icmpv6.nd.ra.reachable_time': _auto,
    'icmpv6.nd.ra.flag.rsv': _auto,
    'icmpv6.nd.ra.flag.prf': _auto,
    'icmpv6.nd.ra.flag.p': _auto,
    'icmpv6.nd.ra.flag.o': _auto,
    'icmpv6.nd.ra.flag.m': _auto,
    'icmpv6.nd.ra.flag.h': _auto,
    'icmpv6.echo.sequence_number': _auto,
    'icmpv6.echo.identifier': _auto,
    'icmpv6.data.len': _auto,

    # COAP
    'coap.code': _auto,
    'coap.version': _auto,
    'coap.type': _auto,
    'coap.mid': _auto,
    'coap.token_len': _auto,
    'coap.token': _auto,
    'coap.opt.uri_path': _list(_str),
    'coap.opt.name': _list(_str),
    'coap.opt.length': _list(_auto),
    'coap.opt.uri_path_recon': _str,
    'coap.payload': _payload,
    'coap.payload_length': _auto,
    'coap.payload_desc': _str,
    'coap.opt.end_marker': _auto,
    'coap.opt.desc': _list(_str),
    'coap.opt.delta': _list(_auto),
    'coap.response_to': _auto,
    'coap.response_time': _float,

    # COAP TLVS
    'coap.tlv.type': _list(_auto),
    'coap.tlv.status': _auto,
    'coap.tlv.target_eid': _ipv6_addr,
    'coap.tlv.ml_eid': _ext_addr,
    'coap.tlv.last_transaction_time': _auto,
    'coap.tlv.rloc16': _auto,
    'coap.tlv.net_name': _str,
    'coap.tlv.ext_mac_addr': _ext_addr,
    'coap.tlv.router_mask_assigned': _auto,
    'coap.tlv.router_mask_id_seq': _auto,

    # dtls
    'dtls.handshake.type': _list(_auto),
    'dtls.handshake.cookie': _auto,
    'dtls.record.content_type': _list(_auto),
    'dtls.alert_message.desc': _auto,

    # thread beacon
    'thread_bcn.protocol': _auto,
    'thread_bcn.version': _auto,
    'thread_bcn.network_name': _str,
    'thread_bcn.epid': _ext_addr,

    # thread_address
    'thread_address.tlv.len': _list(_auto),
    'thread_address.tlv.type': _list(_auto),
    'thread_address.tlv.status': _auto,
    'thread_address.tlv.target_eid': _ipv6_addr,
    'thread_address.tlv.ext_mac_addr': _ext_addr,
    'thread_address.tlv.router_mask_id_seq': _auto,
    'thread_address.tlv.router_mask_assigned': _bytes,
    'thread_address.tlv.rloc16': _hex,
    'thread_address.tlv.ml_eid': _ext_addr,

    # thread bl
    'thread_bl.tlv.type': _list(_auto),
    'thread_bl.tlv.len': _list(_auto),
    'thread_bl.tlv.target_eid': _ipv6_addr,
    'thread_bl.tlv.ml_eid': _ext_addr,
    'thread_bl.tlv.last_transaction_time': _auto,
    'thread_bl.tlv.timeout': _auto,

    # THREAD NM
    'thread_nm.tlv.type': _list(_auto),
    'thread_nm.tlv.ml_eid': _ext_addr,
    'thread_nm.tlv.target_eid': _ipv6_addr,
    'thread_nm.tlv.status': _auto,
    'thread_nm.tlv.timeout': _auto,

    # thread_meshcop is not a real layer
    'thread_meshcop.len_size_mismatch': _str,
    'thread_meshcop.tlv.type': _list(_auto),
    'thread_meshcop.tlv.len8': _list(_auto),
    'thread_meshcop.tlv.net_name': _list(_str),  # from thread_bl
    'thread_meshcop.tlv.commissioner_id': _str,
    'thread_meshcop.tlv.commissioner_sess_id': _auto,  # from mle
    'thread_meshcop.tlv.channel_page': _auto,  # from ble
    'thread_meshcop.tlv.channel': _list(_auto),  # from ble
    'thread_meshcop.tlv.chan_mask': _str,  # from ble
    'thread_meshcop.tlv.chan_mask_page': _auto,
    'thread_meshcop.tlv.chan_mask_len': _auto,
    'thread_meshcop.tlv.chan_mask_mask': _bytes,
    'thread_meshcop.tlv.discovery_req_ver': _auto,
    'thread_meshcop.tlv.discovery_rsp_ver': _auto,
    'thread_meshcop.tlv.discovery_rsp_n': _auto,
    'thread_meshcop.tlv.energy_list': _list(_auto),
    'thread_meshcop.tlv.pan_id': _list(_auto),
    'thread_meshcop.tlv.xpan_id': _bytes,
    'thread_meshcop.tlv.ml_prefix': _bytes,
    'thread_meshcop.tlv.master_key': _bytes,
    'thread_meshcop.tlv.pskc': _bytes,
    'thread_meshcop.tlv.sec_policy_rot': _auto,
    'thread_meshcop.tlv.sec_policy_o': _auto,
    'thread_meshcop.tlv.sec_policy_n': _auto,
    'thread_meshcop.tlv.sec_policy_r': _auto,
    'thread_meshcop.tlv.sec_policy_c': _auto,
    'thread_meshcop.tlv.sec_policy_b': _auto,
    'thread_meshcop.tlv.state': _auto,
    'thread_meshcop.tlv.steering_data': _bytes,
    'thread_meshcop.tlv.unknown': _bytes,
    'thread_meshcop.tlv.udp_port': _list(_auto),
    'thread_meshcop.tlv.ba_locator': _auto,
    'thread_meshcop.tlv.jr_locator': _auto,
    'thread_meshcop.tlv.active_tstamp': _auto,
    'thread_meshcop.tlv.pending_tstamp': _auto,
    'thread_meshcop.tlv.delay_timer': _auto,
    'thread_meshcop.tlv.ipv6_addr': _list(_ipv6_addr),

    # THREAD NWD
    'thread_nwd.tlv.type': _list(_auto),
    'thread_nwd.tlv.len': _list(_auto),
    'thread_nwd.tlv.stable': _list(_auto),
    'thread_nwd.tlv.service.t': _auto,
    'thread_nwd.tlv.service.s_id': _auto,
    'thread_nwd.tlv.service.s_data_len': _auto,
    'thread_nwd.tlv.service.s_data.seqno': _auto,
    'thread_nwd.tlv.service.s_data.rrdelay': _auto,
    'thread_nwd.tlv.service.s_data.mlrtimeout': _auto,
    'thread_nwd.tlv.server_16': _list(_auto),
    'thread_nwd.tlv.border_router_16': _list(_auto),
    'thread_nwd.tlv.sub_tlvs': _list(_str),
    # TODO: support thread_nwd.tlv.prefix.length and thread_nwd.tlv.prefix.domain_id
    'thread_nwd.tlv.prefix': _list(_ipv6_addr),
    'thread_nwd.tlv.border_router.pref': _auto,
    'thread_nwd.tlv.border_router.flag.s': _list(_auto),
    'thread_nwd.tlv.border_router.flag.r': _list(_auto),
    'thread_nwd.tlv.border_router.flag.p': _list(_auto),
    'thread_nwd.tlv.border_router.flag.o': _list(_auto),
    'thread_nwd.tlv.border_router.flag.n': _list(_auto),
    'thread_nwd.tlv.border_router.flag.dp': _list(_auto),
    'thread_nwd.tlv.border_router.flag.d': _list(_auto),
    'thread_nwd.tlv.border_router.flag.c': _list(_auto),
    'thread_nwd.tlv.6co.flag.reserved': _auto,
    'thread_nwd.tlv.6co.flag.cid': _auto,
    'thread_nwd.tlv.6co.flag.c': _list(_auto),
    'thread_nwd.tlv.6co.context_length': _auto,

    # Thread Diagnostic
    'thread_diagnostic.tlv.type': _list(_auto),
    'thread_diagnostic.tlv.len8': _list(_auto),
    'thread_diagnostic.tlv.general': _list(_str),

    # DNS
    'dns.resp.ttl': _auto,
    'dns.flags.response': _auto,
}
# Every proper dotted prefix of a field URI that is longer than the bare layer
# name (e.g. 'a.b' and 'a.b.c' for the field 'a.b.c.d') is a field container.
_layer_containers = set()

for key in _LAYER_FIELDS:
    # field URIs must not carry any whitespace
    assert key == key.strip() and ' ' not in key, key

    secs = key.split('.')
    assert len(secs) >= 2
    assert secs[0] in VALID_LAYER_NAMES, secs[0]

    for i in range(len(secs) - 2):
        # prefix made of the layer name plus the first i + 1 path sections
        path = '.'.join(secs[:i + 2])
        assert path not in _LAYER_FIELDS, '%s can not be both field and path' % path
        _layer_containers.add(path)
def is_layer_field(uri: str) -> bool:
    """
    Returns if the URI is a valid layer field.

    A URI is valid when it is one of the keys of `_LAYER_FIELDS`.

    :param uri: The layer field URI.
    :return: Whether the URI is a registered layer field.
    """
    return uri in _LAYER_FIELDS
def is_layer_field_container(uri: str) -> bool:
    """
    Returns if the URI is a valid layer field container.

    A URI is a container when it is a member of `_layer_containers`, the set of
    dotted prefixes derived from the registered layer field URIs.

    :param uri: The layer field container URI.
    :return: Whether the URI is a known layer field container.
    """
    return uri in _layer_containers
def get_layer_field(packet: RawPacket, field_uri: str) -> Any:
    """
    Get a given layer field from the packet.

    When the layer name of the URI ends with ``inner``, that suffix is stripped
    and the field is read from index 1 (instead of index 0) of each candidate
    list of layers.

    :param packet: The packet.
    :param field_uri: The layer field URI.
    :return: The parsed layer field, `nullField` when no candidate layer carries
        the field, or a `LayerFieldsContainer` when the URI names a container.
    :raises ValueError: If the field is present but its parser fails.
    :raises NotImplementedError: If the URI is neither a registered field nor a
        field container.
    """
    assert isinstance(packet, RawPacket)
    secs = field_uri.split('.')
    layer_depth = 0
    layer_name = secs[0]
    if layer_name.endswith('inner'):
        layer_name = layer_name[:-len('inner')]
        field_uri = '.'.join([layer_name] + secs[1:])
        layer_depth = 1

    if is_layer_field(field_uri):
        parse = _LAYER_FIELDS[field_uri]

        for layers in _get_candidate_layers(packet, layer_name):
            if layer_depth >= len(layers):
                continue

            raw = layers[layer_depth].get_field(field_uri)
            if raw is None:
                continue

            # Only the parser call is guarded, and the raw field keeps its own
            # name, so the error message is always built from the raw field
            # rather than from an already-parsed value.
            try:
                value = parse(raw)
            except Exception as ex:
                raise ValueError('can not parse field %s = %r' %
                                 (field_uri, (raw.get_default_value(), raw.raw_value))) from ex

            print("[%s = %r] " % (field_uri, value), file=sys.stderr)
            return value

        print("[%s = %s] " % (field_uri, "null"), file=sys.stderr)
        return nullField

    if is_layer_field_container(field_uri):
        # imported here rather than at module level -- presumably to avoid a
        # circular import; confirm before moving it
        from pktverify.layer_fields_container import LayerFieldsContainer
        return LayerFieldsContainer(packet, field_uri)

    raise NotImplementedError('Field %s is not valid, please add it to `_LAYER_FIELDS`' % field_uri)
def check_layer_field_exists(packet, field_uri):
    """
    Tell whether the packet carries the given layer field or field container.

    A match is any field name, in any candidate layer, that equals the URI or
    starts with the URI followed by a dot.

    :param packet: The packet to check.
    :param field_uri: The layer field (or field container) URI.
    :return: Whether the layer field URI exists in the packet.
    :raises NotImplementedError: If the URI is neither a registered field nor a
        field container.
    """
    assert isinstance(packet, RawPacket)

    if not (is_layer_field(field_uri) or is_layer_field_container(field_uri)):
        raise NotImplementedError('%s is neither a field or field container' % field_uri)

    layer_name = field_uri.split('.')[0]
    child_prefix = field_uri + '.'

    for layer_group in _get_candidate_layers(packet, layer_name):
        for layer in layer_group:
            if any(name == field_uri or name.startswith(child_prefix) for name in layer._all_fields):
                return True

    return False
def _get_candidate_layers(packet, layer_name):
if layer_name == 'thread_meshcop':
candidate_layer_names = ['thread_meshcop', 'mle', 'coap', 'thread_bl', 'thread_nm']
elif layer_name == 'thread_nwd':
candidate_layer_names = ['mle', 'thread_address', 'thread_diagnostic']
elif layer_name == 'wpan':
candidate_layer_names = ['wpan', 'mle']
elif layer_name == 'ip':
candidate_layer_names = ['ip', 'ipv6']
elif layer_name == 'thread_bcn':
candidate_layer_names = ['thread_bcn']
else:
candidate_layer_names = [layer_name]
layers = []
for ln in candidate_layer_names:
if hasattr(packet, ln):
layers.append(packet.get_multiple_layers(ln))
return layers