blob: 8bd96acb27acb9134e4ca2109cdd1b82747e1bba [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Library to help integrate power sampling into Fuchsia e2e tests."""
import abc
import dataclasses
import enum
import math
import struct
from typing import Any, BinaryIO, Sequence
# Traces use "ticks" which is a hardware dependent time duration
TICKS_PER_NS = 0.024
@dataclasses.dataclass(frozen=True)
class PowerSamplerConfig:
# Directory for samples output
output_dir: str
# Unique metric name, used in output file names.
metric_name: str
# Causes a no-op power sampler to be used.
disabled: bool = False
# Path of the measurepower tool (Optional)
measurepower_path: str | None = None
class _PowerSamplerState(enum.Enum):
INIT = 1
STARTED = 2
STOPPED = 3
class PowerSampler:
"""Power sampling base class.
Usage:
```
sampler:PowerSampler = create_power_sampler(...)
sampler.start()
... gather traces, start and stop the power sampler, create the model ...
sampler.stop()
trace_model_with_power =
power_test_utils.merge_power_data(model, sampler.extract_samples(), outpath)
processor = PowerMetricsProcessor()
processor.process_metrics(trace_model_with_power)
```
"""
def __init__(self, config: PowerSamplerConfig):
"""Creates a PowerSampler from a config.
Args:
config (PowerSamplerConfig): Configuration.
"""
self._state: _PowerSamplerState = _PowerSamplerState.INIT
self._config = config
def start(self) -> None:
"""Starts sampling."""
assert self._state == _PowerSamplerState.INIT
self._state = _PowerSamplerState.STARTED
self._start_impl()
def stop(self) -> None:
"""Stops sampling. Has no effect if never started or already stopped."""
if self._state == _PowerSamplerState.STARTED:
self._state = _PowerSamplerState.STOPPED
self._stop_impl()
def should_generate_load(self) -> bool:
return False
def has_samples(self) -> bool:
return False
def extract_samples(self) -> Sequence["Sample"]:
"""Return recorded samples. May be expensive and require I/O."""
return []
@abc.abstractmethod
def _stop_impl(self) -> None:
pass
@abc.abstractmethod
def _start_impl(self) -> None:
pass
class Sample:
def __init__(
self,
timestamp: int | str,
current: float | str,
voltage: float | str,
aux_current: float | str | None = None,
power: float | str | None = None,
rail_id: int = 1,
rail_name: str | None = None,
) -> None:
self.timestamp = int(timestamp)
self.current = float(current)
self.voltage = float(voltage)
self.aux_current = (
float(aux_current) if aux_current is not None else None
)
self.power = (
float(power) if power is not None else self.current * self.voltage
)
self.rail_id = rail_id
self.rail_name = rail_name
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Sample):
raise NotImplementedError()
return (
self.timestamp == other.timestamp
and self.current == other.current
and self.voltage == other.voltage
and self.aux_current == other.aux_current
and self.power == other.power
and self.rail_id == other.rail_id
and self.rail_name == other.rail_name
)
def __repr__(self) -> str:
return (
f"ts: {self.timestamp}ms, current: {self.current}, "
f"voltage: {self.voltage}, aux: {self.aux_current}, "
f"power: {self.power}, rail_id: {self.rail_id}, rail_name: {self.rail_name}"
)
def append_power_data(
fxt_path: str,
power_samples: Sequence[Sample],
starting_ticks: int,
) -> None:
"""
Append a list of power samples to the trace at `fxt_path`.
Args:
fxt_path: the fxt file to write to
power_samples: the samples to append
starting_ticks: offset from the beginning of the trace in "ticks"
"""
print(f"Aligning Power Trace to start at {starting_ticks} ticks")
with open(fxt_path, "ab") as merged_trace:
# Virtual koids have a top bit as 1, the remaining doesn't matter as long as it's unique.
fake_process_koid = 0x8C01_1EC7_EDDA_7A10 # CollectedData10
fake_thread_koid = 0x8C01_1EC7_EDDA_7A20 # CollectedData20
fake_thread_ref = 0xFF
BYTES_PER_WORD = 8
class InlineString:
def __init__(self, s: str):
self.num_words: int = math.ceil(len(s) / BYTES_PER_WORD)
# Inline fxt ids have their top bit set to 1.
# The remaining bits indicate the number of inline bytes are valid.
self.ref: int = 0x8000 | len(s)
# Pad the word with 0x00 bytes to fill a full multiple of words.
size = self.num_words * BYTES_PER_WORD
assert (
len(s) <= size
), f"String {s} is longer than {size} bytes."
tail_zeros = "\0" * (size - len(s))
b = bytes(s + tail_zeros, "utf-8")
assert (
len(b) == size
), f"Binary string {b!r} must be {size} bytes."
self.data: bytes = b
# See //docs/reference/tracing/trace-format for the below trace format
def thread_record_header(thread_ref: int) -> int:
thread_record_type = 3
thread_record_size_words = 3
return (
thread_ref << 16
| thread_record_size_words << 4
| thread_record_type
)
def kernel_object_record_header(
num_args: int, name_ref: int, obj_type: int, size_words: int
) -> int:
kernel_object_record_header_type = 7
return (
num_args << 40
| name_ref << 24
| obj_type << 16
| size_words << 4
| kernel_object_record_header_type
)
# The a fake process and thread records
merged_trace.write(
thread_record_header(fake_thread_ref).to_bytes(8, "little")
)
merged_trace.write(fake_process_koid.to_bytes(8, "little"))
merged_trace.write(fake_thread_koid.to_bytes(8, "little"))
ZX_OBJ_TYPE_PROCESS = 1
ZX_OBJ_TYPE_THREAD = 2
# Name the fake process
process_name = InlineString("Power Measurements")
merged_trace.write(
kernel_object_record_header(
0,
process_name.ref,
ZX_OBJ_TYPE_PROCESS,
process_name.num_words + 2, # 1 word header, 1 word koid
).to_bytes(8, "little")
)
merged_trace.write(fake_process_koid.to_bytes(8, "little"))
merged_trace.write(process_name.data)
# Name the fake thread
thread_name = InlineString("Power Measurements")
merged_trace.write(
kernel_object_record_header(
0,
thread_name.ref,
ZX_OBJ_TYPE_THREAD,
thread_name.num_words + 2, # 1 word header, 1 word koid
).to_bytes(8, "little")
)
merged_trace.write(fake_thread_koid.to_bytes(8, "little"))
merged_trace.write(thread_name.data)
# Initialization record sets the expected ticks per second.
init_record_type = 1
init_record_size = 2
init_record_header = (init_record_size << 4) | init_record_type
merged_trace.write(init_record_header.to_bytes(8, "little"))
merged_trace.write(
int(TICKS_PER_NS * 1000 * 1000 * 1000).to_bytes(8, "little")
)
def counter_event_header(
name_id: int,
category_id: int,
thread_ref: int,
num_args: int,
record_words: int,
) -> int:
counter_event_type = 1 << 16
event_record_type = 4
return (
(name_id << 48)
| (category_id << 32)
| (thread_ref << 24)
| (num_args << 20)
| counter_event_type
| record_words << 4
| event_record_type
)
# Now write our sample data as counter events into the trace
for sample in power_samples:
category = InlineString("Metrics")
name = InlineString(sample.rail_name or "Metrics")
# We will be providing either 3 or 4 arguments, depending on whether
# or not this sample has raw aux current data in it.
arg_count = 4 if sample.aux_current is not None else 3
words_per_arg = (
category.num_words + name.num_words + 1 # 1 word for header.
)
arg_words = arg_count * words_per_arg
# Counter events can store up to 15 args only.
assert arg_count <= 15
# Emit the counter track
merged_trace.write(
counter_event_header(
name.ref,
category.ref,
0xFF,
arg_count,
# 1 word counter, 1 word ts,
# 2 words inline strings
# |arg_words| words of arguments,
# 1 word counter id = 5 + |arg_words|
5 + arg_words,
).to_bytes(8, "little")
)
timestamp_ticks = int(
(sample.timestamp * TICKS_PER_NS) + starting_ticks
)
assert (
timestamp_ticks >= 0
), f"timestamp_ticks must be positive: {timestamp_ticks} {sample.__dict__}"
merged_trace.write(timestamp_ticks.to_bytes(8, "little"))
merged_trace.write(category.data)
merged_trace.write(name.data)
def double_argument_header(name_ref: int, size: int) -> int:
argument_type = 5
return name_ref << 16 | size << 4 | argument_type
def write_double_argument(
trace_file: BinaryIO, name: str, data: float
) -> None:
# Words are 8 bytes. 2 for header and data. Variable for name.
name_string = InlineString(name)
words_total = 2 + name_string.num_words
trace_file.write(
double_argument_header(
name_string.ref, words_total
).to_bytes(8, "little")
)
trace_file.write(name_string.data)
s = struct.pack("d", data)
trace_file.write(s)
write_double_argument(merged_trace, "Voltage", sample.voltage)
write_double_argument(merged_trace, "Current", sample.current)
write_double_argument(merged_trace, "Power", sample.power)
if sample.aux_current is not None:
write_double_argument(
merged_trace, "Raw Aux", sample.aux_current
)
# Write the counter_id, taken as the power rail's ID.
merged_trace.write(sample.rail_id.to_bytes(8, "little"))