blob: 7441f3312dcace3ce34b1b64bce723718208fe66 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
>> Thread Host Controller Interface
>> Device : OpenThread THCI
>> Class : OpenThread
"""
import functools
import ipaddress
import logging
import random
import traceback
import re
import socket
import time
from abc import abstractmethod
import serial
from Queue import Queue
from serial.serialutil import SerialException
from GRLLibs.ThreadPacket.PlatformPackets import (
PlatformDiagnosticPacket,
PlatformPackets,
)
from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper, ThreadRunner
from GRLLibs.UtilityModules.Plugins.AES_CMAC import Thread_PBKDF2
from GRLLibs.UtilityModules.Test import (
Thread_Device_Role,
Device_Data_Requirement,
MacType,
)
from GRLLibs.UtilityModules.enums import (
PlatformDiagnosticPacket_Direction,
PlatformDiagnosticPacket_Type,
)
from GRLLibs.UtilityModules.enums import DevCapb
from IThci import IThci
import commissioner
from commissioner_impl import OTCommissioner
# Replace by the actual version string for the vendor's reference device
OT11_VERSION = 'OPENTHREAD'
OT12_VERSION = 'OPENTHREAD'
OT13_VERSION = 'OPENTHREAD'
# Supported device capabilites in this THCI implementation
OT11_CAPBS = DevCapb.V1_1
OT12_CAPBS = (DevCapb.L_AIO | DevCapb.C_FFD | DevCapb.C_RFD)
OT12BR_CAPBS = (DevCapb.C_BBR | DevCapb.C_Host | DevCapb.C_Comm)
OT13_CAPBS = DevCapb.NotSpecified
ZEPHYR_PREFIX = 'ot '
"""CLI prefix used for OpenThread commands in Zephyr systems"""
LINESEPX = re.compile(r'\r\n|\n')
"""regex: used to split lines"""
LOGX = re.compile(r'((\[(-|C|W|N|I|D)\])'
r'|(-+$)' # e.x. ------------------------------------------------------------------------
r'|(=+\[.*\]=+$)' # e.x. ==============================[TX len=108]===============================
r'|(\|.+\|.+\|.+)' # e.x. | 61 DC D2 CE FA 04 00 00 | 00 00 0A 6E 16 01 00 00 | aRNz......n....
r')')
"""regex used to filter logs"""
assert LOGX.match('[-]')
assert LOGX.match('[C]')
assert LOGX.match('[W]')
assert LOGX.match('[N]')
assert LOGX.match('[I]')
assert LOGX.match('[D]')
assert LOGX.match('------------------------------------------------------------------------')
assert LOGX.match('==============================[TX len=108]===============================')
assert LOGX.match('| 61 DC D2 CE FA 04 00 00 | 00 00 0A 6E 16 01 00 00 | aRNz......n....')
# OT Errors
OT_ERROR_ALREADY = 24
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
_callStackDepth = 0
def watched(func):
func_name = func.func_name
@functools.wraps(func)
def wrapped_api_func(self, *args, **kwargs):
global _callStackDepth
callstr = '====' * _callStackDepth + "> %s%s%s" % (func_name, str(args) if args else "",
str(kwargs) if kwargs else "")
_callStackDepth += 1
try:
ret = func(self, *args, **kwargs)
self.log("%s returns %r", callstr, ret)
return ret
except Exception as ex:
self.log("FUNC %s failed: %s", func_name, str(ex))
raise
finally:
_callStackDepth -= 1
if _callStackDepth == 0:
print('\n')
return wrapped_api_func
def retry(n, interval=0):
assert n >= 0, n
def deco(func):
@functools.wraps(func)
def retried_func(*args, **kwargs):
for i in range(n + 1):
try:
return func(*args, **kwargs)
except Exception:
if i == n:
raise
time.sleep(interval)
return retried_func
return deco
def API(api_func):
try:
return watched(api_func)
except Exception:
tb = traceback.format_exc()
ModuleHelper.WriteIntoDebugLogger("Exception raised while calling %s:\n%s" % (api_func.func_name, tb))
raise
def commissioning(func):
@functools.wraps(func)
def comm_func(self, *args, **kwargs):
self._onCommissionStart()
try:
return func(self, *args, **kwargs)
finally:
self._onCommissionStop()
return comm_func
class CommandError(Exception):
def __init__(self, code, msg):
assert isinstance(code, int), code
self.code = code
self.msg = msg
super(CommandError, self).__init__("Error %d: %s" % (code, msg))
class OpenThreadTHCI(object):
LOWEST_POSSIBLE_PARTATION_ID = 0x1
LINK_QUALITY_CHANGE_TIME = 100
DEFAULT_COMMAND_TIMEOUT = 10
firmwarePrefix = 'OPENTHREAD/'
DOMAIN_NAME = 'Thread'
MLR_TIMEOUT_MIN = 300
NETWORK_ATTACHMENT_TIMEOUT = 10
IsBorderRouter = False
IsBackboneRouter = False
IsHost = False
externalCommissioner = None
_update_router_status = False
_cmdPrefix = ''
_lineSepX = LINESEPX
_ROLE_MODE_DICT = {
Thread_Device_Role.Leader: 'rdn',
Thread_Device_Role.Router: 'rdn',
Thread_Device_Role.SED: '-',
Thread_Device_Role.EndDevice: 'rn',
Thread_Device_Role.REED: 'rdn',
Thread_Device_Role.EndDevice_FED: 'rdn',
Thread_Device_Role.EndDevice_MED: 'rn',
Thread_Device_Role.SSED: '-',
}
def __init__(self, **kwargs):
"""initialize the serial port and default network parameters
Args:
**kwargs: Arbitrary keyword arguments
Includes 'EUI' and 'SerialPort'
"""
self.intialize(kwargs)
@abstractmethod
def _connect(self):
"""
Connect to the device.
"""
@abstractmethod
def _disconnect(self):
"""
Disconnect from the device
"""
@abstractmethod
def _cliReadLine(self):
"""Read exactly one line from the device
Returns:
None if no data
"""
@abstractmethod
def _cliWriteLine(self, line):
"""Send exactly one line to the device
Args:
line str: data send to device
"""
@abstractmethod
def _onCommissionStart(self):
"""Called when commissioning starts."""
@abstractmethod
def _onCommissionStop(self):
"""Called when commissioning stops."""
def __sendCommand(self, cmd, expectEcho=True):
cmd = self._cmdPrefix + cmd
# self.log("command: %s", cmd)
self._cliWriteLine(cmd)
if expectEcho:
self.__expect(cmd, endswith=True)
_COMMAND_OUTPUT_ERROR_PATTERN = re.compile(r'Error (\d+): (.*)')
@retry(3)
@watched
def __executeCommand(self, cmd, timeout=DEFAULT_COMMAND_TIMEOUT):
"""send specific command to reference unit over serial port
Args:
cmd: OpenThread CLI string
Returns:
Done: successfully send the command to reference unit and parse it
Value: successfully retrieve the desired value from reference unit
Error: some errors occur, indicates by the followed specific error number
"""
if self.logThreadStatus == self.logStatus['running']:
self.logThreadStatus = self.logStatus['pauseReq']
while (self.logThreadStatus != self.logStatus['paused'] and
self.logThreadStatus != self.logStatus['stop']):
pass
try:
self.__sendCommand(cmd)
response = []
t_end = time.time() + timeout
while time.time() < t_end:
line = self.__readCliLine()
if line is None:
time.sleep(0.01)
continue
# self.log("readline: %s", line)
# skip empty lines
if line:
response.append(line)
if line.endswith('Done'):
break
else:
m = OpenThreadTHCI._COMMAND_OUTPUT_ERROR_PATTERN.match(line)
if m is not None:
code, msg = m.groups()
raise CommandError(int(code), msg)
else:
raise Exception('%s: failed to find end of response: %s' % (self, response))
except SerialException as e:
self.log('__executeCommand() Error: ' + str(e))
self._disconnect()
self._connect()
raise e
return response
def __expect(self, expected, timeout=5, endswith=False):
"""Find the `expected` line within `times` tries.
Args:
expected str: the expected string
times int: number of tries
"""
#self.log('Expecting [%s]' % (expected))
deadline = time.time() + timeout
while True:
line = self.__readCliLine()
if line is not None:
#self.log("readline: %s", line)
pass
if line is None:
if time.time() >= deadline:
break
self.sleep(0.01)
continue
matched = line.endswith(expected) if endswith else line == expected
if matched:
#self.log('Expected [%s]' % (expected))
return
raise Exception('failed to find expected string[%s]' % expected)
def __readCliLine(self, ignoreLogs=True):
"""Read the next line from OT CLI.d"""
line = self._cliReadLine()
if ignoreLogs:
while line is not None and LOGX.match(line):
line = self._cliReadLine()
return line
@API
def getVersionNumber(self):
"""get OpenThread stack firmware version number"""
return self.__executeCommand('version')[0]
def log(self, fmt, *args):
try:
msg = fmt % args
# print('%s - %s - %s' % (self.port, time.strftime('%b %d %H:%M:%S'), msg))
print('%s %s' % (self.port, msg))
except Exception:
pass
def sleep(self, duration):
if duration >= 1:
self.log("sleeping for %ss ...", duration)
time.sleep(duration)
@API
def intialize(self, params):
"""initialize the serial port with baudrate, timeout parameters"""
self.port = params.get('SerialPort', '')
# params example: {'EUI': 1616240311388864514L, 'SerialBaudRate': None, 'TelnetIP': '192.168.8.181', 'SerialPort': None, 'Param7': None, 'Param6': None, 'Param5': 'ip', 'TelnetPort': '22', 'Param9': None, 'Param8': None}
try:
ipaddress.ip_address(self.port)
# handle TestHarness Discovery Protocol
self.connectType = 'ip'
self.telnetIp = self.port
self.telnetPort = 22
self.telnetUsername = 'pi' if params.get('Param6') is None else params.get('Param6')
self.telnetPassword = 'raspberry' if params.get('Param7') is None else params.get('Param7')
except ValueError:
self.connectType = (params.get('Param5') or 'usb').lower()
self.telnetIp = params.get('TelnetIP')
self.telnetPort = int(params.get('TelnetPort')) if params.get('TelnetPort') else 22
# username for SSH
self.telnetUsername = 'pi' if params.get('Param6') is None else params.get('Param6')
# password for SSH
self.telnetPassword = 'raspberry' if params.get('Param7') is None else params.get('Param7')
self.mac = params.get('EUI')
self.UIStatusMsg = ''
self.AutoDUTEnable = False
self.isPowerDown = False
self._is_net = False # whether device is through ser2net
self.logStatus = {
'stop': 'stop',
'running': 'running',
'pauseReq': 'pauseReq',
'paused': 'paused',
}
self.joinStatus = {
'notstart': 'notstart',
'ongoing': 'ongoing',
'succeed': 'succeed',
'failed': 'failed',
}
self.logThreadStatus = self.logStatus['stop']
self.deviceConnected = False
# init serial port
self._connect()
if not self.IsBorderRouter:
self.__detectZephyr()
self.__discoverDeviceCapability()
self.UIStatusMsg = self.getVersionNumber()
if self.firmwarePrefix in self.UIStatusMsg:
self.deviceConnected = True
else:
self.UIStatusMsg = ('Firmware Not Matching Expecting ' + self.firmwarePrefix + ', Now is ' +
self.UIStatusMsg)
ModuleHelper.WriteIntoDebugLogger('Err: OpenThread device Firmware not matching..')
def __repr__(self):
if self.connectType == 'ip':
return '[%s:%d]' % (self.telnetIp, self.telnetPort)
else:
return '[%s]' % self.port
@API
def closeConnection(self):
"""close current serial port connection"""
self._disconnect()
def __disableRouterEligible(self):
"""disable router role
"""
cmd = 'routereligible disable'
self.__executeCommand(cmd)
def __setDeviceMode(self, mode):
"""set thread device mode:
Args:
mode: thread device mode
r: rx-on-when-idle
s: secure IEEE 802.15.4 data request
d: full thread device
n: full network data
Returns:
True: successful to set the device mode
False: fail to set the device mode
"""
cmd = 'mode %s' % mode
return self.__executeCommand(cmd)[-1] == 'Done'
def __setRouterUpgradeThreshold(self, iThreshold):
"""set router upgrade threshold
Args:
iThreshold: the number of active routers on the Thread network
partition below which a REED may decide to become a Router.
Returns:
True: successful to set the ROUTER_UPGRADE_THRESHOLD
False: fail to set ROUTER_UPGRADE_THRESHOLD
"""
cmd = 'routerupgradethreshold %s' % str(iThreshold)
return self.__executeCommand(cmd)[-1] == 'Done'
def __setRouterDowngradeThreshold(self, iThreshold):
"""set router downgrade threshold
Args:
iThreshold: the number of active routers on the Thread network
partition above which an active router may decide to
become a child.
Returns:
True: successful to set the ROUTER_DOWNGRADE_THRESHOLD
False: fail to set ROUTER_DOWNGRADE_THRESHOLD
"""
cmd = 'routerdowngradethreshold %s' % str(iThreshold)
return self.__executeCommand(cmd)[-1] == 'Done'
def __setRouterSelectionJitter(self, iRouterJitter):
"""set ROUTER_SELECTION_JITTER parameter for REED to upgrade to Router
Args:
iRouterJitter: a random period prior to request Router ID for REED
Returns:
True: successful to set the ROUTER_SELECTION_JITTER
False: fail to set ROUTER_SELECTION_JITTER
"""
cmd = 'routerselectionjitter %s' % str(iRouterJitter)
return self.__executeCommand(cmd)[-1] == 'Done'
def __setAddressfilterMode(self, mode):
"""set address filter mode
Returns:
True: successful to set address filter mode.
False: fail to set address filter mode.
"""
cmd = 'macfilter addr ' + mode
return self.__executeCommand(cmd)[-1] == 'Done'
def __startOpenThread(self):
"""start OpenThread stack
Returns:
True: successful to start OpenThread stack and thread interface up
False: fail to start OpenThread stack
"""
if self.hasActiveDatasetToCommit:
if self.__executeCommand('dataset commit active')[0] != 'Done':
raise Exception('failed to commit active dataset')
else:
self.hasActiveDatasetToCommit = False
# Restore allowlist/denylist address filter mode if rejoin after
# reset
if self.isPowerDown:
if self._addressfilterMode == 'allowlist':
if self.__setAddressfilterMode('allowlist'):
for addr in self._addressfilterSet:
self.addAllowMAC(addr)
elif self._addressfilterMode == 'denylist':
if self.__setAddressfilterMode('denylist'):
for addr in self._addressfilterSet:
self.addBlockedMAC(addr)
# Set routerselectionjitter to 1 for certain device roles
if self.deviceRole in [
Thread_Device_Role.Leader,
Thread_Device_Role.Router,
Thread_Device_Role.REED,
]:
self.__setRouterSelectionJitter(1)
elif self.deviceRole in [Thread_Device_Role.BR_1, Thread_Device_Role.BR_2]:
self.IsBackboneRouter = True
self.__setRouterSelectionJitter(1)
if self.IsBackboneRouter:
# Configure default BBR dataset
self.__configBbrDataset(SeqNum=self.bbrSeqNum,
MlrTimeout=self.bbrMlrTimeout,
ReRegDelay=self.bbrReRegDelay)
# Add default domain prefix is not configured otherwise
if self.__useDefaultDomainPrefix:
self.__addDefaultDomainPrefix()
self.__executeCommand('ifconfig up')
self.__executeCommand('thread start')
self.isPowerDown = False
return True
@watched
def __isOpenThreadRunning(self):
"""check whether or not OpenThread is running
Returns:
True: OpenThread is running
False: OpenThread is not running
"""
return self.__executeCommand('state')[0] != 'disabled'
@watched
def __isDeviceAttached(self):
"""check whether or not OpenThread is running
Returns:
True: OpenThread is running
False: OpenThread is not running
"""
detached_states = ["detached", "disabled"]
return self.__executeCommand('state')[0] not in detached_states
# rloc16 might be hex string or integer, need to return actual allocated router id
def __convertRlocToRouterId(self, xRloc16):
"""mapping Rloc16 to router id
Args:
xRloc16: hex rloc16 short address
Returns:
actual router id allocated by leader
"""
routerList = self.__executeCommand('router list')[0].split()
rloc16 = None
routerid = None
for index in routerList:
cmd = 'router %s' % index
router = self.__executeCommand(cmd)
for line in router:
if 'Done' in line:
break
elif 'Router ID' in line:
routerid = line.split()[2]
elif 'Rloc' in line:
rloc16 = line.split()[1]
else:
pass
# process input rloc16
if isinstance(xRloc16, str):
rloc16 = '0x' + rloc16
if rloc16 == xRloc16:
return routerid
elif isinstance(xRloc16, int):
if int(rloc16, 16) == xRloc16:
return routerid
else:
pass
return None
# pylint: disable=no-self-use
def __convertLongToHex(self, iValue, fillZeros=None):
"""convert a long hex integer to string
remove '0x' and 'L' return string
Args:
iValue: long integer in hex format
fillZeros: pad string with zeros on the left to specified width
Returns:
string of this long integer without '0x' and 'L'
"""
fmt = '%x'
if fillZeros is not None:
fmt = '%%0%dx' % fillZeros
return fmt % iValue
@commissioning
def __readCommissioningLogs(self, durationInSeconds):
"""read logs during the commissioning process
Args:
durationInSeconds: time duration for reading commissioning logs
Returns:
Commissioning logs
"""
self.logThreadStatus = self.logStatus['running']
logs = Queue()
t_end = time.time() + durationInSeconds
joinSucceed = False
while time.time() < t_end:
if self.logThreadStatus == self.logStatus['pauseReq']:
self.logThreadStatus = self.logStatus['paused']
if self.logThreadStatus != self.logStatus['running']:
self.sleep(0.01)
continue
try:
line = self.__readCliLine(ignoreLogs=False)
if line:
self.log("commissioning log: %s", line)
logs.put(line)
if 'Join success' in line:
joinSucceed = True
# read commissioning logs for 3 more seconds
t_end = time.time() + 3
elif 'Join failed' in line:
# read commissioning logs for 3 more seconds
t_end = time.time() + 3
elif line is None:
self.sleep(0.01)
except Exception:
pass
self.joinCommissionedStatus = self.joinStatus['succeed'] if joinSucceed else self.joinStatus['failed']
self.logThreadStatus = self.logStatus['stop']
return logs
# pylint: disable=no-self-use
def __convertChannelMask(self, channelsArray):
"""convert channelsArray to bitmask format
Args:
channelsArray: channel array (i.e. [21, 22])
Returns:
bitmask format corresponding to a given channel array
"""
maskSet = 0
for eachChannel in channelsArray:
mask = 1 << eachChannel
maskSet = maskSet | mask
return maskSet
def __setChannelMask(self, channelMask):
cmd = 'dataset channelmask %s' % channelMask
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done'
def __setSecurityPolicy(self, securityPolicySecs, securityPolicyFlags):
cmd = 'dataset securitypolicy %s %s' % (
str(securityPolicySecs),
securityPolicyFlags,
)
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done'
def __setKeySwitchGuardTime(self, iKeySwitchGuardTime):
""" set the Key switch guard time
Args:
iKeySwitchGuardTime: key switch guard time
Returns:
True: successful to set key switch guard time
False: fail to set key switch guard time
"""
cmd = 'keysequence guardtime %s' % str(iKeySwitchGuardTime)
if self.__executeCommand(cmd)[-1] == 'Done':
self.sleep(1)
return True
else:
return False
def __getCommissionerSessionId(self):
""" get the commissioner session id allocated from Leader """
return self.__executeCommand('commissioner sessionid')[0]
# pylint: disable=no-self-use
def _deviceEscapeEscapable(self, string):
"""Escape CLI escapable characters in the given string.
Args:
string (str): UTF-8 input string.
Returns:
[str]: The modified string with escaped characters.
"""
escapable_chars = '\\ \t\r\n'
for char in escapable_chars:
string = string.replace(char, '\\%s' % char)
return string
@API
def setNetworkName(self, networkName='GRL'):
"""set Thread Network name
Args:
networkName: the networkname string to be set
Returns:
True: successful to set the Thread Networkname
False: fail to set the Thread Networkname
"""
networkName = self._deviceEscapeEscapable(networkName)
cmd = 'networkname %s' % networkName
datasetCmd = 'dataset networkname %s' % networkName
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done' and self.__executeCommand(datasetCmd)[-1] == 'Done'
@API
def setChannel(self, channel=11):
"""set channel of Thread device operates on.
Args:
channel:
(0 - 10: Reserved)
(11 - 26: 2.4GHz channels)
(27 - 65535: Reserved)
Returns:
True: successful to set the channel
False: fail to set the channel
"""
cmd = 'channel %s' % channel
datasetCmd = 'dataset channel %s' % channel
self.hasSetChannel = True
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done' and self.__executeCommand(datasetCmd)[-1] == 'Done'
@API
def getChannel(self):
"""get current channel"""
return self.__executeCommand('channel')[0]
@API
def setMAC(self, xEUI):
"""set the extended addresss of Thread device
Args:
xEUI: extended address in hex format
Returns:
True: successful to set the extended address
False: fail to set the extended address
"""
if not isinstance(xEUI, str):
address64 = self.__convertLongToHex(xEUI, 16)
else:
address64 = xEUI
cmd = 'extaddr %s' % address64
if self.__executeCommand(cmd)[-1] == 'Done':
self.mac = address64
return True
else:
return False
@API
def getMAC(self, bType=MacType.RandomMac):
"""get one specific type of MAC address
currently OpenThread only supports Random MAC address
Args:
bType: indicate which kind of MAC address is required
Returns:
specific type of MAC address
"""
# if power down happens, return extended address assigned previously
if self.isPowerDown:
macAddr64 = self.mac
else:
if bType == MacType.FactoryMac:
macAddr64 = self.__executeCommand('eui64')[0]
elif bType == MacType.HashMac:
macAddr64 = self.__executeCommand('joiner id')[0]
elif bType == MacType.EthMac and self.IsBorderRouter:
return self._deviceGetEtherMac()
else:
macAddr64 = self.__executeCommand('extaddr')[0]
return int(macAddr64, 16)
@API
def getLL64(self):
"""get link local unicast IPv6 address"""
return self.__executeCommand('ipaddr linklocal')[0]
@API
def getRloc16(self):
"""get rloc16 short address"""
rloc16 = self.__executeCommand('rloc16')[0]
return int(rloc16, 16)
@API
def getRloc(self):
"""get router locator unicast Ipv6 address"""
return self.__executeCommand('ipaddr rloc')[0]
def __getGlobal(self):
"""get global unicast IPv6 address set
if configuring multiple entries
"""
globalAddrs = []
rlocAddr = self.getRloc()
addrs = self.__executeCommand('ipaddr')
# take rloc address as a reference for current mesh local prefix,
# because for some TCs, mesh local prefix may be updated through
# pending dataset management.
for ip6Addr in addrs:
if ip6Addr == 'Done':
break
fullIp = ModuleHelper.GetFullIpv6Address(ip6Addr).lower()
if fullIp.startswith('fe80') or fullIp.startswith(rlocAddr[0:19]):
continue
globalAddrs.append(fullIp)
return globalAddrs
@API
def setNetworkKey(self, key):
"""set Thread network key
Args:
key: Thread network key used in secure the MLE/802.15.4 packet
Returns:
True: successful to set the Thread network key
False: fail to set the Thread network key
"""
if not isinstance(key, str):
networkKey = self.__convertLongToHex(key, 32)
cmd = 'networkkey %s' % networkKey
datasetCmd = 'dataset networkkey %s' % networkKey
else:
networkKey = key
cmd = 'networkkey %s' % networkKey
datasetCmd = 'dataset networkkey %s' % networkKey
self.networkKey = networkKey
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done' and self.__executeCommand(datasetCmd)[-1] == 'Done'
@API
def addBlockedMAC(self, xEUI):
"""add a given extended address to the denylist entry
Args:
xEUI: extended address in hex format
Returns:
True: successful to add a given extended address to the denylist entry
False: fail to add a given extended address to the denylist entry
"""
if isinstance(xEUI, str):
macAddr = xEUI
else:
macAddr = self.__convertLongToHex(xEUI)
# if blocked device is itself
if macAddr == self.mac:
print('block device itself')
return True
if self._addressfilterMode != 'denylist':
if self.__setAddressfilterMode('denylist'):
self._addressfilterMode = 'denylist'
cmd = 'macfilter addr add %s' % macAddr
ret = self.__executeCommand(cmd)[-1] == 'Done'
self._addressfilterSet.add(macAddr)
print('current denylist entries:')
for addr in self._addressfilterSet:
print(addr)
return ret
@API
def addAllowMAC(self, xEUI):
"""add a given extended address to the allowlist addressfilter
Args:
xEUI: a given extended address in hex format
Returns:
True: successful to add a given extended address to the allowlist entry
False: fail to add a given extended address to the allowlist entry
"""
if isinstance(xEUI, str):
macAddr = xEUI
else:
macAddr = self.__convertLongToHex(xEUI)
if self._addressfilterMode != 'allowlist':
if self.__setAddressfilterMode('allowlist'):
self._addressfilterMode = 'allowlist'
cmd = 'macfilter addr add %s' % macAddr
ret = self.__executeCommand(cmd)[-1] == 'Done'
self._addressfilterSet.add(macAddr)
print('current allowlist entries:')
for addr in self._addressfilterSet:
print(addr)
return ret
@API
def clearBlockList(self):
"""clear all entries in denylist table
Returns:
True: successful to clear the denylist
False: fail to clear the denylist
"""
# remove all entries in denylist
print('clearing denylist entries:')
for addr in self._addressfilterSet:
print(addr)
# disable denylist
if self.__setAddressfilterMode('disable'):
self._addressfilterMode = 'disable'
# clear ops
cmd = 'macfilter addr clear'
if self.__executeCommand(cmd)[-1] == 'Done':
self._addressfilterSet.clear()
return True
return False
@API
def clearAllowList(self):
"""clear all entries in allowlist table
Returns:
True: successful to clear the allowlist
False: fail to clear the allowlist
"""
# remove all entries in allowlist
print('clearing allowlist entries:')
for addr in self._addressfilterSet:
print(addr)
# disable allowlist
if self.__setAddressfilterMode('disable'):
self._addressfilterMode = 'disable'
# clear ops
cmd = 'macfilter addr clear'
if self.__executeCommand(cmd)[-1] == 'Done':
self._addressfilterSet.clear()
return True
return False
@API
def getDeviceRole(self):
"""get current device role in Thread Network"""
return self.__executeCommand('state')[0]
@API
def joinNetwork(self, eRoleId):
"""make device ready to join the Thread Network with a given role
Args:
eRoleId: a given device role id
Returns:
True: ready to set Thread Network parameter for joining desired Network
"""
self.deviceRole = eRoleId
mode = '-'
if ModuleHelper.LeaderDutChannelFound and not self.hasSetChannel:
self.channel = ModuleHelper.Default_Channel
# FIXME: when Harness call setNetworkDataRequirement()?
# only sleep end device requires stable networkdata now
if eRoleId == Thread_Device_Role.Leader:
print('join as leader')
mode = 'rdn'
if self.AutoDUTEnable is False:
# set ROUTER_DOWNGRADE_THRESHOLD
self.__setRouterDowngradeThreshold(33)
elif eRoleId == Thread_Device_Role.Router:
print('join as router')
mode = 'rdn'
if self.AutoDUTEnable is False:
# set ROUTER_DOWNGRADE_THRESHOLD
self.__setRouterDowngradeThreshold(33)
elif eRoleId in (Thread_Device_Role.BR_1, Thread_Device_Role.BR_2):
print('join as BBR')
mode = 'rdn'
if self.AutoDUTEnable is False:
# set ROUTER_DOWNGRADE_THRESHOLD
self.__setRouterDowngradeThreshold(33)
elif eRoleId == Thread_Device_Role.SED:
print('join as sleepy end device')
mode = '-'
self.__setPollPeriod(self.__sedPollPeriod)
elif eRoleId == Thread_Device_Role.SSED:
print('join as SSED')
mode = '-'
self.setCSLperiod(self.cslPeriod)
self.setCSLtout(self.ssedTimeout)
self.setCSLsuspension(False)
elif eRoleId == Thread_Device_Role.EndDevice:
print('join as end device')
mode = 'rn'
elif eRoleId == Thread_Device_Role.REED:
print('join as REED')
mode = 'rdn'
if self.AutoDUTEnable is False:
# set ROUTER_UPGRADE_THRESHOLD
self.__setRouterUpgradeThreshold(0)
elif eRoleId == Thread_Device_Role.EndDevice_FED:
print('join as FED')
mode = 'rdn'
# always remain an ED, never request to be a router
self.__disableRouterEligible()
elif eRoleId == Thread_Device_Role.EndDevice_MED:
print('join as MED')
mode = 'rn'
else:
pass
# set Thread device mode with a given role
self.__setDeviceMode(mode)
# start OpenThread
self.__startOpenThread()
self.wait_for_attach_to_the_network(expected_role=eRoleId,
timeout=self.NETWORK_ATTACHMENT_TIMEOUT,
raise_assert=True)
return True
def wait_for_attach_to_the_network(self, expected_role, timeout, raise_assert=False):
start_time = time.time()
while time.time() < start_time + timeout:
time.sleep(0.3)
if self.__isDeviceAttached():
break
else:
if raise_assert:
raise AssertionError("OT device {} could not attach to the network after {} s of timeout.".format(
self, timeout))
else:
return False
if self._update_router_status:
self.__updateRouterStatus()
if expected_role == Thread_Device_Role.Router:
while time.time() < start_time + timeout:
time.sleep(0.3)
if self.getDeviceRole() == "router":
break
else:
if raise_assert:
raise AssertionError("OT Router {} could not attach to the network after {} s of timeout.".format(
self, timeout * 2))
else:
return False
return True
@API
def getNetworkFragmentID(self):
"""get current partition id of Thread Network Partition from LeaderData
Returns:
The Thread network Partition Id
"""
if not self.__isOpenThreadRunning():
return None
leaderData = self.__executeCommand('leaderdata')
return int(leaderData[0].split()[2], 16)
@API
def getParentAddress(self):
"""get Thread device's parent extended address and rloc16 short address
Returns:
The extended address of parent in hex format
"""
eui = None
parentInfo = self.__executeCommand('parent')
for line in parentInfo:
if 'Done' in line:
break
elif 'Ext Addr' in line:
eui = line.split()[2]
else:
pass
return int(eui, 16)
@API
def powerDown(self):
"""power down the Thread device"""
self.__sendCommand('reset', expectEcho=False)
if not self.IsBorderRouter:
self._disconnect()
self._connect()
self.isPowerDown = True
@API
def powerUp(self):
"""power up the Thread device"""
self.isPowerDown = False
if not self.__isOpenThreadRunning():
if self.deviceRole == Thread_Device_Role.SED:
self.__setPollPeriod(self.__sedPollPeriod)
self.__startOpenThread()
def reset_and_wait_for_connection(self, timeout=3):
print("Waiting after reset timeout: {} s".format(timeout))
start_time = time.time()
self.__sendCommand('reset', expectEcho=False)
self.isPowerDown = True
while time.time() < start_time + timeout:
time.sleep(0.3)
if not self.IsBorderRouter:
self._disconnect()
self._connect()
try:
self.__executeCommand('state', timeout=0.1)
break
except Exception:
continue
else:
raise AssertionError("Could not connect with OT device {} after reset.".format(self))
if self.deviceRole == Thread_Device_Role.SED:
self.__setPollPeriod(self.__sedPollPeriod)
@API
def reboot(self):
"""reset and rejoin to Thread Network without any timeout
Returns:
True: successful to reset and rejoin the Thread Network
False: fail to reset and rejoin the Thread Network
"""
self.reset_and_wait_for_connection()
self.__startOpenThread()
return self.wait_for_attach_to_the_network(expected_role="", timeout=self.NETWORK_ATTACHMENT_TIMEOUT)
@API
def resetAndRejoin(self, timeout):
"""reset and join back Thread Network with a given timeout delay
Args:
timeout: a timeout interval before rejoin Thread Network
Returns:
True: successful to reset and rejoin Thread Network
False: fail to reset and rejoin the Thread Network
"""
self.powerDown()
time.sleep(timeout)
self.powerUp()
return self.wait_for_attach_to_the_network(expected_role="", timeout=self.NETWORK_ATTACHMENT_TIMEOUT)
@API
def ping(self, strDestination, ilength=0, hop_limit=64, timeout=5):
""" send ICMPv6 echo request with a given length/hoplimit to a unicast
destination address
Args:
srcDestination: the unicast destination address of ICMPv6 echo request
ilength: the size of ICMPv6 echo request payload
hop_limit: hop limit
"""
cmd = 'ping %s %s 1 1 %d %d' % (strDestination, str(ilength), hop_limit, timeout)
self.__executeCommand(cmd)
@API
def multicast_Ping(self, destination, length=20):
"""send ICMPv6 echo request with a given length to a multicast destination
address
Args:
destination: the multicast destination address of ICMPv6 echo request
length: the size of ICMPv6 echo request payload
"""
cmd = 'ping %s %s' % (destination, str(length))
self.__sendCommand(cmd)
# wait echo reply
self.sleep(1)
@API
def setPANID(self, xPAN):
"""set Thread Network PAN ID
Args:
xPAN: a given PAN ID in hex format
Returns:
True: successful to set the Thread Network PAN ID
False: fail to set the Thread Network PAN ID
"""
panid = ''
if not isinstance(xPAN, str):
panid = str(hex(xPAN))
cmd = 'panid %s' % panid
datasetCmd = 'dataset panid %s' % panid
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done' and self.__executeCommand(datasetCmd)[-1] == 'Done'
@API
def reset(self):
"""factory reset"""
self._deviceBeforeReset()
self.__sendCommand('factoryreset', expectEcho=False)
timeout = 10
start_time = time.time()
while time.time() < start_time + timeout:
time.sleep(0.5)
if not self.IsBorderRouter:
self._disconnect()
self._connect()
try:
self.__executeCommand('state', timeout=0.1)
break
except Exception:
self.__restartAgentService()
time.sleep(2)
self.__sendCommand('factoryreset', expectEcho=False)
time.sleep(0.5)
continue
else:
raise AssertionError("Could not connect with OT device {} after reset.".format(self))
self.log('factoryreset finished within 10s timeout.')
self._deviceAfterReset()
@API
def removeRouter(self, xRouterId):
"""kickoff router with a given router id from the Thread Network
Args:
xRouterId: a given router id in hex format
Returns:
True: successful to remove the router from the Thread Network
False: fail to remove the router from the Thread Network
"""
routerId = self.__convertRlocToRouterId(xRouterId)
if routerId is None:
return False
cmd = 'releaserouterid %s' % routerId
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setDefaultValues(self):
"""set default mandatory Thread Network parameter value"""
# initialize variables
self.networkName = ModuleHelper.Default_NwkName
self.networkKey = ModuleHelper.Default_NwkKey
self.channel = ModuleHelper.Default_Channel
self.channelMask = '0x7fff800' # (0xffff << 11)
self.panId = ModuleHelper.Default_PanId
self.xpanId = ModuleHelper.Default_XpanId
self.meshLocalPrefix = ModuleHelper.Default_MLPrefix
stretchedPSKc = Thread_PBKDF2.get(ModuleHelper.Default_PSKc, ModuleHelper.Default_XpanId,
ModuleHelper.Default_NwkName)
self.pskc = hex(stretchedPSKc).rstrip('L').lstrip('0x')
self.securityPolicySecs = ModuleHelper.Default_SecurityPolicy
self.securityPolicyFlags = 'onrc'
self.activetimestamp = ModuleHelper.Default_ActiveTimestamp
# self.sedPollingRate = ModuleHelper.Default_Harness_SED_Polling_Rate
self.__sedPollPeriod = 3 * 1000 # in milliseconds
self.ssedTimeout = 30 # in seconds
self.cslPeriod = 500 # in milliseconds
self.deviceRole = None
self.provisioningUrl = ''
self.hasActiveDatasetToCommit = False
self.logThread = Queue()
self.logThreadStatus = self.logStatus['stop']
self.joinCommissionedStatus = self.joinStatus['notstart']
# indicate Thread device requests full or stable network data
self.networkDataRequirement = ''
# indicate if Thread device experiences a power down event
self.isPowerDown = False
# indicate AddressFilter mode ['disable', 'allowlist', 'denylist']
self._addressfilterMode = 'disable'
self._addressfilterSet = set() # cache filter entries
# indicate if Thread device is an active commissioner
self.isActiveCommissioner = False
# indicate that the channel has been set, in case the channel was set
# to default when joining network
self.hasSetChannel = False
# indicate whether the default domain prefix is used.
self.__useDefaultDomainPrefix = True
self.__isUdpOpened = False
self.IsBackboneRouter = False
self.IsHost = False
# remove stale multicast addresses
if self.IsBorderRouter:
self.stopListeningToAddrAll()
# BBR dataset
self.bbrSeqNum = random.randint(0, 126) # 5.21.4.2
self.bbrMlrTimeout = 3600
self.bbrReRegDelay = 5
# initialize device configuration
self.setMAC(self.mac)
self.__setChannelMask(self.channelMask)
self.__setSecurityPolicy(self.securityPolicySecs, self.securityPolicyFlags)
self.setChannel(self.channel)
self.setPANID(self.panId)
self.setXpanId(self.xpanId)
self.setNetworkName(self.networkName)
self.setNetworkKey(self.networkKey)
self.setMLPrefix(self.meshLocalPrefix)
self.setPSKc(self.pskc)
self.setActiveTimestamp(self.activetimestamp)
@API
def getDeviceConncetionStatus(self):
"""check if serial port connection is ready or not"""
return self.deviceConnected
@API
def setPollingRate(self, iPollingRate):
"""set data polling rate for sleepy end device
Args:
iPollingRate: data poll period of sleepy end device (in seconds)
Returns:
True: successful to set the data polling rate for sleepy end device
False: fail to set the data polling rate for sleepy end device
"""
iPollingRate = int(iPollingRate * 1000)
if self.__sedPollPeriod != iPollingRate:
if not iPollingRate:
iPollingRate = 0xFFFF # T5.2.1, disable polling
elif iPollingRate < 1:
iPollingRate = 1 # T9.2.13
self.__sedPollPeriod = iPollingRate
# apply immediately
if self.__isOpenThreadRunning():
return self.__setPollPeriod(self.__sedPollPeriod)
return True
def __setPollPeriod(self, iPollPeriod):
"""set data poll period for sleepy end device
Args:
iPollPeriod: data poll period of sleepy end device (in milliseconds)
Returns:
True: successful to set the data poll period for sleepy end device
False: fail to set the data poll period for sleepy end device
"""
cmd = 'pollperiod %d' % iPollPeriod
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setLinkQuality(self, EUIadr, LinkQuality):
"""set custom LinkQualityIn for all receiving messages from the specified EUIadr
Args:
EUIadr: a given extended address
LinkQuality: a given custom link quality
link quality/link margin mapping table
3: 21 - 255 (dB)
2: 11 - 20 (dB)
1: 3 - 9 (dB)
0: 0 - 2 (dB)
Returns:
True: successful to set the link quality
False: fail to set the link quality
"""
# process EUIadr
euiHex = hex(EUIadr)
euiStr = str(euiHex)
euiStr = euiStr.rstrip('L')
address64 = ''
if '0x' in euiStr:
address64 = self.__lstrip0x(euiStr)
# prepend 0 at the beginning
if len(address64) < 16:
address64 = address64.zfill(16)
print(address64)
cmd = 'macfilter rss add-lqi %s %s' % (address64, str(LinkQuality))
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setOutBoundLinkQuality(self, LinkQuality):
"""set custom LinkQualityIn for all receiving messages from the any address
Args:
LinkQuality: a given custom link quality
link quality/link margin mapping table
3: 21 - 255 (dB)
2: 11 - 20 (dB)
1: 3 - 9 (dB)
0: 0 - 2 (dB)
Returns:
True: successful to set the link quality
False: fail to set the link quality
"""
cmd = 'macfilter rss add-lqi * %s' % str(LinkQuality)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def removeRouterPrefix(self, prefixEntry):
"""remove the configured prefix on a border router
Args:
prefixEntry: a on-mesh prefix entry in IPv6 dotted-quad format
Returns:
True: successful to remove the prefix entry from border router
False: fail to remove the prefix entry from border router
"""
assert (ipaddress.IPv6Network(prefixEntry.decode()))
cmd = 'prefix remove %s/64' % prefixEntry
if self.__executeCommand(cmd)[-1] == 'Done':
# send server data ntf to leader
return self.__executeCommand('netdata register')[-1] == 'Done'
else:
return False
@API
def configBorderRouter(
self,
P_Prefix="fd00:7d03:7d03:7d03::",
P_stable=1,
P_default=1,
P_slaac_preferred=0,
P_Dhcp=0,
P_preference=0,
P_on_mesh=1,
P_nd_dns=0,
P_dp=0,
):
"""configure the border router with a given prefix entry parameters
Args:
P_Prefix: IPv6 prefix that is available on the Thread Network in IPv6 dotted-quad format
P_stable: true if the default router is expected to be stable network data
P_default: true if border router offers the default route for P_Prefix
P_slaac_preferred: true if allowing auto-configure address using P_Prefix
P_Dhcp: is true if border router is a DHCPv6 Agent
P_preference: is two-bit signed integer indicating router preference
P_on_mesh: is true if P_Prefix is considered to be on-mesh
P_nd_dns: is true if border router is able to supply DNS information obtained via ND
Returns:
True: successful to configure the border router with a given prefix entry
False: fail to configure the border router with a given prefix entry
"""
assert (ipaddress.IPv6Network(P_Prefix.decode()))
# turn off default domain prefix if configBorderRouter is called before joining network
if P_dp == 0 and not self.__isOpenThreadRunning():
self.__useDefaultDomainPrefix = False
parameter = ''
prf = ''
if P_dp:
P_slaac_preferred = 1
if P_slaac_preferred == 1:
parameter += 'p'
parameter += 'a'
if P_stable == 1:
parameter += 's'
if P_default == 1:
parameter += 'r'
if P_Dhcp == 1:
parameter += 'd'
if P_on_mesh == 1:
parameter += 'o'
if P_dp == 1:
assert P_slaac_preferred and P_default and P_on_mesh and P_stable
parameter += 'D'
if P_preference == 1:
prf = 'high'
elif P_preference == 0:
prf = 'med'
elif P_preference == -1:
prf = 'low'
else:
pass
cmd = 'prefix add %s/64 %s %s' % (P_Prefix, parameter, prf)
if self.__executeCommand(cmd)[-1] == 'Done':
# if prefix configured before starting OpenThread stack
# do not send out server data ntf pro-actively
if not self.__isOpenThreadRunning():
return True
else:
# send server data ntf to leader
return self.__executeCommand('netdata register')[-1] == 'Done'
else:
return False
@API
def setNetworkIDTimeout(self, iNwkIDTimeOut):
"""set networkid timeout for Thread device
Args:
iNwkIDTimeOut: a given NETWORK_ID_TIMEOUT
Returns:
True: successful to set NETWORK_ID_TIMEOUT
False: fail to set NETWORK_ID_TIMEOUT
"""
iNwkIDTimeOut /= 1000
cmd = 'networkidtimeout %s' % str(iNwkIDTimeOut)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setKeepAliveTimeOut(self, iTimeOut):
"""set child timeout for device
Args:
iTimeOut: child timeout for device
Returns:
True: successful to set the child timeout for device
False: fail to set the child timeout for device
"""
cmd = 'childtimeout %d' % iTimeOut
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setKeySequenceCounter(self, iKeySequenceValue):
""" set the Key sequence counter corresponding to Thread network key
Args:
iKeySequenceValue: key sequence value
Returns:
True: successful to set the key sequence
False: fail to set the key sequence
"""
# avoid key switch guard timer protection for reference device
self.__setKeySwitchGuardTime(0)
cmd = 'keysequence counter %s' % str(iKeySequenceValue)
if self.__executeCommand(cmd)[-1] == 'Done':
self.sleep(1)
return True
else:
return False
@API
def getKeySequenceCounter(self):
"""get current Thread Network key sequence"""
keySequence = self.__executeCommand('keysequence counter')[0]
return keySequence
@API
def incrementKeySequenceCounter(self, iIncrementValue=1):
"""increment the key sequence with a given value
Args:
iIncrementValue: specific increment value to be added
Returns:
True: successful to increment the key sequence with a given value
False: fail to increment the key sequence with a given value
"""
# avoid key switch guard timer protection for reference device
self.__setKeySwitchGuardTime(0)
currentKeySeq = self.getKeySequenceCounter()
keySequence = int(currentKeySeq, 10) + iIncrementValue
return self.setKeySequenceCounter(keySequence)
@API
def setNetworkDataRequirement(self, eDataRequirement):
"""set whether the Thread device requires the full network data
or only requires the stable network data
Args:
eDataRequirement: is true if requiring the full network data
Returns:
True: successful to set the network requirement
"""
if eDataRequirement == Device_Data_Requirement.ALL_DATA:
self.networkDataRequirement = 'n'
return True
@API
def configExternalRouter(self, P_Prefix, P_stable, R_Preference=0):
"""configure border router with a given external route prefix entry
Args:
P_Prefix: IPv6 prefix for the route in IPv6 dotted-quad format
P_Stable: is true if the external route prefix is stable network data
R_Preference: a two-bit signed integer indicating Router preference
1: high
0: medium
-1: low
Returns:
True: successful to configure the border router with a given external route prefix
False: fail to configure the border router with a given external route prefix
"""
assert (ipaddress.IPv6Network(P_Prefix.decode()))
prf = ''
stable = ''
if R_Preference == 1:
prf = 'high'
elif R_Preference == 0:
prf = 'med'
elif R_Preference == -1:
prf = 'low'
else:
pass
if P_stable:
stable += 's'
cmd = 'route add %s/64 %s %s' % (P_Prefix, stable, prf)
else:
cmd = 'route add %s/64 %s' % (P_Prefix, prf)
if self.__executeCommand(cmd)[-1] == 'Done':
# send server data ntf to leader
return self.__executeCommand('netdata register')[-1] == 'Done'
@API
def getNeighbouringRouters(self):
"""get neighboring routers information
Returns:
neighboring routers' extended address
"""
routerInfo = []
routerList = self.__executeCommand('router list')[0].split()
if 'Done' in routerList:
return None
for index in routerList:
router = []
cmd = 'router %s' % index
router = self.__executeCommand(cmd)
for line in router:
if 'Done' in line:
break
# elif 'Rloc' in line:
# rloc16 = line.split()[1]
elif 'Ext Addr' in line:
eui = line.split()[2]
routerInfo.append(int(eui, 16))
# elif 'LQI In' in line:
# lqi_in = line.split()[1]
# elif 'LQI Out' in line:
# lqi_out = line.split()[1]
else:
pass
return routerInfo
@API
def getChildrenInfo(self):
"""get all children information
Returns:
children's extended address
"""
eui = None
rloc16 = None
childrenInfoAll = []
childrenInfo = {'EUI': 0, 'Rloc16': 0, 'MLEID': ''}
childrenList = self.__executeCommand('child list')[0].split()
if 'Done' in childrenList:
return None
for index in childrenList:
cmd = 'child %s' % index
child = []
child = self.__executeCommand(cmd)
for line in child:
if 'Done' in line:
break
elif 'Rloc' in line:
rloc16 = line.split()[1]
elif 'Ext Addr' in line:
eui = line.split()[2]
# elif 'Child ID' in line:
# child_id = line.split()[2]
# elif 'Mode' in line:
# mode = line.split()[1]
else:
pass
childrenInfo['EUI'] = int(eui, 16)
childrenInfo['Rloc16'] = int(rloc16, 16)
# children_info['MLEID'] = self.getMLEID()
childrenInfoAll.append(childrenInfo['EUI'])
# childrenInfoAll.append(childrenInfo)
return childrenInfoAll
@API
def setXpanId(self, xPanId):
"""set extended PAN ID of Thread Network
Args:
xPanId: extended PAN ID in hex format
Returns:
True: successful to set the extended PAN ID
False: fail to set the extended PAN ID
"""
xpanid = ''
if not isinstance(xPanId, str):
xpanid = self.__convertLongToHex(xPanId, 16)
cmd = 'extpanid %s' % xpanid
datasetCmd = 'dataset extpanid %s' % xpanid
else:
xpanid = xPanId
cmd = 'extpanid %s' % xpanid
datasetCmd = 'dataset extpanid %s' % xpanid
self.xpanId = xpanid
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done' and self.__executeCommand(datasetCmd)[-1] == 'Done'
@API
def getNeighbouringDevices(self):
"""gets the neighboring devices' extended address to compute the DUT
extended address automatically
Returns:
A list including extended address of neighboring routers, parent
as well as children
"""
neighbourList = []
# get parent info
parentAddr = self.getParentAddress()
if parentAddr != 0:
neighbourList.append(parentAddr)
# get ED/SED children info
childNeighbours = self.getChildrenInfo()
if childNeighbours is not None and len(childNeighbours) > 0:
for entry in childNeighbours:
neighbourList.append(entry)
# get neighboring routers info
routerNeighbours = self.getNeighbouringRouters()
if routerNeighbours is not None and len(routerNeighbours) > 0:
for entry in routerNeighbours:
neighbourList.append(entry)
return neighbourList
@API
def setPartationId(self, partationId):
"""set Thread Network Partition ID
Args:
partitionId: partition id to be set by leader
Returns:
True: successful to set the Partition ID
False: fail to set the Partition ID
"""
cmd = 'partitionid preferred %s' % (str(hex(partationId)).rstrip('L'))
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def getGUA(self, filterByPrefix=None, eth=False):
"""get expected global unicast IPv6 address of Thread device
note: existing filterByPrefix are string of in lowercase. e.g.
'2001' or '2001:0db8:0001:0000".
Args:
filterByPrefix: a given expected global IPv6 prefix to be matched
Returns:
a global IPv6 address
"""
assert not eth
# get global addrs set if multiple
globalAddrs = self.__getGlobal()
if filterByPrefix is None:
return globalAddrs[0]
else:
for fullIp in globalAddrs:
if fullIp.startswith(filterByPrefix):
return fullIp
return str(globalAddrs[0])
@API
def getShortAddress(self):
"""get Rloc16 short address of Thread device"""
return self.getRloc16()
@API
def getULA64(self):
"""get mesh local EID of Thread device"""
return self.__executeCommand('ipaddr mleid')[0]
@API
def setMLPrefix(self, sMeshLocalPrefix):
"""set mesh local prefix"""
cmd = 'dataset meshlocalprefix %s' % sMeshLocalPrefix
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def getML16(self):
"""get mesh local 16 unicast address (Rloc)"""
return self.getRloc()
@API
def downgradeToDevice(self):
pass
@API
def upgradeToRouter(self):
pass
@API
def forceSetSlaac(self, slaacAddress):
"""force to set a slaac IPv6 address to Thread interface
Args:
slaacAddress: a slaac IPv6 address to be set
Returns:
True: successful to set slaac address to Thread interface
False: fail to set slaac address to Thread interface
"""
cmd = 'ipaddr add %s' % str(slaacAddress)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setSleepyNodePollTime(self):
pass
@API
def enableAutoDUTObjectFlag(self):
"""set AutoDUTenable flag"""
self.AutoDUTEnable = True
@API
def getChildTimeoutValue(self):
"""get child timeout"""
childTimeout = self.__executeCommand('childtimeout')[0]
return int(childTimeout)
@API
def diagnosticGet(self, strDestinationAddr, listTLV_ids=()):
if not listTLV_ids:
return
if len(listTLV_ids) == 0:
return
cmd = 'networkdiagnostic get %s %s' % (
strDestinationAddr,
' '.join([str(tlv) for tlv in listTLV_ids]),
)
return self.__sendCommand(cmd, expectEcho=False)
@API
def diagnosticReset(self, strDestinationAddr, listTLV_ids=()):
if not listTLV_ids:
return
if len(listTLV_ids) == 0:
return
cmd = 'networkdiagnostic reset %s %s' % (
strDestinationAddr,
' '.join([str(tlv) for tlv in listTLV_ids]),
)
return self.__executeCommand(cmd)
@API
def diagnosticQuery(self, strDestinationAddr, listTLV_ids=()):
self.diagnosticGet(strDestinationAddr, listTLV_ids)
@API
def startNativeCommissioner(self, strPSKc='GRLPASSPHRASE'):
# TODO: Support the whole Native Commissioner functionality
# Currently it only aims to trigger a Discovery Request message to pass
# Certification test 5.8.4
self.__executeCommand('ifconfig up')
cmd = 'joiner start %s' % (strPSKc)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def startExternalCommissioner(self, baAddr, baPort):
"""Start external commissioner
Args:
baAddr: A string represents the border agent address.
baPort: An integer represents the border agent port.
Returns:
A boolean indicates whether this function succeed.
"""
if self.externalCommissioner is None:
config = commissioner.Configuration()
config.isCcmMode = False
config.domainName = OpenThreadTHCI.DOMAIN_NAME
config.pskc = bytearray.fromhex(self.pskc)
self.externalCommissioner = OTCommissioner(config, self)
if not self.externalCommissioner.isActive():
self.externalCommissioner.start(baAddr, baPort)
if not self.externalCommissioner.isActive():
raise commissioner.Error("external commissioner is not active")
return True
@API
def stopExternalCommissioner(self):
"""Stop external commissioner
Returns:
A boolean indicates whether this function succeed.
"""
if self.externalCommissioner is not None:
self.externalCommissioner.stop()
return not self.externalCommissioner.isActive()
@API
def startCollapsedCommissioner(self, role=Thread_Device_Role.Leader):
"""start Collapsed Commissioner
Returns:
True: successful to start Commissioner
False: fail to start Commissioner
"""
if self.__startOpenThread():
self.wait_for_attach_to_the_network(expected_role=self.deviceRole,
timeout=self.NETWORK_ATTACHMENT_TIMEOUT,
raise_assert=True)
cmd = 'commissioner start'
if self.__executeCommand(cmd)[-1] == 'Done':
self.isActiveCommissioner = True
self.sleep(20) # time for petition process
return True
return False
@API
def setJoinKey(self, strPSKc):
pass
@API
def scanJoiner(self, xEUI='*', strPSKd='THREADJPAKETEST'):
"""scan Joiner
Args:
xEUI: Joiner's EUI-64
strPSKd: Joiner's PSKd for commissioning
Returns:
True: successful to add Joiner's steering data
False: fail to add Joiner's steering data
"""
self.log("scanJoiner on channel %s", self.getChannel())
# long timeout value to avoid automatic joiner removal (in seconds)
timeout = 500
if not isinstance(xEUI, str):
eui64 = self.__convertLongToHex(xEUI, 16)
else:
eui64 = xEUI
strPSKd = self.__normalizePSKd(strPSKd)
cmd = 'commissioner joiner add %s %s %s' % (
self._deviceEscapeEscapable(eui64),
strPSKd,
str(timeout),
)
if self.__executeCommand(cmd)[-1] == 'Done':
if self.logThreadStatus == self.logStatus['stop']:
self.logThread = ThreadRunner.run(target=self.__readCommissioningLogs, args=(120,))
return True
else:
return False
@staticmethod
def __normalizePSKd(strPSKd):
return strPSKd.upper().replace('I', '1').replace('O', '0').replace('Q', '0').replace('Z', '2')
@API
def setProvisioningUrl(self, strURL='grl.com'):
"""set provisioning Url
Args:
strURL: Provisioning Url string
Returns:
True: successful to set provisioning Url
False: fail to set provisioning Url
"""
self.provisioningUrl = strURL
if self.deviceRole == Thread_Device_Role.Commissioner:
cmd = 'commissioner provisioningurl %s' % (strURL)
return self.__executeCommand(cmd)[-1] == 'Done'
return True
@API
def allowCommission(self):
"""start commissioner candidate petition process
Returns:
True: successful to start commissioner candidate petition process
False: fail to start commissioner candidate petition process
"""
cmd = 'commissioner start'
if self.__executeCommand(cmd)[-1] == 'Done':
self.isActiveCommissioner = True
# time for petition process and at least one keep alive
self.sleep(3)
return True
else:
return False
@API
def joinCommissioned(self, strPSKd='THREADJPAKETEST', waitTime=20):
"""start joiner
Args:
strPSKd: Joiner's PSKd
Returns:
True: successful to start joiner
False: fail to start joiner
"""
self.log("joinCommissioned on channel %s", self.getChannel())
if self.deviceRole in [
Thread_Device_Role.Leader,
Thread_Device_Role.Router,
Thread_Device_Role.REED,
]:
self.__setRouterSelectionJitter(1)
self.__executeCommand('ifconfig up')
strPSKd = self.__normalizePSKd(strPSKd)
cmd = 'joiner start %s %s' % (strPSKd, self.provisioningUrl)
if self.__executeCommand(cmd)[-1] == 'Done':
maxDuration = 150 # seconds
self.joinCommissionedStatus = self.joinStatus['ongoing']
if self.logThreadStatus == self.logStatus['stop']:
self.logThread = ThreadRunner.run(target=self.__readCommissioningLogs, args=(maxDuration,))
t_end = time.time() + maxDuration
while time.time() < t_end:
if self.joinCommissionedStatus == self.joinStatus['succeed']:
break
elif self.joinCommissionedStatus == self.joinStatus['failed']:
return False
self.sleep(1)
self.setMAC(self.mac)
self.__executeCommand('thread start')
self.wait_for_attach_to_the_network(expected_role=self.deviceRole,
timeout=self.NETWORK_ATTACHMENT_TIMEOUT,
raise_assert=True)
return True
else:
return False
@API
def getCommissioningLogs(self):
"""get Commissioning logs
Returns:
Commissioning logs
"""
rawLogs = self.logThread.get()
ProcessedLogs = []
payload = []
while not rawLogs.empty():
rawLogEach = rawLogs.get()
if '[THCI]' not in rawLogEach:
continue
EncryptedPacket = PlatformDiagnosticPacket()
infoList = rawLogEach.split('[THCI]')[1].split(']')[0].split('|')
for eachInfo in infoList:
info = eachInfo.split('=')
infoType = info[0].strip()
infoValue = info[1].strip()
if 'direction' in infoType:
EncryptedPacket.Direction = (PlatformDiagnosticPacket_Direction.IN
if 'recv' in infoValue else PlatformDiagnosticPacket_Direction.OUT if
'send' in infoValue else PlatformDiagnosticPacket_Direction.UNKNOWN)
elif 'type' in infoType:
EncryptedPacket.Type = (PlatformDiagnosticPacket_Type.JOIN_FIN_req if 'JOIN_FIN.req' in infoValue
else PlatformDiagnosticPacket_Type.JOIN_FIN_rsp if 'JOIN_FIN.rsp'
in infoValue else PlatformDiagnosticPacket_Type.JOIN_ENT_req if
'JOIN_ENT.ntf' in infoValue else PlatformDiagnosticPacket_Type.JOIN_ENT_rsp
if 'JOIN_ENT.rsp' in infoValue else PlatformDiagnosticPacket_Type.UNKNOWN)
elif 'len' in infoType:
bytesInEachLine = 16
EncryptedPacket.TLVsLength = int(infoValue)
payloadLineCount = (int(infoValue) + bytesInEachLine - 1) / bytesInEachLine
while payloadLineCount > 0:
payloadLineCount = payloadLineCount - 1
payloadLine = rawLogs.get()
payloadSplit = payloadLine.split('|')
for block in range(1, 3):
payloadBlock = payloadSplit[block]
payloadValues = payloadBlock.split(' ')
for num in range(1, 9):
if '..' not in payloadValues[num]:
payload.append(int(payloadValues[num], 16))
EncryptedPacket.TLVs = (PlatformPackets.read(EncryptedPacket.Type, payload)
if payload != [] else [])
ProcessedLogs.append(EncryptedPacket)
return ProcessedLogs
@API
def MGMT_ED_SCAN(
self,
sAddr,
xCommissionerSessionId,
listChannelMask,
xCount,
xPeriod,
xScanDuration,
):
"""send MGMT_ED_SCAN message to a given destinaition.
Args:
sAddr: IPv6 destination address for this message
xCommissionerSessionId: commissioner session id
listChannelMask: a channel array to indicate which channels to be scaned
xCount: number of IEEE 802.15.4 ED Scans (milliseconds)
xPeriod: Period between successive IEEE802.15.4 ED Scans (milliseconds)
xScanDuration: ScanDuration when performing an IEEE 802.15.4 ED Scan (milliseconds)
Returns:
True: successful to send MGMT_ED_SCAN message.
False: fail to send MGMT_ED_SCAN message
"""
channelMask = '0x' + self.__convertLongToHex(self.__convertChannelMask(listChannelMask))
cmd = 'commissioner energy %s %s %s %s %s' % (
channelMask,
xCount,
xPeriod,
xScanDuration,
sAddr,
)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_PANID_QUERY(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId):
"""send MGMT_PANID_QUERY message to a given destination
Args:
xPanId: a given PAN ID to check the conflicts
Returns:
True: successful to send MGMT_PANID_QUERY message.
False: fail to send MGMT_PANID_QUERY message.
"""
panid = ''
channelMask = '0x' + self.__convertLongToHex(self.__convertChannelMask(listChannelMask))
if not isinstance(xPanId, str):
panid = str(hex(xPanId))
cmd = 'commissioner panid %s %s %s' % (panid, channelMask, sAddr)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_ANNOUNCE_BEGIN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod):
"""send MGMT_ANNOUNCE_BEGIN message to a given destination
Returns:
True: successful to send MGMT_ANNOUNCE_BEGIN message.
False: fail to send MGMT_ANNOUNCE_BEGIN message.
"""
channelMask = '0x' + self.__convertLongToHex(self.__convertChannelMask(listChannelMask))
cmd = 'commissioner announce %s %s %s %s' % (
channelMask,
xCount,
xPeriod,
sAddr,
)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_ACTIVE_GET(self, Addr='', TLVs=()):
"""send MGMT_ACTIVE_GET command
Returns:
True: successful to send MGMT_ACTIVE_GET
False: fail to send MGMT_ACTIVE_GET
"""
cmd = 'dataset mgmtgetcommand active'
if Addr != '':
cmd += ' address '
cmd += Addr
if len(TLVs) != 0:
tlvs = ''.join('%02x' % tlv for tlv in TLVs)
cmd += ' -x '
cmd += tlvs
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_ACTIVE_SET(
self,
sAddr='',
xCommissioningSessionId=None,
listActiveTimestamp=None,
listChannelMask=None,
xExtendedPanId=None,
sNetworkName=None,
sPSKc=None,
listSecurityPolicy=None,
xChannel=None,
sMeshLocalPrefix=None,
xMasterKey=None,
xPanId=None,
xTmfPort=None,
xSteeringData=None,
xBorderRouterLocator=None,
BogusTLV=None,
xDelayTimer=None,
):
"""send MGMT_ACTIVE_SET command
Returns:
True: successful to send MGMT_ACTIVE_SET
False: fail to send MGMT_ACTIVE_SET
"""
cmd = 'dataset mgmtsetcommand active'
if listActiveTimestamp is not None:
cmd += ' activetimestamp '
cmd += str(listActiveTimestamp[0])
if xExtendedPanId is not None:
cmd += ' extpanid '
xpanid = self.__convertLongToHex(xExtendedPanId, 16)
cmd += xpanid
if sNetworkName is not None:
cmd += ' networkname '
cmd += self._deviceEscapeEscapable(str(sNetworkName))
if xChannel is not None:
cmd += ' channel '
cmd += str(xChannel)
if sMeshLocalPrefix is not None:
cmd += ' localprefix '
cmd += str(sMeshLocalPrefix)
if xMasterKey is not None:
cmd += ' networkkey '
key = self.__convertLongToHex(xMasterKey, 32)
cmd += key
if xPanId is not None:
cmd += ' panid '
cmd += str(xPanId)
if listChannelMask is not None:
cmd += ' channelmask '
cmd += '0x' + self.__convertLongToHex(self.__convertChannelMask(listChannelMask))
if (sPSKc is not None or listSecurityPolicy is not None or xCommissioningSessionId is not None or
xTmfPort is not None or xSteeringData is not None or xBorderRouterLocator is not None or
BogusTLV is not None):
cmd += ' -x '
if sPSKc is not None:
cmd += '0410'
stretchedPskc = Thread_PBKDF2.get(
sPSKc,
ModuleHelper.Default_XpanId,
ModuleHelper.Default_NwkName,
)
pskc = '%x' % stretchedPskc
if len(pskc) < 32:
pskc = pskc.zfill(32)
cmd += pskc
if listSecurityPolicy is not None:
if self.DeviceCapability == DevCapb.V1_1:
cmd += '0c03'
else:
cmd += '0c04'
rotationTime = 0
policyBits = 0
# previous passing way listSecurityPolicy=[True, True, 3600,
# False, False, True]
if len(listSecurityPolicy) == 6:
rotationTime = listSecurityPolicy[2]
# the last three reserved bits must be 1
policyBits = 0b00000111
if listSecurityPolicy[0]:
policyBits = policyBits | 0b10000000
if listSecurityPolicy[1]:
policyBits = policyBits | 0b01000000
if listSecurityPolicy[3]:
policyBits = policyBits | 0b00100000
if listSecurityPolicy[4]:
policyBits = policyBits | 0b00010000
if listSecurityPolicy[5]:
policyBits = policyBits | 0b00001000
else:
# new passing way listSecurityPolicy=[3600, 0b11001111]
rotationTime = listSecurityPolicy[0]
# bit order
if len(listSecurityPolicy) > 2:
policyBits = listSecurityPolicy[2] << 8 | listSecurityPolicy[1]
else:
policyBits = listSecurityPolicy[1]
policy = str(hex(rotationTime))[2:]
if len(policy) < 4:
policy = policy.zfill(4)
cmd += policy
flags0 = ('%x' % (policyBits & 0x00ff)).ljust(2, '0')
cmd += flags0
if self.DeviceCapability != DevCapb.V1_1:
flags1 = ('%x' % ((policyBits & 0xff00) >> 8)).ljust(2, '0')
cmd += flags1
if xCommissioningSessionId is not None:
cmd += '0b02'
sessionid = str(hex(xCommissioningSessionId))[2:]
if len(sessionid) < 4:
sessionid = sessionid.zfill(4)
cmd += sessionid
if xBorderRouterLocator is not None:
cmd += '0902'
locator = str(hex(xBorderRouterLocator))[2:]
if len(locator) < 4:
locator = locator.zfill(4)
cmd += locator
if xSteeringData is not None:
steeringData = self.__convertLongToHex(xSteeringData)
cmd += '08' + str(len(steeringData) / 2).zfill(2)
cmd += steeringData
if BogusTLV is not None:
cmd += '8202aa55'
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_PENDING_GET(self, Addr='', TLVs=()):
"""send MGMT_PENDING_GET command
Returns:
True: successful to send MGMT_PENDING_GET
False: fail to send MGMT_PENDING_GET
"""
cmd = 'dataset mgmtgetcommand pending'
if Addr != '':
cmd += ' address '
cmd += Addr
if len(TLVs) != 0:
tlvs = ''.join('%02x' % tlv for tlv in TLVs)
cmd += ' -x '
cmd += tlvs
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_PENDING_SET(
self,
sAddr='',
xCommissionerSessionId=None,
listPendingTimestamp=None,
listActiveTimestamp=None,
xDelayTimer=None,
xChannel=None,
xPanId=None,
xMasterKey=None,
sMeshLocalPrefix=None,
sNetworkName=None,
):
"""send MGMT_PENDING_SET command
Returns:
True: successful to send MGMT_PENDING_SET
False: fail to send MGMT_PENDING_SET
"""
cmd = 'dataset mgmtsetcommand pending'
if listPendingTimestamp is not None:
cmd += ' pendingtimestamp '
cmd += str(listPendingTimestamp[0])
if listActiveTimestamp is not None:
cmd += ' activetimestamp '
cmd += str(listActiveTimestamp[0])
if xDelayTimer is not None:
cmd += ' delaytimer '
cmd += str(xDelayTimer)
# cmd += ' delaytimer 3000000'
if xChannel is not None:
cmd += ' channel '
cmd += str(xChannel)
if xPanId is not None:
cmd += ' panid '
cmd += str(xPanId)
if xMasterKey is not None:
cmd += ' networkkey '
key = self.__convertLongToHex(xMasterKey, 32)
cmd += key
if sMeshLocalPrefix is not None:
cmd += ' localprefix '
cmd += str(sMeshLocalPrefix)
if sNetworkName is not None:
cmd += ' networkname '
cmd += self._deviceEscapeEscapable(str(sNetworkName))
if xCommissionerSessionId is not None:
cmd += ' -x '
cmd += '0b02'
sessionid = str(hex(xCommissionerSessionId))[2:]
if len(sessionid) < 4:
sessionid = sessionid.zfill(4)
cmd += sessionid
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_COMM_GET(self, Addr='ff02::1', TLVs=()):
"""send MGMT_COMM_GET command
Returns:
True: successful to send MGMT_COMM_GET
False: fail to send MGMT_COMM_GET
"""
cmd = 'commissioner mgmtget'
if len(TLVs) != 0:
tlvs = ''.join('%02x' % tlv for tlv in TLVs)
cmd += ' -x '
cmd += tlvs
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def MGMT_COMM_SET(
self,
Addr='ff02::1',
xCommissionerSessionID=None,
xSteeringData=None,
xBorderRouterLocator=None,
xChannelTlv=None,
ExceedMaxPayload=False,
):
"""send MGMT_COMM_SET command
Returns:
True: successful to send MGMT_COMM_SET
False: fail to send MGMT_COMM_SET
"""
cmd = 'commissioner mgmtset'
if xCommissionerSessionID is not None:
# use assigned session id
cmd += ' sessionid '
cmd += str(xCommissionerSessionID)
elif xCommissionerSessionID is None:
# use original session id
if self.isActiveCommissioner is True:
cmd += ' sessionid '
cmd += self.__getCommissionerSessionId()
else:
pass
if xSteeringData is not None:
cmd += ' steeringdata '
cmd += str(hex(xSteeringData)[2:])
if xBorderRouterLocator is not None:
cmd += ' locator '
cmd += str(hex(xBorderRouterLocator))
if xChannelTlv is not None:
cmd += ' -x '
cmd += '000300' + '%04x' % xChannelTlv
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setActiveDataset(self, listActiveDataset=()):
# Unused by the scripts
pass
@API
def setCommisionerMode(self):
# Unused by the scripts
pass
@API
def setPSKc(self, strPSKc):
cmd = 'dataset pskc %s' % strPSKc
self.hasActiveDatasetToCommit = True
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setActiveTimestamp(self, xActiveTimestamp):
self.activetimestamp = xActiveTimestamp
self.hasActiveDatasetToCommit = True
cmd = 'dataset activetimestamp %s' % str(xActiveTimestamp)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setUdpJoinerPort(self, portNumber):
"""set Joiner UDP Port
Args:
portNumber: Joiner UDP Port number
Returns:
True: successful to set Joiner UDP Port
False: fail to set Joiner UDP Port
"""
cmd = 'joinerport %d' % portNumber
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def commissionerUnregister(self):
"""stop commissioner
Returns:
True: successful to stop commissioner
False: fail to stop commissioner
"""
cmd = 'commissioner stop'
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def sendBeacons(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId):
self.__sendCommand('scan', expectEcho=False)
@API
def updateRouterStatus(self):
"""force update to router as if there is child id request"""
self._update_router_status = True
@API
def __updateRouterStatus(self):
cmd = 'state'
while True:
state = self.__executeCommand(cmd)[0]
if state == 'detached':
continue
elif state == 'child':
break
else:
return False
cmd = 'state router'
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setRouterThresholdValues(self, upgradeThreshold, downgradeThreshold):
self.__setRouterUpgradeThreshold(upgradeThreshold)
self.__setRouterDowngradeThreshold(downgradeThreshold)
@API
def setMinDelayTimer(self, iSeconds):
cmd = 'delaytimermin %s' % iSeconds
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def ValidateDeviceFirmware(self):
assert not self.IsBorderRouter, "Method not expected to be used with border router devices"
if self.DeviceCapability == OT11_CAPBS:
return OT11_VERSION in self.UIStatusMsg
elif self.DeviceCapability == OT12_CAPBS:
return OT12_VERSION in self.UIStatusMsg
elif self.DeviceCapability == OT13_CAPBS:
return OT13_VERSION in self.UIStatusMsg
else:
return False
@API
def setBbrDataset(self, SeqNumInc=False, SeqNum=None, MlrTimeout=None, ReRegDelay=None):
""" set BBR Dataset
Args:
SeqNumInc: Increase `SeqNum` by 1 if True.
SeqNum: Set `SeqNum` to a given value if not None.
MlrTimeout: Set `MlrTimeout` to a given value.
ReRegDelay: Set `ReRegDelay` to a given value.
MUST NOT set SeqNumInc to True and SeqNum to non-None value at the same time.
Returns:
True: successful to set BBR Dataset
False: fail to set BBR Dataset
"""
assert not (SeqNumInc and SeqNum is not None), "Must not specify both SeqNumInc and SeqNum"
if (MlrTimeout and MlrTimeout != self.bbrMlrTimeout) or (ReRegDelay and ReRegDelay != self.bbrReRegDelay):
if SeqNum is None:
SeqNumInc = True
if SeqNumInc:
if self.bbrSeqNum in (126, 127):
self.bbrSeqNum = 0
elif self.bbrSeqNum in (254, 255):
self.bbrSeqNum = 128
else:
self.bbrSeqNum = (self.bbrSeqNum + 1) % 256
else:
self.bbrSeqNum = SeqNum
return self.__configBbrDataset(SeqNum=self.bbrSeqNum, MlrTimeout=MlrTimeout, ReRegDelay=ReRegDelay)
def __configBbrDataset(self, SeqNum=None, MlrTimeout=None, ReRegDelay=None):
if MlrTimeout is not None and ReRegDelay is None:
ReRegDelay = self.bbrReRegDelay
cmd = 'bbr config'
if SeqNum is not None:
cmd += ' seqno %d' % SeqNum
if ReRegDelay is not None:
cmd += ' delay %d' % ReRegDelay
if MlrTimeout is not None:
cmd += ' timeout %d' % MlrTimeout
ret = self.__executeCommand(cmd)[-1] == 'Done'
if SeqNum is not None:
self.bbrSeqNum = SeqNum
if MlrTimeout is not None:
self.bbrMlrTimeout = MlrTimeout
if ReRegDelay is not None:
self.bbrReRegDelay = ReRegDelay
self.__executeCommand('netdata register')
return ret
# Low power THCI
@API
def setCSLtout(self, tout=30):
self.ssedTimeout = tout
cmd = 'csl timeout %u' % self.ssedTimeout
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setCSLchannel(self, ch=11):
cmd = 'csl channel %u' % ch
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setCSLperiod(self, period=500):
"""set Csl Period
Args:
period: csl period in ms
note: OT command 'csl period' accepts parameter in unit of 10 symbols,
period is converted from unit ms to ten symbols (160us per 10 symbols).
"""
cmd = 'csl period %u' % (period * 6.25)
return self.__executeCommand(cmd)[-1] == 'Done'
@staticmethod
def getForwardSeriesFlagsFromHexOrStr(flags):
hexFlags = int(flags, 16) if isinstance(flags, str) else flags
strFlags = ''
if hexFlags == 0:
strFlags = 'X'
else:
if hexFlags & 0x1 != 0:
strFlags += 'l'
if hexFlags & 0x2 != 0:
strFlags += 'd'
if hexFlags & 0x4 != 0:
strFlags += 'r'
if hexFlags & 0x8 != 0:
strFlags += 'a'
return strFlags
@staticmethod
def mapMetricsHexToChar(metrics):
metricsFlagMap = {
0x40: 'p',
0x09: 'q',
0x0a: 'm',
0x0b: 'r',
}
metricsReservedFlagMap = {0x11: 'q', 0x12: 'm', 0x13: 'r'}
if metricsFlagMap.get(metrics):
return metricsFlagMap.get(metrics), False
elif metricsReservedFlagMap.get(metrics):
return metricsReservedFlagMap.get(metrics), True
else:
logging.warning("Not found flag mapping for given metrics: {}".format(metrics))
return '', False
@staticmethod
def getMetricsFlagsFromHexStr(metrics):
strMetrics = ''
reserved_flag = ''
if metrics.startswith('0x'):
metrics = metrics[2:]
hexMetricsArray = bytearray.fromhex(metrics)
for metric in hexMetricsArray:
metric_flag, has_reserved_flag = OpenThreadTHCI.mapMetricsHexToChar(metric)
strMetrics += metric_flag
if has_reserved_flag:
reserved_flag = ' r'
return strMetrics + reserved_flag
@API
def LinkMetricsSingleReq(self, dst_addr, metrics):
cmd = 'linkmetrics query %s single %s' % (dst_addr, self.getMetricsFlagsFromHexStr(metrics))
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def LinkMetricsMgmtReq(self, dst_addr, type_, flags, metrics, series_id):
cmd = 'linkmetrics mgmt %s ' % dst_addr
if type_ == 'FWD':
cmd += 'forward %d %s' % (series_id, self.getForwardSeriesFlagsFromHexOrStr(flags))
if flags != 0:
cmd += ' %s' % (self.getMetricsFlagsFromHexStr(metrics))
elif type_ == 'ENH':
cmd += 'enhanced-ack'
if flags != 0:
cmd += ' register %s' % (self.getMetricsFlagsFromHexStr(metrics))
else:
cmd += ' clear'
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def LinkMetricsGetReport(self, dst_addr, series_id):
cmd = 'linkmetrics query %s forward %d' % (dst_addr, series_id)
return self.__executeCommand(cmd)[-1] == 'Done'
# TODO: Series Id is not in this API.
@API
def LinkMetricsSendProbe(self, dst_addr, ack=True, size=0):
cmd = 'linkmetrics probe %s %d' % (dst_addr, size)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setTxPower(self, level):
cmd = 'txpower '
if level == 'HIGH':
cmd += '127'
elif level == 'MEDIUM':
cmd += '0'
elif level == 'LOW':
cmd += '-128'
else:
print('wrong Tx Power level')
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def sendUdp(self, destination, port, payload='hello'):
assert payload is not None, 'payload should not be none'
cmd = 'udp send %s %d %s' % (destination, port, payload)
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def send_udp(self, interface, destination, port, payload='12ABcd'):
''' payload hexstring
'''
assert payload is not None, 'payload should not be none'
assert interface == 0, "non-BR must send UDP to Thread interface"
self.__udpOpen()
time.sleep(0.5)
cmd = 'udp send %s %s -x %s' % (destination, port, payload)
return self.__executeCommand(cmd)[-1] == 'Done'
def __udpOpen(self):
if not self.__isUdpOpened:
cmd = 'udp open'
self.__executeCommand(cmd)
# Bind to RLOC address and first dynamic port
rlocAddr = self.getRloc()
cmd = 'udp bind %s 49152' % rlocAddr
self.__executeCommand(cmd)
self.__isUdpOpened = True
@API
def sendMACcmd(self, enh=False):
cmd = 'mac send datarequest'
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def sendMACdata(self, enh=False):
cmd = 'mac send emptydata'
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setCSLsuspension(self, suspend):
if suspend:
self.__setPollPeriod(240 * 1000)
else:
self.__setPollPeriod(int(0.9 * self.ssedTimeout * 1000))
@API
def set_max_addrs_per_child(self, num):
cmd = 'childip max %d' % int(num)
self.__executeCommand(cmd)
@API
def config_next_dua_status_rsp(self, mliid, status_code):
if status_code >= 400:
# map status_code to correct COAP response code
a, b = divmod(status_code, 100)
status_code = ((a & 0x7) << 5) + (b & 0x1f)
cmd = 'bbr mgmt dua %d' % status_code
if mliid is not None:
mliid = mliid.replace(':', '')
cmd += ' %s' % mliid
self.__executeCommand(cmd)
@API
def getDUA(self):
dua = self.getGUA('fd00:7d03')
return dua
def __addDefaultDomainPrefix(self):
self.configBorderRouter(P_dp=1, P_stable=1, P_on_mesh=1, P_default=1)
def __setDUA(self, sDua):
"""specify the DUA before Thread Starts."""
if isinstance(sDua, str):
sDua = sDua.decode('utf8')
iid = ipaddress.IPv6Address(sDua).packed[-8:]
cmd = 'dua iid %s' % ''.join('%02x' % ord(b) for b in iid)
return self.__executeCommand(cmd)[-1] == 'Done'
def __getMlIid(self):
"""get the Mesh Local IID."""
# getULA64() would return the full string representation
mleid = ModuleHelper.GetFullIpv6Address(self.getULA64()).lower()
mliid = mleid[-19:].replace(':', '')
return mliid
def __setMlIid(self, sMlIid):
"""Set the Mesh Local IID before Thread Starts."""
assert ':' not in sMlIid
cmd = 'mliid %s' % sMlIid
self.__executeCommand(cmd)
@API
def registerDUA(self, sAddr=''):
self.__setDUA(sAddr)
@API
def config_next_mlr_status_rsp(self, status_code):
cmd = 'bbr mgmt mlr response %d' % status_code
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setMLRtimeout(self, iMsecs):
"""Setup BBR MLR Timeout to `iMsecs` seconds."""
self.setBbrDataset(MlrTimeout=iMsecs)
@API
def stopListeningToAddr(self, sAddr):
cmd = 'ipmaddr del ' + sAddr
try:
self.__executeCommand(cmd)
except CommandError as ex:
if ex.code == OT_ERROR_ALREADY:
pass
else:
raise
return True
@API
def registerMulticast(self, listAddr=('ff04::1234:777a:1',), timeout=MLR_TIMEOUT_MIN):
"""subscribe to the given ipv6 address (sAddr) in interface and send MLR.req OTA
Args:
sAddr : str : Multicast address to be subscribed and notified OTA.
"""
for each_sAddr in listAddr:
self._beforeRegisterMulticast(each_sAddr, timeout)
sAddr = ' '.join(listAddr)
cmd = 'ipmaddr add ' + str(sAddr)
try:
self.__executeCommand(cmd)
except CommandError as ex:
if ex.code == OT_ERROR_ALREADY:
pass
else:
raise
@API
def getMlrLogs(self):
return self.externalCommissioner.getMlrLogs()
@API
def migrateNetwork(self, channel=None, net_name=None):
"""migrate to another Thread Partition 'net_name' (could be None)
on specified 'channel'. Make sure same Mesh Local IID and DUA
after migration for DUA-TC-06/06b (DEV-1923)
"""
if channel is None:
raise Exception('channel None')
if channel not in range(11, 27):
raise Exception('channel %d not in [11, 26] Invalid' % channel)
print('new partition %s on channel %d' % (net_name, channel))
mliid = self.__getMlIid()
dua = self.getDUA()
self.reset()
deviceRole = self.deviceRole
self.setDefaultValues()
self.setChannel(channel)
if net_name is not None:
self.setNetworkName(net_name)
self.__setMlIid(mliid)
self.__setDUA(dua)
return self.joinNetwork(deviceRole)
@API
def setParentPrio(self, prio):
cmd = 'parentpriority %u' % prio
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def role_transition(self, role):
cmd = 'mode %s' % OpenThreadTHCI._ROLE_MODE_DICT[role]
return self.__executeCommand(cmd)[-1] == 'Done'
@API
def setLeaderWeight(self, iWeight=72):
self.__executeCommand('leaderweight %d' % iWeight)
def __detectZephyr(self):
"""Detect if the device is running Zephyr and adapt in that case"""
try:
self._lineSepX = re.compile(r'\r\n|\r|\n')
if self.__executeCommand(ZEPHYR_PREFIX + 'thread version')[0].isdigit():
self._cmdPrefix = ZEPHYR_PREFIX
except CommandError:
self._lineSepX = LINESEPX
def __discoverDeviceCapability(self):
"""Discover device capability according to version"""
if self.IsBorderRouter:
self.log("Setting capability of BR {}: DevCapb.C_BBR | DevCapb.C_Host | DevCapb.C_Comm".format(self))
self.DeviceCapability = OT12BR_CAPBS
else:
# Get Thread stack version to distinguish device capability.
thver = self.__executeCommand('thread version')[0]
if thver in ['1.2', '3']:
self.log("Setting capability of {}: DevCapb.L_AIO | DevCapb.C_FFD | DevCapb.C_RFD".format(self))
self.DeviceCapability = OT12_CAPBS
elif thver in ['1.1', '2']:
self.log("Setting capability of {}: DevCapb.V1_1".format(self))
self.DeviceCapability = OT11_CAPBS
else:
self.log("Capability not specified for {}".format(self))
self.DeviceCapability = DevCapb.NotSpecified
assert False, thver
@staticmethod
def __lstrip0x(s):
"""strip 0x at the beginning of a hex string if it exists
Args:
s: hex string
Returns:
hex string with leading 0x stripped
"""
if s.startswith('0x'):
s = s[2:]
return s
@API
def setCcmState(self, state=0):
assert state in (0, 1), state
self.__executeCommand("ccm {}".format("enable" if state == 1 else "disable"))
@API
def setVrCheckSkip(self):
self.__executeCommand("tvcheck disable")
class OpenThread(OpenThreadTHCI, IThci):
def _connect(self):
print('My port is %s' % self)
self.__lines = []
timeout = 10
port_error = None
if self.port.startswith('COM'):
for _ in range(int(timeout / 0.5)):
time.sleep(0.5)
try:
self.__handle = serial.Serial(self.port, 115200, timeout=0, write_timeout=1)
self.sleep(1)
self.__handle.write('\r\n')
self.sleep(0.1)
self._is_net = False
break
except SerialException as port_error:
self.log("{} port not ready, retrying to connect...".format(self.port))
else:
raise SerialException("Could not open {} port: {}".format(self.port, port_error))
elif ':' in self.port:
host, port = self.port.split(':')
self.__handle = socket.create_connection((host, port))
self.__handle.setblocking(False)
self._is_net = True
else:
raise Exception('Unknown port schema')
def _disconnect(self):
if self.__handle:
self.__handle.close()
self.__handle = None
def _deviceBeforeReset(self):
pass
def _deviceAfterReset(self):
pass
def __restartAgentService(self):
pass
def _beforeRegisterMulticast(self, sAddr, timeout):
pass
def __socRead(self, size=512):
if self._is_net:
return self.__handle.recv(size)
else:
return self.__handle.read(size)
def __socWrite(self, data):
if self._is_net:
self.__handle.sendall(data)
else:
self.__handle.write(data)
def _cliReadLine(self):
if len(self.__lines) > 1:
return self.__lines.pop(0)
tail = ''
if len(self.__lines) != 0:
tail = self.__lines.pop()
try:
tail += self.__socRead()
except socket.error:
logging.exception('%s: No new data', self)
self.sleep(0.1)
self.__lines += self._lineSepX.split(tail)
if len(self.__lines) > 1:
return self.__lines.pop(0)
def _cliWriteLine(self, line):
if self._cmdPrefix == ZEPHYR_PREFIX:
if not line.startswith(self._cmdPrefix):
line = self._cmdPrefix + line
self.__socWrite(line + '\r')
else:
self.__socWrite(line + '\r\n')
def _onCommissionStart(self):
pass
def _onCommissionStop(self):
pass