| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import multiprocessing |
| import time |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple |
| |
| from antlion import logger, signals |
| from antlion.controllers.ap_lib.hostapd_security import FuchsiaSecurityType |
| from antlion.controllers.fuchsia_lib.sl4f import SL4F |
| from antlion.controllers.fuchsia_lib.ssh import FuchsiaSSHProvider |
| from antlion.controllers.fuchsia_lib.wlan_lib import FuchsiaWlanLib |
| |
| SAVED_NETWORKS = "saved_networks" |
| CLIENT_STATE = "client_connections_state" |
| CONNECTIONS_ENABLED = "ConnectionsEnabled" |
| CONNECTIONS_DISABLED = "ConnectionsDisabled" |
| |
| STATE_CONNECTED = "Connected" |
| STATE_CONNECTING = "Connecting" |
| STATE_DISCONNECTED = "Disconnected" |
| STATE_CONNECTION_STOPPED = "ConnectionStopped" |
| |
| SESSION_MANAGER_TIMEOUT_SEC = 10 |
| FUCHSIA_DEFAULT_WLAN_CONFIGURE_TIMEOUT = 30 |
| DEFAULT_GET_UPDATE_TIMEOUT = 60 |
| |
| |
| class WlanPolicyControllerError(signals.ControllerError): |
| pass |
| |
| |
| @dataclass |
| class PreservedState: |
| saved_networks: Optional[List[Dict[str, Any]]] |
| client_connections_state: Optional[str] |
| |
| |
| @dataclass |
| class GetCountryCodeFromPhyParams: |
| wlan_lib: FuchsiaWlanLib |
| id: str |
| |
| |
| def get_country_code_from_phy(params: GetCountryCodeFromPhyParams) -> Tuple[str, str]: |
| get_country_response = params.wlan_lib.wlanGetCountry(params.id) |
| if get_country_response.get("error"): |
| raise ConnectionError( |
| f"Failed to query PHY ID ({params.id}) for country. " |
| f'Error: {get_country_response["error"]}' |
| ) |
| country_code = "".join( |
| [chr(ascii_char) for ascii_char in get_country_response["result"]] |
| ) |
| return (params.id, country_code) |
| |
| |
| class WlanPolicyController: |
| """Contains methods related to the wlan policy layer, to be used in the |
| FuchsiaDevice object. |
| |
| Attributes: |
| sl4f: sl4f module for communicating to the WLAN policy controller. |
| ssh: transport to fuchsia device to stop component processes. |
| """ |
| |
| def __init__(self, sl4f: SL4F, ssh: FuchsiaSSHProvider): |
| """ |
| Args: |
| sl4f: sl4f module for communicating to the WLAN policy controller. |
| ssh: transport to fuchsia device to stop component processes. |
| """ |
| self.preserved_networks_and_client_state: Optional[PreservedState] = None |
| self.policy_configured = False |
| self.sl4f = sl4f |
| self.ssh = ssh |
| self.log = logger.create_tagged_trace_logger( |
| f"WlanPolicyController | {self.ssh.config.host_name}" |
| ) |
| |
| def configure_wlan( |
| self, |
| preserve_saved_networks: bool, |
| timeout_sec: int = FUCHSIA_DEFAULT_WLAN_CONFIGURE_TIMEOUT, |
| ) -> None: |
| """Sets up wlan policy layer. |
| |
| Args: |
| preserve_saved_networks: whether to clear existing saved |
| networks and client state, to be restored at test close. |
| timeout_sec: time to wait for device to configure WLAN. |
| """ |
| |
| # We need to stop session manager to free control of |
| # fuchsia.wlan.policy.ClientController, which can only be used by a |
| # single caller at a time. SL4F needs the ClientController to trigger |
| # WLAN policy state changes. On eng builds the session_manager can be |
| # restarted after being stopped during reboot so we attempt killing the |
| # session manager process for 10 seconds. |
| # See https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.wlan.policy/client_provider.fidl |
| if "cast_agent.cm" in self.ssh.run("ps").stdout: |
| end_time_session_manager_sec = time.time() + SESSION_MANAGER_TIMEOUT_SEC |
| while time.time() < end_time_session_manager_sec: |
| self.ssh.stop_component("session_manager", is_cfv2_component=True) |
| |
| # Acquire control of policy layer |
| end_time_config_sec = time.time() + timeout_sec |
| controller_errors = [] |
| while time.time() < end_time_config_sec: |
| # Create a client controller |
| response = self.sl4f.wlan_policy_lib.wlanCreateClientController() |
| if response.get("error"): |
| controller_errors.append(response["error"]) |
| self.log.debug(response["error"]) |
| time.sleep(1) |
| continue |
| break |
| else: |
| self.log.warning( |
| "Failed to create and use a WLAN policy client controller. Errors: [" |
| + "; ".join(controller_errors) |
| + "]" |
| ) |
| raise WlanPolicyControllerError( |
| "Failed to create and use a WLAN policy client controller." |
| ) |
| |
| self.log.info("ACTS tests now have control of the WLAN policy layer.") |
| |
| if preserve_saved_networks and not self.preserved_networks_and_client_state: |
| self.preserved_networks_and_client_state = ( |
| self.remove_and_preserve_networks_and_client_state() |
| ) |
| if not self.start_client_connections(): |
| raise WlanPolicyControllerError( |
| "Failed to start client connections during configuration." |
| ) |
| |
| self.policy_configured = True |
| |
| def _deconfigure_wlan(self) -> None: |
| if not self.stop_client_connections(): |
| raise WlanPolicyControllerError( |
| "Failed to stop client connections during deconfiguration." |
| ) |
| self.policy_configured = False |
| |
| def clean_up(self) -> None: |
| if self.preserved_networks_and_client_state is not None: |
| # It is possible for policy to have been configured before, but |
| # deconfigured before test end. In this case, in must be setup |
| # before restoring networks |
| if not self.policy_configured: |
| self.configure_wlan(False) |
| |
| self.restore_preserved_networks_and_client_state() |
| |
| def start_client_connections(self) -> bool: |
| """Allow device to connect to networks via policy layer (including |
| autoconnecting to saved networks). |
| |
| Returns: |
| True, if successful. False otherwise.""" |
| start_response = self.sl4f.wlan_policy_lib.wlanStartClientConnections() |
| if start_response.get("error"): |
| self.log.error( |
| f"Failed to start client connections. Err: {start_response['error']}" |
| ) |
| return False |
| return True |
| |
| def stop_client_connections(self) -> bool: |
| """Prevent device from connecting and autoconnecting to networks via the |
| policy layer. |
| |
| Returns: |
| True, if successful. False otherwise.""" |
| stop_response = self.sl4f.wlan_policy_lib.wlanStopClientConnections() |
| if stop_response.get("error"): |
| self.log.error( |
| f"Failed to stop client connections. Err: {stop_response['error']}" |
| ) |
| return False |
| return True |
| |
| def save_and_connect( |
| self, |
| ssid: str, |
| security: FuchsiaSecurityType, |
| password: Optional[str] = None, |
| timeout: int = 30, |
| ): |
| """Saves and connects to the network. This is the policy version of |
| connect and check_connect_response because the policy layer |
| requires a saved network and the policy connect does not return |
| success or failure |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| password: string, the credential of the network if applicable |
| timeout: int, time in seconds to wait for connection |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| # Save network and check response |
| if not self.save_network(ssid, security, password=password): |
| return False |
| # Make connect call and check response |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| if not self.send_connect_command(ssid, security): |
| return False |
| return self.wait_for_connect(ssid, security, timeout=timeout) |
| |
| def save_and_wait_for_autoconnect( |
| self, |
| ssid: str, |
| security: FuchsiaSecurityType, |
| password: Optional[str] = None, |
| timeout: int = 30, |
| ) -> bool: |
| """Saves a network and waits, expecting an autoconnection to the newly |
| saved network. This differes from save_and_connect, as it doesn't |
| expressly trigger a connection first. There are cases in which an |
| autoconnect won't occur after a save (like if the device is connected |
| already), so this should be used with caution to test very specific |
| situations. |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| password: string, the credential of the network if applicable |
| timeout: int, time in seconds to wait for connection |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| if not self.save_network(ssid, security, password=password): |
| return False |
| return self.wait_for_connect(ssid, security, timeout=timeout) |
| |
| def remove_and_wait_for_disconnect( |
| self, |
| ssid: str, |
| security_type: FuchsiaSecurityType, |
| password: Optional[str] = None, |
| state: Optional[str] = None, |
| status: Optional[str] = None, |
| timeout: int = 30, |
| ) -> bool: |
| """Removes a single network and waits for a disconnect. It is not |
| guaranteed the device will stay disconnected, as it may autoconnect |
| to a different saved network. |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| password: string, the credential of the network if applicable |
| state: string, The connection state we are expecting, ie "Disconnected" or |
| "Failed" |
| status: string, The disconnect status we expect, it "ConnectionStopped" or |
| "ConnectionFailed" |
| timeout: int, time in seconds to wait for connection |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| if not self.remove_network(ssid, security_type, password=password): |
| return False |
| return self.wait_for_disconnect( |
| ssid, security_type, state=state, status=status, timeout=timeout |
| ) |
| |
| def remove_all_networks_and_wait_for_no_connections( |
| self, timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT |
| ) -> bool: |
| """Removes all networks and waits until device is not connected to any |
| networks. This should be used as the policy version of disconnect. |
| |
| Args: |
| timeout_sec: The time to wait to see no connections. |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| if not self.remove_all_networks(): |
| self.log.error( |
| "Failed to remove all networks. Cannot continue to " |
| "wait_for_no_connections." |
| ) |
| return False |
| return self.wait_for_no_connections(timeout_sec=timeout_sec) |
| |
| def save_network( |
| self, |
| ssid: str, |
| security_type: FuchsiaSecurityType, |
| password: Optional[str] = None, |
| ) -> bool: |
| """Save a network via the policy layer. |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| password: string, the credential of the network if applicable |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| save_response = self.sl4f.wlan_policy_lib.wlanSaveNetwork( |
| ssid, security_type, target_pwd=password |
| ) |
| if save_response.get("error"): |
| self.log.error( |
| f"Failed to save network {ssid} with error: {save_response['error']}" |
| ) |
| return False |
| return True |
| |
| def remove_network( |
| self, |
| ssid: str, |
| security_type: FuchsiaSecurityType, |
| password: Optional[str] = None, |
| ) -> bool: |
| """Remove a saved network via the policy layer. |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| password: string, the credential of the network if applicable |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| remove_response = self.sl4f.wlan_policy_lib.wlanRemoveNetwork( |
| ssid, security_type, target_pwd=password |
| ) |
| if remove_response.get("error"): |
| self.log.error( |
| "Failed to remove network %s with error: %s" |
| % (ssid, remove_response["error"]) |
| ) |
| return False |
| return True |
| |
| def remove_all_networks(self) -> bool: |
| """Removes all saved networks from device. |
| |
| Returns: |
| True, if successful. False otherwise. |
| """ |
| remove_all_response = self.sl4f.wlan_policy_lib.wlanRemoveAllNetworks() |
| if remove_all_response.get("error"): |
| self.log.error( |
| f"Error occurred removing all networks: {remove_all_response['error']}" |
| ) |
| return False |
| return True |
| |
| def get_saved_networks(self) -> List[Any]: |
| """Retrieves saved networks from device. |
| |
| Returns: |
| list of saved networks |
| |
| Raises: |
| WlanPolicyControllerError, if retrieval fails. |
| """ |
| saved_networks_response = self.sl4f.wlan_policy_lib.wlanGetSavedNetworks() |
| if saved_networks_response.get("error"): |
| raise WlanPolicyControllerError( |
| f"Failed to retrieve saved networks: {saved_networks_response['error']}" |
| ) |
| return saved_networks_response["result"] |
| |
| def send_connect_command( |
| self, ssid: str, security_type: FuchsiaSecurityType |
| ) -> bool: |
| """Sends a connect command to a network that is already saved. This does |
| not wait to guarantee the connection is successful (for that, use |
| save_and_connect). |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| password: string, the credential of the network if applicable |
| |
| Returns: |
| True, if command send successfully. False otherwise. |
| """ |
| connect_response = self.sl4f.wlan_policy_lib.wlanConnect(ssid, security_type) |
| if connect_response.get("error"): |
| self.log.error( |
| "Error occurred when sending policy connect command: %s" |
| % connect_response["error"] |
| ) |
| return False |
| return True |
| |
| def wait_for_connect( |
| self, ssid: str, security_type: FuchsiaSecurityType, timeout: int = 30 |
| ) -> bool: |
| """Wait until the device has connected to the specified network. |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| timeout: int, seconds to wait for a update showing connection |
| Returns: |
| True if we see a connect to the network, False otherwise. |
| """ |
| # Wait until we've connected. |
| end_time = time.time() + timeout |
| while time.time() < end_time: |
| time_left = max(1, int(end_time - time.time())) |
| |
| try: |
| update = self.sl4f.wlan_policy_lib.wlanGetUpdate(timeout=time_left) |
| except TimeoutError: |
| self.log.error( |
| "Timed out waiting for response from device " |
| 'while waiting for network with SSID "%s" to ' |
| "connect. Device took too long to connect or " |
| "the request timed out for another reason." % ssid |
| ) |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| return False |
| if update.get("error"): |
| # This can occur for many reasons, so it is not necessarily a |
| # failure. |
| self.log.debug( |
| f"Error occurred getting status update: {update['error']}" |
| ) |
| continue |
| |
| for network in update["result"]["networks"]: |
| if network["id"]["ssid"] == ssid or network["id"][ |
| "type_" |
| ].lower() == str(security_type): |
| if "state" not in network: |
| raise WlanPolicyControllerError( |
| "WLAN status missing state field." |
| ) |
| elif network["state"].lower() == STATE_CONNECTED.lower(): |
| return True |
| # Wait a bit before requesting another status update |
| time.sleep(1) |
| # Stopped getting updates because out timeout |
| self.log.error(f'Timed out waiting for network with SSID "{ssid}" to connect') |
| return False |
| |
| def wait_for_disconnect( |
| self, |
| ssid: str, |
| security_type: FuchsiaSecurityType, |
| state: Optional[str] = None, |
| status: Optional[str] = None, |
| timeout: int = 30, |
| ) -> bool: |
| """Wait for a disconnect of the specified network on the given device. This |
| will check that the correct connection state and disconnect status are |
| given in update. If we do not see a disconnect after some time, |
| return false. |
| |
| Args: |
| ssid: string, the network name |
| security: string, security type of network (see sl4f.wlan_policy_lib) |
| state: string, The connection state we are expecting, ie "Disconnected" or |
| "Failed" |
| status: string, The disconnect status we expect, it "ConnectionStopped" or |
| "ConnectionFailed" |
| timeout: int, seconds to wait before giving up |
| |
| Returns: True if we saw a disconnect as specified, or False otherwise. |
| """ |
| if not state: |
| state = STATE_DISCONNECTED |
| if not status: |
| status = STATE_CONNECTION_STOPPED |
| |
| end_time = time.time() + timeout |
| while time.time() < end_time: |
| time_left = max(1, int(end_time - time.time())) |
| try: |
| update = self.sl4f.wlan_policy_lib.wlanGetUpdate(timeout=time_left) |
| except TimeoutError: |
| self.log.error( |
| "Timed out waiting for response from device " |
| 'while waiting for network with SSID "%s" to ' |
| "disconnect. Device took too long to disconnect " |
| "or the request timed out for another reason." % ssid |
| ) |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| return False |
| |
| if update.get("error"): |
| # This can occur for many reasons, so it is not necessarily a |
| # failure. |
| self.log.debug( |
| f"Error occurred getting status update: {update['error']}" |
| ) |
| continue |
| # Update should include network, either connected to or recently disconnected. |
| if len(update["result"]["networks"]) == 0: |
| raise WlanPolicyControllerError("WLAN state update is missing network.") |
| |
| for network in update["result"]["networks"]: |
| if network["id"]["ssid"] == ssid or network["id"][ |
| "type_" |
| ].lower() == str(security_type): |
| if "state" not in network or "status" not in network: |
| raise WlanPolicyControllerError( |
| "Client state summary's network is missing fields" |
| ) |
| # If still connected, we will wait for another update and check again |
| elif network["state"].lower() == STATE_CONNECTED.lower(): |
| continue |
| elif network["state"].lower() == STATE_CONNECTING.lower(): |
| self.log.error( |
| 'Update is "Connecting", but device should already be ' |
| "connected; expected disconnect" |
| ) |
| return False |
| # Check that the network state and disconnect status are expected, ie |
| # that it isn't ConnectionFailed when we expect ConnectionStopped |
| elif ( |
| network["state"].lower() != state.lower() |
| or network["status"].lower() != status.lower() |
| ): |
| self.log.error( |
| "Connection failed: a network failure occurred that is unrelated" |
| "to remove network or incorrect status update. \nExpected state: " |
| f"{state}, Status: {status},\nActual update: {network}" |
| ) |
| return False |
| else: |
| return True |
| # Wait a bit before requesting another status update |
| time.sleep(1) |
| # Stopped getting updates because out timeout |
| self.log.error(f'Timed out waiting for network with SSID "{ssid}" to connect') |
| return False |
| |
| def wait_for_no_connections( |
| self, timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT |
| ) -> bool: |
| """Waits to see that there are no existing connections the device. This |
| is the simplest way to watch for disconnections when only a single |
| network is saved/present. |
| |
| Args: |
| timeout_sec: The time to wait to see no connections. |
| |
| Returns: |
| True, if successful. False, if still connected after timeout. |
| """ |
| # If there are already no existing connections when this function is called, |
| # then an update won't be generated by the device, and we'll time out. |
| # Force an update by getting a new listener. |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| end_time = time.time() + timeout_sec |
| while time.time() < end_time: |
| time_left = max(1, int(end_time - time.time())) |
| try: |
| update = self.sl4f.wlan_policy_lib.wlanGetUpdate(timeout=time_left) |
| except TimeoutError: |
| self.log.info( |
| "Timed out getting status update while waiting for all" |
| " connections to end." |
| ) |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| return False |
| |
| if update["error"] != None: |
| self.log.info("Failed to get status update") |
| return False |
| # If any network is connected or being connected to, wait for them |
| # to disconnect. |
| if any( |
| network["state"].lower() |
| in {STATE_CONNECTED.lower(), STATE_CONNECTING.lower()} |
| for network in update["result"]["networks"] |
| ): |
| continue |
| else: |
| return True |
| return False |
| |
| def remove_and_preserve_networks_and_client_state(self) -> PreservedState: |
| """Preserves networks already saved on devices before removing them to |
| setup up for a clean test environment. Records the state of client |
| connections before tests. |
| |
| Raises: |
| WlanPolicyControllerError, if the network removal is unsuccessful |
| """ |
| # Save preexisting saved networks |
| state = PreservedState(saved_networks=None, client_connections_state=None) |
| saved_networks_response = self.sl4f.wlan_policy_lib.wlanGetSavedNetworks() |
| if saved_networks_response.get("error"): |
| raise WlanPolicyControllerError( |
| "Failed to get preexisting saved networks: " |
| f'{saved_networks_response["error"]}' |
| ) |
| state.saved_networks = saved_networks_response.get("result") |
| |
| # Remove preexisting saved networks |
| if not self.remove_all_networks(): |
| raise WlanPolicyControllerError( |
| "Failed to clear networks and disconnect at FuchsiaDevice creation." |
| ) |
| |
| self.sl4f.wlan_policy_lib.wlanSetNewListener() |
| update_response = self.sl4f.wlan_policy_lib.wlanGetUpdate() |
| update_result = update_response.get("result", {}) |
| state.client_connections_state = update_result.get("state") |
| if update_result.get("state") is None: |
| self.log.warn( |
| "Failed to get update; test will not start or " |
| "stop client connections at the end of the test." |
| ) |
| |
| self.log.info("Saved networks cleared and preserved.") |
| return state |
| |
| def restore_preserved_networks_and_client_state(self) -> None: |
| """Restore saved networks and client state onto device if they have |
| been preserved. |
| """ |
| if self.preserved_networks_and_client_state is None: |
| self.log.info("No preserved networks or client state to restore") |
| return |
| |
| if not self.remove_all_networks(): |
| self.log.warn("Failed to remove saved networks before restore.") |
| |
| restore_success = True |
| |
| saved_networks = self.preserved_networks_and_client_state.saved_networks |
| if saved_networks is not None: |
| for network in saved_networks: |
| if not self.save_network( |
| network["ssid"], |
| network["security_type"], |
| network["credential_value"], |
| ): |
| self.log.warn(f'Failed to restore network "{network["ssid"]}"') |
| restore_success = False |
| |
| client_state = self.preserved_networks_and_client_state.client_connections_state |
| if client_state is not None: |
| state_restored = ( |
| self.start_client_connections() |
| if client_state == CONNECTIONS_ENABLED |
| else self.stop_client_connections() |
| ) |
| if not state_restored: |
| self.log.warn("Failed to restore client connections state.") |
| restore_success = False |
| |
| if restore_success: |
| self.log.info("Preserved networks and client state restored.") |
| self.preserved_networks_and_client_state = None |
| |
| def _get_phy_ids(self) -> List[str]: |
| """Get a list of WLAN physical interfaces.""" |
| phy_ids_response = self.sl4f.wlan_lib.wlanPhyIdList() |
| if phy_ids_response.get("error"): |
| raise ConnectionError( |
| f'Failed to get phy ids from DUT. Error: {phy_ids_response["error"]}' |
| ) |
| return phy_ids_response["result"] |
| |
| def _get_phy_country_codes(self) -> Dict[str, str]: |
| """Get mapping of WLAN interfaces to the country code they are set to.""" |
| phy_ids = self._get_phy_ids() |
| args = [GetCountryCodeFromPhyParams(self.sl4f.wlan_lib, id) for id in phy_ids] |
| with multiprocessing.Pool() as p: |
| items = p.map(get_country_code_from_phy, args) |
| return dict(items) |
| |
| def set_country_code(self, country_code: str, timeout_sec: int = 10) -> None: |
| """Set and verify the country code on the device. |
| |
| Args: |
| country_code: 2 character country code to set |
| timeout_sec: seconds to wait until timing out |
| |
| Raises: |
| TimeoutError: the device did not set all interfaces to country_code within |
| timeout_sec |
| """ |
| unique_country_codes = set(self._get_phy_country_codes().values()) |
| if len(unique_country_codes) == 1 and country_code in unique_country_codes: |
| # The country code is already set on all WLAN phys; skip setting the country |
| # code again. |
| self.log.debug(f"Country code already set to {country_code}") |
| return |
| |
| self.log.debug(f"Setting DUT country code to {country_code}") |
| country_code_response = self.sl4f.regulatory_region_lib.setRegion(country_code) |
| if country_code_response.get("error"): |
| raise EnvironmentError( |
| f"Failed to set country code ({country_code}) on DUT. " |
| f'Error: {country_code_response["error"]}' |
| ) |
| |
| self.log.debug( |
| f"Verifying DUT country code was correctly set to {country_code}." |
| ) |
| |
| end_time = time.time() + timeout_sec |
| while time.time() < end_time: |
| for phy_id, code in self._get_phy_country_codes().items(): |
| if code != country_code: |
| # Interface has incorrect country code; continue to wait. |
| self.log.debug( |
| f"WLAN interface {phy_id} has incorrect country code set: " |
| f"got {code}, wanted {country_code}" |
| ) |
| break |
| else: |
| # All interfaces have the expected country code. |
| self.log.info(f"Successfully set DUT country code to {country_code}.") |
| return |
| time.sleep(0.5) |
| |
| raise TimeoutError( |
| f"Failed to set country code to {country_code} within {timeout_sec}s" |
| ) |