// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::Measurable,
anyhow::{Context as _, Result},
fidl_fuchsia_pkg::{
BlobIdIteratorNextResponder, BlobIdIteratorRequest, BlobIdIteratorRequestStream,
BlobInfoIteratorNextResponder, BlobInfoIteratorRequest, BlobInfoIteratorRequestStream,
PackageIndexEntry, PackageIndexIteratorNextResponder, PackageIndexIteratorRequest,
PackageIndexIteratorRequestStream,
},
fuchsia_zircon_types::ZX_CHANNEL_MAX_MSG_BYTES,
futures::prelude::*,
};
/// Serves fidl iterators like:
///
/// protocol PayloadIterator {
/// Next() -> (vector<Payload>:MAX payloads);
/// };
///
/// from:
/// `fidl_iterator`: effectively a stream of `PayloadIterator::Next` requests
/// `items`: a slice of `Payload`s.
///
/// Fills each response to `Next()` with as many entries as will fit in a fidl message. The
/// returned future completes after `Next()` yields an empty response or the iterator
/// is interrupted (client closes the channel or the task encounters a FIDL layer error).
///
/// To use with a new protocol (e.g. `PayloadIterator`), in this crate:
/// 1. implement `FidlIteratorRequestStream` for `PayloadIteratorRequestStream`
/// 2. implement `FidlIteratorNextResponder` for `PayloadIteratorNextResponder`
/// 3. implement `Measurable` for `Payload` using functions generated by
/// //tools/fidl/measure-tape
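///
/// For example, for a hypothetical `PayloadIterator` protocol (the `PayloadIterator*`
/// bindings names and the `Payload` type below are illustrative, not real types in
/// `fidl_fuchsia_pkg`), the impls would mirror the ones for the real protocols in this
/// file:
///
/// ```ignore
/// impl FidlIteratorRequestStream for PayloadIteratorRequestStream {
///     type Responder = PayloadIteratorNextResponder;
///     fn request_to_responder(request: PayloadIteratorRequest) -> Self::Responder {
///         let PayloadIteratorRequest::Next { responder } = request;
///         responder
///     }
/// }
///
/// impl FidlIteratorNextResponder for PayloadIteratorNextResponder {
///     type Item = Payload;
///     fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
///         self.send(chunk)
///     }
/// }
/// ```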
pub async fn serve_fidl_iterator_from_slice<I>(
mut fidl_iterator: I,
mut items: impl AsMut<[<I::Responder as FidlIteratorNextResponder>::Item]>,
) -> Result<()>
where
I: FidlIteratorRequestStream,
{
let mut items = SliceChunker::new(items.as_mut());
loop {
let chunk = items.next();
let responder =
match fidl_iterator.try_next().await.context("while waiting for next() request")? {
None => break,
Some(request) => I::request_to_responder(request),
};
let () = responder.send_chunk(&chunk).context("while responding")?;
// Yield a single empty chunk, then stop serving the protocol.
if chunk.is_empty() {
break;
}
}
Ok(())
}
/// Serves fidl iterators like:
///
/// protocol PayloadIterator {
/// Next() -> (vector<Payload>:MAX payloads);
/// };
///
/// from:
/// `fidl_iterator`: effectively a stream of `PayloadIterator::Next` requests
/// `stream`: a `Stream` of `Vec<Payload>`s
/// `max_stream_chunks`: the maximum number of `Vec<Payload>`s to pull from `stream` at a time.
/// Making this number larger can pack more `Payload`s into each fidl response, decreasing
/// overhead, but the buffer of `Vec<Payload>`s is pre-allocated, so a very large value
/// (e.g. `usize::MAX`) will OOM the program. This number is a maximum, not a minimum, i.e.
/// `serve_fidl_iterator_from_stream` will not block on `stream` if there are already
/// `Payload`s available to send. An argument of `0` is converted to `1`.
///
/// Fills each response to `Next()` with as many available entries as will fit in a fidl message.
/// Only blocks on `stream` if there are no available entries.
/// The returned future completes after `Next()` yields an empty response or the iterator
/// is interrupted (client closes the channel or the task encounters a FIDL layer error).
///
/// To use with a new protocol (e.g. `PayloadIterator`), in this crate:
/// 1. implement `FidlIteratorRequestStream` for `PayloadIteratorRequestStream`
/// 2. implement `FidlIteratorNextResponder` for `PayloadIteratorNextResponder`
/// 3. implement `Measurable` for `Payload` using functions generated by
/// //tools/fidl/measure-tape
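///
/// A minimal usage sketch (the `request_stream` variable and `Payload` type are assumed
/// to be provided by the caller; any `Stream<Item = Vec<Payload>> + Unpin`, such as an
/// mpsc receiver, works as the item source):
///
/// ```ignore
/// let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
/// // Elsewhere, send batches of payloads:
/// // item_sender.unbounded_send(vec![payload]).unwrap();
/// serve_fidl_iterator_from_stream(request_stream, item_stream, 10).await?;
/// ```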
pub async fn serve_fidl_iterator_from_stream<I>(
mut fidl_iterator: I,
stream: impl futures::stream::Stream<Item = Vec<<I::Responder as FidlIteratorNextResponder>::Item>>
+ Unpin,
max_stream_chunks: usize,
) -> Result<()>
where
I: FidlIteratorRequestStream,
{
let mut chunked_stream = stream.ready_chunks(std::cmp::max(max_stream_chunks, 1));
let mut fidl_chunker = OwningChunker::new();
loop {
let responder =
match fidl_iterator.try_next().await.context("while waiting for next() request")? {
None => break,
Some(request) => I::request_to_responder(request),
};
// Get as many new items as possible, to minimize the number of FIDL messages, but don't
// block if we already have some.
if fidl_chunker.is_empty() {
loop {
if let Some(xss) = chunked_stream.next().await {
fidl_chunker.extend(xss.into_iter().flatten());
if fidl_chunker.is_empty() {
continue;
}
}
break;
}
} else {
if let Some(Some(xss)) = chunked_stream.next().now_or_never() {
fidl_chunker.extend(xss.into_iter().flatten());
}
}
let mut chunk = fidl_chunker.next();
let () = responder.send_chunk(chunk.make_contiguous()).context("while responding")?;
if chunk.is_empty() {
break;
}
}
Ok(())
}
/// A FIDL request stream for a FIDL protocol following the iterator pattern.
pub trait FidlIteratorRequestStream:
fidl::endpoints::RequestStream + TryStream<Error = fidl::Error>
{
type Responder: FidlIteratorNextResponder;
fn request_to_responder(request: <Self as TryStream>::Ok) -> Self::Responder;
}
/// A responder to a Next() request for a FIDL iterator.
pub trait FidlIteratorNextResponder {
type Item: Measurable;
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error>;
}
impl FidlIteratorRequestStream for PackageIndexIteratorRequestStream {
type Responder = PackageIndexIteratorNextResponder;
fn request_to_responder(request: PackageIndexIteratorRequest) -> Self::Responder {
let PackageIndexIteratorRequest::Next { responder } = request;
responder
}
}
impl FidlIteratorNextResponder for PackageIndexIteratorNextResponder {
type Item = PackageIndexEntry;
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
self.send(chunk)
}
}
impl FidlIteratorRequestStream for BlobInfoIteratorRequestStream {
type Responder = BlobInfoIteratorNextResponder;
fn request_to_responder(request: BlobInfoIteratorRequest) -> Self::Responder {
let BlobInfoIteratorRequest::Next { responder } = request;
responder
}
}
impl FidlIteratorRequestStream for BlobIdIteratorRequestStream {
type Responder = BlobIdIteratorNextResponder;
fn request_to_responder(request: BlobIdIteratorRequest) -> Self::Responder {
let BlobIdIteratorRequest::Next { responder } = request;
responder
}
}
impl FidlIteratorNextResponder for BlobInfoIteratorNextResponder {
type Item = fidl_fuchsia_pkg::BlobInfo;
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
self.send(chunk)
}
}
impl FidlIteratorNextResponder for BlobIdIteratorNextResponder {
type Item = fidl_fuchsia_pkg::BlobId;
fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> {
self.send(chunk)
}
}
// FIXME(52297) This constant would ideally be exported by the `fidl` crate.
// sizeof(TransactionHeader) + sizeof(VectorHeader)
const FIDL_VEC_RESPONSE_OVERHEAD_BYTES: usize = 32;
/// Assumes the fixed overhead of a single fidl response header and a single vec header per chunk.
/// It must not be used with more complex responses.
fn how_many_items_fit_in_fidl_vec_response<'a>(
items: impl Iterator<Item = &'a (impl Measurable + 'a)>,
) -> usize {
let mut bytes_used: usize = FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
let mut count = 0;
for item in items {
bytes_used += item.measure();
if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
break;
}
count += 1;
}
count
}
/// Helper to split a slice of items into chunks that will fit in a single FIDL vec response.
///
/// Note, SliceChunker assumes the fixed overhead of a single fidl response header and a single vec
/// header per chunk. It must not be used with more complex responses.
struct SliceChunker<'a, I> {
items: &'a mut [I],
}
impl<'a, I> SliceChunker<'a, I>
where
I: Measurable,
{
fn new(items: &'a mut [I]) -> Self {
Self { items }
}
/// Produce the next chunk of items to respond with. Iteration stops when this method returns
/// an empty slice, which occurs when either:
/// * All items have been returned
/// * SliceChunker encounters an item so large that it cannot even be stored in a response
/// dedicated to just that one item.
///
/// Once next() returns an empty slice, it will continue to do so in future calls.
fn next(&mut self) -> &'a mut [I] {
let entry_count = how_many_items_fit_in_fidl_vec_response(self.items.iter());
// tmp/swap dance to appease the borrow checker.
let tmp = std::mem::replace(&mut self.items, &mut []);
let (chunk, rest) = tmp.split_at_mut(entry_count);
self.items = rest;
chunk
}
}
/// Helper to split a collection of items into chunks that will fit in a single FIDL vec response.
///
/// Note, OwningChunker assumes the fixed overhead of a single fidl response header and a single vec
/// header per chunk. It must not be used with more complex responses.
struct OwningChunker<I> {
items: std::collections::VecDeque<I>,
}
impl<I> OwningChunker<I>
where
I: Measurable,
{
fn new() -> Self {
Self { items: std::collections::VecDeque::new() }
}
/// Produce the next chunk of items to respond with. Iteration stops when this method returns
/// an empty VecDeque, which occurs when either:
/// * All items have been returned (and no new items are added)
/// * OwningChunker encounters an item so large that it cannot even be stored in a response
/// dedicated to just that one item.
fn next(&mut self) -> std::collections::VecDeque<I> {
let count = how_many_items_fit_in_fidl_vec_response(self.items.iter());
let mut other = self.items.split_off(count);
std::mem::swap(&mut self.items, &mut other);
other
}
fn is_empty(&self) -> bool {
self.items.is_empty()
}
fn extend(&mut self, iter: impl IntoIterator<Item = I>) {
self.items.extend(iter)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
fidl_fuchsia_pkg::{BlobInfoIteratorMarker, PackageIndexIteratorMarker},
fuchsia_async::Task,
fuchsia_hash::HashRangeFull,
fuchsia_pkg::PackagePath,
proptest::prelude::*,
};
#[test]
fn zx_channel_max_msg_bytes_fits_in_usize() {
let _: usize = ZX_CHANNEL_MAX_MSG_BYTES.try_into().unwrap();
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Byte(u8);
impl Measurable for Byte {
fn measure(&self) -> usize {
1
}
}
#[test]
fn slice_chunker_fuses() {
let items = &mut [Byte(42)];
let mut chunker = SliceChunker::new(items);
assert_eq!(chunker.next(), &mut [Byte(42)]);
assert_eq!(chunker.next(), &mut []);
assert_eq!(chunker.next(), &mut []);
}
#[test]
fn slice_chunker_chunks_at_expected_boundary() {
const BYTES_PER_CHUNK: usize =
ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
// Expect to fill 2 full chunks with 1 item left over.
let mut items =
(0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
let expected = items.clone();
let mut chunker = SliceChunker::new(&mut items);
let mut actual: Vec<Byte> = vec![];
for _ in 0..2 {
let chunk = chunker.next();
assert_eq!(chunk.len(), BYTES_PER_CHUNK);
actual.extend(&*chunk);
}
let chunk = chunker.next();
assert_eq!(chunk.len(), 1);
actual.extend(&*chunk);
assert_eq!(actual, expected);
}
#[test]
fn slice_chunker_terminates_at_too_large_item() {
#[derive(Debug, PartialEq, Eq)]
struct TooBig;
impl Measurable for TooBig {
fn measure(&self) -> usize {
ZX_CHANNEL_MAX_MSG_BYTES as usize
}
}
let items = &mut [TooBig];
let mut chunker = SliceChunker::new(items);
assert_eq!(chunker.next(), &mut []);
}
#[test]
fn owning_chunker_fuses() {
let items = [Byte(42)];
let mut chunker = OwningChunker::new();
chunker.extend(items);
assert_eq!(chunker.next().make_contiguous(), &[Byte(42)]);
assert_eq!(chunker.next().make_contiguous(), &[]);
assert_eq!(chunker.next().make_contiguous(), &[]);
}
#[test]
fn owning_chunker_chunks_at_expected_boundary() {
const BYTES_PER_CHUNK: usize =
ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
// Expect to fill 2 full chunks with 1 item left over.
let items =
(0..=(BYTES_PER_CHUNK as u64 * 2)).map(|n| Byte(n as u8)).collect::<Vec<Byte>>();
let expected = items.clone();
let mut chunker = OwningChunker::new();
chunker.extend(items.into_iter());
let mut actual: Vec<Byte> = vec![];
for _ in 0..2 {
let chunk = chunker.next();
assert_eq!(chunk.len(), BYTES_PER_CHUNK);
actual.extend(chunk);
}
let chunk = chunker.next();
assert_eq!(chunk.len(), 1);
actual.extend(chunk);
assert_eq!(actual, expected);
}
#[test]
fn owning_chunker_terminates_at_too_large_item() {
#[derive(Debug, PartialEq, Eq)]
struct TooBig;
impl Measurable for TooBig {
fn measure(&self) -> usize {
ZX_CHANNEL_MAX_MSG_BYTES as usize
}
}
let items = [TooBig];
let mut chunker = OwningChunker::new();
chunker.extend(items);
assert_eq!(chunker.next().make_contiguous(), &mut []);
}
#[test]
fn owning_chunker_extend_after_next() {
let mut chunker = OwningChunker::new();
chunker.extend([Byte(0)]);
chunker.extend([Byte(1)]);
assert_eq!(chunker.next().make_contiguous(), &[Byte(0), Byte(1)]);
assert_eq!(chunker.next().make_contiguous(), &[]);
chunker.extend([Byte(2)]);
assert_eq!(chunker.next().make_contiguous(), &[Byte(2)]);
}
#[test]
fn verify_fidl_vec_response_overhead() {
let vec_response_overhead = {
use fidl::encoding::{
DynamicFlags, TransactionHeader, TransactionMessage, TransactionMessageType,
UnboundedVector,
};
type Msg = TransactionMessageType<UnboundedVector<u8>>;
let msg = TransactionMessage {
header: TransactionHeader::new(0, 0, DynamicFlags::empty()),
body: &[] as &[u8],
};
fidl::encoding::with_tls_encoded::<Msg, _>(msg, |bytes, _handles| Ok(bytes.len()))
.unwrap()
};
assert_eq!(vec_response_overhead, FIDL_VEC_RESPONSE_OVERHEAD_BYTES);
}
proptest! {
#![proptest_config(ProptestConfig{
// Disable persistence to avoid the warning for not running in the
// source code directory (since we're running on a Fuchsia target)
failure_persistence: None,
.. ProptestConfig::default()
})]
#[test]
fn serve_fidl_iterator_from_slice_yields_expected_entries(items: Vec<crate::BlobInfo>) {
let mut executor = fuchsia_async::TestExecutor::new();
executor.run_singlethreaded(async move {
let (proxy, stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
let mut actual_items = vec![];
let ((), ()) = futures::future::join(
async {
let items = items
.iter()
.cloned()
.map(fidl_fuchsia_pkg::BlobInfo::from)
.collect::<Vec<_>>();
serve_fidl_iterator_from_slice(stream, items).await.unwrap()
},
async {
loop {
let chunk = proxy.next().await.unwrap();
if chunk.is_empty() {
break;
}
let chunk = chunk.into_iter().map(crate::BlobInfo::from);
actual_items.extend(chunk);
}
},
)
.await;
assert_eq!(items, actual_items);
})
}
#[test]
fn serve_fidl_iterator_from_stream_yields_expected_entries(
items: Vec<crate::BlobInfo>,
repetition in 0..4usize,
max_chunking in 0..4usize,
) {
let mut executor = fuchsia_async::TestExecutor::new();
executor.run_singlethreaded(async move {
let (proxy, fidl_stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
let (mut item_sender, item_stream) = futures::channel::mpsc::unbounded();
let mut actual_items = vec![];
let ((), (), ()) = futures::future::join3(
async {
for _ in 0..repetition {
let () = item_sender.send(items
.iter()
.cloned()
.map(fidl_fuchsia_pkg::BlobInfo::from)
.collect::<Vec<_>>()).await.unwrap();
}
drop(item_sender);
},
async {
let () = serve_fidl_iterator_from_stream(
fidl_stream,
item_stream,
max_chunking
)
.await
.unwrap();
},
async {
loop {
let chunk = proxy.next().await.unwrap();
if chunk.is_empty() {
break;
}
let chunk = chunk.into_iter().map(crate::BlobInfo::from);
actual_items.extend(chunk);
}
},
)
.await;
let expected_items = {
let mut expected_items = vec![];
for _ in 0..repetition {
expected_items.extend(items.iter().cloned())
}
expected_items
};
assert_eq!(expected_items, actual_items);
})
}
}
const PACKAGE_INDEX_CHUNK_SIZE_MAX: usize = 818;
// A FIDL message is at most 65,536 bytes because of zx_channel_write [1].
// The encoded [2] size of a `PackageIndexIterator.Next()` response is:
//   16 bytes FIDL transaction header +
//   16 bytes vector header +
//   N * (16 bytes string header (from the url field of struct PackageUrl) +
//        L bytes string content +
//        32 bytes BlobId array (the meta_far_blob_id field)),
// for a total of 32 + N * (48 + L), where L is rounded up to a multiple of 8 because
// secondary objects (e.g. string contents) are 8-byte aligned.
//
// The shortest possible package url, "fuchsia-pkg://fuchsia.com/a/0", is 29 bytes,
// or 32 bytes with 8-byte alignment, so:
//   PACKAGE_INDEX_CHUNK_SIZE_MAX: 32 + N * (48 + 32) <= 65,536 => N = 818
//
// The longest possible package url is 283 bytes, or 288 bytes with 8-byte alignment, so:
//   PACKAGE_INDEX_CHUNK_SIZE_MIN: 32 + N * (48 + 288) <= 65,536 => N = 194
//
// [1] https://fuchsia.dev/fuchsia-src/reference/syscalls/channel_write
// [2] https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format
const PACKAGE_INDEX_CHUNK_SIZE_MIN: usize = 194;
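// A sanity check re-deriving the chunk-size constants from the byte counts in the comment
// above (the 48-byte fixed per-entry overhead and the 32/288-byte aligned url lengths are
// taken from that comment, not re-measured here).
#[test]
fn package_index_chunk_sizes_match_derivation() {
    let available = ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;
    assert_eq!(available / (48 + 32), PACKAGE_INDEX_CHUNK_SIZE_MAX);
    assert_eq!(available / (48 + 288), PACKAGE_INDEX_CHUNK_SIZE_MIN);
}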
#[fuchsia_async::run_singlethreaded(test)]
async fn package_index_iterator_paginates_shortest_entries() {
let names = ('a'..='z').cycle().map(|c| c.to_string());
let paths = names.map(|name| {
PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
});
verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MAX).await;
}
#[fuchsia_async::run_singlethreaded(test)]
async fn package_index_iterator_paginates_longest_entries() {
let names = ('a'..='z')
.map(|c| std::iter::repeat(c).take(PackagePath::MAX_NAME_BYTES).collect::<String>())
.cycle();
let paths = names.map(|name| {
PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap())
});
verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MIN).await;
}
async fn verify_package_index_iterator_pagination(
paths: impl Iterator<Item = PackagePath>,
expected_chunk_size: usize,
) {
let package_entries: Vec<fidl_fuchsia_pkg::PackageIndexEntry> = paths
.zip(HashRangeFull::default())
.take(expected_chunk_size * 2)
.map(|(path, hash)| fidl_fuchsia_pkg::PackageIndexEntry {
package_url: fidl_fuchsia_pkg::PackageUrl {
url: format!("fuchsia-pkg://fuchsia.com/{}", path),
},
meta_far_blob_id: crate::BlobId::from(hash).into(),
})
.collect();
let (iter, stream) =
fidl::endpoints::create_proxy_and_stream::<PackageIndexIteratorMarker>().unwrap();
let task = Task::local(serve_fidl_iterator_from_slice(stream, package_entries));
let chunk = iter.next().await.unwrap();
assert_eq!(chunk.len(), expected_chunk_size);
let chunk = iter.next().await.unwrap();
assert_eq!(chunk.len(), expected_chunk_size);
let chunk = iter.next().await.unwrap();
assert_eq!(chunk.len(), 0);
let () = task.await.unwrap();
}
// TestExecutor.run_until_stalled is not available on host
#[cfg(target_os = "fuchsia")]
use assert_matches::assert_matches;
// TestExecutor.run_until_stalled is not available on host
#[cfg(target_os = "fuchsia")]
#[test]
fn serve_fidl_iterator_from_stream_ignores_empty_vec() {
let mut executor = fuchsia_async::TestExecutor::new();
let (proxy, fidl_stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();
// serve_fidl_iterator_from_stream should ignore the empty vec of Payloads, so
// chunk_fut should not complete.
let () = item_sender.unbounded_send(vec![]).unwrap();
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);
// chunk_fut should complete once serve_fidl_iterator_from_stream is given a Payload
let blob_info = crate::BlobInfo { blob_id: [0; 32].into(), length: 0 };
let () = item_sender.unbounded_send(vec![blob_info.into()]).unwrap();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk == vec![fidl_fuchsia_pkg::BlobInfo::from(blob_info)]
);
}
// TestExecutor.run_until_stalled is not available on host
#[cfg(target_os = "fuchsia")]
#[test]
fn serve_fidl_iterator_from_stream_does_not_block_if_chunker_not_empty() {
let mut executor = fuchsia_async::TestExecutor::new();
let (proxy, fidl_stream) =
fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
let mut serve_task = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();
let blob_info = fidl_fuchsia_pkg::BlobInfo::from(crate::BlobInfo {
blob_id: [0; 32].into(),
length: 0,
});
let max_payloads_per_fidl_response = (ZX_CHANNEL_MAX_MSG_BYTES as usize
- FIDL_VEC_RESPONSE_OVERHEAD_BYTES)
/ measure_fuchsia_pkg::Measurable::measure(&blob_info).num_bytes;
let payloads = vec![blob_info; max_payloads_per_fidl_response + 1];
assert_eq!(
how_many_items_fit_in_fidl_vec_response(payloads.iter()),
max_payloads_per_fidl_response
);
// Send all the payloads, the first FIDL response should contain as many as will fit.
let () = item_sender.unbounded_send(payloads).unwrap();
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk.len() == max_payloads_per_fidl_response
);
// There should be one payload left in the OwningChunker, so we should be able to obtain
// another FIDL response without sending more payloads.
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk.len() == 1
);
// There should be no payloads left, so the next Next request should block.
let mut chunk_fut = proxy.next();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut chunk_fut), std::task::Poll::Pending);
// The serving task should start providing payloads again when more are provided.
let () = item_sender.unbounded_send(vec![blob_info; 2]).unwrap();
assert_matches!(executor.run_until_stalled(&mut serve_task), std::task::Poll::Pending);
assert_matches!(
executor.run_until_stalled(&mut chunk_fut),
std::task::Poll::Ready(Ok(chunk))
if chunk.len() == 2
);
}
}