| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import enum |
| import io |
| import json |
| import logging |
| import os |
| import subprocess |
| import tempfile |
| import time |
| from pathlib import Path, PurePath |
| from shutil import rmtree |
| |
| from mobly import logger |
| from tenacity import retry |
| from tenacity.stop import stop_after_delay |
| from tenacity.wait import wait_fixed |
| |
| from antlion import context, signals, utils |
| |
| FFX_DEFAULT_COMMAND_TIMEOUT: int = 60 |
| FFX_CONFIG_TIMEOUT_SEC: float = 20 |
| FFX_TARGET_ADD_TIMEOUT_SEC: float = 20 |
| FFX_DAEMON_STOP_TIMEOUT_SEC: float = 4 |
| |
| |
| class FFXError(signals.TestError): |
| """Non-zero error code returned from a ffx command.""" |
| |
| def __init__(self, command: str, process: subprocess.CalledProcessError) -> None: |
| self.command = command |
| self.stdout: str = process.stdout.decode("utf-8", errors="replace") |
| self.stderr: str = process.stderr.decode("utf-8", errors="replace") |
| self.exit_status = process.returncode |
| super().__init__(self.__str__()) |
| |
| def __str__(self) -> str: |
| return f'ffx subcommand "{self.command}" returned {self.exit_status}, stdout: "{self.stdout}", stderr: "{self.stderr}"' |
| |
| |
| class FFXTimeout(signals.TestError): |
| """Timed out running a ffx command.""" |
| |
| |
| class OutputFormat(enum.StrEnum): |
| TEXT = "text" |
| JSON = "json" |
| JSON_PRETTY = "json-pretty" |
| |
| |
| class FFX: |
| """Device-specific controller for the ffx tool. |
| |
| Attributes: |
| log: Logger for the device-specific instance of ffx. |
| binary_path: Path to the ffx binary. |
| mdns_name: mDNS nodename of the default Fuchsia target. |
| ip: IP address of the default Fuchsia target. |
| ssh_private_key_path: Path to Fuchsia DUT SSH private key. |
| """ |
| |
| def __init__( |
| self, |
| binary_path: str, |
| mdns_name: str, |
| ip: str | None = None, |
| ssh_private_key_path: str | None = None, |
| subtools_search_path: str | None = None, |
| ): |
| """ |
| Args: |
| binary_path: Path to ffx binary. |
| target: Fuchsia mDNS nodename of default target. |
| ssh_private_key_path: Path to SSH private key for talking to the |
| Fuchsia DUT. |
| """ |
| self.log = logger.PrefixLoggerAdapter( |
| logging.getLogger(), |
| { |
| logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: f"[ffx | {mdns_name}]", |
| }, |
| ) |
| self._binary_path = binary_path |
| self._mdns_name = mdns_name |
| self._ip = ip |
| self._ssh_private_key_path = ssh_private_key_path |
| self._subtools_search_path = subtools_search_path |
| |
| self._daemon: subprocess.Popen | None = None |
| self._daemon_log: io.TextIOWrapper | None = None |
| self._isolate_dir: str | None = None |
| self._sock_dir: str | None = None |
| self._ssh_auth_sock_path: str | None = None |
| self._overnet_socket_path: str | None = None |
| self._has_been_reachable = False |
| self._has_logged_version = False |
| |
| def clean_up(self) -> None: |
| self._stop_daemon() |
| self._has_been_reachable = False |
| self._has_logged_version = False |
| |
| def run( |
| self, |
| command: list[str], |
| timeout_sec: float = FFX_DEFAULT_COMMAND_TIMEOUT, |
| skip_status_code_check: bool = False, |
| skip_reachability_check: bool = False, |
| output_format: OutputFormat = OutputFormat.TEXT, |
| ) -> subprocess.CompletedProcess: |
| """Runs an ffx command. |
| |
| Verifies reachability before running, if it hasn't already. |
| |
| Args: |
| command: Command to run with ffx. |
| timeout_sec: Seconds to wait for a command to complete. |
| skip_status_code_check: Whether to check for the status code. |
| verify_reachable: Whether to verify reachability before running. |
| output_format: Desired output format; useful for parsing output. |
| |
| Raises: |
| FFXTimeout: when the command times out. |
| FFXError: when the command returns non-zero and skip_status_code_check is False. |
| |
| Returns: |
| The results of the command. Note subprocess.CompletedProcess returns |
| stdout and stderr as a byte-array, not a string. Treat these members |
| as such or convert to a string using bytes.decode('utf-8'). |
| """ |
| if not self._daemon: |
| self._start_daemon() |
| if not self._has_been_reachable and not skip_reachability_check: |
| self.log.info(f'Verifying reachability before running "{command}"') |
| self.verify_reachable() |
| return self._exec( |
| command, |
| timeout_sec, |
| check=not skip_status_code_check, |
| output_format=output_format, |
| ) |
| |
| def _exec( |
| self, |
| command: list[str], |
| timeout_sec: float, |
| check: bool = True, |
| output_format: OutputFormat = OutputFormat.TEXT, |
| ) -> subprocess.CompletedProcess[bytes]: |
| """Execute a ffx command without any other arguments. |
| |
| Args: |
| command: Command to run with ffx. |
| timeout_sec: Seconds to wait for a command to complete. |
| check: Whether to check for the status code. |
| |
| Raises: |
| FFXTimeout: when the command times out. |
| FFXError: when the command returns non-zero and skip_status_code_check is False. |
| |
| Returns: |
| The results of the command. Note subprocess.CompletedProcess returns |
| stdout and stderr as a byte-array, not a string. Treat these members |
| as such or convert to a string using bytes.decode('utf-8'). |
| """ |
| if not self._isolate_dir: |
| raise TypeError( |
| f"Expected _isolate_dir to be a str, got {type(self._isolate_dir)}" |
| ) |
| |
| self.log.debug(f'Running "{" ".join(command)}".') |
| |
| full_command = [self._binary_path, "--isolate-dir", self._isolate_dir] |
| match output_format: |
| case OutputFormat.TEXT: |
| full_command += command |
| case OutputFormat.JSON: |
| full_command += ["--machine", "json"] + command |
| case OutputFormat.JSON_PRETTY: |
| full_command += ["--machine", "json-pretty"] + command |
| |
| try: |
| result = subprocess.run( |
| full_command, |
| capture_output=True, |
| timeout=timeout_sec, |
| check=check, |
| ) |
| self.log.debug( |
| f'Result of "{" ".join(command)}":\n' |
| f'stdout: {result.stdout.decode("utf-8")}\n' |
| f'stderr: {result.stderr.decode("utf-8")}' |
| ) |
| return result |
| except subprocess.CalledProcessError as e: |
| raise FFXError(" ".join(command), e) from e |
| except subprocess.TimeoutExpired as e: |
| raise FFXTimeout(f'Timed out running "{" ".join(command)}"') from e |
| |
| def _start_daemon(self) -> None: |
| """Create a new isolated environment for ffx. |
| |
| This is needed to avoid overlapping ffx daemons while testing in |
| parallel, causing the ffx invocations to “upgrade” one daemon to |
| another, which appears as a flap/restart to another test. |
| """ |
| # Store ffx files in a unique directory. Timestamp is used to prevent |
| # files from being overwritten in the case when a test intentionally |
| # reboots or resets the device such that a new isolated ffx environment |
| # is created. |
| root_dir = context.get_current_context().get_full_output_path() |
| epoch = utils.get_current_epoch_time() |
| time_stamp = logger.normalize_log_line_timestamp( |
| logger.epoch_to_log_line_timestamp(epoch) |
| ) |
| self._isolate_dir = os.path.join(root_dir, f"{self._mdns_name}_{time_stamp}") |
| os.makedirs(self._isolate_dir, exist_ok=True) |
| |
| # Sockets need to be created in a different directory to be guaranteed |
| # to stay under the maximum socket path length of 104 characters. |
| # See https://unix.stackexchange.com/q/367008 |
| self._sock_dir = tempfile.mkdtemp() |
| # On MacOS, the socket paths need to be just paths (not pre-created |
| # Python tempfiles, which are not socket files). |
| self._ssh_auth_sock_path = str(PurePath(self._sock_dir, "ssh_auth_sock")) |
| self._overnet_socket_path = str(PurePath(self._sock_dir, "overnet_socket")) |
| |
| cmds = [ |
| ["config", "set", "log.dir", os.path.join(self._isolate_dir, "ffx_logs")], |
| ["config", "set", "log.level", "debug"], |
| ["config", "set", "target.default", self._mdns_name], |
| # Use user-specific and device-specific locations for sockets. |
| # Avoids user permission errors in a multi-user test environment. |
| # Avoids daemon upgrades when running tests in parallel in a CI |
| # environment. |
| ["config", "set", "ssh.auth-sock", self._ssh_auth_sock_path], |
| ["config", "set", "overnet.socket", self._overnet_socket_path], |
| # Alias to disable metrics, device discovery, device auto connection, etc. |
| ["config", "set", "ffx.isolated", "true"], |
| # Control the daemon's lifecycle directly |
| ["config", "set", "daemon.autostart", "false"], |
| ] |
| |
| if not self._ip: |
| cmds.append(["config", "set", "discovery.mdns.enabled", "true"]) |
| |
| # ffx looks for the private key in several default locations. For |
| # testbeds which have the private key in another location, set it now. |
| if self._ssh_private_key_path: |
| cmds.append( |
| ["config", "set", "ssh.priv", f'["{self._ssh_private_key_path}"]'] |
| ) |
| |
| if self._subtools_search_path: |
| cmds.append( |
| [ |
| "config", |
| "set", |
| "ffx.subtool-search-paths", |
| self._subtools_search_path, |
| ] |
| ) |
| |
| for cmd in cmds: |
| self._exec(cmd, FFX_CONFIG_TIMEOUT_SEC) |
| |
| self._daemon_log = open( |
| os.path.join(self._isolate_dir, "daemon.log"), "a+", encoding="utf-8" |
| ) |
| |
| # Start the daemon |
| self._daemon = subprocess.Popen( |
| [self._binary_path, "--isolate-dir", self._isolate_dir, "daemon", "start"], |
| stdout=self._daemon_log, |
| ) |
| |
| # Wait for overnet_socket to be created |
| @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1)) |
| def wait_for_socket(path: str) -> None: |
| if not Path(path).is_socket(): |
| raise FileNotFoundError(f"Socket not found: {path}") |
| |
| wait_for_socket(self._overnet_socket_path) |
| |
| if self._ip: |
| self._exec( |
| ["target", "add", self._ip, "--nowait"], FFX_TARGET_ADD_TIMEOUT_SEC |
| ) |
| |
| result = self._exec(["config", "get"], FFX_CONFIG_TIMEOUT_SEC) |
| self.log.debug(f'Config:\n{result.stdout.decode("utf-8")}') |
| |
| def _stop_daemon(self) -> None: |
| if self._daemon: |
| self.run( |
| # TODO(b/332983529): Add the following arguments once ffx daemon |
| # stops correctly. |
| # ["-t", str(FFX_DAEMON_STOP_TIMEOUT_SEC * 1000)] |
| ["daemon", "stop"], |
| skip_reachability_check=True, |
| ) |
| self._daemon.wait(timeout=FFX_DAEMON_STOP_TIMEOUT_SEC) |
| self._daemon = None |
| |
| if self._daemon_log: |
| self._daemon_log.close() |
| self._daemon_log = None |
| |
| if self._ssh_auth_sock_path: |
| Path(self._ssh_auth_sock_path).unlink(missing_ok=True) |
| self._ssh_auth_sock_path = None |
| |
| if self._overnet_socket_path: |
| Path(self._overnet_socket_path).unlink(missing_ok=True) |
| self._overnet_socket_path = None |
| |
| if self._sock_dir: |
| rmtree(self._sock_dir) |
| self._sock_dir = None |
| |
| self._isolate_dir = None |
| |
| def verify_reachable(self, timeout_sec: int = FFX_DEFAULT_COMMAND_TIMEOUT) -> None: |
| """Verify the target is reachable via RCS and various services. |
| |
| Blocks until the device allows for an RCS connection. If the device |
| isn't reachable within a short time, logs a warning before waiting |
| longer. |
| |
| Verifies the RCS connection by fetching information from the device, |
| which exercises several debug and informational FIDL services. |
| |
| When called for the first time, the versions will be checked for |
| compatibility. |
| |
| Args: |
| timeout_sec: Seconds to wait for reachability check |
| |
| Raises: |
| FFXError: when an unknown error occurs |
| FFXTimeout: when the target is unreachable |
| """ |
| last_err: Exception | None = None |
| timeout = time.perf_counter() + timeout_sec |
| while True: |
| try: |
| self.run( |
| ["target", "wait"], |
| timeout_sec=FFX_CONFIG_TIMEOUT_SEC, |
| skip_reachability_check=True, |
| ) |
| break |
| except FFXError as e: |
| if "took too long connecting to ascendd socket" in e.stderr: |
| last_err = e |
| else: |
| raise e |
| except FFXTimeout as e: |
| last_err = e |
| |
| if time.perf_counter() > timeout: |
| raise FFXTimeout( |
| f"Waited over {timeout_sec}s for ffx to become reachable" |
| ) from last_err |
| |
| # Use a shorter timeout than default because device information |
| # gathering can hang for a long time if the device is not actually |
| # connectable. |
| try: |
| result = self.run( |
| ["target", "show"], |
| timeout_sec=15, |
| skip_reachability_check=True, |
| output_format=OutputFormat.JSON_PRETTY, |
| ) |
| except Exception as e: |
| self.log.error( |
| f'Failed to reach target device. Try running "{self._binary_path}' |
| + ' doctor" to diagnose issues.' |
| ) |
| raise e |
| |
| self._has_been_reachable = True |
| |
| if not self._has_logged_version: |
| self._has_logged_version = True |
| self.compare_version(result) |
| |
| def compare_version(self, target_show_result: subprocess.CompletedProcess) -> None: |
| """Compares the version of Fuchsia with the version of ffx. |
| |
| Args: |
| target_show_result: Result of the target show command with JSON |
| output mode enabled |
| """ |
| result_raw = target_show_result.stdout |
| try: |
| result_json = json.loads(result_raw) |
| build_info = next(filter(lambda s: s.get("label") == "build", result_json)) |
| version_info = next( |
| filter(lambda s: s.get("label") == "version", build_info["child"]) |
| ) |
| device_version = version_info.get("value") |
| except (AttributeError, json.JSONDecodeError) as e: |
| raise signals.TestAbortClass( |
| f'Failed to parse response of "ffx target show":\n{result_raw}' |
| ) from e |
| |
| ffx_version = self.run(["version"]).stdout.decode("utf-8") |
| |
| self.log.info(f"Device version: {device_version}, ffx version: {ffx_version}") |
| if device_version != ffx_version: |
| self.log.warning( |
| "ffx versions that differ from device versions may" |
| + " have compatibility issues. It is recommended to" |
| + " use versions within 6 weeks of each other." |
| ) |