| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h" |
| |
| #include <chrono> |
| #include <set> |
| |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/util/file_system.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/internal_metrics.h" |
| #include "src/logger/project_context.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/logging.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/status.h" |
| #include "src/public/lib/statusor/statusor.h" |
| |
| namespace cobalt::local_aggregation { |
| |
// Default period between two consecutive writeback attempts of the writeback thread (see Run()).
// NOTE(review): presumably used as the default `writeback_frequency` constructor argument in the
// header — confirm.
constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency =
    std::chrono::minutes(5);
// Upper bound on how long ShutDown() waits for the writeback thread to report completion (after
// its final save) before logging an error and returning anyway.
constexpr std::chrono::milliseconds kMaxShutdownDelay = std::chrono::seconds(4);
| |
| DelayedLocalAggregateStorage::DelayedLocalAggregateStorage( |
| std::string filename, util::FileSystem* fs, |
| const logger::ProjectContextFactory* global_project_context_factory, |
| MetadataBuilder* metadata_builder, int64_t per_project_reserved_bytes, |
| std::chrono::milliseconds writeback_frequency) |
| : LocalAggregateStorage(per_project_reserved_bytes), |
| proto_store_(std::move(filename), fs), |
| global_project_context_factory_(global_project_context_factory), |
| metadata_builder_(metadata_builder), |
| writeback_frequency_(writeback_frequency) { |
| ReadPersistentStore(); |
| DeleteOutdatedMetrics(); |
| MigrateStoredData(); |
| InitializePersistentStore(); |
| |
| std::thread t([this] { this->Run(); }); |
| writeback_thread_ = std::move(t); |
| } |
| |
| DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() { |
| ShutDown(); |
| if (writeback_thread_.joinable()) { |
| VLOG(4) << "~DelayedLocalAggregateStorage(): Waiting for writeback thread to exit..."; |
| writeback_thread_.join(); |
| } |
| } |
| |
| void DelayedLocalAggregateStorage::ShutDown() { |
| VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested."; |
| { |
| auto locked = state_.lock(); |
| locked->shut_down = true; |
| locked->shutdown_notifier.notify_all(); |
| locked->data_save_notifier.notify_all(); |
| if (!locked->shutdown_complete_notifier.wait_for( |
| locked, kMaxShutdownDelay, [&locked] { return locked->shut_down_complete; })) { |
| LOG(ERROR) << "DelayedLocalAggregateStorage::ShutDown(): Writeback thread did not shut down " |
| "in time. Data loss likely."; |
| } |
| } |
| } |
| |
| void DelayedLocalAggregateStorage::ReadPersistentStore() { proto_store_.Read(&aggregates_); } |
| |
| void DelayedLocalAggregateStorage::DeleteData() { |
| // Lock the class so no new calls to GetMetricAggregate will return until the deletion has |
| // completed. Lock data_mutex_ first, before taking a lock on state_. |
| std::scoped_lock<std::mutex> data_lock(data_mutex_); |
| aggregates_.clear_by_customer_id(); |
| aggregates_.clear_filtered_system_profiles(); |
| state_.lock()->data_modified = true; |
| WaitUntilSave(writeback_frequency_ * 2); |
| } |
| |
| void DelayedLocalAggregateStorage::DeleteOutdatedMetrics() { |
| std::vector<std::tuple<uint32_t, uint32_t>> projects_to_delete; |
| std::vector<std::tuple<uint32_t, uint32_t, uint32_t>> metrics_to_delete; |
| for (auto [customer_id, customer] : aggregates_.by_customer_id()) { |
| for (auto [project_id, project] : customer.by_project_id()) { |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext( |
| lib::CustomerIdentifier(customer_id).ForProject(project_id)); |
| if (!ctx) { |
| LOG(WARNING) << "Found customer/project that is not present in the registry: (" |
| << customer_id << ", " << project_id << "): Deleting it."; |
| projects_to_delete.emplace_back(std::make_tuple(customer_id, project_id)); |
| continue; |
| } |
| |
| for (auto [metric_id, _] : project.by_metric_id()) { |
| if (!ctx->GetMetric(metric_id)) { |
| LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id |
| << ", " << project_id << ", " << metric_id << "): Deleting it."; |
| metrics_to_delete.emplace_back(std::make_tuple(customer_id, project_id, metric_id)); |
| } |
| } |
| } |
| } |
| |
| for (auto [customer_id, project_id] : projects_to_delete) { |
| aggregates_.mutable_by_customer_id() |
| ->at(customer_id) |
| .mutable_by_project_id() |
| ->erase(project_id); |
| } |
| |
| for (auto [customer_id, project_id, metric_id] : metrics_to_delete) { |
| aggregates_.mutable_by_customer_id() |
| ->at(customer_id) |
| .mutable_by_project_id() |
| ->at(project_id) |
| .mutable_by_metric_id() |
| ->erase(metric_id); |
| } |
| } |
| |
| void DelayedLocalAggregateStorage::MigrateStoredData() { |
| bool changed = false; |
| for (auto& [customer_id, customer] : *aggregates_.mutable_by_customer_id()) { |
| for (auto& [project_id, project] : *customer.mutable_by_project_id()) { |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext( |
| lib::CustomerIdentifier(customer_id).ForProject(project_id)); |
| if (!ctx) { |
| // This shouldn't happen since DeleteOutdatedMetrics() is called before MigrateStoredData(). |
| LOG(ERROR) << "Found customer/project that is not present in the registry: (" << customer_id |
| << ", " << project_id << "): it will not be migrated."; |
| continue; |
| } |
| for (auto& [metric_id, metric] : *project.mutable_by_metric_id()) { |
| const MetricDefinition* metric_def = ctx->GetMetric(metric_id); |
| if (!metric_def) { |
| // This shouldn't happen since DeleteOutdatedMetrics() is called before |
| // MigrateStoredData(). |
| LOG(ERROR) << "Found metric that is not present in the registry: (" << customer_id << ", " |
| << project_id << ", " << metric_id << "): it will not be migrated."; |
| continue; |
| } |
| |
| if (LocalAggregateStorage::MigrateStoredData(metric, *metric_def, metadata_builder_)) { |
| changed = true; |
| LOG(INFO) << "Migrated some local aggregate data for metric: " << customer_id << "-" |
| << project_id << "-" << metric_id; |
| } |
| } |
| } |
| } |
| if (changed) { |
| // No need for locking as upgrading is only done at startup. |
| Status status = proto_store_.Write(aggregates_); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to write migrated local aggregate file to disk: " << status; |
| } |
| } |
| } |
| |
| void DelayedLocalAggregateStorage::InitializePersistentStore() { |
| for (auto [customer_id, customer] : aggregates_.by_customer_id()) { |
| lib::CustomerIdentifier cust(customer_id); |
| for (auto [project_id, project] : customer.by_project_id()) { |
| for (auto [unused, metric] : project.by_metric_id()) { |
| UpdateProjectSizeBy(cust.ForProject(project_id), |
| static_cast<int64_t>(metric.ByteSizeLong())); |
| } |
| } |
| } |
| } |
| |
| void DelayedLocalAggregateStorage::WaitUntilSave(std::chrono::milliseconds max_wait) { |
| auto locked = state_.lock(); |
| if (locked->shut_down || locked->data_modified) { |
| locked->data_save_notifier.wait_for( |
| locked, max_wait, [&locked] { return !(locked->shut_down || locked->data_modified); }); |
| } |
| } |
| |
| bool DelayedLocalAggregateStorage::WaitUntilSaveStart(std::chrono::milliseconds max_wait) { |
| auto locked = state_.lock(); |
| return locked->data_save_start_notifier.wait_for(locked, max_wait) == std::cv_status::no_timeout; |
| } |
| |
// Body of the writeback thread (started by the constructor). Each loop iteration:
//   1. Sleeps for writeback_frequency_, or until ShutDown() sets shut_down.
//   2. If data_modified is set:
//      - signals data_save_start_notifier;
//      - reports the serialized size of aggregates_ to internal_metrics_;
//      - writes aggregates_ to proto_store_ while holding data_mutex_ and state_;
//      - signals data_save_notifier and clears data_modified.
//   3. If shut_down is set, sets shut_down_complete, signals shutdown_complete_notifier and
//      returns.
// A failed write is logged as a warning and not retried until the data is modified again.
void DelayedLocalAggregateStorage::Run() {
  while (true) {
    auto locked_state = state_.lock();
    VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for "
            << std::chrono::duration_cast<std::chrono::seconds>(writeback_frequency_).count()
            << " seconds.";
    // We start each iteration of the loop with a sleep of writeback_frequency_. This ensures that
    // we never write to disk twice within one writeback_frequency_ period (unless ShutDown() cuts
    // the sleep short).
    locked_state->shutdown_notifier.wait_for(locked_state, writeback_frequency_,
                                             [&locked_state] { return locked_state->shut_down; });

    VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
            << locked_state->shut_down << ", data_modified=" << locked_state->data_modified;

    if (locked_state->data_modified) {
      locked_state->data_save_start_notifier.notify_all();
      // Other code paths also take both locks, and all of them lock data_mutex_ before state_:
      // - GetMetricAggregate() locks data_mutex_ at the beginning and returns the lock inside the
      //   MetricAggregateRef. Later, MetricAggregateRef::Save() calls down to
      //   SaveMetricAggregate(), which locks state_ to update data_modified.
      // - DeleteData() and GarbageCollection() lock data_mutex_ first, then state_.
      // Here, we unlock locked_state before attempting to acquire data_mutex_ so that this
      // codepath acquires the two locks in the same order, thus avoiding any potential of
      // deadlock.
      locked_state.unlock();

      // While everything is unlocked, track the current storage usage.
      // This cannot cause an infinite loop, since we are already in the 'data_modified' state,
      // which will be reset to false at the end of this block.
      int64_t size;
      {
        // Take the data lock briefly so we can measure the size of aggregates_ without it being
        // modified.
        std::scoped_lock<std::mutex> data_lock(data_mutex_);
        size = static_cast<int64_t>(aggregates_.ByteSizeLong());
      }
      internal_metrics_->TrackDiskUsage(
          logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);

      // Lock the class so no new calls to GetMetricAggregate will return until the data has been
      // written. Re-acquire state_ only after data_mutex_, preserving the lock order above.
      std::scoped_lock<std::mutex> data_lock(data_mutex_);
      locked_state.lock();

      VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
      Status status = proto_store_.Write(aggregates_);
      if (!status.ok()) {
        LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status;
      }
      // Waiters cannot re-check their predicate until locked_state is released, so they observe
      // data_modified == false even though it is cleared after the notification.
      locked_state->data_save_notifier.notify_all();
      locked_state->data_modified = false;
    }

    if (locked_state->shut_down) {
      locked_state->shut_down_complete = true;
      locked_state->shutdown_complete_notifier.notify_all();
      return;
    }
  }
}
| |
| bool DelayedLocalAggregateStorage::HasMetricAggregate(lib::MetricIdentifier metric) { |
| if (aggregates_.by_customer_id().count(metric.customer_id())) { |
| CustomerAggregates& customer = aggregates_.mutable_by_customer_id()->at(metric.customer_id()); |
| if (customer.by_project_id().count(metric.project_id())) { |
| ProjectAggregates& project = customer.mutable_by_project_id()->at(metric.project_id()); |
| if (project.by_metric_id().count(metric.metric_id())) { |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> |
| DelayedLocalAggregateStorage::GetMetricAggregate(lib::MetricIdentifier metric) { |
| std::unique_lock<std::mutex> data_lock(data_mutex_); |
| |
| if (HasMetricAggregate(metric)) { |
| return MetricAggregateRef(metric, |
| &aggregates_.mutable_by_customer_id() |
| ->at(metric.customer_id()) |
| .mutable_by_project_id() |
| ->at(metric.project_id()) |
| .mutable_by_metric_id() |
| ->at(metric.metric_id()), |
| this, std::move(data_lock)); |
| } |
| |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(metric.project()); |
| if (!ctx) { |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "Project (") |
| .AppendMsg(metric.project()) |
| .AppendMsg(") not found in registry") |
| .Build(); |
| } |
| |
| CustomerAggregates& customer = (*aggregates_.mutable_by_customer_id())[metric.customer_id()]; |
| ProjectAggregates& project = (*customer.mutable_by_project_id())[metric.project_id()]; |
| if (!ctx->GetMetric(metric)) { |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "Metric id `") |
| .AppendMsg(metric.metric_id()) |
| .AppendMsg("` not found in registry for project (") |
| .AppendMsg(metric.project()) |
| .AppendMsg(")") |
| .Build(); |
| } |
| |
| return MetricAggregateRef(metric, &(*project.mutable_by_metric_id())[metric.metric_id()], this, |
| std::move(data_lock)); |
| } |
| |
| void DelayedLocalAggregateStorage::StoreFilteredSystemProfile( |
| uint64_t system_profile_hash, const SystemProfile& filtered_system_profile) { |
| if (!aggregates_.filtered_system_profiles().by_system_profile_hash().contains( |
| system_profile_hash)) { |
| (*aggregates_.mutable_filtered_system_profiles() |
| ->mutable_by_system_profile_hash())[system_profile_hash] = filtered_system_profile; |
| } |
| } |
| |
| lib::statusor::StatusOr<SystemProfile> DelayedLocalAggregateStorage::RetrieveFilteredSystemProfile( |
| uint64_t system_profile_hash) const { |
| const google::protobuf::Map<uint64_t, SystemProfile>& by_system_profile_hash = |
| aggregates_.filtered_system_profiles().by_system_profile_hash(); |
| const auto& system_profile_it = by_system_profile_hash.find(system_profile_hash); |
| if (system_profile_it != by_system_profile_hash.end()) { |
| return system_profile_it->second; |
| } |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "SystemProfile hash `") |
| .AppendMsg(system_profile_hash) |
| .AppendMsg("` not found in filtered SystemProfile cache") |
| .Build(); |
| } |
| |
| Status DelayedLocalAggregateStorage::SaveMetricAggregate(lib::MetricIdentifier /* metric */) { |
| state_.lock()->data_modified = true; |
| return Status::OkStatus(); |
| } |
| |
| Status DelayedLocalAggregateStorage::GarbageCollection() { |
| VLOG(4) << "Garbage collecting unused system profiles from the local aggregate storage."; |
| // Lock the class so no new calls to GetMetricAggregate will return until the garbage collection |
| // has completed. |
| std::scoped_lock<std::mutex> data_lock(data_mutex_); |
| std::set<uint64_t> system_profiles_in_use; |
| |
| // Find the hashes of all the system_profiles that are currently in use in the aggregate data. |
| for (const auto& [_, customer] : aggregates_.by_customer_id()) { |
| for (const auto& [_, project] : customer.by_project_id()) { |
| for (const auto& [_, metric] : project.by_metric_id()) { |
| for (const auto& [_, report] : metric.by_report_id()) { |
| switch (report.time_period_case()) { |
| case ReportAggregate::kHourly: |
| for (const auto& [_, bucket] : report.hourly().by_hour_id()) { |
| for (const SystemProfileAggregate& aggregate : bucket.system_profile_aggregates()) { |
| system_profiles_in_use.insert(aggregate.system_profile_hash()); |
| } |
| } |
| break; |
| case ReportAggregate::kDaily: |
| for (const auto& [_, bucket] : report.daily().by_day_index()) { |
| for (const SystemProfileAggregate& aggregate : bucket.system_profile_aggregates()) { |
| system_profiles_in_use.insert(aggregate.system_profile_hash()); |
| } |
| } |
| break; |
| default: |
| LOG(ERROR) << ""; |
| } |
| } |
| } |
| } |
| } |
| |
| // Remove the entries from the by_system_profile_hash that are not needed. |
| auto* by_system_profiles = |
| aggregates_.mutable_filtered_system_profiles()->mutable_by_system_profile_hash(); |
| int removed = 0; |
| for (auto it = by_system_profiles->begin(); it != by_system_profiles->end();) { |
| if (system_profiles_in_use.count(it->first) == 0) { |
| ++removed; |
| it = by_system_profiles->erase(it); |
| } else { |
| ++it; |
| } |
| } |
| |
| if (removed > 0) { |
| VLOG(4) << "Garbage collecting " << removed |
| << " filtered SystemProfiles from the local aggregate storage."; |
| state_.lock()->data_modified = true; |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| } // namespace cobalt::local_aggregation |