// blob: f462105d3eb94306f1f9748bef4aa34dfa4bf07c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/media/audio/audio_core/capture_packet_queue.h"
#include <lib/syslog/cpp/macros.h>
#include <lib/trace/event.h>
#include "src/lib/fxl/strings/string_printf.h"
#include "src/media/audio/lib/format/format.h"
using Packet = media::audio::CapturePacketQueue::Packet;

// Number of packets each capture queue should be able to allocate. At 10ms per packet,
// 1000 packets is ~10s of audio.
static constexpr size_t kTargetPackets = 1000;

// The maximum number of slabs per each capture queue: the smallest number of slabs whose
// combined size is at least kTargetPackets * sizeof(Packet) bytes.
//
// This is an exact integer ceiling division. Computing it with integers (rather than with
// floating-point ceil()) keeps the value free of rounding error and lets it be a compile-time
// constant, so it is valid no matter when during program start-up a queue is constructed.
static constexpr size_t kMaxSlabs =
    (kTargetPackets * sizeof(Packet) + fbl::DEFAULT_SLAB_ALLOCATOR_SLAB_SIZE - 1) /
    fbl::DEFAULT_SLAB_ALLOCATOR_SLAB_SIZE;

// The number of Packet-sized objects that fit in kMaxSlabs slabs' worth of bytes. This is the
// packet limit reported in "packet queue is too large" error messages.
static constexpr size_t kMaxPackets =
    (kMaxSlabs * fbl::DEFAULT_SLAB_ALLOCATOR_SLAB_SIZE) / sizeof(Packet);
namespace media::audio {
// Constructs a queue operating in the given mode on top of the given payload buffer.
//
// payload_buffer_frames_ is initialized to the payload buffer's size divided by the format's
// bytes-per-frame, i.e. the buffer's capacity expressed in frames. The packet allocator is
// constructed with kMaxSlabs, the per-queue slab limit defined at the top of this file.
//
// NOTE(review): the meaning of the allocator's second constructor argument (`true`) is defined
// by the allocator type declared in the header -- confirm there.
CapturePacketQueue::CapturePacketQueue(Mode mode, const fzl::VmoMapper& payload_buffer,
const Format& format)
: mode_(mode),
payload_buffer_(payload_buffer),
payload_buffer_frames_(payload_buffer.size() / format.bytes_per_frame()),
format_(format),
allocator_(kMaxSlabs, true) {}
// Creates a queue in Preallocated mode and fills its pending list with back-to-back packets
// of frames_per_packet frames each, covering as much of the payload buffer as whole packets
// allow.
//
// Returns an error string if frames_per_packet is zero, if fewer than two packets of that size
// fit in the payload buffer, or if the packet allocator runs out of space.
fit::result<std::shared_ptr<CapturePacketQueue>, std::string>
CapturePacketQueue::CreatePreallocated(const fzl::VmoMapper& payload_buffer, const Format& format,
                                       size_t frames_per_packet) {
  auto queue = std::make_shared<CapturePacketQueue>(Mode::Preallocated, payload_buffer, format);

  // The queue has not been handed to anyone yet, so this lock is not strictly required;
  // holding it keeps the lock analysis simple.
  std::lock_guard<std::mutex> guard(queue->mutex_);

  // Validate the requested packet size.
  //
  // The only lower bound enforced today is 1 frame, which is absurdly low.
  // TODO(fxbug.dev/13344): Decide on a proper minimum packet size, document it, and enforce the
  // limit here.
  if (frames_per_packet == 0) {
    return fit::error("frames per packet may not be zero");
  }

  // At least two packets must fit in the payload buffer.
  const auto largest_allowed_packet = queue->payload_buffer_frames_ / 2;
  if (frames_per_packet > largest_allowed_packet) {
    return fit::error(fxl::StringPrintf(
        "there must be enough room in the shared payload buffer (%lu frames) to fit at least two "
        "packets of the requested number of frames per packet (%lu frames).",
        queue->payload_buffer_frames_, frames_per_packet));
  }

  // Carve the payload buffer into consecutive packets; any tail too short to hold a whole
  // packet is left unused.
  size_t first_frame = 0;
  while (first_frame + frames_per_packet <= queue->payload_buffer_frames_) {
    auto packet = queue->Alloc(first_frame, frames_per_packet, nullptr);
    if (!packet) {
      return fit::error(fxl::StringPrintf(
          "packet queue is too large; exceeded limit after %lu packets", kMaxPackets));
    }
    queue->pending_.push_back(packet);
    first_frame += frames_per_packet;
  }

  return fit::ok(std::move(queue));
}
// Creates a queue in DynamicallyAllocated mode. No packets are allocated up front; in this
// mode packets are added one at a time through PushPending.
std::shared_ptr<CapturePacketQueue> CapturePacketQueue::CreateDynamicallyAllocated(
    const fzl::VmoMapper& payload_buffer, const Format& format) {
  auto queue =
      std::make_shared<CapturePacketQueue>(Mode::DynamicallyAllocated, payload_buffer, format);
  return queue;
}
// Allocates a packet describing num_frames frames that begin offset_frames frames into the
// payload buffer. The packet is given both the byte offset of that position and a pointer to
// it within the mapped buffer. Callers in this file treat a null result as "allocator limit
// exceeded".
fbl::RefPtr<Packet> CapturePacketQueue::Alloc(size_t offset_frames, size_t num_frames,
                                              CaptureAtCallback callback) {
  // Convert the frame offset into a byte offset.
  size_t byte_offset = offset_frames * format_.bytes_per_frame();

  // Locate the packet's first byte inside the mapped payload buffer.
  char* packet_data = static_cast<char*>(payload_buffer_.start());
  packet_data += byte_offset;

  return allocator_.New(std::move(callback), num_frames, byte_offset, packet_data);
}
// Returns true when there are no packets in either the pending or the ready list.
bool CapturePacketQueue::empty() const {
  std::lock_guard<std::mutex> guard(mutex_);
  if (!pending_.empty()) {
    return false;
  }
  return ready_.empty();
}
// Returns the number of packets currently in the pending list.
size_t CapturePacketQueue::PendingSize() const {
  std::lock_guard<std::mutex> guard(mutex_);
  const size_t pending_count = pending_.size();
  return pending_count;
}
// Returns the number of packets currently in the ready list.
size_t CapturePacketQueue::ReadySize() const {
  std::lock_guard<std::mutex> guard(mutex_);
  const size_t ready_count = ready_.size();
  return ready_count;
}
// Describes the next chunk of mixing work: the unfilled remainder of the packet at the head of
// the pending list. The packet stays in the pending list.
//
// Returns std::nullopt if the queue has been shut down or has no pending packets.
std::optional<CapturePacketQueue::PacketMixState> CapturePacketQueue::NextMixerJob() {
  TRACE_INSTANT("audio", "CapturePacketQueue::NextMixerJob", TRACE_SCOPE_THREAD);
  std::lock_guard<std::mutex> guard(mutex_);

  const bool has_work = !shutdown_ && !pending_.empty();
  if (!has_work) {
    return std::nullopt;
  }

  auto head = pending_.front();
  // Frames written into this packet by earlier jobs; the new job starts right after them.
  const auto already_filled = head->state_.filled_frames;
  return PacketMixState{
      .packet = head,
      .capture_timestamp = head->state_.capture_timestamp,
      .flags = head->state_.flags,
      .target = head->payload_buffer_start_ + already_filled * format_.bytes_per_frame(),
      .frames = head->num_frames_ - already_filled,
  };
}
// Records the outcome of a mixer job previously handed out by NextMixerJob.
//
// Returns Discarded if the job's packet is no longer at the head of the pending list, Partial
// if the packet still has unfilled frames after this job, and Done if the packet is now full
// (in which case it has been moved to the ready list).
CapturePacketQueue::PacketMixStatus CapturePacketQueue::FinishMixerJob(
    const PacketMixState& state) {
  TRACE_INSTANT("audio", "CapturePacketQueue::FinishMixerJob", TRACE_SCOPE_THREAD);
  std::lock_guard<std::mutex> guard(mutex_);

  // The job is stale unless its packet is still the head of the pending list.
  if (pending_.empty()) {
    return PacketMixStatus::Discarded;
  }
  if (pending_.front() != state.packet) {
    return PacketMixStatus::Discarded;
  }

  // Fold the job's results into the packet's running state.
  const auto& packet = state.packet;
  packet->state_.capture_timestamp = state.capture_timestamp;
  packet->state_.flags = state.flags;
  packet->state_.filled_frames += state.frames;

  const bool packet_full = packet->state_.filled_frames >= packet->num_frames_;
  if (!packet_full) {
    return PacketMixStatus::Partial;
  }

  PopPendingLocked();
  return PacketMixStatus::Done;
}
// Drains the pending list by running PopPendingLocked on every pending packet, which moves
// each one (in whatever fill state it is in) to the ready list.
void CapturePacketQueue::DiscardPendingPackets() {
  TRACE_INSTANT("audio", "CapturePacketQueue::DiscardPendingPackets", TRACE_SCOPE_THREAD);
  std::lock_guard<std::mutex> guard(mutex_);
  for (;;) {
    if (pending_.empty()) {
      break;
    }
    PopPendingLocked();
  }
}
// Moves the packet at the head of the pending list to the back of the ready list, finalizing
// its StreamPacket on the way.
//
// The caller must hold the lock, and the pending list must not be empty (its front element is
// read unconditionally).
void CapturePacketQueue::PopPendingLocked() {
  // Take our own reference, then detach the packet from the pending list.
  auto packet = pending_.front();
  pending_.pop_front();

  // The packet will not be filled any further, so its final StreamPacket can be built now.
  auto& stream_packet = packet->stream_packet_;
  stream_packet.pts = packet->state_.capture_timestamp;
  stream_packet.flags = packet->state_.flags;
  stream_packet.payload_buffer_id = 0u;
  stream_packet.payload_offset = packet->payload_buffer_offset_;
  stream_packet.payload_size = packet->state_.filled_frames * format_.bytes_per_frame();

  // Publish the packet on the ready list and stamp the time it became ready.
  ready_.push_back(packet);
  packet->ready_time_ = zx::clock::get_monotonic();
  packet->ready_.store(true);
}
// Removes and returns the packet at the head of the ready list, or returns nullptr if the
// ready list is empty.
fbl::RefPtr<CapturePacketQueue::Packet> CapturePacketQueue::PopReady() {
  TRACE_INSTANT("audio", "CapturePacketQueue::PopReady", TRACE_SCOPE_THREAD);
  std::lock_guard<std::mutex> guard(mutex_);

  if (ready_.empty()) {
    return nullptr;
  }

  auto packet = ready_.front();
  ready_.pop_front();

  // Preallocated packets are recycled later, so keep a reference to each one that is handed
  // out, keyed by its payload offset.
  const bool keep_for_recycling = (mode_ == Mode::Preallocated);
  if (keep_for_recycling) {
    inflight_[packet->stream_packet_.payload_offset] = packet;
  }
  return packet;
}
// Allocates a packet covering num_frames frames starting offset_frames frames into the shared
// payload buffer and appends it to the pending list, waking any thread blocked in
// WaitForPendingPacket. Only valid in DynamicallyAllocated mode (enforced with FX_CHECK).
//
// Returns an error string if the requested range is empty or does not lie entirely within the
// payload buffer, or if the packet allocator has reached its limit. If the queue has already
// been shut down, the packet is dropped and ok is returned.
fit::result<void, std::string> CapturePacketQueue::PushPending(size_t offset_frames,
                                                               size_t num_frames,
                                                               CaptureAtCallback callback) {
  TRACE_INSTANT("audio", "CapturePacketQueue::PushPending", TRACE_SCOPE_THREAD);
  FX_CHECK(mode_ == Mode::DynamicallyAllocated);

  // Buffers submitted by clients must exist entirely within the shared payload buffer, and must
  // have at least some payloads in them.
  //
  // The containment test deliberately avoids computing `offset_frames + num_frames`: where
  // size_t is 64 bits wide that sum can wrap around, which would let a range that starts far
  // outside the buffer pass the check. Comparing each operand against the remaining capacity
  // cannot overflow, because the subtraction is evaluated only once num_frames is known not to
  // exceed payload_buffer_frames_.
  const bool range_fits = num_frames <= payload_buffer_frames_ &&
                          offset_frames <= payload_buffer_frames_ - num_frames;
  if (!num_frames || !range_fits) {
    return fit::error(
        fxl::StringPrintf("cannot push buffer range { offset = %lu, num_frames = %lu } into shared "
                          "buffer with %lu frames",
                          offset_frames, num_frames, payload_buffer_frames_));
  }

  auto p = Alloc(offset_frames, num_frames, std::move(callback));
  if (!p) {
    return fit::error(fxl::StringPrintf(
        "packet queue is too large; exceeded limit after %lu packets", kMaxPackets));
  }

  std::lock_guard<std::mutex> lock(mutex_);
  // After shutdown nothing consumes the pending list, so the new packet is simply dropped.
  if (!shutdown_) {
    pending_.push_back(p);
    pending_signal_.notify_all();
  }
  return fit::ok();
}
// Returns a packet previously handed out by PopReady to the pending list so it can be filled
// again. Only valid in Preallocated mode (enforced with FX_CHECK).
//
// The packet is looked up by stream_packet.payload_offset; its buffer id, offset and size must
// all match the packet that was handed out. Returns an error string if no in-flight packet has
// that offset or if the fields do not match. After shutdown this is a no-op that returns ok.
fit::result<void, std::string> CapturePacketQueue::Recycle(const StreamPacket& stream_packet) {
  TRACE_INSTANT("audio", "CapturePacketQueue::Recycle", TRACE_SCOPE_THREAD);
  FX_CHECK(mode_ == Mode::Preallocated);
  std::lock_guard<std::mutex> guard(mutex_);

  if (shutdown_) {
    return fit::ok();
  }

  auto entry = inflight_.find(stream_packet.payload_offset);
  if (entry == inflight_.end()) {
    return fit::error(
        fxl::StringPrintf("could not release unknown packet with payload_offset = %lu",
                          stream_packet.payload_offset));
  }

  // The caller must hand back exactly the payload description it was given.
  auto packet = entry->second;
  const auto& expected = packet->stream_packet_;
  const bool same_payload = expected.payload_buffer_id == stream_packet.payload_buffer_id &&
                            expected.payload_offset == stream_packet.payload_offset &&
                            expected.payload_size == stream_packet.payload_size;
  if (!same_payload) {
    return fit::error(fxl::StringPrintf(
        "could not release packet with payload { buffer_id = %u, offset = %lu, size = %lu }, "
        "expected packet with payload { buffer_id = %u, offset = %lu, size = %lu }",
        stream_packet.payload_buffer_id, stream_packet.payload_offset, stream_packet.payload_size,
        expected.payload_buffer_id, expected.payload_offset, expected.payload_size));
  }

  // Reset the packet and move it from the in-flight map back to the pending list, waking any
  // thread waiting for a pending packet.
  packet->Reset();
  pending_.push_back(packet);
  pending_signal_.notify_all();
  inflight_.erase(entry);
  return fit::ok();
}
// Marks the queue as shut down and wakes every thread blocked in WaitForPendingPacket so it
// can observe the flag and return.
void CapturePacketQueue::Shutdown() {
  std::lock_guard<std::mutex> guard(mutex_);
  shutdown_ = true;
  pending_signal_.notify_all();
}
namespace {

// WaitForPendingPacket uses std::unique_lock as required by std::condition_variable::wait.
// Unfortunately, std::unique_lock supports optional locking, which is not supported by clang's
// thread annotations. We use std::unique_lock in a purely scoped way, which is supported,
// so we wrap it with a simple class annotated as a scoped lock.
class FXL_SCOPED_LOCKABLE scoped_unique_lock : public std::unique_lock<std::mutex> {
 public:
  // Acquires |m| for the lifetime of this object.
  explicit scoped_unique_lock(std::mutex& m) FXL_ACQUIRE(m) : std::unique_lock<std::mutex>(m) {}

  // The body is intentionally empty. The std::unique_lock base-class destructor runs
  // automatically after this body and releases the mutex; invoking it explicitly here would
  // run it a second time and unlock the mutex twice, which is undefined behavior. This
  // destructor exists only to carry the FXL_RELEASE annotation.
  ~scoped_unique_lock() FXL_RELEASE() {}
};

}  // namespace
void CapturePacketQueue::WaitForPendingPacket() {
TRACE_DURATION("audio", "CapturePacketQueue::WaitForPendingPacket");
scoped_unique_lock lock(mutex_);
while (!shutdown_ && pending_.empty()) {
pending_signal_.wait(lock);
}
}
} // namespace media::audio