blob: 8bc485af6db56635e1f7a6712c334010f93e725b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include <gtest/gtest.h>
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
namespace cobalt::local_aggregation {
class AggregationProcedureTest : public testing::TestAggregationProcedure {};
TEST_F(AggregationProcedureTest, GetWorks) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
ASSERT_NE(count_aggregation_procedure, nullptr);
EXPECT_EQ(count_aggregation_procedure->DebugString(), "COUNT");
EXPECT_TRUE(count_aggregation_procedure->IsValidEventType(Event::TypeCase::kOccurrenceEvent));
count_aggregation_procedure = GetProcedureFor(kOccurrenceMetricMetricId,
kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
ASSERT_NE(count_aggregation_procedure, nullptr);
EXPECT_EQ(count_aggregation_procedure->DebugString(), "COUNT");
EXPECT_TRUE(count_aggregation_procedure->IsValidEventType(Event::TypeCase::kOccurrenceEvent));
auto sum_and_count_aggregation_procedure =
GetProcedureFor(kIntegerMetricMetricId, kIntegerMetricFleetwideMeansReportReportIndex);
ASSERT_NE(sum_and_count_aggregation_procedure, nullptr);
EXPECT_EQ(sum_and_count_aggregation_procedure->DebugString(), "SUM_AND_COUNT");
EXPECT_TRUE(
sum_and_count_aggregation_procedure->IsValidEventType(Event::TypeCase::kIntegerEvent));
auto at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
ASSERT_NE(at_least_once_aggregation_procedure, nullptr);
EXPECT_EQ(at_least_once_aggregation_procedure->DebugString(), "AT_LEAST_ONCE");
EXPECT_TRUE(
at_least_once_aggregation_procedure->IsValidEventType(Event::TypeCase::kOccurrenceEvent));
auto at_least_once_string_aggregation_procedure = GetProcedureFor(
kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
ASSERT_NE(at_least_once_string_aggregation_procedure, nullptr);
EXPECT_EQ(at_least_once_string_aggregation_procedure->DebugString(), "AT_LEAST_ONCE_STRING");
EXPECT_TRUE(
at_least_once_string_aggregation_procedure->IsValidEventType(Event::TypeCase::kStringEvent));
}
TEST_F(AggregationProcedureTest, UpdateHourlyAggregate) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash, *count_aggregation_procedure,
aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), i + 1);
}
}
TEST_F(AggregationProcedureTest, UpdateHourlyAggregateSelectLastSystemProfile) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
ReportAggregate aggregate;
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
*count_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
*count_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), 2 * (i + 1));
}
}
TEST_F(AggregationProcedureTest, UpdateHourlyAggregateSelectFirstSystemProfile) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
ReportAggregate aggregate;
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
*count_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
*count_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), 2 * (i + 1));
}
}
TEST_F(AggregationProcedureTest, UpdateHourlyAggregateReportAllSystemProfile) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportAllReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GT(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
ReportAggregate aggregate;
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
*count_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
*count_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 2u);
{
// Check aggregate 1 (first_system_profile_hash)
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), first_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), i + 1);
}
}
{
// Check aggregate 2 (second_system_profile_hash)
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(1);
EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), second_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), i + 1);
}
}
}
TEST_F(AggregationProcedureTest, UpdateDailyAggregate) {
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
const uint32_t kDayIndex = 10000;
uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash,
*at_least_once_aggregation_procedure, aggregate);
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_TRUE(system_profile_agg.by_event_code(i).data().at_least_once().at_least_once());
}
}
TEST_F(AggregationProcedureTest, UpdateDailyAggregateSelectLast) {
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
const uint32_t kDayIndex = 10000;
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForDay(num_events, kDayIndex, first_system_profile_hash,
*at_least_once_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(num_events, kDayIndex, second_system_profile_hash,
*at_least_once_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
EXPECT_TRUE(system_profile_agg.by_event_code(i).data().at_least_once().at_least_once());
}
}
TEST_F(AggregationProcedureTest, UpdateDailyAggregateSelectFirst) {
std::unique_ptr<AggregationProcedure> select_first_aggregation_procedure =
GetProcedureFor(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
const uint32_t kDayIndex = 10000;
ReportAggregate aggregate;
uint32_t kNumEventCodes = 2;
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
// Add events for 2 different event vectors: {1} and {2}.
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
*select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
*select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), 1u);
ASSERT_EQ(system_profile_agg.by_event_code(0).event_codes(0), 1);
EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
TEST_F(AggregationProcedureTest, UpdateDailyAggregateReportAll) {
std::unique_ptr<AggregationProcedure> select_first_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GT(event_vector_buffer_max, 0u);
const uint32_t kDayIndex = 10000;
ReportAggregate aggregate;
uint32_t kNumEventCodes = 2;
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
// Add events for 2 different event vectors: {1} and {2}.
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
*select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
*select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 2u);
{
// Check aggregate 1 (first_system_profile_hash)
const SystemProfileAggregate& system_profile_agg =
aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), first_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
ASSERT_EQ(system_profile_agg.by_event_code(0).event_codes(0), 1);
EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
{
// Check aggregate 2 (second_system_profile_hash)
const SystemProfileAggregate& system_profile_agg =
aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(1);
EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
EXPECT_EQ(system_profile_agg.first_seen_timestamp(), second_event_time);
EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
ASSERT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
ASSERT_EQ(system_profile_agg.by_event_code(0).event_codes(0), 1);
EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
}
TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectLast) {
const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
ReportDefinition report =
metric.reports(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportIndex);
uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
ASSERT_GE(event_vector_buffer_max, 0u);
const uint32_t kDayIndex = 10000;
ReportAggregate aggregate;
std::unique_ptr<AggregationProcedure> report_all_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportIndex);
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, first_system_profile_hash,
*report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
AggregationPeriodBucket& bucket =
aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
ASSERT_EQ(bucket.system_profile_aggregates(0).by_event_code_size(), event_vector_buffer_max);
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, second_system_profile_hash,
*report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
// Merge the second system profile aggregates into the first one.
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
bucket = aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
SystemProfileAggregate* merged_system_profile_aggregate =
bucket.mutable_system_profile_aggregates(0);
at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
*merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), second_system_profile_hash);
EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);
ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
EXPECT_TRUE(
merged_system_profile_aggregate->by_event_code(i).data().at_least_once().at_least_once());
}
}
TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectFirst) {
const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
ReportDefinition report = metric.reports(kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
ReportAggregate aggregate;
std::unique_ptr<AggregationProcedure> report_all_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportAllReportIndex);
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, first_system_profile_hash,
*report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
AggregationPeriodBucket& bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
ASSERT_EQ(bucket.system_profile_aggregates_size(), 1u);
ASSERT_EQ(bucket.system_profile_aggregates(0).by_event_code_size(), event_vector_buffer_max);
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, second_system_profile_hash,
*report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
ASSERT_EQ(bucket.system_profile_aggregates(1).by_event_code_size(), event_vector_buffer_max);
// Merge the second system profile aggregates into the first one.
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
SystemProfileAggregate* merged_system_profile_aggregate =
bucket.mutable_system_profile_aggregates(0);
at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
*merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), first_system_profile_hash);
EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);
ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
// Middle event code is double due to merge from both system profiles.
EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).data().count(), 2 * (i + 1));
}
}
TEST_F(AggregationProcedureTest, GenerateHourlyObservation) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId - 1, system_profile_hash,
*count_aggregation_procedure, aggregate);
AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash, *count_aggregation_procedure,
aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
count_aggregation_procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId),
aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
ASSERT_EQ(observations.size(), 1u);
EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
ASSERT_EQ(observations[0].observation->integer().values_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(observations[0].observation->integer().values(i).event_codes(0), i + 1);
EXPECT_EQ(observations[0].observation->integer().values(i).value(), i + 1);
}
}
TEST_F(AggregationProcedureTest, GenerateDailyObservation) {
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex);
uint32_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kFinalDayIndex = 10000;
uint32_t kFirstDayIndex = 10000 - 6;
uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, system_profile_hash,
*at_least_once_aggregation_procedure, aggregate);
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, system_profile_hash,
*at_least_once_aggregation_procedure, aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
at_least_once_aggregation_procedure->GenerateObservations(
util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
ASSERT_EQ(observations.size(), 1u);
EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
ASSERT_EQ(observations[0].observation->integer().values_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(observations[0].observation->integer().values(i).event_codes(0), i + 1);
EXPECT_EQ(observations[0].observation->integer().values(i).value(), 1);
}
}
TEST_F(AggregationProcedureTest, GenerateDailyObservationSelectLast) {
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex);
uint32_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kFinalDayIndex = 10000;
uint32_t kFirstDayIndex = 10000 - 6;
ReportAggregate aggregate;
uint64_t first_system_profile_hash = uint64_t{213};
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
*at_least_once_aggregation_procedure, aggregate);
uint64_t second_system_profile_hash = uint64_t{426};
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
*at_least_once_aggregation_procedure, aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
at_least_once_aggregation_procedure->GenerateObservations(
util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
ASSERT_EQ(observations.size(), 1u);
EXPECT_EQ(observations[0].system_profile_hash, second_system_profile_hash);
ASSERT_EQ(observations[0].observation->integer().values_size(), event_vector_buffer_max);
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(observations[0].observation->integer().values(i).event_codes(0), i + 1);
EXPECT_EQ(observations[0].observation->integer().values(i).value(), 1);
}
}
TEST_F(AggregationProcedureTest, GenerateDailyObservationSelectFirst) {
std::unique_ptr<AggregationProcedure> select_first_aggregation_procedure =
GetProcedureFor(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportIndex);
uint32_t event_vector_buffer_max = GetEventVectorBufferMax();
ASSERT_EQ(event_vector_buffer_max, 1u);
uint32_t kFinalDayIndex = 10000;
uint32_t kFirstDayIndex = 10000 - 6;
ReportAggregate aggregate;
uint64_t first_system_profile_hash = uint64_t{213};
uint32_t kFirstDayNumEvents = event_vector_buffer_max;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
*select_first_aggregation_procedure, aggregate);
uint64_t second_system_profile_hash = uint64_t{426};
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
*select_first_aggregation_procedure, aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
select_first_aggregation_procedure->GenerateObservations(
util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
ASSERT_EQ(observations.size(), 1u);
EXPECT_EQ(observations[0].system_profile_hash, first_system_profile_hash);
ASSERT_EQ(observations[0].observation->integer().values_size(), 1u);
EXPECT_EQ(observations[0].observation->integer().values(0).event_codes(0), 1);
EXPECT_EQ(observations[0].observation->integer().values(0).value(), 1);
}
} // namespace cobalt::local_aggregation