blob: b43eb19125b5f573fdd8eb05b4a3cba7b8a1c507 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Helpers for working with streams.
use {
futures::{ready, Stream, StreamExt},
std::{
future::Future,
pin::Pin,
task::{Context, Poll},
},
};
/// Essentially `stream.into_future().map(move |(value, stream) (value, stream, with))`
pub(super) struct NextWith<St, With> {
opt: Option<(St, With)>,
}
const USED_AFTER_COMPLETION: &str = "`NextWith` used after completion";
impl<St, With> NextWith<St, With> {
/// Create a new `NextWith` future.
pub fn new(stream: St, with: With) -> Self {
NextWith { opt: Some((stream, with)) }
}
fn stream(&mut self) -> &mut St {
&mut self.opt.as_mut().expect(USED_AFTER_COMPLETION).0
}
fn take(&mut self) -> (St, With) {
self.opt.take().expect(USED_AFTER_COMPLETION)
}
}
impl<St, With> Unpin for NextWith<St, With> {}
impl<St, With> Future for NextWith<St, With>
where
St: Stream + Unpin,
{
type Output = Option<(St::Item, St, With)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let opt = ready!(self.stream().poll_next_unpin(cx));
Poll::Ready(opt.map(|next| {
let (st, with) = self.take();
(next, st, with)
}))
}
}