blob: f31ba24811870a8bb336ac7a1279f124b6ca2b48 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
cutex::{AcquisitionPredicateDebug, ALWAYS_TRUE},
AcquisitionPredicate, Cutex, CutexGuard, CutexLockFuture,
};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Helper to poll a cutex.
///
/// A `Cutex::lock` future keeps track of where in the queue its lock request is, so
/// polling through a ticket is different from `cutex.lock().poll(ctx)`: that
/// construction would create a new lock request at each poll.
/// A ticket is often useful when we need to poll something that is contained under
/// the cutex.
///
/// Typical usage:
///
/// ```ignore
/// let mut ticket = CutexTicket::new(&cutex);
/// poll_fn(|ctx| {
///     let cutex_guard = ready!(ticket.poll(ctx));
///     cutex_guard.some_child_future.poll(ctx)
/// }).await;
/// ```
///
/// What this means:
/// Attempt to acquire the cutex. If it's not available, wait until it's available.
/// With the cutex acquired, check `some_child_future`.
/// If it's completed, complete the `poll_fn`.
/// *If it's not completed*, drop the cutex guard (unblocking other tasks) and wait for
/// `some_child_future` to be awoken.
pub struct CutexTicket<'a, 'b, T> {
/// The cutex that `poll` tries to lock.
cutex: &'a Cutex<T>,
/// Acquisition predicate passed along whenever `poll` creates a new lock request.
check: Pin<&'b dyn AcquisitionPredicate<T>>,
/// Lock request that the most recent `poll` left pending, kept so the next `poll`
/// resumes it instead of creating a new one. `None` when no request is outstanding.
lock: Option<CutexLockFuture<'a, 'b, T>>,
}
impl<'a, 'b, T> std::fmt::Debug for CutexTicket<'a, 'b, T> {
    /// Formats the ticket as a `CutexTicket` struct showing its `check` predicate
    /// (through `AcquisitionPredicateDebug`) and its `lock` state.
    ///
    /// The `cutex` field is not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let predicate = AcquisitionPredicateDebug(&*self.check);
        let mut out = f.debug_struct("CutexTicket");
        out.field("check", &predicate);
        out.field("lock", &self.lock);
        out.finish()
    }
}
impl<'a, 'b, T> CutexTicket<'a, 'b, T> {
    /// Creates a ticket for `cutex` that uses the `ALWAYS_TRUE` acquisition predicate.
    pub fn new(cutex: &'a Cutex<T>) -> CutexTicket<'a, 'b, T> {
        Self::new_when_pinned(cutex, Pin::new(&ALWAYS_TRUE))
    }

    /// Creates a ticket for `cutex` whose lock requests are made with the
    /// acquisition predicate `check`.
    ///
    /// No lock request is created until the first call to [`CutexTicket::poll`].
    pub fn new_when_pinned(
        cutex: &'a Cutex<T>,
        check: Pin<&'b dyn AcquisitionPredicate<T>>,
    ) -> CutexTicket<'a, 'b, T> {
        Self { cutex, check, lock: None }
    }

    /// Polls once for the lock.
    ///
    /// A request left pending by an earlier call is resumed; otherwise a new request
    /// is created with `Cutex::lock_when_pinned`. Returns `Poll::Ready` with the guard
    /// once the lock is acquired, after which the next call starts a fresh request.
    pub fn poll(&mut self, ctx: &mut Context<'_>) -> Poll<CutexGuard<'a, T>> {
        // Resume the outstanding request if there is one; only build a new request
        // when none is stored.
        let mut request =
            self.lock.take().unwrap_or_else(|| self.cutex.lock_when_pinned(self.check));
        let outcome = request.poll_unpin(ctx);
        if outcome.is_pending() {
            // Keep the request so the next poll continues with it.
            self.lock = Some(request);
        }
        outcome
    }
}
// Unit tests for `CutexTicket::poll`.
#[cfg(test)]
mod poll_cutex_test {
use super::CutexTicket;
use crate::Cutex;
use anyhow::{format_err, Error};
use fuchsia_async::Timer;
use futures::{
channel::oneshot,
future::{poll_fn, try_join},
task::noop_waker_ref,
};
use matches::assert_matches;
use std::{
task::{Context, Poll},
time::Duration,
};
// Checks the Ready/Pending results of polling a ticket directly with a no-op waker.
// NOTE(review): `run` is presumably supplied by the `run_singlethreaded` test macro —
// confirm against the macro's documentation.
#[fuchsia_async::run_singlethreaded(test)]
async fn basics(run: usize) {
let cutex = Cutex::new(run);
let mut ctx = Context::from_waker(noop_waker_ref());
let mut poll_cutex = CutexTicket::new(&cutex);
// No guard is being held here, so every poll is expected to acquire the lock. The
// returned guard is not bound to a name, so it does not outlive the assertion.
assert_matches!(poll_cutex.poll(&mut ctx), Poll::Ready(_));
assert_matches!(poll_cutex.poll(&mut ctx), Poll::Ready(_));
assert_matches!(poll_cutex.poll(&mut ctx), Poll::Ready(_));
// While `cutex_guard` is held, polling the ticket is expected to stay pending.
let cutex_guard = cutex.lock().await;
assert_matches!(poll_cutex.poll(&mut ctx), Poll::Pending);
assert_matches!(poll_cutex.poll(&mut ctx), Poll::Pending);
// After the guard is dropped the ticket is expected to acquire the lock.
drop(cutex_guard);
assert_matches!(poll_cutex.poll(&mut ctx), Poll::Ready(_));
}
// Checks that a ticket which first returned Pending later yields the guard once the
// lock holder drops its guard from a concurrently running future.
#[fuchsia_async::run_singlethreaded(test)]
async fn wakes_up(run: usize) -> Result<(), Error> {
let cutex = Cutex::new(run);
let (tx_saw_first_pending, rx_saw_first_pending) = oneshot::channel();
let mut poll_cutex = CutexTicket::new(&cutex);
// Hold the lock so the ticket's first poll cannot acquire it.
let cutex_guard = cutex.lock().await;
try_join(
async move {
// First poll uses a no-op waker and must report Pending while the guard is held.
assert_matches!(
poll_cutex.poll(&mut Context::from_waker(noop_waker_ref())),
Poll::Pending
);
// Tell the other future that the pending state has been observed.
tx_saw_first_pending.send(()).map_err(|_| format_err!("cancelled"))?;
// Keep polling the same ticket with the task's context until the guard is
// obtained, then check that the guarded value is the one stored at creation.
assert_eq!(*poll_fn(|ctx| poll_cutex.poll(ctx)).await, run);
Ok(())
},
async move {
// Release the lock only after the first future has seen Pending, and after a
// further 300ms delay.
rx_saw_first_pending.await?;
Timer::new(Duration::from_millis(300)).await;
drop(cutex_guard);
Ok(())
},
)
.await
.map(|_| ())
}
}