| #![doc(html_root_url = "https://docs.rs/tokio-current-thread/0.1.6")] |
| #![deny(warnings, missing_docs, missing_debug_implementations)] |
| |
| //! A single-threaded executor which executes tasks on the same thread from which |
| //! they are spawned. |
| //! |
| //! |
| //! The crate provides: |
| //! |
| //! * [`CurrentThread`] is the main type of this crate. It executes tasks on the current thread. |
| //! The easiest way to start a new [`CurrentThread`] executor is to call |
| //! [`block_on_all`] with an initial task to seed the executor. |
| //! All tasks that are being managed by a [`CurrentThread`] executor are able to |
| //! spawn additional tasks by calling [`spawn`]. |
| //! |
| //! |
| //! Application authors will not use this crate directly. Instead, they will use the |
| //! `tokio` crate. Library authors should only depend on `tokio-current-thread` if they |
| //! are building a custom task executor. |
| //! |
| //! For more details, see [executor module] documentation in the Tokio crate. |
| //! |
| //! [`CurrentThread`]: struct.CurrentThread.html |
| //! [`spawn`]: fn.spawn.html |
| //! [`block_on_all`]: fn.block_on_all.html |
| //! [executor module]: https://docs.rs/tokio/0.1/tokio/executor/index.html |
| |
| extern crate futures; |
| extern crate tokio_executor; |
| |
| mod scheduler; |
| |
| use self::scheduler::Scheduler; |
| |
| use tokio_executor::park::{Park, ParkThread, Unpark}; |
| use tokio_executor::{Enter, SpawnError}; |
| |
| use futures::future::{ExecuteError, ExecuteErrorKind, Executor}; |
| use futures::{executor, Async, Future}; |
| |
| use std::cell::Cell; |
| use std::error::Error; |
| use std::fmt; |
| use std::rc::Rc; |
| use std::sync::{atomic, mpsc, Arc}; |
| use std::thread; |
| use std::time::{Duration, Instant}; |
| |
| /// Executes tasks on the current thread |
| pub struct CurrentThread<P: Park = ParkThread> { |
| /// Execute futures and receive unpark notifications. |
| scheduler: Scheduler<P::Unpark>, |
| |
| /// Current number of futures being executed. |
| /// |
| /// The LSB is used to indicate that the runtime is preparing to shut down. |
| /// Thus, to get the actual number of pending futures, `>>1`. |
| num_futures: Arc<atomic::AtomicUsize>, |
| |
| /// Thread park handle |
| park: P, |
| |
| /// Handle for spawning new futures from other threads |
| spawn_handle: Handle, |
| |
| /// Receiver for futures spawned from other threads |
| spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>, |
| |
| /// The thread-local ID assigned to this executor. |
| id: u64, |
| } |
| |
| /// Executes futures on the current thread. |
| /// |
| /// All futures executed using this executor will be executed on the current |
| /// thread. As such, `run` will wait for these futures to complete before |
| /// returning. |
| /// |
| /// For more details, see the [module level](index.html) documentation. |
| #[derive(Debug, Clone)] |
| pub struct TaskExecutor { |
| // Prevent the handle from moving across threads. |
| _p: ::std::marker::PhantomData<Rc<()>>, |
| } |
| |
| /// Returned by the `turn` function. |
| #[derive(Debug)] |
| pub struct Turn { |
| polled: bool, |
| } |
| |
| impl Turn { |
| /// `true` if any futures were polled at all and `false` otherwise. |
| pub fn has_polled(&self) -> bool { |
| self.polled |
| } |
| } |
| |
| /// A `CurrentThread` instance bound to a supplied execution context. |
| pub struct Entered<'a, P: Park + 'a> { |
| executor: &'a mut CurrentThread<P>, |
| enter: &'a mut Enter, |
| } |
| |
| /// Error returned by the `run` function. |
| #[derive(Debug)] |
| pub struct RunError { |
| _p: (), |
| } |
| |
| impl fmt::Display for RunError { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| write!(fmt, "{}", self.description()) |
| } |
| } |
| |
| impl Error for RunError { |
| fn description(&self) -> &str { |
| "Run error" |
| } |
| } |
| |
| /// Error returned by the `run_timeout` function. |
| #[derive(Debug)] |
| pub struct RunTimeoutError { |
| timeout: bool, |
| } |
| |
| impl fmt::Display for RunTimeoutError { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| write!(fmt, "{}", self.description()) |
| } |
| } |
| |
| impl Error for RunTimeoutError { |
| fn description(&self) -> &str { |
| if self.timeout { |
| "Run timeout error (timeout)" |
| } else { |
| "Run timeout error (not timeout)" |
| } |
| } |
| } |
| |
| /// Error returned by the `turn` function. |
| #[derive(Debug)] |
| pub struct TurnError { |
| _p: (), |
| } |
| |
| impl fmt::Display for TurnError { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| write!(fmt, "{}", self.description()) |
| } |
| } |
| |
| impl Error for TurnError { |
| fn description(&self) -> &str { |
| "Turn error" |
| } |
| } |
| |
| /// Error returned by the `block_on` function. |
| #[derive(Debug)] |
| pub struct BlockError<T> { |
| inner: Option<T>, |
| } |
| |
| impl<T> fmt::Display for BlockError<T> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| write!(fmt, "Block error") |
| } |
| } |
| |
| impl<T: fmt::Debug> Error for BlockError<T> { |
| fn description(&self) -> &str { |
| "Block error" |
| } |
| } |
| |
| /// This is mostly split out to make the borrow checker happy. |
| struct Borrow<'a, U: 'a> { |
| id: u64, |
| scheduler: &'a mut Scheduler<U>, |
| num_futures: &'a atomic::AtomicUsize, |
| } |
| |
| trait SpawnLocal { |
| fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool); |
| } |
| |
| struct CurrentRunner { |
| spawn: Cell<Option<*mut SpawnLocal>>, |
| id: Cell<Option<u64>>, |
| } |
| |
| thread_local! { |
| /// Current thread's task runner. This is set in `TaskRunner::with` |
| static CURRENT: CurrentRunner = CurrentRunner { |
| spawn: Cell::new(None), |
| id: Cell::new(None), |
| } |
| } |
| |
| thread_local! { |
| /// Unique ID to assign to each new executor launched on this thread. |
| /// |
| /// The unique ID is used to determine if the currently running executor matches the one |
| /// referred to by a `Handle` so that direct task dispatch can be used. |
| static EXECUTOR_ID: Cell<u64> = Cell::new(0) |
| } |
| |
| /// Run the executor bootstrapping the execution with the provided future. |
| /// |
| /// This creates a new [`CurrentThread`] executor, spawns the provided future, |
| /// and blocks the current thread until the provided future and **all** |
| /// subsequently spawned futures complete. In other words: |
| /// |
| /// * If the provided bootstrap future does **not** spawn any additional tasks, |
| /// `block_on_all` returns once `future` completes. |
| /// * If the provided bootstrap future **does** spawn additional tasks, then |
| /// `block_on_all` returns once **all** spawned futures complete. |
| /// |
| /// See [module level][mod] documentation for more details. |
| /// |
| /// [`CurrentThread`]: struct.CurrentThread.html |
| /// [mod]: index.html |
| pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> |
| where |
| F: Future, |
| { |
| let mut current_thread = CurrentThread::new(); |
| |
| let ret = current_thread.block_on(future); |
| current_thread.run().unwrap(); |
| |
| ret.map_err(|e| e.into_inner().expect("unexpected execution error")) |
| } |
| |
| /// Executes a future on the current thread. |
| /// |
| /// The provided future must complete or be canceled before `run` will return. |
| /// |
| /// Unlike [`tokio::spawn`], this function will always spawn on a |
| /// `CurrentThread` executor and is able to spawn futures that are not `Send`. |
| /// |
| /// # Panics |
| /// |
| /// This function can only be invoked from the context of a `run` call; any |
| /// other use will result in a panic. |
| /// |
| /// [`tokio::spawn`]: ../fn.spawn.html |
| pub fn spawn<F>(future: F) |
| where |
| F: Future<Item = (), Error = ()> + 'static, |
| { |
| TaskExecutor::current() |
| .spawn_local(Box::new(future)) |
| .unwrap(); |
| } |
| |
| // ===== impl CurrentThread ===== |
| |
| impl CurrentThread<ParkThread> { |
| /// Create a new instance of `CurrentThread`. |
| pub fn new() -> Self { |
| CurrentThread::new_with_park(ParkThread::new()) |
| } |
| } |
| |
| impl<P: Park> CurrentThread<P> { |
| /// Create a new instance of `CurrentThread` backed by the given park |
| /// handle. |
| pub fn new_with_park(park: P) -> Self { |
| let unpark = park.unpark(); |
| |
| let (spawn_sender, spawn_receiver) = mpsc::channel(); |
| let thread = thread::current().id(); |
| let id = EXECUTOR_ID.with(|idc| { |
| let id = idc.get(); |
| idc.set(id + 1); |
| id |
| }); |
| |
| let scheduler = Scheduler::new(unpark); |
| let notify = scheduler.notify(); |
| |
| let num_futures = Arc::new(atomic::AtomicUsize::new(0)); |
| |
| CurrentThread { |
| scheduler: scheduler, |
| num_futures: num_futures.clone(), |
| park, |
| id, |
| spawn_handle: Handle { |
| sender: spawn_sender, |
| num_futures: num_futures, |
| notify: notify, |
| shut_down: Cell::new(false), |
| thread: thread, |
| id, |
| }, |
| spawn_receiver: spawn_receiver, |
| } |
| } |
| |
| /// Returns `true` if the executor is currently idle. |
| /// |
| /// An idle executor is defined by not currently having any spawned tasks. |
| /// |
| /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`, |
| /// this method may return `true` even though there are more futures to be executed. |
| pub fn is_idle(&self) -> bool { |
| self.num_futures.load(atomic::Ordering::SeqCst) <= 1 |
| } |
| |
| /// Spawn the future on the executor. |
| /// |
| /// This internally queues the future to be executed once `run` is called. |
| pub fn spawn<F>(&mut self, future: F) -> &mut Self |
| where |
| F: Future<Item = (), Error = ()> + 'static, |
| { |
| self.borrow().spawn_local(Box::new(future), false); |
| self |
| } |
| |
| /// Synchronously waits for the provided `future` to complete. |
| /// |
| /// This function can be used to synchronously block the current thread |
| /// until the provided `future` has resolved either successfully or with an |
| /// error. The result of the future is then returned from this function |
| /// call. |
| /// |
| /// Note that this function will **also** execute any spawned futures on the |
| /// current thread, but will **not** block until these other spawned futures |
| /// have completed. |
| /// |
| /// The caller is responsible for ensuring that other spawned futures |
| /// complete execution. |
| pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>> |
| where |
| F: Future, |
| { |
| let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); |
| self.enter(&mut enter).block_on(future) |
| } |
| |
| /// Run the executor to completion, blocking the thread until **all** |
| /// spawned futures have completed. |
| pub fn run(&mut self) -> Result<(), RunError> { |
| let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); |
| self.enter(&mut enter).run() |
| } |
| |
| /// Run the executor to completion, blocking the thread until all |
| /// spawned futures have completed **or** `duration` time has elapsed. |
| pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { |
| let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); |
| self.enter(&mut enter).run_timeout(duration) |
| } |
| |
| /// Perform a single iteration of the event loop. |
| /// |
| /// This function blocks the current thread even if the executor is idle. |
| pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { |
| let mut enter = tokio_executor::enter().expect("failed to start `current_thread::Runtime`"); |
| self.enter(&mut enter).turn(duration) |
| } |
| |
| /// Bind `CurrentThread` instance with an execution context. |
| pub fn enter<'a>(&'a mut self, enter: &'a mut Enter) -> Entered<'a, P> { |
| Entered { |
| executor: self, |
| enter, |
| } |
| } |
| |
| /// Returns a reference to the underlying `Park` instance. |
| pub fn get_park(&self) -> &P { |
| &self.park |
| } |
| |
| /// Returns a mutable reference to the underlying `Park` instance. |
| pub fn get_park_mut(&mut self) -> &mut P { |
| &mut self.park |
| } |
| |
| fn borrow(&mut self) -> Borrow<P::Unpark> { |
| Borrow { |
| id: self.id, |
| scheduler: &mut self.scheduler, |
| num_futures: &*self.num_futures, |
| } |
| } |
| |
| /// Get a new handle to spawn futures on the executor |
| /// |
| /// Different to the executor itself, the handle can be sent to different |
| /// threads and can be used to spawn futures on the executor. |
| pub fn handle(&self) -> Handle { |
| self.spawn_handle.clone() |
| } |
| } |
| |
| impl<P: Park> Drop for CurrentThread<P> { |
| fn drop(&mut self) { |
| // Signal to Handles that no more futures can be spawned by setting LSB. |
| // |
| // NOTE: this isn't technically necessary since the send on the mpsc will fail once the |
| // receiver is dropped, but it's useful to illustrate how clean shutdown will be |
| // implemented (e.g., by setting the LSB). |
| let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst); |
| |
| // TODO: We currently ignore any pending futures at the time we shut down. |
| // |
| // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`) |
| // which sets LSB (as above) do make Handle::spawn stop working, and then runs until |
| // num_futures.load() == 1. |
| let _ = pending; |
| } |
| } |
| |
| impl tokio_executor::Executor for CurrentThread { |
| fn spawn( |
| &mut self, |
| future: Box<Future<Item = (), Error = ()> + Send>, |
| ) -> Result<(), SpawnError> { |
| self.borrow().spawn_local(future, false); |
| Ok(()) |
| } |
| } |
| |
| impl<T> tokio_executor::TypedExecutor<T> for CurrentThread |
| where |
| T: Future<Item = (), Error = ()> + 'static, |
| { |
| fn spawn(&mut self, future: T) -> Result<(), SpawnError> { |
| self.borrow().spawn_local(Box::new(future), false); |
| Ok(()) |
| } |
| } |
| |
| impl<P: Park> fmt::Debug for CurrentThread<P> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| fmt.debug_struct("CurrentThread") |
| .field("scheduler", &self.scheduler) |
| .field( |
| "num_futures", |
| &self.num_futures.load(atomic::Ordering::SeqCst), |
| ) |
| .finish() |
| } |
| } |
| |
| // ===== impl Entered ===== |
| |
| impl<'a, P: Park> Entered<'a, P> { |
| /// Spawn the future on the executor. |
| /// |
| /// This internally queues the future to be executed once `run` is called. |
| pub fn spawn<F>(&mut self, future: F) -> &mut Self |
| where |
| F: Future<Item = (), Error = ()> + 'static, |
| { |
| self.executor.borrow().spawn_local(Box::new(future), false); |
| self |
| } |
| |
| /// Synchronously waits for the provided `future` to complete. |
| /// |
| /// This function can be used to synchronously block the current thread |
| /// until the provided `future` has resolved either successfully or with an |
| /// error. The result of the future is then returned from this function |
| /// call. |
| /// |
| /// Note that this function will **also** execute any spawned futures on the |
| /// current thread, but will **not** block until these other spawned futures |
| /// have completed. |
| /// |
| /// The caller is responsible for ensuring that other spawned futures |
| /// complete execution. |
| pub fn block_on<F>(&mut self, future: F) -> Result<F::Item, BlockError<F::Error>> |
| where |
| F: Future, |
| { |
| let mut future = executor::spawn(future); |
| let notify = self.executor.scheduler.notify(); |
| |
| loop { |
| let res = self |
| .executor |
| .borrow() |
| .enter(self.enter, || future.poll_future_notify(¬ify, 0)); |
| |
| match res { |
| Ok(Async::Ready(e)) => return Ok(e), |
| Err(e) => return Err(BlockError { inner: Some(e) }), |
| Ok(Async::NotReady) => {} |
| } |
| |
| self.tick(); |
| |
| if let Err(_) = self.executor.park.park() { |
| return Err(BlockError { inner: None }); |
| } |
| } |
| } |
| |
| /// Run the executor to completion, blocking the thread until **all** |
| /// spawned futures have completed. |
| pub fn run(&mut self) -> Result<(), RunError> { |
| self.run_timeout2(None).map_err(|_| RunError { _p: () }) |
| } |
| |
| /// Run the executor to completion, blocking the thread until all |
| /// spawned futures have completed **or** `duration` time has elapsed. |
| pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> { |
| self.run_timeout2(Some(duration)) |
| } |
| |
| /// Perform a single iteration of the event loop. |
| /// |
| /// This function blocks the current thread even if the executor is idle. |
| pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> { |
| let res = if self.executor.scheduler.has_pending_futures() { |
| self.executor.park.park_timeout(Duration::from_millis(0)) |
| } else { |
| match duration { |
| Some(duration) => self.executor.park.park_timeout(duration), |
| None => self.executor.park.park(), |
| } |
| }; |
| |
| if res.is_err() { |
| return Err(TurnError { _p: () }); |
| } |
| |
| let polled = self.tick(); |
| |
| Ok(Turn { polled }) |
| } |
| |
| /// Returns a reference to the underlying `Park` instance. |
| pub fn get_park(&self) -> &P { |
| &self.executor.park |
| } |
| |
| /// Returns a mutable reference to the underlying `Park` instance. |
| pub fn get_park_mut(&mut self) -> &mut P { |
| &mut self.executor.park |
| } |
| |
| fn run_timeout2(&mut self, dur: Option<Duration>) -> Result<(), RunTimeoutError> { |
| if self.executor.is_idle() { |
| // Nothing to do |
| return Ok(()); |
| } |
| |
| let mut time = dur.map(|dur| (Instant::now() + dur, dur)); |
| |
| loop { |
| self.tick(); |
| |
| if self.executor.is_idle() { |
| return Ok(()); |
| } |
| |
| match time { |
| Some((until, rem)) => { |
| if let Err(_) = self.executor.park.park_timeout(rem) { |
| return Err(RunTimeoutError::new(false)); |
| } |
| |
| let now = Instant::now(); |
| |
| if now >= until { |
| return Err(RunTimeoutError::new(true)); |
| } |
| |
| time = Some((until, until - now)); |
| } |
| None => { |
| if let Err(_) = self.executor.park.park() { |
| return Err(RunTimeoutError::new(false)); |
| } |
| } |
| } |
| } |
| } |
| |
| /// Returns `true` if any futures were processed |
| fn tick(&mut self) -> bool { |
| // Spawn any futures that were spawned from other threads by manually |
| // looping over the receiver stream |
| |
| // FIXME: Slightly ugly but needed to make the borrow checker happy |
| let (mut borrow, spawn_receiver) = ( |
| Borrow { |
| id: self.executor.id, |
| scheduler: &mut self.executor.scheduler, |
| num_futures: &*self.executor.num_futures, |
| }, |
| &mut self.executor.spawn_receiver, |
| ); |
| |
| while let Ok(future) = spawn_receiver.try_recv() { |
| borrow.spawn_local(future, true); |
| } |
| |
| // After any pending futures were scheduled, do the actual tick |
| borrow |
| .scheduler |
| .tick(borrow.id, &mut *self.enter, borrow.num_futures) |
| } |
| } |
| |
| impl<'a, P: Park> fmt::Debug for Entered<'a, P> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| fmt.debug_struct("Entered") |
| .field("executor", &self.executor) |
| .field("enter", &self.enter) |
| .finish() |
| } |
| } |
| |
| // ===== impl Handle ===== |
| |
| /// Handle to spawn a future on the corresponding `CurrentThread` instance |
| #[derive(Clone)] |
| pub struct Handle { |
| sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>, |
| num_futures: Arc<atomic::AtomicUsize>, |
| shut_down: Cell<bool>, |
| notify: executor::NotifyHandle, |
| thread: thread::ThreadId, |
| |
| /// The thread-local ID assigned to this Handle's executor. |
| id: u64, |
| } |
| |
| // Manual implementation because the Sender does not implement Debug |
| impl fmt::Debug for Handle { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| fmt.debug_struct("Handle") |
| .field("shut_down", &self.shut_down.get()) |
| .finish() |
| } |
| } |
| |
| impl Handle { |
| /// Spawn a future onto the `CurrentThread` instance corresponding to this handle |
| /// |
| /// # Panics |
| /// |
| /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` |
| /// instance of the `Handle` does not exist anymore. |
| pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError> |
| where |
| F: Future<Item = (), Error = ()> + Send + 'static, |
| { |
| if thread::current().id() == self.thread { |
| let mut e = TaskExecutor::current(); |
| if e.id() == Some(self.id) { |
| return e.spawn_local(Box::new(future)); |
| } |
| } |
| |
| if self.shut_down.get() { |
| return Err(SpawnError::shutdown()); |
| } |
| |
| // NOTE: += 2 since LSB is the shutdown bit |
| let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); |
| if pending % 2 == 1 { |
| // Bring the count back so we still know when the Runtime is idle. |
| self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst); |
| |
| // Once the Runtime is shutting down, we know it won't come back. |
| self.shut_down.set(true); |
| |
| return Err(SpawnError::shutdown()); |
| } |
| |
| self.sender |
| .send(Box::new(future)) |
| .expect("CurrentThread does not exist anymore"); |
| // use 0 for the id, CurrentThread does not make use of it |
| self.notify.notify(0); |
| Ok(()) |
| } |
| |
| /// Provides a best effort **hint** to whether or not `spawn` will succeed. |
| /// |
| /// This function may return both false positives **and** false negatives. |
| /// If `status` returns `Ok`, then a call to `spawn` will *probably* |
| /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will |
| /// *probably* fail, but may succeed. |
| /// |
| /// This allows a caller to avoid creating the task if the call to `spawn` |
| /// has a high likelihood of failing. |
| pub fn status(&self) -> Result<(), SpawnError> { |
| if self.shut_down.get() { |
| return Err(SpawnError::shutdown()); |
| } |
| |
| Ok(()) |
| } |
| } |
| |
| // ===== impl TaskExecutor ===== |
| |
| impl TaskExecutor { |
| /// Returns an executor that executes futures on the current thread. |
| /// |
| /// The user of `TaskExecutor` must ensure that when a future is submitted, |
| /// that it is done within the context of a call to `run`. |
| /// |
| /// For more details, see the [module level](index.html) documentation. |
| pub fn current() -> TaskExecutor { |
| TaskExecutor { |
| _p: ::std::marker::PhantomData, |
| } |
| } |
| |
| /// Get the current executor's thread-local ID. |
| fn id(&self) -> Option<u64> { |
| CURRENT.with(|current| current.id.get()) |
| } |
| |
| /// Spawn a future onto the current `CurrentThread` instance. |
| pub fn spawn_local( |
| &mut self, |
| future: Box<Future<Item = (), Error = ()>>, |
| ) -> Result<(), SpawnError> { |
| CURRENT.with(|current| match current.spawn.get() { |
| Some(spawn) => { |
| unsafe { (*spawn).spawn_local(future, false) }; |
| Ok(()) |
| } |
| None => Err(SpawnError::shutdown()), |
| }) |
| } |
| } |
| |
| impl tokio_executor::Executor for TaskExecutor { |
| fn spawn( |
| &mut self, |
| future: Box<Future<Item = (), Error = ()> + Send>, |
| ) -> Result<(), SpawnError> { |
| self.spawn_local(future) |
| } |
| } |
| |
| impl<F> tokio_executor::TypedExecutor<F> for TaskExecutor |
| where |
| F: Future<Item = (), Error = ()> + 'static, |
| { |
| fn spawn(&mut self, future: F) -> Result<(), SpawnError> { |
| self.spawn_local(Box::new(future)) |
| } |
| } |
| |
| impl<F> Executor<F> for TaskExecutor |
| where |
| F: Future<Item = (), Error = ()> + 'static, |
| { |
| fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { |
| CURRENT.with(|current| match current.spawn.get() { |
| Some(spawn) => { |
| unsafe { (*spawn).spawn_local(Box::new(future), false) }; |
| Ok(()) |
| } |
| None => Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)), |
| }) |
| } |
| } |
| |
| // ===== impl Borrow ===== |
| |
| impl<'a, U: Unpark> Borrow<'a, U> { |
| fn enter<F, R>(&mut self, _: &mut Enter, f: F) -> R |
| where |
| F: FnOnce() -> R, |
| { |
| CURRENT.with(|current| { |
| current.id.set(Some(self.id)); |
| current.set_spawn(self, || f()) |
| }) |
| } |
| } |
| |
| impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> { |
| fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool) { |
| if !already_counted { |
| // NOTE: we have a borrow of the Runtime, so we know that it isn't shut down. |
| // NOTE: += 2 since LSB is the shutdown bit |
| self.num_futures.fetch_add(2, atomic::Ordering::SeqCst); |
| } |
| self.scheduler.schedule(future); |
| } |
| } |
| |
| // ===== impl CurrentRunner ===== |
| |
| impl CurrentRunner { |
| fn set_spawn<F, R>(&self, spawn: &mut SpawnLocal, f: F) -> R |
| where |
| F: FnOnce() -> R, |
| { |
| struct Reset<'a>(&'a CurrentRunner); |
| |
| impl<'a> Drop for Reset<'a> { |
| fn drop(&mut self) { |
| self.0.spawn.set(None); |
| self.0.id.set(None); |
| } |
| } |
| |
| let _reset = Reset(self); |
| |
| let spawn = unsafe { hide_lt(spawn as *mut SpawnLocal) }; |
| self.spawn.set(Some(spawn)); |
| |
| f() |
| } |
| } |
| |
| unsafe fn hide_lt<'a>(p: *mut (SpawnLocal + 'a)) -> *mut (SpawnLocal + 'static) { |
| use std::mem; |
| mem::transmute(p) |
| } |
| |
| // ===== impl RunTimeoutError ===== |
| |
| impl RunTimeoutError { |
| fn new(timeout: bool) -> Self { |
| RunTimeoutError { timeout } |
| } |
| |
| /// Returns `true` if the error was caused by the operation timing out. |
| pub fn is_timeout(&self) -> bool { |
| self.timeout |
| } |
| } |
| |
| impl From<tokio_executor::EnterError> for RunTimeoutError { |
| fn from(_: tokio_executor::EnterError) -> Self { |
| RunTimeoutError::new(false) |
| } |
| } |
| |
| // ===== impl BlockError ===== |
| |
| impl<T> BlockError<T> { |
| /// Returns the error yielded by the future being blocked on |
| pub fn into_inner(self) -> Option<T> { |
| self.inner |
| } |
| } |
| |
| impl<T> From<tokio_executor::EnterError> for BlockError<T> { |
| fn from(_: tokio_executor::EnterError) -> Self { |
| BlockError { inner: None } |
| } |
| } |