blob: 4240ca41f70069250ddc83ac4b1607b20b946420 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Protocol, Union, runtime_checkable
from mobly.records import TestResultRecord
from antlion.controllers import iperf_client
from antlion.controllers.android_device import AndroidDevice
from antlion.controllers.ap_lib.hostapd_security import SecurityMode
from antlion.controllers.fuchsia_device import FuchsiaDevice
from antlion.controllers.iperf_client import IPerfClientBase
from antlion.controllers.pdu import PduDevice
from antlion.test_utils.wifi import wifi_test_utils as awutils
from antlion.utils import adb_shell_ping
FUCHSIA_VALID_SECURITY_TYPES = {"none", "wep", "wpa", "wpa2", "wpa3"}
@runtime_checkable
class SupportsWLAN(Protocol):
"""A generic WLAN device."""
@property
def identifier(self) -> str:
"""Unique identifier for this device."""
...
def take_bug_report(self, record: TestResultRecord) -> None:
"""Take a bug report on the device and stores it on the host.
Will store the bug report in the output directory for the currently running
test, as specified by `record`.
Args:
record: Information about the current running test.
"""
...
def associate(
self,
target_ssid: str,
target_pwd: Optional[str] = None,
key_mgmt: Optional[str] = None,
check_connectivity: bool = True,
hidden: bool = False,
target_security: SecurityMode = SecurityMode.OPEN,
) -> bool:
"""Associate to a target network.
Args:
target_ssid: SSID to associate to.
target_pwd: Password for the SSID, if necessary.
key_mgmt: The hostapd wpa_key_mgmt, if specified.
check_connectivity: Whether to check for internet connectivity.
hidden: Whether the network is hidden.
target_security: Target security for network, used to
save the network in policy connects (see wlan_policy_lib)
Returns:
True if successfully connected to WLAN, False if not.
"""
...
def disconnect(self) -> None:
"""Disconnect from all WLAN networks."""
...
def get_default_wlan_test_interface(self) -> str:
"""Name of default WLAN interface to use for testing."""
...
def is_connected(self, ssid: Optional[str] = None) -> bool:
"""Determines if wlan_device is connected to wlan network.
Args:
ssid: If specific, check if device is connect to a specific network.
Returns:
True if connected to requested network; otherwise, False.
"""
...
def can_ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: Optional[str] = None,
) -> bool:
"""Pings from a device to an IP address or hostname
Args:
dest_ip: IP or hostname to ping
count: How many icmp packets to send
interval: Milliseconds to wait between pings
timeout: Milliseconds to wait before having the icmp packet timeout
size: Size of the icmp packet in bytes
additional_ping_params: Command option flags to append to the command string
Returns:
True if the ping was successful; otherwise, False.
"""
...
def create_iperf_client(
self, test_interface: Optional[str] = None
) -> IPerfClientBase:
"""Create an iPerf3 client on this device.
Args:
test_interface: Name of test interface. Defaults to first found wlan client
interface.
Returns:
IPerfClient object
"""
...
def get_wlan_interface_id_list(self) -> List[str]:
"""List available WLAN interfaces.
Returns:
A list of wlan interface IDs.
"""
...
def destroy_wlan_interface(self, iface_id: str) -> bool:
"""Destroy the specified WLAN interface.
Args:
iface_id: ID of the interface to destroy.
Returns:
True if successfully destroyed wlan interface, False if not.
"""
...
def ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: Optional[str] = None,
) -> Dict[str, Any]:
"""Pings from a device to an IP address or hostname
Args:
dest_ip: IP or hostname to ping
count: How many icmp packets to send
interval: Milliseconds to wait between pings
timeout: Milliseconds to wait before having the icmp packet timeout
size: Size of the icmp packet in bytes
additional_ping_params: Command option flags to append to the command string
Returns:
A dictionary for the results of the ping. The dictionary contains
the following items:
status: Whether the ping was successful.
rtt_min: The minimum round trip time of the ping.
rtt_max: The minimum round trip time of the ping.
rtt_avg: The avg round trip time of the ping.
stdout: The standard out of the ping command.
stderr: The standard error of the ping command.
"""
...
def hard_power_cycle(self, pdus: List[PduDevice]) -> None:
"""Reboot a device abruptly without notification.
Args:
pdus: All testbed PDUs
"""
...
def feature_is_present(self, feature: str) -> bool:
"""Check if a WLAN feature is present.
Args:
feature: WLAN feature to query
Returns:
True if `feature` is present; otherwise, False.
"""
...
def wifi_toggle_state(self, state: Optional[bool]) -> None:
"""Toggle the state of Wi-Fi.
Args:
state: Wi-Fi state to set to. If None, opposite of the current state.
"""
...
def reset_wifi(self) -> None:
"""Clears all saved Wi-Fi networks on a device.
This will turn Wi-Fi on.
"""
...
def turn_location_off_and_scan_toggle_off(self) -> None:
"""Turn off Wi-Fi location scans."""
...
class AndroidWlanDevice(SupportsWLAN):
"""Android device that supports WLAN."""
def __init__(self, android_device: AndroidDevice) -> None:
self.device = android_device
@property
def identifier(self) -> str:
return self.device.serial
def wifi_toggle_state(self, state: Optional[bool]) -> None:
awutils.wifi_toggle_state(self.device, state)
def reset_wifi(self) -> None:
awutils.reset_wifi(self.device)
def take_bug_report(self, record: TestResultRecord) -> None:
self.device.take_bug_report(record.test_name, record.begin_time)
def turn_location_off_and_scan_toggle_off(self) -> None:
awutils.turn_location_off_and_scan_toggle_off(self.device)
def associate(
self,
target_ssid: str,
target_pwd: Optional[str] = None,
key_mgmt: Optional[str] = None,
check_connectivity: bool = True,
hidden: bool = False,
target_security: SecurityMode = SecurityMode.OPEN,
) -> bool:
network = {"SSID": target_ssid, "hiddenSSID": hidden}
if target_pwd:
network["password"] = target_pwd
if key_mgmt:
network["security"] = key_mgmt
try:
awutils.connect_to_wifi_network(
self.device,
network,
check_connectivity=check_connectivity,
hidden=hidden,
)
return True
except Exception as e:
self.device.log.info(f"Failed to associated ({e})")
return False
def disconnect(self) -> None:
awutils.turn_location_off_and_scan_toggle_off(self.device)
def get_wlan_interface_id_list(self) -> List[str]:
raise NotImplementedError("get_wlan_interface_id_list is not implemented")
def get_default_wlan_test_interface(self) -> str:
return "wlan0"
def destroy_wlan_interface(self, iface_id: str) -> bool:
raise NotImplementedError("destroy_wlan_interface is not implemented")
def is_connected(self, ssid: Optional[str] = None) -> bool:
wifi_info = self.device.droid.wifiGetConnectionInfo()
if ssid:
return "BSSID" in wifi_info and wifi_info["SSID"] == ssid
return "BSSID" in wifi_info
def can_ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: Optional[str] = None,
) -> bool:
return adb_shell_ping(
self.device, dest_ip=dest_ip, count=count, timeout=timeout
)
def ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError("ping is not implemented")
def hard_power_cycle(self, pdus: List[PduDevice]) -> None:
raise NotImplementedError("hard_power_cycle is not implemented")
def create_iperf_client(
self, test_interface: Optional[str] = None
) -> IPerfClientBase:
if not test_interface:
test_interface = self.get_default_wlan_test_interface()
return iperf_client.IPerfClientOverAdb(
android_device_or_serial=self.device, test_interface=test_interface
)
def feature_is_present(self, feature: str) -> bool:
raise NotImplementedError("feature_is_present is not implemented")
class FuchsiaWlanDevice(SupportsWLAN):
"""Fuchsia device that supports WLAN."""
def __init__(self, fuchsia_device: FuchsiaDevice):
self.device = fuchsia_device
self.device.configure_wlan()
@property
def identifier(self) -> str:
return self.device.ip
def wifi_toggle_state(self, state: Optional[bool]) -> None:
pass
def reset_wifi(self) -> None:
pass
def take_bug_report(self, record: TestResultRecord) -> None:
self.device.take_bug_report(record.test_name, record.begin_time)
def turn_location_off_and_scan_toggle_off(self) -> None:
pass
def associate(
self,
target_ssid: str,
target_pwd: Optional[str] = None,
key_mgmt: Optional[str] = None,
check_connectivity: bool = True,
hidden: bool = False,
target_security: SecurityMode = SecurityMode.OPEN,
) -> bool:
if self.device.association_mechanism == "drivers":
bss_scan_response = self.device.sl4f.wlan_lib.wlanScanForBSSInfo()
if bss_scan_response.get("error"):
logging.error(
f"Scan for BSS info failed. Err: {bss_scan_response['error']}"
)
return False
bss_descs_for_ssid = bss_scan_response["result"].get(target_ssid, None)
if not bss_descs_for_ssid or len(bss_descs_for_ssid) < 1:
logging.error(
"Scan failed to find a BSS description for target_ssid %s"
% target_ssid
)
return False
connection_response = self.device.sl4f.wlan_lib.wlanConnectToNetwork(
target_ssid, bss_descs_for_ssid[0], target_pwd=target_pwd
)
return self.device.check_connect_response(connection_response)
else:
return self.device.wlan_policy_controller.save_and_connect(
target_ssid,
target_security.fuchsia_security_type(),
password=target_pwd,
)
def disconnect(self) -> None:
"""Function to disconnect from a Fuchsia WLAN device.
Asserts if disconnect was not successful.
"""
if self.device.association_mechanism == "drivers":
disconnect_response = self.device.sl4f.wlan_lib.wlanDisconnect()
self.device.check_disconnect_response(disconnect_response)
else:
self.device.wlan_policy_controller.remove_all_networks_and_wait_for_no_connections()
def can_ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: Optional[str] = None,
) -> bool:
return self.device.can_ping(
dest_ip,
count=count,
interval=interval,
timeout=timeout,
size=size,
additional_ping_params=additional_ping_params,
)
def ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 25,
additional_ping_params: Optional[str] = None,
) -> Dict[str, Any]:
return self.device.ping(
dest_ip,
count=count,
interval=interval,
timeout=timeout,
size=size,
additional_ping_params=additional_ping_params,
)
def get_wlan_interface_id_list(self) -> List[str]:
response = self.device.sl4f.wlan_lib.wlanGetIfaceIdList()
if response.get("error"):
raise ConnectionError(
f'Failed to get wlan interface IDs: {response["error"]}'
)
return response.get("result")
def get_default_wlan_test_interface(self) -> str:
return self.device.wlan_client_test_interface_name
def destroy_wlan_interface(self, iface_id: str) -> bool:
result = self.device.sl4f.wlan_lib.wlanDestroyIface(iface_id)
if result.get("error") is None:
return True
else:
logging.error(f"Failed to destroy interface with: {result.get('error')}")
return False
def is_connected(self, ssid: Optional[str] = None) -> bool:
response = self.device.sl4f.wlan_lib.wlanStatus()
if response.get("error"):
raise ConnectionError("Failed to get client network connection status")
result = response.get("result")
if isinstance(result, dict):
connected_to = result.get("Connected")
# TODO(https://fxbug.dev/85938): Remove backwards compatibility once
# ACTS is versioned with Fuchsia.
if not connected_to:
connected_to = result.get("connected_to")
if not connected_to:
return False
if ssid:
# Replace encoding errors instead of raising an exception.
# Since `ssid` is a string, this will not affect the test
# for equality.
connected_ssid = bytearray(connected_to["ssid"]).decode(
encoding="utf-8", errors="replace"
)
return ssid == connected_ssid
return True
return False
def hard_power_cycle(self, pdus: List[PduDevice]) -> None:
self.device.reboot(reboot_type="hard", testbed_pdus=pdus)
def create_iperf_client(
self, test_interface: Optional[str] = None
) -> IPerfClientBase:
if not test_interface:
test_interface = self.get_default_wlan_test_interface()
# A package server is necessary to acquire the iperf3 client for
# some builds.
self.device.start_package_server()
return iperf_client.IPerfClientOverSsh(
ssh_provider=self.device.ssh,
test_interface=test_interface,
)
def feature_is_present(self, feature: str) -> bool:
return feature in self.device.wlan_features
def create_wlan_device(
hardware_device: Union[FuchsiaDevice, AndroidDevice]
) -> SupportsWLAN:
"""Creates a generic WLAN device based on type of device that is sent to
the functions.
Args:
hardware_device: A WLAN hardware device that is supported by ACTS.
"""
device: SupportsWLAN
if isinstance(hardware_device, FuchsiaDevice):
device = FuchsiaWlanDevice(hardware_device)
elif isinstance(hardware_device, AndroidDevice):
device = AndroidWlanDevice(hardware_device)
else:
raise ValueError(
f"Unable to create WLAN device for type {type(hardware_device)}"
)
assert isinstance(device, SupportsWLAN)
return device