[cobalt[LocalAggregation] Make Observation History a protected field.
- moves obs_history in a ProtectedField<>
- Defines Get/Set methods for the *LastGeneratedDayIndex, so that it
separates accessing the underlying data structures from observation
generation
- Adds tests for Get/Set methods
- makes the methods pulic
- nit organization of the AggregateStore's private fields
Bug: 40853
Change-Id: I364b31f79c9c24d5784a77ed4d16fe26f63d9b57
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index c9dc50e..994b569 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -182,9 +182,9 @@
CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
<< "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
backfill_days_ = backfill_days;
- auto locked = protected_aggregate_store_.lock();
+ auto locked_store = protected_aggregate_store_.lock();
auto restore_aggregates_status =
- local_aggregate_proto_store_->Read(&(locked->local_aggregate_store));
+ local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
switch (restore_aggregates_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read LocalAggregateStore from disk.";
@@ -194,7 +194,7 @@
VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
"with empty LocalAggregateStore. File will be created on "
"first snapshot of the LocalAggregateStore.";
- locked->local_aggregate_store = MakeNewLocalAggregateStore();
+ locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
break;
}
default: {
@@ -203,18 +203,19 @@
<< "\nError message: " << restore_aggregates_status.error_message()
<< "\nError details: " << restore_aggregates_status.error_details()
<< "\nProceeding with empty LocalAggregateStore.";
- locked->local_aggregate_store = MakeNewLocalAggregateStore();
+ locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
}
}
- if (auto status = MaybeUpgradeLocalAggregateStore(&(locked->local_aggregate_store));
+ if (auto status = MaybeUpgradeLocalAggregateStore(&(locked_store->local_aggregate_store));
status != kOK) {
LOG(ERROR) << "Failed to upgrade LocalAggregateStore to current version with status " << status
<< ".\nProceeding with empty "
"LocalAggregateStore.";
- locked->local_aggregate_store = MakeNewLocalAggregateStore();
+ locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
}
- auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
+ auto locked_obs_history = protected_obs_history_.lock();
+ auto restore_history_status = obs_history_proto_store_->Read(&locked_obs_history->obs_history);
switch (restore_history_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
@@ -232,14 +233,15 @@
<< "\nError message: " << restore_history_status.error_message()
<< "\nError details: " << restore_history_status.error_details()
<< "\nProceeding with empty AggregatedObservationHistoryStore.";
- obs_history_ = MakeNewObservationHistoryStore();
+ locked_obs_history->obs_history = MakeNewObservationHistoryStore();
}
}
- if (auto status = MaybeUpgradeObservationHistoryStore(&obs_history_); status != kOK) {
+ if (auto status = MaybeUpgradeObservationHistoryStore(&locked_obs_history->obs_history);
+ status != kOK) {
LOG(ERROR)
<< "Failed to upgrade AggregatedObservationHistoryStore to current version with status "
<< status << ".\nProceeding with empty AggregatedObservationHistoryStore.";
- obs_history_ = MakeNewObservationHistoryStore();
+ locked_obs_history->obs_history = MakeNewObservationHistoryStore();
}
}
@@ -349,7 +351,8 @@
}
Status AggregateStore::BackUpObservationHistory() {
- auto status = obs_history_proto_store_->Write(obs_history_);
+ auto obs_history = protected_obs_history_.lock()->obs_history;
+ auto status = obs_history_proto_store_->Write(obs_history);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
"::cobalt::util::Status error code: "
@@ -614,11 +617,12 @@
} // namespace
-uint32_t AggregateStore::UniqueActivesLastGeneratedDayIndex(const std::string& report_key,
- uint32_t event_code,
- uint32_t aggregation_days) const {
- auto report_history = obs_history_.by_report_key().find(report_key);
- if (report_history == obs_history_.by_report_key().end()) {
+uint32_t AggregateStore::GetUniqueActivesLastGeneratedDayIndex(const std::string& report_key,
+ uint32_t event_code,
+ uint32_t aggregation_days) const {
+ auto obs_history = protected_obs_history_.const_lock()->obs_history;
+ auto report_history = obs_history.by_report_key().find(report_key);
+ if (report_history == obs_history.by_report_key().end()) {
return 0u;
}
auto event_code_history =
@@ -633,6 +637,17 @@
return window_history->second;
}
+void AggregateStore::SetUniqueActivesLastGeneratedDayIndex(const std::string& report_key,
+ uint32_t event_code,
+ uint32_t aggregation_days,
+ uint32_t value) {
+ auto locked = protected_obs_history_.lock();
+ (*(*(*locked->obs_history.mutable_by_report_key())[report_key]
+ .mutable_unique_actives_history()
+ ->mutable_by_event_code())[event_code]
+ .mutable_by_window_size())[aggregation_days] = value;
+}
+
Status AggregateStore::GenerateSingleUniqueActivesObservation(
const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
uint32_t event_code, const OnDeviceAggregationWindow& window, bool was_active) const {
@@ -688,7 +703,7 @@
// been generated for this report, event code, and window size. If
// that day index is later than |final_day_index|, no Observation is
// generated on this invocation.
- auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days());
+ auto last_gen = GetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days());
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
// The latest day index on which |event_type| is known to have
// occurred, so far. This value will be updated as we search
@@ -721,12 +736,8 @@
if (status != kOK) {
return status;
}
- // Update |obs_history_| with the latest date of Observation
- // generation for this report, event code, and window size.
- (*(*(*obs_history_.mutable_by_report_key())[report_key]
- .mutable_unique_actives_history()
- ->mutable_by_event_code())[event_code]
- .mutable_by_window_size())[window.days()] = obs_day_index;
+
+ SetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days(), obs_day_index);
}
}
}
@@ -735,12 +746,13 @@
////////// GenerateObsFromNumericAggregates and helper methods /////////////
-uint32_t AggregateStore::PerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
- const std::string& component,
- uint32_t event_code,
- uint32_t aggregation_days) const {
- const auto& report_history = obs_history_.by_report_key().find(report_key);
- if (report_history == obs_history_.by_report_key().end()) {
+uint32_t AggregateStore::GetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+ const std::string& component,
+ uint32_t event_code,
+ uint32_t aggregation_days) const {
+ auto obs_history = protected_obs_history_.const_lock()->obs_history;
+ const auto& report_history = obs_history.by_report_key().find(report_key);
+ if (report_history == obs_history.by_report_key().end()) {
return 0u;
}
if (!report_history->second.has_per_device_numeric_history()) {
@@ -763,15 +775,37 @@
return window_history->second;
}
-uint32_t AggregateStore::ReportParticipationLastGeneratedDayIndex(
+void AggregateStore::SetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+ const std::string& component,
+ uint32_t event_code,
+ uint32_t aggregation_days,
+ uint32_t value) {
+ auto locked = protected_obs_history_.lock();
+ (*(*(*(*locked->obs_history.mutable_by_report_key())[report_key]
+ .mutable_per_device_numeric_history()
+ ->mutable_by_component())[component]
+ .mutable_by_event_code())[event_code]
+ .mutable_by_window_size())[aggregation_days] = value;
+}
+
+uint32_t AggregateStore::GetReportParticipationLastGeneratedDayIndex(
const std::string& report_key) const {
- const auto& report_history = obs_history_.by_report_key().find(report_key);
- if (report_history == obs_history_.by_report_key().end()) {
+ auto obs_history = protected_obs_history_.const_lock()->obs_history;
+ const auto& report_history = obs_history.by_report_key().find(report_key);
+ if (report_history == obs_history.by_report_key().end()) {
return 0u;
}
return report_history->second.report_participation_history().last_generated();
}
+void AggregateStore::SetReportParticipationLastGeneratedDayIndex(const std::string& report_key,
+ uint32_t value) {
+ auto locked = protected_obs_history_.lock();
+ (*locked->obs_history.mutable_by_report_key())[report_key]
+ .mutable_report_participation_history()
+ ->set_last_generated(value);
+}
+
Status AggregateStore::GenerateSinglePerDeviceNumericObservation(
const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
@@ -862,8 +896,8 @@
LOG(INFO) << "Skipping unsupported aggregation window.";
continue;
}
- auto last_gen =
- PerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, window.days());
+ auto last_gen = GetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code,
+ window.days());
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
for (auto obs_day_index = first_day_index; obs_day_index <= final_day_index;
obs_day_index++) {
@@ -956,27 +990,21 @@
return kInvalidArguments;
}
}
- // Update |obs_history_| with the latest date of Observation
- // generation for this report, component, event code, and window.
- (*(*(*(*obs_history_.mutable_by_report_key())[report_key]
- .mutable_per_device_numeric_history()
- ->mutable_by_component())[component]
- .mutable_by_event_code())[event_code]
- .mutable_by_window_size())[window.days()] = obs_day_index;
+
+ SetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, window.days(),
+ obs_day_index);
}
}
}
}
// Generate any necessary ReportParticipationObservations for this report.
- auto participation_last_gen = ReportParticipationLastGeneratedDayIndex(report_key);
+ auto participation_last_gen = GetReportParticipationLastGeneratedDayIndex(report_key);
auto participation_first_day_index = std::max(participation_last_gen + 1, backfill_period_start);
for (auto obs_day_index = participation_first_day_index; obs_day_index <= final_day_index;
obs_day_index++) {
GenerateSingleReportParticipationObservation(
metric_ref, &report_aggregates.aggregation_config().report(), obs_day_index);
- (*obs_history_.mutable_by_report_key())[report_key]
- .mutable_report_participation_history()
- ->set_last_generated(obs_day_index);
+ SetReportParticipationLastGeneratedDayIndex(report_key, obs_day_index);
}
return kOK;
}
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 7875d43..2cd0929 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -146,6 +146,45 @@
logger::Status GenerateObservations(uint32_t final_day_index_utc,
uint32_t final_day_index_local = 0u);
+ // Returns the most recent day index for which an Observation was generated
+ // for a given UNIQUE_N_DAY_ACTIVES report, event code, and day-based aggregation window,
+ // according to |protected_obs_history|. Returns 0 if no Observation has been generated
+ // for the given arguments.
+ uint32_t GetUniqueActivesLastGeneratedDayIndex(const std::string& report_key, uint32_t event_code,
+ uint32_t aggregation_days) const;
+
+ // Sets the most recent day index for which an Observation was generated
+ // for a given UNIQUE_N_DAY_ACTIVES report, event code, and day-based aggregation window,
+ // to |value| in |protected_obs_history|
+ void SetUniqueActivesLastGeneratedDayIndex(const std::string& report_key, uint32_t event_code,
+ uint32_t aggregation_days, uint32_t value);
+
+ // Returns the most recent day index for which an Observation was generated for a given
+ // PER_DEVICE_NUMERIC_STATS or PER_DEVICE_HISTOGRAM report, component, event code, and day-based
+ // aggregation window, according to |protected_obs_history|. Returns 0 if no Observation has been
+ // generated for the given arguments.
+ uint32_t GetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+ const std::string& component,
+ uint32_t event_code,
+ uint32_t aggregation_days) const;
+
+ // Sets the most recent day index for which an Observation was generated for a given
+ // PER_DEVICE_NUMERIC_STATS or PER_DEVICE_HISTOGRAM report, component, event code, and day-based
+ // aggregation window, according to |protected_obs_history|.
+ void SetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+ const std::string& component, uint32_t event_code,
+ uint32_t aggregation_days, uint32_t value);
+
+ // Returns the most recent day index for which a
+ // ReportParticipationObservation was generated for a given report, according
+ // to |obs_history_|. Returns 0 if no Observation has been generated for the
+ // given arguments.
+ uint32_t GetReportParticipationLastGeneratedDayIndex(const std::string& report_key) const;
+
+ // Set the most recent day index for which a ReportParticipationObservation was generated for a
+ // given report to |value|, according to |protected_obs_history|
+ void SetReportParticipationLastGeneratedDayIndex(const std::string& report_key, uint32_t value);
+
private:
friend class EventAggregator; // used for transition during redesign.
friend class AggregateStoreTest;
@@ -185,27 +224,6 @@
logger::Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store);
logger::Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store);
- // Returns the most recent day index for which an Observation was generated
- // for a given UNIQUE_N_DAY_ACTIVES report, event code, and day-based aggregation window,
- // according to |obs_history_|. Returns 0 if no Observation has been generated
- // for the given arguments.
- uint32_t UniqueActivesLastGeneratedDayIndex(const std::string& report_key, uint32_t event_code,
- uint32_t aggregation_days) const;
-
- // Returns the most recent day index for which an Observation was generated for a given
- // PER_DEVICE_NUMERIC_STATS or PER_DEVICE_HISTOGRAM report, component, event code, and day-based
- // aggregation window, according to |obs_history_|. Returns 0 if no Observation has been generated
- // for the given arguments.
- uint32_t PerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
- const std::string& component, uint32_t event_code,
- uint32_t aggregation_days) const;
-
- // Returns the most recent day index for which a
- // ReportParticipationObservation was generated for a given report, according
- // to |obs_history_|. Returns 0 if no Observation has been generated for the
- // given arguments.
- uint32_t ReportParticipationLastGeneratedDayIndex(const std::string& report_key) const;
-
// For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
// for each event code of the parent metric, for each day-based aggregation window of the
// report ending on |final_day_index|, unless an Observation with those parameters was generated
@@ -282,14 +300,25 @@
LocalAggregateStore local_aggregate_store;
};
- const logger::Encoder* encoder_;
- const logger::ObservationWriter* observation_writer_;
- util::ConsistentProtoStore* local_aggregate_proto_store_;
- util::ConsistentProtoStore* obs_history_proto_store_;
- util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
- // Not protected by a mutex. Should only be accessed by the Event Aggregator's |worker_thread_|.
- AggregatedObservationHistoryStore obs_history_;
+ struct AggregatedObservationHistoryStoreFields {
+ AggregatedObservationHistoryStore obs_history;
+ };
+
+ // The number of past days for which the AggregateStore generates and sends Observations, in
+ // addition to a requested day index.
size_t backfill_days_ = 0;
+
+ // Objects used to generate observations.
+ const logger::Encoder* encoder_; // not owned
+ const logger::ObservationWriter* observation_writer_; // not owned
+
+ // Used for loading and backing up the proto stores to disk.
+ util::ConsistentProtoStore* local_aggregate_proto_store_; // not owned
+ util::ConsistentProtoStore* obs_history_proto_store_; // not owned
+
+ // In memory store of local aggregations and data needed to derive them.
+ util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
+ util::ProtectedFields<AggregatedObservationHistoryStoreFields> protected_obs_history_;
};
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index 5237df2..62d1482 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -1278,6 +1278,43 @@
"", kTestEventCode, kTestDayIndex, /*value*/ 4));
}
+TEST_F(AggregateStoreTest, SetUniqueActivesLastGeneratedDayIndex) {
+ const std::string kReportKey = "test_key";
+ const int64_t kFirstValue = 3;
+
+ EXPECT_EQ(0u,
+ GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+ /*aggregation_days*/ 1));
+ GetAggregateStore()->SetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+ /*aggregation_days*/ 1, kFirstValue);
+ EXPECT_EQ(kFirstValue,
+ GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+ /*aggregation_days*/ 1));
+}
+TEST_F(AggregateStoreTest, SetPerDeviceNumericLastGeneratedDayIndex) {
+ const std::string kReportKey = "test_key";
+ const int64_t kFirstValue = 3;
+
+ EXPECT_EQ(0u, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+ kReportKey, "", kTestEventCode,
+ /*aggregation_days*/ 1));
+ GetAggregateStore()->SetPerDeviceNumericLastGeneratedDayIndex(kReportKey, "", kTestEventCode,
+ /*aggregation_days*/ 1,
+ kFirstValue);
+ EXPECT_EQ(kFirstValue, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+ kReportKey, "", kTestEventCode,
+ /*aggregation_days*/ 1));
+}
+TEST_F(AggregateStoreTest, SetReportParticipationLastGeneratedDayIndex) {
+ const std::string kReportKey = "test_key";
+ const int64_t kFirstValue = 3;
+
+ EXPECT_EQ(0u, GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
+ GetAggregateStore()->SetReportParticipationLastGeneratedDayIndex(kReportKey, kFirstValue);
+ EXPECT_EQ(kFirstValue,
+ GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
+}
+
// Tests that EventAggregator::GenerateObservations() returns a positive
// status and that the expected number of Observations is generated when no
// Events have been logged to the EventAggregator.