blob: ee65f06b6cc424ed87c0be6d92310422b6bea539 [file] [log] [blame]
use std::fmt;
use std::io::{self, Read, Write};
use std::mem;
use std::net::{self, SocketAddr, Shutdown};
use std::time::Duration;
use bytes::{Buf, BufMut};
use futures::stream::Stream;
use futures::sync::oneshot;
use futures::{Future, Poll, Async};
use iovec::IoVec;
use mio;
use tokio_io::{AsyncRead, AsyncWrite};
use reactor::{Handle, PollEvented};
/// An I/O object representing a TCP socket listening for incoming connections.
///
/// This object can be converted into a stream of incoming connections for
/// various forms of processing.
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
pending_accept: Option<oneshot::Receiver<io::Result<(TcpStream, SocketAddr)>>>,
}
/// Stream returned by the `TcpListener::incoming` function representing the
/// stream of sockets received from a listener.
pub struct Incoming {
inner: TcpListener,
}
impl TcpListener {
/// Create a new TCP listener associated with this event loop.
///
/// The TCP listener will bind to the provided `addr` address, if available.
/// If the result is `Ok`, the socket has successfully bound.
pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
let l = try!(mio::net::TcpListener::bind(addr));
TcpListener::new(l, handle)
}
/// Attempt to accept a connection and create a new connected `TcpStream` if
/// successful.
///
/// This function will attempt an accept operation, but will not block
/// waiting for it to complete. If the operation would block then a "would
/// block" error is returned. Additionally, if this method would block, it
/// registers the current task to receive a notification when it would
/// otherwise not block.
///
/// Note that typically for simple usage it's easier to treat incoming
/// connections as a `Stream` of `TcpStream`s with the `incoming` method
/// below.
///
/// # Panics
///
/// This function will panic if it is called outside the context of a
/// future's task. It's recommended to only call this from the
/// implementation of a `Future::poll`, if necessary.
pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
loop {
if let Some(mut pending) = self.pending_accept.take() {
match pending.poll().expect("shouldn't be canceled") {
Async::NotReady => {
self.pending_accept = Some(pending);
return Err(::would_block())
},
Async::Ready(r) => return r,
}
}
if let Async::NotReady = self.io.poll_read() {
return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready"))
}
match self.io.get_ref().accept() {
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
self.io.need_read();
}
return Err(e)
},
Ok((sock, addr)) => {
// Fast path if we haven't left the event loop
if let Some(handle) = self.io.remote().handle() {
let io = try!(PollEvented::new(sock, &handle));
return Ok((TcpStream { io: io }, addr))
}
// If we're off the event loop then send the socket back
// over there to get registered and then we'll get it back
// eventually.
let (tx, rx) = oneshot::channel();
let remote = self.io.remote().clone();
remote.spawn(move |handle| {
let res = PollEvented::new(sock, handle)
.map(move |io| {
(TcpStream { io: io }, addr)
});
drop(tx.send(res));
Ok(())
});
self.pending_accept = Some(rx);
// continue to polling the `rx` at the beginning of the loop
}
}
}
}
/// Create a new TCP listener from the standard library's TCP listener.
///
/// This method can be used when the `Handle::tcp_listen` method isn't
/// sufficient because perhaps some more configuration is needed in terms of
/// before the calls to `bind` and `listen`.
///
/// This API is typically paired with the `net2` crate and the `TcpBuilder`
/// type to build up and customize a listener before it's shipped off to the
/// backing event loop. This allows configuration of options like
/// `SO_REUSEPORT`, binding to multiple addresses, etc.
///
/// The `addr` argument here is one of the addresses that `listener` is
/// bound to and the listener will only be guaranteed to accept connections
/// of the same address type currently.
///
/// Finally, the `handle` argument is the event loop that this listener will
/// be bound to.
///
/// The platform specific behavior of this function looks like:
///
/// * On Unix, the socket is placed into nonblocking mode and connections
/// can be accepted as normal
///
/// * On Windows, the address is stored internally and all future accepts
/// will only be for the same IP version as `addr` specified. That is, if
/// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
/// well (same for IPv6).
pub fn from_listener(listener: net::TcpListener,
addr: &SocketAddr,
handle: &Handle) -> io::Result<TcpListener> {
let l = try!(mio::net::TcpListener::from_listener(listener, addr));
TcpListener::new(l, handle)
}
fn new(listener: mio::net::TcpListener, handle: &Handle)
-> io::Result<TcpListener> {
let io = try!(PollEvented::new(listener, handle));
Ok(TcpListener { io: io, pending_accept: None })
}
/// Test whether this socket is ready to be read or not.
pub fn poll_read(&self) -> Async<()> {
self.io.poll_read()
}
/// Returns the local address that this listener is bound to.
///
/// This can be useful, for example, when binding to port 0 to figure out
/// which port was actually bound.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
/// Consumes this listener, returning a stream of the sockets this listener
/// accepts.
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
pub fn incoming(self) -> Incoming {
Incoming { inner: self }
}
/// Sets the value for the `IP_TTL` option on this socket.
///
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.io.get_ref().set_ttl(ttl)
}
/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`][link].
///
/// [link]: #method.set_ttl
pub fn ttl(&self) -> io::Result<u32> {
self.io.get_ref().ttl()
}
/// Sets the value for the `IPV6_V6ONLY` option on this socket.
///
/// If this is set to `true` then the socket is restricted to sending and
/// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
/// can bind the same port at the same time.
///
/// If this is set to `false` then the socket can be used to send and
/// receive packets from an IPv4-mapped IPv6 address.
pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
self.io.get_ref().set_only_v6(only_v6)
}
/// Gets the value of the `IPV6_V6ONLY` option for this socket.
///
/// For more information about this option, see [`set_only_v6`][link].
///
/// [link]: #method.set_only_v6
pub fn only_v6(&self) -> io::Result<bool> {
self.io.get_ref().only_v6()
}
}
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl Stream for Incoming {
type Item = (TcpStream, SocketAddr);
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
Ok(Async::Ready(Some(try_nb!(self.inner.accept()))))
}
}
/// An I/O object representing a TCP stream connected to a remote endpoint.
///
/// A TCP stream can either be created by connecting to an endpoint or by
/// accepting a connection from a listener. Inside the stream is access to the
/// raw underlying I/O object as well as streams for the read/write
/// notifications on the stream itself.
pub struct TcpStream {
io: PollEvented<mio::net::TcpStream>,
}
/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
/// when the stream is connected.
pub struct TcpStreamNew {
inner: TcpStreamNewState,
}
enum TcpStreamNewState {
Waiting(TcpStream),
Error(io::Error),
Empty,
}
impl TcpStream {
/// Create a new TCP stream connected to the specified address.
///
/// This function will create a new TCP socket and attempt to connect it to
/// the `addr` provided. The returned future will be resolved once the
/// stream has successfully connected. If an error happens during the
/// connection or during the socket creation, that error will be returned to
/// the future instead.
pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
let inner = match mio::net::TcpStream::connect(addr) {
Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => TcpStreamNewState::Error(e),
};
TcpStreamNew { inner: inner }
}
fn new(connected_stream: mio::net::TcpStream, handle: &Handle)
-> TcpStreamNewState {
match PollEvented::new(connected_stream, handle) {
Ok(io) => TcpStreamNewState::Waiting(TcpStream { io: io }),
Err(e) => TcpStreamNewState::Error(e),
}
}
/// Create a new `TcpStream` from a `net::TcpStream`.
///
/// This function will convert a TCP stream in the standard library to a TCP
/// stream ready to be used with the provided event loop handle. The object
/// returned is associated with the event loop and ready to perform I/O.
pub fn from_stream(stream: net::TcpStream, handle: &Handle)
-> io::Result<TcpStream> {
let inner = try!(mio::net::TcpStream::from_stream(stream));
Ok(TcpStream {
io: try!(PollEvented::new(inner, handle)),
})
}
/// Creates a new `TcpStream` from the pending socket inside the given
/// `std::net::TcpStream`, connecting it to the address specified.
///
/// This constructor allows configuring the socket before it's actually
/// connected, and this function will transfer ownership to the returned
/// `TcpStream` if successful. An unconnected `TcpStream` can be created
/// with the `net2::TcpBuilder` type (and also configured via that route).
///
/// The platform specific behavior of this function looks like:
///
/// * On Unix, the socket is placed into nonblocking mode and then a
/// `connect` call is issued.
///
/// * On Windows, the address is stored internally and the connect operation
/// is issued when the returned `TcpStream` is registered with an event
/// loop. Note that on Windows you must `bind` a socket before it can be
/// connected, so if a custom `TcpBuilder` is used it should be bound
/// (perhaps to `INADDR_ANY`) before this method is called.
pub fn connect_stream(stream: net::TcpStream,
addr: &SocketAddr,
handle: &Handle)
-> Box<Future<Item=TcpStream, Error=io::Error> + Send> {
let state = match mio::net::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => TcpStreamNewState::Error(e),
};
state.boxed()
}
/// Test whether this socket is ready to be read or not.
///
/// If the socket is *not* readable then the current task is scheduled to
/// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Async<()> {
self.io.poll_read()
}
/// Test whether this socket is ready to be written to or not.
///
/// If the socket is *not* writable then the current task is scheduled to
/// get a notification when the socket does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Async<()> {
self.io.poll_write()
}
/// Returns the local address that this stream is bound to.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
/// Returns the remote address that this stream is connected to.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().peer_addr()
}
/// Shuts down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O on the specified
/// portions to return immediately with an appropriate value (see the
/// documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
/// Sets the value of the `TCP_NODELAY` option on this socket.
///
/// If set, this option disables the Nagle algorithm. This means that
/// segments are always sent as soon as possible, even if there is only a
/// small amount of data. When not set, data is buffered until there is a
/// sufficient amount to send out, thereby avoiding the frequent sending of
/// small packets.
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
self.io.get_ref().set_nodelay(nodelay)
}
/// Gets the value of the `TCP_NODELAY` option on this socket.
///
/// For more information about this option, see [`set_nodelay`][link].
///
/// [link]: #method.set_nodelay
pub fn nodelay(&self) -> io::Result<bool> {
self.io.get_ref().nodelay()
}
/// Sets the value of the `SO_RCVBUF` option on this socket.
///
/// Changes the size of the operating system's receive buffer associated
/// with the socket.
pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> {
self.io.get_ref().set_recv_buffer_size(size)
}
/// Gets the value of the `SO_RCVBUF` option on this socket.
///
/// For more information about this option, see
/// [`set_recv_buffer_size`][link].
///
/// [link]: #tymethod.set_recv_buffer_size
pub fn recv_buffer_size(&self) -> io::Result<usize> {
self.io.get_ref().recv_buffer_size()
}
/// Sets the value of the `SO_SNDBUF` option on this socket.
///
/// Changes the size of the operating system's send buffer associated with
/// the socket.
pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> {
self.io.get_ref().set_send_buffer_size(size)
}
/// Gets the value of the `SO_SNDBUF` option on this socket.
///
/// For more information about this option, see [`set_send_buffer`][link].
///
/// [link]: #tymethod.set_send_buffer
pub fn send_buffer_size(&self) -> io::Result<usize> {
self.io.get_ref().send_buffer_size()
}
/// Sets whether keepalive messages are enabled to be sent on this socket.
///
/// On Unix, this option will set the `SO_KEEPALIVE` as well as the
/// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform).
/// On Windows, this will set the `SIO_KEEPALIVE_VALS` option.
///
/// If `None` is specified then keepalive messages are disabled, otherwise
/// the duration specified will be the time to remain idle before sending a
/// TCP keepalive probe.
///
/// Some platforms specify this value in seconds, so sub-second
/// specifications may be omitted.
pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> {
self.io.get_ref().set_keepalive(keepalive)
}
/// Returns whether keepalive messages are enabled on this socket, and if so
/// the duration of time between them.
///
/// For more information about this option, see [`set_keepalive`][link].
///
/// [link]: #tymethod.set_keepalive
pub fn keepalive(&self) -> io::Result<Option<Duration>> {
self.io.get_ref().keepalive()
}
/// Sets the value for the `IP_TTL` option on this socket.
///
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.io.get_ref().set_ttl(ttl)
}
/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`][link].
///
/// [link]: #tymethod.set_ttl
pub fn ttl(&self) -> io::Result<u32> {
self.io.get_ref().ttl()
}
/// Sets the value for the `IPV6_V6ONLY` option on this socket.
///
/// If this is set to `true` then the socket is restricted to sending and
/// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications
/// can bind the same port at the same time.
///
/// If this is set to `false` then the socket can be used to send and
/// receive packets from an IPv4-mapped IPv6 address.
pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> {
self.io.get_ref().set_only_v6(only_v6)
}
/// Gets the value of the `IPV6_V6ONLY` option for this socket.
///
/// For more information about this option, see [`set_only_v6`][link].
///
/// [link]: #tymethod.set_only_v6
pub fn only_v6(&self) -> io::Result<bool> {
self.io.get_ref().only_v6()
}
/// Sets the linger duration of this socket by setting the SO_LINGER option
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
self.io.get_ref().set_linger(dur)
}
/// reads the linger duration for this socket by getting the SO_LINGER option
pub fn linger(&self) -> io::Result<Option<Duration>> {
self.io.get_ref().linger()
}
#[deprecated(since = "0.1.8", note = "use set_keepalive")]
#[doc(hidden)]
pub fn set_keepalive_ms(&self, keepalive: Option<u32>) -> io::Result<()> {
#[allow(deprecated)]
self.io.get_ref().set_keepalive_ms(keepalive)
}
#[deprecated(since = "0.1.8", note = "use keepalive")]
#[doc(hidden)]
pub fn keepalive_ms(&self) -> io::Result<Option<u32>> {
#[allow(deprecated)]
self.io.get_ref().keepalive_ms()
}
}
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl Write for TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl AsyncRead for TcpStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&TcpStream>::read_buf(&mut &*self, buf)
}
}
impl AsyncWrite for TcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
<&TcpStream>::shutdown(&mut &*self)
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&TcpStream>::write_buf(&mut &*self, buf)
}
}
#[allow(deprecated)]
impl ::io::Io for TcpStream {
fn poll_read(&mut self) -> Async<()> {
<TcpStream>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<TcpStream>::poll_write(self)
}
fn read_vec(&mut self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
if let Async::NotReady = <TcpStream>::poll_read(self) {
return Err(::would_block())
}
let r = self.io.get_ref().read_bufs(bufs);
if is_wouldblock(&r) {
self.io.need_read();
}
return r
}
fn write_vec(&mut self, bufs: &[&IoVec]) -> io::Result<usize> {
if let Async::NotReady = <TcpStream>::poll_write(self) {
return Err(::would_block())
}
let r = self.io.get_ref().write_bufs(bufs);
if is_wouldblock(&r) {
self.io.need_write();
}
return r
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl<'a> AsyncRead for &'a TcpStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <TcpStream>::poll_read(self) {
return Ok(Async::NotReady)
}
let r = unsafe {
let mut bufs: [_; 16] = Default::default();
let n = buf.bytes_vec_mut(&mut bufs);
self.io.get_ref().read_bufs(&mut bufs[..n])
};
match r {
Ok(n) => {
unsafe { buf.advance_mut(n); }
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_read();
Ok(Async::NotReady)
}
Err(e) => Err(e),
}
}
}
impl<'a> AsyncWrite for &'a TcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <TcpStream>::poll_write(self) {
return Ok(Async::NotReady)
}
let r = {
let mut bufs: [_; 16] = Default::default();
let n = buf.bytes_vec(&mut bufs);
self.io.get_ref().write_bufs(&bufs[..n])
};
match r {
Ok(n) => {
buf.advance(n);
Ok(Async::Ready(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.need_write();
Ok(Async::NotReady)
}
Err(e) => Err(e),
}
}
}
#[allow(deprecated)]
impl<'a> ::io::Io for &'a TcpStream {
fn poll_read(&mut self) -> Async<()> {
<TcpStream>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<TcpStream>::poll_write(self)
}
}
impl fmt::Debug for TcpStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl Future for TcpStreamNew {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<TcpStream, io::Error> {
self.inner.poll()
}
}
impl Future for TcpStreamNewState {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<TcpStream, io::Error> {
{
let stream = match *self {
TcpStreamNewState::Waiting(ref s) => s,
TcpStreamNewState::Error(_) => {
let e = match mem::replace(self, TcpStreamNewState::Empty) {
TcpStreamNewState::Error(e) => e,
_ => panic!(),
};
return Err(e)
}
TcpStreamNewState::Empty => panic!("can't poll TCP stream twice"),
};
// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
// writable we check for `take_socket_error` to see if the connect
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
if let Async::NotReady = stream.io.poll_write() {
return Ok(Async::NotReady)
}
if let Some(e) = try!(stream.io.get_ref().take_error()) {
return Err(e)
}
}
match mem::replace(self, TcpStreamNewState::Empty) {
TcpStreamNewState::Waiting(stream) => Ok(Async::Ready(stream)),
_ => panic!(),
}
}
}
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod sys {
use std::os::unix::prelude::*;
use super::{TcpStream, TcpListener};
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
}
}
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
}
}
}
#[cfg(windows)]
mod sys {
// TODO: let's land these upstream with mio and then we can add them here.
//
// use std::os::windows::prelude::*;
// use super::{TcpStream, TcpListener};
//
// impl AsRawHandle for TcpStream {
// fn as_raw_handle(&self) -> RawHandle {
// self.io.get_ref().as_raw_handle()
// }
// }
//
// impl AsRawHandle for TcpListener {
// fn as_raw_handle(&self) -> RawHandle {
// self.listener.io().as_raw_handle()
// }
// }
}