blob: 51ec1348bce50e0d47a40023b30543a948be51f1 [file] [log] [blame]
//! The blocking executor.
//!
//! Tasks created by [`Task::blocking()`] go into this executor. This executor is independent of
//! [`run()`][`crate::run()`] - it does not need to be driven.
//!
//! Blocking tasks are allowed to block without restrictions. However, the executor puts a limit on
//! the number of concurrently running tasks. Once that limit is hit, a task will need to complete
//! or yield in order for others to run.
//!
//! In idle state, this executor has no threads and consumes no resources. Once tasks are spawned,
//! new threads will get started, as many as is needed to keep up with the present amount of work.
//! When threads are idle, they wait for some time for new work to come in and shut down after a
//! certain timeout.
//!
//! This module also implements convenient adapters:
//!
//! - [`blocking!`] as syntax sugar around [`Task::blocking()`]
//! - [`iter()`] converts an [`Iterator`] into a [`Stream`]
//! - [`reader()`] converts a [`Read`] into an [`AsyncRead`]
//! - [`writer()`] converts a [`Write`] into an [`AsyncWrite`]
use std::collections::VecDeque;
use std::future::Future;
use std::io::{self, Read, Write};
use std::panic;
use std::pin::Pin;
use std::sync::{Condvar, Mutex, MutexGuard};
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use futures_util::io::{AllowStdIo, AsyncRead, AsyncWrite, AsyncWriteExt};
use futures_util::stream::Stream;
use once_cell::sync::Lazy;
use crate::context;
use crate::task::{Runnable, Task};
use crate::throttle;
/// The blocking executor.
pub(crate) struct BlockingExecutor {
/// The current state of the executor.
state: Mutex<State>,
/// Used to put idle threads to sleep and wake them up when new work comes in.
cvar: Condvar,
}
/// Current state of the blocking executor.
struct State {
/// Number of idle threads in the pool.
///
/// Idle threads are sleeping, waiting to get a task to run.
idle_count: usize,
/// Total number of thread in the pool.
///
/// This is the number of idle threads + the number of active threads.
thread_count: usize,
/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
}
impl BlockingExecutor {
/// Returns a reference to the blocking executor.
pub fn get() -> &'static BlockingExecutor {
static EXECUTOR: Lazy<BlockingExecutor> = Lazy::new(|| BlockingExecutor {
state: Mutex::new(State {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
}),
cvar: Condvar::new(),
});
&EXECUTOR
}
/// Spawns a future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
// Create a task, schedule it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, move |r| self.schedule(r), ());
runnable.schedule();
Task(Some(handle))
}
/// Runs the main loop on the current thread.
///
/// This function runs blocking tasks until it becomes idle and times out.
fn main_loop(&'static self) {
let mut state = self.state.lock().unwrap();
loop {
// This thread is not idle anymore because it's going to run tasks.
state.idle_count -= 1;
// Run tasks in the queue.
while let Some(runnable) = state.queue.pop_front() {
// We have found a task - grow the pool if needed.
self.grow_pool(state);
// Run the task.
let _ = panic::catch_unwind(|| runnable.run());
// Re-lock the state and continue.
state = self.state.lock().unwrap();
}
// This thread is now becoming idle.
state.idle_count += 1;
// Put the thread to sleep until another task is scheduled.
let timeout = Duration::from_millis(500);
let (s, res) = self.cvar.wait_timeout(state, timeout).unwrap();
state = s;
// If there are no tasks after a while, stop this thread.
if res.timed_out() && state.queue.is_empty() {
state.idle_count -= 1;
state.thread_count -= 1;
break;
}
}
}
/// Schedules a runnable task for execution.
fn schedule(&'static self, runnable: Runnable) {
let mut state = self.state.lock().unwrap();
state.queue.push_back(runnable);
// Notify a sleeping thread and spawn more threads if needed.
self.cvar.notify_one();
self.grow_pool(state);
}
/// Spawns more blocking threads if the pool is overloaded with work.
fn grow_pool(&'static self, mut state: MutexGuard<'static, State>) {
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while state.queue.len() > state.idle_count * 5 && state.thread_count < 500 {
// The new thread starts in idle state.
state.idle_count += 1;
state.thread_count += 1;
// Notify all existing idle threads because we need to hurry up.
self.cvar.notify_all();
// Spawn the new thread.
thread::spawn(move || {
// If enabled, set up tokio before the main loop begins.
context::enter(|| self.main_loop())
});
}
}
}
/// Spawns blocking code onto a thread.
///
/// Note that `blocking!(expr)` is just syntax sugar for
/// `Task::blocking(async move { expr }).await`.
///
/// # Examples
///
/// Read a file into a string:
///
/// ```no_run
/// use smol::blocking;
/// use std::fs;
///
/// # smol::run(async {
/// let contents = blocking!(fs::read_to_string("file.txt"))?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Spawn a process:
///
/// ```no_run
/// use smol::blocking;
/// use std::process::Command;
///
/// # smol::run(async {
/// let out = blocking!(Command::new("dir").output())?;
/// # std::io::Result::Ok(()) });
/// ```
#[macro_export]
macro_rules! blocking {
($($expr:tt)*) => {
$crate::Task::blocking(async move { $($expr)* }).await
};
}
/// Creates a stream that iterates on a thread.
///
/// This adapter converts any kind of synchronous iterator into an asynchronous stream by running
/// it on the blocking executor and sending items back over a channel.
///
/// # Examples
///
/// List files in the current directory:
///
/// ```no_run
/// use futures::stream::StreamExt;
/// use smol::{blocking, iter};
/// use std::fs;
///
/// # smol::run(async {
/// // Load a directory.
/// let mut dir = blocking!(fs::read_dir("."))?;
/// let mut dir = iter(dir);
///
/// // Iterate over the contents of the directory.
/// while let Some(res) = dir.next().await {
/// println!("{}", res?.file_name().to_string_lossy());
/// }
/// # std::io::Result::Ok(()) });
/// ```
pub fn iter<T: Send + 'static>(
iter: impl Iterator<Item = T> + Send + 'static,
) -> impl Stream<Item = T> + Send + Unpin + 'static {
/// Current state of the iterator.
enum State<T, I> {
/// The iterator is idle.
Idle(Option<I>),
/// The iterator is running in a blocking task and sending items into a channel.
Busy(piper::Receiver<T>, Task<I>),
}
impl<T, I> Unpin for State<T, I> {}
impl<T: Send + 'static, I: Iterator<Item = T> + Send + 'static> Stream for State<T, I> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
// Throttle if the current task has done too many I/O operations without yielding.
futures_util::ready!(throttle::poll(cx));
match &mut *self {
State::Idle(iter) => {
// If idle, take the iterator out to run it on a blocking task.
let mut iter = iter.take().unwrap();
// This channel capacity seems to work well in practice. If it's too low, there
// will be too much synchronization between tasks. If too high, memory
// consumption increases.
let (sender, receiver) = piper::chan(8 * 1024); // 8192 items
// Spawn a blocking task that runs the iterator and returns it when done.
let task = Task::blocking(async move {
for item in &mut iter {
sender.send(item).await;
}
iter
});
// Move into the busy state and poll again.
*self = State::Busy(receiver, task);
self.poll_next(cx)
}
State::Busy(receiver, task) => {
// Poll the channel.
let opt = futures_util::ready!(Pin::new(receiver).poll_next(cx));
// If the channel is closed, retrieve the iterator back from the blocking task.
// This is not really a required step, but it's cleaner to drop the iterator on
// the same thread that created it.
if opt.is_none() {
// Poll the task to retrieve the iterator.
let iter = futures_util::ready!(Pin::new(task).poll(cx));
*self = State::Idle(Some(iter));
}
Poll::Ready(opt)
}
}
}
}
State::Idle(Some(iter))
}
/// Creates an async reader that runs on a thread.
///
/// This adapter converts any kind of synchronous reader into an asynchronous reader by running it
/// on the blocking executor and sending bytes back over a pipe.
///
/// # Examples
///
/// Read from a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, reader};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for reading.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = reader(file);
///
/// // Read the whole file.
/// let mut contents = Vec::new();
/// file.read_to_end(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Read output from a process:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::reader;
/// use std::process::{Command, Stdio};
///
/// # smol::run(async {
/// // Spawn a child process and make an async reader for its stdout.
/// let child = Command::new("dir").stdout(Stdio::piped()).spawn()?;
/// let mut child_stdout = reader(child.stdout.unwrap());
///
/// // Read the entire output.
/// let mut output = String::new();
/// child_stdout.read_to_string(&mut output).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unpin + 'static {
/// Current state of the reader.
enum State<T> {
/// The reader is idle.
Idle(Option<T>),
/// The reader is running in a blocking task and sending bytes into a pipe.
Busy(piper::Reader, Task<(io::Result<()>, T)>),
}
impl<T: AsyncRead + Send + Unpin + 'static> AsyncRead for State<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// Throttle if the current task has done too many I/O operations without yielding.
futures_util::ready!(throttle::poll(cx));
match &mut *self {
State::Idle(io) => {
// If idle, take the I/O handle out to read it on a blocking task.
let mut io = io.take().unwrap();
// This pipe capacity seems to work well in practice. If it's too low, there
// will be too much synchronization between tasks. If too high, memory
// consumption increases.
let (reader, mut writer) = piper::pipe(8 * 1024 * 1024); // 8 MB
// Spawn a blocking task that reads and returns the I/O handle when done.
let task = Task::blocking(async move {
// Copy bytes from the I/O handle into the pipe until the pipe is closed or
// an error occurs.
let res = futures_util::io::copy(&mut io, &mut writer).await;
(res.map(drop), io)
});
// Move into the busy state and poll again.
*self = State::Busy(reader, task);
self.poll_read(cx, buf)
}
State::Busy(reader, task) => {
// Poll the pipe.
let n = futures_util::ready!(Pin::new(reader).poll_read(cx, buf))?;
// If the pipe is closed, retrieve the I/O handle back from the blocking task.
// This is not really a required step, but it's cleaner to drop the handle on
// the same thread that created it.
if n == 0 {
// Poll the task to retrieve the I/O handle.
let (res, io) = futures_util::ready!(Pin::new(task).poll(cx));
// Make sure to move into the idle state before reporting errors.
*self = State::Idle(Some(io));
res?;
}
Poll::Ready(Ok(n))
}
}
}
}
// It's okay to treat the `Read` type as `AsyncRead` because it's only read from inside a
// blocking task.
let io = Box::pin(AllowStdIo::new(reader));
State::Idle(Some(io))
}
/// Creates an async writer that runs on a thread.
///
/// This adapter converts any kind of synchronous writer into an asynchronous writer by running it
/// on the blocking executor and receiving bytes over a pipe.
///
/// **Note:** Don't forget to flush the writer at the end, or some written bytes might get lost!
///
/// # Examples
///
/// Write into a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, writer};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for writing.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = writer(file);
///
/// // Write some bytes into the file and flush.
/// file.write_all(b"hello").await?;
/// file.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Write into standard output:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::writer;
///
/// # smol::run(async {
/// // Create an async writer to stdout.
/// let mut stdout = writer(std::io::stdout());
///
/// // Write a message and flush.
/// stdout.write_all(b"hello").await?;
/// stdout.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
/// Current state of the writer.
enum State<T> {
/// The writer is idle.
Idle(Option<T>),
/// The writer is running in a blocking task and receiving bytes from a pipe.
Busy(Option<piper::Writer>, Task<(io::Result<()>, T)>),
}
impl<T: AsyncWrite + Send + Unpin + 'static> State<T> {
/// Starts a blocking task.
fn start(&mut self) {
if let State::Idle(io) = self {
// If idle, take the I/O handle out to write on a blocking task.
let mut io = io.take().unwrap();
// This pipe capacity seems to work well in practice. If it's too low, there will
// be too much synchronization between tasks. If too high, memory consumption
// increases.
let (reader, writer) = piper::pipe(8 * 1024 * 1024); // 8 MB
// Spawn a blocking task that writes and returns the I/O handle when done.
let task = Task::blocking(async move {
// Copy bytes from the pipe into the I/O handle until the pipe is closed or an
// error occurs. Flush the I/O handle at the end.
match futures_util::io::copy(reader, &mut io).await {
Ok(_) => (io.flush().await, io),
Err(err) => (Err(err), io),
}
});
// Move into the busy state.
*self = State::Busy(Some(writer), task);
}
}
}
impl<T: AsyncWrite + Send + Unpin + 'static> AsyncWrite for State<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
// Throttle if the current task has done too many I/O operations without yielding.
futures_util::ready!(throttle::poll(cx));
loop {
match &mut *self {
// The writer is idle and closed.
State::Idle(None) => return Poll::Ready(Ok(0)),
// The writer is idle and open - start a blocking task.
State::Idle(Some(_)) => self.start(),
// The task is flushing and in process of stopping.
State::Busy(None, task) => {
// Poll the task to retrieve the I/O handle.
let (res, io) = futures_util::ready!(Pin::new(task).poll(cx));
// Make sure to move into the idle state before reporting errors.
*self = State::Idle(Some(io));
res?;
}
// The writer is busy - write more bytes into the pipe.
State::Busy(Some(writer), _) => return Pin::new(writer).poll_write(cx, buf),
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// Throttle if the current task has done too many I/O operations without yielding.
futures_util::ready!(throttle::poll(cx));
loop {
match &mut *self {
// The writer is idle and closed.
State::Idle(None) => return Poll::Ready(Ok(())),
// The writer is idle and open - start a blocking task.
State::Idle(Some(_)) => self.start(),
// The task is busy.
State::Busy(writer, task) => {
// Drop the writer to close the pipe. This stops the `futures_util::io::copy`
// operation in the task, after which the task flushes the I/O handle and
// returns it back.
writer.take();
// Poll the task to retrieve the I/O handle.
let (res, io) = futures_util::ready!(Pin::new(task).poll(cx));
// Make sure to move into the idle state before reporting errors.
*self = State::Idle(Some(io));
return Poll::Ready(res);
}
}
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// First, make sure the I/O handle is flushed.
futures_util::ready!(Pin::new(&mut *self).poll_flush(cx))?;
// Then move into the idle state with no I/O handle, thus dropping it.
*self = State::Idle(None);
Poll::Ready(Ok(()))
}
}
// It's okay to treat the `Write` type as `AsyncWrite` because it's only written to inside a
// blocking task.
let io = AllowStdIo::new(writer);
State::Idle(Some(io))
}