blob: 5726c648b8801d1603a27181c9fecc61aef861a8 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import ConfigParser
import json
import logging
import os
import subprocess
import re
import time
import unittest
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import Select
from selenium.common.exceptions import UnexpectedAlertPresentException
from selenium.common.exceptions import NoSuchElementException
from functools import reduce
from autothreadharness import settings
from autothreadharness.exceptions import FailError, FatalError, GoldenDeviceNotEnoughError
from autothreadharness.harness_controller import HarnessController
from autothreadharness.helpers import HistoryHelper
from autothreadharness.open_thread_controller import OpenThreadController
from autothreadharness.pdu_controller_factory import PduControllerFactory
from autothreadharness.rf_shield_controller import get_rf_shield_controller
logger = logging.getLogger(__name__)
THREAD_CHANNEL_MAX = 26
"""Maximum channel number of thread protocol"""
THREAD_CHANNEL_MIN = 11
"""Minimum channel number of thread protocol"""
DEFAULT_TIMEOUT = 2700
"""Timeout for each test case in seconds"""
def wait_until(what, times=-1):
"""Wait until `what` return True
Args:
what (Callable[bool]): Call `wait()` again and again until it returns True
times (int): Maximum times of trials before giving up
Returns:
True if success, False if times threshold reached
"""
while times:
logger.info('Waiting times left %d', times)
try:
if what() is True:
return True
except BaseException:
logger.exception('Wait failed')
else:
logger.warning('Trial[%d] failed', times)
times -= 1
time.sleep(1)
return False
class HarnessCase(unittest.TestCase):
"""This is the case class of all automation test cases.
All test case classes MUST define properties `role`, `case` and `golden_devices_required`
"""
channel = settings.THREAD_CHANNEL
"""int: Thread channel.
Thread channel ranges from 11 to 26.
"""
ROLE_LEADER = 1
ROLE_ROUTER = 2
ROLE_SED = 4
ROLE_BORDER = 8
ROLE_REED = 16
ROLE_ED = 32
ROLE_COMMISSIONER = 64
ROLE_JOINER = 128
ROLE_FED = 512
ROLE_MED = 1024
role = None
"""int: role id.
1
Leader
2
Router
4
Sleepy end device
16
Router eligible end device
32
End device
64
Commissioner
128
Joiner
512
Full end device
1024
Minimal end device
"""
case = None
"""str: Case id, e.g. '6 5 1'.
"""
golden_devices_required = 0
"""int: Golden devices needed to finish the test
"""
child_timeout = settings.THREAD_CHILD_TIMEOUT
"""int: Child timeout in seconds
"""
sed_polling_interval = settings.THREAD_SED_POLLING_INTERVAL
"""int: SED polling interval in seconds
"""
auto_dut = settings.AUTO_DUT
"""bool: whether use harness auto dut feature"""
timeout = hasattr(settings, 'TIMEOUT') and settings.TIMEOUT or DEFAULT_TIMEOUT
"""number: timeout in seconds to stop running this test case"""
started = 0
"""number: test case started timestamp"""
case_need_shield = False
"""bool: whether needs RF-box"""
device_order = []
"""list: device drag order in TestHarness TestBed page"""
def __init__(self, *args, **kwargs):
self.dut = None
self._browser = None
self._hc = None
self.result_dir = '%s\\%s' % (settings.OUTPUT_PATH, self.__class__.__name__)
self.history = HistoryHelper()
self.add_all_devices = False
self.new_th = False
harness_info = ConfigParser.ConfigParser()
harness_info.read('%s\\info.ini' % settings.HARNESS_HOME)
if harness_info.has_option('Thread_Harness_Info', 'Version') and harness_info.has_option(
'Thread_Harness_Info', 'Mode'):
harness_version = harness_info.get('Thread_Harness_Info', 'Version').rsplit(' ', 1)[1]
harness_mode = harness_info.get('Thread_Harness_Info', 'Mode')
if harness_mode == 'External' and harness_version > '1.4.0':
self.new_th = True
if harness_mode == 'Internal' and harness_version > '49.4':
self.new_th = True
super(HarnessCase, self).__init__(*args, **kwargs)
def _init_devices(self):
"""Reboot all usb devices.
Note:
If PDU_CONTROLLER_TYPE is not valid, usb devices is not rebooted.
"""
if not settings.PDU_CONTROLLER_TYPE:
if settings.AUTO_DUT:
return
for device in settings.GOLDEN_DEVICES:
port, _ = device
try:
with OpenThreadController(port) as otc:
logger.info('Resetting %s', port)
otc.reset()
except BaseException:
logger.exception('Failed to reset device %s', port)
self.history.mark_bad_golden_device(device)
return
tries = 3
pdu_factory = PduControllerFactory()
while True:
try:
pdu = pdu_factory.create_pdu_controller(settings.PDU_CONTROLLER_TYPE)
pdu.open(**settings.PDU_CONTROLLER_OPEN_PARAMS)
except EOFError:
logger.warning('Failed to connect to telnet')
tries = tries - 1
if tries:
time.sleep(10)
continue
else:
logger.error('Fatal error: cannot connect to apc')
raise
else:
pdu.reboot(**settings.PDU_CONTROLLER_REBOOT_PARAMS)
pdu.close()
break
time.sleep(len(settings.GOLDEN_DEVICES))
def _init_harness(self):
"""Restart harness backend service.
Please start the harness controller before running the cases, otherwise, nothing happens
"""
self._hc = HarnessController(self.result_dir)
self._hc.stop()
time.sleep(1)
self._hc.start()
time.sleep(2)
harness_config = ConfigParser.ConfigParser()
harness_config.read('%s\\Config\\Configuration.ini' % settings.HARNESS_HOME)
if harness_config.has_option('THREAD_HARNESS_CONFIG', 'BrowserAutoNavigate') and harness_config.getboolean(
'THREAD_HARNESS_CONFIG', 'BrowserAutoNavigate'):
logger.error('BrowserAutoNavigate in Configuration.ini should be False')
raise FailError('BrowserAutoNavigate in Configuration.ini should be False')
if settings.MIXED_DEVICE_TYPE:
if harness_config.has_option('THREAD_HARNESS_CONFIG',
'EnableDeviceSelection') and not harness_config.getboolean(
'THREAD_HARNESS_CONFIG', 'EnableDeviceSelection'):
logger.error('EnableDeviceSelection in Configuration.ini should be True')
raise FailError('EnableDeviceSelection in Configuration.ini should be True')
def _destroy_harness(self):
"""Stop harness backend service
Stop harness service.
"""
self._hc.stop()
time.sleep(2)
def _init_dut(self):
"""Initialize the DUT.
DUT will be restarted. and openthread will started.
"""
if self.auto_dut:
self.dut = None
return
dut_port = settings.DUT_DEVICE[0]
dut = OpenThreadController(dut_port)
self.dut = dut
def _destroy_dut(self):
self.dut = None
def _init_browser(self):
"""Open harness web page.
Open a quiet chrome which:
1. disables extensions,
2. ignore certificate errors and
3. always allow notifications.
"""
try:
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--disable-infobars')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_experimental_option('prefs',
{'profile.managed_default_content_settings.notifications': 1})
browser = webdriver.Chrome(chrome_options=chrome_options)
browser.set_page_load_timeout(20)
browser.implicitly_wait(1)
browser.get(settings.HARNESS_URL)
browser.maximize_window()
self._browser = browser
if not wait_until(lambda: 'Thread' in browser.title, 30):
self.assertIn('Thread', browser.title)
return True
except Exception as e:
logger.info('Init chrome error: {0}'.format(type(e).__name__))
return False
def _destroy_browser(self):
"""Close the browser.
"""
if self._browser:
self._browser.close()
self._browser = None
def _init_rf_shield(self):
if getattr(settings, 'SHIELD_CONTROLLER_TYPE', None) and getattr(settings, 'SHIELD_CONTROLLER_PARAMS', None):
self.rf_shield = get_rf_shield_controller(shield_type=settings.SHIELD_CONTROLLER_TYPE,
params=settings.SHIELD_CONTROLLER_PARAMS)
else:
self.rf_shield = None
def _destroy_rf_shield(self):
self.rf_shield = None
def setUp(self):
"""Prepare to run test case.
Start harness service, init golden devices, reset DUT and open browser.
"""
if self.__class__ is HarnessCase:
return
logger.info('Setting up')
# clear files
logger.info('Deleting all .pcapng')
os.system('del /q "%s\\Captures\\*.pcapng"' % settings.HARNESS_HOME)
logger.info('Empty files in Logs')
os.system('del /q "%s\\Logs\\*.*"' % settings.HARNESS_HOME)
# using temp files to fix excel downloading fail
if self.new_th:
logger.info('Empty files in Reports')
os.system('del /q "%s\\Reports\\*.*"' % settings.HARNESS_HOME)
else:
logger.info('Empty files in temps')
os.system('del /q "%s\\Thread_Harness\\temp\\*.*"' % settings.HARNESS_HOME)
# create directory
os.system('mkdir %s' % self.result_dir)
self._init_harness()
self._init_devices()
self._init_dut()
self._init_rf_shield()
def tearDown(self):
"""Clean up after each case.
Stop harness service, close browser and close DUT.
"""
if self.__class__ is HarnessCase:
return
logger.info('Tearing down')
self._destroy_harness()
self._destroy_browser()
self._destroy_dut()
self._destroy_rf_shield()
def _setup_page(self):
"""Do sniffer settings and general settings
"""
if not self.started:
self.started = time.time()
# Detect Sniffer
try:
dialog = self._browser.find_element_by_id('capture-Setup-modal')
except BaseException:
logger.exception('Failed to get dialog.')
else:
if dialog and dialog.get_attribute('aria-hidden') == 'false':
times = 100
while times:
status = dialog.find_element_by_class_name('status-notify').text
if 'Searching' in status:
logger.info('Still detecting..')
elif 'Not' in status:
logger.warning('Sniffer device not verified!')
button = dialog.find_element_by_id('snifferAutoDetectBtn')
button.click()
elif 'Verified' in status:
logger.info('Verified!')
button = dialog.find_element_by_id('saveCaptureSettings')
button.click()
break
else:
logger.warning('Unexpected sniffer verification status')
times = times - 1
time.sleep(1)
if not times:
raise Exception('Unable to detect sniffer device')
time.sleep(1)
try:
skip_button = self._browser.find_element_by_id('SkipPrepareDevice')
if skip_button.is_enabled():
skip_button.click()
time.sleep(1)
except BaseException:
logger.info('Still detecting sniffers')
try:
next_button = self._browser.find_element_by_id('nextButton')
except BaseException:
logger.exception('Failed to finish setup')
return
if not next_button.is_enabled():
logger.info('Harness is still not ready')
return
# General Setup
try:
if self.child_timeout or self.sed_polling_interval:
logger.info('finding general Setup button')
button = self._browser.find_element_by_id('general-Setup')
button.click()
time.sleep(2)
dialog = self._browser.find_element_by_id('general-Setup-modal')
if dialog.get_attribute('aria-hidden') != 'false':
raise Exception('Missing General Setup dialog')
field = dialog.find_element_by_id('inp_general_child_update_wait_time')
field.clear()
if self.child_timeout:
field.send_keys(str(self.child_timeout))
field = dialog.find_element_by_id('inp_general_sed_polling_rate')
field.clear()
if self.sed_polling_interval:
field.send_keys(str(self.sed_polling_interval))
button = dialog.find_element_by_id('saveGeneralSettings')
button.click()
time.sleep(1)
except BaseException:
logger.info('general setup exception')
logger.exception('Failed to do general setup')
return
# Finish this page
next_button.click()
time.sleep(1)
def _connect_devices(self):
connect_all = self._browser.find_element_by_link_text('Connect All')
connect_all.click()
def _add_device(self, port, device_type_id):
browser = self._browser
test_bed = browser.find_element_by_id('test-bed')
device = browser.find_element_by_id(device_type_id)
# drag
action_chains = ActionChains(browser)
action_chains.click_and_hold(device)
action_chains.move_to_element(test_bed).perform()
time.sleep(1)
# drop
drop_hw = browser.find_element_by_class_name('drop-hw')
action_chains = ActionChains(browser)
action_chains.move_to_element(drop_hw)
action_chains.release(drop_hw).perform()
time.sleep(0.5)
selected_hw = browser.find_element_by_class_name('selected-hw')
form_inputs = selected_hw.find_elements_by_tag_name('input')
form_port = form_inputs[0]
form_port.clear()
form_port.send_keys(port)
def _test_bed(self):
"""Set up the test bed.
Connect number of golden devices required by each case.
"""
browser = self._browser
test_bed = browser.find_element_by_id('test-bed')
time.sleep(3)
selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
selected_hw_num = len(selected_hw_set)
while selected_hw_num:
remove_button = selected_hw_set[selected_hw_num - 1].find_element_by_class_name('removeSelectedDevice')
remove_button.click()
selected_hw_num = selected_hw_num - 1
devices = [
device for device in settings.GOLDEN_DEVICES if not self.history.is_bad_golden_device(device[0]) and
not (settings.DUT_DEVICE and device[0] == settings.DUT_DEVICE[0])
]
logger.info('Available golden devices: %s', json.dumps(devices, indent=2))
shield_devices = [
shield_device for shield_device in settings.SHIELD_GOLDEN_DEVICES
if not self.history.is_bad_golden_device(shield_device[0]) and
not (settings.DUT2_DEVICE and shield_device[0] == settings.DUT2_DEVICE[0])
]
logger.info('Available shield golden devices: %s', json.dumps(shield_devices, indent=2))
golden_devices_required = self.golden_devices_required
dut_device = ()
if settings.DUT_DEVICE:
dut_device = settings.DUT_DEVICE
"""check if test case needs to use RF-shield box and its device order in Testbed page
Two parameters case_need_shield & device_order should be set in the case script
according to the requires: https://openthread.io/certification/test-cases#rf_shielding
Example:
In case script leader_9_2_9.py:
case_need_shield = True
device_order = [('Router_2', False), ('Commissioner', True), ('Router_1', False), ('DUT', True)]
On the TestBed page of the Test Harness, the device sort order for Leader_9_2_9
should be like:
Router_2
Commissioner
Router_1
DUT
The ('Commissioner', True) and ('DUT', True) indicate Commissioner device and DUT2 device should
be in the RF-box and choose from SHIELD_GOLDEN_DEVICES and DUT2_DEVICE. Otherwise ('DUT', False) means
DUT device is not in RF-box and use DUT_DEVICE. The other roles devices with False should be selected
from GOLDEN_DEVICES.
In case script med_6_3_2.py:
case_need_shield = True
device_order = [] # or not defined
means no device drag order. DUT2_DEVICE should be applied as DUT and the other golden devices
are from GOLDEN_DEVICES.
"""
if self.case_need_shield:
if not settings.DUT2_DEVICE:
logger.info('Must set DUT2_DEVICE')
raise FailError('DUT2_DEVICE must be set in settings.py')
if isinstance(self.device_order, list) and self.device_order:
logger.info('case %s devices ordered by %s ', self.case, self.device_order)
else:
logger.info('case %s uses %s as DUT', self.case, settings.DUT2_DEVICE)
# for test bed with multi-vendor devices
if settings.MIXED_DEVICE_TYPE:
topo_file = settings.HARNESS_HOME + "\\Thread_Harness\\TestScripts\\TopologyConfig.txt"
try:
f_topo = open(topo_file, 'r')
except IOError:
logger.info('%s can NOT be found', topo_file)
raise GoldenDeviceNotEnoughError()
topo_mixed_devices = []
try:
while True:
topo_line = f_topo.readline().strip()
if re.match(r'#.*', topo_line):
continue
match_line = re.match(r'(.*)-(.*)', topo_line, re.M | re.I)
if not match_line:
continue
case_id = match_line.group(1)
if re.sub(r'\.', ' ', case_id) == self.case:
logger.info('Get line by case %s: %s', case_id, topo_line)
topo_device_list = re.split(',', match_line.group(2))
for i in range(len(topo_device_list)):
topo_device = re.split(':', topo_device_list[i])
topo_mixed_devices.append(tuple(topo_device))
break
else:
continue
except Exception as e:
logger.info('Get devices from topology config file error: %s', e)
raise GoldenDeviceNotEnoughError()
logger.info('Golden devices in topology config file for case %s: %s', case_id, topo_mixed_devices)
f_topo.close()
golden_device_candidates = []
missing_golden_devices = topo_mixed_devices[:]
# mapping topology config devices with golden devices by device order
if self.case_need_shield and self.device_order:
matched_dut = False
for device_order_item in self.device_order:
matched = False
for mixed_device_item in topo_mixed_devices:
# mapping device in device_order which needs to be shielded
if device_order_item[1]:
if 'DUT' in device_order_item[0]:
golden_device_candidates.append(settings.DUT2_DEVICE)
dut_device = settings.DUT2_DEVICE
matched_dut = True
matched = True
break
for device_item in shield_devices:
if (device_order_item[0] == mixed_device_item[0] and
mixed_device_item[1] == device_item[1]):
golden_device_candidates.append(device_item)
shield_devices.remove(device_item)
matched = True
break
# mapping device in device_order which does not need to be shielded
else:
if 'DUT' in device_order_item[0]:
golden_device_candidates.append(settings.DUT_DEVICE)
matched_dut = True
matched = True
break
for device_item in devices:
if (device_order_item[0] == mixed_device_item[0] and
mixed_device_item[1] == device_item[1]):
golden_device_candidates.append(device_item)
devices.remove(device_item)
matched = True
break
if not matched:
logger.info('Golden device not enough in : no %s', device_order_item)
raise GoldenDeviceNotEnoughError()
if not matched_dut:
raise FailError('Failed to find DUT in device_order')
devices = golden_device_candidates
self.add_all_devices = True
else:
for mixed_device_item in topo_mixed_devices:
for device_item in devices:
if mixed_device_item[1] == device_item[1]:
golden_device_candidates.append(device_item)
devices.remove(device_item)
missing_golden_devices.remove(mixed_device_item)
break
logger.info('Golden devices in topology config file mapped in settings : %s', golden_device_candidates)
if len(topo_mixed_devices) != len(golden_device_candidates):
device_dict = dict()
for missing_device in missing_golden_devices:
if missing_device[1] in device_dict:
device_dict[missing_device[1]] += 1
else:
device_dict[missing_device[1]] = 1
logger.info('Missing Devices: %s', device_dict)
raise GoldenDeviceNotEnoughError()
else:
devices = golden_device_candidates
golden_devices_required = len(devices)
logger.info('All case-needed golden devices: %s', json.dumps(devices, indent=2))
# for test bed with single vendor devices
else:
golden_device_candidates = []
if self.case_need_shield and self.device_order:
matched_dut = False
for device_order_item in self.device_order:
matched = False
# choose device which needs to be shielded
if device_order_item[1]:
if 'DUT' in device_order_item[0]:
golden_device_candidates.append(settings.DUT2_DEVICE)
dut_device = settings.DUT2_DEVICE
matched_dut = True
matched = True
else:
for device_item in shield_devices:
golden_device_candidates.append(device_item)
shield_devices.remove(device_item)
matched = True
break
# choose device which does not need to be shielded
else:
if 'DUT' in device_order_item[0]:
golden_device_candidates.append(settings.DUT_DEVICE)
matched_dut = True
matched = True
else:
for device_item in devices:
golden_device_candidates.append(device_item)
devices.remove(device_item)
matched = True
break
if not matched:
logger.info('Golden device not enough in : no %s', device_order_item)
raise GoldenDeviceNotEnoughError()
if not matched_dut:
raise FailError('Failed to find DUT in device_order')
devices = golden_device_candidates
self.add_all_devices = True
if self.auto_dut and not settings.DUT_DEVICE:
if settings.MIXED_DEVICE_TYPE:
logger.info('Must set DUT_DEVICE')
raise FailError('DUT_DEVICE must be set for mixed testbed')
golden_devices_required += 1
if len(devices) < golden_devices_required:
raise GoldenDeviceNotEnoughError()
# add golden devices
number_of_devices_to_add = len(devices) if self.add_all_devices else golden_devices_required
for i in range(number_of_devices_to_add):
self._add_device(*devices.pop())
# add DUT
if self.case_need_shield:
if not self.device_order:
self._add_device(*settings.DUT2_DEVICE)
else:
if settings.DUT_DEVICE:
self._add_device(*settings.DUT_DEVICE)
# enable AUTO DUT
if self.auto_dut:
checkbox_auto_dut = browser.find_element_by_id('EnableAutoDutSelection')
if not checkbox_auto_dut.is_selected():
checkbox_auto_dut.click()
time.sleep(1)
if settings.DUT_DEVICE:
radio_auto_dut = browser.find_element_by_class_name('AutoDUT_RadBtns')
if not radio_auto_dut.is_selected() and not self.device_order:
radio_auto_dut.click()
if self.device_order:
selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
for selected_hw in selected_hw_set:
form_inputs = selected_hw.find_elements_by_tag_name('input')
form_port = form_inputs[0]
port = form_port.get_attribute('value').encode('utf8')
if port == dut_device[0]:
radio_auto_dut = selected_hw.find_element_by_class_name('AutoDUT_RadBtns')
if not radio_auto_dut.is_selected():
radio_auto_dut.click()
while True:
try:
self._connect_devices()
button_next = browser.find_element_by_id('nextBtn')
if not wait_until(
lambda: 'disabled' not in button_next.get_attribute('class'),
times=(30 + 4 * number_of_devices_to_add),
):
bad_ones = []
selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
for selected_hw in selected_hw_set:
form_inputs = selected_hw.find_elements_by_tag_name('input')
form_port = form_inputs[0]
if form_port.is_enabled():
bad_ones.append(selected_hw)
for selected_hw in bad_ones:
form_inputs = selected_hw.find_elements_by_tag_name('input')
form_port = form_inputs[0]
port = form_port.get_attribute('value').encode('utf8')
if port == dut_device[0]:
if settings.PDU_CONTROLLER_TYPE is None:
# connection error cannot recover without power
# cycling
raise FatalError('Failed to connect to DUT')
else:
raise FailError('Failed to connect to DUT')
if settings.PDU_CONTROLLER_TYPE is None:
# port cannot recover without power cycling
self.history.mark_bad_golden_device(port)
# remove the bad one
selected_hw.find_element_by_class_name('removeSelectedDevice').click()
time.sleep(0.1)
if len(devices):
self._add_device(*devices.pop())
else:
devices = None
if devices is None:
logger.warning('Golden devices not enough')
raise GoldenDeviceNotEnoughError()
else:
logger.info('Try again with new golden devices')
continue
if self.auto_dut and not settings.DUT_DEVICE:
radio_auto_dut = browser.find_element_by_class_name('AutoDUT_RadBtns')
if not radio_auto_dut.is_selected():
radio_auto_dut.click()
time.sleep(5)
button_next.click()
if not wait_until(lambda: self._browser.current_url.endswith('TestExecution.html'), 20):
raise Exception('Failed to load TestExecution page')
except FailError:
raise
except BaseException:
logger.exception('Unexpected error')
else:
break
def _select_case(self, role, case):
"""Select the test case.
"""
# select the case
elem = Select(self._browser.find_element_by_id('select-dut'))
elem.select_by_value(str(role))
time.sleep(1)
checkbox = None
wait_until(lambda: self._browser.find_elements_by_css_selector('.tree-node .tree-title') and True)
elems = self._browser.find_elements_by_css_selector('.tree-node .tree-title')
finder = re.compile(r'.*\b' + case + r'\b')
finder_dotted = re.compile(r'.*\b' + case.replace(' ', r'\.') + r'\b')
for elem in elems:
# elem.txt might be null when the required reference devices could
# not be met (either due to the quantity or the type) for specific test.
# perform() will throw exceptions if elem.text is null since Chrome
# and chromedriver 80. If elem is not shown in current case list
# window, move_to_element() will not work either.
if not elem.text:
continue
# execute a javascript to scroll the window to the elem
self._browser.execute_script('arguments[0].scrollIntoView();', elem)
action_chains = ActionChains(self._browser)
action_chains.move_to_element(elem)
action_chains.perform()
logger.debug(elem.text)
if finder.match(elem.text) or finder_dotted.match(elem.text):
parent = elem.find_element_by_xpath('..')
checkbox = parent.find_element_by_class_name('tree-checkbox')
break
if not checkbox:
time.sleep(5)
raise Exception('Failed to find the case')
self._browser.execute_script("$('.overview').css('left', '0')")
checkbox.click()
time.sleep(1)
elem = self._browser.find_element_by_id('runTest')
elem.click()
if not wait_until(lambda: self._browser.find_element_by_id('stopTest') and True, 10):
raise Exception('Failed to start test case')
def _collect_result(self):
"""Collect test result.
Copy PDF and pcap file to result directory
"""
if self.new_th:
os.system('copy "%s\\Reports\\*.*" "%s"' % (settings.HARNESS_HOME, self.result_dir))
else:
os.system('copy "%s\\Thread_Harness\\temp\\*.*" "%s"' % (settings.HARNESS_HOME, self.result_dir))
os.system('copy "%s\\Captures\\*.pcapng" %s\\' % (settings.HARNESS_HOME, self.result_dir))
def _wait_dialog(self):
"""Wait for dialogs and handle them until done.
"""
logger.debug('waiting for dialog')
done = False
error = False
logger.info('self timeout %d', self.timeout)
while not done and self.timeout:
try:
dialog = self._browser.find_element_by_id('RemoteConfirm')
except BaseException:
logger.exception('Failed to get dialog.')
else:
if dialog and dialog.get_attribute('aria-hidden') == 'false':
title = dialog.find_element_by_class_name('modal-title').text
time.sleep(1)
logger.info('Handling dialog[%s]', title)
try:
done = self._handle_dialog(dialog, title)
except BaseException:
logger.exception('Error handling dialog: %s', title)
error = True
if done is None:
raise FailError('Unexpected dialog occurred')
dialog.find_element_by_id('ConfirmOk').click()
time.sleep(1)
try:
stop_button = self._browser.find_element_by_id('stopTest')
if done:
stop_button.click()
# wait for stop procedure end
time.sleep(10)
except NoSuchElementException:
logger.info('Test stopped')
time.sleep(5)
done = True
self.timeout -= 1
# check if already ended capture
if self.timeout % 10 == 0:
lines = self._hc.tail()
if 'SUCCESS: The process "dumpcap.exe" with PID ' in lines:
logger.info('Tshark should be ended now, lets wait at most 30 seconds.')
if not wait_until(lambda: 'tshark.exe' not in subprocess.check_output('tasklist'), 30):
res = subprocess.check_output('taskkill /t /f /im tshark.exe',
stderr=subprocess.STDOUT,
shell=True)
logger.info(res)
# Wait until case really stopped
wait_until(lambda: self._browser.find_element_by_id('runTest') and True, 30)
if error:
raise FailError('Fail for previous exceptions')
def _handle_dialog(self, dialog, title):
"""Handle a dialog.
Returns:
bool True if no more dialogs expected,
False if more dialogs needed, and
None if not handled
"""
done = self.on_dialog(dialog, title)
if isinstance(done, bool):
return done
if title.startswith('Start DUT'):
body = dialog.find_element_by_id('cnfrmMsg').text
if 'Sleepy End Device' in body:
self.dut.mode = 's'
self.dut.child_timeout = self.child_timeout
elif 'End Device' in body:
self.dut.mode = 'rn'
self.dut.child_timeout = self.child_timeout
else:
self.dut.mode = 'rdn'
if 'at channel' in body:
self.channel = int(body.split(':')[1])
self.dut.channel = self.channel
self.dut.panid = settings.THREAD_PANID
self.dut.networkname = settings.THREAD_NETWORKNAME
self.dut.extpanid = settings.THREAD_EXTPANID
self.dut.start()
elif title.startswith('MAC Address Required') or title.startswith('DUT Random Extended MAC Address Required'):
mac = self.dut.mac
inp = dialog.find_element_by_id('cnfrmInpText')
inp.clear()
inp.send_keys('0x%s' % mac)
elif title.startswith('LL64 Address'):
ll64 = None
for addr in self.dut.addrs:
addr = addr.lower()
if addr.startswith('fe80') and not re.match('.+ff:fe00:[0-9a-f]{0,4}$', addr):
ll64 = addr
break
if not ll64:
raise FailError('No link local address found')
logger.info('Link local address is %s', ll64)
inp = dialog.find_element_by_id('cnfrmInpText')
inp.clear()
inp.send_keys(ll64)
elif title.startswith('Enter Channel'):
self.dut.channel = self.channel
inp = dialog.find_element_by_id('cnfrmInpText')
inp.clear()
inp.send_keys(str(self.dut.channel))
elif title.startswith('User Action Needed'):
body = dialog.find_element_by_id('cnfrmMsg').text
if body.startswith('Power Down the DUT'):
self.dut.stop()
return True
elif title.startswith('Short Address'):
short_addr = '0x%s' % self.dut.short_addr
inp = dialog.find_element_by_id('cnfrmInpText')
inp.clear()
inp.send_keys(short_addr)
elif title.startswith('ML64 Address'):
ml64 = None
for addr in self.dut.addrs:
if addr.startswith('fd') and not re.match('.+ff:fe00:[0-9a-f]{0,4}$', addr):
ml64 = addr
break
if not ml64:
raise Exception('No mesh local address found')
logger.info('Mesh local address is %s', ml64)
inp = dialog.find_element_by_id('cnfrmInpText')
inp.clear()
inp.send_keys(ml64)
elif title.startswith('Shield Devices') or title.startswith('Shield DUT'):
time.sleep(2)
if self.rf_shield:
logger.info('Shielding devices')
with self.rf_shield:
self.rf_shield.shield()
elif self.dut and settings.SHIELD_SIMULATION:
self.dut.channel = (self.channel == THREAD_CHANNEL_MAX and THREAD_CHANNEL_MIN) or (self.channel + 1)
else:
input('Shield DUT and press enter to continue..')
elif title.startswith('Unshield Devices') or title.startswith('Bring DUT back to network'):
time.sleep(5)
if self.rf_shield:
logger.info('Unshielding devices')
with self.rf_shield:
self.rf_shield.unshield()
elif self.dut and settings.SHIELD_SIMULATION:
self.dut.channel = self.channel
else:
input('Bring DUT and press enter to continue..')
elif title.startswith('Configure Prefix on DUT'):
body = dialog.find_element_by_id('cnfrmMsg').text
body = body.split(': ')[1]
params = reduce(
lambda params, param: params.update(((param[0].strip(' '), param[1]),)) or params,
[it.split('=') for it in body.split(', ')],
{},
)
prefix = params['P_Prefix'].strip('\0\r\n\t ')
flags = []
if params.get('P_slaac_preferred', 0) == '1':
flags.append('p')
flags.append('ao')
if params.get('P_stable', 0) == '1':
flags.append('s')
if params.get('P_default', 0) == '1':
flags.append('r')
prf = 'high'
self.dut.add_prefix(prefix, ''.join(flags), prf)
return False
def test(self):
"""This method will only start test case in child class"""
if self.__class__ is HarnessCase:
logger.warning('Skip this harness itself')
return
logger.info('Testing role[%d] case[%s]', self.role, self.case)
init_browser_times = 5
while True:
if self._init_browser():
break
elif init_browser_times > 0:
init_browser_times -= 1
self._destroy_browser()
else:
raise SystemExit()
try:
# prepare test case
while True:
url = self._browser.current_url
if url.endswith('SetupPage.html'):
self._setup_page()
elif url.endswith('TestBed.html'):
self._test_bed()
elif url.endswith('TestExecution.html'):
logger.info('Ready to handle dialogs')
break
time.sleep(2)
except UnexpectedAlertPresentException:
logger.exception('Failed to connect to harness server')
raise SystemExit()
except FatalError:
logger.exception('Test stopped for fatal error')
raise SystemExit()
except FailError:
logger.exception('Test failed')
raise
except BaseException:
logger.exception('Something wrong')
self._select_case(self.role, self.case)
logger.info('start to wait test process end')
self._wait_dialog()
try:
self._collect_result()
except BaseException:
logger.exception('Failed to collect results')
raise
# get case result
status = self._browser.find_element_by_class_name('title-test').get_attribute('innerText')
logger.info(status)
success = 'Pass' in status
self.assertTrue(success)