blob: 5276c8be70fb2bf751ad9c70966a482b95293398 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import argparse
import fnmatch
import logging
import json
import os
import sys
import time
import unittest
from builtins import str
from collections import OrderedDict
from autothreadharness.harness_case import HarnessCase
from autothreadharness.open_thread_controller import OpenThreadController
from autothreadharness import settings
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
"""Logger: The global logger"""
logger.setLevel(logging.INFO)
RESUME_SCRIPT_PATH = (
'%appdata%\\Microsoft\\Windows\\Start Menu\\Programs\\'
'Startup\\continue_harness.bat'
)
class SimpleTestResult(unittest.TestResult):
executions = 0
def __init__(
self,
path,
auto_reboot_args=None,
keep_explorer=False,
add_all_devices=False,
):
"""Record test results in json file
Args:
path (str): File path to record the results
auto_reboot (bool): Whether reboot when harness die
"""
super(SimpleTestResult, self).__init__()
self.path = path
self.auto_reboot_args = auto_reboot_args
self.result = json.load(open(self.path, 'r'))
self.log_handler = None
self.started = None
self.keep_explorer = keep_explorer
self.add_all_devices = add_all_devices
SimpleTestResult.executions += 1
logger.info('Initial state is %s', json.dumps(self.result, indent=2))
def startTest(self, test):
logger.info(
'\n========================================\n%s\n========================================',
test.__class__.__name__,
)
test.add_all_devices = self.add_all_devices
# create start up script if auto reboot enabled
if self.auto_reboot_args:
test.auto_reboot = True
os.system(
'echo %s > "%s"'
% (
' '.join(
self.auto_reboot_args + ['-c', test.__class__.__name__]
),
RESUME_SCRIPT_PATH,
)
)
# record start timestamp
self.started = time.strftime('%Y-%m-%dT%H:%M:%S')
os.system('mkdir %s' % test.result_dir)
self.log_handler = logging.FileHandler(
'%s\\auto-%s.log'
% (test.result_dir, time.strftime('%Y%m%d%H%M%S'))
)
self.log_handler.setLevel(logging.DEBUG)
self.log_handler.setFormatter(
logging.Formatter('%(asctime)s %(levelname)s %(message)s')
)
logger.addHandler(self.log_handler)
def add_result(self, test, passed, error=None):
"""Record test result into json file
Args:
test (TestCase): The test just run
passed (bool): Whether the case is passed
"""
self.result[str(test.__class__.__name__)] = {
'started': self.started,
'stopped': time.strftime('%Y-%m-%dT%H:%M:%S'),
'passed': passed,
'error': error,
'executions': SimpleTestResult.executions,
}
if self.auto_reboot_args:
os.system('del "%s"' % RESUME_SCRIPT_PATH)
json.dump(
OrderedDict(sorted(self.result.items(), key=lambda t: t[0])),
open(self.path, 'w'),
indent=2,
)
# save logs
logger.removeHandler(self.log_handler)
self.log_handler.close()
self.log_handler = None
time.sleep(2)
# close explorers
if not self.keep_explorer:
os.system('taskkill /f /im explorer.exe && start explorer.exe')
def addSuccess(self, test):
logger.info('case[%s] pass', test.__class__.__name__)
super(SimpleTestResult, self).addSuccess(test)
self.add_result(test, True)
def addFailure(self, test, err):
logger.warning('case[%s] fail', test.__class__.__name__)
super(SimpleTestResult, self).addFailure(test, err)
self.add_result(test, False)
def addError(self, test, err):
logger.error('case[%s] error', test.__class__.__name__, exc_info=err)
if err and err[0] is SystemExit:
if self.auto_reboot_args:
logger.warning('rebooting..')
os.system('shutdown /r /t 1')
else:
logger.warning('exiting..')
sys.exit(1)
super(SimpleTestResult, self).addError(test, err)
self.add_result(test, None, str(err[1]))
def list_devices(names=None, continue_from=None, **kwargs):
"""List devices in settings file and print versions"""
if not names:
names = [
device
for device, _type in settings.GOLDEN_DEVICES
if _type == 'OpenThread'
]
if continue_from:
continue_from = names.index(continue_from)
else:
continue_from = 0
for port in names[continue_from:]:
try:
with OpenThreadController(port) as otc:
print('%s: %s' % (port, otc.version))
except BaseException:
logger.exception('failed to get version of %s' % port)
def discover(
names=None,
pattern=['*.py'],
skip='efp',
dry_run=False,
blacklist=None,
name_greps=None,
manual_reset=False,
delete_history=False,
max_devices=0,
continue_from=None,
result_file='./result.json',
auto_reboot=False,
keep_explorer=False,
add_all_devices=False,
):
"""Discover all test cases and skip those passed
Args:
pattern (str): Pattern to match case modules, refer python's unittest
documentation for more details
skip (str): types cases to skip
"""
if not os.path.exists(settings.OUTPUT_PATH):
os.mkdir(settings.OUTPUT_PATH)
if delete_history:
os.system('del history.json')
if blacklist:
try:
excludes = [
line.strip('\n')
for line in open(blacklist, 'r').readlines()
if not line.startswith('#')
]
except BaseException:
logger.exception('Failed to open test case black list file')
raise
else:
excludes = []
log = None
if os.path.isfile(result_file):
try:
log = json.load(open(result_file, 'r'))
except BaseException:
logger.exception('Failed to open result file')
if not log:
log = {}
json.dump(log, open(result_file, 'w'), indent=2)
suite = unittest.TestSuite()
discovered = unittest.defaultTestLoader.discover('cases', pattern)
if names and continue_from:
names = names[names.index(continue_from):]
for s1 in discovered:
for s2 in s1:
for case in s2:
if case.__class__ is HarnessCase:
continue
case_name = str(case.__class__.__name__)
# grep name
if name_greps and not any(
fnmatch.fnmatch(case_name, name_grep)
for name_grep in name_greps
):
logger.info('case[%s] skipped by name greps', case_name)
continue
# whitelist
if len(names) and case_name not in names:
logger.info('case[%s] skipped', case_name)
continue
# skip cases
if case_name in log.keys():
if (
(log[case_name]['passed'] and ('p' in skip))
or (
log[case_name]['passed'] is False and ('f' in skip)
)
or (log[case_name]['passed'] is None and ('e' in skip))
):
logger.warning(
'case[%s] skipped for its status[%s]',
case_name,
log[case_name]['passed'],
)
continue
# continue from
if continue_from:
if continue_from != case_name:
logger.warning(
'case[%s] skipped for continue from[%s]',
case_name,
continue_from,
)
continue
else:
continue_from = None
# black list
if case_name in excludes:
logger.warning('case[%s] skipped for blacklist', case_name)
continue
# max devices
if max_devices and case.golden_devices_required > max_devices:
logger.warning(
'case[%s] skipped for exceeding max golden devices allowed[%d]',
case_name,
max_devices,
)
continue
suite.addTest(case)
logger.info('case[%s] added', case_name)
if auto_reboot:
argv = []
argv.append('"%s"' % os.sep.join([os.getcwd(), 'start.bat']))
argv.extend(['-p', pattern])
argv.extend(['-k', skip])
argv.extend(['-o', result_file])
argv.append('-a')
if manual_reset:
argv.append('-m')
if delete_history:
argv.append('-d')
auto_reboot_args = argv + names
else:
auto_reboot_args = None
os.system('del "%s"' % RESUME_SCRIPT_PATH)
# manual reset
if manual_reset:
settings.PDU_CONTROLLER_TYPE = 'MANUAL_PDU_CONTROLLER'
settings.PDU_CONTROLLER_OPEN_PARAMS = {}
settings.PDU_CONTROLLER_REBOOT_PARAMS = {}
result = SimpleTestResult(
result_file, auto_reboot_args, keep_explorer, add_all_devices
)
for case in suite:
logger.info(case.__class__.__name__)
if dry_run:
return
suite.run(result)
return result
def main():
parser = argparse.ArgumentParser(
description='Thread harness test case runner'
)
parser.add_argument(
'--auto-reboot',
'-a',
action='store_true',
default=False,
help='restart system when harness service die',
)
parser.add_argument(
'names',
metavar='NAME',
type=str,
nargs='*',
default=None,
help='test case name, omit to test all',
)
parser.add_argument(
'--blacklist',
'-b',
metavar='BLACKLIST_FILE',
type=str,
help='file to list test cases to skip',
default=None,
)
parser.add_argument(
'--continue-from',
'-c',
type=str,
default=None,
help='first case to test',
)
parser.add_argument(
'--delete-history',
'-d',
action='store_true',
default=False,
help='clear history on startup',
)
parser.add_argument(
'--keep-explorer',
'-e',
action='store_true',
default=False,
help='do not restart explorer.exe at the end',
)
parser.add_argument(
'--name-greps',
'-g',
action='append',
default=None,
help='grep case by names',
)
parser.add_argument(
'--list-file',
'-i',
type=str,
default=None,
help='file to list cases names to test',
)
parser.add_argument(
'--skip',
'-k',
metavar='SKIP',
type=str,
help='type of results to skip.' 'e for error, f for fail, p for pass.',
default='',
)
parser.add_argument(
'--list-devices',
'-l',
action='store_true',
default=False,
help='list devices',
)
parser.add_argument(
'--manual-reset',
'-m',
action='store_true',
default=False,
help='reset devices manually',
)
parser.add_argument(
'--dry-run',
'-n',
action='store_true',
default=False,
help='just show what to run',
)
parser.add_argument(
'--result-file',
'-o',
type=str,
default=settings.OUTPUT_PATH + '\\result.json',
help='file to store and read current status',
)
parser.add_argument(
'--pattern',
'-p',
metavar='PATTERN',
type=str,
help='file name pattern, default to "*.py"',
default='*.py',
)
parser.add_argument(
'--rerun-fails',
'-r',
type=int,
default=0,
help='number of times to rerun failed test cases',
)
parser.add_argument(
'--add-all-devices',
'-t',
action='store_true',
default=False,
help='add all devices to the test bed',
)
parser.add_argument(
'--max-devices',
'-u',
type=int,
default=0,
help='max golden devices allowed',
)
args = vars(parser.parse_args())
if args['list_file']:
try:
names = [
line.strip('\n')
for line in open(args['list_file'], 'r').readlines()
if not line.startswith('#')
]
except BaseException:
logger.exception('Failed to open test case list file')
raise
else:
args['names'] = args['names'] + names
args.pop('list_file')
if args.pop('list_devices', False):
list_devices(**args)
return
rerun_fails = args.pop('rerun_fails')
result = discover(**args)
if rerun_fails > 0:
for i in range(rerun_fails):
failed_names = {
name
for name in result.result
if result.result[name]['passed'] is False
}
if not failed_names:
break
logger.info('Rerunning failed test cases')
logger.info('Rerun #{}:'.format(i + 1))
result = discover(
names=failed_names,
pattern=args['pattern'],
skip='',
result_file=args['result_file'],
auto_reboot=args['auto_reboot'],
keep_explorer=args['keep_explorer'],
add_all_devices=args['add_all_devices'],
)
if __name__ == '__main__':
main()