| use crate::loom::sync::atomic::AtomicU64; |
| use crate::sync::AtomicWaker; |
| use crate::time::driver::{Handle, Inner}; |
| use crate::time::{Duration, Error, Instant}; |
| |
| use std::cell::UnsafeCell; |
| use std::ptr; |
| use std::sync::atomic::AtomicBool; |
| use std::sync::atomic::Ordering::SeqCst; |
| use std::sync::{Arc, Weak}; |
| use std::task::{self, Poll}; |
| use std::u64; |
| |
| /// Internal state shared between a `Delay` instance and the timer. |
| /// |
| /// This struct is used as a node in two intrusive data structures: |
| /// |
| /// * An atomic stack used to signal to the timer thread that the entry state |
| /// has changed. The timer thread will observe the entry on this stack and |
| /// perform any actions as necessary. |
| /// |
| /// * A doubly linked list used **only** by the timer thread. Each slot in the |
| /// timer wheel is a head pointer to the list of entries that must be |
| /// processed during that timer tick. |
| #[derive(Debug)] |
| pub(crate) struct Entry { |
| /// Only accessed from `Registration`. |
| time: CachePadded<UnsafeCell<Time>>, |
| |
| /// Timer internals. Using a weak pointer allows the timer to shutdown |
| /// without all `Delay` instances having completed. |
| /// |
| /// When `None`, the entry has not yet been linked with a timer instance. |
| inner: Weak<Inner>, |
| |
| /// Tracks the entry state. This value contains the following information: |
| /// |
| /// * The deadline at which the entry must be "fired". |
| /// * A flag indicating if the entry has already been fired. |
| /// * Whether or not the entry transitioned to the error state. |
| /// |
| /// When an `Entry` is created, `state` is initialized to the instant at |
| /// which the entry must be fired. When a timer is reset to a different |
| /// instant, this value is changed. |
| state: AtomicU64, |
| |
| /// Task to notify once the deadline is reached. |
| waker: AtomicWaker, |
| |
| /// True when the entry is queued in the "process" stack. This value |
| /// is set before pushing the value and unset after popping the value. |
| /// |
| /// TODO: This could possibly be rolled up into `state`. |
| pub(super) queued: AtomicBool, |
| |
| /// Next entry in the "process" linked list. |
| /// |
| /// Access to this field is coordinated by the `queued` flag. |
| /// |
| /// Represents a strong Arc ref. |
| pub(super) next_atomic: UnsafeCell<*mut Entry>, |
| |
| /// When the entry expires, relative to the `start` of the timer |
| /// (Inner::start). This is only used by the timer. |
| /// |
| /// A `Delay` instance can be reset to a different deadline by the thread |
| /// that owns the `Delay` instance. In this case, the timer thread will not |
| /// immediately know that this has happened. The timer thread must know the |
| /// last deadline that it saw as it uses this value to locate the entry in |
| /// its wheel. |
| /// |
| /// Once the timer thread observes that the instant has changed, it updates |
| /// the wheel and sets this value. The idea is that this value eventually |
| /// converges to the value of `state` as the timer thread makes updates. |
| when: UnsafeCell<Option<u64>>, |
| |
| /// Next entry in the State's linked list. |
| /// |
| /// This is only accessed by the timer |
| pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>, |
| |
| /// Previous entry in the State's linked list. |
| /// |
| /// This is only accessed by the timer and is used to unlink a canceled |
| /// entry. |
| /// |
| /// This is a weak reference. |
| pub(super) prev_stack: UnsafeCell<*const Entry>, |
| } |
| |
| /// Stores the info for `Delay`. |
| #[derive(Debug)] |
| pub(crate) struct Time { |
| pub(crate) deadline: Instant, |
| pub(crate) duration: Duration, |
| } |
| |
| /// Flag indicating a timer entry has elapsed |
| const ELAPSED: u64 = 1 << 63; |
| |
| /// Flag indicating a timer entry has reached an error state |
| const ERROR: u64 = u64::MAX; |
| |
| // ===== impl Entry ===== |
| |
| impl Entry { |
| pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> { |
| let inner = handle.inner().unwrap(); |
| let entry: Entry; |
| |
| // Increment the number of active timeouts |
| if inner.increment().is_err() { |
| entry = Entry::new2(deadline, duration, Weak::new(), ERROR) |
| } else { |
| let when = inner.normalize_deadline(deadline); |
| let state = if when <= inner.elapsed() { |
| ELAPSED |
| } else { |
| when |
| }; |
| entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state); |
| } |
| |
| let entry = Arc::new(entry); |
| if inner.queue(&entry).is_err() { |
| entry.error(); |
| } |
| |
| entry |
| } |
| |
| /// Only called by `Registration` |
| pub(crate) fn time_ref(&self) -> &Time { |
| unsafe { &*self.time.0.get() } |
| } |
| |
| /// Only called by `Registration` |
| #[allow(clippy::mut_from_ref)] // https://github.com/rust-lang/rust-clippy/issues/4281 |
| pub(crate) unsafe fn time_mut(&self) -> &mut Time { |
| &mut *self.time.0.get() |
| } |
| |
| /// The current entry state as known by the timer. This is not the value of |
| /// `state`, but lets the timer know how to converge its state to `state`. |
| pub(crate) fn when_internal(&self) -> Option<u64> { |
| unsafe { *self.when.get() } |
| } |
| |
| pub(crate) fn set_when_internal(&self, when: Option<u64>) { |
| unsafe { |
| *self.when.get() = when; |
| } |
| } |
| |
| /// Called by `Timer` to load the current value of `state` for processing |
| pub(crate) fn load_state(&self) -> Option<u64> { |
| let state = self.state.load(SeqCst); |
| |
| if is_elapsed(state) { |
| None |
| } else { |
| Some(state) |
| } |
| } |
| |
| pub(crate) fn is_elapsed(&self) -> bool { |
| let state = self.state.load(SeqCst); |
| is_elapsed(state) |
| } |
| |
| pub(crate) fn fire(&self, when: u64) { |
| let mut curr = self.state.load(SeqCst); |
| |
| loop { |
| if is_elapsed(curr) || curr > when { |
| return; |
| } |
| |
| let next = ELAPSED | curr; |
| let actual = self.state.compare_and_swap(curr, next, SeqCst); |
| |
| if curr == actual { |
| break; |
| } |
| |
| curr = actual; |
| } |
| |
| self.waker.wake(); |
| } |
| |
| pub(crate) fn error(&self) { |
| // Only transition to the error state if not currently elapsed |
| let mut curr = self.state.load(SeqCst); |
| |
| loop { |
| if is_elapsed(curr) { |
| return; |
| } |
| |
| let next = ERROR; |
| |
| let actual = self.state.compare_and_swap(curr, next, SeqCst); |
| |
| if curr == actual { |
| break; |
| } |
| |
| curr = actual; |
| } |
| |
| self.waker.wake(); |
| } |
| |
| pub(crate) fn cancel(entry: &Arc<Entry>) { |
| let state = entry.state.fetch_or(ELAPSED, SeqCst); |
| |
| if is_elapsed(state) { |
| // Nothing more to do |
| return; |
| } |
| |
| // If registered with a timer instance, try to upgrade the Arc. |
| let inner = match entry.upgrade_inner() { |
| Some(inner) => inner, |
| None => return, |
| }; |
| |
| let _ = inner.queue(entry); |
| } |
| |
| pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { |
| let mut curr = self.state.load(SeqCst); |
| |
| if is_elapsed(curr) { |
| return Poll::Ready(if curr == ERROR { |
| Err(Error::shutdown()) |
| } else { |
| Ok(()) |
| }); |
| } |
| |
| self.waker.register_by_ref(cx.waker()); |
| |
| curr = self.state.load(SeqCst); |
| |
| if is_elapsed(curr) { |
| return Poll::Ready(if curr == ERROR { |
| Err(Error::shutdown()) |
| } else { |
| Ok(()) |
| }); |
| } |
| |
| Poll::Pending |
| } |
| |
| /// Only called by `Registration` |
| pub(crate) fn reset(entry: &mut Arc<Entry>) { |
| let inner = match entry.upgrade_inner() { |
| Some(inner) => inner, |
| None => return, |
| }; |
| |
| let deadline = entry.time_ref().deadline; |
| let when = inner.normalize_deadline(deadline); |
| let elapsed = inner.elapsed(); |
| |
| let next = if when <= elapsed { ELAPSED } else { when }; |
| |
| let mut curr = entry.state.load(SeqCst); |
| |
| loop { |
| // In these two cases, there is no work to do when resetting the |
| // timer. If the `Entry` is in an error state, then it cannot be |
| // used anymore. If resetting the entry to the current value, then |
| // the reset is a noop. |
| if curr == ERROR || curr == when { |
| return; |
| } |
| |
| let actual = entry.state.compare_and_swap(curr, next, SeqCst); |
| |
| if curr == actual { |
| break; |
| } |
| |
| curr = actual; |
| } |
| |
| // If the state has transitioned to 'elapsed' then wake the task as |
| // this entry is ready to be polled. |
| if !is_elapsed(curr) && is_elapsed(next) { |
| entry.waker.wake(); |
| } |
| |
| // The driver tracks all non-elapsed entries; notify the driver that it |
| // should update its state for this entry unless the entry had already |
| // elapsed and remains elapsed. |
| if !is_elapsed(curr) || !is_elapsed(next) { |
| let _ = inner.queue(entry); |
| } |
| } |
| |
| fn new2(deadline: Instant, duration: Duration, inner: Weak<Inner>, state: u64) -> Self { |
| Self { |
| time: CachePadded(UnsafeCell::new(Time { deadline, duration })), |
| inner, |
| waker: AtomicWaker::new(), |
| state: AtomicU64::new(state), |
| queued: AtomicBool::new(false), |
| next_atomic: UnsafeCell::new(ptr::null_mut()), |
| when: UnsafeCell::new(None), |
| next_stack: UnsafeCell::new(None), |
| prev_stack: UnsafeCell::new(ptr::null_mut()), |
| } |
| } |
| |
| fn upgrade_inner(&self) -> Option<Arc<Inner>> { |
| self.inner.upgrade() |
| } |
| } |
| |
| fn is_elapsed(state: u64) -> bool { |
| state & ELAPSED == ELAPSED |
| } |
| |
| impl Drop for Entry { |
| fn drop(&mut self) { |
| let inner = match self.upgrade_inner() { |
| Some(inner) => inner, |
| None => return, |
| }; |
| |
| inner.decrement(); |
| } |
| } |
| |
| unsafe impl Send for Entry {} |
| unsafe impl Sync for Entry {} |
| |
| #[cfg_attr(target_arch = "x86_64", repr(align(128)))] |
| #[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] |
| #[derive(Debug)] |
| struct CachePadded<T>(T); |