blob: 013a683ccd38997313aa867044e4d008e3ef02ad [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Converts ifconfig (linux) output to various trace formats.
Specifically, this reads the output of ifconfig_loop.sh,
which calls ifconfig repeatedly between printing timestamps.
Non-linux variants of ifconfig are not supported.
Currently, this outputs in chrome-trace format only.
Usage:
ifconfig_trace.py [options] INPUT > OUTPUT
INPUT can be '-' to operate as a streamed pipe.
"""
import argparse
import dataclasses
import datetime
import json
import sys
from pathlib import Path
from typing import Dict, Iterable, Iterator, Sequence
import trace_tools
_SCRIPT_BASENAME = Path(__file__).name
@dataclasses.dataclass
class TransmissionData:
interface_name: str
tx_packets: int
tx_bytes: int
rx_packets: int
rx_bytes: int
timestamp: datetime.datetime
def chrome_trace_events_json(
self, start_time: datetime.datetime, prev: "TransmissionData"
) -> Iterable[trace_tools.TraceEvent]:
"""Yields a set of trace events at a single time."""
assert self.interface_name == prev.interface_name
tdelta_us = int(
(self.timestamp - start_time) / datetime.timedelta(microseconds=1)
)
def event(
name: str, value_type: str, value: int
) -> trace_tools.TraceEvent:
return trace_tools.event_json(
f"{self.interface_name}.{name}",
"network",
tdelta_us,
value_type,
value,
)
# ifconfig tx/rx data is cumulative, so we need compute differences
# since the last sample.
yield event("rx.packets", "count", self.rx_packets - prev.rx_packets)
yield event("rx.bytes", "count", self.rx_bytes - prev.rx_bytes)
yield event("tx.packets", "count", self.tx_packets - prev.tx_packets)
yield event("tx.bytes", "count", self.tx_bytes - prev.tx_bytes)
def _parse_ifconfig_loop_output(
lines: Iterable[str],
) -> Iterator[TransmissionData]:
# Lines need to be grouped together by time.
# The format from ifconfig_loop.sh is as follows:
#
# TIME: <timestamp>
# [ifconfig output]
#
# where ifconfig output looks like any number of:
# interface_name: ...
# data ...
# data ...
# ...
# <blank line, marking end of data>
interface_name: str = ""
rx_packets: int = 0
rx_bytes: int = 0
tx_packets: int = 0
tx_bytes: int = 0
current_time: datetime.datetime
for line in lines:
if line.startswith("TIME: "):
current_time = datetime.datetime.strptime(
line.strip().removeprefix("TIME: "), "%Y-%m-%d %H:%M:%S"
)
continue
if not line.rstrip():
# blank line marks the end of interface section
yield TransmissionData(
interface_name=interface_name,
rx_packets=rx_packets,
rx_bytes=rx_bytes,
tx_packets=tx_packets,
tx_bytes=tx_bytes,
timestamp=current_time,
)
continue
if not line.startswith(" "):
# other unindented lines start an interface section
interface_name, _, _ = line.partition(":")
continue
# the remaining lines are data for the last named interface
stripped_line = line.strip()
if stripped_line.startswith("RX packets"):
# Looks like:
# RX packets ##packets## bytes ##bytes## (### GiB)
_, _, rx_packets_str, _, rx_bytes_str, _, _ = stripped_line.split()
rx_packets = int(rx_packets_str)
rx_bytes = int(rx_bytes_str)
elif stripped_line.startswith("TX packets"):
# Looks like:
# TX packets ##packets## bytes ##bytes## (### GiB)
_, _, tx_packets_str, _, tx_bytes_str, _, _ = stripped_line.split()
tx_packets = int(tx_packets_str)
tx_bytes = int(tx_bytes_str)
# else: drop other data we don't care about
def print_chrome_trace_json(
trace: Iterator[TransmissionData],
) -> Iterable[trace_tools.TraceEvent]:
prev: Dict[str, TransmissionData] = {}
try:
first: TransmissionData = next(trace)
except StopIteration:
# if trace is empty, abort
return
# Keep track of previous by interface name, so we can report the
# numeric differences.
prev[first.interface_name] = first
start_time = first.timestamp
yield from first.chrome_trace_events_json(start_time, first)
# Stream the remainder
for t in trace:
if t.interface_name not in prev: # first occurrence
prev[t.interface_name] = t
yield from t.chrome_trace_events_json(
start_time, prev[t.interface_name]
)
prev[t.interface_name] = t
def _main_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument(
"--metadata",
type=str,
help="Metadata in the form: KEY1:VALUE1,KEY2:VALUE2,...",
)
parser.add_argument(
# positional argument
"input",
type=Path,
help="output of 'ifconfig_loop.sh'. Pass '-' to read from stdin.",
)
return parser
_MAIN_ARG_PARSER = _main_arg_parser()
def main(argv: Sequence[str]) -> int:
args = _MAIN_ARG_PARSER.parse_args(argv)
metadata = trace_tools.metadata_arg_to_dict(args.metadata)
if args.input == Path("-"):
ifconfig_lines = sys.stdin # is Iterable[str]
else:
ifconfig_lines = args.input.read_text().splitlines()
transmission_data_entries = _parse_ifconfig_loop_output(ifconfig_lines)
trace = trace_tools.complete_trace(
metadata, list(print_chrome_trace_json(transmission_data_entries))
)
print(json.dumps(trace, indent=2))
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))