blob: 38c7c939238d99ac253fe92be407fc1aed473509 [file] [log] [blame]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::runtime::{self, Runtime};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{self, LocalSet};
use tokio::time;
use std::cell::Cell;
use std::sync::atomic::Ordering::{self, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::time::Duration;
#[tokio::test(basic_scheduler)]
async fn local_basic_scheduler() {
LocalSet::new()
.run_until(async {
task::spawn_local(async {}).await.unwrap();
})
.await;
}
#[tokio::test(threaded_scheduler)]
async fn local_threadpool() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
})
.await
.unwrap();
})
.await;
}
#[tokio::test(threaded_scheduler)]
async fn localset_future_threadpool() {
thread_local! {
static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
}
ON_LOCAL_THREAD.with(|cell| cell.set(true));
let local = LocalSet::new();
local.spawn_local(async move {
assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
});
local.await;
}
#[tokio::test(threaded_scheduler)]
async fn localset_future_timers() {
static RAN1: AtomicBool = AtomicBool::new(false);
static RAN2: AtomicBool = AtomicBool::new(false);
let local = LocalSet::new();
local.spawn_local(async move {
time::delay_for(Duration::from_millis(10)).await;
RAN1.store(true, Ordering::SeqCst);
});
local.spawn_local(async move {
time::delay_for(Duration::from_millis(20)).await;
RAN2.store(true, Ordering::SeqCst);
});
local.await;
assert!(RAN1.load(Ordering::SeqCst));
assert!(RAN2.load(Ordering::SeqCst));
}
#[tokio::test]
async fn localset_future_drives_all_local_futs() {
static RAN1: AtomicBool = AtomicBool::new(false);
static RAN2: AtomicBool = AtomicBool::new(false);
static RAN3: AtomicBool = AtomicBool::new(false);
let local = LocalSet::new();
local.spawn_local(async move {
task::spawn_local(async {
task::yield_now().await;
RAN3.store(true, Ordering::SeqCst);
});
task::yield_now().await;
RAN1.store(true, Ordering::SeqCst);
});
local.spawn_local(async move {
task::yield_now().await;
RAN2.store(true, Ordering::SeqCst);
});
local.await;
assert!(RAN1.load(Ordering::SeqCst));
assert!(RAN2.load(Ordering::SeqCst));
assert!(RAN3.load(Ordering::SeqCst));
}
#[tokio::test(threaded_scheduler)]
async fn local_threadpool_timer() {
// This test ensures that runtime services like the timer are properly
// set for the local task set.
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
time::delay_for(Duration::from_millis(10)).await;
assert!(ON_RT_THREAD.with(|cell| cell.get()));
});
join.await.unwrap();
})
.await;
}
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
LocalSet::new().block_on(&mut rt, async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::block_in_place(|| {});
assert!(ON_RT_THREAD.with(|cell| cell.get()));
});
join.await.unwrap();
});
}
#[tokio::test(threaded_scheduler)]
async fn local_threadpool_blocking_run() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_blocking(|| {
assert!(
!ON_RT_THREAD.with(|cell| cell.get()),
"blocking must not run on the local task set's thread"
);
})
.await
.unwrap();
assert!(ON_RT_THREAD.with(|cell| cell.get()));
});
join.await.unwrap();
})
.await;
}
#[tokio::test(threaded_scheduler)]
async fn all_spawns_are_local() {
use futures::future;
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let handles = (0..128)
.map(|_| {
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
})
})
.collect::<Vec<_>>();
for joined in future::join_all(handles).await {
joined.unwrap();
}
})
.await;
}
#[tokio::test(threaded_scheduler)]
async fn nested_spawn_is_local() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
})
.await
.unwrap();
})
.await
.unwrap();
})
.await
.unwrap();
})
.await
.unwrap();
})
.await;
}
#[test]
fn join_local_future_elsewhere() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
let mut rt = runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
let local = LocalSet::new();
local.block_on(&mut rt, async move {
let (tx, rx) = oneshot::channel();
let join = task::spawn_local(async move {
println!("hello world running...");
assert!(
ON_RT_THREAD.with(|cell| cell.get()),
"local task must run on local thread, no matter where it is awaited"
);
rx.await.unwrap();
println!("hello world task done");
"hello world"
});
let join2 = task::spawn(async move {
assert!(
!ON_RT_THREAD.with(|cell| cell.get()),
"spawned task should be on a worker"
);
tx.send(()).expect("task shouldn't have ended yet");
println!("waking up hello world...");
join.await.expect("task should complete successfully");
println!("hello world task joined");
});
join2.await.unwrap()
});
}
#[test]
fn drop_cancels_tasks() {
use std::rc::Rc;
// This test reproduces issue #1842
let mut rt = rt();
let rc1 = Rc::new(());
let rc2 = rc1.clone();
let (started_tx, started_rx) = oneshot::channel();
let local = LocalSet::new();
local.spawn_local(async move {
// Move this in
let _rc2 = rc2;
started_tx.send(()).unwrap();
loop {
time::delay_for(Duration::from_secs(3600)).await;
}
});
local.block_on(&mut rt, async {
started_rx.await.unwrap();
});
drop(local);
drop(rt);
assert_eq!(1, Rc::strong_count(&rc1));
}
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
use std::sync::mpsc::RecvTimeoutError;
let (done_tx, done_rx) = std::sync::mpsc::channel();
let thread = std::thread::spawn(move || {
f();
// Send a message on the channel so that the test thread can
// determine if we have entered an infinite loop:
done_tx.send(()).unwrap();
});
// Since the failure mode of this test is an infinite loop, rather than
// something we can easily make assertions about, we'll run it in a
// thread. When the test thread finishes, it will send a message on a
// channel to this thread. We'll wait for that message with a fairly
// generous timeout, and if we don't recieve it, we assume the test
// thread has hung.
//
// Note that it should definitely complete in under a minute, but just
// in case CI is slow, we'll give it a long timeout.
match done_rx.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => panic!(
"test did not complete within {:?} seconds, \
we have (probably) entered an infinite loop!",
timeout,
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
Err(RecvTimeoutError::Disconnected) => {
println!("done_rx dropped, did the test thread panic?");
}
// Test completed successfully!
Ok(()) => {}
}
thread.join().expect("test thread should not panic!")
}
#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
with_timeout(Duration::from_secs(60), || {
let (tx, mut rx) = mpsc::channel::<()>(1024);
let mut rt = rt();
let local = LocalSet::new();
local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
local.block_on(&mut rt, async {
time::delay_for(Duration::from_millis(1)).await;
});
drop(tx);
// This enters an infinite loop if the remote notified tasks are not
// properly cancelled.
drop(local);
});
}
#[test]
fn local_tasks_wake_join_all() {
// This test reproduces issue #2460.
with_timeout(Duration::from_secs(60), || {
use futures::future::join_all;
use tokio::task::LocalSet;
let mut rt = rt();
let set = LocalSet::new();
let mut handles = Vec::new();
for _ in 1..=128 {
handles.push(set.spawn_local(async move {
tokio::task::spawn_local(async move {}).await.unwrap();
}));
}
rt.block_on(set.run_until(join_all(handles)));
});
}
#[tokio::test]
async fn local_tasks_are_polled_after_tick() {
// Reproduces issues #1899 and #1900
static RX1: AtomicUsize = AtomicUsize::new(0);
static RX2: AtomicUsize = AtomicUsize::new(0);
static EXPECTED: usize = 500;
let (tx, mut rx) = mpsc::unbounded_channel();
let local = LocalSet::new();
local
.run_until(async {
let task2 = task::spawn(async move {
// Wait a bit
time::delay_for(Duration::from_millis(100)).await;
let mut oneshots = Vec::with_capacity(EXPECTED);
// Send values
for _ in 0..EXPECTED {
let (oneshot_tx, oneshot_rx) = oneshot::channel();
oneshots.push(oneshot_tx);
tx.send(oneshot_rx).unwrap();
}
time::delay_for(Duration::from_millis(100)).await;
for tx in oneshots.drain(..) {
tx.send(()).unwrap();
}
time::delay_for(Duration::from_millis(300)).await;
let rx1 = RX1.load(SeqCst);
let rx2 = RX2.load(SeqCst);
println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2);
assert_eq!(EXPECTED, rx1);
assert_eq!(EXPECTED, rx2);
});
while let Some(oneshot) = rx.recv().await {
RX1.fetch_add(1, SeqCst);
task::spawn_local(async move {
oneshot.await.unwrap();
RX2.fetch_add(1, SeqCst);
});
}
task2.await.unwrap();
})
.await;
}
#[tokio::test]
async fn acquire_mutex_in_drop() {
use futures::future::pending;
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let local = LocalSet::new();
local.spawn_local(async move {
let _ = rx2.await;
unreachable!();
});
local.spawn_local(async move {
let _ = rx1.await;
tx2.send(()).unwrap();
unreachable!();
});
// Spawn a task that will never notify
local.spawn_local(async move {
pending::<()>().await;
tx1.send(()).unwrap();
});
// Tick the loop
local
.run_until(async {
task::yield_now().await;
})
.await;
// Drop the LocalSet
drop(local);
}
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap()
}