blob: 9789a47730266be8c8e2d039c7590e24ce688313 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import ipaddress
import time
from dataclasses import dataclass
from typing import Any, FrozenSet, Mapping
from antlion import logger, utils
from antlion.capabilities.ssh import SSHConfig, SSHProvider
from antlion.controllers.ap_lib import hostapd_constants
from antlion.controllers.ap_lib.ap_get_interface import ApInterfaces
from antlion.controllers.ap_lib.ap_iwconfig import ApIwconfig
from antlion.controllers.ap_lib.bridge_interface import BridgeInterface
from antlion.controllers.ap_lib.dhcp_config import DhcpConfig, Subnet
from antlion.controllers.ap_lib.dhcp_server import DhcpServer, NoInterfaceError
from antlion.controllers.ap_lib.extended_capabilities import ExtendedCapabilities
from antlion.controllers.ap_lib.hostapd import Hostapd
from antlion.controllers.ap_lib.hostapd_ap_preset import create_ap_preset
from antlion.controllers.ap_lib.hostapd_config import HostapdConfig
from antlion.controllers.ap_lib.hostapd_security import Security
from antlion.controllers.ap_lib.radvd import Radvd
from antlion.controllers.ap_lib.radvd_config import RadvdConfig
from antlion.controllers.ap_lib.wireless_network_management import (
BssTransitionManagementRequest,
)
from antlion.controllers.pdu import PduDevice, get_pdu_port_for_device
from antlion.controllers.utils_lib.commands import ip, route
from antlion.controllers.utils_lib.ssh import connection, settings
from antlion.libs.proc import job
MOBLY_CONTROLLER_CONFIG_NAME = "AccessPoint"
ACTS_CONTROLLER_REFERENCE_NAME = "access_points"
class Error(Exception):
"""Error raised when there is a problem with the access point."""
@dataclass
class _ApInstance:
hostapd: Hostapd
subnet: Subnet
# These ranges were split this way since each physical radio can have up
# to 8 SSIDs so for the 2GHz radio the DHCP range will be
# 192.168.1 - 8 and the 5Ghz radio will be 192.168.9 - 16
_AP_2GHZ_SUBNET_STR_DEFAULT = "192.168.1.0/24"
_AP_5GHZ_SUBNET_STR_DEFAULT = "192.168.9.0/24"
# The last digit of the ip for the bridge interface
BRIDGE_IP_LAST = "100"
class AccessPoint(object):
"""An access point controller.
Attributes:
ssh: The ssh connection to this ap.
ssh_settings: The ssh settings being used by the ssh connection.
dhcp_settings: The dhcp server settings being used.
"""
def __init__(self, configs: Mapping[str, Any]) -> None:
"""
Args:
configs: configs for the access point from config file.
"""
self.ssh_settings = settings.from_config(configs["ssh_config"])
self.log = logger.create_logger(
lambda msg: f"[Access Point|{self.ssh_settings.hostname}] {msg}"
)
self.device_pdu_config = configs.get("PduDevice", None)
self.identifier = self.ssh_settings.hostname
if "ap_subnet" in configs:
self._AP_2G_SUBNET_STR: str = configs["ap_subnet"]["2g"]
self._AP_5G_SUBNET_STR: str = configs["ap_subnet"]["5g"]
else:
self._AP_2G_SUBNET_STR = _AP_2GHZ_SUBNET_STR_DEFAULT
self._AP_5G_SUBNET_STR = _AP_5GHZ_SUBNET_STR_DEFAULT
self._AP_2G_SUBNET = Subnet(ipaddress.IPv4Network(self._AP_2G_SUBNET_STR))
self._AP_5G_SUBNET = Subnet(ipaddress.IPv4Network(self._AP_5G_SUBNET_STR))
self.ssh = connection.SshConnection(self.ssh_settings)
# TODO(http://b/278758876): Replace self.ssh with self.ssh_provider
self.ssh_provider = SSHProvider(
SSHConfig(
self.ssh_settings.username,
self.ssh_settings.hostname,
self.ssh_settings.identity_file,
port=self.ssh_settings.port,
ssh_binary=self.ssh_settings.executable,
connect_timeout=90,
)
)
# Singleton utilities for running various commands.
self._ip_cmd = ip.LinuxIpCommand(self.ssh)
self._route_cmd = route.LinuxRouteCommand(self.ssh)
# A map from network interface name to _ApInstance objects representing
# the hostapd instance running against the interface.
self._aps: dict[str, _ApInstance] = dict()
self._dhcp: DhcpServer | None = None
self._dhcp_bss: dict[Any, Subnet] = dict()
self._radvd: Radvd | None = None
self.bridge = BridgeInterface(self)
self.iwconfig = ApIwconfig(self)
# Check to see if wan_interface is specified in acts_config for tests
# isolated from the internet and set this override.
self.interfaces = ApInterfaces(self, configs.get("wan_interface"))
# Get needed interface names and initialize the unnecessary ones.
self.wan = self.interfaces.get_wan_interface()
self.wlan = self.interfaces.get_wlan_interface()
self.wlan_2g = self.wlan[0]
self.wlan_5g = self.wlan[1]
self.lan = self.interfaces.get_lan_interface()
self._initial_ap()
self.setup_bridge = False
# Access points are not given internet access, so their system time needs to be
# manually set to be accurate.
self._sync_time()
def _initial_ap(self) -> None:
"""Initial AP interfaces.
Bring down hostapd if instance is running, bring down all bridge
interfaces.
"""
# This is necessary for Gale/Whirlwind flashed with dev channel image
# Unused interfaces such as existing hostapd daemon, guest, mesh
# interfaces need to be brought down as part of the AP initialization
# process, otherwise test would fail.
try:
self.ssh.run("stop wpasupplicant")
except job.Error:
self.log.info("No wpasupplicant running")
try:
self.ssh.run("stop hostapd")
except job.Error:
self.log.info("No hostapd running")
# Bring down all wireless interfaces
for iface in self.wlan:
WLAN_DOWN = f"ip link set {iface} down"
self.ssh.run(WLAN_DOWN)
# Bring down all bridge interfaces
bridge_interfaces = self.interfaces.get_bridge_interface()
for iface in bridge_interfaces:
BRIDGE_DOWN = f"ip link set {iface} down"
BRIDGE_DEL = f"brctl delbr {iface}"
self.ssh.run(BRIDGE_DOWN)
self.ssh.run(BRIDGE_DEL)
def _sync_time(self) -> None:
"""Synchronize the system time.
Allows for better synchronization between antlion host logs and AP logs.
Useful for when the device does not have internet connection.
"""
now = datetime.datetime.now().astimezone().isoformat()
self.ssh.run(f'date -s "{now}"')
def start_ap(
self,
hostapd_config: HostapdConfig,
radvd_config: RadvdConfig | None = None,
setup_bridge: bool = False,
is_nat_enabled: bool = True,
additional_parameters: dict[str, Any] | None = None,
) -> list[str]:
"""Starts as an ap using a set of configurations.
This will start an ap on this host. To start an ap the controller
selects a network interface to use based on the configs given. It then
will start up hostapd on that interface. Next a subnet is created for
the network interface and dhcp server is refreshed to give out ips
for that subnet for any device that connects through that interface.
Args:
hostapd_config: The configurations to use when starting up the ap.
radvd_config: The IPv6 configuration to use when starting up the ap.
setup_bridge: Whether to bridge the LAN interface WLAN interface.
Only one WLAN interface can be bridged with the LAN interface
and none of the guest networks can be bridged.
is_nat_enabled: If True, start NAT on the AP to allow the DUT to be
able to access the internet if the WAN port is connected to the
internet.
additional_parameters: Parameters that can sent directly into the
hostapd config file. This can be used for debugging and or
adding one off parameters into the config.
Returns:
An identifier for each ssid being started. These identifiers can be
used later by this controller to control the ap.
Raises:
Error: When the ap can't be brought up.
"""
if additional_parameters is None:
additional_parameters = {}
if hostapd_config.frequency < 5000:
interface = self.wlan_2g
subnet = self._AP_2G_SUBNET
else:
interface = self.wlan_5g
subnet = self._AP_5G_SUBNET
# radvd requires the interface to have a IPv6 link-local address.
if radvd_config:
self.ssh.run(f"sysctl -w net.ipv6.conf.{interface}.disable_ipv6=0")
self.ssh.run(f"sysctl -w net.ipv6.conf.{interface}.forwarding=1")
# In order to handle dhcp servers on any interface, the initiation of
# the dhcp server must be done after the wlan interfaces are figured
# out as opposed to being in __init__
self._dhcp = DhcpServer(self.ssh, interface=interface)
# For multi bssid configurations the mac address
# of the wireless interface needs to have enough space to mask out
# up to 8 different mac addresses. So in for one interface the range is
# hex 0-7 and for the other the range is hex 8-f.
interface_mac_orig = None
cmd = f"ip link show {interface}|grep ether|awk -F' ' '{{print $2}}'"
interface_mac_orig = self.ssh.run(cmd)
if interface == self.wlan_5g:
hostapd_config.bssid = f"{interface_mac_orig.stdout[:-1]}0"
last_octet = 1
if interface == self.wlan_2g:
hostapd_config.bssid = f"{interface_mac_orig.stdout[:-1]}8"
last_octet = 9
if interface in self._aps:
raise ValueError(
"No WiFi interface available for AP on "
f"channel {hostapd_config.channel}"
)
apd = Hostapd(self.ssh, interface)
new_instance = _ApInstance(hostapd=apd, subnet=subnet)
self._aps[interface] = new_instance
# Turn off the DHCP server, we're going to change its settings.
self.stop_dhcp()
# Clear all routes to prevent old routes from interfering.
self._route_cmd.clear_routes(net_interface=interface)
# Add IPv6 link-local route so packets destined to the AP will be
# processed by the AP. This is necessary if an iperf server is running
# on the AP, but not for traffic handled by the Linux networking stack
# such as ping.
if radvd_config:
self._route_cmd.add_route(interface, "fe80::/64")
self._dhcp_bss = dict()
if hostapd_config.bss_lookup:
# The self._dhcp_bss dictionary is created to hold the key/value
# pair of the interface name and the ip scope that will be
# used for the particular interface. The a, b, c, d
# variables below are the octets for the ip address. The
# third octet is then incremented for each interface that
# is requested. This part is designed to bring up the
# hostapd interfaces and not the DHCP servers for each
# interface.
counter = 1
for bss in hostapd_config.bss_lookup:
if interface_mac_orig:
hostapd_config.bss_lookup[bss].bssid = (
interface_mac_orig.stdout[:-1] + hex(last_octet)[-1:]
)
self._route_cmd.clear_routes(net_interface=str(bss))
if interface is self.wlan_2g:
starting_ip_range = self._AP_2G_SUBNET_STR
else:
starting_ip_range = self._AP_5G_SUBNET_STR
a, b, c, d = starting_ip_range.split(".")
self._dhcp_bss[bss] = Subnet(
ipaddress.IPv4Network(f"{a}.{b}.{int(c) + counter}.{d}")
)
counter = counter + 1
last_octet = last_octet + 1
apd.start(hostapd_config, additional_parameters=additional_parameters)
# The DHCP serer requires interfaces to have ips and routes before
# the server will come up.
interface_ip = ipaddress.ip_interface(
f"{subnet.router}/{subnet.network.netmask}"
)
if setup_bridge is True:
bridge_interface_name = "eth_test"
interfaces = [interface]
if self.lan:
interfaces.append(self.lan)
self.create_bridge(bridge_interface_name, interfaces)
self._ip_cmd.set_ipv4_address(bridge_interface_name, interface_ip)
else:
self._ip_cmd.set_ipv4_address(interface, interface_ip)
if hostapd_config.bss_lookup:
# This loop goes through each interface that was setup for
# hostapd and assigns the DHCP scopes that were defined but
# not used during the hostapd loop above. The k and v
# variables represent the interface name, k, and dhcp info, v.
for k, v in self._dhcp_bss.items():
bss_interface_ip = ipaddress.ip_interface(
f"{self._dhcp_bss[k].router}/{self._dhcp_bss[k].network.netmask}"
)
self._ip_cmd.set_ipv4_address(str(k), bss_interface_ip)
# Restart the DHCP server with our updated list of subnets.
configured_subnets = self.get_configured_subnets()
dhcp_conf = DhcpConfig(subnets=configured_subnets)
self.start_dhcp(dhcp_conf=dhcp_conf)
if is_nat_enabled:
self.start_nat()
self.enable_forwarding()
else:
self.stop_nat()
self.enable_forwarding()
if radvd_config:
radvd_interface = bridge_interface_name if setup_bridge else interface
self._radvd = Radvd(self.ssh, radvd_interface)
self._radvd.start(radvd_config)
else:
self._radvd = None
bss_interfaces = [bss for bss in hostapd_config.bss_lookup]
bss_interfaces.append(interface)
return bss_interfaces
def get_configured_subnets(self) -> list[Subnet]:
"""Get the list of configured subnets on the access point.
This allows consumers of the access point objects create custom DHCP
configs with the correct subnets.
Returns: a list of Subnet objects
"""
configured_subnets = [x.subnet for x in self._aps.values()]
for k, v in self._dhcp_bss.items():
configured_subnets.append(v)
return configured_subnets
def start_dhcp(self, dhcp_conf: DhcpConfig) -> None:
"""Start a DHCP server for the specified subnets.
This allows consumers of the access point objects to control DHCP.
Args:
dhcp_conf: A DhcpConfig object.
Raises:
Error: Raised when a dhcp server error is found.
"""
if self._dhcp is not None:
self._dhcp.start(config=dhcp_conf)
def stop_dhcp(self) -> None:
"""Stop DHCP for this AP object.
This allows consumers of the access point objects to control DHCP.
"""
if self._dhcp is not None:
self._dhcp.stop()
def get_systemd_journal(self) -> str:
"""Get systemd journal logs from this current boot."""
return self.ssh.run(f"journalctl --boot").stdout
def get_dhcp_logs(self) -> str | None:
"""Get DHCP logs for this AP object.
This allows consumers of the access point objects to validate DHCP
behavior.
Returns:
A string of the dhcp server logs, or None is a DHCP server has not
been started.
"""
if self._dhcp is not None:
return self._dhcp.get_logs()
return None
def get_hostapd_logs(self) -> dict[str, str]:
"""Get hostapd logs for all interfaces on AP object.
This allows consumers of the access point objects to validate hostapd
behavior.
Returns: A dict with {interface: log} from hostapd instances.
"""
hostapd_logs: dict[str, str] = dict()
for identifier in self._aps:
hostapd_logs[identifier] = self._aps[identifier].hostapd.pull_logs()
return hostapd_logs
def get_radvd_logs(self) -> str | None:
"""Get radvd logs for this AP object.
This allows consumers of the access point objects to validate radvd
behavior.
Returns:
A string of the radvd logs, or None is a radvd server has not been
started.
"""
if self._radvd:
return self._radvd.pull_logs()
return None
def enable_forwarding(self) -> None:
"""Enable IPv4 and IPv6 forwarding on the AP.
When forwarding is enabled, the access point is able to route IP packets
between devices in the same subnet.
"""
self.ssh.run("echo 1 > /proc/sys/net/ipv4/ip_forward")
self.ssh.run("echo 1 > /proc/sys/net/ipv6/conf/all/forwarding")
def start_nat(self) -> None:
"""Start NAT on the AP.
This allows consumers of the access point objects to enable NAT
on the AP.
Note that this is currently a global setting, since we don't
have per-interface masquerade rules.
"""
# The following three commands are needed to enable NAT between
# the WAN and LAN/WLAN ports. This means anyone connecting to the
# WLAN/LAN ports will be able to access the internet if the WAN port
# is connected to the internet.
self.ssh.run("iptables -t nat -F")
self.ssh.run(f"iptables -t nat -A POSTROUTING -o {self.wan} -j MASQUERADE")
def stop_nat(self) -> None:
"""Stop NAT on the AP.
This allows consumers of the access point objects to disable NAT on the
AP.
Note that this is currently a global setting, since we don't have
per-interface masquerade rules.
"""
self.ssh.run("iptables -t nat -F")
def create_bridge(self, bridge_name: str, interfaces: list[str]) -> None:
"""Create the specified bridge and bridge the specified interfaces.
Args:
bridge_name: The name of the bridge to create.
interfaces: A list of interfaces to add to the bridge.
"""
# Create the bridge interface
self.ssh.run(f"brctl addbr {bridge_name}")
for interface in interfaces:
self.ssh.run(f"brctl addif {bridge_name} {interface}")
self.ssh.run(f"ip link set {bridge_name} up")
def remove_bridge(self, bridge_name: str) -> None:
"""Removes the specified bridge
Args:
bridge_name: The name of the bridge to remove.
"""
# Check if the bridge exists.
#
# Cases where it may not are if we failed to initialize properly
#
# Or if we're doing 2.4Ghz and 5Ghz SSIDs and we've already torn
# down the bridge once, but we got called for each band.
result = self.ssh.run(f"brctl show {bridge_name}", ignore_status=True)
# If the bridge exists, we'll get an exit_status of 0, indicating
# success, so we can continue and remove the bridge.
if result.returncode == 0:
self.ssh.run(f"ip link set {bridge_name} down")
self.ssh.run(f"brctl delbr {bridge_name}")
def get_bssid_from_ssid(self, ssid: str, band: str) -> str | None:
"""Gets the BSSID from a provided SSID
Args:
ssid: An SSID string.
band: 2G or 5G Wifi band.
Returns: The BSSID if on the AP or None if SSID could not be found.
"""
if band == hostapd_constants.BAND_2G:
interfaces = [self.wlan_2g, ssid]
else:
interfaces = [self.wlan_5g, ssid]
# Get the interface name associated with the given ssid.
for interface in interfaces:
iw_output = self.ssh.run(
f"iw dev {interface} info|grep ssid|awk -F' ' '{{print $2}}'"
)
if "command failed: No such device" in iw_output.stderr:
continue
else:
# If the configured ssid is equal to the given ssid, we found
# the right interface.
if iw_output.stdout == ssid:
iw_output = self.ssh.run(
f"iw dev {interface} info|grep addr|awk -F' ' '{{print $2}}'"
)
return iw_output.stdout
return None
def stop_ap(self, identifier: str) -> None:
"""Stops a running ap on this controller.
Args:
identifier: The identify of the ap that should be taken down.
"""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
if self._radvd:
self._radvd.stop()
try:
self.stop_dhcp()
except NoInterfaceError:
pass
self.stop_nat()
instance.hostapd.stop()
self._ip_cmd.clear_ipv4_addresses(identifier)
del self._aps[identifier]
bridge_interfaces = self.interfaces.get_bridge_interface()
for iface in bridge_interfaces:
BRIDGE_DOWN = f"ip link set {iface} down"
BRIDGE_DEL = f"brctl delbr {iface}"
self.ssh.run(BRIDGE_DOWN)
self.ssh.run(BRIDGE_DEL)
def stop_all_aps(self) -> None:
"""Stops all running aps on this device."""
for ap in list(self._aps.keys()):
self.stop_ap(ap)
def close(self) -> None:
"""Called to take down the entire access point.
When called will stop all aps running on this host, shutdown the dhcp
server, and stop the ssh connection.
"""
if self._aps:
self.stop_all_aps()
self.ssh.close()
def generate_bridge_configs(self, channel: int) -> tuple[str, str | None, str]:
"""Generate a list of configs for a bridge between LAN and WLAN.
Args:
channel: the channel WLAN interface is brought up on
iface_lan: the LAN interface to bridge
Returns:
configs: tuple containing iface_wlan, iface_lan and bridge_ip
"""
if channel < 15:
iface_wlan = self.wlan_2g
subnet_str = self._AP_2G_SUBNET_STR
else:
iface_wlan = self.wlan_5g
subnet_str = self._AP_5G_SUBNET_STR
iface_lan = self.lan
a, b, c, _ = subnet_str.strip("/24").split(".")
bridge_ip = f"{a}.{b}.{c}.{BRIDGE_IP_LAST}"
return (iface_wlan, iface_lan, bridge_ip)
def ping(
self,
dest_ip: str,
count: int = 3,
interval: int = 1000,
timeout: int = 1000,
size: int = 56,
additional_ping_params: str = "",
) -> utils.PingResult:
"""Pings from AP to dest_ip, returns dict of ping stats (see utils.ping)"""
return utils.ping(
self.ssh,
dest_ip,
count=count,
interval=interval,
timeout=timeout,
size=size,
additional_ping_params=additional_ping_params,
)
def can_ping(
self,
dest_ip: str,
count: int = 1,
interval: int = 1000,
timeout: int = 1000,
size: int = 56,
additional_ping_params: str = "",
) -> bool:
"""Returns whether ap can ping dest_ip (see utils.can_ping)"""
return utils.can_ping(
self.ssh,
dest_ip,
count=count,
interval=interval,
timeout=timeout,
size=size,
additional_ping_params=additional_ping_params,
)
def hard_power_cycle(
self,
pdus: list[PduDevice],
hostapd_configs: list[HostapdConfig] | None = None,
) -> None:
"""Kills, then restores power to AccessPoint, verifying it goes down and
comes back online cleanly.
Args:
pdus: PDUs in the testbed
hostapd_configs: Hostapd settings. If present, these networks will
be spun up after the AP has rebooted. This list can either
contain HostapdConfig objects, or dictionaries with the start_ap
params
(i.e { 'hostapd_config': <HostapdConfig>,
'setup_bridge': <bool>,
'additional_parameters': <dict> } ).
Raise:
Error, if no PduDevice is provided in AccessPoint config.
ConnectionError, if AccessPoint fails to go offline or come back.
"""
if not self.device_pdu_config:
raise Error("No PduDevice provided in AccessPoint config.")
if hostapd_configs is None:
hostapd_configs = []
self.log.info(f"Power cycling")
ap_pdu, ap_pdu_port = get_pdu_port_for_device(self.device_pdu_config, pdus)
self.log.info(f"Killing power")
ap_pdu.off(ap_pdu_port)
self.log.info("Verifying AccessPoint is unreachable.")
self.ssh_provider.wait_until_unreachable()
self.log.info("AccessPoint is unreachable as expected.")
self._aps.clear()
self.log.info(f"Restoring power")
ap_pdu.on(ap_pdu_port)
self.log.info("Waiting for AccessPoint to become available via SSH.")
self.ssh_provider.wait_until_reachable()
self.log.info("AccessPoint responded to SSH.")
# Allow 5 seconds for OS to finish getting set up
time.sleep(5)
self._initial_ap()
self.log.info("Power cycled successfully")
for settings in hostapd_configs:
if isinstance(settings, HostapdConfig):
config = settings
setup_bridge = False
additional_parameters: dict[str, Any] = {}
elif isinstance(settings, dict):
config = settings["hostapd_config"]
setup_bridge = settings.get("setup_bridge", False)
additional_parameters = settings.get("additional_parameters", {})
else:
raise TypeError(
"Items in hostapd_configs list must either be "
"HostapdConfig objects or dictionaries."
)
self.log.info(f"Restarting network {config.ssid}")
self.start_ap(
config,
setup_bridge=setup_bridge,
additional_parameters=additional_parameters,
)
def channel_switch(self, identifier: str, channel_num: int) -> None:
"""Switch to a different channel on the given AP."""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
self.log.info(f"channel switch to channel {channel_num}")
instance.hostapd.channel_switch(channel_num)
def get_current_channel(self, identifier: str) -> int:
"""Find the current channel on the given AP."""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.get_current_channel()
def get_stas(self, identifier: str) -> set[str]:
"""Return MAC addresses of all associated STAs on the given AP."""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.get_stas()
def sta_authenticated(self, identifier: str, sta_mac: str) -> bool:
"""Is STA authenticated?"""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.sta_authenticated(sta_mac)
def sta_associated(self, identifier: str, sta_mac: str) -> bool:
"""Is STA associated?"""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.sta_associated(sta_mac)
def sta_authorized(self, identifier: str, sta_mac: str) -> bool:
"""Is STA authorized (802.1X controlled port open)?"""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.sta_authorized(sta_mac)
def get_sta_extended_capabilities(
self, identifier: str, sta_mac: str
) -> ExtendedCapabilities:
"""Get extended capabilities for the given STA, as seen by the AP."""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.get_sta_extended_capabilities(sta_mac)
def send_bss_transition_management_req(
self, identifier: str, sta_mac: str, request: BssTransitionManagementRequest
) -> None:
"""Send a BSS Transition Management request to an associated STA."""
instance = self._aps.get(identifier)
if instance is None:
raise ValueError(f"Invalid identifier {identifier} given")
instance.hostapd.send_bss_transition_management_req(sta_mac, request)
def setup_ap(
access_point: AccessPoint,
profile_name: str,
channel: int,
ssid: str,
mode: str | None = None,
preamble: bool | None = None,
beacon_interval: int | None = None,
dtim_period: int | None = None,
frag_threshold: int | None = None,
rts_threshold: int | None = None,
force_wmm: bool | None = None,
hidden: bool | None = False,
security: Security | None = None,
pmf_support: int | None = None,
additional_ap_parameters: dict[str, Any] | None = None,
n_capabilities: list[Any] | None = None,
ac_capabilities: list[Any] | None = None,
vht_bandwidth: int | None = None,
wnm_features: FrozenSet[hostapd_constants.WnmFeature] = frozenset(),
setup_bridge: bool = False,
is_ipv6_enabled: bool = False,
is_nat_enabled: bool = True,
) -> list[str]:
"""Creates a hostapd profile and runs it on an ap. This is a convenience
function that allows us to start an ap with a single function, without first
creating a hostapd config.
Args:
access_point: An ACTS access_point controller
profile_name: The profile name of one of the hostapd ap presets.
channel: What channel to set the AP to.
preamble: Whether to set short or long preamble
beacon_interval: The beacon interval
dtim_period: Length of dtim period
frag_threshold: Fragmentation threshold
rts_threshold: RTS threshold
force_wmm: Enable WMM or not
hidden: Advertise the SSID or not
security: What security to enable.
pmf_support: Whether pmf is not disabled, enabled, or required
additional_ap_parameters: Additional parameters to send the AP.
check_connectivity: Whether to check for internet connectivity.
wnm_features: WNM features to enable on the AP.
setup_bridge: Whether to bridge the LAN interface WLAN interface.
Only one WLAN interface can be bridged with the LAN interface
and none of the guest networks can be bridged.
is_ipv6_enabled: If True, start a IPv6 router advertisement daemon
is_nat_enabled: If True, start NAT on the AP to allow the DUT to be able
to access the internet if the WAN port is connected to the internet.
Returns:
An identifier for each ssid being started. These identifiers can be
used later by this controller to control the ap.
Raises:
Error: When the ap can't be brought up.
"""
if additional_ap_parameters is None:
additional_ap_parameters = {}
ap = create_ap_preset(
profile_name=profile_name,
iface_wlan_2g=access_point.wlan_2g,
iface_wlan_5g=access_point.wlan_5g,
channel=channel,
ssid=ssid,
mode=mode,
short_preamble=preamble,
beacon_interval=beacon_interval,
dtim_period=dtim_period,
frag_threshold=frag_threshold,
rts_threshold=rts_threshold,
force_wmm=force_wmm,
hidden=hidden,
bss_settings=[],
security=security,
pmf_support=pmf_support,
n_capabilities=n_capabilities,
ac_capabilities=ac_capabilities,
vht_bandwidth=vht_bandwidth,
wnm_features=wnm_features,
)
return access_point.start_ap(
hostapd_config=ap,
radvd_config=RadvdConfig() if is_ipv6_enabled else None,
setup_bridge=setup_bridge,
is_nat_enabled=is_nat_enabled,
additional_parameters=additional_ap_parameters,
)
def create(configs: Any) -> list[AccessPoint]:
"""Creates ap controllers from a json config.
Creates an ap controller from either a list, or a single
element. The element can either be just the hostname or a dictionary
containing the hostname and username of the ap to connect to over ssh.
Args:
The json configs that represent this controller.
Returns:
A new AccessPoint.
"""
return [AccessPoint(c) for c in configs]
def destroy(aps: list[AccessPoint]) -> None:
"""Destroys a list of access points.
Args:
aps: The list of access points to destroy.
"""
for ap in aps:
ap.close()
def get_info(aps: list[AccessPoint]) -> list[str]:
"""Get information on a list of access points.
Args:
aps: A list of AccessPoints.
Returns:
A list of all aps hostname.
"""
return [ap.ssh_settings.hostname for ap in aps]