| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_LOCAL_AGGREGATE_STORAGE_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_LOCAL_AGGREGATE_STORAGE_H_ |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/file_system.h" |
| #include "src/lib/util/protected_fields.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/internal_metrics.h" |
| #include "src/logger/logger_interface.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/pb/common.pb.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/status.h" |
| #include "src/public/lib/statusor/statusor.h" |
| |
| namespace cobalt::local_aggregation { |
| // LocalAggregateStorage is the generic interface for storing MetricAggregates. |
| // |
| // Different systems perform better with different implementations of this class, so it is up to |
| // the embedder to choose which StorageStrategy is best. |
| class LocalAggregateStorage { |
| public: |
| enum class StorageStrategy { |
| // Use the ImmediateLocalAggregateStorage implementation. This method writes to disk |
| // synchronously after each call to SaveMetricAggregate. |
| Immediate, |
| |
| // Use the DelayedLocalAggregateStorage implementation. This method is best used on embeddings |
| // where writes to disk are slow, or write amplification is an issue. |
| Delayed, |
| }; |
| |
| // MetricAggregateRef contains a pointer to the requested MetricAggregate, as well as |
| // unique_lock that holds the mutex for the LocalAggregateStorage it came from. |
| class MetricAggregateRef { |
| public: |
| MetricAggregateRef(lib::MetricIdentifier metric, MetricAggregate *aggregate, |
| LocalAggregateStorage *self, std::mutex &mutex) |
| : metric_(metric), |
| aggregate_(aggregate), |
| initial_aggregate_size_(aggregate_->ByteSizeLong()), |
| self_(self), |
| lock_(mutex) {} |
| |
| MetricAggregateRef(lib::MetricIdentifier metric, MetricAggregate *aggregate, |
| LocalAggregateStorage *self, std::unique_lock<std::mutex> mutex_lock) |
| : metric_(metric), |
| aggregate_(aggregate), |
| initial_aggregate_size_(aggregate_->ByteSizeLong()), |
| self_(self), |
| lock_(std::move(mutex_lock)) {} |
| |
| // Returns the pointer to the contained MetricAggregate |
| MetricAggregate *aggregate() { return aggregate_; } |
| |
| // Store the filtered SystemProfile in the aggregate storage. |
| // This must be called for any system_profile_hash values that are added to the MetricAggregate |
| // data, after adding them to the aggregate data, and before calling Save. |
| void StoreFilteredSystemProfile(uint64_t system_profile_hash, |
| const SystemProfile &filtered_system_profile) { |
| self_->StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile); |
| } |
| |
| // Retrieve the filtered SystemProfile from the aggregate storage. |
| // This can be called for any system_profile_hash values that are present in the MetricAggregate |
| // data, as long as the unique_lock in the MetricAggregateRef is being held. |
| [[nodiscard]] lib::statusor::StatusOr<SystemProfile> RetrieveFilteredSystemProfile( |
| uint64_t system_profile_hash) const { |
| return self_->RetrieveFilteredSystemProfile(system_profile_hash); |
| } |
| |
| // Triggers the source LocalAggregateStorage to save the given MetricAggregate. |
| Status Save() { |
| self_->UpdateProjectSizeBy(metric_.project(), |
| static_cast<int64_t>(aggregate_->ByteSizeLong()) - |
| static_cast<int64_t>(initial_aggregate_size_)); |
| return self_->SaveMetricAggregate(metric_); |
| } |
| |
| private: |
| lib::MetricIdentifier metric_; |
| |
| MetricAggregate *aggregate_; |
| size_t initial_aggregate_size_; |
| LocalAggregateStorage *self_; |
| std::unique_lock<std::mutex> lock_; |
| }; |
| |
| explicit LocalAggregateStorage(int64_t per_project_reserved_bytes) |
| : per_project_reserved_bytes_(per_project_reserved_bytes) {} |
| |
| // New is the expected way to construct LocalAggregateStorage objects. Depending on which |
| // StorageStrategy is supplied, a different implementation of LocalAggregateStorage will be |
| // used. |
| static std::unique_ptr<LocalAggregateStorage> New( |
| StorageStrategy strategy, std::string base_directory, util::FileSystem *fs, |
| const logger::ProjectContextFactory *global_project_context_factory, |
| MetadataBuilder *metadata_builder, int64_t per_project_reserved_bytes); |
| |
| // MigrateStoredData updates the existing data in the metric to reflect changes that have |
| // occurred. Changes that are migrated include: |
| // - data that is using deprecated fields, is migrated to use the new fields |
| // - data is migrated to reflect registry changes that change the way data is stored |
| // Returns true if the data was changed. |
| bool MigrateStoredData(MetricAggregate &metric, const MetricDefinition &metric_definition, |
| const MetadataBuilder *metadata_builder); |
| |
| // GetMetricAggregate returns a pointer to the live, mutable MetricAggregate that was requested. |
| // If no such aggregate exists, nullptr will be returned. |
| // |
| // Note: After modifying the MetricAggregate returned by this function, the user is expected to |
| // call 'SaveMetricAggregate' so that the modified values can be persisted to disk. |
| virtual lib::statusor::StatusOr<MetricAggregateRef> GetMetricAggregate( |
| lib::MetricIdentifier metric) = 0; |
| |
| // GarbageCollection is called periodically to allow the storage to garbage collect any unneeded |
| // data. |
| virtual Status GarbageCollection() = 0; |
| |
| // Returns the amount of data stored in the whole LocalAggregateStorage |
| [[nodiscard]] int64_t AmountStored() const { return byte_tracking_.const_lock()->total; } |
| |
| // Returns the amount of data stored for the given project |
| [[nodiscard]] int64_t AmountStored(lib::ProjectIdentifier project) const { |
| auto lock = byte_tracking_.const_lock(); |
| if (lock->total_per_project.count(project) > 0) { |
| return lock->total_per_project.at(project); |
| } |
| return 0; |
| } |
| |
| // Returns the amount of data used by projects that exceed their guaranteed per-project data cap. |
| // Data in Slush is on a first come first served basis. |
| [[nodiscard]] int64_t SlushUsed() const { return byte_tracking_.const_lock()->total_slush; } |
| |
| // When DeleteData is called, all aggregated data should be deleted and written to disk |
| // immediately. |
| virtual void DeleteData() = 0; |
| |
| virtual void ResetInternalMetrics(logger::InternalMetrics *internal_metrics) = 0; |
| |
| // Trigger the shut down procedures for the local_aggregate_storage instance. |
| virtual void ShutDown() = 0; |
| |
| virtual ~LocalAggregateStorage() = default; |
| |
| protected: |
| // StoreFilteredSystemProfile ensures that a filtered SystemProfile is present in the aggregate |
| // storage data. |
| // |
| // This must be called after modifying the MetricAggregate returned by GetMetricAggregate, to add |
| // the use of the system_profile_hash, and before calling SaveMetricAggregate. It must be called |
| // while a MetricAggregateRef continues to hold the mutex for the LocalAggregateStorage it came |
| // from. It can also be called during the call to MigrateStoredData. |
| // |
| // LocalAggregateStorage implementations should implement periodic garbage collection of unused |
| // SystemProfiles from the store. When there are no outstanding MetricAggregateRef locks, any |
| // system_profile_hash values that don't appear in any MetricAggregates can be removed. |
| virtual void StoreFilteredSystemProfile(uint64_t system_profile_hash, |
| const SystemProfile &filtered_system_profile) = 0; |
| |
| // RetrieveFilteredSystemProfile loads a filtered SystemProfile that is present in the aggregates |
| // from the storage data. |
| // |
| // This must be called while a MetricAggregateRef continues to hold the mutex for the |
| // LocalAggregateStorage it came from. |
| [[nodiscard]] virtual lib::statusor::StatusOr<SystemProfile> RetrieveFilteredSystemProfile( |
| uint64_t system_profile_hash) const = 0; |
| |
| // SaveMetricAggregate writes the current state of the MetricAggregate for the given |
| // (customer, project, metric) tuple to disk. |
| // |
| // Note: This should be called after modifying the MetricAggregate returned by |
| // GetMetricAggregate. |
| virtual Status SaveMetricAggregate(lib::MetricIdentifier metric) = 0; |
| |
| // UpdateProjectSizeBy is called whenever the amount of data stored for a project changes. It |
| // handles the bookkeeping to keep track of how much data is stored per project, and how much is |
| // stored overall. |
| void UpdateProjectSizeBy(lib::ProjectIdentifier proj, int64_t size_increase); |
| |
| private: |
| void MigrateAggregationPeriodBucket(AggregationPeriodBucket &bucket, util::TimeInfo time_info, |
| int64_t timestamp, const MetricDefinition &metric_definition, |
| const ReportDefinition &report_def, |
| const MetadataBuilder *metadata_builder); |
| |
| const int64_t per_project_reserved_bytes_; |
| struct ByteTracking { |
| // The total number of bytes used across all projects. |
| int64_t total = 0; |
| |
| // The total number of bytes over per_project_reserved_bytes_ for all projects. |
| int64_t total_slush = 0; |
| |
| // The total number of bytes used per project |
| std::map<lib::ProjectIdentifier, int64_t> total_per_project; |
| }; |
| util::ProtectedFields<ByteTracking> byte_tracking_; |
| |
| friend class LocalAggregateStorageTest_HandlesBookkeepingAsExpected_Test; |
| }; |
| |
| } // namespace cobalt::local_aggregation |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_LOCAL_AGGREGATE_STORAGE_H_ |