| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <lib/async-loop/cpp/loop.h> |
| #include <lib/async/cpp/task.h> |
| |
| #include "../mpsc_queue.h" |
| #include "gtest/gtest.h" |
| |
| TEST(MpscQueueTest, Sanity) { |
| MpscQueue<int> under_test; |
| std::queue<int> expectation; |
| const int kElements = 10; |
| |
| for (int i = 0; i < kElements; ++i) { |
| under_test.Push(i); |
| expectation.push(i); |
| } |
| |
| for (int i = 0; i < kElements; ++i) { |
| std::optional<int> maybe_elem = under_test.Pop(); |
| EXPECT_TRUE(maybe_elem.has_value()); |
| if (maybe_elem.has_value()) { |
| EXPECT_EQ(expectation.front(), *maybe_elem); |
| expectation.pop(); |
| } |
| } |
| } |
| |
| TEST(MpscQueueTest, TwoThreads) { |
| MpscQueue<int> under_test; |
| std::set<int> expectation; |
| |
| const int kElements = 100; |
| for (int i = 0; i < kElements; ++i) { |
| expectation.insert(i); |
| } |
| |
| async::Loop producer_loop(&kAsyncLoopConfigNoAttachToThread); |
| async::PostTask(producer_loop.dispatcher(), [&under_test] { |
| for (int i = 0; i < kElements; ++i) { |
| under_test.Push(i); |
| } |
| }); |
| |
| producer_loop.StartThread("Test Producer thread", nullptr); |
| |
| int element_count = 0; |
| while (element_count < kElements) { |
| std::optional<int> maybe_elem = under_test.Pop(); |
| if (maybe_elem.has_value()) { |
| ++element_count; |
| expectation.erase(*maybe_elem); |
| } |
| } |
| |
| EXPECT_EQ(expectation.size(), 0u); |
| } |
| |
| TEST(BlockingMpscQueueTest, TwoThreads) { |
| BlockingMpscQueue<int> under_test; |
| std::set<int> expectation; |
| |
| const int kElements = 100; |
| for (int i = 0; i < kElements; ++i) { |
| expectation.insert(i); |
| } |
| |
| async::Loop producer_loop(&kAsyncLoopConfigNoAttachToThread); |
| async::PostTask(producer_loop.dispatcher(), [&under_test] { |
| for (int i = 0; i < kElements; ++i) { |
| under_test.Push(i); |
| } |
| }); |
| |
| producer_loop.StartThread("Test Producer thread", nullptr); |
| |
| int element_count = 0; |
| while (element_count < kElements) { |
| std::optional<int> maybe_element = under_test.WaitForElement(); |
| if (maybe_element) { |
| ++element_count; |
| expectation.erase(*maybe_element); |
| } |
| } |
| |
| EXPECT_EQ(expectation.size(), 0u); |
| } |
| |
| TEST(BlockingMpscQueueTest, Clear) { |
| BlockingMpscQueue<int> under_test; |
| |
| const int kElements = 100; |
| for (int i = 0; i < kElements; ++i) { |
| under_test.Push(i); |
| } |
| |
| under_test.WaitForElement(); |
| under_test.Push(0); |
| under_test.Reset(); |
| std::queue<int> extracted = |
| BlockingMpscQueue<int>::Extract(std::move(under_test)); |
| EXPECT_EQ(extracted.size(), 0u); |
| } |
| |
| TEST(BlockingMpscQueueTest, Extract) { |
| BlockingMpscQueue<int> under_test; |
| std::set<int> expectation; |
| |
| const int kElements = 100; |
| for (int i = 0; i < kElements; ++i) { |
| expectation.insert(i); |
| under_test.Push(i); |
| } |
| |
| std::queue<int> extracted = |
| BlockingMpscQueue<int>::Extract(std::move(under_test)); |
| int element_count = 0; |
| while (element_count < kElements && !extracted.empty()) { |
| ++element_count; |
| expectation.erase(extracted.front()); |
| extracted.pop(); |
| } |
| |
| EXPECT_EQ(expectation.size(), 0u); |
| } |
| |
| TEST(BlockingMpscQueueTest, ManyThreads) { |
| BlockingMpscQueue<int> under_test; |
| |
| const int kElements = 1000; |
| const int kThreads = 10; |
| |
| // Order is not gauranteed when multiple producers contend, so we just test |
| // here that the implementation is stable and all elements are yielded. |
| std::unique_ptr<async::Loop> producer_loops[kThreads]; |
| for (auto& producer_loop : producer_loops) { |
| producer_loop = |
| std::make_unique<async::Loop>(&kAsyncLoopConfigNoAttachToThread); |
| async::PostTask(producer_loop->dispatcher(), [&under_test] { |
| for (int j = 0; j < kElements; ++j) { |
| under_test.Push(j); |
| } |
| }); |
| producer_loop->StartThread(nullptr, nullptr); |
| } |
| |
| int element_count = 0; |
| while (element_count < kElements * kThreads) { |
| std::optional<int> maybe_element = under_test.WaitForElement(); |
| if (maybe_element) { |
| ++element_count; |
| } |
| } |
| } |