blob: 1feb91ba6530900997a9dd2ca0c72e2992f94ada [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/performance/trace_manager/trace_manager.h"
#include <fuchsia/tracing/cpp/fidl.h>
#include <lib/fidl/cpp/clone.h>
#include <lib/fpromise/bridge.h>
#include <lib/fpromise/promise.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/zx/time.h>
#include <algorithm>
#include <iostream>
#include <unordered_set>
#include "src/performance/trace_manager/app.h"
namespace tracing {
namespace {
// Stop timeout handed to every TraceSession created in InitializeTracing().
// For large traces or when verbosity is on it can take a while to write out
// all the records. E.g., cpuperf_provider can take 40 seconds with --verbose=2.
constexpr zx::duration kStopTimeout = zx::sec(60);
// Lower bound, in megabytes, enforced by ConstrainBufferSize().
constexpr uint32_t kMinBufferSizeMegabytes = 1;
// These defaults are copied from fuchsia.tracing/trace_controller.fidl.
// They are used when the corresponding TraceConfig field is not set.
constexpr uint32_t kDefaultBufferSizeMegabytesHint = 4;
constexpr zx::duration kDefaultStartTimeout{zx::msec(5000)};
constexpr fuchsia::tracing::BufferingMode kDefaultBufferingMode =
fuchsia::tracing::BufferingMode::ONESHOT;
// Maximum number of alerts queued while no WatchAlert callback is pending.
// OnAlert() discards the oldest queued alert once this depth is reached.
constexpr size_t kMaxAlertQueueDepth = 16;
// Returns |buffer_size_megabytes| raised to kMinBufferSizeMegabytes when it is
// below that floor; values at or above the floor are returned unchanged.
uint32_t ConstrainBufferSize(uint32_t buffer_size_megabytes) {
  if (buffer_size_megabytes < kMinBufferSizeMegabytes) {
    return kMinBufferSizeMegabytes;
  }
  return buffer_size_megabytes;
}
// Hash functor for fuchsia::tracing::KnownCategory so it can be stored in
// KnownCategorySet. Both |name| and |description| contribute to the hash,
// matching the fields compared by KnownCategoryEquals.
struct KnownCategoryHash {
  auto operator()(const fuchsia::tracing::KnownCategory& k) const -> size_t {
    const size_t name_hash = std::hash<std::string>{}(k.name);
    const size_t description_hash = std::hash<std::string>{}(k.description);
    // A plain XOR maps every category whose name equals its description to 0
    // and cannot distinguish (a, b) from (b, a). Mix the first hash before
    // folding in the second (the common "hash_combine" recipe) so the result
    // depends on which field each string came from.
    return name_hash ^ (description_hash + 0x9e3779b9 + (name_hash << 6) + (name_hash >> 2));
  }
};
// Equality functor paired with KnownCategoryHash: two categories are equal
// only when both their names and their descriptions match.
struct KnownCategoryEquals {
  auto operator()(const fuchsia::tracing::KnownCategory& k1,
                  const fuchsia::tracing::KnownCategory& k2) const -> bool {
    if (!(k1.name == k2.name)) {
      return false;
    }
    return k1.description == k2.description;
  }
};
// Set of known categories, de-duplicated on the (name, description) pair via
// KnownCategoryHash / KnownCategoryEquals.
using KnownCategorySet =
std::unordered_set<fuchsia::tracing::KnownCategory, KnownCategoryHash, KnownCategoryEquals>;
// Flat list of known categories, the payload type used by the per-provider
// promises in GetKnownCategories().
using KnownCategoryVector = std::vector<fuchsia::tracing::KnownCategory>;
} // namespace
// Constructs the manager: stores the |app| pointer and the |executor|
// reference as given and moves |config| into config_.
// NOTE(review): |app| and |executor| presumably must outlive this object —
// confirm against the owner.
TraceManager::TraceManager(TraceManagerApp* app, Config config, async::Executor& executor)
: app_(app), config_(std::move(config)), executor_(executor) {}
// Defaulted: no teardown beyond destroying the members.
TraceManager::~TraceManager() = default;
// Reacts to the controller going away: terminates a session that is not
// already terminating, then discards every pending WatchAlert callback and
// every queued alert.
void TraceManager::OnEmptyControllerSet() {
  // While one controller could go away and another remain causing a trace
  // to not be terminated, at least handle the common case.
  FX_LOGS(DEBUG) << "Controller is gone";

  // Check the state first because the log messages are useful, but not if
  // tracing has ended.
  const bool must_terminate =
      session_ && session_->state() != TraceSession::State::kTerminating;
  if (must_terminate) {
    FX_LOGS(INFO) << "Controller is gone, terminating trace";
    session_->Terminate([this](controller::Controller_TerminateTracing_Result /*result*/) {
      FX_LOGS(INFO) << "Trace terminated";
      session_.reset();
    });
  }

  // Drain both queues front to back; dropped callbacks are never invoked.
  for (; !watch_alert_callbacks_.empty(); watch_alert_callbacks_.pop()) {
  }
  for (; !alerts_.empty(); alerts_.pop()) {
  }
}
// fidl
// Logs a warning containing |ordinal| for a method this implementation does
// not recognize. |method_has_response| is not used and no reply is sent from
// here.
void TraceManager::handle_unknown_method(uint64_t ordinal, bool method_has_response) {
FX_LOGS(WARNING) << "Received an unknown method with ordinal " << ordinal;
}
// fidl
// Creates the trace session described by |config|, writing trace output to
// |output|.
//
// The request is ignored (with an error log) when a session already exists or
// when |config| carries an unrecognized buffering mode. Fields absent from
// |config| fall back to the file-level defaults. After the session is created
// the trace header is written, every currently registered provider is added,
// and the session is marked initialized.
void TraceManager::InitializeTracing(controller::TraceConfig config, zx::socket output) {
  FX_LOGS(DEBUG) << "InitializeTracing";

  if (session_) {
    FX_LOGS(ERROR) << "Ignoring initialize request, trace already initialized";
    return;
  }

  // Only the session-wide default is clamped to the minimum buffer size.
  uint32_t default_buffer_size_megabytes = kDefaultBufferSizeMegabytesHint;
  if (config.has_buffer_size_megabytes_hint()) {
    const uint32_t buffer_size_mb_hint = config.buffer_size_megabytes_hint();
    default_buffer_size_megabytes = ConstrainBufferSize(buffer_size_mb_hint);
  }

  TraceProviderSpecMap provider_specs;
  if (config.has_provider_specs()) {
    for (const auto& it : config.provider_specs()) {
      // Every ProviderSpec field is optional (see the has_*() checks below),
      // and that includes the name. Reading name() from a spec that lacks one
      // is not valid, so skip such client-supplied entries instead of keying
      // the map on an absent field.
      if (!it.has_name()) {
        FX_LOGS(WARNING) << "Ignoring provider spec without a name";
        continue;
      }
      TraceProviderSpec provider_spec;
      if (it.has_buffer_size_megabytes_hint()) {
        provider_spec.buffer_size_megabytes = it.buffer_size_megabytes_hint();
      }
      if (it.has_categories()) {
        provider_spec.categories = it.categories();
      }
      // A later spec with the same name replaces an earlier one.
      provider_specs[it.name()] = std::move(provider_spec);
    }
  }

  fuchsia::tracing::BufferingMode tracing_buffering_mode = kDefaultBufferingMode;
  if (config.has_buffering_mode()) {
    tracing_buffering_mode = config.buffering_mode();
  }
  // Human-readable mode name for the log line; also validates the mode.
  const char* mode_name;
  switch (tracing_buffering_mode) {
    case fuchsia::tracing::BufferingMode::ONESHOT:
      mode_name = "oneshot";
      break;
    case fuchsia::tracing::BufferingMode::CIRCULAR:
      mode_name = "circular";
      break;
    case fuchsia::tracing::BufferingMode::STREAMING:
      mode_name = "streaming";
      break;
    default:
      FX_LOGS(ERROR) << "Invalid buffering mode: " << static_cast<unsigned>(tracing_buffering_mode);
      return;
  }

  FX_LOGS(INFO) << "Initializing trace with " << default_buffer_size_megabytes
                << " MB buffers, buffering mode=" << mode_name;
  if (provider_specs.size() > 0) {
    FX_LOGS(INFO) << "Provider overrides:";
    for (const auto& it : provider_specs) {
      FX_LOGS(INFO) << it.first << ": buffer size "
                    << it.second.buffer_size_megabytes.value_or(default_buffer_size_megabytes)
                    << " MB";
    }
  }

  std::vector<std::string> categories;
  if (config.has_categories()) {
    categories = config.categories();
  }

  zx::duration start_timeout = kDefaultStartTimeout;
  if (config.has_start_timeout_milliseconds()) {
    start_timeout = zx::msec(config.start_timeout_milliseconds());
  }

  session_ = std::make_unique<TraceSession>(
      executor_, std::move(output), std::move(categories), default_buffer_size_megabytes,
      tracing_buffering_mode, std::move(provider_specs), start_timeout, kStopTimeout,
      // Abort handler: terminate the session unless that is already underway.
      [this]() {
        if (session_->state() != TraceSession::State::kTerminating) {
          FX_LOGS(INFO) << "Aborting and terminating trace";
          session_->Terminate([this](controller::Controller_TerminateTracing_Result result) {
            FX_LOGS(INFO) << "Terminated trace";
            session_.reset();
          });
        }
      },
      // Alert handler: forward alerts raised by the session to OnAlert().
      [this](const std::string& alert_name) { OnAlert(alert_name); });

  // The trace header is written now to ensure it appears first, and to avoid
  // timing issues if the trace is terminated early (and the session being
  // deleted).
  session_->WriteTraceInfo();

  for (auto& bundle : providers_) {
    session_->AddProvider(&bundle);
  }

  session_->MarkInitialized();
}
// fidl
// Terminates the current session and reports the outcome through
// |terminate_callback|.
//
// When there is no session, or the session is already terminating, the
// callback is invoked right away with a default-constructed TerminateResult.
// Otherwise the optional write_results setting from |options| is applied, the
// session is asked to terminate, and once it reports back the result is
// forwarded to the callback and the session is destroyed.
void TraceManager::TerminateTracing(controller::TerminateOptions options,
                                    TerminateTracingCallback terminate_callback) {
  // Shared by both "nothing to do" paths below.
  const auto reply_with_empty_result = [&terminate_callback]() {
    controller::Controller_TerminateTracing_Result result;
    controller::TerminateResult terminate_result;
    result.set_response(
        controller::Controller_TerminateTracing_Response(std::move(terminate_result)));
    terminate_callback(std::move(result));
  };

  if (!session_) {
    FX_LOGS(DEBUG) << "Ignoring terminate request, tracing not initialized";
    reply_with_empty_result();
    return;
  }

  if (session_->state() == TraceSession::State::kTerminating) {
    FX_LOGS(INFO) << "Ignoring terminate request. Already terminating";
    reply_with_empty_result();
    return;
  }

  if (options.has_write_results()) {
    session_->set_write_results_on_terminate(options.write_results());
  }

  FX_LOGS(INFO) << "Terminating trace";
  // Reply to the client first, then drop the session.
  auto on_terminated = [this, terminate_callback = std::move(terminate_callback)](
                           controller::Controller_TerminateTracing_Result result) {
    terminate_callback(std::move(result));
    session_.reset();
  };
  session_->Terminate(std::move(on_terminated));
}
// fidl
// Starts tracing on the current session and reports the outcome through
// |start_callback|.
//
// The request is rejected with a StartErrorCode when there is no session,
// when the session is starting, started, stopping or terminating, or when
// |options| carries an unrecognized buffer disposition. Otherwise the session
// is started with the requested (or default RETAIN) disposition and any
// additional categories; |start_callback| is handed to the session.
void TraceManager::StartTracing(controller::StartOptions options,
                                StartTracingCallback start_callback) {
  FX_LOGS(DEBUG) << "StartTracing";

  controller::Controller_StartTracing_Result result;
  // Completes the request with |code|; callers return right after using it.
  const auto reject = [&result, &start_callback](controller::StartErrorCode code) {
    result.set_err(code);
    start_callback(std::move(result));
  };

  if (!session_) {
    FX_LOGS(ERROR) << "Ignoring start request, trace must be initialized first";
    reject(controller::StartErrorCode::NOT_INITIALIZED);
    return;
  }

  switch (session_->state()) {
    case TraceSession::State::kInitialized:
    case TraceSession::State::kStopped:
      // The only states a start is accepted from.
      break;
    case TraceSession::State::kStarting:
    case TraceSession::State::kStarted:
      FX_LOGS(ERROR) << "Ignoring start request, trace already started";
      reject(controller::StartErrorCode::ALREADY_STARTED);
      return;
    case TraceSession::State::kStopping:
      FX_LOGS(ERROR) << "Ignoring start request, trace stopping";
      reject(controller::StartErrorCode::STOPPING);
      return;
    case TraceSession::State::kTerminating:
      FX_LOGS(ERROR) << "Ignoring start request, trace terminating";
      reject(controller::StartErrorCode::TERMINATING);
      return;
    default:
      FX_NOTREACHED();
      return;
  }

  std::vector<std::string> additional_categories;
  if (options.has_additional_categories()) {
    additional_categories = std::move(options.additional_categories());
  }

  // This default matches trace's.
  fuchsia::tracing::BufferDisposition buffer_disposition =
      fuchsia::tracing::BufferDisposition::RETAIN;
  if (options.has_buffer_disposition()) {
    buffer_disposition = options.buffer_disposition();
    const bool recognized =
        buffer_disposition == fuchsia::tracing::BufferDisposition::CLEAR_ENTIRE ||
        buffer_disposition == fuchsia::tracing::BufferDisposition::CLEAR_NONDURABLE ||
        buffer_disposition == fuchsia::tracing::BufferDisposition::RETAIN;
    if (!recognized) {
      FX_LOGS(ERROR) << "Bad value for buffer disposition: " << buffer_disposition
                     << ", dropping connection";
      // TODO(dje): IWBN to drop the connection. How?
      reject(controller::StartErrorCode::TERMINATING);
      return;
    }
  }

  FX_LOGS(INFO) << "Starting trace, buffer disposition: " << buffer_disposition;
  session_->Start(buffer_disposition, additional_categories, std::move(start_callback));
}
// fidl
// Stops tracing on the current session and reports the outcome through
// |stop_callback|.
//
// When there is no session, or the session is in a state other than
// initialized, starting or started, the callback receives a default response
// immediately. Otherwise the session is stopped, writing results only if
// |options| asks for it, and the session's result is forwarded to the callback.
void TraceManager::StopTracing(controller::StopOptions options, StopTracingCallback stop_callback) {
  // Shared by both "nothing to stop" paths below.
  const auto reply_without_stopping = [&stop_callback]() {
    controller::Controller_StopTracing_Result stop_result;
    controller::Controller_StopTracing_Response response;
    stop_result.set_response(response);
    stop_callback(std::move(stop_result));
  };

  if (!session_) {
    FX_LOGS(DEBUG) << "Ignoring stop request, tracing not started";
    reply_without_stopping();
    return;
  }

  const auto current_state = session_->state();
  const bool stoppable = current_state == TraceSession::State::kInitialized ||
                         current_state == TraceSession::State::kStarting ||
                         current_state == TraceSession::State::kStarted;
  if (!stoppable) {
    FX_LOGS(DEBUG) << "Ignoring stop request, state != Initialized,Starting,Started";
    reply_without_stopping();
    return;
  }

  // Results are only written when explicitly requested.
  const bool write_results = options.has_write_results() && options.write_results();

  FX_LOGS(INFO) << "Stopping trace" << (write_results ? ", and writing results" : "");
  auto on_stopped = [stop_callback = std::move(stop_callback)](
                        controller::Controller_StopTracing_Result result) {
    FX_LOGS(INFO) << "Stopped trace";
    stop_callback(std::move(result));
  };
  session_->Stop(write_results, std::move(on_stopped));
}
// fidl
// Replies through |callback| with one ProviderInfo (id, pid, name) for each
// entry in providers_, in container order.
void TraceManager::GetProviders(GetProvidersCallback callback) {
  FX_LOGS(DEBUG) << "GetProviders";

  std::vector<controller::ProviderInfo> infos;
  for (const auto& bundle : providers_) {
    // Build each entry in place rather than via a temporary.
    controller::ProviderInfo& entry = infos.emplace_back();
    entry.set_id(bundle.id);
    entry.set_pid(bundle.pid);
    entry.set_name(bundle.name);
  }

  controller::Controller_GetProviders_Result reply;
  reply.set_response(controller::Controller_GetProviders_Response(std::move(infos)));
  callback(std::move(reply));
}
// Allows multiple callers to race to call the same callback.
// The first caller will successfully have their value forwarded to the callback, and each
// subsequent call will be dropped. This allows a callback to race against a timeout to call a
// completer.
//
// The CompleterMerger is internally reference counted so that it may be passed by value as a
// callback to multiple callers. All copies share one "already called" flag and one completer.
template <typename T>
class CompleterMerger {
 public:
  // Wraps |completer|, the callback that at most one caller will reach.
  explicit CompleterMerger(fit::function<void(T)> completer)
      : state_(std::make_shared<State>(std::move(completer))) {}

  // Forwards |value| to the wrapped completer if no copy of this merger has
  // done so yet; otherwise |value| is dropped.
  void operator()(T&& value) const {
    // exchange() claims the flag unconditionally and reports the previous
    // value, so exactly one caller observes `false`. A lone
    // compare_exchange_weak is not suitable here: it is allowed to fail
    // spuriously, which would silently drop a completion nobody else delivers.
    if (!state_->called_.exchange(true)) {
      state_->completer_(std::forward<T>(value));
    }
  }

 private:
  // State shared by every copy of the merger.
  struct State {
    explicit State(fit::function<void(T)> completer)
        : called_(false), completer_(std::move(completer)) {}
    std::atomic<bool> called_;          // Set once the completer has been claimed.
    fit::function<void(T)> completer_;  // Invoked at most once.
  };
  std::shared_ptr<State> state_;
};
// fidl
// Replies through |callback| with the union of the categories listed in the
// config and the categories reported by each registered provider.
//
// Each provider's answer races against a shared one-second delayed promise:
// whichever fires first completes that provider's bridge, and the delayed
// promise completes it with an empty list. The reply is sent once every
// per-provider promise has completed.
void TraceManager::GetKnownCategories(GetKnownCategoriesCallback callback) {
  FX_LOGS(DEBUG) << "GetKnownCategories";

  // Start from the categories listed in the config.
  KnownCategorySet merged_categories;
  for (const auto& [name, description] : config_.known_categories()) {
    merged_categories.insert({.name = name, .description = description});
  }

  std::vector<fpromise::promise<KnownCategoryVector>> provider_replies;
  fpromise::promise<> deadline = executor_.MakeDelayedPromise(zx::sec(1));
  for (const auto& bundle : providers_) {
    fpromise::bridge<KnownCategoryVector> bridge;
    provider_replies.push_back(bridge.consumer.promise());

    // One copy goes to the provider, another is chained onto the deadline.
    CompleterMerger<KnownCategoryVector> complete_once{bridge.completer.bind()};
    bundle.provider->GetKnownCategories(complete_once);
    deadline = fpromise::promise<>{
        deadline.and_then([complete_once]() mutable { complete_once({}); })};
  }

  // Runs after all per-provider promises finish; failed results are skipped.
  auto send_reply =
      [callback = std::move(callback), merged_categories = std::move(merged_categories)](
          std::vector<fpromise::result<KnownCategoryVector>>& results) mutable {
        for (const auto& provider_result : results) {
          if (!provider_result.is_ok()) {
            continue;
          }
          const auto& reported = provider_result.value();
          merged_categories.insert(reported.begin(), reported.end());
        }
        controller::Controller_GetKnownCategories_Result reply;
        reply.set_response(controller::Controller_GetKnownCategories_Response(
            {merged_categories.begin(), merged_categories.end()}));
        callback(std::move(reply));
      };

  executor_.schedule_task(
      fpromise::join_promise_vector(std::move(provider_replies)).and_then(std::move(send_reply)));
  executor_.schedule_task(std::move(deadline));
}
// Delivers the oldest queued alert to |cb| if one is waiting; otherwise parks
// |cb| until OnAlert() has an alert to hand out.
void TraceManager::WatchAlert(WatchAlertCallback cb) {
  FX_LOGS(DEBUG) << "WatchAlert";

  if (alerts_.empty()) {
    watch_alert_callbacks_.push(std::move(cb));
    return;
  }

  controller::Controller_WatchAlert_Result reply;
  reply.set_response(controller::Controller_WatchAlert_Response(std::move(alerts_.front())));
  cb(std::move(reply));
  // The front element was moved from above; remove it only after replying.
  alerts_.pop();
}
// Shared implementation behind both provider registration entry points.
// Binds |provider|, appends a new entry to providers_ with a fresh id, |pid|
// and |name| (empty string when absent), installs an error handler that
// removes the entry again, and adds the provider to the current session if
// there is one.
void TraceManager::RegisterProviderWorker(fidl::InterfaceHandle<provider::Provider> provider,
                                          uint64_t pid, fidl::StringPtr name) {
  FX_LOGS(DEBUG) << "Registering provider {" << pid << ":" << name.value_or("") << "}";

  auto entry = providers_.emplace(providers_.end(), provider.Bind(), next_provider_id_++, pid,
                                  name.value_or(""));

  // On channel error: detach the provider from the session first, then erase
  // its entry. Nothing may touch |entry| after the erase.
  entry->provider.set_error_handler([this, entry](zx_status_t /*status*/) {
    if (session_) {
      session_->RemoveDeadProvider(&(*entry));
    }
    providers_.erase(entry);
  });

  if (session_) {
    session_->AddProvider(&(*entry));
  }
}
// fidl
// Registers |provider| under |pid| and |name| by forwarding all three
// arguments to RegisterProviderWorker(). No reply is sent.
void TraceManager::RegisterProvider(fidl::InterfaceHandle<provider::Provider> provider,
uint64_t pid, std::string name) {
RegisterProviderWorker(std::move(provider), pid, std::move(name));
}
// fidl
// Registers |provider| exactly like RegisterProvider(), then replies through
// |callback| with ZX_OK and a flag telling the provider whether the current
// session is already starting or started.
void TraceManager::RegisterProviderSynchronously(fidl::InterfaceHandle<provider::Provider> provider,
                                                 uint64_t pid, std::string name,
                                                 RegisterProviderSynchronouslyCallback callback) {
  RegisterProviderWorker(std::move(provider), pid, std::move(name));

  bool already_started = false;
  if (session_) {
    const auto current_state = session_->state();
    already_started = current_state == TraceSession::State::kStarting ||
                      current_state == TraceSession::State::kStarted;
  }
  callback(ZX_OK, already_started);
}
// Sends an OnSessionStateChange event carrying |state| on every controller
// binding currently held by the app.
void TraceManager::SendSessionStateEvent(controller::SessionState state) {
for (const auto& binding : app_->controller_bindings().bindings()) {
binding->events().OnSessionStateChange(state);
}
}
// Maps an internal TraceSession::State onto the controller::SessionState
// enumerator of the same name.
controller::SessionState TraceManager::TranslateSessionState(TraceSession::State state) {
  // Deliberately no `default:` label, so the compiler can flag a newly added
  // TraceSession::State that is not handled here.
  switch (state) {
    case TraceSession::State::kReady:
      return controller::SessionState::READY;
    case TraceSession::State::kInitialized:
      return controller::SessionState::INITIALIZED;
    case TraceSession::State::kStarting:
      return controller::SessionState::STARTING;
    case TraceSession::State::kStarted:
      return controller::SessionState::STARTED;
    case TraceSession::State::kStopping:
      return controller::SessionState::STOPPING;
    case TraceSession::State::kStopped:
      return controller::SessionState::STOPPED;
    case TraceSession::State::kTerminating:
      return controller::SessionState::TERMINATING;
  }
  // Only reachable when |state| holds a value outside the enumerators above.
  // Flowing off the end of a value-returning function is undefined behavior,
  // so flag the bug and return a defined value instead.
  FX_NOTREACHED();
  return controller::SessionState::READY;
}
void TraceManager::OnAlert(const std::string& alert_name) {
if (watch_alert_callbacks_.empty()) {
if (alerts_.size() == kMaxAlertQueueDepth) {
// We're at our queue depth limit. Discard the oldest alert.
alerts_.pop();
}
alerts_.push(alert_name);
return;
}
controller::Controller_WatchAlert_Result result;
result.set_response(controller::Controller_WatchAlert_Response(alert_name));
watch_alert_callbacks_.front()(std::move(result));
watch_alert_callbacks_.pop();
}
} // namespace tracing