| #!/usr/bin/env python |
| # |
| # Copyright (c) 2018, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| |
| """ |
| >> Thread Host Controller Interface |
| >> Device : OpenThread_WpanCtl THCI |
| >> Class : OpenThread_WpanCtl |
| """ |
| |
| import re |
| import time |
| import socket |
| import logging |
| from Queue import Queue |
| |
| import serial |
| from IThci import IThci |
| from GRLLibs.UtilityModules.Test import Thread_Device_Role, Device_Data_Requirement, MacType |
| from GRLLibs.UtilityModules.enums import PlatformDiagnosticPacket_Direction, PlatformDiagnosticPacket_Type |
| from GRLLibs.UtilityModules.ModuleHelper import ModuleHelper |
| from GRLLibs.ThreadPacket.PlatformPackets import PlatformDiagnosticPacket, PlatformPackets |
| from GRLLibs.UtilityModules.Plugins.AES_CMAC import Thread_PBKDF2 |
| |
| """regex: used to split lines""" |
| LINESEPX = re.compile(r'\r\n|\n') |
| |
| |
| class OpenThread_WpanCtl(IThci): |
| LOWEST_POSSIBLE_PARTATION_ID = 0x1 |
| LINK_QUALITY_CHANGE_TIME = 100 |
| |
| # Used for reference firmware version control for Test Harness. |
| # This variable will be updated to match the OpenThread reference firmware |
| # officially released. |
| firmwarePrefix = 'OPENTHREAD/' |
| |
| def __init__(self, **kwargs): |
| """initialize the serial port and default network parameters |
| Args: |
| **kwargs: Arbitrary keyword arguments |
| Includes 'EUI' and 'SerialPort' |
| """ |
| try: |
| self.UIStatusMsg = '' |
| self.mac = kwargs.get('EUI') |
| self.handle = None |
| self.AutoDUTEnable = False |
| self._is_net = False # whether device is through ser2net |
| self.logStatus = {'stop': 'stop', 'running': 'running', 'pauseReq': 'pauseReq', 'paused': 'paused'} |
| self.logThreadStatus = self.logStatus['stop'] |
| # connection type 'ip' stands for SSH |
| self.connectType = (kwargs.get('Param5')).strip().lower() if kwargs.get('Param5') is not None else 'usb' |
| # comma separated CLI prompt, wpanctl cmd prefix, Wpan interface |
| (self.prompt, self.wpan_cmd_prefix, self.wpan_interface) = ( |
| kwargs.get('Param8').strip().split(',') if kwargs.get('Param8') else ['#', 'wpanctl', 'wpan0'] |
| ) |
| self.wpan_cmd_prefix += ' ' |
| # comma separated setting commands |
| self.precmd = (kwargs.get('Param9')).strip().split(',') if kwargs.get('Param9') else [] |
| if self.connectType == 'ip': |
| self.dutIpv4 = kwargs.get('TelnetIP') |
| self.dutPort = kwargs.get('TelnetPort') |
| self.port = self.dutIpv4 + ':' + self.dutPort |
| # username for SSH |
| self.username = kwargs.get('Param6').strip() if kwargs.get('Param6') else None |
| # password for SSH |
| self.password = kwargs.get('Param7').strip() if kwargs.get('Param7') else None |
| else: |
| self.port = kwargs.get('SerialPort') |
| self.intialize() |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('initialize() Error: ' + str(e)) |
| |
| def __del__(self): |
| """close the serial port connection""" |
| try: |
| self.closeConnection() |
| self.deviceConnected = False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('delete() Error: ' + str(e)) |
| |
| def _expect(self, expected, times=50): |
| """Find the `expected` line within `times` trials. |
| |
| Args: |
| expected str: the expected string |
| times int: number of trials |
| """ |
| print('[%s] Expecting [%s]' % (self.port, expected)) |
| |
| retry_times = 10 |
| while times > 0 and retry_times > 0: |
| line = self._readline() |
| print('[%s] Got line [%s]' % (self.port, line)) |
| |
| if line == expected: |
| print('[%s] Expected [%s]' % (self.port, expected)) |
| return |
| |
| if not line: |
| retry_times -= 1 |
| time.sleep(0.1) |
| |
| times -= 1 |
| |
| raise Exception('failed to find expected string[%s]' % expected) |
| |
| def _read(self, size=512): |
| logging.info('%s: reading', self.port) |
| if self._is_net: |
| return self.handle.recv(size) |
| else: |
| return self.handle.read(size) |
| |
| def _write(self, data): |
| logging.info('%s: writing', self.port) |
| if self._is_net: |
| self.handle.sendall(data) |
| else: |
| self.handle.write(data) |
| |
| def _readline(self): |
| """Read exactly one line from the device |
| |
| Returns: |
| None on no data |
| """ |
| logging.info('%s: reading line', self.port) |
| if len(self._lines) > 1: |
| return self._lines.pop(0) |
| |
| tail = '' |
| if len(self._lines): |
| tail = self._lines.pop() |
| |
| try: |
| tail += self._read() |
| except socket.error: |
| logging.exception('%s: No new data', self.port) |
| time.sleep(0.1) |
| |
| self._lines += LINESEPX.split(tail) |
| if len(self._lines) > 1: |
| return self._lines.pop(0) |
| |
| def _sendline(self, line): |
| """Send exactly one line to the device |
| |
| Args: |
| line str: data send to device |
| """ |
| logging.info('%s: sending line', self.port) |
| # clear buffer |
| self._lines = [] |
| try: |
| self._read() |
| except socket.error: |
| logging.debug('%s: Nothing cleared', self.port) |
| |
| print('sending [%s]' % line) |
| self._write(line + '\r\n') |
| self._lines = [] |
| # wait for write to complete |
| time.sleep(0.1) |
| |
| def __sendCommand(self, cmd): |
| """send specific command to reference unit over serial port |
| |
| Args: |
| cmd: OpenThread_WpanCtl command string |
| |
| Returns: |
| Fail: Failed to send the command to reference unit and parse it |
| Value: successfully retrieve the desired value from reference unit |
| Error: some errors occur, indicates by the followed specific error number |
| """ |
| logging.info('%s: sendCommand[%s]', self.port, cmd) |
| if self.logThreadStatus == self.logStatus['running']: |
| self.logThreadStatus = self.logStatus['pauseReq'] |
| while self.logThreadStatus not in (self.logStatus['paused'], self.logStatus['stop']): |
| pass |
| |
| ssh_stdin = None |
| ssh_stdout = None |
| ssh_stderr = None |
| try: |
| # command retransmit times |
| retry_times = 3 |
| while retry_times > 0: |
| retry_times -= 1 |
| try: |
| if self._is_net: |
| ssh_stdin, ssh_stdout, ssh_stderr = self.handle.exec_command(cmd) |
| else: |
| self._sendline(cmd) |
| self._expect(cmd) |
| except Exception as e: |
| logging.exception('%s: failed to send command[%s]: %s', self.port, cmd, str(e)) |
| if retry_times == 0: |
| raise |
| else: |
| break |
| |
| line = None |
| response = [] |
| retry_times = 20 |
| stdout_lines = [] |
| stderr_lines = [] |
| if self._is_net: |
| stdout_lines = ssh_stdout.readlines() |
| stderr_lines = ssh_stderr.readlines() |
| if stderr_lines: |
| for stderr_line in stderr_lines: |
| if re.search(r'Not\s+Found|failed\s+with\s+error', stderr_line.strip(), re.M | re.I): |
| print('Command failed: %s' % stderr_line) |
| return 'Fail' |
| print('Got line: %s' % stderr_line) |
| logging.info('%s: the read line is[%s]', self.port, stderr_line) |
| response.append(str(stderr_line.strip())) |
| elif stdout_lines: |
| for stdout_line in stdout_lines: |
| logging.info('%s: the read line is[%s]', self.port, stdout_line) |
| if re.search(r'Not\s+Found|failed\s+with\s+error', stdout_line.strip(), re.M | re.I): |
| print('Command failed') |
| return 'Fail' |
| print('Got line: ' + stdout_line) |
| logging.info('%s: send command[%s] done!', self.port, cmd) |
| response.append(str(stdout_line.strip())) |
| response.append(self.prompt) |
| return response |
| else: |
| while retry_times > 0: |
| line = self._readline() |
| print('read line: %s' % line) |
| logging.info('%s: the read line is[%s]', self.port, line) |
| if line: |
| response.append(line) |
| if re.match(self.prompt, line): |
| break |
| elif re.search(r'Not\s+Found|failed\s+with\s+error', line, re.M | re.I): |
| print('Command failed') |
| return 'Fail' |
| |
| retry_times -= 1 |
| time.sleep(0.1) |
| |
| if retry_times == 0: |
| raise Exception('%s: failed to find end of response' % self.port) |
| logging.info('%s: send command[%s] done!', self.port, cmd) |
| return response |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('sendCommand() Error: ' + str(e)) |
| raise |
| |
| def __stripValue(self, value): |
| """strip the special characters in the value |
| |
| Args: |
| value: value string |
| |
| Returns: |
| value string without special characters |
| """ |
| if isinstance(value, str): |
| if (value[0] == '"' and value[-1] == '"') or (value[0] == '[' and value[-1] == ']'): |
| return value[1:-1] |
| return value |
| |
| def __padIp6Addr(self, ip6Addr): |
| segments = ip6Addr.split(':') |
| empty = None |
| for i, element in enumerate(segments): |
| if empty is None and len(element) == 0: |
| empty = i |
| elif len(element) < 4: |
| segments[i] = '0' * (4 - len(element)) + element |
| |
| if empty is not None: |
| segments = segments[:empty] + ['0000'] * (8 - len(segments) + 1) + segments[empty + 1 :] |
| |
| return ':'.join(segments) |
| |
| def __setDeviceMode(self, mode): |
| """set thread device mode: |
| |
| Args: |
| mode: thread device mode. 15=rsdn, 13=rsn, 4=s |
| r: rx-on-when-idle |
| s: secure IEEE 802.15.4 data request |
| d: full thread device |
| n: full network data |
| |
| Returns: |
| True: successful to set the device mode |
| False: fail to set the device mode |
| """ |
| print('call __setDeviceMode') |
| |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop Thread:DeviceMode %d' % mode |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setDeviceMode() Error: ' + str(e)) |
| |
| def __setRouterUpgradeThreshold(self, iThreshold): |
| """set router upgrade threshold |
| |
| Args: |
| iThreshold: the number of active routers on the Thread network |
| partition below which a REED may decide to become a Router. |
| |
| Returns: |
| True: successful to set the ROUTER_UPGRADE_THRESHOLD |
| False: fail to set ROUTER_UPGRADE_THRESHOLD |
| """ |
| print('call __setRouterUpgradeThreshold') |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterUpgradeThreshold %s' % str(iThreshold) |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setRouterUpgradeThreshold() Error: ' + str(e)) |
| |
| def __setRouterDowngradeThreshold(self, iThreshold): |
| """set router downgrade threshold |
| |
| Args: |
| iThreshold: the number of active routers on the Thread network |
| partition above which an active router may decide to |
| become a child. |
| |
| Returns: |
| True: successful to set the ROUTER_DOWNGRADE_THRESHOLD |
| False: fail to set ROUTER_DOWNGRADE_THRESHOLD |
| """ |
| print('call __setRouterDowngradeThreshold') |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterDowngradeThreshold %s' % str(iThreshold) |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setRouterDowngradeThreshold() Error: ' + str(e)) |
| |
| def __setRouterSelectionJitter(self, iRouterJitter): |
| """set ROUTER_SELECTION_JITTER parameter for REED to upgrade to Router |
| |
| Args: |
| iRouterJitter: a random period prior to request Router ID for REED |
| |
| Returns: |
| True: successful to set the ROUTER_SELECTION_JITTER |
| False: fail to set ROUTER_SELECTION_JITTER |
| """ |
| print('call _setRouterSelectionJitter') |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop Thread:RouterSelectionJitter %s' % str(iRouterJitter) |
| return self.__sendCommand(cmd) != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setRouterSelectionJitter() Error: ' + str(e)) |
| |
| def __setAddressfilterMode(self, mode): |
| """set address filter mode |
| |
| Returns: |
| True: successful to set address filter mode. |
| False: fail to set address filter mode. |
| """ |
| print('call setAddressFilterMode() %s' % mode) |
| try: |
| if re.match('list', mode, re.M | re.I): |
| cmd = self.wpan_cmd_prefix + 'setprop MAC:' + mode + ':Enabled 1' |
| elif mode == 'disabled': |
| cmd = self.wpan_cmd_prefix + 'setprop MAC:' + mode + ':Enabled 0' |
| else: |
| print('no such option') |
| return False |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| return True |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('__setAddressFilterMode() Error: ' + str(e)) |
| |
| def __startOpenThreadWpan(self): |
| """start OpenThreadWpan |
| |
| Returns: |
| True: successful to start OpenThreadWpan up |
| False: fail to start OpenThreadWpan |
| """ |
| print('call startOpenThreadWpan') |
| try: |
| |
| # restore whitelist/blacklist address filter mode if rejoin after |
| # reset |
| if self.isPowerDown: |
| if self._addressfilterMode == 'whitelist': |
| if self.__setAddressfilterMode('whitelist'): |
| for addr in self._addressfilterSet: |
| self.addAllowMAC(addr) |
| elif self._addressfilterMode == 'blacklist': |
| if self.__setAddressfilterMode('blacklist'): |
| for addr in self._addressfilterSet: |
| self.addBlockedMAC(addr) |
| time.sleep(1) |
| if ModuleHelper.LeaderDutChannelFound: |
| self.channel = ModuleHelper.Default_Channel |
| nodeType = 'router' |
| startType = 'join' |
| if self.deviceRole == Thread_Device_Role.Leader: |
| nodeType = 'router' |
| startType = 'form' |
| elif self.deviceRole == Thread_Device_Role.Router: |
| nodeType = 'router' |
| elif self.deviceRole == Thread_Device_Role.SED: |
| nodeType = 'sed' |
| elif self.deviceRole == Thread_Device_Role.EndDevice: |
| nodeType = 'end' |
| elif self.deviceRole == Thread_Device_Role.REED: |
| nodeType = 'router' |
| elif self.deviceRole == Thread_Device_Role.EndDevice_FED: |
| nodeType = 'router' |
| elif self.deviceRole == Thread_Device_Role.EndDevice_MED: |
| nodeType = 'end' |
| else: |
| pass |
| |
| if self.deviceRole in [Thread_Device_Role.Leader, Thread_Device_Role.Router, Thread_Device_Role.REED]: |
| self.__setRouterSelectionJitter(1) |
| |
| if startType == 'form': |
| startCmd = self.wpan_cmd_prefix + '%s %s -c %s -T %s ' % ( |
| startType, |
| self.networkName, |
| str(self.channel), |
| nodeType, |
| ) |
| else: |
| startCmd = self.wpan_cmd_prefix + '%s %s -p %s -c %s -T %s ' % ( |
| startType, |
| self.networkName, |
| str(hex(self.panId)), |
| str(self.channel), |
| nodeType, |
| ) |
| if self.__sendCommand(startCmd)[0] != 'Fail': |
| if self.__isOpenThreadWpanRunning(): |
| self.isPowerDown = False |
| if self.hasActiveDatasetToCommit: |
| if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Dataset:Command SetActive')[0] == 'Fail': |
| raise Exception('failed to commit active dataset') |
| else: |
| self.hasActiveDatasetToCommit = False |
| |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('startOpenThreadWpan() Error: ' + str(e)) |
| |
| def __stopOpenThreadWpan(self): |
| """stop OpenThreadWpan |
| |
| Returns: |
| True: successfully stop OpenThreadWpan |
| False: failed to stop OpenThreadWpan |
| """ |
| print('call stopOpenThreadWpan') |
| try: |
| if ( |
| self.__sendCommand(self.wpan_cmd_prefix + 'leave')[0] != 'Fail' |
| and self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] != 'Fail' |
| ): |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('stopOpenThreadWpan() Error: ' + str(e)) |
| |
| def __isOpenThreadWpanRunning(self): |
| """check whether or not OpenThreadWpan is running |
| |
| Returns: |
| True: OpenThreadWpan is running |
| False: OpenThreadWpan is not running |
| """ |
| print('call __isOpenThreadWpanRunning') |
| if self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) == 'associated': |
| print('*****OpenThreadWpan is running') |
| return True |
| else: |
| print('*****Wrong OpenThreadWpan state') |
| return False |
| |
| # rloc16 might be hex string or integer, need to return actual allocated |
| # router id |
| def __convertRlocToRouterId(self, xRloc16): |
| """mapping Rloc16 to router id |
| |
| Args: |
| xRloc16: hex rloc16 short address |
| |
| Returns: |
| actual router id allocated by leader |
| """ |
| routerList = [] |
| routerList = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RouterTable') |
| print(routerList) |
| print(xRloc16) |
| |
| for line in routerList: |
| if re.match(r'\[|\]', line): |
| continue |
| if re.match(self.prompt, line, re.M | re.I): |
| break |
| router = [] |
| router = self.__stripValue(line).split(',') |
| |
| for item in router: |
| if 'RouterId' in item: |
| routerid = item.split(':')[1] |
| elif 'RLOC16' in line: |
| rloc16 = line.split(':')[1] |
| else: |
| pass |
| |
| # process input rloc16 |
| if isinstance(xRloc16, str): |
| rloc16 = '0x' + rloc16 |
| if rloc16 == xRloc16: |
| return routerid |
| elif isinstance(xRloc16, int): |
| if int(rloc16, 16) == xRloc16: |
| return routerid |
| else: |
| pass |
| |
| return None |
| |
| def __convertIp6PrefixStringToIp6Address(self, strIp6Prefix): |
| """convert IPv6 prefix string to IPv6 dotted-quad format |
| for example: |
| 2001000000000000 -> 2001:0000:0000:0000:: |
| |
| Args: |
| strIp6Prefix: IPv6 address string |
| |
| Returns: |
| IPv6 address dotted-quad format |
| """ |
| prefix1 = strIp6Prefix.rstrip('L') |
| prefix2 = self.__lstrip0x(prefix1) |
| hexPrefix = str(prefix2).ljust(16, '0') |
| hexIter = iter(hexPrefix) |
| finalMac = ':'.join(a + b + c + d for a, b, c, d in zip(hexIter, hexIter, hexIter, hexIter)) |
| prefix = str(finalMac) |
| strIp6Prefix = prefix[:19] |
| return strIp6Prefix + '::' |
| |
| def __convertLongToHex(self, iValue, fillZeros=None): |
| """convert a long hex integer to string |
| remove '0x' and 'L' return string |
| |
| Args: |
| iValue: long integer in hex format |
| fillZeros: pad string with zeros on the left to specified width |
| |
| Returns: |
| string of this long integer without '0x' and 'L' |
| """ |
| fmt = '%x' |
| if fillZeros is not None: |
| fmt = '%%0%dx' % fillZeros |
| |
| return fmt % iValue |
| |
| def __convertChannelMask(self, channelsArray): |
| """convert channelsArray to bitmask format |
| |
| Args: |
| channelsArray: channel array (i.e. [21, 22]) |
| |
| Returns: |
| bitmask format corresponding to a given channel array |
| """ |
| maskSet = 0 |
| |
| for eachChannel in channelsArray: |
| mask = 1 << eachChannel |
| maskSet = maskSet | mask |
| |
| return maskSet |
| |
| def __ChannelMaskListToStr(self, channelList): |
| """convert a channel list to a string |
| |
| Args: |
| channelList: channel list (i.e. [21, 22, 23]) |
| |
| Returns: |
| a comma separated channel string (i.e. '21, 22, 23') |
| """ |
| chan_mask_range = '' |
| if isinstance(channelList, list): |
| if len(channelList): |
| chan_mask_range = ','.join(str(chan) for chan in channelList) |
| else: |
| print('empty list') |
| else: |
| print('not a valid channel list: %s' % channelList) |
| |
| return chan_mask_range |
| |
| def __setChannelMask(self, channelMask): |
| print('call _setChannelMask') |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop NCP:ChannelMask %s' % channelMask |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ChannelMaskPage0 %s' % channelMask |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setChannelMask() Error: ' + str(e)) |
| |
| def __setSecurityPolicy(self, securityPolicySecs, securityPolicyFlags): |
| print('call _setSecurityPolicy') |
| try: |
| cmd1 = self.wpan_cmd_prefix + 'setprop Dataset:SecPolicy:KeyRotation %s' % str(securityPolicySecs) |
| if securityPolicyFlags == 'onrcb': |
| cmd2 = self.wpan_cmd_prefix + 'setprop Dataset:SecPolicy:Flags 0xff' |
| else: |
| print('unknown policy flag :' + securityPolicyFlags) |
| return False |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd1) != 'Fail' and self.__sendCommand(cmd2) != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setSecurityPolicy() Error: ' + str(e)) |
| |
| def __setKeySwitchGuardTime(self, iKeySwitchGuardTime): |
| """ set the Key switch guard time |
| |
| Args: |
| iKeySwitchGuardTime: key switch guard time |
| |
| Returns: |
| True: successful to set key switch guard time |
| False: fail to set key switch guard time |
| """ |
| print('%s call setKeySwitchGuardTime' % self.port) |
| print(iKeySwitchGuardTime) |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop Network:KeySwitchGuardTime %s' % str(iKeySwitchGuardTime) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| time.sleep(1) |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setKeySwitchGuardTime() Error; ' + str(e)) |
| |
| def __getCommissionerSessionId(self): |
| """ get the commissioner session id allocated from Leader """ |
| print('%s call getCommissionerSessionId' % self.port) |
| return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Commissioner:SessionId')[0] |
| |
| def __getJoinerState(self): |
| """ get joiner state """ |
| maxDuration = 150 # seconds |
| t_end = time.time() + maxDuration |
| while time.time() < t_end: |
| joinerState = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) |
| if joinerState == 'offline:commissioned': |
| return True |
| elif joinerState == 'associating:credentials-needed': |
| return False |
| else: |
| time.sleep(5) |
| continue |
| return False |
| |
| def _connect(self): |
| if self.connectType == 'usb': |
| print('My port is %s' % self.port) |
| try: |
| self.handle = serial.Serial(self.port, 115200, timeout=0.2) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('open serial error ' + str(e)) |
| |
| try: |
| attempts = 0 |
| user_prompted = False |
| pwd_prompted = False |
| while attempts < 10 or pwd_prompted: |
| time.sleep(0.5) |
| attempts = attempts + 1 |
| print('attempts...%d' % attempts) |
| |
| input_data = self.handle.read(self.handle.inWaiting()) |
| |
| if not input_data: |
| if not user_prompted: |
| self.handle.write('\n') |
| time.sleep(0.5) |
| continue |
| |
| if 'login' in input_data: |
| self.handle.write(self.username + '\n') |
| time.sleep(0.3) |
| print('user prompted') |
| user_prompted = True |
| |
| elif 'Password' in input_data: |
| print('password prompted') |
| time.sleep(0.3) |
| self.handle.write(self.password + '\n') |
| pwd_prompted = True |
| |
| elif self.prompt in input_data: |
| print('login success (serial)') |
| time.sleep(0.3) |
| self.deviceConnected = True |
| for precmd in self.precmd: |
| self.handle.write(precmd + '\n') |
| time.sleep(0.3) |
| time.sleep(1) |
| break |
| if not self.deviceConnected: |
| raise Exception('login fail') |
| else: |
| self._is_net = False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('connect to serial Error: ' + str(e)) |
| |
| elif self.connectType == 'ip': |
| print('My IP: %s Port: %s' % (self.dutIpv4, self.dutPort)) |
| try: |
| import paramiko |
| |
| if not self.password: |
| transport = paramiko.Transport((self.dutIpv4, int(self.dutPort))) |
| transport.start_client() |
| transport.auth_none(self.username) |
| self.handle = paramiko.SSHClient() |
| self.handle.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
| self.handle._transport = transport |
| else: |
| self.handle = paramiko.SSHClient() |
| self.handle.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
| self.handle.connect( |
| self.dutIpv4, port=int(self.dutPort), username=self.username, password=self.password |
| ) |
| print('login success (ssh)') |
| self.deviceConnected = True |
| for precmd in self.precmd: |
| self.handle.exec_command(precmd + '\n') |
| time.sleep(0.5) |
| self._is_net = True |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('connect to ssh Error: ' + str(e)) |
| else: |
| raise Exception('Unknown port schema') |
| |
| def closeConnection(self): |
| """close current serial port connection""" |
| print('%s call closeConnection' % self.port) |
| try: |
| if self.handle: |
| self.handle.close() |
| self.handle = None |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('closeConnection() Error: ' + str(e)) |
| |
| def intialize(self): |
| """initialize the serial port with baudrate, timeout parameters""" |
| print('%s call intialize' % self.port) |
| try: |
| # init serial port |
| self.deviceConnected = False |
| self._connect() |
| |
| if self.deviceConnected: |
| self.UIStatusMsg = self.getVersionNumber() |
| if self.firmwarePrefix not in self.UIStatusMsg: |
| self.deviceConnected = False |
| self.UIStatusMsg = ( |
| 'Firmware Not Matching Expecting ' + self.firmwarePrefix + ' Now is ' + self.UIStatusMsg |
| ) |
| raise Exception('Err: OpenThread device Firmware not matching..') |
| self.__sendCommand(self.wpan_cmd_prefix + 'leave') |
| self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase') |
| else: |
| raise Exception('Err: Device not connected ..') |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('intialize() Error: ' + str(e)) |
| |
| def setNetworkName(self, networkName='GRL'): |
| """set Thread Network name |
| |
| Args: |
| networkName: the networkname string to be set |
| |
| Returns: |
| True: successful to set the Thread Networkname |
| False: fail to set the Thread Networkname |
| """ |
| print('%s call setNetworkName' % self.port) |
| |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop -s Network:Name %s' % networkName |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName %s' % networkName |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setNetworkName() Error: ' + str(e)) |
| |
| def setChannel(self, channel=15): |
| """set channel of Thread device operates on. |
| |
| Args: |
| channel: |
| (0 - 10: Reserved) |
| (11 - 26: 2.4GHz channels) |
| (27 - 65535: Reserved) |
| |
| Returns: |
| True: successful to set the channel |
| False: fail to set the channel |
| """ |
| print('%s call setChannel' % self.port) |
| |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop NCP:Channel %s' % channel |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel %s' % channel |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setChannel() Error: ' + str(e)) |
| |
| def getChannel(self): |
| """get current channel""" |
| print('%s call getChannel' % self.port) |
| return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:Channel')[0] |
| |
| def setMAC(self, xEUI): |
| """set the extended addresss of Thread device |
| |
| Args: |
| xEUI: extended address in hex format |
| |
| Returns: |
| True: successful to set the extended address |
| False: fail to set the extended address |
| """ |
| print('%s call setMAC' % self.port) |
| |
| address64 = '' |
| try: |
| if not xEUI: |
| address64 = self.mac |
| |
| if not isinstance(xEUI, str): |
| address64 = self.__convertLongToHex(xEUI, 16) |
| else: |
| address64 = xEUI |
| |
| cmd = self.wpan_cmd_prefix + 'setprop NCP:MACAddress %s' % address64 |
| |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| self.mac = address64 |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setMAC() Error: ' + str(e)) |
| |
| def getMAC(self, bType=MacType.RandomMac): |
| """get one specific type of MAC address |
| currently OpenThreadWpan only supports Random MAC address |
| |
| Args: |
| bType: indicate which kind of MAC address is required |
| |
| Returns: |
| specific type of MAC address |
| """ |
| print('%s call getMAC' % self.port) |
| |
| # if power down happens, return extended address assigned previously |
| if self.isPowerDown: |
| macAddr64 = self.mac |
| else: |
| if bType == MacType.FactoryMac: |
| macAddr64 = self.__stripValue( |
| self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:HardwareAddress')[0] |
| ) |
| elif bType == MacType.HashMac: |
| macAddr64 = self.__stripValue( |
| self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:MACAddress')[0] |
| ) |
| else: |
| macAddr64 = self.__stripValue( |
| self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:ExtendedAddress')[0] |
| ) |
| |
| return int(macAddr64, 16) |
| |
| def getLL64(self): |
| """get link local unicast IPv6 address""" |
| print('%s call getLL64' % self.port) |
| return self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:LinkLocalAddress')[0]) |
| |
| def getMLEID(self): |
| """get mesh local endpoint identifier address""" |
| print('%s call getMLEID' % self.port) |
| return self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalAddress')[0]) |
| |
| def getRloc16(self): |
| """get rloc16 short address""" |
| print('%s call getRloc16' % self.port) |
| rloc16 = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RLOC16')[0] |
| return int(rloc16, 16) |
| |
| def getRloc(self): |
| """get router locator unicast IPv6 address""" |
| print('%s call getRloc' % self.port) |
| prefix = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalPrefix')[0]) |
| mlprefix = prefix.split('/')[0] |
| rloc16 = self.__lstrip0x(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:RLOC16')[0]) |
| |
| print('prefix: %s' % prefix) |
| print('mlprefix: %s ' % mlprefix) |
| print('rloc16: %s' % rloc16) |
| |
| rloc = self.__padIp6Addr(mlprefix + '00ff:fe00:' + rloc16) |
| print('rloc: %s' % rloc) |
| return rloc |
| |
| def getGlobal(self): |
| """get global unicast IPv6 address set |
| if configuring multiple entries |
| """ |
| print('%s call getGlobal' % self.port) |
| globalAddrs = [] |
| |
| mleid = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:MeshLocalAddress')[0]) |
| |
| mleid = ModuleHelper.GetFullIpv6Address(mleid).lower() |
| |
| addrs = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v IPv6:AllAddresses') |
| |
| # find rloc address firstly as a reference for current mesh local prefix as for some TCs, |
| # mesh local prefix may be updated through pending dataset management. |
| for ip6AddrItem in addrs: |
| if re.match(r'\[|\]', ip6AddrItem): |
| continue |
| if re.match(self.prompt, ip6AddrItem, re.M | re.I): |
| break |
| ip6AddrItem = ip6AddrItem.strip() |
| ip6Addr = self.__stripValue(ip6AddrItem).split(' ')[0] |
| |
| fullIp = ModuleHelper.GetFullIpv6Address(ip6Addr).lower() |
| |
| print('fullip %s' % fullIp) |
| |
| if fullIp.startswith('fe80'): |
| continue |
| |
| if fullIp.startswith(mleid[0:19]): |
| continue |
| |
| print('global') |
| globalAddrs.append(fullIp) |
| |
| return globalAddrs |
| |
| def setNetworkKey(self, key): |
| """set Thread Network master key |
| |
| Args: |
| key: Thread Network master key used in secure the MLE/802.15.4 packet |
| |
| Returns: |
| True: successful to set the Thread Network master key |
| False: fail to set the Thread Network master key |
| """ |
| masterKey = '' |
| print('%s call setNetworkKey' % self.port) |
| |
| try: |
| if not isinstance(key, str): |
| masterKey = self.__convertLongToHex(key, 32) |
| cmd = self.wpan_cmd_prefix + 'setprop Network:Key %s' % masterKey |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:MasterKey %s' % masterKey |
| else: |
| masterKey = key |
| cmd = self.wpan_cmd_prefix + 'setprop Network:Key %s' % masterKey |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:MasterKey %s' % masterKey |
| |
| self.networkKey = masterKey |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setNetworkkey() Error: ' + str(e)) |
| |
| def addBlockedMAC(self, xEUI): |
| """add a given extended address to the blacklist entry |
| |
| Args: |
| xEUI: extended address in hex format |
| |
| Returns: |
| True: successful to add a given extended address to the blacklist entry |
| False: fail to add a given extended address to the blacklist entry |
| """ |
| print('%s call addBlockedMAC' % self.port) |
| print(xEUI) |
| if isinstance(xEUI, str): |
| macAddr = xEUI |
| else: |
| macAddr = self.__convertLongToHex(xEUI) |
| |
| try: |
| # if blocked device is itself |
| if macAddr == self.mac: |
| print('block device itself') |
| return True |
| |
| if self._addressfilterMode != 'blacklist': |
| if self.__setAddressfilterMode('Blacklist'): |
| self._addressfilterMode = 'blacklist' |
| |
| cmd = self.wpan_cmd_prefix + 'insert MAC:Blacklist:Entries %s' % macAddr |
| ret = self.__sendCommand(cmd)[0] != 'Fail' |
| |
| self._addressfilterSet.add(macAddr) |
| print('current blacklist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| |
| return ret |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('addBlockedMAC() Error: ' + str(e)) |
| |
| def addAllowMAC(self, xEUI): |
| """add a given extended address to the whitelist addressfilter |
| |
| Args: |
| xEUI: a given extended address in hex format |
| |
| Returns: |
| True: successful to add a given extended address to the whitelist entry |
| False: fail to add a given extended address to the whitelist entry |
| """ |
| print('%s call addAllowMAC' % self.port) |
| print(xEUI) |
| if isinstance(xEUI, str): |
| macAddr = xEUI |
| else: |
| macAddr = self.__convertLongToHex(xEUI) |
| |
| try: |
| if self._addressfilterMode != 'whitelist': |
| if self.__setAddressfilterMode('Whitelist'): |
| self._addressfilterMode = 'whitelist' |
| |
| cmd = self.wpan_cmd_prefix + 'insert MAC:Whitelist:Entries %s' % macAddr |
| ret = self.__sendCommand(cmd)[0] != 'Fail' |
| |
| self._addressfilterSet.add(macAddr) |
| print('current whitelist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| return ret |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('addAllowMAC() Error: ' + str(e)) |
| |
| def clearBlockList(self): |
| """clear all entries in blacklist table |
| |
| Returns: |
| True: successful to clear the blacklist |
| False: fail to clear the blacklist |
| """ |
| print('%s call clearBlockList' % self.port) |
| |
| # remove all entries in blacklist |
| try: |
| print('clearing blacklist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| |
| # disable blacklist |
| if self.__setAddressfilterMode('disable'): |
| self._addressfilterMode = 'disable' |
| # clear ops |
| cmd = self.wpan_cmd_prefix + 'remove MAC:Blocklist:Entries' |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| self._addressfilterSet.clear() |
| return True |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('clearBlockList() Error: ' + str(e)) |
| |
| def clearAllowList(self): |
| """clear all entries in whitelist table |
| |
| Returns: |
| True: successful to clear the whitelist |
| False: fail to clear the whitelist |
| """ |
| print('%s call clearAllowList' % self.port) |
| |
| # remove all entries in whitelist |
| try: |
| print('clearing whitelist entries:') |
| for addr in self._addressfilterSet: |
| print(addr) |
| |
| # disable whitelist |
| if self.__setAddressfilterMode('disable'): |
| self._addressfilterMode = 'disable' |
| # clear ops |
| cmd = self.wpan_cmd_prefix + 'insert MAC:Whitelist:Entries' |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| self._addressfilterSet.clear() |
| return True |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('clearAllowList() Error: ' + str(e)) |
| |
| def getDeviceRole(self): |
| """get current device role in Thread Network""" |
| print('%s call getDeviceRole' % self.port) |
| return self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:NodeType')[0]) |
| |
| def joinNetwork(self, eRoleId): |
| """make device ready to join the Thread Network with a given role |
| |
| Args: |
| eRoleId: a given device role id |
| |
| Returns: |
| True: ready to set Thread Network parameter for joining desired Network |
| """ |
| print('%s call joinNetwork' % self.port) |
| print(eRoleId) |
| |
| self.deviceRole = eRoleId |
| mode = 15 |
| try: |
| if ModuleHelper.LeaderDutChannelFound: |
| self.channel = ModuleHelper.Default_Channel |
| |
| # FIXME: when Harness call setNetworkDataRequirement()? |
| # only sleep end device requires stable networkdata now |
| if eRoleId == Thread_Device_Role.Leader: |
| print('join as leader') |
| # rsdn |
| mode = 15 |
| if self.AutoDUTEnable is False: |
| # set ROUTER_DOWNGRADE_THRESHOLD |
| self.__setRouterDowngradeThreshold(33) |
| elif eRoleId == Thread_Device_Role.Router: |
| print('join as router') |
| # rsdn |
| mode = 15 |
| if self.AutoDUTEnable is False: |
| # set ROUTER_DOWNGRADE_THRESHOLD |
| self.__setRouterDowngradeThreshold(33) |
| elif eRoleId == Thread_Device_Role.SED: |
| print('join as sleepy end device') |
| # s |
| mode = 4 |
| self.__setPollPeriod(self.__sedPollPeriod) |
| elif eRoleId == Thread_Device_Role.EndDevice: |
| print('join as end device') |
| # rsn |
| mode = 13 |
| elif eRoleId == Thread_Device_Role.REED: |
| print('join as REED') |
| # rsdn |
| mode = 15 |
| # set ROUTER_UPGRADE_THRESHOLD |
| self.__setRouterUpgradeThreshold(0) |
| elif eRoleId == Thread_Device_Role.EndDevice_FED: |
| # always remain an ED, never request to be a router |
| print('join as FED') |
| # rsdn |
| mode = 15 |
| # set ROUTER_UPGRADE_THRESHOLD |
| self.__setRouterUpgradeThreshold(0) |
| elif eRoleId == Thread_Device_Role.EndDevice_MED: |
| print('join as MED') |
| # rsn |
| mode = 13 |
| else: |
| pass |
| |
| # set Thread device mode with a given role |
| self.__setDeviceMode(mode) |
| time.sleep(0.1) |
| # start OpenThreadWpan |
| self.__startOpenThreadWpan() |
| time.sleep(3) |
| return True |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('joinNetwork() Error: ' + str(e)) |
| |
| def getNetworkFragmentID(self): |
| """get current partition id of Thread Network Partition from LeaderData |
| |
| Returns: |
| The Thread network Partition Id |
| """ |
| print('%s call getNetworkFragmentID' % self.port) |
| if not self.____isOpenThreadWpanRunning(): |
| print('OpenThreadWpan is not running') |
| return None |
| |
| return self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:PartitionId')[0] |
| |
| def getParentAddress(self): |
| """get Thread device's parent extended address and rloc16 short address |
| |
| Returns: |
| The extended address of parent in hex format |
| """ |
| print('%s call getParentAddress' % self.port) |
| parentInfo = [] |
| parentInfo = self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:Parent')).split( |
| ' ' |
| ) |
| |
| return parentInfo[0] |
| |
| def powerDown(self): |
| """power down the OpenThreadWpan""" |
| print('%s call powerDown' % self.port) |
| if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset false')[0] != 'Fail': |
| time.sleep(0.5) |
| if self.__sendCommand(self.wpan_cmd_prefix + 'reset')[0] != 'Fail': |
| self.isPowerDown = True |
| return True |
| else: |
| return False |
| else: |
| return False |
| |
| def powerUp(self): |
| """power up the Thread device""" |
| print('%s call powerUp' % self.port) |
| if not self.handle: |
| self._connect() |
| |
| self.isPowerDown = False |
| if self.__sendCommand(self.wpan_cmd_prefix + 'attach')[0] != 'Fail': |
| time.sleep(3) |
| else: |
| return False |
| |
| if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset true')[0] == 'Fail': |
| return False |
| |
| if self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) != 'associated': |
| print('powerUp failed') |
| return False |
| else: |
| return True |
| |
| def reboot(self): |
| """reset and rejoin to Thread Network without any timeout |
| |
| Returns: |
| True: successful to reset and rejoin the Thread Network |
| False: fail to reset and rejoin the Thread Network |
| """ |
| print('%s call reboot' % self.port) |
| try: |
| self._sendline(self.wpan_cmd_prefix + 'reset') |
| self.isPowerDown = True |
| |
| if self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0] != 'associated': |
| print('[FAIL] reboot') |
| return False |
| else: |
| return True |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('reboot() Error: ' + str(e)) |
| |
| def ping(self, destination, length=20): |
| """ send ICMPv6 echo request with a given length to a unicast destination |
| address |
| |
| Args: |
| destination: the unicast destination address of ICMPv6 echo request |
| length: the size of ICMPv6 echo request payload |
| """ |
| print('%s call ping' % self.port) |
| print('destination: %s' % destination) |
| try: |
| cmd = 'ping %s -c 1 -s %s -I %s' % (destination, str(length), self.wpan_interface) |
| if self._is_net: |
| ssh_stdin, ssh_stdout, ssh_stderr = self.handle.exec_command(cmd) |
| else: |
| self._sendline(cmd) |
| self._expect(cmd) |
| # wait echo reply |
| time.sleep(1) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('ping() Error: ' + str(e)) |
| |
| def multicast_Ping(self, destination, length=20): |
| """send ICMPv6 echo request with a given length to a multicast destination |
| address |
| |
| Args: |
| destination: the multicast destination address of ICMPv6 echo request |
| length: the size of ICMPv6 echo request payload |
| """ |
| print('%s call multicast_Ping' % self.port) |
| print('destination: %s' % destination) |
| try: |
| cmd = 'ping %s -c 1 -s %s -I %s' % (destination, str(length), self.wpan_interface) |
| if self._is_net: |
| ssh_stdin, ssh_stdout, ssh_stderr = self.handle.exec_command(cmd) |
| else: |
| self._sendline(cmd) |
| self._expect(cmd) |
| # wait echo reply |
| time.sleep(1) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('multicast_ping() Error: ' + str(e)) |
| |
| def getVersionNumber(self): |
| """get OpenThreadWpan stack firmware version number""" |
| print('%s call getVersionNumber' % self.port) |
| versionStr = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:Version')[0] |
| |
| return self.__stripValue(versionStr) |
| |
| def setPANID(self, xPAN): |
| """set Thread Network PAN ID |
| |
| Args: |
| xPAN: a given PAN ID in hex format |
| |
| Returns: |
| True: successful to set the Thread Network PAN ID |
| False: fail to set the Thread Network PAN ID |
| """ |
| print('%s call setPANID' % self.port) |
| print(xPAN) |
| panid = '' |
| try: |
| if not isinstance(xPAN, str): |
| panid = str(hex(xPAN)) |
| print(panid) |
| |
| cmd = self.wpan_cmd_prefix + 'setprop -s Network:PANID %s' % panid |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId %s' % panid |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setPANID() Error: ' + str(e)) |
| |
| def reset(self): |
| """factory reset""" |
| print('%s call reset' % self.port) |
| try: |
| if self._is_net: |
| self.__sendCommand(self.wpan_cmd_prefix + 'leave') |
| else: |
| self._sendline(self.wpan_cmd_prefix + 'leave') |
| |
| self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase') |
| time.sleep(2) |
| if not self._is_net: |
| self._read() |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('reset() Error: ' + str(e)) |
| |
| def removeRouter(self, xRouterId): |
| """kick router with a given router id from the Thread Network |
| |
| Args: |
| xRouterId: a given router id in hex format |
| |
| Returns: |
| True: successful to remove the router from the Thread Network |
| False: fail to remove the router from the Thread Network |
| """ |
| print('%s call removeRouter' % self.port) |
| print(xRouterId) |
| routerId = '' |
| routerId = self.__convertRlocToRouterId(xRouterId) |
| print(routerId) |
| |
| if routerId is None: |
| print('no matched xRouterId') |
| return False |
| |
| try: |
| cmd = 'releaserouterid %s' % routerId |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('removeRouter() Error: ' + str(e)) |
| |
| def setDefaultValues(self): |
| """set default mandatory Thread Network parameter value""" |
| print('%s call setDefaultValues' % self.port) |
| |
| # initialize variables |
| self.networkName = ModuleHelper.Default_NwkName |
| self.networkKey = ModuleHelper.Default_NwkKey |
| self.channel = ModuleHelper.Default_Channel |
| self.channelMask = '0x7fff800' # (0xffff << 11) |
| self.panId = ModuleHelper.Default_PanId |
| self.xpanId = ModuleHelper.Default_XpanId |
| self.meshLocalPrefix = ModuleHelper.Default_MLPrefix |
| # OT only accept hex format PSKc for now |
| self.pskc = '00000000000000000000000000000000' |
| self.securityPolicySecs = ModuleHelper.Default_SecurityPolicy |
| self.securityPolicyFlags = 'onrcb' |
| self.activetimestamp = ModuleHelper.Default_ActiveTimestamp |
| # self.sedPollingRate = ModuleHelper.Default_Harness_SED_Polling_Rate |
| self.__sedPollPeriod = 3 * 1000 # in milliseconds |
| self.deviceRole = None |
| self.provisioningUrl = '' |
| self.hasActiveDatasetToCommit = False |
| self.logThread = Queue() |
| self.logThreadStatus = self.logStatus['stop'] |
| # indicate Thread device requests full or stable network data |
| self.networkDataRequirement = '' |
| # indicate if Thread device experiences a power down event |
| self.isPowerDown = False |
| # indicate AddressFilter mode ['disable', 'whitelist', 'blacklist'] |
| self._addressfilterMode = 'disable' |
| self._addressfilterSet = set() # cache filter entries |
| # indicate if Thread device is an active commissioner |
| self.isActiveCommissioner = False |
| self._lines = None # buffered lines read from device |
| |
| # initialize device configuration |
| try: |
| self.setMAC(self.mac) |
| self.__setChannelMask(self.channelMask) |
| self.__setSecurityPolicy(self.securityPolicySecs, self.securityPolicyFlags) |
| self.setChannel(self.channel) |
| self.setPANID(self.panId) |
| self.setXpanId(self.xpanId) |
| self.setNetworkName(self.networkName) |
| self.setNetworkKey(self.networkKey) |
| self.setMLPrefix(self.meshLocalPrefix) |
| self.setPSKc(self.pskc) |
| self.setActiveTimestamp(self.activetimestamp) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setDefaultValue() Error: ' + str(e)) |
| |
| def getDeviceConncetionStatus(self): |
| """check if serial port connection is ready or not""" |
| print('%s call getDeviceConnectionStatus' % self.port) |
| return self.deviceConnected |
| |
| def setPollingRate(self, iPollingRate): |
| """set data polling rate for sleepy end device |
| |
| Args: |
| iPollingRate: data poll period of sleepy end device (in seconds) |
| |
| Returns: |
| True: successful to set the data polling rate for sleepy end device |
| False: fail to set the data polling rate for sleepy end device |
| """ |
| print('%s call setPollingRate' % self.port) |
| |
| iPollingRate = int(iPollingRate * 1000) |
| print(iPollingRate) |
| |
| if self.__sedPollPeriod != iPollingRate: |
| self.__sedPollPeriod = iPollingRate |
| |
| # apply immediately |
| if self.__isOpenThreadWpanRunning(): |
| return self.__setPollPeriod(self.__sedPollPeriod) |
| |
| return True |
| |
| def __setPollPeriod(self, iPollPeriod): |
| """set data poll period for sleepy end device |
| |
| Args: |
| iPollPeriod: data poll period of sleepy end device (in milliseconds) |
| |
| Returns: |
| True: successful to set the data poll period for sleepy end device |
| False: fail to set the data poll period for sleepy end device |
| """ |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop NCP:SleepyPollInterval %s' % str(iPollPeriod) |
| print(cmd) |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('__setPollingRate() Error: ' + str(e)) |
| |
| def setLinkQuality(self, EUIadr, LinkQuality): |
| """set custom LinkQualityIn for all receiving messages from the specified EUIadr |
| |
| Args: |
| EUIadr: a given extended address |
| LinkQuality: a given custom link quality |
| link quality/link margin mapping table |
| 3: 21 - 255 (dB) |
| 2: 11 - 20 (dB) |
| 1: 3 - 9 (dB) |
| 0: 0 - 2 (dB) |
| |
| Returns: |
| True: successful to set the link quality |
| False: fail to set the link quality |
| |
| @todo: required if as reference device |
| """ |
| |
| def setOutBoundLinkQuality(self, LinkQuality): |
| """set custom LinkQualityIn for all receiving messages from the any address |
| |
| Args: |
| LinkQuality: a given custom link quality |
| link quality/link margin mapping table |
| 3: 21 - 255 (dB) |
| 2: 11 - 20 (dB) |
| 1: 3 - 9 (dB) |
| 0: 0 - 2 (dB) |
| |
| Returns: |
| True: successful to set the link quality |
| False: fail to set the link quality |
| |
| @todo: required if as reference device |
| """ |
| |
| def removeRouterPrefix(self, prefixEntry): |
| """remove the configured prefix on a border router |
| |
| Args: |
| prefixEntry: a on-mesh prefix entry |
| |
| Returns: |
| True: successful to remove the prefix entry from border router |
| False: fail to remove the prefix entry from border router |
| |
| @todo: required if as reference device |
| """ |
| |
| def resetAndRejoin(self, timeout): |
| """reset and join back Thread Network with a given timeout delay |
| |
| Args: |
| timeout: a timeout interval before rejoin Thread Network |
| |
| Returns: |
| True: successful to reset and rejoin Thread Network |
| False: fail to reset and rejoin the Thread Network |
| """ |
| print('%s call resetAndRejoin' % self.port) |
| print(timeout) |
| try: |
| if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset false')[0] != 'Fail': |
| time.sleep(0.5) |
| if self.__sendCommand(self.wpan_cmd_prefix + 'reset')[0] != 'Fail': |
| self.isPowerDown = True |
| else: |
| return False |
| else: |
| return False |
| time.sleep(timeout) |
| |
| if self.deviceRole == Thread_Device_Role.SED: |
| self.__setPollPeriod(self.__sedPollPeriod) |
| |
| if self.__sendCommand(self.wpan_cmd_prefix + 'attach')[0] != 'Fail': |
| time.sleep(3) |
| else: |
| return False |
| |
| if self.__sendCommand(self.wpan_cmd_prefix + 'setprop Daemon:AutoAssociateAfterReset true')[0] == 'Fail': |
| return False |
| |
| if self.__stripValue(self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v NCP:State')[0]) != 'associated': |
| print('[FAIL] reset and rejoin') |
| return False |
| return True |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('resetAndRejoin() Error: ' + str(e)) |
| |
| def configBorderRouter( |
| self, P_Prefix, P_stable=1, P_default=1, P_slaac_preferred=0, P_Dhcp=0, P_preference=0, P_on_mesh=1, P_nd_dns=0 |
| ): |
| """configure the border router with a given prefix entry parameters |
| |
| Args: |
| P_Prefix: IPv6 prefix that is available on the Thread Network |
| P_stable: true if the default router is expected to be stable network data |
| P_default: true if border router offers the default route for P_Prefix |
| P_slaac_preferred: true if allowing auto-configure address using P_Prefix |
| P_Dhcp: is true if border router is a DHCPv6 Agent |
| P_preference: is two-bit signed integer indicating router preference |
| P_on_mesh: is true if P_Prefix is considered to be on-mesh |
| P_nd_dns: is true if border router is able to supply DNS information obtained via ND |
| |
| Returns: |
| True: successful to configure the border router with a given prefix entry |
| False: fail to configure the border router with a given prefix entry |
| """ |
| print('%s call configBorderRouter' % self.port) |
| prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix)) |
| print(prefix) |
| try: |
| parameter = '' |
| |
| if P_slaac_preferred == 1: |
| parameter += ' -a -f' |
| |
| if P_stable == 1: |
| parameter += ' -s' |
| |
| if P_default == 1: |
| parameter += ' -r' |
| |
| if P_Dhcp == 1: |
| parameter += ' -d' |
| |
| if P_on_mesh == 1: |
| parameter += ' -o' |
| |
| cmd = self.wpan_cmd_prefix + 'add-prefix %s %s -P %d' % (prefix, parameter, P_preference) |
| print(parameter) |
| print(cmd) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('configBorderRouter() Error: ' + str(e)) |
| |
| def setNetworkIDTimeout(self, iNwkIDTimeOut): |
| """set networkid timeout for OpenThreadWpan |
| |
| Args: |
| iNwkIDTimeOut: a given NETWORK_ID_TIMEOUT |
| |
| Returns: |
| True: successful to set NETWORK_ID_TIMEOUT |
| False: fail to set NETWORK_ID_TIMEOUT |
| |
| @todo: required if as reference device |
| """ |
| |
| def setKeepAliveTimeOut(self, iTimeOut): |
| """set keep alive timeout for device |
| has been deprecated and also set SED polling rate |
| |
| Args: |
| iTimeOut: data poll period for sleepy end device |
| |
| Returns: |
| True: successful to set the data poll period for SED |
| False: fail to set the data poll period for SED |
| """ |
| print('%s call setKeepAliveTimeOut' % self.port) |
| print(iTimeOut) |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop NCP:SleepyPollInterval %s' % str(iTimeOut * 1000) |
| print(cmd) |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setKeepAliveTimeOut() Error: ' + str(e)) |
| |
| def setKeySequenceCounter(self, iKeySequenceValue): |
| """ set the Key sequence counter corresponding to Thread Network master key |
| |
| Args: |
| iKeySequenceValue: key sequence value |
| |
| Returns: |
| True: successful to set the key sequence |
| False: fail to set the key sequence |
| """ |
| print('%s call setKeySequenceCounter' % self.port) |
| print(iKeySequenceValue) |
| try: |
| # avoid key switch guard timer protection for reference device |
| self.__setKeySwitchGuardTime(0) |
| |
| cmd = self.wpan_cmd_prefix + 'setprop Network:KeyIndex %s' % str(iKeySequenceValue) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| time.sleep(1) |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setKeySequenceCounter() Error: ' + str(e)) |
| |
| def getKeySequenceCounter(self): |
| """get current Thread Network key sequence""" |
| print('%s call getKeySequenceCounter' % self.port) |
| keySequence = '' |
| keySequence = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Network:KeyIndex')[0] |
| return keySequence |
| |
| def incrementKeySequenceCounter(self, iIncrementValue=1): |
| """increment the key sequence with a given value |
| |
| Args: |
| iIncrementValue: specific increment value to be added |
| |
| Returns: |
| True: successful to increment the key sequence with a given value |
| False: fail to increment the key sequence with a given value |
| """ |
| print('%s call incrementKeySequenceCounter' % self.port) |
| print(iIncrementValue) |
| currentKeySeq = '' |
| try: |
| # avoid key switch guard timer protection for reference device |
| self.__setKeySwitchGuardTime(0) |
| |
| currentKeySeq = self.getKeySequenceCounter() |
| keySequence = int(currentKeySeq, 10) + iIncrementValue |
| print(keySequence) |
| return self.setKeySequenceCounter(keySequence) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('incrementKeySequenceCounter() Error: ' + str(e)) |
| |
| def setNetworkDataRequirement(self, eDataRequirement): |
| """set whether the Thread device requires the full network data |
| or only requires the stable network data |
| |
| Args: |
| eDataRequirement: is true if requiring the full network data |
| |
| Returns: |
| True: successful to set the network requirement |
| """ |
| print('%s call setNetworkDataRequirement' % self.port) |
| print(eDataRequirement) |
| |
| if eDataRequirement == Device_Data_Requirement.ALL_DATA: |
| self.networkDataRequirement = 'n' |
| return True |
| |
| def configExternalRouter(self, P_Prefix, P_stable, R_Preference=0): |
| """configure border router with a given external route prefix entry |
| |
| Args: |
| P_Prefix: IPv6 prefix for the route |
| P_Stable: is true if the external route prefix is stable network data |
| R_Preference: a two-bit signed integer indicating Router preference |
| 1: high |
| 0: medium |
| -1: low |
| |
| Returns: |
| True: successful to configure the border router with a given external route prefix |
| False: fail to configure the border router with a given external route prefix |
| """ |
| print('%s call configExternalRouter' % self.port) |
| print(P_Prefix) |
| prefix = self.__convertIp6PrefixStringToIp6Address(str(P_Prefix)) |
| try: |
| if P_stable: |
| cmd = self.wpan_cmd_prefix + 'add-route %s -l 64 -p %d' % (prefix, R_Preference) |
| else: |
| cmd = self.wpan_cmd_prefix + 'add-route %s -l 64 -p %d -n' % (prefix, R_Preference) |
| print(cmd) |
| |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('configExternalRouter() Error: ' + str(e)) |
| |
| def getNeighbouringRouters(self): |
| """get neighboring routers information |
| |
| Returns: |
| neighboring routers' extended address |
| |
| @todo: required if as reference device |
| """ |
| |
| def getChildrenInfo(self): |
| """get all children information |
| |
| Returns: |
| children's extended address |
| |
| @todo: required if as reference device |
| """ |
| |
| def setXpanId(self, xPanId): |
| """set extended PAN ID of Thread Network |
| |
| Args: |
| xPanId: extended PAN ID in hex format |
| |
| Returns: |
| True: successful to set the extended PAN ID |
| False: fail to set the extended PAN ID |
| """ |
| xpanid = '' |
| print('%s call setXpanId' % self.port) |
| print(xPanId) |
| try: |
| if not isinstance(xPanId, str): |
| xpanid = self.__convertLongToHex(xPanId, 16) |
| |
| cmd = self.wpan_cmd_prefix + 'setprop Network:XPANID %s' % xpanid |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId %s' % xpanid |
| else: |
| xpanid = xPanId |
| cmd = self.wpan_cmd_prefix + 'setprop Network:XPANID %s' % xpanid |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId %s' % xpanid |
| |
| self.xpanId = xpanid |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setXpanId() Error: ' + str(e)) |
| |
| def getNeighbouringDevices(self): |
| """gets the neighboring devices' extended address to compute the DUT |
| extended address automatically |
| |
| Returns: |
| A list including extended address of neighboring routers, parent |
| as well as children |
| """ |
| print('%s call getNeighbouringDevices' % self.port) |
| neighbourList = [] |
| |
| # get parent info |
| parentAddr = self.getParentAddress() |
| if parentAddr != 0: |
| neighbourList.append(parentAddr) |
| |
| # get ED/SED children info |
| childNeighbours = self.getChildrenInfo() |
| if childNeighbours is not None and len(childNeighbours) > 0: |
| for entry in childNeighbours: |
| neighbourList.append(entry) |
| |
| # get neighboring routers info |
| routerNeighbours = self.getNeighbouringRouters() |
| if routerNeighbours is not None and len(routerNeighbours) > 0: |
| for entry in routerNeighbours: |
| neighbourList.append(entry) |
| |
| print(neighbourList) |
| return neighbourList |
| |
| def setPartationId(self, partationId): |
| """set Thread Network Partition ID |
| |
| Args: |
| partitionId: partition id to be set by leader |
| |
| Returns: |
| True: successful to set the Partition ID |
| False: fail to set the Partition ID |
| """ |
| print('%s call setPartationId' % self.port) |
| print(partationId) |
| |
| cmd = self.wpan_cmd_prefix + 'setprop Network:PartitionId %s' % (str(hex(partationId)).rstrip('L')) |
| print(cmd) |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| def getGUA(self, filterByPrefix=None): |
| """get expected global unicast IPv6 address of OpenThreadWpan |
| |
| note: existing filterByPrefix are string of in lowercase. e.g. |
| '2001' or '2001:0db8:0001:0000". |
| |
| Args: |
| filterByPrefix: a given expected global IPv6 prefix to be matched |
| |
| Returns: |
| a global IPv6 address |
| """ |
| print('%s call getGUA' % self.port) |
| print(filterByPrefix) |
| globalAddrs = [] |
| try: |
| # get global addrs set if multiple |
| globalAddrs = self.getGlobal() |
| |
| if filterByPrefix is None: |
| return self.globalAddrs[0] |
| else: |
| for fullIp in globalAddrs: |
| if fullIp.startswith(filterByPrefix): |
| print('target global %s' % fullIp) |
| return fullIp |
| print('no global address matched') |
| return str(globalAddrs[0]) |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('getGUA() Error: ' + str(e)) |
| return e |
| |
| def getShortAddress(self): |
| """get Rloc16 short address of Thread device""" |
| print('%s call getShortAddress' % self.port) |
| return self.getRloc16() |
| |
| def getULA64(self): |
| """get mesh local EID of Thread device""" |
| print('%s call getULA64' % self.port) |
| return self.getMLEID() |
| |
| def setMLPrefix(self, sMeshLocalPrefix): |
| """set mesh local prefix""" |
| print('%s call setMLPrefix' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop IPv6:MeshLocalPrefix %s' % sMeshLocalPrefix |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix %s' % sMeshLocalPrefix |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setMLPrefix() Error: ' + str(e)) |
| |
| def getML16(self): |
| """get mesh local 16 unicast address (Rloc)""" |
| print('%s call getML16' % self.port) |
| return self.getRloc() |
| |
| def downgradeToDevice(self): |
| pass |
| |
| def upgradeToRouter(self): |
| pass |
| |
| def forceSetSlaac(self, slaacAddress): |
| """@todo : required if as reference device""" |
| |
| def setSleepyNodePollTime(self): |
| pass |
| |
| def enableAutoDUTObjectFlag(self): |
| """set AutoDUTenable flag""" |
| print('%s call enableAutoDUTObjectFlag' % self.port) |
| self.AutoDUTEnable = True |
| |
| def getChildTimeoutValue(self): |
| """get child timeout""" |
| print('%s call getChildTimeoutValue' % self.port) |
| childTimeout = self.__sendCommand(self.wpan_cmd_prefix + 'getprop -v Thread:ChildTimeout')[0] |
| return int(childTimeout) |
| |
| def diagnosticGet(self, strDestinationAddr, listTLV_ids=[]): |
| """@todo : required if as reference device""" |
| |
| def diagnosticQuery(self, strDestinationAddr, listTLV_ids=[]): |
| """@todo : required if as reference device""" |
| self.diagnosticGet(strDestinationAddr, listTLV_ids) |
| |
| def diagnosticReset(self, strDestinationAddr, listTLV_ids=[]): |
| """@todo : required if as reference device""" |
| |
| def startNativeCommissioner(self, strPSKc='GRLpassWord'): |
| # TODO: Support the whole Native Commissioner functionality |
| # Currently it only aims to trigger a Discovery Request message to pass |
| # Certification test 5.8.4 |
| print('%s call startNativeCommissioner' % self.port) |
| cmd = self.wpan_cmd_prefix + 'joiner --start %s' % (strPSKc) |
| print(cmd) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| return True |
| else: |
| return False |
| |
| def startCollapsedCommissioner(self): |
| """start Collapsed Commissioner |
| |
| Returns: |
| True: successful to start Commissioner |
| False: fail to start Commissioner |
| """ |
| print('%s call startCollapsedCommissioner' % self.port) |
| startCmd = self.wpan_cmd_prefix + 'form %s -c %s -T router' % (self.networkName, str(self.channel)) |
| if self.__sendCommand(startCmd) != 'Fail': |
| time.sleep(2) |
| cmd = self.wpan_cmd_prefix + 'commissioner start' |
| print(cmd) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| self.isActiveCommissioner = True |
| time.sleep(20) # time for petition process |
| return True |
| return False |
| |
| def setJoinKey(self, strPSKc): |
| pass |
| |
| def scanJoiner(self, xEUI='*', strPSKd='threadjpaketest'): |
| """scan Joiner |
| |
| Args: |
| xEUI: Joiner's EUI-64 |
| strPSKd: Joiner's PSKd for commissioning |
| |
| Returns: |
| True: successful to add Joiner's steering data |
| False: fail to add Joiner's steering data |
| """ |
| print('%s call scanJoiner' % self.port) |
| if not isinstance(xEUI, str): |
| eui64 = self.__convertLongToHex(xEUI, 16) |
| else: |
| eui64 = xEUI |
| |
| # long timeout value to avoid automatic joiner removal (in seconds) |
| timeout = 500 |
| |
| cmd = self.wpan_cmd_prefix + 'commissioner joiner-add %s %s %s' % (eui64, str(timeout), strPSKd) |
| print(cmd) |
| if not self.isActiveCommissioner: |
| self.startCollapsedCommissioner() |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| return True |
| else: |
| return False |
| |
| def setProvisioningUrl(self, strURL='grl.com'): |
| """set provisioning Url |
| |
| Args: |
| strURL: Provisioning Url string |
| |
| Returns: |
| True: successful to set provisioning Url |
| False: fail to set provisioning Url |
| """ |
| print('%s call setProvisioningUrl' % self.port) |
| self.provisioningUrl = strURL |
| if self.deviceRole == Thread_Device_Role.Commissioner: |
| cmd = self.wpan_cmd_prefix + 'setprop Commissioner:ProvisioningUrl %s' % (strURL) |
| print(cmd) |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| return True |
| |
| def allowCommission(self): |
| """start commissioner candidate petition process |
| |
| Returns: |
| True: successful to start commissioner candidate petition process |
| False: fail to start commissioner candidate petition process |
| """ |
| print('%s call allowCommission' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'commissioner start' |
| print(cmd) |
| if self.isActiveCommissioner: |
| return True |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| self.isActiveCommissioner = True |
| time.sleep(40) # time for petition process and at least one keep alive |
| return True |
| else: |
| return False |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('allowcommission() error: ' + str(e)) |
| |
| def joinCommissioned(self, strPSKd='threadjpaketest', waitTime=20): |
| """start joiner |
| |
| Args: |
| strPSKd: Joiner's PSKd |
| |
| Returns: |
| True: successful to start joiner |
| False: fail to start joiner |
| """ |
| print('%s call joinCommissioned' % self.port) |
| cmd = self.wpan_cmd_prefix + 'joiner --start %s %s' % (strPSKd, self.provisioningUrl) |
| print(cmd) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| if self.__getJoinerState(): |
| self.__sendCommand(self.wpan_cmd_prefix + 'joiner --attach') |
| time.sleep(30) |
| return True |
| else: |
| return False |
| else: |
| return False |
| |
| def getCommissioningLogs(self): |
| """get Commissioning logs |
| |
| Returns: |
| Commissioning logs |
| """ |
| rawLogs = self.logThread.get() |
| ProcessedLogs = [] |
| payload = [] |
| while not rawLogs.empty(): |
| rawLogEach = rawLogs.get() |
| print(rawLogEach) |
| if '[THCI]' not in rawLogEach: |
| continue |
| |
| EncryptedPacket = PlatformDiagnosticPacket() |
| infoList = rawLogEach.split('[THCI]')[1].split(']')[0].split('|') |
| for eachInfo in infoList: |
| print(eachInfo) |
| info = eachInfo.split('=') |
| infoType = info[0].strip() |
| infoValue = info[1].strip() |
| if 'direction' in infoType: |
| EncryptedPacket.Direction = ( |
| PlatformDiagnosticPacket_Direction.IN |
| if 'recv' in infoValue |
| else PlatformDiagnosticPacket_Direction.OUT |
| if 'send' in infoValue |
| else PlatformDiagnosticPacket_Direction.UNKNOWN |
| ) |
| elif 'type' in infoType: |
| EncryptedPacket.Type = ( |
| PlatformDiagnosticPacket_Type.JOIN_FIN_req |
| if 'JOIN_FIN.req' in infoValue |
| else PlatformDiagnosticPacket_Type.JOIN_FIN_rsp |
| if 'JOIN_FIN.rsp' in infoValue |
| else PlatformDiagnosticPacket_Type.JOIN_ENT_req |
| if 'JOIN_ENT.ntf' in infoValue |
| else PlatformDiagnosticPacket_Type.JOIN_ENT_rsp |
| if 'JOIN_ENT.rsp' in infoValue |
| else PlatformDiagnosticPacket_Type.UNKNOWN |
| ) |
| elif 'len' in infoType: |
| bytesInEachLine = 16 |
| EncryptedPacket.TLVsLength = int(infoValue) |
| payloadLineCount = (int(infoValue) + bytesInEachLine - 1) / bytesInEachLine |
| while payloadLineCount > 0: |
| payloadLineCount = payloadLineCount - 1 |
| payloadLine = rawLogs.get() |
| payloadSplit = payloadLine.split('|') |
| for block in range(1, 3): |
| payloadBlock = payloadSplit[block] |
| payloadValues = payloadBlock.split(' ') |
| for num in range(1, 9): |
| if '..' not in payloadValues[num]: |
| payload.append(int(payloadValues[num], 16)) |
| |
| EncryptedPacket.TLVs = PlatformPackets.read(EncryptedPacket.Type, payload) if payload != [] else [] |
| |
| ProcessedLogs.append(EncryptedPacket) |
| return ProcessedLogs |
| |
| def MGMT_ED_SCAN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod, xScanDuration): |
| """send MGMT_ED_SCAN message to a given destinaition. |
| |
| Args: |
| sAddr: IPv6 destination address for this message |
| xCommissionerSessionId: commissioner session id |
| listChannelMask: a channel array to indicate which channels to be scaned |
| xCount: number of IEEE 802.15.4 ED Scans (milliseconds) |
| xPeriod: Period between successive IEEE802.15.4 ED Scans (milliseconds) |
| xScanDuration: IEEE 802.15.4 ScanDuration to use when performing an IEEE 802.15.4 ED Scan (milliseconds) |
| |
| Returns: |
| True: successful to send MGMT_ED_SCAN message. |
| False: fail to send MGMT_ED_SCAN message |
| """ |
| print('%s call MGMT_ED_SCAN' % self.port) |
| channelMask = '' |
| channelMask = self.__ChannelMaskListToStr(listChannelMask) |
| try: |
| cmd = self.wpan_cmd_prefix + 'commissioner energy-scan %s %s %s %s %s' % ( |
| channelMask, |
| xCount, |
| xPeriod, |
| xScanDuration, |
| sAddr, |
| ) |
| print(cmd) |
| return self.__sendCommand(cmd) != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_ED_SCAN() error: ' + str(e)) |
| |
| def MGMT_PANID_QUERY(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId): |
| """send MGMT_PANID_QUERY message to a given destination |
| |
| Args: |
| xPanId: a given PAN ID to check the conflicts |
| |
| Returns: |
| True: successful to send MGMT_PANID_QUERY message. |
| False: fail to send MGMT_PANID_QUERY message. |
| """ |
| print('%s call MGMT_PANID_QUERY' % self.port) |
| panid = '' |
| channelMask = '' |
| channelMask = self.__ChannelMaskListToStr(listChannelMask) |
| |
| if not isinstance(xPanId, str): |
| panid = str(hex(xPanId)) |
| |
| try: |
| cmd = self.wpan_cmd_prefix + 'commissioner pan-id-query %s %s %s' % (panid, channelMask, sAddr) |
| print(cmd) |
| return self.__sendCommand(cmd) != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_PANID_QUERY() error: ' + str(e)) |
| |
| def MGMT_ANNOUNCE_BEGIN(self, sAddr, xCommissionerSessionId, listChannelMask, xCount, xPeriod): |
| """send MGMT_ANNOUNCE_BEGIN message to a given destination |
| |
| Returns: |
| True: successful to send MGMT_ANNOUNCE_BEGIN message. |
| False: fail to send MGMT_ANNOUNCE_BEGIN message. |
| """ |
| print('%s call MGMT_ANNOUNCE_BEGIN' % self.port) |
| channelMask = '' |
| channelMask = self.__ChannelMaskListToStr(listChannelMask) |
| try: |
| cmd = self.wpan_cmd_prefix + 'commissioner announce-begin %s %s %s %s' % ( |
| channelMask, |
| xCount, |
| xPeriod, |
| sAddr, |
| ) |
| print(cmd) |
| return self.__sendCommand(cmd) != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_ANNOUNCE_BEGIN() error: ' + str(e)) |
| |
| def MGMT_ACTIVE_GET(self, Addr='', TLVs=[]): |
| """send MGMT_ACTIVE_GET command |
| |
| Returns: |
| True: successful to send MGMT_ACTIVE_GET |
| False: fail to send MGMT_ACTIVE_GET |
| """ |
| print('%s call MGMT_ACTIVE_GET' % self.port) |
| try: |
| |
| cmd = self.wpan_cmd_prefix + 'dataset mgmt-get-active' |
| |
| if len(TLVs) != 0: |
| tlvs = ''.join('%02x' % tlv for tlv in TLVs) |
| setTLVCmd = self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' + tlvs |
| if self.__sendCommand(setTLVCmd)[0] == 'Fail': |
| return False |
| else: |
| if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': |
| return False |
| |
| if Addr != '': |
| setAddressCmd = self.wpan_cmd_prefix + 'setprop Dataset:DestIpAddress ' + Addr |
| if self.__sendCommand(setAddressCmd)[0] == 'Fail': |
| return False |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_ACTIVE_GET() Error: ' + str(e)) |
| |
| def MGMT_ACTIVE_SET( |
| self, |
| sAddr='', |
| xCommissioningSessionId=None, |
| listActiveTimestamp=None, |
| listChannelMask=None, |
| xExtendedPanId=None, |
| sNetworkName=None, |
| sPSKc=None, |
| listSecurityPolicy=None, |
| xChannel=None, |
| sMeshLocalPrefix=None, |
| xMasterKey=None, |
| xPanId=None, |
| xTmfPort=None, |
| xSteeringData=None, |
| xBorderRouterLocator=None, |
| BogusTLV=None, |
| xDelayTimer=None, |
| ): |
| """send MGMT_ACTIVE_SET command |
| |
| Returns: |
| True: successful to send MGMT_ACTIVE_SET |
| False: fail to send MGMT_ACTIVE_SET |
| """ |
| print('%s call MGMT_ACTIVE_SET' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'dataset mgmt-set-active' |
| |
| if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': |
| return False |
| |
| if listActiveTimestamp is not None: |
| sActiveTimestamp = '%016x' % listActiveTimestamp[0] |
| setActiveTimeCmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp ' + sActiveTimestamp |
| if self.__sendCommand(setActiveTimeCmd)[0] == 'Fail': |
| return False |
| |
| if xExtendedPanId is not None: |
| xpanid = self.__convertLongToHex(xExtendedPanId, 16) |
| setExtendedPanIdCmd = self.wpan_cmd_prefix + 'setprop Dataset:ExtendedPanId ' + xpanid |
| if self.__sendCommand(setExtendedPanIdCmd)[0] == 'Fail': |
| return False |
| |
| if sNetworkName is not None: |
| setNetworkNameCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName ' + str(sNetworkName) |
| if self.__sendCommand(setNetworkNameCmd)[0] == 'Fail': |
| return False |
| |
| if xChannel is not None: |
| setChannelCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel ' + str(xChannel) |
| if self.__sendCommand(setChannelCmd)[0] == 'Fail': |
| return False |
| |
| if sMeshLocalPrefix is not None: |
| setMLPrefixCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix ' + str(sMeshLocalPrefix) |
| if self.__sendCommand(setMLPrefixCmd)[0] == 'Fail': |
| return False |
| |
| if xMasterKey is not None: |
| key = self.__convertLongToHex(xMasterKey, 32) |
| setMasterKeyCmd = self.wpan_cmd_prefix + 'setprop Dataset:MasterKey ' + key |
| if self.__sendCommand(setMasterKeyCmd)[0] == 'Fail': |
| return False |
| |
| if xPanId is not None: |
| setPanIdCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId ' + str(xPanId) |
| if self.__sendCommand(setPanIdCmd)[0] == 'Fail': |
| return False |
| |
| if listChannelMask is not None: |
| setChannelMaskCmd = ( |
| self.wpan_cmd_prefix |
| + 'setprop Dataset:ChannelMaskPage0 ' |
| + '0x' |
| + self.__convertLongToHex(self.__convertChannelMask(listChannelMask)) |
| ) |
| if self.__sendCommand(setChannelMaskCmd)[0] == 'Fail': |
| return False |
| |
| if ( |
| sPSKc is not None |
| or listSecurityPolicy is not None |
| or xCommissioningSessionId is not None |
| or xTmfPort is not None |
| or xSteeringData is not None |
| or xBorderRouterLocator is not None |
| or BogusTLV is not None |
| ): |
| setRawTLVCmd = self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' |
| |
| if sPSKc is not None: |
| setRawTLVCmd += '0410' |
| stretchedPskc = Thread_PBKDF2.get(sPSKc, ModuleHelper.Default_XpanId, ModuleHelper.Default_NwkName) |
| pskc = '%x' % stretchedPskc |
| |
| if len(pskc) < 32: |
| pskc = pskc.zfill(32) |
| |
| setRawTLVCmd += pskc |
| |
| if listSecurityPolicy is not None: |
| setRawTLVCmd += '0c03' |
| |
| rotationTime = 0 |
| policyBits = 0 |
| |
| # previous passing way listSecurityPolicy=[True, True, 3600, |
| # False, False, True] |
| if len(listSecurityPolicy) == 6: |
| rotationTime = listSecurityPolicy[2] |
| |
| # the last three reserved bits must be 1 |
| policyBits = 0b00000111 |
| |
| if listSecurityPolicy[0]: |
| policyBits = policyBits | 0b10000000 |
| if listSecurityPolicy[1]: |
| policyBits = policyBits | 0b01000000 |
| if listSecurityPolicy[3]: |
| policyBits = policyBits | 0b00100000 |
| if listSecurityPolicy[4]: |
| policyBits = policyBits | 0b00010000 |
| if listSecurityPolicy[5]: |
| policyBits = policyBits | 0b00001000 |
| else: |
| # new passing way listSecurityPolicy=[3600, 0b11001111] |
| rotationTime = listSecurityPolicy[0] |
| policyBits = listSecurityPolicy[1] |
| |
| policy = str(hex(rotationTime))[2:] |
| |
| if len(policy) < 4: |
| policy = policy.zfill(4) |
| |
| setRawTLVCmd += policy |
| |
| setRawTLVCmd += str(hex(policyBits))[2:] |
| |
| if xCommissioningSessionId is not None: |
| setRawTLVCmd += '0b02' |
| sessionid = str(hex(xCommissioningSessionId))[2:] |
| |
| if len(sessionid) < 4: |
| sessionid = sessionid.zfill(4) |
| |
| setRawTLVCmd += sessionid |
| |
| if xBorderRouterLocator is not None: |
| setRawTLVCmd += '0902' |
| locator = str(hex(xBorderRouterLocator))[2:] |
| |
| if len(locator) < 4: |
| locator = locator.zfill(4) |
| |
| setRawTLVCmd += locator |
| |
| if xSteeringData is not None: |
| steeringData = self.__convertLongToHex(xSteeringData) |
| setRawTLVCmd += '08' + str(len(steeringData) / 2).zfill(2) |
| setRawTLVCmd += steeringData |
| |
| if BogusTLV is not None: |
| setRawTLVCmd += '8202aa55' |
| |
| print(setRawTLVCmd) |
| print(cmd) |
| |
| if self.__sendCommand(setRawTLVCmd)[0] == 'Fail': |
| return False |
| |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_ACTIVE_SET() Error: ' + str(e)) |
| |
| def MGMT_PENDING_GET(self, Addr='', TLVs=[]): |
| """send MGMT_PENDING_GET command |
| |
| Returns: |
| True: successful to send MGMT_PENDING_GET |
| False: fail to send MGMT_PENDING_GET |
| """ |
| print('%s call MGMT_PENDING_GET' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'dataset mgmt-get-pending' |
| |
| if len(TLVs) != 0: |
| tlvs = ''.join('%02x' % tlv for tlv in TLVs) |
| setTLVCmd = self.wpan_cmd_prefix + 'setprop Dataset:RawTlvs ' + tlvs |
| if self.__sendCommand(setTLVCmd)[0] == 'Fail': |
| return False |
| else: |
| if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': |
| return False |
| |
| if Addr != '': |
| setAddressCmd = self.wpan_cmd_prefix + 'setprop Dataset:DestIpAddress ' + Addr |
| if self.__sendCommand(setAddressCmd)[0] == 'Fail': |
| return False |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_PENDING_GET() Error: ' + str(e)) |
| |
| def MGMT_PENDING_SET( |
| self, |
| sAddr='', |
| xCommissionerSessionId=None, |
| listPendingTimestamp=None, |
| listActiveTimestamp=None, |
| xDelayTimer=None, |
| xChannel=None, |
| xPanId=None, |
| xMasterKey=None, |
| sMeshLocalPrefix=None, |
| sNetworkName=None, |
| ): |
| """send MGMT_PENDING_SET command |
| |
| Returns: |
| True: successful to send MGMT_PENDING_SET |
| False: fail to send MGMT_PENDING_SET |
| """ |
| print('%s call MGMT_PENDING_SET' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'dataset mgmt-set-pending' |
| if self.__sendCommand(self.wpan_cmd_prefix + 'dataset erase')[0] == 'Fail': |
| return False |
| |
| if listPendingTimestamp is not None: |
| sActiveTimestamp = '%016x' % listPendingTimestamp[0] |
| setPendingTimeCmd = self.wpan_cmd_prefix + 'setprop Dataset:PendingTimestamp ' + sActiveTimestamp |
| if self.__sendCommand(setPendingTimeCmd)[0] == 'Fail': |
| return False |
| |
| if listActiveTimestamp is not None: |
| sActiveTimestamp = '%016x' % listActiveTimestamp[0] |
| setActiveTimeCmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp ' + sActiveTimestamp |
| if self.__sendCommand(setActiveTimeCmd)[0] == 'Fail': |
| return False |
| |
| if xDelayTimer is not None: |
| setDelayTimerCmd = self.wpan_cmd_prefix + 'setprop Dataset:Delay ' + str(xDelayTimer) |
| if self.__sendCommand(setDelayTimerCmd)[0] == 'Fail': |
| return False |
| |
| if sNetworkName is not None: |
| setNetworkNameCmd = self.wpan_cmd_prefix + 'setprop Dataset:NetworkName ' + str(sNetworkName) |
| if self.__sendCommand(setNetworkNameCmd)[0] == 'Fail': |
| return False |
| |
| if xChannel is not None: |
| setChannelCmd = self.wpan_cmd_prefix + 'setprop Dataset:Channel ' + str(xChannel) |
| if self.__sendCommand(setChannelCmd)[0] == 'Fail': |
| return False |
| |
| if sMeshLocalPrefix is not None: |
| setMLPrefixCmd = self.wpan_cmd_prefix + 'setprop Dataset:MeshLocalPrefix ' + str(sMeshLocalPrefix) |
| if self.__sendCommand(setMLPrefixCmd)[0] == 'Fail': |
| return False |
| |
| if xMasterKey is not None: |
| key = self.__convertLongToHex(xMasterKey, 32) |
| setMasterKeyCmd = self.wpan_cmd_prefix + 'setprop Dataset:MasterKey ' + key |
| if self.__sendCommand(setMasterKeyCmd)[0] == 'Fail': |
| return False |
| |
| if xPanId is not None: |
| setPanIdCmd = self.wpan_cmd_prefix + 'setprop Dataset:PanId ' + str(xPanId) |
| if self.__sendCommand(setPanIdCmd)[0] == 'Fail': |
| return False |
| |
| if xCommissionerSessionId is not None: |
| print('not handle xCommissionerSessionId') |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_PENDING_SET() Error: ' + str(e)) |
| |
| def MGMT_COMM_GET(self, Addr='ff02::1', TLVs=[]): |
| """send MGMT_COMM_GET command |
| |
| Returns: |
| True: successful to send MGMT_COMM_GET |
| False: fail to send MGMT_COMM_GET |
| """ |
| print('%s call MGMT_COMM_GET' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'commissioner mgmt-get ' |
| print('TLVs:') |
| print(TLVs) |
| |
| if len(TLVs) != 0: |
| tlvs = ''.join('%02x' % tlv for tlv in TLVs) |
| cmd += tlvs |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_COMM_GET() Error: ' + str(e)) |
| |
| def MGMT_COMM_SET( |
| self, |
| Addr='ff02::1', |
| xCommissionerSessionID=None, |
| xSteeringData=None, |
| xBorderRouterLocator=None, |
| xChannelTlv=None, |
| ExceedMaxPayload=False, |
| ): |
| """send MGMT_COMM_SET command |
| |
| Returns: |
| True: successful to send MGMT_COMM_SET |
| False: fail to send MGMT_COMM_SET |
| """ |
| print('%s call MGMT_COMM_SET' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'commissioner mgmt-set ' |
| print('-------------------------------') |
| print(xCommissionerSessionID) |
| print(xSteeringData) |
| print(str(xSteeringData) + ' ' + str(hex(xSteeringData)[2:])) |
| print(xBorderRouterLocator) |
| print(xChannelTlv) |
| print(ExceedMaxPayload) |
| print('-------------------------------') |
| |
| if xCommissionerSessionID is not None: |
| # use assigned session id |
| cmd += '0b02' + str(xCommissionerSessionID) |
| elif xCommissionerSessionID is None: |
| # use original session id |
| if self.isActiveCommissioner is True: |
| cmd += '0b02' + self.__lstrip0x(self.__getCommissionerSessionId()) |
| else: |
| pass |
| |
| if xSteeringData is not None: |
| cmd += '08' + str(len(hex(xSteeringData)[2:])) + str(hex(xSteeringData)[2:]) |
| |
| if xBorderRouterLocator is not None: |
| cmd += '0902' + str(hex(xBorderRouterLocator)) |
| |
| if xChannelTlv is not None: |
| cmd += '000300' + '%04x' % xChannelTlv |
| |
| print(cmd) |
| |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('MGMT_COMM_SET() Error: ' + str(e)) |
| |
| def setActiveDataset(self, listActiveDataset=[]): |
| print('%s call setActiveDataset' % self.port) |
| |
| def setCommisionerMode(self): |
| print('%s call setCommissionerMode' % self.port) |
| |
| def setPSKc(self, strPSKc): |
| print('%s call setPSKc' % self.port) |
| try: |
| cmd = self.wpan_cmd_prefix + 'setprop Network:PSKc %s' % strPSKc |
| datasetCmd = self.wpan_cmd_prefix + 'setprop Dataset:PSKc %s' % strPSKc |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' and self.__sendCommand(datasetCmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setPSKc() Error: ' + str(e)) |
| |
| def setActiveTimestamp(self, xActiveTimestamp): |
| print('%s call setActiveTimestamp' % self.port) |
| try: |
| sActiveTimestamp = str(xActiveTimestamp) |
| if len(sActiveTimestamp) < 16: |
| sActiveTimestamp = sActiveTimestamp.zfill(16) |
| self.activetimestamp = sActiveTimestamp |
| cmd = self.wpan_cmd_prefix + 'setprop Dataset:ActiveTimestamp %s' % sActiveTimestamp |
| self.hasActiveDatasetToCommit = True |
| return self.__sendCommand(cmd)[0] != 'Fail' |
| except Exception as e: |
| ModuleHelper.WriteIntoDebugLogger('setActiveTimestamp() Error: ' + str(e)) |
| |
| def setUdpJoinerPort(self, portNumber): |
| """set Joiner UDP Port |
| |
| Args: |
| portNumber: Joiner UDP Port number |
| |
| Returns: |
| True: successful to set Joiner UDP Port |
| False: fail to set Joiner UDP Port |
| |
| @todo : required if as reference device |
| """ |
| |
| def commissionerUnregister(self): |
| """stop commissioner |
| |
| Returns: |
| True: successful to stop commissioner |
| False: fail to stop commissioner |
| """ |
| print('%s call commissionerUnregister' % self.port) |
| cmd = self.wpan_cmd_prefix + 'commissioner stop' |
| print(cmd) |
| if self.__sendCommand(cmd)[0] != 'Fail': |
| self.isActiveCommissioner = False |
| return True |
| else: |
| return False |
| |
| def sendBeacons(self, sAddr, xCommissionerSessionId, listChannelMask, xPanId): |
| print('%s call sendBeacons' % self.port) |
| self._sendline(self.wpan_cmd_prefix + 'scan') |
| return True |
| |
| def updateRouterStatus(self): |
| """force update to router as if there is child id request |
| @todo : required if as reference device |
| """ |
| |
| def setRouterThresholdValues(self, upgradeThreshold, downgradeThreshold): |
| print('%s call setRouterThresholdValues' % self.port) |
| self.__setRouterUpgradeThreshold(upgradeThreshold) |
| self.__setRouterDowngradeThreshold(downgradeThreshold) |
| |
| def setMinDelayTimer(self, iSeconds): |
| pass |
| |
| def ValidateDeviceFirmware(self): |
| print('%s call ValidateDeviceFirmware' % self.port) |
| if 'OPENTHREAD' in self.UIStatusMsg: |
| return True |
| else: |
| return False |
| |
| @staticmethod |
| def __lstrip0x(s): |
| """strip 0x at the beginning of a hex string if it exists |
| |
| Args: |
| s: hex string |
| |
| Returns: |
| hex string with leading 0x stripped |
| """ |
| if s.startswith('0x'): |
| s = s[2:] |
| |
| return s |