blob: 6f4e56d37e9dadcdb9d8baf210c2c54aa7d8e2ea [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from typing import TYPE_CHECKING, Any, Dict
from antlion import logger, signals, utils
if TYPE_CHECKING:
from antlion.controllers.fuchsia_device import FuchsiaDevice
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10
WlanInterfaces = Dict[str, Dict[str, Any]]
class WlanControllerError(signals.ControllerError):
pass
class WlanController:
"""Contains methods related to wlan core, to be used in FuchsiaDevice object"""
def __init__(self, fuchsia_device: "FuchsiaDevice") -> None:
self.device = fuchsia_device
self.log = logger.create_tagged_trace_logger(
f"WlanController for FuchsiaDevice | {self.device.ip}"
)
# TODO(70501): Wrap wlan_lib functions and setup from FuchsiaDevice here
# (similar to how WlanPolicyController does it) to prevent FuchsiaDevice
# from growing too large.
def _configure_wlan(self) -> None:
pass
def _deconfigure_wlan(self) -> None:
pass
def update_wlan_interfaces(self) -> None:
"""Retrieves WLAN interfaces from device and sets the FuchsiaDevice
attributes.
"""
wlan_interfaces = self.get_interfaces_by_role()
self.device.wlan_client_interfaces = wlan_interfaces["client"]
self.device.wlan_ap_interfaces = wlan_interfaces["ap"]
# Set test interfaces to value from config, else the first found
# interface, else None
self.device.wlan_client_test_interface_name = self.device.conf_data.get(
"wlan_client_test_interface",
next(iter(self.device.wlan_client_interfaces), None),
)
self.device.wlan_ap_test_interface_name = self.device.conf_data.get(
"wlan_ap_test_interface", next(iter(self.device.wlan_ap_interfaces), None)
)
def get_interfaces_by_role(self) -> WlanInterfaces:
"""Retrieves WLAN interface information, supplemented by netstack info.
Returns:
Dict with keys 'client' and 'ap', each of which contain WLAN
interfaces.
"""
# Retrieve WLAN interface IDs
response = self.device.sl4f.wlan_lib.wlanGetIfaceIdList()
if response.get("error"):
raise WlanControllerError(
f"Failed to get WLAN iface ids: {response['error']}"
)
wlan_iface_ids = response.get("result", [])
if len(wlan_iface_ids) < 1:
return {"client": {}, "ap": {}}
# Use IDs to get WLAN interface info and mac addresses
wlan_ifaces_by_mac = {}
for id in wlan_iface_ids:
response = self.device.sl4f.wlan_lib.wlanQueryInterface(id)
if response.get("error"):
raise WlanControllerError(
f"Failed to query wlan iface id {id}: {response['error']}"
)
mac = response["result"].get("sta_addr", None)
if mac is None:
# Fallback to older field name to maintain backwards
# compatibility with older versions of SL4F's
# QueryIfaceResponse. See https://fxrev.dev/562146.
mac = response["result"].get("mac_addr")
wlan_ifaces_by_mac[utils.mac_address_list_to_str(mac)] = response["result"]
# Use mac addresses to query the interfaces from the netstack view,
# which allows us to supplement the interface information with the name,
# netstack_id, etc.
# TODO(fxb/75909): This tedium is necessary to get the interface name
# because only netstack has that information. The bug linked here is
# to reconcile some of the information between the two perspectives, at
# which point we can eliminate step.
net_ifaces_response = self.device.sl4f.netstack_lib.netstackListInterfaces()
if net_ifaces_response.get("error"):
raise WlanControllerError(
f"Failed to get network interfaces list: {net_ifaces_response['error']}"
)
net_ifaces = net_ifaces_response["result"]
wlan_ifaces_by_role: WlanInterfaces = {"client": {}, "ap": {}}
for iface in net_ifaces:
try:
# Some interfaces might not have a MAC
iface_mac = utils.mac_address_list_to_str(iface["mac"])
except Exception as e:
self.log.debug(f"Error {e} getting MAC for iface {iface}")
continue
if iface_mac in wlan_ifaces_by_mac:
wlan_ifaces_by_mac[iface_mac]["netstack_id"] = iface["id"]
# Add to return dict, mapped by role then name.
wlan_ifaces_by_role[wlan_ifaces_by_mac[iface_mac]["role"].lower()][
iface["name"]
] = wlan_ifaces_by_mac[iface_mac]
return wlan_ifaces_by_role
def set_country_code(self, country_code: str) -> None:
"""Sets country code through the regulatory region service and waits
for the code to be applied to WLAN PHY.
Args:
country_code: the 2 character country code to set
Raises:
EnvironmentError - failure to get/set regulatory region
ConnectionError - failure to query PHYs
"""
self.log.info(f"Setting DUT country code to {country_code}")
country_code_response = self.device.sl4f.regulatory_region_lib.setRegion(
country_code
)
if country_code_response.get("error"):
raise EnvironmentError(
"Failed to set country code (%s) on DUT. Error: %s"
% (country_code, country_code_response["error"])
)
self.log.info(
f"Verifying DUT country code was correctly set to {country_code}."
)
phy_ids_response = self.device.sl4f.wlan_lib.wlanPhyIdList()
if phy_ids_response.get("error"):
raise ConnectionError(
"Failed to get phy ids from DUT while setting country code to "
f'{country_code}. Error: {phy_ids_response["error"]}'
)
end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE
while time.time() < end_time:
for id in phy_ids_response["result"]:
get_country_response = self.device.sl4f.wlan_lib.wlanGetCountry(id)
if get_country_response.get("error"):
raise ConnectionError(
"Failed to query PHY ID (%s) for country. Error: %s"
% (id, get_country_response["error"])
)
set_code = "".join(
[chr(ascii_char) for ascii_char in get_country_response["result"]]
)
if set_code != country_code:
self.log.debug(
"PHY (id: %s) has incorrect country code set. "
"Expected: %s, Got: %s" % (id, country_code, set_code)
)
break
else:
self.log.info(f"All PHYs have expected country code ({country_code})")
break
time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
else:
raise EnvironmentError(f"Failed to set DUT country code to {country_code}.")