blob: 8cf481a4ace45e0a26307d4df5701dea610f9044 [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Event definitions for `fx test`.
This module contains the definitions for all events that can occur during
an execution of `fx test`. All user-visible operations are represented
as an Event structure, which supports serialization to JSON. Event handlers
support displaying events to a user, writing to a log file, uploading to a
server, or any other operation that is desired.
"""
import asyncio
from dataclasses import dataclass
from dataclasses import fields
import datetime
import enum
import time
import typing
from dataparse import dataparse
import selection_types
import tests_json_file
# Events may have a unique Id, represented as a monotonically increasing integer.
Id = typing.NewType("Id", int)
# Event Id 0 is special, referring to the entire invocation of `fx test`
GLOBAL_RUN_ID: Id = Id(0)
@dataparse
@dataclass
class FileParsingPayload:
"""Payload for a file parsing event."""
# Name of the file being parsed.
name: str
# Path on the host system where the file was located.
path: str
class MessageLevel(enum.Enum):
"""Valid user message types.
Note that there is no ERROR level, because Events may themselves
represent an error and they hold their own error messages.
"""
# The message is an instruction to the user. Typically dimmed in terminal output.
INSTRUCTION = "INSTRUCTION"
# The message has actionable information for the user, typically shown normally in terminal output.
INFO = "INFO"
# The message is a warning to the user, typically shown in yellow in terminal output.
WARNING = "WARNING"
# The message should be displayed verbatim. This is used for command output.
VERBATIM = "VERBATIM"
@dataparse
@dataclass
class Message:
"""Display this value to a user."""
# The string to display.
value: str
# The level of the message to display. Different levels of messages
# may have different styling applied.
# See above for levels.
level: MessageLevel
@dataparse
@dataclass
class TestsJsonFilePayload:
"""The result of loading a tests.json file."""
# The complete list of entries parsed from the file.
# This output is in the format parsed by tests_json_file, not the
# verbatim input.
test_entries: list[tests_json_file.TestEntry]
# The path of the tests.json file that was parsed.
file_path: str
@dataparse
@dataclass
class ProgramExecutionPayload:
"""Details about a program execution."""
# The name of the command that was executed.
command: str
# List of flags passed to the command.
flags: list[str]
# The environment passed to the command.
environment: dict[str, str]
def to_formatted_command_line(self) -> str:
"""Format this program execution to an approximation of the command line.
This output can be shown to the user to describe the command invocation.
Returns:
str: Formatted command line.
"""
return f"{self.command}{'' if not self.flags else ' ' + ' '.join(self.flags)}"
class ProgramOutputStream(enum.Enum):
"""Details about the source of program output."""
# Designates output as coming from stdout.
STDOUT = "STDOUT"
# Designates output as coming from stderr.
STDERR = "STDERR"
@dataparse
@dataclass
class ProgramOutputPayload:
"""Payload for output bytes from a program."""
# The data, as a string.
data: str
# The stream this data came from. Either STDOUT or STDERR.
stream: ProgramOutputStream
# If true, the user asked for this output to be printed verbatim to their
# console.
print_verbatim: bool = False
@dataparse
@dataclass
class ProgramTerminationPayload:
"""Payload for a program terminating."""
# The return code of the program. 0 is success, and any other
# value is failure.
return_code: int
@dataparse
@dataclass
class TestSelectionPayload:
"""Payload for test selection events.
This payload provides the complete set of selected and not selected tests
for this command invocation.
"""
# Map of selected test names to their score that was below the threshold.
selected: dict[str, int]
# Map of not selected test names to their score that was above the threshold.
not_selected: dict[str, int]
# Map of selected but not run test names to their score that was below the threshold.
selected_but_not_run: dict[str, int]
# The distance threshold this selection run was configured with.
fuzzy_distance_threshold: int
@dataparse
@dataclass
class EventGroupPayload:
"""Represents a group of events that may have their own children."""
# The display name for this group.
name: str
# An optional count of events queued on the group.
# If set, this value can be used to create a progress bar for this event group.
queued_events: int | None = None
# If true, instructs console displays to hide children.
hide_children: bool = False
class TestGroupPayload(EventGroupPayload):
"""Test groups are specializations of event groups.
They are stored separately to support more detailed output to users.
"""
def __init__(self, tests_to_run: int):
"""Initialize a new test group.
Args:
tests_to_run (int): Number of tests to run. Used for
formatting a name and to initialize the number of queued
events.
"""
super().__init__(
name=f"Running {tests_to_run} tests", queued_events=tests_to_run
)
@classmethod
def from_dict(cls, dict: dict[str, typing.Any]) -> typing.Self:
s: EventGroupPayload = super().from_dict(dict) # type:ignore
ret: typing.Self = cls(0)
ret.name = s.name
ret.queued_events = s.queued_events
return ret
@dataparse
@dataclass
class TestSuiteStartedPayload:
"""A test suite started."""
# The name of the suite.
name: str
# If true, this test suite is hermetic and may be run in parallel.
hermetic: bool | None = False
class TestSuiteStatus(enum.Enum):
"""Result status for a test suite's execution."""
# A test suite passed.
PASSED = "PASSED"
# A test suite failed.
FAILED = "FAILED"
# A test suite was skipped for some reason.
SKIPPED = "SKIPPED"
# The test suite execution was aborted due to some condition.
ABORTED = "ABORTED"
# The test suite was aborted due to exceeding its timeout.
TIMEOUT = "TIMEOUT"
@dataparse
@dataclass
class TestSuiteEndedPayload:
"""A test suite finished executing."""
# The status message from above representing the result of this test suite.
status: TestSuiteStatus
# Optionally, a message describing what happened.
message: str | None = None
@dataparse
@dataclass
class EnumerateTestCasesPayload:
"""A test suite's cases were enumerated."""
# The name of the test being enumerated.
test_name: str
# The names of the test cases in the test.
test_case_names: list[str]
@dataparse
@dataclass
class LoadConfigPayload:
"""A configuration file was loaded with flag defaults."""
# Path to config file.
path: str
# The flag defaults parsed from the config file.
flags: dict[str, typing.Any]
# The command line provided in the config file.
command_line: list[str]
@dataparse
@dataclass
class EventPayloadUnion:
"""Payload for event types.
At most one of the below fields may be set.
Rather than using derived classes, we use the following at-most-one-set
union class for better compatibility with the @dataparse wrapper. This
union implements an "externally tagged" enum, which could be replaced
with a different form of tagging in the future.
"""
def __post_init__(self) -> None:
fields_present: typing.Set[str] = set(
[f.name for f in fields(self) if getattr(self, f.name) is not None]
)
if len(fields_present) != 1:
raise ValueError(
"Only one field may be set on EventPayloadUnion. The following were found: "
+ str(fields_present)
)
# If set, this event denotes the start time of the run.
# Payload is the actual timestamp of the run start as a UNIX timestamp.
#
# Other timestamps are in monotonic time, so the mapping of the monotonic
# time for the containing event to this UNIX timestamp must be used for all
# time formatting.
start_timestamp: float | None = None
# This event denotes loading a configuration file containing flag defaults.
#
# The payload contains the file path and the parsed command line flags that
# are used as defaults.
load_config: LoadConfigPayload | None = None
# This event denotes parsing command line flags.
#
# The parsed command line flags are included in the value.
parse_flags: dict[str, typing.Any] | None = None
# This event denotes processing the execution environment.
#
# The parsed environment is included in the value.
process_env: dict[str, typing.Any] | None = None
# This event denotes a message to be shown to the user.
#
# The value provides display information.
user_message: Message | None = None
# This event denotes the beginning of a new event group.
#
# The value provides details on the group.
event_group: EventGroupPayload | None = None
# This event denotes a generic file parsing duration.
#
# The value provides details of the file being parsed.
parsing_file: FileParsingPayload | None = None
# This event denotes a program starting executing.
#
# The value provides details on the program.
program_execution: ProgramExecutionPayload | None = None
# This event denotes output from a running program.
#
# The value provides contents of the output.
program_output: ProgramOutputPayload | None = None
# This event denotes the termination of a program.
#
# The value provides details on the return code.
program_termination: ProgramTerminationPayload | None = None
# This event denotes the results of loading the tests.json file.
#
# The value provides details of the parsed data.
test_file_loaded: TestsJsonFilePayload | None = None
# This event denotes selection of a set of tests.
#
# The value provides details on selection decisions.
test_selections: TestSelectionPayload | None = None
# This event denotes the beginning of a build operation.
#
# The value lists the targets being built.
build_targets: list[str] | None = None
# This event denotes the beginning of a group of test suites.
#
# The value provides display information about the tests.
test_group: TestGroupPayload | None = None
# This event denotes the beginning of a test suite.
#
# The value provides details on the suite.
test_suite_started: TestSuiteStartedPayload | None = None
# This event denotes the end of a test suite.
#
# The value provides result information.
test_suite_ended: TestSuiteEndedPayload | None = None
# This event denotes the numeration of cases within a test suite.
#
# The value provides details on the cases that were found.
enumerate_test_cases: EnumerateTestCasesPayload | None = None
@dataparse
@dataclass
class Event:
# Unique Id for the event. If not set, this event is not
# associated with a known duration.
id: Id | None = None
# Monotonic timestamp for the event.
timestamp: float = 0
# Parent Id for the event. If not set, treat GLOBAL_RUN_ID as
# the implicit parent
parent: Id | None = None
# If set, a new duration is starting with the above Id.
starting: bool | None = None
# If set, a duration with the above Id has ended.
ending: bool | None = None
# If set, the duration ended with an error. The human-readable
# message is stored in this field.
error: str | None = None
# Optional payload for the event. See EventPayloadUnion
# documentation for details.
payload: EventPayloadUnion | None = None
class EventRecorder:
"""Entry point to emitting and listening for events.
EventRecorder has methods to emit each type of event as well as iterate over
all events as they are being emitted. An instance should be passed
everywhere events are emitted or read.
"""
def __init__(self) -> None:
"""Initialize a new EventRecorder."""
# Keep track of the system time corresponding to the below monotonic
# time. This represents the beginning of this EventRecorder's execution.
self._system_time_start: float = time.time()
# Keep track of the monotonic time corresponding to the above
# system time.
self._monotonic_time_start: float = time.monotonic()
# Keep track of all events that were emitted as part of execution.
self._events: list[Event] = []
# Keep track of each asynchronous event consumer queue.
self._queues: list[asyncio.Queue[Event | None]] = []
# Async event designating that this recorder is done.
self._done: asyncio.Event = asyncio.Event()
# Keep track of the next unique Id to assign.
self._next_id: Id = Id(1)
def _get_timestamp(self) -> float:
"""Produce timestamps for events in this recorder.
Returns:
float: Monotonic timestamp for use in an event.
"""
return time.monotonic()
def _new_id(self) -> Id:
"""Produce a new unique Id in the context of this EventRecorder.
Returns:
Id: A unique Id.
"""
ret = self._next_id
self._next_id = Id(self._next_id + 1)
return ret
def _local_time_for_monotonic(self, monotonic: float) -> datetime.datetime:
"""Return the local datetime for a given monotonic value.
This function uses the previously set monotonic time mapping, and it
must be called after the first call to _get_timestamp.
Args:
monotonic (float): The monotonic time to translate.
Returns:
datetime.datetime: The system time represented by the monotonic time.
"""
diff = monotonic - self._monotonic_time_start
return datetime.datetime.fromtimestamp(self._system_time_start + diff)
def _emit(self, event: Event) -> None:
"""Helper to emit an event to all listeners.
Args:
event (Event): The event to emit.
"""
self._events.append(event)
for queue in self._queues:
queue.put_nowait(event)
def end(self) -> None:
"""End this queue. No further events may be emitted, and all listeners
will eventually terminate.
"""
for queue in self._queues:
queue.put_nowait(None)
self._done.set()
def iter(self) -> typing.AsyncIterable[Event]:
"""Create an iterator over the events of this recorder.
Raises:
StopAsyncIteration: Raised when iteration should end.
Used internally by __anext__.
Returns:
typing.AsyncIterable[Event]: Async iterator over the events.
Warning:
All returned iterators must be read to completion. Failure to do so
will result in a memory leak.
Example:
recorder = EventRecorder()
recorder.emit_init()
recorder.emit_info_message("This is a message")
asyncio.create_task(/* spawn a task that generates more records,
followed by recorder.emit_end() */)
async for event in recorder.iter():
print(recorder.event_string(event))
"""
parent = self
class Iter:
def __aiter__(self) -> typing.Self:
self._init_items: list[Event] = parent._events.copy()
self._queue: asyncio.Queue[Event | None] = asyncio.Queue()
if not parent._done.is_set():
parent._queues.append(self._queue)
else:
# Immediately end when we get to reading from the queue, but still return the stored events.
self._queue.put_nowait(None)
return self
async def __anext__(self) -> Event:
if self._init_items:
return self._init_items.pop(0)
next = await self._queue.get()
if not next:
raise StopAsyncIteration()
return next
return Iter()
def event_string(self, event: Event) -> str:
"""Print out the string representation of an event returned
by this recorder.
Args:
event (Event): The event to print.
Returns:
str: String representation of the event.
"""
time_str = self._local_time_for_monotonic(event.timestamp).strftime(
"%Y-%m-%d %H:%M:%S.%f"
)
if event.id is not None:
start_stop = (
"S"
if event.starting and not event.ending
else "E"
if not event.starting and event.ending
else "I"
)
id_line = f"[{event.id:05}:{start_stop}]"
else:
id_line = "[_______]"
return f"{time_str} {id_line:9} {self._payload_string(event.payload)}"
def _payload_string(self, payload: EventPayloadUnion | None) -> str:
"""Format the payload of an event as a string.
Args:
payload (EventPayloadUnion | None): Payload to format.
Returns:
str: String representation of the event payload.
"""
if payload is None:
return ""
if payload.start_timestamp is not None:
return "Starting Run"
elif payload.load_config is not None:
return "Loaded config: " + str(payload.load_config)
elif payload.parse_flags is not None:
return "Parsed flags: " + str(payload.parse_flags)
elif payload.process_env is not None:
return "Processed environment: " + str(payload.process_env)
elif payload.user_message is not None:
# Include the user message, except for VERBATIM payloads. In that
# case, simply print the number of characters. This avoids dupication
# of verbose command outputs.
value = payload.user_message.value
level = payload.user_message.level
value_print = (
value.strip()
if level != MessageLevel.VERBATIM
else (f"<{len(value)} characters of verbatim output>")
)
return f"Display user message: [{level}] {value_print}"
elif payload.parsing_file is not None:
return f"Parsing {payload.parsing_file.name} at {payload.parsing_file.path}"
elif payload.program_execution is not None:
return f"Running `{payload.program_execution.command} {' '.join(payload.program_execution.flags)}`"
elif payload.program_output is not None:
return f"Got {len(payload.program_output.data)} characters from {payload.program_output.stream}"
elif payload.program_termination is not None:
return f"Terminated with return_code = {payload.program_termination.return_code}"
elif payload.test_selections is not None:
return f"Selected {len(payload.test_selections.selected)} tests"
elif payload.test_file_loaded is not None:
return f"Loaded {len(payload.test_file_loaded.test_entries)} tests from {payload.test_file_loaded.file_path}"
elif payload.event_group is not None:
return f"Starting group {payload.event_group.name}"
elif payload.build_targets is not None:
return f"Building {len(payload.build_targets)} targets"
elif payload.test_group is not None:
return f"Starting test group {payload.test_group.name}"
elif payload.test_suite_started is not None:
return f"Starting test suite {payload.test_suite_started.name}"
elif payload.test_suite_ended is not None:
end_msg = payload.test_suite_ended.message
suffix = "" if not end_msg else f": {end_msg}"
return f"Ending test suite with state {payload.test_suite_ended.status}{suffix}"
else:
return "BUG: UNKNOWN EVENT PAYLOAD " + str(payload.__dict__)
def emit_init(self) -> None:
"""Emit the initial event.
This method must be called first following the creation of
an EventRecorder.
"""
self._emit(
Event(
GLOBAL_RUN_ID,
self._monotonic_time_start,
starting=True,
payload=EventPayloadUnion(
start_timestamp=self._system_time_start
),
)
)
def emit_end(self, error: str | None = None, id: Id | None = None) -> None:
"""Emit an end event for an event duration.
By default, the global run duration is terminated with an error
optionally given by the first argument.
Optionally, a different duration may be ended by giving its id.
Args:
error (str | None): If set, end the
given event with an error. Defaults to None.
id (Id | None): If set, end this
event instead of the global run. Defaults to None.
"""
id = id or GLOBAL_RUN_ID
self._emit(
Event(
id or GLOBAL_RUN_ID,
self._get_timestamp(),
ending=True,
error=error,
)
)
if id == GLOBAL_RUN_ID:
self.end()
def emit_load_config(
self,
path: str,
flags: dict[str, typing.Any],
command_line: list[str],
) -> None:
"""Emit a load_config event with details on the config.
Args:
path (str): The path to the loaded config file.
flags (dict[str, typing.Any]): The flags passed to this invocation.
command_line (list[str]): The command line parsed from the config file.
"""
self._emit(
Event(
GLOBAL_RUN_ID,
self._get_timestamp(),
payload=EventPayloadUnion(
load_config=LoadConfigPayload(path, flags, command_line)
),
)
)
def emit_parse_flags(self, flags: dict[str, typing.Any]) -> None:
"""Emit a parse_flags event with details on the flags.
Args:
flags (dict[str, typing.Any]): The flags passed to this invocation.
"""
self._emit(
Event(
GLOBAL_RUN_ID,
self._get_timestamp(),
payload=EventPayloadUnion(parse_flags=flags),
)
)
def emit_process_env(self, env: dict[str, typing.Any]) -> None:
"""Emit a process_env event with details of the environment.
Args:
env (dict[str, typing.Any]): The environment parsed by this invocation.
"""
self._emit(
Event(
GLOBAL_RUN_ID,
self._get_timestamp(),
payload=EventPayloadUnion(process_env=env),
)
)
def _emit_user_message(
self, message: str, level: MessageLevel = MessageLevel.INFO
) -> None:
"""Emit a message to display to a user.
Args:
message (str): Message to show.
level (str, optional): Message level for display. Defaults to MESSAGE_LEVEL_INFO.
"""
self._emit(
Event(
None,
self._get_timestamp(),
payload=EventPayloadUnion(
user_message=Message(value=message, level=level)
),
)
)
def emit_instruction_message(self, message: str) -> None:
"""Emit a message to the user with level INSTRUCTION.
Args:
message (str): Message contents
"""
self._emit_user_message(message, level=MessageLevel.INSTRUCTION)
def emit_info_message(self, message: str) -> None:
"""Emit a message to the user with level INFO.
Args:
message (str): Message contents
"""
self._emit_user_message(message, level=MessageLevel.INFO)
def emit_warning_message(self, message: str) -> None:
"""Emit a message to the user with level WARNING.
Args:
message (str): Message contents
"""
self._emit_user_message(message, level=MessageLevel.WARNING)
def emit_verbatim_message(self, message: str) -> None:
"""Emit a message to the user with level VERBATIM.
Args:
message (str): Message contents
"""
self._emit_user_message(message, level=MessageLevel.VERBATIM)
def emit_start_file_parsing(
self, name: str, path: str, parent: Id | None = None
) -> Id:
"""Start parsing a file.
This call must be matched with an emit_end call for the returned Id.
Args:
name (str): The name of the file being parsed.
path (str): The path to the file being parsed.
parent (Id | None): Parent of the event, if set. Defaults
to the global run.
Returns:
Id: New Id for the parsing event, which must be ended explicitly.
"""
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
parent=parent,
starting=True,
payload=EventPayloadUnion(
parsing_file=FileParsingPayload(name, path)
),
)
)
return id
def emit_program_start(
self,
command: str,
args: list[str],
environment: dict[str, str] | None = None,
parent: Id | None = None,
) -> Id:
"""A program is starting execution.
This call must be matched with an emit_program_termination
call for the returned Id.
Args:
command (str): The command being executed.
args (listsed to the command.
environment (dict[str, str] | None):
The environment passed to the command. Defaults to None.
parent (Id, | None): Parent for this event. Defaults
to the global run.
Returns:
Id: New Id for the program event, which must be ended explicitly.
"""
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
parent=parent,
starting=True,
payload=EventPayloadUnion(
program_execution=ProgramExecutionPayload(
command, args, environment or dict()
)
),
)
)
return id
def emit_program_output(
self,
id: Id,
content: str,
stream: ProgramOutputStream,
print_verbatim: bool = False,
) -> None:
"""A program produced output on a stream.
Args:
id (Id): An Id returned by emit_program_start.
content (str): The string content of the output.
stream (str): The stream that produced the content.
print_verbatim (bool, optional): True only if the user
requested that this command output be printed verbatim
back to the console. Defaults to False.
"""
self._emit(
Event(
id,
self._get_timestamp(),
payload=EventPayloadUnion(
program_output=ProgramOutputPayload(
content, stream, print_verbatim=print_verbatim
)
),
)
)
def emit_program_termination(
self, id: Id, return_code: int, error: str | None = None
) -> None:
"""A program terminated.
Args:
id (Id): An Id returned by emit_program_start.
return_code (int): The return code for the program.
error (str | None): If set, this program terminated
with an error represented by this string message.
Defaults to None.
"""
self._emit(
Event(
id,
self._get_timestamp(),
ending=True,
payload=EventPayloadUnion(
program_termination=ProgramTerminationPayload(return_code)
),
error=error,
)
)
def emit_test_file_loaded(
self, entries: list[tests_json_file.TestEntry], file_path: str
) -> None:
"""Event with details of loading the tests.json file.
Args:
entries (list[tests_json_file.TestEntry]): Parsed file contents.
file_path (str): Path to the tests.json file.
"""
self._emit(
Event(
None,
self._get_timestamp(),
payload=EventPayloadUnion(
test_file_loaded=TestsJsonFilePayload(entries, file_path)
),
)
)
def emit_test_selections(
self,
selections: selection_types.TestSelections,
) -> None:
"""Event with details of test selection.
Args:
selections (selection.TestSelections): The processed selections.
threshold (float): The score threshold used for selection.
"""
selected_scores = {
item.name(): selections.best_score[item.name()]
for item in selections.selected
}
selected_but_not_run_scores = {
item.name(): selections.best_score[item.name()]
for item in selections.selected_but_not_run
}
not_selected_scores = {
name: score
for name, score in selections.best_score.items()
if name not in selected_scores
}
self._emit(
Event(
None,
self._get_timestamp(),
payload=EventPayloadUnion(
test_selections=TestSelectionPayload(
selected_scores,
not_selected_scores,
selected_but_not_run_scores,
selections.fuzzy_distance_threshold,
)
),
)
)
def emit_event_group(
self,
name: str,
parent: Id | None = None,
queued_events: int | None = None,
hide_children: bool = False,
) -> Id:
"""Create a new event group.
The returned Id must be passed to a subsequent emit_end call.
Args:
name (str): Name of the group.
parent (Id | None): Parent Id for the group. Defaults to the global run.
queued_events (int | None): If set, expect this number
of events to call this group their parent. Defaults to
None.
Returns:
Id: New Id for the created group.
"""
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
parent=parent,
starting=True,
payload=EventPayloadUnion(
event_group=EventGroupPayload(
name, queued_events, hide_children=hide_children
)
),
)
)
return id
def emit_build_start(self, targets: list[str]) -> Id:
"""A build process is starting.
The returned Id must be passed to a subsequent emit_end call.
Args:
targets (list[str]): List of targets being built.
Returns:
Id: New Id for the build event.
"""
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
starting=True,
payload=EventPayloadUnion(build_targets=targets),
)
)
return id
def emit_test_group(self, test_count: int) -> Id:
"""A group of tests will be executed.
The returned Id must be passed to a subsequent emit_end call.
Args:
test_count (int): The number of tests that will be executed.
Returns:
Id: New Id for the test group.
"""
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
starting=True,
payload=EventPayloadUnion(
test_group=TestGroupPayload(test_count)
),
)
)
return id
def emit_test_suite_started(
self, name: str, hermetic: bool, parent: Id | None = None
) -> Id:
"""A test suite has started executing.
The returned Id must be passed to a subsequent
emit_test_suite_ended call.
Args:
name (str): The name of the test suite.
hermetic (bool): True only if this suite is executed hermetically.
parent (Id | None ): Parent event. Defaults to global run.
Returns:
Id: _description_
"""
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
starting=True,
parent=parent,
payload=EventPayloadUnion(
test_suite_started=TestSuiteStartedPayload(name, hermetic)
),
)
)
return id
def emit_test_suite_ended(
self, id: Id, status: TestSuiteStatus, message: str | None
) -> None:
"""A test suite has finished executing.
Args:
id (Id): The Id of the
status (str): Status string for the test suite.
message (str | None): Optional message
describing the outcome of this suite.
"""
self._emit(
Event(
id,
self._get_timestamp(),
ending=True,
payload=EventPayloadUnion(
test_suite_ended=TestSuiteEndedPayload(status, message)
),
)
)
def emit_enumerate_test_cases(
self, test_name: str, test_case_names: list[str]
) -> None:
id = self._new_id()
self._emit(
Event(
id,
self._get_timestamp(),
payload=EventPayloadUnion(
enumerate_test_cases=EnumerateTestCasesPayload(
test_name, test_case_names
)
),
)
)