# blob: e9907e9079cccdc9b001fe99b79e6ad5d1c3264a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import multiprocessing
import time
from typing import List, Mapping, NamedTuple, Optional, Tuple
from mobly import asserts, test_runner
from regulatory_channels import COUNTRY_CHANNELS, TEST_CHANNELS
from antlion import utils
from antlion.controllers.access_point import setup_ap
from antlion.controllers.ap_lib import hostapd_config, hostapd_constants
from antlion.controllers.ap_lib.hostapd_security import Security
from antlion.test_utils.abstract_devices.wlan_device import (
FuchsiaWlanDevice,
create_wlan_device,
)
from antlion.test_utils.wifi import base_test
# 802.11n capabilities applied to every AP configuration started by this test.
# setup_ap() appends the HT20 / HT40+ / HT40- capability that matches the
# requested channel and bandwidth to this base list.
N_CAPABILITIES_DEFAULT = [
    hostapd_constants.N_CAPABILITY_LDPC,
    hostapd_constants.N_CAPABILITY_SGI20,
    hostapd_constants.N_CAPABILITY_SGI40,
    hostapd_constants.N_CAPABILITY_TX_STBC,
    hostapd_constants.N_CAPABILITY_RX_STBC1,
]
# Channels above this number are given a VHT bandwidth by setup_ap(); channels
# at or below it are configured without one.
MAX_2_4_CHANNEL = 14
# Seconds to sleep between polls while waiting for the country code to apply.
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
# Seconds to keep polling before giving up on the country code being applied.
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10
class RegulatoryTest(NamedTuple):
    """Arguments for one generated regulatory compliance test case.

    Instances are unpacked positionally into both the test-name generator and
    verify_channel_compliance, so the field order must match those signatures.
    """

    # Country code to set on the DUT before attempting association.
    country_code: str
    # Channel the access point network is started on.
    channel: int
    # Channel bandwidth, in MHz, the access point network is started with.
    channel_bandwidth: int
    # Whether the DUT is expected to associate under this configuration.
    expect_association: bool
class RegulatoryComplianceTest(base_test.WifiBaseTest):
    """Tests regulatory compliance.

    For every country in COUNTRY_CHANNELS and every channel/bandwidth pair in
    TEST_CHANNELS, a test case is generated that sets the DUT's country code,
    starts an AP on that channel, and checks that the DUT associates only when
    the country's allowed channel table permits it.

    Testbed Requirement:
    * 1 x Fuchsia device (dut)
    * 1 x access point
    """

    def pre_run(self) -> None:
        """Generate one test case per (country, channel, bandwidth) combination.

        A case expects association only when the channel is listed in the
        country's allowed_channels and the bandwidth is listed for that channel.
        """
        tests: List[RegulatoryTest] = []
        for country in COUNTRY_CHANNELS.values():
            for channel in TEST_CHANNELS:
                for bandwidth in TEST_CHANNELS[channel]:
                    tests.append(
                        RegulatoryTest(
                            country_code=country.country_code,
                            channel=channel,
                            channel_bandwidth=bandwidth,
                            expect_association=(
                                channel in country.allowed_channels
                                and bandwidth in country.allowed_channels[channel]
                            ),
                        )
                    )

        def generate_test_name(
            code: str, channel: int, channel_bandwidth: int, *_
        ) -> str:
            # Receives the unpacked RegulatoryTest; the trailing
            # expect_association value is not part of the name.
            return f"test_{code}_channel_{channel}_{channel_bandwidth}mhz"

        self.generate_tests(self.verify_channel_compliance, generate_test_name, tests)

    def setup_class(self) -> None:
        """Prepare the DUT and access point and start the results log."""
        super().setup_class()
        self.log = logging.getLogger()
        self.dut: FuchsiaWlanDevice = create_wlan_device(self.fuchsia_devices[0])
        self.access_point = self.access_points[0]
        self.access_point.stop_all_aps()
        # One line is appended per executed test case; the list is written to
        # disk in teardown_class.
        self.regulatory_results = [
            "====CountryCode,Channel,Frequency,ChannelBandwith,Connected/Not-Connected===="
        ]

    def teardown_class(self) -> None:
        """Write the collected regulatory results to the log directory."""
        super().teardown_class()
        regulatory_save_path = f"{self.log_path}/regulatory_results.txt"
        # Context manager guarantees the file is closed even if the write fails.
        with open(regulatory_save_path, "w") as results_file:
            results_file.write("\n".join(self.regulatory_results))

    def setup_test(self) -> None:
        """Reset the AP and DUT to a known, disconnected state before a test."""
        self.access_point.stop_all_aps()
        for ad in self.android_devices:
            ad.droid.wakeLockAcquireBright()
            ad.droid.wakeUpNow()
        self.dut.wifi_toggle_state(True)
        self.dut.disconnect()

    def teardown_test(self) -> None:
        """Release devices, disconnect the DUT, collect AP logs and stop the AP."""
        for ad in self.android_devices:
            ad.droid.wakeLockRelease()
            ad.droid.goToSleepNow()
        self.dut.turn_location_off_and_scan_toggle_off()
        self.dut.disconnect()
        self.download_ap_logs()
        self.access_point.stop_all_aps()

    def get_phy_ids(self) -> List[str]:
        """Get a list of WLAN physical interfaces.

        Returns:
            The "result" field of the SL4F wlanPhyIdList response.

        Raises:
            ConnectionError: if the SL4F response reports an error.
        """
        phy_ids_response = self.dut.device.sl4f.wlan_lib.wlanPhyIdList()
        if phy_ids_response.get("error"):
            raise ConnectionError(
                f'Failed to get phy ids from DUT. Error: {phy_ids_response["error"]}'
            )
        return phy_ids_response["result"]

    def _get_phy_country_code(self, phy_id: str) -> str:
        """Query a single PHY for the country code it is currently set to.

        Args:
            phy_id: identifier of the PHY to query.

        Returns:
            The country code, decoded from the character codes in the response.

        Raises:
            ConnectionError: if the SL4F response reports an error.
        """
        get_country_response = self.dut.device.sl4f.wlan_lib.wlanGetCountry(phy_id)
        if get_country_response.get("error"):
            raise ConnectionError(
                f"Failed to query PHY ID ({phy_id}) for country. "
                f'Error: {get_country_response["error"]}'
            )
        # The result is a sequence of integer character codes, not a string.
        return "".join(chr(ascii_char) for ascii_char in get_country_response["result"])

    def get_phy_country_codes(self) -> Mapping[str, str]:
        """Get mapping of WLAN interfaces to the country code they are set to.

        PHYs are queried one at a time. A multiprocessing.Pool cannot be used
        here because the worker needs access to this instance, and locally
        defined functions cannot be pickled for dispatch to worker processes.

        Raises:
            ConnectionError: if listing the PHYs or querying any PHY fails.
        """
        return {
            phy_id: self._get_phy_country_code(phy_id)
            for phy_id in self.get_phy_ids()
        }

    def set_dut_country_code(self, country_code: str) -> None:
        """Set the country code on the DUT. Then verify that the country
        code was set successfully

        Args:
            country_code: string, the 2 character country code to set

        Raises:
            EnvironmentError: if setting the region fails, or if not every PHY
                reports the requested country code within
                TIME_TO_WAIT_FOR_COUNTRY_CODE seconds.
            ConnectionError: if the PHY country codes cannot be queried.
        """
        unique_country_codes = set(self.get_phy_country_codes().values())
        # Sets are not subscriptable, so compare against a one-element set
        # rather than indexing.
        if unique_country_codes == {country_code}:
            # The country code is already set on all WLAN phys; skip setting the country
            # code again.
            self.log.debug(f"Country code already set to {country_code}")
            return

        self.log.info(f"Setting DUT country code to {country_code}")
        country_code_response = self.dut.device.sl4f.regulatory_region_lib.setRegion(
            country_code
        )
        if country_code_response.get("error"):
            raise EnvironmentError(
                f"Failed to set country code ({country_code}) on DUT. "
                f'Error: {country_code_response["error"]}'
            )

        self.log.info(
            f"Verifying DUT country code was correctly set to {country_code}."
        )
        # Monotonic clock so wall-clock adjustments cannot stretch or cut short
        # the wait.
        deadline = time.monotonic() + TIME_TO_WAIT_FOR_COUNTRY_CODE
        while time.monotonic() < deadline:
            for phy_id, code in self.get_phy_country_codes().items():
                if code != country_code:
                    self.log.debug(
                        f"PHY (id: {phy_id}) has incorrect country code set. "
                        f"Expected: {country_code}, Got: {code}"
                    )
                    break
            else:
                # Loop finished without a break: every PHY matched.
                self.log.info(f"All PHYs have expected country code ({country_code})")
                return
            time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
        raise EnvironmentError(
            f"Failed to set DUT country code to {country_code} after "
            f"{TIME_TO_WAIT_FOR_COUNTRY_CODE}s"
        )

    def setup_ap(
        self,
        channel: int,
        channel_bandwidth: int,
        security_profile: Optional[Security] = None,
    ) -> str:
        """Start network on AP with basic configuration.

        Args:
            channel: channel to use for network
            channel_bandwidth: channel bandwidth in mhz to use for network,
            security_profile: security type to use or None if open

        Returns:
            SSID of the newly created and running network

        Raises:
            ConnectionError if network is not started successfully.
            ValueError if the bandwidth is unsupported, or if neither HT40+ nor
                HT40- is allowed on the channel for a 40/80 MHz bandwidth.
        """
        # Only channels above the 2.4 GHz range are given a VHT bandwidth.
        if channel > MAX_2_4_CHANNEL:
            vht_bandwidth = channel_bandwidth
        else:
            vht_bandwidth = None

        if channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_20MHZ:
            n_capabilities = N_CAPABILITIES_DEFAULT + [
                hostapd_constants.N_CAPABILITY_HT20
            ]
        elif (
            channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_40MHZ
            or channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_80MHZ
        ):
            # Prefer HT40+ and fall back to HT40- when only that is allowed.
            if hostapd_config.ht40_plus_allowed(channel):
                extended_channel = [hostapd_constants.N_CAPABILITY_HT40_PLUS]
            elif hostapd_config.ht40_minus_allowed(channel):
                extended_channel = [hostapd_constants.N_CAPABILITY_HT40_MINUS]
            else:
                raise ValueError(f"Invalid Channel: {channel}")
            n_capabilities = N_CAPABILITIES_DEFAULT + extended_channel
        else:
            raise ValueError(f"Invalid Bandwidth: {channel_bandwidth}")

        ssid = utils.rand_ascii_str(hostapd_constants.AP_SSID_LENGTH_2G)
        try:
            # Module-level setup_ap from antlion.controllers.access_point, not
            # this method: class attributes are not in scope inside methods.
            setup_ap(
                access_point=self.access_point,
                profile_name="whirlwind",
                channel=channel,
                security=security_profile,
                n_capabilities=n_capabilities,
                ac_capabilities=None,
                force_wmm=True,
                ssid=ssid,
                vht_bandwidth=vht_bandwidth,
                setup_bridge=True,
            )
        except Exception as err:
            raise ConnectionError(
                f"Failed to setup ap on channel: {channel}, "
                f"channel bandwidth: {channel_bandwidth} MHz. "
            ) from err
        else:
            self.log.info(
                f"Network (ssid: {ssid}) up on channel {channel} "
                f"w/ channel bandwidth {channel_bandwidth} MHz"
            )
            return ssid

    def verify_channel_compliance(
        self,
        country_code: str,
        channel: int,
        channel_bandwidth: int,
        expect_association: bool,
    ) -> None:
        """Verify device complies with provided regulatory requirements for a
        specific channel and channel bandwidth. Run once per test case
        generated in pre_run.

        Args:
            country_code: country code to set on the DUT before associating
            channel: channel to start the AP network on
            channel_bandwidth: channel bandwidth in mhz for the AP network
            expect_association: whether the DUT is expected to associate
        """
        self.set_dut_country_code(country_code)
        ssid = self.setup_ap(channel, channel_bandwidth)
        self.log.info(
            f'Attempting to associate to network "{ssid}" on channel '
            f"{channel} @ {channel_bandwidth}mhz"
        )
        associated = self.dut.associate(ssid)

        # Same band split that setup_ap uses for choosing a VHT bandwidth.
        channel_ghz = "2.4" if channel <= MAX_2_4_CHANNEL else "5"
        association_code = "c" if associated else "nc"
        # Record the outcome before asserting so failed cases are still logged.
        regulatory_result = (
            f"REGTRACKER: {country_code},{channel},{channel_ghz},"
            f"{channel_bandwidth},{association_code}"
        )
        self.regulatory_results.append(regulatory_result)
        self.log.info(regulatory_result)

        asserts.assert_true(
            associated == expect_association,
            f"Expected device to{'' if expect_association else ' NOT'} "
            f"associate using country code {country_code} for channel "
            f"{channel} with channel bandwidth {channel_bandwidth} MHz.",
        )
# Hand control to Mobly's test runner when this file is executed as a script.
if __name__ == "__main__":
    test_runner.main()