| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
import itertools
import logging
import os
import time
from dataclasses import dataclass
from enum import StrEnum, auto, unique

from mobly import asserts, signals, test_runner
from mobly.config_parser import TestRunConfig
from mobly.records import TestResultRecord

from antlion.controllers.ap_lib.hostapd_ap_preset import create_ap_preset
from antlion.controllers.ap_lib.hostapd_constants import BandType
from antlion.controllers.ap_lib.hostapd_security import Security, SecurityMode
from antlion.controllers.ap_lib.radvd_config import RadvdConfig
from antlion.controllers.attenuator import (
    Attenuator,
    get_attenuators_for_device,
)
from antlion.controllers.fuchsia_device import FuchsiaDevice
from antlion.controllers.iperf_server import IPerfResult, IPerfServerOverSsh
from antlion.test_utils.abstract_devices.wlan_device import AssociationMode
from antlion.test_utils.wifi import base_test
from antlion.utils import rand_ascii_str
from antlion.validation import MapValidator
| |
# Unit used when parsing iperf results, in log messages and in the CSV header.
REPORTING_SPEED_UNITS: str = "Mbps"
# Upper bound, in seconds, on waiting for the iperf server to report a
# private-local IPv6 address (i.e. for Duplicate Address Detection to finish).
DAD_TIMEOUT_SEC: int = 30
| |
| |
| @unique |
| class TrafficDirection(StrEnum): |
| RX = auto() |
| TX = auto() |
| |
| |
| @unique |
| class IPVersion(StrEnum): |
| V4 = "ipv4" |
| V6 = "ipv6" |
| |
| |
@dataclass(frozen=True)
class RateByRange:
    """One RvR data point: the throughput measured at one attenuation step."""

    # Relative attenuation that was set on the attenuators, in dB.
    relative_attn: int
    # Throughput measured by iperf at that attenuation; 0 when no traffic
    # could be run.
    throughput: float
| |
| |
@dataclass(frozen=True)
class TestParams:
    """Immutable parameter set describing one generated RvR test case."""

    # Wi-Fi band the AP is brought up on.
    band: BandType
    # Security configuration (mode and optional password) of the AP.
    security: Security
    # IP version used to reach the iperf server.
    ip_version: IPVersion
    # Traffic direction, from the perspective of the DUT.
    direction: TrafficDirection
| |
| |
def write_csv_rvr_data(
    test_name: str, csv_path: str, results: list[RateByRange]
) -> None:
    """Writes the RvR results of one test to a CSV file.

    The file is named ``rvr_throughput_vs_attn_<test_name>.csv`` and is
    created inside ``csv_path``. An existing file of that name is overwritten.

    Args:
        test_name: The name of the test that was run.
        csv_path: Directory to put the CSV file in, with or without a
            trailing path separator.
        results: The measured data points, written one row each, in order.
    """
    # Join with os.path.join rather than string concatenation so the file
    # lands inside csv_path even when the directory is given without a
    # trailing separator.
    csv_file_name = os.path.join(
        csv_path, f"rvr_throughput_vs_attn_{test_name}.csv"
    )
    with open(csv_file_name, "w", encoding="utf-8") as csv_file:
        csv_file.write(
            f"Attenuation(db),Throughput({REPORTING_SPEED_UNITS})\n"
        )
        csv_file.writelines(
            f"{res.relative_attn},{res.throughput}\n" for res in results
        )
| |
| |
| class WlanRvrTest(base_test.WifiBaseTest): |
| """Tests running WLAN RvR. |
| |
| Test Bed Requirement: |
| * One Android device or Fuchsia device |
| * One Access Point |
| * One attenuator |
| * One Linux iPerf Server |
| """ |
| |
| def __init__(self, configs: TestRunConfig) -> None: |
| super().__init__(configs) |
| self.log = logging.getLogger() |
| self.rvr_graph_summary: list[object] = [] |
| |
| params = MapValidator(self.user_params["rvr_settings"]) |
| self.starting_attn = params.get(int, "starting_attn", 0) |
| self.ending_attn = params.get(int, "ending_attn", 95) |
| self.step_size_in_db = params.get(int, "step_size_in_db", 1) |
| self.dwell_time_in_secs = params.get(int, "dwell_time_in_secs", 10) |
| |
| self.reverse_rvr_after_forward = params.get( |
| bool, "reverse_rvr_after_forward", False |
| ) |
| self.iperf_flags = params.get(str, "iperf_flags", "-i 1") |
| self.iperf_flags += f" -t {self.dwell_time_in_secs} -J" |
| |
| self.fuchsia_device, self.dut = self.get_dut_type( |
| FuchsiaDevice, AssociationMode.POLICY |
| ) |
| |
| if len(self.access_points) == 0: |
| raise signals.TestAbortClass("Requires at least one access point") |
| self.access_point = self.access_points[0] |
| |
| self.attenuators_2g = get_attenuators_for_device( |
| self.controller_configs["AccessPoint"][0]["Attenuator"], |
| self.attenuators, |
| "attenuator_ports_wifi_2g", |
| ) |
| self.attenuators_5g = get_attenuators_for_device( |
| self.controller_configs["AccessPoint"][0]["Attenuator"], |
| self.attenuators, |
| "attenuator_ports_wifi_5g", |
| ) |
| |
| self.iperf_server = self.iperf_servers[0] |
| |
| if hasattr(self, "iperf_clients") and self.iperf_clients: |
| self.dut_iperf_client = self.iperf_clients[0] |
| else: |
| self.dut_iperf_client = self.dut.create_iperf_client() |
| |
| def pre_run(self) -> None: |
| test_params: list[TestParams] = [] |
| |
| for ( |
| band, |
| security_mode, |
| ip_version, |
| direction, |
| ) in itertools.product( |
| [e for e in BandType], |
| [SecurityMode.OPEN, SecurityMode.WPA2], |
| [e for e in IPVersion], |
| [e for e in TrafficDirection], |
| ): |
| password: str | None = None |
| if security_mode is not SecurityMode.OPEN: |
| password = rand_ascii_str(20) |
| security = Security(security_mode, password) |
| test_params.append( |
| TestParams( |
| band, |
| security, |
| ip_version, |
| direction, |
| ) |
| ) |
| |
| def generate_test_name(t: TestParams) -> str: |
| # TODO(http://b/303659781): Keep mode in sync with hostapd. |
| mode = "11n" if t.band is BandType.BAND_2G else "11ac" |
| frequency = "20mhz" if t.band is BandType.BAND_2G else "80mhz" |
| return ( |
| f"test_rvr_{mode}_{t.band}_{frequency}_{t.security}_" |
| f"{t.direction}_{t.ip_version}" |
| ) |
| |
| self.generate_tests( |
| self._test_rvr, generate_test_name, [(p,) for p in test_params] |
| ) |
| |
| def setup_test(self) -> None: |
| super().setup_test() |
| self.iperf_server.start() |
| if hasattr(self, "android_devices"): |
| for ad in self.android_devices: |
| ad.droid.wakeLockAcquireBright() |
| ad.droid.wakeUpNow() |
| self.dut.wifi_toggle_state(True) |
| self.dut.disconnect() |
| self.access_point.stop_all_aps() |
| |
| def teardown_test(self) -> None: |
| self.cleanup_tests() |
| super().teardown_test() |
| |
| def on_fail(self, record: TestResultRecord) -> None: |
| super().on_fail(record) |
| self.cleanup_tests() |
| |
| def cleanup_tests(self) -> None: |
| """Cleans up all the dangling pieces of the tests, for example, the |
| iperf server, radvd, all the currently running APs, and the various |
| clients running during the tests. |
| """ |
| self.download_logs() |
| if hasattr(self, "android_devices"): |
| for ad in self.android_devices: |
| ad.droid.wakeLockRelease() |
| ad.droid.goToSleepNow() |
| self.iperf_server.stop() |
| self.dut.turn_location_off_and_scan_toggle_off() |
| self.dut.disconnect() |
| self.dut.reset_wifi() |
| self.access_point.stop_all_aps() |
| |
| def _wait_for_iperf_ipv4_addr(self) -> str: |
| """Wait for an IPv4 addresses to become available on the iperf server. |
| |
| Returns: |
| The private IPv4 address of the iperf server. |
| |
| Raises: |
| TestFailure: If unable to acquire a IPv4 address. |
| """ |
| ip_address_checker_counter = 0 |
| ip_address_checker_max_attempts = 3 |
| while ip_address_checker_counter < ip_address_checker_max_attempts: |
| self.iperf_server.renew_test_interface_ip_address() |
| iperf_server_ip_addresses = ( |
| self.iperf_server.get_interface_ip_addresses( |
| self.iperf_server.test_interface |
| ) |
| ) |
| self.log.info(f"IPerf server IP info: {iperf_server_ip_addresses}") |
| |
| if not iperf_server_ip_addresses["ipv4_private"]: |
| self.log.warning( |
| "Unable to get the iperf server IPv4 " |
| "address. Retrying..." |
| ) |
| ip_address_checker_counter += 1 |
| time.sleep(1) |
| continue |
| |
| return iperf_server_ip_addresses["ipv4_private"][0] |
| |
| raise signals.TestFailure("IPv4 address not available on iperf server.") |
| |
| def _wait_for_iperf_dad(self) -> str: |
| """Wait for Duplicate Address Detection to resolve so that an |
| private-local IPv6 address is available for test. |
| |
| Returns: |
| A string containing the private-local IPv6 address of the iperf server. |
| |
| Raises: |
| TestFailure: If unable to acquire an IPv6 address. |
| """ |
| now = time.time() |
| start = now |
| elapsed = now - start |
| |
| while elapsed < DAD_TIMEOUT_SEC: |
| addrs = self.iperf_server.get_interface_ip_addresses( |
| self.iperf_server.test_interface |
| ) |
| now = time.time() |
| elapsed = now - start |
| if addrs["ipv6_private_local"]: |
| # DAD has completed |
| addr = addrs["ipv6_private_local"][0] |
| self.log.info( |
| f'DAD on iperf server resolved with "{addr}" after {elapsed}s' |
| ) |
| return addr |
| time.sleep(1) |
| |
| raise signals.TestFailure( |
| "Iperf server unable to acquire a private-local IPv6 address for testing " |
| f"after {elapsed}s" |
| ) |
| |
| def run_rvr( |
| self, |
| ssid: str, |
| security: Security | None, |
| band: BandType, |
| traffic_dir: TrafficDirection, |
| ip_version: IPVersion, |
| ) -> list[RateByRange]: |
| """Setups and runs the RvR test |
| |
| Args: |
| ssid: The SSID for the client to associate to. |
| security: Security of the AP |
| band: 2g or 5g |
| traffic_dir: rx or tx, bi is not supported by iperf3 |
| ip_version: 4 or 6 |
| |
| Returns: |
| The bokeh graph data. |
| """ |
| match band: |
| case BandType.BAND_2G: |
| rvr_attenuators = self.attenuators_2g |
| case BandType.BAND_5G: |
| rvr_attenuators = self.attenuators_5g |
| |
| for rvr_attenuator in rvr_attenuators: |
| rvr_attenuator.set_atten(self.starting_attn) |
| |
| # Attempt association to the AP multiple times. This makes the test more |
| # resilient to AP flakes that may result in the DUT not being able to |
| # find the network in its scan results. |
| associate_counter = 0 |
| associate_max_attempts = 3 |
| while associate_counter < associate_max_attempts: |
| self.dut.disconnect() |
| |
| self.access_point.stop_all_aps() |
| self.access_point.start_ap( |
| hostapd_config=create_ap_preset( |
| iface_wlan_2g=self.access_point.wlan_2g, |
| iface_wlan_5g=self.access_point.wlan_5g, |
| profile_name="whirlwind", |
| channel=band.default_channel(), |
| ssid=ssid, |
| security=security, |
| ), |
| radvd_config=( |
| RadvdConfig() if ip_version is IPVersion.V6 else None |
| ), |
| setup_bridge=True, |
| ) |
| |
| if self.dut.associate( |
| ssid, |
| target_pwd=security.password if security else None, |
| target_security=( |
| security.security_mode if security else SecurityMode.OPEN |
| ), |
| check_connectivity=False, |
| ): |
| break |
| else: |
| associate_counter += 1 |
| else: |
| asserts.fail( |
| f"Unable to associate at starting attenuation: {self.starting_attn}" |
| ) |
| |
| match ip_version: |
| case IPVersion.V4: |
| iperf_server_ip_address = self._wait_for_iperf_ipv4_addr() |
| case IPVersion.V6: |
| self.iperf_server.renew_test_interface_ip_address() |
| self.log.info( |
| "Waiting for iperf server to complete Duplicate " |
| "Address Detection..." |
| ) |
| iperf_server_ip_address = self._wait_for_iperf_dad() |
| |
| results = self.rvr_loop( |
| traffic_dir, |
| rvr_attenuators, |
| iperf_server_ip_address, |
| ip_version, |
| ssid, |
| security=security, |
| reverse=False, |
| ) |
| if self.reverse_rvr_after_forward: |
| results = results + self.rvr_loop( |
| traffic_dir, |
| rvr_attenuators, |
| iperf_server_ip_address, |
| ip_version, |
| ssid=ssid, |
| security=security, |
| reverse=True, |
| ) |
| |
| return results |
| |
| def rvr_loop( |
| self, |
| traffic_dir: TrafficDirection, |
| rvr_attenuators: list[Attenuator], |
| iperf_server_ip_address: str, |
| ip_version: IPVersion, |
| ssid: str, |
| security: Security | None, |
| reverse: bool, |
| ) -> list[RateByRange]: |
| """The loop that goes through each attenuation level and runs the iperf |
| throughput pair. |
| Args: |
| traffic_dir: The traffic direction from the perspective of the DUT. |
| rvr_attenuators: A list of attenuators to set. |
| iperf_server_ip_address: The IP address of the iperf server. |
| ssid: The ssid of the wireless network that the should associated |
| to. |
| password: Password of the wireless network. |
| reverse: Whether to run RvR test starting from the highest |
| attenuation and going to the lowest. This is run after the |
| normal low attenuation to high attenuation RvR test. |
| throughput: The list of throughput data for the test. |
| relative_attn: The list of attenuation data for the test. |
| |
| Returns: |
| throughput: The list of throughput data for the test. |
| relative_attn: The list of attenuation data for the test. |
| """ |
| starting_attn = self.starting_attn |
| ending_attn = self.ending_attn |
| step_size_in_db = self.step_size_in_db |
| if reverse: |
| starting_attn = self.ending_attn |
| ending_attn = self.starting_attn |
| step_size_in_db = step_size_in_db * -1 |
| self.dut.disconnect() |
| |
| results: list[RateByRange] = [] |
| |
| for step in range(starting_attn, ending_attn, step_size_in_db): |
| try: |
| for attenuator in rvr_attenuators: |
| self.log.info( |
| f"Setting relative attenuation of {attenuator.instrument.address} " |
| f"to {step} dB" |
| ) |
| attenuator.set_atten(step) |
| except ValueError as e: |
| self.log.error( |
| f"{step} is beyond the max or min of the testbed " |
| f"attenuator's capability. Stopping. {e}" |
| ) |
| break |
| |
| self.log.info(f"Running iperf at relative attenuation of {step} dB") |
| |
| throughput = self._run_iperf( |
| traffic_dir, |
| iperf_server_ip_address, |
| ip_version, |
| ssid, |
| security, |
| reverse, |
| ) |
| self.log.info( |
| f"Iperf traffic complete. {traffic_dir} traffic received at " |
| f"{throughput} {REPORTING_SPEED_UNITS} at relative attenuation " |
| f"of {step} db" |
| ) |
| results.append(RateByRange(step, throughput)) |
| |
| return results |
| |
| def _run_iperf( |
| self, |
| traffic_dir: TrafficDirection, |
| iperf_server_ip_address: str, |
| ip_version: IPVersion, |
| ssid: str, |
| security: Security | None, |
| reverse: bool, |
| ) -> float: |
| iperf_flags = self.iperf_flags |
| if traffic_dir is TrafficDirection.RX: |
| iperf_flags = f"{self.iperf_flags} -R" |
| |
| if not self.dut.is_connected(): |
| if reverse: |
| # In reverse mode, we're going from a high attenuation (weak |
| # signal) to a low attenuation (strong signal). It's expected |
| # that the DUT is not connected to the AP at the high |
| # attenuation level(s), so if we're disconnected here, we |
| # should try to associate. |
| self.log.info(f"Trying to associate") |
| if self.dut.associate( |
| ssid, |
| target_pwd=security.password if security else None, |
| target_security=( |
| security.security_mode |
| if security |
| else SecurityMode.OPEN |
| ), |
| check_connectivity=False, |
| ): |
| self.log.info("Successfully associated.") |
| try: |
| self.log.debug("Getting DUT IP address") |
| assert self.dut_iperf_client.test_interface is not None |
| if ip_version is IPVersion.V4: |
| self.fuchsia_device.wait_for_ipv4_addr( |
| self.dut_iperf_client.test_interface |
| ) |
| elif ip_version is IPVersion.V6: |
| self.fuchsia_device.wait_for_ipv6_addr( |
| self.dut_iperf_client.test_interface |
| ) |
| except ConnectionError: |
| self.log.info( |
| f"Association succeeded, but unable to get DUT IP address. Marking a 0 {REPORTING_SPEED_UNITS} " |
| "for throughput. Skipping running traffic and disconnecting." |
| ) |
| # Disconnect the DUT, so that we have a fresh attempt |
| # to get an IP at the next iteration of this reverse |
| # test. |
| self.dut.disconnect() |
| return 0 |
| else: |
| self.log.info( |
| f"Association failed. Marking a 0 {REPORTING_SPEED_UNITS} " |
| "for throughput. Skipping running traffic." |
| ) |
| return 0 |
| else: |
| self.log.info( |
| f"Device no longer associated. Marking a 0 {REPORTING_SPEED_UNITS} " |
| "for throughput. Skipping running traffic." |
| ) |
| return 0 |
| |
| self.log.debug("Pinging iperf server from DUT") |
| ping_result = self.dut.ping(iperf_server_ip_address) |
| if not ping_result.success: |
| self.log.info( |
| f'Iperf server "{iperf_server_ip_address}" is not pingable. ' |
| f"Marking a 0 {REPORTING_SPEED_UNITS} for throughput. " |
| "Skipping running traffic." |
| ) |
| self.log.debug(f"{iperf_server_ip_address} pingable: {ping_result}") |
| return 0 |
| |
| self.log.info(f'Iperf server "{iperf_server_ip_address}" is pingable.') |
| |
| match traffic_dir: |
| case TrafficDirection.TX: |
| self.log.info( |
| f"Running traffic from DUT to iperf server ({iperf_server_ip_address})" |
| ) |
| case TrafficDirection.RX: |
| self.log.info( |
| f"Running traffic from iperf server ({iperf_server_ip_address}) to DUT" |
| ) |
| |
| try: |
| iperf_tag = "decreasing" |
| if reverse: |
| iperf_tag = "increasing" |
| iperf_results_file = self.dut_iperf_client.start( |
| iperf_server_ip_address, |
| iperf_flags, |
| f"{iperf_tag}_{traffic_dir}_{self.starting_attn}", |
| timeout=(self.dwell_time_in_secs * 2), |
| ) |
| except TimeoutError as e: |
| iperf_results_file = None |
| self.log.error( |
| f"Iperf traffic timed out. Marking 0 {REPORTING_SPEED_UNITS} for " |
| f"throughput. {e}" |
| ) |
| return 0 |
| |
| if not iperf_results_file: |
| return 0 |
| |
| try: |
| iperf_results = IPerfResult( |
| iperf_results_file, |
| reporting_speed_units=REPORTING_SPEED_UNITS, |
| ) |
| if iperf_results.error: |
| self.iperf_server.stop() |
| self.iperf_server.start() |
| self.log.error(f"Errors in iperf logs:\n{iperf_results.error}") |
| if iperf_results.avg_send_rate: |
| return iperf_results.avg_send_rate |
| |
| self.log.error( |
| '"avg_send_rate" not found in iPerf3 results file. Marking 0 ' |
| f"{REPORTING_SPEED_UNITS} for throughput." |
| f"\n{iperf_results.get_json()}" |
| ) |
| return 0 |
| except ValueError as e: |
| self.iperf_server.stop() |
| self.iperf_server.start() |
| self.log.error( |
| f"No data in iPerf3 file. Marking 0 {REPORTING_SPEED_UNITS} " |
| f"for throughput: {e}" |
| ) |
| return 0 |
| except Exception as e: |
| self.iperf_server.stop() |
| self.iperf_server.start() |
| self.log.error( |
| f"Unknown exception. Marking 0 {REPORTING_SPEED_UNITS} for " |
| f"throughput: {e}" |
| ) |
| return 0 |
| |
| def _test_rvr(self, t: TestParams) -> None: |
| ssid = rand_ascii_str(20) |
| self.access_point.stop_all_aps() |
| results = self.run_rvr( |
| ssid, |
| security=t.security, |
| band=t.band, |
| traffic_dir=t.direction, |
| ip_version=t.ip_version, |
| ) |
| write_csv_rvr_data( |
| self.current_test_info.name, |
| self.current_test_info.output_path, |
| results, |
| ) |
| |
| |
if __name__ == "__main__":
    # Hand control to the mobly test runner when executed as a script.
    test_runner.main()