blob: b4c1b1461760c8152ae930a93b1943ee00f46bce [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/file_system.h"
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/logger_interface.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/common.pb.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/cobalt_registry.pb.h"
namespace cobalt::local_aggregation {
// DelayedLocalAggregateStorage implements LocalAggregateStorage with writes to disk every 5 minutes
// by default.
//
// This implementation stores all MetricAggregates in a single GlobalAggregates proto. A background
// thread will attempt to write this GlobalAggregates proto to disk every five minutes, but only if
// a call to SaveMetricAggregate has happened.
//
// Note: This implementation has the potential to lose data if the system loses power before the 5
// minutes are up, but it performs significantly better on systems with slow writes, and those that
// suffer from write amplification.
class DelayedLocalAggregateStorage : public LocalAggregateStorage {
public:
static const std::chrono::milliseconds kDefaultWritebackFrequency;
// Constructor for a DelayedLocalAggregateStorage object
//
// |filename|: The absolute path to the file where the local aggregation data is
// stored. This file doesn't need to exist yet, but its parent
// directory must already exist.
// |fs|: An instance of the FileSystem interface. Used for reading/writing files.
// |global_project_context_factory|: The current global registry.
DelayedLocalAggregateStorage(
std::string filename, util::FileSystem *fs,
const logger::ProjectContextFactory *global_project_context_factory,
MetadataBuilder *metadata_builder, int64_t per_project_reserved_bytes,
std::chrono::milliseconds writeback_frequency = kDefaultWritebackFrequency);
~DelayedLocalAggregateStorage() override;
bool HasMetricAggregate(lib::MetricIdentifier metric);
lib::statusor::StatusOr<MetricAggregateRef> GetMetricAggregate(
lib::MetricIdentifier metric) override;
Status GarbageCollection() override;
void DeleteData() override;
// Blocks for |max_wait| milliseconds or until the writeback thread has written data to disk.
void WaitUntilSave(std::chrono::milliseconds max_wait);
// Blocks for |max_wait| milliseconds or until the writeback thread has begun to write data.
bool WaitUntilSaveStart(std::chrono::milliseconds max_wait);
void ResetInternalMetrics(logger::InternalMetrics *internal_metrics) override {
internal_metrics_.reset(internal_metrics);
}
// Causes the DelayedLocalAggregateStorage to shut down. If there is data to write, it will be
// written to disk. All condition variables will be notified in order to wake up any waiting
// threads.
void ShutDown() override;
protected:
Status SaveMetricAggregate(lib::MetricIdentifier metric) override;
void StoreFilteredSystemProfile(uint64_t system_profile_hash,
const SystemProfile &filtered_system_profile) override;
[[nodiscard]] lib::statusor::StatusOr<SystemProfile> RetrieveFilteredSystemProfile(
uint64_t system_profile_hash) const override;
private:
// DeleteOutdatedMetrics walks the filesystem from the |base_directory_| down and deletes
// MetricAggregate files, and project directories that do not exist in the CobaltRegistry.
//
// TODO(fxbug.dev/51390): Customers that are not present in the registry should be deleted
// too.
void DeleteOutdatedMetrics();
// MigrateStoredData updates the data in the aggregate storage to reflect changes that have
// occurred. Changes that are migrated include:
// - data that is using deprecated fields, is migrated to use the new fields
// - data is migrated to reflect registry changes that change the way data is stored
void MigrateStoredData();
// InitializePersistentStore iterates through the GlobalAggregates and updates the project size
// for each of them.
void InitializePersistentStore();
// InitializePersistentStore iterates through the registry and creates the customer/project
// directories for all of the metrics in the registry (if they don't already exist). Additionally,
// it adds empty MetricAggregate objects to the |aggregates_| object. It does not create metric
// files, since at this point they are guaranteed to be empty.
void ReadPersistentStore();
// The main method run by the worker thread. Executes a loop that exits when ShutDown() is
// invoked.
void Run();
util::ConsistentProtoStore proto_store_;
const logger::ProjectContextFactory *global_project_context_factory_;
MetadataBuilder *metadata_builder_;
logger::InternalMetricsPtr internal_metrics_;
// This mutex only guards aggregates_ and its lock is passed into the MetricAggregateRef returned
// from GetMetricAggregate. If this lock is needed in addition to a lock on state_, this lock
// should be taken first.
std::mutex data_mutex_;
GlobalAggregates aggregates_;
std::thread writeback_thread_;
std::chrono::milliseconds writeback_frequency_;
// This is the state that needs to be modified quickly, especially while data_mutex_ may be held.
// If this lock is needed in addition to a lock on data_mutex_, the lock on data_mutex_ should be
// taken first.
struct State {
// When ShutDown is called, this value will be set to true.
bool shut_down = false;
// Used to wake up threads when a shutdown has been requested.
std::condition_variable_any shutdown_notifier;
// Used to notify the ShutDown method that the shutdown has completed.
bool shut_down_complete = false;
std::condition_variable_any shutdown_complete_notifier;
// Set to true whenever SaveMetricAggregate is called.
bool data_modified = false;
// Used to notify when a writeback has begun (See: WaitUntilSaveStart)
std::condition_variable_any data_save_start_notifier;
// Used to notify when a writeback has finished (See: WaitUntilSave)
std::condition_variable_any data_save_notifier;
};
util::ProtectedFields<State> state_;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_