blob: 896226e81bc95744458ae64e2b9838e6e91e73c8 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# This is a test script for checking layer fields against a given test.pcap.
#
import logging
import unittest
from pktverify import layer_fields
from pktverify.addrs import EthAddr, ExtAddr, Ipv6Addr
from pktverify.bytes import Bytes
from pktverify.consts import REAL_LAYER_NAMES, VALID_LAYER_NAMES
from pktverify.layer_fields_container import LayerFieldsContainer
from pktverify.null_field import nullField
from pktverify.packet import Packet
from pktverify.pcap_reader import PcapReader
class TestLayerFields(unittest.TestCase):
def test(self):
logging.basicConfig(level=logging.DEBUG)
pkts = PcapReader.read("test.pcap")
for i, p in enumerate(pkts):
logging.info("check packet #%d", i + 1)
for layer_name in VALID_LAYER_NAMES:
if layer_name == 'lowpan': # we already checked 6lowpan
continue
layer = getattr(p, layer_name)
if hasattr(p._packet, layer_name):
if layer_name in REAL_LAYER_NAMES:
self.assertTrue(layer)
checker = getattr(self, '_test_' + layer_name, None)
if checker is None:
continue
try:
checker(p)
except Exception:
layer.show()
raise
else:
if layer_name in REAL_LAYER_NAMES:
self.assertFalse(layer)
for layer in p._packet.layers:
self._check_missing_fields(p, layer.layer_name, layer)
def _test_coap(self, p):
coap = p.coap
self.assertIsInstance(coap.version, int)
self.assertIsInstance(coap.type, int)
self.assertIsInstance(coap.token_len, int)
self.assertIsInstance(coap.code, int)
self.assertIsInstance(coap.mid, int)
self.assertIsInstanceOrNull(coap.token, int)
self.assertIsInstanceOrNull(coap.opt.uri_path_recon, str)
self.assertIsInstanceOrNull(coap.payload, bytearray)
print(p.coap.tlv.type, p.coap.tlv)
assert isinstance(coap.tlv, LayerFieldsContainer), repr(coap.tlv)
self.assertIsInstanceOrNull(coap.tlv.type, list)
def _test_mle(self, p):
mle = p.mle
self._must_have_wpan_aux_sec(p)
self.assertIsInstance(mle.cmd, int)
self.assertIsInstanceOrNull(mle.tlv.mode.receiver_on_idle, int)
self.assertIsInstanceOrNull(mle.tlv.mode.reserved1, int)
self.assertIsInstanceOrNull(mle.tlv.mode.reserved2, int)
self.assertIsInstanceOrNull(mle.tlv.mode.device_type_bit, int)
self.assertIsInstanceOrNull(mle.tlv.mode.network_data, int)
self.assertIsInstanceOrNull(mle.tlv.challenge, Bytes)
self.assertIsInstanceOrNull(mle.tlv.scan_mask.r, int)
self.assertIsInstanceOrNull(mle.tlv.scan_mask.e, int)
self.assertIsInstanceOrNull(mle.tlv.version, int)
self.assertIsInstanceOrNull(mle.tlv.source_addr, int)
self.assertIsInstanceOrNull(mle.tlv.active_tstamp, int)
self.assertIsInstanceOrNull(mle.tlv.leader_data.partition_id, int)
self.assertIsInstanceOrNull(mle.tlv.leader_data.weighting, int)
self.assertIsInstanceOrNull(mle.tlv.leader_data.data_version, int)
self.assertIsInstanceOrNull(mle.tlv.leader_data.stable_data_version, int)
self.assertIsInstanceOrNull(mle.tlv.leader_data.router_id, int)
def _test_wpan(self, p):
wpan = p.wpan
self.assertIsInstance(wpan.fcf, int)
self.assertIsInstance(wpan.fcs, int)
self.assertIsInstance(wpan.security, int)
self.assertIsInstance(wpan.pending, int)
self.assertIsInstance(wpan.ack_request, int)
self.assertIsInstance(wpan.pan_id_compression, int)
self.assertIsInstance(wpan.seqno_suppression, int)
self.assertIsInstance(wpan.ie_present, int)
self.assertIsInstance(wpan.dst_addr_mode, int)
self.assertIsInstance(wpan.version, int)
self.assertIsInstance(wpan.src_addr_mode, int)
self.assertIsInstance(wpan.seq_no, int)
if not wpan.is_ack:
self.assertIsInstanceOrNull(wpan.dst_pan, int)
self.assertIsInstanceOrNull(wpan.dst16, int)
self.assertIsInstanceOrNull(wpan.src16, int)
self.assertIsInstanceOrNull(wpan.src64, ExtAddr)
self.assertIsInstanceOrNull(wpan.dst64, ExtAddr)
if wpan.aux_sec:
self._must_have_wpan_aux_sec(p)
def _must_have_wpan_aux_sec(self, p):
wpan = p.wpan
self.assertIsInstanceOrNull(wpan.aux_sec.sec_suite, int)
self.assertIsInstanceOrNull(wpan.aux_sec.security_control_field, int)
self.assertIsInstanceOrNull(wpan.aux_sec.sec_level, int)
self.assertIsInstanceOrNull(wpan.aux_sec.key_id_mode, int)
self.assertIsInstanceOrNull(wpan.aux_sec.frame_counter_suppression, int)
self.assertIsInstanceOrNull(wpan.aux_sec.asn_in_nonce, int)
self.assertIsInstanceOrNull(wpan.aux_sec.reserved, int)
self.assertIsInstanceOrNull(wpan.aux_sec.frame_counter, int)
self.assertIsInstanceOrNull(wpan.aux_sec.key_source, int)
self.assertIsInstanceOrNull(wpan.aux_sec.key_index, int)
def assertIsInstanceOrNull(self, field, type):
if field is not nullField:
self.assertIsInstance(field, type)
def _test_thread_bl(self, p):
thread_bl = p.thread_bl
self.assertTrue(thread_bl)
self.assertIsInstanceOrNull(thread_bl.tlv.target_eid, Ipv6Addr)
self.assertIsInstanceOrNull(thread_bl.tlv.ml_eid, ExtAddr)
self.assertIsInstanceOrNull(thread_bl.tlv.last_transaction_time, int)
self.assertIsInstanceOrNull(p.thread_meshcop.tlv.net_name, list)
def _test_thread_meshcop(self, p: Packet):
thread_meshcop = p.thread_meshcop
for layer in sorted(p.layers, key=lambda l: l.layer_name):
if 'thread_meshcop.tlv.commissioner_sess_id' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.commissioner_sess_id, int)
if 'thread_meshcop.tlv.net_name' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.net_name, list)
if 'thread_meshcop.tlv.channel_page' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.channel_page, int)
if 'thread_meshcop.tlv.channel' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.channel, list)
if 'thread_meshcop.tlv.chan_mask_page' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.chan_mask_page, int)
if 'thread_meshcop.tlv.chan_mask_len' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.chan_mask_len, int)
if 'thread_meshcop.tlv.chan_mask_mask' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.chan_mask_mask, int)
if 'thread_meshcop.tlv.panid' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.panid, int)
if 'thread_meshcop.tlv.ml_prefix' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.ml_prefix, Bytes)
if 'thread_meshcop.tlv.master_key' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.master_key, Bytes)
if 'thread_meshcop.tlv.pskc' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.pskc, Bytes)
if 'thread_meshcop.tlv.sec_policy_rot' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.sec_policy_rot, int)
if 'thread_meshcop.tlv.sec_policy_o' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.sec_policy_o, int)
if 'thread_meshcop.tlv.sec_policy_n' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.sec_policy_n, int)
if 'thread_meshcop.tlv.sec_policy_r' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.sec_policy_r, int)
if 'thread_meshcop.tlv.sec_policy_c' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.sec_policy_c, int)
if 'thread_meshcop.tlv.sec_policy_b' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.sec_policy_b, int)
if 'thread_meshcop.tlv.pan_id' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.pan_id, list)
if 'thread_meshcop.tlv.xpan_id' in layer._layer._all_fields:
self.assertIsInstance(thread_meshcop.tlv.xpan_id, Bytes)
for field in layer._layer._all_fields:
if field.startswith('thread_meshcop') and not layer_fields.is_layer_field(field) and field not in (
'thread_meshcop.tlv', 'thread_meshcop.tlv.type', 'thread_meshcop.tlv.len8'):
print('found %s = %s in layer %s' % (
field,
layer._layer.get_field(field),
layer.layer_name,
))
def _test_icmpv6(self, p):
icmpv6 = p.icmpv6
self.assertTrue(p.icmpv6)
self.assertIsInstance(icmpv6.type, int)
self.assertIsInstance(icmpv6.code, int)
self.assertIsInstance(icmpv6.checksum, int)
self.assertIsInstanceOrNull(icmpv6.reserved, int)
self.assertIsInstanceOrNull(icmpv6.nd.na.flag.s, int)
self.assertIsInstanceOrNull(icmpv6.nd.na.flag.o, int)
self.assertIsInstanceOrNull(icmpv6.nd.na.flag.r, int)
self.assertIsInstanceOrNull(icmpv6.nd.na.flag.rsv, int)
self.assertIsInstanceOrNull(icmpv6.nd.ra.cur_hop_limit, int)
self.assertIsInstanceOrNull(icmpv6.mldr.nb_mcast_records, int)
self.assertIsInstanceOrNull(icmpv6.nd.ns.target_address, Ipv6Addr)
self.assertIsInstanceOrNull(icmpv6.mldr.mar.multicast_address, list)
def get_field(self, p: Packet, f):
secs = f.split('.')
assert len(secs) >= 2
v = p
for sec in secs:
v = getattr(v, sec)
return v
def _test_6lowpan(self, p):
lowpan = p.lowpan
assert lowpan is getattr(p, '6lowpan')
self.assertIsInstanceOrNull(lowpan.src, Ipv6Addr)
self.assertIsInstanceOrNull(lowpan.dst, Ipv6Addr)
self.assertIsInstanceOrNull(lowpan.udp.src, int)
self.assertIsInstanceOrNull(lowpan.udp.dst, int)
self.assertIsInstanceOrNull(lowpan.udp.checksum, int)
self.assertIsInstanceOrNull(lowpan.frag.size, int)
self.assertIsInstanceOrNull(lowpan.frag.tag, int)
self.assertIsInstanceOrNull(lowpan.frag.offset, int)
self.assertIsInstanceOrNull(lowpan.nhc.pattern, list)
self.assertIsInstanceOrNull(lowpan.nhc.udp.checksum, int)
self.assertIsInstanceOrNull(lowpan.nhc.udp.ports, int)
self.assertIsInstanceOrNull(lowpan.pattern, list)
self.assertIsInstanceOrNull(lowpan.iphc.tf, int)
self.assertIsInstanceOrNull(lowpan.iphc.nh, int)
self.assertIsInstanceOrNull(lowpan.iphc.hlim, int)
self.assertIsInstanceOrNull(lowpan.iphc.cid, int)
self.assertIsInstanceOrNull(lowpan.iphc.sac, int)
self.assertIsInstanceOrNull(lowpan.iphc.sam, int)
self.assertIsInstanceOrNull(lowpan.iphc.m, int)
self.assertIsInstanceOrNull(lowpan.iphc.dac, int)
self.assertIsInstanceOrNull(lowpan.iphc.dam, int)
self.assertIsInstanceOrNull(lowpan.iphc.sctx.prefix, Bytes)
self.assertIsInstanceOrNull(lowpan.iphc.dctx.prefix, Bytes)
def _test_ip(self, p):
pass
def _test_ipv6(self, p):
pass
def _test_udp(self, p):
pass
def _test_eth(self, p):
eth = p.eth
self.assertIsInstance(eth.src, EthAddr)
self.assertIsInstance(eth.dst, EthAddr)
self.assertIsInstance(eth.type, int)
def _check_missing_fields(self, p, layer_name, _layer):
for f in sorted(_layer._all_fields.keys(), reverse=True):
if f.startswith('_ws') or f.startswith('data'):
continue
logging.info('_check_missing_fields in layer %s: %s = %r' % (layer_name, f, _layer._all_fields[f]))
if f in {
'', 'icmpv6.checksum.status', 'ip.ttl.lncb', 'wpan.aux_sec.key_source.bytes', 'wpan.src64.origin'
}:
# TODO: handle these fields
continue
v = _layer._all_fields[f]
if layer_fields.is_layer_field_container(f):
continue
try:
rv = self.get_field(p, f)
self.assertIsNot(rv, nullField)
parser = layer_fields._LAYER_FIELDS[f]
if isinstance(parser, layer_fields._first):
parser = parser._sub_parse
if parser in (layer_fields._raw_hex, layer_fields._hex, layer_fields._raw_hex_rev, layer_fields._dec,
layer_fields._auto):
self.assertIsInstance(rv, int)
elif isinstance(parser, layer_fields._list):
self.assertIsInstance(rv, list)
elif parser is layer_fields._ipv6_addr:
self.assertIsInstance(rv, Ipv6Addr)
elif parser is layer_fields._eth_addr:
self.assertIsInstance(rv, EthAddr)
elif parser is layer_fields._ext_addr:
self.assertIsInstance(rv, ExtAddr)
elif parser is layer_fields._str:
self.assertIsInstance(rv, str)
elif parser is layer_fields._bytes:
self.assertIsInstance(rv, Bytes)
elif parser is layer_fields._payload:
self.assertIsInstance(rv, bytearray)
elif parser is layer_fields._float:
self.assertIsInstance(rv, float)
else:
raise NotImplementedError(parser)
except Exception:
logging.info('checking [%s] %s=%r, %r, %r (%d)' %
(layer_name, f, v, v.get_default_value(), v.raw_value, len(v.fields)))
raise