blob: 81138e358ee3e49d69e75710a59ca23b385be0c0 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include "src/lib/statusor/statusor.h"
#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/file_system.h"
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/logger_interface.h"
#include "src/logger/project_context_factory.h"
#include "src/registry/cobalt_registry.pb.h"
namespace cobalt::local_aggregation {
// DelayedLocalAggregateStorage implements LocalAggregateStorage with writes to disk every 5 seconds
// by default.
//
// This implementation stores all MetricAggregates in a single GlobalAggregates proto. A background
// thread will attempt to write this GlobalAggregates proto to disk every five seconds, but only if
// a call to SaveMetricAggregate has happened.
//
// Note: This implementation has the potential to lose data if the system loses power before the 5
// seconds are up, but it performs significantly better on systems with slow writes, and those that
// suffer from write amplification.
class DelayedLocalAggregateStorage : public LocalAggregateStorage {
public:
static const std::chrono::milliseconds kDefaultWritebackFrequency;
// Constructor for a DelayedLocalAggregateStorage object
//
// |base_directory|: The absolute path to the directory where the local aggregation files are
// stored. This directory doesn't need to exist yet, but its parent
// directory must already exist.
// |fs|: An instance of the FileSystem interface. Used for reading/writing files.
// |global_project_context_factory|: The current global registry.
DelayedLocalAggregateStorage(
std::string base_directory, util::FileSystem *fs,
const logger::ProjectContextFactory *global_project_context_factory,
std::chrono::milliseconds writeback_frequency = kDefaultWritebackFrequency);
~DelayedLocalAggregateStorage() override;
bool HasMetricAggregate(uint32_t customer_id, uint32_t project_id, uint32_t metric_id);
lib::statusor::StatusOr<MetricAggregateRef> GetMetricAggregate(uint32_t customer_id,
uint32_t project_id,
uint32_t metric_id) override;
void DeleteData() override;
// Blocks for |max_wait| milliseconds or until the writeback thread has written data to disk.
void WaitUntilSave(std::chrono::milliseconds max_wait);
// Blocks for |max_wait| milliseconds or until the writeback thread has begun to write data.
bool WaitUntilSaveStart(std::chrono::milliseconds max_wait);
void ResetInternalMetrics(logger::LoggerInterface *internal_logger) override {
internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger);
}
protected:
util::Status SaveMetricAggregate(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id) override;
private:
// DeleteOutdatedMetrics walks the filesystem from the |base_directory_| down and deletes
// MetricAggregate files, and project directories that do not exist in the CobaltRegistry.
//
// TODO(fxbug.dev/51390): Customers that are not present in the registry should be deleted
// too.
void DeleteOutdatedMetrics();
// InitializePersistentStore iterates through the registry and creates the customer/project
// directories for all of the metrics in the registry (if they don't already exist). Additionally,
// it adds empty MetricAggregate objects to the |aggregates_| object. It does not create metric
// files, since at this point they are guaranteed to be empty.
void ReadPersistentStore();
// The main method run by the worker thread. Executes a loop that exits when ShutDown() is
// invoked.
void Run();
// Causes the DelayedLocalAggregateStorage to shut down. If there is data to write, it will be
// written to disk. All condition variables will be notified in order to wake up any waiting
// therads.
void ShutDown();
util::ConsistentProtoStore proto_store_;
const logger::ProjectContextFactory *global_project_context_factory_;
std::unique_ptr<logger::InternalMetrics> internal_metrics_;
// This mutex only guards aggregates_ and its lock is passed into the MetricAggregateRef returned
// from GetMetricAggregate. If this lock is needed in addition to a lock on state_, this lock
// should be taken first.
std::mutex data_mutex_;
GlobalAggregates aggregates_;
std::thread writeback_thread_;
std::chrono::milliseconds writeback_frequency_;
// This is the state that needs to be modified quickly, especially while data_mutex_ may be held.
// If this lock is needed in addition to a lock on data_mutex_, the lock on data_mutex_ should be
// taken first.
struct State {
// When ShutDown is called, this value will be set to true.
bool shut_down = false;
// Used to wake up threads when a shutdown has been requested.
std::condition_variable_any shutdown_notifier;
// Set to true whenever SaveMetricAggregate is called.
bool data_modified = false;
// Used to notify when a writeback has begun (See: WaitUntilSaveStart)
std::condition_variable_any data_save_start_notifier;
// Used to notify when a writeback has finished (See: WaitUntilSave)
std::condition_variable_any data_save_notifier;
};
util::ProtectedFields<State> state_;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_LOCAL_AGGREGATE_STORAGE_DELAYED_LOCAL_AGGREGATE_STORAGE_H_