| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2019, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| import logging |
| import operator |
| import sys |
| |
| from pktverify import consts |
| from pktverify.test_info import TestInfo |
| |
| |
class NodeSummary(object):
    """
    Per-node record: the node's role, its extended address, and the IP
    addresses that have been attributed to it.

    Every address is stored together with the index passed in when it was
    first added; the address lookups walk the addresses from the highest
    index down to the lowest.
    """

    def __init__(self, role, extaddr):
        self._role = role
        self._extaddr = extaddr
        # Maps ipaddr -> index supplied on the first add_ipaddr() for it.
        self._ipaddrs = {}

    @property
    def role(self):
        """The role this summary was created with."""
        return self._role

    @property
    def extaddr(self):
        """The extended address this summary was created with."""
        return self._extaddr

    @property
    def ipaddr_link_local(self):
        """The highest-indexed address whose `is_link_local` is true, or None."""
        matches = (addr for addr, _ in self._iter_ipaddrs_rev() if addr.is_link_local)
        return next(matches, None)

    @property
    def ipaddr_mleid(self):
        """The highest-indexed address whose `is_mleid` is true, or None."""
        matches = (addr for addr, _ in self._iter_ipaddrs_rev() if addr.is_mleid)
        return next(matches, None)

    def _iter_ipaddrs_rev(self):
        """Return the (ipaddr, index) pairs as a list sorted by descending index."""
        pairs = list(self._ipaddrs.items())
        pairs.sort(key=operator.itemgetter(1), reverse=True)
        return pairs

    def add_ipaddr(self, ipaddr, index):
        """Record `ipaddr` with `index`; an already known address keeps its index."""
        self._ipaddrs.setdefault(ipaddr, index)

    def __str__(self):
        template = "[node {role} extaddr {extaddr} ipaddrs {ipaddrs}]"
        # Addresses are listed in their own sort order, not by index.
        joined = ", ".join(str(addr) for addr in sorted(self._ipaddrs))
        return template.format(role=self._role, extaddr=self.extaddr, ipaddrs=joined)

    __repr__ = __str__
| |
| |
class Summary(object):
    """
    Represents a summary of the test.

    On construction it builds one NodeSummary per entry of
    `test_info.extaddrs`, then walks the packets twice: once to find the
    leader's router ID and once to attach IPv6 source addresses to nodes.
    """

    def __init__(self, pkts, test_info: "TestInfo"):
        """
        :param pkts: the packets to analyze; must be iterable, support `len()`,
            expose a settable `index` and a `save_index()` context manager.
        :param test_info: test metadata; only its `extaddrs` mapping
            (role -> extended address) is read here.
        """
        self._pkts = pkts
        self._test_info = test_info
        # Stays None when no packet carries leader data.
        self._leader_id = None
        self._analyze()

    def iterroles(self):
        """Return the (role, NodeSummary) pairs of all known nodes."""
        return self._role_to_node.items()

    def _analyze(self):
        """Build the node tables, then run each packet pass from index (0, 0)."""
        self._analyze_test_info()

        # NOTE(review): save_index() presumably restores the caller's packet
        # position once the passes are done -- confirm against its definition.
        with self._pkts.save_index():
            for analyze_pass in (self._analyze_leader, self._analyze_packets):
                # Rewind so that every pass sees the packets from the start.
                self._pkts.index = (0, 0)
                analyze_pass()

    def _analyze_test_info(self):
        """Create a NodeSummary for each role and index it by role and by extaddr."""
        self._role_to_node = {}
        self._extaddr_to_node = {}

        for role, extaddr in self._test_info.extaddrs.items():
            # Roles and extended addresses must both be unique in the test.
            assert role not in self._role_to_node
            assert extaddr not in self._extaddr_to_node

            node = NodeSummary(role, extaddr)
            self._role_to_node[role] = node
            self._extaddr_to_node[extaddr] = node

    def _analyze_leader(self):
        """Set `_leader_id` from the first data response / advertisement with leader data."""
        for p in self._pkts:

            if p.mle.cmd in [consts.MLE_DATA_RESPONSE, consts.MLE_ADVERTISEMENT]:

                # NOTE(review): the results are discarded; presumably these
                # lookups are made for their side effects on the layer object
                # -- confirm before removing or reordering them.
                p.mle.__getattr__('tlv')
                p.mle.__getattr__('tlv.leader_data')
                p.mle.__getattr__('tlv.leader_data.router_id')

                tlv = p.mle.tlv
                if tlv.leader_data:
                    self._leader_id = tlv.leader_data.router_id
                    logging.info("leader found in pcap: %d", self._leader_id)
                    break
        else:
            # for/else: reached only when the loop ended without `break`.
            logging.warning("leader not found in pcap")

    def _analyze_packets(self):
        """
        Attribute each IPv6 source address to the node that sent it.

        Only packets that have both a 802.15.4 extended source address and an
        IPv6 source are considered; the packet's position in the iteration is
        stored as the address index.
        """
        for i, p in enumerate(self._pkts):
            # each packet should be either wpan or eth
            assert (p.wpan and not p.eth) or (p.eth and not p.wpan)

            # it is a 802.15.4 packet when wpan is present
            extaddr = p.wpan.src64 if p.wpan else None
            # it is a IPv6 packet when ipv6 is present
            src = p.ipv6.src if p.ipv6 else None

            if not (extaddr and src):
                continue

            if extaddr not in self._extaddr_to_node:
                # logging.warning, not the deprecated logging.warn alias.
                logging.warning("Extaddr %s is not in the testbed", extaddr)
                continue

            self._extaddr_to_node[extaddr].add_ipaddr(src, i)

    def show(self):
        """Write the packet count, role count and every node summary to stderr."""
        show_roles = "\n\t\t".join(map(str, self._role_to_node.values()))
        sys.stderr.write("""{header}
Pcap Summary:
packets = {num_packets}
roles = {num_roles}
{show_roles}
{tailer}
""".format(
            header='>' * 120,
            num_packets=len(self._pkts),
            num_roles=len(self._role_to_node),
            show_roles=show_roles,
            tailer='<' * 120,
        ))

    def ipaddr_mleid_by_role(self, role):
        """Return `ipaddr_mleid` of the node with `role`; KeyError if the role is unknown."""
        return self._role_to_node[role].ipaddr_mleid

    def ipaddr_link_local_by_role(self, role):
        """Return `ipaddr_link_local` of the node with `role`; KeyError if the role is unknown."""
        return self._role_to_node[role].ipaddr_link_local

    def role(self, r):
        """Return the NodeSummary registered for role `r`; KeyError if it is unknown."""
        return self._role_to_node[r]