#!/usr/bin/env python3.8
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#### CATEGORY=Test
### Query and calculate test stats.
import argparse
import sys
import os
import subprocess
import json
from collections import defaultdict
import random
import time
# Retrieve stats on test execution.
#
# Example: fx test-stats --print
# (Prints a categorization of all tests in the current build.)
#
# Example: fx test-stats --print --os fuchsia
# (Prints a categorization of Fuchsia tests in the current build.)
#
# Example: fx test-stats --run --os fuchsia
# (Run all Fuchsia tests on a device.)
#
# Example: fx test-stats --run --dimension device_type=AEMU
# (Run only tests with device_type AEMU.)
#
# Example: fx test-stats --run --type v2
# (Run only components v2 tests on a device.)
#
# Example: fx test-stats --run --type v2 --parallel 4
# (Run components v2 tests with up to 4 cases running concurrently.)
#
# Example: fx test-stats --run --concurrent 4
# (Run all tests with up to 4 test suites running concurrently.)
#
# Example: fx test-stats --run --timeout 5
# (Run all tests with a maximum allowed running time of 5 seconds.)
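#
# Example: fx test-stats --run --shuffle 0 --count 10
# (Run 10 tests chosen in a randomly shuffled order.)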
def main():
    parser = argparse.ArgumentParser(
        description='Retrieve stats on test execution')
action_group = parser.add_mutually_exclusive_group(required=True)
action_group.add_argument(
'--print',
action='store_true',
help='Print a count of matching tests by their various properties')
action_group.add_argument(
'--run', action='store_true', help='Run all of the matching tests once')
filter_group = parser.add_argument_group('filter')
filter_group.add_argument(
'--dimension',
action='append',
help='Include only tests matching given dimensions; can be specified multiple times. '
'Example: --dimension os=Linux --dimension cpu=x64')
filter_group.add_argument(
'--os',
action='store',
help='Include only tests matching the given os. Example: --os fuchsia')
filter_group.add_argument(
'--type',
action='store',
help='Include only tests of this type. Example: --type v2')
run_group = parser.add_argument_group('run')
run_group.add_argument(
'-c',
'--concurrent',
action='store',
type=int,
default=1,
help='Number of tests to run concurrently. Default is 1.',
)
run_group.add_argument(
'-t',
'--timeout',
action='store',
type=float,
default=120,
help='Timeout for tests, in seconds. Default is 120.',
)
run_group.add_argument(
'-p',
'--parallel',
action='store',
type=int,
default=None,
help='Number of test cases per v2 suite to run in parallel. Uses runner default if not set.',
)
run_group.add_argument(
'--shuffle',
action='store',
type=int,
default=None,
        help='Shuffle the test order. If set to 0, the current time is used as the seed; if set to a non-zero number, that value is used as the shuffle seed.',
)
run_group.add_argument(
'--count',
action='store',
type=int,
        help='If set, run only this number of tests, taking the first tests in order after any shuffling.'
)
args = parser.parse_args()
test_tuples = get_tests_tuples(args)
if args.print:
print_stats(test_tuples)
elif args.run:
run_tests(
test_tuples,
timeout_seconds=args.timeout,
max_running_tests=args.concurrent,
parallel=args.parallel)
else:
print('Unknown mode.')
parser.print_help()
return 1
return 0
# Loads test definitions, parses them, and returns only those matching the argument filters.
#
# Returns a list of tuples (parsed_test, original_test_json)
def get_tests_tuples(args):
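    # Ask `fx test` for the full test list as JSON. Each entry is expected to
    # contain a 'test' dict (with 'os' and either 'package_url' or 'path') and
    # an 'environments' list of dimension maps, which is the shape the filters
    # below rely on.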
json_data = json.loads(
subprocess.check_output(['fx', 'test', '--printtests']))
dimension_pairs = None
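    # Parse each '--dimension key=value' argument into a (key, value) pair,
    # e.g. ['os=Linux', 'cpu=x64'] -> {('os', 'Linux'), ('cpu', 'x64')}.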
if args.dimension:
dimension_pairs = set([
(s[0], s[1])
for s in map(lambda x: x.split('='), args.dimension)
if len(s) > 1
])
def to_include(val):
(parsed_test, test_json) = val
if parsed_test is None:
return False
ret = True
if dimension_pairs is not None:
            # Only include this test if, for at least one of its environments,
            # every dimension is included in the specified dimensions.
if not any(
all(
map(lambda v: v in dimension_pairs,
e['dimensions'].items()))
for e in test_json['environments']):
ret = False
if args.os is not None:
if test_json['test']['os'] != args.os:
ret = False
if args.type is not None:
if parsed_test.test_type != args.type:
ret = False
return ret
test_list = list(
filter(to_include, map(lambda x: (parse_test(x), x), json_data)))
if args.shuffle is not None:
shuffle = args.shuffle
if shuffle == 0:
shuffle = None
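        # random.seed(None) seeds from a system source (time or urandom), so
        # --shuffle 0 produces a different order on each run.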
random.seed(shuffle)
random.shuffle(test_list)
if args.count is not None:
test_list = test_list[:args.count]
return test_list
# Wraps a test parsed from `fx test --printtests`
class Test:
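    # test_type is one of 'v1', 'v2', 'host', or 'unknown' (see parse_test);
    # package_url is set for device tests and path for host tests.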
def __init__(self, test_type=None, path=None, package_url=None):
self.test_type = test_type
self.path = path
self.package_url = package_url
# Get a unique key for this test
def key(self):
return self.package_url or self.path
def __str__(self):
return '{} test "{}"'.format(self.test_type, self.package_url or
self.path)
# Parse a single JSON dict into a Test class.
def parse_test(test):
test_val = test['test']
if 'package_url' in test_val:
        url = test_val['package_url']
        if url.endswith('.cmx'):
            return Test(test_type='v1', package_url=url)
        elif url.endswith('.cm'):
            return Test(test_type='v2', package_url=url)
        else:
            return Test(test_type='unknown', package_url=url)
elif 'path' in test_val:
return Test(test_type='host', path=test_val['path'])
else:
return Test(test_type='unknown')
# Implementation for --print mode.
def print_stats(test_tuples):
dimension_counts = defaultdict(int)
type_counts = defaultdict(int)
for (parsed_test, test_json) in test_tuples:
for env in test_json['environments']:
lst = []
for name, value in env['dimensions'].items():
lst.append('{}: {}'.format(name, value))
lst.sort()
dimension_counts['({})'.format(', '.join(lst))] += 1
type_counts[parsed_test.test_type] += 1
print('Dimensions:')
for name, count in dimension_counts.items():
print(' {}: {}'.format(name, count))
print('Types:')
for name, count in type_counts.items():
print(' {}: {}'.format(name, count))
# Wrapper for a test that is currently running on a device.
#
# This class keeps track of the start and end times of the test run.
class StartedTest:
# Start executing the given command line and wrap it in a StartedTest.
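    # stdout and stderr are discarded; only the exit code and wall-clock
    # runtime are recorded.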
@staticmethod
def create(command_line):
return StartedTest(
command_line, time.time(),
subprocess.Popen(
command_line,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL))
def __init__(self, command_line, start_time, running_process):
self._command_line = command_line
self._start_time = start_time
self._running_process = running_process
self._end_time = None
# Check if the test is done, updating internal state.
#
# This function must be called periodically to determine when the test is complete.
#
# Returns True if the test is done, False otherwise.
def poll_done(self):
if self._end_time is not None:
return True
if self._running_process.poll() is not None:
self._end_time = time.time()
self._running_process.communicate() # drain pipes
return True
return False
# Returns the return code for the test, or None if it is still running.
def return_code(self):
return self._running_process.returncode
# Returns the original command line for the test.
def command_line(self):
return str(self._command_line)
# Returns the runtime for the test.
#
# If the test is currently running, this returns currently elapsed
# time. Otherwise it returns the runtime from start to end of the
# wrapped test.
def runtime(self):
return (self._end_time - self._start_time if self._end_time is not None
else time.time() - self._start_time)
# Force the test to terminate if it is currently running.
def terminate(self):
self._running_process.terminate()
# Start an individual Test.
#
# Returns a StartedTest if the test could be started, and None otherwise.
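# v1 tests are launched via `fx shell run-test-component` and v2 tests via
# `fx shell run-test-suite`; other test types (e.g. host) are not started.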
def start_test(test_object, parallel=None, timeout=None):
if test_object.test_type == 'v1':
command_line = ['fx', 'shell', 'run-test-component']
if timeout:
command_line.append(f'--timeout={int(timeout)}')
command_line.append(test_object.package_url)
return StartedTest.create(command_line)
if test_object.test_type == 'v2':
command_line = [
'fx', 'shell', 'run-test-suite', test_object.package_url
]
if parallel:
command_line.append('--parallel')
command_line.append(f'{parallel}')
if timeout:
command_line.append('--timeout')
command_line.append(f'{int(timeout)}')
return StartedTest.create(command_line)
else:
return None
# Implementation for --run mode.
def run_tests(to_run,
timeout_seconds=None,
max_running_tests=None,
parallel=None):
test_iter = iter(to_run)
running_tests = []
outcomes = defaultdict(list)
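    # Maps an outcome label ('SUCCESS', 'code N', or 'SKIPPED') to the list of
    # tests that ended with that outcome.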
start_time = time.time()
# Internal function to poll and update individual tests.
def process_running_test(test):
if test.poll_done():
mode = ''
if test.return_code() == 0:
mode = 'SUCCESS'
else:
mode = f'code {test.return_code()}'
outcomes[mode].append(test)
print(f'{mode} [{test.runtime()}]: {test.command_line()}')
return False
        elif test.runtime() > timeout_seconds:
            test.terminate()
        # The test is still running (or was just told to terminate); keep it in
        # the running list so a later poll records its outcome.
        return True
# Internal function to print status of the run.
def print_status():
skipped = len(outcomes['SKIPPED']) if 'SKIPPED' in outcomes else 0
total_done = sum([len(v) for v in outcomes.values()]) - skipped
print(
            f'Status: {total_done}/{len(to_run)} done, {len(running_tests)} running, {skipped} skipped'
)
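        # Per-outcome summary: count, total runtime, and average runtime, in
        # seconds.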
for (k, v) in sorted(outcomes.items()):
if k == 'SKIPPED':
continue
runtimes = list(
map(lambda x: x.runtime() if hasattr(x, 'runtime') else 0, v))
total_time = sum(runtimes)
average_time = total_time / len(runtimes)
print(
f' {k:10s}: {len(runtimes):7d} {total_time:9.3f} {average_time:9.3f} avg.'
)
overall_time = time.time() - start_time
avg_overall_time = overall_time / total_done if total_done != 0 else 0
print(
f' TOTAL : {total_done:7d} {overall_time:9.3f} {avg_overall_time:9.3f} avg.'
)
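    # Main scheduling loop: poll running tests, keep up to max_running_tests
    # suites in flight, and reprint status whenever the running set changes.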
next_test = next(test_iter, None)
while running_tests or next_test is not None:
start_set = {t.command_line() for t in running_tests}
running_tests = list(filter(process_running_test, running_tests))
        # Continually start tests as long as there is a test to be started and
        # we have not yet reached the concurrency limit.
while len(running_tests) < max_running_tests and next_test is not None:
started_test = start_test(next_test[0], parallel, timeout_seconds)
if started_test is not None:
print(f'Started {next_test[0].key()}')
running_tests.append(started_test)
else:
outcomes['SKIPPED'].append(next_test)
print(f'Skipped {next_test[0].key()}')
next_test = next(test_iter, None)
# Print output if we have changed the set of running tests since the start of this iteration.
if {t.command_line() for t in running_tests} != start_set:
print_status()
        # Sleep for 10ms before continuing the loop to reduce CPU load.
time.sleep(.01)
print_status()
if __name__ == '__main__':
sys.exit(main())