blob: 44ac5669943816863608d36f042a3b4aa7db9dd1 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2018, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
>> Thread Host Controller Interface
>> Device : OpenThread_WpanCtl THCI
>> Class : OpenThread_WpanCtl
"""
import re
import time
import socket
import logging
from Queue import Queue
import serial
from IThci import IThci
from GRLLibs.UtilityModules.Test import Thread_Device_Role, Device_Data_Requirement, MacType
from GRLLibs.UtilityModules.enums import PlatformDiagnosticPacket_Direction, PlatformDiagnosticPacket_Type
from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper
from GRLLibs.ThreadPacket.PlatformPackets import PlatformDiagnosticPacket, PlatformPackets
from GRLLibs.UtilityModules.Plugins.AES_CMAC import Thread_PBKDF2
"""regex: used to split lines"""
# Matches a CRLF or a bare LF line terminator; _readline() uses it to split
# raw device output into individual lines.
LINESEPX = re.compile(r'\r\n|\n')
class OpenThread_WpanCtl(IThci):
LOWEST_POSSIBLE_PARTATION_ID = 0x1
LINK_QUALITY_CHANGE_TIME = 100
# Used for reference firmware version control for Test Harness.
# This variable will be updated to match the OpenThread reference firmware
# officially released.
firmwarePrefix = 'OPENTHREAD/'
def __init__(self, **kwargs):
    """Record the harness-supplied connection settings and open the device link.

    Args:
        **kwargs: Arbitrary keyword arguments. Keys read here:
            'EUI':        stored as self.mac
            'SerialPort': port used when the connection type is not 'ip'
            'TelnetIP', 'TelnetPort': address used when the type is 'ip'
            'Param5':     connection type ('ip' stands for SSH), default 'usb'
            'Param6', 'Param7': SSH username / password
            'Param8':     comma separated "prompt,wpanctl prefix,interface"
            'Param9':     comma separated setting commands

    Any exception raised while initializing is written to the debug log
    and not propagated.
    """
    try:
        self.UIStatusMsg = ''
        self.mac = kwargs.get('EUI')
        self.handle = None
        self.AutoDUTEnable = False
        # whether device is through ser2net
        self._is_net = False
        self.logStatus = {'stop': 'stop', 'running': 'running', 'pauseReq': 'pauseReq', 'paused': 'paused'}
        self.logThreadStatus = self.logStatus['stop']

        # connection type 'ip' stands for SSH
        connect_param = kwargs.get('Param5')
        if connect_param is None:
            self.connectType = 'usb'
        else:
            self.connectType = connect_param.strip().lower()

        # comma separated CLI prompt, wpanctl cmd prefix, Wpan interface
        cli_param = kwargs.get('Param8')
        if cli_param:
            cli_settings = cli_param.strip().split(',')
        else:
            cli_settings = ['#', 'wpanctl', 'wpan0']
        (self.prompt, self.wpan_cmd_prefix, self.wpan_interface) = cli_settings
        self.wpan_cmd_prefix = self.wpan_cmd_prefix + ' '

        # comma separated setting commands
        precmd_param = kwargs.get('Param9')
        self.precmd = precmd_param.strip().split(',') if precmd_param else []

        if self.connectType != 'ip':
            self.port = kwargs.get('SerialPort')
        else:
            self.dutIpv4 = kwargs.get('TelnetIP')
            self.dutPort = kwargs.get('TelnetPort')
            self.port = self.dutIpv4 + ':' + self.dutPort
            # username for SSH
            user_param = kwargs.get('Param6')
            self.username = user_param.strip() if user_param else None
            # password for SSH
            password_param = kwargs.get('Param7')
            self.password = password_param.strip() if password_param else None

        self.intialize()
    except Exception as err:
        ModuleHelper.WriteIntoDebugLogger('initialize() Error: ' + str(err))
def __del__(self):
    """Close the device connection when the object is destroyed.

    Errors are written to the debug log and not propagated.
    """
    try:
        self.closeConnection()
        self.deviceConnected = False
    except Exception as err:
        message = 'delete() Error: ' + str(err)
        ModuleHelper.WriteIntoDebugLogger(message)
def _expect(self, expected, times=50):
    """Read lines until one equals `expected`.

    Gives up after `times` reads, or after 10 reads that returned no data
    (each followed by a 0.1 s pause), whichever comes first.

    Args:
        expected str: the line to wait for
        times int: maximum number of reads

    Raises:
        Exception: the expected line was not seen.
    """
    print('[%s] Expecting [%s]' % (self.port, expected))

    empty_reads_left = 10
    while times > 0 and empty_reads_left > 0:
        received = self._readline()
        print('[%s] Got line [%s]' % (self.port, received))

        if received == expected:
            print('[%s] Expected [%s]' % (self.port, expected))
            return

        if not received:
            # nothing arrived: charge the empty-read budget and back off
            empty_reads_left -= 1
            time.sleep(0.1)

        times -= 1

    raise Exception('failed to find expected string[%s]' % expected)
def _read(self, size=512):
    """Read up to `size` bytes from the device handle.

    Uses `recv` when `self._is_net` is set and `read` otherwise.
    """
    logging.info('%s: reading', self.port)
    reader = self.handle.recv if self._is_net else self.handle.read
    return reader(size)
def _write(self, data):
    """Write `data` to the device handle.

    Uses `sendall` when `self._is_net` is set and `write` otherwise.
    """
    logging.info('%s: writing', self.port)
    sender = self.handle.sendall if self._is_net else self.handle.write
    sender(data)
def _readline(self):
    """Return the next complete line from the device, reading more if needed.

    `self._lines` holds already-split lines; its last element is the
    still-unterminated tail of the most recent read.

    Returns:
        the next line, or None when no complete line is available
    """
    logging.info('%s: reading line', self.port)

    if len(self._lines) <= 1:
        # Only a partial line (or nothing) is buffered: extend it with new data.
        pending = self._lines.pop() if len(self._lines) else ''
        try:
            pending += self._read()
        except socket.error:
            logging.exception('%s: No new data', self.port)
            time.sleep(0.1)

        self._lines += LINESEPX.split(pending)
        if len(self._lines) <= 1:
            return None

    return self._lines.pop(0)
def _sendline(self, line):
    """Write one CRLF-terminated line to the device.

    The line buffer is emptied and one read is issued beforehand to discard
    pending input; the buffer is emptied again after the write.

    Args:
        line str: data send to device
    """
    logging.info('%s: sending line', self.port)

    # discard whatever is buffered locally or still waiting on the link
    self._lines = []
    try:
        self._read()
    except socket.error:
        logging.debug('%s: Nothing cleared', self.port)

    print('sending [%s]' % line)
    payload = line + '\r\n'
    self._write(payload)
    self._lines = []

    # give the write time to complete
    time.sleep(0.1)
def __sendCommand(self, cmd):
    """Send one command to the device and collect its response lines.

    When `self._is_net` is set the command is run through
    `self.handle.exec_command()`; otherwise it is written with `_sendline()`
    and its echo is awaited with `_expect()`. Sending is attempted up to
    three times.

    Args:
        cmd: OpenThread_WpanCtl command string

    Returns:
        list of response lines on success. For the network path the prompt
        is appended as the last element; for the serial path the last
        element is the line that matched the prompt.
        The plain string 'Fail' when a response line matches
        'Not Found' or 'failed with error'.

    Raises:
        Exception: sending failed on every attempt, or the serial retry
            budget ran out before a prompt line was read. The error is
            written to the debug log and re-raised.

    NOTE(review): a failure is signalled by the *string* 'Fail', not a list,
    so callers must compare the whole return value against 'Fail';
    indexing it first yields 'F'.
    """
    logging.info('%s: sendCommand[%s]', self.port, cmd)
    # Ask a running log thread to pause, then spin until it reports
    # 'paused' or 'stop'.
    if self.logThreadStatus == self.logStatus['running']:
        self.logThreadStatus = self.logStatus['pauseReq']
        while self.logThreadStatus not in (self.logStatus['paused'], self.logStatus['stop']):
            pass
    ssh_stdin = None
    ssh_stdout = None
    ssh_stderr = None
    try:
        # command retransmit times
        retry_times = 3
        while retry_times > 0:
            retry_times -= 1
            try:
                if self._is_net:
                    ssh_stdin, ssh_stdout, ssh_stderr = self.handle.exec_command(cmd)
                else:
                    self._sendline(cmd)
                    # wait for the device to echo the command back
                    self._expect(cmd)
            except Exception as e:
                logging.exception('%s: failed to send command[%s]: %s', self.port, cmd, str(e))
                # only the final attempt propagates the error
                if retry_times == 0:
                    raise
            else:
                break
        line = None
        response = []
        # budget for the serial read loop below
        retry_times = 20
        stdout_lines = []
        stderr_lines = []
        if self._is_net:
            stdout_lines = ssh_stdout.readlines()
            stderr_lines = ssh_stderr.readlines()
            # stderr takes precedence: stdout is only examined when stderr
            # is empty
            if stderr_lines:
                for stderr_line in stderr_lines:
                    if re.search(r'Not\s+Found|failed\s+with\s+error', stderr_line.strip(), re.M | re.I):
                        print('Command failed: %s' % stderr_line)
                        return 'Fail'
                    print('Got line: %s' % stderr_line)
                    logging.info('%s: the read line is[%s]', self.port, stderr_line)
                    response.append(str(stderr_line.strip()))
            elif stdout_lines:
                for stdout_line in stdout_lines:
                    logging.info('%s: the read line is[%s]', self.port, stdout_line)
                    if re.search(r'Not\s+Found|failed\s+with\s+error', stdout_line.strip(), re.M | re.I):
                        print('Command failed')
                        return 'Fail'
                    print('Got line: ' + stdout_line)
                    logging.info('%s: send command[%s] done!', self.port, cmd)
                    response.append(str(stdout_line.strip()))
            # terminate the list with the prompt, as the serial path does
            response.append(self.prompt)
            return response
        else:
            while retry_times > 0:
                line = self._readline()
                print('read line: %s' % line)
                logging.info('%s: the read line is[%s]', self.port, line)
                if line:
                    response.append(line)
                    # a line matching the prompt ends the response
                    if re.match(self.prompt, line):
                        break
                    elif re.search(r'Not\s+Found|failed\s+with\s+error', line, re.M | re.I):
                        print('Command failed')
                        return 'Fail'
                # every non-terminating read, empty or not, costs one retry
                retry_times -= 1
                time.sleep(0.1)
            if retry_times == 0:
                raise Exception('%s: failed to find end of response' % self.port)
            logging.info('%s: send command[%s] done!', self.port, cmd)
            return response
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('sendCommand() Error: ' + str(e))
        raise
def __stripValue(self, value):
    """Remove one pair of enclosing double quotes or square brackets.

    Args:
        value: value string

    Returns:
        `value` without its surrounding '"…"' or '[…]' when it is a string
        wrapped in such a pair; otherwise `value` unchanged (including
        non-string values and the empty string).
    """
    # Require at least two characters so that an empty string does not raise
    # IndexError and a lone '"' is not mistaken for a quoted value.
    if isinstance(value, str) and len(value) >= 2:
        if (value[0] == '"' and value[-1] == '"') or (value[0] == '[' and value[-1] == ']'):
            return value[1:-1]
    return value
def __padIp6Addr(self, ip6Addr):
    """Expand an IPv6 address string into zero-padded 4-digit groups.

    Each non-empty group is left-padded with '0' to four characters, and the
    first empty group (from a '::') is replaced by as many '0000' groups as
    are needed to reach eight groups.

    Args:
        ip6Addr: colon separated IPv6 address string

    Returns:
        the expanded, colon separated address string
    """
    groups = ip6Addr.split(':')
    gap = None
    for idx in range(len(groups)):
        group = groups[idx]
        if gap is None and group == '':
            # remember where the '::' compression starts
            gap = idx
        elif len(group) < 4:
            groups[idx] = group.rjust(4, '0')

    if gap is None:
        return ':'.join(groups)

    filler = ['0000'] * (9 - len(groups))
    return ':'.join(groups[:gap] + filler + groups[gap + 1:])
def __setDeviceMode(self, mode):
    """set thread device mode:

    Args:
        mode: thread device mode. 15=rsdn, 13=rsn, 4=s
              r: rx-on-when-idle
              s: secure IEEE 802.15.4 data request
              d: full thread device
              n: full network data

    Returns:
        True: successful to set the device mode
        False: fail to set the device mode
    """
    print('call __setDeviceMode')
    try:
        cmd = self.wpan_cmd_prefix + 'setprop Thread:DeviceMode %d' % mode
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setDeviceMode() Error: ' + str(e))
        return False
def __setRouterUpgradeThreshold(self, iThreshold):
    """set router upgrade threshold

    Args:
        iThreshold: the number of active routers on the Thread network
                    partition below which a REED may decide to become a Router.

    Returns:
        True: successful to set the ROUTER_UPGRADE_THRESHOLD
        False: fail to set ROUTER_UPGRADE_THRESHOLD
    """
    print('call __setRouterUpgradeThreshold')
    try:
        cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterUpgradeThreshold %s' % str(iThreshold)
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setRouterUpgradeThreshold() Error: ' + str(e))
        return False
def __setRouterDowngradeThreshold(self, iThreshold):
    """set router downgrade threshold

    Args:
        iThreshold: the number of active routers on the Thread network
                    partition above which an active router may decide to
                    become a child.

    Returns:
        True: successful to set the ROUTER_DOWNGRADE_THRESHOLD
        False: fail to set ROUTER_DOWNGRADE_THRESHOLD
    """
    print('call __setRouterDowngradeThreshold')
    try:
        cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterDowngradeThreshold %s' % str(iThreshold)
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setRouterDowngradeThreshold() Error: ' + str(e))
        return False
def __setRouterSelectionJitter(self, iRouterJitter):
    """Set the ROUTER_SELECTION_JITTER used by a REED before upgrading to Router.

    Args:
        iRouterJitter: a random period prior to request Router ID for REED

    Returns:
        True: the command did not report 'Fail'
        False: the command reported 'Fail'
        None: an exception occurred (written to the debug log)
    """
    print('call _setRouterSelectionJitter')
    try:
        setprop = 'setprop Thread:RouterSelectionJitter %s' % str(iRouterJitter)
        outcome = self.__sendCommand(self.wpan_cmd_prefix + setprop)
        return outcome != 'Fail'
    except Exception as err:
        ModuleHelper.WriteIntoDebugLogger('setRouterSelectionJitter() Error: ' + str(err))
def __setAddressfilterMode(self, mode):
    """set address filter mode

    Args:
        mode: a list name containing 'list' in any letter case (the callers
              in this class pass 'Whitelist', 'Blacklist', 'whitelist' and
              'blacklist'), which enables 'MAC:<mode>:Enabled', or the
              string 'disabled'.

    Returns:
        True: successful to set address filter mode.
        False: fail to set address filter mode.
    """
    print('call setAddressFilterMode() %s' % mode)
    try:
        # re.search, not re.match: 'list' sits at the END of 'Whitelist' /
        # 'Blacklist', and re.match only matches at the start of the string.
        if re.search('list', mode, re.M | re.I):
            cmd = self.wpan_cmd_prefix + 'setprop MAC:' + mode + ':Enabled 1'
        elif mode == 'disabled':
            # NOTE(review): callers in this class pass 'disable' (no trailing
            # 'd'), and 'MAC:disabled:Enabled' does not look like a real
            # property name -- confirm the intended command before relying
            # on this branch.
            cmd = self.wpan_cmd_prefix + 'setprop MAC:' + mode + ':Enabled 0'
        else:
            print('no such option')
            return False
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('__setAddressFilterMode() Error: ' + str(e))
        return False
def __startOpenThreadWpan(self):
    """start OpenThreadWpan

    Restores the address filter after a power-down, then issues a wpanctl
    'form' (Leader) or 'join' (every other role) command, and finally
    commits a pending active dataset if there is one.

    Returns:
        True: successful to start OpenThreadWpan up
        False: fail to start OpenThreadWpan (command failed, device not
               associated afterwards, dataset commit failed, or an
               exception occurred)
    """
    print('call startOpenThreadWpan')
    try:
        # restore whitelist/blacklist address filter mode if rejoin after
        # reset
        if self.isPowerDown:
            if self._addressfilterMode == 'whitelist':
                if self.__setAddressfilterMode('whitelist'):
                    # iterate over a copy: addAllowMAC() adds to the same set
                    for addr in list(self._addressfilterSet):
                        self.addAllowMAC(addr)
            elif self._addressfilterMode == 'blacklist':
                if self.__setAddressfilterMode('blacklist'):
                    for addr in list(self._addressfilterSet):
                        self.addBlockedMAC(addr)
        time.sleep(1)

        if ModuleHelper.LeaderDutChannelFound:
            self.channel = ModuleHelper.Default_Channel

        # Leader forms the network, everyone else joins; only the sleepy and
        # plain/minimal end devices use a non-router node type.
        startType = 'form' if self.deviceRole == Thread_Device_Role.Leader else 'join'
        if self.deviceRole == Thread_Device_Role.SED:
            nodeType = 'sed'
        elif self.deviceRole in (Thread_Device_Role.EndDevice, Thread_Device_Role.EndDevice_MED):
            nodeType = 'end'
        else:
            nodeType = 'router'

        if self.deviceRole in [Thread_Device_Role.Leader, Thread_Device_Role.Router, Thread_Device_Role.REED]:
            self.__setRouterSelectionJitter(1)

        if startType == 'form':
            startCmd = self.wpan_cmd_prefix + '%s %s -c %s -T %s ' % (
                startType,
                self.networkName,
                str(self.channel),
                nodeType,
            )
        else:
            startCmd = self.wpan_cmd_prefix + '%s %s -p %s -c %s -T %s ' % (
                startType,
                self.networkName,
                str(hex(self.panId)),
                str(self.channel),
                nodeType,
            )

        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        if self.__sendCommand(startCmd) == 'Fail':
            return False
        if not self.__isOpenThreadWpanRunning():
            return False

        self.isPowerDown = False
        if self.hasActiveDatasetToCommit:
            if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Dataset:Command SetActive') == 'Fail':
                raise Exception('failed to commit active dataset')
            self.hasActiveDatasetToCommit = False
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('startOpenThreadWpan() Error: ' + str(e))
        return False
def __stopOpenThreadWpan(self):
    """stop OpenThreadWpan

    Sends 'leave' followed by 'dataset erase'; the second command is only
    sent when the first one did not fail.

    Returns:
        True: successfully stop OpenThreadWpan
        False: failed to stop OpenThreadWpan
    """
    print('call stopOpenThreadWpan')
    try:
        for action in ('leave', 'dataset erase'):
            # __sendCommand() reports failure as the string 'Fail'; compare
            # the whole result, since 'Fail'[0] is 'F'.
            if self.__sendCommand(self.wpan_cmd_prefix + action) == 'Fail':
                return False
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('stopOpenThreadWpan() Error: ' + str(e))
        return False
def __isOpenThreadWpanRunning(self):
    """Report whether the NCP state reads 'associated'.

    Returns:
        True: OpenThreadWpan is running
        False: OpenThreadWpan is not running
    """
    print('call __isOpenThreadWpanRunning')
    state_reply = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')
    running = self.__stripValue(state_reply[0]) == 'associated'
    if not running:
        print('*****Wrong OpenThreadWpan state')
        return False
    print('*****OpenThreadWpan is running')
    return True
# rloc16 might be hex string or integer, need to return actual allocated
# router id
def __convertRlocToRouterId(self, xRloc16):
    """mapping Rloc16 to router id

    Reads the 'Thread:RouterTable' property and scans it line by line. Each
    line is split on ',' and the items containing 'RouterId' / 'RLOC16' are
    parsed as 'key:value'.

    Args:
        xRloc16: rloc16 short address, either a '0x…' hex string or an int

    Returns:
        actual router id allocated by leader (as a string), or None when no
        entry matches
    """
    routerList = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RouterTable')
    print(routerList)
    print(xRloc16)

    if routerList == 'Fail':
        return None

    for line in routerList:
        # skip the bracket lines that delimit the table
        if re.match(r'\[|\]', line):
            continue
        if re.match(self.prompt, line, re.M | re.I):
            break

        # Reset per line so that a value from a previous entry can never be
        # paired with the current one.
        routerid = None
        rloc16 = None
        for item in self.__stripValue(line).split(','):
            if 'RouterId' in item:
                routerid = item.split(':')[1].strip()
            elif 'RLOC16' in item:
                # parse the item, not the whole line: splitting the line on
                # ':' would also drag in the neighbouring fields
                rloc16 = item.split(':')[1].strip()

        if routerid is None or rloc16 is None:
            continue

        # process input rloc16
        if isinstance(xRloc16, str):
            # hex digits may differ in letter case between table and caller
            if ('0x' + rloc16).lower() == xRloc16.lower():
                return routerid
        elif isinstance(xRloc16, int):
            if int(rloc16, 16) == xRloc16:
                return routerid

    return None
def __convertIp6PrefixStringToIp6Address(self, strIp6Prefix):
    """Turn a hex prefix string into a colon separated IPv6 prefix ending in '::'.

    for example:
        2001000000000000 -> 2001:0000:0000:0000::

    Args:
        strIp6Prefix: IPv6 prefix as a hex string (a trailing 'L' and the
                      part removed by __lstrip0x() are dropped first)

    Returns:
        the first four 4-digit groups joined by ':' followed by '::'
    """
    without_suffix = strIp6Prefix.rstrip('L')
    hex_digits = str(self.__lstrip0x(without_suffix)).ljust(16, '0')

    # cut into complete 4-character groups; an incomplete tail is dropped
    groups = [hex_digits[start:start + 4] for start in range(0, len(hex_digits) - 3, 4)]
    joined = str(':'.join(groups))

    return joined[:19] + '::'
def __convertLongToHex(self, iValue, fillZeros=None):
    """Format an integer as lowercase hex digits without '0x' or 'L'.

    Args:
        iValue: integer to format
        fillZeros: pad string with zeros on the left to specified width

    Returns:
        the hex digit string, zero padded to `fillZeros` characters when
        that argument is given
    """
    width_spec = '' if fillZeros is None else '0%d' % fillZeros
    return ('%' + width_spec + 'x') % iValue
def __convertChannelMask(self, channelsArray):
    """Build the bitmask that has one bit set per listed channel.

    Args:
        channelsArray: channel array (i.e. [21, 22])

    Returns:
        integer with bit `c` set for every channel `c` in the array
    """
    # Distinct single-bit masks never overlap, so adding them equals OR-ing.
    return sum({1 << channel for channel in channelsArray})
def __ChannelMaskListToStr(self, channelList):
    """Join a channel list into a comma separated string.

    Args:
        channelList: channel list (i.e. [21, 22, 23])

    Returns:
        the channels joined by ',' without spaces (i.e. '21,22,23'), or ''
        when the argument is not a list or is an empty list
    """
    if not isinstance(channelList, list):
        print('not a valid channel list: %s' % channelList)
        return ''
    if not len(channelList):
        print('empty list')
        return ''
    return ','.join([str(chan) for chan in channelList])
def __setChannelMask(self, channelMask):
    """Set the channel mask on the NCP and in the pending active dataset.

    Marks the active dataset as needing a commit, then sets
    'NCP:ChannelMask' and, if that did not fail, 'Dataset:ChannelMaskPage0'.

    Args:
        channelMask: value substituted into both setprop commands

    Returns:
        True: both commands succeeded
        False: a command failed or an exception occurred
    """
    print('call _setChannelMask')
    try:
        cmd = self.wpan_cmd_prefix + 'setprop NCP:ChannelMask %s' % channelMask
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ChannelMaskPage0 %s' % channelMask
        self.hasActiveDatasetToCommit = True
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail' and self.__sendCommand(datasetCmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setChannelMask() Error: ' + str(e))
        return False
def __setSecurityPolicy(self, securityPolicySecs, securityPolicyFlags):
    """Write the security policy into the pending active dataset.

    Args:
        securityPolicySecs: value for 'Dataset:SecPolicy:KeyRotation'
        securityPolicyFlags: policy flag string; only 'onrcb' is accepted
                             and is written as flags 0xff

    Returns:
        True: both setprop commands succeeded
        False: unknown flag string, or a command reported 'Fail'
        None: an exception occurred (written to the debug log)
    """
    print('call _setSecurityPolicy')
    try:
        rotation_cmd = self.wpan_cmd_prefix + 'setprop Dataset:SecPolicy:KeyRotation %s' % str(securityPolicySecs)
        if securityPolicyFlags != 'onrcb':
            print('unknown policy flag :' + securityPolicyFlags)
            return False
        flags_cmd = self.wpan_cmd_prefix + 'setprop Dataset:SecPolicy:Flags 0xff'

        self.hasActiveDatasetToCommit = True
        # the flags command is only sent when the rotation command succeeded
        if self.__sendCommand(rotation_cmd) == 'Fail':
            return False
        return self.__sendCommand(flags_cmd) != 'Fail'
    except Exception as err:
        ModuleHelper.WriteIntoDebugLogger('setSecurityPolicy() Error: ' + str(err))
def __setKeySwitchGuardTime(self, iKeySwitchGuardTime):
    """ set the Key switch guard time

    Args:
        iKeySwitchGuardTime: key switch guard time

    Returns:
        True: successful to set key switch guard time
        False: fail to set key switch guard time
    """
    print('%s call setKeySwitchGuardTime' % self.port)
    print(iKeySwitchGuardTime)
    try:
        cmd = self.wpan_cmd_prefix + 'setprop Network:KeySwitchGuardTime %s' % str(iKeySwitchGuardTime)
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        if self.__sendCommand(cmd) == 'Fail':
            return False
        time.sleep(1)
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setKeySwitchGuardTime() Error; ' + str(e))
        return False
def __getCommissionerSessionId(self):
    """Return the first response line of the 'Commissioner:SessionId' property."""
    print('%s call getCommissionerSessionId' % self.port)
    query = self.wpan_cmd_prefix + 'getprop -v Commissioner:SessionId'
    reply = self.__sendCommand(query)
    return reply[0]
def __getJoinerState(self):
    """Poll 'NCP:State' until the joiner outcome is known.

    Polls every 5 seconds for up to 150 seconds.

    Returns:
        True: the state became 'offline:commissioned'
        False: the state became 'associating:credentials-needed', or the
               time limit was reached
    """
    deadline = time.time() + 150  # seconds
    query = self.wpan_cmd_prefix + 'getprop -v NCP:State'
    while time.time() < deadline:
        state = self.__stripValue(self.__sendCommand(query)[0])
        if state == 'offline:commissioned':
            return True
        if state == 'associating:credentials-needed':
            return False
        time.sleep(5)
    return False
def _connect(self):
    """Open the link to the device according to `self.connectType`.

    'usb': opens the serial port at 115200 baud and walks through a
    login / password / prompt dialogue. 'ip': opens an SSH session with
    paramiko. In both cases `self.deviceConnected` is set once the link is
    usable and every command in `self.precmd` is sent.

    Connection errors are written to the debug log and not propagated.

    Raises:
        Exception: `self.connectType` is neither 'usb' nor 'ip'.
    """
    if self.connectType == 'usb':
        print('My port is %s' % self.port)
        try:
            self.handle = serial.Serial(self.port, 115200, timeout=0.2)
        except Exception as e:
            ModuleHelper.WriteIntoDebugLogger('open serial error ' + str(e))
        try:
            attempts = 0
            user_prompted = False
            pwd_prompted = False
            # NOTE(review): once pwd_prompted is True this condition no
            # longer depends on `attempts`; the loop then ends only via the
            # prompt branch's break or an exception -- confirm intended.
            while attempts < 10 or pwd_prompted:
                time.sleep(0.5)
                attempts = attempts + 1
                print('attempts...%d' % attempts)
                input_data = self.handle.read(self.handle.inWaiting())
                if not input_data:
                    # nothing received yet: send a newline until a login
                    # prompt has been seen
                    if not user_prompted:
                        self.handle.write('\n')
                        time.sleep(0.5)
                    continue
                if 'login' in input_data:
                    # NOTE(review): __init__ only assigns self.username and
                    # self.password for connection type 'ip' -- verify they
                    # exist on this path.
                    self.handle.write(self.username + '\n')
                    time.sleep(0.3)
                    print('user prompted')
                    user_prompted = True
                elif 'Password' in input_data:
                    print('password prompted')
                    time.sleep(0.3)
                    self.handle.write(self.password + '\n')
                    pwd_prompted = True
                elif self.prompt in input_data:
                    # the CLI prompt is visible: logged in
                    print('login success (serial)')
                    time.sleep(0.3)
                    self.deviceConnected = True
                    for precmd in self.precmd:
                        self.handle.write(precmd + '\n')
                        time.sleep(0.3)
                    time.sleep(1)
                    break
            if not self.deviceConnected:
                raise Exception('login fail')
            else:
                self._is_net = False
        except Exception as e:
            ModuleHelper.WriteIntoDebugLogger('connect to serial Error: ' + str(e))
    elif self.connectType == 'ip':
        print('My IP: %s Port: %s' % (self.dutIpv4, self.dutPort))
        try:
            import paramiko
            if not self.password:
                # no password configured: authenticate with auth_none on a
                # raw transport and attach it to the client
                transport = paramiko.Transport((self.dutIpv4, int(self.dutPort)))
                transport.start_client()
                transport.auth_none(self.username)
                self.handle = paramiko.SSHClient()
                self.handle.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                self.handle._transport = transport
            else:
                self.handle = paramiko.SSHClient()
                self.handle.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                self.handle.connect(
                    self.dutIpv4, port=int(self.dutPort), username=self.username, password=self.password
                )
            print('login success (ssh)')
            self.deviceConnected = True
            for precmd in self.precmd:
                self.handle.exec_command(precmd + '\n')
                time.sleep(0.5)
            # commands now go through exec_command() instead of the line
            # based serial helpers
            self._is_net = True
        except Exception as e:
            ModuleHelper.WriteIntoDebugLogger('connect to ssh Error: ' + str(e))
    else:
        raise Exception('Unknown port schema')
def closeConnection(self):
    """Close the device handle, if one is open, and forget it.

    Errors are written to the debug log and not propagated.
    """
    print('%s call closeConnection' % self.port)
    try:
        if not self.handle:
            return
        self.handle.close()
        self.handle = None
    except Exception as err:
        ModuleHelper.WriteIntoDebugLogger('closeConnection() Error: ' + str(err))
def intialize(self):
    """Connect to the device, check its firmware and reset its network state.

    After connecting, the version string is stored in `self.UIStatusMsg`
    and must contain `self.firmwarePrefix`; then 'leave' and 'dataset erase'
    are sent. Any failure is written to the debug log and not propagated.
    """
    print('%s call intialize' % self.port)
    try:
        # init serial port
        self.deviceConnected = False
        self._connect()
        if not self.deviceConnected:
            raise Exception('Err: Device not connected ..')

        self.UIStatusMsg = self.getVersionNumber()
        if self.firmwarePrefix not in self.UIStatusMsg:
            self.deviceConnected = False
            self.UIStatusMsg = (
                'Firmware Not Matching Expecting ' + self.firmwarePrefix + ' Now is ' + self.UIStatusMsg
            )
            raise Exception('Err: OpenThread device Firmware not matching..')

        for action in ('leave', 'dataset erase'):
            self.__sendCommand(self.wpan_cmd_prefix + action)
    except Exception as err:
        ModuleHelper.WriteIntoDebugLogger('intialize() Error: ' + str(err))
def setNetworkName(self, networkName='GRL'):
    """set Thread Network name

    Marks the active dataset as needing a commit, then sets 'Network:Name'
    and, if that did not fail, 'Dataset:NetworkName'.

    Args:
        networkName: the networkname string to be set

    Returns:
        True: successful to set the Thread Networkname
        False: fail to set the Thread Networkname
    """
    print('%s call setNetworkName' % self.port)
    try:
        cmd = self.wpan_cmd_prefix + 'setprop -s Network:Name %s' % networkName
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName %s' % networkName
        self.hasActiveDatasetToCommit = True
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail' and self.__sendCommand(datasetCmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setNetworkName() Error: ' + str(e))
        return False
def setChannel(self, channel=15):
    """set channel of Thread device operates on.

    Marks the active dataset as needing a commit, then sets 'NCP:Channel'
    and, if that did not fail, 'Dataset:Channel'.

    Args:
        channel:
            (0 - 10: Reserved)
            (11 - 26: 2.4GHz channels)
            (27 - 65535: Reserved)

    Returns:
        True: successful to set the channel
        False: fail to set the channel
    """
    print('%s call setChannel' % self.port)
    try:
        cmd = self.wpan_cmd_prefix + 'setprop NCP:Channel %s' % channel
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel %s' % channel
        self.hasActiveDatasetToCommit = True
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail' and self.__sendCommand(datasetCmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setChannel() Error: ' + str(e))
        return False
def getChannel(self):
    """Return the first response line of the 'NCP:Channel' property."""
    print('%s call getChannel' % self.port)
    query = self.wpan_cmd_prefix + 'getprop -v NCP:Channel'
    reply = self.__sendCommand(query)
    return reply[0]
def setMAC(self, xEUI):
    """set the extended address of Thread device

    Args:
        xEUI: extended address, either a hex string or an integer; an
              empty/None value falls back to the address stored in self.mac

    Returns:
        True: successful to set the extended address
        False: fail to set the extended address
    """
    print('%s call setMAC' % self.port)
    try:
        # One if/elif chain: previously the self.mac fallback was assigned
        # and then immediately overwritten by the type check that followed.
        if not xEUI:
            address64 = self.mac
        elif isinstance(xEUI, str):
            address64 = xEUI
        else:
            address64 = self.__convertLongToHex(xEUI, 16)

        cmd = self.wpan_cmd_prefix + 'setprop NCP:MACAddress %s' % address64
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        if self.__sendCommand(cmd) == 'Fail':
            return False
        self.mac = address64
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setMAC() Error: ' + str(e))
        return False
def getMAC(self, bType=MacType.RandomMac):
    """Return one specific type of MAC address as an integer.

    While `self.isPowerDown` is set, the address kept in `self.mac` is used
    instead of querying the device.

    Args:
        bType: indicate which kind of MAC address is required
               (FactoryMac reads 'NCP:HardwareAddress', HashMac reads
               'NCP:MACAddress', anything else reads 'NCP:ExtendedAddress')

    Returns:
        the address parsed as a base-16 integer
    """
    print('%s call getMAC' % self.port)

    # if power down happens, return extended address assigned previously
    if self.isPowerDown:
        return int(self.mac, 16)

    if bType == MacType.FactoryMac:
        query = 'getprop -v NCP:HardwareAddress'
    elif bType == MacType.HashMac:
        query = 'getprop -v NCP:MACAddress'
    else:
        query = 'getprop -v NCP:ExtendedAddress'

    reply = self.__sendCommand(self.wpan_cmd_prefix + query)
    macAddr64 = self.__stripValue(reply[0])
    return int(macAddr64, 16)
def getLL64(self):
    """Return the 'IPv6:LinkLocalAddress' property with quotes/brackets stripped."""
    print('%s call getLL64' % self.port)
    query = self.wpan_cmd_prefix + 'getprop -v IPv6:LinkLocalAddress'
    first_line = self.__sendCommand(query)[0]
    return self.__stripValue(first_line)
def getMLEID(self):
    """Return the 'IPv6:MeshLocalAddress' property with quotes/brackets stripped."""
    print('%s call getMLEID' % self.port)
    query = self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalAddress'
    first_line = self.__sendCommand(query)[0]
    return self.__stripValue(first_line)
def getRloc16(self):
    """Return the 'Thread:RLOC16' property parsed as a base-16 integer."""
    print('%s call getRloc16' % self.port)
    query = self.wpan_cmd_prefix + 'getprop -v Thread:RLOC16'
    first_line = self.__sendCommand(query)[0]
    return int(first_line, 16)
def getRloc(self):
    """Build the router locator unicast IPv6 address.

    Combines the mesh-local prefix (the part of 'IPv6:MeshLocalPrefix'
    before '/') with '00ff:fe00:' and the RLOC16, then expands the result
    with __padIp6Addr().

    Returns:
        the expanded RLOC address string
    """
    print('%s call getRloc' % self.port)

    prefix_reply = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalPrefix')
    prefix = self.__stripValue(prefix_reply[0])
    mlprefix = prefix.split('/')[0]

    rloc16_reply = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RLOC16')
    rloc16 = self.__lstrip0x(rloc16_reply[0])

    print('prefix: %s' % prefix)
    print('mlprefix: %s ' % mlprefix)
    print('rloc16: %s' % rloc16)

    rloc = self.__padIp6Addr(mlprefix + '00ff:fe00:' + rloc16)
    print('rloc: %s' % rloc)
    return rloc
def getGlobal(self):
    """Collect the device's global unicast IPv6 addresses.

    Every entry of 'IPv6:AllAddresses' is expanded to its full form;
    addresses starting with 'fe80' or with the first 19 characters of the
    expanded mesh-local address are left out.

    Returns:
        list of full, lowercase IPv6 address strings
    """
    print('%s call getGlobal' % self.port)
    globalAddrs = []

    mleid_reply = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalAddress')
    mleid = ModuleHelper.GetFullIpv6Address(self.__stripValue(mleid_reply[0])).lower()

    addrs = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:AllAddresses')

    # The mesh-local address serves as the reference for the current mesh
    # local prefix, which some TCs update through pending dataset management.
    excluded_prefixes = ('fe80', mleid[0:19])

    for entry in addrs:
        if re.match(r'\[|\]', entry):
            continue
        if re.match(self.prompt, entry, re.M | re.I):
            break

        ip6Addr = self.__stripValue(entry.strip()).split(' ')[0]
        fullIp = ModuleHelper.GetFullIpv6Address(ip6Addr).lower()
        print('fullip %s' % fullIp)

        if fullIp.startswith(excluded_prefixes):
            continue

        print('global')
        globalAddrs.append(fullIp)

    return globalAddrs
def setNetworkKey(self, key):
    """set Thread Network master key

    Marks the active dataset as needing a commit, then sets 'Network:Key'
    and, if that did not fail, 'Dataset:MasterKey'.

    Args:
        key: Thread Network master key used in secure the MLE/802.15.4
             packet; a hex string is used as is, any other value is
             formatted as 32 zero-padded hex digits

    Returns:
        True: successful to set the Thread Network master key
        False: fail to set the Thread Network master key
    """
    print('%s call setNetworkKey' % self.port)
    try:
        if isinstance(key, str):
            masterKey = key
        else:
            masterKey = self.__convertLongToHex(key, 32)

        cmd = self.wpan_cmd_prefix + 'setprop Network:Key %s' % masterKey
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:MasterKey %s' % masterKey

        self.networkKey = masterKey
        self.hasActiveDatasetToCommit = True
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        return self.__sendCommand(cmd) != 'Fail' and self.__sendCommand(datasetCmd) != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setNetworkkey() Error: ' + str(e))
        return False
def addBlockedMAC(self, xEUI):
    """add a given extended address to the blacklist entry

    Switches the address filter to blacklist mode first when it is not
    already active. The address is recorded in self._addressfilterSet
    whether or not the insert command succeeds.

    Args:
        xEUI: extended address in hex format (string or integer)

    Returns:
        True: successful to add a given extended address to the blacklist entry
              (also returned, without sending anything, when the address is
              the device's own)
        False: fail to add a given extended address to the blacklist entry
    """
    print('%s call addBlockedMAC' % self.port)
    print(xEUI)
    if isinstance(xEUI, str):
        macAddr = xEUI
    else:
        macAddr = self.__convertLongToHex(xEUI)

    try:
        # if blocked device is itself
        if macAddr == self.mac:
            print('block device itself')
            return True

        if self._addressfilterMode != 'blacklist':
            if self.__setAddressfilterMode('Blacklist'):
                self._addressfilterMode = 'blacklist'

        cmd = self.wpan_cmd_prefix + 'insert MAC:Blacklist:Entries %s' % macAddr
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        ret = self.__sendCommand(cmd) != 'Fail'

        self._addressfilterSet.add(macAddr)
        print('current blacklist entries:')
        for addr in self._addressfilterSet:
            print(addr)
        return ret
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('addBlockedMAC() Error: ' + str(e))
        return False
def addAllowMAC(self, xEUI):
    """add a given extended address to the whitelist addressfilter

    Switches the address filter to whitelist mode first when it is not
    already active. The address is recorded in self._addressfilterSet
    whether or not the insert command succeeds.

    Args:
        xEUI: a given extended address in hex format (string or integer)

    Returns:
        True: successful to add a given extended address to the whitelist entry
        False: fail to add a given extended address to the whitelist entry
    """
    print('%s call addAllowMAC' % self.port)
    print(xEUI)
    if isinstance(xEUI, str):
        macAddr = xEUI
    else:
        macAddr = self.__convertLongToHex(xEUI)

    try:
        if self._addressfilterMode != 'whitelist':
            if self.__setAddressfilterMode('Whitelist'):
                self._addressfilterMode = 'whitelist'

        cmd = self.wpan_cmd_prefix + 'insert MAC:Whitelist:Entries %s' % macAddr
        # __sendCommand() reports failure as the string 'Fail'; compare the
        # whole result, since 'Fail'[0] is 'F' and would never equal 'Fail'.
        ret = self.__sendCommand(cmd) != 'Fail'

        self._addressfilterSet.add(macAddr)
        print('current whitelist entries:')
        for addr in self._addressfilterSet:
            print(addr)
        return ret
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('addAllowMAC() Error: ' + str(e))
        return False
def clearBlockList(self):
    """clear all entries in blacklist table

    The entries are only cleared when the address filter could be disabled
    first.

    Returns:
        True: successful to clear the blacklist
        False: fail to clear the blacklist
    """
    print('%s call clearBlockList' % self.port)

    # remove all entries in blacklist
    try:
        print('clearing blacklist entries:')
        for addr in self._addressfilterSet:
            print(addr)

        # disable blacklist
        if self.__setAddressfilterMode('disable'):
            self._addressfilterMode = 'disable'
            # clear ops
            # Use the same property name addBlockedMAC() inserts into
            # ('MAC:Blacklist:Entries'); the previous 'MAC:Blocklist:Entries'
            # addressed a different name.
            # NOTE(review): clearAllowList() uses the verb 'insert' for its
            # clear command while this one uses 'remove' -- confirm which
            # verb wpanctl expects for clearing a list.
            cmd = self.wpan_cmd_prefix + 'remove MAC:Blacklist:Entries'
            # __sendCommand() reports failure as the string 'Fail'; compare
            # the whole result, since 'Fail'[0] is 'F'.
            if self.__sendCommand(cmd) != 'Fail':
                self._addressfilterSet.clear()
                return True
        return False
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('clearBlockList() Error: ' + str(e))
        return False
def clearAllowList(self):
    """Empty the whitelist address filter.

    Returns:
        True: the filter was disabled and the clear command did not
              report 'Fail'; the local entry cache has been emptied
        False: the filter could not be disabled or the clear command
               reported 'Fail'
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call clearAllowList' % self.port)

    try:
        print('clearing whitelist entries:')
        for cached in self._addressfilterSet:
            print(cached)

        # the filter has to be switched off before its entries are dropped
        if not self.__setAddressfilterMode('disable'):
            return False
        self._addressfilterMode = 'disable'

        # NOTE(review): the clear operation is issued as an 'insert' without
        # an entry argument - confirm this is what wpanctl expects
        clearCmd = self.wpan_cmd_prefix + 'insert MAC:Whitelist:Entries'
        if self.__sendCommand(clearCmd)[0] == 'Fail':
            return False

        self._addressfilterSet.clear()
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('clearAllowList() Error: ' + str(e))
def getDeviceRole(self):
    """Query the Network:NodeType property and return its stripped value."""
    print('%s call getDeviceRole' % self.port)
    queryCmd = self.wpan_cmd_prefix + 'getprop -v Network:NodeType'
    rawRole = self.__sendCommand(queryCmd)[0]
    return self.__stripValue(rawRole)
def joinNetwork(self, eRoleId):
    """Prepare the device to join the Thread Network in the requested role.

    Records the role, applies the role specific setting (router
    downgrade/upgrade threshold or SED poll period), sets the matching
    device mode and finally starts OpenThreadWpan.

    Args:
        eRoleId: a given device role id (compared against
                 Thread_Device_Role members)

    Returns:
        True: ready to set Thread Network parameter for joining desired Network
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call joinNetwork' % self.port)
    print(eRoleId)

    self.deviceRole = eRoleId
    # device mode values, named after the flag letters they stand for
    modeRsdn = 15
    modeRsn = 13
    modeS = 4
    # a role that matches none of the branches below keeps the rsdn mode
    selectedMode = modeRsdn
    try:
        if ModuleHelper.LeaderDutChannelFound:
            self.channel = ModuleHelper.Default_Channel

        # FIXME: when Harness call setNetworkDataRequirement()?
        # only sleep end device requires stable networkdata now
        if eRoleId == Thread_Device_Role.Leader:
            print('join as leader')
            if self.AutoDUTEnable is False:
                # set ROUTER_DOWNGRADE_THRESHOLD
                self.__setRouterDowngradeThreshold(33)
        elif eRoleId == Thread_Device_Role.Router:
            print('join as router')
            if self.AutoDUTEnable is False:
                # set ROUTER_DOWNGRADE_THRESHOLD
                self.__setRouterDowngradeThreshold(33)
        elif eRoleId == Thread_Device_Role.SED:
            print('join as sleepy end device')
            selectedMode = modeS
            self.__setPollPeriod(self.__sedPollPeriod)
        elif eRoleId == Thread_Device_Role.EndDevice:
            print('join as end device')
            selectedMode = modeRsn
        elif eRoleId == Thread_Device_Role.REED:
            print('join as REED')
            # set ROUTER_UPGRADE_THRESHOLD
            self.__setRouterUpgradeThreshold(0)
        elif eRoleId == Thread_Device_Role.EndDevice_FED:
            # always remain an ED, never request to be a router
            print('join as FED')
            # set ROUTER_UPGRADE_THRESHOLD
            self.__setRouterUpgradeThreshold(0)
        elif eRoleId == Thread_Device_Role.EndDevice_MED:
            print('join as MED')
            selectedMode = modeRsn

        # set Thread device mode with a given role
        self.__setDeviceMode(selectedMode)
        time.sleep(0.1)

        # start OpenThreadWpan
        self.__startOpenThreadWpan()
        time.sleep(3)
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('joinNetwork() Error: ' + str(e))
def getNetworkFragmentID(self):
    """get current partition id of Thread Network Partition from LeaderData

    Returns:
        The Thread network Partition Id as returned by the
        'getprop -v Network:PartitionId' command, or None when
        OpenThreadWpan is not running
    """
    print('%s call getNetworkFragmentID' % self.port)
    # the helper is named __isOpenThreadWpanRunning (two leading
    # underscores); calling it with four underscores looks up a method
    # that does not exist
    if not self.__isOpenThreadWpanRunning():
        print('OpenThreadWpan is not running')
        return None

    return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:PartitionId')[0]
def getParentAddress(self):
    """get Thread device's parent extended address and rloc16 short address

    Sends 'getprop -v Thread:Parent', strips the first element of the
    reply and splits it on single spaces.

    Returns:
        The extended address of parent in hex format (the first
        space separated field of the stripped reply)
    """
    print('%s call getParentAddress' % self.port)
    # __sendCommand returns a sequence; its first element is the value that
    # has to be stripped, as every other property query in this class does
    rawParent = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:Parent')[0]
    parentInfo = self.__stripValue(rawParent).split(' ')

    return parentInfo[0]
def powerDown(self):
    """Power down the OpenThreadWpan.

    Disables Daemon:AutoAssociateAfterReset, waits half a second and then
    resets the device.

    Returns:
        True: both commands did not report 'Fail'; isPowerDown is set
        False: one of the two commands reported 'Fail'
    """
    print('%s call powerDown' % self.port)

    disableCmd = self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset false'
    if self.__sendCommand(disableCmd)[0] == 'Fail':
        return False

    time.sleep(0.5)
    if self.__sendCommand(self.wpan_cmd_prefix + 'reset')[0] == 'Fail':
        return False

    self.isPowerDown = True
    return True
def powerUp(self):
    """Power up the Thread device.

    Connects first when there is no handle yet, clears isPowerDown, then
    attaches, re-enables Daemon:AutoAssociateAfterReset and checks that
    NCP:State reads 'associated'.

    Returns:
        True: every command succeeded and the NCP state is 'associated'
        False: a command reported 'Fail' or the NCP state differs
    """
    print('%s call powerUp' % self.port)
    if not self.handle:
        self._connect()

    self.isPowerDown = False

    if self.__sendCommand(self.wpan_cmd_prefix + 'attach')[0] == 'Fail':
        return False
    time.sleep(3)

    enableCmd = self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset true'
    if self.__sendCommand(enableCmd)[0] == 'Fail':
        return False

    ncpState = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0])
    if ncpState == 'associated':
        return True

    print('powerUp failed')
    return False
def reboot(self):
    """reset and rejoin to Thread Network without any timeout

    Sends the reset command, marks the device as powered down and then
    checks the NCP:State property.

    Returns:
        True: successful to reset and rejoin the Thread Network
        False: fail to reset and rejoin the Thread Network (the NCP state
               is not 'associated', or an exception was raised and
               written to the debug logger)
    """
    print('%s call reboot' % self.port)
    try:
        self._sendline(self.wpan_cmd_prefix + 'reset')
        self.isPowerDown = True

        # strip the raw property value before comparing, as powerUp() and
        # resetAndRejoin() do for the very same NCP:State query
        rawState = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]
        if self.__stripValue(rawState) != 'associated':
            print('[FAIL] reboot')
            return False
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('reboot() Error: ' + str(e))
        # honour the documented True/False contract on errors as well
        return False
def ping(self, destination, length=20):
    """Send one ICMPv6 echo request of a given size to a unicast address.

    The ping command is executed through the handle when _is_net is set,
    otherwise it is written with _sendline and awaited with _expect.
    Afterwards the method waits one second for the echo reply. Exceptions
    are written to the debug logger.

    Args:
        destination: the unicast destination address of ICMPv6 echo request
        length: the size of ICMPv6 echo request payload
    """
    print('%s call ping' % self.port)
    print('destination: %s' % destination)
    try:
        pingCmd = 'ping %s -c 1 -s %s -I %s' % (destination, str(length), self.wpan_interface)
        if not self._is_net:
            self._sendline(pingCmd)
            self._expect(pingCmd)
        else:
            # the three returned streams are not consumed
            _stdin, _stdout, _stderr = self.handle.exec_command(pingCmd)
        # wait echo reply
        time.sleep(1)
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('ping() Error: ' + str(e))
def multicast_Ping(self, destination, length=20):
    """Send one ICMPv6 echo request of a given size to a multicast address.

    The ping command is executed through the handle when _is_net is set,
    otherwise it is written with _sendline and awaited with _expect.
    Afterwards the method waits one second for the echo reply. Exceptions
    are written to the debug logger.

    Args:
        destination: the multicast destination address of ICMPv6 echo request
        length: the size of ICMPv6 echo request payload
    """
    print('%s call multicast_Ping' % self.port)
    print('destination: %s' % destination)
    try:
        pingCmd = 'ping %s -c 1 -s %s -I %s' % (destination, str(length), self.wpan_interface)
        if not self._is_net:
            self._sendline(pingCmd)
            self._expect(pingCmd)
        else:
            # the three returned streams are not consumed
            _stdin, _stdout, _stderr = self.handle.exec_command(pingCmd)
        # wait echo reply
        time.sleep(1)
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('multicast_ping() Error: ' + str(e))
def getVersionNumber(self):
    """Query the NCP:Version property and return its stripped value."""
    print('%s call getVersionNumber' % self.port)
    versionCmd = self.wpan_cmd_prefix + 'getprop -v NCP:Version'
    return self.__stripValue(self.__sendCommand(versionCmd)[0])
def setPANID(self, xPAN):
    """set Thread Network PAN ID

    Writes the PAN ID to both Network:PANID and Dataset:PanId and flags
    the active dataset as having pending changes. The dataset command is
    only sent when the first command did not report 'Fail'.

    Args:
        xPAN: a given PAN ID in hex format; a str is sent as is, a number
              is converted with hex()

    Returns:
        True: successful to set the Thread Network PAN ID
        False: fail to set the Thread Network PAN ID
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call setPANID' % self.port)
    print(xPAN)
    panid = ''
    try:
        if isinstance(xPAN, str):
            # a PAN ID that already is a string must not be dropped
            panid = xPAN
        else:
            # rstrip('L') removes the suffix hex() adds to Python 2 longs
            panid = str(hex(xPAN)).rstrip('L')
            print(panid)

        cmd = self.wpan_cmd_prefix + 'setprop -s Network:PANID %s' % panid
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId %s' % panid
        self.hasActiveDatasetToCommit = True
        return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setPANID() Error: ' + str(e))
def reset(self):
    """Factory reset.

    Sends the 'leave' command (through __sendCommand when _is_net is set,
    otherwise as a raw line), erases the dataset, waits two seconds and,
    when _is_net is not set, reads the pending output. Exceptions are
    written to the debug logger.
    """
    print('%s call reset' % self.port)
    try:
        leaveCmd = self.wpan_cmd_prefix + 'leave'
        if not self._is_net:
            self._sendline(leaveCmd)
        else:
            self.__sendCommand(leaveCmd)

        self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')
        time.sleep(2)

        if not self._is_net:
            self._read()
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('reset() Error: ' + str(e))
def removeRouter(self, xRouterId):
    """Kick the router with a given router id from the Thread Network.

    Args:
        xRouterId: a given router id in hex format; it is translated with
                   __convertRlocToRouterId before the command is built

    Returns:
        True: the release command did not report 'Fail'
        False: no router id matched, or the release command reported 'Fail'
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call removeRouter' % self.port)
    print(xRouterId)

    resolvedId = self.__convertRlocToRouterId(xRouterId)
    print(resolvedId)

    if resolvedId is None:
        print('no matched xRouterId')
        return False

    try:
        # NOTE(review): unlike the other commands of this class this one is
        # sent without wpan_cmd_prefix - confirm that is intended
        releaseCmd = 'releaserouterid %s' % resolvedId
        return self.__sendCommand(releaseCmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('removeRouter() Error: ' + str(e))
def setDefaultValues(self):
    """set default mandatory Thread Network parameter value

    First resets the cached configuration and bookkeeping attributes of
    this object, mostly from ModuleHelper defaults, then pushes the
    network parameters to the device through the individual setters.
    An exception raised while configuring the device is written to the
    debug logger and not propagated.
    """
    print('%s call setDefaultValues' % self.port)

    # initialize variables
    self.networkName = ModuleHelper.Default_NwkName
    self.networkKey = ModuleHelper.Default_NwkKey
    self.channel = ModuleHelper.Default_Channel
    self.channelMask = '0x7fff800'  # (0xffff << 11)
    self.panId = ModuleHelper.Default_PanId
    self.xpanId = ModuleHelper.Default_XpanId
    self.meshLocalPrefix = ModuleHelper.Default_MLPrefix
    # OT only accept hex format PSKc for now
    self.pskc = '00000000000000000000000000000000'
    self.securityPolicySecs = ModuleHelper.Default_SecurityPolicy
    self.securityPolicyFlags = 'onrcb'
    self.activetimestamp = ModuleHelper.Default_ActiveTimestamp
    # self.sedPollingRate = ModuleHelper.Default_Harness_SED_Polling_Rate
    self.__sedPollPeriod = 3 * 1000  # in milliseconds
    self.deviceRole = None
    self.provisioningUrl = ''
    # set to True by the setters that change an active dataset field
    self.hasActiveDatasetToCommit = False
    # queue that getCommissioningLogs() reads the captured logs from
    self.logThread = Queue()
    self.logThreadStatus = self.logStatus['stop']
    # indicate Thread device requests full or stable network data
    self.networkDataRequirement = ''
    # indicate if Thread device experiences a power down event
    self.isPowerDown = False
    # indicate AddressFilter mode ['disable', 'whitelist', 'blacklist']
    self._addressfilterMode = 'disable'
    self._addressfilterSet = set()  # cache filter entries
    # indicate if Thread device is an active commissioner
    self.isActiveCommissioner = False
    self._lines = None  # buffered lines read from device

    # initialize device configuration
    # NOTE(review): self.mac is not assigned in this method; presumably it
    # is set before this method runs - confirm
    try:
        self.setMAC(self.mac)
        self.__setChannelMask(self.channelMask)
        self.__setSecurityPolicy(self.securityPolicySecs, self.securityPolicyFlags)
        self.setChannel(self.channel)
        self.setPANID(self.panId)
        self.setXpanId(self.xpanId)
        self.setNetworkName(self.networkName)
        self.setNetworkKey(self.networkKey)
        self.setMLPrefix(self.meshLocalPrefix)
        self.setPSKc(self.pskc)
        self.setActiveTimestamp(self.activetimestamp)
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setDefaultValue() Error: ' + str(e))
def getDeviceConncetionStatus(self):
    """Return the deviceConnected flag of this object."""
    print('%s call getDeviceConnectionStatus' % self.port)
    connected = self.deviceConnected
    return connected
def setPollingRate(self, iPollingRate):
    """Set the data polling rate for a sleepy end device.

    The rate is converted to integer milliseconds and stored. When the
    value changed and OpenThreadWpan is running, it is applied at once.

    Args:
        iPollingRate: data poll period of sleepy end device (in seconds)

    Returns:
        True: the period is unchanged, or it was stored while
              OpenThreadWpan is not running
        otherwise the result of __setPollPeriod for the new period
    """
    print('%s call setPollingRate' % self.port)

    periodMs = int(iPollingRate * 1000)
    print(periodMs)

    if self.__sedPollPeriod == periodMs:
        return True

    self.__sedPollPeriod = periodMs

    # apply immediately
    if not self.__isOpenThreadWpanRunning():
        return True
    return self.__setPollPeriod(self.__sedPollPeriod)
def __setPollPeriod(self, iPollPeriod):
    """Write the data poll period to the NCP:SleepyPollInterval property.

    Args:
        iPollPeriod: data poll period of sleepy end device (in milliseconds)

    Returns:
        True: the command did not report 'Fail'
        False: the command reported 'Fail'
        None: an exception was raised (it is written to the debug logger)
    """
    try:
        pollCmd = self.wpan_cmd_prefix + 'setprop NCP:SleepyPollInterval %s' % str(iPollPeriod)
        print(pollCmd)
        reply = self.__sendCommand(pollCmd)
        return reply[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('__setPollingRate() Error: ' + str(e))
def setLinkQuality(self, EUIadr, LinkQuality):
    """Set a custom LinkQualityIn for all messages received from EUIadr.

    Not implemented here: the method has no body and returns None.

    Args:
        EUIadr: a given extended address
        LinkQuality: a given custom link quality
                     link quality/link margin mapping table
                     3: 21 - 255 (dB)
                     2: 11 - 20 (dB)
                     1: 3 - 9 (dB)
                     0: 0 - 2 (dB)

    Returns (intended contract):
        True: successful to set the link quality
        False: fail to set the link quality

    @todo: required if as reference device
    """
    return None
def setOutBoundLinkQuality(self, LinkQuality):
    """Set a custom LinkQualityIn for all messages received from any address.

    Not implemented here: the method has no body and returns None.

    Args:
        LinkQuality: a given custom link quality
                     link quality/link margin mapping table
                     3: 21 - 255 (dB)
                     2: 11 - 20 (dB)
                     1: 3 - 9 (dB)
                     0: 0 - 2 (dB)

    Returns (intended contract):
        True: successful to set the link quality
        False: fail to set the link quality

    @todo: required if as reference device
    """
    return None
def removeRouterPrefix(self, prefixEntry):
    """Remove the configured prefix on a border router.

    Not implemented here: the method has no body and returns None.

    Args:
        prefixEntry: an on-mesh prefix entry

    Returns (intended contract):
        True: successful to remove the prefix entry from border router
        False: fail to remove the prefix entry from border router

    @todo: required if as reference device
    """
    return None
def resetAndRejoin(self, timeout):
    """Reset the device and join the Thread Network again after a delay.

    Disables Daemon:AutoAssociateAfterReset, resets the device, sleeps for
    the given timeout, restores the SED poll period when the device role
    is SED, attaches again, re-enables Daemon:AutoAssociateAfterReset and
    finally checks that NCP:State reads 'associated'.

    Args:
        timeout: a timeout interval before rejoin Thread Network

    Returns:
        True: successful to reset and rejoin Thread Network
        False: fail to reset and rejoin the Thread Network
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call resetAndRejoin' % self.port)
    print(timeout)
    try:
        disableCmd = self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset false'
        if self.__sendCommand(disableCmd)[0] == 'Fail':
            return False

        time.sleep(0.5)
        if self.__sendCommand(self.wpan_cmd_prefix + 'reset')[0] == 'Fail':
            return False
        self.isPowerDown = True

        time.sleep(timeout)

        if self.deviceRole == Thread_Device_Role.SED:
            self.__setPollPeriod(self.__sedPollPeriod)

        if self.__sendCommand(self.wpan_cmd_prefix + 'attach')[0] == 'Fail':
            return False
        time.sleep(3)

        enableCmd = self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset true'
        if self.__sendCommand(enableCmd)[0] == 'Fail':
            return False

        ncpState = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0])
        if ncpState != 'associated':
            print('[FAIL] reset and rejoin')
            return False
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('resetAndRejoin() Error: ' + str(e))
def configBorderRouter(
    self, P_Prefix, P_stable=1, P_default=1, P_slaac_preferred=0, P_Dhcp=0, P_preference=0, P_on_mesh=1, P_nd_dns=0
):
    """Configure the border router with a given prefix entry.

    Every flag argument that equals 1 contributes its option to the
    'add-prefix' command. P_nd_dns is accepted but not used by this
    implementation.

    Args:
        P_Prefix: IPv6 prefix that is available on the Thread Network
        P_stable: true if the default router is expected to be stable network data
        P_default: true if border router offers the default route for P_Prefix
        P_slaac_preferred: true if allowing auto-configure address using P_Prefix
        P_Dhcp: is true if border router is a DHCPv6 Agent
        P_preference: is two-bit signed integer indicating router preference
        P_on_mesh: is true if P_Prefix is considered to be on-mesh
        P_nd_dns: is true if border router is able to supply DNS information obtained via ND

    Returns:
        True: successful to configure the border router with a given prefix entry
        False: fail to configure the border router with a given prefix entry
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call configBorderRouter' % self.port)
    prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix))
    print(prefix)
    try:
        # (flag value, option text) pairs in the order they are emitted
        optionTable = (
            (P_slaac_preferred, ' -a -f'),
            (P_stable, ' -s'),
            (P_default, ' -r'),
            (P_Dhcp, ' -d'),
            (P_on_mesh, ' -o'),
        )
        options = ''.join([text for flag, text in optionTable if flag == 1])

        addCmd = self.wpan_cmd_prefix + 'add-prefix %s %s -P %d' % (prefix, options, P_preference)
        print(options)
        print(addCmd)

        return self.__sendCommand(addCmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('configBorderRouter() Error: ' + str(e))
def setNetworkIDTimeout(self, iNwkIDTimeOut):
    """Set the network id timeout for OpenThreadWpan.

    Not implemented here: the method has no body and returns None.

    Args:
        iNwkIDTimeOut: a given NETWORK_ID_TIMEOUT

    Returns (intended contract):
        True: successful to set NETWORK_ID_TIMEOUT
        False: fail to set NETWORK_ID_TIMEOUT

    @todo: required if as reference device
    """
    return None
def setKeepAliveTimeOut(self, iTimeOut):
    """Set the keep alive timeout for the device.

    Has been deprecated; it writes iTimeOut multiplied by 1000 to the
    NCP:SleepyPollInterval property, i.e. it sets the SED polling rate.

    Args:
        iTimeOut: data poll period for sleepy end device

    Returns:
        True: successful to set the data poll period for SED
        False: fail to set the data poll period for SED
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call setKeepAliveTimeOut' % self.port)
    print(iTimeOut)
    try:
        intervalText = str(iTimeOut * 1000)
        pollCmd = self.wpan_cmd_prefix + 'setprop NCP:SleepyPollInterval %s' % intervalText
        print(pollCmd)
        reply = self.__sendCommand(pollCmd)
        return reply[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setKeepAliveTimeOut() Error: ' + str(e))
def setKeySequenceCounter(self, iKeySequenceValue):
    """Set the key sequence counter of the Thread Network master key.

    The key switch guard time is set to 0 first, then the value is written
    to Network:KeyIndex. On success the method waits one second.

    Args:
        iKeySequenceValue: key sequence value

    Returns:
        True: successful to set the key sequence
        False: fail to set the key sequence
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call setKeySequenceCounter' % self.port)
    print(iKeySequenceValue)
    try:
        # avoid key switch guard timer protection for reference device
        self.__setKeySwitchGuardTime(0)

        keyCmd = self.wpan_cmd_prefix + 'setprop Network:KeyIndex %s' % str(iKeySequenceValue)
        if self.__sendCommand(keyCmd)[0] == 'Fail':
            return False

        time.sleep(1)
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setKeySequenceCounter() Error: ' + str(e))
def getKeySequenceCounter(self):
    """Return the first reply element of the Network:KeyIndex query."""
    print('%s call getKeySequenceCounter' % self.port)
    keyIndexCmd = self.wpan_cmd_prefix + 'getprop -v Network:KeyIndex'
    return self.__sendCommand(keyIndexCmd)[0]
def incrementKeySequenceCounter(self, iIncrementValue=1):
    """Increment the key sequence by a given value.

    Reads the current counter with getKeySequenceCounter(), parses it as a
    decimal number, adds the increment and stores the sum with
    setKeySequenceCounter().

    Args:
        iIncrementValue: specific increment value to be added

    Returns:
        the result of setKeySequenceCounter() for the new value
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call incrementKeySequenceCounter' % self.port)
    print(iIncrementValue)
    try:
        # avoid key switch guard timer protection for reference device
        self.__setKeySwitchGuardTime(0)

        nextKeySeq = int(self.getKeySequenceCounter(), 10) + iIncrementValue
        print(nextKeySeq)
        return self.setKeySequenceCounter(nextKeySeq)
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('incrementKeySequenceCounter() Error: ' + str(e))
def setNetworkDataRequirement(self, eDataRequirement):
    """Record whether the Thread device requires the full network data.

    Only Device_Data_Requirement.ALL_DATA changes state: it stores 'n' in
    networkDataRequirement. Any other value leaves the attribute as is.

    Args:
        eDataRequirement: is true if requiring the full network data

    Returns:
        True: successful to set the network requirement
    """
    print('%s call setNetworkDataRequirement' % self.port)
    print(eDataRequirement)

    wantsFullData = eDataRequirement == Device_Data_Requirement.ALL_DATA
    if wantsFullData:
        self.networkDataRequirement = 'n'
    return True
def configExternalRouter(self, P_Prefix, P_stable, R_Preference=0):
    """Configure the border router with a given external route prefix entry.

    Builds an 'add-route' command with prefix length 64; the '-n' option is
    appended when P_stable is false.

    Args:
        P_Prefix: IPv6 prefix for the route
        P_Stable: is true if the external route prefix is stable network data
        R_Preference: a two-bit signed integer indicating Router preference
                      1: high
                      0: medium
                     -1: low

    Returns:
        True: successful to configure the border router with a given external route prefix
        False: fail to configure the border router with a given external route prefix
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call configExternalRouter' % self.port)
    print(P_Prefix)
    prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix))
    try:
        routeFormat = 'add-route %s -l 64 -p %d' if P_stable else 'add-route %s -l 64 -p %d -n'
        routeCmd = self.wpan_cmd_prefix + routeFormat % (prefix, R_Preference)
        print(routeCmd)

        return self.__sendCommand(routeCmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('configExternalRouter() Error: ' + str(e))
def getNeighbouringRouters(self):
    """Get neighboring routers information.

    Not implemented here: the method has no body and returns None.

    Returns (intended contract):
        neighboring routers' extended address

    @todo: required if as reference device
    """
    return None
def getChildrenInfo(self):
    """Get all children information.

    Not implemented here: the method has no body and returns None.

    Returns (intended contract):
        children's extended address

    @todo: required if as reference device
    """
    return None
def setXpanId(self, xPanId):
    """Set the extended PAN ID of the Thread Network.

    The value is written to both Network:XPANID and Dataset:ExtendedPanId,
    cached in self.xpanId, and the active dataset is flagged as having
    pending changes. The dataset command is only sent when the first
    command did not report 'Fail'.

    Args:
        xPanId: extended PAN ID in hex format; a str is used as is, any
                other value is converted with __convertLongToHex(value, 16)

    Returns:
        True: successful to set the extended PAN ID
        False: fail to set the extended PAN ID
        None: an exception was raised (it is written to the debug logger)
    """
    xpanid = ''
    print('%s call setXpanId' % self.port)
    print(xPanId)
    try:
        if isinstance(xPanId, str):
            xpanid = xPanId
        else:
            xpanid = self.__convertLongToHex(xPanId, 16)

        # both branches of the conversion end up in the same two commands
        networkCmd = self.wpan_cmd_prefix + 'setprop Network:XPANID %s' % xpanid
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId %s' % xpanid

        self.xpanId = xpanid
        self.hasActiveDatasetToCommit = True
        return self.__sendCommand(networkCmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setXpanId() Error: ' + str(e))
def getNeighbouringDevices(self):
    """Collect the extended addresses of all neighboring devices.

    Used to compute the DUT extended address automatically. The parent is
    listed first (unless getParentAddress() returns 0), followed by the
    children and then the neighboring routers.

    Returns:
        A list including extended address of neighboring routers, parent
        as well as children
    """
    print('%s call getNeighbouringDevices' % self.port)
    neighbours = []

    # get parent info
    parent = self.getParentAddress()
    if parent != 0:
        neighbours.append(parent)

    # get ED/SED children info
    children = self.getChildrenInfo()
    if children is not None and len(children) > 0:
        neighbours.extend(children)

    # get neighboring routers info
    routers = self.getNeighbouringRouters()
    if routers is not None and len(routers) > 0:
        neighbours.extend(routers)

    print(neighbours)
    return neighbours
def setPartationId(self, partationId):
    """Set the Thread Network Partition ID.

    Args:
        partitionId: partition id to be set by leader; it is formatted
                     with hex() and a trailing 'L' is removed

    Returns:
        True: successful to set the Partition ID
        False: fail to set the Partition ID
    """
    print('%s call setPartationId' % self.port)
    print(partationId)

    hexId = str(hex(partationId)).rstrip('L')
    partitionCmd = self.wpan_cmd_prefix + 'setprop Network:PartitionId %s' % hexId
    print(partitionCmd)
    reply = self.__sendCommand(partitionCmd)
    return reply[0] != 'Fail'
def getGUA(self, filterByPrefix=None):
    """get expected global unicast IPv6 address of OpenThreadWpan

    note: existing filterByPrefix are string of in lowercase. e.g.
          '2001' or '2001:0db8:0001:0000".

    Args:
        filterByPrefix: a given expected global IPv6 prefix to be matched

    Returns:
        a global IPv6 address: the first address returned by getGlobal()
        when no prefix is given or no address matches it, otherwise the
        first address starting with filterByPrefix. When an exception is
        raised it is written to the debug logger and the exception object
        itself is returned.
    """
    print('%s call getGUA' % self.port)
    print(filterByPrefix)
    globalAddrs = []
    try:
        # get global addrs set if multiple
        globalAddrs = self.getGlobal()

        if filterByPrefix is None:
            # use the list fetched above; there is no 'globalAddrs'
            # attribute on the object to read from
            return globalAddrs[0]

        for fullIp in globalAddrs:
            if fullIp.startswith(filterByPrefix):
                print('target global %s' % fullIp)
                return fullIp
        print('no global address matched')
        return str(globalAddrs[0])
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('getGUA() Error: ' + str(e))
        # NOTE(review): callers receive the exception object here instead
        # of an address; kept unchanged for compatibility
        return e
def getShortAddress(self):
    """Return the Rloc16 short address of the Thread device (see getRloc16)."""
    print('%s call getShortAddress' % self.port)
    rloc16 = self.getRloc16()
    return rloc16
def getULA64(self):
    """Return the mesh local EID of the Thread device (see getMLEID)."""
    print('%s call getULA64' % self.port)
    mleid = self.getMLEID()
    return mleid
def setMLPrefix(self, sMeshLocalPrefix):
    """Set the mesh local prefix.

    The prefix is written to both IPv6:MeshLocalPrefix and
    Dataset:MeshLocalPrefix and the active dataset is flagged as having
    pending changes. The dataset command is only sent when the first
    command did not report 'Fail'.

    Args:
        sMeshLocalPrefix: the mesh local prefix to configure

    Returns:
        True: neither command reported 'Fail'
        False: one of the commands reported 'Fail'
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call setMLPrefix' % self.port)
    try:
        ipv6Cmd = self.wpan_cmd_prefix + 'setprop IPv6:MeshLocalPrefix %s' % sMeshLocalPrefix
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix %s' % sMeshLocalPrefix
        self.hasActiveDatasetToCommit = True

        if self.__sendCommand(ipv6Cmd)[0] == 'Fail':
            return False
        return self.__sendCommand(datasetCmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setMLPrefix() Error: ' + str(e))
def getML16(self):
    """Return the mesh local 16 unicast address (Rloc, see getRloc)."""
    print('%s call getML16' % self.port)
    rloc = self.getRloc()
    return rloc
def downgradeToDevice(self):
    """Placeholder: performs no action and returns None."""
    return None
def upgradeToRouter(self):
    """Placeholder: performs no action and returns None."""
    return None
def forceSetSlaac(self, slaacAddress):
    """Not implemented here: ignores slaacAddress and returns None.

    @todo : required if as reference device
    """
    return None
def setSleepyNodePollTime(self):
    """Placeholder: performs no action and returns None."""
    return None
def enableAutoDUTObjectFlag(self):
    """Set the AutoDUTEnable flag of this object to True."""
    print('%s call enableAutoDUTObjectFlag' % self.port)
    setattr(self, 'AutoDUTEnable', True)
def getChildTimeoutValue(self):
    """Query Thread:ChildTimeout and return the first reply element as an int."""
    print('%s call getChildTimeoutValue' % self.port)
    timeoutCmd = self.wpan_cmd_prefix + 'getprop -v Thread:ChildTimeout'
    reply = self.__sendCommand(timeoutCmd)
    return int(reply[0])
def diagnosticGet(self, strDestinationAddr, listTLV_ids=[]):
    """Not implemented here: ignores its arguments and returns None.

    @todo : required if as reference device
    """
    return None
def diagnosticQuery(self, strDestinationAddr, listTLV_ids=[]):
    """Forward the request to diagnosticGet() and return None.

    @todo : required if as reference device
    """
    forward = self.diagnosticGet
    forward(strDestinationAddr, listTLV_ids)
def diagnosticReset(self, strDestinationAddr, listTLV_ids=[]):
    """Not implemented here: ignores its arguments and returns None.

    @todo : required if as reference device
    """
    return None
def startNativeCommissioner(self, strPSKc='GRLpassWord'):
    """Send a 'joiner --start' command with the given PSKc.

    TODO: Support the whole Native Commissioner functionality.
    Currently it only aims to trigger a Discovery Request message to pass
    Certification test 5.8.4

    Args:
        strPSKc: the value passed to 'joiner --start'

    Returns:
        True: the command did not report 'Fail'
        False: the command reported 'Fail'
    """
    print('%s call startNativeCommissioner' % self.port)
    joinerCmd = self.wpan_cmd_prefix + 'joiner --start %s' % (strPSKc)
    print(joinerCmd)
    reply = self.__sendCommand(joinerCmd)
    return reply[0] != 'Fail'
def startCollapsedCommissioner(self):
    """start Collapsed Commissioner

    Forms a network as router with the cached network name and channel,
    waits two seconds, starts the commissioner and then waits twenty
    seconds for the petition process.

    Returns:
        True: successful to start Commissioner
        False: fail to start Commissioner (the form command or the
               commissioner start command reported 'Fail')
    """
    print('%s call startCollapsedCommissioner' % self.port)
    startCmd = self.wpan_cmd_prefix + 'form %s -c %s -T router' % (self.networkName, str(self.channel))
    # __sendCommand returns a sequence, so the status is its first element;
    # comparing the whole sequence with 'Fail' can never detect a failure
    if self.__sendCommand(startCmd)[0] != 'Fail':
        time.sleep(2)
        cmd = self.wpan_cmd_prefix + 'commissioner start'
        print(cmd)
        if self.__sendCommand(cmd)[0] != 'Fail':
            self.isActiveCommissioner = True
            time.sleep(20)  # time for petition process
            return True
    return False
def setJoinKey(self, strPSKc):
    """Placeholder: ignores strPSKc, performs no action and returns None."""
    return None
def scanJoiner(self, xEUI='*', strPSKd='threadjpaketest'):
    """Add a Joiner to the commissioner.

    The collapsed commissioner is started first when this device is not
    yet an active commissioner; its result is not checked.

    Args:
        xEUI: Joiner's EUI-64; a str is used as is, any other value is
              converted with __convertLongToHex(value, 16)
        strPSKd: Joiner's PSKd for commissioning

    Returns:
        True: successful to add Joiner's steering data
        False: fail to add Joiner's steering data
    """
    print('%s call scanJoiner' % self.port)

    eui64 = xEUI if isinstance(xEUI, str) else self.__convertLongToHex(xEUI, 16)

    # long timeout value to avoid automatic joiner removal (in seconds)
    joinerTimeout = 500

    addCmd = self.wpan_cmd_prefix + 'commissioner joiner-add %s %s %s' % (eui64, str(joinerTimeout), strPSKd)
    print(addCmd)

    if not self.isActiveCommissioner:
        self.startCollapsedCommissioner()

    return self.__sendCommand(addCmd)[0] != 'Fail'
def setProvisioningUrl(self, strURL='grl.com'):
    """Set the provisioning Url.

    The Url is always cached in self.provisioningUrl; it is only written
    to the Commissioner:ProvisioningUrl property when the device role is
    Thread_Device_Role.Commissioner.

    Args:
        strURL: Provisioning Url string

    Returns:
        True: successful to set provisioning Url (or nothing had to be sent)
        False: fail to set provisioning Url
    """
    print('%s call setProvisioningUrl' % self.port)
    self.provisioningUrl = strURL

    actsAsCommissioner = self.deviceRole == Thread_Device_Role.Commissioner
    if not actsAsCommissioner:
        return True

    urlCmd = self.wpan_cmd_prefix + 'setprop Commissioner:ProvisioningUrl %s' % (strURL)
    print(urlCmd)
    return self.__sendCommand(urlCmd)[0] != 'Fail'
def allowCommission(self):
    """Start the commissioner candidate petition process.

    Nothing is sent when this device already is an active commissioner.
    Otherwise 'commissioner start' is sent and, on success, the method
    waits forty seconds.

    Returns:
        True: successful to start commissioner candidate petition process
        False: fail to start commissioner candidate petition process
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call allowCommission' % self.port)
    try:
        startCmd = self.wpan_cmd_prefix + 'commissioner start'
        print(startCmd)

        if self.isActiveCommissioner:
            return True

        if self.__sendCommand(startCmd)[0] == 'Fail':
            return False

        self.isActiveCommissioner = True
        time.sleep(40)  # time for petition process and at least one keep alive
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('allowcommission() error: ' + str(e))
def joinCommissioned(self, strPSKd='threadjpaketest', waitTime=20):
    """Start the joiner.

    Sends 'joiner --start' with the PSKd and the cached provisioning Url.
    When that succeeds and __getJoinerState() is truthy, 'joiner --attach'
    is sent and the method waits thirty seconds. waitTime is accepted but
    not used by this implementation.

    Args:
        strPSKd: Joiner's PSKd

    Returns:
        True: successful to start joiner
        False: fail to start joiner
    """
    print('%s call joinCommissioned' % self.port)
    startCmd = self.wpan_cmd_prefix + 'joiner --start %s %s' % (strPSKd, self.provisioningUrl)
    print(startCmd)

    if self.__sendCommand(startCmd)[0] == 'Fail':
        return False
    if not self.__getJoinerState():
        return False

    self.__sendCommand(self.wpan_cmd_prefix + 'joiner --attach')
    time.sleep(30)
    return True
def getCommissioningLogs(self):
    """get Commissioning logs

    Takes one queue of raw log lines from self.logThread, skips every
    line that does not contain '[THCI]' and turns each remaining line into
    a PlatformDiagnosticPacket. The text between '[THCI]' and the next ']'
    is split on '|' into 'name=value' fields; the 'direction', 'type' and
    'len' fields fill the packet. A 'len' field is followed in the queue
    by hex dump lines carrying the payload bytes.

    Returns:
        Commissioning logs: list of PlatformDiagnosticPacket objects in
        the order their '[THCI]' lines were read
    """
    # self.logThread is a queue whose items are themselves queues of lines
    rawLogs = self.logThread.get()
    ProcessedLogs = []
    # NOTE(review): payload is created once, before the loop, so bytes
    # parsed for one packet are still in the list when the next packet is
    # parsed - confirm this accumulation is intended
    payload = []
    while not rawLogs.empty():
        rawLogEach = rawLogs.get()
        print(rawLogEach)
        if '[THCI]' not in rawLogEach:
            continue

        EncryptedPacket = PlatformDiagnosticPacket()
        # fields of the form 'name=value' separated by '|'
        infoList = rawLogEach.split('[THCI]')[1].split(']')[0].split('|')
        for eachInfo in infoList:
            print(eachInfo)
            info = eachInfo.split('=')
            infoType = info[0].strip()
            infoValue = info[1].strip()
            if 'direction' in infoType:
                # 'recv' -> IN, 'send' -> OUT, anything else -> UNKNOWN
                EncryptedPacket.Direction = (
                    PlatformDiagnosticPacket_Direction.IN
                    if 'recv' in infoValue
                    else PlatformDiagnosticPacket_Direction.OUT
                    if 'send' in infoValue
                    else PlatformDiagnosticPacket_Direction.UNKNOWN
                )
            elif 'type' in infoType:
                # note that 'JOIN_ENT.ntf' is mapped to the JOIN_ENT_req type
                EncryptedPacket.Type = (
                    PlatformDiagnosticPacket_Type.JOIN_FIN_req
                    if 'JOIN_FIN.req' in infoValue
                    else PlatformDiagnosticPacket_Type.JOIN_FIN_rsp
                    if 'JOIN_FIN.rsp' in infoValue
                    else PlatformDiagnosticPacket_Type.JOIN_ENT_req
                    if 'JOIN_ENT.ntf' in infoValue
                    else PlatformDiagnosticPacket_Type.JOIN_ENT_rsp
                    if 'JOIN_ENT.rsp' in infoValue
                    else PlatformDiagnosticPacket_Type.UNKNOWN
                )
            elif 'len' in infoType:
                bytesInEachLine = 16
                EncryptedPacket.TLVsLength = int(infoValue)
                # number of dump lines needed for the payload, rounded up
                # NOTE(review): '/' only rounds down to an integer under
                # Python 2 division rules - confirm before running this
                # under Python 3
                payloadLineCount = (int(infoValue) + bytesInEachLine - 1) / bytesInEachLine
                while payloadLineCount > 0:
                    payloadLineCount = payloadLineCount - 1
                    # the dump lines are consumed from the same queue
                    payloadLine = rawLogs.get()
                    payloadSplit = payloadLine.split('|')
                    # '|' separated fields 1 and 2 each hold up to 8 byte values
                    for block in range(1, 3):
                        payloadBlock = payloadSplit[block]
                        payloadValues = payloadBlock.split(' ')
                        for num in range(1, 9):
                            # '..' marks a position without a byte value
                            if '..' not in payloadValues[num]:
                                payload.append(int(payloadValues[num], 16))

                # decode the collected bytes according to the packet type
                EncryptedPacket.TLVs = PlatformPackets.read(EncryptedPacket.Type, payload) if payload != [] else []

        ProcessedLogs.append(EncryptedPacket)
    return ProcessedLogs
def MGMT_ED_SCAN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod, xScanDuration):
    """send MGMT_ED_SCAN message to a given destinaition.

    The channel list is converted with __ChannelMaskListToStr and the
    request is issued as a 'commissioner energy-scan' command.
    xCommissionerSessionId is accepted but not used by this implementation.

    Args:
        sAddr: IPv6 destination address for this message
        xCommissionerSessionId: commissioner session id
        listChannelMask: a channel array to indicate which channels to be scaned
        xCount: number of IEEE 802.15.4 ED Scans (milliseconds)
        xPeriod: Period between successive IEEE802.15.4 ED Scans (milliseconds)
        xScanDuration: IEEE 802.15.4 ScanDuration to use when performing an IEEE 802.15.4 ED Scan (milliseconds)

    Returns:
        True: successful to send MGMT_ED_SCAN message.
        False: fail to send MGMT_ED_SCAN message
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call MGMT_ED_SCAN' % self.port)
    channelMask = self.__ChannelMaskListToStr(listChannelMask)
    try:
        cmd = self.wpan_cmd_prefix + 'commissioner energy-scan %s %s %s %s %s' % (
            channelMask,
            xCount,
            xPeriod,
            xScanDuration,
            sAddr,
        )
        print(cmd)
        # the status is the first element of the reply; comparing the whole
        # reply with 'Fail' would report success unconditionally
        return self.__sendCommand(cmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_ED_SCAN() error: ' + str(e))
def MGMT_PANID_QUERY(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId):
    """send MGMT_PANID_QUERY message to a given destination

    The channel list is converted with __ChannelMaskListToStr and the
    request is issued as a 'commissioner pan-id-query' command.
    xCommissionerSessionId is accepted but not used by this implementation.

    Args:
        sAddr: IPv6 destination address for this message
        xCommissionerSessionId: commissioner session id
        listChannelMask: a channel array to indicate which channels to query
        xPanId: a given PAN ID to check the conflicts; a str is sent as is,
                a number is converted with hex()

    Returns:
        True: successful to send MGMT_PANID_QUERY message.
        False: fail to send MGMT_PANID_QUERY message.
        None: an exception was raised (it is written to the debug logger)
    """
    print('%s call MGMT_PANID_QUERY' % self.port)
    channelMask = self.__ChannelMaskListToStr(listChannelMask)

    if isinstance(xPanId, str):
        # a PAN ID that already is a string must not be dropped
        panid = xPanId
    else:
        panid = str(hex(xPanId))

    try:
        cmd = self.wpan_cmd_prefix + 'commissioner pan-id-query %s %s %s' % (panid, channelMask, sAddr)
        print(cmd)
        # the status is the first element of the reply; comparing the whole
        # reply with 'Fail' would report success unconditionally
        return self.__sendCommand(cmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_PANID_QUERY() error: ' + str(e))
def MGMT_ANNOUNCE_BEGIN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod):
    """send MGMT_ANNOUNCE_BEGIN message to a given destination

    Args:
        sAddr: IPv6 destination address for this message
        xCommissionerSessionId: commissioner session id (accepted for interface
                                compatibility; not used when building the command)
        listChannelMask: a channel array, converted with __ChannelMaskListToStr
        xCount: count value placed in the command after the channel mask
        xPeriod: period value placed in the command after the count

    Returns:
        True: successful to send MGMT_ANNOUNCE_BEGIN message.
        False: fail to send MGMT_ANNOUNCE_BEGIN message.
    """
    print('%s call MGMT_ANNOUNCE_BEGIN' % self.port)
    channelMask = self.__ChannelMaskListToStr(listChannelMask)
    try:
        cmd = self.wpan_cmd_prefix + 'commissioner announce-begin %s %s %s %s' % (
            channelMask,
            xCount,
            xPeriod,
            sAddr,
        )
        print(cmd)
        # The status is element [0] of the __sendCommand() result, as in the
        # other commands of this class.
        return self.__sendCommand(cmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_ANNOUNCE_BEGIN() error: ' + str(e))
        # Report the failure explicitly, as the docstring promises.
        return False
def MGMT_ACTIVE_GET(self, Addr='', TLVs=[]):
    """send MGMT_ACTIVE_GET command

    Args:
        Addr: destination address; when not empty it is written to
              Dataset:DestIpAddress before the request is issued
        TLVs: list of TLV type numbers; when not empty they are written, two
              hex digits each, to Dataset:RawTlvs; otherwise 'dataset erase'
              is issued instead

    Returns:
        True: successful to send MGMT_ACTIVE_GET
        False: fail to send MGMT_ACTIVE_GET
    """
    print('%s call MGMT_ACTIVE_GET' % self.port)
    try:
        prefix = self.wpan_cmd_prefix
        activeGetCmd = prefix + 'dataset mgmt-get-active'

        def failed(command):
            # True when the first element of the reply is 'Fail'
            return self.__sendCommand(command)[0] == 'Fail'

        if len(TLVs) == 0:
            if failed(prefix + 'dataset erase'):
                return False
        else:
            requestedTlvs = ''.join(['%02x' % tlvType for tlvType in TLVs])
            if failed(prefix + 'setprop Dataset:RawTlvs ' + requestedTlvs):
                return False

        if Addr != '' and failed(prefix + 'setprop Dataset:DestIpAddress ' + Addr):
            return False

        print(activeGetCmd)
        return not failed(activeGetCmd)
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_ACTIVE_GET() Error: ' + str(e))
def MGMT_ACTIVE_SET(
    self,
    sAddr='',
    xCommissioningSessionId=None,
    listActiveTimestamp=None,
    listChannelMask=None,
    xExtendedPanId=None,
    sNetworkName=None,
    sPSKc=None,
    listSecurityPolicy=None,
    xChannel=None,
    sMeshLocalPrefix=None,
    xMasterKey=None,
    xPanId=None,
    xTmfPort=None,
    xSteeringData=None,
    xBorderRouterLocator=None,
    BogusTLV=None,
    xDelayTimer=None,
):
    """send MGMT_ACTIVE_SET command

    Issues 'dataset erase', then one 'setprop Dataset:<Field>' command for
    every supplied argument, then (if any raw TLV argument was supplied) one
    'setprop Dataset:RawTlvs' command, and finally 'dataset mgmt-set-active'.
    Sending stops at the first command whose reply starts with 'Fail'.

    Args:
        sAddr: accepted for interface compatibility; not used here
        xCommissioningSessionId: integer, encoded as raw TLV '0b02' + 4 hex digits
        listActiveTimestamp: sequence whose first element is written as
                             16 hex digits to Dataset:ActiveTimestamp
        listChannelMask: channel list, converted with __convertChannelMask
                         and written to Dataset:ChannelMaskPage0
        xExtendedPanId: written to Dataset:ExtendedPanId via __convertLongToHex(.., 16)
        sNetworkName: written to Dataset:NetworkName
        sPSKc: passphrase; stretched with Thread_PBKDF2 and encoded as raw
               TLV '0410' + 32 hex digits
        listSecurityPolicy: either [rotationTime, policyBits] or the older
                            six-element form [flag, flag, rotationTime, flag, flag, flag];
                            encoded as raw TLV '0c03'
        xChannel: written to Dataset:Channel
        sMeshLocalPrefix: written to Dataset:MeshLocalPrefix
        xMasterKey: written to Dataset:MasterKey via __convertLongToHex(.., 32)
        xPanId: written to Dataset:PanId
        xTmfPort: only triggers the RawTlvs command; no TLV is encoded for it
        xSteeringData: integer, encoded as raw TLV '08' + length byte + data
        xBorderRouterLocator: integer, encoded as raw TLV '0902' + 4 hex digits
        BogusTLV: when not None the fixed TLV '8202aa55' is appended
        xDelayTimer: accepted for interface compatibility; not used here

    Returns:
        True: successful to send MGMT_ACTIVE_SET
        False: fail to send MGMT_ACTIVE_SET
    """
    print('%s call MGMT_ACTIVE_SET' % self.port)
    try:
        prefix = self.wpan_cmd_prefix
        cmd = prefix + 'dataset mgmt-set-active'

        def sendOk(command):
            # the status is the first element of the __sendCommand() reply
            return self.__sendCommand(command)[0] != 'Fail'

        if not sendOk(prefix + 'dataset erase'):
            return False

        if listActiveTimestamp is not None:
            sActiveTimestamp = '%016x' % listActiveTimestamp[0]
            if not sendOk(prefix + 'setprop Dataset:ActiveTimestamp ' + sActiveTimestamp):
                return False

        if xExtendedPanId is not None:
            xpanid = self.__convertLongToHex(xExtendedPanId, 16)
            if not sendOk(prefix + 'setprop Dataset:ExtendedPanId ' + xpanid):
                return False

        if sNetworkName is not None:
            if not sendOk(prefix + 'setprop Dataset:NetworkName ' + str(sNetworkName)):
                return False

        if xChannel is not None:
            if not sendOk(prefix + 'setprop Dataset:Channel ' + str(xChannel)):
                return False

        if sMeshLocalPrefix is not None:
            if not sendOk(prefix + 'setprop Dataset:MeshLocalPrefix ' + str(sMeshLocalPrefix)):
                return False

        if xMasterKey is not None:
            key = self.__convertLongToHex(xMasterKey, 32)
            if not sendOk(prefix + 'setprop Dataset:MasterKey ' + key):
                return False

        if xPanId is not None:
            if not sendOk(prefix + 'setprop Dataset:PanId ' + str(xPanId)):
                return False

        if listChannelMask is not None:
            setChannelMaskCmd = (
                prefix
                + 'setprop Dataset:ChannelMaskPage0 '
                + '0x'
                + self.__convertLongToHex(self.__convertChannelMask(listChannelMask))
            )
            if not sendOk(setChannelMaskCmd):
                return False

        rawTlvArgs = (
            sPSKc,
            listSecurityPolicy,
            xCommissioningSessionId,
            xTmfPort,
            xSteeringData,
            xBorderRouterLocator,
            BogusTLV,
        )
        # The RawTlvs command is built and sent only when at least one of the
        # raw TLV arguments was supplied; otherwise go straight to the final
        # mgmt-set-active command.
        if any(arg is not None for arg in rawTlvArgs):
            setRawTLVCmd = prefix + 'setprop Dataset:RawTlvs '

            if sPSKc is not None:
                stretchedPskc = Thread_PBKDF2.get(sPSKc, ModuleHelper.Default_XpanId, ModuleHelper.Default_NwkName)
                # type 0x04, length 0x10, value padded to 16 bytes
                setRawTLVCmd += '0410' + '%032x' % stretchedPskc

            if listSecurityPolicy is not None:
                if len(listSecurityPolicy) == 6:
                    # previous passing way listSecurityPolicy=[True, True, 3600,
                    # False, False, True]
                    rotationTime = listSecurityPolicy[2]
                    # the last three reserved bits must be 1
                    policyBits = 0b00000111
                    flagBits = (
                        (listSecurityPolicy[0], 0b10000000),
                        (listSecurityPolicy[1], 0b01000000),
                        (listSecurityPolicy[3], 0b00100000),
                        (listSecurityPolicy[4], 0b00010000),
                        (listSecurityPolicy[5], 0b00001000),
                    )
                    for flag, bit in flagBits:
                        if flag:
                            policyBits |= bit
                else:
                    # new passing way listSecurityPolicy=[3600, 0b11001111]
                    rotationTime = listSecurityPolicy[0]
                    policyBits = listSecurityPolicy[1]
                # type 0x0c, length 0x03: 2-byte rotation time + 1-byte flags.
                # The flags byte is zero-padded so that values below 0x10
                # still occupy a full byte.
                setRawTLVCmd += '0c03' + '%04x' % rotationTime + '%02x' % policyBits

            if xCommissioningSessionId is not None:
                setRawTLVCmd += '0b02' + '%04x' % xCommissioningSessionId

            if xBorderRouterLocator is not None:
                setRawTLVCmd += '0902' + '%04x' % xBorderRouterLocator

            if xSteeringData is not None:
                steeringData = self.__convertLongToHex(xSteeringData)
                if len(steeringData) % 2:
                    # keep whole bytes so that the length field is exact
                    steeringData = '0' + steeringData
                # The length byte is written in hex like every other field of
                # the TLV string (a decimal length is wrong from 10 bytes on),
                # and integer division keeps it an int on Python 3.
                setRawTLVCmd += '08' + '%02x' % (len(steeringData) // 2) + steeringData

            if BogusTLV is not None:
                setRawTLVCmd += '8202aa55'

            # NOTE(review): xTmfPort enables this command but no TLV is
            # encoded for it; confirm whether that is intended.
            print(setRawTLVCmd)
            print(cmd)

            if not sendOk(setRawTLVCmd):
                return False

        return sendOk(cmd)
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_ACTIVE_SET() Error: ' + str(e))
        # Report the failure explicitly, as the docstring promises.
        return False
def MGMT_PENDING_GET(self, Addr='', TLVs=[]):
    """send MGMT_PENDING_GET command

    Args:
        Addr: destination address; when not empty it is written to
              Dataset:DestIpAddress before the request is issued
        TLVs: list of TLV type numbers; when not empty they are written, two
              hex digits each, to Dataset:RawTlvs; otherwise 'dataset erase'
              is issued instead

    Returns:
        True: successful to send MGMT_PENDING_GET
        False: fail to send MGMT_PENDING_GET
    """
    print('%s call MGMT_PENDING_GET' % self.port)
    try:
        pendingGetCmd = self.wpan_cmd_prefix + 'dataset mgmt-get-pending'

        def setupCommands():
            # Yields the preparatory commands one at a time, so each one is
            # only built after the previous one has been sent successfully.
            if len(TLVs) != 0:
                hexTypes = ''.join('%02x' % tlvType for tlvType in TLVs)
                yield self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' + hexTypes
            else:
                yield self.wpan_cmd_prefix + 'dataset erase'
            if Addr != '':
                yield self.wpan_cmd_prefix + 'setprop Dataset:DestIpAddress ' + Addr

        for setupCmd in setupCommands():
            if self.__sendCommand(setupCmd)[0] == 'Fail':
                return False

        print(pendingGetCmd)
        reply = self.__sendCommand(pendingGetCmd)
        return reply[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_PENDING_GET() Error: ' + str(e))
def MGMT_PENDING_SET(
    self,
    sAddr='',
    xCommissionerSessionId=None,
    listPendingTimestamp=None,
    listActiveTimestamp=None,
    xDelayTimer=None,
    xChannel=None,
    xPanId=None,
    xMasterKey=None,
    sMeshLocalPrefix=None,
    sNetworkName=None,
):
    """send MGMT_PENDING_SET command

    Issues 'dataset erase', then one 'setprop Dataset:<Field>' command per
    supplied argument, and finally 'dataset mgmt-set-pending'. Sending stops
    at the first command whose reply starts with 'Fail'.

    Args:
        sAddr: accepted but not used here
        xCommissionerSessionId: not handled; only a notice is printed
        listPendingTimestamp: sequence whose first element is written as
                              16 hex digits to Dataset:PendingTimestamp
        listActiveTimestamp: sequence whose first element is written as
                             16 hex digits to Dataset:ActiveTimestamp
        xDelayTimer: written to Dataset:Delay
        xChannel: written to Dataset:Channel
        xPanId: written to Dataset:PanId
        xMasterKey: written to Dataset:MasterKey via __convertLongToHex(.., 32)
        sMeshLocalPrefix: written to Dataset:MeshLocalPrefix
        sNetworkName: written to Dataset:NetworkName

    Returns:
        True: successful to send MGMT_PENDING_SET
        False: fail to send MGMT_PENDING_SET
    """
    print('%s call MGMT_PENDING_SET' % self.port)
    try:
        pendingSetCmd = self.wpan_cmd_prefix + 'dataset mgmt-set-pending'

        def stagedCommands():
            # Lazily yields the preparatory commands in sending order; each
            # value is formatted only once its turn comes.
            yield self.wpan_cmd_prefix + 'dataset erase'

            if listPendingTimestamp is not None:
                pendingStamp = '%016x' % listPendingTimestamp[0]
                yield self.wpan_cmd_prefix + 'setprop Dataset:PendingTimestamp ' + pendingStamp

            if listActiveTimestamp is not None:
                activeStamp = '%016x' % listActiveTimestamp[0]
                yield self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp ' + activeStamp

            if xDelayTimer is not None:
                yield self.wpan_cmd_prefix + 'setprop Dataset:Delay ' + str(xDelayTimer)

            if sNetworkName is not None:
                yield self.wpan_cmd_prefix + 'setprop Dataset:NetworkName ' + str(sNetworkName)

            if xChannel is not None:
                yield self.wpan_cmd_prefix + 'setprop Dataset:Channel ' + str(xChannel)

            if sMeshLocalPrefix is not None:
                yield self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix ' + str(sMeshLocalPrefix)

            if xMasterKey is not None:
                masterKeyHex = self.__convertLongToHex(xMasterKey, 32)
                yield self.wpan_cmd_prefix + 'setprop Dataset:MasterKey ' + masterKeyHex

            if xPanId is not None:
                yield self.wpan_cmd_prefix + 'setprop Dataset:PanId ' + str(xPanId)

        for stagedCmd in stagedCommands():
            if self.__sendCommand(stagedCmd)[0] == 'Fail':
                return False

        if xCommissionerSessionId is not None:
            print('not handle xCommissionerSessionId')

        print(pendingSetCmd)
        finalReply = self.__sendCommand(pendingSetCmd)
        return finalReply[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_PENDING_SET() Error: ' + str(e))
def MGMT_COMM_GET(self, Addr='ff02::1', TLVs=[]):
    """send MGMT_COMM_GET command

    Args:
        Addr: accepted but not used when building the command
        TLVs: list of TLV type numbers; each is appended to the command as
              two hex digits

    Returns:
        True: successful to send MGMT_COMM_GET
        False: fail to send MGMT_COMM_GET
    """
    print('%s call MGMT_COMM_GET' % self.port)
    try:
        getCmd = self.wpan_cmd_prefix + 'commissioner mgmt-get '
        print('TLVs:')
        print(TLVs)

        # an empty list leaves the command with its trailing blank only
        if len(TLVs) == 0:
            hexTypes = ''
        else:
            hexTypes = ''.join(['%02x' % tlvType for tlvType in TLVs])
        getCmd = getCmd + hexTypes

        print(getCmd)
        reply = self.__sendCommand(getCmd)
        return reply[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_COMM_GET() Error: ' + str(e))
def MGMT_COMM_SET(
    self,
    Addr='ff02::1',
    xCommissionerSessionID=None,
    xSteeringData=None,
    xBorderRouterLocator=None,
    xChannelTlv=None,
    ExceedMaxPayload=False,
):
    """send MGMT_COMM_SET command

    The TLVs are appended to 'commissioner mgmt-set ' as one hex string, each
    as type byte + length byte + value.

    Args:
        Addr: accepted but not used when building the command
        xCommissionerSessionID: session id for TLV '0b02'; an integer is
                                written as 4 hex digits, any other value is
                                appended via str(). When None and this device
                                is the active commissioner, the id returned by
                                __getCommissionerSessionId() is used instead.
        xSteeringData: integer, encoded as TLV '08' + length byte + data
        xBorderRouterLocator: integer, encoded as TLV '0902' + 4 hex digits
        xChannelTlv: integer, encoded as '000300' + 4 hex digits
        ExceedMaxPayload: only printed; does not influence the command

    Returns:
        True: successful to send MGMT_COMM_SET
        False: fail to send MGMT_COMM_SET
    """
    print('%s call MGMT_COMM_SET' % self.port)
    try:
        cmd = self.wpan_cmd_prefix + 'commissioner mgmt-set '

        print('-------------------------------')
        print(xCommissionerSessionID)
        print(xSteeringData)
        # hex formatting of None raises, which used to abort every call that
        # left xSteeringData at its default; only print the hex form when
        # there is a value.
        if xSteeringData is not None:
            print(str(xSteeringData) + ' ' + '%x' % xSteeringData)
        print(xBorderRouterLocator)
        print(xChannelTlv)
        print(ExceedMaxPayload)
        print('-------------------------------')

        if xCommissionerSessionID is not None:
            # use assigned session id; the TLV declares a 2-byte value, so an
            # integer id is written as exactly 4 hex digits
            try:
                sessionId = '%04x' % xCommissionerSessionID
            except TypeError:
                # not a number (e.g. already a string): keep it as given
                sessionId = str(xCommissionerSessionID)
            cmd += '0b02' + sessionId
        elif self.isActiveCommissioner is True:
            # use original session id
            cmd += '0b02' + self.__lstrip0x(self.__getCommissionerSessionId())

        if xSteeringData is not None:
            steeringData = '%x' % xSteeringData
            if len(steeringData) % 2:
                # keep whole bytes so that the length field is exact
                steeringData = '0' + steeringData
            # the length byte counts bytes (not hex characters) and is itself
            # two hex digits
            cmd += '08' + '%02x' % (len(steeringData) // 2) + steeringData

        if xBorderRouterLocator is not None:
            # 2-byte value without the '0x' prefix that hex() would add
            cmd += '0902' + '%04x' % xBorderRouterLocator

        if xChannelTlv is not None:
            cmd += '000300' + '%04x' % xChannelTlv

        print(cmd)

        return self.__sendCommand(cmd)[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('MGMT_COMM_SET() Error: ' + str(e))
        # Report the failure explicitly, as the docstring promises.
        return False
def setActiveDataset(self, listActiveDataset=[]):
    """Announce the call on stdout; listActiveDataset is accepted but not used."""
    banner = '%s call setActiveDataset' % self.port
    print(banner)
def setCommisionerMode(self):
    """Announce the call on stdout; no command is sent."""
    banner = '%s call setCommissionerMode' % self.port
    print(banner)
def setPSKc(self, strPSKc):
    """Write strPSKc to both Network:PSKc and Dataset:PSKc.

    Marks the active dataset as needing a commit before anything is sent. The
    Dataset:PSKc command is only sent when the Network:PSKc command did not
    fail.

    Args:
        strPSKc: value placed after the property name in both commands

    Returns:
        True when neither reply starts with 'Fail', otherwise False
        (None when an exception was logged)
    """
    print('%s call setPSKc' % self.port)
    try:
        networkCmd = self.wpan_cmd_prefix + 'setprop Network:PSKc %s' % strPSKc
        datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:PSKc %s' % strPSKc
        self.hasActiveDatasetToCommit = True

        for pskcCmd in (networkCmd, datasetCmd):
            if self.__sendCommand(pskcCmd)[0] == 'Fail':
                return False
        return True
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setPSKc() Error: ' + str(e))
def setActiveTimestamp(self, xActiveTimestamp):
    """Write the active timestamp to Dataset:ActiveTimestamp.

    The value is converted with str() and left-padded with zeros to at least
    16 characters; the padded text is also stored in self.activetimestamp and
    the active dataset is marked as needing a commit.

    Args:
        xActiveTimestamp: timestamp value

    Returns:
        True when the reply does not start with 'Fail', otherwise False
        (None when an exception was logged)
    """
    print('%s call setActiveTimestamp' % self.port)
    try:
        # zfill() leaves text of 16 or more characters untouched
        paddedTimestamp = str(xActiveTimestamp).zfill(16)
        self.activetimestamp = paddedTimestamp

        timestampCmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp %s' % paddedTimestamp
        self.hasActiveDatasetToCommit = True
        reply = self.__sendCommand(timestampCmd)
        return reply[0] != 'Fail'
    except Exception as e:
        ModuleHelper.WriteIntoDebugLogger('setActiveTimestamp() Error: ' + str(e))
def setUdpJoinerPort(self, portNumber):
    """set Joiner UDP Port

    NOTE: not implemented. The method consists of this docstring only, so it
    sends nothing and currently returns None rather than True/False.

    Args:
        portNumber: Joiner UDP Port number

    Returns:
        True: successful to set Joiner UDP Port
        False: fail to set Joiner UDP Port

    @todo : required if as reference device
    """
def commissionerUnregister(self):
    """stop commissioner

    Sends 'commissioner stop' and, when the reply does not start with 'Fail',
    clears self.isActiveCommissioner.

    Returns:
        True: successful to stop commissioner
        False: fail to stop commissioner
    """
    print('%s call commissionerUnregister' % self.port)
    stopCmd = self.wpan_cmd_prefix + 'commissioner stop'
    print(stopCmd)

    stopped = self.__sendCommand(stopCmd)[0] != 'Fail'
    if stopped:
        self.isActiveCommissioner = False
    return stopped
def sendBeacons(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId):
    """Write the 'scan' command with _sendline() and return True.

    None of the four arguments is used, and no reply is read back.
    """
    print('%s call sendBeacons' % self.port)
    scanCmd = self.wpan_cmd_prefix + 'scan'
    self._sendline(scanCmd)
    return True
def updateRouterStatus(self):
    """force update to router as if there is child id request

    NOTE: not implemented. The method consists of this docstring only; it
    sends nothing and returns None.

    @todo : required if as reference device
    """
def setRouterThresholdValues(self, upgradeThreshold, downgradeThreshold):
    """Apply the router upgrade threshold, then the downgrade threshold.

    Args:
        upgradeThreshold: value handed to __setRouterUpgradeThreshold
        downgradeThreshold: value handed to __setRouterDowngradeThreshold
    """
    banner = '%s call setRouterThresholdValues' % self.port
    print(banner)

    # upgrade is always applied before downgrade; results are not inspected
    self.__setRouterUpgradeThreshold(upgradeThreshold)
    self.__setRouterDowngradeThreshold(downgradeThreshold)
def setMinDelayTimer(self, iSeconds):
    """Placeholder: accepts iSeconds, does nothing and returns None."""
    pass
def ValidateDeviceFirmware(self):
    """Tell whether self.UIStatusMsg contains the text 'OPENTHREAD'.

    Returns:
        True: 'OPENTHREAD' is found in self.UIStatusMsg
        False: otherwise
    """
    print('%s call ValidateDeviceFirmware' % self.port)
    return 'OPENTHREAD' in self.UIStatusMsg
@staticmethod
def __lstrip0x(s):
    """Remove one leading '0x' from a hex string.

    Args:
        s: hex string

    Returns:
        s without its first two characters when it starts with '0x',
        otherwise s unchanged
    """
    return s[2:] if s.startswith('0x') else s