| use std::future::Future; |
| use std::pin::Pin; |
| use std::sync::Arc; |
| use std::task::{Context, Poll}; |
| |
| use pin_project_lite::pin_project; |
| |
| use crate::io; |
| use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; |
| |
/// Collects the optional settings applied to a task before it is started.
///
/// The only setting held here is the task's name; a default-constructed
/// builder has no name.
#[derive(Default, Debug)]
pub struct Builder {
    // Name to give the task, or `None` to leave it unnamed.
    pub(crate) name: Option<String>,
}
| |
| impl Builder { |
| /// Creates a new builder. |
| #[inline] |
| pub fn new() -> Builder { |
| Builder { name: None } |
| } |
| |
| /// Configures the name of the task. |
| #[inline] |
| pub fn name(mut self, name: String) -> Builder { |
| self.name = Some(name); |
| self |
| } |
| |
| fn build<F, T>(self, future: F) -> SupportTaskLocals<F> |
| where |
| F: Future<Output = T>, |
| { |
| let name = self.name.map(Arc::new); |
| |
| // Create a new task handle. |
| let task = Task::new(name); |
| |
| #[cfg(not(target_os = "unknown"))] |
| once_cell::sync::Lazy::force(&crate::rt::RUNTIME); |
| |
| let tag = TaskLocalsWrapper::new(task.clone()); |
| |
| SupportTaskLocals { tag, future } |
| } |
| |
| /// Spawns a task with the configured settings. |
| #[cfg(not(target_os = "unknown"))] |
| pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| where |
| F: Future<Output = T> + Send + 'static, |
| T: Send + 'static, |
| { |
| let wrapped = self.build(future); |
| |
| kv_log_macro::trace!("spawn", { |
| task_id: wrapped.tag.id().0, |
| parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| }); |
| |
| let task = wrapped.tag.task().clone(); |
| let smol_task = smol::Task::spawn(wrapped).into(); |
| |
| Ok(JoinHandle::new(smol_task, task)) |
| } |
| |
| /// Spawns a task locally with the configured settings. |
| #[cfg(all(not(target_os = "unknown"), feature = "unstable"))] |
| pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| where |
| F: Future<Output = T> + 'static, |
| T: 'static, |
| { |
| let wrapped = self.build(future); |
| |
| kv_log_macro::trace!("spawn_local", { |
| task_id: wrapped.tag.id().0, |
| parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| }); |
| |
| let task = wrapped.tag.task().clone(); |
| let smol_task = smol::Task::local(wrapped).into(); |
| |
| Ok(JoinHandle::new(smol_task, task)) |
| } |
| |
| /// Spawns a task locally with the configured settings. |
| #[cfg(all(target_arch = "wasm32", feature = "unstable"))] |
| pub fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| where |
| F: Future<Output = T> + 'static, |
| T: 'static, |
| { |
| use futures_channel::oneshot::channel; |
| let (sender, receiver) = channel(); |
| |
| let wrapped = self.build(async move { |
| let res = future.await; |
| let _ = sender.send(res); |
| }); |
| kv_log_macro::trace!("spawn_local", { |
| task_id: wrapped.tag.id().0, |
| parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| }); |
| |
| let task = wrapped.tag.task().clone(); |
| wasm_bindgen_futures::spawn_local(wrapped); |
| |
| Ok(JoinHandle::new(receiver, task)) |
| } |
| |
| /// Spawns a task locally with the configured settings. |
| #[cfg(all(target_arch = "wasm32", not(feature = "unstable")))] |
| pub(crate) fn local<F, T>(self, future: F) -> io::Result<JoinHandle<T>> |
| where |
| F: Future<Output = T> + 'static, |
| T: 'static, |
| { |
| use futures_channel::oneshot::channel; |
| let (sender, receiver) = channel(); |
| |
| let wrapped = self.build(async move { |
| let res = future.await; |
| let _ = sender.send(res); |
| }); |
| |
| kv_log_macro::trace!("spawn_local", { |
| task_id: wrapped.tag.id().0, |
| parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| }); |
| |
| let task = wrapped.tag.task().clone(); |
| wasm_bindgen_futures::spawn_local(wrapped); |
| |
| Ok(JoinHandle::new(receiver, task)) |
| } |
| |
| /// Spawns a task with the configured settings, blocking on its execution. |
| #[cfg(not(target_os = "unknown"))] |
| pub fn blocking<F, T>(self, future: F) -> T |
| where |
| F: Future<Output = T>, |
| { |
| let wrapped = self.build(future); |
| |
| // Log this `block_on` operation. |
| kv_log_macro::trace!("block_on", { |
| task_id: wrapped.tag.id().0, |
| parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), |
| }); |
| |
| // Run the future as a task. |
| unsafe { TaskLocalsWrapper::set_current(&wrapped.tag, || smol::run(wrapped)) } |
| } |
| } |
| |
pin_project! {
    /// Wrapper to add support for task locals.
    ///
    /// Pairs an inner future with the `TaskLocalsWrapper` that is registered
    /// via `TaskLocalsWrapper::set_current` while the inner future is polled.
    struct SupportTaskLocals<F> {
        // Task-local state registered as current around each poll.
        tag: TaskLocalsWrapper,
        // The wrapped future; `#[pin]` makes the projection expose it pinned.
        #[pin]
        future: F,
    }
}
| |
| impl<F: Future> Future for SupportTaskLocals<F> { |
| type Output = F::Output; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| unsafe { |
| TaskLocalsWrapper::set_current(&self.tag, || { |
| let this = self.project(); |
| this.future.poll(cx) |
| }) |
| } |
| } |
| } |