| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import time |
| from dataclasses import dataclass |
| |
| from honeydew.typing.wlan import ( |
| ConnectionState, |
| DisconnectStatus, |
| NetworkConfig, |
| NetworkState, |
| RequestStatus, |
| WlanClientState, |
| ) |
| |
| from antlion import logger, signals |
| from antlion.controllers.ap_lib.hostapd_security import FuchsiaSecurityType |
| from antlion.controllers.fuchsia_lib.sl4f import SL4F |
| from antlion.controllers.fuchsia_lib.ssh import FuchsiaSSHProvider |
| from antlion.controllers.fuchsia_lib.wlan_policy_lib import WlanPolicyError |
| |
| # Seconds configure_wlan() keeps issuing stop requests for session_manager. |
| SESSION_MANAGER_TIMEOUT_SEC = 10 |
| # Default value, in seconds, of configure_wlan()'s timeout_sec argument. |
| FUCHSIA_DEFAULT_WLAN_CONFIGURE_TIMEOUT = 30 |
| # Default seconds the wait_for_* methods poll for a matching client update. |
| DEFAULT_GET_UPDATE_TIMEOUT = 60 |
| |
| |
class WlanPolicyControllerError(signals.ControllerError):
    """Raised when a WlanPolicyController operation fails or times out."""
| |
| |
@dataclass
class PreservedState:
    """Snapshot of the WLAN policy state taken before saved networks are cleared.

    Attributes:
        saved_networks: Network configs that were saved on the device, or None.
        client_connections_state: Client connections state reported by the
            device when the snapshot was taken, or None.
    """

    saved_networks: list[NetworkConfig] | None
    client_connections_state: WlanClientState | None
| |
| |
@dataclass
class ClientState:
    """Plain record pairing a client state string with a list of network dicts.

    NOTE(review): nothing in this module references this class; it presumably
    mirrors a raw client update payload for other callers -- confirm before
    changing or removing it.

    Attributes:
        state: Client state, as a string.
        networks: One dictionary per network, keyed by string.
    """

    state: str
    networks: list[dict[str, object]]
| |
| |
| # TODO(http://b/309854439): Add a ClientStateWatcher and refactor tests to allow test |
| # developers more control when update listeners are set and the client update state is |
| # reset. |
| class WlanPolicyController: |
| """Contains methods related to the wlan policy layer, to be used in the |
| FuchsiaDevice object. |
| |
| Attributes: |
| sl4f: sl4f module for communicating to the WLAN policy controller. |
| ssh: transport to fuchsia device to stop component processes. |
| """ |
| |
| def __init__(self, sl4f: SL4F, ssh: FuchsiaSSHProvider) -> None: |
| """ |
| Args: |
| sl4f: sl4f module for communicating to the WLAN policy controller. |
| ssh: transport to fuchsia device to stop component processes. |
| """ |
| self.preserved_networks_and_client_state: PreservedState | None = None |
| self.policy_configured = False |
| self.sl4f = sl4f |
| self.ssh = ssh |
| self.log = logger.create_tagged_trace_logger( |
| f"WlanPolicyController | {self.ssh.config.host_name}" |
| ) |
| |
| def configure_wlan( |
| self, |
| preserve_saved_networks: bool, |
| timeout_sec: int = FUCHSIA_DEFAULT_WLAN_CONFIGURE_TIMEOUT, |
| ) -> None: |
| """Sets up wlan policy layer. |
| |
| Args: |
| preserve_saved_networks: whether to clear existing saved |
| networks and client state, to be restored at test close. |
| timeout_sec: time to wait for device to configure WLAN. |
| """ |
| |
| # We need to stop session manager to free control of |
| # fuchsia.wlan.policy.ClientController, which can only be used by a |
| # single caller at a time. SL4F needs the ClientController to trigger |
| # WLAN policy state changes. On eng builds the session_manager can be |
| # restarted after being stopped during reboot so we attempt killing the |
| # session manager process for 10 seconds. |
| # See https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.wlan.policy/client_provider.fidl |
| if "cast_agent.cm" in self.ssh.run("ps").stdout: |
| session_manager_expiration = time.time() + SESSION_MANAGER_TIMEOUT_SEC |
| while time.time() < session_manager_expiration: |
| self.ssh.stop_component("session_manager", is_cfv2_component=True) |
| |
| # Acquire control of policy layer |
| self.sl4f.wlan_policy_lib.create_client_controller() |
| self.log.info("ACTS tests now have control of the WLAN policy layer.") |
| |
| if preserve_saved_networks and not self.preserved_networks_and_client_state: |
| self.preserved_networks_and_client_state = ( |
| self.remove_and_preserve_networks_and_client_state() |
| ) |
| |
| self.sl4f.wlan_policy_lib.start_client_connections() |
| self.policy_configured = True |
| |
| def _deconfigure_wlan(self) -> None: |
| self.sl4f.wlan_policy_lib.stop_client_connections() |
| self.policy_configured = False |
| |
| def clean_up(self) -> None: |
| if self.preserved_networks_and_client_state is not None: |
| # It is possible for policy to have been configured before, but |
| # deconfigured before test end. In this case, in must be setup |
| # before restoring networks |
| if not self.policy_configured: |
| self.configure_wlan(False) |
| |
| self.restore_preserved_networks_and_client_state() |
| |
| def save_and_connect( |
| self, |
| target_ssid: str, |
| security_type: FuchsiaSecurityType, |
| target_pwd: str | None = None, |
| timeout_sec: int = 30, |
| ) -> None: |
| """Saves and connects to the network. |
| |
| This is the policy version of connect and check_connect_response because the |
| policy layer requires a saved network and the policy connect does not return |
| success or failure |
| |
| Args: |
| target_ssid: The network name to connect to. |
| security_type: Security type of network (see sl4f.wlan_policy_lib) |
| target_pwd: The credential of the network if applicable. |
| timeout_sec: Time in seconds to wait for connection. |
| |
| Raises: |
| WlanPolicyControllerError if fails to connect. |
| """ |
| self.sl4f.wlan_policy_lib.save_network( |
| target_ssid, security_type, target_pwd=target_pwd |
| ) |
| status = self.sl4f.wlan_policy_lib.connect(target_ssid, security_type) |
| |
| if status is RequestStatus.ACKNOWLEDGED: |
| self.wait_for_network_state( |
| target_ssid, ConnectionState.CONNECTED, timeout_sec=timeout_sec |
| ) |
| else: |
| self.log.error( |
| f"Failed to connect to {target_ssid}, request status: {status}" |
| ) |
| raise WlanPolicyControllerError( |
| f"Failed to connect to {target_ssid}, request status: {status}" |
| ) |
| |
| def _find_network( |
| self, ssid: str, networks: list[NetworkState] |
| ) -> NetworkState | None: |
| """Helper method to find network in list of network states. |
| |
| Args: |
| ssid: The network name to look for. |
| networks: The list of network states to look in. |
| |
| Returns: |
| Network state of target ssid or None if not found in networks. |
| """ |
| for network in networks: |
| if network.network_identifier.ssid == ssid: |
| return network |
| return None |
| |
| def wait_for_network_state( |
| self, |
| ssid: str, |
| expected_state: ConnectionState, |
| expected_status: DisconnectStatus | None = None, |
| timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT, |
| ) -> None: |
| """Waits until the device returns with expected network state. |
| |
| Args: |
| ssid: The network name to check the state of. |
| expected_state: The network state we are waiting to see. |
| expected_status: The disconnect status of the network. |
| timeout_sec: The number of seconds to wait for a update showing connection. |
| |
| Raises: |
| WlanPolicyControllerError: If client update has no networks, if network not |
| present in update network states, or if network fails to converge to |
| expected state or status at end of timeout. |
| """ |
| self.sl4f.wlan_policy_lib.set_new_update_listener() |
| |
| last_err: WlanPolicyError | None = None |
| end_time = time.time() + timeout_sec |
| while time.time() < end_time: |
| time_left = max(1, int(end_time - time.time())) |
| try: |
| client = self.sl4f.wlan_policy_lib.get_update(timeout=time_left) |
| except WlanPolicyError as e: |
| # WlanPolicyError can be thrown if the SL4F command was not successfully |
| # sent, if the command timed out, or if the command returned with an |
| # error code in the 'error' field. We retry here to handle the cases |
| # in negative testing where we expect to recieve an 'error'. |
| last_err = e |
| time.sleep(1) |
| continue |
| |
| network = self._find_network(ssid, client.networks) |
| if network is None: |
| self.log.debug(f"{ssid} not found in client networks") |
| time.sleep(1) |
| continue |
| |
| if network.connection_state is not expected_state: |
| self.log.debug( |
| f'Expected connection state "{expected_state}", ' |
| f'got "{network.connection_state}"' |
| ) |
| time.sleep(1) |
| continue |
| |
| match network.connection_state: |
| case ConnectionState.FAILED | ConnectionState.DISCONNECTED: |
| if ( |
| expected_status |
| and network.disconnect_status is not expected_status |
| ): |
| raise WlanPolicyControllerError( |
| f"Disconnect status is not {expected_status}" |
| ) |
| case ConnectionState.CONNECTED | ConnectionState.CONNECTING: |
| # Normally these network states do not have disconnect status, but |
| # we are setting a default value to CONNECTION_STOPPED |
| if ( |
| network.disconnect_status |
| is not DisconnectStatus.CONNECTION_STOPPED |
| ): |
| raise WlanPolicyControllerError( |
| f"Expected default disconnect status, " |
| f'got "{network.disconnect_status}"' |
| ) |
| # Successfully converged on expected state/status |
| return |
| else: |
| self.log.error( |
| f'Timed out waiting for "{ssid}" to reach state {expected_state} and ' |
| f"status {expected_status}" |
| ) |
| raise WlanPolicyControllerError( |
| f'Timed out waiting for "{ssid}" to reach state {expected_state} and ' |
| f"status {expected_status}" |
| ) from last_err |
| |
| def wait_for_client_state( |
| self, |
| expected_state: WlanClientState, |
| timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT, |
| ) -> None: |
| """Waits until the client converges to expected state. |
| |
| Args: |
| expected_state: The client state we are waiting to see. |
| timeout_sec: Duration to wait for the desired_state. |
| |
| Raises: |
| WlanPolicyControllerError: If client still has not converged to expected |
| state at end of timeout. |
| """ |
| self.sl4f.wlan_policy_lib.set_new_update_listener() |
| |
| last_err: WlanPolicyError | None = None |
| end_time = time.time() + timeout_sec |
| while time.time() < end_time: |
| time_left = max(1, int(end_time - time.time())) |
| try: |
| client = self.sl4f.wlan_policy_lib.get_update(timeout=time_left) |
| except WlanPolicyError as e: |
| # WlanPolicyError can be thrown if the SL4F command was not successfully |
| # sent, if the command timed out, or if the command returned with an |
| # error code in the 'error' field. We retry here to handle the cases |
| # in negative testing where we expect to recieve an 'error'. |
| last_err = e |
| time.sleep(1) |
| continue |
| if client.state is not expected_state: |
| # Continue getting updates. |
| time.sleep(1) |
| continue |
| else: |
| return |
| else: |
| self.log.error( |
| f"Client state did not converge to the expected state: {expected_state}" |
| f" Waited:{timeout_sec}s" |
| ) |
| raise WlanPolicyControllerError from last_err |
| |
| def wait_for_no_connections( |
| self, timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT |
| ) -> None: |
| """Waits to see that there are no connections to the device. |
| |
| Args: |
| timeout_sec: The time to wait to see no connections. |
| |
| Raises: |
| WlanPolicyControllerError: If client update has no networks or if client |
| still has connections at end of timeout. |
| """ |
| self.sl4f.wlan_policy_lib.set_new_update_listener() |
| |
| last_err: WlanPolicyError | None = None |
| end_time = time.time() + timeout_sec |
| while time.time() < end_time: |
| curr_connected_networks: list[NetworkState] = [] |
| time_left = max(1, int(end_time - time.time())) |
| try: |
| client = self.sl4f.wlan_policy_lib.get_update(timeout=time_left) |
| except WlanPolicyError as e: |
| # WlanPolicyError can be thrown if the SL4F command was not successfully |
| # sent, if the command timed out, or if the command returned with an |
| # error code in the 'error' field. We retry here to handle the cases |
| # in negative testing where we expect to recieve an 'error'. |
| last_err = e |
| time.sleep(1) |
| continue |
| |
| # Iterate through networks checking to see if any are still connected. |
| for network in client.networks: |
| if network.connection_state in { |
| ConnectionState.CONNECTING, |
| ConnectionState.CONNECTED, |
| }: |
| curr_connected_networks.append(network) |
| |
| if len(curr_connected_networks) != 0: |
| # Continue getting updates. |
| time.sleep(1) |
| continue |
| else: |
| return |
| else: |
| self.log.error(f"Networks still connected. Waited: {timeout_sec}s") |
| raise WlanPolicyControllerError from last_err |
| |
| def remove_and_preserve_networks_and_client_state(self) -> PreservedState: |
| """Preserves networks already saved on devices before removing them. |
| |
| This method is used to set up a clean test environment. Records the state of |
| client connections before tests. |
| |
| Returns: |
| PreservedState: State of the client containing NetworkConfigs and client |
| connection state. |
| """ |
| client = self.sl4f.wlan_policy_lib.get_update() |
| networks = self.sl4f.wlan_policy_lib.get_saved_networks() |
| self.sl4f.wlan_policy_lib.remove_all_networks() |
| self.log.info("Saved networks cleared and preserved.") |
| return PreservedState( |
| saved_networks=networks, client_connections_state=client.state |
| ) |
| |
| def restore_preserved_networks_and_client_state(self) -> None: |
| """Restore preserved networks and client state onto device.""" |
| if self.preserved_networks_and_client_state is None: |
| self.log.info("No preserved networks or client state to restore") |
| return |
| |
| self.sl4f.wlan_policy_lib.remove_all_networks() |
| |
| saved_networks = self.preserved_networks_and_client_state.saved_networks |
| if saved_networks is not None: |
| for network in saved_networks: |
| try: |
| self.sl4f.wlan_policy_lib.save_network( |
| network.ssid, |
| network.security_type, |
| network.credential_value, |
| ) |
| except WlanPolicyError: |
| self.log.warn(f'Failed to restore network "{network.ssid}"') |
| |
| client_state = self.preserved_networks_and_client_state.client_connections_state |
| if client_state is not None: |
| if client_state is WlanClientState.CONNECTIONS_ENABLED: |
| self.sl4f.wlan_policy_lib.start_client_connections() |
| else: |
| self.sl4f.wlan_policy_lib.stop_client_connections() |
| |
| self.log.info("Preserved networks and client state restored.") |
| self.preserved_networks_and_client_state = None |