// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fpromise/bridge.h>
#include <lib/fpromise/promise.h>
#include <lib/syslog/cpp/macros.h>
#include <deque>
#include "src/lib/fxl/synchronization/thread_annotations.h"
namespace fmlib {
// Errors returned by |StreamQueue::pull|.
enum class StreamQueueError {
// |cancel_pull| was called.
// |drain| was called, and all elements have been pulled from the queue.
// Thread-safe, single-producer, single-consumer queue intended for media streams. |T| is the packet
// type, which must be movable. |U| is the clear request type, which must also be moveable.
// A queue element can be a packet, a clear request, or an 'ended' signal. All elements pass
// through the queue on a first-in, first-out basis with the exception of clear requests.
// A packet represents a fragment of the stream corresponding to some interval time. The packet
// type is a template parameter to allow this container to be used with a variety of packet
// implementations.
// A clear request is used to clear a pipeline. When a clear request is added to the queue, all
// elements in the queue other than clear requests are removed from the queue and destroyed.
// Clear requests are intended to be forwarded downstream to clear an entire pipeline. The clear
// request type is a template parameter to allow this container to be used with a variety of
// clear request implementations.
// An 'ended' signal marks the end of a stream.
// TODO(dalesat): lockless?
// TODO(dalesat): multi-consumer?
template <typename T, typename U>
class StreamQueue {
using PacketType = T;
using ClearRequestType = U;
// Type for elements of a queue. An |Element| may contain a packet, a clear request, or an
// end-of-stream indication.
class Element {
enum Tag { kPacket, kClearRequest, kEnded };
// Constructs an element containing a packet.
explicit Element(T packet) : state_(std::in_place_index<Tag::kPacket>, std::move(packet)) {}
// Constructs an element containing a clear request.
explicit Element(U clear_request)
: state_(std::in_place_index<Tag::kClearRequest>, std::move(clear_request)) {}
// Returns an element containing an end-of-stream indication.
static Element Ended() { return Element(); }
// Returns the tag classifying the content of this |Element|.
constexpr Tag tag() const { return static_cast<Tag>(state_.index()); }
// Determines whether this element contains a packet.
constexpr bool is_packet() const { return tag() == Tag::kPacket; }
// Determines whether this element contains a clear request.
constexpr bool is_clear_request() const { return tag() == Tag::kClearRequest; }
// Determines whether this element contains an end-of-stream indication.
constexpr bool is_ended() const { return tag() == Tag::kEnded; }
// Returns a const reference to the contained packet. May only be called if this |Element|
// contains a packet.
constexpr T& packet() { return std::get<Tag::kPacket>(state_); }
// Takes (moves) the contained packet. May only be called if this |Element| contains a packet.
T take_packet() { return std::move(std::get<Tag::kPacket>(state_)); }
// Returns a const reference to the contained clear request. May only be called if this
// |Element| contains a clear request.
constexpr U& clear_request() { return std::get<Tag::kClearRequest>(state_); }
// Takes (moves) the contained clear request. May only be called if this |Element| contains a
// clear request.
U take_clear_request() { return std::move(std::get<Tag::kClearRequest>(state_)); }
// Constructs an |Element| containing an end-of-stream indication.
Element() : state_(std::in_place_index<Tag::kEnded>) {}
std::variant<T, U, std::monostate> state_;
// Constructs an empty queue.
StreamQueue() = default;
// Destructs a queue.
~StreamQueue() { cancel_pull(); }
// Disallow copy, assign, and move.
StreamQueue(StreamQueue&&) = delete;
StreamQueue(const StreamQueue&) = delete;
StreamQueue& operator=(StreamQueue&&) = delete;
StreamQueue& operator=(const StreamQueue&) = delete;
// Pushes a packet to the tail of the queue.
template <typename V>
void push(V&& packet) {
std::lock_guard<std::mutex> locker(mutex_);
if (pull_completer_) {
// Pushes a packet to the tail of the queue. Viable for copyable packet types only.
// TODO(dalesat): This version could be removed if T is always move-only (as opposed to moveable).
// The unit tests use uint32_t for T, so changing that is a prerequisite.
template <typename V>
void push(const V& packet) {
std::lock_guard<std::mutex> locker(mutex_);
if (pull_completer_) {
using PullResult = fpromise::result<Element, StreamQueueError>;
// Returns a promise that completes with the element at the front of the queue, removing it on
// completion. After this method is called, it may not be called again until after the promise
// completes.
[[nodiscard]] fpromise::promise<Element, StreamQueueError> pull() {
std::lock_guard<std::mutex> locker(mutex_);
FX_CHECK(!pull_completer_) << "pull() was called before the previous call completed.";
if (!deque_.empty()) {
auto result = fpromise::make_result_promise<Element, StreamQueueError>(
return result;
if (draining_) {
return fpromise::make_result_promise<Element, StreamQueueError>(
fpromise::bridge<Element, StreamQueueError> bridge;
pull_completer_ = std::move(bridge.completer);
return bridge.consumer.promise();
// Sets a closure that is called whenever a |clear| method is called. Pass a null |closure| to
// deregister a previously-registered closure.
// This method is typically used when the thread that calls |pull| may be blocked when |clear| is
// called, and another thread must take action to unblock that thread so that the clear operation
// may propagate. Threads that process packets in software will often need to block pending memory
// allocation from the output side, because the third-party software on which they are based calls
// out synchronously for allocation of output buffers.
void set_cleared_closure(fit::closure closure) {
std::lock_guard<std::mutex> locker(mutex_);
cleared_closure_ = std::move(closure);
// Cancels the previously-created |pull| promise and returns true. Returns false if there is no
// |pull| promise pending.
bool cancel_pull() {
std::lock_guard<std::mutex> locker(mutex_);
if (!pull_completer_) {
return false;
return true;
// Clears the queue of all packets and end-of-stream elements and enqueues a |kCleared|
// element.
template <typename V>
void clear(V&& clear_request) {
fit::closure cleared_closure;
std::lock_guard<std::mutex> locker(mutex_);
if (cleared_closure_) {
cleared_closure = cleared_closure_.share();
if (pull_completer_) {
if (cleared_closure) {
// Clears the queue of all packets and end-of-stream elements and enqueues a |kCleared|
// element. Viable for copyable clear request types only.
// TODO(dalesat): As with pull() above, this version can be removed if U is move-only.
template <typename V>
void clear(V& clear_request) {
fit::closure cleared_closure;
std::lock_guard<std::mutex> locker(mutex_);
if (cleared_closure_) {
cleared_closure = cleared_closure_.share();
if (pull_completer_) {
if (cleared_closure) {
// Enqueues a |kEnded| element.
void end() {
std::lock_guard<std::mutex> locker(mutex_);
if (pull_completer_) {
// Starts draining the queue. After this method is called, |push|, |clear|, |end| and |drain|
// may not be called. After this method is called and the queue is empty, the promise returned
// by |pull| will return |StreamQueueError::kDrained|.
void drain() {
std::lock_guard<std::mutex> locker(mutex_);
draining_ = true;
if (pull_completer_) {
// Returns true if and only if the queue is empty.
bool empty() const {
std::lock_guard<std::mutex> locker(mutex_);
return deque_.empty();
// Returns the number of elements in the queue.
size_t size() const {
std::lock_guard<std::mutex> locker(mutex_);
return deque_.size();
// Returns true if and only if |drain| has been called and the queue is empty.
bool is_drained() const {
std::lock_guard<std::mutex> locker(mutex_);
return draining_ && deque_.empty();
// Removes all packet and end-of-stream elements from the queue.
void clear_internal() FXL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
while (!deque_.empty()) {
if (deque_.back().is_clear_request()) {
mutable std::mutex mutex_;
std::deque<Element> deque_ FXL_GUARDED_BY(mutex_);
fpromise::completer<Element, StreamQueueError> pull_completer_ FXL_GUARDED_BY(mutex_);
fit::closure cleared_closure_ FXL_GUARDED_BY(mutex_);
bool draining_ FXL_GUARDED_BY(mutex_) = false;
} // namespace fmlib