| // Copyright 2023 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "examples/components/pw_rpc/runner/log_proxy.h" |
| |
| #include <fidl/fuchsia.logger/cpp/fidl.h> |
| #include <lib/async/cpp/task.h> |
| #include <lib/async/dispatcher.h> |
| #include <lib/syslog/cpp/log_level.h> |
| #include <lib/syslog/cpp/macros.h> |
| |
| #include <optional> |
| #include <thread> |
| |
| #include "pw_assert/assert.h" |
| #include "pw_assert/check.h" |
| #include "pw_hdlc/decoder.h" |
| #include "pw_hdlc/rpc_channel.h" |
| #include "pw_log/levels.h" |
| #include "pw_log/proto/log.pwpb.h" |
| #include "pw_log/proto/log.raw_rpc.pb.h" |
| #include "pw_protobuf/decoder.h" |
| #include "pw_rpc/channel.h" |
| #include "pw_rpc/client.h" |
| #include "pw_span/span.h" |
| #include "pw_stream/socket_stream.h" |
| |
| namespace { |
| |
// Default value of SocketClient's `kRpcAddress` template parameter: the address
// given to the HDLC channel output and compared against incoming frame addresses.
constexpr uint64_t kLoggingRpcAddress{10000};
| |
| template <typename S, uint32_t kChannelId = 10000, size_t kMTU = 1055, |
| uint64_t kRpcAddress = kLoggingRpcAddress> |
| class SocketClient { |
| public: |
| explicit SocketClient(pw::stream::SocketStream stream) : stream_(std::move(stream)) {} |
| |
| typename S::Client* operator->() { return &service_client_; } |
| |
| void ProcessPackets() { |
| constexpr size_t kDecoderBufferSize = pw::hdlc::Decoder::RequiredBufferSizeForFrameSize(kMTU); |
| std::array<std::byte, kDecoderBufferSize> decode_buffer; |
| pw::hdlc::Decoder decoder(decode_buffer); |
| |
| while (true) { |
| std::byte byte[1]; |
| pw::Result<pw::ByteSpan> read = stream_.Read(byte); |
| |
| if (read.status() == pw::Status::OutOfRange()) { |
| // Channel closed. |
| should_terminate_ = true; |
| } |
| |
| if (should_terminate_) { |
| return; |
| } |
| |
| if (!read.ok() || read->empty()) { |
| continue; |
| } |
| |
| pw::Result<pw::hdlc::Frame> result = decoder.Process(*byte); |
| if (result.ok()) { |
| pw::hdlc::Frame& frame = result.value(); |
| if (frame.address() == kRpcAddress) { |
| PW_ASSERT(client_.ProcessPacket(frame.data()).ok()); |
| } |
| } |
| } |
| } |
| |
| void terminate() { |
| should_terminate_ = true; |
| stream_.Close(); |
| } |
| |
| private: |
| pw::stream::SocketStream stream_; |
| pw::hdlc::FixedMtuChannelOutput<kMTU> channel_output_{stream_, kRpcAddress, "socket"}; |
| pw::rpc::Channel channel_{pw::rpc::Channel::Create<kChannelId>(&channel_output_)}; |
| pw::rpc::Client client_{pw::span(&channel_, 1)}; |
| typename S::Client service_client_{client_, kChannelId}; |
| bool should_terminate_ = false; |
| }; |
| |
| pw::Status DecodeOptionallyTokenizedData(pw::protobuf::Decoder& entry_decoder, std::string* out) { |
| pw::ConstByteSpan tokenized_data; |
| PW_TRY(entry_decoder.ReadBytes(&tokenized_data)); |
| *out = std::string(std::string_view(reinterpret_cast<const char*>(tokenized_data.data()), |
| tokenized_data.size())); |
| return pw::OkStatus(); |
| } |
| |
// Variant of PW_TRY that tolerates OUT_OF_RANGE: evaluates `expr` once and
// returns its status from the enclosing function only when it is neither OK nor
// OUT_OF_RANGE. In this file it wraps pw::protobuf::Decoder::Next(), so running
// off the end of a message is not reported as an error.
#define PW_TRY_NEXT(expr) _PW_TRY_NEXT(_PW_TRY_UNIQUE(__LINE__), expr)

// Implementation detail of PW_TRY_NEXT; `result` is a unique local name.
#define _PW_TRY_NEXT(result, expr)                    \
  do {                                                \
    auto result = (expr);                             \
    if (!(result.ok() || result.IsOutOfRange())) {    \
      return ::pw::internal::ConvertToStatus(result); \
    }                                                 \
  } while (0)
| |
| fuchsia_logging::LogSeverity ConvertLogLevel(uint8_t level) { |
| switch (level) { |
| case PW_LOG_LEVEL_DEBUG: |
| return fuchsia_logging::Debug; |
| case PW_LOG_LEVEL_INFO: |
| return fuchsia_logging::Info; |
| case PW_LOG_LEVEL_WARN: |
| return fuchsia_logging::Warn; |
| case PW_LOG_LEVEL_ERROR: |
| case PW_LOG_LEVEL_CRITICAL: |
| return fuchsia_logging::Error; |
| case PW_LOG_LEVEL_FATAL: |
| return fuchsia_logging::Fatal; |
| default: |
| return fuchsia_logging::Info; |
| } |
| } |
| |
| pw::Status LogEntry(pw::protobuf::Decoder& entry_decoder, const fuchsia_logging::Logger& logger) { |
| PW_TRY_NEXT(entry_decoder.Next()); |
| if (entry_decoder.FieldNumber() != |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kMessage)) { |
| // Valid log, but no message |
| return pw::OkStatus(); |
| } |
| std::string message; |
| PW_TRY(DecodeOptionallyTokenizedData(entry_decoder, &message)); |
| if (message.empty()) { |
| // Valid log, but no message |
| return pw::OkStatus(); |
| } |
| PW_TRY_NEXT(entry_decoder.Next()); |
| |
| uint32_t line = 0; |
| fuchsia_logging::LogSeverity level = fuchsia_logging::Info; |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kLineLevel)) { |
| uint32_t line_level; |
| PW_TRY(entry_decoder.ReadUint32(&line_level)); |
| // Fuchsia and pigweed log severity codes are compatible |
| level = ConvertLogLevel(static_cast<uint8_t>(line_level & PW_LOG_LEVEL_BITMASK)); |
| if (level < logger.GetMinSeverity()) { |
| return pw::OkStatus(); |
| } |
| line = (line_level & ~PW_LOG_LEVEL_BITMASK) >> PW_LOG_LEVEL_BITS; |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kFlags)) { |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| // TODO: forward this? |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kTimestamp) || |
| entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kTimeSinceLastEntry |
| |
| )) { |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| // TODO: forward this? |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kDropped)) { |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| // TODO: forward this? |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kModule)) { |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| std::string file; |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kFile)) { |
| PW_TRY(DecodeOptionallyTokenizedData(entry_decoder, &file)); |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| if (entry_decoder.FieldNumber() == |
| static_cast<uint32_t>(pw::log::pwpb::LogEntry::Fields::kThread)) { |
| PW_TRY_NEXT(entry_decoder.Next()); |
| } |
| |
| if (logger.IsValid()) { |
| fuchsia_logging::LogBuffer buffer = |
| fuchsia_logging::LogBufferBuilder(level).WithFile(file, line).WithMsg(message).Build(); |
| [[maybe_unused]] zx::result<> result = logger.FlushBuffer(buffer); |
| } |
| return pw::OkStatus(); |
| } |
| |
| void ListenNext(SocketClient<pw::log::pw_rpc::raw::Logs>& sc, const fuchsia_logging::Logger& logger, |
| pw::ConstByteSpan response) { |
| pw::protobuf::Decoder entries_decoder(response); |
| while (entries_decoder.Next().ok()) { |
| if (static_cast<pw::log::pwpb::LogEntries::Fields>(entries_decoder.FieldNumber()) == |
| pw::log::pwpb::LogEntries::Fields::kEntries) { |
| pw::ConstByteSpan entry; |
| if (!entries_decoder.ReadBytes(&entry).ok()) { |
| FX_LOG_KV(WARNING, "Failed to decode pigweed log entry"); |
| continue; |
| } |
| pw::protobuf::Decoder entry_decoder(entry); |
| ::pw::Status status = LogEntry(entry_decoder, logger); |
| if (!status.ok()) { |
| FX_LOG_KV(WARNING, "Encountered invalid pigweed log entry"); |
| } |
| } else if (static_cast<pw::log::pwpb::LogEntries::Fields>(entries_decoder.FieldNumber()) == |
| pw::log::pwpb::LogEntries::Fields::kFirstEntrySequenceId) { |
| continue; |
| } |
| } |
| } |
| |
| } // namespace |
| |
| void LogProxy::Detach() { |
| LogProxy self{std::move(*this)}; |
| std::thread thread([self = std::move(self)]() mutable { self.Run(); }); |
| thread.detach(); |
| } |
| |
| void LogProxy::Run() { |
| SocketClient<pw::log::pw_rpc::raw::Logs> sc{std::move(stream_)}; |
| auto call = sc->Listen( |
| {}, [&sc, this](pw::ConstByteSpan response) { ListenNext(sc, logger_, response); }, |
| [&](pw::Status status) { |
| if (!status.ok()) { |
| FX_LOG_KV(INFO, "Log listener completed with error", FX_KV("status", status.str())); |
| } |
| sc.terminate(); |
| }, |
| [&](pw::Status status) { |
| FX_LOG_KV(INFO, "Failed to read logs", FX_KV("status", status.str())); |
| sc.terminate(); |
| }); |
| |
| sc.ProcessPackets(); |
| |
| fuchsia_logging::LogBuffer buffer = fuchsia_logging::LogBufferBuilder(fuchsia_logging::Info) |
| .WithFile(__FILE__, __LINE__) |
| .WithMsg("Connection to proxy has terminated. Exiting.") |
| .Build(); |
| [[maybe_unused]] zx::result<> result = logger_.FlushBuffer(buffer); |
| } |