[InternalMetrics] Create InternalMetricsFlusher

This ensures that InternalMetrics always gets flushed, even if a given
method returns early. Any code that uses internal metrics must ensure
that an InternalMetricsFlusher is in scope, or it will crash in debug
mode. This should make it much harder to introduce a regression that
causes unbounded memory growth in the internal metrics logging queue.
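
A minimal usage sketch of the new pattern (the enclosing function and
the CurrentStorageBytes() helper are illustrative, not part of this
change; the InternalMetrics calls mirror the ones in the diff):

  void DoPeriodicWork() {
    // The flusher's destructor calls Flush(), so queued internal
    // metrics are drained on every exit path, including early returns.
    logger::InternalMetrics::InternalMetricsFlusher flusher =
        internal_metrics_->Flusher();

    int64_t size = CurrentStorageBytes();  // hypothetical helper
    internal_metrics_->TrackDiskUsage(
        logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);

    if (size == 0) {
      return;  // Early return: the queued metric is still flushed here.
    }

    // ... more work; all metrics queued above are flushed once
    // `flusher` goes out of scope.
  }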

This addresses a possible root cause of fxbug.dev/115140.

Also includes:
- https://fuchsia-review.googlesource.com/c/cobalt/+/776788
- https://fuchsia-review.googlesource.com/c/cobalt/+/775543

Bug: 115140
Change-Id: Idd3afff192ca3dc7a36389a84cb4b80a4be419c7
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/770305
Reviewed-by: Cameron Dale <camrdale@google.com>
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
(cherry picked from commit 74d7f35055fac45427e5fae116330ba0effc80b4)
diff --git a/src/local_aggregation/local_aggregate_storage/delayed_local_aggregate_storage.cc b/src/local_aggregation/local_aggregate_storage/delayed_local_aggregate_storage.cc
index b92b9ba..8710d9e 100644
--- a/src/local_aggregation/local_aggregate_storage/delayed_local_aggregate_storage.cc
+++ b/src/local_aggregation/local_aggregate_storage/delayed_local_aggregate_storage.cc
@@ -222,8 +222,12 @@
         std::scoped_lock<std::mutex> data_lock(data_mutex_);
         size = static_cast<int64_t>(aggregates_.ByteSizeLong());
       }
-      internal_metrics_->TrackDiskUsage(
-          logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);
+
+      {
+        logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher();
+        internal_metrics_->TrackDiskUsage(
+            logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);
+      }
 
       // Lock the class so no new calls to GetMetricAggregate will return until the data has
       // written.
diff --git a/src/local_aggregation/local_aggregation.h b/src/local_aggregation/local_aggregation.h
index f6eae57..0506ae4 100644
--- a/src/local_aggregation/local_aggregation.h
+++ b/src/local_aggregation/local_aggregation.h
@@ -78,6 +78,7 @@
   void DeleteData() { aggregate_storage_->DeleteData(); }
 
   void ResetInternalMetrics(logger::InternalMetrics *internal_metrics) {
+    observation_generator_.ResetInternalMetrics(internal_metrics);
     aggregate_storage_->ResetInternalMetrics(internal_metrics);
   }
 
diff --git a/src/local_aggregation/observation_generator.cc b/src/local_aggregation/observation_generator.cc
index 6fd1278..7138ff2 100644
--- a/src/local_aggregation/observation_generator.cc
+++ b/src/local_aggregation/observation_generator.cc
@@ -111,6 +111,7 @@
 
 Status ObservationGenerator::GenerateObservationsOnce(
     std::chrono::system_clock::time_point system_time) {
+  logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher();
   CivilTimeManager civil_time_mgr(system_time, civil_time_converter_);
   util::TimeInfo utc_time_info = civil_time_mgr.GetInitialUtc();
   LOG(INFO) << "Generating aggregated observations for periods ending before system time: "
diff --git a/src/local_aggregation/observation_generator.h b/src/local_aggregation/observation_generator.h
index b22d4be..89e4678 100644
--- a/src/local_aggregation/observation_generator.h
+++ b/src/local_aggregation/observation_generator.h
@@ -78,6 +78,10 @@
   // can be called manually while testing to avoid having to wait.
   Status GenerateObservationsOnce(std::chrono::system_clock::time_point system_time);
 
+  void ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
+    internal_metrics_.reset(internal_metrics);
+  }
+
  private:
   void Run(util::SystemClockInterface* clock);
 
@@ -113,6 +117,7 @@
   const logger::ProjectContextFactory& global_project_context_factory_;
   system_data::SystemDataInterface& system_data_;
   const logger::ObservationWriter& observation_writer_;
+  logger::InternalMetricsPtr internal_metrics_;
   std::unique_ptr<util::SteadyClockInterface> steady_clock_;
   std::chrono::steady_clock::time_point next_generate_obs_;
   std::chrono::seconds generate_obs_interval_;
diff --git a/src/local_aggregation/observation_generator_test.cc b/src/local_aggregation/observation_generator_test.cc
index a5c5d40..d0ce2df 100644
--- a/src/local_aggregation/observation_generator_test.cc
+++ b/src/local_aggregation/observation_generator_test.cc
@@ -146,12 +146,14 @@
   void ConstructObservationGenerator(const logger::ObservationWriter& observation_writer,
                                      std::unique_ptr<FakePrivacyEncoder> privacy_encoder,
                                      util::CivilTimeConverterInterface& civil_time_converter,
+                                     logger::InternalMetrics* internal_metrics = nullptr,
                                      bool generate_observations_with_current_system_profile = false,
                                      bool test_dont_backfill_empty_reports = false) {
     observation_generator_ = std::make_unique<ObservationGenerator>(
         *aggregate_storage_, *project_context_factory_, system_data_, observation_writer,
         std::move(privacy_encoder), civil_time_converter,
         generate_observations_with_current_system_profile, test_dont_backfill_empty_reports);
+    observation_generator_->ResetInternalMetrics(internal_metrics);
   }
 
   void TearDown() override {
@@ -1093,7 +1095,7 @@
 
   logger::ObservationWriter observation_writer(test_writer, nullptr);
   ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
-                                *converter,
+                                *converter, /*internal_metrics=*/nullptr,
                                 /*generate_observations_with_current_system_profile=*/true);
   // Generate observations for the day with day index `start_day_index`.
   GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay));
@@ -1168,7 +1170,7 @@
 
   logger::ObservationWriter observation_writer(test_writer, nullptr);
   ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
-                                *converter,
+                                *converter, /*internal_metrics=*/nullptr,
                                 /*generate_observations_with_current_system_profile=*/true);
   // Generate observations for the day with day index `start_day_index`.
   GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay));
@@ -1271,7 +1273,7 @@
 
   logger::ObservationWriter observation_writer(obs_store, nullptr);
   ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
-                                *converter);
+                                *converter, &internal_metrics);
 
   for (uint32_t i = starting_time_info.hour_id; i < starting_time_info.hour_id + kMaxHourOffset;
        i += 4) {
@@ -1289,6 +1291,7 @@
 
   void ConstructObservationGenerator(const logger::ObservationWriter& observation_writer,
                                      util::CivilTimeConverterInterface& civil_time_converter,
+                                     logger::InternalMetrics* internal_metrics = nullptr,
                                      bool generate_observations_with_current_system_profile = false,
                                      bool test_dont_backfill_empty_reports = false) {
     std::unique_ptr<logger::PrivacyEncoder> privacy_encoder =
@@ -1299,6 +1302,7 @@
         *aggregate_storage_, *project_context_factory_, system_data_, observation_writer,
         std::move(privacy_encoder), civil_time_converter,
         generate_observations_with_current_system_profile, test_dont_backfill_empty_reports);
+    observation_generator_->ResetInternalMetrics(internal_metrics);
   }
 };
 
diff --git a/src/logger/internal_metrics.cc b/src/logger/internal_metrics.cc
index 8d234c0..b8d4e37 100644
--- a/src/logger/internal_metrics.cc
+++ b/src/logger/internal_metrics.cc
@@ -45,7 +45,7 @@
 
 void InternalMetricsImpl::LoggerCalled(LoggerCallsMadeMigratedMetricDimensionLoggerMethod method,
                                        const Project& project) {
-  fields_.lock()->queued_lambdas.emplace_back([this, method, project] {
+  Queue([this, method, project] {
     auto status = logger_.LogOccurrence(kLoggerCallsMadeMigratedMetricId, 1, {method});
 
     if (!status.ok()) {
@@ -66,7 +66,7 @@
 
 void InternalMetricsImpl::BytesUploaded(BytesUploadedMetricDimensionStatus upload_status,
                                         size_t byte_count) {
-  fields_.lock()->queued_lambdas.emplace_back([this, upload_status, byte_count] {
+  Queue([this, upload_status, byte_count] {
     Status status = logger_.LogOccurrence(kBytesUploadedMetricId, static_cast<uint64_t>(byte_count),
                                           {upload_status});
 
@@ -80,27 +80,25 @@
 void InternalMetricsImpl::BytesUploaded(
     PerProjectBytesUploadedMigratedMetricDimensionStatus upload_status, size_t byte_count,
     const lib::ProjectIdentifier& project_identifier) {
-  fields_.lock()->queued_lambdas.emplace_back(
-      [this, upload_status, byte_count, project_identifier] {
-        std::ostringstream component;
-        component << project_identifier.customer_id() << '/' << project_identifier.project_id();
+  Queue([this, upload_status, byte_count, project_identifier] {
+    std::ostringstream component;
+    component << project_identifier.customer_id() << '/' << project_identifier.project_id();
 
-        auto status = logger_.LogInteger(kPerProjectBytesUploadedMigratedMetricId,
-                                         static_cast<int64_t>(byte_count),
-                                         {GetProjectEventCode(component.str()), upload_status});
+    auto status = logger_.LogInteger(kPerProjectBytesUploadedMigratedMetricId,
+                                     static_cast<int64_t>(byte_count),
+                                     {GetProjectEventCode(component.str()), upload_status});
 
-        if (!status.ok()) {
-          VLOG(1) << "InternalMetricsImpl::BytesUploaded: LogInteger() returned "
-                  << "status=" << status;
-        }
-      });
+    if (!status.ok()) {
+      VLOG(1) << "InternalMetricsImpl::BytesUploaded: LogInteger() returned "
+              << "status=" << status;
+    }
+  });
 }
 
 void InternalMetricsImpl::BytesStored(
     PerProjectBytesStoredMigratedMetricDimensionStatus upload_status, size_t byte_count,
     const lib::ProjectIdentifier& project_identifier) {
-  fields_.lock()->queued_lambdas.emplace_back([this, upload_status, byte_count,
-                                               project_identifier] {
+  Queue([this, upload_status, byte_count, project_identifier] {
     std::ostringstream component;
     component << project_identifier.customer_id() << '/' << project_identifier.project_id();
 
@@ -115,7 +113,7 @@
 }
 
 void InternalMetricsImpl::InaccurateClockEventsCached(int64_t event_count) {
-  fields_.lock()->queued_lambdas.emplace_back([this, event_count] {
+  Queue([this, event_count] {
     auto status =
         logger_.LogOccurrence(kInaccurateClockEventsCachedMigratedMetricId, event_count, {});
 
@@ -128,7 +126,7 @@
 }
 
 void InternalMetricsImpl::InaccurateClockEventsDropped(int64_t event_count) {
-  fields_.lock()->queued_lambdas.emplace_back([this, event_count] {
+  Queue([this, event_count] {
     auto status =
         logger_.LogOccurrence(kInaccurateClockEventsDroppedMigratedMetricId, event_count, {});
 
@@ -142,7 +140,7 @@
 
 void InternalMetricsImpl::SetSoftwareDistributionInfoCalled(
     SetSoftwareDistributionInfoCalledMigratedEventCodes event_codes) {
-  fields_.lock()->queued_lambdas.emplace_back([this, event_codes] {
+  Queue([this, event_codes] {
     auto status = logger_.LogOccurrence(kSetSoftwareDistributionInfoCalledMigratedMetricId, 1,
                                         event_codes.ToVector());
 
@@ -157,7 +155,7 @@
 const float kPerMilleMultiplier = 1000.0;
 void InternalMetricsImpl::TrackDiskUsage(StorageClass storage_class, size_t bytes,
                                          int64_t max_bytes) {
-  fields_.lock()->queued_lambdas.emplace_back([this, storage_class, bytes, max_bytes] {
+  Queue([this, storage_class, bytes, max_bytes] {
     // N.B. This method may only include Cobalt 1.1 metrics. Using Cobalt 1.0 metrics here have the
     // potential to cause logging loops.
     auto status =
@@ -185,16 +183,41 @@
 
 void InternalMetricsImpl::LocalAggregationQuotaEvent(
     const lib::ProjectIdentifier& project_identifier, int event_type) {
-  fields_.lock()->queued_lambdas.emplace_back([this, project_identifier, event_type] {
+  Queue([this, project_identifier, event_type] {
     if (diagnostics_ != nullptr) {
       diagnostics_->LocalAggregationQuotaEvent(project_identifier, event_type);
     }
   });
 }
 
+void InternalMetricsImpl::Queue(std::function<void()>&& lambda) {
+  auto fields = fields_.lock();
+  // In debug code we crash when a lambda is queued without having first created an
+  // InternalMetricsFlusher. This should hopefully avoid introducing any instances of queue without
+  // flush, but we don't want to crash in production in case we miss some cases.
+  DCHECK(fields->flusher_count > 0)
+      << "An internal metric was queued without a corresponding InternalMetricsFlusher. This can "
+         "potentially cause memory leaks in production code.";
+  fields->queued_lambdas.emplace_back(std::move(lambda));
+}
+
+void InternalMetricsImpl::IncrementFlushers() { fields_.lock()->flusher_count += 1; }
+
 void InternalMetricsImpl::Flush() {
   std::vector<std::function<void()>> queued;
-  fields_.lock()->queued_lambdas.swap(queued);
+  {
+    auto fields = fields_.lock();
+    if (fields->flusher_count <= 0) {
+      LOG_FIRST_N(ERROR, 10) << "Flush called without corresponding InternalMetricsFlusher. This "
+                                "should not be possible.";
+    }
+    fields->flusher_count = std::max(fields->flusher_count - 1, 0);
+    // Only perform the flush once the last flusher has been dropped.
+    if (fields->flusher_count > 0) {
+      return;
+    }
+    fields->queued_lambdas.swap(queued);
+  }
 
   for (const auto& task : queued) {
     task();
diff --git a/src/logger/internal_metrics.h b/src/logger/internal_metrics.h
index 0a3f320..5c99b83 100644
--- a/src/logger/internal_metrics.h
+++ b/src/logger/internal_metrics.h
@@ -23,6 +23,21 @@
 // metrics.
 class InternalMetrics {
  public:
+  // InternalMetricsFlusher calls Flush on the contained InternalMetrics whenever it goes out of
+  // scope.
+  class InternalMetricsFlusher {
+   public:
+    ~InternalMetricsFlusher() { metrics_->Flush(); }
+
+   private:
+    friend class InternalMetrics;
+    // We only want InternalMetrics to be able to construct an InternalMetricsFlusher.
+    explicit InternalMetricsFlusher(InternalMetrics* metrics) : metrics_(metrics) {
+      metrics_->IncrementFlushers();
+    }
+    InternalMetrics* metrics_;
+  };
+
   // Returns a pointer to an InternalMetrics object which can be used for
   // collecting cobalt-internal metrics.
   //
@@ -91,9 +106,27 @@
   virtual void LocalAggregationQuotaEvent(const lib::ProjectIdentifier& project_identifier,
                                           int event_type) = 0;
 
-  // Flush queued internal metrics.
+  // Used to ensure that internal metrics data is flushed before a scope is exited.
+  //
+  // All calls to InternalMetrics methods should only be made after a Flusher has been created.
+  InternalMetricsFlusher Flusher() { return InternalMetricsFlusher(this); }
+
+ protected:
+  friend class InternalMetricsPauseWrapper;
+  // Flushes queued internal metrics.
+  //
+  // This is necessary because we collect some metrics in places where a lock is
+  // held (for example inside of LocalAggregation). If we immediately logged
+  // again, this would cause a deadlock as we are still holding the lock, but
+  // the logger call requires taking it again. With the queue-and-flush model,
+  // we make sure that all of the internal metrics logging is done once at the
+  // end when no more locks are held.
   virtual void Flush() = 0;
 
+  // Notifies the internal metrics implementation that a flusher has been created.
+  virtual void IncrementFlushers() = 0;
+
+ public:
   // Returns true if this is an instance of the real implementation of
   // InternalMetrics that will really do internal logging and false if this
   // is a fake or no-op implementation.
@@ -134,8 +167,11 @@
   void LocalAggregationQuotaEvent(const lib::ProjectIdentifier& project_identifier,
                                   int event_type) override {}
 
+ protected:
   void Flush() override {}
+  void IncrementFlushers() override {}
 
+ public:
   [[nodiscard]] bool IsRealImpl() const override { return false; }
 
   ~NoOpInternalMetrics() override = default;
@@ -170,14 +206,20 @@
   void LocalAggregationQuotaEvent(const lib::ProjectIdentifier& project_identifier,
                                   int event_type) override;
 
+ protected:
   void Flush() override;
+  void IncrementFlushers() override;
 
+ public:
   [[nodiscard]] bool IsRealImpl() const override { return true; }
 
   ~InternalMetricsImpl() override = default;
 
  private:
+  void Queue(std::function<void()>&& lambda);
+
   struct Protected {
+    int32_t flusher_count = 0;
     std::vector<std::function<void()>> queued_lambdas;
   };
   util::ProtectedFields<Protected> fields_;
@@ -284,6 +326,7 @@
   void ResumeLogging() { paused_ = false; }
 
   void Flush() override { wrapped_internal_metrics_->Flush(); }
+  void IncrementFlushers() override { wrapped_internal_metrics_->IncrementFlushers(); }
 
   [[nodiscard]] bool IsRealImpl() const override { return wrapped_internal_metrics_->IsRealImpl(); }
 
diff --git a/src/logger/internal_metrics_test.cc b/src/logger/internal_metrics_test.cc
index f0e6eec..cc9944b 100644
--- a/src/logger/internal_metrics_test.cc
+++ b/src/logger/internal_metrics_test.cc
@@ -34,9 +34,11 @@
   testing::FakeLogger logger;
   InternalMetricsImpl metrics(logger, nullptr);
 
-  metrics.LoggerCalled(LoggerCallsMadeMigratedMetricDimensionLoggerMethod::LogMemoryUsage,
-                       GetTestProject());
-  metrics.Flush();
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.LoggerCalled(LoggerCallsMadeMigratedMetricDimensionLoggerMethod::LogMemoryUsage,
+                         GetTestProject());
+  }
 
   ASSERT_EQ(logger.call_count(), 1);
   ASSERT_TRUE(logger.last_event_logged().has_occurrence_event());
@@ -49,13 +51,15 @@
   InternalMetricsImpl inner_metrics(logger, nullptr);
   InternalMetricsPauseWrapper metrics(&inner_metrics);
 
-  metrics.PauseLogging();
-  for (int i = 0; i < kMany; i++) {
-    metrics.LoggerCalled(LoggerCallsMadeMigratedMetricDimensionLoggerMethod::LogMemoryUsage,
-                         GetTestProject());
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.PauseLogging();
+    for (int i = 0; i < kMany; i++) {
+      metrics.LoggerCalled(LoggerCallsMadeMigratedMetricDimensionLoggerMethod::LogMemoryUsage,
+                           GetTestProject());
+    }
+    metrics.ResumeLogging();
   }
-  metrics.ResumeLogging();
-  metrics.Flush();
 
   ASSERT_EQ(logger.call_count(), 0);
 }
@@ -65,8 +69,10 @@
   InternalMetricsImpl metrics(logger, nullptr);
 
   ASSERT_EQ(logger.call_count(), 0);
-  metrics.BytesUploaded(BytesUploadedMetricDimensionStatus::Attempted, kNumBytes);
-  metrics.Flush();
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.BytesUploaded(BytesUploadedMetricDimensionStatus::Attempted, kNumBytes);
+  }
 
   ASSERT_EQ(logger.call_count(), 1);
   ASSERT_TRUE(logger.last_event_logged().has_occurrence_event());
@@ -79,12 +85,14 @@
   InternalMetricsImpl inner_metrics(logger, nullptr);
   InternalMetricsPauseWrapper metrics(&inner_metrics);
 
-  metrics.PauseLogging();
-  for (int i = 0; i < kMany; i++) {
-    metrics.BytesUploaded(BytesUploadedMetricDimensionStatus::Attempted, kNumBytes);
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.PauseLogging();
+    for (int i = 0; i < kMany; i++) {
+      metrics.BytesUploaded(BytesUploadedMetricDimensionStatus::Attempted, kNumBytes);
+    }
+    metrics.ResumeLogging();
   }
-  metrics.ResumeLogging();
-  metrics.Flush();
   ASSERT_EQ(logger.call_count(), 0);
 }
 
@@ -93,10 +101,12 @@
   InternalMetricsImpl metrics(logger, nullptr);
 
   ASSERT_EQ(logger.call_count(), 0);
-  metrics.BytesUploaded(
-      PerProjectBytesUploadedMigratedMetricDimensionStatus::Attempted, kNumBytes,
-      lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
-  metrics.Flush();
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.BytesUploaded(
+        PerProjectBytesUploadedMigratedMetricDimensionStatus::Attempted, kNumBytes,
+        lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
+  }
 
   ASSERT_EQ(logger.call_count(), 1);
   ASSERT_TRUE(logger.last_event_logged().has_integer_event());
@@ -108,14 +118,16 @@
   InternalMetricsImpl inner_metrics(logger, nullptr);
   InternalMetricsPauseWrapper metrics(&inner_metrics);
 
-  metrics.PauseLogging();
-  for (int i = 0; i < kMany; i++) {
-    metrics.BytesUploaded(
-        PerProjectBytesUploadedMigratedMetricDimensionStatus::Attempted, kNumBytes,
-        lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.PauseLogging();
+    for (int i = 0; i < kMany; i++) {
+      metrics.BytesUploaded(
+          PerProjectBytesUploadedMigratedMetricDimensionStatus::Attempted, kNumBytes,
+          lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
+    }
+    metrics.ResumeLogging();
   }
-  metrics.ResumeLogging();
-  metrics.Flush();
   ASSERT_EQ(logger.call_count(), 0);
 }
 
@@ -124,10 +136,12 @@
   InternalMetricsImpl metrics(logger, nullptr);
 
   ASSERT_EQ(logger.call_count(), 0);
-  metrics.BytesStored(
-      PerProjectBytesStoredMigratedMetricDimensionStatus::Attempted, kNumBytes,
-      lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
-  metrics.Flush();
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.BytesStored(
+        PerProjectBytesStoredMigratedMetricDimensionStatus::Attempted, kNumBytes,
+        lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
+  }
 
   ASSERT_EQ(logger.call_count(), 1);
   ASSERT_TRUE(logger.last_event_logged().has_integer_event());
@@ -139,14 +153,16 @@
   InternalMetricsImpl inner_metrics(logger, nullptr);
   InternalMetricsPauseWrapper metrics(&inner_metrics);
 
-  metrics.PauseLogging();
-  for (int i = 0; i < kMany; i++) {
-    metrics.BytesStored(
-        PerProjectBytesStoredMigratedMetricDimensionStatus::Attempted, kNumBytes,
-        lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.PauseLogging();
+    for (int i = 0; i < kMany; i++) {
+      metrics.BytesStored(
+          PerProjectBytesStoredMigratedMetricDimensionStatus::Attempted, kNumBytes,
+          lib::ProjectIdentifier(lib::CustomerIdentifier(kTestCustomerId), kTestProjectId));
+    }
+    metrics.ResumeLogging();
   }
-  metrics.ResumeLogging();
-  metrics.Flush();
   ASSERT_EQ(logger.call_count(), 0);
 }
 
@@ -155,9 +171,11 @@
   InternalMetricsImpl metrics(logger, nullptr);
 
   ASSERT_EQ(logger.call_count(), 0);
-  metrics.SetSoftwareDistributionInfoCalled(
-      {SetSoftwareDistributionInfoCalledMigratedMetricDimensionNewRealm::Valid});
-  metrics.Flush();
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.SetSoftwareDistributionInfoCalled(
+        {SetSoftwareDistributionInfoCalledMigratedMetricDimensionNewRealm::Valid});
+  }
 
   ASSERT_EQ(logger.call_count(), 1);
   ASSERT_TRUE(logger.last_event_logged().has_occurrence_event());
@@ -169,13 +187,15 @@
   InternalMetricsImpl inner_metrics(logger, nullptr);
   InternalMetricsPauseWrapper metrics(&inner_metrics);
 
-  metrics.PauseLogging();
-  for (int i = 0; i < kMany; i++) {
-    metrics.SetSoftwareDistributionInfoCalled(
-        {SetSoftwareDistributionInfoCalledMigratedMetricDimensionNewRealm::Valid});
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.PauseLogging();
+    for (int i = 0; i < kMany; i++) {
+      metrics.SetSoftwareDistributionInfoCalled(
+          {SetSoftwareDistributionInfoCalledMigratedMetricDimensionNewRealm::Valid});
+    }
+    metrics.ResumeLogging();
   }
-  metrics.ResumeLogging();
-  metrics.Flush();
   ASSERT_EQ(logger.call_count(), 0);
 }
 
@@ -183,8 +203,10 @@
   testing::FakeLogger logger;
   InternalMetricsImpl metrics(logger, nullptr);
 
-  metrics.TrackDiskUsage(InternalMetrics::StorageClass::ObservationStore, 90, 100);
-  metrics.Flush();
+  {
+    InternalMetrics::InternalMetricsFlusher flusher = metrics.Flusher();
+    metrics.TrackDiskUsage(InternalMetrics::StorageClass::ObservationStore, 90, 100);
+  }
 
   ASSERT_EQ(logger.call_count(), 2);
   ASSERT_TRUE(logger.nth_event_logged(0).has_integer_event());
diff --git a/src/logger/logger.cc b/src/logger/logger.cc
index 69159e7..e250609 100644
--- a/src/logger/logger.cc
+++ b/src/logger/logger.cc
@@ -74,6 +74,7 @@
 // TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
 Status Logger::LogOccurrence(uint32_t metric_id, uint64_t count,
                              const std::vector<uint32_t>& event_codes) {
+  InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_.Flusher();
   internal_metrics_.LoggerCalled(LoggerMethod::LogOccurrence, project_context_->project());
   CB_ASSIGN_OR_RETURN(auto event_record, EventRecord::MakeEventRecord(project_context_, metric_id));
   OccurrenceEvent* occurrence_event = event_record->event()->mutable_occurrence_event();
@@ -85,6 +86,7 @@
 // TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
 Status Logger::LogInteger(uint32_t metric_id, int64_t value,
                           const std::vector<uint32_t>& event_codes) {
+  InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_.Flusher();
   internal_metrics_.LoggerCalled(LoggerMethod::LogInteger, project_context_->project());
   CB_ASSIGN_OR_RETURN(auto event_record, EventRecord::MakeEventRecord(project_context_, metric_id));
   IntegerEvent* integer_event = event_record->event()->mutable_integer_event();
@@ -95,6 +97,7 @@
 
 Status Logger::LogIntegerHistogram(uint32_t metric_id, HistogramPtr histogram,
                                    const std::vector<uint32_t>& event_codes) {
+  InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_.Flusher();
   internal_metrics_.LoggerCalled(LoggerMethod::LogIntegerHistogram, project_context_->project());
   CB_ASSIGN_OR_RETURN(auto event_record, EventRecord::MakeEventRecord(project_context_, metric_id));
   IntegerHistogramEvent* integer_histogram_event =
@@ -106,6 +109,7 @@
 
 Status Logger::LogString(uint32_t metric_id, const std::string& string_value,
                          const std::vector<uint32_t>& event_codes) {
+  InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_.Flusher();
   internal_metrics_.LoggerCalled(LoggerMethod::LogString, project_context_->project());
   CB_ASSIGN_OR_RETURN(auto event_record, EventRecord::MakeEventRecord(project_context_, metric_id));
   StringEvent* string_event = event_record->event()->mutable_string_event();
@@ -167,8 +171,6 @@
 
   Status result = event_logger->Log(std::move(event_record), *now);
 
-  // Flush queued internal metrics.
-  internal_metrics_.Flush();
   return result;
 }
 
diff --git a/src/logger/logger.h b/src/logger/logger.h
index 61ff9e1..a8ff15e 100644
--- a/src/logger/logger.h
+++ b/src/logger/logger.h
@@ -120,6 +120,7 @@
   // LoggerCalled (cobalt_internal::metrics::logger_calls_made_migrated) is logged
   // for every call to Logger along with which method was called.
   void RecordLoggerCall(LoggerCallsMadeMigratedMetricDimensionLoggerMethod method) override {
+    InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_.Flusher();
     internal_metrics_.LoggerCalled(method, project_context_->project());
   }
 
diff --git a/src/logger/undated_event_manager.cc b/src/logger/undated_event_manager.cc
index a3ba6fa..3955445 100644
--- a/src/logger/undated_event_manager.cc
+++ b/src/logger/undated_event_manager.cc
@@ -87,6 +87,7 @@
     internal_metrics = &noop_internal_metrics;
   }
 
+  InternalMetrics::InternalMetricsFlusher flusher = internal_metrics->Flusher();
   // Record that we saved records due to clock inaccuracy.
   for (auto cached_events : lock->num_events_cached_) {
     internal_metrics->InaccurateClockEventsCached(cached_events.second);
diff --git a/src/logger/undated_event_manager_test.cc b/src/logger/undated_event_manager_test.cc
index 6d8e6e9..c945acb 100644
--- a/src/logger/undated_event_manager_test.cc
+++ b/src/logger/undated_event_manager_test.cc
@@ -152,7 +152,6 @@
   // Monotonic clock was not advanced, so the day index is the current one.
   ASSERT_EQ(kSystemTimePoint, local_aggregation_->last_timestamp());
   // One internal metric call for the number of cached events.
-  internal_metrics_->Flush();
   EXPECT_EQ(1, internal_logger_->call_count());
   EXPECT_EQ(internal_logger_->last_event_logged().metric_id(),
             kInaccurateClockEventsCachedMigratedMetricId);
@@ -185,7 +184,6 @@
   // Monotonic clock was not advanced, so the day index is the current one.
   ASSERT_EQ(kSystemTimePoint, local_aggregation_->last_timestamp());
   // One internal metric call for the number of cached events.
-  internal_metrics_->Flush();
   EXPECT_EQ(1, internal_logger_->call_count());
   EXPECT_EQ(internal_logger_->last_event_logged().metric_id(),
             kInaccurateClockEventsCachedMigratedMetricId);
@@ -220,7 +218,6 @@
   // Day index is from the previous day, due to the monotonic time advancing one day.
   ASSERT_EQ(kSystemTimePoint - std::chrono::hours(24), local_aggregation_->last_timestamp());
   // One internal metric call for the number of cached events.
-  internal_metrics_->Flush();
   EXPECT_EQ(1, internal_logger_->call_count());
   EXPECT_EQ(internal_logger_->last_event_logged().metric_id(),
             kInaccurateClockEventsCachedMigratedMetricId);
@@ -289,7 +286,6 @@
               local_aggregation_->metric_id(i));
     EXPECT_EQ(start_time + std::chrono::hours(24 * i), local_aggregation_->timestamp(i));
   }
-  internal_metrics_->Flush();
   // Two internal metric calls for the number of cached and dropped events.
   EXPECT_EQ(2, internal_logger_->call_count());
   EXPECT_EQ(internal_logger_->last_event_logged().metric_id(),
@@ -315,9 +311,9 @@
   ASSERT_NE(nullptr, event_record->metric());
 
   // One day later, the clock is finally accurate, try to save an event, this will generate
-  // observations. (This scenario is not very likely, as the clock should report accurate around the
-  // same time as the flush, but this allows us to verify the steady clock time being after the
-  // flush time results in a correct date setting.)
+  // observations. (This scenario is not very likely, as the clock should report accurate around
+  // the same time as the flush, but this allows us to verify the steady clock time being after
+  // the flush time results in a correct date setting.)
   mock_steady_clock_->increment_by(std::chrono::hours(24));
   ASSERT_EQ(StatusCode::OK, undated_event_manager_->Save(std::move(event_record)).error_code());
   EXPECT_EQ(0, undated_event_manager_->NumSavedEvents());
@@ -325,7 +321,6 @@
   ASSERT_EQ(1, local_aggregation_->num_events_added());
   // Day index is from the previous day, due to the monotonic time advancing one day.
   ASSERT_EQ(kSystemTimePoint + std::chrono::hours(24), local_aggregation_->last_timestamp());
-  internal_metrics_->Flush();
   // No internal metric calls for the number of cached events.
   EXPECT_EQ(0, internal_logger_->call_count());
 }
diff --git a/src/logging.h b/src/logging.h
index b367cd7..0022482 100644
--- a/src/logging.h
+++ b/src/logging.h
@@ -31,6 +31,13 @@
 #define CHECK_GE(val1, val2) FX_CHECKT((val1 >= val2), "core")
 #define CHECK_GT(val1, val2) FX_CHECKT((val1 > val2), "core")
 
+#define DCHECK(condition) FX_DCHECK(condition)
+#define DCHECK_EQ(val1, val2) FX_DCHECK((val1 == val2))
+#define DCHECK_NE(val1, val2) FX_DCHECK((val1 != val2))
+#define DCHECK_LE(val1, val2) FX_DCHECK((val1 <= val2))
+#define DCHECK_LT(val1, val2) FX_DCHECK((val1 < val2))
+#define DCHECK_GE(val1, val2) FX_DCHECK((val1 >= val2))
+#define DCHECK_GT(val1, val2) FX_DCHECK((val1 > val2))
 #else
 #error "Either HAVE_GLOG or __Fuchsia__ must be defined"
 #endif
diff --git a/src/system_data/system_data.cc b/src/system_data/system_data.cc
index c982a2c..57e6257 100644
--- a/src/system_data/system_data.cc
+++ b/src/system_data/system_data.cc
@@ -171,6 +171,7 @@
     }
   }
 
+  logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher();
   internal_metrics_->SetSoftwareDistributionInfoCalled(event_codes);
   NotifyChange();
 }
diff --git a/src/uploader/shipping_manager.cc b/src/uploader/shipping_manager.cc
index 77fd08f..1a080c0 100644
--- a/src/uploader/shipping_manager.cc
+++ b/src/uploader/shipping_manager.cc
@@ -235,7 +235,10 @@
         }
         VLOG(4) << name() << " worker: time to send now.";
         locked.unlock();
-        SendAllEnvelopes();
+        {
+          logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher();
+          SendAllEnvelopes();
+        }
         next_scheduled_send_time_ = std::chrono::system_clock::now() + upload_scheduler_.Interval();
         locked.lock();
       } else {
diff --git a/src/uploader/shipping_manager_test.cc b/src/uploader/shipping_manager_test.cc
index f73b983..a2da6fc 100644
--- a/src/uploader/shipping_manager_test.cc
+++ b/src/uploader/shipping_manager_test.cc
@@ -267,7 +267,6 @@
   EXPECT_EQ(StatusCode::OK, shipping_manager_->last_send_status().error_code());
   CheckCallCount(1, 1);
 
-  metrics->Flush();
   // Attempt and Succeed in the Shipping Manager for a project (2), Attempt and Succeed in Clearcut
   // Uploader with 1.1 metrics (2).
   EXPECT_EQ(4, logger.call_count());
@@ -327,8 +326,6 @@
   EXPECT_EQ(0u, shipping_manager_->num_failed_attempts());
   EXPECT_EQ(StatusCode::OK, shipping_manager_->last_send_status().error_code());
   CheckCallCount(1, 2);
-
-  metrics->Flush();
   // Attempt and Succeed in the Shipping Manager (2), Attempt and Succeed in Clearcut Uploader for
   // the envelope with 1.1 metrics (2).
   EXPECT_EQ(4, logger.call_count());