| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_ |
| |
| #include <chrono> |
| #include <condition_variable> |
| #include <cstdint> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <thread> |
| |
| #include "src/lib/statusor/statusor.h" |
| #include "src/lib/util/consistent_proto_store.h" |
| #include "src/lib/util/file_system.h" |
| #include "src/lib/util/protected_fields.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/internal_metrics.h" |
| #include "src/logger/logger_interface.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/registry/cobalt_registry.pb.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| // DelayedLocalAggregateStorage implements LocalAggregateStorage with writes to disk every 5 seconds |
| // by default (the default interval is kDefaultWritebackFrequency, defined outside this header). |
| // |
| // This implementation stores all MetricAggregates in a single GlobalAggregates proto. A background |
| // thread will attempt to write this GlobalAggregates proto to disk once per writeback interval, but |
| // only if a call to SaveMetricAggregate has happened (see State::data_modified). |
| // |
| // Note: This implementation has the potential to lose data if the system loses power before the |
| // pending write happens, but it performs significantly better on systems with slow writes, and |
| // those that suffer from write amplification. |
| class DelayedLocalAggregateStorage : public LocalAggregateStorage { |
| public: |
| static const std::chrono::milliseconds kDefaultWritebackFrequency; |
| |
| // Constructs a DelayedLocalAggregateStorage. |
| // |base_directory|: The absolute path to the directory where the local aggregation files are |
| // stored. It doesn't need to exist yet, but its parent directory must already exist. |
| // |fs|: An instance of the FileSystem interface. Used for reading/writing files. |
| // |global_project_context_factory|: The current global registry. |
| // |writeback_frequency|: Interval between the background thread's attempts to write to disk. |
| // Defaults to kDefaultWritebackFrequency when omitted. |
| DelayedLocalAggregateStorage( |
| std::string base_directory, util::FileSystem *fs, |
| const logger::ProjectContextFactory *global_project_context_factory, |
| std::chrono::milliseconds writeback_frequency = kDefaultWritebackFrequency); |
| |
| ~DelayedLocalAggregateStorage() override; |
| bool HasMetricAggregate(uint32_t customer_id, uint32_t project_id, uint32_t metric_id); |
| lib::statusor::StatusOr<MetricAggregateRef> GetMetricAggregate(uint32_t customer_id, |
| uint32_t project_id, |
| uint32_t metric_id) override; |
| |
| void DeleteData() override; |
| |
| // Blocks until the writeback thread has written data to disk, or until |max_wait| has elapsed. |
| void WaitUntilSave(std::chrono::milliseconds max_wait); |
| |
| // Blocks until the writeback thread begins a write or |max_wait| elapses. TODO: document the bool. |
| bool WaitUntilSaveStart(std::chrono::milliseconds max_wait); |
| |
| void ResetInternalMetrics(logger::LoggerInterface *internal_logger) override { |
| internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger); |
| } |
| |
| protected: |
| util::Status SaveMetricAggregate(uint32_t customer_id, uint32_t project_id, |
| uint32_t metric_id) override; |
| |
| private: |
| // DeleteOutdatedMetrics walks the filesystem from the |base_directory_| down and deletes |
| // MetricAggregate files, and project directories that do not exist in the CobaltRegistry. |
| // NOTE(review): no |base_directory_| member is declared in this header; this may be stale. |
| // TODO(fxbug.dev/51390): Customers that are not present in the registry should be deleted |
| // too. |
| void DeleteOutdatedMetrics(); |
| |
| // ReadPersistentStore: per the original comment, iterates through the registry and creates the |
| // customer/project directories for all of its metrics (if they don't already exist), and adds |
| // empty MetricAggregate objects to |aggregates_| without creating metric files. |
| // NOTE(review): that comment named this method InitializePersistentStore; it may be stale. |
| void ReadPersistentStore(); |
| |
| // The main method run by the worker thread. Executes a loop that exits when ShutDown() is |
| // invoked. |
| void Run(); |
| |
| // Causes the DelayedLocalAggregateStorage to shut down. If there is data to write, it will be |
| // written to disk. All condition variables will be notified in order to wake up any waiting |
| // threads. |
| void ShutDown(); |
| |
| util::ConsistentProtoStore proto_store_; |
| const logger::ProjectContextFactory *global_project_context_factory_; |
| std::unique_ptr<logger::InternalMetrics> internal_metrics_; |
| |
| // This mutex only guards aggregates_, and its lock is passed into the MetricAggregateRef returned |
| // from GetMetricAggregate. Lock ordering: if this lock is needed in addition to a lock on state_, |
| // this lock (data_mutex_) must be taken first. |
| std::mutex data_mutex_; |
| GlobalAggregates aggregates_; |
| |
| std::thread writeback_thread_; |
| std::chrono::milliseconds writeback_frequency_; |
| |
| // This is the state that needs to be modified quickly, especially while data_mutex_ may be held. |
| // Lock ordering: if this lock is needed in addition to a lock on data_mutex_, the lock on |
| // data_mutex_ must be taken first. |
| struct State { |
| // Set to true when ShutDown is called. |
| bool shut_down = false; |
| // Used to wake up threads when a shutdown has been requested. |
| std::condition_variable_any shutdown_notifier; |
| |
| // Set to true whenever SaveMetricAggregate is called. |
| bool data_modified = false; |
| // Used to notify waiters when a writeback has begun (See: WaitUntilSaveStart) |
| std::condition_variable_any data_save_start_notifier; |
| // Used to notify waiters when a writeback has finished (See: WaitUntilSave) |
| std::condition_variable_any data_save_notifier; |
| }; |
| util::ProtectedFields<State> state_; |
| }; |
| |
| } // namespace cobalt::local_aggregation |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_ |