blob: 1af694912c3c87a8944447bbe15052cf8bd62018 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
namespace cobalt::local_aggregation {
using ::testing::IsSubsetOf;
using ::testing::UnorderedElementsAreArray;
// Test fixture for AtLeastOnceStringAggregationProcedure. Builds on the shared
// testing::TestAggregationProcedure fixture with a helper that logs string events and a helper
// that constructs the concrete procedure type under test.
class AtLeastOnceStringAggregationProcedureTest : public testing::TestAggregationProcedure {
 protected:
  // Feeds string events for a single day into `procedure`, updating `aggregate`.
  //
  // For every (event_code, strings) entry of `events_to_strings`, one StringEvent is passed to
  // `procedure.UpdateAggregate` per string, carrying `event_code` as its only event code and
  // attributed to `system_profile_hash`. Every event uses the time derived from `day_index` via
  // util::DayIndexToUnixSeconds.
  void AddStringEventsForDay(uint32_t day_index,
                             const std::map<uint32_t, std::vector<std::string>>& events_to_strings,
                             uint64_t system_profile_hash, AggregationProcedure& procedure,
                             ReportAggregate& aggregate) {
    std::unique_ptr<logger::EventRecord> record =
        MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
    // The event time depends only on `day_index`, so compute it once instead of once per string.
    const auto event_time = util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index));
    // One record is reused for every event: its single event-code slot is added once and then
    // overwritten for each entry of `events_to_strings`.
    StringEvent* event = record->event()->mutable_string_event();
    event->add_event_code(0);
    for (const auto& [event_code, strings] : events_to_strings) {
      event->set_event_code(0, event_code);
      for (const std::string& str : strings) {
        event->set_string_value(str);
        procedure.UpdateAggregate(*record, aggregate, system_profile_hash, event_time);
      }
    }
  }

  // Constructs the procedure under test for report `report_index` of metric `metric_id`, using
  // the definitions from the test registry.
  std::unique_ptr<AtLeastOnceStringAggregationProcedure> GetProcedure(uint32_t metric_id,
                                                                      int report_index) {
    return std::make_unique<AtLeastOnceStringAggregationProcedure>(
        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
  }
};
// Logs strings under two event codes on one day and checks the daily aggregate:
// - each distinct string hash is recorded once;
// - there is a single system profile aggregate;
// - there is exactly one event-code entry per logged event code, each holding the expected
//   number of unique strings.
TEST_F(AtLeastOnceStringAggregationProcedureTest, UpdateAggregate1DayReport) {
  uint32_t metric_id = kStringMetricMetricId;
  int report_index = kStringMetricUniqueDeviceStringCountsReport1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  ReportAggregate report_aggregate;
  const uint32_t kDayIndex = 10000;
  const uint64_t system_profile_hash = uint64_t{2222};

  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(0),
      kTestStrings.at(1),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_strings = {
      {0, kTestStrings1},
      {2, kTestStrings2},
  };
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings.size());

  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
                        report_aggregate);

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }

  ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
  const auto& day_aggregate = report_aggregate.daily().by_day_index().at(kDayIndex);
  EXPECT_EQ(day_aggregate.string_hashes_size(), kTestStrings.size());
  EXPECT_THAT(day_aggregate.string_hashes(), UnorderedElementsAreArray(expected_hashes));
  ASSERT_EQ(day_aggregate.system_profile_aggregates_size(), 1u);
  const SystemProfileAggregate& system_profile_agg = day_aggregate.system_profile_aggregates(0);
  ASSERT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);

  // Check the entry count before walking the entries, so that a missing event code fails the
  // test instead of causing an out-of-range access.
  ASSERT_EQ(system_profile_agg.by_event_code_size(), events_to_strings.size());
  for (const auto& by_event_code : system_profile_agg.by_event_code()) {
    ASSERT_GE(by_event_code.event_codes_size(), 1);
    const std::vector<std::string>& test_strings =
        events_to_strings.at(by_event_code.event_codes(0));
    ASSERT_EQ(by_event_code.data().unique_strings().unique_strings().size(), test_strings.size());
  }
}
// Merges two aggregates that both hold unique strings. The source holds indices {0, 1} last seen
// on day 101 and the destination holds {1, 2} last seen on day 100. The destination must end up
// with all three indices, and the shared index 1 must report day 101.
TEST_F(AtLeastOnceStringAggregationProcedureTest, MergeAggregateDataBothSet) {
  UniqueString newer_string;
  newer_string.set_last_day_index(101);
  UniqueString older_string;
  older_string.set_last_day_index(100);

  // Merge source.
  AggregateData data;
  auto* source_strings = data.mutable_unique_strings()->mutable_unique_strings();
  source_strings->insert({0, newer_string});
  source_strings->insert({1, newer_string});

  // Merge destination.
  AggregateData merged_data;
  auto* target_strings = merged_data.mutable_unique_strings()->mutable_unique_strings();
  target_strings->insert({1, older_string});
  target_strings->insert({2, older_string});

  std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
      kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
  procedure->MergeAggregateData(merged_data, data);

  EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 3);
  const auto& merged = merged_data.unique_strings().unique_strings();
  ASSERT_TRUE(merged.contains(0));
  EXPECT_EQ(merged.at(0).last_day_index(), 101);
  ASSERT_TRUE(merged.contains(1));
  EXPECT_EQ(merged.at(1).last_day_index(), 101);
  ASSERT_TRUE(merged.contains(2));
  EXPECT_EQ(merged.at(2).last_day_index(), 100);
}
// Merging an empty aggregate into another empty aggregate must not create a unique_strings field
// on the destination.
TEST_F(AtLeastOnceStringAggregationProcedureTest, MergeAggregateDataNeitherSet) {
  AggregateData data;
  AggregateData merged_data;
  // Build the procedure from the string metric's unique-device string counts report, as
  // MergeAggregateDataBothSet does. This matches the procedure under test, instead of
  // configuring it with an occurrence metric/report.
  std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
      kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
  procedure->MergeAggregateData(merged_data, data);
  EXPECT_FALSE(merged_data.has_unique_strings());
  EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 0);
}
// Merging a populated source into an empty destination must copy every source entry, with its
// last_day_index, into the destination.
TEST_F(AtLeastOnceStringAggregationProcedureTest, MergeAggregateDataFromSet) {
  UniqueString unique_string = UniqueString();
  unique_string.set_last_day_index(101);
  // Only the merge source (`data`) is populated; the destination (`merged_data`) starts empty.
  AggregateData data;
  data.mutable_unique_strings()->mutable_unique_strings()->insert({0, unique_string});
  data.mutable_unique_strings()->mutable_unique_strings()->insert({1, unique_string});
  AggregateData merged_data;
  // Build the procedure from the string metric's unique-device string counts report, as
  // MergeAggregateDataBothSet does. This matches the procedure under test.
  std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
      kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
  procedure->MergeAggregateData(merged_data, data);
  EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 2);
  ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(0));
  EXPECT_EQ(merged_data.unique_strings().unique_strings().at(0).last_day_index(), 101);
  ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(1));
  EXPECT_EQ(merged_data.unique_strings().unique_strings().at(1).last_day_index(), 101);
}
// Merging an empty source into a destination that already holds unique strings must leave the
// destination's entries and their last_day_index values intact.
TEST_F(AtLeastOnceStringAggregationProcedureTest, MergeAggregateDataToSet) {
  AggregateData data;
  UniqueString unique_string = UniqueString();
  unique_string.set_last_day_index(100);
  AggregateData merged_data;
  // Populate only the merge destination (`merged_data`); the source (`data`) stays empty.
  // Populating `data` here would make this test a duplicate of MergeAggregateDataFromSet.
  merged_data.mutable_unique_strings()->mutable_unique_strings()->insert({1, unique_string});
  merged_data.mutable_unique_strings()->mutable_unique_strings()->insert({2, unique_string});
  // Build the procedure from the string metric's unique-device string counts report, as
  // MergeAggregateDataBothSet does. This matches the procedure under test.
  std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
      kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
  procedure->MergeAggregateData(merged_data, data);
  EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 2);
  ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(1));
  EXPECT_EQ(merged_data.unique_strings().unique_strings().at(1).last_day_index(), 100);
  ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(2));
  EXPECT_EQ(merged_data.unique_strings().unique_strings().at(2).last_day_index(), 100);
}
// Logs strings under three event codes on one day of a 1-day report and checks the generated
// StringHistogramObservation: one histogram per event code, whose bucket indices resolve to the
// expected string hashes. Committing the observation must then remove that day's aggregate, so
// the following day produces no observation.
TEST_F(AtLeastOnceStringAggregationProcedureTest, GenerateObservation1DayReport) {
  uint32_t metric_id = kStringMetricMetricId;
  int report_index = kStringMetricUniqueDeviceStringCountsReport1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  const uint64_t system_profile_hash = uint64_t{2222};
  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;
  ReportAggregate report_aggregate;

  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Aenean feugiat consectetur vestibulum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(1),
      kTestStrings.at(2),
  };
  const std::vector<std::string> kTestStrings3 = {
      kTestStrings.at(2),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_strings = {
      {0, kTestStrings1},
      {2, kTestStrings2},
      {5, kTestStrings3},
  };
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings.size());

  const std::vector<std::string> kTestHashes1 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
  };
  const std::vector<std::string> kTestHashes2 = {
      util::FarmhashFingerprint(kTestStrings.at(1)),
      util::FarmhashFingerprint(kTestStrings.at(2)),
  };
  const std::vector<std::string> kTestHashes3 = {
      util::FarmhashFingerprint(kTestStrings.at(2)),
  };
  const std::map<std::vector<std::string>, std::vector<std::string>> strings_to_hashes = {
      {kTestStrings1, kTestHashes1},
      {kTestStrings2, kTestHashes2},
      {kTestStrings3, kTestHashes3},
  };

  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
                        report_aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  // A null observation must fail the test rather than crash it on the dereference below.
  ASSERT_NE(observations[0].observation, nullptr);
  ASSERT_TRUE(observations[0].observation->has_string_histogram());
  const StringHistogramObservation& histogram = observations[0].observation->string_histogram();
  ASSERT_EQ(histogram.string_histograms_size(), events_to_strings.size());

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }
  EXPECT_THAT(histogram.string_hashes(), UnorderedElementsAreArray(expected_hashes));

  for (const IndexHistogram& value : histogram.string_histograms()) {
    ASSERT_GE(value.event_codes_size(), 1);
    // The expected (test) strings and hashes for this histogram's event vector.
    const std::vector<std::string>& test_strings = events_to_strings.at(value.event_codes(0));
    const std::vector<std::string>& test_hashes = strings_to_hashes.at(test_strings);
    // Resolve each bucket index to the observation string hash it refers to. An out-of-range
    // index fails the test instead of reading past the end of string_hashes.
    std::vector<std::string> actualHashes;
    actualHashes.reserve(test_hashes.size());
    for (const uint32_t index : value.bucket_indices()) {
      ASSERT_LT(index, static_cast<uint32_t>(histogram.string_hashes_size()));
      actualHashes.push_back(histogram.string_hashes(static_cast<int>(index)));
    }
    // The resolved (actual) hashes must match the expected hashes, in any order.
    ASSERT_THAT(actualHashes, UnorderedElementsAreArray(test_hashes));
  }

  // Check that obsolete aggregates get cleaned up.
  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
  ASSERT_EQ(report_aggregate.daily().by_day_index_size(), 0);
  EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));

  // Check that calling observation generation the next day generates no observation.
  time_info.day_index++;
  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = std::move(observations_or).value();
  EXPECT_EQ(observations.size(), 0u);
}
// Expedited 1-day report. The first observation for the day carries the expected histograms.
// After committing it, the day's aggregate is kept, with every unique string marked with the
// day's index. A second generation on the same day yields a single entry with a null
// observation. On the next day nothing is generated and committing removes the old day.
TEST_F(AtLeastOnceStringAggregationProcedureTest, GenerateObservation1DayReportExpedited) {
  uint32_t metric_id = kExpeditedStringMetricMetricId;
  int report_index = kExpeditedStringMetricUniqueDeviceStringCountsReport1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  const uint64_t system_profile_hash = uint64_t{2222};
  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;
  ReportAggregate report_aggregate;

  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(0),
      kTestStrings.at(1),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_strings = {
      {0, kTestStrings1},
      {2, kTestStrings2},
  };
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings.size());

  const std::vector<std::string> kTestHashes1 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
  };
  const std::vector<std::string> kTestHashes2 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
      util::FarmhashFingerprint(kTestStrings.at(1)),
  };
  const std::map<std::vector<std::string>, std::vector<std::string>> strings_to_hashes = {
      {kTestStrings1, kTestHashes1},
      {kTestStrings2, kTestHashes2},
  };

  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
                        report_aggregate);

  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  // This procedure can return null observations (see below), so guard the dereference.
  ASSERT_NE(observations[0].observation, nullptr);
  ASSERT_TRUE(observations[0].observation->has_string_histogram());
  const StringHistogramObservation& histogram = observations[0].observation->string_histogram();
  ASSERT_EQ(histogram.string_histograms_size(), events_to_strings.size());

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }
  EXPECT_THAT(histogram.string_hashes(), UnorderedElementsAreArray(expected_hashes));

  for (const IndexHistogram& value : histogram.string_histograms()) {
    ASSERT_GE(value.event_codes_size(), 1);
    // The expected (test) strings and hashes for this histogram's event vector.
    const std::vector<std::string>& test_strings = events_to_strings.at(value.event_codes(0));
    const std::vector<std::string>& test_hashes = strings_to_hashes.at(test_strings);
    // Resolve each bucket index to the observation string hash it refers to. An out-of-range
    // index fails the test instead of reading past the end of string_hashes.
    std::vector<std::string> actualHashes;
    actualHashes.reserve(test_hashes.size());
    for (const uint32_t index : value.bucket_indices()) {
      ASSERT_LT(index, static_cast<uint32_t>(histogram.string_hashes_size()));
      actualHashes.push_back(histogram.string_hashes(static_cast<int>(index)));
    }
    // The resolved (actual) hashes must match the expected hashes, in any order.
    ASSERT_THAT(actualHashes, UnorderedElementsAreArray(test_hashes));
  }

  // Check that aggregates get marked as sent and don't get cleaned up while they are still needed
  // that day. These are fatal assertions because the lookups below depend on them.
  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
  ASSERT_NE(report_aggregate.daily().by_day_index_size(), 0);
  ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
  ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
            1U);
  const SystemProfileAggregate& system_profile_agg =
      report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
  ASSERT_EQ(system_profile_agg.by_event_code_size(), events_to_strings.size());
  for (const auto& by_event_code : system_profile_agg.by_event_code()) {
    for (const auto& [index, unique_string] :
         by_event_code.data().unique_strings().unique_strings()) {
      EXPECT_EQ(unique_string.last_day_index(), kDayIndex);
    }
  }

  // Generating again on the same day must return a single entry for the system profile, with a
  // null observation.
  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = std::move(observations_or).value();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  EXPECT_EQ(observations[0].observation, nullptr);

  // Check that calling observation generation the next day generates no observation.
  time_info.day_index++;
  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = std::move(observations_or).value();
  EXPECT_EQ(observations.size(), 0u);

  // Check that obsolete aggregates get cleaned up the next day.
  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
  EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
}
// 7-day report with events logged on a single day. Each of the seven days whose window covers
// that day must produce the same histograms. After each commit, only the aggregate for the event
// day may remain, and it must be removed once its day becomes the oldest day of the window. On
// the eighth day nothing is generated.
TEST_F(AtLeastOnceStringAggregationProcedureTest, GenerateObservation7DaysReport) {
  uint32_t metric_id = kStringMetricMetricId;
  int report_index = kStringMetricUniqueDeviceStringCountsReport7DaysReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  const uint64_t system_profile_hash = uint64_t{2222};
  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  ReportAggregate report_aggregate;

  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(0),
      kTestStrings.at(1),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_strings = {
      {0, kTestStrings1},
      {2, kTestStrings2},
  };
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings.size());

  const std::vector<std::string> kTestHashes1 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
  };
  const std::vector<std::string> kTestHashes2 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
      util::FarmhashFingerprint(kTestStrings.at(1)),
  };
  const std::map<std::vector<std::string>, std::vector<std::string>> strings_to_hashes = {
      {kTestStrings1, kTestHashes1},
      {kTestStrings2, kTestHashes2},
  };

  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
                        report_aggregate);

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }

  for (int i = 0; i < 7; i++) {
    time_info.day_index = kDayIndex + i;
    lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
        procedure->GenerateObservations(time_info, report_aggregate);
    ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
    std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
    ASSERT_EQ(observations.size(), 1u);
    EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
    // A null observation must fail the test rather than crash it on the dereference below.
    ASSERT_NE(observations[0].observation, nullptr);
    ASSERT_TRUE(observations[0].observation->has_string_histogram());
    const StringHistogramObservation& histogram = observations[0].observation->string_histogram();
    ASSERT_EQ(histogram.string_histograms_size(), events_to_strings.size());
    EXPECT_THAT(histogram.string_hashes(), UnorderedElementsAreArray(expected_hashes));

    for (const IndexHistogram& value : histogram.string_histograms()) {
      ASSERT_GE(value.event_codes_size(), 1);
      // The expected (test) strings and hashes for this histogram's event vector.
      const std::vector<std::string>& test_strings = events_to_strings.at(value.event_codes(0));
      const std::vector<std::string>& test_hashes = strings_to_hashes.at(test_strings);
      // Resolve each bucket index to the observation string hash it refers to. An out-of-range
      // index fails the test instead of reading past the end of string_hashes.
      std::vector<std::string> actualHashes;
      actualHashes.reserve(test_hashes.size());
      for (const uint32_t index : value.bucket_indices()) {
        ASSERT_LT(index, static_cast<uint32_t>(histogram.string_hashes_size()));
        actualHashes.push_back(histogram.string_hashes(static_cast<int>(index)));
      }
      // The resolved (actual) hashes must match the expected hashes, in any order.
      ASSERT_THAT(actualHashes, UnorderedElementsAreArray(test_hashes));
    }

    // Check that obsolete aggregates get cleaned up.
    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
    // `day_index - 6` is the oldest day of the 7-day window ending at `day_index`. It must hold
    // no data after the commit.
    EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
    // Days in the 7-day window that had events continue to contain data until the last day when
    // it is cleaned up.
    if (time_info.day_index - 6 != kDayIndex) {
      ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex))
          << "day_index: " << time_info.day_index;
      ASSERT_EQ(
          report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
          1u);
      const SystemProfileAggregate& system_profile_agg =
          report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
      EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
      EXPECT_EQ(system_profile_agg.by_event_code_size(), events_to_strings.size());
    }
    // Days that had no events contain no data.
    for (uint32_t day = time_info.day_index - 5; day < kDayIndex; ++day) {
      EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(day));
    }
    for (uint32_t day = kDayIndex + 1; day <= time_info.day_index; ++day) {
      EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(day));
    }
  }

  // After 7 days the observation is no longer generated.
  time_info.day_index = kDayIndex + 7;
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  EXPECT_EQ(observations.size(), 0u);
}
// 7-day report with events logged on two consecutive days, using partly different event codes.
// The observation for the second day must combine both days:
// - one histogram per distinct event code;
// - no duplicate event vectors;
// - no duplicate string hashes within a histogram;
// - event code 2 carries the union of both days' strings.
TEST_F(AtLeastOnceStringAggregationProcedureTest,
       GenerateObservation7DaysReportWithDifferentEventVectors) {
  uint32_t metric_id = kStringMetricMetricId;
  int report_index = kStringMetricUniqueDeviceStringCountsReport7DaysReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  const uint64_t system_profile_hash = uint64_t{2222};
  ReportAggregate report_aggregate;

  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Integer a ullamcorper dolor.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(0),
      kTestStrings.at(1),
  };
  const std::vector<std::string> kTestStrings3 = {
      kTestStrings.at(1),
  };
  const std::vector<std::string> kTestStrings4 = {
      kTestStrings.at(1),
      kTestStrings.at(2),
  };
  const std::vector<std::string> kTestStrings5 = {
      kTestStrings.at(2),
  };
  const std::vector<std::string> kTestHashes1 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
  };
  const std::vector<std::string> kTestHashes3 = {
      util::FarmhashFingerprint(kTestStrings.at(1)),
  };
  const std::vector<std::string> kTestHashes5 = {
      util::FarmhashFingerprint(kTestStrings.at(2)),
  };

  const uint32_t kDayIndexDay1 = 10000;
  const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_1 = {
      {0, kTestStrings1},
      {2, kTestStrings2},
      {5, kTestStrings3},
  };
  // As in the other tests, the report must buffer at least as many event vectors as are logged
  // in a single day.
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings_day_1.size());
  AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash, *procedure,
                        report_aggregate);

  const uint32_t kDayIndexDay2 = kDayIndexDay1 + 1;
  const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_2 = {
      {2, kTestStrings4},
      {7, kTestStrings5},
  };
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings_day_2.size());
  AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash, *procedure,
                        report_aggregate);

  // String hashes for event code 2, which was logged on both days: the union of
  // kTestStrings2 (day 1) and kTestStrings4 (day 2).
  const std::vector<std::string> kTestMultiDayHashes = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
      util::FarmhashFingerprint(kTestStrings.at(1)),
      util::FarmhashFingerprint(kTestStrings.at(2)),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_hashes = {
      {0, kTestHashes1},
      {2, kTestMultiDayHashes},
      {5, kTestHashes3},
      {7, kTestHashes5},
  };

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }

  util::TimeInfo time_info;
  time_info.day_index = kDayIndexDay2;
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  // A null observation must fail the test rather than crash it on the dereference below.
  ASSERT_NE(observations[0].observation, nullptr);
  ASSERT_TRUE(observations[0].observation->has_string_histogram());
  const StringHistogramObservation& histogram = observations[0].observation->string_histogram();
  // The observation for the second day spans both days. Its number of string histograms is
  // therefore the number of distinct event codes logged across both days.
  ASSERT_EQ(histogram.string_histograms_size(), events_to_hashes.size());
  // The observation's string hashes must be exactly the expected string hashes.
  EXPECT_THAT(histogram.string_hashes(), UnorderedElementsAreArray(expected_hashes));

  std::set<uint32_t> seen_event_vectors;
  for (const IndexHistogram& value : histogram.string_histograms()) {
    ASSERT_GE(value.event_codes_size(), 1);
    // Each event vector may appear in at most one histogram.
    const uint32_t event_vector = value.event_codes(0);
    ASSERT_TRUE(seen_event_vectors.insert(event_vector).second)
        << "duplicate event vector: " << event_vector;
    // The expected (test) string hashes for this histogram's event vector.
    const std::vector<std::string>& test_hashes = events_to_hashes.at(event_vector);
    // Resolve each bucket index to the observation string hash it refers to, rejecting
    // out-of-range indices and duplicate hashes within this event vector.
    std::vector<std::string> actualHashes;
    actualHashes.reserve(test_hashes.size());
    std::set<std::string> seen_string_hashes;
    for (const uint32_t index : value.bucket_indices()) {
      ASSERT_LT(index, static_cast<uint32_t>(histogram.string_hashes_size()));
      const std::string& string_hash = histogram.string_hashes(static_cast<int>(index));
      ASSERT_TRUE(seen_string_hashes.insert(string_hash).second);
      actualHashes.push_back(string_hash);
    }
    // The resolved (actual) hashes must match the expected hashes, in any order.
    ASSERT_THAT(actualHashes, UnorderedElementsAreArray(test_hashes));
  }

  // Commit the observation.
  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);

  // After 7 days the observation is no longer generated.
  time_info.day_index = kDayIndexDay2 + 7;
  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = std::move(observations_or).value();
  EXPECT_EQ(observations.size(), 0u);
}
// Expedited 7-day report with events logged on a single day. On each of the seven days whose
// window covers that day:
// - the first generation produces the expected histograms;
// - after committing, the event day's aggregate is still present;
// - a second generation on the same day yields a single entry with a null observation.
// On the eighth day nothing is generated.
TEST_F(AtLeastOnceStringAggregationProcedureTest, GenerateObservation7DaysReportExpedited) {
  uint32_t metric_id = kExpeditedStringMetricMetricId;
  int report_index = kExpeditedStringMetricUniqueDeviceStringCountsReport7DaysReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  const uint64_t system_profile_hash = uint64_t{2222};
  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  ReportAggregate report_aggregate;

  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(0),
      kTestStrings.at(1),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_strings = {
      {0, kTestStrings1},
      {2, kTestStrings2},
  };
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings.size());

  const std::vector<std::string> kTestHashes1 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
  };
  const std::vector<std::string> kTestHashes2 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
      util::FarmhashFingerprint(kTestStrings.at(1)),
  };
  const std::map<std::vector<std::string>, std::vector<std::string>> strings_to_hashes = {
      {kTestStrings1, kTestHashes1},
      {kTestStrings2, kTestHashes2},
  };

  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
                        report_aggregate);

  std::vector<std::string> expected_hashes;
  expected_hashes.reserve(kTestStrings.size());
  for (const std::string& string : kTestStrings) {
    expected_hashes.push_back(util::FarmhashFingerprint(string));
  }

  for (int i = 0; i < 7; i++) {
    time_info.day_index = kDayIndex + i;
    lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
        procedure->GenerateObservations(time_info, report_aggregate);
    ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
    std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
    ASSERT_EQ(observations.size(), 1u);
    EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
    // This procedure can return null observations (see below), so guard the dereference.
    ASSERT_NE(observations[0].observation, nullptr);
    ASSERT_TRUE(observations[0].observation->has_string_histogram());
    const StringHistogramObservation& histogram = observations[0].observation->string_histogram();
    ASSERT_EQ(histogram.string_histograms_size(), events_to_strings.size());
    EXPECT_THAT(histogram.string_hashes(), UnorderedElementsAreArray(expected_hashes));

    for (const IndexHistogram& value : histogram.string_histograms()) {
      ASSERT_GE(value.event_codes_size(), 1);
      // The expected (test) strings and hashes for this histogram's event vector.
      const std::vector<std::string>& test_strings = events_to_strings.at(value.event_codes(0));
      const std::vector<std::string>& test_hashes = strings_to_hashes.at(test_strings);
      // Resolve each bucket index to the observation string hash it refers to. An out-of-range
      // index fails the test instead of reading past the end of string_hashes.
      std::vector<std::string> actualHashes;
      actualHashes.reserve(test_hashes.size());
      for (const uint32_t index : value.bucket_indices()) {
        ASSERT_LT(index, static_cast<uint32_t>(histogram.string_hashes_size()));
        actualHashes.push_back(histogram.string_hashes(static_cast<int>(index)));
      }
      // The resolved (actual) hashes must match the expected hashes, in any order.
      ASSERT_THAT(actualHashes, UnorderedElementsAreArray(test_hashes));
    }

    // Check that aggregates get marked as sent and don't get cleaned up while they are still
    // needed that day.
    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
    // No data for days beyond the 7-day window.
    EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
    // Days in the 7-day window that had events continue to contain data.
    ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
    ASSERT_EQ(
        report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
        1u);
    const SystemProfileAggregate& system_profile_agg =
        report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
    EXPECT_EQ(system_profile_agg.system_profile_hash(), system_profile_hash);
    EXPECT_EQ(system_profile_agg.by_event_code_size(), events_to_strings.size());
    // Days that had no events contain no data.
    for (uint32_t day = time_info.day_index - 6; day < kDayIndex; ++day) {
      EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(day));
    }
    for (uint32_t day = kDayIndex + 1; day <= time_info.day_index; ++day) {
      EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(day));
    }

    // Generating again on the same day must return a single entry for the system profile, with
    // a null observation.
    observations_or = procedure->GenerateObservations(time_info, report_aggregate);
    ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
    observations = std::move(observations_or).value();
    ASSERT_EQ(observations.size(), 1u);
    EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
    EXPECT_EQ(observations[0].observation, nullptr);
  }

  // After 7 days the observation is no longer generated.
  time_info.day_index = kDayIndex + 7;
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  EXPECT_EQ(observations.size(), 0u);
}
// Verifies that a 1-day report stops accepting new unique strings once the day's string buffer is
// full. Event code 0 is logged first (std::map iteration order) with more unique strings than the
// buffer can hold, so only `string_buffer_max` of them are kept. All strings logged afterwards for
// event code 2 are then rejected.
TEST_F(AtLeastOnceStringAggregationProcedureTest, RejectExcessStrings) {
  uint32_t metric_id = kStringMetricMetricId;
  int report_index = kStringMetricUniqueDeviceStringCountsReport1DayReportIndex;
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, report_index);
  uint32_t string_buffer_max = GetStringBufferMax();
  ReportAggregate report_aggregate;
  const uint32_t kDayIndex = 10000;
  const uint64_t system_profile_hash = uint64_t{2222};
  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Aenean feugiat consectetur vestibulum.",
      "Integer a ullamcorper dolor.",
      "Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
      "Integer quis tortor commodo, rutrum risus.",
      "Nam consectetur velit ac sollicitudin tempus.",
      "Integer ultricies libero quis suscipit lobortis.",
      "Aenean bibendum egestas risus auctor tincidunt.",
      "Sed sit amet scelerisque neque.",
      "Pellentesque dictum quam nec lectus sagittis interdum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0), kTestStrings.at(1), kTestStrings.at(2), kTestStrings.at(3),
      kTestStrings.at(4), kTestStrings.at(5), kTestStrings.at(6),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(7),
      kTestStrings.at(8),
      kTestStrings.at(9),
  };
  const std::map<uint32_t, std::vector<std::string>> events_to_strings = {
      {0, kTestStrings1},
      {2, kTestStrings2},
  };

  // Preconditions, checked before any events are logged:
  // - Both event codes fit in the event vector buffer.
  // - Event code 0 alone logs more unique strings than the string buffer can hold. The assertions
  //   below depend on this, so it must be fatal and compared against kTestStrings1 (not the full
  //   kTestStrings list, which contains strings that are never logged).
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
            events_to_strings.size());
  ASSERT_LT(string_buffer_max, kTestStrings1.size());

  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
                        report_aggregate);

  // Guard every lookup so a regression reports a test failure instead of aborting on a missing map
  // key or an out-of-range repeated-field index.
  ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
  const auto& day_aggregate = report_aggregate.daily().by_day_index().at(kDayIndex);
  EXPECT_EQ(day_aggregate.string_hashes_size(), string_buffer_max);
  ASSERT_EQ(day_aggregate.system_profile_aggregates_size(), 1u);
  const SystemProfileAggregate& system_profile_agg = day_aggregate.system_profile_aggregates(0);
  ASSERT_EQ(system_profile_agg.by_event_code_size(), events_to_strings.size());

  // Event code 0 filled the buffer: it keeps exactly `string_buffer_max` of its strings.
  ASSERT_LT(system_profile_agg.by_event_code(0).data().unique_strings().unique_strings_size(),
            kTestStrings1.size());
  ASSERT_EQ(system_profile_agg.by_event_code(0).data().unique_strings().unique_strings_size(),
            string_buffer_max);
  // Event code 2 was logged after the buffer was full, so none of its strings were kept.
  ASSERT_LT(system_profile_agg.by_event_code(1).data().unique_strings().unique_strings_size(),
            kTestStrings2.size());
  ASSERT_EQ(system_profile_agg.by_event_code(1).data().unique_strings().unique_strings_size(), 0);
}
// Verifies that a 7-day report caps the string hashes in its observation at the string buffer max.
// This must hold even when the days in the window together store more unique strings than that.
// It also verifies that a second generation for the same day, after commit, yields no observation.
TEST_F(AtLeastOnceStringAggregationProcedureTest,
       GenerateObservation7DaysReportRejectExcessStrings) {
  std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(
      kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport7DaysReportIndex);
  const uint32_t string_buffer_max = GetStringBufferMax();
  ReportAggregate report_aggregate;
  const uint64_t system_profile_hash = uint64_t{2222};
  const std::vector<std::string> kTestStrings = {
      "Nunc dictum justo ac arcu.",
      "Suspendisse ullamcorper mi vel pulvinar dictum.",
      "Aenean feugiat consectetur vestibulum.",
      "Aenean bibendum egestas risus auctor tincidunt.",
      "Sed sit amet scelerisque neque.",
      "Pellentesque dictum quam nec lectus sagittis interdum.",
  };
  const std::vector<std::string> kTestStrings1 = {
      kTestStrings.at(0),
      kTestStrings.at(1),
      kTestStrings.at(2),
  };
  const std::vector<std::string> kTestStrings2 = {
      kTestStrings.at(3),
      kTestStrings.at(4),
      kTestStrings.at(5),
  };
  const std::vector<std::string> kTestHashes1 = {
      util::FarmhashFingerprint(kTestStrings.at(0)),
      util::FarmhashFingerprint(kTestStrings.at(1)),
      util::FarmhashFingerprint(kTestStrings.at(2)),
  };
  const std::vector<std::string> kTestHashes2 = {
      util::FarmhashFingerprint(kTestStrings.at(3)),
      util::FarmhashFingerprint(kTestStrings.at(4)),
      util::FarmhashFingerprint(kTestStrings.at(5)),
  };
  const uint32_t kDayIndexDay1 = 10000;
  const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_1 = {
      {0, kTestStrings1},
  };
  AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash, *procedure,
                        report_aggregate);
  const uint32_t kDayIndexDay2 = kDayIndexDay1 + 1;
  const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_2 = {
      {3, kTestStrings2},
  };
  AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash, *procedure,
                        report_aggregate);
  const std::map<uint32_t, std::vector<std::string>> events_to_hashes = {
      {0, kTestHashes1},
      {3, kTestHashes2},
  };

  // Precondition: six unique strings are logged in total (three per day). Together they must
  // exceed the string buffer max; otherwise the string_hashes_size() check below would not
  // exercise the cap at all. This is fatal so the test cannot pass vacuously.
  ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndexDay1));
  ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndexDay2));
  ASSERT_GT(report_aggregate.daily().by_day_index().at(kDayIndexDay1).string_hashes_size() +
                report_aggregate.daily().by_day_index().at(kDayIndexDay2).string_hashes_size(),
            string_buffer_max);

  // Generate on the second day, so the 7-day window covers both days of string events above.
  util::TimeInfo time_info;
  time_info.day_index = kDayIndexDay2;
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
      procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  std::vector<ObservationAndSystemProfile> observations = std::move(observations_or).value();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  // The observation can be null (see the repeat generation below). Fail cleanly instead of
  // dereferencing it.
  ASSERT_NE(observations[0].observation, nullptr);
  ASSERT_TRUE(observations[0].observation->has_string_histogram());
  const StringHistogramObservation& histogram = observations[0].observation->string_histogram();
  ASSERT_EQ(histogram.string_histograms_size(), events_to_hashes.size());
  // More unique strings were logged than the buffer allows. The observation must therefore hold
  // exactly `string_buffer_max` string hashes.
  ASSERT_EQ(histogram.string_hashes_size(), string_buffer_max);
  for (const IndexHistogram& value : histogram.string_histograms()) {
    // The expected (test) string hashes that this event vector's histogram may contain.
    const std::vector<std::string>& test_hashes = events_to_hashes.at(value.event_codes(0));
    // Resolve each bucket index of this histogram to the string hash it refers to.
    std::vector<std::string> actualHashes;
    actualHashes.reserve(test_hashes.size());
    for (const uint32_t index : value.bucket_indices()) {
      actualHashes.push_back(histogram.string_hashes(static_cast<int>(index)));
    }
    // Some strings are dropped once the string buffer max is exceeded. The actual hashes only need
    // to be a subset of the expected ones, not all of them.
    ASSERT_THAT(actualHashes, IsSubsetOf(test_hashes));
  }
  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
  // Generating again for the same day must produce an entry with a null observation. This holds
  // even though some strings were never observed because the string buffer max was reached.
  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
  ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
  observations = std::move(observations_or).value();
  ASSERT_EQ(observations.size(), 1u);
  EXPECT_EQ(observations[0].system_profile_hash, system_profile_hash);
  EXPECT_EQ(observations[0].observation, nullptr);
}
} // namespace cobalt::local_aggregation