Reland "[InternalMetrics] Track data stored per class"

This is a reland of d414d5ae5e7b51c2d5a8068ebd64dd865974f31b

Original change's description:
> [InternalMetrics] Track data stored per class
>
> Used to track how much data is used on a per-class basis
>
> Classes tracked are:
>  - ObservationStore
>  - LocalAggregateStore
>  - AggregateStore
>  - ObservationHistory
>
> Prerequisite: https://fuchsia-review.googlesource.com/c/cobalt-registry/+/500200
> Bug: 71727
> Change-Id: I3bc6a9836a1a3a84701de9bc189d647365ca8c13
> Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/500199
> Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
> Reviewed-by: Cameron Dale <camrdale@google.com>
> Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>

Bug: 71727
Change-Id: Iaf26c4bec961733424fd3df234045455ffd0e6ea
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/516227
Reviewed-by: Cameron Dale <camrdale@google.com>
Commit-Queue: Zach Bush <zmbush@google.com>
diff --git a/src/local_aggregation/BUILD.gn b/src/local_aggregation/BUILD.gn
index 8d7af10..cd603d4 100644
--- a/src/local_aggregation/BUILD.gn
+++ b/src/local_aggregation/BUILD.gn
@@ -107,6 +107,7 @@
     "$cobalt_root/src/lib/util:protected_fields",
     "$cobalt_root/src/lib/util:proto_util",
     "$cobalt_root/src/logger:encoder",
+    "$cobalt_root/src/logger:internal_metrics",
     "$cobalt_root/src/logger:observation_writer",
     "$cobalt_root/src/logger:status",
     "$cobalt_root/src/pb",
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 4f3f64d..b793fe8 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -15,6 +15,7 @@
 #include "src/lib/util/proto_util.h"
 #include "src/lib/util/status.h"
 #include "src/local_aggregation/aggregation_utils.h"
+#include "src/logger/internal_metrics.h"
 #include "src/registry/packed_event_codes.h"
 
 namespace cobalt::local_aggregation {
@@ -178,7 +179,8 @@
     : encoder_(encoder),
       observation_writer_(observation_writer),
       local_aggregate_proto_store_(local_aggregate_proto_store),
-      obs_history_proto_store_(obs_history_proto_store) {
+      obs_history_proto_store_(obs_history_proto_store),
+      internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)) {
   CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
       << "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
   backfill_days_ = backfill_days;
@@ -356,6 +358,10 @@
   // Lock, copy the LocalAggregateStore, and release the lock. Write the copy
   // to |local_aggregate_proto_store_|.
   auto local_aggregate_store = CopyLocalAggregateStore();
+  size_t store_size = local_aggregate_store.ByteSizeLong();
+  // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
+  internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::AggregateStore,
+                                    static_cast<int64_t>(store_size));
   auto status = local_aggregate_proto_store_->Write(local_aggregate_store);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: "
@@ -368,6 +374,10 @@
 
 Status AggregateStore::BackUpObservationHistory() {
   auto obs_history = protected_obs_history_.lock()->obs_history;
+  size_t history_size = obs_history.ByteSizeLong();
+  // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
+  internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationHistory,
+                                    static_cast<int64_t>(history_size));
   auto status = obs_history_proto_store_->Write(obs_history);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 3b4164b..b02e249 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -14,6 +14,8 @@
 #include "src/lib/util/protected_fields.h"
 #include "src/local_aggregation/local_aggregation.pb.h"
 #include "src/logger/encoder.h"
+#include "src/logger/internal_metrics.h"
+#include "src/logger/logger_interface.h"
 #include "src/logger/observation_writer.h"
 #include "src/logger/project_context.h"
 #include "src/logger/status.h"
@@ -204,6 +206,10 @@
   //    information derived from the Metrics Registry, this is not a problem.
   void Disable(bool is_disabled);
 
+  void ResetInternalMetrics(logger::LoggerInterface* internal_logger) {
+    internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger);
+  }
+
  private:
   friend class AggregateStoreTest;
   friend class EventAggregatorTest;
@@ -345,6 +351,7 @@
   // Used for loading and backing up the proto stores to disk.
   util::ConsistentProtoStore* local_aggregate_proto_store_;  // not owned
   util::ConsistentProtoStore* obs_history_proto_store_;      // not owned
+  std::unique_ptr<logger::InternalMetrics> internal_metrics_;
 
   // In memory store of local aggregations and data needed to derive them.
   util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index 400c8d3..5b90d0a 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -168,6 +168,7 @@
   aggregate_store_ = std::make_unique<AggregateStore>(
       encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
       owned_obs_history_proto_store_.get(), backfill_days_);
+  aggregate_store_->ResetInternalMetrics(internal_logger_);
 
   event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
   steady_clock_ = std::make_unique<SteadyClock>();
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 8cef62c..9c34f68 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -106,6 +106,11 @@
     TriggerBackups();
   }
 
+  void ResetInternalMetrics(logger::LoggerInterface* internal_logger) {
+    internal_logger_ = internal_logger;
+    aggregate_store_->ResetInternalMetrics(internal_logger);
+  }
+
  private:
   friend class TestEventAggregatorManager;
   friend class EventAggregatorManagerTest;
@@ -163,6 +168,7 @@
 
   const logger::Encoder* encoder_;
   const logger::ObservationWriter* observation_writer_;
+  logger::LoggerInterface* internal_logger_ = nullptr;
   size_t backfill_days_ = 0;
   std::chrono::seconds aggregate_backup_interval_;
   std::chrono::seconds generate_obs_interval_;
diff --git a/src/local_aggregation_1_1/BUILD.gn b/src/local_aggregation_1_1/BUILD.gn
index c45626e..5bf50ea 100644
--- a/src/local_aggregation_1_1/BUILD.gn
+++ b/src/local_aggregation_1_1/BUILD.gn
@@ -34,6 +34,7 @@
     "$cobalt_root/src/lib/util/testing:test_with_files",
     "$cobalt_root/src/local_aggregation_1_1/testing:test_registry",
     "$cobalt_root/src/logger:event_record",
+    "$cobalt_root/src/logger:fake_logger",
     "$cobalt_root/src/logger:logger_test_utils",
     "$cobalt_root/src/logger:project_context_factory",
     "$cobalt_root/src/system_data:client_secret",
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
index 1db390e..caa2895 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
+++ b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
@@ -19,6 +19,8 @@
     "$cobalt_root/src/lib/util:protected_fields",
     "$cobalt_root/src/lib/util:status",
     "$cobalt_root/src/local_aggregation_1_1:proto",
+    "$cobalt_root/src/logger:internal_metrics",
+    "$cobalt_root/src/logger:logger_interface",
     "$cobalt_root/src/logger:project_context_factory",
     "$cobalt_root/src/registry:cobalt_registry_proto",
   ]
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
index d1b95ea..0a0d0d9 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.cc
@@ -11,6 +11,7 @@
 #include "src/lib/util/file_system.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/local_aggregation.pb.h"
+#include "src/logger/internal_metrics.h"
 #include "src/logger/project_context.h"
 #include "src/logger/project_context_factory.h"
 #include "src/logging.h"
@@ -26,6 +27,7 @@
     std::chrono::milliseconds writeback_frequency)
     : proto_store_(std::move(base_directory), fs),
       global_project_context_factory_(global_project_context_factory),
+      internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)),
       writeback_frequency_(writeback_frequency) {
   ReadPersistentStore();
   DeleteOutdatedMetrics();
@@ -147,6 +149,14 @@
       // Here, we unlock locked_state before attempting to acquire data_mutex_ so that this codepath
       // acquires the two locks in the same order, thus avoiding any potential of deadlock.
       locked_state.unlock();
+
+      // While everything is unlocked, track the current storage usage.
+      // This cannot cause an infinite loop, since we are already in the 'data_modified' state,
+      // which will be reset to false at the end of this block.
+      auto size = static_cast<int64_t>(aggregates_.ByteSizeLong());
+      internal_metrics_->TrackDiskUsage(
+          logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);
+
       // Lock the class so no new calls to GetMetricAggregate will return until the data has
       // written.
       std::scoped_lock<std::mutex> data_lock(data_mutex_);
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
index da0536e..81138e3 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h
@@ -17,6 +17,8 @@
 #include "src/lib/util/protected_fields.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/local_aggregation.pb.h"
+#include "src/logger/internal_metrics.h"
+#include "src/logger/logger_interface.h"
 #include "src/logger/project_context_factory.h"
 #include "src/registry/cobalt_registry.pb.h"
 
@@ -62,6 +64,10 @@
   // Blocks for |max_wait| milliseconds or until the writeback thread has begun to write data.
   bool WaitUntilSaveStart(std::chrono::milliseconds max_wait);
 
+  void ResetInternalMetrics(logger::LoggerInterface *internal_logger) override {
+    internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger);
+  }
+
  protected:
   util::Status SaveMetricAggregate(uint32_t customer_id, uint32_t project_id,
                                    uint32_t metric_id) override;
@@ -91,6 +97,7 @@
 
   util::ConsistentProtoStore proto_store_;
   const logger::ProjectContextFactory *global_project_context_factory_;
+  std::unique_ptr<logger::InternalMetrics> internal_metrics_;
 
   // This mutex only guards aggregates_ and its lock is passed into the MetricAggregateRef returned
   // from GetMetricAggregate. If this lock is needed in addition to a lock on state_, this lock
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h
index 5c1f997..ab575ed 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h
@@ -59,6 +59,8 @@
   util::Status SaveMetricAggregate(uint32_t customer_id, uint32_t project_id,
                                    uint32_t metric_id) override;
 
+  void ResetInternalMetrics(logger::LoggerInterface *internal_logger) override {}
+
  private:
   // DeleteOutdatedMetrics walks the filesystem from the |base_directory_| down and deletes
   // MetricAggregate files, and project directories that do not exist in the CobaltRegistry.
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
index c7e5521..9b96163 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
@@ -11,6 +11,7 @@
 #include "src/lib/util/file_system.h"
 #include "src/lib/util/status.h"
 #include "src/local_aggregation_1_1/local_aggregation.pb.h"
+#include "src/logger/logger_interface.h"
 #include "src/logger/project_context_factory.h"
 
 namespace cobalt::local_aggregation {
@@ -91,6 +92,8 @@
   // immediately.
   virtual void DeleteData() = 0;
 
+  virtual void ResetInternalMetrics(logger::LoggerInterface *internal_logger) = 0;
+
   virtual ~LocalAggregateStorage() = default;
 
  protected:
diff --git a/src/local_aggregation_1_1/local_aggregation.h b/src/local_aggregation_1_1/local_aggregation.h
index 39c8df6..6d810b9 100644
--- a/src/local_aggregation_1_1/local_aggregation.h
+++ b/src/local_aggregation_1_1/local_aggregation.h
@@ -12,6 +12,7 @@
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/observation_generator.h"
 #include "src/logger/event_record.h"
+#include "src/logger/logger_interface.h"
 #include "src/logger/observation_writer.h"
 #include "src/logger/project_context_factory.h"
 #include "src/pb/metadata_builder.h"
@@ -69,6 +70,10 @@
 
   void DeleteData() { aggregate_storage_->DeleteData(); }
 
+  void ResetInternalMetrics(logger::LoggerInterface *internal_logger) {
+    aggregate_storage_->ResetInternalMetrics(internal_logger);
+  }
+
  private:
   std::unique_ptr<LocalAggregateStorage> aggregate_storage_;
   MetadataBuilder *metadata_builder_;
diff --git a/src/local_aggregation_1_1/observation_generator_test.cc b/src/local_aggregation_1_1/observation_generator_test.cc
index 4357b0d..c1a00cf 100644
--- a/src/local_aggregation_1_1/observation_generator_test.cc
+++ b/src/local_aggregation_1_1/observation_generator_test.cc
@@ -14,13 +14,19 @@
 #include "src/lib/util/datetime_util.h"
 #include "src/lib/util/testing/test_with_files.h"
 #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
+#include "src/local_aggregation_1_1/backfill_manager.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/local_aggregation.pb.h"
 #include "src/local_aggregation_1_1/testing/test_registry.cb.h"
+#include "src/logger/fake_logger.h"
+#include "src/logger/internal_metrics_config.cb.h"
 #include "src/logger/observation_writer.h"
 #include "src/logger/privacy_encoder.h"
 #include "src/logger/project_context_factory.h"
+#include "src/logger/status.h"
+#include "src/logger/types.h"
+#include "src/observation_store/file_observation_store.h"
 #include "src/observation_store/observation_store.h"
 #include "src/observation_store/observation_store_internal.pb.h"
 #include "src/pb/metadata_builder.h"
@@ -120,7 +126,9 @@
     return observation_generator_->GenerateObservationsOnce(utc, local);
   }
 
- private:
+  friend class BasicLogger;
+
+ protected:
   system_data::FakeSystemData system_data_;
 
   std::unique_ptr<MetadataBuilder> metadata_builder_;
@@ -284,4 +292,52 @@
   }
 }
 
+class BasicLogger : public logger::testing::FakeLogger {
+ public:
+  explicit BasicLogger(ObservationGeneratorTest* test) : test_(test) {}
+
+  logger::Status LogInteger(uint32_t metric_id, int64_t value,
+                            const std::vector<uint32_t>& event_codes) override {
+    // Lock the aggregate store to simulate actually performing log.
+    test_->aggregate_storage_->GetMetricAggregate(1, 1, 1);
+    return logger::testing::FakeLogger::LogInteger(metric_id, value, event_codes);
+  }
+
+ private:
+  ObservationGeneratorTest* test_;
+};
+
+TEST_F(ObservationGeneratorTest, DoesNotDeadlock) {
+  aggregate_storage_ =
+      LocalAggregateStorage::New(LocalAggregateStorage::StorageStrategy::Delayed, test_folder(),
+                                 fs(), project_context_factory_.get());
+  observation_store::FileObservationStore obs_store(10000, 10000, 10000, fs(),
+                                                    test_folder() + "/obs_store");
+  BasicLogger logger(this);
+  obs_store.ResetInternalMetrics(&logger);
+
+  const uint32_t kMaxHourId = 101;
+
+  {
+    MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
+    ReportAggregate* report =
+        &(*aggregate.aggregate()
+               ->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
+    for (uint32_t i = 1; i <= kMaxHourId; i += 2) {
+      (*report->mutable_hourly()->mutable_by_hour_id())[i]
+          .add_by_event_code()
+          ->mutable_data()
+          ->set_count(i * 100);
+    }
+    ASSERT_TRUE(aggregate.Save().ok());
+  }
+
+  logger::ObservationWriter observation_writer(&obs_store, nullptr);
+  ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
+
+  for (uint32_t i = 1; i <= kMaxHourId; i += 4) {
+    GenerateObservationsOnce(TimeInfo::FromHourId(i), TimeInfo::FromHourId(i));
+  }
+}
+
 }  // namespace cobalt::local_aggregation
diff --git a/src/logger/fake_logger.cc b/src/logger/fake_logger.cc
index 6dd9a21..16e5bf1 100644
--- a/src/logger/fake_logger.cc
+++ b/src/logger/fake_logger.cc
@@ -44,7 +44,7 @@
   Event event;
   event.set_metric_id(metric_id);
   event.mutable_event_occurred_event()->set_event_code(event_code);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -60,7 +60,7 @@
   CopyEventCodesAndComponent(event_codes, component, event_count_event);
   event_count_event->set_period_duration_micros(period_duration_micros);
   event_count_event->set_count(count);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -74,7 +74,7 @@
   auto* elapsed_time_event = event.mutable_elapsed_time_event();
   CopyEventCodesAndComponent(event_codes, component, elapsed_time_event);
   elapsed_time_event->set_elapsed_micros(elapsed_micros);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -89,7 +89,7 @@
   CopyEventCodesAndComponent(event_codes, component, frame_rate_event);
   // NOLINTNEXTLINE readability-magic-numbers
   frame_rate_event->set_frames_per_1000_seconds(std::round(fps * 1000.0));
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -103,7 +103,7 @@
   auto* memory_usage_event = event.mutable_memory_usage_event();
   CopyEventCodesAndComponent(event_codes, component, memory_usage_event);
   memory_usage_event->set_bytes(bytes);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -117,7 +117,7 @@
   auto* int_histogram_event = event.mutable_int_histogram_event();
   CopyEventCodesAndComponent(event_codes, component, int_histogram_event);
   int_histogram_event->mutable_buckets()->Swap(histogram.get());
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -131,7 +131,7 @@
   auto* occurrence_event = event.mutable_occurrence_event();
   CopyEventCodes(event_codes, occurrence_event);
   occurrence_event->set_count(count);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -145,7 +145,7 @@
   auto* integer_event = event.mutable_integer_event();
   CopyEventCodes(event_codes, integer_event);
   integer_event->set_value(value);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -159,7 +159,7 @@
   auto* integer_histogram_event = event.mutable_integer_histogram_event();
   CopyEventCodes(event_codes, integer_histogram_event);
   integer_histogram_event->mutable_buckets()->Swap(histogram.get());
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -173,7 +173,7 @@
   auto* string_event = event.mutable_string_event();
   CopyEventCodes(event_codes, string_event);
   string_event->set_string_value(string_value);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
@@ -184,7 +184,7 @@
   Event event;
   event.set_metric_id(metric_id);
   event.mutable_custom_event()->mutable_values()->swap(*event_values);
-  last_event_logged_ = event;
+  TrackEvent(event);
 
   return Status::kOK;
 }
diff --git a/src/logger/fake_logger.h b/src/logger/fake_logger.h
index fb63c1f..93203a6 100644
--- a/src/logger/fake_logger.h
+++ b/src/logger/fake_logger.h
@@ -75,14 +75,20 @@
 
   void ResumeInternalLogging() override { internal_logging_paused_ = false; }
 
-  uint32_t call_count() { return call_count_; }
-  Event last_event_logged() { return last_event_logged_; }
+  void TrackEvent(Event e) {
+    events_logged_.push_back(e);
+    last_event_logged_ = std::move(e);
+  }
+  uint32_t call_count() const { return call_count_; }
+  Event last_event_logged() const { return last_event_logged_; }
+  Event nth_event_logged(int64_t n) const { return events_logged_[n]; }
   std::map<PerProjectLoggerCallsMadeMetricDimensionLoggerMethod, uint32_t> internal_logger_calls() {
     return internal_logger_calls_;
   }
 
  private:
   Event last_event_logged_;
+  std::vector<Event> events_logged_;
   uint32_t call_count_ = 0;
   std::map<PerProjectLoggerCallsMadeMetricDimensionLoggerMethod, uint32_t> internal_logger_calls_;
   bool internal_logging_paused_ = false;
diff --git a/src/logger/internal_metrics.cc b/src/logger/internal_metrics.cc
index 34c254c..8d10907 100644
--- a/src/logger/internal_metrics.cc
+++ b/src/logger/internal_metrics.cc
@@ -8,6 +8,7 @@
 #include <string>
 #include <utility>
 
+#include "src/logger/internal_metrics_config.cb.h"
 #include "src/logging.h"
 
 namespace cobalt::logger {
@@ -168,6 +169,32 @@
                "status="
             << status;
   }
-}  // namespace cobalt::logger
+}
+
+const float kPerMilleMultiplier = 1000.0;
+void InternalMetricsImpl::TrackDiskUsage(StorageClass storage_class, int64_t bytes,
+                                         int64_t max_bytes) {
+  if (paused_) {
+    return;
+  }
+
+  // N.B. This method may only include Cobalt 1.1 metrics. Using Cobalt 1.0 metrics here have the
+  // potential to cause logging loops.
+  auto status = logger_->LogInteger(kStoreUsageMetricId, bytes, {storage_class});
+  if (status != kOK) {
+    VLOG(1) << "InternalMetricsImpl::TrackDiskUsage: LogInteger(disk_usage) returned status="
+            << status;
+  }
+
+  if (max_bytes >= 0) {
+    auto fullness_per_mille = static_cast<uint32_t>(
+        (static_cast<float>(bytes) / static_cast<float>(max_bytes)) * kPerMilleMultiplier);
+    status = logger_->LogInteger(kStoreFullnessMetricId, fullness_per_mille, {storage_class});
+    if (status != kOK) {
+      VLOG(1) << "InternalMetricsImpl::TrackDiskUsage: LogInteger(disk_fullness) returned status="
+              << status;
+    }
+  }
+}
 
 }  // namespace cobalt::logger
diff --git a/src/logger/internal_metrics.h b/src/logger/internal_metrics.h
index 62a269e..abfd331 100644
--- a/src/logger/internal_metrics.h
+++ b/src/logger/internal_metrics.h
@@ -12,8 +12,7 @@
 #include "src/logger/logger_interface.h"
 #include "src/registry/project.pb.h"
 
-namespace cobalt {
-namespace logger {
+namespace cobalt::logger {
 
 // InternalMetrics defines the methods used for collecting cobalt-internal
 // metrics.
@@ -68,6 +67,20 @@
   virtual void SetSoftwareDistributionInfoCalled(
       SetSoftwareDistributionInfoCalledEventCodes event_codes) = 0;
 
+  // Used to mark which class this disk usage is coming from. Can be:
+  // - ObservationStore
+  // - LocalAggregateStorage
+  // - AggregateStore
+  // - ObservationHistory
+  using StorageClass = MetricsMetricDimensionStorageClass;
+
+  // cobalt_internal::metrics::storage_usage is used to track how much data is stored per-class on
+  // disk.
+  virtual void TrackDiskUsage(StorageClass storage_class, int64_t bytes, int64_t max_bytes) = 0;
+  void TrackDiskUsage(StorageClass storage_class, int64_t bytes) {
+    TrackDiskUsage(storage_class, bytes, -1);
+  }
+
   // After PauseLogging is called, all calls to log internal metrics will be
   // ignored.
   virtual void PauseLogging() = 0;
@@ -113,6 +126,8 @@
   void SetSoftwareDistributionInfoCalled(
       SetSoftwareDistributionInfoCalledEventCodes event_codes) override {}
 
+  void TrackDiskUsage(StorageClass storage_class, int64_t bytes, int64_t max_bytes) override {}
+
   void PauseLogging() override {}
   void ResumeLogging() override {}
 
@@ -151,6 +166,8 @@
   void SetSoftwareDistributionInfoCalled(
       SetSoftwareDistributionInfoCalledEventCodes event_codes) override;
 
+  void TrackDiskUsage(StorageClass storage_class, int64_t bytes, int64_t max_bytes) override;
+
   void PauseLogging() override { paused_ = true; }
   void ResumeLogging() override { paused_ = false; }
 
@@ -163,7 +180,6 @@
   LoggerInterface* logger_;  // not owned
 };
 
-}  // namespace logger
-}  // namespace cobalt
+}  // namespace cobalt::logger
 
 #endif  // COBALT_SRC_LOGGER_INTERNAL_METRICS_H_
diff --git a/src/logger/internal_metrics_test.cc b/src/logger/internal_metrics_test.cc
index 28e7b31..f72ba79 100644
--- a/src/logger/internal_metrics_test.cc
+++ b/src/logger/internal_metrics_test.cc
@@ -182,4 +182,16 @@
   ASSERT_EQ(logger.call_count(), 0);
 }
 
+TEST_F(InternalMetricsImplTest, TrackDiskUsageWorks) {
+  testing::FakeLogger logger;
+  InternalMetricsImpl metrics(&logger);
+
+  metrics.TrackDiskUsage(InternalMetrics::StorageClass::ObservationStore, 90, 100);
+  ASSERT_EQ(logger.call_count(), 2);
+  ASSERT_TRUE(logger.nth_event_logged(0).has_integer_event());
+  ASSERT_EQ(logger.nth_event_logged(0).integer_event().value(), 90);
+  ASSERT_TRUE(logger.nth_event_logged(1).has_integer_event());
+  ASSERT_EQ(logger.nth_event_logged(1).integer_event().value(), 900);
+}
+
 }  // namespace cobalt::logger
diff --git a/src/observation_store/file_observation_store.cc b/src/observation_store/file_observation_store.cc
index 8d1ae7d..408b896 100644
--- a/src/observation_store/file_observation_store.cc
+++ b/src/observation_store/file_observation_store.cc
@@ -9,6 +9,7 @@
 #include <regex>
 #include <utility>
 
+#include "src/logger/internal_metrics_config.cb.h"
 #include "src/logger/logger_interface.h"
 #include "src/logging.h"
 #include "src/observation_store/observation_store_internal.pb.h"
@@ -158,6 +159,7 @@
                        .report_id = report_id}]++;
   internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Succeeded,
                                  obs_size, metadata->customer_id(), metadata->project_id());
+
   return kOk;
 }
 
@@ -274,6 +276,12 @@
 }
 
 std::unique_ptr<ObservationStore::EnvelopeHolder> FileObservationStore::TakeNextEnvelopeHolder() {
+  // Cannot cause a loop, since TrackDiskUsage is locally aggregated, and is sent at most once per
+  // hour.
+  internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationStore,
+                                    static_cast<int64_t>(Size()),
+                                    static_cast<int64_t>(max_bytes_total_));
+
   auto fields = protected_fields_.lock();
 
   auto oldest_file_name_or = GetOldestFinalizedFile(&fields);
@@ -330,6 +338,13 @@
     fields->finalized_bytes -= fs_->FileSize(FullPath(file_name)).ConsumeValueOr(0);
     fs_->Delete(FullPath(file_name));
   }
+  fields.unlock();
+
+  // Cannot cause a loop, since TrackDiskUsage is locally aggregated, and is sent at most once per
+  // hour.
+  store_->internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationStore,
+                                            static_cast<int64_t>(store_->Size()),
+                                            static_cast<int64_t>(store_->max_bytes_total_));
 }
 
 void FileObservationStore::FileEnvelopeHolder::MergeWith(
diff --git a/src/public/cobalt_service.cc b/src/public/cobalt_service.cc
index fe22c12..487180e 100644
--- a/src/public/cobalt_service.cc
+++ b/src/public/cobalt_service.cc
@@ -124,12 +124,12 @@
     LOG(ERROR) << "The global_registry provided does not include the expected internal metrics "
                   "project. Cobalt-measuring-cobalt will be disabled.";
   } else {
-    // This sets up internal metrics for both the ShippingManager and the ClearcutUploader.
+    // Set up internal metrics for various components.
     shipping_manager_->ResetInternalMetrics(internal_logger_.get());
-    // And this sets it up for the ObservationStore.
     observation_store_->ResetInternalMetrics(internal_logger_.get());
-    // And this sets it up for SystemData.
     system_data_.ResetInternalMetrics(internal_logger_.get());
+    event_aggregator_manager_.ResetInternalMetrics(internal_logger_.get());
+    local_aggregation_.ResetInternalMetrics(internal_logger_.get());
   }
   if (start_worker_threads_) {
     shipping_manager_->Start();