blob: 6937ddeef7bcec094b5ebbfbba622794bb8e90af [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef GARNET_BIN_MEDIA_CODECS_SW_FFMPEG_MPSC_QUEUE_H_
#define GARNET_BIN_MEDIA_CODECS_SW_FFMPEG_MPSC_QUEUE_H_
#include <atomic>
#include <memory>
#include <optional>
#include <queue>
#include <stack>
#include <lib/fit/defer.h>
#include <lib/fxl/macros.h>
#include <lib/zx/event.h>
#include <zircon/assert.h>
// A lock free queue for multiple producers and a single consumer.
template <typename T>
class MpscQueue {
public:
MpscQueue() : cache_(nullptr), head_(nullptr) {}
~MpscQueue() { Clear(); }
// Pushes a new element onto the queue.
//
// In any given thread, elements pushed first will be dequeued first. When
// pushers on different threads contend it is not gauranteed that the thread
// to call first will end up in the queue first.
template <typename U>
void Push(U&& element) {
Cell* loaded_head;
Cell* new_head = new Cell{.element = std::forward<T>(element)};
do {
loaded_head = head_.load();
new_head->next = loaded_head;
} while (!head_.compare_exchange_strong(loaded_head, new_head));
}
// Pops an element from the queue.
//
// This should only be called from the consumer thread.
std::optional<T> Pop() {
if (!cache_) {
cache_ = TakeHead();
}
if (!cache_) {
return std::nullopt;
}
T elem = std::move(cache_->element);
Cell* to_delete = cache_;
cache_ = cache_->next;
delete to_delete;
return elem;
}
// Drops all elements from the queue.
//
// This should only be called from the consumer thread.
void Clear() {
while (Pop()) {
}
}
private:
struct Cell {
T element;
Cell* next;
};
Cell* TakeHead() {
Cell* node;
do {
node = head_.load();
} while (!head_.compare_exchange_strong(node, nullptr));
if (!node) {
return nullptr;
}
Cell* prev = nullptr;
while (node) {
Cell* tmp = node;
node = node->next;
tmp->next = prev;
prev = tmp;
}
return prev;
}
Cell* cache_;
std::atomic<Cell*> head_;
FXL_DISALLOW_COPY_ASSIGN_AND_MOVE(MpscQueue);
};
// A multiproducer single consumer queue which blocks for the consumer.
template <typename T>
class BlockingMpscQueue {
public:
// Deconstructs the queue and returns all its elements.
//
// This should only be called on the consumer thread.
static std::queue<T> Extract(BlockingMpscQueue&& queue) {
queue.StopAllWaits();
std::queue<T> elements;
std::optional<T> element;
while ((element = queue.queue_.Pop())) {
elements.push(std::move(*element));
}
return elements;
}
BlockingMpscQueue() : should_wait_(true) {
zx_status_t status = zx::event::create(0, &should_wait_event_);
ZX_ASSERT(status == ZX_OK);
}
template <typename U>
void Push(U&& element) {
queue_.Push(std::forward<T>(element));
should_wait_event_.signal(ZX_EVENT_SIGNAL_MASK, ZX_EVENT_SIGNALED);
}
// Stops all waiting threads. We call this when a stream is stopped to abort
// the input processing loop.
void StopAllWaits() {
should_wait_.store(false);
should_wait_event_.signal(ZX_EVENT_SIGNAL_MASK, ZX_EVENT_SIGNALED);
}
// Resets the queue to its default state.
void Reset(bool keep_data = false) {
should_wait_.store(true);
if (!keep_data) {
queue_.Clear();
}
}
// Get an element or block until one is available if the queue is empty.
// If a thread calls StopAllWaits(), std::nullopt is returned.
//
// This should only be called on the consumer thread.
std::optional<T> WaitForElement() {
std::optional<T> element;
while (should_wait_ && !(element = queue_.Pop())) {
should_wait_event_.wait_one(ZX_EVENT_SIGNALED, zx::time(ZX_TIME_INFINITE),
nullptr);
}
return element;
}
private:
zx::event should_wait_event_;
std::atomic<bool> should_wait_;
MpscQueue<T> queue_;
};
#endif // GARNET_BIN_MEDIA_CODECS_SW_FFMPEG_MPSC_QUEUE_H_