// blob: f6fa06f3e57e35a6903a26986f872037dd46c48f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h"
#include <memory>
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Test fixture for AtLeastOnceAggregationProcedure.
//
// Extends testing::TestAggregationProcedure and adds two small factory helpers used by the
// tests in this file. GetProjectContext(), GetMetricDef() and GetReportDef() are not defined
// here; presumably they are inherited from the base fixture.
class AtLeastOnceAggregationProcedureTest : public testing::TestAggregationProcedure {
protected:
// Returns an EventRecord constructed from the fixture's project context and |metric_id|.
logger::EventRecord GetEventRecord(uint32_t metric_id) {
return logger::EventRecord(GetProjectContext(), metric_id);
}
// Creates the procedure under test from the metric definition for |metric_id| and the report
// definition at |report_index| of that metric. The result is returned through the
// AggregationProcedure base type.
std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, uint32_t report_index) {
return std::make_unique<AtLeastOnceAggregationProcedure>(GetMetricDef(metric_id),
GetReportDef(metric_id, report_index));
}
};
// Checks that, after occurrence events are logged for a single day against a 1-day report, the
// aggregate for that day holds one entry per event code with the at-least-once marker set.
TEST_F(AtLeastOnceAggregationProcedureTest, UpdateAggregate1DayReport) {
  const uint32_t metric_id = kOccurrenceMetricMetricId;
  const uint32_t report_index = kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex;
  auto procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  const uint32_t kNumEventCodes = 100;
  // The report must be able to buffer all the event codes this test is about to log.
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);

  ReportAggregate aggregate;
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, procedure.get(), &aggregate);

  // Confirm the day is present once, up front, before looking it up with at().
  ASSERT_EQ(aggregate.daily().by_day_index().count(kDayIndex), 1u);
  const auto &day_aggregate = aggregate.daily().by_day_index().at(kDayIndex);

  // Guard the indexed accesses below so that a short aggregate fails the test cleanly instead
  // of reading past the end of the repeated field.
  ASSERT_GE(static_cast<uint32_t>(day_aggregate.by_event_code_size()), kNumEventCodes);
  for (uint32_t i = 0; i < kNumEventCodes; ++i) {
    const auto &event_code_aggregate = day_aggregate.by_event_code(i);
    ASSERT_GE(event_code_aggregate.event_codes_size(), 1);
    // The i-th entry is expected to carry event code i + 1.
    ASSERT_EQ(event_code_aggregate.event_codes(0), i + 1);
    ASSERT_TRUE(event_code_aggregate.data().at_least_once());
  }
}
// Checks that a 1-day report produces an integer observation with exactly one value per logged
// event code, each value equal to 1, and that the day's aggregate is removed afterwards.
TEST_F(AtLeastOnceAggregationProcedureTest, GenerateObservation1DayReport) {
  const uint32_t metric_id = kOccurrenceMetricMetricId;
  const uint32_t report_index = kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex;
  auto procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;

  const uint32_t kNumEventCodes = 100;
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);

  ReportAggregate report_aggregate;
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, procedure.get(), &report_aggregate);

  auto status_or_observation = procedure->GenerateObservation(time_info, &report_aggregate);
  ASSERT_TRUE(status_or_observation.ok());
  std::unique_ptr<Observation> observation = status_or_observation.ConsumeValueOrDie();
  // An OK status can still carry a null observation (see GenerateObservationNoAggregates), so
  // fail cleanly here rather than dereferencing a null pointer below.
  ASSERT_NE(observation, nullptr);
  ASSERT_TRUE(observation->has_integer());

  const auto &integer_obs = observation->integer();
  ASSERT_EQ(integer_obs.values_size(), kNumEventCodes);

  // Collect the distinct event codes seen. Together with the range checks, the final size check
  // pins the set to exactly {1, ..., kNumEventCodes}.
  std::set<uint32_t> obs_event_codes;
  for (const auto &val : integer_obs.values()) {
    ASSERT_EQ(val.event_codes_size(), 1);
    const uint32_t first = val.event_codes(0);
    ASSERT_GE(first, 1u);
    ASSERT_LE(first, kNumEventCodes);
    obs_event_codes.insert(first);
    ASSERT_EQ(val.value(), 1);
  }
  ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);

  // Check that obsolete aggregates were cleaned up.
  EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex), 0u);
}
// Checks that a 7-day report produces an integer observation with exactly one value per logged
// event code, each value equal to 1, and verifies which daily aggregates remain afterwards.
TEST_F(AtLeastOnceAggregationProcedureTest, GenerateObservation7DaysReport) {
  const uint32_t metric_id = kOccurrenceMetricMetricId;
  const uint32_t report_index = kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex;
  auto procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;

  const uint32_t kNumEventCodes = 7;
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);

  ReportAggregate report_aggregate;
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, procedure.get(), &report_aggregate);

  auto status_or_observation = procedure->GenerateObservation(time_info, &report_aggregate);
  ASSERT_TRUE(status_or_observation.ok());
  std::unique_ptr<Observation> observation = status_or_observation.ConsumeValueOrDie();
  // An OK status can still carry a null observation (see GenerateObservationNoAggregates), so
  // fail cleanly here rather than dereferencing a null pointer below.
  ASSERT_NE(observation, nullptr);
  ASSERT_TRUE(observation->has_integer());

  const auto &integer_obs = observation->integer();
  ASSERT_EQ(integer_obs.values_size(), kNumEventCodes);

  // Collect the distinct event codes seen. Together with the range checks, the final size check
  // pins the set to exactly {1, ..., kNumEventCodes}.
  std::set<uint32_t> obs_event_codes;
  for (const auto &val : integer_obs.values()) {
    ASSERT_EQ(val.event_codes_size(), 1);
    const uint32_t first = val.event_codes(0);
    ASSERT_GE(first, 1u);
    ASSERT_LE(first, kNumEventCodes);
    obs_event_codes.insert(first);
    ASSERT_EQ(val.value(), 1);
  }
  ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);

  // Check that obsolete aggregates were cleaned up: the entry six days back must be gone...
  EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex - 6), 0u);
  // ...while each of the six most recent days (kDayIndex - 5 through kDayIndex) has an entry.
  for (uint32_t day = kDayIndex; day > kDayIndex - 6; --day) {
    EXPECT_EQ(report_aggregate.daily().by_day_index().count(day), 1u);
  }
}
// Test the case where a report with a 7-day window does not have daily aggregates for every day
// in the window: events are logged for the final day only, and the observation must still
// report the single event code that was seen.
TEST_F(AtLeastOnceAggregationProcedureTest, GenerateObservation7DaysReportSomeDaysMissing) {
  const uint32_t metric_id = kOccurrenceMetricMetricId;
  const uint32_t report_index = kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex;
  auto procedure = GetProcedure(metric_id, report_index);

  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;

  const uint32_t kNumEventCodes = 1;
  ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);

  ReportAggregate report_aggregate;
  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, procedure.get(), &report_aggregate);

  auto status_or_observation = procedure->GenerateObservation(time_info, &report_aggregate);
  ASSERT_TRUE(status_or_observation.ok());
  std::unique_ptr<Observation> observation = status_or_observation.ConsumeValueOrDie();
  // An OK status can still carry a null observation (see GenerateObservationNoAggregates), so
  // fail cleanly here rather than dereferencing a null pointer below.
  ASSERT_NE(observation, nullptr);
  ASSERT_TRUE(observation->has_integer());

  const auto &integer_obs = observation->integer();
  // ASSERT (not EXPECT) so that values(0) below is never read from an empty list.
  ASSERT_EQ(integer_obs.values_size(), kNumEventCodes);
  const auto &only_value = integer_obs.values(0);
  // The reported value is the at-least-once marker, which is 1 regardless of how many event
  // codes were logged.
  EXPECT_EQ(only_value.value(), 1);
  ASSERT_EQ(only_value.event_codes_size(), 1);
  // With a single event code logged, that code is expected to be 1.
  EXPECT_EQ(only_value.event_codes(0), 1u);

  // Check that the aggregate for the latest day remains in the store.
  EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex), 1u);
}
// When nothing was logged during the aggregation period, generating an observation from an
// untouched ReportAggregate must succeed and yield a null observation.
TEST_F(AtLeastOnceAggregationProcedureTest, GenerateObservationNoAggregates) {
  const uint32_t kDayIndex = 10000;
  util::TimeInfo time_info;
  time_info.day_index = kDayIndex;

  auto procedure = GetProcedure(kOccurrenceMetricMetricId,
                                kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);

  // Deliberately left empty: no events are ever added to it.
  ReportAggregate empty_aggregate;
  auto result = procedure->GenerateObservation(time_info, &empty_aggregate);
  ASSERT_TRUE(result.ok());

  std::unique_ptr<Observation> observation = result.ConsumeValueOrDie();
  EXPECT_EQ(observation, nullptr);
}
} // namespace cobalt::local_aggregation