blob: 94fbf6686949cec2930bbf11f5c8e1fcf609f301 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Power trace metrics."""
import dataclasses
import itertools
import logging
from typing import Callable, MutableSequence, Sequence
from reporting import metrics
from trace_processing import trace_metrics, trace_model, trace_time, trace_utils
from trace_processing.metrics import suspend as suspend_metrics
# Module-level logger, named after this module.
_LOGGER: logging.Logger = logging.getLogger(__name__)
# Process-name prefix used to locate the load generator process whose thread
# exits mark the start of the test workload (see _find_test_start).
_LOAD_GEN = "load_generator"
# Process-name prefix used to locate the process whose suspend events are
# turned into suspend windows (see _find_suspend_windows).
_SAG = "system-activity-governor"
@dataclasses.dataclass
class _Sample:
"""A sample of collected power metrics.
Args:
timestamp: timestamp of sample in nanoseconds since epoch.
voltage: voltage in Volts.
current: current in milliAmpere.
raw_aux: (optional) The raw 16 bit fine aux channel reading from a Monsoon power monitor. Can
optionally be used for log synchronization and alignment
"""
timestamp: int
voltage: float
current: float
raw_aux: int | None
@property
def power(self) -> float:
"""Power in Watts."""
return self.voltage * self.current * 1e-3
def _running_avg(avg: float, value: float, count: int) -> float:
    """Folds one more value into an incrementally maintained mean.

    Args:
        avg: the mean of the values seen before `value`.
        value: the new value to fold in.
        count: number of values seen so far, including `value`.

    Returns:
        The updated mean.
    """
    delta = value - avg
    return avg + delta / count
@dataclasses.dataclass
class _AggregateMetrics:
    """Running min/mean/max of power over a stream of samples.

    Samples are folded in one at a time via process_sample(); nothing but the
    aggregates below is retained.

    Attributes:
        sample_count: number of power metric samples processed so far.
        max_power: maximum power in Watts over all samples, None until the
            first sample is processed.
        mean_power: average power in Watts over all samples.
        min_power: minimum power in Watts over all samples, None until the
            first sample is processed.
    """

    sample_count: int = 0
    max_power: float | None = None
    mean_power: float = 0
    min_power: float | None = None

    # Stable format for descriptions of aggregate metrics. Unannotated, so it
    # is a plain class attribute rather than a dataclass field.
    DESCRIPTION_BASE = "Power usage sampled during test"

    def process_sample(self, sample: _Sample) -> None:
        """Folds one sample into the aggregates.

        Args:
            sample: A sample of power metrics.
        """
        self.sample_count += 1
        power = sample.power

        if self.max_power is None:
            self.max_power = power
        else:
            self.max_power = max(self.max_power, power)

        self.mean_power = _running_avg(
            self.mean_power, power, self.sample_count
        )

        if self.min_power is None:
            self.min_power = power
        else:
            self.min_power = min(self.min_power, power)

    @property
    def is_empty(self) -> bool:
        """True until at least one sample has been processed."""
        return self.max_power is None or self.min_power is None

    def _build_expl(self, condition: str, aggregate: str) -> str:
        """Formats the explanatory text for one power metric.

        Args:
            condition: optional qualifier inserted after the base description;
                omitted when empty.
            aggregate: name of the aggregate, appended after a comma.
        """
        base = _AggregateMetrics.DESCRIPTION_BASE
        prefix = f"{base} {condition}" if condition else base
        return f"{prefix}, {aggregate}"

    def to_fuchsiaperf_results(
        self, tag: str, condition: str
    ) -> list[metrics.TestCaseResult]:
        """Converts the aggregates into fuchsiaperf test case results.

        Must only be called once at least one sample has been processed.

        Args:
            tag: a descriptive word to add to the end of metric names, e.g. "suspend"
            condition: a descriptive string to add to the explanatory text used for these metrics,
                e.g. "while device is suspended"

        Returns:
            The min, mean and max power results, in that order.
        """
        assert self.min_power is not None and self.max_power is not None
        suffix = f"_{tag}" if tag else ""
        # (label, value, aggregate name) for each reported metric.
        # TODO(cmasone): Add MedianPower metrics
        reported = (
            ("MinPower", self.min_power, "minimum"),
            ("MeanPower", self.mean_power, "mean"),
            ("MaxPower", self.max_power, "maximum"),
        )
        return [
            metrics.TestCaseResult(
                label=label + suffix,
                unit=metrics.Unit.watts,
                values=[value],
                doc=self._build_expl(condition, aggregate),
            )
            for label, value, aggregate in reported
        ]
class PowerMetricsProcessor(trace_metrics.MetricsProcessor):
    """Computes aggregate power consumption metrics.

    Given a trace containing power samples, separately computes aggregate power usage metrics for:
      * The duration of the entire workload captured by the trace.
      * Any periods of suspension for the device.
      * Periods during which the device is NOT suspended (iff suspends are detected).
    """

    def __init__(
        self,
        good_suspend_pred: Callable[[trace_time.Window], bool] | None = None,
    ) -> None:
        """
        Args:
            good_suspend_pred: Optional predicate for which suspend windows should be included
                in metrics calculation. If provided, must return True for any window the caller
                wishes to take into account, False for those that should be ignored.
        """
        self._good_suspend_pred = good_suspend_pred

    @property
    def event_patterns(self) -> set[str]:
        """Event patterns for this processor; the suspend metrics' patterns."""
        return suspend_metrics.EVENT_PATTERNS

    @staticmethod
    def _sample_from_event(me: trace_model.CounterEvent) -> _Sample | None:
        """Builds a power sample from a counter event, or None if it has no power data.

        An event counts as a power sample only if it carries both a "Voltage"
        and a "Current" argument.
        """
        # These args are set in _append_power_data()
        # found in //src/tests/end_to_end/power/power_test_utils.py
        if "Voltage" not in me.args or "Current" not in me.args:
            return None
        # The aux reading is optional. The lookup and the presence check must
        # use the same key; accept both spellings so the reading is neither
        # silently dropped nor the cause of a KeyError.
        # NOTE(review): confirm which spelling the harness actually writes.
        raw_aux = next(
            (
                int(me.args[key])
                for key in ("Raw Aux", "Raw_Aux")
                if key in me.args
            ),
            None,
        )
        return _Sample(
            timestamp=me.start.to_epoch_delta().to_nanoseconds(),
            voltage=float(me.args["Voltage"]),
            current=float(me.args["Current"]),
            raw_aux=raw_aux,
        )

    def process_metrics(
        self, model: trace_model.Model
    ) -> MutableSequence[metrics.TestCaseResult]:
        """Calculate power metrics, excluding power/trace sync signals.

        In order to sync power measurements with a system trace, our test harness
        generates some structured CPU load (and corresponding power consumption data).
        This portion of the data must be excluded from metrics calculation.

        Args:
            model: In-memory representation of a merged power and system trace.

        Returns:
            List of metrics results for this test case: overall power metrics
            first, then "suspend" and "running" metrics when samples exist for
            them. Empty if no test start or no power samples are found.
        """
        test_start = _find_test_start(model)
        if test_start is None:
            _LOGGER.info(
                "No load_generator scheduling records present. Power data may not have been "
                "merged into the model."
            )
            return []

        # Only data after the sync signal takes part in the calculation.
        post_sync_model = model.slice(test_start)
        metrics_events = trace_utils.filter_events(
            post_sync_model.all_events(),
            category="Metrics",
            name="Metrics",
            type=trace_model.CounterEvent,
        )
        suspend_windows = [
            window
            for window in _find_suspend_windows(post_sync_model)
            if self.is_good_suspend_window(window)
        ]
        _LOGGER.info(f"Identified suspend windows: {suspend_windows}")

        power_metrics = _AggregateMetrics()
        running_power_metrics = _AggregateMetrics()
        suspend_power_metrics = _AggregateMetrics()
        for me in metrics_events:
            sample = self._sample_from_event(me)
            if sample is None:
                continue
            power_metrics.process_sample(sample)
            # Every sample lands in exactly one of the suspend/running buckets.
            if any(me.start in window for window in suspend_windows):
                suspend_power_metrics.process_sample(sample)
            else:
                running_power_metrics.process_sample(sample)

        if power_metrics.is_empty:
            _LOGGER.warning(
                "No power metric records after load_generator sync signal. "
                "There may have been an error processing power or trace data."
            )
            return []

        results = power_metrics.to_fuchsiaperf_results("", "")
        # If the system was suspended, also report suspended and non-suspended
        # (running) power metrics.
        if not suspend_power_metrics.is_empty:
            results.extend(
                suspend_power_metrics.to_fuchsiaperf_results(
                    "suspend", "while device is suspended"
                )
            )
            # All samples may fall inside suspend windows; converting an empty
            # aggregate would trip the assertion in to_fuchsiaperf_results().
            if not running_power_metrics.is_empty:
                results.extend(
                    running_power_metrics.to_fuchsiaperf_results(
                        "running", "while device is awake"
                    )
                )
        return results

    def is_good_suspend_window(self, window: trace_time.Window) -> bool:
        """Returns True if `window` should be included in suspend metrics.

        Every window is accepted when no predicate was supplied at construction.
        """
        return self._good_suspend_pred is None or self._good_suspend_pred(
            window
        )
def _find_test_start(model: trace_model.Model) -> trace_time.TimePoint | None:
    """Identify the point at which the test workload began.

    In order to sync power measurements with a system trace, our test harness
    runs a process on-device that generates structured CPU load. This process
    is named `load_generator.cm`. The first TimePoint after all the threads of
    this process exit is the first moment that data should be used for metrics
    calculation.

    Args:
        model: In-memory representation of a merged power and system trace.

    Returns:
        The start time of the latest context switch in which a load generator
        thread was switched out in the DEAD state, or None if there is no load
        generator process or no such record.
    """
    load_generator: trace_model.Process | None = next(
        (
            proc
            for proc in model.processes
            if proc.name and proc.name.startswith(_LOAD_GEN)
        ),
        None,
    )
    if not load_generator:
        return None

    # A set keeps the per-record membership test O(1).
    load_generator_tids = {t.tid for t in load_generator.threads}

    context_switches = trace_utils.filter_records(
        itertools.chain.from_iterable(model.scheduling_records.values()),
        trace_model.ContextSwitch,
    )
    thread_exit_times = (
        r.start
        for r in context_switches
        if r.outgoing_tid in load_generator_tids
        and r.outgoing_state == trace_model.ThreadState.ZX_THREAD_STATE_DEAD
    )
    # Only the latest exit matters, so take the maximum rather than sorting.
    return max(thread_exit_times, default=None)
def _find_suspend_windows(
    model: trace_model.Model,
) -> Sequence[trace_time.Window]:
    """Collect the time windows during which the device was suspended.

    Windows are built from the suspend events (as selected by
    suspend_metrics.filter_sag_suspend_events) that belong to the first
    process whose name starts with the system-activity-governor prefix.

    Args:
        model: In-memory representation of a system trace.

    Returns:
        Windows of time during which the device was suspended; empty if no
        matching process is present.
    """
    sag_process: trace_model.Process | None = next(
        (
            candidate
            for candidate in model.processes
            if candidate.name and candidate.name.startswith(_SAG)
        ),
        None,
    )
    if not sag_process:
        _LOGGER.info(f"No {_SAG} process; skipping suspend metrics")
        return []

    sag_suspends = (
        event
        for event in suspend_metrics.filter_sag_suspend_events(model)
        if event.pid == sag_process.pid
    )

    windows: list[trace_time.Window] = []
    for event in sag_suspends:
        # An event without a duration has no end point to build a window from.
        if event.duration is None:
            _LOGGER.warning("Skipping suspend event with empty duration")
            continue
        window_end = event.start + event.duration
        windows.append(trace_time.Window(event.start, window_end))
    return windows