blob: 0898e0d462dd75d98900d2aff4fa0b779263efb1 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef LIB_ASYNC_CPP_OPERATION_H_
#define LIB_ASYNC_CPP_OPERATION_H_
#include <functional>
#include <memory>
#include <queue>
#include <tuple>
#include <utility>
#include <vector>
#include <lib/async/cpp/future.h>
#include <lib/async_promise/executor.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/macros.h>
#include <lib/fxl/memory/weak_ptr.h>
#include <lib/fit/promise.h>
namespace modular {
class OperationBase;
// An abstract base class which provides methods to hold on to Operation
// instances until they declare themselves to be Done(). Ownership of
// the operations must be managed by implementations.
class OperationContainer {
public:
OperationContainer();
virtual ~OperationContainer();
// Adds |o| to this container and takes ownership.
virtual void Add(OperationBase* o) final;
// Adds |task| to be scheduled on |this|. This mirrors
// the interface in fit::executor, and is here only during
// a transition from Operations to fit::promises.
virtual void ScheduleTask(fit::pending_task task) = 0;
protected:
void Schedule(OperationBase* o);
void InvalidateWeakPtrs(OperationBase* o);
private:
// OperationBase calls the methods below.
friend class OperationBase;
virtual fxl::WeakPtr<OperationContainer> GetWeakPtr() = 0;
// Must take ownership of |o|.
virtual void Hold(OperationBase* o) = 0;
// Must clean up memory for |o|.
virtual void Drop(OperationBase* o) = 0;
virtual void Cont() = 0;
};
// An implementation of |OperationContainer| which runs every instance of
// Operation as soon as it arrives.
class OperationCollection : public OperationContainer {
public:
OperationCollection();
~OperationCollection() override;
void ScheduleTask(fit::pending_task task) override;
private:
fxl::WeakPtr<OperationContainer> GetWeakPtr() override;
void Hold(OperationBase* o) override;
void Drop(OperationBase* o) override;
void Cont() override;
async::Executor executor_;
std::vector<std::unique_ptr<OperationBase>> operations_;
// It is essential that the weak_ptr_factory is defined after the operations_
// container, so that it gets destroyed before operations_, and hence before
// the Operation instances in it, so that the Done() call on their
// OperationBase sees the container weak pointer to be null. That's also why
// we need the virtual GetWeakPtr() in the base class, rather than to put the
// weak ptr factory in the base class.
fxl::WeakPtrFactory<OperationContainer> weak_ptr_factory_;
FXL_DISALLOW_COPY_AND_ASSIGN(OperationCollection);
};
// An implementation of |OperationContainer| which runs incoming Operations
// sequentially. This ensures that there are no partially complete operations
// when a new operation starts.
class OperationQueue : public OperationContainer {
public:
OperationQueue();
~OperationQueue() override;
void ScheduleTask(fit::pending_task task) override;
private:
fxl::WeakPtr<OperationContainer> GetWeakPtr() override;
void Hold(OperationBase* o) override;
void Drop(OperationBase* o) override;
void Cont() override;
// Are there any operations running? An operation is considered running once
// its |Run()| has been invoked, up until result callback has finished. Note
// that an operation could be running while it is not present in
// |operations_|: its result callback could be executing.
bool idle_ = true;
async::Executor executor_;
std::queue<std::unique_ptr<OperationBase>> operations_;
// It is essential that the weak_ptr_factory is defined after the operations_
// container, so that it gets destroyed before operations_, and hence before
// the Operation instances in it, so that the Done() call on their
// OperationBase sees the container weak pointer to be null. That's also why
// we need the virtual GetWeakPtr() in the base class, rather than to put the
// weak ptr factory in the base class.
fxl::WeakPtrFactory<OperationContainer> weak_ptr_factory_;
FXL_DISALLOW_COPY_AND_ASSIGN(OperationQueue);
};
// Something that can be put in an OperationContainer until it calls Done() on
// itself. Used to implement asynchronous operations that need to hold on to
// handles until the operation asynchronously completes and returns a value.
//
// Held by a unique_ptr<> in the OperationContainer, so instances of derived
// classes need to be created with new.
//
// Advantages of using an Operation instance to implement asynchronous fidl
// method invocations:
//
// 1. To receive the return callback, the interface pointer on which the method
// is invoked needs to be kept around. It's tricky to place a move-only
// interface pointer on the capture list of a lambda passed as argument to
// its own method. An instance is much simpler.
//
// 2. The capture list of the callbacks only holds this, everything else that
// needs to be passed on is in the instance.
//
// 3. Completion callbacks don't need to be made copyable and don't need to be
// mutable, because no move only pointer is pulled from their capture list.
//
// 4. Conversion of Handle to Ptr can be done by Bind() because the Ptr is
// already there.
//
// Use of Operation instances must adhere to invariants:
//
// * Deleting the operation container deletes all Operation instances in it, and
// it must be guaranteed that no callbacks of ongoing method invocations are
// invoked after the Operation instance they access is deleted. In order to
// accomplish this, an Operation instance must only invoke methods on fidl
// pointers that are either owned by the Operation instance, or that are owned
// by the immediate owner of the operation container. This way, when the
// operation container is deleted, the destructors of all involved fidl
// pointers are called, close their channels, and cancel all pending method
// callbacks. If a method is invoked on a fidl pointer that lives on beyond
// the lifetime of the operation container, this is not guaranteed.
class OperationBase {
public:
virtual ~OperationBase();
// Derived classes implement this method. It is called by the container of
// this Operation when it is ready. At some point after Run() is invoked,
// |Done()| must be called to signal completion of this operation. This
// usually happens implicitly from the destructor of a FlowToken that is
// passed along the asynchronous flow of control of the Operation.
virtual void Run() = 0;
// Needed to guard callbacks to methods on FIDL pointers that are not owned by
// this Operation instance.
//
// Callbacks on methods on FIDL pointers that are owned by Operation instance
// are never invoked after the Operation instance is deleted, because they are
// cancelled by the destructor of the FIDL pointer. However, if the FIDL
// pointer is owned outside of the Operation instance, such a callback may be
// invoked after the Operation instance was destroyed, if the FIDL pointer
// lives longer.
fxl::WeakPtr<OperationBase> GetWeakPtr();
protected:
// |trace_name| and |trace_info| are used to annotate performance traces.
// |trace_name| must outlive *this.
OperationBase(const char* trace_name, std::string trace_info);
// Useful in log messages.
const char* trace_name() const { return trace_name_; }
class FlowTokenBase;
private:
// OperationContainer calls SetOwner(), InvalidateWeakPtrs() and Schedule().
friend class OperationContainer;
// Called only by OperationContainer::Add().
void SetOwner(OperationContainer* c);
// Called only by OperationContainer.
void Schedule();
// Called only by OperationContainer.
void InvalidateWeakPtrs();
// Operation<..> class DispatchCallback() and accesses trace info fields
// below.
template <typename... Args>
friend class Operation;
// Called only by Operation<...>.
template <typename... Args,
typename ResultCall = std::function<void(Args...)>>
void DispatchCallback(ResultCall result_call, Args... result_args) {
// Move |container| pointer out of this, because |this| gets deleted before
// we stop using |container|.
auto container = std::move(container_);
if (container) {
// Deletes |this|.
container->Drop(this);
// Can no longer refer to |this|.
result_call(std::move(result_args)...);
// The result callback may cause the container to be deleted, so we must
// check it still exists before telling it to continue.
if (container) {
container->Cont();
}
}
}
// Traces the duration of the Operation excution. Begin() is called from
// Schedule(), End() is called from Done().
void TraceAsyncBegin();
void TraceAsyncEnd();
fxl::WeakPtr<OperationContainer> container_;
// Used by FlowTokenBase to suppress Done() calls after the Operation instance
// is deleted. The OperationContainer will invalidate all weak pointers to
// this instance before the destructor is invoked, so it's not necessary that
// this field is last.
fxl::WeakPtrFactory<OperationBase> weak_ptr_factory_;
// Name used to label traces for this operation.
const char* const trace_name_;
// Unique identifier used to correlate trace events for this operation.
const uint64_t trace_id_;
// Additional information added to trace events for this operation.
const std::string trace_info_;
FXL_DISALLOW_COPY_AND_ASSIGN(OperationBase);
};
template <typename... Args>
class Operation : public OperationBase {
public:
~Operation() override = default;
using ResultCall = std::function<void(Args...)>;
protected:
Operation(const char* const trace_name, ResultCall result_call,
const std::string& trace_info = "")
: OperationBase(trace_name, trace_info),
result_call_(std::move(result_call)) {}
// Derived classes call this when they are prepared to be removed from the
// container. Must be the last thing this instance does, as it results in
// destructor invocation.
void Done(Args... result_args) {
TraceAsyncEnd();
DispatchCallback(std::move(result_call_), std::move(result_args)...);
}
class FlowToken;
class FlowTokenHolder;
private:
ResultCall result_call_;
FXL_DISALLOW_COPY_AND_ASSIGN(Operation);
};
// The instance of FlowToken at which the refcount reaches zero in the
// destructor calls Done() on the Operation it holds a reference of.
//
// It is an inner class of Operation so that it has access to Done(), which is
// protected.
//
// Why this is ref counted, not moved:
//
// 1. Some flows of control branch into multiple parallel branches, and then its
// subtle to figure out which one to move it along.
//
// 2. To move something onto a capture list is more verbose than to just copy
// it, so it would defeat the purpose of being simpler to write than the
// status quo.
//
// 3. A lambda with something that is moveonly on its capture list is not
// copyable anymore, and one of the points of Operation was to only have
// copyable continuations.
//
// NOTE: You cannot mix flow tokens and explicit Done() calls. Once an Operation
// uses a flow token, this is how Done() is called, and calling Done()
// explicitly would call it twice.
//
// The parts that are not dependent on the template parameter are factored off
// into a base class as usual.
class OperationBase::FlowTokenBase {
public:
explicit FlowTokenBase(OperationBase* op);
FlowTokenBase(const FlowTokenBase& other);
~FlowTokenBase();
protected:
int refcount() const { return *refcount_; }
bool weak_op() const { return weak_op_.get() != nullptr; }
private:
int* const refcount_; // shared between copies of FlowToken.
fxl::WeakPtr<OperationBase> weak_op_;
};
template <typename... Args>
class Operation<Args...>::FlowToken : OperationBase::FlowTokenBase {
public:
explicit FlowToken(Operation<Args...>* const op, Args* const... result)
: FlowTokenBase(op), op_(op), result_(result...) {}
FlowToken(const FlowToken& other)
: FlowTokenBase(other), op_(other.op_), result_(other.result_) {}
~FlowToken() {
// If refcount is 1 here, it will become 0 in ~FlowTokenBase.
if (refcount() == 1 && weak_op()) {
apply(std::make_index_sequence<
std::tuple_size<decltype(result_)>::value>{});
}
}
private:
// This usage is based on the implementation of std::apply(), which is only
// available in C++17: http://en.cppreference.com/w/cpp/utility/apply
template <size_t... I>
void apply(std::integer_sequence<size_t, I...> /*unused*/) {
op_->Done(std::move((*std::get<I>(result_)))...);
}
Operation<Args...>* const op_; // not owned
// The pointers that FlowToken() is constructed with are stored in this
// std::tuple. They are then extracted and std::move()'d in the |apply()|
// method.
std::tuple<Args* const...> result_; // the values are not owned
};
// Sometimes the asynchronous flow of control that is represented by a FlowToken
// branches, but is actually continued on exactly one branch. For example, when
// a method is called on an external fidl service, and the callback from that
// method is also scheduled as a timeout to avoid blocking the operation queue
// in case the external service misbehaves and doesn't respond.
//
// In that situation, it would be wrong to place two copies of the flow token on
// the capture list of both callbacks, because then the flow would only be
// Done() once both callbacks are destroyed. In the case where the second
// callback is a timeout, this might be really late.
//
// Instead, a single copy of the flow token is placed in a shared container, a
// reference to which is placed on the capture list of both callbacks. The first
// callback that is invoked removes the flow token from the shared container and
// propagates it from there.
//
// That way, the flow token in the shared container also acts as call-once flag.
//
// The flow token holder is a simple wrapper of a shared ptr to a unique ptr. We
// define it because such a nested smart pointer is rather unwieldy to write
// every time.
//
// Example use:
//
// FlowTokenHolder branch{flow};
//
// auto kill_agent = [this, branch] {
// std::unique_ptr<FlowToken> flow = branch.Continue();
// if (!flow) {
// return;
// }
//
// stopped_ = true;
// };
//
// StopAgent(kill_agent);
// SetTimeout(kill_agent, 1);
//
template <typename... Args>
class Operation<Args...>::FlowTokenHolder {
public:
using FlowToken = Operation<Args...>::FlowToken;
explicit FlowTokenHolder(const FlowToken& flow)
: ptr_(std::make_shared<std::unique_ptr<FlowToken>>(
std::make_unique<FlowToken>(flow))) {}
FlowTokenHolder(const FlowTokenHolder& other) : ptr_(other.ptr_) {}
// Calling this method again on any copy of the same FlowTokenHolder yields a
// nullptr. Clients can check for that to enforce call once semantics.
//
// The method is const because it only mutates the pointed to object. It's
// useful to exploit this loophole because this way lambdas that have a
// FlowTokenHolder on their capture list don't need to be mutable.
std::unique_ptr<FlowToken> Continue() const { return std::move(*ptr_); }
private:
std::shared_ptr<std::unique_ptr<FlowToken>> ptr_;
};
// Following is a list of commonly used operations.
// An operation which wraps a Future<>. See WrapFutureAsOperation() below.
template <typename... Args>
class FutureOperation : public Operation<Args...> {
public:
using ResultCall = std::function<void(Args...)>;
FutureOperation(const char* trace_name, FuturePtr<> on_run,
FuturePtr<Args...> done, ResultCall result_call)
: Operation<Args...>(trace_name, std::move(result_call)),
on_run_(on_run),
done_(done) {}
private:
// |OperationBase|
void Run() override {
// FuturePtr is a shared ptr, so the Then() callback is not necessarily
// cancelled by the destructor of this Operation instance. Hence the
// callback must be protected against invocation after delete of this.
done_->WeakThen(this->GetWeakPtr(), [this](Args&&... args) {
this->Done(std::forward<Args>(args)...);
// Can no longer refer to |this|.
});
on_run_->Complete();
}
FuturePtr<> on_run_;
FuturePtr<Args...> done_;
FXL_DISALLOW_COPY_AND_ASSIGN(FutureOperation);
};
// EXPERIMENTAL
//
// This is useful to glue a FIDL call that expects its result as a callback with
// a Future<>, but have the business logic of the Future<> run on an
// OperationContainer.
//
// Usage:
//
// void MyFidlCall(args..., MyFidlCallResult result_call) {
// auto on_run = Future<>::Create();
// auto done = on_run->Map(...)->Then(...);
// operation_container_.Add(WrapFutureAsOperation(
// "MyFidlCall", on_run, done, result_call));
// }
//
template <typename... ResultArgs, typename... FutureArgs>
OperationBase* WrapFutureAsOperation(
const char* const trace_name, FuturePtr<> on_run,
FuturePtr<FutureArgs...> done,
std::function<void(ResultArgs...)> result_call) {
return new FutureOperation<ResultArgs...>(
trace_name, std::move(on_run), std::move(done), std::move(result_call));
}
template <typename... Args>
class FutureOperation2 : public Operation<Args...> {
public:
using ResultCall = std::function<void(Args...)>;
using RunOpCall = std::function<FuturePtr<Args...>(OperationBase*)>;
FutureOperation2(const char* const trace_name, RunOpCall run_op,
ResultCall done)
: Operation<Args...>(trace_name, std::move(done)), run_op_(run_op) {}
private:
// |OperationBase|
void Run() override {
// FuturePtr is a shared ptr, so the Then() callback is not necessarily
// cancelled by the destructor of this Operation instance. Hence the
// callback must be protected against invocation after delete of this.
run_op_(this)->WeakThen(this->GetWeakPtr(), [this](Args&&... args) {
this->Done(std::forward<Args>(args)...);
});
}
RunOpCall run_op_;
FXL_DISALLOW_COPY_AND_ASSIGN(FutureOperation2);
};
// EXPERIMENTAL
//
// Glue code to define operations where the body of the operation is defined
// inline to where the operation instance is created. It is an alternative to
// WrapFutureAsOperation.
//
// The body of the operation is defined as a callback, and is expected to return
// a Future<> that is completed when the operation is finished. If the Future<>
// is completed with values, those values are used as the result of the
// Operation.
//
// Usage:
//
// void MyFidlCall(args..., MyFidlCallResult result_call) {
// operation_container.Add(NewCallbackOperation(
// "MyFidlService::MyFidlCall",
// [this] (OperationBase* op) {
// // Use |op->GetWeakPtr()| to guard any async calls.
// auto f = Future<>::Create();
//
// DoSomethingAsync(f->Completer());
//
// f->WeakMap(op->GetWeakPtr(), [] (int do_something_result) {
// return do_something_result * 2;
// });
//
// return f;
// },
// result_call);
// }
template <typename... ResultArgs>
OperationBase* NewCallbackOperation(
const char* const trace_name,
typename FutureOperation2<ResultArgs...>::RunOpCall run,
typename FutureOperation2<ResultArgs...>::ResultCall done) {
return new FutureOperation2<ResultArgs...>(trace_name, std::move(run),
std::move(done));
}
// An operation which immediately calls its result callback. This is useful for
// making sure that all operations that run before this have completed.
class SyncCall : public Operation<> {
public:
SyncCall(ResultCall result_call)
: Operation("SyncCall", std::move(result_call)) {}
void Run() override { Done(); }
private:
FXL_DISALLOW_COPY_AND_ASSIGN(SyncCall);
};
} // namespace modular
#endif // LIB_ASYNC_CPP_OPERATION_H_