| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from __future__ import annotations |
| |
| import enum |
| from typing import Protocol, runtime_checkable |
| |
| from honeydew.typing.wlan import ( |
| ClientStatusConnected, |
| ClientStatusConnecting, |
| ClientStatusIdle, |
| ) |
| from mobly.records import TestResultRecord |
| |
| from antlion.controllers import iperf_client |
| from antlion.controllers.android_device import AndroidDevice |
| from antlion.controllers.ap_lib.hostapd_security import SecurityMode |
| from antlion.controllers.fuchsia_device import FuchsiaDevice |
| from antlion.controllers.fuchsia_lib.lib_controllers.wlan_policy_controller import ( |
| WlanPolicyControllerError, |
| ) |
| from antlion.controllers.iperf_client import IPerfClientBase |
| from antlion.controllers.pdu import PduDevice |
| from antlion.test_utils.wifi import wifi_test_utils as awutils |
| from antlion.utils import PingResult, adb_shell_ping |
| |
# Lower-case security type names.
# NOTE(review): presumably the set of security types Fuchsia accepts; it is not
# referenced in the visible part of this module, so confirm against importers
# before changing it.
FUCHSIA_VALID_SECURITY_TYPES = {
    "none",
    "wep",
    "wpa",
    "wpa2",
    "wpa3",
}
| |
| |
@runtime_checkable
class SupportsWLAN(Protocol):
    """A generic WLAN device.

    Structural interface shared by the device-specific WLAN wrappers. The
    protocol is `runtime_checkable`, so it can be used with `isinstance()`;
    such checks only verify that the members exist, not their signatures.
    """

    @property
    def identifier(self) -> str:
        """Unique identifier for this device."""
        ...

    def take_bug_report(self, record: TestResultRecord) -> None:
        """Take a bug report on the device and store it on the host.

        Will store the bug report in the output directory for the currently
        running test, as specified by `record`.

        Args:
            record: Information about the current running test.
        """
        ...

    def associate(
        self,
        target_ssid: str,
        target_pwd: str | None = None,
        key_mgmt: str | None = None,
        check_connectivity: bool = True,
        hidden: bool = False,
        target_security: SecurityMode = SecurityMode.OPEN,
    ) -> bool:
        """Associate to a target network.

        Args:
            target_ssid: SSID to associate to.
            target_pwd: Password for the SSID, if necessary.
            key_mgmt: The hostapd wpa_key_mgmt, if specified.
            check_connectivity: Whether to check for internet connectivity.
            hidden: Whether the network is hidden.
            target_security: Target security for network, used to
                save the network in policy connects (see wlan_policy_lib).

        Returns:
            True if successfully connected to WLAN, False if not.
        """
        ...

    def disconnect(self) -> None:
        """Disconnect from all WLAN networks."""
        ...

    def get_default_wlan_test_interface(self) -> str:
        """Name of default WLAN interface to use for testing."""
        ...

    def is_connected(self, ssid: str | None = None) -> bool:
        """Determine whether the device is connected to a WLAN network.

        Args:
            ssid: If specified, check whether the device is connected to this
                specific network.

        Returns:
            True if connected to the requested network or, when `ssid` is not
            specified, connected to any network; otherwise, False.
        """
        ...

    def can_ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: str | None = None,
    ) -> bool:
        """Ping from the device to an IP address or hostname.

        Args:
            dest_ip: IP or hostname to ping.
            count: How many ICMP packets to send.
            interval: Milliseconds to wait between pings.
            timeout: Milliseconds to wait before having the ICMP packet
                timeout.
            size: Size of the ICMP packet in bytes.
            additional_ping_params: Command option flags to append to the
                command string.

        Returns:
            True if the ping was successful; otherwise, False.
        """
        ...

    def create_iperf_client(self, test_interface: str | None = None) -> IPerfClientBase:
        """Create an iPerf3 client on this device.

        Args:
            test_interface: Name of test interface. Defaults to first found
                wlan client interface.

        Returns:
            The iPerf client object.
        """
        ...

    def get_wlan_interface_id_list(self) -> list[int]:
        """List available WLAN interfaces.

        Returns:
            A list of wlan interface IDs.
        """
        ...

    def destroy_wlan_interface(self, iface_id: int) -> None:
        """Destroy the specified WLAN interface.

        Args:
            iface_id: ID of the interface to destroy.
        """
        ...

    def ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: str | None = None,
    ) -> PingResult:
        """Ping from the device to an IP address or hostname.

        Args:
            dest_ip: IP or hostname to ping.
            count: How many ICMP packets to send.
            interval: Milliseconds to wait between pings.
            timeout: Milliseconds to wait before having the ICMP packet
                timeout.
            size: Size of the ICMP packet in bytes.
            additional_ping_params: Command option flags to append to the
                command string.

        Returns:
            The results of the ping.

            NOTE(review): the items below were documented as dictionary keys,
            but the annotated return type is `PingResult`; confirm that they
            match its attributes.
                status: Whether the ping was successful.
                rtt_min: The minimum round trip time of the ping.
                rtt_max: The maximum round trip time of the ping.
                rtt_avg: The avg round trip time of the ping.
                stdout: The standard out of the ping command.
                stderr: The standard error of the ping command.
        """
        ...

    def hard_power_cycle(self, pdus: list[PduDevice]) -> None:
        """Reboot a device abruptly without notification.

        Args:
            pdus: All testbed PDUs.
        """
        ...

    def feature_is_present(self, feature: str) -> bool:
        """Check if a WLAN feature is present.

        Args:
            feature: WLAN feature to query.

        Returns:
            True if `feature` is present; otherwise, False.
        """
        ...

    def wifi_toggle_state(self, state: bool | None) -> None:
        """Toggle the state of Wi-Fi.

        Args:
            state: Wi-Fi state to set to. If None, opposite of the current
                state.
        """
        ...

    def reset_wifi(self) -> None:
        """Clear all saved Wi-Fi networks on a device.

        This will turn Wi-Fi on.
        """
        ...

    def turn_location_off_and_scan_toggle_off(self) -> None:
        """Turn off Wi-Fi location scans."""
        ...
| |
| |
class AndroidWlanDevice(SupportsWLAN):
    """Android device that supports WLAN.

    Adapter that maps the SupportsWLAN interface onto an AndroidDevice, mostly
    by delegating to the `wifi_test_utils` helpers. Operations that have no
    Android implementation here raise NotImplementedError.
    """

    def __init__(self, android_device: AndroidDevice) -> None:
        """Wrap an Android device.

        Args:
            android_device: Device that every call is delegated to.
        """
        self.device = android_device

    @property
    def identifier(self) -> str:
        """Serial of the wrapped Android device."""
        return self.device.serial

    def wifi_toggle_state(self, state: bool | None) -> None:
        """Toggle Wi-Fi via `wifi_test_utils.wifi_toggle_state`.

        Args:
            state: Wi-Fi state to set; forwarded unchanged to the helper.
        """
        awutils.wifi_toggle_state(self.device, state)

    def reset_wifi(self) -> None:
        """Reset Wi-Fi via `wifi_test_utils.reset_wifi`."""
        awutils.reset_wifi(self.device)

    def take_bug_report(self, record: TestResultRecord) -> None:
        """Take a bug report for the test described by `record`.

        Args:
            record: Current test record; its `test_name` and `begin_time` are
                passed to the device's bug report call.
        """
        self.device.take_bug_report(record.test_name, record.begin_time)

    def turn_location_off_and_scan_toggle_off(self) -> None:
        """Delegate to `wifi_test_utils.turn_location_off_and_scan_toggle_off`."""
        awutils.turn_location_off_and_scan_toggle_off(self.device)

    def associate(
        self,
        target_ssid: str,
        target_pwd: str | None = None,
        key_mgmt: str | None = None,
        check_connectivity: bool = True,
        hidden: bool = False,
        target_security: SecurityMode = SecurityMode.OPEN,
    ) -> bool:
        """Connect the Android device to a Wi-Fi network.

        Args:
            target_ssid: SSID to associate to.
            target_pwd: Password for the SSID; omitted from the network
                config when empty or None.
            key_mgmt: Stored under the "security" key of the network config
                when provided.
            check_connectivity: Forwarded to `connect_to_wifi_network`.
            hidden: Whether the network is hidden.
            target_security: Accepted for interface compatibility; not used
                by this implementation.

        Returns:
            True if `connect_to_wifi_network` completed without raising;
            otherwise, False.
        """
        network = {"SSID": target_ssid, "hiddenSSID": hidden}
        if target_pwd:
            network["password"] = target_pwd
        if key_mgmt:
            network["security"] = key_mgmt

        # Deliberately best-effort: any failure is logged and reported to the
        # caller as False rather than propagated.
        try:
            awutils.connect_to_wifi_network(
                self.device,
                network,
                check_connectivity=check_connectivity,
                hidden=hidden,
            )
        except Exception as e:
            self.device.log.info(f"Failed to associated ({e})")
            return False
        return True

    def disconnect(self) -> None:
        """Invoke `turn_location_off_and_scan_toggle_off` on the device.

        NOTE(review): this makes the same helper call as
        `turn_location_off_and_scan_toggle_off()` above and does not visibly
        disconnect from a network; confirm this is the intended behavior.
        """
        awutils.turn_location_off_and_scan_toggle_off(self.device)

    def get_wlan_interface_id_list(self) -> list[int]:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("get_wlan_interface_id_list is not implemented")

    def get_default_wlan_test_interface(self) -> str:
        """Return the fixed interface name "wlan0"."""
        return "wlan0"

    def destroy_wlan_interface(self, iface_id: int) -> None:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("destroy_wlan_interface is not implemented")

    def is_connected(self, ssid: str | None = None) -> bool:
        """Determine whether the device is connected to a WLAN network.

        The device counts as connected when the connection info reported by
        `wifiGetConnectionInfo` contains a "BSSID" entry.

        Args:
            ssid: If given (and non-empty), additionally require the reported
                "SSID" to equal this value.

        Returns:
            True if connected (to `ssid`, when specified); otherwise, False.
        """
        wifi_info = self.device.droid.wifiGetConnectionInfo()
        if "BSSID" not in wifi_info:
            return False
        if not ssid:
            return True
        # Use .get so that connection info lacking an "SSID" entry is reported
        # as "not connected to ssid" instead of raising KeyError.
        return wifi_info.get("SSID") == ssid

    def can_ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: str | None = None,
    ) -> bool:
        """Ping `dest_ip` from the device using `adb_shell_ping`.

        Only `count` and `timeout` are forwarded; `interval`, `size` and
        `additional_ping_params` are accepted for interface compatibility and
        ignored.

        NOTE(review): `timeout` is forwarded unchanged; confirm that
        `adb_shell_ping` expects the same unit as the SupportsWLAN contract.

        Args:
            dest_ip: IP or hostname to ping.
            count: How many ICMP packets to send.
            interval: Ignored.
            timeout: Timeout value passed to `adb_shell_ping`.
            size: Ignored.
            additional_ping_params: Ignored.

        Returns:
            The result of `adb_shell_ping`.
        """
        return adb_shell_ping(self.device, dest_ip, count=count, timeout=timeout)

    def ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: str | None = None,
    ) -> PingResult:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("ping is not implemented")

    def hard_power_cycle(self, pdus: list[PduDevice]) -> None:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("hard_power_cycle is not implemented")

    def create_iperf_client(self, test_interface: str | None = None) -> IPerfClientBase:
        """Create an iPerf client that runs over ADB on this device.

        Args:
            test_interface: Name of test interface. Defaults to
                `get_default_wlan_test_interface()`.

        Returns:
            An `IPerfClientOverAdb` bound to this device and interface.
        """
        if not test_interface:
            test_interface = self.get_default_wlan_test_interface()

        return iperf_client.IPerfClientOverAdb(
            android_device_or_serial=self.device, test_interface=test_interface
        )

    def feature_is_present(self, feature: str) -> bool:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("feature_is_present is not implemented")
| |
| |
class AssociationMode(enum.Enum):
    """Selects which FIDL layer performs WLAN association and disconnect."""

    DRIVER = 1
    """Association and disconnect go through the WLAN core FIDLs."""

    POLICY = 2
    """Association and disconnect go through the WLAN policy FIDLs."""
| |
| |
| class FuchsiaWlanDevice(SupportsWLAN): |
| """Fuchsia device that supports WLAN.""" |
| |
| def __init__(self, fuchsia_device: FuchsiaDevice, mode: AssociationMode): |
| self.device = fuchsia_device |
| self.device.configure_wlan() |
| self.association_mode = mode |
| |
| @property |
| def identifier(self) -> str: |
| return self.device.ip |
| |
| def wifi_toggle_state(self, state: bool | None) -> None: |
| pass |
| |
| def reset_wifi(self) -> None: |
| pass |
| |
| def take_bug_report(self, _: TestResultRecord) -> None: |
| self.device.take_bug_report() |
| |
| def turn_location_off_and_scan_toggle_off(self) -> None: |
| pass |
| |
| def associate( |
| self, |
| target_ssid: str, |
| target_pwd: str | None = None, |
| key_mgmt: str | None = None, |
| check_connectivity: bool = True, |
| hidden: bool = False, |
| target_security: SecurityMode = SecurityMode.OPEN, |
| ) -> bool: |
| match self.association_mode: |
| case AssociationMode.DRIVER: |
| ssid_bss_desc_map = self.device.sl4f.wlan_lib.scan_for_bss_info() |
| |
| bss_descs_for_ssid = ssid_bss_desc_map.get(target_ssid, None) |
| if not bss_descs_for_ssid or len(bss_descs_for_ssid) < 1: |
| self.device.log.error( |
| "Scan failed to find a BSS description for target_ssid " |
| f"{target_ssid}" |
| ) |
| return False |
| |
| return self.device.sl4f.wlan_lib.connect( |
| target_ssid, target_pwd, bss_descs_for_ssid[0] |
| ) |
| case AssociationMode.POLICY: |
| try: |
| self.device.wlan_policy_controller.save_and_connect( |
| target_ssid, |
| target_security.fuchsia_security_type(), |
| target_pwd=target_pwd, |
| ) |
| return True |
| except WlanPolicyControllerError as e: |
| self.device.log.error( |
| f"Failed to save and connect to {target_ssid} with " |
| f"error: {e}" |
| ) |
| return False |
| |
| def disconnect(self) -> None: |
| """Function to disconnect from a Fuchsia WLAN device. |
| Asserts if disconnect was not successful. |
| """ |
| match self.association_mode: |
| case AssociationMode.DRIVER: |
| self.device.sl4f.wlan_lib.disconnect() |
| case AssociationMode.POLICY: |
| self.device.sl4f.wlan_policy_lib.remove_all_networks() |
| self.device.wlan_policy_controller.wait_for_no_connections() |
| |
| def can_ping( |
| self, |
| dest_ip: str, |
| count: int = 3, |
| interval: int = 1000, |
| timeout: int = 1000, |
| size: int = 25, |
| additional_ping_params: str | None = None, |
| ) -> bool: |
| return self.device.can_ping( |
| dest_ip, |
| count=count, |
| interval=interval, |
| timeout=timeout, |
| size=size, |
| additional_ping_params=additional_ping_params, |
| ) |
| |
| def ping( |
| self, |
| dest_ip: str, |
| count: int = 3, |
| interval: int = 1000, |
| timeout: int = 1000, |
| size: int = 25, |
| additional_ping_params: str | None = None, |
| ) -> PingResult: |
| return self.device.ping( |
| dest_ip, |
| count=count, |
| interval=interval, |
| timeout=timeout, |
| size=size, |
| additional_ping_params=additional_ping_params, |
| ) |
| |
| def get_wlan_interface_id_list(self) -> list[int]: |
| return self.device.sl4f.wlan_lib.get_iface_id_list() |
| |
| def get_default_wlan_test_interface(self) -> str: |
| if self.device.wlan_client_test_interface_name is None: |
| raise TypeError("Expected wlan_client_test_interface_name to be str") |
| return self.device.wlan_client_test_interface_name |
| |
| def destroy_wlan_interface(self, iface_id: int) -> None: |
| self.device.sl4f.wlan_lib.destroy_iface(iface_id) |
| |
| def is_connected(self, ssid: str | None = None) -> bool: |
| result = self.device.sl4f.wlan_lib.status() |
| match result: |
| case ClientStatusIdle(): |
| self.device.log.info("Client status idle") |
| return False |
| case ClientStatusConnecting(): |
| ssid_bytes = bytearray(result.ssid).decode(encoding="utf-8", errors="replace") |
| self.device.log.info(f"Client status connecting to ssid: {ssid_bytes}") |
| return False |
| case ClientStatusConnected(): |
| ssid_bytes = bytearray(result.ssid).decode(encoding="utf-8", errors="replace") |
| self.device.log.info(f"Client connected to ssid: {ssid_bytes}") |
| return ssid == ssid_bytes |
| case _: |
| raise ValueError( |
| "Status did not return a valid status response: " f"{result}" |
| ) |
| |
| def hard_power_cycle(self, pdus: list[PduDevice]) -> None: |
| self.device.reboot(reboot_type="hard", testbed_pdus=pdus) |
| |
| def create_iperf_client(self, test_interface: str | None = None) -> IPerfClientBase: |
| if not test_interface: |
| test_interface = self.get_default_wlan_test_interface() |
| |
| # A package server is necessary to acquire the iperf3 client for |
| # some builds. |
| self.device.start_package_server() |
| |
| return iperf_client.IPerfClientOverSsh( |
| ssh_provider=self.device.ssh, |
| test_interface=test_interface, |
| ) |
| |
| def feature_is_present(self, feature: str) -> bool: |
| return feature in self.device.wlan_features |
| |
| |
def create_wlan_device(
    hardware_device: FuchsiaDevice | AndroidDevice,
    associate_mode: AssociationMode,
) -> SupportsWLAN:
    """Wrap a hardware device in the WLAN device adapter matching its type.

    Args:
        hardware_device: Device to wrap; a FuchsiaDevice or an AndroidDevice.
        associate_mode: Association mode handed to FuchsiaWlanDevice. Not used
            when `hardware_device` is an AndroidDevice.

    Returns:
        A FuchsiaWlanDevice or AndroidWlanDevice wrapping `hardware_device`.

    Raises:
        ValueError: If `hardware_device` is neither a FuchsiaDevice nor an
            AndroidDevice.
    """
    # Reject unsupported device types up front.
    if not isinstance(hardware_device, (FuchsiaDevice, AndroidDevice)):
        raise ValueError(
            f"Unable to create WLAN device for type {type(hardware_device)}"
        )

    wlan_device: SupportsWLAN
    if isinstance(hardware_device, FuchsiaDevice):
        wlan_device = FuchsiaWlanDevice(hardware_device, associate_mode)
    else:
        wlan_device = AndroidWlanDevice(hardware_device)

    # Sanity check that the wrapper exposes every SupportsWLAN member.
    assert isinstance(wlan_device, SupportsWLAN)
    return wlan_device