blob: 5028ecaca78e01b5cc86a598698f4aacc2736797 [file] [log] [blame]
// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <fidl/fuchsia.tracing.controller/cpp/fidl.h>
#include <fidl/fuchsia.tracing/cpp/fidl.h>
#include <lib/async-loop/cpp/loop.h>
#include <lib/async-loop/default.h>
#include <lib/component/incoming/cpp/protocol.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/zx/eventpair.h>
#include <lib/zx/socket.h>
#include <stdlib.h>
#include <gtest/gtest.h>
#include <trace-reader/reader.h>
#include <trace-reader/records.h>
/*
* This reads 10 MiB of data in streaming mode and checks that the data is valid, in order, and not
* duplicated.
*/
namespace {
void ConsumeEvents(zx::socket in_socket, zx::eventpair e) {
size_t most_recent_arg = 0;
trace::TraceReader::RecordConsumer handle_events = [&most_recent_arg](trace::Record record) {
if (record.type() == trace::RecordType::kEvent) {
uint64_t counter = record.GetEvent().arguments[0].value().GetUint64();
ASSERT_GT(counter, most_recent_arg);
most_recent_arg = counter;
}
};
trace::TraceReader reader(std::move(handle_events), [](std::string_view) {});
constexpr size_t kBufferSize = 4096;
uint8_t buffer[kBufferSize];
uint8_t* buffer_base = buffer;
size_t leftover_bytes = 0;
size_t bytes_read = 0;
for (;;) {
size_t actual = 0;
zx_status_t read_result =
in_socket.read(0, buffer_base, sizeof(buffer) - leftover_bytes, &actual);
if (read_result == ZX_ERR_SHOULD_WAIT) {
zx_signals_t signals;
in_socket.wait_one(ZX_SOCKET_READABLE | ZX_SIGNAL_HANDLE_CLOSED,
zx::deadline_after(zx::sec(1)), &signals);
continue;
}
if (read_result == ZX_ERR_PEER_CLOSED) {
return;
}
bytes_read += actual;
if (bytes_read > size_t{10} * 1024 * 1024) {
ASSERT_EQ(ZX_OK, e.signal_peer(0, ZX_USER_SIGNAL_0));
}
trace::Chunk data{reinterpret_cast<uint64_t*>(buffer), (actual + leftover_bytes) >> 3};
reader.ReadRecords(data);
// trace::Chunk only deals in full words so we may have some bytes we rounded off
size_t unchunked_bytes = (actual + leftover_bytes) % 8;
// We may have leftover data in the chunk if our read boundary was not on a record boundary.
// Copy it back into the buffer for the next round.
leftover_bytes = (data.remaining_words() * 8) + unchunked_bytes;
if (leftover_bytes != 0) {
memcpy(buffer, buffer + data.current_byte_offset(), leftover_bytes);
}
buffer_base = buffer + leftover_bytes;
}
}
} // namespace
TEST(StreamingTest, Stream) {
async::Loop loop(&kAsyncLoopConfigAttachToCurrentThread);
zx::result client_end = component::Connect<fuchsia_tracing_controller::Provisioner>();
ASSERT_TRUE(client_end.is_ok());
const fidl::SyncClient client{std::move(*client_end)};
// Wait for the producer to attach
for (unsigned retries = 0; retries < 5; retries++) {
fidl::Result<fuchsia_tracing_controller::Provisioner::GetProviders> providers =
client->GetProviders();
ASSERT_TRUE(providers.is_ok());
if (providers->providers().size() == 1) {
break;
}
sleep(1);
}
zx::socket in_socket;
zx::socket outgoing_socket;
ASSERT_EQ(zx::socket::create(0u, &in_socket, &outgoing_socket), ZX_OK);
const fuchsia_tracing_controller::TraceConfig config{{
.buffer_size_megabytes_hint = uint32_t{1},
.buffering_mode = fuchsia_tracing::BufferingMode::kStreaming,
}};
fidl::Result<fuchsia_tracing_controller::Provisioner::GetProviders> providers =
client->GetProviders();
auto endpoints = fidl::Endpoints<fuchsia_tracing_controller::Session>::Create();
auto init_response =
client->InitializeTracing({std::move(endpoints.server), config, std::move(outgoing_socket)});
ASSERT_TRUE(init_response.is_ok());
zx::eventpair e1;
zx::eventpair e2;
zx::eventpair::create(0, &e1, &e2);
std::thread t1{ConsumeEvents, std::move(in_socket), std::move(e1)};
{
const fidl::SyncClient controller_client{std::move(endpoints.client)};
controller_client->StartTracing({});
loop.Run(zx::deadline_after(zx::sec(1)));
zx_signals_t signals;
zx_status_t res = e2.wait_one(ZX_USER_SIGNAL_0, zx::deadline_after(zx::sec(10)), &signals);
ASSERT_EQ(res, ZX_OK);
ASSERT_TRUE((signals & ZX_USER_SIGNAL_0) != 0);
controller_client->StopTracing({{.write_results = true}});
loop.Run(zx::deadline_after(zx::sec(1)));
}
t1.join();
loop.RunUntilIdle();
}