| use core::fmt; |
| use core::future::Future; |
| use core::marker::PhantomData; |
| use core::mem; |
| use core::ptr::NonNull; |
| use core::sync::atomic::Ordering; |
| use core::task::Waker; |
| |
| use crate::header::Header; |
| use crate::raw::RawTask; |
| use crate::state::*; |
| use crate::Task; |
| |
| /// Creates a new task. |
| /// |
| /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its |
| /// output. |
| /// |
| /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] |
| /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run |
| /// again. |
| /// |
| /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. |
| /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it |
| /// should push it into a task queue so that it can be processed later. |
| /// |
| /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider |
| /// using [`spawn_local()`] or [`spawn_unchecked()`] instead. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// // The future inside the task. |
| /// let future = async { |
| /// println!("Hello, world!"); |
| /// }; |
| /// |
| /// // A function that schedules the task when it gets woken up. |
| /// let (s, r) = flume::unbounded(); |
| /// let schedule = move |runnable| s.send(runnable).unwrap(); |
| /// |
| /// // Create a task with the future and the schedule function. |
| /// let (runnable, task) = async_task::spawn(future, schedule); |
| /// ``` |
| pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) |
| where |
| F: Future + Send + 'static, |
| F::Output: Send + 'static, |
| S: Fn(Runnable) + Send + Sync + 'static, |
| { |
| unsafe { spawn_unchecked(future, schedule) } |
| } |
| |
| /// Creates a new thread-local task. |
| /// |
| /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the |
| /// [`Runnable`] is used or dropped on another thread, a panic will occur. |
| /// |
| /// This function is only available when the `std` feature for this crate is enabled. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use async_task::Runnable; |
| /// use flume::{Receiver, Sender}; |
| /// use std::rc::Rc; |
| /// |
| /// thread_local! { |
| /// // A queue that holds scheduled tasks. |
| /// static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded(); |
| /// } |
| /// |
| /// // Make a non-Send future. |
| /// let msg: Rc<str> = "Hello, world!".into(); |
| /// let future = async move { |
| /// println!("{}", msg); |
| /// }; |
| /// |
| /// // A function that schedules the task when it gets woken up. |
| /// let s = QUEUE.with(|(s, _)| s.clone()); |
| /// let schedule = move |runnable| s.send(runnable).unwrap(); |
| /// |
| /// // Create a task with the future and the schedule function. |
| /// let (runnable, task) = async_task::spawn_local(future, schedule); |
| /// ``` |
| #[cfg(feature = "std")] |
| pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) |
| where |
| F: Future + 'static, |
| F::Output: 'static, |
| S: Fn(Runnable) + Send + Sync + 'static, |
| { |
| use std::mem::ManuallyDrop; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| use std::thread::{self, ThreadId}; |
| |
| #[inline] |
| fn thread_id() -> ThreadId { |
| thread_local! { |
| static ID: ThreadId = thread::current().id(); |
| } |
| ID.try_with(|id| *id) |
| .unwrap_or_else(|_| thread::current().id()) |
| } |
| |
| struct Checked<F> { |
| id: ThreadId, |
| inner: ManuallyDrop<F>, |
| } |
| |
| impl<F> Drop for Checked<F> { |
| fn drop(&mut self) { |
| assert!( |
| self.id == thread_id(), |
| "local task dropped by a thread that didn't spawn it" |
| ); |
| unsafe { |
| ManuallyDrop::drop(&mut self.inner); |
| } |
| } |
| } |
| |
| impl<F: Future> Future for Checked<F> { |
| type Output = F::Output; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| assert!( |
| self.id == thread_id(), |
| "local task polled by a thread that didn't spawn it" |
| ); |
| unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } |
| } |
| } |
| |
| // Wrap the future into one that checks which thread it's on. |
| let future = Checked { |
| id: thread_id(), |
| inner: ManuallyDrop::new(future), |
| }; |
| |
| unsafe { spawn_unchecked(future, schedule) } |
| } |
| |
| /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. |
| /// |
| /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and |
| /// `'static` on `future` and `schedule`. |
| /// |
| /// Safety requirements: |
| /// |
| /// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original |
| /// thread. |
| /// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`]. |
| /// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on |
| /// the original thread. |
| /// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// // The future inside the task. |
| /// let future = async { |
| /// println!("Hello, world!"); |
| /// }; |
| /// |
| /// // If the task gets woken up, it will be sent into this channel. |
| /// let (s, r) = flume::unbounded(); |
| /// let schedule = move |runnable| s.send(runnable).unwrap(); |
| /// |
| /// // Create a task with the future and the schedule function. |
| /// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; |
| /// ``` |
| pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) |
| where |
| F: Future, |
| S: Fn(Runnable), |
| { |
| // Allocate large futures on the heap. |
| let ptr = if mem::size_of::<F>() >= 2048 { |
| let future = alloc::boxed::Box::pin(future); |
| RawTask::<_, F::Output, S>::allocate(future, schedule) |
| } else { |
| RawTask::<F, F::Output, S>::allocate(future, schedule) |
| }; |
| |
| let runnable = Runnable { ptr }; |
| let task = Task { |
| ptr, |
| _marker: PhantomData, |
| }; |
| (runnable, task) |
| } |
| |
| /// A handle to a runnable task. |
| /// |
| /// Every spawned task has a single [`Runnable`] handle, which only exists when the task is |
| /// scheduled for running. |
| /// |
| /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] |
| /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run |
| /// again. |
| /// |
| /// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and |
| /// awaiting the [`Task`] after that will result in a panic. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use async_task::Runnable; |
| /// use once_cell::sync::Lazy; |
| /// use std::{panic, thread}; |
| /// |
| /// // A simple executor. |
| /// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| { |
| /// let (sender, receiver) = flume::unbounded::<Runnable>(); |
| /// thread::spawn(|| { |
| /// for runnable in receiver { |
| /// let _ignore_panic = panic::catch_unwind(|| runnable.run()); |
| /// } |
| /// }); |
| /// sender |
| /// }); |
| /// |
| /// // Create a task with a simple future. |
| /// let schedule = |runnable| QUEUE.send(runnable).unwrap(); |
| /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); |
| /// |
| /// // Schedule the task and await its output. |
| /// runnable.schedule(); |
| /// assert_eq!(smol::future::block_on(task), 3); |
| /// ``` |
| pub struct Runnable { |
| /// A pointer to the heap-allocated task. |
| pub(crate) ptr: NonNull<()>, |
| } |
| |
| unsafe impl Send for Runnable {} |
| unsafe impl Sync for Runnable {} |
| |
| #[cfg(feature = "std")] |
| impl std::panic::UnwindSafe for Runnable {} |
| #[cfg(feature = "std")] |
| impl std::panic::RefUnwindSafe for Runnable {} |
| |
| impl Runnable { |
| /// Schedules the task. |
| /// |
| /// This is a convenience method that passes the [`Runnable`] to the schedule function. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// // A function that schedules the task when it gets woken up. |
| /// let (s, r) = flume::unbounded(); |
| /// let schedule = move |runnable| s.send(runnable).unwrap(); |
| /// |
| /// // Create a task with a simple future and the schedule function. |
| /// let (runnable, task) = async_task::spawn(async {}, schedule); |
| /// |
| /// // Schedule the task. |
| /// assert_eq!(r.len(), 0); |
| /// runnable.schedule(); |
| /// assert_eq!(r.len(), 1); |
| /// ``` |
| pub fn schedule(self) { |
| let ptr = self.ptr.as_ptr(); |
| let header = ptr as *const Header; |
| mem::forget(self); |
| |
| unsafe { |
| ((*header).vtable.schedule)(ptr); |
| } |
| } |
| |
| /// Runs the task by polling its future. |
| /// |
| /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets |
| /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the |
| /// [`Runnable`] vanishes until the task is woken. |
| /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e. |
| /// it woke itself and then gave the control back to the executor. |
| /// |
| /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then |
| /// this method simply destroys the task. |
| /// |
| /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`] |
| /// after that will also result in a panic. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// // A function that schedules the task when it gets woken up. |
| /// let (s, r) = flume::unbounded(); |
| /// let schedule = move |runnable| s.send(runnable).unwrap(); |
| /// |
| /// // Create a task with a simple future and the schedule function. |
| /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); |
| /// |
| /// // Run the task and check its output. |
| /// runnable.run(); |
| /// assert_eq!(smol::future::block_on(task), 3); |
| /// ``` |
| pub fn run(self) -> bool { |
| let ptr = self.ptr.as_ptr(); |
| let header = ptr as *const Header; |
| mem::forget(self); |
| |
| unsafe { ((*header).vtable.run)(ptr) } |
| } |
| |
| /// Returns a waker associated with this task. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use smol::future; |
| /// |
| /// // A function that schedules the task when it gets woken up. |
| /// let (s, r) = flume::unbounded(); |
| /// let schedule = move |runnable| s.send(runnable).unwrap(); |
| /// |
| /// // Create a task with a simple future and the schedule function. |
| /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule); |
| /// |
| /// // Take a waker and run the task. |
| /// let waker = runnable.waker(); |
| /// runnable.run(); |
| /// |
| /// // Reschedule the task by waking it. |
| /// assert_eq!(r.len(), 0); |
| /// waker.wake(); |
| /// assert_eq!(r.len(), 1); |
| /// ``` |
| pub fn waker(&self) -> Waker { |
| let ptr = self.ptr.as_ptr(); |
| let header = ptr as *const Header; |
| |
| unsafe { |
| let raw_waker = ((*header).vtable.clone_waker)(ptr); |
| Waker::from_raw(raw_waker) |
| } |
| } |
| } |
| |
| impl Drop for Runnable { |
| fn drop(&mut self) { |
| let ptr = self.ptr.as_ptr(); |
| let header = ptr as *const Header; |
| |
| unsafe { |
| let mut state = (*header).state.load(Ordering::Acquire); |
| |
| loop { |
| // If the task has been completed or closed, it can't be canceled. |
| if state & (COMPLETED | CLOSED) != 0 { |
| break; |
| } |
| |
| // Mark the task as closed. |
| match (*header).state.compare_exchange_weak( |
| state, |
| state | CLOSED, |
| Ordering::AcqRel, |
| Ordering::Acquire, |
| ) { |
| Ok(_) => break, |
| Err(s) => state = s, |
| } |
| } |
| |
| // Drop the future. |
| ((*header).vtable.drop_future)(ptr); |
| |
| // Mark the task as unscheduled. |
| let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); |
| |
| // Notify the awaiter that the future has been dropped. |
| if state & AWAITER != 0 { |
| (*header).notify(None); |
| } |
| |
| // Drop the task reference. |
| ((*header).vtable.drop_ref)(ptr); |
| } |
| } |
| } |
| |
| impl fmt::Debug for Runnable { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| let ptr = self.ptr.as_ptr(); |
| let header = ptr as *const Header; |
| |
| f.debug_struct("Runnable") |
| .field("header", unsafe { &(*header) }) |
| .finish() |
| } |
| } |