# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import argparse
import asyncio
import atexit
from dataclasses import dataclass
from dataclasses import field
import functools
import gzip
import json
import os
import re
import subprocess
import sys
import typing
import args
import config
import console
import dataparse
import debugger
import environment
import event
import execution
import log
import selection
import selection_types
import statusinfo
import termout
import test_list_file
import tests_json_file
import util.command as command
import util.signals
def main() -> None:
# Main entrypoint.
# Set up the event loop to catch termination signals (i.e. Ctrl+C), and
# cancel the main task when they are received.
try:
config_file = config.load_config()
except argparse.ArgumentError as e:
print(f"Failed to parse config: {e.message}")
sys.exit(1)
try:
real_flags = args.parse_args(defaults=config_file.default_flags)
except argparse.ArgumentError as e:
print(f"Failed to parse command line: {e.message}")
sys.exit(1)
# Special utility mode handling
if real_flags.print_logs:
sys.exit(do_print_logs(real_flags))
# No special modes, proceed with async execution.
fut = asyncio.ensure_future(
async_main_wrapper(real_flags, config_file=config_file)
)
util.signals.register_on_terminate_signal(fut.cancel) # type: ignore[arg-type]
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(fut)
sys.exit(fut.result())
except asyncio.CancelledError:
print("\n\nReceived interrupt, exiting")
sys.exit(1)
async def async_main_wrapper(
flags: args.Flags,
recorder: event.EventRecorder | None = None,
config_file: config.ConfigFile | None = None,
) -> int:
"""Wrapper for the main logic of fx test.
This wrapper creates a list containing tasks that must be
awaited before the program exits. The main logic may add tasks to this
list during execution, and then return the intended status code.
Args:
flags (args.Flags): Flags to pass into the main function.
recorder (event.EventRecorder | None, optional): If set,
use this event recorder. Used for testing.
config_file (config.ConfigFile, optional): If set, record
that this configuration was loaded to set default flags.
Returns:
The return code of the program.
"""
tasks: list[asyncio.Task[None]] = []
if recorder is None:
recorder = event.EventRecorder()
ret = await async_main(flags, tasks, recorder, config_file)
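    # Give outstanding tasks a grace period to drain; shield the drain task so
    # the timeout cannot cancel it, then keep waiting after warning the user.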
to_wait = asyncio.Task(asyncio.wait(tasks), name="Drain tasks")
timeout_seconds = 5
try:
await asyncio.wait_for(asyncio.shield(to_wait), timeout=timeout_seconds)
except asyncio.TimeoutError:
print(
f"\n\nWaiting for tasks to complete for longer than {timeout_seconds} seconds...\n",
file=sys.stderr,
)
await to_wait
return ret
def do_print_logs(flags: args.Flags) -> int:
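    """Print the most recent log file to the console in a readable format.

    Returns:
        int: 0 on success, 1 if the log could not be read.
    """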
env = environment.ExecutionEnvironment.initialize_from_args(
flags, create_log_file=False
)
try:
log_path = env.get_most_recent_log()
with gzip.open(log_path, "rt") as f:
print(f"{log_path}:\n")
log.pretty_print(f)
except environment.EnvironmentError as e:
print(f"Failed to read log: {e}", file=sys.stderr)
return 1
except gzip.BadGzipFile as e:
print(f"File does not appear to be a gzip file. ({e})", file=sys.stderr)
return 1
return 0
async def async_main(
flags: args.Flags,
tasks: list[asyncio.Task[None]],
recorder: event.EventRecorder,
config_file: config.ConfigFile | None = None,
) -> int:
"""Main logic of fx test.
Args:
flags (args.Flags): Flags controlling the behavior of fx test.
        tasks (list[asyncio.Task]): List to which tasks that must be awaited before termination are added.
        recorder (event.EventRecorder): The recorder for events.
config_file (config.ConfigFile, optional): The loaded config, if one was set.
Returns:
The return code of the program.
"""
do_status_output_signal: asyncio.Event = asyncio.Event()
do_output_to_stdout = flags.logpath == args.LOG_TO_STDOUT_OPTION
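    # Unless output is logged directly to stdout, run a console printer task
    # that displays recorded events to the user.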
if not do_output_to_stdout:
tasks.append(
asyncio.create_task(
console.console_printer(
recorder, flags, do_status_output_signal
)
)
)
# Initialize event recording.
recorder.emit_init()
info_first_line = "You are using the new fx test, which is currently ready for general use ✅"
info_block = """See details here: https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/scripts/fxtest/rewrite
To go back to the old fx test, use `fx --enable=legacy_fxtest test`, and please file a bug under b/293917801.
"""
recorder.emit_info_message(info_first_line)
recorder.emit_instruction_message(info_block)
# Try to parse the flags. Emit one event before and another
# after flag post processing.
try:
if config_file is not None and config_file.is_loaded():
recorder.emit_load_config(
config_file.path or "UNKNOWN PATH",
config_file.default_flags.__dict__,
config_file.command_line,
)
recorder.emit_parse_flags(flags.__dict__)
flags.validate()
recorder.emit_parse_flags(flags.__dict__)
except args.FlagError as e:
recorder.emit_end(f"Flags are invalid: {e}")
return 1
# Initialize status printing at this point, if desired.
if flags.status and not do_output_to_stdout:
do_status_output_signal.set()
termout.init()
# Process and initialize the incoming environment.
exec_env: environment.ExecutionEnvironment
try:
exec_env = environment.ExecutionEnvironment.initialize_from_args(flags)
except environment.EnvironmentError as e:
recorder.emit_end(
f"Failed to initialize environment: {e}\nDid you run fx set?"
)
return 1
recorder.emit_process_env(exec_env.__dict__)
# Configure file logging based on flags.
if flags.log and exec_env.log_file:
output_file: typing.TextIO
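        # Log files are written gzip-compressed unless logging directly to stdout.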
if exec_env.log_to_stdout():
output_file = sys.stdout
else:
output_file = gzip.open(exec_env.log_file, "wt")
tasks.append(asyncio.create_task(log.writer(recorder, output_file)))
recorder.emit_instruction_message(
f"Logging all output to: {exec_env.log_file}"
)
recorder.emit_instruction_message(
"Use the `--logpath` argument to specify a log location or `--no-log` to disable\n"
)
# For convenience, display the log output path when the program exits.
# Since the async loop may already be exited at that point, directly
# print to the console.
if not exec_env.log_to_stdout():
atexit.register(
print,
statusinfo.dim(
f"Output was logged to: {os.path.relpath(exec_env.log_file, os.getcwd())}",
style=flags.style,
),
)
if flags.has_debugger():
recorder.emit_warning_message(
"🛑 Debugger integration is currently experimental, follow https://fxbug.dev/319320287 for updates 🛑"
)
# Print a message for users who want to know how to see all test output.
if not flags.output:
recorder.emit_instruction_message(
f"To show all output, specify the `-o/--output` flag."
)
# Load the list of tests to execute.
try:
tests = await load_test_list(recorder, exec_env)
except Exception as e:
recorder.emit_end(f"Failed to load tests: {e}")
return 1
# Use flags to select which tests to run.
try:
mode = selection.SelectionMode.ANY
if flags.host:
mode = selection.SelectionMode.HOST
elif flags.device:
mode = selection.SelectionMode.DEVICE
elif flags.only_e2e:
mode = selection.SelectionMode.E2E
selections = await selection.select_tests(
tests,
flags.selection,
mode,
flags.fuzzy,
recorder=recorder,
exact_match=flags.exact,
)
# Mutate the selections based on the command line flags.
selections.apply_flags(flags)
if len(selections.selected_but_not_run) != 0:
total_count = len(selections.selected) + len(
selections.selected_but_not_run
)
recorder.emit_info_message(
f"Selected {total_count} tests, but only running {len(selections.selected)} due to flags."
)
recorder.emit_test_selections(selections)
except selection.SelectionError as e:
recorder.emit_end(f"Selection is invalid: {e}")
return 1
# Check that the selected tests are valid.
try:
await validate_test_selections(selections, recorder, flags)
except SelectionValidationError as e:
recorder.emit_end(str(e))
return 1
# Don't actually run any tests if --dry was specified, instead just
# print which tests were selected and exit.
if flags.dry:
recorder.emit_info_message("Selected the following tests:")
for s in selections.selected:
recorder.emit_info_message(f" {s.info.name}")
recorder.emit_instruction_message(
"\nWill not run any tests, --dry specified"
)
recorder.emit_end()
return 0
# If enabled, try to build and update the selected tests.
if flags.build and not await do_build(selections, recorder, exec_env):
recorder.emit_end("Failed to build.")
return 1
if flags.updateifinbase and has_tests_in_base(
selections, recorder, exec_env
):
status_suffix = (
"\nStatus output suspended." if termout.is_init() else ""
)
recorder.emit_info_message(f"\nBuilding update package.{status_suffix}")
recorder.emit_instruction_message(
"Use --no-updateifinbase to skip updating base packages."
)
build_return_code = await run_build_with_suspended_output(
["build/images/updates"],
show_output=not exec_env.log_to_stdout(),
)
if build_return_code != 0:
recorder.emit_end(
f"Failed to build update package ({build_return_code})"
)
return 1
recorder.emit_info_message("\nRunning an OTA before executing tests")
ota_result = await execution.run_command(
"fx", "ota", "--no-build", recorder=recorder, print_verbatim=True
)
if ota_result is None or ota_result.return_code != 0:
recorder.emit_warning_message(
"OTA failed, attempting to run tests anyway"
)
# Don't actually run tests if --list was specified, instead gather the
# list of test cases for each test and output to the user.
if flags.list:
recorder.emit_info_message("Enumerating all test cases...")
recorder.emit_instruction_message(
"Will not run any tests, --list specified"
)
await enumerate_test_cases(selections, recorder, flags, exec_env)
recorder.emit_end()
return 0
# Finally, run all selected tests.
if not await run_all_tests(selections, recorder, flags, exec_env):
recorder.emit_end("Test failures reported")
return 1
recorder.emit_end()
return 0
async def load_test_list(
recorder: event.EventRecorder, exec_env: environment.ExecutionEnvironment
) -> list[test_list_file.Test]:
"""Load the input files listing tests and parse them into a list of Tests.
Args:
recorder (event.EventRecorder): Recorder for events.
exec_env (environment.ExecutionEnvironment): Environment we run in.
Raises:
TestFileError: If the tests.json file is invalid.
DataParseError: If data could not be deserialized from JSON input.
JSONDecodeError: If a JSON file fails to parse.
IOError: If a file fails to open.
ValueError: If the tests.json and test-list.json files are
incompatible for some reason.
Returns:
list[test_list_file.Test]: List of available tests to execute.
"""
# Load the tests.json file.
try:
parse_id = recorder.emit_start_file_parsing(
exec_env.relative_to_root(exec_env.test_json_file),
exec_env.test_json_file,
)
test_file_entries: list[
tests_json_file.TestEntry
] = tests_json_file.TestEntry.from_file(exec_env.test_json_file)
recorder.emit_test_file_loaded(
test_file_entries, exec_env.test_json_file
)
recorder.emit_end(id=parse_id)
except (tests_json_file.TestFileError, json.JSONDecodeError, IOError) as e:
recorder.emit_end("Failed to parse: " + str(e), id=parse_id)
raise e
# Load the test-list.json file.
try:
parse_id = recorder.emit_start_file_parsing(
exec_env.relative_to_root(exec_env.test_list_file),
exec_env.test_list_file,
)
test_list_entries = test_list_file.TestListFile.entries_from_file(
exec_env.test_list_file
)
recorder.emit_end(id=parse_id)
except (dataparse.DataParseError, json.JSONDecodeError, IOError) as e:
recorder.emit_end("Failed to parse: " + str(e), id=parse_id)
raise e
# Join the contents of the two files and return it.
try:
tests = test_list_file.Test.join_test_descriptions(
test_file_entries, test_list_entries
)
return tests
except ValueError as e:
recorder.emit_end(
f"tests.json and test-list.json are inconsistent: {e}"
)
raise e
class SelectionValidationError(Exception):
"""A problem occurred when validating test selections.
The message contains a human-readable explanation of the problem.
"""
async def validate_test_selections(
selections: selection_types.TestSelections,
recorder: event.EventRecorder,
flags: args.Flags,
) -> None:
"""Validate the selections matched from tests.json.
Args:
        selections (selection_types.TestSelections): The selection output to validate.
        recorder (event.EventRecorder): An event recorder to write useful messages to.
        flags (args.Flags): Incoming command line flags, used to control test suggestions.
Raises:
SelectionValidationError: If the selections are invalid.
"""
missing_groups: list[selection_types.MatchGroup] = []
for group, matches in selections.group_matches:
if not matches:
missing_groups.append(group)
if missing_groups:
recorder.emit_warning_message(
"\nCould not find any tests to run for at least one set of arguments you provided."
)
recorder.emit_info_message(
"\nMake sure this test is transitively in your 'fx set' arguments."
)
recorder.emit_info_message(
"See https://fuchsia.dev/fuchsia-src/development/testing/faq for more information."
)
if flags.show_suggestions:
def suggestion_args(
arg: str, threshold: float | None = None
) -> list[str]:
name = "fx"
suggestion_args = [
"search-tests",
f"--max-results={flags.suggestion_count}",
"--color" if flags.style else "--no-color",
arg,
]
if threshold is not None:
suggestion_args += ["--threshold", str(threshold)]
return [name] + suggestion_args
arg_threshold_pairs = []
for group in missing_groups:
# Create pairs of a search string and threshold.
# Thresholds depend on the number of arguments joined.
# We have only a single search field, so we concatenate
# the names into one big group. To correct for lower
# match thresholds due to this union, we adjust the
# threshold when there is more than a single value to
# match against.
all_args = group.names.union(group.components).union(
group.packages
)
arg_threshold_pairs.append(
(
",".join(list(all_args)),
(
max(0.4, 0.9 - len(all_args) * 0.05)
if len(all_args) > 1
else None
),
),
)
outputs = await run_commands_in_parallel(
[
suggestion_args(arg_pair[0], arg_pair[1])
for arg_pair in arg_threshold_pairs
],
"Find suggestions",
recorder=recorder,
maximum_parallel=10,
)
if any([val is None for val in outputs]):
return
for group, output in zip(missing_groups, outputs):
assert output is not None # Checked above
recorder.emit_info_message(
f"\nFor `{group}`, did you mean any of the following?\n"
)
recorder.emit_verbatim_message(output.stdout)
if missing_groups:
raise SelectionValidationError(
"No tests found for the following selections:\n "
+ "\n ".join([str(m) for m in missing_groups])
)
async def do_build(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
exec_env: environment.ExecutionEnvironment,
) -> bool:
"""Attempt to build the selected tests.
Args:
        tests (selection_types.TestSelections): Tests to attempt to build.
recorder (event.EventRecorder): Recorder for events.
exec_env (environment.ExecutionEnvironment): Incoming execution environment.
Returns:
bool: True only if the tests were built and published, False otherwise.
"""
    # Labels start with '//' and end with a toolchain suffix that begins
    # with '('. Both the toolchain suffix and the leading '//' must be
    # omitted when building device tests through fx build.
label_to_rule = re.compile(r"//([^()]+)\(")
build_command_line = []
for selection in tests.selected:
label = selection.build.test.package_label or selection.build.test.label
path = selection.build.test.path
if path is not None:
# Host tests are built by output name.
match = label_to_rule.match(label)
if match:
# NOTE: Prepend // for host labels, since that seems to be required.
build_command_line.extend(["--host", "//" + match.group(1)])
elif label:
# Other tests are built by label content, without toolchain.
match = label_to_rule.match(label)
if match:
build_command_line.append(match.group(1))
else:
recorder.emit_warning_message(f"Unknown entry {selection}")
return False
build_id = recorder.emit_build_start(targets=build_command_line)
recorder.emit_instruction_message("Use --no-build to skip building")
status_suffix = " Status output suspended." if termout.is_init() else ""
recorder.emit_info_message(f"\nExecuting build.{status_suffix}")
return_code = await run_build_with_suspended_output(
build_command_line, show_output=not exec_env.log_to_stdout()
)
error = None
if return_code != 0:
error = f"Build returned non-zero exit code {return_code}"
if error is not None:
recorder.emit_end(error, id=build_id)
return False
amber_directory = os.path.join(exec_env.out_dir, "amber-files")
delivery_blob_type = read_delivery_blob_type(exec_env, recorder)
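    # Publish all built package manifests to the amber-files repository in the
    # build output directory, forwarding the delivery blob type when one is
    # configured.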
publish_args = (
[
"fx",
"ffx",
"repository",
"publish",
"--trusted-root",
os.path.join(amber_directory, "repository/root.json"),
"--ignore-missing-packages",
"--time-versioning",
]
+ (
["--delivery-blob-type", str(delivery_blob_type)]
if delivery_blob_type is not None
else []
)
+ [
"--package-list",
os.path.join(exec_env.out_dir, "all_package_manifests.list"),
amber_directory,
]
)
output = await execution.run_command(
*publish_args,
recorder=recorder,
parent=build_id,
print_verbatim=True,
env={"CWD": exec_env.out_dir},
)
if not output:
error = "Failure publishing packages."
elif output.return_code != 0:
error = f"Publish returned non-zero exit code {output.return_code}"
elif not await post_build_checklist(tests, recorder, exec_env, build_id):
error = "Post build checklist failed"
recorder.emit_end(error, id=build_id)
return error is None
def read_delivery_blob_type(
exec_env: environment.ExecutionEnvironment,
recorder: event.EventRecorder,
) -> int | None:
"""Read the delivery blob type from the output directory.
The delivery_blob_config.json file contains a "type" field that must
be passed along to package publishing if set.
    This function attempts to load the file and returns the value of that
    field if it is set.
Args:
        exec_env (environment.ExecutionEnvironment): Test execution environment.
        recorder (event.EventRecorder): Recorder for events.
Returns:
int | None: The delivery blob type, if found. None otherwise.
"""
expected_path = os.path.join(exec_env.out_dir, "delivery_blob_config.json")
id = recorder.emit_start_file_parsing(
"delivery_blob_config.json", expected_path
)
if not os.path.isfile(expected_path):
recorder.emit_end(
error="Could not find delivery_blob_config.json in output", id=id
)
return None
with open(expected_path) as f:
val: dict[str, typing.Any] = json.load(f)
recorder.emit_end(id=id)
return int(val["type"]) if "type" in val else None
def has_tests_in_base(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
exec_env: environment.ExecutionEnvironment,
) -> bool:
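    """Check whether any selected test's package is in the base package set.

    Emits an informational message listing matches.

    Returns:
        bool: True if at least one selected test is in the base package set.
    """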
base_file = os.path.join(exec_env.out_dir, "base_packages.list")
parse_id = recorder.emit_start_file_parsing("base_packages.list", base_file)
manifests: list[str]
try:
with open(base_file) as f:
contents = json.load(f)
manifests = contents["content"]["manifests"]
except (IOError, json.JSONDecodeError, KeyError) as e:
recorder.emit_end(f"Parsing file failed: {e}", id=parse_id)
raise e
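    # Compare only the final path component of each manifest against the
    # selected tests' package names.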
manifest_ends = {m.split("/")[-1] for m in manifests}
in_base = [
name
for t in tests.selected
if (name := t.package_name()) in manifest_ends
]
if in_base:
names = ", ".join(in_base[:3])
tests_are_in_base_including = (
"tests are in base, including"
if len(in_base) > 1
else "test is in base:"
)
recorder.emit_info_message(
f"\n{len(in_base)} {tests_are_in_base_including} {names}"
)
recorder.emit_end(id=parse_id)
return bool(in_base)
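# NOTE: functools.lru_cache on an async function caches the returned coroutine
# object, so each distinct argument combination may only be awaited once.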
@functools.lru_cache
async def has_device_connected(
recorder: event.EventRecorder, parent: event.Id | None = None
) -> bool:
"""Check if a device is connected for running target tests.
Args:
recorder (event.EventRecorder): Recorder for events.
parent (event.Id, optional): Parent task ID. Defaults to None.
Returns:
bool: True only if a device is available to run target tests.
"""
output = await execution.run_command(
"fx", "is-package-server-running", recorder=recorder, parent=parent
)
return output is not None and output.return_code == 0
async def run_build_with_suspended_output(
build_command_line: list[str],
show_output: bool = True,
) -> int:
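    """Run `fx build` for the given targets, suspending any status output first.

    Args:
        build_command_line (list[str]): Arguments to pass to `fx build`.
        show_output (bool): If True, let build output go to the console;
            otherwise discard it.

    Returns:
        int: The return code of the build command.
    """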
# Allow display to update.
await asyncio.sleep(0.1)
if termout.is_init():
# Clear the status output while we are doing the build.
termout.write_lines([])
stdout = None if show_output else subprocess.DEVNULL
stderr = None if show_output else subprocess.DEVNULL
return_code = subprocess.call(
["fx", "build"] + build_command_line, stdout=stdout, stderr=stderr
)
return return_code
async def post_build_checklist(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
exec_env: environment.ExecutionEnvironment,
build_id: event.Id,
) -> bool:
"""Perform a number of post-build checks to ensure we are ready to run tests.
Args:
        tests (selection_types.TestSelections): Tests selected to run.
recorder (event.EventRecorder): Recorder for events.
exec_env (environment.ExecutionEnvironment): Execution environment.
        build_id (event.Id): ID of the build event to use as the parent of any operations executed here.
Returns:
bool: True only if post-build checks passed, False otherwise.
"""
if tests.has_device_test() and await has_device_connected(
recorder, parent=build_id
):
try:
if has_tests_in_base(tests, recorder, exec_env):
recorder.emit_info_message(
"Some selected test(s) are in the base package set. Running an OTA."
)
output = await execution.run_command(
"fx", "ota", recorder=recorder, print_verbatim=True
)
if not output or output.return_code != 0:
recorder.emit_warning_message("OTA failed")
return False
        except IOError:
return False
return True
async def run_all_tests(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
flags: args.Flags,
exec_env: environment.ExecutionEnvironment,
) -> bool:
"""Execute all selected tests.
Args:
        tests (selection_types.TestSelections): The selected tests to run.
recorder (event.EventRecorder): Recorder for events.
flags (args.Flags): Incoming command flags.
exec_env (environment.ExecutionEnvironment): Execution environment.
Returns:
bool: True only if all tests ran successfully, False otherwise.
"""
max_parallel = flags.parallel
if tests.has_device_test() and not await has_device_connected(recorder):
recorder.emit_warning_message(
"\nCould not find a running package server."
)
recorder.emit_instruction_message(
"\nYou do not seem to have a package server running, but you have selected at least one device test.\nEnsure that you have `fx serve` running and that you have selected your desired device using `fx set-device`.\n"
)
return False
device_environment: environment.DeviceEnvironment | None = None
if tests.has_e2e_test():
device_environment = (
await execution.get_device_environment_from_exec_env(
exec_env, recorder=recorder
)
)
test_group = recorder.emit_test_group(len(tests.selected) * flags.count)
@dataclass
class ExecEntry:
"""Wrapper for test executions to share a signal for aborting by groups."""
# The test execution to run.
exec: execution.TestExecution
# Signal for aborting the execution of a specific group of tests,
# including this one.
abort_group: asyncio.Event
@dataclass
class RunState:
total_running: int = 0
non_hermetic_running: int = 0
hermetic_test_queue: asyncio.Queue[ExecEntry] = field(
default_factory=lambda: asyncio.Queue()
)
non_hermetic_test_queue: asyncio.Queue[ExecEntry] = field(
default_factory=lambda: asyncio.Queue()
)
run_condition = asyncio.Condition()
run_state = RunState()
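    # Queue one execution per requested run of each selected test (flags.count),
    # split by hermeticity so that non-hermetic tests can be serialized.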
for test in tests.selected:
abort_group = asyncio.Event()
execs = [
execution.TestExecution(
test,
exec_env,
flags,
run_suffix=None if flags.count == 1 else i + 1,
device_env=(
None if not test.is_e2e_test() else device_environment
),
)
for i in range(flags.count)
]
for exec in execs:
if exec.is_hermetic():
run_state.hermetic_test_queue.put_nowait(
ExecEntry(exec, abort_group)
)
else:
run_state.non_hermetic_test_queue.put_nowait(
ExecEntry(exec, abort_group)
)
tasks = []
abort_all_tests_event = asyncio.Event()
test_failure_observed: bool = False
maybe_debugger: subprocess.Popen[bytes] | None = None
debugger_ready: asyncio.Condition = asyncio.Condition()
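    # When debugger integration is requested, spawn zxdb; test executors are
    # not started until it signals readiness (see below).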
if flags.has_debugger():
async def on_debugger_ready() -> None:
# TODO(b/329317913): Emit a debugger event here.
async with debugger_ready:
debugger_ready.notify_all()
maybe_debugger = debugger.spawn(
tests.selected,
on_debugger_ready,
break_on_failure=flags.break_on_failure,
breakpoints=flags.breakpoints,
)
async def test_executor() -> None:
nonlocal test_failure_observed
to_run: ExecEntry
was_non_hermetic: bool = False
while True:
async with run_condition:
# Wait until we are allowed to try to run a test.
while run_state.total_running == max_parallel:
await run_condition.wait()
# If we should not execute any more tests, quit.
if abort_all_tests_event.is_set():
return
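                # Prefer a queued non-hermetic test, but only when no other
                # non-hermetic test is running; otherwise pull from the
                # hermetic queue, or exit if it is empty.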
if (
run_state.non_hermetic_running == 0
and not run_state.non_hermetic_test_queue.empty()
):
to_run = run_state.non_hermetic_test_queue.get_nowait()
run_state.non_hermetic_running += 1
was_non_hermetic = True
elif run_state.hermetic_test_queue.empty():
return
else:
to_run = run_state.hermetic_test_queue.get_nowait()
was_non_hermetic = False
run_state.total_running += 1
test_suite_id = recorder.emit_test_suite_started(
to_run.exec.name(), not was_non_hermetic, parent=test_group
)
status: event.TestSuiteStatus
message: str | None = None
try:
if not to_run.abort_group.is_set():
# Only run if this group was not already aborted.
command_line = " ".join(to_run.exec.command_line())
recorder.emit_instruction_message(
f"Command: {command_line}"
)
# Wait for the command completion and any other signal that
# means we should stop running the test.
done, pending = await asyncio.wait(
[
asyncio.create_task(
to_run.exec.run(
recorder,
flags,
test_suite_id,
timeout=flags.timeout,
)
),
asyncio.create_task(abort_all_tests_event.wait()),
asyncio.create_task(to_run.abort_group.wait()),
],
return_when=asyncio.FIRST_COMPLETED,
)
for r in pending:
# Cancel pending tasks.
# This must happen before we throw exceptions to ensure
# tasks are properly cleaned up.
r.cancel()
if pending:
# Propagate cancellations
await asyncio.wait(pending)
for r in done:
# Re-throw exceptions.
r.result()
if abort_all_tests_event.is_set():
status = event.TestSuiteStatus.ABORTED
message = "Test suite aborted due to another failure"
elif to_run.abort_group.is_set():
status = event.TestSuiteStatus.ABORTED
message = "Aborted re-runs due to another failure"
else:
status = event.TestSuiteStatus.PASSED
except execution.TestCouldNotRun as e:
status = event.TestSuiteStatus.SKIPPED
message = str(e)
except execution.TestSkipped as e:
status = event.TestSuiteStatus.SKIPPED
message = str(e)
except (execution.TestTimeout, execution.TestFailed) as e:
if isinstance(e, execution.TestTimeout):
status = event.TestSuiteStatus.TIMEOUT
# Abort other tests in this group.
to_run.abort_group.set()
else:
status = event.TestSuiteStatus.FAILED
test_failure_observed = True
if flags.fail:
# Abort all other running tests, dropping through to the
# following run state code to trigger any waiting executors.
abort_all_tests_event.set()
finally:
recorder.emit_test_suite_ended(test_suite_id, status, message)
async with run_condition:
run_state.total_running -= 1
if was_non_hermetic:
run_state.non_hermetic_running -= 1
run_condition.notify()
# Wait for the debugger to signal that it is ready.
if maybe_debugger is not None:
async with debugger_ready:
await debugger_ready.wait()
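    # Spawn one executor per allowed parallel slot; each repeatedly pulls
    # tests from the shared queues.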
for _ in range(max_parallel):
tasks.append(asyncio.create_task(test_executor()))
await asyncio.wait(tasks)
if maybe_debugger is not None:
        # Close the fifo to signal zxdb to exit, and reset stdout to /dev/null
        # so termout doesn't fail its cleanup.
sys.stdout.close()
sys.stdout = open(os.devnull, "w")
# This is a synchronous wait and we don't want to block the event loop, so run it in the
# default thread executor.
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, maybe_debugger.wait)
recorder.emit_end(id=test_group)
return not test_failure_observed
async def enumerate_test_cases(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
flags: args.Flags,
exec_env: environment.ExecutionEnvironment,
) -> None:
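    """Enumerate the test cases of each selected test and record the results.

    Tests that do not support enumeration, or whose enumeration fails, are
    reported via informational messages.
    """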
# Get the set of test executions that support enumeration.
executions = [
e
for t in tests.selected
if (
e := execution.TestExecution(t, exec_env, flags)
).enumerate_cases_command_line()
is not None
]
wont_enumerate_count = len(tests.selected) - len(executions)
outputs = await run_commands_in_parallel(
[
cmd_line
for e in executions
if (cmd_line := e.enumerate_cases_command_line()) is not None
],
group_name="Enumerate test cases",
recorder=recorder,
maximum_parallel=8,
)
assert len(outputs) == len(executions)
if wont_enumerate_count > 0:
recorder.emit_info_message(
f"\n{wont_enumerate_count:d} tests do not support enumeration"
)
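    # Record the enumerated case names for each test, and collect the names of
    # tests whose enumeration command failed.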
failed_enumeration_names = []
for output, exec in zip(outputs, executions):
if output is None or output.return_code != 0:
failed_enumeration_names.append(exec.name())
continue
recorder.emit_enumerate_test_cases(
exec.name(), list(output.stdout.splitlines())
)
if failed_enumeration_names:
recorder.emit_info_message(
f"{len(failed_enumeration_names)} tests could not be enumerated"
)
async def run_commands_in_parallel(
commands: list[list[str]],
group_name: str,
recorder: event.EventRecorder | None = None,
maximum_parallel: int | None = None,
) -> list[command.CommandOutput | None]:
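    """Run the given command lines concurrently, at most maximum_parallel at a time.

    Args:
        commands (list[list[str]]): Command lines to execute.
        group_name (str): Name of the event group for reporting.
        recorder (event.EventRecorder): Recorder for events. Must not be None.
        maximum_parallel (int | None): Maximum number of concurrent commands,
            or None for no limit.

    Returns:
        list[command.CommandOutput | None]: Output per command, in input order;
            None for commands that could not be run.
    """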
assert recorder
parent = recorder.emit_event_group(group_name, queued_events=len(commands))
output: list[command.CommandOutput | None] = [None] * len(commands)
in_progress: typing.Set[asyncio.Task[None]] = set()
index = 0
def can_add() -> bool:
nonlocal index
return index < len(commands) and (
maximum_parallel is None or len(in_progress) < maximum_parallel
)
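    # Start commands up to the parallelism limit, then wait for at least one
    # to finish before starting more.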
while index < len(commands) or in_progress:
while can_add():
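            # Bind the current index as a parameter so each task writes its
            # result into its own slot of the output list.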
async def set_index(i: int) -> None:
output[i] = await execution.run_command(
*commands[i], recorder=recorder, parent=parent
)
in_progress.add(asyncio.create_task(set_index(index)))
index += 1
        _, in_progress = await asyncio.wait(
            in_progress, return_when=asyncio.FIRST_COMPLETED
        )
recorder.emit_end(id=parent)
return output
if __name__ == "__main__":
main()