| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ |
| PingStressTest exercises sending ICMP and ICMPv6 pings to a wireless access |
| router and another device behind the AP. Note, this does not reach out to the |
| internet. The DUT is only responsible for sending a routable packet; any |
| communication past the first-hop is not the responsibility of the DUT. |
| """ |
| |
import logging
import multiprocessing
import threading
from typing import Any, Callable, NamedTuple
| |
| from mobly import asserts, signals, test_runner |
| |
| from antlion import utils |
| from antlion.controllers.access_point import AccessPoint, setup_ap |
| from antlion.controllers.ap_lib import hostapd_constants |
| from antlion.controllers.iperf_server import IPerfServerOverSsh |
| from antlion.test_utils.abstract_devices.wlan_device import ( |
| AssociationMode, |
| create_wlan_device, |
| ) |
| from antlion.test_utils.wifi import base_test |
| from antlion.utils import rand_ascii_str |
| |
# Loopback destinations used by the generated loopback ping tests.
LOOPBACK_IPV4 = "127.0.0.1"
LOOPBACK_IPV6 = "::1"
# Longest time, in seconds, that test_simultaneous_pings waits when joining
# each concurrent ping worker.
PING_RESULT_TIMEOUT_SEC = 60 * 5
| |
| |
class Addrs(NamedTuple):
    """Ping destinations on the test network, resolved once setup is done.

    Instances are handed to the ``dest_ip`` callables of ``Test`` so each
    generated test can pick the address it wants to ping.
    """

    # IPv4 address of the access point's bridge interface.
    gateway_ipv4: str
    # Link-local IPv6 address of the access point's bridge interface, with the
    # DUT's outgoing interface appended as a ``%<iface>`` scope ID.
    gateway_ipv6: str
    # IPv4 address of the iPerf server host behind the access point.
    remote_ipv4: str
    # IPv6 address of the iPerf server host behind the access point.
    remote_ipv6: str
| |
| |
class Test(NamedTuple):
    """Parameters of one generated ping test case.

    The fields are unpacked positionally into ``PingStressTest.send_ping``,
    so their order must stay in step with that method's parameters.
    """

    # Suffix of the generated test name (the test is named ``test_<name>``).
    name: str
    # Either a literal destination address, or a callable that selects one
    # from the ``Addrs`` that are only known after class setup.
    dest_ip: str | Callable[[Addrs], str]
    # Number of pings to send.
    packet_count: int = 3
    # Spacing between pings -- presumably milliseconds; confirm against
    # the device's can_ping implementation.
    interval: int = 1000
    # Per-ping timeout -- presumably milliseconds; confirm as above.
    timeout: int = 1000
    # Ping payload size -- presumably bytes; confirm as above.
    size: int = 25
| |
| |
class PingStressTest(base_test.WifiBaseTest):
    """Ping loopback, the AP gateway and a remote host from a Fuchsia DUT.

    ``setup_class`` brings up an access point, records the addresses of the
    AP bridge and of the iPerf server host, and associates the DUT with the
    AP. The generated tests then ping one destination each, and
    ``test_simultaneous_pings`` pings every non-loopback destination
    concurrently.
    """

    def setup_generated_tests(self) -> None:
        """Register one ``test_<name>`` case per ``Test`` entry.

        Each ``Test`` tuple is unpacked positionally into ``send_ping``;
        the first field also provides the generated test's name.
        """
        self.generate_tests(
            self.send_ping,
            lambda test_name, *_: f"test_{test_name}",
            [
                Test("loopback_ipv4", LOOPBACK_IPV4),
                Test("loopback_ipv6", LOOPBACK_IPV6),
                Test("gateway_ipv4", lambda addrs: addrs.gateway_ipv4),
                Test("gateway_ipv6", lambda addrs: addrs.gateway_ipv6),
                Test("remote_ipv4_small_packet", lambda addrs: addrs.remote_ipv4),
                Test("remote_ipv6_small_packet", lambda addrs: addrs.remote_ipv6),
                Test(
                    "remote_ipv4_small_packet_long",
                    lambda addrs: addrs.remote_ipv4,
                    packet_count=50,
                ),
                Test(
                    "remote_ipv6_small_packet_long",
                    lambda addrs: addrs.remote_ipv6,
                    packet_count=50,
                ),
                Test(
                    "remote_ipv4_medium_packet",
                    lambda addrs: addrs.remote_ipv4,
                    size=64,
                ),
                Test(
                    "remote_ipv6_medium_packet",
                    lambda addrs: addrs.remote_ipv6,
                    size=64,
                ),
                Test(
                    "remote_ipv4_medium_packet_long",
                    lambda addrs: addrs.remote_ipv4,
                    packet_count=50,
                    timeout=1500,
                    size=64,
                ),
                Test(
                    "remote_ipv6_medium_packet_long",
                    lambda addrs: addrs.remote_ipv6,
                    packet_count=50,
                    timeout=1500,
                    size=64,
                ),
                Test(
                    "remote_ipv4_large_packet",
                    lambda addrs: addrs.remote_ipv4,
                    size=500,
                ),
                Test(
                    "remote_ipv6_large_packet",
                    lambda addrs: addrs.remote_ipv6,
                    size=500,
                ),
                Test(
                    "remote_ipv4_large_packet_long",
                    lambda addrs: addrs.remote_ipv4,
                    packet_count=50,
                    timeout=5000,
                    size=500,
                ),
                Test(
                    "remote_ipv6_large_packet_long",
                    lambda addrs: addrs.remote_ipv6,
                    packet_count=50,
                    timeout=5000,
                    size=500,
                ),
            ],
        )

    def setup_class(self) -> None:
        """Bring up the AP, record ping targets and associate the DUT.

        Raises:
            signals.TestAbortClass: if the testbed lacks a Fuchsia device, an
                access point or an iPerf server, or if the AP reports no
                bridge interface after setup.
        """
        super().setup_class()
        self.log = logging.getLogger()
        self.ssid = rand_ascii_str(10)

        if not self.fuchsia_devices:
            raise signals.TestAbortClass("At least one Fuchsia device is required")
        self.fuchsia_device = self.fuchsia_devices[0]
        self.dut = create_wlan_device(self.fuchsia_device, AssociationMode.POLICY)

        if not self.access_points:
            raise signals.TestAbortClass("At least one access point is required")
        self.access_point: AccessPoint = self.access_points[0]

        if not self.iperf_servers:
            raise signals.TestAbortClass("At least one iPerf3 server is required")
        self.iperf_server: IPerfServerOverSsh = self.iperf_servers[0]

        setup_ap(
            access_point=self.access_point,
            profile_name="whirlwind",
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            ssid=self.ssid,
            setup_bridge=True,
            is_ipv6_enabled=True,
            is_nat_enabled=False,
        )

        # The gateway addresses are read from the AP's first bridge interface.
        ap_bridges = self.access_point.interfaces.get_bridge_interface()
        if not ap_bridges:
            # Raised directly, like the aborts above, so it is obvious that
            # ap_bridge is always bound below.
            raise signals.TestAbortClass(
                f"Expected one bridge interface on the AP, got {ap_bridges}"
            )
        ap_bridge = ap_bridges[0]
        self.ap_ipv4 = utils.get_addr(self.access_point.ssh, ap_bridge)
        self.ap_ipv6 = utils.get_addr(
            self.access_point.ssh, ap_bridge, addr_type="ipv6_link_local"
        )
        self.log.info(f"Gateway finished setup ({self.ap_ipv4} | {self.ap_ipv6})")

        self.iperf_server.renew_test_interface_ip_address()
        self.iperf_server_ipv4 = self.iperf_server.get_addr()
        self.iperf_server_ipv6 = self.iperf_server.get_addr(
            addr_type="ipv6_private_local"
        )
        self.log.info(
            f"Remote finished setup ({self.iperf_server_ipv4} | {self.iperf_server_ipv6})"
        )

        self.dut.associate(self.ssid)

        # Wait till the DUT has valid IP addresses after connecting.
        self.fuchsia_device.wait_for_ipv4_addr(
            self.dut.get_default_wlan_test_interface()
        )
        self.fuchsia_device.wait_for_ipv6_addr(
            self.dut.get_default_wlan_test_interface()
        )
        self.log.info("DUT has valid IP addresses on test network")

    def teardown_class(self) -> None:
        """Disconnect the DUT and stop the AP, skipping whatever was never set up.

        The ``hasattr`` guards cover the case where ``setup_class`` aborted
        before the corresponding attribute was assigned.
        """
        if hasattr(self, "dut"):
            self.dut.disconnect()
            self.dut.reset_wifi()
        if hasattr(self, "access_point"):
            self.download_ap_logs()
            self.access_point.stop_all_aps()
        super().teardown_class()

    def _resolve_addrs(self) -> Addrs:
        """Bundle the gateway and remote addresses recorded by setup_class."""
        return Addrs(
            gateway_ipv4=self.ap_ipv4,
            # IPv6 link-local addresses require specification of the
            # outgoing interface as the scope ID when sending packets.
            gateway_ipv6=f"{self.ap_ipv6}%{self.dut.get_default_wlan_test_interface()}",
            remote_ipv4=self.iperf_server_ipv4,
            # IPv6 global addresses do not require scope IDs.
            remote_ipv6=self.iperf_server_ipv6,
        )

    def send_ping(
        self,
        _: str,
        get_addr_fn: str | Callable[[Addrs], str],
        count: int = 3,
        interval: int = 1000,
        timeout: int = 1000,
        size: int = 25,
    ) -> None:
        """Ping a single destination from the DUT and fail if it is unreachable.

        Args:
            _: Test name; only used by the name function of the generated
                tests, ignored here.
            get_addr_fn: Destination address, or a callable that picks one
                from the ``Addrs`` resolved during class setup.
            count: Number of pings to send.
            interval: Forwarded to ``can_ping`` -- presumably milliseconds
                between pings; confirm against the device implementation.
            timeout: Forwarded to ``can_ping`` -- presumably a per-ping
                timeout in milliseconds; confirm as above.
            size: Forwarded to ``can_ping`` -- presumably the payload size in
                bytes; confirm as above.

        Raises:
            signals.TestFailure: if ``can_ping`` reports failure.
        """
        # Addresses are resolved lazily so literal destinations (loopback)
        # never touch the network state.
        if callable(get_addr_fn):
            dest_ip = get_addr_fn(self._resolve_addrs())
        else:
            dest_ip = get_addr_fn

        self.log.info(f"Attempting to ping {dest_ip}...")
        if not self.dut.can_ping(dest_ip, count, interval, timeout, size):
            raise signals.TestFailure("Ping was unsuccessful.")
        self.log.info("Ping was successful.")

    def test_simultaneous_pings(self) -> None:
        """Ping the gateway and the remote host over IPv4 and IPv6 at once.

        One worker thread is started per destination. Threads are used
        instead of processes because the workers must report their result to
        this test: a list mutated in a child process is never seen by the
        parent, so failures used to go unnoticed.

        Raises:
            signals.TestFailure: if any ping is still running after
                ``PING_RESULT_TIMEOUT_SEC`` seconds, or if any ping fails or
                raises.
        """
        addrs = self._resolve_addrs()
        ping_urls = [
            addrs.remote_ipv4,
            addrs.gateway_ipv4,
            addrs.remote_ipv6,
            addrs.gateway_ipv6,
        ]
        # One slot per destination, in ping_urls order; None means the worker
        # never reported (it is still running or died unexpectedly).
        ping_results: list[bool | None] = [None] * len(ping_urls)

        def ping_from_dut(slot: int, dest_ip: str) -> None:
            self.log.info(f"Attempting to ping {dest_ip}...")
            try:
                reachable = bool(self.dut.can_ping(dest_ip, count=10, size=50))
            except Exception:
                # A crashing worker must count as a failed ping rather than
                # silently leaving its slot empty.
                self.log.exception(f"Error while pinging {dest_ip}")
                reachable = False
            if reachable:
                self.log.info(f"Success pinging: {dest_ip}")
            else:
                self.log.info(f"Failure pinging: {dest_ip}")
            ping_results[slot] = reachable

        # Daemon threads cannot keep the interpreter alive if a ping hangs.
        ping_threads = [
            threading.Thread(target=ping_from_dut, args=(slot, url), daemon=True)
            for slot, url in enumerate(ping_urls)
        ]

        # Start multiple pings at the same time.
        for worker in ping_threads:
            worker.start()

        # Wait for all workers to complete or time out.
        for worker in ping_threads:
            worker.join(PING_RESULT_TIMEOUT_SEC)

        timed_out = [
            url for url, worker in zip(ping_urls, ping_threads) if worker.is_alive()
        ]
        if timed_out:
            raise signals.TestFailure(
                f"Timed out while pinging {', '.join(timed_out)}"
            )

        failed = [url for url, ok in zip(ping_urls, ping_results) if not ok]
        if failed:
            raise signals.TestFailure(f"Failed to ping {', '.join(failed)}")
| |
| |
# Hand control to Mobly's test runner when this file is executed directly.
if __name__ == "__main__":
    test_runner.main()