blob: 2f8b32f88e1e8dc5e05cdc91867b074764b42325 [file] [log] [blame]
//! An asynchronous `Mutex`-like type.
//!
//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one
//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the
//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
//! release it at some later point in time.
//!
//! This allows you to do something along the lines of:
//!
//! ```rust,no_run
//! # #[macro_use]
//! # extern crate futures;
//! # extern crate tokio;
//! # use futures::{future, Poll, Async, Future, Stream};
//! use tokio::sync::lock::{Lock, LockGuard};
//! struct MyType<S> {
//! lock: Lock<S>,
//! }
//!
//! impl<S> Future for MyType<S>
//! where S: Stream<Item = u32> + Send + 'static
//! {
//! type Item = ();
//! type Error = ();
//!
//! fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
//! match self.lock.poll_lock() {
//! Async::Ready(mut guard) => {
//! tokio::spawn(future::poll_fn(move || {
//! let item = try_ready!(guard.poll().map_err(|_| ()));
//! println!("item = {:?}", item);
//! Ok(().into())
//! }));
//! Ok(().into())
//! },
//! Async::NotReady => Ok(Async::NotReady)
//! }
//! }
//! }
//! # fn main() {}
//! ```
//!
//! [`Lock`]: struct.Lock.html
//! [`LockGuard`]: struct.LockGuard.html
use futures::Async;
use semaphore;
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
/// An asynchronous mutual exclusion primitive useful for protecting shared data
///
/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that
/// the data is only ever accessed when the mutex is locked.
#[derive(Debug)]
pub struct Lock<T> {
inner: Arc<State<T>>,
permit: semaphore::Permit,
}
/// A handle to a held `Lock`.
///
/// As long as you have this guard, you have exclusive access to the underlying `T`. The guard
/// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes
/// away, the guard remains valid.
///
/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock`
/// will succeed yet again.
#[derive(Debug)]
pub struct LockGuard<T>(Lock<T>);
// As long as T: Send, it's fine to send and share Lock<T> between threads.
// If T was not Send, sending and sharing a Lock<T> would be bad, since you can access T through
// Lock<T>.
unsafe impl<T> Send for Lock<T> where T: Send {}
unsafe impl<T> Sync for Lock<T> where T: Send {}
unsafe impl<T> Sync for LockGuard<T> where T: Send + Sync {}
#[derive(Debug)]
struct State<T> {
c: UnsafeCell<T>,
s: semaphore::Semaphore,
}
#[test]
fn bounds() {
fn check<T: Send>() {}
check::<LockGuard<u32>>();
}
impl<T> Lock<T> {
/// Creates a new lock in an unlocked state ready for use.
pub fn new(t: T) -> Self {
Self {
inner: Arc::new(State {
c: UnsafeCell::new(t),
s: semaphore::Semaphore::new(1),
}),
permit: semaphore::Permit::new(),
}
}
/// Try to acquire the lock.
///
/// If the lock is already held, the current task is notified when it is released.
pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
}) {
return Async::NotReady;
}
// We want to move the acquired permit into the guard,
// and leave an unacquired one in self.
let acquired = Self {
inner: self.inner.clone(),
permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
};
Async::Ready(LockGuard(acquired))
}
}
impl<T> Drop for LockGuard<T> {
fn drop(&mut self) {
if self.0.permit.is_acquired() {
self.0.permit.release(&self.0.inner.s);
} else if ::std::thread::panicking() {
// A guard _should_ always hold its permit, but if the thread is already panicking,
// we don't want to generate a panic-while-panicing, since that's just unhelpful!
} else {
unreachable!("Permit not held when LockGuard was dropped")
}
}
}
impl<T> From<T> for Lock<T> {
fn from(s: T) -> Self {
Self::new(s)
}
}
impl<T> Clone for Lock<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
permit: semaphore::Permit::new(),
}
}
}
impl<T> Default for Lock<T>
where
T: Default,
{
fn default() -> Self {
Self::new(T::default())
}
}
impl<T> Deref for LockGuard<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
assert!(self.0.permit.is_acquired());
unsafe { &*self.0.inner.c.get() }
}
}
impl<T> DerefMut for LockGuard<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
assert!(self.0.permit.is_acquired());
unsafe { &mut *self.0.inner.c.get() }
}
}
impl<T: fmt::Display> fmt::Display for LockGuard<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&**self, f)
}
}