| #!/usr/bin/env python3.4 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import re |
| |
# Matches a reply line that starts with a bracketed timestamp and later
# contains " time=<rtt>"; captures the groups "timestamp" and "rtt".
# NOTE(review): presumably the format printed by `ping -D` -- confirm
# against the caller that produces the output.
RTT_REGEX = re.compile(
    r"^\[(?P<timestamp>\S+)\] .*? time=(?P<rtt>\S+)"
)

# Matches the "<N>% packet loss" fragment anywhere in a line; captures the
# run of non-whitespace characters before "%" as the group "loss".
LOSS_REGEX = re.compile(
    r"(?P<loss>\S+)% packet loss"
)
| |
| |
class PingResult(object):
    """An object that contains the results of running the ping command.

    Every line of the output is scanned for a packet-loss summary
    ("<N>% packet loss") and for timestamped replies
    ("[<timestamp>] ... time=<rtt>"). Lines that contain the trigger text
    ("loss" / "time=") but do not match the expected format are ignored.

    Attributes:
        connected: True if the output has more than one line and the packet
            loss is below 100 percent. False otherwise.
        packet_loss_percentage: The total percentage of packets lost. Stays
            at 100 when no loss summary was found.
        transmission_times: The list of PingTransmissionTimes gathered for
            the replies found in the output.
        rtts: A list-like object enumerating all round-trip-times of
            transmitted packets.
        timestamps: A list-like object enumerating the timestamp of each
            reply, relative to the first reply.
        ping_interarrivals: A list-like object enumerating the amount of time
            between each pair of consecutive replies.
        start_time: The raw timestamp of the first parsed reply, or 0 when
            no reply was parsed.
    """

    def __init__(self, ping_output):
        """Parses the lines of a ping run.

        Args:
            ping_output: A sized iterable of output lines (it is both
                iterated and passed to len()), e.g. a list of strings.

        Raises:
            ValueError: If a matched timestamp, rtt or loss value cannot be
                converted to float.
        """
        self.packet_loss_percentage = 100
        self.transmission_times = []

        # The three views below share self.transmission_times, so they
        # reflect every entry appended by the parsing loop.
        self.rtts = _ListWrap(self.transmission_times, lambda entry: entry.rtt)
        self.timestamps = _ListWrap(
            self.transmission_times, lambda entry: entry.timestamp
        )
        self.ping_interarrivals = _PingInterarrivals(self.transmission_times)

        self.start_time = 0
        for line in ping_output:
            if "loss" in line:
                loss_match = LOSS_REGEX.search(line)
                # "loss" can appear in lines that are not the summary; only
                # update when the percentage is really there.
                if loss_match is not None:
                    self.packet_loss_percentage = float(
                        loss_match.group("loss")
                    )
            if "time=" in line:
                rtt_match = RTT_REGEX.search(line)
                if rtt_match is None:
                    # Reply line without the leading "[timestamp]" prefix:
                    # there is nothing to record, so skip it instead of
                    # failing on a None match.
                    continue
                timestamp = float(rtt_match.group("timestamp"))
                # The first recorded reply defines the time origin. Checking
                # the list (rather than start_time == 0) keeps the origin
                # stable even if that first timestamp happens to be 0.
                if not self.transmission_times:
                    self.start_time = timestamp
                self.transmission_times.append(
                    PingTransmissionTimes(
                        timestamp - self.start_time,
                        float(rtt_match.group("rtt")),
                    )
                )
        self.connected = len(ping_output) > 1 and self.packet_loss_percentage < 100

    def __getitem__(self, item):
        """Dictionary-style access to a few of the attributes.

        Args:
            item: One of "rtt", "connected" or "packet_loss_percentage".

        Returns:
            The value of the corresponding attribute.

        Raises:
            ValueError: If item is any other key.
        """
        if item == "rtt":
            return self.rtts
        if item == "connected":
            return self.connected
        if item == "packet_loss_percentage":
            return self.packet_loss_percentage
        raise ValueError("Invalid key. Please use an attribute instead.")

    def as_dict(self):
        """Returns the results as a dict of plain values.

        The list-like views are copied into real lists and connected is
        reported as 1 or 0.
        """
        return {
            "connected": 1 if self.connected else 0,
            "rtt": list(self.rtts),
            "time_stamp": list(self.timestamps),
            "ping_interarrivals": list(self.ping_interarrivals),
            "packet_loss_percentage": self.packet_loss_percentage,
        }
| |
| |
class PingTransmissionTimes(object):
    """Timing information recorded for one packet sent by the ping command.

    Attributes:
        timestamp: The timestamp the packet started its trip.
        rtt: The round trip time for the packet sent.
    """

    def __init__(self, timestamp, rtt):
        # Plain value holder: both arguments are stored unchanged.
        self.timestamp, self.rtt = timestamp, rtt
| |
| |
| class _ListWrap(object): |
| """A convenient helper class for treating list iterators as native lists.""" |
| |
| def __init__(self, wrapped_list, func): |
| self.__wrapped_list = wrapped_list |
| self.__func = func |
| |
| def __getitem__(self, key): |
| return self.__func(self.__wrapped_list[key]) |
| |
| def __iter__(self): |
| for item in self.__wrapped_list: |
| yield self.__func(item) |
| |
| def __len__(self): |
| return len(self.__wrapped_list) |
| |
| |
| class _PingInterarrivals(object): |
| """A helper class for treating ping interarrivals as a native list.""" |
| |
| def __init__(self, ping_entries): |
| self.__ping_entries = ping_entries |
| |
| def __getitem__(self, key): |
| return ( |
| self.__ping_entries[key + 1].timestamp - self.__ping_entries[key].timestamp |
| ) |
| |
| def __iter__(self): |
| for index in range(len(self.__ping_entries) - 1): |
| yield self[index] |
| |
| def __len__(self): |
| return max(0, len(self.__ping_entries) - 1) |