blob: 79a0a28a30d4f1a3a93eedb409158ec2b06fad58 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import os
import re
import textwrap
import time
from typing import Any, Mapping
import fuchsia_controller_py as fuchsia_controller
import honeydew
from honeydew.interfaces.device_classes.fuchsia_device import (
FuchsiaDevice as HdFuchsiaDevice,
)
from honeydew.typing.custom_types import TRANSPORT, FFXConfig
from honeydew.typing.wlan import CountryCode
from antlion import context
from antlion import logger as acts_logger
from antlion import signals, utils
from antlion.capabilities.ssh import DEFAULT_SSH_PORT, SSHConfig, SSHError
from antlion.controllers import pdu
from antlion.controllers.fuchsia_lib.ffx import FFX
from antlion.controllers.fuchsia_lib.lib_controllers.wlan_controller import (
WlanController,
)
from antlion.controllers.fuchsia_lib.lib_controllers.wlan_policy_controller import (
WlanPolicyController,
)
from antlion.controllers.fuchsia_lib.package_server import PackageServer
from antlion.controllers.fuchsia_lib.sl4f import SL4F
from antlion.controllers.fuchsia_lib.ssh import (
DEFAULT_SSH_PRIVATE_KEY,
DEFAULT_SSH_USER,
FuchsiaSSHProvider,
)
from antlion.controllers.fuchsia_lib.utils_lib import flash
from antlion.decorators import cached_property
from antlion.utils import (
PingResult,
get_fuchsia_mdns_ipv6_address,
get_interface_ip_addresses,
)
from antlion.validation import MapValidator
MOBLY_CONTROLLER_CONFIG_NAME = "FuchsiaDevice"
ACTS_CONTROLLER_REFERENCE_NAME = "fuchsia_devices"
FUCHSIA_RECONNECT_AFTER_REBOOT_TIME = 5
FUCHSIA_REBOOT_TYPE_SOFT = "soft"
FUCHSIA_REBOOT_TYPE_SOFT_AND_FLASH = "flash"
FUCHSIA_REBOOT_TYPE_HARD = "hard"
FUCHSIA_DEFAULT_CONNECT_TIMEOUT = 90
FUCHSIA_DEFAULT_COMMAND_TIMEOUT = 60
FUCHSIA_DEFAULT_CLEAN_UP_COMMAND_TIMEOUT = 15
FUCHSIA_COUNTRY_CODE_TIMEOUT = 15
FUCHSIA_DEFAULT_COUNTRY_CODE_US = "US"
MDNS_LOOKUP_RETRY_MAX = 3
# Duration to wait for the Fuchsia device to acquire an IP address after
# requested to join a network.
#
# Acquiring an IP address after connecting to a WLAN network could take up to
# 15 seconds if we get unlucky:
#
# 1. An outgoing passive scan just started (~7s)
# 2. An active scan is queued for the newly saved network (~7s)
# 3. The initial connection attempt fails (~1s)
IP_ADDRESS_TIMEOUT = 30
class FuchsiaDeviceError(signals.ControllerError):
pass
class FuchsiaConfigError(signals.ControllerError):
"""Incorrect FuchsiaDevice configuration."""
def create(configs: object) -> list[FuchsiaDevice]:
if not configs:
raise FuchsiaDeviceError("Configuration is empty, abort!")
elif not isinstance(configs, list):
raise FuchsiaDeviceError("Configuration should be a list, abort!")
for index, config in enumerate(configs):
if isinstance(config, str):
configs[index] = {"ip": config}
elif not isinstance(config, dict):
raise FuchsiaDeviceError(
"Fuchsia device config must be either a str or dict. abort! "
f"Invalid element {index} in {configs}"
)
return [FuchsiaDevice(config) for config in configs]
def destroy(fds: list[FuchsiaDevice]) -> None:
for fd in fds:
fd.clean_up()
del fd
def get_info(fds: list[FuchsiaDevice]) -> list[dict[str, str]]:
"""Get information on a list of FuchsiaDevice objects."""
device_info = []
for fd in fds:
info = {"ip": fd.ip}
device_info.append(info)
return device_info
class FuchsiaDevice:
"""Class representing a Fuchsia device.
Each object of this class represents one Fuchsia device in ACTS.
Attributes:
ip: The full address or Fuchsia abstract name to contact the Fuchsia
device at
log: A logger object.
ssh_port: The SSH TCP port number of the Fuchsia device.
sl4f_port: The SL4F HTTP port number of the Fuchsia device.
ssh_config: The ssh_config for connecting to the Fuchsia device.
"""
def __init__(self, fd_conf_data: Mapping[str, Any]) -> None:
self.conf_data = fd_conf_data
config = MapValidator(fd_conf_data)
if "ip" not in fd_conf_data:
raise FuchsiaDeviceError("No IP address specified, abort!")
self.ip = config.get(str, "ip")
self.orig_ip = self.ip
self.sl4f_port = config.get(int, "sl4f_port", 80)
self.ssh_username = config.get(str, "ssh_username", DEFAULT_SSH_USER)
self.ssh_port = config.get(int, "ssh_port", DEFAULT_SSH_PORT)
self.ssh_binary_path = config.get(str, "ssh_binary_path", "ssh")
def expand(path: str) -> str:
return os.path.expandvars(os.path.expanduser(path))
def path_from_config(name: str, default: str | None = None) -> str | None:
path = config.get(str, name, default)
return None if path is None else expand(path)
def assert_exists(name: str, path: str | None) -> None:
if path is None:
raise FuchsiaDeviceError(
f'Please specify "${name}" in your configuration file'
)
if not os.path.exists(path):
raise FuchsiaDeviceError(
f'Please specify a correct "${name}" in your configuration '
f'file: "{path}" does not exist'
)
self.specific_image: str | None = path_from_config("specific_image")
if self.specific_image:
assert_exists("specific_image", self.specific_image)
# Path to a tar.gz archive with pm and amber-files, as necessary for
# starting a package server.
self.packages_archive_path: str | None = path_from_config(
"packages_archive_path"
)
if self.packages_archive_path:
assert_exists("packages_archive_path", self.packages_archive_path)
def required_path_from_config(name: str, default: str | None = None) -> str:
path = path_from_config(name, default)
if path is None:
raise FuchsiaConfigError(f"{name} is a required config field")
assert_exists(name, path)
return path
self.ssh_priv_key: str = required_path_from_config(
"ssh_priv_key", DEFAULT_SSH_PRIVATE_KEY
)
self.ffx_binary_path: str = required_path_from_config(
"ffx_binary_path", "${FUCHSIA_DIR}/.jiri_root/bin/ffx"
)
self.ffx_subtools_search_path: str | None = path_from_config(
"ffx_subtools_search_path"
)
self.authorized_file = config.get(str, "authorized_file_loc", None)
self.serial_number = config.get(str, "serial_number", None)
self.device_type = config.get(str, "device_type", None)
self.product_type = config.get(str, "product_type", None)
self.board_type = config.get(str, "board_type", None)
self.build_number = config.get(str, "build_number", None)
self.build_type = config.get(str, "build_type", None)
self.mdns_name = config.get(str, "mdns_name", None)
self.enable_honeydew = config.get(bool, "enable_honeydew", False)
self.hard_reboot_on_fail = config.get(bool, "hard_reboot_on_fail", False)
self.take_bug_report_on_fail = config.get(
bool, "take_bug_report_on_fail", False
)
self.device_pdu_config = fd_conf_data.get("PduDevice", None)
self.config_country_code = config.get(
str, "country_code", FUCHSIA_DEFAULT_COUNTRY_CODE_US
).upper()
output_path = context.get_current_context().get_base_output_path()
self.ssh_config = os.path.join(output_path, f"ssh_config_{self.ip}")
self._generate_ssh_config(self.ssh_config)
# WLAN interface info is populated inside configure_wlan
self.wlan_client_interfaces: dict[str, Any] = {}
self.wlan_ap_interfaces: dict[str, Any] = {}
self.wlan_client_test_interface_name = config.get(
str, "wlan_client_test_interface", None
)
self.wlan_ap_test_interface_name = config.get(
str, "wlan_ap_test_interface", None
)
self.wlan_features: list[str] = fd_conf_data.get("wlan_features", [])
# Whether to use 'policy' or 'drivers' for WLAN connect/disconnect calls
# If set to None, wlan is not configured.
self.association_mechanism: str | None = None
# Defaults to policy layer, unless otherwise specified in the config
self.default_association_mechanism = config.get(
str, "association_mechanism", "policy"
)
# Whether to clear and preserve existing saved networks and client
# connections state, to be restored at device teardown.
self.default_preserve_saved_networks = config.get(
bool, "preserve_saved_networks", True
)
if not utils.is_valid_ipv4_address(self.ip) and not utils.is_valid_ipv6_address(
self.ip
):
mdns_ip = None
for retry_counter in range(MDNS_LOOKUP_RETRY_MAX):
mdns_ip = get_fuchsia_mdns_ipv6_address(self.ip)
if mdns_ip:
break
else:
time.sleep(1)
if mdns_ip and utils.is_valid_ipv6_address(mdns_ip):
# self.ip was actually an mdns name. Use it for self.mdns_name
# unless one was explicitly provided.
self.mdns_name = self.mdns_name or self.ip
self.ip = mdns_ip
else:
raise ValueError(f"Invalid IP: {self.ip}")
self.log = acts_logger.create_tagged_trace_logger(
f"FuchsiaDevice | {self.orig_ip}"
)
self.ping_rtt_match = re.compile(
r"RTT Min/Max/Avg = \[ ([0-9.]+) / ([0-9.]+) / ([0-9.]+) \] ms"
)
self.serial = re.sub("[.:%]", "_", self.ip)
self.package_server: PackageServer | None = None
# Create honeydew fuchsia_device if the flag "use_honeydew" is True.
self.honeydew_fd: HdFuchsiaDevice | None = None
if self.enable_honeydew:
if not self.mdns_name:
raise FuchsiaConfigError(
'Must provide "mdns_name: <device mDNS name>" in the device config '
"if use_honeydew is True"
)
hd_ffx_config = FFXConfig(
binary_path=self.ffx_binary_path,
isolate_dir=fuchsia_controller.IsolateDir(None),
# TODO(http://b/324454126): Remove type ignore
logs_dir=f"{logging.log_path}/ffx/", # type: ignore[attr-defined]
logs_level="None",
mdns_enabled=False,
subtools_search_path=None,
)
self.honeydew_fd = honeydew.create_device(
device_name=self.mdns_name,
transport=TRANSPORT.FUCHSIA_CONTROLLER_PREFERRED,
ffx_config=hd_ffx_config,
ssh_private_key=self.ssh_priv_key,
)
@cached_property
def sl4f(self) -> SL4F:
"""Get the sl4f module configured for this device."""
self.log.info("Started SL4F server")
return SL4F(self.ssh, self.sl4f_port, self.honeydew_fd)
@cached_property
def ssh(self) -> FuchsiaSSHProvider:
"""Get the SSH provider module configured for this device."""
if not self.ssh_port:
raise FuchsiaConfigError(
'Must provide "ssh_port: <int>" in the device config'
)
if not self.ssh_priv_key:
raise FuchsiaConfigError(
'Must provide "ssh_priv_key: <file path>" in the device config'
)
return FuchsiaSSHProvider(
SSHConfig(
self.ssh_username,
self.ip,
self.ssh_priv_key,
port=self.ssh_port,
ssh_binary=self.ssh_binary_path,
)
)
@cached_property
def ffx(self) -> FFX:
"""Get the ffx module configured for this device.
The ffx module uses lazy-initialization; it will initialize an ffx
connection to the device when it is required.
If ffx needs to be reinitialized, delete the "ffx" property and attempt
access again. Note re-initialization will interrupt any running ffx
calls.
"""
if not self.mdns_name:
raise FuchsiaConfigError(
'Must provide "mdns_name: <device mDNS name>" in the device config'
)
return FFX(
self.ffx_binary_path,
self.mdns_name,
self.ip,
self.ssh_priv_key,
self.ffx_subtools_search_path,
)
@ffx.deleter
# TODO(https://github.com/python/mypy/issues/11008): Rename to ffx
def ffx_deleter(self, ffx: FFX) -> None:
self.log.debug("Cleaning up ffx")
ffx.clean_up()
@cached_property
def wlan_policy_controller(self) -> WlanPolicyController:
return WlanPolicyController(self.sl4f, self.ssh)
@cached_property
def wlan_controller(self) -> WlanController:
return WlanController(self.sl4f)
def _generate_ssh_config(self, file_path: str) -> None:
"""Generate and write an SSH config for Fuchsia to disk.
Args:
file_path: Path to write the generated SSH config
"""
content = textwrap.dedent(
f"""\
Host *
CheckHostIP no
StrictHostKeyChecking no
ForwardAgent no
ForwardX11 no
GSSAPIDelegateCredentials no
UserKnownHostsFile /dev/null
User fuchsia
IdentitiesOnly yes
IdentityFile {self.ssh_priv_key}
ControlPersist yes
ControlMaster auto
ControlPath /tmp/fuchsia--%r@%h:%p
ServerAliveInterval 1
ServerAliveCountMax 1
LogLevel ERROR
"""
)
with open(file_path, "w", encoding="utf-8") as file:
file.write(content)
def start_package_server(self) -> None:
if not self.packages_archive_path:
self.log.warn(
"packages_archive_path is not specified. "
"Assuming a package server is already running and configured on "
"the DUT. If this is not the case, either run your own package "
"server, or configure these fields appropriately. "
"This is usually required for the Fuchsia iPerf3 client or "
"other testing utilities not on device cache."
)
return
if self.package_server:
self.log.warn(
"Skipping to start the package server since is already running"
)
return
self.package_server = PackageServer(self.packages_archive_path)
self.package_server.start()
self.package_server.configure_device(self.ssh)
def update_wlan_interfaces(self) -> None:
"""Retrieves WLAN interfaces from device and sets the FuchsiaDevice
attributes.
"""
wlan_interfaces = self.wlan_controller.get_interfaces_by_role()
self.wlan_client_interfaces = wlan_interfaces.client
self.wlan_ap_interfaces = wlan_interfaces.ap
# Set test interfaces to value from config, else the first found
# interface, else None
if self.wlan_client_test_interface_name is None:
self.wlan_client_test_interface_name = next(
iter(self.wlan_client_interfaces), None
)
if self.wlan_ap_test_interface_name is None:
self.wlan_ap_test_interface_name = next(iter(self.wlan_ap_interfaces), None)
def configure_wlan(
self,
association_mechanism: str | None = None,
preserve_saved_networks: bool | None = None,
) -> None:
"""
Readies device for WLAN functionality. If applicable, connects to the
policy layer and clears/saves preexisting saved networks.
Args:
association_mechanism: either 'policy' or 'drivers'. If None, uses
the default value from init (can be set by ACTS config)
preserve_saved_networks: whether to clear existing saved
networks, and preserve them for restoration later. If None, uses
the default value from init (can be set by ACTS config)
Raises:
FuchsiaDeviceError, if configuration fails
"""
self.wlan_controller.set_country_code(CountryCode(self.config_country_code))
# If args aren't provided, use the defaults, which can be set in the
# config.
if association_mechanism is None:
association_mechanism = self.default_association_mechanism
if preserve_saved_networks is None:
preserve_saved_networks = self.default_preserve_saved_networks
if association_mechanism not in {None, "policy", "drivers"}:
raise FuchsiaDeviceError(
f"Invalid FuchsiaDevice association_mechanism: {association_mechanism}"
)
# Allows for wlan to be set up differently in different tests
if self.association_mechanism:
self.log.info("Deconfiguring WLAN")
self.deconfigure_wlan()
self.association_mechanism = association_mechanism
self.log.info(
"Configuring WLAN w/ association mechanism: " f"{association_mechanism}"
)
if association_mechanism == "drivers":
self.log.warn(
"You may encounter unusual device behavior when using the "
"drivers directly for WLAN. This should be reserved for "
"debugging specific issues. Normal test runs should use the "
"policy layer."
)
if preserve_saved_networks:
self.log.warn(
"Unable to preserve saved networks when using drivers "
"association mechanism (requires policy layer control)."
)
else:
# This requires SL4F calls, so it can only happen with actual
# devices, not with unit tests.
self.wlan_policy_controller.configure_wlan(preserve_saved_networks)
# Retrieve WLAN client and AP interfaces
self.update_wlan_interfaces()
def deconfigure_wlan(self) -> None:
"""
Stops WLAN functionality (if it has been started). Used to allow
different tests to use WLAN differently (e.g. some tests require using
wlan policy, while the abstract wlan_device can be setup to use policy
or drivers)
Raises:
FuchsiaDeviveError, if deconfigure fails.
"""
if not self.association_mechanism:
self.log.debug("WLAN not configured before deconfigure was called.")
return
# If using policy, stop client connections. Otherwise, just clear
# variables.
if self.association_mechanism != "drivers":
self.wlan_policy_controller._deconfigure_wlan()
self.association_mechanism = None
def reboot(
self,
use_ssh: bool = False,
unreachable_timeout: int = FUCHSIA_DEFAULT_CONNECT_TIMEOUT,
reboot_type: str = FUCHSIA_REBOOT_TYPE_SOFT,
testbed_pdus: list[pdu.PduDevice] | None = None,
) -> None:
"""Reboot a FuchsiaDevice.
Soft reboots the device, verifies it becomes unreachable, then verifies
it comes back online. Re-initializes services so the tests can continue.
Args:
use_ssh: if True, use fuchsia shell command via ssh to reboot
instead of SL4F.
unreachable_timeout: time to wait for device to become unreachable.
reboot_type: 'soft', 'hard' or 'flash'.
testbed_pdus: all testbed PDUs.
Raises:
ConnectionError, if device fails to become unreachable or fails to
come back up.
"""
if reboot_type == FUCHSIA_REBOOT_TYPE_SOFT:
if use_ssh:
self.log.info("Soft rebooting via SSH")
try:
self.ssh.run(
"dm reboot", timeout_sec=FUCHSIA_RECONNECT_AFTER_REBOOT_TIME
)
except SSHError as e:
if "closed by remote host" not in e.result.stderr:
raise e
else:
self.log.info("Soft rebooting via SL4F")
self.sl4f.hardware_power_statecontrol_lib.suspendReboot(timeout=3)
self.ssh.wait_until_unreachable(timeout_sec=unreachable_timeout)
elif reboot_type == FUCHSIA_REBOOT_TYPE_HARD:
self.log.info("Hard rebooting via PDU")
if not testbed_pdus:
raise AttributeError(
"Testbed PDUs must be supplied " "to hard reboot a fuchsia_device."
)
device_pdu, device_pdu_port = pdu.get_pdu_port_for_device(
self.device_pdu_config, testbed_pdus
)
self.log.info("Killing power to FuchsiaDevice")
device_pdu.off(device_pdu_port)
self.ssh.wait_until_unreachable(timeout_sec=unreachable_timeout)
self.log.info("Restoring power to FuchsiaDevice")
device_pdu.on(device_pdu_port)
elif reboot_type == FUCHSIA_REBOOT_TYPE_SOFT_AND_FLASH:
flash(self, use_ssh, FUCHSIA_RECONNECT_AFTER_REBOOT_TIME)
else:
raise ValueError(f"Invalid reboot type: {reboot_type}")
# Cleanup services
self.stop_services()
# TODO(http://b/246852449): Move configure_wlan to other controllers.
# If wlan was configured before reboot, it must be configured again
# after rebooting, as it was before reboot. No preserving should occur.
if self.association_mechanism:
pre_reboot_association_mechanism = self.association_mechanism
# Prevent configure_wlan from thinking it needs to deconfigure first
self.association_mechanism = None
self.configure_wlan(
association_mechanism=pre_reboot_association_mechanism,
preserve_saved_networks=False,
)
self.log.info("Device has rebooted")
def version(self) -> str:
"""Return the version of Fuchsia running on the device."""
resp = self.sl4f.device_lib.get_version()
return MapValidator(resp).get(str, "result")
def device_name(self) -> str:
"""Return the name of the device."""
resp = self.sl4f.device_lib.get_device_name()
return MapValidator(resp).get(str, "result")
def product_name(self) -> str:
"""Return the product name of the device."""
resp = self.sl4f.device_lib.get_product_name()
return MapValidator(resp).get(str, "result")
def ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: str | None = None,
) -> PingResult:
"""Pings from a Fuchsia device to an IPv4 address or hostname
Args:
dest_ip: (str) The ip or hostname to ping.
count: (int) How many icmp packets to send.
interval: (int) How long to wait between pings (ms)
timeout: (int) How long to wait before having the icmp packet
timeout (ms).
size: (int) Size of the icmp packet.
additional_ping_params: (str) command option flags to
append to the command string
Returns:
A dictionary for the results of the ping. The dictionary contains
the following items:
status: Whether the ping was successful.
rtt_min: The minimum round trip time of the ping.
rtt_max: The minimum round trip time of the ping.
rtt_avg: The avg round trip time of the ping.
stdout: The standard out of the ping command.
stderr: The standard error of the ping command.
"""
self.log.debug(f"Pinging {dest_ip}...")
if not additional_ping_params:
additional_ping_params = ""
try:
ping_result = self.ssh.run(
f"ping -c {count} -i {interval} -t {timeout} -s {size} "
f"{additional_ping_params} {dest_ip}"
)
except SSHError as e:
ping_result = e.result
rtt_stats: re.Match[str] | None = None
if not ping_result.stderr:
rtt_lines = ping_result.stdout.split("\n")[:-1]
rtt_line = rtt_lines[-1]
rtt_stats = re.search(self.ping_rtt_match, rtt_line)
if rtt_stats is None:
raise FuchsiaDeviceError(f'Unable to parse ping output: "{rtt_line}"')
return PingResult(
exit_status=ping_result.exit_status,
stdout=ping_result.stdout,
stderr=ping_result.stderr,
transmitted=None,
received=None,
time_ms=None,
rtt_min_ms=float(rtt_stats.group(1)) if rtt_stats else None,
rtt_avg_ms=float(rtt_stats.group(3)) if rtt_stats else None,
rtt_max_ms=float(rtt_stats.group(2)) if rtt_stats else None,
rtt_mdev_ms=None,
)
def can_ping(
self,
dest_ip: str,
count: int = 1,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: str | None = None,
) -> bool:
"""Returns whether fuchsia device can ping a given dest address"""
ping_result = self.ping(
dest_ip,
count=count,
interval=interval,
timeout=timeout,
size=size,
additional_ping_params=additional_ping_params,
)
return ping_result.exit_status == 0
def clean_up(self) -> None:
"""Cleans up the FuchsiaDevice object, releases any resources it
claimed, and restores saved networks if applicable. For reboots, use
clean_up_services only.
Note: Any exceptions thrown in this method must be caught and handled,
ensuring that clean_up_services is run. Otherwise, the syslog listening
thread will never join and will leave tests hanging.
"""
# If and only if wlan is configured, and using the policy layer
if self.association_mechanism == "policy":
try:
self.wlan_policy_controller.clean_up()
except Exception as err:
self.log.warning(f"Unable to clean up WLAN Policy layer: {err}")
self.stop_services()
if self.package_server:
self.package_server.clean_up()
def get_interface_ip_addresses(self, interface: str) -> dict[str, list[str]]:
return get_interface_ip_addresses(self, interface)
def wait_for_ipv4_addr(self, interface: str) -> None:
"""Checks if device has an ipv4 private address. Sleeps 1 second between
retries.
Args:
interface: name of interface from which to get ipv4 address.
Raises:
ConnectionError, if device does not have an ipv4 address after all
timeout.
"""
self.log.info(
f"Checking for valid ipv4 addr. Retry {IP_ADDRESS_TIMEOUT} seconds."
)
timeout = time.time() + IP_ADDRESS_TIMEOUT
while time.time() < timeout:
ip_addrs = self.get_interface_ip_addresses(interface)
if len(ip_addrs["ipv4_private"]) > 0:
self.log.info(
"Device has an ipv4 address: " f"{ip_addrs['ipv4_private'][0]}"
)
break
else:
self.log.debug(
"Device does not yet have an ipv4 address...retrying in 1 "
"second."
)
time.sleep(1)
else:
raise ConnectionError("Device failed to get an ipv4 address.")
def wait_for_ipv6_addr(self, interface: str) -> None:
"""Checks if device has an ipv6 private local address. Sleeps 1 second
between retries.
Args:
interface: name of interface from which to get ipv6 address.
Raises:
ConnectionError, if device does not have an ipv6 address after all
timeout.
"""
self.log.info(
f"Checking for valid ipv6 addr. Retry {IP_ADDRESS_TIMEOUT} seconds."
)
timeout = time.time() + IP_ADDRESS_TIMEOUT
while time.time() < timeout:
ip_addrs = self.get_interface_ip_addresses(interface)
if len(ip_addrs["ipv6_private_local"]) > 0:
self.log.info(
"Device has an ipv6 private local address: "
f"{ip_addrs['ipv6_private_local'][0]}"
)
break
else:
self.log.debug(
"Device does not yet have an ipv6 address...retrying in 1 "
"second."
)
time.sleep(1)
else:
raise ConnectionError("Device failed to get an ipv6 address.")
def stop_services(self) -> None:
"""Stops all host-side clients to the Fuchsia device.
This is necessary whenever the device's state is unknown. These cases can be
found after device reboots, for example.
"""
self.log.info("Stopping host device services.")
del self.wlan_policy_controller
del self.wlan_controller
del self.sl4f
del self.ssh
del self.ffx
def take_bug_report(self) -> None:
"""Takes a bug report on the device and stores it in a file."""
self.log.info(f"Taking snapshot of {self.mdns_name}")
time_stamp = acts_logger.normalize_log_line_timestamp(
acts_logger.epoch_to_log_line_timestamp(utils.get_current_epoch_time())
)
out_dir = context.get_current_context().get_full_output_path()
out_path = os.path.join(out_dir, f"{self.mdns_name}_{time_stamp}.zip")
try:
with open(out_path, "wb") as file:
bytes = self.ssh.run("snapshot").stdout_bytes
file.write(bytes)
self.log.info(f"Snapshot saved to {out_path}")
except Exception as err:
self.log.error(f"Failed to take snapshot: {err}")
def take_bt_snoop_log(self, custom_name: str | None = None) -> None:
"""Takes a the bt-snoop log from the device and stores it in a file
in a pcap format.
"""
bt_snoop_path = context.get_current_context().get_full_output_path()
time_stamp = acts_logger.normalize_log_line_timestamp(
acts_logger.epoch_to_log_line_timestamp(time.time())
)
out_name = "FuchsiaDevice%s_%s" % (
self.serial,
time_stamp.replace(" ", "_").replace(":", "-"),
)
out_name = f"{out_name}.pcap"
if custom_name:
out_name = f"{self.serial}_{custom_name}.pcap"
else:
out_name = f"{out_name}.pcap"
full_out_path = os.path.join(bt_snoop_path, out_name)
with open(full_out_path, "wb") as file:
bytes = self.ssh.run("bt-snoop-cli -d -f pcap").stdout_bytes
file.write(bytes)