blob: b1789b7d76ae22dd9ada97c901602197d1a99273 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Fuchsia Executor Instrumentation.
//!
//! This module contains types used for instrumenting the fuchsia-async executor.
//! It exposes an event style API, intended to be invoked at certain points
//! during execution time, agnostic to the implementation of the executor.
//! The current implementation records counts and time slices with a focus
//! on minimal overhead.
use fuchsia_zircon as zx;
use std::{
cmp, mem,
panic::Location,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
};
/// A low-overhead metrics collection type intended for use by an async executor.
#[derive(Default)]
pub struct Collector {
tasks_created: AtomicUsize,
tasks_completed: AtomicUsize,
tasks_pending_max: AtomicUsize,
polls: AtomicUsize,
wakeups_io: AtomicUsize,
wakeups_deadline: AtomicUsize,
wakeups_notification: AtomicUsize,
ticks_awake: AtomicU64,
ticks_asleep: AtomicU64,
}
impl Collector {
/// Create a new blank collector.
pub fn new() -> Self {
Self::default()
}
/// Called when a task is created (usually, this means spawned).
pub fn task_created(&self, _id: usize, _source: Option<&Location<'_>>) {
#[cfg(trace_level_logging)]
tracing::trace!(
tag = "fuchsia_async",
id = _id,
source = %_source.unwrap(),
"Task spawned"
);
self.tasks_created.fetch_add(1, Ordering::Relaxed);
}
/// Called when a task is complete.
pub fn task_completed(&self, _id: usize, _source: Option<&Location<'_>>) {
#[cfg(trace_level_logging)]
tracing::trace!(
tag = "fuchsia_async",
id = _id,
source = %_source.unwrap(),
"Task completed"
);
self.tasks_completed.fetch_add(1, Ordering::Relaxed);
}
/// Creates a local collector. Each run loop should have its own local collector.
/// The local collector is initially awake, and has no recorded events.
pub fn create_local_collector(&self) -> LocalCollector<'_> {
LocalCollector {
collector: &self,
last_ticks: zx::ticks_get(),
polls: 0,
tasks_pending_max: 0, // Loading not necessary, handled by first update with same cost
}
}
#[allow(dead_code)]
/// Create a snapshot of the currently collected metrics. If the executor is running
/// in multi-threaded mode, the snapshot is non-atomic. That said, the following
/// invariant will hold: tasks_pending_max and tasks_completed <= tasks_created.
pub fn snapshot(&self) -> Snapshot {
Snapshot {
tasks_pending_max: self.tasks_pending_max.load(Ordering::Acquire),
tasks_completed: self.tasks_completed.load(Ordering::Acquire),
tasks_created: self.tasks_created.load(Ordering::Acquire),
polls: self.polls.load(Ordering::Acquire),
wakeups_io: self.wakeups_io.load(Ordering::Acquire),
wakeups_deadline: self.wakeups_deadline.load(Ordering::Acquire),
wakeups_notification: self.wakeups_notification.load(Ordering::Acquire),
ticks_awake: self.ticks_awake.load(Ordering::Acquire),
ticks_asleep: self.ticks_asleep.load(Ordering::Acquire),
}
}
}
/// A logical sub-collector type of a `Collector`, for a local run-loop.
pub struct LocalCollector<'a> {
/// The main collector of this local collector
collector: &'a Collector,
/// Ticks since the awake state was last toggled
last_ticks: i64,
/// Number of polls since last `will_wait`
polls: usize,
/// Last observed `tasks_pending_max` from main Collector
tasks_pending_max: usize,
}
impl<'a> LocalCollector<'a> {
/// Called after a task was polled. If the task completed, `complete`
/// should be true. `pending_tasks` is the observed size of the pending task
/// queue (excluding the currently polled task).
pub fn task_polled(
&mut self,
id: usize,
source: Option<&Location<'_>>,
complete: bool,
tasks_pending: usize,
) {
self.polls += 1;
let new_local_max = cmp::max(self.tasks_pending_max, tasks_pending);
if new_local_max > self.tasks_pending_max {
let prev_upstream_max =
self.collector.tasks_pending_max.fetch_max(new_local_max, Ordering::Relaxed);
self.tasks_pending_max = cmp::max(new_local_max, prev_upstream_max);
}
if complete {
self.collector.task_completed(id, source);
}
}
/// Called before the loop waits. Must be followed by a `woke_up` call.
pub fn will_wait(&mut self) {
let delta = self.bump_ticks();
self.collector.ticks_awake.fetch_add(delta, Ordering::Relaxed);
self.collector.polls.fetch_add(mem::replace(&mut self.polls, 0), Ordering::Relaxed);
}
/// Called after the loop wakes up from waiting, containing the reason
/// for the wakeup. Must follow a `will_wait` call.
pub fn woke_up(&mut self, wakeup_reason: WakeupReason) {
let delta = self.bump_ticks();
let counter = match wakeup_reason {
WakeupReason::Io => &self.collector.wakeups_io,
WakeupReason::Deadline => &self.collector.wakeups_deadline,
WakeupReason::Notification => &self.collector.wakeups_notification,
};
counter.fetch_add(1, Ordering::Relaxed);
self.collector.ticks_asleep.fetch_add(delta, Ordering::Relaxed);
}
/// Helper which replaces `last_ticks` with the current ticks.
/// Returns the ticks elapsed since `last_ticks`.
fn bump_ticks(&mut self) -> u64 {
let current_ticks = zx::ticks_get();
let delta = current_ticks - self.last_ticks;
assert!(delta >= 0, "time moved backwards in zx::ticks_get()");
self.last_ticks = current_ticks;
delta as u64
}
}
impl<'a> Drop for LocalCollector<'a> {
fn drop(&mut self) {
let delta = self.bump_ticks();
self.collector.polls.fetch_add(self.polls, Ordering::Release);
self.collector.ticks_awake.fetch_add(delta, Ordering::Release);
}
}
/// The reason a waiting run-loop was woken up.
pub enum WakeupReason {
/// An external io packet was received on the port.
Io,
/// Deadline from a user-space timer.
Deadline,
/// An executor-internal notification.
Notification,
}
/// A snapshot of all metrics collected at a specific point in time.
// TODO(https://fxbug.dev/332385167): Remove or explain #[allow(dead_code)].
#[allow(dead_code)]
#[derive(Debug)]
pub struct Snapshot {
pub tasks_created: usize,
pub tasks_completed: usize,
pub tasks_pending_max: usize,
pub polls: usize,
pub wakeups_io: usize,
pub wakeups_deadline: usize,
pub wakeups_notification: usize,
pub ticks_awake: u64,
pub ticks_asleep: u64,
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
handle::channel::Channel, runtime::fuchsia::executor::Time, LocalExecutor,
SendExecutor, TestExecutor, Timer,
},
fuchsia_zircon::{self as zx, DurationNum},
futures::future,
std::pin::pin,
};
const MICROSECOND: std::time::Duration = std::time::Duration::from_micros(1);
use std::thread::sleep;
/// Helper which keeps track of last observed tick counts, and reports
/// changes.
struct Ticker<'a> {
c: &'a Collector,
awake: u64, // Last observed awake ticks
asleep: u64, // Last observed asleep ticks
}
impl<'a> Ticker<'a> {
fn new(c: &'a Collector) -> Self {
let result = Self { c, awake: 0, asleep: 0 };
// Ensure that the next interaction with the collector that's supposed to accumulate
// ticks will actually observe time having passed.
sleep(1 * MICROSECOND);
result
}
/// Updates awake and asleep ticks to current values. Returns a bool
/// 2-tuple indicating whether awake and asleep time, respectively,
/// progressed since last cycle.
fn update(&mut self) -> (bool, bool) {
let old_awake =
mem::replace(&mut self.awake, self.c.ticks_awake.load(Ordering::Relaxed));
let old_asleep =
mem::replace(&mut self.asleep, self.c.ticks_asleep.load(Ordering::Relaxed));
// Ensure that the next interaction with the collector that's supposed to accumulate
// ticks will actually observe time having passed.
sleep(1 * MICROSECOND);
(self.awake > old_awake, self.asleep > old_asleep)
}
}
#[test]
fn debug_snapshot() {
let collector = Collector {
tasks_created: 10.into(),
tasks_completed: 9.into(),
tasks_pending_max: 3.into(),
polls: 1000.into(),
wakeups_io: 123.into(),
wakeups_deadline: 456.into(),
wakeups_notification: 789.into(),
ticks_awake: 100000.into(),
ticks_asleep: 200000.into(),
};
assert_eq!(
format!("{:?}", collector.snapshot()),
"\
Snapshot { tasks_created: 10, tasks_completed: 9, tasks_pending_max: 3, \
polls: 1000, wakeups_io: 123, wakeups_deadline: 456, wakeups_notification: 789, \
ticks_awake: 100000, ticks_asleep: 200000 }"
);
}
#[test]
fn collector() {
let collector = Collector::new();
collector.task_created(0, Some(Location::caller()));
collector.task_created(1, Some(Location::caller()));
collector.task_completed(0, Some(Location::caller()));
assert_eq!(collector.tasks_created.load(Ordering::Relaxed), 2);
assert_eq!(collector.tasks_completed.load(Ordering::Relaxed), 1);
}
#[test]
fn task_polled() {
let collector = Collector::new();
collector.tasks_pending_max.store(6, Ordering::Relaxed);
collector.polls.store(1, Ordering::Relaxed);
let mut local_collector = collector.create_local_collector();
local_collector.task_polled(0, Some(Location::caller()), false, /* pending_tasks */ 5);
assert_eq!(collector.tasks_pending_max.load(Ordering::Relaxed), 6);
local_collector.task_polled(0, Some(Location::caller()), false, /* pending_tasks */ 7);
assert_eq!(collector.tasks_pending_max.load(Ordering::Relaxed), 7);
assert_eq!(local_collector.polls, 2);
// Polls not yet flushed
assert_eq!(collector.polls.load(Ordering::Relaxed), 1);
// Collector polls: 1 -> 3
local_collector.will_wait();
assert_eq!(collector.polls.load(Ordering::Relaxed), 3);
// Check that local collector was reset
assert_eq!(local_collector.tasks_pending_max, 7);
assert_eq!(local_collector.polls, 0);
// One more cycle to check that max is idempotent when main collector is greater
collector.tasks_pending_max.store(10, Ordering::Relaxed);
local_collector.woke_up(WakeupReason::Io);
local_collector.task_polled(0, Some(Location::caller()), false, /* pending_tasks */ 8);
assert_eq!(local_collector.tasks_pending_max, 10);
// Flush with drop this time. Polls 3 -> 4
drop(local_collector);
assert_eq!(collector.tasks_pending_max.load(Ordering::Relaxed), 10);
assert_eq!(collector.polls.load(Ordering::Relaxed), 4);
}
#[test]
fn ticks() {
let collector = Collector::new();
let mut ticker = Ticker::new(&collector);
let mut local_collector = collector.create_local_collector();
assert_eq!(ticker.update(), (false, false));
// Will wait should move awake time forward
local_collector.will_wait();
assert_eq!(ticker.update(), (true, false));
// Woke up should move asleep time forward
local_collector.woke_up(WakeupReason::Io);
assert_eq!(ticker.update(), (false, true));
// Poll should NOT move awake time forward
local_collector.task_polled(0, Some(Location::caller()), true, 1);
assert_eq!(ticker.update(), (false, false));
// Drop should move awake time forward
drop(local_collector);
assert_eq!(ticker.update(), (true, false));
}
#[test]
fn wakeups() {
let collector = Collector::new();
let mut local_collector = collector.create_local_collector();
local_collector.will_wait();
local_collector.woke_up(WakeupReason::Io);
local_collector.will_wait();
local_collector.woke_up(WakeupReason::Deadline);
local_collector.will_wait();
local_collector.woke_up(WakeupReason::Deadline);
local_collector.will_wait();
local_collector.woke_up(WakeupReason::Notification);
local_collector.will_wait();
local_collector.woke_up(WakeupReason::Notification);
local_collector.will_wait();
local_collector.woke_up(WakeupReason::Notification);
assert_eq!(collector.wakeups_io.load(Ordering::Relaxed), 1);
assert_eq!(collector.wakeups_deadline.load(Ordering::Relaxed), 2);
assert_eq!(collector.wakeups_notification.load(Ordering::Relaxed), 3);
}
// Check that multiple local collectors coalesce into expected aggregates.
// Covers some permutations of interleaved calls.
#[test]
fn multiple_local_collectors() {
let collector = Collector::new();
let mut ticker = Ticker::new(&collector);
// tasks_created += 1
collector.task_created(0, Some(Location::caller()));
let mut local_1 = collector.create_local_collector();
// tasks_polled += 1, tasks_pending_max = 5
local_1.task_polled(0, Some(Location::caller()), false, 5);
// ticks_awake += T
local_1.will_wait();
assert_eq!(ticker.update(), (true, false));
let mut local_2 = collector.create_local_collector();
// tasks_created += 1, tasks_polled += 2, tasks_complete += 1, tasks_pending_max = 7
collector.task_created(1, Some(Location::caller()));
local_2.task_polled(0, Some(Location::caller()), false, 7);
local_2.task_polled(0, Some(Location::caller()), true, 3);
// ticks_awake += T
local_2.will_wait();
assert_eq!(ticker.update(), (true, false));
// ticks_asleep += T
local_1.woke_up(WakeupReason::Io);
assert_eq!(ticker.update(), (false, true));
// ticks_asleep += T
local_1.woke_up(WakeupReason::Deadline);
assert_eq!(ticker.update(), (false, true));
// ticks_awake += T
drop(local_1);
assert_eq!(ticker.update(), (true, false));
// ticks_awake += T
drop(local_2);
assert_eq!(ticker.update(), (true, false));
// Finally, check that counters match their expected values
assert_eq!(collector.tasks_created.load(Ordering::Relaxed), 2);
assert_eq!(collector.tasks_completed.load(Ordering::Relaxed), 1);
assert_eq!(collector.polls.load(Ordering::Relaxed), 3);
assert_eq!(collector.tasks_pending_max.load(Ordering::Relaxed), 7);
assert_eq!(collector.wakeups_io.load(Ordering::Relaxed), 1);
assert_eq!(collector.wakeups_deadline.load(Ordering::Relaxed), 1);
assert_eq!(collector.wakeups_notification.load(Ordering::Relaxed), 0);
}
#[test]
fn instrumentation_single_smoke_test() {
let mut executor = LocalExecutor::new();
executor.run_singlethreaded(simple_task_for_snapshot());
let snapshot = executor.snapshot();
snapshot_sanity_check(&snapshot);
assert_eq!(snapshot.tasks_created, 2);
assert_eq!(snapshot.tasks_completed, 2);
assert!(snapshot.wakeups_deadline >= 1);
}
#[test]
fn instrumentation_until_stalled_smoke_test() {
let mut executor = TestExecutor::new_with_fake_time();
let mut fut = pin!(simple_task_for_snapshot());
assert!(executor.run_until_stalled(&mut fut).is_ready());
let snapshot = executor.snapshot();
snapshot_sanity_check(&snapshot);
assert_eq!(snapshot.tasks_created, 2);
assert_eq!(snapshot.tasks_completed, 2);
assert!(snapshot.wakeups_deadline >= 1);
}
#[test]
fn instrumentation_multi_smoke_test() {
let mut executor = SendExecutor::new(2);
executor.run(simple_task_for_snapshot());
let snapshot = executor.snapshot();
snapshot_sanity_check(&snapshot);
assert_eq!(snapshot.tasks_created, 2);
assert_eq!(snapshot.tasks_completed, 2);
assert!(snapshot.wakeups_deadline >= 1);
}
// This task spawns another tasks, which completes. It should wake up from IO, notification
// and deadline at least once each. Min polls is 4.
pub async fn simple_task_for_snapshot() {
let bytes = &[0, 1, 2, 3];
let (tx, rx) = zx::Channel::create();
let f_rx = Channel::from_channel(rx);
let mut buffer = zx::MessageBuf::new();
let read_fut = f_rx.recv_msg(&mut buffer);
// This extra poll ensures a happens-before relationship between the read and the write
// future in order to trigger an IO wakeup. This registers a waker with the executor
// which guarantees that the IO wakeup will be skipped by short circuiting.
let pending_read_fut = match future::select(read_fut, future::ready(())).await {
future::Either::Right((_, pending)) => pending,
_ => panic!("read future complete before write"),
};
let t = crate::Task::spawn(async move {
let mut handles = Vec::new();
tx.write(bytes, &mut handles).expect("failed to write message");
Timer::new(Time::after(0.nanos())).await;
});
pending_read_fut.await.expect("read future did not complete");
t.await;
}
// Sanity check for running simple_task on an executor. `extra_tasks` represents
// synthetic tasks that are added as an impl detail of the execution - e.g. a multithreaded
// execution run creates an extra synthetic task for the main future.
pub fn snapshot_sanity_check(snapshot: &Snapshot) {
assert!(snapshot.polls >= 4);
assert!(snapshot.wakeups_io >= 1);
assert!(snapshot.ticks_awake >= 1);
assert!(snapshot.ticks_asleep >= 1);
}
}