| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::Measurable, |
| anyhow::{Context as _, Result}, |
| fidl_fuchsia_pkg::{ |
| BlobIdIteratorNextResponder, BlobIdIteratorRequest, BlobIdIteratorRequestStream, |
| BlobInfoIteratorNextResponder, BlobInfoIteratorRequest, BlobInfoIteratorRequestStream, |
| PackageIndexEntry, PackageIndexIteratorNextResponder, PackageIndexIteratorRequest, |
| PackageIndexIteratorRequestStream, |
| }, |
| fuchsia_zircon_types::ZX_CHANNEL_MAX_MSG_BYTES, |
| futures::prelude::*, |
| }; |
| |
| /// Serves fidl iterators like: |
| /// |
| /// protocol PayloadIterator { |
| /// Next() -> (vector<Payload>:MAX payloads); |
| /// }; |
| /// |
| /// from: |
| /// `fidl_iterator`: effectively a stream of `PayloadIterator::Next` requests |
| /// `items`: a slice of `Payload`s. |
| /// |
| /// Fills each response to `Next()` with as many entries as will fit in a fidl message. The |
| /// returned future completes after `Next()` yields an empty response or the iterator |
| /// is interrupted (client closes the channel or the task encounters a FIDL layer error). |
| /// |
| /// To use with a new protocol (e.g. `PayloadIterator`), in this crate: |
| /// 1. implement `FidlIteratorRequestStream` for `PayloadIteratorRequestStream` |
| /// 2. implement `FidlIteratorNextResponder` for `PayloadIteratorNextResponder` |
| /// 3. implement `Measurable` for `Payload` using functions generated by |
| /// //tools/fidl/measure-tape |
| pub async fn serve_fidl_iterator_from_slice<I>( |
| mut fidl_iterator: I, |
| mut items: impl AsMut<[<I::Responder as FidlIteratorNextResponder>::Item]>, |
| ) -> Result<()> |
| where |
| I: FidlIteratorRequestStream, |
| { |
| let mut items = SliceChunker::new(items.as_mut()); |
| |
| loop { |
| let chunk = items.next(); |
| |
| let responder = |
| match fidl_iterator.try_next().await.context("while waiting for next() request")? { |
| None => break, |
| Some(request) => I::request_to_responder(request), |
| }; |
| |
| let () = responder.send_chunk(&chunk).context("while responding")?; |
| |
| // Yield a single empty chunk, then stop serving the protocol. |
| if chunk.is_empty() { |
| break; |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| /// Serves fidl iterators like: |
| /// |
| /// protocol PayloadIterator { |
| /// Next() -> (vector<Payload>:MAX payloads); |
| /// }; |
| /// |
| /// from: |
| /// `fidl_iterator`: effectively a stream of `PayloadIterator::Next` requests |
| /// `stream`: a Stream<Vec<Payload>> |
| /// `max_stream_chunks`: the maximum number of `Vec<Payload>`'s to pull from `stream` at a time. |
| /// Making this number larger can pack more `Payload`s into the fidl response, decreasing |
| /// overhead, but the buffer of `Vec<Payload>`s is pre-allocated, so if this number is e.g. |
| /// `usize::MAX` the program will OOM. This number is the maximum, not the minimum, i.e |
| /// `serve_fidl_iterator_from_stream` will not block on `stream` if there are available |
| /// `Payload`s to send. Arguments of `0` will be converted to `1`. |
| /// |
| /// |
| /// Fills each response to `Next()` with as many available entries as will fit in a fidl message. |
| /// Only blocks on `stream` if there are no available entries. |
| /// The returned future completes after `Next()` yields an empty response or the iterator |
| /// is interrupted (client closes the channel or the task encounters a FIDL layer error). |
| /// |
| /// To use with a new protocol (e.g. `PayloadIterator`), in this crate: |
| /// 1. implement `FidlIteratorRequestStream` for `PayloadIteratorRequestStream` |
| /// 2. implement `FidlIteratorNextResponder` for `PayloadIteratorNextResponder` |
| /// 3. implement `Measurable` for `Payload` using functions generated by |
| /// //tools/fidl/measure-tape |
| pub async fn serve_fidl_iterator_from_stream<I>( |
| mut fidl_iterator: I, |
| stream: impl futures::stream::Stream<Item = Vec<<I::Responder as FidlIteratorNextResponder>::Item>> |
| + Unpin, |
| max_stream_chunks: usize, |
| ) -> Result<()> |
| where |
| I: FidlIteratorRequestStream, |
| { |
| let mut chunked_stream = stream.ready_chunks(std::cmp::max(max_stream_chunks, 1)); |
| let mut fidl_chunker = OwningChunker::new(); |
| |
| loop { |
| let responder = |
| match fidl_iterator.try_next().await.context("while waiting for next() request")? { |
| None => break, |
| Some(request) => I::request_to_responder(request), |
| }; |
| |
| // Get as many new items as possible, to minimize the number of FIDL messages, but don't |
| // block if we already have some. |
| if fidl_chunker.is_empty() { |
| loop { |
| if let Some(xss) = chunked_stream.next().await { |
| fidl_chunker.extend(xss.into_iter().flatten()); |
| if fidl_chunker.is_empty() { |
| continue; |
| } |
| } |
| break; |
| } |
| } else { |
| if let Some(Some(xss)) = chunked_stream.next().now_or_never() { |
| fidl_chunker.extend(xss.into_iter().flatten()); |
| } |
| } |
| |
| let mut chunk = fidl_chunker.next(); |
| let () = responder.send_chunk(chunk.make_contiguous()).context("while responding")?; |
| if chunk.is_empty() { |
| break; |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| /// A FIDL request stream for a FIDL protocol following the iterator pattern. |
| pub trait FidlIteratorRequestStream: |
| fidl::endpoints::RequestStream + TryStream<Error = fidl::Error> |
| { |
| type Responder: FidlIteratorNextResponder; |
| |
| fn request_to_responder(request: <Self as TryStream>::Ok) -> Self::Responder; |
| } |
| |
| /// A responder to a Next() request for a FIDL iterator. |
| pub trait FidlIteratorNextResponder { |
| type Item: Measurable; |
| |
| fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error>; |
| } |
| |
| impl FidlIteratorRequestStream for PackageIndexIteratorRequestStream { |
| type Responder = PackageIndexIteratorNextResponder; |
| |
| fn request_to_responder(request: PackageIndexIteratorRequest) -> Self::Responder { |
| let PackageIndexIteratorRequest::Next { responder } = request; |
| responder |
| } |
| } |
| |
| impl FidlIteratorNextResponder for PackageIndexIteratorNextResponder { |
| type Item = PackageIndexEntry; |
| |
| fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> { |
| self.send(chunk) |
| } |
| } |
| |
| impl FidlIteratorRequestStream for BlobInfoIteratorRequestStream { |
| type Responder = BlobInfoIteratorNextResponder; |
| |
| fn request_to_responder(request: BlobInfoIteratorRequest) -> Self::Responder { |
| let BlobInfoIteratorRequest::Next { responder } = request; |
| responder |
| } |
| } |
| |
| impl FidlIteratorRequestStream for BlobIdIteratorRequestStream { |
| type Responder = BlobIdIteratorNextResponder; |
| |
| fn request_to_responder(request: BlobIdIteratorRequest) -> Self::Responder { |
| let BlobIdIteratorRequest::Next { responder } = request; |
| responder |
| } |
| } |
| |
| impl FidlIteratorNextResponder for BlobInfoIteratorNextResponder { |
| type Item = fidl_fuchsia_pkg::BlobInfo; |
| |
| fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> { |
| self.send(chunk) |
| } |
| } |
| |
| impl FidlIteratorNextResponder for BlobIdIteratorNextResponder { |
| type Item = fidl_fuchsia_pkg::BlobId; |
| |
| fn send_chunk(self, chunk: &[Self::Item]) -> Result<(), fidl::Error> { |
| self.send(chunk) |
| } |
| } |
| |
// FIXME(52297) This constant would ideally be exported by the `fidl` crate.
/// Fixed number of bytes used by a response that carries a single vector:
/// sizeof(TransactionHeader) + sizeof(VectorHeader), 16 bytes each.
const FIDL_VEC_RESPONSE_OVERHEAD_BYTES: usize = 16 + 16;
| |
| /// Assumes the fixed overhead of a single fidl response header and a single vec header per chunk. |
| /// It must not be used with more complex responses. |
| fn how_many_items_fit_in_fidl_vec_response<'a>( |
| items: impl Iterator<Item = &'a (impl Measurable + 'a)>, |
| ) -> usize { |
| let mut bytes_used: usize = FIDL_VEC_RESPONSE_OVERHEAD_BYTES; |
| let mut count = 0; |
| |
| for item in items { |
| bytes_used += item.measure(); |
| if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize { |
| break; |
| } |
| count += 1; |
| } |
| count |
| } |
| |
| /// Helper to split a slice of items into chunks that will fit in a single FIDL vec response. |
| /// |
| /// Note, SliceChunker assumes the fixed overhead of a single fidl response header and a single vec |
| /// header per chunk. It must not be used with more complex responses. |
| struct SliceChunker<'a, I> { |
| items: &'a mut [I], |
| } |
| |
| impl<'a, I> SliceChunker<'a, I> |
| where |
| I: Measurable, |
| { |
| fn new(items: &'a mut [I]) -> Self { |
| Self { items } |
| } |
| |
| /// Produce the next chunk of items to respond with. Iteration stops when this method returns |
| /// an empty slice, which occurs when either: |
| /// * All items have been returned |
| /// * SliceChunker encounters an item so large that it cannot even be stored in a response |
| /// dedicated to just that one item. |
| /// |
| /// Once next() returns an empty slice, it will continue to do so in future calls. |
| fn next(&mut self) -> &'a mut [I] { |
| let entry_count = how_many_items_fit_in_fidl_vec_response(self.items.iter()); |
| // tmp/swap dance to appease the borrow checker. |
| let tmp = std::mem::replace(&mut self.items, &mut []); |
| let (chunk, rest) = tmp.split_at_mut(entry_count); |
| self.items = rest; |
| chunk |
| } |
| } |
| |
| /// Helper to split a collection of items into chunks that will fit in a single FIDL vec response. |
| /// |
| /// Note, OwningChunker assumes the fixed overhead of a single fidl response header and a single vec |
| /// header per chunk. It must not be used with more complex responses. |
| struct OwningChunker<I> { |
| items: std::collections::VecDeque<I>, |
| } |
| |
| impl<I> OwningChunker<I> |
| where |
| I: Measurable, |
| { |
| fn new() -> Self { |
| Self { items: std::collections::VecDeque::new() } |
| } |
| |
| /// Produce the next chunk of items to respond with. Iteration stops when this method returns |
| /// an empty VecDeque, which occurs when either: |
| /// * All items have been returned (and no new items are added) |
| /// * OwningChunker encounters an item so large that it cannot even be stored in a response |
| /// dedicated to just that one item. |
| fn next(&mut self) -> std::collections::VecDeque<I> { |
| let count = how_many_items_fit_in_fidl_vec_response(self.items.iter()); |
| let mut other = self.items.split_off(count); |
| std::mem::swap(&mut self.items, &mut other); |
| other |
| } |
| |
| fn is_empty(&self) -> bool { |
| self.items.is_empty() |
| } |
| |
| fn extend(&mut self, iter: impl IntoIterator<Item = I>) { |
| self.items.extend(iter) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| fidl_fuchsia_pkg::{BlobInfoIteratorMarker, PackageIndexIteratorMarker}, |
| fuchsia_async::Task, |
| fuchsia_hash::HashRangeFull, |
| fuchsia_pkg::PackagePath, |
| proptest::prelude::*, |
| }; |
| |
/// The chunking code casts `ZX_CHANNEL_MAX_MSG_BYTES` to `usize`; check that the conversion
/// is lossless.
#[test]
fn zx_channel_max_msg_bytes_fits_in_usize() {
    let _max_msg_bytes: usize = usize::try_from(ZX_CHANNEL_MAX_MSG_BYTES).unwrap();
}
| |
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] |
| struct Byte(u8); |
| |
| impl Measurable for Byte { |
| fn measure(&self) -> usize { |
| 1 |
| } |
| } |
| |
/// Once all items have been handed out, `SliceChunker` keeps returning empty chunks.
#[test]
fn slice_chunker_fuses() {
    let mut backing = [Byte(42)];
    let mut chunker = SliceChunker::new(&mut backing);

    assert_eq!(chunker.next(), &mut [Byte(42)]);
    for _ in 0..2 {
        assert!(chunker.next().is_empty());
    }
}
| |
/// `SliceChunker` fills each chunk right up to the message size limit and preserves item order.
#[test]
fn slice_chunker_chunks_at_expected_boundary() {
    const BYTES_PER_CHUNK: usize =
        ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;

    // Enough items to fill 2 full chunks with 1 item left over.
    let mut items: Vec<Byte> = (0..=2 * BYTES_PER_CHUNK).map(|n| Byte(n as u8)).collect();
    let expected = items.clone();
    let mut chunker = SliceChunker::new(&mut items);

    let mut actual: Vec<Byte> = Vec::with_capacity(expected.len());
    for expected_len in [BYTES_PER_CHUNK, BYTES_PER_CHUNK, 1] {
        let chunk = chunker.next();
        assert_eq!(chunk.len(), expected_len);
        actual.extend_from_slice(chunk);
    }

    assert_eq!(actual, expected);
}
| |
/// An item that cannot fit in a response on its own makes `SliceChunker` return an empty chunk.
#[test]
fn slice_chunker_terminates_at_too_large_item() {
    /// Measures as a whole channel message, leaving no room for the response overhead.
    #[derive(Debug, PartialEq, Eq)]
    struct TooBig;

    impl Measurable for TooBig {
        fn measure(&self) -> usize {
            ZX_CHANNEL_MAX_MSG_BYTES as usize
        }
    }

    let mut oversized = [TooBig];
    let mut chunker = SliceChunker::new(&mut oversized);
    assert!(chunker.next().is_empty());
}
| |
/// Once all items have been handed out, `OwningChunker` keeps returning empty chunks.
#[test]
fn owning_chunker_fuses() {
    let mut chunker = OwningChunker::new();
    chunker.extend([Byte(42)]);

    assert_eq!(Vec::from(chunker.next()), vec![Byte(42)]);
    for _ in 0..2 {
        assert!(chunker.next().is_empty());
    }
}
| |
/// `OwningChunker` fills each chunk right up to the message size limit and preserves item order.
#[test]
fn owning_chunker_chunks_at_expected_boundary() {
    const BYTES_PER_CHUNK: usize =
        ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES;

    // Enough items to fill 2 full chunks with 1 item left over.
    let expected: Vec<Byte> = (0..=2 * BYTES_PER_CHUNK).map(|n| Byte(n as u8)).collect();
    let mut chunker = OwningChunker::new();
    chunker.extend(expected.iter().copied());

    let mut actual: Vec<Byte> = Vec::with_capacity(expected.len());
    for expected_len in [BYTES_PER_CHUNK, BYTES_PER_CHUNK, 1] {
        let chunk = chunker.next();
        assert_eq!(chunk.len(), expected_len);
        actual.extend(chunk);
    }

    assert_eq!(actual, expected);
}
| |
/// An item that cannot fit in a response on its own makes `OwningChunker` return an empty chunk.
#[test]
fn owning_chunker_terminates_at_too_large_item() {
    /// Measures as a whole channel message, leaving no room for the response overhead.
    #[derive(Debug, PartialEq, Eq)]
    struct TooBig;

    impl Measurable for TooBig {
        fn measure(&self) -> usize {
            ZX_CHANNEL_MAX_MSG_BYTES as usize
        }
    }

    let mut chunker = OwningChunker::new();
    chunker.extend([TooBig]);
    assert!(chunker.next().is_empty());
}
| |
/// Items added after `OwningChunker` has run dry are handed out by later calls to `next`.
#[test]
fn owning_chunker_extend_after_next() {
    let mut chunker = OwningChunker::new();
    for byte in [Byte(0), Byte(1)] {
        chunker.extend([byte]);
    }

    assert_eq!(Vec::from(chunker.next()), vec![Byte(0), Byte(1)]);
    assert!(chunker.next().is_empty());

    chunker.extend([Byte(2)]);

    assert_eq!(Vec::from(chunker.next()), vec![Byte(2)]);
}
| |
/// Checks `FIDL_VEC_RESPONSE_OVERHEAD_BYTES` against the encoded size of a response that
/// carries an empty vector.
#[test]
fn verify_fidl_vec_response_overhead() {
    use fidl::encoding::{
        DynamicFlags, TransactionHeader, TransactionMessage, TransactionMessageType,
        UnboundedVector,
    };

    type EmptyVecResponse = TransactionMessageType<UnboundedVector<u8>>;

    let empty_response = TransactionMessage {
        header: TransactionHeader::new(0, 0, DynamicFlags::empty()),
        body: &[] as &[u8],
    };
    let encoded_len = fidl::encoding::with_tls_encoded::<EmptyVecResponse, _>(
        empty_response,
        |bytes, _handles| Ok(bytes.len()),
    )
    .unwrap();

    assert_eq!(encoded_len, FIDL_VEC_RESPONSE_OVERHEAD_BYTES);
}
| |
| proptest! { |
| #![proptest_config(ProptestConfig{ |
| // Disable persistence to avoid the warning for not running in the |
| // source code directory (since we're running on a Fuchsia target) |
| failure_persistence: None, |
| .. ProptestConfig::default() |
| })] |
| |
| #[test] |
| fn serve_fidl_iterator_from_slice_yields_expected_entries(items: Vec<crate::BlobInfo>) { |
| let mut executor = fuchsia_async::TestExecutor::new(); |
| executor.run_singlethreaded(async move { |
| let (proxy, stream) = |
| fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap(); |
| let mut actual_items = vec![]; |
| |
| let ((), ()) = futures::future::join( |
| async { |
| let items = items |
| .iter() |
| .cloned() |
| .map(fidl_fuchsia_pkg::BlobInfo::from) |
| .collect::<Vec<_>>(); |
| serve_fidl_iterator_from_slice(stream, items).await.unwrap() |
| }, |
| async { |
| loop { |
| let chunk = proxy.next().await.unwrap(); |
| if chunk.is_empty() { |
| break; |
| } |
| let chunk = chunk.into_iter().map(crate::BlobInfo::from); |
| actual_items.extend(chunk); |
| } |
| }, |
| ) |
| .await; |
| |
| assert_eq!(items, actual_items); |
| }) |
| } |
| |
| #[test] |
| fn serve_fidl_iterator_from_stream_yields_expected_entries( |
| items: Vec<crate::BlobInfo>, |
| repetition in 0..4usize, |
| max_chunking in 0..4usize, |
| ) { |
| let mut executor = fuchsia_async::TestExecutor::new(); |
| executor.run_singlethreaded(async move { |
| let (proxy, fidl_stream) = |
| fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap(); |
| let (mut item_sender, item_stream) = futures::channel::mpsc::unbounded(); |
| let mut actual_items = vec![]; |
| |
| let ((), (), ()) = futures::future::join3( |
| async { |
| for _ in 0..repetition { |
| let () = item_sender.send(items |
| .iter() |
| .cloned() |
| .map(fidl_fuchsia_pkg::BlobInfo::from) |
| .collect::<Vec<_>>()).await.unwrap(); |
| } |
| drop(item_sender); |
| }, |
| async { |
| let () = serve_fidl_iterator_from_stream( |
| fidl_stream, |
| item_stream, |
| max_chunking |
| ) |
| .await |
| .unwrap(); |
| }, |
| async { |
| loop { |
| let chunk = proxy.next().await.unwrap(); |
| if chunk.is_empty() { |
| break; |
| } |
| let chunk = chunk.into_iter().map(crate::BlobInfo::from); |
| actual_items.extend(chunk); |
| } |
| }, |
| ) |
| .await; |
| |
| let expected_items = { |
| let mut expected_items = vec![]; |
| for _ in 0..repetition { |
| expected_items.extend(items.iter().cloned()) |
| } |
| expected_items |
| }; |
| assert_eq!(expected_items, actual_items); |
| }) |
| } |
| } |
| |
// A FIDL message is at most 65,536 bytes because of zx_channel_write [1].
// The encoded [2] size of the `PackageIndexIterator.Next()` return value is:
//   16 bytes FIDL transaction header +
//   16 bytes vector header +
//   N * (16 bytes string header (from the url field of struct PackageUrl) +
//        L bytes string content +
//        32 bytes array)
// This totals 32 + N * (48 + L), where L is rounded up to a multiple of 8
// because secondary objects (e.g. string contents) are 8-byte aligned.
//
// [1] https://fuchsia.dev/fuchsia-src/reference/syscalls/channel_write
// [2] https://fuchsia.dev/fuchsia-src/reference/fidl/language/wire-format

// The shortest possible package url is 29 bytes, "fuchsia-pkg://fuchsia.com/a/0", which is
// 32 bytes with 8-byte alignment, so the largest chunk is the largest N satisfying
// 32 + N * (48 + 32) <= 65536 => N = 818
const PACKAGE_INDEX_CHUNK_SIZE_MAX: usize = (65536 - 32) / (48 + 32);

// The longest possible package url is 283 bytes, which is 288 bytes with 8-byte alignment, so
// the smallest chunk is the largest N satisfying
// 32 + N * (48 + 288) <= 65536 => N = 194
const PACKAGE_INDEX_CHUNK_SIZE_MIN: usize = (65536 - 32) / (48 + 288);
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn package_index_iterator_paginates_shortest_entries() { |
| let names = ('a'..='z').cycle().map(|c| c.to_string()); |
| let paths = names.map(|name| { |
| PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap()) |
| }); |
| |
| verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MAX).await; |
| } |
| |
| #[fuchsia_async::run_singlethreaded(test)] |
| async fn package_index_iterator_paginates_longest_entries() { |
| let names = ('a'..='z') |
| .map(|c| std::iter::repeat(c).take(PackagePath::MAX_NAME_BYTES).collect::<String>()) |
| .cycle(); |
| let paths = names.map(|name| { |
| PackagePath::from_name_and_variant(name.parse().unwrap(), "0".parse().unwrap()) |
| }); |
| |
| verify_package_index_iterator_pagination(paths, PACKAGE_INDEX_CHUNK_SIZE_MIN).await; |
| } |
| |
| async fn verify_package_index_iterator_pagination( |
| paths: impl Iterator<Item = PackagePath>, |
| expected_chunk_size: usize, |
| ) { |
| let package_entries: Vec<fidl_fuchsia_pkg::PackageIndexEntry> = paths |
| .zip(HashRangeFull::default()) |
| .take(expected_chunk_size * 2) |
| .map(|(path, hash)| fidl_fuchsia_pkg::PackageIndexEntry { |
| package_url: fidl_fuchsia_pkg::PackageUrl { |
| url: format!("fuchsia-pkg://fuchsia.com/{}", path), |
| }, |
| meta_far_blob_id: crate::BlobId::from(hash).into(), |
| }) |
| .collect(); |
| |
| let (iter, stream) = |
| fidl::endpoints::create_proxy_and_stream::<PackageIndexIteratorMarker>().unwrap(); |
| let task = Task::local(serve_fidl_iterator_from_slice(stream, package_entries)); |
| |
| let chunk = iter.next().await.unwrap(); |
| assert_eq!(chunk.len(), expected_chunk_size); |
| |
| let chunk = iter.next().await.unwrap(); |
| assert_eq!(chunk.len(), expected_chunk_size); |
| |
| let chunk = iter.next().await.unwrap(); |
| assert_eq!(chunk.len(), 0); |
| |
| let () = task.await.unwrap(); |
| } |
| |
| // TestExecutor.run_until_stalled is not available on host |
| #[cfg(target_os = "fuchsia")] |
| use assert_matches::assert_matches; |
| |
// TestExecutor.run_until_stalled is not available on host
/// An empty `Vec` of payloads on the stream must not be turned into an (iteration-ending)
/// empty response.
#[cfg(target_os = "fuchsia")]
#[test]
fn serve_fidl_iterator_from_stream_ignores_empty_vec() {
    use std::task::Poll;

    let mut executor = fuchsia_async::TestExecutor::new();
    let (proxy, fidl_stream) =
        fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
    let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
    let mut server = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();

    // The server should ignore the empty vec of Payloads, so the pending `Next()` call should
    // not complete.
    let () = item_sender.unbounded_send(vec![]).unwrap();
    let mut next_fut = proxy.next();
    assert_matches!(executor.run_until_stalled(&mut server), Poll::Pending);
    assert_matches!(executor.run_until_stalled(&mut next_fut), Poll::Pending);

    // The `Next()` call should complete once the server is given an actual Payload.
    let blob_info = crate::BlobInfo { blob_id: [0; 32].into(), length: 0 };
    let () = item_sender.unbounded_send(vec![blob_info.into()]).unwrap();
    assert_matches!(executor.run_until_stalled(&mut server), Poll::Pending);
    assert_matches!(
        executor.run_until_stalled(&mut next_fut),
        Poll::Ready(Ok(chunk))
            if chunk == vec![fidl_fuchsia_pkg::BlobInfo::from(blob_info)]
    );
}
| |
// TestExecutor.run_until_stalled is not available on host
/// Payloads left over from an earlier batch are served without waiting on the stream, and the
/// server only blocks on the stream once it has nothing left to send.
#[cfg(target_os = "fuchsia")]
#[test]
fn serve_fidl_iterator_from_stream_does_not_block_if_chunker_not_empty() {
    use std::task::Poll;

    let mut executor = fuchsia_async::TestExecutor::new();
    let (proxy, fidl_stream) =
        fidl::endpoints::create_proxy_and_stream::<BlobInfoIteratorMarker>().unwrap();
    let (item_sender, item_stream) = futures::channel::mpsc::unbounded();
    let mut server = serve_fidl_iterator_from_stream(fidl_stream, item_stream, 10).boxed();

    let blob_info = fidl_fuchsia_pkg::BlobInfo::from(crate::BlobInfo {
        blob_id: [0; 32].into(),
        length: 0,
    });
    let payload_bytes = measure_fuchsia_pkg::Measurable::measure(&blob_info).num_bytes;
    let max_payloads_per_fidl_response =
        (ZX_CHANNEL_MAX_MSG_BYTES as usize - FIDL_VEC_RESPONSE_OVERHEAD_BYTES) / payload_bytes;
    // One payload more than fits in a single response.
    let payloads = vec![blob_info; max_payloads_per_fidl_response + 1];
    assert_eq!(
        how_many_items_fit_in_fidl_vec_response(payloads.iter()),
        max_payloads_per_fidl_response
    );

    // Send all the payloads, the first FIDL response should contain as many as will fit.
    let () = item_sender.unbounded_send(payloads).unwrap();
    let mut next_fut = proxy.next();
    assert_matches!(executor.run_until_stalled(&mut server), Poll::Pending);
    assert_matches!(
        executor.run_until_stalled(&mut next_fut),
        Poll::Ready(Ok(chunk))
            if chunk.len() == max_payloads_per_fidl_response
    );

    // One payload should be left in the OwningChunker, so another FIDL response should be
    // obtainable without sending more payloads.
    let mut next_fut = proxy.next();
    assert_matches!(executor.run_until_stalled(&mut server), Poll::Pending);
    assert_matches!(
        executor.run_until_stalled(&mut next_fut),
        Poll::Ready(Ok(chunk))
            if chunk.len() == 1
    );

    // No payloads should be left, so the next Next request should block.
    let mut next_fut = proxy.next();
    assert_matches!(executor.run_until_stalled(&mut server), Poll::Pending);
    assert_matches!(executor.run_until_stalled(&mut next_fut), Poll::Pending);

    // The server should resume providing payloads when more of them are sent.
    let () = item_sender.unbounded_send(vec![blob_info; 2]).unwrap();
    assert_matches!(executor.run_until_stalled(&mut server), Poll::Pending);
    assert_matches!(
        executor.run_until_stalled(&mut next_fut),
        Poll::Ready(Ok(chunk))
            if chunk.len() == 2
    );
}
| } |