|  | use std::fmt; | 
|  | use std::io; | 
|  | use std::net::{SocketAddr, TcpListener as StdTcpListener}; | 
|  | use std::time::Duration; | 
|  |  | 
|  | use futures_util::FutureExt as _; | 
|  | use tokio::net::TcpListener; | 
|  | use tokio::time::Delay; | 
|  |  | 
|  | use crate::common::{task, Future, Pin, Poll}; | 
|  |  | 
|  | pub use self::addr_stream::AddrStream; | 
|  | use super::Accept; | 
|  |  | 
|  | /// A stream of connections from binding to an address. | 
|  | #[must_use = "streams do nothing unless polled"] | 
|  | pub struct AddrIncoming { | 
|  | addr: SocketAddr, | 
|  | listener: TcpListener, | 
|  | sleep_on_errors: bool, | 
|  | tcp_keepalive_timeout: Option<Duration>, | 
|  | tcp_nodelay: bool, | 
|  | timeout: Option<Delay>, | 
|  | } | 
|  |  | 
|  | impl AddrIncoming { | 
|  | pub(super) fn new(addr: &SocketAddr) -> crate::Result<Self> { | 
|  | let std_listener = StdTcpListener::bind(addr).map_err(crate::Error::new_listen)?; | 
|  |  | 
|  | AddrIncoming::from_std(std_listener) | 
|  | } | 
|  |  | 
|  | pub(super) fn from_std(std_listener: StdTcpListener) -> crate::Result<Self> { | 
|  | let listener = TcpListener::from_std(std_listener).map_err(crate::Error::new_listen)?; | 
|  | let addr = listener.local_addr().map_err(crate::Error::new_listen)?; | 
|  | Ok(AddrIncoming { | 
|  | listener, | 
|  | addr, | 
|  | sleep_on_errors: true, | 
|  | tcp_keepalive_timeout: None, | 
|  | tcp_nodelay: false, | 
|  | timeout: None, | 
|  | }) | 
|  | } | 
|  |  | 
|  | /// Creates a new `AddrIncoming` binding to provided socket address. | 
|  | pub fn bind(addr: &SocketAddr) -> crate::Result<Self> { | 
|  | AddrIncoming::new(addr) | 
|  | } | 
|  |  | 
|  | /// Get the local address bound to this listener. | 
|  | pub fn local_addr(&self) -> SocketAddr { | 
|  | self.addr | 
|  | } | 
|  |  | 
|  | /// Set whether TCP keepalive messages are enabled on accepted connections. | 
|  | /// | 
|  | /// If `None` is specified, keepalive is disabled, otherwise the duration | 
|  | /// specified will be the time to remain idle before sending TCP keepalive | 
|  | /// probes. | 
|  | pub fn set_keepalive(&mut self, keepalive: Option<Duration>) -> &mut Self { | 
|  | self.tcp_keepalive_timeout = keepalive; | 
|  | self | 
|  | } | 
|  |  | 
|  | /// Set the value of `TCP_NODELAY` option for accepted connections. | 
|  | pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self { | 
|  | self.tcp_nodelay = enabled; | 
|  | self | 
|  | } | 
|  |  | 
|  | /// Set whether to sleep on accept errors. | 
|  | /// | 
|  | /// A possible scenario is that the process has hit the max open files | 
|  | /// allowed, and so trying to accept a new connection will fail with | 
|  | /// `EMFILE`. In some cases, it's preferable to just wait for some time, if | 
|  | /// the application will likely close some files (or connections), and try | 
|  | /// to accept the connection again. If this option is `true`, the error | 
|  | /// will be logged at the `error` level, since it is still a big deal, | 
|  | /// and then the listener will sleep for 1 second. | 
|  | /// | 
|  | /// In other cases, hitting the max open files should be treat similarly | 
|  | /// to being out-of-memory, and simply error (and shutdown). Setting | 
|  | /// this option to `false` will allow that. | 
|  | /// | 
|  | /// Default is `true`. | 
|  | pub fn set_sleep_on_errors(&mut self, val: bool) { | 
|  | self.sleep_on_errors = val; | 
|  | } | 
|  |  | 
|  | fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> { | 
|  | // Check if a previous timeout is active that was set by IO errors. | 
|  | if let Some(ref mut to) = self.timeout { | 
|  | match Pin::new(to).poll(cx) { | 
|  | Poll::Ready(()) => {} | 
|  | Poll::Pending => return Poll::Pending, | 
|  | } | 
|  | } | 
|  | self.timeout = None; | 
|  |  | 
|  | let accept = self.listener.accept(); | 
|  | futures_util::pin_mut!(accept); | 
|  |  | 
|  | loop { | 
|  | match accept.poll_unpin(cx) { | 
|  | Poll::Ready(Ok((socket, addr))) => { | 
|  | if let Some(dur) = self.tcp_keepalive_timeout { | 
|  | if let Err(e) = socket.set_keepalive(Some(dur)) { | 
|  | trace!("error trying to set TCP keepalive: {}", e); | 
|  | } | 
|  | } | 
|  | if let Err(e) = socket.set_nodelay(self.tcp_nodelay) { | 
|  | trace!("error trying to set TCP nodelay: {}", e); | 
|  | } | 
|  | return Poll::Ready(Ok(AddrStream::new(socket, addr))); | 
|  | } | 
|  | Poll::Pending => return Poll::Pending, | 
|  | Poll::Ready(Err(e)) => { | 
|  | // Connection errors can be ignored directly, continue by | 
|  | // accepting the next request. | 
|  | if is_connection_error(&e) { | 
|  | debug!("accepted connection already errored: {}", e); | 
|  | continue; | 
|  | } | 
|  |  | 
|  | if self.sleep_on_errors { | 
|  | error!("accept error: {}", e); | 
|  |  | 
|  | // Sleep 1s. | 
|  | let mut timeout = tokio::time::delay_for(Duration::from_secs(1)); | 
|  |  | 
|  | match Pin::new(&mut timeout).poll(cx) { | 
|  | Poll::Ready(()) => { | 
|  | // Wow, it's been a second already? Ok then... | 
|  | continue; | 
|  | } | 
|  | Poll::Pending => { | 
|  | self.timeout = Some(timeout); | 
|  | return Poll::Pending; | 
|  | } | 
|  | } | 
|  | } else { | 
|  | return Poll::Ready(Err(e)); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | impl Accept for AddrIncoming { | 
|  | type Conn = AddrStream; | 
|  | type Error = io::Error; | 
|  |  | 
|  | fn poll_accept( | 
|  | mut self: Pin<&mut Self>, | 
|  | cx: &mut task::Context<'_>, | 
|  | ) -> Poll<Option<Result<Self::Conn, Self::Error>>> { | 
|  | let result = ready!(self.poll_next_(cx)); | 
|  | Poll::Ready(Some(result)) | 
|  | } | 
|  | } | 
|  |  | 
/// Reports whether an `accept()` error belongs to a single connection.
///
/// Refused, aborted and reset connections only affect the connection that
/// failed, so the next connection may already be ready to be accepted and
/// the caller can retry immediately.
///
/// Every other error incurs a timeout before the next `accept()` is
/// performed. The timeout is useful to handle resource exhaustion errors
/// like ENFILE and EMFILE, which could otherwise cause a tight loop.
fn is_connection_error(e: &io::Error) -> bool {
    let kind = e.kind();

    kind == io::ErrorKind::ConnectionRefused
        || kind == io::ErrorKind::ConnectionAborted
        || kind == io::ErrorKind::ConnectionReset
}
|  |  | 
|  | impl fmt::Debug for AddrIncoming { | 
|  | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|  | f.debug_struct("AddrIncoming") | 
|  | .field("addr", &self.addr) | 
|  | .field("sleep_on_errors", &self.sleep_on_errors) | 
|  | .field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout) | 
|  | .field("tcp_nodelay", &self.tcp_nodelay) | 
|  | .finish() | 
|  | } | 
|  | } | 
|  |  | 
|  | mod addr_stream { | 
|  | use bytes::{Buf, BufMut}; | 
|  | use std::io; | 
|  | use std::net::SocketAddr; | 
|  | use tokio::io::{AsyncRead, AsyncWrite}; | 
|  | use tokio::net::TcpStream; | 
|  |  | 
|  | use crate::common::{task, Pin, Poll}; | 
|  |  | 
|  | /// A transport returned yieled by `AddrIncoming`. | 
|  | #[derive(Debug)] | 
|  | pub struct AddrStream { | 
|  | inner: TcpStream, | 
|  | pub(super) remote_addr: SocketAddr, | 
|  | } | 
|  |  | 
|  | impl AddrStream { | 
|  | pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream { | 
|  | AddrStream { | 
|  | inner: tcp, | 
|  | remote_addr: addr, | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Returns the remote (peer) address of this connection. | 
|  | #[inline] | 
|  | pub fn remote_addr(&self) -> SocketAddr { | 
|  | self.remote_addr | 
|  | } | 
|  |  | 
|  | /// Consumes the AddrStream and returns the underlying IO object | 
|  | #[inline] | 
|  | pub fn into_inner(self) -> TcpStream { | 
|  | self.inner | 
|  | } | 
|  |  | 
|  | /// Attempt to receive data on the socket, without removing that data | 
|  | /// from the queue, registering the current task for wakeup if data is | 
|  | /// not yet available. | 
|  | pub fn poll_peek( | 
|  | &mut self, | 
|  | cx: &mut task::Context<'_>, | 
|  | buf: &mut [u8], | 
|  | ) -> Poll<io::Result<usize>> { | 
|  | self.inner.poll_peek(cx, buf) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl AsyncRead for AddrStream { | 
|  | unsafe fn prepare_uninitialized_buffer( | 
|  | &self, | 
|  | buf: &mut [std::mem::MaybeUninit<u8>], | 
|  | ) -> bool { | 
|  | self.inner.prepare_uninitialized_buffer(buf) | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn poll_read( | 
|  | mut self: Pin<&mut Self>, | 
|  | cx: &mut task::Context<'_>, | 
|  | buf: &mut [u8], | 
|  | ) -> Poll<io::Result<usize>> { | 
|  | Pin::new(&mut self.inner).poll_read(cx, buf) | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn poll_read_buf<B: BufMut>( | 
|  | mut self: Pin<&mut Self>, | 
|  | cx: &mut task::Context<'_>, | 
|  | buf: &mut B, | 
|  | ) -> Poll<io::Result<usize>> { | 
|  | Pin::new(&mut self.inner).poll_read_buf(cx, buf) | 
|  | } | 
|  | } | 
|  |  | 
|  | impl AsyncWrite for AddrStream { | 
|  | #[inline] | 
|  | fn poll_write( | 
|  | mut self: Pin<&mut Self>, | 
|  | cx: &mut task::Context<'_>, | 
|  | buf: &[u8], | 
|  | ) -> Poll<io::Result<usize>> { | 
|  | Pin::new(&mut self.inner).poll_write(cx, buf) | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn poll_write_buf<B: Buf>( | 
|  | mut self: Pin<&mut Self>, | 
|  | cx: &mut task::Context<'_>, | 
|  | buf: &mut B, | 
|  | ) -> Poll<io::Result<usize>> { | 
|  | Pin::new(&mut self.inner).poll_write_buf(cx, buf) | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { | 
|  | // TCP flush is a noop | 
|  | Poll::Ready(Ok(())) | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn poll_shutdown( | 
|  | mut self: Pin<&mut Self>, | 
|  | cx: &mut task::Context<'_>, | 
|  | ) -> Poll<io::Result<()>> { | 
|  | Pin::new(&mut self.inner).poll_shutdown(cx) | 
|  | } | 
|  | } | 
|  | } |