| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h" |
| |
| #include <set> |
| |
| #include "absl/strings/numbers.h" |
| #include "absl/strings/str_cat.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/logging.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| ImmediateLocalAggregateStorage::ImmediateLocalAggregateStorage( |
| std::string base_directory, util::FileSystem *fs, |
| const logger::ProjectContextFactory *global_project_context_factory, |
| MetadataBuilder *metadata_builder, int64_t per_project_reserved_bytes) |
| : LocalAggregateStorage(per_project_reserved_bytes), |
| base_directory_(std::move(base_directory)), |
| fs_(fs), |
| proto_store_(fs_), |
| global_project_context_factory_(global_project_context_factory), |
| metadata_builder_(metadata_builder) { |
| CHECK(global_project_context_factory_) |
| << "No global_project_context_factory provided. Cannot initialize store!"; |
| |
| std::string filtered_system_profiles_file = |
| absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile); |
| if (fs_->FileExists(filtered_system_profiles_file)) { |
| auto status = proto_store_.Read(filtered_system_profiles_file, &filtered_system_profiles_); |
| if (!status.ok()) { |
| LOG(FATAL) << "Failed to read the FilteredSystemProfiles from file '" |
| << filtered_system_profiles_file << "':" << status; |
| } |
| } |
| |
| DeleteOutdatedMetrics(); |
| MigrateStoredData(); |
| InitializePersistentStore(); |
| } |
| |
| void ImmediateLocalAggregateStorage::DeleteData() { |
| // Lock the class so no new calls to GetMetricAggregate will return until the deletion has |
| // completed. |
| std::scoped_lock<std::mutex> data_lock(mutex_); |
| for (const auto &customer_folder : fs_->ListFiles(base_directory_).ConsumeValueOr({})) { |
| if (customer_folder == kFilteredSystemProfilesFile) { |
| fs_->Delete(absl::StrCat(base_directory_, "/", customer_folder)); |
| } else { |
| fs_->DeleteRecursive(absl::StrCat(base_directory_, "/", customer_folder)); |
| } |
| } |
| aggregates_.clear(); |
| filtered_system_profiles_.clear_by_system_profile_hash(); |
| } |
| |
| void ImmediateLocalAggregateStorage::DeleteOutdatedMetrics() { |
| for (const auto &customer_folder : fs_->ListFiles(base_directory_).ConsumeValueOr({})) { |
| if (customer_folder == kFilteredSystemProfilesFile) { |
| continue; // Skip the filtered system profiles file. |
| } |
| auto customer_dir = absl::StrCat(base_directory_, "/", customer_folder); |
| |
| uint32_t customer_id; |
| if (!absl::SimpleAtoi(customer_folder, &customer_id)) { |
| LOG(WARNING) << "Found bad customer folder name: " << customer_folder; |
| continue; |
| } |
| |
| for (const auto &project_folder : fs_->ListFiles(customer_dir).ConsumeValueOr({})) { |
| auto project_dir = absl::StrCat(customer_dir, "/", project_folder); |
| |
| uint32_t project_id; |
| if (!absl::SimpleAtoi(project_folder, &project_id)) { |
| LOG(WARNING) << "Found bad project folder name: " << project_folder; |
| continue; |
| } |
| |
| auto project_context = global_project_context_factory_->NewProjectContext( |
| lib::CustomerIdentifier(customer_id).ForProject(project_id)); |
| if (!project_context) { |
| LOG(WARNING) << "Found customer/project that is not present in the registry: (" |
| << customer_id << ", " << project_id << "): Deleting it."; |
| if (!fs_->DeleteRecursive(project_dir)) { |
| LOG(ERROR) << "Unable to delete project folder"; |
| } |
| continue; |
| } |
| |
| for (const auto &metric_file : fs_->ListFiles(project_dir).ConsumeValueOr({})) { |
| auto metric_filename = absl::StrCat(project_dir, "/", metric_file); |
| |
| uint32_t metric_id; |
| if (!absl::SimpleAtoi(metric_file, &metric_id)) { |
| LOG(WARNING) << "Unable to parse metric file name `" << metric_file << "` as an integer"; |
| continue; |
| } |
| |
| if (!project_context->GetMetric(metric_id)) { |
| LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id |
| << ", " << project_id << ", " << metric_id << "): Deleting it."; |
| if (!fs_->Delete(metric_filename)) { |
| LOG(ERROR) << "Unable to delete metric file"; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| void ImmediateLocalAggregateStorage::MigrateStoredData() { |
| for (const auto &project : global_project_context_factory_->ListProjects()) { |
| std::string customer_dir = absl::StrCat(base_directory_, "/", project.customer_id()); |
| if (!fs_->FileExists(customer_dir)) { |
| continue; |
| } |
| std::string project_dir = absl::StrCat(customer_dir, "/", project.project_id()); |
| if (!fs_->FileExists(project_dir)) { |
| continue; |
| } |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(project); |
| for (lib::MetricIdentifier metric : ctx->ListMetrics()) { |
| std::string metric_file = absl::StrCat(project_dir, "/", metric.metric_id()); |
| if (!fs_->FileExists(metric_file)) { |
| continue; |
| } |
| MetricAggregate metric_aggregate; |
| auto status = proto_store_.Read(absl::StrCat(base_directory_, "/", metric.customer_id(), "/", |
| metric.project_id(), "/", metric.metric_id()), |
| &metric_aggregate); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to migrate aggregate file: " << metric.customer_id() << "/" |
| << metric.project_id() << "/" << metric.metric_id() << ": " << status; |
| continue; |
| } |
| bool changed = LocalAggregateStorage::MigrateStoredData( |
| metric_aggregate, *ctx->GetMetric(metric), metadata_builder_); |
| if (changed) { |
| status = proto_store_.Write(absl::StrCat(base_directory_, "/", metric.customer_id(), "/", |
| metric.project_id(), "/", metric.metric_id()), |
| metric_aggregate); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to write migrated aggregate file: " << metric.customer_id() << "/" |
| << metric.project_id() << "/" << metric.metric_id() << ": " << status; |
| continue; |
| } |
| LOG(INFO) << "Migrated the local aggregate file for metric: " << metric; |
| } |
| } |
| } |
| if (filtered_system_profiles_changed_) { |
| std::string filtered_system_profiles_file = |
| absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile); |
| auto status = proto_store_.Write(filtered_system_profiles_file, filtered_system_profiles_); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to write the migrated FilteredSystemProfiles to file '" |
| << filtered_system_profiles_file << "':" << status; |
| } else { |
| filtered_system_profiles_changed_ = false; |
| } |
| } |
| } |
| |
| void ImmediateLocalAggregateStorage::InitializePersistentStore() { |
| if (!fs_->FileExists(base_directory_)) { |
| fs_->MakeDirectory(base_directory_); |
| } |
| for (const auto &project : global_project_context_factory_->ListProjects()) { |
| std::string customer_dir = absl::StrCat(base_directory_, "/", project.customer_id()); |
| if (!fs_->FileExists(customer_dir)) { |
| fs_->MakeDirectory(customer_dir); |
| } |
| |
| std::string project_dir = absl::StrCat(customer_dir, "/", project.project_id()); |
| if (!fs_->FileExists(project_dir)) { |
| fs_->MakeDirectory(project_dir); |
| } |
| |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(project); |
| for (lib::MetricIdentifier metric_identifier : ctx->ListMetrics()) { |
| std::string metric_file = absl::StrCat(project_dir, "/", metric_identifier.metric_id()); |
| if (!fs_->FileExists(metric_file)) { |
| aggregates_[metric_identifier] = MetricAggregate(); |
| UpdateProjectSizeBy(metric_identifier.project(), 0); |
| } else { |
| UpdateProjectSizeBy(metric_identifier.project(), |
| static_cast<int64_t>(fs_->FileSize(metric_file).ValueOrDie())); |
| } |
| } |
| } |
| } |
| |
| lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> |
| ImmediateLocalAggregateStorage::GetMetricAggregate(lib::MetricIdentifier metric) { |
| // Check if the file has already been read |
| auto aggregate = aggregates_.find(metric); |
| if (aggregate != aggregates_.end()) { |
| return MetricAggregateRef(metric, &aggregate->second, this, mutex_); |
| } |
| |
| // Try to read the file (if the customer_id/project_id/metric_id tuple actually exists in the |
| // registry) |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(metric.project()); |
| if (!ctx) { |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "Project (") |
| .AppendMsg(metric.project()) |
| .AppendMsg(") not found in registry") |
| .Build(); |
| } |
| if (!ctx->GetMetric(metric)) { |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "Metric id `") |
| .AppendMsg(metric.metric_id()) |
| .AppendMsg("` not found in registry for project (") |
| .AppendMsg(metric.project()) |
| .AppendMsg(")") |
| .Build(); |
| } |
| |
| auto status = proto_store_.Read(absl::StrCat(base_directory_, "/", metric.customer_id(), "/", |
| metric.project_id(), "/", metric.metric_id()), |
| &aggregates_[metric]); |
| |
| if (!status.ok()) { |
| // Should be unlikely to occur, as InitializePersistentStore ensures that every metric either |
| // has an entry in |aggregates_| or a file in the file system. |
| LOG(ERROR) << "Failed to load aggregate file: " << metric.customer_id() << "/" |
| << metric.project_id() << "/" << metric.metric_id() << ": " << status; |
| // Fall through and return an empty aggregate for this metric. If the file did exist and was |
| // unable to be read, this results in all current data for the metric aggregate being lost |
| // (overwritten) when SaveMetricAggregate is called. |
| } |
| |
| return MetricAggregateRef(metric, &aggregates_[metric], this, mutex_); |
| } |
| |
| void ImmediateLocalAggregateStorage::StoreFilteredSystemProfile( |
| uint64_t system_profile_hash, const SystemProfile &filtered_system_profile) { |
| if (!filtered_system_profiles_.by_system_profile_hash().contains(system_profile_hash)) { |
| (*filtered_system_profiles_.mutable_by_system_profile_hash())[system_profile_hash] = |
| filtered_system_profile; |
| filtered_system_profiles_changed_ = true; |
| } |
| } |
| |
| lib::statusor::StatusOr<SystemProfile> |
| ImmediateLocalAggregateStorage::RetrieveFilteredSystemProfile(uint64_t system_profile_hash) const { |
| const google::protobuf::Map<uint64_t, SystemProfile> &by_system_profile_hash = |
| filtered_system_profiles_.by_system_profile_hash(); |
| const auto &system_profile_it = by_system_profile_hash.find(system_profile_hash); |
| if (system_profile_it != by_system_profile_hash.end()) { |
| return system_profile_it->second; |
| } |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "SystemProfile hash `") |
| .AppendMsg(system_profile_hash) |
| .AppendMsg("` not found in filtered SystemProfile cache") |
| .Build(); |
| } |
| |
| Status ImmediateLocalAggregateStorage::SaveMetricAggregate(lib::MetricIdentifier metric) { |
| auto aggregate = aggregates_.find(metric); |
| if (aggregate == aggregates_.end()) { |
| return Status(StatusCode::NOT_FOUND, "No matching metric aggregate found"); |
| } |
| |
| CB_RETURN_IF_ERROR( |
| proto_store_.Write(absl::StrCat(base_directory_, "/", metric.customer_id(), "/", |
| metric.project_id(), "/", metric.metric_id()), |
| aggregate->second)); |
| if (filtered_system_profiles_changed_) { |
| CB_RETURN_IF_ERROR( |
| proto_store_.Write(absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile), |
| filtered_system_profiles_)); |
| filtered_system_profiles_changed_ = false; |
| } |
| return Status::OkStatus(); |
| } |
| |
| Status ImmediateLocalAggregateStorage::GarbageCollection() { |
| VLOG(4) << "Garbage collecting unused system profiles from the local aggregate storage."; |
| // Lock the class so no new calls to GetMetricAggregate will return until the garbage collection |
| // has completed. |
| std::scoped_lock<std::mutex> data_lock(mutex_); |
| std::set<uint64_t> system_profiles_in_use; |
| |
| // Find the hashes of all the system_profiles that are currently in use in the aggregate data. |
| for (const auto &project : global_project_context_factory_->ListProjects()) { |
| std::string customer_dir = absl::StrCat(base_directory_, "/", project.customer_id()); |
| if (!fs_->FileExists(customer_dir)) { |
| continue; |
| } |
| std::string project_dir = absl::StrCat(customer_dir, "/", project.project_id()); |
| if (!fs_->FileExists(project_dir)) { |
| continue; |
| } |
| std::unique_ptr<logger::ProjectContext> ctx = |
| global_project_context_factory_->NewProjectContext(project); |
| for (lib::MetricIdentifier metric : ctx->ListMetrics()) { |
| MetricAggregate *metric_aggregate; |
| // Load the data for the metric. Either from the in-memory map, or the file system. |
| auto aggregate_it = aggregates_.find(metric); |
| if (aggregate_it != aggregates_.end()) { |
| metric_aggregate = &aggregate_it->second; |
| } else { |
| metric_aggregate = &aggregates_[metric]; |
| // If a metric aggregate file can't be read, abort to avoid garbage collecting any system |
| // profiles that metric aggregate is currently using. |
| CB_RETURN_IF_ERROR( |
| proto_store_.Read(absl::StrCat(base_directory_, "/", metric.customer_id(), "/", |
| metric.project_id(), "/", metric.metric_id()), |
| metric_aggregate)); |
| } |
| |
| for (const auto &[_, report] : metric_aggregate->by_report_id()) { |
| switch (report.time_period_case()) { |
| case ReportAggregate::kHourly: |
| for (const auto &[_, bucket] : report.hourly().by_hour_id()) { |
| for (const SystemProfileAggregate &aggregate : bucket.system_profile_aggregates()) { |
| system_profiles_in_use.insert(aggregate.system_profile_hash()); |
| } |
| } |
| break; |
| case ReportAggregate::kDaily: |
| for (const auto &[_, bucket] : report.daily().by_day_index()) { |
| for (const SystemProfileAggregate &aggregate : bucket.system_profile_aggregates()) { |
| system_profiles_in_use.insert(aggregate.system_profile_hash()); |
| } |
| } |
| break; |
| default: |
| LOG(ERROR) << ""; |
| } |
| } |
| } |
| } |
| |
| // Remove the entries from the by_system_profile_hash that are not needed. |
| auto *by_system_profiles = filtered_system_profiles_.mutable_by_system_profile_hash(); |
| int removed = 0; |
| for (auto it = by_system_profiles->begin(); it != by_system_profiles->end();) { |
| if (system_profiles_in_use.count(it->first) == 0) { |
| ++removed; |
| it = by_system_profiles->erase(it); |
| } else { |
| ++it; |
| } |
| } |
| |
| if (removed > 0) { |
| VLOG(4) << "Garbage collecting " << removed |
| << " filtered SystemProfiles from the local aggregate storage."; |
| CB_RETURN_IF_ERROR( |
| proto_store_.Write(absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile), |
| filtered_system_profiles_)); |
| filtered_system_profiles_changed_ = false; |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| } // namespace cobalt::local_aggregation |