// blob: 35523b472a5cbb0e5d18308c4e9c061ed8a4af45 [file] [log] [blame]
//! The task system.
//!
//! A [`Task`] handle represents a spawned future that is run by the executor.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::blocking::BlockingExecutor;
use crate::thread_local::ThreadLocalExecutor;
use crate::work_stealing::WorkStealingExecutor;
/// A runnable future, ready for execution.
///
/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`,
/// we get back two values:
///
/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable`
/// 2. an `async_task::JoinHandle<T, ()>`, which is wrapped inside a [`Task<T>`][Task]
///
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it
/// is woken up, its schedule function is called, which means the `Runnable` gets pushed into a
/// task queue in an executor.
pub(crate) type Runnable = async_task::Task<()>;
/// A spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
///
/// If the future panics, the panic will be unwound into the [`run()`] invocation that polled it.
/// However, this does not apply to the blocking executor - it will simply ignore panics and
/// continue running.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ```
///
/// [`run()`]: crate::run()
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
// The handle is optional so that methods consuming the task can `take()` it out, leaving `None`
// behind for the `Drop` impl to skip.
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, ()>>);
impl<T: 'static> Task<T> {
    /// Spawns a future onto the thread-local executor.
    ///
    /// Neither the future nor its output needs to be `Send`.
    ///
    /// # Panics
    ///
    /// Panics if the current thread is not inside an invocation of [`run()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::Task;
    ///
    /// # smol::run(async {
    /// let task = Task::local(async { 1 + 2 });
    /// assert_eq!(task.await, 3);
    /// # })
    /// ```
    ///
    /// [`run()`]: crate::run()
    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
        ThreadLocalExecutor::spawn(future)
    }
}
impl<T: Send + 'static> Task<T> {
    /// Spawns a future onto the work-stealing executor.
    ///
    /// This future may be stolen and polled by any thread calling [`run()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::Task;
    ///
    /// # smol::run(async {
    /// let task = Task::spawn(async { 1 + 2 });
    /// assert_eq!(task.await, 3);
    /// # });
    /// ```
    ///
    /// [`run()`]: crate::run()
    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
        WorkStealingExecutor::get().spawn(future)
    }

    /// Spawns a future onto the blocking executor.
    ///
    /// This future is allowed to block for an indefinite length of time.
    ///
    /// For convenience, there is also the [`blocking!`] macro that spawns a blocking task and
    /// immediately awaits it.
    ///
    /// # Examples
    ///
    /// Read a line from the standard input:
    ///
    /// ```no_run
    /// use smol::Task;
    /// use std::io::stdin;
    ///
    /// # smol::block_on(async {
    /// let line = Task::blocking(async {
    ///     let mut line = String::new();
    ///     stdin().read_line(&mut line).unwrap();
    ///     line
    /// })
    /// .await;
    /// # });
    /// ```
    ///
    /// See also examples for [`blocking!`], [`iter()`], [`reader()`], and [`writer()`].
    ///
    /// [`iter()`]: crate::iter()
    /// [`reader()`]: crate::reader()
    /// [`writer()`]: crate::writer()
    pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
        BlockingExecutor::get().spawn(future)
    }
}
impl<T, E> Task<Result<T, E>>
where
    T: Send + 'static,
    E: Debug + Send + 'static,
{
    /// Wraps this fallible task in a new task that yields the `Ok` value directly.
    ///
    /// The wrapper is spawned with [`Task::spawn()`]; it awaits the original task and unwraps
    /// its result.
    ///
    /// # Panics
    ///
    /// The new task will panic if the original task results in an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Async, Task};
    /// use std::net::TcpStream;
    ///
    /// # smol::run(async {
    /// let stream = Task::spawn(async {
    ///     Async::<TcpStream>::connect("example.com:80").await
    /// })
    /// .unwrap()
    /// .await;
    /// # })
    /// ```
    pub fn unwrap(self) -> Task<T> {
        let unwrapped = async move {
            let result = self.await;
            result.unwrap()
        };
        Task::spawn(unwrapped)
    }

    /// Wraps this fallible task in a new task that yields the `Ok` value directly, using a
    /// custom panic message.
    ///
    /// The wrapper is spawned with [`Task::spawn()`]; it awaits the original task and unwraps
    /// its result.
    ///
    /// # Panics
    ///
    /// The new task will panic with the provided message if the original task results in an
    /// error.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Async, Task};
    /// use std::net::TcpStream;
    ///
    /// # smol::run(async {
    /// let stream = Task::spawn(async {
    ///     Async::<TcpStream>::connect("example.com:80").await
    /// })
    /// .expect("cannot connect")
    /// .await;
    /// # })
    /// ```
    pub fn expect(self, msg: &str) -> Task<T> {
        // The spawned future must be `'static`, so it needs its own copy of the message.
        let message = String::from(msg);
        let expected = async move {
            let result = self.await;
            result.expect(&message)
        };
        Task::spawn(expected)
    }
}
impl Task<()> {
    /// Lets the task keep running in the background by giving up this handle to it.
    ///
    /// The inner join handle is taken out of the task before the task is dropped, so the
    /// `Drop` impl finds nothing to cancel.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use smol::{Task, Timer};
    /// use std::time::Duration;
    ///
    /// # smol::run(async {
    /// Task::spawn(async {
    ///     loop {
    ///         println!("I'm a daemon task looping forever.");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// })
    /// .detach();
    /// # })
    /// ```
    pub fn detach(mut self) {
        let handle = self.0.take().unwrap();
        // Discard the join handle on its own, without going through `Task`'s cancel-on-drop.
        drop(handle);
    }
}
impl<T> Task<T> {
    /// Requests cancellation of the task, then waits until it has stopped running.
    ///
    /// Returns the task's output if it was completed just before it got canceled, or `None` if
    /// it didn't complete.
    ///
    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
    /// canceling because it also waits for the task to stop running.
    ///
    /// # Examples
    ///
    /// ```
    /// use smol::{Task, Timer};
    /// use std::time::Duration;
    ///
    /// # smol::run(async {
    /// let task = Task::spawn(async {
    ///     loop {
    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
    ///         Timer::after(Duration::from_secs(1)).await;
    ///     }
    /// });
    ///
    /// Timer::after(Duration::from_secs(3)).await;
    /// task.cancel().await;
    /// # })
    /// ```
    pub async fn cancel(self) -> Option<T> {
        // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so `self`
        // is moved into a mutable local here instead of being marked as mutable itself.
        let mut task = self;
        let join_handle = task.0.take().unwrap();
        // The emptied task is released right away; its `Drop` impl has nothing left to cancel.
        drop(task);

        join_handle.cancel();
        join_handle.await
    }
}
/// Dropping a task cancels it, unless its join handle has already been taken out.
impl<T> Drop for Task<T> {
    fn drop(&mut self) {
        // `None` means the handle was moved out earlier, so there is nothing to cancel.
        match self.0.as_ref() {
            Some(join_handle) => join_handle.cancel(),
            None => {}
        }
    }
}
/// Awaiting a task yields the output of the spawned future.
///
/// # Panics
///
/// Polling panics with "task has failed" if the inner join handle resolves without an output.
impl<T> Future for Task<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let join_handle = self.0.as_mut().unwrap();
        Pin::new(join_handle)
            .poll(cx)
            .map(|output| output.expect("task has failed"))
    }
}
/// Extracts the underlying join handle from a task.
///
/// This is implemented as `From` rather than a hand-written `Into`: the standard library's
/// blanket impl still provides `task.into()`, and `JoinHandle::from(task)` becomes available
/// as well.
///
/// # Panics
///
/// Panics if the task no longer holds its join handle.
impl<T> From<Task<T>> for async_task::JoinHandle<T, ()> {
    fn from(mut task: Task<T>) -> Self {
        // Taking the handle leaves `None` behind, so dropping `task` at the end of this
        // function does not cancel the handle being returned.
        task.0
            .take()
            .expect("task was already canceled or has failed")
    }
}