blob: 4a77af98a252ad8f438802371b734085a7e20e8b [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/media/test/codec_client.h>
#include <lib/async/cpp/task.h>
#include <src/lib/fxl/logging.h>
namespace {
// The client would like there to be at least this many extra input packets in
// addition to the bare minimum required for the codec to be able to function.
// Using 1 or 2 here can help avoid short stalls while packets are in transit
// even if the client doesn't actually need to hold any free input packets for
// any significant duration for any client-specific reason.
constexpr uint32_t kMinExtraInputPacketsForClient = 2;
// The client would like there to be at least this many extra output packets in
// addition to the bare minimum required for the codec to be able to function.
// The client _must_ use a non-zero value here if any output packets will be
// held indefinitely (such as held until the next output packet is available),
// since otherwise the Codec can be unable to continue processing.
constexpr uint32_t kMinExtraOutputPacketsForClient = 2;
// For input, this example doesn't re-configure input buffers, so there's only
// one buffer_lifetime_ordinal.
constexpr uint64_t kInputBufferLifetimeOrdinal = 1;
} // namespace
CodecClient::CodecClient(async::Loop* loop, fidl::InterfaceHandle<fuchsia::sysmem::Allocator> sysmem)
: loop_(loop), dispatcher_(loop_->dispatcher()) {
// We haven't created a channel yet, but that's fine, and we want the error
// handler set up before any error can possibly be generated by the channel so
// there's no chance of missing an error. The async::Loop that we'll use is
// already running separately from the current thread.
codec_.set_error_handler([](zx_status_t status) {
// Obviously a non-example client that continues to have a purpose even if
// one of its codecs dies would want to handle errors in a more contained
// way.
//
// TODO(dustingreen): get and print epitaph once that's possible.
FXL_LOG(FATAL) << "codec_ failed - exiting";
});
// Only one request is ever created, so we create it in the constructor to
// avoid needing any manual enforcement that we only do this once.
temp_codec_request_ = codec_.NewRequest(loop_->dispatcher());
// We treat event setup as much as possible like a hidden part of creating the
// CodecPtr. If NewBinding() has !is_valid(), we rely on the Codec server to
// close the Codec channel async.
codec_.events().OnStreamFailed =
fit::bind_member(this, &CodecClient::OnStreamFailed);
codec_.events().OnInputConstraints =
fit::bind_member(this, &CodecClient::OnInputConstraints);
codec_.events().OnFreeInputPacket =
fit::bind_member(this, &CodecClient::OnFreeInputPacket);
codec_.events().OnOutputConstraints =
fit::bind_member(this, &CodecClient::OnOutputConstraints);
codec_.events().OnOutputFormat =
fit::bind_member(this, &CodecClient::OnOutputFormat);
codec_.events().OnOutputPacket =
fit::bind_member(this, &CodecClient::OnOutputPacket);
codec_.events().OnOutputEndOfStream =
fit::bind_member(this, &CodecClient::OnOutputEndOfStream);
// Bind sysmem_ using FIDL thread. This is ok because all communication with
// sysmem also happens via FIDL thread so will queue after this posted lambda.
PostToFidlThread([this, sysmem = std::move(sysmem)]() mutable {
zx_status_t bind_status = sysmem_.Bind(std::move(sysmem), dispatcher_);
ZX_ASSERT(bind_status == ZX_OK);
});
}
CodecClient::~CodecClient() { Stop(); }
fidl::InterfaceRequest<fuchsia::media::StreamProcessor>
CodecClient::GetTheRequestOnce() {
return std::move(temp_codec_request_);
}
void CodecClient::Start() {
// The caller is responsible for calling this method only once, using the main
// thread. This method only holds the lock for short periods, and has to
// release the lock many times during this method, which is reasonable given
// the nature of this method as an overall state progression sequencer.
// Call Sync() and wait for it's response _only_ to force the Codec server to
// reach the point of being able response to messages, just for easier
// debugging if just starting the Codec server fails instead. Actual clients
// don't need to use Sync() here.
CallSyncAndWaitForResponse();
FXL_VLOG(3) << "Sync() completed, which means the Codec server exists.";
FXL_VLOG(3) << "Waiting for OnInputConstraints() from the Codec server...";
// The Codec client can rely on an OnInputConstraints() arriving shortly,
// without any message required from the client first. The
// OnInputConstraints() may in future actually be sent by the CodecFactory,
// but it'll still be sent to the client on the Codec channel in any case.
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
// In this example we're not paying attention to channel failure here
// because channel failure calls exit().
while (!input_constraints_) {
input_constraints_exist_condition_.wait(lock);
}
} // ~lock
ZX_ASSERT(input_constraints_);
FXL_VLOG(3) << "Got OnInputConstraints() from the Codec server.";
// We know input_constraints_ won't change outside the lock because we prevent
// that in OnInputConstraints() by only accepting input constraints if there
// aren't already input constraints.
// Now that we have input constraints, we can create all the input buffers and
// tell the Codec server about them. We tell the Codec server by using
// SetInputSettings() followed by one or num_buffers calls to
// AddInputBuffer(). These are necessary before it becomes permissible to
// call CreateStream().
//
// We're not on the FIDL thread, so we need to async::PostTask() over to the
// FIDL thread to send any FIDL message.
FXL_CHECK(input_constraints_->has_packet_count_for_server_recommended());
FXL_CHECK(input_constraints_->has_packet_count_for_server_max());
uint32_t packet_count_for_client = std::max(
kMinExtraInputPacketsForClient,
input_constraints_->packet_count_for_client_min());
uint32_t packet_count_for_server =
input_constraints_->packet_count_for_server_recommended();
if (packet_count_for_client > input_constraints_->packet_count_for_client_max()) {
FXL_LOG(FATAL) << "server can't accomodate "
"kMinExtraInputPacketsForClient - not "
"using server - exiting";
}
uint32_t input_packet_count;
fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info;
if (!ConfigurePortBufferCollection(
false, kInputBufferLifetimeOrdinal,
input_constraints_->buffer_constraints_version_ordinal(),
packet_count_for_server, packet_count_for_client, &input_packet_count,
&input_buffer_collection_, &buffer_collection_info)) {
FXL_LOG(FATAL) << "ConfigurePortBufferCollection failed (input)";
}
ZX_ASSERT(input_free_bits_.empty());
input_free_bits_.resize(input_packet_count, true);
all_input_buffers_.reserve(input_packet_count);
for (uint32_t i = 0; i < input_packet_count; i++) {
std::unique_ptr<CodecBuffer> local_buffer =
CodecBuffer::CreateFromVmo(i, std::move(buffer_collection_info.buffers[i].vmo), buffer_collection_info.buffers[i].vmo_usable_start, buffer_collection_info.settings.buffer_settings.size_bytes, true);
if (!local_buffer) {
FXL_LOG(FATAL) << "CodecBuffer::CreateFromVmo() failed";
}
ZX_ASSERT(all_input_buffers_.size() == i);
all_input_buffers_.push_back(std::move(local_buffer));
}
// Now that we've SetInputBufferPartialSettings(), the codec will get the
// input buffers from sysmem. The input packets all start as free with the
// Codec client, per protocol. Same goes for input buffers - this client
// happens to track in terms of packets and have buffer_index == packet_index.
//
// TODO(dustingreen): Have CodecClient scramble the order of packets vs.
// buffers to check that CodecImpl is handling that correctly for input
// packets.
input_free_list_.reserve(input_packet_count);
for (uint32_t i = 0; i < input_packet_count; i++) {
input_free_list_.push_back(i);
}
}
bool CodecClient::CreateAndSyncBufferCollection(
fuchsia::sysmem::BufferCollectionSyncPtr* out_buffer_collection,
fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken>*
out_codec_sysmem_token) {
fuchsia::sysmem::BufferCollectionSyncPtr buffer_collection;
fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken> codec_sysmem_token;
// Create client_token which will get converted into out_buffer_collection.
fuchsia::sysmem::BufferCollectionTokenSyncPtr client_token;
fidl::InterfaceRequest<fuchsia::sysmem::BufferCollectionToken>
client_token_request = client_token.NewRequest();
// Create codec_sysmem_token that'll get returned via out_codec_sysmem_token.
client_token->Duplicate(
std::numeric_limits<uint32_t>::max(), codec_sysmem_token.NewRequest());
// client_token gets converted into a buffer_collection.
//
// Start client_token connection and start converting it into a BufferCollection,
// so we can Sync() the previous Duplicate().
PostToFidlThread(
[this, client_token_request = std::move(client_token_request),
client_token = client_token.Unbind(),
buffer_collection_request = buffer_collection.NewRequest()]() mutable {
if (!sysmem_) {
return;
}
sysmem_->AllocateSharedCollection(std::move(client_token_request));
// codec_sysmem_token will be known to sysmem by the time client_token
// closure is seen by sysmem, which in turn is before
// buffer_collection_request will be hooked up, which is why
// buffer_collection->Sync() completion below is enough to prove that sysmem
// knows about codec_sysmem_token before codec_sysmem_token is sent to the
// codec.
sysmem_->BindSharedCollection(std::move(client_token), std::move(buffer_collection_request));
});
// After Sync() completes its round trip, we know that sysmem knows about
// codec_sysmem_token (causally), which is important because we'll shortly
// send codec_sysmem_token to the codec which will use codec_sysmem_token via
// a different sysmem channel.
zx_status_t sync_status = buffer_collection->Sync();
if (sync_status != ZX_OK) {
FXL_LOG(FATAL) << "buffer_collection->Sync() failed - status: " << sync_status;
}
*out_buffer_collection = std::move(buffer_collection);
*out_codec_sysmem_token = std::move(codec_sysmem_token);
return true;
}
bool CodecClient::WaitForSysmemBuffersAllocated(
fuchsia::sysmem::BufferCollectionSyncPtr* buffer_collection_param,
fuchsia::sysmem::BufferCollectionInfo_2* out_buffer_collection_info) {
// The style guide doesn't like non-const &, but the code in this method is
// easier to read with a non-const &, so treat it that way within this method.
fuchsia::sysmem::BufferCollectionSyncPtr& buffer_collection =
*buffer_collection_param;
fuchsia::sysmem::BufferCollectionInfo_2 result_buffer_collection_info;
// It's not permitted to send input data until the client knows that sysmem
// is done allocating. It's not required that the client know that the
// codec knows that sysmem is done allocating though - the server will
// verify that sysmem is done by communicating with sysmem directly as
// needed.
zx_status_t allocate_status;
zx_status_t call_status = buffer_collection->WaitForBuffersAllocated(
&allocate_status, &result_buffer_collection_info);
if (call_status != ZX_OK) {
FXL_LOG(ERROR) << "WaitForBuffersAllocated returned failure - status: " << call_status;
return false;
}
if (allocate_status != ZX_OK) {
FXL_LOG(ERROR) << "WaitForBuffersAllocated allocation failed - status: " << allocate_status;
return false;
}
*out_buffer_collection_info = std::move(result_buffer_collection_info);
return true;
}
void CodecClient::Stop() {
if (codec_.is_bound()) {
codec_.Unbind();
}
if (sysmem_.is_bound()) {
sysmem_.Unbind();
}
if (input_buffer_collection_.is_bound()) {
input_buffer_collection_.Unbind();
}
if (output_buffer_collection_.is_bound()) {
output_buffer_collection_.Unbind();
}
}
void CodecClient::PostToFidlThread(fit::closure to_run) {
zx_status_t post_status = async::PostTask(dispatcher_, std::move(to_run));
ZX_ASSERT(post_status == ZX_OK);
}
void CodecClient::CallSyncAndWaitForResponse() {
// Just for clarity of the example, we'll use a local lock here, since that's
// all we actually need.
std::mutex is_sync_complete_lock;
bool is_sync_complete = false;
std::condition_variable is_sync_complete_condition;
// Capturing stuff with just "&" is sometimes frowned upon, but in this case
// there's no chance of any lambda outliving anything, so it's fine. The
// outer lambda is because ProxyController isn't thread-safe and the present
// method is called from the main thread not the FIDL thread, so we have to
// switch threads to send a FIDL message. The inner lambda is the completion
// callback.
FXL_VLOG(3) << "before calling Sync() (main thread)...";
async::PostTask(dispatcher_, [&] {
FXL_VLOG(3) << "before calling Sync() (fidl thread)...";
codec_->Sync([&]() {
{ // scope lock
std::unique_lock<std::mutex> lock(is_sync_complete_lock);
is_sync_complete = true;
} // ~lock
is_sync_complete_condition.notify_all();
});
});
FXL_VLOG(3) << "after calling Sync() - waiting...\n";
{ // scope lock
std::unique_lock<std::mutex> lock(is_sync_complete_lock);
// We rely on the channel error handler to be doing an exit() for this loop
// to be reasonable without checking for channel failure here.
while (!is_sync_complete) {
is_sync_complete_condition.wait(lock);
}
}
FXL_VLOG(3) << "after calling Sync() - done waiting\n";
ZX_ASSERT(is_sync_complete);
}
void CodecClient::TrackOutputStreamLifetimeOrdinal(
uint64_t output_stream_lifetime_ordinal) {
// must be odd
ZX_ASSERT(output_stream_lifetime_ordinal % 2 == 1);
ZX_ASSERT(output_stream_lifetime_ordinal >= output_stream_lifetime_ordinal_);
if (output_stream_lifetime_ordinal > output_stream_lifetime_ordinal_) {
// We're allowed to forget format any time there's a stream change, so we
// do. This isn't critical for this test code, but it's closer to how a
// real client will likely track the output format on a per-stream basis.
ZX_ASSERT(!last_output_format_ || last_output_format_->stream_lifetime_ordinal() == output_stream_lifetime_ordinal_);
output_stream_lifetime_ordinal_ = output_stream_lifetime_ordinal;
last_output_format_ = nullptr;
// We intentionally don't reset is_packet_since_last_format_.
}
}
void CodecClient::OnInputConstraints(
fuchsia::media::StreamBufferConstraints input_constraints) {
if (input_constraints_) {
FXL_LOG(FATAL) << "server sent more than one input constraints";
}
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
input_constraints_ =
std::make_unique<fuchsia::media::StreamBufferConstraints>(
std::move(input_constraints));
} // ~lock
input_constraints_exist_condition_.notify_all();
}
void CodecClient::OnFreeInputPacket(
fuchsia::media::PacketHeader free_input_packet) {
bool was_empty;
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
if (!free_input_packet.has_packet_index()) {
FXL_LOG(FATAL) << "OnFreeInputPacket(): Packet has no index.";
}
if (input_free_bits_[free_input_packet.packet_index()]) {
// already free - a normal client wouldn't want to just exit here since
// this is the server's fault - in this example we just care that we
// detect it
FXL_LOG(FATAL)
<< "OnFreeInputPacket() when already free - server's fault? - "
"packet_index: "
<< free_input_packet.packet_index();
}
was_empty = input_free_list_.empty();
input_free_list_.push_back(free_input_packet.packet_index());
input_free_bits_[free_input_packet.packet_index()] = true;
} // ~lock
if (was_empty) {
input_free_list_not_empty_.notify_all();
}
}
std::unique_ptr<fuchsia::media::Packet>
CodecClient::BlockingGetFreeInputPacket() {
uint32_t free_index;
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
while (input_free_list_.empty()) {
input_free_list_not_empty_.wait(lock);
}
free_index = input_free_list_.back();
input_free_list_.pop_back();
// We intentionally do not modify input_free_bits_ here, as those bits are
// tracking the protocol level free-ness, so will get updated when the
// caller queues the input packet.
ZX_ASSERT(input_free_bits_[free_index]);
}
std::unique_ptr<fuchsia::media::Packet> packet =
fuchsia::media::Packet::New();
packet->mutable_header()->set_buffer_lifetime_ordinal(
kInputBufferLifetimeOrdinal);
packet->mutable_header()->set_packet_index(free_index);
packet->set_buffer_index(free_index);
return packet;
}
const CodecBuffer& CodecClient::GetInputBufferByIndex(uint32_t packet_index) {
return *all_input_buffers_[packet_index];
}
const CodecBuffer& CodecClient::GetOutputBufferByIndex(uint32_t packet_index) {
return *all_output_buffers_[packet_index];
}
void CodecClient::QueueInputFormatDetails(
uint64_t stream_lifetime_ordinal,
fuchsia::media::FormatDetails input_format_details) {
async::PostTask(dispatcher_, [this, stream_lifetime_ordinal,
input_format_details =
std::move(input_format_details)]() mutable {
codec_->QueueInputFormatDetails(stream_lifetime_ordinal,
std::move(input_format_details));
});
}
// buffer - the populated input packet buffer, or an empty input buffers with
// end_of_stream set on it.
void CodecClient::QueueInputPacket(
std::unique_ptr<fuchsia::media::Packet> packet) {
ZX_ASSERT(packet->has_header());
ZX_ASSERT(packet->header().has_buffer_lifetime_ordinal());
ZX_ASSERT(packet->header().has_packet_index());
ZX_ASSERT(packet->has_buffer_index());
ZX_ASSERT(packet->has_stream_lifetime_ordinal());
ZX_ASSERT(packet->has_start_offset());
ZX_ASSERT(packet->has_valid_length_bytes());
// timestamp_ish field is optional
// start_access_unit field is optional
// known_end_access_unit is optional
fuchsia::media::Packet local_packet = fidl::Clone(*packet);
{ // scope lock
// This packet is already not on the free list, but is still considered free
// from a protocol point of view, so update that part.
std::unique_lock<std::mutex> lock(lock_);
ZX_ASSERT(input_free_bits_[local_packet.header().packet_index()]);
input_free_bits_[local_packet.header().packet_index()] = false;
// From here it's as if this packet is already in flight with the server.
} // ~lock
async::PostTask(dispatcher_,
[this, packet = std::move(local_packet)]() mutable {
codec_->QueueInputPacket(std::move(packet));
});
}
void CodecClient::QueueInputEndOfStream(uint64_t stream_lifetime_ordinal) {
async::PostTask(dispatcher_, [this, stream_lifetime_ordinal] {
codec_->QueueInputEndOfStream(stream_lifetime_ordinal);
});
}
void CodecClient::FlushEndOfStreamAndCloseStream(
uint64_t stream_lifetime_ordinal) {
async::PostTask(dispatcher_, [this, stream_lifetime_ordinal] {
codec_->FlushEndOfStreamAndCloseStream(stream_lifetime_ordinal);
});
}
std::unique_ptr<CodecOutput> CodecClient::BlockingGetEmittedOutput() {
while (true) {
// The rule is that a required pending constraints won't be followed by any
// more output packets until it's no longer pending (in the sense that the
// output buffers have been suitably re-configured). We verify the server
// is following that rule elsewhere, which means we know here that when both
// packets are pending and constraints is pending, the packets were
// delivered to the client first. So we drain the packets first.
std::unique_ptr<CodecOutput> packet;
std::shared_ptr<const fuchsia::media::StreamOutputConstraints> constraints;
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
while (!output_pending_) {
output_pending_condition_.wait(lock);
}
if (!emitted_output_.empty()) {
packet = std::move(emitted_output_.front());
emitted_output_.pop_front();
// This only does anything when the last packet is consumed and there is
// no config pending.
if (!ComputeOutputPendingLocked()) {
output_pending_ = false;
}
} else {
ZX_ASSERT(output_constraints_action_pending_);
ZX_ASSERT(last_required_output_constraints_);
constraints = last_required_output_constraints_;
}
} // ~lock
// Now we own a packet or have a required config to deal with, but not both,
// so it doesn't matter which order we check here, but for clarity we check
// in the same order as above.
if (packet) {
return packet;
}
// We have a required output config change to deal with here.
// The server implicitly has relinquished ownership of all output packets
// and all output buffers as a semantic of the required config change. This
// shouldn't really be thought of the packets being "emitted" - rather from
// the server's point of view they're deallocated already. Now it's the
// client's turn to deallocate the old buffers. It is not permitted to
// re-use a previous buffer as a new buffer, per protocol rules, not even
// for the same Codec instance, and not even for a mid-stream output config
// change.
//
// The main mechanism used to detect that the server isn't sending output
// too soon is output_config_action_pending_. In contrast, the client code
// in this example permits itself to send RecycleOutputPacket() after the
// client has already seen OnOutputConstraints() with action required true, even
// though the client could stop itself from doing so as a potential
// optimization. The client is allowed to send RecycleOutputPacket() up
// until the implied ReleaseOutputBuffers() at the start of
// SetOutputSettings(). Between then and a given packet_index becoming
// emitted again (!free), a RecycleOutputPacket() for that packet_index is
// forbidden.
//
// Because of the client allowing itself to send RecycleOutputPacket() for a
// while longer than fundamentally necessary, we delay upkeep on
// output_free_bits_ until here. This upkeep isn't really fundamentally
// necessary between OnOutputConstraints() with action required true and the last
// AddOutputBuffer() as part of output re-configuration, but ... this
// explicit delayed upkeep _may_ help illustrate how it's acceptable for a
// client to let the completion end of output processing send
// RecycleOutputPacket() as long as all those will be sent before
// SetOutputSettings().
std::shared_ptr<const fuchsia::media::StreamOutputConstraints>
snapped_constraints;
uint64_t new_output_buffer_lifetime_ordinal;
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
// We know this because the previous OnOutputConstraints() set this and
// because we're only here if it's set.
ZX_ASSERT(output_constraints_action_pending_);
// We know this because we reject additional output from the server when
// output_config_action_pending_ is true, and because we've drained all
// previous output by this point.
ZX_ASSERT(emitted_output_.empty());
// We know this because we're only here if we have pending constraints.
ZX_ASSERT(constraints);
// Not really critical to do this, as we'll just end up setting these
// back to true under the same lock hold interval as we set
// output_config_action_pending_ to false, but see comment above re.
// how this might help illustrate how late RecycleOutputPacket() can be
// sent.
//
// Think of this assignment as slightly more than a comment in this
// example, rather than any real need.
output_free_bits_.resize(0);
// Free the old output buffers, if any.
all_output_buffers_.clear();
// Here is where we snap which exact constraints version we'll actually use.
//
// For a client that's doing output buffer re-config on the FIDL thread
// during OnOutputConstraints with action required true, this will always
// just be the constraints being presently received. But this example shows
// how to drive the codec in a protocol-valid way without being forced to
// perform buffer re-configuration on the FIDL thread.
ZX_ASSERT(output_constraints_action_pending_);
ZX_ASSERT(last_required_output_constraints_);
ZX_ASSERT(last_output_constraints_);
// We'll snap the last_output_constraints_, which is always at least as
// recent as the last_required_output_constraints_.
snapped_constraints = last_output_constraints_;
ZX_ASSERT(snapped_constraints);
new_output_buffer_lifetime_ordinal = next_output_buffer_lifetime_ordinal_;
next_output_buffer_lifetime_ordinal_ += 2;
} // ~lock
//
// Tell the server about output settings.
//
ZX_ASSERT(snapped_constraints->has_buffer_constraints());
const fuchsia::media::StreamBufferConstraints& buffer_constraints =
snapped_constraints->buffer_constraints();
ZX_ASSERT(buffer_constraints.has_packet_count_for_server_recommended());
uint32_t packet_count_for_server =
buffer_constraints.packet_count_for_server_recommended();
uint32_t packet_count_for_client = std::max(
kMinExtraOutputPacketsForClient,
buffer_constraints.packet_count_for_client_min());
if (packet_count_for_client > buffer_constraints.packet_count_for_client_max()) {
FXL_LOG(FATAL) << "server can't accomodate "
"kMinExtraOutputPacketsForClient - not "
"using server - exiting";
}
uint32_t packet_count;
fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info;
if (!ConfigurePortBufferCollection(
true, new_output_buffer_lifetime_ordinal,
buffer_constraints.buffer_constraints_version_ordinal(),
packet_count_for_server, packet_count_for_client,
&packet_count, &output_buffer_collection_, &buffer_collection_info)) {
FXL_LOG(FATAL) << "ConfigurePortBufferCollection failed (output)";
}
// Configure tracking for output buffers.
{ // scope lock
std::lock_guard<std::mutex> lock(lock_);
all_output_buffers_.reserve(packet_count);
for (uint32_t i = 0; i < packet_count; i++) {
std::unique_ptr<CodecBuffer> buffer = CodecBuffer::CreateFromVmo(
i, std::move(buffer_collection_info.buffers[i].vmo),
buffer_collection_info.buffers[i].vmo_usable_start,
buffer_collection_info.settings.buffer_settings.size_bytes, false);
if (!buffer) {
FXL_LOG(FATAL) << "CodecBuffer::Allocate() failed (output)";
}
ZX_ASSERT(all_output_buffers_.size() == i);
all_output_buffers_.push_back(std::move(buffer));
if (i == packet_count - 1) {
output_free_bits_.resize(packet_count, true);
}
}
current_output_buffer_lifetime_ordinal_ =
new_output_buffer_lifetime_ordinal;
} // ~lock
// We're ready to receive output.
PostToFidlThread([this, output_buffer_lifetime_ordinal = new_output_buffer_lifetime_ordinal]{
if (!codec_) {
return;
}
if (output_buffer_lifetime_ordinal != current_output_buffer_lifetime_ordinal_) {
return;
}
codec_->CompleteOutputBufferPartialSettings(
output_buffer_lifetime_ordinal);
});
{ // scope lock
std::lock_guard<std::mutex> lock(lock_);
// So, now that we're done with that output re-config, it's time to see if
// that re-config was the last one we need to do, or if there's a newer
// config that's action-required.
if (snapped_constraints->buffer_constraints()
.buffer_constraints_version_ordinal() >=
last_required_output_constraints_->buffer_constraints()
.buffer_constraints_version_ordinal()) {
// Good. The client is caught up. The output_config_action_pending_
// can become false here, but may very shortly become true again if
// another OnOutputConstraints() is received after we release the lock
// (roughly speaking; see code).
//
// It's ok that we didn't set output_config_action_pending_ to false
// before sending the last AddOutputBuffer() above, because
// OnOutputConstraints() was still able to update
// last_required_output_config_ as needed, which it's been able to do
// all along during most of this whole method. If we had set to false
// up there, it would probably be less obvious why it works vs. here,
// but either can work.
FXL_VLOG(3) << "output_config_action_pending_ = false, because client "
"caught up";
output_constraints_action_pending_ = false;
// Because this was true for at least pending config reason which we
// are only just clearing immediately above.
ZX_ASSERT(output_pending_);
// There can be output packets by this point so only clear
// output_pending_ if there are also no packets.
if (!ComputeOutputPendingLocked()) {
output_pending_ = false;
}
} else {
// We've received and even more recent constraints that's
// action-required, so go around again without clearing
// output_constraints_action_pending_ or output_pending_. Both remain
// true until we've caught up to a config that's at least as new as the
// last_required_output_constraints_.
FXL_VLOG(3)
<< "output_constraints_action_pending_ remains true because server "
"has sent yet another action-required output constraints";
ZX_ASSERT(output_constraints_action_pending_);
ZX_ASSERT(output_pending_);
}
} // ~lock
}
}
bool CodecClient::ConfigurePortBufferCollection(
bool is_output, uint64_t new_buffer_lifetime_ordinal,
uint64_t buffer_constraints_version_ordinal,
uint32_t packet_count_for_server, uint32_t packet_count_for_client,
uint32_t* out_packet_count,
fuchsia::sysmem::BufferCollectionPtr* out_buffer_collection,
fuchsia::sysmem::BufferCollectionInfo_2* out_buffer_collection_info) {
uint32_t packet_count = packet_count_for_server + packet_count_for_client;
fuchsia::media::StreamBufferPartialSettings settings;
settings.set_buffer_lifetime_ordinal(new_buffer_lifetime_ordinal);
settings.set_buffer_constraints_version_ordinal(
buffer_constraints_version_ordinal);
settings.set_single_buffer_mode(false);
settings.set_packet_count_for_server(packet_count_for_server);
settings.set_packet_count_for_client(packet_count_for_client);
fuchsia::sysmem::BufferCollectionSyncPtr buffer_collection;
fidl::InterfaceHandle<fuchsia::sysmem::BufferCollectionToken> codec_sysmem_token;
if (!CreateAndSyncBufferCollection(&buffer_collection, &codec_sysmem_token)) {
FXL_LOG(FATAL) << "CreateAndSyncBufferCollection failed (output)";
return false;
}
settings.set_sysmem_token(std::move(codec_sysmem_token));
fuchsia::sysmem::BufferCollectionConstraints constraints;
constraints.usage.cpu = is_output ? fuchsia::sysmem::cpuUsageReadOften : fuchsia::sysmem::cpuUsageWriteOften;
// TODO(dustingreen): Make this more flexible once we're more flexible on
// frame_count on output of decoder.
constraints.min_buffer_count_for_camping = packet_count_for_client;
ZX_DEBUG_ASSERT(constraints.min_buffer_count_for_dedicated_slack == 0);
ZX_DEBUG_ASSERT(constraints.min_buffer_count_for_shared_slack == 0);
// 0 is treated as 0xFFFFFFFF.
ZX_DEBUG_ASSERT(constraints.max_buffer_count == 0);
constraints.has_buffer_memory_constraints = true;
// Sysmem has a built-in min_size_bytes of 1, so no need to really constrain
// min_size_bytes here.
ZX_DEBUG_ASSERT(constraints.buffer_memory_constraints.min_size_bytes == 0);
constraints.buffer_memory_constraints.max_size_bytes =
std::numeric_limits<uint32_t>::max();
constraints.buffer_memory_constraints.physically_contiguous_required = false;
constraints.buffer_memory_constraints.secure_required = false;
// This test client code has no way to produce or consume output data in
// protected memory.
constraints.buffer_memory_constraints.secure_permitted = false;
// Despite being a consumer of output uncompressed video frames (when
// decoding video and is_output), for now we intentionally don't constrain to
// the PixelFormatType(s) that we can consume, and instead fail later if we
// get something unexpected on output. That's just easier than plumbing
// PixelFormatType(s) to here for now.
ZX_DEBUG_ASSERT(constraints.image_format_constraints_count == 0);
PostToFidlThread([this, is_output, settings = std::move(settings)]() mutable {
if (!codec_) {
return;
}
if (is_output) {
codec_->SetOutputBufferPartialSettings(std::move(settings));
} else {
codec_->SetInputBufferPartialSettings(std::move(settings));
}
});
buffer_collection->SetConstraints(true, std::move(constraints));
fuchsia::sysmem::BufferCollectionInfo_2 buffer_collection_info;
// This borrows buffer_collection during the call.
if (!WaitForSysmemBuffersAllocated(&buffer_collection,
&buffer_collection_info)) {
FXL_LOG(FATAL) << "WaitForSysmemBuffersAllocated failed";
return false;
}
packet_count = std::max(
packet_count, buffer_collection_info.buffer_count);
fuchsia::sysmem::BufferCollectionPtr buffer_collection_ptr;
// For the Bind() we probably don't strictly need to be on FIDL thread, so
// do the Bind() here. This does mean we need to set the error handler
// before the Bind() however.
buffer_collection_ptr.set_error_handler([is_output](zx_status_t status){
FXL_LOG(FATAL) << "BufferCollection failed - status: " << status << " is_output: " << is_output;
});
// This implicitly converts buffer_collection from
// fidl::SynchronousInterfacePtr<> to fidl::InterfaceHandle<>, which is
// what we want; we're moving handling of the BufferCollection from this
// thread to the FIDL thread.
zx_status_t bind_status = buffer_collection_ptr.Bind(
std::move(buffer_collection), dispatcher_);
if (bind_status != ZX_OK) {
FXL_LOG(FATAL) << "buffer_collection_ptr.Bind() failed - status: " << bind_status << " is_output: " << is_output;
return false;
}
*out_packet_count = packet_count;
*out_buffer_collection = std::move(buffer_collection_ptr);
*out_buffer_collection_info = std::move(buffer_collection_info);
return true;
}
void CodecClient::RecycleOutputPacket(
fuchsia::media::PacketHeader free_packet) {
ZX_ASSERT(free_packet.has_packet_index());
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
output_free_bits_[free_packet.packet_index()] = true;
} // ~lock
async::PostTask(dispatcher_,
[this, free_packet = std::move(free_packet)]() mutable {
codec_->RecycleOutputPacket(std::move(free_packet));
});
}
void CodecClient::OnOutputConstraints(
fuchsia::media::StreamOutputConstraints output_constraints) {
bool output_pending_notify_needed = false;
// Not that the std::move() actaully helps here, but that's what we're doing.
std::shared_ptr<fuchsia::media::StreamOutputConstraints> shared_constraints =
std::make_shared<fuchsia::media::StreamOutputConstraints>(
std::move(output_constraints));
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
if (!shared_constraints->has_stream_lifetime_ordinal()) {
FXL_LOG(FATAL) << "StreamOutputConstraints missing stream_lifetime_ordinal";
}
uint64_t stream_lifetime_ordinal =
shared_constraints->stream_lifetime_ordinal();
TrackOutputStreamLifetimeOrdinal(stream_lifetime_ordinal);
ZX_ASSERT(!last_output_constraints_ ||
(last_output_constraints_->has_buffer_constraints() &&
last_output_constraints_->buffer_constraints()
.has_buffer_constraints_version_ordinal()));
uint64_t previous_buffer_constraints_version_ordinal =
last_output_constraints_ ? last_output_constraints_->buffer_constraints()
.buffer_constraints_version_ordinal()
: 0;
ZX_ASSERT(shared_constraints->has_buffer_constraints() &&
shared_constraints->buffer_constraints()
.has_buffer_constraints_version_ordinal());
if (shared_constraints->buffer_constraints()
.buffer_constraints_version_ordinal() <
previous_buffer_constraints_version_ordinal) {
FXL_LOG(FATAL)
<< "broken server sent badly ordered buffer constraints ordinals";
}
if ((shared_constraints->has_buffer_constraints_action_required() &&
shared_constraints->buffer_constraints_action_required()) &&
shared_constraints->buffer_constraints()
.buffer_constraints_version_ordinal() <=
previous_buffer_constraints_version_ordinal) {
FXL_LOG(FATAL)
<< "broken server sent buffer_constraints_action_required without "
"increasingbuffer_constraints_version_ordinal";
}
last_output_constraints_ = shared_constraints;
FXL_VLOG(3) << "OnOutputConstraints buffer_constraints_version_ordinal: "
<< shared_constraints->buffer_constraints()
.buffer_constraints_version_ordinal()
<< "buffer_constraints_action_required: "
<< shared_constraints->buffer_constraints_action_required();
if (shared_constraints->buffer_constraints_action_required()) {
last_required_output_constraints_ = shared_constraints;
// A client is allowed to forget the output format on any action required
// buffer constraints, so forget here.
last_output_format_ = nullptr;
FXL_VLOG(3) << "output_config_action_pending_ = true, because received a "
"buffer_constraints_action_required constraints\n";
output_constraints_action_pending_ = true;
if (!output_pending_) {
output_pending_ = true;
output_pending_notify_needed = true;
}
}
} // ~lock
if (output_pending_notify_needed) {
output_pending_condition_.notify_all();
}
}
void CodecClient::OnOutputFormat(
fuchsia::media::StreamOutputFormat output_format) {
// We don't need to notify output_pending_condition_ since in contrast to
// constraints we don't need to take action, and nobody cares about the format
// until the next packet arrives.
std::shared_ptr<fuchsia::media::StreamOutputFormat> shared_format =
std::make_shared<fuchsia::media::StreamOutputFormat>(
std::move(output_format));
std::unique_lock<std::mutex> lock(lock_);
if (!shared_format->has_stream_lifetime_ordinal()) {
FXL_LOG(FATAL) << "StreamOutputFormat missing stream_lifetime_ordinal";
}
uint64_t stream_lifetime_ordinal =
shared_format->stream_lifetime_ordinal();
TrackOutputStreamLifetimeOrdinal(stream_lifetime_ordinal);
if (is_format_since_last_packet_) {
// A server can easily elide unnecessary OnOutputFormat by not sending
// OnOutputFormat until immediately before OnOutputPacket. The format is
// logically part of each output packet, with the optimization that we only
// send output format when it changes, in between packets, to avoid needing
// to send output format with every packet.
FXL_LOG(FATAL)
<< "broken server sent two OnOutputFormat() in a row";
}
if (!shared_format->has_stream_lifetime_ordinal()) {
FXL_LOG(FATAL) << "OnOutputFormat !has_stream_lifetime_ordinal()";
}
if (!shared_format->has_format_details()) {
FXL_LOG(FATAL) << "OnOutputFormat !has_format_details()";
}
last_output_format_ = shared_format;
is_format_since_last_packet_ = true;
}
void CodecClient::OnOutputPacket(fuchsia::media::Packet output_packet,
bool error_detected_before,
bool error_detected_during) {
FXL_CHECK(output_packet.has_header());
FXL_CHECK(output_packet.has_stream_lifetime_ordinal());
FXL_CHECK(output_packet.header().has_packet_index());
bool output_pending_notify_needed = false;
std::unique_ptr<const fuchsia::media::Packet> local_packet =
std::make_unique<fuchsia::media::Packet>(std::move(output_packet));
uint32_t packet_index = local_packet->header().packet_index();
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
if (!local_packet->has_stream_lifetime_ordinal()) {
FXL_LOG(FATAL) << "Packet missing stream_lifetime_ordinal";
}
uint64_t stream_lifetime_ordinal =
local_packet->stream_lifetime_ordinal();
TrackOutputStreamLifetimeOrdinal(stream_lifetime_ordinal);
if (!last_output_format_ || last_output_format_->stream_lifetime_ordinal() != stream_lifetime_ordinal) {
FXL_LOG(FATAL) << "OnOutputFormat required before OnOutputPacket, per-stream";
}
std::unique_ptr<CodecOutput> output = std::make_unique<CodecOutput>(
stream_lifetime_ordinal, last_output_constraints_, last_output_format_,
std::move(local_packet), false);
if (output_constraints_action_pending_) {
// FWIW, we wouldn't be able to detect this if we were using the
// async::Loop thread to perform output buffer re-configuration.
FXL_LOG(FATAL) << "server incorrectly sent output packet while required "
"constraints change pending";
}
if (!output_free_bits_[packet_index]) {
// The packet was emitted twice by the server without it becoming free in
// between, which is broken server behavior.
FXL_LOG(FATAL)
<< "server incorrectly emitted an output packet without it becoming "
"free in between";
}
// Emitted by server, so not free until later when we send it back to server
// with RecycleOutputPacket(), or until we re-configure output buffers in
// which case all the output packets start free with the server.
output_free_bits_[packet_index] = false;
emitted_output_.push_back(std::move(output));
is_format_since_last_packet_ = false;
if (!output_pending_) {
output_pending_ = true;
output_pending_notify_needed = true;
}
} // ~lock
if (output_pending_notify_needed) {
output_pending_condition_.notify_all();
}
}
void CodecClient::OnOutputEndOfStream(uint64_t stream_lifetime_ordinal,
bool error_detected_before) {
bool output_pending_notify_needed = false;
std::unique_ptr<CodecOutput> output = std::make_unique<CodecOutput>(
stream_lifetime_ordinal, nullptr, nullptr, nullptr, true);
{ // scope lock
std::unique_lock<std::mutex> lock(lock_);
if (output_constraints_action_pending_) {
// FWIW, we wouldn't be able to detect this if we were using the
// async::Loop thread to perform output buffer re-configuration.
FXL_LOG(FATAL) << "server incorrectly sent OnOutputEndOfStream() while "
"required constraints change pending";
}
emitted_output_.push_back(std::move(output));
if (!output_pending_) {
output_pending_ = true;
output_pending_notify_needed = true;
}
} // ~lock
if (output_pending_notify_needed) {
output_pending_condition_.notify_all();
}
}
void CodecClient::OnStreamFailed(uint64_t stream_lifetime_ordinal) {
ZX_ASSERT(false && "not implemented");
}