blob: 9eeb990e437621b22e90ff09ac6adddd91439404 [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import re
import tempfile
import args
import async_utils.command as command
import environment
import event
import package_repository
import statusinfo
import test_list_file
class TestExecutionError(Exception):
"""Base error type for test failures."""
class TestCouldNotRun(TestExecutionError):
"""The test could not be run at all."""
class TestSkipped(TestExecutionError):
"""The test was skipped for some non-error reason"""
class TestFailed(TestExecutionError):
"""The test ran, but returned a failure error code."""
class TestTimeout(TestExecutionError):
"""The test timed out."""
# Unique number suffix for output subdirectories.
UNIQUE_OUTPUT_SUFFIX = 0
class TestExecution:
"""Represents a single execution for a specific test."""
def __init__(
self,
test: test_list_file.Test,
exec_env: environment.ExecutionEnvironment,
flags: args.Flags,
run_suffix: int | None = None,
device_env: environment.DeviceEnvironment | None = None,
):
"""Initialize the test execution wrapper.
Args:
test (test_list_file.Test): Test to run.
exec_env (environment.ExecutionEnvironment): Execution environment.
flags (args.Flags): Command flags.
run_suffix (int, optional): If set, this is the unique
index of a single run of the referenced test.
device_env (environment.DeviceEnvironment, optional):
If set, this contains information on how to connect to
a device. Otherwise if this is a host test it may not
connect to a device.
"""
global UNIQUE_OUTPUT_SUFFIX
self._test = test
self._exec_env = exec_env
self._flags = flags
self._run_suffix = run_suffix
self._device_env = device_env
if self._flags.artifact_output_directory is not None:
self._outdir: str | None = os.path.join(
self._flags.artifact_output_directory, str(UNIQUE_OUTPUT_SUFFIX)
)
UNIQUE_OUTPUT_SUFFIX += 1
else:
self._outdir = None
def name(self) -> str:
"""Get the name of the test.
Returns:
str: Name of the test.
"""
return self._test.name() + (
f" (Run {self._run_suffix})" if self._run_suffix is not None else ""
)
def is_hermetic(self) -> bool:
"""Determine if a test is hermetic.
Returns:
bool: True if the wrapped test is hermetic, False otherwise.
"""
return self._test.info.is_hermetic()
def command_line(self) -> list[str]:
"""Format the command line required to execute this test.
Raises:
TestCouldNotRun: If we do not know how to run this type of test.
Returns:
list[str]: The command line for the test.
"""
if self._use_test_interface():
# assert for mypy
assert self._test.build.test.new_path is not None
return [
os.path.join(
self._exec_env.out_dir, self._test.build.test.new_path
)
]
elif self._test.info.execution is not None:
execution = self._test.info.execution
component_url = self._get_component_url()
assert component_url is not None
min_severity_logs: list[str] = []
if self._flags.min_severity_logs:
min_severity_logs = self._flags.min_severity_logs
elif execution.min_severity_logs is not None:
min_severity_logs = [execution.min_severity_logs]
extra_args = []
if execution.realm:
extra_args += ["--realm", execution.realm]
if execution.max_severity_logs and self._flags.restrict_logs:
extra_args += [
"--max-severity-logs",
execution.max_severity_logs,
]
if min_severity_logs:
for min_severity_log in min_severity_logs:
extra_args += ["--min-severity-logs", min_severity_log]
parallel_cases = self._flags.parallel_cases
if (
parallel_cases == 0
and self._test.build.test.parallel is not None
):
parallel_cases = self._test.build.test.parallel
if parallel_cases != 0:
extra_args += [
"--parallel",
str(parallel_cases),
]
for test_filter in self._flags.test_filter:
extra_args += ["--test-filter", test_filter]
if self._flags.also_run_disabled_tests:
extra_args += ["--run-disabled"]
if self._flags.show_full_moniker_in_logs:
extra_args += ["--show-full-moniker-in-logs"]
if self._flags.break_on_failure:
extra_args += ["--break-on-failure"]
if self._outdir is not None:
extra_args += ["--output-directory", self._outdir]
suffix_args = (
["--"] + self._flags.extra_args
if self._flags.extra_args
else []
)
return (
["fx", "ffx", "test", "run"]
+ extra_args
+ [component_url]
+ suffix_args
)
elif self._test.build.test.path:
return [
os.path.join(self._exec_env.out_dir, self._test.build.test.path)
] + self._flags.extra_args
else:
raise TestCouldNotRun(
f"We do not know how to run this test: {str(self._test)}"
)
def enumerate_cases_command_line(self) -> list[str] | None:
"""Get the command line to enumerate all test cases in this test.
If this type of test does not support test case enumeration,
return None.
Returns:
list[str] | None: Command line to enumerate cases
if possible, None otherwise.
"""
execution = self._test.info.execution
try:
component_url = self._get_component_url()
except TestCouldNotRun:
return None
if component_url is None or execution is None:
return None
extra_args = []
if execution.realm:
extra_args += ["--realm", execution.realm]
return (
["fx", "ffx", "test", "list-cases"] + extra_args + [component_url]
)
def environment(self) -> dict[str, str] | None:
"""Format environment variables needed to run the test.
Returns:
dict[str, str] | None: Environment for
the test, or None if no environment is needed.
"""
env = self._flags.computed_env()
if (
self._test.build.test.path
or self._test.is_e2e_test()
or self._use_test_interface()
):
env.update(
{
"CWD": self._exec_env.out_dir,
}
)
if self._use_test_interface():
# TODO(https://fxbug.dev/327640651): Add support for other
# parameters like test filter, etc.
env.update(
{
# TODO(https://fxbug.dev/327640651): For now add ask path as
# host-tools till we figure out a better way.
"FUCHSIA_SDK_TOOL_PATH": os.path.join(
self._exec_env.out_dir, "host-tools"
),
}
)
if self._device_env is not None:
env.update(
{
"FUCHSIA_TARGETS": self._device_env.address,
}
)
if self._flags.extra_args:
custom_args = " ".join(self._flags.extra_args)
env.update(
{
"FUCHSIA_CUSTOM_TEST_ARGS": custom_args,
}
)
if self._test.is_e2e_test() and self._device_env is not None:
env.update(
{
"FUCHSIA_DEVICE_ADDR": self._device_env.address,
"FUCHSIA_SSH_PORT": self._device_env.port,
"FUCHSIA_SSH_KEY": self._device_env.private_key_path,
"FUCHSIA_NODENAME": self._device_env.name,
}
)
return None if not env else env
def should_symbolize(self) -> bool:
"""Determine if we should symbolize the output of this test.
Returns:
bool: True if we should run the output through a symbolizer, False otherwise.
"""
return self._test.info.execution is not None
async def run(
self,
recorder: event.EventRecorder,
flags: args.Flags,
parent: event.Id,
timeout: float | None = None,
) -> command.CommandOutput:
"""Asynchronously execute this test.
Args:
recorder (event.EventRecorder): Recorder for events.
flags (args.Flags): Command flags to control output.
parent (event.Id): Parent event to nest the execution under.
timeout (float, optional): If set, timeout after this number of seconds.
Raises:
TestFailed: If the test reported failure.
TestTimeout: If the test timed out.
TestSkipped: If the test should not run.
Returns:
command.CommandOutput: The output of executing this command.
"""
if self._test.is_boot_test():
raise TestSkipped(
"Boot tests are not supported by `fx test`. Use `fx run-boot-test`."
)
if self._test.is_e2e_test() and not flags.e2e:
raise TestSkipped(
"Skipping optional end to end test. Pass --e2e to execute this test."
)
symbolize = self.should_symbolize()
command = self.command_line()
env = self.environment() or {}
outdir = self._outdir
maybe_temp_dir: tempfile.TemporaryDirectory[str] | None = None
if not outdir:
maybe_temp_dir = tempfile.TemporaryDirectory()
outdir = maybe_temp_dir.name
env.update(
{
"FUCHSIA_TEST_OUTDIR": outdir,
}
)
output = await run_command(
*command,
recorder=recorder,
parent=parent,
print_verbatim=flags.output,
symbolize=symbolize,
env=env,
timeout=timeout,
)
if maybe_temp_dir is not None:
files: list[str] = []
for prefix, _, names in os.walk(maybe_temp_dir.name):
files.extend(
[
os.path.relpath(
os.path.join(prefix, n), maybe_temp_dir.name
)
for n in names
]
)
if files:
name_list = statusinfo.ellipsize(", ".join(files), 100)
recorder.emit_instruction_message(
f"Deleting {len(files)} files at {maybe_temp_dir.name}: {name_list}"
)
recorder.emit_instruction_message(
"To keep these files, set --ffx-output-directory."
)
maybe_temp_dir.cleanup()
if not output:
raise TestFailed("Failed to run the test command")
elif output.return_code != 0 or output.was_timeout:
if not flags.output:
# Test failed, print output now.
recorder.emit_info_message(
f"\n{statusinfo.error_highlight(self._test.name(), style=flags.style)}:\n"
)
if output.stdout:
recorder.emit_verbatim_message(output.stdout)
if output.stderr:
recorder.emit_verbatim_message(output.stderr)
if not output.stderr and not output.stdout:
recorder.emit_verbatim_message("<No command output>")
if output.was_timeout:
raise TestTimeout(f"Test exceeded runtime of {timeout} seconds")
else:
raise TestFailed("Test reported failure")
return output
def _get_component_url(self) -> str | None:
"""Get the final component URL to execute for this test.
If this test is not a component test, return None.
Raises:
TestCouldNotRun: If we cannot determine the merkle hash for this test.
Returns:
str | None: The final URL to execute (optionally with
hash), or None if this is not a component test.
"""
if self._test.info.execution is None:
return None
execution = self._test.info.execution
component_url = execution.component_url
if self._flags.use_package_hash:
try:
package_repo = package_repository.PackageRepository.from_env(
self._exec_env
)
name = extract_package_name_from_url(component_url)
if name is None:
raise TestCouldNotRun(
"Failed to parse package name for Merkle root matching.\nTry running with --no-use-package-hash or run fx build."
)
if name not in package_repo.name_to_merkle:
raise TestCouldNotRun(
f"Could not find a Merkle hash for this test: {component_url}\nTry running with --no-use-package-hash or run fx build."
)
suffix = f"?hash={package_repo.name_to_merkle[name]}"
component_url = component_url.replace("#", f"{suffix}#", 1)
except package_repository.PackageRepositoryError as e:
raise TestCouldNotRun(
f"Could not load a Merkle hash for this test ({str(e)})\nTry running with --no-use-package-hash or run fx build."
)
return component_url
def _use_test_interface(self) -> bool:
"""Should this test use the experimental test interface API.
Returns:
True if experimental flag is enabled and the test supports new
interface.
"""
return (
self._flags.use_test_interface
and self._test.build.test.new_path is not None
)
_PACKAGE_NAME_REGEX = re.compile(r"fuchsia-pkg://fuchsia\.com/([^/#]+)#")
def extract_package_name_from_url(url: str) -> str | None:
"""Given a fuchsia-pkg URL, extract and return the package name.
Example:
fuchsia-pkg://fuchsia.com/my-package#meta/my-component.cm -> my-package
Args:
url (str): A fuchsia-pkg:// URL.
Returns:
str | None: The package name from the URL, or None if parsing failed.
"""
match = _PACKAGE_NAME_REGEX.match(url)
if match is None:
return None
return match.group(1)
class DeviceConfigError(Exception):
"""There was an error reading the device configuration"""
async def get_device_environment_from_exec_env(
exec_env: environment.ExecutionEnvironment,
recorder: event.EventRecorder | None = None,
) -> environment.DeviceEnvironment:
ssh_output = await run_command(
"fx", "ffx", "target", "get-ssh-address", recorder=recorder
)
if not ssh_output or ssh_output.return_code != 0:
raise DeviceConfigError("Failed to get the ssh address of the target")
last_colon_index = ssh_output.stdout.rfind(":")
if last_colon_index == -1:
raise DeviceConfigError(f"Could not parse: {ssh_output.stdout}")
ip = ssh_output.stdout[0:last_colon_index].strip()
port = ssh_output.stdout[last_colon_index + 1 :].strip()
target_output = await run_command(
"fx", "ffx", "target", "default", "get", recorder=recorder
)
if not target_output or target_output.return_code != 0:
raise DeviceConfigError("Failed to get the target name")
target_name = target_output.stdout.strip()
# get the configured private key. Ideally, the private key usage
# should be an implementation detail internal to ffx commands.
ssh_key_output = await run_command(
"fx",
"ffx",
"config",
"get",
"--process",
"file",
"ssh.priv",
recorder=recorder,
)
if not ssh_key_output or ssh_key_output.return_code != 0:
msg = "No return information"
if ssh_key_output:
msg = ssh_key_output.stderr
raise DeviceConfigError(f"Failed to get private ssh key: {msg}")
ssh_path = ssh_key_output.stdout.strip()
# remove any double quotes around the path
ssh_path = ssh_path.replace('"', "")
return environment.DeviceEnvironment(
address=ip, port=port, name=target_name, private_key_path=ssh_path
)
async def run_command(
name: str,
*args: str,
recorder: event.EventRecorder | None = None,
parent: event.Id | None = None,
print_verbatim: bool = False,
symbolize: bool = False,
env: dict[str, str] | None = None,
timeout: float | None = None,
) -> command.CommandOutput | None:
"""Utility method to run a test command asynchronously.
Args:
name (str): Command to run.
args (list[str]): Arguments to the command.
recorder (event.EventRecorder | None):
Recorder for events. Defaults to None.
parent (event.Id | None): Parent event ID for reporting.
Defaults to None.
print_verbatim (bool, optional): If set, record verbatim
output events for stdout and stderr. Defaults to False.
symbolize (bool, optional): If true, pipe output through
symbolizer. Defaults to False.
env (dict[str, str], optional):
Environment to pass to the command. Defaults to None.
timeout (float, optional): The number of seconds to wait before timing out.
Returns:
command.CommandOutput | None: The command output if it could
be executed, None otherwise.
"""
id: event.Id
if recorder is not None:
id = recorder.emit_program_start(name, list(args), env, parent=parent)
try:
symbolizer_args = (
None if not symbolize else ["fx", "ffx", "debug", "symbolize"]
)
started = await command.AsyncCommand.create(
name,
*args,
symbolizer_args=symbolizer_args,
env=env,
timeout=timeout,
)
def handle_event(current_event: command.CommandEvent) -> None:
if recorder is not None:
if isinstance(current_event, command.StdoutEvent):
recorder.emit_program_output(
id,
current_event.text.decode(errors="replace"),
stream=event.ProgramOutputStream.STDOUT,
print_verbatim=print_verbatim,
)
if isinstance(current_event, command.StderrEvent):
recorder.emit_program_output(
id,
current_event.text.decode(errors="replace"),
stream=event.ProgramOutputStream.STDERR,
print_verbatim=print_verbatim,
)
if isinstance(current_event, command.TerminationEvent):
recorder.emit_program_termination(
id, current_event.return_code
)
return await started.run_to_completion(callback=handle_event)
except command.AsyncCommandError as e:
if recorder is not None:
recorder.emit_program_termination(id, -1, error=str(e))
return None