| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| anyhow::{format_err, Context as _, Error}, |
| fidl::endpoints::ClientEnd, |
| fidl_fuchsia_media::*, |
| fidl_fuchsia_mediacodec::*, |
| fidl_fuchsia_sysmem2::*, |
| fuchsia_stream_processors::*, |
| fuchsia_sync::{Mutex, RwLock}, |
| futures::{ |
| future::{maybe_done, MaybeDone}, |
| io::{self, AsyncWrite}, |
| ready, |
| stream::{FusedStream, Stream}, |
| task::{Context, Poll, Waker}, |
| Future, StreamExt, |
| }, |
| std::{ |
| collections::{HashSet, VecDeque}, |
| mem, |
| pin::Pin, |
| sync::Arc, |
| }, |
| tracing::{trace, warn}, |
| }; |
| |
| use crate::{ |
| buffer_collection_constraints::buffer_collection_constraints_default, |
| sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation}, |
| }; |
| |
| fn fidl_error_to_io_error(e: fidl::Error) -> io::Error { |
| io::Error::new(io::ErrorKind::Other, format_err!("Fidl Error: {}", e)) |
| } |
| |
| #[derive(Debug)] |
| /// Listener is a three-valued Option that captures the waker that a listener needs to be woken |
| /// upon when it polls the future instead of at registration time. |
| enum Listener { |
| /// No one is listening. |
| None, |
| /// Someone is listening, but either have been woken and not repolled, or never polled yet. |
| New, |
| /// Someone is listening, and can be woken with the waker. |
| Some(Waker), |
| } |
| |
| impl Listener { |
| /// Adds a waker to be awoken with `Listener::wake`. |
| /// Panics if no one is listening. |
| fn register(&mut self, waker: Waker) { |
| *self = match mem::replace(self, Listener::None) { |
| Listener::None => panic!("Polled a listener with no pollers"), |
| _ => Listener::Some(waker), |
| }; |
| } |
| |
| /// If a listener has polled, wake the listener and replace it with New. |
| /// Noop if no one has registered. |
| fn wake(&mut self) { |
| if let Listener::None = self { |
| return; |
| } |
| match mem::replace(self, Listener::New) { |
| Listener::None => panic!("Should have been polled"), |
| Listener::Some(waker) => waker.wake(), |
| Listener::New => {} |
| } |
| } |
| |
| /// Get a reference to the waker, if there is one waiting. |
| fn waker(&self) -> Option<&Waker> { |
| if let Listener::Some(ref waker) = self { |
| Some(waker) |
| } else { |
| None |
| } |
| } |
| } |
| |
| impl Default for Listener { |
| fn default() -> Self { |
| Listener::None |
| } |
| } |
| |
| /// A queue of encoded packets, to be sent to the `listener` when it polls next. |
| struct OutputQueue { |
| /// The listener. Woken when a packet arrives after a previous poll() returned Pending. |
| listener: Listener, |
| /// A queue of encoded packets to be delivered to the receiver. |
| queue: VecDeque<Packet>, |
| /// True when the stream has received an end-of-stream message. The stream will return None |
| /// after the `queue` is empty. |
| ended: bool, |
| } |
| |
| impl OutputQueue { |
| /// Adds a packet to the queue and wakes the listener if necessary. |
| fn enqueue(&mut self, packet: Packet) { |
| self.queue.push_back(packet); |
| self.listener.wake(); |
| } |
| |
| /// Signals the end of the stream has happened. |
| /// Wakes the listener if necessary. |
| fn mark_ended(&mut self) { |
| self.ended = true; |
| self.listener.wake(); |
| } |
| |
| fn waker(&self) -> Option<&Waker> { |
| self.listener.waker() |
| } |
| |
| /// Wakes the listener so that it will repoll, if it is waiting. |
| fn wake(&mut self) { |
| self.listener.wake(); |
| } |
| } |
| |
| impl Default for OutputQueue { |
| fn default() -> Self { |
| OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false } |
| } |
| } |
| |
| impl Stream for OutputQueue { |
| type Item = Packet; |
| |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| match self.queue.pop_front() { |
| Some(packet) => Poll::Ready(Some(packet)), |
| None if self.ended => Poll::Ready(None), |
| None => { |
| self.listener.register(cx.waker().clone()); |
| Poll::Pending |
| } |
| } |
| } |
| } |
| |
| // The minimum specified by codec is too small to contain the typical pcm frame chunk size for the |
| // encoder case (1024). Increase to a reasonable amount. |
| const MIN_INPUT_BUFFER_SIZE: u32 = 4096; |
| // Go with codec default for output, for frame alignment. |
| const MIN_OUTPUT_BUFFER_SIZE: u32 = 0; |
| |
| /// Index of an input buffer to be shared between the client and the StreamProcessor. |
| #[derive(PartialEq, Eq, Hash, Clone, Debug)] |
| struct InputBufferIndex(u32); |
| |
| /// The StreamProcessorInner handles the events that come from the StreamProcessor, mostly related |
| /// to setup of the buffers and handling the output packets as they arrive. |
| struct StreamProcessorInner { |
| /// The proxy to the stream processor. |
| processor: StreamProcessorProxy, |
| /// The proxy to the sysmem allocator. |
| sysmem_client: AllocatorProxy, |
| /// The event stream from the StreamProcessor. We handle these internally. |
| events: StreamProcessorEventStream, |
| /// The size in bytes of each input packet |
| input_packet_size: u64, |
| /// The set of input buffers that are available for writing by the client, without the one |
| /// possibly being used by the input_cursor. |
| client_owned: HashSet<InputBufferIndex>, |
| /// A cursor on the next input buffer location to be written to when new input data arrives. |
| input_cursor: Option<(InputBufferIndex, u64)>, |
| /// An queue of the indexes of output buffers that have been filled by the processor and a |
| /// waiter if someone is waiting on it. |
| /// Also holds the output waker, if it is registered. |
| output_queue: Mutex<OutputQueue>, |
| /// Waker that is waiting on input to be ready. |
| input_waker: Option<Waker>, |
| /// Allocation for the input buffers. |
| input_allocation: MaybeDone<SysmemAllocation>, |
| /// Allocation for the output buffers. |
| output_allocation: MaybeDone<SysmemAllocation>, |
| } |
| |
| impl StreamProcessorInner { |
| /// Handles an event from the StreamProcessor. A number of these events come on stream start to |
| /// setup the input and output buffers, and from then on the output packets and end of stream |
| /// marker, and the input packets are marked as usable after they are processed. |
| fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> { |
| match evt { |
| StreamProcessorEvent::OnInputConstraints { input_constraints } => { |
| let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?; |
| let buffer_constraints = |
| Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE); |
| let processor = self.processor.clone(); |
| let mut partial_settings = Self::partial_settings(); |
| let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| { |
| // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so |
| // we can convert here until StreamProcessor has a sysmem2 token field. |
| partial_settings.sysmem_token = |
| Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new( |
| token.into_channel(), |
| )); |
| // FIDL failures will be caught via the request stream. |
| if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) { |
| warn!("Couldn't set input buffer settings: {:?}", e); |
| } |
| }; |
| self.input_allocation = maybe_done(SysmemAllocation::allocate( |
| self.sysmem_client.clone(), |
| BufferName { name: "StreamProcessorInput", priority: 1 }, |
| None, |
| buffer_constraints, |
| token_fn, |
| )?); |
| } |
| StreamProcessorEvent::OnOutputConstraints { output_config } => { |
| let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?; |
| if !output_constraints.buffer_constraints_action_required { |
| return Ok(()); |
| } |
| let buffer_constraints = |
| Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE); |
| let processor = self.processor.clone(); |
| let mut partial_settings = Self::partial_settings(); |
| let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| { |
| // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so |
| // we can convert here until StreamProcessor has a sysmem2 token field. |
| partial_settings.sysmem_token = |
| Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new( |
| token.into_channel(), |
| )); |
| // FIDL failures will be caught via the request stream. |
| if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) { |
| warn!("Couldn't set output buffer settings: {:?}", e); |
| } |
| }; |
| |
| self.output_allocation = maybe_done(SysmemAllocation::allocate( |
| self.sysmem_client.clone(), |
| BufferName { name: "StreamProcessorOutput", priority: 1 }, |
| None, |
| buffer_constraints, |
| token_fn, |
| )?); |
| } |
| StreamProcessorEvent::OnOutputPacket { output_packet, .. } => { |
| let mut lock = self.output_queue.lock(); |
| lock.enqueue(output_packet); |
| } |
| StreamProcessorEvent::OnFreeInputPacket { |
| free_input_packet: PacketHeader { packet_index: Some(idx), .. }, |
| } => { |
| if !self.client_owned.insert(InputBufferIndex(idx)) { |
| warn!("Freed an input packet that was already freed: {:?}", idx); |
| } |
| self.setup_input_cursor(); |
| } |
| StreamProcessorEvent::OnOutputEndOfStream { .. } => { |
| let mut lock = self.output_queue.lock(); |
| lock.mark_ended(); |
| } |
| StreamProcessorEvent::OnOutputFormat { .. } => {} |
| e => trace!("Unhandled stream processor event: {:?}", e), |
| } |
| Ok(()) |
| } |
| |
| /// Process one event, and return Poll::Ready if the item has been processed, |
| /// and Poll::Pending if no event has been processed and the waker will be woken if |
| /// another event happens. |
| fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { |
| match ready!(self.events.poll_next_unpin(cx)) { |
| Some(Err(e)) => Poll::Ready(Err(e.into())), |
| Some(Ok(event)) => Poll::Ready(self.handle_event(event)), |
| None => Poll::Ready(Err(format_err!("Client disconnected"))), |
| } |
| } |
| |
| fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints { |
| BufferCollectionConstraints { |
| buffer_memory_constraints: Some(BufferMemoryConstraints { |
| min_size_bytes: Some(min_buffer_size as u64), |
| ..Default::default() |
| }), |
| ..buffer_collection_constraints_default() |
| } |
| } |
| |
| fn partial_settings() -> StreamBufferPartialSettings { |
| StreamBufferPartialSettings { |
| buffer_lifetime_ordinal: Some(1), |
| buffer_constraints_version_ordinal: Some(1), |
| sysmem_token: None, |
| ..Default::default() |
| } |
| } |
| |
| fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers { |
| Pin::new(&mut self.input_allocation) |
| .output_mut() |
| .expect("allocation completed") |
| .as_mut() |
| .expect("succcessful allocation") |
| } |
| |
| fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers { |
| Pin::new(&mut self.output_allocation) |
| .output_mut() |
| .expect("allocation completed") |
| .as_mut() |
| .expect("succcessful allocation") |
| } |
| |
| /// Called when the input_allocation future finishes. |
| /// Takes the buffers out of the allocator, and sets up the input cursor to accept data. |
| fn input_allocation_complete(&mut self) -> Result<(), Error> { |
| let _ = Pin::new(&mut self.input_allocation) |
| .output_mut() |
| .ok_or(format_err!("allocation isn't complete"))?; |
| |
| let settings = self.input_buffers().settings(); |
| self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?; |
| let buffer_count = self.input_buffers().len(); |
| for i in 0..buffer_count { |
| let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?)); |
| } |
| // allocation is complete, and we can write to the input. |
| self.setup_input_cursor(); |
| Ok(()) |
| } |
| |
| /// Called when the output allocation future finishes. |
| /// Takes the buffers out of the allocator, and sets up the output buffers for retrieval of output, |
| /// signaling to the processor that the output buffers are set. |
| fn output_allocation_complete(&mut self) -> Result<(), Error> { |
| let _ = Pin::new(&mut self.output_allocation) |
| .output_mut() |
| .ok_or(format_err!("allocation isn't complete"))?; |
| self.processor |
| .complete_output_buffer_partial_settings(/*buffer_lifetime_ordinal=*/ 1) |
| .context("setting output buffer settings")?; |
| Ok(()) |
| } |
| |
| /// Poll any of the allocations that are waiting to complete, returning Pending if |
| /// any are still waiting to finish, and Ready if one has failed or both have completed. |
| fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { |
| if let MaybeDone::Future(_) = self.input_allocation { |
| match Pin::new(&mut self.input_allocation).poll(cx) { |
| Poll::Ready(()) => { |
| if let Err(e) = self.input_allocation_complete() { |
| return Poll::Ready(Err(e)); |
| } |
| } |
| Poll::Pending => {} |
| }; |
| } |
| if let MaybeDone::Future(_) = self.output_allocation { |
| match Pin::new(&mut self.output_allocation).poll(cx) { |
| Poll::Ready(()) => { |
| if let Err(e) = self.output_allocation_complete() { |
| return Poll::Ready(Err(e)); |
| } |
| } |
| Poll::Pending => {} |
| }; |
| } |
| Poll::Pending |
| } |
| |
| /// Provides the current registered waiting context with priority given to the output waker. |
| fn waiting_waker(&self) -> Option<Waker> { |
| match (self.output_queue.lock().waker(), &self.input_waker) { |
| // No one is waiting. |
| (None, None) => None, |
| (Some(waker), _) => Some(waker.clone()), |
| (_, Some(waker)) => Some(waker.clone()), |
| } |
| } |
| |
| /// Process all the events that are currently available from the StreamProcessor and Allocators, |
| /// waking any known waker to be woken when another event arrives. |
| /// Returns Ok(()) if this was accomplished or Err() if an error occurred while processing. |
| fn poll_events(&mut self) -> Result<(), Error> { |
| let waker = loop { |
| let waker = match self.waiting_waker() { |
| // No one still needs to be woken. This means all the wakers have been awoke, |
| // and will repoll. |
| None => return Ok(()), |
| Some(waker) => waker, |
| }; |
| match self.process_event(&mut Context::from_waker(&waker)) { |
| Poll::Pending => break waker, |
| Poll::Ready(Err(e)) => { |
| warn!("Stream processing error: {:?}", e); |
| return Err(e.into()); |
| } |
| // Didn't set the waker to be awoken, so let's try again. |
| Poll::Ready(Ok(())) => {} |
| } |
| }; |
| |
| if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) { |
| warn!("Stream buffer allocation error: {:?}", e); |
| return Err(e.into()); |
| } |
| Ok(()) |
| } |
| |
| fn wake_output(&mut self) { |
| self.output_queue.lock().wake(); |
| } |
| |
| fn wake_input(&mut self) { |
| if let Some(w) = self.input_waker.take() { |
| w.wake(); |
| } |
| } |
| |
| /// Attempts to set up a new input cursor, out of the current set of client owned input buffers. |
| /// If the cursor is already set, this does nothing. |
| fn setup_input_cursor(&mut self) { |
| if self.input_cursor.is_some() { |
| // Nothing to be done |
| return; |
| } |
| let next_idx = match self.client_owned.iter().next() { |
| None => return, |
| Some(idx) => idx.clone(), |
| }; |
| let _ = self.client_owned.remove(&next_idx); |
| self.input_cursor = Some((next_idx, 0)); |
| self.wake_input(); |
| } |
| |
| /// Reads an output packet from the output buffers, and marks the packets as recycled so the |
| /// output buffer can be reused. Allocates a new vector to hold the data. |
| fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> { |
| let packet = ValidPacket::try_from(packet)?; |
| |
| let output_size = packet.valid_length_bytes as usize; |
| let offset = packet.start_offset as u64; |
| let mut output = vec![0; output_size]; |
| let buf_idx = packet.buffer_index; |
| let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist"); |
| vmo.read(&mut output, offset)?; |
| self.processor.recycle_output_packet(&packet.header.into())?; |
| Ok(output) |
| } |
| } |
| |
| /// Struct representing a CodecFactory . |
| /// Input sent to the encoder via `StreamProcessor::write_bytes` is queued for delivery, and delivered |
| /// whenever a packet is full or `StreamProcessor::send_packet` is called. Output can be retrieved using |
| /// an `StreamProcessorStream` from `StreamProcessor::take_output_stream`. |
| pub struct StreamProcessor { |
| inner: Arc<RwLock<StreamProcessorInner>>, |
| } |
| |
| /// An StreamProcessorStream is a Stream of processed data from a stream processor. |
| /// Returned from `StreamProcessor::take_output_stream`. |
| pub struct StreamProcessorOutputStream { |
| inner: Arc<RwLock<StreamProcessorInner>>, |
| } |
| |
| impl StreamProcessor { |
| /// Create a new StreamProcessor given the proxy. |
| /// Takes the event stream of the proxy. |
| fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self { |
| let events = processor.take_event_stream(); |
| Self { |
| inner: Arc::new(RwLock::new(StreamProcessorInner { |
| processor, |
| sysmem_client, |
| events, |
| input_packet_size: 0, |
| client_owned: HashSet::new(), |
| input_cursor: None, |
| output_queue: Default::default(), |
| input_waker: None, |
| input_allocation: maybe_done(SysmemAllocation::pending()), |
| output_allocation: maybe_done(SysmemAllocation::pending()), |
| })), |
| } |
| } |
| |
| /// Create a new StreamProcessor encoder, with the given `input_domain` and `encoder_settings`. See |
| /// stream_processor.fidl for descriptions of these parameters. This is only meant for audio |
| /// encoding. |
| pub fn create_encoder( |
| input_domain: DomainFormat, |
| encoder_settings: EncoderSettings, |
| ) -> Result<StreamProcessor, Error> { |
| let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>() |
| .context("Connecting to sysmem")?; |
| |
| let format_details = FormatDetails { |
| domain: Some(input_domain), |
| encoder_settings: Some(encoder_settings), |
| format_details_version_ordinal: Some(1), |
| mime_type: Some("audio/pcm".to_string()), |
| oob_bytes: None, |
| pass_through_parameters: None, |
| timebase: None, |
| ..Default::default() |
| }; |
| |
| let encoder_params = CreateEncoderParams { |
| input_details: Some(format_details), |
| require_hw: Some(false), |
| ..Default::default() |
| }; |
| |
| let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>() |
| .context("Failed to connect to Codec Factory")?; |
| |
| let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy()?; |
| |
| codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?; |
| |
| Ok(StreamProcessor::create(processor, sysmem_client)) |
| } |
| |
| /// Create a new StreamProcessor decoder, with the given `mime_type` and optional `oob_bytes`. See |
| /// stream_processor.fidl for descriptions of these parameters. This is only meant for audio |
| /// decoding. |
| pub fn create_decoder( |
| mime_type: &str, |
| oob_bytes: Option<Vec<u8>>, |
| ) -> Result<StreamProcessor, Error> { |
| let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>() |
| .context("Connecting to sysmem")?; |
| |
| let format_details = FormatDetails { |
| mime_type: Some(mime_type.to_string()), |
| oob_bytes: oob_bytes, |
| format_details_version_ordinal: Some(1), |
| encoder_settings: None, |
| domain: None, |
| pass_through_parameters: None, |
| timebase: None, |
| ..Default::default() |
| }; |
| |
| let decoder_params = CreateDecoderParams { |
| input_details: Some(format_details), |
| permit_lack_of_split_header_handling: Some(true), |
| ..Default::default() |
| }; |
| |
| let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>() |
| .context("Failed to connect to Codec Factory")?; |
| |
| let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy()?; |
| |
| codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?; |
| |
| Ok(StreamProcessor::create(processor, sysmem_client)) |
| } |
| |
| /// Take a stream object which will produce the output of the processor. |
| /// Only one StreamProcessorOutputStream object can exist at a time, and this will return an Error if it is |
| /// already taken. |
| pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> { |
| { |
| let read = self.inner.read(); |
| let mut lock = read.output_queue.lock(); |
| if let Listener::None = lock.listener { |
| lock.listener = Listener::New; |
| } else { |
| return Err(format_err!("Output stream already taken")); |
| } |
| } |
| Ok(StreamProcessorOutputStream { inner: self.inner.clone() }) |
| } |
| |
| /// Deliver input to the stream processor. Returns the number of bytes delivered. |
| fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> { |
| let mut bytes_idx = 0; |
| while bytes.len() > bytes_idx { |
| { |
| let mut write = self.inner.write(); |
| let (idx, size) = match write.input_cursor.take() { |
| None => return Ok(bytes_idx), |
| Some(x) => x, |
| }; |
| let space_left = write.input_packet_size - size; |
| let left_to_write = bytes.len() - bytes_idx; |
| let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo"); |
| if space_left as usize > left_to_write { |
| let write_buf = &bytes[bytes_idx..]; |
| let write_len = write_buf.len(); |
| buffer_vmo.write(write_buf, size)?; |
| bytes_idx += write_len; |
| write.input_cursor = Some((idx, size + write_len as u64)); |
| assert!(bytes.len() == bytes_idx); |
| return Ok(bytes_idx); |
| } |
| let end_idx = bytes_idx + space_left as usize; |
| let write_buf = &bytes[bytes_idx..end_idx]; |
| let write_len = write_buf.len(); |
| buffer_vmo.write(write_buf, size)?; |
| bytes_idx += write_len; |
| // this buffer is done, ship it! |
| assert_eq!(size + write_len as u64, write.input_packet_size); |
| write.input_cursor = Some((idx, write.input_packet_size)); |
| } |
| self.send_packet()?; |
| } |
| Ok(bytes_idx) |
| } |
| |
| /// Flush the input buffer to the processor, relinquishing the ownership of the buffer |
| /// currently in the input cursor, and picking a new input buffer. If there is no input |
| /// buffer left, the input cursor is left as None. |
| pub fn send_packet(&mut self) -> Result<(), io::Error> { |
| let mut write = self.inner.write(); |
| if write.input_cursor.is_none() { |
| // Nothing to flush, nothing can have been written to an empty input cursor. |
| return Ok(()); |
| } |
| let (idx, size) = write.input_cursor.take().expect("input cursor is none"); |
| if size == 0 { |
| // Can't send empty packet to processor. |
| write.input_cursor = Some((idx, size)); |
| return Ok(()); |
| } |
| let packet = Packet { |
| header: Some(PacketHeader { |
| buffer_lifetime_ordinal: Some(1), |
| packet_index: Some(idx.0), |
| ..Default::default() |
| }), |
| buffer_index: Some(idx.0), |
| stream_lifetime_ordinal: Some(1), |
| start_offset: Some(0), |
| valid_length_bytes: Some(size as u32), |
| start_access_unit: Some(true), |
| known_end_access_unit: Some(true), |
| ..Default::default() |
| }; |
| write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?; |
| // pick another buffer for the input cursor |
| write.setup_input_cursor(); |
| Ok(()) |
| } |
| |
| /// Test whether it is possible to write to the StreamProcessor. If there are no input buffers |
| /// available, returns Poll::Pending and arranges for the input task to receive a |
| /// notification when an input buffer may be available or the encoder is closed. |
| fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { |
| let mut write = self.inner.write(); |
| // Drop the current input waker, since we have a new one. |
| // If the output waker is set, it should already be queued to be woken for the codec. |
| write.input_waker = None; |
| if write.input_cursor.is_some() { |
| return Poll::Ready(Ok(())); |
| } |
| write.input_waker = Some(cx.waker().clone()); |
| // This can: |
| // - wake the input waker (somehow received a input packet) |
| // - poll with the output waker, setting it up to be woken |
| // - poll with the input waker to be woken |
| if let Err(e) = write.poll_events() { |
| return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))); |
| } |
| Poll::Pending |
| } |
| |
| pub fn close(&mut self) -> Result<(), io::Error> { |
| self.send_packet()?; |
| |
| let mut write = self.inner.write(); |
| |
| write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?; |
| // TODO: indicate this another way so that we can send an error if someone tries to write |
| // it after it's closed. |
| write.input_cursor = None; |
| write.wake_input(); |
| write.wake_output(); |
| Ok(()) |
| } |
| } |
| |
| impl AsyncWrite for StreamProcessor { |
| fn poll_write( |
| mut self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<io::Result<usize>> { |
| ready!(self.poll_writable(cx))?; |
| match self.write_bytes(buf) { |
| Ok(written) => Poll::Ready(Ok(written)), |
| Err(e) => Poll::Ready(Err(e.into())), |
| } |
| } |
| |
| fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
| Poll::Ready(self.send_packet()) |
| } |
| |
| fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { |
| Poll::Ready(self.send_packet()) |
| } |
| } |
| |
| impl Stream for StreamProcessorOutputStream { |
| type Item = Result<Vec<u8>, Error>; |
| |
| fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| let mut write = self.inner.write(); |
| // If we have a item ready, just return it. |
| let packet = { |
| let mut queue = write.output_queue.lock(); |
| match queue.poll_next_unpin(cx) { |
| Poll::Ready(Some(packet)) => Some(Some(packet)), |
| Poll::Ready(None) => Some(None), |
| Poll::Pending => { |
| // The waker has been set for when the queue gets data. |
| // We also need to set the same waker if an event happens. |
| None |
| } |
| } |
| }; |
| // We always need to set a waker for the events loop (this may be the same waker as above, |
| // or the input waker if the stream returned a packet) |
| if let Err(e) = write.poll_events() { |
| return Poll::Ready(Some(Err(e.into()))); |
| } |
| match packet { |
| Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))), |
| Some(None) => Poll::Ready(None), |
| None => Poll::Pending, |
| } |
| } |
| } |
| |
| impl FusedStream for StreamProcessorOutputStream { |
| fn is_terminated(&self) -> bool { |
| self.inner.read().output_queue.lock().ended |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use async_test_helpers::run_while; |
| use byteorder::{ByteOrder, NativeEndian}; |
| use fixture::fixture; |
| use fuchsia_async as fasync; |
| use futures::{io::AsyncWriteExt, FutureExt}; |
| use futures_test::task::new_count_waker; |
| use sha2::{Digest as _, Sha256}; |
| use std::fs::File; |
| use std::io::{Read, Write}; |
| use std::pin::pin; |
| |
| use stream_processor_test::ExpectedDigest; |
| |
| const PCM_SAMPLE_SIZE: usize = 2; |
| |
| #[derive(Clone, Debug)] |
| pub struct PcmAudio { |
| pcm_format: PcmFormat, |
| buffer: Vec<u8>, |
| } |
| |
| impl PcmAudio { |
| pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self { |
| const FREQUENCY: f32 = 20.0; |
| const AMPLITUDE: f32 = 0.2; |
| |
| let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len(); |
| let samples_per_frame = pcm_format.channel_map.len(); |
| let sample_count = frame_count * samples_per_frame; |
| |
| let mut buffer = vec![0; frame_count * pcm_frame_size]; |
| |
| for i in 0..sample_count { |
| let frame = (i / samples_per_frame) as f32; |
| let value = |
| ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE; |
| let sample = (value * i16::max_value() as f32) as i16; |
| |
| let mut sample_bytes = [0; std::mem::size_of::<i16>()]; |
| NativeEndian::write_i16(&mut sample_bytes, sample); |
| |
| let offset = i * PCM_SAMPLE_SIZE; |
| buffer[offset] = sample_bytes[0]; |
| buffer[offset + 1] = sample_bytes[1]; |
| } |
| |
| Self { pcm_format, buffer } |
| } |
| |
| pub fn frame_size(&self) -> usize { |
| self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE |
| } |
| } |
| |
| // Note: stolen from audio_encoder_test, update to stream_processor_test lib when this gets |
| // moved. |
| pub struct BytesValidator { |
| pub output_file: Option<&'static str>, |
| pub expected_digest: ExpectedDigest, |
| } |
| |
| impl BytesValidator { |
| fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> { |
| let mut hasher = Sha256::default(); |
| |
| file.write_all(&bytes)?; |
| hasher.update(&bytes); |
| |
| let digest: [u8; 32] = hasher.finalize().into(); |
| if self.expected_digest.bytes != digest { |
| return Err(format_err!( |
| "Expected {}; got {}", |
| self.expected_digest, |
| hex::encode(digest) |
| )) |
| .into(); |
| } |
| |
| Ok(()) |
| } |
| |
| fn output_file(&self) -> Result<impl Write, Error> { |
| Ok(if let Some(file) = self.output_file { |
| Box::new(std::fs::File::create(file)?) as Box<dyn Write> |
| } else { |
| Box::new(std::io::sink()) as Box<dyn Write> |
| }) |
| } |
| |
| fn validate(&self, bytes: &[u8]) -> Result<(), Error> { |
| self.write_and_hash(self.output_file()?, &bytes) |
| } |
| } |
| |
| #[fuchsia::test] |
| fn encode_sbc() { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| let pcm_format = PcmFormat { |
| pcm_mode: AudioPcmMode::Linear, |
| bits_per_sample: 16, |
| frames_per_second: 44100, |
| channel_map: vec![AudioChannelId::Cf], |
| }; |
| |
| let sub_bands = SbcSubBands::SubBands4; |
| let block_count = SbcBlockCount::BlockCount8; |
| |
| let input_frames = 3000; |
| |
| let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames); |
| |
| let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings { |
| sub_bands, |
| block_count, |
| allocation: SbcAllocation::AllocLoudness, |
| channel_mode: SbcChannelMode::Mono, |
| bit_pool: 59, // Recommended from the SBC spec for these parameters. |
| }); |
| |
| let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed( |
| AudioUncompressedFormat::Pcm(pcm_format), |
| )); |
| |
| let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings) |
| .expect("to create Encoder"); |
| |
| let frames_per_packet: usize = 8; // Randomly chosen by fair d10 roll. |
| let packet_size = pcm_audio.frame_size() * frames_per_packet; |
| let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size); |
| let first_packet = packets.next().unwrap(); |
| |
| // Write an initial frame to the encoder. |
| // This is required to get past allocating the input/output buffers. |
| let written = |
| exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write"); |
| assert_eq!(written, first_packet.len()); |
| |
| let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken"); |
| |
| // Shouldn't be able to take the stream twice |
| assert!(encoder.take_output_stream().is_err()); |
| |
| // Polling the encoded stream before the encoder has started up should wake it when |
| // output starts happening, set up the poll here. |
| let encoded_fut = pin!(encoded_stream.next()); |
| |
| let (waker, encoder_fut_wake_count) = new_count_waker(); |
| let mut counting_ctx = Context::from_waker(&waker); |
| |
| assert!(encoded_fut.poll(&mut counting_ctx).is_pending()); |
| |
| let mut frames_sent = first_packet.len() / pcm_audio.frame_size(); |
| |
| for packet in packets { |
| let mut written_fut = encoder.write(&packet); |
| |
| let written_bytes = |
| exec.run_singlethreaded(&mut written_fut).expect("to write to encoder"); |
| |
| assert_eq!(packet.len(), written_bytes); |
| frames_sent += packet.len() / pcm_audio.frame_size(); |
| } |
| |
| encoder.close().expect("stream should always be closable"); |
| |
| assert_eq!(input_frames, frames_sent); |
| |
| // When an unprocessed event has happened on the stream, even if intervening events have been |
| // procesed by the input processes, it should wake the output future to process the events. |
| let woke_count = encoder_fut_wake_count.get(); |
| while encoder_fut_wake_count.get() == woke_count { |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| } |
| assert_eq!(encoder_fut_wake_count.get(), woke_count + 1); |
| |
| // Get data from the output now. |
| let mut encoded = Vec::new(); |
| |
| loop { |
| let mut encoded_fut = encoded_stream.next(); |
| |
| match exec.run_singlethreaded(&mut encoded_fut) { |
| Some(Ok(enc_data)) => { |
| assert!(!enc_data.is_empty()); |
| encoded.extend_from_slice(&enc_data); |
| } |
| Some(Err(e)) => { |
| panic!("Unexpected error when polling encoded data: {}", e); |
| } |
| None => { |
| break; |
| } |
| } |
| } |
| |
| // Match the encoded data to the known hash. |
| let expected_digest = ExpectedDigest::new( |
| "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4", |
| "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562", |
| ); |
| let hash_validator = BytesValidator { output_file: None, expected_digest }; |
| |
| assert_eq!(6110, encoded.len(), "Encoded size should be equal"); |
| |
| let validated = hash_validator.validate(encoded.as_slice()); |
| assert!(validated.is_ok(), "Failed hash: {:?}", validated); |
| } |
| |
| fn fix_sbc_test_file<F>(_name: &str, test: F) |
| where |
| F: FnOnce(Vec<u8>) -> (), |
| { |
| const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc"; |
| |
| let mut sbc_data = Vec::new(); |
| let _ = File::open(SBC_TEST_FILE) |
| .expect("open test file") |
| .read_to_end(&mut sbc_data) |
| .expect("read test file"); |
| |
| test(sbc_data) |
| } |
| |
| #[fixture(fix_sbc_test_file)] |
| #[fuchsia::test] |
| fn decode_sbc(sbc_data: Vec<u8>) { |
| let mut exec = fasync::TestExecutor::new(); |
| |
| const SBC_FRAME_SIZE: usize = 72; |
| const INPUT_FRAMES: usize = 23; |
| |
| // SBC codec info corresponding to Mono reference stream. |
| let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec()); |
| let mut decoder = |
| StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder"); |
| |
| let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken"); |
| |
| // Shouldn't be able to take the stream twice |
| assert!(decoder.take_output_stream().is_err()); |
| |
| let mut frames_sent = 0; |
| |
| let frames_per_packet: usize = 1; // Randomly chosen by fair d10 roll. |
| let packet_size = SBC_FRAME_SIZE * frames_per_packet; |
| |
| for frames in sbc_data.as_slice().chunks(packet_size) { |
| let mut written_fut = decoder.write(&frames); |
| |
| let written_bytes = |
| exec.run_singlethreaded(&mut written_fut).expect("to write to decoder"); |
| |
| assert_eq!(frames.len(), written_bytes); |
| frames_sent += frames.len() / SBC_FRAME_SIZE; |
| } |
| |
| assert_eq!(INPUT_FRAMES, frames_sent); |
| |
| let mut flush_fut = pin!(decoder.flush()); |
| exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder"); |
| |
| decoder.close().expect("stream should always be closable"); |
| |
| // Get data from the output now. |
| let mut decoded = Vec::new(); |
| |
| loop { |
| let mut decoded_fut = decoded_stream.next(); |
| |
| match exec.run_singlethreaded(&mut decoded_fut) { |
| Some(Ok(dec_data)) => { |
| assert!(!dec_data.is_empty()); |
| decoded.extend_from_slice(&dec_data); |
| } |
| Some(Err(e)) => { |
| panic!("Unexpected error when polling decoded data: {}", e); |
| } |
| None => { |
| break; |
| } |
| } |
| } |
| |
| // Match the decoded data to the known hash. |
| let expected_digest = ExpectedDigest::new( |
| "Pcm: 44.1kHz/16bit/Mono", |
| "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6", |
| ); |
| let hash_validator = BytesValidator { output_file: None, expected_digest }; |
| |
| assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal"); |
| |
| let validated = hash_validator.validate(decoded.as_slice()); |
| assert!(validated.is_ok(), "Failed hash: {:?}", validated); |
| } |
| |
| #[fixture(fix_sbc_test_file)] |
| #[fuchsia::test] |
| fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) { |
| let mut exec = fasync::TestExecutor::new(); |
| const SBC_FRAME_SIZE: usize = 72; |
| |
| // SBC codec info corresponding to Mono reference stream. |
| let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec()); |
| let mut decoder = |
| StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder"); |
| |
| let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE); |
| let next_frame = chunks.next().unwrap(); |
| |
| // Write an initial frame to the encoder. |
| // This is required to get past allocating the input/output buffers. |
| let written = |
| exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write"); |
| assert_eq!(written, next_frame.len()); |
| |
| let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken"); |
| |
| // Polling the decoded stream before the decoder has started up should wake it when |
| // output starts happening, set up the poll here. |
| let decoded_fut = pin!(decoded_stream.next()); |
| |
| let (waker, decoder_fut_wake_count) = new_count_waker(); |
| let mut counting_ctx = Context::from_waker(&waker); |
| |
| assert!(decoded_fut.poll(&mut counting_ctx).is_pending()); |
| |
| // Send only one frame. This is not eneough to automatically cause output to be generated |
| // by pushing data. |
| let frame = chunks.next().unwrap(); |
| let mut written_fut = decoder.write(&frame); |
| let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder"); |
| assert_eq!(frame.len(), written_bytes); |
| |
| let mut flush_fut = pin!(decoder.flush()); |
| exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder"); |
| |
| // When an unprocessed event has happened on the stream, even if intervening events have been |
| // procesed by the input processes, it should wake the output future to process the events. |
| assert_eq!(decoder_fut_wake_count.get(), 0); |
| while decoder_fut_wake_count.get() == 0 { |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| } |
| assert_eq!(decoder_fut_wake_count.get(), 1); |
| |
| let mut decoded = Vec::new(); |
| // Drops the previous decoder future, which is fine. |
| let mut decoded_fut = decoded_stream.next(); |
| |
| match exec.run_singlethreaded(&mut decoded_fut) { |
| Some(Ok(dec_data)) => { |
| assert!(!dec_data.is_empty()); |
| decoded.extend_from_slice(&dec_data); |
| } |
| x => panic!("Expected decoded frame, got {:?}", x), |
| } |
| |
| assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame"); |
| } |
| |
| #[fixture(fix_sbc_test_file)] |
| #[fuchsia::test] |
| fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) { |
| let mut exec = fasync::TestExecutor::new(); |
| const SBC_FRAME_SIZE: usize = 72; |
| |
| // SBC codec info corresponding to Mono reference stream. |
| let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec()); |
| let mut decoder = |
| StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder"); |
| |
| let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken"); |
| |
| let decoded_fut = pin!(decoded_stream.next()); |
| |
| let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE); |
| let next_frame = chunks.next().unwrap(); |
| |
| // Write an initial frame to the encoder. |
| // This is to get past allocating the input/output buffers stage. |
| // TODO(https://fxbug.dev/42081385): Both futures need to be polled here even though it's only the |
| // writer we really care about because currently decoded_fut is needed to drive the |
| // allocation process. |
| let (written_res, mut decoded_fut) = |
| run_while(&mut exec, decoded_fut, decoder.write(next_frame)); |
| assert_eq!(written_res.expect("initial write should succeed"), next_frame.len()); |
| |
| // Write to the encoder until we cannot write anymore, because there are no input buffers |
| // available. This should happen when all the input buffers are full and and the input |
| // buffers are waiting to be written. |
| let (waker, write_fut_wake_count) = new_count_waker(); |
| let mut counting_ctx = Context::from_waker(&waker); |
| |
| let mut wake_count_before_stall = 0; |
| for frame in chunks { |
| wake_count_before_stall = write_fut_wake_count.get(); |
| let mut written_fut = decoder.write(&frame); |
| if written_fut.poll_unpin(&mut counting_ctx).is_pending() { |
| // The poll_unpin can wake the input waker if an event arrived for it, meaning we should |
| // continue filling. |
| if write_fut_wake_count.get() != wake_count_before_stall { |
| continue; |
| } |
| // We should have never been woken until now, because we always were ready before, |
| // and the output waker is not registered (so can't progress) |
| break; |
| } |
| // Flush the packet, to make input buffers get spent faster. |
| let mut flush_fut = pin!(decoder.flush()); |
| exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder"); |
| } |
| |
| // We should be able to get a decoded output, once the codec does it's thing. |
| let decoded_frame = exec.run_singlethreaded(&mut decoded_fut); |
| assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong"); |
| |
| // Fill the input buffer again so the input waker is registered. |
| let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE); |
| for frame in chunks { |
| wake_count_before_stall = write_fut_wake_count.get(); |
| let mut written_fut = decoder.write(&frame); |
| if written_fut.poll_unpin(&mut counting_ctx).is_pending() { |
| // The poll_unpin can wake the input waker if an event arrived for it, meaning we should |
| // continue filling. |
| if write_fut_wake_count.get() != wake_count_before_stall { |
| continue; |
| } |
| break; |
| } |
| // Flush the packet, to make input buffers get spent faster. |
| let mut flush_fut = pin!(decoder.flush()); |
| exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder"); |
| } |
| |
| // The input waker should be the one waiting on events from the codec and get woken up, |
| // even if an output event happens. |
| // At some point, we will get an event from the encoder, with no output waker set, and this |
| // should wake the input waker, which is waiting to be woken up. |
| while write_fut_wake_count.get() == wake_count_before_stall { |
| let _ = exec.run_until_stalled(&mut futures::future::pending::<()>()); |
| } |
| |
| // Note: at this point, we may not be able to write another frame, but the waiter should |
| // repoll, and set the waker again. |
| } |
| } |