blob: 1caa9576324b11d220041fa3c14d26e10659d106 [file] [log] [blame]
//! The core reactor driving all I/O
//!
//! This module contains the `Core` type which is the reactor for all I/O
//! happening in `tokio-core`. This reactor (or event loop) is used to run
//! futures, schedule tasks, issue I/O requests, etc.
use std::cell::RefCell;
use std::cmp;
use std::fmt;
use std::io::{self, ErrorKind};
use std::mem;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, IntoFuture, Async};
use futures::future::{self, Executor, ExecuteError};
use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use futures::task::Task;
use mio;
use mio::event::Evented;
use slab::Slab;
use heap::{Heap, Slot};
mod io_token;
mod timeout_token;
mod poll_evented;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub use self::timeout::Timeout;
pub use self::interval::Interval;
// Source of per-loop identifiers: `Core::new` takes the next value with
// `fetch_add`, so every `Core` created in this process gets a distinct id.
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
// Scoped thread-local pointing at the `Core` that is currently polling a
// future, notifying a task, or draining its message queue. `Remote` consults
// it to detect that it is being used from inside its own event loop.
scoped_thread_local!(static CURRENT_LOOP: Core);
/// An event loop.
///
/// The event loop is the main source of blocking in an application which drives
/// all other I/O events and notifications happening. Each event loop can have
/// multiple handles pointing to it, each of which can then be used to create
/// various I/O objects to interact with the event loop in interesting ways.
// TODO: expand this
pub struct Core {
// Buffer that `mio::Poll::poll` fills with events on every turn of the loop.
events: mio::Events,
// Sending half of the message channel; a clone is placed in every `Remote`.
tx: mpsc::UnboundedSender<Message>,
// Receiving half of the message channel, drained by `consume_queue`.
rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
// Registered with the poll under `TOKEN_MESSAGES`. After registration it is
// never touched again, just kept alive.
_rx_registration: mio::Registration,
// Notify handle passed to `rx` whenever it is polled; its readiness is
// cleared in `poll` right before the queue is drained.
rx_readiness: Arc<MySetReadiness>,
// State shared with every `Handle`, which hold only weak references to it.
inner: Rc<RefCell<Inner>>,
// Used for determining when the future passed to `run` is ready. Once the
// registration has been registered with the `mio::Poll` (under
// `TOKEN_FUTURE`) we never touch it again, just keep it alive.
_future_registration: mio::Registration,
// Notify handle given to the future passed to `run` when it is polled.
future_readiness: Arc<MySetReadiness>,
}
// Reactor state shared between the `Core` and its `Handle`s through an
// `Rc<RefCell<..>>`.
struct Inner {
// Identifier of this loop, taken from `NEXT_LOOP_ID` in `Core::new`.
id: usize,
// The mio poll instance that every source and task is registered with.
io: mio::Poll,
// Dispatch slabs for I/O and futures events
io_dispatch: Slab<ScheduledIo>,
task_dispatch: Slab<ScheduledTask>,
// Timer heap keeping track of all timeouts. The `usize` stored in each
// heap entry is an index into the slab below.
//
// The slab below keeps track of the timeouts themselves as well as the
// state of the timeout itself. The `TimeoutToken` type is an index into the
// `timeouts` slab.
timer_heap: Heap<(Instant, usize)>,
// For each timeout: its slot in `timer_heap` (`None` once it has been popped
// from the heap) and its current state.
timeouts: Slab<(Option<Slot>, TimeoutState)>,
}
/// A unique ID for a `Core`.
///
/// An ID by which different cores may be distinguished. Can be compared and used as a key in
/// a `HashMap`.
///
/// The ID is globally unique and never reused.
#[derive(Clone,Copy,Eq,PartialEq,Hash,Debug)]
pub struct CoreId(usize);
/// Handle to an event loop, used to construct I/O objects, send messages, and
/// otherwise interact indirectly with the event loop itself.
///
/// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop.
#[derive(Clone)]
pub struct Remote {
// Id of the `Core` this remote was created from.
id: usize,
// Clone of the core's message sender; messages sent here are processed by
// the core when it drains its queue.
tx: mpsc::UnboundedSender<Message>,
}
/// A non-sendable handle to an event loop, used to spawn futures onto the loop
/// and typically passed to functions that create I/O objects bound to it.
///
/// A `Handle` only holds a weak reference to the loop's internal state, so it
/// does not keep the `Core` alive.
#[derive(Clone)]
pub struct Handle {
// Sendable counterpart referring to the same event loop.
remote: Remote,
// Weak reference to the loop state; upgrading fails once the `Core` is gone.
inner: Weak<RefCell<Inner>>,
}
// Per-source bookkeeping stored in `Inner::io_dispatch`.
struct ScheduledIo {
// Readiness bits (encoded by `ready2usize`) accumulated by `dispatch_io`.
// The `Arc` is also handed back to the caller of `add_source`.
readiness: Arc<AtomicUsize>,
// Task to notify when an event other than "writable" arrives.
reader: Option<Task>,
// Task to notify when a "writable" event arrives.
writer: Option<Task>,
}
// A future spawned onto the loop, stored in `Inner::task_dispatch`.
struct ScheduledTask {
// Registration half registered with the poll under the task's token; only
// kept alive here.
_registration: mio::Registration,
// The spawned future. Taken out of the slot while it is being polled and put
// back if it is not yet ready.
spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
// Notify handle that marks the registration readable, causing the task to be
// polled again. Taken and restored together with `spawn`.
wake: Option<Arc<MySetReadiness>>,
}
// State of a single timeout stored in `Inner::timeouts`.
enum TimeoutState {
// Has not fired yet and no task is waiting on it.
NotFired,
// Has already fired.
Fired,
// Has not fired yet; the contained task is notified when it does.
Waiting(Task),
}
// Which side of an I/O source a task is waiting on; selects the `reader` or
// `writer` slot of a `ScheduledIo` in `Inner::schedule`.
enum Direction {
Read,
Write,
}
// Requests sent to the event loop over its message channel; each variant is
// handled in `Core::notify`.
enum Message {
// Remove the I/O source with the given `io_dispatch` index.
DropSource(usize),
// Park the task on the given I/O source for the given direction.
Schedule(usize, Task, Direction),
// Register the task as the waiter of the given timeout.
UpdateTimeout(usize, Task),
// Re-arm the given timeout to fire at the given instant.
ResetTimeout(usize, Instant),
// Remove the given timeout.
CancelTimeout(usize),
// Run the boxed closure on the event loop with access to the `Core`.
Run(Box<FnBox>),
}
// Token of the registration signalling that the message channel has work.
const TOKEN_MESSAGES: mio::Token = mio::Token(0);
// Token of the registration signalling that the future passed to `run` was
// notified.
const TOKEN_FUTURE: mio::Token = mio::Token(1);
// First token available for dynamic registrations. Relative to this value,
// even offsets identify I/O sources (`2 * index`) and odd offsets identify
// spawned tasks (`2 * index + 1`); see `Core::dispatch`.
const TOKEN_START: usize = 2;
impl Core {
/// Creates a new event loop, returning any error that happened during the
/// creation.
///
/// # Errors
///
/// Returns the error reported by `mio::Poll::new` or by registering the two
/// internal registrations (message channel and `run` future) with the poll.
pub fn new() -> io::Result<Core> {
let io = try!(mio::Poll::new());
// Registration used to learn that the future passed to `run` was notified.
let future_pair = mio::Registration::new2();
try!(io.register(&future_pair.0,
TOKEN_FUTURE,
mio::Ready::readable(),
mio::PollOpt::level()));
// Registration used to learn that the message channel needs draining.
let (tx, rx) = mpsc::unbounded();
let channel_pair = mio::Registration::new2();
try!(io.register(&channel_pair.0,
TOKEN_MESSAGES,
mio::Ready::readable(),
mio::PollOpt::level()));
let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
// Start out readable so that the first turn of the loop sees a
// `TOKEN_MESSAGES` event and runs `consume_queue`.
rx_readiness.notify(0);
Ok(Core {
events: mio::Events::with_capacity(1024),
tx: tx,
rx: RefCell::new(executor::spawn(rx)),
_rx_registration: channel_pair.0,
rx_readiness: rx_readiness,
_future_registration: future_pair.0,
future_readiness: Arc::new(MySetReadiness(future_pair.1)),
inner: Rc::new(RefCell::new(Inner {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io,
io_dispatch: Slab::with_capacity(1),
task_dispatch: Slab::with_capacity(1),
timeouts: Slab::with_capacity(1),
timer_heap: Heap::new(),
})),
})
}
/// Creates a thread-local `Handle` that acts as a proxy for this event loop.
///
/// The returned handle cannot be sent across threads. It can be cloned
/// freely and every clone refers to this same event loop. Handles are
/// typically given to functions that construct I/O objects so that those
/// objects are bound to this loop.
pub fn handle(&self) -> Handle {
    let remote = self.remote();
    // Only a weak reference is handed out; the handle does not keep the
    // loop state alive.
    let inner = Rc::downgrade(&self.inner);
    Handle {
        remote: remote,
        inner: inner,
    }
}
/// Creates a `Remote` for this event loop, which other threads can use to
/// spawn tasks into the loop.
pub fn remote(&self) -> Remote {
    let id = self.inner.borrow().id;
    let tx = self.tx.clone();
    Remote {
        id: id,
        tx: tx,
    }
}
/// Runs a future until completion, driving the event loop while we're
/// otherwise waiting for the future to complete.
///
/// This function will begin executing the event loop and will finish once
/// the provided future is resolved. Note that the future argument here
/// crucially does not require the `'static` nor `Send` bounds. As a result
/// the future will be "pinned" to not only this thread but also this stack
/// frame.
///
/// This function will return the value that the future resolves to once
/// the future has finished. If the future never resolves then this function
/// will never return.
///
/// # Errors
///
/// Returns `Err` with the future's error if the future resolves to an error.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where F: Future,
{
let mut task = executor::spawn(f);
// Start as `true` so the future is polled once before the loop first blocks.
let mut future_fired = true;
loop {
if future_fired {
// Poll with `CURRENT_LOOP` set so that code run by the future can detect
// that it executes on this loop. `try!` returns the future's error.
let res = try!(CURRENT_LOOP.set(self, || {
task.poll_future_notify(&self.future_readiness, 0)
}));
if let Async::Ready(e) = res {
return Ok(e)
}
}
// Turn the loop once; `poll` reports whether the future was notified.
future_fired = self.poll(None);
}
}
/// Runs a single iteration of the event loop, blocking for events for at
/// most `max_wait` (or indefinitely when `max_wait` is `None`).
///
/// Calling this only makes sense after a future has been spawned onto this
/// event loop.
///
/// `loop { lp.turn(None) }` is equivalent to calling `run` with an
/// empty future (one that never finishes).
pub fn turn(&mut self, max_wait: Option<Duration>) {
    // Whether the `run` future was notified is of no interest here.
    let _future_fired = self.poll(max_wait);
}
/// Performs one turn of the event loop: waits for events, fires expired
/// timeouts and dispatches every event that was received.
///
/// Blocks for at most `max_wait`, or until the earliest timeout on the timer
/// heap, whichever comes first (indefinitely if neither exists).
///
/// Returns `true` if an event for `TOKEN_FUTURE` was seen, i.e. the future
/// passed to `run` was notified. Returns `false` otherwise, including when the
/// underlying poll was interrupted.
///
/// # Panics
///
/// Panics if the underlying poll fails with an error other than
/// `ErrorKind::Interrupted`.
fn poll(&mut self, max_wait: Option<Duration>) -> bool {
// Given the `max_wait` variable specified, figure out the actual
// timeout that we're going to pass to `poll`. This involves taking a
// look at active timers on our heap as well.
let start = Instant::now();
let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| {
if t.0 < start {
Duration::new(0, 0)
} else {
t.0 - start
}
});
// Use the smaller of the two bounds when both exist, otherwise whichever
// one is present.
let timeout = match (max_wait, timeout) {
(Some(d1), Some(d2)) => Some(cmp::min(d1, d2)),
(max_wait, timeout) => max_wait.or(timeout),
};
// Block waiting for an event to happen, peeling out how many events
// happened.
let amt = match self.inner.borrow_mut().io.poll(&mut self.events, timeout) {
Ok(a) => a,
Err(ref e) if e.kind() == ErrorKind::Interrupted => return false,
Err(e) => panic!("error in poll: {}", e),
};
let after_poll = Instant::now();
debug!("loop poll - {:?}", after_poll - start);
debug!("loop time - {:?}", after_poll);
// Process all timeouts that may have just occurred, using the time
// observed right after the poll returned.
self.consume_timeouts(after_poll);
// Process all the events that came in, dispatching appropriately
let mut fired = false;
// Iterate by index: the dispatch calls below need `&mut self`, so no
// borrow of `self.events` may be held across them.
for i in 0..self.events.len() {
let event = self.events.get(i).unwrap();
let token = event.token();
trace!("event {:?} {:?}", event.readiness(), event.token());
if token == TOKEN_MESSAGES {
// Clear the readiness first; draining the queue re-registers the
// notify handle with the receiver.
self.rx_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
CURRENT_LOOP.set(&self, || self.consume_queue());
} else if token == TOKEN_FUTURE {
self.future_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
fired = true;
} else {
self.dispatch(token, event.readiness());
}
}
debug!("loop process - {} events, {:?}", amt, after_poll.elapsed());
return fired
}
/// Routes an event for a dynamically allocated token to its handler.
///
/// Relative to `TOKEN_START`, even offsets belong to I/O sources and odd
/// offsets belong to spawned tasks; the slab index is the offset halved.
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
    let offset = usize::from(token) - TOKEN_START;
    let index = offset / 2;
    match offset % 2 {
        0 => self.dispatch_io(index, ready),
        _ => self.dispatch_task(index),
    }
}
/// Records `ready` on the I/O source at index `token` and wakes the tasks
/// parked on it.
///
/// The writer task is woken on a "writable" event; the reader task is woken
/// when any readiness other than "writable" is present. Unknown tokens are
/// ignored.
fn dispatch_io(&mut self, token: usize, ready: mio::Ready) {
    let mut inner = self.inner.borrow_mut();
    let (read_task, write_task) = match inner.io_dispatch.get_mut(token) {
        Some(sched) => {
            sched.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed);
            let write_task = if ready.is_writable() {
                sched.writer.take()
            } else {
                None
            };
            let non_write = ready & (!mio::Ready::writable());
            let read_task = if non_write.is_empty() {
                None
            } else {
                sched.reader.take()
            };
            (read_task, write_task)
        }
        None => (None, None),
    };
    // Release the borrow before waking anything: notified tasks may call
    // back into the loop.
    drop(inner);
    // TODO: don't notify the same task twice
    if let Some(task) = read_task {
        self.notify_handle(task);
    }
    if let Some(task) = write_task {
        self.notify_handle(task);
    }
}
/// Polls the spawned task stored at index `token` of `task_dispatch`.
///
/// The future and its wake handle are taken out of the slab while the future
/// is polled, so no `RefCell` borrow of `inner` is held while user code runs.
/// A task that is not ready is put back; a task that finished (successfully
/// or with an error) is removed from the slab.
fn dispatch_task(&mut self, token: usize) {
let mut inner = self.inner.borrow_mut();
let (task, wake) = match inner.task_dispatch.get_mut(token) {
Some(slot) => (slot.spawn.take(), slot.wake.take()),
// No task under this token; nothing to do.
None => return,
};
// Proceed only if both the future and its wake handle are in the slot.
let (mut task, wake) = match (task, wake) {
(Some(task), Some(wake)) => (task, wake),
_ => return,
};
// Clear the readiness that triggered this dispatch before polling.
wake.0.set_readiness(mio::Ready::empty()).unwrap();
drop(inner);
let res = CURRENT_LOOP.set(self, || {
task.poll_future_notify(&wake, 0)
});
// Receives the removed slab entry so that it is dropped at the end of the
// function, after the borrow of `inner` has been released below.
let _task_to_drop;
inner = self.inner.borrow_mut();
match res {
Ok(Async::NotReady) => {
assert!(inner.task_dispatch[token].spawn.is_none());
inner.task_dispatch[token].spawn = Some(task);
inner.task_dispatch[token].wake = Some(wake);
}
Ok(Async::Ready(())) |
Err(()) => {
_task_to_drop = inner.task_dispatch.remove(token).unwrap();
}
}
drop(inner);
}
/// Fires every timeout whose deadline is at or before `now`, waking the
/// task waiting on it (if any).
fn consume_timeouts(&mut self, now: Instant) {
    loop {
        let mut inner = self.inner.borrow_mut();
        let due = match inner.timer_heap.peek() {
            Some(head) => head.0 <= now,
            None => false,
        };
        if !due {
            break;
        }
        let (_, slab_idx) = inner.timer_heap.pop().unwrap();
        trace!("firing timeout: {}", slab_idx);
        // The heap slot is gone now, so forget it in the slab entry as well.
        inner.timeouts[slab_idx].0.take().unwrap();
        let waiter = inner.timeouts[slab_idx].1.fire();
        // Release the borrow before waking the task.
        drop(inner);
        if let Some(task) = waiter {
            self.notify_handle(task);
        }
    }
}
/// Wakes the given task with `CURRENT_LOOP` pointing at this core.
///
/// Use this rather than calling `handle.notify()` directly so that the
/// `CURRENT_LOOP` variable is set appropriately during the notification.
fn notify_handle(&self, handle: Task) {
    debug!("notifying a task handle");
    let wake = || handle.notify();
    CURRENT_LOOP.set(self, wake);
}
/// Drains the message channel, handling every message that is currently
/// available, and stops once the channel is empty or closed.
fn consume_queue(&self) {
    debug!("consuming notification queue");
    // TODO: can we do better than `.unwrap()` here?
    loop {
        // Keep this as its own statement: the borrow of `rx` must end
        // before the message is handled, since handling may re-enter here.
        let polled = self.rx.borrow_mut().poll_stream_notify(&self.rx_readiness, 0).unwrap();
        if let Async::Ready(Some(msg)) = polled {
            self.notify(msg);
        } else {
            break;
        }
    }
}
/// Handles a single message sent to the event loop.
///
/// Messages that yield a task (because the awaited condition already
/// holds) cause that task to be woken right away.
fn notify(&self, msg: Message) {
    // Each arm borrows `inner` only for the duration of its own call.
    let woken = match msg {
        Message::DropSource(tok) => {
            self.inner.borrow_mut().drop_source(tok);
            None
        }
        Message::Schedule(tok, wake, dir) => {
            self.inner.borrow_mut().schedule(tok, wake, dir)
        }
        Message::UpdateTimeout(t, handle) => {
            self.inner.borrow_mut().update_timeout(t, handle)
        }
        Message::ResetTimeout(t, at) => {
            self.inner.borrow_mut().reset_timeout(t, at);
            None
        }
        Message::CancelTimeout(t) => {
            self.inner.borrow_mut().cancel_timeout(t);
            None
        }
        Message::Run(r) => {
            r.call_box(self);
            None
        }
    };
    if let Some(task) = woken {
        self.notify_handle(task);
    }
}
/// Returns the identifier of this event loop.
pub fn id(&self) -> CoreId {
    let raw = self.inner.borrow().id;
    CoreId(raw)
}
}
/// Spawns futures onto the core by delegating to a freshly created `Handle`.
impl<F> Executor<F> for Core
    where F: Future<Item = (), Error = ()> + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        let handle = self.handle();
        handle.execute(future)
    }
}
/// Formats the core as `Core { id: .. }`.
impl fmt::Debug for Core {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("Core");
        out.field("id", &id);
        out.finish()
    }
}
impl Inner {
/// Registers `source` with the poll and allocates a dispatch slot for it.
///
/// On success returns the shared readiness cell of the new source together
/// with its index in `io_dispatch`.
///
/// # Errors
///
/// Returns the error from `mio::Poll::register`; in that case no slab entry
/// is inserted.
fn add_source(&mut self, source: &Evented)
-> io::Result<(Arc<AtomicUsize>, usize)> {
debug!("adding a new I/O source");
let sched = ScheduledIo {
readiness: Arc::new(AtomicUsize::new(0)),
reader: None,
writer: None,
};
// Grow the slab by its current length when it has no vacant entry left.
if self.io_dispatch.vacant_entry().is_none() {
let amt = self.io_dispatch.len();
self.io_dispatch.reserve_exact(amt);
}
let entry = self.io_dispatch.vacant_entry().unwrap();
// I/O sources use the even offsets after `TOKEN_START`. The source is
// registered edge-triggered for every readiness kind we track.
try!(self.io.register(source,
mio::Token(TOKEN_START + entry.index() * 2),
mio::Ready::readable() |
mio::Ready::writable() |
platform::all(),
mio::PollOpt::edge()));
// Only fill the slot once registration has succeeded.
Ok((sched.readiness.clone(), entry.insert(sched).index()))
}
/// Deregisters `source` from the poll, returning any error mio reports.
///
/// The source's `io_dispatch` entry is not touched here; see `drop_source`.
fn deregister_source(&mut self, source: &Evented) -> io::Result<()> {
self.io.deregister(source)
}
/// Removes the I/O source stored at index `token` of `io_dispatch`.
///
/// # Panics
///
/// Panics if there is no entry at `token`.
fn drop_source(&mut self, token: usize) {
debug!("dropping I/O source: {}", token);
self.io_dispatch.remove(token).unwrap();
}
/// Parks `wake` on the I/O source at index `token` for direction `dir`.
///
/// If the source already has readiness recorded for that direction, the
/// slot is cleared and the task is handed back so the caller can wake it
/// immediately. Otherwise the task is stored in the slot and `None` is
/// returned.
///
/// # Panics
///
/// Panics if there is no I/O source at `token`.
fn schedule(&mut self, token: usize, wake: Task, dir: Direction) -> Option<Task> {
    debug!("scheduling direction for: {}", token);
    let sched = self.io_dispatch.get_mut(token).unwrap();
    // Readers are interested in everything except "writable".
    let (slot, mask) = match dir {
        Direction::Write => (&mut sched.writer, mio::Ready::writable()),
        Direction::Read => (&mut sched.reader, !mio::Ready::writable()),
    };
    let observed = sched.readiness.load(Ordering::SeqCst);
    if observed & ready2usize(mask) == 0 {
        debug!("blocking");
        *slot = Some(wake);
        return None;
    }
    debug!("cancelling block");
    *slot = None;
    Some(wake)
}
/// Creates a timeout firing at `at` and returns its index in `timeouts`.
fn add_timeout(&mut self, at: Instant) -> usize {
    // Grow the slab by its current length when it has no vacant entry left.
    if self.timeouts.vacant_entry().is_none() {
        let len = self.timeouts.len();
        self.timeouts.reserve_exact(len);
    }
    let vacant = self.timeouts.vacant_entry().unwrap();
    // The heap entry points back at the slab index, and vice versa.
    let heap_slot = self.timer_heap.push((at, vacant.index()));
    let occupied = vacant.insert((Some(heap_slot), TimeoutState::NotFired));
    let token = occupied.index();
    debug!("added a timeout: {}", token);
    token
}
/// Registers `handle` as the task waiting on the timeout at index `token`.
///
/// Returns the task back if the timeout has already fired, so that the caller
/// can wake it immediately; otherwise returns `None`.
///
/// Indexes `timeouts` directly with `token`.
fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
debug!("updating a timeout: {}", token);
self.timeouts[token].1.block(handle)
}
/// Re-arms the timeout at index `token` to fire at `at`, resetting its
/// state to `NotFired`.
fn reset_timeout(&mut self, token: usize, at: Instant) {
    let entry = &mut self.timeouts[token];
    // TODO: avoid remove + push and instead just do one sift of the heap?
    //       In theory we could update it in place and then do the percolation
    //       as necessary
    match entry.0.take() {
        // Still queued on the heap: drop the old deadline first.
        Some(stale) => {
            self.timer_heap.remove(stale);
        }
        None => {}
    }
    let fresh = self.timer_heap.push((at, token));
    *entry = (Some(fresh), TimeoutState::NotFired);
    debug!("set a timeout: {}", token);
}
/// Removes the timeout at index `token`, also removing its heap entry if
/// it is still queued. Does nothing if no such timeout exists.
fn cancel_timeout(&mut self, token: usize) {
    debug!("cancel a timeout: {}", token);
    match self.timeouts.remove(token) {
        Some((Some(slot), _state)) => {
            self.timer_heap.remove(slot);
        }
        _ => {}
    }
}
/// Stores `future` in `task_dispatch` and arranges for it to be polled by the
/// event loop.
///
/// # Panics
///
/// Panics if registering the task's readiness with the poll fails.
fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
// Grow the slab by its current length when it has no vacant entry left.
if self.task_dispatch.vacant_entry().is_none() {
let len = self.task_dispatch.len();
self.task_dispatch.reserve_exact(len);
}
let entry = self.task_dispatch.vacant_entry().unwrap();
// Spawned tasks use the odd offsets after `TOKEN_START`.
let token = TOKEN_START + 2 * entry.index() + 1;
let pair = mio::Registration::new2();
self.io.register(&pair.0,
mio::Token(token),
mio::Ready::readable(),
mio::PollOpt::level())
.expect("cannot fail future registration with mio");
let unpark = Arc::new(MySetReadiness(pair.1));
// Mark the task readable right away so the loop polls it for the first
// time on an upcoming turn.
unpark.notify(0);
entry.insert(ScheduledTask {
spawn: Some(executor::spawn(future)),
wake: Some(unpark),
_registration: pair.0,
});
}
}
impl Remote {
/// Delivers `msg` to the event loop.
///
/// When called from within the loop this remote refers to, the message is
/// handled synchronously; otherwise it is queued on the channel.
fn send(&self, msg: Message) {
    self.with_loop(|current| {
        if let Some(lp) = current {
            // Need to execute all existing requests first, to ensure
            // that our message is processed "in order"
            lp.consume_queue();
            lp.notify(msg);
            return;
        }
        // TODO: this error should punt upwards and we should
        //       notify the caller that the message wasn't
        //       received. This is tokio-core#17
        if let Err(unsent) = mpsc::UnboundedSender::send(&self.tx, msg) {
            drop(unsent);
        }
    })
}
/// Calls `f` with the currently running `Core` if, and only if, that core
/// is the one this remote refers to; otherwise calls `f` with `None`.
fn with_loop<F, R>(&self, f: F) -> R
    where F: FnOnce(Option<&Core>) -> R
{
    if !CURRENT_LOOP.is_set() {
        return f(None);
    }
    CURRENT_LOOP.with(|lp| {
        // Compare ids in a separate statement so the borrow of `inner` has
        // ended before `f` runs.
        let same = lp.inner.borrow().id == self.id;
        let target = if same { Some(lp) } else { None };
        f(target)
    })
}
/// Spawns a new future into the event loop this remote is associated with.
///
/// This function takes a closure which is executed within the context of
/// the I/O loop itself. The future returned by the closure will be
/// scheduled on the event loop and run to completion.
///
/// Note that while the closure, `F`, requires the `Send` bound as it might
/// cross threads, the future `R` does not.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn<F, R>(&self, f: F)
    where F: FnOnce(&Handle) -> R + Send + 'static,
          R: IntoFuture<Item=(), Error=()>,
          R::Future: 'static,
{
    self.send(Message::Run(Box::new(|lp: &Core| {
        // Run all user-provided code (`f` and `into_future`) before
        // borrowing the loop state: if `into_future` ran while `inner` was
        // mutably borrowed, any use of the loop from inside it would panic
        // with a double borrow.
        let future = f(&lp.handle()).into_future();
        lp.inner.borrow_mut().spawn(Box::new(future));
    })));
}
/// Returns the ID of the `Core` this remote refers to.
pub fn id(&self) -> CoreId {
CoreId(self.id)
}
/// Attempts to "promote" this remote to a handle, if possible.
///
/// This function is intended for structures which typically work through a
/// `Remote` but want to optimize runtime when the remote doesn't actually
/// leave the thread of the original reactor. This will attempt to return a
/// handle if the `Remote` is on the same thread as the event loop and the
/// event loop is running.
///
/// If this `Remote` has moved to a different thread or if the event loop is
/// running, then `None` may be returned. If you need to guarantee access to
/// a `Handle`, then you can call this function and fall back to using
/// `spawn` above if it returns `None`.
pub fn handle(&self) -> Option<Handle> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
let same = lp.inner.borrow().id == self.id;
if same {
Some(lp.handle())
} else {
None
}
})
} else {
None
}
}
}
/// Spawns `Send` futures onto the loop through `Remote::spawn`.
impl<F> Executor<F> for Remote
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
        // The handle passed to the closure is not needed; the future is
        // handed over as-is. This never reports an error.
        self.spawn(move |_handle| future);
        Ok(())
    }
}
/// Formats the remote as `Remote { id: .. }`.
impl fmt::Debug for Remote {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("Remote");
        out.field("id", &id);
        out.finish()
    }
}
impl Handle {
/// Returns a reference to the underlying remote handle to the event loop.
///
/// The returned `Remote` refers to the same event loop as this handle.
pub fn remote(&self) -> &Remote {
&self.remote
}
/// Spawns a new future on the event loop this handle is associated with.
///
/// If the event loop no longer exists, the future is dropped without
/// being run.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn<F>(&self, f: F)
    where F: Future<Item=(), Error=()> + 'static,
{
    if let Some(inner) = self.inner.upgrade() {
        inner.borrow_mut().spawn(Box::new(f));
    }
}
/// Spawns a closure on this event loop.
///
/// Convenience wrapper around `spawn`: the closure `f` is wrapped in
/// `futures::lazy`, so it runs on the event loop, and the future it
/// returns is then driven on the event loop as well.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn_fn<F, R>(&self, f: F)
    where F: FnOnce() -> R + 'static,
          R: IntoFuture<Item=(), Error=()> + 'static,
{
    let deferred = future::lazy(f);
    self.spawn(deferred)
}
/// Returns the ID of the `Core` this handle refers to, as reported by the
/// handle's underlying `Remote`.
pub fn id(&self) -> CoreId {
self.remote.id()
}
}
// Lets a `Handle` be used wherever a futures `Executor` is expected.
impl<F> Executor<F> for Handle
where F: Future<Item = (), Error = ()> + 'static,
{
// Forwards to `Handle::spawn` and always returns `Ok(())`.
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(future);
Ok(())
}
}
/// Formats the handle as `Handle { id: .. }`.
impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("Handle");
        out.field("id", &id);
        out.finish()
    }
}
impl TimeoutState {
    /// Registers `handle` as the task waiting for this timeout.
    ///
    /// If the timeout has already fired the task is returned so the caller
    /// can wake it; otherwise it replaces any previous waiter and `None` is
    /// returned.
    fn block(&mut self, handle: Task) -> Option<Task> {
        if let TimeoutState::Fired = *self {
            return Some(handle);
        }
        *self = TimeoutState::Waiting(handle);
        None
    }

    /// Marks the timeout as fired and returns the task that was waiting on
    /// it, if any.
    ///
    /// # Panics
    ///
    /// Panics if the timeout had already fired.
    fn fire(&mut self) -> Option<Task> {
        let previous = mem::replace(self, TimeoutState::Fired);
        match previous {
            TimeoutState::Waiting(handle) => Some(handle),
            TimeoutState::NotFired => None,
            TimeoutState::Fired => panic!("fired twice?"),
        }
    }
}
/// Adapter that lets a `mio::SetReadiness` serve as a futures `Notify`
/// handle: a notification marks the associated registration as readable.
struct MySetReadiness(mio::SetReadiness);

impl Notify for MySetReadiness {
    /// Marks the registration readable; the notification id is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the readiness cannot be set.
    fn notify(&self, _id: usize) {
        let readable = mio::Ready::readable();
        self.0.set_readiness(readable).expect("failed to set readiness");
    }
}
// Object-safe stand-in for a boxed `FnOnce(&Core)`: it allows the closure
// stored in `Message::Run` to be invoked by value through a `Box`.
trait FnBox: Send + 'static {
// Consumes the boxed closure and calls it with the event loop.
fn call_box(self: Box<Self>, lp: &Core);
}
// Every sendable `'static` closure taking `&Core` is an `FnBox`.
impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
fn call_box(self: Box<Self>, lp: &Core) {
(*self)(lp)
}
}
// Readiness set consisting of "readable" plus the platform's hang-up
// readiness (empty on platforms that have none).
fn read_ready() -> mio::Ready {
mio::Ready::readable() | platform::hup()
}
// Bits used by `ready2usize` / `usize2ready` for the portable readiness
// kinds; platform-specific kinds use higher bits (see `platform`).
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
/// Encodes a `mio::Ready` set as a bit mask: `READ` and `WRITE` for the
/// portable kinds, combined with the platform-specific bits.
fn ready2usize(ready: mio::Ready) -> usize {
    let read_bit = if ready.is_readable() { READ } else { 0 };
    let write_bit = if ready.is_writable() { WRITE } else { 0 };
    read_bit | write_bit | platform::ready2usize(ready)
}
/// Decodes a bit mask produced by `ready2usize` back into a `mio::Ready`
/// set, including the platform-specific readiness kinds.
fn usize2ready(bits: usize) -> mio::Ready {
    let readable = if bits & READ != 0 {
        mio::Ready::readable()
    } else {
        mio::Ready::empty()
    };
    let writable = if bits & WRITE != 0 {
        mio::Ready::writable()
    } else {
        mio::Ready::empty()
    };
    readable | writable | platform::usize2ready(bits)
}
/// Unix-specific readiness kinds (hang-up, error, AIO) and their encoding
/// as bits of a `usize`.
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
    use mio::Ready;
    use mio::unix::UnixReady;

    // Bit positions continue after the portable `READ` / `WRITE` bits.
    const HUP: usize = 1 << 2;
    const ERROR: usize = 1 << 3;
    const AIO: usize = 1 << 4;

    /// The AIO readiness as a plain `Ready`.
    pub fn aio() -> Ready {
        let flag = UnixReady::aio();
        flag.into()
    }

    /// Every platform-specific readiness that sources are registered for.
    pub fn all() -> Ready {
        hup() | aio()
    }

    /// The hang-up readiness as a plain `Ready`.
    pub fn hup() -> Ready {
        let flag = UnixReady::hup();
        flag.into()
    }

    /// Encodes the Unix-specific parts of `ready` as bits.
    pub fn ready2usize(ready: Ready) -> usize {
        let unix = UnixReady::from(ready);
        let aio_bit = if unix.is_aio() { AIO } else { 0 };
        let error_bit = if unix.is_error() { ERROR } else { 0 };
        let hup_bit = if unix.is_hup() { HUP } else { 0 };
        aio_bit | error_bit | hup_bit
    }

    /// Decodes the Unix-specific bits of `bits` into a `Ready` set.
    pub fn usize2ready(bits: usize) -> Ready {
        let table = [
            (AIO, UnixReady::aio()),
            (HUP, UnixReady::hup()),
            (ERROR, UnixReady::error()),
        ];
        let mut unix = UnixReady::from(Ready::empty());
        for &(bit, flag) in table.iter() {
            if bits & bit != 0 {
                unix.insert(flag);
            }
        }
        unix.into()
    }
}
// Fallback for targets without platform-specific readiness kinds: every
// function reports an empty set / no bits.
#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
use mio::Ready;
pub fn all() -> Ready {
// No platform-specific readiness kinds on these targets (Windows, Fuchsia)
Ready::empty()
}
pub fn hup() -> Ready {
Ready::empty()
}
// Nothing platform-specific to encode.
pub fn ready2usize(_r: Ready) -> usize {
0
}
// Nothing platform-specific to decode.
pub fn usize2ready(_r: usize) -> Ready {
Ready::empty()
}
}