use std::{cmp, fmt};
use std::cell::RefCell;
use std::os::unix::io::RawFd;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;
use libc::{self, time_t};
use {io, Ready, PollOpt, Token};
use event::{self, Event};
use sys::unix::cvt;
use sys::unix::io::set_cloexec;
/// Each `Selector` has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc., when they are first
/// registered with the `Selector`. If a type previously associated with one
/// `Selector` attempts to register itself with a different `Selector`, the
/// operation returns an error. This matches Windows behavior.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
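// A sketch of how a handle type might enforce the rule above; `Inner` and its
// `selector_id` field are illustrative assumptions, not part of this module:
//
//     struct Inner { selector_id: AtomicUsize }
//
//     fn associate(inner: &Inner, selector: &Selector) -> ::std::io::Result<()> {
//         // 0 means "not yet associated"; see the +1 offset in `Selector::new`.
//         let prev = inner.selector_id.compare_and_swap(0, selector.id(),
//                                                       Ordering::Relaxed);
//         if prev == 0 || prev == selector.id() {
//             Ok(())
//         } else {
//             Err(::std::io::Error::new(::std::io::ErrorKind::Other,
//                                       "registered with another selector"))
//         }
//     }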
pub struct Selector {
id: usize,
kq: RawFd,
changes: RefCell<KeventList>,
}
impl Selector {
pub fn new() -> io::Result<Selector> {
// offset by 1 to avoid choosing 0 as the id of a selector
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
let kq = unsafe { try!(cvt(libc::kqueue())) };
        // Best-effort: mark the queue close-on-exec; failure here is not fatal.
        drop(set_cloexec(kq));
Ok(Selector {
id: id,
kq: kq,
changes: RefCell::new(KeventList(Vec::new())),
})
}
pub fn id(&self) -> usize {
self.id
}
pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
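        // kevent(2) takes either a null timeout (block indefinitely) or a
        // `timespec`. For example, `Duration::new(1, 500_000_000)` (1.5s)
        // becomes `timespec { tv_sec: 1, tv_nsec: 500_000_000 }`; the seconds
        // are clamped to `time_t::max_value()` so an enormous `Duration`
        // cannot overflow a 32-bit `time_t`.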
let timeout = timeout.map(|to| {
libc::timespec {
tv_sec: cmp::min(to.as_secs(), time_t::max_value() as u64) as time_t,
tv_nsec: to.subsec_nanos() as libc::c_long,
}
});
let timeout = timeout.as_ref().map(|s| s as *const _).unwrap_or(0 as *const _);
unsafe {
let cnt = try!(cvt(libc::kevent(self.kq,
0 as *const _,
0,
evts.sys_events.0.as_mut_ptr(),
evts.sys_events.0.capacity() as i32,
timeout)));
self.changes.borrow_mut().0.clear();
evts.sys_events.0.set_len(cnt as usize);
Ok(evts.coalesce(awakener))
}
}
pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
trace!("registering; token={:?}; interests={:?}", token, interests);
self.ev_register(fd,
token.into(),
libc::EVFILT_READ,
interests.contains(Ready::readable()),
opts);
self.ev_register(fd,
token.into(),
libc::EVFILT_WRITE,
interests.contains(Ready::writable()),
opts);
self.flush_changes()
}
pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        // `EV_ADD` on a filter that is already registered acts as a modify
        // (it updates the flags and the `udata` token in place), so
        // re-registration can simply go through `register` again.
self.register(fd, token, interests, opts)
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
self.ev_push(fd, 0, libc::EVFILT_READ, libc::EV_DELETE);
self.ev_push(fd, 0, libc::EVFILT_WRITE, libc::EV_DELETE);
self.flush_changes()
}
fn ev_register(&self,
fd: RawFd,
token: usize,
filter: i16,
enable: bool,
opts: PollOpt) {
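        // The resulting flag sets, for reference:
        //   level-triggered, enabled:  EV_ADD | EV_ENABLE
        //   edge-triggered,  enabled:  EV_ADD | EV_ENABLE | EV_CLEAR
        //   oneshot,         disabled: EV_ADD | EV_DISABLE | EV_ONESHOT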
        let mut flags = libc::EV_ADD;
        if enable {
            flags |= libc::EV_ENABLE;
        } else {
            flags |= libc::EV_DISABLE;
        }
        if opts.contains(PollOpt::edge()) {
            flags |= libc::EV_CLEAR;
        }
        if opts.contains(PollOpt::oneshot()) {
            flags |= libc::EV_ONESHOT;
        }
self.ev_push(fd, token, filter, flags);
}
fn ev_push(&self,
fd: RawFd,
token: usize,
filter: i16,
flags: u16) {
self.changes.borrow_mut().0.push(libc::kevent {
            ident: fd as libc::uintptr_t,
filter: filter,
flags: flags,
fflags: 0,
data: 0,
udata: token as *mut _,
});
}
    /// Submit all queued changelist entries to the kernel in a single
    /// `kevent(2)` call. The event list is empty, so this only applies
    /// changes and never drains pending events.
    fn flush_changes(&self) -> io::Result<()> {
        unsafe {
            let mut changes = self.changes.borrow_mut();
            try!(cvt(libc::kevent(self.kq,
                                  changes.0.as_ptr(),
                                  changes.0.len() as i32,
                                  0 as *mut _,
                                  0,
                                  0 as *const _)));
            changes.0.clear();
            Ok(())
        }
    }
}
impl fmt::Debug for Selector {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Selector")
.field("id", &self.id)
.field("kq", &self.kq)
.field("changes", &self.changes.borrow().0.len())
.finish()
}
}
impl Drop for Selector {
fn drop(&mut self) {
unsafe {
let _ = libc::close(self.kq);
}
}
}
pub struct Events {
sys_events: KeventList,
events: Vec<Event>,
event_map: HashMap<Token, usize>,
}
struct KeventList(Vec<libc::kevent>);
// `libc::kevent` carries a raw `udata` pointer, which suppresses the auto
// traits. That pointer only stores a `Token` (a plain `usize`) and is never
// dereferenced, so moving or sharing the list across threads is sound.
unsafe impl Send for KeventList {}
unsafe impl Sync for KeventList {}
impl Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
sys_events: KeventList(Vec::with_capacity(cap)),
events: Vec::with_capacity(cap),
event_map: HashMap::with_capacity(cap)
}
}
#[inline]
pub fn len(&self) -> usize {
self.events.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn get(&self, idx: usize) -> Option<Event> {
self.events.get(idx).map(|e| *e)
}
fn coalesce(&mut self, awakener: Token) -> bool {
let mut ret = false;
self.events.clear();
self.event_map.clear();
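        // An fd registered for both read and write interest can surface as
        // two `kevent` entries in one wakeup (one per filter). Fold them back
        // into a single `Event` per `Token`: an EVFILT_READ entry and an
        // EVFILT_WRITE entry for the same token coalesce into one event with
        // `readable | writable` readiness.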
for e in self.sys_events.0.iter() {
let token = Token(e.udata as usize);
let len = self.events.len();
if token == awakener {
                // TODO: Should this return an error if the event is an
                // error? It is not critical, since spurious wakeups are
                // permitted.
ret = true;
continue;
}
let idx = *self.event_map.entry(token)
.or_insert(len);
if idx == len {
// New entry, insert the default
self.events.push(Event::new(Ready::none(), token));
}
if e.flags & libc::EV_ERROR != 0 {
event::kind_mut(&mut self.events[idx]).insert(Ready::error());
}
if e.filter == libc::EVFILT_READ {
event::kind_mut(&mut self.events[idx]).insert(Ready::readable());
} else if e.filter == libc::EVFILT_WRITE {
event::kind_mut(&mut self.events[idx]).insert(Ready::writable());
}
if e.flags & libc::EV_EOF != 0 {
event::kind_mut(&mut self.events[idx]).insert(Ready::hup());
// When the read end of the socket is closed, EV_EOF is set on
// flags, and fflags contains the error if there is one.
if e.fflags != 0 {
event::kind_mut(&mut self.events[idx]).insert(Ready::error());
}
}
}
ret
}
pub fn push_event(&mut self, event: Event) {
self.events.push(event);
}
}
impl fmt::Debug for Events {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "Events {{ len: {} }}", self.events.len())
}
}
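#[cfg(test)]
mod test {
    // A minimal smoke-test sketch, assuming the surrounding crate layout
    // (`Events` and `Selector` from this module, `Token` re-exported at the
    // crate root). It only checks that an empty kqueue polls cleanly with a
    // zero timeout; the token passed as the awakener is arbitrary here.
    use std::time::Duration;
    use super::{Events, Selector};
    use Token;

    #[test]
    fn zero_timeout_poll_is_empty() {
        let selector = Selector::new().unwrap();
        let mut events = Events::with_capacity(8);
        // Nothing is registered, so `select` should return quickly, report
        // no awakener wakeup, and leave the event list empty.
        let woke = selector
            .select(&mut events, Token(0), Some(Duration::from_millis(0)))
            .unwrap();
        assert!(!woke);
        assert!(events.is_empty());
    }
}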