| use {Handler, Evented, Poll, NotifyError, Token}; |
| use event::{IoEvent, EventSet, PollOpt}; |
| use notify::Notify; |
| use timer::{Timer, Timeout, TimerResult}; |
| use std::default::Default; |
| use std::{io, fmt, thread, usize}; |
| |
| /// Configure EventLoop runtime details |
| #[derive(Copy, Clone, Debug)] |
| pub struct EventLoopConfig { |
| pub io_poll_timeout_ms: usize, |
| |
| // == Notifications == |
| pub notify_capacity: usize, |
| pub messages_per_tick: usize, |
| |
| // == Timer == |
| pub timer_tick_ms: u64, |
| pub timer_wheel_size: usize, |
| pub timer_capacity: usize, |
| } |
| |
| impl Default for EventLoopConfig { |
| fn default() -> EventLoopConfig { |
| EventLoopConfig { |
| io_poll_timeout_ms: 1_000, |
| notify_capacity: 4_096, |
| messages_per_tick: 256, |
| timer_tick_ms: 100, |
| timer_wheel_size: 1_024, |
| timer_capacity: 65_536, |
| } |
| } |
| } |
| |
| /// Single threaded IO event loop. |
| #[derive(Debug)] |
| pub struct EventLoop<H: Handler> { |
| run: bool, |
| poll: Poll, |
| timer: Timer<H::Timeout>, |
| notify: Notify<H::Message>, |
| config: EventLoopConfig, |
| } |
| |
| // Token used to represent notifications |
| const NOTIFY: Token = Token(usize::MAX); |
| |
| impl<H: Handler> EventLoop<H> { |
| |
| /// Initializes a new event loop using default configuration settings. The |
| /// event loop will not be running yet. |
| pub fn new() -> io::Result<EventLoop<H>> { |
| EventLoop::configured(Default::default()) |
| } |
| |
| pub fn configured(config: EventLoopConfig) -> io::Result<EventLoop<H>> { |
| // Create the IO poller |
| let mut poll = try!(Poll::new()); |
| |
| // Create the timer |
| let mut timer = Timer::new( |
| config.timer_tick_ms, |
| config.timer_wheel_size, |
| config.timer_capacity); |
| |
| // Create cross thread notification queue |
| let notify = try!(Notify::with_capacity(config.notify_capacity)); |
| |
| // Register the notification wakeup FD with the IO poller |
| try!(poll.register(¬ify, NOTIFY, EventSet::readable() | EventSet::writable() , PollOpt::edge())); |
| |
| // Set the timer's starting time reference point |
| timer.setup(); |
| |
| Ok(EventLoop { |
| run: true, |
| poll: poll, |
| timer: timer, |
| notify: notify, |
| config: config, |
| }) |
| } |
| |
| /// Returns a sender that allows sending messages to the event loop in a |
| /// thread-safe way, waking up the event loop if needed. |
| /// |
| /// # Example |
| /// ``` |
| /// use std::thread; |
| /// use mio::{EventLoop, Handler}; |
| /// |
| /// struct MyHandler; |
| /// |
| /// impl Handler for MyHandler { |
| /// type Timeout = (); |
| /// type Message = u32; |
| /// |
| /// fn notify(&mut self, event_loop: &mut EventLoop<MyHandler>, msg: u32) { |
| /// assert_eq!(msg, 123); |
| /// event_loop.shutdown(); |
| /// } |
| /// } |
| /// |
| /// let mut event_loop = EventLoop::new().unwrap(); |
| /// let sender = event_loop.channel(); |
| /// |
| /// // Send the notification from another thread |
| /// thread::spawn(move || { |
| /// let _ = sender.send(123); |
| /// }); |
| /// |
| /// let _ = event_loop.run(&mut MyHandler); |
| /// ``` |
| /// |
| /// # Implementation Details |
| /// |
| /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated |
| /// buffer size. The size can be changed by modifying |
| /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#structfield.notify_capacity). |
| /// When a message is sent to the EventLoop, it is first pushed on to the |
| /// queue. Then, if the EventLoop is currently running, an atomic flag is |
| /// set to indicate that the next loop iteration should be started without |
| /// waiting. |
| /// |
| /// If the loop is blocked waiting for IO events, then it is woken up. The |
| /// strategy for waking up the event loop is platform dependent. For |
| /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe |
| /// is used. |
| /// |
| /// The strategy of setting an atomic flag if the event loop is not already |
| /// sleeping allows avoiding an expensive wakeup operation if at all possible. |
| pub fn channel(&self) -> Sender<H::Message> { |
| Sender::new(self.notify.clone()) |
| } |
| |
| /// Schedules a timeout after the requested time interval. When the |
| /// duration has been reached, |
| /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked |
| /// passing in the supplied token. |
| /// |
| /// Returns a handle to the timeout that can be used to cancel the timeout |
| /// using [#clear_timeout](#method.clear_timeout). |
| /// |
| /// # Example |
| /// ``` |
| /// use mio::{EventLoop, Handler}; |
| /// |
| /// struct MyHandler; |
| /// |
| /// impl Handler for MyHandler { |
| /// type Timeout = u32; |
| /// type Message = (); |
| /// |
| /// fn timeout(&mut self, event_loop: &mut EventLoop<MyHandler>, timeout: u32) { |
| /// assert_eq!(timeout, 123); |
| /// event_loop.shutdown(); |
| /// } |
| /// } |
| /// |
| /// |
| /// let mut event_loop = EventLoop::new().unwrap(); |
| /// let timeout = event_loop.timeout_ms(123, 300).unwrap(); |
| /// let _ = event_loop.run(&mut MyHandler); |
| /// ``` |
| pub fn timeout_ms(&mut self, token: H::Timeout, delay: u64) -> TimerResult<Timeout> { |
| self.timer.timeout_ms(token, delay) |
| } |
| |
| /// If the supplied timeout has not been triggered, cancel it such that it |
| /// will not be triggered in the future. |
| pub fn clear_timeout(&mut self, timeout: Timeout) -> bool { |
| self.timer.clear(timeout) |
| } |
| |
| /// Tells the event loop to exit after it is done handling all events in the |
| /// current iteration. |
| pub fn shutdown(&mut self) { |
| self.run = false; |
| } |
| |
| /// Indicates whether the event loop is currently running. If it's not it has either |
| /// stopped or is scheduled to stop on the next tick. |
| pub fn is_running(&self) -> bool { |
| self.run |
| } |
| |
| /// Registers an IO handle with the event loop. |
| pub fn register<E: ?Sized>(&mut self, io: &E, token: Token) -> io::Result<()> |
| where E: Evented |
| { |
| self.poll.register(io, token, EventSet::all(), PollOpt::level()) |
| } |
| |
| /// Registers an IO handle with the event loop. |
| pub fn register_opt<E: ?Sized>(&mut self, io: &E, token: Token, interest: EventSet, opt: PollOpt) -> io::Result<()> |
| where E: Evented |
| { |
| self.poll.register(io, token, interest, opt) |
| } |
| |
| /// Re-Registers an IO handle with the event loop. |
| pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: EventSet, opt: PollOpt) -> io::Result<()> |
| where E: Evented |
| { |
| self.poll.reregister(io, token, interest, opt) |
| } |
| |
| /// Keep spinning the event loop indefinitely, and notify the handler whenever |
| /// any of the registered handles are ready. |
| pub fn run(&mut self, handler: &mut H) -> io::Result<()> { |
| self.run = true; |
| |
| while self.run { |
| // Execute ticks as long as the event loop is running |
| try!(self.run_once(handler)); |
| } |
| |
| Ok(()) |
| } |
| |
| /// Deregisters an IO handle with the event loop. |
| pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented { |
| self.poll.deregister(io) |
| } |
| |
| /// Spin the event loop once, with a timeout of one second, and notify the |
| /// handler if any of the registered handles become ready during that |
| /// time. |
| pub fn run_once(&mut self, handler: &mut H) -> io::Result<()> { |
| let mut messages; |
| |
| trace!("event loop tick"); |
| |
| // Check the notify channel for any pending messages. If there are any, |
| // avoid blocking when polling for IO events. Messages will be |
| // processed after IO events. |
| messages = self.notify.check(self.config.messages_per_tick, true); |
| let pending = messages > 0; |
| |
| // Check the registered IO handles for any new events. Each poll |
| // is for one second, so a shutdown request can last as long as |
| // one second before it takes effect. |
| let events = match self.io_poll(pending) { |
| Ok(e) => e, |
| Err(err) => { |
| if err.kind() == io::ErrorKind::Interrupted { |
| handler.interrupted(self); |
| 0 |
| } else { |
| return Err(err); |
| } |
| } |
| }; |
| |
| if !pending { |
| // Indicate that the sleep period is over, also grab any additional |
| // messages |
| let remaining = self.config.messages_per_tick - messages; |
| messages += self.notify.check(remaining, false); |
| } |
| |
| self.io_process(handler, events); |
| self.notify(handler, messages); |
| self.timer_process(handler); |
| Ok(()) |
| } |
| |
| #[inline] |
| fn io_poll(&mut self, immediate: bool) -> io::Result<usize> { |
| if immediate { |
| self.poll.poll(0) |
| } else { |
| let mut sleep = self.timer.next_tick_in_ms() as usize; |
| |
| if sleep > self.config.io_poll_timeout_ms { |
| sleep = self.config.io_poll_timeout_ms; |
| } |
| |
| self.poll.poll(sleep) |
| } |
| } |
| |
| // Process IO events that have been previously polled |
| fn io_process(&mut self, handler: &mut H, cnt: usize) { |
| let mut i = 0; |
| |
| // Iterate over the notifications. Each event provides the token |
| // it was registered with (which usually represents, at least, the |
| // handle that the event is about) as well as information about |
| // what kind of event occurred (readable, writable, signal, etc.) |
| while i < cnt { |
| let evt = self.poll.event(i); |
| |
| trace!("event={:?}", evt); |
| |
| match evt.token { |
| NOTIFY => self.notify.cleanup(), |
| _ => self.io_event(handler, evt) |
| } |
| |
| i += 1; |
| } |
| } |
| |
| fn io_event(&mut self, handler: &mut H, evt: IoEvent) { |
| handler.ready(self, evt.token, evt.kind); |
| } |
| |
| fn notify(&mut self, handler: &mut H, mut cnt: usize) { |
| while cnt > 0 { |
| match self.notify.poll() { |
| Some(msg) => { |
| handler.notify(self, msg); |
| cnt -= 1; |
| }, |
| // If we expect messages, but the queue seems empty, a context |
| // switch has occurred in the queue's push() method between |
| // reserving a slot and marking that slot; let's spin for |
| // what should be a very brief period of time until the push |
| // is done. |
| None => thread::yield_now(), |
| } |
| } |
| } |
| |
| fn timer_process(&mut self, handler: &mut H) { |
| let now = self.timer.now(); |
| |
| loop { |
| match self.timer.tick_to(now) { |
| Some(t) => handler.timeout(self, t), |
| _ => return |
| } |
| } |
| } |
| } |
| |
| unsafe impl<H: Handler> Sync for EventLoop<H> { } |
| |
| impl <H: Handler> Drop for EventLoop<H> { |
| fn drop(&mut self) { |
| self.notify.close(); |
| } |
| } |
| |
| /// Sends messages to the EventLoop from other threads. |
| pub struct Sender<M: Send> { |
| notify: Notify<M> |
| } |
| |
| impl<M: Send> Clone for Sender<M> { |
| fn clone(&self) -> Sender<M> { |
| Sender { notify: self.notify.clone() } |
| } |
| } |
| |
| impl<M: Send> fmt::Debug for Sender<M> { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| write!(fmt, "Sender<?> {{ ... }}") |
| } |
| } |
| |
| unsafe impl<M: Send> Sync for Sender<M> { } |
| |
| impl<M: Send> Sender<M> { |
| fn new(notify: Notify<M>) -> Sender<M> { |
| Sender { notify: notify } |
| } |
| |
| pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> { |
| self.notify.notify(msg) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use std::str; |
| use std::sync::Arc; |
| use std::sync::atomic::AtomicIsize; |
| use std::sync::atomic::Ordering::SeqCst; |
| use super::EventLoop; |
| use {buf, unix, Buf, Handler, Token, TryRead, TryWrite, EventSet}; |
| |
| #[test] |
| pub fn test_event_loop_size() { |
| use std::mem; |
| assert!(512 >= mem::size_of::<EventLoop<Funtimes>>()); |
| } |
| |
| struct Funtimes { |
| rcount: Arc<AtomicIsize>, |
| wcount: Arc<AtomicIsize> |
| } |
| |
| impl Funtimes { |
| fn new(rcount: Arc<AtomicIsize>, wcount: Arc<AtomicIsize>) -> Funtimes { |
| Funtimes { |
| rcount: rcount, |
| wcount: wcount |
| } |
| } |
| } |
| |
| impl Handler for Funtimes { |
| type Timeout = usize; |
| type Message = (); |
| |
| fn ready(&mut self, _event_loop: &mut EventLoop<Funtimes>, token: Token, events: EventSet) { |
| if events.is_readable() { |
| (*self.rcount).fetch_add(1, SeqCst); |
| assert_eq!(token, Token(10)); |
| } |
| |
| if events.is_writable() { |
| (*self.wcount).fetch_add(1, SeqCst); |
| assert_eq!(token, Token(10)); |
| } |
| } |
| } |
| |
| #[test] |
| pub fn test_readable() { |
| let mut event_loop = EventLoop::new().ok().expect("Couldn't make event loop"); |
| |
| let (mut reader, mut writer) = unix::pipe().unwrap(); |
| |
| let rcount = Arc::new(AtomicIsize::new(0)); |
| let wcount = Arc::new(AtomicIsize::new(0)); |
| let mut handler = Funtimes::new(rcount.clone(), wcount.clone()); |
| |
| writer.try_write_buf(&mut buf::SliceBuf::wrap("hello".as_bytes())).unwrap(); |
| event_loop.register(&reader, Token(10)).unwrap(); |
| |
| let _ = event_loop.run_once(&mut handler); |
| let mut b = buf::ByteBuf::mut_with_capacity(16); |
| |
| assert_eq!((*rcount).load(SeqCst), 1); |
| |
| reader.try_read_buf(&mut b).unwrap(); |
| |
| assert_eq!(str::from_utf8(b.flip().bytes()).unwrap(), "hello"); |
| } |
| } |