blob: 44f68cc6835f88428a4c4882231c88cc0ea77495 [file] [log] [blame]
use {Handler, Evented, Poll, NotifyError, Token};
use event::{IoEvent, EventSet, PollOpt};
use notify::Notify;
use timer::{Timer, Timeout, TimerResult};
use std::default::Default;
use std::{io, fmt, thread, usize};
/// Configure EventLoop runtime details
#[derive(Copy, Clone, Debug)]
pub struct EventLoopConfig {
pub io_poll_timeout_ms: usize,
// == Notifications ==
pub notify_capacity: usize,
pub messages_per_tick: usize,
// == Timer ==
pub timer_tick_ms: u64,
pub timer_wheel_size: usize,
pub timer_capacity: usize,
}
impl Default for EventLoopConfig {
fn default() -> EventLoopConfig {
EventLoopConfig {
io_poll_timeout_ms: 1_000,
notify_capacity: 4_096,
messages_per_tick: 256,
timer_tick_ms: 100,
timer_wheel_size: 1_024,
timer_capacity: 65_536,
}
}
}
/// Single threaded IO event loop.
#[derive(Debug)]
pub struct EventLoop<H: Handler> {
run: bool,
poll: Poll,
timer: Timer<H::Timeout>,
notify: Notify<H::Message>,
config: EventLoopConfig,
}
// Token used to represent notifications
const NOTIFY: Token = Token(usize::MAX);
impl<H: Handler> EventLoop<H> {
/// Initializes a new event loop using default configuration settings. The
/// event loop will not be running yet.
pub fn new() -> io::Result<EventLoop<H>> {
EventLoop::configured(Default::default())
}
pub fn configured(config: EventLoopConfig) -> io::Result<EventLoop<H>> {
// Create the IO poller
let mut poll = try!(Poll::new());
// Create the timer
let mut timer = Timer::new(
config.timer_tick_ms,
config.timer_wheel_size,
config.timer_capacity);
// Create cross thread notification queue
let notify = try!(Notify::with_capacity(config.notify_capacity));
// Register the notification wakeup FD with the IO poller
try!(poll.register(&notify, NOTIFY, EventSet::readable() | EventSet::writable() , PollOpt::edge()));
// Set the timer's starting time reference point
timer.setup();
Ok(EventLoop {
run: true,
poll: poll,
timer: timer,
notify: notify,
config: config,
})
}
/// Returns a sender that allows sending messages to the event loop in a
/// thread-safe way, waking up the event loop if needed.
///
/// # Example
/// ```
/// use std::thread;
/// use mio::{EventLoop, Handler};
///
/// struct MyHandler;
///
/// impl Handler for MyHandler {
/// type Timeout = ();
/// type Message = u32;
///
/// fn notify(&mut self, event_loop: &mut EventLoop<MyHandler>, msg: u32) {
/// assert_eq!(msg, 123);
/// event_loop.shutdown();
/// }
/// }
///
/// let mut event_loop = EventLoop::new().unwrap();
/// let sender = event_loop.channel();
///
/// // Send the notification from another thread
/// thread::spawn(move || {
/// let _ = sender.send(123);
/// });
///
/// let _ = event_loop.run(&mut MyHandler);
/// ```
///
/// # Implementation Details
///
/// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
/// buffer size. The size can be changed by modifying
/// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#structfield.notify_capacity).
/// When a message is sent to the EventLoop, it is first pushed on to the
/// queue. Then, if the EventLoop is currently running, an atomic flag is
/// set to indicate that the next loop iteration should be started without
/// waiting.
///
/// If the loop is blocked waiting for IO events, then it is woken up. The
/// strategy for waking up the event loop is platform dependent. For
/// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
/// is used.
///
/// The strategy of setting an atomic flag if the event loop is not already
/// sleeping allows avoiding an expensive wakeup operation if at all possible.
pub fn channel(&self) -> Sender<H::Message> {
Sender::new(self.notify.clone())
}
/// Schedules a timeout after the requested time interval. When the
/// duration has been reached,
/// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
/// passing in the supplied token.
///
/// Returns a handle to the timeout that can be used to cancel the timeout
/// using [#clear_timeout](#method.clear_timeout).
///
/// # Example
/// ```
/// use mio::{EventLoop, Handler};
///
/// struct MyHandler;
///
/// impl Handler for MyHandler {
/// type Timeout = u32;
/// type Message = ();
///
/// fn timeout(&mut self, event_loop: &mut EventLoop<MyHandler>, timeout: u32) {
/// assert_eq!(timeout, 123);
/// event_loop.shutdown();
/// }
/// }
///
///
/// let mut event_loop = EventLoop::new().unwrap();
/// let timeout = event_loop.timeout_ms(123, 300).unwrap();
/// let _ = event_loop.run(&mut MyHandler);
/// ```
pub fn timeout_ms(&mut self, token: H::Timeout, delay: u64) -> TimerResult<Timeout> {
self.timer.timeout_ms(token, delay)
}
/// If the supplied timeout has not been triggered, cancel it such that it
/// will not be triggered in the future.
pub fn clear_timeout(&mut self, timeout: Timeout) -> bool {
self.timer.clear(timeout)
}
/// Tells the event loop to exit after it is done handling all events in the
/// current iteration.
pub fn shutdown(&mut self) {
self.run = false;
}
/// Indicates whether the event loop is currently running. If it's not it has either
/// stopped or is scheduled to stop on the next tick.
pub fn is_running(&self) -> bool {
self.run
}
/// Registers an IO handle with the event loop.
pub fn register<E: ?Sized>(&mut self, io: &E, token: Token) -> io::Result<()>
where E: Evented
{
self.poll.register(io, token, EventSet::all(), PollOpt::level())
}
/// Registers an IO handle with the event loop.
pub fn register_opt<E: ?Sized>(&mut self, io: &E, token: Token, interest: EventSet, opt: PollOpt) -> io::Result<()>
where E: Evented
{
self.poll.register(io, token, interest, opt)
}
/// Re-Registers an IO handle with the event loop.
pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: EventSet, opt: PollOpt) -> io::Result<()>
where E: Evented
{
self.poll.reregister(io, token, interest, opt)
}
/// Keep spinning the event loop indefinitely, and notify the handler whenever
/// any of the registered handles are ready.
pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
self.run = true;
while self.run {
// Execute ticks as long as the event loop is running
try!(self.run_once(handler));
}
Ok(())
}
/// Deregisters an IO handle with the event loop.
pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
self.poll.deregister(io)
}
/// Spin the event loop once, with a timeout of one second, and notify the
/// handler if any of the registered handles become ready during that
/// time.
pub fn run_once(&mut self, handler: &mut H) -> io::Result<()> {
let mut messages;
trace!("event loop tick");
// Check the notify channel for any pending messages. If there are any,
// avoid blocking when polling for IO events. Messages will be
// processed after IO events.
messages = self.notify.check(self.config.messages_per_tick, true);
let pending = messages > 0;
// Check the registered IO handles for any new events. Each poll
// is for one second, so a shutdown request can last as long as
// one second before it takes effect.
let events = match self.io_poll(pending) {
Ok(e) => e,
Err(err) => {
if err.kind() == io::ErrorKind::Interrupted {
handler.interrupted(self);
0
} else {
return Err(err);
}
}
};
if !pending {
// Indicate that the sleep period is over, also grab any additional
// messages
let remaining = self.config.messages_per_tick - messages;
messages += self.notify.check(remaining, false);
}
self.io_process(handler, events);
self.notify(handler, messages);
self.timer_process(handler);
Ok(())
}
#[inline]
fn io_poll(&mut self, immediate: bool) -> io::Result<usize> {
if immediate {
self.poll.poll(0)
} else {
let mut sleep = self.timer.next_tick_in_ms() as usize;
if sleep > self.config.io_poll_timeout_ms {
sleep = self.config.io_poll_timeout_ms;
}
self.poll.poll(sleep)
}
}
// Process IO events that have been previously polled
fn io_process(&mut self, handler: &mut H, cnt: usize) {
let mut i = 0;
// Iterate over the notifications. Each event provides the token
// it was registered with (which usually represents, at least, the
// handle that the event is about) as well as information about
// what kind of event occurred (readable, writable, signal, etc.)
while i < cnt {
let evt = self.poll.event(i);
trace!("event={:?}", evt);
match evt.token {
NOTIFY => self.notify.cleanup(),
_ => self.io_event(handler, evt)
}
i += 1;
}
}
fn io_event(&mut self, handler: &mut H, evt: IoEvent) {
handler.ready(self, evt.token, evt.kind);
}
fn notify(&mut self, handler: &mut H, mut cnt: usize) {
while cnt > 0 {
match self.notify.poll() {
Some(msg) => {
handler.notify(self, msg);
cnt -= 1;
},
// If we expect messages, but the queue seems empty, a context
// switch has occurred in the queue's push() method between
// reserving a slot and marking that slot; let's spin for
// what should be a very brief period of time until the push
// is done.
None => thread::yield_now(),
}
}
}
fn timer_process(&mut self, handler: &mut H) {
let now = self.timer.now();
loop {
match self.timer.tick_to(now) {
Some(t) => handler.timeout(self, t),
_ => return
}
}
}
}
unsafe impl<H: Handler> Sync for EventLoop<H> { }
impl <H: Handler> Drop for EventLoop<H> {
fn drop(&mut self) {
self.notify.close();
}
}
/// Sends messages to the EventLoop from other threads.
pub struct Sender<M: Send> {
notify: Notify<M>
}
impl<M: Send> Clone for Sender<M> {
fn clone(&self) -> Sender<M> {
Sender { notify: self.notify.clone() }
}
}
impl<M: Send> fmt::Debug for Sender<M> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Sender<?> {{ ... }}")
}
}
unsafe impl<M: Send> Sync for Sender<M> { }
impl<M: Send> Sender<M> {
fn new(notify: Notify<M>) -> Sender<M> {
Sender { notify: notify }
}
pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> {
self.notify.notify(msg)
}
}
#[cfg(test)]
mod tests {
use std::str;
use std::sync::Arc;
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering::SeqCst;
use super::EventLoop;
use {buf, unix, Buf, Handler, Token, TryRead, TryWrite, EventSet};
#[test]
pub fn test_event_loop_size() {
use std::mem;
assert!(512 >= mem::size_of::<EventLoop<Funtimes>>());
}
struct Funtimes {
rcount: Arc<AtomicIsize>,
wcount: Arc<AtomicIsize>
}
impl Funtimes {
fn new(rcount: Arc<AtomicIsize>, wcount: Arc<AtomicIsize>) -> Funtimes {
Funtimes {
rcount: rcount,
wcount: wcount
}
}
}
impl Handler for Funtimes {
type Timeout = usize;
type Message = ();
fn ready(&mut self, _event_loop: &mut EventLoop<Funtimes>, token: Token, events: EventSet) {
if events.is_readable() {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}
if events.is_writable() {
(*self.wcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}
}
}
#[test]
pub fn test_readable() {
let mut event_loop = EventLoop::new().ok().expect("Couldn't make event loop");
let (mut reader, mut writer) = unix::pipe().unwrap();
let rcount = Arc::new(AtomicIsize::new(0));
let wcount = Arc::new(AtomicIsize::new(0));
let mut handler = Funtimes::new(rcount.clone(), wcount.clone());
writer.try_write_buf(&mut buf::SliceBuf::wrap("hello".as_bytes())).unwrap();
event_loop.register(&reader, Token(10)).unwrap();
let _ = event_loop.run_once(&mut handler);
let mut b = buf::ByteBuf::mut_with_capacity(16);
assert_eq!((*rcount).load(SeqCst), 1);
reader.try_read_buf(&mut b).unwrap();
assert_eq!(str::from_utf8(b.flip().bytes()).unwrap(), "hello");
}
}