// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/media/audio/audio_core/stream.h"
#include <lib/syslog/cpp/macros.h>
#include <mutex>
#include "src/media/audio/audio_core/logging_flags.h"
#include "src/media/audio/audio_core/mixer/intersect.h"
#include "src/media/audio/lib/format/constants.h"
namespace {

// "ReadLock [start_frame, end_frame) stream_name (this-ptr)"
#define VERBOSE_READ_LOCK_PREFIX                                          \
  ffl::String::DecRational << "ReadLock [" << dest_frame << ", "          \
      << Fixed(dest_frame + Fixed(frame_count)) << ") " << name() << "("  \
      << static_cast<void*>(this) << ")"

// "Trim [frame] stream_name (this-ptr)"
#define VERBOSE_TRIM_PREFIX                                                    \
  ffl::String::DecRational << "Trim [" << dest_frame << "] " << name() << "("  \
      << static_cast<void*>(this) << ")"

}  // namespace

namespace media::audio {

ReadableStream::ReadableStream(std::string n, Format format)
    : BaseStream(std::move(n), format),
      name_for_read_lock_(std::string(name()) + "::ReadLock"),
      name_for_trim_(std::string(name()) + "::Trim") {}

std::optional<ReadableStream::Buffer> ReadableStream::ReadLock(ReadLockContext& ctx,
Fixed dest_frame,
int64_t frame_count) {
TRACE_DURATION("audio", name_for_read_lock_.c_str(), "dest_frame", dest_frame.Integral().Floor(),
"dest_frame.frac", dest_frame.Fraction().raw_value(), "frame_count", frame_count);
// Nested locks are not allowed.
FX_CHECK(!locked_) << VERBOSE_READ_LOCK_PREFIX << " already locked";
if constexpr (kLogReadLocks) {
VERBOSE_LOGS << VERBOSE_READ_LOCK_PREFIX;
}
DetectTimelineUpdate();
// Once a frame has been consumed, it cannot be locked again.
// We cannot travel backwards in time.
if (next_dest_frame_) {
FX_CHECK(dest_frame >= *next_dest_frame_)
<< VERBOSE_READ_LOCK_PREFIX
<< " travelled backwards in time; expected dest_frame >= " << *next_dest_frame_;
}
// Check if we can reuse a cached buffer.
if (auto out = ReadFromCachedBuffer(dest_frame, frame_count); out) {
if constexpr (kLogReadLocks) {
VERBOSE_LOGS << VERBOSE_READ_LOCK_PREFIX << " --> (cached) [" << out->start() << ", "
<< out->end() << "]";
}
return out;
}
cached_ = std::nullopt;
auto buffer = ReadLockImpl(ctx, dest_frame, frame_count);
if (!buffer) {
if constexpr (kLogReadLocks) {
VERBOSE_LOGS << VERBOSE_READ_LOCK_PREFIX << " --> null";
}
Trim(dest_frame + Fixed(frame_count));
return std::nullopt;
}
  // ReadLockImpl must return std::nullopt instead of an empty buffer.
FX_CHECK(buffer->length() > 0) << VERBOSE_READ_LOCK_PREFIX << " returned empty buffer ["
<< buffer->start() << ", " << buffer->end() << ")";
// See constraints defined in stream.h.
const Fixed buffer_lower_bound = dest_frame - Fixed(1);
const Fixed buffer_max_end = dest_frame + Fixed(frame_count);
if (buffer->cache_this_buffer_) {
// See comments for MakeCachedBuffer.
FX_CHECK(buffer->start() > buffer_lower_bound && buffer->start() < buffer_max_end)
<< VERBOSE_READ_LOCK_PREFIX << " returned out-of-range cached buffer [" << buffer->start()
<< ", " << buffer->end() << "), expected start > " << buffer_lower_bound << "&& start < "
<< buffer_max_end;
} else {
// See comments for MakeUncachedBuffer.
FX_CHECK(buffer->start() > buffer_lower_bound && buffer->end() <= buffer_max_end)
<< VERBOSE_READ_LOCK_PREFIX << " returned out-of-range uncached buffer [" << buffer->start()
<< ", " << buffer->end() << "), expected start > " << buffer_lower_bound
<< "&& end <= " << buffer_max_end;
FX_CHECK(buffer->length() <= frame_count)
<< VERBOSE_READ_LOCK_PREFIX << " returned too large uncached buffer [" << buffer->start()
<< ", " << buffer->end() << ")";
}
// Ready to lock this buffer.
if constexpr (kLogReadLocks) {
VERBOSE_LOGS << VERBOSE_READ_LOCK_PREFIX << " --> [" << buffer->start() << ", " << buffer->end()
<< ")";
}
locked_ = true;
if (!buffer->cache_this_buffer_) {
return buffer;
}
cached_ = std::move(buffer);
auto out = ReadFromCachedBuffer(dest_frame, frame_count);
FX_CHECK(out) << VERBOSE_READ_LOCK_PREFIX << " bad cached buffer [" << cached_->start() << ", "
<< cached_->end() << ")";
return out;
}
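
// Illustrative usage sketch (not part of this file): a typical caller walks the
// destination timeline monotonically, consuming whatever it mixes and letting the
// Buffer destructor unlock the stream and trim the consumed frames. `source`, `ctx`
// (a ReadLockContext supplied by the caller's mix job), and `frames_per_job` are
// hypothetical caller-side names.
//
//   Fixed frame = ...;  // next destination frame to mix
//   if (auto buffer = source->ReadLock(ctx, frame, frames_per_job)) {
//     // ... mix frames from buffer->payload() ...
//     buffer->set_frames_consumed(buffer->length());  // fully consumed
//   }
//   // When `buffer` goes out of scope, the stream is unlocked and the consumed frames
//   // are trimmed. A std::nullopt result means silence; in that case ReadLock has
//   // already trimmed the requested range (see above).
//   frame += Fixed(frames_per_job);
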
void ReadableStream::Trim(Fixed dest_frame) {
TRACE_DURATION("audio", name_for_trim_.c_str(), "frame", dest_frame.Integral().Floor(),
"frame.frac", dest_frame.Fraction().raw_value());
// Cannot be called while locked.
FX_CHECK(!locked_) << VERBOSE_TRIM_PREFIX << " already locked";
if constexpr (kLogTrims) {
VERBOSE_LOGS << VERBOSE_TRIM_PREFIX;
}
DetectTimelineUpdate();
// Advance the trim point.
if (!next_dest_frame_) {
next_dest_frame_ = dest_frame;
} else if (dest_frame <= *next_dest_frame_) {
return; // already trimmed past dest_frame
} else {
next_dest_frame_ = dest_frame;
}
// Hold onto the cached buffer until it's entirely trimmed. Once the cached buffer
// is trimmed, it's safe to discard the buffer and let TrimImpl discard any backing
// state that was referenced by the buffer.
if (cached_ && dest_frame < cached_->end()) {
return;
}
cached_ = std::nullopt;
TrimImpl(dest_frame);
}
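
// Example of the trim-point semantics above (illustrative only):
//
//   stream->Trim(Fixed(20));  // releases all frames before frame 20
//   stream->Trim(Fixed(10));  // no-op: already trimmed past frame 10
//   stream->Trim(Fixed(30));  // advances the trim point to 30; the cached buffer is
//                             // discarded (and TrimImpl called) only if it ends at or
//                             // before frame 30
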
std::optional<ReadableStream::Buffer> ReadableStream::ReadFromCachedBuffer(Fixed start_frame,
int64_t frame_count) {
if (!cached_) {
return std::nullopt;
}
// Check if the requested range intersects the cached buffer.
auto cached_packet = mixer::Packet{
.start = cached_->start(),
.length = cached_->length(),
.payload = cached_->payload(),
};
auto isect = mixer::IntersectPacket(format(), cached_packet, start_frame, frame_count);
if (!isect) {
return std::nullopt;
}
  // Since we might be locking a subset of cached_, we can't return cached_ directly.
  // Instead, we return a proxy to cached_.
return MakeUncachedBuffer(isect->start, isect->length, isect->payload, cached_->usage_mask(),
cached_->total_applied_gain_db());
}

std::optional<ReadableStream::Buffer> ReadableStream::MakeCachedBuffer(
Fixed start_frame, int64_t frame_count, void* payload, StreamUsageMask usage_mask,
float total_applied_gain_db) {
  // This buffer will be stored in cached_. It won't be returned to the ReadLock caller;
  // instead, we'll use ReadFromCachedBuffer to return a proxy to this buffer.
return ReadableStream::Buffer(start_frame, frame_count, payload, true /* use cache */, usage_mask,
total_applied_gain_db, [this](int64_t frames_consumed) mutable {
// Trim is handled by the proxy (see ReadFromCachedBuffer).
ReadUnlock();
});
}

std::optional<ReadableStream::Buffer> ReadableStream::MakeUncachedBuffer(
Fixed start_frame, int64_t frame_count, void* payload, StreamUsageMask usage_mask,
float total_applied_gain_db) {
return ReadableStream::Buffer(
start_frame, frame_count, payload, false /* don't use cache */, usage_mask,
total_applied_gain_db,
// Destructing this buffer unlocks the stream. Ensure the buffer
// holds a reference to this stream until it's unlocked.
[this, start_frame, stream = shared_from_this()](int64_t frames_consumed) {
locked_ = false;
Fixed trim_frame = start_frame + Fixed(frames_consumed);
        if (frames_consumed > 0) {
          previous_buffer_end_ = trim_frame;
        }
Trim(trim_frame);
ReadUnlock();
});
}
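
// Illustrative sketch (hypothetical subclass, not part of this file): ReadLockImpl
// implementations build their result with one of the two factories above. Per the
// constraints checked in ReadLock, a cached buffer may extend past the requested range
// (it will be re-served by ReadFromCachedBuffer on later overlapping calls), while an
// uncached buffer must fit within the requested range. `MyStream`, `FillInternalBuffer`,
// `payload_`, and `usage_mask_` are hypothetical.
//
//   std::optional<ReadableStream::Buffer> MyStream::ReadLockImpl(ReadLockContext& ctx,
//                                                                Fixed dest_frame,
//                                                                int64_t frame_count) {
//     int64_t frames_filled = FillInternalBuffer(dest_frame, frame_count);
//     if (frames_filled == 0) {
//       return std::nullopt;  // silence; ReadLock will Trim the requested range
//     }
//     // Cache the filled frames so overlapping ReadLock calls can reuse them.
//     return MakeCachedBuffer(dest_frame, frames_filled, payload_, usage_mask_,
//                             /*total_applied_gain_db=*/0.0f);
//   }
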
std::optional<ReadableStream::Buffer> ReadableStream::ForwardBuffer(
std::optional<Buffer>&& buffer, std::optional<Fixed> start_frame) {
if (!buffer) {
return std::nullopt;
}
// Logically, we are passing `buffer` to the closure below. However, if we pass `buffer` directly,
// this creates a recursive type (ReadableStream::Buffer contains a dtor closure, which contains a
// ReadableStream::Buffer), which means the compiler cannot prove that the closure can be created
// without heap allocation. To break that circular type, we store the forwarded buffer in `this`.
  forwarded_buffer_ = std::move(buffer);
  // Read the start frame from forwarded_buffer_; `buffer` was just moved-from.
  auto buffer_start = start_frame ? *start_frame : forwarded_buffer_->start();
return ReadableStream::Buffer(
// Wrap the buffer with a proxy so we can be notified when the buffer is unlocked.
buffer_start, forwarded_buffer_->length(), forwarded_buffer_->payload(),
false /* don't cache */, forwarded_buffer_->usage_mask(),
forwarded_buffer_->total_applied_gain_db(),
// Destructing this proxy unlocks the stream. Ensure the proxy holds a reference
// to this stream AND to forwarded_buffer_ until the proxy is unlocked.
[this, buffer_start, stream = shared_from_this()](int64_t frames_consumed) mutable {
locked_ = false;
Fixed trim_frame = buffer_start + Fixed(frames_consumed);
        if (frames_consumed > 0) {
          previous_buffer_end_ = trim_frame;
        }
// What is consumed from the proxy is also consumed from the forwarded buffer.
forwarded_buffer_->set_frames_consumed(frames_consumed);
// Destroy the forwarded buffer before calling Trim to ensure the source stream
// is unlocked before it is Trim'd.
forwarded_buffer_ = std::nullopt;
Trim(trim_frame);
ReadUnlock();
});
}
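
// Illustrative sketch (hypothetical intermediate stream, not part of this file): a
// stream that passes its source's data through untouched can forward the source's
// buffer instead of copying it; frames consumed from the proxy are consumed from the
// source buffer, and the source is unlocked and trimmed when the proxy is released.
// `source_` is a hypothetical member pointing at the upstream ReadableStream.
//
//   std::optional<ReadableStream::Buffer> MyPassthroughStream::ReadLockImpl(
//       ReadLockContext& ctx, Fixed dest_frame, int64_t frame_count) {
//     return ForwardBuffer(source_->ReadLock(ctx, dest_frame, frame_count),
//                          /*start_frame=*/std::nullopt);
//   }
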
void ReadableStream::DetectTimelineUpdate() {
auto generation = ref_time_to_frac_presentation_frame().generation;
if (timeline_function_generation_ && generation == *timeline_function_generation_) {
return;
}
timeline_function_generation_ = generation;
// The presentation timeline has changed, so reset the stream. Ideally we'd reset
// the stream immediately after the timeline changes, however it's difficult to do
// that with our existing concurrency model, hence this polling approach.
next_dest_frame_ = std::nullopt;
previous_buffer_end_ = std::nullopt;
cached_ = std::nullopt;
}
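
// Example (illustrative): when the stream's presentation timeline is updated (so the
// snapshot's generation changes), the next ReadLock or Trim call lands here and clears
// next_dest_frame_, previous_buffer_end_, and cached_, letting the stream restart
// cleanly at whatever destination frame the caller requests next.
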
} // namespace media::audio