//! Notify async tasks or threads.
//!
//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
//!
//! You can use this crate to turn non-blocking data structures into async or blocking data
//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
//! for acquiring locks.
//!
//! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
//! [simple mutex]: https://github.com/stjepang/event-listener/blob/master/examples/mutex.rs
//!
//! # Examples
//!
//! Wait until another thread sets a boolean flag:
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use std::thread;
//! use std::time::Duration;
//! use std::usize;
//! use event_listener::Event;
//!
//! let flag = Arc::new(AtomicBool::new(false));
//! let event = Arc::new(Event::new());
//!
//! // Spawn a thread that will set the flag after 1 second.
//! thread::spawn({
//!     let flag = flag.clone();
//!     let event = event.clone();
//!     move || {
//!         // Wait for a second.
//!         thread::sleep(Duration::from_secs(1));
//!
//!         // Set the flag.
//!         flag.store(true, Ordering::SeqCst);
//!
//!         // Notify all listeners that the flag has been set.
//!         event.notify(usize::MAX);
//!     }
//! });
//!
//! // Wait until the flag is set.
//! loop {
//!     // Check the flag.
//!     if flag.load(Ordering::SeqCst) {
//!         break;
//!     }
//!
//!     // Start listening for events.
//!     let listener = event.listen();
//!
//!     // Check the flag again after creating the listener.
//!     if flag.load(Ordering::SeqCst) {
//!         break;
//!     }
//!
//!     // Wait for a notification and continue the loop.
//!     listener.wait();
//! }
//! ```
| |
| #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] |
| |
| use std::cell::{Cell, UnsafeCell}; |
| use std::fmt; |
| use std::future::Future; |
| use std::mem::{self, ManuallyDrop}; |
| use std::ops::{Deref, DerefMut}; |
| use std::panic::{RefUnwindSafe, UnwindSafe}; |
| use std::pin::Pin; |
| use std::ptr::{self, NonNull}; |
| use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; |
| use std::sync::{Arc, Mutex, MutexGuard}; |
| use std::task::{Context, Poll, Waker}; |
| use std::thread::{self, Thread}; |
| use std::time::{Duration, Instant}; |
| use std::usize; |
| |
| /// Inner state of [`Event`]. |
| struct Inner { |
| /// The number of notified entries, or `usize::MAX` if all of them have been notified. |
| /// |
| /// If there are no entries, this value is set to `usize::MAX`. |
| notified: AtomicUsize, |
| |
| /// A linked list holding registered listeners. |
| list: Mutex<List>, |
| |
| /// A single cached list entry to avoid allocations on the fast path of the insertion. |
| cache: UnsafeCell<Entry>, |
| } |
| |
| impl Inner { |
| /// Locks the list. |
| fn lock(&self) -> ListGuard<'_> { |
| ListGuard { |
| inner: self, |
| guard: self.list.lock().unwrap(), |
| } |
| } |
| |
| /// Returns the pointer to the single cached list entry. |
| #[inline(always)] |
| fn cache_ptr(&self) -> NonNull<Entry> { |
| unsafe { NonNull::new_unchecked(self.cache.get()) } |
| } |
| } |
| |
| /// A synchronization primitive for notifying async tasks and threads. |
| /// |
| /// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners: |
| /// |
| /// 1. [`Event::notify()`] notifies a number of listeners. |
| /// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners. |
| /// |
| /// If there are no active listeners at the time a notification is sent, it simply gets lost. |
| /// |
| /// There are two ways for a listener to wait for a notification: |
| /// |
| /// 1. In an asynchronous manner using `.await`. |
| /// 2. In a blocking manner by calling [`EventListener::wait()`] on it. |
| /// |
| /// If a notified listener is dropped without receiving a notification, dropping will notify |
| /// another active listener. Whether one *additional* listener will be notified depends on what |
| /// kind of notification was delivered. |
| /// |
| /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. |
| pub struct Event { |
| /// A pointer to heap-allocated inner state. |
| /// |
| /// This pointer is initially null and gets lazily initialized on first use. Semantically, it |
| /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s |
| /// reference count. |
| inner: AtomicPtr<Inner>, |
| } |
| |
| unsafe impl Send for Event {} |
| unsafe impl Sync for Event {} |
| |
| impl UnwindSafe for Event {} |
| impl RefUnwindSafe for Event {} |
| |
| impl Event { |
| /// Creates a new [`Event`]. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// ``` |
| #[inline] |
| pub const fn new() -> Event { |
| Event { |
| inner: AtomicPtr::new(ptr::null_mut()), |
| } |
| } |
| |
| /// Returns a guard listening for a notification. |
| /// |
| /// This method emits a `SeqCst` fence after registering a listener. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener = event.listen(); |
| /// ``` |
| #[cold] |
| pub fn listen(&self) -> EventListener { |
| let inner = self.inner(); |
| let listener = EventListener { |
| inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, |
| entry: Some(inner.lock().insert(inner.cache_ptr())), |
| }; |
| |
| // Make sure the listener is registered before whatever happens next. |
| full_fence(); |
| listener |
| } |
| |
| /// Notifies a number of active listeners. |
| /// |
| /// The number is allowed to be zero or exceed the current number of listeners. |
| /// |
| /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` |
| /// listeners among the active ones are notified. |
| /// |
| /// This method emits a `SeqCst` fence before notifying listeners. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// |
| /// // This notification gets lost because there are no listeners. |
| /// event.notify(1); |
| /// |
| /// let listener1 = event.listen(); |
| /// let listener2 = event.listen(); |
| /// let listener3 = event.listen(); |
| /// |
| /// // Notifies two listeners. |
| /// // |
| /// // Listener queueing is fair, which means `listener1` and `listener2` |
| /// // get notified here since they start listening before `listener3`. |
| /// event.notify(2); |
| /// ``` |
| #[inline] |
| pub fn notify(&self, n: usize) { |
| // Make sure the notification comes after whatever triggered it. |
| full_fence(); |
| |
| if let Some(inner) = self.try_inner() { |
| // Notify if there is at least one unnotified listener and the number of notified |
| // listeners is less than `n`. |
| if inner.notified.load(Ordering::Acquire) < n { |
| inner.lock().notify(n); |
| } |
| } |
| } |
| |
| /// Notifies a number of active listeners without emitting a `SeqCst` fence. |
| /// |
| /// The number is allowed to be zero or exceed the current number of listeners. |
| /// |
| /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` |
| /// listeners among the active ones are notified. |
| /// |
| /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// use std::sync::atomic::{self, Ordering}; |
| /// |
| /// let event = Event::new(); |
| /// |
| /// // This notification gets lost because there are no listeners. |
| /// event.notify(1); |
| /// |
| /// let listener1 = event.listen(); |
| /// let listener2 = event.listen(); |
| /// let listener3 = event.listen(); |
| /// |
| /// // We should emit a fence manually when using relaxed notifications. |
| /// atomic::fence(Ordering::SeqCst); |
| /// |
| /// // Notifies two listeners. |
| /// // |
| /// // Listener queueing is fair, which means `listener1` and `listener2` |
| /// // get notified here since they start listening before `listener3`. |
| /// event.notify(2); |
| /// ``` |
| #[inline] |
| pub fn notify_relaxed(&self, n: usize) { |
| if let Some(inner) = self.try_inner() { |
| // Notify if there is at least one unnotified listener and the number of notified |
| // listeners is less than `n`. |
| if inner.notified.load(Ordering::Acquire) < n { |
| inner.lock().notify(n); |
| } |
| } |
| } |
| |
| /// Notifies a number of active and still unnotified listeners. |
| /// |
| /// The number is allowed to be zero or exceed the current number of listeners. |
| /// |
| /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that |
| /// were previously unnotified. |
| /// |
| /// This method emits a `SeqCst` fence before notifying listeners. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// |
| /// // This notification gets lost because there are no listeners. |
| /// event.notify(1); |
| /// |
| /// let listener1 = event.listen(); |
| /// let listener2 = event.listen(); |
| /// let listener3 = event.listen(); |
| /// |
| /// // Notifies two listeners. |
| /// // |
| /// // Listener queueing is fair, which means `listener1` and `listener2` |
| /// // get notified here since they start listening before `listener3`. |
| /// event.notify_additional(1); |
| /// event.notify_additional(1); |
| /// ``` |
| #[inline] |
| pub fn notify_additional(&self, n: usize) { |
| // Make sure the notification comes after whatever triggered it. |
| full_fence(); |
| |
| if let Some(inner) = self.try_inner() { |
| // Notify if there is at least one unnotified listener. |
| if inner.notified.load(Ordering::Acquire) < usize::MAX { |
| inner.lock().notify_additional(n); |
| } |
| } |
| } |
| |
| /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst` |
| /// fence. |
| /// |
| /// The number is allowed to be zero or exceed the current number of listeners. |
| /// |
| /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that |
| /// were previously unnotified. |
| /// |
| /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// use std::sync::atomic::{self, Ordering}; |
| /// |
| /// let event = Event::new(); |
| /// |
| /// // This notification gets lost because there are no listeners. |
| /// event.notify(1); |
| /// |
| /// let listener1 = event.listen(); |
| /// let listener2 = event.listen(); |
| /// let listener3 = event.listen(); |
| /// |
| /// // We should emit a fence manually when using relaxed notifications. |
| /// atomic::fence(Ordering::SeqCst); |
| /// |
| /// // Notifies two listeners. |
| /// // |
| /// // Listener queueing is fair, which means `listener1` and `listener2` |
| /// // get notified here since they start listening before `listener3`. |
| /// event.notify_additional_relaxed(1); |
| /// event.notify_additional_relaxed(1); |
| /// ``` |
| #[inline] |
| pub fn notify_additional_relaxed(&self, n: usize) { |
| if let Some(inner) = self.try_inner() { |
| // Notify if there is at least one unnotified listener. |
| if inner.notified.load(Ordering::Acquire) < usize::MAX { |
| inner.lock().notify_additional(n); |
| } |
| } |
| } |
| |
| /// Returns a reference to the inner state if it was initialized. |
| #[inline] |
| fn try_inner(&self) -> Option<&Inner> { |
| let inner = self.inner.load(Ordering::Acquire); |
| unsafe { inner.as_ref() } |
| } |
| |
| /// Returns a reference to the inner state, initializing it if necessary. |
| fn inner(&self) -> &Inner { |
| let mut inner = self.inner.load(Ordering::Acquire); |
| |
| // Initialize the state if this is its first use. |
| if inner.is_null() { |
| // Allocate on the heap. |
| let new = Arc::new(Inner { |
| notified: AtomicUsize::new(usize::MAX), |
| list: std::sync::Mutex::new(List { |
| head: None, |
| tail: None, |
| start: None, |
| len: 0, |
| notified: 0, |
| cache_used: false, |
| }), |
| cache: UnsafeCell::new(Entry { |
| state: Cell::new(State::Created), |
| prev: Cell::new(None), |
| next: Cell::new(None), |
| }), |
| }); |
| // Convert the heap-allocated state into a raw pointer. |
| let new = Arc::into_raw(new) as *mut Inner; |
| |
| // Attempt to replace the null-pointer with the new state pointer. |
| inner = self.inner.compare_and_swap(inner, new, Ordering::AcqRel); |
| |
| // Check if the old pointer value was indeed null. |
| if inner.is_null() { |
| // If yes, then use the new state pointer. |
| inner = new; |
| } else { |
| // If not, that means a concurrent operation has initialized the state. |
| // In that case, use the old pointer and deallocate the new one. |
| unsafe { |
| drop(Arc::from_raw(new)); |
| } |
| } |
| } |
| |
| unsafe { &*inner } |
| } |
| } |
| |
| impl Drop for Event { |
| #[inline] |
| fn drop(&mut self) { |
| let inner: *mut Inner = *self.inner.get_mut(); |
| |
| // If the state pointer has been initialized, deallocate it. |
| if !inner.is_null() { |
| unsafe { |
| drop(Arc::from_raw(inner)); |
| } |
| } |
| } |
| } |
| |
| impl fmt::Debug for Event { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("Event { .. }") |
| } |
| } |
| |
| impl Default for Event { |
| fn default() -> Event { |
| Event::new() |
| } |
| } |
| |
| /// A guard waiting for a notification from an [`Event`]. |
| /// |
| /// There are two ways for a listener to wait for a notification: |
| /// |
| /// 1. In an asynchronous manner using `.await`. |
| /// 2. In a blocking manner by calling [`EventListener::wait()`] on it. |
| /// |
| /// If a notified listener is dropped without receiving a notification, dropping will notify |
| /// another active listener. Whether one *additional* listener will be notified depends on what |
| /// kind of notification was delivered. |
| pub struct EventListener { |
| /// A reference to [`Event`]'s inner state. |
| inner: Arc<Inner>, |
| |
| /// A pointer to this listener's entry in the linked list. |
| entry: Option<NonNull<Entry>>, |
| } |
| |
| unsafe impl Send for EventListener {} |
| unsafe impl Sync for EventListener {} |
| |
| impl UnwindSafe for EventListener {} |
| impl RefUnwindSafe for EventListener {} |
| |
| impl EventListener { |
| /// Blocks until a notification is received. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener = event.listen(); |
| /// |
| /// // Notify `listener`. |
| /// event.notify(1); |
| /// |
| /// // Receive the notification. |
| /// listener.wait(); |
| /// ``` |
| pub fn wait(self) { |
| self.wait_internal(None); |
| } |
| |
| /// Blocks until a notification is received or a timeout is reached. |
| /// |
| /// Returns `true` if a notification was received. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::time::Duration; |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener = event.listen(); |
| /// |
| /// // There are no notification so this times out. |
| /// assert!(!listener.wait_timeout(Duration::from_secs(1))); |
| /// ``` |
| pub fn wait_timeout(self, timeout: Duration) -> bool { |
| self.wait_internal(Some(Instant::now() + timeout)) |
| } |
| |
| /// Blocks until a notification is received or a deadline is reached. |
| /// |
| /// Returns `true` if a notification was received. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use std::time::{Duration, Instant}; |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener = event.listen(); |
| /// |
| /// // There are no notification so this times out. |
| /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); |
| /// ``` |
| pub fn wait_deadline(self, deadline: Instant) -> bool { |
| self.wait_internal(Some(deadline)) |
| } |
| |
| /// Drops this listener and discards its notification (if any) without notifying another |
| /// active listener. |
| /// |
| /// Returns `true` if a notification was discarded. |
| /// |
| /// # Examples |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener1 = event.listen(); |
| /// let listener2 = event.listen(); |
| /// |
| /// event.notify(1); |
| /// |
| /// assert!(listener1.discard()); |
| /// assert!(!listener2.discard()); |
| /// ``` |
| pub fn discard(mut self) -> bool { |
| // If this listener has never picked up a notification... |
| if let Some(entry) = self.entry.take() { |
| let mut list = self.inner.lock(); |
| // Remove the listener from the list and return `true` if it was notified. |
| if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) { |
| return true; |
| } |
| } |
| false |
| } |
| |
| /// Returns `true` if this listener listens to the given `Event`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener = event.listen(); |
| /// |
| /// assert!(listener.listens_to(&event)); |
| /// ``` |
| #[inline] |
| pub fn listens_to(&self, event: &Event) -> bool { |
| ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire)) |
| } |
| |
| /// Returns `true` if both listeners listen to the same `Event`. |
| /// |
| /// # Examples |
| /// |
| /// ``` |
| /// use event_listener::Event; |
| /// |
| /// let event = Event::new(); |
| /// let listener1 = event.listen(); |
| /// let listener2 = event.listen(); |
| /// |
| /// assert!(listener1.same_event(&listener2)); |
| /// ``` |
| pub fn same_event(&self, other: &EventListener) -> bool { |
| ptr::eq::<Inner>(&*self.inner, &*other.inner) |
| } |
| |
| fn wait_internal(mut self, deadline: Option<Instant>) -> bool { |
| // Take out the entry pointer and set it to `None`. |
| let entry = match self.entry.take() { |
| None => unreachable!("cannot wait twice on an `EventListener`"), |
| Some(entry) => entry, |
| }; |
| |
| // Set this listener's state to `Waiting`. |
| { |
| let mut list = self.inner.lock(); |
| let e = unsafe { entry.as_ref() }; |
| |
| // Do a dummy replace operation in order to take out the state. |
| match e.state.replace(State::Notified(false)) { |
| State::Notified(_) => { |
| // If this listener has been notified, remove it from the list and return. |
| list.remove(entry, self.inner.cache_ptr()); |
| return true; |
| } |
| // Otherwise, set the state to `Waiting`. |
| _ => e.state.set(State::Waiting(thread::current())), |
| } |
| } |
| |
| // Wait until a notification is received or the timeout is reached. |
| loop { |
| match deadline { |
| None => thread::park(), |
| |
| Some(deadline) => { |
| // Check for timeout. |
| let now = Instant::now(); |
| if now >= deadline { |
| // Remove the entry and check if notified. |
| return self |
| .inner |
| .lock() |
| .remove(entry, self.inner.cache_ptr()) |
| .is_notified(); |
| } |
| |
| // Park until the deadline. |
| thread::park_timeout(deadline - now); |
| } |
| } |
| |
| let mut list = self.inner.lock(); |
| let e = unsafe { entry.as_ref() }; |
| |
| // Do a dummy replace operation in order to take out the state. |
| match e.state.replace(State::Notified(false)) { |
| State::Notified(_) => { |
| // If this listener has been notified, remove it from the list and return. |
| list.remove(entry, self.inner.cache_ptr()); |
| return true; |
| } |
| // Otherwise, set the state back to `Waiting`. |
| state => e.state.set(state), |
| } |
| } |
| } |
| } |
| |
| impl fmt::Debug for EventListener { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.pad("EventListener { .. }") |
| } |
| } |
| |
| impl Future for EventListener { |
| type Output = (); |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut list = self.inner.lock(); |
| |
| let entry = match self.entry { |
| None => unreachable!("cannot poll a completed `EventListener` future"), |
| Some(entry) => entry, |
| }; |
| let state = unsafe { &entry.as_ref().state }; |
| |
| // Do a dummy replace operation in order to take out the state. |
| match state.replace(State::Notified(false)) { |
| State::Notified(_) => { |
| // If this listener has been notified, remove it from the list and return. |
| list.remove(entry, self.inner.cache_ptr()); |
| drop(list); |
| self.entry = None; |
| return Poll::Ready(()); |
| } |
| State::Created => { |
| // If the listener was just created, put it in the `Polling` state. |
| state.set(State::Polling(cx.waker().clone())); |
| } |
| State::Polling(w) => { |
| // If the listener was in the `Polling` state, update the waker. |
| if w.will_wake(cx.waker()) { |
| state.set(State::Polling(w)); |
| } else { |
| state.set(State::Polling(cx.waker().clone())); |
| } |
| } |
| State::Waiting(_) => { |
| unreachable!("cannot poll and wait on `EventListener` at the same time") |
| } |
| } |
| |
| Poll::Pending |
| } |
| } |
| |
| impl Drop for EventListener { |
| fn drop(&mut self) { |
| // If this listener has never picked up a notification... |
| if let Some(entry) = self.entry.take() { |
| let mut list = self.inner.lock(); |
| |
| // But if a notification was delivered to it... |
| if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) { |
| // Then pass it on to another active listener. |
| if additional { |
| list.notify_additional(1); |
| } else { |
| list.notify(1); |
| } |
| } |
| } |
| } |
| } |
| |
| /// A guard holding the linked list locked. |
| struct ListGuard<'a> { |
| /// A reference to [`Event`]'s inner state. |
| inner: &'a Inner, |
| |
| /// The actual guard that acquired the linked list. |
| guard: MutexGuard<'a, List>, |
| } |
| |
| impl Drop for ListGuard<'_> { |
| #[inline] |
| fn drop(&mut self) { |
| let list = &mut **self; |
| |
| // Update the atomic `notified` counter. |
| let notified = if list.notified < list.len { |
| list.notified |
| } else { |
| usize::MAX |
| }; |
| self.inner.notified.store(notified, Ordering::Release); |
| } |
| } |
| |
| impl Deref for ListGuard<'_> { |
| type Target = List; |
| |
| #[inline] |
| fn deref(&self) -> &List { |
| &*self.guard |
| } |
| } |
| |
| impl DerefMut for ListGuard<'_> { |
| #[inline] |
| fn deref_mut(&mut self) -> &mut List { |
| &mut *self.guard |
| } |
| } |
| |
/// The state a listener's entry can be in.
enum State {
    /// The listener has been created and nobody is waiting on it yet.
    Created,

    /// The listener has received a notification.
    ///
    /// The `bool` is `true` if this was an "additional" notification.
    Notified(bool),

    /// An async task is polling the listener; the waker wakes that task.
    Polling(Waker),

    /// A thread is blocked on the listener.
    Waiting(Thread),
}

impl State {
    /// Returns `true` if this is the `Notified` state, regardless of the notification kind.
    #[inline]
    fn is_notified(&self) -> bool {
        matches!(self, State::Notified(_))
    }
}
| |
| /// An entry representing a registered listener. |
| struct Entry { |
| /// THe state of this listener. |
| state: Cell<State>, |
| |
| /// Previous entry in the linked list. |
| prev: Cell<Option<NonNull<Entry>>>, |
| |
| /// Next entry in the linked list. |
| next: Cell<Option<NonNull<Entry>>>, |
| } |
| |
| /// A linked list of entries. |
| struct List { |
| /// First entry in the list. |
| head: Option<NonNull<Entry>>, |
| |
| /// Last entry in the list. |
| tail: Option<NonNull<Entry>>, |
| |
| /// The first unnotified entry in the list. |
| start: Option<NonNull<Entry>>, |
| |
| /// Total number of entries in the list. |
| len: usize, |
| |
| /// The number of notified entries in the list. |
| notified: usize, |
| |
| /// Whether the cached entry is used. |
| cache_used: bool, |
| } |
| |
| impl List { |
| /// Inserts a new entry into the list. |
| fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> { |
| unsafe { |
| let entry = Entry { |
| state: Cell::new(State::Created), |
| prev: Cell::new(self.tail), |
| next: Cell::new(None), |
| }; |
| |
| let entry = if self.cache_used { |
| // Allocate an entry that is going to become the new tail. |
| NonNull::new_unchecked(Box::into_raw(Box::new(entry))) |
| } else { |
| // No need to allocate - we can use the cached entry. |
| self.cache_used = true; |
| cache.as_ptr().write(entry); |
| cache |
| }; |
| |
| // Replace the tail with the new entry. |
| match mem::replace(&mut self.tail, Some(entry)) { |
| None => self.head = Some(entry), |
| Some(t) => t.as_ref().next.set(Some(entry)), |
| } |
| |
| // If there were no unnotified entries, this one is the first now. |
| if self.start.is_none() { |
| self.start = self.tail; |
| } |
| |
| // Bump the entry count. |
| self.len += 1; |
| |
| entry |
| } |
| } |
| |
| /// Removes an entry from the list and returns its state. |
| fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State { |
| unsafe { |
| let prev = entry.as_ref().prev.get(); |
| let next = entry.as_ref().next.get(); |
| |
| // Unlink from the previous entry. |
| match prev { |
| None => self.head = next, |
| Some(p) => p.as_ref().next.set(next), |
| } |
| |
| // Unlink from the next entry. |
| match next { |
| None => self.tail = prev, |
| Some(n) => n.as_ref().prev.set(prev), |
| } |
| |
| // If this was the first unnotified entry, move the pointer to the next one. |
| if self.start == Some(entry) { |
| self.start = next; |
| } |
| |
| // Extract the state. |
| let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) { |
| // Free the cached entry. |
| self.cache_used = false; |
| entry.as_ref().state.replace(State::Created) |
| } else { |
| // Deallocate the entry. |
| Box::from_raw(entry.as_ptr()).state.into_inner() |
| }; |
| |
| // Update the counters. |
| if state.is_notified() { |
| self.notified -= 1; |
| } |
| self.len -= 1; |
| |
| state |
| } |
| } |
| |
| /// Notifies a number of entries. |
| #[cold] |
| fn notify(&mut self, mut n: usize) { |
| if n <= self.notified { |
| return; |
| } |
| n -= self.notified; |
| |
| while n > 0 { |
| n -= 1; |
| |
| // Notify the first unnotified entry. |
| match self.start { |
| None => break, |
| Some(e) => { |
| // Get the entry and move the pointer forward. |
| let e = unsafe { e.as_ref() }; |
| self.start = e.next.get(); |
| |
| // Set the state of this entry to `Notified` and notify. |
| match e.state.replace(State::Notified(false)) { |
| State::Notified(_) => {} |
| State::Created => {} |
| State::Polling(w) => w.wake(), |
| State::Waiting(t) => t.unpark(), |
| } |
| |
| // Update the counter. |
| self.notified += 1; |
| } |
| } |
| } |
| } |
| |
| /// Notifies a number of additional entries. |
| #[cold] |
| fn notify_additional(&mut self, mut n: usize) { |
| while n > 0 { |
| n -= 1; |
| |
| // Notify the first unnotified entry. |
| match self.start { |
| None => break, |
| Some(e) => { |
| // Get the entry and move the pointer forward. |
| let e = unsafe { e.as_ref() }; |
| self.start = e.next.get(); |
| |
| // Set the state of this entry to `Notified` and notify. |
| match e.state.replace(State::Notified(true)) { |
| State::Notified(_) => {} |
| State::Created => {} |
| State::Polling(w) => w.wake(), |
| State::Waiting(t) => t.unpark(), |
| } |
| |
| // Update the counter. |
| self.notified += 1; |
| } |
| } |
| } |
| } |
| } |
| |
/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
    if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
        // HACK(stjepang): On x86 architectures there are two different ways of executing
        // a `SeqCst` fence.
        //
        // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
        // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
        //    instruction.
        //
        // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
        // that the second one is sometimes a bit faster.
        //
        // The ideal solution here would be to use inline assembly, but we're instead creating a
        // temporary atomic variable and compare-and-exchanging its value. No sane compiler for
        // x86 platforms is going to optimize this away.
        let a = AtomicUsize::new(0);
        // Only the barrier effect matters; the outcome of the exchange is irrelevant.
        let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
    } else {
        atomic::fence(Ordering::SeqCst);
    }
}