// blob: 1f6ee10359f103dde34f307685cbff86e956d87b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h"
#include <memory>
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Fixture for the SelectFirstAggregationProcedure tests. It extends
// testing::TestAggregationProcedure with two small factory helpers.
class SelectFirstAggregationProcedureTest : public testing::TestAggregationProcedure {
 protected:
  // Returns a new EventRecord for |metric_id|, built against the fixture's project context.
  // ValueOrDie() is applied to the result of MakeEventRecord().
  std::unique_ptr<logger::EventRecord> GetEventRecord(uint32_t metric_id) {
    return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
  }

  // Returns a SelectFirstAggregationProcedure constructed from the metric definition for
  // |metric_id| and the report definition at |report_index| within that metric.
  std::unique_ptr<SelectFirstAggregationProcedure> GetProcedure(uint32_t metric_id,
                                                                int report_index) {
    const auto &metric_definition = GetMetricDef(metric_id);
    const auto &report_definition = GetReportDef(metric_id, report_index);
    return std::make_unique<SelectFirstAggregationProcedure>(metric_definition,
                                                             report_definition);
  }
};
// Logs occurrence events for two event vectors on a single day and checks that the resulting
// ReportAggregate holds exactly one system profile aggregate containing only the first event
// vector, with its at-least-once flag set.
TEST_F(SelectFirstAggregationProcedureTest, UpdateAggregate1DayReport) {
  uint32_t metric_id = kOccurrenceMetricMetricId;
  int report_index = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  const uint64_t system_profile_hash = uint64_t{999};
  const uint32_t kNumEventCodes = 2;

  ReportAggregate aggregate;
  // Add events for 2 different event vectors: {1} and {2}.
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
                            &aggregate);

  // Check that |aggregate| was updated for the first event vector but not the second.
  ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
  ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
  const SystemProfileAggregate &system_profile_agg =
      aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
  EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
  // Exactly one event vector must have been kept. This also guards the by_event_code(0)
  // accesses below against an empty repeated field.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), 1);
  ASSERT_EQ(system_profile_agg.by_event_code(0).event_codes(0), 1);
  ASSERT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
// Merges a source whose at-least-once data is set (last_day_index 101) into a destination whose
// at-least-once data is also set (last_day_index 100). The destination is expected to keep the
// flag and to end up with last_day_index 101.
TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataBothSet) {
  auto procedure = GetProcedure(kOccurrenceMetricMetricId,
                                kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);

  // Source of the merge.
  AggregateData source;
  auto *source_state = source.mutable_at_least_once();
  source_state->set_at_least_once(true);
  source_state->set_last_day_index(101);

  // Destination of the merge.
  AggregateData destination;
  auto *destination_state = destination.mutable_at_least_once();
  destination_state->set_at_least_once(true);
  destination_state->set_last_day_index(100);

  procedure->MergeAggregateData(&destination, source);

  const auto &result = destination.at_least_once();
  EXPECT_TRUE(result.at_least_once());
  EXPECT_EQ(result.last_day_index(), 101);
}
// Merges a default-constructed source into a default-constructed destination. The destination is
// expected to still have no at_least_once message, so the accessors read as false and 0.
TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataNeitherSet) {
  auto procedure = GetProcedure(kOccurrenceMetricMetricId,
                                kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);

  AggregateData source;
  AggregateData destination;

  procedure->MergeAggregateData(&destination, source);

  // The presence check comes first, before any field of the sub-message is read.
  EXPECT_FALSE(destination.has_at_least_once());
  const auto &result = destination.at_least_once();
  EXPECT_FALSE(result.at_least_once());
  EXPECT_EQ(result.last_day_index(), 0);
}
// Merges a source whose at-least-once data is set (last_day_index 101) into a default-constructed
// destination. The destination is expected to pick up both the flag and the day index.
TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataFromSet) {
  auto procedure = GetProcedure(kOccurrenceMetricMetricId,
                                kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);

  // Only the source carries data.
  AggregateData source;
  auto *source_state = source.mutable_at_least_once();
  source_state->set_at_least_once(true);
  source_state->set_last_day_index(101);

  AggregateData destination;

  procedure->MergeAggregateData(&destination, source);

  const auto &result = destination.at_least_once();
  EXPECT_TRUE(result.at_least_once());
  EXPECT_EQ(result.last_day_index(), 101);
}
// Merges a default-constructed source into a destination whose at-least-once data is already set
// (last_day_index 100). The destination is expected to be left with the same flag and day index.
TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataToSet) {
  auto procedure = GetProcedure(kOccurrenceMetricMetricId,
                                kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);

  AggregateData source;

  // Only the destination carries data.
  AggregateData destination;
  auto *destination_state = destination.mutable_at_least_once();
  destination_state->set_at_least_once(true);
  destination_state->set_last_day_index(100);

  procedure->MergeAggregateData(&destination, source);

  const auto &result = destination.at_least_once();
  EXPECT_TRUE(result.at_least_once());
  EXPECT_EQ(result.last_day_index(), 100);
}
// Logs occurrence events for two event vectors on one day for the 1-day report, then checks that:
//   * observation generation for that day yields a single integer observation carrying only the
//     first event vector,
//   * committing the observation removes the day's aggregate, and
//   * generation on the following day yields nothing.
TEST_F(SelectFirstAggregationProcedureTest, GenerateObservation1DayReport) {
  uint32_t metric_id = kOccurrenceMetricMetricId;
  int report_index = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;
  const uint64_t system_profile_hash = uint64_t{999};
  const uint32_t kNumEventCodes = 2;

  ReportAggregate report_aggregate;
  // Add events for 2 different event vectors: {1} and {2}.
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
                            &report_aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);

  // Check that |observation| includes the first event vector but not the second.
  // Fail (rather than crash) if no observation was attached to the entry.
  ASSERT_NE(observations[0].observation, nullptr);
  ASSERT_TRUE(observations[0].observation->has_integer());
  const IntegerObservation &integer_obs = observations[0].observation->integer();
  ASSERT_EQ(integer_obs.values_size(), 1u);
  ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);

  // Check that obsolete aggregates get cleaned up.
  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
  EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));

  // Check that calling observation generation the next day generates no observation.
  time_info.day_index++;
  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 0u);
}
// Same scenario as the 1-day report test, but against the expedited metric's 1-day report.
// Checks that:
//   * the first generation yields a single integer observation carrying only the first event
//     vector,
//   * committing it keeps the day's aggregate and records |kDayIndex| as its last_day_index,
//   * a second generation on the same day yields one entry whose observation is null,
//   * generation on the following day yields nothing, and
//   * committing on the following day removes the old aggregate.
TEST_F(SelectFirstAggregationProcedureTest, GenerateObservation1DayReportExpedited) {
  uint32_t metric_id = kExpeditedOccurrenceMetricMetricId;
  int report_index = kExpeditedOccurrenceMetricSelectFirst1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;
  const uint64_t system_profile_hash = uint64_t{999};
  const uint32_t kNumEventCodes = 2;

  ReportAggregate report_aggregate;
  // Add events for 2 different event vectors: {1} and {2}.
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
                            &report_aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);

  // Check that |observation| includes the first event vector but not the second.
  // An entry may carry a null observation (see the second generation below), so fail cleanly
  // here instead of dereferencing a null pointer.
  ASSERT_NE(observations[0].observation, nullptr);
  ASSERT_TRUE(observations[0].observation->has_integer());
  const IntegerObservation &integer_obs = observations[0].observation->integer();
  ASSERT_EQ(integer_obs.values_size(), 1u);
  ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);

  // Check that aggregates get marked as sent and don't get cleaned up while they are still needed
  // that day.
  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
  EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
  ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
            1u);
  const SystemProfileAggregate &system_profile_agg =
      report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
  EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
  EXPECT_EQ(system_profile_agg.by_event_code(0).data().at_least_once().last_day_index(), kDayIndex);

  // Check that calling observation generation again for the same day still returns an entry for
  // the system profile, but with no observation attached (null).
  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  EXPECT_EQ(observations[0].observation, nullptr);

  // Check that calling observation generation the next day generates no observation.
  time_info.day_index++;
  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 0u);

  // Check that obsolete aggregates get cleaned up the next day.
  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
  EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
}
// Logs occurrence events for two event vectors on each of the 7 days ending on |kDayIndex| for
// the 7-day report. For each of the 7 days starting at |kDayIndex| it then checks that:
//   * generation yields a single integer observation carrying only the first event vector,
//   * after committing, the day that just left the window is gone, every remaining day that had
//     events still holds one system profile aggregate with one event vector, and days without
//     events hold nothing.
// Finally it checks that generation 7 days after |kDayIndex| yields nothing.
TEST_F(SelectFirstAggregationProcedureTest, GenerateObservation7DaysReport) {
  uint32_t metric_id = kOccurrenceMetricMetricId;
  int report_index = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;
  const uint64_t system_profile_hash = uint64_t{999};
  const uint32_t kNumEventCodes = 2;

  ReportAggregate report_aggregate;
  // Add events for 2 different event vectors: {1} and {2}, for each of the 7 days in the window
  // ending on |kDayIndex|.
  for (uint32_t day = kDayIndex; day > kDayIndex - 7; --day) {
    AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, procedure.get(),
                              &report_aggregate);
  }

  // Check that the observation is generated for the next 7 days.
  for (int i = 0; i < 7; i++) {
    time_info.day_index = kDayIndex + i;
    lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
        procedure->GenerateObservations(time_info, &report_aggregate);
    ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
    std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
    ASSERT_EQ(observations.size(), 1u);
    EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);

    // Check that |observation| includes the first event vector but not the second.
    // Fail (rather than crash) if no observation was attached to the entry.
    ASSERT_NE(observations[0].observation, nullptr);
    ASSERT_TRUE(observations[0].observation->has_integer());
    const IntegerObservation &integer_obs = observations[0].observation->integer();
    ASSERT_EQ(integer_obs.values_size(), 1u);
    ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);

    // Check that obsolete aggregates get cleaned up.
    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
    // No data for days beyond the 7-day window.
    EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
    // Days in the 7-day window that had events continue to contain data.
    for (uint32_t day = time_info.day_index - 5; day <= kDayIndex; ++day) {
      ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(day));
      // Inspect the bucket for |day| itself, so that every retained day is verified and not
      // just |kDayIndex| repeatedly.
      const auto &day_aggregate = report_aggregate.daily().by_day_index().at(day);
      ASSERT_EQ(day_aggregate.system_profile_aggregates_size(), 1u);
      const SystemProfileAggregate &system_profile_agg = day_aggregate.system_profile_aggregates(0);
      EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
      EXPECT_EQ(system_profile_agg.by_event_code_size(), 1);
    }
    // Days that had no events contain no data.
    for (uint32_t day = kDayIndex + 1; day <= time_info.day_index; ++day) {
      EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(day));
    }
  }

  // After 7 days the observation is no longer generated.
  time_info.day_index = kDayIndex + 7;
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 0u);
}
// Same scenario as the 7-day report test, but against the expedited metric's 7-day report.
// For each of the 7 days starting at |kDayIndex| it checks that:
//   * the first generation yields a single integer observation carrying only the first event
//     vector,
//   * after committing, the day 7 days back is gone, every remaining day that had events still
//     holds one system profile aggregate with one event vector, and days without events hold
//     nothing,
//   * a second generation on the same day yields one entry whose observation is null.
// Finally it checks that generation 7 days after |kDayIndex| yields nothing.
TEST_F(SelectFirstAggregationProcedureTest, GenerateObservation7DaysReportExpedited) {
  uint32_t metric_id = kExpeditedOccurrenceMetricMetricId;
  int report_index = kExpeditedOccurrenceMetricSelectFirst7DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;
  const uint64_t system_profile_hash = uint64_t{999};
  const uint32_t kNumEventCodes = 2;

  ReportAggregate report_aggregate;
  // Add events for 2 different event vectors: {1} and {2}, for each of the 7 days in the window
  // ending on |kDayIndex|.
  for (uint32_t day = kDayIndex; day > kDayIndex - 7; --day) {
    AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, procedure.get(),
                              &report_aggregate);
  }

  // Check that the observation is generated for the next 7 days.
  for (int i = 0; i < 7; i++) {
    time_info.day_index = kDayIndex + i;
    lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
        procedure->GenerateObservations(time_info, &report_aggregate);
    ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
    std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
    ASSERT_EQ(observations.size(), 1u);
    EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);

    // Check that |observation| includes the first event vector but not the second.
    // An entry may carry a null observation (see the repeated generation below), so fail cleanly
    // here instead of dereferencing a null pointer.
    ASSERT_NE(observations[0].observation, nullptr);
    ASSERT_TRUE(observations[0].observation->has_integer());
    const IntegerObservation &integer_obs = observations[0].observation->integer();
    ASSERT_EQ(integer_obs.values_size(), 1u);
    ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);

    // Check that obsolete aggregates get cleaned up.
    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
    // No data for days beyond the 7-day window.
    EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
    // Days in the 7-day window that had events continue to contain data.
    for (uint32_t day = time_info.day_index - 6; day <= kDayIndex; ++day) {
      ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(day));
      // Inspect the bucket for |day| itself, so that every retained day is verified and not
      // just |kDayIndex| repeatedly.
      const auto &day_aggregate = report_aggregate.daily().by_day_index().at(day);
      ASSERT_EQ(day_aggregate.system_profile_aggregates_size(), 1u);
      const SystemProfileAggregate &system_profile_agg = day_aggregate.system_profile_aggregates(0);
      EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
      EXPECT_EQ(system_profile_agg.by_event_code_size(), 1);
    }
    // Days that had no events contain no data.
    for (uint32_t day = kDayIndex + 1; day <= time_info.day_index; ++day) {
      EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(day));
    }

    // Check that calling observation generation again for the same day still returns an entry
    // for the system profile, but with no observation attached (null).
    observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
    ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
    observations = observations_or.ConsumeValueOrDie();
    ASSERT_EQ(observations.size(), 1u);
    EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
    EXPECT_EQ(observations[0].observation, nullptr);
  }

  // After 7 days the observation is no longer generated.
  time_info.day_index = kDayIndex + 7;
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, &report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
  ASSERT_EQ(observations.size(), 0u);
}
} // namespace cobalt::local_aggregation