blob: 5b7451197be905b1cec08d22e2a36d521c39ccb4 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
namespace cobalt::local_aggregation {
ImmediateLocalAggregateStorage::ImmediateLocalAggregateStorage(
std::string base_directory, util::FileSystem *fs,
const logger::ProjectContextFactory *global_project_context_factory)
: base_directory_(std::move(base_directory)),
fs_(fs),
proto_store_(fs_),
global_project_context_factory_(global_project_context_factory) {
CHECK(global_project_context_factory_)
<< "No global_project_context_factory provided. Cannot initialize store!";
DeleteOutdatedMetrics();
InitializePersistentStore();
}
void ImmediateLocalAggregateStorage::DeleteData() {
for (const auto &customer_folder : fs_->ListFiles(base_directory_).ConsumeValueOr({})) {
fs_->DeleteRecursive(absl::StrCat(base_directory_, "/", customer_folder));
}
aggregates_.clear();
}
void ImmediateLocalAggregateStorage::DeleteOutdatedMetrics() {
for (const auto &customer_folder : fs_->ListFiles(base_directory_).ConsumeValueOr({})) {
auto customer_dir = absl::StrCat(base_directory_, "/", customer_folder);
uint32_t customer_id;
if (!absl::SimpleAtoi(customer_folder, &customer_id)) {
LOG(WARNING) << "Found bad customer folder name: " << customer_folder;
continue;
}
for (const auto &project_folder : fs_->ListFiles(customer_dir).ConsumeValueOr({})) {
auto project_dir = absl::StrCat(customer_dir, "/", project_folder);
uint32_t project_id;
if (!absl::SimpleAtoi(project_folder, &project_id)) {
LOG(WARNING) << "Found bad project folder name: " << project_folder;
continue;
}
auto project_context =
global_project_context_factory_->NewProjectContext(customer_id, project_id);
if (!project_context) {
LOG(WARNING) << "Found customer/project that is not present in the registry: ("
<< customer_id << ", " << project_id << "): Deleting it.";
if (!fs_->DeleteRecursive(project_dir)) {
LOG(ERROR) << "Unable to delete project folder";
}
continue;
}
for (const auto &metric_file : fs_->ListFiles(project_dir).ConsumeValueOr({})) {
auto metric_filename = absl::StrCat(project_dir, "/", metric_file);
uint32_t metric_id;
if (!absl::SimpleAtoi(metric_file, &metric_id)) {
LOG(WARNING) << "Unable to parse metric file name `" << metric_file << "` as an integer";
continue;
}
if (!project_context->GetMetric(metric_id)) {
LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id
<< ", " << project_id << ", " << metric_id << "): Deleting it.";
if (!fs_->Delete(metric_filename)) {
LOG(ERROR) << "Unable to delete metric file";
}
}
}
}
}
}
void ImmediateLocalAggregateStorage::InitializePersistentStore() {
if (!fs_->FileExists(base_directory_)) {
fs_->MakeDirectory(base_directory_);
}
for (const auto &project : global_project_context_factory_->ListProjects()) {
auto customer_id = std::get<0>(project);
auto project_id = std::get<1>(project);
auto customer_dir = absl::StrCat(base_directory_, "/", customer_id);
if (!fs_->FileExists(customer_dir)) {
fs_->MakeDirectory(customer_dir);
}
auto project_dir = absl::StrCat(customer_dir, "/", project_id);
if (!fs_->FileExists(project_dir)) {
fs_->MakeDirectory(project_dir);
}
auto ctx = global_project_context_factory_->NewProjectContext(customer_id, project_id);
for (const auto &metric_id : ctx->ListMetrics()) {
auto metric_file = absl::StrCat(project_dir, "/", metric_id);
if (!fs_->FileExists(metric_file)) {
aggregates_[std::make_tuple(customer_id, project_id, metric_id)] = MetricAggregate();
}
}
}
}
lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef>
ImmediateLocalAggregateStorage::GetMetricAggregate(const uint32_t customer_id,
const uint32_t project_id,
const uint32_t metric_id) {
// Check if the file has already been read
auto tuple = std::make_tuple(customer_id, project_id, metric_id);
auto aggregate = aggregates_.find(tuple);
if (aggregate != aggregates_.end()) {
return MetricAggregateRef(customer_id, project_id, metric_id, &aggregate->second, this, mutex_);
}
// Try to read the file (if the customer_id/project_id/metric_id tuple actually exists in the
// registry)
auto ctx = global_project_context_factory_->NewProjectContext(customer_id, project_id);
if (ctx) {
if (ctx->GetMetric(metric_id)) {
auto status = proto_store_.Read(
absl::StrCat(base_directory_, "/", customer_id, "/", project_id, "/", metric_id),
&aggregates_[tuple]);
if (!status.ok()) {
LOG(WARNING) << "Failed to load aggregate file: " << customer_id << "/" << project_id << "/"
<< metric_id << ": " << status.error_message();
}
return MetricAggregateRef(customer_id, project_id, metric_id, &aggregates_[tuple], this,
mutex_);
}
return util::Status(
util::StatusCode::NOT_FOUND,
absl::StrCat("Metric id `", metric_id, "` not found in registry for project (", customer_id,
", ", project_id, ")"));
}
return util::Status(
util::StatusCode::NOT_FOUND,
absl::StrCat("Project (", customer_id, ", ", project_id, ") not found in registry"));
}
util::Status ImmediateLocalAggregateStorage::SaveMetricAggregate(const uint32_t customer_id,
const uint32_t project_id,
const uint32_t metric_id) {
auto aggregate = aggregates_.find(std::make_tuple(customer_id, project_id, metric_id));
if (aggregate != aggregates_.end()) {
return proto_store_.Write(
absl::StrCat(base_directory_, "/", customer_id, "/", project_id, "/", metric_id),
aggregate->second);
}
return util::Status(util::StatusCode::NOT_FOUND, "No matching metric aggregate found");
}
} // namespace cobalt::local_aggregation