#![warn(rust_2018_idioms)]

use std::sync::Arc;
use std::task::Poll;

use futures::future::FutureExt;
use futures::stream;
use futures::stream::StreamExt;

use tokio::sync::{Barrier, RwLock};
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
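
// `into_inner` consumes the RwLock and hands back the wrapped value, so no
// locking is involved.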
#[test]
fn into_inner() {
    let rwlock = RwLock::new(42);
    assert_eq!(rwlock.into_inner(), 42);
}
// Multiple reads should be Ready.
#[test]
fn read_shared() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.read());
    assert_ready!(t2.poll());
}
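
// A minimal companion sketch (an addition, not part of the original suite)
// showing the same property through the async API: both read guards can be
// held at once and observe the same value.
#[tokio::test]
async fn read_shared_async() {
    let rwlock = RwLock::new(100);
    let g1 = rwlock.read().await;
    let g2 = rwlock.read().await;
    assert_eq!(*g1 + *g2, 200);
}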

// When there is an active shared owner, exclusive access should not be possible.
#[test]
fn write_shared_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
}

// When there is an active exclusive owner, subsequent shared access should not be possible.
#[test]
fn read_exclusive_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.write());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.read());
    assert_pending!(t2.poll());
}

// If the maximum number of shared accesses is reached, a subsequent shared
// access is pending and should become available once one of the active
// shared accesses is dropped.
#[test]
fn exhaust_reading() {
    let rwlock = RwLock::new(100);
    let mut reads = Vec::new();
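    // Keep acquiring read guards until polling returns Pending, i.e. the
    // lock's internal limit on concurrent readers has been hit.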
    loop {
        let mut t = spawn(rwlock.read());
        match t.poll() {
            Poll::Ready(guard) => reads.push(guard),
            Poll::Pending => break,
        }
    }
    let mut t1 = spawn(rwlock.read());
    assert_pending!(t1.poll());
    let g2 = reads.pop().unwrap();
    drop(g2);
    assert!(t1.is_woken());
    assert_ready!(t1.poll());
}

// When there is an active exclusive owner, subsequent exclusive access should not be possible.
#[test]
fn write_exclusive_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.write());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
}

// When there is an active shared owner, exclusive access should be possible
// after the shared guard is dropped.
#[test]
fn write_shared_drop() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
    drop(g1);
    assert!(t2.is_woken());
    assert_ready!(t2.poll());
}
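
// A hedged companion check (an addition, not from the original file): once
// the first exclusive guard is dropped, a queued writer should be woken and
// acquire the lock.
#[test]
fn write_exclusive_drop() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.write());
    let g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
    drop(g1);
    assert!(t2.is_woken());
    assert_ready!(t2.poll());
}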

// When there is an active shared owner and exclusive access is requested,
// subsequent shared access should not be possible, as the pending write
// gathers all the available semaphore permits.
#[test]
fn write_read_shared_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.read());
    assert_ready!(t2.poll());
    let mut t3 = spawn(rwlock.write());
    assert_pending!(t3.poll());
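    // The queued writer is already collecting permits, so this new reader has
    // to wait behind it; this is what keeps the writer from being starved.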
    let mut t4 = spawn(rwlock.read());
    assert_pending!(t4.poll());
}

// When there is an active shared owner and exclusive access is requested,
// reading should become possible once the pending exclusive access is dropped.
#[test]
fn write_read_shared_drop_pending() {
    let rwlock = RwLock::new(100);
    let mut t1 = spawn(rwlock.read());
    let _g1 = assert_ready!(t1.poll());
    let mut t2 = spawn(rwlock.write());
    assert_pending!(t2.poll());
    let mut t3 = spawn(rwlock.read());
    assert_pending!(t3.poll());
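    // Dropping the pending write future returns the permits it had already
    // accumulated, which lets the queued reader proceed.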
    drop(t2);
    assert!(t3.is_woken());
    assert_ready!(t3.poll());
}

// Acquire an uncontested RwLock nonexclusively from a single task.
#[tokio::test]
async fn read_uncontested() {
    let rwlock = RwLock::new(100);
    let result = *rwlock.read().await;
    assert_eq!(result, 100);
}

// Acquire an uncontested RwLock in exclusive mode.
#[tokio::test]
async fn write_uncontested() {
    let rwlock = RwLock::new(100);
    let mut result = rwlock.write().await;
    *result += 50;
    assert_eq!(*result, 150);
}
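
// A follow-on sketch (an addition, not from the original file): the exclusive
// guard releases the lock when it goes out of scope, so a later shared read
// observes the written value.
#[tokio::test]
async fn write_then_read() {
    let rwlock = RwLock::new(100);
    {
        let mut guard = rwlock.write().await;
        *guard += 1;
    } // guard dropped here, releasing exclusive access
    assert_eq!(*rwlock.read().await, 101);
}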

// RwLocks should be acquired in the order that their Futures are waited upon.
#[tokio::test]
async fn write_order() {
    let rwlock = RwLock::<Vec<u32>>::new(vec![]);
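    // fut2 is created before fut1, but what determines acquisition order is
    // when each future is first awaited (polled), not when it is constructed.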
    let fut2 = rwlock.write().map(|mut guard| guard.push(2));
    let fut1 = rwlock.write().map(|mut guard| guard.push(1));
    fut1.await;
    fut2.await;
    let g = rwlock.read().await;
    assert_eq!(*g, vec![1, 2]);
}

// A single RwLock is contested by tasks in multiple threads.
#[tokio::test(threaded_scheduler)]
async fn multithreaded() {
    let barrier = Arc::new(Barrier::new(5));
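    // Barrier::new(5): the four spawned writer tasks plus this main task all
    // rendezvous before the final read.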
    let rwlock = Arc::new(RwLock::<u32>::new(0));
    let rwclone1 = rwlock.clone();
    let rwclone2 = rwlock.clone();
    let rwclone3 = rwlock.clone();
    let rwclone4 = rwlock.clone();

    let b1 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone1.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 2;
                }
            })
            .await;
        b1.wait().await;
    });

    let b2 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone2.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 3;
                }
            })
            .await;
        b2.wait().await;
    });

    let b3 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone3.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 5;
                }
            })
            .await;
        b3.wait().await;
    });

    let b4 = barrier.clone();
    tokio::spawn(async move {
        stream::iter(0..1000)
            .for_each(move |_| {
                let rwlock = rwclone4.clone();
                async move {
                    let mut guard = rwlock.write().await;
                    *guard += 7;
                }
            })
            .await;
        b4.wait().await;
    });

    barrier.wait().await;
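
    // Each of the four tasks performed 1000 exclusive increments:
    // 1000 * (2 + 3 + 5 + 7) = 17_000.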
    let g = rwlock.read().await;
    assert_eq!(*g, 17_000);
}