# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from __future__ import annotations
import asyncio
import atexit
from dataclasses import dataclass
from dataclasses import field
import functools
import gzip
import json
import os
import re
import subprocess
import sys
import typing
import args
import console
import dataparse
import environment
import event
import execution
import log
import selection
import selection_types
import statusinfo
import termout
import test_list_file
import tests_json_file
import util.command as command
import util.signals
def main():
# Main entrypoint.
    # Set up the event loop to catch termination signals (e.g. Ctrl+C), and
    # cancel the main task when they are received.
fut = asyncio.ensure_future(async_main_wrapper(args.parse_args()))
util.signals.register_on_terminate_signal(fut.cancel)
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(fut)
sys.exit(fut.result())
except asyncio.CancelledError:
print("\n\nReceived interrupt, exiting")
sys.exit(1)
async def async_main_wrapper(
flags: args.Flags, recorder: event.EventRecorder | None = None
) -> int:
"""Wrapper for the main logic of fx test.
This wrapper creates a list containing tasks that must be
awaited before the program exits. The main logic may add tasks to this
list during execution, and then return the intended status code.
Args:
flags (args.Flags): Flags to pass into the main function.
recorder (event.EventRecorder | None, optional): If set,
use this event recorder. Used for testing.
Returns:
The return code of the program.
"""
tasks: typing.List[asyncio.Task] = []
if recorder is None:
recorder = event.EventRecorder()
ret = await async_main(flags, tasks, recorder)
try:
await asyncio.wait_for(asyncio.wait(tasks), timeout=5)
except asyncio.TimeoutError:
print(
"\n\nTimed out waiting for tasks to exit, terminating...\n",
file=sys.stderr,
)
return ret
async def async_main(
flags: args.Flags,
tasks: typing.List[asyncio.Task],
recorder: event.EventRecorder,
) -> int:
"""Main logic of fx test.
Args:
flags (args.Flags): Flags controlling the behavior of fx test.
        tasks (List[asyncio.Task]): List of tasks that must be awaited before
            termination; this function may append to it.
        recorder (event.EventRecorder): The recorder for events.
Returns:
The return code of the program.
"""
do_status_output_signal: asyncio.Event = asyncio.Event()
tasks.append(
asyncio.create_task(
console.console_printer(recorder, flags, do_status_output_signal)
)
)
# Initialize event recording.
recorder.emit_init()
info_first_line = (
"✅ You are using the new fx test, which is currently in Fishfood ✅"
)
info_block = """See details here: https://fuchsia.googlesource.com/fuchsia/+/refs/heads/main/scripts/fxtest/rewrite
To go back to the old fx test, use `fx --enable=legacy_fxtest test`, and please file a bug under b/293917801.
"""
recorder.emit_info_message(info_first_line)
recorder.emit_instruction_message(info_block)
    # Validate the flags. Emit one parse event before and another
    # after flag post-processing.
try:
recorder.emit_parse_flags(flags.__dict__)
flags.validate()
recorder.emit_parse_flags(flags.__dict__)
except args.FlagError as e:
recorder.emit_end(f"Flags are invalid: {e}")
return 1
# Initialize status printing at this point, if desired.
if flags.status:
do_status_output_signal.set()
termout.init()
# Process and initialize the incoming environment.
exec_env: environment.ExecutionEnvironment
try:
exec_env = environment.ExecutionEnvironment.initialize_from_args(flags)
except environment.EnvironmentError as e:
recorder.emit_end(
f"Failed to initialize environment: {e}\nDid you run fx set?"
)
return 1
recorder.emit_process_env(exec_env.__dict__)
# Configure file logging based on flags.
if flags.log and exec_env.log_file:
tasks.append(
asyncio.create_task(
log.writer(recorder, gzip.open(exec_env.log_file, "wt"))
)
)
recorder.emit_instruction_message(
f"Logging all output to: {exec_env.log_file}"
)
recorder.emit_instruction_message(
"Use the `--logpath` argument to specify a log location or `--no-log` to disable\n"
)
# For convenience, display the log output path when the program exits.
# Since the async loop may already be exited at that point, directly
# print to the console.
atexit.register(
print,
statusinfo.dim(
f"Output was logged to: {os.path.relpath(exec_env.log_file, os.getcwd())}",
style=flags.style,
),
)
# Print a message for users who want to know how to see all test output.
if not flags.output:
recorder.emit_instruction_message(
f"To show all output, specify the `-o/--output` flag."
)
# Load the list of tests to execute.
try:
tests = await load_test_list(recorder, exec_env)
except Exception as e:
recorder.emit_end(f"Failed to load tests: {e}")
return 1
# Use flags to select which tests to run.
try:
mode = selection.SelectionMode.ANY
if flags.host:
mode = selection.SelectionMode.HOST
elif flags.device:
mode = selection.SelectionMode.DEVICE
selections = await selection.select_tests(
tests,
flags.selection,
mode,
flags.fuzzy,
recorder=recorder,
)
# Mutate the selections based on the command line flags.
selections.apply_flags(flags)
if len(selections.selected_but_not_run) != 0:
total_count = len(selections.selected) + len(
selections.selected_but_not_run
)
recorder.emit_info_message(
f"Selected {total_count} tests, but only running {len(selections.selected)} due to flags."
)
recorder.emit_test_selections(selections)
except selection.SelectionError as e:
recorder.emit_end(f"Selection is invalid: {e}")
return 1
# Check that the selected tests are valid.
try:
await validate_test_selections(selections, recorder, flags)
except SelectionValidationError as e:
recorder.emit_end(str(e))
return 1
    # Don't actually run any tests if --dry was specified; instead, just
    # print which tests were selected and exit.
if flags.dry:
recorder.emit_info_message("Selected the following tests:")
for s in selections.selected:
recorder.emit_info_message(f" {s.info.name}")
recorder.emit_instruction_message(
"\nWill not run any tests, --dry specified"
)
recorder.emit_end()
return 0
# If enabled, try to build and update the selected tests.
if flags.build and not await do_build(selections, recorder, exec_env):
recorder.emit_end("Failed to build.")
return 1
    # Don't actually run tests if --list was specified; instead, gather the
    # list of test cases for each test and output it to the user.
if flags.list:
recorder.emit_info_message("Enumerating all test cases...")
recorder.emit_instruction_message(
"Will not run any tests, --list specified"
)
await enumerate_test_cases(selections, recorder, flags, exec_env)
recorder.emit_end()
return 0
# Finally, run all selected tests.
if not await run_all_tests(selections, recorder, flags, exec_env):
recorder.emit_end("Test failures reported")
return 1
recorder.emit_end()
return 0
async def load_test_list(
recorder: event.EventRecorder, exec_env: environment.ExecutionEnvironment
) -> typing.List[test_list_file.Test]:
"""Load the input files listing tests and parse them into a list of Tests.
Args:
recorder (event.EventRecorder): Recorder for events.
exec_env (environment.ExecutionEnvironment): Environment we run in.
Raises:
TestFileError: If the tests.json file is invalid.
DataParseError: If data could not be deserialized from JSON input.
JSONDecodeError: If a JSON file fails to parse.
IOError: If a file fails to open.
ValueError: If the tests.json and test-list.json files are
incompatible for some reason.
Returns:
typing.List[test_list_file.Test]: List of available tests to execute.
"""
# Load the tests.json file.
try:
parse_id = recorder.emit_start_file_parsing(
exec_env.relative_to_root(exec_env.test_json_file),
exec_env.test_json_file,
)
test_file_entries: typing.List[
tests_json_file.TestEntry
] = tests_json_file.TestEntry.from_file(exec_env.test_json_file)
recorder.emit_test_file_loaded(
test_file_entries, exec_env.test_json_file
)
recorder.emit_end(id=parse_id)
except (tests_json_file.TestFileError, json.JSONDecodeError, IOError) as e:
recorder.emit_end("Failed to parse: " + str(e), id=parse_id)
raise e
# Load the test-list.json file.
try:
parse_id = recorder.emit_start_file_parsing(
exec_env.relative_to_root(exec_env.test_list_file),
exec_env.test_list_file,
)
test_list_entries = test_list_file.TestListFile.entries_from_file(
exec_env.test_list_file
)
recorder.emit_end(id=parse_id)
except (dataparse.DataParseError, json.JSONDecodeError, IOError) as e:
recorder.emit_end("Failed to parse: " + str(e), id=parse_id)
raise e
# Join the contents of the two files and return it.
try:
tests = test_list_file.Test.join_test_descriptions(
test_file_entries, test_list_entries
)
return tests
except ValueError as e:
recorder.emit_end(
f"tests.json and test-list.json are inconsistent: {e}"
)
raise e
class SelectionValidationError(Exception):
"""A problem occurred when validating test selections.
The message contains a human-readable explanation of the problem.
"""
async def validate_test_selections(
selections: selection_types.TestSelections,
recorder: event.EventRecorder,
flags: args.Flags,
):
"""Validate the selections matched from tests.json.
    Args:
        selections (selection_types.TestSelections): The selection output to validate.
        recorder (event.EventRecorder): An event recorder to write useful messages to.
        flags (args.Flags): Incoming command flags controlling suggestions.
    Raises:
        SelectionValidationError: If the selections are invalid.
    """
missing_groups: typing.List[selection_types.MatchGroup] = []
for group, matches in selections.group_matches:
if not matches:
missing_groups.append(group)
if missing_groups:
recorder.emit_warning_message(
"\nCould not find any tests to run for at least one set of arguments you provided."
)
recorder.emit_info_message(
"\nMake sure this test is transitively in your 'fx set' arguments."
)
recorder.emit_info_message(
"See https://fuchsia.dev/fuchsia-src/development/testing/faq for more information."
)
if flags.show_suggestions:
def suggestion_args(
arg: str, threshold: float | None = None
) -> typing.List[str]:
name = "fx"
suggestion_args = [
"search-tests",
f"--max-results={flags.suggestion_count}",
arg,
]
if threshold is not None:
suggestion_args += ["--threshold", str(threshold)]
if not flags.style:
suggestion_args += ["--no-color"]
return [name] + suggestion_args
arg_threshold_pairs = []
for group in missing_groups:
# Create pairs of a search string and threshold.
# Thresholds depend on the number of arguments joined.
# We have only a single search field, so we concatenate
# the names into one big group. To correct for lower
# match thresholds due to this union, we adjust the
# threshold when there is more than a single value to
# match against.
all_args = group.names.union(group.components).union(
group.packages
)
arg_threshold_pairs.append(
(
",".join(list(all_args)),
max(0.4, 0.9 - len(all_args) * 0.05)
if len(all_args) > 1
else None,
),
)
outputs = await run_commands_in_parallel(
[
suggestion_args(arg_pair[0], arg_pair[1])
for arg_pair in arg_threshold_pairs
],
"Find suggestions",
recorder=recorder,
maximum_parallel=10,
)
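            # If any suggestion command failed to run, bail out early; no
            # suggestions are printed and no selection error is raised below.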
if any([val is None for val in outputs]):
return False
for group, output in zip(missing_groups, outputs):
assert output is not None # Checked above
recorder.emit_info_message(
f"\nFor `{group}`, did you mean any of the following?\n"
)
recorder.emit_verbatim_message(output.stdout)
if missing_groups:
raise SelectionValidationError(
"No tests found for the following selections:\n "
+ "\n ".join([str(m) for m in missing_groups])
)
async def do_build(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
exec_env: environment.ExecutionEnvironment,
) -> bool:
"""Attempt to build the selected tests.
Args:
        tests (selection_types.TestSelections): Tests to attempt to build.
recorder (event.EventRecorder): Recorder for events.
exec_env (environment.ExecutionEnvironment): Incoming execution environment.
Returns:
bool: True only if the tests were built and published, False otherwise.
"""
label_to_rule = re.compile(r"//([^()]+)\(")
build_command_line = []
for selection in tests.selected:
label = selection.build.test.package_label or selection.build.test.label
path = selection.build.test.path
if path is not None:
# Host tests are built by output name.
build_command_line.append(path)
elif label:
# Other tests are built by label content, without toolchain.
match = label_to_rule.match(label)
if match:
build_command_line.append(match.group(1))
else:
recorder.emit_warning_message(f"Unknown entry {selection}")
return False
build_id = recorder.emit_build_start(targets=build_command_line)
recorder.emit_instruction_message("Use --no-build to skip building")
status_suffix = " Status output suspended." if termout.is_init() else ""
recorder.emit_info_message(f"\nExecuting build.{status_suffix}")
# Allow display to update.
await asyncio.sleep(0.1)
if termout.is_init():
# Clear the status output while we are doing the build.
termout.write_lines([])
return_code = subprocess.call(
["fx", "build"] + build_command_line,
)
error = None
if return_code != 0:
error = f"Build returned non-zero exit code {return_code}"
if error is not None:
recorder.emit_end(error, id=build_id)
return False
amber_directory = os.path.join(exec_env.out_dir, "amber-files")
delivery_blob_type = read_delivery_blob_type(exec_env, recorder)
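    # Publish the freshly built packages to the package repository so that
    # they are available to resolve on the target device.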
publish_args = (
[
"fx",
"ffx",
"repository",
"publish",
"--trusted-root",
os.path.join(amber_directory, "repository/root.json"),
"--ignore-missing-packages",
"--time-versioning",
]
+ (
["--delivery-blob-type", str(delivery_blob_type)]
if delivery_blob_type is not None
else []
)
+ [
"--package-list",
os.path.join(exec_env.out_dir, "all_package_manifests.list"),
amber_directory,
]
)
output = await execution.run_command(
*publish_args,
recorder=recorder,
parent=build_id,
print_verbatim=True,
env={"CWD": exec_env.out_dir},
)
if not output:
error = "Failure publishing packages."
elif output.return_code != 0:
error = f"Publish returned non-zero exit code {output.return_code}"
elif not await post_build_checklist(tests, recorder, exec_env, build_id):
error = "Post build checklist failed"
recorder.emit_end(error, id=build_id)
return error is None
def read_delivery_blob_type(
exec_env: environment.ExecutionEnvironment,
recorder: event.EventRecorder,
) -> int | None:
"""Read the delivery blob type from the output directory.
The delivery_blob_config.json file contains a "type" field that must
be passed along to package publishing if set.
    This function attempts to load the file and returns the value of that
    field if set.
    Args:
        exec_env (environment.ExecutionEnvironment): Test execution environment.
        recorder (event.EventRecorder): Recorder for events.
Returns:
int | None: The delivery blob type, if found. None otherwise.
"""
expected_path = os.path.join(exec_env.out_dir, "delivery_blob_config.json")
    parse_id = recorder.emit_start_file_parsing(
"delivery_blob_config.json", expected_path
)
if not os.path.isfile(expected_path):
        recorder.emit_end(
            error="Could not find delivery_blob_config.json in output",
            id=parse_id,
        )
return None
with open(expected_path) as f:
val: typing.Dict[str, typing.Any] = json.load(f)
    recorder.emit_end(id=parse_id)
return int(val["type"]) if "type" in val else None
def has_tests_in_base(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
exec_env: environment.ExecutionEnvironment,
) -> bool:
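    """Check whether any selected test is already in the base package set.
    Args:
        tests (selection_types.TestSelections): Tests selected to run.
        recorder (event.EventRecorder): Recorder for events.
        exec_env (environment.ExecutionEnvironment): Execution environment.
    Raises:
        IOError: If base_packages.list could not be read.
    Returns:
        bool: True only if at least one selected test's package URL appears
            in base_packages.list.
    """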
    # TODO(b/291144505): This logic was ported from update-if-in-base, but it
    # appears to be wrong; fix it.
base_file = os.path.join(exec_env.out_dir, "base_packages.list")
parse_id = recorder.emit_start_file_parsing("base_packages.list", base_file)
contents = None
try:
with open(base_file) as f:
contents = f.read()
except IOError as e:
recorder.emit_end(f"Parsing file failed: {e}", id=parse_id)
raise e
ret = any(
[
t.build.test.package_url is not None
and t.build.test.package_url in contents
for t in tests.selected
]
)
recorder.emit_end(id=parse_id)
return ret
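# NOTE: functools.lru_cache on an async function caches the coroutine object
# returned for each distinct argument list, and a coroutine can only be
# awaited once; callers must not await this with the same arguments twice.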
@functools.lru_cache
async def has_device_connected(
recorder: event.EventRecorder, parent: event.Id | None = None
) -> bool:
"""Check if a device is connected for running target tests.
Args:
recorder (event.EventRecorder): Recorder for events.
parent (event.Id, optional): Parent task ID. Defaults to None.
Returns:
bool: True only if a device is available to run target tests.
"""
output = await execution.run_command(
"fx", "is-package-server-running", recorder=recorder, parent=parent
)
return output is not None and output.return_code == 0
async def post_build_checklist(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
exec_env: environment.ExecutionEnvironment,
build_id: event.Id,
) -> bool:
"""Perform a number of post-build checks to ensure we are ready to run tests.
Args:
        tests (selection_types.TestSelections): Tests selected to run.
recorder (event.EventRecorder): Recorder for events.
exec_env (environment.ExecutionEnvironment): Execution environment.
        build_id (event.Id): ID of the build event to use as the parent of any operations executed here.
Returns:
bool: True only if post-build checks passed, False otherwise.
"""
if tests.has_device_test() and await has_device_connected(
recorder, parent=build_id
):
try:
if has_tests_in_base(tests, recorder, exec_env):
recorder.emit_info_message(
"Some selected test(s) are in the base package set. Running an OTA."
)
output = await execution.run_command(
"fx", "ota", recorder=recorder, print_verbatim=True
)
if not output or output.return_code != 0:
recorder.emit_warning_message("OTA failed")
return False
        except IOError:
return False
return True
async def run_all_tests(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
flags: args.Flags,
exec_env: environment.ExecutionEnvironment,
) -> bool:
"""Execute all selected tests.
Args:
        tests (selection_types.TestSelections): The selected tests to run.
recorder (event.EventRecorder): Recorder for events.
flags (args.Flags): Incoming command flags.
exec_env (environment.ExecutionEnvironment): Execution environment.
Returns:
bool: True only if all tests ran successfully, False otherwise.
"""
max_parallel = flags.parallel
if tests.has_device_test() and not await has_device_connected(recorder):
recorder.emit_warning_message(
"\nCould not find a running package server."
)
recorder.emit_instruction_message(
"\nYou do not seem to have a package server running, but you have selected at least one device test.\nEnsure that you have `fx serve` running and that you have selected your desired device using `fx set-device`.\n"
)
return False
test_group = recorder.emit_test_group(len(tests.selected) * flags.count)
@dataclass
class ExecEntry:
"""Wrapper for test executions to share a signal for aborting by groups."""
# The test execution to run.
exec: execution.TestExecution
# Signal for aborting the execution of a specific group of tests,
# including this one.
abort_group: asyncio.Event
@dataclass
class RunState:
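        """Mutable state shared by the test executor tasks below."""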
        # Number of test executions currently in flight.
        total_running: int = 0
        # Number of non-hermetic tests currently in flight (at most one).
        non_hermetic_running: int = 0
        # Queue of hermetic tests waiting to run.
        hermetic_test_queue: asyncio.Queue[ExecEntry] = field(
            default_factory=lambda: asyncio.Queue()
        )
        # Queue of non-hermetic tests waiting to run.
        non_hermetic_test_queue: asyncio.Queue[ExecEntry] = field(
            default_factory=lambda: asyncio.Queue()
        )
run_condition = asyncio.Condition()
run_state = RunState()
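    # Expand each selected test into flags.count executions that share a
    # per-test abort signal, and sort them into the hermetic and
    # non-hermetic queues.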
for test in tests.selected:
abort_group = asyncio.Event()
execs = [
execution.TestExecution(
test,
exec_env,
flags,
run_suffix=None if flags.count == 1 else i + 1,
)
for i in range(flags.count)
]
for exec in execs:
if exec.is_hermetic():
run_state.hermetic_test_queue.put_nowait(
ExecEntry(exec, abort_group)
)
else:
run_state.non_hermetic_test_queue.put_nowait(
ExecEntry(exec, abort_group)
)
tasks = []
abort_all_tests_event = asyncio.Event()
test_failure_observed: bool = False
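    # Each executor below repeatedly pulls a queued test and runs it:
    # - at most max_parallel tests run concurrently;
    # - at most one non-hermetic test runs at a time, and it is preferred
    #   whenever that slot is free;
    # - an executor exits once an abort is requested or there is no test it
    #   is allowed to start.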
async def test_executor():
nonlocal test_failure_observed
to_run: ExecEntry
was_non_hermetic: bool = False
while True:
async with run_condition:
# Wait until we are allowed to try to run a test.
while run_state.total_running == max_parallel:
await run_condition.wait()
# If we should not execute any more tests, quit.
if abort_all_tests_event.is_set():
return
if (
run_state.non_hermetic_running == 0
and not run_state.non_hermetic_test_queue.empty()
):
to_run = run_state.non_hermetic_test_queue.get_nowait()
run_state.non_hermetic_running += 1
was_non_hermetic = True
elif run_state.hermetic_test_queue.empty():
return
else:
to_run = run_state.hermetic_test_queue.get_nowait()
was_non_hermetic = False
run_state.total_running += 1
test_suite_id = recorder.emit_test_suite_started(
to_run.exec.name(), not was_non_hermetic, parent=test_group
)
status: event.TestSuiteStatus
message: typing.Optional[str] = None
try:
if not to_run.abort_group.is_set():
# Only run if this group was not already aborted.
command_line = " ".join(to_run.exec.command_line())
recorder.emit_instruction_message(
f"Command: {command_line}"
)
# Wait for the command completion and any other signal that
# means we should stop running the test.
done, pending = await asyncio.wait(
[
asyncio.create_task(
to_run.exec.run(
recorder,
flags,
test_suite_id,
timeout=flags.timeout,
)
),
asyncio.create_task(abort_all_tests_event.wait()),
asyncio.create_task(to_run.abort_group.wait()),
],
return_when=asyncio.FIRST_COMPLETED,
)
for r in pending:
# Cancel pending tasks.
# This must happen before we throw exceptions to ensure
# tasks are properly cleaned up.
r.cancel()
if pending:
# Propagate cancellations
await asyncio.wait(pending)
for r in done:
# Re-throw exceptions.
r.result()
if abort_all_tests_event.is_set():
status = event.TestSuiteStatus.ABORTED
message = "Test suite aborted due to another failure"
elif to_run.abort_group.is_set():
status = event.TestSuiteStatus.ABORTED
message = "Aborted re-runs due to another failure"
else:
status = event.TestSuiteStatus.PASSED
except execution.TestCouldNotRun as e:
status = event.TestSuiteStatus.SKIPPED
message = str(e)
except (execution.TestTimeout, execution.TestFailed) as e:
if isinstance(e, execution.TestTimeout):
status = event.TestSuiteStatus.TIMEOUT
# Abort other tests in this group.
to_run.abort_group.set()
else:
status = event.TestSuiteStatus.FAILED
test_failure_observed = True
if flags.fail:
# Abort all other running tests, dropping through to the
# following run state code to trigger any waiting executors.
abort_all_tests_event.set()
finally:
recorder.emit_test_suite_ended(test_suite_id, status, message)
async with run_condition:
run_state.total_running -= 1
if was_non_hermetic:
run_state.non_hermetic_running -= 1
run_condition.notify()
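    # Spawn one executor per parallel slot and wait for all of them to finish.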
for _ in range(max_parallel):
tasks.append(asyncio.create_task(test_executor()))
await asyncio.wait(tasks)
recorder.emit_end(id=test_group)
return not test_failure_observed
async def enumerate_test_cases(
tests: selection_types.TestSelections,
recorder: event.EventRecorder,
flags: args.Flags,
exec_env: environment.ExecutionEnvironment,
):
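    """Enumerate the test cases of each selected test and report them.
    Args:
        tests (selection_types.TestSelections): Tests selected for enumeration.
        recorder (event.EventRecorder): Recorder for events.
        flags (args.Flags): Incoming command flags.
        exec_env (environment.ExecutionEnvironment): Execution environment.
    """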
# Get the set of test executions that support enumeration.
executions = [
e
for t in tests.selected
if (
e := execution.TestExecution(t, exec_env, flags)
).enumerate_cases_command_line()
is not None
]
wont_enumerate_count = len(tests.selected) - len(executions)
outputs = await run_commands_in_parallel(
[
cmd_line
for e in executions
if (cmd_line := e.enumerate_cases_command_line()) is not None
],
group_name="Enumerate test cases",
recorder=recorder,
maximum_parallel=8,
)
assert len(outputs) == len(executions)
if wont_enumerate_count > 0:
recorder.emit_info_message(
f"\n{wont_enumerate_count:d} tests do not support enumeration"
)
failed_enumeration_names = []
for output, exec in zip(outputs, executions):
if output is None or output.return_code != 0:
failed_enumeration_names.append(exec.name())
continue
recorder.emit_enumerate_test_cases(
exec.name(), list(output.stdout.splitlines())
)
if failed_enumeration_names:
recorder.emit_info_message(
f"{len(failed_enumeration_names)} tests could not be enumerated"
)
async def run_commands_in_parallel(
commands: typing.List[typing.List[str]],
group_name: str,
recorder: typing.Optional[event.EventRecorder] = None,
maximum_parallel: typing.Optional[int] = None,
) -> typing.List[typing.Optional[command.CommandOutput]]:
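    """Run several commands, limiting how many execute concurrently.
    Args:
        commands (List[List[str]]): Argument lists of the commands to run.
        group_name (str): Name for the event group containing these commands.
        recorder (event.EventRecorder, optional): Recorder for events. Must not be None.
        maximum_parallel (int, optional): Maximum number of commands to run at once.
    Returns:
        List[command.CommandOutput | None]: One entry per command, in input order.
    """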
assert recorder
parent = recorder.emit_event_group(group_name, queued_events=len(commands))
output: typing.List[typing.Optional[command.CommandOutput]] = [None] * len(
commands
)
in_progress: typing.Set[asyncio.Task] = set()
index = 0
def can_add() -> bool:
nonlocal index
return index < len(commands) and (
maximum_parallel is None or len(in_progress) < maximum_parallel
)
while index < len(commands) or in_progress:
while can_add():
async def set_index(i: int):
output[i] = await execution.run_command(
*commands[i], recorder=recorder, parent=parent
)
in_progress.add(asyncio.create_task(set_index(index)))
index += 1
_, in_progress = await asyncio.wait(
in_progress, return_when="FIRST_COMPLETED"
)
recorder.emit_end(id=parent)
return output
if __name__ == "__main__":
main()