// blob: d3e41e0be1582cb37c05b6784019495f8397a439 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{anyhow, Context as _, Result},
async_lock::Mutex,
async_trait::async_trait,
ffx_stream_util::TryStreamUtilExt,
fuchsia_async::Task,
futures::prelude::*,
pin_project::pin_project,
std::cmp::Eq,
std::fmt::Debug,
std::future::Future,
std::hash::Hash,
std::pin::Pin,
std::rc::{Rc, Weak},
std::result,
std::task::{Context, Poll},
std::time::Duration,
timeout::timeout,
};
/// Marker trait for values that can travel through the event queue.
///
/// It only bundles the bounds the queue relies on (`Debug`, `Sized`, `Hash`,
/// `Clone` and `Eq`); it declares no methods of its own.
pub trait EventTrait: Debug + Sized + Hash + Clone + Eq {}

// Blanket implementation: every type that satisfies the bounds is an event
// type automatically, so the trait does not need to be implemented by hand.
impl<T: Debug + Sized + Hash + Clone + Eq> EventTrait for T {}
/// An EventSynthesizer is any object that can convert a snapshot of its current
/// state into a vector of events.
///
/// `Queue::add_handler` calls this to replay the current state to a handler
/// that is being registered.
#[async_trait(?Send)]
pub trait EventSynthesizer<T: EventTrait> {
/// Returns the events that describe the implementor's state at the time of
/// the call.
async fn synthesize_events(&self) -> Vec<T>;
}
/// Convenience implementation for weak handles to a synthesizer.
///
/// While the owning `Rc<_>` is still alive the call is forwarded to the
/// referenced synthesizer. Once it is gone, an informational log line is
/// written (for debugging) and an empty vector is returned.
#[async_trait(?Send)]
impl<T: EventTrait> EventSynthesizer<T> for Weak<dyn EventSynthesizer<T>> {
    async fn synthesize_events(&self) -> Vec<T> {
        if let Some(parent) = self.upgrade() {
            // `parent` keeps the synthesizer alive for the duration of the call.
            parent.synthesize_events().await
        } else {
            log::info!("event synthesizer parent Rc<_> lost");
            Vec::new()
        }
    }
}
/// Determines the status of a handler after it has processed an event.
///
/// The enum is a plain two-state flag without payload, so it also derives
/// `Clone` and `Copy`; callers can keep using a value after comparing or
/// passing it along.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Status {
    /// Returned when an event handler is done.
    Done,
    /// Returned when an event handler is expecting to handle more events.
    Waiting,
}
/// Implements a general event handler for any inbound events.
#[async_trait(?Send)]
pub trait EventHandler<T: EventTrait> {
/// Handles a single `event`.
///
/// Return `Ok(Status::Waiting)` to keep receiving events and
/// `Ok(Status::Done)` once the handler is finished. The dispatcher treats
/// both `Ok(Status::Done)` and `Err(_)` as the signal to stop its work
/// stream for this handler; an `Err(_)` is additionally logged as a warning.
async fn on_event(&self, event: T) -> Result<Status>;
}
// State shared between a `Dispatcher` and its background task: the handler
// itself plus the sending half of the handler's private event channel.
struct DispatcherInner<T: EventTrait + 'static> {
// Handler invoked for each event taken off the channel.
handler: Box<dyn EventHandler<T>>,
// Sending half of the unbounded channel created in `Dispatcher::new`.
event_in: async_channel::Sender<T>,
}
/// Dispatcher runs events in the handler's queue until the handler is finished,
/// at which point processing ends.
struct Dispatcher<T: EventTrait + 'static> {
// Weak handle to the shared state. The only strong `Rc` is moved into the
// task created by `Dispatcher::new`, so `push` fails with "done" once that
// task has released it.
inner: Weak<DispatcherInner<T>>,
// Background task that feeds queued events to the handler. It is never read;
// it is stored only so that the dispatcher owns the task.
// NOTE(review): presumably dropping the `Task` cancels it — confirm against
// `fuchsia_async::Task`.
_task: Task<()>,
}
impl<T: EventTrait + 'static> Dispatcher<T> {
async fn handler_helper(
event: T,
inner: Rc<DispatcherInner<T>>,
) -> result::Result<(), Result<()>> {
inner
.handler
.on_event(event)
.map(|r| {
// This block merits some explaining:
// So originally an event handler would say that it is done by
// returning Ok(Done). This works around it so that a success
// result of `Done` will cause the work stream for this handler
// to close.
//
// Otherwise when there is an error, just rewrap it in an Err.
// This is just a complicated way to remap Result<Status> to
// Result<()> to preserve the original intended behavior.
if let Ok(r) = r {
if r == Status::Done {
Err(Ok(()))
} else {
Ok(())
}
} else {
Err(Err(r.unwrap_err()))
}
})
.await
}
fn new(handler: impl EventHandler<T> + 'static) -> Self {
let (event_in, queue) = async_channel::unbounded::<T>();
let inner = Rc::new(DispatcherInner { handler: Box::new(handler), event_in });
Self {
inner: Rc::downgrade(&inner),
_task: Task::local(async move {
queue
.map(|e| Ok(e))
.try_for_each_concurrent_while_connected(None, move |e| {
Self::handler_helper(e, inner.clone())
})
.await
.unwrap_or_else(|e| {
if let Err(e) = e {
log::warn!("dispatcher failed in detached task: {:#?}", e)
}
});
}),
}
}
async fn push(&self, e: T) -> Result<()> {
let inner = match self.inner.upgrade() {
Some(i) => i,
None => return Err(anyhow!("done")),
};
inner.event_in.send(e).await.map_err(|e| anyhow!("error enqueueing event: {:#}", e))
}
}
// Future handed back by `wait_for_async`: the waiting future `fut` bundled
// with a strong `Rc<()>` link.
#[pin_project]
struct PredicateHandlerFuture<F: Future<Output = Result<()>>> {
// Hack to track whether this future has been dropped, so that eventually
// the dispatcher will clean the handler up later.
//
// `PredicateHandler` holds the matching `Weak<()>` and reports
// `Status::Done` once this strong reference no longer exists.
_inner: Rc<()>,
// The wrapped future; `poll` forwards to it directly.
#[pin]
fut: F,
}
impl<F: Future<Output = Result<()>>> PredicateHandlerFuture<F> {
fn new(inner: Rc<()>, fut: F) -> Self {
Self { _inner: inner, fut }
}
}
impl<F> Future for PredicateHandlerFuture<F>
where
    F: Future<Output = Result<()>>,
{
    type Output = F::Output;

    /// Polls the wrapped future; the link field plays no part in polling.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        projected.fut.poll(cx)
    }
}
// Event handler that waits until an async predicate accepts an event and
// then signals the waiter over a channel.
struct PredicateHandler<T: EventTrait, F>
where
F: Future<Output = bool>,
{
// Weak side of the link owned by the waiter; when it can no longer be
// upgraded the handler reports `Status::Done` without running the predicate.
parent_link: Weak<()>,
// Used to notify the waiter once the predicate has returned `true`.
predicate_matched: async_channel::Sender<()>,
// Caller-supplied predicate, evaluated for incoming events.
predicate: Box<dyn Fn(T) -> F + 'static>,
}
impl<T, F> PredicateHandler<T, F>
where
    T: EventTrait,
    F: Future<Output = bool>,
{
    /// Builds a handler around `predicate`.
    ///
    /// Returns the handler together with the receiving half of the unbounded
    /// channel on which the handler announces that the predicate matched.
    fn new(
        parent_link: Weak<()>,
        predicate: impl (Fn(T) -> F) + 'static,
    ) -> (Self, async_channel::Receiver<()>) {
        let (predicate_matched, matched_rx) = async_channel::unbounded::<()>();
        let handler = Self { parent_link, predicate_matched, predicate: Box::new(predicate) };
        (handler, matched_rx)
    }
}
#[async_trait(?Send)]
impl<T, F> EventHandler<T> for PredicateHandler<T, F>
where
    T: EventTrait,
    F: Future<Output = bool>,
{
    /// Evaluates the predicate for `event`.
    ///
    /// Finishes (`Status::Done`) when the waiter is gone or when the predicate
    /// matched and the waiter was notified; keeps waiting otherwise. Failing
    /// to notify the waiter is reported as an error.
    async fn on_event(&self, event: T) -> Result<Status> {
        // This is a bit of a race: a dropped `wait_for` future is only noticed
        // here, i.e. by the time the next event fires.
        let waiter_alive = self.parent_link.upgrade().is_some();
        if !waiter_alive {
            return Ok(Status::Done);
        }

        let matched = (self.predicate)(event).await;
        if !matched {
            return Ok(Status::Waiting);
        }

        self.predicate_matched.send(()).await.context("sending 'done' signal to waiter")?;
        Ok(Status::Done)
    }
}
// List of per-handler dispatchers behind an async mutex; the `Rc` lets
// `Queue` and `Processor` share the same list.
type Handlers<T> = Rc<Mutex<Vec<Dispatcher<T>>>>;
/// Cloneable handle to an event queue.
///
/// Events passed to `push` are forwarded by a background `Processor` task to
/// the dispatcher of every registered handler.
#[derive(Clone)]
pub struct Queue<T: EventTrait + 'static> {
// Sending half of the channel drained by the `Processor` task.
inner_tx: async_channel::Sender<T>,
// Dispatchers of the registered handlers, shared with the `Processor`.
handlers: Handlers<T>,
// Weak reference to the state used to synthesize events for new handlers.
state: Weak<dyn EventSynthesizer<T>>,
// Rc<_> so that the client can drop multiple of these clients without
// having the underlying task dropped/canceled.
_processor_task: Rc<Task<()>>,
}
// Background half of a `Queue`: receives pushed events and hands each one to
// every registered dispatcher.
struct Processor<T: 'static + EventTrait> {
// Receiving half of the queue's channel. Wrapped in `Option` so that
// `process` can take it out exactly once.
inner_rx: Option<async_channel::Receiver<T>>,
// Dispatcher list shared with the `Queue` handles.
handlers: Handlers<T>,
}
impl<T: 'static + EventTrait> Queue<T> {
    /// Shared constructor behind `new` and `new_with_handler`.
    ///
    /// Creates the event channel, wraps `dispatchers` in the shared handler
    /// list and starts the processor task on the local executor.
    fn with_dispatchers(
        state: &Rc<impl EventSynthesizer<T> + 'static>,
        dispatchers: Vec<Dispatcher<T>>,
    ) -> Self {
        let (inner_tx, inner_rx) = async_channel::unbounded::<T>();
        let handlers = Rc::new(Mutex::new(dispatchers));
        let proc = Processor::<T> { inner_rx: Some(inner_rx), handlers: handlers.clone() };
        let state = Rc::downgrade(state);
        Self { inner_tx, handlers, state, _processor_task: Rc::new(Task::local(proc.process())) }
    }

    /// Creates an event queue. The state is tracked with a `Weak<_>` pointer to
    /// `state`.
    ///
    /// When this is called, an event processing task is started in the
    /// background and tied to the lifetimes of these objects. Once all objects
    /// are dropped, the background process will be shutdown automatically.
    pub fn new(state: &Rc<impl EventSynthesizer<T> + 'static>) -> Self {
        Self::with_dispatchers(state, Vec::new())
    }

    /// Creates an event queue (see `new`) with a single handler to start.
    ///
    /// Unlike `add_handler`, no synthesized events are sent to `handler`.
    #[allow(unused)] // TODO(awdavies): This will be needed later for target events.
    pub fn new_with_handler(
        state: &Rc<impl EventSynthesizer<T> + 'static>,
        handler: impl EventHandler<T> + 'static,
    ) -> Self {
        Self::with_dispatchers(state, vec![Dispatcher::new(handler)])
    }

    /// Adds an event handler, which is fired every time an event comes in.
    /// Before this happens, though, the event dispatcher associated with this
    /// `EventHandler<_>` will send a list of synthesized events to the handler
    /// derived from the internal state.
    ///
    /// If a synthesized event cannot be queued, a warning is logged and the
    /// handler is not registered.
    pub async fn add_handler(&self, handler: impl EventHandler<T> + 'static) {
        // Locks the handlers so that they cannot receive events, then obtains
        // the state as it will (hopefully) not have had any updates after
        // acquiring the lock, on account of it not being able to push new
        // events to the queue.
        let mut handlers = self.handlers.lock().await;
        let synth_events = self.state.synthesize_events().await;
        let dispatcher = Dispatcher::new(handler);
        // The vector is owned and not needed afterwards, so the events are
        // moved into the dispatcher instead of being cloned.
        for event in synth_events {
            // If an error occurs in the event handler its Rc<_> will be dropped,
            // so just return if there's an error. The result for continuing and
            // adding the dispatcher anyway would be about the same, this just
            // makes cleanup slightly faster.
            if let Err(e) =
                dispatcher.push(event).await.context("sending synthesized event to child queue")
            {
                // `{:#}` prints the whole context chain, i.e. also the reason
                // the push failed, not only the context line added above.
                log::warn!("{:#}", e);
                return;
            }
        }
        handlers.push(dispatcher);
    }

    /// Waits for an event to occur. An event has occurred when the closure
    /// passed to this function evaluates to `true`.
    ///
    /// If timeout is `None`, this will run forever, else this will return an
    /// `Error` if the timeout is reached (`Error` will only ever be returned
    /// for a timeout).
    pub async fn wait_for(
        &self,
        timeout: Option<Duration>,
        predicate: impl Fn(T) -> bool + 'static,
    ) -> Result<()> {
        self.wait_for_async(timeout, move |e| future::ready(predicate(e))).await
    }

    /// The async version of `wait_for` (See: `wait_for`).
    ///
    /// Registers a temporary handler that evaluates `predicate` for incoming
    /// events (including the synthesized ones) and resolves once the handler
    /// signals a match. When the handler goes away without signalling, a
    /// warning is logged and `Ok(())` is returned as well.
    pub async fn wait_for_async<F1>(
        &self,
        timeout_opt: Option<Duration>,
        predicate: impl Fn(T) -> F1 + 'static,
    ) -> Result<()>
    where
        F1: Future<Output = bool> + 'static,
    {
        // `link` is owned by the future awaited below; the handler only holds
        // the weak side and uses it to notice that the waiter is gone.
        let link = Rc::new(());
        let (handler, mut handler_done) = PredicateHandler::new(Rc::downgrade(&link), predicate);
        let fut = async move {
            handler_done
                .next()
                .await
                .unwrap_or_else(|| log::warn!("unable to get 'done' signal from handler."));
            Result::<()>::Ok(())
        };
        self.add_handler(handler).await;
        match timeout_opt {
            Some(t) => {
                PredicateHandlerFuture::new(link, async move {
                    timeout(t, fut).await.map_err(|e| anyhow!("waiting for event: {:#}", e))?
                })
                .await
            }
            None => PredicateHandlerFuture::new(link, fut).await,
        }
    }

    /// Queues `event` for delivery to all registered handlers.
    ///
    /// This does not wait; an error is returned when the underlying channel
    /// does not accept the event.
    pub fn push(&self, event: T) -> Result<()> {
        self.inner_tx.try_send(event).map_err(|e| anyhow!("event queue push: {:#}", e))
    }
}
impl<T: EventTrait + 'static> Processor<T> {
    /// Hands a copy of `event` to every registered dispatcher.
    ///
    /// The handler list stays locked for the whole pass. Dispatchers that
    /// refuse the event are logged and left out of the list afterwards.
    async fn dispatch(&self, event: T) {
        let mut guard = self.handlers.lock().await;
        let mut survivors = Vec::new();
        for dispatcher in guard.drain(..) {
            if let Err(reason) = dispatcher.push(event.clone()).await {
                log::info!("dispatcher closed. reason: {:#}", reason);
                continue;
            }
            survivors.push(dispatcher);
        }
        *guard = survivors;
    }

    /// Consumes the processor and then runs until all instances of the Queue are closed.
    async fn process(mut self) {
        match self.inner_rx.take() {
            Some(events) => events.for_each(|event| self.dispatch(event)).await,
            None => log::warn!("process should only ever be called once"),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Handler that expects the event `5`, reports it and then finishes.
    struct TestHookFirst {
        callbacks_done: async_channel::Sender<bool>,
    }

    #[async_trait(?Send)]
    impl EventHandler<i32> for TestHookFirst {
        async fn on_event(&self, event: i32) -> Result<Status> {
            assert_eq!(event, 5);
            self.callbacks_done.send(true).await.unwrap();
            Ok(Status::Done)
        }
    }

    /// Handler that expects the event `5`, reports it and keeps waiting.
    struct TestHookSecond {
        callbacks_done: async_channel::Sender<bool>,
    }

    #[async_trait(?Send)]
    impl EventHandler<i32> for TestHookSecond {
        async fn on_event(&self, event: i32) -> Result<Status> {
            assert_eq!(event, 5);
            self.callbacks_done.send(true).await.unwrap();
            Ok(Status::Waiting)
        }
    }

    /// Synthesizer without any state: never produces events.
    struct FakeEventStruct {}

    #[async_trait(?Send)]
    impl<T: EventTrait + 'static> EventSynthesizer<T> for FakeEventStruct {
        async fn synthesize_events(&self) -> Vec<T> {
            vec![]
        }
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_receive_two_handlers() {
        let (tx_from_callback, mut rx_from_callback) = async_channel::unbounded::<bool>();
        let fake_events = Rc::new(FakeEventStruct {});
        let queue = Queue::new(&fake_events);
        let ((), ()) = futures::join!(
            queue.add_handler(TestHookFirst { callbacks_done: tx_from_callback.clone() }),
            queue.add_handler(TestHookSecond { callbacks_done: tx_from_callback }),
        );
        queue.push(5).unwrap();
        // Both handlers must report that they saw the event.
        assert!(rx_from_callback.next().await.unwrap());
        assert!(rx_from_callback.next().await.unwrap());
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_wait_for_event_once_async() {
        let fake_events = Rc::new(FakeEventStruct {});
        let queue = Queue::new(&fake_events);
        queue.push(5).unwrap();
        queue
            .wait_for_async(None, |e| async move {
                assert_eq!(e, 5);
                true
            })
            .await
            .unwrap();
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_wait_for_event_once() {
        let fake_events = Rc::new(FakeEventStruct {});
        let queue = Queue::new(&fake_events);
        queue.push(5).unwrap();
        queue.wait_for(None, |e| e == 5).await.unwrap();
    }

    /// Synthesizer with a fixed snapshot of four events.
    struct FakeEventSynthesizer {}

    #[async_trait(?Send)]
    impl EventSynthesizer<i32> for FakeEventSynthesizer {
        async fn synthesize_events(&self) -> Vec<i32> {
            vec![2, 3, 7, 6]
        }
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_wait_for_event_synthetic() {
        let fake_events = Rc::new(FakeEventSynthesizer {});
        let queue = Queue::new(&fake_events);
        // No event is pushed: every waiter has to be satisfied by the
        // synthesized events alone.
        let (one, two, three, four) = futures::join!(
            queue.wait_for(None, |e| e == 7),
            queue.wait_for(None, |e| e == 6),
            queue.wait_for(None, |e| e == 2),
            queue.wait_for(None, |e| e == 3)
        );
        one.unwrap();
        two.unwrap();
        three.unwrap();
        four.unwrap();
    }

    // `EventSynthesizer` is implemented for `Weak<dyn EventSynthesizer<T>>`
    // only. Passing the weak pointer through this parameter converts a
    // `Weak<ConcreteType>` into the trait-object form, so that the `Weak<_>`
    // implementation is the one being called.
    async fn test_event_synth_func<T: EventTrait>(es: Weak<dyn EventSynthesizer<T>>) -> Vec<T> {
        es.synthesize_events().await
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_synthesis_dropped_state() {
        let fake_events = Rc::new(FakeEventSynthesizer {});
        let weak = Rc::downgrade(&fake_events);
        std::mem::drop(fake_events);
        let vec = test_event_synth_func(weak).await;
        assert!(vec.is_empty());
    }

    #[derive(Debug, Hash, Clone, PartialEq, Eq)]
    enum EventFailerInput {
        Fail,
        Complete,
    }

    /// Handler that fails or completes on request and reports its own drop
    /// over a channel.
    struct EventFailer {
        dropped: async_channel::Sender<bool>,
    }

    impl EventFailer {
        fn new() -> (Self, async_channel::Receiver<bool>) {
            let (dropped, handler_dropped_rx) = async_channel::unbounded::<bool>();
            (Self { dropped }, handler_dropped_rx)
        }
    }

    impl Drop for EventFailer {
        fn drop(&mut self) {
            // The channel is unbounded, so the signal can be delivered with a
            // non-blocking `try_send`; no nested executor is needed inside
            // `drop`. As before, this panics if the receiver is already gone.
            self.dropped.try_send(true).unwrap();
        }
    }

    #[async_trait(?Send)]
    impl EventHandler<EventFailerInput> for EventFailer {
        async fn on_event(&self, event: EventFailerInput) -> Result<Status> {
            match event {
                EventFailerInput::Fail => Err(anyhow!("test told to fail")),
                EventFailerInput::Complete => Ok(Status::Done),
            }
        }
    }

    /// Synthesizer whose snapshot consists of a single failing event.
    struct EventFailerState {}

    #[async_trait(?Send)]
    impl EventSynthesizer<EventFailerInput> for EventFailerState {
        async fn synthesize_events(&self) -> Vec<EventFailerInput> {
            vec![EventFailerInput::Fail]
        }
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_failure_drops_handler_synth_events() {
        let fake_events = Rc::new(EventFailerState {});
        let queue = Queue::new(&fake_events);
        let (handler, mut handler_dropped_rx) = EventFailer::new();
        queue.add_handler(handler).await;
        assert!(handler_dropped_rx.next().await.unwrap());
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_failure_drops_handler() {
        let fake_events = Rc::new(FakeEventStruct {});
        let queue = Queue::new(&fake_events);
        let (handler, mut handler_dropped_rx) = EventFailer::new();
        let (handler2, mut handler_dropped_rx2) = EventFailer::new();
        let ((), ()) = futures::join!(queue.add_handler(handler), queue.add_handler(handler2));
        queue.push(EventFailerInput::Fail).unwrap();
        assert!(handler_dropped_rx.next().await.unwrap());
        assert!(handler_dropped_rx2.next().await.unwrap());
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_done_drops_handler() {
        let fake_events = Rc::new(FakeEventStruct {});
        let queue = Queue::new(&fake_events);
        let (handler, mut handler_dropped_rx) = EventFailer::new();
        let (handler2, mut handler_dropped_rx2) = EventFailer::new();
        let ((), ()) = futures::join!(queue.add_handler(handler), queue.add_handler(handler2));
        queue.push(EventFailerInput::Complete).unwrap();
        assert!(handler_dropped_rx.next().await.unwrap());
        assert!(handler_dropped_rx2.next().await.unwrap());
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn event_wait_for_timeout() {
        let fake_events = Rc::new(FakeEventStruct {});
        let queue = Queue::<i32>::new(&fake_events);
        // Nothing is pushed and nothing is synthesized, so the wait times out.
        assert!(queue.wait_for(Some(Duration::from_millis(1)), |_| true).await.is_err());
    }
}