//! Implementation of [`run()`].
//!
//! This function is the entry point to the smol executor.

use std::future::Future;
use std::task::{Context, Poll};
use std::thread;

use futures_util::future::{self, Either};

use crate::block_on;
use crate::context;
use crate::io_event::IoEvent;
use crate::reactor::{Reactor, ReactorLock};
use crate::thread_local::ThreadLocalExecutor;
use crate::throttle;
use crate::work_stealing::WorkStealingExecutor;

/// Runs executors and polls the reactor.
///
/// This function simultaneously runs the thread-local executor, runs the work-stealing
/// executor, and polls the reactor for I/O events and timers. At least one thread has to be
/// calling [`run()`] in order for futures waiting on I/O and timers to get notified.
///
/// # Examples
///
/// Single-threaded executor:
///
/// ```
/// // Run the thread-local and work-stealing executor on the current thread.
/// smol::run(async {
///     println!("Hello from the smol executor!");
/// });
/// ```
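///
/// Waiting on a timer (a sketch, assuming the `Timer` type exported by this crate):
///
/// ```
/// use std::time::Duration;
///
/// smol::run(async {
///     // The timer completes only because `run()` keeps polling the reactor.
///     smol::Timer::after(Duration::from_millis(10)).await;
///     println!("Hello after a short delay!");
/// });
/// ```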
///
/// Multi-threaded executor:
///
/// ```no_run
/// use futures::future;
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // Run the thread-local and work-stealing executor on a thread pool.
/// for _ in 0..num_threads {
///     // A pending future is one that simply yields forever.
///     thread::spawn(|| smol::run(future::pending::<()>()));
/// }
///
/// // There's no need to call `run()` here; we can just block on the main future.
/// smol::block_on(async {
///     Task::spawn(async {
///         println!("Hello from an executor thread!");
///     })
///     .await;
/// });
/// ```
///
/// Stoppable multi-threaded executor:
///
/// ```
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // A channel that sends the shutdown signal.
/// let (s, r) = piper::chan::<()>(0);
/// let mut threads = Vec::new();
///
/// // Create an executor thread pool.
/// for _ in 0..num_threads {
///     // Spawn an executor thread that waits for the shutdown signal.
///     let r = r.clone();
///     threads.push(thread::spawn(move || smol::run(r.recv())));
/// }
///
/// // There's no need to call `run()` here; we can just block on the main future.
/// smol::block_on(async {
///     Task::spawn(async {
///         println!("Hello from an executor thread!");
///     })
///     .await;
/// });
///
/// // Send a shutdown signal.
/// drop(s);
///
/// // Wait for threads to finish.
/// for t in threads {
///     t.join().unwrap();
/// }
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
    // Create a thread-local executor and a worker in the work-stealing executor.
    let local = ThreadLocalExecutor::new();
    let ws_executor = WorkStealingExecutor::get();
    let worker = ws_executor.worker();
    let reactor = Reactor::get();

    // Create a waker that triggers an I/O event in the thread-local scheduler.
    let ev = local.event().clone();
    let waker = async_task::waker_fn(move || ev.notify());
    let cx = &mut Context::from_waker(&waker);
    futures_util::pin_mut!(future);

    // Set up tokio (if enabled) and the thread-locals before execution begins.
    let enter = context::enter;
    let enter = |f| local.enter(|| enter(f));
    let enter = |f| worker.enter(|| enter(f));

    enter(|| {
        // A list of I/O events that indicate there is work to do.
        let io_events = [local.event(), ws_executor.event()];

        // Number of times this thread has yielded because it didn't find any work.
        let mut yields = 0;

        // We run four components at the same time, treating them all fairly and making sure
        // none of them get starved:
        //
        // 1. `future` - the main future.
        // 2. `local` - the thread-local executor.
        // 3. `worker` - the work-stealing executor.
        // 4. `reactor` - the reactor.
        //
        // When all four components are out of work, we block the current thread on
        // epoll/kevent/wepoll. If new work comes in that isn't naturally triggered by an I/O
        // event registered with `Async` handles, we use `IoEvent`s to simulate an I/O event
        // that will unblock the thread:
        //
        // - When the main future is woken, `local.event()` is triggered.
        // - When the thread-local executor gets new work, `local.event()` is triggered.
        // - When the work-stealing executor gets new work, `ws_executor.event()` is triggered.
        // - When a new earliest timer is registered, `reactor.event()` is triggered.
        //
        // This way we make sure that any change that might give us new work unblocks
        // epoll/kevent/wepoll and lets us continue the loop.
        loop {
            // 1. Poll the main future.
            if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) {
                return val;
            }

            // 2. Run a batch of tasks in the thread-local executor.
            let more_local = local.execute();
            // 3. Run a batch of tasks in the work-stealing executor.
            let more_worker = worker.execute();

            // 4. Poll the reactor.
            if let Some(reactor_lock) = reactor.try_lock() {
                yields = 0;
                react(reactor_lock, &io_events, more_local || more_worker);
                continue;
            }

            // If there is more work in the thread-local or the work-stealing executor, continue.
            if more_local || more_worker {
                yields = 0;
                continue;
            }

            // Yield a few times if no work is found.
            yields += 1;
            if yields <= 2 {
                thread::yield_now();
                continue;
            }

            // If still no work is found, stop yielding and block the thread.
            yields = 0;

            // Prepare for blocking until the reactor is locked or `local.event()` is triggered.
            //
            // Note that there is no need to wait for `ws_executor.event()`. If we lock the
            // reactor immediately, we'll check for the I/O event right after that anyway.
            //
            // If some other worker is holding the reactor locked, it will unlock it as soon as
            // the I/O event is triggered. Then, another worker will be allowed to lock the
            // reactor, and will unlock it if there is more work to do because every worker
            // triggers the I/O event whenever it finds a runnable task.
            let lock = reactor.lock();
            let notified = local.event().notified();
            futures_util::pin_mut!(lock);
            futures_util::pin_mut!(notified);

            // Block until either the reactor is locked or `local.event()` is triggered.
            if let Either::Left((reactor_lock, _)) = block_on(future::select(lock, notified)) {
                react(reactor_lock, &io_events, false);
            } else {
                // Clear `local.event()` because it was triggered.
                local.event().clear();
            }
        }
    })
}

/// Polls or waits on the locked reactor.
///
/// If any of the I/O events are ready or there are more tasks to run, the reactor is polled.
/// Otherwise, the current thread waits on it until a timer fires or an I/O event occurs.
///
/// I/O events are cleared at the end of this function.
fn react(mut reactor_lock: ReactorLock<'_>, io_events: &[&IoEvent], mut more_tasks: bool) {
    // Clear all I/O events and check if any of them were triggered.
    for ev in io_events {
        if ev.clear() {
            more_tasks = true;
        }
    }

    if more_tasks {
        // If there might be more tasks to run, just poll without blocking.
        reactor_lock.poll().expect("failure while polling I/O");
    } else {
        // Otherwise, block until the first I/O event or a timer.
        reactor_lock.wait().expect("failure while waiting on I/O");

        // Clear all I/O events before dropping the lock. This is not really necessary, but
        // clearing flags here might prevent a redundant wakeup in the future.
        for ev in io_events {
            ev.clear();
        }
    }
}