blob: 4e2fb74d19e0fbb64513c430e8439931897e3e4e [file] [log] [blame]
use crate::sync::batch_semaphore::{AcquireError, Semaphore};
use std::cell::UnsafeCell;
use std::ops;
// Total number of permits held by the semaphore inside every `RwLock`
// (see `RwLock::new`). `read` acquires one permit and `write` acquires all
// `MAX_READS` of them, so this is also the maximum number of read guards that
// can be held at the same time.
#[cfg(not(loom))]
const MAX_READS: usize = 32;
// Smaller permit count used when building with `cfg(loom)`.
// NOTE(review): presumably this keeps the state space explored under loom
// small — confirm.
#[cfg(loom)]
const MAX_READS: usize = 10;
/// An asynchronous reader-writer lock.
///
/// This type of lock allows a number of readers or at most one writer at any
/// point in time. The write portion of this lock typically allows modification
/// of the underlying data (exclusive access) and the read portion of this lock
/// typically allows for read-only access (shared access).
///
/// In comparison, a [`Mutex`] does not distinguish between readers or writers
/// that acquire the lock, therefore causing any tasks waiting for the lock to
/// become available to yield. An `RwLock` will allow multiple readers to
/// acquire the lock as long as a writer is not holding the lock.
///
/// The priority policy of Tokio's read-write lock is _fair_ (or
/// [_write-preferring_]), in order to ensure that readers cannot starve
/// writers. Fairness is ensured using a first-in, first-out queue for the tasks
/// awaiting the lock; if a task that wishes to acquire the write lock is at the
/// head of the queue, read locks will not be given out until the write lock has
/// been released. This is in contrast to the Rust standard library's
/// `std::sync::RwLock`, where the priority policy is dependent on the
/// operating system's implementation.
///
/// The type parameter `T` represents the data that this lock protects. `T`
/// must satisfy [`Send`] for the lock to be sent to another thread, and must
/// satisfy both [`Send`] and `Sync` for the lock to be shared between threads.
/// The RAII guards ([`RwLockReadGuard`] and [`RwLockWriteGuard`]) returned from
/// the locking methods implement [`Deref`](https://doc.rust-lang.org/std/ops/trait.Deref.html)
/// (and [`DerefMut`](https://doc.rust-lang.org/std/ops/trait.DerefMut.html)
/// for the `write` methods) to allow access to the content of the lock.
///
/// # Examples
///
/// ```
/// use tokio::sync::RwLock;
///
/// #[tokio::main]
/// async fn main() {
///     let lock = RwLock::new(5);
///
///     // many reader locks can be held at once
///     {
///         let r1 = lock.read().await;
///         let r2 = lock.read().await;
///         assert_eq!(*r1, 5);
///         assert_eq!(*r2, 5);
///     } // read locks are dropped at this point
///
///     // only one write lock may be held, however
///     {
///         let mut w = lock.write().await;
///         *w += 1;
///         assert_eq!(*w, 6);
///     } // write lock is dropped here
/// }
/// ```
///
/// [`Mutex`]: struct@super::Mutex
/// [`RwLock`]: struct@RwLock
/// [`RwLockReadGuard`]: struct@RwLockReadGuard
/// [`RwLockWriteGuard`]: struct@RwLockWriteGuard
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
/// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
#[derive(Debug)]
pub struct RwLock<T> {
// Semaphore coordinating read and write access to `c`. It is created with
// `MAX_READS` permits; readers take one permit and a writer takes all of them.
s: Semaphore,
// The protected value. It is only handed out through the guards' `Deref` /
// `DerefMut` impls, or by value through `into_inner`.
c: UnsafeCell<T>,
}
/// RAII structure used to release the shared read access of a lock when
/// dropped.
///
/// This structure is created by the [`read`] method on
/// [`RwLock`]. Dereferencing the guard yields a shared reference to the value
/// protected by the lock.
///
/// [`read`]: method@RwLock::read
/// [`RwLock`]: struct@RwLock
#[derive(Debug)]
pub struct RwLockReadGuard<'a, T> {
// The single semaphore permit taken by `read`. Dropping the guard drops this
// field, which hands the permit back to the lock's semaphore.
permit: ReleasingPermit<'a, T>,
// The lock whose value this guard gives access to.
lock: &'a RwLock<T>,
}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
///
/// This structure is created by the [`write`] method
/// on [`RwLock`]. Dereferencing the guard yields a shared or mutable reference
/// to the value protected by the lock.
///
/// [`write`]: method@RwLock::write
/// [`RwLock`]: struct@RwLock
#[derive(Debug)]
pub struct RwLockWriteGuard<'a, T> {
// All `MAX_READS` semaphore permits, taken by `write`. Dropping the guard
// drops this field, which hands the permits back to the lock's semaphore.
permit: ReleasingPermit<'a, T>,
// The lock whose value this guard gives access to.
lock: &'a RwLock<T>,
}
// Record of permits acquired from a lock's semaphore. The permits are released
// back to that semaphore when this value is dropped (see the `Drop` impl).
#[derive(Debug)]
struct ReleasingPermit<'a, T> {
// Number of permits that were acquired and must be released on drop.
num_permits: u16,
// The lock whose semaphore the permits belong to.
lock: &'a RwLock<T>,
}
impl<'a, T> ReleasingPermit<'a, T> {
async fn acquire(
lock: &'a RwLock<T>,
num_permits: u16,
) -> Result<ReleasingPermit<'a, T>, AcquireError> {
lock.s.acquire(num_permits).await?;
Ok(Self { num_permits, lock })
}
}
impl<T> Drop for ReleasingPermit<'_, T> {
    /// Hands every permit recorded in this value back to the lock's semaphore.
    fn drop(&mut self) {
        // `u16` always fits in `usize`, so this conversion is lossless.
        let returned = usize::from(self.num_permits);
        self.lock.s.release(returned);
    }
}
/// Compile-time check of the auto-trait bounds on the lock, its guards and the
/// futures returned by `read` / `write`. The test passes if it compiles.
#[test]
#[cfg(not(loom))]
fn bounds() {
    fn assert_send<T: Send>() {}
    fn assert_sync<T: Sync>() {}
    fn assert_unpin<T: Unpin>() {}
    // The futures produced by the `async fn`s have unnameable types, so they
    // have to be checked through a value instead of a type parameter.
    fn assert_send_sync_val<T: Send + Sync>(_value: T) {}

    // The lock itself.
    assert_send::<RwLock<u32>>();
    assert_sync::<RwLock<u32>>();
    assert_unpin::<RwLock<u32>>();

    // The read guard.
    assert_sync::<RwLockReadGuard<'_, u32>>();
    assert_unpin::<RwLockReadGuard<'_, u32>>();

    // The write guard.
    assert_sync::<RwLockWriteGuard<'_, u32>>();
    assert_unpin::<RwLockWriteGuard<'_, u32>>();

    // The futures returned by the locking methods.
    let lock = RwLock::new(0);
    assert_send_sync_val(lock.read());
    assert_send_sync_val(lock.write());
}
// `RwLock<T>` contains an `UnsafeCell<T>`, so `Sync` has to be implemented by
// hand here.
//
// As long as T: Send + Sync, it's fine to send and share RwLock<T> between threads.
// If T were not Send, sending and sharing a RwLock<T> would be bad, since you can access T through
// RwLock<T>.
unsafe impl<T> Send for RwLock<T> where T: Send {}
unsafe impl<T> Sync for RwLock<T> where T: Send + Sync {}
// The guards hand out references to the `T` stored in the lock (`&T` from both
// guards, and `&mut T` from the write guard through `&mut self`), so sharing a
// guard between threads is only allowed under the same `T: Send + Sync` bound
// that makes the lock itself shareable.
unsafe impl<'a, T> Sync for RwLockReadGuard<'a, T> where T: Send + Sync {}
unsafe impl<'a, T> Sync for RwLockWriteGuard<'a, T> where T: Send + Sync {}
impl<T> RwLock<T> {
    /// Creates a new instance of an `RwLock<T>` which is unlocked.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// let lock = RwLock::new(5);
    /// ```
    pub fn new(value: T) -> RwLock<T> {
        let c = UnsafeCell::new(value);
        // Every permit starts out available, i.e. the lock is unlocked.
        let s = Semaphore::new(MAX_READS);
        RwLock { s, c }
    }

    /// Locks this rwlock with shared read access, causing the current task
    /// to yield until the lock has been acquired.
    ///
    /// The calling task will yield until there are no more writers which
    /// hold the lock. There may be other readers currently inside the lock when
    /// this method returns.
    ///
    /// Returns an RAII guard which will drop the read access of this rwlock
    /// when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let lock = Arc::new(RwLock::new(1));
    ///     let c_lock = lock.clone();
    ///
    ///     let n = lock.read().await;
    ///     assert_eq!(*n, 1);
    ///
    ///     tokio::spawn(async move {
    ///         // While main has an active read lock, we acquire one too.
    ///         let r = c_lock.read().await;
    ///         assert_eq!(*r, 1);
    ///     }).await.expect("The spawned task has panicked");
    ///
    ///     // Drop the guard after the spawned task finishes.
    ///     drop(n);
    /// }
    /// ```
    pub async fn read(&self) -> RwLockReadGuard<'_, T> {
        // A reader needs a single permit.
        let permit = match ReleasingPermit::acquire(self, 1).await {
            Ok(permit) => permit,
            // Acquisition only fails when the semaphore has been closed, and
            // this type never closes its semaphore, so this cannot happen.
            Err(_) => unreachable!(),
        };
        RwLockReadGuard { permit, lock: self }
    }

    /// Locks this rwlock with exclusive write access, causing the current task
    /// to yield until the lock has been acquired.
    ///
    /// This function will not return while other writers or other readers
    /// currently have access to the lock.
    ///
    /// Returns an RAII guard which will drop the write access of this rwlock
    /// when dropped.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::sync::RwLock;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let lock = RwLock::new(1);
    ///
    ///     let mut n = lock.write().await;
    ///     *n = 2;
    /// }
    /// ```
    pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
        // A writer takes every permit, which leaves none for any reader or
        // for another writer.
        let all_permits = MAX_READS as u16;
        let permit = match ReleasingPermit::acquire(self, all_permits).await {
            Ok(permit) => permit,
            // Acquisition only fails when the semaphore has been closed, and
            // this type never closes its semaphore, so this cannot happen.
            Err(_) => unreachable!(),
        };
        RwLockWriteGuard { permit, lock: self }
    }

    /// Consumes the lock, returning the underlying data.
    ///
    /// Taking `self` by value means no guard can still be borrowing the lock,
    /// so no permit has to be acquired.
    pub fn into_inner(self) -> T {
        let RwLock { c: cell, .. } = self;
        cell.into_inner()
    }
}
impl<T> ops::Deref for RwLockReadGuard<'_, T> {
    type Target = T;

    /// Gives shared access to the value protected by the lock.
    fn deref(&self) -> &T {
        let value: *mut T = self.lock.c.get();
        // SAFETY: this guard holds one semaphore permit for as long as it is
        // alive, while a writer needs all of the permits, so (relying on the
        // semaphore) no `&mut T` can be handed out during the guard's lifetime.
        unsafe { &*value }
    }
}
impl<T> ops::Deref for RwLockWriteGuard<'_, T> {
    type Target = T;

    /// Gives shared access to the value protected by the lock.
    fn deref(&self) -> &T {
        let value: *mut T = self.lock.c.get();
        // SAFETY: this guard holds every semaphore permit for as long as it is
        // alive, so (relying on the semaphore) no other guard exists, and the
        // returned reference borrows from `&self`, which rules out a
        // simultaneous `deref_mut` on this guard.
        unsafe { &*value }
    }
}
impl<T> ops::DerefMut for RwLockWriteGuard<'_, T> {
    /// Gives exclusive access to the value protected by the lock.
    fn deref_mut(&mut self) -> &mut T {
        let value: *mut T = self.lock.c.get();
        // SAFETY: this guard holds every semaphore permit for as long as it is
        // alive, so (relying on the semaphore) no other guard exists, and the
        // returned reference borrows from `&mut self`, so it is the only
        // reference obtained through this guard.
        unsafe { &mut *value }
    }
}
impl<T> From<T> for RwLock<T> {
fn from(s: T) -> Self {
Self::new(s)
}
}
impl<T> Default for RwLock<T>
where
T: Default,
{
fn default() -> Self {
Self::new(T::default())
}
}