| use std::ops::{Deref, DerefMut}; |
| #[cfg(unix)] |
| use std::os::unix::io::AsRawFd; |
| #[cfg(windows)] |
| use std::os::windows::io::AsRawSocket; |
| #[cfg(debug_assertions)] |
| use std::sync::atomic::{AtomicUsize, Ordering}; |
| use std::{fmt, io}; |
| |
| #[cfg(any(unix, debug_assertions))] |
| use crate::poll; |
| use crate::sys::IoSourceState; |
| use crate::{event, Interest, Registry, Token}; |
| |
| /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] |
| /// implementation. |
| /// |
| /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. |
| /// |
| /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, |
| /// Mio supports registering any FD or socket that can be registered with the |
| /// underlying OS selector. `IoSource` provides the necessary bridge. |
| /// |
| /// [`RawFd`]: std::os::unix::io::RawFd |
| /// [`RawSocket`]: std::os::windows::io::RawSocket |
| /// |
| /// # Notes |
| /// |
| /// To handle the registrations and events properly **all** I/O operations (such |
| /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the |
| /// internal state is updated accordingly. |
| /// |
| /// [`Poll`]: crate::Poll |
| /// [`do_io`]: IoSource::do_io |
| /* |
| /// |
| /// # Examples |
| /// |
| /// Basic usage. |
| /// |
| /// ``` |
| /// # use std::error::Error; |
| /// # fn main() -> Result<(), Box<dyn Error>> { |
| /// use mio::{Interest, Poll, Token}; |
| /// use mio::IoSource; |
| /// |
| /// use std::net; |
| /// |
| /// let poll = Poll::new()?; |
| /// |
| /// // Bind a std TCP listener. |
| /// let listener = net::TcpListener::bind("127.0.0.1:0")?; |
| /// // Wrap it in the `IoSource` type. |
| /// let mut listener = IoSource::new(listener); |
| /// |
| /// // Register the listener. |
| /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?; |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| */ |
| pub struct IoSource<T> { |
| state: IoSourceState, |
| inner: T, |
| #[cfg(debug_assertions)] |
| selector_id: SelectorId, |
| } |
| |
| impl<T> IoSource<T> { |
| /// Create a new `IoSource`. |
| pub fn new(io: T) -> IoSource<T> { |
| IoSource { |
| state: IoSourceState::new(), |
| inner: io, |
| #[cfg(debug_assertions)] |
| selector_id: SelectorId::new(), |
| } |
| } |
| |
| /// Execute an I/O operations ensuring that the socket receives more events |
| /// if it hits a [`WouldBlock`] error. |
| /// |
| /// # Notes |
| /// |
| /// This method is required to be called for **all** I/O operations to |
| /// ensure the user will receive events once the socket is ready again after |
| /// returning a [`WouldBlock`] error. |
| /// |
| /// [`WouldBlock`]: io::ErrorKind::WouldBlock |
| pub fn do_io<F, R>(&self, f: F) -> io::Result<R> |
| where |
| F: FnOnce(&T) -> io::Result<R>, |
| { |
| self.state.do_io(f, &self.inner) |
| } |
| |
| /// Returns the I/O source, dropping the state. |
| /// |
| /// # Notes |
| /// |
| /// To ensure no more events are to be received for this I/O source first |
| /// [`deregister`] it. |
| /// |
| /// [`deregister`]: Registry::deregister |
| pub fn into_inner(self) -> T { |
| self.inner |
| } |
| } |
| |
| /// Be careful when using this method. All I/O operations that may block must go |
| /// through the [`do_io`] method. |
| /// |
| /// [`do_io`]: IoSource::do_io |
| impl<T> Deref for IoSource<T> { |
| type Target = T; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.inner |
| } |
| } |
| |
| /// Be careful when using this method. All I/O operations that may block must go |
| /// through the [`do_io`] method. |
| /// |
| /// [`do_io`]: IoSource::do_io |
| impl<T> DerefMut for IoSource<T> { |
| fn deref_mut(&mut self) -> &mut Self::Target { |
| &mut self.inner |
| } |
| } |
| |
| #[cfg(unix)] |
| impl<T> event::Source for IoSource<T> |
| where |
| T: AsRawFd, |
| { |
| fn register( |
| &mut self, |
| registry: &Registry, |
| token: Token, |
| interests: Interest, |
| ) -> io::Result<()> { |
| #[cfg(debug_assertions)] |
| self.selector_id.associate(registry)?; |
| poll::selector(registry).register(self.inner.as_raw_fd(), token, interests) |
| } |
| |
| fn reregister( |
| &mut self, |
| registry: &Registry, |
| token: Token, |
| interests: Interest, |
| ) -> io::Result<()> { |
| #[cfg(debug_assertions)] |
| self.selector_id.check_association(registry)?; |
| poll::selector(registry).reregister(self.inner.as_raw_fd(), token, interests) |
| } |
| |
| fn deregister(&mut self, registry: &Registry) -> io::Result<()> { |
| #[cfg(debug_assertions)] |
| self.selector_id.remove_association(registry)?; |
| poll::selector(registry).deregister(self.inner.as_raw_fd()) |
| } |
| } |
| |
// Windows: (de)registration is delegated to the internal `IoSourceState`;
// only the initial `register` needs the raw socket of the wrapped value. In
// debug builds the selector association is checked first and any mismatch is
// returned as an error before the state is touched.
#[cfg(windows)]
impl<T: AsRawSocket> event::Source for IoSource<T> {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;

        let socket = self.inner.as_raw_socket();
        self.state.register(registry, token, interests, socket)
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;

        self.state.reregister(registry, token, interests)
    }

    // `_registry` keeps its underscore because it is only read in debug
    // builds; release builds would otherwise warn about an unused parameter.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;

        self.state.deregister()
    }
}
| |
| impl<T> fmt::Debug for IoSource<T> |
| where |
| T: fmt::Debug, |
| { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| self.inner.fmt(f) |
| } |
| } |
| |
/// Debug-build-only record of which `sys::Selector` an `IoSource` is
/// associated with.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    // Id of the associated selector, stored atomically so it can be updated
    // through a shared reference.
    id: AtomicUsize,
}
| |
| #[cfg(debug_assertions)] |
| impl SelectorId { |
| /// Value of `id` if `SelectorId` is not associated with any |
| /// `sys::Selector`. Valid selector ids start at 1. |
| const UNASSOCIATED: usize = 0; |
| |
| /// Create a new `SelectorId`. |
| const fn new() -> SelectorId { |
| SelectorId { |
| id: AtomicUsize::new(Self::UNASSOCIATED), |
| } |
| } |
| |
| /// Associate an I/O source with `registry`, returning an error if its |
| /// already registered. |
| fn associate(&self, registry: &Registry) -> io::Result<()> { |
| let registry_id = poll::selector(®istry).id(); |
| let previous_id = self.id.swap(registry_id, Ordering::AcqRel); |
| |
| if previous_id == Self::UNASSOCIATED { |
| Ok(()) |
| } else { |
| Err(io::Error::new( |
| io::ErrorKind::AlreadyExists, |
| "I/O source already registered with a `Registry`", |
| )) |
| } |
| } |
| |
| /// Check the association of an I/O source with `registry`, returning an |
| /// error if its registered with a different `Registry` or not registered at |
| /// all. |
| fn check_association(&self, registry: &Registry) -> io::Result<()> { |
| let registry_id = poll::selector(®istry).id(); |
| let id = self.id.load(Ordering::Acquire); |
| |
| if id == registry_id { |
| Ok(()) |
| } else if id == Self::UNASSOCIATED { |
| Err(io::Error::new( |
| io::ErrorKind::NotFound, |
| "I/O source not registered with `Registry`", |
| )) |
| } else { |
| Err(io::Error::new( |
| io::ErrorKind::AlreadyExists, |
| "I/O source already registered with a different `Registry`", |
| )) |
| } |
| } |
| |
| /// Remove a previously made association from `registry`, returns an error |
| /// if it was not previously associated with `registry`. |
| fn remove_association(&self, registry: &Registry) -> io::Result<()> { |
| let registry_id = poll::selector(®istry).id(); |
| let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); |
| |
| if previous_id == registry_id { |
| Ok(()) |
| } else { |
| Err(io::Error::new( |
| io::ErrorKind::NotFound, |
| "I/O source not registered with `Registry`", |
| )) |
| } |
| } |
| } |
| |
| #[cfg(debug_assertions)] |
| impl Clone for SelectorId { |
| fn clone(&self) -> SelectorId { |
| SelectorId { |
| id: AtomicUsize::new(self.id.load(Ordering::Acquire)), |
| } |
| } |
| } |