[NoCheck] Avoid pointers in EventAggregatorMgr
- Removes 2 CHECK calls
- Makes AggregateStore/EventAggregator move-assignable (so they no
longer need to be pointers)
- Adds move constructor for ProtectedFields.
- Creates 'NotNullUniquePtr' and 'PinnedUniquePtr' that guarantees that
the held object can never be nullptr.
- Used for steady_clock_ which must be a pointer, but should never be
null.
Change-Id: I2ea45ceef6cb8814cf471071b410942e861f3e50
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/659347
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/bin/test_app/test_app.cc b/src/bin/test_app/test_app.cc
index eaa7bad..12db433 100644
--- a/src/bin/test_app/test_app.cc
+++ b/src/bin/test_app/test_app.cc
@@ -355,7 +355,7 @@
bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
return cobalt_service_->event_aggregator_manager()
- ->aggregate_store_->GenerateObservations(day_index)
+ ->aggregate_store_.GenerateObservations(day_index)
.ok();
}
diff --git a/src/lib/util/BUILD.gn b/src/lib/util/BUILD.gn
index 6a55766..dfe1277 100644
--- a/src/lib/util/BUILD.gn
+++ b/src/lib/util/BUILD.gn
@@ -309,6 +309,11 @@
sources = [ "named_type.h" ]
}
+source_set("not_null") {
+ sources = [ "not_null.h" ]
+ public_deps = [ "$cobalt_root/src:logging" ]
+}
+
group("tests") {
testonly = true
deps = [
diff --git a/src/lib/util/not_null.h b/src/lib/util/not_null.h
new file mode 100644
index 0000000..cc675ba
--- /dev/null
+++ b/src/lib/util/not_null.h
@@ -0,0 +1,183 @@
+// Copyright 2022 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_SRC_LIB_UTIL_NOT_NULL_H_
+#define COBALT_SRC_LIB_UTIL_NOT_NULL_H_
+
+#include <memory>
+#include <ostream>
+#include <type_traits>
+
+#include "src/logging.h"
+
+namespace cobalt::util {
+
+template <class T>
+class PinnedUniquePtr;
+
+// NotNullUniquePtr is a wrapper around std::unique_ptr<T> that guarantees that the contained type
+// is non-null at construction time.
+//
+// It does this by:
+// 1) Only being constructible by using MakeNotNullUniquePtr or WrapNotNullUniquePtrOrDefault
+// which both guarantee that the contained type cannot be null.
+// 2) Not allowing any access to the contained type.
+//
+// NotNullUniquePtr is a type that is only used in *transit*. It works as a return type to
+// functions, and it can be transparently converted into a PinnedUniquePtr (see below).
+template <class T>
+class NotNullUniquePtr {
+ private:
+ template <typename U, class... Args>
+ friend constexpr NotNullUniquePtr<U> MakeNotNullUniquePtr(Args&&... args);
+
+ template <typename U, typename V, class... Args>
+ friend constexpr NotNullUniquePtr<V> WrapNotNullUniquePtrOrDefault(std::unique_ptr<V>&& other,
+ Args&&... args);
+
+ template <typename U>
+ friend constexpr NotNullUniquePtr<U> TESTONLY_TakeRawPointer(U* other);
+
+ // PinnedUniquePtr is move-constructed from a NotNullUniquePtr, and thus needs to access the
+ // contained ptr_.
+ template <class U>
+ friend class PinnedUniquePtr;
+
+ template <class U>
+ friend class NotNullUniquePtr;
+
+ // Private constructor. ptr *must* be non null.
+ template <typename U>
+ explicit NotNullUniquePtr(std::unique_ptr<U> ptr) : ptr_(std::move(ptr)) {
+ // Invariants that must be true if we are to trust this class.
+ static_assert(sizeof(std::unique_ptr<T>) == sizeof(NotNullUniquePtr<T>),
+ "NotNullUniquePtr must have zero space overhead.");
+ static_assert(!std::is_copy_constructible<NotNullUniquePtr<T>>::value,
+ "NotNullUniquePtr must not be copy constructible.");
+ static_assert(!std::is_copy_assignable<NotNullUniquePtr<T>>::value,
+ "NotNullUniquePtr must not be copy assignable.");
+ static_assert(!std::is_default_constructible<NotNullUniquePtr<T>>::value,
+ "NotNullUniquePtr must have no default constructor.");
+ static_assert(std::is_move_constructible<NotNullUniquePtr<T>>::value,
+ "NotNullUniquePtr must be move constructible.");
+ static_assert(std::is_move_assignable<NotNullUniquePtr<T>>::value,
+ "NotNullUniquePtr must be move assignable.");
+ }
+
+ public:
+ // Move constructor. Source NotNullUniquePtr will be invalidated.
+ template <typename U>
+ NotNullUniquePtr(NotNullUniquePtr<U>&& other) // NOLINT(google-explicit-constructor)
+ : ptr_(std::move(other.ptr_)) {}
+
+ private:
+ std::unique_ptr<T> ptr_;
+};
+
+// MakeNotNullUniquePtr is equivalent to std::make_unique, and constructs a NotNullUniquePtr that is
+// guaranteed to not be null.
+template <typename T, class... Args>
+constexpr NotNullUniquePtr<T> MakeNotNullUniquePtr(Args&&... args) {
+ return NotNullUniquePtr<T>(std::make_unique<T>(std::forward<Args>(args)...));
+}
+
+// WrapNotNullUniquePtrOrDefault allows converting std::unique_ptr into a NotNullUniquePtr. It does
+// this by taking arguments for a fallback constructor after the std::unique_ptr argument. If the
+// provided std::unique_ptr happens to be null, the fallback constructor will be used. This way the
+// pointer is guaranteed never to be null.
+template <typename T, typename U, class... Args>
+constexpr NotNullUniquePtr<U> WrapNotNullUniquePtrOrDefault(std::unique_ptr<U>&& other,
+ Args&&... args) {
+ if (other) {
+ return NotNullUniquePtr<U>(std::move(other));
+ }
+ return NotNullUniquePtr<U>(std::make_unique<T>(std::forward<Args>(args)...));
+}
+
+// TESTONLY_TakeRawPointer is a hack to allow a test to keep a reference to a pointer while passing
+// a NotNullUniquePtr to another class. It is currently only used in testing Cobalt 1.0 local
+// aggregation and is not the preferred solution.
+template <typename T>
+constexpr NotNullUniquePtr<T> TESTONLY_TakeRawPointer(T* other) {
+ CHECK(other) << "Expected non-null other in TESTONLY_TakeRawPointer";
+ return NotNullUniquePtr<T>(std::unique_ptr<T>(other));
+}
+
+// PinnedUniquePtr is a wrapper around std::unique_ptr<T> that guarantees that the held pointer is
+// never null.
+//
+// It does this by:
+// 1) Only being constructible by a move-constructor from NotNullUniquePtr
+// 2) Being immovable (since a move can invalidate the source ptr)
+//
+// PinnedUniquePtr is the actually usable part of the pair of classes in this file, allowing get,
+// operator->, operator*, as well as swap() access. No other methods of the underlying unique_ptr
+// are allowed, as they may cause the held pointer to become null.
+template <class T>
+class PinnedUniquePtr {
+ public:
+ // Swaps the managed objects and associated deleters of *this and another PinnedUniquePtr object
+ // *other*.
+ template <class U>
+ void swap(PinnedUniquePtr<U>& other) noexcept {
+ other.ptr_.swap(ptr_);
+ }
+
+ // Swaps the managed objects and associated deleters of *this and a NotNullUniquePtr object.
+ template <class U>
+ void swap(NotNullUniquePtr<U>& other) noexcept {
+ other.ptr_.swap(ptr_);
+ }
+
+ // Swaps the managed objects and associated deleters of *this and a NotNullUniquePtr object (by
+ // rvalue).
+ template <class U>
+ void swap(NotNullUniquePtr<U>&& other) noexcept {
+ other.ptr_.swap(ptr_);
+ }
+
+ // Construct the PinnedUniquePtr from an existing NotNullUniquePtr. This invalidates the source
+ // ptr, and pins the managed object in place.
+ template <typename U>
+ explicit PinnedUniquePtr(NotNullUniquePtr<U>&& other) : ptr_(std::move(other.ptr_)) {
+ // Invariants that must be true if we are to trust this class.
+ static_assert(sizeof(std::unique_ptr<T>) == sizeof(PinnedUniquePtr<T>),
+ "PinnedUniquePtr must have zero space overhead.");
+ static_assert(sizeof(std::unique_ptr<T>) == sizeof(PinnedUniquePtr<T>),
+ "PinnedUniquePtr must have zero space overhead.");
+ static_assert(!std::is_copy_constructible<PinnedUniquePtr<T>>::value,
+ "PinnedUniquePtr must not be copy constructible.");
+ static_assert(!std::is_copy_assignable<PinnedUniquePtr<T>>::value,
+ "PinnedUniquePtr must not be copy assignable.");
+ static_assert(!std::is_default_constructible<PinnedUniquePtr<T>>::value,
+ "PinnedUniquePtr must have no default constructor.");
+ static_assert(!std::is_move_constructible<PinnedUniquePtr<T>>::value,
+ "PinnedUniquePtr must not be move constructible.");
+ static_assert(!std::is_move_assignable<PinnedUniquePtr<T>>::value,
+ "PinnedUniquePtr must not be move assignable.");
+ }
+
+ template <typename U>
+ PinnedUniquePtr& operator=(NotNullUniquePtr<U>&& other) {
+ ptr_ = std::move(other.ptr_);
+ return *this;
+ }
+
+ // Access the contained ptr. Guaranteed never to return null.
+ [[nodiscard]] T* get() const noexcept { return ptr_.get(); }
+ T* operator->() { return ptr_.operator->(); }
+ T& operator*() { return ptr_.operator*(); }
+
+ private:
+ std::unique_ptr<T> ptr_;
+
+ // Disable copy and move
+ PinnedUniquePtr() = delete;
+ PinnedUniquePtr(PinnedUniquePtr const&) = delete;
+ PinnedUniquePtr(PinnedUniquePtr&&) = delete;
+};
+
+} // namespace cobalt::util
+
+#endif // COBALT_SRC_LIB_UTIL_NOT_NULL_H_
diff --git a/src/lib/util/protected_fields.h b/src/lib/util/protected_fields.h
index 301283a..62df00b 100644
--- a/src/lib/util/protected_fields.h
+++ b/src/lib/util/protected_fields.h
@@ -167,6 +167,14 @@
BaseProtectedFields& operator=(const BaseProtectedFields&) = delete;
BaseProtectedFields(const BaseProtectedFields&) = delete;
BaseProtectedFields() = default;
+
+ // Make BaseProtectedFields Move constructible/assignable
+ BaseProtectedFields& operator=(BaseProtectedFields&& other) noexcept {
+ std::unique_lock<Mutex> lock(other.mutex_);
+ fields_ = std::move(other.fields_);
+ return *this;
+ }
+ BaseProtectedFields(BaseProtectedFields&& other) noexcept { *this = other; }
};
// The default implementation of ProtectedFields. Uses standard mutexes.
diff --git a/src/local_aggregation/BUILD.gn b/src/local_aggregation/BUILD.gn
index 5a993e4..6704a13 100644
--- a/src/local_aggregation/BUILD.gn
+++ b/src/local_aggregation/BUILD.gn
@@ -58,6 +58,7 @@
"$cobalt_root/src/lib/util:consistent_proto_store",
"$cobalt_root/src/lib/util:datetime_util",
"$cobalt_root/src/lib/util:file_system",
+ "$cobalt_root/src/lib/util:not_null",
"$cobalt_root/src/logger:encoder",
"$cobalt_root/src/logger:observation_writer",
"$cobalt_root/src/public:cobalt_config",
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 606878f..842770c 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -169,8 +169,8 @@
AggregateStore::AggregateStore(
const Encoder& encoder, const ObservationWriter* observation_writer,
// TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
- ConsistentProtoStore* local_aggregate_proto_store,
- ConsistentProtoStore* obs_history_proto_store, const size_t backfill_days)
+ ConsistentProtoStore& local_aggregate_proto_store,
+ ConsistentProtoStore& obs_history_proto_store, const size_t backfill_days)
: encoder_(encoder),
observation_writer_(observation_writer),
local_aggregate_proto_store_(local_aggregate_proto_store),
@@ -181,7 +181,7 @@
auto locked_store = protected_aggregate_store_.lock();
locked_store->empty_local_aggregate_store = MakeNewLocalAggregateStore();
auto restore_aggregates_status =
- local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
+ local_aggregate_proto_store_.get().Read(&(locked_store->local_aggregate_store));
switch (restore_aggregates_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read LocalAggregateStore from disk.";
@@ -211,7 +211,8 @@
}
auto locked_obs_history = protected_obs_history_.lock();
- auto restore_history_status = obs_history_proto_store_->Read(&locked_obs_history->obs_history);
+ auto restore_history_status =
+ obs_history_proto_store_.get().Read(&locked_obs_history->obs_history);
switch (restore_history_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
@@ -371,7 +372,7 @@
// Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::AggregateStore,
static_cast<int64_t>(store_size));
- auto status = local_aggregate_proto_store_->Write(local_aggregate_store);
+ auto status = local_aggregate_proto_store_.get().Write(local_aggregate_store);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the LocalAggregateStore with error " << status;
return status;
@@ -385,7 +386,7 @@
// Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationHistory,
static_cast<int64_t>(history_size));
- auto status = obs_history_proto_store_->Write(obs_history);
+ auto status = obs_history_proto_store_.get().Write(obs_history);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. " << status;
return status;
@@ -687,8 +688,8 @@
Status AggregateStore::GenerateSingleUniqueActivesObservation(
const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
uint32_t event_code, const OnDeviceAggregationWindow& window, bool was_active) const {
- auto encoder_result = encoder_.EncodeUniqueActivesObservation(metric_ref, report, obs_day_index,
- event_code, was_active, window);
+ auto encoder_result = encoder_.get().EncodeUniqueActivesObservation(
+ metric_ref, report, obs_day_index, event_code, was_active, window);
if (!encoder_result.status.ok()) {
return encoder_result.status;
}
@@ -858,9 +859,9 @@
const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
int64_t value) const {
- Encoder::Result encoder_result =
- encoder_.EncodePerDeviceNumericObservation(metric_ref, report, obs_day_index, component,
- UnpackEventCodesProto(event_code), value, window);
+ Encoder::Result encoder_result = encoder_.get().EncodePerDeviceNumericObservation(
+ metric_ref, report, obs_day_index, component, UnpackEventCodesProto(event_code), value,
+ window);
if (!encoder_result.status.ok()) {
return encoder_result.status;
}
@@ -879,7 +880,7 @@
const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
int64_t value) const {
- Encoder::Result encoder_result = encoder_.EncodePerDeviceHistogramObservation(
+ Encoder::Result encoder_result = encoder_.get().EncodePerDeviceHistogramObservation(
metric_ref, report, obs_day_index, component, UnpackEventCodesProto(event_code), value,
window);
@@ -902,7 +903,7 @@
const ReportDefinition* report,
uint32_t obs_day_index) const {
auto encoder_result =
- encoder_.EncodeReportParticipationObservation(metric_ref, report, obs_day_index);
+ encoder_.get().EncodeReportParticipationObservation(metric_ref, report, obs_day_index);
if (!encoder_result.status.ok()) {
return encoder_result.status;
}
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 9f9a709..9a0aca1 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -70,8 +70,11 @@
// value larger than |kMaxAllowedBackfillDays| is passed.
AggregateStore(const logger::Encoder& encoder,
const logger::ObservationWriter* observation_writer,
- util::ConsistentProtoStore* local_aggregate_proto_store,
- util::ConsistentProtoStore* obs_history_proto_store, size_t backfill_days = 0);
+ util::ConsistentProtoStore& local_aggregate_proto_store,
+ util::ConsistentProtoStore& obs_history_proto_store, size_t backfill_days = 0);
+
+ AggregateStore(AggregateStore&&) = default;
+ AggregateStore& operator=(AggregateStore&&) = default;
// Given a ProjectContext, MetricDefinition, and ReportDefinition checks whether a key with the
// same customer, project, metric, and report ID already exists in the LocalAggregateStore. If
@@ -339,12 +342,12 @@
size_t backfill_days_ = 0;
// Objects used to generate observations.
- const logger::Encoder& encoder_; // not owned
- const logger::ObservationWriter* observation_writer_; // not owned
+ std::reference_wrapper<const logger::Encoder> encoder_; // not owned
+ const logger::ObservationWriter* observation_writer_; // not owned
// Used for loading and backing up the proto stores to disk.
- util::ConsistentProtoStore* local_aggregate_proto_store_; // not owned
- util::ConsistentProtoStore* obs_history_proto_store_; // not owned
+ std::reference_wrapper<util::ConsistentProtoStore> local_aggregate_proto_store_; // not owned
+ std::reference_wrapper<util::ConsistentProtoStore> obs_history_proto_store_; // not owned
logger::InternalMetricsPtr internal_metrics_;
// In memory store of local aggregations and data needed to derive them.
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index 897c731..873530b 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -17,6 +17,7 @@
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
+#include "src/lib/util/not_null.h"
#include "src/lib/util/proto_util.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation/aggregation_utils.h"
@@ -223,7 +224,7 @@
cfg, fs(), *encoder_, observation_writer_.get(), *metadata_builder_);
day_store_created_ = CurrentDayIndex();
test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
- event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
+ event_aggregator_mgr_->SetSteadyClock(util::TESTONLY_TakeRawPointer(test_steady_clock_));
}
// Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
@@ -244,45 +245,45 @@
time_zone);
}
- size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
+ size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_.backfill_days_; }
void SetBackfillDays(size_t num_days) {
- event_aggregator_mgr_->aggregate_store_->backfill_days_ = num_days;
+ event_aggregator_mgr_->aggregate_store_.backfill_days_ = num_days;
}
Status BackUpLocalAggregateStore() {
- return event_aggregator_mgr_->aggregate_store_->BackUpLocalAggregateStore();
+ return event_aggregator_mgr_->aggregate_store_.BackUpLocalAggregateStore();
}
Status BackUpObservationHistory() {
- return event_aggregator_mgr_->aggregate_store_->BackUpObservationHistory();
+ return event_aggregator_mgr_->aggregate_store_.BackUpObservationHistory();
}
- LocalAggregateStore MakeNewLocalAggregateStore(
+ static LocalAggregateStore MakeNewLocalAggregateStore(
uint32_t version = kCurrentLocalAggregateStoreVersion) {
- return event_aggregator_mgr_->aggregate_store_->MakeNewLocalAggregateStore(version);
+ return AggregateStore::MakeNewLocalAggregateStore(version);
}
- AggregatedObservationHistoryStore MakeNewObservationHistoryStore(
+ static AggregatedObservationHistoryStore MakeNewObservationHistoryStore(
uint32_t version = kCurrentObservationHistoryStoreVersion) {
- return event_aggregator_mgr_->aggregate_store_->MakeNewObservationHistoryStore(version);
+ return AggregateStore::MakeNewObservationHistoryStore(version);
}
- Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
- return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeLocalAggregateStore(store);
+ static Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
+ return AggregateStore::MaybeUpgradeLocalAggregateStore(store);
}
- Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store) {
- return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeObservationHistoryStore(store);
+ static Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store) {
+ return AggregateStore::MaybeUpgradeObservationHistoryStore(store);
}
LocalAggregateStore CopyLocalAggregateStore() {
- return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+ return event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
}
Status GenerateObservations(uint32_t final_day_index_utc, uint32_t final_day_index_local = 0u) {
- return event_aggregator_mgr_->aggregate_store_->GenerateObservations(final_day_index_utc,
- final_day_index_local);
+ return event_aggregator_mgr_->aggregate_store_.GenerateObservations(final_day_index_utc,
+ final_day_index_local);
}
bool IsReportInStore(lib::ReportIdentifier report) {
@@ -294,7 +295,7 @@
key_data.set_report_id(report.report_id());
SerializeToBase64(key_data, &key);
- auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
+ auto locked = event_aggregator_mgr_->aggregate_store_.protected_aggregate_store_.lock();
return locked->local_aggregate_store.by_report_key().count(key) > 0;
}
@@ -307,7 +308,7 @@
key_data.set_report_id(report.report_id());
SerializeToBase64(key_data, &key);
- auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
+ auto locked = event_aggregator_mgr_->aggregate_store_.protected_aggregate_store_.lock();
auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -338,7 +339,7 @@
key_data.set_report_id(report.report_id());
SerializeToBase64(key_data, &key);
- auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
+ auto locked = event_aggregator_mgr_->aggregate_store_.protected_aggregate_store_.lock();
auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -363,10 +364,10 @@
return by_day_index->second.numeric_daily_aggregate().value();
}
- AggregateStore* GetAggregateStore() { return event_aggregator_mgr_->aggregate_store_.get(); }
+ AggregateStore& GetAggregateStore() { return event_aggregator_mgr_->aggregate_store_; }
Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
- return event_aggregator_mgr_->aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
+ return event_aggregator_mgr_->aggregate_store_.GarbageCollect(day_index_utc, day_index_local);
}
void DoScheduledTasksNow() {
@@ -399,7 +400,7 @@
.ValueOrDie();
event_record->event()->set_day_index(day_index);
event_record->event()->mutable_event_occurred_event()->set_event_code(event_code);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddUniqueActivesEvent(
metric_report_id.second, *event_record);
if (logged_activity == nullptr) {
return status;
@@ -431,7 +432,7 @@
event_count_event->set_component(component);
event_count_event->add_event_code(event_code);
event_count_event->set_count(count);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddEventCountEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddEventCountEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -463,7 +464,7 @@
elapsed_time_event->set_component(component);
elapsed_time_event->add_event_code(event_code);
elapsed_time_event->set_elapsed_micros(micros);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddElapsedTimeEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -496,7 +497,7 @@
frame_rate_event->add_event_code(event_code);
int64_t frames_per_1000_seconds = std::lround(fps * 1000.0);
frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddFrameRateEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -531,7 +532,7 @@
memory_usage_event->add_event_code(event_code);
}
memory_usage_event->set_bytes(bytes);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddMemoryUsageEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -560,7 +561,7 @@
// of reference.
bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more UniqueActives
// aggregates than |logged_activity| and |day_last_garbage_collected_|
// should imply.
@@ -675,7 +676,7 @@
bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more PerDeviceNumeric
// aggregates than |logged_values| and |day_last_garbage_collected_| should
// imply.
@@ -922,7 +923,7 @@
void SetUp() override {
AggregateStoreTest::SetUp();
- event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+ event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
}
// Adds an EventOccurredEvent to the local aggregations for the MetricReportId of a locally
@@ -1142,7 +1143,7 @@
EXPECT_FALSE(IsReportInStore(kTestReportIdentifier));
ASSERT_EQ(
StatusCode::OK,
- GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+ GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
EXPECT_TRUE(IsReportInStore(kTestReportIdentifier));
}
@@ -1154,10 +1155,10 @@
EXPECT_FALSE(IsReportInStore(kTestReportIdentifier));
ASSERT_EQ(
StatusCode::OK,
- GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+ GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
ASSERT_EQ(
StatusCode::OK,
- GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+ GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
EXPECT_TRUE(IsReportInStore(kTestReportIdentifier));
}
@@ -1170,7 +1171,7 @@
EXPECT_FALSE(IsReportInStore(kTestReportIdentifier));
ASSERT_EQ(
StatusCode::INVALID_ARGUMENT,
- GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+ GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
}
// The Aggregate store sets the record for an event to active using SetActive.
@@ -1179,12 +1180,12 @@
auto project_context = GetProjectContextFor(metric);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_FALSE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
ASSERT_EQ(StatusCode::OK, GetAggregateStore()
- ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+ .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
.error_code());
EXPECT_TRUE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
}
@@ -1195,15 +1196,15 @@
auto project_context = GetProjectContextFor(metric);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_FALSE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
ASSERT_EQ(StatusCode::OK, GetAggregateStore()
- ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+ .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
.error_code());
ASSERT_EQ(StatusCode::OK, GetAggregateStore()
- ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+ .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
.error_code());
EXPECT_TRUE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
}
@@ -1215,11 +1216,11 @@
auto project_context = GetProjectContextFor(metric);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
ASSERT_EQ(StatusCode::INVALID_ARGUMENT,
GetAggregateStore()
- ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+ .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
.error_code());
}
@@ -1234,17 +1235,17 @@
const int64_t kSecondValue = 7;
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK, GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
- kTestDayIndex, kFirstValue)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+ kTestDayIndex, kFirstValue)
.error_code());
EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
EXPECT_EQ(StatusCode::OK, GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
- kTestDayIndex, kSecondValue)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+ kTestDayIndex, kSecondValue)
.error_code());
EXPECT_EQ(kSecondValue + kFirstValue,
GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
@@ -1261,17 +1262,17 @@
const int64_t kSecondValue = 7;
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK, GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
- kTestDayIndex, kFirstValue)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+ kTestDayIndex, kFirstValue)
.error_code());
EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
EXPECT_EQ(StatusCode::OK, GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
- kTestDayIndex, kSecondValue)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+ kTestDayIndex, kSecondValue)
.error_code());
EXPECT_EQ(kSecondValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
}
@@ -1286,17 +1287,17 @@
const int64_t kSecondValue = 7;
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK, GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
- kTestDayIndex, kFirstValue)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+ kTestDayIndex, kFirstValue)
.error_code());
EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
EXPECT_EQ(StatusCode::OK, GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
- kTestDayIndex, kSecondValue)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+ kTestDayIndex, kSecondValue)
.error_code());
EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
}
@@ -1310,8 +1311,8 @@
ASSERT_EQ(StatusCode::INVALID_ARGUMENT,
GetAggregateStore()
- ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex,
- /*value*/ 4)
+ .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex,
+ /*value*/ 4)
.error_code());
}
@@ -1320,25 +1321,24 @@
const int64_t kFirstValue = 3;
EXPECT_EQ(0u,
- GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
- /*aggregation_days*/ 1));
- GetAggregateStore()->SetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
- /*aggregation_days*/ 1, kFirstValue);
+ GetAggregateStore().GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+ /*aggregation_days*/ 1));
+ GetAggregateStore().SetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+ /*aggregation_days*/ 1, kFirstValue);
EXPECT_EQ(kFirstValue,
- GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
- /*aggregation_days*/ 1));
+ GetAggregateStore().GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+ /*aggregation_days*/ 1));
}
TEST_F(AggregateStoreTest, SetPerDeviceNumericLastGeneratedDayIndex) {
const std::string kReportKey = "test_key";
const int64_t kFirstValue = 3;
- EXPECT_EQ(0u, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+ EXPECT_EQ(0u, GetAggregateStore().GetPerDeviceNumericLastGeneratedDayIndex(
kReportKey, "", kTestEventCode,
/*aggregation_days*/ 1));
- GetAggregateStore()->SetPerDeviceNumericLastGeneratedDayIndex(kReportKey, "", kTestEventCode,
- /*aggregation_days*/ 1,
- kFirstValue);
- EXPECT_EQ(kFirstValue, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+ GetAggregateStore().SetPerDeviceNumericLastGeneratedDayIndex(kReportKey, "", kTestEventCode,
+ /*aggregation_days*/ 1, kFirstValue);
+ EXPECT_EQ(kFirstValue, GetAggregateStore().GetPerDeviceNumericLastGeneratedDayIndex(
kReportKey, "", kTestEventCode,
/*aggregation_days*/ 1));
}
@@ -1346,10 +1346,10 @@
const std::string kReportKey = "test_key";
const int64_t kFirstValue = 3;
- EXPECT_EQ(0u, GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
- GetAggregateStore()->SetReportParticipationLastGeneratedDayIndex(kReportKey, kFirstValue);
+ EXPECT_EQ(0u, GetAggregateStore().GetReportParticipationLastGeneratedDayIndex(kReportKey));
+ GetAggregateStore().SetReportParticipationLastGeneratedDayIndex(kReportKey, kFirstValue);
EXPECT_EQ(kFirstValue,
- GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
+ GetAggregateStore().GetReportParticipationLastGeneratedDayIndex(kReportKey));
}
// Tests that EventAggregator::GenerateObservations() returns a positive
@@ -1359,7 +1359,7 @@
// Provide the all_report_types test registry to the EventAggregator.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
// Generate locally aggregated Observations for the current day index.
EXPECT_EQ(StatusCode::OK, GenerateObservations(CurrentDayIndex()).error_code());
@@ -1375,7 +1375,7 @@
// Provide the all_report_types test registry to the EventAggregator.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
// Check that Observations are generated when GenerateObservations is called
// for the current day index for the first time.
@@ -1419,7 +1419,7 @@
// Provide the all_report_types test registry to the EventAggregator.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK, GenerateObservations(CurrentDayIndex()).error_code());
std::vector<Observation> observations(0);
diff --git a/src/local_aggregation/event_aggregator.cc b/src/local_aggregation/event_aggregator.cc
index 833edcf..c668941 100644
--- a/src/local_aggregation/event_aggregator.cc
+++ b/src/local_aggregation/event_aggregator.cc
@@ -21,7 +21,7 @@
using logger::EventRecord;
using logger::ProjectContext;
-EventAggregator::EventAggregator(AggregateStore* aggregate_store)
+EventAggregator::EventAggregator(AggregateStore& aggregate_store)
: aggregate_store_(aggregate_store) {}
Status EventAggregator::UpdateAggregationConfigs(const ProjectContext& project_context) {
@@ -32,7 +32,8 @@
for (const auto& report : metric.reports()) {
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
- status = aggregate_store_->MaybeInsertReportConfig(project_context, metric, report);
+ status =
+ aggregate_store_.get().MaybeInsertReportConfig(project_context, metric, report);
if (!status.ok()) {
return status;
}
@@ -50,7 +51,8 @@
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
- status = aggregate_store_->MaybeInsertReportConfig(project_context, metric, report);
+ status =
+ aggregate_store_.get().MaybeInsertReportConfig(project_context, metric, report);
if (!status.ok()) {
return status;
}
@@ -91,9 +93,9 @@
.WithContexts(event_record)
.Build();
}
- return aggregate_store_->SetActive(event_record.MetricIdentifier().ForReport(report_id),
- event->event_occurred_event().event_code(),
- event->day_index());
+ return aggregate_store_.get().SetActive(event_record.MetricIdentifier().ForReport(report_id),
+ event->event_occurred_event().event_code(),
+ event->day_index());
}
Status EventAggregator::AddEventCountEvent(uint32_t report_id, const EventRecord& event_record) {
@@ -107,7 +109,7 @@
const EventCountEvent& event_count_event = event->event_count_event();
- return aggregate_store_->UpdateNumericAggregate(
+ return aggregate_store_.get().UpdateNumericAggregate(
event_record.MetricIdentifier().ForReport(report_id), event_count_event.component(),
config::PackEventCodes(event_count_event.event_code()), event->day_index(),
event_count_event.count());
@@ -124,7 +126,7 @@
const ElapsedTimeEvent& elapsed_time_event = event->elapsed_time_event();
- return aggregate_store_->UpdateNumericAggregate(
+ return aggregate_store_.get().UpdateNumericAggregate(
event_record.MetricIdentifier().ForReport(report_id), elapsed_time_event.component(),
config::PackEventCodes(elapsed_time_event.event_code()), event->day_index(),
elapsed_time_event.elapsed_micros());
@@ -140,7 +142,7 @@
}
const FrameRateEvent& frame_rate_event = event->frame_rate_event();
- return aggregate_store_->UpdateNumericAggregate(
+ return aggregate_store_.get().UpdateNumericAggregate(
event_record.MetricIdentifier().ForReport(report_id), frame_rate_event.component(),
config::PackEventCodes(frame_rate_event.event_code()), event->day_index(),
frame_rate_event.frames_per_1000_seconds());
@@ -157,7 +159,7 @@
const MemoryUsageEvent& memory_usage_event = event->memory_usage_event();
- return aggregate_store_->UpdateNumericAggregate(
+ return aggregate_store_.get().UpdateNumericAggregate(
event_record.MetricIdentifier().ForReport(report_id), memory_usage_event.component(),
config::PackEventCodes(memory_usage_event.event_code()), event->day_index(),
memory_usage_event.bytes());
diff --git a/src/local_aggregation/event_aggregator.h b/src/local_aggregation/event_aggregator.h
index 97b5d8f..a2b3bf0 100644
--- a/src/local_aggregation/event_aggregator.h
+++ b/src/local_aggregation/event_aggregator.h
@@ -33,7 +33,12 @@
// Constructs an EventAggregator.
//
// aggregate_store: an AggregateStore, which is used to store the local aggregates.
- explicit EventAggregator(AggregateStore* aggregate_store);
+ explicit EventAggregator(AggregateStore& aggregate_store);
+
+ // EventAggregator should be move-only.
+ EventAggregator(EventAggregator const&) = delete;
+ EventAggregator& operator=(EventAggregator const&) = delete;
+ EventAggregator& operator=(EventAggregator&&) = default;
// Updates the EventAggregator's view of the Cobalt metric and report
// registry.
@@ -94,7 +99,7 @@
Status AddMemoryUsageEvent(uint32_t report_id, const logger::EventRecord& event_record);
private:
- AggregateStore* aggregate_store_; // not owned
+ std::reference_wrapper<AggregateStore> aggregate_store_; // not owned
};
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index b02c0a4..c9593b9 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -9,14 +9,13 @@
#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/file_system.h"
+#include "src/lib/util/not_null.h"
#include "src/lib/util/status_builder.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/window_size.pb.h"
namespace cobalt::local_aggregation {
-using util::ConsistentProtoStore;
-using util::SteadyClock;
using util::TimeToDayIndex;
const std::chrono::seconds EventAggregatorManager::kDefaultAggregateBackupInterval =
@@ -36,12 +35,12 @@
aggregate_backup_interval_(kDefaultAggregateBackupInterval),
generate_obs_interval_(kDefaultGenerateObsInterval),
gc_interval_(kDefaultGCInterval),
- owned_local_aggregate_proto_store_(
- new ConsistentProtoStore(cfg.local_aggregate_proto_store_path, fs)),
- owned_obs_history_proto_store_(
- new ConsistentProtoStore(cfg.obs_history_proto_store_path, fs)) {
- Reset();
-}
+ steady_clock_(util::MakeNotNullUniquePtr<util::SteadyClock>()),
+ owned_local_aggregate_proto_store_(cfg.local_aggregate_proto_store_path, fs),
+ owned_obs_history_proto_store_(cfg.obs_history_proto_store_path, fs),
+ aggregate_store_(encoder_, observation_writer, owned_local_aggregate_proto_store_,
+ owned_obs_history_proto_store_, backfill_days_),
+ event_aggregator_(aggregate_store_) {}
void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
auto locked = protected_worker_thread_controller_.lock();
@@ -77,7 +76,7 @@
// If shutdown has been requested, back up the LocalAggregateStore and
// exit.
if (locked->shut_down) {
- aggregate_store_->BackUpLocalAggregateStore();
+ aggregate_store_.BackUpLocalAggregateStore();
return;
}
// Sleep until the next scheduled backup of the LocalAggregateStore or
@@ -90,10 +89,10 @@
}
return locked->shut_down || locked->back_up_now;
});
- aggregate_store_->BackUpLocalAggregateStore();
+ aggregate_store_.BackUpLocalAggregateStore();
if (locked->back_up_now) {
locked->back_up_now = false;
- aggregate_store_->BackUpObservationHistory();
+ aggregate_store_.BackUpObservationHistory();
}
// If the worker thread was woken up by a shutdown request, exit.
// Otherwise, complete any scheduled Observation generation and garbage
@@ -125,9 +124,9 @@
"current day index is too small.";
} else {
Status obs_status =
- aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time);
+ aggregate_store_.GenerateObservations(yesterday_utc, yesterday_local_time);
if (obs_status.ok()) {
- aggregate_store_->BackUpObservationHistory();
+ aggregate_store_.BackUpObservationHistory();
// We can guarantee that we will never need a metadata if it is older than:
// BackfillManagerDays (currently 3 days) + WindowSize_MAX (currently 30 days).
@@ -145,9 +144,9 @@
LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
"current day index is too small.";
} else {
- Status gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time);
+ Status gc_status = aggregate_store_.GarbageCollect(yesterday_utc, yesterday_local_time);
if (gc_status.ok()) {
- aggregate_store_->BackUpLocalAggregateStore();
+ aggregate_store_.BackUpLocalAggregateStore();
} else {
LOG(ERROR) << "GarbageCollect failed with status: " << gc_status.error_message();
}
@@ -166,7 +165,7 @@
}
CB_RETURN_IF_ERROR(
- aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local));
+ aggregate_store_.GenerateObservations(final_day_index_utc, final_day_index_local));
// We can guarantee that we will never need a metadata if it is older than:
// BackfillManagerDays (currently 3 days) + WindowSize_MAX (currently 30 days).
@@ -184,13 +183,13 @@
}
void EventAggregatorManager::Reset() {
- aggregate_store_ = std::make_unique<AggregateStore>(
- encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
- owned_obs_history_proto_store_.get(), backfill_days_);
- aggregate_store_->ResetInternalMetrics(internal_metrics_);
+ aggregate_store_ =
+ AggregateStore(encoder_, observation_writer_, owned_local_aggregate_proto_store_,
+ owned_obs_history_proto_store_, backfill_days_);
+ aggregate_store_.ResetInternalMetrics(internal_metrics_);
- event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
- steady_clock_ = std::make_unique<SteadyClock>();
+ event_aggregator_ = EventAggregator(aggregate_store_);
+ steady_clock_ = util::MakeNotNullUniquePtr<util::SteadyClock>();
}
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index d2e1bc1..1f884bd 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -13,6 +13,7 @@
#include "src/lib/util/clock.h"
#include "src/lib/util/consistent_proto_store.h"
+#include "src/lib/util/not_null.h"
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation/aggregate_store.h"
#include "src/local_aggregation/event_aggregator.h"
@@ -81,7 +82,7 @@
void Reset();
// Returns a pointer to an EventAggregator to be used for logging.
- EventAggregator* GetEventAggregator() { return event_aggregator_.get(); }
+ EventAggregator& GetEventAggregator() { return event_aggregator_; }
// Checks that the worker thread is shut down, and if so, triggers an out of schedule
// AggregateStore::GenerateObservations() and returns its result. Returns kOther if the
@@ -97,15 +98,15 @@
// only useful in testing to verify that the worker thread is not running too frequently.
uint64_t num_runs() const { return num_runs_; }
- void Disable(bool is_disabled) { aggregate_store_->Disable(is_disabled); }
+ void Disable(bool is_disabled) { aggregate_store_.Disable(is_disabled); }
void DeleteData() {
- aggregate_store_->DeleteData();
+ aggregate_store_.DeleteData();
TriggerBackups();
}
void ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
internal_metrics_ = internal_metrics;
- aggregate_store_->ResetInternalMetrics(internal_metrics_);
+ aggregate_store_.ResetInternalMetrics(internal_metrics_);
}
private:
@@ -172,12 +173,12 @@
std::chrono::steady_clock::time_point next_generate_obs_;
std::chrono::steady_clock::time_point next_gc_;
- std::unique_ptr<util::SteadyClockInterface> steady_clock_;
+ util::PinnedUniquePtr<util::SteadyClockInterface> steady_clock_;
- std::unique_ptr<AggregateStore> aggregate_store_;
- std::unique_ptr<util::ConsistentProtoStore> owned_local_aggregate_proto_store_;
- std::unique_ptr<util::ConsistentProtoStore> owned_obs_history_proto_store_;
- std::unique_ptr<EventAggregator> event_aggregator_;
+ util::ConsistentProtoStore owned_local_aggregate_proto_store_;
+ util::ConsistentProtoStore owned_obs_history_proto_store_;
+ AggregateStore aggregate_store_;
+ EventAggregator event_aggregator_;
static const std::chrono::seconds kDefaultAggregateBackupInterval;
static const std::chrono::seconds kDefaultGenerateObsInterval;
diff --git a/src/local_aggregation/event_aggregator_mgr_test.cc b/src/local_aggregation/event_aggregator_mgr_test.cc
index c2e6d61..e2b488d 100644
--- a/src/local_aggregation/event_aggregator_mgr_test.cc
+++ b/src/local_aggregation/event_aggregator_mgr_test.cc
@@ -8,6 +8,7 @@
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
+#include "src/lib/util/not_null.h"
#include "src/lib/util/proto_util.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation/test_utils/test_event_aggregator_mgr.h"
@@ -78,13 +79,13 @@
return test_clock;
}
- IncrementingSteadyClock* GetTestSteadyClock() {
+ util::NotNullUniquePtr<IncrementingSteadyClock> GetTestSteadyClock() {
test_steady_clock_ptr_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
- return test_steady_clock_ptr_;
+ return util::TESTONLY_TakeRawPointer(test_steady_clock_ptr_);
}
std::unique_ptr<TestEventAggregatorManager> GetEventAggregatorManager(
- IncrementingSteadyClock* steady_clock) {
+ util::NotNullUniquePtr<IncrementingSteadyClock> steady_clock) {
CobaltConfig cfg = {.client_secret = system_data::ClientSecret::GenerateNewSecret()};
cfg.local_aggregation_backfill_days = 0;
@@ -93,7 +94,7 @@
auto event_aggregator_mgr = std::make_unique<TestEventAggregatorManager>(
cfg, fs_, *encoder_, observation_writer_.get(), *metadata_builder_);
- event_aggregator_mgr->SetSteadyClock(steady_clock);
+ event_aggregator_mgr->SetSteadyClock(std::move(steady_clock));
return event_aggregator_mgr;
}
@@ -127,7 +128,7 @@
bool BackUpHappened() { return fs_.FileExists(aggregate_store_path()); }
static uint32_t NumberOfKVPairsInStore(EventAggregatorManager* event_aggregator_mgr) {
- return event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore().by_report_key().size();
+ return event_aggregator_mgr->aggregate_store_.CopyLocalAggregateStore().by_report_key().size();
}
// Given a ProjectContext |project_context| and the MetricReportId of a UNIQUE_N_DAY_ACTIVES
@@ -144,12 +145,12 @@
.ValueOrDie();
event_record->event()->set_day_index(day_index);
event_record->event()->mutable_event_occurred_event()->set_event_code(event_code);
- return event_aggregator_mgr->GetEventAggregator()->AddUniqueActivesEvent(
- metric_report_id.second, *event_record);
+ return event_aggregator_mgr->GetEventAggregator().AddUniqueActivesEvent(metric_report_id.second,
+ *event_record);
}
static uint32_t GetNumberOfUniqueActivesAggregates(EventAggregatorManager* event_aggregator_mgr) {
- auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr->aggregate_store_.CopyLocalAggregateStore();
uint32_t num_aggregates = 0;
for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) {
if (aggregates.type_case() != ReportAggregates::kUniqueActivesAggregates) {
@@ -168,7 +169,7 @@
EventAggregatorManager* event_aggregator_mgr,
const std::shared_ptr<const ProjectContext>& project_context,
const MetricReportId& metric_report_id, uint32_t day_index, uint32_t event_code) {
- auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr->aggregate_store_.CopyLocalAggregateStore();
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id), &key)) {
return AssertionFailure() << "Could not serialize key with metric id "
@@ -293,7 +294,7 @@
// Provide the EventAggregator with the all_report_types registry.
auto project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the
@@ -310,7 +311,7 @@
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK,
@@ -352,7 +353,7 @@
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK,
@@ -382,7 +383,7 @@
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
event_aggregator_mgr->Disable(true);
@@ -429,7 +430,7 @@
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK,
@@ -501,7 +502,7 @@
TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
- ->UpdateAggregationConfigs(*project_context)
+ .UpdateAggregationConfigs(*project_context)
.error_code());
EXPECT_EQ(StatusCode::OK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
expected_id, day_index, /*event_code*/ 1)
diff --git a/src/local_aggregation/event_aggregator_test.cc b/src/local_aggregation/event_aggregator_test.cc
index 52b012f..f6b8110 100644
--- a/src/local_aggregation/event_aggregator_test.cc
+++ b/src/local_aggregation/event_aggregator_test.cc
@@ -17,6 +17,7 @@
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
+#include "src/lib/util/not_null.h"
#include "src/lib/util/proto_util.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation/aggregation_utils.h"
@@ -121,7 +122,7 @@
unowned_test_clock_ = test_clock_.get();
day_store_created_ = CurrentDayIndex();
test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
- event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
+ event_aggregator_mgr_->SetSteadyClock(util::TESTONLY_TakeRawPointer(test_steady_clock_));
}
// Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
@@ -142,10 +143,10 @@
time_zone);
}
- size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
+ size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_.backfill_days_; }
LocalAggregateStore CopyLocalAggregateStore() {
- return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+ return event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
}
void TriggerAndWaitForDoScheduledTasks() {
@@ -187,7 +188,7 @@
.ValueOrDie();
event_record->event()->set_day_index(day_index);
event_record->event()->mutable_event_occurred_event()->set_event_code(event_code);
- auto status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
+ auto status = event_aggregator_mgr_->GetEventAggregator().AddUniqueActivesEvent(
metric_report_id.second, *event_record);
if (logged_activity == nullptr) {
return status;
@@ -219,7 +220,7 @@
event_count_event->set_component(component);
event_count_event->add_event_code(event_code);
event_count_event->set_count(count);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddEventCountEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddEventCountEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -251,7 +252,7 @@
elapsed_time_event->set_component(component);
elapsed_time_event->add_event_code(event_code);
elapsed_time_event->set_elapsed_micros(micros);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddElapsedTimeEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -284,7 +285,7 @@
frame_rate_event->add_event_code(event_code);
int64_t frames_per_1000_seconds = std::lround(fps * 1000.0);
frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddFrameRateEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -319,7 +320,7 @@
memory_usage_event->add_event_code(event_code);
}
memory_usage_event->set_bytes(bytes);
- Status status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
+ Status status = event_aggregator_mgr_->GetEventAggregator().AddMemoryUsageEvent(
metric_report_id.second, *event_record);
if (logged_values == nullptr) {
return status;
@@ -347,7 +348,7 @@
// of reference.
bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more UniqueActives
// aggregates than |logged_activity| and |day_last_garbage_collected_|
// should imply.
@@ -462,7 +463,7 @@
bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more PerDeviceNumeric
// aggregates than |logged_values| and |day_last_garbage_collected_| should
// imply.
@@ -708,7 +709,7 @@
void SetUp() override {
EventAggregatorTest::SetUp();
- event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+ event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
}
// Adds an EventOccurredEvent to the local aggregations for the MetricReportId of a locally
@@ -836,7 +837,7 @@
auto unique_actives_project_context =
GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*unique_actives_project_context)
+ .UpdateAggregationConfigs(*unique_actives_project_context)
.ok());
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the unique_actives
@@ -875,7 +876,7 @@
auto unique_actives_project_context =
GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*unique_actives_project_context)
+ .UpdateAggregationConfigs(*unique_actives_project_context)
.ok());
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the unique_actives
@@ -886,7 +887,7 @@
auto unique_actives_noise_free_project_context =
GetTestProject(logger::testing::unique_actives_noise_free::kCobaltRegistryBase64);
EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*unique_actives_noise_free_project_context)
+ .UpdateAggregationConfigs(*unique_actives_noise_free_project_context)
.ok());
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of distinct MetricReportIds of locally
@@ -931,7 +932,7 @@
std::shared_ptr<ProjectContext> unique_actives_project_context =
GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
- ->UpdateAggregationConfigs(*unique_actives_project_context)
+ .UpdateAggregationConfigs(*unique_actives_project_context)
.ok());
// Attempt to log a UniqueActivesEvent for
// |kEventsOccurredMetricReportId|, which is not in the unique_actives
@@ -946,9 +947,9 @@
bad_event_record->event()->mutable_event_occurred_event()->set_event_code(0u);
EXPECT_EQ(StatusCode::INVALID_ARGUMENT,
event_aggregator_mgr_->GetEventAggregator()
- ->AddUniqueActivesEvent(logger::testing::unique_actives_noise_free::
- kEventsOccurredEventsOccurredUniqueDevicesReportId,
- *bad_event_record)
+ .AddUniqueActivesEvent(logger::testing::unique_actives_noise_free::
+ kEventsOccurredEventsOccurredUniqueDevicesReportId,
+ *bad_event_record)
.error_code());
// Attempt to call AddUniqueActivesEvent() with a valid metric and report
// ID, but with an EventRecord wrapping an Event which is not an
@@ -961,7 +962,7 @@
EXPECT_EQ(
StatusCode::INVALID_ARGUMENT,
event_aggregator_mgr_->GetEventAggregator()
- ->AddUniqueActivesEvent(
+ .AddUniqueActivesEvent(
logger::testing::unique_actives::kFeaturesActiveFeaturesActiveUniqueDevicesReportId,
*bad_event_record2)
.error_code());
@@ -974,13 +975,12 @@
logger::testing::per_device_numeric_stats::kConnectionFailuresMetricReportId.first)
.ValueOrDie();
bad_event_record3->event()->mutable_event_occurred_event();
- EXPECT_EQ(
- StatusCode::INVALID_ARGUMENT,
- event_aggregator_mgr_->GetEventAggregator()
- ->AddEventCountEvent(logger::testing::per_device_numeric_stats::
- kConnectionFailuresConnectionFailuresPerDeviceCountReportId,
- *bad_event_record3)
- .error_code());
+ EXPECT_EQ(StatusCode::INVALID_ARGUMENT,
+ event_aggregator_mgr_->GetEventAggregator()
+ .AddEventCountEvent(logger::testing::per_device_numeric_stats::
+ kConnectionFailuresConnectionFailuresPerDeviceCountReportId,
+ *bad_event_record3)
+ .error_code());
}
// Tests that the LocalAggregateStore is updated as expected when
diff --git a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
index 0021a10..209c83c 100644
--- a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
+++ b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
@@ -5,6 +5,7 @@
#ifndef COBALT_SRC_LOCAL_AGGREGATION_TEST_UTILS_TEST_EVENT_AGGREGATOR_MGR_H_
#define COBALT_SRC_LOCAL_AGGREGATION_TEST_UTILS_TEST_EVENT_AGGREGATOR_MGR_H_
+#include "src/lib/util/not_null.h"
#include "src/local_aggregation/event_aggregator_mgr.h"
namespace cobalt::local_aggregation {
@@ -21,21 +22,21 @@
// Triggers an out of schedule run of GenerateObservations(). This does not change the schedule
// of future runs.
Status GenerateObservations(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
- return EventAggregatorManager::aggregate_store_->GenerateObservations(day_index_utc,
- day_index_local);
+ return EventAggregatorManager::aggregate_store_.GenerateObservations(day_index_utc,
+ day_index_local);
}
// Triggers an out of schedule run of GarbageCollect(). This does not change the schedule of
// future runs.
Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
- return EventAggregatorManager::aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
+ return EventAggregatorManager::aggregate_store_.GarbageCollect(day_index_utc, day_index_local);
}
// Returns the number of aggregates of type per_device_numeric_aggregates.
size_t NumPerDeviceNumericAggregatesInStore() {
size_t count = 0;
for (const auto& aggregates :
- EventAggregatorManager::aggregate_store_->protected_aggregate_store_.lock()
+ EventAggregatorManager::aggregate_store_.protected_aggregate_store_.lock()
->local_aggregate_store.by_report_key()) {
if (aggregates.second.has_numeric_aggregates()) {
count += aggregates.second.numeric_aggregates().by_component().size();
@@ -45,7 +46,9 @@
}
// Sets the EventAggregatorManager's SteadyClockInterface.
- void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }
+ void SetSteadyClock(util::NotNullUniquePtr<util::SteadyClockInterface> clock) {
+ steady_clock_ = std::move(clock);
+ }
};
} // namespace cobalt::local_aggregation
diff --git a/src/logger/event_loggers.cc b/src/logger/event_loggers.cc
index 4e95319..0c56cf7 100644
--- a/src/logger/event_loggers.cc
+++ b/src/logger/event_loggers.cc
@@ -33,7 +33,7 @@
std::unique_ptr<EventLogger> EventLogger::Create(
MetricDefinition::MetricType metric_type, const Encoder& encoder,
- EventAggregator* event_aggregator, local_aggregation::LocalAggregation* local_aggregation,
+ EventAggregator& event_aggregator, local_aggregation::LocalAggregation* local_aggregation,
const ObservationWriter* observation_writer,
const system_data::SystemDataInterface* system_data,
util::CivilTimeConverterInterface* civil_time_converter) {
@@ -566,7 +566,7 @@
const EventRecord& event_record) {
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
- return event_aggregator()->AddUniqueActivesEvent(report.id(), event_record);
+ return event_aggregator().AddUniqueActivesEvent(report.id(), event_record);
}
default:
return Status::OkStatus();
@@ -620,7 +620,7 @@
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
- return event_aggregator()->AddEventCountEvent(report.id(), event_record);
+ return event_aggregator().AddEventCountEvent(report.id(), event_record);
}
default:
return Status::OkStatus();
@@ -688,7 +688,7 @@
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
- return event_aggregator()->AddElapsedTimeEvent(report.id(), event_record);
+ return event_aggregator().AddElapsedTimeEvent(report.id(), event_record);
}
default:
return Status::OkStatus();
@@ -724,7 +724,7 @@
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
- return event_aggregator()->AddFrameRateEvent(report.id(), event_record);
+ return event_aggregator().AddFrameRateEvent(report.id(), event_record);
}
default:
return Status::OkStatus();
@@ -759,7 +759,7 @@
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
- return event_aggregator()->AddMemoryUsageEvent(report.id(), event_record);
+ return event_aggregator().AddMemoryUsageEvent(report.id(), event_record);
}
default:
return Status::OkStatus();
diff --git a/src/logger/event_loggers.h b/src/logger/event_loggers.h
index a87a175..432c15e 100644
--- a/src/logger/event_loggers.h
+++ b/src/logger/event_loggers.h
@@ -33,7 +33,7 @@
// of EventLogger for each of several Metric types.
class EventLogger {
public:
- EventLogger(const Encoder& encoder, local_aggregation::EventAggregator* event_aggregator,
+ EventLogger(const Encoder& encoder, local_aggregation::EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
const ObservationWriter* observation_writer,
const system_data::SystemDataInterface* system_data,
@@ -54,7 +54,7 @@
// The remaining parameters are passed to the EventLogger constructor.
static std::unique_ptr<EventLogger> Create(
MetricDefinition::MetricType metric_type, const Encoder& encoder,
- local_aggregation::EventAggregator* event_aggregator,
+ local_aggregation::EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
const ObservationWriter* observation_writer,
const system_data::SystemDataInterface* system_data,
@@ -73,7 +73,7 @@
protected:
const Encoder& encoder() { return encoder_; }
- local_aggregation::EventAggregator* event_aggregator() { return event_aggregator_; }
+ local_aggregation::EventAggregator& event_aggregator() { return event_aggregator_; }
local_aggregation::LocalAggregation* local_aggregation() { return local_aggregation_; }
static Encoder::Result BadReportType(const std::string& full_metric_name,
@@ -152,7 +152,7 @@
static void TraceLogSuccess(const EventRecord& event_record, const std::string& trace);
const Encoder& encoder_;
- local_aggregation::EventAggregator* event_aggregator_;
+ local_aggregation::EventAggregator& event_aggregator_;
local_aggregation::LocalAggregation* local_aggregation_;
const ObservationWriter* observation_writer_;
const system_data::SystemDataInterface* system_data_;
diff --git a/src/logger/event_loggers_test.cc b/src/logger/event_loggers_test.cc
index ac5b46d..d71be4c 100644
--- a/src/logger/event_loggers_test.cc
+++ b/src/logger/event_loggers_test.cc
@@ -123,7 +123,7 @@
cfg, project_context_factory_.get(), &system_data_, *metadata_builder_, fs(),
observation_writer_.get(), civil_time_converter_.get());
project_context_ = GetTestProject(registry_base64);
- event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+ event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
logger_ = std::make_unique<EventLoggerClass>(
*encoder_, event_aggregator_mgr_->GetEventAggregator(), local_aggregation_.get(),
observation_writer_.get(), &system_data_, civil_time_converter_.get());
@@ -1508,7 +1508,7 @@
event_aggregator_mgr_ = std::make_unique<TestEventAggregatorManager>(
cfg, fs(), *encoder_, observation_writer_.get(), *metadata_builder_);
- event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+ event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
local_aggregation_ = std::make_unique<LocalAggregation>(
cfg, project_context_factory_.get(), &system_data_, *metadata_builder_, fs(),
observation_writer_.get(), civil_time_converter_.get());
@@ -1752,7 +1752,7 @@
event_aggregator_mgr_ = std::make_unique<TestEventAggregatorManager>(
cfg, fs(), *encoder_, observation_writer_.get(), *metadata_builder_);
- event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+ event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
local_aggregation_ = std::make_unique<LocalAggregation>(
cfg, project_context_factory_.get(), &system_data_, *metadata_builder_, fs(),
observation_writer_.get(), civil_time_converter_.get());
diff --git a/src/logger/logger.cc b/src/logger/logger.cc
index b33ff6f..279775e 100644
--- a/src/logger/logger.cc
+++ b/src/logger/logger.cc
@@ -41,7 +41,7 @@
} // namespace
Logger::Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
- EventAggregator* event_aggregator,
+ EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
util::CivilTimeConverterInterface* civil_time_converter,
@@ -53,7 +53,7 @@
enable_replacement_metrics, internal_metrics) {}
Logger::Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
- EventAggregator* event_aggregator,
+ EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
util::ValidatedClockInterface* validated_clock,
@@ -74,7 +74,6 @@
enable_replacement_metrics_(enable_replacement_metrics),
internal_metrics_(internal_metrics) {
CHECK(project_context_);
- CHECK(event_aggregator_);
CHECK(local_aggregation_);
CHECK(observation_writer_);
CHECK(civil_time_converter_);
@@ -85,7 +84,7 @@
}
// If we were passed an internal Logger then we are not an internal Logger.
is_internal_logger_ = !internal_metrics;
- Status status = event_aggregator_->UpdateAggregationConfigs(*project_context_);
+ Status status = event_aggregator_.UpdateAggregationConfigs(*project_context_);
if (!status.ok()) {
LOG(ERROR) << "Failed to provide aggregation configurations to the "
"EventAggregator. "
diff --git a/src/logger/logger.h b/src/logger/logger.h
index fc10bcc..09e4d77 100644
--- a/src/logger/logger.h
+++ b/src/logger/logger.h
@@ -76,7 +76,7 @@
// Logger to send metrics about Cobalt to Cobalt. If nullptr, no such internal
// logging will be performed by this Logger.
Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
- local_aggregation::EventAggregator* event_aggregator,
+ local_aggregation::EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
util::CivilTimeConverterInterface* civil_time_converter,
@@ -125,7 +125,7 @@
// Logger to send metrics about Cobalt to Cobalt. If nullptr, no such internal
// logging will be performed by this Logger.
Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
- local_aggregation::EventAggregator* event_aggregator,
+ local_aggregation::EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
util::ValidatedClockInterface* validated_clock,
@@ -202,7 +202,7 @@
const std::shared_ptr<const ProjectContext> project_context_;
const Encoder& encoder_;
- local_aggregation::EventAggregator* event_aggregator_;
+ local_aggregation::EventAggregator& event_aggregator_;
local_aggregation::LocalAggregation* local_aggregation_;
const ObservationWriter* observation_writer_;
const system_data::SystemDataInterface* system_data_;
diff --git a/src/logger/undated_event_manager.cc b/src/logger/undated_event_manager.cc
index 3085deb..e6b026e 100644
--- a/src/logger/undated_event_manager.cc
+++ b/src/logger/undated_event_manager.cc
@@ -19,7 +19,7 @@
using local_aggregation::EventAggregator;
-UndatedEventManager::UndatedEventManager(const Encoder& encoder, EventAggregator* event_aggregator,
+UndatedEventManager::UndatedEventManager(const Encoder& encoder, EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer,
system_data::SystemDataInterface* system_data,
@@ -32,7 +32,6 @@
system_data_(system_data),
civil_time_converter_(civil_time_converter),
max_saved_events_(max_saved_events) {
- CHECK(event_aggregator_);
CHECK(local_aggregation_);
CHECK(observation_writer_);
CHECK(civil_time_converter_);
diff --git a/src/logger/undated_event_manager.h b/src/logger/undated_event_manager.h
index 1cd0d91..a7b8ae9 100644
--- a/src/logger/undated_event_manager.h
+++ b/src/logger/undated_event_manager.h
@@ -51,7 +51,7 @@
//
// |max_saved_events| The maximum number of saved events. When this limit is reached, old events
// are dropped to make room for new events.
- UndatedEventManager(const Encoder& encoder, local_aggregation::EventAggregator* event_aggregator,
+ UndatedEventManager(const Encoder& encoder, local_aggregation::EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer,
system_data::SystemDataInterface* system_data,
@@ -85,7 +85,7 @@
// Used only to construct EventLogger instances.
const Encoder& encoder_;
- local_aggregation::EventAggregator* event_aggregator_;
+ local_aggregation::EventAggregator& event_aggregator_;
local_aggregation::LocalAggregation* local_aggregation_;
const ObservationWriter* observation_writer_;
const system_data::SystemDataInterface* system_data_;