// blob: e675c1ab8d8694d4e5ba1e5fff7a1e1533ad220b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include <gtest/gtest.h>
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
namespace cobalt::local_aggregation {
// Fixture used by every TEST_F in this file. It adds no members of its own; it only gives the
// shared testing::TestAggregationProcedure fixture a name local to this test suite.
class AggregationProcedureTest : public testing::TestAggregationProcedure {};
// Checks that GetProcedureFor() returns a non-null procedure for four different reports, and that
// each procedure has the expected DebugString() and accepts the expected event type.
TEST_F(AggregationProcedureTest, GetWorks) {
  // Occurrence metric, fleetwide occurrence counts report.
  std::unique_ptr<AggregationProcedure> count_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
  ASSERT_NE(count_procedure, nullptr);
  EXPECT_EQ(count_procedure->DebugString(), "COUNT");
  EXPECT_TRUE(count_procedure->IsValidEventType(Event::TypeCase::kOccurrenceEvent));

  // Occurrence metric, hourly device histograms report. The same handle is reused.
  count_procedure =
      GetProcedureFor(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
  ASSERT_NE(count_procedure, nullptr);
  EXPECT_EQ(count_procedure->DebugString(), "COUNT");
  EXPECT_TRUE(count_procedure->IsValidEventType(Event::TypeCase::kOccurrenceEvent));

  // Integer metric, fleetwide means report.
  std::unique_ptr<AggregationProcedure> sum_and_count_procedure =
      GetProcedureFor(kIntegerMetricMetricId, kIntegerMetricFleetwideMeansReportReportIndex);
  ASSERT_NE(sum_and_count_procedure, nullptr);
  EXPECT_EQ(sum_and_count_procedure->DebugString(), "SUM_AND_COUNT");
  EXPECT_TRUE(sum_and_count_procedure->IsValidEventType(Event::TypeCase::kIntegerEvent));

  // Occurrence metric, 1-day unique device counts report.
  std::unique_ptr<AggregationProcedure> at_least_once_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
  ASSERT_NE(at_least_once_procedure, nullptr);
  EXPECT_EQ(at_least_once_procedure->DebugString(), "AT_LEAST_ONCE");
  EXPECT_TRUE(at_least_once_procedure->IsValidEventType(Event::TypeCase::kOccurrenceEvent));
}
// Adds one more occurrence event vector than the buffer limit to a single hour and checks that the
// hourly aggregate holds exactly `event_vector_buffer_max` entries under one system profile.
TEST_F(AggregationProcedureTest, UpdateHourlyAggregate) {
  std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
  // The raw pointer is handed to the helper below, so fail cleanly if no procedure was created.
  ASSERT_NE(count_aggregation_procedure, nullptr);

  const uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive: with a limit of 0 every size check and loop below would pass
  // vacuously. (A ">= 0" check on an unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kHourId = 10000;
  const uint64_t system_profile_hash = uint64_t{213};
  ReportAggregate aggregate;

  // One more than the limit, so the overflow handling is exercised.
  const uint64_t num_events = event_vector_buffer_max + 1;
  AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate);

  // Exactly one bucket for the hour, holding exactly one system profile aggregate.
  ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
  const auto& hour_bucket = aggregate.hourly().by_hour_id().at(kHourId);
  ASSERT_EQ(hour_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& system_profile_agg = hour_bucket.system_profile_aggregates(0);
  EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);

  // Only the first `event_vector_buffer_max` event vectors are expected to be kept; entry i is
  // expected to carry event code i + 1 with a count of i + 1.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
    EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), i + 1);
  }
}
// Logs the same set of events twice in one hour under two different system profile hashes and
// checks that the fleetwide occurrence counts report ends up with a single aggregate that carries
// the *second* hash, spans both timestamps, and has doubled counts.
TEST_F(AggregationProcedureTest, UpdateHourlyAggregateSelectLastSystemProfile) {
  std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
  // The raw pointer is handed to the helpers below, so fail cleanly if no procedure was created.
  ASSERT_NE(count_aggregation_procedure, nullptr);

  const uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive, otherwise the checks below pass vacuously. (A ">= 0" check on an
  // unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kHourId = 10000;
  ReportAggregate aggregate;
  // One more than the limit, so the overflow handling is exercised.
  const uint64_t num_events = event_vector_buffer_max + 1;

  // First batch, under the first system profile.
  const int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
  const uint64_t first_system_profile_hash = uint64_t{213};
  AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(first_event_time));

  // Second batch for the same hour, 1000 seconds later, under a different system profile.
  const int64_t second_event_time = first_event_time + 1000;
  const uint64_t second_system_profile_hash = uint64_t{324};
  AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(second_event_time));

  // Both batches must have been folded into one system profile aggregate.
  ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
  const auto& hour_bucket = aggregate.hourly().by_hour_id().at(kHourId);
  ASSERT_EQ(hour_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& system_profile_agg = hour_bucket.system_profile_aggregates(0);
  // The most recently seen system profile wins for this report.
  EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
  EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
  EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);

  // Each retained event code was logged by both batches, hence the factor of two.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
    EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), 2 * (i + 1));
  }
}
// Logs the same set of events twice in one hour under two different system profile hashes and
// checks that the hourly device histograms report ends up with a single aggregate that carries
// the *first* hash, spans both timestamps, and has doubled counts.
TEST_F(AggregationProcedureTest, UpdateHourlyAggregateSelectFirstSystemProfile) {
  std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
  // The raw pointer is handed to the helpers below, so fail cleanly if no procedure was created.
  ASSERT_NE(count_aggregation_procedure, nullptr);

  const uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive, otherwise the checks below pass vacuously. (A ">= 0" check on an
  // unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kHourId = 10000;
  ReportAggregate aggregate;
  // One more than the limit, so the overflow handling is exercised.
  const uint64_t num_events = event_vector_buffer_max + 1;

  // First batch, under the first system profile.
  const int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
  const uint64_t first_system_profile_hash = uint64_t{213};
  AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(first_event_time));

  // Second batch for the same hour, 1000 seconds later, under a different system profile.
  const int64_t second_event_time = first_event_time + 1000;
  const uint64_t second_system_profile_hash = uint64_t{324};
  AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(second_event_time));

  // Both batches must have been folded into one system profile aggregate.
  ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
  const auto& hour_bucket = aggregate.hourly().by_hour_id().at(kHourId);
  ASSERT_EQ(hour_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& system_profile_agg = hour_bucket.system_profile_aggregates(0);
  // The first seen system profile is retained for this report.
  EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
  EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
  EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);

  // Each retained event code was logged by both batches, hence the factor of two.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
    EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), 2 * (i + 1));
  }
}
// Adds one more occurrence event vector than the buffer limit to a single day and checks that the
// daily aggregate holds exactly `event_vector_buffer_max` at-least-once entries.
TEST_F(AggregationProcedureTest, UpdateDailyAggregate) {
  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
  // The raw pointer is handed to the helper below, so fail cleanly if no procedure was created.
  ASSERT_NE(at_least_once_aggregation_procedure, nullptr);

  const uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive, otherwise the checks below pass vacuously. (A ">= 0" check on an
  // unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kDayIndex = 10000;
  const uint64_t system_profile_hash = uint64_t{213};
  ReportAggregate aggregate;

  // One more than the limit, so the overflow handling is exercised.
  const uint64_t num_events = event_vector_buffer_max + 1;
  AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate);

  // One bucket for the day, holding exactly one system profile aggregate.
  ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
  const auto& day_bucket = aggregate.daily().by_day_index().at(kDayIndex);
  ASSERT_EQ(day_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& system_profile_agg = day_bucket.system_profile_aggregates(0);
  EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);

  // Entry i is expected to carry event code i + 1 with its at-least-once flag set.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
    EXPECT_TRUE(system_profile_agg.by_event_code(i).data().at_least_once().at_least_once());
  }
}
// Logs the same set of events twice in one day under two different system profile hashes and
// checks that the 1-day unique device counts report keeps a single aggregate carrying the
// *second* hash and spanning both timestamps.
TEST_F(AggregationProcedureTest, UpdateDailyAggregateSelectLast) {
  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
  // The raw pointer is handed to the helpers below, so fail cleanly if no procedure was created.
  ASSERT_NE(at_least_once_aggregation_procedure, nullptr);

  const uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive, otherwise the checks below pass vacuously. (A ">= 0" check on an
  // unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kDayIndex = 10000;
  ReportAggregate aggregate;
  // One more than the limit, so the overflow handling is exercised.
  const uint64_t num_events = event_vector_buffer_max + 1;

  // First batch: 4800 seconds into the day, under the first system profile.
  const int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
  const uint64_t first_system_profile_hash = uint64_t{213};
  AddOccurrenceEventsForDay(num_events, kDayIndex, first_system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate,
                            util::FromUnixSeconds(first_event_time));

  // Second batch: 7200 seconds later, under a different system profile.
  const int64_t second_event_time = first_event_time + 7200;
  const uint64_t second_system_profile_hash = uint64_t{324};
  AddOccurrenceEventsForDay(num_events, kDayIndex, second_system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate,
                            util::FromUnixSeconds(second_event_time));

  // Both batches must have been folded into one system profile aggregate.
  ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
  const auto& day_bucket = aggregate.daily().by_day_index().at(kDayIndex);
  ASSERT_EQ(day_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& system_profile_agg = day_bucket.system_profile_aggregates(0);
  // The most recently seen system profile wins for this report.
  EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
  EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
  EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);

  // Entry i is expected to carry event code i + 1 with its at-least-once flag set.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
    EXPECT_TRUE(system_profile_agg.by_event_code(i).data().at_least_once().at_least_once());
  }
}
// Logs two event vectors twice in one day under two different system profile hashes, using the
// select-first 1-day unique device counts report. The resulting aggregate is expected to keep the
// *first* hash, span both timestamps, and contain only event code 1.
TEST_F(AggregationProcedureTest, UpdateDailyAggregateSelectFirst) {
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(
      kOccurrenceMetricMetricId,
      kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);

  // The buffer limit is only sanity-checked here; nothing else in this test reads it.
  uint64_t buffer_limit = GetEventVectorBufferMax();
  ASSERT_GE(buffer_limit, 0u);

  const uint32_t kDayIndex = 10000;
  uint32_t kNumEventCodes = 2;
  ReportAggregate aggregate;

  // Add events for 2 different event vectors: {1} and {2}.
  int64_t first_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
  uint64_t first_hash = uint64_t{213};
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_hash, procedure.get(), &aggregate,
                            util::FromUnixSeconds(first_time));

  // Same event vectors again, 7200 seconds later, under a different system profile.
  int64_t second_time = first_time + 7200;
  uint64_t second_hash = uint64_t{324};
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_hash, procedure.get(), &aggregate,
                            util::FromUnixSeconds(second_time));

  // A single bucket for the day with a single system profile aggregate.
  ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
  const auto& day_bucket = aggregate.daily().by_day_index().at(kDayIndex);
  ASSERT_EQ(day_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& profile_agg = day_bucket.system_profile_aggregates(0);
  EXPECT_EQ(profile_agg.system_profile_hash(), first_hash);
  EXPECT_EQ(profile_agg.first_seen_timestamp(), first_time);
  EXPECT_EQ(profile_agg.last_seen_timestamp(), second_time);

  // Only the first event vector, {1}, is expected to be present.
  ASSERT_EQ(profile_agg.by_event_code_size(), 1u);
  const auto& only_entry = profile_agg.by_event_code(0);
  ASSERT_EQ(only_entry.event_codes(0), 1);
  EXPECT_TRUE(only_entry.data().at_least_once().at_least_once());
}
// Builds two system profile aggregates by hand in one daily bucket, merges the second into the
// first with MergeSystemProfileAggregates(), and checks that the merged aggregate carries the
// *second* hash, spans both timestamps, and is capped at `event_vector_buffer_max` event codes.
TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectLast) {
  const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
  // Bound by reference: only one field of the report definition is read.
  const ReportDefinition& report =
      metric.reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
  const uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
  // Must be strictly positive, otherwise the size check and loop at the end pass vacuously.
  // (A ">= 0" check on an unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kDayIndex = 10000;
  ReportAggregate aggregate;

  // Manually create some events for the report as they would be with a REPORT_ALL policy.
  // TODO(fxbug.dev/87403): use AddOccurrenceEventsForDay once REPORT_ALL is supported.
  const int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
  const uint64_t first_system_profile_hash = uint64_t{213};
  AggregationPeriodBucket& bucket = (*aggregate.mutable_daily()->mutable_by_day_index())[kDayIndex];
  SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
  system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
  system_profile_aggregate->set_first_seen_timestamp(first_event_time);
  system_profile_aggregate->set_last_seen_timestamp(first_event_time);
  // First aggregate: event codes 1 .. event_vector_buffer_max / 2.
  for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
    data->add_event_codes(i);
    data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
  }

  const int64_t second_event_time = first_event_time + 7200;
  const uint64_t second_system_profile_hash = uint64_t{324};
  system_profile_aggregate = bucket.add_system_profile_aggregates();
  system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
  system_profile_aggregate->set_first_seen_timestamp(second_event_time);
  system_profile_aggregate->set_last_seen_timestamp(second_event_time);
  // Add more event_codes than can be supported in a single aggregate. The range starts at
  // event_vector_buffer_max / 2, so it overlaps the first aggregate on that one code.
  for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
    data->add_event_codes(i);
    data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
  }

  // Merge the second system profile aggregates into the first one.
  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
  ASSERT_NE(at_least_once_aggregation_procedure, nullptr);
  ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
  // `bucket` already refers to the map entry for kDayIndex. Re-assigning it from the map would
  // copy-assign the entry onto itself rather than rebind the reference, so it is used directly.
  ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
  SystemProfileAggregate* merged_system_profile_aggregate =
      bucket.mutable_system_profile_aggregates(0);
  at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
      merged_system_profile_aggregate, bucket.system_profile_aggregates(1));

  // The later system profile wins for this report; the timestamps span both aggregates.
  EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), second_system_profile_hash);
  EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
  EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);

  // The merged aggregate is capped at the buffer limit and holds codes 1 .. limit in order.
  ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
    EXPECT_TRUE(
        merged_system_profile_aggregate->by_event_code(i).data().at_least_once().at_least_once());
  }
}
// Builds two system profile aggregates by hand in one hourly bucket, merges the second into the
// first with MergeSystemProfileAggregates(), and checks that the merged aggregate keeps the
// *first* hash, spans both timestamps, is capped at `event_vector_buffer_max` event codes, and
// sums the counts of the one event code present in both inputs.
TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectFirst) {
  const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
  // Bound by reference: only one field of the report definition is read.
  const ReportDefinition& report =
      metric.reports(kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
  const uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
  // Must be strictly positive, otherwise the size check and loop at the end pass vacuously.
  // (A ">= 0" check on an unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kHourId = 10000;
  ReportAggregate aggregate;

  // Manually create some events for the report as they would be with a REPORT_ALL policy.
  // TODO(fxbug.dev/87403): use AddOccurrenceEventsForHour once REPORT_ALL is supported.
  const int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
  const uint64_t first_system_profile_hash = uint64_t{213};
  AggregationPeriodBucket& bucket = (*aggregate.mutable_hourly()->mutable_by_hour_id())[kHourId];
  SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
  system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
  system_profile_aggregate->set_first_seen_timestamp(first_event_time);
  system_profile_aggregate->set_last_seen_timestamp(first_event_time);
  // First aggregate: event codes 1 .. event_vector_buffer_max / 2, each with count == code.
  for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
    data->add_event_codes(i);
    data->mutable_data()->set_count(i);
  }

  const int64_t second_event_time = first_event_time + 1000;
  const uint64_t second_system_profile_hash = uint64_t{324};
  system_profile_aggregate = bucket.add_system_profile_aggregates();
  system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
  system_profile_aggregate->set_first_seen_timestamp(second_event_time);
  system_profile_aggregate->set_last_seen_timestamp(second_event_time);
  // Add more event_codes than can be supported in a single aggregate. The range starts at
  // event_vector_buffer_max / 2, so it overlaps the first aggregate on that one code.
  for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
    data->add_event_codes(i);
    data->mutable_data()->set_count(i);
  }

  // Merge the second system profile aggregates into the first one.
  std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
  ASSERT_NE(count_aggregation_procedure, nullptr);
  ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
  // `bucket` already refers to the map entry for kHourId. Re-assigning it from the map would
  // copy-assign the entry onto itself rather than rebind the reference, so it is used directly.
  ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
  SystemProfileAggregate* merged_system_profile_aggregate =
      bucket.mutable_system_profile_aggregates(0);
  count_aggregation_procedure->MergeSystemProfileAggregates(merged_system_profile_aggregate,
                                                            bucket.system_profile_aggregates(1));

  // The earlier system profile is kept for this report; the timestamps span both aggregates.
  EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), first_system_profile_hash);
  EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
  EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);

  ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    const uint32_t event_code = i + 1;
    EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), event_code);
    // Middle event code is double due to merge from both system profiles.
    EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).data().count(),
              event_code == event_vector_buffer_max / 2 ? 2 * event_code : event_code);
  }
}
// Fills two consecutive hours with one more event vector than the buffer limit, generates
// observations for the later hour, and checks that a single observation with
// `event_vector_buffer_max` integer values comes back.
TEST_F(AggregationProcedureTest, GenerateHourlyObservation) {
  std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
  // The procedure is dereferenced below, so fail cleanly if none was created.
  ASSERT_NE(count_aggregation_procedure, nullptr);

  const uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive, otherwise the checks below pass vacuously. (A ">= 0" check on an
  // unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kHourId = 10000;
  const uint64_t system_profile_hash = uint64_t{213};
  ReportAggregate aggregate;

  // One more than the limit, logged for both the previous hour and the hour under test.
  const uint64_t num_events = event_vector_buffer_max + 1;
  AddOccurrenceEventsForHour(num_events, kHourId - 1, system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate);
  AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash,
                             count_aggregation_procedure.get(), &aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      count_aggregation_procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId),
                                                        &aggregate);
  // The status is checked first so that ConsumeValueOrDie() is only reached on success.
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();

  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);

  // Value i is expected to carry event code i + 1 with value i + 1, i.e. the counts of a single
  // hour rather than the sum over both hours.
  ASSERT_EQ(observations[0].observation->integer().values_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(observations[0].observation->integer().values(i).event_codes(0), i + 1);
    EXPECT_EQ(observations[0].observation->integer().values(i).value(), i + 1);
  }
}
// Logs events on two days at the start of a window ending on kFinalDayIndex, generates
// observations for the final day using the 7-day unique device counts report, and checks that a
// single observation capped at `event_vector_buffer_max` values comes back.
TEST_F(AggregationProcedureTest, GenerateDailyObservation) {
  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex);
  // The procedure is dereferenced below, so fail cleanly if none was created.
  ASSERT_NE(at_least_once_aggregation_procedure, nullptr);

  const uint32_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive: `event_vector_buffer_max - 1` below would wrap around for 0.
  // (A ">= 0" check on an unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kFinalDayIndex = 10000;
  // Six days before the final day.
  const uint32_t kFirstDayIndex = 10000 - 6;
  const uint64_t system_profile_hash = uint64_t{213};
  ReportAggregate aggregate;

  // First day stays just under the limit; the next day goes one over it.
  const uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
  AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate);
  const uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
  AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      at_least_once_aggregation_procedure->GenerateObservations(
          util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
  // The status is checked first so that ConsumeValueOrDie() is only reached on success.
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();

  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);

  // Value i is expected to carry event code i + 1 with value 1.
  ASSERT_EQ(observations[0].observation->integer().values_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(observations[0].observation->integer().values(i).event_codes(0), i + 1);
    EXPECT_EQ(observations[0].observation->integer().values(i).value(), 1);
  }
}
// Same setup as GenerateDailyObservation, but each of the two days is logged under a different
// system profile hash. The single generated observation is expected to carry the *second* hash.
TEST_F(AggregationProcedureTest, GenerateDailyObservationSelectLast) {
  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex);
  // The procedure is dereferenced below, so fail cleanly if none was created.
  ASSERT_NE(at_least_once_aggregation_procedure, nullptr);

  const uint32_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive: `event_vector_buffer_max - 1` below would wrap around for 0.
  // (A ">= 0" check on an unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kFinalDayIndex = 10000;
  // Six days before the final day.
  const uint32_t kFirstDayIndex = 10000 - 6;
  ReportAggregate aggregate;

  // First day: just under the limit, first system profile.
  const uint64_t first_system_profile_hash = uint64_t{213};
  const uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
  AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate);

  // Next day: one over the limit, second system profile.
  const uint64_t second_system_profile_hash = uint64_t{426};
  const uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
  AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
                            at_least_once_aggregation_procedure.get(), &aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      at_least_once_aggregation_procedure->GenerateObservations(
          util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
  // The status is checked first so that ConsumeValueOrDie() is only reached on success.
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();

  ASSERT_EQ(observations.size(), 1u);
  // The most recently seen system profile wins for this report.
  EXPECT_EQ(observations[0].system_profile_hash, second_system_profile_hash);

  // Value i is expected to carry event code i + 1 with value 1.
  ASSERT_EQ(observations[0].observation->integer().values_size(), event_vector_buffer_max);
  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
    EXPECT_EQ(observations[0].observation->integer().values(i).event_codes(0), i + 1);
    EXPECT_EQ(observations[0].observation->integer().values(i).value(), 1);
  }
}
// Logs events on two days under two different system profile hashes using the select-first
// 7-day unique device counts report. The single generated observation is expected to carry the
// *first* hash and exactly one value, for event code 1.
TEST_F(AggregationProcedureTest, GenerateDailyObservationSelectFirst) {
  std::unique_ptr<AggregationProcedure> select_first_aggregation_procedure =
      GetProcedureFor(kOccurrenceMetricMetricId,
                      kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportIndex);
  // The procedure is dereferenced below, so fail cleanly if none was created.
  ASSERT_NE(select_first_aggregation_procedure, nullptr);

  const uint32_t event_vector_buffer_max = GetEventVectorBufferMax();
  // Must be strictly positive: `event_vector_buffer_max - 1` below would wrap around for 0.
  // (A ">= 0" check on an unsigned value can never fail.)
  ASSERT_GT(event_vector_buffer_max, 0u);

  const uint32_t kFinalDayIndex = 10000;
  // Six days before the final day.
  const uint32_t kFirstDayIndex = 10000 - 6;
  ReportAggregate aggregate;

  // First day: just under the limit, first system profile.
  const uint64_t first_system_profile_hash = uint64_t{213};
  const uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
  AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
                            select_first_aggregation_procedure.get(), &aggregate);

  // Next day: one over the limit, second system profile.
  const uint64_t second_system_profile_hash = uint64_t{426};
  const uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
  AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
                            select_first_aggregation_procedure.get(), &aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      select_first_aggregation_procedure->GenerateObservations(
          util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
  // The status is checked first so that ConsumeValueOrDie() is only reached on success.
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();

  ASSERT_EQ(observations.size(), 1u);
  // The first seen system profile is retained for this report.
  EXPECT_EQ(observations[0].system_profile_hash, first_system_profile_hash);

  // Only the first event vector is expected to be reported.
  ASSERT_EQ(observations[0].observation->integer().values_size(), 1u);
  EXPECT_EQ(observations[0].observation->integer().values(0).event_codes(0), 1);
  EXPECT_EQ(observations[0].observation->integer().values(0).value(), 1);
}
} // namespace cobalt::local_aggregation