blob: 62d6c9aec9f5d5c231ea89f8d27a0f900fcb5ac4 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import ConfigParser
import json
import logging
import os
import subprocess
import re
import time
import unittest
from builtins import input
from functools import reduce
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import Select
from selenium.common.exceptions import UnexpectedAlertPresentException
from selenium.common.exceptions import NoSuchElementException
from autothreadharness import settings
from autothreadharness.exceptions import (
FailError,
FatalError,
GoldenDeviceNotEnoughError,
)
from autothreadharness.harness_controller import HarnessController
from autothreadharness.helpers import HistoryHelper
from autothreadharness.open_thread_controller import OpenThreadController
from autothreadharness.pdu_controller_factory import PduControllerFactory
from autothreadharness.rf_shield_controller import get_rf_shield_controller
logger = logging.getLogger(__name__)

#: Maximum channel number of the Thread protocol.
THREAD_CHANNEL_MAX = 26

#: Minimum channel number of the Thread protocol.
THREAD_CHANNEL_MIN = 11

#: Timeout for each test case, in seconds, used when settings gives none.
DEFAULT_TIMEOUT = 2700
def wait_until(what, times=-1):
    """Poll `what` once per second until it returns True.

    Args:
        what (Callable[[], bool]): Condition to poll. Only a result that is
            exactly ``True`` counts as success; any other value, or an
            exception raised by the call, counts as a failed attempt.
        times (int): Maximum number of attempts. A negative value (the
            default) never counts down to zero, so polling is unlimited.
            ``0`` performs no attempt at all.

    Returns:
        bool: True as soon as `what` succeeds, False once the attempts are
        used up.
    """
    while times:
        logger.info('Waiting times left %d', times)
        try:
            if what() is True:
                return True
        except Exception:
            # Only Exception is swallowed: KeyboardInterrupt and SystemExit
            # must still be able to abort an otherwise unlimited wait.
            logger.exception('Wait failed')
        else:
            logger.warning('Trial[%d] failed', times)
        times -= 1
        time.sleep(1)

    return False
class HarnessCase(unittest.TestCase):
    """Base class of all automation test cases.

    Every concrete test case class MUST define the properties `role`,
    `case` and `golden_devices_required`.
    """

    #: int: Thread channel, ranging from 11 to 26.
    channel = settings.THREAD_CHANNEL

    # Role ids accepted by `role`.
    ROLE_LEADER = 1           # Leader
    ROLE_ROUTER = 2           # Router
    ROLE_SED = 4              # Sleepy end device
    ROLE_BORDER = 8           # presumably border router (going by the name)
    ROLE_REED = 16            # Router eligible end device
    ROLE_ED = 32              # End device
    ROLE_COMMISSIONER = 64    # Commissioner
    ROLE_JOINER = 128         # Joiner
    ROLE_FED = 512            # Full end device
    ROLE_MED = 1024           # Minimal end device

    #: int: role id of the DUT, one of the ROLE_* values above.
    role = None

    #: str: case id, e.g. '6 5 1'.
    case = None

    #: int: golden devices needed to finish the test.
    golden_devices_required = 0

    #: int: child timeout in seconds.
    child_timeout = settings.THREAD_CHILD_TIMEOUT

    #: int: SED polling interval in seconds.
    sed_polling_interval = settings.THREAD_SED_POLLING_INTERVAL

    #: bool: whether to use the harness auto DUT feature.
    auto_dut = settings.AUTO_DUT

    #: number: timeout in seconds to stop running this test case; falls back
    #: to DEFAULT_TIMEOUT when settings.TIMEOUT is missing or falsy.
    timeout = getattr(settings, 'TIMEOUT', None) or DEFAULT_TIMEOUT

    #: number: timestamp at which the test case started (0 = not started).
    started = 0
def __init__(self, *args, **kwargs):
    """Create a case with no DUT, browser or harness controller attached.

    All arguments are forwarded unchanged to `unittest.TestCase`.
    """
    case_name = self.__class__.__name__

    # Populated later by the _init_* helpers.
    self.dut = None
    self._browser = None
    self._hc = None

    # Results are stored in a sub-directory named after the case class.
    self.result_dir = '%s\\%s' % (settings.OUTPUT_PATH, case_name)
    self.history = HistoryHelper()
    # When True, _test_bed adds every available golden device rather than
    # only the required number.
    self.add_all_devices = False

    super(HarnessCase, self).__init__(*args, **kwargs)
def _init_devices(self):
    """Reset or power-cycle the golden devices before a case runs.

    When `settings.PDU_CONTROLLER_TYPE` is not set, no power cycling is
    done: unless `settings.AUTO_DUT` is enabled (then nothing happens at
    all), every golden device is reset through an `OpenThreadController`
    opened on its port, and a device whose reset fails is recorded as bad
    in the history.

    When a PDU controller type is configured, the controller is opened
    (three attempts, ten seconds apart), asked to reboot with
    `settings.PDU_CONTROLLER_REBOOT_PARAMS`, and closed again. The method
    then sleeps one second per configured golden device.

    Raises:
        EOFError: if the PDU controller cannot be opened after three tries.
    """
    if not settings.PDU_CONTROLLER_TYPE:
        if settings.AUTO_DUT:
            return

        for device in settings.GOLDEN_DEVICES:
            port, _ = device
            try:
                with OpenThreadController(port) as otc:
                    logger.info('Resetting %s', port)
                    otc.reset()
            except BaseException:
                logger.exception('Failed to reset device %s', port)
                # Record the port, not the whole device tuple: the rest of
                # this class marks and looks up bad devices by port.
                self.history.mark_bad_golden_device(port)

        return

    tries = 3
    pdu_factory = PduControllerFactory()

    while True:
        try:
            pdu = pdu_factory.create_pdu_controller(
                settings.PDU_CONTROLLER_TYPE
            )
            pdu.open(**settings.PDU_CONTROLLER_OPEN_PARAMS)
        except EOFError:
            logger.warning('Failed to connect to telnet')
            tries -= 1
            if not tries:
                logger.error('Fatal error: cannot connect to apc')
                raise
            time.sleep(10)
            continue

        try:
            pdu.reboot(**settings.PDU_CONTROLLER_REBOOT_PARAMS)
        finally:
            # Release the controller connection even if the reboot failed.
            pdu.close()
        break

    # Wait one second per golden device after the reboot request.
    time.sleep(len(settings.GOLDEN_DEVICES))
def _init_harness(self):
    """Restart the harness backend service and validate its configuration.

    A `HarnessController` bound to the result directory is stopped and
    started again, then `Config\\Configuration.ini` under
    `settings.HARNESS_HOME` is checked.

    Raises:
        FailError: if `BrowserAutoNavigate` is present and true, or, for a
            mixed-device test bed, if `EnableDeviceSelection` is present
            and false.
    """
    controller = HarnessController(self.result_dir)
    self._hc = controller
    controller.stop()
    time.sleep(1)
    controller.start()
    time.sleep(2)

    section = 'THREAD_HARNESS_CONFIG'
    config = ConfigParser.ConfigParser()
    config.read('%s\\Config\\Configuration.ini' % settings.HARNESS_HOME)

    # A missing option is accepted; only an explicit wrong value fails.
    auto_navigate = config.has_option(
        section, 'BrowserAutoNavigate'
    ) and config.getboolean(section, 'BrowserAutoNavigate')
    if auto_navigate:
        message = 'BrowserAutoNavigate in Configuration.ini should be False'
        logger.error(message)
        raise FailError(message)

    if not settings.MIXED_DEVICE_TYPE:
        return

    selection_disabled = config.has_option(
        section, 'EnableDeviceSelection'
    ) and not config.getboolean(section, 'EnableDeviceSelection')
    if selection_disabled:
        message = 'EnableDeviceSelection in Configuration.ini should be True'
        logger.error(message)
        raise FailError(message)
def _destroy_harness(self):
    """Stop the harness backend service, then pause for two seconds."""
    controller = self._hc
    controller.stop()
    time.sleep(2)
def _init_dut(self):
    """Attach a controller for the DUT.

    With the harness auto DUT feature enabled no controller is needed and
    `self.dut` stays None; otherwise an `OpenThreadController` is created
    on the port given by the first item of `settings.DUT_DEVICE`.
    """
    if not self.auto_dut:
        self.dut = OpenThreadController(settings.DUT_DEVICE[0])
        return

    self.dut = None
def _destroy_dut(self):
    """Drop the reference to the DUT controller."""
    setattr(self, 'dut', None)
def _init_browser(self):
    """Open the harness web page in Chrome.

    Chrome is started quietly, i.e. it:
        1. disables extensions and info bars,
        2. ignores certificate errors and
        3. always allows notifications.

    The driver is stored in `self._browser` as soon as it exists, so that
    it can still be closed during tear-down when loading the page fails.
    The page is considered loaded once its title contains 'Thread'; if
    that does not happen within 30 polls the test fails via `assertIn`.
    """
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument('--disable-extensions')
    chrome_options.add_argument('--disable-infobars')
    chrome_options.add_argument('--ignore-certificate-errors')
    chrome_options.add_experimental_option(
        'prefs',
        {'profile.managed_default_content_settings.notifications': 1},
    )

    browser = webdriver.Chrome(chrome_options=chrome_options)
    # Keep the handle before anything below can raise (page-load timeout,
    # window errors); otherwise the Chrome instance would be leaked.
    self._browser = browser

    browser.set_page_load_timeout(10)
    browser.implicitly_wait(1)
    browser.maximize_window()
    browser.get(settings.HARNESS_URL)

    if not wait_until(lambda: 'Thread' in browser.title, 30):
        self.assertIn('Thread', browser.title)
def _destroy_browser(self):
    """Close the browser window, if one was opened.

    Does nothing when no browser is attached (e.g. the case failed before
    the browser was started), so the remaining tear-down steps still run.
    The reference is cleared even if closing raises.
    """
    browser = self._browser
    if browser is None:
        return

    try:
        browser.close()
    finally:
        self._browser = None
def _init_rf_shield(self):
    """Create the RF shield controller when one is configured.

    Both `SHIELD_CONTROLLER_TYPE` and `SHIELD_CONTROLLER_PARAMS` must be
    present and truthy in settings; otherwise `self.rf_shield` is None.
    """
    shield_type = getattr(settings, 'SHIELD_CONTROLLER_TYPE', None)
    shield_params = getattr(settings, 'SHIELD_CONTROLLER_PARAMS', None)

    if not (shield_type and shield_params):
        self.rf_shield = None
        return

    self.rf_shield = get_rf_shield_controller(
        shield_type=shield_type,
        params=shield_params,
    )
def _destroy_rf_shield(self):
    """Drop the reference to the RF shield controller."""
    setattr(self, 'rf_shield', None)
def setUp(self):
    """Prepare to run a test case.

    Removes reports and captures left over from earlier runs, creates the
    result directory, restarts the harness service, resets the golden
    devices, attaches the DUT and sets up the RF shield. Nothing is done
    for `HarnessCase` itself, only for its subclasses.
    """
    if self.__class__ is HarnessCase:
        return

    logger.info('Setting up')
    # clear files
    logger.info('Deleting all .pdf')
    os.system('del /q "%HOMEDRIVE%%HOMEPATH%\\Downloads\\NewPdf_*.pdf"')
    logger.info('Deleting all .xlsx')
    os.system(
        'del /q "%HOMEDRIVE%%HOMEPATH%\\Downloads\\ExcelReport*.xlsx"'
    )
    logger.info('Deleting all .pcapng')
    os.system('del /q "%s\\Captures\\*.pcapng"' % settings.HARNESS_HOME)
    # using temp files to fix excel downloading fail
    logger.info('Empty files in temps')
    os.system(
        'del /q "%s\\Thread_Harness\\temp\\*.*"' % settings.HARNESS_HOME
    )

    # create directory; done without the shell so that a result path
    # containing spaces is handled correctly
    if not os.path.isdir(self.result_dir):
        try:
            os.makedirs(self.result_dir)
        except OSError:
            # Stay non-fatal like the former shell command, but leave a
            # trace of the failure in the log.
            logger.exception(
                'Failed to create result directory %s', self.result_dir
            )

    self._init_harness()
    self._init_devices()
    self._init_dut()
    self._init_rf_shield()
def tearDown(self):
    """Clean up after each case.

    Stops the harness service, closes the browser and releases the DUT and
    RF shield, in that order. Nothing is done for `HarnessCase` itself.
    """
    if self.__class__ is HarnessCase:
        return

    logger.info('Tearing down')
    cleanup_steps = (
        self._destroy_harness,
        self._destroy_browser,
        self._destroy_dut,
        self._destroy_rf_shield,
    )
    for cleanup in cleanup_steps:
        cleanup()
def _setup_page(self):
    """Handle the harness setup page: sniffer settings and general settings.

    `test` calls this repeatedly while the browser stays on the setup
    page, so the method simply returns whenever the page is not ready yet.
    Once more than five seconds per golden device have passed since the
    first call, the page is refreshed instead.

    Raises:
        Exception: if the sniffer is still not verified after 100 polls.
    """
    if not self.started:
        self.started = time.time()

    elapsed = time.time() - self.started
    if elapsed > 5 * len(settings.GOLDEN_DEVICES):
        self._browser.refresh()
        return

    # Detect Sniffer
    try:
        capture_dialog = self._browser.find_element_by_id(
            'capture-Setup-modal'
        )
    except BaseException:
        logger.exception('Failed to get dialog.')
    else:
        if (
            capture_dialog
            and capture_dialog.get_attribute('aria-hidden') == 'false'
        ):
            attempts_left = 100
            while attempts_left:
                status = capture_dialog.find_element_by_class_name(
                    'status-notify'
                ).text
                # 'Not' is tested before 'Verified' on purpose: a
                # "not verified" status contains both words.
                if 'Searching' in status:
                    logger.info('Still detecting..')
                elif 'Not' in status:
                    logger.warning('Sniffer device not verified!')
                    capture_dialog.find_element_by_id(
                        'snifferAutoDetectBtn'
                    ).click()
                elif 'Verified' in status:
                    logger.info('Verified!')
                    capture_dialog.find_element_by_id(
                        'saveCaptureSettings'
                    ).click()
                    break
                else:
                    logger.warning('Unexpected sniffer verification status')

                attempts_left -= 1
                time.sleep(1)
            else:
                # Loop ran out of attempts without reaching 'Verified'.
                raise Exception('Unable to detect sniffer device')

    time.sleep(1)

    try:
        skip_button = self._browser.find_element_by_id('SkipPrepareDevice')
        if skip_button.is_enabled():
            skip_button.click()
            time.sleep(1)
    except BaseException:
        logger.info('Still detecting sniffers')

    try:
        next_button = self._browser.find_element_by_id('nextButton')
    except BaseException:
        logger.exception('Failed to finish setup')
        return

    if not next_button.is_enabled():
        logger.info('Harness is still not ready')
        return

    # General Setup
    try:
        if self.child_timeout or self.sed_polling_interval:
            logger.info('finding general Setup button')
            self._browser.find_element_by_id('general-Setup').click()
            time.sleep(2)

            settings_dialog = self._browser.find_element_by_id(
                'general-Setup-modal'
            )
            if settings_dialog.get_attribute('aria-hidden') != 'false':
                raise Exception('Missing General Setup dialog')

            # Each field is always cleared and only refilled when a value
            # is configured.
            field_values = (
                ('inp_general_child_update_wait_time', self.child_timeout),
                ('inp_general_sed_polling_rate', self.sed_polling_interval),
            )
            for field_id, value in field_values:
                field = settings_dialog.find_element_by_id(field_id)
                field.clear()
                if value:
                    field.send_keys(str(value))

            settings_dialog.find_element_by_id('saveGeneralSettings').click()
            time.sleep(1)
    except BaseException:
        logger.info('general setup exception')
        logger.exception('Failed to do general setup')
        return

    # Finish this page
    next_button.click()
    time.sleep(1)
def _connect_devices(self):
    """Click the 'Connect All' link on the current page."""
    self._browser.find_element_by_link_text('Connect All').click()
def _add_device(self, port, device_type_id):
    """Drag a device of the given type onto the test bed and set its port.

    Args:
        port: Value typed into the first input of the newly dropped device.
        device_type_id: Element id of the device type to drag.
    """
    driver = self._browser
    bed = driver.find_element_by_id('test-bed')
    source = driver.find_element_by_id(device_type_id)

    # drag
    ActionChains(driver).click_and_hold(source).move_to_element(bed).perform()
    time.sleep(1)

    # drop
    drop_target = driver.find_element_by_class_name('drop-hw')
    ActionChains(driver).move_to_element(drop_target).release(
        drop_target
    ).perform()
    time.sleep(0.5)

    # The first input of the selected device is its port field.
    dropped = driver.find_element_by_class_name('selected-hw')
    port_field = dropped.find_elements_by_tag_name('input')[0]
    port_field.clear()
    port_field.send_keys(port)
def _test_bed(self):
    """Set up the test bed.

    Clears the test bed, picks the golden devices required by the case
    (taken from the topology file for a mixed-device test bed), adds them
    together with the DUT, and connects everything. Devices that fail to
    connect are replaced by spare ones until the page can move on to the
    test execution page.

    Raises:
        GoldenDeviceNotEnoughError: if not enough usable golden devices
            are available, or the topology file gives no usable entry.
        FailError: if DUT_DEVICE is missing for a mixed test bed with auto
            DUT, or the DUT cannot be connected (FatalError when no PDU
            controller is configured).
    """
    browser = self._browser
    test_bed = browser.find_element_by_id('test-bed')
    time.sleep(3)

    # Start from an empty test bed: remove leftovers, last one first.
    selected_hw_set = test_bed.find_elements_by_class_name('selected-hw')
    for selected_hw in reversed(selected_hw_set):
        selected_hw.find_element_by_class_name(
            'removeSelectedDevice'
        ).click()

    devices = [
        device
        for device in settings.GOLDEN_DEVICES
        if not self.history.is_bad_golden_device(device[0])
        and not (
            settings.DUT_DEVICE and device[0] == settings.DUT_DEVICE[0]
        )
    ]

    logger.info(
        'Available golden devices: %s', json.dumps(devices, indent=2)
    )

    golden_devices_required = self.golden_devices_required

    # for test bed with mixed devices
    if settings.MIXED_DEVICE_TYPE:
        topo_file = (
            settings.HARNESS_HOME
            + "\\Thread_Harness\\TestScripts\\TopologyConfig.txt"
        )
        topo_mixed_devices = self._read_topology_devices(topo_file)

        golden_device_candidates = []
        missing_golden_devices = topo_mixed_devices[:]

        # mapping topology config devices with devices in settings
        for mixed_device_item in topo_mixed_devices:
            for device_item in devices:
                if mixed_device_item[1] == device_item[1]:
                    golden_device_candidates.append(device_item)
                    # safe: the inner loop is left right after mutating
                    devices.remove(device_item)
                    missing_golden_devices.remove(mixed_device_item)
                    break

        logger.info(
            'Golden devices in topology config file mapped in settings : %s',
            golden_device_candidates,
        )

        if len(topo_mixed_devices) != len(golden_device_candidates):
            # Report how many devices of each type could not be mapped.
            device_dict = dict()
            for missing_device in missing_golden_devices:
                device_dict[missing_device[1]] = (
                    device_dict.get(missing_device[1], 0) + 1
                )
            logger.info('Missing Devices: %s', device_dict)
            raise GoldenDeviceNotEnoughError()

        devices = golden_device_candidates
        golden_devices_required = len(devices)
        logger.info(
            'All case-needed golden devices: %s',
            json.dumps(devices, indent=2),
        )

    if self.auto_dut and not settings.DUT_DEVICE:
        if settings.MIXED_DEVICE_TYPE:
            logger.info('Must set DUT_DEVICE')
            raise FailError('DUT_DEVICE must be set for mixed testbed')
        # Without a dedicated DUT one more golden device is added.
        golden_devices_required += 1

    if len(devices) < golden_devices_required:
        raise GoldenDeviceNotEnoughError()

    # add golden devices
    number_of_devices_to_add = (
        len(devices) if self.add_all_devices else golden_devices_required
    )
    for _ in range(number_of_devices_to_add):
        self._add_device(*devices.pop())

    # add DUT
    if settings.DUT_DEVICE:
        self._add_device(*settings.DUT_DEVICE)

    # enable AUTO DUT
    if self.auto_dut:
        checkbox_auto_dut = browser.find_element_by_id(
            'EnableAutoDutSelection'
        )
        if not checkbox_auto_dut.is_selected():
            checkbox_auto_dut.click()
            time.sleep(1)

        if settings.DUT_DEVICE:
            radio_auto_dut = browser.find_element_by_class_name(
                'AutoDUT_RadBtns'
            )
            if not radio_auto_dut.is_selected():
                radio_auto_dut.click()

    while True:
        try:
            self._connect_devices()
            button_next = browser.find_element_by_id('nextBtn')
            if not wait_until(
                lambda: 'disabled'
                not in button_next.get_attribute('class'),
                times=(30 + 4 * number_of_devices_to_add),
            ):
                # A device whose port input is still enabled did not
                # connect.
                bad_ones = []
                selected_hw_set = test_bed.find_elements_by_class_name(
                    'selected-hw'
                )
                for selected_hw in selected_hw_set:
                    form_inputs = selected_hw.find_elements_by_tag_name(
                        'input'
                    )
                    form_port = form_inputs[0]
                    if form_port.is_enabled():
                        bad_ones.append(selected_hw)

                # Tracked with a flag instead of overwriting `devices`
                # with None, which made the next iteration fail on
                # len(None) when several devices were bad.
                out_of_spares = False
                for selected_hw in bad_ones:
                    form_inputs = selected_hw.find_elements_by_tag_name(
                        'input'
                    )
                    form_port = form_inputs[0]
                    port = form_port.get_attribute('value').encode('utf8')

                    if (
                        settings.DUT_DEVICE
                        and port == settings.DUT_DEVICE[0]
                    ):
                        if settings.PDU_CONTROLLER_TYPE is None:
                            # connection error cannot recover without power
                            # cycling
                            raise FatalError('Failed to connect to DUT')
                        else:
                            raise FailError('Failed to connect to DUT')

                    if settings.PDU_CONTROLLER_TYPE is None:
                        # port cannot recover without power cycling
                        self.history.mark_bad_golden_device(port)

                    # remove the bad one
                    selected_hw.find_element_by_class_name(
                        'removeSelectedDevice'
                    ).click()
                    time.sleep(0.1)

                    if devices:
                        self._add_device(*devices.pop())
                    else:
                        out_of_spares = True

                if out_of_spares:
                    logger.warning('Golden devices not enough')
                    raise GoldenDeviceNotEnoughError()

                logger.info('Try again with new golden devices')
                continue

            if self.auto_dut and not settings.DUT_DEVICE:
                radio_auto_dut = browser.find_element_by_class_name(
                    'AutoDUT_RadBtns'
                )
                if not radio_auto_dut.is_selected():
                    radio_auto_dut.click()

                time.sleep(5)

            button_next.click()
            if not wait_until(
                lambda: self._browser.current_url.endswith(
                    'TestExecution.html'
                ),
                20,
            ):
                raise Exception('Failed to load TestExecution page')
        except FailError:
            raise
        except BaseException:
            # Anything else is logged and the connect step is retried.
            logger.exception('Unexpected error')
        else:
            break


def _read_topology_devices(self, topo_file):
    """Read the devices listed for this case in the topology config file.

    Lines starting with '#' and lines without a '-' are skipped. For the
    first line whose part before the last '-' equals `self.case` (dots
    read as spaces), the part after it is split on ',' and each item is
    split on ':' into a tuple.

    Args:
        topo_file (str): Path of the topology config file.

    Returns:
        list: One tuple per device item of the matching line.

    Raises:
        GoldenDeviceNotEnoughError: if the file cannot be opened or read,
            or contains no line for this case.
    """
    try:
        f_topo = open(topo_file, 'r')
    except IOError:
        logger.info('%s can NOT be found', topo_file)
        raise GoldenDeviceNotEnoughError()

    topo_devices = None
    try:
        # The file is closed on every exit path, and iterating over it
        # stops at end of file instead of spinning on empty reads.
        with f_topo:
            for raw_line in f_topo:
                topo_line = raw_line.strip()
                if topo_line.startswith('#'):
                    continue
                match_line = re.match(r'(.*)-(.*)', topo_line, re.M | re.I)
                if not match_line:
                    continue

                case_id = match_line.group(1)
                if re.sub(r'\.', ' ', case_id) != self.case:
                    continue

                logger.info('Get line by case %s: %s', case_id, topo_line)
                topo_devices = [
                    tuple(topo_device.split(':'))
                    for topo_device in match_line.group(2).split(',')
                ]
                break
    except Exception as e:
        logger.info('Get devices from topology config file error: %s', e)
        raise GoldenDeviceNotEnoughError()

    if topo_devices is None:
        logger.info(
            'No line for case %s found in %s', self.case, topo_file
        )
        raise GoldenDeviceNotEnoughError()

    logger.info(
        'Golden devices in topology config file for case %s: %s',
        self.case,
        topo_devices,
    )
    return topo_devices
def _select_case(self, role, case):
    """Select the test case in the harness tree and start it.

    Args:
        role (int): Role id chosen in the DUT selection box.
        case (str): Case id; matched against the tree titles either as
            given or with its spaces replaced by dots.

    Raises:
        Exception: if the case is not found in the tree, or the stop
            button does not show up within 10 polls after starting.
    """
    driver = self._browser
    title_selector = '.tree-node .tree-title'

    # select the case
    Select(driver.find_element_by_id('select-dut')).select_by_value(
        str(role)
    )
    time.sleep(1)

    # Wait (without limit) until the case tree has been populated.
    wait_until(
        lambda: self._browser.find_elements_by_css_selector(title_selector)
        and True
    )
    titles = driver.find_elements_by_css_selector(title_selector)

    finder = re.compile(r'.*\b' + case + r'\b')
    finder_dotted = re.compile(r'.*\b' + case.replace(' ', r'\.') + r'\b')

    checkbox = None
    for title in titles:
        # Hover over the entry before reading it.
        hover = ActionChains(self._browser)
        hover.move_to_element(title)
        hover.perform()
        logger.debug(title.text)
        if finder.match(title.text) or finder_dotted.match(title.text):
            checkbox = title.find_element_by_xpath(
                '..'
            ).find_element_by_class_name('tree-checkbox')
            break

    if not checkbox:
        time.sleep(5)
        raise Exception('Failed to find the case')

    self._browser.execute_script("$('.overview').css('left', '0')")
    checkbox.click()
    time.sleep(1)

    self._browser.find_element_by_id('runTest').click()
    started = wait_until(
        lambda: self._browser.find_element_by_id('stopTest') and True, 10
    )
    if not started:
        raise Exception('Failed to start test case')
def _collect_result(self):
    """Collect the test result.

    Generates the PDF report, the Excel report and the pcap file through
    the harness page, then copies them, together with the harness temp
    files, into the result directory.

    Raises:
        Exception: if the test information dialog is present but hidden.
    """
    # generate pdf
    self._browser.find_element_by_class_name('save-pdf').click()
    time.sleep(1)
    try:
        dialog = self._browser.find_element_by_id('Testinfo')
    except BaseException:
        logger.exception('Failed to get test info dialog.')
    else:
        if dialog.get_attribute('aria-hidden') != 'false':
            raise Exception('Test information dialog not ready')

        # With auto DUT there is no DUT controller to ask, so the
        # configured version is used even when it is empty.
        if self.auto_dut:
            version = settings.DUT_VERSION
        else:
            version = self.dut.version

        dialog.find_element_by_id('inp_dut_manufacturer').send_keys(
            settings.DUT_MANUFACTURER
        )
        dialog.find_element_by_id('inp_dut_firmware_version').send_keys(
            version
        )
        dialog.find_element_by_id('inp_tester_name').send_keys(
            settings.TESTER_NAME
        )
        dialog.find_element_by_id('inp_remarks').send_keys(
            settings.TESTER_REMARKS
        )
        dialog.find_element_by_id('generatePdf').click()

    time.sleep(1)
    main_window = self._browser.current_window_handle

    # generate excel
    self._browser.find_element_by_class_name('save-excel').click()
    time.sleep(1)
    self._close_extra_windows(main_window)

    # save pcap
    self._browser.find_element_by_class_name('save-wireshark').click()
    time.sleep(1)
    self._close_extra_windows(main_window)

    # The download folder is addressed with drive and path, as in setUp,
    # and both sides of each copy are quoted so paths with spaces work.
    downloads = '%HOMEDRIVE%%HOMEPATH%\\Downloads'
    copy_sources = (
        downloads + '\\NewPdf_*.pdf',
        downloads + '\\ExcelReport_*.xlsx',
        '%s\\Captures\\*.pcapng' % settings.HARNESS_HOME,
        '%s\\Thread_Harness\\temp\\*.*' % settings.HARNESS_HOME,
    )
    for source in copy_sources:
        os.system('copy "%s" "%s"' % (source, self.result_dir))


def _close_extra_windows(self, main_window):
    """Close every browser window except `main_window`, then focus it."""
    for window_handle in self._browser.window_handles:
        if window_handle != main_window:
            self._browser.switch_to.window(window_handle)
            self._browser.close()
    self._browser.switch_to.window(main_window)
def _wait_dialog(self):
    """Wait for dialogs and handle them until the case is done.

    Each round looks for the 'RemoteConfirm' dialog, lets `_handle_dialog`
    deal with it when it is shown, confirms it, and then checks the stop
    button. `self.timeout` is decreased by one per round and the loop
    ends when the case is done or the timeout is used up. Every tenth
    round the harness log tail is checked for the end of the capture, and
    a lingering tshark.exe is killed.

    Raises:
        FailError: if a dialog handler returned None, or if handling any
            dialog raised an exception (reported after the case stopped).
    """
    logger.debug('waiting for dialog')
    done = False
    error = False

    logger.info("self timeout %d", self.timeout)
    # `> 0` rather than plain truthiness: a non-integer or negative
    # timeout never counts down to exactly zero.
    while not done and self.timeout > 0:
        try:
            dialog = self._browser.find_element_by_id('RemoteConfirm')
        except BaseException:
            logger.exception('Failed to get dialog.')
        else:
            if dialog and dialog.get_attribute('aria-hidden') == 'false':
                title = dialog.find_element_by_class_name(
                    'modal-title'
                ).text
                time.sleep(1)
                logger.info('Handling dialog[%s]', title)

                try:
                    done = self._handle_dialog(dialog, title)
                except BaseException:
                    # Remember the failure, but keep confirming dialogs so
                    # the harness can finish the case.
                    logger.exception('Error handling dialog: %s', title)
                    error = True

                if done is None:
                    raise FailError('Unexpected dialog occurred')

                dialog.find_element_by_id('ConfirmOk').click()

        time.sleep(1)

        try:
            stop_button = self._browser.find_element_by_id('stopTest')
            if done:
                stop_button.click()
                # wait for stop procedure end
                time.sleep(10)
        except NoSuchElementException:
            # No stop button any more: the case has stopped.
            logger.info('Test stopped')
            time.sleep(5)
            done = True

        self.timeout -= 1

        # check if already ended capture
        if self.timeout % 10 == 0:
            lines = self._hc.tail()
            if 'SUCCESS: The process "dumpcap.exe" with PID ' in lines:
                logger.info(
                    'Tshark should be ended now, lets wait at most 30 seconds.'
                )
                if not wait_until(
                    lambda: 'tshark.exe'
                    not in subprocess.check_output('tasklist'),
                    30,
                ):
                    res = subprocess.check_output(
                        'taskkill /t /f /im tshark.exe',
                        stderr=subprocess.STDOUT,
                        shell=True,
                    )
                    logger.info(res)

    # Wait until case really stopped
    wait_until(
        lambda: self._browser.find_element_by_id('runTest') and True, 30
    )

    if error:
        raise FailError('Fail for previous exceptions')
def _handle_dialog(self, dialog, title):
    """Handle a harness dialog, selected by the start of its title.

    `self.on_dialog` is consulted first; when it returns a bool, that
    value is returned and the built-in handling is skipped.
    NOTE(review): `on_dialog` is not defined in this class as far as this
    file shows - presumably every case class provides it; confirm.

    Args:
        dialog: The dialog web element.
        title (str): Title text of the dialog.

    Returns:
        bool: the bool returned by `on_dialog`, if any; otherwise True for
        a 'User Action Needed' dialog (no more dialogs expected) and False
        in every other case, including titles that are not recognized.

    Raises:
        FailError: if the DUT has no usable link local address.
        Exception: if the DUT has no usable mesh local address.
    """
    done = self.on_dialog(dialog, title)
    if isinstance(done, bool):
        return done

    if title.startswith('Start DUT'):
        body = dialog.find_element_by_id('cnfrmMsg').text
        # 'Sleepy End Device' must be tested before 'End Device', which
        # it contains.
        if 'Sleepy End Device' in body:
            self.dut.mode = 's'
            self.dut.child_timeout = self.child_timeout
        elif 'End Device' in body:
            self.dut.mode = 'rsn'
            self.dut.child_timeout = self.child_timeout
        else:
            self.dut.mode = 'rsdn'

        if 'at channel' in body:
            self.channel = int(body.split(':')[1])

        self.dut.channel = self.channel
        self.dut.panid = settings.THREAD_PANID
        self.dut.networkname = settings.THREAD_NETWORKNAME
        self.dut.extpanid = settings.THREAD_EXTPANID
        self.dut.start()

    elif title.startswith(
        (
            'MAC Address Required',
            'DUT Random Extended MAC Address Required',
        )
    ):
        self._fill_confirm_input(dialog, '0x%s' % self.dut.mac)

    elif title.startswith('LL64 Address'):
        ll64 = self._find_dut_address('fe80')
        if not ll64:
            raise FailError('No link local address found')

        logger.info('Link local address is %s', ll64)
        self._fill_confirm_input(dialog, ll64)

    elif title.startswith('Enter Channel'):
        self.dut.channel = self.channel
        self._fill_confirm_input(dialog, str(self.dut.channel))

    elif title.startswith('User Action Needed'):
        body = dialog.find_element_by_id('cnfrmMsg').text
        if body.startswith('Power Down the DUT'):
            self.dut.stop()
        return True

    elif title.startswith('Short Address'):
        self._fill_confirm_input(dialog, '0x%s' % self.dut.short_addr)

    elif title.startswith('ML64 Address'):
        # Matched case-insensitively, like the link local address above.
        ml64 = self._find_dut_address('fd')
        if not ml64:
            raise Exception('No mesh local address found')

        logger.info('Mesh local address is %s', ml64)
        self._fill_confirm_input(dialog, ml64)

    elif title.startswith(('Shield Devices', 'Sheild DUT')):
        if self.rf_shield:
            logger.info('Shielding devices')
            with self.rf_shield:
                self.rf_shield.shield()
        elif self.dut and settings.SHIELD_SIMULATION:
            # Simulate shielding by moving the DUT to the next channel,
            # wrapping around from the highest to the lowest one.
            if self.channel == THREAD_CHANNEL_MAX:
                self.dut.channel = THREAD_CHANNEL_MIN
            else:
                self.dut.channel = self.channel + 1
        else:
            input('Shield DUT and press enter to continue..')

    elif title.startswith(('Unshield Devices', 'Bring DUT Back to network')):
        if self.rf_shield:
            logger.info('Unshielding devices')
            with self.rf_shield:
                self.rf_shield.unshield()
        elif self.dut and settings.SHIELD_SIMULATION:
            self.dut.channel = self.channel
        else:
            input('Bring DUT and press enter to continue..')

    elif title.startswith('Configure Prefix on DUT'):
        body = dialog.find_element_by_id('cnfrmMsg').text
        body = body.split(': ')[1]
        # 'name=value' pairs separated by ', '; names are stripped of
        # spaces, values are kept as they are.
        params = {}
        for pair in body.split(', '):
            parts = pair.split('=')
            params[parts[0].strip(' ')] = parts[1]

        prefix = params['P_Prefix'].strip('\0\r\n\t ')
        flags = []
        if params.get('P_slaac_preferred', 0) == '1':
            flags.append('p')
        flags.append('ao')
        if params.get('P_stable', 0) == '1':
            flags.append('s')
        if params.get('P_default', 0) == '1':
            flags.append('r')
        prf = 'high'
        self.dut.add_prefix(prefix, ''.join(flags), prf)

    return False


def _find_dut_address(self, prefix):
    """Return the first lower-cased DUT address starting with `prefix`.

    Addresses ending in 'ff:fe00:' followed by up to four hex digits are
    skipped. Returns None when no address qualifies.
    """
    for addr in self.dut.addrs:
        addr = addr.lower()
        if addr.startswith(prefix) and not re.match(
            '.+ff:fe00:[0-9a-f]{0,4}$', addr
        ):
            return addr
    return None


def _fill_confirm_input(self, dialog, text):
    """Replace the content of the dialog's text input with `text`."""
    inp = dialog.find_element_by_id('cnfrmInpText')
    inp.clear()
    inp.send_keys(text)
def test(self):
    """Run the test case; does nothing for `HarnessCase` itself.

    Opens the browser, walks through the setup and test bed pages until
    the test execution page is reached, selects and runs the case, handles
    its dialogs, collects the results and finally asserts that the status
    shown by the harness contains 'Pass'.

    Raises:
        SystemExit: on an unexpected browser alert or a FatalError during
            preparation.
        FailError: re-raised when preparation fails with it.
    """
    if self.__class__ is HarnessCase:
        logger.warning('Skip this harness itself')
        return

    logger.info('Testing role[%d] case[%s]', self.role, self.case)

    try:
        self._init_browser()
        # prepare test case
        while True:
            url = self._browser.current_url
            if url.endswith('TestExecution.html'):
                logger.info('Ready to handle dialogs')
                break

            if url.endswith('SetupPage.html'):
                self._setup_page()
            elif url.endswith('TestBed.html'):
                self._test_bed()

            time.sleep(2)
    except UnexpectedAlertPresentException:
        logger.exception('Failed to connect to harness server')
        raise SystemExit()
    except FatalError:
        logger.exception('Test stopped for fatal error')
        raise SystemExit()
    except FailError:
        logger.exception('Test failed')
        raise
    except BaseException:
        # Any other preparation problem is only logged; the case is still
        # attempted below.
        logger.exception('Something wrong')

    self._select_case(self.role, self.case)

    logger.info("start to wait test process end")
    self._wait_dialog()

    try:
        self._collect_result()
    except BaseException:
        logger.exception('Failed to collect results')
        raise

    # get case result
    status = self._browser.find_element_by_class_name('title-test').text
    logger.info(status)
    self.assertTrue('Pass' in status)