[LocalAggregation] Track LocalAggregation quota events

Stops logging local aggregation failures when also_log_locally is false.

Fixed: 95215
Change-Id: I7b5dbb42259892063654970a683001f010db6053
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/666454
Reviewed-by: Steve Fung <stevefung@google.com>
Commit-Queue: Zach Bush <zmbush@google.com>
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
diff --git a/src/local_aggregation_1_1/local_aggregation.cc b/src/local_aggregation_1_1/local_aggregation.cc
index 26e906c..8b4af42 100644
--- a/src/local_aggregation_1_1/local_aggregation.cc
+++ b/src/local_aggregation_1_1/local_aggregation.cc
@@ -12,6 +12,7 @@
 #include "src/lib/util/hash.h"
 #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
+#include "src/logger/internal_metrics_config.cb.h"
 #include "src/logger/privacy_encoder.h"
 #include "src/logger/project_context_factory.h"
 #include "src/pb/metadata_builder.h"
@@ -27,7 +28,8 @@
     const CobaltConfig &cfg, const logger::ProjectContextFactory *global_project_context_factory,
     system_data::SystemDataInterface &system_data, MetadataBuilder &metadata_builder,
     util::FileSystem &fs, const logger::ObservationWriter &observation_writer,
-    util::CivilTimeConverterInterface *civil_time_converter)
+    util::CivilTimeConverterInterface *civil_time_converter,
+    logger::InternalMetrics *internal_metrics)
     : global_project_context_factory_(global_project_context_factory),
       aggregate_storage_(LocalAggregateStorage::New(
           cfg.local_aggregate_store_strategy, cfg.local_aggregate_store_dir, fs,
@@ -38,7 +40,8 @@
                              civil_time_converter,
                              cfg.generate_observations_with_current_system_profile,
                              cfg.local_aggregation_backfill_days + 1),
-      storage_quotas_(cfg.storage_quotas) {
+      storage_quotas_(cfg.storage_quotas),
+      internal_metrics_(internal_metrics) {
   CHECK(SlushSize() > 0)
       << "There is no space in slush! The number of cobalt customers * per_project_reserved_bytes "
          "is greater than total_capacity_bytes. Please reduce per_project_reserved_bytes or "
@@ -54,21 +57,28 @@
 }
 
 bool LocalAggregation::CanStore(lib::ProjectIdentifier project) const {
-  size_t project_used = aggregate_storage_->AmountStored(project);
-  if (project_used < storage_quotas_.per_project_reserved_bytes) {
+  if (IsUnderQuota(project)) {
     return true;
   }
   return SlushUsed() < SlushSize();
 }
 
+bool LocalAggregation::IsUnderQuota(lib::ProjectIdentifier project) const {
+  size_t project_used = aggregate_storage_->AmountStored(project);
+  return project_used < storage_quotas_.per_project_reserved_bytes;
+}
+
 Status LocalAggregation::AddEvent(const logger::EventRecord &event_record,
                                   const std::chrono::system_clock::time_point &event_timestamp) {
+  using EventType = logger::LocalAggregationQuotaMetricDimensionEventType;
   if (IsDisabled()) {
     return Status::OkStatus();
   }
 
   lib::ProjectIdentifier proj = event_record.project_context()->Identifier();
+
   if (!CanStore(proj)) {
+    internal_metrics_->LocalAggregationQuotaEvent(proj, EventType::Rejected);
     return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
                                "There is not enough space to add event: ")
         .AppendMsg(event_record)
@@ -79,6 +89,9 @@
         .Build();
   }
 
+  internal_metrics_->LocalAggregationQuotaEvent(
+      proj, IsUnderQuota(proj) ? EventType::Under : EventType::Over);
+
   CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef aggregate,
                       aggregate_storage_->GetMetricAggregate(event_record.MetricIdentifier()));
 
diff --git a/src/local_aggregation_1_1/local_aggregation.h b/src/local_aggregation_1_1/local_aggregation.h
index 6b44482..189ead2 100644
--- a/src/local_aggregation_1_1/local_aggregation.h
+++ b/src/local_aggregation_1_1/local_aggregation.h
@@ -11,6 +11,7 @@
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/observation_generator.h"
 #include "src/logger/event_record.h"
+#include "src/logger/internal_metrics.h"
 #include "src/logger/logger_interface.h"
 #include "src/logger/observation_writer.h"
 #include "src/logger/project_context_factory.h"
@@ -35,11 +36,13 @@
   // |fs|: An instance of util::FileSystem
   // |observation_writer|: An implementation of the ObservationWriter interface.
   // |civil_time_converter|: Converts a `time_point` to a civil time for a given time zone.
+  // |internal_metrics|: A possibly null pointer to an InternalMetrics instance.
   LocalAggregation(const CobaltConfig &cfg,
                    const logger::ProjectContextFactory *global_project_context_factory,
                    system_data::SystemDataInterface &system_data, MetadataBuilder &metadata_builder,
                    util::FileSystem &fs, const logger::ObservationWriter &observation_writer,
-                   util::CivilTimeConverterInterface *civil_time_converter);
+                   util::CivilTimeConverterInterface *civil_time_converter,
+                   logger::InternalMetrics *internal_metrics = nullptr);
 
   // Start should be called when the system is ready to start the background process for generating
   // observations based on the aggregated events.
@@ -81,6 +84,8 @@
   // 2. The project is over per_project_reserved_bytes, but there is room in the "Slush" storage.
   bool CanStore(lib::ProjectIdentifier project) const;
 
+  bool IsUnderQuota(lib::ProjectIdentifier project) const;
+
   // Returns the size of the "Slush" storage. This should be the total_capacity_bytes minus (number
   // of projects * per_project_reserved_bytes).
   int64_t SlushSize() const;
@@ -95,6 +100,7 @@
   std::unique_ptr<util::SystemClockInterface> system_clock_;
   bool is_disabled_ = false;
   StorageQuotas storage_quotas_;
+  logger::InternalMetricsPtr internal_metrics_;
 };
 
 }  // namespace cobalt::local_aggregation
diff --git a/src/logger/event_loggers.cc b/src/logger/event_loggers.cc
index 9d7caef..abf71e0 100644
--- a/src/logger/event_loggers.cc
+++ b/src/logger/event_loggers.cc
@@ -287,7 +287,6 @@
   if (IsOnePointOne()) {
     status = local_aggregation().AddEvent(*event_record, event_timestamp);
     if (!status.ok()) {
-      LOG(ERROR) << "Error occurred while locally aggregating event: " << status;
       TraceLogFailure(status, *event_record, trace, event_record->metric()->reports(0));
       return status;
     }
diff --git a/src/logger/internal_metrics.cc b/src/logger/internal_metrics.cc
index 7cfcdc2..60e518e 100644
--- a/src/logger/internal_metrics.cc
+++ b/src/logger/internal_metrics.cc
@@ -199,6 +199,26 @@
   });
 }
 
+void InternalMetricsImpl::LocalAggregationQuotaEvent(
+    const lib::ProjectIdentifier& project_identifier,
+    LocalAggregationQuotaMetricDimensionEventType event_type) {
+  fields_.lock()->queued_lambdas.emplace_back([this, project_identifier, event_type] {
+    std::ostringstream component;
+    component << project_identifier.customer_id() << '/' << project_identifier.project_id();
+
+    auto status = logger_.LogOccurrence(kLocalAggregationQuotaMetricId, 1,
+                                        {GetProjectEventCode(component.str()), event_type});
+
+    if (!status.ok()) {
+      VLOG(1) << "InternalMetricsImpl::BytesStored: LogInteger() returned status=" << status;
+    }
+
+    if (diagnostics_ != nullptr) {
+      diagnostics_->LocalAggregationQuotaEvent(project_identifier, event_type);
+    }
+  });
+}
+
 void InternalMetricsImpl::Flush() {
   std::vector<std::function<void()>> queued;
   fields_.lock()->queued_lambdas.swap(queued);
diff --git a/src/logger/internal_metrics.h b/src/logger/internal_metrics.h
index fb109d9..2c8d9ec 100644
--- a/src/logger/internal_metrics.h
+++ b/src/logger/internal_metrics.h
@@ -90,6 +90,10 @@
     TrackDiskUsage(storage_class, bytes, -1);
   }
 
+  virtual void LocalAggregationQuotaEvent(
+      const lib::ProjectIdentifier& project_identifier,
+      LocalAggregationQuotaMetricDimensionEventType event_type) = 0;
+
   // Flush queued internal metrics.
   virtual void Flush() = 0;
 
@@ -133,6 +137,10 @@
   // TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
   void TrackDiskUsage(StorageClass storage_class, size_t bytes, int64_t max_bytes) override {}
 
+  void LocalAggregationQuotaEvent(
+      const lib::ProjectIdentifier& project_identifier,
+      LocalAggregationQuotaMetricDimensionEventType event_type) override {}
+
   void Flush() override {}
 
   [[nodiscard]] bool IsRealImpl() const override { return false; }
@@ -169,6 +177,10 @@
 
   void TrackDiskUsage(StorageClass storage_class, size_t bytes, int64_t max_bytes) override;
 
+  void LocalAggregationQuotaEvent(
+      const lib::ProjectIdentifier& project_identifier,
+      LocalAggregationQuotaMetricDimensionEventType event_type) override;
+
   void Flush() override;
 
   [[nodiscard]] bool IsRealImpl() const override { return true; }
@@ -274,6 +286,14 @@
     }
   }
 
+  void LocalAggregationQuotaEvent(
+      const lib::ProjectIdentifier& project_identifier,
+      LocalAggregationQuotaMetricDimensionEventType event_type) override {
+    if (!paused_) {
+      wrapped_internal_metrics_->LocalAggregationQuotaEvent(project_identifier, event_type);
+    }
+  }
+
   // After PauseLogging is called, all calls to log internal metrics will be
   // ignored.
   void PauseLogging() { paused_ = true; }
diff --git a/src/public/diagnostics_interface.h b/src/public/diagnostics_interface.h
index 3f9a23d..18c934c 100644
--- a/src/public/diagnostics_interface.h
+++ b/src/public/diagnostics_interface.h
@@ -54,6 +54,14 @@
   // TODO(fxbug.dev/85571): use named types when InternalMetrics::TrackDiskUsage is fixed.
   // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
   virtual void TrackDiskUsage(int storageClass, int64_t bytes, int64_t byte_limit) = 0;
+
+  // Used to track what the quota state of each project on the system is.
+  //
+  // |event_type|: tracks what the state is (1: Below quota, 2: Above quota, 3: Above quota and log
+  //               rejected).
+  //
+  // TODO(fxbug.dev/96540): Remove default implementation once this is implemented in Fuchsia.
+  virtual void LocalAggregationQuotaEvent(const lib::ProjectIdentifier& project, int event_type) {}
 };
 
 }  // namespace cobalt
diff --git a/third_party/cobalt_config b/third_party/cobalt_config
index 06b03db..fd18d93 160000
--- a/third_party/cobalt_config
+++ b/third_party/cobalt_config
@@ -1 +1 @@
-Subproject commit 06b03db3de9d5fcecdfe2bd3e14c0dc656006cb9
+Subproject commit fd18d939c59a1d7479173d81666ea59f02dba7c3