blob: 67c92035eb379062518092b7038faf8eeb33c3c2 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
namespace cobalt::local_aggregation {
using ::testing::Contains;
class StringHistogramAggregationProcedureTest : public testing::TestAggregationProcedure {
 protected:
  // Logs every string in `strings` once for each event code in [0, num_event_codes), all during
  // hour `hour_id`. Each StringEvent is passed to `procedure.UpdateAggregate`, which records it
  // into `aggregate` under `system_profile_hash`.
  void LogStringEvents(uint32_t hour_id, uint32_t num_event_codes,
                       const std::vector<std::string>& strings, uint64_t system_profile_hash,
                       AggregationProcedure& procedure, ReportAggregate& aggregate) {
    std::unique_ptr<logger::EventRecord> record =
        MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
    StringEvent* event = record->event()->mutable_string_event();
    // A single event-code slot is created once and overwritten for each event code below.
    event->add_event_code(0);
    // Every update in this call happens at the same instant, so compute it once.
    const auto event_time = util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id));
    // Unsigned index to match `num_event_codes` (avoids a signed/unsigned comparison).
    for (uint32_t event_code = 0; event_code < num_event_codes; ++event_code) {
      event->set_event_code(0, event_code);
      for (const std::string& str : strings) {
        event->set_string_value(str);
        procedure.UpdateAggregate(*record, aggregate, system_profile_hash, event_time);
      }
    }
  }

  // Builds a StringHistogramAggregationProcedure for report `report_index` of metric
  // `metric_id`. Returns the concrete procedure type rather than the AggregationProcedure base.
  std::unique_ptr<StringHistogramAggregationProcedure> GetProcedure(uint32_t metric_id,
                                                                    int report_index) {
    return std::make_unique<StringHistogramAggregationProcedure>(
        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
  }
};
TEST_F(StringHistogramAggregationProcedureTest, UpdateAggregateWorks) {
  // Logging several strings for each of kNumEventCodes event codes within one hour should leave
  // exactly one hourly bucket, holding one system-profile aggregate with one entry per event code.
  constexpr uint32_t kNumEventCodes = 100;
  constexpr uint32_t kHourId = 1;
  constexpr uint64_t kSystemProfileHash = 111111;
  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Aenean feugiat consectetur vestibulum.",
      "Integer a ullamcorper dolor.",
      "Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
  };

  std::unique_ptr<AggregationProcedure> procedure =
      GetProcedureFor(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
  ReportAggregate aggregate;
  LogStringEvents(kHourId, kNumEventCodes, kTestStrings, kSystemProfileHash, *procedure,
                  aggregate);

  ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
  const auto& hour_bucket = aggregate.hourly().by_hour_id().at(kHourId);
  ASSERT_EQ(hour_bucket.system_profile_aggregates_size(), 1u);

  const SystemProfileAggregate& profile_aggregate = hour_bucket.system_profile_aggregates(0);
  EXPECT_EQ(profile_aggregate.system_profile_hash(), kSystemProfileHash);
  ASSERT_EQ(profile_aggregate.by_event_code_size(), kNumEventCodes);
}
TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataBothSet) {
  // Both aggregates already hold counts: shared indices are expected to be summed and the
  // remaining indices carried over unchanged.
  AggregateData source_data;
  auto* source_histogram = source_data.mutable_string_histogram()->mutable_histogram();
  source_histogram->insert({0, 10});
  source_histogram->insert({1, 20});

  AggregateData merged_data;
  auto* target_histogram = merged_data.mutable_string_histogram()->mutable_histogram();
  target_histogram->insert({1, 30});
  target_histogram->insert({2, 40});

  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
  procedure->MergeAggregateData(merged_data, source_data);

  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 3);
  const auto& merged = merged_data.string_histogram().histogram();
  ASSERT_TRUE(merged.contains(0));
  EXPECT_EQ(merged.at(0), 10);
  ASSERT_TRUE(merged.contains(1));
  EXPECT_EQ(merged.at(1), 50);
  ASSERT_TRUE(merged.contains(2));
  EXPECT_EQ(merged.at(2), 40);
}
TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) {
  // Merging two empty aggregates must not materialize a string_histogram in the destination.
  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);

  AggregateData empty_source;
  AggregateData merged_data;
  procedure->MergeAggregateData(merged_data, empty_source);

  EXPECT_FALSE(merged_data.has_string_histogram());
  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 0);
}
TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataFromSet) {
  // Only the source holds counts; after merging, the empty destination should contain them as-is.
  AggregateData source_data;
  auto* source_histogram = source_data.mutable_string_histogram()->mutable_histogram();
  source_histogram->insert({0, 10});
  source_histogram->insert({1, 20});
  AggregateData merged_data;

  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
  procedure->MergeAggregateData(merged_data, source_data);

  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
  const auto& merged = merged_data.string_histogram().histogram();
  ASSERT_TRUE(merged.contains(0));
  EXPECT_EQ(merged.at(0), 10);
  ASSERT_TRUE(merged.contains(1));
  EXPECT_EQ(merged.at(1), 20);
}
TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataToSet) {
  // Only the destination holds counts; merging an empty source must leave them untouched.
  AggregateData empty_source;
  AggregateData merged_data;
  auto* target_histogram = merged_data.mutable_string_histogram()->mutable_histogram();
  target_histogram->insert({1, 30});
  target_histogram->insert({2, 40});

  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
  procedure->MergeAggregateData(merged_data, empty_source);

  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
  const auto& merged = merged_data.string_histogram().histogram();
  ASSERT_TRUE(merged.contains(1));
  EXPECT_EQ(merged.at(1), 30);
  ASSERT_TRUE(merged.contains(2));
  EXPECT_EQ(merged.at(2), 40);
}
TEST_F(StringHistogramAggregationProcedureTest, GenerateObservationWorks) {
  std::unique_ptr<AggregationProcedure> procedure =
      GetProcedureFor(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
  ReportAggregate aggregate;
  const uint32_t kNumEventCodes = 10;
  const uint32_t kEndHourId = 11;
  const uint64_t system_profile_hash = uint64_t{111111};
  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Aenean feugiat consectetur vestibulum.",
      "Integer a ullamcorper dolor.",
      "Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
  };
  // Log the same strings in every other hour up to and including kEndHourId. The index is
  // uint32_t to match kEndHourId and LogStringEvents' hour_id parameter.
  for (uint32_t hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
    LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
                    aggregate);
  }
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  // Should only generate for kEndHourId
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  ASSERT_EQ(observations[0].observation->string_histogram().string_histograms_size(),
            kNumEventCodes);
  const StringHistogramObservation& histogram = observations[0].observation->string_histogram();

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }

  for (const IndexHistogram& value : histogram.string_histograms()) {
    // bucket_counts(i) is read alongside bucket_indices(i), so both must have the same length;
    // checking first turns a malformed observation into a failure instead of an OOB read.
    ASSERT_EQ(value.bucket_counts_size(), value.bucket_indices_size());
    for (int i = 0; i < value.bucket_indices_size(); i++) {
      // Each string was logged once per event code during kEndHourId.
      ASSERT_EQ(value.bucket_counts(i), 1);
      // Guard the string_hashes() lookup below against an out-of-range bucket index.
      ASSERT_LT(static_cast<uint64_t>(value.bucket_indices(i)),
                static_cast<uint64_t>(histogram.string_hashes_size()));
      ASSERT_THAT(expected_hashes, Contains(histogram.string_hashes(value.bucket_indices(i))));
    }
  }

  // Check that obsolete aggregates get cleaned up.
  procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
                                   system_profile_hash);
  ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
TEST_F(StringHistogramAggregationProcedureTest, RejectExcessStrings) {
  std::unique_ptr<AggregationProcedure> procedure =
      GetProcedureFor(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
  ReportAggregate aggregate;
  const uint32_t kNumEventCodes = 10;
  const uint32_t kEndHourId = 11;
  const uint64_t system_profile_hash = uint64_t{111111};
  // The report under test has a string_buffer_max of 5: at most this many distinct strings are
  // kept per event code, and any further strings are expected to be dropped.
  constexpr int kStringBufferMax = 5;
  // Deliberately more strings than kStringBufferMax, so the excess must be rejected.
  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Aenean feugiat consectetur vestibulum.",
      "Integer a ullamcorper dolor.",
      "Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
      "Integer quis tortor commodo, rutrum risus.",
      "Nam consectetur velit ac sollicitudin tempus.",
      "Integer ultricies libero quis suscipit lobortis.",
      "Aenean bibendum egestas risus auctor tincidunt.",
      "Sed sit amet scelerisque neque.",
      "Pellentesque dictum quam nec lectus sagittis interdum.",
  };
  // Precondition: the test is only meaningful if it actually exceeds the buffer.
  ASSERT_GT(static_cast<uint64_t>(kTestStrings.size()), static_cast<uint64_t>(kStringBufferMax));

  // uint32_t index to match kEndHourId and LogStringEvents' hour_id parameter.
  for (uint32_t hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
    LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
                    aggregate);
  }
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  // Should only generate for kEndHourId
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  ASSERT_EQ(observations[0].observation->string_histogram().string_histograms_size(),
            kNumEventCodes);
  // Every per-event-code histogram is capped at the buffer size.
  for (const IndexHistogram& value :
       observations[0].observation->string_histogram().string_histograms()) {
    ASSERT_EQ(value.bucket_counts_size(), kStringBufferMax);
  }

  // Check that obsolete aggregates get cleaned up.
  procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
                                   system_profile_hash);
  ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
} // namespace cobalt::local_aggregation