// blob: 24a816a04b6abf9983de730e7ecc497452ffc9f5 [file] [log] [blame]
use std::cmp;
use std::io::prelude::*;
use std::io;
use std::net;
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
use net2::{self, TcpStreamExt};
use {TryRead, TryWrite};
use mio::{Token, Ready, PollOpt, Poll, Events};
use iovec::IoVec;
use mio::net::{TcpListener, TcpStream};
#[test]
fn accept() {
struct H { hit: bool, listener: TcpListener, shutdown: bool }
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = l.local_addr().unwrap();
let t = thread::spawn(move || {
net::TcpStream::connect(&addr).unwrap();
});
let poll = Poll::new().unwrap();
poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
let mut events = Events::with_capacity(128);
let mut h = H { hit: false, listener: l, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
h.hit = true;
assert_eq!(event.token(), Token(1));
assert!(event.readiness().is_readable());
assert!(h.listener.accept().is_ok());
h.shutdown = true;
}
}
assert!(h.hit);
assert!(h.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock);
t.join().unwrap();
}
#[test]
fn connect() {
struct H { hit: u32, shutdown: bool }
let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let t = thread::spawn(move || {
let s = l.accept().unwrap();
rx.recv().unwrap();
drop(s);
tx2.send(()).unwrap();
});
let poll = Poll::new().unwrap();
let s = TcpStream::connect(&addr).unwrap();
poll.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
let mut events = Events::with_capacity(128);
let mut h = H { hit: 0, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
match h.hit {
0 => assert!(event.readiness().is_writable()),
1 => assert!(event.readiness().is_readable()),
_ => panic!(),
}
h.hit += 1;
h.shutdown = true;
}
}
assert_eq!(h.hit, 1);
tx.send(()).unwrap();
rx2.recv().unwrap();
h.shutdown = false;
while !h.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
match h.hit {
0 => assert!(event.readiness().is_writable()),
1 => assert!(event.readiness().is_readable()),
_ => panic!(),
}
h.hit += 1;
h.shutdown = true;
}
}
assert_eq!(h.hit, 2);
t.join().unwrap();
}
#[test]
fn read() {
const N: usize = 16 * 1024 * 1024;
struct H { amt: usize, socket: TcpStream, shutdown: bool }
let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
let t = thread::spawn(move || {
let mut s = l.accept().unwrap().0;
let b = [0; 1024];
let mut amt = 0;
while amt < N {
amt += s.write(&b).unwrap();
}
});
let poll = Poll::new().unwrap();
let s = TcpStream::connect(&addr).unwrap();
poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
let mut events = Events::with_capacity(128);
let mut h = H { amt: 0, socket: s, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
let mut b = [0; 1024];
loop {
if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
h.amt += amt;
} else {
break
}
if h.amt >= N {
h.shutdown = true;
break
}
}
}
}
t.join().unwrap();
}
#[test]
fn read_bufs() {
const N: usize = 16 * 1024 * 1024;
let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
let t = thread::spawn(move || {
let mut s = l.accept().unwrap().0;
let b = [1; 1024];
let mut amt = 0;
while amt < N {
amt += s.write(&b).unwrap();
}
});
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(128);
let s = TcpStream::connect(&addr).unwrap();
poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap();
let b1 = &mut [0; 10][..];
let b2 = &mut [0; 383][..];
let b3 = &mut [0; 28][..];
let b4 = &mut [0; 8][..];
let b5 = &mut [0; 128][..];
let mut b: [&mut IoVec; 5] = [
b1.into(),
b2.into(),
b3.into(),
b4.into(),
b5.into(),
];
let mut so_far = 0;
loop {
for buf in b.iter_mut() {
for byte in buf.as_mut_bytes() {
*byte = 0;
}
}
poll.poll(&mut events, None).unwrap();
match s.read_bufs(&mut b) {
Ok(0) => {
assert_eq!(so_far, N);
break
}
Ok(mut n) => {
so_far += n;
for buf in b.iter() {
let buf = buf.as_bytes();
for byte in buf[..cmp::min(n, buf.len())].iter() {
assert_eq!(*byte, 1);
}
n = n.saturating_sub(buf.len());
if n == 0 {
break
}
}
assert_eq!(n, 0);
}
Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
}
}
t.join().unwrap();
}
#[test]
fn write() {
const N: usize = 16 * 1024 * 1024;
struct H { amt: usize, socket: TcpStream, shutdown: bool }
let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
let t = thread::spawn(move || {
let mut s = l.accept().unwrap().0;
let mut b = [0; 1024];
let mut amt = 0;
while amt < N {
amt += s.read(&mut b).unwrap();
}
});
let poll = Poll::new().unwrap();
let s = TcpStream::connect(&addr).unwrap();
poll.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap();
let mut events = Events::with_capacity(128);
let mut h = H { amt: 0, socket: s, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
assert_eq!(event.token(), Token(1));
let b = [0; 1024];
loop {
if let Some(amt) = h.socket.try_write(&b).unwrap() {
h.amt += amt;
} else {
break
}
if h.amt >= N {
h.shutdown = true;
break
}
}
}
}
t.join().unwrap();
}
#[test]
fn write_bufs() {
const N: usize = 16 * 1024 * 1024;
let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
let t = thread::spawn(move || {
let mut s = l.accept().unwrap().0;
let mut b = [0; 1024];
let mut amt = 0;
while amt < N {
for byte in b.iter_mut() {
*byte = 0;
}
let n = s.read(&mut b).unwrap();
amt += n;
for byte in b[..n].iter() {
assert_eq!(*byte, 1);
}
}
});
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(128);
let s = TcpStream::connect(&addr).unwrap();
poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
let b1 = &[1; 10][..];
let b2 = &[1; 383][..];
let b3 = &[1; 28][..];
let b4 = &[1; 8][..];
let b5 = &[1; 128][..];
let b: [&IoVec; 5] = [
b1.into(),
b2.into(),
b3.into(),
b4.into(),
b5.into(),
];
let mut so_far = 0;
while so_far < N {
poll.poll(&mut events, None).unwrap();
match s.write_bufs(&b) {
Ok(n) => so_far += n,
Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
}
}
t.join().unwrap();
}
#[test]
fn connect_then_close() {
struct H { listener: TcpListener, shutdown: bool }
let poll = Poll::new().unwrap();
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let s = TcpStream::connect(&l.local_addr().unwrap()).unwrap();
poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
let mut events = Events::with_capacity(128);
let mut h = H { listener: l, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(1) {
let s = h.listener.accept().unwrap().0;
poll.register(&s, Token(3), Ready::readable() | Ready::writable(),
PollOpt::edge()).unwrap();
drop(s);
} else if event.token() == Token(2) {
h.shutdown = true;
}
}
}
}
#[test]
fn listen_then_close() {
let poll = Poll::new().unwrap();
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
drop(l);
let mut events = Events::with_capacity(128);
poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
for event in &events {
if event.token() == Token(1) {
panic!("recieved ready() on a closed TcpListener")
}
}
}
fn assert_send<T: Send>() {
}
fn assert_sync<T: Sync>() {
}
#[test]
fn test_tcp_sockets_are_send() {
assert_send::<TcpListener>();
assert_send::<TcpStream>();
assert_sync::<TcpListener>();
assert_sync::<TcpStream>();
}
#[test]
fn bind_twice_bad() {
let l1 = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = l1.local_addr().unwrap();
assert!(TcpListener::bind(&addr).is_err());
}
#[test]
fn multiple_writes_immediate_success() {
const N: usize = 16;
let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
let t = thread::spawn(move || {
let mut s = l.accept().unwrap().0;
let mut b = [0; 1024];
let mut amt = 0;
while amt < 1024*N {
for byte in b.iter_mut() {
*byte = 0;
}
let n = s.read(&mut b).unwrap();
amt += n;
for byte in b[..n].iter() {
assert_eq!(*byte, 1);
}
}
});
let poll = Poll::new().unwrap();
let mut s = TcpStream::connect(&addr).unwrap();
poll.register(&s, Token(1), Ready::writable(), PollOpt::level()).unwrap();
let mut events = Events::with_capacity(16);
// Wait for our TCP stream to connect
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in events.iter() {
if event.token() == Token(1) && event.readiness().is_writable() {
break 'outer
}
}
}
for _ in 0..N {
s.write(&[1; 1024]).unwrap();
}
t.join().unwrap();
}
#[test]
fn connection_reset_by_peer() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
let mut buf = [0u8; 16];
// Create listener
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = l.local_addr().unwrap();
// Connect client
let client = net2::TcpBuilder::new_v4().unwrap()
.to_tcp_stream().unwrap();
client.set_linger(Some(Duration::from_millis(0))).unwrap();
client.connect(&addr).unwrap();
// Convert to Mio stream
let client = TcpStream::from_stream(client).unwrap();
// Register server
poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
// Register interest in the client
poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
// Wait for listener to be ready
let mut server;
'outer:
loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(0) {
match l.accept() {
Ok((sock, _)) => {
server = sock;
break 'outer;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("unexpected error {:?}", e),
}
}
}
}
// Close the connection
drop(client);
// Wait a moment
thread::sleep(Duration::from_millis(100));
// Register interest in the server socket
poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap();
loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(3) {
assert!(event.readiness().is_readable());
match server.read(&mut buf) {
Ok(0) |
Err(_) => {},
Ok(x) => panic!("expected empty buffer but read {} bytes", x),
}
return;
}
}
}
}
#[test]
#[cfg_attr(target_os = "fuchsia", ignore)]
fn connect_error() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
// Pick a "random" port that shouldn't be in use.
let l = TcpStream::connect(&"127.0.0.1:38381".parse().unwrap()).unwrap();
poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
'outer:
loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(0) {
assert!(event.readiness().is_writable());
break 'outer
}
}
}
assert!(l.take_error().unwrap().is_some());
}
#[test]
fn write_error() {
let poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
let (tx, rx) = channel();
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let t = thread::spawn(move || {
let (conn, _addr) = listener.accept().unwrap();
rx.recv().unwrap();
drop(conn);
});
let mut s = TcpStream::connect(&addr).unwrap();
poll.register(&s,
Token(0),
Ready::readable() | Ready::writable(),
PollOpt::edge()).unwrap();
let mut wait_writable = || {
'outer:
loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(0) && event.readiness().is_writable() {
break 'outer
}
}
}
};
wait_writable();
tx.send(()).unwrap();
t.join().unwrap();
let buf = [0; 1024];
loop {
match s.write(&buf) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
wait_writable()
}
Err(e) => {
println!("good error: {}", e);
break
}
}
}
}