| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from __future__ import annotations |
| |
| import logging |
| import os |
| import re |
| import textwrap |
| import time |
| from typing import Any, Mapping |
| |
| from antlion import context |
| from antlion import logger as acts_logger |
| from antlion import signals, utils |
| from antlion.capabilities.ssh import DEFAULT_SSH_PORT, SSHConfig, SSHError |
| from antlion.controllers import pdu |
| from antlion.controllers.fuchsia_lib.ffx import FFX |
| from antlion.controllers.fuchsia_lib.lib_controllers.wlan_controller import ( |
| WlanController, |
| ) |
| from antlion.controllers.fuchsia_lib.lib_controllers.wlan_policy_controller import ( |
| WlanPolicyController, |
| ) |
| from antlion.controllers.fuchsia_lib.package_server import PackageServer |
| from antlion.controllers.fuchsia_lib.sl4f import SL4F |
| from antlion.controllers.fuchsia_lib.ssh import ( |
| DEFAULT_SSH_PRIVATE_KEY, |
| DEFAULT_SSH_USER, |
| FuchsiaSSHProvider, |
| ) |
| from antlion.controllers.fuchsia_lib.utils_lib import flash |
| from antlion.decorators import cached_property |
| from antlion.utils import get_fuchsia_mdns_ipv6_address, get_interface_ip_addresses |
| |
| MOBLY_CONTROLLER_CONFIG_NAME = "FuchsiaDevice" |
| ACTS_CONTROLLER_REFERENCE_NAME = "fuchsia_devices" |
| |
| FUCHSIA_DEVICE_EMPTY_CONFIG_MSG = "Configuration is empty, abort!" |
| FUCHSIA_DEVICE_NOT_LIST_CONFIG_MSG = "Configuration should be a list, abort!" |
| FUCHSIA_DEVICE_INVALID_CONFIG = ( |
| "Fuchsia device config must be either a str " |
| "or dict. abort! Invalid element %i in %r" |
| ) |
| FUCHSIA_DEVICE_NO_IP_MSG = "No IP address specified, abort!" |
| FUCHSIA_COULD_NOT_GET_DESIRED_STATE = "Could not %s %s." |
| FUCHSIA_INVALID_CONTROL_STATE = "Invalid control state (%s). abort!" |
| |
| FUCHSIA_TIME_IN_NANOSECONDS = 1000000000 |
| |
| SL4F_APK_NAME = "com.googlecode.android_scripting" |
| DAEMON_INIT_TIMEOUT_SEC = 1 |
| |
| DAEMON_ACTIVATED_STATES = ["running", "start"] |
| DAEMON_DEACTIVATED_STATES = ["stop", "stopped"] |
| |
| FUCHSIA_RECONNECT_AFTER_REBOOT_TIME = 5 |
| |
| CHANNEL_OPEN_TIMEOUT = 5 |
| |
| FUCHSIA_REBOOT_TYPE_SOFT = "soft" |
| FUCHSIA_REBOOT_TYPE_SOFT_AND_FLASH = "flash" |
| FUCHSIA_REBOOT_TYPE_HARD = "hard" |
| |
| FUCHSIA_DEFAULT_CONNECT_TIMEOUT = 90 |
| FUCHSIA_DEFAULT_COMMAND_TIMEOUT = 60 |
| |
| FUCHSIA_DEFAULT_CLEAN_UP_COMMAND_TIMEOUT = 15 |
| |
| FUCHSIA_COUNTRY_CODE_TIMEOUT = 15 |
| FUCHSIA_DEFAULT_COUNTRY_CODE_US = "US" |
| |
| MDNS_LOOKUP_RETRY_MAX = 3 |
| |
| VALID_ASSOCIATION_MECHANISMS = {None, "policy", "drivers"} |
| |
| # Duration to wait for the Fuchsia device to acquire an IP address after |
| # requested to join a network. |
| # |
| # Acquiring an IP address after connecting to a WLAN network could take up to |
| # 15 seconds if we get unlucky: |
| # |
| # 1. An outgoing passive scan just started (~7s) |
| # 2. An active scan is queued for the newly saved network (~7s) |
| # 3. The initial connection attempt fails (~1s) |
| IP_ADDRESS_TIMEOUT = 30 |
| |
| |
| class FuchsiaDeviceError(signals.ControllerError): |
| pass |
| |
| |
| class FuchsiaConfigError(signals.ControllerError): |
| """Incorrect FuchsiaDevice configuration.""" |
| |
| |
| def create(configs: object) -> list[FuchsiaDevice]: |
| if not configs: |
| raise FuchsiaDeviceError(FUCHSIA_DEVICE_EMPTY_CONFIG_MSG) |
| elif not isinstance(configs, list): |
| raise FuchsiaDeviceError(FUCHSIA_DEVICE_NOT_LIST_CONFIG_MSG) |
| for index, config in enumerate(configs): |
| if isinstance(config, str): |
| configs[index] = {"ip": config} |
| elif not isinstance(config, dict): |
| raise FuchsiaDeviceError(FUCHSIA_DEVICE_INVALID_CONFIG % (index, configs)) |
| return get_instances(configs) |
| |
| |
| def destroy(fds: list[FuchsiaDevice]) -> None: |
| for fd in fds: |
| fd.clean_up() |
| del fd |
| |
| |
| def get_info(fds): |
| """Get information on a list of FuchsiaDevice objects. |
| |
| Args: |
| fds: A list of FuchsiaDevice objects. |
| |
| Returns: |
| A list of dict, each representing info for FuchsiaDevice objects. |
| """ |
| device_info = [] |
| for fd in fds: |
| info = {"ip": fd.ip} |
| device_info.append(info) |
| return device_info |
| |
| |
| def get_instances(fds_conf_data: list[dict]) -> list[FuchsiaDevice]: |
| """Create FuchsiaDevice instances from a list of Fuchsia ips. |
| |
| Args: |
| fds_conf_data: A list of dicts that contain Fuchsia device info. |
| |
| Returns: |
| A list of FuchsiaDevice objects. |
| """ |
| |
| return [FuchsiaDevice(fd_conf_data) for fd_conf_data in fds_conf_data] |
| |
| |
| class FuchsiaDevice: |
| """Class representing a Fuchsia device. |
| |
| Each object of this class represents one Fuchsia device in ACTS. |
| |
| Attributes: |
| ip: The full address or Fuchsia abstract name to contact the Fuchsia |
| device at |
| log: A logger object. |
| ssh_port: The SSH TCP port number of the Fuchsia device. |
| sl4f_port: The SL4F HTTP port number of the Fuchsia device. |
| ssh_config: The ssh_config for connecting to the Fuchsia device. |
| """ |
| |
| def __init__(self, fd_conf_data: Mapping[str, Any]) -> None: |
| self.conf_data = fd_conf_data |
| if "ip" not in fd_conf_data: |
| raise FuchsiaDeviceError(FUCHSIA_DEVICE_NO_IP_MSG) |
| self.ip: str = fd_conf_data["ip"] |
| self.orig_ip: str = fd_conf_data["ip"] |
| self.sl4f_port: int = fd_conf_data.get("sl4f_port", 80) |
| self.ssh_username: str = fd_conf_data.get("ssh_username", DEFAULT_SSH_USER) |
| self.ssh_port: int = fd_conf_data.get("ssh_port", DEFAULT_SSH_PORT) |
| self.ssh_binary_path: str = fd_conf_data.get("ssh_binary_path", "ssh") |
| |
| def expand(path: str) -> str: |
| return os.path.expandvars(os.path.expanduser(path)) |
| |
| def path_from_config(name: str, default: str | None = None) -> str | None: |
| path = fd_conf_data.get(name, default) |
| if default is None and path is None: |
| return None |
| if type(path) != str: |
| raise FuchsiaConfigError(f"{name} must be a string, got {path}") |
| return expand(path) |
| |
| def assert_exists(name: str, path: str | None) -> None: |
| if path is None: |
| raise FuchsiaDeviceError( |
| f'Please specify "${name}" in your configuration file' |
| ) |
| if not os.path.exists(path): |
| raise FuchsiaDeviceError( |
| f'Please specify a correct "${name}" in your configuration ' |
| f'file: "{path}" does not exist' |
| ) |
| |
| self.specific_image: str | None = path_from_config("specific_image") |
| if self.specific_image: |
| assert_exists("specific_image", self.specific_image) |
| |
| # Path to a tar.gz archive with pm and amber-files, as necessary for |
| # starting a package server. |
| self.packages_archive_path: str | None = path_from_config( |
| "packages_archive_path" |
| ) |
| if self.packages_archive_path: |
| assert_exists("packages_archive_path", self.packages_archive_path) |
| |
| def required_path_from_config(name: str, default: str | None = None) -> str: |
| path = path_from_config(name, default) |
| if path is None: |
| raise FuchsiaConfigError(f"{name} is a required config field") |
| assert_exists(name, path) |
| return path |
| |
| self.ssh_priv_key: str = required_path_from_config( |
| "ssh_priv_key", DEFAULT_SSH_PRIVATE_KEY |
| ) |
| self.ffx_binary_path: str = required_path_from_config( |
| "ffx_binary_path", "${FUCHSIA_DIR}/.jiri_root/bin/ffx" |
| ) |
| |
| self.authorized_file: str | None = fd_conf_data.get("authorized_file_loc", None) |
| self.serial_number: str | None = fd_conf_data.get("serial_number", None) |
| self.device_type: str | None = fd_conf_data.get("device_type", None) |
| self.product_type: str | None = fd_conf_data.get("product_type", None) |
| self.board_type: str | None = fd_conf_data.get("board_type", None) |
| self.build_number: str | None = fd_conf_data.get("build_number", None) |
| self.build_type: str | None = fd_conf_data.get("build_type", None) |
| |
| self.mdns_name: str | None = fd_conf_data.get("mdns_name", None) |
| |
| self.hard_reboot_on_fail: bool = fd_conf_data.get("hard_reboot_on_fail", False) |
| self.take_bug_report_on_fail: bool = fd_conf_data.get( |
| "take_bug_report_on_fail", False |
| ) |
| self.device_pdu_config = fd_conf_data.get("PduDevice", None) |
| self.config_country_code: str = fd_conf_data.get( |
| "country_code", FUCHSIA_DEFAULT_COUNTRY_CODE_US |
| ).upper() |
| |
| output_path = context.get_current_context().get_base_output_path() |
| self.ssh_config = os.path.join(output_path, f"ssh_config_{self.ip}") |
| self._generate_ssh_config(self.ssh_config) |
| |
| # WLAN interface info is populated inside configure_wlan |
| self.wlan_client_interfaces: dict[str, Any] = {} |
| self.wlan_ap_interfaces: dict[str, Any] = {} |
| self.wlan_client_test_interface_name = fd_conf_data.get( |
| "wlan_client_test_interface", None |
| ) |
| self.wlan_ap_test_interface_name = fd_conf_data.get( |
| "wlan_ap_test_interface", None |
| ) |
| self.wlan_features: list[str] = fd_conf_data.get("wlan_features", []) |
| |
| # Whether to use 'policy' or 'drivers' for WLAN connect/disconnect calls |
| # If set to None, wlan is not configured. |
| self.association_mechanism: str | None = None |
| # Defaults to policy layer, unless otherwise specified in the config |
| self.default_association_mechanism = fd_conf_data.get( |
| "association_mechanism", "policy" |
| ) |
| |
| # Whether to clear and preserve existing saved networks and client |
| # connections state, to be restored at device teardown. |
| self.default_preserve_saved_networks = fd_conf_data.get( |
| "preserve_saved_networks", True |
| ) |
| |
| if not utils.is_valid_ipv4_address(self.ip) and not utils.is_valid_ipv6_address( |
| self.ip |
| ): |
| mdns_ip = None |
| for retry_counter in range(MDNS_LOOKUP_RETRY_MAX): |
| mdns_ip = get_fuchsia_mdns_ipv6_address(self.ip) |
| if mdns_ip: |
| break |
| else: |
| time.sleep(1) |
| if mdns_ip and utils.is_valid_ipv6_address(mdns_ip): |
| # self.ip was actually an mdns name. Use it for self.mdns_name |
| # unless one was explicitly provided. |
| self.mdns_name = self.mdns_name or self.ip |
| self.ip = mdns_ip |
| else: |
| raise ValueError(f"Invalid IP: {self.ip}") |
| |
| self.log = acts_logger.create_tagged_trace_logger( |
| f"FuchsiaDevice | {self.orig_ip}" |
| ) |
| |
| self.ping_rtt_match = re.compile( |
| r"RTT Min/Max/Avg " r"= \[ (.*?) / (.*?) / (.*?) \] ms" |
| ) |
| self.serial = re.sub("[.:%]", "_", self.ip) |
| log_path_base = getattr(logging, "log_path", "/tmp/logs") |
| self.log_path = os.path.join(log_path_base, f"FuchsiaDevice{self.serial}") |
| self.fuchsia_log_file_path = os.path.join( |
| self.log_path, f"fuchsialog_{self.serial}_debug.txt" |
| ) |
| self.log_process = None |
| self.package_server: PackageServer | None = None |
| |
| self.init_controllers() |
| |
| @cached_property |
| def sl4f(self) -> SL4F: |
| """Get the sl4f module configured for this device.""" |
| self.log.info("Started SL4F server") |
| return SL4F(self.ssh, self.sl4f_port) |
| |
| @cached_property |
| def ssh(self) -> FuchsiaSSHProvider: |
| """Get the SSH provider module configured for this device.""" |
| if not self.ssh_port: |
| raise FuchsiaConfigError( |
| 'Must provide "ssh_port: <int>" in the device config' |
| ) |
| if not self.ssh_priv_key: |
| raise FuchsiaConfigError( |
| 'Must provide "ssh_priv_key: <file path>" in the device config' |
| ) |
| return FuchsiaSSHProvider( |
| SSHConfig( |
| self.ssh_username, |
| self.ip, |
| self.ssh_priv_key, |
| port=self.ssh_port, |
| ssh_binary=self.ssh_binary_path, |
| ) |
| ) |
| |
| @cached_property |
| def ffx(self) -> FFX: |
| """Get the ffx module configured for this device. |
| |
| The ffx module uses lazy-initialization; it will initialize an ffx |
| connection to the device when it is required. |
| |
| If ffx needs to be reinitialized, delete the "ffx" property and attempt |
| access again. Note re-initialization will interrupt any running ffx |
| calls. |
| """ |
| if not self.mdns_name: |
| raise FuchsiaConfigError( |
| 'Must provide "mdns_name: <device mDNS name>" in the device config' |
| ) |
| return FFX(self.ffx_binary_path, self.mdns_name, self.ip, self.ssh_priv_key) |
| |
| @ffx.deleter |
| # TODO(https://github.com/python/mypy/issues/11008): Rename to ffx |
| def ffx_deleter(self, ffx: FFX) -> None: |
| self.log.debug("Cleaning up ffx") |
| ffx.clean_up() |
| |
| @cached_property |
| def wlan_policy_controller(self): |
| return WlanPolicyController(self.sl4f, self.ssh) |
| |
| def _generate_ssh_config(self, file_path: str): |
| """Generate and write an SSH config for Fuchsia to disk. |
| |
| Args: |
| file_path: Path to write the generated SSH config |
| """ |
| content = textwrap.dedent( |
| f"""\ |
| Host * |
| CheckHostIP no |
| StrictHostKeyChecking no |
| ForwardAgent no |
| ForwardX11 no |
| GSSAPIDelegateCredentials no |
| UserKnownHostsFile /dev/null |
| User fuchsia |
| IdentitiesOnly yes |
| IdentityFile {self.ssh_priv_key} |
| ControlPersist yes |
| ControlMaster auto |
| ControlPath /tmp/fuchsia--%r@%h:%p |
| ServerAliveInterval 1 |
| ServerAliveCountMax 1 |
| LogLevel ERROR |
| """ |
| ) |
| |
| with open(file_path, "w") as file: |
| file.write(content) |
| |
| def init_controllers(self): |
| # Contains WLAN core functions |
| self.wlan_controller = WlanController(self) |
| |
| def start_package_server(self): |
| if not self.packages_archive_path: |
| self.log.warn( |
| "packages_archive_path is not specified. " |
| "Assuming a package server is already running and configured on " |
| "the DUT. If this is not the case, either run your own package " |
| "server, or configure these fields appropriately. " |
| "This is usually required for the Fuchsia iPerf3 client or " |
| "other testing utilities not on device cache." |
| ) |
| return |
| if self.package_server: |
| self.log.warn( |
| "Skipping to start the package server since is already running" |
| ) |
| return |
| |
| self.package_server = PackageServer(self.packages_archive_path) |
| self.package_server.start() |
| self.package_server.configure_device(self.ssh) |
| |
| def run_commands_from_config(self, cmd_dicts): |
| """Runs commands on the Fuchsia device from the config file. Useful for |
| device and/or Fuchsia specific configuration. |
| |
| Args: |
| cmd_dicts: list of dictionaries containing the following |
| 'cmd': string, command to run on device |
| 'timeout': int, seconds to wait for command to run (optional) |
| 'skip_status_code_check': bool, disregard errors if true |
| |
| Raises: |
| FuchsiaDeviceError: if any of the commands return a non-zero status |
| code and skip_status_code_check is false or undefined. |
| """ |
| for cmd_dict in cmd_dicts: |
| try: |
| cmd = cmd_dict["cmd"] |
| except KeyError: |
| raise FuchsiaDeviceError( |
| 'To run a command via config, you must provide key "cmd" ' |
| "containing the command string." |
| ) |
| |
| timeout = cmd_dict.get("timeout", FUCHSIA_DEFAULT_COMMAND_TIMEOUT) |
| # Catch both boolean and string values from JSON |
| skip_status_code_check = ( |
| "true" == str(cmd_dict.get("skip_status_code_check", False)).lower() |
| ) |
| |
| if skip_status_code_check: |
| self.log.info(f'Running command "{cmd}" and ignoring result.') |
| else: |
| self.log.info(f'Running command "{cmd}".') |
| |
| try: |
| result = self.ssh.run(cmd, timeout_sec=timeout) |
| self.log.debug(result) |
| except SSHError as e: |
| if not skip_status_code_check: |
| raise FuchsiaDeviceError( |
| "Failed device specific commands for initial configuration" |
| ) from e |
| |
| def configure_wlan( |
| self, |
| association_mechanism: str | None = None, |
| preserve_saved_networks: bool | None = None, |
| ) -> None: |
| """ |
| Readies device for WLAN functionality. If applicable, connects to the |
| policy layer and clears/saves preexisting saved networks. |
| |
| Args: |
| association_mechanism: either 'policy' or 'drivers'. If None, uses |
| the default value from init (can be set by ACTS config) |
| preserve_saved_networks: whether to clear existing saved |
| networks, and preserve them for restoration later. If None, uses |
| the default value from init (can be set by ACTS config) |
| |
| Raises: |
| FuchsiaDeviceError, if configuration fails |
| """ |
| |
| # Set the country code US by default, or country code provided |
| # in ACTS config |
| self.wlan_policy_controller.set_country_code(self.config_country_code) |
| |
| # If args aren't provided, use the defaults, which can be set in the |
| # config. |
| if association_mechanism is None: |
| association_mechanism = self.default_association_mechanism |
| if preserve_saved_networks is None: |
| preserve_saved_networks = self.default_preserve_saved_networks |
| |
| if association_mechanism not in VALID_ASSOCIATION_MECHANISMS: |
| raise FuchsiaDeviceError( |
| f"Invalid FuchsiaDevice association_mechanism: {association_mechanism}" |
| ) |
| |
| # Allows for wlan to be set up differently in different tests |
| if self.association_mechanism: |
| self.log.info("Deconfiguring WLAN") |
| self.deconfigure_wlan() |
| |
| self.association_mechanism = association_mechanism |
| |
| self.log.info( |
| "Configuring WLAN w/ association mechanism: " f"{association_mechanism}" |
| ) |
| if association_mechanism == "drivers": |
| self.log.warn( |
| "You may encounter unusual device behavior when using the " |
| "drivers directly for WLAN. This should be reserved for " |
| "debugging specific issues. Normal test runs should use the " |
| "policy layer." |
| ) |
| if preserve_saved_networks: |
| self.log.warn( |
| "Unable to preserve saved networks when using drivers " |
| "association mechanism (requires policy layer control)." |
| ) |
| else: |
| # This requires SL4F calls, so it can only happen with actual |
| # devices, not with unit tests. |
| self.wlan_policy_controller.configure_wlan(preserve_saved_networks) |
| |
| # Retrieve WLAN client and AP interfaces |
| self.wlan_controller.update_wlan_interfaces() |
| |
| def deconfigure_wlan(self): |
| """ |
| Stops WLAN functionality (if it has been started). Used to allow |
| different tests to use WLAN differently (e.g. some tests require using |
| wlan policy, while the abstract wlan_device can be setup to use policy |
| or drivers) |
| |
| Raises: |
| FuchsiaDeviveError, if deconfigure fails. |
| """ |
| if not self.association_mechanism: |
| self.log.debug("WLAN not configured before deconfigure was called.") |
| return |
| # If using policy, stop client connections. Otherwise, just clear |
| # variables. |
| if self.association_mechanism != "drivers": |
| self.wlan_policy_controller._deconfigure_wlan() |
| self.association_mechanism = None |
| |
| def reboot( |
| self, |
| use_ssh: bool = False, |
| unreachable_timeout: int = FUCHSIA_DEFAULT_CONNECT_TIMEOUT, |
| reboot_type: str = FUCHSIA_REBOOT_TYPE_SOFT, |
| testbed_pdus: list[pdu.PduDevice] = [], |
| ) -> None: |
| """Reboot a FuchsiaDevice. |
| |
| Soft reboots the device, verifies it becomes unreachable, then verifies |
| it comes back online. Re-initializes services so the tests can continue. |
| |
| Args: |
| use_ssh: if True, use fuchsia shell command via ssh to reboot |
| instead of SL4F. |
| unreachable_timeout: time to wait for device to become unreachable. |
| reboot_type: 'soft', 'hard' or 'flash'. |
| testbed_pdus: all testbed PDUs. |
| |
| Raises: |
| ConnectionError, if device fails to become unreachable or fails to |
| come back up. |
| """ |
| if reboot_type == FUCHSIA_REBOOT_TYPE_SOFT: |
| if use_ssh: |
| self.log.info("Soft rebooting via SSH") |
| try: |
| self.ssh.run( |
| "dm reboot", timeout_sec=FUCHSIA_RECONNECT_AFTER_REBOOT_TIME |
| ) |
| except SSHError as e: |
| if "closed by remote host" not in e.result.stderr: |
| raise e |
| else: |
| self.log.info("Soft rebooting via SL4F") |
| self.sl4f.hardware_power_statecontrol_lib.suspendReboot(timeout=3) |
| self.ssh.wait_until_unreachable(timeout_sec=unreachable_timeout) |
| |
| elif reboot_type == FUCHSIA_REBOOT_TYPE_HARD: |
| self.log.info("Hard rebooting via PDU") |
| if not testbed_pdus: |
| raise AttributeError( |
| "Testbed PDUs must be supplied " "to hard reboot a fuchsia_device." |
| ) |
| device_pdu, device_pdu_port = pdu.get_pdu_port_for_device( |
| self.device_pdu_config, testbed_pdus |
| ) |
| self.log.info("Killing power to FuchsiaDevice") |
| device_pdu.off(str(device_pdu_port)) |
| self.ssh.wait_until_unreachable(timeout_sec=unreachable_timeout) |
| self.log.info("Restoring power to FuchsiaDevice") |
| device_pdu.on(str(device_pdu_port)) |
| |
| elif reboot_type == FUCHSIA_REBOOT_TYPE_SOFT_AND_FLASH: |
| flash(self, use_ssh, FUCHSIA_RECONNECT_AFTER_REBOOT_TIME) |
| |
| else: |
| raise ValueError(f"Invalid reboot type: {reboot_type}") |
| |
| # Cleanup services |
| self.stop_services() |
| |
| # TODO (b/246852449): Move configure_wlan to other controllers. |
| # If wlan was configured before reboot, it must be configured again |
| # after rebooting, as it was before reboot. No preserving should occur. |
| if self.association_mechanism: |
| pre_reboot_association_mechanism = self.association_mechanism |
| # Prevent configure_wlan from thinking it needs to deconfigure first |
| self.association_mechanism = None |
| self.configure_wlan( |
| association_mechanism=pre_reboot_association_mechanism, |
| preserve_saved_networks=False, |
| ) |
| |
| self.log.info("Device has rebooted") |
| |
| def version(self) -> str: |
| """Return the version of Fuchsia running on the device.""" |
| return self.sl4f.device_lib.get_version()["result"] |
| |
| def device_name(self) -> str: |
| """Return the name of the device.""" |
| return self.sl4f.device_lib.get_device_name()["result"] |
| |
| def product_name(self) -> str: |
| """Return the product name of the device.""" |
| return self.sl4f.device_lib.get_product_name()["result"] |
| |
| def ping( |
| self, |
| dest_ip, |
| count=3, |
| interval=1000, |
| timeout=1000, |
| size=25, |
| additional_ping_params=None, |
| ): |
| """Pings from a Fuchsia device to an IPv4 address or hostname |
| |
| Args: |
| dest_ip: (str) The ip or hostname to ping. |
| count: (int) How many icmp packets to send. |
| interval: (int) How long to wait between pings (ms) |
| timeout: (int) How long to wait before having the icmp packet |
| timeout (ms). |
| size: (int) Size of the icmp packet. |
| additional_ping_params: (str) command option flags to |
| append to the command string |
| |
| Returns: |
| A dictionary for the results of the ping. The dictionary contains |
| the following items: |
| status: Whether the ping was successful. |
| rtt_min: The minimum round trip time of the ping. |
| rtt_max: The minimum round trip time of the ping. |
| rtt_avg: The avg round trip time of the ping. |
| stdout: The standard out of the ping command. |
| stderr: The standard error of the ping command. |
| """ |
| self.log.debug(f"Pinging {dest_ip}...") |
| if not additional_ping_params: |
| additional_ping_params = "" |
| |
| try: |
| ping_result = self.ssh.run( |
| f"ping -c {count} -i {interval} -t {timeout} -s {size} " |
| f"{additional_ping_params} {dest_ip}" |
| ) |
| except SSHError as e: |
| ping_result = e.result |
| |
| result = { |
| "stdout": ping_result.stdout, |
| "stderr": ping_result.stderr, |
| "status": not ping_result.stderr, |
| } |
| |
| if not ping_result.stderr: |
| rtt_line = ping_result.stdout.split("\n")[:-1] |
| rtt_line = rtt_line[-1] |
| rtt_stats = re.search(self.ping_rtt_match, rtt_line) |
| if rtt_stats is None: |
| raise FuchsiaDeviceError(f'Unable to parse ping output: "{rtt_line}"') |
| |
| result["rtt_min"] = rtt_stats.group(1) |
| result["rtt_max"] = rtt_stats.group(2) |
| result["rtt_avg"] = rtt_stats.group(3) |
| |
| return result |
| |
| def can_ping( |
| self, |
| dest_ip, |
| count=1, |
| interval=1000, |
| timeout=1000, |
| size=25, |
| additional_ping_params=None, |
| ) -> bool: |
| """Returns whether fuchsia device can ping a given dest address""" |
| ping_result = self.ping( |
| dest_ip, |
| count=count, |
| interval=interval, |
| timeout=timeout, |
| size=size, |
| additional_ping_params=additional_ping_params, |
| ) |
| return ping_result["status"] |
| |
| def clean_up(self): |
| """Cleans up the FuchsiaDevice object, releases any resources it |
| claimed, and restores saved networks if applicable. For reboots, use |
| clean_up_services only. |
| |
| Note: Any exceptions thrown in this method must be caught and handled, |
| ensuring that clean_up_services is run. Otherwise, the syslog listening |
| thread will never join and will leave tests hanging. |
| """ |
| # If and only if wlan is configured, and using the policy layer |
| if self.association_mechanism == "policy": |
| try: |
| self.wlan_policy_controller.clean_up() |
| except Exception as err: |
| self.log.warning(f"Unable to clean up WLAN Policy layer: {err}") |
| |
| self.stop_services() |
| |
| if self.package_server: |
| self.package_server.clean_up() |
| |
| def get_interface_ip_addresses(self, interface): |
| return get_interface_ip_addresses(self, interface) |
| |
| def wait_for_ipv4_addr(self, interface: str) -> None: |
| """Checks if device has an ipv4 private address. Sleeps 1 second between |
| retries. |
| |
| Args: |
| interface: name of interface from which to get ipv4 address. |
| |
| Raises: |
| ConnectionError, if device does not have an ipv4 address after all |
| timeout. |
| """ |
| self.log.info( |
| f"Checking for valid ipv4 addr. Retry {IP_ADDRESS_TIMEOUT} seconds." |
| ) |
| timeout = time.time() + IP_ADDRESS_TIMEOUT |
| while time.time() < timeout: |
| ip_addrs = self.get_interface_ip_addresses(interface) |
| |
| if len(ip_addrs["ipv4_private"]) > 0: |
| self.log.info( |
| "Device has an ipv4 address: " f"{ip_addrs['ipv4_private'][0]}" |
| ) |
| break |
| else: |
| self.log.debug( |
| "Device does not yet have an ipv4 address...retrying in 1 " |
| "second." |
| ) |
| time.sleep(1) |
| else: |
| raise ConnectionError("Device failed to get an ipv4 address.") |
| |
| def wait_for_ipv6_addr(self, interface: str) -> None: |
| """Checks if device has an ipv6 private local address. Sleeps 1 second |
| between retries. |
| |
| Args: |
| interface: name of interface from which to get ipv6 address. |
| |
| Raises: |
| ConnectionError, if device does not have an ipv6 address after all |
| timeout. |
| """ |
| self.log.info( |
| f"Checking for valid ipv6 addr. Retry {IP_ADDRESS_TIMEOUT} seconds." |
| ) |
| timeout = time.time() + IP_ADDRESS_TIMEOUT |
| while time.time() < timeout: |
| ip_addrs = self.get_interface_ip_addresses(interface) |
| if len(ip_addrs["ipv6_private_local"]) > 0: |
| self.log.info( |
| "Device has an ipv6 private local address: " |
| f"{ip_addrs['ipv6_private_local'][0]}" |
| ) |
| break |
| else: |
| self.log.debug( |
| "Device does not yet have an ipv6 address...retrying in 1 " |
| "second." |
| ) |
| time.sleep(1) |
| else: |
| raise ConnectionError("Device failed to get an ipv6 address.") |
| |
| def check_connect_response(self, connect_response): |
| if connect_response.get("error") is None: |
| # Checks the response from SL4F and if there is no error, check |
| # the result. |
| connection_result = connect_response.get("result") |
| if not connection_result: |
| # Ideally the error would be present but just outputting a log |
| # message until available. |
| self.log.debug("Connect call failed, aborting!") |
| return False |
| else: |
| # Returns True if connection was successful. |
| return True |
| else: |
| # the response indicates an error - log and raise failure |
| self.log.debug( |
| "Aborting! - Connect call failed with error: %s" |
| % connect_response.get("error") |
| ) |
| return False |
| |
| def check_disconnect_response(self, disconnect_response): |
| if disconnect_response.get("error") is None: |
| # Returns True if disconnect was successful. |
| return True |
| else: |
| # the response indicates an error - log and raise failure |
| self.log.debug( |
| f"Disconnect call failed with error: {disconnect_response.get('error')}" |
| ) |
| return False |
| |
| def stop_services(self): |
| """Stops all host-side clients to the Fuchsia device. |
| |
| This is necessary whenever the device's state is unknown. These cases can be |
| found after device reboots, for example. |
| """ |
| self.log.info("Stopping host device services.") |
| del self.wlan_policy_controller |
| del self.sl4f |
| del self.ssh |
| del self.ffx |
| |
| def load_config(self, config): |
| pass |
| |
| def take_bug_report(self, test_name=None, begin_time=None): |
| """Takes a bug report on the device and stores it in a file. |
| |
| Args: |
| test_name: DEPRECATED. Do not specify this argument; it is only used |
| for logging. Name of the test case that triggered this bug |
| report. |
| begin_time: DEPRECATED. Do not specify this argument; it allows |
| overwriting of bug reports when this function is called several |
| times in one test. Epoch time when the test started. If not |
| specified, the current time will be used. |
| """ |
| if test_name: |
| self.log.info(f"Taking snapshot of {self.mdns_name} for {test_name}") |
| else: |
| self.log.info(f"Taking snapshot of {self.mdns_name}") |
| |
| epoch = begin_time if begin_time else utils.get_current_epoch_time() |
| time_stamp = acts_logger.normalize_log_line_timestamp( |
| acts_logger.epoch_to_log_line_timestamp(epoch) |
| ) |
| out_dir = context.get_current_context().get_full_output_path() |
| out_path = os.path.join(out_dir, f"{self.mdns_name}_{time_stamp}.zip") |
| |
| try: |
| with open(out_path, "wb") as file: |
| bytes = self.ssh.run("snapshot").stdout_bytes |
| file.write(bytes) |
| self.log.info(f"Snapshot saved to {out_path}") |
| except Exception as err: |
| self.log.error(f"Failed to take snapshot: {err}") |
| |
| def take_bt_snoop_log(self, custom_name=None): |
| """Takes a the bt-snoop log from the device and stores it in a file |
| in a pcap format. |
| """ |
| bt_snoop_path = context.get_current_context().get_full_output_path() |
| time_stamp = acts_logger.normalize_log_line_timestamp( |
| acts_logger.epoch_to_log_line_timestamp(time.time()) |
| ) |
| out_name = "FuchsiaDevice%s_%s" % ( |
| self.serial, |
| time_stamp.replace(" ", "_").replace(":", "-"), |
| ) |
| out_name = f"{out_name}.pcap" |
| if custom_name: |
| out_name = f"{self.serial}_{custom_name}.pcap" |
| else: |
| out_name = f"{out_name}.pcap" |
| full_out_path = os.path.join(bt_snoop_path, out_name) |
| with open(full_out_path, "wb") as file: |
| bytes = self.ssh.run("bt-snoop-cli -d -f pcap").stdout_bytes |
| file.write(bytes) |