| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import json |
| import logging |
| import math |
| import os |
| import shlex |
| import subprocess |
| import threading |
| import time |
| |
| from antlion import context |
| from antlion import logger as acts_logger |
| from antlion import utils |
| from antlion.controllers.android_device import AndroidDevice |
| from antlion.controllers.utils_lib.ssh import connection, settings |
| from antlion.event import event_bus |
| from antlion.event.decorators import subscribe_static |
| from antlion.event.event import TestClassBeginEvent, TestClassEndEvent |
| from antlion.libs.proc import job |
| |
| MOBLY_CONTROLLER_CONFIG_NAME = "IPerfServer" |
| ACTS_CONTROLLER_REFERENCE_NAME = "iperf_servers" |
| KILOBITS = 1024 |
| MEGABITS = KILOBITS * 1024 |
| GIGABITS = MEGABITS * 1024 |
| BITS_IN_BYTE = 8 |
| |
| |
| def create(configs): |
| """Factory method for iperf servers. |
| |
| The function creates iperf servers based on at least one config. |
| If configs only specify a port number, a regular local IPerfServer object |
| will be created. If configs contains ssh settings or and AndroidDevice, |
| remote iperf servers will be started on those devices |
| |
| Args: |
| configs: config parameters for the iperf server |
| """ |
| results = [] |
| for c in configs: |
| if type(c) in (str, int) and str(c).isdigit(): |
| results.append(IPerfServer(int(c))) |
| elif type(c) is dict and "AndroidDevice" in c and "port" in c: |
| results.append(IPerfServerOverAdb(c["AndroidDevice"], c["port"])) |
| elif type(c) is dict and "ssh_config" in c and "port" in c: |
| results.append( |
| IPerfServerOverSsh( |
| settings.from_config(c["ssh_config"]), |
| c["port"], |
| test_interface=c.get("test_interface"), |
| use_killall=c.get("use_killall"), |
| ) |
| ) |
| else: |
| raise ValueError( |
| "Config entry %s in %s is not a valid IPerfServer " |
| "config." % (repr(c), configs) |
| ) |
| return results |
| |
| |
| def get_info(iperf_servers): |
| """Placeholder for info about iperf servers |
| |
| Returns: |
| None |
| """ |
| return None |
| |
| |
| def destroy(iperf_server_list): |
| for iperf_server in iperf_server_list: |
| try: |
| iperf_server.stop() |
| except Exception: |
| logging.exception(f"Unable to properly clean up {iperf_server}.") |
| |
| |
| class IPerfResult(object): |
| def __init__(self, result_path, reporting_speed_units="Mbytes"): |
| """Loads iperf result from file. |
| |
| Loads iperf result from JSON formatted server log. File can be accessed |
| before or after server is stopped. Note that only the first JSON object |
| will be loaded and this funtion is not intended to be used with files |
| containing multiple iperf client runs. |
| """ |
| # if result_path isn't a path, treat it as JSON |
| self.reporting_speed_units = reporting_speed_units |
| if not os.path.exists(result_path): |
| self.result = json.loads(result_path) |
| else: |
| try: |
| with open(result_path, "r") as f: |
| iperf_output = f.readlines() |
| if "}\n" in iperf_output: |
| iperf_output = iperf_output[: iperf_output.index("}\n") + 1] |
| iperf_string = "".join(iperf_output) |
| iperf_string = iperf_string.replace("nan", "0") |
| self.result = json.loads(iperf_string) |
| except ValueError: |
| with open(result_path, "r") as f: |
| # Possibly a result from interrupted iperf run, |
| # skip first line and try again. |
| lines = f.readlines()[1:] |
| self.result = json.loads("".join(lines)) |
| |
| def _has_data(self): |
| """Checks if the iperf result has valid throughput data. |
| |
| Returns: |
| True if the result contains throughput data. False otherwise. |
| """ |
| return ("end" in self.result) and ( |
| "sum_received" in self.result["end"] or "sum" in self.result["end"] |
| ) |
| |
| def _get_reporting_speed( |
| self, network_speed_in_bits_per_second: int | float |
| ) -> float: |
| """Sets the units for the network speed reporting based on how the |
| object was initiated. Defaults to Megabytes per second. Currently |
| supported, bits per second (bits), kilobits per second (kbits), megabits |
| per second (mbits), gigabits per second (gbits), bytes per second |
| (bytes), kilobits per second (kbytes), megabits per second (mbytes), |
| gigabytes per second (gbytes). |
| |
| Args: |
| network_speed_in_bits_per_second: The network speed from iperf in |
| bits per second. |
| |
| Returns: |
| The value of the throughput in the appropriate units. |
| """ |
| speed_divisor = 1 |
| if self.reporting_speed_units[1:].lower() == "bytes": |
| speed_divisor = speed_divisor * BITS_IN_BYTE |
| if self.reporting_speed_units[0:1].lower() == "k": |
| speed_divisor = speed_divisor * KILOBITS |
| if self.reporting_speed_units[0:1].lower() == "m": |
| speed_divisor = speed_divisor * MEGABITS |
| if self.reporting_speed_units[0:1].lower() == "g": |
| speed_divisor = speed_divisor * GIGABITS |
| return network_speed_in_bits_per_second / speed_divisor |
| |
| def get_json(self): |
| """Returns the raw json output from iPerf.""" |
| return self.result |
| |
| @property |
| def error(self): |
| return self.result.get("error", None) |
| |
| @property |
| def avg_rate(self): |
| """Average UDP rate in MB/s over the entire run. |
| |
| This is the average UDP rate observed at the terminal the iperf result |
| is pulled from. According to iperf3 documentation this is calculated |
| based on bytes sent and thus is not a good representation of the |
| quality of the link. If the result is not from a success run, this |
| property is None. |
| """ |
| if not self._has_data() or "sum" not in self.result["end"]: |
| return None |
| bps = self.result["end"]["sum"]["bits_per_second"] |
| return self._get_reporting_speed(bps) |
| |
| @property |
| def avg_receive_rate(self): |
| """Average receiving rate in MB/s over the entire run. |
| |
| This data may not exist if iperf was interrupted. If the result is not |
| from a success run, this property is None. |
| """ |
| if not self._has_data() or "sum_received" not in self.result["end"]: |
| return None |
| bps = self.result["end"]["sum_received"]["bits_per_second"] |
| return self._get_reporting_speed(bps) |
| |
| @property |
| def avg_send_rate(self): |
| """Average sending rate in MB/s over the entire run. |
| |
| This data may not exist if iperf was interrupted. If the result is not |
| from a success run, this property is None. |
| """ |
| if not self._has_data() or "sum_sent" not in self.result["end"]: |
| return None |
| bps = self.result["end"]["sum_sent"]["bits_per_second"] |
| return self._get_reporting_speed(bps) |
| |
| @property |
| def instantaneous_rates(self): |
| """Instantaneous received rate in MB/s over entire run. |
| |
| This data may not exist if iperf was interrupted. If the result is not |
| from a success run, this property is None. |
| """ |
| if not self._has_data(): |
| return None |
| intervals = [ |
| self._get_reporting_speed(interval["sum"]["bits_per_second"]) |
| for interval in self.result["intervals"] |
| ] |
| return intervals |
| |
| @property |
| def std_deviation(self): |
| """Standard deviation of rates in MB/s over entire run. |
| |
| This data may not exist if iperf was interrupted. If the result is not |
| from a success run, this property is None. |
| """ |
| return self.get_std_deviation(0) |
| |
| def get_std_deviation(self, iperf_ignored_interval): |
| """Standard deviation of rates in MB/s over entire run. |
| |
| This data may not exist if iperf was interrupted. If the result is not |
| from a success run, this property is None. A configurable number of |
| beginning (and the single last) intervals are ignored in the |
| calculation as they are inaccurate (e.g. the last is from a very small |
| interval) |
| |
| Args: |
| iperf_ignored_interval: number of iperf interval to ignored in |
| calculating standard deviation |
| |
| Returns: |
| The standard deviation. |
| """ |
| if not self._has_data(): |
| return None |
| instantaneous_rates = self.instantaneous_rates[iperf_ignored_interval:-1] |
| avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates) |
| sqd_deviations = [(rate - avg_rate) ** 2 for rate in instantaneous_rates] |
| std_dev = math.sqrt(math.fsum(sqd_deviations) / (len(sqd_deviations) - 1)) |
| return std_dev |
| |
| |
| class IPerfServerBase(object): |
| # Keeps track of the number of IPerfServer logs to prevent file name |
| # collisions. |
| __log_file_counter = 0 |
| |
| __log_file_lock = threading.Lock() |
| |
| def __init__(self, port: int): |
| self._port = port |
| # TODO(markdr): We shouldn't be storing the log files in an array like |
| # this. Nobody should be reading this property either. Instead, the |
| # IPerfResult should be returned in stop() with all the necessary info. |
| # See aosp/1012824 for a WIP implementation. |
| self.log_files = [] |
| |
| @property |
| def port(self) -> int: |
| raise NotImplementedError("port must be specified.") |
| |
| @property |
| def started(self) -> bool: |
| raise NotImplementedError("started must be specified.") |
| |
| def start(self, extra_args: int = "", tag: str = "") -> None: |
| """Starts an iperf3 server. |
| |
| Args: |
| extra_args: Extra arguments to start iperf server with. |
| tag: Appended to log file name to identify logs from different |
| iperf runs. |
| """ |
| raise NotImplementedError("start() must be specified.") |
| |
| def stop(self) -> str: |
| """Stops the iperf server. |
| |
| Returns: |
| The name of the log file generated from the terminated session. |
| """ |
| raise NotImplementedError("stop() must be specified.") |
| |
| def _get_full_file_path(self, tag: str | None = None) -> str: |
| """Returns the full file path for the IPerfServer log file. |
| |
| Note: If the directory for the file path does not exist, it will be |
| created. |
| |
| Args: |
| tag: The tag passed in to the server run. |
| """ |
| out_dir = self.log_path |
| |
| with IPerfServerBase.__log_file_lock: |
| tags = [tag, IPerfServerBase.__log_file_counter] |
| out_file_name = "IPerfServer,%s.log" % ( |
| ",".join([str(x) for x in tags if x != "" and x is not None]) |
| ) |
| IPerfServerBase.__log_file_counter += 1 |
| |
| file_path = os.path.join(out_dir, out_file_name) |
| self.log_files.append(file_path) |
| return file_path |
| |
| @property |
| def log_path(self) -> str: |
| current_context = context.get_current_context() |
| full_out_dir = os.path.join( |
| current_context.get_full_output_path(), f"IPerfServer{self.port}" |
| ) |
| |
| # Ensure the directory exists. |
| os.makedirs(full_out_dir, exist_ok=True) |
| |
| return full_out_dir |
| |
| |
| def _get_port_from_ss_output(ss_output, pid): |
| pid = str(pid) |
| lines = ss_output.split("\n") |
| for line in lines: |
| if pid in line: |
| # Expected format: |
| # tcp LISTEN 0 5 *:<PORT> *:* users:(("cmd",pid=<PID>,fd=3)) |
| return line.split()[4].split(":")[-1] |
| else: |
| raise ProcessLookupError("Could not find started iperf3 process.") |
| |
| |
| class IPerfServer(IPerfServerBase): |
| """Class that handles iperf server commands on localhost.""" |
| |
| def __init__(self, port=5201): |
| super().__init__(port) |
| self._hinted_port = port |
| self._current_log_file = None |
| self._iperf_process = None |
| self._last_opened_file = None |
| |
| @property |
| def port(self): |
| return self._port |
| |
| @property |
| def started(self): |
| return self._iperf_process is not None |
| |
| def start(self, extra_args="", tag=""): |
| """Starts iperf server on local machine. |
| |
| Args: |
| extra_args: A string representing extra arguments to start iperf |
| server with. |
| tag: Appended to log file name to identify logs from different |
| iperf runs. |
| """ |
| if self._iperf_process is not None: |
| return |
| |
| self._current_log_file = self._get_full_file_path(tag) |
| |
| # Run an iperf3 server on the hinted port with JSON output. |
| command = ["iperf3", "-s", "-p", str(self._hinted_port), "-J"] |
| |
| command.extend(shlex.split(extra_args)) |
| |
| if self._last_opened_file: |
| self._last_opened_file.close() |
| self._last_opened_file = open(self._current_log_file, "w") |
| self._iperf_process = subprocess.Popen( |
| command, stdout=self._last_opened_file, stderr=subprocess.DEVNULL |
| ) |
| for attempts_left in reversed(range(3)): |
| try: |
| self._port = int( |
| _get_port_from_ss_output( |
| job.run("ss -l -p -n | grep iperf").stdout, |
| self._iperf_process.pid, |
| ) |
| ) |
| break |
| except ProcessLookupError: |
| if attempts_left == 0: |
| raise |
| logging.debug("iperf3 process not started yet.") |
| time.sleep(0.01) |
| |
| def stop(self): |
| """Stops the iperf server. |
| |
| Returns: |
| The name of the log file generated from the terminated session. |
| """ |
| if self._iperf_process is None: |
| return |
| |
| if self._last_opened_file: |
| self._last_opened_file.close() |
| self._last_opened_file = None |
| |
| self._iperf_process.terminate() |
| self._iperf_process = None |
| |
| return self._current_log_file |
| |
| def __del__(self): |
| self.stop() |
| |
| |
| class IPerfServerOverSsh(IPerfServerBase): |
| """Class that handles iperf3 operations on remote machines.""" |
| |
| def __init__( |
| self, |
| ssh_settings: settings.SshSettings, |
| port: int, |
| test_interface: str | None = None, |
| use_killall: bool = False, |
| ): |
| super().__init__(port) |
| self.ssh_settings = ssh_settings |
| self.log = acts_logger.create_tagged_trace_logger( |
| f"IPerfServer | {self.ssh_settings.hostname}" |
| ) |
| self._ssh_session = None |
| self.start_ssh() |
| |
| self._iperf_pid = None |
| self._current_tag = None |
| self.hostname = self.ssh_settings.hostname |
| self._use_killall = str(use_killall).lower() == "true" |
| |
| if test_interface: |
| self.test_interface = test_interface |
| else: |
| # A test interface can only be found if an ip address is specified. |
| # A fully qualified hostname will get_interface_based_on_ip to fail, |
| # raising RuntimeError. |
| self.test_interface = utils.get_interface_based_on_ip( |
| self._ssh_session, self.hostname |
| ) |
| |
| @property |
| def port(self) -> int: |
| return self._port |
| |
| @property |
| def started(self) -> bool: |
| return self._iperf_pid is not None |
| |
| def _get_remote_log_path(self) -> str: |
| return f"/tmp/iperf_server_port{self.port}.log" |
| |
| def get_interface_ip_addresses(self, interface: str) -> dict[str, list[str]]: |
| """Gets all of the ip addresses, ipv4 and ipv6, associated with a |
| particular interface name. |
| |
| Args: |
| interface: The interface name on the device, ie eth0 |
| |
| Returns: |
| A list of dictionaries of the various IP addresses. See |
| utils.get_interface_ip_addresses. |
| """ |
| if not self._ssh_session: |
| self.start_ssh() |
| |
| return utils.get_interface_ip_addresses(self._ssh_session, interface) |
| |
| def renew_test_interface_ip_address(self) -> None: |
| """Renews the test interface's IPv4 address. |
| |
| Necessary for changing DHCP scopes during a test. |
| """ |
| if not self._ssh_session: |
| self.start_ssh() |
| utils.renew_linux_ip_address(self._ssh_session, self.test_interface) |
| |
| def get_addr( |
| self, addr_type: str = "ipv4_private", timeout_sec: int | None = None |
| ) -> str: |
| """Wait until a type of IP address on the test interface is available |
| then return it. |
| """ |
| if not self._ssh_session: |
| self.start_ssh() |
| return utils.get_addr( |
| self._ssh_session, self.test_interface, addr_type, timeout_sec |
| ) |
| |
| def _cleanup_iperf_port(self) -> None: |
| """Checks and kills zombie iperf servers occupying intended port.""" |
| iperf_check_cmd = ( |
| "netstat -tulpn | grep LISTEN | grep iperf3" " | grep :{}" |
| ).format(self.port) |
| iperf_check = self._ssh_session.run(iperf_check_cmd, ignore_status=True) |
| iperf_check = iperf_check.stdout |
| if iperf_check: |
| logging.debug(f"Killing zombie server on port {self.port}") |
| iperf_pid = iperf_check.split(" ")[-1].split("/")[0] |
| self._ssh_session.run(f"kill -9 {str(iperf_pid)}") |
| |
| def start( |
| self, extra_args: str = "", tag: str = "", iperf_binary: str | None = None |
| ) -> None: |
| """Starts iperf server on specified machine and port. |
| |
| Args: |
| extra_args: Extra arguments to start iperf server with. |
| tag: Appended to log file name to identify logs from different |
| iperf runs. |
| iperf_binary: Location of iperf3 binary. If none, it is assumed the |
| the binary is in the path. |
| """ |
| if self.started: |
| return |
| |
| if not self._ssh_session: |
| self.start_ssh() |
| self._cleanup_iperf_port() |
| if not iperf_binary: |
| logging.debug( |
| "No iperf3 binary specified. " "Assuming iperf3 is in the path." |
| ) |
| iperf_binary = "iperf3" |
| else: |
| logging.debug(f"Using iperf3 binary located at {iperf_binary}") |
| iperf_command = f"{iperf_binary} -s -J -p {self.port}" |
| |
| cmd = f"{iperf_command} {extra_args} > {self._get_remote_log_path()}" |
| |
| job_result = self._ssh_session.run_async(cmd) |
| self._iperf_pid = job_result.stdout |
| self._current_tag = tag |
| |
| def stop(self) -> str: |
| """Stops the iperf server. |
| |
| Returns: |
| The name of the log file generated from the terminated session. |
| """ |
| if not self.started: |
| return |
| |
| if self._use_killall: |
| self._ssh_session.run("killall iperf3", ignore_status=True) |
| else: |
| self._ssh_session.run_async(f"kill -9 {str(self._iperf_pid)}") |
| |
| iperf_result = self._ssh_session.run(f"cat {self._get_remote_log_path()}") |
| |
| log_file = self._get_full_file_path(self._current_tag) |
| with open(log_file, "w") as f: |
| f.write(iperf_result.stdout) |
| |
| self._ssh_session.run_async(f"rm {self._get_remote_log_path()}") |
| self._iperf_pid = None |
| return log_file |
| |
| def start_ssh(self) -> None: |
| """Starts an ssh session to the iperf server.""" |
| if not self._ssh_session: |
| self._ssh_session = connection.SshConnection(self.ssh_settings) |
| |
| def close_ssh(self) -> None: |
| """Closes the ssh session to the iperf server, if one exists, preventing |
| connection reset errors when rebooting server device. |
| """ |
| if self.started: |
| self.stop() |
| if self._ssh_session: |
| self._ssh_session.close() |
| self._ssh_session = None |
| |
| |
| # TODO(markdr): Remove this after automagic controller creation has been |
| # removed. |
| class _AndroidDeviceBridge(object): |
| """A helper class for connecting serial numbers to AndroidDevices.""" |
| |
| _test_class = None |
| |
| @staticmethod |
| @subscribe_static(TestClassBeginEvent) |
| def on_test_begin(event): |
| _AndroidDeviceBridge._test_class = event.test_class |
| |
| @staticmethod |
| @subscribe_static(TestClassEndEvent) |
| def on_test_end(_): |
| _AndroidDeviceBridge._test_class = None |
| |
| @staticmethod |
| def android_devices(): |
| """A dict of serial -> AndroidDevice, where AndroidDevice is a device |
| found in the current TestClass's controllers. |
| """ |
| if not _AndroidDeviceBridge._test_class: |
| return {} |
| return { |
| device.serial: device |
| for device in _AndroidDeviceBridge._test_class.android_devices |
| } |
| |
| |
| event_bus.register_subscription(_AndroidDeviceBridge.on_test_begin.subscription) |
| event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription) |
| |
| |
| class IPerfServerOverAdb(IPerfServerBase): |
| """Class that handles iperf3 operations over ADB devices.""" |
| |
| def __init__(self, android_device_or_serial, port): |
| """Creates a new IPerfServerOverAdb object. |
| |
| Args: |
| android_device_or_serial: Either an AndroidDevice object, or the |
| serial that corresponds to the AndroidDevice. Note that the |
| serial must be present in an AndroidDevice entry in the ACTS |
| config. |
| port: The port number to open the iperf server on. |
| """ |
| super().__init__(port) |
| self._android_device_or_serial = android_device_or_serial |
| |
| self._iperf_process = None |
| self._current_tag = "" |
| |
| @property |
| def port(self): |
| return self._port |
| |
| @property |
| def started(self): |
| return self._iperf_process is not None |
| |
| @property |
| def _android_device(self): |
| if isinstance(self._android_device_or_serial, AndroidDevice): |
| return self._android_device_or_serial |
| else: |
| return _AndroidDeviceBridge.android_devices()[ |
| self._android_device_or_serial |
| ] |
| |
| def _get_device_log_path(self): |
| return f"~/data/iperf_server_port{self.port}.log" |
| |
| def start(self, extra_args="", tag="", iperf_binary=None): |
| """Starts iperf server on an ADB device. |
| |
| Args: |
| extra_args: A string representing extra arguments to start iperf |
| server with. |
| tag: Appended to log file name to identify logs from different |
| iperf runs. |
| iperf_binary: Location of iperf3 binary. If none, it is assumed the |
| the binary is in the path. |
| """ |
| if self._iperf_process is not None: |
| return |
| |
| if not iperf_binary: |
| logging.debug( |
| "No iperf3 binary specified. " "Assuming iperf3 is in the path." |
| ) |
| iperf_binary = "iperf3" |
| else: |
| logging.debug(f"Using iperf3 binary located at {iperf_binary}") |
| iperf_command = f"{iperf_binary} -s -J -p {self.port}" |
| |
| self._iperf_process = self._android_device.adb.shell_nb( |
| f"{iperf_command} {extra_args} > {self._get_device_log_path()} 2>&1" |
| ) |
| |
| self._iperf_process_adb_pid = "" |
| while len(self._iperf_process_adb_pid) == 0: |
| self._iperf_process_adb_pid = self._android_device.adb.shell( |
| "pgrep iperf3 -n" |
| ) |
| |
| self._current_tag = tag |
| |
| def stop(self): |
| """Stops the iperf server. |
| |
| Returns: |
| The name of the log file generated from the terminated session. |
| """ |
| if self._iperf_process is None: |
| return |
| |
| job.run(f"kill -9 {self._iperf_process.pid}") |
| |
| # TODO(markdr): update with definitive kill method |
| while True: |
| iperf_process_list = self._android_device.adb.shell("pgrep iperf3") |
| if iperf_process_list.find(self._iperf_process_adb_pid) == -1: |
| break |
| else: |
| self._android_device.adb.shell(f"kill -9 {self._iperf_process_adb_pid}") |
| |
| iperf_result = self._android_device.adb.shell( |
| f"cat {self._get_device_log_path()}" |
| ) |
| |
| log_file = self._get_full_file_path(self._current_tag) |
| with open(log_file, "w") as f: |
| f.write(iperf_result) |
| |
| self._android_device.adb.shell(f"rm {self._get_device_log_path()}") |
| |
| self._iperf_process = None |
| return log_file |