| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregation.h" |
| |
| #include <time.h> |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "absl/strings/escaping.h" |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/testing/test_with_files.h" |
| #include "src/local_aggregation_1_1/testing/report_all_test_registry_with_report_all_set.cb.h" |
| #include "src/local_aggregation_1_1/testing/report_all_test_registry_with_select_last_set.cb.h" |
| #include "src/local_aggregation_1_1/testing/test_registry.cb.h" |
| #include "src/logger/event_record.h" |
| #include "src/logger/logger_test_utils.h" |
| #include "src/logger/observation_writer.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/observation_store/observation_store_internal.pb.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| #include "src/system_data/client_secret.h" |
| #include "src/system_data/fake_system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| using ::testing::Contains; |
| using ::testing::ContainsRegex; |
| using ::testing::HasSubstr; |
| using ::testing::Not; |
| using TimeInfo = util::TimeInfo; |
| |
| namespace { |
| |
| // Shared starting instant for the tests: 9 January 2014, 19:10:00 UTC. |
| // Note that tm_mon is zero-based and tm_year counts years since 1900. |
| std::tm kStartingTime = { |
|     .tm_sec = 0, |
|     .tm_min = 10, |
|     .tm_hour = 19,   // 7PM |
|     .tm_mday = 9,    // day of the month |
|     .tm_mon = 0,     // January |
|     .tm_year = 114,  // 2014 - 1900 |
| }; |
| // The same instant as a system_clock time point; timegm() reads the fields above as UTC. |
| const std::chrono::system_clock::time_point kStartingTimePoint = |
|     std::chrono::system_clock::from_time_t(timegm(&kStartingTime)); |
| |
| // Decodes a Base64-encoded serialized CobaltRegistry and parses it. |
| // Logs an error and returns nullptr when either the Base64 decoding or the parse fails. |
| std::unique_ptr<CobaltRegistry> GetRegistry(const std::string& registryBase64) { |
|   std::string serialized; |
|   const bool decoded = absl::Base64Unescape(registryBase64, &serialized); |
|   if (!decoded) { |
|     LOG(ERROR) << "Unable to decode Base64 String"; |
|     return nullptr; |
|   } |
| |
|   auto parsed = std::make_unique<CobaltRegistry>(); |
|   if (parsed->ParseFromString(serialized)) { |
|     return parsed; |
|   } |
| |
|   LOG(ERROR) << "Unable to parse registry from bytes"; |
|   return nullptr; |
| } |
| |
| } // namespace |
| |
| // Test fixture that builds a LocalAggregation on top of test doubles: a clock set to |
| // kStartingTimePoint, FakeSystemData, a UtcTimeConverter and a FakeObservationStore whose |
| // received observations the tests inspect. |
| class LocalAggregationTest : public util::testing::TestWithFiles { |
|  private: |
|   // Creates the collaborators shared by every test. |
|   void SetUp() override { |
|     TestWithFiles::SetUp(); |
|     global_project_context_factory_ = |
|         std::make_unique<logger::ProjectContextFactory>(GetRegistry(kCobaltRegistryBase64)); |
|     // Increment of 0: presumably now() stays put until a test calls increment_by() - TODO confirm. |
|     mock_clock_ = |
|         std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0)); |
|     mock_clock_->set_time(kStartingTimePoint); |
|     validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get()); |
|     validated_clock_->SetAccurate(true); |
|     civil_time_converter_ = std::make_unique<util::UtcTimeConverter>(); |
|     metadata_builder_ = std::make_unique<MetadataBuilder>(system_data_, validated_clock_.get(), |
|                                                           system_data_cache_path(), fs()); |
|     system_data_.SetVersion("never used"); |
|     metadata_builder_->SnapshotSystemData(); |
|     test_writer_ = std::make_unique<logger::testing::FakeObservationStore>(); |
|     observation_writer_ = std::make_unique<logger::ObservationWriter>(*test_writer_, nullptr); |
|   } |
| |
|  protected: |
|   // Builds a LocalAggregation wired to the fixture's fakes. |
|   // |
|   // storage_quotas: quotas placed in the CobaltConfig; defaults to kDefaultStorageQuotas. |
|   // A new client secret is generated on every call. |
|   std::unique_ptr<LocalAggregation> MakeLocalAggregation( |
|       cobalt::StorageQuotas storage_quotas = cobalt::kDefaultStorageQuotas) { |
|     CobaltConfig cfg = { |
|         .storage_quotas = storage_quotas, |
|         .local_aggregate_store_dir = local_aggregation_store_path(), |
|         .client_secret = system_data::ClientSecret::GenerateNewSecret(), |
|     }; |
| |
|     return std::make_unique<LocalAggregation>(cfg, *global_project_context_factory_, system_data_, |
|                                               *metadata_builder_, fs(), *observation_writer_, |
|                                               *civil_time_converter_); |
|   } |
| |
|   // Replaces the project context factory with one built from the given Base64-encoded registry. |
|   // Only LocalAggregation instances created afterwards receive the new factory. |
|   void OverrideRegistry(const std::string& registryBase64) { |
|     global_project_context_factory_ = |
|         std::make_unique<logger::ProjectContextFactory>(GetRegistry(registryBase64)); |
|   } |
| |
|   // Returns the project context for (kCustomerId, kProjectId) from the current registry. |
|   std::shared_ptr<const logger::ProjectContext> GetProjectContext() { |
|     return global_project_context_factory_->NewProjectContext( |
|         lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId)); |
|   } |
| |
|   // Checks the observation_num-th observation received by the fake store. |
|   // |
|   // observation_num: zero-based position in the order the observations were written. |
|   // report: identifiers the observation's metadata must carry. |
|   // day_index: expected day index in the metadata. |
|   // system_version: expected system version in the metadata's system profile. The product |
|   //   name is always expected to be empty. |
|   // expected_count: expected integer values, in order. |
|   // expected_event_codes: when non-empty, the expected event codes of each value; it must |
|   //   have one entry per element of expected_count. |
|   // |
|   // A fatal assertion failure returns from this helper only, not from the calling test. |
|   void VerifyStoredIntegerObservation( |
|       int observation_num, lib::ReportIdentifier report, uint32_t day_index, |
|       const std::string& system_version, std::vector<int> expected_count, |
|       std::vector<std::vector<uint32_t>> expected_event_codes = {}) { |
|     // Guard the index first: indexing past the end is undefined behavior, so report a |
|     // missing observation as an ordinary test failure instead. |
|     ASSERT_GE(observation_num, 0) << " for observation: " << observation_num; |
|     const auto index = static_cast<size_t>(observation_num); |
|     ASSERT_LT(index, test_writer_->metadata_received.size()) |
|         << " for observation: " << observation_num; |
|     ASSERT_LT(index, test_writer_->messages_received.size()) |
|         << " for observation: " << observation_num; |
| |
|     ObservationMetadata* metadata = test_writer_->metadata_received[index].get(); |
|     ASSERT_NE(metadata, nullptr) << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->customer_id(), report.customer_id()) |
|         << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->project_id(), report.project_id()) |
|         << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->metric_id(), report.metric_id()) << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->report_id(), report.report_id()) << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->day_index(), day_index) << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->system_profile().system_version(), system_version) |
|         << " for observation: " << observation_num; |
|     EXPECT_EQ(metadata->system_profile().product_name(), "") |
|         << " for observation: " << observation_num; |
| |
|     observation_store::StoredObservation* stored_observation = |
|         test_writer_->messages_received[index].get(); |
|     ASSERT_NE(stored_observation, nullptr) << " for observation: " << observation_num; |
|     ASSERT_TRUE(stored_observation->has_unencrypted()) << " for observation: " << observation_num; |
|     ASSERT_TRUE(stored_observation->unencrypted().has_integer()) |
|         << " for observation: " << observation_num; |
| |
|     // Compare sizes and iterate as int so signed and unsigned values are never mixed. |
|     const int expected_size = static_cast<int>(expected_count.size()); |
|     const auto& integer_observation = stored_observation->unencrypted().integer(); |
|     ASSERT_EQ(integer_observation.values_size(), expected_size) |
|         << " for observation: " << observation_num; |
| |
|     const bool check_event_codes = !expected_event_codes.empty(); |
|     if (check_event_codes) { |
|       ASSERT_EQ(expected_event_codes.size(), expected_count.size()) |
|           << " for observation: " << observation_num; |
|     } |
|     for (int i = 0; i < expected_size; ++i) { |
|       EXPECT_EQ(integer_observation.values(i).value(), expected_count[i]) |
|           << " for observation " << observation_num << " integer value: " << i; |
|       if (check_event_codes) { |
|         EXPECT_THAT(integer_observation.values(i).event_codes(), |
|                     testing::ElementsAreArray(expected_event_codes[i])) |
|             << " for observation " << observation_num << " integer value: " << i; |
|       } |
|     } |
|   } |
| |
|   system_data::FakeSystemData system_data_; |
|   // Fake store whose messages_received / metadata_received the tests inspect. |
|   std::unique_ptr<logger::testing::FakeObservationStore> test_writer_; |
|   std::unique_ptr<logger::ObservationWriter> observation_writer_; |
|   // Clock the tests advance by hand; set to kStartingTimePoint in SetUp(). |
|   std::unique_ptr<util::IncrementingSystemClock> mock_clock_; |
|   std::unique_ptr<util::UtcTimeConverter> civil_time_converter_; |
| |
|  private: |
|   std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_; |
|   std::unique_ptr<util::FakeValidatedClock> validated_clock_; |
|   std::unique_ptr<MetadataBuilder> metadata_builder_; |
| }; |
| |
| // After an occurrence event is added, an entry named after the metric id shows up in the |
| // project's directory of the local aggregate store. |
| TEST_F(LocalAggregationTest, AddEventWorks) { |
|   std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
|   const std::string project_dir = |
|       absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", kProjectId); |
|   const std::string metric_entry = absl::StrCat(kOccurrenceMetricMetricId); |
| |
|   // Nothing is stored for the metric before the event. |
|   ASSERT_THAT(fs().ListFiles(project_dir).value(), Not(Contains(metric_entry))); |
| |
|   std::unique_ptr<logger::EventRecord> record = |
|       logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId).value(); |
|   record->event()->mutable_occurrence_event(); |
|   aggregation->AddEvent(*record, mock_clock_->now()); |
| |
|   // The metric's entry exists once the event has been added. |
|   ASSERT_THAT(fs().ListFiles(project_dir).value(), Contains(metric_entry)); |
| } |
| |
| // Once Disable(true) has been called, adding an event leaves no entry for the metric in the |
| // project's directory of the local aggregate store. |
| TEST_F(LocalAggregationTest, DisableWorks) { |
|   std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
|   aggregation->Disable(true); |
| |
|   const std::string project_dir = |
|       absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", kProjectId); |
|   const std::string metric_entry = absl::StrCat(kOccurrenceMetricMetricId); |
| |
|   // No data is present to begin with. |
|   ASSERT_THAT(fs().ListFiles(project_dir).value(), Not(Contains(metric_entry))); |
| |
|   std::unique_ptr<logger::EventRecord> record = |
|       logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId).value(); |
|   record->event()->mutable_occurrence_event(); |
|   aggregation->AddEvent(*record, mock_clock_->now()); |
| |
|   // The data is still absent after the event was added. |
|   ASSERT_THAT(fs().ListFiles(project_dir).value(), Not(Contains(metric_entry))); |
| } |
| |
| TEST_F(LocalAggregationTest, GenerateObservationsWorks) { |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| lib::MetricIdentifier metric_id = |
| project_context->Identifier().ForMetric(kOccurrenceMetricMetricId); |
| const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId); |
| // Walks one occurrence metric through hourly and daily aggregation runs spanning several days. This test assumes that this metric has no reports with added privacy. |
| for (const ReportDefinition& report : metric->reports()) { |
| ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level()); |
| } |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| // Before any events occur, aggregation run happens, no observations are generated. |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // 10 minutes later, 4 events occur. |
| mock_clock_->increment_by(std::chrono::minutes(10)); |
| std::unique_ptr<logger::EventRecord> record = |
| logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex( |
| now, *civil_time_converter_, *record->metric())); |
| CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId( |
| now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id(hour_id); |
| record->system_profile()->set_system_version("100"); |
| record->system_profile()->set_product_name("should-be-filtered"); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(50)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and |
| // hourly_device_histograms_report_all. |
| ASSERT_EQ(test_writer_->num_observations_added(), 3); |
| VerifyStoredIntegerObservation( |
| 0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| "100", {4}); |
| VerifyStoredIntegerObservation( |
| 1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100", |
| {4}); |
| VerifyStoredIntegerObservation( |
| 2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index, |
| "100", {4}); |
| |
| // An hour later at 9:10PM, no more events, next aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // Still only the original 3 observations created. |
| ASSERT_EQ(test_writer_->num_observations_added(), 3); |
| |
| // 45 minutes later, 2 more events occur, with a new system version. |
| mock_clock_->increment_by(std::chrono::minutes(45)); |
| now = mock_clock_->now(); |
| record->event()->set_day_index(day_index); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_hour_id(hour_id); |
| record->system_profile()->set_system_version("101"); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(15)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // 3 new observations created, this time with an incremental count of 2. |
| ASSERT_EQ(test_writer_->num_observations_added(), 6); |
| VerifyStoredIntegerObservation( |
| 3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| "101", {2}); |
| VerifyStoredIntegerObservation( |
| 4, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "101", |
| {2}); |
| VerifyStoredIntegerObservation( |
| 5, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index, |
| "101", {2}); |
| |
| // An hour later at 11:10PM, no more events, next aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // Still only the expected 6 observations created. |
| ASSERT_EQ(test_writer_->num_observations_added(), 6); |
| |
| // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // 10 new observations are created, one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS |
| // reports, two for each of the REPORT_ALL UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 16); |
| VerifyStoredIntegerObservation( |
| 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index, |
| "101", {1}); |
| VerifyStoredIntegerObservation( |
| 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| "101", {1}); |
| // SELECT_FIRST reports use the first system version, others use the second. |
| VerifyStoredIntegerObservation( |
| 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, "100", {1}); |
| |
| // This report is REPORT_ALL and generates 2 observations |
| VerifyStoredIntegerObservation( |
| 10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId), |
| day_index, "101", {1}); |
| VerifyStoredIntegerObservation( |
| 11, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId), |
| day_index, "100", {1}); |
| |
| // This report is REPORT_ALL and generates 2 observations |
| VerifyStoredIntegerObservation( |
| 12, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "101", {1}); |
| VerifyStoredIntegerObservation( |
| 13, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "100", {1}); |
| |
| VerifyStoredIntegerObservation( |
| 14, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId), |
| day_index, "101", {1}); |
| VerifyStoredIntegerObservation( |
| 15, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, "101", {1}); |
| |
| // 45 minutes later, 1 more event occurs, again with a new system version. |
| day_index += 1; |
| mock_clock_->increment_by(std::chrono::minutes(45)); |
| now = mock_clock_->now(); |
| record->event()->set_day_index(day_index); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_hour_id(hour_id); |
| record->system_profile()->set_system_version("102"); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 1:10AM, aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(15)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // 3 new observations created, this time with an incremental count of 1. |
| ASSERT_EQ(test_writer_->num_observations_added(), 19); |
| VerifyStoredIntegerObservation( |
| 16, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| "102", {1}); |
| VerifyStoredIntegerObservation( |
| 17, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "102", |
| {1}); |
| VerifyStoredIntegerObservation( |
| 18, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index, |
| "102", {1}); |
| |
| // The rest of the day, no more events occur so 22 hourly aggregation runs create no new |
| // observations. |
| for (int i = 1; i <= 22; i++) { |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_EQ(test_writer_->num_observations_added(), 19); |
| } |
| |
| // At 12:10AM (just after midnight UTC), daily event aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // 10 new observations are created, one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS |
| // reports, 1 for the 1-day and 3 for the 7-day REPORT_ALL UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 29); |
| VerifyStoredIntegerObservation( |
| 19, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index, |
| "102", {1}); |
| VerifyStoredIntegerObservation( |
| 20, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| "102", {1}); |
| // SELECT_FIRST 1-day report uses the new system version. |
| VerifyStoredIntegerObservation( |
| 21, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId), |
| day_index, "102", {1}); |
| // SELECT_FIRST 7-day report continues to use the original system version. |
| VerifyStoredIntegerObservation( |
| 22, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 23, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId), |
| day_index, "102", {1}); |
| |
| // 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order |
| // is deterministic based on the hash value) |
| VerifyStoredIntegerObservation( |
| 24, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "101", {1}); |
| VerifyStoredIntegerObservation( |
| 25, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 26, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "102", {1}); |
| |
| VerifyStoredIntegerObservation( |
| 27, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId), |
| day_index, "102", {1}); |
| VerifyStoredIntegerObservation( |
| 28, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, "102", {1}); |
| |
| // The next day, no events occur so all 23 hourly aggregation runs create no new observations. |
| for (int i = 1; i <= 23; i++) { |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_EQ(test_writer_->num_observations_added(), 29); |
| } |
| |
| // Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| |
| // 6 new observations are created for the next day, only for the 7-day window |
| // UNIQUE_DEVICE_COUNTS reports, (3 for the REPORT_ALL 7-day report). |
| ASSERT_EQ(test_writer_->num_observations_added(), 35); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 29, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| "102", {1}); |
| // SELECT_FIRST 7-day report continues to use the original system version. |
| VerifyStoredIntegerObservation( |
| 30, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, "100", {1}); |
| |
| // 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order |
| // is deterministic based on the hash value) |
| VerifyStoredIntegerObservation( |
| 31, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "101", {1}); |
| VerifyStoredIntegerObservation( |
| 32, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 33, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "102", {1}); |
| |
| VerifyStoredIntegerObservation( |
| 34, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, "102", {1}); |
| } |
| |
| TEST_F(LocalAggregationTest, GenerateObservationsWriterFailures) { |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| lib::MetricIdentifier metric_id = |
| project_context->Identifier().ForMetric(kOccurrenceMetricMetricId); |
| const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId); |
| // Checks that observations whose write to the ObservationStore failed are retried on a later run. This test assumes that this metric has no reports with added privacy. |
| for (const ReportDefinition& report : metric->reports()) { |
| ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level()); |
| } |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| // Before any events occur, aggregation run happens, no observations are generated. |
| Status status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // 10 minutes later, 4 events occur. |
| mock_clock_->increment_by(std::chrono::minutes(10)); |
| std::unique_ptr<logger::EventRecord> record = |
| logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex( |
| now, *civil_time_converter_, *record->metric())); |
| CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId( |
| now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id(hour_id); |
| record->system_profile()->set_system_version("100"); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens. |
| // But errors occur in the writing to the ObservationStore. |
| test_writer_->SetFailCalls(true); |
| mock_clock_->increment_by(std::chrono::minutes(50)); |
| status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // An hour later at 9:10PM, no more events, next aggregation run happens. |
| // The previously aggregated events should be retried. This time the writing succeeds. |
| test_writer_->SetFailCalls(false); |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and |
| // hourly_device_histograms_report_all. |
| ASSERT_EQ(test_writer_->num_observations_added(), 3); |
| VerifyStoredIntegerObservation( |
| 0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| "100", {4}); |
| VerifyStoredIntegerObservation( |
| 1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100", |
| {4}); |
| VerifyStoredIntegerObservation( |
| 2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index, |
| "100", {4}); |
| |
| // 45 minutes later, 2 more events occur. |
| mock_clock_->increment_by(std::chrono::minutes(45)); |
| now = mock_clock_->now(); |
| record->event()->set_day_index(day_index); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_hour_id(hour_id); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens. |
| // But errors occur again in the writing to the ObservationStore. |
| test_writer_->SetFailCalls(true); |
| mock_clock_->increment_by(std::chrono::minutes(15)); |
| status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 3); |
| |
| // 30 minutes later, 1 more event occurs. |
| mock_clock_->increment_by(std::chrono::minutes(30)); |
| now = mock_clock_->now(); |
| record->event()->set_day_index(day_index); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_hour_id(hour_id); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 30 minutes later at 11:10PM, next aggregation run happens, but still fails. |
| mock_clock_->increment_by(std::chrono::minutes(30)); |
| status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 3); |
| |
| // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens and fails. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 3); |
| |
| // An hour later at 1:10AM (after midnight UTC), now aggregation succeeds, both daily and hourly. |
| test_writer_->SetFailCalls(false); |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| status = aggregation->GenerateAggregatedObservations(mock_clock_->now()); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // 14 total new observations created (indices 3-16); only indices 3-14 are verified below. |
| // 6 new hourly observations created, 3 with an incremental count of 2, 3 with count of 1. |
| // 8 new daily observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 17); |
| VerifyStoredIntegerObservation( |
| 3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| "100", {2}); |
| VerifyStoredIntegerObservation( |
| 4, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| "100", {1}); |
| VerifyStoredIntegerObservation( |
| 5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index, |
| "100", {1}); |
| VerifyStoredIntegerObservation( |
| 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| "100", {1}); |
| VerifyStoredIntegerObservation( |
| 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 11, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 12, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, "100", {1}); |
| VerifyStoredIntegerObservation( |
| 13, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100", |
| {2}); |
| VerifyStoredIntegerObservation( |
| 14, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100", |
| {1}); |
| } |
| |
| TEST_F(LocalAggregationTest, GenerateObservationsExpedited) { |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| lib::MetricIdentifier metric_id = |
| project_context->Identifier().ForMetric(kExpeditedOccurrenceMetricMetricId); |
| const MetricDefinition* metric = project_context->GetMetric(kExpeditedOccurrenceMetricMetricId); |
| // This test assumes that this metric has no reports with added privacy. |
| for (const ReportDefinition& report : metric->reports()) { |
| ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level()); |
| } |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| // Start 9th January, 2014 at 10 minutes after 7PM, UTC |
| std::tm tm; |
| strptime("2014-01-09 19:10:00", "%Y-%m-%d %H:%M:%S", &tm); |
| std::chrono::time_point<std::chrono::system_clock> system_time = |
| std::chrono::system_clock::from_time_t(timegm(&tm)); |
| |
| // Before any events occur, aggregation run happens, no observations are generated. |
| aggregation->GenerateAggregatedObservations(system_time); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // 10 minutes later, 4 events occur with event code 45, with two different system_versions. |
| system_time += std::chrono::minutes(10); |
| std::unique_ptr<logger::EventRecord> record = |
| logger::EventRecord::MakeEventRecord(project_context, kExpeditedOccurrenceMetricMetricId) |
| .value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| record->event()->mutable_occurrence_event()->add_event_code( |
| ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex( |
| now, *civil_time_converter_, *record->metric())); |
| CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId( |
| now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id(hour_id); |
| record->system_profile()->set_system_version("99"); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| record->system_profile()->set_system_version("100"); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens. |
| system_time += std::chrono::minutes(50); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // 8 observations created, one for each expedited non-REPORT_ALL UNIQUE_DEVICE_COUNTS reports, two |
| // for the REPORT_ALL UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 8); |
| VerifyStoredIntegerObservation( |
| 0, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 1, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 2, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst1DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 3, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 4, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "99", {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 5, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 6, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99", {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 7, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| |
| // An hour later at 9:10PM, no more events, next aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // Still only the original observations created, expedited observations are not re-sent. |
| ASSERT_EQ(test_writer_->num_observations_added(), 8); |
| |
| // 45 minutes later, 2 more events occur, one with a different event code 46. |
| system_time += std::chrono::minutes(45); |
| record->event()->mutable_occurrence_event()->set_event_code( |
| 0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46); |
| now = system_time; |
| record->event()->set_day_index(day_index); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_hour_id(hour_id); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| record->event()->mutable_occurrence_event()->set_event_code( |
| 0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens. |
| system_time += std::chrono::minutes(15); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // 4 new observation for the AT_LEAST_ONCE reports, with the new event code of 46. |
| // Note that the expedited SELECT_FIRST report was already sent, so doesn't get new observations. |
| // Also, no new observations for event code 45 are created, as they were already sent. |
| ASSERT_EQ(test_writer_->num_observations_added(), 12); |
| VerifyStoredIntegerObservation( |
| 8, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| VerifyStoredIntegerObservation( |
| 9, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| VerifyStoredIntegerObservation( |
| 10, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| VerifyStoredIntegerObservation( |
| 11, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| |
| // An hour later at 11:10PM, no more events, next aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // Still only the expected original observation created. |
| ASSERT_EQ(test_writer_->num_observations_added(), 12); |
| |
| // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // Note that the daily expedited reports were already sent, so they don't get new observations. |
| // But 4 more expedited observations are created for the next day's 7day reports: an AT_LEAST_ONCE |
| // report containing the codes 45 and 46, a SELECT_FIRST report for the code 45, and two |
| // REPORT_ALL reports for the two system_versions reported earlier. |
| ASSERT_EQ(test_writer_->num_observations_added(), 16); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 12, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100", |
| {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| VerifyStoredIntegerObservation( |
| 13, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 14, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 15, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100", |
| {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| |
| // The next day, no events occur so all 23 hourly aggregation runs create no new observations. |
| for (int i = 1; i <= 23; i++) { |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| ASSERT_EQ(test_writer_->num_observations_added(), 16); |
| } |
| |
| // Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // 4 new observations are created, again only for the 7-day window UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 20); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 16, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100", |
| {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| VerifyStoredIntegerObservation( |
| 17, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 18, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99", |
| {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 19, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100", |
| {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| |
| // For the next 4 days (3 days of data have been sent above) no more events occur, and the 7-day |
| // reports continue to be sent at the start of the day. |
| for (int j = 0; j < 4; j++) { |
| // The 23 hourly aggregation runs. |
| for (int i = 1; i <= 23; i++) { |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| ASSERT_EQ(test_writer_->num_observations_added(), 20 + 4 * j); |
| } |
| |
| // The 12:10AM (just after midnight UTC) daily event aggregation run. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| |
| // 3 new observations are created for the 7-day window UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 24 + 4 * j); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 20 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), |
| day_index, "100", {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| VerifyStoredIntegerObservation( |
| 21 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), |
| day_index, "100", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 22 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, |
| "99", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}}); |
| VerifyStoredIntegerObservation( |
| 23 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, |
| "100", {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}}); |
| } |
| |
| // Finally, on the 8th day, and all subsequent days, no observations are created at all. |
| for (int i = 1; i <= 72; i++) { // run 72 more hours to be sure |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations(system_time); |
| ASSERT_EQ(test_writer_->num_observations_added(), 36); |
| } |
| } |
| |
| TEST_F(LocalAggregationTest, StorageQuotasWork) { |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation({ |
| .per_project_reserved_bytes = 60, |
| .total_capacity_bytes = 120, |
| }); |
| |
| auto record = |
| logger::EventRecord::MakeEventRecord(GetProjectContext(), kIntegerHistogramMetricMetricId) |
| .value(); |
| ::cobalt::HistogramBucket* bucket = |
| record->event()->mutable_integer_histogram_event()->add_buckets(); |
| bucket->set_count(100); |
| record->event()->set_day_index(1); |
| record->event()->set_hour_id(1); |
| |
| // Add entries to the histogram, until its size exceeds the per_project_reserved_bytes. |
| int num_to_slush = 0; |
| while (aggregation->SlushUsed() == 0) { |
| bucket->set_index(num_to_slush); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| num_to_slush++; |
| } |
| |
| // Must have stored at least a couple events successfully before exceeding |
| // per_project_reserved_bytes. |
| ASSERT_GT(num_to_slush, 2); |
| |
| // We should be able to add another num_to_slush events successfully. |
| for (int i = 0; i < num_to_slush; i++) { |
| bucket->set_index(i + num_to_slush); |
| ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok()); |
| } |
| |
| bool failure_occurred = false; |
| Status last_status; |
| // Add another 8 events, it should fail on one of these. |
| for (int i = 0; i < 8; i++) { |
| bucket->set_index(num_to_slush * 2 + i); |
| last_status = aggregation->AddEvent(*record, mock_clock_->now()); |
| if (last_status.error_code() == StatusCode::RESOURCE_EXHAUSTED) { |
| failure_occurred = true; |
| } |
| } |
| |
| ASSERT_TRUE(failure_occurred); |
| ASSERT_THAT(last_status.error_details(), HasSubstr("per_project_reserved_bytes=60")); |
| ASSERT_THAT(last_status.error_details(), ContainsRegex("project_bytes=1..")); |
| ASSERT_THAT(last_status.error_details(), HasSubstr("SlushSize=60")); |
| ASSERT_THAT(last_status.error_details(), ContainsRegex("SlushUsed=6.")); |
| } |
| |
| TEST_F(LocalAggregationTest, CrashesIfTooManyBytes) { |
| ASSERT_DEATH(std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation({ |
| .per_project_reserved_bytes = 120, |
| .total_capacity_bytes = 100, |
| }), |
| "There is no space in slush"); |
| } |
| |
| TEST_F(LocalAggregationTest, MigrateToReportAll) { |
| OverrideRegistry(report_all_test::with_select_last_set::kRegistryBase64); |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| uint32_t day_index; |
| uint32_t hour_id; |
| |
| { |
| auto record = logger::EventRecord::MakeEventRecord( |
| project_context, |
| report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId) |
| .value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN(day_index, cobalt::util::TimePointToDayIndex( |
| now, *civil_time_converter_, *record->metric())); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id(hour_id); |
| |
| record->system_profile()->set_system_version("100"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| record->system_profile()->set_system_version("101"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| } |
| |
| OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64); |
| project_context = GetProjectContext(); |
| aggregation = MakeLocalAggregation(); |
| { |
| auto record = logger::EventRecord::MakeEventRecord( |
| project_context, |
| report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId) |
| .value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN(day_index, cobalt::util::TimePointToDayIndex( |
| now, *civil_time_converter_, *record->metric())); |
| CB_ASSERT_OK_AND_ASSIGN( |
| hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id(hour_id); |
| |
| record->system_profile()->set_system_version("100"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| record->system_profile()->set_system_version("101"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| } |
| |
| aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay); |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId); |
| |
| lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric( |
| report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId); |
| |
| // The first two metrics will be logged to the non partitioned data portion. When the |
| // migration happens, those two will be moved to whatever value the SystemProfile currently |
| // holds. Thus 2 ungrouped + 1 grouped = 3 |
| VerifyStoredIntegerObservation( |
| 0, |
| metric_id.ForReport(report_all_test::with_report_all_set:: |
| kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId), |
| day_index, "101", {3}); |
| VerifyStoredIntegerObservation( |
| 1, |
| metric_id.ForReport(report_all_test::with_report_all_set:: |
| kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId), |
| day_index, "100", {1}); |
| } |
| |
| TEST_F(LocalAggregationTest, MigrateFromReportAll) { |
| OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64); |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| util::TimeInfo time_info; |
| |
| { |
| auto record = logger::EventRecord::MakeEventRecord( |
| project_context, |
| report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId) |
| .value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN( |
| time_info, util::TimeInfo::FromTimePoint(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(time_info.day_index); |
| record->event()->set_hour_id(time_info.hour_id); |
| |
| record->system_profile()->set_system_version("100"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| record->system_profile()->set_system_version("101"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| } |
| |
| OverrideRegistry(report_all_test::with_select_last_set::kRegistryBase64); |
| project_context = GetProjectContext(); |
| aggregation = MakeLocalAggregation(); |
| { |
| auto record = logger::EventRecord::MakeEventRecord( |
| project_context, |
| report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId) |
| .value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN( |
| time_info, util::TimeInfo::FromTimePoint(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(time_info.day_index); |
| record->event()->set_hour_id(time_info.hour_id); |
| |
| record->system_profile()->set_system_version("100"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| record->system_profile()->set_system_version("101"); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| } |
| |
| aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay); |
| ASSERT_EQ(test_writer_->num_observations_added(), 1); |
| EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId); |
| |
| lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric( |
| report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId); |
| |
| // All 4 metrics should be grouped back together under system_version 101 (the most recently |
| // seen). |
| VerifyStoredIntegerObservation( |
| 0, |
| metric_id.ForReport(report_all_test::with_select_last_set:: |
| kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId), |
| time_info.day_index, "101", {4}); |
| } |
| |
| TEST_F(LocalAggregationTest, ReportAllWorks) { |
| const int32_t kNumPartitions = 15; |
| OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64); |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| auto record = |
| logger::EventRecord::MakeEventRecord( |
| project_context, report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId) |
| .value(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| std::chrono::system_clock::time_point now = mock_clock_->now(); |
| CB_ASSERT_OK_AND_ASSIGN( |
| util::TimeInfo time_info, |
| util::TimeInfo::FromTimePoint(now, *civil_time_converter_, *record->metric())); |
| record->event()->set_day_index(time_info.day_index); |
| record->event()->set_hour_id(time_info.hour_id); |
| for (int32_t i = 1; i <= kNumPartitions; i++) { |
| for (int32_t j = i; j <= kNumPartitions; j++) { |
| record->system_profile()->set_system_version(absl::StrCat("Version ", j)); |
| ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK); |
| } |
| } |
| |
| aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay); |
| ASSERT_EQ(test_writer_->num_observations_added(), kNumPartitions); |
| ASSERT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId); |
| size_t i = 0; |
| for (const std::unique_ptr<observation_store::StoredObservation>& obs : |
| test_writer_->messages_received) { |
| ASSERT_TRUE(obs->has_unencrypted()); |
| |
| Observation observation = obs->unencrypted(); |
| ASSERT_TRUE(observation.has_integer()); |
| ASSERT_GT(observation.integer().values_size(), 0); |
| |
| std::string system_version = |
| test_writer_->metadata_received[i]->system_profile().system_version(); |
| int system_version_number = std::stoi(system_version.substr(system_version.find(' '))); |
| ASSERT_EQ(observation.integer().values(0).value(), system_version_number); |
| ++i; |
| } |
| } |
| |
| } // namespace cobalt::local_aggregation |