//! The work-stealing executor.
//!
//! Tasks created by [`Task::spawn()`] go into this executor. Every thread calling [`run()`]
//! initializes a [`Worker`] that participates in work stealing and is allowed to run any task in
//! this executor, including tasks queued by other workers. Since tasks can be stolen by any worker
//! and thus move from thread to thread, their futures must implement [`Send`].
//!
//! There is only one global instance of this type, accessible by [`WorkStealingExecutor::get()`].
//!
//! [Work stealing] is a strategy that reduces contention in multi-threaded environments. If every
//! invocation of [`run()`] pulled from a single global task queue, all workers would constantly
//! contend on that queue, slowing the executor down.
//!
//! The solution is to give each invocation of [`run()`] a separate queue, called a "worker". Each
//! thread primarily uses its own worker. Only once the worker's queue is exhausted does the thread
//! look for tasks in the global queue, called the "injector", or steal tasks from other workers.
//!
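//! A minimal sketch of how tasks reach this executor, assuming the crate's public
//! [`Task::spawn()`] and [`run()`] referenced above:
//!
//! ```ignore
//! // `Task::spawn()` routes the future into the work-stealing executor.
//! let task = Task::spawn(async { 1 + 2 });
//! // `run()` registers a `Worker` on the current thread; the worker runs tasks
//! // from its own queue and the injector, or steals them from other workers.
//! assert_eq!(run(task), 3);
//! ```
//!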
//! [`run()`]: crate::run()
//! [Work stealing]: https://en.wikipedia.org/wiki/Work_stealing
use std::cell::Cell;
use std::future::Future;
use std::num::Wrapping;
use std::panic;
use crossbeam_deque as deque;
use crossbeam_utils::sync::ShardedLock;
use once_cell::sync::Lazy;
use scoped_tls_hkt::scoped_thread_local;
use slab::Slab;
use crate::io_event::IoEvent;
use crate::task::{Runnable, Task};
use crate::throttle;
scoped_thread_local! {
/// The current thread's worker.
///
/// Other threads may steal tasks from this worker through its associated stealer that was
/// registered in the work-stealing executor.
///
/// This thread-local is only set while inside [`Worker::enter()`].
static WORKER: for<'a> &'a Worker<'a>
}
/// The global work-stealing executor.
pub(crate) struct WorkStealingExecutor {
/// When a thread that is not inside [`run()`][`crate::run()`] spawns or wakes a task, the task
/// goes into this queue.
injector: deque::Injector<Runnable>,
/// Registered handles for stealing tasks from workers.
stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,
/// An I/O event that is triggered whenever there might be available tasks to run.
event: IoEvent,
}
impl WorkStealingExecutor {
/// Returns a reference to the global work-stealing executor.
pub fn get() -> &'static WorkStealingExecutor {
static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| WorkStealingExecutor {
injector: deque::Injector::new(),
stealers: ShardedLock::new(Slab::new()),
event: IoEvent::new().expect("cannot create an `IoEvent`"),
});
&EXECUTOR
}
/// Returns the event indicating there is a scheduled task.
pub fn event(&self) -> &IoEvent {
&self.event
}
/// Spawns a future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
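///
/// A minimal sketch of direct use (normally this is reached through [`Task::spawn()`]):
///
/// ```ignore
/// let task = WorkStealingExecutor::get().spawn(async { 42 });
/// ```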
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
// The function that schedules a runnable task when it gets woken up.
let schedule = move |runnable| {
if WORKER.is_set() {
// If scheduling from a worker thread, push into the worker's queue.
WORKER.with(|w| w.push(runnable));
} else {
// If scheduling from a non-worker thread, push into the injector queue.
self.injector.push(runnable);
// Notify workers that there is a task in the injector queue.
self.event.notify();
}
};
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Registers a new worker.
///
/// The worker will automatically deregister itself when dropped.
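///
/// A sketch of the expected call pattern, as assumed from [`run()`][`crate::run()`]:
///
/// ```ignore
/// let worker = WorkStealingExecutor::get().worker();
/// worker.enter(|| {
///     // Run batches of tasks until none appear to be left.
///     while worker.execute() {}
/// });
/// // Dropping the worker deregisters it and reschedules its remaining tasks.
/// ```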
pub fn worker(&self) -> Worker<'_> {
let mut stealers = self.stealers.write().unwrap();
let vacant = stealers.vacant_entry();
// Create a worker and put its stealer handle into the executor.
let worker = Worker {
key: vacant.key(),
slot: Cell::new(None),
queue: deque::Worker::new_fifo(),
executor: self,
};
vacant.insert(worker.queue.stealer());
worker
}
}
/// A worker that participates in the work-stealing executor.
///
/// Each invocation of `run()` creates its own worker.
pub(crate) struct Worker<'a> {
/// The ID of this worker obtained during registration.
key: usize,
/// A slot into which tasks go before entering the actual queue.
///
/// Note that other workers cannot steal the task in this slot; it is private to this worker.
slot: Cell<Option<Runnable>>,
/// A queue of tasks.
///
/// Other workers are able to steal tasks from this queue.
queue: deque::Worker<Runnable>,
/// The parent work-stealing executor.
executor: &'a WorkStealingExecutor,
}
impl Worker<'_> {
/// Enters the context of this executor.
pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
if WORKER.is_set() {
panic!("cannot run an executor inside another executor");
}
WORKER.set(self, f)
}
/// Executes a batch of tasks and returns `true` if there may be more tasks to run.
pub fn execute(&self) -> bool {
// Execute up to 4 batches of 50 tasks each.
for _ in 0..4 {
for _ in 0..50 {
// Find the next task to run.
match self.search() {
None => {
// There are no more tasks to run.
return false;
}
Some(r) => {
// Notify other workers that there may be stealable tasks.
//
// Instead of notifying when we find a task, we could notify when we push a
// task into the local queue - either strategy works.
//
// Notifying when we find a task is somewhat simpler because then we don't
// need to worry about `search()` re-shuffling tasks between queues, which
// races with other workers searching for tasks. Other workers might not
// find a task while there is one! Notifying here avoids this problem.
self.executor.event.notify();
// Run the task.
if throttle::setup(|| r.run()) {
// The task was woken while it was running, which means it got
// scheduled the moment running completed. Therefore, it is now inside
// the slot and would be the next task to run.
//
// Instead of re-running the task in the next iteration, let's flush
// the slot in order to give other tasks a chance to run.
//
// This is a necessary step to ensure task yielding works as expected.
// If a task wakes itself and returns `Poll::Pending`, we don't want it
// to run immediately after that because that'd defeat the whole
// purpose of yielding.
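// (For example, a hypothetical `yield_now()`-style future wakes itself and
// returns `Poll::Pending`; without flushing the slot here, it would run
// again immediately and monopolize this worker.)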
self.flush_slot();
}
}
}
}
// Flush the slot occasionally for fair scheduling.
//
// It is possible for two tasks to exchange messages between each other forever, so that
// every time one of them runs, it wakes the other one and puts it into the slot.
// Flushing the slot prevents them from hogging the executor.
self.flush_slot();
// Steal some tasks from the injector queue.
//
// If the local queue always has tasks, this worker might never get around to running
// tasks from the injector queue. To prevent their starvation, we must move them into
// the local queue every now and then.
if let Some(r) = self.steal_global() {
self.push(r);
}
}
// There are likely more tasks to run.
true
}
/// Pushes a task into this worker.
fn push(&self, runnable: Runnable) {
// Put the task into the slot.
if let Some(r) = self.slot.replace(Some(runnable)) {
// If the slot had a task, push it into the queue.
self.queue.push(r);
}
}
/// Moves a task from the slot into the local queue.
fn flush_slot(&self) {
if let Some(r) = self.slot.take() {
self.queue.push(r);
}
}
/// Finds the next task to run, checking the slot, the local queue, the injector, and other
/// workers' queues, in that order.
fn search(&self) -> Option<Runnable> {
// Check if there is a task in the slot or in the queue.
if let Some(r) = self.slot.take().or_else(|| self.queue.pop()) {
return Some(r);
}
// Try stealing from the injector queue.
if let Some(r) = self.steal_global() {
return Some(r);
}
// Try stealing from other workers.
let stealers = self.executor.stealers.read().unwrap();
retry_steal(|| {
// Pick a random starting point in the iterator list and rotate the list.
let n = stealers.len();
let start = fast_random(n);
let iter = stealers.iter().chain(stealers.iter()).skip(start).take(n);
// Remove this worker's stealer handle.
let iter = iter.filter(|(k, _)| *k != self.key);
// Try stealing from each worker in the list. Collecting stops as soon as we get a
// `Steal::Success`. Otherwise, if any steal attempt resulted in a `Steal::Retry`,
// that's the collected result and we'll retry from the beginning.
iter.map(|(_, s)| s.steal_batch_and_pop(&self.queue))
.collect()
})
}
/// Steals tasks from the injector queue.
fn steal_global(&self) -> Option<Runnable> {
retry_steal(|| self.executor.injector.steal_batch_and_pop(&self.queue))
}
}
impl Drop for Worker<'_> {
fn drop(&mut self) {
// Unregister the worker.
self.executor.stealers.write().unwrap().remove(self.key);
// Move the task in the slot into the injector queue.
if let Some(r) = self.slot.take() {
r.schedule();
}
// Move all tasks in this worker's queue into the injector queue.
while let Some(r) = self.queue.pop() {
r.schedule();
}
// This worker will not search for tasks anymore and therefore won't notify other
// workers if new tasks are found. Notify another worker to start searching right away.
self.executor.event.notify();
}
}
/// Returns a random number in the interval `0..n`.
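///
/// An illustration of the multiply-shift reduction used below, with hypothetical values:
///
/// ```ignore
/// // For any 32-bit x, ((x as u64) * (n as u64)) >> 32 lies in 0..n, because x / 2^32 < 1.
/// let (x, n): (u32, usize) = (0x9E3779B9, 10);
/// let r = ((x as u64).wrapping_mul(n as u64) >> 32) as usize;
/// assert!(r < n); // here r == 6
/// ```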
fn fast_random(n: usize) -> usize {
thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1));
}
RNG.with(|rng| {
// This is the 32-bit variant of Xorshift: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
// This is a fast alternative to `x % n`:
// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((x.0 as u64).wrapping_mul(n as u64) >> 32) as usize
})
}
/// Retries a steal operation for as long as it returns `Steal::Retry`.
fn retry_steal<T>(mut steal_op: impl FnMut() -> deque::Steal<T>) -> Option<T> {
loop {
match steal_op() {
deque::Steal::Success(t) => return Some(t),
deque::Steal::Empty => return None,
deque::Steal::Retry => {}
}
}
}