blob: 7f714421eebcea4c5029d92719f9e571c2d69cc7 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
buffer_set::*, elementary_stream::*, input_packet_stream::*, output_validator::*, stream::*,
Result,
};
use fidl_fuchsia_media::*;
use futures::TryStreamExt;
use std::rc::Rc;
/// Runs elementary streams through a stream processor.
///
/// One runner wraps one `StreamProcessorProxy`. The ordinal sequences and any
/// buffer sets are kept on the runner between calls to `run_stream`, so
/// successive streams continue from where the previous one left off.
pub struct StreamRunner {
// Ordinal sequence lent (by `&mut`) to each `Stream` as its
// `input_buffer_ordinals`; `new` seeds it from `OrdinalPattern::Odd`.
input_buffer_ordinals: OrdinalSequence,
// Ordinal sequence lent (by `&mut`) to each `Stream` as its
// `output_buffer_ordinals`; `new` seeds it from `OrdinalPattern::Odd`.
output_buffer_ordinals: OrdinalSequence,
// `run_stream` draws one value per call to use as the stream lifetime
// ordinal; `new` seeds it from `OrdinalPattern::Odd`.
stream_lifetime_ordinals: OrdinalSequence,
// `run_stream` draws one value per call to use as the format details
// version ordinal; `new` seeds it from `OrdinalPattern::All`.
format_details_ordinals: OrdinalSequence,
// Output buffer set carried over between runs. `None` after `new`, and
// again after a run whose options set `release_output_buffers_at_end`.
output_buffer_set: Option<BufferSet>,
// Input buffer set carried over between runs. `None` after `new`, and
// again after a run whose options set `release_input_buffers_at_end`.
input_buffer_set: Option<BufferSet>,
// Proxy for the stream processor under test; `run_stream` takes its event
// stream and lends the proxy itself to the `Stream` it builds.
stream_processor: StreamProcessorProxy,
}
impl StreamRunner {
pub fn new(stream_processor: StreamProcessorProxy) -> Self {
Self {
input_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
output_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
stream_lifetime_ordinals: OrdinalPattern::Odd.into_iter(),
format_details_ordinals: OrdinalPattern::All.into_iter(),
input_buffer_set: None,
output_buffer_set: None,
stream_processor,
}
}
pub async fn run_stream(
&mut self,
stream: Rc<dyn ElementaryStream>,
options: StreamOptions,
) -> Result<Vec<Output>> {
let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);
vlog!(
2,
"Starting a stream with lifetime ordinal {} and format details ordinal {}",
stream_lifetime_ordinal,
format_details_version_ordinal
);
let mut events = self.stream_processor.take_event_stream();
let output = {
let mut stream = Stream {
format_details_version_ordinal,
stream_lifetime_ordinal,
input_buffer_ordinals: &mut self.input_buffer_ordinals,
input_packet_stream: self.input_buffer_set.take().map(|buffer_set| {
InputPacketStream::new(buffer_set, stream.stream(), stream_lifetime_ordinal)
}),
output_buffer_ordinals: &mut self.output_buffer_ordinals,
output_buffer_set: self.output_buffer_set.take(),
current_output_format: None,
stream_processor: &mut self.stream_processor,
stream: stream.as_ref(),
options,
output: vec![],
};
stream.start().await?;
let channel_closed = loop {
let event = if let Some(event) = events.try_next().await? {
event
} else {
break true;
};
let control_flow = stream.handle_event(event).await?;
match control_flow {
StreamControlFlow::Continue => {}
StreamControlFlow::Stop => break false,
};
};
let mut output = stream.output;
if channel_closed {
output.push(Output::CodecChannelClose);
}
self.input_buffer_set =
stream.input_packet_stream.map(|stream| stream.take_buffer_set());
self.output_buffer_set = stream.output_buffer_set;
output
};
if options.release_input_buffers_at_end {
self.input_buffer_set = None;
}
if options.release_output_buffers_at_end {
self.output_buffer_set = None;
}
Ok(output)
}
}