blob: a2bf9b37b102f07eac08b78ac2ef62fb972ca0a4 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::protocol::{self, ParameterValue, ReportFormat, Value, MAX_PACKET_SIZE},
anyhow::{bail, format_err, Error},
serde::{Deserialize, Serialize},
serde_json as json,
std::{
cell::RefCell,
collections::HashMap,
io::{Read, Write},
os::raw::{c_uchar, c_ushort},
sync::mpsc,
time::{Duration, SystemTime},
},
usb_bulk::{InterfaceInfo, Open},
};
const GOOGLE_VENDOR_ID: c_ushort = 0x18d1;
const ZEDMON_PRODUCT_ID: c_ushort = 0xaf00;
const VENDOR_SPECIFIC_CLASS_ID: c_uchar = 0xff;
const ZEDMON_SUBCLASS_ID: c_uchar = 0xff;
const ZEDMON_PROTOCOL_ID: c_uchar = 0x00;
/// Matches the USB interface info of a Zedmon device.
fn zedmon_match(ifc: &InterfaceInfo) -> bool {
(ifc.dev_vendor == GOOGLE_VENDOR_ID)
&& (ifc.dev_product == ZEDMON_PRODUCT_ID)
&& (ifc.ifc_class == VENDOR_SPECIFIC_CLASS_ID)
&& (ifc.ifc_subclass == ZEDMON_SUBCLASS_ID)
&& (ifc.ifc_protocol == ZEDMON_PROTOCOL_ID)
}
/// Used by ZedmonClient to determine when data reporting should stop.
pub trait StopSignal {
fn should_stop(&mut self, timestamp_micros: u64) -> Result<bool, Error>;
}
/// Raises a stop signal after Zedmon has reported for a given duration. The duration is measured
/// from the timestamp provided by the first call to `should_stop`.
///
/// Measuring this way is important to ensure consistency between the durations specified for
/// recording and the downsampling interval. Most importantly, if the recording duration and
/// downsampling interval are identical, exactly one record should be emitted.
pub struct DurationStopper {
duration_micros: u64,
start_micros: Option<u64>,
}
impl DurationStopper {
pub fn new(duration: Duration) -> DurationStopper {
DurationStopper { duration_micros: duration.as_micros() as u64, start_micros: None }
}
}
impl StopSignal for DurationStopper {
fn should_stop(&mut self, timestamp_micros: u64) -> Result<bool, Error> {
if self.start_micros.is_none() {
self.start_micros.replace(timestamp_micros);
}
Ok(timestamp_micros - self.start_micros.unwrap() >= self.duration_micros)
}
}
/// Properties that can be queried using ZedmonClient::describe.
pub const DESCRIBABLE_PROPERTIES: [&'static str; 2] = ["shunt_resistance", "csv_header"];
/// A single record of output from ZedmonClient.
#[derive(Debug, Default, Deserialize, Serialize)]
struct ZedmonRecord {
timestamp_micros: u64,
shunt_voltage: f32,
bus_voltage: f32,
power: f32,
}
struct DownsamplerState {
last_output_micros: u64,
prev_record: ZedmonRecord,
bus_voltage_integral: f32,
shunt_voltage_integral: f32,
power_integral: f32,
}
/// Helper struct for downsampling Zedmon data.
struct Downsampler {
interval_micros: u64,
state: Option<DownsamplerState>,
}
impl Downsampler {
fn new(interval_micros: u64) -> Self {
Self { interval_micros, state: None }
}
/// Process a new record. If `record` completes a resampling interval, returns a ZedmonRecord
/// containing values averaged over the interval. Otherwise, returns None.
///
/// The average value of y(t) over the interval [t_low, t_high] is computed using the definition
/// avg(y(t), [t_low, t_high]) := 1 / (t_high - t_low) * \int_{t_low}^{t_high} y(s) ds.
/// As each record is processed, the integral is udpated over the previous time interval using
/// trapezoid rule integration, i.e.
/// \int_{t_1}^{t_2} y(s) ds \approx (t2 - t1) * (y(t1) + y(t2)) / 2.
fn process(&mut self, record: ZedmonRecord) -> Option<ZedmonRecord> {
// Initialize self.state if it is unset; otherwise grab a reference to it.
let state = match self.state.as_mut() {
None => {
self.state = Some(DownsamplerState {
last_output_micros: record.timestamp_micros,
prev_record: record,
bus_voltage_integral: 0.0,
shunt_voltage_integral: 0.0,
power_integral: 0.0,
});
return None;
}
Some(state) => state,
};
// If the latest raw data interval spans multiple output samples, skip output and
// reinitialize.
if record.timestamp_micros >= state.last_output_micros + 2 * self.interval_micros {
eprintln!(
"Raw data interval [{}, {}] contains multiple downsampling output times. Skipping \
output and reinitializing downsampling.",
state.last_output_micros, record.timestamp_micros
);
self.state = None;
return self.process(record);
}
let output_micros = state.last_output_micros + self.interval_micros;
let t1 = state.prev_record.timestamp_micros as f32;
let t2 = record.timestamp_micros as f32;
// No output this cycle -- update the state and return None.
if record.timestamp_micros < output_micros {
let dt_half = (t2 - t1) / 2.0;
state.shunt_voltage_integral +=
dt_half * (state.prev_record.shunt_voltage + record.shunt_voltage);
state.bus_voltage_integral +=
dt_half * (state.prev_record.bus_voltage + record.bus_voltage);
state.power_integral += dt_half * (state.prev_record.power + record.power);
state.prev_record = record;
return None;
}
let t_out = output_micros as f32;
let dt_half = (t_out - t1) / 2.0;
// Use linear interpolation to estimate the instantaneous value of each measured quantity at
// t_out.
let interpolate = |y1, y2| y1 + (t_out - t1) / (t2 - t1) * (y2 - y1);
let shunt_voltage_interp =
interpolate(state.prev_record.shunt_voltage, record.shunt_voltage);
let bus_voltage_interp = interpolate(state.prev_record.bus_voltage, record.bus_voltage);
let power_interp = interpolate(state.prev_record.power, record.power);
// For each measured quantity, use the previous value and the interpolated value to update
// its integral over [t1, t_out].
state.shunt_voltage_integral +=
dt_half * (state.prev_record.shunt_voltage + shunt_voltage_interp);
state.bus_voltage_integral +=
dt_half * (state.prev_record.bus_voltage + bus_voltage_interp);
state.power_integral += dt_half * (state.prev_record.power + power_interp);
// Divide each integral by the total length of the integration interval to get an average
// value of the measured quanity. This populates the output record.
let record_out = ZedmonRecord {
timestamp_micros: output_micros,
shunt_voltage: state.shunt_voltage_integral / (self.interval_micros as f32),
bus_voltage: state.bus_voltage_integral / (self.interval_micros as f32),
power: state.power_integral / (self.interval_micros as f32),
};
// Now integrate over [t_out, t2] to seed the integrals for the next output interval. In the
// edge case that t2 is at the reporting interval boundary, t2 - t_out == 0.0 and the
// integrals will be appropriately set to 0.0.
let dt_half = (t2 - t_out) / 2.0;
state.shunt_voltage_integral = dt_half * (shunt_voltage_interp + record.shunt_voltage);
state.bus_voltage_integral = dt_half * (bus_voltage_interp + record.bus_voltage);
state.power_integral = dt_half * (power_interp + record.power);
// Update remaining state and return.
state.last_output_micros = output_micros;
state.prev_record = record;
Some(record_out)
}
}
/// Interface to a Zedmon device.
#[derive(Debug)]
pub struct ZedmonClient<InterfaceType>
where
InterfaceType: usb_bulk::Open<InterfaceType> + Read + Write,
{
/// USB interface to the Zedmon device, or equivalent.
interface: RefCell<InterfaceType>,
/// Format of each field in a Report.
field_formats: Vec<ReportFormat>,
/// Information necessary to obtain power data from direct Zedmon measurements.
shunt_resistance: f32,
v_shunt_index: usize,
v_bus_index: usize,
}
impl<InterfaceType: usb_bulk::Open<InterfaceType> + Read + Write> ZedmonClient<InterfaceType> {
/// Enumerates all connected Zedmons. Returns a `Vec<String>` of their serial numbers.
fn enumerate() -> Vec<String> {
let mut serials = Vec::new();
// Instead of matching any devices, this callback extracts Zedmon serial numbers as
// InterfaceType::open iterates through them. InterfaceType::open is expected to return an
// error because no devices match.
let mut cb = |info: &InterfaceInfo| -> bool {
if zedmon_match(info) {
let null_pos = match info.serial_number.iter().position(|&c| c == 0) {
Some(p) => p,
None => {
eprintln!("Warning: Detected a USB device whose serial number was not null-terminated:");
eprintln!(
"{}",
(*String::from_utf8_lossy(&info.serial_number)).to_string()
);
return false;
}
};
serials
.push((*String::from_utf8_lossy(&info.serial_number[..null_pos])).to_string());
}
false
};
assert!(
InterfaceType::open(&mut cb).is_err(),
"open() should return an error, as the supplied callback cannot match any devices."
);
serials
}
// Number of USB packets that can be queued in Zedmon's USB interface, based on STM32F072
// hardware limitations. The firmware currently only enqueues one packet, but the STM32F072 does
// support double-buffering.
const ZEDMON_USB_QUEUE_SIZE: u32 = 2;
/// Disables reporting and drains all enqueued packets from `interface`.
///
/// This should be done if a packet of incorrect type is received on the first read from
/// `interface`. Typically, that scenario occurs if a previous invocation of the client
/// terminated irregularly and left reporting enabled, but in principle a packet could still be
/// enqueued from another request as well.
///
/// Empirically, this process takes 4-5x as long as the timeout configured on `interface`, and
/// it should not be performed unconditionally to avoid unnecessary delays.
///
/// An error is returned if a packet is still received once the packet queue should be clear.
fn disable_reporting_and_drain_packets(interface: &mut InterfaceType) -> Result<(), Error> {
Self::disable_reporting(interface)?;
// Read packets from the USB interface until an error (assumed to be due to lack of packets
// -- the reason for an error is not exposed) is encountered. If we do not encounter an
// error after reading more than the queue length, Zedmon is in an unexpected state.
let mut response = [0; MAX_PACKET_SIZE];
for _ in 0..Self::ZEDMON_USB_QUEUE_SIZE + 1 {
if interface.read(&mut response).is_err() {
return Ok(());
}
}
return Err(format_err!(
"The Zedmon device is in an unexpected state; received more than {} packets after \
disabling reporting. Consider rebooting the device.",
Self::ZEDMON_USB_QUEUE_SIZE
));
}
/// Creates a new ZedmonClient instance.
// TODO(fxbug.dev/61148): Make the behavior predictable if multiple Zedmons are attached.
fn new(mut interface: InterfaceType) -> Result<Self, Error> {
// Query parameters, disabling reporting and draining packets if a packet of the wrong type
// is received on the first attempt.
let parameters = match Self::get_parameters(&mut interface) {
Err(e) => match e.downcast_ref::<protocol::Error>() {
Some(protocol::Error::WrongPacketType { .. }) => {
eprintln!(
"WARNING: Received unexpected packet type while initializing; a prior \
client invocation may have not terminated cleanly. Disabling reporting and \
draining buffered packets. Be sure to terminate recording with ENTER."
);
Self::disable_reporting_and_drain_packets(&mut interface)?;
Self::get_parameters(&mut interface)
}
_ => Err(e),
},
ok => ok,
}?;
let field_formats = Self::get_field_formats(&mut interface)?;
let shunt_resistance = {
let value = parameters["shunt_resistance"];
if let Value::F32(v) = value {
v
} else {
bail!("Wrong value type for shunt_resistance: {:?}", value);
}
};
// Use a HashMap to assist in field lookup for simplicity. Note that the Vec<ReportFormat>
// representation needs to be retained for later Report-parsing.
let formats_by_name: HashMap<String, ReportFormat> =
field_formats.iter().map(|f| (f.name.clone(), f.clone())).collect();
let v_shunt_index = formats_by_name["v_shunt"].index as usize;
let v_bus_index = formats_by_name["v_bus"].index as usize;
Ok(Self {
interface: RefCell::new(interface),
field_formats,
shunt_resistance,
v_shunt_index,
v_bus_index,
})
}
/// Describes properties of the Zedmon device and/or ZedmonClient.
pub fn describe(&self, name: &str) -> Result<json::Value, Error> {
match name {
"shunt_resistance" => Ok(json::json!(self.shunt_resistance)),
"csv_header" => {
let mut writer = csv::Writer::from_writer(Vec::new());
writer.serialize(ZedmonRecord::default())?;
let lines = String::from_utf8(writer.into_inner()?)?;
let header = lines.split('\n').nth(0).unwrap();
Ok(json::json!(header))
}
_ => panic!("'{}' is not a valid parameter name.", name),
}
}
/// Retrieves a ParameterValue from the provided Zedmon interface.
fn get_parameter(interface: &mut InterfaceType, index: u8) -> Result<ParameterValue, Error> {
let request = protocol::encode_query_parameter(index);
interface.write(&request)?;
let mut response = [0; MAX_PACKET_SIZE];
let len = interface.read(&mut response)?;
Ok(protocol::parse_parameter_value(&mut &response[0..len])?)
}
/// Retrieves every ParameterValue from the provided Zedmon interface.
fn get_parameters(interface: &mut InterfaceType) -> Result<HashMap<String, Value>, Error> {
let mut parameters = HashMap::new();
loop {
let parameter = Self::get_parameter(interface, parameters.len() as u8)?;
if parameter.name.is_empty() {
return Ok(parameters);
}
parameters.insert(parameter.name, parameter.value);
}
}
/// Retrieves a ReportFormat from the provided Zedmon interface.
fn get_report_format(interface: &mut InterfaceType, index: u8) -> Result<ReportFormat, Error> {
let request = protocol::encode_query_report_format(index);
interface.write(&request)?;
let mut response = [0; MAX_PACKET_SIZE];
let len = interface.read(&mut response)?;
Ok(protocol::parse_report_format(&response[..len])?)
}
/// Retrieves the ReportFormat for each Report field from the provided Zedmon interface.
fn get_field_formats(interface: &mut InterfaceType) -> Result<Vec<ReportFormat>, Error> {
let mut all_fields = vec![];
loop {
let format = Self::get_report_format(interface, all_fields.len() as u8)?;
if format.index == protocol::REPORT_FORMAT_INDEX_END {
return Ok(all_fields);
}
all_fields.push(format);
}
}
/// Disables reporting on the Zedmon device.
fn disable_reporting(interface: &mut InterfaceType) -> Result<(), Error> {
let request = protocol::encode_disable_reporting();
interface.write(&request)?;
Ok(())
}
/// Enables reporting on the Zedmon device.
fn enable_reporting(&self) -> Result<(), Error> {
let request = protocol::encode_enable_reporting();
self.interface.borrow_mut().write(&request)?;
Ok(())
}
/// Starts a thread to process Report packets.
fn start_report_processing_thread(
packet_receiver: mpsc::Receiver<Vec<u8>>,
parser: protocol::ReportParser,
writer: Box<dyn Write + Send>,
mut stopper: impl StopSignal + Send + 'static,
shunt_resistance: f32,
v_shunt_index: usize,
v_bus_index: usize,
reporting_interval_micros: Option<u64>,
) -> std::thread::JoinHandle<Result<(), Error>> {
std::thread::spawn(move || {
// The CSV header is suppressed. Clients may query it by using `describe`.
let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(writer);
let mut downsampler = reporting_interval_micros.map(|r| Downsampler::new(r));
for buffer in packet_receiver.iter() {
let reports = parser.parse_reports(&buffer)?;
for report in reports.into_iter() {
let (shunt_voltage, bus_voltage) =
match (report.values[v_shunt_index], report.values[v_bus_index]) {
(Value::F32(x), Value::F32(y)) => (x, y),
t => {
return Err(format_err!(
"Got wrong value types for (v_shunt, v_bus): {:?}",
t
))
}
};
let record = ZedmonRecord {
timestamp_micros: report.timestamp_micros,
shunt_voltage,
bus_voltage,
power: bus_voltage * shunt_voltage / shunt_resistance,
};
// Explicit flushing is performed when downsampling because typically the
// reporting rate is relatively low, and the user may want to see the output in
// realtime. Meanwhile, if not downsampling, there's sufficient output that
// automatic flushing is fairly frequent. This behavior could be tuned if the
// need arises.
match downsampler.as_mut() {
Some(downsampler) => match downsampler.process(record) {
Some(r) => {
writer.serialize(r)?;
writer.flush()?;
}
None => {}
},
None => writer.serialize(record)?,
}
if stopper.should_stop(report.timestamp_micros)? {
writer.flush()?;
// The main thread will detect this thread's completion via disconnection
// of packet_receiver.
return Ok(());
}
}
}
Err(format_err!("Packet sender should not close before packet receiver"))
})
}
// Number of retries to attempt in case of a USB read failure.
const NUM_USB_READ_RETRIES: usize = 3;
#[cfg(test)]
pub fn num_usb_read_retries() -> usize {
Self::NUM_USB_READ_RETRIES
}
/// Runs the I/O part of reporting.
fn run_report_io(&self, packet_sender: mpsc::Sender<Vec<u8>>) -> Result<(), Error> {
// Enable reporting and run the main loop.
self.enable_reporting()?;
let mut num_retries = Self::NUM_USB_READ_RETRIES;
loop {
let mut buffer = vec![0; MAX_PACKET_SIZE];
match self.interface.borrow_mut().read(&mut buffer) {
Err(e) => {
eprint!("USB read error: {}.", e);
if num_retries > 0 {
num_retries -= 1;
continue;
}
eprintln!(""); // Finish "USB read error" line
break Err(format_err!(
"Giving up after {} USB read failures.",
Self::NUM_USB_READ_RETRIES + 1
));
}
Ok(bytes_read) => {
num_retries = Self::NUM_USB_READ_RETRIES;
buffer.truncate(bytes_read);
}
}
if let Err(_) = packet_sender.send(buffer) {
Self::disable_reporting(&mut self.interface.borrow_mut())?;
break Ok(());
}
}
}
/// Reads reported data from the Zedmon device, taking care of enabling/disabling reporting.
///
/// Measurement data is written to `writer`. Reporting will cease when `stopper` raises its stop
/// signal.
pub fn read_reports(
&self,
writer: Box<dyn Write + Send>,
stopper: impl StopSignal + Send + 'static,
reporting_interval: Option<Duration>,
) -> Result<(), Error> {
// This function's workload is shared between its main thread and processing_thread.
//
// The main thread enables reporting, reads USB packets via blocking reads, and sends those
// packets to processing_thread via packet_sender. Meanwhile, processing_thread parses each
// packet it receives to a Vec<Report>, which it then formats and outputs via `writer`.
//
// When `stopper` indicates that reporting should stop, processing_thread exits, and the
// main thread learns of the termination via the closure of packet_receiver.
//
// The multithreading has not been confirmed as necessary for performance reasons, but it
// seems like a reasonable thing to do, as both reading from USB and outputting (typically
// to stdout or a file) involve blocking on I/O.
let (packet_sender, packet_receiver) = mpsc::channel::<Vec<u8>>();
let processing_thread = Self::start_report_processing_thread(
packet_receiver,
protocol::ReportParser::new(&self.field_formats)?,
writer,
stopper,
self.shunt_resistance,
self.v_shunt_index,
self.v_bus_index,
reporting_interval.map(|d| d.as_micros() as u64),
);
let report_io_result = self.run_report_io(packet_sender);
// Join with the processing thread, and attempt to interpret the result of any panic that
// may have occurred.
let processing_result = match processing_thread.join() {
Ok(result) => result,
Err(e) => match e.downcast::<&str>() {
Ok(s) => panic!("Processing thread panicked with error '{}'", s),
Err(e) => match e.downcast::<String>() {
Ok(s) => panic!("Processing thread panicked with error '{}'", s),
Err(_) => panic!("Processing thread panicked; unable to interpret error."),
},
},
};
report_io_result.and(processing_result)
}
/// Returns a tuple consisting of:
/// - An estimate of the offset between the Zedmon clock and the host clock, in nanoseconds.
/// The offset is defined such that, in the absence of drift,
/// `zedmon_clock + offset = host_time`.
/// It is typically, but not necessarily, positive. (Note that: (1) the signedness prevents
/// std::time::Duration from being a valid return type, and (2) i64 will suffice to
/// represent an offset of over 290 years in nanoseconds.)
/// - An estimate of the uncertainty in the offset, in nanoseconds. In the absence of clock
/// drift, the offset is accurate within ┬▒uncertainty.
// TODO(fxbug.dev/61471): Consider using microseconds instead, in correspondence with Zedmon
// timestamp units.
pub fn get_time_offset_nanos(&self) -> Result<(i64, i64), Error> {
let mut interface = self.interface.borrow_mut();
// For each query, we estimate that the retrieved timestamp reflects Zedmon's clock halfway
// between the duration spanned by our QueryTime request and Zedmon's Timestamp response.
// That allows us to estimate the offset between the host clock and Zedmon's.
//
// We run `TIME_OFFSET_NUM_QUERIES` queries and keep the estimate corresponding to the
// shortest query duration. That provides some guarding against transient sources of
// latency.
//
// Note: Unlike other methods that interact with the USB interface, this method does not
// separate out a "get_timestamp" function to keep the parsing time from contributing to
// transit time.
let mut best_offset_nanos = 0;
let mut best_query_duration_nanos = i64::MAX;
// Number of timestamp query round trips used to determine time offset.
const TIME_OFFSET_NUM_QUERIES: u32 = 10;
for _ in 0..TIME_OFFSET_NUM_QUERIES {
let request = protocol::encode_query_time();
let mut response = [0u8; MAX_PACKET_SIZE];
let host_clock_at_start = SystemTime::now();
interface.write(&request)?;
let len = interface.read(&mut response)?;
// `elapsed` could return an error if the host clock moved backwards. That should be
// rare, but if it does occur, throw out this query. (Note, however, that the offset
// would be invalidated by future jumps in the system clock.)
let query_duration_nanos = match host_clock_at_start.elapsed() {
Ok(duration) => duration.as_nanos() as i64,
Err(_) => continue,
};
if query_duration_nanos < best_query_duration_nanos {
let timestamp_micros = protocol::parse_timestamp_micros(&response[..len])? as i64;
let host_nanos_at_timestamp =
host_clock_at_start.duration_since(SystemTime::UNIX_EPOCH)?.as_nanos() as i64
+ query_duration_nanos / 2;
best_offset_nanos = host_nanos_at_timestamp - timestamp_micros * 1_000;
best_query_duration_nanos = query_duration_nanos;
}
}
// Uncertainty comes from two sources:
// - Mapping a microsecond Zedmon timestamp to nanoseconds. In the worst case, the
// timestamp is 999ns stale.
// - Estimating the instant of the timestamp in the query interval. Since we used the
// midpoint, at worst we're off by one half the query duration.
let uncertainty_nanos = 999 + best_query_duration_nanos / 2 + best_query_duration_nanos % 2;
Ok((best_offset_nanos, uncertainty_nanos))
}
/// Enables or disables the relay on the Zedmon device.
pub fn set_relay(&self, enable: bool) -> Result<(), Error> {
let request = protocol::encode_set_output(protocol::Output::Relay as u8, enable);
self.interface.borrow_mut().write(&request)?;
Ok(())
}
}
/// Lists the serial numbers of all connected Zedmons.
pub fn list() -> Vec<String> {
ZedmonClient::<usb_bulk::Interface>::enumerate()
}
pub fn zedmon() -> ZedmonClient<usb_bulk::Interface> {
let interface = usb_bulk::Interface::open(&mut zedmon_match).unwrap();
let result = ZedmonClient::new(interface);
if result.is_err() {
eprintln!("Error initializing ZedmonClient: {:?}", result);
}
result.unwrap()
}
#[cfg(test)]
mod tests {
use {
super::*,
anyhow::{format_err, Error},
num::FromPrimitive,
protocol::{tests::serialize_reports, PacketType, Report, ScalarType},
std::collections::VecDeque,
std::rc::Rc,
test_util::assert_near,
};
// Used by `interface_info`, below, as a convenient means of constructing InterfaceInfo.
struct ShortInterface<'a> {
dev_vendor: ::std::os::raw::c_ushort,
dev_product: ::std::os::raw::c_ushort,
ifc_class: ::std::os::raw::c_uchar,
ifc_subclass: ::std::os::raw::c_uchar,
ifc_protocol: ::std::os::raw::c_uchar,
serial_number: &'a str,
}
fn interface_info(short: ShortInterface<'_>) -> InterfaceInfo {
let mut serial = [0; 256];
for (i, c) in short.serial_number.as_bytes().iter().enumerate() {
serial[i] = *c;
}
InterfaceInfo {
dev_vendor: short.dev_vendor,
dev_product: short.dev_product,
dev_class: 0,
dev_subclass: 0,
dev_protocol: 0,
ifc_class: short.ifc_class,
ifc_subclass: short.ifc_subclass,
ifc_protocol: short.ifc_protocol,
has_bulk_in: 0,
has_bulk_out: 0,
writable: 0,
serial_number: serial,
device_path: [0; 256usize],
}
}
#[test]
fn test_enumerate() {
// AVAILABLE_DEVICES is state for the static method FakeEnumerationInterface::open. This
// test is single-threaded, so a thread-local static provides the most appropriate safe
// interface.
thread_local! {
static AVAILABLE_DEVICES: RefCell<Vec<InterfaceInfo>> = RefCell::new(Vec::new());
}
fn push_device(short: ShortInterface<'_>) {
AVAILABLE_DEVICES.with(|devices| {
devices.borrow_mut().push(interface_info(short));
});
}
struct FakeEnumerationInterface {}
impl usb_bulk::Open<FakeEnumerationInterface> for FakeEnumerationInterface {
fn open<F>(matcher: &mut F) -> Result<FakeEnumerationInterface, Error>
where
F: FnMut(&InterfaceInfo) -> bool,
{
AVAILABLE_DEVICES.with(|devices| {
let devices = devices.borrow();
for device in devices.iter() {
if matcher(device) {
return Ok(FakeEnumerationInterface {});
}
}
Err(format_err!("No matching devices found."))
})
}
}
impl Read for FakeEnumerationInterface {
fn read(&mut self, _: &mut [u8]) -> std::io::Result<usize> {
Ok(0)
}
}
impl Write for FakeEnumerationInterface {
fn write(&mut self, _: &[u8]) -> std::io::Result<usize> {
Ok(0)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
// No devices connected
let serials = ZedmonClient::<FakeEnumerationInterface>::enumerate();
assert!(serials.is_empty());
// One device: not-a-zedmon-1
push_device(ShortInterface {
dev_vendor: 0xdead,
dev_product: ZEDMON_PRODUCT_ID,
ifc_class: VENDOR_SPECIFIC_CLASS_ID,
ifc_subclass: ZEDMON_SUBCLASS_ID,
ifc_protocol: ZEDMON_PROTOCOL_ID,
serial_number: "not-a-zedmon-1",
});
let serials = ZedmonClient::<FakeEnumerationInterface>::enumerate();
assert!(serials.is_empty());
// Two devices: not-a-zedmon-1, zedmon-1
push_device(ShortInterface {
dev_vendor: GOOGLE_VENDOR_ID,
dev_product: ZEDMON_PRODUCT_ID,
ifc_class: VENDOR_SPECIFIC_CLASS_ID,
ifc_subclass: ZEDMON_SUBCLASS_ID,
ifc_protocol: ZEDMON_PROTOCOL_ID,
serial_number: "zedmon-1",
});
let serials = ZedmonClient::<FakeEnumerationInterface>::enumerate();
assert_eq!(serials, ["zedmon-1"]);
// Three devices: not-a-zedmon-1, zedmon-1, not-a-zedmon-2
push_device(ShortInterface {
dev_vendor: GOOGLE_VENDOR_ID,
dev_product: 0xbeef,
ifc_class: VENDOR_SPECIFIC_CLASS_ID,
ifc_subclass: ZEDMON_SUBCLASS_ID,
ifc_protocol: ZEDMON_PROTOCOL_ID,
serial_number: "not-a-zedmon-2",
});
let serials = ZedmonClient::<FakeEnumerationInterface>::enumerate();
assert_eq!(serials, ["zedmon-1"]);
// Four devices: not-a-zedmon-1, zedmon-1, not-a-zedmon-2, zedmon-2
push_device(ShortInterface {
dev_vendor: GOOGLE_VENDOR_ID,
dev_product: ZEDMON_PRODUCT_ID,
ifc_class: VENDOR_SPECIFIC_CLASS_ID,
ifc_subclass: ZEDMON_SUBCLASS_ID,
ifc_protocol: ZEDMON_PROTOCOL_ID,
serial_number: "zedmon-2",
});
let serials = ZedmonClient::<FakeEnumerationInterface>::enumerate();
assert_eq!(serials, ["zedmon-1", "zedmon-2"]);
}
// Provides test support for ZedmonClient functionality that interacts with a Zedmon device. It
// chiefly provides:
// - FakeZedmonInterface, for faking ZedmonClient's access to a Zedmon device.
// - Coordinator, for coordinating activity between the test and a FakeZedmonInterface.
// - CoordinatorBuilder, for producing a Coordinator instance with its various optional
// settings.
mod fake_device {
use {
super::*,
num::FromPrimitive,
protocol::{tests::*, PacketType, Unit},
};
// Coordinates interactions between FakeZedmonInterface and a test.
//
// To test ZedmonClient::read_reports:
// - Use CoordinatorBuilder::with_report_queue to populate the `report_queue` field. Each
// entry in the queue is a Vec<Report> that will be serialized into a single packet. The
// caller will need to ensure that reports are written to an accessible location.
// - Create ZedmonClient with a DurationStopper whose duration is spanned by the enqueued
// Reports.
//
// To test ZedmonClient::get_time_offset_nanos, use CoordinatorBuilder::with_offset_time to
// populate `offset_time`. Timestamps will be reported as `host_time - offset_time`; the
// fake Zedmon's clock will run perfectly in parallel to the host clock. Note that only
// timestamps reported in Timestamp packets are affected by `offset_time`; timestamps in
// Report packets are directly specified by the test, via `report_queue`.
//
// To test ZedmonClient::set_relay, use CoordinatorBuilder::with_relay_enabled to
// populate the relay state, and Coordinator::relay_enabled to check expectations.
pub struct Coordinator {
// Constants that define the fake device.
device_config: DeviceConfiguration,
// See struct comment.
report_queue: Option<VecDeque<Vec<Report>>>,
// Offset between the fake Zedmon's clock and host clock, such that `zedmon_time +
// offset_time = host_time`. Only used for Timestamp packets; the timestamps in
// Reports are set by the test when populating `report_queue`.
offset_time: Option<Duration>,
// Whether Zedmon's relay is enabled.
relay_enabled: Option<bool>,
}
impl Coordinator {
pub fn relay_enabled(&self) -> bool {
self.relay_enabled.expect("relay_enabled not set")
}
fn get_device_config(&self) -> DeviceConfiguration {
self.device_config.clone()
}
// Gets the next packet's worth of Reports, if available.
fn get_reports_for_packet(&mut self) -> Option<Vec<Report>> {
let report_queue = self.report_queue.as_mut().expect("report_queue not set");
report_queue.pop_front()
}
// Retrieves a timestamp in microseconds to fill a Timestamp packet.
fn get_timestamp_micros(&self) -> u64 {
let offset_time = self.offset_time.expect("offset_time not set");
let zedmon_now = SystemTime::now() - offset_time;
let timestamp = zedmon_now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
timestamp.as_micros() as u64
}
fn set_relay_enabled(&mut self, enabled: bool) {
let relay_enabled = self.relay_enabled.as_mut().expect("relay_enabled not set");
*relay_enabled = enabled;
}
}
// Constants that are inherent to a Zedmon device. Even if these values are not exercised by
// a test, dummy values will be required by ZedmonClient::new().
#[derive(Clone, Debug)]
pub struct DeviceConfiguration {
pub shunt_resistance: f32,
pub v_shunt_scale: f32,
pub v_bus_scale: f32,
}
// Provides the interface for building a Coordinator with its various optional settings.
pub struct CoordinatorBuilder {
device_config: DeviceConfiguration,
report_queue: Option<VecDeque<Vec<Report>>>,
offset_time: Option<Duration>,
relay_enabled: Option<bool>,
}
impl CoordinatorBuilder {
pub fn new(device_config: DeviceConfiguration) -> Self {
CoordinatorBuilder {
device_config,
report_queue: None,
offset_time: None,
relay_enabled: None,
}
}
pub fn with_report_queue(mut self, report_queue: VecDeque<Vec<Report>>) -> Self {
self.report_queue.replace(report_queue);
self
}
pub fn with_offset_time(mut self, offset_time: Duration) -> Self {
self.offset_time.replace(offset_time);
self
}
pub fn with_relay_enabled(mut self, enabled: bool) -> Self {
self.relay_enabled.replace(enabled);
self
}
pub fn build(self) -> Rc<RefCell<Coordinator>> {
Rc::new(RefCell::new(Coordinator {
device_config: self.device_config,
report_queue: self.report_queue,
offset_time: self.offset_time,
relay_enabled: self.relay_enabled,
}))
}
}
// Indicates the contents of the next read from FakeZedmonInterface.
#[derive(Debug)]
enum NextRead {
ParameterValue(u8),
ReportFormat(u8),
Report,
Timestamp,
}
// Interface that provides fakes for testing interactions with a Zedmon device.
pub struct FakeZedmonInterface {
coordinator: Rc<RefCell<Coordinator>>,
// The type of read that wil be performed next from this interface, if any.
next_read: Option<NextRead>,
}
impl usb_bulk::Open<FakeZedmonInterface> for FakeZedmonInterface {
fn open<F>(_matcher: &mut F) -> Result<FakeZedmonInterface, Error>
where
F: FnMut(&InterfaceInfo) -> bool,
{
Err(format_err!("usb_bulk::Open not implemented"))
}
}
impl FakeZedmonInterface {
pub fn new(coordinator: Rc<RefCell<Coordinator>>) -> Self {
Self { coordinator, next_read: None }
}
// Populates a ParameterValue packet.
fn read_parameter_value(&mut self, index: u8, buffer: &mut [u8]) -> usize {
match index {
0 => serialize_parameter_value(
ParameterValue {
name: "shunt_resistance".to_string(),
value: Value::F32(
self.coordinator.borrow().get_device_config().shunt_resistance,
),
},
buffer,
),
1 => serialize_parameter_value(
ParameterValue { name: "".to_string(), value: Value::U8(0) },
buffer,
),
_ => panic!("Should only receive 0 or 1 as indices"),
}
}
// Populates a ReportFormat packet.
fn read_report_format(&self, index: u8, buffer: &mut [u8]) -> usize {
match index {
0 => serialize_report_format(
ReportFormat {
index,
field_type: ScalarType::I16,
unit: Unit::Volts,
scale: self.coordinator.borrow().get_device_config().v_shunt_scale,
name: "v_shunt".to_string(),
},
buffer,
),
1 => serialize_report_format(
ReportFormat {
index,
field_type: ScalarType::I16,
unit: Unit::Volts,
scale: self.coordinator.borrow().get_device_config().v_bus_scale,
name: "v_bus".to_string(),
},
buffer,
),
2 => serialize_report_format(
ReportFormat {
index: protocol::REPORT_FORMAT_INDEX_END,
field_type: ScalarType::U8,
unit: Unit::Volts,
scale: 0.0,
name: "".to_string(),
},
buffer,
),
_ => panic!("Should only receive 0, 1, or 2 as indices"),
}
}
// Populates a Report packet. If no Reports are available, the buffer is not modified,
// and attempting to parse reports from it will be an error. The processing thread will
// be guarded from attempting such parses by the StopSignal.
fn read_reports(&mut self, buffer: &mut [u8]) -> usize {
match self.coordinator.borrow_mut().get_reports_for_packet() {
Some(reports) => serialize_reports(&reports, buffer),
None => 0,
}
}
// Populates a Timestamp packet.
fn read_timestamp(&mut self, buffer: &mut [u8]) -> usize {
buffer[0] = PacketType::Timestamp as u8;
serialize_timestamp_micros(self.coordinator.borrow().get_timestamp_micros(), buffer)
}
fn set_output(&self, index: u8, value: u8) {
if index == protocol::Output::Relay as u8 {
self.coordinator.borrow_mut().set_relay_enabled(value != 0);
}
}
}
impl Read for FakeZedmonInterface {
fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
match self.next_read.take() {
Some(value) => Ok(match value {
NextRead::ParameterValue(index) => self.read_parameter_value(index, buffer),
NextRead::ReportFormat(index) => self.read_report_format(index, buffer),
NextRead::Report => {
self.next_read = Some(NextRead::Report);
self.read_reports(buffer)
}
NextRead::Timestamp => self.read_timestamp(buffer),
}),
None => Err(std::io::Error::new(std::io::ErrorKind::Other, "Read error: -1")),
}
}
}
impl Write for FakeZedmonInterface {
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
let packet_type = PacketType::from_u8(data[0]).unwrap();
match packet_type {
PacketType::EnableReporting => self.next_read = Some(NextRead::Report),
PacketType::DisableReporting => self.next_read = None,
PacketType::QueryParameter => {
self.next_read = Some(NextRead::ParameterValue(data[1]))
}
PacketType::QueryReportFormat => {
self.next_read = Some(NextRead::ReportFormat(data[1]))
}
PacketType::QueryTime => self.next_read = Some(NextRead::Timestamp),
PacketType::SetOutput => {
assert_eq!(data.len(), 3);
self.set_output(data[1], data[2]);
}
_ => panic!("Not a valid host-to-target packet"),
}
Ok(data.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
}
fn make_report_queue(
voltage_function: impl Fn(u64) -> (f32, f32),
device_config: &fake_device::DeviceConfiguration,
test_duration: Duration,
raw_data_interval: Duration,
) -> VecDeque<Vec<Report>> {
let mut report_queue = VecDeque::new();
let mut elapsed = Duration::from_millis(0);
// 1ms of fake time elapses between each report. Reports are batched into groups of 5,
// the number that will fit into a single packet.
while elapsed <= test_duration {
let mut reports = Vec::new();
for _ in 0..5 {
let (v_shunt, v_bus) = (voltage_function)(elapsed.as_micros() as u64);
reports.push(Report {
timestamp_micros: elapsed.as_micros() as u64,
values: vec![
Value::I16((v_shunt / device_config.v_shunt_scale) as i16),
Value::I16((v_bus / device_config.v_bus_scale) as i16),
],
});
elapsed = elapsed + raw_data_interval;
if elapsed > test_duration {
break;
}
}
if !reports.is_empty() {
report_queue.push_back(reports);
}
}
report_queue
}
fn run_zedmon_reporting<InterfaceType: usb_bulk::Open<InterfaceType> + Read + Write>(
zedmon: &ZedmonClient<InterfaceType>,
test_duration: Duration,
reporting_interval: Option<Duration>,
) -> Result<Vec<u8>, Error> {
// Implements Write by sending bytes over a channel. The holder of the channel's
// Receiver can then inspect the data that was written to test expectations.
struct ChannelWriter {
sender: mpsc::Sender<Vec<u8>>,
buffer: Vec<u8>,
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.buffer.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
let mut payload = Vec::new();
std::mem::swap(&mut self.buffer, &mut payload);
self.sender.send(payload).unwrap();
Ok(())
}
}
let (sender, receiver) = mpsc::channel();
let writer = Box::new(ChannelWriter { sender, buffer: Vec::new() });
zedmon.read_reports(writer, DurationStopper::new(test_duration), reporting_interval)?;
let mut output = Vec::new();
while let Ok(mut buffer) = receiver.recv() {
output.append(&mut buffer);
}
Ok(output)
}
// Tests that ZedmonClient will disable reporting and drain enqueued packets on initialization.
#[test]
fn test_disable_reporting_and_drain_packets() {
// Interface that responds to reads with Report packets until reporting is disabled and an
// enqueued packet is drained. Afterwards, all calls are forwarded to an inner
// FakeZedmonInterface.
struct StillReportingInterface {
inner: fake_device::FakeZedmonInterface,
reporting_enabled: bool,
packets_enqueued: usize,
}
impl StillReportingInterface {
fn new(coordinator: Rc<RefCell<fake_device::Coordinator>>) -> Self {
Self {
inner: fake_device::FakeZedmonInterface::new(coordinator),
reporting_enabled: true,
packets_enqueued: 1,
}
}
fn make_reports(&self) -> Vec<Report> {
let mut reports = Vec::new();
for i in 0..5 {
reports.push(Report {
timestamp_micros: 1000 * (i as u64),
values: vec![Value::I16(i as i16), Value::I16(-(i as i16))],
});
}
reports
}
}
impl usb_bulk::Open<StillReportingInterface> for StillReportingInterface {
fn open<F>(_matcher: &mut F) -> Result<StillReportingInterface, Error>
where
F: FnMut(&InterfaceInfo) -> bool,
{
Err(format_err!("usb_bulk::Open not implemented"))
}
}
impl Read for StillReportingInterface {
fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
if self.reporting_enabled {
Ok(serialize_reports(&self.make_reports(), buffer))
} else if self.packets_enqueued > 0 {
self.packets_enqueued = self.packets_enqueued - 1;
Ok(serialize_reports(&self.make_reports(), buffer))
} else {
self.inner.read(buffer)
}
}
}
impl Write for StillReportingInterface {
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
if self.reporting_enabled
&& PacketType::from_u8(data[0]).unwrap() == PacketType::DisableReporting
{
self.reporting_enabled = false;
Ok(1)
} else {
self.inner.write(data)
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
let device_config = fake_device::DeviceConfiguration {
shunt_resistance: 0.01,
v_shunt_scale: 1e-5,
v_bus_scale: 0.025,
};
let builder = fake_device::CoordinatorBuilder::new(device_config);
let coordinator = builder.build();
let interface = StillReportingInterface::new(coordinator);
assert!(ZedmonClient::new(interface).is_ok());
}
// Represents USB read responses enqueued by TransientFailureInterface.
enum UsbReadResponse {
Packet(Vec<u8>),
Error(std::io::Error),
}
// Acts as an intermediary between ZedmonClient and a FakeZedmonInterface, injecting
// `num_failures` read errors at the beginning of the report stream.
struct TransientFailureInterface {
inner: fake_device::FakeZedmonInterface,
num_failures: usize,
response_queue: VecDeque<UsbReadResponse>,
}
impl TransientFailureInterface {
fn new(coordinator: Rc<RefCell<fake_device::Coordinator>>, num_failures: usize) -> Self {
Self {
inner: fake_device::FakeZedmonInterface::new(coordinator),
num_failures,
response_queue: VecDeque::new(),
}
}
}
impl usb_bulk::Open<TransientFailureInterface> for TransientFailureInterface {
fn open<F>(_matcher: &mut F) -> Result<TransientFailureInterface, Error>
where
F: FnMut(&InterfaceInfo) -> bool,
{
Err(format_err!("usb_bulk::Open not implemented"))
}
}
impl Read for TransientFailureInterface {
fn read(&mut self, buffer: &mut [u8]) -> std::io::Result<usize> {
let mut packet = vec![0; MAX_PACKET_SIZE];
match self.inner.read(packet.as_mut_slice()) {
Ok(num_bytes) => {
packet.truncate(num_bytes);
if num_bytes == 0 {
// This case corresponds to exhaustion of `inner`s report queue.
self.response_queue.push_back(UsbReadResponse::Packet(packet));
} else if packet[0] == PacketType::Report as u8 {
// Enqueue a report packet for later.
self.response_queue.push_back(UsbReadResponse::Packet(packet));
} else {
// Any non-Report packets are passed through directly.
(&mut buffer[..num_bytes]).copy_from_slice(&packet);
return Ok(num_bytes);
}
}
Err(e) => self.response_queue.push_back(UsbReadResponse::Error(e)),
};
// If any failures remain, inject one; otherwise, return an enqueued Report.
if self.num_failures > 0 {
self.num_failures -= 1;
Err(std::io::Error::new(std::io::ErrorKind::Other, "Read error: -1"))
} else {
let response = self.response_queue.pop_front().unwrap();
match response {
UsbReadResponse::Packet(packet) => {
let num_bytes = packet.len();
(&mut buffer[..num_bytes]).copy_from_slice(&packet);
Ok(num_bytes)
}
UsbReadResponse::Error(e) => Err(e),
}
}
}
}
impl Write for TransientFailureInterface {
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
self.inner.write(data)
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}
#[test]
fn test_usb_read_failures() {
let device_config = fake_device::DeviceConfiguration {
shunt_resistance: 0.01,
v_shunt_scale: 1e-5,
v_bus_scale: 0.025,
};
let test_duration = Duration::from_secs(1);
let raw_data_interval = Duration::from_millis(100);
let report_queue =
make_report_queue(|_| (0.0025, 1.0), &device_config, test_duration, raw_data_interval);
let run_reporting = |num_failures| {
let coordinator = fake_device::CoordinatorBuilder::new(device_config.clone())
.with_report_queue(report_queue.clone())
.build();
let interface = TransientFailureInterface::new(coordinator, num_failures);
let zedmon = ZedmonClient::new(interface).expect("Error building ZedmonClient");
run_zedmon_reporting(&zedmon, test_duration, None)
};
let max_failures = ZedmonClient::<TransientFailureInterface>::num_usb_read_retries();
// Test that reporting proceeds in the event of a few consecutive USB read failures.
let result = run_reporting(max_failures);
assert!(result.is_ok());
let output = result.unwrap();
let mut reader =
csv::ReaderBuilder::new().has_headers(false).from_reader(output.as_slice());
assert_eq!(reader.deserialize::<ZedmonRecord>().count(), 11);
// Test that reporting terminates with an error if too many consecutive read failures occur.
let result = run_reporting(max_failures + 1);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("USB read failures"));
}
#[test]
fn test_read_reports() -> Result<(), Error> {
// The voltages used are simply time-dependent signals that, combined with shunt_resistance
// below, yield power on the order of a few Watts.
fn get_voltages(micros: u64) -> (f32, f32) {
let seconds = micros as f32 / 1e6;
let v_shunt = 1e-3 + 2e-4 * (std::f32::consts::PI * seconds).cos();
let v_bus = 20.0 + 3.0 * (std::f32::consts::PI * seconds).sin();
(v_shunt, v_bus)
}
// These values are in the same ballpark as those used on Zedmon 2.1. The test shouldn't be
// sensitive to them.
let device_config = fake_device::DeviceConfiguration {
shunt_resistance: 0.01,
v_shunt_scale: 1e-5,
v_bus_scale: 0.025,
};
let test_duration = Duration::from_secs(10);
let raw_data_interval = Duration::from_millis(1);
let report_queue =
make_report_queue(get_voltages, &device_config, test_duration, raw_data_interval);
let coordinator = fake_device::CoordinatorBuilder::new(device_config.clone())
.with_report_queue(report_queue.clone())
.build();
let interface = fake_device::FakeZedmonInterface::new(coordinator);
let zedmon = ZedmonClient::new(interface).expect("Error building ZedmonClient");
let output = run_zedmon_reporting(&zedmon, test_duration, None)?;
let mut reader =
csv::ReaderBuilder::new().has_headers(false).from_reader(output.as_slice());
let mut num_records = 0;
for result in reader.deserialize::<ZedmonRecord>() {
let record = result?;
num_records = num_records + 1;
let (expected_shunt_voltage, expected_bus_voltage) =
get_voltages(record.timestamp_micros);
assert_near!(record.shunt_voltage, expected_shunt_voltage, device_config.v_shunt_scale);
assert_near!(record.bus_voltage, expected_bus_voltage, device_config.v_bus_scale);
assert_near!(
record.power,
record.shunt_voltage * record.bus_voltage / device_config.shunt_resistance,
1e-6
);
}
assert_eq!(num_records, test_duration.as_millis() / raw_data_interval.as_millis() + 1);
Ok(())
}
#[test]
fn test_read_reports_downsampled() -> Result<(), Error> {
// The voltages and device config are all completely made up for mathematical convenience.
// They are chosen so that v_shunt, v_bus, and power are easy to integrate analytically over
// the downsampling intervals.
fn get_voltages(micros: u64) -> (f32, f32) {
let seconds = micros as f32 / 1e6;
let v_shunt = seconds;
let v_bus = seconds.powi(2);
(v_shunt, v_bus)
}
let device_config = fake_device::DeviceConfiguration {
shunt_resistance: 2.0,
v_shunt_scale: 1e-4,
v_bus_scale: 1e-4,
};
let test_duration = Duration::from_secs(1);
let raw_data_interval = Duration::from_millis(1);
let reporting_interval = Duration::from_millis(100);
// Interval-average formulas follow from the definition
// avg(y(t), [t_low, t_high]) := 1 / (t_high - t_low) * \int_{t_low}^{t_high} y(s) ds.
// Input times are in seconds.
let v_shunt_integral = |t1: f32, t2: f32| (t2.powi(2) - t1.powi(2)) / (2.0 * (t2 - t1));
let v_bus_integral = |t1: f32, t2: f32| (t2.powi(3) - t1.powi(3)) / (3.0 * (t2 - t1));
let power_integral = |t1: f32, t2: f32| {
(t2.powi(4) - t1.powi(4)) / (4.0 * device_config.shunt_resistance * (t2 - t1))
};
let report_queue =
make_report_queue(get_voltages, &device_config, test_duration, raw_data_interval);
let coordinator = fake_device::CoordinatorBuilder::new(device_config.clone())
.with_report_queue(report_queue.clone())
.build();
let interface = fake_device::FakeZedmonInterface::new(coordinator);
let zedmon = ZedmonClient::new(interface).expect("Error building ZedmonClient");
let output = run_zedmon_reporting(&zedmon, test_duration, Some(reporting_interval))?;
let mut reader =
csv::ReaderBuilder::new().has_headers(false).from_reader(output.as_slice());
// Both v_shunt and v_bus have a max value of 1. Since
// power = v_shunt * v_bus / shunt_resistance,
// the maximum error in a raw power measurement should be roughly
// (v_shunt_scale + v_bus_scale) / shunt_resistance.
let power_tolerance = (device_config.v_shunt_scale + device_config.v_bus_scale)
/ device_config.shunt_resistance;
let mut num_records = 0;
let mut prev_timestamp = 0;
for result in reader.deserialize::<ZedmonRecord>() {
let record = result?;
num_records = num_records + 1;
let t = record.timestamp_micros as f32 / 1e6;
let t_prev = prev_timestamp as f32 / 1e6;
assert_near!(
record.shunt_voltage,
v_shunt_integral(t_prev, t),
device_config.v_shunt_scale
);
assert_near!(record.bus_voltage, v_bus_integral(t_prev, t), device_config.v_bus_scale);
assert_near!(record.power, power_integral(t_prev, t), power_tolerance);
prev_timestamp = record.timestamp_micros;
}
assert_eq!(num_records, test_duration.as_millis() / reporting_interval.as_millis());
Ok(())
}
#[test]
fn test_get_time_offset_nanos() -> Result<(), Error> {
// This instant is effectively Zedmon's zero timestamp.
let zedmon_offset = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
// Values are not used by this test.
let device_config = fake_device::DeviceConfiguration {
shunt_resistance: 0.0,
v_shunt_scale: 0.0,
v_bus_scale: 0.0,
};
let builder =
fake_device::CoordinatorBuilder::new(device_config).with_offset_time(zedmon_offset);
let coordinator = builder.build();
let interface = fake_device::FakeZedmonInterface::new(coordinator);
let zedmon = ZedmonClient::new(interface)?;
let (reported_offset, uncertainty) = zedmon.get_time_offset_nanos()?;
assert_near!(zedmon_offset.as_nanos() as i64, reported_offset, uncertainty);
Ok(())
}
#[test]
fn test_set_relay() -> Result<(), Error> {
// Values are not used by this test.
let device_config = fake_device::DeviceConfiguration {
shunt_resistance: 0.0,
v_shunt_scale: 0.0,
v_bus_scale: 0.0,
};
let builder = fake_device::CoordinatorBuilder::new(device_config).with_relay_enabled(false);
let coordinator = builder.build();
let interface = fake_device::FakeZedmonInterface::new(coordinator.clone());
let zedmon = ZedmonClient::new(interface)?;
// Test true->false and false->true transitions, and no-ops in each state.
zedmon.set_relay(true)?;
assert_eq!(coordinator.borrow().relay_enabled(), true);
zedmon.set_relay(true)?;
assert_eq!(coordinator.borrow().relay_enabled(), true);
zedmon.set_relay(false)?;
assert_eq!(coordinator.borrow().relay_enabled(), false);
zedmon.set_relay(false)?;
assert_eq!(coordinator.borrow().relay_enabled(), false);
zedmon.set_relay(true)?;
assert_eq!(coordinator.borrow().relay_enabled(), true);
Ok(())
}
#[test]
fn test_downsampler_nominal() {
// Total duration of the test scenario.
let duration_micros = 1_000_000;
// Interval of the raw signal (100 Hz).
let raw_interval_micros = 10_000;
// Downsample to 80 Hz. This lets us test both when the downsampling interval coincides with
// the end of a raw sample interval (t=25,000us, 50,000us, 75,000us, ...) and when it
// does not (t=12,500us, 37,500us, 62,500us, ...).
let downsampling_interval_micros = 12_500;
let mut downsampler = Downsampler::new(downsampling_interval_micros);
// Functions defining the raw and average values for each quantity. These are made up, and
// there is no relationship between the voltages and power in Downsampler's context.
//
// Interval-average formulas follow from the definition
// avg(y(t), [t_low, t_high]) := 1 / (t_high - t_low) * \int_{t_low}^{t_high} y(s) ds.
let shunt_voltage_raw = |t: f32| t;
let shunt_voltage_average =
|t1: f32, t2: f32| (t2.powi(2) - t1.powi(2)) / (2.0 * (t2 - t1));
let bus_voltage_raw = |t: f32| t.powi(2);
let bus_voltage_average = |t1: f32, t2: f32| (t2.powi(3) - t1.powi(3)) / (3.0 * (t2 - t1));
let power_raw = |t: f32| t.powi(3);
let power_average = |t1: f32, t2: f32| (t2.powi(4) - t1.powi(4)) / (4.0 * (t2 - t1));
// Collect raw samples from t=0s to t=1s.
let mut t_micros = 0;
let mut records_out = Vec::new();
while t_micros <= duration_micros {
let t_sec = t_micros as f32 / 1e6;
let record = ZedmonRecord {
timestamp_micros: t_micros,
shunt_voltage: shunt_voltage_raw(t_sec),
bus_voltage: bus_voltage_raw(t_sec),
power: power_raw(t_sec),
};
match downsampler.process(record) {
Some(r) => records_out.push(r),
None => {}
}
t_micros += raw_interval_micros;
}
let dt_sec = downsampling_interval_micros as f32 / 1e6;
// Confirm expectations.
assert_eq!(records_out.len(), (duration_micros / downsampling_interval_micros) as usize);
for (i, record) in records_out.into_iter().enumerate() {
let expected_micros = ((i + 1) as u64) * downsampling_interval_micros;
// Endpoints of the interval for this sample, in seconds.
let t2 = expected_micros as f32 / 1e6;
let t1 = t2 - dt_sec;
assert_eq!(record.timestamp_micros, expected_micros);
assert_near!(record.shunt_voltage, shunt_voltage_average(t1, t2), 1e-4);
assert_near!(record.bus_voltage, bus_voltage_average(t1, t2), 1e-4);
assert_near!(record.power, power_average(t1, t2), 1e-4);
}
}
#[test]
fn test_downsampler_with_data_gap() {
// Run for 1 second with a raw data interval of 10ms and a downsampling interval of 100ms,
// with an outage in raw data from 230ms to 440ms, inclusive. Downsampling will skip outputs
// at 300ms and 400ms and reinitialize at 450ms, so we should see a gap in timestamps of the
// output samples between 200ms and 550ms.
let duration_micros = 1_000_000;
let raw_interval_micros = 10_000;
let downsampling_interval_micros = 100_000;
let raw_data_gap_micros = [230_000, 440_000];
let mut downsampler = Downsampler::new(downsampling_interval_micros);
let mut t_micros = 0;
let mut records_out = Vec::new();
while t_micros <= duration_micros {
if t_micros < raw_data_gap_micros[0] || t_micros > raw_data_gap_micros[1] {
let record = ZedmonRecord {
timestamp_micros: t_micros,
shunt_voltage: 1.0,
bus_voltage: 1.0,
power: 1.0,
};
match downsampler.process(record) {
Some(r) => records_out.push(r),
None => {}
}
}
t_micros += raw_interval_micros;
}
let timestamps: Vec<u64> = records_out.into_iter().map(|r| r.timestamp_micros).collect();
assert_eq!(timestamps, vec![100_000, 200_000, 550_000, 650_000, 750_000, 850_000, 950_000]);
}
}