blob: b33dd64619971eac944d7464a68ed7d6324e2955 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "garnet/bin/trace_manager/tracee.h"
#include <lib/async/default.h>
#include <trace-engine/fields.h>
#include <trace-provider/provider.h>
#include "lib/fxl/logging.h"
namespace tracing {
namespace {
// Writes |len| bytes from |buffer| to |socket|. Returns
// TransferStatus::kComplete if the entire buffer has been
// successfully transferred. A return value of
// TransferStatus::kReceiverDead indicates that the peer was closed
// during the transfer.
Tracee::TransferStatus WriteBufferToSocket(const uint8_t* buffer,
size_t len,
const zx::socket& socket) {
size_t offset = 0;
while (offset < len) {
zx_status_t status = ZX_OK;
size_t actual = 0;
if ((status = socket.write(0u, buffer + offset, len - offset, &actual)) <
0) {
if (status == ZX_ERR_SHOULD_WAIT) {
zx_signals_t pending = 0;
status = socket.wait_one(ZX_SOCKET_WRITABLE | ZX_SOCKET_PEER_CLOSED,
zx::time::infinite(), &pending);
if (status < 0) {
FXL_LOG(ERROR) << "Wait on socket failed: " << status;
return Tracee::TransferStatus::kCorrupted;
}
if (pending & ZX_SOCKET_WRITABLE)
continue;
if (pending & ZX_SOCKET_PEER_CLOSED) {
FXL_LOG(ERROR) << "Peer closed while writing to socket";
return Tracee::TransferStatus::kReceiverDead;
}
}
return Tracee::TransferStatus::kCorrupted;
}
offset += actual;
}
return Tracee::TransferStatus::kComplete;
}
} // namespace
Tracee::Tracee(TraceProviderBundle* bundle)
: bundle_(bundle), wait_(this), weak_ptr_factory_(this) {}
Tracee::~Tracee() {
if (async_) {
wait_.Cancel();
wait_.set_object(ZX_HANDLE_INVALID);
async_ = nullptr;
}
}
bool Tracee::operator==(TraceProviderBundle* bundle) const {
return bundle_ == bundle;
}
bool Tracee::Start(size_t buffer_size,
fidl::VectorPtr<fidl::StringPtr> categories,
fxl::Closure started_callback,
fxl::Closure stopped_callback) {
FXL_DCHECK(state_ == State::kReady);
FXL_DCHECK(!buffer_vmo_);
FXL_DCHECK(started_callback);
FXL_DCHECK(stopped_callback);
zx::vmo buffer_vmo;
zx_status_t status = zx::vmo::create(buffer_size, 0u, &buffer_vmo);
if (status != ZX_OK) {
FXL_LOG(ERROR) << *bundle_
<< ": Failed to create trace buffer: status=" << status;
return false;
}
zx::vmo buffer_vmo_for_provider;
status = buffer_vmo.duplicate(ZX_RIGHTS_BASIC | ZX_RIGHTS_IO | ZX_RIGHT_MAP,
&buffer_vmo_for_provider);
if (status != ZX_OK) {
FXL_LOG(ERROR) << *bundle_
<< ": Failed to duplicate trace buffer for provider: status="
<< status;
return false;
}
zx::eventpair fence, fence_for_provider;
status = zx::eventpair::create(0u, &fence, &fence_for_provider);
if (status != ZX_OK) {
FXL_LOG(ERROR) << *bundle_
<< ": Failed to create trace buffer fence: status="
<< status;
return false;
}
bundle_->provider->Start(std::move(buffer_vmo_for_provider),
std::move(fence_for_provider),
std::move(categories));
buffer_vmo_ = std::move(buffer_vmo);
buffer_vmo_size_ = buffer_size;
fence_ = std::move(fence);
started_callback_ = std::move(started_callback);
stopped_callback_ = std::move(stopped_callback);
wait_.set_object(fence_.get());
wait_.set_trigger(TRACE_PROVIDER_SIGNAL_STARTED |
TRACE_PROVIDER_SIGNAL_BUFFER_OVERFLOW |
ZX_EPAIR_PEER_CLOSED);
async_ = async_get_default();
status = wait_.Begin(async_);
FXL_CHECK(status == ZX_OK) << "Failed to add handler: status=" << status;
TransitionToState(State::kStartPending);
return true;
}
void Tracee::Stop() {
if (state_ != State::kStarted)
return;
bundle_->provider->Stop();
TransitionToState(State::kStopping);
}
void Tracee::TransitionToState(State new_state) {
FXL_VLOG(2) << *bundle_ << ": Transitioning from " << state_ << " to "
<< new_state;
state_ = new_state;
}
void Tracee::OnHandleReady(async_t* async,
async::WaitBase* wait,
zx_status_t status,
const zx_packet_signal_t* signal) {
if (status != ZX_OK) {
OnHandleError(status);
return;
}
zx_signals_t pending = signal->observed;
FXL_VLOG(2) << *bundle_ << ": pending=0x" << std::hex << pending;
FXL_DCHECK(pending &
(TRACE_PROVIDER_SIGNAL_STARTED |
TRACE_PROVIDER_SIGNAL_BUFFER_OVERFLOW | ZX_EPAIR_PEER_CLOSED));
FXL_DCHECK(state_ == State::kStartPending || state_ == State::kStarted ||
state_ == State::kStopping);
// Handle this before BUFFER_OVERFLOW so that if they arrive together we'll
// have first transitioned to state kStarted before processing
// BUFFER_OVERFLOW.
if (pending & TRACE_PROVIDER_SIGNAL_STARTED) {
// Clear the signal before invoking the callback:
// a) It remains set until we do so,
// b) Clear it before the call back in case we get back to back
// notifications.
zx_object_signal(wait_.object(), TRACE_PROVIDER_SIGNAL_STARTED, 0u);
// The provider should only be signalling us when it has finished startup.
if (state_ == State::kStartPending) {
TransitionToState(State::kStarted);
fxl::Closure started_callback = std::move(started_callback_);
FXL_DCHECK(started_callback);
started_callback();
} else {
FXL_LOG(WARNING) << *bundle_
<< ": Received TRACE_PROVIDER_SIGNAL_STARTED in state "
<< state_;
}
}
if (pending & TRACE_PROVIDER_SIGNAL_BUFFER_OVERFLOW) {
// The signal remains set until we clear it.
zx_object_signal(wait_.object(), TRACE_PROVIDER_SIGNAL_BUFFER_OVERFLOW, 0u);
if (state_ == State::kStarted || state_ == State::kStopping) {
FXL_LOG(WARNING)
<< *bundle_
<< ": Records got dropped, probably due to buffer overflow";
buffer_overflow_ = true;
} else {
FXL_LOG(WARNING)
<< *bundle_
<< ": Received TRACE_PROVIDER_SIGNAL_BUFFER_OVERFLOW in state "
<< state_;
}
}
if (pending & ZX_EPAIR_PEER_CLOSED) {
wait_.set_object(ZX_HANDLE_INVALID);
async_ = nullptr;
TransitionToState(State::kStopped);
fxl::Closure stopped_callback = std::move(stopped_callback_);
FXL_DCHECK(stopped_callback);
stopped_callback();
return;
}
status = wait->Begin(async);
if (status != ZX_OK)
OnHandleError(status);
}
void Tracee::OnHandleError(zx_status_t status) {
FXL_VLOG(2) << *bundle_ << ": error=" << status;
FXL_DCHECK(status == ZX_ERR_CANCELED);
FXL_DCHECK(state_ == State::kStartPending || state_ == State::kStarted ||
state_ == State::kStopping);
wait_.set_object(ZX_HANDLE_INVALID);
async_ = nullptr;
TransitionToState(State::kStopped);
}
Tracee::TransferStatus Tracee::TransferRecords(const zx::socket& socket) const {
FXL_DCHECK(socket);
FXL_DCHECK(buffer_vmo_);
Tracee::TransferStatus transfer_status = TransferStatus::kComplete;
if ((transfer_status = WriteProviderInfoRecord(socket)) !=
TransferStatus::kComplete) {
FXL_LOG(ERROR) << *bundle_
<< ": Failed to write provider info record to trace.";
return transfer_status;
}
if (buffer_overflow_) {
// If we can't write the provider event record, it's not the end of the
// world.
if (WriteProviderBufferOverflowEvent(socket) != TransferStatus::kComplete) {
FXL_LOG(ERROR) << *bundle_
<< ": Failed to write provider event (buffer overflow)"
" record to trace.";
}
}
std::vector<uint8_t> buffer(buffer_vmo_size_);
if (buffer_vmo_.read(buffer.data(), 0, buffer_vmo_size_) != ZX_OK) {
FXL_LOG(WARNING) << *bundle_ << ": Failed to read data from buffer_vmo: "
<< "expected size=" << buffer_vmo_size_;
}
const uint64_t* start = reinterpret_cast<const uint64_t*>(buffer.data());
const uint64_t* current = start;
const uint64_t* end = start + trace::BytesToWords(buffer.size());
while (current < end) {
auto length = trace::RecordFields::RecordSize::Get<uint16_t>(*current);
if (length == 0 || length > trace::RecordFields::kMaxRecordSizeBytes ||
current + length >= end) {
break;
}
current += length;
}
return WriteBufferToSocket(buffer.data(),
trace::WordsToBytes(current - start), socket);
}
Tracee::TransferStatus Tracee::WriteProviderInfoRecord(
const zx::socket& socket) const {
std::string label(""); // TODO(ZX-1875): Provide meaningful labels or remove
// labels from the trace wire format altogether.
size_t num_words = 1u + trace::BytesToWords(trace::Pad(label.size()));
std::vector<uint64_t> record(num_words);
record[0] =
trace::ProviderInfoMetadataRecordFields::Type::Make(
trace::ToUnderlyingType(trace::RecordType::kMetadata)) |
trace::ProviderInfoMetadataRecordFields::RecordSize::Make(num_words) |
trace::ProviderInfoMetadataRecordFields::MetadataType::Make(
trace::ToUnderlyingType(trace::MetadataType::kProviderInfo)) |
trace::ProviderInfoMetadataRecordFields::Id::Make(bundle_->id) |
trace::ProviderInfoMetadataRecordFields::NameLength::Make(label.size());
memcpy(&record[1], label.c_str(), label.size());
return WriteBufferToSocket(reinterpret_cast<uint8_t*>(record.data()),
trace::WordsToBytes(num_words), socket);
}
Tracee::TransferStatus Tracee::WriteProviderBufferOverflowEvent(
const zx::socket& socket) const {
size_t num_words = 1u;
std::vector<uint64_t> record(num_words);
record[0] =
trace::ProviderEventMetadataRecordFields::Type::Make(
trace::ToUnderlyingType(trace::RecordType::kMetadata)) |
trace::ProviderEventMetadataRecordFields::RecordSize::Make(num_words) |
trace::ProviderEventMetadataRecordFields::MetadataType::Make(
trace::ToUnderlyingType(trace::MetadataType::kProviderEvent)) |
trace::ProviderEventMetadataRecordFields::Id::Make(bundle_->id) |
trace::ProviderEventMetadataRecordFields::Event::Make(
trace::ToUnderlyingType(trace::ProviderEventType::kBufferOverflow));
return WriteBufferToSocket(reinterpret_cast<uint8_t*>(record.data()),
trace::WordsToBytes(num_words), socket);
}
std::ostream& operator<<(std::ostream& out, Tracee::State state) {
switch (state) {
case Tracee::State::kReady:
out << "ready";
break;
case Tracee::State::kStartPending:
out << "start pending";
break;
case Tracee::State::kStarted:
out << "started";
break;
case Tracee::State::kStopping:
out << "stopping";
break;
case Tracee::State::kStopped:
out << "stopped";
break;
}
return out;
}
} // namespace tracing