| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregation.h" |
| |
| #include <time.h> |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "absl/strings/escaping.h" |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/testing/test_with_files.h" |
| #include "src/local_aggregation_1_1/testing/report_all_test_registry_default_system_profile_selection.cb.h" |
| #include "src/local_aggregation_1_1/testing/test_registry.cb.h" |
| #include "src/logger/event_record.h" |
| #include "src/logger/logger_test_utils.h" |
| #include "src/logger/observation_writer.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/observation_store/observation_store_internal.pb.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/system_data/client_secret.h" |
| #include "src/system_data/fake_system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| using ::testing::Contains; |
| using ::testing::Not; |
| using TimeInfo = util::TimeInfo; |
| |
| namespace { |
| |
// Broken-down starting time used by the tests: 9 January 2014, 19:10:00, interpreted as UTC.
//
// All fields not assigned below are zero. The object is intentionally non-const because
// timegm() takes a pointer to a mutable std::tm.
std::tm kStartingTime = [] {
  std::tm start = {};
  start.tm_sec = 0;
  start.tm_min = 10;
  start.tm_hour = 19;
  start.tm_mday = 9;    // 9th
  start.tm_mon = 0;     // January
  start.tm_year = 114;  // 2014
  return start;
}();

// kStartingTime converted (as UTC) into a system_clock time point.
const std::chrono::system_clock::time_point kStartingTimePoint =
    std::chrono::system_clock::from_time_t(timegm(&kStartingTime));
| |
| std::unique_ptr<CobaltRegistry> GetRegistry(const std::string& registryBase64) { |
| std::string bytes; |
| if (!absl::Base64Unescape(registryBase64, &bytes)) { |
| LOG(ERROR) << "Unable to decode Base64 String"; |
| return nullptr; |
| } |
| |
| auto registry = std::make_unique<CobaltRegistry>(); |
| if (!registry->ParseFromString(bytes)) { |
| LOG(ERROR) << "Unable to parse registry from bytes"; |
| return nullptr; |
| } |
| |
| return registry; |
| } |
| |
| } // namespace |
| |
| class LocalAggregationTest : public util::testing::TestWithFiles { |
| private: |
| void SetUp() override { |
| TestWithFiles::SetUp(); |
| global_project_context_factory_ = |
| std::make_unique<logger::ProjectContextFactory>(GetRegistry(kCobaltRegistryBase64)); |
| mock_clock_ = |
| std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0)); |
| mock_clock_->set_time(kStartingTimePoint); |
| validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get()); |
| validated_clock_->SetAccurate(true); |
| metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(), |
| system_data_cache_path(), fs()); |
| metadata_builder_->SnapshotSystemData(); |
| test_writer_ = std::make_unique<logger::testing::FakeObservationStore>(); |
| observation_writer_ = std::make_unique<logger::ObservationWriter>(test_writer_.get(), nullptr); |
| } |
| |
| protected: |
| std::unique_ptr<LocalAggregation> MakeLocalAggregation() { |
| CobaltConfig cfg = {.local_aggregate_store_dir = local_aggregation_store_path(), |
| .client_secret = system_data::ClientSecret::GenerateNewSecret()}; |
| |
| return std::make_unique<LocalAggregation>(cfg, global_project_context_factory_.get(), |
| metadata_builder_.get(), fs(), |
| observation_writer_.get()); |
| } |
| |
| void OverrideRegistry(const std::string& registryBase64) { |
| global_project_context_factory_ = |
| std::make_unique<logger::ProjectContextFactory>(GetRegistry(registryBase64)); |
| } |
| |
| std::shared_ptr<const logger::ProjectContext> GetProjectContext() { |
| return global_project_context_factory_->NewProjectContext( |
| lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId)); |
| } |
| |
| void VerifyStoredIntegerObservation( |
| int observation_num, lib::ReportIdentifier report, uint32_t day_index, |
| std::vector<int> expected_count, |
| std::vector<std::vector<uint32_t>> expected_event_codes = {}) { |
| ObservationMetadata* metadata = test_writer_->metadata_received[observation_num].get(); |
| EXPECT_EQ(metadata->customer_id(), report.customer_id()) |
| << " for observation: " << observation_num; |
| EXPECT_EQ(metadata->project_id(), report.project_id()) |
| << " for observation: " << observation_num; |
| EXPECT_EQ(metadata->metric_id(), report.metric_id()) << " for observation: " << observation_num; |
| EXPECT_EQ(metadata->report_id(), report.report_id()) << " for observation: " << observation_num; |
| EXPECT_EQ(metadata->day_index(), day_index) << " for observation: " << observation_num; |
| observation_store::StoredObservation* stored_observation = |
| test_writer_->messages_received[observation_num].get(); |
| ASSERT_TRUE(stored_observation->has_unencrypted()) << " for observation: " << observation_num; |
| ASSERT_TRUE(stored_observation->unencrypted().has_integer()) |
| << " for observation: " << observation_num; |
| ASSERT_EQ(stored_observation->unencrypted().integer().values_size(), expected_count.size()) |
| << " for observation: " << observation_num; |
| if (!expected_event_codes.empty()) { |
| ASSERT_EQ(expected_event_codes.size(), expected_count.size()) |
| << " for observation: " << observation_num; |
| } |
| for (int i = 0; i < expected_count.size(); i++) { |
| EXPECT_EQ(stored_observation->unencrypted().integer().values(i).value(), expected_count[i]) |
| << " for observation " << observation_num << " integer value: " << i; |
| if (!expected_event_codes.empty()) { |
| EXPECT_THAT(stored_observation->unencrypted().integer().values(i).event_codes(), |
| testing::ElementsAreArray(expected_event_codes[i])) |
| << " for observation " << observation_num << " integer value: " << i; |
| } |
| } |
| } |
| |
| system_data::FakeSystemData system_data_; |
| std::unique_ptr<logger::testing::FakeObservationStore> test_writer_; |
| std::unique_ptr<logger::ObservationWriter> observation_writer_; |
| std::unique_ptr<util::IncrementingSystemClock> mock_clock_; |
| |
| private: |
| std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_; |
| std::unique_ptr<util::FakeValidatedClock> validated_clock_; |
| std::unique_ptr<MetadataBuilder> metadata_builder_; |
| }; |
| |
| TEST_F(LocalAggregationTest, AddEventWorks) { |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", |
| kProjectId)) |
| .ConsumeValueOrDie(), |
| Not(Contains(absl::StrCat(kOccurrenceMetricMetricId)))); |
| |
| auto record = logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId) |
| .ValueOrDie(); |
| record->event()->mutable_occurrence_event(); |
| aggregation->AddEvent(*record); |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", |
| kProjectId)) |
| .ConsumeValueOrDie(), |
| Contains(absl::StrCat(kOccurrenceMetricMetricId))); |
| } |
| |
| TEST_F(LocalAggregationTest, DisableWorks) { |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| aggregation->Disable(true); |
| |
| // Check that the no data is present. |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", |
| kProjectId)) |
| .ConsumeValueOrDie(), |
| Not(Contains(absl::StrCat(kOccurrenceMetricMetricId)))); |
| |
| auto record = logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId) |
| .ValueOrDie(); |
| record->event()->mutable_occurrence_event(); |
| aggregation->AddEvent(*record); |
| |
| // Check that the data still isn't present. |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", |
| kProjectId)) |
| .ConsumeValueOrDie(), |
| Not(Contains(absl::StrCat(kOccurrenceMetricMetricId)))); |
| } |
| |
| TEST_F(LocalAggregationTest, GenerateObservationsWorks) { |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| lib::MetricIdentifier metric_id = |
| project_context->Identifier().ForMetric(kOccurrenceMetricMetricId); |
| const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId); |
| // This test assumes that this metric has no reports with added privacy. |
| for (const ReportDefinition& report : metric->reports()) { |
| ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level()); |
| } |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| // Before any events occur, aggregation run happens, no observations are generated. |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // 10 minutes later, 4 events occur. |
| mock_clock_->increment_by(std::chrono::minutes(10)); |
| std::unique_ptr<logger::EventRecord> record = |
| logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).ValueOrDie(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now()); |
| uint32_t day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy()); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(50)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| |
| // 2 observations created, for fleetwide_occurrence_counts_report and hourly_device_histograms. |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| VerifyStoredIntegerObservation( |
| 0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| {4}); |
| VerifyStoredIntegerObservation( |
| 1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, {4}); |
| |
| // An hour later at 9:10PM, no more events, next aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| |
| // Still only the original 2 observations created. |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| |
| // 45 minutes later, 2 more events occur. |
| mock_clock_->increment_by(std::chrono::minutes(45)); |
| now = std::chrono::system_clock::to_time_t(mock_clock_->now()); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(15)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| |
| // 2 new observations created, this time with an incremental count of 2. |
| ASSERT_EQ(test_writer_->num_observations_added(), 4); |
| VerifyStoredIntegerObservation( |
| 2, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| {2}); |
| VerifyStoredIntegerObservation( |
| 3, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, {2}); |
| |
| // An hour later at 11:10PM, no more events, next aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| |
| // Still only the expected 4 observations created. |
| ASSERT_EQ(test_writer_->num_observations_added(), 4); |
| |
| // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| |
| // 6 new observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 10); |
| VerifyStoredIntegerObservation( |
| 4, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index, |
| {1}); |
| VerifyStoredIntegerObservation( |
| 5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| {1}); |
| VerifyStoredIntegerObservation( |
| 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 9, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, {1}); |
| |
| // The next day, no events occur so all 23 hourly aggregation runs create no new observations. |
| for (int i = 1; i <= 23; i++) { |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_EQ(test_writer_->num_observations_added(), 10); |
| } |
| |
| // Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| |
| // 3 new observations are created for the next day, only for the 7-day window |
| // UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 13); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| {1}); |
| VerifyStoredIntegerObservation( |
| 11, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 12, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, {1}); |
| } |
| |
| TEST_F(LocalAggregationTest, GenerateObservationsWriterFailures) { |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| lib::MetricIdentifier metric_id = |
| project_context->Identifier().ForMetric(kOccurrenceMetricMetricId); |
| const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId); |
| // This test assumes that this metric has no reports with added privacy. |
| for (const ReportDefinition& report : metric->reports()) { |
| ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level()); |
| } |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| // Before any events occur, aggregation run happens, no observations are generated. |
| Status status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // 10 minutes later, 4 events occur. |
| mock_clock_->increment_by(std::chrono::minutes(10)); |
| std::unique_ptr<logger::EventRecord> record = |
| logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).ValueOrDie(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now()); |
| uint32_t day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy()); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens. |
| // But errors occur in the writing to the ObservationStore. |
| test_writer_->SetFailCalls(true); |
| mock_clock_->increment_by(std::chrono::minutes(50)); |
| status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // An hour later at 9:10PM, no more events, next aggregation run happens. |
| // The previously aggregated events should be retried. This time the writing succeeds. |
| test_writer_->SetFailCalls(false); |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // 2 observations created, for fleetwide_occurrence_counts_report and hourly_device_histograms. |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| VerifyStoredIntegerObservation( |
| 0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| {4}); |
| VerifyStoredIntegerObservation( |
| 1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, {4}); |
| |
| // 45 minutes later, 2 more events occur. |
| mock_clock_->increment_by(std::chrono::minutes(45)); |
| now = std::chrono::system_clock::to_time_t(mock_clock_->now()); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens. |
| // But errors occur again in the writing to the ObservationStore. |
| test_writer_->SetFailCalls(true); |
| mock_clock_->increment_by(std::chrono::minutes(15)); |
| status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| |
| // 30 minutes later, 1 more event occurs. |
| mock_clock_->increment_by(std::chrono::minutes(30)); |
| now = std::chrono::system_clock::to_time_t(mock_clock_->now()); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 30 minutes later at 11:10PM, next aggregation run happens, but still fails. |
| mock_clock_->increment_by(std::chrono::minutes(30)); |
| status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| |
| // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens and fails. |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_NE(status.error_code(), StatusCode::OK); |
| ASSERT_EQ(test_writer_->num_observations_added(), 2); |
| |
| // An hour later at 1:10AM (after midnight UTC), now aggregation succeeds, both daily and hourly. |
| test_writer_->SetFailCalls(false); |
| mock_clock_->increment_by(std::chrono::minutes(60)); |
| status = aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL)); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // 8 total new observations created. |
| // 4 new hourly observations created, 2 with an incremental count of 2, 2 with count of 1. |
| // 6 new daily observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 12); |
| VerifyStoredIntegerObservation( |
| 2, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| {2}); |
| VerifyStoredIntegerObservation( |
| 3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index, |
| {1}); |
| VerifyStoredIntegerObservation( |
| 4, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index, |
| {1}); |
| VerifyStoredIntegerObservation( |
| 5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index, |
| {1}); |
| VerifyStoredIntegerObservation( |
| 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 9, |
| metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId), |
| day_index, {1}); |
| VerifyStoredIntegerObservation( |
| 10, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, {2}); |
| VerifyStoredIntegerObservation( |
| 11, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, {1}); |
| } |
| |
| TEST_F(LocalAggregationTest, GenerateObservationsExpedited) { |
| std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext(); |
| lib::MetricIdentifier metric_id = |
| project_context->Identifier().ForMetric(kExpeditedOccurrenceMetricMetricId); |
| const MetricDefinition* metric = project_context->GetMetric(kExpeditedOccurrenceMetricMetricId); |
| // This test assumes that this metric has no reports with added privacy. |
| for (const ReportDefinition& report : metric->reports()) { |
| ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level()); |
| } |
| |
| std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation(); |
| |
| // Start 9th January, 2014 at 10 minutes after 7PM, UTC |
| std::tm tm; |
| strptime("2014-01-09 19:10:00", "%Y-%m-%d %H:%M:%S", &tm); |
| std::chrono::time_point<std::chrono::system_clock> system_time = |
| std::chrono::system_clock::from_time_t(timegm(&tm)); |
| |
| // Before any events occur, aggregation run happens, no observations are generated. |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| ASSERT_EQ(test_writer_->num_observations_added(), 0); |
| |
| // 10 minutes later, 4 events occur with event code 45. |
| system_time += std::chrono::minutes(10); |
| std::unique_ptr<logger::EventRecord> record = |
| logger::EventRecord::MakeEventRecord(project_context, kExpeditedOccurrenceMetricMetricId) |
| .ValueOrDie(); |
| record->event()->mutable_occurrence_event()->set_count(1); |
| record->event()->mutable_occurrence_event()->add_event_code( |
| ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45); |
| time_t now = std::chrono::system_clock::to_time_t(system_time); |
| uint32_t day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy()); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens. |
| system_time += std::chrono::minutes(50); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // 4 observations created, for the expedited UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 4); |
| VerifyStoredIntegerObservation( |
| 0, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| VerifyStoredIntegerObservation( |
| 1, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| VerifyStoredIntegerObservation( |
| 2, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst1DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| VerifyStoredIntegerObservation( |
| 3, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| |
| // An hour later at 9:10PM, no more events, next aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // Still only the original observations created, expedited observations are not re-sent. |
| ASSERT_EQ(test_writer_->num_observations_added(), 4); |
| |
| // 45 minutes later, 2 more events occur, one with a different event code 46. |
| system_time += std::chrono::minutes(45); |
| record->event()->mutable_occurrence_event()->set_event_code( |
| 0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46); |
| now = std::chrono::system_clock::to_time_t(system_time); |
| record->event()->set_day_index(day_index); |
| record->event()->set_hour_id( |
| cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy())); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| record->event()->mutable_occurrence_event()->set_event_code( |
| 0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46); |
| ASSERT_TRUE(aggregation->AddEvent(*record).ok()); |
| |
| // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens. |
| system_time += std::chrono::minutes(15); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // 2 new observation for the AT_LEAST_ONCE reports, with the new event code of 46. |
| // Note that the expedited SELECT_FIRST report was already sent, so doesn't get new observations. |
| // Also, no new observations for event code 45 are created, as they were already sent. |
| ASSERT_EQ(test_writer_->num_observations_added(), 6); |
| VerifyStoredIntegerObservation( |
| 4, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}}); |
| VerifyStoredIntegerObservation( |
| 5, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}}); |
| |
| // An hour later at 11:10PM, no more events, next aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // Still only the expected original observation created. |
| ASSERT_EQ(test_writer_->num_observations_added(), 6); |
| |
| // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // Note that the daily expedited reports were already sent, so they don't get new observations. |
| // But 2 more expedited observations are created for the next day's 7day reports: an AT_LEAST_ONCE |
| // report containing the codes 45 and 46, and a SELECT_FIRST report for the code 45. |
| ASSERT_EQ(test_writer_->num_observations_added(), 8); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 6, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}}); |
| VerifyStoredIntegerObservation( |
| 7, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| |
| // The next day, no events occur so all 23 hourly aggregation runs create no new observations. |
| for (int i = 1; i <= 23; i++) { |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| ASSERT_EQ(test_writer_->num_observations_added(), 8); |
| } |
| |
| // Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // 2 new observations are created, again only for the 7-day window UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 10); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 8, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}}); |
| VerifyStoredIntegerObservation( |
| 9, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, {1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| |
| // For the next 4 days (3 days of data have been sent above) no more events occur, and the 7-day |
| // reports continue to be sent at the start of the day. |
| for (int j = 0; j < 4; j++) { |
| // The 23 hourly aggregation runs. |
| for (int i = 1; i <= 23; i++) { |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| ASSERT_EQ(test_writer_->num_observations_added(), 10 + 2 * j); |
| } |
| |
| // The 12:10AM (just after midnight UTC) daily event aggregation run. |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| |
| // 2 new observations are created for the 7-day window UNIQUE_DEVICE_COUNTS reports. |
| ASSERT_EQ(test_writer_->num_observations_added(), 12 + 2 * j); |
| day_index += 1; |
| VerifyStoredIntegerObservation( |
| 10 + 2 * j, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), |
| day_index, {1, 1}, |
| {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}, |
| {ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}}); |
| VerifyStoredIntegerObservation( |
| 11 + 2 * j, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), |
| day_index, {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}}); |
| } |
| |
| // Finally, on the 8th day, and all subsequent days, no observations are created at all. |
| for (int i = 1; i <= 72; i++) { // run 72 more hours to be sure |
| system_time += std::chrono::minutes(60); |
| aggregation->GenerateAggregatedObservations( |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC), |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL)); |
| ASSERT_EQ(test_writer_->num_observations_added(), 18); |
| } |
| } |
| |
| } // namespace cobalt::local_aggregation |