| #!/usr/bin/env python3.4 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| """This test exercises the Scan functionality for the WLAN Policy API.""" |
| |
| import logging |
| from datetime import datetime |
| from typing import Any, List |
| |
| from mobly import asserts, signals, test_runner |
| from typing_extensions import TypeGuard |
| |
| from antlion.controllers.ap_lib import ( |
| hostapd_ap_preset, |
| hostapd_bss_settings, |
| hostapd_constants, |
| hostapd_security, |
| ) |
| from antlion.controllers.fuchsia_device import FuchsiaDevice |
| from antlion.test_utils.wifi import base_test |
| |
| |
| def is_str_list(val: Any) -> TypeGuard[List[str]]: |
| return isinstance(val, list) and all(isinstance(x, str) for x in val) |
| |
| |
| class PolicyScanTest(base_test.WifiBaseTest): |
| """WLAN policy scan test class. |
| |
| Test Bed Requirement: |
| * One or more Fuchsia devices |
| * One Whirlwind Access Point |
| """ |
| |
| def setup_class(self) -> None: |
| super().setup_class() |
| self.log = logging.getLogger() |
| |
| if len(self.fuchsia_devices) < 1: |
| raise signals.TestFailure("No fuchsia devices found.") |
| for fd in self.fuchsia_devices: |
| fd.configure_wlan( |
| association_mechanism="policy", preserve_saved_networks=True |
| ) |
| if len(self.access_points) < 1: |
| raise signals.TestFailure("No access points found.") |
| # Prepare the AP |
| self.access_point = self.access_points[0] |
| self.access_point.stop_all_aps() |
| # Generate network params. |
| bss_settings_2g: List[hostapd_bss_settings.BssSettings] = [] |
| bss_settings_5g: List[hostapd_bss_settings.BssSettings] = [] |
| open_network = self.get_open_network(False, []) |
| self.open_network_2g = open_network["2g"] |
| self.open_network_5g = open_network["5g"] |
| wpa2_settings = self.get_psk_network(False, []) |
| self.wpa2_network_2g = wpa2_settings["2g"] |
| self.wpa2_network_5g = wpa2_settings["5g"] |
| bss_settings_2g.append( |
| hostapd_bss_settings.BssSettings( |
| name=self.wpa2_network_2g["SSID"], |
| ssid=self.wpa2_network_2g["SSID"], |
| security=hostapd_security.Security( |
| security_mode=self.wpa2_network_2g["security"], |
| password=self.wpa2_network_2g["password"], |
| ), |
| ) |
| ) |
| bss_settings_5g.append( |
| hostapd_bss_settings.BssSettings( |
| name=self.wpa2_network_5g["SSID"], |
| ssid=self.wpa2_network_5g["SSID"], |
| security=hostapd_security.Security( |
| security_mode=self.wpa2_network_5g["security"], |
| password=self.wpa2_network_5g["password"], |
| ), |
| ) |
| ) |
| self.ap_2g = hostapd_ap_preset.create_ap_preset( |
| iface_wlan_2g=self.access_points[0].wlan_2g, |
| iface_wlan_5g=self.access_points[0].wlan_5g, |
| channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G, |
| ssid=self.open_network_2g["SSID"], |
| bss_settings=bss_settings_2g, |
| ) |
| self.ap_5g = hostapd_ap_preset.create_ap_preset( |
| iface_wlan_2g=self.access_points[0].wlan_2g, |
| iface_wlan_5g=self.access_points[0].wlan_5g, |
| channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G, |
| ssid=self.open_network_5g["SSID"], |
| bss_settings=bss_settings_5g, |
| ) |
| # Start the networks |
| self.access_point.start_ap(hostapd_config=self.ap_2g) |
| self.access_point.start_ap(hostapd_config=self.ap_5g) |
| # Save the SSIDs |
| self.all_ssids = [ |
| self.open_network_2g["SSID"], |
| self.wpa2_network_2g["SSID"], |
| self.open_network_5g["SSID"], |
| self.wpa2_network_5g["SSID"], |
| ] |
| |
| def setup_test(self) -> None: |
| super().setup_test() |
| for fd in self.fuchsia_devices: |
| # stub for setting up all the fuchsia devices in the testbed. |
| asserts.assert_true( |
| fd.wlan_policy_controller.remove_all_networks_and_wait_for_no_connections(), |
| f"Failed to remove all networks on {fd.mdns_name}", |
| ) |
| |
| def teardown_test(self) -> None: |
| for fd in self.fuchsia_devices: |
| # stub until policy layer has something useful to use here. |
| pass |
| super().teardown_test() |
| |
| """Helper Functions""" |
| |
| def perform_scan(self, fd: FuchsiaDevice) -> List[str]: |
| """Initiates scan on a Fuchsia device and returns results |
| |
| Args: |
| fd: A fuchsia device |
| |
| Raises: |
| signals.TestFailure: if an error is reported by the device during |
| the scan |
| |
| Returns: |
| A list of scan results |
| """ |
| start_time = datetime.now() |
| |
| scan_response = fd.sl4f.wlan_policy_lib.wlanScanForNetworks() |
| |
| # first check if we received an error |
| if scan_response.get("error") is not None: |
| # the response indicates an error - log and raise failure |
| raise signals.TestFailure( |
| f"Aborting test - scan failed with error: {scan_response.get('error')}" |
| ) |
| |
| # the scan command did not get an error response - go ahead |
| # and check for scan results |
| scan_results = scan_response["result"] |
| if not is_str_list(scan_results): |
| raise TypeError( |
| "Expected wlan_policy.scan_networks to return a list of strings, " |
| f"got {type(scan_results)}" |
| ) |
| |
| total_time_ms = (datetime.now() - start_time).total_seconds() * 1000 |
| |
| self.log.info("scan contained %d results", len(scan_results)) |
| self.log.info("scan time: %d ms", total_time_ms) |
| |
| return scan_results |
| |
| def connect_to_network(self, network: base_test.Network, fd: FuchsiaDevice) -> None: |
| """Connects the Fuchsia device to the specified network |
| |
| Args: |
| wlan_network_params: A dictionary containing wlan information. |
| fd: A fuchsia device |
| |
| Raises: |
| signals.TestFailure: if the device fails to connect |
| """ |
| # TODO(mnck): use the Policy version of this call, when it is available. |
| connection_response = fd.wlan_policy_controller.save_and_connect( |
| network["SSID"], |
| network["security"].fuchsia_security_type(), |
| network["password"], |
| ) |
| if not connection_response: |
| raise signals.TestFailure("Aborting test - Connect call failed") |
| self.log.info("Network connection successful.") |
| |
| def assert_network_is_in_results(self, scan_results: List[str], ssid: str) -> None: |
| """Verified scan results contain a specified network |
| |
| Args: |
| scan_results: Scan results from a fuchsia Policy API scan |
| ssid: SSID for network that should be in the results |
| |
| Raises: |
| signals.TestFailure: if the network is not present in the scan |
| results |
| """ |
| asserts.assert_true( |
| ssid in scan_results, |
| f'Network "{ssid}" was not found in scan results: {scan_results}', |
| ) |
| |
| """Tests""" |
| |
| def test_basic_scan_request(self) -> None: |
| """Verify a scan returns all expected networks""" |
| for fd in self.fuchsia_devices: |
| scan_results = self.perform_scan(fd) |
| if len(scan_results) == 0: |
| raise signals.TestFailure("Scan failed or did not " "find any networks") |
| for ssid in self.all_ssids: |
| self.assert_network_is_in_results(scan_results, ssid) |
| |
| def test_scan_while_connected_open_network_2g(self) -> None: |
| """Connect to an open 2g network and perform a scan""" |
| for fd in self.fuchsia_devices: |
| self.connect_to_network(self.open_network_2g, fd) |
| scan_results = self.perform_scan(fd) |
| for ssid in self.all_ssids: |
| self.assert_network_is_in_results(scan_results, ssid) |
| |
| def test_scan_while_connected_wpa2_network_2g(self) -> None: |
| """Connect to a WPA2 2g network and perform a scan""" |
| for fd in self.fuchsia_devices: |
| self.connect_to_network(self.wpa2_network_2g, fd) |
| scan_results = self.perform_scan(fd) |
| for ssid in self.all_ssids: |
| self.assert_network_is_in_results(scan_results, ssid) |
| |
| def test_scan_while_connected_open_network_5g(self) -> None: |
| """Connect to an open 5g network and perform a scan""" |
| for fd in self.fuchsia_devices: |
| self.connect_to_network(self.open_network_5g, fd) |
| scan_results = self.perform_scan(fd) |
| for ssid in self.all_ssids: |
| self.assert_network_is_in_results(scan_results, ssid) |
| |
| def test_scan_while_connected_wpa2_network_5g(self) -> None: |
| """Connect to a WPA2 5g network and perform a scan""" |
| for fd in self.fuchsia_devices: |
| self.connect_to_network(self.wpa2_network_5g, fd) |
| scan_results = self.perform_scan(fd) |
| for ssid in self.all_ssids: |
| self.assert_network_is_in_results(scan_results, ssid) |
| |
| |
| if __name__ == "__main__": |
| test_runner.main() |