blob: f3b4ac219e4f7b3e390a97302777c02068899eb1 [file] [log] [blame]
use mio::*;
use mio::deprecated::{Handler, EventLoop};
use mio::udp::*;
use bytes::{Buf, RingBuf, SliceBuf, MutBuf};
use std::str;
use std::time;
use localhost;
// Token under which the receiving socket (`rx`) is registered with the event loop.
const LISTENER: Token = Token(0);
// Token under which the sending socket (`tx`) is registered with the event loop.
const SENDER: Token = Token(1);
/// Event-loop handler that sends `msg` out of `tx` and asserts that `rx`
/// receives exactly the same text.
pub struct UdpHandlerSendRecv {
/// Socket the message is sent from when `SENDER` becomes writable.
tx: UdpSocket,
/// Socket the message is read from when `LISTENER` becomes readable.
rx: UdpSocket,
/// Text that is sent and that the received bytes are compared against.
msg: &'static str,
/// Read cursor over the bytes of `msg`; advanced by the number of bytes sent.
buf: SliceBuf<'static>,
/// Receive buffer the incoming datagram is written into.
rx_buf: RingBuf,
/// When true, `send`/`recv` are used; otherwise `send_to`/`recv_from`.
connected: bool
}
impl UdpHandlerSendRecv {
    /// Builds a handler that will send `msg` over `tx` and expect it on `rx`.
    ///
    /// `connected` selects which socket calls the handler uses later:
    /// `send`/`recv` when true, `send_to`/`recv_from` when false.
    fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg : &'static str) -> UdpHandlerSendRecv {
        // Outgoing data is a read-only view over the message bytes.
        let outgoing = SliceBuf::wrap(msg.as_bytes());
        // Incoming data lands in a ring buffer created with capacity 1024.
        let incoming = RingBuf::new(1024);

        UdpHandlerSendRecv {
            tx: tx,
            rx: rx,
            msg: msg,
            buf: outgoing,
            rx_buf: incoming,
            connected: connected,
        }
    }
}
impl Handler for UdpHandlerSendRecv {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<UdpHandlerSendRecv>, token: Token, events: Ready) {
if events.is_readable() {
match token {
LISTENER => {
debug!("We are receiving a datagram now...");
let cnt = unsafe {
if !self.connected {
self.rx.recv_from(self.rx_buf.mut_bytes()).unwrap()
.unwrap().0
} else {
self.rx.recv(self.rx_buf.mut_bytes()).unwrap()
.unwrap()
}
};
unsafe { MutBuf::advance(&mut self.rx_buf, cnt); }
assert!(str::from_utf8(self.rx_buf.bytes()).unwrap() == self.msg);
event_loop.shutdown();
},
_ => ()
}
}
if events.is_writable() {
match token {
SENDER => {
let cnt = if !self.connected {
let addr = self.rx.local_addr().unwrap();
self.tx.send_to(self.buf.bytes(), &addr).unwrap()
.unwrap()
} else {
self.tx.send(self.buf.bytes()).unwrap()
.unwrap()
};
self.buf.advance(cnt);
},
_ => {}
}
}
}
}
/// Stateless event-loop handler: it fails the test if `LISTENER` becomes
/// readable and shuts the event loop down when a timeout fires.
pub struct UdpHandlerTimeout {
}
impl Handler for UdpHandlerTimeout {
type Timeout = usize;
type Message = ();
fn ready(&mut self, _event_loop: &mut EventLoop<UdpHandlerTimeout>, token: Token, events: Ready) {
if events.is_readable() {
match token {
LISTENER => {
assert!(false, "Expected to no receive a packet but got something")
},
_ => ()
}
}
}
/// Invoked when a timeout has completed.
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, _timeout: Self::Timeout) {
event_loop.shutdown();
}
}
/// Compile-time check: instantiating this with `T` only type-checks when
/// `T` implements `Send`. Does nothing at runtime.
fn assert_send<T>()
where
    T: Send,
{
}
/// Compile-time check: instantiating this with `T` only type-checks when
/// `T` implements `Sync`. Does nothing at runtime.
fn assert_sync<T>()
where
    T: Sync,
{
}
/// Shared driver for the send/receive tests.
///
/// Registers `tx` for writable events and `rx` for readable events (both
/// edge-triggered), then runs an event loop with a `UdpHandlerSendRecv`
/// that sends "hello world" and checks that it arrives. `connected` is
/// forwarded to the handler unchanged.
#[cfg(test)]
fn test_send_recv_udp(tx: UdpSocket, rx: UdpSocket, connected: bool) {
    debug!("Starting TEST_UDP_SOCKETS");
    let mut event_loop = EventLoop::new().unwrap();

    // Type-level checks only; these calls do nothing at runtime.
    assert_send::<UdpSocket>();
    assert_sync::<UdpSocket>();

    // A receive on a socket with nothing queued must report "no data"
    // (`None`) rather than block, i.e. the socket is non-blocking.
    let mut scratch = [0; 128];
    let probe = rx.recv_from(&mut scratch).unwrap();
    assert!(probe.is_none());

    info!("Registering SENDER");
    event_loop.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();

    info!("Registering LISTENER");
    event_loop.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();

    info!("Starting event loop to test with...");
    // The handler takes ownership of both sockets for the rest of the run.
    let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world");
    event_loop.run(&mut handler).unwrap();
}
/// Sends a datagram between two unconnected sockets using
/// `send_to`/`recv_from` (handler runs with `connected == false`).
#[test]
pub fn test_udp_socket() {
    // Addresses are requested in the same order as before: receiver first.
    let rx_bind_addr = localhost();
    let tx_bind_addr = localhost();

    let tx = UdpSocket::bind(&tx_bind_addr).unwrap();
    let rx = UdpSocket::bind(&rx_bind_addr).unwrap();

    test_send_recv_udp(tx, rx, false);
}
/// Sends a datagram between two sockets that are connected to each other,
/// so the handler uses `send`/`recv` (`connected == true`).
#[test]
pub fn test_udp_socket_send_recv() {
    let rx_bind_addr = localhost();
    let tx_bind_addr = localhost();

    let tx = UdpSocket::bind(&tx_bind_addr).unwrap();
    let rx = UdpSocket::bind(&rx_bind_addr).unwrap();

    // Look up the addresses the sockets actually ended up bound to.
    let tx_local = tx.local_addr().unwrap();
    let rx_local = rx.local_addr().unwrap();

    // Connect each socket to its peer.
    let tx_connected = tx.connect(rx_local);
    assert!(tx_connected.is_ok());
    let rx_connected = rx.connect(tx_local);
    assert!(rx_connected.is_ok());

    test_send_recv_udp(tx, rx, true);
}
/// Checks that a datagram from a third socket is not delivered to a
/// receiver that is connected to a different peer.
///
/// `rx` is connected to `tx`, while a separate socket sends "hello world"
/// to `rx`. The handler panics if `rx` ever becomes readable; otherwise a
/// 5 second timeout shuts the event loop down and the test passes.
#[test]
pub fn test_udp_socket_discard() {
    let rx_bind_addr = localhost();
    let tx_bind_addr = localhost();
    let outsider_bind_addr = localhost();

    let tx = UdpSocket::bind(&tx_bind_addr).unwrap();
    let rx = UdpSocket::bind(&rx_bind_addr).unwrap();
    let udp_outside = UdpSocket::bind(&outsider_bind_addr).unwrap();

    let tx_local = tx.local_addr().unwrap();
    let rx_local = rx.local_addr().unwrap();

    // Both `tx` and the outsider target `rx`, but `rx` is connected to `tx` only.
    let tx_connected = tx.connect(rx_local);
    assert!(tx_connected.is_ok());
    let outsider_connected = udp_outside.connect(rx_local);
    assert!(outsider_connected.is_ok());
    let rx_connected = rx.connect(tx_local);
    assert!(rx_connected.is_ok());

    let mut event_loop = EventLoop::new().unwrap();

    // Send from the outsider; the test expects `rx` never to see this datagram.
    let payload = "hello world".as_bytes();
    let sent = udp_outside.send(payload);
    assert!(sent.is_ok());

    event_loop.register(&rx, LISTENER, Ready::readable(), PollOpt::edge()).unwrap();
    event_loop.register(&tx, SENDER, Ready::writable(), PollOpt::edge()).unwrap();

    // If nothing has arrived by the time this timeout fires, the handler
    // shuts the loop down.
    let wait = time::Duration::from_secs(5);
    event_loop.timeout(5000, wait).unwrap();

    let mut handler = UdpHandlerTimeout {};
    event_loop.run(&mut handler).unwrap();
}