// blob: 6939c0d034ec602d4a48abcb7536b5a6c2220abf
use std::ops::{Deref, DerefMut};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{fmt, io};
#[cfg(any(unix, debug_assertions))]
use crate::poll;
use crate::sys::IoSourceState;
use crate::{event, Interest, Registry, Token};
/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
/// implementation.
///
/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
///
/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
/// Mio supports registering any FD or socket that can be registered with the
/// underlying OS selector. `IoSource` provides the necessary bridge.
///
/// [`RawFd`]: std::os::unix::io::RawFd
/// [`RawSocket`]: std::os::windows::io::RawSocket
///
/// # Notes
///
/// To handle the registrations and events properly **all** I/O operations (such
/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
/// internal state is updated accordingly.
///
/// [`Poll`]: crate::Poll
/// [`do_io`]: IoSource::do_io
/*
///
/// # Examples
///
/// Basic usage.
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Interest, Poll, Token};
/// use mio::IoSource;
///
/// use std::net;
///
/// let poll = Poll::new()?;
///
/// // Bind a std TCP listener.
/// let listener = net::TcpListener::bind("127.0.0.1:0")?;
/// // Wrap it in the `IoSource` type.
/// let mut listener = IoSource::new(listener);
///
/// // Register the listener.
/// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
/// # Ok(())
/// # }
/// ```
*/
pub struct IoSource<T> {
// Platform-specific state. `do_io` routes every I/O call through it, and the
// Windows `event::Source` implementation also uses it to register,
// reregister and deregister the socket.
state: IoSourceState,
// The wrapped I/O object; reachable through `Deref`/`DerefMut` and handed
// back to the caller by `into_inner`.
inner: T,
// Debug builds only: remembers which selector this source is associated
// with so that mismatched `register`/`reregister`/`deregister` calls are
// reported as errors.
#[cfg(debug_assertions)]
selector_id: SelectorId,
}
impl<T> IoSource<T> {
    /// Wrap `io` in a new `IoSource` with freshly created internal state.
    pub fn new(io: T) -> IoSource<T> {
        let state = IoSourceState::new();
        #[cfg(debug_assertions)]
        let selector_id = SelectorId::new();

        IoSource {
            state,
            inner: io,
            #[cfg(debug_assertions)]
            selector_id,
        }
    }

    /// Run the I/O operation `f` against the wrapped source, making sure the
    /// socket keeps receiving events if the operation hits a [`WouldBlock`]
    /// error.
    ///
    /// The closure is handed a shared reference to the wrapped I/O object and
    /// its result is returned to the caller.
    ///
    /// # Notes
    ///
    /// **Every** I/O operation has to be issued through this method;
    /// otherwise the user may not be notified once the socket becomes ready
    /// again after a [`WouldBlock`] error was returned.
    ///
    /// [`WouldBlock`]: io::ErrorKind::WouldBlock
    pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
    where
        F: FnOnce(&T) -> io::Result<R>,
    {
        let source = &self.inner;
        self.state.do_io(f, source)
    }

    /// Consume the `IoSource` and hand back the wrapped I/O object; the
    /// internal state is dropped.
    ///
    /// # Notes
    ///
    /// [`deregister`] the source first to make sure no further events are
    /// delivered for it.
    ///
    /// [`deregister`]: Registry::deregister
    pub fn into_inner(self) -> T {
        let IoSource { inner, .. } = self;
        inner
    }
}
/// Be careful when using this method. All I/O operations that may block must go
/// through the [`do_io`] method.
///
/// [`do_io`]: IoSource::do_io
impl<T> Deref for IoSource<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/// Be careful when using this method. All I/O operations that may block must go
/// through the [`do_io`] method.
///
/// [`do_io`]: IoSource::do_io
impl<T> DerefMut for IoSource<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
// Unix: the raw file descriptor of the wrapped object is (de)registered
// directly with the selector behind `registry`. In debug builds the
// association with that selector is validated first, and a failed validation
// aborts the call before the selector is touched.
#[cfg(unix)]
impl<T: AsRawFd> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let selector = poll::selector(registry);
        selector.register(self.inner.as_raw_fd(), token, interests)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        let selector = poll::selector(registry);
        selector.reregister(self.inner.as_raw_fd(), token, interests)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;

        let selector = poll::selector(registry);
        selector.deregister(self.inner.as_raw_fd())
    }
}
// Windows: registration is delegated to the internal `IoSourceState`, which
// is given the raw socket of the wrapped object on `register`. In debug
// builds the association with the registry's selector is validated first.
#[cfg(windows)]
impl<T: AsRawSocket> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, socket)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        self.state.reregister(registry, token, interests)
    }

    // `_registry` is only consulted by the debug-build association check.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;

        self.state.deregister()
    }
}
// Debug output is forwarded to the wrapped I/O object; the internal state is
// not printed.
impl<T: fmt::Debug> fmt::Debug for IoSource<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, f)
    }
}
/// Used to associate an `IoSource` with a `sys::Selector`.
///
/// Only present in debug builds, where it is used to detect I/O sources that
/// are registered twice, or (re/de)registered with the wrong `Registry`.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    /// Id of the associated selector, or [`SelectorId::UNASSOCIATED`].
    id: AtomicUsize,
}

#[cfg(debug_assertions)]
impl SelectorId {
    /// Value of `id` if `SelectorId` is not associated with any
    /// `sys::Selector`. Valid selector ids start at 1.
    const UNASSOCIATED: usize = 0;

    /// Create a new `SelectorId` that is not associated with any selector.
    const fn new() -> SelectorId {
        SelectorId {
            id: AtomicUsize::new(Self::UNASSOCIATED),
        }
    }

    /// Associate an I/O source with `registry`.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::AlreadyExists`] error if the source is
    /// already associated with a `Registry`. In that case the existing
    /// association is left untouched.
    fn associate(&self, registry: &Registry) -> io::Result<()> {
        let registry_id = poll::selector(registry).id();
        // Only claim the slot when it is currently free. An unconditional
        // swap would overwrite a valid association on the error path, making
        // later checks against the original registry fail.
        match self.id.compare_exchange(
            Self::UNASSOCIATED,
            registry_id,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a `Registry`",
            )),
        }
    }

    /// Check the association of an I/O source with `registry`.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::NotFound`] error if the source is not
    /// associated at all, and an [`io::ErrorKind::AlreadyExists`] error if it
    /// is associated with a different `Registry`.
    fn check_association(&self, registry: &Registry) -> io::Result<()> {
        let registry_id = poll::selector(registry).id();
        let id = self.id.load(Ordering::Acquire);
        if id == registry_id {
            Ok(())
        } else if id == Self::UNASSOCIATED {
            Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            ))
        } else {
            Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a different `Registry`",
            ))
        }
    }

    /// Remove a previously made association from `registry`.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::NotFound`] error if the source was not
    /// associated with `registry`. An association with a different
    /// `Registry` is left untouched in that case.
    fn remove_association(&self, registry: &Registry) -> io::Result<()> {
        let registry_id = poll::selector(registry).id();
        // Only clear the slot when it really belongs to `registry`. An
        // unconditional swap would drop a valid association with another
        // registry on the error path.
        match self.id.compare_exchange(
            registry_id,
            Self::UNASSOCIATED,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => Ok(()),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            )),
        }
    }
}
// Implemented by hand; the clone starts out with a snapshot of the id that is
// currently stored and is independent of the original afterwards.
#[cfg(debug_assertions)]
impl Clone for SelectorId {
    fn clone(&self) -> SelectorId {
        let current = self.id.load(Ordering::Acquire);
        SelectorId {
            id: AtomicUsize::new(current),
        }
    }
}