| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os |
| import time |
| from statistics import pstdev |
| |
| from mobly import asserts, test_runner |
| |
| from antlion import context, utils |
| from antlion.controllers.access_point import setup_ap |
| from antlion.controllers.ap_lib import hostapd_config, hostapd_constants |
| from antlion.controllers.ap_lib.hostapd_security import Security |
| from antlion.controllers.iperf_server import IPerfResult |
| from antlion.test_utils.abstract_devices.wlan_device import create_wlan_device |
| from antlion.test_utils.wifi import base_test |
| |
| N_CAPABILITIES_DEFAULT = [ |
| hostapd_constants.N_CAPABILITY_LDPC, |
| hostapd_constants.N_CAPABILITY_SGI20, |
| hostapd_constants.N_CAPABILITY_SGI40, |
| hostapd_constants.N_CAPABILITY_TX_STBC, |
| hostapd_constants.N_CAPABILITY_RX_STBC1, |
| ] |
| |
| AC_CAPABILITIES_DEFAULT = [ |
| hostapd_constants.AC_CAPABILITY_MAX_MPDU_11454, |
| hostapd_constants.AC_CAPABILITY_RXLDPC, |
| hostapd_constants.AC_CAPABILITY_SHORT_GI_80, |
| hostapd_constants.AC_CAPABILITY_TX_STBC_2BY1, |
| hostapd_constants.AC_CAPABILITY_RX_STBC_1, |
| hostapd_constants.AC_CAPABILITY_MAX_A_MPDU_LEN_EXP7, |
| hostapd_constants.AC_CAPABILITY_RX_ANTENNA_PATTERN, |
| hostapd_constants.AC_CAPABILITY_TX_ANTENNA_PATTERN, |
| ] |
| |
| DEFAULT_MIN_THROUGHPUT = 0 |
| DEFAULT_MAX_STD_DEV = 1 |
| DEFAULT_IPERF_TIMEOUT = 30 |
| |
| DEFAULT_TIME_TO_WAIT_FOR_IP_ADDR = 30 |
| GRAPH_CIRCLE_SIZE = 10 |
| IPERF_NO_THROUGHPUT_VALUE = 0 |
| MAX_2_4_CHANNEL = 14 |
| TIME_TO_SLEEP_BETWEEN_RETRIES = 1 |
| TIME_TO_WAIT_FOR_COUNTRY_CODE = 10 |
| WEP_HEX_STRING_LENGTH = 10 |
| |
| MEGABITS_PER_SECOND = "Mbps" |
| |
| |
| def get_test_name(settings): |
| """Retrieves the test_name value from test_settings""" |
| return settings.get("test_name") |
| |
| |
| class ChannelSweepTest(base_test.WifiBaseTest): |
| """Tests channel performance and regulatory compliance.. |
| |
| Testbed Requirement: |
| * One ACTS compatible device (dut) |
| * One Access Point |
| * One Linux Machine used as IPerfServer if running performance tests |
| Note: Performance tests should be done in isolated testbed. |
| """ |
| |
| def __init__(self, controllers): |
| super().__init__(controllers) |
| if "channel_sweep_test_params" in self.user_params: |
| self.time_to_wait_for_ip_addr = self.user_params[ |
| "channel_sweep_test_params" |
| ].get("time_to_wait_for_ip_addr", DEFAULT_TIME_TO_WAIT_FOR_IP_ADDR) |
| else: |
| self.time_to_wait_for_ip_addr = DEFAULT_TIME_TO_WAIT_FOR_IP_ADDR |
| |
| def setup_class(self): |
| super().setup_class() |
| self.log = logging.getLogger() |
| |
| device_type = self.user_params.get("dut", "fuchsia_devices") |
| if device_type == "fuchsia_devices": |
| self.dut = create_wlan_device(self.fuchsia_devices[0]) |
| elif device_type == "android_devices": |
| self.dut = create_wlan_device(self.android_devices[0]) |
| else: |
| raise ValueError( |
| f'Invalid "dut" type specified in config: "{device_type}".' |
| 'Expected "fuchsia_devices" or "android_devices".' |
| ) |
| |
| self.android_devices = getattr(self, "android_devices", []) |
| |
| self.access_point = self.access_points[0] |
| self.access_point.stop_all_aps() |
| |
| self.iperf_server = None |
| self.iperf_client = None |
| |
| self.channel_sweep_test_params = self.user_params.get( |
| "channel_sweep_test_params", {} |
| ) |
| # Allows users to skip the iperf throughput measurements, just verifying |
| # association. |
| if not self.channel_sweep_test_params.get("skip_performance"): |
| try: |
| self.iperf_server = self.iperf_servers[0] |
| self.iperf_server.start() |
| except AttributeError: |
| self.log.warn( |
| "Missing iperf config. Throughput cannot be measured, so only " |
| "association will be tested." |
| ) |
| |
| if hasattr(self, "iperf_clients") and self.iperf_clients: |
| self.iperf_client = self.iperf_clients[0] |
| else: |
| self.iperf_client = self.dut.create_iperf_client() |
| |
| self.regulatory_results = "====CountryCode,Channel,Frequency,ChannelBandwith,Connected/Not-Connected====\n" |
| |
| def teardown_class(self): |
| super().teardown_class() |
| output_path = context.get_current_context().get_base_output_path() |
| regulatory_save_path = "%s/ChannelSweepTest/%s" % ( |
| output_path, |
| "regulatory_results.txt", |
| ) |
| f = open(regulatory_save_path, "w") |
| f.write(self.regulatory_results) |
| f.close() |
| |
| def setup_test(self): |
| # TODO(fxb/46417): Uncomment when wlanClearCountry is implemented up any |
| # country code changes. |
| # for fd in self.fuchsia_devices: |
| # phy_ids_response = fd.wlan_lib.wlanPhyIdList() |
| # if phy_ids_response.get('error'): |
| # raise ConnectionError( |
| # 'Failed to retrieve phy ids from FuchsiaDevice (%s). ' |
| # 'Error: %s' % (fd.ip, phy_ids_response['error'])) |
| # for id in phy_ids_response['result']: |
| # clear_country_response = fd.wlan_lib.wlanClearCountry(id) |
| # if clear_country_response.get('error'): |
| # raise EnvironmentError( |
| # 'Failed to reset country code on FuchsiaDevice (%s). ' |
| # 'Error: %s' % (fd.ip, clear_country_response['error']) |
| # ) |
| self.access_point.stop_all_aps() |
| for ad in self.android_devices: |
| ad.droid.wakeLockAcquireBright() |
| ad.droid.wakeUpNow() |
| self.dut.wifi_toggle_state(True) |
| self.dut.disconnect() |
| |
| def teardown_test(self): |
| for ad in self.android_devices: |
| ad.droid.wakeLockRelease() |
| ad.droid.goToSleepNow() |
| self.dut.turn_location_off_and_scan_toggle_off() |
| self.dut.disconnect() |
| self.download_ap_logs() |
| self.access_point.stop_all_aps() |
| |
| def set_dut_country_code(self, country_code): |
| """Set the country code on the DUT. Then verify that the country |
| code was set successfully |
| |
| Args: |
| country_code: string, the 2 character country code to set |
| """ |
| self.log.info("Setting DUT country code to %s" % country_code) |
| country_code_response = self.dut.device.sl4f.regulatory_region_lib.setRegion( |
| country_code |
| ) |
| if country_code_response.get("error"): |
| raise EnvironmentError( |
| "Failed to set country code (%s) on DUT. Error: %s" |
| % (country_code, country_code_response["error"]) |
| ) |
| |
| self.log.info( |
| "Verifying DUT country code was correctly set to %s." % country_code |
| ) |
| phy_ids_response = self.dut.device.sl4f.wlan_lib.wlanPhyIdList() |
| if phy_ids_response.get("error"): |
| raise ConnectionError( |
| "Failed to get phy ids from DUT. Error: %s" |
| % (country_code, phy_ids_response["error"]) |
| ) |
| |
| end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE |
| while time.time() < end_time: |
| for id in phy_ids_response["result"]: |
| get_country_response = self.dut.device.sl4f.wlan_lib.wlanGetCountry(id) |
| if get_country_response.get("error"): |
| raise ConnectionError( |
| "Failed to query PHY ID (%s) for country. Error: %s" |
| % (id, get_country_response["error"]) |
| ) |
| |
| set_code = "".join( |
| [chr(ascii_char) for ascii_char in get_country_response["result"]] |
| ) |
| if set_code != country_code: |
| self.log.debug( |
| "PHY (id: %s) has incorrect country code set. " |
| "Expected: %s, Got: %s" % (id, country_code, set_code) |
| ) |
| break |
| else: |
| self.log.info("All PHYs have expected country code (%s)" % country_code) |
| break |
| time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES) |
| else: |
| raise EnvironmentError( |
| "Failed to set DUT country code to %s." % country_code |
| ) |
| |
| def setup_ap(self, channel, channel_bandwidth, security_profile=None): |
| """Start network on AP with basic configuration. |
| |
| Args: |
| channel: int, channel to use for network |
| channel_bandwidth: int, channel bandwidth in mhz to use for network, |
| security_profile: Security object, or None if open |
| |
| Returns: |
| string, ssid of network running |
| |
| Raises: |
| ConnectionError if network is not started successfully. |
| """ |
| if channel > MAX_2_4_CHANNEL: |
| vht_bandwidth = channel_bandwidth |
| else: |
| vht_bandwidth = None |
| |
| if channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_20MHZ: |
| n_capabilities = N_CAPABILITIES_DEFAULT + [ |
| hostapd_constants.N_CAPABILITY_HT20 |
| ] |
| elif ( |
| channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_40MHZ |
| or channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_80MHZ |
| ): |
| if hostapd_config.ht40_plus_allowed(channel): |
| extended_channel = [hostapd_constants.N_CAPABILITY_HT40_PLUS] |
| elif hostapd_config.ht40_minus_allowed(channel): |
| extended_channel = [hostapd_constants.N_CAPABILITY_HT40_MINUS] |
| else: |
| raise ValueError("Invalid Channel: %s" % channel) |
| n_capabilities = N_CAPABILITIES_DEFAULT + extended_channel |
| else: |
| raise ValueError("Invalid Bandwidth: %s" % channel_bandwidth) |
| ssid = utils.rand_ascii_str(hostapd_constants.AP_SSID_LENGTH_2G) |
| try: |
| setup_ap( |
| access_point=self.access_point, |
| profile_name="whirlwind", |
| channel=channel, |
| security=security_profile, |
| n_capabilities=n_capabilities, |
| ac_capabilities=None, |
| force_wmm=True, |
| ssid=ssid, |
| vht_bandwidth=vht_bandwidth, |
| setup_bridge=True, |
| ) |
| except Exception as err: |
| raise ConnectionError( |
| "Failed to setup ap on channel: %s, channel bandwidth: %smhz. " |
| "Error: %s" % (channel, channel_bandwidth, err) |
| ) |
| else: |
| self.log.info( |
| "Network (ssid: %s) up on channel %s w/ channel bandwidth %smhz" |
| % (ssid, channel, channel_bandwidth) |
| ) |
| |
| return ssid |
| |
| def get_and_verify_iperf_address(self, channel, device, interface=None): |
| """Get ip address from a devices interface and verify it belongs to |
| expected subnet based on APs DHCP config. |
| |
| Args: |
| channel: int, channel network is running on, to determine subnet |
| device: device to get ip address for |
| interface (default: None): interface on device to get ip address. |
| If None, uses device.test_interface. |
| |
| Returns: |
| String, ip address of device on given interface (or test_interface) |
| |
| Raises: |
| ConnectionError, if device does not have a valid ip address after |
| all retries. |
| """ |
| if channel <= MAX_2_4_CHANNEL: |
| subnet = self.access_point._AP_2G_SUBNET_STR |
| else: |
| subnet = self.access_point._AP_5G_SUBNET_STR |
| end_time = time.time() + self.time_to_wait_for_ip_addr |
| while time.time() < end_time: |
| if interface: |
| device_addresses = device.get_interface_ip_addresses(interface) |
| else: |
| device_addresses = device.get_interface_ip_addresses( |
| device.test_interface |
| ) |
| |
| if device_addresses["ipv4_private"]: |
| for ip_addr in device_addresses["ipv4_private"]: |
| if utils.ip_in_subnet(ip_addr, subnet): |
| return ip_addr |
| else: |
| self.log.debug( |
| "Device has an ip address (%s), but it is not in " |
| "subnet %s" % (ip_addr, subnet) |
| ) |
| else: |
| self.log.debug("Device does not have a valid ip address. Retrying.") |
| time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES) |
| raise ConnectionError("Device failed to get an ip address.") |
| |
| def get_iperf_throughput( |
| self, iperf_server_address, iperf_client_address, reverse=False |
| ): |
| """Run iperf between client and server and get the throughput. |
| |
| Args: |
| iperf_server_address: string, ip address of running iperf server |
| iperf_client_address: string, ip address of iperf client (dut) |
| reverse (default: False): If True, run traffic in reverse direction, |
| from server to client. |
| |
| Returns: |
| int, iperf throughput OR IPERF_NO_THROUGHPUT_VALUE, if iperf fails |
| """ |
| if reverse: |
| self.log.info( |
| "Running IPerf traffic from server (%s) to dut (%s)." |
| % (iperf_server_address, iperf_client_address) |
| ) |
| iperf_results_file = self.iperf_client.start( |
| iperf_server_address, |
| "-i 1 -t 10 -R -J", |
| "channel_sweep_rx", |
| timeout=DEFAULT_IPERF_TIMEOUT, |
| ) |
| else: |
| self.log.info( |
| "Running IPerf traffic from dut (%s) to server (%s)." |
| % (iperf_client_address, iperf_server_address) |
| ) |
| iperf_results_file = self.iperf_client.start( |
| iperf_server_address, |
| "-i 1 -t 10 -J", |
| "channel_sweep_tx", |
| timeout=DEFAULT_IPERF_TIMEOUT, |
| ) |
| if iperf_results_file: |
| iperf_results = IPerfResult( |
| iperf_results_file, reporting_speed_units=MEGABITS_PER_SECOND |
| ) |
| return iperf_results.avg_send_rate |
| else: |
| return IPERF_NO_THROUGHPUT_VALUE |
| |
| def log_to_file_and_throughput_data( |
| self, channel, channel_bandwidth, tx_throughput, rx_throughput |
| ): |
| """Write performance info to csv file and to throughput data. |
| |
| Args: |
| channel: int, channel that test was run on |
| channel_bandwidth: int, channel bandwidth the test used |
| tx_throughput: float, throughput value from dut to iperf server |
| rx_throughput: float, throughput value from iperf server to dut |
| """ |
| test_name = self.throughput_data["test"] |
| output_path = context.get_current_context().get_base_output_path() |
| log_path = "%s/ChannelSweepTest/%s" % (output_path, test_name) |
| if not os.path.exists(log_path): |
| os.makedirs(log_path) |
| log_file = "%s/%s_%smhz.csv" % (log_path, test_name, channel_bandwidth) |
| self.log.info("Writing IPerf results for %s to %s" % (test_name, log_file)) |
| with open(log_file, "a") as csv_file: |
| csv_file.write("%s,%s,%s\n" % (channel, tx_throughput, rx_throughput)) |
| self.throughput_data["results"][str(channel)] = { |
| "tx_throughput": tx_throughput, |
| "rx_throughput": rx_throughput, |
| } |
| |
| def write_graph(self): |
| """Create graph html files from throughput data, plotting channel vs |
| tx_throughput and channel vs rx_throughput. |
| """ |
| # If performance measurement is skipped |
| if not self.iperf_server: |
| return |
| |
| try: |
| from bokeh.plotting import ColumnDataSource, figure, output_file, save |
| except ImportError: |
| self.log.warn( |
| "bokeh is not installed: skipping creation of graphs. " |
| "Note CSV files are still available. If graphs are " |
| 'desired, install antlion with the "bokeh" feature.' |
| ) |
| return |
| |
| output_path = context.get_current_context().get_base_output_path() |
| test_name = self.throughput_data["test"] |
| channel_bandwidth = self.throughput_data["channel_bandwidth"] |
| output_file_name = "%s/ChannelSweepTest/%s/%s_%smhz.html" % ( |
| output_path, |
| test_name, |
| test_name, |
| channel_bandwidth, |
| ) |
| output_file(output_file_name) |
| channels = [] |
| tx_throughputs = [] |
| rx_throughputs = [] |
| for channel in self.throughput_data["results"]: |
| channels.append(str(channel)) |
| tx_throughputs.append( |
| self.throughput_data["results"][channel]["tx_throughput"] |
| ) |
| rx_throughputs.append( |
| self.throughput_data["results"][channel]["rx_throughput"] |
| ) |
| channel_vs_throughput_data = ColumnDataSource( |
| data=dict( |
| channels=channels, |
| tx_throughput=tx_throughputs, |
| rx_throughput=rx_throughputs, |
| ) |
| ) |
| TOOLTIPS = [ |
| ("Channel", "@channels"), |
| ("TX_Throughput", "@tx_throughput"), |
| ("RX_Throughput", "@rx_throughput"), |
| ] |
| channel_vs_throughput_graph = figure( |
| title="Channels vs. Throughput", |
| x_axis_label="Channels", |
| x_range=channels, |
| y_axis_label="Throughput", |
| tooltips=TOOLTIPS, |
| ) |
| channel_vs_throughput_graph.sizing_mode = "stretch_both" |
| channel_vs_throughput_graph.title.align = "center" |
| channel_vs_throughput_graph.line( |
| "channels", |
| "tx_throughput", |
| source=channel_vs_throughput_data, |
| line_width=2, |
| line_color="blue", |
| legend_label="TX_Throughput", |
| ) |
| channel_vs_throughput_graph.circle( |
| "channels", |
| "tx_throughput", |
| source=channel_vs_throughput_data, |
| size=GRAPH_CIRCLE_SIZE, |
| color="blue", |
| ) |
| channel_vs_throughput_graph.line( |
| "channels", |
| "rx_throughput", |
| source=channel_vs_throughput_data, |
| line_width=2, |
| line_color="red", |
| legend_label="RX_Throughput", |
| ) |
| channel_vs_throughput_graph.circle( |
| "channels", |
| "rx_throughput", |
| source=channel_vs_throughput_data, |
| size=GRAPH_CIRCLE_SIZE, |
| color="red", |
| ) |
| |
| channel_vs_throughput_graph.legend.location = "top_left" |
| graph_file = save([channel_vs_throughput_graph]) |
| self.log.info("Saved graph to %s" % graph_file) |
| |
| def verify_standard_deviation(self, max_std_dev): |
| """Verifies the standard deviation of the throughput across the channels |
| does not exceed the max_std_dev value. |
| |
| Args: |
| max_std_dev: float, max standard deviation of throughput for a test |
| to pass (in Mb/s) |
| |
| Raises: |
| TestFailure, if standard deviation of throughput exceeds max_std_dev |
| """ |
| # If performance measurement is skipped |
| if not self.iperf_server: |
| return |
| self.log.info( |
| "Verifying standard deviation across channels does not " |
| "exceed max standard deviation of %s Mb/s" % max_std_dev |
| ) |
| tx_values = [] |
| rx_values = [] |
| for channel in self.throughput_data["results"]: |
| if self.throughput_data["results"][channel]["tx_throughput"] is not None: |
| tx_values.append( |
| self.throughput_data["results"][channel]["tx_throughput"] |
| ) |
| if self.throughput_data["results"][channel]["rx_throughput"] is not None: |
| rx_values.append( |
| self.throughput_data["results"][channel]["rx_throughput"] |
| ) |
| tx_std_dev = pstdev(tx_values) |
| rx_std_dev = pstdev(rx_values) |
| if tx_std_dev > max_std_dev or rx_std_dev > max_std_dev: |
| asserts.fail( |
| "With %smhz channel bandwidth, throughput standard " |
| "deviation (tx: %s Mb/s, rx: %s Mb/s) exceeds max standard " |
| "deviation (%s Mb/s)." |
| % ( |
| self.throughput_data["channel_bandwidth"], |
| tx_std_dev, |
| rx_std_dev, |
| max_std_dev, |
| ) |
| ) |
| else: |
| asserts.explicit_pass( |
| "Throughput standard deviation (tx: %s Mb/s, rx: %s Mb/s) " |
| "with %smhz channel bandwidth does not exceed maximum (%s Mb/s)." |
| % ( |
| tx_std_dev, |
| rx_std_dev, |
| self.throughput_data["channel_bandwidth"], |
| max_std_dev, |
| ) |
| ) |
| |
| def run_channel_performance_tests(self, settings): |
| """Test function for running channel performance tests. Used by both |
| explicit test cases and debug test cases from config. Runs a performance |
| test for each channel in test_channels with test_channel_bandwidth, then |
| writes a graph and csv file of the channel vs throughput. |
| |
| Args: |
| settings: dict, containing the following test settings |
| test_channels: list of channels to test. |
| test_channel_bandwidth: int, channel bandwidth to use for test. |
| test_security (optional): string, security type to use for test. |
| min_tx_throughput (optional, default: 0): float, minimum tx |
| throughput threshold to pass individual channel tests |
| (in Mb/s). |
| min_rx_throughput (optional, default: 0): float, minimum rx |
| throughput threshold to pass individual channel tests |
| (in Mb/s). |
| max_std_dev (optional, default: 1): float, maximum standard |
| deviation of throughput across all test channels to pass |
| test (in Mb/s). |
| base_test_name (optional): string, test name prefix to use with |
| generated subtests. |
| country_name (optional): string, country name from |
| hostapd_constants to set on device. |
| country_code (optional): string, two-char country code to set on |
| the DUT. Takes priority over country_name. |
| test_name (debug tests only): string, the test name for this |
| parent test case from the config file. In explicit tests, |
| this is not necessary. |
| |
| Writes: |
| CSV file: channel, tx_throughput, rx_throughput |
| for every test channel. |
| Graph: channel vs tx_throughput & channel vs rx_throughput |
| |
| Raises: |
| TestFailure, if throughput standard deviation across channels |
| exceeds max_std_dev |
| |
| Example Explicit Test (see EOF for debug JSON example): |
| def test_us_2g_20mhz_wpa2(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G, |
| test_channel_bandwidth=20, |
| test_security=hostapd_constants.WPA2_STRING, |
| min_tx_throughput=2, |
| min_rx_throughput=4, |
| max_std_dev=0.75, |
| country_code='US', |
| base_test_name='test_us')) |
| """ |
| test_channels = settings["test_channels"] |
| test_channel_bandwidth = settings["test_channel_bandwidth"] |
| test_security = settings.get("test_security", None) |
| test_name = settings.get("test_name", self.test_name) |
| min_tx_throughput = settings.get("min_tx_throughput", DEFAULT_MIN_THROUGHPUT) |
| min_rx_throughput = settings.get("min_rx_throughput", DEFAULT_MIN_THROUGHPUT) |
| max_std_dev = settings.get("max_std_dev", DEFAULT_MAX_STD_DEV) |
| country_code = settings.get("country_code") |
| country_name = settings.get("country_name") |
| country_label = None |
| |
| if country_code: |
| country_label = country_code |
| self.set_dut_country_code(country_code) |
| elif country_name: |
| country_label = country_name |
| code = hostapd_constants.COUNTRY_CODE[country_name]["country_code"] |
| self.set_dut_country_code(code) |
| |
| self.throughput_data = { |
| "test": test_name, |
| "channel_bandwidth": test_channel_bandwidth, |
| "results": {}, |
| } |
| test_list = [] |
| for channel in test_channels: |
| sub_test_name = "test_%schannel_%s_%smhz_%s_performance" % ( |
| "%s_" % country_label if country_label else "", |
| channel, |
| test_channel_bandwidth, |
| test_security if test_security else "open", |
| ) |
| test_list.append( |
| { |
| "test_name": sub_test_name, |
| "channel": int(channel), |
| "channel_bandwidth": int(test_channel_bandwidth), |
| "security": test_security, |
| "min_tx_throughput": min_tx_throughput, |
| "min_rx_throughput": min_rx_throughput, |
| } |
| ) |
| self.run_generated_testcases( |
| self.get_channel_performance, settings=test_list, name_func=get_test_name |
| ) |
| self.log.info("Channel tests completed.") |
| self.write_graph() |
| self.verify_standard_deviation(max_std_dev) |
| |
| def get_channel_performance(self, settings): |
| """Run a single channel performance test and logs results to csv file |
| and throughput data. Run with generated sub test cases in |
| run_channel_performance_tests. |
| |
| 1. Sets up network with test settings |
| 2. Associates DUT |
| 3. Runs traffic between DUT and iperf server (both directions) |
| 4. Logs channel, tx_throughput (Mb/s), and rx_throughput (Mb/s) to |
| log file and throughput data. |
| 5. Checks throughput values against minimum throughput thresholds. |
| |
| Args: |
| settings: see run_channel_performance_tests |
| |
| Raises: |
| TestFailure, if throughput (either direction) is less than |
| the directions given minimum throughput threshold. |
| """ |
| channel = settings["channel"] |
| channel_bandwidth = settings["channel_bandwidth"] |
| security = settings["security"] |
| min_tx_throughput = settings["min_tx_throughput"] |
| min_rx_throughput = settings["min_rx_throughput"] |
| if security: |
| if security == hostapd_constants.WEP_STRING: |
| password = utils.rand_hex_str(WEP_HEX_STRING_LENGTH) |
| else: |
| password = utils.rand_ascii_str(hostapd_constants.MIN_WPA_PSK_LENGTH) |
| security_profile = Security(security_mode=security, password=password) |
| target_security = ( |
| hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get( |
| security |
| ) |
| ) |
| else: |
| password = None |
| security_profile = None |
| target_security = None |
| ssid = self.setup_ap(channel, channel_bandwidth, security_profile) |
| associated = self.dut.associate( |
| ssid, target_pwd=password, target_security=target_security |
| ) |
| if not associated: |
| if self.iperf_server: |
| self.log_to_file_and_throughput_data( |
| channel, channel_bandwidth, None, None |
| ) |
| asserts.fail("Device failed to associate with network %s" % ssid) |
| self.log.info("DUT (%s) connected to network %s." % (self.dut.device.ip, ssid)) |
| if self.iperf_server: |
| self.iperf_server.renew_test_interface_ip_address() |
| self.log.info( |
| "Getting ip address for iperf server. Will retry for %s seconds." |
| % self.time_to_wait_for_ip_addr |
| ) |
| iperf_server_address = self.get_and_verify_iperf_address( |
| channel, self.iperf_server |
| ) |
| self.log.info( |
| "Getting ip address for DUT. Will retry for %s seconds." |
| % self.time_to_wait_for_ip_addr |
| ) |
| iperf_client_address = self.get_and_verify_iperf_address( |
| channel, self.dut.device, self.iperf_client.test_interface |
| ) |
| tx_throughput = self.get_iperf_throughput( |
| iperf_server_address, iperf_client_address |
| ) |
| rx_throughput = self.get_iperf_throughput( |
| iperf_server_address, iperf_client_address, reverse=True |
| ) |
| self.log_to_file_and_throughput_data( |
| channel, channel_bandwidth, tx_throughput, rx_throughput |
| ) |
| self.log.info( |
| "Throughput (tx, rx): (%s Mb/s, %s Mb/s), " |
| "Minimum threshold (tx, rx): (%s Mb/s, %s Mb/s)" |
| % (tx_throughput, rx_throughput, min_tx_throughput, min_rx_throughput) |
| ) |
| base_message = ( |
| "Actual throughput (on channel: %s, channel bandwidth: " |
| "%s, security: %s)" % (channel, channel_bandwidth, security) |
| ) |
| if ( |
| not tx_throughput |
| or not rx_throughput |
| or tx_throughput < min_tx_throughput |
| or rx_throughput < min_rx_throughput |
| ): |
| asserts.fail("%s below the minimum threshold." % base_message) |
| asserts.explicit_pass("%s above the minimum threshold." % base_message) |
| else: |
| asserts.explicit_pass( |
| "Association test pass. No throughput measurement taken." |
| ) |
| |
| def verify_regulatory_compliance(self, settings): |
| """Test function for regulatory compliance tests. Verify device complies |
| with provided regulatory requirements. |
| |
| Args: |
| settings: dict, containing the following test settings |
| test_channels: dict, mapping channels to a set of the channel |
| bandwidths to test (see example for using JSON). Defaults |
| to hostapd_constants.ALL_CHANNELS. |
| country_code: string, two-char country code to set on device |
| (prioritized over country_name) |
| country_name: string, country name from hostapd_constants to set |
| on device. |
| base_test_name (optional): string, test name prefix to use with |
| generatedsubtests. |
| test_name: string, the test name for this |
| parent test case from the config file. In explicit tests, |
| this is not necessary. |
| """ |
| country_name = settings.get("country_name") |
| country_code = settings.get("country_code") |
| if not (country_code or country_name): |
| raise ValueError("No country code or name provided.") |
| |
| test_channels = settings.get("test_channels", hostapd_constants.ALL_CHANNELS) |
| allowed_channels = settings["allowed_channels"] |
| |
| base_test_name = settings.get("base_test_name", "test_compliance") |
| |
| if country_code: |
| code = country_code |
| else: |
| code = hostapd_constants.COUNTRY_CODE[country_name]["country_code"] |
| |
| self.set_dut_country_code(code) |
| |
| test_list = [] |
| for channel in test_channels: |
| for channel_bandwidth in test_channels[channel]: |
| sub_test_name = "%s_channel_%s_%smhz" % ( |
| base_test_name, |
| channel, |
| channel_bandwidth, |
| ) |
| should_associate = ( |
| channel in allowed_channels |
| and channel_bandwidth in allowed_channels[channel] |
| ) |
| # Note: these int conversions because when these tests are |
| # imported via JSON, they may be strings since the channels |
| # will be keys. This makes the json/list test_channels param |
| # behave exactly like the in code dict/set test_channels. |
| test_list.append( |
| { |
| "country_code": code, |
| "channel": int(channel), |
| "channel_bandwidth": int(channel_bandwidth), |
| "should_associate": should_associate, |
| "test_name": sub_test_name, |
| } |
| ) |
| self.run_generated_testcases( |
| test_func=self.verify_channel_compliance, |
| settings=test_list, |
| name_func=get_test_name, |
| ) |
| |
| def verify_channel_compliance(self, settings): |
| """Verify device complies with provided regulatory requirements for a |
| specific channel and channel bandwidth. Run with generated test cases |
| in the verify_regulatory_compliance parent test. |
| _ |
| Args: |
| settings: see verify_regulatory_compliance` |
| """ |
| channel = settings["channel"] |
| channel_bandwidth = settings["channel_bandwidth"] |
| code = settings["country_code"] |
| should_associate = settings["should_associate"] |
| |
| ssid = self.setup_ap(channel, channel_bandwidth) |
| |
| self.log.info( |
| "Attempting to associate with network (%s) on channel %s @ %smhz. " |
| "Expected behavior: %s" |
| % ( |
| ssid, |
| channel, |
| channel_bandwidth, |
| "Device should associate" |
| if should_associate |
| else "Device should NOT associate.", |
| ) |
| ) |
| |
| associated = self.dut.associate(ssid) |
| |
| regulatory_result_marker = "REGTRACKER: %s,%s,%s,%s,%s" % ( |
| code, |
| channel, |
| "2.4" if channel < 36 else "5", |
| channel_bandwidth, |
| "c" if associated else "nc", |
| ) |
| self.regulatory_results += regulatory_result_marker + "\n" |
| self.log.info(regulatory_result_marker) |
| |
| if associated == should_associate: |
| asserts.explicit_pass( |
| "Device complied with %s regulatory requirement for channel %s " |
| " with channel bandwidth %smhz. %s" |
| % ( |
| code, |
| channel, |
| channel_bandwidth, |
| "Associated." if associated else "Refused to associate.", |
| ) |
| ) |
| else: |
| asserts.fail( |
| "Device failed compliance with regulatory domain %s for " |
| "channel %s with channel bandwidth %smhz. Expected: %s, Got: %s" |
| % ( |
| code, |
| channel, |
| channel_bandwidth, |
| "Should associate" if should_associate else "Should not associate", |
| "Associated" if associated else "Did not associate", |
| ) |
| ) |
| |
| # Helper functions to allow explicit tests throughput and standard deviation |
| # thresholds to be passed in via config. |
| def _get_min_tx_throughput(self, test_name): |
| return ( |
| self.user_params.get("channel_sweep_test_params", {}) |
| .get(test_name, {}) |
| .get("min_tx_throughput", DEFAULT_MIN_THROUGHPUT) |
| ) |
| |
| def _get_min_rx_throughput(self, test_name): |
| return ( |
| self.user_params.get("channel_sweep_test_params", {}) |
| .get(test_name, {}) |
| .get("min_rx_throughput", DEFAULT_MIN_THROUGHPUT) |
| ) |
| |
| def _get_max_std_dev(self, test_name): |
| return ( |
| self.user_params.get("channel_sweep_test_params", {}) |
| .get(test_name, {}) |
| .get("min_std_dev", DEFAULT_MAX_STD_DEV) |
| ) |
| |
| # Channel Performance of US Channels: 570 Test Cases |
| # 36 Test Cases |
| def test_us_20mhz_open_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_20MHZ, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 35 Test Cases |
| def test_us_40mhz_open_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_40MHZ, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 24 Test Cases |
| def test_us_80mhz_open_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_80MHZ, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wep_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WEP_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wep_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WEP_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wep_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WEP_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA2_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA2_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA2_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA_MIXED_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA_MIXED_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA_MIXED_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa3_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA3_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa3_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G |
| + hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA3_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa3_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants.CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA3_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name), |
| ) |
| ) |
| |
| def test_channel_performance_debug(self): |
| """Run channel performance test cases from the ACTS config file. |
| |
| Example: |
| "channel_sweep_test_params": { |
| "debug_channel_performance_tests": [ |
| { |
| "test_name": "test_123_20mhz_wpa2_performance" |
| "test_channels": [1, 2, 3], |
| "test_channel_bandwidth": 20, |
| "test_security": "wpa2", |
| "base_test_name": "test_123_perf", |
| "min_tx_throughput": 1.1, |
| "min_rx_throughput": 3, |
| "max_std_dev": 0.5 |
| }, |
| ... |
| ] |
| } |
| |
| """ |
| asserts.skip_if( |
| "debug_channel_performance_tests" |
| not in self.user_params.get("channel_sweep_test_params", {}), |
| "No custom channel performance tests provided in config.", |
| ) |
| base_tests = self.user_params["channel_sweep_test_params"][ |
| "debug_channel_performance_tests" |
| ] |
| self.run_generated_testcases( |
| self.run_channel_performance_tests, |
| settings=base_tests, |
| name_func=get_test_name, |
| ) |
| |
| def test_regulatory_compliance(self): |
| """Run regulatory compliance test case from the ACTS config file. |
| Note: only one country_name OR country_code is required. |
| |
| Example: |
| "channel_sweep_test_params": { |
| "regulatory_compliance_tests": [ |
| { |
| "test_name": "test_japan_compliance_1_13_36" |
| "country_name": "JAPAN", |
| "country_code": "JP", |
| "test_channels": { |
| "1": [20, 40], "13": [40], "36": [20, 40, 80] |
| }, |
| "allowed_channels": { |
| "1": [20, 40], "36": [20, 40, 80] |
| }, |
| "base_test_name": "test_japan" |
| }, |
| ... |
| ] |
| } |
| """ |
| asserts.skip_if( |
| "regulatory_compliance_tests" |
| not in self.user_params.get("channel_sweep_test_params", {}), |
| "No custom regulatory compliance tests provided in config.", |
| ) |
| |
| # TODO(http://b/280442689): Add "supported_country_codes" and |
| # "unsupported_channels" to test params |
| base_tests = self.user_params["channel_sweep_test_params"][ |
| "regulatory_compliance_tests" |
| ] |
| self.run_generated_testcases( |
| self.verify_regulatory_compliance, |
| settings=base_tests, |
| name_func=get_test_name, |
| ) |
| |
| |
| if __name__ == "__main__": |
| test_runner.main() |