blob: 48aff27db2f4c3c7c40945e8dc88b69d14f1bc32 [file] [log] [blame]
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use async_lock::{RwLock, RwLockUpgradableReadGuard};
use futures_lite::{future, FutureExt};
fn spawn<T: Send + 'static>(f: impl Future<Output = T> + Send + 'static) -> future::Boxed<T> {
let (s, r) = async_channel::bounded(1);
thread::spawn(move || {
future::block_on(async {
let _ = s.send(f.await).await;
})
});
async move { r.recv().await.unwrap() }.boxed()
}
#[test]
fn smoke() {
future::block_on(async {
let lock = RwLock::new(());
drop(lock.read().await);
drop(lock.write().await);
drop((lock.read().await, lock.read().await));
drop(lock.write().await);
});
}
#[test]
fn try_write() {
future::block_on(async {
let lock = RwLock::new(0isize);
let read_guard = lock.read().await;
assert!(lock.try_write().is_none());
drop(read_guard);
});
}
#[test]
fn into_inner() {
let lock = RwLock::new(10);
assert_eq!(lock.into_inner(), 10);
}
#[test]
fn into_inner_and_drop() {
struct Counter(Arc<AtomicUsize>);
impl Drop for Counter {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let cnt = Arc::new(AtomicUsize::new(0));
let lock = RwLock::new(Counter(cnt.clone()));
assert_eq!(cnt.load(Ordering::SeqCst), 0);
{
let _inner = lock.into_inner();
assert_eq!(cnt.load(Ordering::SeqCst), 0);
}
assert_eq!(cnt.load(Ordering::SeqCst), 1);
}
#[test]
fn get_mut() {
let mut lock = RwLock::new(10);
*lock.get_mut() = 20;
assert_eq!(lock.into_inner(), 20);
}
#[test]
fn contention() {
const N: u32 = 10;
const M: usize = 1000;
let (tx, rx) = async_channel::unbounded();
let tx = Arc::new(tx);
let rw = Arc::new(RwLock::new(()));
// Spawn N tasks that randomly acquire the lock M times.
for _ in 0..N {
let tx = tx.clone();
let rw = rw.clone();
spawn(async move {
for _ in 0..M {
if fastrand::u32(..N) == 0 {
drop(rw.write().await);
} else {
drop(rw.read().await);
}
}
tx.send(()).await.unwrap();
});
}
future::block_on(async move {
for _ in 0..N {
rx.recv().await.unwrap();
}
});
}
#[test]
fn writer_and_readers() {
let lock = Arc::new(RwLock::new(0i32));
let (tx, rx) = async_channel::unbounded();
// Spawn a writer task.
spawn({
let lock = lock.clone();
async move {
let mut lock = lock.write().await;
for _ in 0..1000 {
let tmp = *lock;
*lock = -1;
future::yield_now().await;
*lock = tmp + 1;
}
tx.send(()).await.unwrap();
}
});
// Readers try to catch the writer in the act.
let mut readers = Vec::new();
for _ in 0..5 {
let lock = lock.clone();
readers.push(spawn(async move {
for _ in 0..1000 {
let lock = lock.read().await;
assert!(*lock >= 0);
}
}));
}
future::block_on(async move {
// Wait for readers to pass their asserts.
for r in readers {
r.await;
}
// Wait for writer to finish.
rx.recv().await.unwrap();
let lock = lock.read().await;
assert_eq!(*lock, 1000);
});
}
#[test]
fn upgrade() {
future::block_on(async {
let lock: RwLock<i32> = RwLock::new(0);
let read_guard = lock.read().await;
let read_guard2 = lock.read().await;
// Should be able to obtain an upgradable lock.
let upgradable_guard = lock.upgradable_read().await;
// Should be able to obtain a read lock when an upgradable lock is active.
let read_guard3 = lock.read().await;
assert_eq!(0, *read_guard3);
drop(read_guard);
drop(read_guard2);
drop(read_guard3);
// Writers should not pass.
assert!(lock.try_write().is_none());
let mut write_guard = RwLockUpgradableReadGuard::try_upgrade(upgradable_guard).expect(
"should be able to upgrade an upgradable lock because there are no more readers",
);
*write_guard += 1;
drop(write_guard);
let read_guard = lock.read().await;
assert_eq!(1, *read_guard)
});
}
#[test]
fn not_upgrade() {
future::block_on(async {
let mutex: RwLock<i32> = RwLock::new(0);
let read_guard = mutex.read().await;
let read_guard2 = mutex.read().await;
// Should be able to obtain an upgradable lock.
let upgradable_guard = mutex.upgradable_read().await;
// Should be able to obtain a shared lock when an upgradable lock is active.
let read_guard3 = mutex.read().await;
assert_eq!(0, *read_guard3);
drop(read_guard);
drop(read_guard2);
drop(read_guard3);
// Drop the upgradable lock.
drop(upgradable_guard);
assert_eq!(0, *(mutex.read().await));
// Should be able to acquire a write lock because there are no more readers.
let mut write_guard = mutex.write().await;
*write_guard += 1;
drop(write_guard);
let read_guard = mutex.read().await;
assert_eq!(1, *read_guard)
});
}
#[test]
fn upgradable_with_concurrent_writer() {
future::block_on(async {
let lock: Arc<RwLock<i32>> = Arc::new(RwLock::new(0));
let lock2 = lock.clone();
let upgradable_guard = lock.upgradable_read().await;
future::or(
async move {
let mut write_guard = lock2.write().await;
*write_guard = 1;
},
async move {
let mut write_guard = RwLockUpgradableReadGuard::upgrade(upgradable_guard).await;
assert_eq!(*write_guard, 0);
*write_guard = 2;
},
)
.await;
assert_eq!(2, *(lock.write().await));
let read_guard = lock.read().await;
assert_eq!(2, *read_guard);
});
}