| #!/usr/bin/env fuchsia-vendored-python |
| # Copyright 2021 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| #### CATEGORY=Test |
| ### Query and calculate test stats. |
| |
| import argparse |
| import sys |
| import os |
| import subprocess |
| import json |
| from collections import defaultdict |
| import random |
| import time |
| |
| |
# Retrieve stats on test execution.
#
# Example: fx test-stats --print
# (Prints a categorization of all tests in the current build.)
#
# Example: fx test-stats --print --os fuchsia
# (Prints a categorization of Fuchsia tests in the current build.)
#
# Example: fx test-stats --run --os fuchsia
# (Run all Fuchsia tests on a device.)
#
# Example: fx test-stats --run --dimension device_type=AEMU
# (Run only tests with device_type AEMU.)
#
# Example: fx test-stats --run --type v2
# (Run only components v2 tests on a device.)
#
# Example: fx test-stats --run --type v2 --parallel 4
# (Run components v2 tests with up to 4 cases running concurrently.)
#
# Example: fx test-stats --run --concurrent 4
# (Run all tests with up to 4 test suites running concurrently.)
#
# Example: fx test-stats --run --timeout 5
# (Run all tests with a maximum allowed running time of 5 seconds.)
#
# Returns 0 once the selected action has completed. Invalid command lines
# are reported through argparse, which prints the usage and exits with
# status 2.
def main():
    # Pass the text as `description=`: the first positional parameter of
    # ArgumentParser is `prog`, which would put the whole sentence into the
    # usage line in place of the program name.
    parser = argparse.ArgumentParser(
        description="Retrieve stats on test execution"
    )

    # Exactly one of --print / --run has to be given.
    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument(
        "--print",
        action="store_true",
        help="Print a count of matching tests by their various properties",
    )
    action_group.add_argument(
        "--run", action="store_true", help="Run all of the matching tests once"
    )

    filter_group = parser.add_argument_group("filter")
    filter_group.add_argument(
        "--dimension",
        action="append",
        help="Include only tests matching given dimensions; can be specified multiple times. "
        "Example: --dimension os=Linux --dimension cpu=x64",
    )
    filter_group.add_argument(
        "--os",
        action="store",
        help="Include only tests matching the given os. Example: --os fuchsia",
    )
    filter_group.add_argument(
        "--type",
        action="store",
        help="Include only tests of this type. Example: --type v2",
    )

    run_group = parser.add_argument_group("run")
    run_group.add_argument(
        "-c",
        "--concurrent",
        action="store",
        type=int,
        default=1,
        help="Number of tests to run concurrently. Default is 1.",
    )
    run_group.add_argument(
        "-t",
        "--timeout",
        action="store",
        type=float,
        default=120,
        help="Timeout for tests, in seconds. Default is 120.",
    )
    run_group.add_argument(
        "-p",
        "--parallel",
        action="store",
        type=int,
        default=None,
        help="Number of test cases per v2 suite to run in parallel. Uses runner default if not set.",
    )
    run_group.add_argument(
        "--shuffle",
        action="store",
        type=int,
        default=None,
        help="Toggle shuffling input. If set to 0, the current timestamp is used as the seed. If set to a non-zero number, that value will be used as the shuffle seed",
    )
    run_group.add_argument(
        "--count",
        action="store",
        type=int,
        help="If set, only run this number of tests. The first tests in order following any shuffling will be used",
    )
    args = parser.parse_args()

    # Reject values that cannot work before the (slow) test query happens:
    # with fewer than one concurrent slot no test can ever be started, and a
    # negative count would slice tests off the end of the list instead of
    # limiting it.
    if args.run and args.concurrent < 1:
        parser.error("--concurrent must be at least 1")
    if args.count is not None and args.count < 0:
        parser.error("--count must not be negative")

    test_tuples = get_tests_tuples(args)

    if args.print:
        print_stats(test_tuples)
    else:
        # --print and --run form a required, mutually exclusive group, so
        # reaching this branch means --run was given.
        run_tests(
            test_tuples,
            timeout_seconds=args.timeout,
            max_running_tests=args.concurrent,
            parallel=args.parallel,
        )

    return 0
| |
| |
# Loads test definitions, parses them, and returns only those matching the argument filters.
#
# The definitions are the JSON printed by `fx test --printtests`. `args` is
# the parsed command line; its `dimension`, `os`, `type`, `shuffle` and
# `count` attributes are read.
#
# Returns a list of tuples (parsed_test, original_test_json)
def get_tests_tuples(args):
    json_data = json.loads(
        subprocess.check_output(["fx", "test", "--printtests"])
    )

    dimension_pairs = None
    if args.dimension:
        # Split on the first "=" only, so a value that itself contains "="
        # is kept whole. Entries without any "=" are ignored.
        dimension_pairs = {
            (name, value)
            for name, separator, value in (
                dimension.partition("=") for dimension in args.dimension
            )
            if separator
        }

    # Decide whether one (parsed_test, test_json) tuple passes every filter.
    def to_include(val):
        parsed_test, test_json = val
        if parsed_test is None:
            return False

        if dimension_pairs is not None:
            # Only include this test if one of its set of dimensions is
            # completely included in the specified dimensions.
            matches_environment = any(
                all(
                    pair in dimension_pairs
                    for pair in env["dimensions"].items()
                )
                for env in test_json["environments"]
            )
            if not matches_environment:
                return False

        if args.os is not None and test_json["test"]["os"] != args.os:
            return False

        if args.type is not None and parsed_test.test_type != args.type:
            return False

        return True

    candidates = ((parse_test(entry), entry) for entry in json_data)
    test_list = [candidate for candidate in candidates if to_include(candidate)]

    if args.shuffle is not None:
        # A seed of 0 means "choose one for me": seeding with None lets the
        # random module seed itself from the system time or OS randomness.
        random.seed(args.shuffle or None)
        random.shuffle(test_list)

    if args.count is not None:
        test_list = test_list[: args.count]

    return test_list
| |
| |
# Wraps a test parsed from `fx test --printtests`
class Test:
    # test_type:   label for the kind of test.
    # path:        path of the test, if it is identified by one.
    # package_url: package URL of the test, if it is identified by one.
    def __init__(self, test_type=None, path=None, package_url=None):
        self.package_url = package_url
        self.path = path
        self.test_type = test_type

    # Get a unique key for this test
    #
    # The package URL is preferred; the path is used when no (non-empty)
    # package URL is set.
    def key(self):
        if self.package_url:
            return self.package_url
        return self.path

    # Human-readable form: the type followed by the quoted key.
    def __str__(self):
        identifier = self.package_url or self.path
        return f'{self.test_type} test "{identifier}"'
| |
| |
# Parse a single JSON dict into a Test class.
#
# Only the "test" dict of the given entry is read. The result is always a
# Test, typed as follows:
#   "package_url" ending in ".cm"   -> "v2"
#   any other "package_url"         -> "unknown package"
#   "path" (and no "package_url")   -> "host"
#   neither key present             -> "unknown"
def parse_test(test):
    test_val = test["test"]

    if "package_url" in test_val:
        package_url = test_val["package_url"]
        if package_url.endswith(".cm"):
            return Test(test_type="v2", package_url=package_url)
        # Return a Test rather than a bare string: callers read `.test_type`
        # and `.key()` on every parsed result and would otherwise fail with
        # an AttributeError on such entries.
        return Test(test_type="unknown package", package_url=package_url)

    if "path" in test_val:
        return Test(test_type="host", path=test_val["path"])

    return Test(test_type="unknown")
| |
| |
# Implementation for --print mode.
#
# Takes (parsed_test, test_json) tuples and prints two tallies: how often
# each distinct set of environment dimensions occurs (one count per
# environment of every test), and how many tests there are of each type.
def print_stats(test_tuples):
    per_dimension_set = defaultdict(int)
    per_type = defaultdict(int)

    for parsed_test, test_json in test_tuples:
        for env in test_json["environments"]:
            # Sort the "name: value" labels so that the same dimensions
            # always produce the same key regardless of dict order.
            labels = sorted(
                f"{name}: {value}" for name, value in env["dimensions"].items()
            )
            joined = ", ".join(labels)
            per_dimension_set[f"({joined})"] += 1
        per_type[parsed_test.test_type] += 1

    print("Dimensions:")
    for label, total in per_dimension_set.items():
        print(f" {label}: {total}")

    print("Types:")
    for label, total in per_type.items():
        print(f" {label}: {total}")
| |
| |
# Wrapper for a test that is currently running on a device.
#
# This class keeps track of the start and end times of the test run.
class StartedTest:
    # Start executing the given command line and wrap it in a StartedTest.
    #
    # The start time is taken just before the process is spawned. The
    # child's stdout and stderr are both discarded.
    @staticmethod
    def create(command_line):
        started_at = time.time()
        process = subprocess.Popen(
            command_line,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return StartedTest(command_line, started_at, process)

    # command_line:    the command that was launched.
    # start_time:      time.time() value taken when the test was started.
    # running_process: process object for the launched command.
    def __init__(self, command_line, start_time, running_process):
        self._command_line = command_line
        self._start_time = start_time
        self._running_process = running_process
        # Stays None until poll_done() first observes that the process exited.
        self._end_time = None

    # Check if the test is done, updating internal state.
    #
    # This function must be called periodically to determine when the test is complete.
    #
    # Returns True if the test is done, False otherwise.
    def poll_done(self):
        # Completion was already recorded by an earlier call.
        if self._end_time is not None:
            return True

        if self._running_process.poll() is None:
            return False

        # First time the exit is seen: record when, then drain pipes.
        self._end_time = time.time()
        self._running_process.communicate()
        return True

    # Returns the return code for the test, or None if it is still running.
    def return_code(self):
        return self._running_process.returncode

    # Returns the original command line for the test.
    #
    # The command line is converted with str() before it is returned.
    def command_line(self):
        return str(self._command_line)

    # Returns the runtime for the test.
    #
    # If the test is currently running, this returns currently elapsed
    # time. Otherwise it returns the runtime from start to end of the
    # wrapped test.
    def runtime(self):
        if self._end_time is None:
            end = time.time()
        else:
            end = self._end_time
        return end - self._start_time

    # Force the test to terminate if it is currently running.
    def terminate(self):
        self._running_process.terminate()
| |
| |
# Start an individual Test.
#
# Only tests of type "v2" are started; they are run through
# `fx shell run-test-suite <package_url>`. A truthy `parallel` adds
# `--parallel <parallel>`, and a truthy `timeout` adds `--timeout` with the
# value truncated to an integer.
#
# Returns a StartedTest if the test could be started, and None otherwise.
def start_test(test_object, parallel=None, timeout=None):
    if test_object.test_type != "v2":
        return None

    command_line = ["fx", "shell", "run-test-suite", test_object.package_url]
    if parallel:
        command_line.extend(("--parallel", f"{parallel}"))
    if timeout:
        command_line.extend(("--timeout", f"{int(timeout)}"))

    return StartedTest.create(command_line)
| |
| |
# Implementation for --run mode.
#
# Starts every test in `to_run`, keeps polling the running ones, and prints
# a line per started / skipped / finished test plus a status summary each
# time the set of running tests changes and once more at the end.
#
#   to_run:            sized collection of (parsed_test, test_json) tuples.
#   timeout_seconds:   a running test is asked to terminate once its runtime
#                      exceeds this many seconds; None disables the check.
#                      The value is also forwarded to start_test.
#   max_running_tests: maximum number of tests running at the same time;
#                      None means one at a time.
#   parallel:          forwarded to start_test.
#
# Raises ValueError if max_running_tests is smaller than 1, because no test
# could ever be started and the loop below would never finish.
def run_tests(
    to_run, timeout_seconds=None, max_running_tests=None, parallel=None
):
    if max_running_tests is None:
        max_running_tests = 1
    elif max_running_tests < 1:
        raise ValueError(
            f"max_running_tests must be at least 1, got {max_running_tests}"
        )

    test_iter = iter(to_run)
    running_tests = []
    # Maps an outcome label ("SUCCESS", "code <n>", "SKIPPED") to its tests.
    outcomes = defaultdict(list)
    start_time = time.time()

    # Internal function to poll and update individual tests.
    #
    # Returns True while the test should stay in the running list.
    def process_running_test(test):
        if test.poll_done():
            code = test.return_code()
            mode = "SUCCESS" if code == 0 else f"code {code}"
            outcomes[mode].append(test)
            print(f"{mode} [{test.runtime()}]: {test.command_line()}")
            return False

        if timeout_seconds is not None and test.runtime() > timeout_seconds:
            # The test stays in the running list until a later poll sees
            # that the process has actually exited.
            test.terminate()

        return True

    # Internal function to print status of the run.
    def print_status():
        # .get() avoids creating an empty "SKIPPED" entry in the defaultdict.
        skipped = len(outcomes.get("SKIPPED", ()))
        total_done = sum(len(v) for v in outcomes.values()) - skipped
        print(
            f"Status: {total_done}/{len(to_run)} {len(running_tests)} running {skipped} skipped"
        )
        for k, v in sorted(outcomes.items()):
            if k == "SKIPPED":
                continue
            # Every non-skipped outcome holds finished tests, which all
            # provide runtime().
            runtimes = [finished.runtime() for finished in v]
            total_time = sum(runtimes)
            average_time = total_time / len(runtimes)
            print(
                f" {k:10s}: {len(runtimes):7d} {total_time:9.3f} {average_time:9.3f} avg."
            )
        overall_time = time.time() - start_time
        avg_overall_time = overall_time / total_done if total_done != 0 else 0
        print(
            f" TOTAL : {total_done:7d} {overall_time:9.3f} {avg_overall_time:9.3f} avg."
        )

    next_test = next(test_iter, None)

    while running_tests or next_test is not None:
        start_set = {t.command_line() for t in running_tests}

        # Drop the tests that have finished since the previous iteration.
        running_tests = [t for t in running_tests if process_running_test(t)]

        # Continually start tests so long as there is a test to be
        # started and we have not yet reached the maximum.
        while len(running_tests) < max_running_tests and next_test is not None:
            started_test = start_test(next_test[0], parallel, timeout_seconds)

            if started_test is not None:
                print(f"Started {next_test[0].key()}")
                running_tests.append(started_test)
            else:
                outcomes["SKIPPED"].append(next_test)
                print(f"Skipped {next_test[0].key()}")

            next_test = next(test_iter, None)

        # Print output if we have changed the set of running tests since the start of this iteration.
        if {t.command_line() for t in running_tests} != start_set:
            print_status()

        # Sleep for 10ms before continuing loop to reduce CPU load.
        time.sleep(0.01)

    print_status()
| |
| |
# Run main() only when this file is executed as a script; the value main()
# returns is used as the process exit status.
if __name__ == "__main__":
    sys.exit(main())