blob: 6c60d07749ce40b2c641d5627fe3a1a767dd5c63 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::format_err,
futures::{channel::oneshot, future::Shared, lock::Mutex, pin_mut, FutureExt},
std::{future::Future, pin::Pin, sync::Arc, task::Poll},
};
/// A `Future` whose output value can be cleared and replaced. The future is in a `Poll::Pending`
/// state if and only if it has no value.
///
/// The future can be cloned arbitrarily, and can be awaited while it is being set.
#[derive(Debug, Clone)]
pub struct ResettableFuture<T>
where
T: Clone + 'static,
{
inner: Arc<Mutex<ResettableFutureInner<T>>>,
}
struct ResettableFutureInner<T>
where
T: Clone + 'static,
{
/// If `Some`, that means that the future currently has no value.
sender: Option<oneshot::Sender<T>>,
receiver: Shared<oneshot::Receiver<T>>,
}
impl<T> ResettableFuture<T>
where
T: Clone + 'static,
{
/// Creates a new, empty [`ResettableFuture`].
pub fn new() -> Self {
let (sender, receiver) = oneshot::channel();
let inner = ResettableFutureInner { sender: Some(sender), receiver: receiver.shared() };
return Self { inner: Arc::new(Mutex::new(inner)) };
}
/// Sets or replaces the future's value. Any current or future pollers will receive the new
/// value.
pub async fn set(&self, value: T) {
let sender = {
let mut inner = self.inner.lock().await;
if let Some(unused_sender) = inner.sender.take() {
unused_sender
} else {
let (new_sender, new_receiver) = oneshot::channel();
// Don't bother replacing inner.sender because we're about to consume it immediately.
inner.receiver = new_receiver.shared();
new_sender
}
};
sender.send(value).map_err(|_| format_err!("Receiver dropped before sender")).unwrap()
}
/// Clears the future's stored value. If the future is already empty, it is untouched, and
/// existing pollers remain valid.
pub async fn clear(&self) {
let mut inner = self.inner.lock().await;
if inner.sender.is_none() {
let (new_sender, new_receiver) = oneshot::channel();
inner.sender = Some(new_sender);
inner.receiver = new_receiver.shared();
}
}
/// Convenience method for awaiting the future such that it can still be manipulated with `set`
/// or `clear`.
pub async fn get(&self) -> T {
// Warning: Don't await the lock and the receiver in the same statement. The lock would
// remain held for too long, causing deadlocks.
let receiver = self.inner.lock().await.receiver.clone();
// The borrow checker doesn't allow a SettableFuture to be dropped while one of its futures
// is still live.
receiver.await.expect("Sender dropped unexpectedly while Future was live")
}
}
/// The container itself can be `.await`ed, if so inclined.
impl<T> Future for ResettableFuture<T>
where
T: Clone + 'static,
{
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let fut = self.get();
pin_mut!(fut);
fut.poll(cx)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
assert_matches::assert_matches,
futures::{join, poll},
};
#[fuchsia::test]
async fn basic_get_set_and_clear() {
let f = ResettableFuture::<u32>::new();
assert_matches!(poll!(f.clone()), Poll::Pending);
f.set(17).await;
assert_matches!(poll!(f.clone()), Poll::Ready(17));
assert_matches!(poll!(f.clone()), Poll::Ready(17));
assert_matches!(f.get().await, 17);
f.set(34).await;
assert_matches!(f.get().await, 34);
f.clear().await;
assert_matches!(poll!(f.clone()), Poll::Pending);
f.set(51).await;
assert_matches!(f.get().await, 51);
}
#[fuchsia::test]
async fn multiple_readers() {
let f = ResettableFuture::<u32>::new();
let fut_a = f.get();
pin_mut!(fut_a);
let fut_b = f.get();
pin_mut!(fut_b);
let fut_c = f.clone();
pin_mut!(fut_c);
assert_matches!(poll!(&mut fut_a), Poll::Pending);
assert_matches!(poll!(&mut fut_b), Poll::Pending);
assert_matches!(poll!(&mut fut_c), Poll::Pending);
f.set(17).await;
assert_matches!(poll!(&mut fut_a), Poll::Ready(17));
assert_matches!(poll!(&mut fut_b), Poll::Ready(17));
assert_matches!(poll!(&mut fut_c), Poll::Ready(17));
}
#[fuchsia::test]
async fn clear_while_awaited() {
let f = ResettableFuture::<u32>::new();
let fut_a = f.get();
pin_mut!(fut_a);
assert_matches!(poll!(&mut fut_a), Poll::Pending);
f.clear().await;
assert_matches!(poll!(&mut fut_a), Poll::Pending);
let (val, _) = join!(fut_a, f.set(17));
assert_eq!(val, 17);
}
}