blob: f8019ae01c75e4c1e0889a71808477dd33028fc0 [file] [log] [blame]
//! Readiness tracking streams, backing I/O objects.
//!
//! This module contains the core type which is used to back all I/O on object
//! in `tokio-core`. The `PollEvented` type is the implementation detail of
//! all I/O. Each `PollEvented` manages registration with a reactor,
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use futures::{task, Async, Poll};
use mio::event::Evented;
use mio::Ready;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Registration};
use reactor::{Handle, Remote};
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
///
/// Created by the `PollEvented::new` method, each `PollEvented` is
/// associated with a specific event loop and source of events that will be
/// registered with an event loop.
///
/// An instance of `PollEvented` is essentially the bridge between the `mio`
/// world and the `tokio-core` world, providing abstractions to receive
/// notifications about changes to an object's `mio::Ready` state.
///
/// Each readiness stream has a number of methods to test whether the underlying
/// object is readable or writable. Once the methods return that an object is
/// readable/writable, then it will continue to do so until the `need_read` or
/// `need_write` methods are called.
///
/// That is, this object is typically wrapped in another form of I/O object.
/// It's the responsibility of the wrapper to inform the readiness stream when a
/// "would block" I/O event is seen. The readiness stream will then take care of
/// any scheduling necessary to get notified when the event is ready again.
///
/// You can find more information about creating a custom I/O object [online].
///
/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io
///
/// ## Readiness to read/write
///
/// A `PollEvented` allows listening and waiting for an arbitrary `mio::Ready`
/// instance, including the platform-specific contents of `mio::Ready`. At most
/// two future tasks, however, can be waiting on a `PollEvented`. The
/// `need_read` and `need_write` methods can block two separate tasks, one on
/// reading and one on writing. Not all I/O events correspond to read/write,
/// however!
///
/// To account for this a `PollEvented` gets a little interesting when working
/// with an arbitrary instance of `mio::Ready` that may not map precisely to
/// "write" and "read" tasks. Currently it is defined that instances of
/// `mio::Ready` that do *not* return true from `is_writable` are all notified
/// through `need_read`, or the read task.
///
/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block
/// the read task of this `PollEvented` if the `hup` event isn't available.
/// Essentially a good rule of thumb is that if you're using the `poll_ready`
/// method you want to also use `need_read` to signal blocking and you should
/// otherwise probably avoid using two tasks on the same `PollEvented`.
pub struct PollEvented<E> {
io: E,
inner: Inner,
remote: Remote,
}
struct Inner {
registration: Registration,
/// Currently visible read readiness
read_readiness: AtomicUsize,
/// Currently visible write readiness
write_readiness: AtomicUsize,
}
impl<E: Evented> PollEvented<E> {
/// Creates a new readiness stream associated with the provided
/// `loop_handle` and for the given `source`.
///
/// This method returns a future which will resolve to the readiness stream
/// when it's ready.
pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> {
let registration = Registration::new();
registration.register_with(&io, handle.new_tokio_handle())?;
Ok(PollEvented {
io: io,
inner: Inner {
registration,
read_readiness: AtomicUsize::new(0),
write_readiness: AtomicUsize::new(0),
},
remote: handle.remote().clone(),
})
}
/// Deregisters this source of events from the reactor core specified.
///
/// This method can optionally be called to unregister the underlying I/O
/// object with the event loop that the `handle` provided points to.
/// Typically this method is not required as this automatically happens when
/// `E` is dropped, but for some use cases the `E` object doesn't represent
/// an owned reference, so dropping it won't automatically unregister with
/// the event loop.
///
/// This consumes `self` as it will no longer provide events after the
/// method is called, and will likely return an error if this `PollEvented`
/// was created on a separate event loop from the `handle` specified.
pub fn deregister(self, _: &Handle) -> io::Result<()> {
// Nothing has to happen here anymore as I/O objects are explicitly
// deregistered before dropped.
Ok(())
}
}
impl<E> PollEvented<E> {
/// Tests to see if this source is ready to be read from or not.
///
/// If this stream is not ready for a read then `NotReady` will be returned
/// and the current task will be scheduled to receive a notification when
/// the stream is readable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a
/// `Future::poll` method.
///
/// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_read(&self) -> Async<()> {
if self.poll_read2().is_ready() {
return ().into();
}
Async::NotReady
}
fn poll_read2(&self) -> Async<Ready> {
// Load the cached readiness
match self.inner.read_readiness.load(Relaxed) {
0 => {}
mut n => {
// Check what's new with the reactor.
if let Some(ready) = self.inner.registration.take_read_ready().unwrap() {
n |= super::ready2usize(ready);
self.inner.read_readiness.store(n, Relaxed);
}
return super::usize2ready(n).into();
}
}
let ready = match self.inner.registration.poll_read_ready().unwrap() {
Async::Ready(r) => r,
_ => return Async::NotReady,
};
// Cache the value
self.inner.read_readiness.store(super::ready2usize(ready), Relaxed);
ready.into()
}
/// Tests to see if this source is ready to be written to or not.
///
/// If this stream is not ready for a write then `NotReady` will be returned
/// and the current task will be scheduled to receive a notification when
/// the stream is writable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a
/// `Future::poll` method.
///
/// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_write(&self) -> Async<()> {
match self.inner.write_readiness.load(Relaxed) {
0 => {}
mut n => {
// Check what's new with the reactor.
if let Some(ready) = self.inner.registration.take_write_ready().unwrap() {
n |= super::ready2usize(ready);
self.inner.write_readiness.store(n, Relaxed);
}
return ().into();
}
}
let ready = match self.inner.registration.poll_write_ready().unwrap() {
Async::Ready(r) => r,
_ => return Async::NotReady,
};
// Cache the value
self.inner.write_readiness.store(super::ready2usize(ready), Relaxed);
().into()
}
/// Test to see whether this source fulfills any condition listed in `mask`
/// provided.
///
/// The `mask` given here is a mio `Ready` set of possible events. This can
/// contain any events like read/write but also platform-specific events
/// such as hup and error. The `mask` indicates events that are interested
/// in being ready.
///
/// If any event in `mask` is ready then it is returned through
/// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
/// and contains all events that are currently ready in the `mask` provided.
///
/// If no events are ready in the `mask` provided then the current task is
/// scheduled to receive a notification when any of them become ready. If
/// the `writable` event is contained within `mask` then this
/// `PollEvented`'s `write` task will be blocked and otherwise the `read`
/// task will be blocked. This is generally only relevant if you're working
/// with this `PollEvented` object on multiple tasks.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_ready(&self, mask: Ready) -> Async<Ready> {
let mut ret = Ready::empty();
if mask.is_empty() {
return ret.into();
}
if mask.is_writable() {
if self.poll_write().is_ready() {
ret = Ready::writable();
}
}
let mask = mask - Ready::writable();
if !mask.is_empty() {
if let Async::Ready(v) = self.poll_read2() {
ret |= v & mask;
}
}
if ret.is_empty() {
if mask.is_writable() {
self.need_write();
}
if mask.is_readable() {
self.need_read();
}
Async::NotReady
} else {
ret.into()
}
}
/// Indicates to this source of events that the corresponding I/O object is
/// no longer readable, but it needs to be.
///
/// This function, like `poll_read`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// readable, typically because a "would block" error was seen.
///
/// *All* readiness bits associated with this stream except the writable bit
/// will be reset when this method is called. The current task is then
/// scheduled to receive a notification whenever anything changes other than
/// the writable bit. Note that this typically just means the readable bit
/// is used here, but if you're using a custom I/O object for events like
/// hup/error this may also be relevant.
///
/// Note that it is also only valid to call this method if `poll_read`
/// previously indicated that the object is readable. That is, this function
/// must always be paired with calls to `poll_read` previously.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn need_read(&self) {
self.inner.read_readiness.store(0, Relaxed);
if self.poll_read().is_ready() {
// Notify the current task
task::current().notify();
}
}
/// Indicates to this source of events that the corresponding I/O object is
/// no longer writable, but it needs to be.
///
/// This function, like `poll_write`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// writable, typically because a "would block" error was seen.
///
/// The flag indicating that this stream is writable is unset and the
/// current task is scheduled to receive a notification when the stream is
/// then again writable.
///
/// Note that it is also only valid to call this method if `poll_write`
/// previously indicated that the object is writable. That is, this function
/// must always be paired with calls to `poll_write` previously.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn need_write(&self) {
self.inner.write_readiness.store(0, Relaxed);
if self.poll_write().is_ready() {
// Notify the current task
task::current().notify();
}
}
/// Returns a reference to the event loop handle that this readiness stream
/// is associated with.
pub fn remote(&self) -> &Remote {
&self.remote
}
/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_ref(&self) -> &E {
&self.io
}
/// Returns a mutable reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_mut(&mut self) -> &mut E {
&mut self.io
}
}
impl<E: Read> Read for PollEvented<E> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = PollEvented::poll_read(self) {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().read(buf);
if is_wouldblock(&r) {
self.need_read();
}
r
}
}
impl<E: Write> Write for PollEvented<E> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = PollEvented::poll_write(self) {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().write(buf);
if is_wouldblock(&r) {
self.need_write();
}
r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = PollEvented::poll_write(self) {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().flush();
if is_wouldblock(&r) {
self.need_write();
}
r
}
}
impl<E: Read> AsyncRead for PollEvented<E> {
}
impl<E: Write> AsyncWrite for PollEvented<E> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
#[allow(deprecated)]
impl<E: Read + Write> ::io::Io for PollEvented<E> {
fn poll_read(&mut self) -> Async<()> {
<PollEvented<E>>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<PollEvented<E>>::poll_write(self)
}
}
impl<'a, E> Read for &'a PollEvented<E>
where &'a E: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = PollEvented::poll_read(self) {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().read(buf);
if is_wouldblock(&r) {
self.need_read();
}
r
}
}
impl<'a, E> Write for &'a PollEvented<E>
where &'a E: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = PollEvented::poll_write(self) {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().write(buf);
if is_wouldblock(&r) {
self.need_write();
}
r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = PollEvented::poll_write(self) {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().flush();
if is_wouldblock(&r) {
self.need_write();
}
r
}
}
impl<'a, E> AsyncRead for &'a PollEvented<E>
where &'a E: Read,
{
}
impl<'a, E> AsyncWrite for &'a PollEvented<E>
where &'a E: Write,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
#[allow(deprecated)]
impl<'a, E> ::io::Io for &'a PollEvented<E>
where &'a E: Read + Write,
{
fn poll_read(&mut self) -> Async<()> {
<PollEvented<E>>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<PollEvented<E>>::poll_write(self)
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PollEvented")
.field("io", &self.io)
.finish()
}
}