blob: 62e5f297126207d5631353dc7c80456cfdcbf446 [file] [log] [blame]
//! Throttle tasks if they poll too many I/O operations without yielding.
//!
//! This is used to prevent futures from running forever. Once a certain number of I/O
//! operations is hit in a single run, further I/O operations will begin returning
//! [`Poll::Pending`][`std::task::Poll::Pending`] even if they're ready.
use std::cell::Cell;
use std::task::{Context, Poll};
use scoped_tls_hkt::scoped_thread_local;
scoped_thread_local! {
/// Number of times the current task is allowed to poll I/O operations.
///
/// `poll` decrements this counter (saturating at zero) each time it is called. Once the
/// counter has reached zero, I/O operations will wake the current task and return
/// [`Poll::Pending`][`std::task::Poll::Pending`].
///
/// This thread-local is set before running any task: `setup` installs a fresh budget for the
/// closure it runs. When no budget is set on the current thread, `poll` never throttles.
static BUDGET: Cell<u32>
}
/// Runs `poll` with a fresh I/O budget installed for the current thread.
///
/// While the closure runs, every I/O operation that consults the budget consumes one unit.
/// After the budget has been exhausted, such operations wake the current task and return
/// [`Poll::Pending`][`std::task::Poll::Pending`] instead of making progress.
///
/// Throttling I/O like this keeps a single future from running for too long and starving
/// the other futures.
///
/// Returns whatever the closure returns.
pub(crate) fn setup<T>(poll: impl FnOnce() -> T) -> T {
    // A fairly arbitrary value that seems to work well in practice.
    const INITIAL_BUDGET: u32 = 200;

    let budget = Cell::new(INITIAL_BUDGET);
    BUDGET.set(&budget, poll)
}
/// Consumes one unit of the I/O budget, yielding once the budget has run out.
///
/// Returns [`Poll::Ready`] while budget remains, or when no budget has been set on this
/// thread at all. Returns [`Poll::Pending`] if the budget was already zero when called; in
/// that case the task behind `cx` is woken first so that it gets polled again.
///
/// [`Poll::Ready`]: `std::task::Poll::Ready`
/// [`Poll::Pending`]: `std::task::Poll::Pending`
pub(crate) fn poll(cx: &mut Context<'_>) -> Poll<()> {
    // Without an installed budget there is nothing to throttle.
    if !BUDGET.is_set() {
        return Poll::Ready(());
    }

    // Take one unit from the budget, remembering how much was left beforehand.
    let previously_left = BUDGET.with(|budget| {
        let before = budget.get();
        budget.set(before.saturating_sub(1));
        before
    });

    if previously_left != 0 {
        return Poll::Ready(());
    }

    // The task is not *really* pending; it is only being throttled artificially so that other
    // tasks get a chance to run. Wake it right away so it is rescheduled.
    cx.waker().wake_by_ref();
    Poll::Pending
}