blob: 6e3ce947ab04253d0b9ff3a5d108c889d09e6b21 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fasync/barrier.h>
#include <lib/fasync/sequencer.h>
#include <lib/fasync/single_threaded_executor.h>
#include <unistd.h>
#include <string>
#include <thread>
#include <zxtest/zxtest.h>
namespace {
// Futures wrapped with a barrier are expected to run to completion once scheduled, even though
// |barrier.sync()| is never called in this test.
TEST(BarrierTests, wrapping_tasks_no_sync) {
  bool done[3] = {};
  auto first = fasync::make_future([&] { done[0] = true; });
  auto second = fasync::make_future([&] { done[1] = true; });
  auto third = fasync::make_future([&] { done[2] = true; });

  // The flags are expected to still be clear before anything is scheduled.
  for (size_t i = 0; i < std::size(done); ++i) {
    EXPECT_FALSE(done[i]);
  }

  fasync::barrier barrier;
  fasync::single_threaded_executor executor;
  executor.schedule(std::move(first) | fasync::wrap_with(barrier));
  executor.schedule(std::move(second) | fasync::wrap_with(barrier));
  executor.schedule(std::move(third) | fasync::wrap_with(barrier));
  executor.run();

  // After the executor has run, every wrapped future should have set its flag.
  for (size_t i = 0; i < std::size(done); ++i) {
    EXPECT_TRUE(done[i]);
  }
}
// Syncing on a barrier should still allow the chained continuations to complete, even when no
// work has been wrapped with that barrier.
TEST(BarrierTests, sync_no_wrapped_tasks) {
  bool done[3] = {};
  auto first = [&] { done[0] = true; };
  auto second = [&] { done[1] = true; };
  auto third = [&] { done[2] = true; };

  // The continuations have not been invoked yet, so every flag is expected to be clear.
  for (size_t i = 0; i < std::size(done); ++i) {
    EXPECT_FALSE(done[i]);
  }

  fasync::barrier barrier;
  fasync::single_threaded_executor executor;
  // Each continuation is chained after its own sync point on the same barrier.
  executor.schedule(barrier.sync() | fasync::and_then(std::move(first)));
  executor.schedule(barrier.sync() | fasync::and_then(std::move(second)));
  executor.schedule(barrier.sync() | fasync::and_then(std::move(third)));
  executor.run();

  // After the executor has run, every continuation should have set its flag.
  for (size_t i = 0; i < std::size(done); ++i) {
    EXPECT_TRUE(done[i]);
  }
}
// Wrap several futures with the barrier, then sync on it.
// The sync continuation checks that all of the wrapped work finished before it ran.
TEST(BarrierTests, wrap_then_sync) {
  bool done[3] = {};
  auto first = fasync::make_future([&] { done[0] = true; });
  auto second = fasync::make_future([&] { done[1] = true; });
  auto third = fasync::make_future([&] { done[2] = true; });

  bool sync_ran = false;
  // Runs after the sync point: all wrapped work is expected to be finished by then.
  auto on_sync = [&] {
    for (bool flag : done) {
      EXPECT_TRUE(flag);
    }
    sync_ran = true;
  };

  // The flags are expected to still be clear before anything is scheduled.
  for (bool flag : done) {
    EXPECT_FALSE(flag);
  }

  fasync::barrier barrier;
  auto first_tracked = std::move(first) | fasync::wrap_with(barrier);
  auto second_tracked = std::move(second) | fasync::wrap_with(barrier);
  auto third_tracked = std::move(third) | fasync::wrap_with(barrier);

  // The "sync" task is deliberately scheduled first even though it is expected to execute last,
  // so that a passing result cannot be attributed to the executor's scheduling order alone.
  fasync::single_threaded_executor executor;
  executor.schedule(barrier.sync() | fasync::and_then(std::move(on_sync)));
  executor.schedule(std::move(first_tracked));
  executor.schedule(std::move(second_tracked));
  executor.schedule(std::move(third_tracked));
  executor.run();

  EXPECT_TRUE(sync_ran);
}
// Observe that the order of "barrier.wrap" does not re-order the wrapped futures, but merely
// provides ordering before the sync point.
TEST(BarrierTests, wrap_preserves_initial_order) {
  // Create three futures.
  //
  // They will be sequencer-wrapped in the order "a, b, c".
  // They will be barrier-wrapped in the order "c, b, a".
  //
  // Observe that by wrapping them, the sequence order is still preserved.
  //
  // The ordering checks inside the futures use EXPECT_* rather than assert(): assert() compiles
  // to nothing when NDEBUG is defined, which would leave the ordering unverified, and aborts the
  // process instead of reporting a test failure otherwise.
  bool array[3] = {};
  auto a = fasync::make_future([&] {
    array[0] = true;
    EXPECT_FALSE(array[1]);
    EXPECT_FALSE(array[2]);
  });
  auto b = fasync::make_future([&] {
    EXPECT_TRUE(array[0]);
    array[1] = true;
    EXPECT_FALSE(array[2]);
  });
  auto c = fasync::make_future([&] {
    EXPECT_TRUE(array[0]);
    EXPECT_TRUE(array[1]);
    array[2] = true;
  });

  bool sync_complete = false;
  // Runs after the sync point: all three futures are expected to have completed by then.
  auto sync = [&] {
    for (size_t i = 0; i < std::size(array); i++) {
      EXPECT_TRUE(array[i]);
    }
    sync_complete = true;
  };

  // The flags are expected to still be clear before anything is scheduled.
  for (size_t i = 0; i < std::size(array); i++) {
    EXPECT_FALSE(array[i]);
  }

  // Sequencer-wrap in the order a, b, c.
  fasync::sequencer seq;
  auto a_sequenced = std::move(a) | fasync::wrap_with(seq);
  auto b_sequenced = std::move(b) | fasync::wrap_with(seq);
  auto c_sequenced = std::move(c) | fasync::wrap_with(seq);

  // Barrier-wrap in the reverse order c, b, a.
  fasync::barrier barrier;
  auto c_tracked = std::move(c_sequenced) | fasync::wrap_with(barrier);
  auto b_tracked = std::move(b_sequenced) | fasync::wrap_with(barrier);
  auto a_tracked = std::move(a_sequenced) | fasync::wrap_with(barrier);

  // The sync task is scheduled ahead of the wrapped work.
  fasync::single_threaded_executor executor;
  executor.schedule(barrier.sync() | fasync::and_then(std::move(sync)));
  executor.schedule(std::move(a_tracked));
  executor.schedule(std::move(b_tracked));
  executor.schedule(std::move(c_tracked));
  executor.run();

  EXPECT_TRUE(sync_complete);
}
// Observe that futures chained after the "wrap" request do not block the sync.
TEST(BarrierTests, work_after_wrap_non_blocking) {
  bool work_complete = false;
  auto work = fasync::make_future([&] { work_complete = true; });

  bool sync_complete = false;
  // Runs after the sync point. EXPECT_TRUE is used instead of assert() so the check stays active
  // when NDEBUG is defined and a violation is reported as a test failure rather than an abort.
  auto sync = [&] {
    EXPECT_TRUE(work_complete);
    sync_complete = true;
  };

  fasync::barrier barrier;
  // Only |work| itself is wrapped by the barrier; the continuation below is chained afterwards.
  auto work_wrapped =
      barrier.wrap(std::move(work)) | fasync::then([&](fasync::context& context) -> fasync::poll<> {
        // If the full chain of execution after "work" was required to complete before sync, then
        // |sync_complete| will remain false forever, and this task will never be completed.
        if (!sync_complete) {
          // Immediately resume so this continuation is polled again until the sync has run.
          context.suspend_task().resume();
          return fasync::pending();
        }
        return fasync::ready();
      });

  fasync::single_threaded_executor executor;
  executor.schedule(std::move(work_wrapped));
  executor.schedule(barrier.sync() | fasync::and_then(std::move(sync)));
  executor.run();

  EXPECT_TRUE(work_complete);
  EXPECT_TRUE(sync_complete);
}
// Observe that back-to-back sync operations are still ordered, and cannot skip ahead of previously
// wrapped work.
TEST(BarrierTests, multiple_syncs_after_work_are_ordered) {
  bool work_complete = false;
  auto work = fasync::make_future([&] { work_complete = true; });

  // The ordering checks below use EXPECT_* instead of assert() so that they remain active when
  // NDEBUG is defined and are reported as test failures rather than aborting the process.
  bool syncs_complete[] = {false, false};
  // First sync: expected to run after the wrapped work but before the second sync.
  auto sync1 = [&] {
    EXPECT_TRUE(work_complete);
    EXPECT_FALSE(syncs_complete[1]);
    syncs_complete[0] = true;
  };
  // Second sync: expected to run after both the wrapped work and the first sync.
  auto sync2 = [&] {
    EXPECT_TRUE(work_complete);
    EXPECT_TRUE(syncs_complete[0]);
    syncs_complete[1] = true;
  };

  fasync::barrier barrier;
  auto work_wrapped = std::move(work) | fasync::wrap_with(barrier);

  // Both syncs are scheduled ahead of the wrapped work.
  fasync::single_threaded_executor executor;
  executor.schedule(barrier.sync() | fasync::and_then(std::move(sync1)));
  executor.schedule(barrier.sync() | fasync::and_then(std::move(sync2)));
  executor.schedule(std::move(work_wrapped));
  executor.run();

  EXPECT_TRUE(work_complete);
  EXPECT_TRUE(syncs_complete[0]);
  EXPECT_TRUE(syncs_complete[1]);
}
// Abandoning futures should still allow sync to complete.
TEST(BarrierTests, abandoned_futures_are_ordered_by_sync) {
  // This work is never scheduled, so its body must never run. EXPECT_TRUE(false) is used instead
  // of assert(false) so that an unexpected invocation is still detected when NDEBUG is defined,
  // and is reported as a test failure rather than an abort.
  auto work = fasync::make_future([&] { EXPECT_TRUE(false); });

  bool sync_complete = false;
  auto sync = [&] { sync_complete = true; };

  fasync::barrier barrier;
  fasync::single_threaded_executor executor;
  {
    auto work_wrapped = std::move(work) | fasync::wrap_with(barrier);
    executor.schedule(barrier.sync() | fasync::and_then(std::move(sync)));
    // |work_wrapped| is destroyed (abandoned) here.
  }
  executor.run();

  EXPECT_TRUE(sync_complete);
}
} // namespace