use atomic::AtomicU64;
use timer::{HandlePriv, Inner};
use Error;

use crossbeam_utils::CachePadded;
use futures::task::AtomicTask;
use futures::Poll;

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::u64;

/// Internal state shared between a `Delay` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
///   has changed. The timer thread will observe the entry on this stack and
///   perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
///   timer wheel is a head pointer to the list of entries that must be
///   processed during that timer tick.
#[derive(Debug)]
pub(crate) struct Entry {
    /// Only accessed from `Registration`.
    time: CachePadded<UnsafeCell<Time>>,

    /// Timer internals. Using a weak pointer allows the timer to shut down
    /// without all `Delay` instances having completed.
    ///
    /// When `None`, the entry has not yet been linked with a timer instance.
    inner: Option<Weak<Inner>>,

    /// Tracks the entry state. This value contains the following information:
    ///
    /// * The deadline at which the entry must be "fired".
    /// * A flag indicating if the entry has already been fired.
    /// * Whether or not the entry transitioned to the error state.
    ///
    /// When an `Entry` is created, `state` is initialized to the instant at
    /// which the entry must be fired. When a timer is reset to a different
    /// instant, this value is changed.
    state: AtomicU64,
| |
| /// Task to notify once the deadline is reached. |
| task: AtomicTask, |
| |
| /// True when the entry is queued in the "process" stack. This value |
| /// is set before pushing the value and unset after popping the value. |
| /// |
| /// TODO: This could possibly be rolled up into `state`. |
| pub(super) queued: AtomicBool, |
| |
| /// Next entry in the "process" linked list. |
| /// |
| /// Access to this field is coordinated by the `queued` flag. |
| /// |
| /// Represents a strong Arc ref. |
| pub(super) next_atomic: UnsafeCell<*mut Entry>, |
| |
    /// When the entry expires, relative to the `start` of the timer
    /// (Inner::start). This is only used by the timer.
    ///
    /// A `Delay` instance can be reset to a different deadline by the thread
    /// that owns the `Delay` instance. In this case, the timer thread will not
    /// immediately know that this has happened. The timer thread must know the
    /// last deadline that it saw as it uses this value to locate the entry in
    /// its wheel.
    ///
    /// Once the timer thread observes that the instant has changed, it updates
    /// the wheel and sets this value. The idea is that this value eventually
    /// converges to the value of `state` as the timer thread makes updates.
    when: UnsafeCell<Option<u64>>,

    /// Next entry in the State's linked list.
    ///
    /// This is only accessed by the timer.
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,

    /// Previous entry in the State's linked list.
    ///
    /// This is only accessed by the timer and is used to unlink a canceled
    /// entry.
    ///
    /// This is a weak reference.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
#[derive(Debug)]
pub(crate) struct Time {
    pub(crate) deadline: Instant,
    pub(crate) duration: Duration,
}

/// Flag indicating a timer entry has elapsed.
const ELAPSED: u64 = 1 << 63;

/// State value indicating a timer entry has reached the error state. All bits
/// are set, so this value also reads as elapsed.
const ERROR: u64 = u64::MAX;
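
// Informal sketch of the values `state` may hold, derived from the constants
// above and the transitions in the `impl` below:
//
//   when (< 1 << 63)   pending; the expiration tick relative to the timer start
//   ELAPSED | when     fired or canceled (the high bit is set)
//   ERROR              error state (also reads as elapsed, see above)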

// ===== impl Entry =====

impl Entry {
    pub fn new(deadline: Instant, duration: Duration) -> Entry {
        Entry {
            time: CachePadded::new(UnsafeCell::new(Time { deadline, duration })),
            inner: None,
            task: AtomicTask::new(),
            state: AtomicU64::new(0),
            queued: AtomicBool::new(false),
            next_atomic: UnsafeCell::new(ptr::null_mut()),
            when: UnsafeCell::new(None),
            next_stack: UnsafeCell::new(None),
            prev_stack: UnsafeCell::new(ptr::null_mut()),
        }
    }

    /// Only called by `Registration`
    pub fn time_ref(&self) -> &Time {
        unsafe { &*self.time.get() }
    }

    /// Only called by `Registration`
    pub fn time_mut(&self) -> &mut Time {
        unsafe { &mut *self.time.get() }
    }

    /// Returns `true` if the `Entry` is currently associated with a timer
    /// instance.
    pub fn is_registered(&self) -> bool {
        self.inner.is_some()
    }

    /// Only called by `Registration`
    pub fn register(me: &mut Arc<Self>) {
        let handle = match HandlePriv::try_current() {
            Ok(handle) => handle,
            Err(_) => {
                // Could not associate the entry with a timer, transition the
                // state to error
                Arc::get_mut(me).unwrap().transition_to_error();

                return;
            }
        };

        Entry::register_with(me, handle)
    }

    /// Only called by `Registration`
    pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
        assert!(!me.is_registered(), "only register an entry once");

        let deadline = me.time_ref().deadline;

        let inner = match handle.inner() {
            Some(inner) => inner,
            None => {
                // Could not associate the entry with a timer, transition the
                // state to error
                Arc::get_mut(me).unwrap().transition_to_error();

                return;
            }
        };

        // Increment the number of active timeouts
        if inner.increment().is_err() {
            Arc::get_mut(me).unwrap().transition_to_error();

            return;
        }

        // Associate the entry with the timer
        Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());

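        // `normalize_deadline` converts the `Instant` deadline into the
        // timer's u64 tick representation, relative to `Inner::start` (see
        // the `when` field docs above).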
        let when = inner.normalize_deadline(deadline);

        // Relaxed OK: At this point, there are no other threads that have
        // access to this entry.
        if when <= inner.elapsed() {
            me.state.store(ELAPSED, Relaxed);
            return;
        } else {
            me.state.store(when, Relaxed);
        }

        if inner.queue(me).is_err() {
            // The timer has shut down; transition the entry to the error state.
            me.error();
        }
    }

    fn transition_to_error(&mut self) {
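        // Store a dangling `Weak` so that `is_registered` returns `true`; it
        // can never be upgraded, so the timer itself is never touched again.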
        self.inner = Some(Weak::new());
        self.state = AtomicU64::new(ERROR);
    }

    /// The entry's state as last observed by the timer thread. This is not
    /// necessarily the current value of `state`; the timer uses it to
    /// converge its wheel toward `state`.
    pub fn when_internal(&self) -> Option<u64> {
        unsafe { (*self.when.get()) }
    }

    pub fn set_when_internal(&self, when: Option<u64>) {
        unsafe {
            (*self.when.get()) = when;
        }
    }

    /// Called by `Timer` to load the current value of `state` for processing
    pub fn load_state(&self) -> Option<u64> {
        let state = self.state.load(SeqCst);

        if is_elapsed(state) {
            None
        } else {
            Some(state)
        }
    }

    pub fn is_elapsed(&self) -> bool {
        let state = self.state.load(SeqCst);
        is_elapsed(state)
    }

    pub fn fire(&self, when: u64) {
        let mut curr = self.state.load(SeqCst);

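        // CAS loop: set the `ELAPSED` bit unless the entry has already
        // elapsed or its stored deadline is later than the tick being fired
        // (i.e. the entry was concurrently reset and must not fire now).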
        loop {
            if is_elapsed(curr) || curr > when {
                return;
            }

            let next = ELAPSED | curr;
            let actual = self.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        self.task.notify();
    }

    pub fn error(&self) {
        // Only transition to the error state if not currently elapsed
        let mut curr = self.state.load(SeqCst);

        loop {
            if is_elapsed(curr) {
                return;
            }

            let next = ERROR;

            let actual = self.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        self.task.notify();
    }

    pub fn cancel(entry: &Arc<Entry>) {
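        // Mark the entry as elapsed. The previous value tells us whether the
        // timer still needs to be notified so it can remove the entry from
        // its wheel.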
        let state = entry.state.fetch_or(ELAPSED, SeqCst);

        if is_elapsed(state) {
            // Nothing more to do
            return;
        }

        // If registered with a timer instance, try to upgrade the Arc.
        let inner = match entry.upgrade_inner() {
            Some(inner) => inner,
            None => return,
        };

        let _ = inner.queue(entry);
    }

    pub fn poll_elapsed(&self) -> Poll<(), Error> {
        use futures::Async::NotReady;

        let mut curr = self.state.load(SeqCst);

        if is_elapsed(curr) {
            if curr == ERROR {
                return Err(Error::shutdown());
            } else {
                return Ok(().into());
            }
        }

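        // Register the current task before re-checking the state; loading a
        // second time after registration avoids missing a notification that
        // raced with the first load.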
        self.task.register();

        curr = self.state.load(SeqCst);

        if is_elapsed(curr) {
            if curr == ERROR {
                return Err(Error::shutdown());
            } else {
                return Ok(().into());
            }
        }

        Ok(NotReady)
    }

    /// Only called by `Registration`
    pub fn reset(entry: &mut Arc<Entry>) {
        if !entry.is_registered() {
            return;
        }

        let inner = match entry.upgrade_inner() {
            Some(inner) => inner,
            None => return,
        };

        let deadline = entry.time_ref().deadline;
        let when = inner.normalize_deadline(deadline);
        let elapsed = inner.elapsed();

        let mut curr = entry.state.load(SeqCst);
        let mut notify;

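        // CAS loop: move `state` to the new expiration tick, or straight to
        // `ELAPSED` if the new deadline has already passed. `notify` records
        // whether the timer thread must be woken to observe the change.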
        loop {
            // In these two cases, there is no work to do when resetting the
            // timer. If the `Entry` is in an error state, then it cannot be
            // used anymore. If resetting the entry to the current value, then
            // the reset is a noop.
            if curr == ERROR || curr == when {
                return;
            }

            let next;

            if when <= elapsed {
                next = ELAPSED;
                notify = !is_elapsed(curr);
            } else {
                next = when;
                notify = true;
            }

            let actual = entry.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        if notify {
            let _ = inner.queue(entry);
        }
    }

    fn upgrade_inner(&self) -> Option<Arc<Inner>> {
        self.inner.as_ref().and_then(|inner| inner.upgrade())
    }
}

fn is_elapsed(state: u64) -> bool {
    state & ELAPSED == ELAPSED
}

impl Drop for Entry {
    fn drop(&mut self) {
        let inner = match self.upgrade_inner() {
            Some(inner) => inner,
            None => return,
        };

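        // Undo the active-timeout count incremented in `register_with`.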
        inner.decrement();
    }
}

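// Safety (informal): the `UnsafeCell` and raw-pointer fields are only accessed
// by the thread that owns the `Registration` or by the timer thread, as
// documented on each field, which is what makes sharing `Entry` across threads
// sound.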
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}