blob: b3590890e7325d84a7ffeca17e1c508c4811338f [file] [log] [blame]
#![cfg(all(feature = "os-poll", feature = "tcp"))]
use mio::net::TcpStream;
use mio::{Interest, Token};
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{self, Shutdown, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::sync::{mpsc::channel, Arc, Barrier};
use std::thread;
use std::time::Duration;
#[macro_use]
mod util;
#[cfg(not(target_os = "windows"))]
use util::init;
use util::{
any_local_address, any_local_ipv6_address, assert_send, assert_socket_close_on_exec,
assert_socket_non_blocking, assert_sync, assert_would_block, expect_events, expect_no_events,
init_with_poll, ExpectEvent, Readiness,
};
const DATA1: &[u8] = b"Hello world!";
const DATA2: &[u8] = b"Hello mars!";
// TODO: replace with `DATA1.len()` once `const_slice_len` is stable.
const DATA1_LEN: usize = 12;
const DATA2_LEN: usize = 11;
const ID1: Token = Token(0);
const ID2: Token = Token(1);
#[test]
fn is_send_and_sync() {
assert_send::<TcpStream>();
assert_sync::<TcpStream>();
}
#[test]
fn tcp_stream_ipv4() {
smoke_test_tcp_stream(any_local_address(), TcpStream::connect);
}
#[test]
fn tcp_stream_ipv6() {
smoke_test_tcp_stream(any_local_ipv6_address(), TcpStream::connect);
}
#[test]
fn tcp_stream_std() {
smoke_test_tcp_stream(any_local_address(), |addr| {
let stream = net::TcpStream::connect(addr).unwrap();
// `std::net::TcpStream`s are blocking by default, so make sure it is
// in non-blocking mode before wrapping in a Mio equivalent.
stream.set_nonblocking(true).unwrap();
Ok(TcpStream::from_std(stream))
});
}
fn smoke_test_tcp_stream<F>(addr: SocketAddr, make_stream: F)
where
F: FnOnce(SocketAddr) -> io::Result<TcpStream>,
{
let (mut poll, mut events) = init_with_poll();
let (handle, addr) = echo_listener(addr, 1);
let mut stream = make_stream(addr).unwrap();
assert_socket_non_blocking(&stream);
assert_socket_close_on_exec(&stream);
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE.add(Interest::READABLE))
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
let mut buf = [0; 16];
assert_would_block(stream.peek(&mut buf));
assert_would_block(stream.read(&mut buf));
// NOTE: the call to `peer_addr` must happen after we received a writable
// event as the stream might not yet be connected.
assert_eq!(stream.peer_addr().unwrap(), addr);
assert!(stream.local_addr().unwrap().ip().is_loopback());
checked_write!(stream.write(&DATA1));
stream.flush().unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::READABLE)],
);
expect_read!(stream.peek(&mut buf), DATA1);
expect_read!(stream.read(&mut buf), DATA1);
assert!(stream.take_error().unwrap().is_none());
assert_would_block(stream.read(&mut buf));
let bufs = [IoSlice::new(&DATA1), IoSlice::new(&DATA2)];
let n = stream
.write_vectored(&bufs)
.expect("unable to write vectored to stream");
assert_eq!(n, DATA1.len() + DATA2.len());
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::READABLE)],
);
let mut buf1 = [1; DATA1_LEN];
let mut buf2 = [2; DATA2_LEN + 1];
let mut bufs = [IoSliceMut::new(&mut buf1), IoSliceMut::new(&mut buf2)];
let n = stream
.read_vectored(&mut bufs)
.expect("unable to read vectored from stream");
assert_eq!(n, DATA1.len() + DATA2.len());
assert_eq!(&buf1, DATA1);
assert_eq!(&buf2[..DATA2.len()], DATA2);
assert_eq!(buf2[DATA2.len()], 2); // Last byte should be unchanged.
// Close the connection to allow the listener to shutdown.
drop(stream);
handle.join().expect("unable to join thread");
}
#[test]
fn set_get_ttl() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (thread_handle, address) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(address).unwrap();
// on Windows: the stream must be connected before setting the ttl, otherwise
// it is undefined behavior, register and expect a WRITABLE here to make sure
// the stream is connected
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE)
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
// set TTL, get TTL, make sure it has the expected value
const TTL: u32 = 10;
stream.set_ttl(TTL).unwrap();
assert_eq!(stream.ttl().unwrap(), TTL);
assert!(stream.take_error().unwrap().is_none());
barrier.wait();
thread_handle.join().expect("unable to join thread");
}
#[test]
fn get_ttl_without_previous_set() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (thread_handle, address) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(address).unwrap();
// on Windows: the stream must be connected before getting the ttl, otherwise
// it is undefined behavior, register and expect a WRITABLE here to make sure
// the stream is connected
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE)
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
// expect a get TTL to work w/o any previous set_ttl
stream.ttl().expect("unable to get TTL for TCP stream");
assert!(stream.take_error().unwrap().is_none());
barrier.wait();
thread_handle.join().expect("unable to join thread");
}
#[test]
fn set_get_nodelay() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (thread_handle, address) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(address).unwrap();
// on Windows: the stream must be connected before setting the nodelay, otherwise
// it is undefined behavior, register and expect a WRITABLE here to make sure
// the stream is connected
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE)
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
// set nodelay, get nodelay, make sure it has the expected value
const NO_DELAY: bool = true;
stream.set_nodelay(NO_DELAY).unwrap();
assert_eq!(stream.nodelay().unwrap(), NO_DELAY);
assert!(stream.take_error().unwrap().is_none());
barrier.wait();
thread_handle.join().expect("unable to join thread");
}
#[test]
fn get_nodelay_without_previous_set() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (thread_handle, address) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(address).unwrap();
// on Windows: the stream must be connected before setting the nodelay, otherwise
// it is undefined behavior, register and expect a WRITABLE here to make sure
// the stream is connected
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE)
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
// expect a get nodelay to work w/o any previous set nodelay
stream
.nodelay()
.expect("Unable to get nodelay for TCP stream");
assert!(stream.take_error().unwrap().is_none());
barrier.wait();
thread_handle.join().expect("unable to join thread");
}
#[test]
fn shutdown_read() {
let (mut poll, mut events) = init_with_poll();
let (thread_handle, address) = echo_listener(any_local_address(), 1);
let mut stream = TcpStream::connect(address).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE.add(Interest::READABLE))
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
checked_write!(stream.write(&DATA2));
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::READABLE)],
);
stream.shutdown(Shutdown::Read).unwrap();
// Shutting down the reading side is different on each platform. For example
// on Linux based systems we can still read.
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
))]
{
let mut buf = [0; 20];
expect_read!(stream.read(&mut buf), &[]);
}
drop(stream);
thread_handle.join().expect("unable to join thread");
}
#[test]
#[ignore = "This test is flaky, it doesn't always receive an event after shutting down the write side"]
fn shutdown_write() {
let (mut poll, mut events) = init_with_poll();
let (thread_handle, address) = echo_listener(any_local_address(), 1);
let mut stream = TcpStream::connect(address).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE.add(Interest::READABLE))
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
checked_write!(stream.write(&DATA1));
stream.shutdown(Shutdown::Write).unwrap();
let err = stream.write(DATA2).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
// FIXME: we don't always receive the following event.
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::READABLE)],
);
// Read should be ok.
let mut buf = [0; 20];
expect_read!(stream.read(&mut buf), DATA1);
drop(stream);
thread_handle.join().expect("unable to join thread");
}
#[test]
fn shutdown_both() {
let (mut poll, mut events) = init_with_poll();
let (thread_handle, address) = echo_listener(any_local_address(), 1);
let mut stream = TcpStream::connect(address).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE.add(Interest::READABLE))
.expect("unable to register TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
checked_write!(stream.write(&DATA1));
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::READABLE)],
);
stream.shutdown(Shutdown::Both).unwrap();
// Shutting down the reading side is different on each platform. For example
// on Linux based systems we can still read.
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
))]
{
let mut buf = [0; 20];
expect_read!(stream.read(&mut buf), &[]);
}
let err = stream.write(DATA2).unwrap_err();
#[cfg(unix)]
assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
#[cfg(windows)]
assert_eq!(err.kind(), io::ErrorKind::ConnectionAborted);
drop(stream);
thread_handle.join().expect("unable to join thread");
}
#[cfg(unix)]
#[test]
fn raw_fd() {
init();
let (thread_handle, address) = start_listener(1, None, false);
let stream = TcpStream::connect(address).unwrap();
let address = stream.local_addr().unwrap();
let raw_fd1 = stream.as_raw_fd();
let raw_fd2 = stream.into_raw_fd();
assert_eq!(raw_fd1, raw_fd2);
let stream = unsafe { TcpStream::from_raw_fd(raw_fd2) };
assert_eq!(stream.as_raw_fd(), raw_fd1);
assert_eq!(stream.local_addr().unwrap(), address);
thread_handle.join().expect("unable to join thread");
}
#[test]
fn registering() {
let (mut poll, mut events) = init_with_poll();
let (thread_handle, address) = echo_listener(any_local_address(), 1);
let mut stream = TcpStream::connect(address).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::READABLE)
.expect("unable to register TCP stream");
expect_no_events(&mut poll, &mut events);
// NOTE: more tests are done in the smoke tests above.
drop(stream);
thread_handle.join().expect("unable to join thread");
}
#[test]
fn reregistering() {
let (mut poll, mut events) = init_with_poll();
let (thread_handle, address) = echo_listener(any_local_address(), 1);
let mut stream = TcpStream::connect(address).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::READABLE)
.expect("unable to register TCP stream");
poll.registry()
.reregister(&mut stream, ID2, Interest::WRITABLE)
.expect("unable to reregister TCP stream");
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID2, Interest::WRITABLE)],
);
assert_eq!(stream.peer_addr().unwrap(), address);
drop(stream);
thread_handle.join().expect("unable to join thread");
}
#[test]
fn no_events_after_deregister() {
let (mut poll, mut events) = init_with_poll();
let (thread_handle, address) = echo_listener(any_local_address(), 1);
let mut stream = TcpStream::connect(address).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::WRITABLE.add(Interest::READABLE))
.expect("unable to register TCP stream");
poll.registry()
.deregister(&mut stream)
.expect("unable to deregister TCP stream");
expect_no_events(&mut poll, &mut events);
// We do expect to be connected.
assert_eq!(stream.peer_addr().unwrap(), address);
// Also, write should work
let mut buf = [0; 16];
assert_would_block(stream.peek(&mut buf));
assert_would_block(stream.read(&mut buf));
checked_write!(stream.write(&DATA1));
stream.flush().unwrap();
expect_no_events(&mut poll, &mut events);
drop(stream);
thread_handle.join().expect("unable to join thread");
}
#[test]
#[cfg_attr(
windows,
ignore = "fails on Windows; client read closed events are not triggered"
)]
fn tcp_shutdown_client_read_close_event() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(sockaddr).unwrap();
let interests = Interest::READABLE | Interest::WRITABLE;
poll.registry()
.register(&mut stream, ID1, interests)
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
stream.shutdown(Shutdown::Read).unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)],
);
barrier.wait();
handle.join().expect("failed to join thread");
}
#[test]
#[cfg_attr(windows, ignore = "fails; client write_closed events are not found")]
#[cfg_attr(
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "solaris"
),
ignore = "fails; client write_closed events are not found"
)]
fn tcp_shutdown_client_write_close_event() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(sockaddr).unwrap();
let interests = Interest::READABLE | Interest::WRITABLE;
poll.registry()
.register(&mut stream, ID1, interests)
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
stream.shutdown(Shutdown::Write).unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::WRITE_CLOSED)],
);
barrier.wait();
handle.join().expect("failed to join thread");
}
#[test]
fn tcp_shutdown_server_write_close_event() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), true);
let mut stream = TcpStream::connect(sockaddr).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::READABLE.add(Interest::WRITABLE))
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
barrier.wait();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)],
);
barrier.wait();
handle.join().expect("failed to join thread");
}
#[test]
fn tcp_reset_close_event() {
let (mut poll, mut events) = init_with_poll();
let listener = net::TcpListener::bind(any_local_address()).unwrap();
let sockaddr = listener.local_addr().unwrap();
let mut stream = TcpStream::connect(sockaddr).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::READABLE.add(Interest::WRITABLE))
.unwrap();
let server_stream = listener.accept().unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
checked_write!(stream.write(DATA1));
// Try to read something.
assert_would_block(stream.read(&mut [0]));
// Server goes away.
drop(server_stream);
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)],
);
// Make sure we quiesce. `expect_no_events` seems to flake sometimes on mac/freebsd.
loop {
poll.poll(&mut events, Some(Duration::from_millis(100)))
.expect("poll failed");
if events.iter().count() == 0 {
break;
}
}
}
#[test]
#[cfg_attr(
windows,
ignore = "fails on Windows; client close events are not found"
)]
fn tcp_shutdown_client_both_close_event() {
let (mut poll, mut events) = init_with_poll();
let barrier = Arc::new(Barrier::new(2));
let (handle, sockaddr) = start_listener(1, Some(barrier.clone()), false);
let mut stream = TcpStream::connect(sockaddr).unwrap();
poll.registry()
.register(&mut stream, ID1, Interest::READABLE.add(Interest::WRITABLE))
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::WRITABLE)],
);
stream.shutdown(Shutdown::Both).unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::WRITE_CLOSED)],
);
barrier.wait();
handle.join().expect("failed to join thread");
}
/// Start a listener that accepts `n_connections` connections on the returned
/// address. It echos back any data it reads from the connection before
/// accepting another one.
fn echo_listener(addr: SocketAddr, n_connections: usize) -> (thread::JoinHandle<()>, SocketAddr) {
let (sender, receiver) = channel();
let thread_handle = thread::spawn(move || {
let listener = net::TcpListener::bind(addr).unwrap();
let local_address = listener.local_addr().unwrap();
sender.send(local_address).unwrap();
let mut buf = [0; 128];
for _ in 0..n_connections {
let (mut stream, _) = listener.accept().unwrap();
loop {
let n = stream
.read(&mut buf)
// On Linux based system it will cause a connection reset
// error when the reading side of the peer connection is
// shutdown, we don't consider it an actual here.
.or_else(|err| match err {
ref err if err.kind() == io::ErrorKind::ConnectionReset => Ok(0),
err => Err(err),
})
.expect("error reading");
if n == 0 {
break;
}
checked_write!(stream.write(&buf[..n]));
}
}
});
(thread_handle, receiver.recv().unwrap())
}
/// Start a listener that accepts `n_connections` connections on the returned
/// address. If a barrier is provided it will wait on it before closing the
/// connection.
fn start_listener(
n_connections: usize,
barrier: Option<Arc<Barrier>>,
shutdown_write: bool,
) -> (thread::JoinHandle<()>, SocketAddr) {
let (sender, receiver) = channel();
let thread_handle = thread::spawn(move || {
let listener = net::TcpListener::bind(any_local_address()).unwrap();
let local_address = listener.local_addr().unwrap();
sender.send(local_address).unwrap();
for _ in 0..n_connections {
let (stream, _) = listener.accept().unwrap();
if let Some(ref barrier) = barrier {
barrier.wait();
if shutdown_write {
stream.shutdown(Shutdown::Write).unwrap();
barrier.wait();
}
}
drop(stream);
}
});
(thread_handle, receiver.recv().unwrap())
}