[operation] Support adding fit::pending_task to operation collections.
TEST=operation_unittest
MF-147 #comment [operation] Support adding fit::pending_task to operation collections.
Change-Id: Ib9d1e4bcb522d8328991e3eb39cbcba1087d816c
diff --git a/public/lib/async/cpp/BUILD.gn b/public/lib/async/cpp/BUILD.gn
index 7c7f408..6476beb 100644
--- a/public/lib/async/cpp/BUILD.gn
+++ b/public/lib/async/cpp/BUILD.gn
@@ -24,6 +24,7 @@
public_deps = [
":future",
"//garnet/public/lib/fxl",
+ "//garnet/public/lib/async_promise",
]
deps = [
diff --git a/public/lib/async/cpp/operation.cc b/public/lib/async/cpp/operation.cc
index 0a102bf..7d636bd 100644
--- a/public/lib/async/cpp/operation.cc
+++ b/public/lib/async/cpp/operation.cc
@@ -8,6 +8,8 @@
#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
+#include <lib/fit/bridge.h>
+#include <lib/fit/defer.h>
#include <lib/fxl/logging.h>
#include <trace/event.h>
@@ -33,7 +35,8 @@
o->InvalidateWeakPtrs();
}
-OperationCollection::OperationCollection() : weak_ptr_factory_(this) {}
+OperationCollection::OperationCollection()
+ : executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}
OperationCollection::~OperationCollection() {
// We invalidate weakptrs to all Operation<>s first before destroying them, so
@@ -88,7 +91,12 @@
// no-op for operation collection.
}
-OperationQueue::OperationQueue() : weak_ptr_factory_(this) {}
+void OperationCollection::ScheduleTask(fit::pending_task task) {
+ executor_.schedule_task(std::move(task));
+}
+
+OperationQueue::OperationQueue()
+ : executor_(async_get_default_dispatcher()), weak_ptr_factory_(this) {}
OperationQueue::~OperationQueue() {
// We invalidate weakptrs to all Operation<>s first before destroying them, so
@@ -135,6 +143,54 @@
}
}
+namespace {
+class PromiseWrapperCall : public Operation<> {
+ public:
+ PromiseWrapperCall(fit::completer<> completer)
+ : Operation("PromiseDoneCall", [] {}), completer_(std::move(completer)) {}
+
+ void Run() {
+ running_ = true;
+ completer_.complete_ok();
+ }
+
+ void SayDone() {
+ FXL_CHECK(running_);
+ Done();
+ }
+
+ private:
+ fit::completer<> completer_;
+ bool running_{false};
+};
+} // namespace
+
+void OperationQueue::ScheduleTask(fit::pending_task task) {
+ // We need to block the execution of this task on tasks in |operations_|, and
+ // then also block further execution of |operations_| on this task finishing.
+ // We do this by "wrapping" the promise in a PromiseWrapperCall.
+ //
+ // |start| will be completed when |wrapper->Run()| is called, and |wrapper|
+ // will be finished as a result of the promise finishing: we call
+ // |wrapper->SayDone()| when |task| is destroyed. It is destroyed when either
+ // a) it is abandoned or b) it is completed successfully or with an error. In
+ // either case we want to unblock the next operation in the queue.
+ fit::bridge start;
+ auto wrapper = new PromiseWrapperCall(std::move(start.completer));
+ executor_.schedule_task(start.consumer.promise().then(
+ [p = task.take_promise(), wrapper](fit::result<>&) mutable {
+ // It is safe to call SayDone() on |wrapper| because we know that
+ // |wrapper| will be alive so long as this promise is being executed.
+ //
+ // We use a fit::defer on the capture list of .then() so that if |p| is
+ // abandoned, we still unblock the queue.
+ return p.then([defer = fit::defer([wrapper] { wrapper->SayDone(); })](
+ fit::result<>&) {});
+ }));
+
+ Add(wrapper);
+}
+
OperationBase::OperationBase(const char* const trace_name,
std::string trace_info)
// While we transition all operations to be explicitly added to containers
diff --git a/public/lib/async/cpp/operation.h b/public/lib/async/cpp/operation.h
index dfef6cc..0898e0d 100644
--- a/public/lib/async/cpp/operation.h
+++ b/public/lib/async/cpp/operation.h
@@ -13,9 +13,11 @@
#include <vector>
#include <lib/async/cpp/future.h>
+#include <lib/async_promise/executor.h>
#include <lib/fxl/logging.h>
#include <lib/fxl/macros.h>
#include <lib/fxl/memory/weak_ptr.h>
+#include <lib/fit/promise.h>
namespace modular {
class OperationBase;
@@ -31,6 +33,11 @@
// Adds |o| to this container and takes ownership.
virtual void Add(OperationBase* o) final;
+ // Adds |task| to be scheduled on |this|. This mirrors
+ // the interface in fit::executor, and is here only during
+ // a transition from Operations to fit::promises.
+ virtual void ScheduleTask(fit::pending_task task) = 0;
+
protected:
void Schedule(OperationBase* o);
void InvalidateWeakPtrs(OperationBase* o);
@@ -54,12 +61,15 @@
OperationCollection();
~OperationCollection() override;
+ void ScheduleTask(fit::pending_task task) override;
+
private:
fxl::WeakPtr<OperationContainer> GetWeakPtr() override;
void Hold(OperationBase* o) override;
void Drop(OperationBase* o) override;
void Cont() override;
+ async::Executor executor_;
std::vector<std::unique_ptr<OperationBase>> operations_;
// It is essential that the weak_ptr_factory is defined after the operations_
@@ -81,6 +91,8 @@
OperationQueue();
~OperationQueue() override;
+ void ScheduleTask(fit::pending_task task) override;
+
private:
fxl::WeakPtr<OperationContainer> GetWeakPtr() override;
void Hold(OperationBase* o) override;
@@ -93,6 +105,7 @@
// |operations_|: its result callback could be executing.
bool idle_ = true;
+ async::Executor executor_;
std::queue<std::unique_ptr<OperationBase>> operations_;
// It is essential that the weak_ptr_factory is defined after the operations_
diff --git a/public/lib/async/cpp/operation_unittest.cc b/public/lib/async/cpp/operation_unittest.cc
index 769f3f8..16d7d05 100644
--- a/public/lib/async/cpp/operation_unittest.cc
+++ b/public/lib/async/cpp/operation_unittest.cc
@@ -41,6 +41,8 @@
void Cont() override { ++cont_count; }
+ void ScheduleTask(fit::pending_task task) override {}
+
void PretendToDie() { weak_ptr_factory_.InvalidateWeakPtrs(); }
using OperationContainer::InvalidateWeakPtrs; // Promote for testing.
@@ -185,8 +187,8 @@
: Operation("Test FlowToken Operation", std::move(done)) {}
// |call_before_flow_dies| is invoked before the FlowToken goes out of scope.
- void SayDone(int result,
- std::function<void()> call_before_flow_dies = [] {}) {
+ void SayDone(
+ int result, std::function<void()> call_before_flow_dies = [] {}) {
// When |flow| goes out of scope, it will call Done() for us.
FlowToken flow{this, &result_};
@@ -255,29 +257,47 @@
TEST_F(OperationTest, OperationQueue) {
// Here we test a specific implementation of OperationContainer
// (OperationQueue), which should only allow one operation to "run" at a
- // given time. That means that operation #2 is not scheduled until operation
- // #1 finishes.
+ // given time.
OperationQueue container;
- // OperationQueue, unlike TestContainer, does manage its memory.
+ // OperationQueue, unlike TestContainer, does own the Operations.
bool op1_ran = false;
bool op1_done = false;
auto* op1 = new TestOperation<>([&op1_ran]() { op1_ran = true; },
[&op1_done]() { op1_done = true; });
+ bool op3_ran = false;
+ bool op3_done = false;
+ auto* op3 = new TestOperation<>([&op3_ran]() { op3_ran = true; },
+ [&op3_done]() { op3_done = true; });
+
+ // We'll queue |op1|, then a fit::promise ("op2") and another Operation,
+ // |op3|.
+ container.Add(op1);
+
bool op2_ran = false;
bool op2_done = false;
- auto* op2 = new TestOperation<>([&op2_ran]() { op2_ran = true; },
- [&op2_done]() { op2_done = true; });
+ fit::suspended_task suspended_op2;
+ container.ScheduleTask(
+ fit::make_promise([&](fit::context& c) -> fit::result<> {
+ if (op2_ran == true) {
+ op2_done = true;
+ return fit::ok();
+ }
+ op2_ran = true;
+ suspended_op2 = c.suspend_task();
+ return fit::pending();
+ }));
- container.Add(op1);
- container.Add(op2);
+ container.Add(op3);
// Nothing has run yet because we haven't run the async loop.
EXPECT_FALSE(op1_ran);
EXPECT_FALSE(op1_done);
EXPECT_FALSE(op2_ran);
EXPECT_FALSE(op2_done);
+ EXPECT_FALSE(op3_ran);
+ EXPECT_FALSE(op3_done);
// Running the loop we expect op1 to have run, but not completed.
RunLoopUntilIdle();
@@ -285,24 +305,42 @@
EXPECT_FALSE(op1_done);
EXPECT_FALSE(op2_ran);
EXPECT_FALSE(op2_done);
+ EXPECT_FALSE(op3_ran);
+ EXPECT_FALSE(op3_done);
- // But even if we run more, we do not expect op2 to run.
+ // But even if we run more, we do not expect any other ops to run.
RunLoopUntilIdle();
EXPECT_TRUE(op1_ran);
EXPECT_FALSE(op1_done);
EXPECT_FALSE(op2_ran);
EXPECT_FALSE(op2_done);
+ EXPECT_FALSE(op3_ran);
+ EXPECT_FALSE(op3_done);
- // If op1 says it's Done(), we expect op2 to be queued, but not run yet.
+ // If op1 says it's Done(), we expect op2 to run.
op1->SayDone();
- EXPECT_TRUE(op1_done);
- EXPECT_FALSE(op2_ran);
- EXPECT_FALSE(op2_done);
-
- // Running the loop again we expect op2 to start, but not completed.
RunLoopUntilIdle();
+ EXPECT_TRUE(op1_done);
EXPECT_TRUE(op2_ran);
EXPECT_FALSE(op2_done);
+ EXPECT_FALSE(op3_ran);
+ EXPECT_FALSE(op3_done);
+
+ // Running the loop again should do nothing, as op2 is still pending (until
+ // we resume it manuall).
+ RunLoopUntilIdle();
+ EXPECT_TRUE(suspended_op2);
+ EXPECT_FALSE(op2_done);
+ EXPECT_FALSE(op3_ran);
+ EXPECT_FALSE(op3_done);
+
+ // Resume op2, and run the loop again. We expect op3 to start, but not
+ // complete.
+ suspended_op2.resume_task();
+ RunLoopUntilIdle();
+ EXPECT_TRUE(op2_done);
+ EXPECT_TRUE(op3_ran);
+ EXPECT_FALSE(op3_done);
}
TEST_F(OperationTest, OperationCollection) {
@@ -323,19 +361,25 @@
container.Add(op1);
container.Add(op2);
+ bool op3_ran = false;
+ container.ScheduleTask(fit::make_promise([&] { op3_ran = true; }));
// Nothing has run yet because we haven't run the async loop.
EXPECT_FALSE(op1_ran);
EXPECT_FALSE(op1_done);
EXPECT_FALSE(op2_ran);
EXPECT_FALSE(op2_done);
+ EXPECT_FALSE(op3_ran);
- // Running the loop we expect op1 to have run, but not completed.
+ // Running the loop we expect all ops to have run. TestOperations won't have
+ // completed because they require us to call SayDone(). The fit::promise,
+ // however, will have completed (it doesn't suspend).
RunLoopUntilIdle();
EXPECT_TRUE(op1_ran);
EXPECT_FALSE(op1_done);
EXPECT_TRUE(op2_ran);
EXPECT_FALSE(op2_done);
+ EXPECT_TRUE(op2_ran);
}
class TestOperationNotNullPtr : public Operation<> {