| use std::fmt; |
| use std::io::{self, Read, Write}; |
| use std::mem; |
| use std::net::{self, SocketAddr, Shutdown}; |
| use std::time::Duration; |
| |
| use bytes::{Buf, BufMut}; |
| use futures::stream::Stream; |
| use futures::sync::oneshot; |
| use futures::{Future, Poll, Async}; |
| use iovec::IoVec; |
| use mio; |
| use tokio_io::{AsyncRead, AsyncWrite}; |
| |
| use reactor::{Handle, PollEvented}; |
| |
| /// An I/O object representing a TCP socket listening for incoming connections. |
| /// |
| /// This object can be converted into a stream of incoming connections for |
| /// various forms of processing. |
| pub struct TcpListener { |
| io: PollEvented<mio::net::TcpListener>, |
| pending_accept: Option<oneshot::Receiver<io::Result<(TcpStream, SocketAddr)>>>, |
| } |
| |
| /// Stream returned by the `TcpListener::incoming` function representing the |
| /// stream of sockets received from a listener. |
| pub struct Incoming { |
| inner: TcpListener, |
| } |
| |
| impl TcpListener { |
| /// Create a new TCP listener associated with this event loop. |
| /// |
| /// The TCP listener will bind to the provided `addr` address, if available. |
| /// If the result is `Ok`, the socket has successfully bound. |
| pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> { |
| let l = try!(mio::net::TcpListener::bind(addr)); |
| TcpListener::new(l, handle) |
| } |
| |
| /// Attempt to accept a connection and create a new connected `TcpStream` if |
| /// successful. |
| /// |
| /// This function will attempt an accept operation, but will not block |
| /// waiting for it to complete. If the operation would block then a "would |
| /// block" error is returned. Additionally, if this method would block, it |
| /// registers the current task to receive a notification when it would |
| /// otherwise not block. |
| /// |
| /// Note that typically for simple usage it's easier to treat incoming |
| /// connections as a `Stream` of `TcpStream`s with the `incoming` method |
| /// below. |
| /// |
| /// # Panics |
| /// |
| /// This function will panic if it is called outside the context of a |
| /// future's task. It's recommended to only call this from the |
| /// implementation of a `Future::poll`, if necessary. |
| pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { |
| loop { |
| if let Some(mut pending) = self.pending_accept.take() { |
| match pending.poll().expect("shouldn't be canceled") { |
| Async::NotReady => { |
| self.pending_accept = Some(pending); |
| return Err(io::ErrorKind::WouldBlock.into()) |
| }, |
| Async::Ready(r) => return r, |
| } |
| } |
| |
| if let Async::NotReady = self.io.poll_read() { |
| return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")) |
| } |
| |
| match self.io.get_ref().accept() { |
| Err(e) => { |
| if e.kind() == io::ErrorKind::WouldBlock { |
| self.io.need_read(); |
| } |
| return Err(e) |
| }, |
| Ok((sock, addr)) => { |
| // Fast path if we haven't left the event loop |
| if let Some(handle) = self.io.remote().handle() { |
| let io = try!(PollEvented::new(sock, &handle)); |
| return Ok((TcpStream { io: io }, addr)) |
| } |
| |
| // If we're off the event loop then send the socket back |
| // over there to get registered and then we'll get it back |
| // eventually. |
| let (tx, rx) = oneshot::channel(); |
| let remote = self.io.remote().clone(); |
| remote.spawn(move |handle| { |
| let res = PollEvented::new(sock, handle) |
| .map(move |io| { |
| (TcpStream { io: io }, addr) |
| }); |
| drop(tx.send(res)); |
| Ok(()) |
| }); |
| self.pending_accept = Some(rx); |
| // continue to polling the `rx` at the beginning of the loop |
| } |
| } |
| } |
| } |
| |
| /// Create a new TCP listener from the standard library's TCP listener. |
| /// |
| /// This method can be used when the `Handle::tcp_listen` method isn't |
| /// sufficient because perhaps some more configuration is needed in terms of |
| /// before the calls to `bind` and `listen`. |
| /// |
| /// This API is typically paired with the `net2` crate and the `TcpBuilder` |
| /// type to build up and customize a listener before it's shipped off to the |
| /// backing event loop. This allows configuration of options like |
| /// `SO_REUSEPORT`, binding to multiple addresses, etc. |
| /// |
| /// The `addr` argument here is one of the addresses that `listener` is |
| /// bound to and the listener will only be guaranteed to accept connections |
| /// of the same address type currently. |
| /// |
| /// Finally, the `handle` argument is the event loop that this listener will |
| /// be bound to. |
| /// |
| /// The platform specific behavior of this function looks like: |
| /// |
| /// * On Unix, the socket is placed into nonblocking mode and connections |
| /// can be accepted as normal |
| /// |
| /// * On Windows, the address is stored internally and all future accepts |
| /// will only be for the same IP version as `addr` specified. That is, if |
| /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as |
| /// well (same for IPv6). |
| pub fn from_listener(listener: net::TcpListener, |
| addr: &SocketAddr, |
| handle: &Handle) -> io::Result<TcpListener> { |
| let l = try!(mio::net::TcpListener::from_listener(listener, addr)); |
| TcpListener::new(l, handle) |
| } |
| |
| fn new(listener: mio::net::TcpListener, handle: &Handle) |
| -> io::Result<TcpListener> { |
| let io = try!(PollEvented::new(listener, handle)); |
| Ok(TcpListener { io: io, pending_accept: None }) |
| } |
| |
| /// Test whether this socket is ready to be read or not. |
| pub fn poll_read(&self) -> Async<()> { |
| self.io.poll_read() |
| } |
| |
| /// Returns the local address that this listener is bound to. |
| /// |
| /// This can be useful, for example, when binding to port 0 to figure out |
| /// which port was actually bound. |
| pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| self.io.get_ref().local_addr() |
| } |
| |
| /// Consumes this listener, returning a stream of the sockets this listener |
| /// accepts. |
| /// |
| /// This method returns an implementation of the `Stream` trait which |
| /// resolves to the sockets the are accepted on this listener. |
| pub fn incoming(self) -> Incoming { |
| Incoming { inner: self } |
| } |
| |
| /// Sets the value for the `IP_TTL` option on this socket. |
| /// |
| /// This value sets the time-to-live field that is used in every packet sent |
| /// from this socket. |
| pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { |
| self.io.get_ref().set_ttl(ttl) |
| } |
| |
| /// Gets the value of the `IP_TTL` option for this socket. |
| /// |
| /// For more information about this option, see [`set_ttl`][link]. |
| /// |
| /// [link]: #method.set_ttl |
| pub fn ttl(&self) -> io::Result<u32> { |
| self.io.get_ref().ttl() |
| } |
| |
| /// Sets the value for the `IPV6_V6ONLY` option on this socket. |
| /// |
| /// If this is set to `true` then the socket is restricted to sending and |
| /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications |
| /// can bind the same port at the same time. |
| /// |
| /// If this is set to `false` then the socket can be used to send and |
| /// receive packets from an IPv4-mapped IPv6 address. |
| pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { |
| self.io.get_ref().set_only_v6(only_v6) |
| } |
| |
| /// Gets the value of the `IPV6_V6ONLY` option for this socket. |
| /// |
| /// For more information about this option, see [`set_only_v6`][link]. |
| /// |
| /// [link]: #method.set_only_v6 |
| pub fn only_v6(&self) -> io::Result<bool> { |
| self.io.get_ref().only_v6() |
| } |
| } |
| |
| impl fmt::Debug for TcpListener { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| self.io.get_ref().fmt(f) |
| } |
| } |
| |
| impl Stream for Incoming { |
| type Item = (TcpStream, SocketAddr); |
| type Error = io::Error; |
| |
| fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { |
| Ok(Async::Ready(Some(try_nb!(self.inner.accept())))) |
| } |
| } |
| |
| /// An I/O object representing a TCP stream connected to a remote endpoint. |
| /// |
| /// A TCP stream can either be created by connecting to an endpoint or by |
| /// accepting a connection from a listener. Inside the stream is access to the |
| /// raw underlying I/O object as well as streams for the read/write |
| /// notifications on the stream itself. |
| pub struct TcpStream { |
| io: PollEvented<mio::net::TcpStream>, |
| } |
| |
| /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` |
| /// when the stream is connected. |
| pub struct TcpStreamNew { |
| inner: TcpStreamNewState, |
| } |
| |
| enum TcpStreamNewState { |
| Waiting(TcpStream), |
| Error(io::Error), |
| Empty, |
| } |
| |
| impl TcpStream { |
| /// Create a new TCP stream connected to the specified address. |
| /// |
| /// This function will create a new TCP socket and attempt to connect it to |
| /// the `addr` provided. The returned future will be resolved once the |
| /// stream has successfully connected. If an error happens during the |
| /// connection or during the socket creation, that error will be returned to |
| /// the future instead. |
| pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew { |
| let inner = match mio::net::TcpStream::connect(addr) { |
| Ok(tcp) => TcpStream::new(tcp, handle), |
| Err(e) => TcpStreamNewState::Error(e), |
| }; |
| TcpStreamNew { inner: inner } |
| } |
| |
| fn new(connected_stream: mio::net::TcpStream, handle: &Handle) |
| -> TcpStreamNewState { |
| match PollEvented::new(connected_stream, handle) { |
| Ok(io) => TcpStreamNewState::Waiting(TcpStream { io: io }), |
| Err(e) => TcpStreamNewState::Error(e), |
| } |
| } |
| |
| /// Create a new `TcpStream` from a `net::TcpStream`. |
| /// |
| /// This function will convert a TCP stream in the standard library to a TCP |
| /// stream ready to be used with the provided event loop handle. The object |
| /// returned is associated with the event loop and ready to perform I/O. |
| pub fn from_stream(stream: net::TcpStream, handle: &Handle) |
| -> io::Result<TcpStream> { |
| let inner = try!(mio::net::TcpStream::from_stream(stream)); |
| Ok(TcpStream { |
| io: try!(PollEvented::new(inner, handle)), |
| }) |
| } |
| |
| /// Creates a new `TcpStream` from the pending socket inside the given |
| /// `std::net::TcpStream`, connecting it to the address specified. |
| /// |
| /// This constructor allows configuring the socket before it's actually |
| /// connected, and this function will transfer ownership to the returned |
| /// `TcpStream` if successful. An unconnected `TcpStream` can be created |
| /// with the `net2::TcpBuilder` type (and also configured via that route). |
| /// |
| /// The platform specific behavior of this function looks like: |
| /// |
| /// * On Unix, the socket is placed into nonblocking mode and then a |
| /// `connect` call is issued. |
| /// |
| /// * On Windows, the address is stored internally and the connect operation |
| /// is issued when the returned `TcpStream` is registered with an event |
| /// loop. Note that on Windows you must `bind` a socket before it can be |
| /// connected, so if a custom `TcpBuilder` is used it should be bound |
| /// (perhaps to `INADDR_ANY`) before this method is called. |
| pub fn connect_stream(stream: net::TcpStream, |
| addr: &SocketAddr, |
| handle: &Handle) |
| -> Box<Future<Item=TcpStream, Error=io::Error> + Send> { |
| let state = match mio::net::TcpStream::connect_stream(stream, addr) { |
| Ok(tcp) => TcpStream::new(tcp, handle), |
| Err(e) => TcpStreamNewState::Error(e), |
| }; |
| Box::new(state) |
| } |
| |
| /// Test whether this socket is ready to be read or not. |
| /// |
| /// If the socket is *not* readable then the current task is scheduled to |
| /// get a notification when the socket does become readable. That is, this |
| /// is only suitable for calling in a `Future::poll` method and will |
| /// automatically handle ensuring a retry once the socket is readable again. |
| pub fn poll_read(&self) -> Async<()> { |
| self.io.poll_read() |
| } |
| |
| /// Test whether this socket is ready to be written to or not. |
| /// |
| /// If the socket is *not* writable then the current task is scheduled to |
| /// get a notification when the socket does become writable. That is, this |
| /// is only suitable for calling in a `Future::poll` method and will |
| /// automatically handle ensuring a retry once the socket is writable again. |
| pub fn poll_write(&self) -> Async<()> { |
| self.io.poll_write() |
| } |
| |
| /// Returns the local address that this stream is bound to. |
| pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| self.io.get_ref().local_addr() |
| } |
| |
| /// Returns the remote address that this stream is connected to. |
| pub fn peer_addr(&self) -> io::Result<SocketAddr> { |
| self.io.get_ref().peer_addr() |
| } |
| |
| /// Shuts down the read, write, or both halves of this connection. |
| /// |
| /// This function will cause all pending and future I/O on the specified |
| /// portions to return immediately with an appropriate value (see the |
| /// documentation of `Shutdown`). |
| pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { |
| self.io.get_ref().shutdown(how) |
| } |
| |
| /// Sets the value of the `TCP_NODELAY` option on this socket. |
| /// |
| /// If set, this option disables the Nagle algorithm. This means that |
| /// segments are always sent as soon as possible, even if there is only a |
| /// small amount of data. When not set, data is buffered until there is a |
| /// sufficient amount to send out, thereby avoiding the frequent sending of |
| /// small packets. |
| pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { |
| self.io.get_ref().set_nodelay(nodelay) |
| } |
| |
| /// Gets the value of the `TCP_NODELAY` option on this socket. |
| /// |
| /// For more information about this option, see [`set_nodelay`][link]. |
| /// |
| /// [link]: #method.set_nodelay |
| pub fn nodelay(&self) -> io::Result<bool> { |
| self.io.get_ref().nodelay() |
| } |
| |
| /// Sets the value of the `SO_RCVBUF` option on this socket. |
| /// |
| /// Changes the size of the operating system's receive buffer associated |
| /// with the socket. |
| pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { |
| self.io.get_ref().set_recv_buffer_size(size) |
| } |
| |
| /// Gets the value of the `SO_RCVBUF` option on this socket. |
| /// |
| /// For more information about this option, see |
| /// [`set_recv_buffer_size`][link]. |
| /// |
| /// [link]: #tymethod.set_recv_buffer_size |
| pub fn recv_buffer_size(&self) -> io::Result<usize> { |
| self.io.get_ref().recv_buffer_size() |
| } |
| |
| /// Sets the value of the `SO_SNDBUF` option on this socket. |
| /// |
| /// Changes the size of the operating system's send buffer associated with |
| /// the socket. |
| pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { |
| self.io.get_ref().set_send_buffer_size(size) |
| } |
| |
| /// Gets the value of the `SO_SNDBUF` option on this socket. |
| /// |
| /// For more information about this option, see [`set_send_buffer`][link]. |
| /// |
| /// [link]: #tymethod.set_send_buffer |
| pub fn send_buffer_size(&self) -> io::Result<usize> { |
| self.io.get_ref().send_buffer_size() |
| } |
| |
| /// Sets whether keepalive messages are enabled to be sent on this socket. |
| /// |
| /// On Unix, this option will set the `SO_KEEPALIVE` as well as the |
| /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). |
| /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. |
| /// |
| /// If `None` is specified then keepalive messages are disabled, otherwise |
| /// the duration specified will be the time to remain idle before sending a |
| /// TCP keepalive probe. |
| /// |
| /// Some platforms specify this value in seconds, so sub-second |
| /// specifications may be omitted. |
| pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { |
| self.io.get_ref().set_keepalive(keepalive) |
| } |
| |
| /// Returns whether keepalive messages are enabled on this socket, and if so |
| /// the duration of time between them. |
| /// |
| /// For more information about this option, see [`set_keepalive`][link]. |
| /// |
| /// [link]: #tymethod.set_keepalive |
| pub fn keepalive(&self) -> io::Result<Option<Duration>> { |
| self.io.get_ref().keepalive() |
| } |
| |
| /// Sets the value for the `IP_TTL` option on this socket. |
| /// |
| /// This value sets the time-to-live field that is used in every packet sent |
| /// from this socket. |
| pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { |
| self.io.get_ref().set_ttl(ttl) |
| } |
| |
| /// Gets the value of the `IP_TTL` option for this socket. |
| /// |
| /// For more information about this option, see [`set_ttl`][link]. |
| /// |
| /// [link]: #tymethod.set_ttl |
| pub fn ttl(&self) -> io::Result<u32> { |
| self.io.get_ref().ttl() |
| } |
| |
| /// Sets the value for the `IPV6_V6ONLY` option on this socket. |
| /// |
| /// If this is set to `true` then the socket is restricted to sending and |
| /// receiving IPv6 packets only. In this case two IPv4 and IPv6 applications |
| /// can bind the same port at the same time. |
| /// |
| /// If this is set to `false` then the socket can be used to send and |
| /// receive packets from an IPv4-mapped IPv6 address. |
| pub fn set_only_v6(&self, only_v6: bool) -> io::Result<()> { |
| self.io.get_ref().set_only_v6(only_v6) |
| } |
| |
| /// Gets the value of the `IPV6_V6ONLY` option for this socket. |
| /// |
| /// For more information about this option, see [`set_only_v6`][link]. |
| /// |
| /// [link]: #tymethod.set_only_v6 |
| pub fn only_v6(&self) -> io::Result<bool> { |
| self.io.get_ref().only_v6() |
| } |
| |
| /// Sets the linger duration of this socket by setting the SO_LINGER option |
| pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { |
| self.io.get_ref().set_linger(dur) |
| } |
| |
| /// reads the linger duration for this socket by getting the SO_LINGER option |
| pub fn linger(&self) -> io::Result<Option<Duration>> { |
| self.io.get_ref().linger() |
| } |
| |
| #[deprecated(since = "0.1.8", note = "use set_keepalive")] |
| #[doc(hidden)] |
| pub fn set_keepalive_ms(&self, keepalive: Option<u32>) -> io::Result<()> { |
| #[allow(deprecated)] |
| self.io.get_ref().set_keepalive_ms(keepalive) |
| } |
| |
| #[deprecated(since = "0.1.8", note = "use keepalive")] |
| #[doc(hidden)] |
| pub fn keepalive_ms(&self) -> io::Result<Option<u32>> { |
| #[allow(deprecated)] |
| self.io.get_ref().keepalive_ms() |
| } |
| } |
| |
| impl Read for TcpStream { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| self.io.read(buf) |
| } |
| } |
| |
| impl Write for TcpStream { |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| self.io.write(buf) |
| } |
| fn flush(&mut self) -> io::Result<()> { |
| Ok(()) |
| } |
| } |
| |
| impl AsyncRead for TcpStream { |
| unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { |
| false |
| } |
| |
| fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| <&TcpStream>::read_buf(&mut &*self, buf) |
| } |
| } |
| |
| impl AsyncWrite for TcpStream { |
| fn shutdown(&mut self) -> Poll<(), io::Error> { |
| <&TcpStream>::shutdown(&mut &*self) |
| } |
| |
| fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| <&TcpStream>::write_buf(&mut &*self, buf) |
| } |
| } |
| |
| #[allow(deprecated)] |
| impl ::io::Io for TcpStream { |
| fn poll_read(&mut self) -> Async<()> { |
| <TcpStream>::poll_read(self) |
| } |
| |
| fn poll_write(&mut self) -> Async<()> { |
| <TcpStream>::poll_write(self) |
| } |
| |
| fn read_vec(&mut self, bufs: &mut [&mut IoVec]) -> io::Result<usize> { |
| if let Async::NotReady = <TcpStream>::poll_read(self) { |
| return Err(io::ErrorKind::WouldBlock.into()) |
| } |
| let r = self.io.get_ref().read_bufs(bufs); |
| if is_wouldblock(&r) { |
| self.io.need_read(); |
| } |
| return r |
| } |
| |
| fn write_vec(&mut self, bufs: &[&IoVec]) -> io::Result<usize> { |
| if let Async::NotReady = <TcpStream>::poll_write(self) { |
| return Err(io::ErrorKind::WouldBlock.into()) |
| } |
| let r = self.io.get_ref().write_bufs(bufs); |
| if is_wouldblock(&r) { |
| self.io.need_write(); |
| } |
| return r |
| } |
| } |
| |
| fn is_wouldblock<T>(r: &io::Result<T>) -> bool { |
| match *r { |
| Ok(_) => false, |
| Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, |
| } |
| } |
| |
| impl<'a> Read for &'a TcpStream { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| (&self.io).read(buf) |
| } |
| } |
| |
| impl<'a> Write for &'a TcpStream { |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| (&self.io).write(buf) |
| } |
| |
| fn flush(&mut self) -> io::Result<()> { |
| (&self.io).flush() |
| } |
| } |
| |
| impl<'a> AsyncRead for &'a TcpStream { |
| unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { |
| false |
| } |
| |
| fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| if let Async::NotReady = <TcpStream>::poll_read(self) { |
| return Ok(Async::NotReady) |
| } |
| let r = unsafe { |
| // The `IoVec` type can't have a 0-length size, so we create a bunch |
| // of dummy versions on the stack with 1 length which we'll quickly |
| // overwrite. |
| let mut b1: &mut [u8] = &mut [0]; |
| let mut b2: &mut [u8] = &mut [0]; |
| let mut b3: &mut [u8] = &mut [0]; |
| let mut b4: &mut [u8] = &mut [0]; |
| let mut b5: &mut [u8] = &mut [0]; |
| let mut b6: &mut [u8] = &mut [0]; |
| let mut b7: &mut [u8] = &mut [0]; |
| let mut b8: &mut [u8] = &mut [0]; |
| let mut b9: &mut [u8] = &mut [0]; |
| let mut b10: &mut [u8] = &mut [0]; |
| let mut b11: &mut [u8] = &mut [0]; |
| let mut b12: &mut [u8] = &mut [0]; |
| let mut b13: &mut [u8] = &mut [0]; |
| let mut b14: &mut [u8] = &mut [0]; |
| let mut b15: &mut [u8] = &mut [0]; |
| let mut b16: &mut [u8] = &mut [0]; |
| let mut bufs: [&mut IoVec; 16] = [ |
| b1.into(), b2.into(), b3.into(), b4.into(), |
| b5.into(), b6.into(), b7.into(), b8.into(), |
| b9.into(), b10.into(), b11.into(), b12.into(), |
| b13.into(), b14.into(), b15.into(), b16.into(), |
| ]; |
| let n = buf.bytes_vec_mut(&mut bufs); |
| self.io.get_ref().read_bufs(&mut bufs[..n]) |
| }; |
| |
| match r { |
| Ok(n) => { |
| unsafe { buf.advance_mut(n); } |
| Ok(Async::Ready(n)) |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
| self.io.need_read(); |
| Ok(Async::NotReady) |
| } |
| Err(e) => Err(e), |
| } |
| } |
| } |
| |
| impl<'a> AsyncWrite for &'a TcpStream { |
| fn shutdown(&mut self) -> Poll<(), io::Error> { |
| Ok(().into()) |
| } |
| |
| fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { |
| if let Async::NotReady = <TcpStream>::poll_write(self) { |
| return Ok(Async::NotReady) |
| } |
| let r = { |
| // The `IoVec` type can't have a zero-length size, so create a dummy |
| // version from a 1-length slice which we'll overwrite with the |
| // `bytes_vec` method. |
| static DUMMY: &[u8] = &[0]; |
| let iovec = <&IoVec>::from(DUMMY); |
| let mut bufs = [ |
| iovec, iovec, iovec, iovec, |
| iovec, iovec, iovec, iovec, |
| iovec, iovec, iovec, iovec, |
| iovec, iovec, iovec, iovec, |
| ]; |
| let n = buf.bytes_vec(&mut bufs); |
| self.io.get_ref().write_bufs(&bufs[..n]) |
| }; |
| match r { |
| Ok(n) => { |
| buf.advance(n); |
| Ok(Async::Ready(n)) |
| } |
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
| self.io.need_write(); |
| Ok(Async::NotReady) |
| } |
| Err(e) => Err(e), |
| } |
| } |
| } |
| |
| #[allow(deprecated)] |
| impl<'a> ::io::Io for &'a TcpStream { |
| fn poll_read(&mut self) -> Async<()> { |
| <TcpStream>::poll_read(self) |
| } |
| |
| fn poll_write(&mut self) -> Async<()> { |
| <TcpStream>::poll_write(self) |
| } |
| } |
| |
| impl fmt::Debug for TcpStream { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| self.io.get_ref().fmt(f) |
| } |
| } |
| |
| impl Future for TcpStreamNew { |
| type Item = TcpStream; |
| type Error = io::Error; |
| |
| fn poll(&mut self) -> Poll<TcpStream, io::Error> { |
| self.inner.poll() |
| } |
| } |
| |
| impl Future for TcpStreamNewState { |
| type Item = TcpStream; |
| type Error = io::Error; |
| |
| fn poll(&mut self) -> Poll<TcpStream, io::Error> { |
| { |
| let stream = match *self { |
| TcpStreamNewState::Waiting(ref s) => s, |
| TcpStreamNewState::Error(_) => { |
| let e = match mem::replace(self, TcpStreamNewState::Empty) { |
| TcpStreamNewState::Error(e) => e, |
| _ => panic!(), |
| }; |
| return Err(e) |
| } |
| TcpStreamNewState::Empty => panic!("can't poll TCP stream twice"), |
| }; |
| |
| // Once we've connected, wait for the stream to be writable as |
| // that's when the actual connection has been initiated. Once we're |
| // writable we check for `take_socket_error` to see if the connect |
| // actually hit an error or not. |
| // |
| // If all that succeeded then we ship everything on up. |
| if let Async::NotReady = stream.io.poll_write() { |
| return Ok(Async::NotReady) |
| } |
| if let Some(e) = try!(stream.io.get_ref().take_error()) { |
| return Err(e) |
| } |
| } |
| match mem::replace(self, TcpStreamNewState::Empty) { |
| TcpStreamNewState::Waiting(stream) => Ok(Async::Ready(stream)), |
| _ => panic!(), |
| } |
| } |
| } |
| |
| #[cfg(all(unix, not(target_os = "fuchsia")))] |
| mod sys { |
| use std::os::unix::prelude::*; |
| use super::{TcpStream, TcpListener}; |
| |
| impl AsRawFd for TcpStream { |
| fn as_raw_fd(&self) -> RawFd { |
| self.io.get_ref().as_raw_fd() |
| } |
| } |
| |
| impl AsRawFd for TcpListener { |
| fn as_raw_fd(&self) -> RawFd { |
| self.io.get_ref().as_raw_fd() |
| } |
| } |
| } |
| |
| #[cfg(windows)] |
| mod sys { |
| // TODO: let's land these upstream with mio and then we can add them here. |
| // |
| // use std::os::windows::prelude::*; |
| // use super::{TcpStream, TcpListener}; |
| // |
| // impl AsRawHandle for TcpStream { |
| // fn as_raw_handle(&self) -> RawHandle { |
| // self.io.get_ref().as_raw_handle() |
| // } |
| // } |
| // |
| // impl AsRawHandle for TcpListener { |
| // fn as_raw_handle(&self) -> RawHandle { |
| // self.listener.io().as_raw_handle() |
| // } |
| // } |
| } |