| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "lib/async_promise/executor.h" |
| |
| #include <future> |
| |
| #include <lib/async-loop/cpp/loop.h> |
| #include <lib/fit/defer.h> |
| #include <unittest/unittest.h> |
| |
| namespace { |
| |
| bool running_tasks() { |
| BEGIN_TEST; |
| |
| async::Loop loop(&kAsyncLoopConfigNoAttachToThread); |
| async::Executor executor(loop.dispatcher()); |
| uint64_t run_count[3] = {}; |
| |
| // Schedule a task that runs once and increments a counter. |
| executor.schedule_task(fit::make_promise([&] { run_count[0]++; })); |
| |
| // Schedule a task that runs once, increments a counter, |
| // and scheduled another task. |
| executor.schedule_task(fit::make_promise([&](fit::context& context) { |
| run_count[1]++; |
| assert(context.executor() == &executor); |
| context.executor()->schedule_task(fit::make_promise([&] { run_count[2]++; })); |
| })); |
| EXPECT_EQ(0, run_count[0]); |
| EXPECT_EQ(0, run_count[1]); |
| EXPECT_EQ(0, run_count[2]); |
| |
| // We expect that all of the tasks will run to completion including newly |
| // scheduled tasks. |
| loop.RunUntilIdle(); |
| EXPECT_EQ(1, run_count[0]); |
| EXPECT_EQ(1, run_count[1]); |
| EXPECT_EQ(1, run_count[2]); |
| |
| END_TEST; |
| } |
| |
| bool suspending_and_resuming_tasks() { |
| BEGIN_TEST; |
| |
| async::Loop loop(&kAsyncLoopConfigNoAttachToThread); |
| async::Executor executor(loop.dispatcher()); |
| |
| uint64_t run_count[5] = {}; |
| uint64_t resume_count[5] = {}; |
| uint64_t resume_count4b = 0; |
| |
| // Schedule a task that suspends itself and immediately resumes. |
| executor.schedule_task(fit::make_promise([&](fit::context& context) |
| -> fit::result<> { |
| if (++run_count[0] == 100) |
| return fit::ok(); |
| resume_count[0]++; |
| context.suspend_task().resume_task(); |
| return fit::pending(); |
| })); |
| |
| // Schedule a task that requires several iterations to complete, each |
| // time scheduling another task to resume itself after suspension. |
| executor.schedule_task(fit::make_promise([&](fit::context& context) |
| -> fit::result<> { |
| if (++run_count[1] == 100) |
| return fit::ok(); |
| context.executor()->schedule_task( |
| fit::make_promise([&, s = context.suspend_task()]() mutable { |
| resume_count[1]++; |
| s.resume_task(); |
| })); |
| return fit::pending(); |
| })); |
| |
| // Same as the above but use another thread to resume. |
| executor.schedule_task(fit::make_promise([&](fit::context& context) |
| -> fit::result<> { |
| if (++run_count[2] == 100) |
| return fit::ok(); |
| std::async(std::launch::async, [&, s = context.suspend_task()]() mutable { |
| resume_count[2]++; |
| s.resume_task(); |
| }); |
| return fit::pending(); |
| })); |
| |
| // Schedule a task that suspends itself but doesn't actually return pending |
| // so it only runs once. |
| executor.schedule_task(fit::make_promise([&](fit::context& context) |
| -> fit::result<> { |
| run_count[3]++; |
| context.suspend_task(); |
| return fit::ok(); |
| })); |
| |
| // Schedule a task that suspends itself and arranges to be resumed on |
| // one of two other threads, whichever gets there first. |
| executor.schedule_task(fit::make_promise([&](fit::context& context) |
| -> fit::result<> { |
| if (++run_count[4] == 100) |
| return fit::ok(); |
| std::async(std::launch::async, [&, s = context.suspend_task()]() mutable { |
| resume_count[4]++; |
| s.resume_task(); |
| }); |
| std::async(std::launch::async, [&, s = context.suspend_task()]() mutable { |
| resume_count4b++; // use a different variable to avoid data races |
| s.resume_task(); |
| }); |
| return fit::pending(); |
| })); |
| |
| // We expect the tasks to have been completed after being resumed several times. |
| loop.RunUntilIdle(); |
| EXPECT_EQ(100, run_count[0]); |
| EXPECT_EQ(99, resume_count[0]); |
| EXPECT_EQ(100, run_count[1]); |
| EXPECT_EQ(99, resume_count[1]); |
| EXPECT_EQ(100, run_count[2]); |
| EXPECT_EQ(99, resume_count[2]); |
| EXPECT_EQ(1, run_count[3]); |
| EXPECT_EQ(0, resume_count[3]); |
| EXPECT_EQ(100, run_count[4]); |
| EXPECT_EQ(99, resume_count[4]); |
| EXPECT_EQ(99, resume_count4b); |
| |
| END_TEST; |
| } |
| |
| bool abandoning_tasks() { |
| BEGIN_TEST; |
| |
| async::Loop loop(&kAsyncLoopConfigNoAttachToThread); |
| async::Executor executor(loop.dispatcher()); |
| uint64_t run_count[4] = {}; |
| uint64_t destruction[4] = {}; |
| |
| // Schedule a task that returns pending without suspending itself |
| // so it is immediately abandoned. |
| executor.schedule_task(fit::make_promise( |
| [&, d = fit::defer([&] { destruction[0]++; })]() -> fit::result<> { |
| run_count[0]++; |
| return fit::pending(); |
| })); |
| |
| // Schedule a task that suspends itself but drops the |suspended_task| |
| // object before returning so it is immediately abandoned. |
| executor.schedule_task(fit::make_promise( |
| [&, d = fit::defer([&] { destruction[1]++; })](fit::context& context) |
| -> fit::result<> { |
| run_count[1]++; |
| context.suspend_task(); // ignore result |
| return fit::pending(); |
| })); |
| |
| // Schedule a task that suspends itself and drops the |suspended_task| |
| // object from a different thread so it is abandoned concurrently. |
| executor.schedule_task(fit::make_promise( |
| [&, d = fit::defer([&] { destruction[2]++; })](fit::context& context) |
| -> fit::result<> { |
| run_count[2]++; |
| std::async(std::launch::async, [s = context.suspend_task()] {}); |
| return fit::pending(); |
| })); |
| |
| // Schedule a task that creates several suspended task handles and drops |
| // them all on the floor. |
| executor.schedule_task(fit::make_promise( |
| [&, d = fit::defer([&] { destruction[3]++; })](fit::context& context) |
| -> fit::result<> { |
| run_count[3]++; |
| fit::suspended_task s[3]; |
| for (size_t i = 0; i < 3; i++) |
| s[i] = context.suspend_task(); |
| return fit::pending(); |
| })); |
| |
| // We expect the tasks to have been executed but to have been abandoned. |
| loop.RunUntilIdle(); |
| EXPECT_EQ(1, run_count[0]); |
| EXPECT_EQ(1, destruction[0]); |
| EXPECT_EQ(1, run_count[1]); |
| EXPECT_EQ(1, destruction[1]); |
| EXPECT_EQ(1, run_count[2]); |
| EXPECT_EQ(1, destruction[2]); |
| EXPECT_EQ(1, run_count[3]); |
| EXPECT_EQ(1, destruction[3]); |
| |
| END_TEST; |
| } |
| |
| bool dispatcher_property() { |
| BEGIN_TEST; |
| |
| async::Loop loop(&kAsyncLoopConfigNoAttachToThread); |
| async::Executor executor(loop.dispatcher()); |
| EXPECT_EQ(loop.dispatcher(), executor.dispatcher()); |
| |
| // Just check that the task receives a context that exposes the dispatcher |
| // property. |
| async_dispatcher_t* received_dispatcher = nullptr; |
| executor.schedule_task(fit::make_promise([&](fit::context& context) { |
| received_dispatcher = context.as<async::Context>().dispatcher(); |
| })); |
| EXPECT_NULL(received_dispatcher); |
| |
| // We expect that all of the tasks will run to completion. |
| loop.RunUntilIdle(); |
| EXPECT_EQ(loop.dispatcher(), received_dispatcher); |
| |
| END_TEST; |
| } |
| |
| bool tasks_scheduled_after_loop_shutdown_are_immediately_destroyed() { |
| BEGIN_TEST; |
| |
| async::Loop loop(&kAsyncLoopConfigNoAttachToThread); |
| async::Executor executor(loop.dispatcher()); |
| |
| // Shutdown the loop then schedule a task. |
| // The task should be immediately destroyed. |
| loop.Shutdown(); |
| bool was_destroyed = false; |
| executor.schedule_task(fit::make_promise( |
| [d = fit::defer([&] { was_destroyed = true; })] {})); |
| EXPECT_TRUE(was_destroyed); |
| |
| END_TEST; |
| } |
| |
| bool when_loop_is_shutdown_all_remaining_tasks_are_immediately_destroyed() { |
| BEGIN_TEST; |
| |
| async::Loop loop(&kAsyncLoopConfigNoAttachToThread); |
| async::Executor executor(loop.dispatcher()); |
| |
| // Schedule a task and let it be suspended. |
| fit::suspended_task suspend; |
| bool was_destroyed[2] = {}; |
| executor.schedule_task(fit::make_promise( |
| [&, d = fit::defer([&] { was_destroyed[0] = true; })](fit::context& context) { |
| suspend = context.suspend_task(); |
| return fit::pending(); |
| })); |
| loop.RunUntilIdle(); |
| EXPECT_TRUE(suspend); |
| EXPECT_FALSE(was_destroyed[0]); |
| |
| // Schedule another task that never gets a chance to run. |
| executor.schedule_task(fit::make_promise( |
| [d = fit::defer([&] { was_destroyed[1] = true; })] {})); |
| EXPECT_FALSE(was_destroyed[1]); |
| |
| // Shutdown the loop and ensure that everything was destroyed, including |
| // the task that remained suspended. |
| loop.Shutdown(); |
| EXPECT_TRUE(was_destroyed[0]); |
| EXPECT_TRUE(was_destroyed[1]); |
| |
| END_TEST; |
| } |
| |
| } // namespace |
| |
| BEGIN_TEST_CASE(executor_tests) |
| RUN_TEST(running_tasks) |
| RUN_TEST(suspending_and_resuming_tasks) |
| RUN_TEST(abandoning_tasks) |
| RUN_TEST(dispatcher_property) |
| RUN_TEST(tasks_scheduled_after_loop_shutdown_are_immediately_destroyed) |
| RUN_TEST(when_loop_is_shutdown_all_remaining_tasks_are_immediately_destroyed) |
| END_TEST_CASE(executor_tests) |