[privacy/LocalAggregation] Add Disable/DeleteData
- Disable allows disabling local aggregation, which means that the aggregate
store will ignore all incoming events.
- DeleteData will delete all device-specific data from the aggregate
store. The only data that will remain is data that is derived from the
Cobalt registry.
Bug: 37244
Change-Id: Ibd0eb29eaa3456b782c384bbb252037a7dc7a9ef
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 994b569..34da7c3 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -183,6 +183,7 @@
<< "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
backfill_days_ = backfill_days;
auto locked_store = protected_aggregate_store_.lock();
+ locked_store->empty_local_aggregate_store = MakeNewLocalAggregateStore();
auto restore_aggregates_status =
local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
switch (restore_aggregates_status.error_code()) {
@@ -261,11 +262,23 @@
}
(*locked->local_aggregate_store.mutable_by_report_key())[key] = report_aggregates;
}
+
+ // Make sure that the 'empty' store has the key as well.
+ ReportAggregates empty_report_aggregates;
+ if (locked->empty_local_aggregate_store.by_report_key().count(key) == 0) {
+ if (!PopulateReportAggregates(project_context, metric, report, &empty_report_aggregates)) {
+ return kInvalidArguments;
+ }
+ (*locked->empty_local_aggregate_store.mutable_by_report_key())[key] = empty_report_aggregates;
+ }
return kOK;
}
Status AggregateStore::SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
uint32_t report_id, uint64_t event_code, uint32_t day_index) {
+ if (is_disabled_) {
+ return kOK;
+ }
std::string key;
if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &key)) {
return kInvalidArguments;
@@ -293,6 +306,9 @@
uint32_t metric_id, uint32_t report_id,
const std::string& component, uint64_t event_code,
uint32_t day_index, int64_t value) {
+ if (is_disabled_) {
+ return kOK;
+ }
std::string report_key;
if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &report_key)) {
return kInvalidArguments;
@@ -806,6 +822,22 @@
->set_last_generated(value);
}
+void AggregateStore::DeleteData() {
+ LOG(INFO) << "AggregateStore: Deleting stored data";
+
+ {
+ auto locked = protected_aggregate_store_.lock();
+ locked->local_aggregate_store = locked->empty_local_aggregate_store;
+ }
+ protected_obs_history_.lock()->obs_history = MakeNewObservationHistoryStore();
+}
+
+void AggregateStore::Disable(bool is_disabled) {
+ LOG(INFO) << "AggregateStore: " << (is_disabled ? "Disabling" : "Enabling")
+ << " event aggregate storage.";
+ is_disabled_ = is_disabled;
+}
+
Status AggregateStore::GenerateSinglePerDeviceNumericObservation(
const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 729566f..d59c891 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -82,6 +82,9 @@
// event with the given |customer_id|, |project_id|, |metric_id|, |report_id| and |event_code|.
// Expects that MaybeInsertReportConfig() has been called previously for the ids being passed.
// Returns kInvalidArguments if the operation fails, and kOK otherwise.
+ //
+ // N.B. If the AggregateStore has been disabled (is_disabled_ == true), this method will do
+ // nothing, and will always return kOK.
logger::Status SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
uint32_t report_id, uint64_t event_code, uint32_t day_index);
@@ -89,6 +92,9 @@
// indexed by |customer_id|, |project_id|, |metric_id|, |report_id|, |component|, |event_code| and
// |day_index|. Expects that MaybeInsertReportConfig() has been called previously for the ids
// being passed. Returns kInvalidArguments if the operation fails, and kOK otherwise.
+ //
+ // N.B. If the AggregateStore has been disabled (is_disabled_ == true), this method will do
+ // nothing, and will always return kOK.
logger::Status UpdateNumericAggregate(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id, uint32_t report_id,
const std::string& component, uint64_t event_code,
@@ -185,6 +191,19 @@
// given report to |value|, according to |protected_obs_history|
void SetReportParticipationLastGeneratedDayIndex(const std::string& report_key, uint32_t value);
+ // DeleteData removes all device-specific information from the LocalAggregateStore and the
+ // AggregatedObservationHistoryStore. The only data that remains is the data derived from the
+ // Metrics Registry in `MaybeInsertReportConfig`.
+ void DeleteData();
+
+ // Disable allows enabling/disabling the AggregateStore. When the store is disabled, the following
+ // will happen:
+ //
+ // 1. Calls to SetActive and UpdateNumericAggregate will do nothing and immediately return kOK.
+ // 2. Calls to MaybeInsertReportConfig will continue to function, but since this only stores
+ // information derived from the Metrics Registry, this is not a problem.
+ void Disable(bool is_disabled);
+
private:
friend class AggregateStoreTest;
friend class EventAggregatorTest;
@@ -297,12 +316,23 @@
struct AggregateStoreFields {
LocalAggregateStore local_aggregate_store;
+
+ // When clients connect to Cobalt, their ProjectContext is supplied to the AggregateStore in
+ // MaybeInsertReportConfig. This creates structures in the LocalAggregateStore that are required
+ // for SetActive and UpdateNumericAggregate to function. In order to allow deleting the data
+ // from the AggregateStore without needing to restart Cobalt, we store a copy of all these
+ // report configs here without any device-specific information. This way, when the DeleteData
+ // method is called, we can replace local_aggregate_store with empty_local_aggregate_store, and
+ // both SetActive and UpdateNumericAggregate will continue to function as expected.
+ LocalAggregateStore empty_local_aggregate_store;
};
struct AggregatedObservationHistoryStoreFields {
AggregatedObservationHistoryStore obs_history;
};
+ bool is_disabled_ = false;
+
// The number of past days for which the AggregateStore generates and sends Observations, in
// addition to a requested day index.
size_t backfill_days_ = 0;
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index 6310e5d..4d7bb1a 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -71,7 +71,7 @@
{
auto locked = protected_worker_thread_controller_.lock();
locked->shut_down = true;
- locked->shutdown_notifier.notify_all();
+ locked->wakeup_notifier.notify_all();
}
worker_thread_.join();
} else {
@@ -97,14 +97,18 @@
// Sleep until the next scheduled backup of the LocalAggregateStore or
// until notified of shutdown. Back up the LocalAggregateStore after
// waking.
- locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
+ locked->wakeup_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
if (locked->immediate_run_trigger) {
locked->immediate_run_trigger = false;
return true;
}
- return locked->shut_down;
+ return locked->shut_down || locked->back_up_now;
});
aggregate_store_->BackUpLocalAggregateStore();
+ if (locked->back_up_now) {
+ locked->back_up_now = false;
+ aggregate_store_->BackUpObservationHistory();
+ }
// If the worker thread was woken up by a shutdown request, exit.
// Otherwise, complete any scheduled Observation generation and garbage
// collection.
@@ -168,6 +172,12 @@
return aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local);
}
+void EventAggregatorManager::TriggerBackups() {
+ auto locked = protected_worker_thread_controller_.lock();
+ locked->back_up_now = true;
+ locked->wakeup_notifier.notify_all();
+}
+
void EventAggregatorManager::Reset() {
aggregate_store_ = std::make_unique<AggregateStore>(
encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 90dd243..0267f66 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -125,6 +125,12 @@
logger::Status GenerateObservationsNoWorker(uint32_t final_day_index_utc,
uint32_t final_day_index_local = 0u);
+ void Disable(bool is_disabled) { aggregate_store_->Disable(is_disabled); }
+ void DeleteData() {
+ aggregate_store_->DeleteData();
+ TriggerBackups();
+ }
+
private:
friend class TestEventAggregatorManager;
friend class EventAggregatorManagerTest;
@@ -157,6 +163,12 @@
void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
std::chrono::steady_clock::time_point steady_time);
+ // Triggers the work thread to wake up and back up the LocalAggregateStore and the
+ // ObservationHistory.
+ //
+ // TODO(zmbush): Rename "backup" nomenclature to "checkpoint".
+ void TriggerBackups();
+
struct WorkerThreadController {
// Setting this value to true requests that the worker thread stop.
bool shut_down = true;
@@ -166,17 +178,12 @@
// its work, it will reset this value to false.
bool immediate_run_trigger = false;
- // Used to wait on to execute periodic EventAggregator tasks.
- std::condition_variable_any shutdown_notifier;
- };
+ // Setting this value to true requests that the worker thread wake up and back up the aggregate
+ // store and the observation history, before going back to sleep.
+ bool back_up_now = false;
- struct ShutDownFlag {
- // Used to trigger a shutdown of the EventAggregator.
- bool shut_down = true;
- // Used in tests to manually trigger a run of the EventAggregator's scheduled tasks.
- bool manual_trigger = false;
// Used to wait on to execute periodic EventAggregator tasks.
- std::condition_variable_any shutdown_notifier;
+ std::condition_variable_any wakeup_notifier;
};
const logger::Encoder* encoder_;
diff --git a/src/local_aggregation/event_aggregator_mgr_test.cc b/src/local_aggregation/event_aggregator_mgr_test.cc
index 104824e..53d78ab 100644
--- a/src/local_aggregation/event_aggregator_mgr_test.cc
+++ b/src/local_aggregation/event_aggregator_mgr_test.cc
@@ -203,7 +203,7 @@
// Acquire the lock to manually trigger the scheduled tasks.
auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
locked->immediate_run_trigger = true;
- locked->shutdown_notifier.notify_all();
+ locked->wakeup_notifier.notify_all();
}
while (true) {
// Reacquire the lock to make sure that the scheduled tasks have completed.
@@ -288,6 +288,106 @@
NumberOfKVPairsInStore(event_aggregator_mgr.get()));
}
+TEST_F(EventAggregatorManagerTest, LogEventsAfterDelete) {
+ auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
+ event_aggregator_mgr->Start(GetTestSystemClock());
+
+ auto day_index = CurrentDayIndex();
+ // Provide the EventAggregator with the all_report_types registry.
+ std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
+ EXPECT_EQ(kOK,
+ event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
+
+ EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+ event_aggregator_mgr->DeleteData();
+
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+
+ EXPECT_EQ(2, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+ EXPECT_FALSE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
+ kDeviceBootsMetricReportId, day_index,
+ /*event_code*/ 0u));
+ EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
+ kFeaturesActiveMetricReportId, day_index,
+ /*event_code*/ 4u));
+ EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
+ kEventsOccurredMetricReportId, day_index,
+ /*event_code*/ 1u));
+
+ ShutDown(event_aggregator_mgr.get());
+ EXPECT_TRUE(BackUpHappened());
+}
+
+TEST_F(EventAggregatorManagerTest, DeleteData) {
+ auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
+ event_aggregator_mgr->Start(GetTestSystemClock());
+
+ auto day_index = CurrentDayIndex();
+ // Provide the EventAggregator with the all_report_types registry.
+ std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
+ EXPECT_EQ(kOK,
+ event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
+
+ EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+
+ EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+ event_aggregator_mgr->DeleteData();
+ EXPECT_EQ(0, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+}
+
+TEST_F(EventAggregatorManagerTest, Disable) {
+ auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
+ event_aggregator_mgr->Start(GetTestSystemClock());
+
+ auto day_index = CurrentDayIndex();
+ // Provide the EventAggregator with the all_report_types registry.
+ std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
+ EXPECT_EQ(kOK,
+ event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
+
+ event_aggregator_mgr->Disable(true);
+
+ EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+
+ EXPECT_EQ(0, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+
+ event_aggregator_mgr->Disable(false);
+
+ EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+ EXPECT_EQ(kOK,
+ AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+ kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+ EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+}
+
TEST_F(EventAggregatorManagerTest, LogEvents) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
diff --git a/src/local_aggregation/event_aggregator_test.cc b/src/local_aggregation/event_aggregator_test.cc
index c0ddbc8..73911b2 100644
--- a/src/local_aggregation/event_aggregator_test.cc
+++ b/src/local_aggregation/event_aggregator_test.cc
@@ -155,7 +155,7 @@
// Acquire the lock to manually trigger the scheduled tasks.
auto locked = event_aggregator_mgr_->protected_worker_thread_controller_.lock();
locked->immediate_run_trigger = true;
- locked->shutdown_notifier.notify_all();
+ locked->wakeup_notifier.notify_all();
}
while (true) {
// Reacquire the lock to make sure that the scheduled tasks have completed.