[EventAggregator] Log events for PER_DEVICE_COUNT_STATS reports
Adds a method to log CountEvents to the EventAggregator for reports
of type PER_DEVICE_COUNT_STATS. Updates the config update
method and the garbage collection method to handle the new type of
local aggregates.
Observation generation for PER_DEVICE_COUNT_STATS reports
will be added in a follow-up CL.
Change-Id: Ic522f07e1264540ba02cb1dc721e72c1b033b38b
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
index cd7a20b..706ef72 100644
--- a/logger/event_aggregator.cc
+++ b/logger/event_aggregator.cc
@@ -26,6 +26,80 @@
namespace logger {
+namespace {
+
+// Creates an AggregationConfig from a ProjectContext, MetricDefinition, and
+// ReportDefinition and populates the aggregation_config field of a specified
+// ReportAggregates. Also sets the type of the ReportAggregates based on the
+// ReportDefinition's type.
+bool PopulateReportAggregates(const ProjectContext& project_context,
+ const MetricDefinition& metric,
+ const ReportDefinition& report,
+ ReportAggregates* report_aggregates) {
+ AggregationConfig* aggregation_config =
+ report_aggregates->mutable_aggregation_config();
+ *aggregation_config->mutable_project() = project_context.project();
+ *aggregation_config->mutable_metric() =
+ *project_context.GetMetric(metric.id());
+ *aggregation_config->mutable_report() = report;
+ switch (report.report_type()) {
+ case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
+ report_aggregates->set_allocated_unique_actives_aggregates(
+ new UniqueActivesReportAggregates);
+ return true;
+ }
+ case ReportDefinition::PER_DEVICE_COUNT_STATS: {
+ report_aggregates->set_allocated_count_aggregates(
+ new PerDeviceCountReportAggregates);
+ return true;
+ }
+ default:
+ return false;
+ }
+}
+
+// Populates a ReportAggregationKey proto message and then populates a string
+// with the base64 encoding of the serialized proto.
+bool PopulateReportKey(uint32_t customer_id, uint32_t project_id,
+ uint32_t metric_id, uint32_t report_id,
+ std::string* key) {
+ ReportAggregationKey key_data;
+ key_data.set_customer_id(customer_id);
+ key_data.set_project_id(project_id);
+ key_data.set_metric_id(metric_id);
+ key_data.set_report_id(report_id);
+ return SerializeToBase64(key_data, key);
+}
+
+// Given a ProjectContext, MetricDefinition, and ReportDefinition and a pointer
+// to the LocalAggregateStore, checks whether a key with the same customer,
+// project, metric, and report ID already exists in the LocalAggregateStore. If
+// not, creates and inserts a new key and value. Returns kInvalidArguments if
+// creation of the key or value fails, and kOK otherwise. The caller should hold
+// the mutex protecting the LocalAggregateStore.
+Status MaybeInsertReportConfig(const ProjectContext& project_context,
+ const MetricDefinition& metric,
+ const ReportDefinition& report,
+ LocalAggregateStore* store) {
+ std::string key;
+ if (!PopulateReportKey(project_context.project().customer_id(),
+ project_context.project().project_id(), metric.id(),
+ report.id(), &key)) {
+ return kInvalidArguments;
+ }
+ ReportAggregates report_aggregates;
+ if (store->by_report_key().count(key) == 0) {
+ if (!PopulateReportAggregates(project_context, metric, report,
+ &report_aggregates)) {
+ return kInvalidArguments;
+ }
+ (*store->mutable_by_report_key())[key] = report_aggregates;
+ }
+ return kOK;
+}
+
+} // namespace
+
EventAggregator::EventAggregator(
const Encoder* encoder, const ObservationWriter* observation_writer,
ConsistentProtoStore* local_aggregate_proto_store,
@@ -42,9 +116,9 @@
"generate_obs_interval";
CHECK_LE(aggregate_backup_interval.count(), gc_interval.count())
<< "aggregate_backup_interval must be less than or equal to gc_interval";
- CHECK_LE(backfill_days, kEventAggregatorMaxAllowedBackfillDays)
+ CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
<< "backfill_days must be less than or equal to "
- << kEventAggregatorMaxAllowedBackfillDays;
+ << kMaxAllowedBackfillDays;
aggregate_backup_interval_ = aggregate_backup_interval;
generate_obs_interval_ = generate_obs_interval;
gc_interval_ = gc_interval;
@@ -76,14 +150,14 @@
auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
switch (restore_history_status.error_code()) {
case StatusCode::OK: {
- VLOG(4) << "Read AggregatedObservationHistory from disk.";
+ VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
- VLOG(4)
- << "No file found for obs_history_proto_store. Proceeding "
- "with empty AggregatedObservationHistory. File will be created on "
- "first snapshot of the AggregatedObservationHistory.";
+ VLOG(4) << "No file found for obs_history_proto_store. Proceeding "
+ "with empty AggregatedObservationHistoryStore. File will be "
+ "created on first snapshot of the "
+ "AggregatedObservationHistoryStore.";
break;
}
default: {
@@ -93,7 +167,7 @@
<< "\nError details: "
<< restore_history_status.error_details()
<< "\nProceeding with empty AggregatedObservationHistory.";
- obs_history_ = AggregatedObservationHistory();
+ obs_history_ = AggregatedObservationHistoryStore();
}
}
clock_.reset(new SystemClock());
@@ -111,38 +185,40 @@
// sizes are <= |kMaxAllowedAggregationWindowSize|. Additionally, have
// this method filter out any window sizes larger than
// |kMaxAllowedAggregationWindowSize|.
+//
+// TODO(pesk): update the EventAggregator's view of a Metric
+// or ReportDefinition when appropriate.
Status EventAggregator::UpdateAggregationConfigs(
const ProjectContext& project_context) {
auto locked = protected_aggregate_store_.lock();
- std::string key;
- ReportAggregationKey key_data;
- key_data.set_customer_id(project_context.project().customer_id());
- key_data.set_project_id(project_context.project().project_id());
+ Status status;
for (const auto& metric : project_context.metric_definitions()->metric()) {
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
- key_data.set_metric_id(metric.id());
for (const auto& report : metric.reports()) {
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
- key_data.set_report_id(report.id());
- if (!SerializeToBase64(key_data, &key)) {
- return kInvalidArguments;
+ status =
+ MaybeInsertReportConfig(project_context, metric, report,
+ &(locked->local_aggregate_store));
+ if (status != kOK) {
+ return status;
}
- // TODO(pesk): update the EventAggregator's view of a Metric
- // or ReportDefinition when appropriate.
- if (locked->local_aggregate_store.aggregates().count(key) == 0) {
- AggregationConfig aggregation_config;
- *aggregation_config.mutable_project() =
- project_context.project();
- *aggregation_config.mutable_metric() =
- *project_context.GetMetric(metric.id());
- *aggregation_config.mutable_report() = report;
- ReportAggregates report_aggregates;
- *report_aggregates.mutable_aggregation_config() =
- aggregation_config;
- (*locked->local_aggregate_store.mutable_aggregates())[key] =
- report_aggregates;
+ }
+ default:
+ continue;
+ }
+ }
+ }
+ case MetricDefinition::EVENT_COUNT: {
+ for (const auto& report : metric.reports()) {
+ switch (report.report_type()) {
+ case ReportDefinition::PER_DEVICE_COUNT_STATS: {
+ status =
+ MaybeInsertReportConfig(project_context, metric, report,
+ &(locked->local_aggregate_store));
+ if (status != kOK) {
+ return status;
}
}
default:
@@ -164,30 +240,73 @@
"accept OccurrenceEvents.";
return kInvalidArguments;
}
- ReportAggregationKey key_data;
- key_data.set_customer_id(event_record->metric->customer_id());
- key_data.set_project_id(event_record->metric->project_id());
- key_data.set_metric_id(event_record->metric->id());
- key_data.set_report_id(report_id);
std::string key;
- if (!SerializeToBase64(key_data, &key)) {
+ if (!PopulateReportKey(event_record->metric->customer_id(),
+ event_record->metric->project_id(),
+ event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
auto locked = protected_aggregate_store_.lock();
auto aggregates =
- locked->local_aggregate_store.mutable_aggregates()->find(key);
- if (aggregates == locked->local_aggregate_store.mutable_aggregates()->end()) {
+ locked->local_aggregate_store.mutable_by_report_key()->find(key);
+ if (aggregates ==
+ locked->local_aggregate_store.mutable_by_report_key()->end()) {
LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
return kInvalidArguments;
}
- (*(*aggregates->second.mutable_by_event_code())
- [event_record->event->occurrence_event().event_code()]
- .mutable_by_day_index())[event_record->event->day_index()]
+ if (!aggregates->second.has_unique_actives_aggregates()) {
+ LOG(ERROR) << "The local aggregates for this report key are not of type "
+ "UniqueActivesReportAggregates.";
+ return kInvalidArguments;
+ }
+ (*(*aggregates->second.mutable_unique_actives_aggregates()
+ ->mutable_by_event_code())[event_record->event->occurrence_event()
+ .event_code()]
+ .mutable_by_day_index())[event_record->event->day_index()]
.mutable_activity_daily_aggregate()
->set_activity_indicator(true);
return kOK;
}
+Status EventAggregator::LogPerDeviceCountEvent(uint32_t report_id,
+ EventRecord* event_record) {
+ if (!event_record->event->has_count_event()) {
+ LOG(ERROR) << "EventAggregator::LogPerDeviceCountEvent can only accept "
+ "CountEvents.";
+ return kInvalidArguments;
+ }
+ std::string key;
+ if (!PopulateReportKey(event_record->metric->customer_id(),
+ event_record->metric->project_id(),
+ event_record->metric->id(), report_id, &key)) {
+ return kInvalidArguments;
+ }
+ auto locked = protected_aggregate_store_.lock();
+ auto aggregates =
+ locked->local_aggregate_store.mutable_by_report_key()->find(key);
+ if (aggregates ==
+ locked->local_aggregate_store.mutable_by_report_key()->end()) {
+ LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
+ return kInvalidArguments;
+ }
+ if (!aggregates->second.has_count_aggregates()) {
+ LOG(ERROR) << "The local aggregates for this report key are not of type "
+ "PerDeviceCountReportAggregates.";
+ return kInvalidArguments;
+ }
+ auto daily_aggregate =
+ (*(*(*aggregates->second.mutable_count_aggregates()
+ ->mutable_by_component())[event_record->event->count_event()
+ .component()]
+ .mutable_by_event_code())[event_record->event->count_event()
+ .event_code(0)]
+ .mutable_by_day_index())[event_record->event->day_index()]
+ .mutable_count_daily_aggregate();
+ daily_aggregate->set_count(daily_aggregate->count() +
+ event_record->event->count_event().count());
+ return kOK;
+}
+
Status EventAggregator::GenerateObservationsNoWorker(
uint32_t final_day_index_utc, uint32_t final_day_index_local) {
if (worker_thread_.joinable()) {
@@ -216,7 +335,7 @@
Status EventAggregator::BackUpObservationHistory() {
auto status = obs_history_proto_store_->Write(obs_history_);
if (!status.ok()) {
- LOG(ERROR) << "Failed to back up the AggregatedObservationHistory. "
+ LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
"::cobalt::util::Status error code: "
<< status.error_code()
<< "\nError message: " << status.error_message()
@@ -248,19 +367,22 @@
// Acquire the mutex protecting the shutdown flag and condition variable.
auto locked = protected_shutdown_flag_.lock();
while (true) {
- // If shutdown has been requested, back up the LocalAggregateStore and exit.
+ // If shutdown has been requested, back up the LocalAggregateStore and
+ // exit.
if (locked->shut_down) {
BackUpLocalAggregateStore();
return;
}
- // Sleep until the next scheduled backup of the LocalAggregateStore or until
- // notified of shutdown. Back up the LocalAggregateStore after waking.
+ // Sleep until the next scheduled backup of the LocalAggregateStore or
+ // until notified of shutdown. Back up the LocalAggregateStore after
+ // waking.
auto shutdown_requested = locked.wait_for_with(
&(locked->shutdown_notifier), aggregate_backup_interval_,
[&locked]() { return locked->shut_down; });
BackUpLocalAggregateStore();
- // If the worker thread was woken up by a shutdown request, exit. Otherwise,
- // complete any scheduled Observation generation and garbage collection.
+ // If the worker thread was woken up by a shutdown request, exit.
+ // Otherwise, complete any scheduled Observation generation and garbage
+ // collection.
if (shutdown_requested) {
return;
}
@@ -308,7 +430,7 @@
// Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
// generate observations.
auto local_aggregate_store = CopyLocalAggregateStore();
- for (auto pair : local_aggregate_store.aggregates()) {
+ for (auto pair : local_aggregate_store.by_report_key()) {
const auto& config = pair.second.aggregation_config();
const auto& metric = config.metric();
@@ -380,7 +502,7 @@
day_index_local = day_index_utc;
}
auto locked = protected_aggregate_store_.lock();
- for (auto pair : locked->local_aggregate_store.aggregates()) {
+ for (auto pair : locked->local_aggregate_store.by_report_key()) {
uint32_t day_index;
switch (pair.second.aggregation_config().metric().time_zone_policy()) {
case MetricDefinition::UTC: {
@@ -414,41 +536,114 @@
LOG(ERROR) << "day_index must be >= backfill_days_ + max_window_size.";
return kInvalidArguments;
}
- // For each event code, iterate over the sub-map of local aggregates
- // keyed by day index. Keep buckets with day indices greater than
- // |day_index| - |backfill_days_| - |max_window_size|, and remove
- // all buckets with smaller day indices.
- for (auto event_code_aggregates : pair.second.by_event_code()) {
- for (auto day_aggregates : event_code_aggregates.second.by_day_index()) {
- if (day_aggregates.first <=
- day_index - backfill_days_ - max_window_size) {
- locked->local_aggregate_store.mutable_aggregates()
- ->at(pair.first)
- .mutable_by_event_code()
- ->at(event_code_aggregates.first)
- .mutable_by_day_index()
- ->erase(day_aggregates.first);
+ // For each ReportAggregates, descend to and iterate over the sub-map of
+ // local aggregates keyed by day index. Keep buckets with day indices
+ // greater than |day_index| - |backfill_days_| - |max_window_size|, and
+ // remove all buckets with smaller day indices.
+ switch (pair.second.type_case()) {
+ case ReportAggregates::kUniqueActivesAggregates: {
+ for (auto event_code_aggregates :
+ pair.second.unique_actives_aggregates().by_event_code()) {
+ for (auto day_aggregates :
+ event_code_aggregates.second.by_day_index()) {
+ if (day_aggregates.first <=
+ day_index - backfill_days_ - max_window_size) {
+ locked->local_aggregate_store.mutable_by_report_key()
+ ->at(pair.first)
+ .mutable_unique_actives_aggregates()
+ ->mutable_by_event_code()
+ ->at(event_code_aggregates.first)
+ .mutable_by_day_index()
+ ->erase(day_aggregates.first);
+ }
+ }
+ // If the day index map under this event code is empty, remove the
+ // event code from the event code-keyed map under this
+ // ReportAggregationKey.
+ if (locked->local_aggregate_store.by_report_key()
+ .at(pair.first)
+ .unique_actives_aggregates()
+ .by_event_code()
+ .at(event_code_aggregates.first)
+ .by_day_index()
+ .empty()) {
+ locked->local_aggregate_store.mutable_by_report_key()
+ ->at(pair.first)
+ .mutable_unique_actives_aggregates()
+ ->mutable_by_event_code()
+ ->erase(event_code_aggregates.first);
+ }
}
+ break;
}
- // If the day index map under this event code is empty, remove the event
- // code from the event code map under this ReportAggregationKey.
- if (locked->local_aggregate_store.aggregates()
- .at(pair.first)
- .by_event_code()
- .at(event_code_aggregates.first)
- .by_day_index()
- .empty()) {
- locked->local_aggregate_store.mutable_aggregates()
- ->at(pair.first)
- .mutable_by_event_code()
- ->erase(event_code_aggregates.first);
+ case ReportAggregates::kCountAggregates: {
+ for (auto component_aggregates :
+ pair.second.count_aggregates().by_component()) {
+ for (auto event_code_aggregates :
+ component_aggregates.second.by_event_code()) {
+ for (auto day_aggregates :
+ event_code_aggregates.second.by_day_index()) {
+ if (day_aggregates.first <=
+ day_index - backfill_days_ - max_window_size) {
+ locked->local_aggregate_store.mutable_by_report_key()
+ ->at(pair.first)
+ .mutable_count_aggregates()
+ ->mutable_by_component()
+ ->at(component_aggregates.first)
+ .mutable_by_event_code()
+ ->at(event_code_aggregates.first)
+ .mutable_by_day_index()
+ ->erase(day_aggregates.first);
+ }
+ }
+ // If the day index map under this event code is empty, remove the
+ // event code from the event code-keyed map under this
+ // ReportAggregationKey.
+ if (locked->local_aggregate_store.by_report_key()
+ .at(pair.first)
+ .count_aggregates()
+ .by_component()
+ .at(component_aggregates.first)
+ .by_event_code()
+ .at(event_code_aggregates.first)
+ .by_day_index()
+ .empty()) {
+ locked->local_aggregate_store.mutable_by_report_key()
+ ->at(pair.first)
+ .mutable_count_aggregates()
+ ->mutable_by_component()
+ ->at(component_aggregates.first)
+ .mutable_by_event_code()
+ ->erase(event_code_aggregates.first);
+ }
+ }
+ // If the event code map under this component string is empty,
+ // remove the component string from the component-keyed map under
+ // this ReportAggregationKey.
+ if (locked->local_aggregate_store.by_report_key()
+ .at(pair.first)
+ .count_aggregates()
+ .by_component()
+ .at(component_aggregates.first)
+ .by_event_code()
+ .empty()) {
+ locked->local_aggregate_store.mutable_by_report_key()
+ ->at(pair.first)
+ .mutable_count_aggregates()
+ ->mutable_by_component()
+ ->erase(component_aggregates.first);
+ }
+ }
+ break;
}
+ default:
+ continue;
}
}
return kOK;
}
-////////// GenerateUniqueActivesObservations and helper methods ////////////////
+////////// GenerateUniqueActivesObservations and helper methods ///////////////
// Given the set of daily aggregates for a fixed event code, and the size and
// end date of an aggregation window, returns the first day index within that
@@ -478,16 +673,18 @@
active_day_index > obs_day_index - window_size);
}
-uint32_t EventAggregator::LastGeneratedDayIndex(const std::string& report_key,
- uint32_t event_code,
- uint32_t window_size) const {
+uint32_t EventAggregator::UniqueActivesLastGeneratedDayIndex(
+ const std::string& report_key, uint32_t event_code,
+ uint32_t window_size) const {
auto report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
auto event_code_history =
- report_history->second.by_event_code().find(event_code);
- if (event_code_history == report_history->second.by_event_code().end()) {
+ report_history->second.unique_actives_history().by_event_code().find(
+ event_code);
+ if (event_code_history ==
+ report_history->second.unique_actives_history().by_event_code().end()) {
return 0u;
}
auto window_size_history =
@@ -527,10 +724,13 @@
const ReportAggregates& report_aggregates, uint32_t num_event_codes,
uint32_t final_day_index) {
for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
- auto daily_aggregates = report_aggregates.by_event_code().find(event_code);
+ auto daily_aggregates =
+ report_aggregates.unique_actives_aggregates().by_event_code().find(
+ event_code);
// Have any events ever been logged for this report and event code?
bool found_event_code =
- (daily_aggregates != report_aggregates.by_event_code().end());
+ (daily_aggregates !=
+ report_aggregates.unique_actives_aggregates().by_event_code().end());
for (uint32_t window_size :
report_aggregates.aggregation_config().report().window_size()) {
// Skip any window size larger than
@@ -544,8 +744,8 @@
// been generated for this report, event code, and window size. If
// that day index is later than |final_day_index|, no Observation is
// generated on this invocation.
- auto last_gen =
- LastGeneratedDayIndex(report_key, event_code, window_size);
+ auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code,
+ window_size);
auto first_day_index =
std::max(last_gen + 1, uint32_t(final_day_index - backfill_days_));
// The latest day index on which |event_type| is known to have
@@ -585,7 +785,8 @@
// Update |obs_history_| with the latest date of Observation
// generation for this report, event code, and window size.
(*(*(*obs_history_.mutable_by_report_key())[report_key]
- .mutable_by_event_code())[event_code]
+ .mutable_unique_actives_history()
+ ->mutable_by_event_code())[event_code]
.mutable_by_window_size())[window_size] = obs_day_index;
}
}
diff --git a/logger/event_aggregator.h b/logger/event_aggregator.h
index b9b9a2b..8fbaf24 100644
--- a/logger/event_aggregator.h
+++ b/logger/event_aggregator.h
@@ -27,17 +27,6 @@
namespace cobalt {
namespace logger {
-// Maximum value of |backfill_days| allowed by the constructor of
-// EventAggregator.
-static const size_t kEventAggregatorMaxAllowedBackfillDays = 1000;
-// EventAggregator::GenerateObservations() ignores all aggregation window
-// sizes larger than this value.
-static const size_t kMaxAllowedAggregationWindowSize = 365;
-// The number of seconds in an hour.
-static const size_t kHour = 3600;
-// The number of seconds in a day.
-static const size_t kDay = kHour * 24;
-
// The EventAggregator class manages an in-memory store of aggregated values
// of Events logged for locally aggregated report types. For each day, this
// LocalAggregateStore contains an aggregate of the values of logged Events of
@@ -64,6 +53,12 @@
// needed to compute aggregates for any windows of interest in the future.
class EventAggregator {
public:
+ // Maximum value of |backfill_days| allowed by the constructor.
+ static const size_t kMaxAllowedBackfillDays = 1000;
+ // GenerateObservations() ignores all aggregation window sizes larger than
+ // this value.
+ static const size_t kMaxAllowedAggregationWindowSize = 365;
+
// Constructs an EventAggregator.
//
// An EventAggregator maintains daily aggregates of Events in a
@@ -110,10 +105,9 @@
util::ConsistentProtoStore* obs_history_proto_store,
const size_t backfill_days = 0,
const std::chrono::seconds aggregate_backup_interval =
- std::chrono::seconds(60),
- const std::chrono::seconds generate_obs_interval =
- std::chrono::seconds(kHour),
- const std::chrono::seconds gc_interval = std::chrono::seconds(kDay));
+ std::chrono::minutes(1),
+ const std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
+ const std::chrono::seconds gc_interval = std::chrono::hours(24));
// Shut down the worker thread before destructing the EventAggregator.
~EventAggregator() { ShutDown(); }
@@ -155,6 +149,23 @@
// called.
Status LogUniqueActivesEvent(uint32_t report_id, EventRecord* event_record);
+ // Logs an Event associated to a report of type
+ // PER_DEVICE_COUNT_STATS to the EventAggregator.
+ //
+ // report_id: the ID of the report associated to the logged Event.
+ //
+ // event_record: an EventRecord wrapping an Event of type CountEvent
+ // and the MetricDefinition for which the Event is to be logged.
+ //
+ // Returns kOK if the LocalAggregateStore was successfully updated, and
+ // kInvalidArguments if either a lookup key corresponding to |report_id| was
+ // not found in the LocalAggregateStore, or if the Event wrapped by
+ // EventRecord is not of type CountEvent.
+ //
+ // Currently compatible only with EVENT_COUNT metrics with a single event code
+ // dimension. TODO(pesk, zmbush): support multiple event codes.
+ Status LogPerDeviceCountEvent(uint32_t report_id, EventRecord* event_record);
+
// Checks that the worker thread is shut down, and if so, calls the private
// method GenerateObservations() and returns its result. Returns kOther if the
// worker thread is joinable. See the documentation on GenerateObservations()
@@ -248,12 +259,12 @@
Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u);
// Returns the most recent day index for which an Observation was generated
- // for a given report, event code, and window size, according to
- // |obs_history_|. Returns 0 if no Observation has been generated for the
- // given arguments.
- uint32_t LastGeneratedDayIndex(const std::string& report_key,
- uint32_t event_code,
- uint32_t window_size) const;
+ // for a given UNIQUE_N_DAY_ACTIVES report, event code, and window size,
+ // according to |obs_history_|. Returns 0 if no Observation has been generated
+ // for the given arguments.
+ uint32_t UniqueActivesLastGeneratedDayIndex(const std::string& report_key,
+ uint32_t event_code,
+ uint32_t window_size) const;
// For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
// for each event code of the parent metric, for each window size of the
@@ -305,7 +316,7 @@
util::ConsistentProtoStore* obs_history_proto_store_;
util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
// Not protected by a mutex. Should only be accessed by |worker_thread_|.
- AggregatedObservationHistory obs_history_;
+ AggregatedObservationHistoryStore obs_history_;
size_t backfill_days_ = 0;
std::thread worker_thread_;
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
index 6958cea..6ba882e 100644
--- a/logger/event_aggregator_test.cc
+++ b/logger/event_aggregator_test.cc
@@ -47,6 +47,8 @@
using testing::TestUpdateRecipient;
namespace {
+// Number of seconds in a day
+const int kDay = 60 * 60 * 24;
// Number of seconds in an ideal year
const int kYear = kDay * 365;
@@ -67,6 +69,73 @@
const MetricReportId kFeaturesActiveMetricReportId = MetricReportId(20, 201);
const MetricReportId kErrorsOccurredMetricReportId = MetricReportId(30, 302);
const MetricReportId kEventsOccurredMetricReportId = MetricReportId(40, 402);
+const MetricReportId kConnectionFailuresMetricReportId =
+ MetricReportId(50, 501);
+const MetricReportId kSettingsChangedMetricReportId = MetricReportId(60, 601);
+
+// A set of metric definitions of various types, each with a locally aggregated
+// report.
+static const char kMetricDefinitions[] = R"(
+metric {
+ metric_name: "ErrorsOccurred"
+ metric_type: EVENT_OCCURRED
+ customer_id: 1
+ project_id: 1
+ id: 30
+ max_event_code: 2
+ reports: {
+ report_name: "ErrorsOccurred_SimpleCount"
+ id: 301
+ report_type: SIMPLE_OCCURRENCE_COUNT
+ local_privacy_noise_level: NONE
+ }
+ reports: {
+ report_name: "ErrorsOccurred_UniqueDevices"
+ id: 302
+ report_type: UNIQUE_N_DAY_ACTIVES
+ local_privacy_noise_level: LARGE
+ window_size: 1
+ window_size: 7
+ window_size: 30
+ }
+}
+
+metric {
+ metric_name: "ConnectionFailures"
+ metric_type: EVENT_COUNT
+ customer_id: 1
+ project_id: 1
+ id: 50
+ reports: {
+ report_name: "ConnectionFailures_PerDeviceCount"
+ id: 501
+ report_type: PER_DEVICE_COUNT_STATS
+ window_size: 1
+ }
+}
+
+)";
+
+// Properties of the locally aggregated Observations which should be generated
+// for the reports in |kMetricDefinitions|, assuming that no events have ever
+// been logged for those reports.
+static const ExpectedAggregationParams kExpectedParams = {
+ /* The total number of locally aggregated Observations that should be
+ generated for each day index. */
+ 9,
+ /* The MetricReportIds of the locally aggregated reports in this
+ configuration. */
+ {kErrorsOccurredMetricReportId, kConnectionFailuresMetricReportId},
+ /* The number of Observations which should be generated for each day index,
+ broken down by MetricReportId. */
+ {{kErrorsOccurredMetricReportId, 9},
+ {kConnectionFailuresMetricReportId, 0}},
+ /* The number of event codes for each report of type UNIQUE_N_DAY_ACTIVES,
+ by MetricReportId. */
+ {{kErrorsOccurredMetricReportId, 3}},
+ /* The set of window sizes for each MetricReportId. */
+ {{kErrorsOccurredMetricReportId, {1, 7, 30}},
+ {kConnectionFailuresMetricReportId, {1}}}};
// A set of metric definitions of type EVENT_OCCURRED, each of which has a
// UNIQUE_N_DAY_ACTIVES report.
@@ -239,6 +308,63 @@
{kFeaturesActiveMetricReportId, {1, 7, 30}},
{kEventsOccurredMetricReportId, {1, 7}}}};
+// A set of MetricDefinitions of type EVENT_COUNT, each of which has a
+// ReportDefinition of type PER_DEVICE_COUNT_STATS.
+static const char kPerDeviceCountMetricDefinitions[] = R"(
+metric {
+ metric_name: "ConnectionFailures"
+ metric_type: EVENT_COUNT
+ customer_id: 1
+ project_id: 1
+ id: 50
+ reports: {
+ report_name: "ConnectionFailures_PerDeviceCount"
+ id: 501
+ report_type: PER_DEVICE_COUNT_STATS
+ window_size: 1
+ }
+}
+
+metric {
+ metric_name: "SettingsChanged"
+ metric_type: EVENT_COUNT
+ customer_id: 1
+ project_id: 1
+ id: 60
+ reports: {
+ report_name: "SettingsChanged_PerDeviceCount"
+ id: 601
+ report_type: PER_DEVICE_COUNT_STATS
+ window_size: 7
+ window_size: 30
+ }
+}
+
+)";
+
+// Properties of the locally aggregated Observations which should be generated
+// for the reports in |kMetricDefinitions|, assuming that no events have ever
+// been logged for those reports.
+//
+// TODO(pesk): update these fields once the EventAggregator is generating
+// observations for PerDeviceCount reports.
+static const ExpectedAggregationParams kPerDeviceCountExpectedParams = {
+ /* The total number of Observations that should be generated for a day
+ index. */
+ 0,
+ /* The MetricReportIds of the locally aggregated reports in this
+ configuration. */
+ {kConnectionFailuresMetricReportId, kSettingsChangedMetricReportId},
+ /* The number of Observations which should be generated for a day index,
+ broken down by MetricReportId. */
+ {{kConnectionFailuresMetricReportId, 0},
+ {kSettingsChangedMetricReportId, 0}},
+ /* Omitted because this config contains no UNIQUE_N_DAY_ACTIVES reports. */
+ {},
+ /* The set of window sizes for each MetricReportId. */
+ {{kConnectionFailuresMetricReportId, {1}},
+ {kSettingsChangedMetricReportId, {7, 30}}}};
+
// A set of MetricDefinitions including one with TimeZonePolicy UTC and one with
// TimeZonePolicy LOCAL.
static const char kNoiseFreeMixedTimeZoneMetricDefinitions[] = R"(
@@ -300,6 +426,15 @@
typedef std::map<std::string, std::map<uint32_t, std::set<uint32_t>>>
LoggedActivity;
+// A map used in tests as a record, external to the LocalAggregateStore, of the
+// activity logged for PER_DEVICE_COUNT_STATS reports. The keys are, in
+// descending order, serialized ReportAggregationKeys, components, event codes,
+// and day indices. The innermost value is a count.
+typedef std::map<
+ std::string,
+ std::map<std::string, std::map<uint32_t, std::map<uint32_t, int64_t>>>>
+ LoggedCounts;
+
// Given a string representing a MetricDefinitions proto message, creates a
// ProjectContext from that MetricDefinitions and returns a unique pointer.
std::unique_ptr<ProjectContext> MakeProjectContext(const char metric_string[]) {
@@ -416,7 +551,7 @@
// Given a ProjectContext |project_context| and the MetricReportId of a
// UNIQUE_N_DAY_ACTIVES report in |project_context|, as well as a day index
- // and an event code, logs a UniqueActivesEvent to the EventAggregator for
+ // and an event code, logs an OccurrenceEvent to the EventAggregator for
// that report, day index, and event code. If a non-null LoggedActivity map is
// provided, updates the map with information about the logged Event.
Status LogUniqueActivesEvent(const ProjectContext& project_context,
@@ -441,9 +576,43 @@
return status;
}
- // Given a LoggedActivity map describing the events that have been logged to
- // the EventAggregator, checks whether the contents of the LocalAggregateStore
- // are as expected, accounting for any garbage collection.
+ // Given a ProjectContext |project_context| and the MetricReportId of an
+ // PER_DEVICE_COUNT_STATS report in |project_context|, as well as a
+ // day index, a component string, and an event code, logs a CountEvent to the
+ // EventAggregator for that report, day index, component, and event code. If a
+ // non-null LoggedCounts map is provided, updates the map with information
+ // about the logged Event.
+ Status LogPerDeviceCountEvent(const ProjectContext& project_context,
+ const MetricReportId& metric_report_id,
+ uint32_t day_index,
+ const std::string& component,
+ uint32_t event_code, int64_t count,
+ LoggedCounts* logged_counts = nullptr) {
+ EventRecord event_record;
+ event_record.metric = project_context.GetMetric(metric_report_id.first);
+ event_record.event->set_day_index(day_index);
+ auto count_event = event_record.event->mutable_count_event();
+ count_event->set_component(component);
+ count_event->add_event_code(event_code);
+ count_event->set_count(count);
+ auto status = event_aggregator_->LogPerDeviceCountEvent(
+ metric_report_id.second, &event_record);
+ if (logged_counts == nullptr) {
+ return status;
+ }
+ std::string key;
+ if (!SerializeToBase64(
+ MakeAggregationKey(project_context, metric_report_id), &key)) {
+ return kInvalidArguments;
+ }
+ (*logged_counts)[key][component][event_code][day_index] += count;
+ return status;
+ }
+
+ // Given a LoggedActivity map describing the events that have been logged
+ // to the EventAggregator, checks whether the contents of the
+ // LocalAggregateStore are as expected, accounting for any garbage
+ // collection.
//
// logged_activity: a LoggedActivity representing event occurrences
// since the LocalAggregateStore was created. All day indices should be
@@ -452,26 +621,33 @@
//
// current_day_index: The day index of the current day in the test's frame
// of reference.
- bool CheckAggregateStore(const LoggedActivity& logged_activity,
- uint32_t current_day_index) {
+ bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
+ uint32_t current_day_index) {
auto local_aggregate_store = event_aggregator_->CopyLocalAggregateStore();
- // Check that the LocalAggregateStore contains no more aggregates than
- // |logged_activity| and |day_last_garbage_collected_| should imply.
- for (const auto& report_pair : local_aggregate_store.aggregates()) {
- const auto& report_key = report_pair.first;
+ // Check that the LocalAggregateStore contains no more UniqueActives
+ // aggregates than |logged_activity| and |day_last_garbage_collected_|
+ // should imply.
+ for (const auto& report_pair : local_aggregate_store.by_report_key()) {
const auto& aggregates = report_pair.second;
+ if (aggregates.type_case() !=
+ ReportAggregates::kUniqueActivesAggregates) {
+ continue;
+ }
+ const auto& report_key = report_pair.first;
// Check whether this ReportAggregationKey is in |logged_activity|. If
// not, expect that its by_event_code map is empty.
auto report_activity = logged_activity.find(report_key);
if (report_activity == logged_activity.end()) {
- EXPECT_TRUE(aggregates.by_event_code().empty());
- if (!aggregates.by_event_code().empty()) {
+ EXPECT_TRUE(
+ aggregates.unique_actives_aggregates().by_event_code().empty());
+ if (!aggregates.unique_actives_aggregates().by_event_code().empty()) {
return false;
}
break;
}
auto expected_events = report_activity->second;
- for (const auto& event_pair : aggregates.by_event_code()) {
+ for (const auto& event_pair :
+ aggregates.unique_actives_aggregates().by_event_code()) {
// Check that this event code is in |logged_activity| under this
// ReportAggregationKey.
auto event_code = event_pair.first;
@@ -508,26 +684,37 @@
for (const auto& logged_pair : logged_activity) {
const auto& logged_key = logged_pair.first;
const auto& logged_event_map = logged_pair.second;
- // Check that this ReportAggregationKey is in the LocalAggregateStore.
+ // Check that this ReportAggregationKey is in the LocalAggregateStore, and
+ // that the aggregates are of the expected type.
auto report_aggregates =
- local_aggregate_store.aggregates().find(logged_key);
- EXPECT_NE(report_aggregates, local_aggregate_store.aggregates().end());
- if (report_aggregates == local_aggregate_store.aggregates().end()) {
+ local_aggregate_store.by_report_key().find(logged_key);
+ EXPECT_NE(report_aggregates, local_aggregate_store.by_report_key().end());
+ if (report_aggregates == local_aggregate_store.by_report_key().end()) {
return false;
}
+ if (report_aggregates->second.type_case() !=
+ ReportAggregates::kUniqueActivesAggregates) {
+ return false;
+ }
+ // Compute the earliest day index that should appear among the aggregates
+ // for this report.
+ auto earliest_allowed = EarliestAllowedDayIndex(
+ report_aggregates->second.aggregation_config());
for (const auto& logged_event_pair : logged_event_map) {
const auto& logged_event_code = logged_event_pair.first;
const auto& logged_days = logged_event_pair.second;
- auto earliest_allowed = EarliestAllowedDayIndex(
- report_aggregates->second.aggregation_config());
// Check whether this event code is in the LocalAggregateStore
// under this ReportAggregationKey. If not, check that all day indices
// for this event code are smaller than the day index of the earliest
// allowed aggregate.
auto event_code_aggregates =
- report_aggregates->second.by_event_code().find(logged_event_code);
+ report_aggregates->second.unique_actives_aggregates()
+ .by_event_code()
+ .find(logged_event_code);
if (event_code_aggregates ==
- report_aggregates->second.by_event_code().end()) {
+ report_aggregates->second.unique_actives_aggregates()
+ .by_event_code()
+ .end()) {
for (auto day_index : logged_days) {
EXPECT_LT(day_index, earliest_allowed);
if (day_index >= earliest_allowed) {
@@ -565,6 +752,172 @@
return true;
}
+ bool CheckPerDeviceCountAggregates(const LoggedCounts& logged_counts,
+ uint32_t current_day_index) {
+ auto local_aggregate_store = event_aggregator_->CopyLocalAggregateStore();
+ // Check that the LocalAggregateStore contains no more PerDeviceCount
+ // aggregates than |logged_counts| and |day_last_garbage_collected_| should
+ // imply.
+ for (const auto& report_pair : local_aggregate_store.by_report_key()) {
+ const auto& aggregates = report_pair.second;
+ if (aggregates.type_case() != ReportAggregates::kCountAggregates) {
+ continue;
+ }
+ const auto& report_key = report_pair.first;
+ // Check whether this ReportAggregationKey is in |logged_counts|. If not,
+ // expect that its by_component map is empty.
+ auto report_counts = logged_counts.find(report_key);
+ if (report_counts == logged_counts.end()) {
+ EXPECT_TRUE(aggregates.count_aggregates().by_component().empty());
+ if (!aggregates.count_aggregates().by_component().empty()) {
+ return false;
+ }
+ break;
+ }
+ auto expected_components = report_counts->second;
+ for (const auto& component_pair :
+ aggregates.count_aggregates().by_component()) {
+ // Check that this component is in |logged_counts| under this
+ // ReportAggregationKey.
+ auto component = component_pair.first;
+ auto component_counts = expected_components.find(component);
+ EXPECT_NE(component_counts, expected_components.end());
+ if (component_counts == expected_components.end()) {
+ return false;
+ }
+ const auto& expected_events = component_counts->second;
+ for (const auto& event_pair : component_pair.second.by_event_code()) {
+ // Check that this event code is in |logged_counts| under this
+ // ReportAggregationKey and component.
+ const auto& event_code = event_pair.first;
+ auto event_counts = expected_events.find(event_code);
+ EXPECT_NE(event_counts, expected_events.end());
+ if (event_counts == expected_events.end()) {
+ return false;
+ }
+ const auto& expected_days = event_counts->second;
+ for (const auto& day_pair : event_pair.second.by_day_index()) {
+ // Check that this day index is in |logged_counts| under this
+ // ReportAggregationKey, component, and event code.
+ const auto& day_index = day_pair.first;
+ auto day_count = expected_days.find(day_index);
+ EXPECT_NE(day_count, expected_days.end());
+ if (day_count == expected_days.end()) {
+ return false;
+ }
+ // Check that the day index is no earlier than is implied by the
+ // dates of store creation and garbage collection.
+ EXPECT_GE(day_index,
+ EarliestAllowedDayIndex(aggregates.aggregation_config()));
+ if (day_index <
+ EarliestAllowedDayIndex(aggregates.aggregation_config())) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+
+ // Check that the LocalAggregateStore contains aggregates for all counts in
+ // |logged_counts|, as long as they are recent enough to have survived any
+ // garbage collection.
+ for (const auto& logged_pair : logged_counts) {
+ const auto& logged_key = logged_pair.first;
+ const auto& logged_component_map = logged_pair.second;
+ // Check that this ReportAggregationKey is in the LocalAggregateStore, and
+ // that the aggregates are of the expected type.
+ auto report_aggregates =
+ local_aggregate_store.by_report_key().find(logged_key);
+ EXPECT_NE(report_aggregates, local_aggregate_store.by_report_key().end());
+ if (report_aggregates == local_aggregate_store.by_report_key().end()) {
+ return false;
+ }
+ if (report_aggregates->second.type_case() !=
+ ReportAggregates::kCountAggregates) {
+ return false;
+ }
+ // Compute the earliest day index that should appear among the aggregates
+ // for this report.
+ auto earliest_allowed = EarliestAllowedDayIndex(
+ report_aggregates->second.aggregation_config());
+ for (const auto& logged_component_pair : logged_component_map) {
+ const auto& logged_component = logged_component_pair.first;
+ const auto& logged_event_code_map = logged_component_pair.second;
+ // Check whether this component is in the LocalAggregateStore under this
+ // ReportAggregationKey. If not, check that all day indices for all
+ // entries in |logged_counts| under this component are smaller than the
+ // day index of the earliest allowed aggregate.
+ bool component_found = false;
+ auto component_aggregates =
+ report_aggregates->second.count_aggregates().by_component().find(
+ logged_component);
+ if (component_aggregates !=
+ report_aggregates->second.count_aggregates().by_component().end()) {
+ component_found = true;
+ }
+ for (const auto& logged_event_pair : logged_event_code_map) {
+ const auto& logged_event_code = logged_event_pair.first;
+ const auto& logged_day_map = logged_event_pair.second;
+ // Check whether this event code is in the LocalAggregateStore under
+ // this ReportAggregationKey. If not, check that all day indices in
+ // |logged_counts| under this component are smaller than the day index
+ // of the earliest allowed aggregate.
+ bool event_code_found = false;
+ if (component_found) {
+ auto event_code_aggregates =
+ component_aggregates->second.by_event_code().find(
+ logged_event_code);
+ if (event_code_aggregates !=
+ component_aggregates->second.by_event_code().end()) {
+ event_code_found = true;
+ }
+ if (event_code_found) {
+ // Check that all of the day indices in |logged_counts| under this
+ // ReportAggregationKey, component, and event code are in the
+ // LocalAggregateStore, as long as they are recent enough to have
+ // survived any garbage collection. Check that each aggregate has
+ // the expected count.
+ for (const auto& logged_day_pair : logged_day_map) {
+ auto logged_day_index = logged_day_pair.first;
+ auto logged_count = logged_day_pair.second;
+ auto day_aggregate =
+ event_code_aggregates->second.by_day_index().find(
+ logged_day_index);
+ if (logged_day_index >= earliest_allowed) {
+ EXPECT_NE(day_aggregate,
+ event_code_aggregates->second.by_day_index().end());
+ if (day_aggregate ==
+ event_code_aggregates->second.by_day_index().end()) {
+ return false;
+ }
+ EXPECT_EQ(
+ day_aggregate->second.count_daily_aggregate().count(),
+ logged_count);
+ if (day_aggregate->second.count_daily_aggregate().count() !=
+ logged_count) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+ if (!component_found | !event_code_found) {
+ for (auto logged_day_pair : logged_day_map) {
+ auto logged_day_index = logged_day_pair.first;
+ EXPECT_LT(logged_day_index, earliest_allowed);
+ if (logged_day_index >= earliest_allowed) {
+ return false;
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
// Given the AggregationConfig of a locally aggregated report, returns the
// earliest (smallest) day index for which an aggregate may exist in the
// LocalAggregateStore for that report, accounting for garbage
@@ -583,9 +936,9 @@
// Otherwise, it is the later of:
// (a) The day index on which the store was created minus the number
// of backfill days.
- // (b) The day index for which the store was last garbage-collected minus
- // the number of backfill days, minus the largest window size in the
- // report associated to |config|, plus 1.
+ // (b) The day index for which the store was last garbage-collected
+ // minus the number of backfill days, minus the largest window size in
+ // the report associated to |config|, plus 1.
EXPECT_GE(day_last_garbage_collected_, backfill_days)
<< "The day index of last garbage collection must be larger than "
"the number of backfill days.";
@@ -639,7 +992,7 @@
event_aggregator_->UpdateAggregationConfigs(*project_context_);
}
- // Logs a UniqueActivesEvent for the MetricReportId of a locally
+ // Logs an OccurrenceEvent for the MetricReportId of a locally
// aggregated report in |metric_string|. Overrides the method
// EventAggregatorTest::LogUniqueActivesEvent.
Status LogUniqueActivesEvent(const MetricReportId& metric_report_id,
@@ -650,6 +1003,19 @@
logged_activity);
}
+ // Logs a CountEvent for the MetricReportId of a locally
+ // aggregated report in |metric_string|. Overrides the method
+ // EventAggregatorTest::LogPerDeviceCountEvent.
+ Status LogPerDeviceCountEvent(const MetricReportId& metric_report_id,
+ uint32_t day_index,
+ const std::string& component,
+ uint32_t event_code, int64_t count,
+ LoggedCounts* logged_counts = nullptr) {
+ return EventAggregatorTest::LogPerDeviceCountEvent(
+ *project_context_, metric_report_id, day_index, component, event_code,
+ count, logged_counts);
+ }
+
private:
// A ProjectContext wrapping the MetricDefinitions passed to the
// constructor in |metric_string|.
@@ -677,6 +1043,16 @@
};
// Creates an EventAggregator as in EventAggregatorTest and provides it with
+// |kPerDeviceCountMetricDefinitions|.
+class PerDeviceCountEventAggregatorTest
+ : public EventAggregatorTestWithProjectContext {
+ protected:
+ PerDeviceCountEventAggregatorTest()
+ : EventAggregatorTestWithProjectContext(
+ kPerDeviceCountMetricDefinitions) {}
+};
+
+// Creates an EventAggregator as in EventAggregatorTest and provides it with
// |kNoiseFreeMixedTimeZoneMetricDefinitions|.
class NoiseFreeMixedTimeZoneEventAggregatorTest
: public EventAggregatorTestWithProjectContext {
@@ -726,34 +1102,30 @@
// Tests that an empty LocalAggregateStore is updated with
// ReportAggregationKeys and AggregationConfigs as expected when
-// EventAggregator::UpdateAggregationConfigs is called.
+// EventAggregator::UpdateAggregationConfigs is called with a ProjectContext
+// containing at least one report for each locally aggregated report type.
TEST_F(EventAggregatorTest, UpdateAggregationConfigs) {
// Check that the LocalAggregateStore is empty.
- EXPECT_EQ(0u, CopyLocalAggregateStore().aggregates().size());
- // Provide |kUniqueActivesMetricDefinitions| to the EventAggregator.
- auto unique_actives_project_context =
- MakeProjectContext(kUniqueActivesMetricDefinitions);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
- *unique_actives_project_context));
+ EXPECT_EQ(0u, CopyLocalAggregateStore().by_report_key().size());
+ // Provide |kMetricDefinitions| to the EventAggregator.
+ auto project_context = MakeProjectContext(kMetricDefinitions);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in
- // |kUniqueActivesMetricDefinitions|.
- EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
- CopyLocalAggregateStore().aggregates().size());
+ // |kMetricDefinitions|.
+ EXPECT_EQ(kExpectedParams.metric_report_ids.size(),
+ CopyLocalAggregateStore().by_report_key().size());
// Check that the LocalAggregateStore contains the expected
// ReportAggregationKey and AggregationConfig for each locally aggregated
// report in |kUniqueActivesMetricDefinitions|,
- for (const auto& metric_report_id :
- kUniqueActivesExpectedParams.metric_report_ids) {
+ for (const auto& metric_report_id : kExpectedParams.metric_report_ids) {
std::string key;
- SerializeToBase64(
- MakeAggregationKey(*unique_actives_project_context, metric_report_id),
- &key);
- auto config = MakeAggregationConfig(*unique_actives_project_context,
- metric_report_id);
+ SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id),
+ &key);
+ auto config = MakeAggregationConfig(*project_context, metric_report_id);
LocalAggregateStore local_aggregate_store = CopyLocalAggregateStore();
- auto report_aggregates = local_aggregate_store.aggregates().find(key);
- EXPECT_NE(local_aggregate_store.aggregates().end(), report_aggregates);
+ auto report_aggregates = local_aggregate_store.by_report_key().find(key);
+ EXPECT_NE(local_aggregate_store.by_report_key().end(), report_aggregates);
EXPECT_TRUE(MessageDifferencer::Equals(
config, report_aggregates->second.aggregation_config()));
}
@@ -780,7 +1152,7 @@
// now equal to the number of locally aggregated reports in
// |kUniqueActivesMetricDefinitions|.
EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
- CopyLocalAggregateStore().aggregates().size());
+ CopyLocalAggregateStore().by_report_key().size());
// Provide the EventAggregator with
// |kNoiseFreeUniqueActivesMetricDefinitions|.
auto noise_free_unique_actives_project_context =
@@ -792,7 +1164,7 @@
// aggregated reports in |kUniqueActivesMetricDefinitions| and
// |kNoiseFreeUniqueActivesMetricDefinitions|.
auto local_aggregate_store = CopyLocalAggregateStore();
- EXPECT_EQ(4u, local_aggregate_store.aggregates().size());
+ EXPECT_EQ(4u, local_aggregate_store.by_report_key().size());
// The MetricReportId |kFeaturesActiveMetricReportId| appears in both
// |kUniqueActivesMetricDefinitions| and
// |kNoiseFreeUniqueActivesMetricDefinitions|. The associated
@@ -810,8 +1182,8 @@
&key));
auto unique_actives_config = MakeAggregationConfig(
*unique_actives_project_context, kFeaturesActiveMetricReportId);
- auto report_aggregates = local_aggregate_store.aggregates().find(key);
- EXPECT_NE(local_aggregate_store.aggregates().end(), report_aggregates);
+ auto report_aggregates = local_aggregate_store.by_report_key().find(key);
+ EXPECT_NE(local_aggregate_store.by_report_key().end(), report_aggregates);
EXPECT_TRUE(MessageDifferencer::Equals(
unique_actives_config, report_aggregates->second.aggregation_config()));
auto noise_free_config =
@@ -821,39 +1193,84 @@
noise_free_config, report_aggregates->second.aggregation_config()));
}
-// Tests that EventAggregator::LogUniqueActivesEvent returns
-// |kInvalidArguments| when passed a report ID which is not associated to a
-// key of the LocalAggregateStore, or when passed an EventRecord containing
-// an Event proto message which is not of type OccurrenceEvent.
+// Tests that EventAggregator::Log*Event returns |kInvalidArguments| when
+// passed a report ID which is not associated to a key of the
+// LocalAggregateStore, or when passed an EventRecord containing an Event
+// proto message which is not of the appropriate event type.
TEST_F(EventAggregatorTest, LogBadEvents) {
- // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
- auto unique_actives_project_context =
- MakeProjectContext(kUniqueActivesMetricDefinitions);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
- *unique_actives_project_context));
- // Attempt to log a UniqueActivesEvent for
- // |kEventsOccurredMetricReportId|, which is not in
- // |kUniqueActivesMetricDefinitions|. Check that the result is
- // |kInvalidArguments|.
+ // Provide the EventAggregator with |kMetricDefinitions|.
+ auto project_context = MakeProjectContext(kUniqueActivesMetricDefinitions);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ // Attempt to log events for |kEventsOccurredMetricReportId|, which is not
+ // in |kMetricDefinitions|. Check that the result is |kInvalidArguments|.
auto noise_free_project_context =
MakeProjectContext(kNoiseFreeUniqueActivesMetricDefinitions);
EventRecord bad_event_record;
bad_event_record.metric = noise_free_project_context->GetMetric(
kEventsOccurredMetricReportId.first);
bad_event_record.event->set_day_index(CurrentDayIndex());
- bad_event_record.event->mutable_occurrence_event()->set_event_code(0u);
+ bad_event_record.event->mutable_occurrence_event();
EXPECT_EQ(kInvalidArguments,
event_aggregator_->LogUniqueActivesEvent(
kEventsOccurredMetricReportId.second, &bad_event_record));
+ bad_event_record.event->mutable_count_event();
+ EXPECT_EQ(kInvalidArguments,
+ event_aggregator_->LogPerDeviceCountEvent(
+ kEventsOccurredMetricReportId.second, &bad_event_record));
// Attempt to call LogUniqueActivesEvent() with a valid metric and report
// ID, but with an EventRecord wrapping an Event which is not an
// OccurrenceEvent. Check that the result is |kInvalidArguments|.
- bad_event_record.metric = unique_actives_project_context->GetMetric(
- kFeaturesActiveMetricReportId.first);
+ bad_event_record.metric =
+ project_context->GetMetric(kErrorsOccurredMetricReportId.first);
bad_event_record.event->mutable_count_event();
EXPECT_EQ(kInvalidArguments,
event_aggregator_->LogUniqueActivesEvent(
- kFeaturesActiveMetricReportId.second, &bad_event_record));
+ kErrorsOccurredMetricReportId.second, &bad_event_record));
+ // Attempt to call LogPerDeviceCountEvent() with a valid metric and report
+ // ID, but with an EventRecord wrapping an Event which is not a
+ // CountEvent. Check that the result is |kInvalidArguments|.
+ bad_event_record.metric =
+ project_context->GetMetric(kConnectionFailuresMetricReportId.first);
+ bad_event_record.event->mutable_occurrence_event();
+ EXPECT_EQ(kInvalidArguments,
+ event_aggregator_->LogPerDeviceCountEvent(
+ kConnectionFailuresMetricReportId.second, &bad_event_record));
+}
+
+// Tests that EventAggregator::GenerateObservations() returns a positive
+// status and that the expected number of Observations is generated when no
+// Events have been logged to the EventAggregator.
+TEST_F(EventAggregatorTest, GenerateObservationsNoEvents) {
+ // Provide the EventAggregator with |kMetricDefinitions|.
+ auto project_context = MakeProjectContext(kMetricDefinitions);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ // Generate locally aggregated Observations for the current day index.
+ EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
+ std::vector<Observation2> observations(0);
+ EXPECT_TRUE(FetchAggregatedObservations(&observations, kExpectedParams,
+ observation_store_.get(),
+ update_recipient_.get()));
+}
+
+// Tests that EventAggregator::GenerateObservations() only generates
+// Observations the first time it is called for a given day index.
+TEST_F(EventAggregatorTest, GenerateObservationsTwice) {
+ // Provide the EventAggregator with |kMetricDefinitions|.
+ auto project_context = MakeProjectContext(kMetricDefinitions);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+ // Check that Observations are generated when GenerateObservations is called
+ // for the current day index for the first time.
+ auto current_day_index = CurrentDayIndex();
+ EXPECT_EQ(kOK, GenerateObservations(current_day_index));
+ std::vector<Observation2> observations(0);
+ EXPECT_TRUE(FetchAggregatedObservations(&observations, kExpectedParams,
+ observation_store_.get(),
+ update_recipient_.get()));
+ // Check that no Observations are generated when GenerateObservations is
+ // called for the currentday index for the second time.
+ ResetObservationStore();
+ EXPECT_EQ(kOK, GenerateObservations(current_day_index));
+ EXPECT_EQ(0u, observation_store_->messages_received.size());
}
// Tests that the LocalAggregateStore is updated as expected when
@@ -864,7 +1281,7 @@
//
// Logs some valid events each day for 35 days, checking the contents of the
// LocalAggregateStore each day.
-TEST_F(UniqueActivesEventAggregatorTest, LogUniqueActivesEvents) {
+TEST_F(UniqueActivesEventAggregatorTest, LogEvents) {
LoggedActivity logged_activity;
uint32_t num_days = 35;
for (uint32_t offset = 0; offset < num_days; offset++) {
@@ -874,12 +1291,12 @@
auto day_index = CurrentDayIndex();
EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
day_index, 0u, &logged_activity));
- EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+ EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
// Log another event for the same report, event code, and day index.
// Check the contents of the LocalAggregateStore.
EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
day_index, 0u, &logged_activity));
- EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+ EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
// Log several more events for various valid reports and event codes.
// Check the contents of the LocalAggregateStore.
EXPECT_EQ(kOK, LogUniqueActivesEvent(kDeviceBootsMetricReportId, day_index,
@@ -888,15 +1305,15 @@
day_index, 4u, &logged_activity));
EXPECT_EQ(kOK, LogUniqueActivesEvent(kErrorsOccurredMetricReportId,
day_index, 1u, &logged_activity));
- EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+ EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
AdvanceClock(kDay);
}
}
-// Tests the method EventAggregator::GarbageCollect().
+// Tests GarbageCollect() for UniqueActivesReportAggregates.
//
// For each value of N in the range [0, 34], logs some UniqueActivesEvents
-// each day for N consecutive days and then garbage-collect the
+// each day for N consecutive days and then garbage-collects the
// LocalAggregateStore. After garbage collection, verifies the contents of
// the LocalAggregateStore.
TEST_F(UniqueActivesEventAggregatorTest, GarbageCollect) {
@@ -915,16 +1332,18 @@
&logged_activity));
EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 0u,
&logged_activity));
- // Log 1 event with event code 1.
- EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 1u,
- &logged_activity));
+ if (offset < 3) {
+ // Log 1 event with event code 1.
+ EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 1u,
+ &logged_activity));
+ }
}
AdvanceClock(kDay);
}
auto end_day_index = CurrentDayIndex();
EXPECT_EQ(kOK, GarbageCollect(end_day_index));
day_last_garbage_collected_ = end_day_index;
- EXPECT_TRUE(CheckAggregateStore(logged_activity, end_day_index));
+ EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, end_day_index));
TearDown();
}
}
@@ -969,8 +1388,8 @@
//
// Each day, calls GenerateObservations() with the day index of the previous
// day. Checks that a positive status is returned and that the
-// FakeObservationStore has received the expected number of new observations for
-// each locally aggregated report ID in |kUniqueActivesMetricDefinitions|.
+// FakeObservationStore has received the expected number of new observations
+// for each locally aggregated report ID in |kUniqueActivesMetricDefinitions|.
TEST_F(UniqueActivesEventAggregatorTest, GenerateObservations) {
int num_days = 35;
std::vector<Observation2> observations(0);
@@ -999,8 +1418,8 @@
}
// Tests that GenerateObservations() returns a positive status and that the
-// expected number of Observations is generated each day when Events are logged
-// for UNIQUE_N_DAY_ACTIVES reports over multiple days, and when the
+// expected number of Observations is generated each day when Events are
+// logged for UNIQUE_N_DAY_ACTIVES reports over multiple days, and when the
// LocalAggregateStore is garbage-collected each day.
//
// For 35 days, logs 2 events each day for the ErrorsOccurred_UniqueDevices
@@ -1050,9 +1469,9 @@
// Sets the |backfill_days_| field of the EventAggregator to 3.
//
// Logging pattern:
-// For 35 days, logs 2 events each day for the SomeErrorsOccurred_UniqueDevices
-// report and 2 events for the SomeFeaturesActive_Unique_Devices report, all
-// with event code 0.
+// For 35 days, logs 2 events each day for the
+// SomeErrorsOccurred_UniqueDevices report and 2 events for the
+// SomeFeaturesActive_Unique_Devices report, all with event code 0.
//
// Observation generation pattern:
// Calls GenerateObservations() on the 1st through 5th and the 7th out of
@@ -1060,8 +1479,8 @@
//
// Expected numbers of Observations:
// It is expected that 4 days' worth of Observations are generated on
-// the first day of every 10 (the day index for which GenerateObservations() was
-// called, plus 3 days of backfill), that 1 day's worth of Observations
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
// are generated on the 2nd through 5th day of every 10, that 2 days'
// worth of Observations are generated on the 7th day of every 10 (the
// day index for which GenerateObservations() was called, plus 1 day of
@@ -1070,8 +1489,8 @@
// Set |backfill_days_| to 3.
size_t backfill_days = 3;
SetBackfillDays(backfill_days);
- // Log 2 events each day for 35 days. Call GenerateObservations() on the first
- // 5 day indices, and the 7th, out of every 10.
+ // Log 2 events each day for 35 days. Call GenerateObservations() on the
+ // first 5 day indices, and the 7th, out of every 10.
for (int offset = 0; offset < 35; offset++) {
auto day_index = CurrentDayIndex();
for (int i = 0; i < 2; i++) {
@@ -1118,9 +1537,9 @@
// Sets the |backfill_days_| field of the EventAggregator to 3.
//
// Logging pattern:
-// For 35 days, logs 2 events each day for the SomeErrorsOccurred_UniqueDevices
-// report and 2 events for the SomeFeaturesActive_Unique_Devices report, all
-// with event code 0.
+// For 35 days, logs 2 events each day for the
+// SomeErrorsOccurred_UniqueDevices report and 2 events for the
+// SomeFeaturesActive_Unique_Devices report, all with event code 0.
//
// Observation generation pattern:
// Calls GenerateObservations() on the 1st through 5th and the 7th out of
@@ -1129,8 +1548,8 @@
//
// Expected numbers of Observations:
// It is expected that 4 days' worth of Observations are generated on
-// the first day of every 10 (the day index for which GenerateObservations() was
-// called, plus 3 days of backfill), that 1 day's worth of Observations
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
// are generated on the 2nd through 5th day of every 10, that 2 days'
// worth of Observations are generated on the 7th day of every 10 (the
// day index for which GenerateObservations() was called, plus 1 day of
@@ -1141,8 +1560,8 @@
// Set |backfill_days_| to 3.
size_t backfill_days = 3;
SetBackfillDays(backfill_days);
- // Log 2 events each day for 35 days. Call GenerateObservations() on the first
- // 5 day indices, and the 7th, out of every 10.
+ // Log 2 events each day for 35 days. Call GenerateObservations() on the
+ // first 5 day indices, and the 7th, out of every 10.
for (int offset = 0; offset < num_days; offset++) {
auto day_index = CurrentDayIndex();
for (int i = 0; i < 2; i++) {
@@ -1198,15 +1617,15 @@
expected_obs, observation_store_.get(), update_recipient_.get()));
}
-// Checks that UniqueActivesObservations with the expected values are generated
-// when GenerateObservations() is called for a single day index after logging
-// some events for UNIQUE_N_DAY_ACTIVES reports for that day index, without any
-// garbage collection or backfill.
+// Checks that UniqueActivesObservations with the expected values are
+// generated when GenerateObservations() is called for a single day index
+// after logging some events for UNIQUE_N_DAY_ACTIVES reports for that day
+// index, without any garbage collection or backfill.
//
// Logging pattern:
// Logs 2 occurrences of event code 0 for the FeaturesActives_UniqueDevices
-// report, and 1 occurrence of event code 1 for the EventsOccurred_UniqueDevices
-// report, all on the same day.
+// report, and 1 occurrence of event code 1 for the
+// EventsOccurred_UniqueDevices report, all on the same day.
//
// Observation generation pattern:
// Calls GenerateObservations() after logging all events.
@@ -1265,8 +1684,8 @@
// Checks that UniqueActivesObservations with the expected values are
// generated when some events have been logged for a UNIQUE_N_DAY_ACTIVES
-// report for over multiple days and GenerateObservations() is called each day,
-// without garbage collection or backfill.
+// report for over multiple days and GenerateObservations() is called each
+// day, without garbage collection or backfill.
//
// Logging pattern:
// Logs events for the SomeEventsOccurred_UniqueDevices report (whose parent
@@ -1283,12 +1702,13 @@
//
// Expected number of Observations:
// Each call to GenerateObservations should generate a number of Observations
-// equal to the daily_num_obs field of |kNoisefreeUniqueActivesExpectedParams|.
+// equal to the daily_num_obs field of
+// |kNoisefreeUniqueActivesExpectedParams|.
//
// Expected Observation values:
// The SomeEventsOccurred_UniqueDevices report has window sizes 1 and 7, and
-// the expected activity indicators of Observations for that report on the i-th
-// day are:
+// the expected activity indicators of Observations for that report on the
+// i-th day are:
//
// (i, window size) active for event codes
// ------------------------------------------------------
@@ -1381,9 +1801,9 @@
// Checks that UniqueActivesObservations with the expected values are
// generated when some events have been logged for a UNIQUE_N_DAY_ACTIVES
-// report for over multiple days and GenerateObservations() is called each day,
-// and when the LocalAggregateStore is garbage-collected after each call to
-// GenerateObservations().
+// report for over multiple days and GenerateObservations() is called each
+// day, and when the LocalAggregateStore is garbage-collected after each call
+// to GenerateObservations().
//
// Logging pattern:
// Logs events for the SomeEventsOccurred_UniqueDevices report (whose parent
@@ -1400,12 +1820,13 @@
//
// Expected number of Observations:
// Each call to GenerateObservations should generate a number of Observations
-// equal to the daily_num_obs field of |kNoisefreeUniqueActivesExpectedParams|.
+// equal to the daily_num_obs field of
+// |kNoisefreeUniqueActivesExpectedParams|.
//
// Expected Observation values:
// The SomeEventsOccurred_UniqueDevices report has window sizes 1 and 7, and
-// the expected activity indicators of Observations for that report on the i-th
-// day are:
+// the expected activity indicators of Observations for that report on the
+// i-th day are:
//
// (i, window size) active for event codes
// ------------------------------------------------------
@@ -1491,7 +1912,7 @@
// Generate locally aggregated Observations and garbage-collect the
// LocalAggregateStore, both for the previous day as measured by
// |mock_clock_|. Back up the LocalAggregateStore and
- // AggregatedObservationHistory.
+ // AggregatedObservationHistoryStore.
DoScheduledTasksNow();
// Check the generated Observations against the expectation.
EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs[offset],
@@ -1516,18 +1937,18 @@
// |start_day_index + i| and |start_day_index + 2*i|.
//
// Observation generation pattern:
-// The test calls GenerateObservations() on day |start_day_index + i| for i = 0
-// through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
+// The test calls GenerateObservations() on day |start_day_index + i| for i =
+// 0 through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
// |start_day_index + 7|.
//
// Expected numbers of Observations:
// It is expected that 4 days' worth of Observations are generated on the
-// first day (the day index for which GenerateObservations() was called, plus 3
-// days of backfill), that 1 day's worth of Observations is generated on the
-// 2nd through 6th day, that 3 days' worth of Observations are generated on the
-// 9th day (the day index for which GenerateObservations() was called, plus 2
-// days of backfill), and that no Observations are generated on the remaining
-// days.
+// first day (the day index for which GenerateObservations() was called, plus
+// 3 days of backfill), that 1 day's worth of Observations is generated on the
+// 2nd through 6th day, that 3 days' worth of Observations are generated on
+// the 9th day (the day index for which GenerateObservations() was called,
+// plus 2 days of backfill), and that no Observations are generated on the
+// remaining days.
//
// Expected Observation values:
// The expected activity indicators of Observations for the
@@ -1693,18 +2114,18 @@
// |start_day_index + i| and |start_day_index + 2*i|.
//
// Observation generation pattern:
-// The test calls GenerateObservations() on day |start_day_index + i| for i = 0
-// through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
+// The test calls GenerateObservations() on day |start_day_index + i| for i =
+// 0 through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
// |start_day_index + 7|.
//
// Expected numbers of Observations:
// It is expected that 4 days' worth of Observations are generated on the
-// first day (the day index for which GenerateObservations() was called, plus 3
-// days of backfill), that 1 day's worth of Observations is generated on the
-// 2nd through 6th day, that 3 days' worth of Observations are generated on the
-// 9th day (the day index for which GenerateObservations() was called, plus 2
-// days of backfill), and that no Observations are generated on the remaining
-// days.
+// first day (the day index for which GenerateObservations() was called, plus
+// 3 days of backfill), that 1 day's worth of Observations is generated on the
+// 2nd through 6th day, that 3 days' worth of Observations are generated on
+// the 9th day (the day index for which GenerateObservations() was called,
+// plus 2 days of backfill), and that no Observations are generated on the
+// remaining days.
//
// Expected Observation values:
// The expected activity indicators of Observations for the
@@ -1757,7 +2178,7 @@
if (offset < 6 || offset == 9) {
// Generate Observations and garbage-collect, both for the previous day
// index according to |mock_clock_|. Back up the LocalAggregateStore and
- // the AggregatedObservationHistory.
+ // the AggregatedObservationHistoryStore.
DoScheduledTasksNow();
}
// Make the set of Observations which are expected to be generated on
@@ -1858,6 +2279,88 @@
}
}
+// Tests that the LocalAggregateStore is updated as expected when
+// EventAggregator::LogPerDeviceCountEvent() is called with valid arguments;
+// i.e., with a report ID associated to an existing key of the
+// LocalAggregateStore, and with an EventRecord which wraps a CountEvent.
+//
+// Logs some valid events each day for 35 days, checking the contents of the
+// LocalAggregateStore each day.
+TEST_F(PerDeviceCountEventAggregatorTest, LogEvents) {
+ LoggedCounts logged_counts;
+ uint32_t num_days = 35;
+ for (uint32_t offset = 0; offset < num_days; offset++) {
+ auto day_index = CurrentDayIndex();
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 5,
+ &logged_counts));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 7,
+ &logged_counts));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 1u, 3,
+ &logged_counts));
+ EXPECT_EQ(kOK,
+ LogPerDeviceCountEvent(kSettingsChangedMetricReportId, day_index,
+ "component_B", 0u, 10, &logged_counts));
+ EXPECT_EQ(kOK,
+ LogPerDeviceCountEvent(kSettingsChangedMetricReportId, day_index,
+ "component_A", 0u, 2, &logged_counts));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_C", 0u, 15,
+ &logged_counts));
+ EXPECT_EQ(kOK,
+ LogPerDeviceCountEvent(kSettingsChangedMetricReportId, day_index,
+ "component_B", 0u, 4, &logged_counts));
+ EXPECT_TRUE(CheckPerDeviceCountAggregates(logged_counts, day_index));
+ AdvanceClock(kDay);
+ }
+}
+
+// Tests GarbageCollect() for PerDeviceCountReportAggregates.
+//
+// For each value of N in the range [0, 34], logs some CountEvents for a
+// PerDeviceCount report each day for N consecutive days, and then
+// garbage-collects the LocalAggregateStore. After garbage collection, verifies
+// the contents of the LocalAggregateStore.
+TEST_F(PerDeviceCountEventAggregatorTest, GarbageCollect) {
+ uint32_t max_days_before_gc = 35;
+ for (uint32_t days_before_gc = 0; days_before_gc < max_days_before_gc;
+ days_before_gc++) {
+ SetUp();
+ day_last_garbage_collected_ = 0u;
+ LoggedCounts logged_counts;
+ for (uint32_t offset = 0; offset < days_before_gc; offset++) {
+ auto day_index = CurrentDayIndex();
+ for (const auto& metric_report_id :
+ kPerDeviceCountExpectedParams.metric_report_ids) {
+ for (const auto& component :
+ {"component_A", "component_B", "component_C"}) {
+ // Log 2 events with event code 0, for each component A, B, C.
+ EXPECT_EQ(kOK,
+ LogPerDeviceCountEvent(metric_report_id, day_index,
+ component, 0u, 2, &logged_counts));
+ EXPECT_EQ(kOK,
+ LogPerDeviceCountEvent(metric_report_id, day_index,
+ component, 0u, 3, &logged_counts));
+ }
+ if (offset < 3) {
+ // Log 1 event for component D and event code 1.
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(metric_report_id, day_index,
+ "component_D", 1u, 4,
+ &logged_counts));
+ }
+ }
+ AdvanceClock(kDay);
+ }
+ auto end_day_index = CurrentDayIndex();
+ EXPECT_EQ(kOK, GarbageCollect(end_day_index));
+ day_last_garbage_collected_ = end_day_index;
+ EXPECT_TRUE(CheckPerDeviceCountAggregates(logged_counts, end_day_index));
+ TearDown();
+ }
+}
+
// Tests GenerateObservations() and GarbageCollect() in the case where the
// LocalAggregateStore contains aggregates for metrics with both UTC and LOCAL
// time zone policies, and where the day index in local time may be less than
@@ -1889,25 +2392,27 @@
LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index + 1, 1u);
GenerateObservations(start_day_index, start_day_index - 1);
GarbageCollect(start_day_index, start_day_index - 1);
- // Form the expected contents of the FakeObservationStore. Since Observations
- // have already been generated for the DeviceBoots_UniqueDevices report for
- // |start_day_index - 1|, expect no Observations for that report.
+ // Form the expected contents of the FakeObservationStore. Since
+ // Observations have already been generated for the
+ // DeviceBoots_UniqueDevices report for |start_day_index - 1|, expect no
+ // Observations for that report.
expected_obs[1][{kFeaturesActiveMetricReportId, start_day_index}] = {
{1, {true, false, false}}};
EXPECT_TRUE(CheckUniqueActivesObservations(
expected_obs[1], observation_store_.get(), update_recipient_.get()));
ResetObservationStore();
- // Advance the day index in local time so that it is equal to the day index in
- // UTC. Log 1 event for event code 2 for each of the 2 reports, then generate
- // Observations and garbage-collect for the previous day in each of UTC and
- // local time.
+ // Advance the day index in local time so that it is equal to the day index
+ // in UTC. Log 1 event for event code 2 for each of the 2 reports, then
+ // generate Observations and garbage-collect for the previous day in each of
+ // UTC and local time.
LogUniqueActivesEvent(kDeviceBootsMetricReportId, start_day_index + 1, 2u);
LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index + 1, 2u);
GenerateObservations(start_day_index, start_day_index);
GarbageCollect(start_day_index, start_day_index);
- // Form the expected contents of the FakeObservationStore. Since Observations
- // have already been generated for the FeaturesActive_UniqueDevices report for
- // day |start_day_index|, expect no Observations for that report.
+ // Form the expected contents of the FakeObservationStore. Since
+ // Observations have already been generated for the
+ // FeaturesActive_UniqueDevices report for day |start_day_index|, expect no
+ // Observations for that report.
expected_obs[2][{kDeviceBootsMetricReportId, start_day_index}] = {
{1, {true, true, false}}};
EXPECT_TRUE(CheckUniqueActivesObservations(
@@ -1945,9 +2450,10 @@
LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index, 1u);
GenerateObservations(start_day_index - 1, start_day_index);
GarbageCollect(start_day_index - 1, start_day_index);
- // Form the expected contents of the FakeObservationStore. Since Observations
- // have already been generated for the FeaturesActive_UniqueDevices report for
- // |start_day_index - 1|, expect no Observations for that report.
+ // Form the expected contents of the FakeObservationStore. Since
+ // Observations have already been generated for the
+ // FeaturesActive_UniqueDevices report for |start_day_index - 1|, expect no
+ // Observations for that report.
expected_obs[1][{kDeviceBootsMetricReportId, start_day_index}] = {
{1, {true, false, false}}};
EXPECT_TRUE(CheckUniqueActivesObservations(
@@ -1961,9 +2467,10 @@
LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index + 1, 2u);
GenerateObservations(start_day_index, start_day_index);
GarbageCollect(start_day_index, start_day_index);
- // Form the expected contents of the FakeObservationStore. Since Observations
- // have already been generated for the DeviceBoots_UniqueDevices report for
- // day |start_day_index|, expect no Observations for that report.
+ // Form the expected contents of the FakeObservationStore. Since
+ // Observations have already been generated for the
+ // DeviceBoots_UniqueDevices report for day |start_day_index|, expect no
+ // Observations for that report.
expected_obs[2][{kFeaturesActiveMetricReportId, start_day_index}] = {
{1, {true, true, false}}};
EXPECT_TRUE(CheckUniqueActivesObservations(
@@ -1971,8 +2478,8 @@
}
// Starts the worker thread, and destructs the EventAggregator without
-// explicitly shutting down the worker thread. Checks that the shutdown flag and
-// worker thread are in the expected states before and after the thread is
+// explicitly shutting down the worker thread. Checks that the shutdown flag
+// and worker thread are in the expected states before and after the thread is
// started.
TEST_F(EventAggregatorWorkerTest, StartWorkerThread) {
EXPECT_TRUE(in_shutdown_state());
@@ -2003,54 +2510,34 @@
// EventAggregator::UpdateAggregationConfigs() on the main thread.
TEST_F(EventAggregatorWorkerTest, UpdateAggregationConfigs) {
event_aggregator_->Start();
- // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
- auto unique_actives_project_context =
- MakeProjectContext(kUniqueActivesMetricDefinitions);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
- *unique_actives_project_context));
+ // Provide the EventAggregator with |kMetricDefinitions|.
+ auto project_context = MakeProjectContext(kMetricDefinitions);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in
- // |kUniqueActivesMetricDefinitions|.
- EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
- CopyLocalAggregateStore().aggregates().size());
+ // |kMetricDefinitions|.
+ EXPECT_EQ(kExpectedParams.metric_report_ids.size(),
+ CopyLocalAggregateStore().by_report_key().size());
}
// Starts the worker thread, provides a ProjectContext, logs some events, and
-// shuts down the worker thread. Checks that the LocalAggregateStore was backed
-// up at least once during the lifetime of the worker thread.
+// shuts down the worker thread. Checks that the LocalAggregateStore was
+// backed up at least once during the lifetime of the worker thread.
TEST_F(EventAggregatorWorkerTest, LogEvents) {
auto day_index = CurrentDayIndex();
event_aggregator_->Start();
- // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
- auto unique_actives_project_context =
- MakeProjectContext(kUniqueActivesMetricDefinitions);
- EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
- *unique_actives_project_context));
+ // Provide the EventAggregator with |kMetricDefinitions|.
+ auto project_context = MakeProjectContext(kMetricDefinitions);
+ EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
// Log some events.
LoggedActivity logged_activity;
- EXPECT_EQ(kOK, LogUniqueActivesEvent(*unique_actives_project_context,
- kDeviceBootsMetricReportId, day_index,
- 0u, &logged_activity));
- EXPECT_EQ(kOK, LogUniqueActivesEvent(*unique_actives_project_context,
- kFeaturesActiveMetricReportId, day_index,
- 4u, &logged_activity));
- EXPECT_EQ(kOK, LogUniqueActivesEvent(*unique_actives_project_context,
+ EXPECT_EQ(kOK, LogUniqueActivesEvent(*project_context,
kErrorsOccurredMetricReportId, day_index,
1u, &logged_activity));
- EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+ EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
ShutDownWorkerThread();
EXPECT_GE(local_aggregate_proto_store_->write_count_, 1);
}
-// Tests that GenerateObservationsNoWorker returns the error status kOther if
-// called while the worker thread is running.
-TEST_F(EventAggregatorWorkerTest, GenerateObservationsNoWorker) {
- event_aggregator_->Start();
- ASSERT_TRUE(in_run_state());
- EXPECT_EQ(kOther,
- event_aggregator_->GenerateObservationsNoWorker(CurrentDayIndex()));
- ShutDownWorkerThread();
-}
-
} // namespace logger
} // namespace cobalt
diff --git a/logger/local_aggregation.proto b/logger/local_aggregation.proto
index dbab1b8..28ceb2f 100644
--- a/logger/local_aggregation.proto
+++ b/logger/local_aggregation.proto
@@ -22,15 +22,59 @@
// logged events.
message LocalAggregateStore {
// Keyed by base64-encoded serializations of ReportAggregationKey messages.
- map<string, ReportAggregates> aggregates = 1;
+ map<string, ReportAggregates> by_report_key = 1;
}
message ReportAggregates {
- // Keyed by event code.
- map<uint32, DailyAggregates> by_event_code = 1;
+ // A collection of aggregates whose form depends on the report type.
+ oneof type {
+ UniqueActivesReportAggregates unique_actives_aggregates = 1;
+ PerDeviceCountReportAggregates count_aggregates = 2;
+ }
// The configuration for the report represented by the ReportAggregationKey
// of this ReportAggregates.
- AggregationConfig aggregation_config = 2;
+ AggregationConfig aggregation_config = 100;
+}
+
+message UniqueActivesReportAggregates {
+ // Keyed by event code.
+ map<uint32, DailyAggregates> by_event_code = 1;
+}
+
+message PerDeviceCountReportAggregates {
+ // Keyed by component string.
+ map<string, EventCodeAggregates> by_component = 1;
+}
+
+message EventCodeAggregates {
+ // Keyed by event code.
+ map<uint32, DailyAggregates> by_event_code = 1;
+}
+
+message DailyAggregates {
+ // Keyed by day index.
+ map<uint32, DailyAggregate> by_day_index = 1;
+}
+
+// A value formed by aggregating the events logged for a single report, event
+// code, and day index.
+message DailyAggregate {
+ oneof type {
+ ActivityDailyAggregate activity_daily_aggregate = 1;
+ CountDailyAggregate count_daily_aggregate = 2;
+ }
+}
+
+// A representation of the occurrence or non-occurrence of an event code on
+// a given day.
+message ActivityDailyAggregate {
+ bool activity_indicator = 1;
+}
+
+// A representation of the number of occurrences of a particular event code,
+// with a particular component label, on a given day.
+message CountDailyAggregate {
+ int64 count = 1;
}
// A representation of the configuration of a locally aggregated report and
@@ -54,39 +98,25 @@
ReportDefinition report = 3;
}
-message DailyAggregates {
- // Keyed by day index.
- map<uint32, DailyAggregate> by_day_index = 1;
-}
-
-// A value formed by aggregating the events logged for a single report, event
-// code, and day index.
-message DailyAggregate {
- oneof type {
- ActivityDailyAggregate activity_daily_aggregate = 1;
- }
-}
-
-// A representation of the occurrence or non-occurrence of an event code on
-// a given day.
-message ActivityDailyAggregate {
- bool activity_indicator = 1;
-}
-
// A container used by the EventAggregator to store the latest day index for
// which an Observation has been generated for each report, event code, and
// window size.
-message AggregatedObservationHistory {
+message AggregatedObservationHistoryStore {
// Keyed by base64-encoded serializations of ReportAggregationKey messages.
- map<string, HistoryByEventCode> by_report_key = 1;
+ map<string, AggregatedObservationHistory> by_report_key = 1;
}
-message HistoryByEventCode {
+message AggregatedObservationHistory {
+ oneof type {
+ UniqueActivesObservationHistory unique_actives_history = 1;
+ }
+}
+
+message UniqueActivesObservationHistory {
// Keyed by event code.
map<uint32, HistoryByWindowSize> by_event_code = 1;
}
-
message HistoryByWindowSize {
// Keyed by window size. The value at a window size is the latest day index
// for which an Observation has been generated for this report, event code,
diff --git a/logger/logger_test.cc b/logger/logger_test.cc
index 4cc2261..4ff2825 100644
--- a/logger/logger_test.cc
+++ b/logger/logger_test.cc
@@ -56,6 +56,8 @@
using testing::TestUpdateRecipient;
namespace {
+// Number of seconds in a day
+const int kDay = 60 * 60 * 24;
// Number of seconds in an ideal year
const int kYear = kDay * 365;
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
index 535c1b5..16a25b6 100644
--- a/logger/logger_test_utils.h
+++ b/logger/logger_test_utils.h
@@ -90,23 +90,24 @@
int write_count_;
};
-// A container for information about the set of all locally aggregated
-// reports in a configuration. This is used by tests to check the output of the
+// A container for information about the locally aggregated Observations which
+// should be generated for a given day, for all locally aggregated reports in a
+// configuration. This is used by tests to check the output of the
// EventAggregator.
typedef struct ExpectedAggregationParams {
// The total number of locally aggregated Observations which should be
- // generated each day.
+ // generated for the day.
size_t daily_num_obs = 0;
// The MetricReportIds of the locally aggregated reports in this
// configuration.
std::set<MetricReportId> metric_report_ids;
// Keys are the MetricReportIds of all locally aggregated reports in a config.
// The value at a key is the number of Observations which should be generated
- // each day for that report.
+ // for that report, for the day.
std::map<MetricReportId, size_t> num_obs_per_report;
- // Keys are the MetricReportIds of all locally aggregated reports in a config.
- // The value at a key is the number of event codes for that report's
- // parent MetricDefinition.
+ // Keys are the MetricReportIds of the UNIQUE_N_DAY_ACTIVES reports in the
+ // config. The value at a MetricReportId is the number of event codes for that
+ // report's parent metric.
std::map<MetricReportId, size_t> num_event_codes;
// Keys are the MetricReportIds of all locally aggregated reports in a config.
// The value at a key is the set of window sizes of that report.