| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2021, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS' |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| # This test verifies that PBBR sets DUA routes correctly. |
| # |
| import ipaddress |
| import unittest |
| |
| import config |
| import thread_cert |
| from pktverify.consts import ADDR_QRY_URI, BACKBONE_QUERY_URI, BACKBONE_ANSWER_URI, ADDR_NTF_URI |
| from pktverify.packet_verifier import PacketVerifier |
| |
| # Use two channels |
| CH1 = 11 |
| CH2 = 22 |
| |
| PBBR = 1 |
| ROUTER1 = 2 |
| MED1 = 3 |
| HOST = 4 |
| PBBR2 = 5 |
| MED2 = 6 |
| |
| REREG_DELAY = 5 # Seconds |
| MLR_TIMEOUT = 300 # Seconds |
| WAIT_REDUNDANCE = 3 |
| |
| |
| class TestDuaRoutingMed(thread_cert.TestCase): |
| USE_MESSAGE_FACTORY = False |
| |
| # Topology: |
| # ------(eth)---------------------- |
| # | | | |
| # PBBR HOST PBBR2 |
| # / CH1\ | CH2 |
| # MED1 ROUTER1 MED2 |
| # |
| # PBBR2 is in the secondary channel |
| # |
| TOPOLOGY = { |
| PBBR: { |
| 'name': 'PBBR', |
| 'allowlist': [MED1, ROUTER1], |
| 'is_otbr': True, |
| 'version': '1.2', |
| 'channel': CH1, |
| }, |
| ROUTER1: { |
| 'name': 'ROUTER1', |
| 'allowlist': [PBBR], |
| 'version': '1.2', |
| 'channel': CH1, |
| }, |
| MED1: { |
| 'name': 'MED1', |
| 'allowlist': [PBBR], |
| 'version': '1.2', |
| 'channel': CH1, |
| 'mode': 'rn', |
| }, |
| HOST: { |
| 'name': 'HOST', |
| 'is_host': True |
| }, |
| PBBR2: { |
| 'name': 'PBBR2', |
| 'is_otbr': True, |
| 'version': '1.2', |
| 'channel': CH2, |
| }, |
| MED2: { |
| 'name': 'MED2', |
| 'version': '1.2', |
| 'channel': CH2, |
| 'mode': 'rn', |
| }, |
| } |
| |
| def _bootstrap(self): |
| # Bring up HOST |
| self.nodes[HOST].start() |
| |
| # Bring up PBBR |
| self.nodes[PBBR].start() |
| self.simulator.go(5) |
| self.assertEqual('leader', self.nodes[PBBR].get_state()) |
| self.wait_node_state(PBBR, 'leader', 5) |
| |
| self.nodes[PBBR].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT) |
| self.nodes[PBBR].enable_backbone_router() |
| self.nodes[PBBR].set_domain_prefix(config.DOMAIN_PREFIX, 'prosD') |
| self.simulator.go(5) |
| self.assertTrue(self.nodes[PBBR].is_primary_backbone_router) |
| self.assertIsNotNone(self.nodes[PBBR].get_ip6_address(config.ADDRESS_TYPE.DUA)) |
| |
| # Bring up ROUTER1 |
| self.nodes[ROUTER1].start() |
| self.simulator.go(5) |
| self.assertEqual('router', self.nodes[ROUTER1].get_state()) |
| self.assertIsNotNone(self.nodes[ROUTER1].get_ip6_address(config.ADDRESS_TYPE.DUA)) |
| |
| # Bring up MED1 |
| self.nodes[MED1].start() |
| self.simulator.go(5) |
| self.assertEqual('child', self.nodes[MED1].get_state()) |
| self.assertIsNotNone(self.nodes[MED1].get_ip6_address(config.ADDRESS_TYPE.DUA)) |
| |
| # Bring up PBBR2 |
| self.nodes[PBBR2].start() |
| self.simulator.go(5) |
| self.assertEqual('leader', self.nodes[PBBR2].get_state()) |
| self.wait_node_state(PBBR2, 'leader', 5) |
| |
| self.nodes[PBBR2].set_backbone_router(reg_delay=REREG_DELAY, mlr_timeout=MLR_TIMEOUT) |
| self.nodes[PBBR2].enable_backbone_router() |
| self.nodes[PBBR2].set_domain_prefix(config.DOMAIN_PREFIX, 'prosD') |
| self.simulator.go(5) |
| self.assertTrue(self.nodes[PBBR2].is_primary_backbone_router) |
| self.assertIsNotNone(self.nodes[PBBR2].get_ip6_address(config.ADDRESS_TYPE.DUA)) |
| |
| # Bring up MED2 |
| self.nodes[MED2].start() |
| self.simulator.go(5) |
| self.assertEqual('child', self.nodes[MED2].get_state()) |
| self.assertIsNotNone(self.nodes[MED2].get_ip6_address(config.ADDRESS_TYPE.DUA)) |
| |
| def test(self): |
| self._bootstrap() |
| |
| self.collect_ipaddrs() |
| self.collect_rloc16s() |
| |
| self.simulator.go(10) |
| |
| # PBBR2 pings MED2 |
| self.assertTrue(self.nodes[PBBR2].ping(self.nodes[MED2].get_ip6_address(config.ADDRESS_TYPE.DUA))) |
| self.simulator.go(WAIT_REDUNDANCE) |
| |
| # MED2 pings PBBR2 |
| self.assertTrue(self.nodes[MED2].ping(self.nodes[PBBR2].get_ip6_address(config.ADDRESS_TYPE.DUA))) |
| self.simulator.go(WAIT_REDUNDANCE) |
| |
| # PBBR pings PBBR2 |
| self.nodes[PBBR].ping(self.nodes[PBBR2].get_ip6_address(config.ADDRESS_TYPE.DUA)) |
| self.simulator.go(WAIT_REDUNDANCE) |
| |
| # MED1 pings ROUTER1 should succeed |
| self.assertTrue(self.nodes[MED1].ping(self.nodes[ROUTER1].get_ip6_address(config.ADDRESS_TYPE.DUA))) |
| self.simulator.go(WAIT_REDUNDANCE) |
| |
| # MED1 pings MED2 which should succeed |
| MED2_DUA = self.nodes[MED2].get_ip6_address(config.ADDRESS_TYPE.DUA) |
| self.assertTrue(self.nodes[MED1].ping(MED2_DUA)) |
| self.simulator.go(WAIT_REDUNDANCE) |
| |
| def _get_mliid(self, nodeid: int) -> str: |
| mleid = self.nodes[nodeid].get_ip6_address(config.ADDRESS_TYPE.ML_EID) |
| mliid = ipaddress.IPv6Address(mleid).packed[-8:] |
| return ''.join(['%02x' % b for b in mliid]) |
| |
| def verify(self, pv: PacketVerifier): |
| pkts = pv.pkts |
| pv.add_common_vars() |
| pv.summary.show() |
| |
| PBBR = pv.vars['PBBR'] |
| ROUTER1 = pv.vars['ROUTER1'] |
| PBBR2 = pv.vars['PBBR2'] |
| MED1 = pv.vars['MED1'] |
| MED2 = pv.vars['MED2'] |
| MED1_DUA = pv.vars['MED1_DUA'] |
| ROUTER1_DUA = pv.vars['ROUTER1_DUA'] |
| PBBR_DUA = pv.vars['PBBR_DUA'] |
| PBBR2_DUA = pv.vars['PBBR2_DUA'] |
| MED2_DUA = pv.vars['MED2_DUA'] |
| PBBR_ETH = pv.vars['PBBR_ETH'] |
| PBBR2_ETH = pv.vars['PBBR2_ETH'] |
| |
| # PBBR should never send Address Query for MED1 (Child) |
| pkts.filter_wpan_src64(PBBR).filter_RLARMA().filter_coap_request(ADDR_QRY_URI) \ |
| .filter('thread_address.tlv.target_eid == {eid}', eid=MED1_DUA) \ |
| .must_not_next() |
| |
| # PBBR2 should never send Address Query for MED2 (Child) |
| pkts.filter_wpan_src64(PBBR2).filter_RLARMA().filter_coap_request(ADDR_QRY_URI) \ |
| .filter('thread_address.tlv.target_eid == {eid}', eid=MED2_DUA) \ |
| .must_not_next() |
| |
| # MEDs should never send Address Query |
| pkts.filter_wpan_src64(MED1).filter_RLARMA().filter_coap_request(ADDR_QRY_URI).must_not_next() |
| pkts.filter_wpan_src64(MED2).filter_RLARMA().filter_coap_request(ADDR_QRY_URI).must_not_next() |
| |
| # PBBR pings PBBR2 should succeed |
| ping_id = pkts.filter_ipv6_src_dst( |
| PBBR_DUA, PBBR2_DUA).filter_eth_src(PBBR_ETH).filter_ping_request().must_next().icmpv6.echo.identifier |
| pkts.filter_ipv6_src_dst(PBBR2_DUA, |
| PBBR_DUA).filter_eth_src(PBBR2_ETH).filter_ping_reply(identifier=ping_id).must_next() |
| |
| # MED1 pings ROUTER1 |
| ping_request_pkts = pkts.filter_ipv6_src_dst(MED1_DUA, ROUTER1_DUA) |
| # MED1 sends the Ping Request |
| ping_pkt = ping_request_pkts.filter_wpan_src64(MED1).filter_ping_request().must_next() |
| ping_id = ping_pkt.icmpv6.echo.identifier |
| # PBBR sends Address Query for ROUTER1_DUA |
| pkts.filter_wpan_src64(PBBR).filter_RLARMA().filter_coap_request(ADDR_QRY_URI) \ |
| .filter('thread_address.tlv.target_eid == {eid}', eid=ROUTER1_DUA) \ |
| .must_next() |
| # PBBR sends Backbone Query for ROUTER1_DUA |
| pkts.filter_eth_src(PBBR_ETH).filter_coap_request(BACKBONE_QUERY_URI) \ |
| .filter('thread_bl.tlv.target_eid == {eid}', eid=ROUTER1_DUA) \ |
| .must_next() |
| # ROUTER1 sends Address Answer for ROUTER1_DUA |
| pkts.filter_wpan_src64(ROUTER1).filter_coap_request(ADDR_NTF_URI, confirmable=True) \ |
| .filter('thread_address.tlv.target_eid == {eid}', eid=ROUTER1_DUA) \ |
| .must_next() |
| # PBBR forwards Ping Request to ROUTER1, decreasing TTL by 1 |
| ping_request_pkts.filter_wpan_src64(PBBR).filter_ping_request(identifier=ping_id).must_next().must_verify( |
| 'ipv6.hlim == {hlim}', hlim=ping_pkt.ipv6.hlim - 1) |
| |
| # MED1 pings MED2 |
| # Verify Ping Request: MED1 -> PBBR -> PBBR2 -> MED2 |
| ping_request_pkts = pkts.filter_ipv6_src_dst(MED1_DUA, MED2_DUA) |
| # MED1 sends the Ping Request |
| ping_pkt = ping_request_pkts.filter_wpan_src64(MED1).filter_ping_request().must_next() |
| ping_id = ping_pkt.icmpv6.echo.identifier |
| # PBBR sends Address Query for MED2_DUA |
| pkts.filter_wpan_src64(PBBR).filter_RLARMA().filter_coap_request(ADDR_QRY_URI) \ |
| .filter('thread_address.tlv.target_eid == {eid}', eid=MED2_DUA) \ |
| .must_next() |
| # PBBR sends Backbone Query for MED2_DUA |
| pkts.filter_eth_src(PBBR_ETH).filter_coap_request(BACKBONE_QUERY_URI) \ |
| .filter('thread_bl.tlv.target_eid == {eid}', eid=MED2_DUA) \ |
| .must_next() |
| # PBBR2 sends Backbone Answer for MED2_DUA |
| pkts.filter_eth_src(PBBR2_ETH).filter_coap_request(BACKBONE_ANSWER_URI, confirmable=True) \ |
| .filter('thread_bl.tlv.target_eid == {eid}', eid=MED2_DUA) \ |
| .must_next() |
| # PBBR forwards Ping Request to Backbone link, decreasing TTL by 1 |
| ping_request_pkts.filter_eth_src(PBBR_ETH).filter_ping_request(identifier=ping_id).must_next().must_verify( |
| 'ipv6.hlim == {hlim}', hlim=ping_pkt.ipv6.hlim - 1) |
| ping_request_pkts.filter_wpan_src64(PBBR2).filter_ping_request(identifier=ping_id).must_next() |
| # Verify Ping Reply: MED2 -> PBBR2 -> PBBR -> MED1 |
| ping_reply_pkts = pkts.filter_ipv6_src_dst(MED2_DUA, MED1_DUA).filter_ping_reply(identifier=ping_id) |
| ping_reply_pkts.filter_eth_src(PBBR2_ETH).must_next() |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |