blob: f46b75236781dd2fb161875fb78114634543136b [file] [log] [blame] [edit]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import re
import typing
import args
import environment
import event
import package_repository
import statusinfo
import test_list_file
import util.command as command
class TestExecutionError(Exception):
"""Base error type for test failures."""
class TestCouldNotRun(TestExecutionError):
"""The test could not be run at all."""
class TestFailed(TestExecutionError):
"""The test ran, but returned a failure error code."""
class TestTimeout(TestExecutionError):
"""The test timed out."""
class TestExecution:
"""Represents a single execution for a specific test."""
def __init__(
self,
test: test_list_file.Test,
exec_env: environment.ExecutionEnvironment,
flags: args.Flags,
run_suffix: int | None = None,
):
"""Initialize the test execution wrapper.
Args:
test (test_list_file.Test): Test to run.
exec_env (environment.ExecutionEnvironment): Execution environment.
flags (args.Flags): Command flags.
run_suffix (int, optional): If set, this is the unique index of a single run of the referenced test.
"""
self._test = test
self._exec_env = exec_env
self._flags = flags
self._run_suffix = run_suffix
def name(self) -> str:
"""Get the name of the test.
Returns:
str: Name of the test.
"""
return self._test.info.name + (
f" (Run {self._run_suffix})" if self._run_suffix is not None else ""
)
def is_hermetic(self) -> bool:
"""Determine if a test is hermetic.
Returns:
bool: True if the wrapped test is hermetic, False otherwise.
"""
return self._test.info.is_hermetic()
def command_line(self) -> typing.List[str]:
"""Format the command line required to execute this test.
Raises:
TestCouldNotRun: If we do not know how to run this type of test.
Returns:
typing.List[str]: The command line for the test.
"""
if self._test.info.execution is not None:
execution = self._test.info.execution
component_url = self._get_component_url()
assert component_url is not None
min_severity_logs: typing.List[str] = []
if self._flags.min_severity_logs:
min_severity_logs = self._flags.min_severity_logs
elif execution.min_severity_logs is not None:
min_severity_logs = [execution.min_severity_logs]
extra_args = []
if execution.realm:
extra_args += ["--realm", execution.realm]
if execution.max_severity_logs and self._flags.restrict_logs:
extra_args += [
"--max-severity-logs",
execution.max_severity_logs,
]
if min_severity_logs:
for min_severity_log in min_severity_logs:
extra_args += ["--min-severity-logs", min_severity_log]
if self._test.build.test.parallel is not None:
extra_args += [
"--parallel",
str(self._test.build.test.parallel),
]
for test_filter in self._flags.test_filter:
extra_args += ["--test-filter", test_filter]
if self._flags.also_run_disabled_tests:
extra_args += ["--run-disabled"]
return ["fx", "ffx", "test", "run"] + extra_args + [component_url]
elif self._test.build.test.path:
return [
os.path.join(self._exec_env.out_dir, self._test.build.test.path)
]
else:
raise TestCouldNotRun(
f"We do not know how to run this test: {str(self._test)}"
)
def enumerate_cases_command_line(self) -> typing.List[str] | None:
"""Get the command line to enumerate all test cases in this test.
If this type of test does not support test case enumeration,
return None.
Returns:
typing.List[str] | None: Command line to enumerate cases
if possible, None otherwise.
"""
execution = self._test.info.execution
try:
component_url = self._get_component_url()
except TestCouldNotRun:
return None
if component_url is None or execution is None:
return None
extra_args = []
if execution.realm:
extra_args += ["--realm", execution.realm]
return (
["fx", "ffx", "test", "list-cases"] + extra_args + [component_url]
)
def environment(self) -> typing.Dict[str, str] | None:
"""Format environment variables needed to run the test.
Returns:
typing.Optional[typing.Dict[str, str]]: Environment for
the test, or None if no environment is needed.
"""
if self._test.build.test.path:
return {
"CWD": self._exec_env.out_dir,
}
else:
return None
def should_symbolize(self) -> bool:
"""Determine if we should symbolize the output of this test.
Returns:
bool: True if we should run the output through a symbolizer, False otherwise.
"""
return self._test.info.execution is not None
async def run(
self,
recorder: event.EventRecorder,
flags: args.Flags,
parent: event.Id,
timeout: float | None = None,
) -> command.CommandOutput:
"""Asynchronously execute this test.
Args:
recorder (event.EventRecorder): Recorder for events.
flags (args.Flags): Command flags to control output.
parent (event.Id): Parent event to nest the execution under.
timeout (float, optional): If set, timeout after this number of seconds.
Raises:
TestFailed: If the test reported failure.
TestTimeout: If the test timed out.
Returns:
command.CommandOutput: The output of executing this command.
"""
symbolize = self.should_symbolize()
command = self.command_line()
env = self.environment()
output = await run_command(
*command,
recorder=recorder,
parent=parent,
print_verbatim=flags.output,
symbolize=symbolize,
env=env,
timeout=timeout,
)
if not output:
raise TestFailed("Failed to run the test command")
elif output.return_code != 0 or output.was_timeout:
if not flags.output:
# Test failed, print output now.
recorder.emit_info_message(
f"\n{statusinfo.error_highlight(self._test.info.name, style=flags.style)}:\n"
)
if output.stdout:
recorder.emit_verbatim_message(output.stdout)
if output.stderr:
recorder.emit_verbatim_message(output.stderr)
if not output.stderr and not output.stdout:
recorder.emit_verbatim_message("<No command output>")
if output.was_timeout:
raise TestTimeout(f"Test exceeded runtime of {timeout} seconds")
else:
raise TestFailed("Test reported failure")
return output
def _get_component_url(self) -> str | None:
"""Get the final component URL to execute for this test.
If this test is not a component test, return None.
Raises:
TestCouldNotRun: If we cannot determine the merkle hash for this test.
Returns:
str | None: The final URL to execute (optionally with
hash), or None if this is not a component test.
"""
if self._test.info.execution is None:
return None
execution = self._test.info.execution
component_url = execution.component_url
if self._flags.use_package_hash:
try:
package_repo = package_repository.PackageRepository.from_env(
self._exec_env
)
name = extract_package_name_from_url(component_url)
if name is None:
raise TestCouldNotRun(
"Failed to parse package name for Merkle root matching.\nTry running with --no-use-package-hash."
)
if name not in package_repo.name_to_merkle:
raise TestCouldNotRun(
f"Could not find a Merkle hash for this test: {component_url}\nTry running with --no-use-package-hash or rebuild your package repository."
)
suffix = f"?hash={package_repo.name_to_merkle[name]}"
component_url = component_url.replace("#", f"{suffix}#", 1)
except package_repository.PackageRepositoryError as e:
raise TestCouldNotRun(
f"Could not load a Merkle hash for this test ({str(e)})\nTry running with --no-use-package-hash or rebuild your package repository."
)
return component_url
_PACKAGE_NAME_REGEX = re.compile(r"fuchsia-pkg://fuchsia\.com/([^/#]+)#")
def extract_package_name_from_url(url: str) -> str | None:
"""Given a fuchsia-pkg URL, extract and return the package name.
Example:
fuchsia-pkg://fuchsia.com/my-package#meta/my-component.cm -> my-package
Args:
url (str): A fuchsia-pkg:// URL.
Returns:
str | None: The package name from the URL, or None if parsing failed.
"""
match = _PACKAGE_NAME_REGEX.match(url)
if match is None:
return None
return match.group(1)
async def run_command(
name: str,
*args: str,
recorder: typing.Optional[event.EventRecorder] = None,
parent: typing.Optional[event.Id] = None,
print_verbatim: bool = False,
symbolize: bool = False,
env: typing.Dict[str, str] | None = None,
timeout: float | None = None,
) -> typing.Optional[command.CommandOutput]:
"""Utility method to run a test command asynchronously.
Args:
name (str): Command to run.
args (typing.List[str]): Arguments to the command.
recorder (event.EventRecorder | None):
Recorder for events. Defaults to None.
parent (event.Id | None): Parent event ID for reporting.
Defaults to None.
print_verbatim (bool, optional): If set, record verbatim
output events for stdout and stderr. Defaults to False.
symbolize (bool, optional): If true, pipe output through
symbolizer. Defaults to False.
env (typing.Dict[str, str], optional):
Environment to pass to the command. Defaults to None.
timeout (float, optional): The number of seconds to wait before timing out.
Returns:
command.CommandOutput | None: The command output if it could
be executed, None otherwise.
"""
id: event.Id
if recorder is not None:
id = recorder.emit_program_start(name, list(args), env, parent=parent)
try:
symbolizer_args = (
None if not symbolize else ["fx", "ffx", "debug", "symbolize"]
)
started = await command.AsyncCommand.create(
name,
*args,
symbolizer_args=symbolizer_args,
env=env,
timeout=timeout,
)
def handle_event(current_event: command.CommandEvent):
if recorder is not None:
if isinstance(current_event, command.StdoutEvent):
recorder.emit_program_output(
id,
current_event.text.decode(errors="replace"),
stream=event.ProgramOutputStream.STDOUT,
print_verbatim=print_verbatim,
)
if isinstance(current_event, command.StderrEvent):
recorder.emit_program_output(
id,
current_event.text.decode(errors="replace"),
stream=event.ProgramOutputStream.STDERR,
print_verbatim=print_verbatim,
)
if isinstance(current_event, command.TerminationEvent):
recorder.emit_program_termination(
id, current_event.return_code
)
return await started.run_to_completion(callback=handle_event)
except command.AsyncCommandError as e:
if recorder is not None:
recorder.emit_program_termination(id, -1, error=str(e))
return None