blob: f9478ee7b38833fb813b2ea7045ab33e4f0cd429 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/event_aggregator.h"
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <vector>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/proto_util.h"
#include "src/local_aggregation/aggregation_utils.h"
#include "src/local_aggregation/event_aggregator_mgr.h"
#include "src/logger/logger_test_utils.h"
#include "src/logger/testing_constants.h"
#include "src/pb/event.pb.h"
#include "src/registry/packed_event_codes.h"
#include "src/registry/project_configs.h"
#include "third_party/googletest/googletest/include/gtest/gtest.h"
namespace cobalt::local_aggregation {
using config::PackEventCodes;
using logger::Encoder;
using logger::EventRecord;
using logger::kInvalidArguments;
using logger::kOK;
using logger::MetricReportId;
using logger::ObservationWriter;
using logger::ProjectContext;
using logger::Status;
using logger::testing::CheckUniqueActivesObservations;
using logger::testing::ExpectedUniqueActivesObservations;
using logger::testing::FakeObservationStore;
using logger::testing::GetTestProject;
using logger::testing::MakeAggregationConfig;
using logger::testing::MakeAggregationKey;
using logger::testing::MakeNullExpectedUniqueActivesObservations;
using logger::testing::MockConsistentProtoStore;
using logger::testing::TestUpdateRecipient;
using system_data::ClientSecret;
using system_data::SystemDataInterface;
using util::EncryptedMessageMaker;
using util::IncrementingSteadyClock;
using util::IncrementingSystemClock;
using util::SerializeToBase64;
using util::TimeToDayIndex;
namespace {
// Number of seconds in a day
constexpr int kDay = 60 * 60 * 24;
// Number of seconds in an ideal year
constexpr int kYear = kDay * 365;
template <typename T>
std::string SerializeAsStringDeterministic(const T& message) {
std::string s;
{
google::protobuf::io::StringOutputStream output(&s);
google::protobuf::io::CodedOutputStream out(&output);
out.SetSerializationDeterministic(true);
message.SerializePartialToCodedStream(&out);
}
return s;
}
// Filenames for constructors of ConsistentProtoStores
constexpr char kAggregateStoreFilename[] = "local_aggregate_store_backup";
constexpr char kObsHistoryFilename[] = "obs_history_backup";
// A map keyed by base64-encoded, serialized ReportAggregationKeys. The value at
// a key is a map of event codes to sets of day indices. Used in tests as
// a record, external to the LocalAggregateStore, of the activity logged for
// UNIQUE_N_DAY_ACTIVES reports.
using LoggedActivity = std::map<std::string, std::map<uint32_t, std::set<uint32_t>>>;
// A map used in tests as a record, external to the LocalAggregateStore, of the
// activity logged for PER_DEVICE_NUMERIC_STATS reports. The keys are, in
// descending order, serialized ReportAggregationKeys, components, event codes,
// and day indices. Each day index maps to a vector of numeric values that were
// logged for that day index..
using LoggedValues =
std::map<std::string,
std::map<std::string, std::map<uint32_t, std::map<uint32_t, std::vector<int64_t>>>>>;
} // namespace
// EventAggregatorTest creates an EventAggregator which sends its Observations
// to a FakeObservationStore. The EventAggregator is not pre-populated with
// aggregation configurations.
class EventAggregatorTest : public ::testing::Test {
protected:
void SetUp() override {
observation_store_ = std::make_unique<FakeObservationStore>();
update_recipient_ = std::make_unique<TestUpdateRecipient>();
observation_encrypter_ = EncryptedMessageMaker::MakeUnencrypted();
observation_writer_ = std::make_unique<ObservationWriter>(
observation_store_.get(), update_recipient_.get(), observation_encrypter_.get());
encoder_ = std::make_unique<Encoder>(system_data::ClientSecret::GenerateNewSecret(),
system_data_.get());
local_aggregate_proto_store_ =
std::make_unique<MockConsistentProtoStore>(kAggregateStoreFilename);
obs_history_proto_store_ = std::make_unique<MockConsistentProtoStore>(kObsHistoryFilename);
ResetEventAggregator();
}
void ResetEventAggregator() {
event_aggregator_mgr_ = std::make_unique<EventAggregatorManager>(
encoder_.get(), observation_writer_.get(), local_aggregate_proto_store_.get(),
obs_history_proto_store_.get());
// Pass this clock to the EventAggregator::Start method, if it is called.
test_clock_ = std::make_unique<IncrementingSystemClock>(std::chrono::system_clock::duration(0));
// Initilize it to 10 years after the beginning of time.
test_clock_->set_time(std::chrono::system_clock::time_point(std::chrono::seconds(10 * kYear)));
// Use this to advance the clock in the tests.
unowned_test_clock_ = test_clock_.get();
day_store_created_ = CurrentDayIndex();
test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
event_aggregator_mgr_->GetEventAggregator()->SetSteadyClock(test_steady_clock_);
}
// Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
// before destructing the objects which the EventAggregator points to but does
// not own.
void TearDown() override { event_aggregator_mgr_.reset(); }
// Advances |test_clock_| by |num_seconds| seconds.
void AdvanceClock(int num_seconds) {
unowned_test_clock_->increment_by(std::chrono::seconds(num_seconds));
test_steady_clock_->increment_by(std::chrono::seconds(num_seconds));
}
// Returns the day index of the current day according to |test_clock_|, in
// |time_zone|, without incrementing the clock.
uint32_t CurrentDayIndex(MetricDefinition::TimeZonePolicy time_zone = MetricDefinition::UTC) {
return TimeToDayIndex(std::chrono::system_clock::to_time_t(unowned_test_clock_->peek_now()),
time_zone);
}
size_t GetBackfillDays() {
return event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->backfill_days_;
}
LocalAggregateStore CopyLocalAggregateStore() {
return event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
}
void TriggerAndWaitForDoScheduledTasks() {
{
// Acquire the lock to manually trigger the scheduled tasks.
auto locked =
event_aggregator_mgr_->GetEventAggregator()->protected_worker_thread_controller_.lock();
locked->immediate_run_trigger = true;
locked->shutdown_notifier.notify_all();
}
while (true) {
// Reacquire the lock to make sure that the scheduled tasks have completed.
auto locked =
event_aggregator_mgr_->GetEventAggregator()->protected_worker_thread_controller_.lock();
if (!locked->immediate_run_trigger) {
break;
}
std::this_thread::yield();
}
}
// Clears the FakeObservationStore and resets the counts of Observations
// received by the FakeObservationStore and the TestUpdateRecipient.
void ResetObservationStore() {
observation_store_->messages_received.clear();
observation_store_->metadata_received.clear();
observation_store_->ResetObservationCounter();
update_recipient_->invocation_count = 0;
}
// Given a ProjectContext |project_context| and the MetricReportId of a
// UNIQUE_N_DAY_ACTIVES report in |project_context|, as well as a day index
// and an event code, logs an OccurrenceEvent to the EventAggregator for
// that report, day index, and event code. If a non-null LoggedActivity map is
// provided, updates the map with information about the logged Event.
Status AddUniqueActivesEvent(std::shared_ptr<const ProjectContext> project_context,
const MetricReportId& metric_report_id, uint32_t day_index,
uint32_t event_code, LoggedActivity* logged_activity = nullptr) {
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
event_record.event()->mutable_occurrence_event()->set_event_code(event_code);
auto status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
metric_report_id.second, event_record);
if (logged_activity == nullptr) {
return status;
}
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*event_record.project_context(), metric_report_id),
&key)) {
return kInvalidArguments;
}
(*logged_activity)[key][event_code].insert(day_index);
return status;
}
// Given a ProjectContext |project_context| and the MetricReportId of an
// EVENT_COUNT metric with a PER_DEVICE_NUMERIC_STATS report in
// |project_context|, as well as a day index, a component string, and an event
// code, logs a CountEvent to the EventAggregator for that report, day
// index, component, and event code. If a non-null LoggedValues map is
// provided, updates the map with information about the logged Event.
Status AddPerDeviceCountEvent(std::shared_ptr<const ProjectContext> project_context,
const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component, uint32_t event_code, int64_t count,
LoggedValues* logged_values = nullptr) {
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
auto count_event = event_record.event()->mutable_count_event();
count_event->set_component(component);
count_event->add_event_code(event_code);
count_event->set_count(count);
auto status = event_aggregator_mgr_->GetEventAggregator()->AddCountEvent(
metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*event_record.project_context(), metric_report_id),
&key)) {
return kInvalidArguments;
}
(*logged_values)[key][component][event_code][day_index].push_back(count);
return status;
}
// Given a ProjectContext |project_context| and the MetricReportId of an
// ELAPSED_TIME metric with a PER_DEVICE_NUMERIC_STATS report in
// |project_context|, as well as a day index, a component string, and an event
// code, logs an ElapsedTimeEvent to the EventAggregator for that report, day
// index, component, and event code. If a non-null LoggedValues map is
// provided, updates the map with information about the logged Event.
Status AddPerDeviceElapsedTimeEvent(std::shared_ptr<const ProjectContext> project_context,
const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component, uint32_t event_code,
int64_t micros, LoggedValues* logged_values = nullptr) {
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
auto elapsed_time_event = event_record.event()->mutable_elapsed_time_event();
elapsed_time_event->set_component(component);
elapsed_time_event->add_event_code(event_code);
elapsed_time_event->set_elapsed_micros(micros);
auto status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*event_record.project_context(), metric_report_id),
&key)) {
return kInvalidArguments;
}
(*logged_values)[key][component][event_code][day_index].push_back(micros);
return status;
}
// Given a ProjectContext |project_context| and the MetricReportId of a
// FRAME_RATE metric with a PER_DEVICE_NUMERIC_STATS report in
// |project_context|, as well as a day index, a component string, and an event
// code, logs a FrameRateEvent to the EventAggregator for that report, day
// index, component, and event code. If a non-null LoggedValues map is
// provided, updates the map with information about the logged Event.
Status AddPerDeviceFrameRateEvent(std::shared_ptr<const ProjectContext> project_context,
const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component, uint32_t event_code, float fps,
LoggedValues* logged_values = nullptr) {
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
auto frame_rate_event = event_record.event()->mutable_frame_rate_event();
frame_rate_event->set_component(component);
frame_rate_event->add_event_code(event_code);
int64_t frames_per_1000_seconds = std::round(fps * 1000.0);
frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
auto status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*event_record.project_context(), metric_report_id),
&key)) {
return kInvalidArguments;
}
(*logged_values)[key][component][event_code][day_index].push_back(frames_per_1000_seconds);
return status;
}
// Given a ProjectContext |project_context| and the MetricReportId of a
// MEMORY_USAGE metric with a PER_DEVICE_NUMERIC_STATS report in
// |project_context|, as well as a day index, a component string, and an event
// code, logs a MemoryUsageEvent to the EventAggregator for that report, day
// index, component, and event code. If a non-null LoggedValues map is
// provided, updates the map with information about the logged Event.
Status AddPerDeviceMemoryUsageEvent(std::shared_ptr<const ProjectContext> project_context,
const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component,
const std::vector<uint32_t>& event_codes, int64_t bytes,
LoggedValues* logged_values = nullptr) {
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
auto memory_usage_event = event_record.event()->mutable_memory_usage_event();
memory_usage_event->set_component(component);
for (auto event_code : event_codes) {
memory_usage_event->add_event_code(event_code);
}
memory_usage_event->set_bytes(bytes);
auto status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
metric_report_id.second, event_record);
if (logged_values == nullptr) {
return status;
}
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*event_record.project_context(), metric_report_id),
&key)) {
return kInvalidArguments;
}
(*logged_values)[key][component][PackEventCodes(event_codes)][day_index].push_back(bytes);
return status;
}
// Given a LoggedActivity map describing the events that have been logged
// to the EventAggregator, checks whether the contents of the
// LocalAggregateStore are as expected, accounting for any garbage
// collection.
//
// logged_activity: a LoggedActivity representing event occurrences
// since the LocalAggregateStore was created. All day indices should be
// greater than or equal to |day_store_created_| and less than or equal to
// |current_day_index|.
//
// current_day_index: The day index of the current day in the test's frame
// of reference.
bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
uint32_t /*current_day_index*/) {
auto local_aggregate_store =
event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more UniqueActives
// aggregates than |logged_activity| and |day_last_garbage_collected_|
// should imply.
for (const auto& report_pair : local_aggregate_store.by_report_key()) {
const auto& aggregates = report_pair.second;
if (aggregates.type_case() != ReportAggregates::kUniqueActivesAggregates) {
continue;
}
const auto& report_key = report_pair.first;
// Check whether this ReportAggregationKey is in |logged_activity|. If
// not, expect that its by_event_code map is empty.
auto report_activity = logged_activity.find(report_key);
if (report_activity == logged_activity.end()) {
EXPECT_TRUE(aggregates.unique_actives_aggregates().by_event_code().empty());
if (!aggregates.unique_actives_aggregates().by_event_code().empty()) {
return false;
}
break;
}
auto expected_events = report_activity->second;
for (const auto& event_pair : aggregates.unique_actives_aggregates().by_event_code()) {
// Check that this event code is in |logged_activity| under this
// ReportAggregationKey.
auto event_code = event_pair.first;
auto event_activity = expected_events.find(event_code);
EXPECT_NE(event_activity, expected_events.end());
if (event_activity == expected_events.end()) {
return false;
}
const auto& expected_days = event_activity->second;
for (const auto& day_pair : event_pair.second.by_day_index()) {
// Check that this day index is in |logged_activity| under this
// ReportAggregationKey and event code.
const auto& day_index = day_pair.first;
auto day_activity = expected_days.find(day_index);
EXPECT_NE(day_activity, expected_days.end());
if (day_activity == expected_days.end()) {
return false;
}
// Check that the day index is no earlier than is implied by the
// dates of store creation and garbage collection.
EXPECT_GE(day_index, EarliestAllowedDayIndex(aggregates.aggregation_config()));
if (day_index < EarliestAllowedDayIndex(aggregates.aggregation_config())) {
return false;
}
}
}
}
// Check that the LocalAggregateStore contains aggregates for all events in
// |logged_activity|, as long as they are recent enough to have survived any
// garbage collection.
for (const auto& logged_pair : logged_activity) {
const auto& logged_key = logged_pair.first;
const auto& logged_event_map = logged_pair.second;
// Check that this ReportAggregationKey is in the LocalAggregateStore, and
// that the aggregates are of the expected type.
auto report_aggregates = local_aggregate_store.by_report_key().find(logged_key);
EXPECT_NE(report_aggregates, local_aggregate_store.by_report_key().end());
if (report_aggregates == local_aggregate_store.by_report_key().end()) {
return false;
}
if (report_aggregates->second.type_case() != ReportAggregates::kUniqueActivesAggregates) {
return false;
}
// Compute the earliest day index that should appear among the aggregates
// for this report.
auto earliest_allowed =
EarliestAllowedDayIndex(report_aggregates->second.aggregation_config());
for (const auto& logged_event_pair : logged_event_map) {
const auto& logged_event_code = logged_event_pair.first;
const auto& logged_days = logged_event_pair.second;
// Check whether this event code is in the LocalAggregateStore
// under this ReportAggregationKey. If not, check that all day indices
// for this event code are smaller than the day index of the earliest
// allowed aggregate.
auto event_code_aggregates =
report_aggregates->second.unique_actives_aggregates().by_event_code().find(
logged_event_code);
if (event_code_aggregates ==
report_aggregates->second.unique_actives_aggregates().by_event_code().end()) {
for (auto day_index : logged_days) {
EXPECT_LT(day_index, earliest_allowed);
if (day_index >= earliest_allowed) {
return false;
}
}
break;
}
// Check that all of the day indices in |logged_activity| under this
// ReportAggregationKey and event code are in the
// LocalAggregateStore, as long as they are recent enough to have
// survived any garbage collection. Check that each aggregate has its
// activity field set to true.
for (const auto& logged_day_index : logged_days) {
auto day_aggregate = event_code_aggregates->second.by_day_index().find(logged_day_index);
if (logged_day_index >= earliest_allowed) {
EXPECT_NE(day_aggregate, event_code_aggregates->second.by_day_index().end());
if (day_aggregate == event_code_aggregates->second.by_day_index().end()) {
return false;
}
EXPECT_TRUE(day_aggregate->second.activity_daily_aggregate().activity_indicator());
if (!day_aggregate->second.activity_daily_aggregate().activity_indicator()) {
return false;
}
}
}
}
}
return true;
}
bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
uint32_t /*current_day_index*/) {
auto local_aggregate_store =
event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
// Check that the LocalAggregateStore contains no more PerDeviceNumeric
// aggregates than |logged_values| and |day_last_garbage_collected_| should
// imply.
for (const auto& report_pair : local_aggregate_store.by_report_key()) {
const auto& aggregates = report_pair.second;
if (aggregates.type_case() != ReportAggregates::kNumericAggregates) {
continue;
}
const auto& report_key = report_pair.first;
// Check whether this ReportAggregationKey is in |logged_values|. If not,
// expect that its by_component map is empty.
auto report_values = logged_values.find(report_key);
if (report_values == logged_values.end()) {
EXPECT_TRUE(aggregates.numeric_aggregates().by_component().empty());
if (!aggregates.numeric_aggregates().by_component().empty()) {
return false;
}
break;
}
auto expected_components = report_values->second;
for (const auto& component_pair : aggregates.numeric_aggregates().by_component()) {
// Check that this component is in |logged_values| under this
// ReportAggregationKey.
auto component = component_pair.first;
auto component_values = expected_components.find(component);
EXPECT_NE(component_values, expected_components.end());
if (component_values == expected_components.end()) {
return false;
}
const auto& expected_events = component_values->second;
for (const auto& event_pair : component_pair.second.by_event_code()) {
// Check that this event code is in |logged_values| under this
// ReportAggregationKey and component.
const auto& event_code = event_pair.first;
auto event_values = expected_events.find(event_code);
EXPECT_NE(event_values, expected_events.end());
if (event_values == expected_events.end()) {
return false;
}
const auto& expected_days = event_values->second;
for (const auto& day_pair : event_pair.second.by_day_index()) {
// Check that this day index is in |logged_values| under this
// ReportAggregationKey, component, and event code.
const auto& day_index = day_pair.first;
auto day_value = expected_days.find(day_index);
EXPECT_NE(day_value, expected_days.end());
if (day_value == expected_days.end()) {
return false;
}
// Check that the day index is no earlier than is implied by the
// dates of store creation and garbage collection.
EXPECT_GE(day_index, EarliestAllowedDayIndex(aggregates.aggregation_config()));
if (day_index < EarliestAllowedDayIndex(aggregates.aggregation_config())) {
return false;
}
}
}
}
}
// Check that the LocalAggregateStore contains aggregates for all values in
// |logged_values|, as long as they are recent enough to have survived any
// garbage collection.
for (const auto& logged_pair : logged_values) {
const auto& logged_key = logged_pair.first;
const auto& logged_component_map = logged_pair.second;
// Check that this ReportAggregationKey is in the LocalAggregateStore, and
// that the aggregates are of the expected type.
auto report_aggregates = local_aggregate_store.by_report_key().find(logged_key);
EXPECT_NE(report_aggregates, local_aggregate_store.by_report_key().end());
if (report_aggregates == local_aggregate_store.by_report_key().end()) {
return false;
}
if (report_aggregates->second.type_case() != ReportAggregates::kNumericAggregates) {
return false;
}
const auto& aggregation_type =
report_aggregates->second.aggregation_config().report().aggregation_type();
// Compute the earliest day index that should appear among the aggregates
// for this report.
auto earliest_allowed =
EarliestAllowedDayIndex(report_aggregates->second.aggregation_config());
for (const auto& logged_component_pair : logged_component_map) {
const auto& logged_component = logged_component_pair.first;
const auto& logged_event_code_map = logged_component_pair.second;
// Check whether this component is in the LocalAggregateStore under this
// ReportAggregationKey. If not, check that all day indices for all
// entries in |logged_values| under this component are smaller than the
// day index of the earliest allowed aggregate.
bool component_found = false;
auto component_aggregates =
report_aggregates->second.numeric_aggregates().by_component().find(logged_component);
if (component_aggregates !=
report_aggregates->second.numeric_aggregates().by_component().end()) {
component_found = true;
}
for (const auto& logged_event_pair : logged_event_code_map) {
const auto& logged_event_code = logged_event_pair.first;
const auto& logged_day_map = logged_event_pair.second;
// Check whether this event code is in the LocalAggregateStore under
// this ReportAggregationKey. If not, check that all day indices in
// |logged_values| under this component are smaller than the day index
// of the earliest allowed aggregate.
bool event_code_found = false;
if (component_found) {
auto event_code_aggregates =
component_aggregates->second.by_event_code().find(logged_event_code);
if (event_code_aggregates != component_aggregates->second.by_event_code().end()) {
event_code_found = true;
}
if (event_code_found) {
// Check that all of the day indices in |logged_values| under this
// ReportAggregationKey, component, and event code are in the
// LocalAggregateStore, as long as they are recent enough to have
// survived any garbage collection. Check that each aggregate has
// the expected value.
for (const auto& logged_day_pair : logged_day_map) {
auto logged_day_index = logged_day_pair.first;
auto logged_values = logged_day_pair.second;
auto day_aggregate =
event_code_aggregates->second.by_day_index().find(logged_day_index);
if (logged_day_index >= earliest_allowed) {
EXPECT_NE(day_aggregate, event_code_aggregates->second.by_day_index().end());
if (day_aggregate == event_code_aggregates->second.by_day_index().end()) {
return false;
}
int64_t aggregate_from_logged_values = 0;
for (size_t index = 0; index < logged_values.size(); index++) {
switch (aggregation_type) {
case ReportDefinition::SUM:
aggregate_from_logged_values += logged_values[index];
break;
case ReportDefinition::MAX:
aggregate_from_logged_values =
std::max(aggregate_from_logged_values, logged_values[index]);
break;
case ReportDefinition::MIN:
if (index == 0) {
aggregate_from_logged_values = logged_values[0];
}
aggregate_from_logged_values =
std::min(aggregate_from_logged_values, logged_values[index]);
break;
default:
return false;
}
}
EXPECT_EQ(day_aggregate->second.numeric_daily_aggregate().value(),
aggregate_from_logged_values);
if (day_aggregate->second.numeric_daily_aggregate().value() !=
aggregate_from_logged_values) {
return false;
}
}
}
}
}
if (!component_found | !event_code_found) {
for (const auto& logged_day_pair : logged_day_map) {
auto logged_day_index = logged_day_pair.first;
EXPECT_LT(logged_day_index, earliest_allowed);
if (logged_day_index >= earliest_allowed) {
return false;
}
}
break;
}
}
}
}
return true;
}
// Given the AggregationConfig of a locally aggregated report, returns the
// earliest (smallest) day index for which an aggregate may exist in the
// LocalAggregateStore for that report, accounting for garbage
// collection and the number of backfill days.
uint32_t EarliestAllowedDayIndex(const AggregationConfig& config) {
// If the LocalAggregateStore has never been garbage-collected, then the
// earliest allowed day index is just the day when the store was created,
// minus the number of backfill days.
auto backfill_days = GetBackfillDays();
EXPECT_GE(day_store_created_, backfill_days)
<< "The day index of store creation must be larger than the number "
"of backfill days.";
if (day_last_garbage_collected_ == 0u) {
return day_store_created_ - backfill_days;
}
uint32_t max_aggregation_days = 1;
for (const auto& window : config.aggregation_window()) {
if (window.units_case() == OnDeviceAggregationWindow::kDays &&
window.days() > max_aggregation_days) {
max_aggregation_days = window.days();
}
}
// Otherwise, it is the later of:
// (a) The day index on which the store was created minus the number
// of backfill days.
// (b) The day index for which the store was last garbage-collected
// minus the number of backfill days, minus the largest window size in
// the report associated to |config|, plus 1.
EXPECT_GE(day_last_garbage_collected_, backfill_days)
<< "The day index of last garbage collection must be larger than "
"the number of backfill days.";
if (day_last_garbage_collected_ - backfill_days < (max_aggregation_days + 1)) {
return day_store_created_ - backfill_days;
}
return (day_store_created_ < (day_last_garbage_collected_ - max_aggregation_days + 1))
? (day_last_garbage_collected_ - backfill_days - max_aggregation_days + 1)
: day_store_created_ - backfill_days;
}
std::unique_ptr<EventAggregatorManager> event_aggregator_mgr_;
std::unique_ptr<MockConsistentProtoStore> local_aggregate_proto_store_;
std::unique_ptr<MockConsistentProtoStore> obs_history_proto_store_;
std::unique_ptr<ObservationWriter> observation_writer_;
std::unique_ptr<Encoder> encoder_;
std::unique_ptr<EncryptedMessageMaker> observation_encrypter_;
std::unique_ptr<TestUpdateRecipient> update_recipient_;
std::unique_ptr<FakeObservationStore> observation_store_;
std::unique_ptr<IncrementingSystemClock> test_clock_;
IncrementingSystemClock* unowned_test_clock_;
IncrementingSteadyClock* test_steady_clock_;
// The day index on which the LocalAggregateStore was last
// garbage-collected. A value of 0 indicates that the store has never been
// garbage-collected.
uint32_t day_last_garbage_collected_ = 0u;
// The day index on which the LocalAggregateStore was created.
uint32_t day_store_created_ = 0u;
private:
std::unique_ptr<system_data::SystemDataInterface> system_data_;
}; // namespace logger
// Creates an EventAggregator and provides it with a ProjectContext generated
// from a registry.
class EventAggregatorTestWithProjectContext : public EventAggregatorTest {
protected:
explicit EventAggregatorTestWithProjectContext(const std::string& registry_var_name) {
project_context_ = GetTestProject(registry_var_name);
}
void SetUp() override {
EventAggregatorTest::SetUp();
event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
}
// Adds an OccurrenceEvent to the local aggregations for the MetricReportId of a locally
// aggregated report of the ProjectContext. Overrides the method
// EventAggregatorTest::AddUniqueActivesEvent.
Status AddUniqueActivesEvent(const MetricReportId& metric_report_id, uint32_t day_index,
uint32_t event_code, LoggedActivity* logged_activity = nullptr) {
return EventAggregatorTest::AddUniqueActivesEvent(project_context_, metric_report_id, day_index,
event_code, logged_activity);
}
// Adds a CountEvent to the local aggregations for the MetricReportId of a locally aggregated
// report of the ProjectContext. Overrides the method EventAggregatorTest::AddPerDeviceCountEvent.
Status AddPerDeviceCountEvent(const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component, uint32_t event_code, int64_t count,
LoggedValues* logged_values = nullptr) {
return EventAggregatorTest::AddPerDeviceCountEvent(
project_context_, metric_report_id, day_index, component, event_code, count, logged_values);
}
// Adds an ElapsedTimeEvent to the local aggregations for the MetricReportId of a locally
// aggregated report of the ProjectContext. Overrides the method
// EventAggregatorTest::AddPerDeviceElapsedTimeEvent.
Status AddPerDeviceElapsedTimeEvent(const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component, uint32_t event_code,
int64_t micros, LoggedValues* logged_values = nullptr) {
return EventAggregatorTest::AddPerDeviceElapsedTimeEvent(project_context_, metric_report_id,
day_index, component, event_code,
micros, logged_values);
}
// Adds a FrameRateEvent to the local aggregations for the MetricReportId of a locally aggregated
// report of the ProjectContext. Overrides the method
// EventAggregatorTest::AddPerDeviceFrameRateEvent.
Status AddPerDeviceFrameRateEvent(const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component, uint32_t event_code, float fps,
LoggedValues* logged_values = nullptr) {
return EventAggregatorTest::AddPerDeviceFrameRateEvent(
project_context_, metric_report_id, day_index, component, event_code, fps, logged_values);
}
// Adds a MemoryUsageEvent to the local aggregations for the MetricReportId of a locally
// aggregated report of the ProjectContext. Overrides the method
// EventAggregatorTest::AddPerDeviceMemoryUsageEvent.
Status AddPerDeviceMemoryUsageEvent(const MetricReportId& metric_report_id, uint32_t day_index,
const std::string& component,
const std::vector<uint32_t>& event_codes, int64_t bytes,
LoggedValues* logged_values = nullptr) {
return EventAggregatorTest::AddPerDeviceMemoryUsageEvent(project_context_, metric_report_id,
day_index, component, event_codes,
bytes, logged_values);
}
private:
// A ProjectContext wrapping the MetricDefinitions passed to the
// constructor in |metric_string|.
std::shared_ptr<ProjectContext> project_context_;
};
// Creates an EventAggregator and provides it with a ProjectContext generated
// from test_registries/unique_actives_test_registry.yaml. All metrics in this
// registry are of type EVENT_OCCURRED and have a UNIQUE_N_DAY_ACTIVES report.
class UniqueActivesEventAggregatorTest : public EventAggregatorTestWithProjectContext {
protected:
UniqueActivesEventAggregatorTest()
: EventAggregatorTestWithProjectContext(
logger::testing::unique_actives::kCobaltRegistryBase64) {}
};
// Creates an EventAggregator and provides it with a ProjectContext generated
// from test_registries/unique_actives_noise_free_test_registry.yaml. All
// metrics in this registry are of type EVENT_OCCURRED and have a
// UNIQUE_N_DAY_ACTIVES report with local_privacy_noise_level NONE.
class UniqueActivesNoiseFreeEventAggregatorTest : public EventAggregatorTestWithProjectContext {
protected:
UniqueActivesNoiseFreeEventAggregatorTest()
: EventAggregatorTestWithProjectContext(
logger::testing::unique_actives_noise_free::kCobaltRegistryBase64) {}
};
// Creates an EventAggregator and provides it with a ProjectContext generated
// from test_registries/per_device_numeric_stats_test_registry.yaml. All metrics
// in this registry are of type EVENT_COUNT and have a PER_DEVICE_NUMERIC_STATS
// report.
class PerDeviceNumericEventAggregatorTest : public EventAggregatorTestWithProjectContext {
protected:
PerDeviceNumericEventAggregatorTest()
: EventAggregatorTestWithProjectContext(
logger::testing::per_device_numeric_stats::kCobaltRegistryBase64) {}
};
class PerDeviceHistogramEventAggregatorTest : public EventAggregatorTestWithProjectContext {
protected:
PerDeviceHistogramEventAggregatorTest()
: EventAggregatorTestWithProjectContext(
logger::testing::per_device_histogram::kCobaltRegistryBase64) {}
};
class EventAggregatorWorkerTest : public EventAggregatorTest {
protected:
void SetUp() override { EventAggregatorTest::SetUp(); }
void ShutDownWorkerThread() { event_aggregator_mgr_->GetEventAggregator()->ShutDown(); }
bool in_shutdown_state() { return (shutdown_flag_set() && !worker_joinable()); }
bool in_run_state() { return (!shutdown_flag_set() && worker_joinable()); }
bool shutdown_flag_set() {
return event_aggregator_mgr_->GetEventAggregator()
->protected_worker_thread_controller_.const_lock()
->shut_down;
}
bool worker_joinable() {
return event_aggregator_mgr_->GetEventAggregator()->worker_thread_.joinable();
}
};
// Tests that an empty LocalAggregateStore is updated with
// ReportAggregationKeys and AggregationConfigs as expected when
// EventAggregator::UpdateAggregationConfigs is called with a ProjectContext
// containing at least one report for each locally aggregated report type.
TEST_F(EventAggregatorTest, UpdateAggregationConfigs) {
// Check that the LocalAggregateStore is empty.
EXPECT_EQ(0u, CopyLocalAggregateStore().by_report_key().size());
// Provide the unique_actives test registry to the EventAggregator.
auto unique_actives_project_context =
GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
EXPECT_EQ(kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(
*unique_actives_project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the unique_actives
// test registry.
EXPECT_EQ(logger::testing::unique_actives::kExpectedAggregationParams.metric_report_ids.size(),
CopyLocalAggregateStore().by_report_key().size());
// Check that the LocalAggregateStore contains the expected
// ReportAggregationKey and AggregationConfig for each locally aggregated
// report in the unique_actives registry.
for (const auto& metric_report_id :
logger::testing::unique_actives::kExpectedAggregationParams.metric_report_ids) {
std::string key;
SerializeToBase64(MakeAggregationKey(*unique_actives_project_context, metric_report_id), &key);
auto config = MakeAggregationConfig(*unique_actives_project_context, metric_report_id);
LocalAggregateStore local_aggregate_store = CopyLocalAggregateStore();
auto report_aggregates = local_aggregate_store.by_report_key().find(key);
EXPECT_NE(local_aggregate_store.by_report_key().end(), report_aggregates);
EXPECT_EQ(SerializeAsStringDeterministic(config),
SerializeAsStringDeterministic(report_aggregates->second.aggregation_config()));
}
}
// Tests two assumptions about the behavior of
// EventAggregator::UpdateAggregationConfigs when two projects with the same
// customer ID and project ID provide configurations to the EventAggregator.
// These assumptions are:
// (1) If the second project provides a report with a
// ReportAggregationKey which was not provided by the first project, then
// the EventAggregator accepts the new report.
// (2) If a report provided by the second project has a ReportAggregationKey
// which was already provided by the first project, then the EventAggregator
// rejects the new report, even if its ReportDefinition differs from that of
// existing report with the same ReportAggregationKey.
TEST_F(EventAggregatorTest, UpdateAggregationConfigsWithSameKey) {
// Provide the unique_actives test registry to the EventAggregator.
auto unique_actives_project_context =
GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
EXPECT_EQ(kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(
*unique_actives_project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the unique_actives
// test registry.
EXPECT_EQ(logger::testing::unique_actives::kExpectedAggregationParams.metric_report_ids.size(),
CopyLocalAggregateStore().by_report_key().size());
// Provide the unique_actives_noise_free test registry to the EventAggregator.
auto unique_actives_noise_free_project_context =
GetTestProject(logger::testing::unique_actives_noise_free::kCobaltRegistryBase64);
EXPECT_EQ(kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(
*unique_actives_noise_free_project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of distinct MetricReportIds of locally
// aggregated reports in the union of the unique_actives and
// unique_actives_noise_free registries.
auto local_aggregate_store = CopyLocalAggregateStore();
EXPECT_EQ(5u, local_aggregate_store.by_report_key().size());
// The MetricReportId |kFeaturesActiveMetricReportId| appears in both
// registries. The associated ReportAggregationKeys are identical, but the
// FeaturesActive_UniqueDevices reports in the two registries have different
// sets of window sizes, so their AggregationConfigs are different.
//
// Check that the AggregationConfig stored in the LocalAggregateStore
// under the key associated to |kFeaturesActiveMetricReportId| is the
// first AggregationConfig that was provided for that key; i.e., is
// derived from the unique_actives test registry.
std::string key;
EXPECT_TRUE(SerializeToBase64(
MakeAggregationKey(*unique_actives_project_context,
logger::testing::unique_actives::kFeaturesActiveMetricReportId),
&key));
auto unique_actives_config =
MakeAggregationConfig(*unique_actives_project_context,
logger::testing::unique_actives::kFeaturesActiveMetricReportId);
auto report_aggregates = local_aggregate_store.by_report_key().find(key);
EXPECT_NE(local_aggregate_store.by_report_key().end(), report_aggregates);
EXPECT_EQ(SerializeAsStringDeterministic(unique_actives_config),
SerializeAsStringDeterministic(report_aggregates->second.aggregation_config()));
auto noise_free_config = MakeAggregationConfig(
*unique_actives_noise_free_project_context,
logger::testing::unique_actives_noise_free::kFeaturesActiveMetricReportId);
EXPECT_NE(SerializeAsStringDeterministic(noise_free_config),
SerializeAsStringDeterministic(report_aggregates->second.aggregation_config()));
}
// Tests that EventAggregator::Log*Event returns |kInvalidArguments| when
// passed a report ID which is not associated to a key of the
// LocalAggregateStore, or when passed an EventRecord containing an Event
// proto message which is not of the appropriate event type.
TEST_F(EventAggregatorTest, LogBadEvents) {
// Provide the unique_actives test registry to the EventAggregator.
std::shared_ptr<ProjectContext> unique_actives_project_context =
GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
EXPECT_EQ(kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(
*unique_actives_project_context));
// Attempt to log a UniqueActivesEvent for
// |kEventsOccurredMetricReportId|, which is not in the unique_actives
// registry. Check that the result is |kInvalidArguments|.
std::shared_ptr<ProjectContext> noise_free_project_context =
GetTestProject(logger::testing::unique_actives_noise_free::kCobaltRegistryBase64);
EventRecord bad_event_record(noise_free_project_context,
logger::testing::unique_actives_noise_free::kEventsOccurredMetricId);
bad_event_record.event()->set_day_index(CurrentDayIndex());
bad_event_record.event()->mutable_occurrence_event()->set_event_code(0u);
EXPECT_EQ(kInvalidArguments, event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
logger::testing::unique_actives_noise_free::
kEventsOccurredEventsOccurredUniqueDevicesReportId,
bad_event_record));
// Attempt to call AddUniqueActivesEvent() with a valid metric and report
// ID, but with an EventRecord wrapping an Event which is not an
// OccurrenceEvent. Check that the result is |kInvalidArguments|.
EventRecord bad_event_record2(unique_actives_project_context,
logger::testing::unique_actives::kFeaturesActiveMetricId);
bad_event_record2.event()->mutable_count_event();
EXPECT_EQ(kInvalidArguments,
event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
logger::testing::unique_actives::kFeaturesActiveFeaturesActiveUniqueDevicesReportId,
bad_event_record2));
// Attempt to call AddPerDeviceCountEvent() with a valid metric and report
// ID, but with an EventRecord wrapping an Event which is not a
// CountEvent. Check that the result is |kInvalidArguments|.
EventRecord bad_event_record3(
noise_free_project_context,
logger::testing::per_device_numeric_stats::kConnectionFailuresMetricReportId.first);
bad_event_record3.event()->mutable_occurrence_event();
EXPECT_EQ(kInvalidArguments, event_aggregator_mgr_->GetEventAggregator()->AddCountEvent(
logger::testing::per_device_numeric_stats::
kConnectionFailuresConnectionFailuresPerDeviceCountReportId,
bad_event_record3));
}
// Tests that the LocalAggregateStore is updated as expected when
// EventAggregator::AddUniqueActivesEvent() is called with valid arguments;
// i.e., with a report ID associated to an existing key of the
// LocalAggregateStore, and with an EventRecord which wraps an
// OccurrenceEvent.
//
// Logs some valid events each day for 35 days, checking the contents of the
// LocalAggregateStore each day.
TEST_F(UniqueActivesEventAggregatorTest, LogEvents) {
LoggedActivity logged_activity;
uint32_t num_days = 35;
for (uint32_t offset = 0; offset < num_days; offset++) {
// Add an event to the local aggregations for the FeaturesActive_UniqueDevices report with event
// code 0. Check the contents of the LocalAggregateStore.
auto day_index = CurrentDayIndex();
EXPECT_EQ(kOK,
AddUniqueActivesEvent(logger::testing::unique_actives::kFeaturesActiveMetricReportId,
day_index, 0u, &logged_activity));
EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
// Add another event to the local aggregations for the same report, event code, and day index.
// Check the contents of the LocalAggregateStore.
EXPECT_EQ(kOK,
AddUniqueActivesEvent(logger::testing::unique_actives::kFeaturesActiveMetricReportId,
day_index, 0u, &logged_activity));
EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
// Add several more events to the local aggregations for various valid reports and event codes.
// Check the contents of the LocalAggregateStore.
EXPECT_EQ(kOK,
AddUniqueActivesEvent(logger::testing::unique_actives::kDeviceBootsMetricReportId,
day_index, 0u, &logged_activity));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(logger::testing::unique_actives::kFeaturesActiveMetricReportId,
day_index, 4u, &logged_activity));
EXPECT_EQ(kOK, AddUniqueActivesEvent(
logger::testing::unique_actives::kNetworkActivityWindowSizeMetricReportId,
day_index, 1u, &logged_activity));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(
logger::testing::unique_actives::kNetworkActivityAggregationWindowMetricReportId,
day_index, 1u, &logged_activity));
EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
AdvanceClock(kDay);
}
}
// Checks that UniqueActivesObservations with the expected values are
// generated by the the scheduled Observation generation when some events
// have been logged for a UNIQUE_N_DAY_ACTIVES.
// (based on UniqueActivesNoiseFreeEventAggregatorTest::CheckObservationValuesMultiDay)
//
// Logging pattern:
// Logs events for the EventsOccurred_UniqueDevices report (whose parent
// metric has max_event_code = 4) for event codes 1 through 4.
//
// Expected number of Observations:
// The call to GenerateObservations should generate a number of Observations
// equal to the daily_num_obs field of
// |logger::testing::unique_actives_noise_free::kExpectedAggregationParams|.
//
// Expected Observation values:
// The EventsOccurred_UniqueDevices report has window sizes 1 and 7, and
// the expected activity indicators of Observations for that report are:
//
// (window size) active for event codes
// ------------------------------------------------------
// (1) 1, 2, 3, 4
// (7) 1, 2, 3, 4
//
// TODO(ninai): remove duplicate test when the duplicate functions are removed.
TEST_F(UniqueActivesNoiseFreeEventAggregatorTest, Run) {
auto day_index = CurrentDayIndex();
// Form expected Observations for the 1 day of logging.
ExpectedUniqueActivesObservations expected_obs = MakeNullExpectedUniqueActivesObservations(
logger::testing::unique_actives_noise_free::kExpectedAggregationParams, day_index);
const auto& expected_id =
logger::testing::unique_actives_noise_free::kEventsOccurredMetricReportId;
expected_obs[{expected_id, day_index}] = {{1, {false, true, true, true, true}},
{7, {false, true, true, true, true}}};
event_aggregator_mgr_->Start(std::move(test_clock_));
// Trigger the initial call to DoScheduledTasks and wait for it to have completed.
TriggerAndWaitForDoScheduledTasks();
// Generate some events.
for (uint32_t event_code = 1;
event_code <
logger::testing::unique_actives_noise_free::kExpectedAggregationParams.num_event_codes.at(
expected_id);
event_code++) {
EXPECT_EQ(kOK, AddUniqueActivesEvent(expected_id, day_index, event_code));
}
// Advance |test_clock_| by 1 day.
AdvanceClock(kDay);
// Clear the FakeObservationStore.
ResetObservationStore();
// Trigger another iteration of the call to DoScheduledTasks and wait for it to have completed.
TriggerAndWaitForDoScheduledTasks();
// Check the generated Observations against the expectation.
EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs, observation_store_.get(),
update_recipient_.get()));
}
// Tests that the LocalAggregateStore is updated as expected when
// EventAggregator::AddPerDeviceCountEvent() is called with valid arguments;
// i.e., with a report ID associated to an existing key of the
// LocalAggregateStore, and with an EventRecord which wraps a CountEvent.
//
// Logs some valid events each day for 35 days, checking the contents of the
// LocalAggregateStore each day.
TEST_F(PerDeviceNumericEventAggregatorTest, LogEvents) {
LoggedValues logged_values;
std::vector<MetricReportId> count_metric_report_ids = {
logger::testing::per_device_numeric_stats::kSettingsChangedWindowSizeMetricReportId,
logger::testing::per_device_numeric_stats::kSettingsChangedAggregationWindowMetricReportId,
logger::testing::per_device_numeric_stats::kConnectionFailuresMetricReportId};
std::vector<MetricReportId> elapsed_time_metric_report_ids = {
logger::testing::per_device_numeric_stats::kStreamingTimeTotalMetricReportId,
logger::testing::per_device_numeric_stats::kStreamingTimeMinMetricReportId,
logger::testing::per_device_numeric_stats::kStreamingTimeMaxMetricReportId};
MetricReportId frame_rate_metric_report_id =
logger::testing::per_device_numeric_stats::kLoginModuleFrameRateMinMetricReportId;
MetricReportId memory_usage_metric_report_id =
logger::testing::per_device_numeric_stats::kLedgerMemoryUsageMaxMetricReportId;
uint32_t num_days = 35;
for (uint32_t offset = 0; offset < num_days; offset++) {
auto day_index = CurrentDayIndex();
for (const auto& id : count_metric_report_ids) {
for (const auto& component : {"component_A", "component_B", "component_C"}) {
// Adds 2 events to the local aggregations with event code 0, for each component A, B, C.
EXPECT_EQ(kOK, AddPerDeviceCountEvent(id, day_index, component, 0u, 2, &logged_values));
EXPECT_EQ(kOK, AddPerDeviceCountEvent(id, day_index, component, 0u, 3, &logged_values));
}
if (offset < 3) {
// Adds 1 event to the local aggregations for component D and event code 1.
EXPECT_EQ(kOK, AddPerDeviceCountEvent(id, day_index, "component_D", 1u, 4, &logged_values));
}
}
for (const auto& id : elapsed_time_metric_report_ids) {
for (const auto& component : {"component_A", "component_B", "component_C"}) {
// Adds 2 events to the local aggregations with event code 0, for each component A, B, C.
EXPECT_EQ(kOK,
AddPerDeviceElapsedTimeEvent(id, day_index, component, 0u, 2, &logged_values));
EXPECT_EQ(kOK,
AddPerDeviceElapsedTimeEvent(id, day_index, component, 0u, 3, &logged_values));
}
if (offset < 3) {
// Adds 1 event to the local aggregations for component D and event code 1.
EXPECT_EQ(
kOK, AddPerDeviceElapsedTimeEvent(id, day_index, "component_D", 1u, 4, &logged_values));
}
}
for (const auto& component : {"component_A", "component_B"}) {
// Adds some events to the local aggregations for a FRAME_RATE metric with a
// PerDeviceNumericStats report.
EXPECT_EQ(kOK, AddPerDeviceFrameRateEvent(frame_rate_metric_report_id, day_index, component,
0u, 2.25, &logged_values));
EXPECT_EQ(kOK, AddPerDeviceFrameRateEvent(frame_rate_metric_report_id, day_index, component,
0u, 1.75, &logged_values));
// Adds some events to the local aggregations for a MEMORY_USAGE metric with a
// PerDeviceNumericStats report.
EXPECT_EQ(kOK,
AddPerDeviceMemoryUsageEvent(memory_usage_metric_report_id, day_index, component,
std::vector<uint32_t>{0u, 0u}, 300, &logged_values));
EXPECT_EQ(kOK,
AddPerDeviceMemoryUsageEvent(memory_usage_metric_report_id, day_index, component,
std::vector<uint32_t>{1u, 0u}, 300, &logged_values));
}
EXPECT_TRUE(CheckPerDeviceNumericAggregates(logged_values, day_index));
AdvanceClock(kDay);
}
}
// Tests that the LocalAggregateStore is updated as expected when
// EventAggregator::AddPerDeviceCountEvent() is called with valid arguments;
// i.e., with a report ID associated to an existing key of the
// LocalAggregateStore, and with an EventRecord which wraps a CountEvent.
//
// Logs some valid events each day for 35 days, checking the contents of the
// LocalAggregateStore each day.
TEST_F(PerDeviceHistogramEventAggregatorTest, LogEvents) {
LoggedValues logged_values;
auto event_count_id = logger::testing::per_device_histogram::kSettingsChangedMetricReportId;
auto elapsed_time_id = logger::testing::per_device_histogram::kStreamingTimeTotalMetricReportId;
auto frame_rate_id =
logger::testing::per_device_histogram::kLoginModuleFrameRateMinMetricReportId;
auto memory_usage_id = logger::testing::per_device_histogram::kLedgerMemoryUsageMaxMetricReportId;
uint32_t num_days = 35;
for (uint32_t offset = 0; offset < num_days; offset++) {
auto day_index = CurrentDayIndex();
for (const auto& component : {"component_A", "component_B", "component_C"}) {
// Adds 2 events to the local aggregations with event code 0, for each component A, B, C.
EXPECT_EQ(
kOK, AddPerDeviceCountEvent(event_count_id, day_index, component, 0u, 2, &logged_values));
EXPECT_EQ(
kOK, AddPerDeviceCountEvent(event_count_id, day_index, component, 0u, 3, &logged_values));
}
if (offset < 3) {
// Adds 1 event to the local aggregations for component D and event code 1.
EXPECT_EQ(kOK, AddPerDeviceCountEvent(event_count_id, day_index, "component_D", 1u, 4,
&logged_values));
}
for (const auto& component : {"component_A", "component_B", "component_C"}) {
// Adds 2 events to the local aggregations with event code 0, for each component A, B, C.
EXPECT_EQ(kOK, AddPerDeviceElapsedTimeEvent(elapsed_time_id, day_index, component, 0u, 2,
&logged_values));
EXPECT_EQ(kOK, AddPerDeviceElapsedTimeEvent(elapsed_time_id, day_index, component, 0u, 3,
&logged_values));
}
if (offset < 3) {
// Adds 1 event to the local aggregations for component D and event code 1.
EXPECT_EQ(kOK, AddPerDeviceElapsedTimeEvent(elapsed_time_id, day_index, "component_D", 1u, 4,
&logged_values));
}
for (const auto& component : {"component_A", "component_B"}) {
// Adds some events to the local aggregations for a FRAME_RATE metric with a
// PerDeviceHistogram report.
EXPECT_EQ(kOK, AddPerDeviceFrameRateEvent(frame_rate_id, day_index, component, 0u, 2.25,
&logged_values));
EXPECT_EQ(kOK, AddPerDeviceFrameRateEvent(frame_rate_id, day_index, component, 0u, 1.75,
&logged_values));
// Adds some events to the local aggregations for a MEMORY_USAGE metric with a
// PerDeviceHistogram report.
EXPECT_EQ(kOK,
AddPerDeviceMemoryUsageEvent(memory_usage_id, day_index, component,
std::vector<uint32_t>{0u, 0u}, 300, &logged_values));
EXPECT_EQ(kOK,
AddPerDeviceMemoryUsageEvent(memory_usage_id, day_index, component,
std::vector<uint32_t>{1u, 0u}, 300, &logged_values));
}
EXPECT_TRUE(CheckPerDeviceNumericAggregates(logged_values, day_index));
AdvanceClock(kDay);
}
}
// Starts the worker thread, and destructs the EventAggregator without
// explicitly shutting down the worker thread. Checks that the shutdown flag
// and worker thread are in the expected states before and after the thread is
// started.
//
// TODO(ninai): remove duplicate test when the duplicate functions are removed.
TEST_F(EventAggregatorWorkerTest, StartWorkerThread) {
EXPECT_TRUE(in_shutdown_state());
event_aggregator_mgr_->Start(std::move(test_clock_));
EXPECT_TRUE(in_run_state());
}
// Starts the worker thread, shuts down the worker thread, and destructs the
// EventAggregator. Checks that the shutdown flag and worker thread are in the
// expected states.
//
// TODO(ninai): remove duplicate test when the duplicate functions are removed.
TEST_F(EventAggregatorWorkerTest, StartAndShutDownWorkerThread) {
EXPECT_TRUE(in_shutdown_state());
event_aggregator_mgr_->Start(std::move(test_clock_));
EXPECT_TRUE(in_run_state());
ShutDownWorkerThread();
EXPECT_TRUE(in_shutdown_state());
}
// Starts the worker thread and immediately shuts it down. Checks that the
// LocalAggregateStore was backed up during shutdown.
//
// TODO(ninai): remove duplicate test when the duplicate functions are removed.
TEST_F(EventAggregatorWorkerTest, BackUpBeforeShutdown) {
event_aggregator_mgr_->Start(std::move(test_clock_));
ShutDownWorkerThread();
EXPECT_EQ(1, local_aggregate_proto_store_->write_count_);
}
// Starts the worker thread and calls
// EventAggregator::UpdateAggregationConfigs() on the main thread.
//
// TODO(ninai): remove duplicate test when the duplicate functions are removed.
TEST_F(EventAggregatorWorkerTest, UpdateAggregationConfigs) {
event_aggregator_mgr_->Start(std::move(test_clock_));
// Provide the EventAggregator with the all_report_types registry.
auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
EXPECT_EQ(
kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the
// all_report_types registry.
EXPECT_EQ(logger::testing::all_report_types::kExpectedAggregationParams.metric_report_ids.size(),
CopyLocalAggregateStore().by_report_key().size());
}
// Starts the worker thread, provides a ProjectContext, logs some events, and
// shuts down the worker thread. Checks that the LocalAggregateStore was
// backed up at least once during the lifetime of the worker thread.
//
// TODO(ninai): remove duplicate test when the duplicate functions are removed.
TEST_F(EventAggregatorWorkerTest, LogEvents) {
auto day_index = CurrentDayIndex();
event_aggregator_mgr_->Start(std::move(test_clock_));
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context =
GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
EXPECT_EQ(
kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
// // Adds some events to the local aggregations.
LoggedActivity logged_activity;
EXPECT_EQ(kOK, AddUniqueActivesEvent(
project_context, logger::testing::all_report_types::kDeviceBootsMetricReportId,
day_index, 0u, &logged_activity));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(project_context,
logger::testing::all_report_types::kFeaturesActiveMetricReportId,
day_index, 4u, &logged_activity));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(project_context,
logger::testing::all_report_types::kEventsOccurredMetricReportId,
day_index, 1u, &logged_activity));
EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
ShutDownWorkerThread();
EXPECT_GE(local_aggregate_proto_store_->write_count_, 1);
}
} // namespace cobalt::local_aggregation