#!/usr/bin/env fuchsia-vendored-python
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#### CATEGORY=Test
### Query and calculate test stats.
import argparse
import sys
import os
import subprocess
import json
from collections import defaultdict
import random
import time
# Retrieve stats on test execution.
#
# Example: fx test-stats --print
# (Prints a categorization of all tests in the current build.)
#
# Example: fx test-stats --print --os fuchsia
# (Prints a categorization of Fuchsia tests in the current build.)
#
# Example: fx test-stats --run --os fuchsia
# (Run all Fuchsia tests on a device.)
#
# Example: fx test-stats --run --dimension device_type=AEMU
# (Run only tests with device_type AEMU.)
#
# Example: fx test-stats --run --type v2
# (Run only components v2 tests on a device.)
#
# Example: fx test-stats --run --type v2 --parallel 4
# (Run components v2 tests with up to 4 cases running concurrently.)
#
# Example: fx test-stats --run --concurrent 4
# (Run all tests with up to 4 test suites running concurrently.)
#
# Example: fx test-stats --run --timeout 5
# (Run all tests with a maximum allowed running time of 5 seconds.)
def main():
    """Parse the command line and execute the selected mode.

    Exactly one of --print or --run is required. The matching tests are
    loaded with get_tests_tuples() and then either summarized with
    print_stats() or executed with run_tests().

    Returns:
        0 once the selected mode has finished. Invalid arguments do not
        return: argparse reports them and exits the process with status 2.
    """
    # The first positional parameter of ArgumentParser is `prog`, so the
    # summary must be passed as `description` to keep the usage line intact.
    parser = argparse.ArgumentParser(
        description="Retrieve stats on test execution"
    )

    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument(
        "--print",
        action="store_true",
        help="Print a count of matching tests by their various properties",
    )
    action_group.add_argument(
        "--run", action="store_true", help="Run all of the matching tests once"
    )

    filter_group = parser.add_argument_group("filter")
    filter_group.add_argument(
        "--dimension",
        action="append",
        help="Include only tests matching given dimensions; can be specified multiple times. "
        "Example: --dimension os=Linux --dimension cpu=x64",
    )
    filter_group.add_argument(
        "--os",
        action="store",
        help="Include only tests matching the given os. Example: --os fuchsia",
    )
    filter_group.add_argument(
        "--type",
        action="store",
        help="Include only tests of this type. Example: --type v2",
    )

    run_group = parser.add_argument_group("run")
    run_group.add_argument(
        "-c",
        "--concurrent",
        action="store",
        type=int,
        default=1,
        help="Number of tests to run concurrently. Default is 1.",
    )
    run_group.add_argument(
        "-t",
        "--timeout",
        action="store",
        type=float,
        default=120,
        help="Timeout for tests, in seconds. Default is 120.",
    )
    run_group.add_argument(
        "-p",
        "--parallel",
        action="store",
        type=int,
        default=None,
        help="Number of test cases per v2 suite to run in parallel. Uses runner default if not set.",
    )
    run_group.add_argument(
        "--shuffle",
        action="store",
        type=int,
        default=None,
        help="Toggle shuffling input. If set to 0, the current timestamp is used as the seed. If set to a non-zero number, that value will be used as the shuffle seed",
    )
    run_group.add_argument(
        "--count",
        action="store",
        type=int,
        help="If set, only run this number of tests. The first tests in order following any shuffling will be used",
    )

    args = parser.parse_args()

    # Reject values that cannot work before querying the test list:
    # run_tests() never starts anything with fewer than one slot, and a
    # negative count would slice from the end of the list instead of
    # limiting it.
    if args.concurrent < 1:
        parser.error("--concurrent must be at least 1")
    if args.count is not None and args.count < 0:
        parser.error("--count must not be negative")

    test_tuples = get_tests_tuples(args)

    # The mode group is required and mutually exclusive, so when --print is
    # not set --run must be.
    if args.print:
        print_stats(test_tuples)
    else:
        run_tests(
            test_tuples,
            timeout_seconds=args.timeout,
            max_running_tests=args.concurrent,
            parallel=args.parallel,
        )
    return 0
# Loads test definitions, parses them, and returns only those matching the
# argument filters.
#
# `args` is the argparse namespace built by main(); this function reads its
# `dimension`, `os`, `type`, `shuffle` and `count` attributes. The test
# definitions come from the JSON printed by `fx test --printtests`.
#
# Returns a list of tuples (parsed_test, original_test_json). When
# `args.shuffle` is set the list is shuffled (0 means an unseeded shuffle, any
# other value is used as the seed); when `args.count` is set only that many
# leading entries are kept.
def get_tests_tuples(args):
    json_data = json.loads(
        subprocess.check_output(["fx", "test", "--printtests"])
    )

    dimension_pairs = None
    if args.dimension:
        dimension_pairs = set()
        for spec in args.dimension:
            # Split on the first "=" only, so a value that itself contains
            # "=" is kept whole instead of being truncated.
            name, separator, value = spec.partition("=")
            if separator:
                dimension_pairs.add((name, value))
            else:
                # Entries without "=" cannot match anything; say so rather
                # than dropping them silently.
                print(
                    f"Ignoring malformed --dimension {spec!r}; expected name=value",
                    file=sys.stderr,
                )

    def to_include(val):
        (parsed_test, test_json) = val
        if parsed_test is None:
            return False
        if dimension_pairs is not None:
            # Only include this test if one of its environments has all of
            # its dimensions present in the requested pairs.
            if not any(
                all(
                    item in dimension_pairs
                    for item in env["dimensions"].items()
                )
                for env in test_json["environments"]
            ):
                return False
        if args.os is not None and test_json["test"]["os"] != args.os:
            return False
        if args.type is not None and parsed_test.test_type != args.type:
            return False
        return True

    candidates = ((parse_test(entry), entry) for entry in json_data)
    test_list = [pair for pair in candidates if to_include(pair)]

    if args.shuffle is not None:
        # Use a private generator so the process-wide `random` state is left
        # untouched; a given non-zero seed still yields the same order.
        seed = None if args.shuffle == 0 else args.shuffle
        random.Random(seed).shuffle(test_list)

    if args.count is not None:
        test_list = test_list[: args.count]
    return test_list
# Lightweight record describing one entry of `fx test --printtests`.
#
# Stores the test's category (`test_type`) together with whichever of
# `path` / `package_url` was supplied for it.
class Test:
    def __init__(self, test_type=None, path=None, package_url=None):
        self.package_url = package_url
        self.path = path
        self.test_type = test_type

    # Identifier for this test: the package URL when one is set (truthy),
    # otherwise the path.
    def key(self):
        if self.package_url:
            return self.package_url
        return self.path

    # Human-readable form: the type followed by the quoted identifier.
    def __str__(self):
        return f'{self.test_type} test "{self.key()}"'
# Parse a single JSON dict (one entry of `fx test --printtests`) into a Test.
#
# Classification is based on the entry's "test" object:
#   - "package_url" ending in ".cm"  -> type "v2"
#   - any other "package_url"        -> type "unknown package"
#   - otherwise, a "path"            -> type "host"
#   - neither key present            -> type "unknown"
#
# Always returns a Test instance, so consumers can rely on `.test_type` and
# `.key()` for every entry (a bare string here would make them fail).
def parse_test(test):
    test_val = test["test"]
    if "package_url" in test_val:
        package_url = test_val["package_url"]
        test_type = "v2" if package_url.endswith(".cm") else "unknown package"
        return Test(test_type=test_type, package_url=package_url)
    if "path" in test_val:
        return Test(test_type="host", path=test_val["path"])
    return Test(test_type="unknown")
# Implementation for --print mode.
#
# Tallies the given (parsed_test, test_json) tuples two ways and prints both
# tables: once per environment, keyed by its sorted "name: value" dimension
# list, and once per test, keyed by the parsed test type.
def print_stats(test_tuples):
    per_dimension = defaultdict(int)
    per_type = defaultdict(int)

    for parsed, raw in test_tuples:
        for environment in raw["environments"]:
            # Sort so that the same dimensions always produce the same label
            # regardless of their order in the JSON.
            labels = sorted(
                f"{dim}: {val}"
                for dim, val in environment["dimensions"].items()
            )
            per_dimension[f"({', '.join(labels)})"] += 1
        per_type[parsed.test_type] += 1

    for heading, table in (
        ("Dimensions:", per_dimension),
        ("Types:", per_type),
    ):
        print(heading)
        for label, total in table.items():
            print(f" {label}: {total}")
# Handle for a test process that has been launched.
#
# Records when the process was started and, once it is observed to have
# exited, when that was noticed, so that its run time can be reported.
class StartedTest:
    # Launch `command_line` as a subprocess (stdout and stderr discarded) and
    # return a StartedTest tracking it.
    @staticmethod
    def create(command_line):
        started_at = time.time()
        process = subprocess.Popen(
            command_line,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return StartedTest(command_line, started_at, process)

    def __init__(self, command_line, start_time, running_process):
        self._command_line = command_line
        self._start_time = start_time
        self._running_process = running_process
        # Stays None until poll_done() sees that the process has exited.
        self._end_time = None

    # Report whether the test has finished, recording the end time the first
    # time completion is observed.
    #
    # Completion is only noticed here, so callers need to invoke this
    # periodically.
    #
    # Returns True if the test is done, False otherwise.
    def poll_done(self):
        if self._end_time is None:
            if self._running_process.poll() is None:
                return False
            self._end_time = time.time()
            self._running_process.communicate()  # drain pipes
        return True

    # Exit status of the process, or None while it is still running.
    def return_code(self):
        return self._running_process.returncode

    # The command line this test was started with, as a string.
    def command_line(self):
        return str(self._command_line)

    # Seconds this test has been running.
    #
    # Measured up to the recorded end time once the test is known to be done,
    # and up to the current time before that.
    def runtime(self):
        finished_at = self._end_time
        if finished_at is None:
            finished_at = time.time()
        return finished_at - self._start_time

    # Ask the underlying process to terminate.
    def terminate(self):
        self._running_process.terminate()
# Launch a single Test.
#
# Only tests whose type is "v2" are launched, via `fx shell run-test-suite`
# on the test's package URL. `parallel` and `timeout` are forwarded as
# --parallel / --timeout when truthy (the timeout truncated to an integer).
#
# Returns a StartedTest if the test could be started, and None otherwise.
def start_test(test_object, parallel=None, timeout=None):
    if test_object.test_type != "v2":
        return None

    command_line = ["fx", "shell", "run-test-suite", test_object.package_url]
    if parallel:
        command_line += ["--parallel", f"{parallel}"]
    if timeout:
        command_line += ["--timeout", f"{int(timeout)}"]
    return StartedTest.create(command_line)
# Implementation for --run mode.
#
# Starts every entry of `to_run` (a sized collection of
# (parsed_test, test_json) tuples) via start_test(), polls the started tests
# until all have finished, and prints progress plus a summary grouped by
# outcome. Entries that start_test() declines to start are counted as skipped.
#
#   timeout_seconds:   a test running longer than this is asked to terminate;
#                      also forwarded to start_test(). None disables the check.
#   max_running_tests: upper bound on concurrently running tests; None means
#                      no bound.
#   parallel:          forwarded to start_test().
#
# Raises ValueError if max_running_tests is given but less than 1, since no
# test could ever be started and the loop below would never finish.
def run_tests(
    to_run, timeout_seconds=None, max_running_tests=None, parallel=None
):
    if max_running_tests is not None and max_running_tests < 1:
        raise ValueError(
            f"max_running_tests must be at least 1, got {max_running_tests}"
        )

    test_iter = iter(to_run)
    running_tests = []
    outcomes = defaultdict(list)
    start_time = time.time()

    # Internal function to poll and update individual tests.
    #
    # Returns True while the test should stay in the running list.
    def process_running_test(test):
        if test.poll_done():
            code = test.return_code()
            mode = "SUCCESS" if code == 0 else f"code {code}"
            outcomes[mode].append(test)
            print(f"{mode} [{test.runtime()}]: {test.command_line()}")
            return False
        if timeout_seconds is not None and test.runtime() > timeout_seconds:
            # The test stays in the running list; it is recorded by the
            # branch above once the process has actually exited.
            test.terminate()
        return True

    # Internal function to print status of the run.
    def print_status():
        # .get() avoids creating an empty "SKIPPED" bucket in the defaultdict.
        skipped = len(outcomes.get("SKIPPED", ()))
        total_done = sum(len(v) for v in outcomes.values()) - skipped
        print(
            f"Status: {total_done}/{len(to_run)} {len(running_tests)} running {skipped} skipped"
        )
        for k, v in sorted(outcomes.items()):
            if k == "SKIPPED":
                continue
            runtimes = [x.runtime() if hasattr(x, "runtime") else 0 for x in v]
            total_time = sum(runtimes)
            average_time = total_time / len(runtimes)
            print(
                f" {k:10s}: {len(runtimes):7d} {total_time:9.3f} {average_time:9.3f} avg."
            )
        overall_time = time.time() - start_time
        avg_overall_time = overall_time / total_done if total_done != 0 else 0
        print(
            f" TOTAL : {total_done:7d} {overall_time:9.3f} {avg_overall_time:9.3f} avg."
        )

    next_test = next(test_iter, None)
    while running_tests or next_test is not None:
        start_set = {t.command_line() for t in running_tests}
        running_tests = [t for t in running_tests if process_running_test(t)]

        # Continually start tests so long as there is a test to be
        # started and we have not yet reached the maximum.
        while next_test is not None and (
            max_running_tests is None
            or len(running_tests) < max_running_tests
        ):
            started_test = start_test(next_test[0], parallel, timeout_seconds)
            if started_test is not None:
                print(f"Started {next_test[0].key()}")
                running_tests.append(started_test)
            else:
                outcomes["SKIPPED"].append(next_test)
                print(f"Skipped {next_test[0].key()}")
            next_test = next(test_iter, None)

        # Print output if we have changed the set of running tests since the
        # start of this iteration.
        if {t.command_line() for t in running_tests} != start_set:
            print_status()

        # Sleep for 10ms before continuing loop to reduce CPU load.
        time.sleep(0.01)

    print_status()
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())