| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import logging |
| import os |
| import time |
| |
| from mobly import asserts, test_runner |
| from mobly.records import TestResultRecord |
| |
| from antlion import context |
| from antlion.controllers.access_point import setup_ap |
| from antlion.controllers.ap_lib import hostapd_constants |
| from antlion.controllers.ap_lib.hostapd_security import Security, SecurityMode |
| from antlion.controllers.ap_lib.radvd import Radvd |
| from antlion.controllers.ap_lib.radvd_config import RadvdConfig |
| from antlion.controllers.attenuator import get_attenuators_for_device |
| from antlion.controllers.fuchsia_device import FuchsiaDevice |
| from antlion.controllers.iperf_server import IPerfResult |
| from antlion.test_utils.abstract_devices.wlan_device import create_wlan_device |
| from antlion.test_utils.wifi import base_test |
| from antlion.utils import rand_ascii_str |
| |
# AP profile name; presumably selects a legacy 802.11a/g hostapd profile.
# NOTE(review): not referenced in the visible part of this file - confirm it
# is still needed.
| AP_11ABG_PROFILE_NAME = "whirlwind_11ag_legacy" |
# Unit handed to IPerfResult and echoed in log messages and the graph/CSV
# y-axis label.
| REPORTING_SPEED_UNITS = "Mbps" |
| |
# File name of the per-class RvR summary graph.
| RVR_GRAPH_SUMMARY_FILE = "rvr_summary.html" |
| |
# Upper bound, in seconds, on how long _wait_for_dad polls for a private-local
# IPv6 address.
| DAD_TIMEOUT_SEC = 30 |
| |
| |
def create_rvr_graph(test_name, graph_path, graph_data):
    """Creates the RvR graphs.

    Args:
        test_name: The name of test that was run. This is the title of the
            graph and part of the output file name.
        graph_path: Directory in which to put the graph html file.
        graph_data: A dictionary of the data to be graphed. Its
            "throughput_vs_attn" entry must provide the "relative_attn",
            "throughput", "x_label" and "y_label" keys.

    Returns:
        A list of bokeh graph objects, or an empty list when bokeh is not
        installed.
    """
    try:
        from bokeh.plotting import ColumnDataSource, figure, output_file, save
    except ImportError:
        # bokeh is an optional dependency; the CSV output does not need it.
        logging.warning(
            "bokeh is not installed: skipping creation of graphs. "
            "Note CSV files are still available. If graphs are "
            'desired, install antlion with the "bokeh" feature.'
        )
        return []

    series = graph_data["throughput_vs_attn"]

    # Join with a real path separator so the file lands inside graph_path even
    # when the caller passes a directory without a trailing slash.
    output_file(
        os.path.join(graph_path, f"rvr_throughput_vs_attn_{test_name}.html"),
        title=test_name,
    )
    throughput_vs_attn_data = ColumnDataSource(
        data=dict(
            relative_attn=series["relative_attn"],
            throughput=series["throughput"],
        )
    )
    tooltips = [("Attenuation", "@relative_attn"), ("Throughput", "@throughput")]
    throughput_vs_attn_graph = figure(
        title=f"Throughput vs Relative Attenuation (Test Case: {test_name})",
        x_axis_label=series["x_label"],
        y_axis_label=series["y_label"],
        # The attenuation labels are used directly as the x range.
        x_range=series["relative_attn"],
        tooltips=tooltips,
    )
    throughput_vs_attn_graph.sizing_mode = "stretch_width"
    throughput_vs_attn_graph.title.align = "center"
    throughput_vs_attn_graph.line(
        "relative_attn", "throughput", source=throughput_vs_attn_data, line_width=2
    )
    throughput_vs_attn_graph.circle(
        "relative_attn", "throughput", source=throughput_vs_attn_data, size=10
    )
    save([throughput_vs_attn_graph])
    return [throughput_vs_attn_graph]
| |
| |
def write_csv_rvr_data(test_name, csv_path, csv_data):
    """Writes the CSV data for the RvR test.

    The file is named "rvr_throughput_vs_attn_<test_name>.csv". It starts with
    a header row made of the x and y labels, followed by one
    "<attenuation>,<throughput>" row per data point.

    Args:
        test_name: The name of test that was run.
        csv_path: Directory in which to put the csv file.
        csv_data: A dictionary of the data to be put in the csv file. Its
            "throughput_vs_attn" entry must provide the "relative_attn",
            "throughput", "x_label" and "y_label" keys.
    """
    series = csv_data["throughput_vs_attn"]
    # Join with a real path separator so the file lands inside csv_path even
    # when the caller passes a directory without a trailing slash.
    csv_file_name = os.path.join(csv_path, f"rvr_throughput_vs_attn_{test_name}.csv")
    with open(csv_file_name, "w") as csv_file:
        csv_file.write(f"{series['x_label']},{series['y_label']}\n")
        for attn, rate in zip(series["relative_attn"], series["throughput"]):
            # int() drops any whitespace padding on the attenuation label.
            csv_file.write(f"{int(attn)},{rate}\n")
| |
| |
class WlanRvrTest(base_test.WifiBaseTest):
    """Tests running WLAN RvR.

    Each test sets up an AP, associates the DUT, then steps the attenuators
    from the configured starting attenuation towards the ending attenuation,
    running iperf traffic at every step. The throughput per step is written to
    a CSV file and, when bokeh is available, to an html graph.

    Test Bed Requirement:
    * One Android device or Fuchsia device
    * One Access Point
    * One attenuator
    * One Linux iPerf Server
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        # Graph objects collected across tests for the class-level summary.
        self.rvr_graph_summary = []

    def setup_class(self):
        """Reads the "rvr_settings" user params and gathers the controllers.

        Raises:
            ValueError: If the "dut" user param names an unknown device type.
        """
        super().setup_class()
        self.log = logging.getLogger()

        self.fuchsia_device: FuchsiaDevice | None = None

        device_type = self.user_params.get("dut", "fuchsia_devices")
        if device_type == "fuchsia_devices":
            self.fuchsia_device = self.fuchsia_devices[0]
            self.dut = create_wlan_device(self.fuchsia_device)
        elif device_type == "android_devices":
            self.dut = create_wlan_device(self.android_devices[0])
        else:
            raise ValueError(
                f'Invalid "dut" type specified in config: "{device_type}". '
                'Expected "fuchsia_devices" or "android_devices".'
            )

        rvr_settings = self.user_params["rvr_settings"]
        self.starting_attn = rvr_settings.get("starting_attn", 0)
        self.ending_attn = rvr_settings.get("ending_attn", 95)
        self.step_size_in_db = rvr_settings.get("step_size_in_db", 1)
        self.dwell_time_in_secs = rvr_settings.get("dwell_time_in_secs", 10)
        self.reverse_rvr_after_forward = bool(
            rvr_settings.get("reverse_rvr_after_forward", None)
        )

        # Every iperf run lasts one dwell time and reports JSON (-J).
        base_iperf_flags = rvr_settings.get("iperf_flags", "-i 1")
        self.iperf_flags = f"{base_iperf_flags} -t {self.dwell_time_in_secs} -J"

        self.debug_loop_count = rvr_settings.get("debug_loop_count", 1)

        self.router_adv_daemon = None

        # Always define the flag so it exists on the non-"auto" path too.
        self.use_auto_end = False
        if self.ending_attn == "auto":
            self.use_auto_end = True
            self.ending_attn = 100
            if self.step_size_in_db > 2:
                asserts.fail(
                    "When using an ending attenuation of 'auto' "
                    "please use a value < 2db. Larger jumps will "
                    "break the test reporting."
                )

        self.access_point = self.access_points[0]
        attenuator_config = self.controller_configs["AccessPoint"][0]["Attenuator"]
        self.attenuators_2g = get_attenuators_for_device(
            attenuator_config,
            self.attenuators,
            "attenuator_ports_wifi_2g",
        )
        self.attenuators_5g = get_attenuators_for_device(
            attenuator_config,
            self.attenuators,
            "attenuator_ports_wifi_5g",
        )

        self.iperf_server = self.iperf_servers[0]

        # Prefer an iperf client from the testbed config; otherwise let the
        # DUT create one.
        if hasattr(self, "iperf_clients") and self.iperf_clients:
            self.dut_iperf_client = self.iperf_clients[0]
        else:
            self.dut_iperf_client = self.dut.create_iperf_client()

        self.access_point.stop_all_aps()

    def setup_test(self):
        """Starts the iperf server, wakes Android devices and enables WiFi."""
        super().setup_test()
        if self.iperf_server:
            self.iperf_server.start()
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockAcquireBright()
                ad.droid.wakeUpNow()
        self.dut.wifi_toggle_state(True)

    def teardown_test(self):
        self.cleanup_tests()
        super().teardown_test()

    def teardown_class(self):
        """Stops radvd if still running and writes the RvR summary graph."""
        # getattr: the attribute is missing if setup_class failed early.
        daemon = getattr(self, "router_adv_daemon", None)
        if daemon:
            daemon.stop()
            self.router_adv_daemon = None
        try:
            from bokeh.plotting import output_file, save

            current_context = context.get_current_context()
            summary_path = os.path.join(
                current_context.get_base_output_path(),
                current_context.test_class_name,
                RVR_GRAPH_SUMMARY_FILE,
            )
            output_file(summary_path, title="RvR Summary")
            save(list(self.rvr_graph_summary))
        except ImportError:
            logging.warning(
                "bokeh is not installed: skipping creation of graphs. "
                "Note CSV files are still available. If graphs are "
                'desired, install antlion with the "bokeh" feature.'
            )
        except Exception as e:
            # Summary generation is best effort; never fail teardown on it.
            self.log.error(f"Unable to generate RvR summary file: {e}")

        super().teardown_class()

    def on_fail(self, record: TestResultRecord):
        super().on_fail(record)
        self.cleanup_tests()

    def cleanup_tests(self):
        """Cleans up all the dangling pieces of the tests, for example, the
        iperf server, radvd, all the currently running APs, and the various
        clients running during the tests.

        This is called from both teardown_test and on_fail, so it may run more
        than once for a single test.
        """
        daemon = self.router_adv_daemon
        if daemon:
            # Drop the reference first so a repeated cleanup does not pull
            # logs from, or stop, an already stopped daemon.
            self.router_adv_daemon = None
            output_path = context.get_current_context().get_base_output_path()
            full_output_path = os.path.join(output_path, "radvd_log.txt")
            try:
                with open(full_output_path, "w") as file:
                    file.write(daemon.pull_logs())
            finally:
                daemon.stop()
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockRelease()
                ad.droid.goToSleepNow()
        if self.iperf_server:
            self.iperf_server.stop()
        self.dut.turn_location_off_and_scan_toggle_off()
        self.dut.disconnect()
        self.dut.reset_wifi()
        self.download_ap_logs()
        self.access_point.stop_all_aps()

    def _associate(self, ssid, password, security_mode):
        """Asks the DUT to associate to `ssid`; returns the DUT's result."""
        return self.dut.associate(
            ssid,
            target_pwd=password,
            target_security=hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
                security_mode
            ),
            check_connectivity=False,
        )

    def _restart_iperf_server(self):
        """Stops and starts the iperf server after a failed traffic run."""
        self.iperf_server.stop()
        self.iperf_server.start()

    def _wait_for_ipv4_addrs(self):
        """Wait for an IPv4 addresses to become available on the DUT and iperf
        server.

        Returns:
            A string containing the private IPv4 address of the iperf server.

        Raises:
            TestFailure: If unable to acquire a IPv4 address.
        """
        ip_address_checker_max_attempts = 3
        for _ in range(ip_address_checker_max_attempts):
            self.iperf_server.renew_test_interface_ip_address()
            iperf_server_ip_addresses = self.iperf_server.get_interface_ip_addresses(
                self.iperf_server.test_interface
            )
            assert self.fuchsia_device is not None
            dut_ip_addresses = self.fuchsia_device.get_interface_ip_addresses(
                self.dut_iperf_client.test_interface
            )

            self.log.info(f"IPerf server IP info: {iperf_server_ip_addresses}")
            self.log.info(f"DUT IP info: {dut_ip_addresses}")

            if not iperf_server_ip_addresses["ipv4_private"]:
                self.log.warning(
                    "Unable to get the iperf server IPv4 address. Retrying..."
                )
            elif dut_ip_addresses["ipv4_private"]:
                return iperf_server_ip_addresses["ipv4_private"][0]
            else:
                self.log.warning(
                    "Unable to get the DUT IPv4 address starting at "
                    f'attenuation "{self.starting_attn}". Retrying...'
                )
            time.sleep(1)

        asserts.fail(
            "IPv4 addresses are not available on both the DUT and iperf server."
        )

    # TODO (b/258264565): Merge with fuchsia_device wait_for_ipv6_addr.
    def _wait_for_dad(self, device, test_interface):
        """Wait for Duplicate Address Detection to resolve so that an
        private-local IPv6 address is available for test.

        Args:
            device: implementor of get_interface_ip_addresses
            test_interface: name of interface that DAD is operating on

        Returns:
            A string containing the private-local IPv6 address of the device.

        Raises:
            TestFailure: If unable to acquire an IPv6 address.
        """
        start = time.time()
        elapsed = 0.0

        # Poll about once per second until DAD_TIMEOUT_SEC has elapsed.
        while elapsed < DAD_TIMEOUT_SEC:
            addrs = device.get_interface_ip_addresses(test_interface)
            elapsed = time.time() - start
            if addrs["ipv6_private_local"]:
                # DAD has completed
                addr = addrs["ipv6_private_local"][0]
                self.log.info(f'DAD resolved with "{addr}" after {elapsed}s')
                return addr
            time.sleep(1)

        asserts.fail(
            "Unable to acquire a private-local IPv6 address for testing "
            f"after {elapsed}s"
        )

    def run_rvr(
        self,
        ssid,
        security_mode=None,
        password=None,
        band="2g",
        traffic_dir="tx",
        ip_version=4,
    ):
        """Setups and runs the RvR test

        Args:
            ssid: The SSID for the client to associate to.
            security_mode: Security mode string of the network (for example
                "wpa2"), or None for an open network.
            password: Password for the network, if necessary.
            band: 2g or 5g
            traffic_dir: rx or tx, bi is not supported by iperf3
            ip_version: 4 or 6

        Returns:
            The bokeh graph data.

        Raises:
            ValueError: If `band` or `ip_version` is not a supported value.
            TestFailure: If the DUT cannot associate at the starting
                attenuation or cannot obtain an IP address.
        """
        throughput: list[float] = []
        # Attenuation values are stored as string labels.
        relative_attn: list[str] = []
        if band == "2g":
            rvr_attenuators = self.attenuators_2g
        elif band == "5g":
            rvr_attenuators = self.attenuators_5g
        else:
            raise ValueError(f"Invalid WLAN band specified: {band}")
        if ip_version == 6:
            # Router advertisements are needed for the IPv6 runs.
            self.router_adv_daemon = Radvd(
                self.access_point.ssh,
                self.access_point.interfaces.get_bridge_interface()[0],
            )
            radvd_config = RadvdConfig()
            self.router_adv_daemon.start(radvd_config)

        associate_max_attempts = 3
        for _ in range(self.debug_loop_count):
            for rvr_attenuator in rvr_attenuators:
                rvr_attenuator.set_atten(self.starting_attn)

            for _attempt in range(associate_max_attempts):
                if self._associate(ssid, password, security_mode):
                    break
            else:
                asserts.fail(
                    f"Unable to associate at starting attenuation: {self.starting_attn}"
                )

            if ip_version == 4:
                iperf_server_ip_address = self._wait_for_ipv4_addrs()
            elif ip_version == 6:
                self.iperf_server.renew_test_interface_ip_address()
                self.log.info(
                    "Waiting for iperf server to complete Duplicate "
                    "Address Detection..."
                )
                iperf_server_ip_address = self._wait_for_dad(
                    self.iperf_server, self.iperf_server.test_interface
                )

                self.log.info(
                    "Waiting for DUT to complete Duplicate Address Detection "
                    f'on interface "{self.dut_iperf_client.test_interface}"...'
                )
                # Only waited on; the DUT address itself is not needed here.
                self._wait_for_dad(
                    self.fuchsia_device, self.dut_iperf_client.test_interface
                )
            else:
                raise ValueError(f"Invalid IP version: {ip_version}")

            throughput, relative_attn = self.rvr_loop(
                traffic_dir,
                rvr_attenuators,
                iperf_server_ip_address,
                ip_version,
                throughput=throughput,
                relative_attn=relative_attn,
            )
            if self.reverse_rvr_after_forward:
                throughput, relative_attn = self.rvr_loop(
                    traffic_dir,
                    rvr_attenuators,
                    iperf_server_ip_address,
                    ip_version,
                    ssid=ssid,
                    security_mode=security_mode,
                    password=password,
                    reverse=True,
                    throughput=throughput,
                    relative_attn=relative_attn,
                )
            self.dut.disconnect()

        throughput_vs_attn = {
            "throughput": throughput,
            "relative_attn": relative_attn,
            "x_label": "Attenuation(db)",
            "y_label": f"Throughput({REPORTING_SPEED_UNITS})",
        }
        return {"throughput_vs_attn": throughput_vs_attn}

    def rvr_loop(
        self,
        traffic_dir,
        rvr_attenuators,
        iperf_server_ip_address,
        ip_version,
        ssid=None,
        security_mode=None,
        password=None,
        reverse=False,
        throughput=None,
        relative_attn=None,
    ):
        """The loop that goes through each attenuation level and runs the iperf
        throughput pair.

        Args:
            traffic_dir: The traffic direction from the perspective of the DUT.
            rvr_attenuators: A list of attenuators to set.
            iperf_server_ip_address: The IP address of the iperf server.
            ip_version: 4 or 6; selects which DUT address is logged.
            ssid: The ssid of the wireless network that the should associated
                to.
            security_mode: Security mode string of the wireless network, used
                when re-associating during a reverse run.
            password: Password of the wireless network.
            reverse: Whether to run RvR test starting from the highest
                attenuation and going to the lowest. This is run after the
                normal low attenuation to high attenuation RvR test.
            throughput: The list of throughput data for the test. A new list
                is created when omitted.
            relative_attn: The list of attenuation data for the test. A new
                list is created when omitted.

        Returns:
            throughput: The list of throughput data for the test.
            relative_attn: The list of attenuation data for the test.

        Raises:
            ValueError: If traffic is about to be run and `traffic_dir` is
                neither "tx" nor "rx".
        """
        # The defaults are None; appending to them would raise.
        if throughput is None:
            throughput = []
        if relative_attn is None:
            relative_attn = []

        iperf_flags = self.iperf_flags
        if traffic_dir == "rx":
            iperf_flags = f"{self.iperf_flags} -R"
        starting_attn = self.starting_attn
        ending_attn = self.ending_attn
        step_size_in_db = self.step_size_in_db
        if reverse:
            starting_attn, ending_attn = self.ending_attn, self.starting_attn
            step_size_in_db = -step_size_in_db
            self.dut.disconnect()
        for step in range(starting_attn, ending_attn, step_size_in_db):
            try:
                for attenuator in rvr_attenuators:
                    attenuator.set_atten(step)
            except ValueError as e:
                self.log.error(
                    f"{step} is beyond the max or min of the testbed "
                    f"attenuator's capability. Stopping. {e}"
                )
                break
            self.log.info(f"Set relative attenuation to {step} db")

            associated = self.dut.is_connected()
            if associated:
                self.log.info("DUT is currently associated.")
            else:
                self.log.info("DUT is not currently associated.")

            if reverse and not associated:
                self.log.info(
                    f"Trying to associate at relative attenuation of {step} db"
                )
                if self._associate(ssid, password, security_mode):
                    associated = True
                    self.log.info("Successfully associated.")
                else:
                    associated = False
                    self.log.info(
                        f"Association failed. Marking a 0 {REPORTING_SPEED_UNITS} "
                        "for throughput. Skipping running traffic."
                    )

            # A reverse pass revisits attenuation values, so pad the label
            # with spaces until it is unique in the list.
            attn_label = str(step)
            while attn_label in relative_attn:
                attn_label = f"{attn_label} "
            relative_attn.append(attn_label)

            assert self.fuchsia_device is not None
            dut_ip_addresses = self.fuchsia_device.get_interface_ip_addresses(
                self.dut_iperf_client.test_interface
            )
            if ip_version == 4:
                if not dut_ip_addresses["ipv4_private"]:
                    self.log.info(
                        "DUT does not have an IPv4 address. "
                        "Traffic attempt to be run if the server "
                        "is pingable."
                    )
                else:
                    ipv4_private = dut_ip_addresses["ipv4_private"][0]
                    self.log.info(
                        f'DUT has the following IPv4 address: "{ipv4_private}"'
                    )
            elif ip_version == 6:
                if not dut_ip_addresses["ipv6_private_local"]:
                    self.log.info(
                        "DUT does not have an IPv6 address. "
                        "Traffic attempt to be run if the server "
                        "is pingable."
                    )
                else:
                    ipv6_private_local = dut_ip_addresses["ipv6_private_local"][0]
                    self.log.info(
                        f'DUT has the following IPv6 address: "{ipv6_private_local}"'
                    )

            server_pingable = self.dut.can_ping(iperf_server_ip_address)
            if not server_pingable:
                self.log.info(
                    f'Iperf server "{iperf_server_ip_address}" is not pingable. '
                    f"Marking a 0 {REPORTING_SPEED_UNITS} for throughput. "
                    "Skipping running traffic."
                )
                self.log.debug(f"DUT Associated: {associated}")
                self.log.debug(f"{iperf_server_ip_address} pingable: {server_pingable}")
                throughput.append(0)
                continue

            self.log.info(f'Iperf server "{iperf_server_ip_address}" is pingable.')
            if traffic_dir == "tx":
                self.log.info(
                    f"Running traffic DUT to {iperf_server_ip_address} at relative "
                    f"attenuation of {step}"
                )
            elif traffic_dir == "rx":
                self.log.info(
                    f"Running traffic {iperf_server_ip_address} to DUT at relative "
                    f"attenuation of {step}"
                )
            else:
                raise ValueError("Invalid traffic direction")

            iperf_tag = "increasing" if reverse else "decreasing"
            try:
                iperf_results_file = self.dut_iperf_client.start(
                    iperf_server_ip_address,
                    iperf_flags,
                    f"{iperf_tag}_{traffic_dir}_{self.starting_attn}",
                    # Allow twice the dwell time before giving up.
                    timeout=(self.dwell_time_in_secs * 2),
                )
            except TimeoutError as e:
                iperf_results_file = None
                self.log.error(
                    f"Iperf traffic timed out. Marking 0 {REPORTING_SPEED_UNITS} for "
                    f"throughput. {e}"
                )

            if not iperf_results_file:
                throughput.append(0)
            else:
                try:
                    iperf_results = IPerfResult(
                        iperf_results_file,
                        reporting_speed_units=REPORTING_SPEED_UNITS,
                    )
                    if iperf_results.error:
                        self._restart_iperf_server()
                        self.log.error(
                            f"Errors in iperf logs:\n{iperf_results.error}"
                        )
                    if not iperf_results.avg_send_rate:
                        throughput.append(0)
                    else:
                        throughput.append(iperf_results.avg_send_rate)
                except ValueError as e:
                    self._restart_iperf_server()
                    self.log.error(
                        f"No data in iPerf3 file. Marking 0 {REPORTING_SPEED_UNITS} "
                        f"for throughput: {e}"
                    )
                    throughput.append(0)
                except Exception as e:
                    # Best effort: record 0 and keep stepping.
                    self._restart_iperf_server()
                    self.log.error(
                        f"Unknown exception. Marking 0 {REPORTING_SPEED_UNITS} for "
                        f"throughput: {e}"
                    )
                    self.log.error(e)
                    throughput.append(0)

            self.log.info(
                f"Iperf traffic complete. {traffic_dir} traffic received at "
                f"{throughput[-1]} {REPORTING_SPEED_UNITS} at relative attenuation "
                f"of {str(relative_attn[-1]).strip()} db"
            )
        return throughput, relative_attn

    def _run_rvr_test(self, *, channel, band, traffic_dir, ip_version, wpa2=False):
        """Sets up a "whirlwind" AP, runs RvR and writes the graph and CSV.

        Args:
            channel: Channel the AP is started on.
            band: "2g" or "5g"; passed to run_rvr.
            traffic_dir: "tx" or "rx"; passed to run_rvr.
            ip_version: 4 or 6; passed to run_rvr.
            wpa2: When True the AP is secured with WPA2 and a random
                password; otherwise the network is open.
        """
        ssid = rand_ascii_str(20)
        security_mode = None
        password = None
        ap_kwargs = {}
        if wpa2:
            security_mode = "wpa2"
            password = rand_ascii_str(20)
            ap_kwargs["security"] = Security(
                security_mode=SecurityMode.WPA2, password=password
            )
        setup_ap(
            access_point=self.access_point,
            profile_name="whirlwind",
            channel=channel,
            ssid=ssid,
            setup_bridge=True,
            **ap_kwargs,
        )
        graph_data = self.run_rvr(
            ssid,
            security_mode=security_mode,
            password=password,
            band=band,
            traffic_dir=traffic_dir,
            ip_version=ip_version,
        )
        test_name = self.current_test_info.name
        self.rvr_graph_summary.extend(
            create_rvr_graph(
                test_name,
                context.get_current_context().get_full_output_path(),
                graph_data,
            )
        )
        write_csv_rvr_data(
            test_name,
            context.get_current_context().get_full_output_path(),
            graph_data,
        )

    def test_rvr_11ac_5g_80mhz_open_tx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="tx",
            ip_version=4,
        )

    def test_rvr_11ac_5g_80mhz_open_rx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="rx",
            ip_version=4,
        )

    def test_rvr_11ac_5g_80mhz_open_tx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="tx",
            ip_version=6,
        )

    def test_rvr_11ac_5g_80mhz_open_rx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="rx",
            ip_version=6,
        )

    def test_rvr_11ac_5g_80mhz_wpa2_tx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="tx",
            ip_version=4,
            wpa2=True,
        )

    def test_rvr_11ac_5g_80mhz_wpa2_rx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="rx",
            ip_version=4,
            wpa2=True,
        )

    def test_rvr_11ac_5g_80mhz_wpa2_tx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="tx",
            ip_version=6,
            wpa2=True,
        )

    def test_rvr_11ac_5g_80mhz_wpa2_rx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            band="5g",
            traffic_dir="rx",
            ip_version=6,
            wpa2=True,
        )

    def test_rvr_11n_2g_20mhz_open_tx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="tx",
            ip_version=4,
        )

    def test_rvr_11n_2g_20mhz_open_rx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="rx",
            ip_version=4,
        )

    def test_rvr_11n_2g_20mhz_open_tx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="tx",
            ip_version=6,
        )

    def test_rvr_11n_2g_20mhz_open_rx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="rx",
            ip_version=6,
        )

    def test_rvr_11n_2g_20mhz_wpa2_tx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="tx",
            ip_version=4,
            wpa2=True,
        )

    def test_rvr_11n_2g_20mhz_wpa2_rx_ipv4(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="rx",
            ip_version=4,
            wpa2=True,
        )

    def test_rvr_11n_2g_20mhz_wpa2_tx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="tx",
            ip_version=6,
            wpa2=True,
        )

    def test_rvr_11n_2g_20mhz_wpa2_rx_ipv6(self):
        self._run_rvr_test(
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            band="2g",
            traffic_dir="rx",
            ip_version=6,
            wpa2=True,
        )
| |
| |
# When this file is executed directly, hand control to mobly's test runner.
| if __name__ == "__main__": |
| test_runner.main() |