POSIX AIO support (#586)
* POSIX AIO support
On BSD-based operating systems, add low-level support for queuing AIO
operations in the event loop. This commit adds 3 new public methods:
UnixReady::aio, UnixReady::is_aio, and Poll.as_raw_fd. Full support
will be added in a separate crate.
* POSIX AIO support
Revert f8a8e4a's changes to ReadinessState. Add unit tests.
diff --git a/src/event_imp.rs b/src/event_imp.rs
index 4eaafc0..556a373 100644
--- a/src/event_imp.rs
+++ b/src/event_imp.rs
@@ -543,10 +543,10 @@
#[derive(Copy, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct Ready(usize);
-const READABLE: usize = 0b0001;
-const WRITABLE: usize = 0b0010;
-const ERROR: usize = 0b0100;
-const HUP: usize = 0b1000;
+const READABLE: usize = 0b00001;
+const WRITABLE: usize = 0b00010;
+const ERROR: usize = 0b00100;
+const HUP: usize = 0b01000;
impl Ready {
/// Returns the empty `Ready` set.
@@ -860,6 +860,7 @@
}
}
+// TODO: impl Debug for UnixReady
impl fmt::Debug for Ready {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut one = false;
diff --git a/src/poll.rs b/src/poll.rs
index 1def059..e430d01 100644
--- a/src/poll.rs
+++ b/src/poll.rs
@@ -3,10 +3,16 @@
use std::{fmt, io, ptr, usize};
use std::cell::UnsafeCell;
use std::{mem, ops, isize};
+#[cfg(unix)]
+use std::os::unix::io::AsRawFd;
+#[cfg(unix)]
+use std::os::unix::io::RawFd;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, AtomicPtr, AtomicBool};
use std::sync::atomic::Ordering::{self, Acquire, Release, AcqRel, Relaxed, SeqCst};
use std::time::{Duration, Instant};
+#[cfg(unix)]
+use sys::unix::UnixReady;
// Poll is backed by two readiness queues. The first is a system readiness queue
// represented by `sys::Selector`. The system readiness queue handles events
@@ -1098,13 +1104,24 @@
}
}
+#[cfg(unix)]
+fn registerable(interest: Ready) -> bool {
+ let unixinterest = UnixReady::from(interest);
+ unixinterest.is_readable() || unixinterest.is_writable() || unixinterest.is_aio()
+}
+
+#[cfg(not(unix))]
+fn registerable(interest: Ready) -> bool {
+ interest.is_readable() || interest.is_writable()
+}
+
fn validate_args(token: Token, interest: Ready) -> io::Result<()> {
if token == AWAKEN {
return Err(io::Error::new(io::ErrorKind::Other, "invalid token"));
}
- if !interest.is_readable() && !interest.is_writable() {
- return Err(io::Error::new(io::ErrorKind::Other, "interest must include readable or writable"));
+ if !registerable(interest) {
+ return Err(io::Error::new(io::ErrorKind::Other, "interest must include readable or writable or aio"));
}
Ok(())
@@ -1116,6 +1133,13 @@
}
}
+#[cfg(unix)]
+impl AsRawFd for Poll {
+ fn as_raw_fd(&self) -> RawFd {
+ self.selector.as_raw_fd()
+ }
+}
+
/// A collection of readiness events.
///
/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
@@ -2454,3 +2478,10 @@
}
}
}
+
+#[test]
+#[cfg(unix)]
+pub fn as_raw_fd() {
+ let poll = Poll::new().unwrap();
+ assert!(poll.as_raw_fd() > 0);
+}
diff --git a/src/sys/unix/epoll.rs b/src/sys/unix/epoll.rs
index 21f4de9..b9d4638 100644
--- a/src/sys/unix/epoll.rs
+++ b/src/sys/unix/epoll.rs
@@ -1,4 +1,5 @@
#![allow(deprecated)]
+use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;
@@ -184,6 +185,12 @@
kind as u32
}
+impl AsRawFd for Selector {
+ fn as_raw_fd(&self) -> RawFd {
+ self.epfd
+ }
+}
+
impl Drop for Selector {
fn drop(&mut self) {
unsafe {
diff --git a/src/sys/unix/kqueue.rs b/src/sys/unix/kqueue.rs
index b47be1a..3aba51b 100644
--- a/src/sys/unix/kqueue.rs
+++ b/src/sys/unix/kqueue.rs
@@ -1,5 +1,6 @@
use std::{cmp, fmt, ptr};
use std::os::raw::c_int;
+use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
@@ -172,6 +173,12 @@
}
}
+impl AsRawFd for Selector {
+ fn as_raw_fd(&self) -> RawFd {
+ self.kq
+ }
+}
+
impl Drop for Selector {
fn drop(&mut self) {
unsafe {
@@ -253,6 +260,13 @@
} else if e.filter == libc::EVFILT_WRITE {
event::kind_mut(&mut self.events[idx]).insert(Ready::writable());
}
+#[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+ {
+ if e.filter == libc::EVFILT_AIO {
+ event::kind_mut(&mut self.events[idx]).insert(UnixReady::aio());
+ }
+ }
if e.flags & libc::EV_EOF != 0 {
event::kind_mut(&mut self.events[idx]).insert(UnixReady::hup());
@@ -299,3 +313,14 @@
evtloop.register(&kqf, Token(1234), Ready::readable(),
PollOpt::edge() | PollOpt::oneshot()).unwrap();
}
+
+#[cfg(any(target_os = "dragonfly",
+ target_os = "freebsd", target_os = "ios", target_os = "macos"))]
+#[test]
+fn test_coalesce_aio() {
+ let mut events = Events::with_capacity(1);
+ events.sys_events.0.push(kevent!(0x1234, libc::EVFILT_AIO, 0, 42));
+ events.coalesce(Token(0));
+ assert!(events.events[0].readiness() == UnixReady::aio().into());
+ assert!(events.events[0].token() == Token(42));
+}
diff --git a/src/sys/unix/ready.rs b/src/sys/unix/ready.rs
index 2b08c23..7c62492 100644
--- a/src/sys/unix/ready.rs
+++ b/src/sys/unix/ready.rs
@@ -84,10 +84,31 @@
#[derive(Debug, Copy, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct UnixReady(Ready);
-const ERROR: usize = 0b0100;
-const HUP: usize = 0b1000;
+const ERROR: usize = 0b00100;
+const HUP: usize = 0b01000;
+const AIO: usize = 0b10000;
impl UnixReady {
+ /// Returns a `Ready` representing AIO completion readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::aio();
+ ///
+ /// assert!(ready.is_aio());
+ /// ```
+ ///
+ /// [`Poll`]: struct.Poll.html
+ #[inline]
+ pub fn aio() -> UnixReady {
+ UnixReady(ready_from_usize(AIO))
+ }
+
/// Returns a `Ready` representing error readiness.
///
/// **Note that only readable and writable readiness is guaranteed to be
@@ -143,6 +164,24 @@
UnixReady(ready_from_usize(HUP))
}
+ /// Returns true if `Ready` contains AIO readiness
+ ///
+ /// See [`Poll`] for more documentation on polling.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use mio::unix::UnixReady;
+ ///
+ /// let ready = UnixReady::aio();
+ ///
+ /// assert!(ready.is_aio());
+ /// ```
+ #[inline]
+ pub fn is_aio(&self) -> bool {
+ self.contains(ready_from_usize(AIO))
+ }
+
/// Returns true if the value includes error readiness
///
/// **Note that only readable and writable readiness is guaranteed to be