blob: 8ebe8f5fa74112d3263a426df4c9837583ff024c [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import os
import sys
import unittest
import config
import debug
from node import Node
PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))
DEFAULT_PARAMS = {
'is_mtd': False,
'is_bbr': False,
'mode': 'rsdn',
'panid': 0xface,
'whitelist': None,
'version': '1.1',
}
"""Default configurations when creating nodes."""
EXTENDED_ADDRESS_BASE = 0x166e0a0000000000
"""Extended address base to keep U/L bit 1. The value is borrowed from Thread Test Harness."""
class NcpSupportMixin():
""" The mixin to check whether a test case supports NCP.
"""
SUPPORT_NCP = True
def __init__(self, *args, **kwargs):
if os.getenv('NODE_TYPE', 'sim') == 'ncp-sim' and not self.SUPPORT_NCP:
# 77 means skip this test case in automake tests
sys.exit(77)
super().__init__(*args, **kwargs)
class TestCase(NcpSupportMixin, unittest.TestCase):
"""The base class for all thread certification test cases.
The `topology` member of sub-class is used to create test topology.
"""
TOPOLOGY = None
def setUp(self):
"""Create simulator, nodes and apply configurations.
"""
self._clean_up_tmp()
self.simulator = config.create_default_simulator()
self.nodes = {}
initial_topology = {}
for i, params in self.TOPOLOGY.items():
if params:
params = dict(DEFAULT_PARAMS, **params)
else:
params = DEFAULT_PARAMS.copy()
initial_topology[i] = params
self.nodes[i] = Node(
i,
params['is_mtd'],
simulator=self.simulator,
version=params['version'],
is_bbr=params['is_bbr'],
)
self.nodes[i].set_panid(params['panid'])
self.nodes[i].set_mode(params['mode'])
if 'partition_id' in params:
self.nodes[i].set_partition_id(params['partition_id'])
if 'channel' in params:
self.nodes[i].set_channel(params['channel'])
if 'masterkey' in params:
self.nodes[i].set_masterkey(params['masterkey'])
if 'network_name' in params:
self.nodes[i].set_network_name(params['network_name'])
if 'router_selection_jitter' in params:
self.nodes[i].set_router_selection_jitter(
params['router_selection_jitter'])
if 'router_upgrade_threshold' in params:
self.nodes[i].set_router_upgrade_threshold(
params['router_upgrade_threshold'])
if 'router_downgrade_threshold' in params:
self.nodes[i].set_router_downgrade_threshold(
params['router_downgrade_threshold'])
if 'timeout' in params:
self.nodes[i].set_timeout(params['timeout'])
if 'active_dataset' in params:
self.nodes[i].set_active_dataset(
params['active_dataset']['timestamp'],
panid=params['active_dataset'].get('panid'),
channel=params['active_dataset'].get('channel'),
channel_mask=params['active_dataset'].get('channel_mask'),
master_key=params['active_dataset'].get('master_key'))
if 'pending_dataset' in params:
self.nodes[i].set_pending_dataset(
params['pending_dataset']['pendingtimestamp'],
params['pending_dataset']['activetimestamp'],
panid=params['pending_dataset'].get('panid'),
channel=params['pending_dataset'].get('channel'))
if 'key_switch_guardtime' in params:
self.nodes[i].set_key_switch_guardtime(
params['key_switch_guardtime'])
if 'key_sequence_counter' in params:
self.nodes[i].set_key_sequence_counter(
params['key_sequence_counter'])
if 'network_id_timeout' in params:
self.nodes[i].set_network_id_timeout(
params['network_id_timeout'])
if 'context_reuse_delay' in params:
self.nodes[i].set_context_reuse_delay(
params['context_reuse_delay'])
if 'max_children' in params:
self.nodes[i].set_max_children(params['max_children'])
# we have to add whitelist after nodes are all created
for i, params in initial_topology.items():
whitelist = params['whitelist']
if not whitelist:
continue
for j in whitelist:
rssi = None
if isinstance(j, tuple):
j, rssi = j
self.nodes[i].add_whitelist(self.nodes[j].get_addr64(),
rssi=rssi)
self.nodes[i].enable_whitelist()
self._inspector = debug.Inspector(self)
def inspect(self):
self._inspector.inspect()
def tearDown(self):
"""Destroy nodes and simulator.
"""
for node in list(self.nodes.values()):
node.stop()
node.destroy()
self.simulator.stop()
del self.nodes
del self.simulator
def flush_all(self):
"""Flush away all captured messages of all nodes.
"""
for i in list(self.nodes.keys()):
self.simulator.get_messages_sent_by(i)
def flush_nodes(self, nodes):
"""Flush away all captured messages of specified nodes.
Args:
nodes (list): nodes whose messages to flush.
"""
for i in nodes:
if i in list(self.nodes.keys()):
self.simulator.get_messages_sent_by(i)
def _clean_up_tmp(self):
"""
Clean up node files in tmp directory
"""
os.system(
f"rm -f tmp/{PORT_OFFSET}_*.flash tmp/{PORT_OFFSET}_*.data tmp/{PORT_OFFSET}_*.swap"
)