blob: bf9e455bfb2b4620109f7fda8a1503147391bea1 [file] [log] [blame]
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "stream_case.h"
#include <fidl/test.ipc/cpp/wire.h>
#include <lib/async-loop/cpp/loop.h>
#include <lib/async-loop/default.h>
#include <lib/syslog/cpp/macros.h>
#include <zircon/assert.h>
void StreamCase::send(zx::channel chan, Timing* cur_timing) {
ZX_ASSERT(chan.is_valid());
std::string message = std::string(config_.message_size, 'a');
start_barrier_.arrive_and_wait();
{
Timer t(&cur_timing->send_duration);
fidl::ClientEnd<test_ipc::StreamSender> client_end(std::move(chan));
fidl::WireSyncClient client(std::move(client_end));
zx::socket s1, s2;
ZX_ASSERT(zx::socket::create(ZX_SOCKET_STREAM, &s1, &s2) == ZX_OK);
ZX_ASSERT(client->Send(std::move(s1)).ok());
size_t left_to_send = config_.messages_to_send;
while (left_to_send > 0) {
std::vector<uint8_t> formatted_msg(config_.batch_size * (sizeof(uint32_t) + message.size()));
*reinterpret_cast<uint32_t*>(formatted_msg.data()) = static_cast<uint32_t>(message.size());
std::memcpy(formatted_msg.data() + sizeof(uint32_t), message.data(), message.size());
left_to_send -= 1;
for (size_t i = 1; i < config_.batch_size && left_to_send > 0; ++i) {
std::memcpy(formatted_msg.data() + (i * (sizeof(uint32_t) + message.size())),
formatted_msg.data(), sizeof(uint32_t) + message.size());
left_to_send -= 1;
}
size_t offset = 0;
while (offset < formatted_msg.size()) {
size_t actual;
auto ret =
s2.write(0, formatted_msg.data() + offset, formatted_msg.size() - offset, &actual);
if (ret == ZX_ERR_SHOULD_WAIT) {
zx_signals_t obs;
ZX_ASSERT(s2.wait_one(ZX_SOCKET_WRITABLE, zx::time::infinite(), &obs) == ZX_OK);
} else {
ZX_ASSERT(ret == ZX_OK);
offset += actual;
}
}
}
}
stop_barrier_.arrive_and_wait();
}
namespace {
class StreamReceiver : public fidl::WireServer<test_ipc::StreamSender> {
using SendCompleter = ::fidl::internal::WireCompleter<test_ipc::StreamSender::Send>;
public:
explicit StreamReceiver(size_t* received) : received_(received) {}
void Send(::test_ipc::wire::StreamSenderSendRequest* request,
SendCompleter::Sync& completer) override {
completer.Close(ZX_OK);
auto socket = std::move(request->stream);
std::vector<uint8_t> buffer;
buffer.resize(64 * 1024);
std::vector<uint8_t> remaining;
while (true) {
size_t actual;
auto ret = socket.read(0, buffer.data(), buffer.size(), &actual);
if (ret == ZX_ERR_PEER_CLOSED) {
buffer.clear();
actual = 0;
} else if (ret == ZX_ERR_SHOULD_WAIT) {
zx_signals_t obs;
socket.wait_one(ZX_SOCKET_READABLE | ZX_SOCKET_PEER_CLOSED, zx::time::infinite(), &obs);
continue;
} else {
if (ret != ZX_OK) {
FX_LOGS(ERROR) << "Failed to read: " << ret;
}
ZX_ASSERT(ret == ZX_OK);
}
remaining.insert(remaining.end(), buffer.data(), buffer.data() + actual);
std::optional<uint32_t> remaining_size;
while (remaining.size() >= sizeof(uint32_t)) {
remaining_size = *reinterpret_cast<uint32_t*>(remaining.data());
if (remaining.size() < sizeof(uint32_t) + remaining_size.value()) {
break;
}
*received_ += 1;
remaining.erase(remaining.begin(),
remaining.begin() + sizeof(uint32_t) + remaining_size.value());
remaining_size.reset();
}
if (actual == 0) {
break;
}
}
}
private:
size_t* received_;
};
} // namespace
void StreamCase::recv(zx::channel chan, Timing* cur_timing) {
ZX_ASSERT(chan.is_valid());
async::Loop loop(&kAsyncLoopConfigNeverAttachToThread);
start_barrier_.arrive_and_wait();
{
Timer t(&cur_timing->recv_duration);
fidl::ServerEnd<test_ipc::StreamSender> server_end(std::move(chan));
size_t received = 0;
StreamReceiver receiver(&received);
auto binding = fidl::BindServer(
loop.dispatcher(), std::move(server_end), &receiver,
[&loop](StreamReceiver*, fidl::UnbindInfo info,
fidl::ServerEnd<test_ipc::StreamSender> server_end) { loop.Quit(); });
loop.Run();
ZX_ASSERT(received == config_.messages_to_send);
}
stop_barrier_.arrive_and_wait();
}