blob: a2f1739ceb2b9402a938710f89e9292edab6627a [file] [log] [blame]
#![cfg(all(feature = "os-poll", feature = "net"))]
use mio::net::{TcpListener, TcpSocket, TcpStream};
use mio::{Events, Interest, Poll, Token};
use std::io::{self, Read, Write};
use std::net::{self, Shutdown};
use std::sync::mpsc::channel;
use std::thread::{self, sleep};
use std::time::Duration;
#[macro_use]
mod util;
use util::{
any_local_address, assert_send, assert_sync, expect_events, expect_no_events, init,
init_with_poll, ExpectEvent,
};
const LISTEN: Token = Token(0);
const CLIENT: Token = Token(1);
const SERVER: Token = Token(2);
#[test]
#[cfg(all(unix, not(debug_assertions)))]
fn assert_size() {
use mio::net::*;
use std::mem::size_of;
// Without debug assertions enabled `TcpListener`, `TcpStream` and
// `UdpSocket` should have the same size as the system specific socket, i.e.
// just a file descriptor on Unix platforms.
assert_eq!(size_of::<TcpListener>(), size_of::<std::net::TcpListener>());
assert_eq!(size_of::<TcpStream>(), size_of::<std::net::TcpStream>());
}
#[test]
fn is_send_and_sync() {
assert_send::<TcpListener>();
assert_sync::<TcpListener>();
assert_send::<TcpStream>();
assert_sync::<TcpStream>();
}
#[test]
fn accept() {
init();
struct Data {
hit: bool,
listener: TcpListener,
shutdown: bool,
}
let mut listener = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
net::TcpStream::connect(addr).unwrap();
});
let mut poll = Poll::new().unwrap();
poll.registry()
.register(&mut listener, Token(1), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(128);
let mut data = Data {
hit: false,
listener,
shutdown: false,
};
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
data.hit = true;
assert_eq!(event.token(), Token(1));
assert!(event.is_readable());
assert!(data.listener.accept().is_ok());
data.shutdown = true;
}
}
assert!(data.hit);
assert!(data.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock);
handle.join().unwrap();
}
#[test]
fn connect() {
init();
struct Data {
hit: u32,
shutdown: bool,
}
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let handle = thread::spawn(move || {
let stream = listener.accept().unwrap();
rx.recv().unwrap();
drop(stream);
tx2.send(()).unwrap();
});
let mut poll = Poll::new().unwrap();
let mut stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(
&mut stream,
Token(1),
Interest::READABLE | Interest::WRITABLE,
)
.unwrap();
let mut events = Events::with_capacity(128);
let mut data = Data {
hit: 0,
shutdown: false,
};
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
match data.hit {
0 => assert!(event.is_writable()),
1 => assert!(event.is_readable()),
_ => panic!(),
}
data.hit += 1;
data.shutdown = true;
}
}
assert_eq!(data.hit, 1);
tx.send(()).unwrap();
rx2.recv().unwrap();
data.shutdown = false;
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
match data.hit {
0 => assert!(event.is_writable()),
1 => assert!(event.is_readable()),
_ => panic!(),
}
data.hit += 1;
data.shutdown = true;
}
}
assert_eq!(data.hit, 2);
handle.join().unwrap();
}
#[test]
fn read() {
init();
const N: usize = 16 * 1024 * 1024;
struct Data {
amt: usize,
socket: TcpStream,
shutdown: bool,
}
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let mut stream = listener.accept().unwrap().0;
let buf = [0; 1024];
let mut amt = 0;
while amt < N {
amt += stream.write(&buf).unwrap();
}
});
let mut poll = Poll::new().unwrap();
let mut stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut stream, Token(1), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(128);
let mut data = Data {
amt: 0,
socket: stream,
shutdown: false,
};
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
let mut buf = [0; 1024];
loop {
if let Ok(amt) = data.socket.read(&mut buf) {
data.amt += amt;
} else {
break;
}
if data.amt >= N {
data.shutdown = true;
break;
}
}
}
}
handle.join().unwrap();
}
#[test]
fn peek() {
init();
const N: usize = 16 * 1024 * 1024;
struct Data {
amt: usize,
socket: TcpStream,
shutdown: bool,
}
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let mut stream = listener.accept().unwrap().0;
let buf = [0; 1024];
let mut amt = 0;
while amt < N {
amt += stream.write(&buf).unwrap();
}
});
let mut poll = Poll::new().unwrap();
let mut stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut stream, Token(1), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(128);
let mut data = Data {
amt: 0,
socket: stream,
shutdown: false,
};
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
let mut buf = [0; 1024];
match data.socket.peek(&mut buf) {
Ok(_) => (),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,
Err(err) => panic!("unexpected error: {}", err),
}
loop {
if let Ok(amt) = data.socket.read(&mut buf) {
data.amt += amt;
} else {
break;
}
if data.amt >= N {
data.shutdown = true;
break;
}
}
}
}
handle.join().unwrap();
}
#[test]
fn write() {
init();
const N: usize = 16 * 1024 * 1024;
struct Data {
amt: usize,
socket: TcpStream,
shutdown: bool,
}
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let mut stream = listener.accept().unwrap().0;
let mut buf = [0; 1024];
let mut amt = 0;
while amt < N {
amt += stream.read(&mut buf).unwrap();
}
});
let mut poll = Poll::new().unwrap();
let mut stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut stream, Token(1), Interest::WRITABLE)
.unwrap();
let mut events = Events::with_capacity(128);
let mut data = Data {
amt: 0,
socket: stream,
shutdown: false,
};
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
let buf = [0; 1024];
loop {
if let Ok(amt) = data.socket.write(&buf) {
data.amt += amt;
} else {
break;
}
if data.amt >= N {
data.shutdown = true;
break;
}
}
}
}
handle.join().unwrap();
}
#[test]
fn connect_then_close() {
init();
struct Data {
listener: TcpListener,
shutdown: bool,
}
let mut poll = Poll::new().unwrap();
let mut listener = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let mut s = TcpStream::connect(listener.local_addr().unwrap()).unwrap();
poll.registry()
.register(&mut listener, Token(1), Interest::READABLE)
.unwrap();
poll.registry()
.register(&mut s, Token(2), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(128);
let mut data = Data {
listener,
shutdown: false,
};
while !data.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(1) {
let mut s = data.listener.accept().unwrap().0;
poll.registry()
.register(&mut s, Token(3), Interest::READABLE | Interest::WRITABLE)
.unwrap();
drop(s);
} else if event.token() == Token(2) {
data.shutdown = true;
}
}
}
}
#[test]
fn listen_then_close() {
init();
let mut poll = Poll::new().unwrap();
let mut listener = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
poll.registry()
.register(&mut listener, Token(1), Interest::READABLE)
.unwrap();
drop(listener);
let mut events = Events::with_capacity(128);
poll.poll(&mut events, Some(Duration::from_millis(100)))
.unwrap();
for event in &events {
if event.token() == Token(1) {
panic!("recieved ready() on a closed TcpListener")
}
}
}
#[test]
fn bind_twice_bad() {
init();
let l1 = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let addr = l1.local_addr().unwrap();
assert!(TcpListener::bind(addr).is_err());
}
#[test]
fn multiple_writes_immediate_success() {
init();
const N: usize = 16;
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let mut s = listener.accept().unwrap().0;
let mut b = [0; 1024];
let mut amt = 0;
while amt < 1024 * N {
for byte in b.iter_mut() {
*byte = 0;
}
let n = s.read(&mut b).unwrap();
amt += n;
for byte in b[..n].iter() {
assert_eq!(*byte, 1);
}
}
});
let mut poll = Poll::new().unwrap();
let mut s = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut s, Token(1), Interest::WRITABLE)
.unwrap();
let mut events = Events::with_capacity(16);
// Wait for our TCP stream to connect
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in events.iter() {
if event.token() == Token(1) && event.is_writable() {
break 'outer;
}
}
}
for _ in 0..N {
s.write_all(&[1; 1024]).unwrap();
}
handle.join().unwrap();
}
#[test]
fn connection_reset_by_peer() {
init();
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
let mut buf = [0u8; 16];
// Create listener
let mut listener = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
// Connect client
let client = TcpSocket::new_v4().unwrap();
client.set_linger(Some(Duration::from_millis(0))).unwrap();
let mut client = client.connect(addr).unwrap();
// Register server
poll.registry()
.register(&mut listener, Token(0), Interest::READABLE)
.unwrap();
// Register interest in the client
poll.registry()
.register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
)
.unwrap();
// Wait for listener to be ready
let mut server;
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(0) {
match listener.accept() {
Ok((sock, _)) => {
server = sock;
break 'outer;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("unexpected error {:?}", e),
}
}
}
}
// Close the connection
drop(client);
// Wait a moment
sleep(Duration::from_millis(100));
// Register interest in the server socket
poll.registry()
.register(&mut server, Token(3), Interest::READABLE)
.unwrap();
loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(3) {
assert!(event.is_readable());
match server.read(&mut buf) {
Ok(0) | Err(_) => {}
Ok(x) => panic!("expected empty buffer but read {} bytes", x),
}
return;
}
}
}
}
#[test]
fn connect_error() {
init();
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
// Pick a "random" port that shouldn't be in use.
let mut stream = match TcpStream::connect("127.0.0.1:38381".parse().unwrap()) {
Ok(l) => l,
Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => {
// Connection failed synchronously. This is not a bug, but it
// unfortunately doesn't get us the code coverage we want.
return;
}
Err(e) => panic!("TcpStream::connect unexpected error {:?}", e),
};
poll.registry()
.register(&mut stream, Token(0), Interest::WRITABLE)
.unwrap();
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(0) {
assert!(event.is_writable());
break 'outer;
}
}
}
assert!(stream.take_error().unwrap().is_some());
}
#[test]
fn write_error() {
init();
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
let (tx, rx) = channel();
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let handle = thread::spawn(move || {
let (conn, _addr) = listener.accept().unwrap();
rx.recv().unwrap();
drop(conn);
});
let mut s = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut s, Token(0), Interest::READABLE | Interest::WRITABLE)
.unwrap();
let mut wait_writable = || 'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(0) && event.is_writable() {
break 'outer;
}
}
};
wait_writable();
tx.send(()).unwrap();
handle.join().unwrap();
let buf = [0; 1024];
loop {
match s.write(&buf) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => wait_writable(),
Err(e) => {
println!("good error: {}", e);
break;
}
}
}
}
macro_rules! wait {
($poll:ident, $ready:ident, $expect_read_closed: expr) => {{
use std::time::Instant;
let now = Instant::now();
let mut events = Events::with_capacity(16);
let mut found = false;
while !found {
if now.elapsed() > Duration::from_secs(5) {
panic!("not ready");
}
$poll
.poll(&mut events, Some(Duration::from_secs(1)))
.unwrap();
for event in &events {
if $expect_read_closed {
assert!(event.is_read_closed());
} else {
assert!(!event.is_read_closed() && !event.is_write_closed());
}
if event.token() == Token(0) && event.$ready() {
found = true;
break;
}
}
}
}};
}
#[test]
fn write_shutdown() {
init();
let mut poll = Poll::new().unwrap();
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let mut client = TcpStream::connect(addr).unwrap();
poll.registry()
.register(
&mut client,
Token(0),
Interest::READABLE.add(Interest::WRITABLE),
)
.unwrap();
let (socket, _) = listener.accept().unwrap();
wait!(poll, is_writable, false);
let mut events = Events::with_capacity(16);
// Polling should not have any events
poll.poll(&mut events, Some(Duration::from_millis(100)))
.unwrap();
let next = events.iter().next();
assert!(next.is_none());
println!("SHUTTING DOWN");
// Now, shutdown the write half of the socket.
socket.shutdown(Shutdown::Write).unwrap();
wait!(poll, is_readable, true);
}
struct MyHandler {
listener: TcpListener,
connected: TcpStream,
accepted: Option<TcpStream>,
shutdown: bool,
}
#[test]
fn local_addr_ready() {
init();
let addr = "127.0.0.1:0".parse().unwrap();
let mut server = TcpListener::bind(addr).unwrap();
let addr = server.local_addr().unwrap();
let mut poll = Poll::new().unwrap();
poll.registry()
.register(&mut server, LISTEN, Interest::READABLE)
.unwrap();
let mut sock = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut sock, CLIENT, Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
let mut handler = MyHandler {
listener: server,
connected: sock,
accepted: None,
shutdown: false,
};
while !handler.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
match event.token() {
LISTEN => {
let mut sock = handler.listener.accept().unwrap().0;
poll.registry()
.register(&mut sock, SERVER, Interest::WRITABLE)
.unwrap();
handler.accepted = Some(sock);
}
SERVER => {
handler.accepted.as_ref().unwrap().peer_addr().unwrap();
handler.accepted.as_ref().unwrap().local_addr().unwrap();
let n = handler
.accepted
.as_mut()
.unwrap()
.write(&[1, 2, 3])
.unwrap();
assert_eq!(n, 3);
handler.accepted = None;
}
CLIENT => {
handler.connected.peer_addr().unwrap();
handler.connected.local_addr().unwrap();
handler.shutdown = true;
}
_ => panic!("unexpected token"),
}
}
}
}
#[test]
fn write_then_drop() {
init();
let mut a = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let addr = a.local_addr().unwrap();
let mut s = TcpStream::connect(addr).unwrap();
let mut poll = Poll::new().unwrap();
poll.registry()
.register(&mut a, Token(1), Interest::READABLE)
.unwrap();
poll.registry()
.register(&mut s, Token(3), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
while events.is_empty() {
poll.poll(&mut events, None).unwrap();
}
assert_eq!(events.iter().count(), 1);
assert_eq!(events.iter().next().unwrap().token(), Token(1));
let mut s2 = a.accept().unwrap().0;
poll.registry()
.register(&mut s2, Token(2), Interest::WRITABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
while events.is_empty() {
poll.poll(&mut events, None).unwrap();
}
assert_eq!(events.iter().count(), 1);
assert_eq!(events.iter().next().unwrap().token(), Token(2));
s2.write_all(&[1, 2, 3, 4]).unwrap();
drop(s2);
poll.registry()
.reregister(&mut s, Token(3), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
while events.is_empty() {
poll.poll(&mut events, None).unwrap();
}
assert_eq!(events.iter().count(), 1);
assert_eq!(events.iter().next().unwrap().token(), Token(3));
let mut buf = [0; 10];
expect_read!(s.read(&mut buf), &[1, 2, 3, 4]);
}
#[test]
fn write_then_deregister() {
init();
let mut a = TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
let addr = a.local_addr().unwrap();
let mut s = TcpStream::connect(addr).unwrap();
let mut poll = Poll::new().unwrap();
poll.registry()
.register(&mut a, Token(1), Interest::READABLE)
.unwrap();
poll.registry()
.register(&mut s, Token(3), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
while events.is_empty() {
poll.poll(&mut events, None).unwrap();
}
assert_eq!(events.iter().count(), 1);
assert_eq!(events.iter().next().unwrap().token(), Token(1));
let mut s2 = a.accept().unwrap().0;
poll.registry()
.register(&mut s2, Token(2), Interest::WRITABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
while events.is_empty() {
poll.poll(&mut events, None).unwrap();
}
assert_eq!(events.iter().count(), 1);
assert_eq!(events.iter().next().unwrap().token(), Token(2));
s2.write_all(&[1, 2, 3, 4]).unwrap();
poll.registry().deregister(&mut s2).unwrap();
poll.registry()
.reregister(&mut s, Token(3), Interest::READABLE)
.unwrap();
let mut events = Events::with_capacity(1024);
while events.is_empty() {
poll.poll(&mut events, None).unwrap();
}
assert_eq!(events.iter().count(), 1);
assert_eq!(events.iter().next().unwrap().token(), Token(3));
let mut buf = [0; 10];
expect_read!(s.read(&mut buf), &[1, 2, 3, 4]);
}
const ID1: Token = Token(1);
const ID2: Token = Token(2);
const ID3: Token = Token(3);
#[test]
fn tcp_no_events_after_deregister() {
let (mut poll, mut events) = init_with_poll();
let mut listener = TcpListener::bind(any_local_address()).unwrap();
let addr = listener.local_addr().unwrap();
let mut stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&mut listener, ID1, Interest::READABLE)
.unwrap();
poll.registry()
.register(&mut stream, ID3, Interest::READABLE)
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Interest::READABLE)],
);
let (mut stream2, peer_address) = listener.accept().expect("unable to accept connection");
assert!(peer_address.ip().is_loopback());
assert_eq!(stream2.peer_addr().unwrap(), peer_address);
assert_eq!(stream2.local_addr().unwrap(), addr);
poll.registry()
.register(&mut stream2, ID2, Interest::WRITABLE)
.unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID2, Interest::WRITABLE)],
);
stream2.write_all(&[1, 2, 3, 4]).unwrap();
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID3, Interest::READABLE)],
);
poll.registry().deregister(&mut listener).unwrap();
poll.registry().deregister(&mut stream).unwrap();
poll.registry().deregister(&mut stream2).unwrap();
expect_no_events(&mut poll, &mut events);
let mut buf = [0; 10];
expect_read!(stream.read(&mut buf), &[1, 2, 3, 4]);
checked_write!(stream2.write(&[1, 2, 3, 4]));
expect_no_events(&mut poll, &mut events);
sleep(Duration::from_millis(200));
expect_read!(stream.read(&mut buf), &[1, 2, 3, 4]);
expect_no_events(&mut poll, &mut events);
}