// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <lib/fasync/bridge.h>
#include <lib/fasync/sequencer.h>
#include <lib/fasync/single_threaded_executor.h>
#include <unistd.h>
#include <iostream>
#include <string>
#include <thread>
#include <zxtest/zxtest.h>
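
// Replace the standard assert() with a version that logs the failing
// expression and its line number to std::cout before aborting.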
#undef assert
#define assert(expr) \
  do { \
    if (!(expr)) { \
      std::cout << "Assert " << #expr << " failed at:" << __LINE__ << std::endl; \
      abort(); \
    } \
  } while (false)

namespace {
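
// fasync::sequencer imposes a first-in-first-out order on the futures wrapped
// with fasync::wrap_with(seq): each wrapped future is polled only after the
// previously wrapped future has completed (or been abandoned).  Continuations
// chained after the wrap are not sequenced and may interleave with other work.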
TEST(SequencerTests, sequencing_tasks) {
  fasync::sequencer seq;
  std::string str;

  // This future writes ":a" sequentially then writes ":a2" later.
  auto a = fasync::make_future([&] { str += ":a"; }) | fasync::wrap_with(seq) |
           fasync::then([&] { str += ":a2"; });

  // This future writes ":b" sequentially then writes ":b2" and ":b3" later.
  // Also schedules another sequential task that writes ":e".
  auto b = fasync::make_future([&](fasync::context& context) {
             str += ":b";
             context.executor().schedule(fasync::make_future([&] { str += ":e"; }) |
                                         fasync::wrap_with(seq));
           }) |
           fasync::wrap_with(seq) |
           fasync::then(
               [&, count = 0](fasync::context& context) mutable -> fasync::try_poll<fitx::failed> {
                 if (++count == 5) {
                   str += ":b3";
                   return fasync::ready(fitx::failed());
                 }
                 str += ":b2";
                 context.suspend_task().resume();  // immediately resume
                 return fasync::pending();
               });
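
  // The handler above returns fasync::try_poll<fitx::failed>, so the composed
  // future is a try future; check that at compile time.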
  static_assert(fasync::is_try_future_v<decltype(b)>);

  // This future writes ":c" sequentially then abandons itself.
  auto c = fasync::make_future([&](fasync::context& context) {
             str += ":c";
             context.suspend_task();  // abandon result
             return fasync::pending();
           }) |
           fasync::wrap_with(seq) | fasync::then([&] { str += ":c2"; });
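  // The suspended_task handle is dropped immediately, so the task is abandoned
  // and the ":c2" continuation never runs.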

  // This future writes ":d" sequentially.
  auto d = fasync::make_future([&] { str += ":d"; }) | fasync::wrap_with(seq);

  // These futures just write ":z1" and ":z2" whenever they happen to run.
  auto z1 = fasync::make_future([&] { str += ":z1"; });
  auto z2 = fasync::make_future([&] { str += ":z2"; });

  // Schedule the futures in an order which intentionally does not match the sequencing.
  fasync::single_threaded_executor executor;
  executor.schedule(std::move(z1));
  executor.schedule(std::move(b));
  executor.schedule(std::move(c));
  executor.schedule(std::move(a));
  executor.schedule(std::move(d));
  executor.schedule(std::move(z2));
  executor.run();

  // Check the execution order: the wrapped segments (a, b, c, d, e) run in the
  // order they were wrapped, while the unsequenced continuations interleave.
  EXPECT_STREQ(":z1:a:a2:z2:b:b2:c:b2:d:b2:e:b2:b3", str.c_str());
}
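
// Exercise wrapping futures with the sequencer and scheduling them on the
// executor from several threads concurrently.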
TEST(SequencerTests, thread_safety) {
  fasync::sequencer seq;
  fasync::single_threaded_executor executor;
  uint64_t run_count = 0;

  // Schedule work from a few threads, just to show that we can.
  constexpr int num_threads = 4;
  constexpr int num_tasks_per_thread = 100;
  std::thread threads[num_threads];
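
  // Each thread is paired with a bridge: its consumer future is scheduled on
  // the executor so that run() below keeps going until the thread has finished
  // scheduling its tasks and calls complete_ok().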
  for (int i = 0; i < num_threads; i++) {
    fasync::bridge<fitx::failed> bridge;
    threads[i] = std::thread([&, completer = std::move(bridge.completer)]() mutable {
      for (int j = 0; j < num_tasks_per_thread; j++) {
        executor.schedule(fasync::make_future([&] { run_count++; }) | fasync::wrap_with(seq));
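        // Brief sleep, presumably to let scheduling from the threads interleave
        // with the executor running on the main thread.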
        usleep(1);
      }
      completer.complete_ok();
    });
    executor.schedule(bridge.consumer.future());
  }

  // Run the tasks.
  executor.run();
  for (int i = 0; i < num_threads; i++) {
    threads[i].join();
  }

  // We expect all tasks to have run.
  EXPECT_EQ(num_threads * num_tasks_per_thread, run_count);
}

}  // namespace