blob: 4b5fc7845be36aac74d45cbe4c67cd2b7e3bc34c [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/media/audio/audio_core/base_capturer.h"
#include <lib/fit/bridge.h>
#include <lib/fit/defer.h>
#include <lib/media/audio/cpp/types.h>
#include <lib/zx/clock.h>
#include <memory>
#include "src/media/audio/audio_core/audio_admin.h"
#include "src/media/audio/audio_core/audio_core_impl.h"
#include "src/media/audio/audio_core/audio_driver.h"
#include "src/media/audio/audio_core/reporter.h"
#include "src/media/audio/lib/clock/utils.h"
#include "src/media/audio/lib/logging/logging.h"
namespace media::audio {
namespace {

// Logging policy for client-side overflows. (A "client-side underflow" or "client-side overflow"
// is when part of a data section is discarded because its start timestamp had already passed.)
//
// The first overflow of each Capturer is always logged. Later ones are throttled according to
// audio_core's logging level:
//   - TRACE or SPEW: every overflow is logged (VLOG TRACE, log_level -1), throttled by
//     kCaptureOverflowTraceInterval.
//   - INFO: logged less often (INFO, log_level 1), throttled by kCaptureOverflowInfoInterval.
//   - WARNING or higher: throttled further, by kCaptureOverflowErrorInterval.
// Setting kLogCaptureOverflow to false disables all logging of client-side overflows.
//
// Note: by default NDEBUG builds are set to WARNING and DEBUG builds to INFO.
constexpr bool kLogCaptureOverflow = true;
constexpr uint16_t kCaptureOverflowTraceInterval = 1;
constexpr uint16_t kCaptureOverflowInfoInterval = 10;
constexpr uint16_t kCaptureOverflowErrorInterval = 100;

// The time spent mixing currently has to be considered when reasoning about the capture fence
// duration. Before any optimization work, a particularly heavy mix pass may exceed 1.5 msec on a
// DEBUG build(!) on relevant hardware. This constant covers that, plus padding for safety.
const zx::duration kFenceTimePadding = zx::msec(3);

// NOTE(review): presumably the upper bound on the duration of one capture job; the use site is
// not visible in this part of the file.
constexpr int64_t kMaxTimePerCapture = ZX_MSEC(50);

// Placeholder format (16-bit signed, mono, 8 kHz) used until a real format is supplied.
const Format kInitialFormat =
    Format::Create({.sample_format = fuchsia::media::AudioSampleFormat::SIGNED_16,
                    .channels = 1,
                    .frames_per_second = 8000})
        .take_value();

}  // namespace
// Constructs a capturer that serves |audio_capturer_request|.
//
// |format|  Optional initial format. When absent, the capturer keeps kInitialFormat.
// |audio_capturer_request|  The FIDL channel this object is bound to.
// |context|  Must be non-null; it is dereferenced and held by reference.
//
// The capturer starts in State::WaitingForVmo.
BaseCapturer::BaseCapturer(
std::optional<Format> format,
fidl::InterfaceRequest<fuchsia::media::AudioCapturer> audio_capturer_request, Context* context)
: AudioObject(Type::AudioCapturer),
binding_(this, std::move(audio_capturer_request)),
context_(*context),
mix_domain_(context_.threading_model().AcquireMixDomain()),
state_(State::WaitingForVmo),
min_fence_time_(zx::nsec(0)),
// Ideally, initialize this to the native configuration of our initially-bound source.
format_(kInitialFormat),
overflow_count_(0u),
partial_overflow_count_(0u) {
FX_DCHECK(mix_domain_);
REPORT(AddingCapturer(*this));
// Any error on the FIDL binding starts the shutdown sequence.
binding_.set_error_handler([this](zx_status_t status) { BeginShutdown(); });
source_links_.reserve(16u);
// Replace the placeholder format if the caller supplied one.
if (format) {
UpdateFormat(*format);
}
// For now, optimal clock is set as a clone of MONOTONIC. Ultimately this will be the clock of
// the device where the capturer is initially routed.
CreateOptimalReferenceClock();
EstablishDefaultReferenceClock();
// Arrange for FinishBuffersThunk to run on the FIDL domain's dispatcher whenever
// finish_buffers_wakeup_ is signalled.
zx_status_t status =
finish_buffers_wakeup_.Activate(context_.threading_model().FidlDomain().dispatcher(),
[this](WakeupEvent* event) -> zx_status_t {
FinishBuffersThunk();
return ZX_OK;
});
FX_DCHECK(status == ZX_OK) << "Failed to activate FinishBuffers wakeup signal";
}
// Emits a trace event and reports this capturer's removal.
BaseCapturer::~BaseCapturer() {
  TRACE_DURATION("audio.debug", "BaseCapturer::~BaseCapturer");

  REPORT(RemovingCapturer(*this));
}
// A new link may change the set of sources, so the minimum fence time is recomputed.
void BaseCapturer::OnLinkAdded() {
  RecomputeMinFenceTime();
}
// Atomically installs |new_state| and notifies OnStateChanged with the state it replaced.
void BaseCapturer::UpdateState(State new_state) {
  const State previous_state = state_.exchange(new_state);
  OnStateChanged(previous_state, new_state);
}
// Stops the capturer's asynchronous machinery.
//
// Posts a task to the mix dispatcher that runs CleanupFromMixThread, and returns a promise that,
// once that task has completed, deactivates finish_buffers_wakeup_.
fit::promise<> BaseCapturer::Cleanup() {
  TRACE_DURATION("audio.debug", "BaseCapturer::Cleanup");

  // The async operations running on the mix dispatcher may only be touched from that thread, so
  // the teardown of those components is posted there. The bridge carries completion back.
  fit::bridge<> mix_cleanup_bridge;
  auto flow_id = TRACE_NONCE();
  TRACE_FLOW_BEGIN("audio.debug", "BaseCapturer.capture_cleanup", flow_id);

  auto mix_thread_cleanup = [this, completer = std::move(mix_cleanup_bridge.completer),
                             flow_id]() mutable {
    TRACE_DURATION("audio.debug", "BaseCapturer.cleanup_thunk");
    TRACE_FLOW_END("audio.debug", "BaseCapturer.capture_cleanup", flow_id);
    OBTAIN_EXECUTION_DOMAIN_TOKEN(token, mix_domain_);
    CleanupFromMixThread();
    completer.complete_ok();
  };
  async::PostTask(mix_domain_->dispatcher(), std::move(mix_thread_cleanup));

  // Once CleanupFromMixThread has run, nothing more happens on the mix dispatch thread; what
  // remains is to make sure the finish_buffers signal is de-asserted.
  return mix_cleanup_bridge.consumer.promise().then(
      [this](fit::result<>&) { finish_buffers_wakeup_.Deactivate(); });
}
// Mix-domain half of Cleanup: disarms the mix wakeup and timer, drops the mix domain, and moves
// the capturer to State::Shutdown.
void BaseCapturer::CleanupFromMixThread() {
  TRACE_DURATION("audio", "BaseCapturer::CleanupFromMixThread");

  // Stop every source of further mix work before releasing the domain.
  mix_wakeup_.Deactivate();
  mix_timer_.Cancel();
  mix_domain_ = nullptr;

  UpdateState(State::Shutdown);
}
// Schedules the shutdown sequence on the FIDL domain: run Cleanup, then report the stop and
// remove this capturer from the route graph.
void BaseCapturer::BeginShutdown() {
  auto shutdown_sequence = Cleanup().then([this](fit::result<>&) {
    ReportStop();
    context_.route_graph().RemoveCapturer(*this);
  });

  context_.threading_model().FidlDomain().ScheduleTask(std::move(shutdown_sequence));
}
// Reacts to a state transition. The routing profile is updated only when the transition crosses
// the routable/non-routable boundary.
void BaseCapturer::OnStateChanged(State old_state, State new_state) {
  const bool routable_before = StateIsRoutable(old_state);
  const bool routable_after = StateIsRoutable(new_state);

  if (routable_before == routable_after) {
    return;
  }
  SetRoutingProfile(routable_after);
}
// Attaches |stream| as an input of the mix stage, provided the capturer is operational.
//
// Returns the mixer created for the new input, or ZX_ERR_BAD_STATE when the capturer is either
// still waiting for its payload VMO or already shut down.
fit::result<std::shared_ptr<Mixer>, zx_status_t> BaseCapturer::InitializeSourceLink(
    const AudioObject& source, std::shared_ptr<Stream> stream) {
  TRACE_DURATION("audio", "BaseCapturer::InitializeSourceLink");

  const State current_state = state_.load();
  switch (current_state) {
    // Without a VMO the user has not yet committed to a format; links should not be established
    // before the capturer is ready.
    case State::WaitingForVmo:
    // Once shut down there is no reason for new links to arrive; reject them, since this object
    // is going away shortly.
    case State::Shutdown:
      return fit::error(ZX_ERR_BAD_STATE);

    // Every remaining state is operational: add the input to the mix stage.
    case State::OperatingSync:
    case State::OperatingAsync:
    case State::AsyncStopping:
    case State::AsyncStoppingCallbackPending:
      return fit::ok(mix_stage_->AddInput(std::move(stream)));
  }
}
void BaseCapturer::CleanupSourceLink(const AudioObject& source, std::shared_ptr<Stream> stream) {
mix_stage_->RemoveInput(*stream);
}
// Replies through |cbk| with an LPCM StreamType whose audio details come from the current format.
void BaseCapturer::GetStreamType(GetStreamTypeCallback cbk) {
  TRACE_DURATION("audio", "BaseCapturer::GetStreamType");

  fuchsia::media::StreamType stream_type;
  stream_type.encoding = fuchsia::media::AUDIO_ENCODING_LPCM;
  stream_type.medium_specific.set_audio(format_.stream_type());

  cbk(std::move(stream_type));
}
void BaseCapturer::AddPayloadBuffer(uint32_t id, zx::vmo payload_buf_vmo) {
TRACE_DURATION("audio", "BaseCapturer::AddPayloadBuffer");
if (id != 0) {
FX_LOGS(ERROR) << "Only buffer ID 0 is currently supported.";
BeginShutdown();
return;
}
FX_DCHECK(payload_buf_vmo.is_valid());
// If something goes wrong, hang up the phone and shutdown.
auto cleanup = fit::defer([this]() { BeginShutdown(); });
zx_status_t res;
State state = state_.load();
if (state != State::WaitingForVmo) {
FX_DCHECK(payload_buf_.start() != nullptr);
FX_DCHECK(payload_buf_.size() != 0);
FX_DCHECK(payload_buf_frames_ != 0);
FX_LOGS(ERROR) << "Bad state while assigning payload buffer "
<< "(state = " << static_cast<uint32_t>(state) << ")";
return;
}
FX_DCHECK(payload_buf_.start() == nullptr);
FX_DCHECK(payload_buf_.size() == 0);
FX_DCHECK(payload_buf_frames_ == 0);
size_t payload_buf_size;
res = payload_buf_vmo.get_size(&payload_buf_size);
if (res != ZX_OK) {
FX_PLOGS(ERROR, res) << "Failed to fetch payload buffer VMO size";
return;
}
constexpr uint64_t max_uint32 = std::numeric_limits<uint32_t>::max();
if ((payload_buf_size < format_.bytes_per_frame()) ||
(payload_buf_size > (max_uint32 * format_.bytes_per_frame()))) {
FX_LOGS(ERROR) << "Bad payload buffer VMO size (size = " << payload_buf_.size()
<< ", bytes per frame = " << format_.bytes_per_frame() << ")";
return;
}
REPORT(AddingCapturerPayloadBuffer(*this, id, payload_buf_size));
payload_buf_frames_ = static_cast<uint32_t>(payload_buf_size / format_.bytes_per_frame());
AUD_VLOG_OBJ(TRACE, this) << "payload buf -- size:" << payload_buf_size
<< ", frames:" << payload_buf_frames_
<< ", bytes/frame:" << format_.bytes_per_frame();
// Allocate our MixStage for mixing.
//
// TODO(39886): Limit this to something more reasonable than the entire user-provided VMO.
mix_stage_ = std::make_shared<MixStage>(format_, payload_buf_frames_,
clock_mono_to_fractional_dest_frames_);
// Map the VMO into our process.
res = payload_buf_.Map(payload_buf_vmo, /*offset=*/0, payload_buf_size,
/*map_flags=*/ZX_VM_PERM_READ | ZX_VM_PERM_WRITE);
if (res != ZX_OK) {
FX_PLOGS(ERROR, res) << "Failed to map payload buffer VMO";
return;
}
// Activate the dispatcher primitives we will use to drive the mixing process. Note we must call
// Activate on the WakeupEvent from the mix domain, but Signal can be called anytime, even before
// this Activate occurs.
async::PostTask(mix_domain_->dispatcher(), [this] {
OBTAIN_EXECUTION_DOMAIN_TOKEN(token, mix_domain_);
zx_status_t status =
mix_wakeup_.Activate(mix_domain_->dispatcher(), [this](WakeupEvent* event) -> zx_status_t {
OBTAIN_EXECUTION_DOMAIN_TOKEN(token, mix_domain_);
FX_DCHECK(event == &mix_wakeup_);
return Process();
});
if (status != ZX_OK) {
FX_PLOGS(ERROR, status) << "Failed activate mix WakeupEvent";
ShutdownFromMixDomain();
return;
}
});
// Next, select our output producer.
output_producer_ = OutputProducer::Select(format_.stream_type());
if (output_producer_ == nullptr) {
FX_LOGS(ERROR) << "Failed to select output producer";
return;
}
FX_DCHECK(context_.link_matrix().SourceLinkCount(*this) == 0u)
<< "No links should be established before a capturer has a payload buffer";
// Mark ourselves as routable now that we're fully configured. Although we might still fail to
// create links to audio sources, we have successfully configured this capturer's mode, so we are
// now in the OperatingSync state.
UpdateState(State::OperatingSync);
cleanup.cancel();
}
// Removing a payload buffer is unsupported: the request is logged as an error and the capturer
// shuts down. |id| is ignored.
void BaseCapturer::RemovePayloadBuffer(uint32_t id) {
  TRACE_DURATION("audio", "BaseCapturer::RemovePayloadBuffer");

  FX_LOGS(ERROR) << "RemovePayloadBuffer is not currently supported.";

  BeginShutdown();
}
void BaseCapturer::CaptureAt(uint32_t payload_buffer_id, uint32_t offset_frames,
uint32_t num_frames, CaptureAtCallback cbk) {
TRACE_DURATION("audio", "BaseCapturer::CaptureAt");
if (payload_buffer_id != 0) {
FX_LOGS(ERROR) << "payload_buffer_id must be 0 for now.";
return;
}
// If something goes wrong, hang up the phone and shutdown.
auto cleanup = fit::defer([this]() { BeginShutdown(); });
// It is illegal to call CaptureAt unless we are currently operating in
// synchronous mode.
State state = state_.load();
if (state != State::OperatingSync) {
FX_LOGS(ERROR) << "CaptureAt called while not operating in sync mode "
<< "(state = " << static_cast<uint32_t>(state) << ")";
return;
}
// Buffers submitted by clients must exist entirely within the shared payload buffer, and must
// have at least some payloads in them.
uint64_t buffer_end = static_cast<uint64_t>(offset_frames) + num_frames;
if (!num_frames || (buffer_end > payload_buf_frames_)) {
FX_LOGS(ERROR) << "Bad buffer range submitted. "
<< " offset " << offset_frames << " length " << num_frames
<< ". Shared buffer is " << payload_buf_frames_ << " frames long.";
return;
}
// Allocate bookkeeping to track this pending capture operation.
auto pending_capture_buffer =
PendingCaptureBuffer::Allocator::New(offset_frames, num_frames, std::move(cbk));
if (pending_capture_buffer == nullptr) {
FX_LOGS(ERROR) << "Failed to allocate pending capture buffer!";
return;
}
// Place the capture operation on the pending list.
bool wake_mixer;
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
wake_mixer = pending_capture_buffers_.is_empty();
pending_capture_buffers_.push_back(std::move(pending_capture_buffer));
}
// If the pending list was empty, we need to poke the mixer.
if (wake_mixer) {
mix_wakeup_.Signal();
}
ReportStart();
// Things went well. Cancel the cleanup timer and we are done.
cleanup.cancel();
}
// Not implemented: logs an error and otherwise ignores |packet|.
void BaseCapturer::ReleasePacket(fuchsia::media::StreamPacket packet) {
  TRACE_DURATION("audio", "BaseCapturer::ReleasePacket");

  // TODO(43507): Implement.
  FX_LOGS(ERROR) << "ReleasePacket not implemented yet.";
}
// Fire-and-forget variant of DiscardAllPackets: the same flush, with no completion callback.
void BaseCapturer::DiscardAllPacketsNoReply() {
  TRACE_DURATION("audio", "BaseCapturer::DiscardAllPacketsNoReply");

  DiscardAllPackets(nullptr);
}
// Flushes all finished and pending capture buffers back to the client.
//
// Only legal in State::OperatingSync; in any other state the capturer shuts down. If anything was
// flushed, an OnEndOfStream event follows the returned buffers. |cbk| (when non-null) is invoked
// at the end, provided the binding is still bound.
void BaseCapturer::DiscardAllPackets(DiscardAllPacketsCallback cbk) {
  TRACE_DURATION("audio", "BaseCapturer::DiscardAllPackets");

  // Flushing is only allowed while operating in synchronous mode.
  const State current_state = state_.load();
  if (current_state != State::OperatingSync) {
    FX_LOGS(ERROR) << "Flush called while not operating in sync mode "
                   << "(state = " << static_cast<uint32_t>(current_state) << ")";
    BeginShutdown();
    return;
  }

  // Under the lock, gather the finished list followed by the pending list into one local list.
  //
  // Note: the capture thread may be mixing frames for the buffer at the head of the pending queue
  // at the moment the queue is cleared. Those frames are not reported to the client as mixed, but
  // they will still be written into the shared payload buffer.
  PcbList flushed;
  {
    std::lock_guard<std::mutex> guard(pending_lock_);
    flushed = std::move(finished_capture_buffers_);
    flushed.splice(flushed.end(), pending_capture_buffers_);
  }

  // Hand the flushed buffers back to the client, then mark the end of the stream.
  if (!flushed.is_empty()) {
    FinishBuffers(flushed);
    binding_.events().OnEndOfStream();
  }
  ReportStop();

  const bool should_reply = (cbk != nullptr) && binding_.is_bound();
  if (should_reply) {
    cbk();
  }
}
void BaseCapturer::StartAsyncCapture(uint32_t frames_per_packet) {
TRACE_DURATION("audio", "BaseCapturer::StartAsyncCapture");
auto cleanup = fit::defer([this]() { BeginShutdown(); });
// To enter Async mode, we must be in Synchronous mode and not have pending buffers in flight.
State state = state_.load();
if (state != State::OperatingSync) {
FX_LOGS(ERROR) << "Bad state while attempting to enter async capture mode "
<< "(state = " << static_cast<uint32_t>(state) << ")";
return;
}
bool queues_empty;
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
queues_empty = pending_capture_buffers_.is_empty() && finished_capture_buffers_.is_empty();
}
if (!queues_empty) {
FX_LOGS(ERROR) << "Attempted to enter async capture mode with capture buffers still in flight.";
return;
}
// Sanity check the number of frames per packet the user is asking for.
//
// Currently our minimum frames-per-packet is 1, which is absurdly low.
// TODO(13344): Decide on a proper minimum packet size, document it, and enforce the limit here.
if (frames_per_packet == 0) {
FX_LOGS(ERROR) << "Frames per packet may not be zero.";
return;
}
FX_DCHECK(payload_buf_frames_ > 0);
if (frames_per_packet > (payload_buf_frames_ / 2)) {
FX_LOGS(ERROR)
<< "There must be enough room in the shared payload buffer (" << payload_buf_frames_
<< " frames) to fit at least two packets of the requested number of frames per packet ("
<< frames_per_packet << " frames).";
return;
}
// Everything looks good...
// 1) Record the number of frames per packet we want to produce
// 2) Transition to the OperatingAsync state
// 3) Kick the work thread to get the ball rolling.
async_frames_per_packet_ = frames_per_packet;
UpdateState(State::OperatingAsync);
ReportStart();
mix_wakeup_.Signal();
cleanup.cancel();
}
// Fire-and-forget variant of StopAsyncCapture: the same stop request, with no callback.
void BaseCapturer::StopAsyncCaptureNoReply() {
  TRACE_DURATION("audio", "BaseCapturer::StopAsyncCaptureNoReply");

  StopAsyncCapture(nullptr);
}
// Requests a transition out of asynchronous capture mode.
//
// In State::OperatingSync there is nothing to stop, so |cbk| (if any) is invoked immediately. In
// State::OperatingAsync the callback is stashed, the capturer moves to State::AsyncStopping, and
// the work thread is woken to carry out the stop. Any other state shuts the capturer down.
void BaseCapturer::StopAsyncCapture(StopAsyncCaptureCallback cbk) {
  TRACE_DURATION("audio", "BaseCapturer::StopAsyncCapture");

  const State current_state = state_.load();

  // Already synchronous: just acknowledge.
  if (current_state == State::OperatingSync) {
    if (cbk != nullptr) {
      cbk();
    }
    return;
  }

  // The only other state this may be called from is async mode.
  if (current_state != State::OperatingAsync) {
    FX_LOGS(ERROR) << "Bad state while attempting to stop async capture mode "
                   << "(state = " << static_cast<uint32_t>(current_state) << ")";
    BeginShutdown();
    return;
  }

  // Keep the callback for later, report the stop, switch to AsyncStopping, and poke the work
  // thread so it shuts async capture down.
  FX_DCHECK(pending_async_stop_cbk_ == nullptr);
  pending_async_stop_cbk_ = std::move(cbk);
  ReportStop();
  UpdateState(State::AsyncStopping);
  mix_wakeup_.Signal();
}
void BaseCapturer::RecomputeMinFenceTime() {
TRACE_DURATION("audio", "BaseCapturer::RecomputeMinFenceTime");
zx::duration cur_min_fence_time{0};
context_.link_matrix().ForEachSourceLink(
*this, [&cur_min_fence_time](LinkMatrix::LinkHandle link) {
if (link.object->is_input()) {
const auto& device = static_cast<const AudioDevice&>(*link.object);
auto fence_time = device.driver()->fifo_depth_duration();
cur_min_fence_time = std::max(cur_min_fence_time, fence_time);
}
});
if (min_fence_time_ != cur_min_fence_time) {
FX_VLOGS(TRACE) << "Changing min_fence_time_ (ns) from " << min_fence_time_.get() << " to "
<< cur_min_fence_time.get();
REPORT(SettingCapturerMinFenceTime(*this, cur_min_fence_time));
min_fence_time_ = cur_min_fence_time;
}
}
// TODO(48598): Remove these and revert back to FX_DCHECK.
#define FXB_48598_DCHECK(level) FX_CHECK(level)
// Main capture work loop, invoked via the mix wakeup event.
//
// Each pass of the loop:
//   1. reads the current state and decides whether to run (sync or async) or return;
//   2. locates the head pending capture buffer and the position in the payload buffer to fill;
//   3. if nothing is pending, resets the timeline and either returns (sync) or queues the first
//      async buffer and retries;
//   4. establishes the frames-to-monotonic-clock transform if it is not currently invertible;
//   5. if the job's last frame lies in the future, arms mix_timer_ and returns;
//   6. otherwise mixes into the payload buffer and updates the pending/finished bookkeeping.
//
// Returns ZX_OK when there is nothing more to do for now (including after arming the timer), and
// ZX_ERR_INTERNAL on failures that lead to shutdown.
zx_status_t BaseCapturer::Process() {
TRACE_DURATION("audio", "BaseCapturer::Process");
// Count of packets finished during this call; only used for the throttled warning below.
uint32_t packets_produced = 0;
while (true) {
// Start by figuring out what state we are currently in for this cycle.
bool async_mode = false;
switch (state_.load()) {
// If we are still waiting for a VMO, we should not be operating right now.
case State::WaitingForVmo:
FXB_48598_DCHECK(false);
ShutdownFromMixDomain();
return ZX_ERR_INTERNAL;
// If we are awakened while in the callback pending state, this is spurious wakeup: ignore it.
case State::AsyncStoppingCallbackPending:
return ZX_OK;
// If we were operating in async mode, but we have been asked to stop, do so now.
case State::AsyncStopping:
DoStopAsyncCapture();
return ZX_OK;
case State::OperatingSync:
async_mode = false;
break;
case State::OperatingAsync:
async_mode = true;
break;
case State::Shutdown:
// This should be impossible. If the main message loop thread shut us down, then it should
// have shut down our mix timer before setting the state_ variable to Shutdown.
FX_CHECK(false);
return ZX_ERR_INTERNAL;
}
// Look at the head of the queue, determine our payload buffer position, and get to work.
// mix_target stays null when the pending queue is empty; mix_frames and buffer_sequence_number
// are only assigned (and only read) when it is non-null.
void* mix_target = nullptr;
uint32_t mix_frames;
uint32_t buffer_sequence_number;
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
if (!pending_capture_buffers_.is_empty()) {
auto& p = pending_capture_buffers_.front();
// This should have been established by CaptureAt; it had better still be true.
FXB_48598_DCHECK((static_cast<uint64_t>(p.offset_frames) + p.num_frames) <=
payload_buf_frames_);
FXB_48598_DCHECK(p.filled_frames < p.num_frames);
// If we don't know our timeline transformation, then the next buffer we produce is
// guaranteed to be discontinuous relative to the previous one (if any).
if (!clock_mono_to_fractional_dest_frames_->get().first.invertible()) {
p.flags |= fuchsia::media::STREAM_PACKET_FLAG_DISCONTINUITY;
}
// If we are running, there is no way our shared buffer can get stolen out from under us.
FXB_48598_DCHECK(payload_buf_.start() != nullptr);
// Byte offset of the first unfilled frame of this buffer within the payload buffer.
uint64_t offset_bytes =
format_.bytes_per_frame() * static_cast<uint64_t>(p.offset_frames + p.filled_frames);
mix_target = reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(payload_buf_.start()) +
offset_bytes);
mix_frames = p.num_frames - p.filled_frames;
// Remember which buffer we are filling so a flush during the mix can be detected afterwards.
buffer_sequence_number = p.sequence_number;
} else {
// Nothing pending while in sync mode: report that capture has stopped.
if (state_.load() == State::OperatingSync) {
ReportStop();
}
}
}
// If there was nothing in our pending capture buffer queue, then one of two things is true:
//
// 1) We are operating in synchronous mode and our user is not supplying buffers fast enough.
// 2) We are starting up in asynchronous mode and have not queued our first buffer yet.
//
// Either way, invalidate the frames_to_clock_mono transformation and make sure we don't have a
// wakeup timer pending. Then, if we are in synchronous mode, simply get out. If we are in
// asynchronous mode, reset our async ring buffer state, add a new pending capture buffer to the
// queue, and restart the main Process loop.
if (mix_target == nullptr) {
clock_mono_to_fractional_dest_frames_->Update(TimelineFunction());
frame_count_ = 0;
mix_timer_.Cancel();
if (!async_mode) {
return ZX_OK;
}
// If we cannot queue a new pending buffer, it is a fatal error. Simply return instead of
// trying again, as we are now shutting down.
async_next_frame_offset_ = 0;
if (!QueueNextAsyncPendingBuffer()) {
// If this fails, QueueNextAsyncPendingBuffer should have already shut us down. Assert this.
FXB_48598_DCHECK(state_.load() == State::Shutdown);
return ZX_ERR_INTERNAL;
}
continue;
}
// Establish the transform from capture frames to clock monotonic, if we haven't already.
//
// Ideally, if there were only one capture source and our frame rates match, we would align our
// start time exactly with a source sample boundary.
auto now = zx::clock::get_monotonic();
if (!clock_mono_to_fractional_dest_frames_->get().first.invertible()) {
// Ideally a timeline function could alter offsets without also recalculating the scale
// factor. Then we could re-establish this function without re-reducing the fps-to-nsec rate.
// Since we supply a rate that is already reduced, this should go pretty quickly.
clock_mono_to_fractional_dest_frames_->Update(
TimelineFunction(FractionalFrames<int64_t>(frame_count_).raw_value(), now.get(),
fractional_dest_frames_to_clock_mono_rate().Inverse()));
}
// Limit our job size to our max job size.
if (mix_frames > max_frames_per_capture_) {
mix_frames = max_frames_per_capture_;
}
// Figure out when we can finish the job. If in the future, wait until then.
zx::time last_frame_time =
zx::time(clock_mono_to_fractional_dest_frames_->get().first.Inverse().Apply(
FractionalFrames<int64_t>(frame_count_ + mix_frames).raw_value()));
if (last_frame_time.get() == TimelineRate::kOverflow) {
FX_LOGS(ERROR) << "Fatal timeline overflow in capture mixer, shutting down capture.";
ShutdownFromMixDomain();
return ZX_ERR_INTERNAL;
}
if (last_frame_time > now) {
// TODO(40183): We should not assume anything about fence times for our sources. Instead, we
// should heed the actual reported fence times (FIFO depth), and the arrivals and departures
// of sources, and update this number dynamically.
//
// Additionally, we must be mindful that if a newly-arriving source causes our "fence time" to
// increase, we will wake up early. At wakeup time, we need to be able to detect this case and
// sleep a bit longer before mixing.
zx::time next_mix_time = last_frame_time + min_fence_time_ + kFenceTimePadding;
zx_status_t status = mix_timer_.PostForTime(mix_domain_->dispatcher(), next_mix_time);
if (status != ZX_OK) {
FX_PLOGS(ERROR, status) << "Failed to schedule capturer mix";
ShutdownFromMixDomain();
return ZX_ERR_INTERNAL;
}
return ZX_OK;
}
// Mix the requested number of frames from sources to intermediate buffer, then into output.
auto buf = mix_stage_->LockBuffer(now, frame_count_, mix_frames);
// NOTE(review): the three checks below run before the `if (!buf)` guard, and the last two
// dereference buf. While FXB_48598_DCHECK maps to FX_CHECK, a null buf is caught by the first
// check; confirm the ordering is still what is wanted once these revert to FX_DCHECK.
FXB_48598_DCHECK(buf);
FXB_48598_DCHECK(buf->start().Floor() == frame_count_);
FXB_48598_DCHECK(buf->length().Floor() == mix_frames);
if (!buf) {
ShutdownFromMixDomain();
return ZX_ERR_INTERNAL;
}
FXB_48598_DCHECK(output_producer_ != nullptr);
output_producer_->ProduceOutput(reinterpret_cast<float*>(buf->payload()), mix_target,
mix_frames);
// Update the pending buffer in progress. If finished, return it to the user. If flushed (no
// pending packet, or queue head was different from what we were working on), just move on.
bool buffer_finished = false;
bool wakeup_service_thread = false;
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
if (!pending_capture_buffers_.is_empty()) {
auto& p = pending_capture_buffers_.front();
if (buffer_sequence_number == p.sequence_number) {
// Update the filled status of the buffer.
p.filled_frames += mix_frames;
FXB_48598_DCHECK(p.filled_frames <= p.num_frames);
// Assign a timestamp if one has not already been assigned.
if (p.capture_timestamp == fuchsia::media::NO_TIMESTAMP) {
auto [clock_mono_to_fractional_dest_frames, _] =
clock_mono_to_fractional_dest_frames_->get();
FXB_48598_DCHECK(clock_mono_to_fractional_dest_frames.invertible());
p.capture_timestamp = clock_mono_to_fractional_dest_frames.Inverse().Apply(
FractionalFrames<int64_t>(frame_count_).raw_value());
}
// If we filled the entire buffer, put it in the queue to be returned to the user.
buffer_finished = p.filled_frames >= p.num_frames;
if (buffer_finished) {
// The service thread only needs a signal when the finished queue goes from empty to
// non-empty.
wakeup_service_thread = finished_capture_buffers_.is_empty();
if (wakeup_service_thread) {
finish_buffers_signal_time_ = now;
}
finished_capture_buffers_.push_back(pending_capture_buffers_.pop_front());
// Warn (at most 100 times) each time another 20 packets complete in a single call.
if (++packets_produced % 20 == 0) {
FX_LOGS_FIRST_N(WARNING, 100) << "Process producing a lot of packets "
<< packets_produced << " frame_count_ " << frame_count_;
}
}
} else {
// It looks like we were flushed while we were mixing. Invalidate our timeline function,
// we will re-establish it and flag a discontinuity next time we have work to do.
// NOTE(review): the function installed here is built from the same arguments as the one used
// to establish the transform earlier in this loop, rather than the default TimelineFunction()
// used for invalidation elsewhere in this function -- confirm this matches the intent above.
clock_mono_to_fractional_dest_frames_->Update(
TimelineFunction(FractionalFrames<int64_t>(frame_count_).raw_value(), now.get(),
fractional_dest_frames_to_clock_mono_rate().Inverse()));
}
}
}
// Update the total number of frames we have mixed so far.
frame_count_ += mix_frames;
// If we need to poke the service thread, do so.
if (wakeup_service_thread) {
finish_buffers_wakeup_.Signal();
}
// If in async mode, and we just finished a buffer, queue a new pending buffer (or die trying).
if (buffer_finished && async_mode && !QueueNextAsyncPendingBuffer()) {
// If this fails, QueueNextAsyncPendingBuffer should have already shut us down. Assert this.
FXB_48598_DCHECK(state_.load() == State::Shutdown);
return ZX_ERR_INTERNAL;
}
} // while (true)
}
#undef FXB_48598_DCHECK
// Records one capture overflow and, subject to throttling, logs it.
//
// |frac_source_start|      Start of the source region, in fractional frames.
// |frac_source_mix_point|  The mix point that the source region missed, in fractional frames.
// |overflow_duration|      How late the source region was.
//
// The overflow counter is always incremented. When kLogCaptureOverflow is set, the occurrence is
// logged at ERROR, INFO or TRACE depending on which throttle interval its ordinal hits first. The
// "(1/N)" in the message names the interval of the level it is emitted at.
void BaseCapturer::OverflowOccurred(FractionalFrames<int64_t> frac_source_start,
                                    FractionalFrames<int64_t> frac_source_mix_point,
                                    zx::duration overflow_duration) {
  TRACE_INSTANT("audio", "BaseCapturer::OverflowOccurred", TRACE_SCOPE_PROCESS);

  // fetch_add yields the count *before* this overflow, so the very first overflow sees 0 and
  // therefore matches every interval below (i.e. it is logged at ERROR).
  const uint16_t overflow_count = std::atomic_fetch_add<uint16_t>(&overflow_count_, 1u);

  if constexpr (kLogCaptureOverflow) {
    // Formats the message for the level being logged. Building it only inside the chosen branch
    // keeps the string work out of the path where nothing is emitted, and lets the message name
    // the throttle interval that actually applies to that level.
    const auto describe = [&](uint16_t interval) {
      const auto overflow_msec = static_cast<double>(overflow_duration.to_nsecs()) / ZX_MSEC(1);
      std::ostringstream stream;
      stream << "CAPTURE OVERFLOW #" << overflow_count + 1 << " (1/" << interval
             << "): source-start " << frac_source_start.raw_value() << " missed mix-point "
             << frac_source_mix_point.raw_value() << " by " << std::setprecision(4)
             << overflow_msec << " ms";
      return stream.str();
    };

    if ((kCaptureOverflowErrorInterval > 0) &&
        (overflow_count % kCaptureOverflowErrorInterval == 0)) {
      FX_LOGS(ERROR) << describe(kCaptureOverflowErrorInterval);
    } else if ((kCaptureOverflowInfoInterval > 0) &&
               (overflow_count % kCaptureOverflowInfoInterval == 0)) {
      FX_LOGS(INFO) << describe(kCaptureOverflowInfoInterval);
    } else if ((kCaptureOverflowTraceInterval > 0) &&
               (overflow_count % kCaptureOverflowTraceInterval == 0)) {
      FX_VLOGS(TRACE) << describe(kCaptureOverflowTraceInterval);
    }
  }
}
// Records a partial overflow ("slip") and, subject to throttling, logs it.
//
// |frac_source_offset|  How far the source position had to shift, in fractional source frames.
// |dest_mix_offset|     The corresponding shift in mix (capture) frames.
//
// Slips of fewer than four source frames are not counted as overflows and are only mentioned at
// TRACE level. Larger slips increment the partial-overflow counter and are logged at ERROR, INFO
// or TRACE depending on which throttle interval the ordinal hits first. The "(1/N)" in the
// message names the interval of the level it is emitted at.
void BaseCapturer::PartialOverflowOccurred(FractionalFrames<int64_t> frac_source_offset,
                                           int64_t dest_mix_offset) {
  TRACE_INSTANT("audio", "BaseCapturer::PartialOverflowOccurred", TRACE_SCOPE_PROCESS);

  // Slips by less than four source frames do not necessarily indicate overflow. A slip of this
  // duration can be caused by the round-to-nearest-dest-frame step, when our rate-conversion
  // ratio is sufficiently large (it can be as large as 4:1).
  if (frac_source_offset.Absolute() >= 4) {
    // fetch_add yields the count *before* this slip, so the first slip sees 0 and is logged at
    // ERROR.
    const uint16_t partial_overflow_count =
        std::atomic_fetch_add<uint16_t>(&partial_overflow_count_, 1u);

    if constexpr (kLogCaptureOverflow) {
      // Formats the message for the level being logged. Building it only inside the chosen
      // branch lets the message name the throttle interval that applies to that level.
      const auto describe = [&](uint16_t interval) {
        std::ostringstream stream;
        stream << "CAPTURE SLIP #" << partial_overflow_count + 1 << " (1/" << interval
               << "): shifting by " << (frac_source_offset < 0 ? "-0x" : "0x") << std::hex
               << frac_source_offset.Absolute().raw_value() << " source subframes (" << std::dec
               << frac_source_offset.Floor() << " frames) and " << dest_mix_offset
               << " mix (capture) frames";
        return stream.str();
      };

      if ((kCaptureOverflowErrorInterval > 0) &&
          (partial_overflow_count % kCaptureOverflowErrorInterval == 0)) {
        FX_LOGS(ERROR) << describe(kCaptureOverflowErrorInterval);
      } else if ((kCaptureOverflowInfoInterval > 0) &&
                 (partial_overflow_count % kCaptureOverflowInfoInterval == 0)) {
        FX_LOGS(INFO) << describe(kCaptureOverflowInfoInterval);
      } else if ((kCaptureOverflowTraceInterval > 0) &&
                 (partial_overflow_count % kCaptureOverflowTraceInterval == 0)) {
        FX_VLOGS(TRACE) << describe(kCaptureOverflowTraceInterval);
      }
    }
  } else {
    if constexpr (kLogCaptureOverflow) {
      FX_VLOGS(TRACE) << "Slipping by " << dest_mix_offset
                      << " mix (capture) frames to align with source region";
    }
  }
}
void BaseCapturer::DoStopAsyncCapture() {
TRACE_DURATION("audio", "BaseCapturer::DoStopAsyncCapture");
// If this is being called, we had better be in the async stopping state.
FX_DCHECK(state_.load() == State::AsyncStopping);
// Finish all pending buffers. We should have at most one pending buffer. Don't bother to move an
// empty buffer into the finished queue. If there are any buffers in the finished queue waiting to
// be sent back to the user, make sure that the last one is flagged as the end of stream.
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
if (!pending_capture_buffers_.is_empty()) {
auto buf = pending_capture_buffers_.pop_front();
// When we are in async mode, the Process method will attempt to keep
// exactly one capture buffer in flight at all times, and never any more.
// If we just popped that one buffer from the pending queue, we should be
// able to DCHECK that the queue is now empty.
FX_CHECK(pending_capture_buffers_.is_empty());
if (buf->filled_frames > 0) {
finished_capture_buffers_.push_back(std::move(buf));
}
}
}
// Invalidate our clock transformation (our next packet will be discontinuous)
clock_mono_to_fractional_dest_frames_->Update(TimelineFunction());
// If we had a timer set, make sure that it is canceled. There is no point in
// having it armed right now as we are in the process of stopping.
mix_timer_.Cancel();
// Transition to the AsyncStoppingCallbackPending state, and signal the
// service thread so it can complete the stop operation.
UpdateState(State::AsyncStoppingCallbackPending);
async::PostTask(context_.threading_model().FidlDomain().dispatcher(),
[this]() { FinishAsyncStopThunk(); });
}
// Queues the next async-mode pending capture buffer and advances the async ring position.
//
// The new buffer covers async_frames_per_packet_ frames starting at async_next_frame_offset_.
// Afterwards the offset moves forward by one packet, wrapping to zero when another whole packet
// would no longer fit in the shared payload buffer.
//
// Returns true on success. Returns false if the bookkeeping object cannot be allocated, in which
// case a shutdown has already been requested.
bool BaseCapturer::QueueNextAsyncPendingBuffer() {
  TRACE_DURATION("audio", "BaseCapturer::QueueNextAsyncPendingBuffer");

  // Sanity check our async offset bookkeeping.
  FX_DCHECK(async_next_frame_offset_ < payload_buf_frames_);
  FX_DCHECK(async_frames_per_packet_ <= (payload_buf_frames_ / 2));
  FX_DCHECK(async_next_frame_offset_ <= (payload_buf_frames_ - async_frames_per_packet_));

  // Allocate bookkeeping to track this pending capture operation. If we cannot
  // allocate a new pending capture buffer, it is a fatal error and we need to
  // start the process of shutting down.
  auto pending_capture_buffer = PendingCaptureBuffer::Allocator::New(
      async_next_frame_offset_, async_frames_per_packet_, nullptr);
  if (pending_capture_buffer == nullptr) {
    FX_LOGS(ERROR) << "Failed to allocate pending capture buffer during async capture mode!";
    ShutdownFromMixDomain();
    return false;
  }

  // Update our next frame offset. If the new position of the next frame offset
  // does not leave enough room to produce another contiguous payload for our
  // user, reset the next frame offset to zero. We made sure that we have space
  // for at least two contiguous payload buffers when we started, so the worst
  // case is that we will end up ping-ponging back and forth between two payload
  // buffers located at the start of our shared buffer.
  async_next_frame_offset_ += async_frames_per_packet_;

  // The end of the following packet is computed in 64 bits. After the increment the offset can be
  // as large as payload_buf_frames_, so adding another packet in 32 bits could wrap past
  // UINT32_MAX for very large payload buffers; a wrapped value would compare as "fits" and leave
  // the offset pointing at a region that extends beyond the buffer.
  const uint64_t next_frame_end =
      static_cast<uint64_t>(async_next_frame_offset_) + async_frames_per_packet_;
  if (next_frame_end > payload_buf_frames_) {
    async_next_frame_offset_ = 0;
  }

  // Queue the pending buffer and signal success.
  {
    std::lock_guard<std::mutex> pending_lock(pending_lock_);
    pending_capture_buffers_.push_back(std::move(pending_capture_buffer));
  }
  return true;
}
// Requests shutdown without calling BeginShutdown() directly: the call is posted as a task to the
// FIDL domain's dispatcher instead.
void BaseCapturer::ShutdownFromMixDomain() {
  TRACE_DURATION("audio", "BaseCapturer::ShutdownFromMixDomain");

  auto fidl_dispatcher = context_.threading_model().FidlDomain().dispatcher();
  async::PostTask(fidl_dispatcher, [this]() { BeginShutdown(); });
}
void BaseCapturer::FinishAsyncStopThunk() {
TRACE_DURATION("audio", "BaseCapturer::FinishAsyncStopThunk");
// Do nothing if we were shutdown between the time that this message was
// posted to the main message loop and the time that we were dispatched.
if (state_.load() == State::Shutdown) {
return;
}
// Start by sending back all of our completed buffers. Finish up by sending
// an OnEndOfStream event.
PcbList finished;
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
FX_DCHECK(pending_capture_buffers_.is_empty());
finished = std::move(finished_capture_buffers_);
}
if (!finished.is_empty()) {
FinishBuffers(finished);
}
binding_.events().OnEndOfStream();
// If we have a valid callback to make, call it now.
if (pending_async_stop_cbk_ != nullptr) {
pending_async_stop_cbk_();
pending_async_stop_cbk_ = nullptr;
}
// All done! Transition back to the OperatingSync state.
ReportStop();
UpdateState(State::OperatingSync);
}
void BaseCapturer::FinishBuffersThunk() {
TRACE_DURATION("audio", "BaseCapturer::FinishBuffersThunk");
// Do nothing if we were shutdown between the time that this message was
// posted to the main message loop and the time that we were dispatched.
if (state_.load() == State::Shutdown) {
return;
}
PcbList finished;
zx::time signal_time;
{
std::lock_guard<std::mutex> pending_lock(pending_lock_);
finished = std::move(finished_capture_buffers_);
signal_time = finish_buffers_signal_time_;
finish_buffers_signal_time_ = zx::time(0);
}
auto now = zx::clock::get_monotonic();
auto time_to_schedule = now - signal_time;
if (signal_time != zx::time(0) && time_to_schedule > zx::msec(500)) {
FX_LOGS(WARNING) << "FinishBuffersThunk took " << time_to_schedule.to_msecs()
<< "ms to schedule";
}
FinishBuffers(finished);
}
// Converts each finished capture buffer into a fuchsia::media::StreamPacket and delivers it.
//
// A buffer that carries a callback is delivered through that callback; a buffer without one is
// delivered as an OnPacketProduced event. Buffers that have neither a callback nor any filled
// frames are dropped silently.
void BaseCapturer::FinishBuffers(const PcbList& finished_buffers) {
  TRACE_DURATION("audio", "BaseCapturer::FinishBuffers");

  // Unusually large batches are worth a (rate-limited) warning.
  const auto batch_size = finished_buffers.size();
  if (batch_size > 50) {
    FX_LOGS_FIRST_N(WARNING, 100) << "Finishing large batch of capture buffers: " << batch_size;
  }

  for (const auto& pcb : finished_buffers) {
    const bool has_callback = (pcb.cbk != nullptr);

    // No callback means this buffer was generated while operating in async mode. If such a buffer
    // is also completely unfilled, there is nothing to send.
    if (!has_callback && !pcb.filled_frames) {
      continue;
    }

    // Describe the filled portion of the buffer. Offset and size are converted from frames to
    // bytes using the current format.
    const auto bytes_per_frame = format_.bytes_per_frame();
    fuchsia::media::StreamPacket pkt;
    pkt.pts = pcb.capture_timestamp;
    pkt.flags = pcb.flags;
    pkt.payload_buffer_id = 0u;
    pkt.payload_offset = pcb.offset_frames * bytes_per_frame;
    pkt.payload_size = pcb.filled_frames * bytes_per_frame;

    REPORT(SendingCapturerPacket(*this, pkt));

    // Log the packet (the prefix identifies the delivery mode), then deliver it.
    const char* const mode_prefix =
        has_callback ? "Sync -mode -- payload size:" : "Async-mode -- payload size:";
    AUD_VLOG_OBJ(SPEW, this) << mode_prefix << pkt.payload_size
                             << " bytes, offset:" << pkt.payload_offset
                             << " bytes, flags:" << pkt.flags << ", pts:" << pkt.pts;
    if (has_callback) {
      pcb.cbk(pkt);
    } else {
      binding_.events().OnPacketProduced(pkt);
    }
  }
}
// Records a new capture format and recomputes max_frames_per_capture_ from it.
//
// Expected to be called only while in State::WaitingForVmo (checked in debug builds).
void BaseCapturer::UpdateFormat(Format format) {
  TRACE_DURATION("audio", "BaseCapturer::UpdateFormat");

  // Store the new format.
  FX_DCHECK(state_.load() == State::WaitingForVmo);
  format_ = format;

  // Work out the largest number of frames we may mix and capture in a single pass, by scaling
  // kMaxTimePerCapture through the inverse of the dest-frames-to-clock-mono rate.
  //
  // Why a limit at all: some sources (like AudioOutputs) can only hold onto data for a limited
  // time after presentation. We must wait until after presentation time to capture those frames,
  // but if we batch up too much work, the AudioOutput may have overwritten the data before we get
  // around to capturing it. Keeping the maximum number of frames per capture below that amount
  // of time prevents this.
  const int64_t frames_per_capture =
      dest_frames_to_clock_mono_rate().Inverse().Scale(kMaxTimePerCapture);
  max_frames_per_capture_ = static_cast<uint32_t>(frames_per_capture);
  FX_DCHECK(frames_per_capture <= std::numeric_limits<uint32_t>::max());
  FX_DCHECK(max_frames_per_capture_ > 0);
}
// Creates optimal_clock_ as a monotonic, continuous, auto-started zx::clock.
//
// Eventually the optimal clock will be chosen according to the source where this capturer is
// initially routed; for now it is simply a clone of CLOCK_MONOTONIC.
void BaseCapturer::CreateOptimalReferenceClock() {
  TRACE_DURATION("audio", "BaseCapturer::CreateOptimalReferenceClock");

  const auto clock_options =
      ZX_CLOCK_OPT_MONOTONIC | ZX_CLOCK_OPT_CONTINUOUS | ZX_CLOCK_OPT_AUTO_START;
  const auto create_status = zx::clock::create(clock_options, nullptr, &optimal_clock_);

  // Failure is only checked in debug builds.
  FX_DCHECK(create_status == ZX_OK) << "Could not create an AUTOSTART clock for optimal clock";
}
// Sets reference_clock_ to a duplicate of optimal_clock_.
//
// Today the optimal clock is known to be a clone of MONOTONIC, so it can serve as the default.
// Once the optimal clock becomes the device clock, the default must still be a clone of MONOTONIC.
// In the long term, the optimal clock is intended to be the default.
void BaseCapturer::EstablishDefaultReferenceClock() {
  TRACE_DURATION("audio", "BaseCapturer::EstablishDefaultReferenceClock");

  // The duplication itself must run in every build; only the status check is debug-only.
  const auto dup_status = DuplicateClock(optimal_clock_, &reference_clock_);
  FX_DCHECK(dup_status == ZX_OK) << "Could not duplicate the optimal clock";
}
// Hands the client a duplicate of the current reference clock, whatever that clock's source is.
//
// On success, |callback| receives the duplicated clock. If duplication fails, the error is logged,
// |callback| is not invoked, and BeginShutdown() is called.
void BaseCapturer::GetReferenceClock(GetReferenceClockCallback callback) {
  TRACE_DURATION("audio", "BaseCapturer::GetReferenceClock");
  AUD_VLOG_OBJ(TRACE, this);

  // Armed by default: unless explicitly cancelled below, leaving this function begins shutdown.
  auto shutdown_on_failure = fit::defer([this]() { BeginShutdown(); });

  zx::clock client_clock;
  const auto dup_status = DuplicateClock(reference_clock_, &client_clock);
  if (dup_status == ZX_OK) {
    callback(std::move(client_clock));
    shutdown_on_failure.cancel();
    return;
  }

  FX_PLOGS(ERROR, dup_status) << "Could not duplicate the current reference clock handle";
}
} // namespace media::audio