// blob: 2cafcb7120b236f0198afbc0912104857d332d14 [file] [log] [blame]
use std::collections::BTreeMap;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use futures_lite::future;
use once_cell::sync::Lazy;
use polling::{Event, Poller};
use slab::Slab;
/// Index of the read direction in a source's `[Direction; 2]` state array.
const READ: usize = 0;
/// Index of the write direction in a source's `[Direction; 2]` state array.
const WRITE: usize = 1;
/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
///
/// It combines an I/O poller, a table of registered I/O sources, and an ordered map of timers.
pub(crate) struct Reactor {
/// Portable bindings to epoll/kqueue/event ports/wepoll.
///
/// This is where I/O is polled, producing I/O events.
poller: Poller,
/// Ticker bumped before polling.
///
/// This is useful for checking what is the current "round" of `ReactorLock::react()` when
/// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
/// methods must make sure they don't receive stale I/O events - they only accept events from a
/// fresh "round" of `ReactorLock::react()`.
ticker: AtomicUsize,
/// Registered sources.
///
/// The slab index of a source is stored in `Source::key` and is also used as the event key
/// when the source is registered with the poller.
sources: Mutex<Slab<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor.
///
/// Holding a lock on this event list implies the exclusive right to poll I/O.
events: Mutex<Vec<Event>>,
/// An ordered map of registered timers.
///
/// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
/// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
/// timer.
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
/// A queue of timer operations (insert and remove).
///
/// When inserting or removing a timer, we don't process it immediately - we just push it into
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
/// The queue is bounded; its capacity is chosen in `Reactor::get()`.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
/// Returns a reference to the global reactor, constructing it lazily on first access.
pub(crate) fn get() -> &'static Reactor {
    /// Builds the reactor after initializing the driver.
    fn build() -> Reactor {
        crate::driver::init();

        let poller = Poller::new().expect("cannot initialize I/O event notification");
        Reactor {
            poller,
            ticker: AtomicUsize::new(0),
            sources: Mutex::new(Slab::new()),
            events: Mutex::new(Vec::new()),
            timers: Mutex::new(BTreeMap::new()),
            timer_ops: ConcurrentQueue::bounded(1000),
        }
    }

    static REACTOR: Lazy<Reactor> = Lazy::new(build);
    &REACTOR
}
/// Returns the current ticker.
pub(crate) fn ticker(&self) -> usize {
self.ticker.load(Ordering::SeqCst)
}
/// Registers an I/O source in the reactor.
///
/// The source is first added to the source table and then registered with the poller with no
/// interest in any event. If the poller rejects it, the table entry is rolled back and the
/// error is returned.
pub(crate) fn insert_io(
    &self,
    #[cfg(unix)] raw: RawFd,
    #[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> {
    // Allocate a table slot and build the source that will live in it.
    let source = {
        let mut table = self.sources.lock().unwrap();
        let key = table.vacant_entry().key();
        let created = Arc::new(Source {
            raw,
            key,
            state: Default::default(),
        });
        table.insert(created.clone());
        created
    };

    // Hand the descriptor to the poller; undo the table insertion on failure.
    match self.poller.add(raw, Event::none(source.key)) {
        Ok(()) => Ok(source),
        Err(err) => {
            let mut table = self.sources.lock().unwrap();
            table.remove(source.key);
            Err(err)
        }
    }
}
/// Deregisters an I/O source from the reactor.
///
/// The source is dropped from the source table and then deleted from the poller; the table
/// lock stays held for the duration of both steps.
pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
    let mut table = self.sources.lock().unwrap();
    table.remove(source.key);

    let deleted = self.poller.delete(source.raw);
    deleted
}
/// Registers a timer in the reactor.
///
/// The insertion is queued rather than applied immediately, and the reactor is notified
/// afterwards. Returns the inserted timer's ID.
pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
    // Timer IDs come from a process-wide counter starting at 1.
    static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
    let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

    // Keep trying to enqueue the insert operation.
    loop {
        let op = TimerOp::Insert(when, id, waker.clone());
        if self.timer_ops.push(op).is_ok() {
            break;
        }

        // The queue was full: apply the queued operations to make room, then retry.
        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);
    }

    // Let the reactor know a timer has been inserted.
    self.notify();

    id
}
/// Deregisters a timer from the reactor.
///
/// The removal is queued rather than applied immediately.
pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
    loop {
        // Try to enqueue the remove operation.
        if self.timer_ops.push(TimerOp::Remove(when, id)).is_ok() {
            return;
        }

        // The queue was full: apply the queued operations to make room, then retry.
        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);
    }
}
/// Notifies the thread blocked on the reactor.
///
/// Panics if the poller fails to deliver the notification.
pub(crate) fn notify(&self) {
    let outcome = self.poller.notify();
    outcome.expect("failed to notify reactor");
}
/// Locks the reactor, potentially blocking if the lock is held by another thread.
///
/// The lock is the guard over the reactor's event list.
pub(crate) fn lock(&self) -> ReactorLock<'_> {
    ReactorLock {
        reactor: self,
        events: self.events.lock().unwrap(),
    }
}
/// Attempts to lock the reactor without blocking.
///
/// Returns `None` if the event list lock could not be acquired.
pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
    match self.events.try_lock() {
        Ok(events) => Some(ReactorLock {
            reactor: self,
            events,
        }),
        Err(_) => None,
    }
}
/// Processes ready timers and extends the list of wakers to wake.
///
/// Queued timer operations are applied first. Every timer whose deadline is at or before the
/// current instant is then removed from the timer map and its waker is appended to `wakers`.
///
/// Returns the duration until the next timer before this method was called: a zero duration if
/// at least one timer was ready, the time until the earliest pending timer otherwise, or `None`
/// if there are no timers at all.
fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
    let mut timers = self.timers.lock().unwrap();
    self.process_timer_ops(&mut timers);

    let now = Instant::now();

    // Split timers into ready and pending timers.
    //
    // Careful to split just *after* `now`: timer IDs start at 1, so splitting at `(now, 0)`
    // would classify a timer set for exactly `now` as pending instead of ready.
    let split_at = now.checked_add(Duration::from_nanos(1)).unwrap_or(now);
    let pending = timers.split_off(&(split_at, 0));
    let ready = mem::replace(&mut *timers, pending);

    // Calculate the duration until the next event.
    let dur = if ready.is_empty() {
        // Duration until the next timer.
        timers
            .keys()
            .next()
            .map(|(when, _)| when.saturating_duration_since(now))
    } else {
        // Timers are about to fire right now.
        Some(Duration::from_secs(0))
    };

    // Drop the lock before waking.
    drop(timers);

    // Add wakers to the list, in firing order.
    log::trace!("process_timers: {} ready wakers", ready.len());
    wakers.extend(ready.into_iter().map(|(_, waker)| waker));

    dur
}
/// Applies queued timer operations to the timer map.
///
/// At most one queue-capacity worth of operations is applied per call.
fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
    // Bound the work by the queue capacity, or else this loop could in theory run forever
    // while other threads keep pushing.
    let limit = self.timer_ops.capacity().unwrap();

    for _ in 0..limit {
        let op = match self.timer_ops.pop() {
            Ok(op) => op,
            // Nothing left to apply.
            Err(_) => break,
        };

        match op {
            TimerOp::Insert(when, id, waker) => {
                timers.insert((when, id), waker);
            }
            TimerOp::Remove(when, id) => {
                timers.remove(&(when, id));
            }
        }
    }
}
}
/// A lock on the reactor.
///
/// Created by `Reactor::lock()` or `Reactor::try_lock()`. It holds the guard over the reactor's
/// event list, which is what grants the exclusive right to poll I/O through `react()`.
pub(crate) struct ReactorLock<'a> {
/// The reactor this lock was taken on.
reactor: &'a Reactor,
/// Guard over the reactor's event buffer; cleared and refilled on every `react()` call.
events: MutexGuard<'a, Vec<Event>>,
}
impl ReactorLock<'_> {
/// Processes new events, blocking until the first event or the timeout.
///
/// Ready timers are fired first, and the effective timeout is the smaller of `timeout` and the
/// time until the next timer. Wakers collected from timers and I/O events are always woken
/// before this method returns, even when an error is returned.
///
/// # Errors
///
/// Returns an error if polling fails for a reason other than interruption, or if
/// re-registering interest in a source fails. In the latter case the remaining events are
/// still processed and the first such error is returned.
pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
    let mut wakers = Vec::new();

    // Process ready timers.
    let next_timer = self.reactor.process_timers(&mut wakers);

    // Compute the timeout for blocking on I/O events.
    let timeout = match (next_timer, timeout) {
        (None, None) => None,
        (Some(t), None) | (None, Some(t)) => Some(t),
        (Some(a), Some(b)) => Some(a.min(b)),
    };

    // Bump the ticker before polling I/O.
    let tick = self
        .reactor
        .ticker
        .fetch_add(1, Ordering::SeqCst)
        .wrapping_add(1);

    self.events.clear();

    // Block on I/O events.
    let res = match self.reactor.poller.wait(&mut self.events, timeout) {
        // No I/O events occurred.
        Ok(0) => {
            if timeout != Some(Duration::from_secs(0)) {
                // The non-zero timeout was hit so fire ready timers.
                self.reactor.process_timers(&mut wakers);
            }
            Ok(())
        }

        // At least one I/O event occurred.
        Ok(_) => {
            // First re-registration failure, if any. It is reported only after all events
            // have been handled, so that already-drained wakers are never dropped unwoken.
            let mut outcome: io::Result<()> = Ok(());

            // Iterate over sources in the event list.
            let sources = self.reactor.sources.lock().unwrap();

            for ev in self.events.iter() {
                // Check if there is a source in the table with this key.
                if let Some(source) = sources.get(ev.key) {
                    let mut state = source.state.lock().unwrap();

                    // Collect wakers for each direction in which an event was emitted.
                    for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                        if emitted {
                            state[dir].tick = tick;
                            state[dir].drain_into(&mut wakers);
                        }
                    }

                    // Re-register if there are still writers or readers. This can happen if
                    // e.g. we were previously interested in both readability and writability,
                    // but only one of them was emitted.
                    if !state[READ].is_empty() || !state[WRITE].is_empty() {
                        let rearmed = self.reactor.poller.modify(
                            source.raw,
                            Event {
                                key: source.key,
                                readable: !state[READ].is_empty(),
                                writable: !state[WRITE].is_empty(),
                            },
                        );
                        if let Err(err) = rearmed {
                            if outcome.is_ok() {
                                outcome = Err(err);
                            }
                        }
                    }
                }
            }

            outcome
        }

        // The syscall was interrupted.
        Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

        // An actual error occurred.
        Err(err) => Err(err),
    };

    // Wake up ready tasks.
    log::trace!("react: {} ready wakers", wakers.len());
    for waker in wakers {
        // Don't let a panicking waker blow everything up.
        panic::catch_unwind(|| waker.wake()).ok();
    }

    res
}
}
/// A single timer operation.
///
/// Operations are queued in `Reactor::timer_ops` and applied to the timer map by
/// `Reactor::process_timer_ops()`.
enum TimerOp {
/// Insert a timer with the given deadline and ID, waking the given waker when it fires.
Insert(Instant, usize, Waker),
/// Remove the timer with the given deadline and ID.
Remove(Instant, usize),
}
/// A registered source of I/O events.
///
/// Created by `Reactor::insert_io()` and shared through an `Arc`.
#[derive(Debug)]
pub(crate) struct Source {
/// Raw file descriptor on Unix platforms.
#[cfg(unix)]
pub(crate) raw: RawFd,
/// Raw socket handle on Windows.
#[cfg(windows)]
pub(crate) raw: RawSocket,
/// The key of this source obtained during registration.
///
/// This is the source's index in the reactor's source table and the key carried by its
/// poller events.
key: usize,
/// Inner state with registered wakers, indexed by `READ` and `WRITE`.
state: Mutex<[Direction; 2]>,
}
/// A read or write direction.
///
/// Holds the wakers interested in one direction of a `Source`, together with the tick
/// bookkeeping used to tell fresh events from stale ones.
#[derive(Debug, Default)]
struct Direction {
/// Last reactor tick that delivered an event.
tick: usize,
/// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
///
/// Stored as `(reactor ticker, tick)` at the time the waker was registered; an event is
/// considered delivered once `tick` differs from both values.
ticks: Option<(usize, usize)>,
/// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
waker: Option<Waker>,
/// Wakers of tasks waiting for the next event.
///
/// Registered by `Async::readable()` and `Async::writable()`. A `None` slot is an entry that
/// is reserved but currently holds no waker.
wakers: Slab<Option<Waker>>,
}
impl Direction {
/// Returns `true` if no waker is currently registered for this direction.
fn is_empty(&self) -> bool {
    // The single polled waker counts as interest.
    if self.waker.is_some() {
        return false;
    }

    // Reserved-but-empty slots do not count as interest.
    !self.wakers.iter().any(|(_, slot)| slot.is_some())
}
/// Moves every registered waker into `dst`, leaving the slots in place but empty.
fn drain_into(&mut self, dst: &mut Vec<Waker>) {
    // The polled waker goes first (an `Option` extends with zero or one items).
    dst.extend(self.waker.take());

    // Then every waker stored in the slab, in slab order.
    dst.extend(self.wakers.iter_mut().filter_map(|(_, slot)| slot.take()));
}
}
impl Source {
/// Polls the I/O source for readability.
pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_ready(READ, cx)
}
/// Polls the I/O source for writability.
pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_ready(WRITE, cx)
}
/// Registers a waker from `poll_readable()` or `poll_writable()`.
///
/// Returns `Poll::Ready(Ok(()))` once a reactor tick newer than the remembered ones has
/// delivered an event for `dir`. Otherwise the task's waker is stored and `Poll::Pending` is
/// returned. If a different waker is already registered, it gets replaced and woken.
fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    let mut state = self.state.lock().unwrap();

    // Check if the reactor has delivered an event: `tick` must have moved away from both
    // values remembered at registration time.
    let remembered = state[dir].ticks;
    let delivered = remembered.map_or(false, |(a, b)| {
        let latest = state[dir].tick;
        latest != a && latest != b
    });
    if delivered {
        state[dir].ticks = None;
        return Poll::Ready(Ok(()));
    }

    let was_empty = state[dir].is_empty();

    // Register the current task's waker.
    match state[dir].waker.take() {
        // Same task as before: put the waker back and keep waiting.
        Some(previous) if previous.will_wake(cx.waker()) => {
            state[dir].waker = Some(previous);
            return Poll::Pending;
        }
        // Wake the previous waker because it's going to get replaced.
        Some(previous) => {
            panic::catch_unwind(|| previous.wake()).ok();
        }
        None => {}
    }
    state[dir].waker = Some(cx.waker().clone());
    state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));

    // Update interest in this I/O handle, but only if this direction just gained its first
    // waker.
    if was_empty {
        let interest = Event {
            key: self.key,
            readable: !state[READ].is_empty(),
            writable: !state[WRITE].is_empty(),
        };
        Reactor::get().poller.modify(self.raw, interest)?;
    }

    Poll::Pending
}
/// Waits until the I/O source is readable, logging a trace message on success.
pub(crate) async fn readable(&self) -> io::Result<()> {
    let outcome = self.ready(READ).await;
    if outcome.is_ok() {
        log::trace!("readable: fd={}", self.raw);
    }
    outcome
}
/// Waits until the I/O source is writable, logging a trace message on success.
pub(crate) async fn writable(&self) -> io::Result<()> {
    let outcome = self.ready(WRITE).await;
    if outcome.is_ok() {
        log::trace!("writable: fd={}", self.raw);
    }
    outcome
}
/// Waits until the I/O source is readable or writable.
///
/// `dir` selects the direction (`READ` or `WRITE`). On the first poll a slot is reserved in
/// the direction's waker slab and the current ticks are remembered; the future completes once
/// a newer reactor tick has delivered an event for that direction. The reserved slot is
/// released when the future is dropped.
async fn ready(&self, dir: usize) -> io::Result<()> {
// Ticks remembered on the first poll, as `(reactor ticker, state[dir].tick)`.
let mut ticks = None;
// Index of this future's slot in `state[dir].wakers`, once reserved.
let mut index = None;
// Drop guard that removes the reserved slot; lives as long as this future.
let mut _guard = None;
future::poll_fn(|cx| {
let mut state = self.state.lock().unwrap();
// Check if the reactor has delivered an event.
if let Some((a, b)) = ticks {
// If `state[dir].tick` has changed to a value other than the old reactor tick,
// that means a newer reactor tick has delivered an event.
if state[dir].tick != a && state[dir].tick != b {
return Poll::Ready(Ok(()));
}
}
// Computed before storing the waker so that a first registration is detected below.
let was_empty = state[dir].is_empty();
// Register the current task's waker.
let i = match index {
Some(i) => i,
None => {
// First poll: reserve an empty slot and arrange for it to be removed on drop.
let i = state[dir].wakers.insert(None);
_guard = Some(CallOnDrop(move || {
let mut state = self.state.lock().unwrap();
state[dir].wakers.remove(i);
}));
index = Some(i);
ticks = Some((Reactor::get().ticker(), state[dir].tick));
i
}
};
state[dir].wakers[i] = Some(cx.waker().clone());
// Update interest in this I/O handle.
if was_empty {
Reactor::get().poller.modify(
self.raw,
Event {
key: self.key,
readable: !state[READ].is_empty(),
writable: !state[WRITE].is_empty(),
},
)?;
}
Poll::Pending
})
.await
}
}
/// Guard that invokes the wrapped closure when it is dropped.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}