blob: c436f5f78fc9566cc6752f8bcd1542202895b456 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::Error;
use bitfield::bitfield;
use bt_avdtp as avdtp;
use fidl_fuchsia_media::{
AudioFormat, AudioUncompressedFormat, DomainFormat, EncoderSettings, PcmFormat, SbcAllocation,
SbcBlockCount, SbcChannelMode, SbcEncoderSettings, SbcSubBands,
};
use fuchsia_audio_codec::{StreamProcessor, StreamProcessorOutputStream};
use futures::{
io::AsyncWrite,
stream::BoxStream,
task::{Context, Poll},
Stream, StreamExt,
};
use std::{collections::VecDeque, iter::once, pin::Pin};
use fuchsia_syslog::fx_log_info;
pub struct RtpPacketBuilderSbc {
/// The number of frames to be included in each packet
frames_per_packet: u8,
/// The next packet's sequence number
next_sequence_number: u16,
/// Timestamp of the end of the packets sent so far. This is currently in audio sample units.
timestamp: u32,
/// Frames that will be in the next RtpPacket to be sent.
frames: VecDeque<Vec<u8>>,
/// Time that those frames represent, in the same units as `timestamp`
frame_time: u32,
}
bitfield! {
struct RtpHeader(MSB0 [u8]);
u8, version, set_version: 1, 0;
bool, padding, set_padding: 2;
bool, extension, set_extension: 3;
u8, csrc_count, set_csrc_count: 7, 4;
bool, marker, set_marker: 8;
u8, payload_type, set_payload_type: 15, 9;
u16, sequence_number, set_sequence_number: 31, 16;
u32, timestamp, set_timestamp: 63, 32;
u32, ssrc, set_ssrc: 95, 64;
}
impl RtpPacketBuilderSbc {
/// Make a new builder that will vend a packet every `frames_per_packet` frames.
pub fn new(frames_per_packet: u8) -> Self {
Self {
frames_per_packet,
next_sequence_number: 1,
timestamp: 0,
frames: VecDeque::with_capacity(frames_per_packet.into()),
frame_time: 0,
}
}
/// Add a frame that is `time` audio samples long into the builder.
/// Returns a serialized RTP packet if this frame fills the packet,
/// or an Error.
pub fn push_frame(&mut self, frame: Vec<u8>, time: u32) -> Result<Option<Vec<u8>>, Error> {
self.frames.push_back(frame);
self.frame_time = self.frame_time + time;
if self.frames.len() >= self.frames_per_packet.into() {
let mut header = RtpHeader([0; 12]);
header.set_version(2);
header.set_payload_type(96); // Dynamic payload type indicated by RFC 3551, recommended by the spec
header.set_sequence_number(self.next_sequence_number);
header.set_timestamp(self.timestamp);
let header_iter = header.0.iter().cloned();
let frame_bytes_iter = self.frames.drain(..).flatten();
let packet =
header_iter.chain(once(self.frames_per_packet)).chain(frame_bytes_iter).collect();
self.next_sequence_number = self.next_sequence_number.wrapping_add(1);
self.timestamp = self.timestamp + self.frame_time;
self.frame_time = 0;
return Ok(Some(packet));
}
Ok(None)
}
}
pub struct EncodedStreamSbc {
/// The input media stream
source: BoxStream<'static, fuchsia_audio_device_output::Result<Vec<u8>>>,
/// The underlying encoder object
encoder: StreamProcessor,
/// The underlying encoder stream
encoded_stream: StreamProcessorOutputStream,
/// Bytes that have been sent to the encoder and not flushed.
unflushed_bytecount: usize,
/// Bytes that are buffered to send to the encoder
encoder_input_buffers: VecDeque<Vec<u8>>,
/// Cursor on the first buffer waiting indicating the next byte to be written to the encoder
encoder_input_cursor: usize,
}
impl EncodedStreamSbc {
pub fn build(
input_format: PcmFormat,
_sbc_settings: &avdtp::ServiceCapability,
source: BoxStream<'static, fuchsia_audio_device_output::Result<Vec<u8>>>,
) -> Result<Self, Error> {
// TODO: take these from the sbc_settings
let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
sub_bands: SbcSubBands::SubBands8,
allocation: SbcAllocation::AllocLoudness,
block_count: SbcBlockCount::BlockCount16,
channel_mode: SbcChannelMode::JointStereo,
bit_pool: 53,
});
let pcm_input_format = DomainFormat::Audio(AudioFormat::Uncompressed(
AudioUncompressedFormat::Pcm(input_format),
));
let mut encoder = StreamProcessor::create_encoder(pcm_input_format, sbc_encoder_settings)?;
let encoded_stream = encoder.take_output_stream()?;
Ok(Self {
source,
encoder,
encoded_stream,
unflushed_bytecount: 0,
encoder_input_buffers: VecDeque::new(),
encoder_input_cursor: 0,
})
}
}
// TODO(40986, 41449): Update this based on the input format and the codec settings.
pub const PCM_FRAMES_PER_ENCODED: usize = 640;
const ENCODED_PER_PACKET: usize = 5;
const BYTES_PER_PCM_FRAME: usize = 4;
const PCM_BYTES_PER_ENCODED_PACKET: usize =
PCM_FRAMES_PER_ENCODED * ENCODED_PER_PACKET * BYTES_PER_PCM_FRAME;
impl Stream for EncodedStreamSbc {
type Item = Result<Vec<u8>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Read audio out.
while let Poll::Ready(item) = self.source.poll_next_unpin(cx) {
match item {
None => {
fx_log_info!("Audio stream closed.");
return Poll::Ready(None);
}
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Some(Ok(bytes)) => self.encoder_input_buffers.push_back(bytes),
}
}
// Push audio into the encoder.
while let Some(vec) = self.encoder_input_buffers.pop_front() {
let cursor = self.encoder_input_cursor;
match Pin::new(&mut self.encoder).poll_write(cx, &vec[cursor..]) {
Poll::Pending => break,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(written)) => {
self.encoder_input_cursor = cursor + written;
self.unflushed_bytecount = self.unflushed_bytecount + written;
// flush() if we have sent enough bytes to generate a frame
if self.unflushed_bytecount > PCM_BYTES_PER_ENCODED_PACKET {
// Attempt to flush.
if self.encoder.flush().is_ok() {
self.unflushed_bytecount = 0;
}
}
if self.encoder_input_cursor != vec.len() {
self.encoder_input_buffers.push_front(vec);
continue;
} else {
// Reset to the front of the next buffer.
self.encoder_input_cursor = 0
}
}
}
}
// Finally, read data out of the encoder if it's ready.
self.encoded_stream.poll_next_unpin(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode};
use fuchsia_async::{self as fasync, TimeoutExt};
use fuchsia_zircon::{self as zx, DurationNum};
use futures::FutureExt;
#[test]
fn test_packet_builder_sbc() {
let mut builder = RtpPacketBuilderSbc::new(5);
assert!(builder.push_frame(vec![0xf0], 1).unwrap().is_none());
assert!(builder.push_frame(vec![0x9f], 2).unwrap().is_none());
assert!(builder.push_frame(vec![0x92], 4).unwrap().is_none());
assert!(builder.push_frame(vec![0x96], 8).unwrap().is_none());
let result = builder.push_frame(vec![0x33], 16);
assert!(result.is_ok());
let expected = &[
0x80, 0x60, 0x00, 0x01, // Sequence num
0x00, 0x00, 0x00, 0x00, // timestamp = 0
0x00, 0x00, 0x00, 0x00, // SSRC = 0
0x05, // Frames in this packet
0xf0, 0x9f, 0x92, 0x96, 0x33, // 💖!
];
let result = result.unwrap().expect("a packet after 5 more frames");
assert_eq!(expected.len(), result.len());
assert_eq!(expected, &result[0..expected.len()]);
assert!(builder.push_frame(vec![0xf0], 32).unwrap().is_none());
assert!(builder.push_frame(vec![0x9f], 64).unwrap().is_none());
assert!(builder.push_frame(vec![0x92], 128).unwrap().is_none());
assert!(builder.push_frame(vec![0x96], 256).unwrap().is_none());
let result = builder.push_frame(vec![0x33], 512);
assert!(result.is_ok());
let expected = &[
0x80, 0x60, 0x00, 0x02, // Sequence num
0x00, 0x00, 0x00, 0x1F, // timestamp = 2^0 + 2^1 + 2^2 + 2^3 + 2^4)
0x00, 0x00, 0x00, 0x00, // SSRC = 0
0x05, // Frames in this packet
0xf0, 0x9f, 0x92, 0x96, 0x33, // 💖!
];
let result = result.unwrap().expect("a packet after 5 more frames");
assert_eq!(expected.len(), result.len());
assert_eq!(expected, &result[0..expected.len()]);
}
struct SilenceStream {
pcm_format: PcmFormat,
next_frame_timer: fasync::Timer,
/// the last time we delivered frames.
last_frame_time: Option<zx::Time>,
}
const PCM_SAMPLE_SIZE: usize = 2;
impl futures::Stream for SilenceStream {
type Item = fuchsia_audio_device_output::Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let now = zx::Time::get(zx::ClockId::Monotonic);
if self.last_frame_time.is_none() {
self.last_frame_time = Some(now - 1.second());
}
let last_time = self.last_frame_time.as_ref().unwrap().clone();
let repeats = (now - last_time).into_seconds();
if repeats == 0 {
self.next_frame_timer = fasync::Timer::new((last_time + 1.second()).into());
let poll = self.next_frame_timer.poll_unpin(cx);
assert_eq!(Poll::Pending, poll);
return Poll::Pending;
}
// Generate one second of silence.
let pcm_frame_size = self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE;
let buffer = vec![0; self.pcm_format.frames_per_second as usize * pcm_frame_size];
self.last_frame_time = Some(last_time + 1.second());
Poll::Ready(Some(Ok(buffer)))
}
}
impl SilenceStream {
fn build(pcm_format: PcmFormat) -> Self {
Self {
pcm_format,
next_frame_timer: fasync::Timer::new(fasync::Time::INFINITE_PAST),
last_frame_time: None,
}
}
}
const TIMEOUT: zx::Duration = zx::Duration::from_millis(5000);
#[test]
fn test_encodes_correctly() {
let mut exec = fasync::Executor::new().expect("failed to create an executor");
let pcm_format = PcmFormat {
pcm_mode: AudioPcmMode::Linear,
bits_per_sample: 16,
frames_per_second: 48000,
channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
};
let sbc_settings = avdtp::ServiceCapability::MediaCodec {
media_type: avdtp::MediaType::Audio,
codec_type: avdtp::MediaCodecType::AUDIO_SBC,
codec_extra: vec![0x11, 0x15, 2, 53],
};
// Mayve just replace this with future::repeat(0)
let silence_source = SilenceStream::build(pcm_format.clone());
let mut encoder =
EncodedStreamSbc::build(pcm_format, &sbc_settings, silence_source.boxed())
.expect("building Stream works");
let mut next_frame_fut = encoder
.next()
.on_timeout(fasync::Time::after(TIMEOUT), || panic!("Encoder took too long"));
match exec.run_singlethreaded(&mut next_frame_fut) {
Some(Ok(enc_frame)) => {
// maybe match the actual SBC output here, but since the encoder itself is tested,
// just confirming output can be created here is probably sufficient
assert!(!enc_frame.is_empty());
}
Some(Err(e)) => panic!("Uexpected error encoding: {}", e),
None => panic!("Encoder finished without a frame"),
}
}
}