blob: a243534f1c2bc26c52cf5e941529c034cce9f79d [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fit/defer.h>
#include <lib/fpromise/bridge.h>
#include <lib/fpromise/scope.h>
#include <lib/fpromise/single_threaded_executor.h>
#include <unistd.h>
#include <thread>
#include <zxtest/zxtest.h>
#include "unittest_utils.h"
namespace {
class fake_context : public fpromise::context {
public:
fpromise::executor* executor() const override { ASSERT_CRITICAL(false); }
fpromise::suspended_task suspend_task() override { ASSERT_CRITICAL(false); }
};
// Asynchronously accumulates a sum.
// This is an example of an object that offers promises that captures
// the "this" pointer, thereby needing a scope to prevent dangling pointers
// in case it is destroyed before the promises complete.
class accumulator {
public:
// Adds a value to the counter then returns it.
// Takes time proportional to the value being added.
fpromise::promise<uint32_t> add(uint32_t value) {
return fpromise::make_promise(
[this,
cycles = value](fpromise::context& context) mutable -> fpromise::result<uint32_t> {
if (cycles == 0)
return fpromise::ok(counter_);
counter_++;
cycles--;
context.suspend_task().resume_task();
return fpromise::pending();
})
.wrap_with(scope_);
}
// Gets the current count, immediately.
uint32_t count() const { return counter_; }
private:
fpromise::scope scope_;
uint32_t counter_ = 0;
};
TEST(ScopeTests, scoping_tasks) {
auto acc = std::make_unique<accumulator>();
fpromise::single_threaded_executor executor;
uint32_t sums[4] = {};
// Schedule some tasks which accumulate values asynchronously.
executor.schedule_task(acc->add(2).and_then([&](const uint32_t& value) { sums[0] = value; }));
executor.schedule_task(acc->add(1).and_then([&](const uint32_t& value) { sums[1] = value; }));
executor.schedule_task(acc->add(5).and_then([&](const uint32_t& value) { sums[2] = value; }));
// Schedule a task which accumulates and then destroys the accumulator
// so that the scope is exited. Any remaining promises will be aborted.
uint32_t last_count = 0;
executor.schedule_task(acc->add(3).and_then([&](const uint32_t& value) {
sums[3] = value;
// Schedule destruction in another task to avoid re-entrance.
executor.schedule_task(fpromise::make_promise([&] {
last_count = acc->count();
acc.reset();
}));
}));
// Run the tasks.
executor.run();
// The counts reflect the fact that the scope is exited part-way through
// the cycle. For example, the sums[2] task doesn't get to run since
// it only runs after 5 cycles and the scope is exited on the third.
EXPECT_EQ(11, last_count);
EXPECT_EQ(7, sums[0]);
EXPECT_EQ(5, sums[1]);
EXPECT_EQ(0, sums[2]);
EXPECT_EQ(10, sums[3]);
}
TEST(ScopeTests, exit_destroys_wrapped_promises) {
fpromise::scope scope;
EXPECT_FALSE(scope.exited());
// Set up three wrapped promises.
bool destroyed[4] = {};
auto p0 = scope.wrap(fpromise::make_promise(
[d = fit::defer([&] { destroyed[0] = true; })] { return fpromise::ok(); }));
auto p1 = scope.wrap(fpromise::make_promise(
[d = fit::defer([&] { destroyed[1] = true; })] { return fpromise::ok(); }));
auto p2 = scope.wrap(fpromise::make_promise(
[d = fit::defer([&] { destroyed[2] = true; })] { return fpromise::ok(); }));
EXPECT_FALSE(destroyed[0]);
EXPECT_FALSE(destroyed[1]);
EXPECT_FALSE(destroyed[2]);
// Execute one of them to completion, causing it to be destroyed.
EXPECT_TRUE(fpromise::run_single_threaded(std::move(p1)).is_ok());
EXPECT_FALSE(destroyed[0]);
EXPECT_TRUE(destroyed[1]);
EXPECT_FALSE(destroyed[2]);
// Exit the scope, causing the wrapped promise to be destroyed
// while still leaving the wrapper alive (but aborted).
scope.exit();
EXPECT_TRUE(scope.exited());
EXPECT_TRUE(destroyed[0]);
EXPECT_TRUE(destroyed[1]);
EXPECT_TRUE(destroyed[2]);
// Wrapping another promise causes the wrapped promise to be immediately
// destroyed.
auto p3 = scope.wrap(fpromise::make_promise(
[d = fit::defer([&] { destroyed[3] = true; })] { return fpromise::ok(); }));
EXPECT_TRUE(destroyed[3]);
// Executing the wrapped promises returns pending.
EXPECT_TRUE(fpromise::run_single_threaded(std::move(p0)).is_pending());
EXPECT_TRUE(fpromise::run_single_threaded(std::move(p2)).is_pending());
EXPECT_TRUE(fpromise::run_single_threaded(std::move(p3)).is_pending());
// Exiting again has no effect.
scope.exit();
EXPECT_TRUE(scope.exited());
}
TEST(ScopeTests, double_wrap) {
fpromise::scope scope;
fake_context context;
// Here we wrap a task that's already been wrapped to see what happens
// when the scope is exited. This is interesting because it means that
// the destruction of one wrapped promise will cause the destruction of
// another wrapped promise and could uncover re-entrance issues.
uint32_t run_count = 0;
bool destroyed = false;
auto promise = fpromise::make_promise(
[&, d = fit::defer([&] { destroyed = true; })](fpromise::context& context) {
run_count++;
return fpromise::pending();
})
.wrap_with(scope)
.wrap_with(scope); // wrap again!
// Run the promise once to show that we can.
EXPECT_EQ(fpromise::result_state::pending, promise(context).state());
EXPECT_EQ(1, run_count);
EXPECT_FALSE(destroyed);
// Now exit the scope, which should cause the promise to be destroyed.
scope.exit();
EXPECT_EQ(1, run_count);
EXPECT_TRUE(destroyed);
// Running the promise again should do nothing.
EXPECT_EQ(fpromise::result_state::pending, promise(context).state());
EXPECT_EQ(1, run_count);
EXPECT_TRUE(destroyed);
}
TEST(ScopeTests, thread_safety) {
fpromise::scope scope;
fpromise::single_threaded_executor executor;
uint64_t run_count = 0;
// Schedule work from a few threads, just to show that we can.
// Part way through, exit the scope.
constexpr int num_threads = 4;
constexpr int num_tasks_per_thread = 100;
constexpr int exit_threshold = 75;
std::thread threads[num_threads];
for (int i = 0; i < num_threads; i++) {
fpromise::bridge bridge;
threads[i] = std::thread([&, completer = std::move(bridge.completer)]() mutable {
for (int j = 0; j < num_tasks_per_thread; j++) {
if (j == exit_threshold) {
executor.schedule_task(fpromise::make_promise([&] { scope.exit(); }));
}
executor.schedule_task(fpromise::make_promise([&] { run_count++; }).wrap_with(scope));
}
completer.complete_ok();
});
executor.schedule_task(bridge.consumer.promise());
}
// Run the tasks.
executor.run();
for (int i = 0; i < num_threads; i++)
threads[i].join();
// We expect some non-deterministic number of tasks to have run
// related to the exit threshold.
// We scheduled num_threads * num_tasks_per_thread tasks, but on each thread
// we exited the (common) scope after scheduling its first exit_threshold
// tasks. Once one of those threads exits the scope, no more tasks
// (scheduled by any thread) will run within the scope, so the number of
// executed tasks cannot increase any further. Therefore we know that at
// least exit_threshold tasks have run but we could have run as many as
// num_threads * exit_threshold in a perfect world where all of the threads
// called scope.exit() at the same time.
EXPECT_GE(run_count, exit_threshold);
EXPECT_LE(run_count, num_threads * exit_threshold);
}
} // namespace