blob: 77ca633e8cda540c41ba2851cc37e32cdbd0904c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! An event that can be signaled and waited on by multiple consumers.
use {
fuchsia_sync::Mutex,
futures::future::{FusedFuture, Future},
slab::Slab,
std::{
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
},
};
const NULL_WAKER_KEY: usize = usize::max_value();
/// An `Event` is a clonable object that can be signaled once. Calls to `.wait()` produce a future,
/// `EventWait`, that can wait on that signal. Once the `Event` has been signaled, all futures will
/// complete immediately.
#[derive(Clone)]
pub struct Event {
inner: Arc<EventSignaler>,
}
impl Event {
/// Create a new `Event` that has not yet been signaled.
pub fn new() -> Self {
Self {
inner: Arc::new(EventSignaler {
inner: Arc::new(Mutex::new(EventState {
state: State::Waiting,
wakers: Slab::new(),
})),
}),
}
}
/// Signal the `Event`. Once this is done, it cannot be undone. Any tasks waiting on this
/// `Event` will be notified and its `Future` implementation will complete.
///
/// Returns true if this `Event` was the one that performed the signal operation.
pub fn signal(&self) -> bool {
self.inner.set(State::Signaled)
}
/// Return true if `Event::signal` has already been called.
pub fn signaled(&self) -> bool {
self.inner.inner.lock().state == State::Signaled
}
/// Create a new `EventWait` future that will complete after this event has been signaled.
/// If all signalers are dropped, this future will continue to return `Poll::Pending`. To be
/// notified when all signalers are dropped without signaling, use `wait_or_dropped`.
pub fn wait(&self) -> EventWait {
EventWait { inner: self.wait_or_dropped() }
}
/// Create a new `EventWaitResult` future that will complete after this event has been
/// signaled or all `Event` clones have been dropped.
///
/// This future will output a `Result<(), Dropped>` to indicate what has occurred.
pub fn wait_or_dropped(&self) -> EventWaitResult {
EventWaitResult {
inner: (*self.inner).inner.clone(),
waker_key: NULL_WAKER_KEY,
terminated: false,
}
}
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Event {{ state: {:?} }}", self.inner.inner.lock().state)
}
}
/// `Event` state tracking enum.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum State {
/// The `Event` has not yet been signaled. This is the initial state of an `Event`.
Waiting,
/// The `signal` method has been called on an `Event`.
Signaled,
/// All clones of an `Event` have been dropped without the `signal` method being called. An
/// `Event` can never move out of the `Dropped` state.
Dropped,
}
/// Tracks state shared by all Event clones and futures.
struct EventState {
pub state: State,
pub wakers: Slab<Waker>,
}
/// A handle shared between all `Event` structs for a given event. Once all `Event`s are dropped,
/// this will be dropped and will notify the `EventState` that it is unreachable by any signalers
/// and will never be signaled if it hasn't been already.
struct EventSignaler {
inner: Arc<Mutex<EventState>>,
}
impl EventSignaler {
/// Internal function to set the self.inner.state value if it has not already been set to
/// `State::Signaled`. Returns true if this function call changed the value of self.inner.state.
fn set(&self, state: State) -> bool {
assert!(state != State::Waiting, "Cannot reset the state to Waiting");
let mut guard = self.inner.lock();
if let State::Signaled = guard.state {
// Avoid double panicking.
if !std::thread::panicking() {
assert!(
guard.wakers.is_empty(),
"If there are wakers, a race condition is present"
);
}
false
} else {
let mut wakers = std::mem::replace(&mut guard.wakers, Slab::new());
guard.state = state;
drop(guard);
for waker in wakers.drain() {
waker.wake();
}
true
}
}
}
impl Drop for EventSignaler {
fn drop(&mut self) {
// Indicate that all `Event` clones have been dropped. This does not set the value if it
// has already been set to `State::Signaled`.
let _: bool = self.set(State::Dropped);
}
}
/// Future implementation for `Event::wait_or_dropped`.
#[must_use = "futures do nothing unless polled"]
pub struct EventWaitResult {
inner: Arc<Mutex<EventState>>,
waker_key: usize,
terminated: bool,
}
impl Future for EventWaitResult {
type Output = Result<(), Dropped>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `this: &mut Self` allows the compiler to track access to individual fields of Self as
// distinct borrows.
let this = self.get_mut();
let mut guard = this.inner.lock();
match guard.state {
State::Waiting => {
let mut new_key = None;
if this.waker_key == NULL_WAKER_KEY || !guard.wakers.contains(this.waker_key) {
new_key = Some(guard.wakers.insert(cx.waker().clone()));
} else {
guard.wakers[this.waker_key] = cx.waker().clone();
}
if let Some(key) = new_key {
this.waker_key = key;
}
Poll::Pending
}
State::Signaled => {
this.terminated = true;
this.waker_key = NULL_WAKER_KEY;
Poll::Ready(Ok(()))
}
State::Dropped => {
this.terminated = true;
this.waker_key = NULL_WAKER_KEY;
Poll::Ready(Err(Dropped))
}
}
}
}
impl FusedFuture for EventWaitResult {
fn is_terminated(&self) -> bool {
self.terminated
}
}
impl Unpin for EventWaitResult {}
impl Drop for EventWaitResult {
fn drop(&mut self) {
if self.waker_key != NULL_WAKER_KEY {
// Cleanup the EventWaitResult's waker one is present in the wakers slab.
let mut guard = self.inner.lock();
if guard.wakers.contains(self.waker_key) {
let _ = guard.wakers.remove(self.waker_key);
}
}
}
}
/// Future implementation for `Event::wait`. This future only completes when the event is signaled.
/// If all signalers are dropped, `EventWait` continues to return `Poll::Pending`.
#[must_use = "futures do nothing unless polled"]
pub struct EventWait {
inner: EventWaitResult,
}
impl Future for EventWait {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.inner).poll(cx) {
Poll::Ready(Ok(())) => Poll::Ready(()),
_ => Poll::Pending,
}
}
}
impl FusedFuture for EventWait {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
impl Unpin for EventWait {}
/// Error returned from an `EventWait` when the Event is dropped.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct Dropped;
impl fmt::Display for Dropped {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "event dropped")
}
}
impl std::error::Error for Dropped {}
#[cfg(test)]
mod tests {
use {super::*, fuchsia_async as fasync};
// TODO: Add tests to check waker count in EventWait and EventWaitResult.
#[test]
fn signaled_method_respects_signaling() {
let event = Event::new();
let event_clone = event.clone();
assert!(!event.signaled());
assert!(!event_clone.signaled());
assert!(event.signal());
assert!(event.signaled());
assert!(event_clone.signaled());
}
#[test]
fn unsignaled_event_is_pending() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
}
#[test]
fn signaled_event_is_ready() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait).is_ready());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
}
#[test]
fn event_is_ready_and_wakes_after_stalled() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait).is_ready());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
}
#[test]
fn signaling_event_registers_and_wakes_multiple_waiters_properly() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait_1 = event.wait();
let mut wait_2 = event.wait();
let mut wait_3 = event.wait();
// Multiple waiters events are pending correctly.
assert!(ex.run_until_stalled(&mut wait_1).is_pending());
assert!(ex.run_until_stalled(&mut wait_2).is_pending());
assert!(event.signal());
// Both previously registered and unregistered event waiters complete correctly.
assert!(ex.run_until_stalled(&mut wait_1).is_ready());
assert!(ex.run_until_stalled(&mut wait_2).is_ready());
assert!(ex.run_until_stalled(&mut wait_3).is_ready());
}
#[test]
fn event_is_terminated_after_complete() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(!wait.is_terminated());
assert!(!wait_or_dropped.is_terminated());
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait).is_ready());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
assert!(wait.is_terminated());
assert!(wait_or_dropped.is_terminated());
}
#[test]
fn waiter_drops_gracefully() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(!wait.is_terminated());
assert!(!wait_or_dropped.is_terminated());
drop(wait);
drop(wait_or_dropped);
assert!(event.signal());
}
#[test]
fn waiter_completes_after_all_events_drop() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let event_clone = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(!wait.is_terminated());
assert!(!wait_or_dropped.is_terminated());
drop(event);
drop(event_clone);
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
}
#[test]
fn drop_receiver_after_poll_without_event_signal() {
let mut exec = fasync::TestExecutor::new();
let event = Event::new();
let mut waiter = event.wait_or_dropped();
assert!(exec.run_until_stalled(&mut waiter).is_pending());
drop(event);
drop(waiter);
}
#[test]
fn drop_receiver_after_event_signal_without_repoll() {
let mut exec = fasync::TestExecutor::new();
let event = Event::new();
let mut waiter = event.wait_or_dropped();
assert_eq!(event.inner.inner.lock().wakers.len(), 0);
// Polling the waiter will register a new waker.
assert!(exec.run_until_stalled(&mut waiter).is_pending());
assert_eq!(event.inner.inner.lock().wakers.len(), 1);
// The waiter's waker is used.
assert!(event.signal());
assert_eq!(event.inner.inner.lock().wakers.len(), 0);
// Dropping a waiter without polling it is valid.
drop(waiter);
}
}