blob: b01c5bc47493610f014fca97e4487021bfa64d0a [file] [log] [blame]
//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//! 1. The Shared::close method is called. This closes the inject queue and
//! OwnedTasks instance and wakes up all worker threads.
//!
//! 2. Each worker thread observes the close signal next time it runs
//! Core::maintenance by checking whether the inject queue is closed.
//! The Core::is_shutdown flag is set to true.
//!
//! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
//! will keep removing tasks from OwnedTasks until it is empty. No new
//! tasks can be pushed to the OwnedTasks during or after this step as it
//! was closed in step 1.
//!
//! 4. The workers call Shared::shutdown to enter the single-threaded phase of
//! shutdown. These calls will push their core to Shared::shutdown_cores,
//! and the last thread to push its core will finish the shutdown procedure.
//!
//! 5. The local run queue of each core is emptied, then the inject queue is
//! emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//! * The spawner observes the OwnedTasks being open, and the inject queue is
//! closed.
//! * The spawner observes the OwnedTasks being closed and doesn't check the
//! inject queue.
//!
//! The first case can only happen if the OwnedTasks::bind call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the latter case, the task was not spawned and the task is immediately
//! cancelled by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and OwnedTasks
//! collection to have a closed bit. With a closed bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a closed bit on only the OwnedTasks,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
use crate::coop;
use crate::future::Future;
use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker};
use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;
use std::cell::RefCell;
use std::time::Duration;
/// A scheduler worker
///
/// `create` builds one `Worker` per core. It is shared through an `Arc`: the
/// thread currently driving it holds one reference, and `block_in_place`
/// clones it to hand it to a freshly spawned blocking-pool thread.
pub(super) struct Worker {
/// Reference to shared state
shared: Arc<Shared>,
/// Index holding this worker's remote state
///
/// Also used to index `Shared::worker_metrics` and to identify this worker
/// to `Idle`.
index: usize,
/// Used to hand-off a worker's core to another thread.
///
/// `run` takes the core out of this cell. `block_in_place` puts it back here
/// before spawning a replacement thread.
core: AtomicCell<Core>,
}
/// Core data
///
/// Per-worker scheduling state. It is moved between the thread-local
/// `Context::core` slot (while a thread drives it) and `Worker::core` (when it
/// is handed off by `block_in_place`). Once shutdown is observed it ends up in
/// `Shared::shutdown_cores`.
struct Core {
/// Used to schedule bookkeeping tasks every so often.
///
/// Incremented (wrapping) once per iteration of `Context::run`.
tick: u32,
/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task to be run
/// next (LIFO). This is an optimization for message passing patterns and
/// helps to reduce latency.
lifo_slot: Option<Notified>,
/// The worker-local run queue.
run_queue: queue::Local<Arc<Shared>>,
/// True if the worker is currently searching for more work. Searching
/// involves attempting to steal from other workers.
is_searching: bool,
/// True if the scheduler is being shutdown
///
/// Set by `Core::maintenance` once the inject queue reports it is closed.
is_shutdown: bool,
/// Parker
///
/// Stored in an `Option` as the parker is added / removed to make the
/// borrow checker happy. It is `None` while the thread is parked in
/// `Context::park_timeout` and after `Core::shutdown`.
park: Option<Parker>,
/// Batching metrics so they can be submitted to RuntimeMetrics.
metrics: MetricsBatch,
/// Fast random number generator.
///
/// Used to pick the first steal target in `steal_work`.
rand: FastRand,
/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u32,
/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u32,
}
/// State shared across all workers
///
/// Also acts as the task scheduler: `task::Schedule` is implemented for
/// `Arc<Shared>`.
pub(super) struct Shared {
/// Handle to the I/O driver, timer, blocking spawner, ...
handle_inner: HandleInner,
/// Per-worker remote state. All other workers have access to this and is
/// how they communicate between each other.
///
/// Indexed by `Worker::index`.
remotes: Box<[Remote]>,
/// Global task queue used for:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker run queue is saturated
///
/// Closing it is the shutdown signal that workers observe in maintenance.
inject: Inject<Arc<Shared>>,
/// Coordinates idle workers
idle: Idle,
/// Collection of all active tasks spawned onto this executor.
owned: OwnedTasks<Arc<Shared>>,
/// Cores that have observed the shutdown signal
///
/// The core is **not** placed back in the worker to avoid it from being
/// stolen by a thread that was spawned as part of `block_in_place`.
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,
/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Collects metrics from the runtime.
pub(super) scheduler_metrics: SchedulerMetrics,
/// Per-worker metrics, indexed by `Worker::index`.
pub(super) worker_metrics: Box<[WorkerMetrics]>,
}
/// Used to communicate with a worker from other threads.
///
/// One entry per worker, stored in `Shared::remotes`.
struct Remote {
/// Steals tasks from this worker.
steal: queue::Steal<Arc<Shared>>,
/// Unparks the associated worker thread
unpark: Unparker,
}
/// Thread-local context
///
/// Installed in `CURRENT` for the lifetime of a worker thread's `run`.
struct Context {
/// Worker
worker: Arc<Worker>,
/// Core data
///
/// Holds the core while a task runs or while the thread is parked, so that
/// `Shared::schedule` can push work locally and `block_in_place` can take the
/// core away. `None` when the core has been handed off.
core: RefCell<Option<Box<Core>>>,
}
/// Starts the workers
///
/// Holds the workers built by `create` until `Launch::launch` hands each of
/// them to the blocking pool.
pub(crate) struct Launch(Vec<Arc<Worker>>);
/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
///
/// `Context::run` also returns `Err(())` after it has handed its core to
/// `Shared::shutdown`.
type RunResult = Result<Box<Core>, ()>;
/// A task handle
type Task = task::Task<Arc<Shared>>;
/// A notified task handle
///
/// This is what run queues, the LIFO slot and the inject queue hold, and what
/// `Context::run_task` consumes.
type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
//
// Set to the running worker's `Context` for the duration of `run` (via
// `CURRENT.set`). Read by `block_in_place` and `Shared::schedule` to detect
// whether they are executing on a worker thread.
scoped_thread_local!(static CURRENT: Context);
pub(super) fn create(
size: usize,
park: Parker,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
// Create the local queues
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(seed()),
global_queue_interval,
event_interval,
}));
remotes.push(Remote { steal, unpark });
worker_metrics.push(WorkerMetrics::new());
}
let shared = Arc::new(Shared {
handle_inner,
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
before_park,
after_unpark,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
shared: shared.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(shared, launch)
}
/// Runs the blocking closure `f` on the current thread without stalling the
/// work owned by this worker.
///
/// When called on a thread-pool worker thread that still holds its core, the
/// core is moved into `Worker::core` and `run` is spawned on the blocking pool
/// to keep driving it while `f` executes here. After `f` returns (or unwinds),
/// `Reset` takes whatever is in `Worker::core` back into the thread-local
/// context. If the spawned thread already claimed the core, the current thread
/// is left without one.
///
/// # Panics
///
/// Panics when called from inside a runtime context that is not a thread-pool
/// worker thread and does not allow blocking. Per the comment below, this
/// includes the basic scheduler and a `LocalSet`.
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
// Try to steal the worker core back
//
// Drop guard: moves the core (if any) from `Worker::core` back into the
// thread-local `Context` and restores the coop budget saved before `f` ran.
struct Reset(coop::Budget);
impl Drop for Reset {
fn drop(&mut self) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
let core = cx.worker.core.take();
let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
// Reset the task budget as we are re-entering the
// runtime.
coop::set(self.0);
}
});
}
}
// Whether `f` must run through `enter::exit` with the coop budget disabled.
let mut had_entered = false;
CURRENT.with(|maybe_cx| {
match (crate::runtime::enter::context(), maybe_cx.is_some()) {
(EnterContext::Entered { .. }, true) => {
// We are on a thread pool runtime thread, so we just need to
// set up blocking.
had_entered = true;
}
(EnterContext::Entered { allow_blocking }, false) => {
// We are on an executor, but _not_ on the thread pool. That is
// _only_ okay if we are in a thread pool runtime's block_on
// method:
if allow_blocking {
had_entered = true;
return;
} else {
// This probably means we are on the basic_scheduler or in a
// LocalSet, where it is _not_ okay to block.
panic!("can call blocking only when running on the multi-threaded runtime");
}
}
(EnterContext::NotEntered, true) => {
// This is a nested call to block_in_place (we already exited).
// All the necessary setup has already been done.
return;
}
(EnterContext::NotEntered, false) => {
// We are outside of the tokio runtime, so blocking is fine.
// We can also skip all of the thread pool blocking setup steps.
return;
}
}
let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
// Get the worker core. If none is set, then blocking is fine!
// (`had_entered` is already true, so `f` still runs via `enter::exit`.)
let core = match cx.core.borrow_mut().take() {
Some(core) => core,
None => return,
};
// The parker should be set here
assert!(core.park.is_some());
// In order to block, the core must be sent to another thread for
// execution.
//
// First, move the core back into the worker's shared core slot.
cx.worker.core.set(core);
// Next, clone the worker handle and send it to a new thread for
// processing.
//
// Once the blocking task is done executing, we will attempt to
// steal the core back.
let worker = cx.worker.clone();
runtime::spawn_blocking(move || run(worker));
});
if had_entered {
// Unset the current task's budget. Blocking sections are not
// constrained by task budgets.
// `_reset` lives until the end of this branch, so its `Drop` runs after
// `f` completes or while unwinding from it.
let _reset = Reset(coop::stop());
crate::runtime::enter::exit(f)
} else {
f()
}
}
impl Launch {
    /// Spawns every worker onto the blocking pool, where each one runs its
    /// scheduler loop (`run`).
    pub(crate) fn launch(mut self) {
        let workers = std::mem::take(&mut self.0);
        for worker in workers {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}
/// Entry point of a worker thread.
///
/// Claims the worker's core, installs the thread-local `Context`, enters the
/// runtime, and drives the scheduler loop until it stops.
///
/// If the core is no longer in `Worker::core` (another thread took it, e.g.
/// the thread returning from `block_in_place`), this returns immediately.
fn run(worker: Arc<Worker>) {
    let core = if let Some(core) = worker.core.take() {
        core
    } else {
        // Some other thread owns this worker's core; nothing to do here.
        return;
    };

    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    let _enter = crate::runtime::enter(true);
    CURRENT.set(&cx, || {
        // `Context::run` only ever finishes with `Err(())`; the `Result` exists
        // so `?` can short-circuit inside it.
        assert!(cx.run(core).is_err());
    });
}
impl Context {
/// Drives this worker until it observes shutdown or loses its core.
///
/// Each iteration bumps the tick, runs periodic maintenance, then runs local
/// or injected work, else stolen work, else parks. If the core is taken while
/// a task runs, `run_task` returns `Err(())` and `?` propagates it. After
/// `is_shutdown` is set, the core goes through `pre_shutdown` and is handed to
/// `Shared::shutdown`. This function never returns `Ok`.
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown {
// Increment the tick
core.tick();
// Run maintenance, if needed
core = self.maintenance(core);
// First, check work available to the current worker.
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
// Wait for work
core = self.park(core);
}
}
core.pre_shutdown(&self.worker);
// Signal shutdown
self.worker.shared.shutdown(core);
Err(())
}
/// Runs `task`, then keeps running tasks found in the LIFO slot while coop
/// budget remains.
///
/// The core lives in `self.core` while tasks run, so tasks can schedule
/// locally and `block_in_place` can take it. Returns `Ok(core)` if the core is
/// still present afterwards, or `Err(())` if it was taken.
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
let task = self.worker.shared.owned.assert_owner(task);
// Make sure the worker is not in the **searching** state. This enables
// another idle worker to try to steal work.
core.transition_from_searching(&self.worker);
// Make the core available to the runtime context
core.metrics.incr_poll_count();
*self.core.borrow_mut() = Some(core);
// Run the task
coop::budget(|| {
task.run();
// As long as there is budget remaining and a task exists in the
// `lifo_slot`, then keep running.
loop {
// Check if we still have the core. If not, the core was stolen
// by another worker.
let mut core = match self.core.borrow_mut().take() {
Some(core) => core,
None => return Err(()),
};
// Check for a task in the LIFO slot
let task = match core.lifo_slot.take() {
Some(task) => task,
None => return Ok(core),
};
if coop::has_budget_remaining() {
// Run the LIFO task, then loop
core.metrics.incr_poll_count();
*self.core.borrow_mut() = Some(core);
let task = self.worker.shared.owned.assert_owner(task);
task.run();
} else {
// Not enough budget left to run the LIFO task, push it to
// the back of the queue and return.
core.run_queue
.push_back(task, self.worker.inject(), &mut core.metrics);
return Ok(core);
}
}
})
}
/// Every `event_interval` ticks, polls the driver with a zero timeout and
/// then runs `Core::maintenance` (metrics submission and shutdown check).
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % core.event_interval == 0 {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
// Run regularly scheduled maintenance
core.maintenance(&self.worker);
}
core
}
/// Parks the worker thread while waiting for tasks to execute.
///
/// This function checks if indeed there's no more work left to be done before parking.
/// Also important to notice that, before parking, the worker thread will try to take
/// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
/// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
/// in its own local queue until the queue saturates (ntasks > LOCAL_QUEUE_CAPACITY).
/// When the local queue is saturated, the overflow tasks are added to the injection queue
/// from where other workers can pick them up.
/// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
/// after all the IOs get dispatched
///
/// The `before_park` / `after_unpark` callbacks run even when the worker
/// ends up not parking because it still has local work.
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.shared.before_park {
f();
}
if core.transition_to_parked(&self.worker) {
// Keep parking until either shutdown is observed or the worker is
// allowed to leave the parked state.
while !core.is_shutdown {
core.metrics.about_to_park();
core = self.park_timeout(core, None);
core.metrics.returned_from_park();
// Run regularly scheduled maintenance
core.maintenance(&self.worker);
if core.transition_from_parked(&self.worker) {
break;
}
}
}
if let Some(f) = &self.worker.shared.after_unpark {
f();
}
core
}
/// Parks the thread on the core's parker, either for `duration` or, when it
/// is `None`, until unparked.
///
/// While parked, the core sits in `self.core` with its `park` field set to
/// `None`. `Shared::schedule_local` relies on that to defer notifications
/// raised while the driver runs.
///
/// # Panics
///
/// Panics if the parker is missing, if parking fails, or if the core is no
/// longer in `self.core` when the park returns.
fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
// Take the parker out of core
let mut park = core.park.take().expect("park missing");
// Store `core` in context
*self.core.borrow_mut() = Some(core);
// Park thread
if let Some(timeout) = duration {
park.park_timeout(timeout).expect("park failed");
} else {
park.park().expect("park failed");
}
// Remove `core` from context
core = self.core.borrow_mut().take().expect("core missing");
// Place `park` back in `core`
core.park = Some(park);
// If there are tasks available to steal, but this worker is not
// looking for tasks to steal, notify another worker.
if !core.is_searching && core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
}
core
}
}
impl Core {
/// Increment the tick
///
/// Uses a wrapping add, so the counter cycles instead of overflowing.
fn tick(&mut self) {
self.tick = self.tick.wrapping_add(1);
}
/// Return the next notified task available to this worker.
///
/// On ticks that are a multiple of `global_queue_interval`, the inject queue
/// is tried before local work. On all other ticks, local work is tried first.
/// Either way, the other source is the fallback.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % self.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
}
}
/// Takes local work: the LIFO slot first, then the local run queue.
fn next_local_task(&mut self) -> Option<Notified> {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
}
/// Function responsible for stealing tasks from another worker
///
/// Note: Only if less than half the workers are searching for tasks to steal
/// a new worker will actually try to steal. The idea is to make sure not all
/// workers will be trying to steal at the same time.
///
/// Returns `None` immediately if this worker could not enter the searching
/// state. Otherwise it tries every other worker, starting at a random index,
/// and finally falls back to the inject queue.
fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
if !self.transition_to_searching(worker) {
return None;
}
let num = worker.shared.remotes.len();
// Start from a random worker
let start = self.rand.fastrand_n(num as u32) as usize;
for i in 0..num {
let i = (start + i) % num;
// Don't steal from ourself! We know we don't have work.
if i == worker.index {
continue;
}
let target = &worker.shared.remotes[i];
if let Some(task) = target
.steal
.steal_into(&mut self.run_queue, &mut self.metrics)
{
return Some(task);
}
}
// Fallback on checking the global queue
worker.shared.inject.pop()
}
/// Enters the searching state if not already in it, as permitted by `Idle`.
/// Returns whether this worker is now searching.
fn transition_to_searching(&mut self, worker: &Worker) -> bool {
if !self.is_searching {
self.is_searching = worker.shared.idle.transition_worker_to_searching();
}
self.is_searching
}
/// Leaves the searching state (no-op if not searching). `Shared` wakes
/// another worker if this was the last searcher.
fn transition_from_searching(&mut self, worker: &Worker) {
if !self.is_searching {
return;
}
self.is_searching = false;
worker.shared.transition_worker_from_searching();
}
/// Prepares the worker state for parking.
///
/// Returns true if the transition happened, false if there is work to do first.
fn transition_to_parked(&mut self, worker: &Worker) -> bool {
// Workers should not park if they have work to do
if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
return false;
}
// When the final worker transitions **out** of searching to parked, it
// must check all the queues one last time in case work materialized
// between the last work scan and transitioning out of searching.
let is_last_searcher = worker
.shared
.idle
.transition_worker_to_parked(worker.index, self.is_searching);
// The worker is no longer searching. Setting this in the local cache
// only.
self.is_searching = false;
if is_last_searcher {
worker.shared.notify_if_work_pending();
}
true
}
/// Returns `true` if the transition happened.
///
/// Returns `false` while `Idle` still reports this worker as parked and there
/// is nothing in the LIFO slot.
fn transition_from_parked(&mut self, worker: &Worker) -> bool {
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.lifo_slot.is_some() {
// When a worker wakes, it should only transition to the "searching"
// state when the wake originates from another worker *or* a new task
// is pushed. We do *not* want the worker to transition to "searching"
// when it wakes when the I/O driver receives new events.
self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
return true;
}
if worker.shared.idle.is_parked(worker.index) {
return false;
}
// When unparked, the worker is in the searching state.
self.is_searching = true;
true
}
/// Runs maintenance work such as checking the pool's state.
///
/// Submits batched metrics to this worker's `WorkerMetrics` slot. Until
/// shutdown has been observed, it also sets `is_shutdown` once the inject
/// queue is closed.
fn maintenance(&mut self, worker: &Worker) {
self.metrics
.submit(&worker.shared.worker_metrics[worker.index]);
if !self.is_shutdown {
// Check if the scheduler has been shutdown
self.is_shutdown = worker.inject().is_closed();
}
}
/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
worker.shared.owned.close_and_shutdown_all();
self.metrics
.submit(&worker.shared.worker_metrics[worker.index]);
}
/// Shuts down the core.
///
/// Drops every task left in the LIFO slot and local run queue, then shuts the
/// parker down.
///
/// # Panics
///
/// Panics if the parker has already been taken.
fn shutdown(&mut self) {
// Take the parker out of the core
let mut park = self.park.take().expect("park missing");
// Drain the queue
while self.next_local_task().is_some() {}
park.shutdown();
}
}
impl Worker {
    /// Borrows the scheduler-wide injection queue.
    ///
    /// Workers pop from it in `Core::next_task`. It is also passed to the
    /// local run queue's `push_back`, presumably as the overflow destination.
    fn inject(&self) -> &Inject<Arc<Shared>> {
        let shared: &Shared = &self.shared;
        &shared.inject
    }
}
impl task::Schedule for Arc<Shared> {
fn release(&self, task: &Task) -> Option<Task> {
self.owned.remove(task)
}
fn schedule(&self, task: Notified) {
(**self).schedule(task, false);
}
fn yield_now(&self, task: Notified) {
(**self).schedule(task, true);
}
}
impl Shared {
    /// Returns the handle to the I/O driver, timer, blocking spawner, etc.
    pub(crate) fn as_handle_inner(&self) -> &HandleInner {
        &self.handle_inner
    }

    /// Binds `future` to this scheduler's `OwnedTasks` and schedules the new
    /// task if binding produced a notification.
    ///
    /// The `JoinHandle` from `OwnedTasks::bind` is returned either way.
    pub(super) fn bind_new_task<T>(
        me: &Arc<Self>,
        future: T,
        id: crate::runtime::task::Id,
    ) -> JoinHandle<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let scheduler = me.clone();
        let (join_handle, maybe_notified) = me.owned.bind(future, scheduler, id);

        if let Some(notified) = maybe_notified {
            me.schedule(notified, false);
        }

        join_handle
    }

    /// Schedules `task` on this scheduler.
    ///
    /// If the calling thread is a worker of *this* scheduler and still holds
    /// its core, the task goes to that core's local queues. Otherwise it is
    /// pushed onto the injection queue and a parked worker is notified.
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|current| {
            let same_scheduler = current.filter(|cx| self.ptr_eq(&cx.worker.shared));

            if let Some(cx) = same_scheduler {
                if let Some(core) = cx.core.borrow_mut().as_mut() {
                    self.schedule_local(core, task, is_yield);
                    return;
                }
            }

            // Remote path: hand the task to the injection queue.
            self.inject.push(task);
            self.scheduler_metrics.inc_remote_schedule_count();
            self.notify_parked();
        })
    }

    /// Schedules `task` on the given (current thread's) core.
    ///
    /// A yielded task always goes to the back of the run queue. Any other task
    /// takes the LIFO slot, and a previous occupant is moved to the back of the
    /// run queue.
    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        core.metrics.inc_local_schedule_count();

        let should_notify = if is_yield {
            core.run_queue
                .push_back(task, &self.inject, &mut core.metrics);
            true
        } else {
            match core.lifo_slot.take() {
                Some(displaced) => {
                    core.run_queue
                        .push_back(displaced, &self.inject, &mut core.metrics);
                    core.lifo_slot = Some(task);
                    true
                }
                None => {
                    core.lifo_slot = Some(task);
                    false
                }
            }
        };

        // `park` is `None` while the worker is inside the driver. Notifications
        // tend to arrive in batches there, so waking another worker is deferred
        // until the park completes.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }

    /// Closes the injection queue and, if `close` reports success, wakes
    /// every worker so it can observe the shutdown signal.
    pub(super) fn close(&self) {
        if self.inject.close() {
            self.notify_all();
        }
    }

    /// Wakes the worker chosen by `Idle`, if there is one.
    fn notify_parked(&self) {
        if let Some(idle_worker) = self.idle.worker_to_notify() {
            self.remotes[idle_worker].unpark.unpark();
        }
    }

    /// Unparks every worker, whether or not it is parked.
    fn notify_all(&self) {
        self.remotes
            .iter()
            .for_each(|remote| remote.unpark.unpark());
    }

    /// Notifies a parked worker if any local queue or the injection queue
    /// still holds work.
    fn notify_if_work_pending(&self) {
        let local_work = self.remotes.iter().any(|remote| !remote.steal.is_empty());
        if local_work || !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    /// Records that a worker stopped searching. If it was the last searcher,
    /// another worker is woken, because this one found work.
    fn transition_worker_from_searching(&self) {
        let was_last_searcher = self.idle.transition_worker_from_searching();
        if was_last_searcher {
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// Cores are collected in `shutdown_cores`. The worker that contributes
    /// the last one performs the final cleanup: it shuts every core down and
    /// drains the injection queue while still holding the lock.
    fn shutdown(&self, core: Box<Core>) {
        let mut cores = self.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.remotes.len() {
            // Other workers have not arrived yet.
            return;
        }

        debug_assert!(self.owned.is_empty());
        cores.drain(..).for_each(|mut core| core.shutdown());

        // Every task was already shut down in `pre_shutdown`, so anything
        // still queued can simply be dropped.
        while self.inject.pop().is_some() {}
    }

    /// Returns `true` if `self` and `other` are the same `Shared` instance.
    fn ptr_eq(&self, other: &Shared) -> bool {
        std::ptr::eq(self, other)
    }
}
impl crate::runtime::ToHandle for Arc<Shared> {
    /// Wraps a clone of this scheduler in a runtime `Handle` that spawns onto
    /// the thread pool.
    fn to_handle(&self) -> crate::runtime::Handle {
        use crate::runtime::thread_pool::Spawner;
        use crate::runtime::{self, Handle};

        let pool_spawner = Spawner {
            shared: self.clone(),
        };

        Handle {
            spawner: runtime::Spawner::ThreadPool(pool_spawner),
        }
    }
}
cfg_metrics! {
    impl Shared {
        /// Number of tasks currently in the injection queue.
        pub(super) fn injection_queue_depth(&self) -> usize {
            let inject = &self.inject;
            inject.len()
        }

        /// Number of tasks in the local run queue of worker `worker`, as seen
        /// through its steal handle.
        ///
        /// Panics if `worker` is not a valid worker index.
        pub(super) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            let remote = &self.remotes[worker];
            remote.steal.len()
        }
    }
}