// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
fuchsia_zircon::{self as zx, AsHandleRef},
marker::{PhantomData, Unpin},
task::{Context, Poll},
/// Marker trait for types that can be read/written with a `Fifo`.
/// Unsafe because not all types may be represented by arbitrary bit patterns.
pub unsafe trait FifoEntry {}
/// Identifies that the object may be used to write entries into a FIFO.
pub trait FifoWritable<W: FifoEntry>
Self: Sized,
/// Creates a future that transmits entries to be written.
/// The returned future will return after an entry has been received on
/// this fifo. The future will resolve to the fifo once all elements
/// have been transmitted.
/// An error during writing will cause the fifo to get
/// destroyed and the status will be returned.
fn write_entries<'a>(&'a self, entries: &'a [W]) -> WriteEntry<'a, Self, W> {
WriteEntry::new(self, entries)
/// Writes entries to the fifo and registers this `Fifo` as
/// needing a write on receiving a `zx::Status::SHOULD_WAIT`.
/// Returns the number of elements processed.
fn write(&self, cx: &mut Context<'_>, entries: &[W]) -> Poll<Result<usize, zx::Status>>;
/// Identifies that the object may be used to read entries from a FIFO.
pub trait FifoReadable<R: FifoEntry>
Self: Sized,
/// Creates a future that receives an entry to be written to the element
/// provided.
/// The returned future will return after an entry has been received on
/// this fifo. The future will resolve to the fifo and the entry.
/// An error during reading will cause the fifo and entry to get
/// destroyed and the status will be returned.
fn read_entry(&self) -> ReadEntry<'_, Self, R> {
/// Reads an entry from the fifo and registers this `Fifo` as
/// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
fn read(&self, cx: &mut Context<'_>) -> Poll<Result<Option<R>, zx::Status>>;
/// An I/O object representing a `Fifo`.
pub struct Fifo<R: FifoEntry, W: FifoEntry = R> {
handle: RWHandle<zx::Fifo>,
read_marker: PhantomData<R>,
write_marker: PhantomData<W>,
impl<R: FifoEntry, W: FifoEntry> AsRef<zx::Fifo> for Fifo<R, W> {
fn as_ref(&self) -> &zx::Fifo {
impl<R: FifoEntry, W: FifoEntry> AsHandleRef for Fifo<R, W> {
fn as_handle_ref(&self) -> zx::HandleRef<'_> {
impl<R: FifoEntry, W: FifoEntry> From<Fifo<R, W>> for zx::Fifo {
fn from(fifo: Fifo<R, W>) -> zx::Fifo {
impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
/// Creates a new `Fifo` from a previously-created `zx::Fifo`.
pub fn from_fifo(fifo: zx::Fifo) -> Result<Self, zx::Status> {
Ok(Fifo {
handle: RWHandle::new(fifo)?,
read_marker: PhantomData,
write_marker: PhantomData,
/// Test whether this fifo is ready to be written or not.
/// If the fifo is *not* writable then the current task is scheduled to
/// get a notification when the fifo does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the fifo is writable again.
/// Returns `true` if the CLOSED signal has been received.
pub fn poll_write(&self, cx: &mut Context<'_>) -> Poll<Result<bool, zx::Status>> {
/// Writes entries to the fifo and registers this `Fifo` as
/// needing a write on receiving a `zx::Status::SHOULD_WAIT`.
/// Returns the number of elements processed.
pub fn try_write(
cx: &mut Context<'_>,
entries: &[W],
) -> Poll<Result<usize, zx::Status>> {
let clear_closed = ready!(self.poll_write(cx)?);
let elem_size = ::std::mem::size_of::<W>();
let elembuf = unsafe {
::std::slice::from_raw_parts(entries.as_ptr() as *const u8, elem_size * entries.len())
match self.as_ref().write(elem_size, elembuf) {
Err(e) => {
if e == zx::Status::SHOULD_WAIT {
self.handle.need_write(cx, clear_closed)?;
} else {
Ok(count) => Poll::Ready(Ok(count)),
/// Test whether this fifo is ready to be read or not.
/// If the fifo is *not* readable then the current task is scheduled to
/// get a notification when the fifo does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the fifo is readable again.
/// Returns `true` if the CLOSED signal has been received.
pub fn poll_read(&self, cx: &mut Context<'_>) -> Poll<Result<bool, zx::Status>> {
/// Reads an entry from the fifo and registers this `Fifo` as
/// needing a read on receiving a `zx::Status::SHOULD_WAIT`.
pub fn try_read(&self, cx: &mut Context<'_>) -> Poll<Result<Option<R>, zx::Status>> {
let clear_closed = ready!(self.handle.poll_read(cx)?);
let mut element = ::std::mem::MaybeUninit::<R>::uninit();
let elembuf = unsafe {
element.as_mut_ptr() as *mut u8,
match self.as_ref().read(::std::mem::size_of::<R>(), elembuf) {
Err(e) => {
if e == zx::Status::SHOULD_WAIT {
self.handle.need_read(cx, clear_closed)?;
return Poll::Pending;
if e == zx::Status::PEER_CLOSED {
return Poll::Ready(Ok(None));
return Poll::Ready(Err(e));
Ok(count) => {
debug_assert_eq!(1, count);
let element = unsafe { element.assume_init() };
return Poll::Ready(Ok(Some(element)));
impl<R: FifoEntry, W: FifoEntry> FifoReadable<R> for Fifo<R, W> {
fn read(&self, cx: &mut Context<'_>) -> Poll<Result<Option<R>, zx::Status>> {
impl<R: FifoEntry, W: FifoEntry> FifoWritable<W> for Fifo<R, W> {
fn write(&self, cx: &mut Context<'_>, entries: &[W]) -> Poll<Result<usize, zx::Status>> {
self.try_write(cx, entries)
impl<R: FifoEntry, W: FifoEntry> fmt::Debug for Fifo<R, W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// WriteEntry represents the future of one or more writes.
pub struct WriteEntry<'a, F, W> {
fifo: &'a F,
entries: &'a [W],
impl<'a, F, W> Unpin for WriteEntry<'a, F, W> {}
impl<'a, F: FifoWritable<W>, W: FifoEntry> WriteEntry<'a, F, W> {
/// Create a new WriteEntry, which borrows the `FifoWritable` type
/// until the future completes.
pub fn new(fifo: &'a F, entries: &'a [W]) -> Self {
WriteEntry { fifo, entries }
impl<'a, F: FifoWritable<W>, W: FifoEntry> Future for WriteEntry<'a, F, W> {
type Output = Result<(), zx::Status>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
while !this.entries.is_empty() {
let advance = ready!(this.fifo.write(cx, this.entries)?);
this.entries = &this.entries[advance..];
/// ReadEntry represents the future of a single read.
pub struct ReadEntry<'a, F, R> {
fifo: &'a F,
read_marker: PhantomData<R>,
impl<'a, F, W> Unpin for ReadEntry<'a, F, W> {}
impl<'a, F: FifoReadable<R>, R: FifoEntry> ReadEntry<'a, F, R> {
/// Create a new ReadEntry, which borrows the `FifoReadable` type
/// until the future completes.
pub fn new(fifo: &'a F) -> ReadEntry<'_, F, R> {
ReadEntry { fifo, read_marker: PhantomData }
impl<'a, F: FifoReadable<R>, R: FifoEntry> Future for ReadEntry<'a, F, R> {
type Output = Result<Option<R>, zx::Status>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
mod tests {
use super::*;
use crate::{DurationExt, Executor, TimeoutExt, Timer};
use fuchsia_zircon::prelude::*;
use futures::future::try_join;
use futures::prelude::*;
#[derive(Clone, Debug, PartialEq, Eq)]
struct entry {
a: u32,
b: u32,
unsafe impl FifoEntry for entry {}
#[derive(Clone, Debug, PartialEq, Eq)]
struct wrong_entry {
a: u16,
unsafe impl FifoEntry for wrong_entry {}
fn can_read_write() {
let mut exec = Executor::new().expect("failed to create executor");
let elements: &[entry; 1] = &[entry { a: 10, b: 20 }];
let (tx, rx) =
zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
let (tx, rx) = (
Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
let receive_future = rx.read_entry().map_ok(|entry| {
assert_eq!(elements[0], entry.expect("peer closed"));
// add a timeout to receiver so if test is broken it doesn't take forever
let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));
// Sends an entry after the timeout has passed
let sender = Timer::new(10.millis().after_now()).then(|()| tx.write_entries(elements));
let done = try_join(receiver, sender);
exec.run_singlethreaded(done).expect("failed to run receive future on executor");
fn read_wrong_size() {
let mut exec = Executor::new().expect("failed to create executor");
let elements: &[entry; 1] = &[entry { a: 10, b: 20 }];
let (tx, rx) =
zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
let (tx, rx) = (
Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
Fifo::<wrong_entry>::from_fifo(rx).expect("failed to create async rx fifo"),
let receive_future = rx.read_entry().map_ok(|_entry| panic!("read should have failed"));
// add a timeout to receiver so if test is broken it doesn't take forever
let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));
// Sends an entry after the timeout has passed
let sender = Timer::new(10.millis().after_now()).then(|()| tx.write_entries(elements));
let done = try_join(receiver, sender);
let res = exec.run_singlethreaded(done);
match res {
Err(zx::Status::OUT_OF_RANGE) => (),
_ => panic!("did not get out-of-range error"),
fn write_wrong_size() {
let mut exec = Executor::new().expect("failed to create executor");
let elements: &[wrong_entry; 1] = &[wrong_entry { a: 10 }];
let (tx, rx) =
zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
let (tx, _rx) = (
Fifo::<wrong_entry>::from_fifo(tx).expect("failed to create async tx fifo"),
Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
let sender = Timer::new(10.millis().after_now()).then(|()| tx.write_entries(elements));
let res = exec.run_singlethreaded(sender);
match res {
Err(zx::Status::OUT_OF_RANGE) => (),
_ => panic!("did not get out-of-range error"),
fn write_into_full() {
use std::sync::atomic::{AtomicUsize, Ordering};
let mut exec = Executor::new().expect("failed to create executor");
let elements: &[entry; 3] =
&[entry { a: 10, b: 20 }, entry { a: 30, b: 40 }, entry { a: 50, b: 60 }];
let (tx, rx) =
zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
let (tx, rx) = (
Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
// Use `writes_completed` to verify that not all writes
// are transmitted at once, and the last write is actually blocked.
let writes_completed = AtomicUsize::new(0);
let sender = async {
writes_completed.fetch_add(1, Ordering::SeqCst);
writes_completed.fetch_add(1, Ordering::SeqCst);
Ok::<(), zx::Status>(())
// Wait 10 ms, then read the messages from the fifo.
let receive_future = async {
let entry = rx.read_entry().await?;
assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
assert_eq!(elements[0], entry.expect("peer closed"));
let entry = rx.read_entry().await?;
// At this point, the last write may or may not have
// been written.
assert_eq!(elements[1], entry.expect("peer closed"));
let entry = rx.read_entry().await?;
assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
assert_eq!(elements[2], entry.expect("peer closed"));
Ok::<(), zx::Status>(())
// add a timeout to receiver so if test is broken it doesn't take forever
let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));
let done = try_join(receiver, sender);
exec.run_singlethreaded(done).expect("failed to run receive future on executor");
fn write_more_than_full() {
let mut exec = Executor::new().expect("failed to create executor");
let elements: &[entry; 3] =
&[entry { a: 10, b: 20 }, entry { a: 30, b: 40 }, entry { a: 50, b: 60 }];
let (tx, rx) =
zx::Fifo::create(2, ::std::mem::size_of::<entry>()).expect("failed to create zx fifo");
let (tx, rx) = (
Fifo::<entry>::from_fifo(tx).expect("failed to create async tx fifo"),
Fifo::<entry>::from_fifo(rx).expect("failed to create async rx fifo"),
let sender = tx.write_entries(elements);
// Wait 10 ms, then read the messages from the fifo.
let receive_future = async {
let entry = rx.read_entry().await?;
assert_eq!(elements[0], entry.expect("peer closed"));
let entry = rx.read_entry().await?;
assert_eq!(elements[1], entry.expect("peer closed"));
let entry = rx.read_entry().await?;
assert_eq!(elements[2], entry.expect("peer closed"));
Ok::<(), zx::Status>(())
// add a timeout to receiver so if test is broken it doesn't take forever
let receiver = receive_future.on_timeout(300.millis().after_now(), || panic!("timeout"));
let done = try_join(receiver, sender);
exec.run_singlethreaded(done).expect("failed to run receive future on executor");