blob: f62bb8a60a804aa35963c9deca03002839418078 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/forensics/feedback_data/log_source.h"
#include <fuchsia/diagnostics/cpp/fidl.h>
#include <fuchsia/logger/cpp/fidl.h>
#include <lib/async/cpp/task.h>
#include <lib/fit/defer.h>
#include <lib/sys/cpp/service_directory.h>
#include <lib/syslog/cpp/macros.h>
#include <memory>
#include "src/developer/forensics/feedback_data/constants.h"
#include "src/developer/forensics/feedback_data/log_source.h"
#include "src/lib/backoff/backoff.h"
#include "src/lib/diagnostics/accessor2logger/log_message.h"
namespace forensics::feedback_data {
LogSource::LogSource(async_dispatcher_t* dispatcher,
std::shared_ptr<sys::ServiceDirectory> services, LogSink* sink,
std::unique_ptr<backoff::Backoff> backoff)
: dispatcher_(dispatcher),
services_(std::move(services)),
sink_(sink),
backoff_(std::move(backoff)) {
FX_CHECK(dispatcher_ != nullptr);
FX_CHECK(services_ != nullptr);
FX_CHECK(sink_ != nullptr);
// |backoff_| can only be null if we know a reconnection won't occur.
if (sink_->SafeAfterInterruption()) {
FX_CHECK(backoff_ != nullptr);
}
archive_accessor_.set_error_handler([this](const zx_status_t status) {
FX_LOGS(WARNING) << "Lost connection to " << kArchiveAccessorName;
// The batch iterator and archive accessor connections are not expected to close. Ensure both
// are unbound at the same time to simplify reconnections.
batch_iterator_.Unbind();
OnDisconnect();
});
batch_iterator_.set_error_handler([this](const zx_status_t status) {
FX_LOGS(WARNING) << "Lost connection to fuchsia.diagnostics.BatchIterator";
// The batch iterator and archive accessor connections are not expected to close. Ensure both
// are unbound at the same time to simplify reconnections.
archive_accessor_.Unbind();
OnDisconnect();
});
}
void LogSource::Start() {
if (is_stopped_ && !sink_->SafeAfterInterruption()) {
FX_LOGS(FATAL) << "Log streaming unsafe to resume";
}
is_stopped_ = false;
services_->Connect(archive_accessor_.NewRequest(dispatcher_), kArchiveAccessorName);
fuchsia::diagnostics::StreamParameters params;
params.set_data_type(fuchsia::diagnostics::DataType::LOGS)
.set_format(fuchsia::diagnostics::Format::JSON)
.set_stream_mode(fuchsia::diagnostics::StreamMode::SNAPSHOT_THEN_SUBSCRIBE)
.set_client_selector_configuration(
fuchsia::diagnostics::ClientSelectorConfiguration::WithSelectAll(true));
archive_accessor_->StreamDiagnostics(std::move(params), batch_iterator_.NewRequest(dispatcher_));
GetNext();
}
void LogSource::OnDisconnect() {
is_stopped_ = true;
sink_->NotifyInterruption();
if (!sink_->SafeAfterInterruption()) {
return;
}
async::PostDelayedTask(
dispatcher_,
[self = ptr_factory_.GetWeakPtr()] {
// Safe to call Start because it's confirmed the sink can safely resume receiving messages.
if (self) {
self->Start();
}
},
backoff_->GetNext());
}
void LogSource::GetNext() {
using diagnostics::accessor2logger::ConvertFormattedContentToLogMessages;
batch_iterator_->GetNext([this](auto result) {
auto get_next = ::fit::defer([this] { GetNext(); });
if (result.is_err()) {
return;
}
for (auto& content : result.response().batch) {
auto formatted = ConvertFormattedContentToLogMessages(std::move(content));
if (formatted.is_error()) {
sink_->Add(::fpromise::error(std::move(formatted.error())));
continue;
}
for (auto& message : formatted.value()) {
sink_->Add(std::move(message));
}
}
});
}
void LogSource::Stop() {
batch_iterator_.Unbind();
archive_accessor_.Unbind();
is_stopped_ = true;
sink_->NotifyInterruption();
}
} // namespace forensics::feedback_data