blob: 8bac337ebf6eca8fd0bb7563351d31bda04a3f0f [file] [log] [blame]
use std::{cmp, fmt, ptr};
#[cfg(not(target_os = "netbsd"))]
use std::os::raw::{c_int, c_short};
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;
use libc::{self, time_t};
use {io, Ready, PollOpt, Token};
use event_imp::{self as event, Event};
use sys::unix::{cvt, UnixReady};
use sys::unix::io::set_cloexec;
/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that is previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
///
/// The counter is bumped with a relaxed `fetch_add` in `Selector::new`, which
/// adds one to the fetched value so that no selector is handed the id `0`.
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
// Target-dependent aliases used when building `libc::kevent` values (see the
// `kevent!` macro) and when passing list lengths to `libc::kevent`.
// NOTE(review): presumably these mirror the field/argument types that the
// `libc` crate declares for each platform; confirm against the `libc` version
// in use.

// Type of the `filter` field on every target except NetBSD.
#[cfg(not(target_os = "netbsd"))]
type Filter = c_short;
// Type of the `udata` field on every target except NetBSD (a raw pointer).
#[cfg(not(target_os = "netbsd"))]
type UData = *mut ::libc::c_void;
// Type of the change/event list lengths on every target except NetBSD.
#[cfg(not(target_os = "netbsd"))]
type Count = c_int;
// NetBSD: type of the `filter` field.
#[cfg(target_os = "netbsd")]
type Filter = u32;
// NetBSD: type of the `udata` field (an integer rather than a pointer).
#[cfg(target_os = "netbsd")]
type UData = ::libc::intptr_t;
// NetBSD: type of the change/event list lengths.
#[cfg(target_os = "netbsd")]
type Count = usize;
/// Builds a `libc::kevent` value.
///
/// Arguments, in order:
/// 1. the identifier, cast to `uintptr_t`;
/// 2. the filter, cast to the platform `Filter` alias;
/// 3. the flags, stored as given;
/// 4. the user data, cast to the platform `UData` alias.
///
/// The `fflags` and `data` fields are always initialised to zero.
macro_rules! kevent {
    ($ident:expr, $filt:expr, $ev_flags:expr, $udata:expr) => {
        libc::kevent {
            ident: $ident as ::libc::uintptr_t,
            filter: $filt as Filter,
            flags: $ev_flags,
            fflags: 0,
            data: 0,
            udata: $udata as UData,
        }
    };
}
/// A kqueue-backed selector: one kqueue descriptor plus the id assigned to it
/// at construction time.
pub struct Selector {
// Value taken from the `NEXT_ID` counter (plus one) in `Selector::new`.
id: usize,
// The kqueue file descriptor; closed when the `Selector` is dropped.
kq: RawFd,
}
impl Selector {
    /// Creates a new selector backed by a freshly opened kqueue.
    ///
    /// The selector's id is taken from the process-wide `NEXT_ID` counter.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `cvt` when `kqueue()` fails.
    pub fn new() -> io::Result<Selector> {
        // offset by 1 to avoid choosing 0 as the id of a selector
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
        let kq = unsafe { cvt(libc::kqueue())? };
        // Best effort: the result of `set_cloexec` is deliberately discarded.
        drop(set_cloexec(kq));

        Ok(Selector {
            id,
            kq,
        })
    }

    /// Returns the id assigned to this selector in `new`.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Waits for events and stores them, coalesced per token, in `evts`.
    ///
    /// `evts` is cleared first. At most `evts.sys_events` capacity raw events
    /// are fetched in a single `kevent` call. A `None` timeout is passed to
    /// `kevent` as a null pointer.
    ///
    /// Returns `Ok(true)` when one of the received events carried the
    /// `awakener` token (such events are not stored in `evts`).
    ///
    /// # Errors
    ///
    /// Returns the error produced by `cvt` when the `kevent` call fails.
    pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
        let timeout = timeout.map(|to| {
            libc::timespec {
                // Saturate rather than wrap if the duration exceeds `time_t`.
                tv_sec: cmp::min(to.as_secs(), time_t::max_value() as u64) as time_t,
                // `Duration::subsec_nanos` is guaranteed to be less than one
                // billion (the number of nanoseconds in a second), making the
                // cast to i32 safe. The cast itself is needed for platforms
                // where C's long is only 32 bits.
                tv_nsec: libc::c_long::from(to.subsec_nanos() as i32),
            }
        });
        // The `timespec` lives in `timeout` for the rest of this function, so
        // the pointer stays valid across the `kevent` call below.
        let timeout = timeout.as_ref().map(|s| s as *const _).unwrap_or(ptr::null_mut());

        evts.clear();
        unsafe {
            // SAFETY: the event list pointer and length describe the spare
            // capacity of `sys_events`, so the kernel never writes past the
            // allocation; no change list is supplied.
            let cnt = cvt(libc::kevent(self.kq,
                                       ptr::null(),
                                       0,
                                       evts.sys_events.0.as_mut_ptr(),
                                       evts.sys_events.0.capacity() as Count,
                                       timeout))?;
            // SAFETY: `kevent` reported `cnt` entries written, which cannot
            // exceed the capacity passed above.
            evts.sys_events.0.set_len(cnt as usize);
            Ok(evts.coalesce(awakener))
        }
    }

    /// Registers `fd` with the kqueue for the given `interests`.
    ///
    /// Both the read and the write filter are always submitted: a filter is
    /// added (`EV_ADD`) when its interest is requested and deleted
    /// (`EV_DELETE`) otherwise. `PollOpt::edge()` maps to `EV_CLEAR` and
    /// `PollOpt::oneshot()` to `EV_ONESHOT`. `token` is stored in the event's
    /// user data.
    ///
    /// # Errors
    ///
    /// Returns the `kevent` call's own failure, or the first per-filter error
    /// reported through `EV_RECEIPT`, except that `EPIPE` on the write filter
    /// and `ENOENT` on a filter that was being deleted are ignored.
    pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        trace!("registering; token={:?}; interests={:?}", token, interests);

        // EV_RECEIPT makes the kernel report a per-change status in the event
        // list instead of returning pending events.
        let mut flags = libc::EV_RECEIPT;
        if opts.contains(PollOpt::edge()) {
            flags |= libc::EV_CLEAR;
        }
        if opts.contains(PollOpt::oneshot()) {
            flags |= libc::EV_ONESHOT;
        }

        unsafe {
            let r = if interests.contains(Ready::readable()) { libc::EV_ADD } else { libc::EV_DELETE };
            let w = if interests.contains(Ready::writable()) { libc::EV_ADD } else { libc::EV_DELETE };
            let mut changes = [
                kevent!(fd, libc::EVFILT_READ, flags | r, usize::from(token)),
                kevent!(fd, libc::EVFILT_WRITE, flags | w, usize::from(token)),
            ];
            // SAFETY: `changes` serves as both the change list and the
            // receipt buffer; both lengths equal the array length.
            cvt(libc::kevent(self.kq,
                             changes.as_ptr(),
                             changes.len() as Count,
                             changes.as_mut_ptr(),
                             changes.len() as Count,
                             ::std::ptr::null()))?;

            for change in changes.iter() {
                debug_assert_eq!(change.flags & libc::EV_ERROR, libc::EV_ERROR);

                // Test to see if an error happened
                if change.data == 0 {
                    continue;
                }

                // Older versions of OSX (10.11 and 10.10 have been witnessed)
                // can return EPIPE when registering a pipe file descriptor
                // where the other end has already disappeared. For example code
                // that creates a pipe, closes a file descriptor, and then
                // registers the other end will see an EPIPE returned from
                // `register`.
                //
                // It also turns out that kevent will still report events on the
                // file descriptor, telling us that it's readable/hup at least
                // after we've done this registration. As a result we just
                // ignore `EPIPE` here instead of propagating it.
                //
                // More info can be found at carllerche/mio#582
                if change.data as i32 == libc::EPIPE &&
                   change.filter == libc::EVFILT_WRITE as Filter {
                    continue;
                }

                // ignore ENOENT error for EV_DELETE
                let orig_flags = if change.filter == libc::EVFILT_READ as Filter { r } else { w };
                if change.data as i32 == libc::ENOENT && orig_flags & libc::EV_DELETE != 0 {
                    continue;
                }

                return Err(::std::io::Error::from_raw_os_error(change.data as i32));
            }
            Ok(())
        }
    }

    /// Updates the registration of `fd`; identical to `register`.
    pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        // Just need to call register here since EV_ADD is a mod if already
        // registered
        self.register(fd, token, interests, opts)
    }

    /// Removes both the read and the write filter of `fd` from the kqueue.
    ///
    /// # Errors
    ///
    /// * `ENOENT` when neither filter was registered.
    /// * The `kevent` call's own failure.
    /// * The error of the first filter whose receipt carries a non-zero
    ///   status other than `ENOENT` (a single missing filter is tolerated).
    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        unsafe {
            // EV_RECEIPT is a nice way to apply changes and get back per-event results while not
            // draining the actual changes.
            let flags = libc::EV_DELETE | libc::EV_RECEIPT;
            #[cfg(not(target_os = "netbsd"))]
            let mut changes = [
                kevent!(fd, libc::EVFILT_READ, flags, ptr::null_mut()),
                kevent!(fd, libc::EVFILT_WRITE, flags, ptr::null_mut()),
            ];

            #[cfg(target_os = "netbsd")]
            let mut changes = [
                kevent!(fd, libc::EVFILT_READ, flags, 0),
                kevent!(fd, libc::EVFILT_WRITE, flags, 0),
            ];

            // SAFETY: `changes` serves as both the change list and the
            // receipt buffer; both lengths equal the array length.
            cvt(libc::kevent(self.kq,
                             changes.as_ptr(),
                             changes.len() as Count,
                             changes.as_mut_ptr(),
                             changes.len() as Count,
                             ::std::ptr::null()))?;

            // Nothing was registered at all: report that to the caller.
            if changes[0].data as i32 == libc::ENOENT && changes[1].data as i32 == libc::ENOENT {
                return Err(::std::io::Error::from_raw_os_error(changes[0].data as i32));
            }
            for change in changes.iter() {
                debug_assert_eq!(libc::EV_ERROR & change.flags, libc::EV_ERROR);
                if change.data != 0 && change.data as i32 != libc::ENOENT {
                    // Report the status of the filter that actually failed,
                    // not unconditionally the first entry's.
                    return Err(::std::io::Error::from_raw_os_error(change.data as i32));
                }
            }
            Ok(())
        }
    }
}
impl fmt::Debug for Selector {
    /// Formats the selector as a `Selector` struct showing its `id` and `kq`
    /// fields.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Selector");
        out.field("id", &self.id);
        out.field("kq", &self.kq);
        out.finish()
    }
}
/// Exposes the selector's kqueue descriptor.
impl AsRawFd for Selector {
/// Returns the raw kqueue file descriptor. The `Selector` keeps ownership of
/// it and closes it in its `Drop` impl.
fn as_raw_fd(&self) -> RawFd {
self.kq
}
}
impl Drop for Selector {
    /// Closes the kqueue descriptor; the return value of `close` is ignored.
    fn drop(&mut self) {
        let kq = self.kq;
        unsafe {
            libc::close(kq);
        }
    }
}
/// Buffer of events returned by `Selector::select`.
pub struct Events {
// Raw kevents as written by the kernel during `Selector::select`.
sys_events: KeventList,
// Coalesced events, at most one per token (built by `coalesce`), plus
// anything appended through `push_event`.
events: Vec<Event>,
// Maps a token to the index of its entry in `events` while coalescing.
event_map: HashMap<Token, usize>,
}
/// Newtype around the raw `libc::kevent` buffer, so that `Send` and `Sync` can
/// be implemented for it manually.
struct KeventList(Vec<libc::kevent>);
// NOTE(review): presumably needed because `udata` is a raw pointer on
// non-NetBSD targets (see the `UData` alias). Within this file `udata` is only
// ever cast to `usize` (in `Events::coalesce`) and never dereferenced; confirm
// nothing else dereferences it before relying on these impls.
unsafe impl Send for KeventList {}
unsafe impl Sync for KeventList {}
impl Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
sys_events: KeventList(Vec::with_capacity(cap)),
events: Vec::with_capacity(cap),
event_map: HashMap::with_capacity(cap)
}
}
#[inline]
pub fn len(&self) -> usize {
self.events.len()
}
#[inline]
pub fn capacity(&self) -> usize {
self.events.capacity()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn get(&self, idx: usize) -> Option<Event> {
self.events.get(idx).cloned()
}
fn coalesce(&mut self, awakener: Token) -> bool {
let mut ret = false;
self.events.clear();
self.event_map.clear();
for e in self.sys_events.0.iter() {
let token = Token(e.udata as usize);
let len = self.events.len();
if token == awakener {
// TODO: Should this return an error if event is an error. It
// is not critical as spurious wakeups are permitted.
ret = true;
continue;
}
let idx = *self.event_map.entry(token)
.or_insert(len);
if idx == len {
// New entry, insert the default
self.events.push(Event::new(Ready::empty(), token));
}
if e.flags & libc::EV_ERROR != 0 {
event::kind_mut(&mut self.events[idx]).insert(*UnixReady::error());
}
if e.filter == libc::EVFILT_READ as Filter {
event::kind_mut(&mut self.events[idx]).insert(Ready::readable());
} else if e.filter == libc::EVFILT_WRITE as Filter {
event::kind_mut(&mut self.events[idx]).insert(Ready::writable());
// `EV_EOF` set with `EVFILT_WRITE` indicates the connection has been fully
// disconnected (read and write), but only sockets...
if e.flags & libc::EV_EOF != 0 {
event::kind_mut(&mut self.events[idx]).insert(UnixReady::hup());
// When the read end of the socket is closed, EV_EOF is set on
// flags, and fflags contains the error if there is one.
if e.fflags != 0 {
event::kind_mut(&mut self.events[idx]).insert(UnixReady::error());
}
}
}
#[cfg(any(target_os = "dragonfly",
target_os = "freebsd", target_os = "ios", target_os = "macos"))]
{
if e.filter == libc::EVFILT_AIO {
event::kind_mut(&mut self.events[idx]).insert(UnixReady::aio());
}
}
#[cfg(any(target_os = "freebsd"))]
{
if e.filter == libc::EVFILT_LIO {
event::kind_mut(&mut self.events[idx]).insert(UnixReady::lio());
}
}
}
ret
}
pub fn push_event(&mut self, event: Event) {
self.events.push(event);
}
pub fn clear(&mut self) {
self.sys_events.0.truncate(0);
self.events.truncate(0);
self.event_map.clear();
}
}
impl fmt::Debug for Events {
    /// Formats the buffer as an `Events` struct whose single `len` field is
    /// the number of raw kevents currently held.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let raw_len = self.sys_events.0.len();
        let mut out = f.debug_struct("Events");
        out.field("len", &raw_len);
        out.finish()
    }
}
/// Registers a raw kqueue descriptor with a `Poll` for read readiness only.
#[test]
fn does_not_register_rw() {
    use {Poll, Ready, PollOpt, Token};
    use unix::EventedFd;

    let kq = unsafe { libc::kqueue() };
    assert!(kq != -1, "kqueue() failed: {}", ::std::io::Error::last_os_error());
    let poll = Poll::new().unwrap();

    // registering kqueue fd will fail if write is requested (On anything but some versions of OS
    // X)
    let result = {
        let kqf = EventedFd(&kq);
        poll.register(&kqf, Token(1234), Ready::readable(),
                      PollOpt::edge() | PollOpt::oneshot())
    };

    // Close the descriptor before checking the result so that it is not
    // leaked even when the registration failed.
    unsafe {
        libc::close(kq);
    }
    result.unwrap();
}
/// A raw `EVFILT_AIO` kevent must coalesce into a single event whose
/// readiness is `UnixReady::aio()` and whose token comes from `udata`.
#[cfg(any(target_os = "dragonfly",
          target_os = "freebsd", target_os = "ios", target_os = "macos"))]
#[test]
fn test_coalesce_aio() {
    // One AIO kevent whose user data encodes token 42.
    let aio_kevent = kevent!(0x1234, libc::EVFILT_AIO, 0, 42);

    let mut events = Events::with_capacity(1);
    events.sys_events.0.push(aio_kevent);

    // Token(0) is the awakener, which does not match the event above.
    let _woken = events.coalesce(Token(0));

    assert!(events.events[0].readiness() == UnixReady::aio().into());
    assert!(events.events[0].token() == Token(42));
}