| use std::future::Future; |
| use std::io; |
| use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket}; |
| #[cfg(unix)] |
| use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream}; |
| use std::sync::Arc; |
| use std::thread; |
| use std::time::Duration; |
| |
| use async_io::{Async, Timer}; |
| use futures_lite::{future, prelude::*}; |
| #[cfg(unix)] |
| use tempfile::tempdir; |
| |
| const LOREM_IPSUM: &[u8] = b" |
| Lorem ipsum dolor sit amet, consectetur adipiscing elit. |
| Donec pretium ante erat, vitae sodales mi varius quis. |
| Etiam vestibulum lorem vel urna tempor, eu fermentum odio aliquam. |
| Aliquam consequat urna vitae ipsum pulvinar, in blandit purus eleifend. |
| "; |
| |
| fn spawn<T: Send + 'static>( |
| f: impl Future<Output = T> + Send + 'static, |
| ) -> impl Future<Output = T> + Send + 'static { |
| let (s, r) = async_channel::bounded(1); |
| |
| thread::spawn(move || { |
| future::block_on(async { |
| s.send(f.await).await.ok(); |
| }) |
| }); |
| |
| Box::pin(async move { r.recv().await.unwrap() }) |
| } |
| |
| #[test] |
| fn tcp_connect() -> io::Result<()> { |
| future::block_on(async { |
| let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?; |
| let addr = listener.get_ref().local_addr()?; |
| let task = spawn(async move { listener.accept().await }); |
| |
| let stream2 = Async::<TcpStream>::connect(addr).await?; |
| let stream1 = task.await?.0; |
| |
| assert_eq!( |
| stream1.get_ref().peer_addr()?, |
| stream2.get_ref().local_addr()?, |
| ); |
| assert_eq!( |
| stream2.get_ref().peer_addr()?, |
| stream1.get_ref().local_addr()?, |
| ); |
| |
| // Now that the listener is closed, connect should fail. |
| let err = Async::<TcpStream>::connect(addr).await.unwrap_err(); |
| assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[test] |
| fn tcp_peek_read() -> io::Result<()> { |
| future::block_on(async { |
| let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?; |
| let addr = listener.get_ref().local_addr()?; |
| |
| let mut stream = Async::<TcpStream>::connect(addr).await?; |
| stream.write_all(LOREM_IPSUM).await?; |
| |
| let mut buf = [0; 1024]; |
| let mut incoming = Box::pin(listener.incoming()); |
| let mut stream = incoming.next().await.unwrap()?; |
| |
| let n = stream.peek(&mut buf).await?; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| let n = stream.read(&mut buf).await?; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[test] |
| fn tcp_reader_hangup() -> io::Result<()> { |
| future::block_on(async { |
| let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?; |
| let addr = listener.get_ref().local_addr()?; |
| let task = spawn(async move { listener.accept().await }); |
| |
| let mut stream2 = Async::<TcpStream>::connect(addr).await?; |
| let stream1 = task.await?.0; |
| |
| let task = spawn(async move { |
| Timer::after(Duration::from_secs(1)).await; |
| drop(stream1); |
| }); |
| |
| while stream2.write_all(LOREM_IPSUM).await.is_ok() {} |
| task.await; |
| |
| Ok(()) |
| }) |
| } |
| |
| #[test] |
| fn tcp_writer_hangup() -> io::Result<()> { |
| future::block_on(async { |
| let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?; |
| let addr = listener.get_ref().local_addr()?; |
| let task = spawn(async move { listener.accept().await }); |
| |
| let mut stream2 = Async::<TcpStream>::connect(addr).await?; |
| let stream1 = task.await?.0; |
| |
| let task = spawn(async move { |
| Timer::after(Duration::from_secs(1)).await; |
| drop(stream1); |
| }); |
| |
| let mut v = vec![]; |
| stream2.read_to_end(&mut v).await?; |
| assert!(v.is_empty()); |
| |
| task.await; |
| Ok(()) |
| }) |
| } |
| |
| #[test] |
| fn udp_send_recv() -> io::Result<()> { |
| future::block_on(async { |
| let socket1 = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?; |
| let socket2 = Async::<UdpSocket>::bind(([127, 0, 0, 1], 0))?; |
| socket1.get_ref().connect(socket2.get_ref().local_addr()?)?; |
| |
| let mut buf = [0u8; 1024]; |
| |
| socket1.send(LOREM_IPSUM).await?; |
| let n = socket2.peek(&mut buf).await?; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| let n = socket2.recv(&mut buf).await?; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| |
| socket2 |
| .send_to(LOREM_IPSUM, socket1.get_ref().local_addr()?) |
| .await?; |
| let n = socket1.peek_from(&mut buf).await?.0; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| let n = socket1.recv_from(&mut buf).await?.0; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn udp_connect() -> io::Result<()> { |
| future::block_on(async { |
| let dir = tempdir()?; |
| let path = dir.path().join("socket"); |
| |
| let listener = Async::<UnixListener>::bind(&path)?; |
| |
| let mut stream = Async::<UnixStream>::connect(&path).await?; |
| stream.write_all(LOREM_IPSUM).await?; |
| |
| let mut buf = [0; 1024]; |
| let mut incoming = Box::pin(listener.incoming()); |
| let mut stream = incoming.next().await.unwrap()?; |
| |
| let n = stream.read(&mut buf).await?; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn uds_connect() -> io::Result<()> { |
| future::block_on(async { |
| let dir = tempdir()?; |
| let path = dir.path().join("socket"); |
| let listener = Async::<UnixListener>::bind(&path)?; |
| |
| let addr = listener.get_ref().local_addr()?; |
| let task = spawn(async move { listener.accept().await }); |
| |
| let stream2 = Async::<UnixStream>::connect(addr.as_pathname().unwrap()).await?; |
| let stream1 = task.await?.0; |
| |
| assert_eq!( |
| stream1.get_ref().peer_addr()?.as_pathname(), |
| stream2.get_ref().local_addr()?.as_pathname(), |
| ); |
| assert_eq!( |
| stream2.get_ref().peer_addr()?.as_pathname(), |
| stream1.get_ref().local_addr()?.as_pathname(), |
| ); |
| |
| // Now that the listener is closed, connect should fail. |
| let err = Async::<UnixStream>::connect(addr.as_pathname().unwrap()) |
| .await |
| .unwrap_err(); |
| assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn uds_send_recv() -> io::Result<()> { |
| future::block_on(async { |
| let (socket1, socket2) = Async::<UnixDatagram>::pair()?; |
| |
| socket1.send(LOREM_IPSUM).await?; |
| let mut buf = [0; 1024]; |
| let n = socket2.recv(&mut buf).await?; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn uds_send_to_recv_from() -> io::Result<()> { |
| future::block_on(async { |
| let dir = tempdir()?; |
| let path = dir.path().join("socket"); |
| let socket1 = Async::<UnixDatagram>::bind(&path)?; |
| let socket2 = Async::<UnixDatagram>::unbound()?; |
| |
| socket2.send_to(LOREM_IPSUM, &path).await?; |
| let mut buf = [0; 1024]; |
| let n = socket1.recv_from(&mut buf).await?.0; |
| assert_eq!(&buf[..n], LOREM_IPSUM); |
| |
| Ok(()) |
| }) |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn uds_reader_hangup() -> io::Result<()> { |
| future::block_on(async { |
| let (socket1, mut socket2) = Async::<UnixStream>::pair()?; |
| |
| let task = spawn(async move { |
| Timer::after(Duration::from_secs(1)).await; |
| drop(socket1); |
| }); |
| |
| while socket2.write_all(LOREM_IPSUM).await.is_ok() {} |
| task.await; |
| |
| Ok(()) |
| }) |
| } |
| |
| #[cfg(unix)] |
| #[test] |
| fn uds_writer_hangup() -> io::Result<()> { |
| future::block_on(async { |
| let (socket1, mut socket2) = Async::<UnixStream>::pair()?; |
| |
| let task = spawn(async move { |
| Timer::after(Duration::from_secs(1)).await; |
| drop(socket1); |
| }); |
| |
| let mut v = vec![]; |
| socket2.read_to_end(&mut v).await?; |
| assert!(v.is_empty()); |
| |
| task.await; |
| Ok(()) |
| }) |
| } |
| |
| // Test that we correctly re-register interests after we've previously been |
| // interested in both readable and writable events and then we get only one of |
| // those (we need to re-register interest on the other). |
| #[test] |
| fn tcp_duplex() -> io::Result<()> { |
| future::block_on(async { |
| let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?; |
| let stream1 = |
| Arc::new(Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?); |
| let stream2 = Arc::new(listener.accept().await?.0); |
| |
| async fn do_read(s: Arc<Async<TcpStream>>) -> io::Result<()> { |
| let mut buf = vec![0u8; 4096]; |
| loop { |
| let len = (&*s).read(&mut buf).await?; |
| if len == 0 { |
| return Ok(()); |
| } |
| } |
| } |
| |
| async fn do_write(s: Arc<Async<TcpStream>>) -> io::Result<()> { |
| let buf = vec![0u8; 4096]; |
| for _ in 0..4096 { |
| (&*s).write_all(&buf).await?; |
| } |
| s.get_ref().shutdown(Shutdown::Write)?; |
| Ok(()) |
| } |
| |
| // Read from and write to stream1. |
| let r1 = spawn(do_read(stream1.clone())); |
| let w1 = spawn(do_write(stream1)); |
| |
| // Sleep a bit, so that reading and writing are both blocked. |
| Timer::after(Duration::from_millis(5)).await; |
| |
| // Start reading stream2, make stream1 writable. |
| let r2 = spawn(do_read(stream2.clone())); |
| |
| // Finish writing to stream1. |
| w1.await?; |
| r2.await?; |
| |
| // Start writing to stream2, make stream1 readable. |
| let w2 = spawn(do_write(stream2)); |
| |
| // Will r1 be correctly woken? |
| r1.await?; |
| w2.await?; |
| |
| Ok(()) |
| }) |
| } |
| |
| #[test] |
| fn shutdown() -> io::Result<()> { |
| future::block_on(async { |
| let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?; |
| let addr = listener.get_ref().local_addr()?; |
| let ((mut reader, _), writer) = |
| future::try_zip(listener.accept(), Async::<TcpStream>::connect(addr)).await?; |
| |
| // The writer must be closed in order for `read_to_end()` to finish. |
| let mut buf = Vec::new(); |
| future::try_zip(reader.read_to_end(&mut buf), async { |
| writer.get_ref().shutdown(Shutdown::Write) |
| }) |
| .await?; |
| |
| Ok(()) |
| }) |
| } |