| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "full")] |
| |
| use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
| use tokio::net::{TcpListener, TcpStream}; |
| use tokio::runtime::{self, Runtime}; |
| use tokio::sync::oneshot; |
| use tokio_test::{assert_err, assert_ok}; |
| |
| use futures::future::poll_fn; |
| use std::future::Future; |
| use std::pin::Pin; |
| use std::sync::atomic::AtomicUsize; |
| use std::sync::atomic::Ordering::Relaxed; |
| use std::sync::{mpsc, Arc}; |
| use std::task::{Context, Poll}; |
| |
// Building a threaded runtime restricted to a single core thread must
// succeed without panicking.
#[test]
fn single_thread() {
    // Unwrap the build result so that a failed build fails the test instead
    // of being silently discarded.
    let rt = runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .core_threads(1)
        .build()
        .unwrap();

    // The runtime is not used any further; drop it right away.
    drop(rt);
}
| |
// Spawns `NUM` tasks that each bump a shared counter once, on a fresh runtime
// for each of five rounds. The task that performs the final increment signals
// the main thread, which then drops the runtime.
#[test]
fn many_oneshot_futures() {
    const NUM: usize = 1_000;

    for _ in 0..5 {
        // used for notifying the main thread
        let (done_tx, done_rx) = mpsc::channel();

        let pool = rt();
        let counter = Arc::new(AtomicUsize::new(0));

        (0..NUM).for_each(|_| {
            let counter = Arc::clone(&counter);
            let done_tx = done_tx.clone();

            pool.spawn(async move {
                // `fetch_add` yields the previous value, so the task that
                // sees `NUM - 1` is the one completing the count.
                if counter.fetch_add(1, Relaxed) == NUM - 1 {
                    done_tx.send(()).unwrap();
                }
            });
        });

        done_rx.recv().unwrap();

        // Wait for the pool to shutdown
        drop(pool);
    }
}
// Builds `TRACKS` independent chains of `CHAIN` forwarding tasks. A message
// sent to the head of a chain is routed back to the head by the last task
// until it has come through `CYCLES` times, at which point it is delivered to
// the main task instead. The whole setup is repeated on 50 fresh runtimes.
#[test]
fn many_multishot_futures() {
    use tokio::sync::mpsc;

    const CHAIN: usize = 200;
    const CYCLES: usize = 5;
    const TRACKS: usize = 50;

    for _ in 0..50 {
        let mut pool = rt();
        let mut heads = Vec::with_capacity(TRACKS);
        let mut tails = Vec::with_capacity(TRACKS);

        for _ in 0..TRACKS {
            let (head_tx, mut link_rx) = mpsc::channel(10);

            for _ in 0..CHAIN {
                let (mut link_tx, following_rx) = mpsc::channel(10);

                // Forward all the messages to the next link
                pool.spawn(async move {
                    while let Some(msg) = link_rx.recv().await {
                        link_tx.send(msg).await.unwrap();
                    }
                });

                link_rx = following_rx;
            }

            // This final task cycles if needed
            let (mut tail_tx, tail_rx) = mpsc::channel(10);
            let mut loop_tx = head_tx.clone();

            pool.spawn(async move {
                for lap in 1..=CYCLES {
                    let msg = link_rx.recv().await.unwrap();

                    // Only the last lap leaves the chain; earlier laps go
                    // back to the head.
                    if lap == CYCLES {
                        tail_tx.send(msg).await.unwrap();
                    } else {
                        loop_tx.send(msg).await.unwrap();
                    }
                }
            });

            heads.push(head_tx);
            tails.push(tail_rx);
        }

        pool.block_on(async move {
            // Kick off every track ...
            for mut head_tx in heads {
                head_tx.send("ping").await.unwrap();
            }

            // ... then wait for each one to deliver its message.
            for mut tail_rx in tails {
                tail_rx.recv().await.unwrap();
            }
        });
    }
}
| |
// Runs the client/server exchange twice, once spawned with `tokio::spawn`
// from inside `block_on` and once through the runtime's own `spawn`. Both
// must report completion, and nothing may be received after the runtime has
// been dropped.
#[test]
fn spawn_shutdown() {
    let mut runtime = rt();
    let (tx, rx) = mpsc::channel();

    // Spawn from within the runtime context
    let ctx_tx = tx.clone();
    runtime.block_on(async move {
        tokio::spawn(client_server(ctx_tx));
    });

    // Use spawner
    runtime.spawn(client_server(tx));

    // One completion message per spawned exchange
    for _ in 0..2 {
        assert_ok!(rx.recv());
    }

    drop(runtime);
    assert_err!(rx.try_recv());
}
| |
| async fn client_server(tx: mpsc::Sender<()>) { |
| let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); |
| |
| // Get the assigned address |
| let addr = assert_ok!(server.local_addr()); |
| |
| // Spawn the server |
| tokio::spawn(async move { |
| // Accept a socket |
| let (mut socket, _) = server.accept().await.unwrap(); |
| |
| // Write some data |
| socket.write_all(b"hello").await.unwrap(); |
| }); |
| |
| let mut client = TcpStream::connect(&addr).await.unwrap(); |
| |
| let mut buf = vec![]; |
| client.read_to_end(&mut buf).await.unwrap(); |
| |
| assert_eq!(buf, b"hello"); |
| tx.send(()).unwrap(); |
| } |
| |
// Dropping a runtime must run the stop callback as many times as the start
// callback ran, and must drop a task that never completes.
#[test]
fn drop_threadpool_drops_futures() {
    // Future that is always pending and bumps its counter when dropped.
    struct Never(Arc<AtomicUsize>);

    impl Future for Never {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            Poll::Pending
        }
    }

    impl Drop for Never {
        fn drop(&mut self) {
            self.0.fetch_add(1, Relaxed);
        }
    }

    for _ in 0..1_000 {
        let num_inc = Arc::new(AtomicUsize::new(0));
        let num_dec = Arc::new(AtomicUsize::new(0));
        let num_drop = Arc::new(AtomicUsize::new(0));

        let pool = {
            let started = Arc::clone(&num_inc);
            let stopped = Arc::clone(&num_dec);

            runtime::Builder::new()
                .threaded_scheduler()
                .enable_all()
                .on_thread_start(move || {
                    started.fetch_add(1, Relaxed);
                })
                .on_thread_stop(move || {
                    stopped.fetch_add(1, Relaxed);
                })
                .build()
                .unwrap()
        };

        pool.spawn(Never(Arc::clone(&num_drop)));

        // Wait for the pool to shutdown
        drop(pool);

        // Assert that at least one thread was spawned.
        let a = num_inc.load(Relaxed);
        assert!(a >= 1);

        // Assert that all threads shutdown
        let b = num_dec.load(Relaxed);
        assert_eq!(a, b);

        // Assert that the future was dropped
        let c = num_drop.load(Relaxed);
        assert_eq!(c, 1);
    }
}
| |
// Checks that the `on_thread_start` and `on_thread_stop` callbacks have each
// run at least once by the time a runtime that executed a task is dropped.
#[test]
fn start_stop_callbacks_called() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let after_start = Arc::new(AtomicUsize::new(0));
    let before_stop = Arc::new(AtomicUsize::new(0));

    let after_inner = after_start.clone();
    let before_inner = before_stop.clone();
    let mut rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        // Each closure owns its `Arc`, so it increments through it directly
        // rather than cloning the `Arc` on every invocation.
        .on_thread_start(move || {
            after_inner.fetch_add(1, Ordering::Relaxed);
        })
        .on_thread_stop(move || {
            before_inner.fetch_add(1, Ordering::Relaxed);
        })
        .build()
        .unwrap();

    let (tx, rx) = oneshot::channel();

    // Run one task so the runtime has done some work before it is dropped.
    rt.spawn(async move {
        assert_ok!(tx.send(()));
    });

    assert_ok!(rt.block_on(rx));

    drop(rt);

    assert!(after_start.load(Ordering::Relaxed) > 0);
    assert!(before_stop.load(Ordering::Relaxed) > 0);
}
| |
// Parks four tasks inside `block_in_place` and checks that the runtime still
// runs `NUM` further tasks to completion while they are parked. Repeated on
// ten fresh runtimes.
#[test]
fn blocking() {
    const NUM: usize = 1_000;

    for _ in 0..10 {
        // used for notifying the main thread
        let (done_tx, done_rx) = mpsc::channel();

        let pool = rt();
        let counter = Arc::new(AtomicUsize::new(0));

        // The barrier has five parties: the four blocking tasks below plus
        // this thread. The original note states that there are four workers
        // in the pool, so running 4 blocking tasks means handoff must have
        // happened.
        // NOTE(review): `rt()` builds the pool with `Runtime::new()`, which
        // does not pin the worker count to four here — confirm.
        let barrier = Arc::new(std::sync::Barrier::new(5));
        for _ in 0..4 {
            let barrier = Arc::clone(&barrier);
            pool.spawn(async move {
                tokio::task::block_in_place(move || {
                    barrier.wait();
                    barrier.wait();
                })
            });
        }

        // First rendezvous: all four tasks are now inside `block_in_place`.
        barrier.wait();

        for _ in 0..NUM {
            let counter = Arc::clone(&counter);
            let done_tx = done_tx.clone();

            pool.spawn(async move {
                let seen = counter.fetch_add(1, Relaxed) + 1;

                if seen == NUM {
                    done_tx.send(()).unwrap();
                }
            });
        }

        done_rx.recv().unwrap();

        // Second rendezvous: lets the four blocking closures return.
        barrier.wait();
    }
}
| |
// A task on one runtime must be able to wake a task on another runtime: the
// first runtime completes a oneshot that a task on the second is awaiting.
#[test]
fn multi_threadpool() {
    use tokio::sync::oneshot;

    let sender_rt = rt();
    let receiver_rt = rt();

    let (signal_tx, signal_rx) = oneshot::channel();
    let (done_tx, done_rx) = mpsc::channel();

    // Waits for the signal, then tells the main thread.
    receiver_rt.spawn(async move {
        signal_rx.await.unwrap();
        done_tx.send(()).unwrap();
    });

    // Fires the signal from the other runtime.
    sender_rt.spawn(async move {
        signal_tx.send(()).unwrap();
    });

    done_rx.recv().unwrap();
}
| |
// When `block_in_place` returns, it attempts to reclaim the yielded runtime
// worker. In this case, the remainder of the task is on the runtime worker and
// must take part in the cooperative task budgeting system.
//
// The test ensures that, when this happens, attempting to consume from a
// channel yields occasionally even if there are values ready to receive.
#[test]
fn coop_and_block_in_place() {
    use tokio::sync::mpsc;

    const CAPACITY: usize = 1024;

    let mut rt = tokio::runtime::Builder::new()
        .threaded_scheduler()
        // Setting max threads to 1 prevents another thread from claiming the
        // runtime worker yielded as part of `block_in_place` and guarantees the
        // same thread will reclaim the worker at the end of the
        // `block_in_place` call.
        .max_threads(1)
        .build()
        .unwrap();

    rt.block_on(async move {
        let (mut tx, mut rx) = mpsc::channel(CAPACITY);

        // Fill the channel
        for _ in 0..CAPACITY {
            tx.send(()).await.unwrap();
        }

        drop(tx);

        let consumer = tokio::spawn(async move {
            // Block in place without doing anything
            tokio::task::block_in_place(|| {});

            // Receive all the values, this should trigger a `Pending` as the
            // coop limit will be reached.
            poll_fn(|cx| loop {
                // A fresh `recv` future is created, pinned and polled once
                // per iteration.
                let polled = {
                    tokio::pin! {
                        let recv = rx.recv();
                    }

                    Pin::new(&mut recv).poll(cx)
                };

                match polled {
                    // A value was ready; keep draining.
                    Poll::Ready(Some(_)) => {}
                    // The channel was drained to the end without a single
                    // `Pending` in between.
                    Poll::Ready(None) => panic!("did not yield"),
                    // The receive yielded, which is what the test expects.
                    Poll::Pending => return Poll::Ready(()),
                }
            })
            .await
        });

        consumer.await.unwrap();
    });
}
| |
// Testing this does not panic: a threaded runtime capped at one thread must
// build successfully.
#[test]
fn max_threads() {
    let mut builder = tokio::runtime::Builder::new();
    builder.threaded_scheduler().max_threads(1);

    let _rt = builder.build().unwrap();
}
| |
| fn rt() -> Runtime { |
| Runtime::new().unwrap() |
| } |