blob: e2b61c7abf9bd7f4ede7ba7dadc5e93c97c744ba [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2018, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import wpan
from wpan import verify
# -----------------------------------------------------------------------------------------------------------------------
# Test description: Address Cache Table
#
# This test verifies the behavior of `AddressResolver` module and entries in
# address cache table. It also tests the behavior of nodes when there are
# topology changes in the network (e.g., a child switches parent). In
# particular, the test covers the address cache update through snooping, i.e.,
# the logic which inspects forwarded frames to update address cache table if
# source RLOC16 on a received frame differs from an existing entry in the
# address cache table.
test_name = __file__
# Drop a trailing `.py` so the banner shows the bare test name.
if test_name.endswith('.py'):
    test_name = test_name[:-3]
# Banner marking the start of this test in the output.
print('-' * 120)
print('Starting \'{}\''.format(test_name))
# -----------------------------------------------------------------------------------------------------------------------
# Creating `wpan.Node` instances
speedup = 4
wpan.Node.set_time_speedup_factor(speedup)
# Six nodes, constructed one after another in the order r1, r2, r3, c1, c2, c3.
r1, r2, r3, c1, c2, c3 = (wpan.Node() for _ in range(6))
# -----------------------------------------------------------------------------------------------------------------------
# Init all nodes
wpan.Node.init_all_nodes()
# -----------------------------------------------------------------------------------------------------------------------
# Build network topology
#
#     r1 ---- r2 ---- r3
#     |       |       |
#     |       |       |
#     c1     c2(s)    c3
#
# c1 and c3 are FED children, c2 is an SED which is first attached to r2 and
# then forced to switch to r3.
PREFIX = "fd00:1234::"
POLL_INTERVAL = 400


def _link_and_join(first, second, joiner, parent, join_type):
    """Allowlist `first` and `second` on each other, then attach `joiner` to `parent`.

    `first` is allowlisted before `second`; the join is issued last.
    """
    first.allowlist_node(second)
    second.allowlist_node(first)
    joiner.join_node(parent, join_type)


# r1 forms the network and adds the prefix the nodes' addresses are later
# looked up with.
r1.form("addr-cache-tbl")
r1.add_prefix(
    PREFIX,
    stable=True,
    on_mesh=True,
    slaac=True,
    preferred=True,
)
# c1 attaches to r1 as an end device.
_link_and_join(r1, c1, c1, r1, wpan.JOIN_TYPE_END_DEVICE)
# r2 attaches to r1 as a router.
_link_and_join(r1, r2, r2, r1, wpan.JOIN_TYPE_ROUTER)
# c2 attaches to r2 as a sleepy end device, then gets its poll interval set.
_link_and_join(c2, r2, c2, r2, wpan.JOIN_TYPE_SLEEPY_END_DEVICE)
c2.set(wpan.WPAN_POLL_INTERVAL, str(POLL_INTERVAL))
# r3 attaches to r2 as a router.
_link_and_join(r2, r3, r3, r2, wpan.JOIN_TYPE_ROUTER)
# c3 attaches to r3 as an end device.
_link_and_join(c3, r3, c3, r3, wpan.JOIN_TYPE_END_DEVICE)
# -----------------------------------------------------------------------------------------------------------------------
# Test implementation
#
ROUTER_TABLE_WAIT_TIME = 30 / speedup + 5
INVALID_ROUTER_ID = 63
# Confirm that every node reports the expected node type (r1, which formed
# the network, is expected to be the leader).
for node, expected_type in (
    (r1, wpan.NODE_TYPE_LEADER),
    (r2, wpan.NODE_TYPE_ROUTER),
    (r3, wpan.NODE_TYPE_ROUTER),
    (c1, wpan.NODE_TYPE_END_DEVICE),
    (c2, wpan.NODE_TYPE_SLEEPY_END_DEVICE),
    (c3, wpan.NODE_TYPE_END_DEVICE),
):
    verify(node.get(wpan.WPAN_NODE_TYPE) == expected_type)
# The RLOC16 property is parsed as a base-16 string and kept as an integer
# for the comparisons made later in the test.
r2_rloc, r3_rloc, c3_rloc = (int(node.get(wpan.WPAN_THREAD_RLOC16), 16) for node in (r2, r3, c3))
# Wait till we have a valid "next hop" route on r1 towards r3
def check_r1_router_table():
    """Check r1's router table: three entries, and a valid next hop for r3.

    The `verify` check fails when the parsed router table of r1 does not hold
    exactly three entries, or when an entry whose RLOC16 equals `r3_rloc`
    still has `INVALID_ROUTER_ID` as its next hop.
    """
    raw_table = r1.get(wpan.WPAN_THREAD_ROUTER_TABLE)
    routers = wpan.parse_router_table_result(raw_table)
    verify(len(routers) == 3)
    # Only entries describing r3 are inspected; entries for other routers are
    # skipped. The generator is lazy, so entries are examined one at a time.
    for r3_entry in (router for router in routers if router.rloc16 == r3_rloc):
        verify(r3_entry.next_hop != INVALID_ROUTER_ID)
wpan.verify_within(check_r1_router_table, ROUTER_TABLE_WAIT_TIME)
# Look up, on each node, the address that carries the test prefix.
r1_address, c1_address, c2_address, c3_address = (
    node.find_ip6_address_with_prefix(PREFIX) for node in (r1, c1, c2, c3)
)
# Send a single UDP message from r1 to c2, then a single one from r1 to c3.
for dst_node, dst_address, text in (
    (c2, c2_address, "Hi from r1 to c2"),
    (c3, c3_address, "Hi from r1 to c3"),
):
    sender = r1.prepare_tx(r1_address, dst_address, text)
    recver = dst_node.prepare_rx(sender)
    wpan.Node.perform_async_tx_rx()
    verify(sender.was_successful and recver.was_successful)
# The address cache table on r1 should contain two entries for
# c2 and c3 addresses.
expected_rloc_by_address = {
    # Entry for c2 should point towards its parent r2.
    c2_address: r2_rloc,
    # Entry for c3 should point towards c3 itself.
    c3_address: c3_rloc,
}
addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)
for entry in addr_cache_table:
    if entry.address not in expected_rloc_by_address:
        raise wpan.VerifyError("Unknown entry in the address cache table")
    verify(entry.rloc16 == expected_rloc_by_address[entry.address])
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Force c2 to switch its parent from r2 to r3
CHILD_SUPERVISION_CHECK_TIMEOUT = 2
PARENT_SUPERVISION_INTERVAL = 1
# Upper bound used when waiting for c2 to re-attach.
REATTACH_WAIT_TIME = CHILD_SUPERVISION_CHECK_TIMEOUT / speedup + 6
# Properties are written as strings.
check_timeout_text = str(CHILD_SUPERVISION_CHECK_TIMEOUT)
supervision_interval_text = str(PARENT_SUPERVISION_INTERVAL)
c2.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, check_timeout_text)
r3.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, supervision_interval_text)
# Take c2 off r2's allowlist, and allowlist c2 and r3 on each other.
r2.un_allowlist_node(c2)
r3.allowlist_node(c2)
c2.allowlist_node(r3)
# Wait for c2 to detach from r2 and attach to r3.
#
# Upon re-attach, previous parent r2 is notified and should remove c2 from
# its child table.
def check_c2_is_removed_from_r2_child_table():
    """Check that r2's child table is empty, i.e. c2 is no longer listed.

    The `verify` check fails while the parsed child table of r2 still has
    any entry.
    """
    raw_children = r2.get(wpan.WPAN_THREAD_CHILD_TABLE)
    r2_children = wpan.parse_list(raw_children)
    verify(len(r2_children) == 0)
wpan.verify_within(check_c2_is_removed_from_r2_child_table, REATTACH_WAIT_TIME)
# Verify that both c2 and c3 are children of r3 (two child table entries).
r3_child_table_text = r3.get(wpan.WPAN_THREAD_CHILD_TABLE)
child_table = wpan.parse_list(r3_child_table_text)
verify(len(child_table) == 2)
# New network topology
#
# r1 ---- r2 ---- r3
# | /\
# | / \
# c1 c2(s) c3
# From r1 send again to c2 (which is now a child of r3).
#
# Note that r1 still has r2 as the destination for c2's address in its address
# cache table. But since r2 is aware that c2 is no longer its child, when it
# receives the IPv6 message with c2's address, r2 itself would do an address
# query for the address and forward the IPv6 message.
tx = r1.prepare_tx(r1_address, c2_address, "Hi again c2")
rx = c2.prepare_rx(tx)
wpan.Node.perform_async_tx_rx()
verify(tx.was_successful and rx.was_successful)
# The address cache table on r1 should have c2's address removed, leaving
# only the entry for c3.
addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 1)
for entry in addr_cache_table:
    if entry.address != c3_address:
        raise wpan.VerifyError("Unknown entry in the address cache table")
    # Entry for c3 should still point towards c3
    verify(entry.rloc16 == c3_rloc)
# Send another UDP message from r1 to c2.
tx = r1.prepare_tx(r1_address, c2_address, "Hi again c2")
rx = c2.prepare_rx(tx)
wpan.Node.perform_async_tx_rx()
verify(tx.was_successful and rx.was_successful)
# The address cache table on r1 should now have both c2 and c3.
expected_rloc_by_address = {
    # Entry for c2 should point towards r3
    c2_address: r3_rloc,
    # Entry for c3 should still point towards c3
    c3_address: c3_rloc,
}
addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)
for entry in addr_cache_table:
    if entry.address not in expected_rloc_by_address:
        raise wpan.VerifyError("Unknown entry in the address cache table")
    verify(entry.rloc16 == expected_rloc_by_address[entry.address])
# Force c2 to switch its parent from r3 to r2
check_timeout_text = str(CHILD_SUPERVISION_CHECK_TIMEOUT)
supervision_interval_text = str(PARENT_SUPERVISION_INTERVAL)
c2.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, check_timeout_text)
r2.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, supervision_interval_text)
# Take c2 off r3's allowlist, and allowlist c2 and r2 on each other.
r3.un_allowlist_node(c2)
r2.allowlist_node(c2)
c2.allowlist_node(r2)
# Wait for c2 to detach from r3 and attach to r2.
#
# Upon re-attach, previous parent r3 is notified and should remove c2 from
# its child table.
def check_c2_is_removed_from_r3_child_table():
    """Check that r3's child table holds exactly one entry.

    The `verify` check fails unless the parsed child table of r3 has a single
    entry, which is the state expected once c2 has left r3 and only c3 remains.
    """
    raw_children = r3.get(wpan.WPAN_THREAD_CHILD_TABLE)
    r3_children = wpan.parse_list(raw_children)
    verify(len(r3_children) == 1)
wpan.verify_within(check_c2_is_removed_from_r3_child_table, REATTACH_WAIT_TIME)
# Verify that c2 is a child of r2 (r2's child table has a single entry).
r2_child_table_text = r2.get(wpan.WPAN_THREAD_CHILD_TABLE)
child_table = wpan.parse_list(r2_child_table_text)
verify(len(child_table) == 1)
# New network topology
#
# r1 ---- r2 ---- r3
# | | |
# | | |
# c1 c2(s) c3
# Send a UDP message from c2 to c1.
# This message will be forwarded by r1 to its FED child c1.
tx = c2.prepare_tx(c2_address, c1_address, "Hi c1 child of r1")
rx = c1.prepare_rx(tx)
wpan.Node.perform_async_tx_rx()
verify(tx.was_successful and rx.was_successful)
# r1 upon receiving and forwarding the message from c2 (through r2 now) should
# update its address cache table for c2 (address cache update through snooping).
#
# verify that the address cache table is updated correctly.
expected_rloc_by_address = {
    # Entry for c2's address should now point to r2
    c2_address: r2_rloc,
    # Entry for c3's address should still point to c3
    c3_address: c3_rloc,
}
addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)
for entry in addr_cache_table:
    if entry.address not in expected_rloc_by_address:
        raise wpan.VerifyError("Unknown entry in the address cache table")
    verify(entry.rloc16 == expected_rloc_by_address[entry.address])
# Force c2 to switch its parent from r2 to r3 (second time)
CHILD_SUPERVISION_CHECK_TIMEOUT = 2
PARENT_SUPERVISION_INTERVAL = 1
# Upper bound used when waiting for c2 to re-attach.
REATTACH_WAIT_TIME = CHILD_SUPERVISION_CHECK_TIMEOUT / speedup + 6
# Properties are written as strings.
check_timeout_text = str(CHILD_SUPERVISION_CHECK_TIMEOUT)
supervision_interval_text = str(PARENT_SUPERVISION_INTERVAL)
c2.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, check_timeout_text)
r3.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, supervision_interval_text)
# Take c2 off r2's allowlist, and allowlist c2 and r3 on each other.
r2.un_allowlist_node(c2)
r3.allowlist_node(c2)
c2.allowlist_node(r3)
# Wait for c2 to detach from r2 and attach to r3.
#
# Upon re-attach, previous parent r2 is notified and should remove c2 from
# its child table.
def check_c2_is_removed_from_r2_child_table():
    """Check that no child remains in r2's child table.

    The `verify` check fails for as long as the parsed child table of r2 is
    non-empty.
    """
    raw_children = r2.get(wpan.WPAN_THREAD_CHILD_TABLE)
    r2_children = wpan.parse_list(raw_children)
    verify(len(r2_children) == 0)
wpan.verify_within(check_c2_is_removed_from_r2_child_table, REATTACH_WAIT_TIME)
# Verify that both c2 and c3 are children of r3 (two child table entries).
r3_child_table_text = r3.get(wpan.WPAN_THREAD_CHILD_TABLE)
child_table = wpan.parse_list(r3_child_table_text)
verify(len(child_table) == 2)
# New network topology
#
# r1 ---- r2 ---- r3
# | /\
# | / \
# c1 c2(s) c3
# Send a UDP message from c2 to c1.
# This message will be forwarded by r1 to its FED child c1.
tx = c2.prepare_tx(c2_address, c1_address, "Hi c1 child of r1")
rx = c1.prepare_rx(tx)
wpan.Node.perform_async_tx_rx()
verify(tx.was_successful and rx.was_successful)
# r1 upon receiving and forwarding the message from c2 (through r3 now) should
# update its address cache table for c2 (address cache update through snooping).
#
# verify that the address cache table is updated correctly.
expected_rloc_by_address = {
    # Entry for c2's address should now point to r3
    c2_address: r3_rloc,
    # Entry for c3's address should still point to c3
    c3_address: c3_rloc,
}
addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)
for entry in addr_cache_table:
    if entry.address not in expected_rloc_by_address:
        raise wpan.VerifyError("Unknown entry in the address cache table")
    verify(entry.rloc16 == expected_rloc_by_address[entry.address])
# -----------------------------------------------------------------------------------------------------------------------
# Test finished
wpan.Node.finalize_all_nodes()
# Report success only after all nodes have been finalized.
pass_banner = '\'{}\' passed.'.format(test_name)
print(pass_banner)