// blob: e520274f433a122116c922e06381db987d40ac9f [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::io::{self, AsyncRead, AsyncWrite, Initializer};
use futures::{future::poll_fn, stream::Stream, task::LocalWaker, try_ready, Poll};
use std::fmt;
use std::pin::Pin;
use crate::RWHandle;
/// An I/O object representing a `Socket`.
///
/// This is a newtype around an `RWHandle<zx::Socket>`. The inherent methods
/// and the `AsyncRead` / `AsyncWrite` / `Stream` impls in this file all go
/// through that handle to poll for readiness and to register interest in
/// future read/write notifications.
pub struct Socket(RWHandle<zx::Socket>);
impl AsRef<zx::Socket> for Socket {
fn as_ref(&self) -> &zx::Socket {
self.0.get_ref()
}
}
impl AsHandleRef for Socket {
fn as_handle_ref(&self) -> zx::HandleRef {
self.0.get_ref().as_handle_ref()
}
}
impl From<Socket> for zx::Socket {
fn from(socket: Socket) -> zx::Socket {
socket.0.into_inner()
}
}
impl Socket {
    /// Creates a new `Socket` from a previously-created `zx::Socket`.
    ///
    /// # Errors
    ///
    /// Returns whatever `zx::Status` `RWHandle::new` reports if the socket
    /// cannot be wrapped.
    pub fn from_socket(socket: zx::Socket) -> Result<Socket, zx::Status> {
        Ok(Socket(RWHandle::new(socket)?))
    }

    /// Test whether this socket is ready to be read or not.
    ///
    /// If the socket is *not* readable then the current task is scheduled to
    /// get a notification when the socket does become readable. That is, this
    /// is only suitable for calling in a `Future::poll` method and will
    /// automatically handle ensuring a retry once the socket is readable again.
    fn poll_read(&self, lw: &LocalWaker) -> Poll<Result<(), zx::Status>> {
        self.0.poll_read(lw)
    }

    /// Test whether this socket is ready to be written to or not.
    ///
    /// If the socket is *not* writable then the current task is scheduled to
    /// get a notification when the socket does become writable. That is, this
    /// is only suitable for calling in a `Future::poll` method and will
    /// automatically handle ensuring a retry once the socket is writable again.
    fn poll_write(&self, lw: &LocalWaker) -> Poll<Result<(), zx::Status>> {
        self.0.poll_write(lw)
    }

    // Private helper for reading without `&mut` self.
    // This is used in the impls of `Read` for `Socket` and `&Socket`.
    //
    // Outcomes:
    // * `SHOULD_WAIT` from the socket re-registers read interest and yields `Pending`.
    // * `PEER_CLOSED` is reported as a zero-length read (end of stream).
    // * Anything else (byte count or error) is passed through unchanged.
    fn read_nomut(&self, buf: &mut [u8], lw: &LocalWaker) -> Poll<Result<usize, zx::Status>> {
        try_ready!(self.poll_read(lw));
        match self.0.get_ref().read(buf) {
            Err(zx::Status::SHOULD_WAIT) => {
                self.0.need_read(lw)?;
                Poll::Pending
            }
            Err(zx::Status::PEER_CLOSED) => Poll::Ready(Ok(0)),
            other => Poll::Ready(other),
        }
    }

    // Private helper for writing without `&mut` self.
    // This is used in the impls of `Write` for `Socket` and `&Socket`.
    //
    // `SHOULD_WAIT` from the socket re-registers write interest and yields
    // `Pending`; every other result is passed through unchanged.
    fn write_nomut(&self, buf: &[u8], lw: &LocalWaker) -> Poll<Result<usize, zx::Status>> {
        try_ready!(self.poll_write(lw));
        match self.0.get_ref().write(buf) {
            Err(zx::Status::SHOULD_WAIT) => {
                self.0.need_write(lw)?;
                Poll::Pending
            }
            other => Poll::Ready(other),
        }
    }

    /// Polls for the next data on the socket, appending it to the end of |out| if it has arrived.
    /// Not very useful for a non-datagram socket as it will return all available data
    /// on the socket.
    ///
    /// On success returns the number of bytes appended. `out` is only modified
    /// on success: if the read would block, fails, or returns a different
    /// number of bytes than the socket reported as outstanding, `out` is
    /// restored to the length it had on entry.
    ///
    /// # Errors
    ///
    /// * Any error from querying the outstanding byte count or from the read.
    /// * `zx::Status::BAD_STATE` if the read returned a byte count different
    ///   from the outstanding count queried just before it.
    pub fn poll_datagram(
        &self, out: &mut Vec<u8>, lw: &LocalWaker,
    ) -> Poll<Result<usize, zx::Status>> {
        try_ready!(self.0.poll_read(lw));
        let avail = self.0.get_ref().outstanding_read_bytes()?;
        let len = out.len();
        // Make room for the incoming bytes directly after the existing contents.
        out.resize(len + avail, 0);
        match self.0.get_ref().read(&mut out[len..]) {
            Ok(bytes) if bytes == avail => Poll::Ready(Ok(bytes)),
            failure => {
                // Drop the zero padding added above so a retry (or the caller)
                // never sees bytes that did not come from the socket.
                out.truncate(len);
                match failure {
                    Err(zx::Status::SHOULD_WAIT) => {
                        self.0.need_read(lw)?;
                        Poll::Pending
                    }
                    Err(status) => Poll::Ready(Err(status)),
                    // Short or long read relative to what was reported as outstanding.
                    Ok(_) => Poll::Ready(Err(zx::Status::BAD_STATE)),
                }
            }
        }
    }

    /// Reads the next datagram that becomes available onto the end of |out|. Note: Using this
    /// multiple times concurrently is an error and the first one will never complete.
    ///
    /// Resolves to the number of bytes appended, or to the error produced by
    /// `poll_datagram`.
    pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
        await!(poll_fn(move |lw| self.poll_datagram(out, lw)))
    }
}
impl fmt::Debug for Socket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.get_ref().fmt(f)
}
}
impl AsyncRead for Socket {
unsafe fn initializer(&self) -> Initializer {
// This is safe because `zx::Socket::read` does not examine
// the buffer before reading into it.
Initializer::nop()
}
fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.read_nomut(buf, lw).map_err(Into::into)
}
}
/// Asynchronous writes on an owned `Socket`, backed by `write_nomut`.
impl AsyncWrite for Socket {
    /// Writes `buf`, converting any `zx::Status` error into an `io::Error`.
    fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll<io::Result<usize>> {
        match self.write_nomut(buf, lw) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(count)) => Poll::Ready(Ok(count)),
            Poll::Ready(Err(status)) => Poll::Ready(Err(status.into())),
        }
    }

    /// Always ready: this wrapper performs no work on flush.
    fn poll_flush(&mut self, _lw: &LocalWaker) -> Poll<io::Result<()>> {
        let flushed: io::Result<()> = Ok(());
        Poll::Ready(flushed)
    }

    /// Always ready: this wrapper performs no work on close.
    fn poll_close(&mut self, _lw: &LocalWaker) -> Poll<io::Result<()>> {
        let closed: io::Result<()> = Ok(());
        Poll::Ready(closed)
    }
}
impl<'a> AsyncRead for &'a Socket {
unsafe fn initializer(&self) -> Initializer {
// This is safe because `zx::Socket::read` does not examine
// the buffer before reading into it.
Initializer::nop()
}
fn poll_read(&mut self, lw: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.read_nomut(buf, lw).map_err(Into::into)
}
}
impl<'a> AsyncWrite for &'a Socket {
fn poll_write(&mut self, lw: &LocalWaker, buf: &[u8]) -> Poll<io::Result<usize>> {
self.write_nomut(buf, lw).map_err(Into::into)
}
fn poll_flush(&mut self, _: &LocalWaker) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(&mut self, _: &LocalWaker) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
/// Yields each chunk returned by `poll_datagram` as a freshly allocated `Vec<u8>`.
///
/// Note: It's probably a good idea to split the socket into read/write halves
/// before using this so you can still write on this socket.
/// Taking two streams of the same socket will not work.
///
/// This stream never produces `None`; every failure from `poll_datagram`
/// (including a closed peer) is delivered as an `Err` item.
impl Stream for Socket {
    type Item = Result<Vec<u8>, zx::Status>;

    fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
        let mut datagram = Vec::<u8>::new();
        let outcome = match self.poll_datagram(&mut datagram, lw) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome,
        };
        // The byte count is redundant with the vector's length, so drop it.
        Poll::Ready(Some(outcome.map(|_size| datagram)))
    }
}
/// Yields each chunk returned by `poll_datagram` as a freshly allocated
/// `Vec<u8>`, reading through a shared reference.
///
/// This stream never produces `None`; every failure from `poll_datagram`
/// (including a closed peer) is delivered as an `Err` item.
impl<'a> Stream for &'a Socket {
    type Item = Result<Vec<u8>, zx::Status>;

    fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
        let mut datagram = Vec::<u8>::new();
        let outcome = match self.poll_datagram(&mut datagram, lw) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome,
        };
        // The byte count is redundant with the vector's length, so drop it.
        Poll::Ready(Some(outcome.map(|_size| datagram)))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        temp::{TempAsyncReadExt, TempAsyncWriteExt},
        Executor, TimeoutExt, Timer,
    };
    use fuchsia_zircon::prelude::*;
    use futures::future::{FutureExt, TryFutureExt};
    use futures::stream::StreamExt;

    /// Writes a small payload on one end of a stream socket after a short
    /// delay and checks the other end reads exactly that payload.
    #[test]
    fn can_read_write() {
        let mut executor = Executor::new().unwrap();
        let payload = &[0, 1, 2, 3];

        let (raw_tx, raw_rx) = zx::Socket::create(zx::SocketOpts::STREAM).unwrap();
        let tx = Socket::from_socket(raw_tx).unwrap();
        let rx = Socket::from_socket(raw_rx).unwrap();

        // Read until end-of-stream and compare against what was sent.
        let read_all = rx.read_to_end(vec![]).map_ok(|(_socket, received)| {
            assert_eq!(&*received, payload);
        });

        // Bound the receiver with a 300ms timeout so a broken test fails
        // instead of hanging.
        let receiver = read_all.on_timeout(300.millis().after_now(), || panic!("timeout"));

        // Send the payload after a 100ms delay, i.e. before the receiver's
        // 300ms timeout expires. The value produced by `write_all` is
        // discarded afterwards.
        let delay = Timer::new(100.millis().after_now());
        let sender = delay.then(|()| tx.write_all(payload)).map_ok(|_tx| ());

        let both = receiver.try_join(sender);
        executor.run_singlethreaded(both).unwrap();
    }

    /// Queues two datagrams and checks that each `read_datagram` call appends
    /// exactly one of them to the output buffer.
    #[test]
    fn can_read_datagram() {
        let mut executor = Executor::new().unwrap();
        let first = &[0, 1];
        let second = &[2, 3, 4, 5];

        let (tx, raw_rx) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
        let rx = Socket::from_socket(raw_rx).unwrap();

        // Start with one byte already present to verify that reads append.
        let mut buffer = vec![50];

        assert!(tx.write(first).is_ok());
        assert!(tx.write(second).is_ok());

        let first_read = executor.run_singlethreaded(rx.read_datagram(&mut buffer));
        assert!(first_read.is_ok());
        assert_eq!(first.len(), first_read.unwrap());
        assert_eq!([50, 0, 1], buffer.as_slice());

        let second_read = executor.run_singlethreaded(rx.read_datagram(&mut buffer));
        assert!(second_read.is_ok());
        assert_eq!(second.len(), second_read.unwrap());
        assert_eq!([50, 0, 1, 2, 3, 4, 5], buffer.as_slice());
    }

    /// Queues 20 datagrams of increasing size, closes the writer, and checks
    /// that the `Stream` impl yields them one at a time and in order.
    #[test]
    fn stream_datagram() {
        let mut executor = Executor::new().unwrap();
        let (tx, raw_rx) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap();
        let mut rx = Socket::from_socket(raw_rx).unwrap();

        let packets = 20;

        // Packet number `size` is `size` bytes long, each byte equal to `size`.
        for size in 1..=packets {
            let payload = vec![size as u8; size];
            assert!(tx.write(&payload).is_ok());
        }

        // Close the socket.
        drop(tx);

        let drain_stream = async move {
            let mut seen = 0;
            loop {
                let item = match await!(rx.next()) {
                    Some(item) => item,
                    None => break,
                };
                let packet = match item {
                    Ok(bytes) => bytes,
                    // The closed writer shows up as an error item; stop there.
                    Err(zx::Status::PEER_CLOSED) => break,
                    other => panic!("received error from stream: {:?}", other),
                };
                seen += 1;
                assert_eq!(packet.len(), seen);
                assert!(packet.iter().all(|&byte| byte == seen as u8));
            }
            assert_eq!(packets, seen);
        };

        executor.run_singlethreaded(drain_stream);
    }
}