blob: 0a0d0d94a1b2c47730cb691112242325bbfcfeff [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"

#include <chrono>
#include <mutex>

#include "absl/strings/str_cat.h"
#include "src/lib/statusor/statusor.h"
#include "src/lib/util/file_system.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
namespace cobalt::local_aggregation {
// Out-of-line definition of the default write-back period: five seconds, stored as milliseconds.
constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency =
    std::chrono::seconds{5};
// Builds the storage on top of |base_directory|: loads the persisted aggregates, prunes entries
// whose customer/project/metric is unknown to |global_project_context_factory|, and then starts
// the background thread that executes Run().
//
// |fs| is forwarded to the underlying proto store. |writeback_frequency| is the period Run()
// sleeps between write-back attempts.
DelayedLocalAggregateStorage::DelayedLocalAggregateStorage(
    std::string base_directory, util::FileSystem *fs,
    const logger::ProjectContextFactory *global_project_context_factory,
    std::chrono::milliseconds writeback_frequency)
    : proto_store_(std::move(base_directory), fs),
      global_project_context_factory_(global_project_context_factory),
      internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)),
      writeback_frequency_(writeback_frequency) {
  // Load and prune before the worker thread is started.
  ReadPersistentStore();
  DeleteOutdatedMetrics();

  // The worker captures |this|; the destructor joins it before the object goes away.
  writeback_thread_ = std::thread([this] { Run(); });
}
// Requests shut-down and blocks until the writeback thread has exited. Does nothing when the
// thread is not joinable.
DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() {
  if (writeback_thread_.joinable()) {
    ShutDown();
    VLOG(4) << "~DelayedLocalAggregateStorage: waiting for writeback thread to exit...";
    writeback_thread_.join();
  }
}
void DelayedLocalAggregateStorage::ShutDown() {
WaitUntilSave(writeback_frequency_);
{
auto locked = state_.lock();
locked->shut_down = true;
locked->shutdown_notifier.notify_all();
locked->data_save_notifier.notify_all();
}
VLOG(4) << "DelayedLocalAggregateStorage: shut-down requested.";
}
// Asks the proto store to fill aggregates_ with its persisted contents.
// NOTE(review): whatever Read() returns, if anything, is not inspected here.
void DelayedLocalAggregateStorage::ReadPersistentStore() {
  proto_store_.Read(&aggregates_);
}
void DelayedLocalAggregateStorage::DeleteData() {
aggregates_.clear_by_customer_id();
state_.lock()->data_modified = true;
WaitUntilSave(writeback_frequency_ * 2);
}
// Removes from aggregates_ every project for which the project context factory returns no
// context, and every metric that its project's context does not know about.
//
// Removal is done in two phases (collect ids, then erase) so the maps are never modified while
// they are being iterated. This function does not flag the state as modified.
void DelayedLocalAggregateStorage::DeleteOutdatedMetrics() {
  std::vector<std::tuple<uint32_t, uint32_t>> projects_to_delete;
  std::vector<std::tuple<uint32_t, uint32_t, uint32_t>> metrics_to_delete;

  // Bind map entries by const reference: binding by value would deep-copy every customer, project
  // and metric message only to read its id.
  for (const auto &[customer_id, customer] : aggregates_.by_customer_id()) {
    for (const auto &[project_id, project] : customer.by_project_id()) {
      std::unique_ptr<logger::ProjectContext> ctx =
          global_project_context_factory_->NewProjectContext(customer_id, project_id);
      if (!ctx) {
        LOG(WARNING) << "Found customer/project that is not present in the registry: ("
                     << customer_id << ", " << project_id << "): Deleting it.";
        projects_to_delete.emplace_back(customer_id, project_id);
        // Skip the project's metrics: the whole project goes away, and the metric erase loop
        // below must never look up a project that was already removed.
        continue;
      }

      for (const auto &[metric_id, _] : project.by_metric_id()) {
        if (!ctx->GetMetric(metric_id)) {
          LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id
                       << ", " << project_id << ", " << metric_id << "): Deleting it.";
          metrics_to_delete.emplace_back(customer_id, project_id, metric_id);
        }
      }
    }
  }

  for (const auto &[customer_id, project_id] : projects_to_delete) {
    aggregates_.mutable_by_customer_id()
        ->at(customer_id)
        .mutable_by_project_id()
        ->erase(project_id);
  }

  for (const auto &[customer_id, project_id, metric_id] : metrics_to_delete) {
    aggregates_.mutable_by_customer_id()
        ->at(customer_id)
        .mutable_by_project_id()
        ->at(project_id)
        .mutable_by_metric_id()
        ->erase(metric_id);
  }
}
void DelayedLocalAggregateStorage::WaitUntilSave(std::chrono::milliseconds max_wait) {
auto locked = state_.lock();
if (locked->shut_down || locked->data_modified) {
locked->data_save_notifier.wait_for(
locked, max_wait, [&locked] { return !(locked->shut_down || locked->data_modified); });
}
}
bool DelayedLocalAggregateStorage::WaitUntilSaveStart(std::chrono::milliseconds max_wait) {
auto locked = state_.lock();
return locked->data_save_start_notifier.wait_for(locked, max_wait) == std::cv_status::no_timeout;
}
// Body of the writeback thread. Each iteration sleeps for writeback_frequency_ (or until shut-down
// is signalled) and then, if the data was flagged as modified, reports the aggregate size to the
// internal metrics and writes aggregates_ to the proto store. Returns once shut_down is observed.
void DelayedLocalAggregateStorage::Run() {
  while (true) {
    auto locked_state = state_.lock();
    if (locked_state->shut_down) {
      return;
    }

    VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for " << writeback_frequency_.count()
            << " milliseconds.";
    // We start each iteration of the loop with a sleep of writeback_frequency_. This ensures that
    // we never write to disk twice within one writeback_frequency_ period.
    locked_state->shutdown_notifier.wait_for(locked_state, writeback_frequency_,
                                             [&locked_state] { return locked_state->shut_down; });
    VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
            << locked_state->shut_down;
    if (locked_state->shut_down) {
      return;
    }

    if (locked_state->data_modified) {
      locked_state->data_save_start_notifier.notify_all();

      // Lock ordering: GetMetricAggregate() locks data_mutex_ and hands that lock to the returned
      // MetricAggregateRef. Later, when the data has been modified, MetricAggregateRef::Save()
      // calls down to SaveMetricAggregate(), which locks state_ to update data_modified. That
      // codepath therefore always locks data_mutex_ first and state_ second.
      // Here, we unlock locked_state before attempting to acquire data_mutex_ so that this
      // codepath acquires the two locks in the same order, thus avoiding any potential deadlock.
      locked_state.unlock();

      // Measure the current storage usage. The size is read under data_mutex_ because holders of
      // a MetricAggregateRef may be mutating aggregates_ concurrently; the mutex is released
      // again before reporting.
      int64_t size = 0;
      {
        std::scoped_lock<std::mutex> size_lock(data_mutex_);
        size = static_cast<int64_t>(aggregates_.ByteSizeLong());
      }

      // Report while everything is unlocked.
      // This cannot cause an infinite loop, since we are already in the 'data_modified' state,
      // which will be reset to false at the end of this block.
      internal_metrics_->TrackDiskUsage(
          logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);

      // Lock the class so no new calls to GetMetricAggregate will return until the data has been
      // written.
      std::scoped_lock<std::mutex> data_lock(data_mutex_);
      locked_state.lock();

      VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
      util::Status status = proto_store_.Write(aggregates_);
      if (!status.ok()) {
        // A failed write is only logged; data_modified is still cleared below.
        LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status.error_message();
      }

      // The state lock is held across both statements, so waiters woken by the notification
      // observe data_modified == false.
      locked_state->data_save_notifier.notify_all();
      locked_state->data_modified = false;
    }
  }
}
// Returns true iff aggregates_ already holds an entry for (customer_id, project_id, metric_id).
//
// Pure query: only const map accessors are used and each level is looked up once. No lock is
// taken here; GetMetricAggregate() calls this with data_mutex_ already held.
bool DelayedLocalAggregateStorage::HasMetricAggregate(uint32_t customer_id, uint32_t project_id,
                                                      uint32_t metric_id) {
  const auto &customers = aggregates_.by_customer_id();
  const auto customer_it = customers.find(customer_id);
  if (customer_it == customers.end()) {
    return false;
  }

  const auto &projects = customer_it->second.by_project_id();
  const auto project_it = projects.find(project_id);
  if (project_it == projects.end()) {
    return false;
  }

  return project_it->second.by_metric_id().count(metric_id) > 0;
}
// Returns a MetricAggregateRef for (customer_id, project_id, metric_id), creating the aggregate
// entry if it does not exist yet. The returned ref takes ownership of the data_mutex_ lock
// acquired here.
//
// Returns NOT_FOUND when the project, or the metric within the project, is not known to the
// project context factory. In that case aggregates_ is left untouched: entries are only created
// once both lookups have succeeded, so failed requests do not leave empty customer/project
// entries behind.
lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef>
DelayedLocalAggregateStorage::GetMetricAggregate(uint32_t customer_id, uint32_t project_id,
                                                 uint32_t metric_id) {
  std::unique_lock<std::mutex> data_lock(data_mutex_);

  // Fast path: the aggregate already exists.
  if (HasMetricAggregate(customer_id, project_id, metric_id)) {
    auto &existing = aggregates_.mutable_by_customer_id()
                         ->at(customer_id)
                         .mutable_by_project_id()
                         ->at(project_id)
                         .mutable_by_metric_id()
                         ->at(metric_id);
    return MetricAggregateRef(customer_id, project_id, metric_id, &existing, this,
                              std::move(data_lock));
  }

  // Validate the ids against the registry before creating anything.
  std::unique_ptr<logger::ProjectContext> ctx =
      global_project_context_factory_->NewProjectContext(customer_id, project_id);
  if (!ctx) {
    return util::Status(
        util::StatusCode::NOT_FOUND,
        absl::StrCat("Project (", customer_id, ", ", project_id, ") not found in registry"));
  }
  if (!ctx->GetMetric(metric_id)) {
    return util::Status(
        util::StatusCode::NOT_FOUND,
        absl::StrCat("Metric id `", metric_id, "` not found in registry for project (", customer_id,
                     ", ", project_id, ")"));
  }

  // operator[] default-creates any missing customer, project and metric entry.
  CustomerAggregates &customer = (*aggregates_.mutable_by_customer_id())[customer_id];
  ProjectAggregates &project = (*customer.mutable_by_project_id())[project_id];
  return MetricAggregateRef(customer_id, project_id, metric_id,
                            &(*project.mutable_by_metric_id())[metric_id], this,
                            std::move(data_lock));
}
// Flags the in-memory data as modified so that the writeback thread persists it on a later
// iteration. The ids are unused; the return value is always OK.
util::Status DelayedLocalAggregateStorage::SaveMetricAggregate(uint32_t /* customer_id */,
                                                               uint32_t /* project_id */,
                                                               uint32_t /* metric_id */) {
  {
    auto state = state_.lock();
    state->data_modified = true;
  }
  return util::Status::OK;
}
} // namespace cobalt::local_aggregation