| # Copyright 2023 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import asyncio |
| from collections import defaultdict |
| from dataclasses import dataclass |
| import datetime |
| from itertools import chain |
| import os |
| import time |
| import typing |
| |
| import args |
| import event |
| import statusinfo |
| import termout |
| |
| |
| @dataclass |
| class DurationInfo: |
| """Tracks an individual event duration. |
| |
| Events with started=True denote the beginning of a duration and must be |
| matched by an event with ending=True and the same Id. |
| |
| Instances of this class track durations over time for display purposes. |
| """ |
| |
| # A formatted name for the duration, used as a label. |
| name: str |
| |
| # The start monotonic time. |
| start_monotonic: float |
| |
| # The optional parent of the duration. |
| parent: event.Id | None |
| |
| # The number of expected children. If set, we can show a progress bar. |
| expected_child_tasks: int = 0 |
| |
| # If True, hide children of this duration from the display. |
| hide_children: bool = False |
| |
| |
| class ConsoleState: |
| """Holder for all console output state. |
| |
| Attributes: |
| root_path: The root Fuchsia directory. Starts empty until an |
| event containing it is processed. |
| active_durations: Map from Id to DurationInfo for durations |
| that have not yet ended. |
| complete_durations: Map from Id to DurationInfo for durations |
| that have ended. |
| end_duration: The elapsed time for the entire run, measured |
| as the difference between the start and end of GLOBAL_RUN_ID. |
| Only set after the global run has ended. |
| test_results: Map from status to a set of tests with that |
| status. This is the canonical result list for all tests. |
| """ |
| |
| def __init__(self) -> None: |
| self.root_path: str | None = None |
| self.active_durations: dict[event.Id, DurationInfo] = dict() |
| self.complete_durations: dict[event.Id, DurationInfo] = dict() |
| self.end_duration: float | None = None |
| self.test_results: dict[ |
| event.TestSuiteStatus, typing.Set[str] |
| ] = defaultdict(set) |
| |
| |
| async def console_printer( |
| recorder: event.EventRecorder, |
| flags: args.Flags, |
| do_status_output_event: asyncio.Event, |
| ) -> None: |
| """Asynchronous future that implements console printing. |
| |
| Continually reads events from the given recorder and presents status |
| updates to the terminal. This is the main output routine for fx test. |
| |
| Output is controlled by the given flags. |
| |
| This routine implements continual clearing and updating of a status |
| bar at the bottom of the user's terminal. This behavior is controlled |
| by the `do_status_output_event` asyncio.Event, which is set only when |
| continual status output is both desired and available. |
| |
| Args: |
| recorder (event.EventRecorder): Source of events to display. |
| flags (args.Flags): Command line flags to control formatting. |
| do_status_output_event (asyncio.Event): Display updating |
| status bar only if this is set. |
| """ |
| |
| state = ConsoleState() |
| print_queue: asyncio.Queue[list[str]] = asyncio.Queue() |
| |
| # Spawn an asynchronous task to actually process incoming events. |
| # The rest of this method simply displays the status output and prints |
| # lines that are requested by the other task. |
| event_loop = asyncio.create_task( |
| _console_event_loop(recorder, flags, state, print_queue) |
| ) |
| |
| # Keep pumping events until there will be no more. |
| while not event_loop.done() or not print_queue.empty(): |
| # First try to get some lines that need to be printed. |
| # If there is nothing to print by the time we need to refresh, timeout |
| # and refresh the output. |
| try: |
| lines_to_print = await asyncio.wait_for( |
| print_queue.get(), flags.status_delay |
| ) |
| except asyncio.TimeoutError: |
| lines_to_print = [] |
| |
| if do_status_output_event.is_set(): |
| status_lines = _create_status_lines_from_state(flags, state) |
| |
| # Print status output, leaving an extra line to separate |
| # from prepended lines. |
| termout.write_lines( |
| [""] + status_lines[: flags.status_lines], lines_to_print |
| ) |
| elif lines_to_print: |
| print("\n".join(lines_to_print)) |
| |
| # We are done with all events, clean up and exit. |
| |
| if do_status_output_event.is_set(): |
| # Clear status output. |
| termout.write_lines([], []) |
| |
| if state.test_results: |
| passed = len(state.test_results[event.TestSuiteStatus.PASSED]) |
| failed = ( |
| len(state.test_results[event.TestSuiteStatus.FAILED]) |
| + len(state.test_results[event.TestSuiteStatus.TIMEOUT]) |
| + len(state.test_results[event.TestSuiteStatus.FAILED_TO_START]) |
| ) |
| skipped = len(state.test_results[event.TestSuiteStatus.SKIPPED]) |
| passed_text = pass_format(passed, flags.style) |
| failed_text = fail_format(failed, flags.style) |
| skipped_text = skip_format(skipped, flags.style) |
| |
| print( |
| f"\nRAN: {passed+failed} {passed_text} {failed_text} {skipped_text}" |
| ) |
| |
| print( |
| statusinfo.dim( |
| f"\nCompleted in {state.end_duration:.3f}s", |
| style=flags.style, |
| ) |
| ) |
| |
| if state.active_durations: |
| print( |
| statusinfo.error_highlight( |
| "BUG: Durations still active at exit:", style=flags.style |
| ) |
| ) |
| for id, duration in state.active_durations.items(): |
| print(f" {id} = {duration.__dict__}") |
| |
| await event_loop |
| |
| |
| @dataclass |
| class DurationPrintInfo: |
| """Wrap information needed to print the status of a task duration.""" |
| |
| # The DurationInfo we will print. |
| info: DurationInfo |
| |
| # How far indented the duration should be. |
| indent: int |
| |
| # If set, display a progress bar with this percent completion. |
| progress: float | None = None |
| |
| |
| @dataclass |
| class TaskStatus: |
| """Overall status of all tasks, for printing.""" |
| |
| # Number of tasks currently running. |
| tasks_running: int |
| |
| # Number of tasks that have completed |
| tasks_complete: int |
| |
| # Number of tasks that are queued but have not started running yet. |
| tasks_queued_but_not_running: int |
| |
| # Detailed information to print a status line for each in-progress duration. |
| duration_infos: list[DurationPrintInfo] |
| |
| def total_tasks(self) -> int: |
| return ( |
| self.tasks_running |
| + self.tasks_complete |
| + self.tasks_queued_but_not_running |
| ) |
| |
| |
| def _create_status_lines_from_state( |
| flags: args.Flags, state: ConsoleState |
| ) -> list[str]: |
| """Process the overall console state into a list of lines to present to the user. |
| |
| Args: |
| flags (args.Flags): Flags controlling output format. |
| state (ConsoleState): The console state to process. |
| |
| Returns: |
| list[str]: List of lines to present to the user. |
| """ |
| |
| # Process the state |
| task_status = _produce_task_status_from_state(state) |
| |
| # Current time for duration displays. |
| monotonic = time.monotonic() |
| |
| # Format the computed data as lines to print out. |
| status_lines = _format_duration_lines(flags, task_status) |
| |
| # Show an overall duration timer if the global run is started. |
| if event.GLOBAL_RUN_ID in state.active_durations: |
| run_duration = f"[duration: {statusinfo.format_duration(datetime.timedelta(seconds=monotonic - state.active_durations[event.GLOBAL_RUN_ID].start_monotonic).total_seconds())}]" |
| else: |
| run_duration = "" |
| |
| # Show pass/fail counts if tests have started completing. |
| pass_fail = "" |
| if state.test_results: |
| passed = len(state.test_results[event.TestSuiteStatus.PASSED]) |
| failed = ( |
| len(state.test_results[event.TestSuiteStatus.FAILED]) |
| + len(state.test_results[event.TestSuiteStatus.TIMEOUT]) |
| + len(state.test_results[event.TestSuiteStatus.FAILED_TO_START]) |
| ) |
| skipped = len(state.test_results[event.TestSuiteStatus.SKIPPED]) |
| passed_text = pass_format(passed, flags.style) |
| failed_text = fail_format(failed, flags.style) |
| skipped_text = skip_format(skipped, flags.style) |
| |
| pass_fail = ( |
| statusinfo.dim(" [tests: ", style=flags.style) |
| + f"{passed_text} {failed_text} {skipped_text}" |
| + statusinfo.dim("] ", style=flags.style) |
| ) |
| |
| # Print out the duration lines if they are present. |
| if status_lines: |
| status_lines = [ |
| statusinfo.green("Status: ", style=flags.style) |
| + statusinfo.dim(f"{run_duration}", style=flags.style) |
| + ("" if not pass_fail else pass_fail) |
| ] + status_lines |
| |
| return status_lines |
| |
| |
| def _produce_task_status_from_state(state: ConsoleState) -> TaskStatus: |
| # Generate a mapping of each duration to its children. |
| duration_children: dict[event.Id, list[event.Id]] = defaultdict(list) |
| all_durations: dict[event.Id, DurationInfo] = dict() |
| |
| for id, duration in chain( |
| state.active_durations.items(), state.complete_durations.items() |
| ): |
| if id != event.GLOBAL_RUN_ID: |
| duration_children[duration.parent or event.GLOBAL_RUN_ID].append(id) |
| all_durations[id] = duration |
| |
| # Calculate counts of how many tasks are in what state. |
| tasks_running = len(state.active_durations) |
| tasks_complete = len(state.complete_durations) |
| tasks_queued_but_not_running = 0 |
| for id, children in duration_children.items(): |
| expected = all_durations[id].expected_child_tasks |
| if expected and expected >= len(children): |
| tasks_queued_but_not_running += expected - len(children) |
| |
| # Process the active durations into DurationPrintInfo, which |
| # contains information on how to print the duration state. |
| # We perform an in-order tree traversal over all durations |
| # starting from the root, taking account only of those |
| # durations that are active and sorting by descending |
| # timestamp. |
| duration_print_infos: list[DurationPrintInfo] = [] |
| assert event.GLOBAL_RUN_ID in all_durations |
| |
| # Stack of duration event.Ids to process. Second |
| # element of the tuple tracks indent level. |
| work_stack: list[tuple[event.Id, int]] = [(event.GLOBAL_RUN_ID, 0)] |
| while work_stack: |
| id, indent = work_stack.pop() |
| info: DurationInfo | None = None |
| |
| if id == event.GLOBAL_RUN_ID: |
| pass |
| elif id not in state.active_durations: |
| continue |
| else: |
| progress = None |
| info = state.active_durations[id] |
| if info.expected_child_tasks: |
| progress = min( |
| 1.0, |
| sum( |
| [ |
| 1 if child_id in state.complete_durations else 0 |
| for child_id in duration_children.get(id, []) |
| ] |
| ) |
| / info.expected_child_tasks, |
| ) |
| duration_print_infos.append( |
| DurationPrintInfo(info, indent, progress) |
| ) |
| |
| if info is not None and info.hide_children: |
| # Skip processing children of this duration for display. |
| continue |
| |
| for child_id in duration_children.get(id, []): |
| children = [] |
| if child_id in state.active_durations: |
| children.append(child_id) |
| # Put children in the work stack in ascending |
| # order, so that they will be popped in descending |
| # order. |
| children.sort(key=lambda x: all_durations[x].start_monotonic) |
| work_stack.extend([(child_id, indent + 1) for child_id in children]) |
| |
| return TaskStatus( |
| tasks_running=tasks_running, |
| tasks_complete=tasks_complete, |
| tasks_queued_but_not_running=tasks_queued_but_not_running, |
| duration_infos=duration_print_infos, |
| ) |
| |
| |
| def _format_duration_lines(flags: args.Flags, status: TaskStatus) -> list[str]: |
| """Given the processed status for all tasks, format output based |
| on the flags. |
| |
| Args: |
| flags (args.Flags): Flags to control output format. |
| status (TaskStatus): Processed task status. |
| |
| Returns: |
| list[str]: A list of lines to present to the user. |
| """ |
| monotonic = time.monotonic() |
| duration_lines: list[str] = [] |
| for print_info in status.duration_infos: |
| prefix = " " * (print_info.indent * 2) |
| if print_info.progress is not None: |
| duration_lines.append( |
| statusinfo.status_progress( |
| prefix + print_info.info.name, |
| print_info.progress, |
| style=flags.style, |
| ) |
| ) |
| else: |
| duration_lines.append( |
| statusinfo.duration_progress( |
| prefix + print_info.info.name, |
| datetime.timedelta( |
| seconds=monotonic - print_info.info.start_monotonic |
| ), |
| style=flags.style, |
| ) |
| ) |
| return duration_lines |
| |
| |
| class TestExecutionInfo: |
| """Track and record a single test suite's execution.""" |
| |
| def __init__(self, name: str): |
| """Initialize execution info for a named suite. |
| |
| Args: |
| name (str): The test suite's name. |
| """ |
| self.name: str = name |
| self.buffered_lines: list[str] = [] |
| self.buffered_output_task: asyncio.Task[None] | None = None |
| |
| def spawn_buffered_output_printer( |
| self, timeout: float, queue: asyncio.Queue[list[str]] |
| ) -> None: |
| """Spawn an async task that will print buffered lines after a timeout. |
| |
| Args: |
| timeout (float): Seconds to wait before printing buffered data. |
| queue (asyncio.Queue[list[str]]): Queue to print the |
| buffered data to upon timeout. |
| """ |
| |
| async def output_printer_task() -> None: |
| """Task that handles sleeping and then sending queued lines. |
| |
| May be asynchronously canceled. |
| """ |
| await asyncio.sleep(timeout) |
| await queue.put(self.buffered_lines) |
| self.buffered_lines = [] # drop to release memory |
| |
| self.buffered_output_task = asyncio.create_task(output_printer_task()) |
| |
| def print_verbatim(self) -> bool: |
| """Determine if an output printer should skip buffering and just print. |
| |
| Returns: |
| bool: True if output should be printed verbatim, False otherwise. |
| """ |
| return (task := self.buffered_output_task) is not None and task.done() |
| |
| def should_buffer_output(self) -> bool: |
| """Determine if output should be buffered. |
| |
| Returns: |
| bool: True if an output printer should buffer lines in |
| this object, False otherwise. |
| """ |
| return ( |
| task := self.buffered_output_task |
| ) is not None and not task.done() |
| |
| def cleanup(self) -> None: |
| """Clean up any buffers, cancel, print tasks, and drop any buffered output.""" |
| if ( |
| self.buffered_output_task is not None |
| and not self.buffered_output_task.done() |
| ): |
| self.buffered_output_task.cancel() |
| self.buffered_lines = [] |
| |
| |
| async def _console_event_loop( |
| recorder: event.EventRecorder, |
| flags: args.Flags, |
| state: ConsoleState, |
| print_queue: asyncio.Queue[list[str]], |
| ) -> None: |
| """Internal event processor. |
| |
| This task processes the events generated by the given EventRecorder and |
| updates the given ConsoleState based on their contents. It may also |
| request that some lines be printed for the user to see. |
| |
| Args: |
| recorder (event.EventRecorder): Source of events to process. |
| flags (args.Flags): Command line flags for this invocation. |
| state (ConsoleState): Shared state object to update over time. |
| print_queue (asyncio.Queue): Queue for lines to print to the user. |
| """ |
| |
| # Keep track of ids corresponding to test suites for display purposes: |
| # 1. We need the name to report success or failure. |
| # 2. We flatten the status display so that commands run as |
| # part of a test execution are not shown. |
| # 3. We can buffer output from those programs for later display using |
| # the --slow flag. |
| test_suite_execution_info: dict[event.Id, TestExecutionInfo] = dict() |
| |
| # Keep track of task IDs that are nested under a test suite. |
| # This is needed to map buffered program output to the correct test suite. |
| event_id_to_test_suite: dict[event.Id, event.Id] = dict() |
| next_event: event.Event |
| async for next_event in recorder.iter(): |
| lines_to_print: list[str] = [] |
| |
| # If set, and we do verbose printing, append this suffix to the output. |
| verbose_suffix: str = "" |
| |
| old_duration: DurationInfo | None = None |
| if ( |
| next_event.ending |
| and next_event.id is not None |
| and next_event.id in state.active_durations |
| ): |
| old_duration = state.active_durations.pop(next_event.id) |
| state.complete_durations[next_event.id] = old_duration |
| elapsed_time = next_event.timestamp - old_duration.start_monotonic |
| verbose_suffix = ( |
| f" [duration={datetime.timedelta(seconds=elapsed_time)}]" |
| ) |
| if next_event.id == event.GLOBAL_RUN_ID: |
| state.end_duration = elapsed_time |
| |
| if flags.verbose: |
| # In verbose mode, refuse to print too many output characters |
| # to avoid scrolling info out of view. |
| lines_to_print.append( |
| statusinfo.ellipsize(recorder.event_string(next_event), 400) |
| + verbose_suffix |
| ) |
| |
| if next_event.payload: |
| if ( |
| next_event.id is not None |
| and next_event.starting |
| and next_event.parent not in test_suite_execution_info |
| ): |
| # Provide nice formatting for event types that need to be tracked for a duration. |
| |
| if next_event.id == event.GLOBAL_RUN_ID: |
| state.active_durations[next_event.id] = DurationInfo( |
| "fx test", |
| next_event.timestamp, |
| parent=next_event.parent, |
| ) |
| elif next_event.payload.parsing_file is not None: |
| styled_name = statusinfo.highlight( |
| "parsing", style=flags.style |
| ) |
| state.active_durations[next_event.id] = DurationInfo( |
| f"{styled_name} {next_event.payload.parsing_file.name}", |
| next_event.timestamp, |
| parent=next_event.parent, |
| ) |
| elif next_event.payload.program_execution is not None: |
| styled_name = statusinfo.highlight( |
| "running", style=flags.style |
| ) |
| state.active_durations[next_event.id] = DurationInfo( |
| f"{styled_name} {next_event.payload.program_execution.to_formatted_command_line()}", |
| next_event.timestamp, |
| parent=next_event.parent, |
| ) |
| elif ( |
| next_event.payload.event_group is not None |
| or next_event.payload.test_group is not None |
| ): |
| group: event.EventGroupPayload = next_event.payload.event_group or next_event.payload.test_group # type: ignore |
| styled_name = statusinfo.highlight( |
| group.name, style=flags.style |
| ) |
| state.active_durations[next_event.id] = DurationInfo( |
| styled_name, |
| next_event.timestamp, |
| parent=next_event.parent, |
| expected_child_tasks=group.queued_events or 0, |
| hide_children=group.hide_children, |
| ) |
| elif next_event.payload.build_targets: |
| styled_name = statusinfo.highlight( |
| f"Refreshing {len(next_event.payload.build_targets)} targets", |
| style=flags.style, |
| ) |
| state.active_durations[next_event.id] = DurationInfo( |
| styled_name, |
| next_event.timestamp, |
| parent=next_event.parent, |
| ) |
| elif next_event.payload.test_suite_started: |
| styled_name = statusinfo.highlight( |
| next_event.payload.test_suite_started.name, |
| style=flags.style, |
| ) |
| state.active_durations[next_event.id] = DurationInfo( |
| styled_name, |
| next_event.timestamp, |
| parent=next_event.parent, |
| ) |
| else: |
| # Fallback. Display an ugly error if this is triggered so that we can fix the bug. |
| styled_name = statusinfo.error_highlight( |
| f"BUG: no title for {next_event.payload.to_dict()}", # type:ignore |
| style=flags.style, |
| ) |
| state.active_durations[next_event.id] = DurationInfo( |
| styled_name, |
| next_event.timestamp, |
| parent=next_event.parent, |
| ) |
| |
| if ( |
| next_event.id is not None |
| and next_event.parent in test_suite_execution_info |
| ): |
| # Track direct children of a test suite, so their |
| # output can be buffered to the right suite. |
| if next_event.starting: |
| event_id_to_test_suite[next_event.id] = next_event.parent |
| elif next_event.ending: |
| del event_id_to_test_suite[next_event.id] |
| |
| if next_event.payload.process_env is not None: |
| # Extract the path from the parsed environment. |
| root_path = next_event.payload.process_env["fuchsia_dir"] |
| elif next_event.payload.user_message is not None: |
| # Style and display user messages. |
| quietable = False |
| msg = next_event.payload.user_message |
| if msg.level == event.MessageLevel.INSTRUCTION: |
| quietable = True |
| text = statusinfo.dim(msg.value, style=flags.style) |
| elif msg.level == event.MessageLevel.WARNING: |
| text = statusinfo.warning(msg.value, style=flags.style) |
| elif msg.level == event.MessageLevel.INFO: |
| quietable = True |
| text = msg.value |
| elif msg.level == event.MessageLevel.VERBATIM: |
| text = msg.value |
| else: |
| text = msg.value |
| if not quietable or not flags.quiet: |
| lines_to_print.append(text) |
| elif next_event.payload.program_output is not None: |
| output = next_event.payload.program_output |
| |
| if output.data.endswith("\n"): |
| data = output.data[:-1] |
| else: |
| data = output.data |
| |
| if output.print_verbatim: |
| # If a program execution requests verbatim output, |
| # print to console. |
| lines_to_print.append(data) |
| elif ( |
| next_event.id is not None |
| and (suite_id := event_id_to_test_suite.get(next_event.id)) |
| is not None |
| ): |
| # This output corresponds to a running suite. |
| suite_info = test_suite_execution_info[suite_id] |
| if suite_info.print_verbatim(): |
| # Already timed out, just print this output verbatim. |
| lines_to_print.append(data) |
| elif suite_info.should_buffer_output(): |
| # Awaiting timeout, buffer this output. |
| suite_info.buffered_lines.append(data) |
| elif ( |
| next_event.payload.test_file_loaded is not None |
| and not flags.quiet |
| ): |
| # Print a result to the user when the tests file is parsed. |
| test_info = next_event.payload.test_file_loaded |
| path = ( |
| "//" + os.path.relpath(test_info.file_path, root_path) |
| if root_path |
| else test_info.file_path |
| ) |
| lines_to_print.append( |
| f"\nFound {len(test_info.test_entries)} total tests in {statusinfo.green(path, style=flags.style)}" |
| ) |
| elif next_event.payload.test_selections and not flags.quiet: |
| # Print a result to the user when tests are selected. |
| if not flags.quiet: |
| count = len(next_event.payload.test_selections.selected) |
| label = statusinfo.highlight( |
| f"{count} test{'s' if count != 1 else ''}", |
| style=flags.style, |
| ) |
| suffix = statusinfo.highlight( |
| f" {flags.count} times" if flags.count > 1 else "", |
| style=flags.style, |
| ) |
| lines_to_print.append(f"\nPlan to run {label}{suffix}") |
| elif ( |
| next_event.payload.build_targets is not None and not flags.quiet |
| ): |
| # Print the number of targets we are refreshing. |
| label = statusinfo.highlight( |
| f"{len(next_event.payload.build_targets)} targets", |
| style=flags.style, |
| ) |
| lines_to_print.append( |
| f"\n{statusinfo.green('Refreshing', style=flags.style)} {label}" |
| ) |
| # Also output the command line used for fx build up to a limit, |
| # to avoid scrolling multiple pages. |
| lines_to_print.append( |
| statusinfo.ellipsize( |
| statusinfo.green_highlight( |
| f"> fx build {' '.join(next_event.payload.build_targets)}", |
| style=flags.style, |
| ), |
| width=80 * 5, # Approximately 5 lines |
| ), |
| ) |
| elif next_event.payload.test_group is not None: |
| # Let the user know we intend to run a number of tests. |
| val = statusinfo.highlight( |
| f"{next_event.payload.test_group.queued_events} tests", |
| style=flags.style, |
| ) |
| label = statusinfo.green("Running", style=flags.style) |
| lines_to_print.append(f"{label} {val}") |
| elif next_event.payload.test_suite_started is not None: |
| # Let the user know a test suite is starting. |
| assert next_event.id |
| suite_info = TestExecutionInfo( |
| next_event.payload.test_suite_started.name |
| ) |
| test_suite_execution_info[next_event.id] = suite_info |
| if flags.slow > 0: |
| suite_info.buffered_lines.extend( |
| [ |
| statusinfo.dim( |
| f"Runtime has exceeded {flags.slow} seconds", |
| style=flags.style, |
| ), |
| f"Showing output for {test_suite_execution_info[next_event.id].name}", |
| ] |
| ) |
| suite_info.spawn_buffered_output_printer( |
| flags.slow, print_queue |
| ) |
| |
| label = "Starting:" |
| val = statusinfo.green_highlight( |
| next_event.payload.test_suite_started.name, |
| style=flags.style, |
| ) |
| # Explicitly mark if the suite is hermetic or not. |
| hermeticity = ( |
| "" |
| if next_event.payload.test_suite_started.hermetic |
| else statusinfo.warning("(NOT HERMETIC)", style=flags.style) |
| ) |
| if not flags.quiet: |
| lines_to_print.append(f"\n{label} {val} {hermeticity}") |
| elif next_event.payload.test_suite_ended is not None: |
| # Let the user know a test suite has ended, and |
| # what its status is. |
| assert next_event.id |
| payload = next_event.payload.test_suite_ended |
| if payload.status == event.TestSuiteStatus.PASSED: |
| label = statusinfo.green_highlight( |
| "PASSED", style=flags.style |
| ) |
| elif payload.status == event.TestSuiteStatus.FAILED: |
| label = statusinfo.error_highlight( |
| "FAILED", style=flags.style |
| ) |
| elif payload.status == event.TestSuiteStatus.SKIPPED: |
| label = statusinfo.highlight("SKIPPED", style=flags.style) |
| elif payload.status == event.TestSuiteStatus.ABORTED: |
| label = statusinfo.highlight("ABORTED", style=flags.style) |
| elif payload.status == event.TestSuiteStatus.FAILED_TO_START: |
| label = statusinfo.error_highlight( |
| "FAILED TO START", style=flags.style |
| ) |
| elif payload.status == event.TestSuiteStatus.TIMEOUT: |
| label = statusinfo.error_highlight( |
| "TIMEOUT", style=flags.style |
| ) |
| else: |
| label = statusinfo.error_highlight( |
| "BUG: UNKNOWN", style=flags.style |
| ) |
| |
| # Record status of the test, and stop tracking the test task. |
| finished_test = test_suite_execution_info[next_event.id] |
| state.test_results[payload.status].add(finished_test.name) |
| finished_test.cleanup() |
| del test_suite_execution_info[next_event.id] |
| |
| suffix = "" |
| if payload.message and not flags.quiet: |
| suffix = "\n" + statusinfo.dim(payload.message) + "\n" |
| |
| lines_to_print.append(f"{label}: {finished_test.name}{suffix}") |
| |
| elif next_event.payload.enumerate_test_cases is not None: |
| cases_payload = next_event.payload.enumerate_test_cases |
| styled_name = statusinfo.green_highlight( |
| cases_payload.test_name, style=flags.style |
| ) |
| lines_to_print.append(f"\nTest cases in {styled_name}:") |
| for line in cases_payload.test_case_names: |
| lines_to_print.append( |
| f" {statusinfo.highlight(line, style=flags.style)}" |
| ) |
| command = f'fx test {cases_payload.test_name} --test-filter "{line}"' |
| lines_to_print.append( |
| f" {statusinfo.dim(command, style=flags.style)}" |
| ) |
| elif next_event.payload.load_config is not None and not flags.quiet: |
| load_config = next_event.payload.load_config |
| lines_to_print.extend( |
| [ |
| statusinfo.highlight( |
| f"Default flags loaded from {load_config.path}:", |
| style=flags.style, |
| ), |
| statusinfo.dim( |
| f"{str(load_config.command_line)}\n", |
| style=flags.style, |
| ), |
| ] |
| ) |
| |
| if next_event.error: |
| # Highlight all errors |
| lines_to_print.extend( |
| [ |
| statusinfo.error_highlight(line, style=flags.style) |
| for line in ("ERROR: " + next_event.error).splitlines() |
| ] |
| ) |
| |
| if lines_to_print: |
| await print_queue.put(lines_to_print) |
| |
| |
| def pass_format(count: int, style: bool = True) -> str: |
| """Helper to format passing tests. |
| |
| Args: |
| count (int): The number of passing tests. Don't highlight for 0. |
| style (bool, optional): Only style if True. Defaults to True. |
| |
| Returns: |
| str: Formatted test count. |
| """ |
| label = f"PASSED: {count}" |
| if count > 0: |
| return statusinfo.green_highlight(label, style=style) |
| else: |
| return statusinfo.dim(label, style=style) |
| |
| |
| def fail_format(count: int, style: bool = True) -> str: |
| """Helper to format failing tests. |
| |
| Args: |
| count (int): The number of failing tests. Don't highlight for 0. |
| style (bool, optional): Only style if True. Defaults to True. |
| |
| Returns: |
| str: Formatted test count. |
| """ |
| label = f"FAILED: {count}" |
| if count > 0: |
| return statusinfo.error_highlight(label, style=style) |
| else: |
| return statusinfo.dim(label, style=style) |
| |
| |
| def skip_format(count: int, style: bool = True) -> str: |
| """Helper to format skipped tests. |
| |
| Args: |
| count (int): The number of skipped tests. Don't highlight for 0. |
| style (bool, optional): Only style if True. Defaults to True. |
| |
| Returns: |
| str: Formatted test count. |
| """ |
| label = f"SKIPPED: {count}" |
| if count > 0: |
| return label |
| else: |
| return statusinfo.dim(label, style=style) |