// blob: 7fc325631f5caa2871343e487371ed813bc74b2d [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include <set>
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/status_macros.h"
namespace cobalt::local_aggregation {
// Builds a storage instance rooted at |base_directory|.
//
// |global_project_context_factory| must be non-null (enforced with CHECK). When a persisted
// FilteredSystemProfiles file is present under the base directory it is loaded into memory; a
// failed read is fatal. Afterwards the constructor runs, in order: DeleteOutdatedMetrics(),
// MigrateStoredData() and InitializePersistentStore().
ImmediateLocalAggregateStorage::ImmediateLocalAggregateStorage(
    std::string base_directory, util::FileSystem *fs,
    const logger::ProjectContextFactory *global_project_context_factory,
    MetadataBuilder *metadata_builder, int64_t per_project_reserved_bytes)
    : LocalAggregateStorage(per_project_reserved_bytes),
      base_directory_(std::move(base_directory)),
      fs_(fs),
      proto_store_(fs_),
      global_project_context_factory_(global_project_context_factory),
      metadata_builder_(metadata_builder) {
  CHECK(global_project_context_factory_)
      << "No global_project_context_factory provided. Cannot initialize store!";

  // Restore the filtered system profiles persisted by a previous run, if any.
  const std::string profiles_path = absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile);
  if (fs_->FileExists(profiles_path)) {
    if (auto read_status = proto_store_.Read(profiles_path, &filtered_system_profiles_);
        !read_status.ok()) {
      LOG(FATAL) << "Failed to read the FilteredSystemProfiles from file '" << profiles_path
                 << "':" << read_status;
    }
  }

  DeleteOutdatedMetrics();
  MigrateStoredData();
  InitializePersistentStore();
}
void ImmediateLocalAggregateStorage::DeleteData() {
// Lock the class so no new calls to GetMetricAggregate will return until the deletion has
// completed.
std::scoped_lock<std::mutex> data_lock(mutex_);
for (const auto &customer_folder : fs_->ListFiles(base_directory_).ConsumeValueOr({})) {
if (customer_folder == kFilteredSystemProfilesFile) {
fs_->Delete(absl::StrCat(base_directory_, "/", customer_folder));
} else {
fs_->DeleteRecursive(absl::StrCat(base_directory_, "/", customer_folder));
}
}
aggregates_.clear();
filtered_system_profiles_.clear_by_system_profile_hash();
}
// Removes stored data that no longer corresponds to an entry in the registry.
//
// Walks the <customer_id>/<project_id>/<metric_id> layout under |base_directory_|:
//   - a project folder for which the factory returns no ProjectContext is deleted recursively;
//   - inside a known project, a metric file whose id the project does not know is deleted.
// The FilteredSystemProfiles file is skipped. Entries whose names do not parse as unsigned
// integers are logged and left in place.
void ImmediateLocalAggregateStorage::DeleteOutdatedMetrics() {
  const auto customer_entries = fs_->ListFiles(base_directory_).ConsumeValueOr({});
  for (const auto &customer_name : customer_entries) {
    if (customer_name == kFilteredSystemProfilesFile) {
      continue;  // This is the profiles file, not a customer folder.
    }

    uint32_t customer_id;
    if (!absl::SimpleAtoi(customer_name, &customer_id)) {
      LOG(WARNING) << "Found bad customer folder name: " << customer_name;
      continue;
    }
    const std::string customer_path = absl::StrCat(base_directory_, "/", customer_name);

    const auto project_entries = fs_->ListFiles(customer_path).ConsumeValueOr({});
    for (const auto &project_name : project_entries) {
      uint32_t project_id;
      if (!absl::SimpleAtoi(project_name, &project_id)) {
        LOG(WARNING) << "Found bad project folder name: " << project_name;
        continue;
      }
      const std::string project_path = absl::StrCat(customer_path, "/", project_name);

      auto registry_project = global_project_context_factory_->NewProjectContext(
          lib::CustomerIdentifier(customer_id).ForProject(project_id));
      if (!registry_project) {
        // The whole project is gone from the registry: drop its folder.
        LOG(WARNING) << "Found customer/project that is not present in the registry: ("
                     << customer_id << ", " << project_id << "): Deleting it.";
        if (!fs_->DeleteRecursive(project_path)) {
          LOG(ERROR) << "Unable to delete project folder";
        }
        continue;
      }

      const auto metric_entries = fs_->ListFiles(project_path).ConsumeValueOr({});
      for (const auto &metric_name : metric_entries) {
        uint32_t metric_id;
        if (!absl::SimpleAtoi(metric_name, &metric_id)) {
          LOG(WARNING) << "Unable to parse metric file name `" << metric_name
                       << "` as an integer";
          continue;
        }
        if (registry_project->GetMetric(metric_id)) {
          continue;  // Still in the registry: keep the file.
        }

        LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id
                     << ", " << project_id << ", " << metric_id << "): Deleting it.";
        if (!fs_->Delete(absl::StrCat(project_path, "/", metric_name))) {
          LOG(ERROR) << "Unable to delete metric file";
        }
      }
    }
  }
}
void ImmediateLocalAggregateStorage::MigrateStoredData() {
for (const auto &project : global_project_context_factory_->ListProjects()) {
std::string customer_dir = absl::StrCat(base_directory_, "/", project.customer_id());
if (!fs_->FileExists(customer_dir)) {
continue;
}
std::string project_dir = absl::StrCat(customer_dir, "/", project.project_id());
if (!fs_->FileExists(project_dir)) {
continue;
}
std::unique_ptr<logger::ProjectContext> ctx =
global_project_context_factory_->NewProjectContext(project);
for (lib::MetricIdentifier metric : ctx->ListMetrics()) {
std::string metric_file = absl::StrCat(project_dir, "/", metric.metric_id());
if (!fs_->FileExists(metric_file)) {
continue;
}
MetricAggregate metric_aggregate;
auto status = proto_store_.Read(absl::StrCat(base_directory_, "/", metric.customer_id(), "/",
metric.project_id(), "/", metric.metric_id()),
&metric_aggregate);
if (!status.ok()) {
LOG(ERROR) << "Failed to migrate aggregate file: " << metric.customer_id() << "/"
<< metric.project_id() << "/" << metric.metric_id() << ": " << status;
continue;
}
bool changed = LocalAggregateStorage::MigrateStoredData(
metric_aggregate, *ctx->GetMetric(metric), metadata_builder_);
if (changed) {
status = proto_store_.Write(absl::StrCat(base_directory_, "/", metric.customer_id(), "/",
metric.project_id(), "/", metric.metric_id()),
metric_aggregate);
if (!status.ok()) {
LOG(ERROR) << "Failed to write migrated aggregate file: " << metric.customer_id() << "/"
<< metric.project_id() << "/" << metric.metric_id() << ": " << status;
continue;
}
LOG(INFO) << "Migrated the local aggregate file for metric: " << metric;
}
}
}
if (filtered_system_profiles_changed_) {
std::string filtered_system_profiles_file =
absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile);
auto status = proto_store_.Write(filtered_system_profiles_file, filtered_system_profiles_);
if (!status.ok()) {
LOG(ERROR) << "Failed to write the migrated FilteredSystemProfiles to file '"
<< filtered_system_profiles_file << "':" << status;
} else {
filtered_system_profiles_changed_ = false;
}
}
}
void ImmediateLocalAggregateStorage::InitializePersistentStore() {
if (!fs_->FileExists(base_directory_)) {
fs_->MakeDirectory(base_directory_);
}
for (const auto &project : global_project_context_factory_->ListProjects()) {
std::string customer_dir = absl::StrCat(base_directory_, "/", project.customer_id());
if (!fs_->FileExists(customer_dir)) {
fs_->MakeDirectory(customer_dir);
}
std::string project_dir = absl::StrCat(customer_dir, "/", project.project_id());
if (!fs_->FileExists(project_dir)) {
fs_->MakeDirectory(project_dir);
}
std::unique_ptr<logger::ProjectContext> ctx =
global_project_context_factory_->NewProjectContext(project);
for (lib::MetricIdentifier metric_identifier : ctx->ListMetrics()) {
std::string metric_file = absl::StrCat(project_dir, "/", metric_identifier.metric_id());
if (!fs_->FileExists(metric_file)) {
aggregates_[metric_identifier] = MetricAggregate();
UpdateProjectSizeBy(metric_identifier.project(), 0);
} else {
UpdateProjectSizeBy(metric_identifier.project(),
static_cast<int64_t>(fs_->FileSize(metric_file).ValueOrDie()));
}
}
}
}
// Returns a MetricAggregateRef for |metric|.
//
// An aggregate already present in |aggregates_| is returned directly. Otherwise the metric is
// validated against the registry (NOT_FOUND is returned when the project or the metric is
// unknown) and its file is read into a new |aggregates_| entry. A failed read is only logged:
// the caller still receives a ref to whatever that entry holds.
//
// NOTE(review): |aggregates_| is read and modified here without this function itself locking
// |mutex_|; presumably MetricAggregateRef acquires it on construction -- confirm.
lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef>
ImmediateLocalAggregateStorage::GetMetricAggregate(lib::MetricIdentifier metric) {
  // Fast path: the aggregate is already held in memory.
  if (auto cached = aggregates_.find(metric); cached != aggregates_.end()) {
    return MetricAggregateRef(metric, &cached->second, this, mutex_);
  }

  // Only touch the file system for customer_id/project_id/metric_id tuples the registry knows.
  std::unique_ptr<logger::ProjectContext> project_context =
      global_project_context_factory_->NewProjectContext(metric.project());
  if (!project_context) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "Project (")
        .AppendMsg(metric.project())
        .AppendMsg(") not found in registry")
        .Build();
  }
  if (!project_context->GetMetric(metric)) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "Metric id `")
        .AppendMsg(metric.metric_id())
        .AppendMsg("` not found in registry for project (")
        .AppendMsg(metric.project())
        .AppendMsg(")")
        .Build();
  }

  MetricAggregate *slot = &aggregates_[metric];
  const std::string aggregate_path =
      absl::StrCat(base_directory_, "/", metric.customer_id(), "/", metric.project_id(), "/",
                   metric.metric_id());
  auto read_status = proto_store_.Read(aggregate_path, slot);
  if (!read_status.ok()) {
    // Expected to be rare: InitializePersistentStore leaves every metric with either an entry in
    // |aggregates_| or a file on disk.
    LOG(ERROR) << "Failed to load aggregate file: " << metric.customer_id() << "/"
               << metric.project_id() << "/" << metric.metric_id() << ": " << read_status;
    // Deliberately continue with the entry as it is. If the file existed but could not be read,
    // its current contents are lost (overwritten) once SaveMetricAggregate is called.
  }
  return MetricAggregateRef(metric, slot, this, mutex_);
}
// Records |filtered_system_profile| under |system_profile_hash| in the in-memory
// FilteredSystemProfiles, unless that hash is already present. A new entry sets
// |filtered_system_profiles_changed_|; nothing is written to disk here.
void ImmediateLocalAggregateStorage::StoreFilteredSystemProfile(
    uint64_t system_profile_hash, const SystemProfile &filtered_system_profile) {
  if (filtered_system_profiles_.by_system_profile_hash().count(system_profile_hash) != 0) {
    return;  // Already stored: keep the existing entry untouched.
  }

  auto *profiles_by_hash = filtered_system_profiles_.mutable_by_system_profile_hash();
  (*profiles_by_hash)[system_profile_hash] = filtered_system_profile;
  filtered_system_profiles_changed_ = true;
}
// Looks up the filtered SystemProfile stored in memory under |system_profile_hash|.
//
// Returns a copy of the stored profile, or a NOT_FOUND status when the hash has no entry.
lib::statusor::StatusOr<SystemProfile>
ImmediateLocalAggregateStorage::RetrieveFilteredSystemProfile(uint64_t system_profile_hash) const {
  const auto &profiles_by_hash = filtered_system_profiles_.by_system_profile_hash();
  const auto match = profiles_by_hash.find(system_profile_hash);
  if (match == profiles_by_hash.end()) {
    return util::StatusBuilder(StatusCode::NOT_FOUND, "SystemProfile hash `")
        .AppendMsg(system_profile_hash)
        .AppendMsg("` not found in filtered SystemProfile cache")
        .Build();
  }
  return match->second;
}
// Writes the in-memory aggregate of |metric| to its file.
//
// Returns NOT_FOUND when |aggregates_| holds no entry for |metric|, and the write error when
// either write fails. When |filtered_system_profiles_changed_| is set, the FilteredSystemProfiles
// are written as well and the flag is cleared once that write has succeeded.
Status ImmediateLocalAggregateStorage::SaveMetricAggregate(lib::MetricIdentifier metric) {
  auto entry = aggregates_.find(metric);
  if (entry == aggregates_.end()) {
    return Status(StatusCode::NOT_FOUND, "No matching metric aggregate found");
  }

  const std::string aggregate_path =
      absl::StrCat(base_directory_, "/", metric.customer_id(), "/", metric.project_id(), "/",
                   metric.metric_id());
  CB_RETURN_IF_ERROR(proto_store_.Write(aggregate_path, entry->second));

  if (!filtered_system_profiles_changed_) {
    return Status::OkStatus();
  }

  // The profiles changed since they were last persisted: flush them too.
  const std::string profiles_path = absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile);
  CB_RETURN_IF_ERROR(proto_store_.Write(profiles_path, filtered_system_profiles_));
  filtered_system_profiles_changed_ = false;
  return Status::OkStatus();
}
// Removes from the FilteredSystemProfiles every system profile that no stored aggregate refers to.
//
// Collects the system profile hashes used by the hourly and daily buckets of every metric of
// every listed project whose folder exists on disk, loading aggregates from their files into
// |aggregates_| when they are not in memory yet. Entries of the by_system_profile_hash map whose
// hash was not seen are erased, and the FilteredSystemProfiles file is rewritten when at least
// one entry was removed.
//
// Returns an error, without collecting anything, when:
//   - an aggregate file cannot be read (the placeholder entry created for it in |aggregates_| is
//     erased first, so later calls retry the read instead of treating the metric as empty);
//   - no ProjectContext can be created for a listed project, since its metrics cannot be
//     enumerated;
// and returns the write error when the FilteredSystemProfiles file cannot be rewritten.
Status ImmediateLocalAggregateStorage::GarbageCollection() {
  VLOG(4) << "Garbage collecting unused system profiles from the local aggregate storage.";

  // Lock the class so no new calls to GetMetricAggregate will return until the garbage collection
  // has completed.
  std::scoped_lock<std::mutex> data_lock(mutex_);

  std::set<uint64_t> system_profiles_in_use;

  // Records the system profile hash of every aggregate found in a map of time buckets.
  const auto record_hashes = [&system_profiles_in_use](const auto &buckets) {
    for (const auto &[_, bucket] : buckets) {
      for (const SystemProfileAggregate &aggregate : bucket.system_profile_aggregates()) {
        system_profiles_in_use.insert(aggregate.system_profile_hash());
      }
    }
  };

  // Find the hashes of all the system_profiles that are currently in use in the aggregate data.
  for (const auto &project : global_project_context_factory_->ListProjects()) {
    const std::string customer_dir = absl::StrCat(base_directory_, "/", project.customer_id());
    if (!fs_->FileExists(customer_dir)) {
      continue;
    }
    const std::string project_dir = absl::StrCat(customer_dir, "/", project.project_id());
    if (!fs_->FileExists(project_dir)) {
      continue;
    }

    std::unique_ptr<logger::ProjectContext> ctx =
        global_project_context_factory_->NewProjectContext(project);
    if (!ctx) {
      // Without the context the project's metrics cannot be listed, so the profiles they use are
      // unknown. Abort rather than risk collecting profiles that are still needed.
      LOG(ERROR) << "Unable to create a ProjectContext for project (" << project.customer_id()
                 << ", " << project.project_id() << "): Aborting garbage collection.";
      return Status(StatusCode::NOT_FOUND,
                    "Project not found in registry during garbage collection");
    }

    for (lib::MetricIdentifier metric : ctx->ListMetrics()) {
      MetricAggregate *metric_aggregate;
      // Load the data for the metric. Either from the in-memory map, or the file system.
      auto aggregate_it = aggregates_.find(metric);
      if (aggregate_it != aggregates_.end()) {
        metric_aggregate = &aggregate_it->second;
      } else {
        metric_aggregate = &aggregates_[metric];
        auto read_status =
            proto_store_.Read(absl::StrCat(base_directory_, "/", metric.customer_id(), "/",
                                           metric.project_id(), "/", metric.metric_id()),
                              metric_aggregate);
        if (!read_status.ok()) {
          // If a metric aggregate file can't be read, abort to avoid garbage collecting any
          // system profiles that metric aggregate is currently using. Remove the placeholder
          // first: left in place it would be found in memory by the next call, which would then
          // proceed as if the metric had no data.
          aggregates_.erase(metric);
          return read_status;
        }
      }

      for (const auto &[report_id, report] : metric_aggregate->by_report_id()) {
        switch (report.time_period_case()) {
          case ReportAggregate::kHourly:
            record_hashes(report.hourly().by_hour_id());
            break;
          case ReportAggregate::kDaily:
            record_hashes(report.daily().by_day_index());
            break;
          default:
            LOG(ERROR) << "Report aggregate with an unset or unknown time period found for "
                       << "metric: " << metric << ", report id: " << report_id;
        }
      }
    }
  }

  // Remove the entries from the by_system_profile_hash that are not needed.
  auto *by_system_profiles = filtered_system_profiles_.mutable_by_system_profile_hash();
  int removed = 0;
  for (auto it = by_system_profiles->begin(); it != by_system_profiles->end();) {
    if (system_profiles_in_use.count(it->first) == 0) {
      ++removed;
      it = by_system_profiles->erase(it);
    } else {
      ++it;
    }
  }

  if (removed > 0) {
    VLOG(4) << "Garbage collecting " << removed
            << " filtered SystemProfiles from the local aggregate storage.";
    CB_RETURN_IF_ERROR(
        proto_store_.Write(absl::StrCat(base_directory_, "/", kFilteredSystemProfilesFile),
                           filtered_system_profiles_));
    filtered_system_profiles_changed_ = false;
  }
  return Status::OkStatus();
}
} // namespace cobalt::local_aggregation