blob: 0b391c4bc2b3fa8846841db5a8d7e2bd070c2205 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import multiprocessing
import time
from datetime import datetime
from multiprocessing.managers import DictProxy
from typing import Any, Mapping
from uuid import UUID, uuid4
from antlion import signals, tracelogger, utils
from antlion.controllers import iperf_client, iperf_server
from antlion.controllers.access_point import AccessPoint
from antlion.test_utils.abstract_devices.wlan_device import SupportsWLAN
# Names of the WMM access categories accepted as a stream's "access_category".
AC_VO = "AC_VO"
AC_VI = "AC_VI"
AC_BE = "AC_BE"
AC_BK = "AC_BK"

# Maps each access category to the value passed to iperf's "-S" flag.
# TODO(fxb/61421): Add tests to check all DSCP classes are mapped to the correct
# AC (there are many that aren't included here). Requires implementation of
# sniffer.
DEFAULT_AC_TO_TOS_TAG_MAP = {
    AC_VO: "0xC0",
    AC_VI: "0x80",
    AC_BE: "0x0",
    AC_BK: "0x20",
}

# Transport protocol names.
UDP = "udp"
TCP = "tcp"

# First port handed to an IPerfServer when the config gives no
# "port_range_start".
DEFAULT_IPERF_PORT = 5201

# Stream duration, in seconds, used when the stream parameters give no "time".
DEFAULT_STREAM_TIME = 10

# Seconds to wait for a reserved server to report an address in the subnet.
DEFAULT_IP_ADDR_TIMEOUT = 15

# Default seconds to wait for each stream process to join during cleanup.
PROCESS_JOIN_TIMEOUT = 60

# Reservation states stored as values in the client and server maps.
AVAILABLE = True
UNAVAILABLE = False
class WmmTransceiverError(signals.ControllerError):
    """Error raised when a WmmTransceiver cannot be set up or used.

    Raised in this module for missing config parameters, unknown WLAN device
    or access point identifiers, and unreachable iperf server addresses.
    """
def create(
    config: Mapping[str, Any],
    identifier: str | None = None,
    wlan_devices: list[SupportsWLAN] | None = None,
    access_points: list[AccessPoint] | None = None,
) -> "WmmTransceiver":
    """Creates a WmmTransceiver from a config.

    Args:
        config: Config parameters for the transceiver. Contains:
            - iperf_config: dict, the config to use for creating IPerfClients and
                IPerfServers (excluding port).
            - identifier: string, identifier for the WmmTransceiver. Only read
                when the identifier arg is not provided.
            - port_range_start: int, the lower bound of the port range to use for
                creating IPerfServers. Defaults to 5201.
            - wlan_device: string, the identifier of the wlan_device used for this
                WmmTransceiver (optional)
            - access_point: string, the control plane IP address of the access
                point used for this WmmTransceiver (optional). Ignored when
                wlan_device is also present.
        identifier: Identifier for the WmmTransceiver. Must be provided either as arg or
            in the config.
        wlan_devices: WLAN devices from which to get the wlan_device, if any, used as
            this transceiver
        access_points: Access points from which to get the access_point, if any, used as
            this transceiver

    Returns:
        A WmmTransceiver backed by the selected WLAN device or access point
        (or by neither, if the config names neither).

    Raises:
        WmmTransceiverError, if a required parameter is missing, or if the
        configured WLAN device / access point cannot be found.
    """
    try:
        # If identifier is not provided as func arg, it must be provided via
        # config file.
        if not identifier:
            identifier = config["identifier"]
        iperf_config = config["iperf_config"]
    except KeyError as err:
        # Chain the KeyError so the traceback shows it as the direct cause.
        raise WmmTransceiverError(
            f"Parameter not provided as func arg, nor found in config: {err}"
        ) from err

    if wlan_devices is None:
        wlan_devices = []
    if access_points is None:
        access_points = []

    port_range_start = config.get("port_range_start", DEFAULT_IPERF_PORT)

    # At most one of these is set; a configured wlan_device takes precedence.
    wd = None
    ap = None
    if "wlan_device" in config:
        wd = _find_wlan_device(config["wlan_device"], wlan_devices)
    elif "access_point" in config:
        ap = _find_access_point(config["access_point"], access_points)

    return WmmTransceiver(
        iperf_config,
        identifier,
        wlan_device=wd,
        access_point=ap,
        port_range_start=port_range_start,
    )
def _find_wlan_device(
    wlan_device_identifier: str, wlan_devices: list[SupportsWLAN]
) -> SupportsWLAN:
    """Looks up a WLAN device by its string identifier (e.g. ip, serial, etc.)

    Args:
        wlan_device_identifier: Identifier for the desired WLAN device
        wlan_devices: WLAN devices to search through

    Returns:
        The first WLAN device whose identifier equals wlan_device_identifier

    Raises:
        WmmTransceiverError, if no WLAN device matches wlan_device_identifier
    """
    candidates = (
        device
        for device in wlan_devices
        if wlan_device_identifier == device.identifier
    )
    match = next(candidates, None)
    if match is None:
        raise WmmTransceiverError(
            f'No WLAN device with identifier "{wlan_device_identifier}"'
        )
    return match
def _find_access_point(
    access_point_ip: str, access_points: list[AccessPoint]
) -> AccessPoint:
    """Looks up an AccessPoint by its string ip address

    Args:
        access_point_ip: Control plane IP address of the desired AP
        access_points: Access points to search through

    Returns:
        The first access point whose ssh_settings.hostname equals
        access_point_ip

    Raises:
        WmmTransceiverError, if no access point matches access_point_ip
    """
    candidates = (
        point
        for point in access_points
        if point.ssh_settings.hostname == access_point_ip
    )
    match = next(candidates, None)
    if match is None:
        raise WmmTransceiverError(f"No AccessPoint with ip: {access_point_ip}")
    return match
class WmmTransceiver(object):
    """Object for handling WMM tagged streams between devices.

    A transceiver wraps either a WLAN device or an access point and manages
    the IPerfClients (for streams it sends) and IPerfServers (for streams it
    receives from other WmmTransceivers).
    """

    def __init__(
        self,
        iperf_config,
        identifier,
        wlan_device=None,
        access_point=None,
        port_range_start=5201,
    ):
        """Initializes the transceiver's bookkeeping; creates no iperf objects.

        Args:
            iperf_config: dict, config used to create IPerfClients and
                IPerfServers (excluding port). Its optional "test_interface"
                entry is the interface queried for server addresses.
            identifier: identifier of this transceiver, used in log messages
            wlan_device: WLAN device used as this transceiver, if any
            access_point: AccessPoint used as this transceiver, if any
            port_range_start: int, first port to assign to a new IPerfServer
        """
        self.identifier = identifier
        self.log = tracelogger.TraceLogger(
            WmmTransceiverLoggerAdapter(
                logging.getLogger(), {"identifier": self.identifier}
            )
        )

        # WLAN device or AccessPoint, that is used as the transceiver. Only one
        # will be set. This helps consolidate association, setup, teardown, etc.
        self.wlan_device = wlan_device
        self.access_point = access_point

        # Parameters used to create IPerfClient and IPerfServer objects on
        # device
        self._iperf_config = iperf_config
        self._test_interface = self._iperf_config.get("test_interface")
        self._port_range_start = port_range_start
        self._next_server_port = port_range_start

        # Maps IPerfClients, used for streams from this device, to True if
        # available, False if reserved
        self._iperf_clients = {}

        # Maps IPerfServers, used to receive streams from other devices, to True
        # if available, False if reserved
        self._iperf_servers = {}

        # Maps ports of servers, which are provided to other transceivers, to
        # the actual IPerfServer objects
        self._iperf_server_ports = {}

        # Maps stream UUIDs to IPerfClients reserved for that stream's use
        self._reserved_clients = {}

        # Maps stream UUIDs to (WmmTransceiver, server port) tuples, where the
        # server on that port is reserved on the transceiver for that stream's
        # use
        self._reserved_servers = {}

        # Maps with shared memory functionality to be used across the parallel
        # streams. active_streams holds UUIDs of streams that are currently
        # running on this device (mapped to True, since there is no
        # multiprocessing set). stream_results maps UUIDs of streams completed
        # on this device to IPerfResult results for that stream.
        self._manager = multiprocessing.Manager()
        self._active_streams = self._manager.dict()
        self._stream_results = self._manager.dict()

        # Holds parameters for streams that are prepared to run asynchronously
        # (i.e. resources have been allocated). Maps UUIDs of the future streams
        # to a dict, containing the stream parameters.
        self._pending_async_streams = {}

        # Set of UUIDs of asynchronous streams that have at least started, but
        # have not had their resources reclaimed yet
        self._ran_async_streams = set()

        # Set of stream parallel processes, which can be joined if completed
        # successfully, or terminated and joined in the event of an error
        self._running_processes = set()

    def run_synchronous_traffic_stream(self, stream_parameters, subnet):
        """Runs a traffic stream with IPerf3 between two WmmTransceivers and
        saves the results.

        Args:
            stream_parameters: dict, containing parameters to used for the
                stream. See _parse_stream_parameters for details.
            subnet: string, the subnet of the network to use for the stream

        Returns:
            uuid: UUID object, identifier of the stream

        Raises:
            WmmTransceiverError, if the server address is unreachable.
        """
        (
            receiver,
            access_category,
            bandwidth,
            stream_time,
        ) = self._parse_stream_parameters(stream_parameters)
        uuid = uuid4()

        (client, server_ip, server_port) = self._get_stream_resources(
            uuid, receiver, subnet
        )
        # Releases the stream's resources itself before raising on failure.
        self._validate_server_address(server_ip, uuid)

        self.log.info(
            f"Running synchronous stream to {receiver.identifier} WmmTransceiver"
        )
        try:
            self._run_traffic(
                uuid,
                client,
                server_ip,
                server_port,
                self._active_streams,
                self._stream_results,
                access_category=access_category,
                bandwidth=bandwidth,
                stream_time=stream_time,
            )
        finally:
            # Release the client and server even if the stream raised, so a
            # failed stream does not leave them reserved.
            self._return_stream_resources(uuid)
        return uuid

    def prepare_asynchronous_stream(self, stream_parameters, subnet):
        """Reserves resources and saves configs for upcoming asynchronous
        traffic streams, so they can be started more simultaneously.

        Args:
            stream_parameters: dict, containing parameters to used for the
                stream. See _parse_stream_parameters for details.
            subnet: string, the subnet of the network to use for the stream

        Returns:
            uuid: UUID object, identifier of the stream

        Raises:
            WmmTransceiverError, if the server address is unreachable.
        """
        (
            receiver,
            access_category,
            bandwidth,
            stream_time,
        ) = self._parse_stream_parameters(stream_parameters)
        uuid = uuid4()

        (client, server_ip, server_port) = self._get_stream_resources(
            uuid, receiver, subnet
        )
        self._validate_server_address(server_ip, uuid)

        self._pending_async_streams[uuid] = {
            "client": client,
            "server_ip": server_ip,
            "server_port": server_port,
            "access_category": access_category,
            "bandwidth": bandwidth,
            "time": stream_time,
        }
        self.log.info(f"Stream to {receiver.identifier} WmmTransceiver prepared.")
        return uuid

    def start_asynchronous_streams(self, start_time=None):
        """Starts pending asynchronous streams between two WmmTransceivers as
        parallel processes.

        Args:
            start_time: float, time, seconds since epoch, at which to start the
                stream (for better synchronicity). If None, start immediately.
        """
        for uuid, pending_stream_config in self._pending_async_streams.items():
            process = multiprocessing.Process(
                target=self._run_traffic,
                args=(
                    uuid,
                    pending_stream_config["client"],
                    pending_stream_config["server_ip"],
                    pending_stream_config["server_port"],
                    self._active_streams,
                    self._stream_results,
                ),
                kwargs={
                    "access_category": pending_stream_config["access_category"],
                    "bandwidth": pending_stream_config["bandwidth"],
                    "stream_time": pending_stream_config["time"],
                    "start_time": start_time,
                },
            )

            # This needs to be set here to ensure its marked active before
            # it even starts.
            self._active_streams[uuid] = True
            process.start()
            self._ran_async_streams.add(uuid)
            self._running_processes.add(process)

        self._pending_async_streams.clear()

    def cleanup_asynchronous_streams(self, timeout=PROCESS_JOIN_TIMEOUT):
        """Releases reservations on resources (IPerfClients and IPerfServers)
        that were held for asynchronous streams, both pending and finished.
        Attempts to join any running processes, logging an error if timeout is
        exceeded.

        Args:
            timeout: time, in seconds, to wait for each running process, if any,
                to join
        """
        self.log.info("Cleaning up any asynchronous streams.")

        # Releases resources for any streams that were prepared, but not run
        for uuid in self._pending_async_streams:
            self.log.error(f"Pending asynchronous stream {uuid} never ran. Cleaning.")
            self._return_stream_resources(uuid)
        self._pending_async_streams.clear()

        # Attempts to join any running streams, terminating them after timeout
        # if necessary.
        while self._running_processes:
            process = self._running_processes.pop()
            process.join(timeout)
            if process.is_alive():
                self.log.error(
                    f"Stream process failed to join in {timeout} seconds. Terminating."
                )
                process.terminate()
                process.join()
        # Every process has exited, so no stream can still be active. A
        # terminated process never removes its own entry.
        self._active_streams.clear()

        # Release resources for any finished streams
        while self._ran_async_streams:
            uuid = self._ran_async_streams.pop()
            self._return_stream_resources(uuid)

    def get_results(self, uuid):
        """Retrieves a stream's IPerfResult from stream_results

        Args:
            uuid: UUID object, identifier of the stream

        Returns:
            The result saved for the stream, or None if there is none.
        """
        return self._stream_results.get(uuid, None)

    def destroy_resources(self):
        """Stops every IPerfServer and forgets all clients, servers and saved
        stream results, resetting the next server port to the start of the
        port range.
        """
        for server in self._iperf_servers:
            server.stop()
        self._iperf_servers.clear()
        self._iperf_server_ports.clear()
        self._iperf_clients.clear()
        self._next_server_port = self._port_range_start
        self._stream_results.clear()

    @property
    def has_active_streams(self):
        """True if any stream is currently marked active on this device."""
        return bool(self._active_streams)

    # Helper Functions

    def _run_traffic(
        self,
        uuid: UUID,
        client: iperf_client.IPerfClientBase,
        server_ip: str,
        server_port: int,
        # Quoted so the subscript is never evaluated at runtime; DictProxy may
        # not be subscriptable on older Python versions.
        active_streams: "DictProxy[Any, Any]",
        stream_results: "DictProxy[Any, Any]",
        access_category: str | None = None,
        bandwidth: int | None = None,
        stream_time: int = DEFAULT_STREAM_TIME,
        start_time: float | None = None,
    ):
        """Runs an iperf3 stream.

        1. Adds stream UUID to active_streams
        2. Runs stream
        3. Saves results to stream_results
        4. Removes stream UUID from active_streams (also when the stream fails)

        Args:
            uuid: Identifier for stream
            client: IPerfClient object on device
            server_ip: IP address of IPerfServer for stream
            server_port: port of the IPerfServer for stream
            active_streams: holds stream UUIDs of active streams on the device
            stream_results: maps stream UUIDs of streams to IPerfResult objects
            access_category: WMM access category to use with iperf (AC_BK, AC_BE, AC_VI,
                AC_VO). Unset if None.
            bandwidth: Bandwidth in mbps to use with iperf. Implies UDP. Unlimited if
                None.
            stream_time: Time in seconds, to run iperf stream
            start_time: Time, seconds since epoch, at which to start the stream (for
                better synchronicity). If None, start immediately.
        """
        active_streams[uuid] = True
        try:
            ac_flag = ""
            bandwidth_flag = ""
            time_flag = f"-t {stream_time}"

            if access_category:
                ac_flag = f" -S {DEFAULT_AC_TO_TOS_TAG_MAP[access_category]}"

            if bandwidth:
                bandwidth_flag = f" -u -b {bandwidth}M"

            iperf_flags = (
                f"-p {server_port} -i 1 {time_flag}{ac_flag}{bandwidth_flag} -J"
            )
            if not start_time:
                start_time = time.time()
            time_str = datetime.fromtimestamp(start_time).strftime("%H:%M:%S.%f")
            self.log.info(
                f"At {time_str}, starting {stream_time} second stream to "
                f"{server_ip}:{server_port} with (AC: {access_category}, "
                f"Bandwidth: {bandwidth if bandwidth else 'Unlimited'})"
            )

            # Wait for the requested start time by sleeping rather than
            # spinning, so waiting streams do not burn CPU.
            delay = start_time - time.time()
            if delay > 0:
                time.sleep(delay)

            path = client.start(server_ip, iperf_flags, f"{uuid}")
            stream_results[uuid] = iperf_server.IPerfResult(
                path, reporting_speed_units="mbps"
            )
        finally:
            # Always mark the stream inactive, so a failed stream does not
            # block the release of its resources.
            active_streams.pop(uuid, None)

    def _get_stream_resources(self, uuid, receiver, subnet):
        """Reserves an IPerfClient and IPerfServer for a stream.

        Args:
            uuid: UUID object, identifier of the stream
            receiver: WmmTransceiver object, which will be the stream's receiver
            subnet: string, subnet of test network, to retrieve the appropriate
                server address

        Returns:
            (IPerfClient, string, int) representing the client, server address,
            and server port to use for the stream
        """
        client = self._get_client(uuid)
        try:
            server_ip, server_port = self._get_server(receiver, uuid, subnet)
        except Exception:
            # No server could be reserved, so give the client back instead of
            # leaving it reserved for a stream that will never run.
            self._iperf_clients[self._reserved_clients.pop(uuid)] = AVAILABLE
            raise
        return (client, server_ip, server_port)

    def _return_stream_resources(self, uuid):
        """Releases reservations on a stream's IPerfClient and IPerfServer, so
        they can be used by a future stream.

        Args:
            uuid: UUID object, identifier of the stream

        Raises:
            EnvironmentError, if the stream is still marked active.
        """
        if uuid in self._active_streams:
            raise EnvironmentError(f"Resource still being used by stream {uuid}")
        (receiver, server_port) = self._reserved_servers.pop(uuid)
        receiver._release_server(server_port)
        client = self._reserved_clients.pop(uuid)
        self._iperf_clients[client] = AVAILABLE

    def _get_client(self, uuid):
        """Retrieves and reserves IPerfClient for use in a stream. If none are
        available, a new one is created.

        Args:
            uuid: UUID object, identifier for stream, used to link client to
                stream for teardown

        Returns:
            IPerfClient on device
        """
        reserved_client = None
        for client in self._iperf_clients:
            if self._iperf_clients[client] == AVAILABLE:
                reserved_client = client
                break
        else:
            reserved_client = iperf_client.create([self._iperf_config])[0]

        self._iperf_clients[reserved_client] = UNAVAILABLE
        self._reserved_clients[uuid] = reserved_client
        return reserved_client

    def _get_server(self, receiver, uuid, subnet):
        """Retrieves the address and port of a reserved IPerfServer object from
        the receiver object for use in a stream.

        Args:
            receiver: WmmTransceiver, to get an IPerfServer from
            uuid: UUID, identifier for stream, used to link server to stream
                for teardown
            subnet: string, subnet of test network, to retrieve the appropriate
                server address

        Returns:
            (string, int) representing the IPerfServer address and port
        """
        (server_ip, server_port) = receiver._reserve_server(subnet)
        self._reserved_servers[uuid] = (receiver, server_port)
        return (server_ip, server_port)

    def _reserve_server(self, subnet):
        """Reserves an available IPerfServer for use in a stream from another
        WmmTransceiver. If none are available, a new one is created.

        Args:
            subnet: string, subnet of test network, to retrieve the appropriate
                server address

        Returns:
            (string, int) representing the IPerfServer address and port

        Raises:
            AttributeError, if no ipv4 address in the subnet is found within
            DEFAULT_IP_ADDR_TIMEOUT seconds. The server is stopped and made
            available again first.
        """
        reserved_server = None
        for server in self._iperf_servers:
            if self._iperf_servers[server] == AVAILABLE:
                reserved_server = server
                break
        else:
            # Build the server config from a copy, so the port does not leak
            # into the shared config used for later clients and servers.
            iperf_server_config = dict(self._iperf_config)
            iperf_server_config["port"] = self._next_server_port
            self._next_server_port += 1
            reserved_server = iperf_server.create([iperf_server_config])[0]
            self._iperf_server_ports[reserved_server.port] = reserved_server

        self._iperf_servers[reserved_server] = UNAVAILABLE
        reserved_server.start()
        end_time = time.time() + DEFAULT_IP_ADDR_TIMEOUT
        while time.time() < end_time:
            if self.wlan_device:
                addresses = utils.get_interface_ip_addresses(
                    self.wlan_device.device, self._test_interface
                )
            else:
                addresses = reserved_server.get_interface_ip_addresses(
                    self._test_interface
                )
            for addr in addresses["ipv4_private"]:
                if utils.ip_in_subnet(addr, subnet):
                    return (addr, reserved_server.port)

        # No usable address was found. Undo the reservation so the server is
        # not left running and permanently marked as reserved.
        reserved_server.stop()
        self._iperf_servers[reserved_server] = AVAILABLE
        raise AttributeError(
            f"Reserved server has no ipv4 address in the {subnet} subnet"
        )

    def _release_server(self, server_port):
        """Releases reservation on IPerfServer, which was held for a stream
        from another WmmTransceiver.

        Args:
            server_port: int, the port of the IPerfServer being returned (since
                it is the identifying characteristic)
        """
        server = self._iperf_server_ports[server_port]
        server.stop()
        self._iperf_servers[server] = AVAILABLE

    def _validate_server_address(self, server_ip, uuid, timeout=60):
        """Verifies server address can be pinged before attempting to run
        traffic, since iperf is unforgiving when the server is unreachable.

        Args:
            server_ip: string, ip address of the iperf server
            uuid: UUID object, identifier of the stream to use this server
            timeout: int, time in seconds to wait for server to respond to pings

        Raises:
            WmmTransceiverError, if, after timeout, server ip is unreachable.
            The stream's resources are released before raising.
        """
        self.log.info(f"Verifying server address ({server_ip}) is reachable.")
        end_time = time.time() + timeout
        while time.time() < end_time:
            if self.can_ping(server_ip):
                break
            else:
                self.log.debug(
                    f"Could not ping server address ({server_ip}). "
                    "Retrying in 1 second."
                )
                time.sleep(1)
        else:
            # Only reached when the loop ends without a successful ping.
            self._return_stream_resources(uuid)
            raise WmmTransceiverError(f"IPerfServer address ({server_ip}) unreachable.")

    def can_ping(self, dest_ip):
        """Utilizes can_ping function in wlan_device or access_point device to
        ping dest_ip

        Args:
            dest_ip: string, ip address to ping

        Returns:
            True, if dest address is reachable
            False, otherwise
        """
        if self.wlan_device:
            return self.wlan_device.can_ping(dest_ip)
        else:
            return self.access_point.can_ping(dest_ip)

    def _parse_stream_parameters(self, stream_parameters):
        """Parses stream_parameters from dictionary.

        Args:
            stream_parameters: dict of stream parameters
                'receiver': WmmTransceiver, the receiver for the stream
                'access_category': String, the access category to use for the
                    stream. Unset if None.
                'bandwidth': int, bandwidth in mbps for the stream. If set,
                    implies UDP. If unset, implies TCP and unlimited bandwidth.
                'time': int, time in seconds to run stream. Defaults to
                    DEFAULT_STREAM_TIME.

        Returns:
            (receiver, access_category, bandwidth, time) as
            (WmmTransceiver, String, int, int)

        Raises:
            KeyError, if 'receiver' is missing.
        """
        receiver = stream_parameters["receiver"]
        access_category = stream_parameters.get("access_category", None)
        bandwidth = stream_parameters.get("bandwidth", None)
        # Named stream_time so the local does not shadow the time module.
        stream_time = stream_parameters.get("time", DEFAULT_STREAM_TIME)
        return (receiver, access_category, bandwidth, stream_time)
class WmmTransceiverLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes each message with a WmmTransceiver tag."""

    def process(self, msg, kwargs):
        """Prepends "[WmmTransceiver | <identifier>]" to msg.

        The " | <identifier>" part is left out when the adapter's extra data is
        None or has no "identifier" entry. The tagged message is then handed to
        logging.LoggerAdapter.process.
        """
        extra = self.extra
        suffix = ""
        if extra is not None and "identifier" in extra:
            suffix = f" | {extra['identifier']}"
        tagged = f"[WmmTransceiver{suffix}] {msg}"
        return super().process(tagged, kwargs)