blob: 1d3461f5c151b393b2b59aa368d63ef39208c6ce [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef SRC_SYS_FUZZING_COMMON_ASYNC_DEQUE_H_
#define SRC_SYS_FUZZING_COMMON_ASYNC_DEQUE_H_
#include <lib/fit/thread_checker.h>
#include <deque>
#include "src/lib/fxl/macros.h"
#include "src/sys/fuzzing/common/async-types.h"
namespace fuzzing {
// The |AsyncDeque| class facilitates creating asynchronous pipelines which move objects from one
// future to another.
//
// While this class can be used by multiple futures, it is *not* thread-safe, i.e. all futures
// must share a common executor.
//
template <typename T>
class AsyncDeque {
public:
AsyncDeque() = default;
~AsyncDeque() = default;
// Takes ownership of |t|. If this object has not been closed, it will resume any pending futures
// waiting to |Receive| an |T|. Barring any calls to |Resend|, the waiter(s) will |Receive| |T|s
// in the order they were sent.
void Send(T&& t) {
FIT_DCHECK_IS_THREAD_VALID(thread_checker_);
if (closed_) {
return;
}
if (completers_.empty()) {
queue_.emplace_back(std::move(t));
} else {
completers_.front().complete_ok(std::move(t));
}
}
// Takes ownership of |t|. If this object has not been closed, it will resume any pending futures
// waiting to |Receive| an |T|. |T|s that have been resent will be |Receive|d before any other
// |T|s.
void Resend(T&& t) {
FIT_DCHECK_IS_THREAD_VALID(thread_checker_);
if (closed_) {
return;
}
if (completers_.empty()) {
queue_.emplace_front(std::move(t));
} else {
completers_.front().complete_ok(std::move(t));
}
}
// Returns a promise to get a |T| once it has been (re-)sent. If this object is closed, this can
// still return data that was "in-flight", i.e. (re-)sent but not yet |Receive|d.
Promise<T> Receive() {
FIT_DCHECK_IS_THREAD_VALID(thread_checker_);
if (completers_.empty() && !queue_.empty()) {
auto t = std::move(queue_.front());
queue_.pop_front();
return fpromise::make_result_promise<T>(fpromise::ok(std::move(t)));
}
if (closed_) {
return fpromise::make_result_promise<T>(fpromise::error());
}
fpromise::bridge<T> bridge;
completers_.emplace_back(std::move(bridge.completer));
return bridge.consumer.promise_or(fpromise::error())
.and_then([](T& t) -> Result<T> { return fpromise::ok(std::move(t)); })
.or_else([] { return fpromise::error(); })
.wrap_with(scope_);
}
// Closes this object, preventing any further data from being (re-)sent.
void Close() {
closed_ = true;
completers_.clear();
}
private:
std::deque<fpromise::completer<T>> completers_;
std::deque<T> queue_;
bool closed_ = false;
Scope scope_;
FIT_DECLARE_THREAD_CHECKER(thread_checker_)
FXL_DISALLOW_COPY_ASSIGN_AND_MOVE(AsyncDeque);
};
} // namespace fuzzing
#endif // SRC_SYS_FUZZING_COMMON_ASYNC_DEQUE_H_