blob: ffa16f68c69eec909e9a3836c8b7c843fdae0449 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Provides an async `Socket` type wrapped around an emulated zircon socket.
// TODO(ctiller): merge this implementation with the implementation in zircon_handle?
use fuchsia_zircon_status as zx_status;
use futures::future::poll_fn;
use futures::prelude::*;
use futures::ready;
use std::pin::Pin;
use std::task::{Context, Poll};
/// An I/O object representing a `Socket`.
///
/// Holds a single `super::Socket`; the methods and trait implementations in this file
/// forward their work to it.
pub struct Socket {
// The wrapped socket that the operations in this file are forwarded to.
socket: super::Socket,
}
/// Gives borrowed access to the wrapped `super::Socket`.
impl AsRef<super::Socket> for Socket {
    fn as_ref(&self) -> &super::Socket {
        let Self { socket } = self;
        socket
    }
}
/// Exposes the handle reference of the wrapped socket.
impl super::AsHandleRef for Socket {
    fn as_handle_ref(&self) -> super::HandleRef<'_> {
        // The wrapper has no handle of its own; defer to the inner socket.
        let inner = &self.socket;
        inner.as_handle_ref()
    }
}
/// Formats the wrapper exactly as the wrapped socket formats itself.
impl std::fmt::Debug for Socket {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate so the wrapper adds nothing to the inner socket's output.
        self.socket.fmt(formatter)
    }
}
impl Socket {
    /// Constructs a `Socket` from an existing `emulated_handle::Socket`.
    pub fn from_socket(socket: super::Socket) -> Self {
        Socket { socket }
    }

    /// Converts this async socket back into the regular socket it wraps.
    pub fn into_zx_socket(self) -> super::Socket {
        self.socket
    }

    /// Polls for the next data on the socket, appending it to the end of `out` if it has arrived.
    ///
    /// Not very useful for a non-datagram socket as it will return all available data
    /// on the socket.
    ///
    /// On success returns the number of bytes appended to `out`. A `PEER_CLOSED` status from
    /// the underlying read is reported as `Ok(0)`.
    ///
    /// `out` is only extended when the complete read succeeds; on `Poll::Pending`, on
    /// `Ok(0)` for a closed peer, and on every error it is left at its original length.
    ///
    /// # Errors
    ///
    /// Propagates any status returned while querying the outstanding byte count or while
    /// reading, and returns `BAD_STATE` if the read produced a different number of bytes
    /// than was reported as outstanding.
    pub fn poll_datagram(
        &self,
        cx: &mut Context<'_>,
        out: &mut Vec<u8>,
    ) -> Poll<Result<usize, zx_status::Status>> {
        let avail = self.socket.outstanding_read_bytes()?;
        let len = out.len();
        // Zero-extend `out` so the read can land directly after the existing contents.
        out.resize(len + avail, 0);
        let outcome = self.socket.poll_read(&mut out[len..], cx);
        // Only a complete read keeps the extension; any other outcome would otherwise
        // leave zero padding behind in the caller's buffer.
        if !matches!(outcome, Poll::Ready(Ok(bytes)) if bytes == avail) {
            out.truncate(len);
        }
        match ready!(outcome) {
            Err(zx_status::Status::PEER_CLOSED) => Poll::Ready(Ok(0)),
            Err(e) => Poll::Ready(Err(e)),
            Ok(bytes) if bytes == avail => Poll::Ready(Ok(bytes)),
            Ok(_) => Poll::Ready(Err(zx_status::Status::BAD_STATE)),
        }
    }

    /// Attempt to read from the socket, registering for wakeup if the socket doesn't have any
    /// contents available. Used internally in the `AsyncRead` implementation, exposed for users
    /// who know the concrete type they're using and don't want to pin the socket.
    ///
    /// The result of the wrapped socket's `poll_read` is returned unchanged.
    pub fn poll_read_ref(
        &self,
        cx: &mut Context<'_>,
        out: &mut [u8],
    ) -> Poll<Result<usize, zx_status::Status>> {
        self.socket.poll_read(out, cx)
    }

    /// Reads the next datagram that becomes available onto the end of `out`.
    ///
    /// Resolves with the result of [`Socket::poll_datagram`] once that call stops returning
    /// `Poll::Pending`.
    ///
    /// Note: Using this multiple times concurrently is an error and the first one will never
    /// complete.
    pub async fn read_datagram<'a>(
        &'a self,
        out: &'a mut Vec<u8>,
    ) -> Result<usize, zx_status::Status> {
        poll_fn(move |cx| self.poll_datagram(cx, out)).await
    }
}
#[cfg(not(target_arch = "wasm32"))]
impl tokio::io::AsyncWrite for Socket {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, futures::io::Error>> {
futures::io::AsyncWrite::poll_write(self, cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), tokio::io::Error>> {
futures::io::AsyncWrite::poll_flush(self, cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), tokio::io::Error>> {
futures::io::AsyncWrite::poll_close(self, cx)
}
}
#[cfg(not(target_arch = "wasm32"))]
impl tokio::io::AsyncRead for Socket {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
futures::io::AsyncRead::poll_read(self, cx, buf.initialize_unfilled())
.map(|value| value.map(|size| buf.advance(size)))
}
}
impl AsyncWrite for Socket {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
bytes: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
Poll::Ready(self.socket.write(bytes).map_err(|e| e.into()))
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
}
impl AsyncRead for Socket {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bytes: &mut [u8],
) -> Poll<Result<usize, std::io::Error>> {
match ready!(self.socket.poll_read(bytes, cx)) {
Err(zx_status::Status::PEER_CLOSED) => Poll::Ready(Ok(0)),
Ok(x) => {
assert_ne!(x, 0);
Poll::Ready(Ok(x))
}
Err(x) => Poll::Ready(Err(x.into())),
}
}
}
impl AsyncWrite for &'_ Socket {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
bytes: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
Poll::Ready(self.socket.write(bytes).map_err(|e| e.into()))
}
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
}
impl AsyncRead for &'_ Socket {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bytes: &mut [u8],
) -> Poll<Result<usize, std::io::Error>> {
match ready!(self.socket.poll_read(bytes, cx)) {
Err(zx_status::Status::PEER_CLOSED) => Poll::Ready(Ok(0)),
Ok(x) => {
assert_ne!(x, 0);
Poll::Ready(Ok(x))
}
Err(x) => Poll::Ready(Err(x.into())),
}
}
}
#[cfg(test)]
mod test {
    use super::super::Socket;
    use super::Socket as AsyncSocket;
    use futures::executor::block_on;
    use futures::prelude::*;
    use futures::task::noop_waker_ref;
    use std::pin::Pin;
    use std::task::Context;

    /// Sends `payload` from `tx` to `rx` and checks that exactly those bytes arrive.
    ///
    /// The read is started first and must be pending before anything is written; the write
    /// must then complete on its first poll. A fresh zeroed buffer is used for every call so
    /// data left over from an earlier transfer cannot satisfy the final comparison.
    async fn assert_transfer(tx: &mut AsyncSocket, rx: &mut AsyncSocket, payload: &[u8]) {
        let mut cx = Context::from_waker(noop_waker_ref());
        let mut buf = [0u8; 128];
        let mut pending_read = rx.read(&mut buf);
        assert!(Pin::new(&mut pending_read).poll(&mut cx).is_pending());
        assert!(Pin::new(&mut tx.write(payload)).poll(&mut cx).is_ready());
        let received = pending_read.await.unwrap();
        // Comparing only the bytes reported as read also pins the returned length.
        assert_eq!(&buf[..received], payload);
    }

    /// Exercises a write followed by a read in both directions of a stream socket pair.
    #[test]
    fn async_socket_write_read() {
        block_on(async move {
            let (a, b) = Socket::create_stream();
            let (mut a, mut b) = (AsyncSocket::from_socket(a), AsyncSocket::from_socket(b));
            // Distinct payloads per direction so one transfer cannot be mistaken for the other.
            assert_transfer(&mut a, &mut b, &[1, 2, 3]).await;
            assert_transfer(&mut b, &mut a, &[4, 5, 6]).await;
        })
    }
}