// blob: 28fcd9a726671dca75b0f8bb27797128b35a6838 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/forensics/crash_reports/queue.h"
#include <lib/async/cpp/task.h>
#include <lib/fit/defer.h>
#include <lib/syslog/cpp/macros.h>
#include <zircon/errors.h>
#include "src/developer/forensics/crash_reports/constants.h"
#include "src/developer/forensics/crash_reports/info/queue_info.h"
#include "src/lib/fxl/strings/string_printf.h"
namespace forensics {
namespace crash_reports {
// Unqualified names for the async task-posting helpers called throughout this file.
using async::PostDelayedTask;
using async::PostTask;
// NOTE(review): FileReader is not referenced in the visible part of this file; confirm whether
// this using-declaration is still needed.
using crashpad::FileReader;
// Shorthand for the upload policy enum handled in OnUploadPolicyChange().
using UploadPolicy = Settings::UploadPolicy;
// Path handed to |store_| when the queue is constructed.
constexpr char kStorePath[] = "/tmp/reports";
// Registers the queue with |settings| so that every upload policy change is forwarded to
// OnUploadPolicyChange().
void Queue::WatchSettings(Settings* settings) {
  auto forward_policy_change = [this](const UploadPolicy& new_policy) {
    OnUploadPolicyChange(new_policy);
  };
  settings->RegisterUploadPolicyWatcher(std::move(forward_policy_change));
}
// Builds a queue that posts its work on |dispatcher|, connects to services through |services| and
// uploads reports through |crash_server|.
//
// |dispatcher| must not be null. Construction schedules the hourly processing of pending reports
// and starts listening for network reachability changes.
Queue::Queue(async_dispatcher_t* dispatcher, std::shared_ptr<sys::ServiceDirectory> services,
             std::shared_ptr<InfoContext> info_context, CrashServer* crash_server)
    : dispatcher_(dispatcher),
      // |services| is a by-value sink parameter: move it instead of paying for an extra atomic
      // reference count increment/decrement.
      services_(std::move(services)),
      // |info_context| is copied here and only moved into |info_| below, so both members share it.
      store_(info_context, kStorePath, kStoreMaxSize),
      crash_server_(crash_server),
      info_(std::move(info_context)),
      network_reconnection_backoff_(/*initial_delay=*/zx::min(1), /*retry_factor=*/2u,
                                    /*max_delay=*/zx::hour(1)) {
  FX_CHECK(dispatcher_);

  ProcessAllEveryHour();
  ProcessAllOnNetworkReachable();

  // TODO(fxbug.dev/56448): Initialize queue with the reports in the store. We need to be able to
  // distinguish archived reports from reports that have not been uploaded yet.
}
// Returns true if |uuid| identifies one of the reports currently pending in the queue.
bool Queue::Contains(const Store::Uid& uuid) const {
  return std::any_of(pending_reports_.cbegin(), pending_reports_.cend(),
                     [&uuid](const Store::Uid& pending_id) { return pending_id == uuid; });
}
// Adds |report| to the queue.
//
// When the queue is in the Upload state, the report is first uploaded directly; on success it
// never reaches the store. Otherwise the report is added to the store and tracked as pending.
//
// Returns true if the report was uploaded or stored, false if the store rejected it.
bool Queue::Add(Report report) {
  // Whatever the outcome, schedule a pass over the pending reports once Add returns. The pass is
  // posted as a task rather than run inline so the crash report filer is not blocked.
  auto schedule_processing = fit::defer([this] {
    const auto status = PostTask(dispatcher_, [this] { ProcessAll(); });
    if (status != ZX_OK) {
      FX_PLOGS(ERROR, status) << "Error posting task to process reports after adding new report";
    }
  });

  // Early upload: try the crash server before touching the store.
  if (state_ == State::Upload) {
    info_.RecordUploadAttemptNumber(1u);

    std::string server_report_id;
    const bool uploaded = Upload(report, &server_report_id);
    if (uploaded) {
      info_.MarkReportAsUploaded(server_report_id, 1u);
      return true;
    }
  }

  // Adding to the store may evict older reports; stop tracking each of them.
  std::vector<Store::Uid> evicted_ids;
  const std::optional<Store::Uid> stored_id = store_.Add(std::move(report), &evicted_ids);
  for (const Store::Uid& evicted_id : evicted_ids) {
    GarbageCollect(evicted_id);
  }

  if (!stored_id) {
    return false;
  }

  // Account for the early upload attempt that failed above.
  if (state_ == State::Upload) {
    ++upload_attempts_[*stored_id];
  }

  pending_reports_.push_back(*stored_id);
  return true;
}
// Processes the pending reports according to the current state: uploads them, archives them, or
// leaves them untouched. Returns whatever the selected operation returns, or 0 when reports are
// left as pending.
size_t Queue::ProcessAll() {
  switch (state_) {
    case State::Upload:
      return UploadAll();
    case State::Archive:
      return ArchiveAll();
    case State::LeaveAsPending:
      break;
  }

  // Nothing is processed while reports are left as pending.
  return 0;
}
// Attempts to upload the stored report identified by |local_report_id|.
//
// Returns true if the report no longer needs to be tracked as pending, i.e. it was uploaded (and
// removed from the store) or it could not be found in the store. Returns false if the upload
// failed.
bool Queue::Upload(const Store::Uid& local_report_id) {
  const std::optional<Report> stored_report = store_.Get(local_report_id);
  if (!stored_report) {
    // |pending_reports_| is kept in sync with |store_| so Get should only ever fail if the report
    // is deleted from the store by an external influence, e.g., the filesystem flushes /cache.
    return true;
  }

  // Count this attempt before making it so the recorded number includes the current try.
  auto& attempts = upload_attempts_[local_report_id];
  ++attempts;
  info_.RecordUploadAttemptNumber(attempts);

  std::string server_report_id;
  if (!Upload(*stored_report, &server_report_id)) {
    FX_LOGS(ERROR) << "Error uploading local report " << std::to_string(local_report_id);
    return false;
  }

  // |attempts| refers into |upload_attempts_|; it must not be used after the erase below.
  info_.MarkReportAsUploaded(server_report_id, attempts);
  upload_attempts_.erase(local_report_id);
  store_.Remove(local_report_id);
  return true;
}
// Sends |report| to the crash server. On success, logs the URL built from the id written to
// |server_report_id| and returns true; returns false otherwise.
bool Queue::Upload(const Report& report, std::string* server_report_id) {
  if (!crash_server_->MakeRequest(report, server_report_id)) {
    return false;
  }

  FX_LOGS(INFO) << "Successfully uploaded report at https://crash.corp.google.com/"
                << *server_report_id;
  return true;
}
// Stops tracking |local_report_id|: records it as garbage collected along with its number of
// upload attempts, then drops it from the attempt bookkeeping and from the pending reports.
void Queue::GarbageCollect(const Store::Uid& local_report_id) {
  FX_LOGS(INFO) << "Garbage collected local report " << std::to_string(local_report_id);

  info_.MarkReportAsGarbageCollected(upload_attempts_[local_report_id]);
  upload_attempts_.erase(local_report_id);

  // Erase-remove: compact the surviving ids to the front, then trim the leftover tail.
  const auto first_stale =
      std::remove(pending_reports_.begin(), pending_reports_.end(), local_report_id);
  pending_reports_.erase(first_stale, pending_reports_.end());
}
size_t Queue::UploadAll() {
std::vector<Store::Uid> new_pending_reports;
for (const auto& local_report_id : pending_reports_) {
if (!Upload(local_report_id)) {
new_pending_reports.push_back(local_report_id);
}
}
pending_reports_.swap(new_pending_reports);
// |new_pending_reports| now contains the pending reports before attempting to upload them.
return new_pending_reports.size() - pending_reports_.size();
}
// Archives every pending report: each one is logged and recorded as archived together with its
// number of upload attempts, then the list of pending reports is emptied.
//
// Returns the number of reports that were archived.
size_t Queue::ArchiveAll() {
  for (const auto& local_report_id : pending_reports_) {
    FX_LOGS(INFO) << "Archiving local report " << std::to_string(local_report_id)
                  << " under /tmp/reports";
    info_.MarkReportAsArchived(upload_attempts_[local_report_id]);
  }

  // Every pending report is archived unconditionally, so the count is the queue size before it
  // is cleared. The previous implementation returned a counter that was never incremented and
  // thus always reported 0.
  const size_t num_archived = pending_reports_.size();
  pending_reports_.clear();

  return num_archived;
}
// The queue is inherently conservative about uploading crash reports: a report that is forbidden
// from being uploaded will never be uploaded, whereas a report that is permitted to be uploaded
// may later be considered forbidden. This holds because, while uploads are disabled, all reports
// are archived immediately after being added to the queue, so a change of upload policy can
// never cause a report that shouldn't be uploaded to end up being uploaded.
//
// Maps |upload_policy| onto the matching queue state, then immediately processes the pending
// reports under that new state.
void Queue::OnUploadPolicyChange(const Settings::UploadPolicy& upload_policy) {
  if (upload_policy == UploadPolicy::DISABLED) {
    state_ = State::Archive;
  } else if (upload_policy == UploadPolicy::ENABLED) {
    state_ = State::Upload;
  } else if (upload_policy == UploadPolicy::LIMBO) {
    state_ = State::LeaveAsPending;
  }

  ProcessAll();
}
// Posts a task, delayed by one hour, that processes the pending reports and then re-arms itself
// by calling this function again. If posting fails the error is logged and the hourly cycle
// stops.
void Queue::ProcessAllEveryHour() {
  auto hourly_pass = [this] {
    const size_t num_processed = ProcessAll();
    if (num_processed > 0) {
      FX_LOGS(INFO) << "Hourly processing of pending crash reports queue";
    }
    ProcessAllEveryHour();
  };

  const auto status = PostDelayedTask(dispatcher_, std::move(hourly_pass), zx::hour(1));
  if (status != ZX_OK) {
    FX_PLOGS(ERROR, status) << "Error posting hourly process task to async loop. Won't retry.";
  }
}
void Queue::ProcessAllOnNetworkReachable() {
netstack_ = services_->Connect<fuchsia::netstack::Netstack>();
netstack_.set_error_handler([this](zx_status_t status) {
FX_PLOGS(ERROR, status) << "Lost connection to " << fuchsia::netstack::Netstack::Name_;
network_reconnection_task_.Reset([this]() mutable { ProcessAllOnNetworkReachable(); });
async::PostDelayedTask(
dispatcher_, [cb = network_reconnection_task_.callback()]() { cb(); },
network_reconnection_backoff_.GetNext());
});
auto isReachable = [](const fuchsia::netstack::NetInterface& interface) {
if ((interface.flags & fuchsia::netstack::Flags::UP) != fuchsia::netstack::Flags::UP) {
return false;
}
if ((interface.flags & fuchsia::netstack::Flags::DHCP) != fuchsia::netstack::Flags::DHCP) {
return false;
}
auto isZero = [](const uint8_t octet) { return octet == 0; };
switch (interface.addr.Which()) {
case fuchsia::net::IpAddress::Tag::kIpv4: {
const auto& octets = interface.addr.ipv4().addr;
return !std::all_of(octets.cbegin(), octets.cend(), isZero);
}
case fuchsia::net::IpAddress::Tag::kIpv6: {
const auto& octets = interface.addr.ipv6().addr;
return !std::all_of(octets.cbegin(), octets.cend(), isZero);
}
case fuchsia::net::IpAddress::Tag::Invalid: {
FX_LOGS(ERROR) << "Network interface " << interface.name << " has malformed IP address";
return false;
}
}
};
netstack_.events().OnInterfacesChanged =
[this, isReachable](std::vector<fuchsia::netstack::NetInterface> interfaces) {
network_reconnection_backoff_.Reset();
const bool reachable = std::any_of(interfaces.cbegin(), interfaces.cend(), isReachable);
if (reachable) {
if (ProcessAll() > 0) {
FX_LOGS(INFO) << "Processing of pending crash reports queue on network reachable";
}
}
};
}
} // namespace crash_reports
} // namespace forensics