| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_ |
| |
| #include <condition_variable> |
| #include <cstdint> |
| #include <mutex> |
| #include <string> |
| #include <thread> |
| |
| #include "src/lib/util/consistent_proto_store.h" |
| #include "src/lib/util/file_system.h" |
| #include "src/lib/util/protected_fields.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/internal_metrics.h" |
| #include "src/logger/logger_interface.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/pb/common.pb.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/statusor/statusor.h" |
| #include "src/registry/cobalt_registry.pb.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| // DelayedLocalAggregateStorage implements LocalAggregateStorage with writes to disk every 5 minutes |
| // by default. |
| // |
| // This implementation stores all MetricAggregates in a single GlobalAggregates proto. A background |
| // thread will attempt to write this GlobalAggregates proto to disk every five minutes, but only if |
| // a call to SaveMetricAggregate has happened. |
| // |
| // Note: This implementation has the potential to lose data if the system loses power before the 5 |
| // minutes are up, but it performs significantly better on systems with slow writes, and those that |
| // suffer from write amplification. |
| class DelayedLocalAggregateStorage : public LocalAggregateStorage { |
| public: |
| static const std::chrono::milliseconds kDefaultWritebackFrequency; |
| |
| // Constructor for a DelayedLocalAggregateStorage object |
| // |
| // |filename|: The absolute path to the file where the local aggregation data is |
| // stored. This file doesn't need to exist yet, but its parent |
| // directory must already exist. |
| // |fs|: An instance of the FileSystem interface. Used for reading/writing files. |
| // |global_project_context_factory|: The current global registry. |
| DelayedLocalAggregateStorage( |
| std::string filename, util::FileSystem *fs, |
| const logger::ProjectContextFactory *global_project_context_factory, |
| MetadataBuilder *metadata_builder, int64_t per_project_reserved_bytes, |
| std::chrono::milliseconds writeback_frequency = kDefaultWritebackFrequency); |
| |
| ~DelayedLocalAggregateStorage() override; |
| bool HasMetricAggregate(lib::MetricIdentifier metric); |
| lib::statusor::StatusOr<MetricAggregateRef> GetMetricAggregate( |
| lib::MetricIdentifier metric) override; |
| |
| Status GarbageCollection() override; |
| |
| void DeleteData() override; |
| |
| // Blocks for |max_wait| milliseconds or until the writeback thread has written data to disk. |
| void WaitUntilSave(std::chrono::milliseconds max_wait); |
| |
| // Blocks for |max_wait| milliseconds or until the writeback thread has begun to write data. |
| bool WaitUntilSaveStart(std::chrono::milliseconds max_wait); |
| |
| void ResetInternalMetrics(logger::InternalMetrics *internal_metrics) override { |
| internal_metrics_.reset(internal_metrics); |
| } |
| |
| // Causes the DelayedLocalAggregateStorage to shut down. If there is data to write, it will be |
| // written to disk. All condition variables will be notified in order to wake up any waiting |
| // threads. |
| void ShutDown() override; |
| |
| protected: |
| Status SaveMetricAggregate(lib::MetricIdentifier metric) override; |
| |
| void StoreFilteredSystemProfile(uint64_t system_profile_hash, |
| const SystemProfile &filtered_system_profile) override; |
| |
| [[nodiscard]] lib::statusor::StatusOr<SystemProfile> RetrieveFilteredSystemProfile( |
| uint64_t system_profile_hash) const override; |
| |
| private: |
| // DeleteOutdatedMetrics walks the filesystem from the |base_directory_| down and deletes |
| // MetricAggregate files, and project directories that do not exist in the CobaltRegistry. |
| // |
| // TODO(fxbug.dev/51390): Customers that are not present in the registry should be deleted |
| // too. |
| void DeleteOutdatedMetrics(); |
| |
| // MigrateStoredData updates the data in the aggregate storage to reflect changes that have |
| // occurred. Changes that are migrated include: |
| // - data that is using deprecated fields, is migrated to use the new fields |
| // - data is migrated to reflect registry changes that change the way data is stored |
| void MigrateStoredData(); |
| |
| // InitializePersistentStore iterates through the GlobalAggregates and updates the project size |
| // for each of them. |
| void InitializePersistentStore(); |
| |
| // InitializePersistentStore iterates through the registry and creates the customer/project |
| // directories for all of the metrics in the registry (if they don't already exist). Additionally, |
| // it adds empty MetricAggregate objects to the |aggregates_| object. It does not create metric |
| // files, since at this point they are guaranteed to be empty. |
| void ReadPersistentStore(); |
| |
| // The main method run by the worker thread. Executes a loop that exits when ShutDown() is |
| // invoked. |
| void Run(); |
| |
| util::ConsistentProtoStore proto_store_; |
| const logger::ProjectContextFactory *global_project_context_factory_; |
| MetadataBuilder *metadata_builder_; |
| logger::InternalMetricsPtr internal_metrics_; |
| |
| // This mutex only guards aggregates_ and its lock is passed into the MetricAggregateRef returned |
| // from GetMetricAggregate. If this lock is needed in addition to a lock on state_, this lock |
| // should be taken first. |
| std::mutex data_mutex_; |
| GlobalAggregates aggregates_; |
| |
| std::thread writeback_thread_; |
| std::chrono::milliseconds writeback_frequency_; |
| |
| // This is the state that needs to be modified quickly, especially while data_mutex_ may be held. |
| // If this lock is needed in addition to a lock on data_mutex_, the lock on data_mutex_ should be |
| // taken first. |
| struct State { |
| // When ShutDown is called, this value will be set to true. |
| bool shut_down = false; |
| // Used to wake up threads when a shutdown has been requested. |
| std::condition_variable_any shutdown_notifier; |
| // Used to notify the ShutDown method that the shutdown has completed. |
| bool shut_down_complete = false; |
| std::condition_variable_any shutdown_complete_notifier; |
| |
| // Set to true whenever SaveMetricAggregate is called. |
| bool data_modified = false; |
| // Used to notify when a writeback has begun (See: WaitUntilSaveStart) |
| std::condition_variable_any data_save_start_notifier; |
| // Used to notify when a writeback has finished (See: WaitUntilSave) |
| std::condition_variable_any data_save_notifier; |
| }; |
| util::ProtectedFields<State> state_; |
| }; |
| |
| } // namespace cobalt::local_aggregation |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_ |