blob: c7c6eb2477dee8aaf9cc9bc1cd2c176a444312b3 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/developer/forensics/crash_reports/queue.h"
#include <lib/async/cpp/task.h>
#include <lib/fit/defer.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/zx/time.h>
#include <zircon/errors.h>
#include "src/developer/forensics/crash_reports/constants.h"
#include "src/developer/forensics/crash_reports/info/queue_info.h"
#include "src/lib/fxl/strings/join_strings.h"
#include "src/lib/fxl/strings/string_printf.h"
namespace forensics {
namespace crash_reports {
// Builds the queue on top of the report store and immediately re-populates it with the reports
// the store already holds (see InitFromStore()).
//
// |dispatcher| and |crash_server| must be non-null; this is enforced with FX_CHECK. |tags| and
// |snapshot_manager| are stored as given and are dereferenced later without a check.
//
// NOTE(review): |info_context| is copied into |store_| and then moved into |metrics_|. This is
// only correct if |store_| is initialized before |metrics_|, i.e. declared first in the class —
// confirm against the header.
Queue::Queue(async_dispatcher_t* dispatcher, std::shared_ptr<sys::ServiceDirectory> services,
             std::shared_ptr<InfoContext> info_context, LogTags* tags, CrashServer* crash_server,
             SnapshotManager* snapshot_manager)
    : dispatcher_(dispatcher),
      // |services| is a by-value sink parameter that isn't used again: move it rather than paying
      // for an extra atomic reference-count increment/decrement.
      services_(std::move(services)),
      tags_(tags),
      store_(tags_, info_context, /*temp_root=*/Store::Root{kStoreTmpPath, kStoreMaxTmpSize},
             /*persistent_root=*/Store::Root{kStoreCachePath, kStoreMaxCacheSize}),
      crash_server_(crash_server),
      snapshot_manager_(snapshot_manager),
      metrics_(std::move(info_context)) {
  FX_CHECK(dispatcher_);
  FX_CHECK(crash_server_);
  InitFromStore();
}
void Queue::InitFromStore() {
// Note: The upload attempt data is lost when the component stops and all reports start with
// upload attempts of 0.
for (const auto& report_id : store_.GetReports()) {
// It could technically be an hourly snapshot, but the snapshot has not been persisted so it is
// okay to have another one here.
blocked_reports_.emplace_back(report_id, store_.GetSnapshotUuid(report_id),
false /*not a known hourly report*/);
}
std::sort(blocked_reports_.begin(), blocked_reports_.end(),
[](const PendingReport& lhs, const PendingReport& rhs) {
return lhs.report_id <= rhs.report_id;
});
if (!blocked_reports_.empty()) {
std::vector<std::string> report_id_strs;
for (const auto& pending_report : blocked_reports_) {
report_id_strs.push_back(std::to_string(pending_report.report_id));
}
FX_LOGS(INFO) << "Initializing queue with reports: " << ReportIdsStr(blocked_reports_);
}
}
// Returns the total number of reports the queue is tracking: ready reports, blocked reports and
// the report currently being uploaded, if there is one.
uint64_t Queue::Size() const {
  const uint64_t num_active = active_report_.has_value() ? 1u : 0u;
  return ready_reports_.size() + blocked_reports_.size() + num_active;
}
bool Queue::IsEmpty() const { return Size() == 0; }
// Returns the largest report id among the active report, the last ready report and the last
// blocked report, or 0 if the queue holds no reports at all.
//
// Only the back of each container is inspected.
// NOTE(review): this presumes the most recent report of each container sits at its back —
// confirm that this holds for every code path that fills |ready_reports_|.
ReportId Queue::LatestReport() const {
  ReportId latest = active_report_ ? active_report_->report_id : 0u;

  // Calling back() on an empty container is undefined behavior, so guard each access.
  if (!ready_reports_.empty()) {
    latest = std::max<ReportId>(latest, ready_reports_.back().report_id);
  }
  if (!blocked_reports_.empty()) {
    latest = std::max<ReportId>(latest, blocked_reports_.back().report_id);
  }
  return latest;
}
bool Queue::Contains(const ReportId report_id) const {
return (std::find_if(ready_reports_.cbegin(), ready_reports_.cend(),
[=](const PendingReport& r) { return r.report_id == report_id; }) !=
ready_reports_.cend()) ||
(std::find_if(blocked_reports_.cbegin(), blocked_reports_.cend(),
[=](const PendingReport& r) { return r.report_id == report_id; }) !=
blocked_reports_.cend()) ||
(active_report_ && active_report_->report_id == report_id);
}
bool Queue::HasHourlyReport() const {
return (std::find_if(ready_reports_.cbegin(), ready_reports_.cend(),
[=](const PendingReport& r) { return r.is_hourly_report; }) !=
ready_reports_.cend()) ||
(std::find_if(blocked_reports_.cbegin(), blocked_reports_.cend(),
[=](const PendingReport& r) { return r.is_hourly_report; }) !=
blocked_reports_.cend()) ||
(active_report_ && active_report_->is_hourly_report);
}
// Returns whether the periodic "unblock all reports" task is currently pending.
bool Queue::IsPeriodicUploadScheduled() const {
  const bool task_pending = unblock_all_every_fifteen_minutes_task_.is_pending();
  return task_pending;
}
void Queue::StopUploading() {
stop_uploading_ = true;
// Re-add all active reports so they're put in the store (if need be) and not uploaded
// immediately.
for (auto& report : ready_reports_) {
const bool add_to_store = report.HasReport();
Add(std::move(report), /*consider_eager_upload=*/false, add_to_store);
}
ready_reports_.clear();
for (auto& report : blocked_reports_) {
Retire(std::move(report), RetireReason::kArchive);
}
blocked_reports_.clear();
unblock_all_every_fifteen_minutes_task_.Cancel();
}
// Adds a freshly filed |report| to the queue, allowing it to be uploaded eagerly and to be
// written to the store. Returns the result of the internal Add() overload.
bool Queue::Add(Report report) {
  // Only allow a single hourly report in the queue at a time.
  const bool is_hourly = report.IsHourlyReport();
  if (is_hourly) {
    FX_CHECK(!HasHourlyReport());
  }

  PendingReport pending_report(std::move(report));
  return Add(std::move(pending_report), /*consider_eager_upload=*/true, /*add_to_store=*/true);
}
// Routes |pending_report| according to the current reporting policy:
//   * kDoNotFileAndDelete: the report is retired as deleted.
//   * kUpload with |consider_eager_upload| set and uploads not stopped: the report is queued as
//     ready and an upload is attempted right away.
//   * otherwise: the in-memory contents (if any, and if |add_to_store| is set) are written to the
//     store, then the report is either retired as archived (kArchive) or parked in the blocked
//     list (unless uploads have been stopped).
//
// Returns false only if writing the report to the store failed, in which case the report is
// retired as deleted.
bool Queue::Add(PendingReport pending_report, const bool consider_eager_upload,
                const bool add_to_store) {
  if (reporting_policy_ == ReportingPolicy::kDoNotFileAndDelete) {
    Retire(std::move(pending_report), RetireReason::kDelete);
    return true;
  }

  const bool upload_now = consider_eager_upload &&
                          reporting_policy_ == ReportingPolicy::kUpload && !stop_uploading_;
  if (upload_now) {
    ready_reports_.push_back(std::move(pending_report));
    Upload();
    return true;
  }

  const bool must_persist = add_to_store && pending_report.HasReport();
  if (must_persist && !AddToStore(pending_report.TakeReport())) {
    Retire(std::move(pending_report), RetireReason::kDelete);
    return false;
  }

  if (reporting_policy_ == ReportingPolicy::kArchive) {
    Retire(std::move(pending_report), RetireReason::kArchive);
    return true;
  }

  if (stop_uploading_) {
    return true;
  }

  blocked_reports_.push_back(std::move(pending_report));
  return true;
}
// Writes |report| to the store and returns whether the store accepted it. Every ready or blocked
// pending report whose id the store reports as garbage collected is retired with
// kGarbageCollected and then dropped from its container.
bool Queue::AddToStore(Report report) {
  std::vector<ReportId> evicted_ids;
  const bool success = store_.Add(std::move(report), &evicted_ids);

  const auto was_evicted = [&evicted_ids](const PendingReport& pending) {
    return std::find(evicted_ids.cbegin(), evicted_ids.cend(), pending.report_id) !=
           evicted_ids.cend();
  };

  // Retire each pending report that is garbage collected by the store. Only |report_id| is read
  // from an element after it has been handed to Retire().
  const auto retire_evicted = [this, &was_evicted](auto& reports) {
    for (auto& pending : reports) {
      if (was_evicted(pending)) {
        Retire(std::move(pending), RetireReason::kGarbageCollected);
      }
    }
  };

  // Erase all pending reports that were garbage collected.
  const auto erase_evicted = [&was_evicted](auto& reports) {
    reports.erase(std::remove_if(reports.begin(), reports.end(), was_evicted), reports.end());
  };

  retire_evicted(ready_reports_);
  retire_evicted(blocked_reports_);
  erase_evicted(ready_reports_);
  erase_evicted(blocked_reports_);

  return success;
}
// Starts uploading the report at the front of |ready_reports_|, if uploading is currently
// possible.
//
// The front report becomes |active_report_|. If its contents aren't in memory they are loaded
// from the store; a report the store no longer contains, or one with empty annotations, is
// retired as garbage collected and the next ready report is tried. Once the crash server reports
// a status, the active report is retired (success, throttled, timed out), deleted (failure after
// DeleteAll() flagged it), or re-added without eager upload (other failures), and Upload() is
// called again for the next ready report.
void Queue::Upload() {
// Don't upload if the queue isn't allowed to upload.
if (stop_uploading_ || reporting_policy_ != ReportingPolicy::kUpload) {
return;
}
// Don't upload if there aren't any reports to upload or a report is already being uploaded.
if (ready_reports_.empty() || active_report_ || crash_server_->HasPendingRequest()) {
return;
}
active_report_ = std::move(ready_reports_.front());
ready_reports_.pop_front();
// A report that still has its contents in memory hasn't been handed to the store by this
// function; remember that so a failed upload can put it there.
bool add_to_store = active_report_->HasReport();
if (!active_report_->HasReport()) {
// The contents must come from the store; if the store no longer has them, the report is gone.
if (!store_.Contains(active_report_->report_id)) {
Retire(std::move(*active_report_), RetireReason::kGarbageCollected);
active_report_ = std::nullopt;
Upload();
return;
}
active_report_->SetReport(store_.Get(active_report_->report_id));
}
// The upload will fail if the annotations are empty.
if (active_report_->report->Annotations().Raw().empty()) {
FX_LOGST(INFO, tags_->Get(active_report_->report_id))
<< "Dropping report with empty annotations";
Retire(std::move(*active_report_), RetireReason::kGarbageCollected);
active_report_ = std::nullopt;
Upload();
return;
}
metrics_.IncrementUploadAttempts(active_report_->report_id);
crash_server_->MakeRequest(
*active_report_->report, snapshot_manager_->GetSnapshot(active_report_->snapshot_uuid),
[this, add_to_store](CrashServer::UploadStatus status, std::string server_report_id) mutable {
switch (status) {
case CrashServer::UploadStatus::kSuccess:
Retire(std::move(*active_report_), RetireReason::kUpload, server_report_id);
break;
case CrashServer::UploadStatus::kThrottled:
Retire(std::move(*active_report_), RetireReason::kThrottled);
break;
case CrashServer::UploadStatus::kTimedOut:
Retire(std::move(*active_report_), RetireReason::kTimedOut);
break;
case CrashServer::UploadStatus::kFailure:
if (active_report_->delete_post_upload) {
Retire(std::move(*active_report_), RetireReason::kDelete);
} else {
// If the report isn't deleted and should be added to the store post-upload, its
// content should still be present, e.g., DeleteAll didn't delete it.
if (add_to_store) {
FX_CHECK(active_report_->HasReport());
}
Add(std::move(*active_report_), /*consider_eager_upload=*/false, add_to_store);
}
break;
}
// The active slot is free again; move on to the next ready report, if any.
active_report_ = std::nullopt;
Upload();
});
// Clear the report from memory if it won't be added to the store.
// NOTE(review): this dereferences |active_report_| after MakeRequest() returns, which assumes
// the callback above has not already run (it resets |active_report_|) — confirm MakeRequest()
// always completes asynchronously.
if (!add_to_store) {
active_report_->TakeReport();
}
}
// Logs why |pending_report| is leaving the queue and records the outcome in the upload metrics.
// For every reason except kArchive, the report's snapshot is released, its log tags are
// unregistered and it is removed from the store. |server_report_id| is only used in the log
// message for kUpload and is forwarded to the metrics.
void Queue::Retire(const PendingReport pending_report, const Queue::RetireReason reason,
                   const std::string server_report_id) {
  auto report_tags = tags_->Get(pending_report.report_id);

  // Don't clean up resources if the report is being archived.
  bool release_resources = true;

  switch (reason) {
    case RetireReason::kArchive:
      FX_LOGST(INFO, report_tags) << "Archiving local report under /tmp/reports";
      release_resources = false;
      break;
    case RetireReason::kUpload:
      FX_LOGST(INFO, report_tags)
          << "Successfully uploaded report at https://crash.corp.google.com/" << server_report_id;
      break;
    case RetireReason::kThrottled:
      FX_LOGST(INFO, report_tags) << "Upload throttled by server";
      break;
    case RetireReason::kTimedOut:
      FX_LOGST(INFO, report_tags) << "Upload timed out, not re-trying";
      break;
    case RetireReason::kDelete:
      FX_LOGST(INFO, report_tags) << "Deleted local report";
      break;
    case RetireReason::kGarbageCollected:
      FX_LOGST(INFO, report_tags) << "Garbage collected local report";
      break;
  }

  metrics_.Retire(pending_report, reason, server_report_id);
  if (!release_resources) {
    return;
  }

  snapshot_manager_->Release(pending_report.snapshot_uuid);
  tags_->Unregister(pending_report.report_id);
  store_.Remove(pending_report.report_id);
}
void Queue::BlockAll() {
// Move all ready reports to blocked and add all reports to the store that haven't been
// added yet.
for (auto& pending_report : ready_reports_) {
const bool add_to_store = pending_report.HasReport();
Add(std::move(pending_report), /*consider_eager_upload=*/false, add_to_store);
}
ready_reports_.clear();
}
// Appends every blocked report to the ready list, in order, and kicks off an upload. Does nothing
// if uploads have been stopped or the reporting policy isn't kUpload.
void Queue::UnblockAll() {
  const bool uploads_allowed = !stop_uploading_ && reporting_policy_ == ReportingPolicy::kUpload;
  if (!uploads_allowed) {
    return;
  }

  for (auto& pending_report : blocked_reports_) {
    ready_reports_.push_back(std::move(pending_report));
  }
  blocked_reports_.clear();

  Upload();
}
// Retires every ready and blocked report as deleted, flags the report being uploaded (if any) for
// deletion once its upload completes, and removes everything from the store.
void Queue::DeleteAll() {
  FX_LOGS(INFO) << fxl::StringPrintf("Deleting all %zu pending reports", Size());

  for (auto& pending_report : ready_reports_) {
    Retire(std::move(pending_report), RetireReason::kDelete);
  }
  ready_reports_.clear();

  for (auto& pending_report : blocked_reports_) {
    Retire(std::move(pending_report), RetireReason::kDelete);
  }
  blocked_reports_.clear();

  // Delete the report being uploaded, but don't retire it; the PendingReport is needed
  // post-upload and will be retired once it is used.
  if (active_report_) {
    // Upload() already drops the in-memory contents of a report that was loaded from the store,
    // and TakeReport() FX_CHECKs that contents are present, so only take them if they exist.
    if (active_report_->HasReport()) {
      active_report_->TakeReport();
    }
    active_report_->delete_post_upload = true;
  }

  store_.RemoveAll();
}
// The queue is inherently conservative about uploading crash reports: a report that is forbidden
// from being uploaded will never be uploaded, while a report that is permitted to be uploaded may
// later be considered forbidden. This works because, when uploads are disabled, all reports are
// archived immediately after being added to the queue, so a report that shouldn't be uploaded can
// never end up being uploaded when the reporting policy changes.
//
// Applies the watcher's current policy right away and registers a callback that applies every
// subsequent policy.
void Queue::WatchReportingPolicy(ReportingPolicyWatcher* watcher) {
  auto apply_policy = [this](const ReportingPolicy policy) {
    reporting_policy_ = policy;

    switch (policy) {
      case ReportingPolicy::kUpload:
        UnblockAllEveryFifteenMinutes();
        break;
      case ReportingPolicy::kUndecided:
        BlockAll();
        unblock_all_every_fifteen_minutes_task_.Cancel();
        break;
      case ReportingPolicy::kArchive:
        // The reporting policy shouldn't change to Archive outside of tests.
        unblock_all_every_fifteen_minutes_task_.Cancel();
        break;
      case ReportingPolicy::kDoNotFileAndDelete:
        unblock_all_every_fifteen_minutes_task_.Cancel();
        DeleteAll();
        break;
    }
  };

  apply_policy(watcher->CurrentPolicy());
  watcher->OnPolicyChange(
      [apply_policy](const ReportingPolicy policy) { apply_policy(policy); });
}
// Registers a callback with |network_watcher| that unblocks all blocked reports when the network
// is reported reachable, provided uploads haven't been stopped and there is something blocked.
void Queue::WatchNetwork(NetworkWatcher* network_watcher) {
  network_watcher->Register([this](const bool network_is_reachable) {
    if (stop_uploading_ || !network_is_reachable) {
      return;
    }
    if (blocked_reports_.empty()) {
      return;
    }

    FX_LOGS(INFO) << "Uploading " << blocked_reports_.size()
                  << " reports on network reachable: " << ReportIdsStr(blocked_reports_);
    UnblockAll();
  });
}
void Queue::UnblockAllEveryFifteenMinutes() {
if (stop_uploading_) {
return;
}
if (!blocked_reports_.empty()) {
FX_LOGS(INFO) << "Uploading " << blocked_reports_.size()
<< " reports on as a part of the 15-minute periodic upload: "
<< ReportIdsStr(blocked_reports_);
UnblockAll();
}
if (const auto status =
unblock_all_every_fifteen_minutes_task_.PostDelayed(dispatcher_, zx::min(15));
status != ZX_OK) {
FX_PLOGS(ERROR, status) << "Error posting periodic upload task to async loop. Won't retry.";
}
}
// Creates a pending report that holds |report| in memory. The report id, snapshot uuid and
// hourly flag are read from |report| before it is moved into the |report| member; the pending
// report is not flagged for deletion after upload.
// NOTE(review): members are initialized in class declaration order, not in the order listed
// here. Reading |report| before moving it relies on the |report| member being declared after
// |report_id|, |snapshot_uuid| and |is_hourly_report| — confirm against the header.
Queue::PendingReport::PendingReport(Report report)
: report_id(report.Id()),
snapshot_uuid(report.SnapshotUuid()),
is_hourly_report(report.IsHourlyReport()),
report(std::move(report)),
delete_post_upload(false) {}
// Creates a pending report that has no contents in memory; only its id, snapshot uuid and hourly
// flag are known. The pending report is not flagged for deletion after upload.
//
// |snapshot_uuid| is deliberately not const in this definition: std::move() on a const object
// silently falls back to a copy. Top-level const on a by-value parameter isn't part of the
// function's signature, so this still matches the declaration.
Queue::PendingReport::PendingReport(const ReportId report_id, SnapshotUuid snapshot_uuid,
                                    const bool is_hourly_report)
    : report_id(report_id),
      snapshot_uuid(std::move(snapshot_uuid)),
      is_hourly_report(is_hourly_report),
      report(std::nullopt),
      delete_post_upload(false) {}
// Stores |r| as the in-memory contents of this pending report, replacing any previous contents.
void Queue::PendingReport::SetReport(Report r) {
  report = std::move(r);
}
// Hands the in-memory contents to the caller and leaves this pending report without contents.
// It is a fatal error (FX_CHECK) to call this when no contents are held.
Report Queue::PendingReport::TakeReport() {
  FX_CHECK(report);

  Report taken = std::move(*report);
  report.reset();
  return taken;
}
// Returns true if the contents of the report are currently held in memory.
bool Queue::PendingReport::HasReport() const {
  return report.has_value();
}
// Constructs the metrics recorder; |info_context| is moved into |info_|, which receives every
// recorded upload attempt and retirement.
Queue::UploadMetrics::UploadMetrics(std::shared_ptr<InfoContext> info_context)
: info_(std::move(info_context)) {}
// Bumps the upload attempt counter of |report_id| (starting a new counter if there is none yet)
// and records the new attempt number.
void Queue::UploadMetrics::IncrementUploadAttempts(const ReportId report_id) {
  auto& attempts = upload_attempts_[report_id];
  ++attempts;
  info_.RecordUploadAttemptNumber(attempts);
}
// Records the final outcome of |pending_report| together with its number of upload attempts
// (archived reports are recorded without an attempt count), then forgets the report's attempt
// counter. |server_report_id| is only recorded for kUpload.
void Queue::UploadMetrics::Retire(const PendingReport& pending_report,
                                  const RetireReason retire_reason,
                                  const std::string server_report_id) {
  const auto id = pending_report.report_id;

  // Looking the counter up creates a zero entry if none exists; the entry is erased below in
  // all cases.
  auto& attempts = upload_attempts_[id];

  switch (retire_reason) {
    case RetireReason::kArchive:
      info_.MarkReportAsArchived();
      break;
    case RetireReason::kUpload:
      info_.MarkReportAsUploaded(server_report_id, attempts);
      break;
    case RetireReason::kThrottled:
      info_.MarkReportAsThrottledByServer(attempts);
      break;
    case RetireReason::kTimedOut:
      info_.MarkReportAsTimedOut(attempts);
      break;
    case RetireReason::kDelete:
      info_.MarkReportAsDeleted(attempts);
      break;
    case RetireReason::kGarbageCollected:
      info_.MarkReportAsGarbageCollected(attempts);
      break;
  }

  upload_attempts_.erase(id);
}
std::string Queue::ReportIdsStr(const std::deque<PendingReport>& reports) const {
std::vector<std::string> report_id_strs;
for (const auto& pending_report : reports) {
report_id_strs.push_back(std::to_string(pending_report.report_id));
}
return "[" + fxl::JoinStrings(report_id_strs, ", ") + "]";
}
} // namespace crash_reports
} // namespace forensics