#!/usr/bin/env fuchsia-vendored-python
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Fuchsia power test utility library."""
# keep-sorted start
import abc
import csv
import dataclasses
import enum
import itertools
import logging
import operator
import os
import pathlib
import signal
import struct
import subprocess
import time
# keep-sorted end
# keep-sorted start
from collections import deque
from collections.abc import Iterable, Mapping
from trace_processing import trace_metrics, trace_model, trace_time
from trace_processing.metrics import power as power_metrics
from typing import Sequence
# keep-sorted end
SAMPLE_INTERVAL_NS = 200000
# Traces use "ticks", a hardware-dependent time unit.
# 0.024 ticks/ns corresponds to a 24 MHz tick clock.
TICKS_PER_NS = 0.024
_LOGGER = logging.getLogger(__name__)
# The measurepower tool's path. The tool is expected to periodically output
# power measurements into a csv file, with _PowerCsvHeaders.all() columns.
_MEASUREPOWER_PATH_ENV_VARIABLE = "MEASUREPOWER_PATH"
def weighted_average(arr: Iterable[float], weights: Iterable[int]) -> float:
return sum(
itertools.starmap(
operator.mul,
zip(arr, weights, strict=True),
)
) / sum(weights)
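# As a quick illustration (hypothetical numbers): weighted_average([1.0, 3.0], [1, 3])
# evaluates to (1.0 * 1 + 3.0 * 3) / (1 + 3) == 2.5.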
# We don't have numpy in the vendored python libraries, so we'll have to roll our own
# correlation functionality, which will be slooooow.
#
# Normally, we'd compute the cross correlation and then take the argmax to line up the signals the
# closest. Instead we'll do the argmax and the correlation at the same time to save on allocations.
#
# Precondition: len(signal) must be >= len(feature) for accurate results
def cross_correlate_arg_max(
signal: Sequence[float], feature: Sequence[float]
) -> tuple[float, int]:
"""Cross correlate two 1d signals and return the maximum correlation. Correlation is only done
where the signals overlap completely.
Returns:
(correlation, idx)
where correlation is the maximum correlation between the signals and idx is
the offset at which it occurred.
"""
assert len(signal) >= len(feature)
# Slide our feature across the signal and compute the dot product at each point
return max(
# Produce items of the form [(correlation1, idx1), (correlation2, idx2), ...] of which we
# want the highest correlation.
map(
lambda base: (
# Fancy itertools based dot product
sum(
itertools.starmap(
operator.mul,
zip(
itertools.islice(signal, base, base + len(feature)),
feature,
strict=True,
),
)
),
base,
),
# Take a sliding window dot product. E.g. if we have the arrays
# [1,2,3,4] and [1,2]
#
# The sliding window dot product is
# [
# [1,2] o [1,2],
# [2,3] o [1,2],
# [3,4] o [1,2],
# ]
range(len(signal) - len(feature) + 1),
)
)
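# For example (hypothetical numbers):
#   cross_correlate_arg_max([0, 1, 2, 3, 2, 1], [2, 3, 2]) == (17, 2)
# because the sliding dot products are [7, 14, 17, 14] and the largest, 17,
# occurs at offset 2.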
# Constants class
#
# One of two formats is expected for the CSV data. If the first column is named
# simply "Timestamp", the scripts are expected to synthesize nominal timestamps
# for the samples. If it is named "Mandatory Timestamp" instead, the data was
# captured by a script that has already synthesized its own timestamps,
# accounting for dropped samples, calibration samples, and the power monitor's
# deviations from nominal. Those timestamps should simply be taken and used
# as is.
#
# In the second case, the script can (if asked to) also capture a 4th column of
# data representing the raw 16 bit readings from the fine auxiliary current
# channel. When present, these readings provide an alternative method for
# aligning the timelines of the power monitor data with the trace data.
#
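# For illustration, the two recognized header layouts are:
#   Timestamp,Current,Voltage
#   Mandatory Timestamp,Current,Voltage,Raw Aux
# where the "Raw Aux" column is optional in the second form.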
class _PowerCsvHeaders(enum.StrEnum):
MANDATORY_TIMESTAMP = "Mandatory Timestamp"
TIMESTAMP = "Timestamp"
CURRENT = "Current"
VOLTAGE = "Voltage"
AUX_CURRENT = "Raw Aux"
@staticmethod
def assert_header(header: list[str]) -> None:
assert header[1] == _PowerCsvHeaders.CURRENT
assert header[2] == _PowerCsvHeaders.VOLTAGE
if header[0] == _PowerCsvHeaders.MANDATORY_TIMESTAMP:
assert len(header) == 3 or header[3] == _PowerCsvHeaders.AUX_CURRENT
else:
assert header[0] == _PowerCsvHeaders.TIMESTAMP
# TODO(b/320778225): Make this class private and stateless by changing all callers to use
# MetricsSampler directly.
class PowerMetricsProcessor(trace_metrics.MetricsProcessor):
"""Power metric processor to extract performance data from raw samples.
Args:
power_samples_path: path to power samples CSV file.
"""
def __init__(self, power_samples_path: str) -> None:
self._power_samples_path: str = power_samples_path
self._power_metrics: power_metrics.AggregatePowerMetrics = (
power_metrics.AggregatePowerMetrics()
)
# Implements MetricsProcessor.process_metrics. Model is None to support legacy users.
def process_metrics(
self, model: trace_model.Model | None = None
) -> Sequence[trace_metrics.TestCaseResult]:
"""Converts CSV samples into aggregate metrics."""
with open(self._power_samples_path, "r") as f:
reader = csv.reader(f)
header = next(reader)
_PowerCsvHeaders.assert_header(header)
for row in reader:
sample = power_metrics.PowerMetricSample(
timestamp=int(row[0]),
# Column order per _PowerCsvHeaders: Timestamp, Current, Voltage[, Raw Aux].
current=float(row[1]),
voltage=float(row[2]),
raw_aux=int(row[3]) if len(row) >= 4 else None,
)
self._power_metrics.process_sample(sample)
return self._power_metrics.to_fuchsiaperf_results()
# DEPRECATED: Use process_metrics return value.
# TODO(b/320778225): Remove once downstream users are refactored.
def to_fuchsiaperf_results(self) -> list[trace_metrics.TestCaseResult]:
"""Returns the processed TestCaseResults"""
return self._power_metrics.to_fuchsiaperf_results()
# DEPRECATED: Use process_metrics + trace_metrics.TestCaseResult.write_fuchsiaperf_json.
# TODO(b/320778225): Remove once downstream users are refactored.
def write_fuchsiaperf_json(
self,
output_dir: str,
metric_name: str,
trace_results: list[trace_metrics.TestCaseResult] | None = None,
) -> str:
"""Writes the fuchsia_perf JSON file to specified directory.
Args:
output_dir: path to the output directory to write to.
metric_name: name of the power metric being measured.
trace_results: trace-based metrics to include in the output.
Returns:
The path to the fuchsiaperf.json file generated by trace processing.
"""
results = self.to_fuchsiaperf_results() + (trace_results or [])
fuchsiaperf_json_path = os.path.join(
output_dir,
f"{metric_name}_power.fuchsiaperf.json",
)
trace_metrics.TestCaseResult.write_fuchsiaperf_json(
results=results,
test_suite=f"fuchsia.power.{metric_name}",
output_path=pathlib.Path(fuchsiaperf_json_path),
)
return fuchsiaperf_json_path
@dataclasses.dataclass(frozen=True)
class PowerSamplerConfig:
# Directory for samples output
output_dir: str
# Unique metric name, used in output file names.
metric_name: str
# Path of the measurepower tool (Optional)
measurepower_path: str | None = None
class _PowerSamplerState(enum.Enum):
INIT = 1
STARTED = 2
STOPPED = 3
class PowerSampler:
"""Power sampling base class.
Usage:
```
sampler: PowerSampler = create_power_sampler(...)
sampler.start()
... interact with the device, also gather traces ...
sampler.stop()
sampler.metrics_processor().process_and_save(model, output_path="my_test.fuchsiaperf.json")
```
Alternatively, the sampler can be combined with the results of other metric processors like this:
```
power_sampler = PowerSampler(...)
processor = MetricsProcessorsSet([
CpuMetricsProcessor(aggregates_only=True),
FpsMetricsProcessor(aggregates_only=False),
MyCustomProcessor(...),
power_sampler.metrics_processor(),
])
... gather traces, start and stop the power sampler, create the model ...
processor.process_and_save(model, output_path="my_test.fuchsiaperf.json")
```
"""
def __init__(self, config: PowerSamplerConfig):
"""Creates a PowerSampler from a config.
Args:
config (PowerSamplerConfig): Configuration.
"""
self._state: _PowerSamplerState = _PowerSamplerState.INIT
self._config = config
def start(self) -> None:
"""Starts sampling."""
assert self._state == _PowerSamplerState.INIT
self._state = _PowerSamplerState.STARTED
self._start_impl()
def stop(self) -> None:
"""Stops sampling. Has no effect if never started or already stopped."""
if self._state == _PowerSamplerState.STARTED:
self._state = _PowerSamplerState.STOPPED
self._stop_impl()
# DEPRECATED: Use .metric_processor().process_metrics() instead.
# TODO(b/320778225): Remove once downstream users are refactored.
def to_fuchsiaperf_results(self) -> Sequence[trace_metrics.TestCaseResult]:
"""Returns power metrics TestCaseResults"""
assert self._state == _PowerSamplerState.STOPPED
return self.metrics_processor().process_metrics(
model=trace_model.Model()
)
def metrics_processor(self) -> trace_metrics.MetricsProcessor:
"""Returns a MetricsProcessor instance associated with the sampler."""
return self._metrics_processor_impl()
def should_generate_load(self) -> bool:
return False
def has_samples(self) -> bool:
return False
def merge_power_data(self, model: trace_model.Model, fxt_path: str) -> None:
pass
@abc.abstractmethod
def _stop_impl(self) -> None:
pass
@abc.abstractmethod
def _start_impl(self) -> None:
pass
@abc.abstractmethod
def _metrics_processor_impl(self) -> trace_metrics.MetricsProcessor:
pass
class _NoopPowerSampler(PowerSampler):
"""A no-op power sampler, used in environments where _MEASUREPOWER_PATH_ENV_VARIABLE isn't set."""
def __init__(self, config: PowerSamplerConfig) -> None:
"""Constructor.
Args:
config (PowerSamplerConfig): Configuration.
"""
super().__init__(config)
def _start_impl(self) -> None:
pass
def _stop_impl(self) -> None:
pass
def _metrics_processor_impl(self) -> trace_metrics.MetricsProcessor:
return trace_metrics.ConstantMetricsProcessor(results=[])
class _RealPowerSampler(PowerSampler):
"""Wrapper for the measurepower command-line tool."""
def __init__(self, config: PowerSamplerConfig):
"""Constructor.
Args:
config (PowerSamplerConfig): Configuration.
"""
super().__init__(config)
assert config.measurepower_path
self._measurepower_proc: subprocess.Popen[str] | None = None
self._csv_output_path = os.path.join(
self._config.output_dir,
f"{self._config.metric_name}_power_samples.csv",
)
self._sampled_data = False
def _start_impl(self) -> None:
_LOGGER.info("Starting power sampling")
self._start_power_measurement()
self._await_first_sample()
self._sampled_data = True
def _stop_impl(self) -> None:
_LOGGER.info("Stopping power sampling...")
self._stop_power_measurement()
_LOGGER.info("Power sampling stopped")
def _metrics_processor_impl(self) -> trace_metrics.MetricsProcessor:
return trace_metrics.MetricsProcessorsSet(
(
PowerMetricsProcessor(power_samples_path=self._csv_output_path),
power_metrics.PowerMetricsProcessor(),
)
)
def _start_power_measurement(self) -> None:
assert self._config.measurepower_path
cmd = [
self._config.measurepower_path,
"-format",
"csv",
"-out",
self._csv_output_path,
]
_LOGGER.debug(f"Power measurement cmd: {cmd}")
self._measurepower_proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
def _await_first_sample(self, timeout_sec: float = 60) -> None:
_LOGGER.debug(f"Awaiting 1st power sample (timeout_sec={timeout_sec})")
assert self._measurepower_proc
proc = self._measurepower_proc
csv_path = self._csv_output_path
deadline = time.time() + timeout_sec
while time.time() < deadline:
if proc.poll() is not None:
stdout = proc.stdout.read() if proc.stdout else None
stderr = proc.stderr.read() if proc.stderr else None
raise RuntimeError(
f"Measure power failed to start with status "
f"{proc.returncode} stdout: {stdout} "
f"stderr: {stderr}"
)
if os.path.exists(csv_path) and os.path.getsize(csv_path):
_LOGGER.debug(f"Received 1st power sample in {csv_path}")
return
time.sleep(1)
raise TimeoutError(
f"Timed out after {timeout_sec} seconds while waiting for power samples"
)
def _stop_power_measurement(self, timeout_sec: float = 60) -> None:
_LOGGER.debug("Stopping the measurepower process...")
proc = self._measurepower_proc
assert proc
proc.send_signal(signal.SIGINT)
result = proc.wait(timeout_sec)
if result:
stdout = proc.stdout.read() if proc.stdout else None
stderr = proc.stderr.read() if proc.stderr else None
raise RuntimeError(
f"Measure power failed after being stopped with status "
f"{proc.returncode} stdout: {stdout} "
f"stderr: {stderr}"
)
_LOGGER.debug("measurepower process stopped.")
def should_generate_load(self) -> bool:
return True
def has_samples(self) -> bool:
return self._sampled_data
def merge_power_data(self, model: trace_model.Model, fxt_path: str) -> None:
merge_power_data(model, self._csv_output_path, fxt_path)
def create_power_sampler(
config: PowerSamplerConfig, fallback_to_stub: bool = True
) -> PowerSampler:
"""Creates a power sampler.
If neither `config.measurepower_path` nor the environment variable named by
`_MEASUREPOWER_PATH_ENV_VARIABLE` is set, creates a no-op power sampler (or
raises a RuntimeError if `fallback_to_stub` is False).
"""
measurepower_path = config.measurepower_path or os.environ.get(
_MEASUREPOWER_PATH_ENV_VARIABLE
)
if not measurepower_path:
if not fallback_to_stub:
raise RuntimeError(
f"{_MEASUREPOWER_PATH_ENV_VARIABLE} env variable must be set"
)
_LOGGER.warning(
f"{_MEASUREPOWER_PATH_ENV_VARIABLE} env variable not set. "
"Using a no-op power sampler instead."
)
return _NoopPowerSampler(config)
else:
config = dataclasses.replace(
config, measurepower_path=measurepower_path
)
return _RealPowerSampler(config)
def read_fuchsia_trace_cpu_usage(
model: trace_model.Model,
) -> Mapping[int, Sequence[tuple[trace_time.TimePoint, float]]]:
"""
Reads through the given fuchsia trace and extracts per-cpu scheduling data.
Args:
model: the model to extract cpu usage data from
Returns:
{cpu: [(timestamp, usage), ...]}
where the timestamp is a trace_time.TimePoint and usage is 0.0 or 1.0 depending
on whether a process was scheduled for that interval.
"""
scheduling_intervals = {}
for cpu, intervals in model.scheduling_records.items():
scheduling_intervals[cpu] = list(
map(
lambda record: (record.start, 0.0 if record.is_idle() else 1.0),
# Drop the waking records; they don't count towards cpu usage.
filter(
lambda record: isinstance(
record, trace_model.ContextSwitch
),
intervals,
),
)
)
# The records are _probably_ sorted as is, but there's no guarantee of it.
# Let's guarantee it.
scheduling_intervals[cpu].sort(key=lambda kv: kv[0])
return scheduling_intervals
class Sample:
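"""A single power-monitor reading parsed from the samples CSV.
The timestamp is in nanoseconds; aux_current, when present, is the raw
reading from the fine auxiliary current channel.
"""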
def __init__(
self,
timestamp: str | int,
current: str,
voltage: str,
aux_current: str | None,
) -> None:
self.timestamp = int(timestamp)
self.current = float(current)
self.voltage = float(voltage)
self.aux_current = (
float(aux_current) if aux_current is not None else None
)
def read_power_samples(power_trace_path: str) -> list[Sample]:
"""Returns the list of power samples parsed from the power CSV."""
samples: list[Sample] = []
with open(power_trace_path, "r") as power_csv:
reader = csv.reader(power_csv)
header = next(reader)
sample_count = 0
if header[0] == "Mandatory Timestamp":
if len(header) < 4:
create_sample = lambda line, count: Sample(
line[0], line[1], line[2], None
)
else:
create_sample = lambda line, count: Sample(
line[0], line[1], line[2], line[3]
)
else:
create_sample = lambda line, count: Sample(
count * SAMPLE_INTERVAL_NS, line[1], line[2], None
)
for line in reader:
samples.append(create_sample(line, sample_count))
sample_count += 1
return samples
def append_power_data(
fxt_path: str,
power_samples: list[Sample],
starting_ticks: int,
) -> None:
"""
Given a list of voltage and current samples spaced 200us apart and a starting timestamp, append
them to the trace at `fxt_path`.
Args:
fxt_path: the fxt file to write to
power_samples: the samples to append
starting_ticks: offset from the beginning of the trace in "ticks"
"""
with open(fxt_path, "ab") as merged_trace:
# Virtual koids have the top bit set to 1; the remaining bits don't matter as
# long as they're unique.
fake_process_koid = 0x8C01_1EC7_EDDA_7A10 # CollectedData10
fake_thread_koid = 0x8C01_1EC7_EDDA_7A20 # CollectedData20
fake_thread_ref = 0xFF
def inline_string_ref(string: str) -> int:
# Inline fxt ids have their top bit set to 1. The remaining bits indicate the number of
# inline bytes.
return 0x8000 | len(string)
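# For example, inline_string_ref("Metrics") == 0x8000 | 7 == 0x8007, and
# inline_string_ref("Power Measurements") == 0x8000 | 18 == 0x8012.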
# See //docs/reference/tracing/trace-format for the below trace format
def thread_record_header(thread_ref: int) -> int:
thread_record_type = 3
thread_record_size_words = 3
return (
thread_ref << 16
| thread_record_size_words << 4
| thread_record_type
)
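# As a worked example, thread_record_header(0xFF) packs to
# 0xFF << 16 | 3 << 4 | 3 == 0x00FF_0033.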
def kernel_object_record_header(
num_args: int, name_ref: int, obj_type: int, size_words: int
) -> int:
kernel_object_record_header_type = 7
return (
num_args << 40
| name_ref << 24
| obj_type << 16
| size_words << 4
| kernel_object_record_header_type
)
# Write the fake process and thread records
merged_trace.write(
thread_record_header(fake_thread_ref).to_bytes(8, "little")
)
merged_trace.write(fake_process_koid.to_bytes(8, "little"))
merged_trace.write(fake_thread_koid.to_bytes(8, "little"))
ZX_OBJ_TYPE_PROCESS = 1
ZX_OBJ_TYPE_THREAD = 2
# Name the fake process
merged_trace.write(
kernel_object_record_header(
0,
inline_string_ref("Power Measurements"),
ZX_OBJ_TYPE_PROCESS,
5, # 1 word header, 1 word koid, 3 words for name stream
).to_bytes(8, "little")
)
merged_trace.write(fake_process_koid.to_bytes(8, "little"))
merged_trace.write(b"Power Measurements\0\0\0\0\0\0")
# Name the fake thread
merged_trace.write(
kernel_object_record_header(
0,
inline_string_ref("Power Measurements"),
ZX_OBJ_TYPE_THREAD,
5, # 1 word header, 1 word koid, 3 words for name stream
).to_bytes(8, "little")
)
merged_trace.write(fake_thread_koid.to_bytes(8, "little"))
merged_trace.write(b"Power Measurements\0\0\0\0\0\0")
def counter_event_header(
name_id: int,
category_id: int,
thread_ref: int,
num_args: int,
record_words: int,
) -> int:
counter_event_type = 1 << 16
event_record_type = 4
return (
(name_id << 48)
| (category_id << 32)
| (thread_ref << 24)
| (num_args << 20)
| counter_event_type
| record_words << 4
| event_record_type
)
# We write all our data to the same counter
COUNTER_ID = 0x000_0000_0000_0001
# Now write our sample data as counter events into the trace
for sample in power_samples:
# We will be providing either 3 or 4 arguments, depending on whether
# or not this sample has raw aux current data in it. Each argument
# takes 3 words of storage.
arg_count = 4 if sample.aux_current is not None else 3
arg_words = arg_count * 3
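# E.g. a sample without raw aux data has arg_count == 3 and arg_words == 9,
# so the counter event record below is 5 + 9 == 14 words long.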
# Emit the counter track
merged_trace.write(
counter_event_header(
inline_string_ref("Metrics"),
inline_string_ref("Metrics"),
0xFF,
arg_count,
# 1 word counter, 1 word ts,
# 2 words inline strings
# |arg_words| words of arguments,
# 1 word counter id = 5 + |arg_words|
5 + arg_words,
).to_bytes(8, "little")
)
timestamp_ticks = int(
(sample.timestamp * TICKS_PER_NS) + starting_ticks
)
merged_trace.write(timestamp_ticks.to_bytes(8, "little"))
# Inline strings need to be 0 padded to a multiple of 8 bytes.
merged_trace.write(b"Metrics\0")
merged_trace.write(b"Metrics\0")
def double_argument_header(name_ref: int, size: int) -> int:
argument_type = 5
return name_ref << 16 | size << 4 | argument_type
# Write the Voltage
merged_trace.write(
double_argument_header(
inline_string_ref("Voltage"), 3
).to_bytes(8, "little")
)
merged_trace.write(b"Voltage\0")
data = [sample.voltage]
s = struct.pack("d" * len(data), *data)
merged_trace.write(s)
# Write the Current
merged_trace.write(
double_argument_header(
inline_string_ref("Current"), 3
).to_bytes(8, "little")
)
merged_trace.write(b"Current\0")
data = [sample.current]
s = struct.pack("d" * len(data), *data)
merged_trace.write(s)
# Write the Power
merged_trace.write(
double_argument_header(inline_string_ref("Power"), 3).to_bytes(
8, "little"
)
)
merged_trace.write(b"Power\0\0\0")
data = [sample.current * sample.voltage]
s = struct.pack("d" * len(data), *data)
merged_trace.write(s)
# Write the raw aux current, if present.
if sample.aux_current is not None:
merged_trace.write(
double_argument_header(
inline_string_ref("Raw Aux"), 3
).to_bytes(8, "little")
)
merged_trace.write(b"Raw Aux\0")
data = [sample.aux_current]
s = struct.pack("d" * len(data), *data)
merged_trace.write(s)
# Write the counter_id
merged_trace.write(COUNTER_ID.to_bytes(8, "little"))
def build_usage_samples(
scheduling_intervals: Mapping[
int, Sequence[tuple[trace_time.TimePoint, float]]
]
) -> dict[int, list[float]]:
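"""Converts per-cpu scheduling intervals into fixed-interval usage samples.
Given, for each cpu, a list of (timestamp, usage) pairs where usage is 0.0
or 1.0, returns, for each cpu, a list of average usage values over
consecutive SAMPLE_INTERVAL_NS windows, idle-padded so every cpu has the
same number of samples. For instance (hypothetical numbers), a cpu that is
busy for the first half of a window and idle for the second half yields a
sample of 0.5 for that window.
"""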
usage_samples = {}
# Our per cpu records likely don't all start and end at the same time.
# We'll pad the other cpu tracks with idle time on either side.
max_len = 0
earliest_ts = min(x[0][0] for x in scheduling_intervals.values())
for cpu, intervals in scheduling_intervals.items():
# The power samples are a fixed interval apart. We'll synthesize cpu
# usage samples that also track over the same interval in this array.
cpu_usage_samples = []
# To make the conversion, let's start by converting our list of
# [(start_time, work)] into [(duration, work)]. We could probably do this
# in one pass, but doing this conversion first makes the logic easier
# to reason about.
(prev_ts, prev_work) = intervals[0]
# Idle pad the beginning of our intervals to start from a fixed timestamp.
weighted_work = deque([(prev_ts - earliest_ts, 0.0)])
for ts, work in intervals[1:]:
weighted_work.append((ts - prev_ts, prev_work))
(prev_ts, prev_work) = (ts, work)
# Finally, to get our fixed sample intervals, we'll use our [(duration,
# work)] list and pop chunks of work `sample_interval` ticks at a time.
# Then once we've accumulated enough weighted work to fill the
# schedule, we'll take the weighted average and call that our cpu usage
# for the interval.
usage: list[float] = []
durations: list[int] = []
interval_duration_remaining = trace_time.TimeDelta(SAMPLE_INTERVAL_NS)
for duration, work in weighted_work:
if interval_duration_remaining - duration >= trace_time.TimeDelta():
# This duration doesn't fill or finish a full sample interval,
# just append it to the accumulator lists.
usage.append(work)
durations.append(duration.to_nanoseconds())
interval_duration_remaining -= duration
else:
# We have enough work to record a sample. Insert what we need
# to top off the interval and add it to cpu_usage_samples
partial_duration = interval_duration_remaining
usage.append(work)
durations.append(partial_duration.to_nanoseconds())
duration -= partial_duration
average = weighted_average(usage, durations)
cpu_usage_samples.append(average)
# Now use up the rest of the duration. Note that it's possible that this
# duration is longer than a full sampling interval, in which case we
# should synthesize multiple samples.
remaining_duration = trace_time.TimeDelta(
duration.to_nanoseconds() % SAMPLE_INTERVAL_NS
)
num_extra_intervals = int(
duration.to_nanoseconds() / SAMPLE_INTERVAL_NS
)
cpu_usage_samples.extend(
[work for _ in range(0, num_extra_intervals)]
)
# Reset our accumulators with the leftover bits
usage = [work]
durations = [remaining_duration.to_nanoseconds()]
interval_duration_remaining = (
trace_time.TimeDelta(SAMPLE_INTERVAL_NS)
- remaining_duration
)
usage_samples[cpu] = cpu_usage_samples
max_len = max(max_len, len(cpu_usage_samples))
# Idle pad the end of our intervals so they all contain the same number of samples.
for cpu, samples in usage_samples.items():
samples.extend([0.0 for _ in range(len(samples), max_len)])
return usage_samples
def merge_power_data(
model: trace_model.Model, power_trace_path: str, fxt_path: str
) -> None:
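"""Aligns the power samples with the trace and appends them to the fxt file.
Args:
model: the trace model, used to extract per-cpu scheduling data
power_trace_path: path to the power samples CSV
fxt_path: the fxt trace file to append the aligned power data to
The alignment approach is described in the comments below.
"""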
# We'll start by reading in the fuchsia cpu data from the trace model
scheduling_intervals = read_fuchsia_trace_cpu_usage(model)
power_samples = read_power_samples(power_trace_path)
# We can't just append the power data to the beginning of the trace. The
# trace starts before the power collection starts at some unknown offset.
# We'll have to first find this offset.
#
# We know that CPU usage and current draw are highly correlated. So we'll
# have the test start by running a cpu intensive workload in a known
# pattern for the first few seconds before starting the test. We can then
# synthesize cpu usage over the same duration and attempt to correlate the
# signals.
# The power samples come in at fixed intervals. We'll construct cpu usage
# data in the same intervals to attempt to correlate it at each offset.
# We'll assume the highest correlated offset is the delay the power
# sampling started at and we'll insert the samples starting at that
# timepoint.
earliest_ts = min(x[0][0] for x in scheduling_intervals.values())
# The trace model gives us per-cpu information about which processes start at
# which time. To compare it to the power samples we need to convert it into
# something of the form ["usage_sample_1", "usage_sample_2", ...] where
# each "usage_sample_n" is the cpu usage over a 200us duration.
usage_samples = build_usage_samples(scheduling_intervals)
# Now take the average cpu usage across each cpu to get overall cpu usage which should
# correlate to our power/current samples.
merged = [samples for _, samples in usage_samples.items()]
avg_cpu_combined = [0.0] * len(merged[0])
for i, _ in enumerate(avg_cpu_combined):
total: float = 0
for sample_list in merged:
total += sample_list[i]
avg_cpu_combined[i] = total / len(merged)
# Finally, we can get the cross correlation between power and cpu usage. We run a known cpu
# heavy workload in the first 5ish seconds of the test so we limit our signal correlation to
# that portion. Power and CPU readings can be offset in either direction, but shouldn't be
# separated by more than a second.
# Due to limits on the maximum size of our traces, there is potential for particularly busy
# traces to not contain the entire first 5 seconds of the test. In order to ensure the
# correlation logic operates correctly, we need to compare consistent counts of samples for
# power and CPU readings. Thus, we take a subset of the two datasets, limited to a maximum
# of ~5 seconds' worth of samples (5 kHz * 5 seconds = 25000 samples), and compare it to the
# first ~4 seconds (80% of ~5 seconds) of the opposite dataset to attempt to match them up.
# This amounts to nominally comparing the first 4 seconds of power samples to the first 5
# seconds of CPU readings, and then vice-versa.
max_sequence_samples = 25000
signal_samples = min(
max_sequence_samples, len(avg_cpu_combined), len(power_samples)
)
# Ensures feature list is always shorter than the number of signal samples
feature_samples = int(0.8 * signal_samples)
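# With full-length captures this nominally compares a 20000-sample feature
# (~4 seconds) against a 25000-sample signal (~5 seconds), in each direction.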
(
power_after_cpu_correlation,
power_after_cpu_correlation_idx,
) = cross_correlate_arg_max(
avg_cpu_combined[0:signal_samples],
[s.current for s in power_samples[0:feature_samples]],
)
(
cpu_after_power_correlation,
cpu_after_power_correlation_idx,
) = cross_correlate_arg_max(
[s.current for s in power_samples[0:signal_samples]],
avg_cpu_combined[0:feature_samples],
)
starting_ticks = 0
if power_after_cpu_correlation >= cpu_after_power_correlation:
offset_ns = power_samples[power_after_cpu_correlation_idx].timestamp
print(f"Delaying power readings by {offset_ns/1000/1000}ms")
starting_ticks = int(
(earliest_ts + trace_time.TimeDelta(offset_ns))
.to_epoch_delta()
.to_nanoseconds()
* TICKS_PER_NS
)
else:
offset_ns = power_samples[cpu_after_power_correlation_idx].timestamp
print(f"Delaying CPU trace by {offset_ns/1000/1000}ms")
starting_ticks = int(
(earliest_ts - trace_time.TimeDelta(offset_ns))
.to_epoch_delta()
.to_nanoseconds()
* TICKS_PER_NS
)
print(f"Aligning Power Trace to start at {starting_ticks} ticks")
append_power_data(fxt_path, power_samples, starting_ticks)