blob: ea6df9f0c3d795385e77dcbde80242305145f597 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
buffer_set::*, elementary_stream::*, input_packet_stream::*, output_validator::*, stream::*,
Result,
};
use fidl::endpoints::*;
use fidl_fuchsia_media::*;
use fidl_fuchsia_mediacodec::*;
use fuchsia_component::client;
use futures::TryStreamExt;
use std::rc::Rc;
/// Runs elementary streams through a stream processor.
pub struct StreamRunner {
input_buffer_ordinals: OrdinalSequence,
output_buffer_ordinals: OrdinalSequence,
stream_lifetime_ordinals: OrdinalSequence,
format_details_ordinals: OrdinalSequence,
output_buffer_set: Option<BufferSet>,
input_buffer_set: Option<BufferSet>,
current_codec: Option<StreamProcessorProxy>,
}
impl StreamRunner {
pub fn new() -> Self {
Self {
input_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
output_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
stream_lifetime_ordinals: OrdinalPattern::Odd.into_iter(),
format_details_ordinals: OrdinalPattern::All.into_iter(),
input_buffer_set: None,
output_buffer_set: None,
current_codec: None,
}
}
pub async fn run_stream(
&mut self,
stream: Rc<dyn ElementaryStream>,
options: StreamOptions,
) -> Result<Vec<Output>> {
let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);
vlog!(
2,
"Starting a stream with lifetime ordinal {} and format details ordinal {}",
stream_lifetime_ordinal,
format_details_version_ordinal
);
let mut codec = if let Some(codec) = self.current_codec.take() {
codec
} else {
// TODO(turnage): Accept parameters for using a decoder vs encoder,
// and their parameters.
await!(get_decoder(stream.as_ref(), format_details_version_ordinal))?
};
let mut events = codec.take_event_stream();
let output = {
let mut stream = Stream {
format_details_version_ordinal,
stream_lifetime_ordinal,
input_buffer_ordinals: &mut self.input_buffer_ordinals,
input_packet_stream: self.input_buffer_set.take().map(|buffer_set| {
InputPacketStream::new(buffer_set, stream.stream(), stream_lifetime_ordinal)
}),
output_buffer_ordinals: &mut self.output_buffer_ordinals,
output_buffer_set: self.output_buffer_set.take(),
current_output_format: None,
codec: &mut codec,
stream: stream.as_ref(),
options,
output: vec![],
};
await!(stream.start())?;
let channel_closed = loop {
let event = if let Some(event) = await!(events.try_next())? {
event
} else {
break true;
};
let control_flow = await!(stream.handle_event(event))?;
match control_flow {
StreamControlFlow::Continue => {}
StreamControlFlow::Stop => break false,
};
};
let mut output = stream.output;
if channel_closed {
output.push(Output::CodecChannelClose);
}
self.input_buffer_set =
stream.input_packet_stream.map(|stream| stream.take_buffer_set());
self.output_buffer_set = stream.output_buffer_set;
output
};
self.current_codec = Some(codec);
if options.release_input_buffers_at_end {
self.input_buffer_set = None;
}
if options.release_output_buffers_at_end {
self.output_buffer_set = None;
}
Ok(output)
}
}
async fn get_decoder(
stream: &ElementaryStream,
format_details_version_ordinal: u64,
) -> Result<StreamProcessorProxy> {
let factory = client::connect_to_service::<CodecFactoryMarker>()?;
let (decoder_client_end, decoder_request) = create_endpoints()?;
let decoder = decoder_client_end.into_proxy()?;
// TODO(turnage): Account for all error reporting methods in the runner options and output.
factory.create_decoder(
CreateDecoderParams {
input_details: Some(stream.format_details(format_details_version_ordinal)),
promise_separate_access_units_on_input: Some(stream.is_access_units()),
require_can_stream_bytes_input: Some(false),
require_can_find_start: Some(false),
require_can_re_sync: Some(false),
require_report_all_detected_errors: Some(false),
require_hw: Some(false),
permit_lack_of_split_header_handling: Some(true),
},
decoder_request,
)?;
Ok(decoder)
}