blob: a9bdee1f3900b63b4e2df3e861b7110b1baf7c3b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from antlion.controllers.monsoon_lib.sampling.engine.assembly_line import BufferList
from antlion.controllers.monsoon_lib.sampling.engine.transformer import ParallelTransformer
from antlion.controllers.monsoon_lib.sampling.engine.transformer import SequentialTransformer
class Tee(SequentialTransformer):
    """Copies main_current readings into a text file as they stream past.

    Every logged sample becomes one line containing the sample time and the
    main current, separated by a single space. The incoming buffer itself is
    forwarded unchanged.

    Attributes:
        _filename: the path of the file that receives the readings.
        _fd: the file object written to; None until on_begin() opens it.
        measure_after_seconds: the number of seconds, counted from the first
            sample seen, that are skipped before logging starts.
    """

    def __init__(self, filename, measure_after_seconds=0):
        """Creates a Tee that logs to the given file.

        Args:
            filename: the path to the file to write the collected data to.
            measure_after_seconds: the number of seconds to skip before
                logging data as part of the measurement.
        """
        super().__init__()
        self.measure_after_seconds = measure_after_seconds
        # Timestamp of the first sample observed; set lazily on first use.
        self._start_time = None
        self._filename = filename
        self._fd = None

    def on_begin(self):
        """Opens the output file, truncating any existing contents."""
        self._fd = open(self._filename, mode='w+')

    def on_end(self):
        """Closes the output file."""
        self._fd.close()

    def _transform_buffer(self, buffer):
        """Writes the reading values to a file.

        Args:
            buffer: A list of HvpmReadings.

        Returns:
            A BufferList holding the untouched input buffer.
        """
        output = self._fd
        for reading in buffer:
            timestamp = reading.sample_time
            if self._start_time is None:
                # The very first sample defines time zero for the skip window.
                self._start_time = timestamp
            elapsed = timestamp - self._start_time
            if elapsed < self.measure_after_seconds:
                continue
            line = '%0.9f %.12f\n' % (timestamp, reading.main_current)
            output.write(line)
            # Flush per sample so the file stays current while measuring.
            output.flush()
        return BufferList([buffer])
class PerfgateTee(SequentialTransformer):
    """Outputs records of nanoseconds,current,voltage to the specified file.

    Similar to Tee, but this version includes voltage, which may help with
    accuracy in the power calculations.

    This output type can be enabled by passing this transformer to the
    transformers kwarg in Monsoon.measure_power():

        # Uses the default Tee
        > monsoon.measure_power(..., output_path=filename)

        # Uses PerfgateTee
        > monsoon.measure_power(..., transformers=[PerfgateTee(filename)])

    Attributes:
        _filename: the name of the file to open.
        _fd: the filestream written to; None until on_begin() opens it.
        measure_after_seconds: the number of seconds, counted from the first
            sample seen, that are skipped before logging starts.
    """

    def __init__(self, filename, measure_after_seconds=0):
        """Creates a PerfgateTee that logs to the given file.

        Args:
            filename: the path to the file to write the collected data to.
            measure_after_seconds: the number of seconds to skip before logging
                data as part of the measurement.
        """
        super().__init__()
        self._filename = filename
        self._fd = None
        self.measure_after_seconds = measure_after_seconds
        # The time of the first sample gathered.
        self._start_time = None

    def on_begin(self):
        """Opens the output file, truncating any existing contents."""
        self._fd = open(self._filename, 'w+')

    def on_end(self):
        """Closes the output file."""
        self._fd.close()

    def _transform_buffer(self, buffer):
        """Writes the reading values to a file.

        Each logged sample is written as one
        "<nanoseconds>,<main_current>,<main_voltage>" line.

        Args:
            buffer: A list of HvpmReadings.

        Returns:
            A BufferList holding the untouched input buffer.
        """
        for sample in buffer:
            if self._start_time is None:
                self._start_time = sample.sample_time
            if (sample.sample_time - self._start_time <
                    self.measure_after_seconds):
                continue
            # Round rather than truncate: seconds * 1e9 is a float product
            # that can land just below the intended integer (for example
            # ...999.9999998), and '%i' alone would then report a timestamp
            # that is one nanosecond too small.
            timestamp_ns = round(sample.sample_time * 1e9)
            self._fd.write(
                '%i,%.6f,%.6f\n' %
                (timestamp_ns, sample.main_current, sample.main_voltage))
            self._fd.flush()
        return BufferList([buffer])
class SampleAggregator(ParallelTransformer):
    """Keeps a running count of samples and a running sum of main current."""

    def __init__(self, start_after_seconds=0):
        """Creates a new SampleAggregator.

        Args:
            start_after_seconds: The number of seconds to wait before gathering
                data. Useful for allowing the device to settle after USB
                disconnect.
        """
        super().__init__()
        self.start_after_seconds = start_after_seconds
        # Timestamp of the first sample observed; set lazily on first use.
        self._start_time = None
        self._sum_currents = 0
        self._num_samples = 0

    def _transform_buffer(self, buffer):
        """Folds the buffer's readings into the running totals.

        Readings that arrive less than start_after_seconds after the first
        reading ever seen are ignored.

        Args:
            buffer: A buffer of H/LvpmReadings.

        Returns:
            The input buffer, unmodified.
        """
        for reading in buffer:
            timestamp = reading.sample_time
            if self._start_time is None:
                # The very first sample defines time zero for the settle window.
                self._start_time = timestamp
            elapsed = timestamp - self._start_time
            if elapsed < self.start_after_seconds:
                continue
            self._num_samples += 1
            self._sum_currents += reading.main_current
        return buffer

    @property
    def num_samples(self):
        """The number of samples counted so far (skipped ones excluded)."""
        return self._num_samples

    @property
    def sum_currents(self):
        """The total sum of current values gathered so far."""
        return self._sum_currents
class DownSampler(SequentialTransformer):
    """Takes in sample outputs and returns a downsampled version of that data.

    Note for speed, the downsampling must occur at a perfect integer divisor of
    the Monsoon's sample rate (5000 hz). The factor is only truncated to an
    int here; it is not checked against the sample rate.
    """
    _MONSOON_SAMPLE_RATE = 5000

    def __init__(self, downsample_factor):
        """Creates a DownSampler Transformer.

        Args:
            downsample_factor: The number of samples averaged together for a
                single output sample. Truncated to an int.

        Raises:
            ValueError: if the truncated factor is smaller than 1.
        """
        super().__init__()
        mean_width = int(downsample_factor)
        if mean_width < 1:
            # A zero or negative width can never form a group; fail here
            # instead of on the first buffer that gets transformed.
            raise ValueError(
                'downsample_factor must be at least 1, got %r' %
                (downsample_factor, ))
        self._mean_width = mean_width
        # Samples carried over from earlier buffers that did not yet fill a
        # complete group of _mean_width samples.
        self._leftovers = []

    def _transform_buffer(self, buffer):
        """Returns the buffer downsampled by an integer factor.

        The algorithm splits data points into two categories:

            tail: The remaining samples where not enough were collected to
                  reach the integer factor for downsampling. The tail is stored
                  in self._leftovers between _transform_buffer calls.

            tailless_buffer: The samples excluding the tail that can be
                             downsampled directly.

        Samples left over from earlier calls are placed in front of the new
        buffer before it is split, so no sample is dropped even when a buffer
        is too short to complete a group on its own.

        Below is a diagram explaining the buffer math:

        input:          input buffer n              input buffer n + 1
                 ╔══════════════════════════╗  ╔══════════════════════════╗
             ... ║ ╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗ ║  ║ ╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗ ║ ...
                 ║ ╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝ ║  ║ ╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝ ║
                 ╚══════════════════════════╝  ╚══════════════════════════╝
                               ▼                             ▼
        alg:     ╔═════════════════════╦════╗  ╔═════════════════════╦════╗
                 ║ ╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗║╔╗╔╗║  ║ ╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗╔╗║╔╗╔╗║
                 ║ ╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝║╚╝╚╝║  ║ ╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝╚╝║╚╝╚╝║
             ... ║   tailless_buffer   ║tail║  ║   tailless_buffer   ║tail║ ...
                 ╚═════════════════════╩════╝  ╚═════════════════════╩════╝
               ──┬───┘ └─┬─┘ ...  └─┬─┘ └────┬─────┘ └─┬─┘ ...  └─┬─┘ └──┬───
                 ╔╗      ╔╗ ╔╗      ╔╗      ╔╗        ╔╗ ╔╗      ╔╗     ╔╗
                 ╚╝      ╚╝ ╚╝      ╚╝      ╚╝        ╚╝ ╚╝      ╚╝     ╚╝
                  └─────────┬────────┘        └──────────┬─────────┘
                            ▼                            ▼
        output:   ╔════════════════╗           ╔════════════════╗
                  ║ ╔╗ ╔╗ ╔╗ ╔╗ ╔╗ ║           ║ ╔╗ ╔╗ ╔╗ ╔╗ ╔╗ ║
                  ║ ╚╝ ╚╝ ╚╝ ╚╝ ╚╝ ║           ║ ╚╝ ╚╝ ╚╝ ╚╝ ╚╝ ║
                  ╚════════════════╝           ╚════════════════╝
                    output buffer n             output buffer n + 1

        Args:
            buffer: A sequence of samples to downsample.

        Returns:
            A numpy array holding one mean per complete group of _mean_width
            samples. The array is empty when no group could be completed.
        """
        width = self._mean_width

        # Put the carried-over samples in front of the new ones. Splitting the
        # combined sequence (instead of slicing `buffer` with an offset that
        # goes negative when the buffer is shorter than the tail) keeps every
        # sample and preserves their order.
        pending = list(self._leftovers) + list(buffer)

        tail_length = len(pending) % width
        usable_count = len(pending) - tail_length

        # Whatever does not fill a whole group waits for the next buffer.
        self._leftovers = pending[usable_count:]

        grouped = np.resize(
            np.array(pending[:usable_count]),
            (usable_count // width, width))
        return np.mean(grouped, axis=1)