blob: 50b36946b7557ea16badd1b1ccc66fa0e7f9f86c [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import binascii
import ipaddress
import logging
import os
import re
import socket
import subprocess
import sys
import time
import traceback
import typing
import unittest
from ipaddress import IPv6Address, IPv6Network
from typing import Union, Dict, Optional, List, Any
import pexpect
import pexpect.popen_spawn
import config
import simulator
import thread_cert
PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))
class OtbrDocker:
RESET_DELAY = 3
_socat_proc = None
_ot_rcp_proc = None
_docker_proc = None
def __init__(self, nodeid: int, **kwargs):
self.verbose = int(float(os.getenv('VERBOSE', 0)))
try:
self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid)
self._prepare_ot_rcp_sim(nodeid)
self._launch_docker()
except Exception:
traceback.print_exc()
self.destroy()
raise
def _prepare_ot_rcp_sim(self, nodeid: int):
self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'],
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL)
line = self._socat_proc.stderr.readline().decode('ascii').strip()
self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:]
line = self._socat_proc.stderr.readline().decode('ascii').strip()
self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:]
logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}")
ot_rcp_path = self._get_ot_rcp_path()
self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}",
shell=True,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
try:
self._ot_rcp_proc.wait(1)
except subprocess.TimeoutExpired:
# We expect ot-rcp not to quit in 1 second.
pass
else:
raise Exception(f"ot-rcp {nodeid} exited unexpectedly!")
def _get_ot_rcp_path(self) -> str:
srcdir = os.environ['top_builddir']
path = '%s/examples/apps/ncp/ot-rcp' % srcdir
logging.info("ot-rcp path: %s", path)
return path
def _launch_docker(self):
logging.info(f'Docker image: {config.OTBR_DOCKER_IMAGE}')
subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True)
CI_ENV = os.getenv('CI_ENV', '').split()
os.makedirs('/tmp/coverage/', exist_ok=True)
self._docker_proc = subprocess.Popen(['docker', 'run'] + CI_ENV + [
'--rm',
'--name',
self._docker_name,
'--network',
config.BACKBONE_DOCKER_NETWORK_NAME,
'-i',
'--sysctl',
'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1',
'--privileged',
'--cap-add=NET_ADMIN',
'--volume',
f'{self._rcp_device}:/dev/ttyUSB0',
'-v',
'/tmp/coverage/:/tmp/coverage/',
config.OTBR_DOCKER_IMAGE,
'-B',
config.BACKBONE_IFNAME,
'--trel-url',
f'trel://{config.BACKBONE_IFNAME}',
],
stdin=subprocess.DEVNULL,
stdout=sys.stdout,
stderr=sys.stderr)
launch_docker_deadline = time.time() + 300
launch_ok = False
while time.time() < launch_docker_deadline:
try:
subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True)
launch_ok = True
logging.info("OTBR Docker %s Is Ready!", self._docker_name)
break
except subprocess.CalledProcessError:
time.sleep(5)
continue
assert launch_ok
self.start_ot_ctl()
def __repr__(self):
return f'OtbrDocker<{self.nodeid}>'
def start_otbr_service(self):
self.bash('service otbr-agent start')
self.simulator.go(3)
self.start_ot_ctl()
def stop_otbr_service(self):
self.stop_ot_ctl()
self.bash('service otbr-agent stop')
def stop_mdns_service(self):
self.bash('service avahi-daemon stop')
self.bash('service mdns stop')
def start_mdns_service(self):
self.bash('service avahi-daemon start')
self.bash('service mdns start')
def start_ot_ctl(self):
cmd = f'docker exec -i {self._docker_name} ot-ctl'
self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=30)
if self.verbose:
self.pexpect.logfile_read = sys.stdout.buffer
# Add delay to ensure that the process is ready to receive commands.
timeout = 0.4
while timeout > 0:
self.pexpect.send('\r\n')
try:
self.pexpect.expect('> ', timeout=0.1)
break
except pexpect.TIMEOUT:
timeout -= 0.1
def stop_ot_ctl(self):
self.pexpect.sendeof()
self.pexpect.wait()
self.pexpect.proc.kill()
def destroy(self):
logging.info("Destroying %s", self)
self._shutdown_docker()
self._shutdown_ot_rcp()
self._shutdown_socat()
def _shutdown_docker(self):
if self._docker_proc is None:
return
try:
COVERAGE = int(os.getenv('COVERAGE', '0'))
OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0'))
test_name = os.getenv('TEST_NAME')
unique_node_id = f'{test_name}-{PORT_OFFSET}-{self.nodeid}'
if COVERAGE or OTBR_COVERAGE:
self.bash('service otbr-agent stop')
cov_file_path = f'/tmp/coverage/coverage-{unique_node_id}.info'
# Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage.
if OTBR_COVERAGE:
codecov_cmd = f'lcov --directory . --capture --output-file {cov_file_path}'
else:
codecov_cmd = ('lcov --directory build/otbr/third_party/openthread/repo --capture '
f'--output-file {cov_file_path}')
self.bash(codecov_cmd)
copyCore = subprocess.run(f'docker cp {self._docker_name}:/core ./coredump_{unique_node_id}', shell=True)
if copyCore.returncode == 0:
subprocess.check_call(
f'docker cp {self._docker_name}:/usr/sbin/otbr-agent ./otbr-agent_{unique_node_id}', shell=True)
finally:
subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True)
self._docker_proc.wait()
del self._docker_proc
def _shutdown_ot_rcp(self):
if self._ot_rcp_proc is not None:
self._ot_rcp_proc.kill()
self._ot_rcp_proc.wait()
del self._ot_rcp_proc
def _shutdown_socat(self):
if self._socat_proc is not None:
self._socat_proc.stderr.close()
self._socat_proc.kill()
self._socat_proc.wait()
del self._socat_proc
def bash(self, cmd: str, encoding='ascii') -> List[str]:
logging.info("%s $ %s", self, cmd)
proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=sys.stderr,
encoding=encoding)
with proc:
lines = []
while True:
line = proc.stdout.readline()
if not line:
break
lines.append(line)
logging.info("%s $ %r", self, line.rstrip('\r\n'))
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines))
else:
return lines
def dns_dig(self, server: str, name: str, qtype: str):
"""
Run dig command to query a DNS server.
Args:
server: the server address.
name: the name to query.
qtype: the query type (e.g. AAAA, PTR, TXT, SRV).
Returns:
The dig result similar as below:
{
"opcode": "QUERY",
"status": "NOERROR",
"id": "64144",
"QUESTION": [
('google.com.', 'IN', 'AAAA')
],
"ANSWER": [
('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::71'),
('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::8a'),
('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::66'),
('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::8b'),
],
"ADDITIONAL": [
],
}
"""
output = self.bash(f'dig -6 @{server} \'{name}\' {qtype}', encoding='raw_unicode_escape')
section = None
dig_result = {
'QUESTION': [],
'ANSWER': [],
'ADDITIONAL': [],
}
for line in output:
line = line.strip()
if line.startswith(';; ->>HEADER<<- '):
headers = line[len(';; ->>HEADER<<- '):].split(', ')
for header in headers:
key, val = header.split(': ')
dig_result[key] = val
continue
if line == ';; QUESTION SECTION:':
section = 'QUESTION'
continue
elif line == ';; ANSWER SECTION:':
section = 'ANSWER'
continue
elif line == ';; ADDITIONAL SECTION:':
section = 'ADDITIONAL'
continue
elif section and not line:
section = None
continue
if section:
assert line
if section == 'QUESTION':
assert line.startswith(';')
line = line[1:]
record = list(line.split())
if section == 'QUESTION':
if record[2] in ('SRV', 'TXT'):
record[0] = self.__unescape_dns_instance_name(record[0])
else:
record[1] = int(record[1])
if record[3] == 'SRV':
record[0] = self.__unescape_dns_instance_name(record[0])
record[4], record[5], record[6] = map(int, [record[4], record[5], record[6]])
elif record[3] == 'TXT':
record[0] = self.__unescape_dns_instance_name(record[0])
record[4:] = [self.__parse_dns_dig_txt(line)]
elif record[3] == 'PTR':
record[4] = self.__unescape_dns_instance_name(record[4])
dig_result[section].append(tuple(record))
return dig_result
@staticmethod
def __unescape_dns_instance_name(name: str) -> str:
new_name = []
i = 0
while i < len(name):
c = name[i]
if c == '\\':
assert i + 1 < len(name), name
if name[i + 1].isdigit():
assert i + 3 < len(name) and name[i + 2].isdigit() and name[i + 3].isdigit(), name
new_name.append(chr(int(name[i + 1:i + 4])))
i += 3
else:
new_name.append(name[i + 1])
i += 1
else:
new_name.append(c)
i += 1
return ''.join(new_name)
def __parse_dns_dig_txt(self, line: str):
# Example TXT entry:
# "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
txt = {}
for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
if entry == "":
continue
k, v = entry.split('=', 1)
txt[k] = v
return txt
def _setup_sysctl(self):
self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra=2')
self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra_rt_info_max_plen=64')
class OtCli:
RESET_DELAY = 0.1
def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs):
self.verbose = int(float(os.getenv('VERBOSE', 0)))
self.node_type = os.getenv('NODE_TYPE', 'sim')
self.env_version = os.getenv('THREAD_VERSION', '1.1')
self.is_bbr = is_bbr
self._initialized = False
if os.getenv('COVERAGE', 0) and os.getenv('CC', 'gcc') == 'gcc':
self._cmd_prefix = '/usr/bin/env GCOV_PREFIX=%s/ot-run/%s/ot-gcda.%d ' % (os.getenv(
'top_srcdir', '.'), sys.argv[0], nodeid)
else:
self._cmd_prefix = ''
if version is not None:
self.version = version
else:
self.version = self.env_version
mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd'
if self.node_type == 'soc':
self.__init_soc(nodeid)
elif self.node_type == 'ncp-sim':
# TODO use mode after ncp-mtd is available.
self.__init_ncp_sim(nodeid, 'ftd')
else:
self.__init_sim(nodeid, mode)
if self.verbose:
self.pexpect.logfile_read = sys.stdout.buffer
self._initialized = True
def __init_sim(self, nodeid, mode):
""" Initialize a simulation node. """
# Default command if no match below, will be overridden if below conditions are met.
cmd = './ot-cli-%s' % (mode)
# For Thread 1.2 MTD node, use ot-cli-mtd build regardless of OT_CLI_PATH
if self.version == '1.2' and mode == 'mtd' and 'top_builddir' in os.environ:
srcdir = os.environ['top_builddir']
cmd = '%s/examples/apps/cli/ot-cli-%s %d' % (srcdir, mode, nodeid)
# If Thread version of node matches the testing environment version.
elif self.version == self.env_version:
# Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
# which requires device with Backbone functionality.
if self.version == '1.2' and self.is_bbr:
if 'OT_CLI_PATH_1_2_BBR' in os.environ:
cmd = os.environ['OT_CLI_PATH_1_2_BBR']
elif 'top_builddir_1_2_bbr' in os.environ:
srcdir = os.environ['top_builddir_1_2_bbr']
cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
# Load Thread device of the testing environment version (may be 1.1 or 1.2)
else:
if 'OT_CLI_PATH' in os.environ:
cmd = os.environ['OT_CLI_PATH']
elif 'top_builddir' in os.environ:
srcdir = os.environ['top_builddir']
cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
if 'RADIO_DEVICE' in os.environ:
cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
nodeid)
self.is_posix = True
else:
cmd += ' %d' % nodeid
# Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability
elif self.version == '1.1':
# Posix app
if 'OT_CLI_PATH_1_1' in os.environ:
cmd = os.environ['OT_CLI_PATH_1_1']
elif 'top_builddir_1_1' in os.environ:
srcdir = os.environ['top_builddir_1_1']
cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)
if 'RADIO_DEVICE_1_1' in os.environ:
cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (
os.environ['RADIO_DEVICE_1_1'], nodeid)
self.is_posix = True
else:
cmd += ' %d' % nodeid
print("%s" % cmd)
self.pexpect = pexpect.popen_spawn.PopenSpawn(self._cmd_prefix + cmd, timeout=10)
# Add delay to ensure that the process is ready to receive commands.
timeout = 0.4
while timeout > 0:
self.pexpect.send('\r\n')
try:
self.pexpect.expect('> ', timeout=0.1)
break
except pexpect.TIMEOUT:
timeout -= 0.1
def __init_ncp_sim(self, nodeid, mode):
""" Initialize an NCP simulation node. """
# Default command if no match below, will be overridden if below conditions are met.
cmd = 'spinel-cli.py -p ./ot-ncp-%s -n' % mode
# If Thread version of node matches the testing environment version.
if self.version == self.env_version:
if 'RADIO_DEVICE' in os.environ:
args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
nodeid)
self.is_posix = True
else:
args = ''
# Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
# which requires device with Backbone functionality.
if self.version == '1.2' and self.is_bbr:
if 'OT_NCP_PATH_1_2_BBR' in os.environ:
cmd = 'spinel-cli.py -p "%s%s" -n' % (
os.environ['OT_NCP_PATH_1_2_BBR'],
args,
)
elif 'top_builddir_1_2_bbr' in os.environ:
srcdir = os.environ['top_builddir_1_2_bbr']
cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
cmd = 'spinel-cli.py -p "%s%s" -n' % (
cmd,
args,
)
# Load Thread device of the testing environment version (may be 1.1 or 1.2).
else:
if 'OT_NCP_PATH' in os.environ:
cmd = 'spinel-cli.py -p "%s%s" -n' % (
os.environ['OT_NCP_PATH'],
args,
)
elif 'top_builddir' in os.environ:
srcdir = os.environ['top_builddir']
cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
cmd = 'spinel-cli.py -p "%s%s" -n' % (
cmd,
args,
)
# Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability.
elif self.version == '1.1':
if 'RADIO_DEVICE_1_1' in os.environ:
args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE_1_1'],
nodeid)
self.is_posix = True
else:
args = ''
if 'OT_NCP_PATH_1_1' in os.environ:
cmd = 'spinel-cli.py -p "%s%s" -n' % (
os.environ['OT_NCP_PATH_1_1'],
args,
)
elif 'top_builddir_1_1' in os.environ:
srcdir = os.environ['top_builddir_1_1']
cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
cmd = 'spinel-cli.py -p "%s%s" -n' % (
cmd,
args,
)
cmd += ' %d' % nodeid
print("%s" % cmd)
self.pexpect = pexpect.spawn(self._cmd_prefix + cmd, timeout=10)
# Add delay to ensure that the process is ready to receive commands.
time.sleep(0.2)
self._expect('spinel-cli >')
self.debug(int(os.getenv('DEBUG', '0')))
def __init_soc(self, nodeid):
""" Initialize a System-on-a-chip node connected via UART. """
import fdpexpect
serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
def destroy(self):
if not self._initialized:
return
if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
print("%d: exit" % self.nodeid)
self.pexpect.send('exit\n')
self.pexpect.expect(pexpect.EOF)
self.pexpect.wait()
self._initialized = False
class NodeImpl:
is_host = False
is_otbr = False
def __init__(self, nodeid, name=None, simulator=None, **kwargs):
self.nodeid = nodeid
self.name = name or ('Node%d' % nodeid)
self.is_posix = False
self.simulator = simulator
if self.simulator:
self.simulator.add_node(self)
super().__init__(nodeid, **kwargs)
self.set_mesh_local_prefix(config.MESH_LOCAL_PREFIX)
self.set_addr64('%016x' % (thread_cert.EXTENDED_ADDRESS_BASE + nodeid))
def _expect(self, pattern, timeout=-1, *args, **kwargs):
""" Process simulator events until expected the pattern. """
if timeout == -1:
timeout = self.pexpect.timeout
assert timeout > 0
while timeout > 0:
try:
return self.pexpect.expect(pattern, 0.1, *args, **kwargs)
except pexpect.TIMEOUT:
timeout -= 0.1
self.simulator.go(0)
if timeout <= 0:
raise
def _expect_done(self, timeout=-1):
self._expect('Done', timeout)
def _expect_result(self, pattern, *args, **kwargs):
"""Expect a single matching result.
The arguments are identical to pexpect.expect().
Returns:
The matched line.
"""
results = self._expect_results(pattern, *args, **kwargs)
assert len(results) == 1, results
return results[0]
def _expect_results(self, pattern, *args, **kwargs):
"""Expect multiple matching results.
The arguments are identical to pexpect.expect().
Returns:
The matched lines.
"""
output = self._expect_command_output()
results = [line for line in output if self._match_pattern(line, pattern)]
return results
@staticmethod
def _match_pattern(line, pattern):
if isinstance(pattern, str):
pattern = re.compile(pattern)
if isinstance(pattern, typing.Pattern):
return pattern.match(line)
else:
return any(NodeImpl._match_pattern(line, p) for p in pattern)
def _expect_command_output(self, ignore_logs=True):
lines = []
while True:
line = self.__readline(ignore_logs=ignore_logs)
if line == 'Done':
break
elif line.startswith('Error '):
raise Exception(line)
else:
lines.append(line)
print(f'_expect_command_output() returns {lines!r}')
return lines
def __is_logging_line(self, line: str) -> bool:
return len(line) >= 3 and line[:3] in {'[D]', '[I]', '[N]', '[W]', '[C]', '[-]'}
def read_cert_messages_in_commissioning_log(self, timeout=-1):
"""Get the log of the traffic after DTLS handshake.
"""
format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"
join_fin_req = format_str % br"JOIN_FIN\.req"
join_fin_rsp = format_str % br"JOIN_FIN\.rsp"
dummy_format_str = br"\[THCI\].*?type=%s.*?"
join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf"
join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp"
pattern = (b"(" + join_fin_req + b")|(" + join_fin_rsp + b")|(" + join_ent_ntf + b")|(" + join_ent_rsp + b")")
messages = []
# There are at most 4 cert messages both for joiner and commissioner
for _ in range(0, 4):
try:
self._expect(pattern, timeout=timeout)
log = self.pexpect.match.group(0)
messages.append(self._extract_cert_message(log))
except BaseException:
break
return messages
def _extract_cert_message(self, log):
res = re.search(br"direction=\w+", log)
assert res
direction = res.group(0).split(b'=')[1].strip()
res = re.search(br"type=\S+", log)
assert res
type = res.group(0).split(b'=')[1].strip()
payload = bytearray([])
payload_len = 0
if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:
res = re.search(br"len=\d+", log)
assert res
payload_len = int(res.group(0).split(b'=')[1].strip())
hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"
while True:
res = re.search(hex_pattern, log)
if not res:
break
data = [int(hex, 16) for hex in res.group(0)[1:-1].split(b' ') if hex and hex != b'..']
payload += bytearray(data)
log = log[res.end() - 1:]
assert len(payload) == payload_len
return (direction, type, payload)
def send_command(self, cmd, go=True, expect_command_echo=True):
print("%d: %s" % (self.nodeid, cmd))
self.pexpect.send(cmd + '\n')
if go:
self.simulator.go(0, nodeid=self.nodeid)
sys.stdout.flush()
if expect_command_echo:
self._expect_command_echo(cmd)
def _expect_command_echo(self, cmd):
cmd = cmd.strip()
while True:
line = self.__readline()
if line == cmd:
break
logging.warning("expecting echo %r, but read %r", cmd, line)
def __readline(self, ignore_logs=True):
PROMPT = 'spinel-cli > ' if self.node_type == 'ncp-sim' else '> '
while True:
self._expect(r"[^\n]+\n")
line = self.pexpect.match.group(0).decode('utf8').strip()
while line.startswith(PROMPT):
line = line[len(PROMPT):]
if line == '':
continue
if ignore_logs and self.__is_logging_line(line):
continue
return line
def get_commands(self):
self.send_command('?')
self._expect('Commands:')
return self._expect_results(r'\S+')
def set_mode(self, mode):
cmd = 'mode %s' % mode
self.send_command(cmd)
self._expect_done()
def debug(self, level):
# `debug` command will not trigger interaction with simulator
self.send_command('debug %d' % level, go=False)
def start(self):
self.interface_up()
self.thread_start()
def stop(self):
self.thread_stop()
self.interface_down()
def set_log_level(self, level: int):
self.send_command(f'log level {level}')
self._expect_done()
def interface_up(self):
self.send_command('ifconfig up')
self._expect_done()
def interface_down(self):
self.send_command('ifconfig down')
self._expect_done()
def thread_start(self):
self.send_command('thread start')
self._expect_done()
def thread_stop(self):
self.send_command('thread stop')
self._expect_done()
def commissioner_start(self):
cmd = 'commissioner start'
self.send_command(cmd)
self._expect_done()
def commissioner_stop(self):
cmd = 'commissioner stop'
self.send_command(cmd)
self._expect_done()
def commissioner_state(self):
states = [r'disabled', r'petitioning', r'active']
self.send_command('commissioner state')
return self._expect_result(states)
def commissioner_add_joiner(self, addr, psk):
cmd = 'commissioner joiner add %s %s' % (addr, psk)
self.send_command(cmd)
self._expect_done()
def commissioner_set_provisioning_url(self, provisioning_url=''):
cmd = 'commissioner provisioningurl %s' % provisioning_url
self.send_command(cmd)
self._expect_done()
def joiner_start(self, pskd='', provisioning_url=''):
cmd = 'joiner start %s %s' % (pskd, provisioning_url)
self.send_command(cmd)
self._expect_done()
def clear_allowlist(self):
cmd = 'macfilter addr clear'
self.send_command(cmd)
self._expect_done()
def enable_allowlist(self):
cmd = 'macfilter addr allowlist'
self.send_command(cmd)
self._expect_done()
def disable_allowlist(self):
cmd = 'macfilter addr disable'
self.send_command(cmd)
self._expect_done()
def add_allowlist(self, addr, rssi=None):
cmd = 'macfilter addr add %s' % addr
if rssi is not None:
cmd += ' %s' % rssi
self.send_command(cmd)
self._expect_done()
def radiofilter_is_enabled(self) -> bool:
states = [r'Disabled', r'Enabled']
self.send_command('radiofilter')
return self._expect_result(states) == 'Enabled'
def radiofilter_enable(self):
cmd = 'radiofilter enable'
self.send_command(cmd)
self._expect_done()
def radiofilter_disable(self):
cmd = 'radiofilter disable'
self.send_command(cmd)
self._expect_done()
def get_bbr_registration_jitter(self):
self.send_command('bbr jitter')
return int(self._expect_result(r'\d+'))
def set_bbr_registration_jitter(self, jitter):
cmd = 'bbr jitter %d' % jitter
self.send_command(cmd)
self._expect_done()
def get_rcp_version(self) -> str:
self.send_command('rcp version')
rcp_version = self._expect_command_output()[0].strip()
return rcp_version
def srp_server_get_state(self):
states = ['disabled', 'running', 'stopped']
self.send_command('srp server state')
return self._expect_result(states)
def srp_server_get_addr_mode(self):
modes = [r'unicast', r'anycast']
self.send_command(f'srp server addrmode')
return self._expect_result(modes)
def srp_server_set_addr_mode(self, mode):
self.send_command(f'srp server addrmode {mode}')
self._expect_done()
def srp_server_get_anycast_seq_num(self):
self.send_command(f'srp server seqnum')
return int(self._expect_result(r'\d+'))
def srp_server_set_anycast_seq_num(self, seqnum):
self.send_command(f'srp server seqnum {seqnum}')
self._expect_done()
def srp_server_set_enabled(self, enable):
cmd = f'srp server {"enable" if enable else "disable"}'
self.send_command(cmd)
self._expect_done()
def srp_server_set_lease_range(self, min_lease, max_lease, min_key_lease, max_key_lease):
self.send_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
self._expect_done()
def srp_server_get_hosts(self):
"""Returns the host list on the SRP server as a list of property
dictionary.
Example output:
[{
'fullname': 'my-host.default.service.arpa.',
'name': 'my-host',
'deleted': 'false',
'addresses': ['2001::1', '2001::2']
}]
"""
cmd = 'srp server host'
self.send_command(cmd)
lines = self._expect_command_output()
host_list = []
while lines:
host = {}
host['fullname'] = lines.pop(0).strip()
host['name'] = host['fullname'].split('.')[0]
host['deleted'] = lines.pop(0).strip().split(':')[1].strip()
if host['deleted'] == 'true':
host_list.append(host)
continue
addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
map(str.strip, addresses)
host['addresses'] = [addr for addr in addresses if addr]
host_list.append(host)
return host_list
def srp_server_get_host(self, host_name):
"""Returns host on the SRP server that matches given host name.
Example usage:
self.srp_server_get_host("my-host")
"""
for host in self.srp_server_get_hosts():
if host_name == host['name']:
return host
def srp_server_get_services(self):
"""Returns the service list on the SRP server as a list of property
dictionary.
Example output:
[{
'fullname': 'my-service._ipps._tcp.default.service.arpa.',
'instance': 'my-service',
'name': '_ipps._tcp',
'deleted': 'false',
'port': '12345',
'priority': '0',
'weight': '0',
'TXT': ['abc=010203'],
'host_fullname': 'my-host.default.service.arpa.',
'host': 'my-host',
'addresses': ['2001::1', '2001::2']
}]
Note that the TXT data is output as a HEX string.
"""
cmd = 'srp server service'
self.send_command(cmd)
lines = self._expect_command_output()
service_list = []
while lines:
service = {}
service['fullname'] = lines.pop(0).strip()
name_labels = service['fullname'].split('.')
service['instance'] = name_labels[0]
service['name'] = '.'.join(name_labels[1:3])
service['deleted'] = lines.pop(0).strip().split(':')[1].strip()
if service['deleted'] == 'true':
service_list.append(service)
continue
# 'subtypes', port', 'priority', 'weight'
for i in range(0, 4):
key_value = lines.pop(0).strip().split(':')
service[key_value[0].strip()] = key_value[1].strip()
txt_entries = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
txt_entries = map(str.strip, txt_entries)
service['TXT'] = [txt for txt in txt_entries if txt]
service['host_fullname'] = lines.pop(0).strip().split(':')[1].strip()
service['host'] = service['host_fullname'].split('.')[0]
addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
addresses = map(str.strip, addresses)
service['addresses'] = [addr for addr in addresses if addr]
service_list.append(service)
return service_list
def srp_server_get_service(self, instance_name, service_name):
"""Returns service on the SRP server that matches given instance
name and service name.
Example usage:
self.srp_server_get_service("my-service", "_ipps._tcp")
"""
for service in self.srp_server_get_services():
if (instance_name == service['instance'] and service_name == service['name']):
return service
def get_srp_server_port(self):
"""Returns the SRP server UDP port by parsing
the SRP Server Data in Network Data.
"""
for service in self.get_services():
# TODO: for now, we are using 0xfd as the SRP service data.
# May use a dedicated bit flag for SRP server.
if int(service[1], 16) == 0x5d:
# The SRP server data contains IPv6 address (16 bytes)
# followed by UDP port number.
return int(service[2][2 * 16:], 16)
def srp_client_start(self, server_address, server_port):
self.send_command(f'srp client start {server_address} {server_port}')
self._expect_done()
def srp_client_stop(self):
self.send_command(f'srp client stop')
self._expect_done()
def srp_client_get_state(self):
cmd = 'srp client state'
self.send_command(cmd)
return self._expect_command_output()[0]
def srp_client_get_auto_start_mode(self):
cmd = 'srp client autostart'
self.send_command(cmd)
return self._expect_command_output()[0]
def srp_client_enable_auto_start_mode(self):
self.send_command(f'srp client autostart enable')
self._expect_done()
def srp_client_disable_auto_start_mode(self):
self.send_command(f'srp client autostart able')
self._expect_done()
def srp_client_get_server_address(self):
cmd = 'srp client server address'
self.send_command(cmd)
return self._expect_command_output()[0]
def srp_client_get_server_port(self):
cmd = 'srp client server port'
self.send_command(cmd)
return int(self._expect_command_output()[0])
def srp_client_get_host_state(self):
cmd = 'srp client host state'
self.send_command(cmd)
return self._expect_command_output()[0]
def srp_client_set_host_name(self, name):
self.send_command(f'srp client host name {name}')
self._expect_done()
def srp_client_get_host_name(self):
self.send_command(f'srp client host name')
self._expect_done()
def srp_client_remove_host(self, remove_key=False, send_unreg_to_server=False):
self.send_command(f'srp client host remove {int(remove_key)} {int(send_unreg_to_server)}')
self._expect_done()
def srp_client_clear_host(self):
self.send_command(f'srp client host clear')
self._expect_done()
def srp_client_set_host_address(self, *addrs: str):
self.send_command(f'srp client host address {" ".join(addrs)}')
self._expect_done()
def srp_client_get_host_address(self):
self.send_command(f'srp client host address')
self._expect_done()
def srp_client_add_service(self, instance_name, service_name, port, priority=0, weight=0, txt_entries=[]):
txt_record = "".join(self._encode_txt_entry(entry) for entry in txt_entries)
instance_name = self._escape_escapable(instance_name)
self.send_command(
f'srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt_record}')
self._expect_done()
def srp_client_remove_service(self, instance_name, service_name):
self.send_command(f'srp client service remove {instance_name} {service_name}')
self._expect_done()
def srp_client_clear_service(self, instance_name, service_name):
self.send_command(f'srp client service clear {instance_name} {service_name}')
self._expect_done()
def srp_client_get_services(self):
cmd = 'srp client service'
self.send_command(cmd)
service_lines = self._expect_command_output()
return [self._parse_srp_client_service(line) for line in service_lines]
def srp_client_set_lease_interval(self, leaseinterval: int):
cmd = f'srp client leaseinterval {leaseinterval}'
self.send_command(cmd)
self._expect_done()
def srp_client_get_lease_interval(self) -> int:
cmd = 'srp client leaseinterval'
self.send_command(cmd)
return int(self._expect_result('\d+'))
#
# TREL utilities
#
def get_trel_state(self) -> Union[None, bool]:
states = [r'Disabled', r'Enabled']
self.send_command('trel')
try:
return self._expect_result(states) == 'Enabled'
except Exception as ex:
if 'InvalidCommand' in str(ex):
return None
raise
def _encode_txt_entry(self, entry):
"""Encodes the TXT entry to the DNS-SD TXT record format as a HEX string.
Example usage:
self._encode_txt_entries(['abc']) -> '03616263'
self._encode_txt_entries(['def=']) -> '046465663d'
self._encode_txt_entries(['xyz=XYZ']) -> '0778797a3d58595a'
"""
return '{:02x}'.format(len(entry)) + "".join("{:02x}".format(ord(c)) for c in entry)
def _parse_srp_client_service(self, line: str):
"""Parse one line of srp service list into a dictionary which
maps string keys to string values.
Example output for input
'instance:\"%s\", name:\"%s\", state:%s, port:%d, priority:%d, weight:%d"'
{
'instance': 'my-service',
'name': '_ipps._udp',
'state': 'ToAdd',
'port': '12345',
'priority': '0',
'weight': '0'
}
Note that value of 'port', 'priority' and 'weight' are represented
as strings but not integers.
"""
key_values = [word.strip().split(':') for word in line.split(', ')]
keys = [key_value[0] for key_value in key_values]
values = [key_value[1].strip('"') for key_value in key_values]
return dict(zip(keys, values))
def locate(self, anycast_addr):
cmd = 'locate ' + anycast_addr
self.send_command(cmd)
self.simulator.go(5)
return self._parse_locate_result(self._expect_command_output()[0])
def _parse_locate_result(self, line: str):
"""Parse anycast locate result as list of ml-eid and rloc16.
Example output for input
'fd00:db8:0:0:acf9:9d0:7f3c:b06e 0xa800'
[ 'fd00:db8:0:0:acf9:9d0:7f3c:b06e', '0xa800' ]
"""
return line.split(' ')
def enable_backbone_router(self):
cmd = 'bbr enable'
self.send_command(cmd)
self._expect_done()
def disable_backbone_router(self):
cmd = 'bbr disable'
self.send_command(cmd)
self._expect_done()
def register_backbone_router(self):
cmd = 'bbr register'
self.send_command(cmd)
self._expect_done()
def get_backbone_router_state(self):
states = [r'Disabled', r'Primary', r'Secondary']
self.send_command('bbr state')
return self._expect_result(states)
@property
def is_primary_backbone_router(self) -> bool:
return self.get_backbone_router_state() == 'Primary'
def get_backbone_router(self):
cmd = 'bbr config'
self.send_command(cmd)
self._expect(r'(.*)Done')
g = self.pexpect.match.groups()
output = g[0].decode("utf-8")
lines = output.strip().split('\n')
lines = [l.strip() for l in lines]
ret = {}
for l in lines:
z = re.search(r'seqno:\s+([0-9]+)', l)
if z:
ret['seqno'] = int(z.groups()[0])
z = re.search(r'delay:\s+([0-9]+)', l)
if z:
ret['delay'] = int(z.groups()[0])
z = re.search(r'timeout:\s+([0-9]+)', l)
if z:
ret['timeout'] = int(z.groups()[0])
return ret
def set_backbone_router(self, seqno=None, reg_delay=None, mlr_timeout=None):
cmd = 'bbr config'
if seqno is not None:
cmd += ' seqno %d' % seqno
if reg_delay is not None:
cmd += ' delay %d' % reg_delay
if mlr_timeout is not None:
cmd += ' timeout %d' % mlr_timeout
self.send_command(cmd)
self._expect_done()
def set_domain_prefix(self, prefix, flags='prosD'):
self.add_prefix(prefix, flags)
self.register_netdata()
def remove_domain_prefix(self, prefix):
self.remove_prefix(prefix)
self.register_netdata()
def set_next_dua_response(self, status: Union[str, int], iid=None):
# Convert 5.00 to COAP CODE 160
if isinstance(status, str):
assert '.' in status
status = status.split('.')
status = (int(status[0]) << 5) + int(status[1])
cmd = 'bbr mgmt dua {}'.format(status)
if iid is not None:
cmd += ' ' + str(iid)
self.send_command(cmd)
self._expect_done()
def set_dua_iid(self, iid: str):
assert len(iid) == 16
int(iid, 16)
cmd = 'dua iid {}'.format(iid)
self.send_command(cmd)
self._expect_done()
def clear_dua_iid(self):
cmd = 'dua iid clear'
self.send_command(cmd)
self._expect_done()
def multicast_listener_list(self) -> Dict[IPv6Address, int]:
cmd = 'bbr mgmt mlr listener'
self.send_command(cmd)
table = {}
for line in self._expect_results("\S+ \d+"):
line = line.split()
assert len(line) == 2, line
ip = IPv6Address(line[0])
timeout = int(line[1])
assert ip not in table
table[ip] = timeout
return table
def multicast_listener_clear(self):
cmd = f'bbr mgmt mlr listener clear'
self.send_command(cmd)
self._expect_done()
def multicast_listener_add(self, ip: Union[IPv6Address, str], timeout: int = 0):
if not isinstance(ip, IPv6Address):
ip = IPv6Address(ip)
cmd = f'bbr mgmt mlr listener add {ip.compressed} {timeout}'
self.send_command(cmd)
self._expect(r"(Done|Error .*)")
def set_next_mlr_response(self, status: int):
cmd = 'bbr mgmt mlr response {}'.format(status)
self.send_command(cmd)
self._expect_done()
def register_multicast_listener(self, *ipaddrs: Union[IPv6Address, str], timeout=None):
assert len(ipaddrs) > 0, ipaddrs
ipaddrs = map(str, ipaddrs)
cmd = f'mlr reg {" ".join(ipaddrs)}'
if timeout is not None:
cmd += f' {int(timeout)}'
self.send_command(cmd)
self.simulator.go(3)
lines = self._expect_command_output()
m = re.match(r'status (\d+), (\d+) failed', lines[0])
assert m is not None, lines
status = int(m.group(1))
failed_num = int(m.group(2))
assert failed_num == len(lines) - 1
failed_ips = list(map(IPv6Address, lines[1:]))
print(f"register_multicast_listener {ipaddrs} => status: {status}, failed ips: {failed_ips}")
return status, failed_ips
def set_link_quality(self, addr, lqi):
cmd = 'macfilter rss add-lqi %s %s' % (addr, lqi)
self.send_command(cmd)
self._expect_done()
def set_outbound_link_quality(self, lqi):
cmd = 'macfilter rss add-lqi * %s' % (lqi)
self.send_command(cmd)
self._expect_done()
def remove_allowlist(self, addr):
cmd = 'macfilter addr remove %s' % addr
self.send_command(cmd)
self._expect_done()
def get_addr16(self):
self.send_command('rloc16')
rloc16 = self._expect_result(r'[0-9a-fA-F]{4}')
return int(rloc16, 16)
def get_router_id(self):
rloc16 = self.get_addr16()
return rloc16 >> 10
def get_addr64(self):
self.send_command('extaddr')
return self._expect_result('[0-9a-fA-F]{16}')
def set_addr64(self, addr64: str):
# Make sure `addr64` is a hex string of length 16
assert len(addr64) == 16
int(addr64, 16)
self.send_command('extaddr %s' % addr64)
self._expect_done()
def get_eui64(self):
self.send_command('eui64')
return self._expect_result('[0-9a-fA-F]{16}')
def set_extpanid(self, extpanid):
self.send_command('extpanid %s' % extpanid)
self._expect_done()
def get_extpanid(self):
self.send_command('extpanid')
return self._expect_result('[0-9a-fA-F]{16}')
def get_mesh_local_prefix(self):
self.send_command('prefix meshlocal')
return self._expect_command_output()[0]
def set_mesh_local_prefix(self, mesh_local_prefix):
self.send_command('prefix meshlocal %s' % mesh_local_prefix)
self._expect_done()
def get_joiner_id(self):
self.send_command('joiner id')
return self._expect_result('[0-9a-fA-F]{16}')
def get_channel(self):
self.send_command('channel')
return int(self._expect_result(r'\d+'))
def set_channel(self, channel):
cmd = 'channel %d' % channel
self.send_command(cmd)
self._expect_done()
def get_networkkey(self):
self.send_command('networkkey')
return self._expect_result('[0-9a-fA-F]{32}')
def set_networkkey(self, networkkey):
cmd = 'networkkey %s' % networkkey
self.send_command(cmd)
self._expect_done()
def get_key_sequence_counter(self):
self.send_command('keysequence counter')
result = self._expect_result(r'\d+')
return int(result)
def set_key_sequence_counter(self, key_sequence_counter):
cmd = 'keysequence counter %d' % key_sequence_counter
self.send_command(cmd)
self._expect_done()
def set_key_switch_guardtime(self, key_switch_guardtime):
cmd = 'keysequence guardtime %d' % key_switch_guardtime
self.send_command(cmd)
self._expect_done()
def set_network_id_timeout(self, network_id_timeout):
cmd = 'networkidtimeout %d' % network_id_timeout
self.send_command(cmd)
self._expect_done()
def _escape_escapable(self, string):
"""Escape CLI escapable characters in the given string.
Args:
string (str): UTF-8 input string.
Returns:
[str]: The modified string with escaped characters.
"""
escapable_chars = '\\ \t\r\n'
for char in escapable_chars:
string = string.replace(char, '\\%s' % char)
return string
def get_network_name(self):
self.send_command('networkname')
return self._expect_result([r'\S+'])
def set_network_name(self, network_name):
cmd = 'networkname %s' % self._escape_escapable(network_name)
self.send_command(cmd)
self._expect_done()
def get_panid(self):
self.send_command('panid')
result = self._expect_result('0x[0-9a-fA-F]{4}')
return int(result, 16)
def set_panid(self, panid=config.PANID):
cmd = 'panid %d' % panid
self.send_command(cmd)
self._expect_done()
def set_parent_priority(self, priority):
cmd = 'parentpriority %d' % priority
self.send_command(cmd)
self._expect_done()
def get_partition_id(self):
self.send_command('partitionid')
return self._expect_result(r'\d+')
def get_preferred_partition_id(self):
self.send_command('partitionid preferred')
return self._expect_result(r'\d+')
def set_preferred_partition_id(self, partition_id):
cmd = 'partitionid preferred %d' % partition_id
self.send_command(cmd)
self._expect_done()
def get_pollperiod(self):
self.send_command('pollperiod')
return self._expect_result(r'\d+')
def set_pollperiod(self, pollperiod):
self.send_command('pollperiod %d' % pollperiod)
self._expect_done()
def get_csl_info(self):
self.send_command('csl')
self._expect_done()
def set_csl_channel(self, csl_channel):
self.send_command('csl channel %d' % csl_channel)
self._expect_done()
def set_csl_period(self, csl_period):
self.send_command('csl period %d' % csl_period)
self._expect_done()
def set_csl_timeout(self, csl_timeout):
self.send_command('csl timeout %d' % csl_timeout)
self._expect_done()
def send_mac_emptydata(self):
self.send_command('mac send emptydata')
self._expect_done()
def send_mac_datarequest(self):
self.send_command('mac send datarequest')
self._expect_done()
def set_router_upgrade_threshold(self, threshold):
cmd = 'routerupgradethreshold %d' % threshold
self.send_command(cmd)
self._expect_done()
def set_router_downgrade_threshold(self, threshold):
cmd = 'routerdowngradethreshold %d' % threshold
self.send_command(cmd)
self._expect_done()
def get_router_downgrade_threshold(self) -> int:
self.send_command('routerdowngradethreshold')
return int(self._expect_result(r'\d+'))
def set_router_eligible(self, enable: bool):
cmd = f'routereligible {"enable" if enable else "disable"}'
self.send_command(cmd)
self._expect_done()
def get_router_eligible(self) -> bool:
states = [r'Disabled', r'Enabled']
self.send_command('routereligible')
return self._expect_result(states) == 'Enabled'
def prefer_router_id(self, router_id):
cmd = 'preferrouterid %d' % router_id
self.send_command(cmd)
self._expect_done()
def release_router_id(self, router_id):
cmd = 'releaserouterid %d' % router_id
self.send_command(cmd)
self._expect_done()
def get_state(self):
states = [r'detached', r'child', r'router', r'leader', r'disabled']
self.send_command('state')
return self._expect_result(states)
def set_state(self, state):
cmd = 'state %s' % state
self.send_command(cmd)
self._expect_done()
def get_timeout(self):
self.send_command('childtimeout')
return self._expect_result(r'\d+')
def set_timeout(self, timeout):
cmd = 'childtimeout %d' % timeout
self.send_command(cmd)
self._expect_done()
def set_max_children(self, number):
cmd = 'childmax %d' % number
self.send_command(cmd)
self._expect_done()
def get_weight(self):
self.send_command('leaderweight')
return self._expect_result(r'\d+')
def set_weight(self, weight):
cmd = 'leaderweight %d' % weight
self.send_command(cmd)
self._expect_done()
def add_ipaddr(self, ipaddr):
cmd = 'ipaddr add %s' % ipaddr
self.send_command(cmd)
self._expect_done()
def del_ipaddr(self, ipaddr):
cmd = 'ipaddr del %s' % ipaddr
self.send_command(cmd)
self._expect_done()
def add_ipmaddr(self, ipmaddr):
cmd = 'ipmaddr add %s' % ipmaddr
self.send_command(cmd)
self._expect_done()
def del_ipmaddr(self, ipmaddr):
cmd = 'ipmaddr del %s' % ipmaddr
self.send_command(cmd)
self._expect_done()
def get_addrs(self):
self.send_command('ipaddr')
return self._expect_results(r'\S+(:\S*)+')
def get_mleid(self):
self.send_command('ipaddr mleid')
return self._expect_result(r'\S+(:\S*)+')
def get_linklocal(self):
self.send_command('ipaddr linklocal')
return self._expect_result(r'\S+(:\S*)+')
def get_rloc(self):
self.send_command('ipaddr rloc')
return self._expect_result(r'\S+(:\S*)+')
def get_addr(self, prefix):
network = ipaddress.ip_network(u'%s' % str(prefix))
addrs = self.get_addrs()
for addr in addrs:
if isinstance(addr, bytearray):
addr = bytes(addr)
ipv6_address = ipaddress.ip_address(addr)
if ipv6_address in network:
return ipv6_address.exploded
return None
def has_ipaddr(self, address):
ipaddr = ipaddress.ip_address(address)
ipaddrs = self.get_addrs()
for addr in ipaddrs:
if isinstance(addr, bytearray):
addr = bytes(addr)
if ipaddress.ip_address(addr) == ipaddr:
return True
return False
def get_ipmaddrs(self):
self.send_command('ipmaddr')
return self._expect_results(r'\S+(:\S*)+')
def has_ipmaddr(self, address):
ipmaddr = ipaddress.ip_address(address)
ipmaddrs = self.get_ipmaddrs()
for addr in ipmaddrs:
if isinstance(addr, bytearray):
addr = bytes(addr)
if ipaddress.ip_address(addr) == ipmaddr:
return True
return False
def get_addr_leader_aloc(self):
addrs = self.get_addrs()
for addr in addrs:
segs = addr.split(':')
if (segs[4] == '0' and segs[5] == 'ff' and segs[6] == 'fe00' and segs[7] == 'fc00'):
return addr
return None
def get_mleid_iid(self):
ml_eid = IPv6Address(self.get_mleid())
return ml_eid.packed[8:].hex()
def get_eidcaches(self):
eidcaches = []
self.send_command('eidcache')
for line in self._expect_results(r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)'):
eidcaches.append(line.split())
return eidcaches
def add_service(self, enterpriseNumber, serviceData, serverData):
cmd = 'service add %s %s %s' % (
enterpriseNumber,
serviceData,
serverData,
)
self.send_command(cmd)
self._expect_done()
def remove_service(self, enterpriseNumber, serviceData):
cmd = 'service remove %s %s' % (enterpriseNumber, serviceData)
self.send_command(cmd)
self._expect_done()
def get_child_table(self) -> Dict[int, Dict[str, Any]]:
"""Get the table of attached children."""
cmd = 'child table'
self.send_command(cmd)
output = self._expect_command_output()
#
# Example output:
# | ID | RLOC16 | Timeout | Age | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt| Extended MAC |
# +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+------------------+
# | 1 | 0xc801 | 240 | 24 | 3 | 131 |1|0|0| 3| 0 | 0 | 4ecede68435358ac |
# | 2 | 0xc802 | 240 | 2 | 3 | 131 |0|0|0| 3| 1 | 0 | a672a601d2ce37d8 |
# Done
#
headers = self.__split_table_row(output[0])
table = {}
for line in output[2:]:
line = line.strip()
if not line:
continue
fields = self.__split_table_row(line)
col = lambda colname: self.__get_table_col(colname, headers, fields)
id = int(col("ID"))
r, d, n = int(col("R")), int(col("D")), int(col("N"))
mode = f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}'
table[int(id)] = {
'id': int(id),
'rloc16': int(col('RLOC16'), 16),
'timeout': int(col('Timeout')),
'age': int(col('Age')),
'lq_in': int(col('LQ In')),
'c_vn': int(col('C_VN')),
'mode': mode,
'extaddr': col('Extended MAC'),
'ver': int(col('Ver')),
'csl': bool(int(col('CSL'))),
'qmsgcnt': int(col('QMsgCnt')),
}
return table
def __split_table_row(self, row: str) -> List[str]:
if not (row.startswith('|') and row.endswith('|')):
raise ValueError(row)
fields = row.split('|')
fields = [x.strip() for x in fields[1:-1]]
return fields
def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
return fields[headers.index(colname)]
def __getOmrAddress(self):
prefixes = [prefix.split('::')[0] for prefix in self.get_prefixes()]
omr_addrs = []
for addr in self.get_addrs():
for prefix in prefixes:
if (addr.startswith(prefix)) and (addr != self.__getDua()):
omr_addrs.append(addr)
break
return omr_addrs
def __getLinkLocalAddress(self):
for ip6Addr in self.get_addrs():
if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):
return ip6Addr
return None
def __getGlobalAddress(self):
global_address = []
for ip6Addr in self.get_addrs():
if ((not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and
(not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and
(not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I))):
global_address.append(ip6Addr)
return global_address
def __getRloc(self):
for ip6Addr in self.get_addrs():
if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
not (re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I))):
return ip6Addr
return None
def __getAloc(self):
aloc = []
for ip6Addr in self.get_addrs():
if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)):
aloc.append(ip6Addr)
return aloc
def __getMleid(self):
for ip6Addr in self.get_addrs():
if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr,
re.I) and not (re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)):
return ip6Addr
return None
def __getDua(self) -> Optional[str]:
for ip6Addr in self.get_addrs():
if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
return ip6Addr
return None
def get_ip6_address_by_prefix(self, prefix: Union[str, IPv6Network]) -> List[IPv6Address]:
"""Get addresses matched with given prefix.
Args:
prefix: the prefix to match against.
Can be either a string or ipaddress.IPv6Network.
Returns:
The IPv6 address list.
"""
if isinstance(prefix, str):
prefix = IPv6Network(prefix)
addrs = map(IPv6Address, self.get_addrs())
return [addr for addr in addrs if addr in prefix]
def get_ip6_address(self, address_type):
"""Get specific type of IPv6 address configured on thread device.
Args:
address_type: the config.ADDRESS_TYPE type of IPv6 address.
Returns:
IPv6 address string.
"""
if address_type == config.ADDRESS_TYPE.LINK_LOCAL:
return self.__getLinkLocalAddress()
elif address_type == config.ADDRESS_TYPE.GLOBAL:
return self.__getGlobalAddress()
elif address_type == config.ADDRESS_TYPE.RLOC:
return self.__getRloc()
elif address_type == config.ADDRESS_TYPE.ALOC:
return self.__getAloc()
elif address_type == config.ADDRESS_TYPE.ML_EID:
return self.__getMleid()
elif address_type == config.ADDRESS_TYPE.DUA:
return self.__getDua()
elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
return self._getBackboneGua()
elif address_type == config.ADDRESS_TYPE.OMR:
return self.__getOmrAddress()
else:
return None
def get_context_reuse_delay(self):
self.send_command('contextreusedelay')
return self._expect_result(r'\d+')
def set_context_reuse_delay(self, delay):
cmd = 'contextreusedelay %d' % delay
self.send_command(cmd)
self._expect_done()
def add_prefix(self, prefix, flags='paosr', prf='med'):
cmd = 'prefix add %s %s %s' % (prefix, flags, prf)
self.send_command(cmd)
self._expect_done()
def remove_prefix(self, prefix):
cmd = 'prefix remove %s' % prefix
self.send_command(cmd)
self._expect_done()
def enable_br(self):
self.send_command('br enable')
self._expect_done()
def disable_br(self):
self.send_command('br disable')
self._expect_done()
def get_br_omr_prefix(self):
cmd = 'br omrprefix'
self.send_command(cmd)
return self._expect_command_output()[0]
def get_netdata_omr_prefixes(self):
prefixes = [prefix.split(' ')[0] for prefix in self.get_prefixes()]
return prefixes
def get_br_on_link_prefix(self):
cmd = 'br onlinkprefix'
self.send_command(cmd)
return self._expect_command_output()[0]
def get_netdata_non_nat64_prefixes(self):
prefixes = []
routes = self.get_routes()
for route in routes:
if 'n' not in route.split(' ')[1]:
prefixes.append(route.split(' ')[0])
return prefixes
def get_br_nat64_prefix(self):
cmd = 'br nat64prefix'
self.send_command(cmd)
return self._expect_command_output()[0]
def get_netdata_nat64_prefix(self):
prefixes = []
routes = self.get_routes()
for route in routes:
if 'n' in route.split(' ')[1]:
prefixes.append(route.split(' ')[0])
return prefixes
def get_prefixes(self):
return self.get_netdata()['Prefixes']
def get_routes(self):
return self.get_netdata()['Routes']
def get_services(self):
netdata = self.netdata_show()
services = []
services_section = False
for line in netdata:
if line.startswith('Services:'):
services_section = True
elif services_section:
services.append(line.strip().split(' '))
return services
def netdata_show(self):
self.send_command('netdata show')
return self._expect_command_output()
def get_netdata(self):
raw_netdata = self.netdata_show()
netdata = {'Prefixes': [], 'Routes': [], 'Services': []}
key_list = ['Prefixes', 'Routes', 'Services']
key = None
for i in range(0, len(raw_netdata)):
keys = list(filter(raw_netdata[i].startswith, key_list))
if keys != []:
key = keys[0]
elif key is not None:
netdata[key].append(raw_netdata[i])
return netdata
def add_route(self, prefix, stable=False, nat64=False, prf='med'):
cmd = 'route add %s ' % prefix
if stable:
cmd += 's'
if nat64:
cmd += 'n'
cmd += ' %s' % prf
self.send_command(cmd)
self._expect_done()
def remove_route(self, prefix):
cmd = 'route remove %s' % prefix
self.send_command(cmd)
self._expect_done()
def register_netdata(self):
self.send_command('netdata register')
self._expect_done()
def netdata_publish_dnssrp_anycast(self, seqnum):
self.send_command(f'netdata publish dnssrp anycast {seqnum}')
self._expect_done()
def netdata_publish_dnssrp_unicast(self, address, port):
self.send_command(f'netdata publish dnssrp unicast {address} {port}')
self._expect_done()
def netdata_publish_dnssrp_unicast_mleid(self, port):
self.send_command(f'netdata publish dnssrp unicast {port}')
self._expect_done()
def netdata_unpublish_dnssrp(self):
self.send_command('netdata unpublish dnssrp')
self._expect_done()
def netdata_publish_prefix(self, prefix, flags='paosr', prf='med'):
self.send_command(f'netdata publish prefix {prefix} {flags} {prf}')
self._expect_done()
def netdata_publish_route(self, prefix, flags='s', prf='med'):
self.send_command(f'netdata publish route {prefix} {flags} {prf}')
self._expect_done()
def netdata_unpublish_prefix(self, prefix):
self.send_command(f'netdata unpublish {prefix}')
self._expect_done()
def send_network_diag_get(self, addr, tlv_types):
self.send_command('networkdiagnostic get %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(8)
timeout = 1
else:
timeout = 8
self._expect_done(timeout=timeout)
def send_network_diag_reset(self, addr, tlv_types):
self.send_command('networkdiagnostic reset %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(8)
timeout = 1
else:
timeout = 8
self._expect_done(timeout=timeout)
def energy_scan(self, mask, count, period, scan_duration, ipaddr):
cmd = 'commissioner energy %d %d %d %d %s' % (
mask,
count,
period,
scan_duration,
ipaddr,
)
self.send_command(cmd)
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(8)
timeout = 1
else:
timeout = 8
self._expect('Energy:', timeout=timeout)
def panid_query(self, panid, mask, ipaddr):
cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr)
self.send_command(cmd)
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(8)
timeout = 1
else:
timeout = 8
self._expect('Conflict:', timeout=timeout)
def scan(self, result=1, timeout=10):
self.send_command('scan')
self.simulator.go(timeout)
if result == 1:
networks = []
for line in self._expect_command_output()[2:]:
_, panid, extaddr, channel, dbm, lqi, _ = map(str.strip, line.split('|'))
panid = int(panid, 16)
channel, dbm, lqi = map(int, (channel, dbm, lqi))
networks.append({
'panid': panid,
'extaddr': extaddr,
'channel': channel,
'dbm': dbm,
'lqi': lqi,
})
return networks
def scan_energy(self, timeout=10):
self.send_command('scan energy')
self.simulator.go(timeout)
rssi_list = []
for line in self._expect_command_output()[2:]:
_, channel, rssi, _ = line.split('|')
rssi_list.append({
'channel': int(channel.strip()),
'rssi': int(rssi.strip()),
})
return rssi_list
def ping(self, ipaddr, num_responses=1, size=8, timeout=5, count=1, interval=1, hoplimit=64, interface=None):
args = f'{ipaddr} {size} {count} {interval} {hoplimit} {timeout}'
if interface is not None:
args = f'-I {interface} {args}'
cmd = f'ping {args}'
self.send_command(cmd)
wait_allowance = 3
end = self.simulator.now() + timeout + wait_allowance
responders = {}
result = True
# ncp-sim doesn't print Done
done = (self.node_type == 'ncp-sim')
while len(responders) < num_responses or not done:
self.simulator.go(1)
try:
i = self._expect([r'from (\S+):', r'Done'], timeout=0.1)
except (pexpect.TIMEOUT, socket.timeout):
if self.simulator.now() < end:
continue
result = False
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.sync_devices()
break
else:
if i == 0:
responders[self.pexpect.match.groups()[0]] = 1
elif i == 1:
done = True
return result
def reset(self):
self._reset('reset')
def factory_reset(self):
self._reset('factoryreset')
def _reset(self, cmd):
self.send_command(cmd, expect_command_echo=False)
time.sleep(self.RESET_DELAY)
# Send a "version" command and drain the CLI output after reset
self.send_command('version', expect_command_echo=False)
while True:
try:
self._expect(r"[^\n]+\n", timeout=0.1)
continue
except pexpect.TIMEOUT:
break
if self.is_otbr:
self.set_log_level(5)
def set_router_selection_jitter(self, jitter):
cmd = 'routerselectionjitter %d' % jitter
self.send_command(cmd)
self._expect_done()
def set_active_dataset(
self,
timestamp,
panid=None,
channel=None,
channel_mask=None,
network_key=None,
security_policy=[],
):
self.send_command('dataset clear')
self._expect_done()
cmd = 'dataset activetimestamp %d' % timestamp
self.send_command(cmd)
self._expect_done()
if panid is not None:
cmd = 'dataset panid %d' % panid
self.send_command(cmd)
self._expect_done()
if channel is not None:
cmd = 'dataset channel %d' % channel
self.send_command(cmd)
self._expect_done()
if channel_mask is not None:
cmd = 'dataset channelmask %d' % channel_mask
self.send_command(cmd)
self._expect_done()
if network_key is not None:
cmd = 'dataset networkkey %s' % network_key
self.send_command(cmd)
self._expect_done()
if security_policy and len(security_policy) == 2:
cmd = 'dataset securitypolicy %s %s' % (
str(security_policy[0]),
security_policy[1],
)
self.send_command(cmd)
self._expect_done()
# Set the meshlocal prefix in config.py
self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
self._expect_done()
self.send_command('dataset commit active')
self._expect_done()
def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None, delay=None):
self.send_command('dataset clear')
self._expect_done()
cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
self.send_command(cmd)
self._expect_done()
cmd = 'dataset activetimestamp %d' % activetimestamp
self.send_command(cmd)
self._expect_done()
if panid is not None:
cmd = 'dataset panid %d' % panid
self.send_command(cmd)
self._expect_done()
if channel is not None:
cmd = 'dataset channel %d' % channel
self.send_command(cmd)
self._expect_done()
if delay is not None:
cmd = 'dataset delay %d' % delay
self.send_command(cmd)
self._expect_done()
# Set the meshlocal prefix in config.py
self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
self._expect_done()
self.send_command('dataset commit pending')
self._expect_done()
def start_dataset_updater(self, panid=None, channel=None):
self.send_command('dataset clear')
self._expect_done()
if panid is not None:
cmd = 'dataset panid %d' % panid
self.send_command(cmd)
self._expect_done()
if channel is not None:
cmd = 'dataset channel %d' % channel
self.send_command(cmd)
self._expect_done()
self.send_command('dataset updater start')
self._expect_done()
def announce_begin(self, mask, count, period, ipaddr):
cmd = 'commissioner announce %d %d %d %s' % (
mask,
count,
period,
ipaddr,
)
self.send_command(cmd)
self._expect_done()
def send_mgmt_active_set(
self,
active_timestamp=None,
channel=None,
channel_mask=None,
extended_panid=None,
panid=None,
network_key=None,
mesh_local=None,
network_name=None,
security_policy=None,
binary=None,
):
cmd = 'dataset mgmtsetcommand active '
if active_timestamp is not None:
cmd += 'activetimestamp %d ' % active_timestamp
if channel is not None:
cmd += 'channel %d ' % channel
if channel_mask is not None:
cmd += 'channelmask %d ' % channel_mask
if extended_panid is not None:
cmd += 'extpanid %s ' % extended_panid
if panid is not None:
cmd += 'panid %d ' % panid
if network_key is not None:
cmd += 'networkkey %s ' % network_key
if mesh_local is not None:
cmd += 'localprefix %s ' % mesh_local
if network_name is not None:
cmd += 'networkname %s ' % self._escape_escapable(network_name)
if security_policy is not None:
rotation, flags = security_policy
cmd += 'securitypolicy %d %s ' % (rotation, flags)
if binary is not None:
cmd += '-x %s ' % binary
self.send_command(cmd)
self._expect_done()
def send_mgmt_active_get(self, addr='', tlvs=[]):
cmd = 'dataset mgmtgetcommand active'
if addr != '':
cmd += ' address '
cmd += addr
if len(tlvs) != 0:
tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
cmd += ' -x '
cmd += tlv_str
self.send_command(cmd)
self._expect_done()
def send_mgmt_pending_get(self, addr='', tlvs=[]):
cmd = 'dataset mgmtgetcommand pending'
if addr != '':
cmd += ' address '
cmd += addr
if len(tlvs) != 0:
tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
cmd += ' -x '
cmd += tlv_str
self.send_command(cmd)
self._expect_done()
def send_mgmt_pending_set(
self,
pending_timestamp=None,
active_timestamp=None,
delay_timer=None,
channel=None,
panid=None,
network_key=None,
mesh_local=None,
network_name=None,
):
cmd = 'dataset mgmtsetcommand pending '
if pending_timestamp is not None:
cmd += 'pendingtimestamp %d ' % pending_timestamp
if active_timestamp is not None:
cmd += 'activetimestamp %d ' % active_timestamp
if delay_timer is not None:
cmd += 'delaytimer %d ' % delay_timer
if channel is not None:
cmd += 'channel %d ' % channel
if panid is not None:
cmd += 'panid %d ' % panid
if network_key is not None:
cmd += 'networkkey %s ' % network_key
if mesh_local is not None:
cmd += 'localprefix %s ' % mesh_local
if network_name is not None:
cmd += 'networkname %s ' % self._escape_escapable(network_name)
self.send_command(cmd)
self._expect_done()
def coap_cancel(self):
"""
Cancel a CoAP subscription.
"""
cmd = 'coap cancel'
self.send_command(cmd)
self._expect_done()
def coap_delete(self, ipaddr, uri, con=False, payload=None):
"""
Send a DELETE request via CoAP.
"""
return self._coap_rq('delete', ipaddr, uri, con, payload)
def coap_get(self, ipaddr, uri, con=False, payload=None):
"""
Send a GET request via CoAP.
"""
return self._coap_rq('get', ipaddr, uri, con, payload)
def coap_get_block(self, ipaddr, uri, size=16, count=0):
"""
Send a GET request via CoAP.
"""
return self._coap_rq_block('get', ipaddr, uri, size, count)
def coap_observe(self, ipaddr, uri, con=False, payload=None):
"""
Send a GET request via CoAP with Observe set.
"""
return self._coap_rq('observe', ipaddr, uri, con, payload)
def coap_post(self, ipaddr, uri, con=False, payload=None):
"""
Send a POST request via CoAP.
"""
return self._coap_rq('post', ipaddr, uri, con, payload)
def coap_post_block(self, ipaddr, uri, size=16, count=0):
"""
Send a POST request via CoAP.
"""
return self._coap_rq_block('post', ipaddr, uri, size, count)
def coap_put(self, ipaddr, uri, con=False, payload=None):
"""
Send a PUT request via CoAP.
"""
return self._coap_rq('put', ipaddr, uri, con, payload)
def coap_put_block(self, ipaddr, uri, size=16, count=0):
"""
Send a PUT request via CoAP.
"""
return self._coap_rq_block('put', ipaddr, uri, size, count)
def _coap_rq(self, method, ipaddr, uri, con=False, payload=None):
"""
Issue a GET/POST/PUT/DELETE/GET OBSERVE request.
"""
cmd = 'coap %s %s %s' % (method, ipaddr, uri)
if con:
cmd += ' con'
else:
cmd += ' non'
if payload is not None:
cmd += ' %s' % payload
self.send_command(cmd)
return self.coap_wait_response()
def _coap_rq_block(self, method, ipaddr, uri, size=16, count=0):
"""
Issue a GET/POST/PUT/DELETE/GET OBSERVE BLOCK request.
"""
cmd = 'coap %s %s %s' % (method, ipaddr, uri)
cmd += ' block-%d' % size
if count != 0:
cmd += ' %d' % count
self.send_command(cmd)
return self.coap_wait_response()
def coap_wait_response(self):
"""
Wait for a CoAP response, and return it.
"""
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect(r'coap response from ([\da-f:]+)(?: OBS=(\d+))?'
r'(?: with payload: ([\da-f]+))?\b',
timeout=timeout)
(source, observe, payload) = self.pexpect.match.groups()
source = source.decode('UTF-8')
if observe is not None:
observe = int(observe, base=10)
if payload is not None:
try:
payload = binascii.a2b_hex(payload).decode('UTF-8')
except UnicodeDecodeError:
pass
# Return the values received
return dict(source=source, observe=observe, payload=payload)
def coap_wait_request(self):
"""
Wait for a CoAP request to be made.
"""
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect(r'coap request from ([\da-f:]+)(?: OBS=(\d+))?'
r'(?: with payload: ([\da-f]+))?\b',
timeout=timeout)
(source, observe, payload) = self.pexpect.match.groups()
source = source.decode('UTF-8')
if observe is not None:
observe = int(observe, base=10)
if payload is not None:
payload = binascii.a2b_hex(payload).decode('UTF-8')
# Return the values received
return dict(source=source, observe=observe, payload=payload)
def coap_wait_subscribe(self):
"""
Wait for a CoAP client to be subscribed.
"""
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect(r'Subscribing client\b', timeout=timeout)
def coap_wait_ack(self):
"""
Wait for a CoAP notification ACK.
"""
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect(r'Received ACK in reply to notification ' r'from ([\da-f:]+)\b', timeout=timeout)
(source,) = self.pexpect.match.groups()
source = source.decode('UTF-8')
return source
def coap_set_resource_path(self, path):
"""
Set the path for the CoAP resource.
"""
cmd = 'coap resource %s' % path
self.send_command(cmd)
self._expect_done()
def coap_set_resource_path_block(self, path, count=0):
"""
Set the path for the CoAP resource and how many blocks can be received from this resource.
"""
cmd = 'coap resource %s %d' % (path, count)
self.send_command(cmd)
self._expect('Done')
def coap_set_content(self, content):
"""
Set the content of the CoAP resource.
"""
cmd = 'coap set %s' % content
self.send_command(cmd)
self._expect_done()
def coap_start(self):
"""
Start the CoAP service.
"""
cmd = 'coap start'
self.send_command(cmd)
self._expect_done()
def coap_stop(self):
"""
Stop the CoAP service.
"""
cmd = 'coap stop'
self.send_command(cmd)
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect_done(timeout=timeout)
def coaps_start_psk(self, psk, pskIdentity):
cmd = 'coaps psk %s %s' % (psk, pskIdentity)
self.send_command(cmd)
self._expect_done()
cmd = 'coaps start'
self.send_command(cmd)
self._expect_done()
def coaps_start_x509(self):
cmd = 'coaps x509'
self.send_command(cmd)
self._expect_done()
cmd = 'coaps start'
self.send_command(cmd)
self._expect_done()
def coaps_set_resource_path(self, path):
cmd = 'coaps resource %s' % path
self.send_command(cmd)
self._expect_done()
def coaps_stop(self):
cmd = 'coaps stop'
self.send_command(cmd)
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect_done(timeout=timeout)
def coaps_connect(self, ipaddr):
cmd = 'coaps connect %s' % ipaddr
self.send_command(cmd)
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect('coaps connected', timeout=timeout)
def coaps_disconnect(self):
cmd = 'coaps disconnect'
self.send_command(cmd)
self._expect_done()
self.simulator.go(5)
def coaps_get(self):
cmd = 'coaps get test'
self.send_command(cmd)
if isinstance(self.simulator, simulator.VirtualTime):
self.simulator.go(5)
timeout = 1
else:
timeout = 5
self._expect('coaps response', timeout=timeout)
def commissioner_mgmtget(self, tlvs_binary=None):
cmd = 'commissioner mgmtget'
if tlvs_binary is not None:
cmd += ' -x %s' % tlvs_binary
self.send_command(cmd)
self._expect_done()
def commissioner_mgmtset(self, tlvs_binary):
cmd = 'commissioner mgmtset -x %s' % tlvs_binary
self.send_command(cmd)
self._expect_done()
def bytes_to_hex_str(self, src):
return ''.join(format(x, '02x') for x in src)
def commissioner_mgmtset_with_tlvs(self, tlvs):
payload = bytearray()
for tlv in tlvs:
payload += tlv.to_hex()
self.commissioner_mgmtset(self.bytes_to_hex_str(payload))
def udp_start(self, local_ipaddr, local_port, bind_unspecified=False):
cmd = 'udp open'
self.send_command(cmd)
self._expect_done()
cmd = 'udp bind %s %s %s' % ("-u" if bind_unspecified else "", local_ipaddr, local_port)
self.send_command(cmd)
self._expect_done()
def udp_stop(self):
cmd = 'udp close'
self.send_command(cmd)
self._expect_done()
def udp_send(self, bytes, ipaddr, port, success=True):
cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes)
self.send_command(cmd)
if success:
self._expect_done()
else:
self._expect('Error')
def udp_check_rx(self, bytes_should_rx):
self._expect('%d bytes' % bytes_should_rx)
def set_routereligible(self, enable: bool):
cmd = f'routereligible {"enable" if enable else "disable"}'
self.send_command(cmd)
self._expect_done()
def router_list(self):
cmd = 'router list'
self.send_command(cmd)
self._expect([r'(\d+)((\s\d+)*)'])
g = self.pexpect.match.groups()
router_list = g[0].decode('utf8') + ' ' + g[1].decode('utf8')
router_list = [int(x) for x in router_list.split()]
self._expect_done()
return router_list
def router_table(self):
cmd = 'router table'
self.send_command(cmd)
self._expect(r'(.*)Done')
g = self.pexpect.match.groups()
output = g[0].decode('utf8')
lines = output.strip().split('\n')
lines = [l.strip() for l in lines]
router_table = {}
for i, line in enumerate(lines):
if not line.startswith('|') or not line.endswith('|'):
if i not in (0, 2):
# should not happen
print("unexpected line %d: %s" % (i, line))
continue
line = line[1:][:-1]
line = [x.strip() for x in line.split('|')]
if len(line) < 9:
print("unexpected line %d: %s" % (i, line))
continue
try:
int(line[0])
except ValueError:
if i != 1:
print("unexpected line %d: %s" % (i, line))
continue
id = int(line[0])
rloc16 = int(line[1], 16)
nexthop = int(line[2])
pathcost = int(line[3])
lqin = int(line[4])
lqout = int(line[5])
age = int(line[6])
emac = str(line[7])
link = int(line[8])
router_table[id] = {
'rloc16': rloc16,
'nexthop': nexthop,
'pathcost': pathcost,
'lqin': lqin,
'lqout': lqout,
'age': age,
'emac': emac,
'link': link,
}
return router_table
def link_metrics_query_single_probe(self, dst_addr: str, linkmetrics_flags: str):
cmd = 'linkmetrics query %s single %s' % (dst_addr, linkmetrics_flags)
self.send_command(cmd)
self._expect_done()
def link_metrics_query_forward_tracking_series(self, dst_addr: str, series_id: int):
cmd = 'linkmetrics query %s forward %d' % (dst_addr, series_id)
self.send_command(cmd)
self._expect_done()
def link_metrics_mgmt_req_enhanced_ack_based_probing(self,
dst_addr: str,
enable: bool,
metrics_flags: str,
ext_flags=''):
cmd = "linkmetrics mgmt %s enhanced-ack" % (dst_addr)
if enable:
cmd = cmd + (" register %s %s" % (metrics_flags, ext_flags))
else:
cmd = cmd + " clear"
self.send_command(cmd)
self._expect_done()
def link_metrics_mgmt_req_forward_tracking_series(self, dst_addr: str, series_id: int, series_flags: str,
metrics_flags: str):
cmd = "linkmetrics mgmt %s forward %d %s %s" % (dst_addr, series_id, series_flags, metrics_flags)
self.send_command(cmd)
self._expect_done()
def link_metrics_send_link_probe(self, dst_addr: str, series_id: int, length: int):
cmd = "linkmetrics probe %s %d %d" % (dst_addr, series_id, length)
self.send_command(cmd)
self._expect_done()
def send_address_notification(self, dst: str, target: str, mliid: str):
cmd = f'fake /a/an {dst} {target} {mliid}'
self.send_command(cmd)
self._expect_done()
def send_proactive_backbone_notification(self, target: str, mliid: str, ltt: int):
cmd = f'fake /b/ba {target} {mliid} {ltt}'
self.send_command(cmd)
self._expect_done()
def dns_get_config(self):
"""
Returns the DNS config as a list of property dictionary (string key and string value).
Example output:
{
'Server': '[fd00:0:0:0:0:0:0:1]:1234'
'ResponseTimeout': '5000 ms'
'MaxTxAttempts': '2'
'RecursionDesired': 'no'
}
"""
cmd = f'dns config'
self.send_command(cmd)
output = self._expect_command_output()
config = {}
for line in output:
k, v = line.split(': ')
config[k] = v
return config
def dns_set_config(self, config):
cmd = f'dns config {config}'
self.send_command(cmd)
self._expect_done()
def dns_resolve(self, hostname, server=None, port=53):
cmd = f'dns resolve {hostname}'
if server is not None:
cmd += f' {server} {port}'
self.send_command(cmd)
self.simulator.go(10)
output = self._expect_command_output()
dns_resp = output[0]
# example output: "DNS response for host1.default.service.arpa. - fd00:db8:0:0:fd3d:d471:1e8c:b60 TTL:7190 "
# " fd00:db8:0:0:0:ff:fe00:9000 TTL:7190"
addrs = dns_resp.strip().split(' - ')[1].split(' ')
ip = [item.strip() for item in addrs[::2]]
ttl = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
return list(zip(ip, ttl))
def dns_resolve_service(self, instance, service, server=None, port=53):
"""
Resolves the service instance and returns the instance information as a dict.
Example return value:
{
'port': 12345,
'priority': 0,
'weight': 0,
'host': 'ins1._ipps._tcp.default.service.arpa.',
'address': '2001::1',
'txt_data': 'a=00, b=02bb',
'srv_ttl': 7100,
'txt_ttl': 7100,
'aaaa_ttl': 7100,
}
"""
instance = self._escape_escapable(instance)
cmd = f'dns service {instance} {service}'
if server is not None:
cmd += f' {server} {port}'
self.send_command(cmd)
self.simulator.go(10)
output = self._expect_command_output()
# Example output:
# DNS service resolution response for ins2 for service _ipps._tcp.default.service.arpa.
# Port:22222, Priority:2, Weight:2, TTL:7155
# Host:host2.default.service.arpa.
# HostAddress:0:0:0:0:0:0:0:0 TTL:0
# TXT:[a=00, b=02bb] TTL:7155
# Done
m = re.match(
r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
'\r'.join(output))
if m:
port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
return {
'port': int(port),
'priority': int(priority),
'weight': int(weight),
'host': hostname,
'address': address,
'txt_data': txt_data,
'srv_ttl': int(srv_ttl),
'txt_ttl': int(txt_ttl),
'aaaa_ttl': int(aaaa_ttl),
}
else:
raise Exception('dns resolve service failed: %s.%s' % (instance, service))
@staticmethod
def __parse_hex_string(hexstr: str) -> bytes:
assert (len(hexstr) % 2 == 0)
return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
def dns_browse(self, service_name, server=None, port=53):
"""
Browse the service and returns the instances.
Example return value:
{
'ins1': {
'port': 12345,
'priority': 1,
'weight': 1,
'host': 'ins1._ipps._tcp.default.service.arpa.',
'address': '2001::1',
'txt_data': 'a=00, b=11cf',
'srv_ttl': 7100,
'txt_ttl': 7100,
'aaaa_ttl': 7100,
},
'ins2': {
'port': 12345,
'priority': 2,
'weight': 2,
'host': 'ins2._ipps._tcp.default.service.arpa.',
'address': '2001::2',
'txt_data': 'a=01, b=23dd',
'srv_ttl': 7100,
'txt_ttl': 7100,
'aaaa_ttl': 7100,
}
}
"""
cmd = f'dns browse {service_name}'
if server is not None:
cmd += f' {server} {port}'
self.send_command(cmd)
self.simulator.go(10)
output = '\n'.join(self._expect_command_output())
# Example output:
# ins2
# Port:22222, Priority:2, Weight:2, TTL:7175
# Host:host2.default.service.arpa.
# HostAddress:fd00:db8:0:0:3205:28dd:5b87:6a63 TTL:7175
# TXT:[a=00, b=11cf] TTL:7175
# ins1
# Port:11111, Priority:1, Weight:1, TTL:7170
# Host:host1.default.service.arpa.
# HostAddress:fd00:db8:0:0:39f4:d9:eb4f:778 TTL:7170
# TXT:[a=01, b=23dd] TTL:7170
# Done
result = {}
for ins, port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl in re.findall(
r'(.*?)\s+Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s*Host:(\S+)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
output):
result[ins] = {
'port': int(port),
'priority': int(priority),
'weight': int(weight),
'host': hostname,
'address': address,
'txt_data': txt_data,
'srv_ttl': int(srv_ttl),
'txt_ttl': int(txt_ttl),
'aaaa_ttl': int(aaaa_ttl),
}
return result
def set_mliid(self, mliid: str):
cmd = f'mliid {mliid}'
self.send_command(cmd)
self._expect_command_output()
def history_netinfo(self, num_entries=0):
"""
Get the `netinfo` history list, parse each entry and return
a list of dictionary (string key and string value) entries.
Example of return value:
[
{
'age': '00:00:00.000 ago',
'role': 'disabled',
'mode': 'rdn',
'rloc16': '0x7400',
'partition-id': '1318093703'
},
{
'age': '00:00:02.588 ago',
'role': 'leader',
'mode': 'rdn',
'rloc16': '0x7400',
'partition-id': '1318093703'
}
]
"""
cmd = f'history netinfo list {num_entries}'
self.send_command(cmd)
output = self._expect_command_output()
netinfos = []
for entry in output:
netinfo = {}
age, info = entry.split(' -> ')
netinfo['age'] = age
for item in info.split(' '):
k, v = item.split(':')
netinfo[k] = v
netinfos.append(netinfo)
return netinfos
def history_rx(self, num_entries=0):
"""
Get the IPv6 RX history list, parse each entry and return
a list of dictionary (string key and string value) entries.
Example of return value:
[
{
'age': '00:00:01.999',
'type': 'ICMP6(EchoReqst)',
'len': '16',
'sec': 'yes',
'prio': 'norm',
'rss': '-20',
'from': '0xac00',
'radio': '15.4',
'src': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
'dst': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
}
]
"""
cmd = f'history rx list {num_entries}'
self.send_command(cmd)
return self._parse_history_rx_tx_ouput(self._expect_command_output())
def history_tx(self, num_entries=0):
"""
Get the IPv6 TX history list, parse each entry and return
a list of dictionary (string key and string value) entries.
Example of return value:
[
{
'age': '00:00:01.999',
'type': 'ICMP6(EchoReply)',
'len': '16',
'sec': 'yes',
'prio': 'norm',
'to': '0xac00',
'tx-success': 'yes',
'radio': '15.4',
'src': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
'dst': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
}
]
"""
cmd = f'history tx list {num_entries}'
self.send_command(cmd)
return self._parse_history_rx_tx_ouput(self._expect_command_output())
def _parse_history_rx_tx_ouput(self, lines):
rxtx_list = []
for line in lines:
if line.strip().startswith('type:'):
for item in line.strip().split(' '):
k, v = item.split(':')
entry[k] = v
elif line.strip().startswith('src:'):
entry['src'] = line[4:]
elif line.strip().startswith('dst:'):
entry['dst'] = line[4:]
rxtx_list.append(entry)
else:
entry = {}
entry['age'] = line
return rxtx_list
def set_router_id_range(self, min_router_id: int, max_router_id: int):
cmd = f'routeridrange {min_router_id} {max_router_id}'
self.send_command(cmd)
self._expect_command_output()
def get_router_id_range(self):
cmd = 'routeridrange'
self.send_command(cmd)
line = self._expect_command_output()[0]
return [int(item) for item in line.split()]
class Node(NodeImpl, OtCli):
pass
class LinuxHost():
PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*')
ETH_DEV = config.BACKBONE_IFNAME
def enable_ether(self):
"""Enable the ethernet interface.
"""
self.bash(f'ip link set {self.ETH_DEV} up')
def disable_ether(self):
"""Disable the ethernet interface.
"""
self.bash(f'ip link set {self.ETH_DEV} down')
def get_ether_addrs(self):
output = self.bash(f'ip -6 addr list dev {self.ETH_DEV}')
addrs = []
for line in output:
# line example: "inet6 fe80::42:c0ff:fea8:903/64 scope link"
line = line.strip().split()
if line and line[0] == 'inet6':
addr = line[1]
if '/' in addr:
addr = addr.split('/')[0]
addrs.append(addr)
logging.debug('%s: get_ether_addrs: %r', self, addrs)
return addrs
def get_ether_mac(self):
output = self.bash(f'ip addr list dev {self.ETH_DEV}')
for line in output:
# link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
line = line.strip().split()
if line and line[0] == 'link/ether':
return line[1]
assert False, output
def add_ipmaddr_ether(self, ip: str):
cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.ETH_DEV} {ip} &'
self.bash(cmd)
def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5, ttl=None, interface='eth0') -> int:
cmd = f'ping -6 {ipaddr} -I {interface} -c {num_responses} -W {timeout}'
if size is not None:
cmd += f' -s {size}'
if ttl is not None:
cmd += f' -t {ttl}'
resp_count = 0
try:
for line in self.bash(cmd):
if self.PING_RESPONSE_PATTERN.match(line):
resp_count += 1
except subprocess.CalledProcessError:
pass
return resp_count
def _getBackboneGua(self) -> Optional[str]:
for addr in self.get_ether_addrs():
if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, addr, re.I):
return addr
return None
def _getInfraUla(self) -> Optional[str]:
""" Returns the ULA addresses autoconfigured on the infra link.
"""
addrs = []
for addr in self.get_ether_addrs():
if re.match(config.ONLINK_PREFIX_REGEX_PATTERN, addr, re.I):
addrs.append(addr)
return addrs
def _getInfraGua(self) -> Optional[str]:
""" Returns the GUA addresses autoconfigured on the infra link.
"""
gua_prefix = config.ONLINK_GUA_PREFIX.split('::/')[0]
return [addr for addr in self.get_ether_addrs() if addr.startswith(gua_prefix)]
def ping(self, *args, **kwargs):
backbone = kwargs.pop('backbone', False)
if backbone:
return self.ping_ether(*args, **kwargs)
else:
return super().ping(*args, **kwargs)
def udp_send_host(self, ipaddr, port, data, hop_limit=None):
if hop_limit is None:
if ipaddress.ip_address(ipaddr).is_multicast:
hop_limit = 10
else:
hop_limit = 64
cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/udp_send_host.py {ipaddr} {port} "{data}" {hop_limit}'
self.bash(cmd)
def add_ipmaddr(self, *args, **kwargs):
backbone = kwargs.pop('backbone', False)
if backbone:
return self.add_ipmaddr_ether(*args, **kwargs)
else:
return super().add_ipmaddr(*args, **kwargs)
def ip_neighbors_flush(self):
# clear neigh cache on linux
self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
self.bash(f'ip -6 neigh flush nud all nud failed nud noarp dev {self.ETH_DEV}')
self.bash('ip -6 neigh list nud all dev %s | cut -d " " -f1 | sudo xargs -I{} ip -6 neigh delete {} dev %s' %
(self.ETH_DEV, self.ETH_DEV))
self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
def browse_mdns_services(self, name, timeout=2):
""" Browse mDNS services on the ethernet.
:param name: the service type name in format of '<service-name>.<protocol>'.
:param timeout: timeout value in seconds before returning.
:return: A list of service instance names.
"""
self.bash(f'dns-sd -Z {name} local. > /tmp/{name} 2>&1 &')
time.sleep(timeout)
self.bash('pkill dns-sd')
instances = []
for line in self.bash(f'cat /tmp/{name}', encoding='raw_unicode_escape'):
elements = line.split()
if len(elements) >= 3 and elements[0] == name and elements[1] == 'PTR':
instances.append(elements[2][:-len('.' + name)])
return instances
def discover_mdns_service(self, instance, name, host_name, timeout=2):
""" Discover/resolve the mDNS service on ethernet.
:param instance: the service instance name.
:param name: the service name in format of '<service-name>.<protocol>'.
:param host_name: the host name this service points to. The domain
should not be included.
:param timeout: timeout value in seconds before returning.
:return: a dict of service properties or None.
The return value is a dict with the same key/values of srp_server_get_service
except that we don't have a `deleted` field here.
"""
host_name_file = self.bash('mktemp')[0].strip()
service_data_file = self.bash('mktemp')[0].strip()
self.bash(f'dns-sd -Z {name} local. > {service_data_file} 2>&1 &')
time.sleep(timeout)
full_service_name = f'{instance}.{name}'
# When hostname is unspecified, extract hostname from browse result
if host_name is None:
for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
elements = line.split()
if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV':
host_name = elements[5].split('.')[0]
break
assert (host_name is not None)
self.bash(f'dns-sd -G v6 {host_name}.local. > {host_name_file} 2>&1 &')
time.sleep(timeout)
self.bash('pkill dns-sd')
addresses = []
service = {}
logging.debug(self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'))
logging.debug(self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'))
# example output in the host file:
# Timestamp A/R Flags if Hostname Address TTL
# 9:38:09.274 Add 23 48 my-host.local. 2001:0000:0000:0000:0000:0000:0000:0002%<0> 120
#
for line in self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'):
elements = line.split()
fullname = f'{host_name}.local.'
if fullname not in elements:
continue
addresses.append(elements[elements.index(fullname) + 1].split('%')[0])
logging.debug(f'addresses of {host_name}: {addresses}')
# example output of in the service file:
# _ipps._tcp PTR my-service._ipps._tcp
# my-service._ipps._tcp SRV 0 0 12345 my-host.local. ; Replace with unicast FQDN of target host
# my-service._ipps._tcp TXT ""
#
is_txt = False
txt = ''
for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
elements = line.split()
if len(elements) >= 2 and elements[0] == full_service_name and elements[1] == 'TXT':
is_txt = True
if is_txt:
txt += line.strip()
if line.strip().endswith('"'):
is_txt = False
txt_dict = self.__parse_dns_sd_txt(txt)
logging.info(f'txt = {txt_dict}')
service['txt'] = txt_dict
if not elements or elements[0] != full_service_name:
continue
if elements[1] == 'SRV':
service['fullname'] = elements[0]
service['instance'] = instance
service['name'] = name
service['priority'] = int(elements[2])
service['weight'] = int(elements[3])
service['port'] = int(elements[4])
service['host_fullname'] = elements[5]
assert (service['host_fullname'] == f'{host_name}.local.')
service['host'] = host_name
service['addresses'] = addresses
return service if 'addresses' in service and service['addresses'] else None
def start_radvd_service(self, prefix, slaac):
self.bash("""cat >/etc/radvd.conf <<EOF
interface eth0
{
AdvSendAdvert on;
AdvReachableTime 200;
AdvRetransTimer 200;
AdvDefaultLifetime 1800;
MinRtrAdvInterval 1200;
MaxRtrAdvInterval 1800;
AdvDefaultPreference low;
prefix %s
{
AdvOnLink on;
AdvAutonomous %s;
AdvRouterAddr off;
AdvPreferredLifetime 40;
AdvValidLifetime 60;
};
};
EOF
""" % (prefix, 'on' if slaac else 'off'))
self.bash('service radvd start')
self.bash('service radvd status') # Make sure radvd service is running
def stop_radvd_service(self):
self.bash('service radvd stop')
def kill_radvd_service(self):
self.bash('pkill radvd')
def __parse_dns_sd_txt(self, line: str):
# Example TXT entry:
# "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
txt = {}
for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
if '=' not in entry:
continue
k, v = entry.split('=', 1)
txt[k] = v
return txt
class OtbrNode(LinuxHost, NodeImpl, OtbrDocker):
TUN_DEV = config.THREAD_IFNAME
is_otbr = True
is_bbr = True # OTBR is also BBR
node_type = 'otbr-docker'
def __repr__(self):
return f'Otbr<{self.nodeid}>'
def start(self):
self._setup_sysctl()
self.set_log_level(5)
super().start()
def add_ipaddr(self, addr):
cmd = f'ip -6 addr add {addr}/64 dev {self.TUN_DEV}'
self.bash(cmd)
def add_ipmaddr_tun(self, ip: str):
cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.TUN_DEV} {ip} &'
self.bash(cmd)
class HostNode(LinuxHost, OtbrDocker):
is_host = True
def __init__(self, nodeid, name=None, **kwargs):
self.nodeid = nodeid
self.name = name or ('Host%d' % nodeid)
super().__init__(nodeid, **kwargs)
self.bash('service otbr-agent stop')
def start(self, start_radvd=True, prefix=config.DOMAIN_PREFIX, slaac=False):
self._setup_sysctl()
if start_radvd:
self.start_radvd_service(prefix, slaac)
else:
self.stop_radvd_service()
def stop(self):
self.stop_radvd_service()
def get_addrs(self) -> List[str]:
return self.get_ether_addrs()
def __repr__(self):
return f'Host<{self.nodeid}>'
def get_matched_ula_addresses(self, prefix):
"""Get the IPv6 addresses that matches given prefix.
"""
addrs = []
for addr in self.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA):
if IPv6Address(addr) in IPv6Network(prefix):
addrs.append(addr)
return addrs
def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
"""Get specific type of IPv6 address configured on thread device.
Args:
address_type: the config.ADDRESS_TYPE type of IPv6 address.
Returns:
IPv6 address string.
"""
if address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
return self._getBackboneGua()
elif address_type == config.ADDRESS_TYPE.ONLINK_ULA:
return self._getInfraUla()
elif address_type == config.ADDRESS_TYPE.ONLINK_GUA:
return self._getInfraGua()
else:
raise ValueError(f'unsupported address type: {address_type}')
if __name__ == '__main__':
unittest.main()