| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef LIB_ASYNC_CPP_OPERATION_H_ |
| #define LIB_ASYNC_CPP_OPERATION_H_ |
| |
| #include <functional> |
| #include <memory> |
| #include <queue> |
| #include <tuple> |
| #include <utility> |
| #include <vector> |
| |
| #include <lib/async/cpp/future.h> |
| #include <lib/async_promise/executor.h> |
| #include <lib/fxl/logging.h> |
| #include <lib/fxl/macros.h> |
| #include <lib/fxl/memory/weak_ptr.h> |
| #include <lib/fit/promise.h> |
| |
| namespace modular { |
| class OperationBase; |
| |
| // An abstract base class which provides methods to hold on to Operation |
| // instances until they declare themselves to be Done(). Ownership of |
| // the operations must be managed by implementations. |
| class OperationContainer { |
| public: |
| OperationContainer(); |
| virtual ~OperationContainer(); |
| |
| // Adds |o| to this container and takes ownership. |
| virtual void Add(OperationBase* o) final; |
| |
| // Adds |task| to be scheduled on |this|. This mirrors |
| // the interface in fit::executor, and is here only during |
| // a transition from Operations to fit::promises. |
| virtual void ScheduleTask(fit::pending_task task) = 0; |
| |
| protected: |
| void Schedule(OperationBase* o); |
| void InvalidateWeakPtrs(OperationBase* o); |
| |
| private: |
| // OperationBase calls the methods below. |
| friend class OperationBase; |
| |
| virtual fxl::WeakPtr<OperationContainer> GetWeakPtr() = 0; |
| // Must take ownership of |o|. |
| virtual void Hold(OperationBase* o) = 0; |
| // Must clean up memory for |o|. |
| virtual void Drop(OperationBase* o) = 0; |
| virtual void Cont() = 0; |
| }; |
| |
| // An implementation of |OperationContainer| which runs every instance of |
| // Operation as soon as it arrives. |
| class OperationCollection : public OperationContainer { |
| public: |
| OperationCollection(); |
| ~OperationCollection() override; |
| |
| void ScheduleTask(fit::pending_task task) override; |
| |
| private: |
| fxl::WeakPtr<OperationContainer> GetWeakPtr() override; |
| void Hold(OperationBase* o) override; |
| void Drop(OperationBase* o) override; |
| void Cont() override; |
| |
| async::Executor executor_; |
| std::vector<std::unique_ptr<OperationBase>> operations_; |
| |
| // It is essential that the weak_ptr_factory is defined after the operations_ |
| // container, so that it gets destroyed before operations_, and hence before |
| // the Operation instances in it, so that the Done() call on their |
| // OperationBase sees the container weak pointer to be null. That's also why |
| // we need the virtual GetWeakPtr() in the base class, rather than to put the |
| // weak ptr factory in the base class. |
| fxl::WeakPtrFactory<OperationContainer> weak_ptr_factory_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(OperationCollection); |
| }; |
| |
| // An implementation of |OperationContainer| which runs incoming Operations |
| // sequentially. This ensures that there are no partially complete operations |
| // when a new operation starts. |
| class OperationQueue : public OperationContainer { |
| public: |
| OperationQueue(); |
| ~OperationQueue() override; |
| |
| void ScheduleTask(fit::pending_task task) override; |
| |
| private: |
| fxl::WeakPtr<OperationContainer> GetWeakPtr() override; |
| void Hold(OperationBase* o) override; |
| void Drop(OperationBase* o) override; |
| void Cont() override; |
| |
| // Are there any operations running? An operation is considered running once |
| // its |Run()| has been invoked, up until result callback has finished. Note |
| // that an operation could be running while it is not present in |
| // |operations_|: its result callback could be executing. |
| bool idle_ = true; |
| |
| async::Executor executor_; |
| std::queue<std::unique_ptr<OperationBase>> operations_; |
| |
| // It is essential that the weak_ptr_factory is defined after the operations_ |
| // container, so that it gets destroyed before operations_, and hence before |
| // the Operation instances in it, so that the Done() call on their |
| // OperationBase sees the container weak pointer to be null. That's also why |
| // we need the virtual GetWeakPtr() in the base class, rather than to put the |
| // weak ptr factory in the base class. |
| fxl::WeakPtrFactory<OperationContainer> weak_ptr_factory_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(OperationQueue); |
| }; |
| |
| // Something that can be put in an OperationContainer until it calls Done() on |
| // itself. Used to implement asynchronous operations that need to hold on to |
| // handles until the operation asynchronously completes and returns a value. |
| // |
| // Held by a unique_ptr<> in the OperationContainer, so instances of derived |
| // classes need to be created with new. |
| // |
| // Advantages of using an Operation instance to implement asynchronous fidl |
| // method invocations: |
| // |
| // 1. To receive the return callback, the interface pointer on which the method |
| // is invoked needs to be kept around. It's tricky to place a move-only |
| // interface pointer on the capture list of a lambda passed as argument to |
| // its own method. An instance is much simpler. |
| // |
| // 2. The capture list of the callbacks only holds this, everything else that |
| // needs to be passed on is in the instance. |
| // |
| // 3. Completion callbacks don't need to be made copyable and don't need to be |
| // mutable, because no move only pointer is pulled from their capture list. |
| // |
| // 4. Conversion of Handle to Ptr can be done by Bind() because the Ptr is |
| // already there. |
| // |
| // Use of Operation instances must adhere to invariants: |
| // |
| // * Deleting the operation container deletes all Operation instances in it, and |
| // it must be guaranteed that no callbacks of ongoing method invocations are |
| // invoked after the Operation instance they access is deleted. In order to |
| // accomplish this, an Operation instance must only invoke methods on fidl |
| // pointers that are either owned by the Operation instance, or that are owned |
| // by the immediate owner of the operation container. This way, when the |
| // operation container is deleted, the destructors of all involved fidl |
| // pointers are called, close their channels, and cancel all pending method |
| // callbacks. If a method is invoked on a fidl pointer that lives on beyond |
| // the lifetime of the operation container, this is not guaranteed. |
| class OperationBase { |
| public: |
| virtual ~OperationBase(); |
| |
| // Derived classes implement this method. It is called by the container of |
| // this Operation when it is ready. At some point after Run() is invoked, |
| // |Done()| must be called to signal completion of this operation. This |
| // usually happens implicitly from the destructor of a FlowToken that is |
| // passed along the asynchronous flow of control of the Operation. |
| virtual void Run() = 0; |
| |
| // Needed to guard callbacks to methods on FIDL pointers that are not owned by |
| // this Operation instance. |
| // |
| // Callbacks on methods on FIDL pointers that are owned by Operation instance |
| // are never invoked after the Operation instance is deleted, because they are |
| // cancelled by the destructor of the FIDL pointer. However, if the FIDL |
| // pointer is owned outside of the Operation instance, such a callback may be |
| // invoked after the Operation instance was destroyed, if the FIDL pointer |
| // lives longer. |
| fxl::WeakPtr<OperationBase> GetWeakPtr(); |
| |
| protected: |
| // |trace_name| and |trace_info| are used to annotate performance traces. |
| // |trace_name| must outlive *this. |
| OperationBase(const char* trace_name, std::string trace_info); |
| |
| // Useful in log messages. |
| const char* trace_name() const { return trace_name_; } |
| |
| class FlowTokenBase; |
| |
| private: |
| // OperationContainer calls SetOwner(), InvalidateWeakPtrs() and Schedule(). |
| friend class OperationContainer; |
| |
| // Called only by OperationContainer::Add(). |
| void SetOwner(OperationContainer* c); |
| |
| // Called only by OperationContainer. |
| void Schedule(); |
| |
| // Called only by OperationContainer. |
| void InvalidateWeakPtrs(); |
| |
| // Operation<..> class DispatchCallback() and accesses trace info fields |
| // below. |
| template <typename... Args> |
| friend class Operation; |
| |
| // Called only by Operation<...>. |
| template <typename... Args, |
| typename ResultCall = std::function<void(Args...)>> |
| void DispatchCallback(ResultCall result_call, Args... result_args) { |
| // Move |container| pointer out of this, because |this| gets deleted before |
| // we stop using |container|. |
| auto container = std::move(container_); |
| if (container) { |
| // Deletes |this|. |
| container->Drop(this); |
| |
| // Can no longer refer to |this|. |
| result_call(std::move(result_args)...); |
| |
| // The result callback may cause the container to be deleted, so we must |
| // check it still exists before telling it to continue. |
| if (container) { |
| container->Cont(); |
| } |
| } |
| } |
| |
| // Traces the duration of the Operation excution. Begin() is called from |
| // Schedule(), End() is called from Done(). |
| void TraceAsyncBegin(); |
| void TraceAsyncEnd(); |
| |
| fxl::WeakPtr<OperationContainer> container_; |
| |
| // Used by FlowTokenBase to suppress Done() calls after the Operation instance |
| // is deleted. The OperationContainer will invalidate all weak pointers to |
| // this instance before the destructor is invoked, so it's not necessary that |
| // this field is last. |
| fxl::WeakPtrFactory<OperationBase> weak_ptr_factory_; |
| |
| // Name used to label traces for this operation. |
| const char* const trace_name_; |
| |
| // Unique identifier used to correlate trace events for this operation. |
| const uint64_t trace_id_; |
| |
| // Additional information added to trace events for this operation. |
| const std::string trace_info_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(OperationBase); |
| }; |
| |
| template <typename... Args> |
| class Operation : public OperationBase { |
| public: |
| ~Operation() override = default; |
| |
| using ResultCall = std::function<void(Args...)>; |
| |
| protected: |
| Operation(const char* const trace_name, ResultCall result_call, |
| const std::string& trace_info = "") |
| : OperationBase(trace_name, trace_info), |
| result_call_(std::move(result_call)) {} |
| |
| // Derived classes call this when they are prepared to be removed from the |
| // container. Must be the last thing this instance does, as it results in |
| // destructor invocation. |
| void Done(Args... result_args) { |
| TraceAsyncEnd(); |
| DispatchCallback(std::move(result_call_), std::move(result_args)...); |
| } |
| |
| class FlowToken; |
| class FlowTokenHolder; |
| |
| private: |
| ResultCall result_call_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(Operation); |
| }; |
| |
| // The instance of FlowToken at which the refcount reaches zero in the |
| // destructor calls Done() on the Operation it holds a reference of. |
| // |
| // It is an inner class of Operation so that it has access to Done(), which is |
| // protected. |
| // |
| // Why this is ref counted, not moved: |
| // |
| // 1. Some flows of control branch into multiple parallel branches, and then its |
| // subtle to figure out which one to move it along. |
| // |
| // 2. To move something onto a capture list is more verbose than to just copy |
| // it, so it would defeat the purpose of being simpler to write than the |
| // status quo. |
| // |
| // 3. A lambda with something that is moveonly on its capture list is not |
| // copyable anymore, and one of the points of Operation was to only have |
| // copyable continuations. |
| // |
| // NOTE: You cannot mix flow tokens and explicit Done() calls. Once an Operation |
| // uses a flow token, this is how Done() is called, and calling Done() |
| // explicitly would call it twice. |
| // |
| // The parts that are not dependent on the template parameter are factored off |
| // into a base class as usual. |
| class OperationBase::FlowTokenBase { |
| public: |
| explicit FlowTokenBase(OperationBase* op); |
| FlowTokenBase(const FlowTokenBase& other); |
| ~FlowTokenBase(); |
| |
| protected: |
| int refcount() const { return *refcount_; } |
| bool weak_op() const { return weak_op_.get() != nullptr; } |
| |
| private: |
| int* const refcount_; // shared between copies of FlowToken. |
| fxl::WeakPtr<OperationBase> weak_op_; |
| }; |
| |
| template <typename... Args> |
| class Operation<Args...>::FlowToken : OperationBase::FlowTokenBase { |
| public: |
| explicit FlowToken(Operation<Args...>* const op, Args* const... result) |
| : FlowTokenBase(op), op_(op), result_(result...) {} |
| |
| FlowToken(const FlowToken& other) |
| : FlowTokenBase(other), op_(other.op_), result_(other.result_) {} |
| |
| ~FlowToken() { |
| // If refcount is 1 here, it will become 0 in ~FlowTokenBase. |
| if (refcount() == 1 && weak_op()) { |
| apply(std::make_index_sequence< |
| std::tuple_size<decltype(result_)>::value>{}); |
| } |
| } |
| |
| private: |
| // This usage is based on the implementation of std::apply(), which is only |
| // available in C++17: http://en.cppreference.com/w/cpp/utility/apply |
| template <size_t... I> |
| void apply(std::integer_sequence<size_t, I...> /*unused*/) { |
| op_->Done(std::move((*std::get<I>(result_)))...); |
| } |
| |
| Operation<Args...>* const op_; // not owned |
| |
| // The pointers that FlowToken() is constructed with are stored in this |
| // std::tuple. They are then extracted and std::move()'d in the |apply()| |
| // method. |
| std::tuple<Args* const...> result_; // the values are not owned |
| }; |
| |
| // Sometimes the asynchronous flow of control that is represented by a FlowToken |
| // branches, but is actually continued on exactly one branch. For example, when |
| // a method is called on an external fidl service, and the callback from that |
| // method is also scheduled as a timeout to avoid blocking the operation queue |
| // in case the external service misbehaves and doesn't respond. |
| // |
| // In that situation, it would be wrong to place two copies of the flow token on |
| // the capture list of both callbacks, because then the flow would only be |
| // Done() once both callbacks are destroyed. In the case where the second |
| // callback is a timeout, this might be really late. |
| // |
| // Instead, a single copy of the flow token is placed in a shared container, a |
| // reference to which is placed on the capture list of both callbacks. The first |
| // callback that is invoked removes the flow token from the shared container and |
| // propagates it from there. |
| // |
| // That way, the flow token in the shared container also acts as call-once flag. |
| // |
| // The flow token holder is a simple wrapper of a shared ptr to a unique ptr. We |
| // define it because such a nested smart pointer is rather unwieldy to write |
| // every time. |
| // |
| // Example use: |
| // |
| // FlowTokenHolder branch{flow}; |
| // |
| // auto kill_agent = [this, branch] { |
| // std::unique_ptr<FlowToken> flow = branch.Continue(); |
| // if (!flow) { |
| // return; |
| // } |
| // |
| // stopped_ = true; |
| // }; |
| // |
| // StopAgent(kill_agent); |
| // SetTimeout(kill_agent, 1); |
| // |
| template <typename... Args> |
| class Operation<Args...>::FlowTokenHolder { |
| public: |
| using FlowToken = Operation<Args...>::FlowToken; |
| |
| explicit FlowTokenHolder(const FlowToken& flow) |
| : ptr_(std::make_shared<std::unique_ptr<FlowToken>>( |
| std::make_unique<FlowToken>(flow))) {} |
| |
| FlowTokenHolder(const FlowTokenHolder& other) : ptr_(other.ptr_) {} |
| |
| // Calling this method again on any copy of the same FlowTokenHolder yields a |
| // nullptr. Clients can check for that to enforce call once semantics. |
| // |
| // The method is const because it only mutates the pointed to object. It's |
| // useful to exploit this loophole because this way lambdas that have a |
| // FlowTokenHolder on their capture list don't need to be mutable. |
| std::unique_ptr<FlowToken> Continue() const { return std::move(*ptr_); } |
| |
| private: |
| std::shared_ptr<std::unique_ptr<FlowToken>> ptr_; |
| }; |
| |
| // Following is a list of commonly used operations. |
| |
| // An operation which wraps a Future<>. See WrapFutureAsOperation() below. |
| template <typename... Args> |
| class FutureOperation : public Operation<Args...> { |
| public: |
| using ResultCall = std::function<void(Args...)>; |
| |
| FutureOperation(const char* trace_name, FuturePtr<> on_run, |
| FuturePtr<Args...> done, ResultCall result_call) |
| : Operation<Args...>(trace_name, std::move(result_call)), |
| on_run_(on_run), |
| done_(done) {} |
| |
| private: |
| // |OperationBase| |
| void Run() override { |
| // FuturePtr is a shared ptr, so the Then() callback is not necessarily |
| // cancelled by the destructor of this Operation instance. Hence the |
| // callback must be protected against invocation after delete of this. |
| done_->WeakThen(this->GetWeakPtr(), [this](Args&&... args) { |
| this->Done(std::forward<Args>(args)...); |
| // Can no longer refer to |this|. |
| }); |
| on_run_->Complete(); |
| } |
| |
| FuturePtr<> on_run_; |
| FuturePtr<Args...> done_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(FutureOperation); |
| }; |
| |
| // EXPERIMENTAL |
| // |
| // This is useful to glue a FIDL call that expects its result as a callback with |
| // a Future<>, but have the business logic of the Future<> run on an |
| // OperationContainer. |
| // |
| // Usage: |
| // |
| // void MyFidlCall(args..., MyFidlCallResult result_call) { |
| // auto on_run = Future<>::Create(); |
| // auto done = on_run->Map(...)->Then(...); |
| // operation_container_.Add(WrapFutureAsOperation( |
| // "MyFidlCall", on_run, done, result_call)); |
| // } |
| // |
| template <typename... ResultArgs, typename... FutureArgs> |
| OperationBase* WrapFutureAsOperation( |
| const char* const trace_name, FuturePtr<> on_run, |
| FuturePtr<FutureArgs...> done, |
| std::function<void(ResultArgs...)> result_call) { |
| return new FutureOperation<ResultArgs...>( |
| trace_name, std::move(on_run), std::move(done), std::move(result_call)); |
| } |
| |
| template <typename... Args> |
| class FutureOperation2 : public Operation<Args...> { |
| public: |
| using ResultCall = std::function<void(Args...)>; |
| using RunOpCall = std::function<FuturePtr<Args...>(OperationBase*)>; |
| |
| FutureOperation2(const char* const trace_name, RunOpCall run_op, |
| ResultCall done) |
| : Operation<Args...>(trace_name, std::move(done)), run_op_(run_op) {} |
| |
| private: |
| // |OperationBase| |
| void Run() override { |
| // FuturePtr is a shared ptr, so the Then() callback is not necessarily |
| // cancelled by the destructor of this Operation instance. Hence the |
| // callback must be protected against invocation after delete of this. |
| run_op_(this)->WeakThen(this->GetWeakPtr(), [this](Args&&... args) { |
| this->Done(std::forward<Args>(args)...); |
| }); |
| } |
| |
| RunOpCall run_op_; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(FutureOperation2); |
| }; |
| |
| // EXPERIMENTAL |
| // |
| // Glue code to define operations where the body of the operation is defined |
| // inline to where the operation instance is created. It is an alternative to |
| // WrapFutureAsOperation. |
| // |
| // The body of the operation is defined as a callback, and is expected to return |
| // a Future<> that is completed when the operation is finished. If the Future<> |
| // is completed with values, those values are used as the result of the |
| // Operation. |
| // |
| // Usage: |
| // |
| // void MyFidlCall(args..., MyFidlCallResult result_call) { |
| // operation_container.Add(NewCallbackOperation( |
| // "MyFidlService::MyFidlCall", |
| // [this] (OperationBase* op) { |
| // // Use |op->GetWeakPtr()| to guard any async calls. |
| // auto f = Future<>::Create(); |
| // |
| // DoSomethingAsync(f->Completer()); |
| // |
| // f->WeakMap(op->GetWeakPtr(), [] (int do_something_result) { |
| // return do_something_result * 2; |
| // }); |
| // |
| // return f; |
| // }, |
| // result_call); |
| // } |
| template <typename... ResultArgs> |
| OperationBase* NewCallbackOperation( |
| const char* const trace_name, |
| typename FutureOperation2<ResultArgs...>::RunOpCall run, |
| typename FutureOperation2<ResultArgs...>::ResultCall done) { |
| return new FutureOperation2<ResultArgs...>(trace_name, std::move(run), |
| std::move(done)); |
| } |
| |
| // An operation which immediately calls its result callback. This is useful for |
| // making sure that all operations that run before this have completed. |
| class SyncCall : public Operation<> { |
| public: |
| SyncCall(ResultCall result_call) |
| : Operation("SyncCall", std::move(result_call)) {} |
| |
| void Run() override { Done(); } |
| |
| private: |
| FXL_DISALLOW_COPY_AND_ASSIGN(SyncCall); |
| }; |
| |
| } // namespace modular |
| |
| #endif // LIB_ASYNC_CPP_OPERATION_H_ |