// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Temporary Futures extensions used during the transition from 0.2 -> 0.3.
//! These SHOULD NOT be used for new code. Instead, use the corresponding 0.3
//! functions or `async_await`-based utilities.
use {
future::{Future, FutureExt},
io::{self, AsyncRead, AsyncWrite},
stream::{Stream, StreamExt},
task::{Context, Poll},
std::{marker::Unpin, mem, pin::Pin},
pub trait TempStreamExt: Stream + Sized {
fn first_elem(self) -> FirstElem<Self> {
FirstElem { stream: self }
fn try_into_future<T, E>(self) -> TryIntoFuture<Self>
Self: Stream<Item = Result<T, E>> + Unpin,
TryIntoFuture { stream: Some(self) }
impl<T: Stream + Sized> TempStreamExt for T {}
pub struct FirstElem<St> {
stream: St,
impl<St> FirstElem<St> {
// Safety: `FirstElem` is `Unpin` iff `St` is `Unpin`.
unsafe_pinned!(stream: St);
impl<St: Stream> Future for FirstElem<St> {
type Output = Option<St::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
pub struct TryIntoFuture<St> {
stream: Option<St>,
impl<St> Unpin for TryIntoFuture<St> {}
impl<T, E, St: Stream<Item = Result<T, E>> + Unpin> Future for TryIntoFuture<St> {
type Output = Result<(Option<T>, St), E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = ready!(;
Poll::Ready(match res {
Some(Ok(elem)) => Ok((Some(elem),,
None => Ok((None,,
Some(Err(e)) => Err(e),
pub trait TempAsyncWriteExt: AsyncWrite + Sized {
fn write_all<T: AsRef<[u8]>>(self, buf: T) -> WriteAll<Self, T> {
write_all(self, buf)
impl<T: AsyncWrite + Sized> TempAsyncWriteExt for T {}
pub struct WriteAll<A, T> {
state: WriteState<A, T>,
impl<A, T> Unpin for WriteAll<A, T> {}
enum WriteState<A, T> {
Writing { a: A, buf: T, pos: usize },
pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
A: AsyncWrite,
T: AsRef<[u8]>,
WriteAll { state: WriteState::Writing { a: a, buf: buf, pos: 0 } }
fn zero_write() -> io::Error {
io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
impl<A, T> Future for WriteAll<A, T>
A: AsyncWrite + Unpin,
T: AsRef<[u8]>,
type Output = io::Result<(A, T)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut self.state {
WriteState::Writing { a, buf, pos } => {
let buf = buf.as_ref();
while *pos < buf.len() {
let n = ready!(Pin::new(&mut *a).poll_write(cx, &buf[*pos..]))?;
*pos += n;
if n == 0 {
return Poll::Ready(Err(zero_write()));
WriteState::Empty => panic!("poll a WriteAll after it's done"),
match mem::replace(&mut self.state, WriteState::Empty) {
WriteState::Writing { a, buf, .. } => Poll::Ready(Ok((a, buf).into())),
WriteState::Empty => panic!(),
pub trait TempAsyncReadExt: AsyncRead + Sized {
fn read_to_end(self, buf: Vec<u8>) -> ReadToEnd<Self> {
read_to_end(self, buf)
impl<T: AsyncRead + Sized> TempAsyncReadExt for T {}
pub struct ReadToEnd<A> {
state: State<A>,
impl<A> Unpin for ReadToEnd<A> {}
enum State<A> {
Reading { a: A, buf: Vec<u8> },
pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
A: AsyncRead,
ReadToEnd { state: State::Reading { a, buf } }
struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
unsafe {
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
fn read_to_end_internal<R: AsyncRead + Unpin>(
r: &mut R,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
) -> Poll<io::Result<usize>> {
let start_len = buf.len();
let mut g = Guard { len: buf.len(), buf: buf };
let ret;
loop {
if g.len == g.buf.len() {
let capacity = g.buf.capacity();
// FIXME: don't zero when a sound `std::mem::freeze` or similar exists
g.buf.resize(capacity, 0);
match ready!(Pin::new(&mut *r).poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
impl<A> Future for ReadToEnd<A>
A: AsyncRead + Unpin,
type Output = io::Result<(A, Vec<u8>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
match this.state {
State::Reading { ref mut a, ref mut buf } => {
// If we get `Ok`, then we know the stream hit EOF and we're done. If we
// hit "would block" then all the read data so far is in our buffer, and
// otherwise we propagate errors
ready!(read_to_end_internal(a, cx, buf))?;
State::Empty => panic!("poll ReadToEnd after it's done"),
match mem::replace(&mut this.state, State::Empty) {
State::Reading { a, buf } => Poll::Ready(Ok((a, buf).into())),
State::Empty => unreachable!(),
pub trait TempFutureExt: Future + Sized {
fn select<B>(self, b: B) -> Select<Self, B> {
Select { a: self, b }
fn select_unpin<B>(self, b: B) -> SelectUnpin<Self, B>
Self: Unpin,
B: Unpin,
SelectUnpin { a: Some(self), b: Some(b) }
impl<T: Future + Sized> TempFutureExt for T {}
pub enum Either<A, B> {
impl<A, B> Either<A, B> {
pub fn either<T>(self, lf: impl FnOnce(A) -> T, rf: impl FnOnce(B) -> T) -> T {
match self {
Either::Left(a) => (lf)(a),
Either::Right(b) => (rf)(b),
impl<A: Future, B: Future<Output = A::Output>> Future for Either<A, B> {
type Output = A::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
// Safety: neither child future is ever moved
match Pin::get_unchecked_mut(self) {
Either::Left(a) => Pin::new_unchecked(a).poll(cx),
Either::Right(b) => Pin::new_unchecked(b).poll(cx),
pub struct Select<A, B> {
a: A,
b: B,
impl<A, B> Select<A, B> {
unsafe_pinned!(a: A);
unsafe_pinned!(b: B);
impl<A: Future, B: Future> Future for Select<A, B> {
type Output = Either<A::Output, B::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(a) = self.as_mut().a().poll(cx) {
return Poll::Ready(Either::Left(a));
if let Poll::Ready(b) = self.as_mut().b().poll(cx) {
return Poll::Ready(Either::Right(b));
pub struct SelectUnpin<A, B> {
a: Option<A>,
b: Option<B>,
impl<A: Future + Unpin, B: Future + Unpin> Future for SelectUnpin<A, B> {
type Output = Either<(A::Output, B), (A, B::Output)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if let Poll::Ready(a) = this.a.as_mut().unwrap().poll_unpin(cx) {
return Poll::Ready(Either::Left((a, this.b.take().unwrap())));
if let Poll::Ready(b) = this.b.as_mut().unwrap().poll_unpin(cx) {
return Poll::Ready(Either::Right((this.a.take().unwrap(), b)));