// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fpromise/result.h>
#include <lib/fzl/vmo-mapper.h>
#include <lib/syslog/cpp/macros.h>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <fbl/ref_counted.h>
#include <fbl/ref_ptr.h>
#include <fbl/slab_allocator.h>
#include "src/lib/fxl/synchronization/thread_annotations.h"
#include "src/media/audio/lib/format/format.h"
namespace media::audio {
// This queue has two states:
// Initially, a packet is pushed onto a "pending" queue. The mixer will write to the first
// packet in the pending queue, and once the mixer is done, it will move that packet to the
// "ready" queue, meaning the packet is now ready to be sent to the client. The FIDL loop
// will pop packets from the ready queue as they become available.
// Packets are pushed onto the pending queue in one of two ways:
// 1. For preallocated queues, the pending queue is prepopulated automatically. After
// a packet is popped from the ready queue, it can be added back to the pending queue
// by Recycle().
// 2. For dynamically allocated queues, packets must be explicitly pushed onto the
// pending queue, and once the packet is popped from the ready queue, the caller
// takes permanent ownership.
// This class is thread safe.
class CapturePacketQueue {
using CaptureAtCallback = fuchsia::media::AudioCapturer::CaptureAtCallback;
using StreamPacket = fuchsia::media::StreamPacket;
class Packet;
using AllocatorTraits = ::fbl::SlabAllocatorTraits<fbl::RefPtr<Packet>>;
using Allocator = fbl::SlabAllocator<AllocatorTraits>;
// This class is thread safe.
class Packet : public fbl::SlabAllocated<AllocatorTraits>, public fbl::RefCounted<Packet> {
Packet(CaptureAtCallback cb, size_t nf, size_t pbo, char* pbs)
: callback_(std::move(cb)),
payload_buffer_start_(pbs) {}
const CaptureAtCallback& callback() const { return callback_; }
const StreamPacket& stream_packet() const {
return stream_packet_;
zx::duration time_since_ready() const {
return zx::clock::get_monotonic() - ready_time_;
friend class CapturePacketQueue;
void Reset() FXL_EXCLUSIVE_LOCKS_REQUIRED(&CapturePacketQueue::mutex_) {
state_ = State();;
const CaptureAtCallback callback_;
const size_t num_frames_;
const size_t payload_buffer_offset_;
char* const payload_buffer_start_;
// This state is updated during mixing.
struct State {
int64_t capture_timestamp = fuchsia::media::NO_TIMESTAMP;
uint32_t flags = 0;
size_t filled_frames = 0;
State state_ FXL_GUARDED_BY(&CapturePacketQueue::mutex_);
// These are set when the packet is moved from the pending queue to the ready queue.
StreamPacket stream_packet_;
zx::time ready_time_;
std::atomic<bool> ready_;
// Create a packet queue where all available packets are preallocated. To use payload_buffer
// as a ring buffer, ensure that packets are recycled in the same order they are popped.
// It is illegal to call Push on the returned packet queue.
static fpromise::result<std::shared_ptr<CapturePacketQueue>, std::string> CreatePreallocated(
const fzl::VmoMapper& payload_buffer, const Format& format, size_t frames_per_packet);
// Create a packet queue where all packets will be dynamically allocated by Push.
// It is illegal to call Recycle on packets returned from this queue.
static std::shared_ptr<CapturePacketQueue> CreateDynamicallyAllocated(
const fzl::VmoMapper& payload_buffer, const Format& format);
// Report whether the pending and ready queues are both empty.
bool empty() const;
// Report the number of pending and ready packets.
size_t PendingSize() const;
size_t ReadySize() const;
// Start mixing the packet at the front of the mix queue.
// Returns nullopt if the queue is empty.
// The returned PacketMixState object contains bookkeeping information about the mix.
// The caller should update this state as necessary and pass that final updated state
// to FinishMixerJob once this mix operation is ready. If the mix operation only partially
// fills the packet, then the next call to NextMixerJob will return the same state that
// was passed to FinishMixerJob (except with an updated mix_target).
// For example, a typical usage might look like:
// while (true) {
// auto mix_state = pq->NextMixerJob();
// if (mix_state.capture_timestamp == NO_TIMESTAMP) {
// mix_state.capture_timestamp = current_timestamp;
// }
// if (mix_state.frames > max_mix_frames) {
// mix_state.frames = max_mix_frames;
// }
// mix(, mix_state.frames);
// pq->FinishMixerJob(mix_state);
// }
struct PacketMixState {
fbl::RefPtr<Packet> packet;
int64_t capture_timestamp = fuchsia::media::NO_TIMESTAMP;
uint32_t flags = 0;
void* target = nullptr;
size_t frames = 0;
std::optional<PacketMixState> NextMixerJob();
// Complete the job started by the last call to NextMixerJob().
enum class PacketMixStatus {
// If the packet was fully mixed, it will be moved from the pending queue to
// the back of the ready queue.
// If the packet was only partially mixed, we expect another call to NextMixerJob.
// The packet will be left at the front of the pending queue.
// If the packet was discarded by a concurrent call to DiscardPendingPackets,
// the packet will be left alone.
PacketMixStatus FinishMixerJob(const PacketMixState& state);
// Atomically move all packets from the pending queue to the ready queue.
void DiscardPendingPackets();
// Pop a packet from the ready queue. Returns nullptr if the ready queue is empty.
fbl::RefPtr<Packet> PopReady();
// Push a packet onto the the end of the pending queue.
// The queue must have been created with CreateDynamicallyAllocated.
// Returns an error if the packet is malformed.
fpromise::result<void, std::string> PushPending(size_t offset_frames, size_t num_frames,
CaptureAtCallback callback);
// Recycle a packet back onto the queue. The packet must have been previously
// returned by Pop and the queue must have been created with CreatePreallocated.
// Returns an error if stream_packet was not in flight.
fpromise::result<void, std::string> Recycle(const StreamPacket& stream_packet);
// Stop accepting packets. All further calls to PushPending and Recycle will be
// ignored, and NextMixerJob will return nullopt.
void Shutdown();
// Block until the pending queue is non-empty or the queue has been shut down.
void WaitForPendingPacket();
enum class Mode { Preallocated, DynamicallyAllocated };
// This needs to be public for std::make_unique.
CapturePacketQueue(Mode mode, const fzl::VmoMapper& payload_buffer, const Format& format);
fbl::RefPtr<Packet> Alloc(size_t frame_offset, size_t num_frames, CaptureAtCallback callback);
void PopPendingLocked() FXL_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
const Mode mode_;
const fzl::VmoMapper& payload_buffer_;
const size_t payload_buffer_frames_;
const Format format_;
Allocator allocator_;
mutable std::mutex mutex_;
// Set by Shutdown.
bool shutdown_ FXL_GUARDED_BY(mutex_) = false;
// Pending and ready queues.
std::deque<fbl::RefPtr<Packet>> pending_ FXL_GUARDED_BY(mutex_);
std::deque<fbl::RefPtr<Packet>> ready_ FXL_GUARDED_BY(mutex_);
std::condition_variable pending_signal_ FXL_GUARDED_BY(mutex_);
// Mapping from payload_offset to packet, for packets that have been popped from ready_.
// These packets will be returned to pending_ by Recycle().
// For mode_ == Preallocated only.
std::unordered_map<uint64_t, fbl::RefPtr<Packet>> inflight_ FXL_GUARDED_BY(mutex_);
} // namespace media::audio