blob: 5f21ef5bf3b42f22038107e31b36e692bcfac76a [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <numeric>
#include "garnet/bin/trace_manager/trace_session.h"
#include "lib/fidl/cpp/clone.h"
#include "lib/fsl/tasks/message_loop.h"
#include "lib/fxl/logging.h"
namespace tracing {
TraceSession::TraceSession(zx::socket destination,
fidl::VectorPtr<fidl::StringPtr> categories,
size_t trace_buffer_size,
fxl::Closure abort_handler)
: destination_(std::move(destination)),
categories_(std::move(categories)),
trace_buffer_size_(trace_buffer_size),
buffer_(trace_buffer_size_),
abort_handler_(std::move(abort_handler)),
weak_ptr_factory_(this) {}
TraceSession::~TraceSession() {
destination_.reset();
}
void TraceSession::WaitForProvidersToStart(fxl::Closure callback,
fxl::TimeDelta timeout) {
start_callback_ = std::move(callback);
session_start_timeout_.Start(
fsl::MessageLoop::GetCurrent()->task_runner().get(),
[weak = weak_ptr_factory_.GetWeakPtr()]() {
if (weak) {
FXL_LOG(WARNING) << "Waiting for start timed out.";
weak->NotifyStarted();
}
},
std::move(timeout));
}
void TraceSession::AddProvider(TraceProviderBundle* bundle) {
if (!(state_ == State::kReady || state_ == State::kStarted))
return;
FXL_VLOG(1) << "Adding provider " << *bundle;
tracees_.emplace_back(std::make_unique<Tracee>(bundle));
fidl::VectorPtr<fidl::StringPtr> categories_clone;
fidl::Clone(categories_, &categories_clone);
if (!tracees_.back()->Start(
trace_buffer_size_, std::move(categories_clone),
[ weak = weak_ptr_factory_.GetWeakPtr(), bundle ]() {
if (weak)
weak->CheckAllProvidersStarted();
},
[ weak = weak_ptr_factory_.GetWeakPtr(), bundle ]() {
if (weak)
weak->FinishProvider(bundle);
})) {
tracees_.pop_back();
} else {
// We haven't fully started at this point, we still have to wait for
// the provider to indicate it has started, but for our purposes we have
// started "enough".
TransitionToState(State::kStarted);
}
}
void TraceSession::RemoveDeadProvider(TraceProviderBundle* bundle) {
if (!(state_ == State::kStarted || state_ == State::kStopping))
return;
FinishProvider(bundle);
}
void TraceSession::Stop(fxl::Closure done_callback,
const fxl::TimeDelta& timeout) {
if (!(state_ == State::kReady || state_ == State::kStarted))
return;
FXL_VLOG(1) << "Stopping trace";
TransitionToState(State::kStopping);
done_callback_ = std::move(done_callback);
// Walk through all remaining tracees and send out their buffers.
for (const auto& tracee : tracees_)
tracee->Stop();
session_finalize_timeout_.Start(
fsl::MessageLoop::GetCurrent()->task_runner().get(),
[weak = weak_ptr_factory_.GetWeakPtr()]() {
if (weak)
weak->FinishSessionDueToTimeout();
},
timeout);
FinishSessionIfEmpty();
}
void TraceSession::NotifyStarted() {
if (start_callback_) {
FXL_VLOG(1) << "Marking session as having started";
session_start_timeout_.Stop();
auto start_callback = std::move(start_callback_);
start_callback();
}
}
void TraceSession::Abort() {
FXL_VLOG(1) << "Marking session as having aborted";
TransitionToState(State::kStopped);
tracees_.clear();
abort_handler_();
}
// Called when a provider state change is detected.
// This includes "failed" as well as "started".
void TraceSession::CheckAllProvidersStarted() {
bool all_started = std::accumulate(
tracees_.begin(), tracees_.end(), true,
[](bool value, const auto& tracee) {
bool ready = (tracee->state() == Tracee::State::kStarted ||
// If a provider fails to start continue tracing.
// TODO(TO-530): We should still record what providers
// failed to start.
tracee->state() == Tracee::State::kStopped);
FXL_VLOG(2) << "tracee " << *tracee->bundle() << (ready ? "" : " not")
<< " ready";
return value && ready;
});
if (all_started) {
FXL_VLOG(2) << "All providers reporting started";
NotifyStarted();
}
}
void TraceSession::FinishProvider(TraceProviderBundle* bundle) {
auto it =
std::find_if(tracees_.begin(), tracees_.end(),
[bundle](const auto& tracee) { return *tracee == bundle; });
if (it != tracees_.end()) {
if (destination_) {
switch ((*it)->TransferRecords(destination_)) {
case Tracee::TransferStatus::kComplete:
break;
case Tracee::TransferStatus::kCorrupted:
FXL_LOG(ERROR) << "Encountered unrecoverable error writing socket, "
"aborting trace";
Abort();
return;
case Tracee::TransferStatus::kReceiverDead:
FXL_LOG(ERROR) << "Peer is closed, aborting trace";
Abort();
return;
default:
break;
}
}
tracees_.erase(it);
}
if (state_ != State::kStopping) {
// A trace provider may have entered the finished state without having
// first successfully started. Check whether all remaining providers have
// now started.
CheckAllProvidersStarted();
}
FinishSessionIfEmpty();
}
void TraceSession::FinishSessionIfEmpty() {
if (state_ == State::kStopping && tracees_.empty()) {
FXL_VLOG(1) << "Marking session as stopped, no more tracees";
TransitionToState(State::kStopped);
session_finalize_timeout_.Stop();
done_callback_();
}
}
void TraceSession::FinishSessionDueToTimeout() {
// We do not consider pending_start_tracees_ here as we only
// stop them as a best effort.
if (state_ == State::kStopping && !tracees_.empty()) {
FXL_VLOG(1)
<< "Marking session as stopped, timed out waiting for tracee(s)";
TransitionToState(State::kStopped);
for (auto& tracee : tracees_) {
if (tracee->state() != Tracee::State::kStopped)
FXL_LOG(WARNING) << "Timed out waiting for trace provider "
<< *tracee->bundle() << " to finish";
}
done_callback_();
}
}
void TraceSession::TransitionToState(State new_state) {
FXL_VLOG(2) << "Transitioning from " << state_ << " to " << new_state;
state_ = new_state;
}
std::ostream& operator<<(std::ostream& out, TraceSession::State state) {
switch (state) {
case TraceSession::State::kReady:
out << "ready";
break;
case TraceSession::State::kStarted:
out << "started";
break;
case TraceSession::State::kStopping:
out << "stopping";
break;
case TraceSession::State::kStopped:
out << "stopped";
break;
}
return out;
}
} // namespace tracing