blob: 581e08bedd719c54dac8955bbf22821535ec948c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h"
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
namespace cobalt::local_aggregation {
class IntegerHistogramAggregationProcedureTest : public testing::TestAggregationProcedure {
public:
void LogIntegerEvents(uint32_t hour_id, uint32_t num_event_codes, uint64_t system_profile_hash,
AggregationProcedure* procedure, ReportAggregate* aggregate) {
std::unique_ptr<logger::EventRecord> record =
MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
IntegerEvent* event = record->event()->mutable_integer_event();
event->add_event_code(0);
for (int i = 0; i < num_event_codes; i++) {
event->set_event_code(0, i);
event->set_value(i);
procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
}
}
void LogIntegerHistogramEvents(uint32_t hour_id, uint32_t num_event_codes,
const std::map<uint32_t, uint64_t>& histogram,
uint64_t system_profile_hash, AggregationProcedure* procedure,
ReportAggregate* aggregate) {
std::unique_ptr<logger::EventRecord> record =
MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
IntegerHistogramEvent* event = record->event()->mutable_integer_histogram_event();
for (auto [index, count] : histogram) {
HistogramBucket* bucket = event->add_buckets();
bucket->set_index(index);
bucket->set_count(count);
}
event->add_event_code(0);
for (int i = 0; i < num_event_codes; i++) {
event->set_event_code(0, i);
procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
}
}
std::unique_ptr<IntegerHistogramAggregationProcedure> GetProcedure(uint32_t metric_id,
int report_index) {
return std::make_unique<IntegerHistogramAggregationProcedure>(
GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
}
};
TEST_F(IntegerHistogramAggregationProcedureTest, UpdateAggregateWorksInteger) {
uint32_t metric_id = kIntegerMetricMetricId;
int report_index = kIntegerMetricFleetwideHistogramsReportIndex;
std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 100;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{1867};
LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
EXPECT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
}
TEST_F(IntegerHistogramAggregationProcedureTest, UpdateAggregateWorksIntegerHistogram) {
uint32_t metric_id = kIntegerHistogramMetricMetricId;
int report_index = kIntegerHistogramMetricFleetwideHistogramsReportIndex;
std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 100;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{1867};
LogIntegerHistogramEvents(kHourId, kNumEventCodes, {{1, 10}, {2, 100}, {3, 50}},
system_profile_hash, procedure.get(), &aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
const SystemProfileAggregate& system_profile_agg =
aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
EXPECT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
}
TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataBothSet) {
AggregateData data;
data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10});
data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20});
AggregateData merged_data;
merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30});
merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40});
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
procedure->MergeAggregateData(&merged_data, data);
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 3);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 50);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40);
}
TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) {
AggregateData data;
AggregateData merged_data;
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
procedure->MergeAggregateData(&merged_data, data);
EXPECT_FALSE(merged_data.has_integer_histogram());
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 0);
}
TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataFromSet) {
AggregateData data;
data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10});
data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20});
AggregateData merged_data;
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
procedure->MergeAggregateData(&merged_data, data);
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 20);
}
TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataToSet) {
AggregateData data;
AggregateData merged_data;
merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30});
merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40});
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
procedure->MergeAggregateData(&merged_data, data);
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 30);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2));
EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40);
}
TEST_F(IntegerHistogramAggregationProcedureTest, GenerateObservationWorksInteger) {
uint32_t metric_id = kIntegerMetricMetricId;
int report_index = kIntegerMetricFleetwideHistogramsReportIndex;
std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 10;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kEndHourId = 11;
const uint64_t system_profile_hash = uint64_t{1867};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
// Should only generate for kEndHourId
ASSERT_EQ(observations.size(), 1u);
EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
EXPECT_EQ(observations[0].observation->index_histogram().index_histograms_size(), kNumEventCodes);
for (const IndexHistogram& value :
observations[0].observation->index_histogram().index_histograms()) {
EXPECT_EQ(value.bucket_indices(0), value.event_codes(0));
EXPECT_EQ(value.bucket_counts(0), 1);
}
// Check that obsolete aggregates get cleaned up.
procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
TEST_F(IntegerHistogramAggregationProcedureTest, GenerateObservationWorksIntegerHistogram) {
uint32_t metric_id = kIntegerHistogramMetricMetricId;
int report_index = kIntegerHistogramMetricFleetwideHistogramsReportIndex;
std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 10;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kEndHourId = 11;
const uint64_t system_profile_hash = uint64_t{1867};
const std::map<uint32_t, uint64_t> kLoggedHistogram = {{1, 10}, {2, 100}, {3, 50}};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
LogIntegerHistogramEvents(hour_id, kNumEventCodes, kLoggedHistogram, system_profile_hash,
procedure.get(), &aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
// Should only generate for kEndHourId
ASSERT_EQ(observations.size(), 1u);
EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
EXPECT_EQ(observations[0].observation->index_histogram().index_histograms_size(), kNumEventCodes);
for (const IndexHistogram& value :
observations[0].observation->index_histogram().index_histograms()) {
for (int i = 0; i < value.bucket_indices_size(); i++) {
uint64_t expected_value = kLoggedHistogram.at(value.bucket_indices(i));
EXPECT_EQ(expected_value, value.bucket_counts(i));
}
}
// Check that obsolete aggregates get cleaned up.
procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
} // namespace cobalt::local_aggregation