// blob: 614c049f58c6493649c6b39a29a5bf08324e6855 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include <chrono>
#include <set>
#include "src/lib/util/file_system.h"
#include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/status.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
// Out-of-class definition of the default writeback period: five minutes, expressed in
// milliseconds.
constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency{
    std::chrono::minutes{5}};

// Longest time ShutDown() waits for the writeback thread to report that it has finished.
constexpr std::chrono::milliseconds kMaxShutdownDelay{std::chrono::seconds{4}};
// Builds the storage on top of the file `filename` (accessed through `fs`), brings the persisted
// aggregates up to date, and starts the background writeback thread.
//
// filename: path handed to the underlying proto store.
// fs: file system used by the proto store.
// global_project_context_factory: registry lookup used to validate and migrate stored metrics.
// system_data: passed to LocalAggregateStorage::MigrateStoredData() during data migration.
// per_project_reserved_bytes: forwarded to the LocalAggregateStorage base class.
// writeback_frequency: how long the writeback thread sleeps between two writes.
DelayedLocalAggregateStorage::DelayedLocalAggregateStorage(
    std::string filename, util::FileSystem& fs,
    const logger::ProjectContextFactory& global_project_context_factory,
    system_data::SystemDataInterface& system_data, int64_t per_project_reserved_bytes,
    std::chrono::milliseconds writeback_frequency)
    : LocalAggregateStorage(per_project_reserved_bytes),
      proto_store_(std::move(filename), fs),
      global_project_context_factory_(global_project_context_factory),
      system_data_(system_data),
      writeback_frequency_(writeback_frequency) {
  // Startup pipeline. All of these run before the writeback thread exists, so they touch
  // aggregates_ without taking any lock.
  ReadPersistentStore();        // Load whatever is on disk into aggregates_.
  MigrateStoreStructure();      // Move entries out of the deprecated nested maps.
  DeleteOutdatedMetrics();      // Drop entries the registry no longer knows about.
  MigrateStoredData();          // Per-metric data migration for the surviving entries.
  InitializePersistentStore();  // Account for the size of every remaining entry.

  // Only now start the worker that periodically persists modifications.
  writeback_thread_ = std::thread([this] { Run(); });
}
// Requests shutdown of the writeback thread and then joins it, so the thread never outlives the
// object it operates on.
DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() {
  ShutDown();

  if (!writeback_thread_.joinable()) {
    return;
  }

  VLOG(4) << "~DelayedLocalAggregateStorage(): Waiting for writeback thread to exit...";
  writeback_thread_.join();
}
void DelayedLocalAggregateStorage::ShutDown() {
VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested.";
{
auto locked = state_.lock();
locked->shut_down = true;
locked->shutdown_notifier.notify_all();
locked->data_save_notifier.notify_all();
if (!locked->shutdown_complete_notifier.wait_for(
locked, kMaxShutdownDelay, [&locked] { return locked->shut_down_complete; })) {
LOG(ERROR) << "DelayedLocalAggregateStorage::ShutDown(): Writeback thread did not shut down "
"in time. Data loss likely.";
}
}
}
// Populates aggregates_ from the backing proto store.
// NOTE(review): whatever Read() returns is discarded here; confirm that a failed read is meant to
// be silently treated as "start from whatever aggregates_ contains".
void DelayedLocalAggregateStorage::ReadPersistentStore() {
  proto_store_.Read(&aggregates_);
}
void DelayedLocalAggregateStorage::DeleteData() {
// Lock the class so no new calls to GetMetricAggregate will return until the deletion has
// completed. Lock data_mutex_ first, before taking a lock on state_.
std::scoped_lock<std::mutex> data_lock(data_mutex_);
aggregates_.clear_deprecated_by_customer_id();
aggregates_.clear_metric_aggregates();
aggregates_.clear_filtered_system_profiles();
state_.lock()->data_modified = true;
}
void DelayedLocalAggregateStorage::MigrateStoreStructure() {
bool changed = false;
if (aggregates_.deprecated_by_customer_id_size() > 0) {
for (auto& [customer_id, customer] : *aggregates_.mutable_deprecated_by_customer_id()) {
for (auto& [project_id, project] : *customer.mutable_deprecated_by_project_id()) {
for (auto& [metric_id, metric] : *project.mutable_deprecated_by_metric_id()) {
changed = true;
MetricAggregateEntry* entry = aggregates_.add_metric_aggregates();
entry->set_customer_id(customer_id);
entry->set_project_id(project_id);
entry->set_metric_id(metric_id);
entry->mutable_aggregate()->Swap(&metric);
}
}
}
aggregates_.clear_deprecated_by_customer_id();
}
if (changed) {
// No need for locking as migrating is only done at startup.
Status status = proto_store_.Write(aggregates_);
if (!status.ok()) {
LOG(ERROR) << "Failed to write migrated local aggregate file to disk: " << status;
}
}
}
void DelayedLocalAggregateStorage::DeleteOutdatedMetrics() {
google::protobuf::RepeatedPtrField<MetricAggregateEntry> retained;
for (MetricAggregateEntry& entry : *aggregates_.mutable_metric_aggregates()) {
std::unique_ptr<logger::ProjectContext> ctx = global_project_context_factory_.NewProjectContext(
lib::CustomerIdentifier(entry.customer_id()).ForProject(entry.project_id()));
if (!ctx) {
LOG(WARNING) << "Found customer/project that is not present in the registry: ("
<< entry.customer_id() << ", " << entry.project_id() << "): Deleting it.";
continue;
}
if (!ctx->GetMetric(entry.metric_id())) {
LOG(WARNING) << "Found metric that is not present in the registry: (" << entry.customer_id()
<< ", " << entry.project_id() << ", " << entry.metric_id() << "): Deleting it.";
continue;
}
retained.Add()->Swap(&entry);
}
aggregates_.mutable_metric_aggregates()->Swap(&retained);
}
void DelayedLocalAggregateStorage::MigrateStoredData() {
bool changed = false;
for (MetricAggregateEntry& entry : *aggregates_.mutable_metric_aggregates()) {
std::unique_ptr<logger::ProjectContext> ctx = global_project_context_factory_.NewProjectContext(
lib::CustomerIdentifier(entry.customer_id()).ForProject(entry.project_id()));
if (!ctx) {
// This shouldn't happen since DeleteOutdatedMetrics() is called before MigrateStoredData().
LOG(ERROR) << "Found customer/project that is not present in the registry: ("
<< entry.customer_id() << ", " << entry.project_id()
<< "): it will not be migrated.";
continue;
}
const MetricDefinition* metric_def = ctx->GetMetric(entry.metric_id());
if (!metric_def) {
// This shouldn't happen since DeleteOutdatedMetrics() is called before
// MigrateStoredData().
LOG(ERROR) << "Found metric that is not present in the registry: (" << entry.customer_id()
<< ", " << entry.project_id() << ", " << entry.metric_id()
<< "): it will not be migrated.";
continue;
}
if (LocalAggregateStorage::MigrateStoredData(
ctx->project().customer_name(), ctx->project().project_name(),
*entry.mutable_aggregate(), *metric_def, system_data_)) {
changed = true;
LOG(INFO) << "Migrated some local aggregate data for metric: " << entry.customer_id() << "-"
<< entry.project_id() << "-" << entry.metric_id();
}
}
if (changed) {
// No need for locking as upgrading is only done at startup.
Status status = proto_store_.Write(aggregates_);
if (!status.ok()) {
LOG(ERROR) << "Failed to write migrated local aggregate file to disk: " << status;
}
}
}
// Reports the serialized size of every stored aggregate to UpdateProjectSizeBy(), keyed by the
// project that owns it.
void DelayedLocalAggregateStorage::InitializePersistentStore() {
  for (const MetricAggregateEntry& stored : aggregates_.metric_aggregates()) {
    const auto aggregate_bytes = static_cast<int64_t>(stored.aggregate().ByteSizeLong());
    UpdateProjectSizeBy(
        lib::CustomerIdentifier(stored.customer_id()).ForProject(stored.project_id()),
        aggregate_bytes);
  }
}
void DelayedLocalAggregateStorage::WaitUntilSave(std::chrono::milliseconds max_wait) {
auto locked = state_.lock();
if (locked->shut_down || locked->data_modified) {
locked->data_save_notifier.wait_for(
locked, max_wait, [&locked] { return !(locked->shut_down || locked->data_modified); });
}
}
bool DelayedLocalAggregateStorage::WaitUntilSaveStart(std::chrono::milliseconds max_wait) {
auto locked = state_.lock();
return locked->data_save_start_notifier.wait_for(locked, max_wait) == std::cv_status::no_timeout;
}
// Body of the writeback thread started by the constructor.
//
// Each loop iteration:
//   1. Waits on shutdown_notifier for up to writeback_frequency_, or until shut_down is set.
//   2. If data_modified is set: notifies data_save_start_notifier, reports the serialized size of
//      aggregates_ through internal_metrics_, writes aggregates_ to proto_store_, notifies
//      data_save_notifier and clears data_modified.
//   3. If shut_down is set: sets shut_down_complete, notifies shutdown_complete_notifier and
//      returns, ending the thread.
//
// A failed write is only logged; data_modified is cleared regardless of the write's outcome.
void DelayedLocalAggregateStorage::Run() {
while (true) {
auto locked_state = state_.lock();
// Note: duration_cast to seconds truncates, so a sub-second writeback_frequency_ is logged as
// 0 seconds.
VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for "
<< std::chrono::duration_cast<std::chrono::seconds>(writeback_frequency_).count()
<< " seconds.";
// We start each iteration of the loop with a sleep of writeback_frequency_. This ensures that
// we never write to disk twice within one writeback_frequency_ period.
// The wait ends early only when shut_down becomes true; data modifications do not wake it.
locked_state->shutdown_notifier.wait_for(locked_state, writeback_frequency_,
[&locked_state] { return locked_state->shut_down; });
VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
<< locked_state->shut_down << ", data_modified=" << locked_state->data_modified;
if (locked_state->data_modified) {
// Let anyone blocked in WaitUntilSaveStart() know that a save is about to begin.
locked_state->data_save_start_notifier.notify_all();
// The other users of these two mutexes are GetMetricAggregate(), DeleteData() and
// GarbageCollection(). In the GetMetricAggregate() codepath, the data_mutex_ is locked at the
// beginning of GetMetricAggregate, and returned inside of the MetricAggregateRef. Later, when
// the data has been modified, MetricAggregateRef::Save() is called, which then calls down to
// SaveMetricAggregate(), which then locks state_ so that it can update data_modified.
// DeleteData() and GarbageCollection() likewise lock data_mutex_ before locking state_.
// Therefore, those codepaths will always lock data_mutex_, then state_.
// Here, we unlock locked_state before attempting to acquire data_mutex_ so that this
// codepath acquires the two locks in the same order, thus avoiding any potential of
// deadlock.
locked_state.unlock();
// While everything is unlocked, track the current storage usage.
// This cannot cause an infinite loop, since we are already in the 'data_modified' state,
// which will be reset to false at the end of this block.
int64_t size;
{
// Take the data lock briefly so we can measure the size of aggregates_ without it being
// modified.
std::scoped_lock<std::mutex> data_lock(data_mutex_);
size = static_cast<int64_t>(aggregates_.ByteSizeLong());
}
{
// Neither lock is held while reporting the disk usage.
// NOTE(review): `flusher` is presumably a scope guard that flushes internal metrics when this
// block ends; confirm against InternalMetrics.
logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher();
internal_metrics_->TrackDiskUsage(
logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);
}
// Lock the class so no new calls to GetMetricAggregate will return until the data has been
// written. data_mutex_ is taken first, then state_, matching the order used elsewhere.
std::scoped_lock<std::mutex> data_lock(data_mutex_);
locked_state.lock();
VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
Status status = proto_store_.Write(aggregates_);
if (!status.ok()) {
LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status;
}
// state_ is still locked here, so waiters woken by this notification cannot observe the state
// until data_modified has been cleared and the lock released.
locked_state->data_save_notifier.notify_all();
locked_state->data_modified = false;
}
// Checked after the save so that modifications pending at shutdown are written first.
if (locked_state->shut_down) {
locked_state->shut_down_complete = true;
locked_state->shutdown_complete_notifier.notify_all();
return;
}
}
}
// Returns a MetricAggregateRef for `metric`, creating an empty stored entry if none exists yet.
//
// data_mutex_ is locked on entry and, on success, ownership of that lock is moved into the
// returned MetricAggregateRef. On failure the lock is released when the function returns.
//
// Returns NOT_FOUND when the project, or the metric within the project, is not present in the
// registry (only checked when no stored entry for the metric exists).
lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef>
DelayedLocalAggregateStorage::GetMetricAggregate(lib::MetricIdentifier metric) {
  std::unique_lock<std::mutex> data_lock(data_mutex_);

  // Fast path: the metric already has a stored entry.
  for (MetricAggregateEntry& existing : *aggregates_.mutable_metric_aggregates()) {
    const bool same_metric = existing.customer_id() == metric.customer_id() &&
                             existing.project_id() == metric.project_id() &&
                             existing.metric_id() == metric.metric_id();
    if (!same_metric) {
      continue;
    }
    return MetricAggregateRef(metric, existing.mutable_aggregate(), this, std::move(data_lock));
  }

  // No entry yet: make sure the metric is known to the registry before creating one.
  std::unique_ptr<logger::ProjectContext> project_context =
      global_project_context_factory_.NewProjectContext(metric.project());
  if (project_context == nullptr) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "Project (")
        .AppendMsg(metric.project())
        .AppendMsg(") not found in registry")
        .Build();
  }
  if (project_context->GetMetric(metric) == nullptr) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "Metric id `")
        .AppendMsg(metric.metric_id())
        .AppendMsg("` not found in registry for project (")
        .AppendMsg(metric.project())
        .AppendMsg(")")
        .Build();
  }

  MetricAggregateEntry* created = aggregates_.add_metric_aggregates();
  created->set_customer_id(metric.customer_id());
  created->set_project_id(metric.project_id());
  created->set_metric_id(metric.metric_id());
  return MetricAggregateRef(metric, created->mutable_aggregate(), this, std::move(data_lock));
}
// Caches `filtered_system_profile` under `system_profile_hash`. An existing entry for the same
// hash is left untouched.
// NOTE(review): no lock is taken here; presumably the caller already holds data_mutex_ (e.g.
// through a MetricAggregateRef) - confirm against the callers.
void DelayedLocalAggregateStorage::StoreFilteredSystemProfile(
    uint64_t system_profile_hash, const SystemProfile& filtered_system_profile) {
  const auto& cached_profiles = aggregates_.filtered_system_profiles().by_system_profile_hash();
  if (cached_profiles.contains(system_profile_hash)) {
    return;
  }

  auto* writable_profiles =
      aggregates_.mutable_filtered_system_profiles()->mutable_by_system_profile_hash();
  (*writable_profiles)[system_profile_hash] = filtered_system_profile;
}
// Looks up the cached filtered SystemProfile stored under `system_profile_hash`.
// Returns a copy of the profile, or NOT_FOUND when the hash is not in the cache.
lib::statusor::StatusOr<SystemProfile> DelayedLocalAggregateStorage::RetrieveFilteredSystemProfile(
    uint64_t system_profile_hash) const {
  const google::protobuf::Map<uint64_t, SystemProfile>& cached_profiles =
      aggregates_.filtered_system_profiles().by_system_profile_hash();

  const auto match = cached_profiles.find(system_profile_hash);
  if (match == cached_profiles.end()) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "SystemProfile hash `")
        .AppendMsg(system_profile_hash)
        .AppendMsg("` not found in filtered SystemProfile cache")
        .Build();
  }
  return match->second;
}
// Marks the store as modified so the writeback thread persists it on its next wakeup. Nothing is
// written to disk here. The metric argument is unused, and the call always returns OK.
Status DelayedLocalAggregateStorage::SaveMetricAggregate(lib::MetricIdentifier /* metric */) {
  {
    auto state = state_.lock();
    state->data_modified = true;
  }
  return Status::OkStatus();
}
// Removes every cached filtered SystemProfile whose hash is no longer referenced by any hourly or
// daily bucket of any stored report aggregate. Marks the store as modified when at least one
// profile was removed. Always returns OK.
Status DelayedLocalAggregateStorage::GarbageCollection() {
  VLOG(4) << "Garbage collecting unused system profiles from the local aggregate storage.";

  // Lock the class so no new calls to GetMetricAggregate will return until the garbage collection
  // has completed.
  std::scoped_lock<std::mutex> gc_guard(data_mutex_);

  // Pass 1: gather the hashes of all system profiles still referenced by aggregate data.
  std::set<uint64_t> hashes_in_use;
  const auto collect_hashes = [&hashes_in_use](const auto& buckets) {
    for (const auto& bucket_kv : buckets) {
      for (const SystemProfileAggregate& profile_aggregate :
           bucket_kv.second.system_profile_aggregates()) {
        hashes_in_use.insert(profile_aggregate.system_profile_hash());
      }
    }
  };
  for (const MetricAggregateEntry& stored : aggregates_.metric_aggregates()) {
    for (const auto& report_kv : stored.aggregate().by_report_id()) {
      const auto& report = report_kv.second;
      const auto period = report.time_period_case();
      if (period == ReportAggregate::kHourly) {
        collect_hashes(report.hourly().by_hour_id());
      } else if (period == ReportAggregate::kDaily) {
        collect_hashes(report.daily().by_day_index());
      }
      // Any other case is an empty report aggregate and is ignored.
    }
  }

  // Pass 2: erase the cached profiles that nothing references any more.
  auto* cached_profiles =
      aggregates_.mutable_filtered_system_profiles()->mutable_by_system_profile_hash();
  int removed = 0;
  auto cursor = cached_profiles->begin();
  while (cursor != cached_profiles->end()) {
    if (hashes_in_use.find(cursor->first) != hashes_in_use.end()) {
      ++cursor;
      continue;
    }
    cursor = cached_profiles->erase(cursor);
    ++removed;
  }

  if (removed > 0) {
    VLOG(4) << "Garbage collecting " << removed
            << " filtered SystemProfiles from the local aggregate storage.";
    auto state = state_.lock();
    state->data_modified = true;
  }
  return Status::OkStatus();
}
} // namespace cobalt::local_aggregation