| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| from typing import Any, Dict, List, Optional, Protocol, Union, runtime_checkable |
| |
| from mobly.records import TestResultRecord |
| |
| from antlion.controllers import iperf_client |
| from antlion.controllers.android_device import AndroidDevice |
| from antlion.controllers.ap_lib.hostapd_security import SecurityMode |
| from antlion.controllers.fuchsia_device import FuchsiaDevice |
| from antlion.controllers.iperf_client import IPerfClientBase |
| from antlion.controllers.pdu import PduDevice |
| from antlion.test_utils.wifi import wifi_test_utils as awutils |
| from antlion.utils import adb_shell_ping |
| |
# Security type names for Fuchsia devices.
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably consumed by importers of this module -- confirm before removing.
FUCHSIA_VALID_SECURITY_TYPES = {
    "none",
    "wep",
    "wpa",
    "wpa2",
    "wpa3",
}
| |
| |
@runtime_checkable
class SupportsWLAN(Protocol):
    """Structural interface for a device that can act as a WLAN client.

    The protocol is runtime-checkable, so `isinstance(obj, SupportsWLAN)` can be
    used to verify that an object provides these members.
    """

    @property
    def identifier(self) -> str:
        """A string that uniquely identifies this device."""
        ...

    def take_bug_report(self, record: TestResultRecord) -> None:
        """Capture a bug report from the device and save it on the host.

        The report is placed in the output directory of the test that is
        currently running, as described by `record`.

        Args:
            record: Details of the currently running test.
        """
        ...

    def associate(
        self,
        target_ssid: str,
        target_pwd: Optional[str] = None,
        key_mgmt: Optional[str] = None,
        check_connectivity: bool = True,
        hidden: bool = False,
        target_security: SecurityMode = SecurityMode.OPEN,
    ) -> bool:
        """Connect the device to the given network.

        Args:
            target_ssid: SSID of the network to join.
            target_pwd: Password for the network, when one is required.
            key_mgmt: The hostapd wpa_key_mgmt value, when specified.
            check_connectivity: Whether internet connectivity should be
                verified.
            hidden: Whether the network is a hidden network.
            target_security: Security mode of the network; used when saving
                the network for policy-based connections (see
                wlan_policy_lib).

        Returns:
            True when the device connected to the WLAN, False otherwise.
        """
        ...

    def disconnect(self) -> None:
        """Drop the connection to every WLAN network."""
        ...

    def get_default_wlan_test_interface(self) -> str:
        """Return the name of the WLAN interface used for testing by default."""
        ...

    def is_connected(self, ssid: Optional[str] = None) -> bool:
        """Report whether the device is connected to a WLAN network.

        Args:
            ssid: If specified, check whether the device is connected to this
                specific network.

        Returns:
            True if connected to the requested network; otherwise, False.
        """
        ...

    def can_ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: Optional[str] = None,
    ) -> bool:
        """Check whether the device can ping an IP address or hostname.

        Args:
            dest_ip: IP address or hostname to ping.
            count: Number of ICMP packets to send.
            interval: Milliseconds to wait between pings.
            timeout: Milliseconds to wait before an ICMP packet times out.
            size: Size of each ICMP packet, in bytes.
            additional_ping_params: Extra option flags appended to the ping
                command string.

        Returns:
            True if the ping succeeded; otherwise, False.
        """
        ...

    def create_iperf_client(
        self, test_interface: Optional[str] = None
    ) -> IPerfClientBase:
        """Build an iPerf3 client that runs on this device.

        Args:
            test_interface: Name of the interface to test with. Defaults to
                the first WLAN client interface found.

        Returns:
            The iPerf client object.
        """
        ...

    def get_wlan_interface_id_list(self) -> List[str]:
        """Enumerate the WLAN interfaces that are available.

        Returns:
            The IDs of the WLAN interfaces.
        """
        ...

    def destroy_wlan_interface(self, iface_id: str) -> bool:
        """Tear down a single WLAN interface.

        Args:
            iface_id: ID of the interface to destroy.

        Returns:
            True when the interface was destroyed, False otherwise.
        """
        ...

    def ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Ping an IP address or hostname from the device and return details.

        Args:
            dest_ip: IP address or hostname to ping.
            count: Number of ICMP packets to send.
            interval: Milliseconds to wait between pings.
            timeout: Milliseconds to wait before an ICMP packet times out.
            size: Size of each ICMP packet, in bytes.
            additional_ping_params: Extra option flags appended to the ping
                command string.

        Returns:
            A dictionary describing the outcome of the ping, containing:
                status: Whether the ping was successful.
                rtt_min: The minimum round trip time of the ping.
                rtt_max: The maximum round trip time of the ping.
                rtt_avg: The average round trip time of the ping.
                stdout: The standard output of the ping command.
                stderr: The standard error of the ping command.
        """
        ...

    def hard_power_cycle(self, pdus: List[PduDevice]) -> None:
        """Abruptly reboot the device, without notifying it first.

        Args:
            pdus: Every PDU in the testbed.
        """
        ...

    def feature_is_present(self, feature: str) -> bool:
        """Query whether the device has a given WLAN feature.

        Args:
            feature: The WLAN feature to look for.

        Returns:
            True if `feature` is present; otherwise, False.
        """
        ...

    def wifi_toggle_state(self, state: Optional[bool]) -> None:
        """Switch Wi-Fi on or off.

        Args:
            state: The Wi-Fi state to set. When None, the current state is
                inverted.
        """
        ...

    def reset_wifi(self) -> None:
        """Forget every saved Wi-Fi network on the device.

        Wi-Fi is turned on as part of this operation.
        """
        ...

    def turn_location_off_and_scan_toggle_off(self) -> None:
        """Disable Wi-Fi location scanning."""
        ...
| |
| |
class AndroidWlanDevice(SupportsWLAN):
    """Android device that supports WLAN.

    Wraps an `AndroidDevice` and implements `SupportsWLAN`, mostly by delegating
    to the `wifi_test_utils` helpers (imported as `awutils`). Operations that
    have no Android implementation here raise `NotImplementedError`.
    """

    def __init__(self, android_device: AndroidDevice) -> None:
        """Store the Android device that this wrapper operates on.

        Args:
            android_device: The Android device to wrap.
        """
        self.device = android_device

    @property
    def identifier(self) -> str:
        """Serial of the wrapped Android device."""
        return self.device.serial

    def wifi_toggle_state(self, state: Optional[bool]) -> None:
        """Toggle Wi-Fi via `awutils.wifi_toggle_state`.

        Args:
            state: Wi-Fi state forwarded to the helper.
        """
        awutils.wifi_toggle_state(self.device, state)

    def reset_wifi(self) -> None:
        """Reset Wi-Fi on the device via `awutils.reset_wifi`."""
        awutils.reset_wifi(self.device)

    def take_bug_report(self, record: TestResultRecord) -> None:
        """Take a bug report for the test described by `record`.

        Args:
            record: Test record; its `test_name` and `begin_time` are forwarded
                to the device's `take_bug_report`.
        """
        self.device.take_bug_report(record.test_name, record.begin_time)

    def turn_location_off_and_scan_toggle_off(self) -> None:
        """Delegate to `awutils.turn_location_off_and_scan_toggle_off`."""
        awutils.turn_location_off_and_scan_toggle_off(self.device)

    def associate(
        self,
        target_ssid: str,
        target_pwd: Optional[str] = None,
        key_mgmt: Optional[str] = None,
        check_connectivity: bool = True,
        hidden: bool = False,
        target_security: SecurityMode = SecurityMode.OPEN,
    ) -> bool:
        """Associate to a target network.

        Args:
            target_ssid: SSID to associate to.
            target_pwd: Password for the SSID; added to the network config
                only when non-empty.
            key_mgmt: Stored under the network config's "security" key when
                non-empty.
            check_connectivity: Forwarded to `awutils.connect_to_wifi_network`.
            hidden: Whether the network is hidden.
            target_security: Not used by the Android implementation.

        Returns:
            True if `awutils.connect_to_wifi_network` completed without
            raising; False if it raised any exception.
        """
        network: Dict[str, Any] = {"SSID": target_ssid, "hiddenSSID": hidden}
        if target_pwd:
            network["password"] = target_pwd
        if key_mgmt:
            network["security"] = key_mgmt

        try:
            awutils.connect_to_wifi_network(
                self.device,
                network,
                check_connectivity=check_connectivity,
                hidden=hidden,
            )
        except Exception as e:
            # Deliberately broad: this method reports every connection failure
            # through its boolean return value instead of raising.
            self.device.log.info(f"Failed to associate ({e})")
            return False
        return True

    def disconnect(self) -> None:
        """Delegate to `awutils.turn_location_off_and_scan_toggle_off`.

        NOTE(review): this is the same helper used by
        `turn_location_off_and_scan_toggle_off`; no explicit disconnect call is
        made here. Confirm this is the intended way to disconnect on Android.
        """
        awutils.turn_location_off_and_scan_toggle_off(self.device)

    def get_wlan_interface_id_list(self) -> List[str]:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("get_wlan_interface_id_list is not implemented")

    def get_default_wlan_test_interface(self) -> str:
        """Return the default WLAN test interface name, always "wlan0"."""
        return "wlan0"

    def destroy_wlan_interface(self, iface_id: str) -> bool:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("destroy_wlan_interface is not implemented")

    def is_connected(self, ssid: Optional[str] = None) -> bool:
        """Determine whether the device is connected to a WLAN network.

        The device is considered connected when the connection info reported
        by `wifiGetConnectionInfo` contains a "BSSID" entry.

        Args:
            ssid: If given (and non-empty), additionally require that the
                reported "SSID" equals this value.

        Returns:
            True if connected (to `ssid`, when given); otherwise, False.
        """
        wifi_info = self.device.droid.wifiGetConnectionInfo()
        if "BSSID" not in wifi_info:
            return False
        if ssid:
            # Use .get so connection info that carries a BSSID but no SSID is
            # reported as "not connected to ssid" rather than raising KeyError.
            return wifi_info.get("SSID") == ssid
        return True

    def can_ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: Optional[str] = None,
    ) -> bool:
        """Ping `dest_ip` from the device using `adb_shell_ping`.

        Only `dest_ip`, `count` and `timeout` are forwarded; `interval`, `size`
        and `additional_ping_params` are accepted for interface compatibility
        but ignored.

        NOTE(review): `SupportsWLAN.can_ping` documents `timeout` in
        milliseconds and it is passed through unchanged -- confirm which unit
        `adb_shell_ping` expects.

        Args:
            dest_ip: IP or hostname to ping.
            count: How many ICMP packets to send.
            interval: Ignored.
            timeout: Forwarded to `adb_shell_ping` as `timeout`.
            size: Ignored.
            additional_ping_params: Ignored.

        Returns:
            The result of `adb_shell_ping`.
        """
        return adb_shell_ping(
            self.device, dest_ip=dest_ip, count=count, timeout=timeout
        )

    def ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("ping is not implemented")

    def hard_power_cycle(self, pdus: List[PduDevice]) -> None:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("hard_power_cycle is not implemented")

    def create_iperf_client(
        self, test_interface: Optional[str] = None
    ) -> IPerfClientBase:
        """Create an iPerf client that runs over ADB against this device.

        Args:
            test_interface: Name of the test interface. When empty or None,
                the value of `get_default_wlan_test_interface()` is used.

        Returns:
            An `IPerfClientOverAdb` bound to this device and interface.
        """
        if not test_interface:
            test_interface = self.get_default_wlan_test_interface()

        return iperf_client.IPerfClientOverAdb(
            android_device_or_serial=self.device, test_interface=test_interface
        )

    def feature_is_present(self, feature: str) -> bool:
        """Not implemented for Android.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("feature_is_present is not implemented")
| |
| |
class FuchsiaWlanDevice(SupportsWLAN):
    """Fuchsia device that supports WLAN.

    Wraps a `FuchsiaDevice` and implements `SupportsWLAN`. Depending on the
    device's `association_mechanism`, connection management goes either
    through the SL4F WLAN library ("drivers") or through the WLAN policy
    controller (anything else).
    """

    def __init__(self, fuchsia_device: FuchsiaDevice):
        """Store the device and run its WLAN configuration step.

        Args:
            fuchsia_device: The Fuchsia device to wrap.
        """
        self.device = fuchsia_device
        fuchsia_device.configure_wlan()

    @property
    def identifier(self) -> str:
        """IP of the wrapped Fuchsia device."""
        return self.device.ip

    def wifi_toggle_state(self, state: Optional[bool]) -> None:
        """Do nothing; this is a no-op for Fuchsia devices."""

    def reset_wifi(self) -> None:
        """Do nothing; this is a no-op for Fuchsia devices."""

    def take_bug_report(self, record: TestResultRecord) -> None:
        """Take a bug report for the test described by `record`.

        Args:
            record: Test record; its `test_name` and `begin_time` are forwarded
                to the device's `take_bug_report`.
        """
        self.device.take_bug_report(record.test_name, record.begin_time)

    def turn_location_off_and_scan_toggle_off(self) -> None:
        """Do nothing; this is a no-op for Fuchsia devices."""

    def associate(
        self,
        target_ssid: str,
        target_pwd: Optional[str] = None,
        key_mgmt: Optional[str] = None,
        check_connectivity: bool = True,
        hidden: bool = False,
        target_security: SecurityMode = SecurityMode.OPEN,
    ) -> bool:
        """Associate to a target network.

        With the "drivers" association mechanism, a BSS scan is performed and
        the first BSS description found for `target_ssid` is used to connect.
        Otherwise the network is saved and connected through the WLAN policy
        controller.

        Args:
            target_ssid: SSID to associate to.
            target_pwd: Password for the SSID, if necessary.
            key_mgmt: Not used by the Fuchsia implementation.
            check_connectivity: Not used by the Fuchsia implementation.
            hidden: Not used by the Fuchsia implementation.
            target_security: Security mode of the network; only used on the
                policy path, where it is converted with
                `fuchsia_security_type()`.

        Returns:
            On the "drivers" path, False if the scan failed or found no BSS
            for `target_ssid`, otherwise the result of
            `check_connect_response`. On the policy path, the result of
            `save_and_connect`.
        """
        if self.device.association_mechanism != "drivers":
            return self.device.wlan_policy_controller.save_and_connect(
                target_ssid,
                target_security.fuchsia_security_type(),
                password=target_pwd,
            )

        wlan_lib = self.device.sl4f.wlan_lib

        scan = wlan_lib.wlanScanForBSSInfo()
        scan_error = scan.get("error")
        if scan_error:
            logging.error(f"Scan for BSS info failed. Err: {scan_error}")
            return False

        # Scan results are keyed by SSID; a missing or empty entry means the
        # target network was not seen.
        candidates = scan["result"].get(target_ssid)
        if not candidates:
            logging.error(
                "Scan failed to find a BSS description for target_ssid %s"
                % target_ssid
            )
            return False

        connect_result = wlan_lib.wlanConnectToNetwork(
            target_ssid, candidates[0], target_pwd=target_pwd
        )
        return self.device.check_connect_response(connect_result)

    def disconnect(self) -> None:
        """Disconnect the Fuchsia device from WLAN.

        With the "drivers" association mechanism, the SL4F disconnect response
        is handed to the device's `check_disconnect_response`. Otherwise all
        saved networks are removed through the WLAN policy controller, which
        waits for there to be no connections.
        """
        if self.device.association_mechanism != "drivers":
            self.device.wlan_policy_controller.remove_all_networks_and_wait_for_no_connections()
            return

        response = self.device.sl4f.wlan_lib.wlanDisconnect()
        self.device.check_disconnect_response(response)

    def can_ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: Optional[str] = None,
    ) -> bool:
        """Check reachability of `dest_ip` via the device's `can_ping`.

        Every argument is forwarded unchanged to the wrapped device.

        Args:
            dest_ip: IP or hostname to ping.
            count: How many ICMP packets to send.
            interval: Milliseconds to wait between pings.
            timeout: Milliseconds to wait before an ICMP packet times out.
            size: Size of the ICMP packet in bytes.
            additional_ping_params: Option flags to append to the command.

        Returns:
            The result of the wrapped device's `can_ping`.
        """
        device = self.device
        return device.can_ping(
            dest_ip,
            count=count,
            interval=interval,
            timeout=timeout,
            size=size,
            additional_ping_params=additional_ping_params,
        )

    def ping(
        self,
        dest_ip: str,
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
        additional_ping_params: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Ping `dest_ip` via the device's `ping` and return its result.

        Every argument is forwarded unchanged to the wrapped device.

        Args:
            dest_ip: IP or hostname to ping.
            count: How many ICMP packets to send.
            interval: Milliseconds to wait between pings.
            timeout: Milliseconds to wait before an ICMP packet times out.
            size: Size of the ICMP packet in bytes.
            additional_ping_params: Option flags to append to the command.

        Returns:
            The result dictionary of the wrapped device's `ping`.
        """
        device = self.device
        return device.ping(
            dest_ip,
            count=count,
            interval=interval,
            timeout=timeout,
            size=size,
            additional_ping_params=additional_ping_params,
        )

    def get_wlan_interface_id_list(self) -> List[str]:
        """List WLAN interface IDs reported by SL4F.

        Returns:
            The "result" entry of the SL4F response.

        Raises:
            ConnectionError: If the SL4F response carries an error.
        """
        response = self.device.sl4f.wlan_lib.wlanGetIfaceIdList()
        error = response.get("error")
        if error:
            raise ConnectionError(f"Failed to get wlan interface IDs: {error}")
        return response.get("result")

    def get_default_wlan_test_interface(self) -> str:
        """Return the device's `wlan_client_test_interface_name`."""
        return self.device.wlan_client_test_interface_name

    def destroy_wlan_interface(self, iface_id: str) -> bool:
        """Destroy the specified WLAN interface through SL4F.

        Args:
            iface_id: ID of the interface to destroy.

        Returns:
            True if the SL4F response has no error; otherwise False, after
            logging the error.
        """
        response = self.device.sl4f.wlan_lib.wlanDestroyIface(iface_id)
        error = response.get("error")
        if error is not None:
            logging.error(f"Failed to destroy interface with: {error}")
            return False
        return True

    def is_connected(self, ssid: Optional[str] = None) -> bool:
        """Determine whether the device is connected to a WLAN network.

        Args:
            ssid: If given (and non-empty), require that the connected
                network's SSID equals this value.

        Returns:
            True if connected (to `ssid`, when given); otherwise, False.

        Raises:
            ConnectionError: If the SL4F status response carries an error.
        """
        response = self.device.sl4f.wlan_lib.wlanStatus()
        if response.get("error"):
            raise ConnectionError("Failed to get client network connection status")

        status = response.get("result")
        if not isinstance(status, dict):
            return False

        # TODO(https://fxbug.dev/85938): Remove backwards compatibility once
        # ACTS is versioned with Fuchsia.
        # Prefer the "Connected" key and fall back to the older
        # "connected_to" key.
        connected_to = status.get("Connected") or status.get("connected_to")
        if not connected_to:
            return False
        if not ssid:
            return True

        # Replace encoding errors instead of raising an exception. Since
        # `ssid` is a string, this will not affect the test for equality.
        connected_ssid = bytearray(connected_to["ssid"]).decode(
            encoding="utf-8", errors="replace"
        )
        return ssid == connected_ssid

    def hard_power_cycle(self, pdus: List[PduDevice]) -> None:
        """Hard-reboot the device.

        Args:
            pdus: All testbed PDUs; forwarded to the device's `reboot` as
                `testbed_pdus`.
        """
        self.device.reboot(reboot_type="hard", testbed_pdus=pdus)

    def create_iperf_client(
        self, test_interface: Optional[str] = None
    ) -> IPerfClientBase:
        """Create an iPerf client that runs over SSH against this device.

        Args:
            test_interface: Name of the test interface. When empty or None,
                the value of `get_default_wlan_test_interface()` is used.

        Returns:
            An `IPerfClientOverSsh` using the device's SSH provider.
        """
        interface = test_interface or self.get_default_wlan_test_interface()

        # A package server is necessary to acquire the iperf3 client for
        # some builds.
        self.device.start_package_server()

        return iperf_client.IPerfClientOverSsh(
            ssh_provider=self.device.ssh,
            test_interface=interface,
        )

    def feature_is_present(self, feature: str) -> bool:
        """Check whether `feature` is among the device's `wlan_features`.

        Args:
            feature: WLAN feature to query.

        Returns:
            True if `feature` is present; otherwise, False.
        """
        return feature in self.device.wlan_features
| |
| |
def create_wlan_device(
    hardware_device: Union[FuchsiaDevice, AndroidDevice]
) -> SupportsWLAN:
    """Wrap a hardware device in the matching generic WLAN device class.

    Args:
        hardware_device: The device to wrap; either a `FuchsiaDevice` or an
            `AndroidDevice`.

    Returns:
        A `FuchsiaWlanDevice` for Fuchsia devices, or an `AndroidWlanDevice`
        for Android devices.

    Raises:
        ValueError: If `hardware_device` is neither of the supported types.
    """
    # Reject unsupported device types up front.
    if not isinstance(hardware_device, (FuchsiaDevice, AndroidDevice)):
        raise ValueError(
            f"Unable to create WLAN device for type {type(hardware_device)}"
        )

    # Fuchsia takes precedence, matching the order in which the types are
    # checked.
    wlan_device: SupportsWLAN = (
        FuchsiaWlanDevice(hardware_device)
        if isinstance(hardware_device, FuchsiaDevice)
        else AndroidWlanDevice(hardware_device)
    )

    # Sanity check that the wrapper satisfies the runtime-checkable protocol.
    assert isinstance(wlan_device, SupportsWLAN)
    return wlan_device