Reland "[InternalMetrics] Track data stored per class"
This is a reland of d414d5ae5e7b51c2d5a8068ebd64dd865974f31b
Original change's description:
> [InternalMetrics] Track data stored per class
>
> Used to track how much data is used on a per-class basis
>
> Classes tracked are:
> - ObservationStore
> - LocalAggregateStore
> - AggregateStore
> - ObservationHistory
>
> Prerequisite: https://fuchsia-review.googlesource.com/c/cobalt-registry/+/500200
> Bug: 71727
> Change-Id: I3bc6a9836a1a3a84701de9bc189d647365ca8c13
> Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/500199
> Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
> Reviewed-by: Cameron Dale <camrdale@google.com>
> Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Bug: 71727
Change-Id: Iaf26c4bec961733424fd3df234045455ffd0e6ea
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/516227
Reviewed-by: Cameron Dale <camrdale@google.com>
Commit-Queue: Zach Bush <zmbush@google.com>
diff --git a/src/local_aggregation/BUILD.gn b/src/local_aggregation/BUILD.gn
index 8d7af10..cd603d4 100644
--- a/src/local_aggregation/BUILD.gn
+++ b/src/local_aggregation/BUILD.gn
@@ -107,6 +107,7 @@
"$cobalt_root/src/lib/util:protected_fields",
"$cobalt_root/src/lib/util:proto_util",
"$cobalt_root/src/logger:encoder",
+ "$cobalt_root/src/logger:internal_metrics",
"$cobalt_root/src/logger:observation_writer",
"$cobalt_root/src/logger:status",
"$cobalt_root/src/pb",
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 4f3f64d..b793fe8 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -15,6 +15,7 @@
#include "src/lib/util/proto_util.h"
#include "src/lib/util/status.h"
#include "src/local_aggregation/aggregation_utils.h"
+#include "src/logger/internal_metrics.h"
#include "src/registry/packed_event_codes.h"
namespace cobalt::local_aggregation {
@@ -178,7 +179,8 @@
: encoder_(encoder),
observation_writer_(observation_writer),
local_aggregate_proto_store_(local_aggregate_proto_store),
- obs_history_proto_store_(obs_history_proto_store) {
+ obs_history_proto_store_(obs_history_proto_store),
+ internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)) {
CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
<< "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
backfill_days_ = backfill_days;
@@ -356,6 +358,10 @@
// Lock, copy the LocalAggregateStore, and release the lock. Write the copy
// to |local_aggregate_proto_store_|.
auto local_aggregate_store = CopyLocalAggregateStore();
+ size_t store_size = local_aggregate_store.ByteSizeLong();
+ // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
+ internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::AggregateStore,
+ static_cast<int64_t>(store_size));
auto status = local_aggregate_proto_store_->Write(local_aggregate_store);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: "
@@ -368,6 +374,10 @@
Status AggregateStore::BackUpObservationHistory() {
auto obs_history = protected_obs_history_.lock()->obs_history;
+ size_t history_size = obs_history.ByteSizeLong();
+ // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
+ internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationHistory,
+ static_cast<int64_t>(history_size));
auto status = obs_history_proto_store_->Write(obs_history);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 3b4164b..b02e249 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -14,6 +14,8 @@
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/encoder.h"
+#include "src/logger/internal_metrics.h"
+#include "src/logger/logger_interface.h"
#include "src/logger/observation_writer.h"
#include "src/logger/project_context.h"
#include "src/logger/status.h"
@@ -204,6 +206,10 @@
// information derived from the Metrics Registry, this is not a problem.
void Disable(bool is_disabled);
+ void ResetInternalMetrics(logger::LoggerInterface* internal_logger) {
+ internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger);
+ }
+
private:
friend class AggregateStoreTest;
friend class EventAggregatorTest;
@@ -345,6 +351,7 @@
// Used for loading and backing up the proto stores to disk.
util::ConsistentProtoStore* local_aggregate_proto_store_; // not owned
util::ConsistentProtoStore* obs_history_proto_store_; // not owned
+ std::unique_ptr<logger::InternalMetrics> internal_metrics_;
// In memory store of local aggregations and data needed to derive them.
util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index 400c8d3..5b90d0a 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -168,6 +168,7 @@
aggregate_store_ = std::make_unique<AggregateStore>(
encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
owned_obs_history_proto_store_.get(), backfill_days_);
+ aggregate_store_->ResetInternalMetrics(internal_logger_);
event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
steady_clock_ = std::make_unique<SteadyClock>();
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 8cef62c..9c34f68 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -106,6 +106,11 @@
TriggerBackups();
}
+ void ResetInternalMetrics(logger::LoggerInterface* internal_logger) {
+ internal_logger_ = internal_logger;
+ aggregate_store_->ResetInternalMetrics(internal_logger);
+ }
+
private:
friend class TestEventAggregatorManager;
friend class EventAggregatorManagerTest;
@@ -163,6 +168,7 @@
const logger::Encoder* encoder_;
const logger::ObservationWriter* observation_writer_;
+ logger::LoggerInterface* internal_logger_ = nullptr;
size_t backfill_days_ = 0;
std::chrono::seconds aggregate_backup_interval_;
std::chrono::seconds generate_obs_interval_;
diff --git a/src/local_aggregation_1_1/BUILD.gn b/src/local_aggregation_1_1/BUILD.gn
index c45626e..5bf50ea 100644
--- a/src/local_aggregation_1_1/BUILD.gn
+++ b/src/local_aggregation_1_1/BUILD.gn
@@ -34,6 +34,7 @@
"$cobalt_root/src/lib/util/testing:test_with_files",
"$cobalt_root/src/local_aggregation_1_1/testing:test_registry",
"$cobalt_root/src/logger:event_record",
+ "$cobalt_root/src/logger:fake_logger",
"$cobalt_root/src/logger:logger_test_utils",
"$cobalt_root/src/logger:project_context_factory",
"$cobalt_root/src/system_data:client_secret",
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
index 1db390e..caa2895 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
+++ b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
@@ -19,6 +19,8 @@
"$cobalt_root/src/lib/util:protected_fields",
"$cobalt_root/src/lib/util:status",
"$cobalt_root/src/local_aggregation_1_1:proto",
+ "$cobalt_root/src/logger:internal_metrics",
+ "$cobalt_root/src/logger:logger_interface",
"$cobalt_root/src/logger:project_context_factory",
"$cobalt_root/src/registry:cobalt_registry_proto",
]
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
index d1b95ea..0a0d0d9 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
@@ -11,6 +11,7 @@
#include "src/lib/util/file_system.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
+#include "src/logger/internal_metrics.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
@@ -26,6 +27,7 @@
std::chrono::milliseconds writeback_frequency)
: proto_store_(std::move(base_directory), fs),
global_project_context_factory_(global_project_context_factory),
+ internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)),
writeback_frequency_(writeback_frequency) {
ReadPersistentStore();
DeleteOutdatedMetrics();
@@ -147,6 +149,14 @@
// Here, we unlock locked_state before attempting to acquire data_mutex_ so that this codepath
// acquires the two locks in the same order, thus avoiding any potential of deadlock.
locked_state.unlock();
+
+ // While everything is unlocked, track the current storage usage.
+ // This cannot cause an infinite loop, since we are already in the 'data_modified' state,
+ // which will be reset to false at the end of this block.
+ auto size = static_cast<int64_t>(aggregates_.ByteSizeLong());
+ internal_metrics_->TrackDiskUsage(
+ logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);
+
// Lock the class so no new calls to GetMetricAggregate will return until the data has
// written.
std::scoped_lock<std::mutex> data_lock(data_mutex_);
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
index da0536e..81138e3 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
@@ -17,6 +17,8 @@
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
+#include "src/logger/internal_metrics.h"
+#include "src/logger/logger_interface.h"
#include "src/logger/project_context_factory.h"
#include "src/registry/cobalt_registry.pb.h"
@@ -62,6 +64,10 @@
// Blocks for |max_wait| milliseconds or until the writeback thread has begun to write data.
bool WaitUntilSaveStart(std::chrono::milliseconds max_wait);
+ void ResetInternalMetrics(logger::LoggerInterface *internal_logger) override {
+ internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger);
+ }
+
protected:
util::Status SaveMetricAggregate(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id) override;
@@ -91,6 +97,7 @@
util::ConsistentProtoStore proto_store_;
const logger::ProjectContextFactory *global_project_context_factory_;
+ std::unique_ptr<logger::InternalMetrics> internal_metrics_;
// This mutex only guards aggregates_ and its lock is passed into the MetricAggregateRef returned
// from GetMetricAggregate. If this lock is needed in addition to a lock on state_, this lock
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h
index 5c1f997..ab575ed 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h
@@ -59,6 +59,8 @@
util::Status SaveMetricAggregate(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id) override;
+ void ResetInternalMetrics(logger::LoggerInterface *internal_logger) override {}
+
private:
// DeleteOutdatedMetrics walks the filesystem from the |base_directory_| down and deletes
// MetricAggregate files, and project directories that do not exist in the CobaltRegistry.
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
index c7e5521..9b96163 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
@@ -11,6 +11,7 @@
#include "src/lib/util/file_system.h"
#include "src/lib/util/status.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
+#include "src/logger/logger_interface.h"
#include "src/logger/project_context_factory.h"
namespace cobalt::local_aggregation {
@@ -91,6 +92,8 @@
// immediately.
virtual void DeleteData() = 0;
+ virtual void ResetInternalMetrics(logger::LoggerInterface *internal_logger) = 0;
+
virtual ~LocalAggregateStorage() = default;
protected:
diff --git a/src/local_aggregation_1_1/local_aggregation.h b/src/local_aggregation_1_1/local_aggregation.h
index 39c8df6..6d810b9 100644
--- a/src/local_aggregation_1_1/local_aggregation.h
+++ b/src/local_aggregation_1_1/local_aggregation.h
@@ -12,6 +12,7 @@
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/observation_generator.h"
#include "src/logger/event_record.h"
+#include "src/logger/logger_interface.h"
#include "src/logger/observation_writer.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/metadata_builder.h"
@@ -69,6 +70,10 @@
void DeleteData() { aggregate_storage_->DeleteData(); }
+ void ResetInternalMetrics(logger::LoggerInterface *internal_logger) {
+ aggregate_storage_->ResetInternalMetrics(internal_logger);
+ }
+
private:
std::unique_ptr<LocalAggregateStorage> aggregate_storage_;
MetadataBuilder *metadata_builder_;
diff --git a/src/local_aggregation_1_1/observation_generator_test.cc b/src/local_aggregation_1_1/observation_generator_test.cc
index 4357b0d..c1a00cf 100644
--- a/src/local_aggregation_1_1/observation_generator_test.cc
+++ b/src/local_aggregation_1_1/observation_generator_test.cc
@@ -14,13 +14,19 @@
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
+#include "src/local_aggregation_1_1/backfill_manager.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
+#include "src/logger/fake_logger.h"
+#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/observation_writer.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
+#include "src/logger/status.h"
+#include "src/logger/types.h"
+#include "src/observation_store/file_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/pb/metadata_builder.h"
@@ -120,7 +126,9 @@
return observation_generator_->GenerateObservationsOnce(utc, local);
}
- private:
+ friend class BasicLogger;
+
+ protected:
system_data::FakeSystemData system_data_;
std::unique_ptr<MetadataBuilder> metadata_builder_;
@@ -284,4 +292,52 @@
}
}
+class BasicLogger : public logger::testing::FakeLogger {
+ public:
+ explicit BasicLogger(ObservationGeneratorTest* test) : test_(test) {}
+
+ logger::Status LogInteger(uint32_t metric_id, int64_t value,
+ const std::vector<uint32_t>& event_codes) override {
+ // Lock the aggregate store to simulate actually performing log.
+ test_->aggregate_storage_->GetMetricAggregate(1, 1, 1);
+ return logger::testing::FakeLogger::LogInteger(metric_id, value, event_codes);
+ }
+
+ private:
+ ObservationGeneratorTest* test_;
+};
+
+TEST_F(ObservationGeneratorTest, DoesNotDeadlock) {
+ aggregate_storage_ =
+ LocalAggregateStorage::New(LocalAggregateStorage::StorageStrategy::Delayed, test_folder(),
+ fs(), project_context_factory_.get());
+ observation_store::FileObservationStore obs_store(10000, 10000, 10000, fs(),
+ test_folder() + "/obs_store");
+ BasicLogger logger(this);
+ obs_store.ResetInternalMetrics(&logger);
+
+ const uint32_t kMaxHourId = 101;
+
+ {
+ MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
+ ReportAggregate* report =
+ &(*aggregate.aggregate()
+ ->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
+ for (uint32_t i = 1; i <= kMaxHourId; i += 2) {
+ (*report->mutable_hourly()->mutable_by_hour_id())[i]
+ .add_by_event_code()
+ ->mutable_data()
+ ->set_count(i * 100);
+ }
+ ASSERT_TRUE(aggregate.Save().ok());
+ }
+
+ logger::ObservationWriter observation_writer(&obs_store, nullptr);
+ ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
+
+ for (uint32_t i = 1; i <= kMaxHourId; i += 4) {
+ GenerateObservationsOnce(TimeInfo::FromHourId(i), TimeInfo::FromHourId(i));
+ }
+}
+
} // namespace cobalt::local_aggregation
diff --git a/src/logger/fake_logger.cc b/src/logger/fake_logger.cc
index 6dd9a21..16e5bf1 100644
--- a/src/logger/fake_logger.cc
+++ b/src/logger/fake_logger.cc
@@ -44,7 +44,7 @@
Event event;
event.set_metric_id(metric_id);
event.mutable_event_occurred_event()->set_event_code(event_code);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -60,7 +60,7 @@
CopyEventCodesAndComponent(event_codes, component, event_count_event);
event_count_event->set_period_duration_micros(period_duration_micros);
event_count_event->set_count(count);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -74,7 +74,7 @@
auto* elapsed_time_event = event.mutable_elapsed_time_event();
CopyEventCodesAndComponent(event_codes, component, elapsed_time_event);
elapsed_time_event->set_elapsed_micros(elapsed_micros);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -89,7 +89,7 @@
CopyEventCodesAndComponent(event_codes, component, frame_rate_event);
// NOLINTNEXTLINE readability-magic-numbers
frame_rate_event->set_frames_per_1000_seconds(std::round(fps * 1000.0));
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -103,7 +103,7 @@
auto* memory_usage_event = event.mutable_memory_usage_event();
CopyEventCodesAndComponent(event_codes, component, memory_usage_event);
memory_usage_event->set_bytes(bytes);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -117,7 +117,7 @@
auto* int_histogram_event = event.mutable_int_histogram_event();
CopyEventCodesAndComponent(event_codes, component, int_histogram_event);
int_histogram_event->mutable_buckets()->Swap(histogram.get());
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -131,7 +131,7 @@
auto* occurrence_event = event.mutable_occurrence_event();
CopyEventCodes(event_codes, occurrence_event);
occurrence_event->set_count(count);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -145,7 +145,7 @@
auto* integer_event = event.mutable_integer_event();
CopyEventCodes(event_codes, integer_event);
integer_event->set_value(value);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -159,7 +159,7 @@
auto* integer_histogram_event = event.mutable_integer_histogram_event();
CopyEventCodes(event_codes, integer_histogram_event);
integer_histogram_event->mutable_buckets()->Swap(histogram.get());
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -173,7 +173,7 @@
auto* string_event = event.mutable_string_event();
CopyEventCodes(event_codes, string_event);
string_event->set_string_value(string_value);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
@@ -184,7 +184,7 @@
Event event;
event.set_metric_id(metric_id);
event.mutable_custom_event()->mutable_values()->swap(*event_values);
- last_event_logged_ = event;
+ TrackEvent(event);
return Status::kOK;
}
diff --git a/src/logger/fake_logger.h b/src/logger/fake_logger.h
index fb63c1f..93203a6 100644
--- a/src/logger/fake_logger.h
+++ b/src/logger/fake_logger.h
@@ -75,14 +75,20 @@
void ResumeInternalLogging() override { internal_logging_paused_ = false; }
- uint32_t call_count() { return call_count_; }
- Event last_event_logged() { return last_event_logged_; }
+ void TrackEvent(Event e) {
+ events_logged_.push_back(e);
+ last_event_logged_ = std::move(e);
+ }
+ uint32_t call_count() const { return call_count_; }
+ Event last_event_logged() const { return last_event_logged_; }
+ Event nth_event_logged(int64_t n) const { return events_logged_[n]; }
std::map<PerProjectLoggerCallsMadeMetricDimensionLoggerMethod, uint32_t> internal_logger_calls() {
return internal_logger_calls_;
}
private:
Event last_event_logged_;
+ std::vector<Event> events_logged_;
uint32_t call_count_ = 0;
std::map<PerProjectLoggerCallsMadeMetricDimensionLoggerMethod, uint32_t> internal_logger_calls_;
bool internal_logging_paused_ = false;
diff --git a/src/logger/internal_metrics.cc b/src/logger/internal_metrics.cc
index 34c254c..8d10907 100644
--- a/src/logger/internal_metrics.cc
+++ b/src/logger/internal_metrics.cc
@@ -8,6 +8,7 @@
#include <string>
#include <utility>
+#include "src/logger/internal_metrics_config.cb.h"
#include "src/logging.h"
namespace cobalt::logger {
@@ -168,6 +169,32 @@
"status="
<< status;
}
-} // namespace cobalt::logger
+}
+
+const float kPerMilleMultiplier = 1000.0;
+void InternalMetricsImpl::TrackDiskUsage(StorageClass storage_class, int64_t bytes,
+ int64_t max_bytes) {
+ if (paused_) {
+ return;
+ }
+
+ // N.B. This method may only include Cobalt 1.1 metrics. Using Cobalt 1.0 metrics here have the
+ // potential to cause logging loops.
+ auto status = logger_->LogInteger(kStoreUsageMetricId, bytes, {storage_class});
+ if (status != kOK) {
+ VLOG(1) << "InternalMetricsImpl::TrackDiskUsage: LogInteger(disk_usage) returned status="
+ << status;
+ }
+
+ if (max_bytes >= 0) {
+ auto fullness_per_mille = static_cast<uint32_t>(
+ (static_cast<float>(bytes) / static_cast<float>(max_bytes)) * kPerMilleMultiplier);
+ status = logger_->LogInteger(kStoreFullnessMetricId, fullness_per_mille, {storage_class});
+ if (status != kOK) {
+ VLOG(1) << "InternalMetricsImpl::TrackDiskUsage: LogInteger(disk_fullness) returned status="
+ << status;
+ }
+ }
+}
} // namespace cobalt::logger
diff --git a/src/logger/internal_metrics.h b/src/logger/internal_metrics.h
index 62a269e..abfd331 100644
--- a/src/logger/internal_metrics.h
+++ b/src/logger/internal_metrics.h
@@ -12,8 +12,7 @@
#include "src/logger/logger_interface.h"
#include "src/registry/project.pb.h"
-namespace cobalt {
-namespace logger {
+namespace cobalt::logger {
// InternalMetrics defines the methods used for collecting cobalt-internal
// metrics.
@@ -68,6 +67,20 @@
virtual void SetSoftwareDistributionInfoCalled(
SetSoftwareDistributionInfoCalledEventCodes event_codes) = 0;
+ // Used to mark which class this disk usage is coming from. Can be:
+ // - ObservationStore
+ // - LocalAggregateStorage
+ // - AggregateStore
+ // - ObservationHistory
+ using StorageClass = MetricsMetricDimensionStorageClass;
+
+ // cobalt_internal::metrics::storage_usage is used to track how much data is stored per-class on
+ // disk.
+ virtual void TrackDiskUsage(StorageClass storage_class, int64_t bytes, int64_t max_bytes) = 0;
+ void TrackDiskUsage(StorageClass storage_class, int64_t bytes) {
+ TrackDiskUsage(storage_class, bytes, -1);
+ }
+
// After PauseLogging is called, all calls to log internal metrics will be
// ignored.
virtual void PauseLogging() = 0;
@@ -113,6 +126,8 @@
void SetSoftwareDistributionInfoCalled(
SetSoftwareDistributionInfoCalledEventCodes event_codes) override {}
+ void TrackDiskUsage(StorageClass storage_class, int64_t bytes, int64_t max_bytes) override {}
+
void PauseLogging() override {}
void ResumeLogging() override {}
@@ -151,6 +166,8 @@
void SetSoftwareDistributionInfoCalled(
SetSoftwareDistributionInfoCalledEventCodes event_codes) override;
+ void TrackDiskUsage(StorageClass storage_class, int64_t bytes, int64_t max_bytes) override;
+
void PauseLogging() override { paused_ = true; }
void ResumeLogging() override { paused_ = false; }
@@ -163,7 +180,6 @@
LoggerInterface* logger_; // not owned
};
-} // namespace logger
-} // namespace cobalt
+} // namespace cobalt::logger
#endif // COBALT_SRC_LOGGER_INTERNAL_METRICS_H_
diff --git a/src/logger/internal_metrics_test.cc b/src/logger/internal_metrics_test.cc
index 28e7b31..f72ba79 100644
--- a/src/logger/internal_metrics_test.cc
+++ b/src/logger/internal_metrics_test.cc
@@ -182,4 +182,16 @@
ASSERT_EQ(logger.call_count(), 0);
}
+TEST_F(InternalMetricsImplTest, TrackDiskUsageWorks) {
+ testing::FakeLogger logger;
+ InternalMetricsImpl metrics(&logger);
+
+ metrics.TrackDiskUsage(InternalMetrics::StorageClass::ObservationStore, 90, 100);
+ ASSERT_EQ(logger.call_count(), 2);
+ ASSERT_TRUE(logger.nth_event_logged(0).has_integer_event());
+ ASSERT_EQ(logger.nth_event_logged(0).integer_event().value(), 90);
+ ASSERT_TRUE(logger.nth_event_logged(1).has_integer_event());
+ ASSERT_EQ(logger.nth_event_logged(1).integer_event().value(), 900);
+}
+
} // namespace cobalt::logger
diff --git a/src/observation_store/file_observation_store.cc b/src/observation_store/file_observation_store.cc
index 8d1ae7d..408b896 100644
--- a/src/observation_store/file_observation_store.cc
+++ b/src/observation_store/file_observation_store.cc
@@ -9,6 +9,7 @@
#include <regex>
#include <utility>
+#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/logger_interface.h"
#include "src/logging.h"
#include "src/observation_store/observation_store_internal.pb.h"
@@ -158,6 +159,7 @@
.report_id = report_id}]++;
internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Succeeded,
obs_size, metadata->customer_id(), metadata->project_id());
+
return kOk;
}
@@ -274,6 +276,12 @@
}
std::unique_ptr<ObservationStore::EnvelopeHolder> FileObservationStore::TakeNextEnvelopeHolder() {
+ // Cannot cause a loop, since TrackDiskUsage is locally aggregated, and is sent at most once per
+ // hour.
+ internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationStore,
+ static_cast<int64_t>(Size()),
+ static_cast<int64_t>(max_bytes_total_));
+
auto fields = protected_fields_.lock();
auto oldest_file_name_or = GetOldestFinalizedFile(&fields);
@@ -330,6 +338,13 @@
fields->finalized_bytes -= fs_->FileSize(FullPath(file_name)).ConsumeValueOr(0);
fs_->Delete(FullPath(file_name));
}
+ fields.unlock();
+
+ // Cannot cause a loop, since TrackDiskUsage is locally aggregated, and is sent at most once per
+ // hour.
+ store_->internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationStore,
+ static_cast<int64_t>(store_->Size()),
+ static_cast<int64_t>(store_->max_bytes_total_));
}
void FileObservationStore::FileEnvelopeHolder::MergeWith(
diff --git a/src/public/cobalt_service.cc b/src/public/cobalt_service.cc
index fe22c12..487180e 100644
--- a/src/public/cobalt_service.cc
+++ b/src/public/cobalt_service.cc
@@ -124,12 +124,12 @@
LOG(ERROR) << "The global_registry provided does not include the expected internal metrics "
"project. Cobalt-measuring-cobalt will be disabled.";
} else {
- // This sets up internal metrics for both the ShippingManager and the ClearcutUploader.
+ // Set up internal metrics for various components.
shipping_manager_->ResetInternalMetrics(internal_logger_.get());
- // And this sets it up for the ObservationStore.
observation_store_->ResetInternalMetrics(internal_logger_.get());
- // And this sets it up for SystemData.
system_data_.ResetInternalMetrics(internal_logger_.get());
+ event_aggregator_manager_.ResetInternalMetrics(internal_logger_.get());
+ local_aggregation_.ResetInternalMetrics(internal_logger_.get());
}
if (start_worker_threads_) {
shipping_manager_->Start();