| use std::cmp; |
| use std::io::prelude::*; |
| use std::io; |
| use std::net; |
| use std::sync::mpsc::channel; |
| use std::thread; |
| use std::time::Duration; |
| |
| use net2::{self, TcpStreamExt}; |
| |
| use {TryRead, TryWrite}; |
| use mio::{Token, Ready, PollOpt, Poll, Events}; |
| use iovec::IoVec; |
| use mio::net::{TcpListener, TcpStream}; |
| |
/// A listener registered for edge-triggered readable events reports a
/// readable event with the registered token once a client connects, the
/// pending connection can be accepted, and a further `accept` reports
/// `WouldBlock`.
#[test]
fn accept() {
    let bind_addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
    let listener = TcpListener::bind(&bind_addr).unwrap();
    let target = listener.local_addr().unwrap();

    // A blocking std client connects from another thread and then lets its
    // socket go out of scope.
    let client = thread::spawn(move || {
        net::TcpStream::connect(&target).unwrap();
    });

    let poll = Poll::new().unwrap();
    poll.register(&listener, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);

    // Keep polling until one batch delivered at least one event; every event
    // in that batch must be a readable event for the listener.
    let mut accepted = false;
    while !accepted {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            assert_eq!(event.token(), Token(1));
            assert!(event.readiness().is_readable());
            assert!(listener.accept().is_ok());
            accepted = true;
        }
    }
    assert!(accepted);

    // Nothing else is queued, so the non-blocking accept must not succeed.
    let err = listener.accept().unwrap_err();
    assert!(err.kind() == io::ErrorKind::WouldBlock);
    client.join().unwrap();
}
| |
/// A connecting stream registered for readable and writable events first
/// reports writable; after the peer thread has dropped the accepted socket it
/// reports readable.
#[test]
fn connect() {
    /// Polls until a batch containing at least one event arrives. Every event
    /// in that batch must carry `Token(1)`; the `hits` counter selects which
    /// readiness is expected (0 => writable, 1 => readable, anything else
    /// panics) and is bumped once per event.
    fn drain_one_batch(poll: &Poll, events: &mut Events, hits: &mut u32) {
        let mut got_event = false;
        while !got_event {
            poll.poll(events, None).unwrap();

            for event in events.iter() {
                assert_eq!(event.token(), Token(1));
                match *hits {
                    0 => assert!(event.readiness().is_writable()),
                    1 => assert!(event.readiness().is_readable()),
                    _ => panic!(),
                }
                *hits += 1;
                got_event = true;
            }
        }
    }

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // `close_*` tells the peer thread to drop its socket; `closed_*` reports
    // back that the drop has happened.
    let (close_tx, close_rx) = channel();
    let (closed_tx, closed_rx) = channel();
    let peer = thread::spawn(move || {
        let conn = acceptor.accept().unwrap();
        close_rx.recv().unwrap();
        drop(conn);
        closed_tx.send(()).unwrap();
    });

    let poll = Poll::new().unwrap();
    let stream = TcpStream::connect(&addr).unwrap();

    poll.register(&stream, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);
    let mut hits = 0u32;

    // Phase one: the connection is established.
    drain_one_batch(&poll, &mut events, &mut hits);
    assert_eq!(hits, 1);

    // Phase two: ask the peer to close and wait until it has done so.
    close_tx.send(()).unwrap();
    closed_rx.recv().unwrap();
    drain_one_batch(&poll, &mut events, &mut hits);
    assert_eq!(hits, 2);

    peer.join().unwrap();
}
| |
/// Reads at least `N` bytes from a stream registered for edge-triggered
/// readable events while a blocking std peer writes them from another thread.
#[test]
fn read() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Blocking writer: pushes 1 KiB chunks until at least N bytes were written.
    let writer = thread::spawn(move || {
        let (mut peer, _) = acceptor.accept().unwrap();
        let chunk = [0u8; 1024];
        let mut written = 0;
        while written < N {
            written += peer.write(&chunk).unwrap();
        }
    });

    let poll = Poll::new().unwrap();
    let mut socket = TcpStream::connect(&addr).unwrap();

    poll.register(&socket, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);

    let mut received = 0;
    let mut done = false;
    while !done {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            assert_eq!(event.token(), Token(1));
            let mut scratch = [0u8; 1024];
            // Keep reading for this event until `try_read` yields `None`
            // (presumably "would block"; the helper trait is defined outside
            // this file) or the target byte count has been reached.
            while let Some(n) = socket.try_read(&mut scratch).unwrap() {
                received += n;
                if received >= N {
                    done = true;
                    break;
                }
            }
        }
    }
    writer.join().unwrap();
}
| |
/// Vectored reads (`read_bufs`) on a level-triggered stream: a blocking std
/// peer sends exactly `N` bytes of value `1`, and the reader checks that every
/// byte reported as read is `1` and that exactly `N` bytes arrive before EOF.
#[test]
fn read_bufs() {
    const N: usize = 16 * 1024 * 1024;

    let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = l.local_addr().unwrap();

    let t = thread::spawn(move || {
        let mut s = l.accept().unwrap().0;
        let b = [1; 1024];
        let mut amt = 0;
        while amt < N {
            // `write` may legally return a short count, which would let the
            // running total overshoot N and break the reader's exact
            // `so_far == N` check at EOF. `write_all` sends whole chunks, so
            // exactly N bytes go out (N is a multiple of the chunk size).
            s.write_all(&b).unwrap();
            amt += b.len();
        }
    });

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128);

    let s = TcpStream::connect(&addr).unwrap();

    poll.register(&s, Token(1), Ready::readable(), PollOpt::level()).unwrap();

    // Five destination buffers of deliberately uneven sizes.
    let b1 = &mut [0; 10][..];
    let b2 = &mut [0; 383][..];
    let b3 = &mut [0; 28][..];
    let b4 = &mut [0; 8][..];
    let b5 = &mut [0; 128][..];
    let mut b: [&mut IoVec; 5] = [
        b1.into(),
        b2.into(),
        b3.into(),
        b4.into(),
        b5.into(),
    ];

    let mut so_far = 0;
    loop {
        // Zero the buffers so bytes left over from a previous read cannot
        // satisfy the content check below.
        for buf in b.iter_mut() {
            for byte in buf.as_mut_bytes() {
                *byte = 0;
            }
        }

        poll.poll(&mut events, None).unwrap();

        match s.read_bufs(&mut b) {
            // EOF: the peer is done, and the full payload must have arrived.
            Ok(0) => {
                assert_eq!(so_far, N);
                break
            }
            Ok(mut n) => {
                so_far += n;
                // Walk the buffers in order; the first `n` bytes overall must
                // all be 1. `n` is reduced by each buffer's length as we go.
                for buf in b.iter() {
                    let buf = buf.as_bytes();
                    for byte in buf[..cmp::min(n, buf.len())].iter() {
                        assert_eq!(*byte, 1);
                    }
                    n = n.saturating_sub(buf.len());
                    if n == 0 {
                        break
                    }
                }
                // The reported count must fit in the supplied buffers.
                assert_eq!(n, 0);
            }
            Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
        }
    }

    t.join().unwrap();
}
| |
/// Writes at least `N` bytes through a stream registered for edge-triggered
/// writable events while a blocking std peer reads them on another thread.
#[test]
fn write() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    // Blocking reader: consumes data until at least N bytes were counted.
    let reader = thread::spawn(move || {
        let (mut peer, _) = acceptor.accept().unwrap();
        let mut scratch = [0u8; 1024];
        let mut received = 0;
        while received < N {
            received += peer.read(&mut scratch).unwrap();
        }
    });

    let poll = Poll::new().unwrap();
    let mut socket = TcpStream::connect(&addr).unwrap();

    poll.register(&socket, Token(1), Ready::writable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);

    let mut sent = 0;
    let mut done = false;
    while !done {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            assert_eq!(event.token(), Token(1));
            let chunk = [0u8; 1024];
            // Keep writing for this event until `try_write` yields `None`
            // (presumably "would block"; the helper trait is defined outside
            // this file) or the target byte count has been reached.
            while let Some(n) = socket.try_write(&chunk).unwrap() {
                sent += n;
                if sent >= N {
                    done = true;
                    break;
                }
            }
        }
    }
    reader.join().unwrap();
}
| |
/// Vectored writes (`write_bufs`) on a level-triggered stream: the writer
/// sends buffers filled with `1` until at least `N` bytes are out, and a
/// blocking std peer verifies that every byte it reads is `1`.
#[test]
fn write_bufs() {
    const N: usize = 16 * 1024 * 1024;

    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    let reader = thread::spawn(move || {
        let (mut peer, _) = acceptor.accept().unwrap();
        let mut received = 0;
        while received < N {
            // Start from a zeroed buffer each round so stale bytes cannot
            // pass the content check.
            let mut scratch = [0u8; 1024];
            let n = peer.read(&mut scratch).unwrap();
            received += n;
            for &byte in &scratch[..n] {
                assert_eq!(byte, 1);
            }
        }
    });

    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(128);
    let stream = TcpStream::connect(&addr).unwrap();
    poll.register(&stream, Token(1), Ready::writable(), PollOpt::level()).unwrap();

    // Five source buffers of deliberately uneven sizes, all filled with 1.
    let part1 = &[1; 10][..];
    let part2 = &[1; 383][..];
    let part3 = &[1; 28][..];
    let part4 = &[1; 8][..];
    let part5 = &[1; 128][..];
    let parts: [&IoVec; 5] = [
        part1.into(),
        part2.into(),
        part3.into(),
        part4.into(),
        part5.into(),
    ];

    let mut sent = 0;
    while sent < N {
        poll.poll(&mut events, None).unwrap();

        match stream.write_bufs(&parts) {
            Ok(n) => sent += n,
            // The only tolerated failure is the socket not being ready.
            Err(e) => assert_eq!(e.kind(), io::ErrorKind::WouldBlock),
        }
    }

    reader.join().unwrap();
}
| |
/// Connects a client to a listener, accepts the connection, registers the
/// accepted socket and immediately drops it; the test finishes once the
/// client (`Token(2)`) receives an event.
#[test]
fn connect_then_close() {
    let poll = Poll::new().unwrap();
    let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let client = TcpStream::connect(&listener.local_addr().unwrap()).unwrap();

    poll.register(&listener, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
    poll.register(&client, Token(2), Ready::readable(), PollOpt::edge()).unwrap();

    let mut events = Events::with_capacity(128);

    let mut finished = false;
    while !finished {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            match event.token() {
                // Listener is readable: accept, register, then close the
                // server side of the connection right away.
                Token(1) => {
                    let (accepted, _) = listener.accept().unwrap();
                    poll.register(&accepted, Token(3), Ready::readable() | Ready::writable(),
                                  PollOpt::edge()).unwrap();
                    drop(accepted);
                }
                // Any event on the client ends the test; it is expected to
                // fire once the server side has been dropped.
                Token(2) => finished = true,
                _ => {}
            }
        }
    }
}
| |
/// A listener that is registered and then dropped must not produce any event
/// for its token during a subsequent 100 ms poll.
#[test]
fn listen_then_close() {
    let poll = Poll::new().unwrap();
    let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();

    poll.register(&listener, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
    drop(listener);

    let mut events = Events::with_capacity(128);

    let timeout = Duration::from_millis(100);
    poll.poll(&mut events, Some(timeout)).unwrap();

    // Any event carrying the dropped listener's token is a failure.
    let saw_listener_event = events.iter().any(|event| event.token() == Token(1));
    if saw_listener_event {
        panic!("recieved ready() on a closed TcpListener")
    }
}
| |
/// Compile-time check that `T` is `Send`: instantiating this function with a
/// type that is not `Send` fails to compile. Calling it does nothing.
fn assert_send<T>()
where
    T: Send,
{
}
| |
/// Compile-time check that `T` is `Sync`: instantiating this function with a
/// type that is not `Sync` fails to compile. Calling it does nothing.
fn assert_sync<T>()
where
    T: Sync,
{
}
| |
/// Statically verifies that both TCP socket types are `Send` and `Sync`.
/// The checks happen at compile time; the test body has no runtime effect.
#[test]
fn test_tcp_sockets_are_send() {
    // Listener.
    assert_send::<TcpListener>();
    assert_sync::<TcpListener>();

    // Stream.
    assert_send::<TcpStream>();
    assert_sync::<TcpStream>();
}
| |
/// Binding a second listener to an address that a live listener already
/// occupies must fail.
#[test]
fn bind_twice_bad() {
    let first = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    // The concrete address (with the OS-assigned port) held by `first`.
    let taken = first.local_addr().unwrap();

    let second = TcpListener::bind(&taken);
    assert!(second.is_err());
}
| |
/// Once the stream has reported writable, `N` consecutive 1 KiB writes must
/// each succeed without an error, and the blocking peer must see only bytes
/// of value `1`.
#[test]
fn multiple_writes_immediate_success() {
    const N: usize = 16;
    let acceptor = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = acceptor.local_addr().unwrap();

    let reader = thread::spawn(move || {
        let (mut peer, _) = acceptor.accept().unwrap();
        let mut received = 0;
        while received < 1024*N {
            // Start from a zeroed buffer each round so stale bytes cannot
            // pass the content check.
            let mut scratch = [0u8; 1024];
            let n = peer.read(&mut scratch).unwrap();
            received += n;
            for &byte in &scratch[..n] {
                assert_eq!(byte, 1);
            }
        }
    });

    let poll = Poll::new().unwrap();
    let mut stream = TcpStream::connect(&addr).unwrap();
    poll.register(&stream, Token(1), Ready::writable(), PollOpt::level()).unwrap();
    let mut events = Events::with_capacity(16);

    // Wait for our TCP stream to connect
    let mut connected = false;
    while !connected {
        poll.poll(&mut events, None).unwrap();
        connected = events.iter().any(|event| {
            event.token() == Token(1) && event.readiness().is_writable()
        });
    }

    let payload = [1u8; 1024];
    for _ in 0..N {
        stream.write(&payload).unwrap();
    }

    reader.join().unwrap();
}
| |
/// A client configured with a zero linger time connects and is then dropped.
/// The accepted server socket, registered afterwards, must report readable,
/// and reading from it must yield either zero bytes or an error.
#[test]
fn connection_reset_by_peer() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);
    let mut buf = [0u8; 16];

    // Create listener
    let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let addr = listener.local_addr().unwrap();

    // Connect client: built through net2 so the linger option can be set
    // before the connection is made.
    let raw_client = net2::TcpBuilder::new_v4().unwrap()
        .to_tcp_stream().unwrap();

    raw_client.set_linger(Some(Duration::from_millis(0))).unwrap();
    raw_client.connect(&addr).unwrap();

    // Convert to Mio stream
    let client = TcpStream::from_stream(raw_client).unwrap();

    // Register server
    poll.register(&listener, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

    // Register interest in the client
    poll.register(&client, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();

    // Wait for listener to be ready and accept the pending connection.
    let mut accepted: Option<TcpStream> = None;
    while accepted.is_none() {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            if event.token() != Token(0) {
                continue;
            }
            match listener.accept() {
                Ok((sock, _)) => {
                    accepted = Some(sock);
                    break;
                }
                // Not ready yet; keep polling.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
                Err(e) => panic!("unexpected error {:?}", e),
            }
        }
    }
    let mut server = accepted.unwrap();

    // Close the connection
    drop(client);

    // Wait a moment
    thread::sleep(Duration::from_millis(100));

    // Register interest in the server socket
    poll.register(&server, Token(3), Ready::readable(), PollOpt::edge()).unwrap();

    loop {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            if event.token() != Token(3) {
                continue;
            }
            assert!(event.readiness().is_readable());

            // Both EOF and an error are accepted outcomes; only actual data
            // is a failure.
            match server.read(&mut buf) {
                Ok(n) if n > 0 => panic!("expected empty buffer but read {} bytes", n),
                _ => {}
            }
            return;
        }
    }
}
| |
/// Connecting to a port with no listener must surface an error: either
/// synchronously from `connect` as `ConnectionRefused`, or asynchronously as
/// a writable event followed by a pending error from `take_error`.
#[test]
#[cfg_attr(target_os = "fuchsia", ignore)]
fn connect_error() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);

    // Pick a "random" port that shouldn't be in use.
    let l = match TcpStream::connect(&"127.0.0.1:38381".parse().unwrap()) {
        Ok(l) => l,
        Err(ref e) if e.kind() == io::ErrorKind::ConnectionRefused => {
            // The connection was refused synchronously. That is a valid
            // outcome rather than a bug, but it means the asynchronous
            // error path below cannot be exercised on this run.
            return;
        }
        Err(e) => panic!("TcpStream::connect unexpected error {:?}", e),
    };
    poll.register(&l, Token(0), Ready::writable(), PollOpt::edge()).unwrap();

    // Wait for the event that reports the outcome of the connection attempt.
    'outer:
    loop {
        poll.poll(&mut events, None).unwrap();

        for event in &events {
            if event.token() == Token(0) {
                assert!(event.readiness().is_writable());
                break 'outer
            }
        }
    }

    // The failed attempt must have left an error on the socket.
    assert!(l.take_error().unwrap().is_some());
}
| |
/// After the peer has accepted and then dropped the connection, repeated
/// writes on the stream must eventually fail with an error other than
/// `WouldBlock`.
#[test]
fn write_error() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);
    let (close_tx, close_rx) = channel();

    let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    // The peer holds the accepted connection until told to drop it.
    let peer = thread::spawn(move || {
        let (conn, _addr) = listener.accept().unwrap();
        close_rx.recv().unwrap();
        drop(conn);
    });

    let mut stream = TcpStream::connect(&addr).unwrap();
    poll.register(&stream,
                  Token(0),
                  Ready::readable() | Ready::writable(),
                  PollOpt::edge()).unwrap();

    // Polls until a writable event for Token(0) shows up.
    let mut block_until_writable = || {
        loop {
            poll.poll(&mut events, None).unwrap();

            let writable = events.iter().any(|event| {
                event.token() == Token(0) && event.readiness().is_writable()
            });
            if writable {
                return;
            }
        }
    };

    block_until_writable();

    // Let the peer close its end and wait until it has done so.
    close_tx.send(()).unwrap();
    peer.join().unwrap();

    let payload = [0; 1024];
    loop {
        if let Err(e) = stream.write(&payload) {
            if e.kind() == io::ErrorKind::WouldBlock {
                // Send buffer is full; wait until we can write again.
                block_until_writable();
            } else {
                println!("good error: {}", e);
                break;
            }
        }
    }
}