| #!/usr/bin/env python |
| # |
| # Copyright (c) 2016, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| import ConfigParser |
| import json |
| import logging |
| import os |
| import subprocess |
| import re |
| import time |
| import unittest |
| |
| from selenium import webdriver |
| from selenium.webdriver import ActionChains |
| from selenium.webdriver.support.ui import Select |
| from selenium.common.exceptions import UnexpectedAlertPresentException |
| from selenium.common.exceptions import NoSuchElementException |
| from functools import reduce |
| |
| from autothreadharness import settings |
| from autothreadharness.exceptions import FailError, FatalError, GoldenDeviceNotEnoughError |
| from autothreadharness.harness_controller import HarnessController |
| from autothreadharness.helpers import HistoryHelper |
| from autothreadharness.open_thread_controller import OpenThreadController |
| from autothreadharness.pdu_controller_factory import PduControllerFactory |
| from autothreadharness.rf_shield_controller import get_rf_shield_controller |
| |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound of the channel range; the shield simulation in this file wraps
# from this value back to THREAD_CHANNEL_MIN.
THREAD_CHANNEL_MAX = 26
"""Maximum channel number of thread protocol"""

# Lower bound of the channel range.
THREAD_CHANNEL_MIN = 11
"""Minimum channel number of thread protocol"""

# Fallback for HarnessCase.timeout when settings provides no truthy TIMEOUT.
DEFAULT_TIMEOUT = 2700
"""Timeout for each test case in seconds"""
| |
| |
def wait_until(what, times=-1):
    """Poll `what` about once per second until it returns True.

    Args:
        what (Callable[[], bool]): Condition to evaluate. Only a return value
            that is exactly `True` counts as success. Ordinary exceptions
            raised by `what` are logged and count as a failed trial.
        times (int): Maximum number of trials before giving up. A negative
            value never reaches zero, so the wait is unbounded; zero performs
            no trial at all.

    Returns:
        bool: True if `what` returned True, False if the trials ran out.
    """
    while times:
        logger.info('Waiting times left %d', times)
        try:
            if what() is True:
                return True
        except Exception:
            # Only ordinary errors are swallowed. KeyboardInterrupt and
            # SystemExit must propagate, otherwise an unbounded wait
            # (times < 0) could never be aborted.
            logger.exception('Wait failed')
        else:
            logger.warning('Trial[%d] failed', times)

        times -= 1
        if times:
            # Pause between trials only; sleeping after the last one would
            # just delay the False result.
            time.sleep(1)

    return False
| |
| |
class HarnessCase(unittest.TestCase):
    """Base class of all harness automation test cases.

    All test case classes MUST define properties `role`, `case` and
    `golden_devices_required`.
    """

    channel = settings.THREAD_CHANNEL
    """int: Thread channel.

    Thread channel ranges from 11 to 26.
    """

    # Role identifiers. `_select_case` passes str(role) as the option value of
    # the 'select-dut' dropdown.
    ROLE_LEADER = 1
    ROLE_ROUTER = 2
    ROLE_SED = 4
    ROLE_BORDER = 8
    ROLE_REED = 16
    ROLE_ED = 32
    ROLE_COMMISSIONER = 64
    ROLE_JOINER = 128
    ROLE_FED = 512
    ROLE_MED = 1024

    # One of the ROLE_* values above; left as None here.
    role = None
    """int: role id.

    1
        Leader
    2
        Router
    4
        Sleepy end device
    16
        Router eligible end device
    32
        End device
    64
        Commissioner
    128
        Joiner
    512
        Full end device
    1024
        Minimal end device
    """

    # `_select_case` matches this against the tree titles, both with spaces
    # and with the spaces replaced by dots.
    case = None
    """str: Case id, e.g. '6 5 1'.
    """

    golden_devices_required = 0
    """int: Golden devices needed to finish the test
    """

    # Typed into the General Setup dialog by `_setup_page` when truthy.
    child_timeout = settings.THREAD_CHILD_TIMEOUT
    """int: Child timeout in seconds
    """

    # Typed into the General Setup dialog by `_setup_page` when truthy.
    sed_polling_interval = settings.THREAD_SED_POLLING_INTERVAL
    """int: SED polling interval in seconds
    """

    auto_dut = settings.AUTO_DUT
    """bool: whether use harness auto dut feature"""

    # settings.TIMEOUT when it exists and is truthy, otherwise DEFAULT_TIMEOUT.
    # `_wait_dialog` decrements self.timeout on every loop iteration.
    timeout = hasattr(settings, 'TIMEOUT') and settings.TIMEOUT or DEFAULT_TIMEOUT
    """number: timeout in seconds to stop running this test case"""

    # Set to time.time() by `_setup_page` the first time it runs.
    started = 0
    """number: test case started timestamp"""

    case_need_shield = False
    """bool: whether needs RF-box"""

    # NOTE(review): this list object is shared by every class that does not
    # override it; the code visible here only reads it - confirm no subclass
    # mutates it in place.
    device_order = []
    """list: device drag order in TestHarness TestBed page"""
| |
def __init__(self, *args, **kwargs):
    """Initialise per-case state and detect which harness generation is installed.

    Reads `info.ini` under `settings.HARNESS_HOME`. `self.new_th` becomes
    True when the harness mode is 'External' with a version above 1.4.0, or
    'Internal' with a version above 49.4; otherwise it stays False.

    Args:
        *args: Forwarded unchanged to `unittest.TestCase.__init__`.
        **kwargs: Forwarded unchanged to `unittest.TestCase.__init__`.
    """
    self.dut = None
    self._browser = None
    self._hc = None
    self.result_dir = '%s\\%s' % (settings.OUTPUT_PATH, self.__class__.__name__)
    self.history = HistoryHelper()
    self.add_all_devices = False
    self.new_th = False

    dotted_number = re.compile(r'^\d+(\.\d+)*$')

    def as_numbers(text):
        """Turn '1.10.0' into (1, 10, 0)."""
        return tuple(int(part) for part in text.split('.'))

    def is_newer(version, baseline):
        """Return True if `version` is strictly greater than `baseline`."""
        if dotted_number.match(version) and dotted_number.match(baseline):
            # Compare numerically: as plain strings '1.10.0' sorts below
            # '1.4.0' and '100.1' below '49.4'.
            return as_numbers(version) > as_numbers(baseline)
        # Unknown version format: keep the historical string comparison.
        return version > baseline

    harness_info = ConfigParser.ConfigParser()
    harness_info.read('%s\\info.ini' % settings.HARNESS_HOME)
    if harness_info.has_option('Thread_Harness_Info', 'Version') and harness_info.has_option(
        'Thread_Harness_Info', 'Mode'
    ):
        # The version number is the last space-separated token; [-1] also
        # copes with a value that contains no space at all.
        harness_version = harness_info.get('Thread_Harness_Info', 'Version').rsplit(' ', 1)[-1]
        harness_mode = harness_info.get('Thread_Harness_Info', 'Mode')

        if harness_mode == 'External' and is_newer(harness_version, '1.4.0'):
            self.new_th = True

        if harness_mode == 'Internal' and is_newer(harness_version, '49.4'):
            self.new_th = True

    super(HarnessCase, self).__init__(*args, **kwargs)
| |
def _init_devices(self):
    """Reset or power-cycle the USB devices before a case starts.

    Without a configured `settings.PDU_CONTROLLER_TYPE` nothing is power
    cycled. Instead, unless `settings.AUTO_DUT` is set, every golden device
    is reset through `OpenThreadController`, and devices that fail to reset
    are recorded as bad in `self.history`.

    With a PDU controller, the controller is opened (up to three attempts,
    10 seconds apart), asked to reboot, and closed. The method then sleeps
    one second per configured golden device.

    Raises:
        EOFError: if the PDU controller cannot be reached after three tries.
    """
    if not settings.PDU_CONTROLLER_TYPE:
        if settings.AUTO_DUT:
            return

        for device in settings.GOLDEN_DEVICES:
            port, _ = device
            try:
                with OpenThreadController(port) as otc:
                    logger.info('Resetting %s', port)
                    otc.reset()
            except Exception:
                logger.exception('Failed to reset device %s', port)
                # Bad devices are keyed by port everywhere else in this
                # class (is_bad_golden_device(device[0]) and
                # mark_bad_golden_device(port)), so record the port here too.
                self.history.mark_bad_golden_device(port)

        return

    tries = 3
    pdu_factory = PduControllerFactory()

    while True:
        try:
            pdu = pdu_factory.create_pdu_controller(settings.PDU_CONTROLLER_TYPE)
            pdu.open(**settings.PDU_CONTROLLER_OPEN_PARAMS)
        except EOFError:
            logger.warning('Failed to connect to telnet')
            tries = tries - 1
            if tries:
                time.sleep(10)
                continue
            else:
                logger.error('Fatal error: cannot connect to apc')
                raise
        else:
            try:
                pdu.reboot(**settings.PDU_CONTROLLER_REBOOT_PARAMS)
            finally:
                # Release the connection even when the reboot request fails.
                pdu.close()
            break

    time.sleep(len(settings.GOLDEN_DEVICES))
| |
def _init_harness(self):
    """Restart the harness backend service and validate its configuration.

    The harness controller must be running before cases are executed,
    otherwise nothing happens.

    Raises:
        FailError: if Configuration.ini enables BrowserAutoNavigate, or if a
            mixed-device test bed has EnableDeviceSelection disabled.
    """
    controller = HarnessController(self.result_dir)
    self._hc = controller
    controller.stop()
    time.sleep(1)
    controller.start()
    time.sleep(2)

    section = 'THREAD_HARNESS_CONFIG'
    harness_config = ConfigParser.ConfigParser()
    harness_config.read('%s\\Config\\Configuration.ini' % settings.HARNESS_HOME)

    # A missing option is tolerated; only an explicit wrong value is fatal.
    if harness_config.has_option(section, 'BrowserAutoNavigate'):
        if harness_config.getboolean(section, 'BrowserAutoNavigate'):
            logger.error('BrowserAutoNavigate in Configuration.ini should be False')
            raise FailError('BrowserAutoNavigate in Configuration.ini should be False')

    if not settings.MIXED_DEVICE_TYPE:
        return

    if not harness_config.has_option(section, 'EnableDeviceSelection'):
        return

    if not harness_config.getboolean(section, 'EnableDeviceSelection'):
        logger.error('EnableDeviceSelection in Configuration.ini should be True')
        raise FailError('EnableDeviceSelection in Configuration.ini should be True')
| |
def _destroy_harness(self):
    """Stop the harness backend service, then pause for two seconds."""
    controller = self._hc
    controller.stop()
    time.sleep(2)
| |
def _init_dut(self):
    """Create the DUT controller.

    When the harness auto-DUT feature is in use, no controller is created
    and `self.dut` is None. Otherwise `self.dut` becomes an
    `OpenThreadController` on the port given by `settings.DUT_DEVICE[0]`.
    """
    if not self.auto_dut:
        self.dut = OpenThreadController(settings.DUT_DEVICE[0])
        return

    self.dut = None
| |
def _destroy_dut(self):
    """Drop the reference to the DUT controller."""
    setattr(self, 'dut', None)
| |
def _init_browser(self):
    """Open the harness web page in Chrome.

    Chrome is started quietly:
        1. extensions and info bars are disabled,
        2. certificate errors are ignored and
        3. notifications are always allowed.

    Returns:
        bool: True once the page title contains 'Thread'; False if starting
        Chrome, loading the page or the title check failed.
    """
    browser = None
    try:
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--disable-extensions')
        chrome_options.add_argument('--disable-infobars')
        chrome_options.add_argument('--ignore-certificate-errors')
        chrome_options.add_experimental_option(
            'prefs', {'profile.managed_default_content_settings.notifications': 1}
        )

        browser = webdriver.Chrome(chrome_options=chrome_options)
        browser.set_page_load_timeout(20)
        browser.implicitly_wait(1)
        browser.maximize_window()
        browser.get(settings.HARNESS_URL)
        self._browser = browser
        if not wait_until(lambda: 'Thread' in browser.title, 30):
            # Raises AssertionError (handled below) unless the title became
            # valid right after the wait ran out.
            self.assertIn('Thread', browser.title)
        return True
    except Exception as e:
        logger.exception('Init chrome error: %s', type(e).__name__)
        if browser is not None and self._browser is not browser:
            # Chrome started but was never stored on self, so
            # _destroy_browser could not reach it: shut it down here
            # instead of leaking the process.
            try:
                browser.quit()
            except Exception:
                logger.exception('Failed to quit orphaned browser')
        return False
| |
def _destroy_browser(self):
    """Shut down the browser, if one is open.

    `quit()` is used rather than `close()` so that the whole driver session
    ends, not just the current window. `self._browser` is cleared even if
    shutting down raises; the exception still propagates.
    """
    browser = self._browser
    if not browser:
        return

    try:
        browser.quit()
    finally:
        self._browser = None
| |
def _init_rf_shield(self):
    """Create the RF shield controller when one is configured.

    `self.rf_shield` is None unless both `SHIELD_CONTROLLER_TYPE` and
    `SHIELD_CONTROLLER_PARAMS` exist in settings and are truthy.
    """
    shield_type = getattr(settings, 'SHIELD_CONTROLLER_TYPE', None)
    # The params are only looked up when a controller type is configured.
    shield_params = shield_type and getattr(settings, 'SHIELD_CONTROLLER_PARAMS', None)

    if not shield_params:
        self.rf_shield = None
        return

    self.rf_shield = get_rf_shield_controller(shield_type=shield_type, params=shield_params)
| |
def _destroy_rf_shield(self):
    """Drop the reference to the RF shield controller."""
    setattr(self, 'rf_shield', None)
| |
def setUp(self):
    """Prepare to run a test case.

    Removes old captures, logs and reports from the harness directory,
    creates the result directory, restarts the harness service, resets the
    golden devices and creates the DUT and RF shield controllers. Does
    nothing when run on `HarnessCase` itself.
    """
    if self.__class__ is HarnessCase:
        return

    logger.info('Setting up')
    # clear files

    logger.info('Deleting all .pcapng')
    os.system('del /q "%s\\Captures\\*.pcapng"' % settings.HARNESS_HOME)
    logger.info('Empty files in Logs')
    os.system('del /q "%s\\Logs\\*.*"' % settings.HARNESS_HOME)

    # using temp files to fix excel downloading fail
    if self.new_th:
        logger.info('Empty files in Reports')
        os.system('del /q "%s\\Reports\\*.*"' % settings.HARNESS_HOME)
    else:
        logger.info('Empty files in temps')
        os.system('del /q "%s\\Thread_Harness\\temp\\*.*"' % settings.HARNESS_HOME)

    # create directory
    # os.makedirs avoids the shell, so a result path containing spaces
    # cannot be split into several arguments as with an unquoted `mkdir`.
    try:
        os.makedirs(self.result_dir)
    except OSError:
        # An already existing directory is fine. Any other failure is only
        # logged, matching the previous best-effort behaviour.
        if not os.path.isdir(self.result_dir):
            logger.exception('Failed to create result directory %s', self.result_dir)

    self._init_harness()
    self._init_devices()
    self._init_dut()
    self._init_rf_shield()
| |
def tearDown(self):
    """Clean up after each case.

    Stops the harness service, closes the browser and releases the DUT and
    RF shield controllers, in that order. Does nothing when run on
    `HarnessCase` itself.
    """
    if self.__class__ is HarnessCase:
        return

    logger.info('Tearing down')
    cleanup_steps = (
        self._destroy_harness,
        self._destroy_browser,
        self._destroy_dut,
        self._destroy_rf_shield,
    )
    for cleanup in cleanup_steps:
        cleanup()
| |
def _setup_page(self):
    """Do sniffer settings and general settings.

    Records the start time on the first call, handles the sniffer capture
    dialog when it is shown, clicks 'SkipPrepareDevice' when it is enabled,
    optionally fills in the General Setup dialog, and finally clicks
    'nextButton'.

    Returns without clicking 'nextButton' when that button cannot be found
    or is not enabled yet, or when the general setup step fails.

    Raises:
        Exception: if the sniffer dialog is shown and no verified sniffer
            is reported within 100 one-second polls.
    """
    if not self.started:
        self.started = time.time()

    # Detect Sniffer
    try:
        dialog = self._browser.find_element_by_id('capture-Setup-modal')
    except BaseException:
        logger.exception('Failed to get dialog.')
    else:
        # aria-hidden == 'false' means the dialog is currently displayed.
        if dialog and dialog.get_attribute('aria-hidden') == 'false':
            times = 100
            while times:
                status = dialog.find_element_by_class_name('status-notify').text
                if 'Searching' in status:
                    logger.info('Still detecting..')
                elif 'Not' in status:
                    logger.warning('Sniffer device not verified!')
                    # Start another auto-detection round.
                    button = dialog.find_element_by_id('snifferAutoDetectBtn')
                    button.click()
                elif 'Verified' in status:
                    logger.info('Verified!')
                    button = dialog.find_element_by_id('saveCaptureSettings')
                    button.click()
                    break
                else:
                    logger.warning('Unexpected sniffer verification status')

                times = times - 1
                time.sleep(1)

            # `times` only reaches zero when the loop never hit 'Verified'.
            if not times:
                raise Exception('Unable to detect sniffer device')

    time.sleep(1)

    try:
        skip_button = self._browser.find_element_by_id('SkipPrepareDevice')
        if skip_button.is_enabled():
            skip_button.click()
            time.sleep(1)
    except BaseException:
        logger.info('Still detecting sniffers')

    try:
        next_button = self._browser.find_element_by_id('nextButton')
    except BaseException:
        logger.exception('Failed to finish setup')
        return

    if not next_button.is_enabled():
        logger.info('Harness is still not ready')
        return

    # General Setup
    try:
        # The dialog is only opened when at least one value is configured.
        if self.child_timeout or self.sed_polling_interval:
            logger.info('finding general Setup button')
            button = self._browser.find_element_by_id('general-Setup')
            button.click()
            time.sleep(2)

            dialog = self._browser.find_element_by_id('general-Setup-modal')
            if dialog.get_attribute('aria-hidden') != 'false':
                raise Exception('Missing General Setup dialog')

            # Each field is cleared first; a falsy setting leaves it empty.
            field = dialog.find_element_by_id('inp_general_child_update_wait_time')
            field.clear()
            if self.child_timeout:
                field.send_keys(str(self.child_timeout))

            field = dialog.find_element_by_id('inp_general_sed_polling_rate')
            field.clear()
            if self.sed_polling_interval:
                field.send_keys(str(self.sed_polling_interval))

            button = dialog.find_element_by_id('saveGeneralSettings')
            button.click()
            time.sleep(1)

    except BaseException:
        logger.info('general setup exception')
        logger.exception('Failed to do general setup')
        return

    # Finish this page
    next_button.click()
    time.sleep(1)
| |
def _connect_devices(self):
    """Click the 'Connect All' link on the current page."""
    self._browser.find_element_by_link_text('Connect All').click()
| |
def _add_device(self, port, device_type_id):
    """Drag a device type onto the test bed and enter its port.

    Args:
        port (str): Port typed into the first input of the new entry.
        device_type_id (str): Element id of the device type to drag.
    """
    driver = self._browser
    bed = driver.find_element_by_id('test-bed')
    source = driver.find_element_by_id(device_type_id)

    # drag: hold the device type and move it over the test bed
    ActionChains(driver).click_and_hold(source).move_to_element(bed).perform()
    time.sleep(1)

    # drop: release the device on the drop zone
    drop_zone = driver.find_element_by_class_name('drop-hw')
    ActionChains(driver).move_to_element(drop_zone).release(drop_zone).perform()

    time.sleep(0.5)
    entry = driver.find_element_by_class_name('selected-hw')
    port_field = entry.find_elements_by_tag_name('input')[0]
    port_field.clear()
    port_field.send_keys(port)
| |
def _read_topology_devices(self, topo_file):
    """Read the device list of `self.case` from the topology config file.

    Lines starting with '#' and lines without a '-' are ignored. A line has
    the form '<case id>-<entry>,<entry>,...', where each entry is split on
    ':' into a tuple. The case id matches when replacing its dots with
    spaces yields `self.case`.

    Args:
        topo_file (str): Path of TopologyConfig.txt.

    Returns:
        list: One tuple per device entry of the matching line.

    Raises:
        GoldenDeviceNotEnoughError: if the file cannot be opened or parsed,
            or if it holds no line for this case.
    """
    try:
        f_topo = open(topo_file, 'r')
    except IOError:
        logger.info('%s can NOT be found', topo_file)
        raise GoldenDeviceNotEnoughError()

    topo_mixed_devices = []
    matched_case_id = None
    try:
        # `with` closes the file on every path. Iterating the file stops at
        # EOF, whereas polling readline() would spin forever on '' when the
        # case is not listed.
        with f_topo:
            for raw_line in f_topo:
                topo_line = raw_line.strip()
                if re.match(r'#.*', topo_line):
                    continue
                match_line = re.match(r'(.*)-(.*)', topo_line, re.M | re.I)
                if not match_line:
                    continue
                case_id = match_line.group(1)
                if re.sub(r'\.', ' ', case_id) != self.case:
                    continue

                logger.info('Get line by case %s: %s', case_id, topo_line)
                for topo_device in re.split(',', match_line.group(2)):
                    topo_mixed_devices.append(tuple(re.split(':', topo_device)))
                matched_case_id = case_id
                break
    except Exception as e:
        logger.info('Get devices from topology config file error: %s', e)
        raise GoldenDeviceNotEnoughError()

    if matched_case_id is None:
        logger.info('No topology entry for case %s in %s', self.case, topo_file)
        raise GoldenDeviceNotEnoughError()

    logger.info('Golden devices in topology config file for case %s: %s', matched_case_id, topo_mixed_devices)
    return topo_mixed_devices

def _test_bed(self):
    """Set up the test bed.

    Clears the devices currently on the TestBed page, selects the golden
    devices (and DUT) this case needs, adds them to the page, connects them
    and moves on to the TestExecution page. Devices that fail to connect
    are replaced by spare golden devices while any remain.

    RF shielding is controlled by two case attributes, `case_need_shield`
    and `device_order`, set according to
    https://openthread.io/certification/test-cases#rf_shielding

    Example, in case script leader_9_2_9.py:
        case_need_shield = True
        device_order = [('Router_2', False), ('Commissioner', True),
                        ('Router_1', False), ('DUT', True)]
    On the TestBed page the devices are then sorted as Router_2,
    Commissioner, Router_1, DUT. ('Commissioner', True) and ('DUT', True)
    mean those devices sit in the RF box and come from
    SHIELD_GOLDEN_DEVICES and DUT2_DEVICE. ('DUT', False) means the DUT is
    outside the RF box and DUT_DEVICE is used. Other roles marked False
    come from GOLDEN_DEVICES.

    In case script med_6_3_2.py:
        case_need_shield = True
        device_order = []  # or not defined
    means no drag order: DUT2_DEVICE is used as the DUT and the golden
    devices come from GOLDEN_DEVICES.

    Raises:
        FailError: on missing DUT settings, a `device_order` without a DUT,
            or a DUT that cannot be connected (FatalError when there is no
            PDU controller).
        GoldenDeviceNotEnoughError: when too few usable golden devices are
            available or the topology config cannot be used.
    """
    browser = self._browser
    test_bed = browser.find_element_by_id('test-bed')
    time.sleep(3)
    selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
    selected_hw_num = len(selected_hw_set)

    # Remove whatever is on the test bed, last entry first.
    while selected_hw_num:
        remove_button = selected_hw_set[selected_hw_num - 1].find_element_by_class_name('removeSelectedDevice')
        remove_button.click()
        selected_hw_num = selected_hw_num - 1

    devices = [
        device
        for device in settings.GOLDEN_DEVICES
        if not self.history.is_bad_golden_device(device[0])
        and not (settings.DUT_DEVICE and device[0] == settings.DUT_DEVICE[0])
    ]
    logger.info('Available golden devices: %s', json.dumps(devices, indent=2))

    shield_devices = [
        shield_device
        for shield_device in settings.SHIELD_GOLDEN_DEVICES
        if not self.history.is_bad_golden_device(shield_device[0])
        and not (settings.DUT2_DEVICE and shield_device[0] == settings.DUT2_DEVICE[0])
    ]
    logger.info('Available shield golden devices: %s', json.dumps(shield_devices, indent=2))
    golden_devices_required = self.golden_devices_required

    dut_device = ()
    if settings.DUT_DEVICE:
        dut_device = settings.DUT_DEVICE

    if self.case_need_shield:
        if not settings.DUT2_DEVICE:
            logger.info('Must set DUT2_DEVICE')
            raise FailError('DUT2_DEVICE must be set in settings.py')
        if isinstance(self.device_order, list) and self.device_order:
            logger.info('case %s devices ordered by %s ', self.case, self.device_order)
        else:
            logger.info('case %s uses %s as DUT', self.case, settings.DUT2_DEVICE)

    # for test bed with multi-vendor devices
    if settings.MIXED_DEVICE_TYPE:
        topo_file = settings.HARNESS_HOME + "\\Thread_Harness\\TestScripts\\TopologyConfig.txt"
        topo_mixed_devices = self._read_topology_devices(topo_file)
        golden_device_candidates = []
        missing_golden_devices = topo_mixed_devices[:]

        # mapping topology config devices with golden devices by device order
        if self.case_need_shield and self.device_order:
            matched_dut = False
            for device_order_item in self.device_order:
                matched = False
                for mixed_device_item in topo_mixed_devices:
                    # mapping device in device_order which needs to be shielded
                    if device_order_item[1]:
                        if 'DUT' in device_order_item[0]:
                            golden_device_candidates.append(settings.DUT2_DEVICE)
                            dut_device = settings.DUT2_DEVICE
                            matched_dut = True
                            matched = True
                            break
                        for device_item in shield_devices:
                            if (
                                device_order_item[0] == mixed_device_item[0]
                                and mixed_device_item[1] == device_item[1]
                            ):
                                golden_device_candidates.append(device_item)
                                # safe: we stop iterating right after removing
                                shield_devices.remove(device_item)
                                matched = True
                                break
                    # mapping device in device_order which does not need to be shielded
                    else:
                        if 'DUT' in device_order_item[0]:
                            golden_device_candidates.append(settings.DUT_DEVICE)
                            matched_dut = True
                            matched = True
                            break
                        for device_item in devices:
                            if (
                                device_order_item[0] == mixed_device_item[0]
                                and mixed_device_item[1] == device_item[1]
                            ):
                                golden_device_candidates.append(device_item)
                                devices.remove(device_item)
                                matched = True
                                break
                if not matched:
                    logger.info('Golden device not enough in : no %s', device_order_item)
                    raise GoldenDeviceNotEnoughError()
            if not matched_dut:
                raise FailError('Failed to find DUT in device_order')
            devices = golden_device_candidates
            self.add_all_devices = True
        else:
            for mixed_device_item in topo_mixed_devices:
                for device_item in devices:
                    if mixed_device_item[1] == device_item[1]:
                        golden_device_candidates.append(device_item)
                        devices.remove(device_item)
                        missing_golden_devices.remove(mixed_device_item)
                        break
            logger.info('Golden devices in topology config file mapped in settings : %s', golden_device_candidates)
            if len(topo_mixed_devices) != len(golden_device_candidates):
                # Count the unmatched entries per device type for the log.
                device_dict = dict()
                for missing_device in missing_golden_devices:
                    device_dict[missing_device[1]] = device_dict.get(missing_device[1], 0) + 1
                logger.info('Missing Devices: %s', device_dict)
                raise GoldenDeviceNotEnoughError()
            else:
                devices = golden_device_candidates
                golden_devices_required = len(devices)
                logger.info('All case-needed golden devices: %s', json.dumps(devices, indent=2))
    # for test bed with single vendor devices
    else:
        golden_device_candidates = []
        if self.case_need_shield and self.device_order:
            matched_dut = False
            for device_order_item in self.device_order:
                matched = False
                # choose device which needs to be shielded
                if device_order_item[1]:
                    if 'DUT' in device_order_item[0]:
                        golden_device_candidates.append(settings.DUT2_DEVICE)
                        dut_device = settings.DUT2_DEVICE
                        matched_dut = True
                        matched = True
                    else:
                        # take the first available shielded device, if any
                        for device_item in shield_devices:
                            golden_device_candidates.append(device_item)
                            shield_devices.remove(device_item)
                            matched = True
                            break
                # choose device which does not need to be shielded
                else:
                    if 'DUT' in device_order_item[0]:
                        golden_device_candidates.append(settings.DUT_DEVICE)
                        matched_dut = True
                        matched = True
                    else:
                        # take the first available golden device, if any
                        for device_item in devices:
                            golden_device_candidates.append(device_item)
                            devices.remove(device_item)
                            matched = True
                            break
                if not matched:
                    logger.info('Golden device not enough in : no %s', device_order_item)
                    raise GoldenDeviceNotEnoughError()
            if not matched_dut:
                raise FailError('Failed to find DUT in device_order')
            devices = golden_device_candidates
            self.add_all_devices = True

    if self.auto_dut and not settings.DUT_DEVICE:
        if settings.MIXED_DEVICE_TYPE:
            logger.info('Must set DUT_DEVICE')
            raise FailError('DUT_DEVICE must be set for mixed testbed')
        # No dedicated DUT configured: one more golden device is needed.
        golden_devices_required += 1

    if len(devices) < golden_devices_required:
        raise GoldenDeviceNotEnoughError()

    # add golden devices
    number_of_devices_to_add = len(devices) if self.add_all_devices else golden_devices_required
    for _ in range(number_of_devices_to_add):
        self._add_device(*devices.pop())

    # add DUT
    if self.case_need_shield:
        if not self.device_order:
            self._add_device(*settings.DUT2_DEVICE)
    else:
        if settings.DUT_DEVICE:
            self._add_device(*settings.DUT_DEVICE)

    # enable AUTO DUT
    if self.auto_dut:
        checkbox_auto_dut = browser.find_element_by_id('EnableAutoDutSelection')
        if not checkbox_auto_dut.is_selected():
            checkbox_auto_dut.click()
            time.sleep(1)

        if settings.DUT_DEVICE:
            radio_auto_dut = browser.find_element_by_class_name('AutoDUT_RadBtns')
            if not radio_auto_dut.is_selected() and not self.device_order:
                radio_auto_dut.click()

            if self.device_order:
                # With an explicit order the DUT is found by its port.
                selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
                for selected_hw in selected_hw_set:
                    form_inputs = selected_hw.find_elements_by_tag_name('input')
                    form_port = form_inputs[0]
                    port = form_port.get_attribute('value').encode('utf8')
                    if port == dut_device[0]:
                        radio_auto_dut = selected_hw.find_element_by_class_name('AutoDUT_RadBtns')
                        if not radio_auto_dut.is_selected():
                            radio_auto_dut.click()

    # Connect and retry until the TestExecution page is reached. FailError
    # propagates; any other error is logged and the loop starts over.
    while True:
        try:
            self._connect_devices()
            button_next = browser.find_element_by_id('nextBtn')
            if not wait_until(
                lambda: 'disabled' not in button_next.get_attribute('class'),
                times=(30 + 4 * number_of_devices_to_add),
            ):
                # Entries whose port input is still enabled are treated as
                # not connected.
                bad_ones = []
                selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
                for selected_hw in selected_hw_set:
                    form_inputs = selected_hw.find_elements_by_tag_name('input')
                    form_port = form_inputs[0]
                    if form_port.is_enabled():
                        bad_ones.append(selected_hw)

                for selected_hw in bad_ones:
                    form_inputs = selected_hw.find_elements_by_tag_name('input')
                    form_port = form_inputs[0]
                    port = form_port.get_attribute('value').encode('utf8')
                    if port == dut_device[0]:
                        if settings.PDU_CONTROLLER_TYPE is None:
                            # connection error cannot recover without power
                            # cycling
                            raise FatalError('Failed to connect to DUT')
                        else:
                            raise FailError('Failed to connect to DUT')

                    if settings.PDU_CONTROLLER_TYPE is None:
                        # port cannot recover without power cycling
                        self.history.mark_bad_golden_device(port)

                    # remove the bad one
                    selected_hw.find_element_by_class_name('removeSelectedDevice').click()
                    time.sleep(0.1)

                    if len(devices):
                        self._add_device(*devices.pop())
                    else:
                        devices = None

                if devices is None:
                    logger.warning('Golden devices not enough')
                    raise GoldenDeviceNotEnoughError()
                else:
                    logger.info('Try again with new golden devices')
                    continue

            if self.auto_dut and not settings.DUT_DEVICE:
                radio_auto_dut = browser.find_element_by_class_name('AutoDUT_RadBtns')
                if not radio_auto_dut.is_selected():
                    radio_auto_dut.click()

            time.sleep(5)

            button_next.click()
            if not wait_until(lambda: self._browser.current_url.endswith('TestExecution.html'), 20):
                raise Exception('Failed to load TestExecution page')
        except FailError:
            raise
        except BaseException:
            logger.exception('Unexpected error')
        else:
            break
| |
def _select_case(self, role, case):
    """Select the DUT role, tick the given case in the tree and start it.

    Args:
        role (int): Role id, used as the option value of 'select-dut'.
        case (str): Case id such as '6 5 1'; a tree title matches in either
            the spaced or the dotted form.

    Raises:
        Exception: if no tree title matches the case, or if the stop button
            does not show up within 10 tries after 'runTest' is clicked.
    """
    title_selector = '.tree-node .tree-title'

    # select the case
    Select(self._browser.find_element_by_id('select-dut')).select_by_value(str(role))
    time.sleep(1)

    wait_until(lambda: self._browser.find_elements_by_css_selector(title_selector) and True)
    titles = self._browser.find_elements_by_css_selector(title_selector)
    patterns = (
        re.compile(r'.*\b' + case + r'\b'),
        re.compile(r'.*\b' + case.replace(' ', r'\.') + r'\b'),
    )

    checkbox = None
    for node in titles:
        # Hover over the title before reading its text.
        hover = ActionChains(self._browser)
        hover.move_to_element(node)
        hover.perform()
        logger.debug(node.text)
        if not any(pattern.match(node.text) for pattern in patterns):
            continue
        checkbox = node.find_element_by_xpath('..').find_element_by_class_name('tree-checkbox')
        break

    if not checkbox:
        time.sleep(5)
        raise Exception('Failed to find the case')

    self._browser.execute_script("$('.overview').css('left', '0')")
    checkbox.click()
    time.sleep(1)

    self._browser.find_element_by_id('runTest').click()
    started = wait_until(lambda: self._browser.find_element_by_id('stopTest') and True, 10)
    if not started:
        raise Exception('Failed to start test case')
| |
def _collect_result(self):
    """Collect the test result.

    Copies the report files and the .pcapng captures from the harness
    directory into `self.result_dir`. Reports come from `Reports` when
    `self.new_th` is set, otherwise from `Thread_Harness\\temp`.
    """
    if self.new_th:
        report_pattern = '%s\\Reports\\*.*'
    else:
        report_pattern = '%s\\Thread_Harness\\temp\\*.*'

    for source_pattern in (report_pattern, '%s\\Captures\\*.pcapng'):
        # Quote both source and destination so a path containing spaces is
        # passed to `copy` as a single argument.
        os.system('copy "%s" "%s"' % (source_pattern % settings.HARNESS_HOME, self.result_dir))
| |
def _wait_dialog(self):
    """Wait for dialogs and handle them until done.

    Polls the 'RemoteConfirm' dialog and passes every displayed dialog to
    `_handle_dialog`. The loop ends when a handler reports completion, when
    the stop button is no longer on the page, or when `self.timeout`
    (decremented once per iteration) reaches zero.

    Raises:
        FailError: if a dialog is not handled (`_handle_dialog` returned
            None), or, after the loop, if any handler raised.
    """
    logger.debug('waiting for dialog')
    done = False
    error = False

    logger.info('self timeout %d', self.timeout)
    while not done and self.timeout:
        try:
            dialog = self._browser.find_element_by_id('RemoteConfirm')
        except BaseException:
            logger.exception('Failed to get dialog.')
        else:
            # aria-hidden == 'false' means the dialog is currently displayed.
            if dialog and dialog.get_attribute('aria-hidden') == 'false':
                title = dialog.find_element_by_class_name('modal-title').text
                time.sleep(1)
                logger.info('Handling dialog[%s]', title)

                try:
                    done = self._handle_dialog(dialog, title)
                except BaseException:
                    # Remember the failure but keep going; it is reported
                    # after the loop.
                    logger.exception('Error handling dialog: %s', title)
                    error = True

                if done is None:
                    raise FailError('Unexpected dialog occurred')

                dialog.find_element_by_id('ConfirmOk').click()

        time.sleep(1)

        try:
            stop_button = self._browser.find_element_by_id('stopTest')
            if done:
                stop_button.click()
                # wait for stop procedure end
                time.sleep(10)
        except NoSuchElementException:
            # No stop button: the harness has already stopped the test.
            logger.info('Test stopped')
            time.sleep(5)
            done = True

        # NOTE: this shadows the class-level `timeout` with an instance
        # attribute that counts down.
        self.timeout -= 1

        # check if already ended capture
        if self.timeout % 10 == 0:
            lines = self._hc.tail()
            if 'SUCCESS: The process "dumpcap.exe" with PID ' in lines:
                logger.info('Tshark should be ended now, lets wait at most 30 seconds.')
                # Kill tshark if it is still listed by `tasklist` after the wait.
                if not wait_until(lambda: 'tshark.exe' not in subprocess.check_output('tasklist'), 30):
                    res = subprocess.check_output(
                        'taskkill /t /f /im tshark.exe', stderr=subprocess.STDOUT, shell=True
                    )
                    logger.info(res)

    # Wait until case really stopped
    wait_until(lambda: self._browser.find_element_by_id('runTest') and True, 30)

    if error:
        raise FailError('Fail for previous exceptions')
| |
def _handle_dialog(self, dialog, title):
    """Handle one harness dialog, selected by the prefix of its title.

    The ``on_dialog`` hook is consulted first; when it returns a bool that
    value is returned unchanged. Otherwise one of the built-in handlers
    below runs, picked by ``title.startswith(...)``.

    Args:
        dialog: dialog element; only ``find_element_by_id`` is used on it.
        title: title text of the dialog.

    Returns:
        bool True if no more dialogs expected,
             False if more dialogs needed.
        Note: a title that matches none of the built-in handlers also
        yields False; this method itself never returns None.

    Raises:
        FailError: the DUT reports no usable link local address.
        Exception: the DUT reports no usable mesh local address.
    """

    def wait_for_enter(message):
        # On Python 2 ``input`` evaluates the typed text, so a bare Enter
        # raises SyntaxError; ``raw_input`` only reads the line. Fall back
        # to ``input`` where ``raw_input`` does not exist (Python 3).
        try:
            read_line = raw_input  # noqa: F821
        except NameError:
            read_line = input
        read_line(message)

    # addresses ending in ff:fe00:xxxx are derived from a short address
    # and are skipped when looking for the LL64 / ML64 address
    short_addr_based = '.+ff:fe00:[0-9a-f]{0,4}$'

    done = self.on_dialog(dialog, title)
    if isinstance(done, bool):
        return done

    if title.startswith('Start DUT'):
        body = dialog.find_element_by_id('cnfrmMsg').text
        # 'Sleepy End Device' must be tested first because it also
        # contains the text 'End Device'
        if 'Sleepy End Device' in body:
            self.dut.mode = 's'
            self.dut.child_timeout = self.child_timeout
        elif 'End Device' in body:
            self.dut.mode = 'rsn'
            self.dut.child_timeout = self.child_timeout
        else:
            self.dut.mode = 'rsdn'

        if 'at channel' in body:
            # the channel number is the text after the first ':'
            self.channel = int(body.split(':')[1])

        self.dut.channel = self.channel
        self.dut.panid = settings.THREAD_PANID
        self.dut.networkname = settings.THREAD_NETWORKNAME
        self.dut.extpanid = settings.THREAD_EXTPANID
        self.dut.start()

    elif title.startswith(('MAC Address Required', 'DUT Random Extended MAC Address Required')):
        mac = self.dut.mac
        inp = dialog.find_element_by_id('cnfrmInpText')
        inp.clear()
        inp.send_keys('0x%s' % mac)

    elif title.startswith('LL64 Address'):
        ll64 = None
        for addr in self.dut.addrs:
            addr = addr.lower()
            if addr.startswith('fe80') and not re.match(short_addr_based, addr):
                ll64 = addr
                break

        if not ll64:
            raise FailError('No link local address found')

        logger.info('Link local address is %s', ll64)
        inp = dialog.find_element_by_id('cnfrmInpText')
        inp.clear()
        inp.send_keys(ll64)

    elif title.startswith('Enter Channel'):
        self.dut.channel = self.channel
        inp = dialog.find_element_by_id('cnfrmInpText')
        inp.clear()
        inp.send_keys(str(self.dut.channel))

    elif title.startswith('User Action Needed'):
        body = dialog.find_element_by_id('cnfrmMsg').text
        if body.startswith('Power Down the DUT'):
            self.dut.stop()
        return True

    elif title.startswith('Short Address'):
        short_addr = '0x%s' % self.dut.short_addr
        inp = dialog.find_element_by_id('cnfrmInpText')
        inp.clear()
        inp.send_keys(short_addr)

    elif title.startswith('ML64 Address'):
        ml64 = None
        for addr in self.dut.addrs:
            # normalise the case like the LL64 branch does; the prefix
            # test and the pattern only match lower-case hex digits
            addr = addr.lower()
            if addr.startswith('fd') and not re.match(short_addr_based, addr):
                ml64 = addr
                break

        if not ml64:
            raise Exception('No mesh local address found')

        logger.info('Mesh local address is %s', ml64)
        inp = dialog.find_element_by_id('cnfrmInpText')
        inp.clear()
        inp.send_keys(ml64)

    elif title.startswith(('Shield Devices', 'Shield DUT')):
        time.sleep(2)
        if self.rf_shield:
            logger.info('Shielding devices')
            with self.rf_shield:
                self.rf_shield.shield()
        elif self.dut and settings.SHIELD_SIMULATION:
            # simulate shielding by moving the DUT to the next channel,
            # wrapping from the highest channel back to the lowest
            if self.channel == THREAD_CHANNEL_MAX:
                self.dut.channel = THREAD_CHANNEL_MIN
            else:
                self.dut.channel = self.channel + 1
        else:
            wait_for_enter('Shield DUT and press enter to continue..')

    elif title.startswith(('Unshield Devices', 'Bring DUT back to network')):
        time.sleep(5)
        if self.rf_shield:
            logger.info('Unshielding devices')
            with self.rf_shield:
                self.rf_shield.unshield()
        elif self.dut and settings.SHIELD_SIMULATION:
            # undo the simulated shielding: back to the test channel
            self.dut.channel = self.channel
        else:
            wait_for_enter('Bring DUT and press enter to continue..')

    elif title.startswith('Configure Prefix on DUT'):
        body = dialog.find_element_by_id('cnfrmMsg').text
        # keep the text after the first ': ' and read it as a
        # ', '-separated list of name=value pairs
        body = body.split(': ')[1]
        params = {}
        for item in body.split(', '):
            fields = item.split('=')
            params[fields[0].strip(' ')] = fields[1]

        prefix = params['P_Prefix'].strip('\0\r\n\t ')
        flags = []
        if params.get('P_slaac_preferred') == '1':
            flags.append('p')
        flags.append('ao')
        if params.get('P_stable') == '1':
            flags.append('s')
        if params.get('P_default') == '1':
            flags.append('r')
        prf = 'high'
        self.dut.add_prefix(prefix, ''.join(flags), prf)

    return False
| |
def test(self):
    """Run the case of a child class and assert that its result is a pass.

    Nothing is run when called on ``HarnessCase`` itself. Otherwise the
    browser is initialised (one attempt plus up to five retries), the
    harness pages are stepped through until the test execution page is
    reached, the case is selected, dialogs are handled until the case
    ends, results are collected and the status text is checked.

    Raises:
        SystemExit: the browser could not be initialised, an unexpected
            alert was present, or a FatalError occurred while preparing.
        FailError: re-raised when raised while preparing the case.
        AssertionError: the status text does not contain 'Pass'.
    """
    if self.__class__ is HarnessCase:
        logger.warning('Skip this harness itself')
        return

    logger.info('Testing role[%d] case[%s]', self.role, self.case)

    # one initial attempt plus up to five retries; the browser is torn
    # down before each retry
    retries_left = 5
    while not self._init_browser():
        if retries_left <= 0:
            raise SystemExit()
        retries_left -= 1
        self._destroy_browser()

    try:
        # prepare test case: poll the current page every two seconds and
        # act on it until the execution page shows up
        while True:
            url = self._browser.current_url
            if url.endswith('SetupPage.html'):
                self._setup_page()
            elif url.endswith('TestBed.html'):
                self._test_bed()
            elif url.endswith('TestExecution.html'):
                logger.info('Ready to handle dialogs')
                break
            time.sleep(2)
    except UnexpectedAlertPresentException:
        logger.exception('Failed to connect to harness server')
        raise SystemExit()
    except FatalError:
        logger.exception('Test stopped for fatal error')
        raise SystemExit()
    except FailError:
        logger.exception('Test failed')
        raise
    except Exception:
        # Best effort for ordinary errors only: KeyboardInterrupt and
        # SystemExit must propagate instead of being logged and ignored.
        logger.exception('Something wrong')

    self._select_case(self.role, self.case)

    logger.info('start to wait test process end')
    self._wait_dialog()

    try:
        self._collect_result()
    except BaseException:
        logger.exception('Failed to collect results')
        raise

    # get case result
    status = self._browser.find_element_by_class_name('title-test').text
    logger.info(status)
    success = 'Pass' in status
    self.assertTrue(success)