blob: 91452454b89bf6ecd1300107f0eaff9143ab0d0f [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Thread-level stats of the ongoing perf measurement. Each thread may own its
//! own stat. Some stats are additive, some are not. Aggregating them is slightly
//! tricky and may come at a cost of precision.
//! Latency is in microseconds.
//! Bandwidth is in MB (10^6 bytes) per second.
//! IOPS is the number of IOs completed per second.
use {
crate::io_packet::TimeInterval,
crate::operations::{OperationType, PipelineStages},
log::{
debug, log_enabled, warn, Level, Level::Debug, LevelFilter, Metadata, Record,
SetLoggerError,
},
std::time::Instant,
};
#[cfg(target_os = "fuchsia")]
use crate::fuchsia_utils::create_tracer;
#[cfg(not(target_os = "fuchsia"))]
use crate::not_fuchsia_utils::create_tracer;
/// OperationStats maintains stats for an operation. In addition to maintaining total
/// time spent in each stage of io packet's pipeline, it also maintains total
/// number of IOs completed and total number of bytes transferred.
#[derive(Copy, Clone, Default)]
struct OperationStats {
    /// Sum of time all IoPacket spent actively in each stage, indexed by
    /// `PipelineStages::stage_number()`. Units are nanoseconds (stage_stats
    /// divides by 10^9 to obtain seconds).
    /// TODO: Create a type instead of using an array.
    stage_time_elapsed: [u128; PipelineStages::stage_count()],
    /// Total number of bytes transferred. The interpretation of this variable
    /// may depend on the type of operation.
    bytes_transferred: u64,
    /// Total IoPackets processed so far.
    io_count: u64,
}
impl OperationStats {
    /// Records one completed IoPacket: adds the time the packet spent in each
    /// pipeline stage to the running totals and bumps the IO/byte counters.
    pub fn log(
        &mut self,
        io_size: u64,
        stage_time_elapsed: &[TimeInterval; PipelineStages::stage_count()],
    ) {
        for stage in PipelineStages::iterator() {
            self.stage_time_elapsed[stage.stage_number()] +=
                stage_time_elapsed[stage.stage_number()].duration();
        }
        self.io_count += 1;
        self.bytes_transferred += io_size;
    }

    /// Folds `stat1` into `self`.
    pub fn aggregate_summary(&mut self, stat1: &OperationStats) {
        // bytes transferred is additive.
        self.bytes_transferred += stat1.bytes_transferred;
        // io_count is additive.
        self.io_count += stat1.io_count;
        // This is debatable whether to consider adding or taking the
        // smallest/largest run. For start/end we chose longest run and for
        // stage duration we choose sum of all time spent to get a different
        // perspective about the performance.
        for stage in PipelineStages::iterator() {
            self.stage_time_elapsed[stage.stage_number()] +=
                stat1.stage_time_elapsed[stage.stage_number()];
        }
    }

    /// Returns (latency in microseconds, bandwidth in MB/s, IOs per second)
    /// for a given stage.
    fn stage_stats(&self, stage: PipelineStages) -> (f64, f64, f64) {
        let duration_ns = self.stage_time_elapsed[stage.stage_number()];
        let mut duration_sec = duration_ns as f64 / 1_000_000_000.0;
        if duration_sec == 0.0 {
            warn!("Stage duration is zero for {:?}", stage);
            duration_sec = 1.0;
        }
        // Guard against 0/0 = NaN when no IOs were logged for this operation
        // (log_summary_per_stage calls this unconditionally for all stages).
        let latency_us = if self.io_count == 0 {
            0.0
        } else {
            (duration_ns as f64 / 1_000.0) / self.io_count as f64
        };
        // Divide in floating point so transfers smaller than 1 MB are not
        // truncated to zero bandwidth by integer division.
        let bandwidth = (self.bytes_transferred as f64 / 1_000_000.0) / duration_sec;
        let iops_per_sec = self.io_count as f64 / duration_sec;
        (latency_us, bandwidth, iops_per_sec)
    }

    /// Logs, at Debug level, one line of stats per pipeline stage.
    fn log_summary_per_stage(&self) {
        for stage in PipelineStages::iterator() {
            let threads_duration = self.stage_time_elapsed[stage.stage_number()];
            let (latency_us, bandwidth, iops_per_sec) = self.stage_stats(*stage);
            debug!(
                "{:?} TimeInStage:{} IOPS:{:.2} LAT(us):{:.2} BW(MB/s):{:.2}",
                *stage, threads_duration, iops_per_sec, latency_us, bandwidth
            );
        }
    }
}
/// Maintains vital stats for each operation type and also maintains time
/// duration this stat was active - time spent between calls to start() and
/// end().
pub struct Stats {
    /// When was this Stats instance started. This is a snapshot of
    /// monotonically increasing system clock.
    start: Instant,
    /// When was the Stats instance done gathering stats. Initialized to the
    /// same instant as `start`, so the duration is zero until stop_clock()
    /// is called.
    end: Instant,
    /// Per-operation stats, indexed by `OperationType::operation_number()`.
    // TODO: Create a type instead of using an array.
    operations: [OperationStats; OperationType::operations_count()],
}
impl Stats {
    /// Creates a Stats with zeroed per-operation counters. `start` and `end`
    /// are snapshotted to the same instant, so the measured duration is zero
    /// until start_clock()/stop_clock() are called.
    pub fn new() -> Stats {
        let now = Instant::now();
        Stats { start: now, operations: Default::default(), end: now }
    }

    /// Records one completed IoPacket under the given operation type.
    pub fn log(
        &mut self,
        operation: OperationType,
        io_size: u64,
        stage_time_elapsed: &[TimeInterval; PipelineStages::stage_count()],
    ) {
        self.operations[operation.operation_number()].log(io_size, stage_time_elapsed);
    }

    /// Marks the beginning of the measured run.
    pub fn start_clock(&mut self) {
        self.start = Instant::now();
    }

    /// Marks the end of the measured run.
    pub fn stop_clock(&mut self) {
        self.end = Instant::now();
    }

    /// Aggregate stats from multiple stats into one. We consider longest running
    /// thread for duration of the run.
    pub fn aggregate_summary(&mut self, stat1: &Stats) {
        // We consider longest running thread.
        if self.end.duration_since(self.start) < stat1.end.duration_since(stat1.start) {
            self.start = stat1.start;
            self.end = stat1.end;
        }
        for operation in OperationType::iterator() {
            let self_ops = &mut self.operations[operation.operation_number()];
            let stat1_ops = &stat1.operations[operation.operation_number()];
            self_ops.aggregate_summary(stat1_ops);
        }
    }

    /// Prints to stdout a per-operation summary (runtime, IOs, bytes, IOPS,
    /// latency, bandwidth) for every operation that completed at least one IO.
    /// At Debug log level, also logs per-stage stats for every operation.
    pub fn display_summary(&self) {
        let mut duration =
            self.end.duration_since(self.start).as_nanos() as f64 / 1_000_000_000.0;
        if duration == 0.0 {
            // This is the whole-run duration, not a stage duration.
            warn!("Run duration is zero");
            duration = 1.0;
        }
        for operation in OperationType::iterator() {
            let stat = &self.operations[operation.operation_number()];
            if log_enabled!(Debug) {
                stat.log_summary_per_stage();
            }
            // Skip operations that never ran to keep the summary concise.
            if stat.io_count > 0 {
                let (latency_us, bandwidth, iops_per_sec) = stat.stage_stats(PipelineStages::Issue);
                println!(
                    "Operation: {:?}\n\
                     \tRuntime:{:0.2}s IOs:{} bytes_transferred(MB):{:.2}\n\
                     \tIOPS:{:.2} LAT(us):{:.2} BW(MB/s):{:.2}",
                    operation,
                    duration,
                    stat.io_count,
                    stat.bytes_transferred as f64 / 1_000_000.0,
                    iops_per_sec,
                    latency_us,
                    bandwidth
                );
            }
        }
    }
}
/// Minimal logger that writes every enabled record to stdout.
struct SimpleLogger;

impl log::Log for SimpleLogger {
    /// Accepts all records at Debug severity or higher.
    fn enabled(&self, metadata: &Metadata<'_>) -> bool {
        metadata.level() <= Level::Debug
    }

    /// Prints "LEVEL - message" for records that pass the `enabled` filter.
    fn log(&self, record: &Record<'_>) {
        if !self.enabled(record.metadata()) {
            return;
        }
        println!("{} - {}", record.level(), record.args());
    }

    fn flush(&self) {}
}
// The global logger instance handed to the `log` facade; must be 'static.
static LOGGER: SimpleLogger = SimpleLogger;

/// Installs `LOGGER` as the process-wide logger and sets up tracing via
/// `create_tracer` (platform-specific implementation chosen at compile time).
/// Returns `SetLoggerError` if a logger was already installed.
/// NOTE(review): max level is set to Info although SimpleLogger enables
/// Debug, so debug! output stays suppressed unless the max level is raised
/// elsewhere — confirm this is intentional.
pub fn log_init(log_ftrace: bool) -> Result<(), SetLoggerError> {
    create_tracer(log_ftrace);
    log::set_logger(&LOGGER).map(|()| log::set_max_level(LevelFilter::Info))
}
#[cfg(test)]
mod tests {
    use {
        crate::io_packet::TimeInterval,
        crate::log::Stats,
        crate::operations::{OperationType, PipelineStages},
        std::cmp,
        std::{thread, time::Duration},
    };

    #[test]
    fn stats_test_new_initialized() {
        let stats = Stats::new();
        // new() snapshots start and end to the same instant.
        assert_eq!(stats.end.duration_since(stats.start).as_nanos(), 0);
        for operation in OperationType::iterator() {
            let operations = &stats.operations[operation.operation_number()];
            assert_eq!(operations.bytes_transferred, 0);
            assert_eq!(operations.io_count, 0);
            for stage in PipelineStages::iterator() {
                assert_eq!(operations.stage_time_elapsed[stage.stage_number()], 0);
            }
        }
    }

    /// Logs one IO per operation type into `stats`, sleeping a known amount
    /// per operation and per stage so elapsed times have verifiable lower
    /// bounds.
    fn log_update_helper(
        stats: &mut Stats,
        operation_sleep_time: Duration,
        stage_sleep_time: Duration,
    ) {
        for operation in OperationType::iterator() {
            let mut stage_time_elapsed = [TimeInterval::new(); PipelineStages::stage_count()];
            for stage in PipelineStages::iterator() {
                stage_time_elapsed[stage.stage_number()].start();
            }
            // sleep for few milliseconds before we end pipelines
            thread::sleep(
                operation_sleep_time
                    + Duration::from_millis(operation.operation_number() as u64),
            );
            for stage in PipelineStages::iterator() {
                thread::sleep(stage_sleep_time);
                stage_time_elapsed[stage.stage_number()].end();
            }
            // Use the operation number as the io_size so bytes_transferred is
            // verifiable per operation.
            stats.log(*operation, operation.operation_number() as u64, &stage_time_elapsed);
        }
    }

    /// Verifies counters and lower bounds on elapsed times for a `stats`
    /// populated by `io_count` calls to `log_update_helper` with the same
    /// sleep parameters.
    fn log_verify_helper(
        stats: &Stats,
        io_count: u64,
        operation_sleep_time: Duration,
        stage_sleep_time: Duration,
    ) {
        for operation in OperationType::iterator() {
            let op_stats = &stats.operations[operation.operation_number()];
            assert_eq!(op_stats.io_count, io_count);
            assert_eq!(op_stats.bytes_transferred, io_count * operation.operation_number() as u64);
            let operation_duration_ns = operation_sleep_time.as_nanos()
                + Duration::from_millis(operation.operation_number() as u64).as_nanos();
            // Stages end in order, so stage i accumulates i+1 stage sleeps on
            // top of the operation sleep.
            let mut stage_duration_ns: u128 = 0;
            for stage in PipelineStages::iterator() {
                stage_duration_ns += stage_sleep_time.as_nanos();
                assert!(
                    op_stats.stage_time_elapsed[stage.stage_number()]
                        > io_count as u128 * (operation_duration_ns + stage_duration_ns)
                );
            }
        }
        // The whole run must have taken at least the sum of per-IO operation
        // sleeps across all operation types.
        let mut total_duration: Duration = Duration::new(0, 0);
        for _ in 0..io_count {
            total_duration += operation_sleep_time * OperationType::operations_count() as u32;
        }
        assert!(stats.end.duration_since(stats.start) > total_duration);
    }

    #[test]
    fn stats_test_log_log_once() {
        let mut stats = Stats::new();
        stats.start_clock();
        log_update_helper(&mut stats, Duration::from_millis(10), Duration::from_millis(1));
        stats.stop_clock();
        log_verify_helper(&stats, 1, Duration::from_millis(10), Duration::from_millis(1));
    }

    #[test]
    fn stats_test_log_log_multiple() {
        let mut stats = Stats::new();
        stats.start_clock();
        let io_count = 4;
        for _ in 0..io_count {
            log_update_helper(&mut stats, Duration::from_millis(10), Duration::from_millis(1));
        }
        stats.stop_clock();
        log_verify_helper(
            &stats,
            io_count,
            Duration::from_millis(10),
            Duration::from_millis(1),
        );
    }

    #[test]
    fn stats_test_aggregate() {
        let mut stats1 = Stats::new();
        let mut stats2 = Stats::new();
        let io_count1 = 4;
        let io_count2 = 5;
        stats1.start_clock();
        for _ in 0..io_count1 {
            log_update_helper(&mut stats1, Duration::from_millis(10), Duration::from_millis(1));
        }
        stats1.stop_clock();
        stats2.start_clock();
        for _ in 0..io_count2 {
            log_update_helper(&mut stats2, Duration::from_millis(10), Duration::from_millis(1));
        }
        stats2.stop_clock();
        let mut aggregate = Stats::new();
        aggregate.aggregate_summary(&stats1);
        aggregate.aggregate_summary(&stats2);
        for operation in OperationType::iterator() {
            let aggregate_op = &aggregate.operations[operation.operation_number()];
            let op1 = &stats1.operations[operation.operation_number()];
            let op2 = &stats2.operations[operation.operation_number()];
            // io_counts should be cumulative.
            assert_eq!(aggregate_op.io_count, op1.io_count + op2.io_count);
            // bytes transferred should be cumulative.
            assert_eq!(
                aggregate_op.bytes_transferred,
                op1.bytes_transferred + op2.bytes_transferred
            );
            // Time spent in each stage is also cumulative.
            for stage in PipelineStages::iterator() {
                assert_eq!(
                    aggregate_op.stage_time_elapsed[stage.stage_number()],
                    op1.stage_time_elapsed[stage.stage_number()]
                        + op2.stage_time_elapsed[stage.stage_number()]
                );
            }
        }
        // For duration of the run, longest running instance should be considered.
        assert_eq!(
            aggregate.end.duration_since(aggregate.start).as_nanos(),
            cmp::max(
                stats1.end.duration_since(stats1.start).as_nanos(),
                stats2.end.duration_since(stats2.start).as_nanos()
            )
        );
    }
}