blob: 11583772a9f229b55d30ea2552e7ba99e4f1ccdf [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/media/audio/audio_core/v2/stream_sink_server.h"
#include <lib/fit/defer.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/trace/event.h>
#include "src/media/audio/services/common/logging.h"
namespace media_audio {
using fuchsia_audio::wire::Timestamp;
using fuchsia_media2::ConsumerClosedReason;
// static
std::shared_ptr<StreamSinkServer> StreamSinkServer::Create(
std::shared_ptr<const FidlThread> thread, fidl::ServerEnd<fuchsia_audio::StreamSink> server_end,
Args args) {
return BaseFidlServer::Create(std::move(thread), std::move(server_end), std::move(args));
}
StreamSinkServer::StreamSinkServer(Args args)
: format_(args.format),
stream_converter_(StreamConverter::CreateFromFloatSource(args.format)),
payload_buffer_(std::move(args.payload_buffer)) {}
void StreamSinkServer::PutPacket(PutPacketRequestView request,
PutPacketCompleter::Sync& completer) {
TRACE_DURATION("audio", "StreamSink::PutPacket");
ScopedThreadChecker checker(thread().checker());
if (!request->has_packet()) {
FX_LOGS(WARNING) << "PutPacket: missing packet";
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
if (!request->packet().has_payload()) {
FX_LOGS(WARNING) << "PutPacket: missing payload";
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
const auto& packet = request->packet();
if (packet.has_flags() || packet.has_front_frames_to_drop() || packet.has_back_frames_to_drop() ||
packet.has_encryption_properties()) {
FX_LOGS(WARNING) << "PutPacket: unsupported field";
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
const auto which_timestamp =
packet.has_timestamp() ? packet.timestamp().Which() : Timestamp::Tag::kUnspecifiedBestEffort;
zx::time timestamp;
zx::duration overflow;
// TODO(https://fxbug.dev/42181009): once the mixer service populates `Packet.capture_timestamp`, we can
// ignore `packet.timestamp` and instead use `packet.capture_timestamp`.
switch (which_timestamp) {
case Timestamp::Tag::kSpecified:
timestamp = zx::time(packet.timestamp().specified());
if (next_continuous_timestamp_) {
if (*next_continuous_timestamp_ > timestamp) {
FX_LOGS(WARNING) << "packet timestamp went backwards from " << *next_continuous_timestamp_
<< " to " << timestamp;
} else {
overflow = timestamp - *next_continuous_timestamp_;
}
}
break;
case Timestamp::Tag::kUnspecifiedContinuous:
// The mixer service uses Specified for the first packet in a sequence, therefore
// `next_continuous_timestamp_` should always be defined.
if (!next_continuous_timestamp_) {
FX_LOGS(WARNING) << "kUnspecifiedContinuous timestamp not preceded by Specified timestamp";
Shutdown(ZX_ERR_NOT_SUPPORTED);
}
timestamp = *next_continuous_timestamp_;
break;
case Timestamp::Tag::kUnspecifiedBestEffort:
// The mixer service ConsumerNodes never use BestEffort, so we don't need to support it.
FX_LOGS(WARNING) << "UnspecifiedBestEffort timestamps not supported";
Shutdown(ZX_ERR_NOT_SUPPORTED);
return;
default:
FX_LOGS(WARNING) << "PutPacket: unepxected packet timestamp tag = "
<< static_cast<int>(packet.timestamp().Which());
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
const auto& payload = packet.payload();
if (payload.buffer_id != 0) {
FX_LOGS(WARNING) << "PutPacket: unknown payload buffer id " << payload.buffer_id;
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
// Since the offset is an unsigned integer, the payload is out-of-range if its endpoint is too
// large or wraps around.
const uint64_t payload_offset_end = payload.offset + payload.size;
if (payload_offset_end > payload_buffer_->size() || payload_offset_end < payload.offset) {
FX_LOGS(WARNING) << "PutPacket: payload buffer out-of-range: offset=" << payload.offset
<< ", size=" << payload.size << " buffer_size=" << payload_buffer_->size();
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
if (payload.size % format_.bytes_per_frame() != 0) {
FX_LOGS(WARNING) << "PutPacket: payload buffer has a non-integral number of frames";
CloseWithReason(ConsumerClosedReason::kInvalidPacket);
return;
}
pending_packets_.push_back({
.timestamp = timestamp,
.overflow = overflow,
.data = static_cast<char*>(payload_buffer_->offset(payload.offset)),
.bytes_remaining = static_cast<int64_t>(payload.size),
.fence = request->has_release_fence() ? std::move(request->release_fence()) : zx::eventpair(),
});
next_continuous_timestamp_ =
timestamp + format_.duration_per(Fixed(payload.size / format_.bytes_per_frame()));
ServePendingCaptures();
}
void StreamSinkServer::StartSegment(StartSegmentRequestView request,
StartSegmentCompleter::Sync& completer) {
TRACE_DURATION("audio", "StreamSink::StartSegment");
// The mixer service ConsumerNodes never use StartSegment, so we don't need to implement it.
FX_LOGS(WARNING) << "StartSegment not supported";
Shutdown(ZX_ERR_NOT_SUPPORTED);
}
void StreamSinkServer::End(EndCompleter::Sync& completer) {
// End-of-stream is not used in audio_core.
}
void StreamSinkServer::WillClose(WillCloseRequestView request,
WillCloseCompleter::Sync& completer) {
TRACE_DURATION("audio", "StreamSink::WillClose");
ScopedThreadChecker checker(thread().checker());
if (request->has_reason()) {
FX_LOGS(INFO) << "StreamSink closing with reason " << static_cast<uint32_t>(request->reason());
}
}
void StreamSinkServer::CapturePacket(void* dest_buffer, int64_t dest_bytes,
CaptureCallback callback) {
pending_captures_.push_back({
.data = static_cast<char*>(dest_buffer),
.bytes_remaining = dest_bytes,
.bytes_captured = 0,
.callback = std::move(callback),
});
ServePendingCaptures();
}
void StreamSinkServer::DiscardPackets() {
pending_packets_.clear();
while (!pending_captures_.empty()) {
auto& capture = pending_captures_.front();
capture.callback(capture.start_timestamp.value_or(zx::time(0)).get(), capture.bytes_captured,
capture.overflow);
pending_captures_.pop_front();
}
}
void StreamSinkServer::ServePendingCaptures() {
while (!pending_captures_.empty() && !pending_packets_.empty()) {
auto& packet = pending_packets_.front();
auto& capture = pending_captures_.front();
capture.overflow += packet.overflow;
packet.overflow = zx::nsec(0);
if (!capture.start_timestamp) {
// We haven't copied anything into `capture` yet, so start from `packet`'s current position.
capture.start_timestamp = packet.timestamp;
capture.next_timestamp = packet.timestamp;
} else {
FX_CHECK(capture.next_timestamp);
// If `capture`'s current position doesn't align with `packet`, fill with silence. `capture`
// cannot be ahead of `packet`, since consecutive packet timestamps cannot go backwards.
const auto duration_ahead = packet.timestamp - *capture.next_timestamp;
const auto frames_ahead = format_.integer_frames_per(duration_ahead);
FX_CHECK(frames_ahead >= 0);
if (frames_ahead > 0) {
const auto capture_frames_needed = capture.bytes_remaining / format_.bytes_per_frame();
const auto silent_frames = std::max(frames_ahead, capture_frames_needed);
const auto silent_duration = format_.duration_per(Fixed(silent_frames));
const auto silent_bytes = silent_frames * format_.bytes_per_frame();
stream_converter_.WriteSilence(capture.data, silent_frames);
capture.bytes_remaining -= silent_bytes;
capture.bytes_captured += silent_bytes;
*capture.next_timestamp += silent_duration;
// If this `capture` is done, advance to the next `capture`.
if (capture.bytes_remaining == 0) {
capture.callback(capture.start_timestamp->get(), capture.bytes_captured,
capture.overflow);
pending_captures_.pop_front();
continue;
}
}
}
// Copy `packet` into `capture`.
const auto bytes_to_copy = std::min(capture.bytes_remaining, packet.bytes_remaining);
memmove(capture.data, packet.data, bytes_to_copy);
capture.data += bytes_to_copy;
capture.bytes_remaining -= bytes_to_copy;
capture.bytes_captured += bytes_to_copy;
packet.data += bytes_to_copy;
packet.bytes_remaining -= bytes_to_copy;
const auto frames_copied = bytes_to_copy / format_.bytes_per_frame();
const auto duration_copied = format_.duration_per(Fixed(frames_copied));
*capture.next_timestamp += duration_copied;
packet.timestamp += duration_copied;
if (packet.bytes_remaining == 0) {
pending_packets_.pop_front();
}
if (capture.bytes_remaining == 0) {
capture.callback(capture.start_timestamp->get(), capture.bytes_captured, capture.overflow);
pending_captures_.pop_front();
}
}
}
void StreamSinkServer::CloseWithReason(ConsumerClosedReason reason) {
fidl::Arena<> arena;
std::ignore = fidl::WireSendEvent(binding())->OnWillClose(
fuchsia_audio::wire::StreamSinkOnWillCloseRequest::Builder(arena).reason(reason).Build());
Shutdown();
}
} // namespace media_audio