blob: b604c667fc2d205ee00cdaf0e0174cf6518d5109 [file] [log] [blame]
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
#include "pw_channel/stream_channel.h"
#include <algorithm>
#include <array>
#include "pw_async2/pend_func_task.h"
#include "pw_bytes/suffix.h"
#include "pw_multibuf/simple_allocator_for_test.h"
#include "pw_status/status.h"
#include "pw_stream/mpsc_stream.h"
#include "pw_thread/test_thread_context.h"
#include "pw_unit_test/framework.h"
namespace {
using ::pw::async2::Context;
using ::pw::async2::PendFuncTask;
using ::pw::async2::Pending;
using ::pw::async2::Poll;
using ::pw::async2::Ready;
using ::pw::multibuf::MultiBuf;
using ::pw::multibuf::test::SimpleAllocatorForTest;
using ::pw::operator""_b;
template <typename ActualIterable, typename ExpectedIterable>
void ExpectElementsEqual(const ActualIterable& actual,
const ExpectedIterable& expected) {
auto actual_iter = actual.begin();
auto expected_iter = expected.begin();
for (; expected_iter != expected.end(); ++actual_iter, ++expected_iter) {
ASSERT_NE(actual_iter, actual.end());
EXPECT_EQ(*actual_iter, *expected_iter);
}
}
template <typename ActualIterable, typename T>
void ExpectElementsEqual(const ActualIterable& actual,
std::initializer_list<T> expected) {
ExpectElementsEqual<ActualIterable, std::initializer_list<T>>(actual,
expected);
}
struct LiveForeverTestData {
LiveForeverTestData() {
pw::stream::CreateMpscStream(channel_input_reader, channel_input_writer);
pw::stream::CreateMpscStream(channel_output_reader, channel_output_writer);
}
pw::stream::BufferedMpscReader<512> channel_input_reader;
pw::stream::MpscWriter channel_input_writer;
pw::stream::BufferedMpscReader<512> channel_output_reader;
pw::stream::MpscWriter channel_output_writer;
SimpleAllocatorForTest<> allocator;
pw::thread::test::TestThreadContext read_thread_cx;
pw::thread::test::TestThreadContext write_thread_cx;
};
TEST(StreamChannel, ReadsAndWritesData) {
static pw::NoDestructor<LiveForeverTestData> test_data;
static pw::NoDestructor<pw::channel::StreamChannel> stream_channel(
test_data->allocator,
test_data->channel_input_reader,
test_data->read_thread_cx.options(),
test_data->channel_output_writer,
test_data->write_thread_cx.options());
PendFuncTask read_task([&](Context& cx) -> Poll<> {
auto read = stream_channel->PendRead(cx);
if (read.IsPending()) {
return Pending();
}
EXPECT_EQ(read->status(), pw::OkStatus());
if (read->ok()) {
ExpectElementsEqual(**read, {1_b, 2_b, 3_b});
}
return Ready();
});
MultiBuf to_send = test_data->allocator.BufWith({4_b, 5_b, 6_b});
PendFuncTask write_task([&](Context& cx) -> Poll<> {
if (stream_channel->PendReadyToWrite(cx).IsPending()) {
return Pending();
}
PW_TEST_EXPECT_OK(stream_channel->StageWrite(std::move(to_send)));
return Ready();
});
pw::async2::Dispatcher dispatcher;
dispatcher.Post(write_task);
dispatcher.Post(read_task);
EXPECT_EQ(Pending(), dispatcher.RunUntilStalled());
std::array<const std::byte, 3> data_to_send({1_b, 2_b, 3_b});
ASSERT_EQ(pw::OkStatus(),
test_data->channel_input_writer.Write(data_to_send));
dispatcher.RunToCompletion();
}
} // namespace