blob: df748b092d349b472198dc8083924755374c3b26 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fasync/bridge.h>
#include <lib/fasync/future.h>
#include <lib/fasync/scope.h>
#include <lib/fasync/single_threaded_executor.h>
#include <lib/fit/defer.h>
#include <unistd.h>
#include <thread>
#include <zxtest/zxtest.h>
#include "test_utils.h"
namespace {
// Asynchronously accumulates a sum.
// This is an example of an object that offers futures that captures the "this" pointer, thereby
// needing a scope to prevent dangling pointers in case it is destroyed before the futures complete.
class accumulator {
public:
// Adds a value to the counter then returns it.
// Takes time proportional to the value being added.
fasync::future<uint32_t> add(uint32_t value) {
return fasync::make_future(
[this, cycles = value](fasync::context& context) mutable -> fasync::poll<uint32_t> {
if (cycles == 0) {
return fasync::ready(counter_);
}
counter_++;
cycles--;
context.suspend_task().resume();
return fasync::pending();
}) |
fasync::wrap_with(scope_);
}
// Gets the current count, immediately.
uint32_t count() const { return counter_; }
private:
fasync::scope scope_;
uint32_t counter_ = 0;
};
TEST(ScopeTests, scoping_tasks) {
auto acc = std::make_unique<accumulator>();
fasync::single_threaded_executor executor;
uint32_t sums[4] = {};
// Schedule some tasks which accumulate values asynchronously.
executor.schedule(acc->add(2) | fasync::then([&](const uint32_t& value) { sums[0] = value; }));
executor.schedule(acc->add(1) | fasync::then([&](const uint32_t& value) { sums[1] = value; }));
executor.schedule(acc->add(5) | fasync::then([&](const uint32_t& value) { sums[2] = value; }));
// Schedule a task which accumulates and then destroys the accumulator so that the scope is
// exited. Any remaining futures will be aborted.
uint32_t last_count = 0;
executor.schedule(acc->add(3) | fasync::then([&](const uint32_t& value) {
sums[3] = value;
// Schedule destruction in another task to avoid re-entrance.
executor.schedule(fasync::make_future([&] {
last_count = acc->count();
acc.reset();
}));
}));
// Run the tasks.
executor.run();
// The counts reflect the fact that the scope is exited part-way through the cycle. For example,
// the sums[2] task doesn't get to run since it only runs after 5 cycles and the scope is exited
// on the third.
EXPECT_EQ(11, last_count);
EXPECT_EQ(7, sums[0]);
EXPECT_EQ(5, sums[1]);
EXPECT_EQ(0, sums[2]);
EXPECT_EQ(10, sums[3]);
}
TEST(ScopeTests, exit_destroys_wrapped_futures) {
fasync::scope scope;
EXPECT_FALSE(scope.exited());
// Set up three wrapped futures.
bool destroyed[4] = {};
auto p0 =
scope.wrap(fasync::make_future([d = fit::defer([&] {
destroyed[0] = true;
})]() -> fitx::result<fitx::failed> { return fitx::ok(); }));
auto p1 =
scope.wrap(fasync::make_future([d = fit::defer([&] {
destroyed[1] = true;
})]() -> fitx::result<fitx::failed> { return fitx::ok(); }));
auto p2 =
scope.wrap(fasync::make_future([d = fit::defer([&] {
destroyed[2] = true;
})]() -> fitx::result<fitx::failed> { return fitx::ok(); }));
EXPECT_FALSE(destroyed[0]);
EXPECT_FALSE(destroyed[1]);
EXPECT_FALSE(destroyed[2]);
// Execute one of them to completion, causing it to be destroyed.
EXPECT_TRUE(fasync::block(std::move(p1)).value().is_ok());
EXPECT_FALSE(destroyed[0]);
EXPECT_TRUE(destroyed[1]);
EXPECT_FALSE(destroyed[2]);
// Exit the scope, causing the wrapped future to be destroyed while still leaving the wrapper
// alive (but aborted).
scope.exit();
EXPECT_TRUE(scope.exited());
EXPECT_TRUE(destroyed[0]);
EXPECT_TRUE(destroyed[1]);
EXPECT_TRUE(destroyed[2]);
// Wrapping another future causes the wrapped future to be immediately destroyed.
auto p3 =
scope.wrap(fasync::make_future([d = fit::defer([&] {
destroyed[3] = true;
})]() -> fitx::result<fitx::failed> { return fitx::ok(); }));
EXPECT_TRUE(destroyed[3]);
// Executing the wrapped futures returns pending.
EXPECT_TRUE(fasync::testing::poll(std::move(p0)).is_pending());
EXPECT_TRUE(fasync::testing::poll(std::move(p2)).is_pending());
EXPECT_TRUE(fasync::testing::poll(std::move(p3)).is_pending());
// Exiting again has no effect.
scope.exit();
EXPECT_TRUE(scope.exited());
}
TEST(ScopeTests, double_wrap) {
fasync::scope scope;
// Here we wrap a task that's already been wrapped to see what happens when the scope is exited.
// This is interesting because it means that the destruction of one wrapped future will cause the
// destruction of another wrapped future and could uncover re-entrance issues.
uint32_t run_count = 0;
bool destroyed = false;
auto future =
fasync::make_future([&, d = fit::defer([&] { destroyed = true; })](fasync::context& context) {
run_count++;
return fasync::pending();
}) |
fasync::wrap_with(scope) | fasync::wrap_with(scope); // wrap again!
// Run the future once to show that we can.
EXPECT_TRUE(fasync::testing::poll(future).is_pending());
EXPECT_EQ(1, run_count);
EXPECT_FALSE(destroyed);
// Now exit the scope, which should cause the future to be destroyed.
scope.exit();
EXPECT_EQ(1, run_count);
EXPECT_TRUE(destroyed);
// Running the future again should do nothing.
EXPECT_TRUE(fasync::testing::poll(std::move(future)).is_pending());
EXPECT_EQ(1, run_count);
EXPECT_TRUE(destroyed);
}
TEST(ScopeTests, thread_safety) {
fasync::scope scope;
fasync::single_threaded_executor executor;
uint64_t run_count = 0;
// Schedule work from a few threads, just to show that we can.
// Part way through, exit the scope.
constexpr int num_threads = 4;
constexpr int num_tasks_per_thread = 100;
constexpr int exit_threshold = 75;
std::thread threads[num_threads];
for (int i = 0; i < num_threads; i++) {
fasync::bridge<fitx::failed> bridge;
threads[i] = std::thread([&, completer = std::move(bridge.completer)]() mutable {
for (int j = 0; j < num_tasks_per_thread; j++) {
if (j == exit_threshold) {
executor.schedule(fasync::make_future([&] { scope.exit(); }));
}
executor.schedule(fasync::make_future([&] { run_count++; }) | fasync::wrap_with(scope));
}
completer.complete_ok();
});
executor.schedule(bridge.consumer.future());
}
// Run the tasks.
executor.run();
for (int i = 0; i < num_threads; i++) {
threads[i].join();
}
// We expect some non-deterministic number of tasks to have run related to the exit threshold.
// We scheduled num_threads * num_tasks_per_thread tasks, but on each thread we exited the
// (common) scope after scheduling its first exit_threshold tasks. Once one of those threads
// exits the scope, no more tasks (scheduled by any thread) will run within the scope, so the
// number of executed tasks cannot increase any further. Therefore we know that at least
// |exit_threshold| tasks have run but we could have run as many as |num_threads * exit_threshold|
// in a perfect world where all of the threads called |scope.exit()| at the same time.
EXPECT_GE(run_count, exit_threshold);
EXPECT_LE(run_count, num_threads * exit_threshold);
}
} // namespace