#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from enum import Enum
from typing import Dict, List, Tuple
import itertools
import logging
import trace_processing.trace_model as trace_model
import trace_processing.trace_time as trace_time
_LOGGER: logging.Logger = logging.getLogger("CpuBreakdownMetricsProcessor")
# Default cut-off for the CPU usage percentage. Any thread whose usage of a
# given CPU falls below this percentage won't be listed in the results. Callers
# can pass in a custom cutoff.
DEFAULT_PERCENT_CUTOFF = 0.0
class CpuBreakdownMetricsProcessor:
"""
Breaks down CPU metrics into a free-form metrics format.
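
    Example (hypothetical usage; assumes a trace_model.Model obtained elsewhere,
    e.g. from the trace_processing importing utilities):

        processor = CpuBreakdownMetricsProcessor(model, percent_cutoff=0.1)
        breakdown = processor.process_metrics()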
"""
def __init__(
self,
model: trace_model.Model,
output_path: str = "",
percent_cutoff: float = DEFAULT_PERCENT_CUTOFF,
) -> None:
self._model: trace_model.Model = model
self._percent_cutoff = percent_cutoff
self._output_path: str = output_path
        # Maps TID to a Dict of CPU number to total duration (ms) on that CPU.
        # E.g. for TID 1001 that ran on CPUs 0, 1, and 3, this would be:
        # {1001: {0: 1123.123, 1: 123123.123, 3: 1231.23}}
self._tid_to_durations: Dict[int, Dict[int, float]] = {}
self._breakdown: List[Dict[str, object]] = []
self._tid_to_thread_name: Dict[int, str] = {}
self._tid_to_process_name: Dict[int, str] = {}
# Map of CPU to total duration used (ms).
self._cpu_to_total_duration: Dict[int, float] = {}
def _calculate_duration_per_cpu(
self,
cpu: int,
records: List[trace_model.ContextSwitch],
) -> None:
"""
Calculates the total duration for each thread, on a particular CPU.
Uses a List of sorted ContextSwitch records to sum up the duration for each thread.
It's possible that consecutive records do not have matching incoming_tid and outgoing_tid.
"""
        if not records:
            # Nothing to do for a CPU with no ContextSwitch records.
            return
        smallest_timestamp = self._timestamp_ms(records[0].start)
largest_timestamp = self._timestamp_ms(records[-1].start)
total_duration = largest_timestamp - smallest_timestamp
self._cpu_to_total_duration[cpu] = total_duration
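        # This per-CPU total (the span between the first and last context switch
        # on this CPU) later serves as the denominator when computing each
        # thread's CPU percentage.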
for prev_record, curr_record in itertools.pairwise(records):
            # Check that the previous ContextSwitch's incoming_tid ("this thread is
            # starting work on this CPU") matches the current ContextSwitch's
            # outgoing_tid ("this thread is being switched away from"). If so, there
            # is a duration to calculate; otherwise a record is likely missing from
            # the trace.
            if prev_record.tid != curr_record.outgoing_tid:
                # TODO(https://fxbug.dev/331458411): Record how often skipping happens.
                _LOGGER.info(
                    "Possibly missing a ContextSwitch record in trace."
                )
# Purposely skip saving idle thread durations.
elif prev_record.is_idle():
continue
else:
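                # prev_record.tid ran on this CPU from prev_record.start until
                # curr_record.start, at which point it was switched out.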
start_ts = self._timestamp_ms(prev_record.start)
stop_ts = self._timestamp_ms(curr_record.start)
duration = stop_ts - start_ts
assert duration >= 0
                if curr_record.outgoing_tid in self._tid_to_thread_name:
                    # Add duration to the total duration for that tid and CPU.
                    cpu_durations = self._tid_to_durations.setdefault(
                        curr_record.outgoing_tid, {}
                    )
                    cpu_durations[cpu] = cpu_durations.get(cpu, 0.0) + duration
@staticmethod
def _timestamp_ms(timestamp: trace_time.TimePoint) -> float:
"""
Return timestamp in ms.
"""
return timestamp.to_epoch_delta().to_milliseconds_f()
def process_metrics(self) -> List[Dict[str, object]]:
"""
        Given the trace model:
        - Iterates through all the SchedulingRecords and calculates the duration
          for each process's threads, saving the durations by CPU.
        - Returns the durations as a list of free-form metrics.
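
        Example of the returned breakdown (illustrative values only):
            [{"process_name": "my_process", "thread_name": "my_thread",
              "cpu": 1, "percent": 12.345, "duration": 678.9}, ...]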
"""
# Map tids to names.
for p in self._model.processes:
for t in p.threads:
self._tid_to_process_name[t.tid] = p.name
self._tid_to_thread_name[t.tid] = t.name
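        # Threads whose tid never appears in the model's processes are skipped
        # below, since they can't be attributed to a named process or thread.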
# Calculate durations for each CPU for each tid.
for cpu, records in self._model.scheduling_records.items():
self._calculate_duration_per_cpu(
cpu,
sorted(
(
r
for r in records
if isinstance(r, trace_model.ContextSwitch)
),
key=lambda record: record.start,
),
)
# Calculate the percent of time the thread spent on this CPU,
# compared to the total CPU duration.
# If the percent spent is at or above our cutoff, add metric to
# breakdown.
for tid, breakdown in self._tid_to_durations.items():
if tid in self._tid_to_thread_name:
for cpu, duration in breakdown.items():
percent = round(
duration / self._cpu_to_total_duration[cpu] * 100, 3
)
if percent >= self._percent_cutoff:
metric = {
"process_name": self._tid_to_process_name[tid],
"thread_name": self._tid_to_thread_name[tid],
"cpu": cpu,
"percent": percent,
"duration": duration,
}
self._breakdown.append(metric)
# Sort metrics by CPU (desc) and percent (desc).
self._breakdown = sorted(
self._breakdown,
key=lambda m: (m["cpu"], m["percent"]),
reverse=True,
)
return self._breakdown