| # Lint as: python3 |
| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # This class provides pipeline betweem python tests and WLAN policy facade. |
| |
| from enum import StrEnum |
| |
| from honeydew import errors |
| from honeydew.interfaces.device_classes.fuchsia_device import ( |
| FuchsiaDevice as HdFuchsiaDevice, |
| ) |
| from honeydew.typing.wlan import ( |
| ClientStateSummary, |
| ConnectionState, |
| DisconnectStatus, |
| NetworkConfig, |
| NetworkIdentifier, |
| NetworkState, |
| RequestStatus, |
| SecurityType, |
| WlanClientState, |
| ) |
| |
| from antlion.controllers.ap_lib.hostapd_security import FuchsiaSecurityType |
| from antlion.controllers.fuchsia_lib.base_lib import BaseLib |
| from antlion.validation import MapValidator |
| |
| DEFAULT_UPDATE_TIMEOUT_SEC: float = 30.0 |
| |
| |
| class WlanPolicyError(Exception): |
| """Exception for SL4F commands executed by WLAN Policy.""" |
| |
| |
| class Command(StrEnum): |
| """Sl4f Server Commands.""" |
| |
| CONNECT = "wlan_policy.connect" |
| CREATE_CLIENT_CONTROLLER = "wlan_policy.create_client_controller" |
| GET_SAVED_NETWORKS = "wlan_policy.get_saved_networks" |
| GET_UPDATE = "wlan_policy.get_update" |
| REMOVE_ALL_NETWORKS = "wlan_policy.remove_all_networks" |
| REMOVE_NETWORK = "wlan_policy.remove_network" |
| SAVE_NETWORK = "wlan_policy.save_network" |
| SCAN_FOR_NETWORKS = "wlan_policy.scan_for_networks" |
| SET_NEW_UPDATE_LISTENER = "wlan_policy.set_new_update_listener" |
| START_CLIENT_CONNECTIONS = "wlan_policy.start_client_connections" |
| STOP_CLIENT_CONNECTIONS = "wlan_policy.stop_client_connections" |
| |
| |
| class FuchsiaWlanPolicyLib(BaseLib): |
| def __init__(self, addr: str, honeydew_fd: HdFuchsiaDevice | None = None) -> None: |
| super().__init__(addr, "wlan_policy") |
| self.honeydew_fd = honeydew_fd |
| |
| def _check_response_error( |
| self, cmd: Command, response_json: dict[str, object] |
| ) -> object | None: |
| """Helper method to process errors from SL4F calls. |
| |
| Args: |
| cmd: SL4F command sent. |
| response_json: Response from SL4F server. |
| |
| Returns: |
| Response json or None if error. |
| |
| Raises: |
| WlanPolicyError if the response_json has something in the 'error' field. |
| """ |
| resp = MapValidator(response_json) |
| error = resp.get(str, "error", None) |
| if error: |
| # We sometimes expect to catch WlanPolicyError so we include a log here for |
| # when we do retries. |
| self.log.debug(f"SL4F call: {cmd} failed with Error: '{error}'.") |
| raise WlanPolicyError(f"SL4F call: {cmd} failed with Error: '{error}'.") |
| else: |
| return response_json.get("result") |
| |
| def connect( |
| self, target_ssid: str, security_type: SecurityType | FuchsiaSecurityType |
| ) -> RequestStatus: |
| """Triggers connection to a network. |
| |
| Args: |
| target_ssid: The network to connect to. Must have been previously |
| saved in order for a successful connection to happen. |
| security_type: The security protocol of the network. |
| |
| Returns: |
| A RequestStatus response to the connect request. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| TypeError: Return value not a string. |
| """ |
| # TODO(b/308807691): Change other uses of FuchsiaSecurityType to Honeydew's |
| # SecurityType |
| hd_security_type = SecurityType(security_type.value) |
| method_params = { |
| "target_ssid": target_ssid, |
| "security_type": str(hd_security_type), |
| } |
| if self.honeydew_fd: |
| try: |
| return self.honeydew_fd.wlan_policy.connect( |
| target_ssid, hd_security_type |
| ) |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.CONNECT, method_params) |
| result = self._check_response_error(Command.CONNECT, resp) |
| |
| if not isinstance(result, str): |
| raise TypeError(f'Expected "result" to be str, got {type(result)}') |
| |
| return RequestStatus(result) |
| |
| def create_client_controller(self) -> None: |
| """Initializes the client controller. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.create_client_controller() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.CREATE_CLIENT_CONTROLLER) |
| self._check_response_error(Command.CREATE_CLIENT_CONTROLLER, resp) |
| |
| def get_saved_networks(self) -> list[NetworkConfig]: |
| """Gets networks saved on device. |
| |
| Returns: |
| A list of NetworkConfigs. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| TypeError: Return values not correct types. |
| """ |
| if self.honeydew_fd: |
| try: |
| return self.honeydew_fd.wlan_policy.get_saved_networks() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.GET_SAVED_NETWORKS) |
| result = self._check_response_error(Command.GET_SAVED_NETWORKS, resp) |
| |
| if not isinstance(result, list): |
| raise TypeError(f'Expected "result" to be list, got {type(result)}') |
| |
| networks: list[NetworkConfig] = [] |
| for n in result: |
| if not isinstance(n, dict): |
| raise TypeError(f'Expected "network" to be dict, got {type(n)}') |
| |
| network = MapValidator(n) |
| security_type = network.get(str, "security_type", "None") |
| networks.append( |
| NetworkConfig( |
| ssid=network.get(str, "ssid"), |
| security_type=SecurityType(security_type.lower()), |
| credential_type=network.get(str, "credential_type"), |
| credential_value=network.get(str, "credential_value"), |
| ) |
| ) |
| return networks |
| |
| def get_update( |
| self, timeout: float = DEFAULT_UPDATE_TIMEOUT_SEC |
| ) -> ClientStateSummary: |
| """Gets one client listener update. |
| |
| This call will return with an update immediately the |
| first time the update listener is initialized by setting a new listener |
| or by creating a client controller before setting a new listener. |
| Subsequent calls will hang until there is a change since the last |
| update call. |
| |
| Args: |
| timeout: Timeout in seconds to wait for the get_update command to |
| return. |
| |
| Returns: |
| An update of connection status. If there is no error, the result is |
| a WlanPolicyUpdate with a structure that matches the FIDL |
| ClientStateSummary struct given for updates. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| TypeError: Return values not correct types. |
| """ |
| if self.honeydew_fd: |
| try: |
| return self.honeydew_fd.wlan_policy.get_update(response_timeout=timeout) |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.GET_UPDATE, response_timeout=timeout) |
| result_raw = self._check_response_error(Command.GET_UPDATE, resp) |
| |
| if not isinstance(result_raw, dict): |
| raise TypeError(f'Expected "result" to be dict, got {type(result_raw)}') |
| |
| result = MapValidator(result_raw) |
| networks = result.get(list, "networks", []) |
| |
| network_states: list[NetworkState] = [] |
| for n in networks: |
| network = MapValidator(n) |
| state = network.get(str, "state") |
| status = network.get(str, "status", None) |
| if status is None: |
| status = DisconnectStatus.CONNECTION_STOPPED |
| |
| id = MapValidator(network.get(dict, "id")) |
| ssid = id.get(str, "ssid") |
| security_type = id.get(str, "type_") |
| |
| network_states.append( |
| NetworkState( |
| network_identifier=NetworkIdentifier( |
| ssid=ssid, |
| security_type=SecurityType(security_type.lower()), |
| ), |
| connection_state=ConnectionState(state), |
| disconnect_status=DisconnectStatus(status), |
| ) |
| ) |
| client_state = result.get(str, "state") |
| return ClientStateSummary( |
| state=WlanClientState(client_state), networks=network_states |
| ) |
| |
| def remove_all_networks(self) -> None: |
| """Deletes all saved networks on the device. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.remove_all_networks() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.REMOVE_ALL_NETWORKS) |
| self._check_response_error(Command.REMOVE_ALL_NETWORKS, resp) |
| |
| def remove_network( |
| self, |
| target_ssid: str, |
| security_type: SecurityType, |
| target_pwd: str | None = None, |
| ) -> None: |
| """Removes or "forgets" a network from saved networks. |
| |
| Args: |
| target_ssid: The network to remove. |
| security_type: The security protocol of the network. |
| target_pwd: The credential being saved with the network. No password |
| is equivalent to an empty string. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| if not target_pwd: |
| target_pwd = "" |
| |
| method_params = { |
| "target_ssid": target_ssid, |
| "security_type": str(security_type), |
| "target_pwd": target_pwd, |
| } |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.remove_network( |
| target_ssid, security_type, target_pwd |
| ) |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.REMOVE_NETWORK, method_params) |
| self._check_response_error(Command.REMOVE_NETWORK, resp) |
| |
| def save_network( |
| self, |
| target_ssid: str, |
| security_type: SecurityType | FuchsiaSecurityType, |
| target_pwd: str | None = None, |
| ) -> None: |
| """Saves a network to the device. |
| |
| Args: |
| target_ssid: The network to save. |
| security_type: The security protocol of the network. |
| target_pwd: The credential being saved with the network. No password |
| is equivalent to an empty string. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| # TODO(b/308807691): Change other uses of FuchsiaSecurityType to Honeydew's |
| # SecurityType |
| hd_security_type = SecurityType(security_type.value) |
| if not target_pwd: |
| target_pwd = "" |
| |
| method_params = { |
| "target_ssid": target_ssid, |
| "security_type": str(hd_security_type.value), |
| "target_pwd": target_pwd, |
| } |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.save_network( |
| target_ssid, hd_security_type, target_pwd |
| ) |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.SAVE_NETWORK, method_params) |
| self._check_response_error(Command.SAVE_NETWORK, resp) |
| |
| def scan_for_networks(self) -> list[str]: |
| """Scans for networks. |
| |
| Returns: |
| A list of network SSIDs that can be connected to. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| TypeError: Return value not a list. |
| """ |
| if self.honeydew_fd: |
| try: |
| return self.honeydew_fd.wlan_policy.scan_for_network() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.SCAN_FOR_NETWORKS) |
| result = self._check_response_error(Command.SCAN_FOR_NETWORKS, resp) |
| |
| if not isinstance(result, list): |
| raise TypeError(f'Expected "result" to be list, got {type(result)}') |
| |
| return result |
| |
| def set_new_update_listener(self) -> None: |
| """Sets the update listener stream of the facade to a new stream. |
| This causes updates to be reset. Intended to be used between tests so |
| that the behaviour of updates in a test is independent from previous |
| tests. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.set_new_update_listener() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.SET_NEW_UPDATE_LISTENER) |
| self._check_response_error(Command.SET_NEW_UPDATE_LISTENER, resp) |
| |
| def start_client_connections(self) -> None: |
| """Enables device to initiate connections to networks. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.start_client_connections() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.START_CLIENT_CONNECTIONS) |
| self._check_response_error(Command.START_CLIENT_CONNECTIONS, resp) |
| |
| def stop_client_connections(self) -> None: |
| """Disables device for initiating connections to networks. |
| |
| Raises: |
| WlanPolicyError: Sl4f run command failed. |
| """ |
| if self.honeydew_fd: |
| try: |
| self.honeydew_fd.wlan_policy.stop_client_connections() |
| except errors.Sl4fError as e: |
| raise WlanPolicyError from e |
| else: |
| resp = self.send_command(Command.STOP_CLIENT_CONNECTIONS) |
| self._check_response_error(Command.STOP_CLIENT_CONNECTIONS, resp) |