| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h" |
| |
| #include <chrono> |
| |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/statusor/statusor.h" |
| #include "src/lib/util/file_system.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/internal_metrics.h" |
| #include "src/logger/project_context.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/logging.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency = |
| std::chrono::seconds(5); |
| |
| DelayedLocalAggregateStorage::DelayedLocalAggregateStorage( |
| std::string base_directory, util::FileSystem *fs, |
| const logger::ProjectContextFactory *global_project_context_factory, |
| std::chrono::milliseconds writeback_frequency) |
| : proto_store_(std::move(base_directory), fs), |
| global_project_context_factory_(global_project_context_factory), |
| internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)), |
| writeback_frequency_(writeback_frequency) { |
| ReadPersistentStore(); |
| DeleteOutdatedMetrics(); |
| |
| std::thread t([this] { this->Run(); }); |
| writeback_thread_ = std::move(t); |
| } |
| |
| DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() { |
| if (!writeback_thread_.joinable()) { |
| return; |
| } |
| ShutDown(); |
| VLOG(4) << "~DelayedLocalAggregateStorage: waiting for writeback thread to exit..."; |
| writeback_thread_.join(); |
| } |
| |
| void DelayedLocalAggregateStorage::ShutDown() { |
| WaitUntilSave(writeback_frequency_); |
| { |
| auto locked = state_.lock(); |
| locked->shut_down = true; |
| locked->shutdown_notifier.notify_all(); |
| locked->data_save_notifier.notify_all(); |
| } |
| VLOG(4) << "DelayedLocalAggregateStorage: shut-down requested."; |
| } |
| |
| void DelayedLocalAggregateStorage::ReadPersistentStore() { proto_store_.Read(&aggregates_); } |
| |
| void DelayedLocalAggregateStorage::DeleteData() { |
| aggregates_.clear_by_customer_id(); |
| state_.lock()->data_modified = true; |
| WaitUntilSave(writeback_frequency_ * 2); |
| } |
| |
| void DelayedLocalAggregateStorage::DeleteOutdatedMetrics() { |
| std::vector<std::tuple<uint32_t, uint32_t>> projects_to_delete; |
| std::vector<std::tuple<uint32_t, uint32_t, uint32_t>> metrics_to_delete; |
| for (auto [customer_id, customer] : aggregates_.by_customer_id()) { |
| for (auto [project_id, project] : customer.by_project_id()) { |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(customer_id, project_id); |
| if (!ctx) { |
| LOG(WARNING) << "Found customer/project that is not present in the registry: (" |
| << customer_id << ", " << project_id << "): Deleting it."; |
| projects_to_delete.emplace_back(std::make_tuple(customer_id, project_id)); |
| continue; |
| } |
| |
| for (auto [metric_id, _] : project.by_metric_id()) { |
| if (!ctx->GetMetric(metric_id)) { |
| LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id |
| << ", " << project_id << ", " << metric_id << "): Deleting it."; |
| metrics_to_delete.emplace_back(std::make_tuple(customer_id, project_id, metric_id)); |
| } |
| } |
| } |
| } |
| |
| for (auto [customer_id, project_id] : projects_to_delete) { |
| aggregates_.mutable_by_customer_id() |
| ->at(customer_id) |
| .mutable_by_project_id() |
| ->erase(project_id); |
| } |
| |
| for (auto [customer_id, project_id, metric_id] : metrics_to_delete) { |
| aggregates_.mutable_by_customer_id() |
| ->at(customer_id) |
| .mutable_by_project_id() |
| ->at(project_id) |
| .mutable_by_metric_id() |
| ->erase(metric_id); |
| } |
| } |
| |
| void DelayedLocalAggregateStorage::WaitUntilSave(std::chrono::milliseconds max_wait) { |
| auto locked = state_.lock(); |
| if (locked->shut_down || locked->data_modified) { |
| locked->data_save_notifier.wait_for( |
| locked, max_wait, [&locked] { return !(locked->shut_down || locked->data_modified); }); |
| } |
| } |
| |
| bool DelayedLocalAggregateStorage::WaitUntilSaveStart(std::chrono::milliseconds max_wait) { |
| auto locked = state_.lock(); |
| return locked->data_save_start_notifier.wait_for(locked, max_wait) == std::cv_status::no_timeout; |
| } |
| |
| void DelayedLocalAggregateStorage::Run() { |
| while (true) { |
| auto locked_state = state_.lock(); |
| if (locked_state->shut_down) { |
| return; |
| } |
| |
| VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for " << writeback_frequency_.count() |
| << " milliseconds."; |
| // We start each iteration of the loop with a sleep of writeback_frequency_. This ensures that |
| // we never write to disk twice within one writeback_frequency_ period. |
| locked_state->shutdown_notifier.wait_for(locked_state, writeback_frequency_, |
| [&locked_state] { return locked_state->shut_down; }); |
| |
| VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down=" |
| << locked_state->shut_down; |
| if (locked_state->shut_down) { |
| return; |
| } |
| |
| if (locked_state->data_modified) { |
| locked_state->data_save_start_notifier.notify_all(); |
| // The only other use of these two mutexes are in |
| // GetMetricAggregate(). In that codepath, the data_mutex_ is locked at the beginning of |
| // GetMetricAggregate, and returned inside of the MetricAggregateRef. Later, when the data has |
| // been modified, MetricAggregateRef::Save() is called, which then calls down to |
| // SaveMetricAggregate(), which then locks state_ so that it can update data_modified. |
| // Therefore, that codepath will always lock data_mutex_, then state_. |
| // Here, we unlock locked_state before attempting to acquire data_mutex_ so that this codepath |
| // acquires the two locks in the same order, thus avoiding any potential of deadlock. |
| locked_state.unlock(); |
| |
| // While everything is unlocked, track the current storage usage. |
| // This cannot cause an infinite loop, since we are already in the 'data_modified' state, |
| // which will be reset to false at the end of this block. |
| auto size = static_cast<int64_t>(aggregates_.ByteSizeLong()); |
| internal_metrics_->TrackDiskUsage( |
| logger::InternalMetrics::StorageClass::LocalAggregateStorage, size); |
| |
| // Lock the class so no new calls to GetMetricAggregate will return until the data has |
| // written. |
| std::scoped_lock<std::mutex> data_lock(data_mutex_); |
| locked_state.lock(); |
| |
| VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk."; |
| util::Status status = proto_store_.Write(aggregates_); |
| if (!status.ok()) { |
| LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status.error_message(); |
| } |
| locked_state->data_save_notifier.notify_all(); |
| locked_state->data_modified = false; |
| } |
| } |
| } |
| |
| bool DelayedLocalAggregateStorage::HasMetricAggregate(uint32_t customer_id, uint32_t project_id, |
| uint32_t metric_id) { |
| if (aggregates_.by_customer_id().count(customer_id)) { |
| CustomerAggregates &customer = aggregates_.mutable_by_customer_id()->at(customer_id); |
| if (customer.by_project_id().count(project_id)) { |
| ProjectAggregates &project = customer.mutable_by_project_id()->at(project_id); |
| if (project.by_metric_id().count(metric_id)) { |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> |
| DelayedLocalAggregateStorage::GetMetricAggregate(uint32_t customer_id, uint32_t project_id, |
| uint32_t metric_id) { |
| std::unique_lock<std::mutex> data_lock(data_mutex_); |
| |
| if (HasMetricAggregate(customer_id, project_id, metric_id)) { |
| return MetricAggregateRef(customer_id, project_id, metric_id, |
| &aggregates_.mutable_by_customer_id() |
| ->at(customer_id) |
| .mutable_by_project_id() |
| ->at(project_id) |
| .mutable_by_metric_id() |
| ->at(metric_id), |
| this, std::move(data_lock)); |
| } |
| |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(customer_id, project_id); |
| if (ctx) { |
| CustomerAggregates &customer = (*aggregates_.mutable_by_customer_id())[customer_id]; |
| ProjectAggregates &project = (*customer.mutable_by_project_id())[project_id]; |
| if (ctx->GetMetric(metric_id)) { |
| return MetricAggregateRef(customer_id, project_id, metric_id, |
| &(*project.mutable_by_metric_id())[metric_id], this, |
| std::move(data_lock)); |
| } |
| return util::Status( |
| util::StatusCode::NOT_FOUND, |
| absl::StrCat("Metric id `", metric_id, "` not found in registry for project (", customer_id, |
| ", ", project_id, ")")); |
| } |
| |
| return util::Status( |
| util::StatusCode::NOT_FOUND, |
| absl::StrCat("Project (", customer_id, ", ", project_id, ") not found in registry")); |
| } |
| |
| util::Status DelayedLocalAggregateStorage::SaveMetricAggregate(uint32_t /* customer_id */, |
| uint32_t /* project_id */, |
| uint32_t /* metric_id */) { |
| state_.lock()->data_modified = true; |
| return util::Status::OK; |
| } |
| |
| } // namespace cobalt::local_aggregation |