| //! Opt-in yield points for improved cooperative scheduling. |
| //! |
| //! A single call to [`poll`] on a top-level task may potentially do a lot of |
| //! work before it returns `Poll::Pending`. If a task runs for a long period of |
| //! time without yielding back to the executor, it can starve other tasks |
| //! waiting on that executor to execute them, or drive underlying resources. |
| //! Since Rust does not have a runtime, it is difficult to forcibly preempt a |
| //! long-running task. Instead, this module provides an opt-in mechanism for |
| //! futures to collaborate with the executor to avoid starvation. |
| //! |
| //! Consider a future like this one: |
| //! |
| //! ``` |
| //! # use tokio::stream::{Stream, StreamExt}; |
| //! async fn drop_all<I: Stream + Unpin>(mut input: I) { |
| //! while let Some(_) = input.next().await {} |
| //! } |
| //! ``` |
| //! |
| //! It may look harmless, but consider what happens under heavy load if the |
| //! input stream is _always_ ready. If we spawn `drop_all`, the task will never |
| //! yield, and will starve other tasks and resources on the same executor. With |
| //! opt-in yield points, this problem is alleviated: |
| //! |
| //! ```ignore |
| //! # use tokio::stream::{Stream, StreamExt}; |
| //! async fn drop_all<I: Stream + Unpin>(mut input: I) { |
| //! while let Some(_) = input.next().await { |
| //! tokio::coop::proceed().await; |
| //! } |
| //! } |
| //! ``` |
| //! |
| //! The `proceed` future will coordinate with the executor to make sure that |
| //! every so often control is yielded back to the executor so it can run other |
| //! tasks. |
| //! |
| //! # Placing yield points |
| //! |
| //! Voluntary yield points should be placed _after_ at least some work has been |
| //! done. If they are not, a future sufficiently deep in the task hierarchy may |
| //! end up _never_ getting to run because of the number of yield points that |
| //! inevitably appear before it is reached. In general, you will want yield |
| //! points to only appear in "leaf" futures -- those that do not themselves poll |
| //! other futures. By doing this, you avoid double-counting each iteration of |
| //! the outer future against the cooperating budget. |
| //! |
| //! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll |
| |
| // NOTE: The doctests in this module are ignored since the whole module is (currently) private. |
| |
| use std::cell::Cell; |
| |
| thread_local! { |
| static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained()); |
| } |
| |
| /// Opaque type tracking the amount of "work" a task may still do before |
| /// yielding back to the scheduler. |
| #[derive(Debug, Copy, Clone)] |
| pub(crate) struct Budget(Option<u8>); |
| |
| impl Budget { |
| /// Budget assigned to a task on each poll. |
| /// |
| /// The value itself is chosen somewhat arbitrarily. It needs to be high |
| /// enough to amortize wakeup and scheduling costs, but low enough that we |
| /// do not starve other tasks for too long. The value also needs to be high |
| /// enough that particularly deep tasks are able to do at least some useful |
| /// work at all. |
| /// |
| /// Note that as more yield points are added in the ecosystem, this value |
| /// will probably also have to be raised. |
| const fn initial() -> Budget { |
| Budget(Some(128)) |
| } |
| |
| /// Returns an unconstrained budget. Operations will not be limited. |
| const fn unconstrained() -> Budget { |
| Budget(None) |
| } |
| } |
| |
| cfg_rt_threaded! { |
| impl Budget { |
| fn has_remaining(self) -> bool { |
| self.0.map(|budget| budget > 0).unwrap_or(true) |
| } |
| } |
| } |
| |
| /// Run the given closure with a cooperative task budget. When the function |
| /// returns, the budget is reset to the value prior to calling the function. |
| #[inline(always)] |
| pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { |
| with_budget(Budget::initial(), f) |
| } |
| |
| cfg_rt_threaded! { |
| /// Set the current task's budget |
| #[cfg(feature = "blocking")] |
| pub(crate) fn set(budget: Budget) { |
| CURRENT.with(|cell| cell.set(budget)) |
| } |
| } |
| |
| #[inline(always)] |
| fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { |
| struct ResetGuard<'a> { |
| cell: &'a Cell<Budget>, |
| prev: Budget, |
| } |
| |
| impl<'a> Drop for ResetGuard<'a> { |
| fn drop(&mut self) { |
| self.cell.set(self.prev); |
| } |
| } |
| |
| CURRENT.with(move |cell| { |
| let prev = cell.get(); |
| |
| cell.set(budget); |
| |
| let _guard = ResetGuard { cell, prev }; |
| |
| f() |
| }) |
| } |
| |
| cfg_rt_threaded! { |
| #[inline(always)] |
| pub(crate) fn has_budget_remaining() -> bool { |
| CURRENT.with(|cell| cell.get().has_remaining()) |
| } |
| } |
| |
| cfg_blocking_impl! { |
| /// Forcibly remove the budgeting constraints early. |
| /// |
| /// Returns the remaining budget |
| pub(crate) fn stop() -> Budget { |
| CURRENT.with(|cell| { |
| let prev = cell.get(); |
| cell.set(Budget::unconstrained()); |
| prev |
| }) |
| } |
| } |
| |
| cfg_coop! { |
| use std::task::{Context, Poll}; |
| |
| /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. |
| #[inline] |
| pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> { |
| CURRENT.with(|cell| { |
| let mut budget = cell.get(); |
| |
| if budget.decrement() { |
| cell.set(budget); |
| Poll::Ready(()) |
| } else { |
| cx.waker().wake_by_ref(); |
| Poll::Pending |
| } |
| }) |
| } |
| |
| impl Budget { |
| /// Decrement the budget. Returns `true` if successful. Decrementing fails |
| /// when there is not enough remaining budget. |
| fn decrement(&mut self) -> bool { |
| if let Some(num) = &mut self.0 { |
| if *num > 0 { |
| *num -= 1; |
| true |
| } else { |
| false |
| } |
| } else { |
| true |
| } |
| } |
| } |
| } |
| |
| #[cfg(all(test, not(loom)))] |
| mod test { |
| use super::*; |
| |
| fn get() -> Budget { |
| CURRENT.with(|cell| cell.get()) |
| } |
| |
| #[test] |
| fn bugeting() { |
| use futures::future::poll_fn; |
| use tokio_test::*; |
| |
| assert!(get().0.is_none()); |
| |
| assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| |
| assert!(get().0.is_none()); |
| |
| budget(|| { |
| assert_eq!(get().0, Budget::initial().0); |
| assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); |
| assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); |
| |
| budget(|| { |
| assert_eq!(get().0, Budget::initial().0); |
| |
| assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); |
| }); |
| |
| assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); |
| }); |
| |
| assert!(get().0.is_none()); |
| |
| budget(|| { |
| let n = get().0.unwrap(); |
| |
| for _ in 0..n { |
| assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); |
| } |
| |
| let mut task = task::spawn(poll_fn(|cx| { |
| ready!(poll_proceed(cx)); |
| Poll::Ready(()) |
| })); |
| |
| assert_pending!(task.poll()); |
| }); |
| } |
| } |