| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "garnet/bin/trace/tracer.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/async/default.h> |
| #include <trace-engine/fields.h> |
| #include <trace-reader/reader.h> |
| #include <utility> |
| |
| #include "lib/fxl/logging.h" |
| |
| namespace tracing { |
| |
| Tracer::Tracer(fuchsia::tracing::TraceController* controller) |
| : controller_(controller), dispatcher_(nullptr), wait_(this) { |
| FXL_DCHECK(controller_); |
| wait_.set_trigger(ZX_SOCKET_READABLE | ZX_SOCKET_PEER_CLOSED); |
| } |
| |
| Tracer::~Tracer() { CloseSocket(); } |
| |
| void Tracer::Start(fuchsia::tracing::TraceOptions options, |
| RecordConsumer record_consumer, ErrorHandler error_handler, |
| fit::closure start_callback, fit::closure done_callback) { |
| FXL_DCHECK(state_ == State::kStopped); |
| |
| state_ = State::kStarted; |
| done_callback_ = std::move(done_callback); |
| start_callback_ = std::move(start_callback); |
| |
| zx::socket outgoing_socket; |
| zx_status_t status = zx::socket::create(0u, &socket_, &outgoing_socket); |
| if (status != ZX_OK) { |
| FXL_LOG(ERROR) << "Failed to create socket: status=" << status; |
| Done(); |
| return; |
| } |
| |
| controller_->StartTracing(std::move(options), std::move(outgoing_socket), |
| [this]() { start_callback_(); }); |
| |
| reader_.reset(new trace::TraceReader(std::move(record_consumer), |
| std::move(error_handler))); |
| |
| dispatcher_ = async_get_default_dispatcher(); |
| wait_.set_object(socket_.get()); |
| status = wait_.Begin(dispatcher_); |
| FXL_CHECK(status == ZX_OK) << "Failed to add handler: status=" << status; |
| } |
| |
| void Tracer::Stop() { |
| // Note: The controller will close the socket when finished. |
| if (state_ == State::kStarted) { |
| state_ = State::kStopping; |
| controller_->StopTracing(); |
| } |
| } |
| |
| void Tracer::OnHandleReady(async_dispatcher_t* dispatcher, |
| async::WaitBase* wait, zx_status_t status, |
| const zx_packet_signal_t* signal) { |
| FXL_DCHECK(state_ == State::kStarted || state_ == State::kStopping); |
| |
| if (status != ZX_OK) { |
| OnHandleError(status); |
| return; |
| } |
| |
| if (signal->observed & ZX_SOCKET_READABLE) { |
| DrainSocket(dispatcher); |
| } else if (signal->observed & ZX_SOCKET_PEER_CLOSED) { |
| Done(); |
| } else { |
| FXL_CHECK(false); |
| } |
| } |
| |
| void Tracer::DrainSocket(async_dispatcher_t* dispatcher) { |
| for (;;) { |
| size_t actual; |
| zx_status_t status = |
| socket_.read(0u, buffer_.data() + buffer_end_, |
| buffer_.size() - buffer_end_, &actual); |
| if (status == ZX_ERR_SHOULD_WAIT) { |
| status = wait_.Begin(dispatcher); |
| if (status != ZX_OK) { |
| OnHandleError(status); |
| } |
| return; |
| } |
| |
| if (status || actual == 0) { |
| if (status != ZX_ERR_PEER_CLOSED) { |
| FXL_LOG(ERROR) << "Failed to read data from socket: status=" << status; |
| } |
| Done(); |
| return; |
| } |
| |
| buffer_end_ += actual; |
| size_t bytes_available = buffer_end_; |
| FXL_DCHECK(bytes_available > 0); |
| |
| trace::Chunk chunk(reinterpret_cast<const uint64_t*>(buffer_.data()), |
| trace::BytesToWords(bytes_available)); |
| if (!reader_->ReadRecords(chunk)) { |
| FXL_LOG(ERROR) << "Trace stream is corrupted"; |
| Done(); |
| return; |
| } |
| |
| size_t bytes_consumed = |
| bytes_available - trace::WordsToBytes(chunk.remaining_words()); |
| bytes_available -= bytes_consumed; |
| memmove(buffer_.data(), buffer_.data() + bytes_consumed, bytes_available); |
| buffer_end_ = bytes_available; |
| } |
| } |
| |
| void Tracer::OnHandleError(zx_status_t status) { |
| FXL_LOG(ERROR) << "Failed to wait on socket: status=" << status; |
| Done(); |
| } |
| |
| void Tracer::CloseSocket() { |
| if (socket_) { |
| wait_.Cancel(); |
| wait_.set_object(ZX_HANDLE_INVALID); |
| dispatcher_ = nullptr; |
| socket_.reset(); |
| } |
| } |
| |
| void Tracer::Done() { |
| FXL_DCHECK(state_ == State::kStarted || state_ == State::kStopping); |
| |
| state_ = State::kStopped; |
| reader_.reset(); |
| |
| CloseSocket(); |
| |
| if (done_callback_) { |
| async::PostTask(async_get_default_dispatcher(), std::move(done_callback_)); |
| } |
| } |
| |
| } // namespace tracing |