//! Runs `!Send` futures on the current thread.
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
use crate::sync::AtomicWaker;
use crate::util::VecDequeCell;
use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Poll;
use pin_project_lite::pin_project;
cfg_rt! {
/// A set of tasks which are executed on the same thread.
///
/// In some cases, it is necessary to run one or more futures that do not
/// implement [`Send`] and thus are unsafe to send between threads. In these
/// cases, a [local task set] may be used to schedule one or more `!Send`
/// futures to run together on the same thread.
///
/// For example, the following code will not compile:
///
/// ```rust,compile_fail
/// use std::rc::Rc;
///
/// #[tokio::main]
/// async fn main() {
/// // `Rc` does not implement `Send`, and thus may not be sent between
/// // threads safely.
/// let unsend_data = Rc::new("my unsend data...");
///
/// let unsend_data = unsend_data.clone();
/// // Because the `async` block here moves `unsend_data`, the future is `!Send`.
/// // Since `tokio::spawn` requires the spawned future to implement `Send`, this
/// // will not compile.
/// tokio::spawn(async move {
/// println!("{}", unsend_data);
/// // ...
/// }).await.unwrap();
/// }
/// ```
///
/// # Use with `run_until`
///
/// To spawn `!Send` futures, we can use a local task set to schedule them
/// on the thread calling [`Runtime::block_on`]. When running inside of the
/// local task set, we can use [`task::spawn_local`], which can spawn
/// `!Send` futures. For example:
///
/// ```rust
/// use std::rc::Rc;
/// use tokio::task;
///
/// #[tokio::main]
/// async fn main() {
/// let unsend_data = Rc::new("my unsend data...");
///
/// // Construct a local task set that can run `!Send` futures.
/// let local = task::LocalSet::new();
///
/// // Run the local task set.
/// local.run_until(async move {
/// let unsend_data = unsend_data.clone();
/// // `spawn_local` ensures that the future is spawned on the local
/// // task set.
/// task::spawn_local(async move {
/// println!("{}", unsend_data);
/// // ...
/// }).await.unwrap();
/// }).await;
/// }
/// ```
/// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
/// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
/// cannot be used inside a task spawned with `tokio::spawn`.
///
/// ## Awaiting a `LocalSet`
///
/// Additionally, a `LocalSet` itself implements `Future`, completing when
/// *all* tasks spawned on the `LocalSet` complete. This can be used to run
/// several futures on a `LocalSet` and drive the whole set until they
/// complete. For example,
///
/// ```rust
/// use tokio::{task, time};
/// use std::rc::Rc;
///
/// #[tokio::main]
/// async fn main() {
/// let unsend_data = Rc::new("world");
/// let local = task::LocalSet::new();
///
/// let unsend_data2 = unsend_data.clone();
/// local.spawn_local(async move {
/// // ...
/// println!("hello {}", unsend_data2)
/// });
///
/// local.spawn_local(async move {
/// time::sleep(time::Duration::from_millis(100)).await;
/// println!("goodbye {}", unsend_data)
/// });
///
/// // ...
///
/// local.await;
/// }
/// ```
/// **Note:** Awaiting a `LocalSet` can only be done inside
/// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
/// [`Runtime::block_on`]. It cannot be used inside a task spawned with
/// `tokio::spawn`.
///
/// ## Use inside `tokio::spawn`
///
/// The two methods mentioned above cannot be used inside `tokio::spawn`, so
/// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
/// something else. The solution is to create the `LocalSet` somewhere else,
/// and communicate with it using an [`mpsc`] channel.
///
/// The following example puts the `LocalSet` inside a new thread.
/// ```
/// use tokio::runtime::Builder;
/// use tokio::sync::{mpsc, oneshot};
/// use tokio::task::LocalSet;
///
/// // This struct describes the task you want to spawn. Here we include
/// // some simple examples. The oneshot channel allows sending a response
/// // to the spawner.
/// #[derive(Debug)]
/// enum Task {
/// PrintNumber(u32),
/// AddOne(u32, oneshot::Sender<u32>),
/// }
///
/// #[derive(Clone)]
/// struct LocalSpawner {
/// send: mpsc::UnboundedSender<Task>,
/// }
///
/// impl LocalSpawner {
/// pub fn new() -> Self {
/// let (send, mut recv) = mpsc::unbounded_channel();
///
/// let rt = Builder::new_current_thread()
/// .enable_all()
/// .build()
/// .unwrap();
///
/// std::thread::spawn(move || {
/// let local = LocalSet::new();
///
/// local.spawn_local(async move {
/// while let Some(new_task) = recv.recv().await {
/// tokio::task::spawn_local(run_task(new_task));
/// }
/// // If the while loop returns, then all the LocalSpawner
/// // objects have been dropped.
/// });
///
/// // This will return once all senders are dropped and all
/// // spawned tasks have returned.
/// rt.block_on(local);
/// });
///
/// Self {
/// send,
/// }
/// }
///
/// pub fn spawn(&self, task: Task) {
/// self.send.send(task).expect("Thread with LocalSet has shut down.");
/// }
/// }
///
/// // This task may do !Send stuff. We use printing a number as an example,
/// // but it could be anything.
/// //
/// // The Task struct is an enum to support spawning many different kinds
/// // of operations.
/// async fn run_task(task: Task) {
/// match task {
/// Task::PrintNumber(n) => {
/// println!("{}", n);
/// },
/// Task::AddOne(n, response) => {
/// // We ignore failures to send the response.
/// let _ = response.send(n + 1);
/// },
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let spawner = LocalSpawner::new();
///
/// let (send, response) = oneshot::channel();
/// spawner.spawn(Task::AddOne(10, send));
/// let eleven = response.await.unwrap();
/// assert_eq!(eleven, 11);
/// }
/// ```
///
/// [`Send`]: trait@std::marker::Send
/// [local task set]: struct@LocalSet
/// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
/// [`task::spawn_local`]: fn@spawn_local
/// [`mpsc`]: mod@crate::sync::mpsc
pub struct LocalSet {
/// Current scheduler tick.
tick: Cell<u8>,
/// State available from thread-local.
context: Context,
/// This type should not be Send.
_not_send: PhantomData<*const ()>,
}
}
/// State available from the thread-local.
struct Context {
/// Collection of all active tasks spawned onto this executor.
owned: LocalOwnedTasks<Arc<Shared>>,
/// Local run queue, used when scheduling from the thread driving the `LocalSet`.
queue: VecDequeCell<task::Notified<Arc<Shared>>>,
/// State shared between threads.
shared: Arc<Shared>,
}
/// LocalSet state shared between threads.
struct Shared {
/// Remote run queue; `None` once the `LocalSet` has been dropped.
queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
/// Wake the `LocalSet` task.
waker: AtomicWaker,
}
pin_project! {
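/// Future returned by `LocalSet::run_until`. Polls the wrapped `future`
/// while also driving tasks spawned on `local_set`.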
#[derive(Debug)]
struct RunUntil<'a, F> {
local_set: &'a LocalSet,
#[pin]
future: F,
}
}
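// Handle to the `Context` of the `LocalSet` currently being driven on this
// thread. It is set for the duration of `LocalSet::with` and is read by
// `spawn_local` and by `Shared::schedule` when scheduling from this thread.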
scoped_thread_local!(static CURRENT: Context);
cfg_rt! {
/// Spawns a `!Send` future on the local task set.
///
/// The spawned future will be run on the same thread that called `spawn_local`.
/// This may only be called from the context of a local task set.
///
/// # Panics
///
/// - This function panics if called outside of a local task set.
///
/// # Examples
///
/// ```rust
/// use std::rc::Rc;
/// use tokio::task;
///
/// #[tokio::main]
/// async fn main() {
/// let unsend_data = Rc::new("my unsend data...");
///
/// let local = task::LocalSet::new();
///
/// // Run the local task set.
/// local.run_until(async move {
/// let unsend_data = unsend_data.clone();
/// task::spawn_local(async move {
/// println!("{}", unsend_data);
/// // ...
/// }).await.unwrap();
/// }).await;
/// }
/// ```
#[track_caller]
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
spawn_local_inner(future, None)
}
#[track_caller]
pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
where F: Future + 'static,
F::Output: 'static
{
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
cx.spawn(future, name)
})
}
}
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
impl LocalSet {
/// Returns a new local task set.
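///
/// # Examples
///
/// A minimal sketch: construct an empty set, spawn a task onto it, and
/// drive everything to completion by awaiting the set.
///
/// ```rust
/// use tokio::task::LocalSet;
///
/// #[tokio::main]
/// async fn main() {
///     let local = LocalSet::new();
///
///     local.spawn_local(async {
///         // ...
///     });
///
///     // Drive all spawned tasks to completion.
///     local.await;
/// }
/// ```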
pub fn new() -> LocalSet {
LocalSet {
tick: Cell::new(0),
context: Context {
owned: LocalOwnedTasks::new(),
queue: VecDequeCell::with_capacity(INITIAL_CAPACITY),
shared: Arc::new(Shared {
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
waker: AtomicWaker::new(),
}),
},
_not_send: PhantomData,
}
}
/// Spawns a `!Send` task onto the local task set.
///
/// This task is guaranteed to be run on the current thread.
///
/// Unlike the free function [`spawn_local`], this method may be used to
/// spawn local tasks when the task set is _not_ running. For example:
/// ```rust
/// use tokio::task;
///
/// #[tokio::main]
/// async fn main() {
/// let local = task::LocalSet::new();
///
/// // Spawn a future on the local set. This future will be run when
/// // we call `run_until` to drive the task set.
/// local.spawn_local(async {
/// // ...
/// });
///
/// // Run the local task set.
/// local.run_until(async move {
/// // ...
/// }).await;
///
/// // When `run_until` finishes, we can spawn _more_ futures, which will
/// // run in subsequent calls to `run_until`.
/// local.spawn_local(async {
/// // ...
/// });
///
/// local.run_until(async move {
/// // ...
/// }).await;
/// }
/// ```
/// [`spawn_local`]: fn@spawn_local
#[track_caller]
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
self.spawn_named(future, None)
}
/// Runs a future to completion on the provided runtime, driving any local
/// futures spawned on this task set on the current thread.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime. The future
/// may also call [`spawn_local`] to spawn additional local futures on the
/// current thread.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
///
/// # Notes
///
/// Since this function internally calls [`Runtime::block_on`], and drives
/// futures in the local task set inside that call to `block_on`, the local
/// futures may not use [in-place blocking]. If a blocking call needs to be
/// issued from a local task, the [`spawn_blocking`] API may be used instead.
///
/// For example, this will panic:
/// ```should_panic
/// use tokio::runtime::Runtime;
/// use tokio::task;
///
/// let rt = Runtime::new().unwrap();
/// let local = task::LocalSet::new();
/// local.block_on(&rt, async {
/// let join = task::spawn_local(async {
/// let blocking_result = task::block_in_place(|| {
/// // ...
/// });
/// // ...
/// });
/// join.await.unwrap();
/// })
/// ```
/// This, however, will not panic:
/// ```
/// use tokio::runtime::Runtime;
/// use tokio::task;
///
/// let rt = Runtime::new().unwrap();
/// let local = task::LocalSet::new();
/// local.block_on(&rt, async {
/// let join = task::spawn_local(async {
/// let blocking_result = task::spawn_blocking(|| {
/// // ...
/// }).await;
/// // ...
/// });
/// join.await.unwrap();
/// })
/// ```
///
/// [`spawn_local`]: fn@spawn_local
/// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
/// [in-place blocking]: fn@crate::task::block_in_place
/// [`spawn_blocking`]: fn@crate::task::spawn_blocking
#[cfg(feature = "rt")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
where
F: Future,
{
rt.block_on(self.run_until(future))
}
/// Runs a future to completion on the local set, returning its output.
///
/// This returns a future that runs the given future with a local set,
/// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
/// Any local futures spawned on the local set will be driven in the
/// background until the future passed to `run_until` completes. When the future
/// passed to `run_until` finishes, any local futures which have not completed
/// will remain on the local set, and will be driven on subsequent calls to
/// `run_until` or when [awaiting the local set] itself.
///
/// # Examples
///
/// ```rust
/// use tokio::task;
///
/// #[tokio::main]
/// async fn main() {
/// task::LocalSet::new().run_until(async {
/// task::spawn_local(async move {
/// // ...
/// }).await.unwrap();
/// // ...
/// }).await;
/// }
/// ```
///
/// [`spawn_local`]: fn@spawn_local
/// [awaiting the local set]: #awaiting-a-localset
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
let run_until = RunUntil {
future,
local_set: self,
};
run_until.await
}
pub(in crate::task) fn spawn_named<F>(
&self,
future: F,
name: Option<&str>,
) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let handle = self.context.spawn(future, name);
// Because a task was spawned from *outside* the `LocalSet`, wake the
// `LocalSet` future to execute the new task, if it hasn't been woken.
//
// Spawning via the free fn `spawn` does not require this, as it can
// only be called from *within* a future executing on the `LocalSet` —
// in that case, the `LocalSet` must already be awake.
self.context.shared.waker.wake();
handle
}
/// Ticks the scheduler, returning whether the local future needs to be
/// notified again.
fn tick(&self) -> bool {
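// Poll at most `MAX_TASKS_PER_TICK` tasks per call so that a single tick
// cannot monopolize the thread; returning `true` asks the caller to
// schedule another tick.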
for _ in 0..MAX_TASKS_PER_TICK {
match self.next_task() {
// Run the task
//
// Safety: As spawned tasks are `!Send`, `run_unchecked` must be
// used. We are responsible for maintaining the invariant that
// `run_unchecked` is only called on threads that spawned the
// task initially. Because `LocalSet` itself is `!Send`, and
// `spawn_local` spawns into the `LocalSet` on the current
// thread, the invariant is maintained.
Some(task) => crate::coop::budget(|| task.run()),
// We have fully drained the queue of notified tasks, so the
// local future doesn't need to be notified again — it can wait
// until something else wakes a task in the local set.
None => return false,
}
}
true
}
fn next_task(&self) -> Option<task::LocalNotified<Arc<Shared>>> {
let tick = self.tick.get();
self.tick.set(tick.wrapping_add(1));
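// Every `REMOTE_FIRST_INTERVAL` ticks, check the remote (cross-thread)
// queue before the local one so that remotely scheduled tasks are not
// starved when the local queue never empties.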
let task = if tick % REMOTE_FIRST_INTERVAL == 0 {
self.context
.shared
.queue
.lock()
.as_mut()
.and_then(|queue| queue.pop_front())
.or_else(|| self.context.queue.pop_front())
} else {
self.context.queue.pop_front().or_else(|| {
self.context
.shared
.queue
.lock()
.as_mut()
.and_then(|queue| queue.pop_front())
})
};
task.map(|task| self.context.owned.assert_owner(task))
}
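/// Runs `f` with this `LocalSet`'s `Context` set as the `CURRENT` scoped
/// thread-local, so `spawn_local` and task scheduling can find it.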
fn with<T>(&self, f: impl FnOnce() -> T) -> T {
CURRENT.set(&self.context, f)
}
}
impl fmt::Debug for LocalSet {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("LocalSet").finish()
}
}
impl Future for LocalSet {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
// Register the waker before starting to work, so that wake-ups arriving
// from other threads while we tick are not missed.
self.context.shared.waker.register_by_ref(cx.waker());
if self.with(|| self.tick()) {
// If `tick` returns true, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
Poll::Pending
} else if self.context.owned.is_empty() {
// If the scheduler has no remaining futures, we're done!
Poll::Ready(())
} else {
// There are still futures in the local set, but we've polled all the
// futures in the run queue. Therefore, we can just return Pending
// since the remaining futures will be woken from somewhere else.
Poll::Pending
}
}
}
impl Default for LocalSet {
fn default() -> LocalSet {
LocalSet::new()
}
}
impl Drop for LocalSet {
fn drop(&mut self) {
self.with(|| {
// Shut down all tasks in the LocalOwnedTasks and close it to
// prevent new tasks from ever being added.
self.context.owned.close_and_shutdown_all();
// All tasks were already shut down above, so the notifications still in
// the queue can simply be dropped.
for task in self.context.queue.take() {
drop(task);
}
// Take the queue from the Shared object to prevent pushing
// notifications to it in the future.
let queue = self.context.shared.queue.lock().take().unwrap();
for task in queue {
drop(task);
}
assert!(self.context.owned.is_empty());
});
}
}
// === impl Context ===
impl Context {
#[track_caller]
fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", name, id.as_u64());
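// Register the task with this `LocalSet`'s owned-task list. `notified` is
// `None` if the list has already been closed (the set is shutting down),
// in which case the task is not scheduled.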
let (handle, notified) = self.owned.bind(future, self.shared.clone(), id);
if let Some(notified) = notified {
self.shared.schedule(notified);
}
handle
}
}
// === impl RunUntil ===
impl<T: Future> Future for RunUntil<'_, T> {
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();
me.local_set.with(|| {
me.local_set
.context
.shared
.waker
.register_by_ref(cx.waker());
let _no_blocking = crate::runtime::enter::disallow_blocking();
let f = me.future;
if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
return Poll::Ready(output);
}
if me.local_set.tick() {
// If `tick` returns `true`, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
}
Poll::Pending
})
}
}
impl Shared {
/// Schedule the provided task on the scheduler.
fn schedule(&self, task: task::Notified<Arc<Self>>) {
CURRENT.with(|maybe_cx| match maybe_cx {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
_ => {
// Check whether the queue is still there (if not, the `LocalSet` has
// been dropped). If so, push the task onto it and wake the `LocalSet`;
// otherwise, the task is simply dropped.
let mut lock = self.queue.lock();
if let Some(queue) = lock.as_mut() {
queue.push_back(task);
drop(lock);
self.waker.wake();
}
}
});
}
fn ptr_eq(&self, other: &Shared) -> bool {
std::ptr::eq(self, other)
}
}
impl task::Schedule for Arc<Shared> {
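// Called by a task when it completes so it can be removed from the
// `LocalSet`'s owned-task list. Tasks on a `LocalSet` only run on the
// thread that owns it, so the scheduler context must be present here.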
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
})
}
fn schedule(&self, task: task::Notified<Self>) {
Shared::schedule(self, task);
}
}