blob: 485c5321800e530d70037d35085275c6d2146d06 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import io
import json
import logging
import os
import subprocess
import tempfile
import time
from pathlib import Path, PurePath
from shutil import rmtree
from mobly import logger
from tenacity import retry
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed
from antlion import context, signals, utils
# Default number of seconds FFX.run() and FFX.verify_reachable() allow before
# giving up.
FFX_DEFAULT_COMMAND_TIMEOUT: int = 60
# Seconds allowed for each "ffx config ..." command; also used as the
# per-attempt limit for "ffx target wait" in FFX.verify_reachable().
FFX_CONFIG_TIMEOUT_SEC: float = 20
# Seconds allowed for "ffx target add" when an explicit IP address is used.
FFX_TARGET_ADD_TIMEOUT_SEC: float = 20
# Seconds to wait for the daemon process to exit after "ffx daemon stop".
FFX_DAEMON_STOP_TIMEOUT_SEC: float = 4
class FFXError(signals.TestError):
    """Non-zero error code returned from a ffx command.

    Attributes:
        command: The ffx subcommand that failed, as a single string.
        stdout: Captured standard output, decoded as UTF-8 (invalid bytes are
            replaced); empty when no output was captured.
        stderr: Captured standard error, decoded the same way as stdout.
        exit_status: Exit code of the ffx process.
    """

    def __init__(self, command: str, process: subprocess.CalledProcessError) -> None:
        """
        Args:
            command: The ffx subcommand that was run.
            process: The error raised by subprocess for the failed command.
        """
        self.command = command
        self.stdout: str = self._as_text(process.stdout)
        self.stderr: str = self._as_text(process.stderr)
        self.exit_status = process.returncode
        super().__init__(str(self))

    @staticmethod
    def _as_text(stream: bytes | str | None) -> str:
        """Normalize a captured stream to str.

        CalledProcessError carries None when the stream was not captured and
        str when the process ran in text mode, so neither can be blindly
        decoded.
        """
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return stream

    def __str__(self) -> str:
        return (
            f'ffx subcommand "{self.command}" returned {self.exit_status}, '
            f'stdout: "{self.stdout}", stderr: "{self.stderr}"'
        )
class FFXTimeout(signals.TestError):
    """Raised when an ffx command does not finish within its time limit."""
class OutputFormat(enum.StrEnum):
TEXT = "text"
JSON = "json"
JSON_PRETTY = "json-pretty"
class FFX:
    """Device-specific controller for the ffx tool.

    Every instance drives ffx through its own isolated environment: a
    dedicated isolate directory, a private socket directory and a daemon
    process whose lifecycle is managed by this class. The environment is
    created lazily by the first call to run() and torn down by clean_up().

    The constructor arguments are stored as private attributes.

    Attributes:
        log: Logger for the device-specific instance of ffx.
    """

    def __init__(
        self,
        binary_path: str,
        mdns_name: str,
        ip: str | None = None,
        ssh_private_key_path: str | None = None,
        subtools_search_path: str | None = None,
    ) -> None:
        """
        Args:
            binary_path: Path to ffx binary.
            mdns_name: Fuchsia mDNS nodename of default target.
            ip: IP address of the default target. When given, the target is
                added explicitly once the daemon is running; when omitted,
                mDNS discovery is enabled instead.
            ssh_private_key_path: Path to SSH private key for talking to the
                Fuchsia DUT.
            subtools_search_path: Value for the "ffx.subtool-search-paths"
                config entry; left unset when omitted.
        """
        self.log = logger.PrefixLoggerAdapter(
            logging.getLogger(),
            {
                logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: f"[ffx | {mdns_name}]",
            },
        )
        self._binary_path = binary_path
        self._mdns_name = mdns_name
        self._ip = ip
        self._ssh_private_key_path = ssh_private_key_path
        self._subtools_search_path = subtools_search_path

        # State of the isolated environment; populated by _start_daemon() and
        # reset by _tear_down_environment().
        self._daemon: subprocess.Popen | None = None
        self._daemon_log: io.TextIOWrapper | None = None
        self._isolate_dir: str | None = None
        self._sock_dir: str | None = None
        self._ssh_auth_sock_path: str | None = None
        self._overnet_socket_path: str | None = None

        self._has_been_reachable = False
        self._has_logged_version = False

    def clean_up(self) -> None:
        """Stop the daemon and forget everything learned about the target."""
        self._stop_daemon()
        self._has_been_reachable = False
        self._has_logged_version = False

    def run(
        self,
        command: list[str],
        timeout_sec: float = FFX_DEFAULT_COMMAND_TIMEOUT,
        skip_status_code_check: bool = False,
        skip_reachability_check: bool = False,
        output_format: OutputFormat = OutputFormat.TEXT,
    ) -> subprocess.CompletedProcess[bytes]:
        """Runs an ffx command.

        Starts the isolated daemon first if it is not running, and verifies
        reachability before running, if it hasn't already.

        Args:
            command: Command to run with ffx.
            timeout_sec: Seconds to wait for a command to complete.
            skip_status_code_check: When True, a non-zero exit status does not
                raise FFXError.
            skip_reachability_check: When True, do not verify that the target
                is reachable before running the command.
            output_format: Desired output format; useful for parsing output.

        Raises:
            FFXTimeout: when the command times out.
            FFXError: when the command returns non-zero and
                skip_status_code_check is False.

        Returns:
            The results of the command. Note subprocess.CompletedProcess returns
            stdout and stderr as a byte-array, not a string. Treat these members
            as such or convert to a string using bytes.decode('utf-8').
        """
        if not self._daemon:
            self._start_daemon()
        if not self._has_been_reachable and not skip_reachability_check:
            self.log.info(f'Verifying reachability before running "{command}"')
            self.verify_reachable()
        return self._exec(
            command,
            timeout_sec,
            check=not skip_status_code_check,
            output_format=output_format,
        )

    def _exec(
        self,
        command: list[str],
        timeout_sec: float,
        check: bool = True,
        output_format: OutputFormat = OutputFormat.TEXT,
    ) -> subprocess.CompletedProcess[bytes]:
        """Execute a ffx command inside the isolated environment.

        Unlike run(), this neither starts the daemon nor checks reachability.

        Args:
            command: Command to run with ffx.
            timeout_sec: Seconds to wait for a command to complete.
            check: Whether to raise on a non-zero exit status.
            output_format: Desired output format; anything other than TEXT is
                passed to ffx via "--machine".

        Raises:
            TypeError: when the isolate directory has not been set up yet.
            ValueError: when output_format is not a valid OutputFormat.
            FFXTimeout: when the command times out.
            FFXError: when the command returns non-zero and check is True.

        Returns:
            The results of the command. Note subprocess.CompletedProcess returns
            stdout and stderr as a byte-array, not a string. Treat these members
            as such or convert to a string using bytes.decode('utf-8').
        """
        if not self._isolate_dir:
            raise TypeError(
                f"Expected _isolate_dir to be a str, got {type(self._isolate_dir)}"
            )

        command_str = " ".join(command)
        self.log.debug(f'Running "{command_str}".')

        full_command = [self._binary_path, "--isolate-dir", self._isolate_dir]
        if output_format != OutputFormat.TEXT:
            # OutputFormat(...) rejects unknown formats instead of silently
            # running ffx without the requested subcommand.
            full_command += ["--machine", OutputFormat(output_format).value]
        full_command += command

        try:
            result = subprocess.run(
                full_command,
                capture_output=True,
                timeout=timeout_sec,
                check=check,
            )
        except subprocess.CalledProcessError as e:
            raise FFXError(command_str, e) from e
        except subprocess.TimeoutExpired as e:
            raise FFXTimeout(f'Timed out running "{command_str}"') from e

        # Decode leniently: logging must never turn a successful command into
        # a UnicodeDecodeError.
        self.log.debug(
            f'Result of "{command_str}":\n'
            f'stdout: {result.stdout.decode("utf-8", errors="replace")}\n'
            f'stderr: {result.stderr.decode("utf-8", errors="replace")}'
        )
        return result

    def _start_daemon(self) -> None:
        """Create a new isolated environment for ffx.

        This is needed to avoid overlapping ffx daemons while testing in
        parallel, causing the ffx invocations to "upgrade" one daemon to
        another, which appears as a flap/restart to another test.

        If any step fails, everything created so far (daemon process, log
        handle, socket directory) is torn down again before the error
        propagates, so a later run() starts from a clean slate.
        """
        started = False
        try:
            # Store ffx files in a unique directory. Timestamp is used to
            # prevent files from being overwritten in the case when a test
            # intentionally reboots or resets the device such that a new
            # isolated ffx environment is created.
            root_dir = context.get_current_context().get_full_output_path()
            epoch = utils.get_current_epoch_time()
            time_stamp = logger.normalize_log_line_timestamp(
                logger.epoch_to_log_line_timestamp(epoch)
            )
            self._isolate_dir = os.path.join(
                root_dir, f"{self._mdns_name}_{time_stamp}"
            )
            os.makedirs(self._isolate_dir, exist_ok=True)

            # Sockets need to be created in a different directory to be
            # guaranteed to stay under the maximum socket path length of 104
            # characters. See https://unix.stackexchange.com/q/367008
            self._sock_dir = tempfile.mkdtemp()

            # On MacOS, the socket paths need to be just paths (not pre-created
            # Python tempfiles, which are not socket files).
            self._ssh_auth_sock_path = str(PurePath(self._sock_dir, "ssh_auth_sock"))
            self._overnet_socket_path = str(
                PurePath(self._sock_dir, "overnet_socket")
            )

            cmds = [
                [
                    "config",
                    "set",
                    "log.dir",
                    os.path.join(self._isolate_dir, "ffx_logs"),
                ],
                ["config", "set", "log.level", "debug"],
                ["config", "set", "target.default", self._mdns_name],
                # Use user-specific and device-specific locations for sockets.
                # Avoids user permission errors in a multi-user test
                # environment. Avoids daemon upgrades when running tests in
                # parallel in a CI environment.
                ["config", "set", "ssh.auth-sock", self._ssh_auth_sock_path],
                ["config", "set", "overnet.socket", self._overnet_socket_path],
                # Alias to disable metrics, device discovery, device auto
                # connection, etc.
                ["config", "set", "ffx.isolated", "true"],
                # Control the daemon's lifecycle directly
                ["config", "set", "daemon.autostart", "false"],
            ]

            if not self._ip:
                cmds.append(["config", "set", "discovery.mdns.enabled", "true"])

            # ffx looks for the private key in several default locations. For
            # testbeds which have the private key in another location, set it
            # now.
            if self._ssh_private_key_path:
                cmds.append(
                    ["config", "set", "ssh.priv", f'["{self._ssh_private_key_path}"]']
                )

            if self._subtools_search_path:
                cmds.append(
                    [
                        "config",
                        "set",
                        "ffx.subtool-search-paths",
                        self._subtools_search_path,
                    ]
                )

            for cmd in cmds:
                self._exec(cmd, FFX_CONFIG_TIMEOUT_SEC)

            # Kept open for the daemon's lifetime; closed during teardown.
            self._daemon_log = open(
                os.path.join(self._isolate_dir, "daemon.log"), "a+", encoding="utf-8"
            )

            # Start the daemon
            self._daemon = subprocess.Popen(
                [
                    self._binary_path,
                    "--isolate-dir",
                    self._isolate_dir,
                    "daemon",
                    "start",
                ],
                stdout=self._daemon_log,
            )

            # Wait for overnet_socket to be created
            @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1))
            def wait_for_socket(path: str) -> None:
                if not Path(path).is_socket():
                    raise FileNotFoundError(f"Socket not found: {path}")

            wait_for_socket(self._overnet_socket_path)

            if self._ip:
                self._exec(
                    ["target", "add", self._ip, "--nowait"],
                    FFX_TARGET_ADD_TIMEOUT_SEC,
                )

            result = self._exec(["config", "get"], FFX_CONFIG_TIMEOUT_SEC)
            self.log.debug(
                f'Config:\n{result.stdout.decode("utf-8", errors="replace")}'
            )
            started = True
        finally:
            if not started:
                # Do not leave a half-built environment behind: a dangling
                # daemon or a set _daemon attribute would make later run()
                # calls believe the environment is healthy.
                self._tear_down_environment()

    def _stop_daemon(self) -> None:
        """Ask the daemon to stop and release the isolated environment.

        Errors from "ffx daemon stop" or from waiting for the process still
        propagate, but the environment is always torn down first so that no
        process, file handle or temporary directory is leaked.
        """
        try:
            if self._daemon:
                self.run(
                    # TODO(b/332983529): Add the following arguments once ffx
                    # daemon stops correctly.
                    # ["-t", str(FFX_DAEMON_STOP_TIMEOUT_SEC * 1000)]
                    ["daemon", "stop"],
                    skip_reachability_check=True,
                )
                self._daemon.wait(timeout=FFX_DAEMON_STOP_TIMEOUT_SEC)
        finally:
            self._tear_down_environment()

    def _tear_down_environment(self) -> None:
        """Release everything _start_daemon() created and reset the state.

        The isolate directory itself is kept on disk (it holds the logs); only
        the reference to it is dropped.
        """
        # Detach the state first so the instance is consistent even if one of
        # the cleanup steps below raises.
        daemon, self._daemon = self._daemon, None
        daemon_log, self._daemon_log = self._daemon_log, None
        socket_paths = (self._ssh_auth_sock_path, self._overnet_socket_path)
        self._ssh_auth_sock_path = None
        self._overnet_socket_path = None
        sock_dir, self._sock_dir = self._sock_dir, None
        self._isolate_dir = None

        try:
            # poll() is None only while the process is still alive, i.e. when
            # the graceful stop failed or was never attempted.
            if daemon is not None and daemon.poll() is None:
                self.log.warning("ffx daemon is still running; killing it")
                daemon.kill()
                daemon.wait()
        finally:
            try:
                if daemon_log is not None:
                    daemon_log.close()
            finally:
                for socket_path in socket_paths:
                    if socket_path:
                        Path(socket_path).unlink(missing_ok=True)
                if sock_dir:
                    rmtree(sock_dir)

    def verify_reachable(self, timeout_sec: int = FFX_DEFAULT_COMMAND_TIMEOUT) -> None:
        """Verify the target is reachable via RCS and various services.

        Blocks until the device allows for an RCS connection, retrying
        "ffx target wait" until it succeeds or timeout_sec has elapsed.

        Verifies the RCS connection by fetching information from the device,
        which exercises several debug and informational FIDL services.

        When called for the first time, the versions will be checked for
        compatibility.

        Args:
            timeout_sec: Seconds to wait for reachability check

        Raises:
            FFXError: when an unknown error occurs
            FFXTimeout: when the target is unreachable
        """
        last_err: Exception | None = None
        deadline = time.perf_counter() + timeout_sec
        while True:
            try:
                self.run(
                    ["target", "wait"],
                    timeout_sec=FFX_CONFIG_TIMEOUT_SEC,
                    skip_reachability_check=True,
                )
                break
            except FFXError as e:
                # Only the ascendd connection failure is treated as transient.
                if "took too long connecting to ascendd socket" not in e.stderr:
                    raise
                last_err = e
            except FFXTimeout as e:
                last_err = e

            if time.perf_counter() > deadline:
                raise FFXTimeout(
                    f"Waited over {timeout_sec}s for ffx to become reachable"
                ) from last_err

        # Use a shorter timeout than default because device information
        # gathering can hang for a long time if the device is not actually
        # connectable.
        try:
            result = self.run(
                ["target", "show"],
                timeout_sec=15,
                skip_reachability_check=True,
                output_format=OutputFormat.JSON_PRETTY,
            )
        except Exception:
            self.log.error(
                f'Failed to reach target device. Try running "{self._binary_path}'
                + ' doctor" to diagnose issues.'
            )
            raise

        self._has_been_reachable = True

        if not self._has_logged_version:
            # Set the flag first: compare_version() calls run(), which must
            # not trigger another version comparison.
            self._has_logged_version = True
            self.compare_version(result)

    def compare_version(self, target_show_result: subprocess.CompletedProcess) -> None:
        """Compares the version of Fuchsia with the version of ffx.

        Logs both versions, and a warning when they differ.

        Args:
            target_show_result: Result of the target show command with JSON
                output mode enabled

        Raises:
            signals.TestAbortClass: when the output of "ffx target show" cannot
                be parsed or lacks the build version entry.
        """
        result_raw = target_show_result.stdout
        try:
            result_json = json.loads(result_raw)
            build_info = next(s for s in result_json if s.get("label") == "build")
            version_info = next(
                s for s in build_info["child"] if s.get("label") == "version"
            )
            device_version = version_info.get("value")
        except (
            AttributeError,
            KeyError,
            StopIteration,
            TypeError,
            json.JSONDecodeError,
        ) as e:
            # Any structural surprise (missing entry, wrong container type,
            # invalid JSON) means the response is unusable.
            raise signals.TestAbortClass(
                f'Failed to parse response of "ffx target show":\n{result_raw}'
            ) from e

        # Command output ends with a newline; without stripping it the
        # comparison below could never succeed.
        ffx_version = self.run(["version"]).stdout.decode("utf-8").strip()

        self.log.info(f"Device version: {device_version}, ffx version: {ffx_version}")
        if device_version != ffx_version:
            self.log.warning(
                "ffx versions that differ from device versions may"
                + " have compatibility issues. It is recommended to"
                + " use versions within 6 weeks of each other."
            )