blob: c60e0eda454a898747ec7cb35161c7857309e444 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::constants::{HAS_WAITERS, IS_LOCKED, SENTINEL};
use crate::list;
use futures::pin_mut;
use list::List;
use parking_lot::Mutex as ParkingLotMutex;
use pin_project::{pin_project, pinned_drop};
use std::cell::UnsafeCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
/// Determines if a Cutex can be locked by a waiter.
///
/// Implementors must be `Unpin + Send + Sync`. A blanket implementation elsewhere in this
/// file covers any `Fn(&T) -> bool` that satisfies those bounds, so a plain closure can be
/// used as a predicate.
pub trait AcquisitionPredicate<T>: Unpin + Send + Sync {
/// Verify whether the object under the cutex is in a state where the lock requestor
/// can proceed. Return true if so.
///
/// If false is returned, later waiters are consulted to see if they can proceed; this
/// waiter retains its position in the queue and will be re-visited on the next lock release.
fn can_lock(&self, value: &T) -> bool;
/// Provide debug output for this predicate. By default we just display a pointer to
/// us, but other predicates may be able to provide better debugging information.
fn debug(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// `{:p}` formats the address of `self`; enough to tell two predicates apart in logs.
write!(fmt, "{:p}", self)
}
}
/// Predicate that places no condition on the protected value: locking is always permitted.
pub(crate) struct AlwaysTrue;

impl<T> AcquisitionPredicate<T> for AlwaysTrue {
    /// Accepts every value unconditionally.
    fn can_lock(&self, _value: &T) -> bool {
        true
    }

    /// Prints the literal `true` rather than the default pointer representation.
    fn debug(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(fmt, "true")
    }
}
/// Adapter that lets a borrowed predicate be passed where `std::fmt::Debug` is required,
/// by forwarding to `AcquisitionPredicate::debug`.
pub(crate) struct AcquisitionPredicateDebug<'a, T>(pub(crate) &'a dyn AcquisitionPredicate<T>);

impl<'a, T> std::fmt::Debug for AcquisitionPredicateDebug<'a, T> {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The only field is a shared reference, so copying it out of `*self` is free.
        let AcquisitionPredicateDebug(predicate) = *self;
        predicate.debug(fmt)
    }
}
/// Instance of AlwaysTrue that's always available.
///
/// `Cutex::lock` borrows this constant as the predicate for unconditional lock requests.
pub(crate) const ALWAYS_TRUE: AlwaysTrue = AlwaysTrue;
/// Any `Fn(&T) -> bool` that is also `Unpin + Send + Sync` is itself a predicate: locking may
/// proceed when it returns `true` for the protected value. The default `debug` output
/// (a pointer) is kept.
impl<T, F> AcquisitionPredicate<T> for F
where
    F: Unpin + Send + Sync + Fn(&T) -> bool,
{
    fn can_lock(&self, value: &T) -> bool {
        self(value)
    }
}
/// One queued lock request, stored in the cutex's waiter list.
struct Waiter<T> {
// Waker of the task that queued this request. `Cutex::unlock` takes it (leaving `None`)
// when it grants the lock to this waiter; `CutexLockFuture::poll` stores a fresh waker each
// time it has to return `Pending`.
waker: Option<Waker>,
// Lifetime-erased pointer to the requesting future's predicate. `CutexLockFuture::poll`
// creates it by transmuting away the borrow's lifetime, and the future removes its entry from
// the list when dropped so that the pointer is not used after the predicate is gone.
predicate: *const dyn AcquisitionPredicate<T>,
}
impl<T> std::fmt::Debug for Waiter<T> {
    /// Shows the stored waker and the predicate's own debug rendering.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // SAFETY: relies on the invariant that a `Waiter` only stays in the waiter list while
        // the future that registered it (and therefore the borrowed predicate) is still alive.
        let predicate = unsafe { &*self.predicate };
        let mut out = f.debug_struct("Waiter");
        out.field("waker", &self.waker);
        out.field("predicate", &AcquisitionPredicateDebug(predicate));
        out.finish()
    }
}
/// Safe since `AcquisitionPredicate` is `Sync`.
// The raw `predicate` pointer is what stops `Waiter` from being `Send` automatically. Every
// `AcquisitionPredicate` is required to be `Send + Sync`, so sharing the pointee with another
// thread is allowed; keeping the pointer valid is the job of the code that manages the list.
unsafe impl<T> Send for Waiter<T> {}
/// Lock state of a cutex packed into a single atomic word.
struct CutexState {
// Bit-packed word: the `IS_LOCKED` and `HAS_WAITERS` flag bits, plus (under the `SENTINEL`
// mask) the wait key the lock has been granted to; a grantee field equal to `SENTINEL` means
// "granted to nobody". The exact bit layout lives in `crate::constants`.
state: AtomicUsize,
}
impl std::fmt::Debug for CutexState {
    /// Decodes the packed state word into its two flags and the wait key (if any) that the
    /// lock is currently granted to.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        #[derive(Debug)]
        enum GrantState {
            Nobody,
            WaitKey(usize),
        }

        // A relaxed snapshot is sufficient: this is diagnostic output only.
        let bits = self.state.load(Ordering::Relaxed);
        let is_locked = bits & IS_LOCKED != 0;
        let has_waiters = bits & HAS_WAITERS != 0;
        let grantee = bits & SENTINEL;
        let granted_to =
            if grantee == SENTINEL { GrantState::Nobody } else { GrantState::WaitKey(grantee) };

        f.debug_struct("CutexState")
            .field("is_locked", &is_locked)
            .field("has_waiters", &has_waiters)
            .field("granted_to", &granted_to)
            .finish()
    }
}
impl CutexState {
    /// Creates the initial state: the grantee field is `SENTINEL`, i.e. the lock is granted
    /// to nobody.
    fn new() -> Self {
        Self { state: AtomicUsize::new(SENTINEL) }
    }

    /// Try to lock the cutex on behalf of `wait_key`; returns true if the caller now holds it.
    ///
    /// Succeeds either when the cutex was not locked, or when it was locked but had been
    /// granted to `wait_key` by `grant_waiter_lock`. Pass `SENTINEL` for a caller that has no
    /// entry in the waiter list; such a caller can only succeed through the first path.
    #[must_use]
    fn try_lock(&self, wait_key: usize) -> bool {
        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
        if old_state & IS_LOCKED == 0 {
            // The lock bit was clear before our `fetch_or`, so we just acquired it.
            return true;
        }
        if wait_key != SENTINEL && (old_state & SENTINEL) == wait_key {
            // The lock was handed to us: set the grantee to SENTINEL again, and return locked.
            self.state.fetch_or(SENTINEL, Ordering::Relaxed);
            return true;
        }
        false
    }

    /// Unlock a previously locked cutex IFF it has no waiters.
    ///
    /// Returns true if the cutex was unlocked, false if not. The exchange only succeeds when
    /// the state is exactly `SENTINEL | IS_LOCKED` (locked, `HAS_WAITERS` clear, no grantee).
    #[must_use]
    fn unlock_if_no_waiters(&self) -> bool {
        // `compare_exchange(.., AcqRel, Acquire)` is the documented replacement for the
        // deprecated `compare_and_swap(.., AcqRel)`; it succeeds exactly when the previous
        // value equalled the expected one.
        self.state
            .compare_exchange(SENTINEL | IS_LOCKED, SENTINEL, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Unlock a previously locked cutex by clearing `IS_LOCKED`; other bits are left alone.
    fn unlock(&self) {
        self.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
    }

    /// Grants the waiter identified by `wait_key` the active lock.
    ///
    /// Overwrites the whole state word with `wait_key | IS_LOCKED | HAS_WAITERS`, so the cutex
    /// stays locked and only `try_lock(wait_key)` can pick the lock up.
    fn grant_waiter_lock(&self, wait_key: usize) {
        self.state.store(wait_key | IS_LOCKED | HAS_WAITERS, Ordering::Release)
    }

    /// Call when the waiter identified by `wait_key` is going away.
    ///
    /// If this `wait_key` was granted the lock but never polled to pick it up, the grantee is
    /// reset to `SENTINEL` and true is returned: the caller then owns the lock and needs to
    /// unlock again. Returns false otherwise, including when `wait_key` is `SENTINEL`.
    #[must_use]
    fn remove_wait_key(&self, wait_key: usize) -> bool {
        if wait_key != SENTINEL {
            let cur_state = self.state.load(Ordering::Acquire);
            if cur_state & SENTINEL == wait_key {
                self.state.fetch_or(SENTINEL, Ordering::Relaxed);
                return true;
            }
        }
        false
    }

    /// Mark that there are now waiters.
    fn mark_waiters(&self) {
        self.state.fetch_or(HAS_WAITERS, Ordering::Relaxed);
    }

    /// Mark that there are now no waiters.
    fn clear_waiters(&self) {
        self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
    }
}
/// Cutex - a Conditionally-acquired mUTEX
///
/// A cutex:
/// - is fair under high contention
/// - can be conditionally acquired using lock_when
/// - otherwise presents a very similar API to futures::lock::Mutex
#[derive(Debug)]
pub struct Cutex<T> {
// The protected value. It is only reached through `CutexGuard`, or by predicate checks made
// while the lock is held.
value: UnsafeCell<T>,
// Packed lock flags plus the wait key the lock is currently granted to.
state: CutexState,
// Pending lock requests. The waiter list has its own (blocking) mutex, separate from the
// cutex lock itself.
waiters: ParkingLotMutex<List<Waiter<T>>>,
}
// The `UnsafeCell` field (and the raw pointers inside the waiter list) prevent these traits
// from being derived automatically, so they are asserted by hand.
// NOTE(review): neither impl places a bound on `T`, so `Cutex<T>` is `Send` and `Sync` even
// when `T` itself is not `Send`. Async mutexes normally require `T: Send` for both impls;
// confirm whether that bound is missing here.
unsafe impl<T> Send for Cutex<T> {}
unsafe impl<T> Sync for Cutex<T> {}
impl<T> Cutex<T> {
    /// Build a new Cutex protecting `value`, with an empty waiter list.
    pub fn new(value: T) -> Cutex<T> {
        let state = CutexState::new();
        let waiters = ParkingLotMutex::new(List::new());
        Cutex { value: UnsafeCell::new(value), state, waiters }
    }

    /// Attempt to take the lock for `wait_key` without consulting any predicate.
    ///
    /// Returns a guard on success; the caller is responsible for checking its predicate.
    fn try_lock(&self, wait_key: usize) -> Option<CutexGuard<'_, T>> {
        if !self.state.try_lock(wait_key) {
            return None;
        }
        Some(CutexGuard { cutex: self })
    }

    /// Take the waiter identified by `*wait_key` out of the waiter list and reset the key to
    /// `SENTINEL`. Does nothing when the key already is `SENTINEL` (never queued).
    fn remove(&self, wait_key: &mut usize) {
        let key = *wait_key;
        if key == SENTINEL {
            return;
        }
        // The list lock is held until the end of the function, so the waiters flag is cleared
        // while no other thread can be modifying the list.
        let mut queue = self.waiters.lock();
        let (_, was_last) = queue.remove(key);
        if was_last {
            self.state.clear_waiters();
        }
        *wait_key = SENTINEL;
    }

    /// Lock the Cutex unconditionally (the predicate always allows locking).
    pub fn lock(&self) -> CutexLockFuture<'_, '_, T> {
        self.lock_when_pinned(Pin::new(&ALWAYS_TRUE))
    }

    /// Lock the Cutex once `predicate`'s can_lock returns true.
    ///
    /// While the `predicate` returns false the lock request keeps its position in the queue,
    /// but other (later) waiters are consulted to see if they can proceed -- so a predicate
    /// that always returns false does not prevent *other* waiters from proceeding.
    pub async fn lock_when<'a, 'b>(
        &'a self,
        predicate: impl 'b + AcquisitionPredicate<T>,
    ) -> CutexGuard<'a, T> {
        // Pin the predicate in this future's own state so it outlives the lock request below.
        pin_mut!(predicate);
        self.lock_when_pinned(predicate).await
    }

    /// Lock the Cutex once the already-pinned `predicate`'s can_lock returns true.
    ///
    /// See `lock_when` for a full description of predicate semantics.
    pub fn lock_when_pinned<'a, 'b>(
        &'a self,
        predicate: Pin<&'b (dyn AcquisitionPredicate<T> + 'b)>,
    ) -> CutexLockFuture<'a, 'b, T> {
        // A fresh request has no entry in the waiter list yet, hence the SENTINEL key.
        CutexLockFuture { cutex: self, predicate, wait_key: SENTINEL }
    }

    /// Release the lock, handing it directly to the first queued waiter whose predicate
    /// accepts the current value and that still has a waker registered.
    fn unlock(&self) {
        // Fast path: nobody is queued, so the lock bit was simply cleared.
        if self.state.unlock_if_no_waiters() {
            return;
        }
        let mut queue = self.waiters.lock();
        // SAFETY: the caller still owns the cutex lock at this point, so nothing else is
        // mutating the protected value while predicates inspect it.
        let current = unsafe { &*self.value.get() };
        let nobody_granted = queue.for_each_until_mut(|key, candidate| {
            // SAFETY: a waiter's predicate pointer stays valid for as long as the waiter is
            // in the list; futures remove themselves from the list when dropped.
            let predicate = unsafe { &*candidate.predicate };
            if !predicate.can_lock(current) {
                // Not ready; keep looking at later waiters.
                return true;
            }
            match candidate.waker.take() {
                Some(waker) => {
                    self.state.grant_waiter_lock(key);
                    waker.wake();
                    // Lock handed over: stop iterating.
                    false
                }
                None => true,
            }
        });
        if nobody_granted {
            // No waiter could proceed, so plainly release the lock.
            self.state.unlock();
        }
    }
}
/// A future that waits for a Cutex to be locked with the given predicate.
/// Once this future completes and returns a CutexGuard, its behavior if polled
/// is explicitly unstable and may change in the future.
#[pin_project(PinnedDrop)]
pub struct CutexLockFuture<'a, 'b, T> {
// The cutex this request is trying to lock.
cutex: &'a Cutex<T>,
// Condition that must hold on the protected value before the lock is handed out.
predicate: Pin<&'b (dyn AcquisitionPredicate<T> + 'b)>,
// Key of this request's entry in the cutex's waiter list, or `SENTINEL` while the request
// has no entry (not yet queued, or already removed).
wait_key: usize,
}
impl<'a, 'b, T> std::fmt::Debug for CutexLockFuture<'a, 'b, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CutexLockFuture")
.field("predicate", &AcquisitionPredicateDebug(&*self.predicate))
.field(
"wait_key",
&match self.wait_key {
SENTINEL => None,
x => Some(x),
},
)
.finish()
}
}
// Cleanup for a lock request that may still be queued, or that may have been granted the lock
// without ever being polled again to pick it up.
#[pinned_drop]
impl<'a, 'b, T> PinnedDrop for CutexLockFuture<'a, 'b, T> {
fn drop(mut self: Pin<&mut Self>) {
// True if the lock had been granted to our wait key and we never claimed it; in that case
// this future effectively owns the lock and must release it below.
let unlock_again = self.cutex.state.remove_wait_key(self.wait_key);
// Leave the waiter list (a no-op if we were never queued) so the lifetime-erased predicate
// pointer stored there is not used after the predicate borrow ends.
self.cutex.remove(&mut self.wait_key);
if unlock_again {
// Pass the abandoned lock on to the next eligible waiter, or release it.
self.cutex.unlock();
}
}
}
impl<'a, 'b, T> Future for CutexLockFuture<'a, 'b, T> {
type Output = CutexGuard<'a, T>;
// Tries to take the lock and, if that fails or the predicate rejects the current value,
// (re-)registers this task's waker in the waiter list and tries once more before returning
// `Pending`.
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let cutex = this.cutex;
// First attempt: succeeds if the cutex was unlocked, or if a previous unlock granted the
// lock to our wait key.
if let Some(guard) = cutex.try_lock(*this.wait_key) {
// SAFETY: we hold the lock (we own `guard`), so reading the value here is exclusive.
if this.predicate.can_lock(unsafe { &*cutex.value.get() }) {
// We are done waiting: drop our waiter-list entry (no-op if we never queued).
cutex.remove(this.wait_key);
return Poll::Ready(guard);
}
// Predicate rejected the value: `guard` is dropped at the end of this block, which
// releases the lock again and lets other waiters be considered.
}
{
let mut waiters = cutex.waiters.lock();
if *this.wait_key == SENTINEL {
// Not queued yet: add an entry and remember its key.
let (wait_key, is_first) = waiters.push(Waiter {
waker: Some(ctx.waker().clone()),
predicate: unsafe {
// Transmute to cast away the lifetime of `predicate`.
// We guarantee that no access to `predicate` happens after it's destroyed
// by removing ourselves from the waiters list during drop.
std::mem::transmute(&*this.predicate.as_ref())
},
});
*this.wait_key = wait_key;
if is_first {
cutex.state.mark_waiters();
}
} else {
// Already queued: refresh the waker (it may have been taken by an earlier grant, or
// the task may now be polled with a different waker).
waiters.get_mut(*this.wait_key).waker = Some(ctx.waker().clone());
}
}
// Ensure that we haven't raced `CutexGuard::drop`'s unlock path by attempting to acquire
// the lock again.
if let Some(guard) = cutex.try_lock(*this.wait_key) {
// SAFETY: as above, the lock is held while the predicate inspects the value.
if this.predicate.can_lock(unsafe { &*cutex.value.get() }) {
cutex.remove(this.wait_key);
return Poll::Ready(guard);
}
}
Poll::Pending
}
}
/// A locked `Cutex`. When dropped, drops the lock and allows the next waiter in.
///
/// Dereferences (mutably, too) to the protected value for as long as the guard is alive.
#[derive(Debug)]
pub struct CutexGuard<'a, T> {
// The cutex whose lock this guard holds.
cutex: &'a Cutex<T>,
}
impl<'a, T> Drop for CutexGuard<'a, T> {
// Releasing the guard releases the lock: `Cutex::unlock` either clears the lock outright or
// hands it to a queued waiter whose predicate accepts the current value.
fn drop(&mut self) {
self.cutex.unlock();
}
}
impl<'a, T> std::ops::Deref for CutexGuard<'a, T> {
    type Target = T;

    /// Shared access to the value protected by the cutex.
    fn deref(&self) -> &T {
        let value: *mut T = self.cutex.value.get();
        // SAFETY: a guard is only created after the cutex has been locked, and the lock is not
        // released until the guard is dropped, so the value is not mutated behind this borrow.
        unsafe { &*value }
    }
}
impl<'a, T> std::ops::DerefMut for CutexGuard<'a, T> {
    /// Exclusive access to the value protected by the cutex.
    fn deref_mut(&mut self) -> &mut T {
        let value: *mut T = self.cutex.value.get();
        // SAFETY: the guard holds the cutex lock, and `&mut self` ensures no other borrow
        // obtained through this guard is alive at the same time.
        unsafe { &mut *value }
    }
}
// Unit tests covering uncontended and contended locking, conditional locking, and dropping
// lock futures in various states.
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
use futures::future::join;
use futures::prelude::*;
use futures::task::noop_waker_ref;
use matches::assert_matches;
// Constructing (and dropping) a Cutex must work on its own.
#[test]
fn noop() {
let _c = Cutex::new(());
}
// With no other lock holder, `lock()` resolves.
#[test]
fn can_lock_uncontested() {
let _ = block_on(Cutex::new(()).lock());
}
// A second locker has to wait until the first guard is dropped, and then sees (and can
// change) the protected value.
#[test]
fn can_lock_contested() {
block_on(async move {
let c = &Cutex::new(1u8);
let g = c.lock().await;
assert_eq!(*g, 1);
join(
async move {
*c.lock().await = 2;
},
async move {
assert_eq!(*g, 1);
drop(g);
},
)
.await;
assert_eq!(*c.lock().await, 2);
});
}
// `lock_when` only acquires once its predicate holds: the first branch waits until the
// second branch has changed the value to 2.
#[test]
fn can_lock_when() {
block_on(async move {
let c = &Cutex::new(1u8);
join(
async move {
*c.lock_when(|x: &u8| *x == 2).await = 3;
},
async move {
let mut g = c.lock().await;
assert_eq!(*g, 1);
*g = 2;
},
)
.await;
let g = c.lock().await;
assert_eq!(*g, 3);
})
}
// Dropping a lock future that was never polled leaves the cutex usable.
#[test]
fn can_drop_waiter() {
block_on(async move {
let c = &Cutex::new(1u8);
drop(c.lock());
c.lock().await;
})
}
// A waiter that was granted the lock but is dropped before claiming it must pass the lock on
// to the next waiter.
#[test]
fn can_drop_contested_waiter() {
let c = &Cutex::new(1u8);
let mut ctx = Context::from_waker(noop_waker_ref());
let g = c.lock().now_or_never().unwrap();
let mut f1 = c.lock().boxed();
let mut f2 = c.lock().boxed();
assert_matches!(f1.as_mut().poll(&mut ctx), Poll::Pending);
assert_matches!(f2.as_mut().poll(&mut ctx), Poll::Pending);
drop(g); // Fairness says give the lock to f1
drop(f1); // But then abandon it...
assert_matches!(f2.as_mut().poll(&mut ctx), Poll::Ready(_));
}
// Dropping a waiter that was NOT granted the lock must not disturb the waiter that was.
#[test]
fn can_drop_contested_waiter2() {
let c = &Cutex::new(1u8);
let mut ctx = Context::from_waker(noop_waker_ref());
let g = c.lock().now_or_never().unwrap();
let mut f1 = c.lock().boxed();
let mut f2 = c.lock().boxed();
assert_matches!(f1.as_mut().poll(&mut ctx), Poll::Pending);
assert_matches!(f2.as_mut().poll(&mut ctx), Poll::Pending);
drop(g); // Fairness says give the lock to f1
drop(f2); // Abandoning the other waiter ought to have no effect
assert_matches!(f1.as_mut().poll(&mut ctx), Poll::Ready(_));
}
}