blob: 297e900ea84730867129a01696ec6d807cb00f14 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include <chrono>
#include <set>
#include "absl/strings/str_cat.h"
#include "src/lib/util/file_system.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/status.h"
#include "src/public/lib/statusor/statusor.h"
namespace cobalt::local_aggregation {
// Default interval between writebacks of modified aggregate data to disk. Presumably this is the
// default value of the constructor's `writeback_frequency` argument declared in the header --
// TODO confirm.
constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency =
std::chrono::minutes(5);
// Maximum time ShutDown() waits for the writeback thread to set `shut_down_complete` before it
// gives up and logs that data loss is likely.
constexpr std::chrono::milliseconds kMaxShutdownDelay = std::chrono::seconds(4);
// Builds the storage and starts it running.
//
// Startup sequence, all on the constructing thread:
//   1. load the persisted aggregates from `filename` (via `fs`),
//   2. drop data for customers/projects/metrics no longer present in the registry,
//   3. migrate the remaining data to the current format (writing back if anything changed),
//   4. seed the per-project size accounting from the loaded data.
// Only after that is the background writeback thread started.
//
// The writeback thread captures `this`; the destructor joins it, so it never outlives the object.
DelayedLocalAggregateStorage::DelayedLocalAggregateStorage(
    std::string filename, util::FileSystem* fs,
    const logger::ProjectContextFactory* global_project_context_factory,
    MetadataBuilder* metadata_builder, int64_t per_project_reserved_bytes,
    std::chrono::milliseconds writeback_frequency)
    : LocalAggregateStorage(per_project_reserved_bytes),
      proto_store_(std::move(filename), fs),
      global_project_context_factory_(global_project_context_factory),
      metadata_builder_(metadata_builder),
      writeback_frequency_(writeback_frequency) {
  // No other thread can touch aggregates_ yet, so none of these steps need to lock.
  ReadPersistentStore();
  DeleteOutdatedMetrics();
  MigrateStoredData();
  InitializePersistentStore();

  writeback_thread_ = std::thread([this] { Run(); });
}
// Asks the writeback thread to flush and stop (see ShutDown()), then joins it. Joining is required
// because the thread's closure captures `this`.
DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() {
  ShutDown();
  if (!writeback_thread_.joinable()) {
    return;
  }
  VLOG(4) << "~DelayedLocalAggregateStorage(): Waiting for writeback thread to exit...";
  writeback_thread_.join();
}
// Requests that the writeback thread stop.
//
// Sets `shut_down` and wakes the thread's sleep (shutdown_notifier), so that Run() can perform a
// final save of any modified data. Also notifies data_save_notifier, which WaitUntilSave() waits
// on. Then blocks for at most kMaxShutdownDelay until Run() reports `shut_down_complete`, and logs
// an error if it does not. If the thread has already completed, the predicate is satisfied
// immediately and this returns without waiting.
void DelayedLocalAggregateStorage::ShutDown() {
  VLOG(4) << "DelayedLocalAggregateStorage::ShutDown(): shut-down requested.";
  auto locked = state_.lock();
  locked->shut_down = true;
  locked->shutdown_notifier.notify_all();
  locked->data_save_notifier.notify_all();
  const bool finished_in_time = locked->shutdown_complete_notifier.wait_for(
      locked, kMaxShutdownDelay, [&locked] { return locked->shut_down_complete; });
  if (!finished_in_time) {
    LOG(ERROR) << "DelayedLocalAggregateStorage::ShutDown(): Writeback thread did not shut down "
                  "in time. Data loss likely.";
  }
}
// Loads the persisted snapshot from proto_store_ into aggregates_. Whatever proto_store_.Read()
// returns is not inspected here. NOTE(review): a failed read silently leaves aggregates_ as-is;
// confirm that is intended (e.g. for a missing file on first boot).
void DelayedLocalAggregateStorage::ReadPersistentStore() {
  proto_store_.Read(&aggregates_);
}
// Erases all stored aggregates and cached filtered system profiles, marks the data as modified,
// and then waits (for at most twice the writeback frequency) for the writeback thread to persist
// the deletion.
//
// data_mutex_ is released *before* waiting. Run() must acquire data_mutex_ in order to write. If
// the lock were still held, the save could not happen until the wait timed out, so every call
// would block for the full 2 * writeback_frequency_ (also stalling GetMetricAggregate()) without
// the deletion having been persisted.
void DelayedLocalAggregateStorage::DeleteData() {
  {
    // Lock the class so no new calls to GetMetricAggregate will return while the data is being
    // cleared. Lock data_mutex_ first, before taking a lock on state_.
    std::scoped_lock<std::mutex> data_lock(data_mutex_);
    aggregates_.clear_by_customer_id();
    aggregates_.clear_filtered_system_profiles();
    state_.lock()->data_modified = true;
  }
  WaitUntilSave(writeback_frequency_ * 2);
}
// Removes stored aggregates that no longer correspond to the registry:
//   * every (customer, project) for which the global ProjectContextFactory returns no context;
//   * every metric that its (still registered) project's context does not contain.
//
// Deletions are collected first and applied afterwards, so the maps are never modified while they
// are being iterated. Map entries are bound by const reference because the mapped values are
// whole aggregate protos; binding them by value would deep-copy every customer, project and
// metric just to read their ids.
void DelayedLocalAggregateStorage::DeleteOutdatedMetrics() {
  std::vector<std::tuple<uint32_t, uint32_t>> projects_to_delete;
  std::vector<std::tuple<uint32_t, uint32_t, uint32_t>> metrics_to_delete;
  for (const auto& [customer_id, customer] : aggregates_.by_customer_id()) {
    for (const auto& [project_id, project] : customer.by_project_id()) {
      std::unique_ptr<logger::ProjectContext> ctx =
          global_project_context_factory_->NewProjectContext(
              lib::CustomerIdentifier(customer_id).ForProject(project_id));
      if (!ctx) {
        LOG(WARNING) << "Found customer/project that is not present in the registry: ("
                     << customer_id << ", " << project_id << "): Deleting it.";
        projects_to_delete.emplace_back(customer_id, project_id);
        continue;
      }
      for (const auto& [metric_id, unused] : project.by_metric_id()) {
        if (!ctx->GetMetric(metric_id)) {
          LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id
                       << ", " << project_id << ", " << metric_id << "): Deleting it.";
          metrics_to_delete.emplace_back(customer_id, project_id, metric_id);
        }
      }
    }
  }
  for (const auto& [customer_id, project_id] : projects_to_delete) {
    aggregates_.mutable_by_customer_id()
        ->at(customer_id)
        .mutable_by_project_id()
        ->erase(project_id);
  }
  // Metrics are only collected for projects that were NOT queued for deletion above, so the
  // project lookups below always succeed.
  for (const auto& [customer_id, project_id, metric_id] : metrics_to_delete) {
    aggregates_.mutable_by_customer_id()
        ->at(customer_id)
        .mutable_by_project_id()
        ->at(project_id)
        .mutable_by_metric_id()
        ->erase(metric_id);
  }
}
void DelayedLocalAggregateStorage::MigrateStoredData() {
bool changed = false;
for (auto& [customer_id, customer] : *aggregates_.mutable_by_customer_id()) {
for (auto& [project_id, project] : *customer.mutable_by_project_id()) {
std::unique_ptr<logger::ProjectContext> ctx =
global_project_context_factory_->NewProjectContext(
lib::CustomerIdentifier(customer_id).ForProject(project_id));
if (!ctx) {
// This shouldn't happen since DeleteOutdatedMetrics() is called before MigrateStoredData().
LOG(ERROR) << "Found customer/project that is not present in the registry: (" << customer_id
<< ", " << project_id << "): it will not be migrated.";
continue;
}
for (auto& [metric_id, metric] : *project.mutable_by_metric_id()) {
const MetricDefinition* metric_def = ctx->GetMetric(metric_id);
if (!metric_def) {
// This shouldn't happen since DeleteOutdatedMetrics() is called before
// MigrateStoredData().
LOG(ERROR) << "Found metric that is not present in the registry: (" << customer_id << ", "
<< project_id << ", " << metric_id << "): it will not be migrated.";
continue;
}
if (LocalAggregateStorage::MigrateStoredData(metric, *metric_def, metadata_builder_)) {
changed = true;
LOG(INFO) << "Migrated some local aggregate data for metric: " << customer_id << "-"
<< project_id << "-" << metric_id;
}
}
}
}
if (changed) {
// No need for locking as upgrading is only done at startup.
Status status = proto_store_.Write(aggregates_);
if (!status.ok()) {
LOG(ERROR) << "Failed to write migrated local aggregate file to disk: " << status;
}
}
}
// Seeds the per-project storage accounting from the aggregates loaded at startup. For every stored
// metric, its serialized size (ByteSizeLong()) is reported for its project via
// UpdateProjectSizeBy().
//
// Entries are bound by const reference. Binding by value, as before, deep-copied every customer,
// project and metric proto just to read their sizes.
void DelayedLocalAggregateStorage::InitializePersistentStore() {
  for (const auto& [customer_id, customer] : aggregates_.by_customer_id()) {
    lib::CustomerIdentifier cust(customer_id);
    for (const auto& [project_id, project] : customer.by_project_id()) {
      for (const auto& [unused, metric] : project.by_metric_id()) {
        UpdateProjectSizeBy(cust.ForProject(project_id),
                            static_cast<int64_t>(metric.ByteSizeLong()));
      }
    }
  }
}
// Blocks until the writeback thread has persisted pending modifications, or until `max_wait`
// elapses, whichever comes first.
//
// Returns immediately in two cases:
//   * nothing is pending (`data_modified` is false);
//   * the writeback thread has already exited (`shut_down_complete`), because nothing will ever
//     save the data again.
// While the thread is still running, the wait ends as soon as Run() clears `data_modified`. This
// includes the shutdown path, where Run() performs a final save of modified data before exiting.
//
// `shut_down` by itself must not keep the caller waiting. The previous predicate,
// `!(shut_down || data_modified)`, could never become true once `shut_down` was set, so every
// call made during or after shutdown slept for the full `max_wait`.
void DelayedLocalAggregateStorage::WaitUntilSave(std::chrono::milliseconds max_wait) {
  auto locked = state_.lock();
  if (!locked->data_modified || locked->shut_down_complete) {
    return;
  }
  locked->data_save_notifier.wait_for(locked, max_wait, [&locked] {
    return !locked->data_modified || locked->shut_down_complete;
  });
}
// Blocks until Run() signals data_save_start_notifier, which it does just before beginning a save
// of modified data, or until `max_wait` elapses.
//
// Returns true if woken before the timeout. NOTE(review): the wait has no predicate, so a spurious
// wakeup is also reported as `true`.
bool DelayedLocalAggregateStorage::WaitUntilSaveStart(std::chrono::milliseconds max_wait) {
  auto locked = state_.lock();
  const auto outcome = locked->data_save_start_notifier.wait_for(locked, max_wait);
  return outcome != std::cv_status::timeout;
}
// Main loop of the background writeback thread started by the constructor.
//
// Each iteration first sleeps for writeback_frequency_, waking early only if `shut_down` is set.
// On waking, if `data_modified` is set, it:
//   * notifies data_save_start_notifier (observed by WaitUntilSaveStart()),
//   * reports the serialized size of aggregates_ via internal_metrics_->TrackDiskUsage(),
//   * writes aggregates_ to disk through proto_store_,
//   * notifies data_save_notifier (observed by WaitUntilSave()) and clears `data_modified`.
// If `shut_down` is set, it then marks `shut_down_complete`, notifies
// shutdown_complete_notifier (observed by ShutDown()) and returns; otherwise it loops.
//
// NOTE(review): a failed write is only logged and `data_modified` is still cleared, so the failed
// data is not retried until something marks the data modified again.
void DelayedLocalAggregateStorage::Run() {
while (true) {
auto locked_state = state_.lock();
VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for "
<< std::chrono::duration_cast<std::chrono::seconds>(writeback_frequency_).count()
<< " seconds.";
// We start each iteration of the loop with a sleep of writeback_frequency_. This ensures that
// we never write to disk twice within one writeback_frequency_ period, except when the sleep
// is cut short by a shut-down request (the predicate returns early once shut_down is set).
locked_state->shutdown_notifier.wait_for(locked_state, writeback_frequency_,
[&locked_state] { return locked_state->shut_down; });
VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
<< locked_state->shut_down << ", data_modified=" << locked_state->data_modified;
if (locked_state->data_modified) {
locked_state->data_save_start_notifier.notify_all();
// Lock ordering: every code path that holds both locks acquires data_mutex_ before state_.
// GetMetricAggregate() locks data_mutex_ and hands that lock to the returned
// MetricAggregateRef; MetricAggregateRef::Save() later reaches SaveMetricAggregate(), which
// locks state_ to set data_modified. DeleteData() and GarbageCollection() likewise set
// data_modified while holding data_mutex_.
// Here, we unlock locked_state before attempting to acquire data_mutex_ and re-lock it
// afterwards, so that this codepath acquires the two locks in the same order, thus avoiding
// any potential of deadlock.
locked_state.unlock();
// While state_ is unlocked, track the current storage usage. Presumably TrackDiskUsage() may
// log back into this storage and set data_modified again (TODO confirm); that cannot cause an
// infinite loop, since we are already in the 'data_modified' state, which will be reset to
// false at the end of this block.
int64_t size;
{
// Take the data lock briefly so we can measure the size of aggregates_ without it being
// modified.
std::scoped_lock<std::mutex> data_lock(data_mutex_);
size = static_cast<int64_t>(aggregates_.ByteSizeLong());
}
internal_metrics_->TrackDiskUsage(
logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);
// Lock the class so no new calls to GetMetricAggregate will return until the data has been
// written. data_mutex_ is taken before state_ is re-locked (see the lock-ordering note above).
std::scoped_lock<std::mutex> data_lock(data_mutex_);
locked_state.lock();
VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
Status status = proto_store_.Write(aggregates_);
if (!status.ok()) {
LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status;
}
// Both statements run under state_, so a WaitUntilSave() caller woken here observes
// data_modified == false once it re-acquires the lock.
locked_state->data_save_notifier.notify_all();
locked_state->data_modified = false;
}
// Any final save has already happened above, so it is safe to report completion and exit.
if (locked_state->shut_down) {
locked_state->shut_down_complete = true;
locked_state->shutdown_complete_notifier.notify_all();
return;
}
}
}
// Returns whether aggregates_ already holds an entry for `metric`, i.e. its customer id, project
// id and metric id are all present at their respective levels.
//
// Takes no lock itself; GetMetricAggregate() calls it while holding data_mutex_.
bool DelayedLocalAggregateStorage::HasMetricAggregate(lib::MetricIdentifier metric) {
  const auto& customers = aggregates_.by_customer_id();
  const auto customer_it = customers.find(metric.customer_id());
  if (customer_it == customers.end()) {
    return false;
  }
  const auto& projects = customer_it->second.by_project_id();
  const auto project_it = projects.find(metric.project_id());
  if (project_it == projects.end()) {
    return false;
  }
  return project_it->second.by_metric_id().count(metric.metric_id()) != 0;
}
// Returns a reference to the stored aggregate for `metric`. If the metric is registered but has no
// stored data yet, an empty aggregate is created for it.
//
// The returned MetricAggregateRef takes ownership of the lock on data_mutex_. Other code that
// locks data_mutex_ therefore blocks until the reference is released.
//
// Returns NOT_FOUND if the metric's project or the metric itself is not in the registry. The
// registry is consulted *before* any map entries are created, so a failed lookup no longer leaves
// empty customer/project entries behind in aggregates_.
lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef>
DelayedLocalAggregateStorage::GetMetricAggregate(lib::MetricIdentifier metric) {
  std::unique_lock<std::mutex> data_lock(data_mutex_);
  if (HasMetricAggregate(metric)) {
    return MetricAggregateRef(metric,
                              &aggregates_.mutable_by_customer_id()
                                   ->at(metric.customer_id())
                                   .mutable_by_project_id()
                                   ->at(metric.project_id())
                                   .mutable_by_metric_id()
                                   ->at(metric.metric_id()),
                              this, std::move(data_lock));
  }

  // No stored data yet: validate against the registry before modifying aggregates_.
  std::unique_ptr<logger::ProjectContext> ctx =
      global_project_context_factory_->NewProjectContext(metric.project());
  if (!ctx) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "Project (")
        .AppendMsg(metric.project())
        .AppendMsg(") not found in registry")
        .Build();
  }
  if (!ctx->GetMetric(metric)) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "Metric id `")
        .AppendMsg(metric.metric_id())
        .AppendMsg("` not found in registry for project (")
        .AppendMsg(metric.project())
        .AppendMsg(")")
        .Build();
  }

  // The metric is registered; operator[] creates the customer, project and metric entries as
  // needed.
  CustomerAggregates& customer = (*aggregates_.mutable_by_customer_id())[metric.customer_id()];
  ProjectAggregates& project = (*customer.mutable_by_project_id())[metric.project_id()];
  return MetricAggregateRef(metric, &(*project.mutable_by_metric_id())[metric.metric_id()], this,
                            std::move(data_lock));
}
// Caches `filtered_system_profile` under `system_profile_hash` unless that hash is already cached.
// An existing entry is never overwritten.
//
// The presence check uses the const accessor, so the filtered_system_profiles sub-message is only
// created when something is actually inserted. This method takes no lock itself; presumably
// callers hold data_mutex_ (e.g. through a MetricAggregateRef) -- verify against callers.
void DelayedLocalAggregateStorage::StoreFilteredSystemProfile(
    uint64_t system_profile_hash, const SystemProfile& filtered_system_profile) {
  const bool already_cached =
      aggregates_.filtered_system_profiles().by_system_profile_hash().contains(system_profile_hash);
  if (already_cached) {
    return;
  }
  auto& profiles_by_hash =
      *aggregates_.mutable_filtered_system_profiles()->mutable_by_system_profile_hash();
  profiles_by_hash[system_profile_hash] = filtered_system_profile;
}
// Looks up the cached filtered SystemProfile for `system_profile_hash`.
// Returns a copy of the cached profile, or a NOT_FOUND status if the hash is not cached.
lib::statusor::StatusOr<SystemProfile> DelayedLocalAggregateStorage::RetrieveFilteredSystemProfile(
    uint64_t system_profile_hash) const {
  const auto& profiles_by_hash = aggregates_.filtered_system_profiles().by_system_profile_hash();
  auto found = profiles_by_hash.find(system_profile_hash);
  if (found == profiles_by_hash.end()) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "SystemProfile hash `")
        .AppendMsg(system_profile_hash)
        .AppendMsg("` not found in filtered SystemProfile cache")
        .Build();
  }
  return found->second;
}
// Reached (through MetricAggregateRef::Save()) after an aggregate has been modified. Nothing is
// written here; the change is only flagged so that the writeback thread persists the whole
// aggregate set on its next cycle. That is also why the `metric` argument is unused.
// Always returns OK.
Status DelayedLocalAggregateStorage::SaveMetricAggregate(lib::MetricIdentifier /* metric */) {
  auto locked = state_.lock();
  locked->data_modified = true;
  return Status::OkStatus();
}
// Removes cached filtered SystemProfiles that are no longer referenced by any stored aggregate.
//
// data_mutex_ is held for the whole pass, so no MetricAggregateRef can be handed out while the
// in-use set is computed and the cache is pruned. If anything is removed, `data_modified` is set
// so that the writeback thread persists the pruned cache. Reports with an unexpected time-period
// case are logged (with their coordinates) and contribute no hashes.
//
// Always returns OK.
Status DelayedLocalAggregateStorage::GarbageCollection() {
  VLOG(4) << "Garbage collecting unused system profiles from the local aggregate storage.";
  // Lock the class so no new calls to GetMetricAggregate will return until the garbage collection
  // has completed. data_mutex_ is taken before state_, matching the lock order used elsewhere.
  std::scoped_lock<std::mutex> data_lock(data_mutex_);

  // Find the hashes of all the system_profiles that are currently in use in the aggregate data.
  std::set<uint64_t> system_profiles_in_use;
  // Hourly and daily bucket maps hold different message types, but both bucket types expose
  // system_profile_aggregates(), so one generic lambda handles either.
  auto collect_in_use = [&system_profiles_in_use](const auto& buckets) {
    for (const auto& [unused_bucket_id, bucket] : buckets) {
      for (const SystemProfileAggregate& aggregate : bucket.system_profile_aggregates()) {
        system_profiles_in_use.insert(aggregate.system_profile_hash());
      }
    }
  };
  for (const auto& [customer_id, customer] : aggregates_.by_customer_id()) {
    for (const auto& [project_id, project] : customer.by_project_id()) {
      for (const auto& [metric_id, metric] : project.by_metric_id()) {
        for (const auto& [report_id, report] : metric.by_report_id()) {
          switch (report.time_period_case()) {
            case ReportAggregate::kHourly:
              collect_in_use(report.hourly().by_hour_id());
              break;
            case ReportAggregate::kDaily:
              collect_in_use(report.daily().by_day_index());
              break;
            default:
              LOG(ERROR) << "Report aggregate (" << customer_id << ", " << project_id << ", "
                         << metric_id << ", " << report_id
                         << ") has unexpected time period case "
                         << static_cast<int>(report.time_period_case())
                         << "; no system profiles collected from it.";
              break;
          }
        }
      }
    }
  }

  // Remove the entries from the by_system_profile_hash that are not needed.
  auto* by_system_profiles =
      aggregates_.mutable_filtered_system_profiles()->mutable_by_system_profile_hash();
  int removed = 0;
  for (auto it = by_system_profiles->begin(); it != by_system_profiles->end();) {
    if (system_profiles_in_use.count(it->first) == 0) {
      ++removed;
      it = by_system_profiles->erase(it);
    } else {
      ++it;
    }
  }
  if (removed > 0) {
    VLOG(4) << "Garbage collecting " << removed
            << " filtered SystemProfiles from the local aggregate storage.";
    state_.lock()->data_modified = true;
  }
  return Status::OkStatus();
}
} // namespace cobalt::local_aggregation