| #!/usr/bin/env python3 |
| # |
| # Copyright 2024 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import argparse |
| import os |
| import sys |
| import inspect |
| |
| # Check our environment variables. We may need to extend our path in order to |
| # find all of the libraries we will need for our imports. |
| extra_paths = os.environ.get("MONSOON_PYTHON_LIBS", None) |
| if extra_paths is not None: |
| path_list = [os.path.expanduser(x) for x in extra_paths.split(";")] |
| sys.path.extend(path_list) |
| |
| |
| # Now parse our args. Users may extend the path using arguments as well. |
| def ParseArgs(): |
| parser = argparse.ArgumentParser( |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| description="Captures power monitor data from a Monsoon HV Power Monitor", |
| epilog=inspect.cleandoc( |
| """ |
| Sample replacement behaviors: |
| |
| When calibration samples or dropped samples are encountered during capture, users may |
| select from one of four different behaviors for each of the types of samples in need of |
| replacement (calibration vs. dropped). They are: |
| |
| omit : Remove the samples from the output completely. |
| zero : Output samples with fixed values of zero for voltage and current. |
| repeat : Repeat the value of the last valid sample (this is the default). |
| sentinel : Output samples with a fixed value of (mA, V, raw_aux) == (1000, 12, 1000). |
| |
| Hermetic environment issues: |
| |
| Note that when running tests directly in the hermetic Fuchsia development environment, |
| additional paths may need to be passed to the script in order for it to function |
| properly. Notably, the Monsoon Power Monitor python library (found here |
| https://github.com/msoon/PyMonsoon), and libusb (which the Monsoon library depends on) |
| may be missing from the environment. |
| |
| Passing the -p command line argument to the script can be difficult when it is being |
| invoked automatically by the testing framework. In cases like this, users may make use |
| of the MONSOON_PYTHON_LIBS environment variable in order to work around the issue. It |
| contains a ";" separated list of paths which will be added to the location that the |
| script will search during imports. |
| |
| For example, if a user had checked out the Monsoon python library at `~/monsoon`, and had |
| used apt-get to install python's libusb support into their local system, the could set |
| their environment using: |
| |
| export MONSOON_PYTHON_LIBS="~/monsoon;/usr/lib/python3/dist-packages" |
| |
| Before invoking the test. In addition to getting the paths configured correctly to find |
| the extra libraries, the script will need to be told the serial number of the specific |
| Monsoon instance they are supposed to talk to. While this can be passed on the command |
| line, the MONSOON_SERIAL environment variable can also be used to configure this |
| parameter if access to the command line invocation of the script is impractical. If the |
| user's Monsoon was serial number 12345, the export would be: |
| |
| export MONSOON_SERIAL=12345 |
| """ |
| ), |
| ) |
| |
| parser.add_argument( |
| "-sn", |
| "--serialno", |
| type=int, |
| default=None, |
| help="The serial number of the Monsoon to use.", |
| ) |
| parser.add_argument( |
| "-format", "--out_format", help="ignored, we only support csv" |
| ) |
| |
| parser.add_argument( |
| "-p", |
| "--syspath", |
| default=[], |
| action="append", |
| help="Add one or more paths to the locations we look for python libraries. Can be used to " |
| "locate libraries like libusb or the Monsoon python library which may not be present in " |
| "the hermetic fuchsia environment", |
| ) |
| |
| parser.add_argument( |
| "-out", |
| "--csv_out", |
| required=True, |
| help="defines the file to output csv to", |
| ) |
| |
| parser.add_argument( |
| "-dsh", |
| "--dropped_sample_handling", |
| choices=["omit", "zero", "repeat", "sentinel"], |
| default="repeat", |
| help="Defines the behavior to use when dealing with detected dropped samples", |
| ) |
| |
| parser.add_argument( |
| "-csh", |
| "--calibration_sample_handling", |
| choices=["omit", "zero", "repeat", "sentinel"], |
| default="repeat", |
| help="Defines the behavior to use when dealing with inline calibration samples", |
| ) |
| |
| parser.add_argument( |
| "-a", |
| "--raw_aux", |
| action="store_true", |
| default=False, |
| help="When passed, causes the script to capture and output the raw values of the aux channel" |
| " which can be useful for synchronization purposes", |
| ) |
| |
| args = parser.parse_args() |
| |
| if args.serialno is None: |
| args.serialno = os.environ.get("MONSOON_SERIAL", None) |
| |
| if args.serialno is None: |
| print( |
| "Missing require Monsoon serial number. Either set the MONSOON_SERIAL environment" |
| "variable, or pass the serial number to use with --serialno" |
| ) |
| parser.print_help() |
| exit(-1) |
| |
| return parser.parse_args() |
| |
| |
| ARGS = ParseArgs() |
| sys.path.extend(ARGS.syspath) |
| |
| import Monsoon.HVPM as HVPM |
| import Monsoon.Operations as op |
| import Monsoon.pmapi as pmapi |
| import time |
| import struct |
| import signal |
| import pprint as pp |
| import enum |
| |
| |
| class SampleHandling(enum.Enum): |
| OMIT = 1 |
| ZERO = 2 |
| REPEAT = 3 |
| SENTINEL = 4 |
| |
| |
| def LookupSampleHandling(name): |
| return SampleHandling.__members__[name.upper()] |
| |
| |
| class Sample: |
| def __init__(self, unpacked_data): |
| self.main_coarse = unpacked_data[0] |
| self.main_fine = unpacked_data[1] |
| self.usb_coarse = unpacked_data[2] |
| self.usb_fine = unpacked_data[3] |
| self.aux_coarse = unpacked_data[4] |
| self.aux_fine = unpacked_data[5] |
| self.main_aux_voltage = unpacked_data[6] |
| self.usb_voltage = unpacked_data[7] |
| self.main_gain = unpacked_data[8] |
| self.usb_gain = unpacked_data[9] |
| |
| def __str__(self): |
| return pp.pformat(self.__dict__) |
| |
| |
| class Packet: |
| def __init__(self, data): |
| hdr = struct.unpack_from("HBB", data, 0) |
| self.dropped_count = hdr[0] |
| self.flags = hdr[1] |
| self.sample_count = hdr[2] |
| |
| sample_payloads = struct.iter_unpack(">HHHHhhHHBB", data[4:]) |
| self.samples = [Sample(x) for x in sample_payloads] |
| |
| def __str__(self): |
| x = "%12s : %5d 0x%04x\n" % ( |
| "dropped_count", |
| self.dropped_count, |
| self.dropped_count, |
| ) |
| x += "%12s : %5d 0x%04x\n" % ("flags", self.flags, self.flags) |
| x += "%12s : %5d 0x%04x\n" % ( |
| "sample_count", |
| self.sample_count, |
| self.sample_count, |
| ) |
| for s in self.samples: |
| x += str(s) + "\n" |
| return x |
| |
| |
| class AvgWindow: |
| def __init__(self, size): |
| self.data = [0.0 for x in range(size)] |
| self.ndx = 0 |
| self.sum = 0 |
| self.avg = None |
| |
| def AddVal(self, val): |
| self.sum += val - self.data[self.ndx] |
| self.data[self.ndx] = val |
| self.ndx += 1 |
| if self.ndx >= len(self.data): |
| self.ndx = 0 |
| self.avg = self.sum / len(self.data) |
| |
| def val(self): |
| return self.avg |
| |
| |
| class CalibrationWindow: |
| def __init__(self, scale, zero_offset): |
| self.scale = scale |
| self.zero_offset = zero_offset |
| self.ref_window = AvgWindow(5) |
| self.zero_window = AvgWindow(5) |
| self.ready = False |
| |
| def AddRefPoint(self, val): |
| self.ref_window.AddVal(val) |
| if ( |
| self.ref_window.val() is not None |
| and self.zero_window.val() is not None |
| ): |
| self.ready = True |
| |
| def AddZeroPoint(self, val): |
| self.zero_window.AddVal(val) |
| if ( |
| self.ref_window.val() is not None |
| and self.zero_window.val() is not None |
| ): |
| self.ready = True |
| |
| def CalibratePoint(self, val): |
| cal_ref = self.ref_window.val() |
| cal_zero = self.zero_window.val() |
| if cal_ref is None or cal_zero is None: |
| return None |
| |
| zero = cal_zero + self.zero_offset |
| d = cal_ref - zero |
| slope = self.scale / d if d != 0 else 0 |
| |
| return (val - zero) * slope |
| |
| def is_ready(self): |
| return self.ready |
| |
| |
| class CalibrationChannel: |
| def __init__( |
| self, coarse_scale_off, fine_scale_off, fine_threshold, val_fetcher |
| ): |
| self.coarse_window = CalibrationWindow(*coarse_scale_off) |
| self.fine_window = CalibrationWindow(*fine_scale_off) |
| self.fine_threshold = fine_threshold |
| self.val_fetcher = val_fetcher |
| |
| def is_ready(self): |
| return self.coarse_window.is_ready() and self.fine_window.is_ready() |
| |
| def CalibrateSample(self, sample): |
| coarse, fine = self.val_fetcher(sample) |
| if fine < self.fine_threshold: |
| return self.fine_window.CalibratePoint(fine) |
| else: |
| return self.coarse_window.CalibratePoint(coarse) |
| |
| def AddRefPoint(self, sample): |
| coarse, fine = self.val_fetcher(sample) |
| self.coarse_window.AddRefPoint(coarse) |
| self.fine_window.AddRefPoint(fine) |
| |
| def AddZeroPoint(self, sample): |
| coarse, fine = self.val_fetcher(sample) |
| self.coarse_window.AddZeroPoint(coarse) |
| self.fine_window.AddZeroPoint(fine) |
| |
| |
| class Sampler: |
| def __init__(self, monitor, outfile): |
| self.STOP_NOW = False |
| self.monitor = monitor |
| self.outfile = outfile |
| self.main_cal = CalibrationChannel( |
| ( |
| self.monitor.statusPacket.mainCoarseScale, |
| self.monitor.statusPacket.mainCoarseZeroOffset, |
| ), |
| ( |
| self.monitor.statusPacket.mainFineScale, |
| self.monitor.statusPacket.mainFineZeroOffset, |
| ), |
| self.monitor.fineThreshold, |
| lambda sample: (sample.main_coarse, sample.main_fine), |
| ) |
| self.aux_cal = CalibrationChannel( |
| (self.monitor.statusPacket.auxCoarseScale, 0.0), |
| (self.monitor.statusPacket.auxFineScale, 0.0), |
| self.monitor.auxFineThreshold, |
| lambda sample: (sample.aux_coarse, sample.aux_fine), |
| ) |
| # Magic numbers for voltage scaling were taken from |
| # https://github.com/msoon/PyMonsoon/blob/master/Monsoon/sampleEngine.py#L66 |
| self.main_voltage_scale = monitor.mainvoltageScale * (62.5 / 1e6) |
| |
| self.orig_sigint = None |
| self.orig_sigterm = None |
| |
| def sigint_stop(self, *unused_args): |
| self.STOP_NOW = True |
| signal.signal(signal.SIGINT, self.orig_sigint) |
| self.orig_sigint = None |
| |
| def sigterm_stop(self, *unused_args): |
| self.STOP_NOW = True |
| signal.signal(signal.SIGINT, self.orig_sigterm) |
| self.orig_sigterm = None |
| |
| def sample(self): |
| # Write the header to the output file and immediately flush it. The test |
| # framework is going to wait until the first data show up in the output file |
| # before it actually starts any tests. |
| hdr = "Mandatory Timestamp,Current,Voltage" |
| if ARGS.raw_aux: |
| hdr += ",Raw Aux" |
| self.outfile.write("%s\n" % hdr) |
| self.outfile.flush() |
| |
| # Set up our signal handlers to catch our shutdown signals, then capture raw |
| # packets until we receive one of those signals. Don't process the packets |
| # yet, we will take care of that once capturing is complete. |
| self.orig_sigint = signal.signal(signal.SIGINT, self.sigint_stop) |
| self.orig_sigterm = signal.signal(signal.SIGTERM, self.sigterm_stop) |
| |
| print("Capturing packets until SIGINT") |
| payloads = [] |
| self.monitor.StartSampling() |
| while not self.STOP_NOW: |
| before = time.monotonic_ns() |
| payload = self.monitor.BulkRead() |
| after = time.monotonic_ns() |
| payloads.append((payload, before, after)) |
| self.monitor.stopSampling() |
| |
| print("Processing %d collected packets" % (len(payloads),)) |
| first_packet = None |
| last_packet = None |
| last_packet_sample_count = None |
| last_sample = None |
| nominal_period_nsec = 200000 |
| min_raw_aux = None |
| samples = [] |
| |
| def GetHandler(handling_str): |
| handling = LookupSampleHandling(handling_str) |
| if handling == SampleHandling.OMIT: |
| return lambda _: None |
| if handling == SampleHandling.ZERO: |
| return lambda _: (0, 0, 0, False) |
| if handling == SampleHandling.REPEAT: |
| return lambda last: last |
| if handling == SampleHandling.SENTINEL: |
| return lambda _: (1000.0, 12.0, 1000.0, False) |
| |
| drop_handler = GetHandler(ARGS.dropped_sample_handling) |
| cal_handler = GetHandler(ARGS.calibration_sample_handling) |
| |
| # Now that we are done capturing, go back and process the samples filling |
| # our calibration windows and producing a list of samples with calibrated |
| # currents and translated voltages |
| for payload, before, after in payloads: |
| packet = Packet(payload) |
| packet.cts = after |
| |
| if last_packet is not None: |
| if last_packet.dropped_count != packet.dropped_count: |
| if last_packet.dropped_count < packet.dropped_count: |
| drop_count = ( |
| packet.dropped_count - last_packet.dropped_count |
| ) |
| print( |
| "WARNING: dropped %d packets, starting from sample count %d" |
| % (drop_count, len(samples)) |
| ) |
| if last_sample is not None: |
| filler = drop_handler(last_sample) |
| samples.extend([filler for i in range(drop_count)]) |
| else: |
| print( |
| "WARNING: ignoring bad drop count (last %d, this %d)" |
| % (last_packet.dropped_count, packet.dropped_count) |
| ) |
| |
| last_packet = packet |
| last_packet_sample_count = len(samples) |
| |
| for s in packet.samples: |
| # figure out what type of sample this is. If it is a calibration |
| # sample, we need to feed it to our average windows. |
| sample_type = s.usb_gain & 0x30 |
| |
| if sample_type == op.SampleType.Measurement: |
| if self.main_cal.is_ready() and self.aux_cal.is_ready(): |
| current = self.main_cal.CalibrateSample(s) |
| voltage = s.main_aux_voltage * self.main_voltage_scale |
| raw_aux_fine = s.aux_fine |
| |
| if last_sample is None: |
| first_packet = packet |
| last_packet = packet |
| |
| out_sample = last_sample = ( |
| current, |
| voltage, |
| raw_aux_fine, |
| True, |
| ) |
| min_raw_aux = ( |
| raw_aux_fine |
| if min_raw_aux is None |
| else min(raw_aux_fine, min_raw_aux) |
| ) |
| |
| elif sample_type == op.SampleType.ZeroCal: |
| self.main_cal.AddZeroPoint(s) |
| self.aux_cal.AddZeroPoint(s) |
| out_sample = cal_handler(last_sample) |
| elif sample_type == op.SampleType.refCal: |
| self.main_cal.AddRefPoint(s) |
| self.aux_cal.AddRefPoint(s) |
| out_sample = cal_handler(last_sample) |
| else: |
| print( |
| "WARNING: skipping sample with bad type (%02x)" |
| % (sample_type,) |
| ) |
| |
| if last_sample is not None: |
| samples.append(out_sample) |
| |
| # Finally, go back and do our best to fix up our timestamps, outputting our |
| # samples as we go. |
| |
| # Monsoon is _supposed_ to produce samples at a nominal rate of 5 KHz, |
| # however, in reality it does not. It tends to run about 700ppm slow. |
| # |
| # In order for our data to line up with collected trace data, we need to |
| # measure and do our best to fix this. We use the host clock and the |
| # capture time of the packets as our reference here, trusting that the host |
| # clock and the target clock are close enough to each other that the drift |
| # experienced over the test time is small enough to be tolerable. |
| # |
| # For now, we simply divide the time between the first and last captured |
| # packets with the number of samples present between the two to produce an |
| # average sample rate. This does not account for any drift back and forth |
| # during the capture, but at least it will align all of the samples to the |
| # trace data over the duration of the capture. |
| host_nsec = last_packet.cts - first_packet.cts |
| measured_period_nsec = host_nsec / last_packet_sample_count |
| ppm = ( |
| 1e6 |
| * (measured_period_nsec - nominal_period_nsec) |
| / nominal_period_nsec |
| ) |
| |
| print( |
| "Measured Period is %.3f nSec (nominal %.3f nSec ppm %.3f)" |
| % (measured_period_nsec, nominal_period_nsec, ppm) |
| ) |
| print("Minimum raw fine aux current value was %d" % (min_raw_aux,)) |
| |
| for i, s in enumerate(samples): |
| if s is not None: |
| common_output = (i * measured_period_nsec, s[0], s[1]) |
| if ARGS.raw_aux: |
| aux_val = s[2] - (min_raw_aux if s[3] else 0) |
| self.outfile.write( |
| "%d,%.7f,%.7f,%d\n" % (*common_output, aux_val) |
| ) |
| else: |
| self.outfile.write("%d,%.7f,%.7f\n" % common_output) |
| |
| |
| def main(): |
| try: |
| # Open and close the monitor in case teardown failed to complete previously. |
| HVMON = HVPM.Monsoon() |
| HVMON.setup_usb(ARGS.serialno, pmapi.USB_protocol()) |
| HVMON.closeDevice() |
| |
| # Create the monitor we will use and read the status packet so we have |
| # access to the calibration values. |
| HVMON = HVPM.Monsoon() |
| HVMON.setup_usb(ARGS.serialno, pmapi.USB_protocol()) |
| HVMON.fillStatusPacket() |
| |
| # Create our sampler and tell it to go, outputting to the user specified |
| # file as we do. |
| sampler = Sampler(HVMON, open(ARGS.csv_out, "w")) |
| sampler.sample() |
| |
| except OSError as err: |
| print("OS Error : " % (err,)) |
| if err.filename is not None: |
| print("Filename : %s" % (err.filename,)) |
| |
| finally: |
| HVMON.closeDevice() |
| |
| print("Done") |
| |
| |
| if __name__ == "__main__": |
| main() |