blob: faad02a4af9d5425b92a4a25fa07dd1742bc6169 [file] [log] [blame]
use std::cell::UnsafeCell;
use std::fmt;
use std::io::{IoSlice, IoSliceMut};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use crate::event::Event;
use crossbeam_utils::Backoff;
use futures_io::{self as io, AsyncRead, AsyncWrite};
/// A mutex that implements async I/O traits.
///
/// This is a blocking mutex that adds the following impls:
///
/// - `impl<T> AsyncRead for Mutex<T> where T: AsyncRead + Unpin {}`
/// - `impl<T> AsyncRead for &Mutex<T> where T: AsyncRead + Unpin {}`
/// - `impl<T> AsyncWrite for Mutex<T> where T: AsyncWrite + Unpin {}`
/// - `impl<T> AsyncWrite for &Mutex<T> where T: AsyncWrite + Unpin {}`
///
/// Each of these I/O methods acquires the mutex with the blocking `lock()` for the duration of
/// a single poll call and then forwards the call to the wrapped value.
///
/// This mutex ensures fairness by handling lock operations in first-in first-out order.
///
/// While primarily designed for wrapping async I/O objects, this mutex can also be used as a
/// regular blocking mutex. It's not quite as efficient as [`parking_lot::Mutex`], but it's still
/// an improvement over [`std::sync::Mutex`].
///
/// [`parking_lot::Mutex`]: https://docs.rs/parking_lot/0.10.0/parking_lot/type.Mutex.html
/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// use futures::io;
/// use futures::prelude::*;
/// use piper::Mutex;
///
/// // Reads data from a stream and echoes it back.
/// async fn echo(stream: impl AsyncRead + AsyncWrite + Unpin) -> io::Result<u64> {
/// let stream = Mutex::new(stream);
/// io::copy(&stream, &mut &stream).await
/// }
/// ```
pub struct Mutex<T> {
/// Set to `true` while the mutex is held by a [`MutexGuard`], and reset to `false` when that
/// guard is dropped.
locked: AtomicBool,
/// Lock operations waiting for the mutex to get unlocked; one of them is notified each time a
/// guard is dropped.
lock_ops: Event,
/// The value inside the mutex. It is only accessed through a guard, through `get_mut()` on an
/// exclusive borrow, or by consuming the mutex with `into_inner()`.
data: UnsafeCell<T>,
}
// SAFETY: moving the mutex to another thread moves the owned `T` along with it, which is sound
// whenever `T` itself may be sent.
unsafe impl<T> Send for Mutex<T> where T: Send {}

// SAFETY: through a shared `&Mutex<T>` the data is only reachable via a `MutexGuard`, and the
// `locked` flag admits one guard at a time. A guard may be obtained on any thread that shares
// the mutex, so the value must be `Send`; `Sync` is not needed because access is exclusive.
unsafe impl<T> Sync for Mutex<T> where T: Send {}
impl<T> Mutex<T> {
    /// Creates a new, unlocked mutex holding `data`.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// ```
    pub fn new(data: T) -> Mutex<T> {
        Mutex {
            locked: AtomicBool::new(false),
            lock_ops: Event::new(),
            data: UnsafeCell::new(data),
        }
    }

    /// Acquires the mutex, blocking the current thread until it is able to do so.
    ///
    /// Returns a guard that releases the mutex when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// let guard = mutex.lock();
    /// assert_eq!(*guard, 10);
    /// ```
    pub fn lock(&self) -> MutexGuard<'_, T> {
        loop {
            // Fast path: keep retrying with backoff until the backoff reports completion.
            let backoff = Backoff::new();
            loop {
                if let Some(guard) = self.try_lock() {
                    return guard;
                }
                if backoff.is_completed() {
                    break;
                }
                backoff.snooze();
            }

            // Slow path: start listening for unlock notifications *before* the final attempt.
            // NOTE(review): presumably this ordering ensures an unlock that happens between the
            // attempt and `wait()` is still observed by the listener -- confirm against `Event`.
            let l = self.lock_ops.listen();
            if let Some(guard) = self.try_lock() {
                return guard;
            }

            // Block until a dropped guard notifies us, then start over.
            l.wait();
        }
    }

    /// Attempts to acquire the mutex.
    ///
    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
    /// guard is returned that releases the mutex when dropped.
    ///
    /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// if let Some(guard) = mutex.try_lock() {
    ///     assert_eq!(*guard, 10);
    /// }
    /// # ;
    /// ```
    #[inline]
    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
        // Flip `locked` from `false` to `true`. `compare_exchange` replaces the deprecated
        // `compare_and_swap`; `Acquire` pairs with the `Release` store performed when a guard is
        // dropped, so writes made by the previous holder are visible to the new one.
        let acquired = self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
            .is_ok();

        if acquired {
            Some(MutexGuard(self))
        } else {
            None
        }
    }

    /// Consumes the mutex, returning the underlying data.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mutex = Mutex::new(10);
    /// assert_eq!(mutex.into_inner(), 10);
    /// ```
    pub fn into_inner(self) -> T {
        self.data.into_inner()
    }

    /// Returns a mutable reference to the underlying data.
    ///
    /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
    /// borrow statically guarantees the mutex is not already acquired.
    ///
    /// # Examples
    ///
    /// ```
    /// use piper::Mutex;
    ///
    /// let mut mutex = Mutex::new(0);
    /// *mutex.get_mut() = 10;
    /// assert_eq!(*mutex.lock(), 10);
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        // SAFETY: `&mut self` proves there are no outstanding guards (each guard borrows the
        // mutex), so this is the only live reference to the data.
        unsafe { &mut *self.data.get() }
    }
}
impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
    // Formats the mutex as `Mutex { data: .. }`. The data is printed only if the lock can be
    // taken without blocking; otherwise a `<locked>` placeholder is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Placeholder whose `Debug` output stands in for data that is currently locked.
        struct Locked;

        impl fmt::Debug for Locked {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<locked>")
            }
        }

        // Attempt the lock before any output is produced; the guard (if any) stays alive until
        // formatting has finished.
        let guard = self.try_lock();
        let mut out = f.debug_struct("Mutex");
        match guard.as_deref() {
            Some(data) => out.field("data", &data),
            None => out.field("data", &Locked),
        };
        out.finish()
    }
}
impl<T> From<T> for Mutex<T> {
fn from(val: T) -> Mutex<T> {
Mutex::new(val)
}
}
impl<T: Default> Default for Mutex<T> {
fn default() -> Mutex<T> {
Mutex::new(Default::default())
}
}
// Reads through the mutex: every poll takes the (blocking) lock, forwards the call to the
// wrapped reader, and releases the lock when the call returns.
impl<T: AsyncRead + Unpin> AsyncRead for Mutex<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        // `T: Unpin`, so the locked value can be re-pinned in place.
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_read(cx, buf)
    }

    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_read_vectored(cx, bufs)
    }
}
// Same as the impl for `Mutex<T>`, but usable through a shared reference: every poll takes the
// (blocking) lock, forwards the call to the wrapped reader, and releases the lock on return.
impl<T: AsyncRead + Unpin> AsyncRead for &Mutex<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        // `T: Unpin`, so the locked value can be re-pinned in place.
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_read(cx, buf)
    }

    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_read_vectored(cx, bufs)
    }
}
// Writes through the mutex: every poll takes the (blocking) lock, forwards the call to the
// wrapped writer, and releases the lock when the call returns.
impl<T: AsyncWrite + Unpin> AsyncWrite for Mutex<T> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        // `T: Unpin`, so the locked value can be re-pinned in place.
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_write(cx, buf)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_write_vectored(cx, bufs)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_close(cx)
    }
}
// Same as the impl for `Mutex<T>`, but usable through a shared reference: every poll takes the
// (blocking) lock, forwards the call to the wrapped writer, and releases the lock on return.
impl<T: AsyncWrite + Unpin> AsyncWrite for &Mutex<T> {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        // `T: Unpin`, so the locked value can be re-pinned in place.
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_write(cx, buf)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_write_vectored(cx, bufs)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut guard = self.lock();
        let inner: Pin<&mut T> = Pin::new(&mut *guard);
        inner.poll_close(cx)
    }
}
/// A guard that releases the mutex when dropped.
///
/// Guards are returned by `Mutex::lock()` and `Mutex::try_lock()`. While a guard is alive, the
/// protected value is accessed through its `Deref` and `DerefMut` implementations.
pub struct MutexGuard<'a, T>(&'a Mutex<T>);
// SAFETY: a guard grants exclusive access to the protected `T`, so handing the guard to another
// thread is like handing over a `&mut T`, which is sound when `T: Send`.
unsafe impl<'a, T> Send for MutexGuard<'a, T> where T: Send {}

// SAFETY: a shared `&MutexGuard` only exposes `&T` (through `Deref`), which may be used from
// several threads at once when `T: Sync`.
unsafe impl<'a, T> Sync for MutexGuard<'a, T> where T: Sync {}
impl<T> Drop for MutexGuard<'_, T> {
    // Releases the mutex and notifies one waiting lock operation.
    fn drop(&mut self) {
        let mutex = self.0;
        // Clear the flag first (with `Release`, pairing with the `Acquire` in `try_lock`) so the
        // mutex is already free by the time a waiter is notified and retries.
        mutex.locked.store(false, Ordering::Release);
        mutex.lock_ops.notify_one();
    }
}
impl<T: fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
    // Formats the guarded value exactly as the value itself would be formatted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value: &T = &**self;
        <T as fmt::Debug>::fmt(value, f)
    }
}
impl<T: fmt::Display> fmt::Display for MutexGuard<'_, T> {
    // Displays the guarded value exactly as the value itself would be displayed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value: &T = &**self;
        <T as fmt::Display>::fmt(value, f)
    }
}
impl<T> Deref for MutexGuard<'_, T> {
    type Target = T;

    // Gives shared access to the value protected by the mutex.
    fn deref(&self) -> &Self::Target {
        let data: *mut T = self.0.data.get();
        // SAFETY: a guard is only created after `locked` was successfully flipped to `true`,
        // and the flag is cleared only when the guard is dropped, so no other guard can touch
        // the data while this one is alive.
        unsafe { &*data }
    }
}
impl<T> DerefMut for MutexGuard<'_, T> {
    // Gives exclusive access to the value protected by the mutex.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let data: *mut T = self.0.data.get();
        // SAFETY: this guard is the only one alive (the `locked` flag stays set until it is
        // dropped), and `&mut self` rules out other borrows handed out through this guard.
        unsafe { &mut *data }
    }
}