| use {expect_events, sleep_ms}; |
| use mio::{channel, Events, Poll, PollOpt, Ready, Token}; |
| use mio::event::Event; |
| use std::sync::mpsc::TryRecvError; |
| use std::thread; |
| use std::time::Duration; |
| |
| #[test] |
| pub fn test_poll_channel_edge() { |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| let (tx, rx) = channel::channel(); |
| |
| poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap(); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Push the value |
| tx.send("hello").unwrap(); |
| |
| // Polling will contain the event |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(1, num); |
| |
| let event = events.get(0).unwrap(); |
| assert_eq!(event.token(), Token(123)); |
| assert_eq!(event.readiness(), Ready::readable()); |
| |
| // Poll again and there should be no events |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Read the value |
| assert_eq!("hello", rx.try_recv().unwrap()); |
| |
| // Poll again, nothing |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Push a value |
| tx.send("goodbye").unwrap(); |
| |
| // Have an event |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(1, num); |
| |
| let event = events.get(0).unwrap(); |
| assert_eq!(event.token(), Token(123)); |
| assert_eq!(event.readiness(), Ready::readable()); |
| |
| // Read the value |
| rx.try_recv().unwrap(); |
| |
| // Drop the sender half |
| drop(tx); |
| |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(1, num); |
| |
| let event = events.get(0).unwrap(); |
| assert_eq!(event.token(), Token(123)); |
| assert_eq!(event.readiness(), Ready::readable()); |
| |
| match rx.try_recv() { |
| Err(TryRecvError::Disconnected) => {} |
| no => panic!("unexpected value {:?}", no), |
| } |
| |
| } |
| |
| #[test] |
| pub fn test_poll_channel_oneshot() { |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| let (tx, rx) = channel::channel(); |
| |
| poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Push the value |
| tx.send("hello").unwrap(); |
| |
| // Polling will contain the event |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(1, num); |
| |
| let event = events.get(0).unwrap(); |
| assert_eq!(event.token(), Token(123)); |
| assert_eq!(event.readiness(), Ready::readable()); |
| |
| // Poll again and there should be no events |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Read the value |
| assert_eq!("hello", rx.try_recv().unwrap()); |
| |
| // Poll again, nothing |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Push a value |
| tx.send("goodbye").unwrap(); |
| |
| // Poll again, nothing |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Reregistering will re-trigger the notification |
| for _ in 0..3 { |
| poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); |
| |
| // Have an event |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(1, num); |
| |
| let event = events.get(0).unwrap(); |
| assert_eq!(event.token(), Token(123)); |
| assert_eq!(event.readiness(), Ready::readable()); |
| } |
| |
| // Get the value |
| assert_eq!("goodbye", rx.try_recv().unwrap()); |
| |
| poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); |
| |
| // Have an event |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| poll.reregister(&rx, Token(123), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap(); |
| |
| // Have an event |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| } |
| |
| #[test] |
| pub fn test_poll_channel_level() { |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| let (tx, rx) = channel::channel(); |
| |
| poll.register(&rx, Token(123), Ready::readable(), PollOpt::level()).unwrap(); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Push the value |
| tx.send("hello").unwrap(); |
| |
| // Polling will contain the event |
| for i in 0..5 { |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert!(1 == num, "actually got {} on iteration {}", num, i); |
| |
| let event = events.get(0).unwrap(); |
| assert_eq!(event.token(), Token(123)); |
| assert_eq!(event.readiness(), Ready::readable()); |
| } |
| |
| // Read the value |
| assert_eq!("hello", rx.try_recv().unwrap()); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| } |
| |
| #[test] |
| pub fn test_poll_channel_writable() { |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| let (tx, rx) = channel::channel(); |
| |
| poll.register(&rx, Token(123), Ready::writable(), PollOpt::edge()).unwrap(); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| |
| // Push the value |
| tx.send("hello").unwrap(); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| } |
| |
| #[test] |
| pub fn test_dropping_receive_before_poll() { |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| let (tx, rx) = channel::channel(); |
| |
| poll.register(&rx, Token(123), Ready::readable(), PollOpt::edge()).unwrap(); |
| |
| // Push the value |
| tx.send("hello").unwrap(); |
| |
| // Drop the receive end |
| drop(rx); |
| |
| // Wait, but nothing should happen |
| let num = poll.poll(&mut events, Some(Duration::from_millis(300))).unwrap(); |
| assert_eq!(0, num); |
| } |
| |
| #[test] |
| pub fn test_mixing_channel_with_socket() { |
| use mio::net::{TcpListener, TcpStream}; |
| |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| let (tx, rx) = channel::channel(); |
| |
| // Create the listener |
| let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); |
| |
| // Register the listener with `Poll` |
| poll.register(&l, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); |
| poll.register(&rx, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); |
| |
| // Push a value onto the channel |
| tx.send("hello").unwrap(); |
| |
| // Connect a TCP socket |
| let s1 = TcpStream::connect(&l.local_addr().unwrap()).unwrap(); |
| |
| // Register the socket |
| poll.register(&s1, Token(2), Ready::readable(), PollOpt::edge()).unwrap(); |
| |
| // Sleep a bit to ensure it arrives at dest |
| sleep_ms(250); |
| |
| expect_events(&poll, &mut events, 2, vec![ |
| Event::new(Ready::empty(), Token(0)), |
| Event::new(Ready::empty(), Token(1)), |
| ]); |
| } |
| |
| #[test] |
| pub fn test_sending_from_other_thread_while_polling() { |
| const ITERATIONS: usize = 20; |
| const THREADS: usize = 5; |
| |
| // Make sure to run multiple times |
| let poll = Poll::new().unwrap(); |
| let mut events = Events::with_capacity(1024); |
| |
| for _ in 0..ITERATIONS { |
| let (tx, rx) = channel::channel(); |
| poll.register(&rx, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); |
| |
| for _ in 0..THREADS { |
| let tx = tx.clone(); |
| |
| thread::spawn(move || { |
| sleep_ms(50); |
| tx.send("ping").unwrap(); |
| }); |
| } |
| |
| let mut recv = 0; |
| |
| while recv < THREADS { |
| let num = poll.poll(&mut events, None).unwrap(); |
| |
| if num != 0 { |
| assert_eq!(1, num); |
| assert_eq!(events.get(0).unwrap().token(), Token(0)); |
| |
| while let Ok(_) = rx.try_recv() { |
| recv += 1; |
| } |
| } |
| } |
| } |
| } |