// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/async/cpp/operation.h>
#include <utility>
#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
#include <lib/fit/bridge.h>
#include <lib/fit/defer.h>
#include <lib/fxl/logging.h>
#include <trace/event.h>
namespace modular {
constexpr char kModularTraceCategory[] = "modular";
constexpr char kTraceIdKey[] = "id";
constexpr char kTraceInfoKey[] = "info";
OperationContainer::OperationContainer() = default;
OperationContainer::~OperationContainer() = default;
void OperationContainer::Add(OperationBase* const o) {
FXL_DCHECK(o != nullptr);
o->SetOwner(this);
Hold(o); // Takes ownership.
}
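// Illustrative usage sketch (not part of this file): |MyCall| is a
// hypothetical Operation<> subclass and |operations_| is an OperationQueue or
// OperationCollection owned by the caller. Add() takes ownership of the
// operation and hands it to the container, which schedules it (immediately
// for a collection, in FIFO order for a queue):
//
//   operations_.Add(new MyCall(args, [] { /* result callback */ }));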
void OperationContainer::Schedule(OperationBase* const o) { o->Schedule(); }
void OperationContainer::InvalidateWeakPtrs(OperationBase* const o) {
o->InvalidateWeakPtrs();
}
OperationCollection::OperationCollection()
: executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}
OperationCollection::~OperationCollection() {
  // Invalidate the weak pointers to all Operation<>s before destroying them,
  // so that an outstanding FlowToken<> destroyed in the process doesn't
  // erroneously call Operation<>::Done().
for (auto& operation : operations_) {
FXL_DCHECK(operation.get() != nullptr);
InvalidateWeakPtrs(operation.get());
}
}
fxl::WeakPtr<OperationContainer> OperationCollection::GetWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
void OperationCollection::Hold(OperationBase* const o) {
operations_.emplace_back(o);
Schedule(o);
}
void OperationCollection::Drop(OperationBase* const o) {
auto it = std::find_if(
operations_.begin(), operations_.end(),
[o](const std::unique_ptr<OperationBase>& p) { return p.get() == o; });
FXL_DCHECK(it != operations_.end());
  // Move the operation out of the container before erasing it, so that its
  // destructor runs only after it has been removed from |operations_| (i.e.
  // when |operation| goes out of scope at the end of this method). Erasing
  // the unique_ptr directly would run the destructor while the operation is
  // still in the container, which can cause reentry: an operation may hold a
  // member (such as a FlowToken) that leads back to a container owning this
  // operation.
  //
  // Example:
  // operations_.erase(it) calls the operation's destructor ~KillModuleCall()
  // ~KillModuleCall() [has a FlowToken member from OnModuleUpdatedCall]
  // ~OnModuleUpdatedCall() [holds the container containing KillModuleCall]
  // ~OperationQueue() [triggered by ~OnModuleUpdatedCall()]
  // [ERROR: the KillModuleCall in operations_ is nullptr at this point]
  // operations_.erase() then actually erases the operation from the container
  //
  // See TestCollectionNotNullPtr in operation_unittest.cc for a test case.
std::unique_ptr<OperationBase> operation = std::move(*it);
FXL_DCHECK(it->get() == nullptr);
FXL_DCHECK(operation.get() == o);
InvalidateWeakPtrs(operation.get());
operations_.erase(it);
}
void OperationCollection::Cont() {
// no-op for operation collection.
}
void OperationCollection::ScheduleTask(fit::pending_task task) {
executor_.schedule_task(std::move(task));
}
OperationQueue::OperationQueue()
: executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}
OperationQueue::~OperationQueue() {
  // Invalidate the weak pointers to all Operation<>s before destroying them,
  // so that an outstanding FlowToken<> destroyed in the process doesn't
  // erroneously call Operation<>::Done().
while (!operations_.empty()) {
FXL_DCHECK(operations_.front().get() != nullptr);
InvalidateWeakPtrs(operations_.front().get());
operations_.pop();
}
}
fxl::WeakPtr<OperationContainer> OperationQueue::GetWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
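// Takes ownership of |o|. If the queue is idle, |o| is scheduled immediately;
// otherwise it stays queued until Cont() reaches it after the operations
// ahead of it have been dropped.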
void OperationQueue::Hold(OperationBase* const o) {
operations_.emplace(o);
if (idle_) {
FXL_DCHECK(operations_.size() == 1);
idle_ = false;
Schedule(o);
}
}
void OperationQueue::Drop(OperationBase* const o) {
FXL_DCHECK(!operations_.empty());
FXL_DCHECK(operations_.front().get() == o);
// See comment in OperationCollection::Drop() for why this move is important.
std::unique_ptr<OperationBase> operation = std::move(operations_.front());
FXL_DCHECK(operations_.front().get() == nullptr);
FXL_DCHECK(operation.get() == o);
InvalidateWeakPtrs(operation.get());
operations_.pop();
}
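// Advances the queue: schedules the next queued operation if there is one,
// otherwise marks the queue idle so that the next Hold() schedules its
// operation right away.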
void OperationQueue::Cont() {
if (!operations_.empty()) {
auto o = operations_.front().get();
Schedule(o);
} else {
idle_ = true;
}
}
namespace {
class PromiseWrapperCall : public Operation<> {
public:
  explicit PromiseWrapperCall(fit::completer<> completer)
      : Operation("PromiseDoneCall", [] {}), completer_(std::move(completer)) {}
  void Run() override {
    running_ = true;
    completer_.complete_ok();
  }
void SayDone() {
FXL_CHECK(running_);
Done();
}
private:
fit::completer<> completer_;
bool running_{false};
};
} // namespace
void OperationQueue::ScheduleTask(fit::pending_task task) {
// We need to block the execution of this task on tasks in |operations_|, and
// then also block further execution of |operations_| on this task finishing.
// We do this by "wrapping" the promise in a PromiseWrapperCall.
//
// |start| will be completed when |wrapper->Run()| is called, and |wrapper|
// will be finished as a result of the promise finishing: we call
// |wrapper->SayDone()| when |task| is destroyed. It is destroyed when either
// a) it is abandoned or b) it is completed successfully or with an error. In
// either case we want to unblock the next operation in the queue.
fit::bridge start;
auto wrapper = new PromiseWrapperCall(std::move(start.completer));
executor_.schedule_task(start.consumer.promise().then(
[p = task.take_promise(), wrapper](fit::result<>&) mutable {
// It is safe to call SayDone() on |wrapper| because we know that
// |wrapper| will be alive so long as this promise is being executed.
//
// We use a fit::defer on the capture list of .then() so that if |p| is
// abandoned, we still unblock the queue.
return p.then([defer = fit::defer([wrapper] { wrapper->SayDone(); })](
fit::result<>&) {});
}));
Add(wrapper);
}
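// Illustrative usage sketch (hypothetical code, not part of this file): a
// promise scheduled on an OperationQueue is sequenced like an operation, i.e.
// it starts only after previously queued operations finish, and operations
// added afterwards wait for it to complete or be abandoned:
//
//   operation_queue.ScheduleTask(fit::make_promise([] { /* async work */ }));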
OperationBase::OperationBase(const char* const trace_name,
std::string trace_info)
: weak_ptr_factory_(this),
trace_name_(trace_name),
trace_id_(TRACE_NONCE()),
trace_info_(std::move(trace_info)) {}
OperationBase::~OperationBase() = default;
fxl::WeakPtr<OperationBase> OperationBase::GetWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
void OperationBase::SetOwner(OperationContainer* c) {
FXL_DCHECK(!container_);
container_ = c->GetWeakPtr();
}
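// Posts Run() to the current dispatcher. The weak pointer guard ensures Run()
// is skipped if the operation is dropped (or its container destroyed) before
// the posted task executes.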
void OperationBase::Schedule() {
TraceAsyncBegin();
async::PostTask(async_get_default_dispatcher(),
[this, weak = weak_ptr_factory_.GetWeakPtr()] {
if (weak) {
Run();
}
});
}
void OperationBase::InvalidateWeakPtrs() {
weak_ptr_factory_.InvalidateWeakPtrs();
}
void OperationBase::TraceAsyncBegin() {
TRACE_ASYNC_BEGIN(kModularTraceCategory, trace_name_, trace_id_, kTraceIdKey,
trace_id_, kTraceInfoKey, trace_info_);
}
void OperationBase::TraceAsyncEnd() {
TRACE_ASYNC_END(kModularTraceCategory, trace_name_, trace_id_, kTraceIdKey,
trace_id_, kTraceInfoKey, trace_info_);
}
OperationBase::FlowTokenBase::FlowTokenBase(OperationBase* const op)
: refcount_(new int), weak_op_(op->weak_ptr_factory_.GetWeakPtr()) {
*refcount_ = 1;
}
OperationBase::FlowTokenBase::FlowTokenBase(const FlowTokenBase& other)
: refcount_(other.refcount_), weak_op_(other.weak_op_) {
++*refcount_;
}
OperationBase::FlowTokenBase::~FlowTokenBase() {
--*refcount_;
if (*refcount_ == 0) {
delete refcount_;
}
}
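// Illustrative sketch of how the reference count above is used (hypothetical
// |MyCall|; FlowToken itself is defined in the header): copies of a FlowToken
// share |refcount_|, and when the last copy is destroyed the operation is
// completed through |weak_op_|, unless the weak pointer was invalidated first
// (which the container destructors above rely on):
//
//   void MyCall::Run() {
//     FlowToken flow{this};
//     DoAsyncWork([this, flow] {
//       // The last copy of |flow| going out of scope completes the
//       // operation.
//     });
//   }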
} // namespace modular