| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
//! Temporary Futures extensions used during the transition from 0.2 -> 0.3.
//! These SHOULD NOT be used for new code. Instead, use the corresponding 0.3
//! functions or `async_await`-based utilities.
| |
| #![allow(missing_docs)] |
| |
| use { |
| futures::{ |
| future::{Future, FutureExt}, |
| io::{self, AsyncRead, AsyncWrite}, |
| ready, |
| stream::{Stream, StreamExt}, |
| task::{LocalWaker, Poll}, |
| try_ready, |
| }, |
| pin_utils::unsafe_pinned, |
| std::{ |
| marker::Unpin, |
| mem, |
| pin::Pin, |
| }, |
| }; |
| |
| pub trait TempStreamExt: Stream + Sized { |
| fn first_elem(self) -> FirstElem<Self> { |
| FirstElem { stream: self } |
| } |
| fn try_into_future<T, E>(self) -> TryIntoFuture<Self> |
| where |
| Self: Stream<Item = Result<T, E>> + Unpin, |
| { |
| TryIntoFuture { stream: Some(self) } |
| } |
| } |
| |
| impl<T: Stream + Sized> TempStreamExt for T {} |
| |
| pub struct FirstElem<St> { |
| stream: St, |
| } |
| |
| impl<St> FirstElem<St> { |
| // Safety: `FirstElem` is `Unpin` iff `St` is `Unpin`. |
| unsafe_pinned!(stream: St); |
| } |
| |
| impl<St: Stream> Future for FirstElem<St> { |
| type Output = Option<St::Item>; |
| |
| fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| self.stream().poll_next(lw) |
| } |
| } |
| |
| pub struct TryIntoFuture<St> { |
| stream: Option<St>, |
| } |
| |
| impl<St> Unpin for TryIntoFuture<St> {} |
| |
| impl<T, E, St: Stream<Item = Result<T, E>> + Unpin> Future for TryIntoFuture<St> { |
| type Output = Result<(Option<T>, St), E>; |
| |
| fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| let res = ready!(self.stream.as_mut().unwrap().poll_next_unpin(lw)); |
| Poll::Ready(match res { |
| Some(Ok(elem)) => Ok((Some(elem), self.stream.take().unwrap())), |
| None => Ok((None, self.stream.take().unwrap())), |
| Some(Err(e)) => Err(e), |
| }) |
| } |
| } |
| |
| pub trait TempAsyncWriteExt: AsyncWrite + Sized { |
| fn write_all<T: AsRef<[u8]>>(self, buf: T) -> WriteAll<Self, T> { |
| write_all(self, buf) |
| } |
| } |
| |
| impl<T: AsyncWrite + Sized> TempAsyncWriteExt for T {} |
| |
| pub struct WriteAll<A, T> { |
| state: WriteState<A, T>, |
| } |
| |
| impl<A, T> Unpin for WriteAll<A, T> {} |
| |
| enum WriteState<A, T> { |
| Writing { a: A, buf: T, pos: usize }, |
| Empty, |
| } |
| |
| pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T> |
| where |
| A: AsyncWrite, |
| T: AsRef<[u8]>, |
| { |
| WriteAll { |
| state: WriteState::Writing { |
| a: a, |
| buf: buf, |
| pos: 0, |
| }, |
| } |
| } |
| |
/// Builds the `WriteZero` I/O error with the message "zero-length write".
fn zero_write() -> io::Error {
    let kind = io::ErrorKind::WriteZero;
    io::Error::new(kind, "zero-length write")
}
| |
| impl<A, T> Future for WriteAll<A, T> |
| where |
| A: AsyncWrite, |
| T: AsRef<[u8]>, |
| { |
| type Output = io::Result<(A, T)>; |
| |
| fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| match self.state { |
| WriteState::Writing { |
| ref mut a, |
| ref buf, |
| ref mut pos, |
| } => { |
| let buf = buf.as_ref(); |
| while *pos < buf.len() { |
| let n = try_ready!(a.poll_write(lw, &buf[*pos..])); |
| *pos += n; |
| if n == 0 { |
| return Poll::Ready(Err(zero_write())); |
| } |
| } |
| } |
| WriteState::Empty => panic!("poll a WriteAll after it's done"), |
| } |
| |
| match mem::replace(&mut self.state, WriteState::Empty) { |
| WriteState::Writing { a, buf, .. } => Poll::Ready(Ok((a, buf).into())), |
| WriteState::Empty => panic!(), |
| } |
| } |
| } |
| |
| pub trait TempAsyncReadExt: AsyncRead + Sized { |
| fn read_to_end(self, buf: Vec<u8>) -> ReadToEnd<Self> { |
| read_to_end(self, buf) |
| } |
| } |
| |
| impl<T: AsyncRead + Sized> TempAsyncReadExt for T {} |
| |
| pub struct ReadToEnd<A> { |
| state: State<A>, |
| } |
| |
| impl<A> Unpin for ReadToEnd<A> {} |
| |
| enum State<A> { |
| Reading { a: A, buf: Vec<u8> }, |
| Empty, |
| } |
| |
| pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> |
| where |
| A: AsyncRead, |
| { |
| ReadToEnd { |
| state: State::Reading { a, buf }, |
| } |
| } |
| |
/// Drop guard that resets the length of `buf` to `len` when it goes out of
/// scope.
struct Guard<'a> {
    /// Vector whose length is reset on drop.
    buf: &'a mut Vec<u8>,
    /// Length that `buf` is set to on drop.
    len: usize,
}

impl<'a> Drop for Guard<'a> {
    fn drop(&mut self) {
        let restored_len = self.len;
        // SAFETY: this relies on whoever maintains the guard keeping `len`
        // no larger than the vector's capacity, with the first `len` bytes
        // initialized. The guard itself does not check either condition.
        unsafe { self.buf.set_len(restored_len) };
    }
}
| |
| // This uses an adaptive system to extend the vector when it fills. We want to |
| // avoid paying to allocate and zero a huge chunk of memory if the reader only |
| // has 4 bytes while still making large reads if the reader does have a ton |
| // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every |
| // time is 4,500 times (!) slower than this if the reader has a very small |
| // amount of data to return. |
| // |
| // Because we're extending the buffer with uninitialized data for trusted |
| // readers, we need to make sure to truncate that if any of this panics. |
| fn read_to_end_internal<R: AsyncRead>( |
| r: &mut R, lw: &LocalWaker, buf: &mut Vec<u8>, |
| ) -> Poll<io::Result<usize>> { |
| let start_len = buf.len(); |
| let mut g = Guard { |
| len: buf.len(), |
| buf: buf, |
| }; |
| let ret; |
| loop { |
| if g.len == g.buf.len() { |
| unsafe { |
| g.buf.reserve(32); |
| let capacity = g.buf.capacity(); |
| g.buf.set_len(capacity); |
| r.initializer().initialize(&mut g.buf[g.len..]); |
| } |
| } |
| |
| match ready!(r.poll_read(lw, &mut g.buf[g.len..])) { |
| Ok(0) => { |
| ret = Poll::Ready(Ok(g.len - start_len)); |
| break; |
| } |
| Ok(n) => g.len += n, |
| Err(e) => { |
| ret = Poll::Ready(Err(e)); |
| break; |
| } |
| } |
| } |
| |
| ret |
| } |
| |
| impl<A> Future for ReadToEnd<A> |
| where |
| A: AsyncRead, |
| { |
| type Output = io::Result<(A, Vec<u8>)>; |
| |
| fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| let this = &mut *self; |
| match this.state { |
| State::Reading { |
| ref mut a, |
| ref mut buf, |
| } => { |
| // If we get `Ok`, then we know the stream hit EOF and we're done. If we |
| // hit "would block" then all the read data so far is in our buffer, and |
| // otherwise we propagate errors |
| try_ready!(read_to_end_internal(a, lw, buf)); |
| } |
| State::Empty => panic!("poll ReadToEnd after it's done"), |
| } |
| |
| match mem::replace(&mut this.state, State::Empty) { |
| State::Reading { a, buf } => Poll::Ready(Ok((a, buf).into())), |
| State::Empty => unreachable!(), |
| } |
| } |
| } |
| |
| pub trait TempFutureExt: Future + Sized { |
| fn left_future<B>(self) -> Either<Self, B> { |
| Either::Left(self) |
| } |
| |
| fn right_future<A>(self) -> Either<A, Self> { |
| Either::Right(self) |
| } |
| |
| fn select<B>(self, b: B) -> Select<Self, B> { |
| Select { a: self, b } |
| } |
| |
| fn select_unpin<B>(self, b: B) -> SelectUnpin<Self, B> |
| where |
| Self: Unpin, |
| B: Unpin, |
| { |
| SelectUnpin { |
| a: Some(self), |
| b: Some(b), |
| } |
| } |
| } |
| |
| impl<T: Future + Sized> TempFutureExt for T {} |
| |
/// A value that is one of two alternatives.
pub enum Either<A, B> {
    /// The first alternative.
    Left(A),
    /// The second alternative.
    Right(B),
}

impl<A, B> Either<A, B> {
    /// Collapses the value into a single `T` by calling `lf` on a `Left`
    /// payload or `rf` on a `Right` payload. Only the matching closure runs.
    pub fn either<T>(self, lf: impl FnOnce(A) -> T, rf: impl FnOnce(B) -> T) -> T {
        match self {
            Either::Left(left) => lf(left),
            Either::Right(right) => rf(right),
        }
    }
}
| |
| impl<A: Future, B: Future<Output = A::Output>> Future for Either<A, B> { |
| type Output = A::Output; |
| fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| unsafe { |
| // Safety: neither child future is ever moved |
| match Pin::get_unchecked_mut(self) { |
| Either::Left(a) => Pin::new_unchecked(a).poll(lw), |
| Either::Right(b) => Pin::new_unchecked(b).poll(lw), |
| } |
| } |
| } |
| } |
| |
| pub struct Select<A, B> { |
| a: A, |
| b: B, |
| } |
| |
| impl<A, B> Select<A, B> { |
| unsafe_pinned!(a: A); |
| unsafe_pinned!(b: B); |
| } |
| |
| impl<A: Future, B: Future> Future for Select<A, B> { |
| type Output = Either<A::Output, B::Output>; |
| fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| if let Poll::Ready(a) = self.as_mut().a().poll(lw) { |
| return Poll::Ready(Either::Left(a)); |
| } |
| if let Poll::Ready(b) = self.as_mut().b().poll(lw) { |
| return Poll::Ready(Either::Right(b)); |
| } |
| Poll::Pending |
| } |
| } |
| |
| pub struct SelectUnpin<A, B> { |
| a: Option<A>, |
| b: Option<B>, |
| } |
| |
| impl<A: Future + Unpin, B: Future + Unpin> Future for SelectUnpin<A, B> { |
| type Output = Either<(A::Output, B), (A, B::Output)>; |
| fn poll(mut self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> { |
| let this = &mut *self; |
| if let Poll::Ready(a) = this.a.as_mut().unwrap().poll_unpin(lw) { |
| return Poll::Ready(Either::Left((a, this.b.take().unwrap()))); |
| } |
| if let Poll::Ready(b) = this.b.as_mut().unwrap().poll_unpin(lw) { |
| return Poll::Ready(Either::Right((this.a.take().unwrap(), b))); |
| } |
| Poll::Pending |
| } |
| } |