blob: 0b64cd6c8a65554f4f8d470ae700d8351b7425e3 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <lib/fpromise/bridge.h>
#include <lib/fpromise/promise.h>
#include <lib/fpromise/single_threaded_executor.h>
#include <future>
#include <string>
#include <thread>
#include <tuple>
#include <zxtest/zxtest.h>
#include "unittest_utils.h"
namespace {
void async_invoke_callback_no_args(uint64_t* run_count, fit::function<void()> callback) {
std::thread([run_count, callback = std::move(callback)]() mutable {
(*run_count)++;
callback();
}).detach();
}
void async_invoke_callback_one_arg(uint64_t* run_count, fit::function<void(std::string)> callback) {
std::thread([run_count, callback = std::move(callback)]() mutable {
(*run_count)++;
callback("Hippopotamus");
}).detach();
}
void async_invoke_callback_two_args(uint64_t* run_count,
fit::function<void(std::string, int)> callback) {
std::thread([run_count, callback = std::move(callback)]() mutable {
(*run_count)++;
callback("What do you get when you multiply six by nine?", 42);
}).detach();
}
TEST(BridgeTests, bridge_construction_and_assignment) {
// Create a new bridge.
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_TRUE(bridge.consumer);
// Can move-construct.
fpromise::bridge<int, const char*> bridge2(std::move(bridge));
EXPECT_TRUE(bridge2.completer);
EXPECT_TRUE(bridge2.consumer);
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer);
// Can move-assign.
bridge = std::move(bridge2);
EXPECT_TRUE(bridge.completer);
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge2.completer);
EXPECT_FALSE(bridge2.consumer);
// It still works.
bridge.completer.complete_error("Test");
EXPECT_FALSE(bridge.completer);
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Test", result.error());
}
TEST(BridgeTests, completer_construction_and_assignment) {
// Default constructed completer is empty.
fpromise::completer<int, const char*> completer;
EXPECT_FALSE(completer);
// Can move-construct from non-empty.
fpromise::bridge<int, const char*> bridge;
fpromise::completer<int, const char*> completer2(std::move(bridge.completer));
EXPECT_TRUE(completer2);
// Can move-assign from non-empty.
completer = std::move(completer2);
EXPECT_TRUE(completer);
EXPECT_FALSE(completer2);
// It still works.
completer.complete_error("Test");
EXPECT_FALSE(completer);
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Test", result.error());
// Can move-construct from empty.
fpromise::completer<int, const char*> completer3(std::move(completer2));
EXPECT_FALSE(completer3);
EXPECT_FALSE(completer2);
// Can move-assign from empty.
completer2 = std::move(completer3);
EXPECT_FALSE(completer2);
EXPECT_FALSE(completer3);
}
TEST(BridgeTests, completer_abandon) {
// abandon()
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.abandon();
EXPECT_FALSE(bridge.completer);
EXPECT_TRUE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise_or(fpromise::error("Abandoned")));
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Abandoned", result.error());
}
// completer is discarded
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer = fpromise::completer<int, const char*>();
EXPECT_FALSE(bridge.completer);
EXPECT_TRUE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise_or(fpromise::error("Abandoned")));
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Abandoned", result.error());
}
}
TEST(BridgeTests, completer_complete) {
// complete_ok()
{
fpromise::bridge<void, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_ok();
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<void, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::ok, result.state());
}
// complete_ok(value)
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_ok(42);
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_EQ(42, result.value());
}
// complete_error()
{
fpromise::bridge<int, void> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_error();
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<int, void> result = fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
}
// complete_error(error)
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_error("Test");
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Test", result.error());
}
// complete_or_abandon(fpromise::ok(...))
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_or_abandon(fpromise::ok(42));
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_EQ(42, result.value());
}
// complete_or_abandon(fpromise::error(...))
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_or_abandon(fpromise::error("Test"));
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Test", result.error());
}
// complete_or_abandon(fpromise::pending())
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
bridge.completer.complete_or_abandon(fpromise::pending());
EXPECT_FALSE(bridge.completer);
EXPECT_TRUE(bridge.consumer.was_abandoned());
fpromise::result<int, const char*> result =
fpromise::run_single_threaded(bridge.consumer.promise_or(fpromise::error("Abandoned")));
EXPECT_FALSE(bridge.consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Abandoned", result.error());
}
}
TEST(BridgeTests, completer_bind_no_arg_callback) {
// Use bind()
{
uint64_t run_count = 0;
fpromise::bridge<> bridge;
async_invoke_callback_no_args(&run_count, bridge.completer.bind());
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<> result = fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_EQ(1, run_count);
}
// Use bind_tuple()
{
uint64_t run_count = 0;
fpromise::bridge<std::tuple<>> bridge;
async_invoke_callback_no_args(&run_count, bridge.completer.bind_tuple());
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<std::tuple<>> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_EQ(1, run_count);
}
}
TEST(BridgeTests, completer_bind_one_arg_callback) {
// Use bind()
{
uint64_t run_count = 0;
fpromise::bridge<std::string> bridge;
async_invoke_callback_one_arg(&run_count, bridge.completer.bind());
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<std::string> result = fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_TRUE(result.value() == "Hippopotamus");
EXPECT_EQ(1, run_count);
}
// Use bind_tuple()
{
uint64_t run_count = 0;
fpromise::bridge<std::tuple<std::string>> bridge;
async_invoke_callback_one_arg(&run_count, bridge.completer.bind_tuple());
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<std::tuple<std::string>> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_TRUE(std::get<0>(result.value()) == "Hippopotamus");
EXPECT_EQ(1, run_count);
}
}
TEST(BridgeTests, completer_bind_two_arg_callback) {
// Use bind_tuple()
{
uint64_t run_count = 0;
fpromise::bridge<std::tuple<std::string, int>> bridge;
async_invoke_callback_two_args(&run_count, bridge.completer.bind_tuple());
EXPECT_FALSE(bridge.completer);
EXPECT_FALSE(bridge.consumer.was_abandoned());
fpromise::result<std::tuple<std::string, int>> result =
fpromise::run_single_threaded(bridge.consumer.promise());
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_TRUE(std::get<0>(result.value()) == "What do you get when you multiply six by nine?");
EXPECT_EQ(42, std::get<1>(result.value()));
EXPECT_EQ(1, run_count);
}
}
TEST(BridgeTests, consumer_construction_and_assignment) {
// Default constructed consumer is empty.
fpromise::consumer<int, const char*> consumer;
EXPECT_FALSE(consumer);
// Can move-construct from non-empty.
fpromise::bridge<int, const char*> bridge;
fpromise::consumer<int, const char*> consumer2(std::move(bridge.consumer));
EXPECT_TRUE(consumer2);
// Can move-assign from non-empty.
consumer = std::move(consumer2);
EXPECT_TRUE(consumer);
EXPECT_FALSE(consumer2);
// It still works.
bridge.completer.complete_error("Test");
EXPECT_FALSE(bridge.completer);
fpromise::result<int, const char*> result = fpromise::run_single_threaded(consumer.promise());
EXPECT_FALSE(consumer);
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Test", result.error());
// Can move-construct from empty.
fpromise::consumer<int, const char*> consumer3(std::move(consumer2));
EXPECT_FALSE(consumer3);
EXPECT_FALSE(consumer2);
// Can move-assign from empty.
consumer2 = std::move(consumer3);
EXPECT_FALSE(consumer2);
EXPECT_FALSE(consumer3);
}
TEST(BridgeTests, consumer_cancel) {
// cancel()
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
bridge.consumer.cancel();
EXPECT_FALSE(bridge.consumer);
EXPECT_TRUE(bridge.completer.was_canceled());
bridge.completer.complete_ok(42);
EXPECT_FALSE(bridge.completer);
}
// consumer is discarded()
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
bridge.consumer = fpromise::consumer<int, const char*>();
EXPECT_FALSE(bridge.consumer);
EXPECT_TRUE(bridge.completer.was_canceled());
bridge.completer.complete_ok(42);
EXPECT_FALSE(bridge.completer);
}
}
TEST(BridgeTests, consumer_promise) {
// promise() when completed
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
fpromise::promise<int, const char*> promise = bridge.consumer.promise();
EXPECT_FALSE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
bridge.completer.complete_ok(42);
EXPECT_FALSE(bridge.completer);
fpromise::result<int, const char*> result = fpromise::run_single_threaded(std::move(promise));
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_EQ(42, result.value());
}
// promise() when abandoned
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
fpromise::promise<int, const char*> promise = bridge.consumer.promise();
EXPECT_FALSE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
bridge.completer.abandon();
EXPECT_FALSE(bridge.completer);
fpromise::result<int, const char*> result = fpromise::run_single_threaded(std::move(promise));
EXPECT_EQ(fpromise::result_state::pending, result.state());
}
// promise_or() when completed
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
fpromise::promise<int, const char*> promise =
bridge.consumer.promise_or(fpromise::error("Abandoned"));
EXPECT_FALSE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
bridge.completer.complete_ok(42);
EXPECT_FALSE(bridge.completer);
fpromise::result<int, const char*> result = fpromise::run_single_threaded(std::move(promise));
EXPECT_EQ(fpromise::result_state::ok, result.state());
EXPECT_EQ(42, result.value());
}
// promise_or() when abandoned
{
fpromise::bridge<int, const char*> bridge;
EXPECT_TRUE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
fpromise::promise<int, const char*> promise =
bridge.consumer.promise_or(fpromise::error("Abandoned"));
EXPECT_FALSE(bridge.consumer);
EXPECT_FALSE(bridge.completer.was_canceled());
bridge.completer.abandon();
EXPECT_FALSE(bridge.completer);
fpromise::result<int, const char*> result = fpromise::run_single_threaded(std::move(promise));
EXPECT_EQ(fpromise::result_state::error, result.state());
EXPECT_STREQ("Abandoned", result.error());
}
}
TEST(BridgeTests, schedule_for_consumer) {
// Promise completes normally.
{
uint64_t run_count[2] = {};
fpromise::single_threaded_executor executor;
fpromise::consumer<int> consumer = fpromise::schedule_for_consumer(
&executor, fpromise::make_promise([&](fpromise::context& context) {
ASSERT_CRITICAL(context.executor() == &executor);
run_count[0]++;
return fpromise::ok(42);
}));
EXPECT_EQ(0, run_count[0]);
auto t = std::thread([&] { executor.run(); });
fpromise::run_single_threaded(consumer.promise().then(
[&](fpromise::context& context, const fpromise::result<int>& result) {
ASSERT_CRITICAL(context.executor() != &executor);
ASSERT_CRITICAL(result.value() == 42);
run_count[1]++;
}));
EXPECT_EQ(1, run_count[0]);
EXPECT_EQ(1, run_count[1]);
t.join();
}
// Promise abandons its task so the consumer is abandoned too.
{
uint64_t run_count[2] = {};
fpromise::single_threaded_executor executor;
fpromise::consumer<int> consumer = fpromise::schedule_for_consumer(
&executor, fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<int> {
ASSERT_CRITICAL(context.executor() == &executor);
run_count[0]++;
// The task will be abandoned after we return since
// we do not acquire a susended task token for it.
return fpromise::pending();
}));
EXPECT_EQ(0, run_count[0]);
auto t = std::thread([&] { executor.run(); });
fpromise::run_single_threaded(consumer.promise().then(
[&](fpromise::context& context, const fpromise::result<int>& result) {
// This should not run because the promise was abandoned.
run_count[1]++;
}));
EXPECT_EQ(1, run_count[0]);
EXPECT_EQ(0, run_count[1]);
t.join();
}
// Promise abandons its task so the consumer is abandoned too
// but this time we use promise_or() so we can handle the abandonment.
{
uint64_t run_count[2] = {};
fpromise::single_threaded_executor executor;
fpromise::consumer<int> consumer = fpromise::schedule_for_consumer(
&executor, fpromise::make_promise([&](fpromise::context& context) -> fpromise::result<int> {
ASSERT_CRITICAL(context.executor() == &executor);
run_count[0]++;
// The task will be abandoned after we return since
// we do not acquire a susended task token for it.
return fpromise::pending();
}));
EXPECT_EQ(0, run_count[0]);
auto t = std::thread([&] { executor.run(); });
fpromise::run_single_threaded(
consumer.promise_or(fpromise::error())
.then([&](fpromise::context& context, const fpromise::result<int>& result) {
ASSERT_CRITICAL(context.executor() != &executor);
ASSERT_CRITICAL(result.is_error());
run_count[1]++;
}));
EXPECT_EQ(1, run_count[0]);
EXPECT_EQ(1, run_count[1]);
t.join();
}
}
} // namespace