blob: b65d716da6399a63e6fb06404751864366c879c3 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/bin/mediaplayer/core/player_core.h"
#include <lib/async/cpp/task.h>
#include <lib/async/dispatcher.h>
#include <queue>
#include <unordered_set>
#include "garnet/bin/mediaplayer/graph/formatting.h"
#include "garnet/bin/mediaplayer/util/callback_joiner.h"
#include "lib/fxl/logging.h"
namespace media_player {
PlayerCore::PlayerCore(async_dispatcher_t* dispatcher)
: graph_(dispatcher), dispatcher_(dispatcher) {}
PlayerCore::~PlayerCore() {}
void PlayerCore::SetSourceSegment(std::unique_ptr<SourceSegment> source_segment,
fit::closure callback) {
FXL_DCHECK(source_segment);
FXL_DCHECK(source_segment->provisioned());
FXL_DCHECK(callback);
ClearSourceSegment();
source_segment_ = std::move(source_segment);
set_source_segment_callback_ = std::move(callback);
set_source_segment_countdown_ = 1;
// This callback notifies the player of changes to source_segment_'s
// problem() and/or metadata() values.
source_segment_->SetUpdateCallback([this]() { NotifyUpdate(); });
// This callback notifies of changes to the set of streams.
source_segment_->SetStreamUpdateCallback(
[this](size_t index, const SourceSegment::Stream* stream, bool more) {
if (stream) {
++set_source_segment_countdown_;
OnStreamUpdated(index, *stream);
} else {
OnStreamRemoved(index);
}
if (more) {
return;
}
if (set_source_segment_callback_) {
MaybeCompleteSetSourceSegment();
return;
}
NotifyUpdate();
});
// Account for the streams that have already been enumerated.
size_t index = 0;
for (auto& stream : source_segment_->streams()) {
if (stream.valid()) {
++set_source_segment_countdown_;
OnStreamUpdated(index, stream);
}
++index;
}
if (!source_segment_->stream_add_imminent()) {
MaybeCompleteSetSourceSegment();
}
}
void PlayerCore::ClearSourceSegment() {
if (!source_segment_) {
return;
}
while (!streams_.empty()) {
OnStreamRemoved(streams_.size() - 1);
}
source_segment_->Deprovision();
source_segment_ = nullptr;
}
void PlayerCore::SetSinkSegment(std::unique_ptr<SinkSegment> sink_segment,
StreamType::Medium medium) {
// If we already have a sink segment for this medium, discard it.
auto old_sink_segment = TakeSinkSegment(medium);
if (old_sink_segment) {
old_sink_segment->Deprovision();
old_sink_segment.reset();
}
if (!sink_segment) {
return;
}
sink_segment->Provision(&graph_, dispatcher_, [this]() {
// This callback notifies the player of changes to
// source_segment_'s problem() and/or
// end_of_stream() values.
NotifyUpdate();
});
Stream* stream = GetStream(medium);
if (stream) {
FXL_DCHECK(!stream->sink_segment_);
stream->sink_segment_ = std::move(sink_segment);
ConnectStream(stream);
return;
}
// We have no stream for this medium. Park the segment.
parked_sink_segments_[medium] = std::move(sink_segment);
}
void PlayerCore::Prime(fit::closure callback) {
primed_ = true;
auto callback_joiner = CallbackJoiner::Create();
for (auto& stream : streams_) {
if (stream.sink_segment_) {
stream.sink_segment_->Prime(callback_joiner->NewCallback());
}
}
callback_joiner->WhenJoined([this, callback = std::move(callback)]() mutable {
async::PostTask(dispatcher_, std::move(callback));
});
}
void PlayerCore::Flush(bool hold_frame, fit::closure callback) {
primed_ = false;
if (source_segment_) {
source_segment_->Flush(hold_frame,
[this, callback = std::move(callback)]() mutable {
async::PostTask(dispatcher_, std::move(callback));
});
} else {
async::PostTask(dispatcher_, std::move(callback));
}
}
void PlayerCore::SetTimelineFunction(media::TimelineFunction timeline_function,
fit::closure callback) {
FXL_DCHECK(timeline_function.reference_delta() != 0);
// We allow pause even though the source may not be capable. We should be
// able to stop progress prior to shutting down the player.
// TODO(dalesat): Check for pause if this turns out not to be the case.
int64_t reference_time = timeline_function.reference_time();
if (reference_time == fuchsia::media::NO_TIMESTAMP) {
reference_time = media::Timeline::local_now() + kMinimumLeadTime;
}
int64_t subject_time = timeline_function.subject_time();
if (subject_time == fuchsia::media::NO_TIMESTAMP) {
subject_time = timeline_function_(reference_time);
}
timeline_function_ = media::TimelineFunction(subject_time, reference_time,
timeline_function.rate());
auto callback_joiner = CallbackJoiner::Create();
for (auto& stream : streams_) {
if (stream.sink_segment_) {
stream.sink_segment_->SetTimelineFunction(timeline_function_,
callback_joiner->NewCallback());
}
}
callback_joiner->WhenJoined([this, callback = std::move(callback)]() mutable {
async::PostTask(dispatcher_, std::move(callback));
});
}
void PlayerCore::SetProgramRange(uint64_t program, int64_t min_pts,
int64_t max_pts) {
for (auto& stream : streams_) {
if (stream.sink_segment_) {
stream.sink_segment_->SetProgramRange(program, min_pts, max_pts);
}
}
}
void PlayerCore::Seek(int64_t position, fit::closure callback) {
if (source_segment_) {
FXL_DCHECK(can_seek());
source_segment_->Seek(position,
[this, callback = std::move(callback)]() mutable {
async::PostTask(dispatcher_, std::move(callback));
});
} else {
async::PostTask(dispatcher_, std::move(callback));
}
}
bool PlayerCore::end_of_stream() const {
bool result = false;
for (auto& stream : streams_) {
if (stream.sink_segment_) {
if (!stream.sink_segment_->end_of_stream()) {
return false;
}
result = true;
}
}
return result;
}
int64_t PlayerCore::duration_ns() const {
if (source_segment_) {
return source_segment_->duration_ns();
}
return 0;
}
bool PlayerCore::can_pause() const {
if (source_segment_) {
return source_segment_->can_pause();
}
return false;
}
bool PlayerCore::can_seek() const {
if (source_segment_) {
return source_segment_->can_seek();
}
return false;
}
const Metadata* PlayerCore::metadata() const {
if (source_segment_ && source_segment_->metadata()) {
return source_segment_->metadata();
}
return nullptr;
}
const fuchsia::mediaplayer::Problem* PlayerCore::problem() const {
// First, see if the source segment has a problem to report.
if (source_segment_ && source_segment_->problem()) {
return source_segment_->problem();
}
// See if any of the sink segments have a problem to report.
for (auto& stream : streams_) {
if (stream.sink_segment_ && stream.sink_segment_->problem()) {
return stream.sink_segment_->problem();
}
}
return nullptr;
}
void PlayerCore::NotifyUpdate() {
if (update_callback_) {
update_callback_();
}
}
const PlayerCore::Stream* PlayerCore::GetStream(
StreamType::Medium medium) const {
for (auto& stream : streams_) {
if (stream.stream_type_ && stream.stream_type_->medium() == medium) {
return &stream;
}
}
return nullptr;
}
PlayerCore::Stream* PlayerCore::GetStream(StreamType::Medium medium) {
for (auto& stream : streams_) {
if (stream.stream_type_ && stream.stream_type_->medium() == medium) {
return &stream;
}
}
return nullptr;
}
SinkSegment* PlayerCore::GetParkedSinkSegment(StreamType::Medium medium) const {
auto iter = parked_sink_segments_.find(medium);
return iter == parked_sink_segments_.end() ? nullptr : iter->second.get();
}
void PlayerCore::OnStreamUpdated(size_t index,
const SourceSegment::Stream& update_stream) {
if (streams_.size() < index + 1) {
streams_.resize(index + 1);
}
Stream& stream = streams_[index];
if (stream.sink_segment_) {
FXL_DCHECK(stream.stream_type_);
if (stream.stream_type_->medium() != update_stream.type().medium()) {
// The sink segment for this stream is for the wrong medium. Park it.
FXL_DCHECK(!GetParkedSinkSegment(stream.stream_type_->medium()));
parked_sink_segments_[stream.stream_type_->medium()] =
TakeSinkSegment(&stream);
}
}
stream.stream_type_ = update_stream.type().Clone();
stream.output_ = update_stream.output();
if (!stream.sink_segment_) {
stream.sink_segment_ = TakeSinkSegment(stream.stream_type_->medium());
if (!stream.sink_segment_) {
// No sink segment has been registered for this medium.
MaybeCompleteSetSourceSegment();
return;
}
}
ConnectStream(&stream);
}
void PlayerCore::OnStreamRemoved(size_t index) {
if (streams_.size() < index + 1) {
return;
}
Stream& stream = streams_[index];
if (stream.sink_segment_) {
FXL_DCHECK(stream.stream_type_);
// Park this stream segment.
FXL_DCHECK(!GetParkedSinkSegment(stream.stream_type_->medium()));
parked_sink_segments_[stream.stream_type_->medium()] =
TakeSinkSegment(&stream);
}
stream.stream_type_ = nullptr;
stream.output_ = nullptr;
// Remove unused entries at the back of streams_.
while (!streams_.empty() && !streams_[streams_.size() - 1].stream_type_) {
streams_.resize(streams_.size() - 1);
}
}
void PlayerCore::MaybeCompleteSetSourceSegment() {
if (!set_source_segment_callback_) {
return;
}
if (--set_source_segment_countdown_ == 0) {
fit::closure callback = std::move(set_source_segment_callback_);
callback();
}
}
std::unique_ptr<SinkSegment> PlayerCore::TakeSinkSegment(
StreamType::Medium medium) {
auto iter = parked_sink_segments_.find(medium);
if (iter != parked_sink_segments_.end()) {
auto result = std::move(iter->second);
parked_sink_segments_.erase(iter);
return result;
}
Stream* stream = GetStream(medium);
if (stream && stream->sink_segment_) {
return TakeSinkSegment(stream);
}
return nullptr;
}
std::unique_ptr<SinkSegment> PlayerCore::TakeSinkSegment(Stream* stream) {
FXL_DCHECK(stream);
FXL_DCHECK(stream->sink_segment_);
if (stream->sink_segment_->connected()) {
stream->sink_segment_->Disconnect();
}
return std::move(stream->sink_segment_);
}
void PlayerCore::ConnectStream(Stream* stream) {
FXL_DCHECK(stream);
FXL_DCHECK(stream->sink_segment_);
FXL_DCHECK(stream->stream_type_);
FXL_DCHECK(stream->output_);
stream->sink_segment_->Connect(
*stream->stream_type_, stream->output_, [this, stream](Result result) {
if (result != Result::kOk) {
// The segment will report a problem separately.
return;
}
MaybeCompleteSetSourceSegment();
if (primed_ && stream->sink_segment_) {
stream->sink_segment_->Prime([this, stream]() {
if (timeline_function_.subject_delta() != 0 &&
stream->sink_segment_) {
stream->sink_segment_->SetTimelineFunction(timeline_function_,
[]() {});
}
});
}
});
}
void PlayerCore::Dump(std::ostream& os) const {
std::queue<NodeRef> backlog;
std::unordered_set<Node*> visited;
backlog.push(source_node());
visited.insert(source_node().GetNode());
while (!backlog.empty()) {
NodeRef node = backlog.front();
backlog.pop();
os << fostr::NewLine << fostr::NewLine;
node.GetNode()->Dump(os);
for (size_t output_index = 0; output_index < node.output_count();
++output_index) {
OutputRef output = node.output(output_index);
if (!output.connected()) {
continue;
}
NodeRef downstream = output.mate().node();
Node* downstream_node = downstream.GetNode();
if (visited.find(downstream_node) != visited.end()) {
continue;
}
backlog.push(downstream);
visited.insert(downstream_node);
}
}
}
} // namespace media_player