blob: 6a9b731ab74c8f6d2258d0f848673e595885c546 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/bin/mediaplayer/graph/graph.h"
#include "garnet/bin/mediaplayer/graph/formatting.h"
#include "garnet/bin/mediaplayer/util/callback_joiner.h"
#include "garnet/bin/mediaplayer/util/threadsafe_callback_joiner.h"
namespace media_player {
Graph::Graph(async_dispatcher_t* dispatcher) : dispatcher_(dispatcher) {}
Graph::~Graph() { Reset(); }
NodeRef Graph::Add(std::shared_ptr<Node> node) {
FXL_DCHECK(node);
FXL_DCHECK(dispatcher_);
node->SetDispatcher(dispatcher_);
node->ConfigureConnectors();
nodes_.push_back(node);
if (node->input_count() == 0) {
sources_.push_back(node.get());
}
if (node->output_count() == 0) {
sinks_.push_back(node.get());
}
return NodeRef(node.get());
}
void Graph::RemoveNode(NodeRef node_ref) {
FXL_DCHECK(node_ref);
Node* node = node_ref.node_;
size_t input_count = node->input_count();
for (size_t input_index = 0; input_index < input_count; input_index++) {
Input& input = node->input(input_index);
if (input.connected()) {
DisconnectInput(InputRef(&input));
}
}
size_t output_count = node->output_count();
for (size_t output_index = 0; output_index < output_count; output_index++) {
Output& output = node->output(output_index);
if (output.connected()) {
DisconnectOutput(OutputRef(&output));
}
}
sources_.remove(node);
sinks_.remove(node);
nodes_.remove(node->shared_from_this());
}
NodeRef Graph::Connect(const OutputRef& output_ref, const InputRef& input_ref) {
FXL_DCHECK(output_ref);
FXL_DCHECK(input_ref);
if (output_ref.connected()) {
DisconnectOutput(output_ref);
}
if (input_ref.connected()) {
DisconnectInput(input_ref);
}
Output& output = *output_ref.actual();
Input& input = *input_ref.actual();
input.Connect(&output);
// This call might apply the output configuration to the payload manager.
output.Connect(&input);
// If the payload manager is ready, notify the nodes.
if (input.payload_manager().ready()) {
input.node()->NotifyInputConnectionReady(input.index());
output.node()->NotifyOutputConnectionReady(output.index());
}
return input_ref.node();
}
NodeRef Graph::ConnectNodes(NodeRef upstream_node, NodeRef downstream_node) {
FXL_DCHECK(upstream_node);
FXL_DCHECK(downstream_node);
Connect(upstream_node.output(), downstream_node.input());
return downstream_node;
}
NodeRef Graph::ConnectOutputToNode(const OutputRef& output,
NodeRef downstream_node) {
FXL_DCHECK(output);
FXL_DCHECK(downstream_node);
Connect(output, downstream_node.input());
return downstream_node;
}
NodeRef Graph::ConnectNodeToInput(NodeRef upstream_node,
const InputRef& input) {
FXL_DCHECK(upstream_node);
FXL_DCHECK(input);
Connect(upstream_node.output(), input);
return input.node();
}
void Graph::DisconnectOutput(const OutputRef& output) {
FXL_DCHECK(output);
if (!output.connected()) {
return;
}
Output* actual_output = output.actual();
FXL_DCHECK(actual_output);
Input* mate = actual_output->mate();
FXL_DCHECK(mate);
mate->Disconnect();
actual_output->Disconnect();
}
void Graph::DisconnectInput(const InputRef& input) {
FXL_DCHECK(input);
if (!input.connected()) {
return;
}
Input* actual_input = input.actual();
FXL_DCHECK(actual_input);
Output* mate = actual_input->mate();
FXL_DCHECK(mate);
mate->Disconnect();
actual_input->Disconnect();
}
void Graph::RemoveNodesConnectedToNode(NodeRef node) {
FXL_DCHECK(node);
std::deque<NodeRef> to_remove{node};
while (!to_remove.empty()) {
NodeRef node = to_remove.front();
to_remove.pop_front();
for (size_t i = 0; i < node.input_count(); ++i) {
if (node.input(i).connected()) {
to_remove.push_back(node.input(i).mate().node());
}
}
for (size_t i = 0; i < node.output_count(); ++i) {
if (node.output(i).connected()) {
to_remove.push_back(node.output(i).mate().node());
}
}
RemoveNode(node);
}
}
void Graph::RemoveNodesConnectedToOutput(const OutputRef& output) {
FXL_DCHECK(output);
if (!output.connected()) {
return;
}
NodeRef downstream_node = output.mate().node();
DisconnectOutput(output);
RemoveNodesConnectedToNode(downstream_node);
}
void Graph::RemoveNodesConnectedToInput(const InputRef& input) {
FXL_DCHECK(input);
if (!input.connected()) {
return;
}
NodeRef upstream_node = input.mate().node();
DisconnectInput(input);
RemoveNodesConnectedToNode(upstream_node);
}
void Graph::Reset() {
sources_.clear();
sinks_.clear();
auto joiner = ThreadsafeCallbackJoiner::Create();
for (auto& node : nodes_) {
node->Acquire(joiner->NewCallback());
}
joiner->WhenJoined(dispatcher_, [nodes = std::move(nodes_)]() mutable {
while (!nodes.empty()) {
std::shared_ptr<Node> node = nodes.front();
nodes.pop_front();
node->ShutDown();
}
});
}
void Graph::FlushOutput(const OutputRef& output, bool hold_frame,
fit::closure callback) {
FXL_DCHECK(output);
std::queue<Output*> backlog;
backlog.push(output.actual());
FlushOutputs(&backlog, hold_frame, std::move(callback));
}
void Graph::FlushAllOutputs(NodeRef node, bool hold_frame,
fit::closure callback) {
FXL_DCHECK(node);
std::queue<Output*> backlog;
size_t output_count = node.output_count();
for (size_t output_index = 0; output_index < output_count; output_index++) {
backlog.push(node.output(output_index).actual());
}
FlushOutputs(&backlog, hold_frame, std::move(callback));
}
void Graph::PostTask(fit::closure task,
std::initializer_list<NodeRef> node_refs) {
auto joiner = ThreadsafeCallbackJoiner::Create();
std::vector<Node*> nodes;
for (NodeRef node_ref : node_refs) {
node_ref.node_->Acquire(joiner->NewCallback());
nodes.push_back(node_ref.node_);
}
joiner->WhenJoined(dispatcher_,
[task = std::move(task), nodes = std::move(nodes)]() {
task();
for (auto node : nodes) {
node->Release();
}
});
}
void Graph::FlushOutputs(std::queue<Output*>* backlog, bool hold_frame,
fit::closure callback) {
FXL_DCHECK(backlog);
auto callback_joiner = CallbackJoiner::Create();
// Walk the graph downstream from the outputs already in the backlog until
// we hit a sink. The |FlushOutputExternal| and |FlushInputExternal| calls are
// all issued synchronously from this loop, and then we wait for all the
// callbacks to be called. This works, because downstream flow is halted
// synchronously, even though the nodes may have additional flushing business
// that needs time to complete.
while (!backlog->empty()) {
Output* output = backlog->front();
backlog->pop();
FXL_DCHECK(output);
if (!output->connected()) {
continue;
}
Input* input = output->mate();
FXL_DCHECK(input);
Node* input_node = input->node();
output->node()->FlushOutputExternal(output->index(),
callback_joiner->NewCallback());
input_node->FlushInputExternal(input->index(), hold_frame,
callback_joiner->NewCallback());
for (size_t output_index = 0; output_index < input_node->output_count();
++output_index) {
backlog->push(&input_node->output(output_index));
}
}
callback_joiner->WhenJoined(std::move(callback));
}
void Graph::VisitUpstream(Input* input, const Visitor& visitor) {
FXL_DCHECK(input);
std::queue<Input*> backlog;
backlog.push(input);
while (!backlog.empty()) {
Input* input = backlog.front();
backlog.pop();
FXL_DCHECK(input);
if (!input->connected()) {
continue;
}
Output* output = input->mate();
Node* output_node = output->node();
visitor(input, output);
for (size_t input_index = 0; input_index < output_node->input_count();
++input_index) {
backlog.push(&output_node->input(input_index));
}
}
}
} // namespace media_player