blob: e25b894d177db0b5d7de28cdbf9a5650675ba7b0 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::fmt;
use std::io;
use std::pin::Pin;
use fuchsia_zircon::{self as zx, AsHandleRef, MessageBuf};
use futures::{task::LocalWaker, try_ready, Future, Poll};
use crate::RWHandle;
/// An I/O object representing a `Channel`.
pub struct Channel(RWHandle<zx::Channel>);
impl AsRef<zx::Channel> for Channel {
fn as_ref(&self) -> &zx::Channel {
self.0.get_ref()
}
}
impl AsHandleRef for Channel {
fn as_handle_ref(&self) -> zx::HandleRef {
self.0.get_ref().as_handle_ref()
}
}
impl From<Channel> for zx::Channel {
fn from(channel: Channel) -> zx::Channel {
channel.0.into_inner()
}
}
impl Channel {
/// Creates a new `Channel` from a previously-created `zx::Channel`.
pub fn from_channel(channel: zx::Channel) -> io::Result<Channel> {
Ok(Channel(RWHandle::new(channel)?))
}
/// Tests to see if the channel received a OBJECT_PEER_CLOSED signal
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
/// Test whether this socket is ready to be read or not.
///
/// If the socket is *not* readable then the current task is scheduled to
/// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again.
fn poll_read(&self, lw: &LocalWaker) -> Poll<Result<(), zx::Status>> {
self.0.poll_read(lw)
}
/// Receives a message on the channel and registers this `Channel` as
/// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
pub fn recv_from(&self, buf: &mut MessageBuf, lw: &LocalWaker) -> Poll<Result<(), zx::Status>> {
try_ready!(self.poll_read(lw));
let res = self.0.get_ref().read(buf);
if res == Err(zx::Status::SHOULD_WAIT) {
self.0.need_read(lw)?;
return Poll::Pending;
}
Poll::Ready(res)
}
/// Creates a future that receive a message to be written to the buffer
/// provided.
///
/// The returned future will return after a message has been received on
/// this socket and been placed into the buffer.
pub fn recv_msg<'a>(&'a self, buf: &'a mut MessageBuf) -> RecvMsg<'a> {
RecvMsg { channel: self, buf }
}
/// Writes a message into the channel.
pub fn write(&self, bytes: &[u8], handles: &mut Vec<zx::Handle>) -> Result<(), zx::Status> {
self.0.get_ref().write(bytes, handles)
}
/// Consumes self and returns the underlying zx::Channel
pub fn into_zx_channel(self) -> zx::Channel {
self.0.into_inner()
}
}
impl fmt::Debug for Channel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.get_ref().fmt(f)
}
}
/// A future used to receive a message from a channel.
///
/// This is created by the `Channel::recv_msg` method.
#[must_use = "futures do nothing unless polled"]
pub struct RecvMsg<'a> {
channel: &'a Channel,
buf: &'a mut MessageBuf,
}
impl<'a> Future for RecvMsg<'a> {
type Output = Result<(), zx::Status>;
fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
let this = &mut *self;
this.channel.recv_from(this.buf, lw)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Executor;
use fuchsia_zircon::{self as zx, MessageBuf};
use pin_utils::pin_mut;
#[test]
fn can_receive() {
let mut exec = Executor::new().unwrap();
let bytes = &[0, 1, 2, 3];
let (tx, rx) = zx::Channel::create().unwrap();
let f_rx = Channel::from_channel(rx).unwrap();
let receiver = async move {
let mut buffer = MessageBuf::new();
await!(f_rx.recv_msg(&mut buffer)).expect("failed to receive message");
assert_eq!(bytes, buffer.bytes());
};
pin_mut!(receiver);
assert!(exec.run_until_stalled(&mut receiver).is_pending());
let mut handles = Vec::new();
tx.write(bytes, &mut handles)
.expect("failed to write message");
assert!(exec.run_until_stalled(&mut receiver).is_ready());
}
}