| use std::io; |
| use std::mem; |
| use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; |
| use std::fmt; |
| |
| use futures::{Async, Future, Poll}; |
| use mio; |
| |
| use reactor::{Handle, PollEvented}; |
| |
| /// An I/O object representing a UDP socket. |
| pub struct UdpSocket { |
| io: PollEvented<mio::net::UdpSocket>, |
| } |
| |
| mod frame; |
| pub use self::frame::{UdpFramed, UdpCodec}; |
| |
| impl UdpSocket { |
| /// Create a new UDP socket bound to the specified address. |
| /// |
| /// This function will create a new UDP socket and attempt to bind it to the |
| /// `addr` provided. If the result is `Ok`, the socket has successfully bound. |
| pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<UdpSocket> { |
| let udp = try!(mio::net::UdpSocket::bind(addr)); |
| UdpSocket::new(udp, handle) |
| } |
| |
| fn new(socket: mio::net::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> { |
| let io = try!(PollEvented::new(socket, handle)); |
| Ok(UdpSocket { io: io }) |
| } |
| |
| /// Creates a new `UdpSocket` from the previously bound socket provided. |
| /// |
| /// The socket given will be registered with the event loop that `handle` is |
| /// associated with. This function requires that `socket` has previously |
| /// been bound to an address to work correctly. |
| /// |
| /// This can be used in conjunction with net2's `UdpBuilder` interface to |
| /// configure a socket before it's handed off, such as setting options like |
| /// `reuse_address` or binding to multiple addresses. |
| pub fn from_socket(socket: net::UdpSocket, |
| handle: &Handle) -> io::Result<UdpSocket> { |
| let udp = try!(mio::net::UdpSocket::from_socket(socket)); |
| UdpSocket::new(udp, handle) |
| } |
| |
| /// Provides a `Stream` and `Sink` interface for reading and writing to this |
| /// `UdpSocket` object, using the provided `UdpCodec` to read and write the |
| /// raw data. |
| /// |
| /// Raw UDP sockets work with datagrams, but higher-level code usually |
| /// wants to batch these into meaningful chunks, called "frames". This |
| /// method layers framing on top of this socket by using the `UdpCodec` |
| /// trait to handle encoding and decoding of messages frames. Note that |
| /// the incoming and outgoing frame types may be distinct. |
| /// |
| /// This function returns a *single* object that is both `Stream` and |
| /// `Sink`; grouping this into a single object is often useful for layering |
| /// things which require both read and write access to the underlying |
| /// object. |
| /// |
| /// If you want to work more directly with the streams and sink, consider |
| /// calling `split` on the `UdpFramed` returned by this method, which will |
| /// break them into separate objects, allowing them to interact more |
| /// easily. |
| pub fn framed<C: UdpCodec>(self, codec: C) -> UdpFramed<C> { |
| frame::new(self, codec) |
| } |
| |
| /// Returns the local address that this stream is bound to. |
| pub fn local_addr(&self) -> io::Result<SocketAddr> { |
| self.io.get_ref().local_addr() |
| } |
| |
| /// Test whether this socket is ready to be read or not. |
| /// |
| /// If the socket is *not* readable then the current task is scheduled to |
| /// get a notification when the socket does become readable. That is, this |
| /// is only suitable for calling in a `Future::poll` method and will |
| /// automatically handle ensuring a retry once the socket is readable again. |
| pub fn poll_read(&self) -> Async<()> { |
| self.io.poll_read() |
| } |
| |
| /// Test whether this socket is ready to be written to or not. |
| /// |
| /// If the socket is *not* writable then the current task is scheduled to |
| /// get a notification when the socket does become writable. That is, this |
| /// is only suitable for calling in a `Future::poll` method and will |
| /// automatically handle ensuring a retry once the socket is writable again. |
| pub fn poll_write(&self) -> Async<()> { |
| self.io.poll_write() |
| } |
| |
| /// Sends data on the socket to the given address. On success, returns the |
| /// number of bytes written. |
| /// |
| /// Address type can be any implementor of `ToSocketAddrs` trait. See its |
| /// documentation for concrete examples. |
| pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { |
| if let Async::NotReady = self.io.poll_write() { |
| return Err(::would_block()) |
| } |
| match self.io.get_ref().send_to(buf, target) { |
| Ok(n) => Ok(n), |
| Err(e) => { |
| if e.kind() == io::ErrorKind::WouldBlock { |
| self.io.need_write(); |
| } |
| Err(e) |
| } |
| } |
| } |
| |
| /// Creates a future that will write the entire contents of the buffer |
| /// `buf` provided as a datagram to this socket. |
| /// |
| /// The returned future will return after data has been written to the |
| /// outbound socket. The future will resolve to the stream as well as the |
| /// buffer (for reuse if needed). |
| /// |
| /// Any error which happens during writing will cause both the stream and |
| /// the buffer to get destroyed. Note that failure to write the entire |
| /// buffer is considered an error for the purposes of sending a datagram. |
| /// |
| /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which |
| /// should be broadly applicable to accepting data which can be converted |
| /// to a slice. The `Window` struct is also available in this crate to |
| /// provide a different window into a slice if necessary. |
| pub fn send_dgram<T>(self, buf: T, addr: SocketAddr) -> SendDgram<T> |
| where T: AsRef<[u8]>, |
| { |
| SendDgram { |
| state: SendState::Writing { |
| sock: self, |
| addr: addr, |
| buf: buf, |
| }, |
| } |
| } |
| |
| /// Receives data from the socket. On success, returns the number of bytes |
| /// read and the address from whence the data came. |
| pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { |
| if let Async::NotReady = self.io.poll_read() { |
| return Err(::would_block()) |
| } |
| match self.io.get_ref().recv_from(buf) { |
| Ok(n) => Ok(n), |
| Err(e) => { |
| if e.kind() == io::ErrorKind::WouldBlock { |
| self.io.need_read(); |
| } |
| Err(e) |
| } |
| } |
| } |
| |
| /// Creates a future that receive a datagram to be written to the buffer |
| /// provided. |
| /// |
| /// The returned future will return after a datagram has been received on |
| /// this socket. The future will resolve to the socket, the buffer, the |
| /// amount of data read, and the address the data was received from. |
| /// |
| /// An error during reading will cause the socket and buffer to get |
| /// destroyed and the socket will be returned. |
| /// |
| /// The `buf` parameter here only requires the `AsMut<[u8]>` trait, which |
| /// should be broadly applicable to accepting data which can be converted |
| /// to a slice. The `Window` struct is also available in this crate to |
| /// provide a different window into a slice if necessary. |
| pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T> |
| where T: AsMut<[u8]>, |
| { |
| RecvDgram { |
| state: RecvState::Reading { |
| sock: self, |
| buf: buf, |
| }, |
| } |
| } |
| |
| /// Gets the value of the `SO_BROADCAST` option for this socket. |
| /// |
| /// For more information about this option, see |
| /// [`set_broadcast`][link]. |
| /// |
| /// [link]: #method.set_broadcast |
| pub fn broadcast(&self) -> io::Result<bool> { |
| self.io.get_ref().broadcast() |
| } |
| |
| /// Sets the value of the `SO_BROADCAST` option for this socket. |
| /// |
| /// When enabled, this socket is allowed to send packets to a broadcast |
| /// address. |
| pub fn set_broadcast(&self, on: bool) -> io::Result<()> { |
| self.io.get_ref().set_broadcast(on) |
| } |
| |
| /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. |
| /// |
| /// For more information about this option, see |
| /// [`set_multicast_loop_v4`][link]. |
| /// |
| /// [link]: #method.set_multicast_loop_v4 |
| pub fn multicast_loop_v4(&self) -> io::Result<bool> { |
| self.io.get_ref().multicast_loop_v4() |
| } |
| |
| /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. |
| /// |
| /// If enabled, multicast packets will be looped back to the local socket. |
| /// Note that this may not have any affect on IPv6 sockets. |
| pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { |
| self.io.get_ref().set_multicast_loop_v4(on) |
| } |
| |
| /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. |
| /// |
| /// For more information about this option, see |
| /// [`set_multicast_ttl_v4`][link]. |
| /// |
| /// [link]: #method.set_multicast_ttl_v4 |
| pub fn multicast_ttl_v4(&self) -> io::Result<u32> { |
| self.io.get_ref().multicast_ttl_v4() |
| } |
| |
| /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. |
| /// |
| /// Indicates the time-to-live value of outgoing multicast packets for |
| /// this socket. The default value is 1 which means that multicast packets |
| /// don't leave the local network unless explicitly requested. |
| /// |
| /// Note that this may not have any affect on IPv6 sockets. |
| pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { |
| self.io.get_ref().set_multicast_ttl_v4(ttl) |
| } |
| |
| /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. |
| /// |
| /// For more information about this option, see |
| /// [`set_multicast_loop_v6`][link]. |
| /// |
| /// [link]: #method.set_multicast_loop_v6 |
| pub fn multicast_loop_v6(&self) -> io::Result<bool> { |
| self.io.get_ref().multicast_loop_v6() |
| } |
| |
| /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. |
| /// |
| /// Controls whether this socket sees the multicast packets it sends itself. |
| /// Note that this may not have any affect on IPv4 sockets. |
| pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { |
| self.io.get_ref().set_multicast_loop_v6(on) |
| } |
| |
| /// Gets the value of the `IP_TTL` option for this socket. |
| /// |
| /// For more information about this option, see [`set_ttl`][link]. |
| /// |
| /// [link]: #method.set_ttl |
| pub fn ttl(&self) -> io::Result<u32> { |
| self.io.get_ref().ttl() |
| } |
| |
| /// Sets the value for the `IP_TTL` option on this socket. |
| /// |
| /// This value sets the time-to-live field that is used in every packet sent |
| /// from this socket. |
| pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { |
| self.io.get_ref().set_ttl(ttl) |
| } |
| |
| /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. |
| /// |
| /// This function specifies a new multicast group for this socket to join. |
| /// The address must be a valid multicast address, and `interface` is the |
| /// address of the local interface with which the system should join the |
| /// multicast group. If it's equal to `INADDR_ANY` then an appropriate |
| /// interface is chosen by the system. |
| pub fn join_multicast_v4(&self, |
| multiaddr: &Ipv4Addr, |
| interface: &Ipv4Addr) -> io::Result<()> { |
| self.io.get_ref().join_multicast_v4(multiaddr, interface) |
| } |
| |
| /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. |
| /// |
| /// This function specifies a new multicast group for this socket to join. |
| /// The address must be a valid multicast address, and `interface` is the |
| /// index of the interface to join/leave (or 0 to indicate any interface). |
| pub fn join_multicast_v6(&self, |
| multiaddr: &Ipv6Addr, |
| interface: u32) -> io::Result<()> { |
| self.io.get_ref().join_multicast_v6(multiaddr, interface) |
| } |
| |
| /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. |
| /// |
| /// For more information about this option, see |
| /// [`join_multicast_v4`][link]. |
| /// |
| /// [link]: #method.join_multicast_v4 |
| pub fn leave_multicast_v4(&self, |
| multiaddr: &Ipv4Addr, |
| interface: &Ipv4Addr) -> io::Result<()> { |
| self.io.get_ref().leave_multicast_v4(multiaddr, interface) |
| } |
| |
| /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. |
| /// |
| /// For more information about this option, see |
| /// [`join_multicast_v6`][link]. |
| /// |
| /// [link]: #method.join_multicast_v6 |
| pub fn leave_multicast_v6(&self, |
| multiaddr: &Ipv6Addr, |
| interface: u32) -> io::Result<()> { |
| self.io.get_ref().leave_multicast_v6(multiaddr, interface) |
| } |
| } |
| |
| impl fmt::Debug for UdpSocket { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| self.io.get_ref().fmt(f) |
| } |
| } |
| |
| /// A future used to write the entire contents of some data to a UDP socket. |
| /// |
| /// This is created by the `UdpSocket::send_dgram` method. |
| pub struct SendDgram<T> { |
| state: SendState<T>, |
| } |
| |
| enum SendState<T> { |
| Writing { |
| sock: UdpSocket, |
| buf: T, |
| addr: SocketAddr, |
| }, |
| Empty, |
| } |
| |
/// Builds an `io::Error` of kind `Other` whose message is `reason`.
fn incomplete_write(reason: &str) -> io::Error {
    let kind = io::ErrorKind::Other;
    io::Error::new(kind, reason)
}
| |
| impl<T> Future for SendDgram<T> |
| where T: AsRef<[u8]>, |
| { |
| type Item = (UdpSocket, T); |
| type Error = io::Error; |
| |
| fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { |
| match self.state { |
| SendState::Writing { ref sock, ref buf, ref addr } => { |
| let n = try_nb!(sock.send_to(buf.as_ref(), addr)); |
| if n != buf.as_ref().len() { |
| return Err(incomplete_write("failed to send entire message \ |
| in datagram")) |
| } |
| } |
| SendState::Empty => panic!("poll a SendDgram after it's done"), |
| } |
| |
| match mem::replace(&mut self.state, SendState::Empty) { |
| SendState::Writing { sock, buf, addr: _ } => { |
| Ok(Async::Ready((sock, buf))) |
| } |
| SendState::Empty => panic!(), |
| } |
| } |
| } |
| |
| /// A future used to receive a datagram from a UDP socket. |
| /// |
| /// This is created by the `UdpSocket::recv_dgram` method. |
| pub struct RecvDgram<T> { |
| state: RecvState<T>, |
| } |
| |
| enum RecvState<T> { |
| Reading { |
| sock: UdpSocket, |
| buf: T, |
| }, |
| Empty, |
| } |
| |
| impl<T> Future for RecvDgram<T> |
| where T: AsMut<[u8]>, |
| { |
| type Item = (UdpSocket, T, usize, SocketAddr); |
| type Error = io::Error; |
| |
| fn poll(&mut self) -> Poll<Self::Item, io::Error> { |
| let (n, addr) = match self.state { |
| RecvState::Reading { ref sock, ref mut buf } => { |
| try_nb!(sock.recv_from(buf.as_mut())) |
| } |
| RecvState::Empty => panic!("poll a RecvDgram after it's done"), |
| }; |
| |
| match mem::replace(&mut self.state, RecvState::Empty) { |
| RecvState::Reading { sock, buf } => { |
| Ok(Async::Ready((sock, buf, n, addr))) |
| } |
| RecvState::Empty => panic!(), |
| } |
| } |
| } |
| |
| #[cfg(all(unix, not(target_os = "fuchsia")))] |
| mod sys { |
| use std::os::unix::prelude::*; |
| use super::UdpSocket; |
| |
| impl AsRawFd for UdpSocket { |
| fn as_raw_fd(&self) -> RawFd { |
| self.io.get_ref().as_raw_fd() |
| } |
| } |
| } |
| |
#[cfg(windows)]
mod sys {
    // TODO: raw-handle access is intentionally left out for now; the needed
    // pieces should land upstream in mio first, after which the following
    // can be enabled here.
    //
    // use std::os::windows::prelude::*;
    // use super::UdpSocket;
    //
    // impl AsRawHandle for UdpSocket {
    //     fn as_raw_handle(&self) -> RawHandle {
    //         self.io.get_ref().as_raw_handle()
    //     }
    // }
}