blob: 2493a1159770ff287498343a0c0de28e07cba8ed [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Provides async Channel type wrapped around an emulated zircon channel.
// TODO(ctiller): merge this implementation with the implementation in zircon_handle?
use super::{
on_signals::OnSignalsRef, Handle, HandleDisposition, HandleInfo, MessageBuf, MessageBufEtc,
Signals,
};
use fuchsia_zircon_status as zx_status;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// An I/O object representing a `Channel`.
pub struct Channel {
channel: super::Channel,
}
impl AsRef<super::Channel> for Channel {
fn as_ref(&self) -> &super::Channel {
&self.channel
}
}
impl From<Channel> for super::Channel {
fn from(channel: Channel) -> super::Channel {
channel.channel
}
}
impl super::AsHandleRef for Channel {
fn as_handle_ref(&self) -> super::HandleRef<'_> {
self.channel.as_handle_ref()
}
}
impl std::fmt::Debug for Channel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.channel.fmt(f)
}
}
impl Channel {
/// Returns true if the channel is closed (i.e. other side was dropped).
pub fn is_closed(&self) -> bool {
self.channel.is_closed()
}
/// If [`is_closed`] returns true, this may return a string explaining why the handle was closed.
pub fn closed_reason(&self) -> Option<String> {
self.channel.closed_reason()
}
/// Close this channel, setting `msg` as a reason for the closure.
pub fn close_with_reason(self, msg: String) {
self.channel.close_with_reason(msg)
}
/// Overnet announcing to us what protocol is being used to proxy this channel.
pub fn set_channel_proxy_protocol(&self, proto: super::ChannelProxyProtocol) {
self.channel.set_channel_proxy_protocol(proto)
}
/// Receive an announcement from overnet if this channel is proxied via a particular protocol
pub async fn get_channel_proxy_protocol(&self) -> Option<super::ChannelProxyProtocol> {
self.channel.get_channel_proxy_protocol().await
}
/// Returns a future that completes when `is_closed()` is true.
pub fn on_closed(&self) -> OnSignalsRef<'_> {
OnSignalsRef::new(self, Signals::CHANNEL_PEER_CLOSED)
}
/// Writes a message into the channel.
pub fn write(&self, bytes: &[u8], handles: &mut [Handle]) -> Result<(), zx_status::Status> {
self.channel.write(bytes, handles)
}
/// Writes a message into the channel.
pub fn write_etc<'a>(
&self,
bytes: &[u8],
handles: &mut [HandleDisposition<'a>],
) -> Result<(), zx_status::Status> {
self.channel.write_etc(bytes, handles)
}
/// Consumes self and returns the underlying Channel (named thusly for compatibility with
/// fasync variant)
pub fn into_zx_channel(self) -> super::Channel {
self.channel
}
/// Receives a message on the channel and registers this `Channel` as
/// needing a read on receiving a `io::std::ErrorKind::WouldBlock`.
///
/// Identical to `recv_from` except takes separate bytes and handles buffers
/// rather than a single `MessageBuf`.
pub fn read(
&self,
cx: &mut Context<'_>,
bytes: &mut Vec<u8>,
handles: &mut Vec<Handle>,
) -> Poll<Result<(), zx_status::Status>> {
self.channel.poll_read(cx, bytes, handles)
}
/// Reads a message from a channel into a fixed size buffer. If either `buf` or `handles` is
/// not large enough to hold the message, then `Err((buf_len, handles_len))` is returned. The
/// caller is then expected to invoke the read function again, albeit with a resized buffer
/// large enough to fit the output values.
///
/// If there are any general errors that happen during read, then `Ok(Err(_), (0, 0))` is
/// returned.
///
/// On success, `Ok(Ok(()), (buf_len, handles_len))` is returned. As with zx_channel_read, of
/// which this function is an analogue, there are no partial reads.
///
/// It is important to remember to check the `Ok(_)` result for potential errors, like
/// `PEER_CLOSED`, for example.
pub fn read_raw(
&self,
cx: &mut Context<'_>,
bytes: &mut [u8],
handles: &mut [std::mem::MaybeUninit<Handle>],
) -> Poll<Result<(Result<(), zx_status::Status>, usize, usize), (usize, usize)>> {
self.channel.poll_read_raw(cx, bytes, handles)
}
/// Receives a message on the channel and registers this `Channel` as
/// needing a read on receiving a `io::std::ErrorKind::WouldBlock`.
///
/// Identical to `recv_etc_from` except takes separate bytes and handles
/// buffers rather than a single `MessageBuf`.
pub fn read_etc(
&self,
cx: &mut Context<'_>,
bytes: &mut Vec<u8>,
handles: &mut Vec<HandleInfo>,
) -> Poll<Result<(), zx_status::Status>> {
self.channel.poll_read_etc(cx, bytes, handles)
}
/// Receives a message on the channel and registers this `Channel` as
/// needing a read on receiving a `io::std::ErrorKind::WouldBlock`.
pub fn recv_from(
&self,
ctx: &mut Context<'_>,
buf: &mut MessageBuf,
) -> Poll<Result<(), zx_status::Status>> {
let (bytes, handles) = buf.split_mut();
self.read(ctx, bytes, handles)
}
/// Receives a message on the channel and registers this `Channel` as
/// needing a read on receiving a `io::std::ErrorKind::WouldBlock`.
pub fn recv_etc_from(
&self,
ctx: &mut Context<'_>,
buf: &mut MessageBufEtc,
) -> Poll<Result<(), zx_status::Status>> {
let (bytes, handles) = buf.split_mut();
self.read_etc(ctx, bytes, handles)
}
/// Creates a future that receive a message to be written to the buffer
/// provided.
///
/// The returned future will return after a message has been received on
/// this socket and been placed into the buffer.
pub fn recv_msg<'a>(&'a self, buf: &'a mut MessageBuf) -> RecvMsg<'a> {
RecvMsg { channel: self, buf }
}
/// Creates a future that receive a message to be written to the buffer
/// provided.
///
/// The returned future will return after a message has been received on
/// this socket and been placed into the buffer.
pub fn recv_etc_msg<'a>(&'a self, buf: &'a mut MessageBufEtc) -> RecvEtcMsg<'a> {
RecvEtcMsg { channel: self, buf }
}
/// Creates a new `Channel` from a previously-created `emulated_handle::Channel`.
pub fn from_channel(channel: super::Channel) -> Self {
Channel { channel }
}
}
/// A future used to receive a message from a channel.
///
/// This is created by the `Channel::recv_msg` method.
#[must_use = "futures do nothing unless polled"]
pub struct RecvMsg<'a> {
channel: &'a Channel,
buf: &'a mut MessageBuf,
}
impl<'a> futures::Future for RecvMsg<'a> {
type Output = Result<(), zx_status::Status>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
this.channel.recv_from(cx, this.buf)
}
}
/// A future used to receive a message from a channel.
///
/// This is created by the `Channel::recv_etc_msg` method.
#[must_use = "futures do nothing unless polled"]
pub struct RecvEtcMsg<'a> {
channel: &'a Channel,
buf: &'a mut MessageBufEtc,
}
impl<'a> futures::Future for RecvEtcMsg<'a> {
type Output = Result<(), zx_status::Status>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
this.channel.recv_etc_from(cx, this.buf)
}
}
#[cfg(test)]
mod test {
use super::super::Channel;
use super::super::{Handle, HandleDisposition, HandleOp, ObjectType, Rights, Status};
use super::Channel as AsyncChannel;
use super::{MessageBuf, MessageBufEtc};
use futures::executor::block_on;
use futures::task::noop_waker_ref;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
#[test]
fn async_channel_write_read() {
block_on(async move {
let (a, b) = Channel::create();
let (a, b) = (AsyncChannel::from_channel(a), AsyncChannel::from_channel(b));
let mut buf = MessageBuf::new();
let mut cx = Context::from_waker(noop_waker_ref());
let mut rx = b.recv_msg(&mut buf);
assert_eq!(Pin::new(&mut rx).poll(&mut cx), std::task::Poll::Pending);
a.write(&[1, 2, 3], &mut vec![]).unwrap();
rx.await.unwrap();
assert_eq!(buf.bytes(), &[1, 2, 3]);
let mut rx = a.recv_msg(&mut buf);
assert!(Pin::new(&mut rx).poll(&mut cx).is_pending());
b.write(&[1, 2, 3], &mut vec![]).unwrap();
rx.await.unwrap();
assert_eq!(buf.bytes(), &[1, 2, 3]);
})
}
#[test]
fn async_channel_write_etc_read_etc() {
block_on(async move {
let (a, b) = Channel::create();
let (a, b) = (AsyncChannel::from_channel(a), AsyncChannel::from_channel(b));
let mut buf = MessageBufEtc::new();
let mut cx = Context::from_waker(noop_waker_ref());
let mut rx = b.recv_etc_msg(&mut buf);
assert_eq!(Pin::new(&mut rx).poll(&mut cx), std::task::Poll::Pending);
a.write_etc(&[1, 2, 3], &mut vec![]).unwrap();
rx.await.unwrap();
assert_eq!(buf.bytes(), &[1, 2, 3]);
let mut rx = a.recv_etc_msg(&mut buf);
assert!(Pin::new(&mut rx).poll(&mut cx).is_pending());
let (c, _) = Channel::create();
b.write_etc(
&[1, 2, 3],
&mut vec![HandleDisposition {
handle_op: HandleOp::Move(c.into()),
object_type: ObjectType::CHANNEL,
rights: Rights::TRANSFER | Rights::WRITE,
result: Status::OK,
}],
)
.unwrap();
rx.await.unwrap();
assert_eq!(buf.bytes(), &[1, 2, 3]);
assert_eq!(buf.n_handle_infos(), 1);
let hi = &buf.handle_infos[0];
assert_ne!(hi.handle, Handle::invalid());
assert_eq!(hi.object_type, ObjectType::CHANNEL);
assert_eq!(hi.rights, Rights::TRANSFER | Rights::WRITE);
})
}
}