| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
import logging
import multiprocessing
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Mapping, NamedTuple, Optional, Tuple

from mobly import asserts, test_runner
from regulatory_channels import COUNTRY_CHANNELS, TEST_CHANNELS

from antlion import utils
from antlion.controllers.access_point import setup_ap
from antlion.controllers.ap_lib import hostapd_config, hostapd_constants
from antlion.controllers.ap_lib.hostapd_security import Security
from antlion.test_utils.abstract_devices.wlan_device import (
    FuchsiaWlanDevice,
    create_wlan_device,
)
from antlion.test_utils.wifi import base_test
| |
# Baseline 802.11n capability flags passed to every test network. The
# bandwidth-specific flag (HT20 / HT40+ / HT40-) is appended per test when the
# access point is configured.
N_CAPABILITIES_DEFAULT = [
    hostapd_constants.N_CAPABILITY_LDPC,
    hostapd_constants.N_CAPABILITY_SGI20,
    hostapd_constants.N_CAPABILITY_SGI40,
    hostapd_constants.N_CAPABILITY_TX_STBC,
    hostapd_constants.N_CAPABILITY_RX_STBC1,
]
| |
# Channels above this number get a VHT bandwidth when the AP is configured;
# channels at or below it are configured without one.
MAX_2_4_CHANNEL = 14
# Seconds to sleep between polls while waiting for the country code to apply.
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
# Total seconds to wait for every PHY to report the requested country code.
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10
| |
| |
class RegulatoryTest(NamedTuple):
    """Parameters describing a single generated regulatory test case.

    Field order matters: instances are unpacked positionally into the test
    name generator and the test logic.
    """

    # Country code to set on the DUT before attempting to associate.
    country_code: str
    # Channel the access point network is started on.
    channel: int
    # Channel bandwidth of the network, in MHz.
    channel_bandwidth: int
    # True when the DUT is expected to associate under this configuration.
    expect_association: bool
| |
| |
class RegulatoryComplianceTest(base_test.WifiBaseTest):
    """Tests regulatory compliance.

    For every country in COUNTRY_CHANNELS and every channel/bandwidth pair in
    TEST_CHANNELS a test is generated that sets the DUT country code, starts a
    network on the access point and checks that the DUT associates only when
    the country's allowed channels permit it.

    Testbed Requirement:
    * 1 x Fuchsia device (dut)
    * 1 x access point
    """

    def pre_run(self) -> None:
        """Generate one test per (country, channel, bandwidth) combination."""
        tests: List[RegulatoryTest] = []
        for country in COUNTRY_CHANNELS.values():
            for channel in TEST_CHANNELS:
                for bandwidth in TEST_CHANNELS[channel]:
                    tests.append(
                        RegulatoryTest(
                            country_code=country.country_code,
                            channel=channel,
                            channel_bandwidth=bandwidth,
                            # Association is expected only when both the
                            # channel and this bandwidth on that channel are
                            # allowed for the country.
                            expect_association=(
                                channel in country.allowed_channels
                                and bandwidth in country.allowed_channels[channel]
                            ),
                        )
                    )

        def generate_test_name(
            code: str, channel: int, channel_bandwidth: int, *_
        ) -> str:
            # Takes the RegulatoryTest fields positionally; the trailing
            # expect_association field is not part of the test name.
            return f"test_{code}_channel_{channel}_{channel_bandwidth}mhz"

        self.generate_tests(self.verify_channel_compliance, generate_test_name, tests)

    def setup_class(self) -> None:
        """Bind the DUT and access point and start the results log."""
        super().setup_class()
        self.log = logging.getLogger()

        self.dut: FuchsiaWlanDevice = create_wlan_device(self.fuchsia_devices[0])
        self.access_point = self.access_points[0]
        self.access_point.stop_all_aps()

        # First entry is the header line of the results file written in
        # teardown_class; each test appends one result line.
        self.regulatory_results = [
            "====CountryCode,Channel,Frequency,ChannelBandwith,Connected/Not-Connected===="
        ]

    def teardown_class(self) -> None:
        """Write the collected regulatory results to the log directory."""
        super().teardown_class()

        regulatory_save_path = f"{self.log_path}/regulatory_results.txt"
        # Context manager guarantees the file is closed even if the write fails.
        with open(regulatory_save_path, "w", encoding="utf-8") as results_file:
            results_file.write("\n".join(self.regulatory_results))

    def setup_test(self) -> None:
        """Stop any running APs, wake devices and leave the DUT disconnected."""
        self.access_point.stop_all_aps()
        for ad in self.android_devices:
            ad.droid.wakeLockAcquireBright()
            ad.droid.wakeUpNow()
        self.dut.wifi_toggle_state(True)
        self.dut.disconnect()

    def teardown_test(self) -> None:
        """Release devices, disconnect the DUT, collect AP logs and stop APs."""
        for ad in self.android_devices:
            ad.droid.wakeLockRelease()
            ad.droid.goToSleepNow()
        self.dut.turn_location_off_and_scan_toggle_off()
        self.dut.disconnect()
        self.download_ap_logs()
        self.access_point.stop_all_aps()

    def get_phy_ids(self) -> List[str]:
        """Get a list of WLAN physical interfaces.

        Raises:
            ConnectionError: if the DUT reports an error for the query.
        """
        phy_ids_response = self.dut.device.sl4f.wlan_lib.wlanPhyIdList()
        if phy_ids_response.get("error"):
            raise ConnectionError(
                f'Failed to get phy ids from DUT. Error: {phy_ids_response["error"]}'
            )
        return phy_ids_response["result"]

    def get_phy_country_codes(self) -> Mapping[str, str]:
        """Get mapping of WLAN interfaces to the country code they are set to.

        Raises:
            ConnectionError: if the PHY list or any PHY's country code cannot
                be queried.
        """
        phy_ids = self.get_phy_ids()

        def get_country_code_from_phy(phy_id: str) -> Tuple[str, str]:
            get_country_response = self.dut.device.sl4f.wlan_lib.wlanGetCountry(
                phy_id
            )
            if get_country_response.get("error"):
                raise ConnectionError(
                    f"Failed to query PHY ID ({phy_id}) for country. "
                    f'Error: {get_country_response["error"]}'
                )
            # The result is a sequence of character codes; join them into the
            # country code string.
            country_code = "".join(
                [chr(ascii_char) for ascii_char in get_country_response["result"]]
            )
            return (phy_id, country_code)

        if not phy_ids:
            # Nothing to query; also avoids creating a pool with zero workers.
            return {}

        # A process pool cannot be used here: the worker is a local closure,
        # which cannot be pickled. The queries are I/O-bound, so threads give
        # the same concurrency.
        # NOTE(review): assumes the SL4F client tolerates concurrent requests
        # from multiple threads - confirm.
        with ThreadPoolExecutor(max_workers=len(phy_ids)) as executor:
            # Consuming the iterator inside the block re-raises worker errors.
            return dict(executor.map(get_country_code_from_phy, phy_ids))

    def set_dut_country_code(self, country_code: str) -> None:
        """Set the country code on the DUT, then verify that it was applied.

        Args:
            country_code: the 2 character country code to set

        Raises:
            EnvironmentError: if the DUT rejects the request, or if not every
                PHY reports the country code within
                TIME_TO_WAIT_FOR_COUNTRY_CODE seconds.
            ConnectionError: if the PHY country codes cannot be queried.
        """
        unique_country_codes = set(self.get_phy_country_codes().values())
        # Sets are not subscriptable, so compare against a one-element set.
        if unique_country_codes == {country_code}:
            # The country code is already set on all WLAN phys; skip setting the
            # country code again.
            self.log.debug(f"Country code already set to {country_code}")
            return

        self.log.info(f"Setting DUT country code to {country_code}")
        country_code_response = self.dut.device.sl4f.regulatory_region_lib.setRegion(
            country_code
        )
        if country_code_response.get("error"):
            raise EnvironmentError(
                f"Failed to set country code ({country_code}) on DUT. "
                f'Error: {country_code_response["error"]}'
            )

        self.log.info(
            f"Verifying DUT country code was correctly set to {country_code}."
        )

        # Monotonic clock so the timeout is unaffected by wall-clock changes.
        end_time = time.monotonic() + TIME_TO_WAIT_FOR_COUNTRY_CODE
        while time.monotonic() < end_time:
            for phy_id, code in self.get_phy_country_codes().items():
                if code != country_code:
                    self.log.debug(
                        f"PHY (id: {phy_id}) has incorrect country code set. "
                        f"Expected: {country_code}, Got: {code}"
                    )
                    break
            else:
                # Loop finished without a mismatch: every PHY matches.
                self.log.info(f"All PHYs have expected country code ({country_code})")
                return
            time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)

        raise EnvironmentError(
            f"Failed to set DUT country code to {country_code} after "
            f"{TIME_TO_WAIT_FOR_COUNTRY_CODE}s"
        )

    def setup_ap(
        self,
        channel: int,
        channel_bandwidth: int,
        security_profile: Optional[Security] = None,
    ) -> str:
        """Start network on AP with basic configuration.

        Args:
            channel: channel to use for network
            channel_bandwidth: channel bandwidth in mhz to use for network,
            security_profile: security type to use or None if open

        Returns:
            SSID of the newly created and running network

        Raises:
            ConnectionError if network is not started successfully.
            ValueError if the channel or channel bandwidth is not supported.
        """
        # A VHT bandwidth is only passed for channels above the 2.4 GHz range.
        vht_bandwidth = channel_bandwidth if channel > MAX_2_4_CHANNEL else None

        if channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_20MHZ:
            n_capabilities = N_CAPABILITIES_DEFAULT + [
                hostapd_constants.N_CAPABILITY_HT20
            ]
        elif channel_bandwidth in (
            hostapd_constants.CHANNEL_BANDWIDTH_40MHZ,
            hostapd_constants.CHANNEL_BANDWIDTH_80MHZ,
        ):
            # 40 and 80 MHz networks need an HT40 secondary channel direction.
            if hostapd_config.ht40_plus_allowed(channel):
                extended_channel = [hostapd_constants.N_CAPABILITY_HT40_PLUS]
            elif hostapd_config.ht40_minus_allowed(channel):
                extended_channel = [hostapd_constants.N_CAPABILITY_HT40_MINUS]
            else:
                raise ValueError(f"Invalid Channel: {channel}")
            n_capabilities = N_CAPABILITIES_DEFAULT + extended_channel
        else:
            raise ValueError(f"Invalid Bandwidth: {channel_bandwidth}")

        ssid = utils.rand_ascii_str(hostapd_constants.AP_SSID_LENGTH_2G)
        try:
            # Resolves to the module-level setup_ap helper, not this method.
            setup_ap(
                access_point=self.access_point,
                profile_name="whirlwind",
                channel=channel,
                security=security_profile,
                n_capabilities=n_capabilities,
                ac_capabilities=None,
                force_wmm=True,
                ssid=ssid,
                vht_bandwidth=vht_bandwidth,
                setup_bridge=True,
            )
        except Exception as err:
            raise ConnectionError(
                f"Failed to setup ap on channel: {channel}, "
                f"channel bandwidth: {channel_bandwidth} MHz. "
            ) from err
        else:
            self.log.info(
                f"Network (ssid: {ssid}) up on channel {channel} "
                f"w/ channel bandwidth {channel_bandwidth} MHz"
            )

        return ssid

    def verify_channel_compliance(
        self,
        country_code: str,
        channel: int,
        channel_bandwidth: int,
        expect_association: bool,
    ) -> None:
        """Verify device complies with provided regulatory requirements for a
        specific channel and channel bandwidth. Run as the logic of the test
        cases generated in pre_run.

        Args:
            country_code: country code to set on the DUT
            channel: channel to start the network on
            channel_bandwidth: channel bandwidth in mhz of the network
            expect_association: whether the DUT is expected to associate
        """
        self.set_dut_country_code(country_code)

        ssid = self.setup_ap(channel, channel_bandwidth)

        self.log.info(
            f'Attempting to associate to network "{ssid}" on channel '
            f"{channel} @ {channel_bandwidth}mhz"
        )

        associated = self.dut.associate(ssid)

        # Same band split as setup_ap uses when choosing the VHT bandwidth.
        channel_ghz = "2.4" if channel <= MAX_2_4_CHANNEL else "5"
        association_code = "c" if associated else "nc"
        regulatory_result = (
            f"REGTRACKER: {country_code},{channel},{channel_ghz},"
            f"{channel_bandwidth},{association_code}"
        )
        # Record the outcome before asserting so failures are still reported
        # in the results file.
        self.regulatory_results.append(regulatory_result)
        self.log.info(regulatory_result)

        asserts.assert_true(
            associated == expect_association,
            f"Expected device to{'' if expect_association else ' NOT'} "
            f"associate using country code {country_code} for channel "
            f"{channel} with channel bandwidth {channel_bandwidth} MHz.",
        )
| |
| |
if __name__ == "__main__":
    # Run through Mobly's test runner when executed as a script.
    test_runner.main()