blob: 5714c43ed4b2d15f463c523046f67a34e4f893bc [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from enum import StrEnum
from typing import Any
from mobly import signals
from antlion.controllers.fuchsia_lib.base_lib import BaseLib
from antlion.validation import MapValidator
COMMAND_SCAN_FOR_BSS_INFO = "wlan.scan_for_bss_info"
COMMAND_CONNECT = "wlan.connect"
COMMAND_DISCONNECT = "wlan.disconnect"
COMMAND_STATUS = "wlan.status"
COMMAND_GET_IFACE_ID_LIST = "wlan.get_iface_id_list"
COMMAND_GET_PHY_ID_LIST = "wlan.get_phy_id_list"
COMMAND_CREATE_IFACE = "wlan.create_iface"
COMMAND_DESTROY_IFACE = "wlan.destroy_iface"
COMMAND_GET_COUNTRY = "wlan_phy.get_country"
COMMAND_GET_DEV_PATH = "wlan_phy.get_dev_path"
COMMAND_QUERY_IFACE = "wlan.query_iface"
class WlanError(signals.ControllerError):
"""SL4F server responded to wlan request with an error."""
class WlanMacRole(StrEnum):
"""Role of the WLAN MAC interface.
Loosely matches the fuchsia.wlan.common.WlanMacRole FIDL enum.
See https://cs.opensource.google/fuchsia/fuchsia/+/main:src/testing/sl4f/src/wlan/types.rs
"""
CLIENT = "Client"
AP = "Ap"
MESH = "Mesh"
UNKNOWN = "Unknown"
@dataclass(frozen=True)
class CreateIfaceResult:
interface_id: int
@dataclass(frozen=True)
class QueryIfaceResult:
"""Result of the wlan.query_iface command.
Matches fuchsia.wlan.device.service.QueryIfaceResponse.
See https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.wlan.device.service/service.fidl
"""
role: WlanMacRole
"""The role the iface is currently operating in, e.g., client role."""
id: int
"""The iface's global ID."""
phy_id: int
"""Iface's PHY ID."""
phy_assigned_id: int
"""Local ID assigned by this iface's PHY."""
sta_addr: list[int]
"""The iface's MAC."""
class FuchsiaWlanLib(BaseLib):
def __init__(self, addr: str) -> None:
super().__init__(addr, "wlan")
def wlanScanForBSSInfo(self):
"""Scans and returns BSS info
Returns:
A dict mapping each seen SSID to a list of BSS Description IE
blocks, one for each BSS observed in the network
"""
test_cmd = COMMAND_SCAN_FOR_BSS_INFO
return self.send_command(test_cmd, {})
def wlanConnectToNetwork(self, target_ssid, target_bss_desc, target_pwd=None):
"""Triggers a network connection
Args:
target_ssid: the network to attempt a connection to
target_pwd: (optional) password for the target network
Returns:
boolean indicating if the connection was successful
"""
test_cmd = COMMAND_CONNECT
test_args = {
"target_ssid": target_ssid,
"target_pwd": target_pwd,
"target_bss_desc": target_bss_desc,
}
return self.send_command(test_cmd, test_args)
def wlanDisconnect(self):
"""Disconnect any current wifi connections"""
test_cmd = COMMAND_DISCONNECT
return self.send_command(test_cmd, {})
def wlanCreateIface(
self, phy_id: int, role: WlanMacRole, sta_addr: str | None = None
) -> CreateIfaceResult:
"""Create a new WLAN interface.
Args:
phy_id: the interface id.
role: the role of new interface.
sta_addr: MAC address for softAP interface only.
Returns:
Dictionary, service id if success, error if error.
Raises:
WlanError: Device responded with an error.
"""
test_cmd = COMMAND_CREATE_IFACE
test_args = {
"phy_id": phy_id,
"role": role,
"sta_addr": sta_addr,
}
resp = MapValidator(self.send_command(test_cmd, test_args))
err = resp.get(str, "error", None)
if err is not None:
raise WlanError(f'Failed to create wlan iface: "{err}"')
iface_id = resp.get(int, "result")
return CreateIfaceResult(
interface_id=iface_id,
)
def wlanDestroyIface(self, iface_id):
"""Destroy WLAN interface by ID.
Args:
iface_id: the interface id.
Returns:
Dictionary, service id if success, error if error.
"""
test_cmd = COMMAND_DESTROY_IFACE
test_args = {"identifier": iface_id}
return self.send_command(test_cmd, test_args)
def wlanGetIfaceIdList(self) -> dict[int, Any]:
"""Get a list if wlan interface IDs.
Returns:
List of uint16 phy IDs.
"""
test_cmd = COMMAND_GET_IFACE_ID_LIST
return self.send_command(test_cmd, {})
def wlanPhyIdList(self) -> dict[str, Any]:
"""Get a list if wlan phy IDs.
Returns:
List of IDs if success, error if error.
"""
test_cmd = COMMAND_GET_PHY_ID_LIST
return self.send_command(test_cmd, {})
def wlanStatus(self, iface_id=None):
"""Request connection status
Args:
iface_id: unsigned 16-bit int, the wlan interface id
(defaults to None)
Returns:
Client state summary containing WlanClientState and
status of various networks connections
"""
test_cmd = COMMAND_STATUS
test_args = {}
if iface_id:
test_args = {"iface_id": iface_id}
return self.send_command(test_cmd, test_args)
def wlanGetCountry(self, phy_id):
"""Reads the currently configured country for `phy_id`.
Args:
phy_id: unsigned 16-bit integer.
Returns:
Dictionary, String if success, error if error.
"""
test_cmd = COMMAND_GET_COUNTRY
test_args = {"phy_id": phy_id}
return self.send_command(test_cmd, test_args)
def wlanGetDevPath(self, phy_id):
"""Queries the device path for `phy_id`.
Args:
phy_id: unsigned 16-bit integer.
Returns:
Dictionary, String if success, error if error.
"""
test_cmd = COMMAND_GET_DEV_PATH
test_args = {"phy_id": phy_id}
return self.send_command(test_cmd, test_args)
def wlanQueryInterface(self, iface_id: int) -> QueryIfaceResult:
"""Retrieves interface info for given wlan iface id.
Args:
iface_id: unsigned 16-bit int, the wlan interface id.
Returns:
QueryIfaceResults from the SL4F server
Raises:
WlanError: Device responded with an error
"""
test_cmd = COMMAND_QUERY_IFACE
test_args = {"iface_id": iface_id}
resp = MapValidator(self.send_command(test_cmd, test_args))
err = resp.get(str, "error", None)
if err is not None:
raise WlanError(f'Failed to query wlan iface: "{err}"')
result = MapValidator(resp.get(dict, "result"))
sta_addr = result.list("sta_addr", optional=True)
if sta_addr is None:
# Fallback to older field name to maintain backwards compatibility
# with older versions of SL4F's QueryIfaceResponse.
# See https://fxrev.dev/562146
sta_addr = result.list("mac")
return QueryIfaceResult(
role=WlanMacRole(result.get(str, "role")),
id=result.get(int, "id"),
phy_id=result.get(int, "phy_id"),
phy_assigned_id=result.get(int, "phy_assigned_id"),
sta_addr=sta_addr.all(int),
)