blob: 9e5db2259426ba7f63cb9b2a335a0ef7f8aaa169 [file] [log] [blame]
//! Readiness tracking streams, backing I/O objects.
//!
//! This module contains the core type which is used to back all I/O on object
//! in `tokio-core`. The `PollEvented` type is the implementation detail of
//! all I/O. Each `PollEvented` manages registration with a reactor,
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Async, Poll};
use mio::event::Evented;
use mio::Ready;
use tokio_io::{AsyncRead, AsyncWrite};
use reactor::{Handle, Remote};
use reactor::io_token::IoToken;
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
///
/// Created by the `PollEvented::new` method, each `PollEvented` is
/// associated with a specific event loop and source of events that will be
/// registered with an event loop.
///
/// An instance of `PollEvented` is essentially the bridge between the `mio`
/// world and the `tokio-core` world, providing abstractions to receive
/// notifications about changes to an object's `mio::Ready` state.
///
/// Each readiness stream has a number of methods to test whether the underlying
/// object is readable or writable. Once the methods return that an object is
/// readable/writable, then it will continue to do so until the `need_read` or
/// `need_write` methods are called.
///
/// That is, this object is typically wrapped in another form of I/O object.
/// It's the responsibility of the wrapper to inform the readiness stream when a
/// "would block" I/O event is seen. The readiness stream will then take care of
/// any scheduling necessary to get notified when the event is ready again.
///
/// You can find more information about creating a custom I/O object [online].
///
/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io
///
/// ## Readiness to read/write
///
/// A `PollEvented` allows listening and waiting for an arbitrary `mio::Ready`
/// instance, including the platform-specific contents of `mio::Ready`. At most
/// two future tasks, however, can be waiting on a `PollEvented`. The
/// `need_read` and `need_write` methods can block two separate tasks, one on
/// reading and one on writing. Not all I/O events correspond to read/write,
/// however!
///
/// To account for this a `PollEvented` gets a little interesting when working
/// with an arbitrary instance of `mio::Ready` that may not map precisely to
/// "write" and "read" tasks. Currently it is defined that instances of
/// `mio::Ready` that do *not* return true from `is_writable` are all notified
/// through `need_read`, or the read task.
///
/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block
/// the read task of this `PollEvented` if the `hup` event isn't available.
/// Essentially a good rule of thumb is that if you're using the `poll_ready`
/// method you want to also use `need_read` to signal blocking and you should
/// otherwise probably avoid using two tasks on the same `PollEvented`.
pub struct PollEvented<E> {
token: IoToken,
handle: Remote,
readiness: AtomicUsize,
io: E,
}
impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PollEvented")
.field("io", &self.io)
.finish()
}
}
impl<E: Evented> PollEvented<E> {
/// Creates a new readiness stream associated with the provided
/// `loop_handle` and for the given `source`.
///
/// This method returns a future which will resolve to the readiness stream
/// when it's ready.
pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> {
Ok(PollEvented {
token: try!(IoToken::new(&io, handle)),
handle: handle.remote().clone(),
readiness: AtomicUsize::new(0),
io: io,
})
}
/// Deregisters this source of events from the reactor core specified.
///
/// This method can optionally be called to unregister the underlying I/O
/// object with the event loop that the `handle` provided points to.
/// Typically this method is not required as this automatically happens when
/// `E` is dropped, but for some use cases the `E` object doesn't represent
/// an owned reference, so dropping it won't automatically unregister with
/// the event loop.
///
/// This consumes `self` as it will no longer provide events after the
/// method is called, and will likely return an error if this `PollEvented`
/// was created on a separate event loop from the `handle` specified.
pub fn deregister(self, handle: &Handle) -> io::Result<()> {
let inner = match handle.inner.upgrade() {
Some(inner) => inner,
None => return Ok(()),
};
let ret = inner.borrow_mut().deregister_source(&self.io);
return ret
}
}
impl<E> PollEvented<E> {
/// Tests to see if this source is ready to be read from or not.
///
/// If this stream is not ready for a read then `NotReady` will be returned
/// and the current task will be scheduled to receive a notification when
/// the stream is readable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a
/// `Future::poll` method.
///
/// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_read(&self) -> Async<()> {
self.poll_ready(super::read_ready())
.map(|_| ())
}
/// Tests to see if this source is ready to be written to or not.
///
/// If this stream is not ready for a write then `NotReady` will be returned
/// and the current task will be scheduled to receive a notification when
/// the stream is writable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a
/// `Future::poll` method.
///
/// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_write(&self) -> Async<()> {
self.poll_ready(Ready::writable())
.map(|_| ())
}
/// Test to see whether this source fulfills any condition listed in `mask`
/// provided.
///
/// The `mask` given here is a mio `Ready` set of possible events. This can
/// contain any events like read/write but also platform-specific events
/// such as hup and error. The `mask` indicates events that are interested
/// in being ready.
///
/// If any event in `mask` is ready then it is returned through
/// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
/// and contains all events that are currently ready in the `mask` provided.
///
/// If no events are ready in the `mask` provided then the current task is
/// scheduled to receive a notification when any of them become ready. If
/// the `writable` event is contained within `mask` then this
/// `PollEvented`'s `write` task will be blocked and otherwise the `read`
/// task will be blocked. This is generally only relevant if you're working
/// with this `PollEvented` object on multiple tasks.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_ready(&self, mask: Ready) -> Async<Ready> {
let bits = super::ready2usize(mask);
match self.readiness.load(Ordering::SeqCst) & bits {
0 => {}
n => return Async::Ready(super::usize2ready(n)),
}
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
match self.readiness.load(Ordering::SeqCst) & bits {
0 => {
if mask.is_writable() {
self.need_write();
} else {
self.need_read();
}
Async::NotReady
}
n => Async::Ready(super::usize2ready(n)),
}
}
/// Indicates to this source of events that the corresponding I/O object is
/// no longer readable, but it needs to be.
///
/// This function, like `poll_read`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// readable, typically because a "would block" error was seen.
///
/// *All* readiness bits associated with this stream except the writable bit
/// will be reset when this method is called. The current task is then
/// scheduled to receive a notification whenever anything changes other than
/// the writable bit. Note that this typically just means the readable bit
/// is used here, but if you're using a custom I/O object for events like
/// hup/error this may also be relevant.
///
/// Note that it is also only valid to call this method if `poll_read`
/// previously indicated that the object is readable. That is, this function
/// must always be paired with calls to `poll_read` previously.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn need_read(&self) {
let bits = super::ready2usize(super::read_ready());
self.readiness.fetch_and(!bits, Ordering::SeqCst);
self.token.schedule_read(&self.handle)
}
/// Indicates to this source of events that the corresponding I/O object is
/// no longer writable, but it needs to be.
///
/// This function, like `poll_write`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// writable, typically because a "would block" error was seen.
///
/// The flag indicating that this stream is writable is unset and the
/// current task is scheduled to receive a notification when the stream is
/// then again writable.
///
/// Note that it is also only valid to call this method if `poll_write`
/// previously indicated that the object is writable. That is, this function
/// must always be paired with calls to `poll_write` previously.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn need_write(&self) {
let bits = super::ready2usize(Ready::writable());
self.readiness.fetch_and(!bits, Ordering::SeqCst);
self.token.schedule_write(&self.handle)
}
/// Returns a reference to the event loop handle that this readiness stream
/// is associated with.
pub fn remote(&self) -> &Remote {
&self.handle
}
/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_ref(&self) -> &E {
&self.io
}
/// Returns a mutable reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_mut(&mut self) -> &mut E {
&mut self.io
}
}
impl<E: Read> Read for PollEvented<E> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read() {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().read(buf);
if is_wouldblock(&r) {
self.need_read();
}
return r
}
}
impl<E: Write> Write for PollEvented<E> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write() {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().write(buf);
if is_wouldblock(&r) {
self.need_write();
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write() {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().flush();
if is_wouldblock(&r) {
self.need_write();
}
return r
}
}
impl<E: Read> AsyncRead for PollEvented<E> {
}
impl<E: Write> AsyncWrite for PollEvented<E> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
#[allow(deprecated)]
impl<E: Read + Write> ::io::Io for PollEvented<E> {
fn poll_read(&mut self) -> Async<()> {
<PollEvented<E>>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<PollEvented<E>>::poll_write(self)
}
}
impl<'a, E> Read for &'a PollEvented<E>
where &'a E: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read() {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().read(buf);
if is_wouldblock(&r) {
self.need_read();
}
return r
}
}
impl<'a, E> Write for &'a PollEvented<E>
where &'a E: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write() {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().write(buf);
if is_wouldblock(&r) {
self.need_write();
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write() {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().flush();
if is_wouldblock(&r) {
self.need_write();
}
return r
}
}
impl<'a, E> AsyncRead for &'a PollEvented<E>
where &'a E: Read,
{
}
impl<'a, E> AsyncWrite for &'a PollEvented<E>
where &'a E: Write,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
#[allow(deprecated)]
impl<'a, E> ::io::Io for &'a PollEvented<E>
where &'a E: Read + Write,
{
fn poll_read(&mut self) -> Async<()> {
<PollEvented<E>>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<PollEvented<E>>::poll_write(self)
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl<E> Drop for PollEvented<E> {
fn drop(&mut self) {
self.token.drop_source(&self.handle);
}
}