| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef LIB_ASYNC_CPP_FUTURE_H_ |
| #define LIB_ASYNC_CPP_FUTURE_H_ |
| |
| #include <memory> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/async/default.h> |
| |
| #include "garnet/public/lib/fxl/functional/apply.h" |
| #include "garnet/public/lib/fxl/macros.h" |
| #include "garnet/public/lib/fxl/memory/ref_counted.h" |
| #include "garnet/public/lib/fxl/memory/ref_ptr.h" |
| #include "garnet/public/lib/fxl/memory/weak_ptr.h" |
| |
| namespace modular { |
| |
| // # Futures |
| // |
| // A *future* is an object representing the eventual value of an asynchronous |
| // operation. They are a useful complement to callback functions or lambdas |
| // because they are _composable_: asynchronous operations can be sequentially |
| // executed, and an async operation's result can be passed to another async |
| // operation, like a Unix pipeline. |
| // |
| // To use a future: |
| // |
| // 1. A *producer*, typically an async operation, creates a Future with |
| // Future<ResultType>::Create(). |
| // 2. The producer starts its async operation (e.g. a network request or disk |
| // read). |
| // 3. The producer synchronously returns the Future to a *consumer*. |
| // 4. A consumer attaches a *callback* lambda to the Future using Then(). (The |
| // callback can be attached to the future any time after the future is |
| // created, before or after the async operation is finished.) |
| // 5. Some time later, when the producer's async operation is finished, the |
| // producer *completes* the future with a *result* using Complete(result). |
| // |result| is 0 to N movable or copyable values, e.g. Complete(), |
| // Complete(value), Complete(value1, value2, ...). |
| // 6. The consumer's callback is invoked after the future is completed, with the |
| // completed result passed as zero or more parameters to the callback. |
| // |
| // The following example shows a simple use case: |
| // |
| // Producer: |
| // |
| // FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) { |
| // auto f = Future<Bytes>::Create("NetworkRequest"); // a "trace_name" that's |
| // // logged when things go |
| // // wrong |
| // auto network_request_callback = [f] (Bytes bytes) { |
| // f->Complete(std::move(bytes)); |
| // }; |
| // PerformAsyncNetworkRequest(request, network_request_callback); |
| // return f; |
| // } |
| // |
| // Client: |
| // |
| // FuturePtr<Bytes> f = MakeNetworkRequest(request); |
| // f->Then([] (Bytes bytes) { |
| // ProcessBytes(bytes); |
| // }); |
| // |
| // ## Chaining and sequencing futures |
| // |
| // Futures can be composed; this is commonly called "chaining". Methods that |
| // attach callbacks, such as Then(), will return another Future: the returned |
| // Future is completed once the previous callback has finished executing. For |
| // example: |
| // |
| // Client: |
| // |
| // ShowProgressSpinner(true); |
| // FuturePtr<Bytes> f = MakeNetworkRequest(request); |
| // f->AsyncMap([] (Bytes zipped_bytes) { |
| // // Use AsyncMap() if your callback wants to return another Future. |
| // FuturePtr<Bytes> f = UnzipInBackground(zipped_bytes); |
| // return f; |
| // })->Map([] (Bytes unzipped_bytes) { |
| // // Use Map() if your callback returns a non-future: the callback's return |
| // // value will be wrapped up into the returned Future. |
| // JPEGImage image = DecodeImageSynchronously(unzipped_bytes); |
| // return image; |
| // })->AsyncMap([] (JPEGImage image) { |
| // FuturePtr<> f = UpdateUIAsynchronously(image); |
| // return f; |
| // })->Then([] { |
| // // Use Then() if your callback returns void. Note that Then() still returns |
| // // a Future<>, which will be completed only when your callback finishes |
| // // executing. |
| // ShowProgressSpinner(false); |
| // }); |
| // |
| // ## Memory Management & Ownership |
| // |
| // FuturePtr is a fxl::RefPtr, which is a smart pointer similar to |
| // std::shared_ptr that holds a reference count (refcount). When you call |
| // Future::Create(), you are expected to maintain a reference to it. When the |
| // Future is deleted, its result and callbacks are also deleted. |
| // |
| // Each method documents how it affects the future's refcount. To summarize: |
| // |
| // * Calling Then() on a future does not affect its refcount. This applies to |
| // all methods that return a chained future, such as AsyncMap() and Map(). |
| // * However, calling Then() on a future will cause that future to maintain a |
| // reference to the returned chained future. So, you do not need to maintain a |
| // reference to the returned future. |
| // * Calling Complete() does not affect the future's refcount. |
| // * Unlike Complete(), the closure returned by Completer() _does own_ the |
| // future, so you do not need to maintain a reference to the future after |
| // calling Completer(). (You do need to maintain a reference to the closure, |
| // however.) |
| // * Wait(futures) returns a future that every future in |futures| owns, so you |
| // do not need to maintain a reference to the returned future. The callback |
| // attached to each future in |futures| will also keep a reference to that |
| // future, so that if a future that is Wait()ed on otherwise goes out of |
| // scope, the future itself is kept alive. |
| // |
| // See each method's documentation for more details on memory management. |
| // |
| // ## Use Weak*() variants to cancel callback chains |
| // |
| // "Weak" variants exist for all chain/sequence methods (WeakThen(), |
| // WeakConstThen(), WeakMap() and WeakAsyncMap()). These are almost identical |
| // to their non-weak counterparts but take an fxl::WeakPtr<T> as a first |
| // argument. If, at callback invocation time, the WeakPtr is invalid, execution |
| // will halt and no future further down the chain will be executed. |
| // |
| // Example: |
| // |
| // FuturePtr<> f = MakeFuture(); |
| // f->WeakThen(weak_ptr_factory.GetWeakPtr(), [] { |
| // FXL_LOG(INFO) << "This won't execute"; |
| // })->Then([] { |
| // FXL_LOG(INFO) << "Neither will this"; |
| // }); |
| // weak_ptr_factory.InvalidateWeakPtrs(); |
| // f->Complete(); |
| // |
| // ## Use Wait() to synchronize on multiple futures |
| // |
| // If multiple futures are running, use the Wait() function to create a |
| // Future that completes when all the futures passed to it are completed: |
| // |
| // FuturePtr<Bytes> f1 = MakeNetworkRequest(request1); |
| // FuturePtr<Bytes> f2 = MakeNetworkRequest(request2); |
| // FuturePtr<Bytes> f3 = MakeNetworkRequest(request3); |
| // Wait<Future<>>("Network requests", {f1, f2, f3})->Then([] { |
| // AllNetworkRequestsAreComplete(); |
| // }); |
| // |
| // See the Wait() function documentation for more details. |
| // |
| // ## Use Completer() to integrate with functions requiring callbacks |
| // |
| // Use the Completer() method to integrate with existing code that uses callback |
| // functions. Completer() returns a std::function<void(Result...)> that, when |
| // called, calls Complete() on the future. Re-visiting the first example: |
| // |
| // Without Completer(): |
| // |
| // FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) { |
| // auto f = Future<Bytes>::Create("NetworkRequest"); |
| // auto network_request_callback = [f] (Bytes bytes) { |
| // f->Complete(std::move(bytes)); |
| // }; |
| // PerformAsyncNetworkRequest(request, network_request_callback); |
| // return f; |
| // } |
| // |
| // With Completer(): |
| // |
| // FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) { |
| // auto f = Future<Bytes>::Create("NetworkRequest"); |
| // PerformAsyncNetworkRequest(request, f->Completer()); |
| // return f; |
| // } |
| // |
| // ## Use error values to propagate errors back to consumers |
| // |
| // If the future can fail, use a value (or multiple values) that's capable of |
| // storing both the error and a successful result to propagate the return value |
| // back to the consumer. For example: |
| // |
| // FuturePtr<std::error_code, Bytes> f = MakeNetworkRequest(); |
| // f->Then([] (std::error_code error, Bytes bytes) { |
| // if (error) { |
| // // handle error |
| // } else { |
| // // network request was successful |
| // ProcessBytes(bytes); |
| // } |
| // }); |
| // |
| // ## modular::Future vs other Futures/Promises |
| // |
| // If you are familiar with Futures & Promises in other languages, this Future |
| // class is intentionally different from others, to better integrate with |
| // Fuchsia coding patterns: |
| // |
| // * NOT THREADSAFE. This will change in the future when thread safety is |
| // required, but there are no use cases yet. (YAGNI!) |
| // * Support for multiple result values via variadic template parameters. This |
| // is required for smooth integration with the fuchsia::modular::Operation |
| // class. |
| // * Only a single callback can be set via Then(), since move semantics are used |
| // so heavily in Fuchsia code. (If multiple callbacks were supported and the |
| // result is moved, how does one move the result from one callback to |
| // another?) |
| // * Multiple callbacks can be set if the callback lambda takes the result via |
| // const&. In this case, use ConstThen() to attach each callback, rather |
| // than Then(). ConstThen() calls can also be chained, like Then(). |
| // * There are no success/error callbacks and control flows: all callbacks are |
| // "success callbacks". |
| // * The traditional reason for error callbacks is to convert exceptions into |
| // error values, but Google C++ style doesn't use exceptions, so error |
| // callbacks aren't needed. |
| // * There's some argument that using separate success/error control flow |
| // paths is beneficial. However, in practice, a lot of client code using |
| // this pattern doesn't attach error callbacks, only success callbacks, so |
| // errors often go unchecked. |
| // * If error values need to be propagated back to the client, use a dedicated |
| // error type. (Note that many built-in types may have values that can be |
| // interpreted as errors, e.g. nullptr, or 0 or -1.) This also forces a |
| // consumer to inspect the type and check for errors. |
| // * No cancellation/progress support. Adding cancellation/progress: |
| // * adds more complexity to the futures implementation, |
| // * can be implemented on top of a core futures implementation for the few |
| // cases where they're required, and |
| // * typically requires extra cancel/progress callbacks, which adds more |
| // control flow paths. |
| // * see fxl::CancelableCallback if you need cancellation. |
| // * No execution contexts (yet). |
| // * There needs to be a comprehensive story about runloops etc first. |
| |
| template <typename... Result> |
| class Future; |
| |
| template <typename... Result> |
| using FuturePtr = fxl::RefPtr<Future<Result...>>; |
| |
| namespace internal { |
| |
// Tracks where a future is in its lifecycle.
enum class FutureStatus {
  // Not completed.
  kAwaiting,
  // A value is available, but has not yet been moved into a callback.
  kCompleted,
  // The value has been moved into a callback.
  kConsumed,
};
| |
// Variable-template shorthands for <type_traits> predicates, ported from
// C++17.

// True when |From| is implicitly convertible to |To|.
template <class From, class To>
constexpr bool is_convertible_v = std::is_convertible<From, To>{};

// True when |T| is (possibly cv-qualified) void.
template <typename T>
constexpr bool is_void_v = std::is_void<T>{};
| |
| template <typename... Result> |
| class DefaultResultsFuture { |
| public: |
| using type = Future<std::vector<std::tuple<Result...>>>; |
| }; |
| |
| template <typename Result> |
| class DefaultResultsFuture<Result> { |
| public: |
| using type = Future<std::vector<Result>>; |
| }; |
| |
| template <> |
| class DefaultResultsFuture<> { |
| public: |
| using type = Future<>; |
| }; |
| |
| template <typename... Result> |
| using DefaultResultsFuture_t = typename DefaultResultsFuture<Result...>::type; |
| |
// Gathers one result per slot and, once every slot is filled, hands the
// results (in slot order) to a |ResultsFuture| as a single std::vector.
//
// |ResultsFuture| must expose |result_tuple_type| whose first element is a
// std::vector<ElementType>, and a Complete() method accepting that vector.
template <typename ResultsFuture>
class ResultCollector {
 public:
  // ResultsFuture = Future<std::vector<ElementType>>
  using ElementType = typename std::tuple_element_t<
      0, typename ResultsFuture::result_tuple_type>::value_type;

  // Creates a collector with |reserved_count| empty slots.
  ResultCollector(size_t reserved_count) : results_(reserved_count) {}

  // Returns true once every slot has been assigned at least once.
  bool IsComplete() const { return finished_count_ == results_.size(); }

  // Constructs an |ElementType| from |result...| and stores it in slot
  // |result_index|, which must be less than the reserved count.
  //
  // Assigning the same slot again replaces the stored value but is only
  // counted once; otherwise a duplicate assignment would make IsComplete()
  // report true while another slot is still empty, and Complete() would then
  // dereference a null pointer.
  template <typename... Result>
  void AssignResult(size_t result_index, Result&&... result) {
    const bool first_assignment = results_[result_index] == nullptr;
    results_[result_index] =
        std::make_unique<ElementType>(std::forward<Result>(result)...);
    if (first_assignment) {
      ++finished_count_;
    }
  }

  // Moves every collected result into a vector and completes |future| with
  // it. Must only be called once IsComplete() is true: empty slots are
  // dereferenced unconditionally. The collected values are moved-from
  // afterwards.
  void Complete(ResultsFuture* future) {
    std::vector<ElementType> final_results;
    final_results.reserve(results_.size());
    for (auto& result : results_) {
      final_results.push_back(std::move(*result));
    }
    future->Complete(std::move(final_results));
  }

 private:
  // Number of distinct slots that have been assigned.
  size_t finished_count_ = 0;
  // Use unique ptrs initially so that we can reserve even if |ElementType| is
  // not default-constructible.
  //
  // TODO(rosswang): Consider adding a specialization for default-constructible
  // types.
  std::vector<std::unique_ptr<ElementType>> results_;
};
| |
| template <> |
| class ResultCollector<Future<>> { |
| public: |
| ResultCollector(size_t reserved_count); |
| bool IsComplete() const; |
| |
| // The template on this allows us to use Future<> collectors to swallow |
| // unneeded results of futures we're waiting on. |
| template <typename... Result> |
| void AssignResult(size_t, Result... result) { |
| finished_count_++; |
| } |
| |
| void Complete(Future<>* future) const; |
| |
| private: |
| size_t finished_count_ = 0; |
| size_t reserved_count_; |
| }; |
| |
| } // namespace internal |
| |
| template <typename... Result> |
| class Future : public fxl::RefCountedThreadSafe<Future<Result...>> { |
| public: |
| using result_tuple_type = std::tuple<Result...>; |
| |
| // Creates a FuturePtr<Result...>. |trace_name| is used solely for debugging |
| // purposes, and is logged when something goes wrong (e.g. Complete() is |
| // called twice.) |
| static FuturePtr<Result...> Create(const std::string& trace_name) { |
| auto f = fxl::AdoptRef(new Future<Result...>); |
| f->trace_name_ = std::move(trace_name); |
| return f; |
| } |
| |
| // Creates a FuturePtr<Result...> that's already completed. For example: |
| // |
| // FuturePtr<int> f = Future<int>::CreateCompleted("MyFuture", 5); |
| // f->Then([] (int i) { |
| // // this lambda executes immediately |
| // assert(i == 5); |
| // }); |
| static FuturePtr<Result...> CreateCompleted(const std::string& trace_name, |
| Result&&... result) { |
| auto f = Create(std::move(trace_name)); |
| f->Complete(std::forward<Result>(result)...); |
| return f; |
| } |
| |
| // Completes a future with |result|. This causes any callbacks registered |
| // with Then(), ConstThen(), etc to be invoked with |result| passed to them |
| // as a parameter. |
| // |
| // Calling Complete() does not affect this future's refcount. This is because: |
| // |
| // 1. Any callbacks that are registered are called immediately and |
| // synchronously, so the future's lifetime does not need to be extended |
| // before callbacks are invoked. |
| // 2. Then() correctly handles cases where the future may be deleted by their |
| // callbacks. |
| // 3. There is no danger of the future being deleted before Complete() is |
| // called, because if Complete() is called, the code that calls Complete() |
| // must have a reference to the future. |
| void Complete(Result&&... result) { |
| CompleteWithTuple(std::forward_as_tuple(std::forward<Result>(result)...)); |
| } |
| |
| // Returns a std::function<void(Result)> that, when called, calls Complete() |
| // on this future. For example: |
| // |
| // FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) { |
| // auto f = Future<Bytes>::Create(); |
| // PerformAsyncNetworkRequest(request, f->Completer()); |
| // return f; |
| // } |
| // |
| // The returned closure will maintain a reference to the future, so that the |
| // closure can call Complete() on it correctly later. In other words, calling |
| // Completer() will increase this future's refcount, and you do not need to |
| // maintain a reference to it. After the closure is called, the future's |
| // refcount will drop by 1. This enables you to write code like |
| // |
| // { |
| // auto f = Future<>::Create(); |
| // CallAsyncMethod(f->Completer()); |
| // // f will now go out of scope, but f->Completer() owns it, so it's |
| // // still kept alive. |
| // } |
| std::function<void(Result...)> Completer() { |
| return [shared_this = FuturePtr<Result...>(this)](Result&&... result) { |
| shared_this->Complete(std::forward<Result>(result)...); |
| }; |
| } |
| |
| // Attaches a |callback| that is invoked when the future is completed with |
| // Complete(), and returns a Future that is complete once |callback| has |
| // finished executing. |
| // |
| // * The callback is invoked immediately (synchronously); it is not scheduled |
| // on the event loop. |
| // * The callback is invoked on the same thread as the code that calls |
| // Complete(). |
| // * Only one callback can be attached: any callback that was previously |
| // attached with Then() is discarded. |
| // * |callback| is called after callbacks attached with ConstThen(). |
| // * It is safe for |callback| to delete the future that Then() is invoked on. |
| // If this occurs, any chained futures returned by Then(), Map() etc will be |
| // de-referenced by this future and not be completed, even if a reference to |
| // the chained future is maintained elsewhere. |
| // * It is also safe for |callback| to delete the chained future that Then() |
| // returns. |
| // * The future returned by Then() will be owned by this future, so you do not |
| // need to maintain a reference to it. |
| // |
| // The type of this function looks complex, but is basically: |
| // |
| // FuturePtr<> Then(std::function<void(Result...)> callback); |
| template <typename Callback, |
| typename = typename std::enable_if_t< |
| internal::is_void_v<std::result_of_t<Callback(Result...)>>>> |
| FuturePtr<> Then(Callback callback) { |
| return SubfutureCreate( |
| Future<>::Create(trace_name_ + "(Then)"), |
| SubfutureVoidCallback<Result...>(std::move(callback)), |
| SubfutureCompleter<>(), [] { return true; }); |
| } |
| |
| // Equivalent to Then(), but guards execution of |callback| with a WeakPtr. |
| // If, at the time |callback| is to be executed, |weak_ptr| has been |
| // invalidated, |callback| is not run, nor is the next Future in the chain |
| // completed. |
| template <typename Callback, typename T, |
| typename = typename std::enable_if_t< |
| internal::is_void_v<std::result_of_t<Callback(Result...)>>>> |
| FuturePtr<> WeakThen(fxl::WeakPtr<T> weak_ptr, Callback callback) { |
| return SubfutureCreate( |
| Future<>::Create(trace_name_ + "(WeakThen)"), |
| SubfutureVoidCallback<Result...>(std::move(callback)), |
| SubfutureCompleter<>(), [weak_ptr] { return !!weak_ptr; }); |
| } |
| |
| // Similar to Then(), except that: |
| // |
| // * |const_callback| must take in the completed result via a const&, |
| // * multiple callbacks can be attached, |
| // * |const_callback| is called _before_ the Then() callback. |
| FuturePtr<> ConstThen(std::function<void(const Result&...)> const_callback) { |
| FuturePtr<> subfuture = Future<>::Create(trace_name_ + "(ConstThen)"); |
| AddConstCallback(SubfutureCallback<const Result&...>( |
| subfuture, |
| SubfutureVoidCallback<const Result&...>(std::move(const_callback)), |
| SubfutureCompleter<>(), [] { return true; })); |
| return subfuture; |
| } |
| |
| // See WeakThen(). |
| template <typename T> |
| FuturePtr<> WeakConstThen( |
| fxl::WeakPtr<T> weak_ptr, |
| std::function<void(const Result&...)> const_callback) { |
| FuturePtr<> subfuture = Future<>::Create(trace_name_ + "(WeakConstThen)"); |
| AddConstCallback(SubfutureCallback<const Result&...>( |
| subfuture, |
| SubfutureVoidCallback<const Result&...>(std::move(const_callback)), |
| SubfutureCompleter<>(), [weak_ptr] { return !!weak_ptr; })); |
| return subfuture; |
| } |
| |
| // Attaches a |callback| that is invoked when this future is completed with |
| // Complete(). |callback| must return another future: when the returned future |
| // completes, the future returned by AsyncMap() will complete. For example: |
| // |
| // ShowProgressSpinner(true); |
| // FuturePtr<Bytes> f = MakeNetworkRequest(request); |
| // f->AsyncMap([] (Bytes zipped_bytes) { |
| // FuturePtr<Bytes> f = UnzipInBackground(zipped_bytes); |
| // return f; |
| // })->AsyncMap([] (Bytes unzipped_bytes) { |
| // FuturePtr<JPEGImage> f = DecodeImageInBackground(unzipped_bytes); |
| // return f; |
| // })->AsyncMap([] (JPEGImage image) { |
| // FuturePtr<> f = UpdateUIAsynchronously(image); |
| // return f; |
| // })->Then([] { |
| // ShowProgressSpinner(false); |
| // }); |
| // |
| // The type of this method looks terrifying, but is basically: |
| // |
| // FuturePtr<CallbackResult> |
| // AsyncMap(std::function<FuturePtr<CallbackResult>(Result...)> callback); |
| template <typename Callback, |
| typename AsyncMapResult = std::result_of_t<Callback(Result...)>, |
| typename MapResult = |
| typename AsyncMapResult::element_type::result_tuple_type, |
| typename = typename std::enable_if_t<internal::is_convertible_v< |
| FuturePtr<MapResult>, AsyncMapResult>>> |
| AsyncMapResult AsyncMap(Callback callback) { |
| return SubfutureCreate( |
| AsyncMapResult::element_type::Create(trace_name_ + "(AsyncMap)"), |
| std::move(callback), SubfutureAsyncMapCompleter<AsyncMapResult>(), |
| [] { return true; }); |
| } |
| |
| template <typename Callback, typename T, |
| typename AsyncMapResult = std::result_of_t<Callback(Result...)>, |
| typename MapResult = |
| typename AsyncMapResult::element_type::result_tuple_type, |
| typename = typename std::enable_if_t<internal::is_convertible_v< |
| FuturePtr<MapResult>, AsyncMapResult>>> |
| AsyncMapResult WeakAsyncMap(fxl::WeakPtr<T> weak_ptr, Callback callback) { |
| return SubfutureCreate( |
| AsyncMapResult::element_type::Create(trace_name_ + "(WeakAsyncMap)"), |
| std::move(callback), SubfutureAsyncMapCompleter<AsyncMapResult>(), |
| [weak_ptr] { return !!weak_ptr; }); |
| } |
| |
| // Attaches a |callback| that is invoked when this future is completed with |
| // Complete(). The returned future is completed with |callback|'s return |
| // value, when |callback| finishes executing. Returned tuples are flattened |
| // into variadic futures. |
| // |
| // To return a future that produces a tuple (uncommon), wrap the map result |
| // in another tuple (or use |AsyncMap|). |
| // |
| // That is: |
| // Callback return type | Returned future |
| // ----------------------------------+---------------- |
| // T | FuturePtr<T> |
| // std::tuple<T, U, ...> | FuturePtr<T, U, ...> |
| // std::tuple<std::tuple<T, U, ...>> | FuturePtr<std::tuple<T, U, ...>> |
| template <typename Callback, |
| typename MapResult = std::result_of_t<Callback(Result...)>> |
| auto Map(Callback callback) { |
| return Map(std::move(callback), Tag<MapResult>{}); |
| } |
| |
| template <typename Callback, typename T, |
| typename MapResult = std::result_of_t<Callback(Result...)>> |
| FuturePtr<MapResult> WeakMap(fxl::WeakPtr<T> weak_ptr, Callback callback) { |
| return SubfutureCreate(Future<MapResult>::Create(trace_name_ + "(WeakMap)"), |
| std::move(callback), SubfutureCompleter<MapResult>(), |
| [weak_ptr] { return !!weak_ptr; }); |
| } |
| |
| const std::string& trace_name() const { return trace_name_; } |
| |
| private: |
| template <typename... Args> |
| friend class Future; |
| |
| // This is a utility class used as a template parameter to determine function |
| // overloading; see |
| // <https://www.boost.org/community/generic_programming.html#tag_dispatching> |
| // for more info. |
| // |
| // It is used in the |Map| overloads to "specialize" for |std::tuple| and |
| // capture its parameter pack. |
| template <typename T> |
| class Tag {}; |
| |
| template <typename ResultsFuture, typename... Args> |
| friend fxl::RefPtr<ResultsFuture> Wait( |
| const std::string& trace_name, |
| const std::vector<FuturePtr<Args...>>& futures); |
| |
| template <typename ResultsFuture, typename TimeoutCallback, typename... Args> |
| friend fxl::RefPtr<ResultsFuture> WaitWithTimeout( |
| const std::string& trace_name, async_dispatcher_t* dispatcher, |
| zx::duration timeout, TimeoutCallback on_timeout, |
| const std::vector<FuturePtr<Args...>>& futures); |
| |
| FRIEND_REF_COUNTED_THREAD_SAFE(Future); |
| |
| Future() : result_{}, weak_factory_(this) {} |
| |
| void SetCallback(std::function<void(Result...)>&& callback) { |
| callback_ = callback; |
| |
| MaybeInvokeCallbacks(); |
| } |
| |
| void SetCallbackWithTuple( |
| std::function<void(std::tuple<Result...>)>&& callback) { |
| SetCallback([callback](Result&&... result) { |
| callback(std::forward_as_tuple(std::forward<Result>(result)...)); |
| }); |
| } |
| |
| void AddConstCallback(std::function<void(const Result&...)>&& callback) { |
| if (!callback) { |
| return; |
| } |
| |
| // It's impossible to add a const callback after a future is completed |
| // *and* it has a callback: the completed value will be moved into the |
| // callback and won't be available for a ConstThen(). |
| if (status_ == internal::FutureStatus::kConsumed) { |
| FXL_LOG(FATAL) |
| << "Future@" << static_cast<void*>(this) |
| << (trace_name_.length() ? "(" + trace_name_ + ")" : "") |
| << ": Cannot add a const callback after completed result is " |
| "already moved into Then() callback."; |
| } |
| |
| const_callbacks_.emplace_back(callback); |
| |
| MaybeInvokeCallbacks(); |
| } |
| |
| void CompleteWithTuple(std::tuple<Result...>&& result) { |
| FXL_DCHECK(status_ == internal::FutureStatus::kAwaiting) |
| << "Future@" << static_cast<void*>(this) |
| << (trace_name_.length() ? "(" + trace_name_ + ")" : "") |
| << ": Complete() called twice."; |
| |
| result_ = std::forward<std::tuple<Result...>>(result); |
| status_ = internal::FutureStatus::kCompleted; |
| |
| MaybeInvokeCallbacks(); |
| } |
| |
| void MaybeInvokeCallbacks() { |
| if (status_ == internal::FutureStatus::kAwaiting) { |
| return; |
| } |
| |
| if (const_callbacks_.size()) { |
| // Move |const_callbacks_| to a local variable. MaybeInvokeCallbacks() |
| // can be called multiple times if the client only uses ConstThen() or |
| // WeakConstThen() to fetch the completed values. This prevents calling |
| // these callbacks multiple times by moving them out of the members |
| // scope. |
| auto local_const_callbacks = std::move(const_callbacks_); |
| for (auto& const_callback : local_const_callbacks) { |
| fxl::Apply(const_callback, result_); |
| } |
| } |
| |
| if (callback_) { |
| auto callback = std::move(callback_); |
| status_ = internal::FutureStatus::kConsumed; |
| fxl::Apply(callback, std::move(result_)); |
| } |
| } |
| |
| // The "subfuture" methods below are private helper functions designed to be |
| // used with the futures that are returned by the public API (Then(), Map(), |
| // etc); those returned futures are named "subfutures" in the code, which is |
| // why the methods are named likewise. |
| |
| // A convenience method to call this->SetCallback() with the lambda returned |
| // by SubfutureCallback(). |
| template <typename Subfuture, typename SubfutureCompleter, typename Callback, |
| typename Guard> |
| Subfuture SubfutureCreate(Subfuture subfuture, Callback&& callback, |
| SubfutureCompleter&& subfuture_completer, |
| Guard&& guard) { |
| SetCallback(SubfutureCallback<Result...>(subfuture, callback, |
| subfuture_completer, guard)); |
| return subfuture; |
| } |
| |
| // Returns a lambda that: |
| // |
| // 1. calls |guard| before calling |callback|, and only calls |callback| if |
| // |guard| returns true; |
| // 2. will not call |subfuture_completer| if either |this| or |subfuture| are |
| // destroyed by |callback|. |
| template <typename... CoercedResult, typename Subfuture, |
| typename SubfutureCompleter, typename Callback, typename Guard> |
| auto SubfutureCallback(Subfuture subfuture, Callback&& callback, |
| SubfutureCompleter&& subfuture_completer, |
| Guard&& guard) { |
| return [this, subfuture, callback, subfuture_completer, |
| guard](CoercedResult&&... result) { |
| if (!guard()) |
| return; |
| |
| auto weak_future = weak_factory_.GetWeakPtr(); |
| auto weak_subfuture = subfuture->weak_factory_.GetWeakPtr(); |
| auto subfuture_result = callback(std::forward<CoercedResult>(result)...); |
| |
| // |callback| above may delete this future or the returned subfuture when |
| // it finishes executing, so check if |weak_future| and |weak_subfuture| |
| // are still valid before attempting to complete the subfuture. |
| if (weak_future && weak_subfuture) |
| subfuture_completer(subfuture, std::move(subfuture_result)); |
| }; |
| } |
| |
| // Returns a lambda that calls |callback|, and returns an empty tuple. The |
| // consistent return type enables generic programming techniques to be |
| // applied to |callback| since the return type is consistent (it's always a |
| // |std::tuple<T...>|). |
| template <typename... CoercedResult, typename Callback> |
| auto SubfutureVoidCallback(Callback&& callback) { |
| return [callback](CoercedResult&&... result) { |
| callback(std::forward<CoercedResult>(result)...); |
| return std::make_tuple(); |
| }; |
| } |
| |
| // Returns a lambda that, when called with a subfuture and a std::tuple, will |
| // complete the subfuture with the values from the tuple elements. This method |
| // is designed to be used with SubfutureAsyncMapCompleter(), which will do |
| // the same thing but can be passed Futures for the std::tuple values. |
| // Together, this enables generic programming techniques to be applied to the |
| // returned lambda, since the lambda presents a consistent API for callers. |
| template <typename... SubfutureResult> |
| auto SubfutureCompleter() { |
| return [](FuturePtr<SubfutureResult...> subfuture, |
| std::tuple<SubfutureResult...> subfuture_result) { |
| subfuture->CompleteWithTuple(std::move(subfuture_result)); |
| }; |
| } |
| |
| // See the documentation for SubfutureCompleter() above. |
| template <typename AsyncMapResult> |
| auto SubfutureAsyncMapCompleter() { |
| return [](AsyncMapResult subfuture, |
| std::tuple<AsyncMapResult> subfuture_result) { |
| std::get<0>(subfuture_result) |
| ->SetCallbackWithTuple( |
| [subfuture]( |
| std::tuple< |
| typename AsyncMapResult::element_type::result_tuple_type> |
| transformed_result) { |
| subfuture->CompleteWithTuple( |
| std::move(std::get<0>(transformed_result))); |
| }); |
| }; |
| } |
| |
| // The following overloads enable |Map| to flatten out functions that map to |
| // |std::tuple|. |
| |
| template <typename Callback, typename MapResult> |
| FuturePtr<MapResult> Map(Callback callback, Tag<MapResult>) { |
| // Directly passing |callback| like this ends up relying on an implicit |
| // |std::tuple| memberwise constructor, which should be fine. It will |
| // convert from |MapResult| to |std::tuple<MapResult>| implicitly. |
| return Map(std::move(callback), Tag<std::tuple<MapResult>>{}); |
| } |
| |
| template <typename Callback, typename... MapResult> |
| FuturePtr<MapResult...> Map(Callback callback, |
| Tag<std::tuple<MapResult...>>) { |
| return SubfutureCreate(Future<MapResult...>::Create(trace_name_ + "(Map)"), |
| std::move(callback), |
| SubfutureCompleter<MapResult...>(), |
| [] { return true; }); |
| } |
| |
| std::string trace_name_; |
| |
| internal::FutureStatus status_ = internal::FutureStatus::kAwaiting; |
| std::tuple<Result...> result_; |
| |
| // TODO(MI4-1102): Convert std::function to fit::function here & everywhere. |
| |
| // The callback attached to this future. |
| std::function<void(Result...)> callback_; |
| |
| // Callbacks that have attached with the Const*() methods, such as |
| // ConstThen(). |
| std::vector<std::function<void(const Result&...)>> const_callbacks_; |
| |
| // Keep this last in the list of members. (See WeakPtrFactory documentation |
| // for more info.) |
| fxl::WeakPtrFactory<Future<Result...>> weak_factory_; |
| |
| FXL_DISALLOW_COPY_ASSIGN_AND_MOVE(Future); |
| |
| // For unit tests only. |
| friend class FutureTest; |
| |
| // For unit tests only. |
| const std::tuple<Result...>& get() const { |
| FXL_DCHECK(status_ != internal::FutureStatus::kAwaiting) |
| << trace_name_ << ": get() called on unset future"; |
| |
| return result_; |
| } |
| }; |
| |
| // Returns a Future that completes when every future in |futures| is complete. |
| // The future returned by Wait() will be kept alive until every future in |
| // |futures| either completes or is destroyed. If any future in |futures| is |
| // destroyed prior to completing, the returned future will never complete. The |
| // order of the results corresponds to the order of the given futures, |
| // regardless of their completion order. |
| // |
| // The default type of the resulting future depends on the types of the |
| // component futures. Void futures produce a void future. Monadic futures |
| // produce a future of a flat vector. Polyadic futures produce a future of a |
| // vector of tuples. That is: |
| // |
| // Components | Result |
| // -------------------+-------------------------------------------- |
| // FuturePtr<> | FuturePtr<> |
| // FuturePtr<T> | FuturePtr<std::vector<T>> |
| // FuturePtr<T, U...> | FuturePtr<std::vector<std::tuple<T, U...>>> |
| // |
| // These defaults may be overridden by specifying the template argument for |
| // Wait as the type of future desired. All cases may produce a Future<> or a |
| // Future<std::vector<std::tuple<...>>>. |
| // |
| // Example usage: |
| // |
| // FuturePtr<Bytes> f1 = MakeNetworkRequest(request1); |
| // FuturePtr<Bytes> f2 = MakeNetworkRequest(request2); |
| // FuturePtr<Bytes> f3 = MakeNetworkRequest(request3); |
| // std::vector<FuturePtr<Bytes>> requests{f1, f2, f3}; |
| // Wait("NetworkRequests", requests)->Then([]( |
| // std::vector<Bytes> bytes_vector) { |
| // Bytes f1_bytes = bytes_vector[0]; |
| // Bytes f2_bytes = bytes_vector[1]; |
| // Bytes f3_bytes = bytes_vector[2]; |
| // }); |
| // |
| // This is similar to Promise.All() in JavaScript, or Join() in Rust. |
| template <typename ResultsFuture, typename... Result> |
| fxl::RefPtr<ResultsFuture> Wait( |
| const std::string& trace_name, |
| const std::vector<FuturePtr<Result...>>& futures) { |
| if (futures.empty()) { |
| auto immediate = ResultsFuture::Create(trace_name + "(Completed)"); |
| immediate->CompleteWithTuple({}); |
| return immediate; |
| } |
| |
| auto results = std::make_shared<internal::ResultCollector<ResultsFuture>>( |
| futures.size()); |
| |
| fxl::RefPtr<ResultsFuture> all_futures_completed = |
| ResultsFuture::Create(trace_name + "(WillWait)"); |
| |
| for (size_t i = 0; i < futures.size(); i++) { |
| const auto& future = futures[i]; |
| future->SetCallback( |
| [i, all_futures_completed, results](Result&&... result) { |
| results->AssignResult(i, std::forward<Result>(result)...); |
| |
| if (results->IsComplete()) { |
| results->Complete(all_futures_completed.get()); |
| } |
| }); |
| } |
| |
| return all_futures_completed; |
| } |
| |
| // Like |Wait|, but gives up after a timeout. After the timeout, |on_timeout| is |
| // invoked with a diagnostic error string containing the trace names of the |
| // futures that have not completed. |
| // |
| // This maintains a reference to the returned |Future| until all component |
| // futures have been completed or destroyed, or until the timeout has elapsed, |
| // whichever happens first. However, |on_timeout| will be invoked on timeout if |
| // any future has not completed even if any or all futures have been destroyed. |
| template <typename ResultsFuture, typename TimeoutCallback, typename... Result> |
| fxl::RefPtr<ResultsFuture> WaitWithTimeout( |
| const std::string& trace_name, async_dispatcher_t* dispatcher, |
| zx::duration timeout, |
| TimeoutCallback on_timeout /* void (const std::string&) */, |
| const std::vector<FuturePtr<Result...>>& futures) { |
| auto all_futures_completed = Wait<ResultsFuture>(trace_name, futures); |
| |
| if (all_futures_completed->status_ != internal::FutureStatus::kAwaiting) { |
| return all_futures_completed; |
| } |
| |
| auto all_trace_names = |
| std::make_shared<std::vector<std::unique_ptr<std::string>>>(); |
| all_trace_names->reserve(futures.size()); |
| |
| for (const auto& future : futures) { |
| // There's no point in waiting on completed futures. Furthermore if we tried |
| // that we'd have to put this before |Wait| since |Wait| consumes the |
| // results, but then if all futures are already completed this is all just |
| // wasted effort. |
| if (future->status_ == internal::FutureStatus::kAwaiting) { |
| size_t i = all_trace_names->size(); |
| all_trace_names->push_back( |
| std::make_unique<std::string>(future->trace_name_)); |
| future->AddConstCallback([i, all_trace_names](const Result&...) { |
| if (!all_trace_names->empty()) { |
| (*all_trace_names)[i] = nullptr; |
| } |
| }); |
| } |
| } |
| |
| // Return a proxy so that we can cancel result forwarding in the case of a |
| // timeout. This could with more difficulty be done within |Wait|, but this |
| // way allows us to reuse the logic more easily. |
| fxl::RefPtr<ResultsFuture> all_proxy = |
| ResultsFuture::Create(trace_name + "(WillWaitWithTimeout)"); |
| all_futures_completed->SetCallback(all_proxy->Completer()); |
| |
| // TODO(rosswang): Factor this into dump and cancel functions that can be |
| // called at other times. |
| async::PostDelayedTask( |
| dispatcher, |
| [all_trace_names = std::move(all_trace_names), |
| on_timeout = std::move(on_timeout), |
| all_futures_completed = |
| all_futures_completed->weak_factory_.GetWeakPtr()] { |
| std::ostringstream msg; |
| for (const auto& trace_name : *all_trace_names) { |
| if (trace_name) { |
| msg << "\n\t" << *trace_name; |
| } |
| } |
| if (!msg.str().empty()) { |
| on_timeout("Wait timed out. Still waiting for futures:" + msg.str()); |
| if (all_futures_completed) { |
| // cancel results forwarding (possibly releasing all_proxy) |
| all_futures_completed->SetCallback(nullptr); |
| } |
| // Possibly release the component futures. Once this task completes |
| // and goes out of scope, the last reference to the Wait future (also |
| // holding onto the component futures) should be released as well. |
| all_trace_names->clear(); |
| } |
| }, |
| timeout); |
| |
| return all_proxy; |
| } |
| |
| // |WaitWithTimeout| on the thread defaut dispatcher. |
| template <typename ResultsFuture, typename TimeoutCallback, typename... Result> |
| fxl::RefPtr<ResultsFuture> WaitWithTimeout( |
| const std::string& trace_name, zx::duration timeout, |
| TimeoutCallback on_timeout /* void (const std::string&) */, |
| const std::vector<FuturePtr<Result...>>& futures) { |
| return WaitWithTimeout<ResultsFuture>(trace_name, |
| async_get_default_dispatcher(), timeout, |
| std::move(on_timeout), futures); |
| } |
| |
| // These overloads allow us to effectively default the first template parameter, |
| // |ResultsFuture| (since the others are intended to be inferred). |
| // TODO(rosswang): If the overload combinatoric explosion gets too heavy, we can |
| // use a dummy struct parameter instead to encapsulate that template parameter. |
| |
| template <typename... Result> |
| auto Wait(const std::string& trace_name, |
| const std::vector<FuturePtr<Result...>>& futures) { |
| return Wait<internal::DefaultResultsFuture_t<Result...>>(trace_name, futures); |
| } |
| |
| template <typename TimeoutCallback, typename... Result> |
| auto WaitWithTimeout(const std::string& trace_name, |
| async_dispatcher_t* dispatcher, zx::duration timeout, |
| TimeoutCallback on_timeout, |
| const std::vector<FuturePtr<Result...>>& futures) { |
| return WaitWithTimeout<internal::DefaultResultsFuture_t<Result...>>( |
| trace_name, dispatcher, timeout, std::move(on_timeout), futures); |
| } |
| |
| template <typename TimeoutCallback, typename... Result> |
| auto WaitWithTimeout(const std::string& trace_name, zx::duration timeout, |
| TimeoutCallback on_timeout /* void (const std::string&) */, |
| const std::vector<FuturePtr<Result...>>& futures) { |
| return WaitWithTimeout<internal::DefaultResultsFuture_t<Result...>>( |
| trace_name, async_get_default_dispatcher(), timeout, |
| std::move(on_timeout), futures); |
| } |
| |
| // We need to provide the initializer list overloads or template deduction fails |
| // for the above overloads if given an initializer list. |
| // TODO(rosswang): Add a potentially heterogeneous variadic template instead, |
| // and prefer it over initializer lists outside of tests. |
| template <typename ResultsFuture, typename... Result> |
| auto Wait(const std::string& trace_name, |
| std::initializer_list<FuturePtr<Result...>> futures) { |
| return Wait<ResultsFuture>(trace_name, |
| std::vector<FuturePtr<Result...>>(futures)); |
| } |
| |
| template <typename ResultsFuture, typename TimeoutCallback, typename... Result> |
| fxl::RefPtr<ResultsFuture> WaitWithTimeout( |
| const std::string& trace_name, async_dispatcher_t* dispatcher, |
| zx::duration timeout, TimeoutCallback on_timeout, |
| std::initializer_list<FuturePtr<Result...>> futures) { |
| return WaitWithTimeout<ResultsFuture>( |
| trace_name, dispatcher, timeout, std::move(on_timeout), |
| std::vector<FuturePtr<Result...>>(futures)); |
| } |
| |
| template <typename ResultsFuture, typename TimeoutCallback, typename... Result> |
| fxl::RefPtr<ResultsFuture> WaitWithTimeout( |
| const std::string& trace_name, zx::duration timeout, |
| TimeoutCallback on_timeout /* void (const std::string&) */, |
| std::initializer_list<FuturePtr<Result...>> futures) { |
| return WaitWithTimeout<ResultsFuture>( |
| trace_name, async_get_default_dispatcher(), timeout, |
| std::move(on_timeout), std::vector<FuturePtr<Result...>>(futures)); |
| } |
| |
| template <typename... Result> |
| auto Wait(const std::string& trace_name, |
| std::initializer_list<FuturePtr<Result...>> futures) { |
| return Wait(trace_name, std::vector<FuturePtr<Result...>>(futures)); |
| } |
| |
| template <typename TimeoutCallback, typename... Result> |
| auto WaitWithTimeout(const std::string& trace_name, |
| async_dispatcher_t* dispatcher, zx::duration timeout, |
| TimeoutCallback on_timeout, |
| std::initializer_list<FuturePtr<Result...>> futures) { |
| return WaitWithTimeout(trace_name, dispatcher, timeout, std::move(on_timeout), |
| std::vector<FuturePtr<Result...>>(futures)); |
| } |
| |
| template <typename TimeoutCallback, typename... Result> |
| auto WaitWithTimeout(const std::string& trace_name, zx::duration timeout, |
| TimeoutCallback on_timeout /* void (const std::string&) */, |
| std::initializer_list<FuturePtr<Result...>> futures) { |
| return WaitWithTimeout(trace_name, async_get_default_dispatcher(), timeout, |
| std::move(on_timeout), |
| std::vector<FuturePtr<Result...>>(futures)); |
| } |
| |
| } // namespace modular |
| |
| #endif // LIB_ASYNC_CPP_FUTURE_H_ |