[mediacodec] Begin stress tests of codecs.

This CL begins a test suite of StreamProcessor
implementations.

Testing a StreamProcessor is hard because there
is a lot to keep track of. This CL features a
StreamRunner that takes a description of
intended behavior in data, so that tests can just
specify a config for the stream run.

StreamRunner is generic over elementary stream.
All that is required of new codecs to fit into
the StreamRunner is implementing the
ElementaryStream trait for some test data stream.

A test spec type is defined that allows testers
to specify the elementary streams to run,
the options with which to run them, and a set of
validators for the behavior of the stream.

This is all needed because the stress tests
involve complex scenarios with many concurrent
streams. To keep that under control it is
easiest to define them declaratively.

Change-Id: Iab9d27448166881ca37fbbe64fb44c4f2495c967
diff --git a/garnet/packages/tests/BUILD.gn b/garnet/packages/tests/BUILD.gn
index 447fcd5..04178e7 100644
--- a/garnet/packages/tests/BUILD.gn
+++ b/garnet/packages/tests/BUILD.gn
@@ -784,9 +784,8 @@
     ":virtual_audio",
     "//garnet/bin/media/codecs:tests",
     "//garnet/public/lib/media/timeline:media_lib_timeline_tests",
-    "//src/media/audio:tests",
+    "//src/media:tests",
     "//src/media/playback/mediaplayer:tests",
-    "//src/media/sessions:tests",
   ]
 }
 
diff --git a/src/media/BUILD.gn b/src/media/BUILD.gn
index e80129f..c22e97b 100644
--- a/src/media/BUILD.gn
+++ b/src/media/BUILD.gn
@@ -7,6 +7,7 @@
   deps = [
     ":tests",
     "audio",
+    "codecs",
     "lib",
     "playback",
     "sessions",
@@ -19,5 +20,6 @@
   data_deps = [
     "audio:tests",
     "sessions:tests",
+    "codecs:tests",
   ]
 }
diff --git a/src/media/codecs/BUILD.gn b/src/media/codecs/BUILD.gn
new file mode 100644
index 0000000..5f6b40d
--- /dev/null
+++ b/src/media/codecs/BUILD.gn
@@ -0,0 +1,14 @@
+# Copyright 2019 The Fuchsia Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+group("codecs") {
+  deps = []
+}
+
+group("tests") {
+  testonly = true
+  deps = [
+    "stress",
+  ]
+}
diff --git a/src/media/codecs/stress/BUILD.gn b/src/media/codecs/stress/BUILD.gn
new file mode 100644
index 0000000..bb3f4e4
--- /dev/null
+++ b/src/media/codecs/stress/BUILD.gn
@@ -0,0 +1,67 @@
+# Copyright 2019 The Fuchsia Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//build/config.gni")
+import("//build/rust/rustc_library.gni")
+import("//build/rust/rustc_macro.gni")
+import("//build/test/test_package.gni")
+import("//build/testing/environments.gni")
+
+rustc_library("lib") {
+  name = "codec_stress"
+  edition = "2018"
+  with_unit_tests = true
+
+  deps = [
+    "//garnet/public/lib/fidl/rust/fidl",
+    "//garnet/public/rust/fuchsia-async",
+    "//garnet/public/rust/fuchsia-component",
+    "//garnet/public/rust/fuchsia-syslog",
+    "//garnet/public/rust/fuchsia-zircon",
+    "//sdk/fidl/fuchsia.media:fuchsia.media-rustc",
+    "//sdk/fidl/fuchsia.mediacodec:fuchsia.mediacodec-rustc",
+    "//src/lib/fidl_table_validation",
+    "//third_party/rust_crates:failure",
+    "//third_party/rust_crates:futures-preview",
+    "//third_party/rust_crates:lazy_static",
+    "//third_party/rust_crates:log",
+    "//third_party/rust_crates:parking_lot",
+    "//zircon/public/fidl/fuchsia-sysmem:fuchsia-sysmem-rustc",
+  ]
+}
+
+test_package("codec_stress_tests") {
+  deps = [
+    ":lib",
+  ]
+
+  meta = [
+    {
+      path = rebase_path("meta/codec_stress_tests.cmx")
+      dest = "codec_stress_tests.cmx"
+    },
+  ]
+
+  tests = [
+    {
+      name = "codec_stress_lib_test"
+      environments = basic_envs
+    },
+  ]
+
+  resources = [
+    {
+      path = rebase_path(
+              "../../../../garnet/test_data/media/third_party/chromium_media_test_data/bear.h264")
+      dest = "bear.h264"
+    },
+  ]
+}
+
+group("stress") {
+  testonly = true
+  deps = [
+    ":codec_stress_tests",
+  ]
+}
diff --git a/src/media/codecs/stress/meta/codec_stress_lib_test.cmx b/src/media/codecs/stress/meta/codec_stress_lib_test.cmx
new file mode 100644
index 0000000..f97e33b
--- /dev/null
+++ b/src/media/codecs/stress/meta/codec_stress_lib_test.cmx
@@ -0,0 +1,26 @@
+{
+    "facets": {
+        "fuchsia.test": {
+            "injected-services": {
+                "fuchsia.mediacodec.CodecFactory": "fuchsia-pkg://fuchsia.com/codec_factory#meta/codec_factory.cmx",
+                "fuchsia.sysmem.Allocator": "fuchsia-pkg://fuchsia.com/sysmem_connector#meta/sysmem_connector.cmx"
+            }
+        }
+    },
+    "program": {
+        "binary": "test/codec_stress_lib_test"
+    },
+    "sandbox": {
+        "dev": [
+            "class/media-codec",
+            "class/sysmem"
+        ],
+        "features": [],
+        "services": [
+            "fuchsia.mediacodec.CodecFactory",
+            "fuchsia.tracelink.Registry",
+            "fuchsia.sysmem.Allocator",
+            "fuchsia.logger.LogSink"
+        ]
+    }
+}
diff --git a/src/media/codecs/stress/meta/codec_stress_tests.cmx b/src/media/codecs/stress/meta/codec_stress_tests.cmx
new file mode 100644
index 0000000..79b6d89
--- /dev/null
+++ b/src/media/codecs/stress/meta/codec_stress_tests.cmx
@@ -0,0 +1,9 @@
+{
+    "program": {
+        "binary": "bin/app"
+    },
+    "sandbox": {
+        "features": [],
+        "services": []
+    }
+}
diff --git a/src/media/codecs/stress/src/buffer_collection_constraints.rs b/src/media/codecs/stress/src/buffer_collection_constraints.rs
new file mode 100644
index 0000000..e5b2f4f
--- /dev/null
+++ b/src/media/codecs/stress/src/buffer_collection_constraints.rs
@@ -0,0 +1,57 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use fidl_fuchsia_sysmem::*;
+
+pub const IMAGE_FORMAT_CONSTRAINTS_DEFAULT: ImageFormatConstraints = ImageFormatConstraints {
+    pixel_format: PixelFormat {
+        type_: PixelFormatType::Nv12,
+        has_format_modifier: false,
+        format_modifier: FormatModifier { value: 0 },
+    },
+    color_spaces_count: 0,
+    color_space: [ColorSpace { type_: ColorSpaceType::Invalid }; 32],
+    min_coded_width: 0,
+    max_coded_width: 0,
+    min_coded_height: 0,
+    max_coded_height: 0,
+    min_bytes_per_row: 0,
+    max_bytes_per_row: 0,
+    max_coded_width_times_coded_height: 0,
+    layers: 0,
+    coded_width_divisor: 0,
+    coded_height_divisor: 0,
+    bytes_per_row_divisor: 0,
+    start_offset_divisor: 0,
+    display_width_divisor: 0,
+    display_height_divisor: 0,
+    required_min_coded_width: 0,
+    required_max_coded_width: 0,
+    required_min_coded_height: 0,
+    required_max_coded_height: 0,
+    required_min_bytes_per_row: 0,
+    required_max_bytes_per_row: 0,
+};
+
+pub const BUFFER_COLLECTION_CONSTRAINTS_DEFAULT: BufferCollectionConstraints =
+    BufferCollectionConstraints {
+        usage: BufferUsage { cpu: 1, vulkan: 0, display: 0, video: 1 },
+        min_buffer_count_for_camping: 0,
+        min_buffer_count_for_dedicated_slack: 0,
+        min_buffer_count_for_shared_slack: 0,
+        min_buffer_count: 0,
+        max_buffer_count: 0,
+        has_buffer_memory_constraints: false,
+        buffer_memory_constraints: BufferMemoryConstraints {
+            min_size_bytes: 0,
+            max_size_bytes: std::u32::MAX,
+            physically_contiguous_required: false,
+            secure_required: false,
+            secure_permitted: false,
+            ram_domain_supported: false,
+            cpu_domain_supported: true,
+        },
+        image_format_constraints_count: 0,
+        image_format_constraints: [IMAGE_FORMAT_CONSTRAINTS_DEFAULT; 32],
+    };
diff --git a/src/media/codecs/stress/src/buffer_set.rs b/src/media/codecs/stress/src/buffer_set.rs
new file mode 100644
index 0000000..5864348
--- /dev/null
+++ b/src/media/codecs/stress/src/buffer_set.rs
@@ -0,0 +1,349 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! Handles negotiating buffer sets with the codec server and sysmem.
+
+use crate::{buffer_collection_constraints::*, Result};
+use failure::{self, Fail, ResultExt};
+use fidl::endpoints::{create_endpoints, ClientEnd};
+use fidl_fuchsia_media::*;
+use fidl_fuchsia_sysmem::*;
+use fidl_table_validation::{ValidFidlTable, Validate};
+use fuchsia_component::client;
+use fuchsia_zircon as zx;
+use std::{
+    convert::TryFrom,
+    fmt,
+    iter::{IntoIterator, StepBy},
+    ops::RangeFrom,
+};
+
+#[derive(Debug)]
+pub enum Error {
+    ReclaimClientTokenChannel,
+    ServerOmittedBufferVmo,
+    PacketReferencesInvalidBuffer,
+    VmoReadFail(zx::Status),
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, w: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(&self, w)
+    }
+}
+
+impl Fail for Error {}
+
+#[allow(unused)]
+#[derive(ValidFidlTable, Copy, Clone, Debug, PartialEq)]
+#[fidl_table_src(StreamBufferSettings)]
+pub struct ValidStreamBufferSettings {
+    buffer_lifetime_ordinal: u64,
+    buffer_constraints_version_ordinal: u64,
+    packet_count_for_server: u32,
+    packet_count_for_client: u32,
+    per_packet_buffer_bytes: u32,
+    #[fidl_field_type(default = false)]
+    single_buffer_mode: bool,
+}
+
+#[allow(unused)]
+#[derive(ValidFidlTable)]
+#[fidl_table_src(StreamBufferConstraints)]
+#[fidl_table_validator(StreamBufferConstraintsValidator)]
+pub struct ValidStreamBufferConstraints {
+    buffer_constraints_version_ordinal: u64,
+    default_settings: ValidStreamBufferSettings,
+    per_packet_buffer_bytes_min: u32,
+    per_packet_buffer_bytes_recommended: u32,
+    per_packet_buffer_bytes_max: u32,
+    packet_count_for_server_min: u32,
+    packet_count_for_server_recommended: u32,
+    packet_count_for_server_recommended_max: u32,
+    packet_count_for_server_max: u32,
+    packet_count_for_client_min: u32,
+    packet_count_for_client_max: u32,
+    single_buffer_mode_allowed: bool,
+    #[fidl_field_type(default = false)]
+    is_physically_contiguous_required: bool,
+    #[fidl_field_type(optional)]
+    very_temp_kludge_bti_handle: Option<zx::Handle>,
+}
+
+#[derive(ValidFidlTable)]
+#[fidl_table_src(StreamOutputConstraints)]
+pub struct ValidStreamOutputConstraints {
+    pub stream_lifetime_ordinal: u64,
+    pub buffer_constraints_action_required: bool,
+    pub buffer_constraints: ValidStreamBufferConstraints,
+}
+
+pub struct StreamBufferConstraintsValidator;
+
+#[derive(Debug)]
+pub enum StreamBufferConstraintsError {
+    VersionOrdinalZero,
+    SingleBufferMode,
+    ConstraintsNoBtiHandleForPhysicalBuffers,
+}
+
+impl Validate<ValidStreamBufferConstraints> for StreamBufferConstraintsValidator {
+    type Error = StreamBufferConstraintsError;
+    fn validate(candidate: &ValidStreamBufferConstraints) -> std::result::Result<(), Self::Error> {
+        if candidate.buffer_constraints_version_ordinal == 0 {
+            // An ordinal of 0 in StreamBufferConstraints is not allowed.
+            return Err(StreamBufferConstraintsError::VersionOrdinalZero);
+        }
+
+        if candidate.default_settings.single_buffer_mode {
+            // StreamBufferConstraints should never suggest single buffer mode.
+            return Err(StreamBufferConstraintsError::SingleBufferMode);
+        }
+
+        if candidate.is_physically_contiguous_required
+            && candidate
+                .very_temp_kludge_bti_handle
+                .as_ref()
+                .map(|h| h.is_invalid())
+                .unwrap_or(true)
+        {
+            // The bti handle must be provided if the buffers need to be physically contiguous.
+            return Err(StreamBufferConstraintsError::ConstraintsNoBtiHandleForPhysicalBuffers);
+        }
+
+        Ok(())
+    }
+}
+
+/// The pattern to use when advancing ordinals.
+#[derive(Debug, Clone, Copy)]
+pub enum OrdinalPattern {
+    /// Odd ordinal pattern starts at 1 and moves in increments of 2: [1,3,5..]
+    Odd,
+    /// All ordinal pattern starts at 1 and moves in increments of 1: [1,2,3..]
+    All,
+}
+
+impl IntoIterator for OrdinalPattern {
+    type Item = u64;
+    type IntoIter = StepBy<RangeFrom<Self::Item>>;
+    fn into_iter(self) -> Self::IntoIter {
+        let (start, step) = match self {
+            OrdinalPattern::Odd => (1, 2),
+            OrdinalPattern::All => (1, 1),
+        };
+        (start..).step_by(step)
+    }
+}
+
+pub fn get_ordinal(pattern: &mut <OrdinalPattern as IntoIterator>::IntoIter) -> u64 {
+    pattern.next().expect("Getting next item in infinite pattern")
+}
+
+pub enum BufferSetType {
+    Input,
+    Output,
+}
+
+pub struct BufferSetFactory;
+
+impl BufferSetFactory {
+    pub async fn buffer_set(
+        buffer_lifetime_ordinal: u64,
+        constraints: ValidStreamBufferConstraints,
+        codec: &mut StreamProcessorProxy,
+        buffer_set_type: BufferSetType,
+        buffer_collection_constraints: Option<BufferCollectionConstraints>,
+    ) -> Result<BufferSet> {
+        let (collection_client, settings) = await!(Self::settings(
+            buffer_lifetime_ordinal,
+            constraints,
+            buffer_collection_constraints
+        ))?;
+
+        vlog!(2, "Got settings; waiting for buffers.");
+
+        match buffer_set_type {
+            BufferSetType::Input => codec
+                .set_input_buffer_partial_settings(settings)
+                .context("Sending input partial settings to codec")?,
+            BufferSetType::Output => codec
+                .set_output_buffer_partial_settings(settings)
+                .context("Sending output partial settings to codec")?,
+        };
+
+        let (status, collection_info) = await!(collection_client.wait_for_buffers_allocated())
+            .context("Waiting for buffers")?;
+        vlog!(2, "Sysmem responded: {:?}", status);
+        let collection_info = zx::Status::ok(status).map(|_| collection_info)?;
+
+        if let BufferSetType::Output = buffer_set_type {
+            vlog!(2, "Completing settings for output.");
+            codec.complete_output_buffer_partial_settings(buffer_lifetime_ordinal)?;
+        }
+
+        //collection_client.close()?;
+
+        vlog!(
+            2,
+            "Got {} buffers of size {:?}",
+            collection_info.buffer_count,
+            collection_info.settings.buffer_settings.size_bytes
+        );
+        vlog!(3, "Buffer collection is: {:#?}", collection_info.settings);
+        for (i, buffer) in collection_info.buffers.iter().enumerate() {
+            // We enumerate beyond collection_info.buffer_count just for debugging
+            // purposes at this log level.
+            vlog!(3, "Buffer {} is : {:#?}", i, buffer);
+        }
+
+        Ok(BufferSet::try_from(BufferSetSpec {
+            proxy: collection_client,
+            buffer_lifetime_ordinal,
+            collection_info,
+        })?)
+    }
+
+    async fn settings(
+        buffer_lifetime_ordinal: u64,
+        constraints: ValidStreamBufferConstraints,
+        buffer_collection_constraints: Option<BufferCollectionConstraints>,
+    ) -> Result<(BufferCollectionProxy, StreamBufferPartialSettings)> {
+        let (client_token, client_token_request) =
+            create_endpoints::<BufferCollectionTokenMarker>()?;
+        let (codec_token, codec_token_request) = create_endpoints::<BufferCollectionTokenMarker>()?;
+        let client_token = client_token.into_proxy()?;
+
+        let sysmem_client =
+            client::connect_to_service::<AllocatorMarker>().context("Connecting to sysmem")?;
+
+        sysmem_client
+            .allocate_shared_collection(client_token_request)
+            .context("Allocating shared collection")?;
+        client_token.duplicate(std::u32::MAX, codec_token_request)?;
+
+        let (collection_client, collection_request) = create_endpoints::<BufferCollectionMarker>()?;
+        sysmem_client.bind_shared_collection(
+            ClientEnd::new(
+                client_token
+                    .into_channel()
+                    .map_err(|_| Error::ReclaimClientTokenChannel)?
+                    .into_zx_channel(),
+            ),
+            collection_request,
+        )?;
+        let collection_client = collection_client.into_proxy()?;
+        await!(collection_client.sync()).context("Syncing codec_token_request with sysmem")?;
+
+        let mut collection_constraints =
+            buffer_collection_constraints.unwrap_or(BUFFER_COLLECTION_CONSTRAINTS_DEFAULT);
+        assert_eq!(
+            collection_constraints.min_buffer_count_for_camping, 0,
+            "Codecs assert that buffer_count == packet count, so we can't change this yet."
+        );
+        collection_constraints.min_buffer_count_for_camping =
+            constraints.default_settings.packet_count_for_client;
+
+        vlog!(3, "Our buffer collection constraints are: {:#?}", collection_constraints);
+
+        // By design we must say true even if all our fields are left at
+        // default, or sysmem will not give us buffer handles.
+        let has_constraints = true;
+        collection_client
+            .set_constraints(has_constraints, &mut collection_constraints)
+            .context("Sending buffer constraints to sysmem")?;
+
+        Ok((
+            collection_client,
+            StreamBufferPartialSettings {
+                buffer_lifetime_ordinal: Some(buffer_lifetime_ordinal),
+                buffer_constraints_version_ordinal: Some(
+                    constraints.buffer_constraints_version_ordinal,
+                ),
+                single_buffer_mode: Some(constraints.default_settings.single_buffer_mode),
+                packet_count_for_server: Some(constraints.default_settings.packet_count_for_server),
+                packet_count_for_client: Some(constraints.default_settings.packet_count_for_client),
+                sysmem_token: Some(codec_token),
+            },
+        ))
+    }
+}
+
+#[derive(ValidFidlTable, Clone, Copy, Debug, PartialEq)]
+#[fidl_table_src(PacketHeader)]
+pub struct ValidPacketHeader {
+    pub buffer_lifetime_ordinal: u64,
+    pub packet_index: u32,
+}
+
+#[derive(ValidFidlTable, Clone, Copy, Debug, PartialEq)]
+#[fidl_table_src(Packet)]
+pub struct ValidPacket {
+    pub header: ValidPacketHeader,
+    pub buffer_index: u32,
+    pub stream_lifetime_ordinal: u64,
+    pub start_offset: u32,
+    pub valid_length_bytes: u32,
+    #[fidl_field_type(optional)]
+    pub timestamp_ish: Option<u64>,
+    #[fidl_field_type(default = false)]
+    pub start_access_unit: bool,
+    #[fidl_field_type(default = false)]
+    pub known_end_access_unit: bool,
+}
+
+struct BufferSetSpec {
+    proxy: BufferCollectionProxy,
+    buffer_lifetime_ordinal: u64,
+    collection_info: BufferCollectionInfo2,
+}
+
+#[derive(Debug, PartialEq)]
+pub struct Buffer {
+    pub data: zx::Vmo,
+    pub start: u64,
+    pub size: u64,
+}
+
+#[derive(Debug)]
+pub struct BufferSet {
+    pub proxy: BufferCollectionProxy,
+    pub buffers: Vec<Buffer>,
+    pub buffer_lifetime_ordinal: u64,
+}
+
+impl TryFrom<BufferSetSpec> for BufferSet {
+    type Error = failure::Error;
+    fn try_from(mut src: BufferSetSpec) -> std::result::Result<Self, Self::Error> {
+        let mut buffers = vec![];
+        for (i, buffer) in src.collection_info.buffers
+            [0..(src.collection_info.buffer_count as usize)]
+            .iter_mut()
+            .enumerate()
+        {
+            buffers.push(Buffer {
+                data: buffer.vmo.take().ok_or(Error::ServerOmittedBufferVmo).context(format!(
+                    "Trying to ingest {}th buffer of {}: {:#?}",
+                    i, src.collection_info.buffer_count, buffer
+                ))?,
+                start: buffer.vmo_usable_start,
+                size: src.collection_info.settings.buffer_settings.size_bytes as u64,
+            });
+        }
+
+        Ok(Self { proxy: src.proxy, buffers, buffer_lifetime_ordinal: src.buffer_lifetime_ordinal })
+    }
+}
+
+impl BufferSet {
+    pub fn read_packet(&self, packet: &ValidPacket) -> Result<Vec<u8>> {
+        let buffer = self
+            .buffers
+            .get(packet.buffer_index as usize)
+            .ok_or(Error::PacketReferencesInvalidBuffer)?;
+        let mut dest = vec![0; packet.valid_length_bytes as usize];
+        buffer.data.read(&mut dest, packet.start_offset as u64).map_err(Error::VmoReadFail)?;
+        Ok(dest)
+    }
+}
diff --git a/src/media/codecs/stress/src/codecs.rs b/src/media/codecs/stress/src/codecs.rs
new file mode 100644
index 0000000..0e7170d
--- /dev/null
+++ b/src/media/codecs/stress/src/codecs.rs
@@ -0,0 +1,5 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+pub mod h264;
diff --git a/src/media/codecs/stress/src/codecs/h264.rs b/src/media/codecs/stress/src/codecs/h264.rs
new file mode 100644
index 0000000..69a5caf
--- /dev/null
+++ b/src/media/codecs/stress/src/codecs/h264.rs
@@ -0,0 +1,134 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{elementary_stream::*, Result};
+use fidl_fuchsia_media::FormatDetails;
+use std::{fs, path::Path};
+
+pub const BEAR_TEST_FILE: &str = "/pkg/data/bear.h264";
+
+/// Represents an H264 elementary stream.
+pub struct H264Stream {
+    data: Vec<u8>,
+}
+
+impl H264Stream {
+    /// Constructs an H264 elementary stream from a file with raw elementary stream data.
+    pub fn from_file(filename: impl AsRef<Path>) -> Result<Self> {
+        Ok(Self { data: fs::read(filename)? })
+    }
+
+    /// Returns an iterator over H264 NALs that does not copy.
+    fn nal_iter(&self) -> impl Iterator<Item = H264Nal> {
+        H264NalIter { data: &self.data, pos: 0 }
+    }
+}
+
+impl ElementaryStream for H264Stream {
+    fn format_details(&self, version_ordinal: u64) -> FormatDetails {
+        FormatDetails {
+            format_details_version_ordinal: Some(version_ordinal),
+            mime_type: Some(String::from("video/h264")),
+            oob_bytes: None,
+            domain: None,
+            pass_through_parameters: None,
+        }
+    }
+
+    fn is_access_units(&self) -> bool {
+        true
+    }
+
+    fn stream<'a>(&'a self) -> Box<dyn Iterator<Item = ElementaryStreamChunk> + 'a> {
+        Box::new(self.nal_iter().map(|nal| ElementaryStreamChunk {
+            start_access_unit: true,
+            known_end_access_unit: true,
+            data: nal.data,
+            significance: match nal.kind {
+                H264NalKind::Picture => Significance::Video(VideoSignificance::Picture),
+                H264NalKind::NotPicture => Significance::Video(VideoSignificance::NotPicture),
+            },
+            timestamp: None,
+        }))
+    }
+}
+
+pub struct H264Nal<'a> {
+    pub kind: H264NalKind,
+    pub data: &'a [u8],
+}
+
+pub enum H264NalKind {
+    Picture,
+    NotPicture,
+}
+
+impl H264NalKind {
+    const NON_IDR_PICTURE_CODE: u8 = 1;
+    const IDR_PICTURE_CODE: u8 = 5;
+
+    fn from_header(header: u8) -> Self {
+        let kind = header & 0xf;
+        if kind == Self::NON_IDR_PICTURE_CODE || kind == Self::IDR_PICTURE_CODE {
+            H264NalKind::Picture
+        } else {
+            H264NalKind::NotPicture
+        }
+    }
+}
+
+struct H264NalStart<'a> {
+    /// Position in the h264 stream of the start.
+    pos: usize,
+    /// All the data from the start of the NAL onward.
+    data: &'a [u8],
+    kind: H264NalKind,
+}
+
+/// An iterator over NALs in an H264 stream.
+struct H264NalIter<'a> {
+    data: &'a [u8],
+    pos: usize,
+}
+
+impl<'a> H264NalIter<'a> {
+    fn next_nal(&self, pos: usize) -> Option<H264Nal<'a>> {
+        // This won't need to search if pos already at a start code.
+        let nal_start = self.next_nal_start(pos)?;
+        // We search 3 bytes after the found nal's start, because that will
+        // ensure we don't just find the same start code again.
+        match self.next_nal_start(nal_start.pos + 3) {
+            Some(next_start) => Some(H264Nal {
+                kind: nal_start.kind,
+                data: &nal_start.data[0..(next_start.pos - nal_start.pos)],
+            }),
+            None => Some(H264Nal { kind: nal_start.kind, data: nal_start.data }),
+        }
+    }
+
+    fn next_nal_start(&self, pos: usize) -> Option<H264NalStart<'a>> {
+        // This search size will find 3 and 4 byte start codes, and the
+        // header value.
+        const NAL_SEARCH_SIZE: usize = 5;
+
+        let data = self.data.get(pos..)?;
+        data.windows(NAL_SEARCH_SIZE).enumerate().find_map(|(i, candidate)| match candidate {
+            [0, 0, 0, 1, h] | [0, 0, 1, h, _] => Some(H264NalStart {
+                pos: i + pos,
+                data: data.get(i..).expect("Getting slice starting where we just matched"),
+                kind: H264NalKind::from_header(*h),
+            }),
+            _ => None,
+        })
+    }
+}
+
+impl<'a> Iterator for H264NalIter<'a> {
+    type Item = H264Nal<'a>;
+    fn next(&mut self) -> Option<Self::Item> {
+        let nal = self.next_nal(self.pos);
+        self.pos += nal.as_ref().map(|n| n.data.len()).unwrap_or(0);
+        nal
+    }
+}
diff --git a/src/media/codecs/stress/src/elementary_stream.rs b/src/media/codecs/stress/src/elementary_stream.rs
new file mode 100644
index 0000000..906b130
--- /dev/null
+++ b/src/media/codecs/stress/src/elementary_stream.rs
@@ -0,0 +1,78 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use fidl_fuchsia_media::FormatDetails;
+
+pub trait ElementaryStream {
+    fn format_details(&self, version_ordinal: u64) -> FormatDetails;
+
+    /// Whether _all_ chunks in the elementary stream will be on access unit boundaries. These are
+    /// units for decoder input (e.g. in H264, NALs). When input is not in access units, the server
+    /// must parse and/or buffer the bitstream.
+    fn is_access_units(&self) -> bool;
+
+    fn stream<'a>(&'a self) -> Box<dyn Iterator<Item = ElementaryStreamChunk> + 'a>;
+
+    fn video_frame_count(&self) -> usize {
+        self.stream()
+            .filter(|chunk| match chunk.significance {
+                Significance::Video(VideoSignificance::Picture) => true,
+                _ => false,
+            })
+            .count()
+    }
+}
+
+#[derive(Copy, Clone, Debug)]
+pub struct ElementaryStreamChunk<'a> {
+    pub start_access_unit: bool,
+    pub known_end_access_unit: bool,
+    pub data: &'a [u8],
+    pub significance: Significance,
+    pub timestamp: Option<u64>,
+}
+
+#[derive(Copy, Clone, Debug)]
+pub enum Significance {
+    Video(VideoSignificance),
+}
+
+#[derive(Copy, Clone, Debug)]
+pub enum VideoSignificance {
+    Picture,
+    NotPicture,
+}
+
+/// Wraps an elementary stream and adds sequential dummy timestamps to its chunks.
+pub struct TimestampedStream<S, I> {
+    pub source: S,
+    pub timestamps: I,
+}
+
+impl<S, I> ElementaryStream for TimestampedStream<S, I>
+where
+    S: ElementaryStream,
+    I: Iterator<Item = u64> + Clone,
+{
+    fn format_details(&self, version_ordinal: u64) -> FormatDetails {
+        self.source.format_details(version_ordinal)
+    }
+
+    fn is_access_units(&self) -> bool {
+        self.source.is_access_units()
+    }
+
+    fn stream<'a>(&'a self) -> Box<dyn Iterator<Item = ElementaryStreamChunk> + 'a> {
+        let mut timestamps = self.timestamps.clone();
+        Box::new(self.source.stream().map(move |mut chunk| {
+            match chunk.significance {
+                Significance::Video(VideoSignificance::Picture) => {
+                    chunk.timestamp = timestamps.next();
+                }
+                _ => {}
+            };
+            chunk
+        }))
+    }
+}
diff --git a/src/media/codecs/stress/src/input_packet_stream.rs b/src/media/codecs/stress/src/input_packet_stream.rs
new file mode 100644
index 0000000..bd7bbfe
--- /dev/null
+++ b/src/media/codecs/stress/src/input_packet_stream.rs
@@ -0,0 +1,132 @@
+use crate::{buffer_set::*, elementary_stream::*};
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use failure::Fail;
+use fidl_fuchsia_media::*;
+use fuchsia_zircon as zx;
+use std::{collections::HashMap, fmt};
+
+type PacketIdx = u32;
+type BufferIdx = u32;
+
+/// A stream converting elementary stream chunks into input packets for a stream processor.
+pub struct InputPacketStream<I> {
+    packet_and_buffer_pairs: HashMap<PacketIdx, (BufferIdx, UsageStatus)>,
+    buffer_set: BufferSet,
+    stream_lifetime_ordinal: u64,
+    stream: I,
+    sent_eos: bool,
+}
+
+#[derive(Copy, Clone, PartialEq, Debug)]
+enum UsageStatus {
+    Free,
+    InUse,
+}
+
+#[derive(Debug)]
+pub enum Error {
+    PacketRefersToInvalidBuffer,
+    BufferTooSmall { buffer_size: usize, stream_chunk_size: usize },
+    VmoWriteFail(zx::Status),
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, w: &mut fmt::Formatter) -> fmt::Result {
+        fmt::Debug::fmt(&self, w)
+    }
+}
+
+impl Fail for Error {}
+
+pub enum PacketPoll {
+    Ready(Packet),
+    Eos,
+    NotReady,
+}
+
+impl<'a, I: Iterator<Item = ElementaryStreamChunk<'a>>> InputPacketStream<I> {
+    pub fn new(buffer_set: BufferSet, stream: I, stream_lifetime_ordinal: u64) -> Self {
+        let packets = 0..(buffer_set.buffers.len() as u32);
+        let buffers = packets.clone().rev().map(|idx| (idx, UsageStatus::Free));
+        Self {
+            packet_and_buffer_pairs: packets.zip(buffers).collect(),
+            buffer_set,
+            stream_lifetime_ordinal,
+            stream,
+            sent_eos: false,
+        }
+    }
+
+    pub fn add_free_packet(&mut self, packet: ValidPacketHeader) -> Result<(), Error> {
+        let (_, ref mut status) = *self
+            .packet_and_buffer_pairs
+            .get_mut(&packet.packet_index)
+            .ok_or(Error::PacketRefersToInvalidBuffer)?;
+        *status = UsageStatus::Free;
+        Ok(())
+    }
+
+    fn free_packet_and_buffer(&mut self) -> Option<(u32, u32)> {
+        // This is a linear search. This may not be appropriate in prod code.
+        self.packet_and_buffer_pairs.iter_mut().find_map(|(packet, (buffer, usage))| match usage {
+            UsageStatus::Free => {
+                *usage = UsageStatus::InUse;
+                Some((*packet, *buffer))
+            }
+            UsageStatus::InUse => None,
+        })
+    }
+
+    pub fn next_packet(&mut self) -> Result<PacketPoll, Error> {
+        let (packet_idx, buffer_idx) = if let Some(idxs) = self.free_packet_and_buffer() {
+            idxs
+        } else {
+            return Ok(PacketPoll::NotReady);
+        };
+
+        let chunk = if let Some(chunk) = self.stream.next() {
+            chunk
+        } else if !self.sent_eos {
+            self.sent_eos = true;
+            return Ok(PacketPoll::Eos);
+        } else {
+            return Ok(PacketPoll::NotReady);
+        };
+
+        let buffer = self
+            .buffer_set
+            .buffers
+            .get(buffer_idx as usize)
+            .ok_or(Error::PacketRefersToInvalidBuffer)?;
+
+        if (buffer.size as usize) < chunk.data.len() {
+            return Err(Error::BufferTooSmall {
+                buffer_size: buffer.size as usize,
+                stream_chunk_size: chunk.data.len(),
+            });
+        }
+
+        buffer.data.write(chunk.data, 0).map_err(Error::VmoWriteFail)?;
+
+        Ok(PacketPoll::Ready(Packet {
+            header: Some(PacketHeader {
+                packet_index: Some(packet_idx),
+                buffer_lifetime_ordinal: Some(self.buffer_set.buffer_lifetime_ordinal),
+            }),
+            buffer_index: Some(buffer_idx),
+            stream_lifetime_ordinal: Some(self.stream_lifetime_ordinal),
+            start_offset: Some(0),
+            valid_length_bytes: Some(chunk.data.len() as u32),
+            timestamp_ish: chunk.timestamp,
+            start_access_unit: Some(chunk.start_access_unit),
+            known_end_access_unit: Some(chunk.known_end_access_unit),
+        }))
+    }
+
+    pub fn take_buffer_set(self) -> BufferSet {
+        self.buffer_set
+    }
+}
diff --git a/src/media/codecs/stress/src/lib.rs b/src/media/codecs/stress/src/lib.rs
new file mode 100644
index 0000000..a222c54
--- /dev/null
+++ b/src/media/codecs/stress/src/lib.rs
@@ -0,0 +1,161 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#![feature(async_await, await_macro)]
+#![cfg(test)]
+
+#[macro_use]
+mod log_macros;
+mod buffer_collection_constraints;
+mod buffer_set;
+mod codecs;
+mod elementary_stream;
+mod input_packet_stream;
+mod output_validator;
+mod stream;
+mod stream_runner;
+mod test_spec;
+
+use crate::codecs::h264::*;
+use crate::elementary_stream::*;
+use crate::output_validator::*;
+use crate::stream::*;
+use crate::test_spec::*;
+use failure::{Error, Fail};
+use fuchsia_async as fasync;
+use lazy_static::lazy_static;
+use parking_lot::Mutex;
+use std::{fmt, rc::Rc, sync::Arc};
+
+type Result<T> = std::result::Result<T, Error>;
+
+lazy_static! {
+    // We use a test lock to prevent any tests from running in parallel.
+    //
+    // Running in parallel is something we want to control for specific test cases, especially
+    // when testing hardware stream processors.
+    //
+    // This can be removed in the future if we get a static way to specify environment variables
+    // for a component, so we can set `RUST_TEST_THREADS=1` for this component.
+    static ref TEST_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+    static ref LOGGER: () = ::fuchsia_syslog::init().expect("Initializing syslog");
+}
+
+#[derive(Debug)]
+pub struct FatalError(String);
+
+impl fmt::Display for FatalError {
+    fn fmt(&self, w: &mut fmt::Formatter) -> fmt::Result {
+        write!(w, "FatalError: {}", self.0)
+    }
+}
+
+impl Fail for FatalError {}
+
+// TODO(turnage): Add test spec for buffers released between streams.
+// TODO(turnage): Add hash validator for NV12 and YV12.
+
+#[fasync::run_singlethreaded]
+#[test]
+async fn serial_bear_new_codec_for_each() -> Result<()> {
+    let _lock = TEST_LOCK.lock();
+    *LOGGER;
+
+    eprintln!("reading stream file...");
+
+    let stream = Rc::new(TimestampedStream {
+        source: H264Stream::from_file(BEAR_TEST_FILE)?,
+        timestamps: 0..,
+    });
+
+    eprintln!("got file");
+
+    let frame_count_validator = Rc::new(OutputPacketCountValidator {
+        expected_output_packet_count: stream.video_frame_count(),
+    });
+
+    let spec1 = TestSpec {
+        cases: vec![TestCase {
+            name: "Simple bear test run 1 on new channel",
+            stream: stream.clone(),
+            validators: vec![
+                Rc::new(TerminatesWithValidator {
+                    expected_terminal_output: Output::Eos { stream_lifetime_ordinal: 1 },
+                }),
+                frame_count_validator.clone(),
+            ],
+            stream_options: None,
+        }],
+        relation: CaseRelation::Serial,
+    };
+
+    let spec2 = TestSpec {
+        cases: vec![TestCase {
+            name: "Simple bear test run 2 on new channel",
+            stream,
+            validators: vec![
+                Rc::new(TerminatesWithValidator {
+                    expected_terminal_output: Output::Eos { stream_lifetime_ordinal: 1 },
+                }),
+                frame_count_validator,
+            ],
+            stream_options: None,
+        }],
+        relation: CaseRelation::Serial,
+    };
+
+    await!(spec1.run())?;
+    await!(spec2.run())?;
+
+    Ok(())
+}
+
+#[fasync::run_singlethreaded]
+#[test]
+async fn serial_bear_no_release_buffers() -> Result<()> {
+    let _lock = TEST_LOCK.lock();
+    *LOGGER;
+
+    let stream = Rc::new(TimestampedStream {
+        source: H264Stream::from_file(BEAR_TEST_FILE)?,
+        timestamps: 0..,
+    });
+
+    let frame_count_validator = Rc::new(OutputPacketCountValidator {
+        expected_output_packet_count: stream.video_frame_count(),
+    });
+
+    let spec = TestSpec {
+        cases: vec![
+            TestCase {
+                name: "Simple bear test run 1 on same channel",
+                stream: stream.clone(),
+                validators: vec![
+                    Rc::new(TerminatesWithValidator {
+                        expected_terminal_output: Output::Eos { stream_lifetime_ordinal: 1 },
+                    }),
+                    frame_count_validator.clone(),
+                ],
+                stream_options: None,
+            },
+            TestCase {
+                name: "Simple bear test run 2 on same channel",
+                stream,
+                validators: vec![
+                    Rc::new(TerminatesWithValidator {
+                        expected_terminal_output: Output::Eos { stream_lifetime_ordinal: 3 },
+                    }),
+                    frame_count_validator,
+                ],
+                stream_options: Some(StreamOptions {
+                    queue_format_details: false,
+                    ..StreamOptions::default()
+                }),
+            },
+        ],
+        relation: CaseRelation::Serial,
+    };
+
+    await!(spec.run())
+}
diff --git a/src/media/codecs/stress/src/log_macros.rs b/src/media/codecs/stress/src/log_macros.rs
new file mode 100644
index 0000000..b35a55d
--- /dev/null
+++ b/src/media/codecs/stress/src/log_macros.rs
@@ -0,0 +1,8 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#[macro_export]
+macro_rules! vlog {
+    ($v:expr, $($arg:tt)+) => (::fuchsia_syslog::fx_vlog!(tag: "codec_stress_tests", $v, $($arg)+))
+}
diff --git a/src/media/codecs/stress/src/output_validator.rs b/src/media/codecs/stress/src/output_validator.rs
new file mode 100644
index 0000000..56f3c7d
--- /dev/null
+++ b/src/media/codecs/stress/src/output_validator.rs
@@ -0,0 +1,96 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{buffer_set::*, FatalError};
+use failure::Error;
+use fidl_fuchsia_media::{FormatDetails, StreamOutputFormat};
+use fidl_table_validation::*;
+use std::rc::Rc;
+
+#[derive(ValidFidlTable, Debug, PartialEq)]
+#[fidl_table_src(StreamOutputFormat)]
+pub struct ValidStreamOutputFormat {
+    pub stream_lifetime_ordinal: u64,
+    pub format_details: FormatDetails,
+}
+
+/// An output packet from the stream.
+#[derive(Debug, PartialEq)]
+pub struct OutputPacket {
+    pub data: Vec<u8>,
+    pub format: Rc<ValidStreamOutputFormat>,
+    pub packet: ValidPacket,
+}
+
+/// Output represents any output from a stream we might want to validate programmatically.
+///
+/// This may extend to contain not just explicit events but certain stream control behaviors or
+/// even errors.
+#[derive(Debug, PartialEq)]
+pub enum Output {
+    Packet(OutputPacket),
+    Eos { stream_lifetime_ordinal: u64 },
+    CodecChannelClose,
+}
+
+/// Checks all stream outputs, which are provided to the validator in the order in which they
+/// were received from the stream processor.
+///
+/// Failure should be indicated by returning an error, not by panic, so that the full context of
+/// the error will be available in the failure output.
+pub trait OutputValidator {
+    fn validate(&self, output: &[Output]) -> Result<(), Error>;
+}
+
+/// Validates that the output contains the expected number of packets.
+pub struct OutputPacketCountValidator {
+    pub expected_output_packet_count: usize,
+}
+
+impl OutputValidator for OutputPacketCountValidator {
+    fn validate(&self, output: &[Output]) -> Result<(), Error> {
+        let actual_output_packet_count: usize = output
+            .iter()
+            .filter(|output| match output {
+                Output::Packet(_) => true,
+                _ => false,
+            })
+            .count();
+
+        if actual_output_packet_count != self.expected_output_packet_count {
+            return Err(FatalError(format!(
+                "actual output packet count: {}; expected output packet count: {}",
+                actual_output_packet_count, self.expected_output_packet_count
+            ))
+            .into());
+        }
+
+        Ok(())
+    }
+}
+
+/// Validates that a stream terminates with Eos.
+pub struct TerminatesWithValidator {
+    pub expected_terminal_output: Output,
+}
+
+impl OutputValidator for TerminatesWithValidator {
+    fn validate(&self, output: &[Output]) -> Result<(), Error> {
+        let actual_terminal_output = output.last().ok_or(FatalError(format!(
+            "In terminal output: expected {:?}; found: None",
+            Some(&self.expected_terminal_output)
+        )))?;
+
+        if *actual_terminal_output == self.expected_terminal_output {
+            Ok(())
+        } else {
+            Err(FatalError(format!(
+                "In terminal output: expected {:?}; found: {:?}",
+                Some(&self.expected_terminal_output),
+                actual_terminal_output
+            ))
+            .into())
+        }
+    }
+}
diff --git a/src/media/codecs/stress/src/stream.rs b/src/media/codecs/stress/src/stream.rs
new file mode 100644
index 0000000..d86dd09
--- /dev/null
+++ b/src/media/codecs/stress/src/stream.rs
@@ -0,0 +1,226 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{
+    buffer_set::*, elementary_stream::*, input_packet_stream::*, output_validator::*, FatalError,
+    Result,
+};
+use fidl_fuchsia_media::*;
+use fidl_fuchsia_sysmem::BufferCollectionConstraints;
+use std::{convert::TryFrom, rc::Rc};
+
+pub type OrdinalSequence = <OrdinalPattern as IntoIterator>::IntoIter;
+
+#[derive(Debug, Copy, Clone)]
+pub struct StreamOptions {
+    /// When true, the stream runner will queue format details for each stream. Otherwise it will
+    /// inherit format details from the codec factory.
+    pub queue_format_details: bool,
+    pub release_input_buffers_at_end: bool,
+    pub release_output_buffers_at_end: bool,
+    pub input_buffer_collection_constraints: Option<BufferCollectionConstraints>,
+    pub output_buffer_collection_constraints: Option<BufferCollectionConstraints>,
+}
+
+impl Default for StreamOptions {
+    fn default() -> Self {
+        Self {
+            queue_format_details: true,
+            release_input_buffers_at_end: false,
+            release_output_buffers_at_end: false,
+            input_buffer_collection_constraints: None,
+            output_buffer_collection_constraints: None,
+        }
+    }
+}
+
+pub struct Stream<'a> {
+    pub format_details_version_ordinal: u64,
+    pub stream_lifetime_ordinal: u64,
+    pub input_buffer_ordinals: &'a mut OrdinalSequence,
+    pub input_packet_stream:
+        Option<InputPacketStream<Box<dyn Iterator<Item = ElementaryStreamChunk<'a>> + 'a>>>,
+    pub output_buffer_ordinals: &'a mut OrdinalSequence,
+    pub output_buffer_set: Option<BufferSet>,
+    pub current_output_format: Option<Rc<ValidStreamOutputFormat>>,
+    pub codec: &'a mut StreamProcessorProxy,
+    pub stream: &'a ElementaryStream,
+    pub options: StreamOptions,
+    pub output: Vec<Output>,
+}
+
+pub enum StreamControlFlow {
+    Continue,
+    Stop,
+}
+
+impl<'a: 'b, 'b> Stream<'a> {
+    pub async fn start(&'b mut self) -> Result<()> {
+        if self.options.queue_format_details && self.input_packet_stream.is_some() {
+            self.codec.queue_input_format_details(
+                self.stream_lifetime_ordinal,
+                self.stream.format_details(self.format_details_version_ordinal),
+            )?;
+        }
+
+        self.send_available_input()?;
+
+        Ok(())
+    }
+
+    pub async fn handle_event(
+        &'b mut self,
+        event: StreamProcessorEvent,
+    ) -> Result<StreamControlFlow> {
+        match event {
+            StreamProcessorEvent::OnInputConstraints { input_constraints } => {
+                vlog!(2, "Received input constraints.");
+                vlog!(3, "Input constraints are: {:#?}", input_constraints);
+
+                let buffer_set = await!(BufferSetFactory::buffer_set(
+                    get_ordinal(self.input_buffer_ordinals),
+                    ValidStreamBufferConstraints::try_from(input_constraints)?,
+                    self.codec,
+                    BufferSetType::Input,
+                    self.options.input_buffer_collection_constraints,
+                ))?;
+
+                self.codec.queue_input_format_details(
+                    self.stream_lifetime_ordinal,
+                    self.stream.format_details(self.format_details_version_ordinal),
+                )?;
+
+                self.input_packet_stream = Some(InputPacketStream::new(
+                    buffer_set,
+                    self.stream.stream(),
+                    self.stream_lifetime_ordinal,
+                ));
+                self.send_available_input()?;
+            }
+            StreamProcessorEvent::OnOutputConstraints { output_config } => {
+                vlog!(2, "Received output constraints.");
+                vlog!(3, "Output constraints are: {:#?}", output_config);
+
+                let constraints = ValidStreamOutputConstraints::try_from(output_config)?;
+                if constraints.buffer_constraints_action_required {
+                    self.output_buffer_set = Some(await!(BufferSetFactory::buffer_set(
+                        get_ordinal(self.output_buffer_ordinals),
+                        constraints.buffer_constraints,
+                        self.codec,
+                        BufferSetType::Output,
+                        self.options.output_buffer_collection_constraints,
+                    ))?);
+                }
+            }
+            StreamProcessorEvent::OnFreeInputPacket { free_input_packet } => {
+                vlog!(2, "Received freed input packet.");
+                vlog!(2, "Freed input packet is: {:#?}", free_input_packet);
+
+                let free_input_packet = ValidPacketHeader::try_from(free_input_packet)?;
+                let input_packet_stream = self.input_packet_stream.as_mut().expect(concat!(
+                    "Unwrapping packet stream; ",
+                    "it should be set before we ",
+                    "get free input packets back."
+                ));
+                input_packet_stream.add_free_packet(free_input_packet)?;
+                self.send_available_input()?;
+            }
+            StreamProcessorEvent::OnOutputFormat { output_format } => {
+                vlog!(2, "Received output format.");
+                vlog!(3, "Output format is: {:#?}", output_format);
+
+                let output_format = ValidStreamOutputFormat::try_from(output_format)?;
+                assert_eq!(output_format.stream_lifetime_ordinal, self.stream_lifetime_ordinal);
+                self.current_output_format = Some(Rc::new(output_format));
+            }
+            StreamProcessorEvent::OnOutputPacket {
+                output_packet,
+                error_detected_before,
+                error_detected_during,
+            } => {
+                assert!(!error_detected_before);
+                assert!(!error_detected_during);
+                vlog!(2, "Received output packet.");
+                vlog!(3, "Output packet is: {:#?}", output_packet);
+
+                let output_packet = ValidPacket::try_from(output_packet)?;
+                self.output.push(Output::Packet(OutputPacket {
+                    data: self
+                        .output_buffer_set
+                        .as_ref()
+                        .ok_or(FatalError(String::from(concat!(
+                            "There should be an output buffer set ",
+                            "if we are receiving output packets"
+                        ))))?
+                        .read_packet(&output_packet)?,
+                    format: self.current_output_format.clone().ok_or(FatalError(String::from(
+                        concat!(
+                            "There should be an output format set ",
+                            "if we are receiving output packets"
+                        ),
+                    )))?,
+                    packet: output_packet,
+                }));
+
+                self.codec.recycle_output_packet(PacketHeader {
+                    buffer_lifetime_ordinal: Some(output_packet.header.buffer_lifetime_ordinal),
+                    packet_index: Some(output_packet.header.packet_index),
+                })?;
+            }
+            StreamProcessorEvent::OnOutputEndOfStream {
+                stream_lifetime_ordinal,
+                error_detected_before,
+            } => {
+                assert!(!error_detected_before);
+                vlog!(2, "Received output end of stream.");
+                vlog!(3, "End of stream is for stream {}", stream_lifetime_ordinal);
+
+                // TODO(turnage): Enable the flush method of ending stream in options.
+                self.output.push(Output::Eos { stream_lifetime_ordinal });
+                self.codec.close_current_stream(
+                    self.stream_lifetime_ordinal,
+                    self.options.release_input_buffers_at_end,
+                    self.options.release_output_buffers_at_end,
+                )?;
+                await!(self.codec.sync())?;
+
+                // TODO(turnage): Some codecs return all input packets explicitly, not
+                //                implicitly. All codecs should return explicitly. For now
+                //                we forgive it but soon we want to check that all input
+                //                packets will come back.
+                return Ok(StreamControlFlow::Stop);
+            }
+            e => {
+                vlog!(2, "Got other event: {:#?}", e);
+            }
+        }
+
+        Ok(StreamControlFlow::Continue)
+    }
+
+    fn send_available_input(&'b mut self) -> Result<()> {
+        let input_packet_stream =
+            if let Some(input_packet_stream) = self.input_packet_stream.as_mut() {
+                input_packet_stream
+            } else {
+                return Ok(());
+            };
+
+        loop {
+            match input_packet_stream.next_packet()? {
+                PacketPoll::Ready(input_packet) => {
+                    vlog!(2, "Sending input packet.");
+                    break Ok(self.codec.queue_input_packet(input_packet)?);
+                }
+                PacketPoll::Eos => {
+                    vlog!(2, "Sending stream close");
+                    break Ok(self
+                        .codec
+                        .queue_input_end_of_stream(self.stream_lifetime_ordinal)?);
+                }
+                PacketPoll::NotReady => break Ok(()),
+            }
+        }
+    }
+}
diff --git a/src/media/codecs/stress/src/stream_runner.rs b/src/media/codecs/stress/src/stream_runner.rs
new file mode 100644
index 0000000..ea6df9f
--- /dev/null
+++ b/src/media/codecs/stress/src/stream_runner.rs
@@ -0,0 +1,145 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{
+    buffer_set::*, elementary_stream::*, input_packet_stream::*, output_validator::*, stream::*,
+    Result,
+};
+use fidl::endpoints::*;
+use fidl_fuchsia_media::*;
+use fidl_fuchsia_mediacodec::*;
+use fuchsia_component::client;
+use futures::TryStreamExt;
+use std::rc::Rc;
+
+/// Runs elementary streams through a stream processor.
+pub struct StreamRunner {
+    input_buffer_ordinals: OrdinalSequence,
+    output_buffer_ordinals: OrdinalSequence,
+    stream_lifetime_ordinals: OrdinalSequence,
+    format_details_ordinals: OrdinalSequence,
+    output_buffer_set: Option<BufferSet>,
+    input_buffer_set: Option<BufferSet>,
+    current_codec: Option<StreamProcessorProxy>,
+}
+
+impl StreamRunner {
+    pub fn new() -> Self {
+        Self {
+            input_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
+            output_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
+            stream_lifetime_ordinals: OrdinalPattern::Odd.into_iter(),
+            format_details_ordinals: OrdinalPattern::All.into_iter(),
+            input_buffer_set: None,
+            output_buffer_set: None,
+            current_codec: None,
+        }
+    }
+
+    pub async fn run_stream(
+        &mut self,
+        stream: Rc<dyn ElementaryStream>,
+        options: StreamOptions,
+    ) -> Result<Vec<Output>> {
+        let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
+        let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);
+
+        vlog!(
+            2,
+            "Starting a stream with lifetime ordinal {} and format details ordinal {}",
+            stream_lifetime_ordinal,
+            format_details_version_ordinal
+        );
+
+        let mut codec = if let Some(codec) = self.current_codec.take() {
+            codec
+        } else {
+            // TODO(turnage): Accept parameters for using a decoder vs encoder,
+            // and their parameters.
+            await!(get_decoder(stream.as_ref(), format_details_version_ordinal))?
+        };
+        let mut events = codec.take_event_stream();
+
+        let output = {
+            let mut stream = Stream {
+                format_details_version_ordinal,
+                stream_lifetime_ordinal,
+                input_buffer_ordinals: &mut self.input_buffer_ordinals,
+                input_packet_stream: self.input_buffer_set.take().map(|buffer_set| {
+                    InputPacketStream::new(buffer_set, stream.stream(), stream_lifetime_ordinal)
+                }),
+                output_buffer_ordinals: &mut self.output_buffer_ordinals,
+                output_buffer_set: self.output_buffer_set.take(),
+                current_output_format: None,
+                codec: &mut codec,
+                stream: stream.as_ref(),
+                options,
+                output: vec![],
+            };
+
+            await!(stream.start())?;
+
+            let channel_closed = loop {
+                let event = if let Some(event) = await!(events.try_next())? {
+                    event
+                } else {
+                    break true;
+                };
+
+                let control_flow = await!(stream.handle_event(event))?;
+                match control_flow {
+                    StreamControlFlow::Continue => {}
+                    StreamControlFlow::Stop => break false,
+                };
+            };
+
+            let mut output = stream.output;
+            if channel_closed {
+                output.push(Output::CodecChannelClose);
+            }
+
+            self.input_buffer_set =
+                stream.input_packet_stream.map(|stream| stream.take_buffer_set());
+            self.output_buffer_set = stream.output_buffer_set;
+
+            output
+        };
+
+        self.current_codec = Some(codec);
+
+        if options.release_input_buffers_at_end {
+            self.input_buffer_set = None;
+        }
+
+        if options.release_output_buffers_at_end {
+            self.output_buffer_set = None;
+        }
+
+        Ok(output)
+    }
+}
+
+async fn get_decoder(
+    stream: &ElementaryStream,
+    format_details_version_ordinal: u64,
+) -> Result<StreamProcessorProxy> {
+    let factory = client::connect_to_service::<CodecFactoryMarker>()?;
+    let (decoder_client_end, decoder_request) = create_endpoints()?;
+    let decoder = decoder_client_end.into_proxy()?;
+    // TODO(turnage): Account for all error reporting methods in the runner options and output.
+    factory.create_decoder(
+        CreateDecoderParams {
+            input_details: Some(stream.format_details(format_details_version_ordinal)),
+            promise_separate_access_units_on_input: Some(stream.is_access_units()),
+            require_can_stream_bytes_input: Some(false),
+            require_can_find_start: Some(false),
+            require_can_re_sync: Some(false),
+            require_report_all_detected_errors: Some(false),
+            require_hw: Some(false),
+            permit_lack_of_split_header_handling: Some(true),
+        },
+        decoder_request,
+    )?;
+    Ok(decoder)
+}
diff --git a/src/media/codecs/stress/src/test_spec.rs b/src/media/codecs/stress/src/test_spec.rs
new file mode 100644
index 0000000..b74398d
--- /dev/null
+++ b/src/media/codecs/stress/src/test_spec.rs
@@ -0,0 +1,69 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{elementary_stream::*, output_validator::*, stream::*, stream_runner::*};
+use failure::{Error, ResultExt};
+use futures::{stream::FuturesUnordered, TryStreamExt};
+use std::rc::Rc;
+
+/// A test spec describes all the cases that will run and the circumstances in which
+/// they will run.
+pub struct TestSpec {
+    pub cases: Vec<TestCase>,
+    pub relation: CaseRelation,
+}
+
+/// A case relation describes the temporal relationship between two test cases.
+pub enum CaseRelation {
+    /// With serial relation, test cases will be run in sequence using the same codec server.
+    Serial,
+    /// With concurrent relation, test cases will run concurrently using two codec servers.
+    #[allow(unused)]
+    Concurrent,
+}
+
+/// A test case describes a sequence of elementary stream chunks that should be fed into a codec
+/// server, and a set of validators to check the output. To pass, all validations must pass for all
+/// output from the stream.
+pub struct TestCase {
+    pub name: &'static str,
+    pub stream: Rc<dyn ElementaryStream>,
+    pub validators: Vec<Rc<dyn OutputValidator>>,
+    pub stream_options: Option<StreamOptions>,
+}
+
+impl TestSpec {
+    pub async fn run(self) -> Result<(), Error> {
+        match self.relation {
+            CaseRelation::Serial => await!(run_cases_serially(self.cases)),
+            CaseRelation::Concurrent => await!(run_cases_concurrently(self.cases)),
+        }
+    }
+}
+
+async fn run_cases_serially(cases: Vec<TestCase>) -> Result<(), Error> {
+    let mut stream_runner = StreamRunner::new();
+
+    for case in cases {
+        let output =
+            await!(stream_runner.run_stream(case.stream, case.stream_options.unwrap_or_default()))
+                .context(format!("Running case {}", case.name))?;
+        for validator in case.validators {
+            validator.validate(&output).context(format!("Validating case {}", case.name))?;
+        }
+    }
+
+    Ok(())
+}
+
+async fn run_cases_concurrently(cases: Vec<TestCase>) -> Result<(), Error> {
+    let mut unordered = FuturesUnordered::new();
+    for case in cases {
+        unordered.push(run_cases_serially(vec![case]))
+    }
+
+    while let Some(()) = await!(unordered.try_next())? {}
+
+    Ok(())
+}