[LocalAggregation] Track LocalAggregation quota events
Stops logging local aggregation failures when also_log_locally is false.
Fixed: 95215
Change-Id: I7b5dbb42259892063654970a683001f010db6053
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/666454
Reviewed-by: Steve Fung <stevefung@google.com>
Commit-Queue: Zach Bush <zmbush@google.com>
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
diff --git a/src/local_aggregation_1_1/local_aggregation.cc b/src/local_aggregation_1_1/local_aggregation.cc
index 26e906c..8b4af42 100644
--- a/src/local_aggregation_1_1/local_aggregation.cc
+++ b/src/local_aggregation_1_1/local_aggregation.cc
@@ -12,6 +12,7 @@
#include "src/lib/util/hash.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
+#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/metadata_builder.h"
@@ -27,7 +28,8 @@
const CobaltConfig &cfg, const logger::ProjectContextFactory *global_project_context_factory,
system_data::SystemDataInterface &system_data, MetadataBuilder &metadata_builder,
util::FileSystem &fs, const logger::ObservationWriter &observation_writer,
- util::CivilTimeConverterInterface *civil_time_converter)
+ util::CivilTimeConverterInterface *civil_time_converter,
+ logger::InternalMetrics *internal_metrics)
: global_project_context_factory_(global_project_context_factory),
aggregate_storage_(LocalAggregateStorage::New(
cfg.local_aggregate_store_strategy, cfg.local_aggregate_store_dir, fs,
@@ -38,7 +40,8 @@
civil_time_converter,
cfg.generate_observations_with_current_system_profile,
cfg.local_aggregation_backfill_days + 1),
- storage_quotas_(cfg.storage_quotas) {
+ storage_quotas_(cfg.storage_quotas),
+ internal_metrics_(internal_metrics) {
CHECK(SlushSize() > 0)
<< "There is no space in slush! The number of cobalt customers * per_project_reserved_bytes "
"is greater than total_capacity_bytes. Please reduce per_project_reserved_bytes or "
@@ -54,21 +57,28 @@
}
bool LocalAggregation::CanStore(lib::ProjectIdentifier project) const {
- size_t project_used = aggregate_storage_->AmountStored(project);
- if (project_used < storage_quotas_.per_project_reserved_bytes) {
+ if (IsUnderQuota(project)) {
return true;
}
return SlushUsed() < SlushSize();
}
+bool LocalAggregation::IsUnderQuota(lib::ProjectIdentifier project) const {
+ size_t project_used = aggregate_storage_->AmountStored(project);
+ return project_used < storage_quotas_.per_project_reserved_bytes;
+}
+
Status LocalAggregation::AddEvent(const logger::EventRecord &event_record,
const std::chrono::system_clock::time_point &event_timestamp) {
+ using EventType = logger::LocalAggregationQuotaMetricDimensionEventType;
if (IsDisabled()) {
return Status::OkStatus();
}
lib::ProjectIdentifier proj = event_record.project_context()->Identifier();
+
if (!CanStore(proj)) {
+ internal_metrics_->LocalAggregationQuotaEvent(proj, EventType::Rejected);
return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
"There is not enough space to add event: ")
.AppendMsg(event_record)
@@ -79,6 +89,9 @@
.Build();
}
+ internal_metrics_->LocalAggregationQuotaEvent(
+ proj, IsUnderQuota(proj) ? EventType::Under : EventType::Over);
+
CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef aggregate,
aggregate_storage_->GetMetricAggregate(event_record.MetricIdentifier()));
diff --git a/src/local_aggregation_1_1/local_aggregation.h b/src/local_aggregation_1_1/local_aggregation.h
index 6b44482..189ead2 100644
--- a/src/local_aggregation_1_1/local_aggregation.h
+++ b/src/local_aggregation_1_1/local_aggregation.h
@@ -11,6 +11,7 @@
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/observation_generator.h"
#include "src/logger/event_record.h"
+#include "src/logger/internal_metrics.h"
#include "src/logger/logger_interface.h"
#include "src/logger/observation_writer.h"
#include "src/logger/project_context_factory.h"
@@ -35,11 +36,13 @@
// |fs|: An instance of util::FileSystem
// |observation_writer|: An implementation of the ObservationWriter interface.
// |civil_time_converter|: Converts a `time_point` to a civil time for a given time zone.
+ // |internal_metrics|: A possibly null pointer to an InternalMetrics instance.
LocalAggregation(const CobaltConfig &cfg,
const logger::ProjectContextFactory *global_project_context_factory,
system_data::SystemDataInterface &system_data, MetadataBuilder &metadata_builder,
util::FileSystem &fs, const logger::ObservationWriter &observation_writer,
- util::CivilTimeConverterInterface *civil_time_converter);
+ util::CivilTimeConverterInterface *civil_time_converter,
+ logger::InternalMetrics *internal_metrics = nullptr);
// Start should be called when the system is ready to start the background process for generating
// observations based on the aggregated events.
@@ -81,6 +84,8 @@
// 2. The project is over per_project_reserved_bytes, but there is room in the "Slush" storage.
bool CanStore(lib::ProjectIdentifier project) const;
+ bool IsUnderQuota(lib::ProjectIdentifier project) const;
+
// Returns the size of the "Slush" storage. This should be the total_capacity_bytes minus (number
// of projects * per_project_reserved_bytes).
int64_t SlushSize() const;
@@ -95,6 +100,7 @@
std::unique_ptr<util::SystemClockInterface> system_clock_;
bool is_disabled_ = false;
StorageQuotas storage_quotas_;
+ logger::InternalMetricsPtr internal_metrics_;
};
} // namespace cobalt::local_aggregation
diff --git a/src/logger/event_loggers.cc b/src/logger/event_loggers.cc
index 9d7caef..abf71e0 100644
--- a/src/logger/event_loggers.cc
+++ b/src/logger/event_loggers.cc
@@ -287,7 +287,6 @@
if (IsOnePointOne()) {
status = local_aggregation().AddEvent(*event_record, event_timestamp);
if (!status.ok()) {
- LOG(ERROR) << "Error occurred while locally aggregating event: " << status;
TraceLogFailure(status, *event_record, trace, event_record->metric()->reports(0));
return status;
}
diff --git a/src/logger/internal_metrics.cc b/src/logger/internal_metrics.cc
index 7cfcdc2..60e518e 100644
--- a/src/logger/internal_metrics.cc
+++ b/src/logger/internal_metrics.cc
@@ -199,6 +199,26 @@
});
}
+void InternalMetricsImpl::LocalAggregationQuotaEvent(
+ const lib::ProjectIdentifier& project_identifier,
+ LocalAggregationQuotaMetricDimensionEventType event_type) {
+ fields_.lock()->queued_lambdas.emplace_back([this, project_identifier, event_type] {
+ std::ostringstream component;
+ component << project_identifier.customer_id() << '/' << project_identifier.project_id();
+
+ auto status = logger_.LogOccurrence(kLocalAggregationQuotaMetricId, 1,
+ {GetProjectEventCode(component.str()), event_type});
+
+ if (!status.ok()) {
+ VLOG(1) << "InternalMetricsImpl::BytesStored: LogInteger() returned status=" << status;
+ }
+
+ if (diagnostics_ != nullptr) {
+ diagnostics_->LocalAggregationQuotaEvent(project_identifier, event_type);
+ }
+ });
+}
+
void InternalMetricsImpl::Flush() {
std::vector<std::function<void()>> queued;
fields_.lock()->queued_lambdas.swap(queued);
diff --git a/src/logger/internal_metrics.h b/src/logger/internal_metrics.h
index fb109d9..2c8d9ec 100644
--- a/src/logger/internal_metrics.h
+++ b/src/logger/internal_metrics.h
@@ -90,6 +90,10 @@
TrackDiskUsage(storage_class, bytes, -1);
}
+ virtual void LocalAggregationQuotaEvent(
+ const lib::ProjectIdentifier& project_identifier,
+ LocalAggregationQuotaMetricDimensionEventType event_type) = 0;
+
// Flush queued internal metrics.
virtual void Flush() = 0;
@@ -133,6 +137,10 @@
// TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
void TrackDiskUsage(StorageClass storage_class, size_t bytes, int64_t max_bytes) override {}
+ void LocalAggregationQuotaEvent(
+ const lib::ProjectIdentifier& project_identifier,
+ LocalAggregationQuotaMetricDimensionEventType event_type) override {}
+
void Flush() override {}
[[nodiscard]] bool IsRealImpl() const override { return false; }
@@ -169,6 +177,10 @@
void TrackDiskUsage(StorageClass storage_class, size_t bytes, int64_t max_bytes) override;
+ void LocalAggregationQuotaEvent(
+ const lib::ProjectIdentifier& project_identifier,
+ LocalAggregationQuotaMetricDimensionEventType event_type) override;
+
void Flush() override;
[[nodiscard]] bool IsRealImpl() const override { return true; }
@@ -274,6 +286,14 @@
}
}
+ void LocalAggregationQuotaEvent(
+ const lib::ProjectIdentifier& project_identifier,
+ LocalAggregationQuotaMetricDimensionEventType event_type) override {
+ if (!paused_) {
+ wrapped_internal_metrics_->LocalAggregationQuotaEvent(project_identifier, event_type);
+ }
+ }
+
// After PauseLogging is called, all calls to log internal metrics will be
// ignored.
void PauseLogging() { paused_ = true; }
diff --git a/src/public/diagnostics_interface.h b/src/public/diagnostics_interface.h
index 3f9a23d..18c934c 100644
--- a/src/public/diagnostics_interface.h
+++ b/src/public/diagnostics_interface.h
@@ -54,6 +54,14 @@
// TODO(fxbug.dev/85571): use named types when InternalMetrics::TrackDiskUsage is fixed.
// NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
virtual void TrackDiskUsage(int storageClass, int64_t bytes, int64_t byte_limit) = 0;
+
+ // Used to track what the quota state of each project on the system is.
+ //
+ // |event_type|: tracks what the state is (1: Below quota, 2: Above quota, 3: Above quota and log
+ // rejected).
+ //
+ // TODO(fxbug.dev/96540): Remove default implementation once this is implemented in Fuchsia.
+ virtual void LocalAggregationQuotaEvent(const lib::ProjectIdentifier& project, int event_type) {}
};
} // namespace cobalt
diff --git a/third_party/cobalt_config b/third_party/cobalt_config
index 06b03db..fd18d93 160000
--- a/third_party/cobalt_config
+++ b/third_party/cobalt_config
@@ -1 +1 @@
-Subproject commit 06b03db3de9d5fcecdfe2bd3e14c0dc656006cb9
+Subproject commit fd18d939c59a1d7479173d81666ea59f02dba7c3