blob: 196ca334f9c76b13b8f49c9b123e4e513cb4dca7 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_LOCAL_AGGREGATE_STORAGE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_LOCAL_AGGREGATE_STORAGE_H_
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/file_system.h"
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/logger_interface.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/common.pb.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/status.h"
#include "src/public/lib/statusor/statusor.h"
namespace cobalt::local_aggregation {
// LocalAggregateStorage is the generic interface for storing MetricAggregates.
//
// Different systems perform better with different implementations of this class, so it is up to
// the embedder to choose which StorageStrategy is best.
class LocalAggregateStorage {
 public:
  // Selects which LocalAggregateStorage implementation New() should construct.
  enum class StorageStrategy {
    // Use the ImmediateLocalAggregateStorage implementation. This method writes to disk
    // synchronously after each call to SaveMetricAggregate.
    Immediate,

    // Use the DelayedLocalAggregateStorage implementation. This method is best used on embeddings
    // where writes to disk are slow, or write amplification is an issue.
    Delayed,
  };

  // MetricAggregateRef contains a pointer to the requested MetricAggregate, as well as a
  // unique_lock that holds the mutex for the LocalAggregateStorage it came from.
  //
  // The lock is held for the lifetime of the MetricAggregateRef (or until the ref is moved from).
  class MetricAggregateRef {
   public:
    // Constructs a ref that acquires |mutex| itself.
    //
    // |lock_| is the first member so that the mutex is already held when the initial size of
    // |aggregate| is read.
    MetricAggregateRef(lib::MetricIdentifier metric, MetricAggregate *aggregate,
                       LocalAggregateStorage *self, std::mutex &mutex)
        : lock_(mutex),
          metric_(metric),
          aggregate_(aggregate),
          initial_aggregate_size_(aggregate_->ByteSizeLong()),
          self_(self) {}

    // Constructs a ref that takes over an existing lock (|mutex_lock|) supplied by the caller.
    MetricAggregateRef(lib::MetricIdentifier metric, MetricAggregate *aggregate,
                       LocalAggregateStorage *self, std::unique_lock<std::mutex> mutex_lock)
        : lock_(std::move(mutex_lock)),
          metric_(metric),
          aggregate_(aggregate),
          initial_aggregate_size_(aggregate_->ByteSizeLong()),
          self_(self) {}

    // Returns the pointer to the contained MetricAggregate
    MetricAggregate *aggregate() { return aggregate_; }

    // Store the filtered SystemProfile in the aggregate storage.
    // This must be called for any system_profile_hash values that are added to the MetricAggregate
    // data, after adding them to the aggregate data, and before calling Save.
    void StoreFilteredSystemProfile(uint64_t system_profile_hash,
                                    const SystemProfile &filtered_system_profile) {
      self_->StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile);
    }

    // Retrieve the filtered SystemProfile from the aggregate storage.
    // This can be called for any system_profile_hash values that are present in the MetricAggregate
    // data, as long as the unique_lock in the MetricAggregateRef is being held.
    [[nodiscard]] lib::statusor::StatusOr<SystemProfile> RetrieveFilteredSystemProfile(
        uint64_t system_profile_hash) const {
      return self_->RetrieveFilteredSystemProfile(system_profile_hash);
    }

    // Triggers the source LocalAggregateStorage to save the given MetricAggregate.
    //
    // Before saving, the per-project byte bookkeeping is adjusted by the change in the
    // aggregate's serialized size since the last accounting point (construction, or the previous
    // call to Save). Returns the Status produced by SaveMetricAggregate.
    Status Save() {
      const size_t current_size = aggregate_->ByteSizeLong();
      self_->UpdateProjectSizeBy(metric_.project(),
                                 static_cast<int64_t>(current_size) -
                                     static_cast<int64_t>(initial_aggregate_size_));
      // Re-baseline so that calling Save() again only accounts for changes made after this call,
      // rather than applying the same delta a second time.
      initial_aggregate_size_ = current_size;
      return self_->SaveMetricAggregate(metric_);
    }

   private:
    // Declared first so it is initialized before any other member touches the aggregate.
    std::unique_lock<std::mutex> lock_;
    lib::MetricIdentifier metric_;
    MetricAggregate *aggregate_;
    // Serialized size of |aggregate_| at the last accounting point (see Save()).
    size_t initial_aggregate_size_;
    LocalAggregateStorage *self_;
  };

  explicit LocalAggregateStorage(int64_t per_project_reserved_bytes)
      : per_project_reserved_bytes_(per_project_reserved_bytes) {}

  // New is the expected way to construct LocalAggregateStorage objects. Depending on which
  // StorageStrategy is supplied, a different implementation of LocalAggregateStorage will be
  // used.
  static std::unique_ptr<LocalAggregateStorage> New(
      StorageStrategy strategy, std::string base_directory, util::FileSystem *fs,
      const logger::ProjectContextFactory *global_project_context_factory,
      MetadataBuilder *metadata_builder, int64_t per_project_reserved_bytes);

  // MigrateStoredData updates the existing data in the metric to reflect changes that have
  // occurred. Changes that are migrated include:
  // - data that is using deprecated fields, is migrated to use the new fields
  // - data is migrated to reflect registry changes that change the way data is stored
  // Returns true if the data was changed.
  bool MigrateStoredData(MetricAggregate &metric, const MetricDefinition &metric_definition,
                         const MetadataBuilder *metadata_builder);

  // GetMetricAggregate returns a MetricAggregateRef wrapping a pointer to the live, mutable
  // MetricAggregate that was requested.
  //
  // NOTE(review): earlier documentation stated that nullptr is returned when no such aggregate
  // exists, which this StatusOr signature cannot express directly. Presumably a non-OK status is
  // returned in that case; confirm against the implementations.
  //
  // Note: After modifying the MetricAggregate, the user is expected to call Save() on the
  // returned MetricAggregateRef so that the modified values can be persisted to disk.
  virtual lib::statusor::StatusOr<MetricAggregateRef> GetMetricAggregate(
      lib::MetricIdentifier metric) = 0;

  // GarbageCollection is called periodically to allow the storage to garbage collect any unneeded
  // data.
  virtual Status GarbageCollection() = 0;

  // Returns the amount of data stored in the whole LocalAggregateStorage
  [[nodiscard]] int64_t AmountStored() const { return byte_tracking_.const_lock()->total; }

  // Returns the amount of data stored for the given project, or 0 if nothing has been recorded
  // for that project.
  [[nodiscard]] int64_t AmountStored(lib::ProjectIdentifier project) const {
    auto lock = byte_tracking_.const_lock();
    // Single lookup instead of count() followed by at().
    const auto entry = lock->total_per_project.find(project);
    if (entry != lock->total_per_project.end()) {
      return entry->second;
    }
    return 0;
  }

  // Returns the amount of data used by projects that exceed their guaranteed per-project data cap.
  // Data in Slush is on a first come first served basis.
  [[nodiscard]] int64_t SlushUsed() const { return byte_tracking_.const_lock()->total_slush; }

  // When DeleteData is called, all aggregated data should be deleted and written to disk
  // immediately.
  virtual void DeleteData() = 0;

  // Replaces the InternalMetrics instance used by the storage implementation.
  // NOTE(review): exact semantics are implementation-defined; confirm against the subclasses.
  virtual void ResetInternalMetrics(logger::InternalMetrics *internal_metrics) = 0;

  // Trigger the shut down procedures for the local_aggregate_storage instance.
  virtual void ShutDown() = 0;

  virtual ~LocalAggregateStorage() = default;

 protected:
  // StoreFilteredSystemProfile ensures that a filtered SystemProfile is present in the aggregate
  // storage data.
  //
  // This must be called after modifying the MetricAggregate returned by GetMetricAggregate, to add
  // the use of the system_profile_hash, and before calling SaveMetricAggregate. It must be called
  // while a MetricAggregateRef continues to hold the mutex for the LocalAggregateStorage it came
  // from. It can also be called during the call to MigrateStoredData.
  //
  // LocalAggregateStorage implementations should implement periodic garbage collection of unused
  // SystemProfiles from the store. When there are no outstanding MetricAggregateRef locks, any
  // system_profile_hash values that don't appear in any MetricAggregates can be removed.
  virtual void StoreFilteredSystemProfile(uint64_t system_profile_hash,
                                          const SystemProfile &filtered_system_profile) = 0;

  // RetrieveFilteredSystemProfile loads a filtered SystemProfile that is present in the aggregates
  // from the storage data.
  //
  // This must be called while a MetricAggregateRef continues to hold the mutex for the
  // LocalAggregateStorage it came from.
  [[nodiscard]] virtual lib::statusor::StatusOr<SystemProfile> RetrieveFilteredSystemProfile(
      uint64_t system_profile_hash) const = 0;

  // SaveMetricAggregate writes the current state of the MetricAggregate for the given
  // (customer, project, metric) tuple to disk.
  //
  // Note: This should be called after modifying the MetricAggregate returned by
  // GetMetricAggregate. External users reach it through MetricAggregateRef::Save().
  virtual Status SaveMetricAggregate(lib::MetricIdentifier metric) = 0;

  // UpdateProjectSizeBy is called whenever the amount of data stored for a project changes. It
  // handles the bookkeeping to keep track of how much data is stored per project, and how much is
  // stored overall.
  void UpdateProjectSizeBy(lib::ProjectIdentifier proj, int64_t size_increase);

 private:
  // Helper used while migrating stored data; operates on a single AggregationPeriodBucket.
  void MigrateAggregationPeriodBucket(AggregationPeriodBucket &bucket, util::TimeInfo time_info,
                                      int64_t timestamp, const MetricDefinition &metric_definition,
                                      const ReportDefinition &report_def,
                                      const MetadataBuilder *metadata_builder);

  // Number of bytes reserved for each project, as supplied to the constructor.
  const int64_t per_project_reserved_bytes_;

  struct ByteTracking {
    // The total number of bytes used across all projects.
    int64_t total = 0;
    // The total number of bytes over per_project_reserved_bytes_ for all projects.
    int64_t total_slush = 0;
    // The total number of bytes used per project
    std::map<lib::ProjectIdentifier, int64_t> total_per_project;
  };
  util::ProtectedFields<ByteTracking> byte_tracking_;

  friend class LocalAggregateStorageTest_HandlesBookkeepingAsExpected_Test;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_LOCAL_AGGREGATE_STORAGE_H_