blob: ecd07c108fe6469b0d7d597287f64a77c4aae7b3 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/event_aggregator_mgr.h"
#include <gtest/gtest.h>
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/proto_util.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation/test_utils/test_event_aggregator_mgr.h"
#include "src/logger/testing_constants.h"
#include "src/pb/event.pb.h"
#include "src/registry/packed_event_codes.h"
#include "src/registry/project_configs.h"
namespace cobalt::local_aggregation {
using logger::Encoder;
using logger::EventRecord;
using logger::kOK;
using logger::MetricReportId;
using logger::ObservationWriter;
using logger::ProjectContext;
using logger::Status;
using logger::testing::ExpectedUniqueActivesObservations;
using logger::testing::FakeObservationStore;
using logger::testing::GetTestProject;
using logger::testing::MakeAggregationKey;
using logger::testing::MakeNullExpectedUniqueActivesObservations;
using logger::testing::TestUpdateRecipient;
using logger::testing::all_report_types::kCobaltRegistryBase64;
using logger::testing::all_report_types::kDeviceBootsMetricReportId;
using logger::testing::all_report_types::kEventsOccurredMetricReportId;
using logger::testing::all_report_types::kFeaturesActiveMetricReportId;
using system_data::ClientSecret;
using system_data::SystemDataInterface;
using testing::AssertionFailure;
using testing::AssertionResult;
using testing::AssertionSuccess;
using util::EncryptedMessageMaker;
using util::IncrementingSteadyClock;
using util::IncrementingSystemClock;
using util::SerializeToBase64;
using util::TimeToDayIndex;
// Number of seconds in a day
constexpr int kSecondsInOneDay = 60 * 60 * 24;
// Number of seconds in an ideal year
constexpr int kYear = kSecondsInOneDay * 365;
class EventAggregatorManagerTest : public util::testing::TestWithFiles {
protected:
void SetUp() override {
MakeTestFolder();
observation_store_ = std::make_unique<FakeObservationStore>();
update_recipient_ = std::make_unique<TestUpdateRecipient>();
observation_encrypter_ = EncryptedMessageMaker::MakeUnencrypted();
observation_writer_ = std::make_unique<ObservationWriter>(
observation_store_.get(), update_recipient_.get(), observation_encrypter_.get());
metadata_builder_ =
std::make_unique<MetadataBuilder>(system_data_.get(), system_data_cache_path(), fs());
encoder_ =
std::make_unique<Encoder>(ClientSecret::GenerateNewSecret(), metadata_builder_.get());
}
std::unique_ptr<IncrementingSystemClock> GetTestSystemClock() {
auto test_clock =
std::make_unique<IncrementingSystemClock>(std::chrono::system_clock::duration(0));
test_clock->set_time(std::chrono::system_clock::time_point(std::chrono::seconds(10 * kYear)));
test_system_clock_ptr_ = test_clock.get();
return test_clock;
}
IncrementingSteadyClock* GetTestSteadyClock() {
test_steady_clock_ptr_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
return test_steady_clock_ptr_;
}
std::unique_ptr<TestEventAggregatorManager> GetEventAggregatorManager(
IncrementingSteadyClock* steady_clock) {
CobaltConfig cfg = {.client_secret = system_data::ClientSecret::GenerateNewSecret()};
cfg.local_aggregation_backfill_days = 0;
cfg.local_aggregate_proto_store_path = aggregate_store_path();
cfg.obs_history_proto_store_path = obs_history_path();
auto event_aggregator_mgr = std::make_unique<TestEventAggregatorManager>(
cfg, &fs_, encoder_.get(), observation_writer_.get());
event_aggregator_mgr->SetSteadyClock(steady_clock);
return event_aggregator_mgr;
}
static void ShutDown(EventAggregatorManager* event_aggregator_mgr) {
event_aggregator_mgr->ShutDown();
}
static bool IsInShutdownState(EventAggregatorManager* event_aggregator_mgr) {
return (IsShutdownFlagSet(event_aggregator_mgr) && !IsWorkerJoinable(event_aggregator_mgr));
}
static bool IsInRunState(EventAggregatorManager* event_aggregator_mgr) {
return (!IsShutdownFlagSet(event_aggregator_mgr) && IsWorkerJoinable(event_aggregator_mgr));
}
static bool IsShutdownFlagSet(EventAggregatorManager* event_aggregator_mgr) {
return event_aggregator_mgr->protected_worker_thread_controller_.const_lock()->shut_down;
}
static bool IsWorkerJoinable(EventAggregatorManager* event_aggregator_mgr) {
return event_aggregator_mgr->worker_thread_.joinable();
}
// Returns the day index of the current day according to |test_clock_|, in
// |time_zone|, without incrementing the clock.
uint32_t CurrentDayIndex(MetricDefinition::TimeZonePolicy time_zone = MetricDefinition::UTC) {
return TimeToDayIndex(std::chrono::system_clock::to_time_t(test_system_clock_ptr_->peek_now()),
time_zone);
}
bool BackUpHappened() { return fs_.FileExists(aggregate_store_path()); }
static uint32_t NumberOfKVPairsInStore(EventAggregatorManager* event_aggregator_mgr) {
return event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore().by_report_key().size();
}
// Given a ProjectContext |project_context| and the MetricReportId of a UNIQUE_N_DAY_ACTIVES
// report in |project_context|, as well as a day index and an event code, adds an
// EventOccurredEvent to the EventAggregator's AggregateStore for that report, day index, and
// event code. If a non-null LoggedActivity map is provided, updates the map with information
// about the logged Event.
static Status AddUniqueActivesEvent(EventAggregatorManager* event_aggregator_mgr,
std::shared_ptr<const ProjectContext> project_context,
const MetricReportId& metric_report_id, uint32_t day_index,
uint32_t event_code) {
EventRecord event_record(std::move(project_context), metric_report_id.first);
event_record.event()->set_day_index(day_index);
event_record.event()->mutable_event_occurred_event()->set_event_code(event_code);
return event_aggregator_mgr->GetEventAggregator()->AddUniqueActivesEvent(
metric_report_id.second, event_record);
}
static uint32_t GetNumberOfUniqueActivesAggregates(EventAggregatorManager* event_aggregator_mgr) {
auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
uint32_t num_aggregates = 0;
for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) {
if (aggregates.type_case() != ReportAggregates::kUniqueActivesAggregates) {
continue;
}
for (const auto& [event_code, event_code_aggregates] :
aggregates.unique_actives_aggregates().by_event_code()) {
num_aggregates += event_code_aggregates.by_day_index_size();
}
}
return num_aggregates;
}
static AssertionResult ContainsValidUniqueActivesAggregate(
EventAggregatorManager* event_aggregator_mgr,
const std::shared_ptr<const ProjectContext>& project_context,
const MetricReportId& metric_report_id, uint32_t day_index, uint32_t event_code) {
auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
std::string key;
if (!SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id), &key)) {
return AssertionFailure() << "Could not serialize key with metric id "
<< metric_report_id.first << " and report id "
<< metric_report_id.second;
}
auto report_aggregates = local_aggregate_store.by_report_key().find(key);
if (report_aggregates == local_aggregate_store.by_report_key().end()) {
return AssertionFailure() << "No report aggregates for key with metric id "
<< metric_report_id.first << " and report id "
<< metric_report_id.second;
}
if (report_aggregates->second.type_case() != ReportAggregates::kUniqueActivesAggregates) {
return AssertionFailure() << "Report aggregates for key with metric id "
<< metric_report_id.first << " and report id "
<< metric_report_id.second << " are not of type unique actives.";
}
auto event_code_aggregates =
report_aggregates->second.unique_actives_aggregates().by_event_code().find(event_code);
if (event_code_aggregates ==
report_aggregates->second.unique_actives_aggregates().by_event_code().end()) {
return AssertionFailure() << "No report aggregates for key with metric id "
<< metric_report_id.first << " and report id "
<< metric_report_id.second << " with event code " << event_code;
}
auto day_aggregate = event_code_aggregates->second.by_day_index().find(day_index);
if (day_aggregate == event_code_aggregates->second.by_day_index().end()) {
return AssertionFailure() << "No report aggregates for key with metric id "
<< metric_report_id.first << " and report id "
<< metric_report_id.second << " and event code " << event_code
<< " with day index " << day_index;
}
return AssertionSuccess();
}
static void TriggerAndWaitForDoScheduledTasks(EventAggregatorManager* event_aggregator_mgr) {
{
// Acquire the lock to manually trigger the scheduled tasks.
auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
locked->immediate_run_trigger = true;
locked->wakeup_notifier.notify_all();
}
while (true) {
// Reacquire the lock to make sure that the scheduled tasks have completed.
auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
if (!locked->immediate_run_trigger) {
break;
}
std::this_thread::yield();
}
}
// Clears the FakeObservationStore and resets the counts of Observations
// received by the FakeObservationStore and the TestUpdateRecipient.
void ResetObservationStore() {
observation_store_->messages_received.clear();
observation_store_->metadata_received.clear();
observation_store_->ResetObservationCounter();
update_recipient_->invocation_count = 0;
}
void AdvanceClock(int num_seconds) {
test_steady_clock_ptr_->increment_by(std::chrono::seconds(num_seconds));
test_system_clock_ptr_->increment_by(std::chrono::seconds(num_seconds));
}
IncrementingSystemClock* test_system_clock_ptr_; // used for determing CurrentDayIndex
IncrementingSteadyClock* test_steady_clock_ptr_; // used for scheduling events.
std::string test_folder_;
util::PosixFileSystem fs_;
std::unique_ptr<ObservationWriter> observation_writer_;
std::unique_ptr<MetadataBuilder> metadata_builder_;
std::unique_ptr<Encoder> encoder_;
std::unique_ptr<EncryptedMessageMaker> observation_encrypter_;
std::unique_ptr<TestUpdateRecipient> update_recipient_;
std::unique_ptr<FakeObservationStore> observation_store_;
std::unique_ptr<SystemDataInterface> system_data_;
};
TEST_F(EventAggregatorManagerTest, StartWorkerThread) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
EXPECT_TRUE(IsInShutdownState(event_aggregator_mgr.get()));
event_aggregator_mgr->Start(GetTestSystemClock());
EXPECT_TRUE(IsInRunState(event_aggregator_mgr.get()));
}
TEST_F(EventAggregatorManagerTest, StartAndShutDownWorkerThread) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
EXPECT_TRUE(IsInShutdownState(event_aggregator_mgr.get()));
event_aggregator_mgr->Start(GetTestSystemClock());
EXPECT_TRUE(IsInRunState(event_aggregator_mgr.get()));
ShutDown(event_aggregator_mgr.get());
EXPECT_TRUE(IsInShutdownState(event_aggregator_mgr.get()));
}
TEST_F(EventAggregatorManagerTest, BackUpBeforeShutdown) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
ShutDown(event_aggregator_mgr.get());
EXPECT_TRUE(BackUpHappened());
}
// Starts the worker thread and calls
// EventAggregator::UpdateAggregationConfigs() on the main thread.
TEST_F(EventAggregatorManagerTest, UpdateAggregationConfigs) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
// Provide the EventAggregator with the all_report_types registry.
auto project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
// Check that the number of key-value pairs in the LocalAggregateStore is
// now equal to the number of locally aggregated reports in the
// all_report_types registry.
EXPECT_EQ(logger::testing::all_report_types::kExpectedAggregationParams.metric_report_ids.size(),
NumberOfKVPairsInStore(event_aggregator_mgr.get()));
}
TEST_F(EventAggregatorManagerTest, LogEventsAfterDelete) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
auto day_index = CurrentDayIndex();
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
event_aggregator_mgr->DeleteData();
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
EXPECT_EQ(2, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
EXPECT_FALSE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index,
/*event_code*/ 0u));
EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index,
/*event_code*/ 4u));
EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index,
/*event_code*/ 1u));
ShutDown(event_aggregator_mgr.get());
EXPECT_TRUE(BackUpHappened());
}
TEST_F(EventAggregatorManagerTest, DeleteData) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
auto day_index = CurrentDayIndex();
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
event_aggregator_mgr->DeleteData();
EXPECT_EQ(0, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
}
TEST_F(EventAggregatorManagerTest, Disable) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
auto day_index = CurrentDayIndex();
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
event_aggregator_mgr->Disable(true);
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
EXPECT_EQ(0, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
event_aggregator_mgr->Disable(false);
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
}
TEST_F(EventAggregatorManagerTest, LogEvents) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
auto day_index = CurrentDayIndex();
// Provide the EventAggregator with the all_report_types registry.
std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
EXPECT_EQ(kOK,
AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
kDeviceBootsMetricReportId, day_index,
/*event_code*/ 0u));
EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
kFeaturesActiveMetricReportId, day_index,
/*event_code*/ 4u));
EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
kEventsOccurredMetricReportId, day_index,
/*event_code*/ 1u));
ShutDown(event_aggregator_mgr.get());
EXPECT_TRUE(BackUpHappened());
}
// Checks that UniqueActivesObservations with the expected values are
// generated by the the scheduled Observation generation when some events
// have been logged for a UNIQUE_N_DAY_ACTIVES.
// (based on UniqueActivesNoiseFreeEventAggregatorTest::CheckObservationValuesMultiDay)
//
// Logging pattern:
// Logs events for the EventsOccurred_UniqueDevices report (whose parent
// metric has max_event_code = 4) for event codes 1 through 4.
//
// Expected number of Observations:
// The call to GenerateObservations should generate a number of Observations
// equal to the daily_num_obs field of
// |logger::testing::unique_actives_noise_free::kExpectedAggregationParams|.
//
// Expected Observation values:
// The EventsOccurred_UniqueDevices report has window sizes 1 and 7, and
// the expected activity indicators of Observations for that report are:
//
// (window size) active for event codes
// ------------------------------------------------------
// (1) 1, 2, 3, 4
// (7) 1, 2, 3, 4
TEST_F(EventAggregatorManagerTest, Run) {
auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
event_aggregator_mgr->Start(GetTestSystemClock());
auto day_index = CurrentDayIndex();
std::shared_ptr<const ProjectContext> project_context =
GetTestProject(logger::testing::unique_actives_noise_free::kCobaltRegistryBase64);
// Form expected Observations for the 1 day of logging.
auto expected_obs = MakeNullExpectedUniqueActivesObservations(
logger::testing::unique_actives_noise_free::kExpectedAggregationParams, day_index);
const auto& expected_id =
logger::testing::unique_actives_noise_free::kEventsOccurredMetricReportId;
expected_obs[{expected_id, day_index}] = {{1, {false, true, true, true, true}},
{7, {false, true, true, true, true}}};
TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
EXPECT_EQ(kOK,
event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context, expected_id,
day_index, /*event_code*/ 1));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context, expected_id,
day_index, /*event_code*/ 2));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context, expected_id,
day_index, /*event_code*/ 3));
EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context, expected_id,
day_index, /*event_code*/ 4));
AdvanceClock(kSecondsInOneDay);
ResetObservationStore();
TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs, observation_store_.get(),
update_recipient_.get()));
}
} // namespace cobalt::local_aggregation