blob: 4359deb958a82fbd9657a6958c0e6253232306e2 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregation.h"
#include <time.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/testing/report_all_test_registry_with_report_all_set.cb.h"
#include "src/local_aggregation_1_1/testing/report_all_test_registry_with_select_last_set.cb.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/event_record.h"
#include "src/logger/logger_test_utils.h"
#include "src/logger/observation_writer.h"
#include "src/logger/project_context_factory.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/system_data/client_secret.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
using ::testing::Contains;
using ::testing::ContainsRegex;
using ::testing::HasSubstr;
using ::testing::Not;
using TimeInfo = util::TimeInfo;
namespace {
// Every test starts at 19:10:00 UTC on 9 January 2014. Fields that are not named in the
// initializer are zero-initialized.
std::tm kStartingTime = {
    .tm_sec = 0,
    .tm_min = 10,
    .tm_hour = 19,
    .tm_mday = 9,    // Day of the month: the 9th.
    .tm_mon = 0,     // Months are counted from 0: January.
    .tm_year = 114,  // Years are counted from 1900: 2014.
};
// kStartingTime read as a UTC calendar time (timegm) and converted to a system_clock time point.
const std::chrono::system_clock::time_point kStartingTimePoint{
    std::chrono::system_clock::from_time_t(timegm(&kStartingTime))};
// Decodes a Base64-encoded, serialized CobaltRegistry.
//
// registryBase64: the Base64 text of a serialized CobaltRegistry message.
//
// Returns the parsed registry, or nullptr (after logging an error) when the text is not valid
// Base64 or the decoded bytes do not parse as a CobaltRegistry.
std::unique_ptr<CobaltRegistry> GetRegistry(const std::string& registryBase64) {
  std::string decoded;
  const bool decode_ok = absl::Base64Unescape(registryBase64, &decoded);
  if (!decode_ok) {
    LOG(ERROR) << "Unable to decode Base64 String";
    return nullptr;
  }

  auto parsed = std::make_unique<CobaltRegistry>();
  if (parsed->ParseFromString(decoded)) {
    return parsed;
  }

  LOG(ERROR) << "Unable to parse registry from bytes";
  return nullptr;
}
} // namespace
class LocalAggregationTest : public util::testing::TestWithFiles {
private:
void SetUp() override {
TestWithFiles::SetUp();
global_project_context_factory_ =
std::make_unique<logger::ProjectContextFactory>(GetRegistry(kCobaltRegistryBase64));
mock_clock_ =
std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0));
mock_clock_->set_time(kStartingTimePoint);
validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get());
validated_clock_->SetAccurate(true);
civil_time_converter_ = std::make_unique<util::UtcTimeConverter>();
metadata_builder_ = std::make_unique<MetadataBuilder>(system_data_, validated_clock_.get(),
system_data_cache_path(), fs());
system_data_.SetVersion("never used");
metadata_builder_->SnapshotSystemData();
test_writer_ = std::make_unique<logger::testing::FakeObservationStore>();
observation_writer_ = std::make_unique<logger::ObservationWriter>(*test_writer_, nullptr);
}
protected:
std::unique_ptr<LocalAggregation> MakeLocalAggregation(
cobalt::StorageQuotas storage_quotas = cobalt::kDefaultStorageQuotas) {
CobaltConfig cfg = {
.storage_quotas = storage_quotas,
.local_aggregate_store_dir = local_aggregation_store_path(),
.client_secret = system_data::ClientSecret::GenerateNewSecret(),
};
return std::make_unique<LocalAggregation>(cfg, *global_project_context_factory_, system_data_,
*metadata_builder_, fs(), *observation_writer_,
*civil_time_converter_);
}
void OverrideRegistry(const std::string& registryBase64) {
global_project_context_factory_ =
std::make_unique<logger::ProjectContextFactory>(GetRegistry(registryBase64));
}
std::shared_ptr<const logger::ProjectContext> GetProjectContext() {
return global_project_context_factory_->NewProjectContext(
lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId));
}
void VerifyStoredIntegerObservation(
int observation_num, lib::ReportIdentifier report, uint32_t day_index,
const std::string& system_version, std::vector<int> expected_count,
std::vector<std::vector<uint32_t>> expected_event_codes = {}) {
ObservationMetadata* metadata = test_writer_->metadata_received[observation_num].get();
EXPECT_EQ(metadata->customer_id(), report.customer_id())
<< " for observation: " << observation_num;
EXPECT_EQ(metadata->project_id(), report.project_id())
<< " for observation: " << observation_num;
EXPECT_EQ(metadata->metric_id(), report.metric_id()) << " for observation: " << observation_num;
EXPECT_EQ(metadata->report_id(), report.report_id()) << " for observation: " << observation_num;
EXPECT_EQ(metadata->day_index(), day_index) << " for observation: " << observation_num;
EXPECT_EQ(metadata->system_profile().system_version(), system_version)
<< " for observation: " << observation_num;
EXPECT_EQ(metadata->system_profile().product_name(), "")
<< " for observation: " << observation_num;
observation_store::StoredObservation* stored_observation =
test_writer_->messages_received[observation_num].get();
ASSERT_TRUE(stored_observation->has_unencrypted()) << " for observation: " << observation_num;
ASSERT_TRUE(stored_observation->unencrypted().has_integer())
<< " for observation: " << observation_num;
ASSERT_EQ(stored_observation->unencrypted().integer().values_size(), expected_count.size())
<< " for observation: " << observation_num;
if (!expected_event_codes.empty()) {
ASSERT_EQ(expected_event_codes.size(), expected_count.size())
<< " for observation: " << observation_num;
}
for (int i = 0; i < expected_count.size(); i++) {
EXPECT_EQ(stored_observation->unencrypted().integer().values(i).value(), expected_count[i])
<< " for observation " << observation_num << " integer value: " << i;
if (!expected_event_codes.empty()) {
EXPECT_THAT(stored_observation->unencrypted().integer().values(i).event_codes(),
testing::ElementsAreArray(expected_event_codes[i]))
<< " for observation " << observation_num << " integer value: " << i;
}
}
}
system_data::FakeSystemData system_data_;
std::unique_ptr<logger::testing::FakeObservationStore> test_writer_;
std::unique_ptr<logger::ObservationWriter> observation_writer_;
std::unique_ptr<util::IncrementingSystemClock> mock_clock_;
std::unique_ptr<util::UtcTimeConverter> civil_time_converter_;
private:
std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
std::unique_ptr<util::FakeValidatedClock> validated_clock_;
std::unique_ptr<MetadataBuilder> metadata_builder_;
};
// Adding an event for a metric makes an entry named after the metric ID appear in the project's
// directory of the local aggregate store.
TEST_F(LocalAggregationTest, AddEventWorks) {
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();

  // Lists the current contents of <store>/<customer id>/<project id>.
  const auto project_files = [this] {
    return fs()
        .ListFiles(absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", kProjectId))
        .value();
  };

  // Nothing has been logged yet, so the metric has no entry.
  ASSERT_THAT(project_files(), Not(Contains(absl::StrCat(kOccurrenceMetricMetricId))));

  auto occurrence =
      logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId).value();
  occurrence->event()->mutable_occurrence_event();
  aggregation->AddEvent(*occurrence, mock_clock_->now());

  // The event caused the metric's entry to be created.
  ASSERT_THAT(project_files(), Contains(absl::StrCat(kOccurrenceMetricMetricId)));
}
// After Disable(true), adding an event does not create an entry for the metric in the project's
// directory of the local aggregate store.
TEST_F(LocalAggregationTest, DisableWorks) {
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
  aggregation->Disable(true);

  // Lists the current contents of <store>/<customer id>/<project id>.
  const auto project_files = [this] {
    return fs()
        .ListFiles(absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", kProjectId))
        .value();
  };

  // Check that no data is present to begin with.
  ASSERT_THAT(project_files(), Not(Contains(absl::StrCat(kOccurrenceMetricMetricId))));

  auto occurrence =
      logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId).value();
  occurrence->event()->mutable_occurrence_event();
  aggregation->AddEvent(*occurrence, mock_clock_->now());

  // Check that the data still isn't present after the event was added.
  ASSERT_THAT(project_files(), Not(Contains(absl::StrCat(kOccurrenceMetricMetricId))));
}
// Walks the occurrence metric through three days of hourly aggregation runs, starting at 7:10PM
// UTC, and checks which observations each run writes to test_writer_. Events are logged under
// three different system versions ("100", "101", "102") so that the per-report system profile
// selection can be checked as well. The product name set on the record is expected to be absent
// from the stored metadata (VerifyStoredIntegerObservation checks that it is empty).
TEST_F(LocalAggregationTest, GenerateObservationsWorks) {
std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
lib::MetricIdentifier metric_id =
project_context->Identifier().ForMetric(kOccurrenceMetricMetricId);
const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId);
// This test assumes that this metric has no reports with added privacy.
for (const ReportDefinition& report : metric->reports()) {
ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level());
}
std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
// Before any events occur, aggregation run happens, no observations are generated.
aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_EQ(test_writer_->num_observations_added(), 0);
// 10 minutes later, 4 events occur.
mock_clock_->increment_by(std::chrono::minutes(10));
std::unique_ptr<logger::EventRecord> record =
logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).value();
record->event()->mutable_occurrence_event()->set_count(1);
std::chrono::system_clock::time_point now = mock_clock_->now();
CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex(
now, *civil_time_converter_, *record->metric()));
CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId(
now, *civil_time_converter_, *record->metric()));
record->event()->set_day_index(day_index);
record->event()->set_hour_id(hour_id);
record->system_profile()->set_system_version("100");
record->system_profile()->set_product_name("should-be-filtered");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(50));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and
// hourly_device_histograms_report_all.
ASSERT_EQ(test_writer_->num_observations_added(), 3);
VerifyStoredIntegerObservation(
0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {4});
VerifyStoredIntegerObservation(
1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{4});
VerifyStoredIntegerObservation(
2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
"100", {4});
// An hour later at 9:10PM, no more events, next aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// Still only the original 3 observations created.
ASSERT_EQ(test_writer_->num_observations_added(), 3);
// 45 minutes later, 2 more events occur, with a new system version. They are still on the same
// day, so only the hour ID is recomputed.
mock_clock_->increment_by(std::chrono::minutes(45));
now = mock_clock_->now();
record->event()->set_day_index(day_index);
CB_ASSERT_OK_AND_ASSIGN(
hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
record->event()->set_hour_id(hour_id);
record->system_profile()->set_system_version("101");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(15));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// 3 new observations created, this time with an incremental count of 2.
ASSERT_EQ(test_writer_->num_observations_added(), 6);
VerifyStoredIntegerObservation(
3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"101", {2});
VerifyStoredIntegerObservation(
4, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "101",
{2});
VerifyStoredIntegerObservation(
5, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
"101", {2});
// An hour later at 11:10PM, no more events, next aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// Still only the expected 6 observations created.
ASSERT_EQ(test_writer_->num_observations_added(), 6);
// An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// 10 new observations are created, one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS
// reports, two for each of the REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
ASSERT_EQ(test_writer_->num_observations_added(), 16);
VerifyStoredIntegerObservation(
6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
"101", {1});
VerifyStoredIntegerObservation(
7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"101", {1});
// SELECT_FIRST reports use the first system version, others use the second.
VerifyStoredIntegerObservation(
8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
// This report is REPORT_ALL and generates 2 observations
VerifyStoredIntegerObservation(
10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
day_index, "101", {1});
VerifyStoredIntegerObservation(
11, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
day_index, "100", {1});
// This report is REPORT_ALL and generates 2 observations
VerifyStoredIntegerObservation(
12, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "101", {1});
VerifyStoredIntegerObservation(
13, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
14,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
day_index, "101", {1});
VerifyStoredIntegerObservation(
15,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "101", {1});
// 45 minutes later, 1 more event occurs, again with a new system version. The clock is now past
// midnight UTC, so the event (and the observations expected below) belong to the next day.
day_index += 1;
mock_clock_->increment_by(std::chrono::minutes(45));
now = mock_clock_->now();
record->event()->set_day_index(day_index);
CB_ASSERT_OK_AND_ASSIGN(
hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
record->event()->set_hour_id(hour_id);
record->system_profile()->set_system_version("102");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 15 minutes later (1 hour from the previous run) at 1:10AM, aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(15));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// 3 new observations created, this time with an incremental count of 1.
ASSERT_EQ(test_writer_->num_observations_added(), 19);
VerifyStoredIntegerObservation(
16, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"102", {1});
VerifyStoredIntegerObservation(
17, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "102",
{1});
VerifyStoredIntegerObservation(
18, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
"102", {1});
// The rest of the day, no more events occur so 22 hourly aggregation runs create no new
// observations.
for (int i = 1; i <= 22; i++) {
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_EQ(test_writer_->num_observations_added(), 19);
}
// At 12:10AM (just after midnight UTC), daily event aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// 10 new observations are created: one for each of the six non-REPORT_ALL UNIQUE_DEVICE_COUNTS
// reports, one for the 1-day REPORT_ALL report (only "102" was seen on this day), and three for
// the 7-day REPORT_ALL report (one per system version seen in the window).
ASSERT_EQ(test_writer_->num_observations_added(), 29);
VerifyStoredIntegerObservation(
19, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
"102", {1});
VerifyStoredIntegerObservation(
20, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"102", {1});
// SELECT_FIRST 1-day report uses the new system version.
VerifyStoredIntegerObservation(
21, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
day_index, "102", {1});
// SELECT_FIRST 7-day report continues to use the original system version.
VerifyStoredIntegerObservation(
22, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
23, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
day_index, "102", {1});
// 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order
// is deterministic based on the hash value)
VerifyStoredIntegerObservation(
24, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "101", {1});
VerifyStoredIntegerObservation(
25, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
26, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "102", {1});
VerifyStoredIntegerObservation(
27,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
day_index, "102", {1});
VerifyStoredIntegerObservation(
28,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "102", {1});
// The next day, no events occur so all 23 hourly aggregation runs create no new observations.
for (int i = 1; i <= 23; i++) {
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_EQ(test_writer_->num_observations_added(), 29);
}
// Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
aggregation->GenerateAggregatedObservations(mock_clock_->now());
// 6 new observations are created for the next day, only for the 7-day window
// UNIQUE_DEVICE_COUNTS reports, (3 for the REPORT_ALL 7-day report).
ASSERT_EQ(test_writer_->num_observations_added(), 35);
// These observations are for the day that just ended, one past the day of the last event.
day_index += 1;
VerifyStoredIntegerObservation(
29, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"102", {1});
// SELECT_FIRST 7-day report continues to use the original system version.
VerifyStoredIntegerObservation(
30, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
// 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order
// is deterministic based on the hash value)
VerifyStoredIntegerObservation(
31, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "101", {1});
VerifyStoredIntegerObservation(
32, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
33, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "102", {1});
VerifyStoredIntegerObservation(
34,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "102", {1});
}
// Checks that observations which could not be written are not lost: while test_writer_ is told
// to fail, GenerateAggregatedObservations returns a non-OK status and no observations are added;
// once writes succeed again, the observations for the missed hours (and the missed day) are
// generated by the next run.
TEST_F(LocalAggregationTest, GenerateObservationsWriterFailures) {
std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
lib::MetricIdentifier metric_id =
project_context->Identifier().ForMetric(kOccurrenceMetricMetricId);
const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId);
// This test assumes that this metric has no reports with added privacy.
for (const ReportDefinition& report : metric->reports()) {
ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level());
}
std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
// Before any events occur, aggregation run happens, no observations are generated.
Status status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_EQ(status.error_code(), StatusCode::OK);
ASSERT_EQ(test_writer_->num_observations_added(), 0);
// 10 minutes later, 4 events occur.
mock_clock_->increment_by(std::chrono::minutes(10));
std::unique_ptr<logger::EventRecord> record =
logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).value();
record->event()->mutable_occurrence_event()->set_count(1);
std::chrono::system_clock::time_point now = mock_clock_->now();
CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex(
now, *civil_time_converter_, *record->metric()));
CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId(
now, *civil_time_converter_, *record->metric()));
record->event()->set_day_index(day_index);
record->event()->set_hour_id(hour_id);
record->system_profile()->set_system_version("100");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
// But errors occur in the writing to the ObservationStore.
test_writer_->SetFailCalls(true);
mock_clock_->increment_by(std::chrono::minutes(50));
status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_NE(status.error_code(), StatusCode::OK);
ASSERT_EQ(test_writer_->num_observations_added(), 0);
// An hour later at 9:10PM, no more events, next aggregation run happens.
// The previously aggregated events should be retried. This time the writing succeeds.
test_writer_->SetFailCalls(false);
mock_clock_->increment_by(std::chrono::minutes(60));
status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_EQ(status.error_code(), StatusCode::OK);
// 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and
// hourly_device_histograms_report_all.
ASSERT_EQ(test_writer_->num_observations_added(), 3);
VerifyStoredIntegerObservation(
0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {4});
VerifyStoredIntegerObservation(
1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{4});
VerifyStoredIntegerObservation(
2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
"100", {4});
// 45 minutes later, 2 more events occur.
mock_clock_->increment_by(std::chrono::minutes(45));
now = mock_clock_->now();
record->event()->set_day_index(day_index);
CB_ASSERT_OK_AND_ASSIGN(
hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
record->event()->set_hour_id(hour_id);
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens.
// But errors occur again in the writing to the ObservationStore.
test_writer_->SetFailCalls(true);
mock_clock_->increment_by(std::chrono::minutes(15));
status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_NE(status.error_code(), StatusCode::OK);
ASSERT_EQ(test_writer_->num_observations_added(), 3);
// 30 minutes later, 1 more event occurs.
mock_clock_->increment_by(std::chrono::minutes(30));
now = mock_clock_->now();
record->event()->set_day_index(day_index);
CB_ASSERT_OK_AND_ASSIGN(
hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
record->event()->set_hour_id(hour_id);
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 30 minutes later at 11:10PM, next aggregation run happens, but still fails.
mock_clock_->increment_by(std::chrono::minutes(30));
status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_NE(status.error_code(), StatusCode::OK);
ASSERT_EQ(test_writer_->num_observations_added(), 3);
// An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens and fails.
mock_clock_->increment_by(std::chrono::minutes(60));
status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_NE(status.error_code(), StatusCode::OK);
ASSERT_EQ(test_writer_->num_observations_added(), 3);
// An hour later at 1:10AM (after midnight UTC), now aggregation succeeds, both daily and hourly.
test_writer_->SetFailCalls(false);
mock_clock_->increment_by(std::chrono::minutes(60));
status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
ASSERT_EQ(status.error_code(), StatusCode::OK);
// 14 total new observations created (3 existing + 14 = 17).
// 6 new hourly observations created, 3 with an incremental count of 2, 3 with count of 1.
// 8 new daily observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports.
ASSERT_EQ(test_writer_->num_observations_added(), 17);
VerifyStoredIntegerObservation(
3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {2});
VerifyStoredIntegerObservation(
4, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {1});
VerifyStoredIntegerObservation(
5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
"100", {1});
VerifyStoredIntegerObservation(
6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"100", {1});
VerifyStoredIntegerObservation(
7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
11,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
12,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
13, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{2});
VerifyStoredIntegerObservation(
14, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{1});
// NOTE(review): observations 15 and 16 are counted above but not verified here; presumably they
// are the two hourly_device_histograms_report_all observations (counts 2 and 1) - confirm.
}
TEST_F(LocalAggregationTest, GenerateObservationsExpedited) {
std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
lib::MetricIdentifier metric_id =
project_context->Identifier().ForMetric(kExpeditedOccurrenceMetricMetricId);
const MetricDefinition* metric = project_context->GetMetric(kExpeditedOccurrenceMetricMetricId);
// This test assumes that this metric has no reports with added privacy.
for (const ReportDefinition& report : metric->reports()) {
ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level());
}
std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
// Start 9th January, 2014 at 10 minutes after 7PM, UTC
std::tm tm;
strptime("2014-01-09 19:10:00", "%Y-%m-%d %H:%M:%S", &tm);
std::chrono::time_point<std::chrono::system_clock> system_time =
std::chrono::system_clock::from_time_t(timegm(&tm));
// Before any events occur, aggregation run happens, no observations are generated.
aggregation->GenerateAggregatedObservations(system_time);
ASSERT_EQ(test_writer_->num_observations_added(), 0);
// 10 minutes later, 4 events occur with event code 45, with two different system_versions.
system_time += std::chrono::minutes(10);
std::unique_ptr<logger::EventRecord> record =
logger::EventRecord::MakeEventRecord(project_context, kExpeditedOccurrenceMetricMetricId)
.value();
record->event()->mutable_occurrence_event()->set_count(1);
record->event()->mutable_occurrence_event()->add_event_code(
ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45);
std::chrono::system_clock::time_point now = mock_clock_->now();
CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex(
now, *civil_time_converter_, *record->metric()));
CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId(
now, *civil_time_converter_, *record->metric()));
record->event()->set_day_index(day_index);
record->event()->set_hour_id(hour_id);
record->system_profile()->set_system_version("99");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
record->system_profile()->set_system_version("100");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
system_time += std::chrono::minutes(50);
aggregation->GenerateAggregatedObservations(system_time);
// 8 observations created, one for each expedited non-REPORT_ALL UNIQUE_DEVICE_COUNTS reports, two
// for the REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
ASSERT_EQ(test_writer_->num_observations_added(), 8);
VerifyStoredIntegerObservation(
0, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
1, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
2, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst1DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
3, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
4, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "99", {1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
5, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
6, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99", {1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
7, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
// An hour later at 9:10PM, no more events, next aggregation run happens.
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
// Still only the original observations created, expedited observations are not re-sent.
ASSERT_EQ(test_writer_->num_observations_added(), 8);
// 45 minutes later, 2 more events occur, one with a different event code 46.
system_time += std::chrono::minutes(45);
record->event()->mutable_occurrence_event()->set_event_code(
0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46);
now = system_time;
record->event()->set_day_index(day_index);
CB_ASSERT_OK_AND_ASSIGN(
hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
record->event()->set_hour_id(hour_id);
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
record->event()->mutable_occurrence_event()->set_event_code(
0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46);
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens.
system_time += std::chrono::minutes(15);
aggregation->GenerateAggregatedObservations(system_time);
// 4 new observations for the AT_LEAST_ONCE and REPORT_ALL reports, with the new event code of 46.
// Note that the expedited SELECT_FIRST report was already sent, so doesn't get new observations.
// Also, no new observations for event code 45 are created, as they were already sent.
ASSERT_EQ(test_writer_->num_observations_added(), 12);
VerifyStoredIntegerObservation(
8, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
9, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
10, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
11, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
// An hour later at 11:10PM, no more events, next aggregation run happens.
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
// Still only the expected original observation created.
ASSERT_EQ(test_writer_->num_observations_added(), 12);
// An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens.
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
// Note that the daily expedited reports were already sent, so they don't get new observations.
// But 4 more expedited observations are created for the next day's 7day reports: an AT_LEAST_ONCE
// report containing the codes 45 and 46, a SELECT_FIRST report for the code 45, and two
// REPORT_ALL reports for the two system_versions reported earlier.
ASSERT_EQ(test_writer_->num_observations_added(), 16);
day_index += 1;
VerifyStoredIntegerObservation(
12, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
{1, 1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
13, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
14, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
15, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
{1, 1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
// The next day, no events occur so all 23 hourly aggregation runs create no new observations.
for (int i = 1; i <= 23; i++) {
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
ASSERT_EQ(test_writer_->num_observations_added(), 16);
}
// Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens.
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
// 4 new observations are created, again only for the 7-day window UNIQUE_DEVICE_COUNTS reports.
ASSERT_EQ(test_writer_->num_observations_added(), 20);
day_index += 1;
VerifyStoredIntegerObservation(
16, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
{1, 1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
17, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
18, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99",
{1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
19, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
{1, 1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
// For the next 4 days (3 days of data have been sent above) no more events occur, and the 7-day
// reports continue to be sent at the start of the day.
for (int j = 0; j < 4; j++) {
// The 23 hourly aggregation runs.
for (int i = 1; i <= 23; i++) {
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
ASSERT_EQ(test_writer_->num_observations_added(), 20 + 4 * j);
}
// The 12:10AM (just after midnight UTC) daily event aggregation run.
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
// 4 new observations are created for the 7-day window UNIQUE_DEVICE_COUNTS reports.
ASSERT_EQ(test_writer_->num_observations_added(), 24 + 4 * j);
day_index += 1;
VerifyStoredIntegerObservation(
20 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId),
day_index, "100", {1, 1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
21 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId),
day_index, "100", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
22 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index,
"99", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
23 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index,
"100", {1, 1},
{{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
}
// Finally, on the 8th day, and all subsequent days, no observations are created at all.
for (int i = 1; i <= 72; i++) { // run 72 more hours to be sure
system_time += std::chrono::minutes(60);
aggregation->GenerateAggregatedObservations(system_time);
ASSERT_EQ(test_writer_->num_observations_added(), 36);
}
}
// Exercises the storage quota: with per_project_reserved_bytes=60 and total_capacity_bytes=120,
// AddEvent must eventually fail with RESOURCE_EXHAUSTED and report the quota usage in its details.
TEST_F(LocalAggregationTest, StorageQuotasWork) {
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation({
      .per_project_reserved_bytes = 60,
      .total_capacity_bytes = 120,
  });

  // A single integer-histogram event that is re-logged with a different bucket index each time.
  auto record =
      logger::EventRecord::MakeEventRecord(GetProjectContext(), kIntegerHistogramMetricMetricId)
          .value();
  record->event()->set_day_index(1);
  record->event()->set_hour_id(1);
  ::cobalt::HistogramBucket* bucket =
      record->event()->mutable_integer_histogram_event()->add_buckets();
  bucket->set_count(100);

  // Phase 1: keep adding buckets until the slush space starts being used, i.e. the histogram has
  // outgrown the per_project_reserved_bytes.
  int events_before_slush = 0;
  for (; aggregation->SlushUsed() == 0; ++events_before_slush) {
    bucket->set_index(events_before_slush);
    ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  }

  // At least a couple of events must have fit before per_project_reserved_bytes was exceeded.
  ASSERT_GT(events_before_slush, 2);

  // Phase 2: the same number of additional events should still be accepted.
  for (int index = events_before_slush; index < 2 * events_before_slush; ++index) {
    bucket->set_index(index);
    ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  }

  // Phase 3: 8 more events; at least one of them is expected to be rejected.
  const int first_overflow_index = events_before_slush * 2;
  bool saw_resource_exhausted = false;
  Status last_status;
  for (int index = first_overflow_index; index < first_overflow_index + 8; ++index) {
    bucket->set_index(index);
    last_status = aggregation->AddEvent(*record, mock_clock_->now());
    saw_resource_exhausted =
        (last_status.error_code() == StatusCode::RESOURCE_EXHAUSTED) || saw_resource_exhausted;
  }
  ASSERT_TRUE(saw_resource_exhausted);

  // The details of the most recent status must describe the quota state.
  ASSERT_THAT(last_status.error_details(), HasSubstr("per_project_reserved_bytes=60"));
  ASSERT_THAT(last_status.error_details(), ContainsRegex("project_bytes=1.."));
  ASSERT_THAT(last_status.error_details(), HasSubstr("SlushSize=60"));
  ASSERT_THAT(last_status.error_details(), ContainsRegex("SlushUsed=6."));
}
// Building a LocalAggregation whose per-project reservation (120 bytes) is larger than the total
// capacity (100 bytes) must terminate the process with a "There is no space in slush" message.
TEST_F(LocalAggregationTest, CrashesIfTooManyBytes) {
  ASSERT_DEATH(
      std::unique_ptr<LocalAggregation> never_constructed = MakeLocalAggregation({
          .per_project_reserved_bytes = 120,
          .total_capacity_bytes = 100,
      }),
      "There is no space in slush");
}
// Logs two events (system versions "100" and "101") under the `with_select_last_set` registry,
// then rebuilds LocalAggregation under the `with_report_all_set` registry, logs the same two
// events again, and checks the observations generated one day later.
TEST_F(LocalAggregationTest, MigrateToReportAll) {
  // Generated registry identifiers used below, hoisted for readability.
  const auto select_last_metric_id =
      report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId;
  const auto report_all_metric_id =
      report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId;
  const auto report_all_report_id = report_all_test::with_report_all_set::
      kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId;

  uint32_t day_index = 0;
  uint32_t hour_id = 0;

  // Phase 1: events logged while the first registry is active.
  OverrideRegistry(report_all_test::with_select_last_set::kRegistryBase64);
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
  {
    auto record =
        logger::EventRecord::MakeEventRecord(project_context, select_last_metric_id).value();
    record->event()->mutable_occurrence_event()->set_count(1);

    const std::chrono::system_clock::time_point now = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(day_index, cobalt::util::TimePointToDayIndex(
                                           now, *civil_time_converter_, *record->metric()));
    CB_ASSERT_OK_AND_ASSIGN(
        hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
    record->event()->set_day_index(day_index);
    record->event()->set_hour_id(hour_id);

    for (const char* system_version : {"100", "101"}) {
      record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
    }
  }

  // Phase 2: switch registries, re-create the aggregation, and log the same events again.
  OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
  project_context = GetProjectContext();
  aggregation = MakeLocalAggregation();
  {
    auto record =
        logger::EventRecord::MakeEventRecord(project_context, report_all_metric_id).value();
    record->event()->mutable_occurrence_event()->set_count(1);

    const std::chrono::system_clock::time_point now = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(day_index, cobalt::util::TimePointToDayIndex(
                                           now, *civil_time_converter_, *record->metric()));
    CB_ASSERT_OK_AND_ASSIGN(
        hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
    record->event()->set_day_index(day_index);
    record->event()->set_hour_id(hour_id);

    for (const char* system_version : {"100", "101"}) {
      record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
    }
  }

  aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay);
  ASSERT_EQ(test_writer_->num_observations_added(), 2);
  EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);

  lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric(report_all_metric_id);

  // The first two events were logged to the non-partitioned data portion. When the migration
  // happens, those two are moved to whatever value the SystemProfile currently holds, so "101"
  // carries 2 ungrouped + 1 grouped = 3, while "100" carries only its own single event.
  VerifyStoredIntegerObservation(0, metric_id.ForReport(report_all_report_id), day_index, "101",
                                 {3});
  VerifyStoredIntegerObservation(1, metric_id.ForReport(report_all_report_id), day_index, "100",
                                 {1});
}
// Logs two events (system versions "100" and "101") under the `with_report_all_set` registry,
// then rebuilds LocalAggregation under the `with_select_last_set` registry, logs the same two
// events again, and checks that a single combined observation is generated one day later.
TEST_F(LocalAggregationTest, MigrateFromReportAll) {
  // Generated registry identifiers used below, hoisted for readability.
  const auto report_all_metric_id =
      report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId;
  const auto select_last_metric_id =
      report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId;
  const auto select_last_report_id = report_all_test::with_select_last_set::
      kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId;

  util::TimeInfo time_info;

  // Phase 1: events logged while the first registry is active.
  OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
  {
    auto record =
        logger::EventRecord::MakeEventRecord(project_context, report_all_metric_id).value();
    record->event()->mutable_occurrence_event()->set_count(1);

    const std::chrono::system_clock::time_point now = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(
        time_info, util::TimeInfo::FromTimePoint(now, *civil_time_converter_, *record->metric()));
    record->event()->set_day_index(time_info.day_index);
    record->event()->set_hour_id(time_info.hour_id);

    for (const char* system_version : {"100", "101"}) {
      record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
    }
  }

  // Phase 2: switch registries, re-create the aggregation, and log the same events again.
  OverrideRegistry(report_all_test::with_select_last_set::kRegistryBase64);
  project_context = GetProjectContext();
  aggregation = MakeLocalAggregation();
  {
    auto record =
        logger::EventRecord::MakeEventRecord(project_context, select_last_metric_id).value();
    record->event()->mutable_occurrence_event()->set_count(1);

    const std::chrono::system_clock::time_point now = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(
        time_info, util::TimeInfo::FromTimePoint(now, *civil_time_converter_, *record->metric()));
    record->event()->set_day_index(time_info.day_index);
    record->event()->set_hour_id(time_info.hour_id);

    for (const char* system_version : {"100", "101"}) {
      record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
    }
  }

  aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay);
  ASSERT_EQ(test_writer_->num_observations_added(), 1);
  EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);

  lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric(select_last_metric_id);

  // All 4 events should be grouped back together under system_version 101 (the most recently
  // seen).
  VerifyStoredIntegerObservation(0, metric_id.ForReport(select_last_report_id),
                                 time_info.day_index, "101", {4});
}
// Logs events under kNumPartitions different system versions ("Version 1" .. "Version 15") using
// the `with_report_all_set` registry, and checks that one observation is generated per system
// version, each carrying the number of events logged for that version.
TEST_F(LocalAggregationTest, ReportAllWorks) {
  const int32_t kNumPartitions = 15;
  OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();

  auto record =
      logger::EventRecord::MakeEventRecord(
          project_context, report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
          .value();
  record->event()->mutable_occurrence_event()->set_count(1);
  std::chrono::system_clock::time_point now = mock_clock_->now();
  CB_ASSERT_OK_AND_ASSIGN(
      util::TimeInfo time_info,
      util::TimeInfo::FromTimePoint(now, *civil_time_converter_, *record->metric()));
  record->event()->set_day_index(time_info.day_index);
  record->event()->set_hour_id(time_info.hour_id);

  // "Version j" is logged once for every i <= j, i.e. exactly j times in total.
  for (int32_t i = 1; i <= kNumPartitions; i++) {
    for (int32_t j = i; j <= kNumPartitions; j++) {
      record->system_profile()->set_system_version(absl::StrCat("Version ", j));
      ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
    }
  }

  aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay);
  ASSERT_EQ(test_writer_->num_observations_added(), kNumPartitions);
  ASSERT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);

  // Walk the observations alongside their metadata (same index in both collections).
  size_t i = 0;
  for (const std::unique_ptr<observation_store::StoredObservation>& obs :
       test_writer_->messages_received) {
    ASSERT_TRUE(obs->has_unencrypted());
    const Observation& observation = obs->unencrypted();
    ASSERT_TRUE(observation.has_integer());
    ASSERT_GT(observation.integer().values_size(), 0);

    const std::string& system_version =
        test_writer_->metadata_received[i]->system_profile().system_version();
    // Fail the test cleanly if the version string lacks the "Version <n>" separator; passing
    // npos to substr() would otherwise throw std::out_of_range instead of reporting a failure.
    const std::string::size_type separator_pos = system_version.find(' ');
    ASSERT_NE(separator_pos, std::string::npos)
        << "Unexpected system_version format: '" << system_version << "'";
    // std::stoi skips the leading space that substr() keeps.
    int system_version_number = std::stoi(system_version.substr(separator_pos));

    // The observed count for "Version j" must equal j (see the logging loops above).
    ASSERT_EQ(observation.integer().values(0).value(), system_version_number);
    ++i;
  }
}
} // namespace cobalt::local_aggregation