blob: 1fe04e832556e85a7b54f68df7458fe52b59c83d [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fpromise/bridge.h>
#include <lib/fpromise/sequencer.h>
#include <lib/fpromise/single_threaded_executor.h>
#include <unistd.h>
#include <string>
#include <thread>
#include <zxtest/zxtest.h>
namespace {
TEST(SequencerTests, sequencing_tasks) {
fpromise::sequencer seq;
std::string str;
// This promise writes ":a" sequentially then writes ":a2" later.
auto a = fpromise::make_promise([&] { str += ":a"; })
.wrap_with(seq)
.then([&](const fpromise::result<>&) { str += ":a2"; });
// This promise writes ":b" sequentially then writes ":b2" and ":b3" later.
// Also schedules another sequential task that writes ":e".
auto b = fpromise::make_promise([&](fpromise::context& context) {
str += ":b";
context.executor()->schedule_task(
fpromise::make_promise([&] { str += ":e"; }).wrap_with(seq));
})
.wrap_with(seq)
.then([&, count = 0](fpromise::context& context,
const fpromise::result<>&) mutable -> fpromise::result<> {
if (++count == 5) {
str += ":b3";
return fpromise::error();
}
str += ":b2";
context.suspend_task().resume_task(); // immediately resume
return fpromise::pending();
});
// This promise writes ":c" sequentially then abandons itself.
auto c = fpromise::make_promise([&](fpromise::context& context) {
str += ":c";
context.suspend_task(); // abandon result
return fpromise::pending();
})
.wrap_with(seq)
.then([&](const fpromise::result<>&) { str += ":c2"; });
// This promise writes ":d" sequentially.
auto d = fpromise::make_promise([&] { str += ":d"; }).wrap_with(seq);
// These promises just write ":z1" and ":z2" whenever they happen to run.
auto z1 = fpromise::make_promise([&] { str += ":z1"; });
auto z2 = fpromise::make_promise([&] { str += ":z2"; });
// Schedule the promises in an order which intentionally does not
// match the sequencing.
fpromise::single_threaded_executor executor;
executor.schedule_task(std::move(z1));
executor.schedule_task(std::move(b));
executor.schedule_task(std::move(c));
executor.schedule_task(std::move(a));
executor.schedule_task(std::move(d));
executor.schedule_task(std::move(z2));
executor.run();
// Evaluate the promises and check the execution order.
EXPECT_STREQ(":z1:a:a2:z2:b:b2:c:b2:d:b2:e:b2:b3", str.c_str());
}
TEST(SequencerTests, thread_safety) {
fpromise::sequencer seq;
fpromise::single_threaded_executor executor;
uint64_t run_count = 0;
// Schedule work from a few threads, just to show that we can.
constexpr int num_threads = 4;
constexpr int num_tasks_per_thread = 100;
std::thread threads[num_threads];
for (int i = 0; i < num_threads; i++) {
fpromise::bridge bridge;
threads[i] = std::thread([&, completer = std::move(bridge.completer)]() mutable {
for (int j = 0; j < num_tasks_per_thread; j++) {
executor.schedule_task(fpromise::make_promise([&] { run_count++; }).wrap_with(seq));
usleep(1);
}
completer.complete_ok();
});
executor.schedule_task(bridge.consumer.promise());
}
// Run the tasks.
executor.run();
for (int i = 0; i < num_threads; i++)
threads[i].join();
// We expect all tasks to have run.
EXPECT_EQ(num_threads * num_tasks_per_thread, run_count);
}
} // namespace