// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef LIB_ASYNC_CPP_FUTURE_H_
#define LIB_ASYNC_CPP_FUTURE_H_

#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <lib/async/cpp/task.h>
#include <lib/async/default.h>

#include "garnet/public/lib/fxl/functional/apply.h"
#include "garnet/public/lib/fxl/macros.h"
#include "garnet/public/lib/fxl/memory/ref_counted.h"
#include "garnet/public/lib/fxl/memory/ref_ptr.h"
#include "garnet/public/lib/fxl/memory/weak_ptr.h"

namespace modular {

// # Futures
//
// A *future* is an object representing the eventual value of an asynchronous
// operation. They are a useful complement to callback functions or lambdas
// because they are _composable_: asynchronous operations can be sequentially
// executed, and an async operation's result can be passed to another async
// operation, like a Unix pipeline.
//
// To use a future:
//
// 1. A *producer*, typically an async operation, creates a Future with
//    Future<ResultType>::Create().
// 2. The producer starts its async operation (e.g. a network request or disk
//    read).
// 3. The producer synchronously returns the Future to a *consumer*.
// 4. A consumer attaches a *callback* lambda to the Future using Then(). (The
//    callback can be attached to the future any time after the future is
//    created, before or after the async operation is finished.)
// 5. Some time later, when the producer's async operation is finished, the
//    producer *completes* the future with a *result* using Complete(result).
//    |result| is 0 to N movable or copyable values, e.g. Complete(),
//    Complete(value), Complete(value1, value2, ...).
// 6. The consumer's callback is invoked after the future is completed, with the
//    completed result passed as zero or more parameters to the callback.
//
// The following example shows a simple use case:
//
// Producer:
//
// FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) {
//   auto f = Future<Bytes>::Create("NetworkRequest");  // a "trace_name" that's
//                                                      // logged when things go
//                                                      // wrong
//   auto network_request_callback = [f] (Bytes bytes) {
//     f->Complete(std::move(bytes));
//   };
//   PerformAsyncNetworkRequest(request, network_request_callback);
//   return f;
// }
//
// Client:
//
// FuturePtr<Bytes> f = MakeNetworkRequest(request);
// f->Then([] (Bytes bytes) {
//   ProcessBytes(bytes);
// });
//
// ## Chaining and sequencing futures
//
// Futures can be composed; this is commonly called "chaining". Methods that
// attach callbacks, such as Then(), will return another Future: the returned
// Future is completed once the previous callback has finished executing. For
// example:
//
// Client:
//
// ShowProgressSpinner(true);
// FuturePtr<Bytes> f = MakeNetworkRequest(request);
// f->AsyncMap([] (Bytes zipped_bytes) {
//   // Use AsyncMap() if your callback wants to return another Future.
//   FuturePtr<Bytes> f = UnzipInBackground(zipped_bytes);
//   return f;
// })->Map([] (Bytes unzipped_bytes) {
//   // Use Map() if your callback returns a non-future: the callback's return
//   // value will be wrapped up into the returned Future.
//   JPEGImage image = DecodeImageSynchronously(unzipped_bytes);
//   return image;
// })->AsyncMap([] (JPEGImage image) {
//   FuturePtr<> f = UpdateUIAsynchronously(image);
//   return f;
// })->Then([] {
//   // Use Then() if your callback returns void. Note that Then() still returns
//   // a Future<>, which will be completed only when your callback finishes
//   // executing.
//   ShowProgressSpinner(false);
// });
//
// ## Memory Management & Ownership
//
// FuturePtr is a fxl::RefPtr, which is a smart pointer similar to
// std::shared_ptr that holds a reference count (refcount). When you call
// Future::Create(), you are expected to maintain a reference to it. When the
// Future is deleted, its result and callbacks are also deleted.
//
// Each method documents how it affects the future's refcount. To summarize:
//
// * Calling Then() on a future does not affect its refcount. This applies to
//   all methods that return a chained future, such as AsyncMap() and Map().
// * However, calling Then() on a future will cause that future to maintain a
//   reference to the returned chained future. So, you do not need to maintain a
//   reference to the returned future.
// * Calling Complete() does not affect the future's refcount.
// * Unlike Complete(), the closure returned by Completer() _does own_ the
//   future, so you do not need to maintain a reference to the future after
//   calling Completer(). (You do need to maintain a reference to the closure,
//   however.)
// * Wait(futures) returns a future that every future in |futures| owns, so you
//   do not need to maintain a reference to the returned future. The callback
//   attached to each future in |futures| will also keep a reference to
//   themselves, so that if a future that is Wait()ed on otherwise goes out of
//   scope, the future itself is kept alive.
//
// See each method's documentation for more details on memory management.
//
// ## Use Weak*() variants to cancel callback chains
//
// "Weak" variants exist for all chain/sequence methods (WeakThen(),
// WeakConstThen(), WeakMap() and WeakAsyncMap()). These are almost identical
// to their non-weak counterparts but take an fxl::WeakPtr<T> as a first
// argument. If, at callback invocation time, the WeakPtr is invalid, execution
// will halt and no future further down the chain will be executed.
//
// Example:
//
// FuturePtr<> f = MakeFuture();
// f->WeakThen(weak_ptr_factory.GetWeakPtr(), [] {
//   FXL_LOG(INFO) << "This won't execute";
// })->Then([] {
//   FXL_LOG(INFO) << "Neither will this";
// });
// weak_ptr_factory.InvalidateWeakPtrs();
// f->Complete();
//
// ## Use Wait() to synchronize on multiple futures
//
// If multiple futures are running, use the Wait() function to create a
// Future that completes when all the futures passed to it are completed:
//
// FuturePtr<Bytes> f1 = MakeNetworkRequest(request1);
// FuturePtr<Bytes> f2 = MakeNetworkRequest(request2);
// FuturePtr<Bytes> f3 = MakeNetworkRequest(request3);
// Wait<Future<>>("Network requests", {f1, f2, f3})->Then([] {
//   AllNetworkRequestsAreComplete();
// });
//
// See the Wait() function documentation for more details.
//
// ## Use Completer() to integrate with functions requiring callbacks
//
// Use the Completer() method to integrate with existing code that uses callback
// functions. Completer() returns an std::function<void(Result)> that, when
// called, calls Complete() on the future. Re-visiting the first example:
//
// Without Completer():
//
// FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) {
//   auto f = Future<Bytes>::Create("NetworkRequest");
//   auto network_request_callback = [f] (Bytes bytes) {
//     f->Complete(std::move(bytes));
//   };
//   PerformAsyncNetworkRequest(request, network_request_callback);
//   return f;
// }
//
// With Completer():
//
// FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) {
//   auto f = Future<Bytes>::Create("NetworkRequest");
//   PerformAsyncNetworkRequest(request, f->Completer());
//   return f;
// }
//
// ## Use error values to propagate errors back to consumers
//
// If the future can fail, use a value (or multiple values) that's capable of
// storing both the error and a successful result to propagate the return value
// back to the consumer. For example:
//
// FuturePtr<std::error_code, Bytes> f = MakeNetworkRequest();
// f->Then([] (std::error_code error, Bytes bytes) {
//   if (error) {
//     // handle error
//   } else {
//     // network request was successful
//     ProcessBytes(bytes);
//   }
// });
//
// ## fuchsia::modular::Future vs other Futures/Promises
//
// If you are familiar with Futures & Promises in other languages, this Future
// class is intentionally different from others, to better integrate with
// Fuchsia coding patterns:
//
// * NOT THREADSAFE. This will change in the future when thread safety is
//   required, but there are no use cases yet. (YAGNI!)
// * Support for multiple result values via variadic template parameters. This
//   is required for smooth integration with the fuchsia::modular::Operation
//   class.
// * Only a single callback can be set via Then(), since move semantics are used
//   so heavily in Fuchsia code. (If multiple callbacks were supported and the
//   result is moved, how does one move the result from one callback to
//   another?)
//   * Multiple callbacks can be set if the callback lambda takes the result via
//     const&. In this case, use ConstThen() to attach each callback, rather
//     than Then(). ConstThen() calls can also be chained, like Then().
// * There are no success/error callbacks and control flows: all callbacks are
//   "success callbacks".
//   * The traditional reason for error callbacks is to convert exceptions into
//     error values, but Google C++ style doesn't use exceptions, so error
//     callbacks aren't needed.
//   * There's some argument that using separate success/error control flow
//     paths is beneficial. However, in practice, a lot of client code using
//     this pattern doesn't attach error callbacks, only success callbacks, so
//     errors often go unchecked.
//   * If error values need to be propagated back to the client, use a dedicated
//     error type. (Note that many built-in types may have values that can be
//     interpreted as errors, e.g. nullptr, or 0 or -1.) This also forces a
//     consumer to inspect the type and check for errors.
// * No cancellation/progress support. Adding cancellation/progress:
//   * adds more complexity to the futures implementation,
//   * can be implemented on top of a core futures implementation for the few
//     cases where they're required, and
//   * typically requires extra cancel/progress callbacks, which adds more
//     control flow paths.
//   * see fxl::CancelableCallback if you need cancellation.
// * No execution contexts (yet).
//   * There needs to be a comprehensive story about runloops etc first.

// Forward declaration, so that the FuturePtr alias and the helpers in
// namespace internal below can name Future before the class is defined.
template <typename... Result>
class Future;

// Reference-counted handle (fxl::RefPtr) to a Future<Result...>.
template <typename... Result>
using FuturePtr = fxl::RefPtr<Future<Result...>>;

namespace internal {

// Tracks where a future's result currently is.
enum class FutureStatus {
  // No result yet: the future has not been completed.
  kAwaiting,
  // A result is stored and has not yet been moved into a callback.
  kCompleted,
  // The stored result has been moved into a callback.
  kConsumed,
};

// Variable-template shorthands for <type_traits> predicates, mirroring the
// C++17 std::*_v helpers.
template <typename From, typename To>
constexpr bool is_convertible_v = std::is_convertible<From, To>{};

template <typename T>
constexpr bool is_void_v = std::is_void<T>{};

// Type function mapping the result types of a component future to a results
// future type; the three cases line up with the defaults table in the Wait()
// documentation.
//
// General case: a future of a vector of tuples.
template <typename... Result>
struct DefaultResultsFuture {
  using type = Future<std::vector<std::tuple<Result...>>>;
};

// Exactly one result type: a future of a flat vector.
template <typename Result>
struct DefaultResultsFuture<Result> {
  using type = Future<std::vector<Result>>;
};

// No result types: a void future.
template <>
struct DefaultResultsFuture<> {
  using type = Future<>;
};

// Shorthand for DefaultResultsFuture<Result...>::type.
template <typename... Result>
using DefaultResultsFuture_t = typename DefaultResultsFuture<Result...>::type;

// Gathers results by index and, once requested, hands them to a
// |ResultsFuture| as a single vector ordered by index.
//
// |ResultsFuture| must expose |result_tuple_type| whose first element is a
// container with a |value_type| (i.e. Future<std::vector<ElementType>>), and a
// Complete() accepting that vector.
template <typename ResultsFuture>
class ResultCollector {
 public:
  // ResultsFuture = Future<std::vector<ElementType>>
  using ElementType = typename std::tuple_element_t<
      0, typename ResultsFuture::result_tuple_type>::value_type;

  // Creates a collector with |reserved_count| empty slots. Marked explicit so
  // that a bare count never converts implicitly into a collector.
  explicit ResultCollector(size_t reserved_count) : results_(reserved_count) {}

  // True once AssignResult() has been called as many times as there are slots.
  bool IsComplete() const { return finished_count_ == results_.size(); }

  // Constructs an |ElementType| from |result...| and stores it in slot
  // |result_index|, which must be less than the reserved count (the index is
  // not bounds-checked).
  template <typename... Result>
  void AssignResult(size_t result_index, Result&&... result) {
    results_[result_index] =
        std::make_unique<ElementType>(std::forward<Result>(result)...);
    ++finished_count_;
  }

  // Moves every stored result, in slot order, into a vector and completes
  // |future| with it. Every slot must have been assigned beforehand, since
  // each slot is dereferenced unconditionally.
  void Complete(ResultsFuture* future) {
    std::vector<ElementType> final_results;
    final_results.reserve(results_.size());
    for (auto& result : results_) {
      final_results.push_back(std::move(*result));
    }
    future->Complete(std::move(final_results));
  }

 private:
  size_t finished_count_ = 0;
  // Use unique ptrs initially so that we can reserve even if |ElementType| is
  // not default-constructible.
  //
  // TODO(rosswang): Consider adding a specialization for default-constructible
  // types.
  std::vector<std::unique_ptr<ElementType>> results_;
};

// Specialization for collecting into a void future: results are counted but
// not stored. The constructor, IsComplete() and Complete() are only declared
// here; their definitions live outside this header.
template <>
class ResultCollector<Future<>> {
 public:
  explicit ResultCollector(size_t reserved_count);
  bool IsComplete() const;

  // The template on this allows us to use Future<> collectors to swallow
  // unneeded results of futures we're waiting on. The results are taken by
  // (unnamed) forwarding reference so that discarding them never copies them,
  // matching the signature of the primary template.
  template <typename... Result>
  void AssignResult(size_t, Result&&...) {
    ++finished_count_;
  }

  void Complete(Future<>* future) const;

 private:
  size_t finished_count_ = 0;
  size_t reserved_count_;
};

}  // namespace internal

template <typename... Result>
class Future : public fxl::RefCountedThreadSafe<Future<Result...>> {
 public:
  using result_tuple_type = std::tuple<Result...>;

  // Creates a FuturePtr<Result...>. |trace_name| is used solely for debugging
  // purposes, and is logged when something goes wrong (e.g. Complete() is
  // called twice.)
  static FuturePtr<Result...> Create(const std::string& trace_name) {
    auto f = fxl::AdoptRef(new Future<Result...>);
    // |trace_name| is a const reference, so it can only be copied; wrapping it
    // in std::move() would merely disguise that copy.
    f->trace_name_ = trace_name;
    return f;
  }

  // Creates a FuturePtr<Result...> that's already completed. For example:
  //
  //   FuturePtr<int> f = Future<int>::CreateCompleted("MyFuture", 5);
  //   f->Then([] (int i) {
  //     // this lambda executes immediately
  //     assert(i == 5);
  //   });
  //
  // |trace_name| is passed on to Create(); |result| is passed on to
  // Complete().
  static FuturePtr<Result...> CreateCompleted(const std::string& trace_name,
                                              Result&&... result) {
    // |trace_name| is a const reference: pass it as-is rather than through a
    // std::move() that cannot actually move.
    auto f = Create(trace_name);
    f->Complete(std::forward<Result>(result)...);
    return f;
  }

  // Completes a future with |result|. This causes any callbacks registered
  // with Then(), ConstThen(), etc to be invoked with |result| passed to them
  // as a parameter.
  //
  // Calling Complete() does not affect this future's refcount. This is because:
  //
  // 1. Any callbacks that are registered are called immediately and
  //    synchronously, so the future's lifetime does not need to be extended
  //    before callbacks are invoked.
  // 2. Then() correctly handles cases where the future may be deleted by their
  //    callbacks.
  // 3. There is no danger of the future being deleted before Complete() is
  //    called, because if Complete() is called, the code that calls Complete()
  //    must have a reference to the future.
  //
  // Note: the parameters are |Result&&...| on the class's own template
  // parameters (not deduced), so for non-reference |Result| types they are
  // rvalue references: pass temporaries, or std::move() named values.
  void Complete(Result&&... result) {
    // The values are bundled into a tuple of references and handed to
    // CompleteWithTuple(), which stores them and runs pending callbacks.
    CompleteWithTuple(std::forward_as_tuple(std::forward<Result>(result)...));
  }

  // Returns a std::function<void(Result...)> that, when called, calls
  // Complete() on this future. For example:
  //
  // FuturePtr<Bytes> MakeNetworkRequest(NetworkRequest& request) {
  //   auto f = Future<Bytes>::Create("NetworkRequest");
  //   PerformAsyncNetworkRequest(request, f->Completer());
  //   return f;
  // }
  //
  // The returned closure will maintain a reference to the future, so that the
  // closure can call Complete() on it correctly later. In other words, calling
  // Completer() will increase this future's refcount, and you do not need to
  // maintain a reference to it. The reference is held by the closure (and any
  // copies of it) for as long as the closure exists. This enables you to write
  // code like
  //
  //   {
  //     auto f = Future<>::Create("MyFuture");
  //     CallAsyncMethod(f->Completer());
  //     // f will now go out of scope, but f->Completer() owns it, so it's
  //     // still kept alive.
  //   }
  std::function<void(Result...)> Completer() {
    // |shared_this| is a strong FuturePtr captured by value; it is what keeps
    // this future alive for the closure.
    return [shared_this = FuturePtr<Result...>(this)](Result&&... result) {
      shared_this->Complete(std::forward<Result>(result)...);
    };
  }

  // Registers |callback| to run when this future is completed with Complete(),
  // and returns a future that is completed once |callback| has returned.
  //
  // * |callback| runs synchronously, on the thread that calls Complete(); it
  //   is never scheduled on the event loop.
  // * A future holds a single Then()-style callback: attaching a new one
  //   discards whichever was attached before.
  // * Callbacks attached with ConstThen() run before |callback|.
  // * |callback| may delete the future Then() was invoked on. In that case the
  //   chained futures returned by Then(), Map() etc are released by this
  //   future and are not completed, even if someone else still holds a
  //   reference to them.
  // * |callback| may also delete the chained future that Then() returns.
  // * This future owns the returned future, so callers do not need to keep a
  //   reference to it.
  //
  // Ignoring the template machinery, the signature is:
  //
  //   FuturePtr<> Then(std::function<void(Result...)> callback);
  template <typename Callback,
            typename = typename std::enable_if_t<
                internal::is_void_v<std::result_of_t<Callback(Result...)>>>>
  FuturePtr<> Then(Callback callback) {
    FuturePtr<> subfuture = Future<>::Create(trace_name_ + "(Then)");
    auto void_callback = SubfutureVoidCallback<Result...>(std::move(callback));
    // The guard always passes: plain Then() has nothing that can cancel it.
    return SubfutureCreate(std::move(subfuture), std::move(void_callback),
                           SubfutureCompleter<>(), [] { return true; });
  }

  // Same as Then(), except that |callback| is guarded by |weak_ptr|: if
  // |weak_ptr| has been invalidated by the time |callback| would run, then
  // |callback| is skipped and the next future in the chain is not completed.
  template <typename Callback, typename T,
            typename = typename std::enable_if_t<
                internal::is_void_v<std::result_of_t<Callback(Result...)>>>>
  FuturePtr<> WeakThen(fxl::WeakPtr<T> weak_ptr, Callback callback) {
    FuturePtr<> subfuture = Future<>::Create(trace_name_ + "(WeakThen)");
    auto void_callback = SubfutureVoidCallback<Result...>(std::move(callback));
    // The guard holds its own copy of |weak_ptr| and reports its validity.
    return SubfutureCreate(std::move(subfuture), std::move(void_callback),
                           SubfutureCompleter<>(),
                           [weak_ptr] { return !!weak_ptr; });
  }

  // Like Then(), with these differences:
  //
  // * |const_callback| receives the completed result by const&,
  // * any number of const callbacks may be attached,
  // * const callbacks run _before_ the Then() callback.
  FuturePtr<> ConstThen(std::function<void(const Result&...)> const_callback) {
    auto subfuture = Future<>::Create(trace_name_ + "(ConstThen)");
    auto void_callback =
        SubfutureVoidCallback<const Result&...>(std::move(const_callback));
    AddConstCallback(SubfutureCallback<const Result&...>(
        subfuture, std::move(void_callback), SubfutureCompleter<>(),
        [] { return true; }));
    return subfuture;
  }

  // ConstThen() guarded by |weak_ptr|; see WeakThen() for how the guard
  // behaves.
  template <typename T>
  FuturePtr<> WeakConstThen(
      fxl::WeakPtr<T> weak_ptr,
      std::function<void(const Result&...)> const_callback) {
    auto subfuture = Future<>::Create(trace_name_ + "(WeakConstThen)");
    auto void_callback =
        SubfutureVoidCallback<const Result&...>(std::move(const_callback));
    AddConstCallback(SubfutureCallback<const Result&...>(
        subfuture, std::move(void_callback), SubfutureCompleter<>(),
        [weak_ptr] { return !!weak_ptr; }));
    return subfuture;
  }

  // Attaches a |callback| that is invoked when this future is completed with
  // Complete(). |callback| must return another future: when the returned future
  // completes, the future returned by AsyncMap() will complete. For example:
  //
  // ShowProgressSpinner(true);
  // FuturePtr<Bytes> f = MakeNetworkRequest(request);
  // f->AsyncMap([] (Bytes zipped_bytes) {
  //   FuturePtr<Bytes> f = UnzipInBackground(zipped_bytes);
  //   return f;
  // })->AsyncMap([] (Bytes unzipped_bytes) {
  //   FuturePtr<JPEGImage> f = DecodeImageInBackground(unzipped_bytes);
  //   return f;
  // })->AsyncMap([] (JPEGImage image) {
  //   FuturePtr<> f = UpdateUIAsynchronously(image);
  //   return f;
  // })->Then([] {
  //   ShowProgressSpinner(false);
  // });
  //
  // The type of this method looks terrifying, but is basically:
  //
  //   FuturePtr<CallbackResult>
  //     AsyncMap(std::function<FuturePtr<CallbackResult>(Result...)> callback);
  //
  // Template parameters other than |Callback| take their defaults and are not
  // meant to be spelled out by callers:
  //
  // * |AsyncMapResult| is the future pointer type that |callback| returns.
  // * |MapResult| is the |result_tuple_type| of the future |callback| returns.
  // * The unnamed parameter removes this method from overload resolution
  //   unless FuturePtr<MapResult> is convertible to |AsyncMapResult|.
  template <typename Callback,
            typename AsyncMapResult = std::result_of_t<Callback(Result...)>,
            typename MapResult =
                typename AsyncMapResult::element_type::result_tuple_type,
            typename = typename std::enable_if_t<internal::is_convertible_v<
                FuturePtr<MapResult>, AsyncMapResult>>>
  AsyncMapResult AsyncMap(Callback callback) {
    // The subfuture has the same future type as the one |callback| returns;
    // SubfutureAsyncMapCompleter() completes it with that future's result.
    return SubfutureCreate(
        AsyncMapResult::element_type::Create(trace_name_ + "(AsyncMap)"),
        std::move(callback), SubfutureAsyncMapCompleter<AsyncMapResult>(),
        [] { return true; });
  }

  // Equivalent to AsyncMap(), but guards execution of |callback| with
  // |weak_ptr|: if |weak_ptr| is invalid when |callback| would be invoked,
  // |callback| is not run and the returned future is not completed. See
  // WeakThen().
  template <typename Callback, typename T,
            typename AsyncMapResult = std::result_of_t<Callback(Result...)>,
            typename MapResult =
                typename AsyncMapResult::element_type::result_tuple_type,
            typename = typename std::enable_if_t<internal::is_convertible_v<
                FuturePtr<MapResult>, AsyncMapResult>>>
  AsyncMapResult WeakAsyncMap(fxl::WeakPtr<T> weak_ptr, Callback callback) {
    return SubfutureCreate(
        AsyncMapResult::element_type::Create(trace_name_ + "(WeakAsyncMap)"),
        std::move(callback), SubfutureAsyncMapCompleter<AsyncMapResult>(),
        [weak_ptr] { return !!weak_ptr; });
  }

  // Attaches a |callback| that is invoked when this future is completed with
  // Complete(). The returned future is completed with |callback|'s return
  // value, when |callback| finishes executing. Returned tuples are flattened
  // into variadic futures.
  //
  // To return a future that produces a tuple (uncommon), wrap the map result
  // in another tuple (or use |AsyncMap|).
  //
  // That is:
  // Callback return type              | Returned future
  // ----------------------------------+----------------
  // T                                 | FuturePtr<T>
  // std::tuple<T, U, ...>             | FuturePtr<T, U, ...>
  // std::tuple<std::tuple<T, U, ...>> | FuturePtr<std::tuple<T, U, ...>>
  //
  // |MapResult| takes its default (the return type of |callback|) and is not
  // meant to be specified by callers.
  template <typename Callback,
            typename MapResult = std::result_of_t<Callback(Result...)>>
  auto Map(Callback callback) {
    // Dispatch on the callback's return type: the private Map() overloads
    // taking a Tag<> pick the tuple or non-tuple handling.
    return Map(std::move(callback), Tag<MapResult>{});
  }

  // Like Map(), but guards execution of |callback| with |weak_ptr|: if
  // |weak_ptr| is invalid when |callback| would be invoked, |callback| is not
  // run and the returned future is not completed. See WeakThen().
  //
  // NOTE(review): unlike Map(), this does not go through the Tag<> overloads,
  // so the return type is always FuturePtr<MapResult>; a callback returning
  // std::tuple<T, U, ...> is not flattened into FuturePtr<T, U, ...> here.
  // Confirm whether that difference is intended.
  template <typename Callback, typename T,
            typename MapResult = std::result_of_t<Callback(Result...)>>
  FuturePtr<MapResult> WeakMap(fxl::WeakPtr<T> weak_ptr, Callback callback) {
    return SubfutureCreate(Future<MapResult>::Create(trace_name_ + "(WeakMap)"),
                           std::move(callback), SubfutureCompleter<MapResult>(),
                           [weak_ptr] { return !!weak_ptr; });
  }

  // Returns the debugging name stored in |trace_name_|, which Create() sets
  // from its |trace_name| argument.
  const std::string& trace_name() const { return trace_name_; }

 private:
  template <typename... Args>
  friend class Future;

  // This is a utility class used as a template parameter to determine function
  // overloading; see
  // <https://www.boost.org/community/generic_programming.html#tag_dispatching>
  // for more info.
  //
  // It is used in the |Map| overloads to "specialize" for |std::tuple| and
  // capture its parameter pack.
  template <typename T>
  class Tag {};

  template <typename ResultsFuture, typename... Args>
  friend fxl::RefPtr<ResultsFuture> Wait(
      const std::string& trace_name,
      const std::vector<FuturePtr<Args...>>& futures);

  template <typename ResultsFuture, typename TimeoutCallback, typename... Args>
  friend fxl::RefPtr<ResultsFuture> WaitWithTimeout(
      const std::string& trace_name, async_dispatcher_t* dispatcher,
      zx::duration timeout, TimeoutCallback on_timeout,
      const std::vector<FuturePtr<Args...>>& futures);

  FRIEND_REF_COUNTED_THREAD_SAFE(Future);

  // Private constructor; Create() is the in-class user of it. Value-initializes
  // |result_| and binds |weak_factory_| to this object.
  Future() : result_{}, weak_factory_(this) {}

  // Installs |callback| as this future's single consuming callback, replacing
  // any previous one, then invokes it immediately if a result is already
  // available (see MaybeInvokeCallbacks()).
  void SetCallback(std::function<void(Result...)>&& callback) {
    // |callback| is a named rvalue reference, i.e. an lvalue: without
    // std::move() the std::function (and everything it captured) is copied.
    callback_ = std::move(callback);

    MaybeInvokeCallbacks();
  }

  // Like SetCallback(), but for a |callback| that takes the whole result as a
  // single std::tuple<Result...> instead of as separate arguments.
  void SetCallbackWithTuple(
      std::function<void(std::tuple<Result...>)>&& callback) {
    // Move |callback| into the adapter closure rather than copying it.
    SetCallback([callback = std::move(callback)](Result&&... result) {
      callback(std::forward_as_tuple(std::forward<Result>(result)...));
    });
  }

  // Appends |callback| to the list of const callbacks, then invokes pending
  // callbacks if a result is already available. Empty callbacks are ignored.
  // Logs a fatal error if the result has already been moved into a Then()
  // callback.
  void AddConstCallback(std::function<void(const Result&...)>&& callback) {
    if (!callback) {
      return;
    }

    // It's impossible to add a const callback after a future is completed
    // *and* it has a callback: the completed value will be moved into the
    // callback and won't be available for a ConstThen().
    if (status_ == internal::FutureStatus::kConsumed) {
      FXL_LOG(FATAL)
          << "Future@" << static_cast<void*>(this)
          << (trace_name_.length() ? "(" + trace_name_ + ")" : "")
          << ": Cannot add a const callback after completed result is "
             "already moved into Then() callback.";
    }

    // |callback| is a named rvalue reference; move it so the std::function is
    // not copied into the vector.
    const_callbacks_.emplace_back(std::move(callback));

    MaybeInvokeCallbacks();
  }

  // Stores |result| as this future's result, marks the future completed, and
  // invokes any callbacks that are already attached. Completing a future that
  // is not awaiting a result trips a debug check.
  void CompleteWithTuple(std::tuple<Result...>&& result) {
    FXL_DCHECK(status_ == internal::FutureStatus::kAwaiting)
        << "Future@" << static_cast<void*>(this)
        << (trace_name_.length() ? "(" + trace_name_ + ")" : "")
        << ": Complete() called twice.";

    // |result| is a plain (non-deduced) rvalue reference, so std::move() is
    // the right cast; std::forward is reserved for forwarding references.
    result_ = std::move(result);
    status_ = internal::FutureStatus::kCompleted;

    MaybeInvokeCallbacks();
  }

  void MaybeInvokeCallbacks() {
    if (status_ == internal::FutureStatus::kAwaiting) {
      return;
    }

    if (const_callbacks_.size()) {
      // Move |const_callbacks_| to a local variable. MaybeInvokeCallbacks()
      // can be called multiple times if the client only uses ConstThen() or
      // WeakConstThen() to fetch the completed values. This prevents calling
      // these callbacks multiple times by moving them out of the members
      // scope.
      auto local_const_callbacks = std::move(const_callbacks_);
      for (auto& const_callback : local_const_callbacks) {
        fxl::Apply(const_callback, result_);
      }
    }

    if (callback_) {
      auto callback = std::move(callback_);
      status_ = internal::FutureStatus::kConsumed;
      fxl::Apply(callback, std::move(result_));
    }
  }

  // The "subfuture" methods below are private helper functions designed to be
  // used with the futures that are returned by the public API (Then(), Map(),
  // etc); those returned futures are named "subfutures" in the code, which is
  // why the methods are named likewise.

  // A convenience method to call this->SetCallback() with the lambda returned
  // by SubfutureCallback(), and then return |subfuture|.
  //
  // |callback|, |subfuture_completer| and |guard| are forwarding references
  // and are forwarded as such, so that rvalues handed in by callers (e.g.
  // std::move(callback)) are moved onward rather than copied.
  template <typename Subfuture, typename SubfutureCompleter, typename Callback,
            typename Guard>
  Subfuture SubfutureCreate(Subfuture subfuture, Callback&& callback,
                            SubfutureCompleter&& subfuture_completer,
                            Guard&& guard) {
    SetCallback(SubfutureCallback<Result...>(
        subfuture, std::forward<Callback>(callback),
        std::forward<SubfutureCompleter>(subfuture_completer),
        std::forward<Guard>(guard)));
    return subfuture;
  }

  // Returns a lambda that:
  //
  // 1. calls |guard| before calling |callback|, and only calls |callback| if
  //    |guard| returns true;
  // 2. will not call |subfuture_completer| if either |this| or |subfuture| are
  //    destroyed by |callback|.
  //
  // The lambda holds a strong reference to |subfuture| and a raw |this|
  // pointer. |callback|, |subfuture_completer| and |guard| are forwarded into
  // the closure, so rvalue arguments are moved instead of copied.
  template <typename... CoercedResult, typename Subfuture,
            typename SubfutureCompleter, typename Callback, typename Guard>
  auto SubfutureCallback(Subfuture subfuture, Callback&& callback,
                         SubfutureCompleter&& subfuture_completer,
                         Guard&& guard) {
    return [this, subfuture = std::move(subfuture),
            callback = std::forward<Callback>(callback),
            subfuture_completer =
                std::forward<SubfutureCompleter>(subfuture_completer),
            guard = std::forward<Guard>(guard)](CoercedResult&&... result) {
      if (!guard())
        return;

      auto weak_future = weak_factory_.GetWeakPtr();
      auto weak_subfuture = subfuture->weak_factory_.GetWeakPtr();
      auto subfuture_result = callback(std::forward<CoercedResult>(result)...);

      // |callback| above may delete this future or the returned subfuture when
      // it finishes executing, so check if |weak_future| and |weak_subfuture|
      // are still valid before attempting to complete the subfuture.
      if (weak_future && weak_subfuture)
        subfuture_completer(subfuture, std::move(subfuture_result));
    };
  }

  // Returns a lambda that calls |callback|, and returns an empty tuple. The
  // consistent return type enables generic programming techniques to be
  // applied to |callback| since the return type is consistent (it's always a
  // |std::tuple<T...>|).
  //
  // |callback| is forwarded into the closure, so an rvalue argument is moved
  // rather than copied.
  template <typename... CoercedResult, typename Callback>
  auto SubfutureVoidCallback(Callback&& callback) {
    return [callback = std::forward<Callback>(callback)](
               CoercedResult&&... result) {
      callback(std::forward<CoercedResult>(result)...);
      return std::make_tuple();
    };
  }

  // Returns a lambda that, when called with a subfuture and a std::tuple, will
  // complete the subfuture with the values from the tuple elements. This method
  // is designed to be used with SubfutureAsyncMapCompleter(), which will do
  // the same thing but can be passed Futures for the std::tuple values.
  // Together, this enables generic programming techniques to be applied to the
  // returned lambda, since the lambda presents a consistent API for callers.
  //
  // |SubfutureResult...| are the result types of the subfuture to complete;
  // with no template arguments the lambda completes a Future<> from an empty
  // tuple.
  template <typename... SubfutureResult>
  auto SubfutureCompleter() {
    return [](FuturePtr<SubfutureResult...> subfuture,
              std::tuple<SubfutureResult...> subfuture_result) {
      subfuture->CompleteWithTuple(std::move(subfuture_result));
    };
  }

  // See the documentation for SubfutureCompleter() above.
  //
  // The returned lambda takes the |subfuture| to complete and a one-element
  // tuple holding the future that the AsyncMap() callback produced. It attaches
  // a callback to that inner future which, once the inner future completes,
  // completes |subfuture| with the inner future's result.
  template <typename AsyncMapResult>
  auto SubfutureAsyncMapCompleter() {
    return [](AsyncMapResult subfuture,
              std::tuple<AsyncMapResult> subfuture_result) {
      // std::get<0>(subfuture_result) is the inner future. Its callback holds
      // a strong reference to |subfuture|.
      std::get<0>(subfuture_result)
          ->SetCallbackWithTuple(
              [subfuture](
                  std::tuple<
                      typename AsyncMapResult::element_type::result_tuple_type>
                      transformed_result) {
                // Unwrap the outer one-element tuple to get the inner
                // future's result tuple, and complete |subfuture| with it.
                subfuture->CompleteWithTuple(
                    std::move(std::get<0>(transformed_result)));
              });
    };
  }

  // The following overloads enable |Map| to flatten out functions that map to
  // |std::tuple|.

  // Generic case, taking a Tag of any |MapResult|: treats the callback's
  // return value as a single result and delegates to the std::tuple overload
  // with Tag<std::tuple<MapResult>>, producing a FuturePtr<MapResult>.
  template <typename Callback, typename MapResult>
  FuturePtr<MapResult> Map(Callback callback, Tag<MapResult>) {
    // Directly passing |callback| like this ends up relying on an implicit
    // |std::tuple| memberwise constructor, which should be fine. It will
    // convert from |MapResult| to |std::tuple<MapResult>| implicitly.
    return Map(std::move(callback), Tag<std::tuple<MapResult>>{});
  }

  // Tuple case: takes a Tag<std::tuple<MapResult...>> and creates a
  // FuturePtr<MapResult...> subfuture that is completed with the elements of
  // the tuple returned by |callback|. The guard always passes.
  template <typename Callback, typename... MapResult>
  FuturePtr<MapResult...> Map(Callback callback,
                              Tag<std::tuple<MapResult...>>) {
    return SubfutureCreate(Future<MapResult...>::Create(trace_name_ + "(Map)"),
                           std::move(callback),
                           SubfutureCompleter<MapResult...>(),
                           [] { return true; });
  }

  std::string trace_name_;

  internal::FutureStatus status_ = internal::FutureStatus::kAwaiting;
  std::tuple<Result...> result_;

  // TODO(MI4-1102): Convert std::function to fit::function here & everywhere.

  // The callback attached to this future.
  std::function<void(Result...)> callback_;

  // Callbacks that have attached with the Const*() methods, such as
  // ConstThen().
  std::vector<std::function<void(const Result&...)>> const_callbacks_;

  // Keep this last in the list of members. (See WeakPtrFactory documentation
  // for more info.)
  fxl::WeakPtrFactory<Future<Result...>> weak_factory_;

  FXL_DISALLOW_COPY_ASSIGN_AND_MOVE(Future);

  // For unit tests only.
  friend class FutureTest;

  // For unit tests only.
  //
  // Returns a reference to the stored result tuple. Debug-checks that the
  // future is no longer awaiting a result. Note that the status may also be
  // kConsumed here, in which case the result has already been moved into a
  // callback.
  const std::tuple<Result...>& get() const {
    FXL_DCHECK(status_ != internal::FutureStatus::kAwaiting)
        << trace_name_ << ": get() called on unset future";

    return result_;
  }
};

// Combines |futures| into one Future that completes once every future in
// |futures| has completed. If |futures| is empty, the returned future is
// completed right away with a default-constructed result.
//
// The returned future is kept alive until each future in |futures| has either
// completed or been destroyed. Should any of them be destroyed before
// completing, the returned future never completes. Results are reported in the
// order of |futures|, no matter in which order the futures actually complete.
//
// Unless overridden, the type of the returned future follows from the type of
// the component futures: void futures give a void future, monadic futures give
// a future of a flat vector, and polyadic futures give a future of a vector of
// tuples:
//
// Components         | Result
// -------------------+--------------------------------------------
// FuturePtr<>        | FuturePtr<>
// FuturePtr<T>       | FuturePtr<std::vector<T>>
// FuturePtr<T, U...> | FuturePtr<std::vector<std::tuple<T, U...>>>
//
// To override the default, pass the desired future type as the template
// argument of Wait. Every case may produce a Future<> or a
// Future<std::vector<std::tuple<...>>>.
//
// Example usage:
//
// FuturePtr<Bytes> f1 = MakeNetworkRequest(request1);
// FuturePtr<Bytes> f2 = MakeNetworkRequest(request2);
// FuturePtr<Bytes> f3 = MakeNetworkRequest(request3);
// std::vector<FuturePtr<Bytes>> requests{f1, f2, f3};
// Wait("NetworkRequests", requests)->Then([](
//     std::vector<Bytes> bytes_vector) {
//   Bytes f1_bytes = bytes_vector[0];
//   Bytes f2_bytes = bytes_vector[1];
//   Bytes f3_bytes = bytes_vector[2];
// });
//
// This is similar to Promise.All() in JavaScript, or Join() in Rust.
template <typename ResultsFuture, typename... Result>
fxl::RefPtr<ResultsFuture> Wait(
    const std::string& trace_name,
    const std::vector<FuturePtr<Result...>>& futures) {
  // Nothing to wait for: hand back a future that is already complete.
  if (futures.empty()) {
    fxl::RefPtr<ResultsFuture> already_done =
        ResultsFuture::Create(trace_name + "(Completed)");
    already_done->CompleteWithTuple({});
    return already_done;
  }

  // Shared by all per-future callbacks; gathers one result slot per future.
  auto collector =
      std::make_shared<internal::ResultCollector<ResultsFuture>>(
          futures.size());

  fxl::RefPtr<ResultsFuture> combined =
      ResultsFuture::Create(trace_name + "(WillWait)");

  for (size_t index = 0; index < futures.size(); ++index) {
    // Each callback holds a reference to |combined|, which is what keeps the
    // returned future alive while component futures are still pending.
    futures[index]->SetCallback(
        [index, combined, collector](Result&&... values) {
          collector->AssignResult(index, std::forward<Result>(values)...);

          if (!collector->IsComplete()) {
            return;
          }
          collector->Complete(combined.get());
        });
  }

  return combined;
}

// Like |Wait|, but gives up after a timeout. After the timeout, |on_timeout| is
// invoked with a diagnostic error string containing the trace names of the
// futures that have not completed.
//
// This maintains a reference to the returned |Future| until all component
// futures have been completed or destroyed, or until the timeout has elapsed,
// whichever happens first. However, |on_timeout| will be invoked on timeout if
// any future has not completed even if any or all futures have been destroyed.
//
// Parameters:
//   trace_name - base name for the futures created here.
//   dispatcher - dispatcher on which the delayed timeout task is posted.
//   timeout    - delay after which the timeout task runs.
//   on_timeout - invoked as void(const std::string&) with the diagnostic
//                message if some future is still pending when the task runs.
//   futures    - the component futures to wait for.
//
// If the combined |Wait| future is already complete when this is called (for
// example because |futures| is empty), it is returned directly and no timeout
// task is posted.
template <typename ResultsFuture, typename TimeoutCallback, typename... Result>
fxl::RefPtr<ResultsFuture> WaitWithTimeout(
    const std::string& trace_name, async_dispatcher_t* dispatcher,
    zx::duration timeout,
    TimeoutCallback on_timeout /* void (const std::string&) */,
    const std::vector<FuturePtr<Result...>>& futures) {
  auto all_futures_completed = Wait<ResultsFuture>(trace_name, futures);

  if (all_futures_completed->status_ != internal::FutureStatus::kAwaiting) {
    return all_futures_completed;
  }

  // One slot per still-pending future, holding its trace name. A slot is reset
  // to null when its future completes, so whatever is non-null when the
  // timeout task runs names a future that is still outstanding.
  auto all_trace_names =
      std::make_shared<std::vector<std::unique_ptr<std::string>>>();
  all_trace_names->reserve(futures.size());

  for (const auto& future : futures) {
    // There's no point in waiting on completed futures. Furthermore if we tried
    // that we'd have to put this before |Wait| since |Wait| consumes the
    // results, but then if all futures are already completed this is all just
    // wasted effort.
    if (future->status_ == internal::FutureStatus::kAwaiting) {
      size_t i = all_trace_names->size();
      all_trace_names->push_back(
          std::make_unique<std::string>(future->trace_name_));
      future->AddConstCallback([i, all_trace_names](const Result&...) {
        // The timeout task clears the vector once it has reported; after that
        // index |i| is no longer valid and there is nothing left to mark.
        if (!all_trace_names->empty()) {
          (*all_trace_names)[i] = nullptr;
        }
      });
    }
  }

  // Return a proxy so that we can cancel result forwarding in the case of a
  // timeout. This could with more difficulty be done within |Wait|, but this
  // way allows us to reuse the logic more easily.
  fxl::RefPtr<ResultsFuture> all_proxy =
      ResultsFuture::Create(trace_name + "(WillWaitWithTimeout)");
  all_futures_completed->SetCallback(all_proxy->Completer());

  // TODO(rosswang): Factor this into dump and cancel functions that can be
  // called at other times.
  async::PostDelayedTask(
      dispatcher,
      [all_trace_names = std::move(all_trace_names),
       on_timeout = std::move(on_timeout),
       all_futures_completed =
           all_futures_completed->weak_factory_.GetWeakPtr()] {
        // Accumulate directly into a std::string: this keeps the header from
        // depending on <sstream> and avoids copying a stream buffer twice.
        std::string pending;
        for (const auto& pending_name : *all_trace_names) {
          if (pending_name) {
            pending += "\n\t";
            pending += *pending_name;
          }
        }
        if (pending.empty()) {
          // Every tracked future completed in time; nothing to report.
          return;
        }

        on_timeout("Wait timed out. Still waiting for futures:" + pending);
        if (all_futures_completed) {
          // cancel results forwarding (possibly releasing all_proxy)
          all_futures_completed->SetCallback(nullptr);
        }
        // Possibly release the component futures. Once this task completes
        // and goes out of scope, the last reference to the Wait future (also
        // holding onto the component futures) should be released as well.
        all_trace_names->clear();
      },
      timeout);

  return all_proxy;
}

// |WaitWithTimeout| using the thread's default dispatcher, as returned by
// async_get_default_dispatcher(), to schedule the timeout.
template <typename ResultsFuture, typename TimeoutCallback, typename... Result>
fxl::RefPtr<ResultsFuture> WaitWithTimeout(
    const std::string& trace_name, zx::duration timeout,
    TimeoutCallback on_timeout /* void (const std::string&) */,
    const std::vector<FuturePtr<Result...>>& futures) {
  async_dispatcher_t* const default_dispatcher =
      async_get_default_dispatcher();
  return WaitWithTimeout<ResultsFuture>(trace_name, default_dispatcher,
                                        timeout, std::move(on_timeout),
                                        futures);
}

// These overloads allow us to effectively default the first template parameter,
// |ResultsFuture| (since the others are intended to be inferred).
// TODO(rosswang): If the overload combinatoric explosion gets too heavy, we can
// use a dummy struct parameter instead to encapsulate that template parameter.

template <typename... Result>
auto Wait(const std::string& trace_name,
          const std::vector<FuturePtr<Result...>>& futures) {
  return Wait<internal::DefaultResultsFuture_t<Result...>>(trace_name, futures);
}

template <typename TimeoutCallback, typename... Result>
auto WaitWithTimeout(const std::string& trace_name,
                     async_dispatcher_t* dispatcher, zx::duration timeout,
                     TimeoutCallback on_timeout,
                     const std::vector<FuturePtr<Result...>>& futures) {
  return WaitWithTimeout<internal::DefaultResultsFuture_t<Result...>>(
      trace_name, dispatcher, timeout, std::move(on_timeout), futures);
}

template <typename TimeoutCallback, typename... Result>
auto WaitWithTimeout(const std::string& trace_name, zx::duration timeout,
                     TimeoutCallback on_timeout /* void (const std::string&) */,
                     const std::vector<FuturePtr<Result...>>& futures) {
  return WaitWithTimeout<internal::DefaultResultsFuture_t<Result...>>(
      trace_name, async_get_default_dispatcher(), timeout,
      std::move(on_timeout), futures);
}

// We need to provide the initializer list overloads or template deduction fails
// for the above overloads if given an initializer list.
// TODO(rosswang): Add a potentially heterogeneous variadic template instead,
// and prefer it over initializer lists outside of tests.
template <typename ResultsFuture, typename... Result>
auto Wait(const std::string& trace_name,
          std::initializer_list<FuturePtr<Result...>> futures) {
  return Wait<ResultsFuture>(trace_name,
                             std::vector<FuturePtr<Result...>>(futures));
}

// Initializer-list form of |WaitWithTimeout| with an explicit results future
// type and dispatcher. The list is copied into a vector and handed to the
// vector overload.
template <typename ResultsFuture, typename TimeoutCallback, typename... Result>
fxl::RefPtr<ResultsFuture> WaitWithTimeout(
    const std::string& trace_name, async_dispatcher_t* dispatcher,
    zx::duration timeout, TimeoutCallback on_timeout,
    std::initializer_list<FuturePtr<Result...>> futures) {
  const std::vector<FuturePtr<Result...>> future_vector(futures);
  return WaitWithTimeout<ResultsFuture>(trace_name, dispatcher, timeout,
                                        std::move(on_timeout), future_vector);
}

// Initializer-list form of |WaitWithTimeout| with an explicit results future
// type, scheduled on the thread's default dispatcher.
template <typename ResultsFuture, typename TimeoutCallback, typename... Result>
fxl::RefPtr<ResultsFuture> WaitWithTimeout(
    const std::string& trace_name, zx::duration timeout,
    TimeoutCallback on_timeout /* void (const std::string&) */,
    std::initializer_list<FuturePtr<Result...>> futures) {
  const std::vector<FuturePtr<Result...>> future_vector(futures);
  async_dispatcher_t* const default_dispatcher =
      async_get_default_dispatcher();
  return WaitWithTimeout<ResultsFuture>(trace_name, default_dispatcher,
                                        timeout, std::move(on_timeout),
                                        future_vector);
}

template <typename... Result>
auto Wait(const std::string& trace_name,
          std::initializer_list<FuturePtr<Result...>> futures) {
  return Wait(trace_name, std::vector<FuturePtr<Result...>>(futures));
}

template <typename TimeoutCallback, typename... Result>
auto WaitWithTimeout(const std::string& trace_name,
                     async_dispatcher_t* dispatcher, zx::duration timeout,
                     TimeoutCallback on_timeout,
                     std::initializer_list<FuturePtr<Result...>> futures) {
  return WaitWithTimeout(trace_name, dispatcher, timeout, std::move(on_timeout),
                         std::vector<FuturePtr<Result...>>(futures));
}

template <typename TimeoutCallback, typename... Result>
auto WaitWithTimeout(const std::string& trace_name, zx::duration timeout,
                     TimeoutCallback on_timeout /* void (const std::string&) */,
                     std::initializer_list<FuturePtr<Result...>> futures) {
  return WaitWithTimeout(trace_name, async_get_default_dispatcher(), timeout,
                         std::move(on_timeout),
                         std::vector<FuturePtr<Result...>>(futures));
}

}  // namespace modular

#endif  // LIB_ASYNC_CPP_FUTURE_H_
