| #![cfg(all(feature = "os-poll", feature = "net"))] |
| |
| use log::{debug, info}; |
| use mio::net::UdpSocket; |
| use mio::{Events, Interest, Poll, Registry, Token}; |
| use std::net::{self, IpAddr, SocketAddr}; |
| #[cfg(unix)] |
| use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; |
| use std::str; |
| use std::sync::{Arc, Barrier}; |
| use std::thread; |
| use std::time::Duration; |
| |
| #[macro_use] |
| mod util; |
| use util::{ |
| any_local_address, any_local_ipv6_address, assert_error, assert_send, |
| assert_socket_close_on_exec, assert_socket_non_blocking, assert_sync, assert_would_block, |
| expect_events, expect_no_events, init, init_with_poll, ExpectEvent, |
| }; |
| |
| const DATA1: &[u8] = b"Hello world!"; |
| const DATA2: &[u8] = b"Hello mars!"; |
| |
| const LISTENER: Token = Token(0); |
| const SENDER: Token = Token(1); |
| const ID1: Token = Token(2); |
| const ID2: Token = Token(3); |
| const ID3: Token = Token(4); |
| |
| #[test] |
| #[cfg(all(unix, not(debug_assertions)))] |
| fn assert_size() { |
| use mio::net::*; |
| use std::mem::size_of; |
| |
| // Without debug assertions enabled `TcpListener`, `TcpStream` and |
| // `UdpSocket` should have the same size as the system specific socket, i.e. |
| // just a file descriptor on Unix platforms. |
| assert_eq!(size_of::<UdpSocket>(), size_of::<std::net::UdpSocket>()); |
| } |
| |
| #[test] |
| fn empty_datagram() { |
| const EMPTY: &[u8] = b""; |
| |
| let (mut poll, mut events) = init_with_poll(); |
| let mut s1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut s2 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| poll.registry() |
| .register(&mut s1, ID1, Interest::WRITABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut s2, ID2, Interest::READABLE) |
| .unwrap(); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::WRITABLE)], |
| ); |
| |
| checked_write!(s1.send_to(EMPTY, s2.local_addr().unwrap())); |
| |
| let mut buf = [0; 10]; |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| expect_read!(s2.recv_from(&mut buf), EMPTY, s1.local_addr().unwrap()); |
| } |
| |
| #[test] |
| fn is_send_and_sync() { |
| assert_send::<UdpSocket>(); |
| assert_sync::<UdpSocket>(); |
| } |
| |
| #[test] |
| fn unconnected_udp_socket_ipv4() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| smoke_test_unconnected_udp_socket(socket1, socket2); |
| } |
| |
| #[test] |
| fn unconnected_udp_socket_ipv6() { |
| let socket1 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); |
| let socket2 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); |
| smoke_test_unconnected_udp_socket(socket1, socket2); |
| } |
| |
| #[test] |
| fn unconnected_udp_socket_std() { |
| let socket1 = net::UdpSocket::bind(any_local_address()).unwrap(); |
| let socket2 = net::UdpSocket::bind(any_local_address()).unwrap(); |
| |
| // `std::net::UdpSocket`s are blocking by default, so make sure they are |
| // in non-blocking mode before wrapping in a Mio equivalent. |
| socket1.set_nonblocking(true).unwrap(); |
| socket2.set_nonblocking(true).unwrap(); |
| |
| let socket1 = UdpSocket::from_std(socket1); |
| let socket2 = UdpSocket::from_std(socket2); |
| smoke_test_unconnected_udp_socket(socket1, socket2); |
| } |
| |
| fn smoke_test_unconnected_udp_socket(mut socket1: UdpSocket, mut socket2: UdpSocket) { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| assert_socket_non_blocking(&socket1); |
| assert_socket_close_on_exec(&socket1); |
| assert_socket_non_blocking(&socket2); |
| assert_socket_close_on_exec(&socket2); |
| |
| let address1 = socket1.local_addr().unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| |
| poll.registry() |
| .register( |
| &mut socket1, |
| ID1, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| poll.registry() |
| .register( |
| &mut socket2, |
| ID2, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::WRITABLE), |
| ExpectEvent::new(ID2, Interest::WRITABLE), |
| ], |
| ); |
| |
| let mut buf = [0; 20]; |
| assert_would_block(socket1.peek_from(&mut buf)); |
| assert_would_block(socket1.recv_from(&mut buf)); |
| |
| checked_write!(socket1.send_to(DATA1, address2)); |
| checked_write!(socket2.send_to(DATA2, address1)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::READABLE), |
| ExpectEvent::new(ID2, Interest::READABLE), |
| ], |
| ); |
| |
| expect_read!(socket1.peek_from(&mut buf), DATA2, address2); |
| expect_read!(socket2.peek_from(&mut buf), DATA1, address1); |
| |
| expect_read!(socket1.recv_from(&mut buf), DATA2, address2); |
| expect_read!(socket2.recv_from(&mut buf), DATA1, address1); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn set_get_ttl() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| // set TTL, get TTL, make sure it has the expected value |
| const TTL: u32 = 10; |
| socket1.set_ttl(TTL).unwrap(); |
| assert_eq!(socket1.ttl().unwrap(), TTL); |
| assert!(socket1.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn get_ttl_without_previous_set() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| // expect a get TTL to work w/o any previous set_ttl |
| socket1.ttl().expect("unable to get TTL for UDP socket"); |
| } |
| |
| #[test] |
| fn set_get_broadcast() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| socket1.set_broadcast(true).unwrap(); |
| assert_eq!(socket1.broadcast().unwrap(), true); |
| |
| socket1.set_broadcast(false).unwrap(); |
| assert_eq!(socket1.broadcast().unwrap(), false); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn get_broadcast_without_previous_set() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| socket1 |
| .broadcast() |
| .expect("unable to get broadcast for UDP socket"); |
| } |
| |
| #[test] |
| fn set_get_multicast_loop_v4() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| socket1.set_multicast_loop_v4(true).unwrap(); |
| assert_eq!(socket1.multicast_loop_v4().unwrap(), true); |
| |
| socket1.set_multicast_loop_v4(false).unwrap(); |
| assert_eq!(socket1.multicast_loop_v4().unwrap(), false); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn get_multicast_loop_v4_without_previous_set() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| socket1 |
| .multicast_loop_v4() |
| .expect("unable to get multicast_loop_v4 for UDP socket"); |
| } |
| |
| #[test] |
| fn set_get_multicast_ttl_v4() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| const TTL: u32 = 10; |
| socket1.set_multicast_ttl_v4(TTL).unwrap(); |
| assert_eq!(socket1.multicast_ttl_v4().unwrap(), TTL); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn get_multicast_ttl_v4_without_previous_set() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| socket1 |
| .multicast_ttl_v4() |
| .expect("unable to get multicast_ttl_v4 for UDP socket"); |
| } |
| |
| #[test] |
| fn set_get_multicast_loop_v6() { |
| let socket1 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); |
| |
| socket1.set_multicast_loop_v6(true).unwrap(); |
| assert_eq!(socket1.multicast_loop_v6().unwrap(), true); |
| |
| socket1.set_multicast_loop_v6(false).unwrap(); |
| assert_eq!(socket1.multicast_loop_v6().unwrap(), false); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn get_multicast_loop_v6_without_previous_set() { |
| let socket1 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); |
| |
| socket1 |
| .multicast_loop_v6() |
| .expect("unable to get multicast_loop_v6 for UDP socket"); |
| } |
| |
| #[test] |
| fn connected_udp_socket_ipv4() { |
| let socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let address1 = socket1.local_addr().unwrap(); |
| |
| let socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| |
| socket1.connect(address2).unwrap(); |
| socket2.connect(address1).unwrap(); |
| |
| smoke_test_connected_udp_socket(socket1, socket2); |
| } |
| |
| #[test] |
| fn connected_udp_socket_ipv6() { |
| let socket1 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); |
| let address1 = socket1.local_addr().unwrap(); |
| |
| let socket2 = UdpSocket::bind(any_local_ipv6_address()).unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| |
| socket1.connect(address2).unwrap(); |
| socket2.connect(address1).unwrap(); |
| |
| smoke_test_connected_udp_socket(socket1, socket2); |
| } |
| |
| #[test] |
| fn connected_udp_socket_std() { |
| let socket1 = net::UdpSocket::bind(any_local_address()).unwrap(); |
| let address1 = socket1.local_addr().unwrap(); |
| |
| let socket2 = net::UdpSocket::bind(any_local_address()).unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| |
| socket1.connect(address2).unwrap(); |
| socket2.connect(address1).unwrap(); |
| |
| // `std::net::UdpSocket`s are blocking by default, so make sure they are |
| // in non-blocking mode before wrapping in a Mio equivalent. |
| socket1.set_nonblocking(true).unwrap(); |
| socket2.set_nonblocking(true).unwrap(); |
| |
| let socket1 = UdpSocket::from_std(socket1); |
| let socket2 = UdpSocket::from_std(socket2); |
| |
| smoke_test_connected_udp_socket(socket1, socket2); |
| } |
| |
| fn smoke_test_connected_udp_socket(mut socket1: UdpSocket, mut socket2: UdpSocket) { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| assert_socket_non_blocking(&socket1); |
| assert_socket_close_on_exec(&socket1); |
| assert_socket_non_blocking(&socket2); |
| assert_socket_close_on_exec(&socket2); |
| |
| poll.registry() |
| .register( |
| &mut socket1, |
| ID1, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| poll.registry() |
| .register( |
| &mut socket2, |
| ID2, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::WRITABLE), |
| ExpectEvent::new(ID2, Interest::WRITABLE), |
| ], |
| ); |
| |
| let mut buf = [0; 20]; |
| assert_would_block(socket1.peek(&mut buf)); |
| assert_would_block(socket1.recv(&mut buf)); |
| |
| checked_write!(socket1.send(DATA1)); |
| checked_write!(socket2.send(DATA2)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::READABLE), |
| ExpectEvent::new(ID2, Interest::READABLE), |
| ], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket1.peek(&mut buf), DATA2); |
| expect_read!(socket2.peek(&mut buf), DATA1); |
| |
| expect_read!(socket1.recv(&mut buf), DATA2); |
| expect_read!(socket2.recv(&mut buf), DATA1); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn reconnect_udp_socket_sending() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket3 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let address1 = socket1.local_addr().unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| let address3 = socket3.local_addr().unwrap(); |
| |
| socket1.connect(address2).unwrap(); |
| socket2.connect(address1).unwrap(); |
| socket3.connect(address1).unwrap(); |
| |
| poll.registry() |
| .register( |
| &mut socket1, |
| ID1, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket2, ID2, Interest::READABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket3, ID3, Interest::READABLE) |
| .unwrap(); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::WRITABLE)], |
| ); |
| |
| checked_write!(socket1.send(DATA1)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket2.recv(&mut buf), DATA1); |
| |
| socket1.connect(address3).unwrap(); |
| checked_write!(socket1.send(DATA2)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID3, Interest::READABLE)], |
| ); |
| |
| expect_read!(socket3.recv(&mut buf), DATA2); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| assert!(socket3.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn reconnect_udp_socket_receiving() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket3 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let address1 = socket1.local_addr().unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| let address3 = socket3.local_addr().unwrap(); |
| |
| socket1.connect(address2).unwrap(); |
| socket2.connect(address1).unwrap(); |
| socket3.connect(address1).unwrap(); |
| |
| poll.registry() |
| .register(&mut socket1, ID1, Interest::READABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket2, ID2, Interest::WRITABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket3, ID3, Interest::WRITABLE) |
| .unwrap(); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID2, Interest::WRITABLE), |
| ExpectEvent::new(ID3, Interest::WRITABLE), |
| ], |
| ); |
| |
| checked_write!(socket2.send(DATA1)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::READABLE)], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket1.recv(&mut buf), DATA1); |
| |
| //this will reregister socket1 resetting the interests |
| assert_would_block(socket1.recv(&mut buf)); |
| |
| socket1.connect(address3).unwrap(); |
| |
| checked_write!(socket3.send(DATA2)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::READABLE)], |
| ); |
| |
| // Read all data. |
| // On Windows, reading part of data returns error WSAEMSGSIZE (10040). |
| expect_read!(socket1.recv(&mut buf), DATA2); |
| |
| //this will reregister socket1 resetting the interests |
| assert_would_block(socket1.recv(&mut buf)); |
| |
| // Now connect back to socket 2. |
| socket1.connect(address2).unwrap(); |
| |
| checked_write!(socket2.send(DATA2)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::READABLE)], |
| ); |
| |
| expect_read!(socket1.recv(&mut buf), DATA2); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| assert!(socket3.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn unconnected_udp_socket_connected_methods() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| |
| poll.registry() |
| .register(&mut socket1, ID1, Interest::WRITABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket2, ID2, Interest::READABLE) |
| .unwrap(); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::WRITABLE)], |
| ); |
| |
| // Socket is unconnected, but we're using an connected method. |
| if cfg!(not(target_os = "windows")) { |
| assert_error(socket1.send(DATA1), "address required"); |
| } |
| if cfg!(target_os = "windows") { |
| assert_error( |
| socket1.send(DATA1), |
| "no address was supplied. (os error 10057)", |
| ); |
| } |
| |
| // Now send some actual data. |
| checked_write!(socket1.send_to(DATA1, address2)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| // Receive methods don't require the socket to be connected, you just won't |
| // know the sender. |
| let mut buf = [0; 20]; |
| expect_read!(socket2.peek(&mut buf), DATA1); |
| expect_read!(socket2.recv(&mut buf), DATA1); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| } |
| |
| #[test] |
| fn connected_udp_socket_unconnected_methods() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket3 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let address2 = socket2.local_addr().unwrap(); |
| let address3 = socket3.local_addr().unwrap(); |
| |
| socket1.connect(address3).unwrap(); |
| socket3.connect(address2).unwrap(); |
| |
| poll.registry() |
| .register(&mut socket1, ID1, Interest::WRITABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket2, ID2, Interest::WRITABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut socket3, ID3, Interest::READABLE) |
| .unwrap(); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::WRITABLE), |
| ExpectEvent::new(ID2, Interest::WRITABLE), |
| ], |
| ); |
| |
| // Can't use `send_to`. |
| // Linux (and Android) and Windows actually allow `send_to` even if the |
| // socket is connected. |
| #[cfg(not(any(target_os = "android", target_os = "linux", target_os = "windows")))] |
| assert_error(socket1.send_to(DATA1, address2), "already connected"); |
| // Even if the address is the same. |
| #[cfg(not(any(target_os = "android", target_os = "linux", target_os = "windows")))] |
| assert_error(socket1.send_to(DATA1, address3), "already connected"); |
| |
| checked_write!(socket2.send_to(DATA2, address3)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID3, Interest::READABLE)], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket3.peek_from(&mut buf), DATA2, address2); |
| expect_read!(socket3.recv_from(&mut buf), DATA2, address2); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| assert!(socket3.take_error().unwrap().is_none()); |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn udp_socket_raw_fd() { |
| init(); |
| |
| let socket = UdpSocket::bind(any_local_address()).unwrap(); |
| let address = socket.local_addr().unwrap(); |
| |
| let raw_fd1 = socket.as_raw_fd(); |
| let raw_fd2 = socket.into_raw_fd(); |
| assert_eq!(raw_fd1, raw_fd2); |
| |
| let socket = unsafe { UdpSocket::from_raw_fd(raw_fd2) }; |
| assert_eq!(socket.as_raw_fd(), raw_fd1); |
| assert_eq!(socket.local_addr().unwrap(), address); |
| } |
| |
| #[test] |
| fn udp_socket_register() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket = UdpSocket::bind(any_local_address()).unwrap(); |
| poll.registry() |
| .register(&mut socket, ID1, Interest::READABLE) |
| .expect("unable to register UDP socket"); |
| |
| expect_no_events(&mut poll, &mut events); |
| |
| // NOTE: more tests are done in the smoke tests above. |
| } |
| |
| #[test] |
| fn udp_socket_reregister() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket = UdpSocket::bind(any_local_address()).unwrap(); |
| let address = socket.local_addr().unwrap(); |
| |
| let barrier = Arc::new(Barrier::new(2)); |
| let thread_handle = send_packets(address, 1, barrier.clone()); |
| |
| poll.registry() |
| .register(&mut socket, ID1, Interest::WRITABLE) |
| .unwrap(); |
| // Let the first packet be send. |
| barrier.wait(); |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID1, Interest::WRITABLE)], // Not readable! |
| ); |
| |
| poll.registry() |
| .reregister(&mut socket, ID2, Interest::READABLE) |
| .unwrap(); |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket.recv_from(&mut buf), DATA1, __anywhere); |
| |
| thread_handle.join().expect("unable to join thread"); |
| } |
| |
| #[test] |
| fn udp_socket_no_events_after_deregister() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket = UdpSocket::bind(any_local_address()).unwrap(); |
| let address = socket.local_addr().unwrap(); |
| |
| let barrier = Arc::new(Barrier::new(2)); |
| let thread_handle = send_packets(address, 1, barrier.clone()); |
| |
| poll.registry() |
| .register(&mut socket, ID1, Interest::READABLE) |
| .unwrap(); |
| |
| // Let the packet be send. |
| barrier.wait(); |
| |
| poll.registry().deregister(&mut socket).unwrap(); |
| |
| expect_no_events(&mut poll, &mut events); |
| |
| // But we do expect a packet to be send. |
| let mut buf = [0; 20]; |
| expect_read!(socket.recv_from(&mut buf), DATA1, __anywhere); |
| |
| thread_handle.join().expect("unable to join thread"); |
| } |
| |
| /// Sends `n_packets` packets to `address`, over UDP, after the `barrier` is |
| /// waited (before each send) on in another thread. |
| fn send_packets( |
| address: SocketAddr, |
| n_packets: usize, |
| barrier: Arc<Barrier>, |
| ) -> thread::JoinHandle<()> { |
| thread::spawn(move || { |
| let socket = net::UdpSocket::bind(any_local_address()).unwrap(); |
| for _ in 0..n_packets { |
| barrier.wait(); |
| checked_write!(socket.send_to(DATA1, address)); |
| } |
| }) |
| } |
| |
| pub struct UdpHandlerSendRecv { |
| tx: UdpSocket, |
| rx: UdpSocket, |
| msg: &'static str, |
| buf: Vec<u8>, |
| rx_buf: Vec<u8>, |
| connected: bool, |
| shutdown: bool, |
| } |
| |
| impl UdpHandlerSendRecv { |
| fn new(tx: UdpSocket, rx: UdpSocket, connected: bool, msg: &'static str) -> UdpHandlerSendRecv { |
| UdpHandlerSendRecv { |
| tx, |
| rx, |
| msg, |
| buf: msg.as_bytes().to_vec(), |
| rx_buf: vec![0; 1024], |
| connected, |
| shutdown: false, |
| } |
| } |
| } |
| |
| fn send_recv_udp(mut tx: UdpSocket, mut rx: UdpSocket, connected: bool) { |
| init(); |
| |
| debug!("Starting TEST_UDP_SOCKETS"); |
| let mut poll = Poll::new().unwrap(); |
| |
| // ensure that the sockets are non-blocking |
| let mut buf = [0; 128]; |
| assert_would_block(rx.recv_from(&mut buf)); |
| |
| info!("Registering SENDER"); |
| poll.registry() |
| .register(&mut tx, SENDER, Interest::WRITABLE) |
| .unwrap(); |
| |
| info!("Registering LISTENER"); |
| poll.registry() |
| .register(&mut rx, LISTENER, Interest::READABLE) |
| .unwrap(); |
| |
| let mut events = Events::with_capacity(1024); |
| |
| info!("Starting event loop to test with..."); |
| let mut handler = UdpHandlerSendRecv::new(tx, rx, connected, "hello world"); |
| |
| while !handler.shutdown { |
| poll.poll(&mut events, None).unwrap(); |
| |
| for event in &events { |
| if event.is_readable() { |
| if let LISTENER = event.token() { |
| debug!("We are receiving a datagram now..."); |
| let cnt = if !handler.connected { |
| handler.rx.recv_from(&mut handler.rx_buf).unwrap().0 |
| } else { |
| handler.rx.recv(&mut handler.rx_buf).unwrap() |
| }; |
| |
| unsafe { handler.rx_buf.set_len(cnt) }; |
| assert_eq!( |
| str::from_utf8(handler.rx_buf.as_ref()).unwrap(), |
| handler.msg |
| ); |
| handler.shutdown = true; |
| } |
| } |
| |
| if event.is_writable() { |
| if let SENDER = event.token() { |
| let cnt = if !handler.connected { |
| let addr = handler.rx.local_addr().unwrap(); |
| handler.tx.send_to(&handler.buf, addr).unwrap() |
| } else { |
| handler.tx.send(&handler.buf).unwrap() |
| }; |
| |
| // Advance the buffer. |
| drop(handler.buf.drain(..cnt)); |
| } |
| } |
| } |
| } |
| } |
| |
| /// Returns the sender and the receiver |
| fn connected_sockets() -> (UdpSocket, UdpSocket) { |
| let tx = UdpSocket::bind(any_local_address()).unwrap(); |
| let rx = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let tx_addr = tx.local_addr().unwrap(); |
| let rx_addr = rx.local_addr().unwrap(); |
| |
| assert!(tx.connect(rx_addr).is_ok()); |
| assert!(rx.connect(tx_addr).is_ok()); |
| |
| (tx, rx) |
| } |
| |
| #[test] |
| pub fn udp_socket() { |
| init(); |
| |
| let tx = UdpSocket::bind(any_local_address()).unwrap(); |
| let rx = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| send_recv_udp(tx, rx, false); |
| } |
| |
| #[test] |
| pub fn udp_socket_send_recv() { |
| init(); |
| |
| let (tx, rx) = connected_sockets(); |
| |
| send_recv_udp(tx, rx, true); |
| } |
| |
| #[test] |
| pub fn udp_socket_discard() { |
| init(); |
| |
| let mut tx = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut rx = UdpSocket::bind(any_local_address()).unwrap(); |
| let udp_outside = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let tx_addr = tx.local_addr().unwrap(); |
| let rx_addr = rx.local_addr().unwrap(); |
| |
| assert!(tx.connect(rx_addr).is_ok()); |
| assert!(udp_outside.connect(rx_addr).is_ok()); |
| assert!(rx.connect(tx_addr).is_ok()); |
| |
| let mut poll = Poll::new().unwrap(); |
| |
| checked_write!(udp_outside.send(b"hello world")); |
| |
| poll.registry() |
| .register(&mut rx, LISTENER, Interest::READABLE) |
| .unwrap(); |
| poll.registry() |
| .register(&mut tx, SENDER, Interest::WRITABLE) |
| .unwrap(); |
| |
| let mut events = Events::with_capacity(1024); |
| |
| poll.poll(&mut events, Some(Duration::from_secs(5))) |
| .unwrap(); |
| |
| for event in &events { |
| if event.is_readable() { |
| if let LISTENER = event.token() { |
| panic!("Expected to no receive a packet but got something") |
| } |
| } |
| } |
| } |
| |
| pub struct UdpHandler { |
| tx: UdpSocket, |
| rx: UdpSocket, |
| msg: &'static str, |
| buf: Vec<u8>, |
| rx_buf: Vec<u8>, |
| localhost: IpAddr, |
| shutdown: bool, |
| } |
| |
| impl UdpHandler { |
| fn new(tx: UdpSocket, rx: UdpSocket, msg: &'static str) -> UdpHandler { |
| let sock = UdpSocket::bind(any_local_address()).unwrap(); |
| UdpHandler { |
| tx, |
| rx, |
| msg, |
| buf: msg.as_bytes().to_vec(), |
| rx_buf: Vec::with_capacity(1024), |
| localhost: sock.local_addr().unwrap().ip(), |
| shutdown: false, |
| } |
| } |
| |
| fn handle_read(&mut self, _: &Registry, token: Token) { |
| if let LISTENER = token { |
| debug!("We are receiving a datagram now..."); |
| unsafe { self.rx_buf.set_len(self.rx_buf.capacity()) }; |
| match self.rx.recv_from(&mut self.rx_buf) { |
| Ok((cnt, addr)) => { |
| unsafe { self.rx_buf.set_len(cnt) }; |
| assert_eq!(addr.ip(), self.localhost); |
| } |
| res => panic!("unexpected result: {:?}", res), |
| } |
| assert_eq!(str::from_utf8(&self.rx_buf).unwrap(), self.msg); |
| self.shutdown = true; |
| } |
| } |
| |
| fn handle_write(&mut self, _: &Registry, token: Token) { |
| if let SENDER = token { |
| let addr = self.rx.local_addr().unwrap(); |
| let cnt = self.tx.send_to(self.buf.as_ref(), addr).unwrap(); |
| self.buf.drain(..cnt); |
| } |
| } |
| } |
| |
| // TODO: This doesn't pass on android 64bit CI... |
| // Figure out why! |
| #[cfg_attr( |
| target_os = "android", |
| ignore = "Multicast doesn't work on Android 64bit" |
| )] |
| #[test] |
| pub fn multicast() { |
| init(); |
| |
| debug!("Starting TEST_UDP_CONNECTIONLESS"); |
| let mut poll = Poll::new().unwrap(); |
| |
| let mut tx = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut rx = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| info!("Joining group 227.1.1.100"); |
| let any = &"0.0.0.0".parse().unwrap(); |
| rx.join_multicast_v4(&"227.1.1.100".parse().unwrap(), any) |
| .unwrap(); |
| |
| info!("Joining group 227.1.1.101"); |
| rx.join_multicast_v4(&"227.1.1.101".parse().unwrap(), any) |
| .unwrap(); |
| |
| info!("Registering SENDER"); |
| poll.registry() |
| .register(&mut tx, SENDER, Interest::WRITABLE) |
| .unwrap(); |
| |
| info!("Registering LISTENER"); |
| poll.registry() |
| .register(&mut rx, LISTENER, Interest::READABLE) |
| .unwrap(); |
| |
| let mut events = Events::with_capacity(1024); |
| |
| let mut handler = UdpHandler::new(tx, rx, "hello world"); |
| |
| info!("Starting event loop to test with..."); |
| |
| while !handler.shutdown { |
| poll.poll(&mut events, None).unwrap(); |
| |
| for event in &events { |
| if event.is_readable() { |
| handler.handle_read(poll.registry(), event.token()); |
| } |
| |
| if event.is_writable() { |
| handler.handle_write(poll.registry(), event.token()); |
| } |
| } |
| } |
| } |
| |
| #[test] |
| fn et_behavior_recv() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let address2 = socket2.local_addr().unwrap(); |
| |
| poll.registry() |
| .register(&mut socket1, ID1, Interest::WRITABLE) |
| .expect("unable to register UDP socket"); |
| poll.registry() |
| .register( |
| &mut socket2, |
| ID2, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::WRITABLE), |
| ExpectEvent::new(ID2, Interest::WRITABLE), |
| ], |
| ); |
| |
| socket1.connect(address2).unwrap(); |
| |
| let mut buf = [0; 20]; |
| checked_write!(socket1.send(DATA1)); |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| expect_read!(socket2.recv(&mut buf), DATA1); |
| |
| // this will reregister the socket2, resetting the interests |
| assert_would_block(socket2.recv(&mut buf)); |
| checked_write!(socket1.send(DATA1)); |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket2.recv(&mut buf), DATA1); |
| } |
| |
| #[test] |
| fn et_behavior_recv_from() { |
| let (mut poll, mut events) = init_with_poll(); |
| |
| let mut socket1 = UdpSocket::bind(any_local_address()).unwrap(); |
| let mut socket2 = UdpSocket::bind(any_local_address()).unwrap(); |
| |
| let address1 = socket1.local_addr().unwrap(); |
| let address2 = socket2.local_addr().unwrap(); |
| |
| poll.registry() |
| .register( |
| &mut socket1, |
| ID1, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| poll.registry() |
| .register( |
| &mut socket2, |
| ID2, |
| Interest::READABLE.add(Interest::WRITABLE), |
| ) |
| .expect("unable to register UDP socket"); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ |
| ExpectEvent::new(ID1, Interest::WRITABLE), |
| ExpectEvent::new(ID2, Interest::WRITABLE), |
| ], |
| ); |
| |
| checked_write!(socket1.send_to(DATA1, address2)); |
| |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| let mut buf = [0; 20]; |
| expect_read!(socket2.recv_from(&mut buf), DATA1, address1); |
| |
| // this will reregister the socket2, resetting the interests |
| assert_would_block(socket2.recv_from(&mut buf)); |
| checked_write!(socket1.send_to(DATA1, address2)); |
| expect_events( |
| &mut poll, |
| &mut events, |
| vec![ExpectEvent::new(ID2, Interest::READABLE)], |
| ); |
| |
| expect_read!(socket2.recv_from(&mut buf), DATA1, address1); |
| |
| assert!(socket1.take_error().unwrap().is_none()); |
| assert!(socket2.take_error().unwrap().is_none()); |
| } |