| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/developer/forensics/crash_reports/queue.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/fit/defer.h> |
| #include <lib/syslog/cpp/macros.h> |
| #include <lib/zx/time.h> |
| #include <zircon/errors.h> |
| |
| #include "src/developer/forensics/crash_reports/constants.h" |
| #include "src/developer/forensics/crash_reports/info/queue_info.h" |
| #include "src/lib/fxl/strings/join_strings.h" |
| #include "src/lib/fxl/strings/string_printf.h" |
| |
| namespace forensics { |
| namespace crash_reports { |
| |
| Queue::Queue(async_dispatcher_t* dispatcher, std::shared_ptr<sys::ServiceDirectory> services, |
| std::shared_ptr<InfoContext> info_context, LogTags* tags, CrashServer* crash_server, |
| SnapshotManager* snapshot_manager) |
| : dispatcher_(dispatcher), |
| services_(services), |
| tags_(tags), |
| store_(tags_, info_context, /*temp_root=*/Store::Root{kStoreTmpPath, kStoreMaxTmpSize}, |
| /*persistent_root=*/Store::Root{kStoreCachePath, kStoreMaxCacheSize}), |
| crash_server_(crash_server), |
| snapshot_manager_(snapshot_manager), |
| metrics_(std::move(info_context)) { |
| FX_CHECK(dispatcher_); |
| FX_CHECK(crash_server_); |
| |
| InitFromStore(); |
| } |
| |
| void Queue::InitFromStore() { |
| // Note: The upload attempt data is lost when the component stops and all reports start with |
| // upload attempts of 0. |
| for (const auto& report_id : store_.GetReports()) { |
| // It could technically be an hourly snapshot, but the snapshot has not been persisted so it is |
| // okay to have another one here. |
| blocked_reports_.emplace_back(report_id, store_.GetSnapshotUuid(report_id), |
| false /*not a known hourly report*/); |
| } |
| |
| std::sort(blocked_reports_.begin(), blocked_reports_.end(), |
| [](const PendingReport& lhs, const PendingReport& rhs) { |
| return lhs.report_id <= rhs.report_id; |
| }); |
| |
| if (!blocked_reports_.empty()) { |
| std::vector<std::string> report_id_strs; |
| for (const auto& pending_report : blocked_reports_) { |
| report_id_strs.push_back(std::to_string(pending_report.report_id)); |
| } |
| FX_LOGS(INFO) << "Initializing queue with reports: " << ReportIdsStr(blocked_reports_); |
| } |
| } |
| |
| uint64_t Queue::Size() const { |
| return ready_reports_.size() + blocked_reports_.size() + (bool)active_report_; |
| } |
| |
| bool Queue::IsEmpty() const { return Size() == 0; } |
| |
| ReportId Queue::LatestReport() const { |
| const ReportId active_report_id = (active_report_) ? active_report_->report_id : 0u; |
| return std::max({ |
| active_report_id, |
| ready_reports_.back().report_id, |
| blocked_reports_.back().report_id, |
| }); |
| } |
| |
| bool Queue::Contains(const ReportId report_id) const { |
| return (std::find_if(ready_reports_.cbegin(), ready_reports_.cend(), |
| [=](const PendingReport& r) { return r.report_id == report_id; }) != |
| ready_reports_.cend()) || |
| (std::find_if(blocked_reports_.cbegin(), blocked_reports_.cend(), |
| [=](const PendingReport& r) { return r.report_id == report_id; }) != |
| blocked_reports_.cend()) || |
| (active_report_ && active_report_->report_id == report_id); |
| } |
| |
| bool Queue::HasHourlyReport() const { |
| return (std::find_if(ready_reports_.cbegin(), ready_reports_.cend(), |
| [=](const PendingReport& r) { return r.is_hourly_report; }) != |
| ready_reports_.cend()) || |
| (std::find_if(blocked_reports_.cbegin(), blocked_reports_.cend(), |
| [=](const PendingReport& r) { return r.is_hourly_report; }) != |
| blocked_reports_.cend()) || |
| (active_report_ && active_report_->is_hourly_report); |
| } |
| |
| bool Queue::IsPeriodicUploadScheduled() const { |
| return unblock_all_every_fifteen_minutes_task_.is_pending(); |
| } |
| |
| void Queue::StopUploading() { |
| stop_uploading_ = true; |
| |
| // Re-add all active reports so they're put in the store (if need be) and not uploaded |
| // immediately. |
| for (auto& report : ready_reports_) { |
| const bool add_to_store = report.HasReport(); |
| Add(std::move(report), /*consider_eager_upload=*/false, add_to_store); |
| } |
| ready_reports_.clear(); |
| |
| for (auto& report : blocked_reports_) { |
| Retire(std::move(report), RetireReason::kArchive); |
| } |
| blocked_reports_.clear(); |
| |
| unblock_all_every_fifteen_minutes_task_.Cancel(); |
| } |
| |
| bool Queue::Add(Report report) { |
| // Only allow a single hourly report in the queue at a time. |
| if (report.IsHourlyReport()) { |
| FX_CHECK(!HasHourlyReport()); |
| } |
| |
| return Add(PendingReport(std::move(report)), /*consider_eager_upload=*/true, |
| /*add_to_store=*/true); |
| } |
| |
| bool Queue::Add(PendingReport pending_report, const bool consider_eager_upload, |
| const bool add_to_store) { |
| if (reporting_policy_ == ReportingPolicy::kDoNotFileAndDelete) { |
| Retire(std::move(pending_report), RetireReason::kDelete); |
| return true; |
| } |
| |
| if (consider_eager_upload && reporting_policy_ == ReportingPolicy::kUpload && !stop_uploading_) { |
| ready_reports_.push_back(std::move(pending_report)); |
| Upload(); |
| return true; |
| } |
| |
| if (add_to_store && pending_report.HasReport()) { |
| if (!AddToStore(pending_report.TakeReport())) { |
| Retire(std::move(pending_report), RetireReason::kDelete); |
| return false; |
| } |
| } |
| |
| if (reporting_policy_ == ReportingPolicy::kArchive) { |
| Retire(std::move(pending_report), RetireReason::kArchive); |
| return true; |
| } |
| |
| if (!stop_uploading_) { |
| blocked_reports_.push_back(std::move(pending_report)); |
| } |
| return true; |
| } |
| |
| bool Queue::AddToStore(Report report) { |
| std::vector<ReportId> garbage_collected_reports; |
| const bool success = store_.Add(std::move(report), &garbage_collected_reports); |
| |
| // Retire each pending report that is garbage collected by the store. |
| for (auto& pending_report : ready_reports_) { |
| if (std::find(garbage_collected_reports.cbegin(), garbage_collected_reports.cend(), |
| pending_report.report_id) != garbage_collected_reports.end()) { |
| Retire(std::move(pending_report), RetireReason::kGarbageCollected); |
| } |
| } |
| for (auto& pending_report : blocked_reports_) { |
| if (std::find(garbage_collected_reports.cbegin(), garbage_collected_reports.cend(), |
| pending_report.report_id) != garbage_collected_reports.end()) { |
| Retire(std::move(pending_report), RetireReason::kGarbageCollected); |
| } |
| } |
| |
| // Erase all pending reports that were garbage collected. |
| ready_reports_.erase(std::remove_if(ready_reports_.begin(), ready_reports_.end(), |
| [&](const PendingReport& pending_report) { |
| return std::find(garbage_collected_reports.cbegin(), |
| garbage_collected_reports.cend(), |
| pending_report.report_id) != |
| garbage_collected_reports.end(); |
| }), |
| ready_reports_.end()); |
| blocked_reports_.erase(std::remove_if(blocked_reports_.begin(), blocked_reports_.end(), |
| [&](const PendingReport& pending_report) { |
| return std::find(garbage_collected_reports.cbegin(), |
| garbage_collected_reports.cend(), |
| pending_report.report_id) != |
| garbage_collected_reports.end(); |
| }), |
| blocked_reports_.end()); |
| |
| return success; |
| } |
| |
| void Queue::Upload() { |
| // Don't upload if the queue isn't allow to upload. |
| if (stop_uploading_ || reporting_policy_ != ReportingPolicy::kUpload) { |
| return; |
| } |
| |
| // Don't upload if there aren't any reports to uploade or a report is already being uploaded. |
| if (ready_reports_.empty() || active_report_ || crash_server_->HasPendingRequest()) { |
| return; |
| } |
| |
| active_report_ = std::move(ready_reports_.front()); |
| ready_reports_.pop_front(); |
| |
| bool add_to_store = active_report_->HasReport(); |
| if (!active_report_->HasReport()) { |
| if (!store_.Contains(active_report_->report_id)) { |
| Retire(std::move(*active_report_), RetireReason::kGarbageCollected); |
| active_report_ = std::nullopt; |
| Upload(); |
| return; |
| } |
| |
| active_report_->SetReport(store_.Get(active_report_->report_id)); |
| } |
| |
| // The upload will fail if the annotations are empty. |
| if (active_report_->report->Annotations().Raw().empty()) { |
| FX_LOGST(INFO, tags_->Get(active_report_->report_id)) |
| << "Dropping report with empty annotations"; |
| Retire(std::move(*active_report_), RetireReason::kGarbageCollected); |
| active_report_ = std::nullopt; |
| Upload(); |
| return; |
| } |
| |
| metrics_.IncrementUploadAttempts(active_report_->report_id); |
| crash_server_->MakeRequest( |
| *active_report_->report, snapshot_manager_->GetSnapshot(active_report_->snapshot_uuid), |
| [this, add_to_store](CrashServer::UploadStatus status, std::string server_report_id) mutable { |
| switch (status) { |
| case CrashServer::UploadStatus::kSuccess: |
| Retire(std::move(*active_report_), RetireReason::kUpload, server_report_id); |
| break; |
| case CrashServer::UploadStatus::kThrottled: |
| Retire(std::move(*active_report_), RetireReason::kThrottled); |
| break; |
| case CrashServer::UploadStatus::kTimedOut: |
| Retire(std::move(*active_report_), RetireReason::kTimedOut); |
| break; |
| case CrashServer::UploadStatus::kFailure: |
| if (active_report_->delete_post_upload) { |
| Retire(std::move(*active_report_), RetireReason::kDelete); |
| } else { |
| // If the report isn't deleted and should be added to the store post-upload, its |
| // content should still be present, e.g., DeleteAll didn't delete it. |
| if (add_to_store) { |
| FX_CHECK(active_report_->HasReport()); |
| } |
| Add(std::move(*active_report_), /*consider_eager_upload=*/false, add_to_store); |
| } |
| break; |
| } |
| active_report_ = std::nullopt; |
| Upload(); |
| }); |
| |
| // Clear the report from memory if it won't be added to the store. |
| if (!add_to_store) { |
| active_report_->TakeReport(); |
| } |
| } |
| |
| void Queue::Retire(const PendingReport pending_report, const Queue::RetireReason reason, |
| const std::string server_report_id) { |
| auto tags = tags_->Get(pending_report.report_id); |
| switch (reason) { |
| case RetireReason::kArchive: |
| FX_LOGST(INFO, tags) << "Archiving local report under /tmp/reports"; |
| metrics_.Retire(pending_report, reason, server_report_id); |
| // Don't clean up resources if the report is being archived. |
| return; |
| case RetireReason::kUpload: |
| FX_LOGST(INFO, tags) << "Successfully uploaded report at https://crash.corp.google.com/" |
| << server_report_id; |
| break; |
| case RetireReason::kThrottled: |
| FX_LOGST(INFO, tags) << "Upload throttled by server"; |
| break; |
| case RetireReason::kTimedOut: |
| FX_LOGST(INFO, tags) << "Upload timed out, not re-trying"; |
| break; |
| case RetireReason::kDelete: |
| FX_LOGST(INFO, tags) << "Deleted local report"; |
| break; |
| case RetireReason::kGarbageCollected: |
| FX_LOGST(INFO, tags) << "Garbage collected local report"; |
| break; |
| } |
| |
| metrics_.Retire(pending_report, reason, server_report_id); |
| snapshot_manager_->Release(pending_report.snapshot_uuid); |
| tags_->Unregister(pending_report.report_id); |
| store_.Remove(pending_report.report_id); |
| } |
| |
| void Queue::BlockAll() { |
| // Move all ready reports to blocked and add all reports to the store that haven't been |
| // added yet. |
| for (auto& pending_report : ready_reports_) { |
| const bool add_to_store = pending_report.HasReport(); |
| Add(std::move(pending_report), /*consider_eager_upload=*/false, add_to_store); |
| } |
| ready_reports_.clear(); |
| } |
| |
| void Queue::UnblockAll() { |
| if (stop_uploading_ || reporting_policy_ != ReportingPolicy::kUpload) { |
| return; |
| } |
| |
| ready_reports_.insert(ready_reports_.end(), std::make_move_iterator(blocked_reports_.begin()), |
| std::make_move_iterator(blocked_reports_.end())); |
| blocked_reports_.clear(); |
| Upload(); |
| } |
| |
| void Queue::DeleteAll() { |
| FX_LOGS(INFO) << fxl::StringPrintf("Deleting all %zu pending reports", Size()); |
| |
| for (auto& pending_report : ready_reports_) { |
| Retire(std::move(pending_report), RetireReason::kDelete); |
| } |
| ready_reports_.clear(); |
| |
| for (auto& pending_report : blocked_reports_) { |
| Retire(std::move(pending_report), RetireReason::kDelete); |
| } |
| blocked_reports_.clear(); |
| |
| // Delete the report being uploaded, but don't retire it; the PendingReport is needed |
| // post-upload and will be retired once it is used. |
| if (active_report_) { |
| active_report_->TakeReport(); |
| active_report_->delete_post_upload = true; |
| } |
| |
| store_.RemoveAll(); |
| } |
| |
| // The queue is inheritly conservative with uploading crash reports meaning that a report that is |
| // forbidden from being uploaded will never be uploaded while crash reports that are permitted to |
| // be uploaded may later be considered to be forbidden. This is due to the fact that when uploads |
| // are disabled all reports are immediately archived after having been added to the queue, thus we |
| // never have to worry that a report that shouldn't be uploaded ends up being uploaded when the |
| // reporting policy changes. |
| void Queue::WatchReportingPolicy(ReportingPolicyWatcher* watcher) { |
| auto OnReportingPolicyChange = [this](const ReportingPolicy policy) { |
| reporting_policy_ = policy; |
| switch (reporting_policy_) { |
| case ReportingPolicy::kDoNotFileAndDelete: |
| unblock_all_every_fifteen_minutes_task_.Cancel(); |
| DeleteAll(); |
| break; |
| case ReportingPolicy::kUpload: |
| UnblockAllEveryFifteenMinutes(); |
| break; |
| case ReportingPolicy::kArchive: |
| // The reporting policy shouldn't change to Archive outside of tests. |
| unblock_all_every_fifteen_minutes_task_.Cancel(); |
| break; |
| case ReportingPolicy::kUndecided: |
| BlockAll(); |
| unblock_all_every_fifteen_minutes_task_.Cancel(); |
| break; |
| } |
| }; |
| |
| OnReportingPolicyChange(watcher->CurrentPolicy()); |
| watcher->OnPolicyChange([=](const ReportingPolicy policy) { OnReportingPolicyChange(policy); }); |
| } |
| |
| void Queue::WatchNetwork(NetworkWatcher* network_watcher) { |
| network_watcher->Register([this](const bool network_is_reachable) { |
| if (!stop_uploading_ && network_is_reachable) { |
| if (!blocked_reports_.empty()) { |
| FX_LOGS(INFO) << "Uploading " << blocked_reports_.size() |
| << " reports on network reachable: " << ReportIdsStr(blocked_reports_); |
| UnblockAll(); |
| } |
| } |
| }); |
| } |
| |
| void Queue::UnblockAllEveryFifteenMinutes() { |
| if (stop_uploading_) { |
| return; |
| } |
| |
| if (!blocked_reports_.empty()) { |
| FX_LOGS(INFO) << "Uploading " << blocked_reports_.size() |
| << " reports on as a part of the 15-minute periodic upload: " |
| << ReportIdsStr(blocked_reports_); |
| UnblockAll(); |
| } |
| |
| if (const auto status = |
| unblock_all_every_fifteen_minutes_task_.PostDelayed(dispatcher_, zx::min(15)); |
| status != ZX_OK) { |
| FX_PLOGS(ERROR, status) << "Error posting periodic upload task to async loop. Won't retry."; |
| } |
| } |
| |
| Queue::PendingReport::PendingReport(Report report) |
| : report_id(report.Id()), |
| snapshot_uuid(report.SnapshotUuid()), |
| is_hourly_report(report.IsHourlyReport()), |
| report(std::move(report)), |
| delete_post_upload(false) {} |
| |
| Queue::PendingReport::PendingReport(const ReportId report_id, const SnapshotUuid snapshot_uuid, |
| const bool is_hourly_report) |
| : report_id(report_id), |
| snapshot_uuid(std::move(snapshot_uuid)), |
| is_hourly_report(is_hourly_report), |
| report(std::nullopt), |
| delete_post_upload(false) {} |
| |
| void Queue::PendingReport::SetReport(Report r) { report = std::move(r); } |
| |
| Report Queue::PendingReport::TakeReport() { |
| FX_CHECK(report); |
| auto r = std::move(*report); |
| report = std::nullopt; |
| return r; |
| } |
| |
| bool Queue::PendingReport::HasReport() const { return (bool)report; } |
| |
| Queue::UploadMetrics::UploadMetrics(std::shared_ptr<InfoContext> info_context) |
| : info_(std::move(info_context)) {} |
| |
| void Queue::UploadMetrics::IncrementUploadAttempts(const ReportId report_id) { |
| upload_attempts_[report_id]++; |
| info_.RecordUploadAttemptNumber(upload_attempts_[report_id]); |
| } |
| |
| void Queue::UploadMetrics::Retire(const PendingReport& pending_report, |
| const RetireReason retire_reason, |
| const std::string server_report_id) { |
| switch (retire_reason) { |
| case RetireReason::kUpload: |
| info_.MarkReportAsUploaded(server_report_id, upload_attempts_[pending_report.report_id]); |
| break; |
| case RetireReason::kDelete: |
| info_.MarkReportAsDeleted(upload_attempts_[pending_report.report_id]); |
| break; |
| case RetireReason::kThrottled: |
| info_.MarkReportAsThrottledByServer(upload_attempts_[pending_report.report_id]); |
| break; |
| case RetireReason::kTimedOut: |
| info_.MarkReportAsTimedOut(upload_attempts_[pending_report.report_id]); |
| break; |
| case RetireReason::kArchive: |
| info_.MarkReportAsArchived(); |
| break; |
| case RetireReason::kGarbageCollected: |
| info_.MarkReportAsGarbageCollected(upload_attempts_[pending_report.report_id]); |
| break; |
| } |
| upload_attempts_.erase(pending_report.report_id); |
| } |
| |
| std::string Queue::ReportIdsStr(const std::deque<PendingReport>& reports) const { |
| std::vector<std::string> report_id_strs; |
| for (const auto& pending_report : reports) { |
| report_id_strs.push_back(std::to_string(pending_report.report_id)); |
| } |
| |
| return "[" + fxl::JoinStrings(report_id_strs, ", ") + "]"; |
| } |
| |
| } // namespace crash_reports |
| } // namespace forensics |