blob: 8791c9f98c75db5a213d45c4ccb2312184ce7bf4 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from dataclasses import dataclass
from honeydew.typing.wlan import CountryCode, QueryIfaceResponse, WlanMacRole
from antlion import logger, signals, utils
from antlion.controllers.fuchsia_lib.sl4f import SL4F
from antlion.validation import MapValidator
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10
@dataclass(frozen=True)
class WlanInterfaces:
client: dict[str, QueryIfaceResponse]
ap: dict[str, QueryIfaceResponse]
class WlanControllerError(signals.ControllerError):
pass
class WlanController:
"""Contains methods related to wlan core, to be used in FuchsiaDevice object"""
def __init__(self, sl4f: SL4F) -> None:
self.sl4f = sl4f
self.log = logger.create_tagged_trace_logger(
f"WlanController | {self.sl4f.address}"
)
def get_interfaces_by_role(self) -> WlanInterfaces:
"""Retrieves WLAN interface information."""
# Retrieve WLAN interface IDs
wlan_iface_ids = self.sl4f.wlan_lib.get_iface_id_list()
if len(wlan_iface_ids) < 1:
return WlanInterfaces(client={}, ap={})
# Use IDs to get WLAN interface info and mac addresses
wlan_ifaces_by_mac: dict[str, QueryIfaceResponse] = {}
for id in wlan_iface_ids:
result = self.sl4f.wlan_lib.query_iface(id)
mac = utils.mac_address_list_to_str(bytes(result.sta_addr))
wlan_ifaces_by_mac[mac] = result
# Use mac addresses to query the interfaces from the netstack view,
# which allows us to supplement the interface information with the name,
# netstack_id, etc.
# TODO(fxb/75909): This tedium is necessary to get the interface name
# because only netstack has that information. The bug linked here is
# to reconcile some of the information between the two perspectives, at
# which point we can eliminate step.
net_ifaces_response = self.sl4f.netstack_lib.netstackListInterfaces()
if net_ifaces_response.get("error"):
raise WlanControllerError(
f"Failed to get network interfaces list: {net_ifaces_response['error']}"
)
net_ifaces = net_ifaces_response["result"]
client: dict[str, QueryIfaceResponse] = {}
ap: dict[str, QueryIfaceResponse] = {}
for iface in net_ifaces:
iface = MapValidator(iface)
try:
# Some interfaces might not have a MAC
mac_raw = iface.list("mac").all(int)
iface_mac = utils.mac_address_list_to_str(bytes(mac_raw))
except Exception as e:
self.log.debug(f"Error {e} getting MAC for iface {iface}")
continue
if iface_mac in wlan_ifaces_by_mac:
result = wlan_ifaces_by_mac[iface_mac]
name = iface.get(str, "name")
match result.role:
case WlanMacRole.CLIENT:
client[name] = result
case WlanMacRole.AP:
ap[name] = result
case _:
raise ValueError(f'Unexpected WlanMacRole "{result.role}"')
return WlanInterfaces(client, ap)
def set_country_code(self, country_code: CountryCode) -> None:
"""Sets country code through the regulatory region service and waits
for the code to be applied to WLAN PHY.
Args:
country_code: the 2 character country code to set
Raises:
EnvironmentError - failure to get/set regulatory region
ConnectionError - failure to query PHYs
"""
self.log.info(f"Setting DUT country code to {country_code}")
self.sl4f.wlan_lib.set_region(country_code)
self.log.info(
f"Verifying DUT country code was correctly set to {country_code}."
)
phy_ids_response = self.sl4f.wlan_lib.get_phy_id_list()
end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE
while time.time() < end_time:
for id in phy_ids_response:
resp = self.sl4f.wlan_lib.get_country(id)
if resp == country_code:
return
time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
else:
raise EnvironmentError(f"Failed to set DUT country code to {country_code}.")