| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import itertools |
| import logging |
| import os |
| import time |
| from dataclasses import dataclass |
| from enum import Enum, StrEnum, auto, unique |
| from multiprocessing import Process |
| |
| from mobly import asserts, signals, test_runner |
| |
| from antlion import utils |
| from antlion.controllers import iperf_client, iperf_server |
| from antlion.controllers.access_point import AccessPoint, setup_ap |
| from antlion.controllers.ap_lib.hostapd_constants import AP_SSID_LENGTH_2G, BandType |
| from antlion.controllers.ap_lib.hostapd_security import Security, SecurityMode |
| from antlion.controllers.ap_lib.hostapd_utils import generate_random_password |
| from antlion.controllers.fuchsia_device import FuchsiaDevice |
| from antlion.net import wait_for_port |
| from antlion.test_utils.abstract_devices.wlan_device import create_wlan_device |
| from antlion.test_utils.wifi import base_test |
| |
| DEFAULT_IPERF_TIMEOUT = 30 |
| DUT_NETWORK_CONNECTION_TIMEOUT = 60 |
| |
| |
| @unique |
| class DeviceType(StrEnum): |
| AP = auto() |
| DUT = auto() |
| |
| |
| @unique |
| class RebootType(StrEnum): |
| SOFT = auto() |
| HARD = auto() |
| |
| |
| @unique |
| class IpVersionType(Enum): |
| IPV4 = auto() |
| IPV6 = auto() |
| DUAL_IPV4_IPV6 = auto() |
| |
| def ipv4(self) -> bool: |
| match self: |
| case IpVersionType.IPV4: |
| return True |
| case IpVersionType.IPV6: |
| return False |
| case IpVersionType.DUAL_IPV4_IPV6: |
| return True |
| |
| def ipv6(self) -> bool: |
| match self: |
| case IpVersionType.IPV4: |
| return False |
| case IpVersionType.IPV6: |
| return True |
| case IpVersionType.DUAL_IPV4_IPV6: |
| return True |
| |
| @staticmethod |
| def all() -> list["IpVersionType"]: |
| return [IpVersionType.IPV4, IpVersionType.IPV6, IpVersionType.DUAL_IPV4_IPV6] |
| |
| |
| @dataclass |
| class TestParams: |
| reboot_device: DeviceType |
| reboot_type: RebootType |
| band: BandType |
| security_mode: SecurityMode |
| ip_version: IpVersionType |
| |
| |
| class WlanRebootTest(base_test.WifiBaseTest): |
| """Tests wlan reconnects in different reboot scenarios. |
| |
| Testbed Requirement: |
| * One ACTS compatible device (dut) |
| * One Whirlwind Access Point (will also serve as iperf server) |
| * One PduDevice |
| """ |
| |
| def pre_run(self) -> None: |
| test_params: list[tuple[TestParams]] = [] |
| for ( |
| device_type, |
| reboot_type, |
| band, |
| security_mode, |
| ip_version, |
| ) in itertools.product( |
| # DeviceType, |
| # RebootType, |
| # BandType, |
| # SecurityMode, |
| # IpVersionType, |
| # |
| # TODO(https://github.com/python/mypy/issues/14688): Replace the code below |
| # with the commented code above once the bug affecting StrEnum resolves. |
| [e for e in DeviceType], |
| [e for e in RebootType], |
| [e for e in BandType], |
| [e for e in SecurityMode], |
| [e for e in IpVersionType], |
| ): |
| test_params.append( |
| ( |
| TestParams( |
| device_type, |
| reboot_type, |
| band, |
| security_mode, |
| ip_version, |
| ), |
| ) |
| ) |
| |
| def generate_test_name(t: TestParams) -> str: |
| test_name = ( |
| "test" |
| f"_{t.reboot_type}_reboot" |
| f"_{t.reboot_device}" |
| f"_{t.band}" |
| f"_{t.security_mode}" |
| ) |
| if t.ip_version.ipv4(): |
| test_name += "_ipv4" |
| if t.ip_version.ipv6(): |
| test_name += "_ipv6" |
| return test_name |
| |
| self.generate_tests( |
| test_logic=self.run_reboot_test, |
| name_func=generate_test_name, |
| arg_sets=test_params, |
| ) |
| |
| def setup_class(self) -> None: |
| super().setup_class() |
| self.log = logging.getLogger() |
| self.access_point: AccessPoint = self.access_points[0] |
| self.fuchsia_device: FuchsiaDevice | None = None |
| |
| if len(self.fuchsia_devices) < 1: |
| raise signals.TestAbortClass("At least one Fuchsia device is required") |
| |
| device_type = self.user_params.get("dut", "fuchsia_devices") |
| if device_type == "fuchsia_devices": |
| self.fuchsia_device = self.fuchsia_devices[0] |
| self.dut = create_wlan_device(self.fuchsia_device) |
| elif device_type == "android_devices": |
| self.dut = create_wlan_device(self.android_devices[0]) |
| else: |
| raise ValueError( |
| f'Invalid "dut" type specified in config: "{device_type}".' |
| 'Expected "fuchsia_devices" or "android_devices".' |
| ) |
| |
| self.iperf_server_on_ap: iperf_server.IPerfServerOverSsh | None = None |
| |
| if hasattr(self, "iperf_clients") and self.iperf_clients: |
| self.iperf_client_on_dut = self.iperf_clients[0] |
| else: |
| self.iperf_client_on_dut = self.dut.create_iperf_client() |
| |
| def setup_test(self) -> None: |
| super().setup_test() |
| self.access_point.stop_all_aps() |
| self.dut.wifi_toggle_state(True) |
| for ad in self.android_devices: |
| ad.droid.wakeLockAcquireBright() |
| ad.droid.wakeUpNow() |
| self.dut.disconnect() |
| if self.fuchsia_device: |
| self.fuchsia_device.configure_wlan() |
| |
| def teardown_test(self) -> None: |
| # TODO(b/273923552): We take a snapshot here and before rebooting the |
| # DUT for every test because the persistence component does not make the |
| # inspect logs available for 120 seconds. This helps for debugging |
| # issues where we need previous state. |
| self.dut.take_bug_report(self.current_test_info.record) |
| self.download_ap_logs() |
| self.access_point.stop_all_aps() |
| self.dut.disconnect() |
| for ad in self.android_devices: |
| ad.droid.wakeLockRelease() |
| ad.droid.goToSleepNow() |
| self.dut.turn_location_off_and_scan_toggle_off() |
| self.dut.reset_wifi() |
| super().teardown_test() |
| |
| def setup_ap( |
| self, |
| ssid: str, |
| band: BandType, |
| ip_version: IpVersionType, |
| security_mode: SecurityMode, |
| password: str | None = None, |
| ) -> None: |
| """Setup ap with basic config. |
| |
| Args: |
| ssid: The ssid to setup on ap |
| band: The type of band to set up the iperf server with ('2g' or '5g'). |
| ip_version: The type of ip to use (ipv4 or ipv6) |
| security_mode: The type of security mode. |
| password: The PSK or passphase. |
| """ |
| # TODO(fxb/63719): Add varying AP parameters |
| security_profile = Security(security_mode=security_mode, password=password) |
| channel: int |
| |
| match band: |
| case BandType.BAND_2G: |
| channel = 11 |
| case BandType.BAND_5G: |
| channel = 36 |
| |
| setup_ap( |
| access_point=self.access_point, |
| profile_name="whirlwind", |
| channel=channel, |
| ssid=ssid, |
| security=security_profile, |
| is_ipv6_enabled=ip_version.ipv6(), |
| ) |
| |
| if not ip_version.ipv4(): |
| self.access_point.stop_dhcp() |
| |
| self.log.info(f"Network (SSID: {ssid}) is up.") |
| |
| def setup_iperf_server_on_ap( |
| self, band: BandType |
| ) -> iperf_server.IPerfServerOverSsh: |
| """Configures iperf server based on the tests band. |
| |
| Args: |
| band: The type of band to set up the iperf server with ('2g' or '5g'). |
| """ |
| test_interface: str |
| |
| if band is BandType.BAND_2G: |
| test_interface = self.access_point.wlan_2g |
| elif band is BandType.BAND_5G: |
| test_interface = self.access_point.wlan_5g |
| else: |
| raise TypeError(f'Unknown band type: "{band}"') |
| |
| return iperf_server.IPerfServerOverSsh( |
| self.access_point.ssh_settings, 5201, test_interface=test_interface |
| ) |
| |
| def get_iperf_server_address( |
| self, |
| iperf_server_on_ap: iperf_server.IPerfServerOverSsh, |
| ip_version: IpVersionType, |
| ) -> str: |
| """Retrieves the ip address of the iperf server. |
| |
| Args: |
| iperf_server_on_ap: IPerfServer object, linked to AP |
| ip_version: The ip version (ipv4 or ipv6) |
| |
| Returns: |
| The ip address of the iperf_server |
| """ |
| # TODO(http://b/286449352): Remove this check once iperf_client has been refactored. |
| assert isinstance( |
| self.iperf_client_on_dut, |
| (iperf_client.IPerfClientOverSsh, iperf_client.IPerfClientOverAdb), |
| ) |
| |
| iperf_server_addresses = iperf_server_on_ap.get_interface_ip_addresses( |
| iperf_server_on_ap.test_interface |
| ) |
| if ip_version == IpVersionType.IPV4: |
| iperf_server_ip_address = iperf_server_addresses["ipv4_private"][0] |
| elif ip_version == IpVersionType.IPV6: |
| if iperf_server_addresses["ipv6_private_local"]: |
| iperf_server_ip_address = iperf_server_addresses["ipv6_private_local"][ |
| 0 |
| ] |
| else: |
| iperf_server_ip_address = ( |
| f"{iperf_server_addresses['ipv6_link_local'][0]}%" |
| f"{self.iperf_client_on_dut.test_interface}" |
| ) |
| else: |
| raise TypeError(f"Invalid IP type: {ip_version}") |
| |
| return iperf_server_ip_address |
| |
| def verify_traffic_between_dut_and_ap( |
| self, |
| iperf_server_on_ap: iperf_server.IPerfServerOverSsh, |
| iperf_client_on_dut: iperf_client.IPerfClientBase, |
| ip_version: IpVersionType, |
| ) -> None: |
| """Runs IPerf traffic from the iperf client (dut) and the iperf |
| server (and vice versa) and verifies traffic was able to pass |
| successfully. |
| |
| Args: |
| iperf_server_on_ap: IPerfServer object, linked to AP |
| iperf_client_on_dut: IPerfClient object, linked to DUT |
| ip_version: The ip version (ipv4 or ipv6) |
| |
| Raises: |
| ConnectionError, if traffic is not passed successfully in both |
| directions. |
| """ |
| iperf_server_ip_address = self.get_iperf_server_address( |
| iperf_server_on_ap, ip_version |
| ) |
| |
| self.log.info( |
| f"Attempting to pass traffic from DUT to IPerf server ({iperf_server_ip_address})." |
| ) |
| tx_file = iperf_client_on_dut.start( |
| iperf_server_ip_address, |
| "-i 1 -t 3 -J", |
| "reboot_tx", |
| timeout=DEFAULT_IPERF_TIMEOUT, |
| ) |
| tx_results = iperf_server.IPerfResult(tx_file) |
| if not tx_results.avg_receive_rate or tx_results.avg_receive_rate == 0: |
| raise ConnectionError( |
| f"Failed to pass IPerf traffic from DUT to server ({iperf_server_ip_address}). " |
| f"TX average receive rate: {tx_results.avg_receive_rate}" |
| ) |
| else: |
| self.log.info( |
| f"Success: Traffic passed from DUT to IPerf server ({iperf_server_ip_address})." |
| ) |
| self.log.info( |
| f"Attempting to pass traffic from IPerf server ({iperf_server_ip_address}) to DUT." |
| ) |
| rx_file = iperf_client_on_dut.start( |
| iperf_server_ip_address, |
| "-i 1 -t 3 -R -J", |
| "reboot_rx", |
| timeout=DEFAULT_IPERF_TIMEOUT, |
| ) |
| rx_results = iperf_server.IPerfResult(rx_file) |
| if not rx_results.avg_receive_rate or rx_results.avg_receive_rate == 0: |
| raise ConnectionError( |
| f"Failed to pass IPerf traffic from server ({iperf_server_ip_address}) to DUT. " |
| f"RX average receive rate: {rx_results.avg_receive_rate}" |
| ) |
| else: |
| self.log.info( |
| f"Success: Traffic passed from IPerf server ({iperf_server_ip_address}) to DUT." |
| ) |
| |
| def start_dut_ping_process( |
| self, |
| iperf_server_on_ap: iperf_server.IPerfServerOverSsh, |
| ip_version: IpVersionType, |
| ) -> None: |
| """Creates a process that pings the AP from the DUT. |
| |
| Runs in parallel for 15 seconds, so it can be interrupted by a reboot. |
| Sleeps for a few seconds to ensure pings have started. |
| |
| Args: |
| iperf_server_on_ap: IPerfServer object, linked to AP |
| ip_version: The ip version (ipv4 or ipv6) |
| """ |
| ap_address = self.get_iperf_server_address(iperf_server_on_ap, ip_version) |
| if ap_address: |
| self.log.info( |
| f"Starting ping process to {ap_address} in parallel. Logs from this " |
| "process will be suppressed, since it will be intentionally " |
| "interrupted." |
| ) |
| ping_proc = Process( |
| target=self.dut.ping, args=[ap_address], kwargs={"count": 15} |
| ) |
| with utils.SuppressLogOutput(): |
| ping_proc.start() |
| # Allow for a few seconds of pinging before allowing it to be |
| # interrupted. |
| time.sleep(3) |
| else: |
| raise ConnectionError("Failed to retrieve APs iperf address.") |
| |
| def prepare_dut_for_reconnection(self) -> None: |
| """Perform any actions to ready DUT for reconnection. |
| |
| These actions will vary depending on the DUT. eg. android devices may |
| need to be woken up, ambient devices should not require any interaction, |
| etc. |
| """ |
| self.dut.wifi_toggle_state(True) |
| for ad in self.android_devices: |
| ad.droid.wakeUpNow() |
| |
| def wait_for_dut_network_connection(self, ssid: str) -> None: |
| """Checks if device is connected to given network. Sleeps 1 second |
| between retries. |
| |
| Args: |
| ssid: ssid to check connection to. |
| Raises: |
| ConnectionError, if DUT is not connected after all timeout. |
| """ |
| self.log.info( |
| f"Checking if DUT is connected to {ssid} network. Will retry for " |
| f"{DUT_NETWORK_CONNECTION_TIMEOUT} seconds." |
| ) |
| timeout = time.time() + DUT_NETWORK_CONNECTION_TIMEOUT |
| while time.time() < timeout: |
| try: |
| is_connected = self.dut.is_connected(ssid=ssid) |
| except Exception as err: |
| self.log.debug(f"SL4* call failed. Retrying in 1 second. Error: {err}") |
| is_connected = False |
| finally: |
| if is_connected: |
| self.log.info("Success: DUT has connected.") |
| break |
| else: |
| self.log.debug( |
| f"DUT not connected to network {ssid}...retrying in 1 second." |
| ) |
| time.sleep(1) |
| else: |
| raise ConnectionError("DUT failed to connect to the network.") |
| |
| def write_csv_time_to_reconnect( |
| self, |
| test_name: str, |
| reconnect_success: bool, |
| time_to_reconnect: float = 0.0, |
| ) -> None: |
| """Writes the time to reconnect to a csv file. |
| Args: |
| test_name: the name of the test case |
| reconnect_success: whether the test successfully reconnected or not |
| time_to_reconnect: the time from when the rebooted device came back |
| up to when it reassociated (or 'FAIL'), if it failed to |
| reconnect. |
| """ |
| csv_file_name = os.path.join(self.log_path, "time_to_reconnect.csv") |
| self.log.info(f"Writing to {csv_file_name}") |
| with open(csv_file_name, "a") as csv_file: |
| if reconnect_success: |
| csv_file.write(f"{test_name},{time_to_reconnect}\n") |
| else: |
| csv_file.write(f"{test_name},'FAIL'\n") |
| |
| def log_and_continue( |
| self, ssid: str, time_to_reconnect: float = 0.0, error: Exception | None = None |
| ) -> None: |
| """Writes the time to reconnect to the csv file before continuing, used |
| in stress tests runs. |
| |
| Args: |
| time_to_reconnect: the time from when the rebooted device came back |
| ip to when reassociation occurred. |
| error: error message to log before continuing with the test |
| """ |
| if error: |
| self.log.info( |
| f"Device failed to reconnect to network {ssid}. Error: {error}" |
| ) |
| self.write_csv_time_to_reconnect(f"{self.current_test_info.name}", False) |
| |
| else: |
| self.log.info( |
| f"Device successfully reconnected to network {ssid} after " |
| f"{time_to_reconnect} seconds." |
| ) |
| self.write_csv_time_to_reconnect( |
| f"{self.current_test_info.name}", True, time_to_reconnect |
| ) |
| |
| def run_reboot_test(self, settings: TestParams) -> None: |
| """Runs a reboot test based on a given config. |
| 1. Setups up a network, associates the dut, and saves the network. |
| 2. Verifies the dut receives ip address(es). |
| 3. Verifies traffic between DUT and AP (IPerf client and server). |
| 4. Reboots (hard or soft) the device (dut or ap). |
| - If the ap was rebooted, setup the same network again. |
| 5. Wait for reassociation or timeout. |
| 6. If reassocation occurs: |
| - Verifies the dut receives ip address(es). |
| - Verifies traffic between DUT and AP (IPerf client and server). |
| 7. Logs time to reconnect (or failure to reconnect) |
| |
| Args: |
| settings: TestParams dataclass containing the following values: |
| reboot_device: the device to reboot either DUT or AP. |
| reboot_type: how to reboot the reboot_device either hard or soft. |
| band: band to setup either 2g or 5g |
| security_mode: security mode to set up either OPEN, WPA2, or WPA3. |
| ip_version: the ip version (ipv4 or ipv6) |
| """ |
| # TODO(b/286443517): Properly support WLAN on android devices. |
| assert ( |
| self.fuchsia_device is not None |
| ), "Fuchsia device not found, test currently does not support android devices." |
| |
| # TODO(b/286449352): Remove this check once iperf_client has been refactored. |
| assert isinstance( |
| self.iperf_client_on_dut, |
| (iperf_client.IPerfClientOverSsh, iperf_client.IPerfClientOverAdb), |
| ) |
| assert isinstance(self.iperf_client_on_dut.test_interface, str) |
| |
| ssid = utils.rand_ascii_str(AP_SSID_LENGTH_2G) |
| reboot_device: DeviceType = settings.reboot_device |
| reboot_type: RebootType = settings.reboot_type |
| band: BandType = settings.band |
| ip_version: IpVersionType = settings.ip_version |
| security_mode: SecurityMode = settings.security_mode |
| password: str | None = None |
| if security_mode is not SecurityMode.OPEN: |
| password = generate_random_password(security_mode=security_mode) |
| |
| # Skip hard reboots if no PDU present |
| asserts.skip_if( |
| reboot_type is RebootType.HARD and len(self.pdu_devices) == 0, |
| "Hard reboots require a PDU device.", |
| ) |
| |
| self.setup_ap( |
| ssid, |
| band, |
| ip_version, |
| security_mode, |
| password, |
| ) |
| |
| if not self.dut.associate( |
| ssid, |
| target_security=security_mode, |
| target_pwd=password, |
| ): |
| raise EnvironmentError("Initial network connection failed.") |
| |
| # Run iperf to verify traffic between DUT and AP |
| if ip_version.ipv4(): |
| self.fuchsia_device.wait_for_ipv4_addr( |
| self.iperf_client_on_dut.test_interface |
| ) |
| if ip_version.ipv6(): |
| self.fuchsia_device.wait_for_ipv6_addr( |
| self.iperf_client_on_dut.test_interface |
| ) |
| |
| self.iperf_server_on_ap = self.setup_iperf_server_on_ap(band) |
| self.iperf_server_on_ap.start() |
| wait_for_port(self.iperf_server_on_ap.ssh_settings.hostname, 5201) |
| |
| if ip_version.ipv4(): |
| self.verify_traffic_between_dut_and_ap( |
| self.iperf_server_on_ap, |
| self.iperf_client_on_dut, |
| IpVersionType.IPV4, |
| ) |
| if ip_version.ipv6(): |
| self.verify_traffic_between_dut_and_ap( |
| self.iperf_server_on_ap, |
| self.iperf_client_on_dut, |
| IpVersionType.IPV6, |
| ) |
| |
| # Ping from DUT to AP during AP reboot. This is interrupt testing that we do not |
| # do for DUT reboots because they are prone to threading issues and not |
| # supported. |
| if reboot_device is DeviceType.AP: |
| if ip_version.ipv4(): |
| self.start_dut_ping_process(self.iperf_server_on_ap, IpVersionType.IPV4) |
| if ip_version.ipv6(): |
| self.start_dut_ping_process(self.iperf_server_on_ap, IpVersionType.IPV6) |
| |
| # TODO(b/273923552): We take a snapshot here and during test |
| # teardown for every test because the persistence component does not |
| # make the inspect logs available for 120 seconds. This helps for |
| # debugging issues where we need previous state. |
| self.dut.take_bug_report(self.current_test_info.record) |
| |
| # DUT reboots |
| if reboot_device is DeviceType.DUT: |
| if reboot_type is RebootType.SOFT: |
| self.fuchsia_device.reboot() |
| elif reboot_type is RebootType.HARD: |
| self.dut.hard_power_cycle(self.pdu_devices) |
| self.iperf_client_on_dut = self.dut.create_iperf_client() |
| |
| # AP reboots |
| elif reboot_device is DeviceType.AP: |
| self.iperf_server_on_ap.close_ssh() |
| if reboot_type is RebootType.SOFT: |
| self.log.info("Cleanly stopping ap.") |
| self.access_point.stop_all_aps() |
| elif reboot_type is RebootType.HARD: |
| self.access_point.hard_power_cycle(self.pdu_devices) |
| self.setup_ap(ssid, band, ip_version, security_mode, password) |
| self.iperf_server_on_ap = self.setup_iperf_server_on_ap(band) |
| |
| # TODO(b/286449352): Remove this check once iperf_client has been refactored. |
| assert isinstance( |
| self.iperf_client_on_dut, |
| (iperf_client.IPerfClientOverSsh, iperf_client.IPerfClientOverAdb), |
| ) |
| assert isinstance(self.iperf_client_on_dut.test_interface, str) |
| |
| self.prepare_dut_for_reconnection() |
| uptime = time.time() |
| try: |
| self.wait_for_dut_network_connection(ssid) |
| time_to_reconnect = time.time() - uptime |
| |
| if ip_version.ipv4(): |
| self.fuchsia_device.wait_for_ipv4_addr( |
| self.iperf_client_on_dut.test_interface |
| ) |
| if ip_version.ipv6(): |
| self.fuchsia_device.wait_for_ipv6_addr( |
| self.iperf_client_on_dut.test_interface |
| ) |
| |
| self.iperf_server_on_ap.start() |
| |
| if ip_version.ipv4(): |
| self.verify_traffic_between_dut_and_ap( |
| self.iperf_server_on_ap, |
| self.iperf_client_on_dut, |
| IpVersionType.IPV4, |
| ) |
| if ip_version.ipv6(): |
| self.verify_traffic_between_dut_and_ap( |
| self.iperf_server_on_ap, |
| self.iperf_client_on_dut, |
| IpVersionType.IPV6, |
| ) |
| except ConnectionError as err: |
| self.log_and_continue(ssid, error=err) |
| raise signals.TestFailure(f"Failed to reconnect to {ssid} after reboot.") |
| else: |
| self.log_and_continue(ssid, time_to_reconnect=time_to_reconnect) |
| |
| |
| if __name__ == "__main__": |
| test_runner.main() |