| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| from dataclasses import dataclass |
| from enum import StrEnum |
| from typing import Any |
| |
| from mobly import signals |
| |
| from antlion.controllers.fuchsia_lib.base_lib import BaseLib |
| from antlion.validation import MapValidator |
| |
| COMMAND_SCAN_FOR_BSS_INFO = "wlan.scan_for_bss_info" |
| COMMAND_CONNECT = "wlan.connect" |
| COMMAND_DISCONNECT = "wlan.disconnect" |
| COMMAND_STATUS = "wlan.status" |
| COMMAND_GET_IFACE_ID_LIST = "wlan.get_iface_id_list" |
| COMMAND_GET_PHY_ID_LIST = "wlan.get_phy_id_list" |
| COMMAND_CREATE_IFACE = "wlan.create_iface" |
| COMMAND_DESTROY_IFACE = "wlan.destroy_iface" |
| COMMAND_GET_COUNTRY = "wlan_phy.get_country" |
| COMMAND_GET_DEV_PATH = "wlan_phy.get_dev_path" |
| COMMAND_QUERY_IFACE = "wlan.query_iface" |
| |
| |
| class WlanError(signals.ControllerError): |
| """SL4F server responded to wlan request with an error.""" |
| |
| |
| class WlanMacRole(StrEnum): |
| """Role of the WLAN MAC interface. |
| |
| Loosely matches the fuchsia.wlan.common.WlanMacRole FIDL enum. |
| See https://cs.opensource.google/fuchsia/fuchsia/+/main:src/testing/sl4f/src/wlan/types.rs |
| """ |
| |
| CLIENT = "Client" |
| AP = "Ap" |
| MESH = "Mesh" |
| UNKNOWN = "Unknown" |
| |
| |
| @dataclass(frozen=True) |
| class CreateIfaceResult: |
| interface_id: int |
| |
| |
| @dataclass(frozen=True) |
| class QueryIfaceResult: |
| """Result of the wlan.query_iface command. |
| |
| Matches fuchsia.wlan.device.service.QueryIfaceResponse. |
| See https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.wlan.device.service/service.fidl |
| """ |
| |
| role: WlanMacRole |
| """The role the iface is currently operating in, e.g., client role.""" |
| id: int |
| """The iface's global ID.""" |
| phy_id: int |
| """Iface's PHY ID.""" |
| phy_assigned_id: int |
| """Local ID assigned by this iface's PHY.""" |
| sta_addr: list[int] |
| """The iface's MAC.""" |
| |
| |
| class FuchsiaWlanLib(BaseLib): |
| def __init__(self, addr: str) -> None: |
| super().__init__(addr, "wlan") |
| |
| def wlanScanForBSSInfo(self): |
| """Scans and returns BSS info |
| |
| Returns: |
| A dict mapping each seen SSID to a list of BSS Description IE |
| blocks, one for each BSS observed in the network |
| """ |
| test_cmd = COMMAND_SCAN_FOR_BSS_INFO |
| |
| return self.send_command(test_cmd, {}) |
| |
| def wlanConnectToNetwork(self, target_ssid, target_bss_desc, target_pwd=None): |
| """Triggers a network connection |
| Args: |
| target_ssid: the network to attempt a connection to |
| target_pwd: (optional) password for the target network |
| |
| Returns: |
| boolean indicating if the connection was successful |
| """ |
| test_cmd = COMMAND_CONNECT |
| test_args = { |
| "target_ssid": target_ssid, |
| "target_pwd": target_pwd, |
| "target_bss_desc": target_bss_desc, |
| } |
| |
| return self.send_command(test_cmd, test_args) |
| |
| def wlanDisconnect(self): |
| """Disconnect any current wifi connections""" |
| test_cmd = COMMAND_DISCONNECT |
| |
| return self.send_command(test_cmd, {}) |
| |
| def wlanCreateIface( |
| self, phy_id: int, role: WlanMacRole, sta_addr: str | None = None |
| ) -> CreateIfaceResult: |
| """Create a new WLAN interface. |
| Args: |
| phy_id: the interface id. |
| role: the role of new interface. |
| sta_addr: MAC address for softAP interface only. |
| |
| Returns: |
| Dictionary, service id if success, error if error. |
| |
| Raises: |
| WlanError: Device responded with an error. |
| """ |
| test_cmd = COMMAND_CREATE_IFACE |
| test_args = { |
| "phy_id": phy_id, |
| "role": role, |
| "sta_addr": sta_addr, |
| } |
| |
| resp = MapValidator(self.send_command(test_cmd, test_args)) |
| err = resp.get(str, "error", None) |
| if err is not None: |
| raise WlanError(f'Failed to create wlan iface: "{err}"') |
| |
| iface_id = resp.get(int, "result") |
| return CreateIfaceResult( |
| interface_id=iface_id, |
| ) |
| |
| def wlanDestroyIface(self, iface_id): |
| """Destroy WLAN interface by ID. |
| Args: |
| iface_id: the interface id. |
| |
| Returns: |
| Dictionary, service id if success, error if error. |
| """ |
| test_cmd = COMMAND_DESTROY_IFACE |
| test_args = {"identifier": iface_id} |
| |
| return self.send_command(test_cmd, test_args) |
| |
| def wlanGetIfaceIdList(self) -> dict[int, Any]: |
| """Get a list if wlan interface IDs. |
| |
| Returns: |
| List of uint16 phy IDs. |
| """ |
| test_cmd = COMMAND_GET_IFACE_ID_LIST |
| |
| return self.send_command(test_cmd, {}) |
| |
| def wlanPhyIdList(self) -> dict[str, Any]: |
| """Get a list if wlan phy IDs. |
| |
| Returns: |
| List of IDs if success, error if error. |
| """ |
| test_cmd = COMMAND_GET_PHY_ID_LIST |
| |
| return self.send_command(test_cmd, {}) |
| |
| def wlanStatus(self, iface_id=None): |
| """Request connection status |
| |
| Args: |
| iface_id: unsigned 16-bit int, the wlan interface id |
| (defaults to None) |
| |
| Returns: |
| Client state summary containing WlanClientState and |
| status of various networks connections |
| """ |
| test_cmd = COMMAND_STATUS |
| test_args = {} |
| if iface_id: |
| test_args = {"iface_id": iface_id} |
| |
| return self.send_command(test_cmd, test_args) |
| |
| def wlanGetCountry(self, phy_id): |
| """Reads the currently configured country for `phy_id`. |
| |
| Args: |
| phy_id: unsigned 16-bit integer. |
| |
| Returns: |
| Dictionary, String if success, error if error. |
| """ |
| test_cmd = COMMAND_GET_COUNTRY |
| test_args = {"phy_id": phy_id} |
| |
| return self.send_command(test_cmd, test_args) |
| |
| def wlanGetDevPath(self, phy_id): |
| """Queries the device path for `phy_id`. |
| |
| Args: |
| phy_id: unsigned 16-bit integer. |
| |
| Returns: |
| Dictionary, String if success, error if error. |
| """ |
| test_cmd = COMMAND_GET_DEV_PATH |
| test_args = {"phy_id": phy_id} |
| |
| return self.send_command(test_cmd, test_args) |
| |
| def wlanQueryInterface(self, iface_id: int) -> QueryIfaceResult: |
| """Retrieves interface info for given wlan iface id. |
| |
| Args: |
| iface_id: unsigned 16-bit int, the wlan interface id. |
| |
| Returns: |
| QueryIfaceResults from the SL4F server |
| |
| Raises: |
| WlanError: Device responded with an error |
| """ |
| test_cmd = COMMAND_QUERY_IFACE |
| test_args = {"iface_id": iface_id} |
| |
| resp = MapValidator(self.send_command(test_cmd, test_args)) |
| err = resp.get(str, "error", None) |
| if err is not None: |
| raise WlanError(f'Failed to query wlan iface: "{err}"') |
| |
| result = MapValidator(resp.get(dict, "result")) |
| sta_addr = result.list("sta_addr", optional=True) |
| if sta_addr is None: |
| # Fallback to older field name to maintain backwards compatibility |
| # with older versions of SL4F's QueryIfaceResponse. |
| # See https://fxrev.dev/562146 |
| sta_addr = result.list("mac") |
| return QueryIfaceResult( |
| role=WlanMacRole(result.get(str, "role")), |
| id=result.get(int, "id"), |
| phy_id=result.get(int, "phy_id"), |
| phy_assigned_id=result.get(int, "phy_assigned_id"), |
| sta_addr=sta_addr.all(int), |
| ) |