blob: a10ec2fe388d0099272d5685134bca90ce5e6eaf [file]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Converts vmstat output to various trace formats.
Expects vmstat to have been invoked with -t for the timestamp column.
Usage:
vmstat.py [options] INPUT > OUTPUT
INPUT can be '-' to operate as a streamed pipe.
"""
import argparse
import dataclasses
import datetime
import json
import logging
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, Optional, Sequence
import trace_tools
_SCRIPT_BASENAME = Path(__file__).name
_LOGGER = logging.getLogger(_SCRIPT_BASENAME)
@dataclasses.dataclass
class ProcessCounts:
    """Process counts parsed from vmstat's 'r' and 'b' columns."""

    # Parsed from the 'r' column (runnable process count).
    running: int
    # Parsed from the 'b' column (blocked process count).
    blocked: int
@dataclasses.dataclass
class MemoryUsage:
    """Memory amounts parsed from vmstat's memory columns."""

    # Parsed from the 'swpd' column.
    swap: int
    # Parsed from the 'free' column.
    free: int
    # Parsed from the 'buff' column.
    buffers: int
    # Parsed from the 'cache' column.
    cache: int
    # Columns that appear in some vmstat modes but are intentionally
    # not parsed here:
    # inactive: int
    # active: int
@dataclasses.dataclass
class SwapRates:
    """Swap I/O rates parsed from vmstat's 'si' and 'so' columns."""

    # Parsed from the 'si' column (swapped in).
    bytes_in_per_second: int
    # Parsed from the 'so' column (swapped out).
    bytes_out_per_second: int
@dataclasses.dataclass
class BlockIORates:
    """Block device I/O rates parsed from vmstat's 'bi' and 'bo' columns."""

    # Parsed from the 'bi' column (blocks received).
    received_per_second: int
    # Parsed from the 'bo' column (blocks sent).
    sent_per_second: int
@dataclasses.dataclass
class SystemEventRates:
    """System event rates parsed from vmstat's 'in' and 'cs' columns."""

    # Parsed from the 'in' column (interrupts).
    interrupts_per_second: int
    # Parsed from the 'cs' column (context switches).
    context_switches_per_second: int
@dataclasses.dataclass
class CPUUsage:
    """CPU time percentages parsed from vmstat's cpu columns."""

    # Parsed from the 'us' column.
    user: int
    # Parsed from the 'sy' column.
    system: int
    # Parsed from the 'id' column.
    idle: int
    # Parsed from the 'wa' column.
    wait_io: int
    # Parsed from the 'st' column.
    stolen: int
    # Parsed from the 'gu' column.
    kvm_guest: int
@dataclasses.dataclass
class VmstatEntry:
    """Represents one line of output from 'vmstat -t'"""

    processes: ProcessCounts
    memory: MemoryUsage
    swap: SwapRates
    block_io: BlockIORates
    system: SystemEventRates
    cpu: CPUUsage
    timestamp: datetime.datetime

    def chrome_trace_events_json(
        self, start_time: datetime.datetime
    ) -> Iterable[trace_tools.TraceEvent]:
        """Yields a set of trace events at a single time.

        Args:
            start_time: the timestamp that anchors t=0 of the trace;
                this entry's offset is computed relative to it.
        """
        # Offset of this sample from the trace start, in microseconds.
        elapsed_us = int(
            (self.timestamp - start_time) / datetime.timedelta(microseconds=1)
        )
        # (event name, value type, value) for every counter this entry
        # carries, in a fixed emission order.
        samples = (
            ("processes.running", "count", self.processes.running),
            ("processes.blocked", "count", self.processes.blocked),
            ("memory.swap", "bytes", self.memory.swap),
            ("memory.free", "bytes", self.memory.free),
            ("memory.buffers", "bytes", self.memory.buffers),
            ("memory.cache", "bytes", self.memory.cache),
            ("swap.in", "bytes_per_second", self.swap.bytes_in_per_second),
            ("swap.out", "bytes_per_second", self.swap.bytes_out_per_second),
            (
                "block.in",
                "bytes_per_second",
                self.block_io.received_per_second,
            ),
            ("block.out", "bytes_per_second", self.block_io.sent_per_second),
            (
                "system.interrupts",
                "count_per_second",
                self.system.interrupts_per_second,
            ),
            (
                "system.context_switches",
                "count_per_second",
                self.system.context_switches_per_second,
            ),
            ("cpu.user", "percent", self.cpu.user),
            ("cpu.system", "percent", self.cpu.system),
            ("cpu.idle", "percent", self.cpu.idle),
            ("cpu.wait_io", "percent", self.cpu.wait_io),
            ("cpu.stolen", "percent", self.cpu.stolen),
            ("cpu.kvm_guest", "percent", self.cpu.kvm_guest),
        )
        for name, value_type, value in samples:
            yield trace_tools.event_json(
                name, "system", elapsed_us, value_type, value
            )
def _parse_data_row(
    line: str, field_map: Dict[str, int]
) -> Optional[VmstatEntry]:
    """Parses one whitespace-separated vmstat data row.

    Args:
        line: a stripped data row of 'vmstat -t' output.
        field_map: maps a header field name to its column index, as
            built from the most recent header row.

    Returns:
        The parsed entry, or None if the line is empty.  Unparseable
        numeric fields are logged and default to 0; an unparseable or
        missing timestamp defaults to datetime.datetime.min.
    """
    if not line:
        return None
    columns = line.split()

    def get_int(name: str, default: int = 0) -> int:
        # Returns the integer value of the named column, or `default`
        # when the column is absent or not an integer.
        idx = field_map.get(name)
        if idx is not None and idx < len(columns):
            val = columns[idx]
            try:
                return int(val)
            except ValueError:
                _LOGGER.error(
                    "Failed to parse integer for field '%s' from value '%s' in line: %s",
                    name,
                    val,
                    line,
                )
                return default
        return default

    # Timestamp handling.
    # vmstat -t output usually appends the timestamp at the end.
    # The header has "UTC" or "timestamp" as the last field name.
    # In the data row, this might span multiple words (Date and Time).
    # Check for None explicitly: using `or` would wrongly discard a
    # valid column index of 0 (0 is falsy).
    ts_idx = field_map.get("UTC")
    if ts_idx is None:
        ts_idx = field_map.get("timestamp")
    timestamp = datetime.datetime.min
    if ts_idx is not None and ts_idx < len(columns):
        timestamp_text = " ".join(columns[ts_idx:])
        try:
            timestamp = datetime.datetime.strptime(
                timestamp_text, "%Y-%m-%d %H:%M:%S"
            )
        except ValueError:
            _LOGGER.error(
                "Failed to parse timestamp '%s' in line: %s",
                timestamp_text,
                line,
            )
            # Maybe different format or missing?  Keep the sentinel
            # minimum timestamp.
    return VmstatEntry(
        processes=ProcessCounts(
            running=get_int("r"),
            blocked=get_int("b"),
        ),
        memory=MemoryUsage(
            swap=get_int("swpd"),
            free=get_int("free"),
            buffers=get_int("buff"),
            cache=get_int("cache"),
        ),
        swap=SwapRates(
            bytes_in_per_second=get_int("si"),
            bytes_out_per_second=get_int("so"),
        ),
        block_io=BlockIORates(
            received_per_second=get_int("bi"),
            sent_per_second=get_int("bo"),
        ),
        system=SystemEventRates(
            interrupts_per_second=get_int("in"),
            context_switches_per_second=get_int("cs"),
        ),
        cpu=CPUUsage(
            user=get_int("us"),
            system=get_int("sy"),
            idle=get_int("id"),
            wait_io=get_int("wa"),
            stolen=get_int("st"),
            kvm_guest=get_int("gu"),
        ),
        timestamp=timestamp,
    )
def _parse_vmstat_output(lines: Iterable[str]) -> Iterator[VmstatEntry]:
    """Parses 'vmstat -t' text into a stream of VmstatEntry.

    vmstat prints a field-name header row before any data and repeats
    it only periodically, so the most recent header's column positions
    are reused for every data row that follows it.
    """
    field_map: Optional[Dict[str, int]] = None
    for line in lines:
        text = line.strip()
        # Skip blanks and '#' comments.
        if not text or text.startswith("#"):
            continue
        tokens = text.split()
        if not tokens:
            continue
        # The category banner row carries no column positions; ignore it.
        if tokens[0] in ("procs", "--procs--"):
            continue
        if tokens[:2] == ["r", "b"]:
            # A field-name header row, starting with the running/blocked
            # process columns.  Rebuild the column map from it; split()
            # already collapsed consecutive whitespace.
            field_map = {name: i for i, name in enumerate(tokens)}
            continue
        if field_map is None:
            # Data rows cannot be interpreted until a header row has
            # established the column positions.
            _LOGGER.warning(f"Skipping line before header was found: {line}")
            continue
        # Otherwise this is a line of trace data with a field map in hand.
        entry = _parse_data_row(text, field_map)
        if entry:
            yield entry
def print_chrome_trace_json(
    trace: Iterator[VmstatEntry],
) -> Iterable[trace_tools.TraceEvent]:
    """Yields chrome trace events for a stream of vmstat entries.

    Despite the name, this is a generator: it yields events rather than
    printing them.  The first entry's timestamp anchors t=0 for the
    whole trace.  Yields nothing when the input stream is empty.
    """
    head = next(trace, None)
    if head is None:
        # Empty trace: nothing to emit.
        return
    origin = head.timestamp
    yield from head.chrome_trace_events_json(origin)
    # The remainder of the stream, all relative to the same origin.
    for entry in trace:
        yield from entry.chrome_trace_events_json(origin)
def _main_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument(
"--metadata",
type=str,
help="Metadata in the form: KEY1:VALUE1,KEY2:VALUE2,...",
)
parser.add_argument(
"--parser-log",
type=Path,
help="Path to a file where parser errors will be logged.",
)
parser.add_argument(
# positional argument
"input",
type=Path,
help="text file of 'vmstat -t' output. Pass '-' to read from stdin.",
)
return parser
_MAIN_ARG_PARSER = _main_arg_parser()
def main(argv: Sequence[str]) -> int:
    """Program entry point: converts vmstat output to chrome trace JSON.

    Args:
        argv: command-line arguments, excluding the program name.

    Returns:
        0 on success.
    """
    args = _MAIN_ARG_PARSER.parse_args(argv)

    # Route parser diagnostics to a file when requested; otherwise the
    # default logging destination applies.
    if args.parser_log:
        logging.basicConfig(
            filename=args.parser_log,
            filemode="w",
            format="%(asctime)s %(levelname)s: %(message)s",
            level=logging.INFO,
        )

    # '-' means read from stdin as a streamed pipe; anything else is a
    # file read whole and split into lines.
    read_stdin = args.input == Path("-")
    vmstat_lines: Iterable[str] = (
        sys.stdin if read_stdin else args.input.read_text().splitlines()
    )

    metadata = trace_tools.metadata_arg_to_dict(args.metadata)
    entries = _parse_vmstat_output(vmstat_lines)
    trace = trace_tools.complete_trace(
        metadata, list(print_chrome_trace_json(entries))
    )
    print(json.dumps(trace, indent=2))
    return 0
if __name__ == "__main__":
    # Run only when executed as a script (not on import); exit with
    # main()'s return code.
    sys.exit(main(sys.argv[1:]))