| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/devices/bin/driver_runtime/dispatcher.h" |
| |
| #include <lib/async/cpp/irq.h> |
| #include <lib/async/cpp/wait.h> |
| #include <lib/async/task.h> |
| #include <lib/fdf/arena.h> |
| #include <lib/fdf/channel.h> |
| #include <lib/fdf/cpp/channel_read.h> |
| #include <lib/fdf/cpp/dispatcher.h> |
| #include <lib/fdf/cpp/internal.h> |
| #include <lib/fit/defer.h> |
| #include <lib/sync/cpp/completion.h> |
| #include <lib/zx/event.h> |
| #include <lib/zx/interrupt.h> |
| #include <zircon/errors.h> |
| |
| #include <thread> |
| |
| #include <zxtest/zxtest.h> |
| |
| #include "lib/async/cpp/task.h" |
| #include "lib/fdf/dispatcher.h" |
| #include "lib/zx/time.h" |
| #include "src/devices/bin/driver_runtime/driver_context.h" |
| #include "src/devices/bin/driver_runtime/runtime_test_case.h" |
| |
| class DispatcherTest : public RuntimeTestCase { |
| public: |
| DispatcherTest() : config_(MakeConfig()), loop_(&config_) {} |
| |
| void SetUp() override; |
| void TearDown() override; |
| |
| // Creates a dispatcher and returns it in |out_dispatcher|. |
| // The dispatcher will automatically be destroyed in |TearDown|. |
| void CreateDispatcher(uint32_t options, const char* scheduler_role, const void* owner, |
| fdf_dispatcher_t** out_dispatcher); |
| |
| // Registers an async read, which on callback will acquire |lock| and read from |read_channel|. |
| // If |reply_channel| is not null, it will write an empty message. |
| // If |completion| is not null, it will signal before returning from the callback. |
| static void RegisterAsyncReadReply(fdf_handle_t read_channel, fdf_dispatcher_t* dispatcher, |
| fbl::Mutex* lock, |
| fdf_handle_t reply_channel = ZX_HANDLE_INVALID, |
| sync_completion_t* completion = nullptr); |
| |
| // Registers an async read, which on callback will acquire |lock|, read from |read_channel| and |
| // signal |completion|. |
| static void RegisterAsyncReadSignal(fdf_handle_t read_channel, fdf_dispatcher_t* dispatcher, |
| fbl::Mutex* lock, sync_completion_t* completion) { |
| return RegisterAsyncReadReply(read_channel, dispatcher, lock, ZX_HANDLE_INVALID, completion); |
| } |
| |
| // Registers an async read, which on callback will signal |entered_callback| and block |
| // until |complete_blocking_read| is signaled. |
| static void RegisterAsyncReadBlock(fdf_handle_t ch, fdf_dispatcher_t* dispatcher, |
| libsync::Completion* entered_callback, |
| libsync::Completion* complete_blocking_read); |
| |
| static constexpr async_loop_config_t MakeConfig() { |
| async_loop_config_t config = kAsyncLoopConfigNoAttachToCurrentThread; |
| config.irq_support = true; |
| return config; |
| } |
| |
| fdf_handle_t local_ch_; |
| fdf_handle_t remote_ch_; |
| |
| fdf_handle_t local_ch2_; |
| fdf_handle_t remote_ch2_; |
| |
| async_loop_config_t config_; |
| async::Loop loop_; |
| std::vector<fdf_dispatcher_t*> dispatchers_; |
| std::vector<std::unique_ptr<DispatcherShutdownObserver>> observers_; |
| }; |
| |
| void DispatcherTest::SetUp() { |
| ASSERT_EQ(ZX_OK, fdf_channel_create(0, &local_ch_, &remote_ch_)); |
| ASSERT_EQ(ZX_OK, fdf_channel_create(0, &local_ch2_, &remote_ch2_)); |
| |
| loop_.StartThread(); |
| } |
| |
| void DispatcherTest::TearDown() { |
| if (local_ch_) { |
| fdf_handle_close(local_ch_); |
| } |
| if (remote_ch_) { |
| fdf_handle_close(remote_ch_); |
| } |
| if (local_ch2_) { |
| fdf_handle_close(local_ch2_); |
| } |
| if (remote_ch2_) { |
| fdf_handle_close(remote_ch2_); |
| } |
| |
| loop_.StartThread(); // Make sure an async loop thread is running for dispatcher destruction. |
| |
| for (auto* dispatcher : dispatchers_) { |
| fdf_dispatcher_shutdown_async(dispatcher); |
| } |
| for (auto& observer : observers_) { |
| ASSERT_OK(observer->WaitUntilShutdown()); |
| } |
| for (auto* dispatcher : dispatchers_) { |
| fdf_dispatcher_destroy(dispatcher); |
| } |
| |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| } |
| |
| void DispatcherTest::CreateDispatcher(uint32_t options, const char* scheduler_role, |
| const void* owner, fdf_dispatcher_t** out_dispatcher) { |
| auto observer = std::make_unique<DispatcherShutdownObserver>(); |
| driver_runtime::Dispatcher* dispatcher; |
| ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop( |
| options, scheduler_role, strlen(scheduler_role), owner, &loop_, |
| observer->fdf_observer(), &dispatcher)); |
| *out_dispatcher = static_cast<fdf_dispatcher_t*>(dispatcher); |
| dispatchers_.push_back(*out_dispatcher); |
| observers_.push_back(std::move(observer)); |
| } |
| |
| // static |
| void DispatcherTest::RegisterAsyncReadReply(fdf_handle_t read_channel, fdf_dispatcher_t* dispatcher, |
| fbl::Mutex* lock, fdf_handle_t reply_channel, |
| sync_completion_t* completion) { |
| auto channel_read = std::make_unique<fdf::ChannelRead>( |
| read_channel, 0 /* options */, |
| [=](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| ASSERT_OK(status); |
| |
| { |
| fbl::AutoLock auto_lock(lock); |
| |
| ASSERT_NO_FATAL_FAILURE(AssertRead(channel_read->channel(), nullptr, 0, nullptr, 0)); |
| if (reply_channel != ZX_HANDLE_INVALID) { |
| ASSERT_EQ(ZX_OK, fdf_channel_write(reply_channel, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| } |
| if (completion) { |
| sync_completion_signal(completion); |
| } |
| delete channel_read; |
| }); |
| ASSERT_OK(channel_read->Begin(dispatcher)); |
| channel_read.release(); // Deleted on callback. |
| } |
| |
| // static |
| void DispatcherTest::RegisterAsyncReadBlock(fdf_handle_t ch, fdf_dispatcher_t* dispatcher, |
| libsync::Completion* entered_callback, |
| libsync::Completion* complete_blocking_read) { |
| auto channel_read = std::make_unique<fdf::ChannelRead>( |
| ch, 0 /* options */, |
| [=](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| ASSERT_OK(status); |
| entered_callback->Signal(); |
| ASSERT_OK(complete_blocking_read->Wait(zx::time::infinite())); |
| delete channel_read; |
| }); |
| ASSERT_OK(channel_read->Begin(dispatcher)); |
| channel_read.release(); // Will be deleted on callback. |
| } |
| |
| // |
| // Synchronous dispatcher tests |
| // |
| |
| // Tests that a synchronous dispatcher will call directly into the next driver |
| // if it is not reentrant. |
| // This creates 2 drivers and writes a message between them. |
| TEST_F(DispatcherTest, SyncDispatcherDirectCall) { |
| const void* local_driver = CreateFakeDriver(); |
| const void* remote_driver = CreateFakeDriver(); |
| |
| // We should bypass the async loop, so quit it now to make sure we don't use it. |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", local_driver, &dispatcher)); |
| |
| sync_completion_t read_completion; |
| ASSERT_NO_FATAL_FAILURE(SignalOnChannelReadable(local_ch_, dispatcher, &read_completion)); |
| |
| { |
| driver_context::PushDriver(remote_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| // As |local_driver| is not in the thread's call stack, |
| // this should call directly into local driver's channel_read callback. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| ASSERT_OK(sync_completion_wait(&read_completion, ZX_TIME_INFINITE)); |
| } |
| } |
| |
| // Tests that a synchronous dispatcher will queue a request on the async loop if it is reentrant. |
| // This writes and reads a message from the same driver. |
| TEST_F(DispatcherTest, SyncDispatcherCallOnLoop) { |
| const void* driver = CreateFakeDriver(); |
| |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| sync_completion_t read_completion; |
| ASSERT_NO_FATAL_FAILURE(SignalOnChannelReadable(local_ch_, dispatcher, &read_completion)); |
| |
| { |
| // Add the same driver to the thread's call stack. |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| // This should queue the callback to run on an async loop thread. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| // Check that the callback hasn't been called yet, as we shutdown the async loop. |
| ASSERT_FALSE(sync_completion_signaled(&read_completion)); |
| ASSERT_EQ(1, dispatcher->callback_queue_size_slow()); |
| } |
| |
| loop_.StartThread(); |
| ASSERT_OK(sync_completion_wait(&read_completion, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests that a synchronous dispatcher only allows one callback to be running at a time. |
| // We will register a callback that blocks and one that doesn't. We will then send |
| // 2 requests, and check that the second callback is not run until the first returns. |
| TEST_F(DispatcherTest, SyncDispatcherDisallowsParallelCallbacks) { |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| // We shouldn't actually block on a dispatcher that doesn't have ALLOW_SYNC_CALLS set, |
| // but this is just for synchronizing the test. |
| libsync::Completion entered_callback; |
| libsync::Completion complete_blocking_read; |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadBlock(local_ch_, dispatcher, &entered_callback, &complete_blocking_read)); |
| |
| sync_completion_t read_completion; |
| ASSERT_NO_FATAL_FAILURE(SignalOnChannelReadable(local_ch2_, dispatcher, &read_completion)); |
| |
| { |
| // This should make the callback run on the async loop, as it would be reentrant. |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(entered_callback.Wait(zx::time::infinite())); |
| |
| // Write another request. This should also be queued on the async loop. |
| std::thread t1 = std::thread([&] { |
| // Make the call not reentrant. |
| driver_context::PushDriver(CreateFakeDriver()); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch2_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| }); |
| |
| // The dispatcher should not call the callback while there is an existing callback running, |
| // so we should be able to join with the thread immediately. |
| t1.join(); |
| ASSERT_FALSE(sync_completion_signaled(&read_completion)); |
| |
| // Complete the first callback. |
| complete_blocking_read.Signal(); |
| |
| // The second callback should complete now. |
| ASSERT_OK(sync_completion_wait(&read_completion, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests that a synchronous dispatcher does not schedule parallel callbacks on the async loop. |
| TEST_F(DispatcherTest, SyncDispatcherDisallowsParallelCallbacksReentrant) { |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| |
| constexpr uint32_t kNumThreads = 2; |
| constexpr uint32_t kNumClients = 12; |
| |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| struct ReadClient { |
| fdf_handle_t channel; |
| libsync::Completion entered_callback; |
| libsync::Completion complete_blocking_read; |
| }; |
| |
| std::vector<ReadClient> local(kNumClients); |
| std::vector<fdf_handle_t> remote(kNumClients); |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| ASSERT_OK(fdf_channel_create(0, &local[i].channel, &remote[i])); |
| ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(local[i].channel, dispatcher, |
| &local[i].entered_callback, |
| &local[i].complete_blocking_read)); |
| } |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| // Call is considered reentrant and will be queued on the async loop. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote[i], 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| for (uint32_t i = 0; i < kNumThreads; i++) { |
| loop_.StartThread(); |
| } |
| |
| ASSERT_OK(local[0].entered_callback.Wait(zx::time::infinite())); |
| local[0].complete_blocking_read.Signal(); |
| |
| // Check that we aren't blocking the second thread by posting a task to another |
| // dispatcher. |
| fdf_dispatcher_t* dispatcher2; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher2)); |
| async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(dispatcher2); |
| ASSERT_NOT_NULL(async_dispatcher); |
| |
| sync_completion_t task_completion; |
| ASSERT_OK(async::PostTask(async_dispatcher, |
| [&task_completion] { sync_completion_signal(&task_completion); })); |
| ASSERT_OK(sync_completion_wait(&task_completion, ZX_TIME_INFINITE)); |
| |
| // Allow all the read callbacks to complete. |
| for (uint32_t i = 1; i < kNumClients; i++) { |
| local[i].complete_blocking_read.Signal(); |
| } |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| ASSERT_OK(local[i].entered_callback.Wait(zx::time::infinite())); |
| } |
| |
| fdf_internal_wait_until_dispatcher_idle(dispatcher); |
| fdf_internal_wait_until_dispatcher_idle(dispatcher2); |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| fdf_handle_close(local[i].channel); |
| fdf_handle_close(remote[i]); |
| } |
| } |
| |
| // |
| // Unsynchronized dispatcher tests |
| // |
| |
| // Tests that an unsynchronized dispatcher allows multiple callbacks to run at the same time. |
| // We will send requests from multiple threads and check that the expected number of callbacks |
| // is running. |
| TEST_F(DispatcherTest, UnsyncDispatcherAllowsParallelCallbacks) { |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, "scheduler_role", |
| driver, &dispatcher)); |
| |
| constexpr uint32_t kNumClients = 10; |
| |
| std::vector<fdf_handle_t> local(kNumClients); |
| std::vector<fdf_handle_t> remote(kNumClients); |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| ASSERT_OK(fdf_channel_create(0, &local[i], &remote[i])); |
| } |
| |
| fbl::Mutex callback_lock; |
| uint32_t num_callbacks = 0; |
| sync_completion_t completion; |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| auto channel_read = std::make_unique<fdf::ChannelRead>( |
| local[i], 0 /* options */, |
| [&](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| { |
| fbl::AutoLock lock(&callback_lock); |
| num_callbacks++; |
| if (num_callbacks == kNumClients) { |
| sync_completion_signal(&completion); |
| } |
| } |
| // Wait for all threads to ensure we are correctly supporting parallel callbacks. |
| ASSERT_OK(sync_completion_wait(&completion, ZX_TIME_INFINITE)); |
| delete channel_read; |
| }); |
| ASSERT_OK(channel_read->Begin(dispatcher)); |
| channel_read.release(); // Deleted by the callback. |
| } |
| |
| std::vector<std::thread> threads; |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| std::thread client = std::thread( |
| [&](fdf_handle_t channel) { |
| { |
| // Ensure the call is not reentrant. |
| driver_context::PushDriver(CreateFakeDriver()); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| ASSERT_EQ(ZX_OK, fdf_channel_write(channel, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| }, |
| remote[i]); |
| threads.push_back(std::move(client)); |
| } |
| |
| for (auto& t : threads) { |
| t.join(); |
| } |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| fdf_handle_close(local[i]); |
| fdf_handle_close(remote[i]); |
| } |
| } |
| |
| // Tests that an unsynchronized dispatcher allows multiple callbacks to run at the same time |
| // on the async loop. |
| TEST_F(DispatcherTest, UnsyncDispatcherAllowsParallelCallbacksReentrant) { |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| |
| constexpr uint32_t kNumThreads = 3; |
| constexpr uint32_t kNumClients = 22; |
| |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, "scheduler_role", |
| driver, &dispatcher)); |
| |
| std::vector<fdf_handle_t> local(kNumClients); |
| std::vector<fdf_handle_t> remote(kNumClients); |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| ASSERT_OK(fdf_channel_create(0, &local[i], &remote[i])); |
| } |
| |
| fbl::Mutex callback_lock; |
| uint32_t num_callbacks = 0; |
| sync_completion_t all_threads_running; |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| auto channel_read = std::make_unique<fdf::ChannelRead>( |
| local[i], 0 /* options */, |
| [&](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| { |
| fbl::AutoLock lock(&callback_lock); |
| num_callbacks++; |
| if (num_callbacks == kNumThreads) { |
| sync_completion_signal(&all_threads_running); |
| } |
| } |
| // Wait for all threads to ensure we are correctly supporting parallel callbacks. |
| ASSERT_OK(sync_completion_wait(&all_threads_running, ZX_TIME_INFINITE)); |
| delete channel_read; |
| }); |
| ASSERT_OK(channel_read->Begin(dispatcher)); |
| channel_read.release(); // Deleted by the callback. |
| } |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| // Call is considered reentrant and will be queued on the async loop. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote[i], 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| for (uint32_t i = 0; i < kNumThreads; i++) { |
| loop_.StartThread(); |
| } |
| |
| ASSERT_OK(sync_completion_wait(&all_threads_running, ZX_TIME_INFINITE)); |
| fdf_internal_wait_until_dispatcher_idle(dispatcher); |
| ASSERT_EQ(num_callbacks, kNumClients); |
| |
| for (uint32_t i = 0; i < kNumClients; i++) { |
| fdf_handle_close(local[i]); |
| fdf_handle_close(remote[i]); |
| } |
| } |
| |
| // |
| // Blocking dispatcher tests |
| // |
| |
| // Tests that a blocking dispatcher will not directly call into the next driver. |
| TEST_F(DispatcherTest, AllowSyncCallsDoesNotDirectlyCall) { |
| const void* blocking_driver = CreateFakeDriver(); |
| fdf_dispatcher_t* blocking_dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", |
| blocking_driver, &blocking_dispatcher)); |
| |
| // Queue a blocking request. |
| libsync::Completion entered_callback; |
| libsync::Completion complete_blocking_read; |
| ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(remote_ch_, blocking_dispatcher, &entered_callback, |
| &complete_blocking_read)); |
| |
| { |
| // Simulate a driver writing a message to the driver with the blocking dispatcher. |
| driver_context::PushDriver(CreateFakeDriver()); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| // This is a non reentrant call, but we still shouldn't call into the driver directly. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(entered_callback.Wait(zx::time::infinite())); |
| |
| // Signal and wait for the blocking read handler to return. |
| complete_blocking_read.Signal(); |
| |
| fdf_internal_wait_until_dispatcher_idle(blocking_dispatcher); |
| } |
| |
| // Tests that a blocking dispatcher does not block the global async loop shared between |
| // all dispatchers in a process. |
| // We will register a blocking callback, and ensure we can receive other callbacks |
| // at the same time. |
| TEST_F(DispatcherTest, AllowSyncCallsDoesNotBlockGlobalLoop) { |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| const void* blocking_driver = CreateFakeDriver(); |
| fdf_dispatcher_t* blocking_dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", |
| blocking_driver, &blocking_dispatcher)); |
| |
| fdf_handle_t blocking_local_ch, blocking_remote_ch; |
| ASSERT_EQ(ZX_OK, fdf_channel_create(0, &blocking_local_ch, &blocking_remote_ch)); |
| |
| // Queue a blocking read. |
| libsync::Completion entered_callback; |
| libsync::Completion complete_blocking_read; |
| ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(blocking_remote_ch, blocking_dispatcher, |
| &entered_callback, &complete_blocking_read)); |
| |
| // Write a message for the blocking dispatcher. |
| { |
| driver_context::PushDriver(blocking_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| ASSERT_EQ(ZX_OK, fdf_channel_write(blocking_local_ch, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(entered_callback.Wait(zx::time::infinite())); |
| |
| sync_completion_t read_completion; |
| ASSERT_NO_FATAL_FAILURE(SignalOnChannelReadable(remote_ch_, dispatcher, &read_completion)); |
| |
| { |
| // Write a message which will be read on the non-blocking dispatcher. |
| // Make the call reentrant so that the request is queued for the async loop. |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(sync_completion_wait(&read_completion, ZX_TIME_INFINITE)); |
| ASSERT_NO_FATAL_FAILURE(AssertRead(remote_ch_, nullptr, 0, nullptr, 0)); |
| |
| // Signal and wait for the blocking read handler to return. |
| complete_blocking_read.Signal(); |
| |
| fdf_internal_wait_until_dispatcher_idle(dispatcher); |
| fdf_internal_wait_until_dispatcher_idle(blocking_dispatcher); |
| |
| fdf_handle_close(blocking_local_ch); |
| fdf_handle_close(blocking_remote_ch); |
| } |
| |
| // |
| // Additional re-entrancy tests |
| // |
| |
| // Tests sending a request to another driver and receiving a reply across a single channel. |
| TEST_F(DispatcherTest, ReentrancySimpleSendAndReply) { |
| // Create a dispatcher for each end of the channel. |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| const void* driver2 = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher2; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver2, &dispatcher2)); |
| |
| // Lock that is acquired by the first driver whenever it writes or reads from |local_ch_|. |
| // We shouldn't need to lock in a synchronous dispatcher, but this is just for testing |
| // that the dispatcher handles reentrant calls. If the dispatcher attempts to call |
| // reentrantly, this test will deadlock. |
| fbl::Mutex driver_lock; |
| fbl::Mutex driver2_lock; |
| sync_completion_t completion; |
| |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadSignal(local_ch_, dispatcher, &driver_lock, &completion)); |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadReply(remote_ch_, dispatcher2, &driver2_lock, remote_ch_)); |
| |
| { |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| fbl::AutoLock lock(&driver_lock); |
| // This should call directly into the next driver. When the driver writes its reply, |
| // the dispatcher should detect that it is reentrant and queue it to be run on the |
| // async loop. This will allow |fdf_channel_write| to return and |driver_lock| will |
| // be released. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(sync_completion_wait(&completion, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests sending a request to another driver, who sends a request back into the original driver |
| // on a different channel. |
| TEST_F(DispatcherTest, ReentrancyMultipleDriversAndDispatchers) { |
| // Driver will own |local_ch_| and |local_ch2_|. |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| // Driver2 will own |remote_ch_| and |remote_ch2_|. |
| const void* driver2 = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher2; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver2, &dispatcher2)); |
| |
| // Lock that is acquired by the driver whenever it writes or reads from its channels. |
| // We shouldn't need to lock in a synchronous dispatcher, but this is just for testing |
| // that the dispatcher handles reentrant calls. If the dispatcher attempts to call |
| // reentrantly, this test will deadlock. |
| fbl::Mutex driver_lock; |
| fbl::Mutex driver2_lock; |
| sync_completion_t completion; |
| |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadSignal(local_ch2_, dispatcher, &driver_lock, &completion)); |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadReply(remote_ch_, dispatcher2, &driver2_lock, remote_ch2_)); |
| |
| { |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| fbl::AutoLock lock(&driver_lock); |
| // This should call directly into the next driver. When the driver writes its reply, |
| // the dispatcher should detect that it is reentrant and queue it to be run on the |
| // async loop. This will allow |fdf_channel_write| to return and |driver_lock| will |
| // be released. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(sync_completion_wait(&completion, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests a driver sending a request to another channel it owns. |
| TEST_F(DispatcherTest, ReentrancyOneDriverMultipleChannels) { |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| // Lock that is acquired by the driver whenever it writes or reads from its channels. |
| // We shouldn't need to lock in a synchronous dispatcher, but this is just for testing |
| // that the dispatcher handles reentrant calls. If the dispatcher attempts to call |
| // reentrantly, this test will deadlock. |
| fbl::Mutex driver_lock; |
| sync_completion_t completion; |
| |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadSignal(local_ch2_, dispatcher, &driver_lock, &completion)); |
| ASSERT_NO_FATAL_FAILURE( |
| RegisterAsyncReadReply(remote_ch_, dispatcher, &driver_lock, remote_ch2_)); |
| |
| { |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| fbl::AutoLock lock(&driver_lock); |
| // Every call callback in this driver will be reentrant and should be run on the async loop. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(sync_completion_wait(&completion, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests forwarding a request across many drivers, before calling back into the original driver. |
| TEST_F(DispatcherTest, ReentrancyManyDrivers) { |
| constexpr uint32_t kNumDrivers = 30; |
| |
| // Each driver i uses ch_to_prev[i] and ch_to_next[i] to communicate with the driver before and |
| // after it, except ch_to_prev[0] and ch_to_next[kNumDrivers-1]. |
| std::vector<fdf_handle_t> ch_to_prev(kNumDrivers); |
| std::vector<fdf_handle_t> ch_to_next(kNumDrivers); |
| |
| // Lock that is acquired by the driver whenever it writes or reads from its channels. |
| // We shouldn't need to lock in a synchronous dispatcher, but this is just for testing |
| // that the dispatcher handles reentrant calls. If the dispatcher attempts to call |
| // reentrantly, this test will deadlock. |
| std::vector<fbl::Mutex> driver_locks(kNumDrivers); |
| |
| for (uint32_t i = 0; i < kNumDrivers; i++) { |
| const void* driver = CreateFakeDriver(); |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", driver, &dispatcher)); |
| |
| // Get the next driver's channel which is connected to the current driver's channel. |
| // The last driver will be connected to the first driver. |
| fdf_handle_t* peer = (i == kNumDrivers - 1) ? &ch_to_prev[0] : &ch_to_prev[i + 1]; |
| ASSERT_OK(fdf_channel_create(0, &ch_to_next[i], peer)); |
| } |
| |
| // Signal once the first driver is called into. |
| sync_completion_t completion; |
| ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadSignal(ch_to_prev[0], |
| static_cast<fdf_dispatcher_t*>(dispatchers_[0]), |
| &driver_locks[0], &completion)); |
| |
| // Each driver will wait for a callback, then write a message to the next driver. |
| for (uint32_t i = 1; i < kNumDrivers; i++) { |
| ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadReply(ch_to_prev[i], |
| static_cast<fdf_dispatcher_t*>(dispatchers_[i]), |
| &driver_locks[i], ch_to_next[i])); |
| } |
| |
| { |
| driver_context::PushDriver(dispatchers_[0]->owner()); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| fbl::AutoLock lock(&driver_locks[0]); |
| // Write from the first driver. |
| // This should call directly into the next |kNumDrivers - 1| drivers. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(ch_to_next[0], 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| ASSERT_OK(sync_completion_wait(&completion, ZX_TIME_INFINITE)); |
| for (uint32_t i = 0; i < kNumDrivers; i++) { |
| fdf_internal_wait_until_dispatcher_idle(dispatchers_[i]); |
| } |
| for (uint32_t i = 0; i < kNumDrivers; i++) { |
| fdf_handle_close(ch_to_prev[i]); |
| fdf_handle_close(ch_to_next[i]); |
| } |
| } |
| |
| // Tests writing a request from an unknown driver context. |
| TEST_F(DispatcherTest, EmptyCallStack) { |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| |
| fdf_dispatcher_t* dispatcher; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &dispatcher)); |
| |
| sync_completion_t read_completion; |
| ASSERT_NO_FATAL_FAILURE(SignalOnChannelReadable(local_ch_, dispatcher, &read_completion)); |
| |
| { |
| // Call without any recorded call stack. |
| // This should queue the callback to run on an async loop thread. |
| ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| ASSERT_EQ(1, dispatcher->callback_queue_size_slow()); |
| ASSERT_FALSE(sync_completion_signaled(&read_completion)); |
| } |
| |
| loop_.StartThread(); |
| ASSERT_OK(sync_completion_wait(&read_completion, ZX_TIME_INFINITE)); |
| } |
| |
| // |
| // Shutdown() tests |
| // |
| |
| // Tests shutting down a synchronized dispatcher that has a pending channel read |
| // that does not have a corresponding channel write. |
| TEST_F(DispatcherTest, SyncDispatcherShutdownBeforeWrite) { |
| libsync::Completion read_complete; |
| DispatcherShutdownObserver observer; |
| |
| const void* driver = CreateFakeDriver(); |
| auto scheduler_role = "scheduler_role"; |
| |
| driver_runtime::Dispatcher* dispatcher; |
| ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop( |
| 0, scheduler_role, strlen(scheduler_role), driver, &loop_, |
| observer.fdf_observer(), &dispatcher)); |
| |
| fdf::Dispatcher fdf_dispatcher(static_cast<fdf_dispatcher_t*>(dispatcher)); |
| |
| // Registered, but not yet ready to run. |
| auto channel_read = std::make_unique<fdf::ChannelRead>( |
| remote_ch_, 0, |
| [&](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| ASSERT_EQ(status, ZX_ERR_CANCELED); |
| read_complete.Signal(); |
| delete channel_read; |
| }); |
| ASSERT_OK(channel_read->Begin(fdf_dispatcher.get())); |
| channel_read.release(); |
| |
| fdf_dispatcher.ShutdownAsync(); |
| |
| ASSERT_OK(read_complete.Wait(zx::time::infinite())); |
| ASSERT_OK(observer.WaitUntilShutdown()); |
| } |
| |
| // Verifies that a synchronized dispatcher delivers ZX_ERR_CANCELED to an async wait |
| // whose signal was never raised when the dispatcher is shut down. |
| TEST_F(DispatcherTest, SyncDispatcherShutdownBeforeSignaled) { |
|   libsync::Completion wait_canceled; |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   zx::event unsignaled_event; |
|   ASSERT_OK(zx::event::create(0, &unsignaled_event)); |
| |
|   async::WaitOnce pending_wait(unsignaled_event.get(), ZX_USER_SIGNAL_0); |
| |
|   const void* fake_driver = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop( |
|                        0, role, strlen(role), fake_driver, &loop_, |
|                        shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   fdf::Dispatcher owned_dispatcher(static_cast<fdf_dispatcher_t*>(runtime_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = runtime_dispatcher->GetAsyncDispatcher(); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   // The event is moved into the handler and is never signaled by this test, so the |
|   // handler is expected to run only with a cancellation status. |
|   ASSERT_OK(pending_wait.Begin( |
|       async_dispatcher, |
|       [&wait_canceled, event = std::move(unsignaled_event)]( |
|           async_dispatcher_t* /*dispatcher*/, async::WaitOnce* /*wait*/, zx_status_t status, |
|           const zx_packet_signal_t* /*signal*/) { |
|         ASSERT_STATUS(status, ZX_ERR_CANCELED); |
|         wait_canceled.Signal(); |
|       })); |
| |
|   // Shutting down the dispatcher should cancel the outstanding wait. |
|   runtime_dispatcher->ShutdownAsync(); |
| |
|   ASSERT_OK(wait_canceled.Wait(zx::time::infinite())); |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
| } |
| |
| // Verifies that shutting down an unsynchronized dispatcher does not cancel a queued |
| // channel read until a task that is already running has finished. |
| TEST_F(DispatcherTest, UnsyncDispatcherShutdown) { |
|   libsync::Completion unblock_task; |
|   libsync::Completion read_canceled; |
| |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   const void* fake_driver = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, |
|             driver_runtime::Dispatcher::CreateWithLoop( |
|                 FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, role, strlen(role), fake_driver, &loop_, |
|                 shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   fdf::Dispatcher owned_dispatcher(static_cast<fdf_dispatcher_t*>(runtime_dispatcher)); |
|   libsync::Completion task_running; |
|   // Occupy the dispatcher with a task that stays blocked until |unblock_task| is signaled. |
|   ASSERT_OK(async::PostTask(owned_dispatcher.async_dispatcher(), [&] { |
|     task_running.Signal(); |
|     ASSERT_OK(unblock_task.Wait(zx::time::infinite())); |
|   })); |
|   // Do not continue until the task is actually executing. |
|   ASSERT_OK(task_running.Wait(zx::time::infinite())); |
| |
|   // Register the read before the write happens; the handler expects a cancellation. |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       remote_ch_, 0, |
|       [&](fdf_dispatcher_t* /*dispatcher*/, fdf::ChannelRead* self, fdf_status_t status) { |
|         ASSERT_EQ(status, ZX_ERR_CANCELED); |
|         read_canceled.Signal(); |
|         delete self; |
|       }); |
|   ASSERT_OK(pending_read->Begin(owned_dispatcher.get())); |
|   pending_read.release();  // The handler deletes the read. |
| |
|   { |
|     // Writing with the driver on the call stack should be treated as reentrant, so the |
|     // read is queued on the async loop. |
|     driver_context::PushDriver(fake_driver); |
|     auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
|     ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   } |
| |
|   owned_dispatcher.ShutdownAsync(); |
| |
|   // The read must not be cancelled while the blocking task is still running. |
|   ASSERT_FALSE(read_canceled.signaled()); |
|   unblock_task.Signal(); |
|   ASSERT_OK(read_canceled.Wait(zx::time::infinite())); |
| |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
| } |
| |
| // Verifies that an unsynchronized dispatcher cancels a registered channel read when it |
| // is shut down before anything has been written to the peer channel. |
| // NOTE(review): the test name contains a typo ("Shutdonw"); it is left unchanged here so |
| // that any existing test filters keep matching. |
| TEST_F(DispatcherTest, UnsyncDispatcherShutdonwBeforeWrite) { |
|   libsync::Completion read_canceled; |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   const void* fake_driver = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, |
|             driver_runtime::Dispatcher::CreateWithLoop( |
|                 FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, role, strlen(role), fake_driver, &loop_, |
|                 shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   fdf::Dispatcher owned_dispatcher(static_cast<fdf_dispatcher_t*>(runtime_dispatcher)); |
| |
|   // No write is ever issued in this test, so the handler is expected to see a cancellation. |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       remote_ch_, 0, |
|       [&](fdf_dispatcher_t* /*dispatcher*/, fdf::ChannelRead* self, fdf_status_t status) { |
|         ASSERT_EQ(status, ZX_ERR_CANCELED); |
|         read_canceled.Signal(); |
|         delete self; |
|       }); |
|   ASSERT_OK(pending_read->Begin(owned_dispatcher.get())); |
|   pending_read.release();  // The handler deletes the read. |
| |
|   owned_dispatcher.ShutdownAsync(); |
| |
|   ASSERT_OK(read_canceled.Wait(zx::time::infinite())); |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
| } |
| |
| // Verifies that an unsynchronized dispatcher delivers ZX_ERR_CANCELED to an async wait |
| // whose signal was never raised when the dispatcher is shut down. |
| TEST_F(DispatcherTest, UnsyncDispatcherShutdownBeforeSignaled) { |
|   libsync::Completion wait_canceled; |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   zx::event unsignaled_event; |
|   ASSERT_OK(zx::event::create(0, &unsignaled_event)); |
| |
|   async::WaitOnce pending_wait(unsignaled_event.get(), ZX_USER_SIGNAL_0); |
| |
|   const void* fake_driver = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, |
|             driver_runtime::Dispatcher::CreateWithLoop( |
|                 FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, role, strlen(role), fake_driver, &loop_, |
|                 shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   fdf::Dispatcher owned_dispatcher(static_cast<fdf_dispatcher_t*>(runtime_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = runtime_dispatcher->GetAsyncDispatcher(); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   // The event is moved into the handler and is never signaled by this test, so the |
|   // handler is expected to run only with a cancellation status. |
|   ASSERT_OK(pending_wait.Begin( |
|       async_dispatcher, |
|       [&wait_canceled, event = std::move(unsignaled_event)]( |
|           async_dispatcher_t* /*dispatcher*/, async::WaitOnce* /*wait*/, zx_status_t status, |
|           const zx_packet_signal_t* /*signal*/) { |
|         ASSERT_STATUS(status, ZX_ERR_CANCELED); |
|         wait_canceled.Signal(); |
|       })); |
| |
|   // Shutting down the dispatcher should cancel the outstanding wait. |
|   runtime_dispatcher->ShutdownAsync(); |
| |
|   ASSERT_OK(wait_canceled.Wait(zx::time::infinite())); |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
| } |
| |
| // Verifies that an unsynchronized dispatcher can be shut down from inside one of its |
| // own channel read callbacks while that callback runs on the async loop. |
| TEST_F(DispatcherTest, ShutdownDispatcherInAsyncLoopCallback) { |
|   const void* fake_driver = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, |
|             driver_runtime::Dispatcher::CreateWithLoop( |
|                 FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, role, strlen(role), fake_driver, &loop_, |
|                 shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   libsync::Completion callback_ran; |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       remote_ch_, 0 /* options */, |
|       [&](fdf_dispatcher_t* current, fdf::ChannelRead* self, fdf_status_t status) { |
|         ASSERT_OK(status); |
|         // Request shutdown of the dispatcher that is running this callback. |
|         fdf_dispatcher_shutdown_async(current); |
|         callback_ran.Signal(); |
|         delete self; |
|       }); |
|   ASSERT_OK(pending_read->Begin(static_cast<fdf_dispatcher_t*>(runtime_dispatcher))); |
|   pending_read.release();  // The handler deletes the read. |
| |
|   { |
|     // Write with the driver on the call stack so the write counts as reentrant and the |
|     // callback is scheduled on the async loop. |
|     driver_context::PushDriver(fake_driver); |
|     auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|     ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   } |
| |
|   ASSERT_OK(callback_ran.Wait(zx::time::infinite())); |
| |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
|   runtime_dispatcher->Destroy(); |
| } |
| |
| // Tests that attempting to shut down a dispatcher twice from callbacks does not crash. |
| // Two channel reads are registered on the same unsynchronized dispatcher, and each |
| // handler calls fdf_dispatcher_shutdown_async() on it. |
| TEST_F(DispatcherTest, ShutdownDispatcherFromTwoCallbacks) { |
| // Stop the async loop, so that the channel reads don't get scheduled |
| // until after we shut down the dispatcher. |
| loop_.Quit(); |
| loop_.JoinThreads(); |
| loop_.ResetQuit(); |
| |
| DispatcherShutdownObserver observer; |
| const void* driver = CreateFakeDriver(); |
| auto scheduler_role = "scheduler_role"; |
| |
| driver_runtime::Dispatcher* dispatcher; |
| ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop( |
| FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, scheduler_role, strlen(scheduler_role), |
| driver, &loop_, observer.fdf_observer(), &dispatcher)); |
| |
| // Unlike other tests in this file, the reads are not released or deleted by their |
| // handlers; the unique_ptrs keep them alive until the end of the test body. |
| libsync::Completion completion; |
| auto channel_read = std::make_unique<fdf::ChannelRead>( |
| remote_ch_, 0 /* options */, |
| [&](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| ASSERT_OK(status); |
| fdf_dispatcher_shutdown_async(dispatcher); |
| completion.Signal(); |
| }); |
| ASSERT_OK(channel_read->Begin(static_cast<fdf_dispatcher_t*>(dispatcher))); |
| |
| // Second read, on the second channel pair, with an identical handler. |
| libsync::Completion completion2; |
| auto channel_read2 = std::make_unique<fdf::ChannelRead>( |
| remote_ch2_, 0 /* options */, |
| [&](fdf_dispatcher_t* dispatcher, fdf::ChannelRead* channel_read, fdf_status_t status) { |
| ASSERT_OK(status); |
| fdf_dispatcher_shutdown_async(dispatcher); |
| completion2.Signal(); |
| }); |
| ASSERT_OK(channel_read2->Begin(static_cast<fdf_dispatcher_t*>(dispatcher))); |
| |
| { |
| // Make the writes reentrant so they are scheduled to run on the async loop. |
| driver_context::PushDriver(driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch2_, 0, nullptr, nullptr, 0, nullptr, 0)); |
| } |
| |
| // Restart the loop so that both queued callbacks can now run. |
| loop_.StartThread(); |
| |
| ASSERT_OK(completion.Wait(zx::time::infinite())); |
| ASSERT_OK(completion2.Wait(zx::time::infinite())); |
| ASSERT_OK(observer.WaitUntilShutdown()); |
| dispatcher->Destroy(); |
| } |
| |
| // Verifies that a ChannelRead cannot be re-registered from its own cancellation |
| // callback while the dispatcher is shutting down. |
| TEST_F(DispatcherTest, ShutdownDispatcherQueueChannelReadCallback) { |
|   // Park the async loop so the queued read cannot be scheduled before the dispatcher |
|   // has been asked to shut down. |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   libsync::Completion read_canceled; |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   const void* fake_driver = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, |
|             driver_runtime::Dispatcher::CreateWithLoop( |
|                 FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, role, strlen(role), fake_driver, &loop_, |
|                 shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   fdf::Dispatcher owned_dispatcher(static_cast<fdf_dispatcher_t*>(runtime_dispatcher)); |
| |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       remote_ch_, 0, |
|       [&](fdf_dispatcher_t* current, fdf::ChannelRead* self, fdf_status_t status) { |
|         ASSERT_EQ(status, ZX_ERR_CANCELED); |
|         // Re-arming the read on a dispatcher that is shutting down must be rejected. |
|         ASSERT_EQ(self->Begin(current), ZX_ERR_UNAVAILABLE); |
|         read_canceled.Signal(); |
|         delete self; |
|       }); |
|   ASSERT_OK(pending_read->Begin(owned_dispatcher.get())); |
|   pending_read.release();  // The handler deletes the read. |
| |
|   { |
|     // Writing with the driver on the call stack should be treated as reentrant, so the |
|     // read is queued on the async loop. |
|     driver_context::PushDriver(fake_driver); |
|     auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
|     ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   } |
| |
|   owned_dispatcher.ShutdownAsync(); |
| |
|   loop_.StartThread(); |
| |
|   ASSERT_OK(read_canceled.Wait(zx::time::infinite())); |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
| } |
| |
| // Verifies that the dispatcher's shutdown handler is not invoked from inside the |
| // ShutdownAsync() call. The handler acquires |driver_lock|, which the test holds while |
| // calling ShutdownAsync(), so a synchronous invocation would try to take a lock that |
| // the calling thread already holds. |
| TEST_F(DispatcherTest, ShutdownCallbackIsNotReentrant) { |
|   fbl::Mutex driver_lock; |
| |
|   libsync::Completion handler_ran; |
|   auto shutdown_handler = [&](fdf_dispatcher_t* /*dispatcher*/) { |
|     // Acquire and immediately release the lock. |
|     { fbl::AutoLock guard(&driver_lock); } |
|     handler_ran.Signal(); |
|   }; |
| |
|   driver_context::PushDriver(CreateFakeDriver()); |
|   auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto created = fdf::Dispatcher::Create(0, shutdown_handler); |
|   ASSERT_FALSE(created.is_error()); |
| |
|   { |
|     fbl::AutoLock guard(&driver_lock); |
|     created->ShutdownAsync(); |
|   } |
| |
|   ASSERT_OK(handler_ran.Wait(zx::time::infinite())); |
| } |
| |
| // |
| // async_dispatcher_t tests |
| // |
| |
| // Verifies that the async_dispatcher_t obtained from an fdf_dispatcher_t can run a |
| // posted task. |
| TEST_F(DispatcherTest, AsyncDispatcher) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   sync_completion_t task_ran; |
|   ASSERT_OK(async::PostTask(async_dispatcher, [&task_ran] { sync_completion_signal(&task_ran); })); |
|   ASSERT_OK(sync_completion_wait(&task_ran, ZX_TIME_INFINITE)); |
| } |
| |
| // Verifies that a task posted with a deadline 10ms in the future eventually runs. |
| TEST_F(DispatcherTest, DelayedTask) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   sync_completion_t task_ran; |
|   ASSERT_OK(async::PostTaskForTime( |
|       async_dispatcher, [&task_ran] { sync_completion_signal(&task_ran); }, |
|       zx::deadline_after(zx::msec(10)))); |
|   ASSERT_OK(sync_completion_wait(&task_ran, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests that posting a task never runs it directly on the posting thread: with the |
| // async loop stopped, the task must still be pending after PostTask() returns, and it |
| // only runs once a loop thread is started again. |
| TEST_F(DispatcherTest, TasksDoNotCallDirectly) { |
|   fdf_dispatcher_t* dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   // Stop the loop threads so nothing but the posting thread could run the task. |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   driver_context::PushDriver(CreateFakeDriver()); |
|   auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   libsync::Completion completion; |
|   ASSERT_OK(async::PostTask(async_dispatcher, [&completion] { completion.Signal(); })); |
|   ASSERT_FALSE(completion.signaled()); |
| |
|   loop_.StartThread(); |
|   // Check the wait status instead of discarding it, like the other waits in this file. |
|   ASSERT_OK(completion.Wait()); |
| } |
| |
| // Verifies that fdf_dispatcher_from_async_dispatcher() maps an async_dispatcher_t back |
| // to the fdf_dispatcher_t it was obtained from. |
| TEST_F(DispatcherTest, FromAsyncDispatcher) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_view = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_view); |
| |
|   ASSERT_EQ(fdf_dispatcher_from_async_dispatcher(async_view), driver_dispatcher); |
| } |
| |
| // Verifies that a posted task can be cancelled before it has had a chance to run. |
| TEST_F(DispatcherTest, CancelTask) { |
|   // Stop the loop threads so the posted task stays pending. |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   async::TaskClosure pending_task; |
|   // The handler fails the test if it is ever invoked. |
|   pending_task.set_handler([] { ASSERT_FALSE(true); }); |
|   ASSERT_OK(pending_task.Post(async_dispatcher)); |
| |
|   ASSERT_OK(pending_task.Cancel());  // Task should not be running yet. |
| } |
| |
| // Verifies that a task posted with a deadline 100 seconds in the future can be cancelled. |
| TEST_F(DispatcherTest, CancelDelayedTask) { |
|   // Stop the loop threads before creating the dispatcher. |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   async::TaskClosure delayed_task; |
|   // The handler fails the test if it is ever invoked. |
|   delayed_task.set_handler([] { ASSERT_FALSE(true); }); |
|   ASSERT_OK(delayed_task.PostForTime(async_dispatcher, zx::deadline_after(zx::sec(100)))); |
| |
|   ASSERT_OK(delayed_task.Cancel());  // Task should not be running yet. |
| } |
| |
| // Verifies that cancelling a task that was never posted reports ZX_ERR_NOT_FOUND. |
| TEST_F(DispatcherTest, CancelTaskNotYetPosted) { |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   async::TaskClosure unposted_task; |
|   // The handler fails the test if it is ever invoked. |
|   unposted_task.set_handler([] { ASSERT_FALSE(true); }); |
| |
|   // The task was never posted, so there is nothing to cancel. |
|   ASSERT_EQ(unposted_task.Cancel(), ZX_ERR_NOT_FOUND); |
| } |
| |
| // Verifies that a task which tries to cancel itself from its own handler gets |
| // ZX_ERR_NOT_FOUND. |
| TEST_F(DispatcherTest, CancelTaskAlreadyRunning) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   async::TaskClosure running_task; |
|   libsync::Completion handler_done; |
|   running_task.set_handler([&] { |
|     // The task is already running at this point. |
|     ASSERT_EQ(running_task.Cancel(), ZX_ERR_NOT_FOUND); |
|     handler_done.Signal(); |
|   }); |
|   ASSERT_OK(running_task.Post(async_dispatcher)); |
|   ASSERT_OK(handler_done.Wait(zx::time::infinite())); |
| } |
| |
| // Verifies that an async::WaitOnce registered through the dispatcher fires with ZX_OK |
| // once its event is signaled, and that the handler receives the same async_dispatcher_t. |
| TEST_F(DispatcherTest, AsyncWaitOnce) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::event trigger; |
|   ASSERT_OK(zx::event::create(0, &trigger)); |
| |
|   sync_completion_t handler_ran; |
|   async::WaitOnce event_wait(trigger.get(), ZX_USER_SIGNAL_0); |
|   ASSERT_OK(event_wait.Begin( |
|       async_dispatcher, |
|       [&handler_ran, &async_dispatcher](async_dispatcher_t* current, async::WaitOnce* /*wait*/, |
|                                         zx_status_t status, |
|                                         const zx_packet_signal_t* /*signal*/) { |
|         ASSERT_EQ(async_dispatcher, current); |
|         ASSERT_OK(status); |
|         sync_completion_signal(&handler_ran); |
|       })); |
|   ASSERT_OK(trigger.signal(0, ZX_USER_SIGNAL_0)); |
|   ASSERT_OK(sync_completion_wait(&handler_ran, ZX_TIME_INFINITE)); |
| } |
| |
| // Verifies that a wait whose event has not been signaled can be cancelled. |
| TEST_F(DispatcherTest, CancelWait) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::event unsignaled_event; |
|   ASSERT_OK(zx::event::create(0, &unsignaled_event)); |
| |
|   async::WaitOnce pending_wait(unsignaled_event.get(), ZX_USER_SIGNAL_0); |
|   // The handler asserts unconditionally, so it must never be invoked. |
|   ASSERT_OK(pending_wait.Begin( |
|       async_dispatcher, |
|       [](async_dispatcher_t* /*dispatcher*/, async::WaitOnce* /*wait*/, zx_status_t /*status*/, |
|          const zx_packet_signal_t* /*signal*/) { ZX_ASSERT(false); })); |
|   ASSERT_OK(pending_wait.Cancel()); |
| } |
| |
| // Tests that one wait can be cancelled from inside another wait's handler while that |
| // handler is itself being called with ZX_ERR_CANCELED. |
| // NOTE(review): the cancellation is presumably triggered by |driver_shutdown| shutting |
| // down the dispatcher owned by |fake_driver| - confirm against fdf_internal::DriverShutdown. |
| TEST_F(DispatcherTest, CancelWaitFromWithinCanceledWait) { |
| auto fake_driver = CreateFakeDriver(); |
| driver_context::PushDriver(fake_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| // The dispatcher's shutdown handler is intentionally a no-op. |
| auto dispatcher = fdf::Dispatcher::Create(0, [](fdf_dispatcher_t* dispatcher) {}); |
| ASSERT_FALSE(dispatcher.is_error()); |
| |
| async_dispatcher_t* async_dispatcher = dispatcher->async_dispatcher(); |
| ASSERT_NOT_NULL(async_dispatcher); |
| |
| zx::event event; |
| ASSERT_OK(zx::event::create(0, &event)); |
| |
| // Both waits observe the same event, which this test never signals. |
| async::WaitOnce wait(event.get(), ZX_USER_SIGNAL_0); |
| async::WaitOnce wait2(event.get(), ZX_USER_SIGNAL_0); |
| |
| ASSERT_OK(wait.Begin(async_dispatcher, [&](async_dispatcher_t* dispatcher, async::WaitOnce* wait, |
| zx_status_t status, const zx_packet_signal_t* signal) { |
| ASSERT_EQ(status, ZX_ERR_CANCELED); |
| // The status returned by Cancel() is not checked here. |
| wait2.Cancel(); |
| })); |
| |
| // We will cancel this wait from wait's handler, so we never expect it to complete. |
| ASSERT_OK(wait2.Begin(async_dispatcher, [](async_dispatcher_t* dispatcher, async::WaitOnce* wait, |
| zx_status_t status, const zx_packet_signal_t* signal) { |
| ZX_ASSERT(false); |
| })); |
| |
| // Begin shutdown for |fake_driver| and wait until its completion callback has run. |
| fdf_internal::DriverShutdown driver_shutdown; |
| libsync::Completion driver_shutdown_completion; |
| ASSERT_OK(driver_shutdown.Begin(fake_driver, [&](const void* driver) { |
| ASSERT_EQ(fake_driver, driver); |
| driver_shutdown_completion.Signal(); |
| })); |
| |
| ASSERT_OK(driver_shutdown_completion.Wait()); |
| } |
| |
| // Verifies that fdf_dispatcher_get_current_dispatcher() returns the owning dispatcher |
| // when called from inside a wait handler. |
| TEST_F(DispatcherTest, GetCurrentDispatcherInWait) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_view = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_view); |
| |
|   zx::event trigger; |
|   ASSERT_OK(zx::event::create(0, &trigger)); |
| |
|   sync_completion_t handler_ran; |
|   async::WaitOnce event_wait(trigger.get(), ZX_USER_SIGNAL_0); |
|   ASSERT_OK(event_wait.Begin( |
|       async_view, |
|       [&handler_ran, &driver_dispatcher](async_dispatcher_t* /*async_dispatcher*/, |
|                                          async::WaitOnce* /*wait*/, zx_status_t status, |
|                                          const zx_packet_signal_t* /*signal*/) { |
|         ASSERT_EQ(fdf_dispatcher_get_current_dispatcher(), driver_dispatcher); |
|         ASSERT_OK(status); |
|         sync_completion_signal(&handler_ran); |
|       })); |
|   ASSERT_OK(trigger.signal(0, ZX_USER_SIGNAL_0)); |
|   ASSERT_OK(sync_completion_wait(&handler_ran, ZX_TIME_INFINITE)); |
| } |
| |
| // Tests that wait handlers registered on a synchronized dispatcher are not run in |
| // parallel. Two waits on two different events take the same pair of locks in opposite |
| // orders, so running both handlers at once could deadlock. |
| TEST_F(DispatcherTest, WaitSynchronized) { |
|   fdf_dispatcher_t* dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &dispatcher)); |
| |
|   // Create a second dispatcher which allows sync calls to force multiple threads. |
|   fdf_dispatcher_t* unused_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", |
|                                            CreateFakeDriver(), &unused_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::event event1, event2; |
|   ASSERT_OK(zx::event::create(0, &event1)); |
|   ASSERT_OK(zx::event::create(0, &event2)); |
| |
|   fbl::Mutex lock1, lock2; |
|   sync_completion_t completion1, completion2; |
| |
|   async::WaitOnce wait1(event1.get(), ZX_USER_SIGNAL_0); |
|   ASSERT_OK(wait1.Begin( |
|       async_dispatcher, |
|       [&completion1, &lock1, &lock2](async_dispatcher_t* dispatcher, async::WaitOnce* wait, |
|                                      zx_status_t status, const zx_packet_signal_t* signal) { |
|         // Take note of the order the locks are acquired here. |
|         { |
|           fbl::AutoLock al1(&lock1); |
|           fbl::AutoLock al2(&lock2); |
|         } |
|         sync_completion_signal(&completion1); |
|       })); |
|   // The second wait observes |event2|. It previously reused |event1|, which left |
|   // |event2| created and signaled without anything waiting on it. |
|   async::WaitOnce wait2(event2.get(), ZX_USER_SIGNAL_0); |
|   ASSERT_OK(wait2.Begin( |
|       async_dispatcher, |
|       [&completion2, &lock1, &lock2](async_dispatcher_t* dispatcher, async::WaitOnce* wait, |
|                                      zx_status_t status, const zx_packet_signal_t* signal) { |
|         // Locks acquired here in opposite order. If these calls are ever made in parallel, then |
|         // we run into a deadlock. The test should hang and eventually timeout in that case. |
|         { |
|           fbl::AutoLock al2(&lock2); |
|           fbl::AutoLock al1(&lock1); |
|         } |
|         sync_completion_signal(&completion2); |
|       })); |
| |
|   // While the order of these signals are serialized, the order in which the signals are observed |
|   // by the waits is not. As a result either of the above waits may trigger first. |
|   ASSERT_OK(event1.signal(0, ZX_USER_SIGNAL_0)); |
|   ASSERT_OK(event2.signal(0, ZX_USER_SIGNAL_0)); |
|   // The order of observing these completions does not matter. |
|   ASSERT_OK(sync_completion_wait(&completion2, ZX_TIME_INFINITE)); |
|   ASSERT_OK(sync_completion_wait(&completion1, ZX_TIME_INFINITE)); |
| } |
| |
| // Verifies that an irq can be bound to a dispatcher and that a callback is received |
| // for each of several consecutive triggers. |
| TEST_F(DispatcherTest, Irq) { |
|   static constexpr uint32_t kNumCallbacks = 10; |
| |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::interrupt virtual_irq; |
|   ASSERT_EQ(ZX_OK, zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &virtual_irq)); |
| |
|   libsync::Completion handler_ran; |
|   async::Irq bound_irq( |
|       virtual_irq.get(), 0, |
|       [&](async_dispatcher_t* /*dispatcher*/, async::Irq* source, zx_status_t status, |
|           const zx_packet_interrupt_t* /*interrupt*/) { |
|         virtual_irq.ack(); |
|         ASSERT_EQ(source, &bound_irq); |
|         ASSERT_EQ(ZX_OK, status); |
|         handler_ran.Signal(); |
|       }); |
|   ASSERT_EQ(ZX_OK, bound_irq.Begin(async_dispatcher)); |
|   // Binding the same irq a second time is rejected. |
|   ASSERT_EQ(ZX_ERR_ALREADY_EXISTS, bound_irq.Begin(async_dispatcher)); |
| |
|   // Trigger the interrupt repeatedly, waiting for the handler after each trigger. |
|   for (uint32_t remaining = kNumCallbacks; remaining > 0; --remaining) { |
|     virtual_irq.trigger(0, zx::time()); |
|     ASSERT_OK(handler_ran.Wait()); |
|     handler_ran.Reset(); |
|   } |
| |
|   // Must unbind irq from dispatcher thread. |
|   libsync::Completion unbound; |
|   ASSERT_OK(async::PostTask(async_dispatcher, [&] { |
|     ASSERT_OK(bound_irq.Cancel()); |
|     // A second cancel finds nothing left to unbind. |
|     ASSERT_EQ(ZX_ERR_NOT_FOUND, bound_irq.Cancel()); |
|     unbound.Signal(); |
|   })); |
|   ASSERT_OK(unbound.Wait()); |
| } |
| |
| // Verifies that no callback is delivered for an interrupt triggered after the irq has |
| // been unbound from the dispatcher. |
| TEST_F(DispatcherTest, UnbindIrq) { |
|   fdf_dispatcher_t* driver_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &driver_dispatcher)); |
| |
|   async_dispatcher_t* async_dispatcher = fdf_dispatcher_get_async_dispatcher(driver_dispatcher); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::interrupt virtual_irq; |
|   ASSERT_EQ(ZX_OK, zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &virtual_irq)); |
| |
|   // The handler fails the test if it is ever invoked. |
|   async::Irq bound_irq( |
|       virtual_irq.get(), 0, |
|       [&](async_dispatcher_t* /*dispatcher*/, async::Irq* /*irq*/, zx_status_t /*status*/, |
|           const zx_packet_interrupt_t* /*interrupt*/) { ASSERT_FALSE(true); }); |
|   ASSERT_EQ(ZX_OK, bound_irq.Begin(async_dispatcher)); |
| |
|   // Must unbind irq from dispatcher thread. |
|   libsync::Completion unbound; |
|   ASSERT_OK(async::PostTask(async_dispatcher, [&] { |
|     ASSERT_OK(bound_irq.Cancel()); |
|     unbound.Signal(); |
|   })); |
|   ASSERT_OK(unbound.Wait()); |
| |
|   // With the irq unbound, triggering the interrupt should not reach the handler. |
|   virtual_irq.trigger(0, zx::time()); |
| } |
| |
| // Verifies that an irq still bound at shutdown has its handler called with |
| // ZX_ERR_CANCELED. |
| TEST_F(DispatcherTest, IrqCancelOnShutdown) { |
|   libsync::Completion shutdown_done; |
|   auto shutdown_handler = [&](fdf_dispatcher_t* /*dispatcher*/) { shutdown_done.Signal(); }; |
| |
|   driver_context::PushDriver(CreateFakeDriver()); |
|   auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto created = fdf::Dispatcher::Create(0, shutdown_handler); |
|   ASSERT_FALSE(created.is_error()); |
| |
|   async_dispatcher_t* async_dispatcher = created->async_dispatcher(); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::interrupt virtual_irq; |
|   ASSERT_EQ(ZX_OK, zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &virtual_irq)); |
| |
|   libsync::Completion irq_canceled; |
|   async::Irq bound_irq( |
|       virtual_irq.get(), 0, |
|       [&](async_dispatcher_t* /*dispatcher*/, async::Irq* /*irq*/, zx_status_t status, |
|           const zx_packet_interrupt_t* /*interrupt*/) { |
|         ASSERT_EQ(ZX_ERR_CANCELED, status); |
|         irq_canceled.Signal(); |
|       }); |
|   ASSERT_EQ(ZX_OK, bound_irq.Begin(async_dispatcher)); |
| |
|   // Shutdown should unbind the irq and invoke its handler with ZX_ERR_CANCELED. |
|   created->ShutdownAsync(); |
|   ASSERT_OK(irq_canceled.Wait()); |
|   ASSERT_OK(shutdown_done.Wait(zx::time::infinite())); |
| } |
| |
| // Tests that we get one cancellation callback per irq that is still bound when shutting down. |
| // The irq is triggered while the dispatcher is blocked by a task, and the dispatcher is |
| // then shut down; the irq handler asserts that it runs only once, with ZX_ERR_CANCELED. |
| TEST_F(DispatcherTest, IrqCancelOnShutdownCallbackOnlyOnce) { |
| auto shutdown_observer = std::make_unique<DispatcherShutdownObserver>(); |
| driver_runtime::Dispatcher* runtime_dispatcher; |
| ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop(0, "", 0, CreateFakeDriver(), &loop_, |
| shutdown_observer->fdf_observer(), |
| &runtime_dispatcher)); |
| fdf_dispatcher_t* fdf_dispatcher = static_cast<fdf_dispatcher_t*>(runtime_dispatcher); |
| |
| async_dispatcher_t* dispatcher = fdf_dispatcher_get_async_dispatcher(fdf_dispatcher); |
| ASSERT_NOT_NULL(dispatcher); |
| |
| // Create a second dispatcher which allows sync calls to force multiple threads. |
| fdf_dispatcher_t* fdf_dispatcher2; |
| ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", |
| CreateFakeDriver(), &fdf_dispatcher2)); |
| |
| zx::interrupt irq_object; |
| ASSERT_EQ(ZX_OK, zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &irq_object)); |
| |
| libsync::Completion irq_completion; |
| async::Irq irq(irq_object.get(), 0, |
| [&](async_dispatcher_t* dispatcher_arg, async::Irq* irq_arg, zx_status_t status, |
| const zx_packet_interrupt_t* interrupt) { |
| ASSERT_FALSE(irq_completion.signaled()); // Make sure it is only called once. |
| ASSERT_EQ(ZX_ERR_CANCELED, status); |
| irq_completion.Signal(); |
| }); |
| ASSERT_EQ(ZX_OK, irq.Begin(dispatcher)); |
| |
| // Block the sync dispatcher thread with a task. |
| libsync::Completion complete_task; |
| ASSERT_OK(async::PostTask(dispatcher, [&] { ASSERT_OK(complete_task.Wait()); })); |
| |
| // Trigger the irq to queue a callback request. |
| irq_object.trigger(0, zx::time()); |
| |
| // Make sure the callback request has already been queued by the second global dispatcher thread, |
| // by queueing a task after the trigger and waiting for the task's completion. |
| libsync::Completion task_complete; |
| ASSERT_OK(async::PostTask(fdf_dispatcher_get_async_dispatcher(fdf_dispatcher2), |
| [&] { task_complete.Signal(); })); |
| ASSERT_OK(task_complete.Wait()); |
| |
| // Unblock the task that has been occupying the first dispatcher. |
| complete_task.Signal(); |
| |
| // This should unbind the irq and call the handler with ZX_ERR_CANCELED. |
| fdf_dispatcher_shutdown_async(fdf_dispatcher); |
| ASSERT_OK(shutdown_observer->WaitUntilShutdown()); |
| ASSERT_OK(irq_completion.Wait()); |
| // This dispatcher was created directly with CreateWithLoop(), so the test destroys it itself. |
| fdf_dispatcher_destroy(fdf_dispatcher); |
| } |
| |
| // Verifies that an irq can still be cancelled after ShutdownAsync() has been called on |
| // its dispatcher, and that its handler is then never invoked. |
| TEST_F(DispatcherTest, UnbindIrqAfterDispatcherShutdown) { |
|   libsync::Completion shutdown_done; |
|   auto shutdown_handler = [&](fdf_dispatcher_t* /*dispatcher*/) { shutdown_done.Signal(); }; |
| |
|   driver_context::PushDriver(CreateFakeDriver()); |
|   auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto created = fdf::Dispatcher::Create(0, shutdown_handler); |
|   ASSERT_FALSE(created.is_error()); |
| |
|   async_dispatcher_t* async_dispatcher = created->async_dispatcher(); |
|   ASSERT_NOT_NULL(async_dispatcher); |
| |
|   zx::interrupt virtual_irq; |
|   ASSERT_EQ(ZX_OK, zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &virtual_irq)); |
| |
|   // The handler fails the test if it is ever invoked. |
|   async::Irq bound_irq( |
|       virtual_irq.get(), 0, |
|       [&](async_dispatcher_t* /*dispatcher*/, async::Irq* /*irq*/, zx_status_t /*status*/, |
|           const zx_packet_interrupt_t* /*interrupt*/) { ASSERT_TRUE(false); }); |
|   ASSERT_EQ(ZX_OK, bound_irq.Begin(async_dispatcher)); |
| |
|   // From a task on the dispatcher, begin shutdown and then unbind the irq. |
|   ASSERT_OK(async::PostTask(async_dispatcher, [&] { |
|     created->ShutdownAsync(); |
|     ASSERT_OK(bound_irq.Cancel()); |
|   })); |
| |
|   ASSERT_OK(shutdown_done.Wait(zx::time::infinite())); |
| } |
| |
| // Checks that a SYNCHRONIZED dispatcher does not deliver irq callbacks in parallel. |
| TEST_F(DispatcherTest, IrqSynchronized) { |
|   // The dispatcher that both irqs get bound to. |
|   fdf_dispatcher_t* irq_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &irq_dispatcher)); |
| |
|   async_dispatcher_t* irq_async = fdf_dispatcher_get_async_dispatcher(irq_dispatcher); |
|   ASSERT_NOT_NULL(irq_async); |
| |
|   // A second dispatcher which allows sync calls, created to force multiple threads. |
|   fdf_dispatcher_t* helper_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, |
|                                            "scheduler_role", CreateFakeDriver(), |
|                                            &helper_dispatcher)); |
| |
|   zx::interrupt first_interrupt; |
|   ASSERT_EQ(ZX_OK, |
|             zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &first_interrupt)); |
|   zx::interrupt second_interrupt; |
|   ASSERT_EQ(ZX_OK, |
|             zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &second_interrupt)); |
| |
|   // Both irqs are bound to one dispatcher and both are triggered. Each handler blocks until a |
|   // task posted to the helper dispatcher has run. If the two handlers ran in parallel, that |
|   // task would not be able to run, and the test would hang. |
|   libsync::Completion helper_task_ran; |
|   libsync::Completion first_handled; |
|   libsync::Completion second_handled; |
| |
|   async::Irq first_irq(first_interrupt.get(), 0, |
|                        [&](async_dispatcher_t* unused_dispatcher, async::Irq* fired, |
|                            zx_status_t status, const zx_packet_interrupt_t* unused_packet) { |
|                          ASSERT_OK(helper_task_ran.Wait()); |
|                          first_interrupt.ack(); |
|                          ASSERT_EQ(fired, &first_irq); |
|                          ASSERT_EQ(ZX_OK, status); |
|                          first_handled.Signal(); |
|                        }); |
|   ASSERT_EQ(ZX_OK, first_irq.Begin(irq_async)); |
| |
|   async::Irq second_irq(second_interrupt.get(), 0, |
|                         [&](async_dispatcher_t* unused_dispatcher, async::Irq* fired, |
|                             zx_status_t status, const zx_packet_interrupt_t* unused_packet) { |
|                           ASSERT_OK(helper_task_ran.Wait()); |
|                           second_interrupt.ack(); |
|                           ASSERT_EQ(fired, &second_irq); |
|                           ASSERT_EQ(ZX_OK, status); |
|                           second_handled.Signal(); |
|                         }); |
|   ASSERT_EQ(ZX_OK, second_irq.Begin(irq_async)); |
| |
|   // The triggers themselves are issued in order, but the order in which the async_irqs observe |
|   // them is not fixed, so either handler may run first. If the irqs are not synchronized, both |
|   // irq handlers will run and block. |
|   first_interrupt.trigger(0, zx::time()); |
|   second_interrupt.trigger(0, zx::time()); |
| |
|   // Unblock the irq handler. |
|   ASSERT_OK(async::PostTask(fdf_dispatcher_get_async_dispatcher(helper_dispatcher), |
|                             [&] { helper_task_ran.Signal(); })); |
|   ASSERT_OK(helper_task_ran.Wait()); |
| |
|   // The order of observing these completions does not matter. |
|   ASSERT_OK(second_handled.Wait()); |
|   ASSERT_OK(first_handled.Wait()); |
| |
|   // Must unbind irqs from dispatcher thread. |
|   libsync::Completion irqs_cancelled; |
|   ASSERT_OK(async::PostTask(irq_async, [&] { |
|     ASSERT_OK(first_irq.Cancel()); |
|     ASSERT_OK(second_irq.Cancel()); |
|     irqs_cancelled.Signal(); |
|   })); |
|   ASSERT_OK(irqs_cancelled.Wait()); |
| } |
| |
| // Checks that cancelling an irq whose interrupt was already triggered drops the pending irq |
| // packet, so the irq handler never runs. |
| TEST_F(DispatcherTest, UnbindIrqRemovesPacketFromPort) { |
|   // Signalled from the handler handed to |fdf::Dispatcher::Create|. |
|   libsync::Completion handler_called; |
|   auto on_handler = [&](fdf_dispatcher_t* unused) { handler_called.Signal(); }; |
| |
|   driver_context::PushDriver(CreateFakeDriver()); |
|   auto restore_context = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto owned_dispatcher = fdf::Dispatcher::Create(0, on_handler); |
|   ASSERT_FALSE(owned_dispatcher.is_error()); |
| |
|   async_dispatcher_t* async_disp = owned_dispatcher->async_dispatcher(); |
|   ASSERT_NOT_NULL(async_disp); |
| |
|   zx::interrupt interrupt_object; |
|   ASSERT_EQ(ZX_OK, |
|             zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &interrupt_object)); |
| |
|   // Fails the test if the irq is ever delivered. |
|   async::Irq bound_irq(interrupt_object.get(), 0, |
|                        [&](async_dispatcher_t* unused_dispatcher, async::Irq* unused_irq, |
|                            zx_status_t unused_status, |
|                            const zx_packet_interrupt_t* unused_packet) { ASSERT_TRUE(false); }); |
|   ASSERT_EQ(ZX_OK, bound_irq.Begin(async_disp)); |
| |
|   libsync::Completion cancel_done; |
|   ASSERT_OK(async::PostTask(async_disp, [&] { |
|     // The irq handler should not be called yet since the dispatcher thread is blocked. |
|     interrupt_object.trigger(0, zx::time()); |
|     // This should remove the pending irq packet from the port. |
|     ASSERT_OK(bound_irq.Cancel()); |
|     cancel_done.Signal(); |
|   })); |
|   ASSERT_OK(cancel_done.Wait()); |
| |
|   owned_dispatcher->ShutdownAsync(); |
|   ASSERT_OK(handler_called.Wait(zx::time::infinite())); |
| } |
| |
| // Checks that cancelling an irq also cancels an irq callback request that was queued while |
| // the dispatcher was busy running a task. |
| TEST_F(DispatcherTest, UnbindIrqRemovesQueuedIrqs) { |
|   // The dispatcher that the irq gets bound to. |
|   fdf_dispatcher_t* irq_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &irq_dispatcher)); |
| |
|   async_dispatcher_t* irq_async = fdf_dispatcher_get_async_dispatcher(irq_dispatcher); |
|   ASSERT_NOT_NULL(irq_async); |
| |
|   // A second dispatcher which allows sync calls, created to force multiple threads. |
|   fdf_dispatcher_t* helper_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, |
|                                            "scheduler_role", CreateFakeDriver(), |
|                                            &helper_dispatcher)); |
| |
|   zx::interrupt interrupt_object; |
|   ASSERT_EQ(ZX_OK, |
|             zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &interrupt_object)); |
| |
|   // Fails the test if the irq is ever delivered. |
|   async::Irq bound_irq(interrupt_object.get(), 0, |
|                        [&](async_dispatcher_t* unused_dispatcher, async::Irq* unused_irq, |
|                            zx_status_t unused_status, |
|                            const zx_packet_interrupt_t* unused_packet) { ASSERT_FALSE(true); }); |
|   ASSERT_EQ(ZX_OK, bound_irq.Begin(irq_async)); |
| |
|   // Block the dispatcher thread. |
|   libsync::Completion blocker_running; |
|   libsync::Completion release_blocker; |
|   libsync::Completion blocker_finished; |
|   ASSERT_OK(async::PostTask(irq_async, [&] { |
|     blocker_running.Signal(); |
|     // The irq is cancelled only once the test has confirmed that the irq |OnSignal| happened. |
|     ASSERT_OK(release_blocker.Wait()); |
|     ASSERT_OK(bound_irq.Cancel()); |
|     blocker_finished.Signal(); |
|   })); |
|   ASSERT_OK(blocker_running.Wait()); |
| |
|   interrupt_object.trigger(0, zx::time()); |
| |
|   // Make sure the irq |OnSignal| has happened on the other |process_shared_dispatcher| thread. |
|   // Since there are only 2 threads, and 1 is blocked by the task, the other must have already |
|   // processed the irq. |
|   libsync::Completion helper_task_ran; |
|   ASSERT_OK(async::PostTask(fdf_dispatcher_get_async_dispatcher(helper_dispatcher), |
|                             [&] { helper_task_ran.Signal(); })); |
|   ASSERT_OK(helper_task_ran.Wait()); |
| |
|   release_blocker.Signal(); |
|   ASSERT_OK(blocker_finished.Wait()); |
| |
|   // The task unbound the irq, so any queued irq callback request should be cancelled. |
|   // If not, the irq handler will be called and assert. |
| } |
| |
| // Declares the runtime's accessor for its |DispatcherCoordinator| so that tests in this file |
| // can call it. |
| namespace driver_runtime { |
| DispatcherCoordinator& GetDispatcherCoordinator(); |
| }  // namespace driver_runtime |
| |
| // Exercises the potential race in which an irq is unbound just as the irq packet has been |
| // read from the port. |
| TEST_F(DispatcherTest, UnbindIrqImmediatelyAfterTriggering) { |
|   static constexpr uint32_t kNumIrqs = 3000; |
|   static constexpr uint32_t kNumThreads = 10; |
| |
|   // TODO(fxbug.dev/102878): this can be replaced by |fdf_internal::DriverShutdown| once it works |
|   // properly. |
|   libsync::Completion all_handlers_called; |
|   std::atomic_int handler_calls = 0; |
|   auto count_handler = [&](fdf_dispatcher_t* unused) { |
|     // |fetch_add| returns the value before incrementing, so the |kNumThreads|-th call is the |
|     // one that observes |kNumThreads - 1|. |
|     const bool is_last_call = handler_calls.fetch_add(1) == kNumThreads - 1; |
|     if (is_last_call) { |
|       all_handlers_called.Signal(); |
|     } |
|   }; |
| |
|   auto driver = CreateFakeDriver(); |
|   driver_context::PushDriver(driver); |
|   auto restore_context = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto irq_dispatcher = fdf::Dispatcher::Create(0, count_handler); |
|   ASSERT_FALSE(irq_dispatcher.is_error()); |
| |
|   async_dispatcher_t* irq_async = irq_dispatcher->async_dispatcher(); |
|   ASSERT_NOT_NULL(irq_async); |
| |
|   // Create a bunch of blocking dispatchers to force new threads. |
|   fdf::Dispatcher extra_dispatchers[kNumThreads - 1]; |
|   for (uint32_t i = 0; i < kNumThreads - 1; i++) { |
|     auto blocking = fdf::Dispatcher::Create(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, count_handler); |
|     ASSERT_FALSE(blocking.is_error()); |
|     extra_dispatchers[i] = *std::move(blocking); |
|   } |
| |
|   // Create and unbind a bunch of irqs. |
|   zx::interrupt interrupts[kNumIrqs] = {}; |
|   for (uint32_t i = 0; i < kNumIrqs; i++) { |
|     // Must unbind irq from dispatcher thread. |
|     libsync::Completion round_done; |
|     ASSERT_OK(async::PostTask(irq_async, [&] { |
|       ASSERT_EQ(ZX_OK, zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, |
|                                              &interrupts[i])); |
| |
|       // Fails the test if the irq is ever delivered. |
|       async::Irq bound_irq( |
|           interrupts[i].get(), 0, |
|           [&](async_dispatcher_t* unused_dispatcher, async::Irq* unused_irq, |
|               zx_status_t unused_status, const zx_packet_interrupt_t* unused_packet) { |
|             ASSERT_FALSE(true); |
|           }); |
|       ASSERT_EQ(ZX_OK, bound_irq.Begin(irq_async)); |
|       // This queues the irq packet on the port, which may be read by another thread. |
|       interrupts[i].trigger(0, zx::time()); |
|       ASSERT_OK(bound_irq.Cancel()); |
|       round_done.Signal(); |
|     })); |
|     ASSERT_OK(round_done.Wait()); |
|   } |
| |
|   // Shut everything down and wait until the handler has been called |kNumThreads| times. |
|   irq_dispatcher->ShutdownAsync(); |
|   for (uint32_t i = 0; i < kNumThreads - 1; i++) { |
|     extra_dispatchers[i].ShutdownAsync(); |
|   } |
|   ASSERT_OK(all_handlers_called.Wait()); |
| |
|   irq_dispatcher->reset(); |
|   for (uint32_t i = 0; i < kNumThreads - 1; i++) { |
|     extra_dispatchers[i].reset(); |
|   } |
| |
|   // Reset the number of threads to 1. |
|   driver_runtime::GetDispatcherCoordinator().Reset(); |
| } |
| |
| // Checks that binding an irq to an UNSYNCHRONIZED dispatcher is rejected with |
| // ZX_ERR_NOT_SUPPORTED. |
| TEST_F(DispatcherTest, IrqUnsynchronized) { |
|   fdf_dispatcher_t* unsync_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, |
|                                            "scheduler_role", CreateFakeDriver(), |
|                                            &unsync_dispatcher)); |
| |
|   async_dispatcher_t* unsync_async = fdf_dispatcher_get_async_dispatcher(unsync_dispatcher); |
|   ASSERT_NOT_NULL(unsync_async); |
| |
|   zx::interrupt interrupt_object; |
|   ASSERT_EQ(ZX_OK, |
|             zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &interrupt_object)); |
| |
|   // Fails the test if the handler is ever invoked. |
|   async::Irq rejected_irq(interrupt_object.get(), 0, |
|                           [&](async_dispatcher_t* unused_dispatcher, async::Irq* unused_irq, |
|                               zx_status_t unused_status, |
|                               const zx_packet_interrupt_t* unused_packet) { ASSERT_TRUE(false); }); |
|   ASSERT_EQ(ZX_ERR_NOT_SUPPORTED, rejected_irq.Begin(unsync_async)); |
| } |
| |
| // Irq handler that fails the current test unless it is invoked with ZX_ERR_CANCELED. |
| void IrqNotCalledHandler(async_dispatcher_t* async, async_irq_t* irq, zx_status_t status, |
|                          const zx_packet_interrupt_t* packet) { |
|   // ASSERT_EQ reports the unexpected status value on failure, which ASSERT_TRUE on a |
|   // comparison would not. |
|   ASSERT_EQ(ZX_ERR_CANCELED, status); |
| } |
| |
| // Checks that an irq cannot be unbound from a dispatcher other than the one it was bound |
| // to: the attempt returns ZX_ERR_BAD_STATE, while unbinding from the owner succeeds. |
| TEST_F(DispatcherTest, UnbindIrqFromWrongDispatcher) { |
|   // The dispatcher the irq is bound to. |
|   fdf_dispatcher_t* owner_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &owner_dispatcher)); |
| |
|   async_dispatcher_t* owner_async = fdf_dispatcher_get_async_dispatcher(owner_dispatcher); |
|   ASSERT_NOT_NULL(owner_async); |
| |
|   // An unrelated dispatcher from which the first unbind attempt is made. |
|   fdf_dispatcher_t* other_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &other_dispatcher)); |
| |
|   async_dispatcher_t* other_async = fdf_dispatcher_get_async_dispatcher(other_dispatcher); |
|   ASSERT_NOT_NULL(other_async); |
| |
|   zx::interrupt interrupt_object; |
|   ASSERT_EQ(ZX_OK, |
|             zx::interrupt::create(zx::resource(0), 0, ZX_INTERRUPT_VIRTUAL, &interrupt_object)); |
| |
|   // Use the C API, as the C++ async::Irq will clear the dispatcher on the first call to Cancel. |
|   async_irq_t raw_irq = {{ASYNC_STATE_INIT}, &IrqNotCalledHandler, interrupt_object.get()}; |
| |
|   ASSERT_OK(async_bind_irq(owner_async, &raw_irq)); |
| |
|   libsync::Completion attempt_done; |
|   ASSERT_OK(async::PostTask(other_async, [&] { |
|     // Running on the other dispatcher, try to unbind the irq from the one it was bound to. |
|     ASSERT_EQ(ZX_ERR_BAD_STATE, async_unbind_irq(owner_async, &raw_irq)); |
|     attempt_done.Signal(); |
|   })); |
|   ASSERT_OK(attempt_done.Wait()); |
| |
|   // Repeat the unbind from a task on the owning dispatcher, where it must succeed. |
|   attempt_done.Reset(); |
|   ASSERT_OK(async::PostTask(owner_async, [&] { |
|     ASSERT_EQ(ZX_OK, async_unbind_irq(owner_async, &raw_irq)); |
|     attempt_done.Signal(); |
|   })); |
|   ASSERT_OK(attempt_done.Wait()); |
| } |
| |
| // |
| // WaitUntilIdle tests |
| // |
| |
| // Checks that a freshly created dispatcher reports idle both before and after |
| // |fdf_internal_wait_until_dispatcher_idle| is called on it. |
| TEST_F(DispatcherTest, WaitUntilIdle) { |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   ASSERT_TRUE(target->IsIdle()); |
|   fdf_internal_wait_until_dispatcher_idle(target); |
|   ASSERT_TRUE(target->IsIdle()); |
| } |
| |
| // Checks |fdf_internal_wait_until_dispatcher_idle| against a read callback that is run |
| // directly by a non-reentrant channel write and is held blocked by the test. |
| TEST_F(DispatcherTest, WaitUntilIdleWithDirectCall) { |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   // We shouldn't actually block on a dispatcher that doesn't have ALLOW_SYNC_CALLS set, |
|   // but this is just for synchronizing the test. |
|   libsync::Completion read_entered; |
|   libsync::Completion release_read; |
|   ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(local_ch_, target, &read_entered, &release_read)); |
| |
|   // NOTE(review): if an assertion below returns early, |writer| and |idle_waiter| are |
|   // destroyed without having been joined - confirm whether that is acceptable here. |
|   std::thread writer([&] { |
|     // Make the call not reentrant, so that the read will run immediately once the write happens. |
|     driver_context::PushDriver(CreateFakeDriver()); |
|     auto restore_context = fit::defer([]() { driver_context::PopDriver(); }); |
|     ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   }); |
| |
|   // Wait for the read callback to be called, it will block until we signal it to complete. |
|   ASSERT_OK(read_entered.Wait(zx::time::infinite())); |
| |
|   ASSERT_FALSE(target->IsIdle()); |
| |
|   // Start a thread that blocks until the dispatcher is idle. |
|   libsync::Completion waiter_started; |
|   libsync::Completion waiter_finished; |
|   std::thread idle_waiter([&] { |
|     waiter_started.Signal(); |
|     fdf_internal_wait_until_dispatcher_idle(target); |
|     ASSERT_TRUE(target->IsIdle()); |
|     waiter_finished.Signal(); |
|   }); |
| |
|   // The read callback is still blocked, so the waiter must not have finished yet. |
|   ASSERT_OK(waiter_started.Wait(zx::time::infinite())); |
|   ASSERT_FALSE(waiter_finished.signaled()); |
|   ASSERT_FALSE(target->IsIdle()); |
| |
|   release_read.Signal(); |
| |
|   // Dispatcher should be idle now. |
|   ASSERT_OK(waiter_finished.Wait(zx::time::infinite())); |
| |
|   writer.join(); |
|   idle_waiter.join(); |
| } |
| |
| // Checks |fdf_internal_wait_until_dispatcher_idle| against a read callback that was queued |
| // onto the async loop by a reentrant channel write. |
| TEST_F(DispatcherTest, WaitUntilIdleWithAsyncLoop) { |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   // We shouldn't actually block on a dispatcher that doesn't have ALLOW_SYNC_CALLS set, |
|   // but this is just for synchronizing the test. |
|   libsync::Completion read_entered; |
|   libsync::Completion release_read; |
|   ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(local_ch_, target, &read_entered, &release_read)); |
| |
|   // Call is reentrant, so the read will be queued on the async loop. |
|   ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   ASSERT_FALSE(target->IsIdle()); |
| |
|   // Wait for the read callback to be called, it will block until we signal it to complete. |
|   ASSERT_OK(read_entered.Wait(zx::time::infinite())); |
| |
|   ASSERT_FALSE(target->IsIdle()); |
| |
|   // Let the callback finish; afterwards the dispatcher must report idle. |
|   release_read.Signal(); |
|   fdf_internal_wait_until_dispatcher_idle(target); |
|   ASSERT_TRUE(target->IsIdle()); |
| } |
| |
| // Checks that |fdf_internal_wait_until_dispatcher_idle| returns when the only queued read |
| // was cancelled before any loop thread ran it. |
| TEST_F(DispatcherTest, WaitUntilIdleCanceledRead) { |
|   // Stop the loop's threads; one is started again only after the read has been cancelled. |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       local_ch_, 0, |
|       [&](fdf_dispatcher_t* unused_dispatcher, fdf::ChannelRead* unused_read, |
|           fdf_status_t unused_status) { |
|         ASSERT_FALSE(true);  // This callback should never be called. |
|       }); |
|   ASSERT_OK(pending_read->Begin(target)); |
| |
|   // Call is reentrant, so the read will be queued on the async loop. |
|   ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   ASSERT_FALSE(target->IsIdle()); |
| |
|   ASSERT_OK(pending_read->Cancel()); |
| |
|   loop_.StartThread(); |
| |
|   fdf_internal_wait_until_dispatcher_idle(target); |
| } |
| |
| // Checks that a dispatcher with only a pending, never-signalled wait still reports idle, and |
| // that |fdf_internal_wait_until_dispatcher_idle| returns for it. |
| TEST_F(DispatcherTest, WaitUntilIdlePendingWait) { |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   async_dispatcher_t* target_async = fdf_dispatcher_get_async_dispatcher(target); |
|   ASSERT_NOT_NULL(target_async); |
| |
|   zx::event never_signalled; |
|   ASSERT_OK(zx::event::create(0, &never_signalled)); |
| |
|   // ZX_USER_SIGNAL_0 is never raised on the event in this test, so the handler must not run. |
|   async::WaitOnce pending_wait(never_signalled.get(), ZX_USER_SIGNAL_0); |
|   ASSERT_OK(pending_wait.Begin( |
|       target_async, |
|       [](async_dispatcher_t* unused_dispatcher, async::WaitOnce* unused_wait, |
|          zx_status_t unused_status, const zx_packet_signal_t* unused_signal) { |
|         ASSERT_FALSE(true); |
|       })); |
|   ASSERT_TRUE(target->IsIdle()); |
|   fdf_internal_wait_until_dispatcher_idle(target); |
| } |
| |
| // Checks that a task posted for a deadline 100 seconds away does not keep the dispatcher |
| // from reporting idle, and can still be cancelled afterwards. |
| TEST_F(DispatcherTest, WaitUntilIdleDelayedTask) { |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   async_dispatcher_t* target_async = fdf_dispatcher_get_async_dispatcher(target); |
|   ASSERT_NOT_NULL(target_async); |
| |
|   // The handler fails the test if the delayed task ever runs. |
|   async::TaskClosure delayed_task; |
|   delayed_task.set_handler([] { ASSERT_FALSE(true); }); |
|   ASSERT_OK(delayed_task.PostForTime(target_async, zx::deadline_after(zx::sec(100)))); |
| |
|   ASSERT_TRUE(target->IsIdle()); |
|   fdf_internal_wait_until_dispatcher_idle(target); |
| |
|   ASSERT_OK(delayed_task.Cancel());  // Task should not be running yet. |
| } |
| |
| // Checks |fdf_internal_wait_until_dispatcher_idle| on an UNSYNCHRONIZED dispatcher that has |
| // many blocking read callbacks queued while the loop runs |kNumThreads| threads. |
| TEST_F(DispatcherTest, WaitUntilIdleWithAsyncLoopMultipleThreads) { |
|   // Stop the loop's threads; |kNumThreads| are started again once all the reads are queued. |
|   loop_.Quit(); |
|   loop_.JoinThreads(); |
|   loop_.ResetQuit(); |
| |
|   constexpr uint32_t kNumThreads = 2; |
|   constexpr uint32_t kNumClients = 22; |
| |
|   fdf_dispatcher_t* dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, "scheduler_role", |
|                                            CreateFakeDriver(), &dispatcher)); |
| |
|   // One reading end of a channel pair plus the completions used to observe and release the |
|   // read callback registered on it. |
|   struct ReadClient { |
|     fdf::Channel channel; |
|     libsync::Completion entered_callback; |
|     libsync::Completion complete_blocking_read; |
|   }; |
| |
|   std::vector<ReadClient> local(kNumClients); |
|   std::vector<fdf::Channel> remote(kNumClients); |
| |
|   for (uint32_t i = 0; i < kNumClients; i++) { |
|     auto channels = fdf::ChannelPair::Create(0); |
|     ASSERT_OK(channels.status_value()); |
|     local[i].channel = std::move(channels->end0); |
|     remote[i] = std::move(channels->end1); |
|     ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(local[i].channel.get(), dispatcher, |
|                                                    &local[i].entered_callback, |
|                                                    &local[i].complete_blocking_read)); |
|   } |
| |
|   fdf::Arena arena; |
|   for (uint32_t i = 0; i < kNumClients; i++) { |
|     // Call is considered reentrant and will be queued on the async loop. |
|     auto write_status = remote[i].Write(0, arena, nullptr, 0, cpp20::span<zx_handle_t>()); |
|     ASSERT_OK(write_status.status_value()); |
|   } |
| |
|   for (uint32_t i = 0; i < kNumThreads; i++) { |
|     loop_.StartThread(); |
|   } |
| |
|   ASSERT_OK(local[0].entered_callback.Wait(zx::time::infinite())); |
|   local[0].complete_blocking_read.Signal(); |
| |
|   // The remaining read callbacks have not been released yet. |
|   ASSERT_FALSE(dispatcher->IsIdle()); |
| |
|   // Allow all the read callbacks to complete. |
|   for (uint32_t i = 1; i < kNumClients; i++) { |
|     local[i].complete_blocking_read.Signal(); |
|   } |
| |
|   fdf_internal_wait_until_dispatcher_idle(dispatcher); |
| |
|   // Every read callback must have run by the time the dispatcher is idle. Checking |
|   // |complete_blocking_read| here would prove nothing, as this test signals it itself. |
|   for (uint32_t i = 0; i < kNumClients; i++) { |
|     ASSERT_TRUE(local[i].entered_callback.signaled()); |
|   } |
| } |
| |
| // Checks that waiting for one dispatcher to become idle is not held up by a different |
| // dispatcher that is still busy running a read callback. |
| TEST_F(DispatcherTest, WaitUntilIdleMultipleDispatchers) { |
|   fdf_dispatcher_t* busy_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &busy_dispatcher)); |
| |
|   fdf_dispatcher_t* idle_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &idle_dispatcher)); |
| |
|   // We shouldn't actually block on a dispatcher that doesn't have ALLOW_SYNC_CALLS set, |
|   // but this is just for synchronizing the test. |
|   libsync::Completion read_entered; |
|   libsync::Completion release_read; |
|   ASSERT_NO_FATAL_FAILURE( |
|       RegisterAsyncReadBlock(local_ch_, busy_dispatcher, &read_entered, &release_read)); |
| |
|   // Call is reentrant, so the read will be queued on the async loop. |
|   ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   ASSERT_FALSE(busy_dispatcher->IsIdle()); |
| |
|   // Wait for the read callback to be called, it will block until we signal it to complete. |
|   ASSERT_OK(read_entered.Wait(zx::time::infinite())); |
| |
|   // Only the dispatcher running the callback is busy; the other one can be waited on. |
|   ASSERT_FALSE(busy_dispatcher->IsIdle()); |
|   ASSERT_TRUE(idle_dispatcher->IsIdle()); |
|   fdf_internal_wait_until_dispatcher_idle(idle_dispatcher); |
| |
|   release_read.Signal(); |
|   fdf_internal_wait_until_dispatcher_idle(busy_dispatcher); |
|   ASSERT_TRUE(busy_dispatcher->IsIdle()); |
| } |
| |
| // Exercises shutting down the process async loop while requests are still pending. |
| TEST_F(DispatcherTest, ShutdownProcessAsyncLoop) { |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   const void* owner = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, |
|             driver_runtime::Dispatcher::CreateWithLoop( |
|                 FDF_DISPATCHER_OPTION_UNSYNCHRONIZED, role, strlen(role), owner, &loop_, |
|                 shutdown_observer.fdf_observer(), &runtime_dispatcher)); |
| |
|   libsync::Completion read_entered; |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       local_ch_, 0, |
|       [&](fdf_dispatcher_t* unused_dispatcher, fdf::ChannelRead* unused_read, |
|           fdf_status_t unused_status) { |
|         read_entered.Signal(); |
|         // Do not let the read callback complete until the loop has entered a shutdown state. |
|         while (loop_.GetState() != ASYNC_LOOP_SHUTDOWN) { |
|         } |
|       }); |
|   ASSERT_OK(pending_read->Begin(static_cast<fdf_dispatcher_t*>(runtime_dispatcher))); |
| |
|   // Call is reentrant, so the read will be queued on the async loop. |
|   ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   // This will queue the wait to run |Dispatcher::CompleteShutdown|. |
|   runtime_dispatcher->ShutdownAsync(); |
| |
|   ASSERT_OK(read_entered.Wait(zx::time::infinite())); |
| |
|   // Shut the loop down while the read callback is still spinning inside it. |
|   loop_.Shutdown(); |
| |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
|   runtime_dispatcher->Destroy(); |
| } |
| |
| // Checks that a task running on a dispatcher created with no options can cancel a pending |
| // channel read after |ShutdownAsync| has been called on that dispatcher. |
| TEST_F(DispatcherTest, SyncDispatcherCancelRequestDuringShutdown) { |
|   DispatcherShutdownObserver shutdown_observer; |
| |
|   const void* owner = CreateFakeDriver(); |
|   const char* role = "scheduler_role"; |
| |
|   driver_runtime::Dispatcher* runtime_dispatcher; |
|   ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop( |
|                        0, role, strlen(role), owner, &loop_, shutdown_observer.fdf_observer(), |
|                        &runtime_dispatcher)); |
| |
|   // Register a channel read that will be canceled by a posted task. |
|   auto pending_read = std::make_unique<fdf::ChannelRead>( |
|       local_ch_, 0, |
|       [&](fdf_dispatcher_t* unused_dispatcher, fdf::ChannelRead* unused_read, |
|           fdf_status_t unused_status) { |
|         ASSERT_FALSE(true);  // This should never be called. |
|       }); |
|   ASSERT_OK(pending_read->Begin(static_cast<fdf_dispatcher_t*>(runtime_dispatcher))); |
| |
|   libsync::Completion canceller_running; |
|   libsync::Completion shutdown_requested; |
| |
|   // The task holds off its cancel until the test has called |ShutdownAsync|. |
|   ASSERT_OK(async::PostTask(runtime_dispatcher->GetAsyncDispatcher(), [&] { |
|     canceller_running.Signal(); |
|     ASSERT_OK(shutdown_requested.Wait(zx::time::infinite())); |
|     ASSERT_OK(pending_read->Cancel()); |
|   })); |
| |
|   ASSERT_OK(canceller_running.Wait(zx::time::infinite())); |
| |
|   // |Dispatcher::ShutdownAsync| will move the registered channel read into |shutdown_queue_|. |
|   runtime_dispatcher->ShutdownAsync(); |
|   shutdown_requested.Signal(); |
| |
|   ASSERT_OK(shutdown_observer.WaitUntilShutdown()); |
|   runtime_dispatcher->Destroy(); |
| } |
| |
| // |
| // Misc tests |
| // |
| |
| // Checks that |fdf_dispatcher_get_current_dispatcher| returns null when called straight |
| // from the test body. |
| TEST_F(DispatcherTest, GetCurrentDispatcherNone) { |
|   fdf_dispatcher_t* const current = fdf_dispatcher_get_current_dispatcher(); |
|   ASSERT_NULL(current); |
| } |
| |
| // Checks that |fdf_dispatcher_get_current_dispatcher| returns the dispatcher whose callback |
| // or task is currently running, using two drivers that exchange channel messages. |
| TEST_F(DispatcherTest, GetCurrentDispatcher) { |
|   const void* first_driver = CreateFakeDriver(); |
|   fdf_dispatcher_t* first_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", first_driver, &first_dispatcher)); |
| |
|   const void* second_driver = CreateFakeDriver(); |
|   fdf_dispatcher_t* second_dispatcher; |
|   ASSERT_NO_FATAL_FAILURE( |
|       CreateDispatcher(0, "scheduler_role", second_driver, &second_dispatcher)); |
| |
|   // The first driver will wait on a message from the second driver, then reply back. |
|   auto request_read = std::make_unique<fdf::ChannelRead>( |
|       local_ch_, 0, |
|       [&](fdf_dispatcher_t* unused_dispatcher, fdf::ChannelRead* unused_read, |
|           fdf_status_t status) { |
|         ASSERT_OK(status); |
|         ASSERT_EQ(first_dispatcher, fdf_dispatcher_get_current_dispatcher()); |
|         // This reply will be reentrant and queued on the async loop. |
|         ASSERT_EQ(ZX_OK, fdf_channel_write(local_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|       }); |
|   ASSERT_OK(request_read->Begin(first_dispatcher)); |
| |
|   libsync::Completion reply_received; |
|   auto reply_read = std::make_unique<fdf::ChannelRead>( |
|       remote_ch_, 0, |
|       [&](fdf_dispatcher_t* unused_dispatcher, fdf::ChannelRead* unused_read, |
|           fdf_status_t status) { |
|         ASSERT_OK(status); |
|         ASSERT_EQ(second_dispatcher, fdf_dispatcher_get_current_dispatcher()); |
|         reply_received.Signal(); |
|       }); |
|   ASSERT_OK(reply_read->Begin(second_dispatcher)); |
| |
|   // Write from the second driver to the first driver. |
|   ASSERT_OK(async::PostTask(fdf_dispatcher_get_async_dispatcher(second_dispatcher), [&] { |
|     ASSERT_EQ(second_dispatcher, fdf_dispatcher_get_current_dispatcher()); |
|     // Non-reentrant write. |
|     ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   })); |
| |
|   ASSERT_OK(reply_received.Wait(zx::time::infinite())); |
|   fdf_internal_wait_until_dispatcher_idle(second_dispatcher); |
| } |
| |
| // Checks |HasQueuedTasks| before, while and after a posted task waits behind a read callback |
| // that the test keeps blocked. |
| TEST_F(DispatcherTest, HasQueuedTasks) { |
|   fdf_dispatcher_t* target; |
|   ASSERT_NO_FATAL_FAILURE(CreateDispatcher(0, "scheduler_role", CreateFakeDriver(), &target)); |
| |
|   ASSERT_FALSE(target->HasQueuedTasks()); |
| |
|   // We shouldn't actually block on a dispatcher that doesn't have ALLOW_SYNC_CALLS set, |
|   // but this is just for synchronizing the test. |
|   libsync::Completion read_entered; |
|   libsync::Completion release_read; |
|   ASSERT_NO_FATAL_FAILURE(RegisterAsyncReadBlock(local_ch_, target, &read_entered, &release_read)); |
| |
|   // Call is reentrant, so the read will be queued on the async loop. |
|   ASSERT_EQ(ZX_OK, fdf_channel_write(remote_ch_, 0, nullptr, nullptr, 0, nullptr, 0)); |
|   ASSERT_FALSE(target->IsIdle()); |
| |
|   // Wait for the read callback to be called, it will block until we signal it to complete. |
|   ASSERT_OK(read_entered.Wait(zx::time::infinite())); |
| |
|   // Post a task while the read callback is still blocked; it has to be reported as queued. |
|   libsync::Completion task_ran; |
|   ASSERT_OK(async::PostTask(target, [&] { task_ran.Signal(); })); |
|   ASSERT_TRUE(target->HasQueuedTasks()); |
| |
|   release_read.Signal(); |
| |
|   ASSERT_OK(task_ran.Wait()); |
|   ASSERT_FALSE(target->HasQueuedTasks()); |
| |
|   fdf_internal_wait_until_dispatcher_idle(target); |
|   ASSERT_FALSE(target->HasQueuedTasks()); |
| } |
| |
| // Tests shutting down all the dispatchers owned by a driver: shutting a driver down must |
| // shut down every dispatcher created for it, and only those. |
| TEST_F(DispatcherTest, ShutdownAllDriverDispatchers) { |
|   const void* fake_driver = CreateFakeDriver(); |
|   const void* fake_driver2 = CreateFakeDriver(); |
|   auto scheduler_role = "scheduler_role"; |
| |
|   constexpr uint32_t kNumDispatchers = 3; |
|   DispatcherShutdownObserver observers[kNumDispatchers]; |
|   driver_runtime::Dispatcher* dispatchers[kNumDispatchers]; |
| |
|   // dispatchers[0] is owned by |fake_driver|, the remaining ones by |fake_driver2|. |
|   for (uint32_t i = 0; i < kNumDispatchers; i++) { |
|     const void* driver = i == 0 ? fake_driver : fake_driver2; |
|     ASSERT_EQ(ZX_OK, driver_runtime::Dispatcher::CreateWithLoop( |
|                          0, scheduler_role, strlen(scheduler_role), driver, &loop_, |
|                          observers[i].fdf_observer(), &dispatchers[i])); |
|   } |
| |
|   // Shutdown the second driver, dispatchers[1] and dispatchers[2] should be shutdown. |
|   fdf_internal::DriverShutdown driver2_shutdown; |
|   libsync::Completion driver2_shutdown_completion; |
|   ASSERT_OK(driver2_shutdown.Begin(fake_driver2, [&](const void* driver) { |
|     ASSERT_EQ(fake_driver2, driver); |
|     driver2_shutdown_completion.Signal(); |
|   })); |
| |
|   ASSERT_OK(observers[1].WaitUntilShutdown()); |
|   ASSERT_OK(observers[2].WaitUntilShutdown()); |
|   ASSERT_OK(driver2_shutdown_completion.Wait()); |
| |
|   // Shutdown the first driver, dispatchers[0] should be shutdown. |
|   // This uses its own |DriverShutdown| object rather than reusing |driver2_shutdown|. |
|   fdf_internal::DriverShutdown driver_shutdown; |
|   libsync::Completion driver_shutdown_completion; |
|   ASSERT_OK(driver_shutdown.Begin(fake_driver, [&](const void* driver) { |
|     ASSERT_EQ(fake_driver, driver); |
|     driver_shutdown_completion.Signal(); |
|   })); |
| |
|   ASSERT_OK(observers[0].WaitUntilShutdown()); |
|   ASSERT_OK(driver_shutdown_completion.Wait()); |
| |
|   for (uint32_t i = 0; i < kNumDispatchers; i++) { |
|     dispatchers[i]->Destroy(); |
|   } |
| } |
| |
| // Checks that a dispatcher may be reset from inside its own shutdown handler when the |
| // shutdown was started through |fdf_internal::DriverShutdown|. |
| TEST_F(DispatcherTest, DriverDestroysDispatcherShutdownByDriverHost) { |
|   zx::status<fdf::Dispatcher> owned_dispatcher; |
| |
|   libsync::Completion handler_finished; |
|   auto on_shutdown = [&](fdf_dispatcher_t* shutting_down) mutable { |
|     ASSERT_EQ(shutting_down, owned_dispatcher->get()); |
|     // Reset the dispatcher from within its own shutdown handler. |
|     owned_dispatcher->reset(); |
|     handler_finished.Signal(); |
|   }; |
| |
|   auto owner = CreateFakeDriver(); |
|   driver_context::PushDriver(owner); |
|   auto restore_context = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   owned_dispatcher = fdf::Dispatcher::Create(0, on_shutdown); |
|   ASSERT_FALSE(owned_dispatcher.is_error()); |
| |
|   fdf_internal::DriverShutdown owner_shutdown; |
|   libsync::Completion owner_shutdown_done; |
|   ASSERT_OK(owner_shutdown.Begin(owner, [&](const void* reported_driver) { |
|     ASSERT_EQ(owner, reported_driver); |
|     owner_shutdown_done.Signal(); |
|   })); |
| |
|   ASSERT_OK(handler_finished.Wait()); |
|   ASSERT_OK(owner_shutdown_done.Wait()); |
| } |
| |
| // Checks that |fdf::Dispatcher::Create| fails when called from a dispatcher task after the |
| // owning driver's shutdown has begun. |
| TEST_F(DispatcherTest, CannotCreateNewDispatcherDuringDriverShutdown) { |
|   libsync::Completion handler_called; |
|   auto on_shutdown = [&](fdf_dispatcher_t* unused) { handler_called.Signal(); }; |
| |
|   auto owner = CreateFakeDriver(); |
|   driver_context::PushDriver(owner); |
|   auto restore_context = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto first_dispatcher = fdf::Dispatcher::Create(0, on_shutdown); |
|   ASSERT_FALSE(first_dispatcher.is_error()); |
| |
|   // The task holds off its |Create| call until the test has begun the driver shutdown. |
|   libsync::Completion task_running; |
|   libsync::Completion shutdown_begun; |
|   ASSERT_OK(async::PostTask(first_dispatcher->async_dispatcher(), [&] { |
|     task_running.Signal(); |
|     ASSERT_OK(shutdown_begun.Wait(zx::time::infinite())); |
|     auto second_dispatcher = fdf::Dispatcher::Create(0, [](fdf_dispatcher_t* unused) {}); |
|     // Creating a new dispatcher should fail, as the driver is currently shutting down. |
|     ASSERT_TRUE(second_dispatcher.is_error()); |
|   })); |
|   ASSERT_OK(task_running.Wait(zx::time::infinite())); |
| |
|   fdf_internal::DriverShutdown owner_shutdown; |
|   libsync::Completion owner_shutdown_done; |
|   ASSERT_OK(owner_shutdown.Begin(owner, [&](const void* reported_driver) { |
|     ASSERT_EQ(owner, reported_driver); |
|     owner_shutdown_done.Signal(); |
|   })); |
| |
|   shutdown_begun.Signal(); |
| |
|   ASSERT_OK(handler_called.Wait(zx::time::infinite())); |
|   ASSERT_OK(owner_shutdown_done.Wait()); |
| } |
| |
| // Exercises shutting down all dispatchers of a driver when its dispatcher has already run |
| // its shutdown handler. |
| TEST_F(DispatcherTest, ShutdownAllDispatchersAlreadyShutdown) { |
|   libsync::Completion handler_called; |
|   auto on_shutdown = [&](fdf_dispatcher_t* unused) { handler_called.Signal(); }; |
| |
|   auto owner = CreateFakeDriver(); |
|   driver_context::PushDriver(owner); |
|   auto restore_context = fit::defer([]() { driver_context::PopDriver(); }); |
| |
|   auto owned_dispatcher = fdf::Dispatcher::Create(0, on_shutdown); |
|   ASSERT_FALSE(owned_dispatcher.is_error()); |
| |
|   // Shut the dispatcher down first and wait for its handler to be called. |
|   owned_dispatcher->ShutdownAsync(); |
|   ASSERT_OK(handler_called.Wait(zx::time::infinite())); |
| |
|   // Only then begin the driver shutdown; its callback must still be invoked. |
|   fdf_internal::DriverShutdown owner_shutdown; |
|   libsync::Completion owner_shutdown_done; |
|   ASSERT_OK(owner_shutdown.Begin(owner, [&](const void* reported_driver) { |
|     ASSERT_EQ(owner, reported_driver); |
|     owner_shutdown_done.Signal(); |
|   })); |
|   ASSERT_OK(owner_shutdown_done.Wait()); |
| } |
| |
| // Tests shutting down all dispatchers for a driver, but the dispatcher is in the shutdown observer |
| // callback. |
| TEST_F(DispatcherTest, ShutdownAllDispatchersCurrentlyInShutdownCallback) { |
| libsync::Completion entered_shutdown_handler; |
| libsync::Completion complete_shutdown_handler; |
| auto shutdown_handler = [&](fdf_dispatcher_t* shutdown_dispatcher) { |
| entered_shutdown_handler.Signal(); |
| ASSERT_OK(complete_shutdown_handler.Wait()); |
| }; |
| |
| auto fake_driver = CreateFakeDriver(); |
| driver_context::PushDriver(fake_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| auto dispatcher = fdf::Dispatcher::Create(0, shutdown_handler); |
| ASSERT_FALSE(dispatcher.is_error()); |
| |
| dispatcher->ShutdownAsync(); |
| ASSERT_OK(entered_shutdown_handler.Wait()); |
| |
| fdf_internal::DriverShutdown driver_shutdown; |
| libsync::Completion driver_shutdown_completion; |
| ASSERT_OK(driver_shutdown.Begin(fake_driver, [&](const void* driver) { |
| ASSERT_EQ(fake_driver, driver); |
| driver_shutdown_completion.Signal(); |
| })); |
| |
| // The dispatcher is still in the dispatcher shutdown handler. |
| ASSERT_FALSE(driver_shutdown_completion.signaled()); |
| complete_shutdown_handler.Signal(); |
| ASSERT_OK(driver_shutdown_completion.Wait()); |
| } |
| |
| TEST_F(DispatcherTest, DestroyAllDispatchers) { |
| // Create drivers which leak their dispatchers. |
| auto fake_driver = CreateFakeDriver(); |
| { |
| driver_context::PushDriver(fake_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| auto dispatcher = fdf::Dispatcher::Create(0, [](fdf_dispatcher_t* dispatcher) {}); |
| ASSERT_FALSE(dispatcher.is_error()); |
| dispatcher->release(); |
| } |
| |
| auto fake_driver2 = CreateFakeDriver(); |
| { |
| driver_context::PushDriver(fake_driver2); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| auto dispatcher2 = fdf::Dispatcher::Create(0, [](fdf_dispatcher_t* dispatcher) {}); |
| ASSERT_FALSE(dispatcher2.is_error()); |
| dispatcher2->release(); |
| } |
| |
| // Driver host shuts down all drivers. |
| fdf_internal::DriverShutdown driver_shutdown; |
| libsync::Completion driver_shutdown_completion; |
| ASSERT_OK(driver_shutdown.Begin(fake_driver, [&](const void* driver) { |
| ASSERT_EQ(fake_driver, driver); |
| driver_shutdown_completion.Signal(); |
| })); |
| ASSERT_OK(driver_shutdown_completion.Wait()); |
| driver_shutdown_completion.Reset(); |
| |
| ASSERT_OK(driver_shutdown.Begin(fake_driver2, [&](const void* driver) { |
| ASSERT_EQ(fake_driver2, driver); |
| driver_shutdown_completion.Signal(); |
| })); |
| ASSERT_OK(driver_shutdown_completion.Wait()); |
| |
| // This will stop memory from leaking. |
| fdf_internal_destroy_all_dispatchers(); |
| } |
| |
| TEST_F(DispatcherTest, WaitUntilAllDispatchersDestroyed) { |
| // No dispatchers, should immediately return. |
| fdf_internal_wait_until_all_dispatchers_destroyed(); |
| |
| constexpr uint32_t kNumDispatchers = 4; |
| fdf_dispatcher_t* dispatchers[kNumDispatchers]; |
| |
| for (uint32_t i = 0; i < kNumDispatchers; i++) { |
| auto fake_driver = CreateFakeDriver(); |
| driver_context::PushDriver(fake_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| auto dispatcher = fdf::Dispatcher::Create( |
| 0, [&](fdf_dispatcher_t* dispatcher) { fdf_dispatcher_destroy(dispatcher); }); |
| ASSERT_FALSE(dispatcher.is_error()); |
| dispatchers[i] = dispatcher->release(); // Destroyed in shutdown handler. |
| } |
| |
| libsync::Completion thread_started; |
| std::atomic_bool wait_complete = false; |
| std::thread thread = std::thread([&]() { |
| thread_started.Signal(); |
| fdf_internal_wait_until_all_dispatchers_destroyed(); |
| wait_complete = true; |
| }); |
| |
| ASSERT_OK(thread_started.Wait()); |
| for (uint32_t i = 0; i < kNumDispatchers; i++) { |
| // Not all dispatchers have been destroyed yet. |
| ASSERT_FALSE(wait_complete); |
| dispatchers[i]->ShutdownAsync(); |
| } |
| thread.join(); |
| ASSERT_TRUE(wait_complete); |
| } |
| |
| // Tests waiting for all dispatchers to be destroyed when a driver shutdown |
| // observer is also registered. |
| TEST_F(DispatcherTest, WaitUntilAllDispatchersDestroyedHasDriverShutdownObserver) { |
| auto fake_driver = CreateFakeDriver(); |
| driver_context::PushDriver(fake_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| auto dispatcher = fdf::Dispatcher::Create( |
| 0, [&](fdf_dispatcher_t* dispatcher) { fdf_dispatcher_destroy(dispatcher); }); |
| ASSERT_FALSE(dispatcher.is_error()); |
| dispatcher->release(); // Destroyed in the shutdown handler. |
| |
| libsync::Completion thread_started; |
| std::atomic_bool wait_complete = false; |
| std::thread thread = std::thread([&]() { |
| thread_started.Signal(); |
| fdf_internal_wait_until_all_dispatchers_destroyed(); |
| wait_complete = true; |
| }); |
| |
| ASSERT_OK(thread_started.Wait()); |
| |
| fdf_internal::DriverShutdown driver_shutdown; |
| libsync::Completion driver_shutdown_completion; |
| ASSERT_OK(driver_shutdown.Begin(fake_driver, [&](const void* driver) { |
| ASSERT_EQ(fake_driver, driver); |
| driver_shutdown_completion.Signal(); |
| })); |
| ASSERT_OK(driver_shutdown_completion.Wait()); |
| |
| thread.join(); |
| ASSERT_TRUE(wait_complete); |
| } |
| |
| TEST_F(DispatcherTest, WaitUntilAllDispatchersDestroyedDuringDriverShutdownHandler) { |
| auto fake_driver = CreateFakeDriver(); |
| driver_context::PushDriver(fake_driver); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| auto dispatcher = fdf::Dispatcher::Create( |
| 0, [&](fdf_dispatcher_t* dispatcher) { fdf_dispatcher_destroy(dispatcher); }); |
| ASSERT_FALSE(dispatcher.is_error()); |
| dispatcher->release(); // Destroyed in shutdown handler. |
| |
| // Block in the driver shutdown handler until we signal. |
| fdf_internal::DriverShutdown driver_shutdown; |
| libsync::Completion driver_shutdown_started; |
| libsync::Completion complete_driver_shutdown; |
| ASSERT_OK(driver_shutdown.Begin(fake_driver, [&](const void* driver) { |
| ASSERT_EQ(fake_driver, driver); |
| driver_shutdown_started.Signal(); |
| ASSERT_OK(complete_driver_shutdown.Wait()); |
| })); |
| |
| ASSERT_OK(driver_shutdown_started.Wait()); |
| |
| // Start waiting for all dispatchers to be destroyed. This should not complete |
| // until the shutdown handler completes. |
| libsync::Completion thread_started; |
| std::atomic_bool wait_complete = false; |
| std::thread thread = std::thread([&]() { |
| thread_started.Signal(); |
| fdf_internal_wait_until_all_dispatchers_destroyed(); |
| wait_complete = true; |
| }); |
| |
| ASSERT_OK(thread_started.Wait()); |
| |
| // Shutdown handler has not returned yet. |
| ASSERT_FALSE(wait_complete); |
| complete_driver_shutdown.Signal(); |
| |
| thread.join(); |
| ASSERT_TRUE(wait_complete); |
| } |
| |
| // |
| // Error handling |
| // |
| |
| // Tests that you cannot create an unsynchronized blocking dispatcher. |
| TEST_F(DispatcherTest, CreateUnsynchronizedAllowSyncCallsFails) { |
| driver_context::PushDriver(CreateFakeDriver()); |
| auto pop_driver = fit::defer([]() { driver_context::PopDriver(); }); |
| |
| DispatcherShutdownObserver observer(false /* require_callback */); |
| driver_runtime::Dispatcher* dispatcher; |
| uint32_t options = FDF_DISPATCHER_OPTION_UNSYNCHRONIZED | FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS; |
| ASSERT_NE(ZX_OK, fdf_dispatcher::Create(options, "scheduler_role", 0, observer.fdf_observer(), |
| &dispatcher)); |
| } |
| |
| // Tests that you cannot create a dispatcher on a thread not managed by the driver runtime. |
| TEST_F(DispatcherTest, CreateDispatcherOnNonRuntimeThreadFails) { |
| DispatcherShutdownObserver observer(false /* require_callback */); |
| driver_runtime::Dispatcher* dispatcher; |
| ASSERT_NE(ZX_OK, |
| fdf_dispatcher::Create(0, "scheduler_role", 0, observer.fdf_observer(), &dispatcher)); |
| } |
| |
| // Tests that we don't spawn more threads than we need. |
| TEST_F(DispatcherTest, ExtraThreadIsReused) { |
| { |
| void* driver = reinterpret_cast<void*>(uintptr_t(1)); |
| fdf_internal_push_driver(driver); |
| auto deferred = fit::defer([]() { fdf_internal_pop_driver(); }); |
| |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 1); |
| |
| // Create first dispatcher |
| driver_runtime::Dispatcher* dispatcher; |
| DispatcherShutdownObserver observer; |
| ASSERT_OK(fdf_dispatcher::Create(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", 0, |
| observer.fdf_observer(), &dispatcher)); |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 2); |
| |
| dispatcher->ShutdownAsync(); |
| ASSERT_OK(observer.WaitUntilShutdown()); |
| dispatcher->Destroy(); |
| |
| // Create second dispatcher |
| DispatcherShutdownObserver observer2; |
| ASSERT_OK(fdf_dispatcher::Create(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", 0, |
| observer2.fdf_observer(), &dispatcher)); |
| // Note that we are still at 2 threads. |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 2); |
| |
| dispatcher->ShutdownAsync(); |
| ASSERT_OK(observer2.WaitUntilShutdown()); |
| dispatcher->Destroy(); |
| |
| // Ideally we would be back down 1 thread at this point, but that is challenging. A future |
| // change may rememdy this. |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 2); |
| } |
| |
| driver_runtime::GetDispatcherCoordinator().Reset(); |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 1); |
| } |
| |
| TEST_F(DispatcherTest, MaximumTenThreads) { |
| { |
| void* driver = reinterpret_cast<void*>(uintptr_t(1)); |
| fdf_internal_push_driver(driver); |
| auto deferred = fit::defer([]() { fdf_internal_pop_driver(); }); |
| |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 1); |
| |
| constexpr uint32_t kNumDispatchers = 11; |
| |
| std::array<driver_runtime::Dispatcher*, kNumDispatchers> dispatchers; |
| std::array<DispatcherShutdownObserver, kNumDispatchers> observers; |
| for (uint32_t i = 0; i < kNumDispatchers; i++) { |
| ASSERT_OK(fdf_dispatcher::Create(FDF_DISPATCHER_OPTION_ALLOW_SYNC_CALLS, "scheduler_role", 0, |
| observers[i].fdf_observer(), &dispatchers[i])); |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), std::min(i + 2, 10u)); |
| } |
| |
| ASSERT_EQ(driver_runtime::GetDispatcherCoordinator().num_threads(), 10); |
| |
| for (uint32_t i = 0; i < kNumDispatchers; i++) { |
| dispatchers[i]->ShutdownAsync(); |
| ASSERT_OK(observers[i].WaitUntilShutdown()); |
| dispatchers[i]->Destroy(); |
| } |
| } |
| |
| driver_runtime::GetDispatcherCoordinator().Reset(); |
| } |