// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/bin/mediaplayer/graph/nodes/node.h"
#include <lib/async/cpp/task.h>
#include "garnet/bin/mediaplayer/graph/formatting.h"
namespace media_player {
namespace {
// Determines whether the input is connected and the |PayloadManager| for its
// connection is ready. If so, notifies the node associated with the connected
// output and returns true. If not, returns false.
bool NotifyConnectionReady(const Input& input) {
if (!input.connected()) {
return false;
}
if (!input.payload_manager().ready()) {
return false;
}
Output* output = input.mate();
FXL_DCHECK(output);
FXL_DCHECK(output->node());
output->node()->NotifyOutputConnectionReady(output->index());
return true;
}
// Determines whether the output is connected and the |PayloadManager| for its
// connection is ready. If so, notifies the node associated with the connected
// input and returns true. If not, returns false.
bool NotifyConnectionReady(const Output& output) {
if (!output.connected()) {
return false;
}
Input* input = output.mate();
FXL_DCHECK(input);
if (!input->payload_manager().ready()) {
return false;
}
FXL_DCHECK(input->node());
input->node()->NotifyInputConnectionReady(input->index());
return true;
}
} // namespace
Node::Node() { update_counter_ = 0; }
Node::~Node() {}
const char* Node::label() const { return "<not labelled>"; }
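// Discards all tasks currently queued on this node. Queued tasks are simply
// dropped; tasks already posted directly to the dispatcher (e.g. via
// |PostShutdownTask|) are unaffected.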
void Node::ShutDown() {
std::lock_guard<std::mutex> locker(tasks_mutex_);
while (!tasks_.empty()) {
tasks_.pop();
}
}
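// Schedules a call to |Update| on the dispatcher unless an update is already
// pending or in progress, in which case the pending/running |UpdateUntilDone|
// picks up the new request. Uses an atomic counter, so this may be called
// from any thread.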
void Node::NeedsUpdate() {
// Atomically preincrement the update counter. If we get the value 1, the
// counter was zero, and we need to post an update. If we get anything else,
// |UpdateUntilDone| is already posted or running. In that case, it will run
// |Update| again after this increment, so there's nothing more to do here.
if (++update_counter_ == 1) {
// This node has no update pending in the task queue or running.
PostTask([this]() { UpdateUntilDone(); });
}
}
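// Calls |Update| repeatedly until no further update requests have arrived via
// |NeedsUpdate| during the most recent call, then resets the update counter
// to zero so the next |NeedsUpdate| posts a new task.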
void Node::UpdateUntilDone() {
while (true) {
// Set the counter to 1. If it's still 1 after |Update| returns, we're done.
// Otherwise, |NeedsUpdate| was called during the update, and we need to
// update again.
update_counter_ = 1;
Update();
// Quit if the counter is still at 1, otherwise update again.
uint32_t expected = 1;
if (update_counter_.compare_exchange_strong(expected, 0)) {
break;
}
}
}
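// Suspends execution of tasks queued on this node. |callback| is called from
// the dispatcher once all previously queued tasks have run and the suspension
// is in effect; tasks queued afterward are held until |Release| is called.
// A hypothetical caller might look like this (sketch only; |node| is assumed
// to be a std::shared_ptr<Node> obtained elsewhere):
//
//   node->Acquire([node]() {
//     // No node tasks will run past this point until |Release| is called.
//     node->Release();
//   });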
void Node::Acquire(fit::closure callback) {
PostTask([this, callback = std::move(callback)]() {
{
std::lock_guard<std::mutex> locker(tasks_mutex_);
tasks_suspended_ = true;
}
callback();
});
}
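// Resumes execution of tasks suspended by |Acquire|. If tasks accumulated
// while suspended, posts |RunTasks| to the dispatcher to drain them.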
void Node::Release() {
{
std::lock_guard<std::mutex> locker(tasks_mutex_);
tasks_suspended_ = false;
if (tasks_.empty()) {
// Don't need to run tasks.
return;
}
}
FXL_DCHECK(dispatcher_);
async::PostTask(dispatcher_, [shared_this = shared_from_this()]() {
shared_this->RunTasks();
});
}
void Node::SetDispatcher(async_dispatcher_t* dispatcher) {
FXL_DCHECK(dispatcher);
dispatcher_ = dispatcher;
}
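// Queues |task| to run on the dispatcher. Tasks run one at a time in the
// order posted. |RunTasks| is posted only when the queue transitions from
// empty to non-empty and tasks aren't suspended, so a drain isn't posted
// redundantly while one is already pending.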
void Node::PostTask(fit::closure task) {
FXL_DCHECK(task);
{
std::lock_guard<std::mutex> locker(tasks_mutex_);
tasks_.push(std::move(task));
if (tasks_.size() != 1 || tasks_suspended_) {
// Don't need to run tasks, either because there were already tasks in
// the queue or because task execution is suspended.
return;
}
}
FXL_DCHECK(dispatcher_);
async::PostTask(dispatcher_, [shared_this = shared_from_this()]() {
shared_this->RunTasks();
});
}
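// Posts |task| directly to the dispatcher, bypassing the node's task queue,
// so it runs even if queued tasks are suspended or have been discarded by
// |ShutDown|. The node is kept alive via |shared_from_this| until the task
// runs.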
void Node::PostShutdownTask(fit::closure task) {
FXL_DCHECK(dispatcher_);
async::PostTask(dispatcher_, [shared_this = shared_from_this(),
task = std::move(task)]() { task(); });
}
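// Runs queued tasks on the dispatcher thread until the queue is empty or task
// execution is suspended. The mutex is released while each task runs, so a
// task may call |PostTask| without deadlocking.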
void Node::RunTasks() {
tasks_mutex_.lock();
while (!tasks_.empty() && !tasks_suspended_) {
fit::closure task = std::move(tasks_.front());
tasks_mutex_.unlock();
task();
// The closure may be keeping objects alive. Destroy it here so those
// objects are destroyed while the mutex is unlocked. It's safe to run and
// destroy the task while it's still at the front of the queue, because this
// method is the only consumer of tasks from the queue, and it will not be
// re-entered.
task = nullptr;
tasks_mutex_.lock();
tasks_.pop();
}
tasks_mutex_.unlock();
}
void Node::Dump(std::ostream& os) const {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
if (inputs_.size() == 1) {
os << fostr::NewLine << "input:";
DumpInputDetail(os, inputs_[0]);
} else if (inputs_.size() > 1) {
os << fostr::NewLine << "inputs:";
int index = 0;
for (auto& input : inputs_) {
os << fostr::NewLine << "[" << index++ << "] ";
DumpInputDetail(os, input);
}
}
if (outputs_.size() == 1) {
os << fostr::NewLine << "output:";
DumpOutputDetail(os, outputs_[0]);
} else if (outputs_.size() > 1) {
os << fostr::NewLine << "outputs:";
int index = 0;
for (auto& output : outputs_) {
os << fostr::NewLine << "[" << index++ << "] ";
DumpOutputDetail(os, output);
}
}
}
void Node::DumpInputDetail(std::ostream& os, const Input& input) const {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
os << fostr::Indent;
if (input.connected()) {
os << fostr::NewLine << "connected to: " << *input.mate();
} else {
os << fostr::NewLine << "connected to: <nothing>";
}
os << fostr::NewLine << "payload config: " << input.payload_config();
os << fostr::NewLine << "payload manager: ";
input.payload_manager().Dump(os);
os << fostr::NewLine << "needs packet: " << input.needs_packet();
os << fostr::NewLine << "packet: " << input.packet();
os << fostr::Outdent;
}
void Node::DumpOutputDetail(std::ostream& os, const Output& output) const {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
os << fostr::Indent;
if (output.connected()) {
os << fostr::NewLine << "connected to: " << *output.mate();
} else {
os << fostr::NewLine << "connected to: <nothing>";
}
os << fostr::NewLine << "payload config: " << output.payload_config();
if (output.connected()) {
os << fostr::NewLine << "needs packet: " << output.needs_packet();
}
std::lock_guard<std::mutex> locker(packets_per_output_mutex_);
auto& packets = packets_per_output_[output.index()];
if (!packets.empty()) {
os << fostr::NewLine << "queued packets:" << fostr::Indent;
for (auto& packet : packets) {
os << fostr::NewLine << packet;
}
os << fostr::Outdent;
}
os << fostr::Outdent;
}
size_t Node::input_count() const {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
return inputs_.size();
}
Input& Node::input(size_t input_index) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
FXL_DCHECK(input_index < inputs_.size());
return inputs_[input_index];
}
size_t Node::output_count() const {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
return outputs_.size();
}
Output& Node::output(size_t output_index) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
FXL_DCHECK(output_index < outputs_.size());
return outputs_[output_index];
}
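// Called, possibly from an arbitrary thread, when the payload manager for the
// connection on the indicated input becomes ready. Posts a task so
// |OnInputConnectionReady| runs on this node's thread.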
void Node::NotifyInputConnectionReady(size_t index) {
FXL_DCHECK(index < inputs_.size());
PostTask([this, index]() {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
OnInputConnectionReady(index);
});
}
void Node::NotifyOutputConnectionReady(size_t index) {
FXL_DCHECK(index < outputs_.size());
PostTask([this, index]() {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
OnOutputConnectionReady(index);
});
}
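// Moves any packets held on inputs into the node via |PutInputPacket|, then
// satisfies demand on connected outputs from the per-output packet queues.
// If any connected output has unmet demand, |RequestOutputPacket| is called
// once at the end.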
void Node::Update() {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
for (auto& input : inputs_) {
if (input.packet()) {
PutInputPacket(input.TakePacket(false), input.index());
}
}
bool request_packet = false;
for (auto& output : outputs_) {
if (!output.connected()) {
continue;
}
PacketPtr packet_to_supply;
if (MaybeTakePacketForOutput(output, &packet_to_supply)) {
request_packet = true;
}
if (packet_to_supply) {
output.SupplyPacket(std::move(packet_to_supply));
}
}
if (request_packet) {
RequestOutputPacket();
}
}
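// If |output| needs a packet, takes one from the output's queue into
// |*packet_out| when available. Returns true if and only if the output needs
// a packet and none was queued, indicating the caller should request more
// output from the node.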
bool Node::MaybeTakePacketForOutput(const Output& output,
PacketPtr* packet_out) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
FXL_DCHECK(packet_out);
if (!output.needs_packet()) {
return false;
}
bool request_packet = false;
std::lock_guard<std::mutex> locker(packets_per_output_mutex_);
std::deque<PacketPtr>& packets = packets_per_output_[output.index()];
if (packets.empty()) {
// The output needs a packet and has no packets queued. Request another
// packet so we can meet the demand.
request_packet = true;
} else {
// The output has demand and packets queued.
*packet_out = std::move(packets.front());
packets.pop_front();
}
return request_packet;
}
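// Flushes the indicated input: flushes the |Input| object itself, calls the
// node's |FlushInput|, and runs |callback| as a node task once the flush
// completes.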
void Node::FlushInputExternal(size_t input_index, bool hold_frame,
fit::closure callback) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
FXL_DCHECK(input_index < inputs_.size());
inputs_[input_index].Flush();
FlushInput(hold_frame, input_index,
[this, callback = std::move(callback)]() mutable {
PostTask(std::move(callback));
});
}
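// Flushes the indicated output: calls the node's |FlushOutput|, then discards
// any packets queued for the output and runs |callback| as a node task once
// the flush completes.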
void Node::FlushOutputExternal(size_t output_index, fit::closure callback) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
FXL_DCHECK(output_index < outputs_.size());
FlushOutput(output_index,
[this, output_index, callback = std::move(callback)]() mutable {
{
std::lock_guard<std::mutex> locker(packets_per_output_mutex_);
auto& packets = packets_per_output_[output_index];
while (!packets.empty()) {
packets.pop_front();
}
}
PostTask(std::move(callback));
});
}
void Node::ConfigureInputDeferred(size_t input_index) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
EnsureInput(input_index);
}
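// The ConfigureInputTo*/ConfigureOutputTo* methods below fill in the
// |PayloadConfig| for the indicated input or output and apply it to the
// relevant |PayloadManager| where possible. Each returns true if the
// connection exists and its payload manager is now ready, in which case the
// node at the other end of the connection is notified (see
// |NotifyConnectionReady| above).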
bool Node::ConfigureInputToUseLocalMemory(uint64_t max_aggregate_payload_size,
uint32_t max_payload_count,
size_t input_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(max_aggregate_payload_size != 0 || max_payload_count != 0);
EnsureInput(input_index);
Input& input = inputs_[input_index];
PayloadConfig& config = input.payload_config();
config.mode_ = PayloadMode::kUsesLocalMemory;
config.max_aggregate_payload_size_ = max_aggregate_payload_size;
config.max_payload_count_ = max_payload_count;
config.max_payload_size_ = 0;
config.vmo_allocation_ = VmoAllocation::kNotApplicable;
config.physically_contiguous_ = false;
input.payload_manager().ApplyInputConfiguration(config, zx::handle(),
nullptr);
return NotifyConnectionReady(input);
}
bool Node::ConfigureInputToUseVmos(
uint64_t max_aggregate_payload_size, uint32_t max_payload_count,
uint64_t max_payload_size, VmoAllocation vmo_allocation,
bool physically_contiguous, zx::handle bti_handle,
AllocateCallback allocate_callback, size_t input_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(max_aggregate_payload_size != 0 || max_payload_count != 0);
FXL_DCHECK(physically_contiguous == bti_handle.is_valid());
EnsureInput(input_index);
Input& input = inputs_[input_index];
PayloadConfig& config = input.payload_config();
config.mode_ = PayloadMode::kUsesVmos;
config.max_aggregate_payload_size_ = max_aggregate_payload_size;
config.max_payload_count_ = max_payload_count;
config.max_payload_size_ = max_payload_size;
config.vmo_allocation_ = vmo_allocation;
config.physically_contiguous_ = physically_contiguous;
input.payload_manager().ApplyInputConfiguration(config, std::move(bti_handle),
std::move(allocate_callback));
return NotifyConnectionReady(input);
}
bool Node::ConfigureInputToProvideVmos(VmoAllocation vmo_allocation,
bool physically_contiguous,
AllocateCallback allocate_callback,
size_t input_index) {
// This method runs on an arbitrary thread.
EnsureInput(input_index);
Input& input = inputs_[input_index];
PayloadConfig& config = input.payload_config();
config.mode_ = PayloadMode::kProvidesVmos;
config.max_aggregate_payload_size_ = 0;
config.max_payload_count_ = 0;
config.max_payload_size_ = 0;
config.vmo_allocation_ = vmo_allocation;
config.physically_contiguous_ = physically_contiguous;
input.payload_manager().ApplyInputConfiguration(config, zx::handle(),
std::move(allocate_callback));
return NotifyConnectionReady(input);
}
bool Node::InputConnectionReady(size_t input_index) const {
FXL_DCHECK(input_index < inputs_.size());
return inputs_[input_index].payload_manager().ready();
}
const PayloadVmos& Node::UseInputVmos(size_t input_index) const {
// This method runs on an arbitrary thread.
FXL_DCHECK(input_index < inputs_.size());
const Input& input = inputs_[input_index];
FXL_DCHECK(input.payload_config().mode_ == PayloadMode::kUsesVmos ||
input.payload_config().mode_ == PayloadMode::kProvidesVmos);
FXL_DCHECK(input.payload_manager().ready());
return input.payload_manager().input_vmos();
}
PayloadVmoProvision& Node::ProvideInputVmos(size_t input_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(input_index < inputs_.size());
Input& input = inputs_[input_index];
FXL_DCHECK(input.payload_config().mode_ == PayloadMode::kProvidesVmos);
FXL_DCHECK(input.payload_manager().ready());
return input.payload_manager().input_external_vmos();
}
void Node::RequestInputPacket(size_t input_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(input_index < inputs_.size());
inputs_[input_index].RequestPacket();
}
void Node::ConfigureOutputDeferred(size_t output_index) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
EnsureOutput(output_index);
}
bool Node::ConfigureOutputToUseLocalMemory(uint64_t max_aggregate_payload_size,
uint32_t max_payload_count,
uint64_t max_payload_size,
size_t output_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(max_aggregate_payload_size != 0 ||
(max_payload_count != 0 && max_payload_size != 0));
EnsureOutput(output_index);
Output& output = outputs_[output_index];
PayloadConfig& config = output.payload_config();
config.mode_ = PayloadMode::kUsesLocalMemory;
config.max_aggregate_payload_size_ = max_aggregate_payload_size;
config.max_payload_count_ = max_payload_count;
config.max_payload_size_ = max_payload_size;
config.vmo_allocation_ = VmoAllocation::kNotApplicable;
config.physically_contiguous_ = false;
if (output.connected()) {
output.mate()->payload_manager().ApplyOutputConfiguration(config,
zx::handle());
}
return NotifyConnectionReady(output);
}
bool Node::ConfigureOutputToProvideLocalMemory(size_t output_index) {
// This method runs on an arbitrary thread.
EnsureOutput(output_index);
Output& output = outputs_[output_index];
PayloadConfig& config = output.payload_config();
config.mode_ = PayloadMode::kProvidesLocalMemory;
config.max_aggregate_payload_size_ = 0;
config.max_payload_count_ = 0;
config.max_payload_size_ = 0;
config.vmo_allocation_ = VmoAllocation::kNotApplicable;
config.physically_contiguous_ = false;
if (output.connected()) {
output.mate()->payload_manager().ApplyOutputConfiguration(config,
zx::handle());
}
return NotifyConnectionReady(output);
}
bool Node::ConfigureOutputToUseVmos(
uint64_t max_aggregate_payload_size, uint32_t max_payload_count,
uint64_t max_payload_size, VmoAllocation vmo_allocation,
bool physically_contiguous, zx::handle bti_handle, size_t output_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(max_aggregate_payload_size != 0 ||
(max_payload_count != 0 && max_payload_size != 0));
FXL_DCHECK(physically_contiguous == bti_handle.is_valid());
EnsureOutput(output_index);
Output& output = outputs_[output_index];
PayloadConfig& config = output.payload_config();
config.mode_ = PayloadMode::kUsesVmos;
config.max_aggregate_payload_size_ = max_aggregate_payload_size;
config.max_payload_count_ = max_payload_count;
config.max_payload_size_ = max_payload_size;
config.vmo_allocation_ = vmo_allocation;
config.physically_contiguous_ = physically_contiguous;
if (output.connected()) {
output.mate()->payload_manager().ApplyOutputConfiguration(
config, std::move(bti_handle));
} else {
// TODO: stash the bti handle
}
return NotifyConnectionReady(output);
}
bool Node::ConfigureOutputToProvideVmos(VmoAllocation vmo_allocation,
bool physically_contiguous,
size_t output_index) {
// This method runs on an arbitrary thread.
EnsureOutput(output_index);
Output& output = outputs_[output_index];
PayloadConfig& config = output.payload_config();
config.mode_ = PayloadMode::kProvidesVmos;
config.max_aggregate_payload_size_ = 0;
config.max_payload_count_ = 0;
config.max_payload_size_ = 0;
config.vmo_allocation_ = vmo_allocation;
config.physically_contiguous_ = physically_contiguous;
if (output.connected()) {
output.mate()->payload_manager().ApplyOutputConfiguration(config,
zx::handle());
}
return NotifyConnectionReady(output);
}
bool Node::OutputConnectionReady(size_t output_index) const {
FXL_DCHECK(output_index < outputs_.size());
return outputs_[output_index].mate()->payload_manager().ready();
}
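// Allocates a payload buffer of |size| bytes from the payload manager for the
// indicated output's connection. The output must be configured, connected,
// and ready.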
fbl::RefPtr<PayloadBuffer> Node::AllocatePayloadBuffer(uint64_t size,
size_t output_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(output_index < outputs_.size());
Output& output = outputs_[output_index];
FXL_DCHECK(output.payload_config().mode_ != PayloadMode::kNotConfigured);
FXL_DCHECK(output.connected());
FXL_DCHECK(output.mate()->payload_manager().ready());
return output.mate()->payload_manager().AllocatePayloadBufferForOutput(size);
}
const PayloadVmos& Node::UseOutputVmos(size_t output_index) const {
// This method runs on an arbitrary thread.
FXL_DCHECK(output_index < outputs_.size());
const Output& output = outputs_[output_index];
FXL_DCHECK(output.payload_config().mode_ == PayloadMode::kUsesVmos ||
output.payload_config().mode_ == PayloadMode::kProvidesVmos);
FXL_DCHECK(output.connected());
FXL_DCHECK(output.mate()->payload_manager().ready());
return output.mate()->payload_manager().output_vmos();
}
PayloadVmoProvision& Node::ProvideOutputVmos(size_t output_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(output_index < outputs_.size());
Output& output = outputs_[output_index];
FXL_DCHECK(output.payload_config().mode_ == PayloadMode::kProvidesVmos);
FXL_DCHECK(output.connected());
FXL_DCHECK(output.mate()->payload_manager().ready());
return output.mate()->payload_manager().output_external_vmos();
}
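// Queues |packet| on the indicated output (or discards it if the output isn't
// connected) and schedules an update so |Update| can supply it downstream.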
void Node::PutOutputPacket(PacketPtr packet, size_t output_index) {
// This method runs on an arbitrary thread.
FXL_DCHECK(packet);
FXL_DCHECK(output_index < outputs_.size());
// Queue the packet if the output is connected, otherwise discard the
// packet.
if (outputs_[output_index].connected()) {
std::lock_guard<std::mutex> locker(packets_per_output_mutex_);
packets_per_output_[output_index].push_back(std::move(packet));
}
NeedsUpdate();
}
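// Grows |inputs_| as needed so that |input_index| is a valid index.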
void Node::EnsureInput(size_t input_index) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
while (inputs_.size() <= input_index) {
inputs_.emplace_back(this, inputs_.size()); // node, index
}
}
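// Grows |outputs_| and |packets_per_output_| as needed so that |output_index|
// is a valid index into both.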
void Node::EnsureOutput(size_t output_index) {
FXL_DCHECK_CREATION_THREAD_IS_CURRENT(thread_checker_);
while (outputs_.size() <= output_index) {
outputs_.emplace_back(this, outputs_.size()); // node, index
}
std::lock_guard<std::mutex> locker(packets_per_output_mutex_);
packets_per_output_.resize(output_index + 1);
}
} // namespace media_player