[cobalt][LocalAggregation] Move EventAggregatorManager code to the
appropriate files
- removes thread management and memory management from the
EventAggregator and moves it to the EventAggregatorManager
- Small changes to tests to match this change
- deletes GenerateObservationsNoWorker()
- the AggregateStoreTest is minimally changed as it will be reworked in
a following CL.
Bug: 40853
Change-Id: Id409279bc92edbbe50089a0eaa32bc8e5d5230aa
diff --git a/src/bin/test_app/test_app.cc b/src/bin/test_app/test_app.cc
index 33269cc..a2caedf 100644
--- a/src/bin/test_app/test_app.cc
+++ b/src/bin/test_app/test_app.cc
@@ -418,8 +418,7 @@
}
bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
- return kOK == event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->GenerateObservations(
- day_index);
+ return kOK == event_aggregator_mgr_->aggregate_store_->GenerateObservations(day_index);
}
bool RealLoggerFactory::SendAccumulatedObservations() {
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 7875d43..c1c5e16 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -147,7 +147,6 @@
uint32_t final_day_index_local = 0u);
private:
- friend class EventAggregator; // used for transition during redesign.
friend class AggregateStoreTest;
friend class EventAggregatorTest;
friend class EventAggregatorManagerTest;
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index 5237df2..272e216 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -18,7 +18,7 @@
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/proto_util.h"
#include "src/local_aggregation/aggregation_utils.h"
-#include "src/local_aggregation/event_aggregator.h"
+#include "src/local_aggregation/event_aggregator_mgr.h"
#include "src/logger/logger_test_utils.h"
#include "src/logger/testing_constants.h"
#include "src/pb/event.pb.h"
@@ -204,9 +204,9 @@
}
void ResetEventAggregator() {
- event_aggregator_ = std::make_unique<EventAggregator>(encoder_.get(), observation_writer_.get(),
- local_aggregate_proto_store_.get(),
- obs_history_proto_store_.get());
+ event_aggregator_mgr_ = std::make_unique<EventAggregatorManager>(
+ encoder_.get(), observation_writer_.get(), local_aggregate_proto_store_.get(),
+ obs_history_proto_store_.get());
// Pass this clock to the EventAggregator::Start method, if it is called.
test_clock_ = std::make_unique<IncrementingSystemClock>(std::chrono::system_clock::duration(0));
// Initilize it to 10 years after the beginning of time.
@@ -215,13 +215,13 @@
unowned_test_clock_ = test_clock_.get();
day_store_created_ = CurrentDayIndex();
test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
- event_aggregator_->SetSteadyClock(test_steady_clock_);
+ event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
}
// Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
// before destructing the objects which the EventAggregator points to but does
// not own.
- void TearDown() override { event_aggregator_.reset(); }
+ void TearDown() override { event_aggregator_mgr_.reset(); }
// Advances |test_clock_| by |num_seconds| seconds.
void AdvanceClock(int num_seconds) {
@@ -236,45 +236,45 @@
time_zone);
}
- size_t GetBackfillDays() { return event_aggregator_->aggregate_store_->backfill_days_; }
+ size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
void SetBackfillDays(size_t num_days) {
- event_aggregator_->aggregate_store_->backfill_days_ = num_days;
+ event_aggregator_mgr_->aggregate_store_->backfill_days_ = num_days;
}
Status BackUpLocalAggregateStore() {
- return event_aggregator_->aggregate_store_->BackUpLocalAggregateStore();
+ return event_aggregator_mgr_->aggregate_store_->BackUpLocalAggregateStore();
}
Status BackUpObservationHistory() {
- return event_aggregator_->aggregate_store_->BackUpObservationHistory();
+ return event_aggregator_mgr_->aggregate_store_->BackUpObservationHistory();
}
LocalAggregateStore MakeNewLocalAggregateStore(
uint32_t version = kCurrentLocalAggregateStoreVersion) {
- return event_aggregator_->aggregate_store_->MakeNewLocalAggregateStore(version);
+ return event_aggregator_mgr_->aggregate_store_->MakeNewLocalAggregateStore(version);
}
AggregatedObservationHistoryStore MakeNewObservationHistoryStore(
uint32_t version = kCurrentObservationHistoryStoreVersion) {
- return event_aggregator_->aggregate_store_->MakeNewObservationHistoryStore(version);
+ return event_aggregator_mgr_->aggregate_store_->MakeNewObservationHistoryStore(version);
}
Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
- return event_aggregator_->aggregate_store_->MaybeUpgradeLocalAggregateStore(store);
+ return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeLocalAggregateStore(store);
}
Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store) {
- return event_aggregator_->aggregate_store_->MaybeUpgradeObservationHistoryStore(store);
+ return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeObservationHistoryStore(store);
}
LocalAggregateStore CopyLocalAggregateStore() {
- return event_aggregator_->aggregate_store_->CopyLocalAggregateStore();
+ return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
}
Status GenerateObservations(uint32_t final_day_index_utc, uint32_t final_day_index_local = 0u) {
- return event_aggregator_->GenerateObservationsNoWorker(final_day_index_utc,
- final_day_index_local);
+ return event_aggregator_mgr_->aggregate_store_->GenerateObservations(final_day_index_utc,
+ final_day_index_local);
}
bool IsReportInStore(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
@@ -287,7 +287,7 @@
key_data.set_report_id(report_id);
SerializeToBase64(key_data, &key);
- auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+ auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
return locked->local_aggregate_store.by_report_key().count(key) > 0;
}
@@ -301,7 +301,7 @@
key_data.set_report_id(report_id);
SerializeToBase64(key_data, &key);
- auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+ auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -333,7 +333,7 @@
key_data.set_report_id(report_id);
SerializeToBase64(key_data, &key);
- auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+ auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -358,18 +358,18 @@
return by_day_index->second.numeric_daily_aggregate().value();
}
- AggregateStore* GetAggregateStore() { return event_aggregator_->aggregate_store_.get(); }
+ AggregateStore* GetAggregateStore() { return event_aggregator_mgr_->aggregate_store_.get(); }
Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
- return event_aggregator_->aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
+ return event_aggregator_mgr_->aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
}
void DoScheduledTasksNow() {
// Steady values don't matter, just tell DoScheduledTasks to run everything.
auto steady_time = std::chrono::steady_clock::now();
- event_aggregator_->next_generate_obs_ = steady_time;
- event_aggregator_->next_gc_ = steady_time;
- event_aggregator_->DoScheduledTasks(unowned_test_clock_->now(), steady_time);
+ event_aggregator_mgr_->next_generate_obs_ = steady_time;
+ event_aggregator_mgr_->next_gc_ = steady_time;
+ event_aggregator_mgr_->DoScheduledTasks(unowned_test_clock_->now(), steady_time);
}
// Clears the FakeObservationStore and resets the counts of Observations
@@ -392,7 +392,8 @@
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
event_record.event()->mutable_occurrence_event()->set_event_code(event_code);
- auto status = event_aggregator_->AddUniqueActivesEvent(metric_report_id.second, event_record);
+ auto status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
+ metric_report_id.second, event_record);
if (logged_activity == nullptr) {
return status;
}
@@ -421,7 +422,8 @@
count_event->set_component(component);
count_event->add_event_code(event_code);
count_event->set_count(count);
- auto status = event_aggregator_->AddCountEvent(metric_report_id.second, event_record);
+ auto status = event_aggregator_mgr_->GetEventAggregator()->AddCountEvent(
+ metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
@@ -450,7 +452,8 @@
elapsed_time_event->set_component(component);
elapsed_time_event->add_event_code(event_code);
elapsed_time_event->set_elapsed_micros(micros);
- auto status = event_aggregator_->AddElapsedTimeEvent(metric_report_id.second, event_record);
+ auto status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
+ metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
@@ -480,7 +483,8 @@
frame_rate_event->add_event_code(event_code);
int64_t frames_per_1000_seconds = std::round(fps * 1000.0);
frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
- auto status = event_aggregator_->AddFrameRateEvent(metric_report_id.second, event_record);
+ auto status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
+ metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
@@ -512,7 +516,8 @@
memory_usage_event->add_event_code(event_code);
}
memory_usage_event->set_bytes(bytes);
- auto status = event_aggregator_->AddMemoryUsageEvent(metric_report_id.second, event_record);
+ auto status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
+ metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
@@ -539,7 +544,7 @@
// of reference.
bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store = event_aggregator_->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more UniqueActives
// aggregates than |logged_activity| and |day_last_garbage_collected_|
// should imply.
@@ -654,7 +659,7 @@
bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store = event_aggregator_->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more PerDeviceNumeric
// aggregates than |logged_values| and |day_last_garbage_collected_| should
// imply.
@@ -869,7 +874,7 @@
: day_store_created_ - backfill_days;
}
- std::unique_ptr<EventAggregator> event_aggregator_;
+ std::unique_ptr<EventAggregatorManager> event_aggregator_mgr_;
std::unique_ptr<MockConsistentProtoStore> local_aggregate_proto_store_;
std::unique_ptr<MockConsistentProtoStore> obs_history_proto_store_;
std::unique_ptr<ObservationWriter> observation_writer_;
@@ -901,7 +906,7 @@
void SetUp() override {
AggregateStoreTest::SetUp();
- event_aggregator_->UpdateAggregationConfigs(*project_context_);
+ event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
}
// Adds an OccurrenceEvent to the local aggregations for the MetricReportId of a locally
@@ -1015,17 +1020,17 @@
protected:
void SetUp() override { AggregateStoreTest::SetUp(); }
- void ShutDownWorkerThread() { event_aggregator_->ShutDown(); }
+ void ShutDownWorkerThread() { event_aggregator_mgr_->ShutDown(); }
bool in_shutdown_state() { return (shutdown_flag_set() && !worker_joinable()); }
bool in_run_state() { return (!shutdown_flag_set() && worker_joinable()); }
bool shutdown_flag_set() {
- return event_aggregator_->protected_worker_thread_controller_.const_lock()->shut_down;
+ return event_aggregator_mgr_->protected_worker_thread_controller_.const_lock()->shut_down;
}
- bool worker_joinable() { return event_aggregator_->worker_thread_.joinable(); }
+ bool worker_joinable() { return event_aggregator_mgr_->worker_thread_.joinable(); }
};
// Tests that the Read() method of each ConsistentProtoStore is called once
@@ -1154,7 +1159,8 @@
kTestMetricId, kTestReportId);
auto project_context = GetProjectContextFor(metric);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
kTestEventCode, kTestDayIndex));
@@ -1170,7 +1176,8 @@
kTestMetricId, kTestReportId);
auto project_context = GetProjectContextFor(metric);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
kTestEventCode, kTestDayIndex));
@@ -1189,7 +1196,8 @@
kTestMetricId, kTestReportId);
auto project_context = GetProjectContextFor(metric);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
ASSERT_EQ(kInvalidArguments,
GetAggregateStore()->SetActive(kTestCustomerId, kTestProjectId, kTestMetricId,
kTestReportId, kTestEventCode, kTestDayIndex));
@@ -1205,7 +1213,8 @@
const int64_t kFirstValue = 3;
const int64_t kSecondValue = 7;
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
@@ -1229,7 +1238,8 @@
const int64_t kFirstValue = 3;
const int64_t kSecondValue = 7;
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
@@ -1252,7 +1262,8 @@
const int64_t kFirstValue = 3;
const int64_t kSecondValue = 7;
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
@@ -1284,7 +1295,8 @@
TEST_F(AggregateStoreTest, GenerateObservationsNoEvents) {
// Provide the all_report_types test registry to the EventAggregator.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
// Generate locally aggregated Observations for the current day index.
EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
std::vector<Observation2> observations(0);
@@ -1298,7 +1310,8 @@
TEST_F(AggregateStoreTest, GenerateObservationsTwice) {
// Provide the all_report_types test registry to the EventAggregator.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
// Check that Observations are generated when GenerateObservations is called
// for the current day index for the first time.
auto current_day_index = CurrentDayIndex();
@@ -1338,7 +1351,8 @@
ResetEventAggregator();
// Provide the all_report_types test registry to the EventAggregator.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ EXPECT_EQ(
+ kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
std::vector<Observation2> observations(0);
EXPECT_TRUE(FetchAggregatedObservations(
diff --git a/src/local_aggregation/event_aggregator.cc b/src/local_aggregation/event_aggregator.cc
index f0a2e38..d54aa98 100644
--- a/src/local_aggregation/event_aggregator.cc
+++ b/src/local_aggregation/event_aggregator.cc
@@ -16,50 +16,14 @@
namespace cobalt::local_aggregation {
-using logger::Encoder;
using logger::EventRecord;
using logger::kInvalidArguments;
using logger::kOK;
-using logger::kOther;
-using logger::ObservationWriter;
using logger::ProjectContext;
using logger::Status;
-using util::ConsistentProtoStore;
-using util::SteadyClock;
-using util::TimeToDayIndex;
-EventAggregator::EventAggregator(const Encoder* encoder,
- const ObservationWriter* observation_writer,
- ConsistentProtoStore* local_aggregate_proto_store,
- ConsistentProtoStore* obs_history_proto_store,
- const size_t backfill_days,
- const std::chrono::seconds aggregate_backup_interval,
- const std::chrono::seconds generate_obs_interval,
- const std::chrono::seconds gc_interval) {
- CHECK_LE(aggregate_backup_interval.count(), generate_obs_interval.count())
- << "aggregate_backup_interval must be less than or equal to "
- "generate_obs_interval";
- CHECK_LE(aggregate_backup_interval.count(), gc_interval.count())
- << "aggregate_backup_interval must be less than or equal to gc_interval";
- aggregate_backup_interval_ = aggregate_backup_interval;
- generate_obs_interval_ = generate_obs_interval;
- gc_interval_ = gc_interval;
-
- aggregate_store_ =
- std::make_unique<AggregateStore>(encoder, observation_writer, local_aggregate_proto_store,
- obs_history_proto_store, backfill_days);
-
- steady_clock_ = std::make_unique<SteadyClock>();
-}
-
-void EventAggregator::Start(std::unique_ptr<util::SystemClockInterface> clock) {
- auto locked = protected_worker_thread_controller_.lock();
- locked->shut_down = false;
- std::thread t(std::bind(
- [this](std::unique_ptr<util::SystemClockInterface>& clock) { this->Run(std::move(clock)); },
- std::move(clock)));
- worker_thread_ = std::move(t);
-}
+EventAggregator::EventAggregator(AggregateStore* aggregate_store)
+ : aggregate_store_(aggregate_store) {}
// TODO(pesk): update the EventAggregator's view of a Metric
// or ReportDefinition when appropriate.
@@ -190,107 +154,4 @@
memory_usage_event.component(), config::PackEventCodes(memory_usage_event.event_code()),
event->day_index(), memory_usage_event.bytes());
}
-
-Status EventAggregator::GenerateObservationsNoWorker(uint32_t final_day_index_utc,
- uint32_t final_day_index_local) {
- if (worker_thread_.joinable()) {
- LOG(ERROR) << "GenerateObservationsNoWorker() was called while "
- "worker thread was running.";
- return kOther;
- }
- return aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local);
-}
-
-void EventAggregator::ShutDown() {
- if (worker_thread_.joinable()) {
- {
- auto locked = protected_worker_thread_controller_.lock();
- locked->shut_down = true;
- locked->shutdown_notifier.notify_all();
- }
- worker_thread_.join();
- } else {
- protected_worker_thread_controller_.lock()->shut_down = true;
- }
-}
-
-void EventAggregator::Run(std::unique_ptr<util::SystemClockInterface> system_clock) {
- std::chrono::steady_clock::time_point steady_time = steady_clock_->now();
- // Schedule Observation generation to happen in the first cycle.
- next_generate_obs_ = steady_time;
- // Schedule garbage collection to happen |gc_interval_| seconds from now.
- next_gc_ = steady_time + gc_interval_;
- // Acquire the mutex protecting the shutdown flag and condition variable.
- auto locked = protected_worker_thread_controller_.lock();
- while (true) {
- // If shutdown has been requested, back up the LocalAggregateStore and
- // exit.
- if (locked->shut_down) {
- aggregate_store_->BackUpLocalAggregateStore();
- return;
- }
- // Sleep until the next scheduled backup of the LocalAggregateStore or
- // until notified of shutdown. Back up the LocalAggregateStore after
- // waking.
- locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
- if (locked->immediate_run_trigger) {
- locked->immediate_run_trigger = false;
- return true;
- }
- return locked->shut_down;
- });
- aggregate_store_->BackUpLocalAggregateStore();
- // If the worker thread was woken up by a shutdown request, exit.
- // Otherwise, complete any scheduled Observation generation and garbage
- // collection.
- if (locked->shut_down) {
- return;
- }
- // Check whether it is time to generate Observations or to garbage-collect
- // the LocalAggregate store. If so, do that task and schedule the next
- // occurrence.
- DoScheduledTasks(system_clock->now(), steady_clock_->now());
- }
-}
-
-void EventAggregator::DoScheduledTasks(std::chrono::system_clock::time_point system_time,
- std::chrono::steady_clock::time_point steady_time) {
- auto current_time_t = std::chrono::system_clock::to_time_t(system_time);
- auto yesterday_utc = TimeToDayIndex(current_time_t, MetricDefinition::UTC) - 1;
- auto yesterday_local_time = TimeToDayIndex(current_time_t, MetricDefinition::LOCAL) - 1;
-
- // Skip the tasks (but do schedule a retry) if either day index is too small.
- uint32_t min_allowed_day_index = kMaxAllowedAggregationDays + aggregate_store_->backfill_days_;
- bool skip_tasks =
- (yesterday_utc < min_allowed_day_index || yesterday_local_time < min_allowed_day_index);
- if (steady_time >= next_generate_obs_) {
- next_generate_obs_ += generate_obs_interval_;
- if (skip_tasks) {
- LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the "
- "current day index is too small.";
- } else {
- auto obs_status = aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time);
- if (obs_status == kOK) {
- aggregate_store_->BackUpObservationHistory();
- } else {
- LOG(ERROR) << "GenerateObservations failed with status: " << obs_status;
- }
- }
- }
- if (steady_time >= next_gc_) {
- next_gc_ += gc_interval_;
- if (skip_tasks) {
- LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
- "current day index is too small.";
- } else {
- auto gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time);
- if (gc_status == kOK) {
- aggregate_store_->BackUpLocalAggregateStore();
- } else {
- LOG(ERROR) << "GarbageCollect failed with status: " << gc_status;
- }
- }
- }
-}
-
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator.h b/src/local_aggregation/event_aggregator.h
index 116cfda..d6a887c 100644
--- a/src/local_aggregation/event_aggregator.h
+++ b/src/local_aggregation/event_aggregator.h
@@ -18,19 +18,7 @@
#include "src/logger/observation_writer.h"
#include "src/logger/status.h"
-
-namespace cobalt {
-
-// Forward declaration used for friend tests. These will be removed once a better solution is
-// designed.
-// TODO(ninai): remove this
-namespace internal {
-
-class RealLoggerFactory;
-
-} // namespace internal
-
-namespace local_aggregation {
+namespace cobalt::local_aggregation {
// The EventAggregator manages the Loggers' interactions with the local aggregation.
//
@@ -39,74 +27,13 @@
// provide the EventAggregator with its view of Cobalt's metric and report
// registry.
// (2) When logging an Event for a locally aggregated report, a Logger
-// calls a Log*() method with the EventRecord and the report id for the event.
-//
-// Functionality that the EventAggregator curently has but will be moved to the
-// EventAggregatorManager:
-// A worker thread calls on AggregateStore methods to do the following tasks at intervals
-// specified in the EventAggregator's constructor:
-// (1) Calls BackUp*() to back up the EventAggregator's state to the file system.
-// (2) Calls GenerateObservations() with the previous day's day index to generate all Observations
-// for rolling windows ending on that day index, as well as any missing Observations for a specified
-// number of days in the past.
-// (3) Calls GarbageCollect() to delete daily aggregates which are not needed to compute aggregates
-// for any windows of interest in the future.
+// calls an Add*() method with the EventRecord and the report id for the event.
class EventAggregator {
public:
// Constructs an EventAggregator.
//
- // encoder: the singleton instance of an Encoder on the system.
- //
- // local_aggregate_proto_store: A ConsistentProtoStore to be used by the
- // AggregateStore to store snapshots of its in-memory store of event
- // aggregates.
- //
- // obs_history_proto_store: A ConsistentProtoStore to be used by the
- // AggregateStore to store snapshots of its in-memory history of generated
- // Observations.
- //
- // backfill_days: the number of past days for which the AggregateStore
- // generates and sends Observations, in addition to a requested day index.
- // See the comment above AggreateStoe::GenerateObservations for more
- // detail. The constructor CHECK-fails if a value larger than
- // |kEventAggregatorMaxAllowedBackfillDays| is passed.
- //
- // aggregate_backup_interval: the interval in seconds at which a snapshot of
- // the in-memory store of event aggregates should be written to
- // |local_aggregate_proto_store|.
- //
- // generate_obs_interval: the interval in seconds at which the
- // EventAggregator should generate Observations.
- //
- // gc_interval: the interval in seconds at which the LocalAggregateStore
- // should be garbage-collected.
- //
- // The constructor CHECK-fails if the value of |aggregate_backup_interval| is
- // larger than either of |generate_obs_interval| or |gc_interval|. In
- // practice, the value of |aggregate_backup_interval| should be small relative
- // to the values of |generate_obs_interval| and |gc_interval|, since each of
- // Observation generation and garbage collection will be done at the smallest
- // multiple of |aggregate_backup_interval| which is greater than or equal to
- // its specified interval.
- EventAggregator(const logger::Encoder* encoder,
- const logger::ObservationWriter* observation_writer,
- util::ConsistentProtoStore* local_aggregate_proto_store,
- util::ConsistentProtoStore* obs_history_proto_store, size_t backfill_days = 0,
- std::chrono::seconds aggregate_backup_interval = std::chrono::minutes(1),
- std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
- std::chrono::seconds gc_interval = kOneDay);
-
- // Shut down the worker thread before destructing the EventAggregator.
- ~EventAggregator() { ShutDown(); }
-
- // Starts the worker thread.
- //
- // |clock| The clock that should be used by the worker thread for scheduling
- // tasks and determining the current day and hour. On systems on which
- // the clock may be initially inaccurate, the caller should wait to
- // invoke this method until after it is known that the clock is
- // accurate.
- void Start(std::unique_ptr<util::SystemClockInterface> clock);
+ // aggregate_store: an AggregateStore, which is used to store the local aggregates.
+ explicit EventAggregator(AggregateStore* aggregate_store);
// Updates the EventAggregator's view of the Cobalt metric and report
// registry.
@@ -166,85 +93,10 @@
// AddMemoryUsageEvent: |event_record| should wrap a MemoryUsageEvent.
logger::Status AddMemoryUsageEvent(uint32_t report_id, const logger::EventRecord& event_record);
- // Checks that the worker thread is shut down, and if so, calls
- // AggregateStore::GenerateObservations() and returns its result. Returns kOther if the
- // worker thread is joinable. See the documentation on AggregateStore::GenerateObservations()
- // for a description of the parameters.
- //
- // This method is intended for use in tests which require a single thread to
- // both log events to and generate Observations from an EventAggregator.
- logger::Status GenerateObservationsNoWorker(uint32_t final_day_index_utc,
- uint32_t final_day_index_local = 0u);
-
private:
- friend class EventAggregatorManager; // used for transition during redesign.
- friend class AggregateStoreTest;
- friend class AggregateStoreWorkerTest;
- friend class EventAggregatorTest;
- friend class EventAggregatorManagerTest;
- friend class EventAggregatorWorkerTest;
- friend class TestEventAggregatorManager;
- friend class internal::RealLoggerFactory;
-
- // Request that the worker thread shut down and wait for it to exit. The
- // worker thread backs up the LocalAggregateStore before exiting.
- void ShutDown();
-
- // Main loop executed by the worker thread. The thread sleeps for
- // |aggregate_backup_interval_| seconds or until notified of shutdown, then
- // calls BackUpLocalAggregateStore(). If not notified of shutdown, calls
- // DoScheduledTasks() and schedules the next occurrence of any completed
- // tasks.
- void Run(std::unique_ptr<util::SystemClockInterface> system_clock);
-
- // Helper method called by Run(). If |next_generate_obs_| is less than or equal to |steady_time|,
- // calls AggregateStore::GenerateObservations() with the day index of the previous day from
- // |system_time| in each of UTC and local time, and then backs up the history of generated
- // Observations. If |next_gc_| is less than or equal to |steady_time|, calls
- // AggregateStore::GarbageCollect() with the day index of the previous day from |system_time| in
- // each of UTC and local time and then backs up the LocalAggregateStore. In each case, an error is
- // logged and execution continues if the operation fails.
- void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
- std::chrono::steady_clock::time_point steady_time);
-
- // Sets the EventAggregator's SteadyClockInterface. Only for use in tests.
- void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }
-
- struct WorkerThreadController {
- // Setting this value to true requests that the worker thread stop.
- bool shut_down = true;
-
- // Setting this value to true requests that the worker thread immediately perform its work
- // rather than waiting for the next scheduled time to run. After the worker thread has completed
- // its work, it will reset this value to false.
- bool immediate_run_trigger = false;
-
- // Used to wait on to execute periodic EventAggregator tasks.
- std::condition_variable_any shutdown_notifier;
- };
-
- struct ShutDownFlag {
- // Used to trigger a shutdown of the EventAggregator.
- bool shut_down = true;
- // Used in tests to manually trigger a run of the EventAggregator's scheduled tasks.
- bool manual_trigger = false;
- // Used to wait on to execute periodic EventAggregator tasks.
- std::condition_variable_any shutdown_notifier;
- };
-
- std::unique_ptr<AggregateStore> aggregate_store_;
-
- std::thread worker_thread_;
- util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_;
- std::chrono::seconds aggregate_backup_interval_;
- std::chrono::seconds generate_obs_interval_;
- std::chrono::seconds gc_interval_;
- std::chrono::steady_clock::time_point next_generate_obs_;
- std::chrono::steady_clock::time_point next_gc_;
- std::unique_ptr<util::SteadyClockInterface> steady_clock_;
+ AggregateStore* aggregate_store_; // not owned
};
-} // namespace local_aggregation
-} // namespace cobalt
+} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_H_
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index dda9219..2418856 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -4,9 +4,14 @@
#include "src/local_aggregation/event_aggregator_mgr.h"
+#include "src/lib/util/datetime_util.h"
+
namespace cobalt::local_aggregation {
+using logger::kOK;
+using logger::Status;
using util::ConsistentProtoStore;
+using util::TimeToDayIndex;
EventAggregatorManager::EventAggregatorManager(const logger::Encoder* encoder,
const logger::ObservationWriter* observation_writer,
@@ -15,14 +20,116 @@
const size_t backfill_days,
const std::chrono::seconds aggregate_backup_interval,
const std::chrono::seconds generate_obs_interval,
- const std::chrono::seconds gc_interval) {
- event_aggregator_ = std::make_unique<EventAggregator>(
- encoder, observation_writer, local_aggregate_proto_store, obs_history_proto_store,
- backfill_days, aggregate_backup_interval, generate_obs_interval, gc_interval);
+ const std::chrono::seconds gc_interval)
+ : backfill_days_(backfill_days),
+ aggregate_backup_interval_(aggregate_backup_interval),
+ generate_obs_interval_(generate_obs_interval),
+ gc_interval_(gc_interval) {
+ aggregate_store_ =
+ std::make_unique<AggregateStore>(encoder, observation_writer, local_aggregate_proto_store,
+ obs_history_proto_store, backfill_days);
+ event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
}
void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
- event_aggregator_->Start(std::move(clock));
+ auto locked = protected_worker_thread_controller_.lock();
+ locked->shut_down = false;
+ std::thread t(std::bind(
+ [this](std::unique_ptr<util::SystemClockInterface>& clock) { this->Run(std::move(clock)); },
+ std::move(clock)));
+ worker_thread_ = std::move(t);
+}
+
+void EventAggregatorManager::ShutDown() {
+ if (worker_thread_.joinable()) {
+ {
+ auto locked = protected_worker_thread_controller_.lock();
+ locked->shut_down = true;
+ locked->shutdown_notifier.notify_all();
+ }
+ worker_thread_.join();
+ } else {
+ protected_worker_thread_controller_.lock()->shut_down = true;
+ }
+}
+
+void EventAggregatorManager::Run(std::unique_ptr<util::SystemClockInterface> system_clock) {
+ std::chrono::steady_clock::time_point steady_time = steady_clock_->now();
+ // Schedule Observation generation to happen in the first cycle.
+ next_generate_obs_ = steady_time;
+ // Schedule garbage collection to happen |gc_interval_| seconds from now.
+ next_gc_ = steady_time + gc_interval_;
+ // Acquire the mutex protecting the shutdown flag and condition variable.
+ auto locked = protected_worker_thread_controller_.lock();
+ while (true) {
+ // If shutdown has been requested, back up the LocalAggregateStore and
+ // exit.
+ if (locked->shut_down) {
+ aggregate_store_->BackUpLocalAggregateStore();
+ return;
+ }
+ // Sleep until the next scheduled backup of the LocalAggregateStore or
+ // until notified of shutdown. Back up the LocalAggregateStore after
+ // waking.
+ locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
+ if (locked->immediate_run_trigger) {
+ locked->immediate_run_trigger = false;
+ return true;
+ }
+ return locked->shut_down;
+ });
+ aggregate_store_->BackUpLocalAggregateStore();
+ // If the worker thread was woken up by a shutdown request, exit.
+ // Otherwise, complete any scheduled Observation generation and garbage
+ // collection.
+ if (locked->shut_down) {
+ return;
+ }
+ // Check whether it is time to generate Observations or to garbage-collect
+ // the LocalAggregate store. If so, do that task and schedule the next
+ // occurrence.
+ DoScheduledTasks(system_clock->now(), steady_clock_->now());
+ }
+}
+
+void EventAggregatorManager::DoScheduledTasks(std::chrono::system_clock::time_point system_time,
+ std::chrono::steady_clock::time_point steady_time) {
+ auto current_time_t = std::chrono::system_clock::to_time_t(system_time);
+ auto yesterday_utc = TimeToDayIndex(current_time_t, MetricDefinition::UTC) - 1;
+ auto yesterday_local_time = TimeToDayIndex(current_time_t, MetricDefinition::LOCAL) - 1;
+
+ // Skip the tasks (but do schedule a retry) if either day index is too small.
+ uint32_t min_allowed_day_index = kMaxAllowedAggregationDays + backfill_days_;
+ bool skip_tasks =
+ (yesterday_utc < min_allowed_day_index || yesterday_local_time < min_allowed_day_index);
+ if (steady_time >= next_generate_obs_) {
+ next_generate_obs_ += generate_obs_interval_;
+ if (skip_tasks) {
+ LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the "
+ "current day index is too small.";
+ } else {
+ auto obs_status = aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time);
+ if (obs_status == kOK) {
+ aggregate_store_->BackUpObservationHistory();
+ } else {
+ LOG(ERROR) << "GenerateObservations failed with status: " << obs_status;
+ }
+ }
+ }
+ if (steady_time >= next_gc_) {
+ next_gc_ += gc_interval_;
+ if (skip_tasks) {
+ LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
+ "current day index is too small.";
+ } else {
+ auto gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time);
+ if (gc_status == kOK) {
+ aggregate_store_->BackUpLocalAggregateStore();
+ } else {
+ LOG(ERROR) << "GarbageCollect failed with status: " << gc_status;
+ }
+ }
+ }
}
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 12ea6d1..34b8f22 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -13,16 +13,38 @@
#include "src/lib/util/clock.h"
#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/protected_fields.h"
+#include "src/local_aggregation/aggregate_store.h"
#include "src/local_aggregation/event_aggregator.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/encoder.h"
#include "src/logger/observation_writer.h"
#include "src/logger/status.h"
-namespace cobalt::local_aggregation {
+namespace cobalt {
constexpr int64_t kHoursInADay = 24;
+// Forward declaration used for friend tests. These will be removed once a better solution is
+// designed.
+// TODO(ninai): remove this
+namespace internal {
+
+class RealLoggerFactory;
+
+} // namespace internal
+
+namespace local_aggregation {
+
+// Class responsible for managing memory, threading, locks, time and the main loop.
+//
+// A worker thread calls on AggregateStore methods to do the following tasks at intervals
+// specified in the EventAggregator's constructor:
+// (1) Calls BackUp*() to back up the EventAggregator's state to the file system.
+// (2) Calls GenerateObservations() with the previous day's day index to generate all Observations
+// for rolling windows ending on that day index, as well as any missing Observations for a specified
+// number of days in the past.
+// (3) Calls GarbageCollect() to delete daily aggregates which are not needed to compute aggregates
+// for any windows of interest in the future.
class EventAggregatorManager {
public:
// Constructs a class to manage local aggregation and provide EventAggregators.
@@ -68,6 +90,9 @@
std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
std::chrono::seconds gc_interval = std::chrono::hours(kHoursInADay));
+ // Shut down the worker thread before destructing the EventAggregatorManager.
+ ~EventAggregatorManager() { ShutDown(); }
+
// Starts the worker thread.
//
// |clock| The clock that should be used by the worker thread for scheduling tasks and determining
@@ -82,15 +107,76 @@
private:
friend class TestEventAggregatorManager;
friend class EventAggregatorManagerTest;
+ friend class AggregateStoreTest;
+ friend class AggregateStoreWorkerTest;
+ friend class EventAggregatorTest;
+ friend class EventAggregatorManagerTest;
+ friend class EventAggregatorWorkerTest;
+ friend class internal::RealLoggerFactory;
- // Sets the EventAggregator's SteadyClockInterface. Only for use in tests.
- void SetSteadyClock(util::SteadyClockInterface* clock) {
- event_aggregator_->SetSteadyClock(clock);
- }
+ // Request that the worker thread shut down and wait for it to exit. The
+ // worker thread backs up the LocalAggregateStore before exiting.
+ void ShutDown();
+ // Main loop executed by the worker thread. The thread sleeps for
+ // |aggregate_backup_interval_| seconds or until notified of shutdown, then
+ // calls BackUpLocalAggregateStore(). If not notified of shutdown, calls
+ // DoScheduledTasks() and schedules the next occurrence of any completed
+ // tasks.
+ void Run(std::unique_ptr<util::SystemClockInterface> system_clock);
+
+ // Helper method called by Run(). If |next_generate_obs_| is less than or equal to |steady_time|,
+ // calls AggregateStore::GenerateObservations() with the day index of the previous day from
+ // |system_time| in each of UTC and local time, and then backs up the history of generated
+ // Observations. If |next_gc_| is less than or equal to |steady_time|, calls
+ // AggregateStore::GarbageCollect() with the day index of the previous day from |system_time| in
+ // each of UTC and local time and then backs up the LocalAggregateStore. In each case, an error is
+ // logged and execution continues if the operation fails.
+ void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
+ std::chrono::steady_clock::time_point steady_time);
+
+ // Sets the EventAggregatorManager's SteadyClockInterface. Only for use in tests.
+ void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }
+
+ struct WorkerThreadController {
+ // Setting this value to true requests that the worker thread stop.
+ bool shut_down = true;
+
+ // Setting this value to true requests that the worker thread immediately perform its work
+ // rather than waiting for the next scheduled time to run. After the worker thread has completed
+ // its work, it will reset this value to false.
+ bool immediate_run_trigger = false;
+
+ // Used to wait on to execute periodic EventAggregator tasks.
+ std::condition_variable_any shutdown_notifier;
+ };
+
+ struct ShutDownFlag {
+ // Used to trigger a shutdown of the EventAggregator.
+ bool shut_down = true;
+ // Used in tests to manually trigger a run of the EventAggregator's scheduled tasks.
+ bool manual_trigger = false;
+ // Used to wait on to execute periodic EventAggregator tasks.
+ std::condition_variable_any shutdown_notifier;
+ };
+
+ size_t backfill_days_;
+ std::chrono::seconds aggregate_backup_interval_;
+ std::chrono::seconds generate_obs_interval_;
+ std::chrono::seconds gc_interval_;
+
+ std::chrono::steady_clock::time_point next_generate_obs_;
+ std::chrono::steady_clock::time_point next_gc_;
+ std::unique_ptr<util::SteadyClockInterface> steady_clock_;
+
+ std::unique_ptr<AggregateStore> aggregate_store_;
std::unique_ptr<EventAggregator> event_aggregator_;
+
+ std::thread worker_thread_;
+ util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_;
};
-} // namespace cobalt::local_aggregation
+} // namespace local_aggregation
+} // namespace cobalt
#endif // COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_MGR_H_
diff --git a/src/local_aggregation/event_aggregator_mgr_test.cc b/src/local_aggregation/event_aggregator_mgr_test.cc
index 308b54d..1df6bf3 100644
--- a/src/local_aggregation/event_aggregator_mgr_test.cc
+++ b/src/local_aggregation/event_aggregator_mgr_test.cc
@@ -89,9 +89,7 @@
return event_aggregator_mgr_;
}
- void ShutDown(EventAggregatorManager* event_aggregator_mgr) {
- event_aggregator_mgr->GetEventAggregator()->ShutDown();
- }
+ void ShutDown(EventAggregatorManager* event_aggregator_mgr) { event_aggregator_mgr->ShutDown(); }
bool IsInShutdownState(EventAggregatorManager* event_aggregator_mgr) {
return (IsShutdownFlagSet(event_aggregator_mgr) && !IsWorkerJoinable(event_aggregator_mgr));
@@ -102,13 +100,11 @@
}
bool IsShutdownFlagSet(EventAggregatorManager* event_aggregator_mgr) {
- return event_aggregator_mgr->GetEventAggregator()
- ->protected_worker_thread_controller_.const_lock()
- ->shut_down;
+ return event_aggregator_mgr->protected_worker_thread_controller_.const_lock()->shut_down;
}
bool IsWorkerJoinable(EventAggregatorManager* event_aggregator_mgr) {
- return event_aggregator_mgr->GetEventAggregator()->worker_thread_.joinable();
+ return event_aggregator_mgr->worker_thread_.joinable();
}
// Returns the day index of the current day according to |test_clock_|, in
@@ -121,10 +117,7 @@
bool BackUpHappened() { return local_aggregate_proto_store_->write_count_ >= 1; }
uint32_t NumberOfKVPairsInStore(EventAggregatorManager* event_aggregator_mgr) {
- return event_aggregator_mgr->GetEventAggregator()
- ->aggregate_store_->CopyLocalAggregateStore()
- .by_report_key()
- .size();
+ return event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore().by_report_key().size();
}
// Given a ProjectContext |project_context| and the MetricReportId of a UNIQUE_N_DAY_ACTIVES
@@ -144,8 +137,7 @@
}
uint32_t GetNumberOfUniqueActivesAggregates(EventAggregatorManager* event_aggregator_mgr) {
- auto local_aggregate_store =
- event_aggregator_mgr->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
uint32_t num_aggregates = 0;
for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) {
if (aggregates.type_case() != ReportAggregates::kUniqueActivesAggregates) {
@@ -164,8 +156,7 @@
EventAggregatorManager* event_aggregator_mgr,
const std::shared_ptr<const ProjectContext>& project_context,
const MetricReportId& metric_report_id, uint32_t day_index, uint32_t event_code) {
- auto local_aggregate_store =
- event_aggregator_mgr->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id), &key)) {
return AssertionFailure() << "Could not serialize key with metric id "
@@ -206,16 +197,16 @@
return AssertionSuccess();
}
- void TriggerAndWaitForDoScheduledTasks(EventAggregator* event_aggregator) {
+ void TriggerAndWaitForDoScheduledTasks(EventAggregatorManager* event_aggregator_mgr) {
{
// Acquire the lock to manually trigger the scheduled tasks.
- auto locked = event_aggregator->protected_worker_thread_controller_.lock();
+ auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
locked->immediate_run_trigger = true;
locked->shutdown_notifier.notify_all();
}
while (true) {
// Reacquire the lock to make sure that the scheduled tasks have completed.
- auto locked = event_aggregator->protected_worker_thread_controller_.lock();
+ auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
if (!locked->immediate_run_trigger) {
break;
}
@@ -368,7 +359,7 @@
expected_obs[{expected_id, day_index}] = {{1, {false, true, true, true, true}},
{7, {false, true, true, true, true}}};
- TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr->GetEventAggregator());
+ TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
@@ -384,7 +375,7 @@
AdvanceClock(kSecondsInOneDay);
ResetObservationStore();
- TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr->GetEventAggregator());
+ TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs, observation_store_.get(),
update_recipient_.get()));
diff --git a/src/local_aggregation/event_aggregator_test.cc b/src/local_aggregation/event_aggregator_test.cc
index f9478ee..5629e9d 100644
--- a/src/local_aggregation/event_aggregator_test.cc
+++ b/src/local_aggregation/event_aggregator_test.cc
@@ -37,13 +37,11 @@
using logger::ObservationWriter;
using logger::ProjectContext;
using logger::Status;
-using logger::testing::CheckUniqueActivesObservations;
using logger::testing::ExpectedUniqueActivesObservations;
using logger::testing::FakeObservationStore;
using logger::testing::GetTestProject;
using logger::testing::MakeAggregationConfig;
using logger::testing::MakeAggregationKey;
-using logger::testing::MakeNullExpectedUniqueActivesObservations;
using logger::testing::MockConsistentProtoStore;
using logger::testing::TestUpdateRecipient;
using system_data::ClientSecret;
@@ -124,7 +122,7 @@
unowned_test_clock_ = test_clock_.get();
day_store_created_ = CurrentDayIndex();
test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
- event_aggregator_mgr_->GetEventAggregator()->SetSteadyClock(test_steady_clock_);
+ event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
}
// Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
@@ -145,26 +143,22 @@
time_zone);
}
- size_t GetBackfillDays() {
- return event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->backfill_days_;
- }
+ size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
LocalAggregateStore CopyLocalAggregateStore() {
- return event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+ return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
}
void TriggerAndWaitForDoScheduledTasks() {
{
// Acquire the lock to manually trigger the scheduled tasks.
- auto locked =
- event_aggregator_mgr_->GetEventAggregator()->protected_worker_thread_controller_.lock();
+ auto locked = event_aggregator_mgr_->protected_worker_thread_controller_.lock();
locked->immediate_run_trigger = true;
locked->shutdown_notifier.notify_all();
}
while (true) {
// Reacquire the lock to make sure that the scheduled tasks have completed.
- auto locked =
- event_aggregator_mgr_->GetEventAggregator()->protected_worker_thread_controller_.lock();
+ auto locked = event_aggregator_mgr_->protected_worker_thread_controller_.lock();
if (!locked->immediate_run_trigger) {
break;
}
@@ -344,8 +338,7 @@
// of reference.
bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store =
- event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more UniqueActives
// aggregates than |logged_activity| and |day_last_garbage_collected_|
// should imply.
@@ -460,8 +453,7 @@
bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
uint32_t /*current_day_index*/) {
- auto local_aggregate_store =
- event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+ auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more PerDeviceNumeric
// aggregates than |logged_values| and |day_last_garbage_collected_| should
// imply.
@@ -811,21 +803,17 @@
protected:
void SetUp() override { EventAggregatorTest::SetUp(); }
- void ShutDownWorkerThread() { event_aggregator_mgr_->GetEventAggregator()->ShutDown(); }
+ void ShutDownWorkerThread() { event_aggregator_mgr_->ShutDown(); }
bool in_shutdown_state() { return (shutdown_flag_set() && !worker_joinable()); }
bool in_run_state() { return (!shutdown_flag_set() && worker_joinable()); }
bool shutdown_flag_set() {
- return event_aggregator_mgr_->GetEventAggregator()
- ->protected_worker_thread_controller_.const_lock()
- ->shut_down;
+ return event_aggregator_mgr_->protected_worker_thread_controller_.const_lock()->shut_down;
}
- bool worker_joinable() {
- return event_aggregator_mgr_->GetEventAggregator()->worker_thread_.joinable();
- }
+ bool worker_joinable() { return event_aggregator_mgr_->worker_thread_.joinable(); }
};
// Tests that an empty LocalAggregateStore is updated with
@@ -1013,65 +1001,6 @@
}
}
-// Checks that UniqueActivesObservations with the expected values are
-// generated by the the scheduled Observation generation when some events
-// have been logged for a UNIQUE_N_DAY_ACTIVES.
-// (based on UniqueActivesNoiseFreeEventAggregatorTest::CheckObservationValuesMultiDay)
-//
-// Logging pattern:
-// Logs events for the EventsOccurred_UniqueDevices report (whose parent
-// metric has max_event_code = 4) for event codes 1 through 4.
-//
-// Expected number of Observations:
-// The call to GenerateObservations should generate a number of Observations
-// equal to the daily_num_obs field of
-// |logger::testing::unique_actives_noise_free::kExpectedAggregationParams|.
-//
-// Expected Observation values:
-// The EventsOccurred_UniqueDevices report has window sizes 1 and 7, and
-// the expected activity indicators of Observations for that report are:
-//
-// (window size) active for event codes
-// ------------------------------------------------------
-// (1) 1, 2, 3, 4
-// (7) 1, 2, 3, 4
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(UniqueActivesNoiseFreeEventAggregatorTest, Run) {
- auto day_index = CurrentDayIndex();
-
- // Form expected Observations for the 1 day of logging.
- ExpectedUniqueActivesObservations expected_obs = MakeNullExpectedUniqueActivesObservations(
- logger::testing::unique_actives_noise_free::kExpectedAggregationParams, day_index);
- const auto& expected_id =
- logger::testing::unique_actives_noise_free::kEventsOccurredMetricReportId;
- expected_obs[{expected_id, day_index}] = {{1, {false, true, true, true, true}},
- {7, {false, true, true, true, true}}};
- event_aggregator_mgr_->Start(std::move(test_clock_));
- // Trigger the initial call to DoScheduledTasks and wait for it to have completed.
- TriggerAndWaitForDoScheduledTasks();
-
- // Generate some events.
- for (uint32_t event_code = 1;
- event_code <
- logger::testing::unique_actives_noise_free::kExpectedAggregationParams.num_event_codes.at(
- expected_id);
- event_code++) {
- EXPECT_EQ(kOK, AddUniqueActivesEvent(expected_id, day_index, event_code));
- }
- // Advance |test_clock_| by 1 day.
- AdvanceClock(kDay);
- // Clear the FakeObservationStore.
- ResetObservationStore();
-
- // Trigger another iteration of the call to DoScheduledTasks and wait for it to have completed.
- TriggerAndWaitForDoScheduledTasks();
-
- // Check the generated Observations against the expectation.
- EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs, observation_store_.get(),
- update_recipient_.get()));
-}
-
// Tests that the LocalAggregateStore is updated as expected when
// EventAggregator::AddPerDeviceCountEvent() is called with valid arguments;
// i.e., with a report ID associated to an existing key of the
@@ -1210,87 +1139,4 @@
}
}
-// Starts the worker thread, and destructs the EventAggregator without
-// explicitly shutting down the worker thread. Checks that the shutdown flag
-// and worker thread are in the expected states before and after the thread is
-// started.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, StartWorkerThread) {
- EXPECT_TRUE(in_shutdown_state());
- event_aggregator_mgr_->Start(std::move(test_clock_));
- EXPECT_TRUE(in_run_state());
-}
-
-// Starts the worker thread, shuts down the worker thread, and destructs the
-// EventAggregator. Checks that the shutdown flag and worker thread are in the
-// expected states.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, StartAndShutDownWorkerThread) {
- EXPECT_TRUE(in_shutdown_state());
- event_aggregator_mgr_->Start(std::move(test_clock_));
- EXPECT_TRUE(in_run_state());
- ShutDownWorkerThread();
- EXPECT_TRUE(in_shutdown_state());
-}
-
-// Starts the worker thread and immediately shuts it down. Checks that the
-// LocalAggregateStore was backed up during shutdown.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, BackUpBeforeShutdown) {
- event_aggregator_mgr_->Start(std::move(test_clock_));
- ShutDownWorkerThread();
- EXPECT_EQ(1, local_aggregate_proto_store_->write_count_);
-}
-
-// Starts the worker thread and calls
-// EventAggregator::UpdateAggregationConfigs() on the main thread.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, UpdateAggregationConfigs) {
- event_aggregator_mgr_->Start(std::move(test_clock_));
- // Provide the EventAggregator with the all_report_types registry.
- auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
- EXPECT_EQ(
- kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
- // Check that the number of key-value pairs in the LocalAggregateStore is
- // now equal to the number of locally aggregated reports in the
- // all_report_types registry.
- EXPECT_EQ(logger::testing::all_report_types::kExpectedAggregationParams.metric_report_ids.size(),
- CopyLocalAggregateStore().by_report_key().size());
-}
-
-// Starts the worker thread, provides a ProjectContext, logs some events, and
-// shuts down the worker thread. Checks that the LocalAggregateStore was
-// backed up at least once during the lifetime of the worker thread.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, LogEvents) {
- auto day_index = CurrentDayIndex();
- event_aggregator_mgr_->Start(std::move(test_clock_));
- // Provide the EventAggregator with the all_report_types registry.
- std::shared_ptr<ProjectContext> project_context =
- GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
- EXPECT_EQ(
- kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
- // // Adds some events to the local aggregations.
- LoggedActivity logged_activity;
- EXPECT_EQ(kOK, AddUniqueActivesEvent(
- project_context, logger::testing::all_report_types::kDeviceBootsMetricReportId,
- day_index, 0u, &logged_activity));
- EXPECT_EQ(kOK,
- AddUniqueActivesEvent(project_context,
- logger::testing::all_report_types::kFeaturesActiveMetricReportId,
- day_index, 4u, &logged_activity));
- EXPECT_EQ(kOK,
- AddUniqueActivesEvent(project_context,
- logger::testing::all_report_types::kEventsOccurredMetricReportId,
- day_index, 1u, &logged_activity));
- EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
- ShutDownWorkerThread();
- EXPECT_GE(local_aggregate_proto_store_->write_count_, 1);
-}
-
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
index f3834ed..45d39eb 100644
--- a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
+++ b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
@@ -26,23 +26,21 @@
// Triggers an out of schedule run of GenerateObservations(). This does not change the schedule
// of future runs.
logger::Status GenerateObservations(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
- return EventAggregatorManager::event_aggregator_->aggregate_store_->GenerateObservations(
- day_index_utc, day_index_local);
+ return EventAggregatorManager::aggregate_store_->GenerateObservations(day_index_utc,
+ day_index_local);
}
// Triggers an out of schedule run of GarbageCollect(). This does not change the schedule of
// future runs.
logger::Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
- return EventAggregatorManager::event_aggregator_->aggregate_store_->GarbageCollect(
- day_index_utc, day_index_local);
+ return EventAggregatorManager::aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
}
// Returns the number of aggregates of type per_device_numeric_aggregates.
uint32_t NumPerDeviceNumericAggregatesInStore() {
int count = 0;
for (const auto& aggregates :
- EventAggregatorManager::event_aggregator_->aggregate_store_->protected_aggregate_store_
- .lock()
+ EventAggregatorManager::aggregate_store_->protected_aggregate_store_.lock()
->local_aggregate_store.by_report_key()) {
if (aggregates.second.has_numeric_aggregates()) {
count += aggregates.second.numeric_aggregates().by_component().size();