| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h" |
| |
| #include <gtest/gtest.h> |
| |
| #include "src/lib/util/datetime_util.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/testing/test_registry.cb.h" |
| #include "src/logger/project_context_factory.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| class IntegerHistogramAggregationProcedureTest : public testing::TestAggregationProcedure { |
| public: |
| void LogIntegerEvents(uint32_t hour_id, uint32_t num_event_codes, uint64_t system_profile_hash, |
| AggregationProcedure* procedure, ReportAggregate* aggregate) { |
| std::unique_ptr<logger::EventRecord> record = |
| MakeEventRecord(util::TimeInfo::FromHourId(hour_id)); |
| IntegerEvent* event = record->event()->mutable_integer_event(); |
| event->add_event_code(0); |
| for (int i = 0; i < num_event_codes; i++) { |
| event->set_event_code(0, i); |
| event->set_value(i); |
| procedure->UpdateAggregate(*record, aggregate, system_profile_hash, |
| util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id))); |
| } |
| } |
| |
| void LogIntegerHistogramEvents(uint32_t hour_id, uint32_t num_event_codes, |
| const std::map<uint32_t, uint64_t>& histogram, |
| uint64_t system_profile_hash, AggregationProcedure* procedure, |
| ReportAggregate* aggregate) { |
| std::unique_ptr<logger::EventRecord> record = |
| MakeEventRecord(util::TimeInfo::FromHourId(hour_id)); |
| IntegerHistogramEvent* event = record->event()->mutable_integer_histogram_event(); |
| for (auto [index, count] : histogram) { |
| HistogramBucket* bucket = event->add_buckets(); |
| bucket->set_index(index); |
| bucket->set_count(count); |
| } |
| event->add_event_code(0); |
| for (int i = 0; i < num_event_codes; i++) { |
| event->set_event_code(0, i); |
| procedure->UpdateAggregate(*record, aggregate, system_profile_hash, |
| util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id))); |
| } |
| } |
| |
| std::unique_ptr<IntegerHistogramAggregationProcedure> GetProcedure(uint32_t metric_id, |
| int report_index) { |
| return std::make_unique<IntegerHistogramAggregationProcedure>( |
| GetMetricDef(metric_id), GetReportDef(metric_id, report_index)); |
| } |
| }; |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, UpdateAggregateWorksInteger) { |
| uint32_t metric_id = kIntegerMetricMetricId; |
| int report_index = kIntegerMetricFleetwideHistogramsReportIndex; |
| std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index); |
| |
| ReportAggregate aggregate; |
| const uint32_t kNumEventCodes = 100; |
| ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes); |
| |
| const uint32_t kHourId = 1; |
| const uint64_t system_profile_hash = uint64_t{1867}; |
| LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate); |
| |
| ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1); |
| ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u); |
| const SystemProfileAggregate& system_profile_agg = |
| aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0); |
| EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash); |
| EXPECT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, UpdateAggregateWorksIntegerHistogram) { |
| uint32_t metric_id = kIntegerHistogramMetricMetricId; |
| int report_index = kIntegerHistogramMetricFleetwideHistogramsReportIndex; |
| std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index); |
| |
| ReportAggregate aggregate; |
| const uint32_t kNumEventCodes = 100; |
| ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes); |
| |
| const uint32_t kHourId = 1; |
| const uint64_t system_profile_hash = uint64_t{1867}; |
| LogIntegerHistogramEvents(kHourId, kNumEventCodes, {{1, 10}, {2, 100}, {3, 50}}, |
| system_profile_hash, procedure.get(), &aggregate); |
| |
| ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1); |
| ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u); |
| const SystemProfileAggregate& system_profile_agg = |
| aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0); |
| EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash); |
| EXPECT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataBothSet) { |
| AggregateData data; |
| data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10}); |
| data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20}); |
| AggregateData merged_data; |
| merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30}); |
| merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40}); |
| |
| std::unique_ptr<IntegerHistogramAggregationProcedure> procedure = |
| GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex); |
| procedure->MergeAggregateData(&merged_data, data); |
| |
| EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 3); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 50); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) { |
| AggregateData data; |
| AggregateData merged_data; |
| |
| std::unique_ptr<IntegerHistogramAggregationProcedure> procedure = |
| GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex); |
| procedure->MergeAggregateData(&merged_data, data); |
| |
| EXPECT_FALSE(merged_data.has_integer_histogram()); |
| EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 0); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataFromSet) { |
| AggregateData data; |
| data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10}); |
| data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20}); |
| AggregateData merged_data; |
| |
| std::unique_ptr<IntegerHistogramAggregationProcedure> procedure = |
| GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex); |
| procedure->MergeAggregateData(&merged_data, data); |
| |
| EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 20); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataToSet) { |
| AggregateData data; |
| AggregateData merged_data; |
| merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30}); |
| merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40}); |
| |
| std::unique_ptr<IntegerHistogramAggregationProcedure> procedure = |
| GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex); |
| procedure->MergeAggregateData(&merged_data, data); |
| |
| EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 30); |
| ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2)); |
| EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, GenerateObservationWorksInteger) { |
| uint32_t metric_id = kIntegerMetricMetricId; |
| int report_index = kIntegerMetricFleetwideHistogramsReportIndex; |
| std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index); |
| |
| ReportAggregate aggregate; |
| const uint32_t kNumEventCodes = 10; |
| ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes); |
| |
| const uint32_t kEndHourId = 11; |
| const uint64_t system_profile_hash = uint64_t{1867}; |
| for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) { |
| LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate); |
| } |
| |
| lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or = |
| procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate); |
| ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK); |
| std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie(); |
| |
| // Should only generate for kEndHourId |
| ASSERT_EQ(observations.size(), 1u); |
| EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash); |
| |
| EXPECT_EQ(observations[0].observation->index_histogram().index_histograms_size(), kNumEventCodes); |
| |
| for (const IndexHistogram& value : |
| observations[0].observation->index_histogram().index_histograms()) { |
| EXPECT_EQ(value.bucket_indices(0), value.event_codes(0)); |
| EXPECT_EQ(value.bucket_counts(0), 1); |
| } |
| // Check that obsolete aggregates get cleaned up. |
| procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId), |
| system_profile_hash); |
| EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0); |
| } |
| |
| TEST_F(IntegerHistogramAggregationProcedureTest, GenerateObservationWorksIntegerHistogram) { |
| uint32_t metric_id = kIntegerHistogramMetricMetricId; |
| int report_index = kIntegerHistogramMetricFleetwideHistogramsReportIndex; |
| std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index); |
| |
| ReportAggregate aggregate; |
| const uint32_t kNumEventCodes = 10; |
| ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes); |
| |
| const uint32_t kEndHourId = 11; |
| const uint64_t system_profile_hash = uint64_t{1867}; |
| const std::map<uint32_t, uint64_t> kLoggedHistogram = {{1, 10}, {2, 100}, {3, 50}}; |
| for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) { |
| LogIntegerHistogramEvents(hour_id, kNumEventCodes, kLoggedHistogram, system_profile_hash, |
| procedure.get(), &aggregate); |
| } |
| |
| lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or = |
| procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate); |
| ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK); |
| std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie(); |
| |
| // Should only generate for kEndHourId |
| ASSERT_EQ(observations.size(), 1u); |
| EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash); |
| |
| EXPECT_EQ(observations[0].observation->index_histogram().index_histograms_size(), kNumEventCodes); |
| |
| for (const IndexHistogram& value : |
| observations[0].observation->index_histogram().index_histograms()) { |
| for (int i = 0; i < value.bucket_indices_size(); i++) { |
| uint64_t expected_value = kLoggedHistogram.at(value.bucket_indices(i)); |
| EXPECT_EQ(expected_value, value.bucket_counts(i)); |
| } |
| } |
| // Check that obsolete aggregates get cleaned up. |
| procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId), |
| system_profile_hash); |
| EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0); |
| } |
| |
| } // namespace cobalt::local_aggregation |