blob: 435cf243f6b9a32adf159f5b6d18bcf5487d06e2 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fit/defer.h>
#include <lib/fpromise/single_threaded_executor.h>
#include <thread>
#include <zxtest/zxtest.h>
#include "unittest_utils.h"
namespace {
TEST(SingleThreadedExecutorTests, running_tasks) {
fpromise::single_threaded_executor executor;
uint64_t run_count[3] = {};
// Schedule a task that runs once and increments a counter.
executor.schedule_task(fpromise::make_promise([&] { run_count[0]++; }));
// Schedule a task that runs once, increments a counter,
// and scheduled another task.
executor.schedule_task(fpromise::make_promise([&](fpromise::context& context) {
run_count[1]++;
ASSERT_CRITICAL(context.executor() == &executor);
context.executor()->schedule_task(fpromise::make_promise([&] { run_count[2]++; }));
}));
EXPECT_EQ(0, run_count[0]);
EXPECT_EQ(0, run_count[1]);
EXPECT_EQ(0, run_count[2]);
// We expect that all of the tasks will run to completion including newly
// scheduled tasks.
executor.run();
EXPECT_EQ(1, run_count[0]);
EXPECT_EQ(1, run_count[1]);
EXPECT_EQ(1, run_count[2]);
}
TEST(SingleThreadedExecutorTests, suspending_and_resuming_tasks) {
fpromise::single_threaded_executor executor;
uint64_t run_count[5] = {};
uint64_t resume_count[5] = {};
// Schedule a task that suspends itself and immediately resumes.
executor.schedule_task(
fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
if (++run_count[0] == 100)
return fpromise::ok();
resume_count[0]++;
context.suspend_task().resume_task();
return fpromise::pending();
}));
// Schedule a task that requires several iterations to complete, each
// time scheduling another task to resume itself after suspension.
executor.schedule_task(
fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
if (++run_count[1] == 100)
return fpromise::ok();
context.executor()->schedule_task(
fpromise::make_promise([&, s = context.suspend_task()]() mutable {
resume_count[1]++;
s.resume_task();
}));
return fpromise::pending();
}));
// Same as the above but use another thread to resume.
executor.schedule_task(
fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
if (++run_count[2] == 100)
return fpromise::ok();
std::thread([&, s = context.suspend_task()]() mutable {
resume_count[2]++;
s.resume_task();
}).detach();
return fpromise::pending();
}));
// Schedule a task that suspends itself but doesn't actually return pending
// so it only runs once.
executor.schedule_task(
fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
run_count[3]++;
context.suspend_task();
return fpromise::ok();
}));
// Schedule a task that suspends itself and arranges to be resumed on
// one of two other threads, whichever gets there first.
executor.schedule_task(
fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<> {
if (++run_count[4] == 100)
return fpromise::ok();
// Race two threads to resume the task. Either can win.
// This is safe because these threads don't capture references to
// local variables that might go out of scope when the test exits.
std::thread([s = context.suspend_task()]() mutable { s.resume_task(); }).detach();
std::thread([s = context.suspend_task()]() mutable { s.resume_task(); }).detach();
return fpromise::pending();
}));
// We expect the tasks to have been completed after being resumed several times.
executor.run();
EXPECT_EQ(100, run_count[0]);
EXPECT_EQ(99, resume_count[0]);
EXPECT_EQ(100, run_count[1]);
EXPECT_EQ(99, resume_count[1]);
EXPECT_EQ(100, run_count[2]);
EXPECT_EQ(99, resume_count[2]);
EXPECT_EQ(1, run_count[3]);
EXPECT_EQ(0, resume_count[3]);
EXPECT_EQ(100, run_count[4]);
}
// Test disabled due to flakiness. See fxbug.dev/8378.
TEST(SingleThreadedExecutorTests, DISABLED_abandoning_tasks) {
fpromise::single_threaded_executor executor;
uint64_t run_count[4] = {};
uint64_t destruction[4] = {};
// Schedule a task that returns pending without suspending itself
// so it is immediately abandoned.
executor.schedule_task(fpromise::make_promise(
[&, d = fit::defer([&] { destruction[0]++; })]() -> fpromise::result<> {
run_count[0]++;
return fpromise::pending();
}));
// Schedule a task that suspends itself but drops the |suspended_task|
// object before returning so it is immediately abandoned.
executor.schedule_task(
fpromise::make_promise([&, d = fit::defer([&] { destruction[1]++; })](
fpromise::context& context) -> fpromise::result<> {
run_count[1]++;
context.suspend_task(); // ignore result
return fpromise::pending();
}));
// Schedule a task that suspends itself and drops the |suspended_task|
// object from a different thread so it is abandoned concurrently.
executor.schedule_task(
fpromise::make_promise([&, d = fit::defer([&] { destruction[2]++; })](
fpromise::context& context) -> fpromise::result<> {
run_count[2]++;
std::thread([s = context.suspend_task()] {}).detach();
return fpromise::pending();
}));
// Schedule a task that creates several suspended task handles and drops
// them all on the floor.
executor.schedule_task(
fpromise::make_promise([&, d = fit::defer([&] { destruction[3]++; })](
fpromise::context& context) -> fpromise::result<> {
run_count[3]++;
fpromise::suspended_task s[3];
for (size_t i = 0; i < 3; i++)
s[i] = context.suspend_task();
return fpromise::pending();
}));
// We expect the tasks to have been executed but to have been abandoned.
executor.run();
EXPECT_EQ(1, run_count[0]);
EXPECT_EQ(1, destruction[0]);
EXPECT_EQ(1, run_count[1]);
EXPECT_EQ(1, destruction[1]);
EXPECT_EQ(1, run_count[2]);
EXPECT_EQ(1, destruction[2]);
EXPECT_EQ(1, run_count[3]);
EXPECT_EQ(1, destruction[3]);
}
TEST(SingleThreadedExecutorTests, run_single_threaded) {
uint64_t run_count = 0;
fpromise::result<int> result = fpromise::run_single_threaded(fpromise::make_promise([&]() {
run_count++;
return fpromise::ok(42);
}));
EXPECT_EQ(42, result.value());
EXPECT_EQ(1, run_count);
}
TEST(SingleThreadedExecutorTests, run_single_threaded_move_only_result) {
const int kGolden = 5;
size_t run_count = 0;
auto promise = fpromise::make_promise([&]() {
run_count++;
return fpromise::ok(std::make_unique<int>(kGolden));
});
fpromise::result<std::unique_ptr<int>> result = fpromise::run_single_threaded(std::move(promise));
EXPECT_EQ(kGolden, *result.value());
EXPECT_EQ(1, run_count);
}
} // namespace