blob: 118d537d2f196c63a8577ba89c4959bf3812699c [file] [log] [blame]
use crate::runtime::blocking::{BlockingTask, NoopSchedule};
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, context, driver, Spawner};
use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
use std::future::Future;
use std::marker::PhantomData;
use std::{error, fmt};
/// Handle to the runtime.
///
/// The handle is internally reference-counted and can be freely cloned. A handle can be
/// obtained using the [`Runtime::handle`] method.
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
pub struct Handle {
pub(super) spawner: Spawner,
}
/// All internal handles that are *not* the scheduler's spawner.
#[derive(Debug)]
pub(crate) struct HandleInner {
/// Handles to the I/O drivers
#[cfg_attr(
not(any(feature = "net", feature = "process", all(unix, feature = "signal"))),
allow(dead_code)
)]
pub(super) io_handle: driver::IoHandle,
/// Handles to the signal drivers
#[cfg_attr(
any(
loom,
not(all(unix, feature = "signal")),
not(all(unix, feature = "process")),
),
allow(dead_code)
)]
pub(super) signal_handle: driver::SignalHandle,
/// Handles to the time drivers
#[cfg_attr(not(feature = "time"), allow(dead_code))]
pub(super) time_handle: driver::TimeHandle,
/// Source of `Instant::now()`
#[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
pub(super) clock: driver::Clock,
/// Blocking pool spawner
pub(super) blocking_spawner: blocking::Spawner,
}
/// Create a new runtime handle.
pub(crate) trait ToHandle {
fn to_handle(&self) -> Handle;
}
/// Runtime context guard.
///
/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
/// the runtime context on drop.
///
/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
_guard: context::EnterGuard,
_handle_lifetime: PhantomData<&'a Handle>,
}
impl Handle {
/// Enters the runtime context. This allows you to construct types that must
/// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
/// It will also allow you to call methods such as [`tokio::spawn`] and [`Handle::current`]
/// without panicking.
///
/// [`Sleep`]: struct@crate::time::Sleep
/// [`TcpStream`]: struct@crate::net::TcpStream
/// [`tokio::spawn`]: fn@crate::spawn
pub fn enter(&self) -> EnterGuard<'_> {
EnterGuard {
_guard: context::enter(self.clone()),
_handle_lifetime: PhantomData,
}
}
/// Returns a `Handle` view over the currently running `Runtime`.
///
/// # Panic
///
/// This will panic if called outside the context of a Tokio runtime. That means that you must
/// call this on one of the threads **being run by the runtime**, or from a thread with an active
/// `EnterGuard`. Calling this from within a thread created by `std::thread::spawn` (for example)
/// will cause a panic unless that thread has an active `EnterGuard`.
///
/// # Examples
///
/// This can be used to obtain the handle of the surrounding runtime from an async
/// block or function running on that runtime.
///
/// ```
/// # use std::thread;
/// # use tokio::runtime::Runtime;
/// # fn dox() {
/// # let rt = Runtime::new().unwrap();
/// # rt.spawn(async {
/// use tokio::runtime::Handle;
///
/// // Inside an async block or function.
/// let handle = Handle::current();
/// handle.spawn(async {
/// println!("now running in the existing Runtime");
/// });
///
/// # let handle =
/// thread::spawn(move || {
/// // Notice that the handle is created outside of this thread and then moved in
/// handle.spawn(async { /* ... */ });
/// // This next line would cause a panic because we haven't entered the runtime
/// // and created an EnterGuard
/// // let handle2 = Handle::current(); // panic
/// // So we create a guard here with Handle::enter();
/// let _guard = handle.enter();
/// // Now we can call Handle::current();
/// let handle2 = Handle::current();
/// });
/// # handle.join().unwrap();
/// # });
/// # }
/// ```
pub fn current() -> Self {
context::current()
}
/// Returns a Handle view over the currently running Runtime
///
/// Returns an error if no Runtime has been started
///
/// Contrary to `current`, this never panics
pub fn try_current() -> Result<Self, TryCurrentError> {
context::try_current()
}
/// Spawns a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// // Get a handle from this runtime
/// let handle = rt.handle();
///
/// // Spawn a future onto the runtime using the handle
/// handle.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// ```
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.spawn_named(future, None)
}
/// Runs the provided function on an executor dedicated to blocking.
/// operations.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
/// // Get a handle from this runtime
/// let handle = rt.handle();
///
/// // Spawn a blocking function onto the runtime using the handle
/// handle.spawn_blocking(|| {
/// println!("now running on a worker thread");
/// });
/// # }
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.as_inner().spawn_blocking(self, func)
}
pub(crate) fn as_inner(&self) -> &HandleInner {
self.spawner.as_handle_inner()
}
/// Runs a future to completion on this `Handle`'s associated `Runtime`.
///
/// This runs the given future on the current thread, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// When this is used on a `current_thread` runtime, only the
/// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
/// `Handle::block_on` method cannot drive them. This means that, when using
/// this method on a current_thread runtime, anything that relies on IO or
/// timers will not work unless there is another thread currently calling
/// [`Runtime::block_on`] on the same runtime.
///
/// # If the runtime has been shut down
///
/// If the `Handle`'s associated `Runtime` has been shut down (through
/// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
/// dropping it) and `Handle::block_on` is used it might return an error or
/// panic. Specifically IO resources will return an error and timers will
/// panic. Runtime independent futures will run as normal.
///
/// # Panics
///
/// This function panics if the provided future panics, if called within an
/// asynchronous execution context, or if a timer future is executed on a
/// runtime that has been shut down.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Get a handle from this runtime
/// let handle = rt.handle();
///
/// // Execute the future, blocking the current thread until completion
/// handle.block_on(async {
/// println!("hello");
/// });
/// ```
///
/// Or using `Handle::current`:
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main () {
/// let handle = Handle::current();
/// std::thread::spawn(move || {
/// // Using Handle::block_on to run async code in the new thread.
/// handle.block_on(async {
/// println!("hello");
/// });
/// });
/// }
/// ```
///
/// [`JoinError`]: struct@crate::task::JoinError
/// [`JoinHandle`]: struct@crate::task::JoinHandle
/// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
/// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
/// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
/// [`spawn_blocking`]: crate::task::spawn_blocking
/// [`tokio::fs`]: crate::fs
/// [`tokio::net`]: crate::net
/// [`tokio::time`]: crate::time
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future =
crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
// Enter the **runtime** context. This configures spawning, the current I/O driver, ...
let _rt_enter = self.enter();
// Enter a **blocking** context. This prevents blocking from a runtime.
let mut blocking_enter = crate::runtime::enter(true);
// Block on the future
blocking_enter
.block_on(future)
.expect("failed to park thread")
}
#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.spawner.spawn(future, id)
}
pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
}
}
impl ToHandle for Handle {
fn to_handle(&self) -> Handle {
self.clone()
}
}
cfg_metrics! {
use crate::runtime::RuntimeMetrics;
impl Handle {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn metrics(&self) -> RuntimeMetrics {
RuntimeMetrics::new(self.clone())
}
}
}
impl HandleInner {
#[track_caller]
pub(crate) fn spawn_blocking<F, R>(&self, rt: &dyn ToHandle, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, _was_spawned) = if cfg!(debug_assertions)
&& std::mem::size_of::<F>() > 2048
{
self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt)
} else {
self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt)
};
join_handle
}
cfg_fs! {
#[track_caller]
#[cfg_attr(any(
all(loom, not(test)), // the function is covered by loom tests
test
), allow(dead_code))]
pub(crate) fn spawn_mandatory_blocking<F, R>(&self, rt: &dyn ToHandle, func: F) -> Option<JoinHandle<R>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
self.spawn_blocking_inner(
Box::new(func),
blocking::Mandatory::Mandatory,
None,
rt,
)
} else {
self.spawn_blocking_inner(
func,
blocking::Mandatory::Mandatory,
None,
rt,
)
};
if was_spawned {
Some(join_handle)
} else {
None
}
}
}
#[track_caller]
pub(crate) fn spawn_blocking_inner<F, R>(
&self,
func: F,
is_mandatory: blocking::Mandatory,
name: Option<&str>,
rt: &dyn ToHandle,
) -> (JoinHandle<R>, bool)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);
let id = super::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
use tracing::Instrument;
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "tokio::task::blocking",
"runtime.spawn",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
task.id = id.as_u64(),
"fn" = %std::any::type_name::<F>(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
);
fut.instrument(span)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
let (task, handle) = task::unowned(fut, NoopSchedule, id);
let spawned = self
.blocking_spawner
.spawn(blocking::Task::new(task, is_mandatory), rt);
(handle, spawned.is_ok())
}
}
/// Error returned by `try_current` when no Runtime has been started
#[derive(Debug)]
pub struct TryCurrentError {
kind: TryCurrentErrorKind,
}
impl TryCurrentError {
pub(crate) fn new_no_context() -> Self {
Self {
kind: TryCurrentErrorKind::NoContext,
}
}
pub(crate) fn new_thread_local_destroyed() -> Self {
Self {
kind: TryCurrentErrorKind::ThreadLocalDestroyed,
}
}
/// Returns true if the call failed because there is currently no runtime in
/// the Tokio context.
pub fn is_missing_context(&self) -> bool {
matches!(self.kind, TryCurrentErrorKind::NoContext)
}
/// Returns true if the call failed because the Tokio context thread-local
/// had been destroyed. This can usually only happen if in the destructor of
/// other thread-locals.
pub fn is_thread_local_destroyed(&self) -> bool {
matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
}
}
enum TryCurrentErrorKind {
NoContext,
ThreadLocalDestroyed,
}
impl fmt::Debug for TryCurrentErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TryCurrentErrorKind::*;
match self {
NoContext => f.write_str("NoContext"),
ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
}
}
}
impl fmt::Display for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TryCurrentErrorKind::*;
match self.kind {
NoContext => f.write_str(CONTEXT_MISSING_ERROR),
ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
}
}
}
impl error::Error for TryCurrentError {}