| #!/usr/bin/env python |
| # |
| # Copyright (c) 2016, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| |
| """ |
| >> Thread Host Controller Interface |
| >> Device : OpenThread THCI |
| >> Class : OpenThread |
| """ |
| |
| import re |
| import time |
| import socket |
| import logging |
| from Queue import Queue |
| |
| import serial |
| from IThci import IThci |
| from GRLLibs.UtilityModules.Test import ( |
| Thread_Device_Role, |
| Device_Data_Requirement, |
| MacType, |
| ) |
| from GRLLibs.UtilityModules.enums import ( |
| PlatformDiagnosticPacket_Direction, |
| PlatformDiagnosticPacket_Type, |
| ) |
| from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper, ThreadRunner |
| from GRLLibs.ThreadPacket.PlatformPackets import ( |
| PlatformDiagnosticPacket, |
| PlatformPackets, |
| ) |
| from GRLLibs.UtilityModules.Plugins.AES_CMAC import Thread_PBKDF2 |
| |
| LINESEPX = re.compile(r'\r\n|\n') |
| """regex: used to split lines""" |
| |
| |
| class OpenThread(IThci): |
| LOWEST_POSSIBLE_PARTATION_ID = 0x1 |
| LINK_QUALITY_CHANGE_TIME = 100 |
| |
| # Used for reference firmware version control for Test Harness. |
| # This variable will be updated to match the OpenThread reference firmware |
| # officially released. |
| firmwarePrefix = 'OPENTHREAD/' |
| |
| _update_router_status = False |
| |
| # def __init__(self, SerialPort=COMPortName, EUI=MAC_Address): |
| def __init__(self, **kwargs): |
| """initialize the serial port and default network parameters |
| Args: |
| **kwargs: Arbitrary keyword arguments |
| Includes 'EUI' and 'SerialPort' |
| """ |
| try: |
| self.UIStatusMsg = '' |
| self.mac = kwargs.get('EUI') |
| self.port = kwargs.get('SerialPort') |
| self.handle = None |
| self.AutoDUTEnable = False |
| self._is_net = False # whether device is through ser2net |
| self.logStatus = { |
| 'stop': 'stop', |
| 'running': 'running', |
| 'pauseReq': 'pauseReq', |
| 'paused': 'paused', |
| } |
| self.joinStatus = { |
| 'notstart': 'notstart', |
| 'ongoing': 'ongoing', |
| 'succeed': 'succeed', |
| 'failed': 'failed', |
| } |
| self.logThreadStatus = self.logStatus['stop'] |
| self.intialize() |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('initialize() Error: ' + str(e)) |
| |
| def __del__(self): |
| """close the serial port connection""" |
| try: |
| self.closeConnection() |
| self.deviceConnected = False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('delete() Error: ' + str(e)) |
| |
| def _expect(self, expected, times=50): |
| """Find the `expected` line within `times` trials. |
| |
| Args: |
| expected str: the expected string |
| times int: number of trials |
| """ |
| print('[%s] Expecting [%s]' % (self.port, expected)) |
| |
| retry_times = 10 |
| while times > 0 and retry_times > 0: |
| line = self._readline() |
| print('[%s] Got line [%s]' % (self.port, line)) |
| |
| if line == expected: |
| print('[%s] Expected [%s]' % (self.port, expected)) |
| return |
| |
| if not line: |
| retry_times -= 1 |
| time.sleep(0.1) |
| |
| times -= 1 |
| |
| raise Exception('failed to find expected string[%s]' % expected) |
| |
| def _read(self, size=512): |
| logging.info('%s: reading', self.port) |
| if self._is_net: |
| return self.handle.recv(size) |
| else: |
| return self.handle.read(size) |
| |
| def _write(self, data): |
| logging.info('%s: writing', self.port) |
| if self._is_net: |
| self.handle.sendall(data) |
| else: |
| self.handle.write(data) |
| |
| def _readline(self): |
| """Read exactly one line from the device |
| |
| Returns: |
| None on no data |
| """ |
| logging.info('%s: reading line', self.port) |
| if len(self._lines) > 1: |
| return self._lines.pop(0) |
| |
| tail = '' |
| if len(self._lines): |
| tail = self._lines.pop() |
| |
| try: |
| tail += self._read() |
| except socket.error: |
| logging.exception('%s: No new data', self.port) |
| time.sleep(0.1) |
| |
| self._lines += LINESEPX.split(tail) |
| if len(self._lines) > 1: |
| return self._lines.pop(0) |
| |
| def _sendline(self, line): |
| """Send exactly one line to the device |
| |
| Args: |
| line str: data send to device |
| """ |
| logging.info('%s: sending line', self.port) |
| # clear buffer |
| self._lines = [] |
| try: |
| self._read() |
| except socket.error: |
| logging.debug('%s: Nothing cleared', self.port) |
| |
| print('sending [%s]' % line) |
| self._write(line + '\r\n') |
| |
| # wait for write to complete |
| time.sleep(0.1) |
| |
| def __sendCommand(self, cmd): |
| """send specific command to reference unit over serial port |
| |
| Args: |
| cmd: OpenThread CLI string |
| |
| Returns: |
| Done: successfully send the command to reference unit and parse it |
| Value: successfully retrieve the desired value from reference unit |
| Error: some errors occur, indicates by the followed specific error number |
| """ |
| logging.info('%s: sendCommand[%s]', self.port, cmd) |
| if self.logThreadStatus == self.logStatus['running']: |
| self.logThreadStatus = self.logStatus['pauseReq'] |
| while ( |
| self.logThreadStatus != self.logStatus['paused'] |
| and self.logThreadStatus != self.logStatus['stop'] |
| ): |
| pass |
| |
| try: |
| # command retransmit times |
| retry_times = 3 |
| while retry_times > 0: |
| retry_times -= 1 |
| try: |
| self._sendline(cmd) |
| self._expect(cmd) |
| except Exception as e: |
| logging.exception( |
| '%s: failed to send command[%s]: %s', |
| self.port, |
| cmd, |
| str(e), |
| ) |
| if retry_times == 0: |
| raise |
| else: |
| break |
| |
| line = None |
| response = [] |
| retry_times = 10 |
| while retry_times > 0: |
| line = self._readline() |
| logging.info('%s: the read line is[%s]', self.port, line) |
| if line: |
| response.append(line) |
| if line == 'Done': |
| break |
| else: |
| retry_times -= 1 |
| time.sleep(0.2) |
| if line != 'Done': |
| raise Exception( |
| '%s: failed to find end of response' % self.port |
| ) |
| logging.info('%s: send command[%s] done!', self.port, cmd) |
| return response |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('sendCommand() Error: ' + str(e)) |
| raise |
| |
| def __disableRouterEligible(self): |
| """disable router role |
| """ |
| print('call __disableRouterEligible') |
| try: |
| cmd = 'routereligible disable' |
| self.__sendCommand(cmd) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| '__disableRouterEligible() Error: ' + str(e) |
| ) |
| |
| def __setDeviceMode(self, mode): |
| """set thread device mode: |
| |
| Args: |
| mode: thread device mode |
| r: rx-on-when-idle |
| s: secure IEEE 802.15.4 data request |
| d: full thread device |
| n: full network data |
| |
| Returns: |
| True: successful to set the device mode |
| False: fail to set the device mode |
| """ |
| print('call __setDeviceMode') |
| print(mode) |
| try: |
| cmd = 'mode %s' % mode |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setDeviceMode() Error: ' + str(e) |
| ) |
| |
| def __setRouterUpgradeThreshold(self, iThreshold): |
| """set router upgrade threshold |
| |
| Args: |
| iThreshold: the number of active routers on the Thread network |
| partition below which a REED may decide to become a Router. |
| |
| Returns: |
| True: successful to set the ROUTER_UPGRADE_THRESHOLD |
| False: fail to set ROUTER_UPGRADE_THRESHOLD |
| """ |
| print('call __setRouterUpgradeThreshold') |
| try: |
| cmd = 'routerupgradethreshold %s' % str(iThreshold) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setRouterUpgradeThreshold() Error: ' + str(e) |
| ) |
| |
| def __setRouterDowngradeThreshold(self, iThreshold): |
| """set router downgrade threshold |
| |
| Args: |
| iThreshold: the number of active routers on the Thread network |
| partition above which an active router may decide to |
| become a child. |
| |
| Returns: |
| True: successful to set the ROUTER_DOWNGRADE_THRESHOLD |
| False: fail to set ROUTER_DOWNGRADE_THRESHOLD |
| """ |
| print('call __setRouterDowngradeThreshold') |
| try: |
| cmd = 'routerdowngradethreshold %s' % str(iThreshold) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setRouterDowngradeThreshold() Error: ' + str(e) |
| ) |
| |
| def __setRouterSelectionJitter(self, iRouterJitter): |
| """set ROUTER_SELECTION_JITTER parameter for REED to upgrade to Router |
| |
| Args: |
| iRouterJitter: a random period prior to request Router ID for REED |
| |
| Returns: |
| True: successful to set the ROUTER_SELECTION_JITTER |
| False: fail to set ROUTER_SELECTION_JITTER |
| """ |
| print('call _setRouterSelectionJitter') |
| try: |
| cmd = 'routerselectionjitter %s' % str(iRouterJitter) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setRouterSelectionJitter() Error: ' + str(e) |
| ) |
| |
| def __setAddressfilterMode(self, mode): |
| """set address filter mode |
| |
| Returns: |
| True: successful to set address filter mode. |
| False: fail to set address filter mode. |
| """ |
| print('call setAddressFilterMode() ' + mode) |
| try: |
| cmd = 'macfilter addr ' + mode |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| return True |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| '__setAddressFilterMode() Error: ' + str(e) |
| ) |
| |
| def __startOpenThread(self): |
| """start OpenThread stack |
| |
| Returns: |
| True: successful to start OpenThread stack and thread interface up |
| False: fail to start OpenThread stack |
| """ |
| print('call startOpenThread') |
| try: |
| if self.hasActiveDatasetToCommit: |
| if self.__sendCommand('dataset commit active')[0] != 'Done': |
| raise Exception('failed to commit active dataset') |
| else: |
| self.hasActiveDatasetToCommit = False |
| |
| # restore whitelist/blacklist address filter mode if rejoin after |
| # reset |
| if self.isPowerDown: |
| if self._addressfilterMode == 'whitelist': |
| if self.__setAddressfilterMode('whitelist'): |
| for addr in self._addressfilterSet: |
| self.addAllowMAC(addr) |
| elif self._addressfilterMode == 'blacklist': |
| if self.__setAddressfilterMode('blacklist'): |
| for addr in self._addressfilterSet: |
| self.addBlockedMAC(addr) |
| |
| if self.deviceRole in [ |
| Thread_Device_Role.Leader, |
| Thread_Device_Role.Router, |
| Thread_Device_Role.REED, |
| ]: |
| self.__setRouterSelectionJitter(1) |
| |
| if self.__sendCommand('ifconfig up')[-1] == 'Done': |
| if self.__sendCommand('thread start')[-1] == 'Done': |
| self.isPowerDown = False |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'startOpenThread() Error: ' + str(e) |
| ) |
| |
| def __stopOpenThread(self): |
| """stop OpenThread stack |
| |
| Returns: |
| True: successful to stop OpenThread stack and thread interface down |
| False: fail to stop OpenThread stack |
| """ |
| print('call stopOpenThread') |
| try: |
| if self.__sendCommand('thread stop')[-1] == 'Done': |
| return self.__sendCommand('ifconfig down')[-1] == 'Done' |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'stopOpenThread() Error: ' + str(e) |
| ) |
| |
| def __isOpenThreadRunning(self): |
| """check whether or not OpenThread is running |
| |
| Returns: |
| True: OpenThread is running |
| False: OpenThread is not running |
| """ |
| print('call isOpenThreadRunning') |
| return self.__sendCommand('state')[0] != 'disabled' |
| |
| # rloc16 might be hex string or integer, need to return actual allocated |
| # router id |
| def __convertRlocToRouterId(self, xRloc16): |
| """mapping Rloc16 to router id |
| |
| Args: |
| xRloc16: hex rloc16 short address |
| |
| Returns: |
| actual router id allocated by leader |
| """ |
| routerList = [] |
| routerList = self.__sendCommand('router list')[0].split() |
| print(routerList) |
| print(xRloc16) |
| |
| for index in routerList: |
| router = [] |
| cmd = 'router %s' % index |
| router = self.__sendCommand(cmd) |
| |
| for line in router: |
| if 'Done' in line: |
| break |
| elif 'Router ID' in line: |
| routerid = line.split()[2] |
| elif 'Rloc' in line: |
| rloc16 = line.split()[1] |
| else: |
| pass |
| |
| # process input rloc16 |
| if isinstance(xRloc16, str): |
| rloc16 = '0x' + rloc16 |
| if rloc16 == xRloc16: |
| return routerid |
| elif isinstance(xRloc16, int): |
| if int(rloc16, 16) == xRloc16: |
| return routerid |
| else: |
| pass |
| |
| return None |
| |
| def __convertIp6PrefixStringToIp6Address(self, strIp6Prefix): |
| """convert IPv6 prefix string to IPv6 dotted-quad format |
| for example: |
| 2001000000000000 -> 2001:0000:0000:0000:: |
| |
| Args: |
| strIp6Prefix: IPv6 address string |
| |
| Returns: |
| IPv6 address dotted-quad format |
| """ |
| prefix1 = strIp6Prefix.rstrip('L') |
| prefix2 = self.__lstrip0x(prefix1) |
| hexPrefix = str(prefix2).ljust(16, '0') |
| hexIter = iter(hexPrefix) |
| finalMac = ':'.join( |
| a + b + c + d |
| for a, b, c, d in zip(hexIter, hexIter, hexIter, hexIter) |
| ) |
| prefix = str(finalMac) |
| strIp6Prefix = prefix[:19] |
| return strIp6Prefix + '::' |
| |
| def __convertLongToHex(self, iValue, fillZeros=None): |
| """convert a long hex integer to string |
| remove '0x' and 'L' return string |
| |
| Args: |
| iValue: long integer in hex format |
| fillZeros: pad string with zeros on the left to specified width |
| |
| Returns: |
| string of this long integer without '0x' and 'L' |
| """ |
| fmt = '%x' |
| if fillZeros is not None: |
| fmt = '%%0%dx' % fillZeros |
| |
| return fmt % iValue |
| |
| def __readCommissioningLogs(self, durationInSeconds): |
| """read logs during the commissioning process |
| |
| Args: |
| durationInSeconds: time duration for reading commissioning logs |
| |
| Returns: |
| Commissioning logs |
| """ |
| self.logThreadStatus = self.logStatus['running'] |
| logs = Queue() |
| t_end = time.time() + durationInSeconds |
| while time.time() < t_end: |
| time.sleep(0.3) |
| |
| if self.logThreadStatus == self.logStatus['pauseReq']: |
| self.logThreadStatus = self.logStatus['paused'] |
| |
| if self.logThreadStatus != self.logStatus['running']: |
| continue |
| |
| try: |
| line = self._readline() |
| if line: |
| print(line) |
| logs.put(line) |
| |
| if 'Join success' in line: |
| self.joinCommissionedStatus = self.joinStatus[ |
| 'succeed' |
| ] |
| break |
| elif 'Join failed' in line: |
| self.joinCommissionedStatus = self.joinStatus['failed'] |
| break |
| |
| except Exception: |
| pass |
| |
| self.logThreadStatus = self.logStatus['stop'] |
| return logs |
| |
| def __convertChannelMask(self, channelsArray): |
| """convert channelsArray to bitmask format |
| |
| Args: |
| channelsArray: channel array (i.e. [21, 22]) |
| |
| Returns: |
| bitmask format corresponding to a given channel array |
| """ |
| maskSet = 0 |
| |
| for eachChannel in channelsArray: |
| mask = 1 << eachChannel |
| maskSet = maskSet | mask |
| |
| return maskSet |
| |
| def __setChannelMask(self, channelMask): |
| print('call _setChannelMask') |
| try: |
| cmd = 'dataset channelmask %s' % channelMask |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setChannelMask() Error: ' + str(e) |
| ) |
| |
| def __setSecurityPolicy(self, securityPolicySecs, securityPolicyFlags): |
| print('call _setSecurityPolicy') |
| try: |
| cmd = 'dataset securitypolicy %s %s' % ( |
| str(securityPolicySecs), |
| securityPolicyFlags, |
| ) |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setSecurityPolicy() Error: ' + str(e) |
| ) |
| |
| def __setKeySwitchGuardTime(self, iKeySwitchGuardTime): |
| """ set the Key switch guard time |
| |
| Args: |
| iKeySwitchGuardTime: key switch guard time |
| |
| Returns: |
| True: successful to set key switch guard time |
| False: fail to set key switch guard time |
| """ |
| print('%s call setKeySwitchGuardTime' % self.port) |
| print(iKeySwitchGuardTime) |
| try: |
| cmd = 'keysequence guardtime %s' % str(iKeySwitchGuardTime) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| time.sleep(1) |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setKeySwitchGuardTime() Error; ' + str(e) |
| ) |
| |
| def __getCommissionerSessionId(self): |
| """ get the commissioner session id allocated from Leader """ |
| print('%s call getCommissionerSessionId' % self.port) |
| return self.__sendCommand('commissioner sessionid')[0] |
| |
| def _connect(self): |
| print('My port is %s' % self.port) |
| if self.port.startswith('COM'): |
| self.handle = serial.Serial(self.port, 115200, timeout=0) |
| time.sleep(1) |
| self.handle.write('\r\n') |
| time.sleep(0.1) |
| self._is_net = False |
| elif ':' in self.port: |
| host, port = self.port.split(':') |
| self.handle = socket.create_connection((host, port)) |
| self.handle.setblocking(0) |
| self._is_net = True |
| else: |
| raise Exception('Unknown port schema') |
| self.UIStatusMsg = self.getVersionNumber() |
| |
| def closeConnection(self): |
| """close current serial port connection""" |
| print('%s call closeConnection' % self.port) |
| try: |
| if self.handle: |
| self.handle.close() |
| self.handle = None |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'closeConnection() Error: ' + str(e) |
| ) |
| |
| def intialize(self): |
| """initialize the serial port with baudrate, timeout parameters""" |
| print('%s call intialize' % self.port) |
| try: |
| self.deviceConnected = False |
| |
| # init serial port |
| self._connect() |
| |
| if self.firmwarePrefix in self.UIStatusMsg: |
| self.deviceConnected = True |
| else: |
| self.UIStatusMsg = ( |
| 'Firmware Not Matching Expecting ' |
| + self.firmwarePrefix |
| + ' Now is ' |
| + self.UIStatusMsg |
| ) |
| ModuleHelper.WriteIntoDebugLogger( |
| 'Err: OpenThread device Firmware not matching..' |
| ) |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('intialize() Error: ' + str(e)) |
| self.deviceConnected = False |
| |
| def setNetworkName(self, networkName='GRL'): |
| """set Thread Network name |
| |
| Args: |
| networkName: the networkname string to be set |
| |
| Returns: |
| True: successful to set the Thread Networkname |
| False: fail to set the Thread Networkname |
| """ |
| print('%s call setNetworkName' % self.port) |
| print(networkName) |
| try: |
| cmd = 'networkname %s' % networkName |
| datasetCmd = 'dataset networkname %s' % networkName |
| self.hasActiveDatasetToCommit = True |
| return ( |
| self.__sendCommand(cmd)[-1] == 'Done' |
| and self.__sendCommand(datasetCmd)[-1] == 'Done' |
| ) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setNetworkName() Error: ' + str(e) |
| ) |
| |
| def setChannel(self, channel=11): |
| """set channel of Thread device operates on. |
| |
| Args: |
| channel: |
| (0 - 10: Reserved) |
| (11 - 26: 2.4GHz channels) |
| (27 - 65535: Reserved) |
| |
| Returns: |
| True: successful to set the channel |
| False: fail to set the channel |
| """ |
| print('%s call setChannel' % self.port) |
| print(channel) |
| try: |
| cmd = 'channel %s' % channel |
| datasetCmd = 'dataset channel %s' % channel |
| self.hasActiveDatasetToCommit = True |
| return ( |
| self.__sendCommand(cmd)[-1] == 'Done' |
| and self.__sendCommand(datasetCmd)[-1] == 'Done' |
| ) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setChannel() Error: ' + str(e)) |
| |
| def getChannel(self): |
| """get current channel""" |
| print('%s call getChannel' % self.port) |
| return self.__sendCommand('channel')[0] |
| |
| def setMAC(self, xEUI): |
| """set the extended addresss of Thread device |
| |
| Args: |
| xEUI: extended address in hex format |
| |
| Returns: |
| True: successful to set the extended address |
| False: fail to set the extended address |
| """ |
| print('%s call setMAC' % self.port) |
| print(xEUI) |
| address64 = '' |
| try: |
| if not xEUI: |
| address64 = self.mac |
| |
| if not isinstance(xEUI, str): |
| address64 = self.__convertLongToHex(xEUI, 16) |
| else: |
| address64 = xEUI |
| |
| cmd = 'extaddr %s' % address64 |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| self.mac = address64 |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setMAC() Error: ' + str(e)) |
| |
| def getMAC(self, bType=MacType.RandomMac): |
| """get one specific type of MAC address |
| currently OpenThread only supports Random MAC address |
| |
| Args: |
| bType: indicate which kind of MAC address is required |
| |
| Returns: |
| specific type of MAC address |
| """ |
| print('%s call getMAC' % self.port) |
| print(bType) |
| # if power down happens, return extended address assigned previously |
| if self.isPowerDown: |
| macAddr64 = self.mac |
| else: |
| if bType == MacType.FactoryMac: |
| macAddr64 = self.__sendCommand('eui64')[0] |
| elif bType == MacType.HashMac: |
| macAddr64 = self.__sendCommand('joiner id')[0] |
| else: |
| macAddr64 = self.__sendCommand('extaddr')[0] |
| print(macAddr64) |
| |
| return int(macAddr64, 16) |
| |
| def getLL64(self): |
| """get link local unicast IPv6 address""" |
| print('%s call getLL64' % self.port) |
| return self.__sendCommand('ipaddr linklocal')[0] |
| |
| def getRloc16(self): |
| """get rloc16 short address""" |
| print('%s call getRloc16' % self.port) |
| rloc16 = self.__sendCommand('rloc16')[0] |
| return int(rloc16, 16) |
| |
| def getRloc(self): |
| """get router locator unicast Ipv6 address""" |
| print('%s call getRloc' % self.port) |
| return self.__sendCommand('ipaddr rloc')[0] |
| |
| def __getGlobal(self): |
| """get global unicast IPv6 address set |
| if configuring multiple entries |
| """ |
| print('%s call getGlobal' % self.port) |
| globalAddrs = [] |
| rlocAddr = self.getRloc() |
| |
| addrs = self.__sendCommand('ipaddr') |
| |
| # take rloc address as a reference for current mesh local prefix, |
| # because for some TCs, mesh local prefix may be updated through |
| # pending dataset management. |
| for ip6Addr in addrs: |
| if ip6Addr == 'Done': |
| break |
| |
| fullIp = ModuleHelper.GetFullIpv6Address(ip6Addr).lower() |
| |
| print('address %s' % fullIp) |
| |
| if fullIp.startswith('fe80'): |
| print('link local') |
| continue |
| |
| if fullIp.startswith(rlocAddr[0:19]): |
| print('mesh local') |
| continue |
| |
| globalAddrs.append(fullIp) |
| print('global') |
| |
| return globalAddrs |
| |
| def setNetworkKey(self, key): |
| """set Thread Network master key |
| |
| Args: |
| key: Thread Network master key used in secure the MLE/802.15.4 packet |
| |
| Returns: |
| True: successful to set the Thread Network master key |
| False: fail to set the Thread Network master key |
| """ |
| masterKey = '' |
| print('%s call setNetworkKey' % self.port) |
| print(key) |
| try: |
| if not isinstance(key, str): |
| masterKey = self.__convertLongToHex(key, 32) |
| cmd = 'masterkey %s' % masterKey |
| datasetCmd = 'dataset masterkey %s' % masterKey |
| else: |
| masterKey = key |
| cmd = 'masterkey %s' % masterKey |
| datasetCmd = 'dataset masterkey %s' % masterKey |
| |
| self.networkKey = masterKey |
| self.hasActiveDatasetToCommit = True |
| return ( |
| self.__sendCommand(cmd)[-1] == 'Done' |
| and self.__sendCommand(datasetCmd)[-1] == 'Done' |
| ) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setNetworkkey() Error: ' + str(e) |
| ) |
| |
| def addBlockedMAC(self, xEUI): |
| """add a given extended address to the blacklist entry |
| |
| Args: |
| xEUI: extended address in hex format |
| |
| Returns: |
| True: successful to add a given extended address to the blacklist entry |
| False: fail to add a given extended address to the blacklist entry |
| """ |
| print('%s call addBlockedMAC' % self.port) |
| print(xEUI) |
| if isinstance(xEUI, str): |
| macAddr = xEUI |
| else: |
| macAddr = self.__convertLongToHex(xEUI) |
| |
| try: |
| # if blocked device is itself |
| if macAddr == self.mac: |
| print('block device itself') |
| return True |
| |
| if self._addressfilterMode != 'blacklist': |
| if self.__setAddressfilterMode('blacklist'): |
| self._addressfilterMode = 'blacklist' |
| |
| cmd = 'macfilter addr add %s' % macAddr |
| print(cmd) |
| ret = self.__sendCommand(cmd)[-1] == 'Done' |
| |
| self._addressfilterSet.add(macAddr) |
| print('current blacklist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| |
| return ret |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'addBlockedMAC() Error: ' + str(e) |
| ) |
| |
| def addAllowMAC(self, xEUI): |
| """add a given extended address to the whitelist addressfilter |
| |
| Args: |
| xEUI: a given extended address in hex format |
| |
| Returns: |
| True: successful to add a given extended address to the whitelist entry |
| False: fail to add a given extended address to the whitelist entry |
| """ |
| print('%s call addAllowMAC' % self.port) |
| print(xEUI) |
| if isinstance(xEUI, str): |
| macAddr = xEUI |
| else: |
| macAddr = self.__convertLongToHex(xEUI) |
| |
| try: |
| if self._addressfilterMode != 'whitelist': |
| if self.__setAddressfilterMode('whitelist'): |
| self._addressfilterMode = 'whitelist' |
| |
| cmd = 'macfilter addr add %s' % macAddr |
| print(cmd) |
| ret = self.__sendCommand(cmd)[-1] == 'Done' |
| |
| self._addressfilterSet.add(macAddr) |
| print('current whitelist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| return ret |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('addAllowMAC() Error: ' + str(e)) |
| |
| def clearBlockList(self): |
| """clear all entries in blacklist table |
| |
| Returns: |
| True: successful to clear the blacklist |
| False: fail to clear the blacklist |
| """ |
| print('%s call clearBlockList' % self.port) |
| |
| # remove all entries in blacklist |
| try: |
| print('clearing blacklist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| |
| # disable blacklist |
| if self.__setAddressfilterMode('disable'): |
| self._addressfilterMode = 'disable' |
| # clear ops |
| cmd = 'macfilter addr clear' |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| self._addressfilterSet.clear() |
| return True |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'clearBlockList() Error: ' + str(e) |
| ) |
| |
| def clearAllowList(self): |
| """clear all entries in whitelist table |
| |
| Returns: |
| True: successful to clear the whitelist |
| False: fail to clear the whitelist |
| """ |
| print('%s call clearAllowList' % self.port) |
| |
| # remove all entries in whitelist |
| try: |
| print('clearing whitelist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| |
| # disable whitelist |
| if self.__setAddressfilterMode('disable'): |
| self._addressfilterMode = 'disable' |
| # clear ops |
| cmd = 'macfilter addr clear' |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| self._addressfilterSet.clear() |
| return True |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'clearAllowList() Error: ' + str(e) |
| ) |
| |
| def getDeviceRole(self): |
| """get current device role in Thread Network""" |
| print('%s call getDeviceRole' % self.port) |
| return self.__sendCommand('state')[0] |
| |
| def joinNetwork(self, eRoleId): |
| """make device ready to join the Thread Network with a given role |
| |
| Args: |
| eRoleId: a given device role id |
| |
| Returns: |
| True: ready to set Thread Network parameter for joining desired Network |
| """ |
| print('%s call joinNetwork' % self.port) |
| print(eRoleId) |
| |
| self.deviceRole = eRoleId |
| mode = '' |
| try: |
| if ModuleHelper.LeaderDutChannelFound: |
| self.channel = ModuleHelper.Default_Channel |
| |
| # FIXME: when Harness call setNetworkDataRequirement()? |
| # only sleep end device requires stable networkdata now |
| if eRoleId == Thread_Device_Role.Leader: |
| print('join as leader') |
| mode = 'rsdn' |
| if self.AutoDUTEnable is False: |
| # set ROUTER_DOWNGRADE_THRESHOLD |
| self.__setRouterDowngradeThreshold(33) |
| elif eRoleId == Thread_Device_Role.Router: |
| print('join as router') |
| mode = 'rsdn' |
| if self.AutoDUTEnable is False: |
| # set ROUTER_DOWNGRADE_THRESHOLD |
| self.__setRouterDowngradeThreshold(33) |
| elif eRoleId == Thread_Device_Role.SED: |
| print('join as sleepy end device') |
| mode = 's' |
| self.__setPollPeriod(self.__sedPollPeriod) |
| elif eRoleId == Thread_Device_Role.EndDevice: |
| print('join as end device') |
| mode = 'rsn' |
| elif eRoleId == Thread_Device_Role.REED: |
| print('join as REED') |
| mode = 'rsdn' |
| # set ROUTER_UPGRADE_THRESHOLD |
| self.__setRouterUpgradeThreshold(0) |
| elif eRoleId == Thread_Device_Role.EndDevice_FED: |
| print('join as FED') |
| mode = 'rsdn' |
| # always remain an ED, never request to be a router |
| self.__disableRouterEligible() |
| elif eRoleId == Thread_Device_Role.EndDevice_MED: |
| print('join as MED') |
| mode = 'rsn' |
| else: |
| pass |
| |
| # set Thread device mode with a given role |
| self.__setDeviceMode(mode) |
| |
| # start OpenThread |
| self.__startOpenThread() |
| time.sleep(3) |
| |
| if self._update_router_status and eRoleId == Thread_Device_Role.Router: |
| self.__updateRouterStatus() |
| |
| time.sleep(5) # increase delay temporally (+5s) to remedy TH's delay updates |
| |
| return True |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('joinNetwork() Error: ' + str(e)) |
| |
| def getNetworkFragmentID(self): |
| """get current partition id of Thread Network Partition from LeaderData |
| |
| Returns: |
| The Thread network Partition Id |
| """ |
| print('%s call getNetworkFragmentID' % self.port) |
| if not self.__isOpenThreadRunning(): |
| print('OpenThread is not running') |
| return None |
| |
| leaderData = [] |
| leaderData = self.__sendCommand('leaderdata') |
| return int(leaderData[0].split()[2], 16) |
| |
| def getParentAddress(self): |
| """get Thread device's parent extended address and rloc16 short address |
| |
| Returns: |
| The extended address of parent in hex format |
| """ |
| print('%s call getParentAddress' % self.port) |
| parentInfo = [] |
| parentInfo = self.__sendCommand('parent') |
| |
| for line in parentInfo: |
| if 'Done' in line: |
| break |
| elif 'Ext Addr' in line: |
| eui = line.split()[2] |
| print(eui) |
| # elif 'Rloc' in line: |
| # rloc16 = line.split()[1] |
| # print(rloc16) |
| else: |
| pass |
| |
| return int(eui, 16) |
| |
| def powerDown(self): |
| """power down the Thread device""" |
| print('%s call powerDown' % self.port) |
| self._sendline('reset') |
| self.isPowerDown = True |
| |
| def powerUp(self): |
| """power up the Thread device""" |
| print('%s call powerUp' % self.port) |
| if not self.handle: |
| self._connect() |
| |
| self.isPowerDown = False |
| |
| if not self.__isOpenThreadRunning(): |
| self.__startOpenThread() |
| |
| def reboot(self): |
| """reset and rejoin to Thread Network without any timeout |
| |
| Returns: |
| True: successful to reset and rejoin the Thread Network |
| False: fail to reset and rejoin the Thread Network |
| """ |
| print('%s call reboot' % self.port) |
| try: |
| self._sendline('reset') |
| self.isPowerDown = True |
| time.sleep(3) |
| |
| self.__startOpenThread() |
| time.sleep(3) |
| |
| if self.__sendCommand('state')[0] == 'disabled': |
| print('[FAIL] reboot') |
| return False |
| else: |
| return True |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('reboot() Error: ' + str(e)) |
| |
| def ping(self, destination, length=20): |
| """ send ICMPv6 echo request with a given length to a unicast destination |
| address |
| |
| Args: |
| destination: the unicast destination address of ICMPv6 echo request |
| length: the size of ICMPv6 echo request payload |
| """ |
| print('%s call ping' % self.port) |
| print('destination: %s' % destination) |
| try: |
| cmd = 'ping %s %s' % (destination, str(length)) |
| print(cmd) |
| self._sendline(cmd) |
| self._expect(cmd) |
| # wait echo reply |
| time.sleep(6) # increase delay temporally (+5s) to remedy TH's delay updates |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('ping() Error: ' + str(e)) |
| |
| def multicast_Ping(self, destination, length=20): |
| """send ICMPv6 echo request with a given length to a multicast destination |
| address |
| |
| Args: |
| destination: the multicast destination address of ICMPv6 echo request |
| length: the size of ICMPv6 echo request payload |
| """ |
| print('%s call multicast_Ping' % self.port) |
| print('destination: %s' % destination) |
| try: |
| cmd = 'ping %s %s' % (destination, str(length)) |
| print(cmd) |
| self._sendline(cmd) |
| self._expect(cmd) |
| # wait echo reply |
| time.sleep(1) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'multicast_ping() Error: ' + str(e) |
| ) |
| |
| def getVersionNumber(self): |
| """get OpenThread stack firmware version number""" |
| print('%s call getVersionNumber' % self.port) |
| return self.__sendCommand('version')[0] |
| |
| def setPANID(self, xPAN): |
| """set Thread Network PAN ID |
| |
| Args: |
| xPAN: a given PAN ID in hex format |
| |
| Returns: |
| True: successful to set the Thread Network PAN ID |
| False: fail to set the Thread Network PAN ID |
| """ |
| print('%s call setPANID' % self.port) |
| print(xPAN) |
| panid = '' |
| try: |
| if not isinstance(xPAN, str): |
| panid = str(hex(xPAN)) |
| print(panid) |
| |
| cmd = 'panid %s' % panid |
| datasetCmd = 'dataset panid %s' % panid |
| self.hasActiveDatasetToCommit = True |
| return ( |
| self.__sendCommand(cmd)[-1] == 'Done' |
| and self.__sendCommand(datasetCmd)[-1] == 'Done' |
| ) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setPANID() Error: ' + str(e)) |
| |
| def reset(self): |
| """factory reset""" |
| print('%s call reset' % self.port) |
| try: |
| self._sendline('factoryreset') |
| self._read() |
| time.sleep(0.5) |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('reset() Error: ' + str(e)) |
| |
| def removeRouter(self, xRouterId): |
| """kickoff router with a given router id from the Thread Network |
| |
| Args: |
| xRouterId: a given router id in hex format |
| |
| Returns: |
| True: successful to remove the router from the Thread Network |
| False: fail to remove the router from the Thread Network |
| """ |
| print('%s call removeRouter' % self.port) |
| print(xRouterId) |
| routerId = '' |
| routerId = self.__convertRlocToRouterId(xRouterId) |
| print(routerId) |
| |
| if routerId is None: |
| print('no matched xRouterId') |
| return False |
| |
| try: |
| cmd = 'releaserouterid %s' % routerId |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'removeRouter() Error: ' + str(e) |
| ) |
| |
| def setDefaultValues(self): |
| """set default mandatory Thread Network parameter value""" |
| print('%s call setDefaultValues' % self.port) |
| |
| # initialize variables |
| self.networkName = ModuleHelper.Default_NwkName |
| self.networkKey = ModuleHelper.Default_NwkKey |
| self.channel = ModuleHelper.Default_Channel |
| self.channelMask = '0x7fff800' # (0xffff << 11) |
| self.panId = ModuleHelper.Default_PanId |
| self.xpanId = ModuleHelper.Default_XpanId |
| self.meshLocalPrefix = ModuleHelper.Default_MLPrefix |
| # OT only accept hex format PSKc for now |
| self.pskc = '00000000000000000000000000000000' |
| self.securityPolicySecs = ModuleHelper.Default_SecurityPolicy |
| self.securityPolicyFlags = 'onrcb' |
| self.activetimestamp = ModuleHelper.Default_ActiveTimestamp |
| # self.sedPollingRate = ModuleHelper.Default_Harness_SED_Polling_Rate |
| self.__sedPollPeriod = 3 * 1000 # in milliseconds |
| self.deviceRole = None |
| self.provisioningUrl = '' |
| self.hasActiveDatasetToCommit = False |
| self.logThread = Queue() |
| self.logThreadStatus = self.logStatus['stop'] |
| self.joinCommissionedStatus = self.joinStatus['notstart'] |
| # indicate Thread device requests full or stable network data |
| self.networkDataRequirement = '' |
| # indicate if Thread device experiences a power down event |
| self.isPowerDown = False |
| # indicate AddressFilter mode ['disable', 'whitelist', 'blacklist'] |
| self._addressfilterMode = 'disable' |
| self._addressfilterSet = set() # cache filter entries |
| # indicate if Thread device is an active commissioner |
| self.isActiveCommissioner = False |
| self._lines = None # buffered lines read from device |
| |
| # initialize device configuration |
| try: |
| self.setMAC(self.mac) |
| self.__setChannelMask(self.channelMask) |
| self.__setSecurityPolicy( |
| self.securityPolicySecs, self.securityPolicyFlags |
| ) |
| self.setChannel(self.channel) |
| self.setPANID(self.panId) |
| self.setXpanId(self.xpanId) |
| self.setNetworkName(self.networkName) |
| self.setNetworkKey(self.networkKey) |
| self.setMLPrefix(self.meshLocalPrefix) |
| self.setPSKc(self.pskc) |
| self.setActiveTimestamp(self.activetimestamp) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setDefaultValue() Error: ' + str(e) |
| ) |
| |
| def getDeviceConncetionStatus(self): |
| """check if serial port connection is ready or not""" |
| print('%s call getDeviceConnectionStatus' % self.port) |
| return self.deviceConnected |
| |
| def setPollingRate(self, iPollingRate): |
| """set data polling rate for sleepy end device |
| |
| Args: |
| iPollingRate: data poll period of sleepy end device (in seconds) |
| |
| Returns: |
| True: successful to set the data polling rate for sleepy end device |
| False: fail to set the data polling rate for sleepy end device |
| """ |
| print('%s call setPollingRate' % self.port) |
| |
| iPollingRate = int(iPollingRate * 1000) |
| print(iPollingRate) |
| |
| if self.__sedPollPeriod != iPollingRate: |
| self.__sedPollPeriod = iPollingRate |
| |
| # apply immediately |
| if self.__isOpenThreadRunning(): |
| return self.__setPollPeriod(self.__sedPollPeriod) |
| |
| return True |
| |
| def __setPollPeriod(self, iPollPeriod): |
| """set data poll period for sleepy end device |
| |
| Args: |
| iPollPeriod: data poll period of sleepy end device (in milliseconds) |
| |
| Returns: |
| True: successful to set the data poll period for sleepy end device |
| False: fail to set the data poll period for sleepy end device |
| """ |
| try: |
| cmd = 'pollperiod %d' % iPollPeriod |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| '__setPollPeriod() Error: ' + str(e) |
| ) |
| |
| def setLinkQuality(self, EUIadr, LinkQuality): |
| """set custom LinkQualityIn for all receiving messages from the specified EUIadr |
| |
| Args: |
| EUIadr: a given extended address |
| LinkQuality: a given custom link quality |
| link quality/link margin mapping table |
| 3: 21 - 255 (dB) |
| 2: 11 - 20 (dB) |
| 1: 3 - 9 (dB) |
| 0: 0 - 2 (dB) |
| |
| Returns: |
| True: successful to set the link quality |
| False: fail to set the link quality |
| """ |
| print('%s call setLinkQuality' % self.port) |
| print(EUIadr) |
| print(LinkQuality) |
| try: |
| # process EUIadr |
| euiHex = hex(EUIadr) |
| euiStr = str(euiHex) |
| euiStr = euiStr.rstrip('L') |
| address64 = '' |
| if '0x' in euiStr: |
| address64 = self.__lstrip0x(euiStr) |
| # prepend 0 at the beginning |
| if len(address64) < 16: |
| address64 = address64.zfill(16) |
| print(address64) |
| |
| cmd = 'macfilter rss add-lqi %s %s' % (address64, str(LinkQuality)) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setLinkQuality() Error: ' + str(e) |
| ) |
| |
| def setOutBoundLinkQuality(self, LinkQuality): |
| """set custom LinkQualityIn for all receiving messages from the any address |
| |
| Args: |
| LinkQuality: a given custom link quality |
| link quality/link margin mapping table |
| 3: 21 - 255 (dB) |
| 2: 11 - 20 (dB) |
| 1: 3 - 9 (dB) |
| 0: 0 - 2 (dB) |
| |
| Returns: |
| True: successful to set the link quality |
| False: fail to set the link quality |
| """ |
| print('%s call setOutBoundLinkQuality' % self.port) |
| print(LinkQuality) |
| try: |
| cmd = 'macfilter rss add-lqi * %s' % str(LinkQuality) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setOutBoundLinkQuality() Error: ' + str(e) |
| ) |
| |
| def removeRouterPrefix(self, prefixEntry): |
| """remove the configured prefix on a border router |
| |
| Args: |
| prefixEntry: a on-mesh prefix entry |
| |
| Returns: |
| True: successful to remove the prefix entry from border router |
| False: fail to remove the prefix entry from border router |
| """ |
| print('%s call removeRouterPrefix' % self.port) |
| print(prefixEntry) |
| prefix = self.__convertIp6PrefixStringToIp6Address(str(prefixEntry)) |
| try: |
| prefixLen = 64 |
| cmd = 'prefix remove %s/%d' % (prefix, prefixLen) |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| # send server data ntf to leader |
| return self.__sendCommand('netdataregister')[-1] == 'Done' |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'removeRouterPrefix() Error: ' + str(e) |
| ) |
| |
| def resetAndRejoin(self, timeout): |
| """reset and join back Thread Network with a given timeout delay |
| |
| Args: |
| timeout: a timeout interval before rejoin Thread Network |
| |
| Returns: |
| True: successful to reset and rejoin Thread Network |
| False: fail to reset and rejoin the Thread Network |
| """ |
| print('%s call resetAndRejoin' % self.port) |
| print(timeout) |
| try: |
| self._sendline('reset') |
| self.isPowerDown = True |
| time.sleep(timeout) |
| |
| if self.deviceRole == Thread_Device_Role.SED: |
| self.__setPollPeriod(self.__sedPollPeriod) |
| |
| self.__startOpenThread() |
| time.sleep(3) |
| |
| if self.__sendCommand('state')[0] == 'disabled': |
| print('[FAIL] reset and rejoin') |
| return False |
| return True |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'resetAndRejoin() Error: ' + str(e) |
| ) |
| |
| def configBorderRouter( |
| self, |
| P_Prefix, |
| P_stable=1, |
| P_default=1, |
| P_slaac_preferred=0, |
| P_Dhcp=0, |
| P_preference=0, |
| P_on_mesh=1, |
| P_nd_dns=0, |
| ): |
| """configure the border router with a given prefix entry parameters |
| |
| Args: |
| P_Prefix: IPv6 prefix that is available on the Thread Network |
| P_stable: true if the default router is expected to be stable network data |
| P_default: true if border router offers the default route for P_Prefix |
| P_slaac_preferred: true if allowing auto-configure address using P_Prefix |
| P_Dhcp: is true if border router is a DHCPv6 Agent |
| P_preference: is two-bit signed integer indicating router preference |
| P_on_mesh: is true if P_Prefix is considered to be on-mesh |
| P_nd_dns: is true if border router is able to supply DNS information obtained via ND |
| |
| Returns: |
| True: successful to configure the border router with a given prefix entry |
| False: fail to configure the border router with a given prefix entry |
| """ |
| print('%s call configBorderRouter' % self.port) |
| prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix)) |
| print(prefix) |
| try: |
| parameter = '' |
| prf = '' |
| |
| if P_slaac_preferred == 1: |
| parameter += 'p' |
| parameter += 'a' |
| |
| if P_stable == 1: |
| parameter += 's' |
| |
| if P_default == 1: |
| parameter += 'r' |
| |
| if P_Dhcp == 1: |
| parameter += 'd' |
| |
| if P_on_mesh == 1: |
| parameter += 'o' |
| |
| if P_preference == 1: |
| prf = 'high' |
| elif P_preference == 0: |
| prf = 'med' |
| elif P_preference == -1: |
| prf = 'low' |
| else: |
| pass |
| |
| cmd = 'prefix add %s/64 %s %s' % (prefix, parameter, prf) |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| # if prefix configured before starting OpenThread stack |
| # do not send out server data ntf pro-actively |
| if not self.__isOpenThreadRunning(): |
| return True |
| else: |
| # send server data ntf to leader |
| return self.__sendCommand('netdataregister')[-1] == 'Done' |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'configBorderRouter() Error: ' + str(e) |
| ) |
| |
| def setNetworkIDTimeout(self, iNwkIDTimeOut): |
| """set networkid timeout for Thread device |
| |
| Args: |
| iNwkIDTimeOut: a given NETWORK_ID_TIMEOUT |
| |
| Returns: |
| True: successful to set NETWORK_ID_TIMEOUT |
| False: fail to set NETWORK_ID_TIMEOUT |
| """ |
| print('%s call setNetworkIDTimeout' % self.port) |
| print(iNwkIDTimeOut) |
| iNwkIDTimeOut /= 1000 |
| try: |
| cmd = 'networkidtimeout %s' % str(iNwkIDTimeOut) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setNetworkIDTimeout() Error: ' + str(e) |
| ) |
| |
| def setKeepAliveTimeOut(self, iTimeOut): |
| """set keep alive timeout for device |
| has been deprecated and also set SED polling rate |
| |
| Args: |
| iTimeOut: data poll period for sleepy end device |
| |
| Returns: |
| True: successful to set the data poll period for SED |
| False: fail to set the data poll period for SED |
| """ |
| print('%s call setKeepAliveTimeOut' % self.port) |
| iTimeOut *= 1000 |
| print(int(iTimeOut)) |
| try: |
| cmd = 'pollperiod %d' % int(iTimeOut) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setKeepAliveTimeOut() Error: ' + str(e) |
| ) |
| |
| def setKeySequenceCounter(self, iKeySequenceValue): |
| """ set the Key sequence counter corresponding to Thread Network master key |
| |
| Args: |
| iKeySequenceValue: key sequence value |
| |
| Returns: |
| True: successful to set the key sequence |
| False: fail to set the key sequence |
| """ |
| print('%s call setKeySequenceCounter' % self.port) |
| print(iKeySequenceValue) |
| try: |
| # avoid key switch guard timer protection for reference device |
| self.__setKeySwitchGuardTime(0) |
| |
| cmd = 'keysequence counter %s' % str(iKeySequenceValue) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| time.sleep(1) |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setKeySequenceCounter() Error; ' + str(e) |
| ) |
| |
| def getKeySequenceCounter(self): |
| """get current Thread Network key sequence""" |
| print('%s call getKeySequenceCounter' % self.port) |
| keySequence = '' |
| keySequence = self.__sendCommand('keysequence counter')[0] |
| return keySequence |
| |
| def incrementKeySequenceCounter(self, iIncrementValue=1): |
| """increment the key sequence with a given value |
| |
| Args: |
| iIncrementValue: specific increment value to be added |
| |
| Returns: |
| True: successful to increment the key sequence with a given value |
| False: fail to increment the key sequence with a given value |
| """ |
| print('%s call incrementKeySequenceCounter' % self.port) |
| print(iIncrementValue) |
| currentKeySeq = '' |
| try: |
| # avoid key switch guard timer protection for reference device |
| self.__setKeySwitchGuardTime(0) |
| currentKeySeq = self.getKeySequenceCounter() |
| keySequence = int(currentKeySeq, 10) + iIncrementValue |
| print(keySequence) |
| return self.setKeySequenceCounter(keySequence) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'incrementKeySequenceCounter() Error: ' + str(e) |
| ) |
| |
| def setNetworkDataRequirement(self, eDataRequirement): |
| """set whether the Thread device requires the full network data |
| or only requires the stable network data |
| |
| Args: |
| eDataRequirement: is true if requiring the full network data |
| |
| Returns: |
| True: successful to set the network requirement |
| """ |
| print('%s call setNetworkDataRequirement' % self.port) |
| print(eDataRequirement) |
| |
| if eDataRequirement == Device_Data_Requirement.ALL_DATA: |
| self.networkDataRequirement = 'n' |
| return True |
| |
| def configExternalRouter(self, P_Prefix, P_stable, R_Preference=0): |
| """configure border router with a given external route prefix entry |
| |
| Args: |
| P_Prefix: IPv6 prefix for the route |
| P_Stable: is true if the external route prefix is stable network data |
| R_Preference: a two-bit signed integer indicating Router preference |
| 1: high |
| 0: medium |
| -1: low |
| |
| Returns: |
| True: successful to configure the border router with a given external route prefix |
| False: fail to configure the border router with a given external route prefix |
| """ |
| print('%s call configExternalRouter' % self.port) |
| print(P_Prefix) |
| stable = '' |
| prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix)) |
| try: |
| if R_Preference == 1: |
| prf = 'high' |
| elif R_Preference == 0: |
| prf = 'med' |
| elif R_Preference == -1: |
| prf = 'low' |
| else: |
| pass |
| |
| if P_stable: |
| stable += 's' |
| cmd = 'route add %s/64 %s %s' % (prefix, stable, prf) |
| else: |
| cmd = 'route add %s/64 %s' % (prefix, prf) |
| print(cmd) |
| |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| # send server data ntf to leader |
| return self.__sendCommand('netdataregister')[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'configExternalRouter() Error: ' + str(e) |
| ) |
| |
| def getNeighbouringRouters(self): |
| """get neighboring routers information |
| |
| Returns: |
| neighboring routers' extended address |
| """ |
| print('%s call getNeighbouringRouters' % self.port) |
| try: |
| routerInfo = [] |
| routerList = [] |
| routerList = self.__sendCommand('router list')[0].split() |
| print(routerList) |
| |
| if 'Done' in routerList: |
| print('no neighbouring routers') |
| return None |
| |
| for index in routerList: |
| router = [] |
| cmd = 'router %s' % index |
| router = self.__sendCommand(cmd) |
| |
| for line in router: |
| if 'Done' in line: |
| break |
| # elif 'Rloc' in line: |
| # rloc16 = line.split()[1] |
| elif 'Ext Addr' in line: |
| eui = line.split()[2] |
| routerInfo.append(int(eui, 16)) |
| # elif 'LQI In' in line: |
| # lqi_in = line.split()[1] |
| # elif 'LQI Out' in line: |
| # lqi_out = line.split()[1] |
| else: |
| pass |
| |
| print(routerInfo) |
| return routerInfo |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'getNeighbouringDevice() Error: ' + str(e) |
| ) |
| |
| def getChildrenInfo(self): |
| """get all children information |
| |
| Returns: |
| children's extended address |
| """ |
| print('%s call getChildrenInfo' % self.port) |
| try: |
| childrenInfoAll = [] |
| childrenInfo = {'EUI': 0, 'Rloc16': 0, 'MLEID': ''} |
| childrenList = self.__sendCommand('child list')[0].split() |
| print(childrenList) |
| |
| if 'Done' in childrenList: |
| print('no children') |
| return None |
| |
| for index in childrenList: |
| cmd = 'child %s' % index |
| child = [] |
| child = self.__sendCommand(cmd) |
| |
| for line in child: |
| if 'Done' in line: |
| break |
| elif 'Rloc' in line: |
| rloc16 = line.split()[1] |
| elif 'Ext Addr' in line: |
| eui = line.split()[2] |
| # elif 'Child ID' in line: |
| # child_id = line.split()[2] |
| # elif 'Mode' in line: |
| # mode = line.split()[1] |
| else: |
| pass |
| |
| childrenInfo['EUI'] = int(eui, 16) |
| childrenInfo['Rloc16'] = int(rloc16, 16) |
| # children_info['MLEID'] = self.getMLEID() |
| |
| childrenInfoAll.append(childrenInfo['EUI']) |
| # childrenInfoAll.append(childrenInfo) |
| |
| print(childrenInfoAll) |
| return childrenInfoAll |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'getChildrenInfo() Error: ' + str(e) |
| ) |
| |
| def setXpanId(self, xPanId): |
| """set extended PAN ID of Thread Network |
| |
| Args: |
| xPanId: extended PAN ID in hex format |
| |
| Returns: |
| True: successful to set the extended PAN ID |
| False: fail to set the extended PAN ID |
| """ |
| xpanid = '' |
| print('%s call setXpanId' % self.port) |
| print(xPanId) |
| try: |
| if not isinstance(xPanId, str): |
| xpanid = self.__convertLongToHex(xPanId, 16) |
| cmd = 'extpanid %s' % xpanid |
| datasetCmd = 'dataset extpanid %s' % xpanid |
| else: |
| xpanid = xPanId |
| cmd = 'extpanid %s' % xpanid |
| datasetCmd = 'dataset extpanid %s' % xpanid |
| |
| self.xpanId = xpanid |
| self.hasActiveDatasetToCommit = True |
| return ( |
| self.__sendCommand(cmd)[-1] == 'Done' |
| and self.__sendCommand(datasetCmd)[-1] == 'Done' |
| ) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setXpanId() Error: ' + str(e)) |
| |
| def getNeighbouringDevices(self): |
| """gets the neighboring devices' extended address to compute the DUT |
| extended address automatically |
| |
| Returns: |
| A list including extended address of neighboring routers, parent |
| as well as children |
| """ |
| print('%s call getNeighbouringDevices' % self.port) |
| neighbourList = [] |
| |
| # get parent info |
| parentAddr = self.getParentAddress() |
| if parentAddr != 0: |
| neighbourList.append(parentAddr) |
| |
| # get ED/SED children info |
| childNeighbours = self.getChildrenInfo() |
| if childNeighbours is not None and len(childNeighbours) > 0: |
| for entry in childNeighbours: |
| neighbourList.append(entry) |
| |
| # get neighboring routers info |
| routerNeighbours = self.getNeighbouringRouters() |
| if routerNeighbours is not None and len(routerNeighbours) > 0: |
| for entry in routerNeighbours: |
| neighbourList.append(entry) |
| |
| print(neighbourList) |
| return neighbourList |
| |
| def setPartationId(self, partationId): |
| """set Thread Network Partition ID |
| |
| Args: |
| partitionId: partition id to be set by leader |
| |
| Returns: |
| True: successful to set the Partition ID |
| False: fail to set the Partition ID |
| """ |
| print('%s call setPartationId' % self.port) |
| print(partationId) |
| |
| cmd = 'leaderpartitionid %s' % (str(hex(partationId)).rstrip('L')) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| def getGUA(self, filterByPrefix=None): |
| """get expected global unicast IPv6 address of Thread device |
| |
| note: existing filterByPrefix are string of in lowercase. e.g. |
| '2001' or '2001:0db8:0001:0000". |
| |
| Args: |
| filterByPrefix: a given expected global IPv6 prefix to be matched |
| |
| Returns: |
| a global IPv6 address |
| """ |
| print('%s call getGUA' % self.port) |
| print(filterByPrefix) |
| globalAddrs = [] |
| try: |
| # get global addrs set if multiple |
| globalAddrs = self.__getGlobal() |
| |
| if filterByPrefix is None: |
| return globalAddrs[0] |
| else: |
| for fullIp in globalAddrs: |
| if fullIp.startswith(filterByPrefix): |
| print('target global %s' % fullIp) |
| return fullIp |
| print('no global address matched') |
| return str(globalAddrs[0]) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('getGUA() Error: ' + str(e)) |
| |
| def getShortAddress(self): |
| """get Rloc16 short address of Thread device""" |
| print('%s call getShortAddress' % self.port) |
| return self.getRloc16() |
| |
| def getULA64(self): |
| """get mesh local EID of Thread device""" |
| print('%s call getULA64' % self.port) |
| return self.__sendCommand('ipaddr mleid')[0] |
| |
| def setMLPrefix(self, sMeshLocalPrefix): |
| """set mesh local prefix""" |
| print('%s call setMLPrefix' % self.port) |
| try: |
| cmd = 'dataset meshlocalprefix %s' % sMeshLocalPrefix |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setMLPrefix() Error: ' + str(e)) |
| |
| def getML16(self): |
| """get mesh local 16 unicast address (Rloc)""" |
| print('%s call getML16' % self.port) |
| return self.getRloc() |
| |
| def downgradeToDevice(self): |
| pass |
| |
| def upgradeToRouter(self): |
| pass |
| |
| def forceSetSlaac(self, slaacAddress): |
| """force to set a slaac IPv6 address to Thread interface |
| |
| Args: |
| slaacAddress: a slaac IPv6 address to be set |
| |
| Returns: |
| True: successful to set slaac address to Thread interface |
| False: fail to set slaac address to Thread interface |
| """ |
| print('%s call forceSetSlaac' % self.port) |
| print(slaacAddress) |
| try: |
| cmd = 'ipaddr add %s' % str(slaacAddress) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'forceSetSlaac() Error: ' + str(e) |
| ) |
| |
| def setSleepyNodePollTime(self): |
| pass |
| |
| def enableAutoDUTObjectFlag(self): |
| """set AutoDUTenable flag""" |
| print('%s call enableAutoDUTObjectFlag' % self.port) |
| self.AutoDUTEnable = True |
| |
| def getChildTimeoutValue(self): |
| """get child timeout""" |
| print('%s call getChildTimeoutValue' % self.port) |
| childTimeout = self.__sendCommand('childtimeout')[0] |
| return int(childTimeout) |
| |
| def diagnosticGet(self, strDestinationAddr, listTLV_ids=[]): |
| if not listTLV_ids: |
| return |
| |
| if not len(listTLV_ids): |
| return |
| |
| cmd = 'networkdiagnostic get %s %s' % ( |
| strDestinationAddr, |
| ' '.join([str(tlv) for tlv in listTLV_ids]), |
| ) |
| print(cmd) |
| |
| return self._sendline(cmd) |
| |
| def diagnosticReset(self, strDestinationAddr, listTLV_ids=[]): |
| if not listTLV_ids: |
| return |
| |
| if not len(listTLV_ids): |
| return |
| |
| cmd = 'networkdiagnostic reset %s %s' % ( |
| strDestinationAddr, |
| ' '.join([str(tlv) for tlv in listTLV_ids]), |
| ) |
| print(cmd) |
| |
| return self.__sendCommand(cmd) |
| |
| def diagnosticQuery(self, strDestinationAddr, listTLV_ids=[]): |
| self.diagnosticGet(strDestinationAddr, listTLV_ids) |
| |
| def startNativeCommissioner(self, strPSKc='GRLpassWord'): |
| # TODO: Support the whole Native Commissioner functionality |
| # Currently it only aims to trigger a Discovery Request message to pass |
| # Certification test 5.8.4 |
| print('%s call startNativeCommissioner' % self.port) |
| self.__sendCommand('ifconfig up') |
| cmd = 'joiner start %s' % (strPSKc) |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| return True |
| else: |
| return False |
| |
| def startCollapsedCommissioner(self): |
| """start Collapsed Commissioner |
| |
| Returns: |
| True: successful to start Commissioner |
| False: fail to start Commissioner |
| """ |
| print('%s call startCollapsedCommissioner' % self.port) |
| if self.__startOpenThread(): |
| time.sleep(20) |
| cmd = 'commissioner start' |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| self.isActiveCommissioner = True |
| time.sleep(20) # time for petition process |
| return True |
| return False |
| |
| def setJoinKey(self, strPSKc): |
| pass |
| |
| def scanJoiner(self, xEUI='*', strPSKd='threadjpaketest'): |
| """scan Joiner |
| |
| Args: |
| xEUI: Joiner's EUI-64 |
| strPSKd: Joiner's PSKd for commissioning |
| |
| Returns: |
| True: successful to add Joiner's steering data |
| False: fail to add Joiner's steering data |
| """ |
| print('%s call scanJoiner' % self.port) |
| |
| # long timeout value to avoid automatic joiner removal (in seconds) |
| timeout = 500 |
| |
| if not isinstance(xEUI, str): |
| eui64 = self.__convertLongToHex(xEUI, 16) |
| else: |
| eui64 = xEUI |
| |
| cmd = 'commissioner joiner add %s %s %s' % ( |
| eui64, |
| strPSKd, |
| str(timeout), |
| ) |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| if self.logThreadStatus == self.logStatus['stop']: |
| self.logThread = ThreadRunner.run( |
| target=self.__readCommissioningLogs, args=(120,) |
| ) |
| return True |
| else: |
| return False |
| |
| def setProvisioningUrl(self, strURL='grl.com'): |
| """set provisioning Url |
| |
| Args: |
| strURL: Provisioning Url string |
| |
| Returns: |
| True: successful to set provisioning Url |
| False: fail to set provisioning Url |
| """ |
| print('%s call setProvisioningUrl' % self.port) |
| self.provisioningUrl = strURL |
| if self.deviceRole == Thread_Device_Role.Commissioner: |
| cmd = 'commissioner provisioningurl %s' % (strURL) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| return True |
| |
| def allowCommission(self): |
| """start commissioner candidate petition process |
| |
| Returns: |
| True: successful to start commissioner candidate petition process |
| False: fail to start commissioner candidate petition process |
| """ |
| print('%s call allowCommission' % self.port) |
| try: |
| cmd = 'commissioner start' |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| self.isActiveCommissioner = True |
| # time for petition process and at least one keep alive |
| time.sleep(40) |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.writeintodebuglogger( |
| 'allowcommission() error: ' + str(e) |
| ) |
| |
| def joinCommissioned(self, strPSKd='threadjpaketest', waitTime=20): |
| """start joiner |
| |
| Args: |
| strPSKd: Joiner's PSKd |
| |
| Returns: |
| True: successful to start joiner |
| False: fail to start joiner |
| """ |
| print('%s call joinCommissioned' % self.port) |
| self.__sendCommand('ifconfig up') |
| cmd = 'joiner start %s %s' % (strPSKd, self.provisioningUrl) |
| print(cmd) |
| if self.__sendCommand(cmd)[-1] == 'Done': |
| maxDuration = 150 # seconds |
| self.joinCommissionedStatus = self.joinStatus['ongoing'] |
| |
| if self.logThreadStatus == self.logStatus['stop']: |
| self.logThread = ThreadRunner.run( |
| target=self.__readCommissioningLogs, args=(maxDuration,) |
| ) |
| |
| t_end = time.time() + maxDuration |
| while time.time() < t_end: |
| if self.joinCommissionedStatus == self.joinStatus['succeed']: |
| break |
| elif self.joinCommissionedStatus == self.joinStatus['failed']: |
| return False |
| |
| time.sleep(1) |
| |
| self.__sendCommand('thread start') |
| time.sleep(30) |
| return True |
| else: |
| return False |
| |
| def getCommissioningLogs(self): |
| """get Commissioning logs |
| |
| Returns: |
| Commissioning logs |
| """ |
| rawLogs = self.logThread.get() |
| ProcessedLogs = [] |
| payload = [] |
| while not rawLogs.empty(): |
| rawLogEach = rawLogs.get() |
| print(rawLogEach) |
| if '[THCI]' not in rawLogEach: |
| continue |
| |
| EncryptedPacket = PlatformDiagnosticPacket() |
| infoList = rawLogEach.split('[THCI]')[1].split(']')[0].split('|') |
| for eachInfo in infoList: |
| print(eachInfo) |
| info = eachInfo.split('=') |
| infoType = info[0].strip() |
| infoValue = info[1].strip() |
| if 'direction' in infoType: |
| EncryptedPacket.Direction = ( |
| PlatformDiagnosticPacket_Direction.IN |
| if 'recv' in infoValue |
| else PlatformDiagnosticPacket_Direction.OUT |
| if 'send' in infoValue |
| else PlatformDiagnosticPacket_Direction.UNKNOWN |
| ) |
| elif 'type' in infoType: |
| EncryptedPacket.Type = ( |
| PlatformDiagnosticPacket_Type.JOIN_FIN_req |
| if 'JOIN_FIN.req' in infoValue |
| else PlatformDiagnosticPacket_Type.JOIN_FIN_rsp |
| if 'JOIN_FIN.rsp' in infoValue |
| else PlatformDiagnosticPacket_Type.JOIN_ENT_req |
| if 'JOIN_ENT.ntf' in infoValue |
| else PlatformDiagnosticPacket_Type.JOIN_ENT_rsp |
| if 'JOIN_ENT.rsp' in infoValue |
| else PlatformDiagnosticPacket_Type.UNKNOWN |
| ) |
| elif 'len' in infoType: |
| bytesInEachLine = 16 |
| EncryptedPacket.TLVsLength = int(infoValue) |
| payloadLineCount = ( |
| int(infoValue) + bytesInEachLine - 1 |
| ) / bytesInEachLine |
| while payloadLineCount > 0: |
| payloadLineCount = payloadLineCount - 1 |
| payloadLine = rawLogs.get() |
| payloadSplit = payloadLine.split('|') |
| for block in range(1, 3): |
| payloadBlock = payloadSplit[block] |
| payloadValues = payloadBlock.split(' ') |
| for num in range(1, 9): |
| if '..' not in payloadValues[num]: |
| payload.append(int(payloadValues[num], 16)) |
| |
| EncryptedPacket.TLVs = ( |
| PlatformPackets.read(EncryptedPacket.Type, payload) |
| if payload != [] |
| else [] |
| ) |
| |
| ProcessedLogs.append(EncryptedPacket) |
| return ProcessedLogs |
| |
| def MGMT_ED_SCAN( |
| self, |
| sAddr, |
| xCommissionerSessionId, |
| listChannelMask, |
| xCount, |
| xPeriod, |
| xScanDuration, |
| ): |
| """send MGMT_ED_SCAN message to a given destinaition. |
| |
| Args: |
| sAddr: IPv6 destination address for this message |
| xCommissionerSessionId: commissioner session id |
| listChannelMask: a channel array to indicate which channels to be scaned |
| xCount: number of IEEE 802.15.4 ED Scans (milliseconds) |
| xPeriod: Period between successive IEEE802.15.4 ED Scans (milliseconds) |
| xScanDuration: ScanDuration when performing an IEEE 802.15.4 ED Scan (milliseconds) |
| |
| Returns: |
| True: successful to send MGMT_ED_SCAN message. |
| False: fail to send MGMT_ED_SCAN message |
| """ |
| print('%s call MGMT_ED_SCAN' % self.port) |
| channelMask = '' |
| channelMask = '0x' + self.__convertLongToHex( |
| self.__convertChannelMask(listChannelMask) |
| ) |
| try: |
| cmd = 'commissioner energy %s %s %s %s %s' % ( |
| channelMask, |
| xCount, |
| xPeriod, |
| xScanDuration, |
| sAddr, |
| ) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.writeintodebuglogger( |
| 'MGMT_ED_SCAN() error: ' + str(e) |
| ) |
| |
| def MGMT_PANID_QUERY( |
| self, sAddr, xCommissionerSessionId, listChannelMask, xPanId |
| ): |
| """send MGMT_PANID_QUERY message to a given destination |
| |
| Args: |
| xPanId: a given PAN ID to check the conflicts |
| |
| Returns: |
| True: successful to send MGMT_PANID_QUERY message. |
| False: fail to send MGMT_PANID_QUERY message. |
| """ |
| print('%s call MGMT_PANID_QUERY' % self.port) |
| panid = '' |
| channelMask = '' |
| channelMask = '0x' + self.__convertLongToHex( |
| self.__convertChannelMask(listChannelMask) |
| ) |
| |
| if not isinstance(xPanId, str): |
| panid = str(hex(xPanId)) |
| |
| try: |
| cmd = 'commissioner panid %s %s %s' % (panid, channelMask, sAddr) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.writeintodebuglogger( |
| 'MGMT_PANID_QUERY() error: ' + str(e) |
| ) |
| |
| def MGMT_ANNOUNCE_BEGIN( |
| self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod |
| ): |
| """send MGMT_ANNOUNCE_BEGIN message to a given destination |
| |
| Returns: |
| True: successful to send MGMT_ANNOUNCE_BEGIN message. |
| False: fail to send MGMT_ANNOUNCE_BEGIN message. |
| """ |
| print('%s call MGMT_ANNOUNCE_BEGIN' % self.port) |
| channelMask = '' |
| channelMask = '0x' + self.__convertLongToHex( |
| self.__convertChannelMask(listChannelMask) |
| ) |
| try: |
| cmd = 'commissioner announce %s %s %s %s' % ( |
| channelMask, |
| xCount, |
| xPeriod, |
| sAddr, |
| ) |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.writeintodebuglogger( |
| 'MGMT_ANNOUNCE_BEGIN() error: ' + str(e) |
| ) |
| |
| def MGMT_ACTIVE_GET(self, Addr='', TLVs=[]): |
| """send MGMT_ACTIVE_GET command |
| |
| Returns: |
| True: successful to send MGMT_ACTIVE_GET |
| False: fail to send MGMT_ACTIVE_GET |
| """ |
| print('%s call MGMT_ACTIVE_GET' % self.port) |
| try: |
| cmd = 'dataset mgmtgetcommand active' |
| |
| if Addr != '': |
| cmd += ' address ' |
| cmd += Addr |
| |
| if len(TLVs) != 0: |
| tlvs = ''.join('%02x' % tlv for tlv in TLVs) |
| cmd += ' binary ' |
| cmd += tlvs |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'MGMT_ACTIVE_GET() Error: ' + str(e) |
| ) |
| |
| def MGMT_ACTIVE_SET( |
| self, |
| sAddr='', |
| xCommissioningSessionId=None, |
| listActiveTimestamp=None, |
| listChannelMask=None, |
| xExtendedPanId=None, |
| sNetworkName=None, |
| sPSKc=None, |
| listSecurityPolicy=None, |
| xChannel=None, |
| sMeshLocalPrefix=None, |
| xMasterKey=None, |
| xPanId=None, |
| xTmfPort=None, |
| xSteeringData=None, |
| xBorderRouterLocator=None, |
| BogusTLV=None, |
| xDelayTimer=None, |
| ): |
| """send MGMT_ACTIVE_SET command |
| |
| Returns: |
| True: successful to send MGMT_ACTIVE_SET |
| False: fail to send MGMT_ACTIVE_SET |
| """ |
| print('%s call MGMT_ACTIVE_SET' % self.port) |
| try: |
| cmd = 'dataset mgmtsetcommand active' |
| |
| if listActiveTimestamp is not None: |
| cmd += ' activetimestamp ' |
| cmd += str(listActiveTimestamp[0]) |
| |
| if xExtendedPanId is not None: |
| cmd += ' extpanid ' |
| xpanid = self.__convertLongToHex(xExtendedPanId, 16) |
| |
| cmd += xpanid |
| |
| if sNetworkName is not None: |
| cmd += ' networkname ' |
| cmd += str(sNetworkName) |
| |
| if xChannel is not None: |
| cmd += ' channel ' |
| cmd += str(xChannel) |
| |
| if sMeshLocalPrefix is not None: |
| cmd += ' localprefix ' |
| cmd += str(sMeshLocalPrefix) |
| |
| if xMasterKey is not None: |
| cmd += ' masterkey ' |
| key = self.__convertLongToHex(xMasterKey, 32) |
| |
| cmd += key |
| |
| if xPanId is not None: |
| cmd += ' panid ' |
| cmd += str(xPanId) |
| |
| if listChannelMask is not None: |
| cmd += ' channelmask ' |
| cmd += '0x' + self.__convertLongToHex( |
| self.__convertChannelMask(listChannelMask) |
| ) |
| |
| if ( |
| sPSKc is not None |
| or listSecurityPolicy is not None |
| or xCommissioningSessionId is not None |
| or xTmfPort is not None |
| or xSteeringData is not None |
| or xBorderRouterLocator is not None |
| or BogusTLV is not None |
| ): |
| cmd += ' binary ' |
| |
| if sPSKc is not None: |
| cmd += '0410' |
| stretchedPskc = Thread_PBKDF2.get( |
| sPSKc, |
| ModuleHelper.Default_XpanId, |
| ModuleHelper.Default_NwkName, |
| ) |
| pskc = '%x' % stretchedPskc |
| |
| if len(pskc) < 32: |
| pskc = pskc.zfill(32) |
| |
| cmd += pskc |
| |
| if listSecurityPolicy is not None: |
| cmd += '0c03' |
| |
| rotationTime = 0 |
| policyBits = 0 |
| |
| # previous passing way listSecurityPolicy=[True, True, 3600, |
| # False, False, True] |
| if len(listSecurityPolicy) == 6: |
| rotationTime = listSecurityPolicy[2] |
| |
| # the last three reserved bits must be 1 |
| policyBits = 0b00000111 |
| |
| if listSecurityPolicy[0]: |
| policyBits = policyBits | 0b10000000 |
| if listSecurityPolicy[1]: |
| policyBits = policyBits | 0b01000000 |
| if listSecurityPolicy[3]: |
| policyBits = policyBits | 0b00100000 |
| if listSecurityPolicy[4]: |
| policyBits = policyBits | 0b00010000 |
| if listSecurityPolicy[5]: |
| policyBits = policyBits | 0b00001000 |
| else: |
| # new passing way listSecurityPolicy=[3600, 0b11001111] |
| rotationTime = listSecurityPolicy[0] |
| policyBits = listSecurityPolicy[1] |
| |
| policy = str(hex(rotationTime))[2:] |
| |
| if len(policy) < 4: |
| policy = policy.zfill(4) |
| |
| cmd += policy |
| |
| cmd += str(hex(policyBits))[2:] |
| |
| if xCommissioningSessionId is not None: |
| cmd += '0b02' |
| sessionid = str(hex(xCommissioningSessionId))[2:] |
| |
| if len(sessionid) < 4: |
| sessionid = sessionid.zfill(4) |
| |
| cmd += sessionid |
| |
| if xBorderRouterLocator is not None: |
| cmd += '0902' |
| locator = str(hex(xBorderRouterLocator))[2:] |
| |
| if len(locator) < 4: |
| locator = locator.zfill(4) |
| |
| cmd += locator |
| |
| if xSteeringData is not None: |
| steeringData = self.__convertLongToHex(xSteeringData) |
| cmd += '08' + str(len(steeringData) / 2).zfill(2) |
| cmd += steeringData |
| |
| if BogusTLV is not None: |
| cmd += '8202aa55' |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'MGMT_ACTIVE_SET() Error: ' + str(e) |
| ) |
| |
| def MGMT_PENDING_GET(self, Addr='', TLVs=[]): |
| """send MGMT_PENDING_GET command |
| |
| Returns: |
| True: successful to send MGMT_PENDING_GET |
| False: fail to send MGMT_PENDING_GET |
| """ |
| print('%s call MGMT_PENDING_GET' % self.port) |
| try: |
| cmd = 'dataset mgmtgetcommand pending' |
| |
| if Addr != '': |
| cmd += ' address ' |
| cmd += Addr |
| |
| if len(TLVs) != 0: |
| tlvs = ''.join('%02x' % tlv for tlv in TLVs) |
| cmd += ' binary ' |
| cmd += tlvs |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'MGMT_PENDING_GET() Error: ' + str(e) |
| ) |
| |
| def MGMT_PENDING_SET( |
| self, |
| sAddr='', |
| xCommissionerSessionId=None, |
| listPendingTimestamp=None, |
| listActiveTimestamp=None, |
| xDelayTimer=None, |
| xChannel=None, |
| xPanId=None, |
| xMasterKey=None, |
| sMeshLocalPrefix=None, |
| sNetworkName=None, |
| ): |
| """send MGMT_PENDING_SET command |
| |
| Returns: |
| True: successful to send MGMT_PENDING_SET |
| False: fail to send MGMT_PENDING_SET |
| """ |
| print('%s call MGMT_PENDING_SET' % self.port) |
| try: |
| cmd = 'dataset mgmtsetcommand pending' |
| |
| if listPendingTimestamp is not None: |
| cmd += ' pendingtimestamp ' |
| cmd += str(listPendingTimestamp[0]) |
| |
| if listActiveTimestamp is not None: |
| cmd += ' activetimestamp ' |
| cmd += str(listActiveTimestamp[0]) |
| |
| if xDelayTimer is not None: |
| cmd += ' delaytimer ' |
| cmd += str(xDelayTimer) |
| # cmd += ' delaytimer 3000000' |
| |
| if xChannel is not None: |
| cmd += ' channel ' |
| cmd += str(xChannel) |
| |
| if xPanId is not None: |
| cmd += ' panid ' |
| cmd += str(xPanId) |
| |
| if xMasterKey is not None: |
| cmd += ' masterkey ' |
| key = self.__convertLongToHex(xMasterKey, 32) |
| |
| cmd += key |
| |
| if sMeshLocalPrefix is not None: |
| cmd += ' localprefix ' |
| cmd += str(sMeshLocalPrefix) |
| |
| if sNetworkName is not None: |
| cmd += ' networkname ' |
| cmd += str(sNetworkName) |
| |
| if xCommissionerSessionId is not None: |
| cmd += ' binary ' |
| cmd += '0b02' |
| sessionid = str(hex(xCommissionerSessionId))[2:] |
| |
| if len(sessionid) < 4: |
| sessionid = sessionid.zfill(4) |
| |
| cmd += sessionid |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'MGMT_PENDING_SET() Error: ' + str(e) |
| ) |
| |
| def MGMT_COMM_GET(self, Addr='ff02::1', TLVs=[]): |
| """send MGMT_COMM_GET command |
| |
| Returns: |
| True: successful to send MGMT_COMM_GET |
| False: fail to send MGMT_COMM_GET |
| """ |
| print('%s call MGMT_COMM_GET' % self.port) |
| try: |
| cmd = 'commissioner mgmtget' |
| |
| if len(TLVs) != 0: |
| tlvs = ''.join('%02x' % tlv for tlv in TLVs) |
| cmd += ' binary ' |
| cmd += tlvs |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'MGMT_COMM_GET() Error: ' + str(e) |
| ) |
| |
| def MGMT_COMM_SET( |
| self, |
| Addr='ff02::1', |
| xCommissionerSessionID=None, |
| xSteeringData=None, |
| xBorderRouterLocator=None, |
| xChannelTlv=None, |
| ExceedMaxPayload=False, |
| ): |
| """send MGMT_COMM_SET command |
| |
| Returns: |
| True: successful to send MGMT_COMM_SET |
| False: fail to send MGMT_COMM_SET |
| """ |
| print('%s call MGMT_COMM_SET' % self.port) |
| try: |
| cmd = 'commissioner mgmtset' |
| |
| if xCommissionerSessionID is not None: |
| # use assigned session id |
| cmd += ' sessionid ' |
| cmd += str(xCommissionerSessionID) |
| elif xCommissionerSessionID is None: |
| # use original session id |
| if self.isActiveCommissioner is True: |
| cmd += ' sessionid ' |
| cmd += self.__getCommissionerSessionId() |
| else: |
| pass |
| |
| if xSteeringData is not None: |
| cmd += ' steeringdata ' |
| cmd += str(hex(xSteeringData)[2:]) |
| |
| if xBorderRouterLocator is not None: |
| cmd += ' locator ' |
| cmd += str(hex(xBorderRouterLocator)) |
| |
| if xChannelTlv is not None: |
| cmd += ' binary ' |
| cmd += '000300' + '%04x' % xChannelTlv |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'MGMT_COMM_SET() Error: ' + str(e) |
| ) |
| |
| def setActiveDataset(self, listActiveDataset=[]): |
| print('%s call setActiveDataset' % self.port) |
| |
| def setCommisionerMode(self): |
| print('%s call setCommissionerMode' % self.port) |
| |
| def setPSKc(self, strPSKc): |
| print('%s call setPSKc' % self.port) |
| try: |
| cmd = 'dataset pskc %s' % strPSKc |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setPSKc() Error: ' + str(e)) |
| |
| def setActiveTimestamp(self, xActiveTimestamp): |
| print('%s call setActiveTimestamp' % self.port) |
| try: |
| self.activetimestamp = xActiveTimestamp |
| cmd = 'dataset activetimestamp %s' % str(xActiveTimestamp) |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger( |
| 'setActiveTimestamp() Error: ' + str(e) |
| ) |
| |
| def setUdpJoinerPort(self, portNumber): |
| """set Joiner UDP Port |
| |
| Args: |
| portNumber: Joiner UDP Port number |
| |
| Returns: |
| True: successful to set Joiner UDP Port |
| False: fail to set Joiner UDP Port |
| """ |
| print('%s call setUdpJoinerPort' % self.port) |
| cmd = 'joinerport %d' % portNumber |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| def commissionerUnregister(self): |
| """stop commissioner |
| |
| Returns: |
| True: successful to stop commissioner |
| False: fail to stop commissioner |
| """ |
| print('%s call commissionerUnregister' % self.port) |
| cmd = 'commissioner stop' |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| def sendBeacons( |
| self, sAddr, xCommissionerSessionId, listChannelMask, xPanId |
| ): |
| print('%s call sendBeacons' % self.port) |
| self._sendline('scan') |
| return True |
| |
| def updateRouterStatus(self): |
| """force update to router as if there is child id request""" |
| self._update_router_status = True |
| |
| def __updateRouterStatus(self): |
| print('%s call updateRouterStatus' % self.port) |
| cmd = 'state' |
| while True: |
| state = self.__sendCommand(cmd)[0] |
| if state == 'detached': |
| continue |
| elif state == 'child': |
| break |
| else: |
| return False |
| |
| cmd = 'state router' |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| def setRouterThresholdValues(self, upgradeThreshold, downgradeThreshold): |
| print('%s call setRouterThresholdValues' % self.port) |
| self.__setRouterUpgradeThreshold(upgradeThreshold) |
| self.__setRouterDowngradeThreshold(downgradeThreshold) |
| |
| def setMinDelayTimer(self, iSeconds): |
| print('%s call setMinDelayTimer' % self.port) |
| cmd = 'delaytimermin %s' % iSeconds |
| print(cmd) |
| return self.__sendCommand(cmd)[-1] == 'Done' |
| |
| def ValidateDeviceFirmware(self): |
| print('%s call ValidateDeviceFirmware' % self.port) |
| if 'OPENTHREAD' in self.UIStatusMsg: |
| return True |
| else: |
| return False |
| |
| @staticmethod |
| def __lstrip0x(s): |
| """strip 0x at the beginning of a hex string if it exists |
| |
| Args: |
| s: hex string |
| |
| Returns: |
| hex string with leading 0x stripped |
| """ |
| if s.startswith('0x'): |
| s = s[2:] |
| |
| return s |