| use {sys, Evented, EventSet, PollOpt, Selector, Token}; |
| use util::BoundedQueue; |
| use std::{fmt, cmp, io}; |
| use std::sync::Arc; |
| use std::sync::atomic::AtomicIsize; |
| use std::sync::atomic::Ordering::Relaxed; |
| |
| const SLEEP: isize = -1; |
| const CLOSED: isize = -2; |
| |
| /// Send notifications to the event loop, waking it up if necessary. If the |
| /// event loop is not currently sleeping, avoid using an OS wake-up strategy |
| /// (eventfd, pipe, ...). Backed by a pre-allocated lock free MPMC queue. |
| /// |
| /// TODO: Use more efficient wake-up strategy if available |
| pub struct Notify<M: Send> { |
| inner: Arc<NotifyInner<M>> |
| } |
| |
| impl<M: Send> Notify<M> { |
| #[inline] |
| pub fn with_capacity(capacity: usize) -> io::Result<Notify<M>> { |
| Ok(Notify { |
| inner: Arc::new(try!(NotifyInner::with_capacity(capacity))) |
| }) |
| } |
| |
| #[inline] |
| pub fn check(&self, max: usize, will_sleep: bool) -> usize { |
| self.inner.check(max, will_sleep) |
| } |
| |
| #[inline] |
| pub fn notify(&self, value: M) -> Result<(), NotifyError<M>> { |
| self.inner.notify(value) |
| } |
| |
| #[inline] |
| pub fn poll(&self) -> Option<M> { |
| self.inner.poll() |
| } |
| |
| #[inline] |
| pub fn cleanup(&self) { |
| self.inner.cleanup(); |
| } |
| |
| #[inline] |
| pub fn close(&self) { |
| self.inner.close(); |
| } |
| } |
| |
| impl<M: Send> Clone for Notify<M> { |
| fn clone(&self) -> Notify<M> { |
| Notify { |
| inner: self.inner.clone() |
| } |
| } |
| } |
| |
| impl<M> fmt::Debug for Notify<M> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| write!(fmt, "Notify<?>") |
| } |
| } |
| |
| unsafe impl<M: Send> Sync for Notify<M> { } |
| unsafe impl<M: Send> Send for Notify<M> { } |
| |
| struct NotifyInner<M> { |
| state: AtomicIsize, |
| queue: BoundedQueue<M>, |
| awaken: sys::Awakener |
| } |
| |
| impl<M: Send> NotifyInner<M> { |
| fn with_capacity(capacity: usize) -> io::Result<NotifyInner<M>> { |
| Ok(NotifyInner { |
| state: AtomicIsize::new(0), |
| queue: BoundedQueue::with_capacity(capacity), |
| awaken: try!(sys::Awakener::new()) |
| }) |
| } |
| |
| fn check(&self, max: usize, will_sleep: bool) -> usize { |
| let max = max as isize; |
| let mut cur = self.state.load(Relaxed); |
| let mut nxt; |
| let mut val; |
| |
| loop { |
| // This should be impossible if close() in only called from the event loop destructor |
| debug_assert!(cur != CLOSED); |
| |
| // If there are pending messages, then whether or not the event loop |
| // was planning to sleep does not matter - it will not sleep. |
| if cur > 0 { |
| if max >= cur { |
| nxt = 0; |
| } else { |
| nxt = cur - max; |
| } |
| } else { |
| if will_sleep { |
| nxt = SLEEP; |
| } else { |
| nxt = 0; |
| } |
| } |
| |
| val = self.state.compare_and_swap(cur, nxt, Relaxed); |
| |
| if val == cur { |
| break; |
| } |
| |
| cur = val; |
| } |
| |
| if cur < 0 { |
| 0 |
| } else { |
| cmp::min(cur, max) as usize |
| } |
| } |
| |
| fn poll(&self) -> Option<M> { |
| self.queue.pop() |
| } |
| |
| fn notify(&self, value: M) -> Result<(), NotifyError<M>> { |
| let mut cur = self.state.load(Relaxed); |
| |
| if cur == CLOSED { |
| // The receiving end has already hung up |
| return Err(NotifyError::Closed(Some(value))); |
| } |
| |
| // First, push the message onto the queue |
| if let Err(value) = self.queue.push(value) { |
| return Err(NotifyError::Full(value)); |
| } |
| |
| let mut nxt; |
| let mut val; |
| |
| loop { |
| nxt = match cur { |
| CLOSED => { |
| // The receiving end has hung up, and we cannot reliably get our message back |
| // We poll 1 message from the queue to make sure that no message is stuck |
| let _ = self.queue.pop(); |
| return Err(NotifyError::Closed(None)); |
| } |
| SLEEP => { 1 } |
| _ => { cur + 1 } |
| }; |
| |
| val = self.state.compare_and_swap(cur, nxt, Relaxed); |
| |
| if val == cur { |
| break; |
| } |
| |
| cur = val; |
| } |
| |
| if cur == SLEEP { |
| if let Err(e) = self.awaken.wakeup() { |
| return Err(NotifyError::Io(e)); |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| fn close(&self) { |
| self.state.swap(CLOSED, Relaxed); |
| while let Some(m) = self.queue.pop() { |
| drop(m); |
| } |
| } |
| |
| fn cleanup(&self) { |
| self.awaken.cleanup(); |
| } |
| } |
| |
| impl<M: Send> Evented for Notify<M> { |
| fn register(&self, selector: &mut Selector, token: Token, interest: EventSet, opts: PollOpt) -> io::Result<()> { |
| self.inner.awaken.register(selector, token, interest, opts) |
| } |
| |
| fn reregister(&self, selector: &mut Selector, token: Token, interest: EventSet, opts: PollOpt) -> io::Result<()> { |
| self.inner.awaken.reregister(selector, token, interest, opts) |
| } |
| |
| fn deregister(&self, selector: &mut Selector) -> io::Result<()> { |
| self.inner.awaken.deregister(selector) |
| } |
| } |
| |
| pub enum NotifyError<T> { |
| Io(io::Error), |
| Full(T), |
| Closed(Option<T>), |
| } |
| |
| impl<M> fmt::Debug for NotifyError<M> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| match *self { |
| NotifyError::Io(ref e) => { |
| write!(fmt, "NotifyError::Io({:?})", e) |
| } |
| NotifyError::Full(..) => { |
| write!(fmt, "NotifyError::Full(..)") |
| } |
| NotifyError::Closed(..) => { |
| write!(fmt, "NotifyError::Closed(..)") |
| } |
| } |
| } |
| } |