| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import ipaddress |
| import time |
| from dataclasses import dataclass |
| from typing import Any, FrozenSet, Mapping |
| |
| from antlion import logger, utils |
| from antlion.capabilities.ssh import SSHConfig, SSHProvider |
| from antlion.controllers.ap_lib import hostapd_constants |
| from antlion.controllers.ap_lib.ap_get_interface import ApInterfaces |
| from antlion.controllers.ap_lib.ap_iwconfig import ApIwconfig |
| from antlion.controllers.ap_lib.bridge_interface import BridgeInterface |
| from antlion.controllers.ap_lib.dhcp_config import DhcpConfig, Subnet |
| from antlion.controllers.ap_lib.dhcp_server import DhcpServer, NoInterfaceError |
| from antlion.controllers.ap_lib.extended_capabilities import ExtendedCapabilities |
| from antlion.controllers.ap_lib.hostapd import Hostapd |
| from antlion.controllers.ap_lib.hostapd_ap_preset import create_ap_preset |
| from antlion.controllers.ap_lib.hostapd_config import HostapdConfig |
| from antlion.controllers.ap_lib.hostapd_security import Security |
| from antlion.controllers.ap_lib.radvd import Radvd |
| from antlion.controllers.ap_lib.radvd_config import RadvdConfig |
| from antlion.controllers.ap_lib.wireless_network_management import ( |
| BssTransitionManagementRequest, |
| ) |
| from antlion.controllers.pdu import PduDevice, get_pdu_port_for_device |
| from antlion.controllers.utils_lib.commands import ip, route |
| from antlion.controllers.utils_lib.ssh import connection, settings |
| from antlion.libs.proc import job |
| |
| MOBLY_CONTROLLER_CONFIG_NAME = "AccessPoint" |
| ACTS_CONTROLLER_REFERENCE_NAME = "access_points" |
| |
| |
| class Error(Exception): |
| """Error raised when there is a problem with the access point.""" |
| |
| |
| @dataclass |
| class _ApInstance: |
| hostapd: Hostapd |
| subnet: Subnet |
| |
| |
| # These ranges were split this way since each physical radio can have up |
| # to 8 SSIDs so for the 2GHz radio the DHCP range will be |
| # 192.168.1 - 8 and the 5Ghz radio will be 192.168.9 - 16 |
| _AP_2GHZ_SUBNET_STR_DEFAULT = "192.168.1.0/24" |
| _AP_5GHZ_SUBNET_STR_DEFAULT = "192.168.9.0/24" |
| |
| # The last digit of the ip for the bridge interface |
| BRIDGE_IP_LAST = "100" |
| |
| |
| class AccessPoint(object): |
| """An access point controller. |
| |
| Attributes: |
| ssh: The ssh connection to this ap. |
| ssh_settings: The ssh settings being used by the ssh connection. |
| dhcp_settings: The dhcp server settings being used. |
| """ |
| |
| def __init__(self, configs: Mapping[str, Any]) -> None: |
| """ |
| Args: |
| configs: configs for the access point from config file. |
| """ |
| self.ssh_settings = settings.from_config(configs["ssh_config"]) |
| self.log = logger.create_logger( |
| lambda msg: f"[Access Point|{self.ssh_settings.hostname}] {msg}" |
| ) |
| self.device_pdu_config = configs.get("PduDevice", None) |
| self.identifier = self.ssh_settings.hostname |
| |
| if "ap_subnet" in configs: |
| self._AP_2G_SUBNET_STR: str = configs["ap_subnet"]["2g"] |
| self._AP_5G_SUBNET_STR: str = configs["ap_subnet"]["5g"] |
| else: |
| self._AP_2G_SUBNET_STR = _AP_2GHZ_SUBNET_STR_DEFAULT |
| self._AP_5G_SUBNET_STR = _AP_5GHZ_SUBNET_STR_DEFAULT |
| |
| self._AP_2G_SUBNET = Subnet(ipaddress.ip_network(self._AP_2G_SUBNET_STR)) |
| self._AP_5G_SUBNET = Subnet(ipaddress.ip_network(self._AP_5G_SUBNET_STR)) |
| |
| self.ssh = connection.SshConnection(self.ssh_settings) |
| |
| # TODO(http://b/278758876): Replace self.ssh with self.ssh_provider |
| self.ssh_provider = SSHProvider( |
| SSHConfig( |
| self.ssh_settings.username, |
| self.ssh_settings.hostname, |
| self.ssh_settings.identity_file, |
| port=self.ssh_settings.port, |
| ssh_binary=self.ssh_settings.executable, |
| connect_timeout=90, |
| ) |
| ) |
| |
| # Singleton utilities for running various commands. |
| self._ip_cmd = ip.LinuxIpCommand(self.ssh) |
| self._route_cmd = route.LinuxRouteCommand(self.ssh) |
| |
| # A map from network interface name to _ApInstance objects representing |
| # the hostapd instance running against the interface. |
| self._aps: dict[str, _ApInstance] = dict() |
| self._dhcp: DhcpServer | None = None |
| self._dhcp_bss: dict[Any, Subnet] = dict() |
| self._radvd: Radvd | None = None |
| self.bridge = BridgeInterface(self) |
| self.iwconfig = ApIwconfig(self) |
| |
| # Check to see if wan_interface is specified in acts_config for tests |
| # isolated from the internet and set this override. |
| self.interfaces = ApInterfaces(self, configs.get("wan_interface")) |
| |
| # Get needed interface names and initialize the unnecessary ones. |
| self.wan = self.interfaces.get_wan_interface() |
| self.wlan = self.interfaces.get_wlan_interface() |
| self.wlan_2g = self.wlan[0] |
| self.wlan_5g = self.wlan[1] |
| self.lan = self.interfaces.get_lan_interface() |
| self._initial_ap() |
| self.setup_bridge = False |
| |
| def _initial_ap(self) -> None: |
| """Initial AP interfaces. |
| |
| Bring down hostapd if instance is running, bring down all bridge |
| interfaces. |
| """ |
| # This is necessary for Gale/Whirlwind flashed with dev channel image |
| # Unused interfaces such as existing hostapd daemon, guest, mesh |
| # interfaces need to be brought down as part of the AP initialization |
| # process, otherwise test would fail. |
| try: |
| self.ssh.run("stop wpasupplicant") |
| except job.Error: |
| self.log.info("No wpasupplicant running") |
| try: |
| self.ssh.run("stop hostapd") |
| except job.Error: |
| self.log.info("No hostapd running") |
| # Bring down all wireless interfaces |
| for iface in self.wlan: |
| WLAN_DOWN = f"ip link set {iface} down" |
| self.ssh.run(WLAN_DOWN) |
| # Bring down all bridge interfaces |
| bridge_interfaces = self.interfaces.get_bridge_interface() |
| for iface in bridge_interfaces: |
| BRIDGE_DOWN = f"ip link set {iface} down" |
| BRIDGE_DEL = f"brctl delbr {iface}" |
| self.ssh.run(BRIDGE_DOWN) |
| self.ssh.run(BRIDGE_DEL) |
| |
| def start_ap( |
| self, |
| hostapd_config: HostapdConfig, |
| radvd_config: RadvdConfig | None = None, |
| setup_bridge: bool = False, |
| is_nat_enabled: bool = True, |
| additional_parameters: dict[str, Any] | None = None, |
| ) -> list[Any]: |
| """Starts as an ap using a set of configurations. |
| |
| This will start an ap on this host. To start an ap the controller |
| selects a network interface to use based on the configs given. It then |
| will start up hostapd on that interface. Next a subnet is created for |
| the network interface and dhcp server is refreshed to give out ips |
| for that subnet for any device that connects through that interface. |
| |
| Args: |
| hostapd_config: The configurations to use when starting up the ap. |
| radvd_config: The IPv6 configuration to use when starting up the ap. |
| setup_bridge: Whether to bridge the LAN interface WLAN interface. |
| Only one WLAN interface can be bridged with the LAN interface |
| and none of the guest networks can be bridged. |
| is_nat_enabled: If True, start NAT on the AP to allow the DUT to be |
| able to access the internet if the WAN port is connected to the |
| internet. |
| additional_parameters: Parameters that can sent directly into the |
| hostapd config file. This can be used for debugging and or |
| adding one off parameters into the config. |
| |
| Returns: |
| An identifier for each ssid being started. These identifiers can be |
| used later by this controller to control the ap. |
| |
| Raises: |
| Error: When the ap can't be brought up. |
| """ |
| if additional_parameters is None: |
| additional_parameters = {} |
| |
| if hostapd_config.frequency < 5000: |
| interface = self.wlan_2g |
| subnet = self._AP_2G_SUBNET |
| else: |
| interface = self.wlan_5g |
| subnet = self._AP_5G_SUBNET |
| |
| # radvd requires the interface to have a IPv6 link-local address. |
| if radvd_config: |
| self.ssh.run(f"sysctl -w net.ipv6.conf.{interface}.disable_ipv6=0") |
| self.ssh.run(f"sysctl -w net.ipv6.conf.{interface}.forwarding=1") |
| |
| # In order to handle dhcp servers on any interface, the initiation of |
| # the dhcp server must be done after the wlan interfaces are figured |
| # out as opposed to being in __init__ |
| self._dhcp = DhcpServer(self.ssh, interface=interface) |
| |
| # For multi bssid configurations the mac address |
| # of the wireless interface needs to have enough space to mask out |
| # up to 8 different mac addresses. So in for one interface the range is |
| # hex 0-7 and for the other the range is hex 8-f. |
| interface_mac_orig = None |
| cmd = f"ip link show {interface}|grep ether|awk -F' ' '{{print $2}}'" |
| interface_mac_orig = self.ssh.run(cmd) |
| if interface == self.wlan_5g: |
| hostapd_config.bssid = f"{interface_mac_orig.stdout[:-1]}0" |
| last_octet = 1 |
| if interface == self.wlan_2g: |
| hostapd_config.bssid = f"{interface_mac_orig.stdout[:-1]}8" |
| last_octet = 9 |
| if interface in self._aps: |
| raise ValueError( |
| "No WiFi interface available for AP on " |
| f"channel {hostapd_config.channel}" |
| ) |
| |
| apd = Hostapd(self.ssh, interface) |
| new_instance = _ApInstance(hostapd=apd, subnet=subnet) |
| self._aps[interface] = new_instance |
| |
| # Turn off the DHCP server, we're going to change its settings. |
| self.stop_dhcp() |
| # Clear all routes to prevent old routes from interfering. |
| self._route_cmd.clear_routes(net_interface=interface) |
| # Add IPv6 link-local route so packets destined to the AP will be |
| # processed by the AP. This is necessary if an iperf server is running |
| # on the AP, but not for traffic handled by the Linux networking stack |
| # such as ping. |
| if radvd_config: |
| self._route_cmd.add_route(interface, "fe80::/64") |
| |
| self._dhcp_bss = dict() |
| if hostapd_config.bss_lookup: |
| # The self._dhcp_bss dictionary is created to hold the key/value |
| # pair of the interface name and the ip scope that will be |
| # used for the particular interface. The a, b, c, d |
| # variables below are the octets for the ip address. The |
| # third octet is then incremented for each interface that |
| # is requested. This part is designed to bring up the |
| # hostapd interfaces and not the DHCP servers for each |
| # interface. |
| counter = 1 |
| for bss in hostapd_config.bss_lookup: |
| if interface_mac_orig: |
| hostapd_config.bss_lookup[bss].bssid = ( |
| interface_mac_orig.stdout[:-1] + hex(last_octet)[-1:] |
| ) |
| self._route_cmd.clear_routes(net_interface=str(bss)) |
| if interface is self.wlan_2g: |
| starting_ip_range = self._AP_2G_SUBNET_STR |
| else: |
| starting_ip_range = self._AP_5G_SUBNET_STR |
| a, b, c, d = starting_ip_range.split(".") |
| self._dhcp_bss[bss] = Subnet( |
| ipaddress.ip_network(f"{a}.{b}.{int(c) + counter}.{d}") |
| ) |
| counter = counter + 1 |
| last_octet = last_octet + 1 |
| |
| apd.start(hostapd_config, additional_parameters=additional_parameters) |
| |
| # The DHCP serer requires interfaces to have ips and routes before |
| # the server will come up. |
| interface_ip = ipaddress.ip_interface( |
| f"{subnet.router}/{subnet.network.netmask}" |
| ) |
| if setup_bridge is True: |
| bridge_interface_name = "eth_test" |
| interfaces = [interface] |
| if self.lan: |
| interfaces.append(self.lan) |
| self.create_bridge(bridge_interface_name, interfaces) |
| self._ip_cmd.set_ipv4_address(bridge_interface_name, interface_ip) |
| else: |
| self._ip_cmd.set_ipv4_address(interface, interface_ip) |
| if hostapd_config.bss_lookup: |
| # This loop goes through each interface that was setup for |
| # hostapd and assigns the DHCP scopes that were defined but |
| # not used during the hostapd loop above. The k and v |
| # variables represent the interface name, k, and dhcp info, v. |
| for k, v in self._dhcp_bss.items(): |
| bss_interface_ip = ipaddress.ip_interface( |
| f"{self._dhcp_bss[k].router}/{self._dhcp_bss[k].network.netmask}" |
| ) |
| self._ip_cmd.set_ipv4_address(str(k), bss_interface_ip) |
| |
| # Restart the DHCP server with our updated list of subnets. |
| configured_subnets = self.get_configured_subnets() |
| dhcp_conf = DhcpConfig(subnets=configured_subnets) |
| self.start_dhcp(dhcp_conf=dhcp_conf) |
| if is_nat_enabled: |
| self.start_nat() |
| self.enable_forwarding() |
| else: |
| self.stop_nat() |
| self.enable_forwarding() |
| if radvd_config: |
| radvd_interface = bridge_interface_name if setup_bridge else interface |
| self._radvd = Radvd(self.ssh, radvd_interface) |
| self._radvd.start(radvd_config) |
| else: |
| self._radvd = None |
| |
| bss_interfaces = [bss for bss in hostapd_config.bss_lookup] |
| bss_interfaces.append(interface) |
| |
| return bss_interfaces |
| |
| def get_configured_subnets(self) -> list[Subnet]: |
| """Get the list of configured subnets on the access point. |
| |
| This allows consumers of the access point objects create custom DHCP |
| configs with the correct subnets. |
| |
| Returns: a list of Subnet objects |
| """ |
| configured_subnets = [x.subnet for x in self._aps.values()] |
| for k, v in self._dhcp_bss.items(): |
| configured_subnets.append(v) |
| return configured_subnets |
| |
| def start_dhcp(self, dhcp_conf: DhcpConfig) -> None: |
| """Start a DHCP server for the specified subnets. |
| |
| This allows consumers of the access point objects to control DHCP. |
| |
| Args: |
| dhcp_conf: A DhcpConfig object. |
| |
| Raises: |
| Error: Raised when a dhcp server error is found. |
| """ |
| if self._dhcp is not None: |
| self._dhcp.start(config=dhcp_conf) |
| |
| def stop_dhcp(self) -> None: |
| """Stop DHCP for this AP object. |
| |
| This allows consumers of the access point objects to control DHCP. |
| """ |
| if self._dhcp is not None: |
| self._dhcp.stop() |
| |
| def get_dhcp_logs(self) -> str | None: |
| """Get DHCP logs for this AP object. |
| |
| This allows consumers of the access point objects to validate DHCP |
| behavior. |
| |
| Returns: |
| A string of the dhcp server logs, or None is a DHCP server has not |
| been started. |
| """ |
| if self._dhcp is not None: |
| return self._dhcp.get_logs() |
| return None |
| |
| def get_hostapd_logs(self) -> dict[str, str]: |
| """Get hostapd logs for all interfaces on AP object. |
| |
| This allows consumers of the access point objects to validate hostapd |
| behavior. |
| |
| Returns: A dict with {interface: log} from hostapd instances. |
| """ |
| hostapd_logs: dict[str, str] = dict() |
| for identifier in self._aps: |
| hostapd_logs[identifier] = self._aps[identifier].hostapd.pull_logs() |
| return hostapd_logs |
| |
| def get_radvd_logs(self) -> str | None: |
| """Get radvd logs for this AP object. |
| |
| This allows consumers of the access point objects to validate radvd |
| behavior. |
| |
| Returns: |
| A string of the radvd logs, or None is a radvd server has not been |
| started. |
| """ |
| if self._radvd: |
| return self._radvd.pull_logs() |
| return None |
| |
| def enable_forwarding(self) -> None: |
| """Enable IPv4 and IPv6 forwarding on the AP. |
| |
| When forwarding is enabled, the access point is able to route IP packets |
| between devices in the same subnet. |
| """ |
| self.ssh.run("echo 1 > /proc/sys/net/ipv4/ip_forward") |
| self.ssh.run("echo 1 > /proc/sys/net/ipv6/conf/all/forwarding") |
| |
| def start_nat(self) -> None: |
| """Start NAT on the AP. |
| |
| This allows consumers of the access point objects to enable NAT |
| on the AP. |
| |
| Note that this is currently a global setting, since we don't |
| have per-interface masquerade rules. |
| """ |
| # The following three commands are needed to enable NAT between |
| # the WAN and LAN/WLAN ports. This means anyone connecting to the |
| # WLAN/LAN ports will be able to access the internet if the WAN port |
| # is connected to the internet. |
| self.ssh.run("iptables -t nat -F") |
| self.ssh.run(f"iptables -t nat -A POSTROUTING -o {self.wan} -j MASQUERADE") |
| |
| def stop_nat(self) -> None: |
| """Stop NAT on the AP. |
| |
| This allows consumers of the access point objects to disable NAT on the |
| AP. |
| |
| Note that this is currently a global setting, since we don't have |
| per-interface masquerade rules. |
| """ |
| self.ssh.run("iptables -t nat -F") |
| |
| def create_bridge(self, bridge_name: str, interfaces: list[str]) -> None: |
| """Create the specified bridge and bridge the specified interfaces. |
| |
| Args: |
| bridge_name: The name of the bridge to create. |
| interfaces: A list of interfaces to add to the bridge. |
| """ |
| |
| # Create the bridge interface |
| self.ssh.run(f"brctl addbr {bridge_name}") |
| |
| for interface in interfaces: |
| self.ssh.run(f"brctl addif {bridge_name} {interface}") |
| |
| self.ssh.run(f"ip link set {bridge_name} up") |
| |
| def remove_bridge(self, bridge_name: str) -> None: |
| """Removes the specified bridge |
| |
| Args: |
| bridge_name: The name of the bridge to remove. |
| """ |
| # Check if the bridge exists. |
| # |
| # Cases where it may not are if we failed to initialize properly |
| # |
| # Or if we're doing 2.4Ghz and 5Ghz SSIDs and we've already torn |
| # down the bridge once, but we got called for each band. |
| result = self.ssh.run(f"brctl show {bridge_name}", ignore_status=True) |
| |
| # If the bridge exists, we'll get an exit_status of 0, indicating |
| # success, so we can continue and remove the bridge. |
| if result.exit_status == 0: |
| self.ssh.run(f"ip link set {bridge_name} down") |
| self.ssh.run(f"brctl delbr {bridge_name}") |
| |
| def get_bssid_from_ssid(self, ssid: str, band: str) -> str | None: |
| """Gets the BSSID from a provided SSID |
| |
| Args: |
| ssid: An SSID string. |
| band: 2G or 5G Wifi band. |
| Returns: The BSSID if on the AP or None if SSID could not be found. |
| """ |
| if band == hostapd_constants.BAND_2G: |
| interfaces = [self.wlan_2g, ssid] |
| else: |
| interfaces = [self.wlan_5g, ssid] |
| |
| # Get the interface name associated with the given ssid. |
| for interface in interfaces: |
| iw_output = self.ssh.run( |
| f"iw dev {interface} info|grep ssid|awk -F' ' '{{print $2}}'" |
| ) |
| if "command failed: No such device" in iw_output.stderr: |
| continue |
| else: |
| # If the configured ssid is equal to the given ssid, we found |
| # the right interface. |
| if iw_output.stdout == ssid: |
| iw_output = self.ssh.run( |
| f"iw dev {interface} info|grep addr|awk -F' ' '{{print $2}}'" |
| ) |
| return iw_output.stdout |
| return None |
| |
| def stop_ap(self, identifier: str) -> None: |
| """Stops a running ap on this controller. |
| |
| Args: |
| identifier: The identify of the ap that should be taken down. |
| """ |
| |
| instance = self._aps.get(identifier) |
| if instance is None: |
| raise ValueError(f"Invalid identifier {identifier} given") |
| |
| if self._radvd: |
| self._radvd.stop() |
| try: |
| self.stop_dhcp() |
| except NoInterfaceError: |
| pass |
| self.stop_nat() |
| instance.hostapd.stop() |
| self._ip_cmd.clear_ipv4_addresses(identifier) |
| |
| del self._aps[identifier] |
| bridge_interfaces = self.interfaces.get_bridge_interface() |
| for iface in bridge_interfaces: |
| BRIDGE_DOWN = f"ip link set {iface} down" |
| BRIDGE_DEL = f"brctl delbr {iface}" |
| self.ssh.run(BRIDGE_DOWN) |
| self.ssh.run(BRIDGE_DEL) |
| |
| def stop_all_aps(self) -> None: |
| """Stops all running aps on this device.""" |
| |
| for ap in list(self._aps.keys()): |
| self.stop_ap(ap) |
| |
| def close(self) -> None: |
| """Called to take down the entire access point. |
| |
| When called will stop all aps running on this host, shutdown the dhcp |
| server, and stop the ssh connection. |
| """ |
| |
| if self._aps: |
| self.stop_all_aps() |
| self.ssh.close() |
| |
| def generate_bridge_configs(self, channel: int) -> tuple[str, str | None, str]: |
| """Generate a list of configs for a bridge between LAN and WLAN. |
| |
| Args: |
| channel: the channel WLAN interface is brought up on |
| iface_lan: the LAN interface to bridge |
| Returns: |
| configs: tuple containing iface_wlan, iface_lan and bridge_ip |
| """ |
| |
| if channel < 15: |
| iface_wlan = self.wlan_2g |
| subnet_str = self._AP_2G_SUBNET_STR |
| else: |
| iface_wlan = self.wlan_5g |
| subnet_str = self._AP_5G_SUBNET_STR |
| |
| iface_lan = self.lan |
| |
| a, b, c, _ = subnet_str.strip("/24").split(".") |
| bridge_ip = f"{a}.{b}.{c}.{BRIDGE_IP_LAST}" |
| |
| return (iface_wlan, iface_lan, bridge_ip) |
| |
| def ping( |
| self, |
| dest_ip: str, |
| count: int = 3, |
| interval: int = 1000, |
| timeout: int = 1000, |
| size: int = 56, |
| additional_ping_params: Any | None = None, |
| ) -> dict[str, Any]: |
| """Pings from AP to dest_ip, returns dict of ping stats (see utils.ping)""" |
| return utils.ping( |
| self.ssh, |
| dest_ip, |
| count=count, |
| interval=interval, |
| timeout=timeout, |
| size=size, |
| additional_ping_params=additional_ping_params, |
| ) |
| |
| def can_ping( |
| self, |
| dest_ip: str, |
| count: int = 1, |
| interval: int = 1000, |
| timeout: int = 1000, |
| size: int = 56, |
| additional_ping_params: Any | None = None, |
| ) -> bool: |
| """Returns whether ap can ping dest_ip (see utils.can_ping)""" |
| return utils.can_ping( |
| self.ssh, |
| dest_ip, |
| count=count, |
| interval=interval, |
| timeout=timeout, |
| size=size, |
| additional_ping_params=additional_ping_params, |
| ) |
| |
| def hard_power_cycle( |
| self, |
| pdus: list[PduDevice], |
| hostapd_configs: list[HostapdConfig] | None = None, |
| ) -> None: |
| """Kills, then restores power to AccessPoint, verifying it goes down and |
| comes back online cleanly. |
| |
| Args: |
| pdus: PDUs in the testbed |
| hostapd_configs: Hostapd settings. If present, these networks will |
| be spun up after the AP has rebooted. This list can either |
| contain HostapdConfig objects, or dictionaries with the start_ap |
| params |
| (i.e { 'hostapd_config': <HostapdConfig>, |
| 'setup_bridge': <bool>, |
| 'additional_parameters': <dict> } ). |
| Raise: |
| Error, if no PduDevice is provided in AccessPoint config. |
| ConnectionError, if AccessPoint fails to go offline or come back. |
| """ |
| if not self.device_pdu_config: |
| raise Error("No PduDevice provided in AccessPoint config.") |
| |
| if hostapd_configs is None: |
| hostapd_configs = [] |
| |
| self.log.info(f"Power cycling") |
| ap_pdu, ap_pdu_port = get_pdu_port_for_device(self.device_pdu_config, pdus) |
| |
| self.log.info(f"Killing power") |
| ap_pdu.off(str(ap_pdu_port)) |
| |
| self.log.info("Verifying AccessPoint is unreachable.") |
| self.ssh_provider.wait_until_unreachable() |
| self.log.info("AccessPoint is unreachable as expected.") |
| |
| self._aps.clear() |
| |
| self.log.info(f"Restoring power") |
| ap_pdu.on(str(ap_pdu_port)) |
| |
| self.log.info("Waiting for AccessPoint to become available via SSH.") |
| self.ssh_provider.wait_until_reachable() |
| self.log.info("AccessPoint responded to SSH.") |
| |
| # Allow 5 seconds for OS to finish getting set up |
| time.sleep(5) |
| self._initial_ap() |
| self.log.info("Power cycled successfully") |
| |
| for settings in hostapd_configs: |
| if isinstance(settings, HostapdConfig): |
| config = settings |
| setup_bridge = False |
| additional_parameters: dict[str, Any] = {} |
| |
| elif isinstance(settings, dict): |
| config = settings["hostapd_config"] |
| setup_bridge = settings.get("setup_bridge", False) |
| additional_parameters = settings.get("additional_parameters", {}) |
| else: |
| raise TypeError( |
| "Items in hostapd_configs list must either be " |
| "HostapdConfig objects or dictionaries." |
| ) |
| |
| self.log.info(f"Restarting network {config.ssid}") |
| self.start_ap( |
| config, |
| setup_bridge=setup_bridge, |
| additional_parameters=additional_parameters, |
| ) |
| |
| def channel_switch(self, identifier: str, channel_num: int) -> None: |
| """Switch to a different channel on the given AP.""" |
| instance = self._aps.get(identifier) |
| if instance is None: |
| raise ValueError(f"Invalid identifier {identifier} given") |
| self.log.info(f"channel switch to channel {channel_num}") |
| instance.hostapd.channel_switch(channel_num) |
| |
| def get_current_channel(self, identifier: str) -> int: |
| """Find the current channel on the given AP.""" |
| instance = self._aps.get(identifier) |
| if instance is None: |
| raise ValueError(f"Invalid identifier {identifier} given") |
| return instance.hostapd.get_current_channel() |
| |
| def get_stas(self, identifier: str) -> set[str]: |
| """Return MAC addresses of all associated STAs on the given AP.""" |
| instance = self._aps.get(identifier) |
| if instance is None: |
| raise ValueError(f"Invalid identifier {identifier} given") |
| return instance.hostapd.get_stas() |
| |
| def get_sta_extended_capabilities( |
| self, identifier: str, sta_mac: str |
| ) -> ExtendedCapabilities: |
| """Get extended capabilities for the given STA, as seen by the AP.""" |
| instance = self._aps.get(identifier) |
| if instance is None: |
| raise ValueError(f"Invalid identifier {identifier} given") |
| return instance.hostapd.get_sta_extended_capabilities(sta_mac) |
| |
| def send_bss_transition_management_req( |
| self, identifier: str, sta_mac: str, request: BssTransitionManagementRequest |
| ) -> job.Result: |
| """Send a BSS Transition Management request to an associated STA.""" |
| instance = self._aps.get(identifier) |
| if instance is None: |
| raise ValueError(f"Invalid identifier {identifier} given") |
| return instance.hostapd.send_bss_transition_management_req(sta_mac, request) |
| |
| |
| def setup_ap( |
| access_point: AccessPoint, |
| profile_name: str, |
| channel: int, |
| ssid: str, |
| mode: str | None = None, |
| preamble: bool | None = None, |
| beacon_interval: int | None = None, |
| dtim_period: int | None = None, |
| frag_threshold: int | None = None, |
| rts_threshold: int | None = None, |
| force_wmm: bool | None = None, |
| hidden: bool | None = False, |
| security: Security | None = None, |
| pmf_support: int | None = None, |
| additional_ap_parameters: dict[str, Any] | None = None, |
| password: str | None = None, |
| n_capabilities: list[Any] | None = None, |
| ac_capabilities: list[Any] | None = None, |
| vht_bandwidth: int | None = None, |
| wnm_features: FrozenSet[hostapd_constants.WnmFeature] = frozenset(), |
| setup_bridge: bool = False, |
| is_ipv6_enabled: bool = False, |
| is_nat_enabled: bool = True, |
| ): |
| """Creates a hostapd profile and runs it on an ap. This is a convenience |
| function that allows us to start an ap with a single function, without first |
| creating a hostapd config. |
| |
| Args: |
| access_point: An ACTS access_point controller |
| profile_name: The profile name of one of the hostapd ap presets. |
| channel: What channel to set the AP to. |
| preamble: Whether to set short or long preamble |
| beacon_interval: The beacon interval |
| dtim_period: Length of dtim period |
| frag_threshold: Fragmentation threshold |
| rts_threshold: RTS threshold |
| force_wmm: Enable WMM or not |
| hidden: Advertise the SSID or not |
| security: What security to enable. |
| pmf_support: Whether pmf is not disabled, enabled, or required |
| additional_ap_parameters: Additional parameters to send the AP. |
| password: Password to connect to WLAN if necessary. |
| check_connectivity: Whether to check for internet connectivity. |
| wnm_features: WNM features to enable on the AP. |
| setup_bridge: Whether to bridge the LAN interface WLAN interface. |
| Only one WLAN interface can be bridged with the LAN interface |
| and none of the guest networks can be bridged. |
| is_ipv6_enabled: If True, start a IPv6 router advertisement daemon |
| is_nat_enabled: If True, start NAT on the AP to allow the DUT to be able |
| to access the internet if the WAN port is connected to the internet. |
| |
| Returns: |
| An identifier for each ssid being started. These identifiers can be |
| used later by this controller to control the ap. |
| |
| Raises: |
| Error: When the ap can't be brought up. |
| """ |
| if additional_ap_parameters is None: |
| additional_ap_parameters = {} |
| |
| ap = create_ap_preset( |
| profile_name=profile_name, |
| iface_wlan_2g=access_point.wlan_2g, |
| iface_wlan_5g=access_point.wlan_5g, |
| channel=channel, |
| ssid=ssid, |
| mode=mode, |
| short_preamble=preamble, |
| beacon_interval=beacon_interval, |
| dtim_period=dtim_period, |
| frag_threshold=frag_threshold, |
| rts_threshold=rts_threshold, |
| force_wmm=force_wmm, |
| hidden=hidden, |
| bss_settings=[], |
| security=security, |
| pmf_support=pmf_support, |
| n_capabilities=n_capabilities, |
| ac_capabilities=ac_capabilities, |
| vht_bandwidth=vht_bandwidth, |
| wnm_features=wnm_features, |
| ) |
| return access_point.start_ap( |
| hostapd_config=ap, |
| radvd_config=RadvdConfig() if is_ipv6_enabled else None, |
| setup_bridge=setup_bridge, |
| is_nat_enabled=is_nat_enabled, |
| additional_parameters=additional_ap_parameters, |
| ) |
| |
| |
| def create(configs: Any) -> list[AccessPoint]: |
| """Creates ap controllers from a json config. |
| |
| Creates an ap controller from either a list, or a single |
| element. The element can either be just the hostname or a dictionary |
| containing the hostname and username of the ap to connect to over ssh. |
| |
| Args: |
| The json configs that represent this controller. |
| |
| Returns: |
| A new AccessPoint. |
| """ |
| return [AccessPoint(c) for c in configs] |
| |
| |
| def destroy(aps: list[AccessPoint]) -> None: |
| """Destroys a list of access points. |
| |
| Args: |
| aps: The list of access points to destroy. |
| """ |
| for ap in aps: |
| ap.close() |
| |
| |
| def get_info(aps: list[AccessPoint]) -> list[str]: |
| """Get information on a list of access points. |
| |
| Args: |
| aps: A list of AccessPoints. |
| |
| Returns: |
| A list of all aps hostname. |
| """ |
| return [ap.ssh_settings.hostname for ap in aps] |