| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/performance/perfetto-bridge/consumer_adapter.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/async/default.h> |
| #include <lib/fit/function.h> |
| #include <lib/syslog/cpp/macros.h> |
| #include <lib/trace/event.h> |
| #include <unistd.h> |
| #include <zircon/status.h> |
| |
| #include <algorithm> |
| #include <functional> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include <perfetto/ext/tracing/core/trace_packet.h> |
| #include <perfetto/ext/tracing/core/tracing_service.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <rapidjson/writer.h> |
| |
| #include "lib/fpromise/bridge.h" |
| #include "lib/trace-engine/context.h" |
| #include "lib/trace-engine/instrumentation.h" |
| #include "lib/trace-provider/provider.h" |
| #include "src/lib/fxl/strings/string_printf.h" |
| |
| #include <protos/perfetto/common/track_event_descriptor.gen.h> |
| #include <protos/perfetto/config/track_event/track_event_config.gen.h> |
| #include <third_party/perfetto/protos/perfetto/common/data_source_descriptor.gen.h> |
| #include <third_party/perfetto/protos/perfetto/common/trace_stats.gen.h> |
| #include <third_party/perfetto/protos/perfetto/common/tracing_service_state.gen.h> |
| #include <third_party/perfetto/protos/perfetto/config/chrome/chrome_config.gen.h> |
| #include <third_party/perfetto/protos/perfetto/config/data_source_config.gen.h> |
| #include <third_party/perfetto/protos/perfetto/config/trace_config.gen.h> |
| #include <third_party/perfetto/protos/perfetto/config/track_event/track_event_config.gen.h> |
| |
| namespace { |
| // The size of the consumer buffer. |
| constexpr size_t kConsumerBufferSizeKb = 20ul * 1024ul; // 20MB. |
| |
| // The delay between buffer utilization checks. |
| constexpr int kConsumerStatsPollIntervalMs = 500; |
| |
| // Interval for recreating interned string data, in milliseconds. |
| // Used for stream recovery in the event of data loss. |
| constexpr uint32_t kIncrementalStateClearMs = 4000; |
| |
| constexpr char kBlobName[] = "perfetto-bridge"; |
| constexpr char kChromiumTraceEvent[] = "org.chromium.trace_event"; |
| constexpr char kTrackEvent[] = "track_event"; |
| |
| class EmptyConsumer : public perfetto::Consumer { |
| private: |
| void OnConnect() override {} |
| void OnDisconnect() override {} |
| void OnTracingDisabled(const std::string& error) override {} |
| void OnTraceData(std::vector<perfetto::TracePacket> packets, bool has_more) override {} |
| void OnDetach(bool success) override {} |
| void OnAttach(bool success, const perfetto::TraceConfig&) override {} |
| void OnTraceStats(bool success, const perfetto::TraceStats&) override {} |
| void OnObservableEvents(const perfetto::ObservableEvents&) override {} |
| }; |
| |
| void LogTraceStats(const perfetto::TraceStats& stats) { |
| const auto& buffer_stats = stats.buffer_stats().front(); |
| FX_LOGS(INFO) << fxl::StringPrintf( |
| "Trace stats: " |
| "producers_connected: %u , " |
| "data_sources_registered: %u , " |
| "tracing_sessions: %u", |
| stats.producers_connected(), stats.data_sources_registered(), stats.tracing_sessions()); |
| FX_LOGS(INFO) << fxl::StringPrintf( |
| "Consumer buffer stats :" |
| "consumer bytes_written: %lu, " |
| "consumer bytes_read: %lu, " |
| "consumer bytes_overwritten (lost): %lu, ", |
| buffer_stats.bytes_written(), buffer_stats.bytes_read(), buffer_stats.bytes_overwritten()); |
| if (buffer_stats.bytes_overwritten() > 0) { |
| // If too much data was lost, then the consumer buffer should be enlarged |
| // and/or the drain interval shortened. |
| FX_LOGS(WARNING) << "Perfetto consumer buffer overrun detected."; |
| } |
| } |
| |
| // TODO(https://fxbug.dev/42066806): Remove this once the migration to track_event_config is complete. |
| std::string GetChromeTraceConfigString( |
| const perfetto::protos::gen::TrackEventConfig& track_event_config) { |
| rapidjson::Document chrome_trace_config(rapidjson::kObjectType); |
| auto& allocator = chrome_trace_config.GetAllocator(); |
| |
| rapidjson::Value included_categories(rapidjson::kArrayType); |
| for (const auto& enabled_category : track_event_config.enabled_categories()) { |
| included_categories.PushBack(rapidjson::StringRef(enabled_category), allocator); |
| } |
| chrome_trace_config.AddMember("included_categories", included_categories, allocator); |
| |
| rapidjson::Value excluded_categories(rapidjson::kArrayType); |
| for (const auto& disabled_category : track_event_config.disabled_categories()) { |
| excluded_categories.PushBack(rapidjson::StringRef(disabled_category), allocator); |
| } |
| chrome_trace_config.AddMember("excluded_categories", excluded_categories, allocator); |
| |
| rapidjson::StringBuffer buffer; |
| rapidjson::Writer writer(buffer); |
| chrome_trace_config.Accept(writer); |
| return std::string(buffer.GetString(), buffer.GetSize()); |
| } |
| |
| perfetto::protos::gen::TrackEventConfig GetTrackEventConfig( |
| const std::vector<std::string>& enabled_categories) { |
| perfetto::protos::gen::TrackEventConfig track_event_config; |
| if (!enabled_categories.empty()) { |
| // Disable all categories that aren't added to `enabled_categories` |
| track_event_config.add_disabled_categories("*"); |
| } |
| for (const auto& enabled_category : enabled_categories) { |
| track_event_config.add_enabled_categories(enabled_category); |
| } |
| return track_event_config; |
| } |
| |
| perfetto::protos::gen::TraceConfig_DataSource* AddDataSource( |
| perfetto::TraceConfig& trace_config, const std::string& data_source_name, |
| const std::vector<std::string>& enabled_categories) { |
| perfetto::protos::gen::TraceConfig_DataSource* data_source = trace_config.add_data_sources(); |
| perfetto::protos::gen::DataSourceConfig* data_source_config = data_source->mutable_config(); |
| data_source_config->set_name(data_source_name); |
| |
| const auto track_event_config = GetTrackEventConfig(enabled_categories); |
| data_source_config->set_track_event_config_raw(track_event_config.SerializeAsString()); |
| |
| // TODO(https://fxbug.dev/42066806): Remove this once the migration to track_event_config is complete. |
| if (data_source_name == kChromiumTraceEvent) { |
| data_source_config->mutable_chrome_config()->set_trace_config( |
| GetChromeTraceConfigString(track_event_config)); |
| } |
| return data_source; |
| } |
| |
| void AddTargetedDataSource(perfetto::TraceConfig& trace_config, const std::string& data_source_name, |
| const std::string& producer_name, |
| const std::vector<std::string>& enabled_categories) { |
| auto data_source = AddDataSource(trace_config, data_source_name, enabled_categories); |
| data_source->add_producer_name_filter(producer_name); |
| } |
| |
| void AddDataSources(perfetto::TraceConfig& trace_config, |
| const trace::ProviderConfig& provider_config) { |
| std::vector<std::string> umbrella_categories; |
| std::unordered_map<std::string, std::vector<std::string>> producer_specific_categories; |
| |
| for (const auto& enabled_category : provider_config.categories) { |
| auto separator_pos = enabled_category.find('/'); |
| if (separator_pos == std::string::npos) { |
| umbrella_categories.push_back(enabled_category); |
| } else { |
| std::string producer_name = enabled_category.substr(0, separator_pos); |
| std::string category = enabled_category.substr(separator_pos + 1); |
| producer_specific_categories[producer_name].push_back(category); |
| } |
| } |
| |
| for (const auto data_source_name : std::vector{kTrackEvent, kChromiumTraceEvent}) { |
| AddDataSource(trace_config, data_source_name, umbrella_categories); |
| for (const auto& [producer_name, enabled_categories] : producer_specific_categories) { |
| AddTargetedDataSource(trace_config, data_source_name, producer_name, enabled_categories); |
| } |
| } |
| } |
| |
| class FuchsiaTracingImpl : public FuchsiaTracing { |
| public: |
| explicit FuchsiaTracingImpl(trace::TraceProvider* trace_provider) |
| : trace_provider_(trace_provider) { |
| FX_DCHECK(trace_provider_); |
| } |
| ~FuchsiaTracingImpl() override { |
| FX_DCHECK(!prolonged_context_); |
| FX_DCHECK(!blob_write_context_); |
| } |
| |
| void StartObserving(fit::function<void(trace_state_t)> observe_cb) override { |
| trace_observer_.Start( |
| async_get_default_dispatcher(), |
| [observe_cb = std::move(observe_cb)]() mutable { observe_cb(trace_state()); }); |
| } |
| |
| void AcquireProlongedContext() override { |
| FX_DCHECK(!prolonged_context_); |
| |
| prolonged_context_ = trace_acquire_prolonged_context(); |
| FX_DCHECK(prolonged_context_); |
| } |
| |
| void ReleaseProlongedContext() override { |
| FX_DCHECK(prolonged_context_); |
| FX_DCHECK(!blob_write_context_); |
| |
| trace_release_prolonged_context(prolonged_context_); |
| prolonged_context_ = nullptr; |
| } |
| |
| void AcquireWriteContext() override { |
| FX_DCHECK(!blob_write_context_); |
| |
| blob_write_context_ = trace_acquire_context(); |
| if (blob_write_context_) { |
| trace_context_register_string_literal(blob_write_context_, kBlobName, &blob_name_ref_); |
| } |
| } |
| |
| bool HasWriteContext() override { return blob_write_context_ != nullptr; } |
| |
| void WriteBlob(const char* data, size_t size) override { |
| FX_DCHECK(blob_write_context_); |
| trace_context_write_blob_record(blob_write_context_, TRACE_BLOB_TYPE_PERFETTO, &blob_name_ref_, |
| data, size); |
| } |
| |
| void ReleaseWriteContext() override { |
| if (blob_write_context_) { |
| trace_release_context(blob_write_context_); |
| blob_write_context_ = nullptr; |
| } |
| } |
| |
| trace::ProviderConfig GetProviderConfig() override { |
| return trace_provider_->GetProviderConfig(); |
| } |
| |
| void SetGetKnownCategoriesCallback(trace::GetKnownCategoriesCallback callback) override { |
| trace_provider_->SetGetKnownCategoriesCallback(std::move(callback)); |
| } |
| |
| private: |
| trace_prolonged_context_t* prolonged_context_ = nullptr; |
| trace_context_t* blob_write_context_ = nullptr; |
| trace_string_ref_t blob_name_ref_; |
| trace::TraceProvider* trace_provider_; |
| |
| // Used for handling Fuchsia trace system events. |
| trace::TraceObserver trace_observer_; |
| }; |
| |
| } // namespace |
| |
| ConsumerAdapter::ConsumerAdapter(ConnectConsumerCallback connect_callback, |
| std::unique_ptr<FuchsiaTracing> fuchsia_tracing, |
| perfetto::base::TaskRunner* perfetto_task_runner) |
| : perfetto_task_runner_(perfetto_task_runner), |
| connect_callback_(std::move(connect_callback)), |
| fuchsia_tracing_(std::move(fuchsia_tracing)) { |
| FX_DCHECK(connect_callback_); |
| FX_DCHECK(fuchsia_tracing_); |
| FX_DCHECK(perfetto_task_runner_); |
| |
| fuchsia_tracing_->SetGetKnownCategoriesCallback([this] { return GetKnownCategories(); }); |
| fuchsia_tracing_->StartObserving(fit::bind_member(this, &ConsumerAdapter::OnTraceStateUpdate)); |
| } |
| |
| ConsumerAdapter::ConsumerAdapter(perfetto::TracingService* tracing_service, |
| trace::TraceProvider* trace_provider, |
| perfetto::base::TaskRunner* perfetto_task_runner) |
| : ConsumerAdapter( |
| [tracing_service](perfetto::Consumer* consumer) { |
| return tracing_service->ConnectConsumer(consumer, 0); |
| }, |
| std::make_unique<FuchsiaTracingImpl>(trace_provider), perfetto_task_runner) {} |
| |
| ConsumerAdapter::~ConsumerAdapter() { |
| perfetto_task_runner_->PostTask( |
| [endpoint = this->consumer_endpoint_.release()]() { delete endpoint; }); |
| } |
| |
| ConsumerAdapter::State ConsumerAdapter::GetState() { |
| std::lock_guard lock(state_mutex_); |
| return state_; |
| } |
| |
| void ConsumerAdapter::ChangeState(State new_state) { |
| std::lock_guard lock(state_mutex_); |
| State old_state = state_; |
| |
| bool valid_transition = false; |
| switch (new_state) { |
| case State::INACTIVE: |
| valid_transition = old_state == State::SHUTDOWN_STATS; |
| break; |
| case State::ACTIVE: |
| valid_transition = |
| old_state == State::INACTIVE || old_state == State::STATS || old_state == State::READING; |
| break; |
| case State::STATS: |
| valid_transition = old_state == State::ACTIVE; |
| break; |
| case State::READING: |
| valid_transition = old_state == State::STATS; |
| break; |
| case State::SHUTDOWN_FLUSH: |
| valid_transition = old_state == State::ACTIVE || |
| old_state == State::READING_PENDING_SHUTDOWN || old_state == State::STATS; |
| break; |
| case State::READING_PENDING_SHUTDOWN: |
| valid_transition = old_state == State::READING; |
| break; |
| case State::SHUTDOWN_DISABLED: |
| valid_transition = old_state == State::SHUTDOWN_FLUSH || old_state == State::ACTIVE; |
| break; |
| case State::SHUTDOWN_READING: |
| valid_transition = old_state == State::SHUTDOWN_DISABLED; |
| break; |
| case State::SHUTDOWN_STATS: |
| valid_transition = old_state == State::SHUTDOWN_READING; |
| break; |
| } |
| |
| FX_CHECK(valid_transition) << "Unknown state transition: " << static_cast<int>(old_state) << "->" |
| << static_cast<int>(new_state); |
| state_ = new_state; |
| } |
| |
| void ConsumerAdapter::OnTraceData(std::vector<perfetto::TracePacket> packets, bool has_more) { |
| FX_DCHECK(GetState() == State::READING || GetState() == State::SHUTDOWN_READING || |
| GetState() == State::READING_PENDING_SHUTDOWN); |
| FX_DCHECK(perfetto_task_runner_->RunsTasksOnCurrentThread()); |
| |
| if (fuchsia_tracing_->HasWriteContext()) { |
| // Proto messages must be written as atomic blobs to prevent truncation mid-message |
| // if the output buffer is filled. |
| std::string proto_str; |
| for (auto& cur_packet : packets) { |
| auto [preamble_data, preamble_size] = cur_packet.GetProtoPreamble(); |
| proto_str.assign(preamble_data, preamble_size); |
| |
| for (auto& cur_slice : cur_packet.slices()) { |
| proto_str.append(reinterpret_cast<const char*>(cur_slice.start), cur_slice.size); |
| } |
| |
| if (proto_str.size() > TRACE_MAX_BLOB_SIZE) { |
| FX_LOGS(WARNING) << "Dropping excessively long Perfetto message (size=" << proto_str.size() |
| << " bytes)"; |
| } else { |
| fuchsia_tracing_->WriteBlob(proto_str.data(), proto_str.size()); |
| } |
| proto_str.clear(); |
| } |
| } |
| |
| if (!has_more) { |
| OnPerfettoReadBuffersComplete(); |
| } |
| } |
| |
| void ConsumerAdapter::OnTraceStateUpdate(trace_state_t new_state) { |
| switch (new_state) { |
| case TRACE_STARTED: |
| perfetto_task_runner_->PostTask([this]() { OnStartTracing(); }); |
| return; |
| case TRACE_STOPPING: |
| perfetto_task_runner_->PostTask([this]() { |
| if (GetState() == State::READING) { |
| ChangeState(State::READING_PENDING_SHUTDOWN); |
| } else { |
| CallPerfettoFlush(); |
| } |
| }); |
| return; |
| case TRACE_STOPPED: |
| break; |
| } |
| } |
| |
| void ConsumerAdapter::OnStartTracing() { |
| FX_DCHECK(perfetto_task_runner_->RunsTasksOnCurrentThread()); |
| |
| perfetto::TraceConfig trace_config; |
| trace_config.mutable_incremental_state_config()->set_clear_period_ms(kIncrementalStateClearMs); |
| |
| perfetto::TraceConfig::BufferConfig* buffer_config = trace_config.add_buffers(); |
| buffer_config->set_size_kb(kConsumerBufferSizeKb); |
| |
| // RING_BUFFER is the only FillPolicy suitable for streaming, because DISCARD will enter a |
| // bad state in the event of consumer buffer saturation (e.g. if there is a burst of data). |
| buffer_config->set_fill_policy(perfetto::TraceConfig::BufferConfig::RING_BUFFER); |
| |
| AddDataSources(trace_config, fuchsia_tracing_->GetProviderConfig()); |
| |
| FX_CHECK(!consumer_endpoint_); |
| consumer_endpoint_ = connect_callback_(this); |
| FX_DCHECK(consumer_endpoint_); |
| consumer_endpoint_->EnableTracing(trace_config); |
| |
| // Explicitly manage the lifetime of the Fuchsia tracing session. |
| fuchsia_tracing_->AcquireProlongedContext(); |
| |
| ChangeState(State::ACTIVE); |
| SchedulePerfettoGetStats(); |
| } |
| |
| void ConsumerAdapter::CallPerfettoDisableTracing() { |
| FX_DCHECK(perfetto_task_runner_->RunsTasksOnCurrentThread()); |
| |
| ChangeState(State::SHUTDOWN_DISABLED); |
| consumer_endpoint_->DisableTracing(); |
| } |
| |
| void ConsumerAdapter::SchedulePerfettoGetStats() { |
| FX_DCHECK(GetState() == State::ACTIVE); |
| |
| perfetto_task_runner_->PostDelayedTask( |
| [this]() { |
| if (GetState() == State::ACTIVE) { |
| CallPerfettoGetTraceStats(false /* on_shutdown */); |
| } |
| }, |
| kConsumerStatsPollIntervalMs); |
| } |
| |
| void ConsumerAdapter::CallPerfettoReadBuffers(bool on_shutdown) { |
| FX_DCHECK(!fuchsia_tracing_->HasWriteContext()); |
| ChangeState(on_shutdown ? State::SHUTDOWN_READING : State::READING); |
| |
| fuchsia_tracing_->AcquireWriteContext(); |
| if (fuchsia_tracing_->HasWriteContext()) { |
| consumer_endpoint_->ReadBuffers(); |
| } else { |
| // The Fuchsia tracing context is gone, so there is nowhere to write |
| // the data to. |
| OnPerfettoReadBuffersComplete(); |
| } |
| } |
| |
| void ConsumerAdapter::OnPerfettoReadBuffersComplete() { |
| fuchsia_tracing_->ReleaseWriteContext(); |
| |
| if (GetState() == State::READING) { |
| ChangeState(State::ACTIVE); |
| SchedulePerfettoGetStats(); |
| } else if (GetState() == State::SHUTDOWN_READING) { |
| CallPerfettoGetTraceStats(true); |
| } else if (GetState() == State::READING_PENDING_SHUTDOWN) { |
| CallPerfettoFlush(); |
| } |
| } |
| |
| void ConsumerAdapter::CallPerfettoFlush() { |
| ChangeState(State::SHUTDOWN_FLUSH); |
| consumer_endpoint_->Flush(0, [this](bool success) { |
| if (!success) { |
| FX_LOGS(WARNING) << "Flush failed."; |
| } |
| CallPerfettoDisableTracing(); |
| }); |
| } |
| |
| void ConsumerAdapter::CallPerfettoGetTraceStats(bool on_shutdown) { |
| ChangeState(on_shutdown ? State::SHUTDOWN_STATS : State::STATS); |
| consumer_endpoint_->GetTraceStats(); |
| } |
| |
| fpromise::promise<std::vector<trace::KnownCategory>> ConsumerAdapter::GetKnownCategories() { |
| fpromise::bridge<std::vector<trace::KnownCategory>> bridge; |
| auto completer = std::make_shared<fpromise::completer<std::vector<trace::KnownCategory>>>( |
| std::move(bridge.completer)); |
| auto empty_consumer = std::make_shared<EmptyConsumer>(); |
| const auto consumer_endpoint = connect_callback_(empty_consumer.get()); |
| auto on_service_state = [completer, empty_consumer]( |
| bool success, const perfetto::TracingServiceState& service_state) { |
| if (!success) { |
| completer->complete_error(); |
| } |
| std::unordered_set<trace::KnownCategory, trace::KnownCategoryHash> categories; |
| for (const auto& data_source : service_state.data_sources()) { |
| if (data_source.ds_descriptor().name() != kChromiumTraceEvent) { |
| continue; |
| } |
| auto raw_descriptor = data_source.ds_descriptor().track_event_descriptor_raw(); |
| perfetto::protos::gen::TrackEventDescriptor track_event_descriptor; |
| track_event_descriptor.ParseFromArray(raw_descriptor.data(), raw_descriptor.size()); |
| for (const auto& available_category : track_event_descriptor.available_categories()) { |
| categories.insert( |
| {.name = available_category.name(), .description = available_category.description()}); |
| } |
| } |
| completer->complete_ok(std::vector<trace::KnownCategory>{categories.begin(), categories.end()}); |
| }; |
| consumer_endpoint->QueryServiceState(std::move(on_service_state)); |
| |
| return bridge.consumer.promise(); |
| } |
| |
| void ConsumerAdapter::OnTracingDisabled(const std::string& error) { |
| FX_DCHECK(perfetto_task_runner_->RunsTasksOnCurrentThread()); |
| |
| if (!error.empty()) { |
| FX_LOGS(WARNING) << "OnTracingDisabled() reported an error: " << error; |
| } |
| |
| CallPerfettoReadBuffers(true /* shutdown */); |
| } |
| |
| void ConsumerAdapter::OnTraceStats(bool success, const perfetto::TraceStats& stats) { |
| FX_DCHECK(perfetto_task_runner_->RunsTasksOnCurrentThread()); |
| |
| if (GetState() == State::STATS) { |
| if (success) { |
| const auto& buffer_stats = stats.buffer_stats().front(); |
| const size_t buffer_used = buffer_stats.bytes_written() - |
| (buffer_stats.bytes_read() + buffer_stats.bytes_overwritten()); |
| const float utilization = |
| static_cast<float>(buffer_used) / static_cast<float>(buffer_stats.buffer_size()); |
| |
| if (utilization >= kConsumerUtilizationReadThreshold) { |
| CallPerfettoReadBuffers(false /* shutdown */); |
| return; |
| } |
| } |
| ChangeState(State::ACTIVE); |
| SchedulePerfettoGetStats(); |
| } else if (GetState() == State::SHUTDOWN_STATS) { |
| ChangeState(State::INACTIVE); |
| |
| if (success) { |
| LogTraceStats(stats); |
| } |
| |
| ShutdownTracing(); |
| } |
| } |
| |
| void ConsumerAdapter::ShutdownTracing() { |
| consumer_endpoint_.reset(); |
| fuchsia_tracing_->ReleaseWriteContext(); |
| fuchsia_tracing_->ReleaseProlongedContext(); |
| } |
| |
| // Ignored Perfetto Consumer events. |
| void ConsumerAdapter::OnConnect() {} |
| void ConsumerAdapter::OnDisconnect() {} |
| void ConsumerAdapter::OnDetach(bool success) {} |
| void ConsumerAdapter::OnAttach(bool success, const perfetto::TraceConfig&) {} |
| void ConsumerAdapter::OnObservableEvents(const perfetto::ObservableEvents&) {} |