blob: 6e7d84b4f90d7f9da739c985e907914c85d0754d [file] [log] [blame]
use crate::io::driver::{platform, Direction, Handle};
use crate::util::slab::Address;
use mio::{self, Evented};
use std::io;
use std::task::{Context, Poll};
cfg_io_driver! {
/// Associates an I/O resource with the reactor instance that drives it.
///
/// A registration represents an I/O resource registered with a Reactor such
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
/// The association between an I/O resource is made by calling [`new`]. Once
/// the association is established, it remains established until the
/// registration instance is dropped.
///
/// A registration instance represents two separate readiness streams. One
/// for the read readiness and one for write readiness. These streams are
/// independent and can be consumed from separate tasks.
///
/// **Note**: while `Registration` is `Sync`, the caller must ensure that
/// there are at most two tasks that use a registration instance
/// concurrently. One task for [`poll_read_ready`] and one task for
/// [`poll_write_ready`]. While violating this requirement is "safe" from a
/// Rust memory safety point of view, it will result in unexpected behavior
/// in the form of lost notifications and tasks hanging.
///
/// ## Platform-specific events
///
/// `Registration` also allows receiving platform-specific `mio::Ready`
/// events. These events are included as part of the read readiness event
/// stream. The write readiness event stream is only for `Ready::writable()`
/// events.
///
/// [`new`]: #method.new
/// [`poll_read_ready`]: #method.poll_read_ready`]
/// [`poll_write_ready`]: #method.poll_write_ready`]
#[derive(Debug)]
pub struct Registration {
handle: Handle,
address: Address,
}
}
// ===== impl Registration =====
impl Registration {
/// Registers the I/O resource with the default reactor.
///
/// # Return
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
///
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
{
Registration::new_with_ready(io, mio::Ready::all())
}
/// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state.
/// `new_with_ready` should be used over `new` when you need control over the readiness state,
/// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
/// you are interested in those states, you will need to add them to the readiness state passed
/// to this function.
///
/// An example to listen to read only
///
/// ```rust
/// ##[cfg(unix)]
/// mio::Ready::from_usize(
/// mio::Ready::readable().as_usize()
/// | mio::unix::UnixReady::error().as_usize()
/// | mio::unix::UnixReady::hup().as_usize()
/// );
/// ```
///
/// # Return
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
///
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
{
let handle = Handle::current();
let address = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};
Ok(Registration { handle, address })
}
/// Deregisters the I/O resource from the reactor it is associated with.
///
/// This function must be called before the I/O resource associated with the
/// registration is dropped.
///
/// Note that deregistering does not guarantee that the I/O resource can be
/// registered with a different reactor. Some I/O resource types can only be
/// associated with a single reactor instance for their lifetime.
///
/// # Return
///
/// If the deregistration was successful, `Ok` is returned. Any calls to
/// `Reactor::turn` that happen after a successful call to `deregister` will
/// no longer result in notifications getting sent for this registration.
///
/// `Err` is returned if an error is encountered.
pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
where
T: Evented,
{
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
}
/// Polls for events on the I/O resource's read readiness stream.
///
/// If the I/O resource receives a new read readiness event since the last
/// call to `poll_read_ready`, it is returned. If it has not, the current
/// task is notified once a new event is received.
///
/// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
/// the function will always return `Ready(HUP)`. This should be treated as
/// the end of the readiness stream.
///
/// Ensure that [`register`] has been called first.
///
/// # Return value
///
/// There are several possible return values:
///
/// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
/// a new readiness event. The readiness value is included.
///
/// * `Poll::Pending` means that no new readiness events have been received
/// since the last call to `poll_read_ready`.
///
/// * `Poll::Ready(Err(err))` means that the registration has encountered an
/// error. This error either represents a permanent internal error **or**
/// the fact that [`register`] was not called first.
///
/// [`register`]: #method.register
/// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
// Keep track of task budget
ready!(crate::coop::poll_proceed(cx));
let v = self.poll_ready(Direction::Read, Some(cx))?;
match v {
Some(v) => Poll::Ready(Ok(v)),
None => Poll::Pending,
}
}
/// Consume any pending read readiness event.
///
/// This function is identical to [`poll_read_ready`] **except** that it
/// will not notify the current task when a new event is received. As such,
/// it is safe to call this function from outside of a task context.
///
/// [`poll_read_ready`]: #method.poll_read_ready
pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Read, None)
}
/// Polls for events on the I/O resource's write readiness stream.
///
/// If the I/O resource receives a new write readiness event since the last
/// call to `poll_write_ready`, it is returned. If it has not, the current
/// task is notified once a new event is received.
///
/// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
/// the function will always return `Ready(HUP)`. This should be treated as
/// the end of the readiness stream.
///
/// Ensure that [`register`] has been called first.
///
/// # Return value
///
/// There are several possible return values:
///
/// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
/// a new readiness event. The readiness value is included.
///
/// * `Poll::Pending` means that no new readiness events have been received
/// since the last call to `poll_write_ready`.
///
/// * `Poll::Ready(Err(err))` means that the registration has encountered an
/// error. This error either represents a permanent internal error **or**
/// the fact that [`register`] was not called first.
///
/// [`register`]: #method.register
/// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
// Keep track of task budget
ready!(crate::coop::poll_proceed(cx));
let v = self.poll_ready(Direction::Write, Some(cx))?;
match v {
Some(v) => Poll::Ready(Ok(v)),
None => Poll::Pending,
}
}
/// Consumes any pending write readiness event.
///
/// This function is identical to [`poll_write_ready`] **except** that it
/// will not notify the current task when a new event is received. As such,
/// it is safe to call this function from outside of a task context.
///
/// [`poll_write_ready`]: #method.poll_write_ready
pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Write, None)
}
/// Polls for events on the I/O resource's `direction` readiness stream.
///
/// If called with a task context, notify the task when a new event is
/// received.
fn poll_ready(
&self,
direction: Direction,
cx: Option<&mut Context<'_>>,
) -> io::Result<Option<mio::Ready>> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
// If the task should be notified about new events, ensure that it has
// been registered
if let Some(ref cx) = cx {
inner.register(self.address, direction, cx.waker().clone())
}
let mask = direction.mask();
let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize();
let sched = inner.io_dispatch.get(self.address).unwrap();
// This consumes the current readiness state **except** for HUP and
// error. HUP and error are excluded because a) they are final states
// and never transitition out and b) both the read AND the write
// directions need to be able to obvserve these states.
//
// # Platform-specific behavior
//
// HUP and error readiness are platform-specific. On epoll platforms,
// HUP has specific conditions that must be met by both peers of a
// connection in order to be triggered.
//
// On epoll platforms, `EPOLLERR` is signaled through
// `UnixReady::error()` and is important to be observable by both read
// AND write. A specific case that `EPOLLERR` occurs is when the read
// end of a pipe is closed. When this occurs, a peer blocked by
// writing to the pipe should be notified.
let curr_ready = sched
.set_readiness(self.address, |curr| curr & (!mask_no_hup))
.unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
let mut ready = mask & mio::Ready::from_usize(curr_ready);
if ready.is_empty() {
if let Some(cx) = cx {
// Update the task info
match direction {
Direction::Read => sched.reader.register_by_ref(cx.waker()),
Direction::Write => sched.writer.register_by_ref(cx.waker()),
}
// Try again
let curr_ready = sched
.set_readiness(self.address, |curr| curr & (!mask_no_hup))
.unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
ready = mask & mio::Ready::from_usize(curr_ready);
}
}
if ready.is_empty() {
Ok(None)
} else {
Ok(Some(ready))
}
}
}
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
impl Drop for Registration {
fn drop(&mut self) {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
inner.drop_source(self.address);
}
}