[operation] Support adding fit::pending_task to operation collections.

TEST=operation_unittest
MF-147 #comment [operation] Support adding fit::pending_task to operation collections.

Change-Id: Ib9d1e4bcb522d8328991e3eb39cbcba1087d816c
diff --git a/public/lib/async/cpp/BUILD.gn b/public/lib/async/cpp/BUILD.gn
index 7c7f408..6476beb 100644
--- a/public/lib/async/cpp/BUILD.gn
+++ b/public/lib/async/cpp/BUILD.gn
@@ -24,6 +24,7 @@
   public_deps = [
     ":future",
     "//garnet/public/lib/fxl",
+    "//garnet/public/lib/async_promise",
   ]
 
   deps = [
diff --git a/public/lib/async/cpp/operation.cc b/public/lib/async/cpp/operation.cc
index 0a102bf..7d636bd 100644
--- a/public/lib/async/cpp/operation.cc
+++ b/public/lib/async/cpp/operation.cc
@@ -8,6 +8,8 @@
 
 #include <lib/async/cpp/task.h>
 #include <lib/async/default.h>
+#include <lib/fit/bridge.h>
+#include <lib/fit/defer.h>
 #include <lib/fxl/logging.h>
 #include <trace/event.h>
 
@@ -33,7 +35,8 @@
   o->InvalidateWeakPtrs();
 }
 
-OperationCollection::OperationCollection() : weak_ptr_factory_(this) {}
+OperationCollection::OperationCollection()
+    : executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}
 
 OperationCollection::~OperationCollection() {
   // We invalidate weakptrs to all Operation<>s first before destroying them, so
@@ -88,7 +91,12 @@
   // no-op for operation collection.
 }
 
-OperationQueue::OperationQueue() : weak_ptr_factory_(this) {}
+void OperationCollection::ScheduleTask(fit::pending_task task) {
+  executor_.schedule_task(std::move(task));
+}
+
+OperationQueue::OperationQueue()
+    : executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}
 
 OperationQueue::~OperationQueue() {
   // We invalidate weakptrs to all Operation<>s first before destroying them, so
@@ -135,6 +143,54 @@
   }
 }
 
+namespace {
+class PromiseWrapperCall : public Operation<> {
+ public:
+  PromiseWrapperCall(fit::completer<> completer)
+      : Operation("PromiseDoneCall", [] {}), completer_(std::move(completer)) {}
+
+  void Run() {
+    running_ = true;
+    completer_.complete_ok();
+  }
+
+  void SayDone() {
+    FXL_CHECK(running_);
+    Done();
+  }
+
+ private:
+  fit::completer<> completer_;
+  bool running_{false};
+};
+}  // namespace
+
+void OperationQueue::ScheduleTask(fit::pending_task task) {
+  // We need to block the execution of this task on tasks in |operations_|, and
+  // then also block further execution of |operations_| on this task finishing.
+  // We do this by "wrapping" the promise in a PromiseWrapperCall.
+  //
+  // |start| will be completed when |wrapper->Run()| is called, and |wrapper|
+  // will be finished as a result of the promise finishing: we call
+  // |wrapper->SayDone()| when |task| is destroyed. It is destroyed when either
+  // a) it is abandoned or b) it is completed successfully or with an error. In
+  // either case we want to unblock the next operation in the queue.
+  fit::bridge start;
+  auto wrapper = new PromiseWrapperCall(std::move(start.completer));
+  executor_.schedule_task(start.consumer.promise().then(
+      [p = task.take_promise(), wrapper](fit::result<>&) mutable {
+        // It is safe to call SayDone() on |wrapper| because we know that
+        // |wrapper| will be alive so long as this promise is being executed.
+        //
+        // We use a fit::defer on the capture list of .then() so that if |p| is
+        // abandoned, we still unblock the queue.
+        return p.then([defer = fit::defer([wrapper] { wrapper->SayDone(); })](
+                          fit::result<>&) {});
+      }));
+
+  Add(wrapper);
+}
+
 OperationBase::OperationBase(const char* const trace_name,
                              std::string trace_info)
     // While we transition all operations to be explicitly added to containers
diff --git a/public/lib/async/cpp/operation.h b/public/lib/async/cpp/operation.h
index dfef6cc..0898e0d 100644
--- a/public/lib/async/cpp/operation.h
+++ b/public/lib/async/cpp/operation.h
@@ -13,9 +13,11 @@
 #include <vector>
 
 #include <lib/async/cpp/future.h>
+#include <lib/async_promise/executor.h>
 #include <lib/fxl/logging.h>
 #include <lib/fxl/macros.h>
 #include <lib/fxl/memory/weak_ptr.h>
+#include <lib/fit/promise.h>
 
 namespace modular {
 class OperationBase;
@@ -31,6 +33,11 @@
   // Adds |o| to this container and takes ownership.
   virtual void Add(OperationBase* o) final;
 
+  // Adds |task| to be scheduled on |this|. This mirrors
+  // the interface in fit::executor, and is here only during
+  // a transition from Operations to fit::promises.
+  virtual void ScheduleTask(fit::pending_task task) = 0;
+
  protected:
   void Schedule(OperationBase* o);
   void InvalidateWeakPtrs(OperationBase* o);
@@ -54,12 +61,15 @@
   OperationCollection();
   ~OperationCollection() override;
 
+  void ScheduleTask(fit::pending_task task) override;
+
  private:
   fxl::WeakPtr<OperationContainer> GetWeakPtr() override;
   void Hold(OperationBase* o) override;
   void Drop(OperationBase* o) override;
   void Cont() override;
 
+  async::Executor executor_;
   std::vector<std::unique_ptr<OperationBase>> operations_;
 
   // It is essential that the weak_ptr_factory is defined after the operations_
@@ -81,6 +91,8 @@
   OperationQueue();
   ~OperationQueue() override;
 
+  void ScheduleTask(fit::pending_task task) override;
+
  private:
   fxl::WeakPtr<OperationContainer> GetWeakPtr() override;
   void Hold(OperationBase* o) override;
@@ -93,6 +105,7 @@
   // |operations_|: its result callback could be executing.
   bool idle_ = true;
 
+  async::Executor executor_;
   std::queue<std::unique_ptr<OperationBase>> operations_;
 
   // It is essential that the weak_ptr_factory is defined after the operations_
diff --git a/public/lib/async/cpp/operation_unittest.cc b/public/lib/async/cpp/operation_unittest.cc
index 769f3f8..16d7d05 100644
--- a/public/lib/async/cpp/operation_unittest.cc
+++ b/public/lib/async/cpp/operation_unittest.cc
@@ -41,6 +41,8 @@
 
   void Cont() override { ++cont_count; }
 
+  void ScheduleTask(fit::pending_task task) override {}
+
   void PretendToDie() { weak_ptr_factory_.InvalidateWeakPtrs(); }
 
   using OperationContainer::InvalidateWeakPtrs;  // Promote for testing.
@@ -185,8 +187,8 @@
       : Operation("Test FlowToken Operation", std::move(done)) {}
 
   // |call_before_flow_dies| is invoked before the FlowToken goes out of scope.
-  void SayDone(int result,
-               std::function<void()> call_before_flow_dies = [] {}) {
+  void SayDone(
+      int result, std::function<void()> call_before_flow_dies = [] {}) {
     // When |flow| goes out of scope, it will call Done() for us.
     FlowToken flow{this, &result_};
 
@@ -255,29 +257,47 @@
 TEST_F(OperationTest, OperationQueue) {
   // Here we test a specific implementation of OperationContainer
   // (OperationQueue), which should only allow one operation to "run" at a
-  // given time. That means that operation #2 is not scheduled until operation
-  // #1 finishes.
+  // given time.
   OperationQueue container;
 
-  // OperationQueue, unlike TestContainer, does manage its memory.
+  // OperationQueue, unlike TestContainer, does own the Operations.
   bool op1_ran = false;
   bool op1_done = false;
   auto* op1 = new TestOperation<>([&op1_ran]() { op1_ran = true; },
                                   [&op1_done]() { op1_done = true; });
 
+  bool op3_ran = false;
+  bool op3_done = false;
+  auto* op3 = new TestOperation<>([&op3_ran]() { op3_ran = true; },
+                                  [&op3_done]() { op3_done = true; });
+
+  // We'll queue |op1|, then a fit::promise ("op2") and another Operation,
+  // |op3|.
+  container.Add(op1);
+
   bool op2_ran = false;
   bool op2_done = false;
-  auto* op2 = new TestOperation<>([&op2_ran]() { op2_ran = true; },
-                                  [&op2_done]() { op2_done = true; });
+  fit::suspended_task suspended_op2;
+  container.ScheduleTask(
+      fit::make_promise([&](fit::context& c) -> fit::result<> {
+        if (op2_ran == true) {
+          op2_done = true;
+          return fit::ok();
+        }
+        op2_ran = true;
+        suspended_op2 = c.suspend_task();
+        return fit::pending();
+      }));
 
-  container.Add(op1);
-  container.Add(op2);
+  container.Add(op3);
 
   // Nothing has run yet because we haven't run the async loop.
   EXPECT_FALSE(op1_ran);
   EXPECT_FALSE(op1_done);
   EXPECT_FALSE(op2_ran);
   EXPECT_FALSE(op2_done);
+  EXPECT_FALSE(op3_ran);
+  EXPECT_FALSE(op3_done);
 
   // Running the loop we expect op1 to have run, but not completed.
   RunLoopUntilIdle();
@@ -285,24 +305,42 @@
   EXPECT_FALSE(op1_done);
   EXPECT_FALSE(op2_ran);
   EXPECT_FALSE(op2_done);
+  EXPECT_FALSE(op3_ran);
+  EXPECT_FALSE(op3_done);
 
-  // But even if we run more, we do not expect op2 to run.
+  // But even if we run more, we do not expect any other ops to run.
   RunLoopUntilIdle();
   EXPECT_TRUE(op1_ran);
   EXPECT_FALSE(op1_done);
   EXPECT_FALSE(op2_ran);
   EXPECT_FALSE(op2_done);
+  EXPECT_FALSE(op3_ran);
+  EXPECT_FALSE(op3_done);
 
-  // If op1 says it's Done(), we expect op2 to be queued, but not run yet.
+  // If op1 says it's Done(), we expect op2 to run.
   op1->SayDone();
-  EXPECT_TRUE(op1_done);
-  EXPECT_FALSE(op2_ran);
-  EXPECT_FALSE(op2_done);
-
-  // Running the loop again we expect op2 to start, but not completed.
   RunLoopUntilIdle();
+  EXPECT_TRUE(op1_done);
   EXPECT_TRUE(op2_ran);
   EXPECT_FALSE(op2_done);
+  EXPECT_FALSE(op3_ran);
+  EXPECT_FALSE(op3_done);
+
+  // Running the loop again should do nothing, as op2 is still pending (until
+  // we resume it manuall).
+  RunLoopUntilIdle();
+  EXPECT_TRUE(suspended_op2);
+  EXPECT_FALSE(op2_done);
+  EXPECT_FALSE(op3_ran);
+  EXPECT_FALSE(op3_done);
+
+  // Resume op2, and run the loop again. We expect op3 to start, but not
+  // complete.
+  suspended_op2.resume_task();
+  RunLoopUntilIdle();
+  EXPECT_TRUE(op2_done);
+  EXPECT_TRUE(op3_ran);
+  EXPECT_FALSE(op3_done);
 }
 
 TEST_F(OperationTest, OperationCollection) {
@@ -323,19 +361,25 @@
 
   container.Add(op1);
   container.Add(op2);
+  bool op3_ran = false;
+  container.ScheduleTask(fit::make_promise([&] { op3_ran = true; }));
 
   // Nothing has run yet because we haven't run the async loop.
   EXPECT_FALSE(op1_ran);
   EXPECT_FALSE(op1_done);
   EXPECT_FALSE(op2_ran);
   EXPECT_FALSE(op2_done);
+  EXPECT_FALSE(op3_ran);
 
-  // Running the loop we expect op1 to have run, but not completed.
+  // Running the loop we expect all ops to have run. TestOperations won't have
+  // completed because they require us to call SayDone(). The fit::promise,
+  // however, will have completed (it doesn't suspend).
   RunLoopUntilIdle();
   EXPECT_TRUE(op1_ran);
   EXPECT_FALSE(op1_done);
   EXPECT_TRUE(op2_ran);
   EXPECT_FALSE(op2_done);
+  EXPECT_TRUE(op2_ran);
 }
 
 class TestOperationNotNullPtr : public Operation<> {