// blob: fdb1c08a97bafab5ddb410e8500e3e0cc265cd3a [file] [log] [blame]
#![cfg(all(feature = "os-poll", feature = "net"))]
use mio::net::TcpListener;
use mio::{Interest, Token};
use std::io::{self, Read};
use std::net::{self, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::sync::{Arc, Barrier};
use std::thread;
mod util;
use util::{
any_local_address, any_local_ipv6_address, assert_send, assert_socket_close_on_exec,
assert_socket_non_blocking, assert_sync, assert_would_block, expect_events, expect_no_events,
init, init_with_poll, ExpectEvent,
};
// Tokens under which the tests in this file register their listeners.
// `ID1` is the token used for initial registrations.
const ID1: Token = Token(0);
// `ID2` is the token the `reregister` test switches to.
const ID2: Token = Token(1);
/// `TcpListener` must be accepted by both the `assert_sync` and the
/// `assert_send` helper.
#[test]
fn is_send_and_sync() {
    assert_sync::<TcpListener>();
    assert_send::<TcpListener>();
}
/// Runs the listener smoke test with a listener created through
/// `TcpListener::bind` on the address returned by `any_local_address`.
#[test]
fn tcp_listener() {
    let addr = any_local_address();
    smoke_test_tcp_listener(addr, TcpListener::bind);
}
/// Runs the listener smoke test with a listener created through
/// `TcpListener::bind` on the address returned by `any_local_ipv6_address`.
#[test]
fn tcp_listener_ipv6() {
    let addr = any_local_ipv6_address();
    smoke_test_tcp_listener(addr, TcpListener::bind);
}
/// Runs the listener smoke test with a listener that starts out as a
/// `std::net::TcpListener` and is converted with `TcpListener::from_std`.
#[test]
fn tcp_listener_std() {
    let from_std_listener = |addr: SocketAddr| -> io::Result<TcpListener> {
        let std_listener = net::TcpListener::bind(addr).unwrap();
        // A `std::net::TcpListener` is blocking by default, so switch it to
        // non-blocking mode before handing it over to Mio.
        std_listener.set_nonblocking(true).unwrap();
        Ok(TcpListener::from_std(std_listener))
    };

    smoke_test_tcp_listener(any_local_address(), from_std_listener);
}
/// Shared smoke test for a TCP listener.
///
/// `make_listener` is called with `addr` to build the listener under test.
/// The listener is registered for readable interest under `ID1`, one
/// connection is made to it from a helper thread, and the test then checks
/// the reported event, the accepted stream and its addresses, and that no
/// further connection is pending.
fn smoke_test_tcp_listener<F>(addr: SocketAddr, make_listener: F)
where
    F: FnOnce(SocketAddr) -> io::Result<TcpListener>,
{
    let (mut poll, mut events) = init_with_poll();

    let mut listener = make_listener(addr).unwrap();
    let local_addr = listener.local_addr().unwrap();

    assert_socket_non_blocking(&listener);
    assert_socket_close_on_exec(&listener);

    let registry = poll.registry();
    registry
        .register(&mut listener, ID1, Interest::READABLE)
        .expect("unable to register TCP listener");

    // Two parties: this test and the single connecting thread.
    let barrier = Arc::new(Barrier::new(2));
    let connector = start_connections(local_addr, 1, Arc::clone(&barrier));

    let expected = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, expected);

    // Exactly one connection should be waiting.
    let (mut stream, peer_address) = listener.accept().expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(stream.peer_addr().unwrap(), peer_address);
    assert_eq!(stream.local_addr().unwrap(), local_addr);

    // The accepted stream is expected to be non-blocking: the peer wrote
    // nothing, so a read should report "would block".
    let mut scratch = [0u8; 20];
    assert_would_block(stream.read(&mut scratch));

    // No second connection should be pending.
    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    // Let the connecting thread drop its connection and finish.
    barrier.wait();
    connector.join().expect("unable to join thread");
}
/// Sets a TTL on a freshly bound listener and reads the same value back.
#[test]
fn set_get_ttl() {
    const TTL: u32 = 10;

    init();
    let listener = TcpListener::bind(any_local_address()).unwrap();

    // Round trip: whatever is written with `set_ttl` must come back from `ttl`.
    listener.set_ttl(TTL).unwrap();
    let observed = listener.ttl().unwrap();
    assert_eq!(observed, TTL);

    assert!(listener.take_error().unwrap().is_none());
}
/// Reads the TTL of a freshly bound listener without setting it first.
#[test]
fn get_ttl_without_previous_set() {
    init();
    let listener = TcpListener::bind(any_local_address()).unwrap();

    // Reading the TTL must succeed even though `set_ttl` was never called;
    // the value itself is not checked.
    let _ttl = listener.ttl().expect("unable to get TTL for TCP listener");

    assert!(listener.take_error().unwrap().is_none());
}
/// Round-trips a listener through its raw file descriptor (Unix only) and
/// checks that descriptor and local address are unchanged afterwards.
#[cfg(unix)]
#[test]
fn raw_fd() {
    init();

    let listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    // Borrowing and then taking the descriptor must yield the same value.
    let borrowed_fd = listener.as_raw_fd();
    let owned_fd = listener.into_raw_fd();
    assert_eq!(borrowed_fd, owned_fd);

    // SAFETY: `owned_fd` was handed out by `into_raw_fd` directly above and
    // has not been passed to anything else in between.
    let listener = unsafe { TcpListener::from_raw_fd(owned_fd) };
    assert_eq!(listener.as_raw_fd(), borrowed_fd);
    assert_eq!(listener.local_addr().unwrap(), address);
}
/// Registers a listener nobody connects to and expects no events for it.
#[test]
fn registering() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();

    let registry = poll.registry();
    registry
        .register(&mut listener, ID1, Interest::READABLE)
        .expect("unable to register TCP listener");

    expect_no_events(&mut poll, &mut events);

    // NOTE: more tests are done in the smoke tests above.
}
/// Registers a listener under `ID1`, re-registers it under `ID2` and expects
/// the readable event for an incoming connection to be reported under `ID2`.
#[test]
fn reregister() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    // Register under `ID1` first, then swap the token for `ID2`.
    let registry = poll.registry();
    registry
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();
    registry
        .reregister(&mut listener, ID2, Interest::READABLE)
        .unwrap();

    // Two parties: this test and the single connecting thread.
    let barrier = Arc::new(Barrier::new(2));
    let connector = start_connections(address, 1, Arc::clone(&barrier));

    let expected = vec![ExpectEvent::new(ID2, Interest::READABLE)];
    expect_events(&mut poll, &mut events, expected);

    let (accepted, peer_address) = listener.accept().expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(accepted.peer_addr().unwrap(), peer_address);
    assert_eq!(accepted.local_addr().unwrap(), address);

    // Only one connection was made, so nothing else may be pending.
    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    // Let the connecting thread drop its connection and finish.
    barrier.wait();
    connector.join().expect("unable to join thread");
}
/// Deregisters a listener after a connection to it has been started and
/// expects no events, while the connection itself can still be accepted.
#[test]
fn no_events_after_deregister() {
    let (mut poll, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    let registry = poll.registry();
    registry
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();

    // Two parties: this test and the single connecting thread.
    let barrier = Arc::new(Barrier::new(2));
    let connector = start_connections(address, 1, Arc::clone(&barrier));

    registry.deregister(&mut listener).unwrap();

    expect_no_events(&mut poll, &mut events);

    // Deregistering only stops event delivery; accepting must still work.
    let (accepted, peer_address) = listener.accept().expect("unable to accept connection");
    assert!(peer_address.ip().is_loopback());
    assert_eq!(accepted.peer_addr().unwrap(), peer_address);
    assert_eq!(accepted.local_addr().unwrap(), address);

    assert_would_block(listener.accept());

    assert!(listener.take_error().unwrap().is_none());

    // Let the connecting thread drop its connection and finish.
    barrier.wait();
    connector.join().expect("unable to join thread");
}
/// This tests reregister on successful accept works: after a first
/// connection has been accepted, a second connection must again produce a
/// readable event for the same registration.
#[test]
fn tcp_listener_two_streams() {
    let (mut poll1, mut events) = init_with_poll();

    let mut listener = TcpListener::bind(any_local_address()).unwrap();
    let address = listener.local_addr().unwrap();

    // Accepts one pending connection and checks its addresses. The accepted
    // stream is dropped when the closure returns.
    let accept_and_check = |listener: &TcpListener| {
        let (stream, peer_address) = listener.accept().expect("unable to accept connection");
        assert!(peer_address.ip().is_loopback());
        assert_eq!(stream.peer_addr().unwrap(), peer_address);
        assert_eq!(stream.local_addr().unwrap(), address);
    };

    // Three parties: this test plus the two connecting threads.
    let barrier = Arc::new(Barrier::new(3));

    let first_connector = start_connections(address, 1, Arc::clone(&barrier));

    poll1
        .registry()
        .register(&mut listener, ID1, Interest::READABLE)
        .unwrap();

    let expected = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll1, &mut events, expected);

    accept_and_check(&listener);

    // The first connection has been taken; nothing else is pending yet.
    assert_would_block(listener.accept());

    let second_connector = start_connections(address, 1, Arc::clone(&barrier));

    let expected = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll1, &mut events, expected);

    accept_and_check(&listener);

    expect_no_events(&mut poll1, &mut events);

    // Release both connecting threads and wait for them to finish.
    barrier.wait();
    first_connector.join().expect("unable to join thread");
    second_connector.join().expect("unable to join thread");
}
/// Spawn a thread that opens `n_connections` connections to `address`, one
/// after another.
///
/// After each successful connect the thread waits on `barrier` and only then
/// drops the connection, so the connection stays open until every other
/// party of the barrier has arrived. The returned handle can be joined to
/// wait for the thread to finish.
fn start_connections(
    address: SocketAddr,
    n_connections: usize,
    barrier: Arc<Barrier>,
) -> thread::JoinHandle<()> {
    let connect_all = move || {
        (0..n_connections).for_each(|_| {
            let stream = net::TcpStream::connect(address).unwrap();
            // Hold the connection open until the barrier releases.
            barrier.wait();
            drop(stream);
        });
    };

    thread::spawn(connect_all)
}