| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import time |
| from dataclasses import dataclass |
| |
| from honeydew.typing.wlan import CountryCode, QueryIfaceResponse, WlanMacRole |
| |
| from antlion import logger, signals, utils |
| from antlion.controllers.fuchsia_lib.sl4f import SL4F |
| from antlion.validation import MapValidator |
| |
| TIME_TO_SLEEP_BETWEEN_RETRIES = 1 |
| TIME_TO_WAIT_FOR_COUNTRY_CODE = 10 |
| |
| |
| @dataclass(frozen=True) |
| class WlanInterfaces: |
| client: dict[str, QueryIfaceResponse] |
| ap: dict[str, QueryIfaceResponse] |
| |
| |
| class WlanControllerError(signals.ControllerError): |
| pass |
| |
| |
| class WlanController: |
| """Contains methods related to wlan core, to be used in FuchsiaDevice object""" |
| |
| def __init__(self, sl4f: SL4F) -> None: |
| self.sl4f = sl4f |
| self.log = logger.create_tagged_trace_logger( |
| f"WlanController | {self.sl4f.address}" |
| ) |
| |
| def get_interfaces_by_role(self) -> WlanInterfaces: |
| """Retrieves WLAN interface information.""" |
| |
| # Retrieve WLAN interface IDs |
| wlan_iface_ids = self.sl4f.wlan_lib.get_iface_id_list() |
| if len(wlan_iface_ids) < 1: |
| return WlanInterfaces(client={}, ap={}) |
| |
| # Use IDs to get WLAN interface info and mac addresses |
| wlan_ifaces_by_mac: dict[str, QueryIfaceResponse] = {} |
| for id in wlan_iface_ids: |
| result = self.sl4f.wlan_lib.query_iface(id) |
| mac = utils.mac_address_list_to_str(bytes(result.sta_addr)) |
| wlan_ifaces_by_mac[mac] = result |
| |
| # Use mac addresses to query the interfaces from the netstack view, |
| # which allows us to supplement the interface information with the name, |
| # netstack_id, etc. |
| |
| # TODO(fxb/75909): This tedium is necessary to get the interface name |
| # because only netstack has that information. The bug linked here is |
| # to reconcile some of the information between the two perspectives, at |
| # which point we can eliminate step. |
| net_ifaces_response = self.sl4f.netstack_lib.netstackListInterfaces() |
| if net_ifaces_response.get("error"): |
| raise WlanControllerError( |
| f"Failed to get network interfaces list: {net_ifaces_response['error']}" |
| ) |
| net_ifaces = net_ifaces_response["result"] |
| |
| client: dict[str, QueryIfaceResponse] = {} |
| ap: dict[str, QueryIfaceResponse] = {} |
| for iface in net_ifaces: |
| iface = MapValidator(iface) |
| try: |
| # Some interfaces might not have a MAC |
| mac_raw = iface.list("mac").all(int) |
| iface_mac = utils.mac_address_list_to_str(bytes(mac_raw)) |
| except Exception as e: |
| self.log.debug(f"Error {e} getting MAC for iface {iface}") |
| continue |
| if iface_mac in wlan_ifaces_by_mac: |
| result = wlan_ifaces_by_mac[iface_mac] |
| name = iface.get(str, "name") |
| match result.role: |
| case WlanMacRole.CLIENT: |
| client[name] = result |
| case WlanMacRole.AP: |
| ap[name] = result |
| case _: |
| raise ValueError(f'Unexpected WlanMacRole "{result.role}"') |
| |
| return WlanInterfaces(client, ap) |
| |
| def set_country_code(self, country_code: CountryCode) -> None: |
| """Sets country code through the regulatory region service and waits |
| for the code to be applied to WLAN PHY. |
| |
| Args: |
| country_code: the 2 character country code to set |
| |
| Raises: |
| EnvironmentError - failure to get/set regulatory region |
| ConnectionError - failure to query PHYs |
| """ |
| self.log.info(f"Setting DUT country code to {country_code}") |
| self.sl4f.wlan_lib.set_region(country_code) |
| |
| self.log.info( |
| f"Verifying DUT country code was correctly set to {country_code}." |
| ) |
| phy_ids_response = self.sl4f.wlan_lib.get_phy_id_list() |
| |
| end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE |
| while time.time() < end_time: |
| for id in phy_ids_response: |
| resp = self.sl4f.wlan_lib.get_country(id) |
| if resp == country_code: |
| return |
| time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES) |
| else: |
| raise EnvironmentError(f"Failed to set DUT country code to {country_code}.") |