blob: c5b337bca8f658f297e47f010e08cb3dfbafa834 [file] [log] [blame]
use crate::{Interest, Token};
use log::error;
use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, io, ptr, slice};
/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
// Type of the `nchanges` and `nevents` parameters in the `kevent` function.
#[cfg(not(target_os = "netbsd"))]
type Count = libc::c_int;
#[cfg(target_os = "netbsd")]
type Count = libc::size_t;
// Type of the `filter` field in the `kevent` structure.
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))]
type Filter = libc::c_short;
#[cfg(any(target_os = "macos", target_os = "ios"))]
type Filter = i16;
#[cfg(target_os = "netbsd")]
type Filter = u32;
// Type of the `flags` field in the `kevent` structure.
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))]
type Flags = libc::c_ushort;
#[cfg(any(target_os = "macos", target_os = "ios"))]
type Flags = u16;
#[cfg(target_os = "netbsd")]
type Flags = u32;
// Type of the `data` field in the `kevent` structure.
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
type Data = libc::intptr_t;
#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
type Data = i64;
// Type of the `udata` field in the `kevent` structure.
#[cfg(not(target_os = "netbsd"))]
type UData = *mut libc::c_void;
#[cfg(target_os = "netbsd")]
type UData = libc::intptr_t;
macro_rules! kevent {
($id: expr, $filter: expr, $flags: expr, $data: expr) => {
libc::kevent {
ident: $id as libc::uintptr_t,
filter: $filter as Filter,
flags: $flags,
fflags: 0,
data: 0,
udata: $data as UData,
}
};
}
#[derive(Debug)]
pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
kq: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}
impl Selector {
pub fn new() -> io::Result<Selector> {
syscall!(kqueue())
.and_then(|kq| syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| kq))
.map(|kq| Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}
pub fn try_clone(&self) -> io::Result<Selector> {
syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC)).map(|kq| Selector {
// It's the same selector, so we use the same id.
#[cfg(debug_assertions)]
id: self.id,
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}
pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
let timeout = timeout.map(|to| libc::timespec {
tv_sec: cmp::min(to.as_secs(), libc::time_t::max_value() as u64) as libc::time_t,
// `Duration::subsec_nanos` is guaranteed to be less than one
// billion (the number of nanoseconds in a second), making the
// cast to i32 safe. The cast itself is needed for platforms
// where C's long is only 32 bits.
tv_nsec: libc::c_long::from(to.subsec_nanos() as i32),
});
let timeout = timeout
.as_ref()
.map(|s| s as *const _)
.unwrap_or(ptr::null_mut());
events.clear();
syscall!(kevent(
self.kq,
ptr::null(),
0,
events.as_mut_ptr(),
events.capacity() as Count,
timeout,
))
.map(|n_events| {
// This is safe because `kevent` ensures that `n_events` are
// assigned.
unsafe { events.set_len(n_events as usize) };
})
}
pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD;
// At most we need two changes, but maybe we only need 1.
let mut changes: [MaybeUninit<libc::kevent>; 2] =
[MaybeUninit::uninit(), MaybeUninit::uninit()];
let mut n_changes = 0;
if interests.is_writable() {
let kevent = kevent!(fd, libc::EVFILT_WRITE, flags, token.0);
changes[n_changes] = MaybeUninit::new(kevent);
n_changes += 1;
}
if interests.is_readable() {
let kevent = kevent!(fd, libc::EVFILT_READ, flags, token.0);
changes[n_changes] = MaybeUninit::new(kevent);
n_changes += 1;
}
// Older versions of macOS (OS X 10.11 and 10.10 have been witnessed)
// can return EPIPE when registering a pipe file descriptor where the
// other end has already disappeared. For example code that creates a
// pipe, closes a file descriptor, and then registers the other end will
// see an EPIPE returned from `register`.
//
// It also turns out that kevent will still report events on the file
// descriptor, telling us that it's readable/hup at least after we've
// done this registration. As a result we just ignore `EPIPE` here
// instead of propagating it.
//
// More info can be found at tokio-rs/mio#582.
let changes = unsafe {
// This is safe because we ensure that at least `n_changes` are in
// the array.
slice::from_raw_parts_mut(changes[0].as_mut_ptr(), n_changes)
};
kevent_register(self.kq, changes, &[libc::EPIPE as Data])
}
pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
let flags = libc::EV_CLEAR | libc::EV_RECEIPT;
let write_flags = if interests.is_writable() {
flags | libc::EV_ADD
} else {
flags | libc::EV_DELETE
};
let read_flags = if interests.is_readable() {
flags | libc::EV_ADD
} else {
flags | libc::EV_DELETE
};
let mut changes: [libc::kevent; 2] = [
kevent!(fd, libc::EVFILT_WRITE, write_flags, token.0),
kevent!(fd, libc::EVFILT_READ, read_flags, token.0),
];
// Since there is no way to check with which interests the fd was
// registered we modify both readable and write, adding it when required
// and removing it otherwise, ignoring the ENOENT error when it comes
// up. The ENOENT error informs us that a filter we're trying to remove
// wasn't there in first place, but we don't really care since our goal
// is accomplished.
//
// For the explanation of ignoring `EPIPE` see `register`.
kevent_register(
self.kq,
&mut changes,
&[libc::ENOENT as Data, libc::EPIPE as Data],
)
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
let flags = libc::EV_DELETE | libc::EV_RECEIPT;
let mut changes: [libc::kevent; 2] = [
kevent!(fd, libc::EVFILT_WRITE, flags, 0),
kevent!(fd, libc::EVFILT_READ, flags, 0),
];
// Since there is no way to check with which interests the fd was
// registered we remove both filters (readable and writeable) and ignore
// the ENOENT error when it comes up. The ENOENT error informs us that
// the filter wasn't there in first place, but we don't really care
// about that since our goal is to remove it.
kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
}
#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}
// Used by `Waker`.
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
pub fn setup_waker(&self, token: Token) -> io::Result<()> {
// First attempt to accept user space notifications.
let mut kevent = kevent!(
0,
libc::EVFILT_USER,
libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT,
token.0
);
syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| {
if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 {
Err(io::Error::from_raw_os_error(kevent.data as i32))
} else {
Ok(())
}
})
}
// Used by `Waker`.
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
pub fn wake(&self, token: Token) -> io::Result<()> {
let mut kevent = kevent!(
0,
libc::EVFILT_USER,
libc::EV_ADD | libc::EV_RECEIPT,
token.0
);
kevent.fflags = libc::NOTE_TRIGGER;
syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| {
if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 {
Err(io::Error::from_raw_os_error(kevent.data as i32))
} else {
Ok(())
}
})
}
}
/// Register `changes` with `kq`ueue.
fn kevent_register(
kq: RawFd,
changes: &mut [libc::kevent],
ignored_errors: &[Data],
) -> io::Result<()> {
syscall!(kevent(
kq,
changes.as_ptr(),
changes.len() as Count,
changes.as_mut_ptr(),
changes.len() as Count,
ptr::null(),
))
.map(|_| ())
.or_else(|err| {
// According to the manual page of FreeBSD: "When kevent() call fails
// with EINTR error, all changes in the changelist have been applied",
// so we can safely ignore it.
if err.raw_os_error() == Some(libc::EINTR) {
Ok(())
} else {
Err(err)
}
})
.and_then(|()| check_errors(&changes, ignored_errors))
}
/// Check all events for possible errors, it returns the first error found.
fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<()> {
for event in events {
// We can't use references to packed structures (in checking the ignored
// errors), so we need copy the data out before use.
let data = event.data;
// Check for the error flag, the actual error will be in the `data`
// field.
if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored_errors.contains(&data) {
return Err(io::Error::from_raw_os_error(data as i32));
}
}
Ok(())
}
cfg_io_source! {
#[cfg(debug_assertions)]
impl Selector {
pub fn id(&self) -> usize {
self.id
}
}
}
impl AsRawFd for Selector {
fn as_raw_fd(&self) -> RawFd {
self.kq
}
}
impl Drop for Selector {
fn drop(&mut self) {
if let Err(err) = syscall!(close(self.kq)) {
error!("error closing kqueue: {}", err);
}
}
}
pub type Event = libc::kevent;
pub struct Events(Vec<libc::kevent>);
impl Events {
pub fn with_capacity(capacity: usize) -> Events {
Events(Vec::with_capacity(capacity))
}
}
impl Deref for Events {
type Target = Vec<libc::kevent>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Events {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
// `Events` cannot derive `Send` or `Sync` because of the
// `udata: *mut ::c_void` field in `libc::kevent`. However, `Events`'s public
// API treats the `udata` field as a `uintptr_t` which is `Send`. `Sync` is
// safe because with a `events: &Events` value, the only access to the `udata`
// field is through `fn token(event: &Event)` which cannot mutate the field.
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
pub mod event {
use std::fmt;
use crate::sys::Event;
use crate::Token;
use super::{Filter, Flags};
pub fn token(event: &Event) -> Token {
Token(event.udata as usize)
}
pub fn is_readable(event: &Event) -> bool {
event.filter == libc::EVFILT_READ || {
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
// Used by the `Awakener`. On platforms that use `eventfd` or a unix
// pipe it will emit a readable event so we'll fake that here as
// well.
{
event.filter == libc::EVFILT_USER
}
#[cfg(not(any(target_os = "freebsd", target_os = "ios", target_os = "macos")))]
{
false
}
}
}
pub fn is_writable(event: &Event) -> bool {
event.filter == libc::EVFILT_WRITE
}
pub fn is_error(event: &Event) -> bool {
(event.flags & libc::EV_ERROR) != 0 ||
// When the read end of the socket is closed, EV_EOF is set on
// flags, and fflags contains the error if there is one.
(event.flags & libc::EV_EOF) != 0 && event.fflags != 0
}
pub fn is_read_closed(event: &Event) -> bool {
event.filter == libc::EVFILT_READ && event.flags & libc::EV_EOF != 0
}
pub fn is_write_closed(event: &Event) -> bool {
event.filter == libc::EVFILT_WRITE && event.flags & libc::EV_EOF != 0
}
pub fn is_priority(_: &Event) -> bool {
// kqueue doesn't have priority indicators.
false
}
#[allow(unused_variables)] // `event` is not used on some platforms.
pub fn is_aio(event: &Event) -> bool {
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
{
event.filter == libc::EVFILT_AIO
}
#[cfg(not(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
)))]
{
false
}
}
#[allow(unused_variables)] // `event` is only used on FreeBSD.
pub fn is_lio(event: &Event) -> bool {
#[cfg(target_os = "freebsd")]
{
event.filter == libc::EVFILT_LIO
}
#[cfg(not(target_os = "freebsd"))]
{
false
}
}
pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
debug_detail!(
FilterDetails(Filter),
PartialEq::eq,
libc::EVFILT_READ,
libc::EVFILT_WRITE,
libc::EVFILT_AIO,
libc::EVFILT_VNODE,
libc::EVFILT_PROC,
libc::EVFILT_SIGNAL,
libc::EVFILT_TIMER,
#[cfg(target_os = "freebsd")]
libc::EVFILT_PROCDESC,
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
target_os = "ios",
target_os = "macos"
))]
libc::EVFILT_FS,
#[cfg(target_os = "freebsd")]
libc::EVFILT_LIO,
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
target_os = "ios",
target_os = "macos"
))]
libc::EVFILT_USER,
#[cfg(target_os = "freebsd")]
libc::EVFILT_SENDFILE,
#[cfg(target_os = "freebsd")]
libc::EVFILT_EMPTY,
#[cfg(target_os = "dragonfly")]
libc::EVFILT_EXCEPT,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::EVFILT_MACHPORT,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::EVFILT_VM,
);
#[allow(clippy::trivially_copy_pass_by_ref)]
fn check_flag(got: &Flags, want: &Flags) -> bool {
(got & want) != 0
}
debug_detail!(
FlagsDetails(Flags),
check_flag,
libc::EV_ADD,
libc::EV_DELETE,
libc::EV_ENABLE,
libc::EV_DISABLE,
libc::EV_ONESHOT,
libc::EV_CLEAR,
libc::EV_RECEIPT,
libc::EV_DISPATCH,
#[cfg(target_os = "freebsd")]
libc::EV_DROP,
libc::EV_FLAG1,
libc::EV_ERROR,
libc::EV_EOF,
libc::EV_SYSFLAGS,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::EV_FLAG0,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::EV_POLL,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::EV_OOBAND,
#[cfg(target_os = "dragonfly")]
libc::EV_NODATA,
);
#[allow(clippy::trivially_copy_pass_by_ref)]
fn check_fflag(got: &u32, want: &u32) -> bool {
(got & want) != 0
}
debug_detail!(
FflagsDetails(u32),
check_fflag,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_TRIGGER,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_FFNOP,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_FFAND,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_FFOR,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_FFCOPY,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_FFCTRLMASK,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
libc::NOTE_FFLAGSMASK,
libc::NOTE_LOWAT,
libc::NOTE_DELETE,
libc::NOTE_WRITE,
#[cfg(target_os = "dragonfly")]
libc::NOTE_OOB,
#[cfg(target_os = "openbsd")]
libc::NOTE_EOF,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXTEND,
libc::NOTE_ATTRIB,
libc::NOTE_LINK,
libc::NOTE_RENAME,
libc::NOTE_REVOKE,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_NONE,
#[cfg(any(target_os = "openbsd"))]
libc::NOTE_TRUNCATE,
libc::NOTE_EXIT,
libc::NOTE_FORK,
libc::NOTE_EXEC,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_SIGNAL,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXITSTATUS,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXIT_DETAIL,
libc::NOTE_PDATAMASK,
libc::NOTE_PCTRLMASK,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
libc::NOTE_TRACK,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
libc::NOTE_TRACKERR,
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd"
))]
libc::NOTE_CHILD,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXIT_DETAIL_MASK,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXIT_DECRYPTFAIL,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXIT_MEMORY,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_EXIT_CSERROR,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_VM_PRESSURE,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_VM_PRESSURE_TERMINATE,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_VM_PRESSURE_SUDDEN_TERMINATE,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_VM_ERROR,
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
libc::NOTE_SECONDS,
#[cfg(any(target_os = "freebsd"))]
libc::NOTE_MSECONDS,
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
libc::NOTE_USECONDS,
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
libc::NOTE_NSECONDS,
#[cfg(any(target_os = "ios", target_os = "macos"))]
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
libc::NOTE_ABSOLUTE,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_LEEWAY,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_CRITICAL,
#[cfg(any(target_os = "ios", target_os = "macos"))]
libc::NOTE_BACKGROUND,
);
// Can't reference fields in packed structures.
let ident = event.ident;
let data = event.data;
let udata = event.udata;
f.debug_struct("kevent")
.field("ident", &ident)
.field("filter", &FilterDetails(event.filter))
.field("flags", &FlagsDetails(event.flags))
.field("fflags", &FflagsDetails(event.fflags))
.field("data", &data)
.field("udata", &udata)
.finish()
}
}
#[test]
#[cfg(feature = "os-ext")]
fn does_not_register_rw() {
use crate::unix::SourceFd;
use crate::{Poll, Token};
let kq = unsafe { libc::kqueue() };
let mut kqf = SourceFd(&kq);
let poll = Poll::new().unwrap();
// Registering kqueue fd will fail if write is requested (On anything but
// some versions of macOS).
poll.registry()
.register(&mut kqf, Token(1234), Interest::READABLE)
.unwrap();
}