| #!/usr/bin/env python3.4 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import collections |
| import importlib |
| import ipaddress |
| import logging |
| import re |
| import time |
| from concurrent.futures import ThreadPoolExecutor |
| |
| import numpy |
| from mobly import asserts |
| |
| from antlion import utils |
| from antlion.controllers.android_device import AndroidDevice |
| from antlion.controllers.utils_lib import ssh |
| from antlion.test_utils.wifi import wifi_test_utils as wutils |
| from antlion.test_utils.wifi.wifi_performance_test_utils import ( |
| brcm_utils, |
| ping_utils, |
| qcom_utils, |
| ) |
| |
| SHORT_SLEEP = 1 |
| MED_SLEEP = 6 |
| CHANNELS_6GHz = [f"6g{4 * x + 1}" for x in range(59)] |
| BAND_TO_CHANNEL_MAP = { |
| "2.4GHz": list(range(1, 14)), |
| "UNII-1": [36, 40, 44, 48], |
| "UNII-2": [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140], |
| "UNII-3": [149, 153, 157, 161, 165], |
| "6GHz": CHANNELS_6GHz, |
| } |
| CHANNEL_TO_BAND_MAP = { |
| channel: band |
| for band, channels in BAND_TO_CHANNEL_MAP.items() |
| for channel in channels |
| } |
| |
| |
| # Decorators |
| def nonblocking(f): |
| """Creates a decorator transforming function calls to non-blocking""" |
| |
| def wrap(*args, **kwargs): |
| executor = ThreadPoolExecutor(max_workers=1) |
| thread_future = executor.submit(f, *args, **kwargs) |
| # Ensure resources are freed up when executor ruturns or raises |
| executor.shutdown(wait=False) |
| return thread_future |
| |
| return wrap |
| |
| |
| def detect_wifi_platform(dut): |
| if hasattr(dut, "wifi_platform"): |
| return dut.wifi_platform |
| qcom_check = len(dut.get_file_names("/vendor/firmware/wlan/qca_cld/")) |
| if qcom_check: |
| dut.wifi_platform = "qcom" |
| else: |
| dut.wifi_platform = "brcm" |
| return dut.wifi_platform |
| |
| |
| def detect_wifi_decorator(f): |
| def wrap(*args, **kwargs): |
| if "dut" in kwargs: |
| dut = kwargs["dut"] |
| else: |
| dut = next(arg for arg in args if type(arg) == AndroidDevice) |
| dut_package = ( |
| "acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils".format( |
| detect_wifi_platform(dut) |
| ) |
| ) |
| dut_package = importlib.import_module(dut_package) |
| f_decorated = getattr(dut_package, f.__name__, lambda: None) |
| return f_decorated(*args, **kwargs) |
| |
| return wrap |
| |
| |
| # JSON serializer |
| def serialize_dict(input_dict): |
| """Function to serialize dicts to enable JSON output""" |
| output_dict = collections.OrderedDict() |
| for key, value in input_dict.items(): |
| output_dict[_serialize_value(key)] = _serialize_value(value) |
| return output_dict |
| |
| |
| def _serialize_value(value): |
| """Function to recursively serialize dict entries to enable JSON output""" |
| if isinstance(value, tuple): |
| return str(value) |
| if isinstance(value, numpy.int64): |
| return int(value) |
| if isinstance(value, numpy.float64): |
| return float(value) |
| if isinstance(value, list): |
| return [_serialize_value(x) for x in value] |
| if isinstance(value, numpy.ndarray): |
| return [_serialize_value(x) for x in value] |
| elif isinstance(value, dict): |
| return serialize_dict(value) |
| elif type(value) in (float, int, bool, str): |
| return value |
| else: |
| return "Non-serializable object" |
| |
| |
| def extract_sub_dict(full_dict, fields): |
| sub_dict = collections.OrderedDict((field, full_dict[field]) for field in fields) |
| return sub_dict |
| |
| |
| # Miscellaneous Wifi Utilities |
| def check_skip_conditions(testcase_params, dut, access_point, ota_chamber=None): |
| """Checks if test should be skipped.""" |
| # Check battery level before test |
| if not health_check(dut, 10): |
| asserts.skip("DUT battery level too low.") |
| if not access_point.band_lookup_by_channel(testcase_params["channel"]): |
| asserts.skip("AP does not support requested channel.") |
| if ( |
| ota_chamber |
| and CHANNEL_TO_BAND_MAP[testcase_params["channel"]] |
| not in ota_chamber.SUPPORTED_BANDS |
| ): |
| asserts.skip("OTA chamber does not support requested channel.") |
| # Check if 6GHz is supported by checking capabilities in the US. |
| if not dut.droid.wifiCheckState(): |
| wutils.wifi_toggle_state(dut, True) |
| iw_list = dut.adb.shell("iw list") |
| supports_6ghz = "6135 MHz" in iw_list |
| supports_160mhz = "Supported Channel Width: 160 MHz" in iw_list |
| if testcase_params.get("bandwidth", 20) == 160 and not supports_160mhz: |
| asserts.skip("DUT does not support 160 MHz networks.") |
| if testcase_params.get("channel", 6) in CHANNELS_6GHz and not supports_6ghz: |
| asserts.skip("DUT does not support 6 GHz band.") |
| |
| |
| def validate_network(dut, ssid): |
| """Check that DUT has a valid internet connection through expected SSID |
| |
| Args: |
| dut: android device of interest |
| ssid: expected ssid |
| """ |
| try: |
| connected = wutils.validate_connection(dut, wait_time=3) is not None |
| current_network = dut.droid.wifiGetConnectionInfo() |
| except: |
| connected = False |
| current_network = None |
| if connected and current_network["SSID"] == ssid: |
| return True |
| else: |
| return False |
| |
| |
| def get_server_address(ssh_connection, dut_ip, subnet_mask): |
| """Get server address on a specific subnet, |
| |
| This function retrieves the LAN or WAN IP of a remote machine used in |
| testing. If subnet_mask is set to 'public' it returns a machines global ip, |
| else it returns the ip belonging to the dut local network given the dut's |
| ip and subnet mask. |
| |
| Args: |
| ssh_connection: object representing server for which we want an ip |
| dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx |
| subnet_mask: string representing subnet mask (public for global ip) |
| """ |
| ifconfig_out = ssh_connection.run("ifconfig").stdout |
| ip_list = re.findall("inet (?:addr:)?(\d+.\d+.\d+.\d+)", ifconfig_out) |
| ip_list = [ipaddress.ip_address(ip) for ip in ip_list] |
| |
| if subnet_mask == "public": |
| for ip in ip_list: |
| # is_global is not used to allow for CGNAT ips in 100.x.y.z range |
| if not ip.is_private: |
| return str(ip) |
| else: |
| dut_network = ipaddress.ip_network(f"{dut_ip}/{subnet_mask}", strict=False) |
| for ip in ip_list: |
| if ip in dut_network: |
| return str(ip) |
| logging.error("No IP address found in requested subnet") |
| |
| |
| # Ping utilities |
| def get_ping_stats(src_device, dest_address, ping_duration, ping_interval, ping_size): |
| """Run ping to or from the DUT. |
| |
| The function computes either pings the DUT or pings a remote ip from |
| DUT. |
| |
| Args: |
| src_device: object representing device to ping from |
| dest_address: ip address to ping |
| ping_duration: timeout to set on the ping process (in seconds) |
| ping_interval: time between pings (in seconds) |
| ping_size: size of ping packet payload |
| Returns: |
| ping_result: dict containing ping results and other meta data |
| """ |
| ping_count = int(ping_duration / ping_interval) |
| ping_deadline = int(ping_count * ping_interval) + 1 |
| ping_cmd_linux = "ping -c {} -w {} -i {} -s {} -D".format( |
| ping_count, |
| ping_deadline, |
| ping_interval, |
| ping_size, |
| ) |
| |
| ping_cmd_macos = "ping -c {} -t {} -i {} -s {}".format( |
| ping_count, |
| ping_deadline, |
| ping_interval, |
| ping_size, |
| ) |
| |
| if isinstance(src_device, AndroidDevice): |
| ping_cmd = f"{ping_cmd_linux} {dest_address}" |
| ping_output = src_device.adb.shell( |
| ping_cmd, timeout=ping_deadline + SHORT_SLEEP, ignore_status=True |
| ) |
| elif isinstance(src_device, ssh.connection.SshConnection): |
| platform = src_device.run("uname").stdout |
| if "linux" in platform.lower(): |
| ping_cmd = f"sudo {ping_cmd_linux} {dest_address}" |
| elif "darwin" in platform.lower(): |
| ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format( |
| ping_cmd_macos, dest_address |
| ) |
| ping_output = src_device.run( |
| ping_cmd, timeout=ping_deadline + SHORT_SLEEP, ignore_status=True |
| ).stdout |
| else: |
| raise TypeError(f"Unable to ping using src_device of type {type(src_device)}.") |
| return ping_utils.PingResult(ping_output.splitlines()) |
| |
| |
| @nonblocking |
| def get_ping_stats_nb( |
| src_device, dest_address, ping_duration, ping_interval, ping_size |
| ): |
| return get_ping_stats( |
| src_device, dest_address, ping_duration, ping_interval, ping_size |
| ) |
| |
| |
| # Iperf utilities |
| @nonblocking |
| def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag, timeout): |
| return iperf_client.start(iperf_server_address, iperf_args, tag, timeout) |
| |
| |
| def get_iperf_arg_string( |
| duration, |
| reverse_direction, |
| interval=1, |
| traffic_type="TCP", |
| socket_size=None, |
| num_processes=1, |
| udp_throughput="1000M", |
| ipv6=False, |
| ): |
| """Function to format iperf client arguments. |
| |
| This function takes in iperf client parameters and returns a properly |
| formatter iperf arg string to be used in throughput tests. |
| |
| Args: |
| duration: iperf duration in seconds |
| reverse_direction: boolean controlling the -R flag for iperf clients |
| interval: iperf print interval |
| traffic_type: string specifying TCP or UDP traffic |
| socket_size: string specifying TCP window or socket buffer, e.g., 2M |
| num_processes: int specifying number of iperf processes |
| udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M |
| ipv6: boolean controlling the use of IP V6 |
| Returns: |
| iperf_args: string of formatted iperf args |
| """ |
| iperf_args = f"-i {interval} -t {duration} -J " |
| if ipv6: |
| iperf_args += "-6 " |
| if traffic_type.upper() == "UDP": |
| iperf_args += f"-u -b {udp_throughput} -l 1470 -P {num_processes} " |
| elif traffic_type.upper() == "TCP": |
| iperf_args += f"-P {num_processes} " |
| if socket_size: |
| iperf_args += f"-w {socket_size} " |
| if reverse_direction: |
| iperf_args += "-R " |
| return iperf_args |
| |
| |
| # Attenuator Utilities |
| def atten_by_label(atten_list, path_label, atten_level): |
| """Attenuate signals according to their path label. |
| |
| Args: |
| atten_list: list of attenuators to iterate over |
| path_label: path label on which to set desired attenuation |
| atten_level: attenuation desired on path |
| """ |
| for atten in atten_list: |
| if path_label in atten.path: |
| atten.set_atten(atten_level, retry=True) |
| |
| |
| def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server): |
| """Function to estimate attenuation to hit a target RSSI. |
| |
| This function estimates a constant attenuation setting on all atennuation |
| ports to hit a target RSSI. The estimate is not meant to be exact or |
| guaranteed. |
| |
| Args: |
| target_rssi: rssi of interest |
| attenuators: list of attenuator ports |
| dut: android device object assumed connected to a wifi network. |
| ping_server: ssh connection object to ping server |
| Returns: |
| target_atten: attenuation setting to achieve target_rssi |
| """ |
| logging.info(f"Searching attenuation for RSSI = {target_rssi}dB") |
| # Set attenuator to 0 dB |
| for atten in attenuators: |
| atten.set_atten(0, strict=False, retry=True) |
| # Start ping traffic |
| dut_ip = dut.droid.connectivityGetIPv4Addresses("wlan0")[0] |
| # Measure starting RSSI |
| ping_future = get_ping_stats_nb( |
| src_device=ping_server, |
| dest_address=dut_ip, |
| ping_duration=1.5, |
| ping_interval=0.02, |
| ping_size=64, |
| ) |
| current_rssi = get_connected_rssi( |
| dut, |
| num_measurements=4, |
| polling_frequency=0.25, |
| first_measurement_delay=0.5, |
| disconnect_warning=1, |
| ignore_samples=1, |
| ) |
| current_rssi = current_rssi["signal_poll_rssi"]["mean"] |
| ping_future.result() |
| target_atten = 0 |
| logging.debug(f"RSSI @ {target_atten:.2f}dB attenuation = {current_rssi:.2f}") |
| within_range = 0 |
| for idx in range(20): |
| atten_delta = max(min(current_rssi - target_rssi, 20), -20) |
| target_atten = int((target_atten + atten_delta) * 4) / 4 |
| if target_atten < 0: |
| return 0 |
| if target_atten > attenuators[0].get_max_atten(): |
| return attenuators[0].get_max_atten() |
| for atten in attenuators: |
| atten.set_atten(target_atten, strict=False, retry=True) |
| ping_future = get_ping_stats_nb( |
| src_device=ping_server, |
| dest_address=dut_ip, |
| ping_duration=1.5, |
| ping_interval=0.02, |
| ping_size=64, |
| ) |
| current_rssi = get_connected_rssi( |
| dut, |
| num_measurements=4, |
| polling_frequency=0.25, |
| first_measurement_delay=0.5, |
| disconnect_warning=1, |
| ignore_samples=1, |
| ) |
| current_rssi = current_rssi["signal_poll_rssi"]["mean"] |
| ping_future.result() |
| logging.info(f"RSSI @ {target_atten:.2f}dB attenuation = {current_rssi:.2f}") |
| if abs(current_rssi - target_rssi) < 1: |
| if within_range: |
| logging.info( |
| "Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}." |
| "Attenuation: {2:.2f}, Iterations = {3:.2f}".format( |
| current_rssi, target_rssi, target_atten, idx |
| ) |
| ) |
| return target_atten |
| else: |
| within_range = True |
| else: |
| within_range = False |
| return target_atten |
| |
| |
| def get_current_atten_dut_chain_map(attenuators, dut, ping_server, ping_from_dut=False): |
| """Function to detect mapping between attenuator ports and DUT chains. |
| |
| This function detects the mapping between attenuator ports and DUT chains |
| in cases where DUT chains are connected to only one attenuator port. The |
| function assumes the DUT is already connected to a wifi network. The |
| function starts by measuring per chain RSSI at 0 attenuation, then |
| attenuates one port at a time looking for the chain that reports a lower |
| RSSI. |
| |
| Args: |
| attenuators: list of attenuator ports |
| dut: android device object assumed connected to a wifi network. |
| ping_server: ssh connection object to ping server |
| ping_from_dut: boolean controlling whether to ping from or to dut |
| Returns: |
| chain_map: list of dut chains, one entry per attenuator port |
| """ |
| # Set attenuator to 0 dB |
| for atten in attenuators: |
| atten.set_atten(0, strict=False, retry=True) |
| # Start ping traffic |
| dut_ip = dut.droid.connectivityGetIPv4Addresses("wlan0")[0] |
| if ping_from_dut: |
| ping_future = get_ping_stats_nb( |
| dut, ping_server._settings.hostname, 11, 0.02, 64 |
| ) |
| else: |
| ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64) |
| # Measure starting RSSI |
| base_rssi = get_connected_rssi(dut, 4, 0.25, 1) |
| chain0_base_rssi = base_rssi["chain_0_rssi"]["mean"] |
| chain1_base_rssi = base_rssi["chain_1_rssi"]["mean"] |
| if chain0_base_rssi < -70 or chain1_base_rssi < -70: |
| logging.warning("RSSI might be too low to get reliable chain map.") |
| # Compile chain map by attenuating one path at a time and seeing which |
| # chain's RSSI degrades |
| chain_map = [] |
| for test_atten in attenuators: |
| # Set one attenuator to 30 dB down |
| test_atten.set_atten(30, strict=False, retry=True) |
| # Get new RSSI |
| test_rssi = get_connected_rssi(dut, 4, 0.25, 1) |
| # Assign attenuator to path that has lower RSSI |
| if ( |
| chain0_base_rssi > -70 |
| and chain0_base_rssi - test_rssi["chain_0_rssi"]["mean"] > 10 |
| ): |
| chain_map.append("DUT-Chain-0") |
| elif ( |
| chain1_base_rssi > -70 |
| and chain1_base_rssi - test_rssi["chain_1_rssi"]["mean"] > 10 |
| ): |
| chain_map.append("DUT-Chain-1") |
| else: |
| chain_map.append(None) |
| # Reset attenuator to 0 |
| test_atten.set_atten(0, strict=False, retry=True) |
| ping_future.result() |
| logging.debug(f"Chain Map: {chain_map}") |
| return chain_map |
| |
| |
| def get_full_rf_connection_map( |
| attenuators, dut, ping_server, networks, ping_from_dut=False |
| ): |
| """Function to detect per-network connections between attenuator and DUT. |
| |
| This function detects the mapping between attenuator ports and DUT chains |
| on all networks in its arguments. The function connects the DUT to each |
| network then calls get_current_atten_dut_chain_map to get the connection |
| map on the current network. The function outputs the results in two formats |
| to enable easy access when users are interested in indexing by network or |
| attenuator port. |
| |
| Args: |
| attenuators: list of attenuator ports |
| dut: android device object assumed connected to a wifi network. |
| ping_server: ssh connection object to ping server |
| networks: dict of network IDs and configs |
| Returns: |
| rf_map_by_network: dict of RF connections indexed by network. |
| rf_map_by_atten: list of RF connections indexed by attenuator |
| """ |
| for atten in attenuators: |
| atten.set_atten(0, strict=False, retry=True) |
| |
| rf_map_by_network = collections.OrderedDict() |
| rf_map_by_atten = [[] for atten in attenuators] |
| for net_id, net_config in networks.items(): |
| wutils.reset_wifi(dut) |
| wutils.wifi_connect( |
| dut, |
| net_config, |
| num_of_tries=1, |
| assert_on_fail=False, |
| check_connectivity=False, |
| ) |
| rf_map_by_network[net_id] = get_current_atten_dut_chain_map( |
| attenuators, dut, ping_server, ping_from_dut |
| ) |
| for idx, chain in enumerate(rf_map_by_network[net_id]): |
| if chain: |
| rf_map_by_atten[idx].append({"network": net_id, "dut_chain": chain}) |
| logging.debug(f"RF Map (by Network): {rf_map_by_network}") |
| logging.debug(f"RF Map (by Atten): {rf_map_by_atten}") |
| |
| return rf_map_by_network, rf_map_by_atten |
| |
| |
| # Generic device utils |
| def get_dut_temperature(dut): |
| """Function to get dut temperature. |
| |
| The function fetches and returns the reading from the temperature sensor |
| used for skin temperature and thermal throttling. |
| |
| Args: |
| dut: AndroidDevice of interest |
| Returns: |
| temperature: device temperature. 0 if temperature could not be read |
| """ |
| candidate_zones = [ |
| "/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp", |
| "/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp", |
| "/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp", |
| "/sys/devices/virtual/thermal/tz-by-name/back_therm/temp", |
| "/dev/thermal/tz-by-name/quiet_therm/temp", |
| ] |
| for zone in candidate_zones: |
| try: |
| temperature = int(dut.adb.shell(f"cat {zone}")) |
| break |
| except: |
| temperature = 0 |
| if temperature == 0: |
| logging.debug("Could not check DUT temperature.") |
| elif temperature > 100: |
| temperature = temperature / 1000 |
| return temperature |
| |
| |
| def wait_for_dut_cooldown(dut, target_temp=50, timeout=300): |
| """Function to wait for a DUT to cool down. |
| |
| Args: |
| dut: AndroidDevice of interest |
| target_temp: target cooldown temperature |
| timeout: maxt time to wait for cooldown |
| """ |
| start_time = time.time() |
| while time.time() - start_time < timeout: |
| temperature = get_dut_temperature(dut) |
| if temperature < target_temp: |
| break |
| time.sleep(SHORT_SLEEP) |
| elapsed_time = time.time() - start_time |
| logging.debug( |
| f"DUT Final Temperature: {temperature}C. Cooldown duration: {elapsed_time}" |
| ) |
| |
| |
| def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1): |
| """Function to check health status of a DUT. |
| |
| The function checks both battery levels and temperature to avoid DUT |
| powering off during the test. |
| |
| Args: |
| dut: AndroidDevice of interest |
| batt_thresh: battery level threshold |
| temp_threshold: temperature threshold |
| cooldown: flag to wait for DUT to cool down when overheating |
| Returns: |
| health_check: boolean confirming device is healthy |
| """ |
| health_check = True |
| battery_level = utils.get_battery_level(dut) |
| if battery_level < batt_thresh: |
| logging.warning(f"Battery level low ({battery_level}%)") |
| health_check = False |
| else: |
| logging.debug(f"Battery level = {battery_level}%") |
| |
| temperature = get_dut_temperature(dut) |
| if temperature > temp_threshold: |
| if cooldown: |
| logging.warning(f"Waiting for DUT to cooldown. ({temperature} C)") |
| wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5) |
| else: |
| logging.warning(f"DUT Overheating ({temperature} C)") |
| health_check = False |
| else: |
| logging.debug(f"DUT Temperature = {temperature} C") |
| return health_check |
| |
| |
| # Wifi Device Utils |
| def empty_rssi_result(): |
| return collections.OrderedDict( |
| [("data", []), ("mean", float("nan")), ("stdev", float("nan"))] |
| ) |
| |
| |
| @nonblocking |
| def get_connected_rssi_nb( |
| dut, |
| num_measurements=1, |
| polling_frequency=SHORT_SLEEP, |
| first_measurement_delay=0, |
| disconnect_warning=True, |
| ignore_samples=0, |
| interface="wlan0", |
| ): |
| return get_connected_rssi( |
| dut, |
| num_measurements, |
| polling_frequency, |
| first_measurement_delay, |
| disconnect_warning, |
| ignore_samples, |
| interface, |
| ) |
| |
| |
| @detect_wifi_decorator |
| def get_connected_rssi( |
| dut, |
| num_measurements=1, |
| polling_frequency=SHORT_SLEEP, |
| first_measurement_delay=0, |
| disconnect_warning=True, |
| ignore_samples=0, |
| interface="wlan0", |
| ): |
| """Gets all RSSI values reported for the connected access point/BSSID. |
| |
| Args: |
| dut: android device object from which to get RSSI |
| num_measurements: number of scans done, and RSSIs collected |
| polling_frequency: time to wait between RSSI measurements |
| disconnect_warning: boolean controlling disconnection logging messages |
| ignore_samples: number of leading samples to ignore |
| Returns: |
| connected_rssi: dict containing the measurements results for |
| all reported RSSI values (signal_poll, per chain, etc.) and their |
| statistics |
| """ |
| |
| |
| @nonblocking |
| def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1): |
| return get_scan_rssi(dut, tracked_bssids, num_measurements) |
| |
| |
| @detect_wifi_decorator |
| def get_scan_rssi(dut, tracked_bssids, num_measurements=1): |
| """Gets scan RSSI for specified BSSIDs. |
| |
| Args: |
| dut: android device object from which to get RSSI |
| tracked_bssids: array of BSSIDs to gather RSSI data for |
| num_measurements: number of scans done, and RSSIs collected |
| Returns: |
| scan_rssi: dict containing the measurement results as well as the |
| statistics of the scan RSSI for all BSSIDs in tracked_bssids |
| """ |
| |
| |
| @detect_wifi_decorator |
| def get_sw_signature(dut): |
| """Function that checks the signature for wifi firmware and config files. |
| |
| Returns: |
| bdf_signature: signature consisting of last three digits of bdf cksums |
| fw_signature: floating point firmware version, i.e., major.minor |
| """ |
| |
| |
| @detect_wifi_decorator |
| def get_country_code(dut): |
| """Function that returns the current wifi country code.""" |
| |
| |
| @detect_wifi_decorator |
| def push_config(dut, config_file): |
| """Function to push Wifi BDF files |
| |
| This function checks for existing wifi bdf files and over writes them all, |
| for simplicity, with the bdf file provided in the arguments. The dut is |
| rebooted for the bdf file to take effect |
| |
| Args: |
| dut: dut to push bdf file to |
| config_file: path to bdf_file to push |
| """ |
| |
| |
| @detect_wifi_decorator |
| def start_wifi_logging(dut): |
| """Function to start collecting wifi-related logs""" |
| |
| |
| @detect_wifi_decorator |
| def stop_wifi_logging(dut): |
| """Function to start collecting wifi-related logs""" |
| |
| |
| @detect_wifi_decorator |
| def push_firmware(dut, firmware_files): |
| """Function to push Wifi firmware files |
| |
| Args: |
| dut: dut to push bdf file to |
| firmware_files: path to wlanmdsp.mbn file |
| datamsc_file: path to Data.msc file |
| """ |
| |
| |
| @detect_wifi_decorator |
| def disable_beamforming(dut): |
| """Function to disable beamforming.""" |
| |
| |
| @detect_wifi_decorator |
| def set_nss_capability(dut, nss): |
| """Function to set number of spatial streams supported.""" |
| |
| |
| @detect_wifi_decorator |
| def set_chain_mask(dut, chain_mask): |
| """Function to set DUT chain mask. |
| |
| Args: |
| dut: android device |
| chain_mask: desired chain mask in [0, 1, '2x2'] |
| """ |
| |
| |
| # Link layer stats utilities |
| class LinkLayerStats: |
| def __new__(self, dut, llstats_enabled=True): |
| if detect_wifi_platform(dut) == "qcom": |
| return qcom_utils.LinkLayerStats(dut, llstats_enabled) |
| else: |
| return brcm_utils.LinkLayerStats(dut, llstats_enabled) |