// blob: 99d0bdd56726d5ecb1688e6f7971f221417b8853 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logging.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
using lib::statusor::StatusOr;
using ::testing::Contains;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
namespace {
// Builds a CobaltRegistry from the Base64 text in kCobaltRegistryBase64.
//
// The text is first Base64-decoded and the resulting bytes are then parsed into a newly allocated
// registry. If either step fails, an error is logged and nullptr is returned instead.
std::unique_ptr<CobaltRegistry> GetRegistry() {
  std::string decoded;
  const bool decode_ok = absl::Base64Unescape(kCobaltRegistryBase64, &decoded);
  if (!decode_ok) {
    LOG(ERROR) << "Unable to decode Base64 String";
    return nullptr;
  }

  auto parsed = std::make_unique<CobaltRegistry>();
  if (parsed->ParseFromString(decoded)) {
    return parsed;
  }

  LOG(ERROR) << "Unable to parse registry from bytes";
  return nullptr;
}
} // namespace
// Test fixture that owns a LocalAggregateStorage (Immediate strategy) together with everything it
// is constructed from: a fake system data source, a fake validated clock, a project context
// factory and a metadata builder. Storage files live under the fixture's test folder.
class LocalAggregateStorageTest : public util::testing::TestWithFiles {
 private:
  // Creates the test folder and builds the initial storage from the default test registry.
  void SetUp() override {
    MakeTestFolder();
    ReplaceRegistry();
  }

 public:
  // Discards the current storage and rebuilds it (and all of its collaborators) from scratch.
  //
  // registry: registry handed to the new ProjectContextFactory; defaults to GetRegistry().
  // per_project_reserved_bytes: forwarded unchanged to LocalAggregateStorage::New.
  //
  // Files already written under test_folder() are left in place, so the new storage is created on
  // top of whatever the previous one saved.
  void ReplaceRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry(),
                       int64_t per_project_reserved_bytes = 0) {
    system_data_.ResetChangeCallbacks();

    // Tear down the previous object graph from the top. Each of these objects was handed raw
    // pointers to the ones below it (and may retain them), so it must not outlive them while the
    // lower layers are being replaced.
    storage_.reset();
    metadata_builder_.reset();
    validated_clock_.reset();

    system_data_.SetVersion("100");
    global_project_context_factory_ =
        std::make_unique<logger::ProjectContextFactory>(std::move(registry));

    // Start the fake clock 100 days after its initial value.
    mock_clock_ =
        std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0));
    mock_clock_->increment_by(std::chrono::hours(24 * 100));
    validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get());
    validated_clock_->SetAccurate(true);

    metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(),
                                                          system_data_cache_path(), fs());
    metadata_builder_->SnapshotSystemData();

    storage_ = LocalAggregateStorage::New(
        LocalAggregateStorage::StorageStrategy::Immediate, test_folder(), fs(),
        global_project_context_factory_.get(), metadata_builder_.get(), per_project_reserved_bytes);
  }

 protected:
  // Members are declared in dependency order: every object appears after the objects whose raw
  // pointers it receives, so default destruction (reverse declaration order) destroys dependents
  // before their dependencies.
  system_data::FakeSystemData system_data_;
  std::unique_ptr<util::IncrementingSystemClock> mock_clock_;
  std::unique_ptr<util::FakeValidatedClock> validated_clock_;  // Receives mock_clock_.
  std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
  // Receives system_data_ and validated_clock_.
  std::unique_ptr<MetadataBuilder> metadata_builder_;
  // Receives global_project_context_factory_ and metadata_builder_.
  std::unique_ptr<LocalAggregateStorage> storage_;
};
// Exercises the per-project size accounting: UpdateProjectSizeBy() applies a signed delta and
// AmountStored() reports the running total, which is expected never to go below zero.
TEST_F(LocalAggregateStorageTest, HandlesBookkeepingAsExpected) {
  lib::ProjectIdentifier project = lib::CustomerIdentifier(123).ForProject(100);

  // A positive delta raises the stored amount: 0 + 10 = 10.
  storage_->UpdateProjectSizeBy(project, 10);
  EXPECT_EQ(storage_->AmountStored(project), 10);

  // A negative delta lowers it: 10 - 5 = 5.
  storage_->UpdateProjectSizeBy(project, -5);
  EXPECT_EQ(storage_->AmountStored(project), 5);

  // A decrease larger than the current amount (5 - 8 would be negative) is expected to leave the
  // stored amount clamped at 0.
  storage_->UpdateProjectSizeBy(project, -8);
  EXPECT_EQ(storage_->AmountStored(project), 0);
}
// Stores a daily bucket holding two SystemProfileAggregates (as a REPORT_ALL metric would produce),
// reloads the storage, and checks that the bucket comes back with a single aggregate that carries
// the second profile's hash, the earliest first-seen timestamp, the latest last-seen timestamp and
// the event code data of both original aggregates.
TEST_F(LocalAggregateStorageTest, MigrateDailyReportAllToSelectLast) {
  const uint64_t first_profile_hash = uint64_t{1234};
  const uint64_t second_profile_hash = uint64_t{5678};
  const uint32_t day_index = 1000;

  // Produces a fresh identifier for the occurrence metric on each call, so every lookup below is
  // still given a temporary.
  const auto occurrence_metric = [] {
    return lib::CustomerIdentifier(kCustomerId)
        .ForProject(kProjectId)
        .ForMetric(kOccurrenceMetricMetricId);
  };

  {
    StatusOr<MetricAggregateRef> stored_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_TRUE(stored_or.ok());
    MetricAggregateRef stored = stored_or.ConsumeValueOrDie();

    auto& reports_by_id = *stored.aggregate()->mutable_by_report_id();
    ReportAggregate& report = reports_by_id[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(day_index);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[day_index];

    // First system profile: seen at 9000..9001, event code 1 observed at least once.
    SystemProfileAggregate& first_profile = *day_bucket.add_system_profile_aggregates();
    first_profile.set_system_profile_hash(first_profile_hash);
    first_profile.set_first_seen_timestamp(9000);
    first_profile.set_last_seen_timestamp(9001);
    EventCodesAggregateData& first_data = *first_profile.add_by_event_code();
    first_data.add_event_codes(1);
    first_data.mutable_data()->mutable_at_least_once()->set_at_least_once(true);

    // Second system profile: seen at 10000..10001, event code 2 observed at least once.
    SystemProfileAggregate& second_profile = *day_bucket.add_system_profile_aggregates();
    second_profile.set_system_profile_hash(second_profile_hash);
    second_profile.set_first_seen_timestamp(10000);
    second_profile.set_last_seen_timestamp(10001);
    EventCodesAggregateData& second_data = *second_profile.add_by_event_code();
    second_data.add_event_codes(2);
    second_data.mutable_data()->mutable_at_least_once()->set_at_least_once(true);

    ASSERT_EQ(stored.Save().error_code(), StatusCode::OK);
  }  // The first MetricAggregateRef must be gone before another one is requested.

  // The save above should have left a file named "1" in the project's directory.
  const auto project_dir = absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId);
  ASSERT_THAT(fs()->ListFiles(project_dir).ConsumeValueOrDie(), Contains("1"));

  // Rebuild the storage on top of the saved file, which triggers the migration.
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> migrated_or = storage_->GetMetricAggregate(occurrence_metric());
  ASSERT_EQ(migrated_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated = migrated_or.ConsumeValueOrDie();

  const ReportAggregate& migrated_report = migrated.aggregate()->by_report_id().at(
      kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);
  const auto& buckets_by_day = migrated_report.daily().by_day_index();
  ASSERT_TRUE(buckets_by_day.contains(day_index));
  const AggregationPeriodBucket& migrated_bucket = buckets_by_day.at(day_index);

  // Exactly one aggregate remains, identified by the second (last) profile hash and spanning the
  // full time range of the two originals.
  ASSERT_EQ(migrated_bucket.system_profile_aggregates_size(), 1);
  const SystemProfileAggregate& merged = migrated_bucket.system_profile_aggregates(0);
  EXPECT_EQ(merged.system_profile_hash(), second_profile_hash);
  EXPECT_EQ(merged.first_seen_timestamp(), 9000);
  EXPECT_EQ(merged.last_seen_timestamp(), 10001);

  // Both event code entries are present, in their original order.
  ASSERT_EQ(merged.by_event_code_size(), 2);

  const EventCodesAggregateData& merged_first = merged.by_event_code(0);
  ASSERT_EQ(merged_first.event_codes_size(), 1);
  EXPECT_EQ(merged_first.event_codes(0), 1);
  EXPECT_TRUE(merged_first.data().at_least_once().at_least_once());

  const EventCodesAggregateData& merged_second = merged.by_event_code(1);
  ASSERT_EQ(merged_second.event_codes_size(), 1);
  EXPECT_EQ(merged_second.event_codes(0), 2);
  EXPECT_TRUE(merged_second.data().at_least_once().at_least_once());
}
// Stores an hourly bucket holding two SystemProfileAggregates (as a REPORT_ALL metric would
// produce), reloads the storage, and checks that the bucket comes back with a single aggregate
// that carries the first profile's hash, the earliest first-seen timestamp, the latest last-seen
// timestamp and the event code counts of both original aggregates.
TEST_F(LocalAggregateStorageTest, MigrateHourlyReportAllToSelectFirst) {
  const uint64_t first_profile_hash = uint64_t{1234};
  const uint64_t second_profile_hash = uint64_t{5678};
  const uint32_t hour_id = 100001;

  // Produces a fresh identifier for the occurrence metric on each call, so every lookup below is
  // still given a temporary.
  const auto occurrence_metric = [] {
    return lib::CustomerIdentifier(kCustomerId)
        .ForProject(kProjectId)
        .ForMetric(kOccurrenceMetricMetricId);
  };

  {
    StatusOr<MetricAggregateRef> stored_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_TRUE(stored_or.ok());
    MetricAggregateRef stored = stored_or.ConsumeValueOrDie();

    auto& reports_by_id = *stored.aggregate()->mutable_by_report_id();
    ReportAggregate& report = reports_by_id[kOccurrenceMetricHourlyDeviceHistogramsReportId];
    auto* hourly = report.mutable_hourly();
    hourly->set_last_hour_id(hour_id);
    AggregationPeriodBucket& hour_bucket = (*hourly->mutable_by_hour_id())[hour_id];

    // First system profile: seen at 9000..9001, event code 1 counted 3 times.
    SystemProfileAggregate& first_profile = *hour_bucket.add_system_profile_aggregates();
    first_profile.set_system_profile_hash(first_profile_hash);
    first_profile.set_first_seen_timestamp(9000);
    first_profile.set_last_seen_timestamp(9001);
    EventCodesAggregateData& first_data = *first_profile.add_by_event_code();
    first_data.add_event_codes(1);
    first_data.mutable_data()->set_count(3);

    // Second system profile: seen at 10000..10001, event code 2 counted 5 times.
    SystemProfileAggregate& second_profile = *hour_bucket.add_system_profile_aggregates();
    second_profile.set_system_profile_hash(second_profile_hash);
    second_profile.set_first_seen_timestamp(10000);
    second_profile.set_last_seen_timestamp(10001);
    EventCodesAggregateData& second_data = *second_profile.add_by_event_code();
    second_data.add_event_codes(2);
    second_data.mutable_data()->set_count(5);

    ASSERT_EQ(stored.Save().error_code(), StatusCode::OK);
  }  // The first MetricAggregateRef must be gone before another one is requested.

  // The save above should have left a file named "1" in the project's directory.
  const auto project_dir = absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId);
  ASSERT_THAT(fs()->ListFiles(project_dir).ConsumeValueOrDie(), Contains("1"));

  // Rebuild the storage on top of the saved file, which triggers the migration.
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> migrated_or = storage_->GetMetricAggregate(occurrence_metric());
  ASSERT_EQ(migrated_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated = migrated_or.ConsumeValueOrDie();

  const ReportAggregate& migrated_report =
      migrated.aggregate()->by_report_id().at(kOccurrenceMetricHourlyDeviceHistogramsReportId);
  const auto& buckets_by_hour = migrated_report.hourly().by_hour_id();
  ASSERT_TRUE(buckets_by_hour.contains(hour_id));
  const AggregationPeriodBucket& migrated_bucket = buckets_by_hour.at(hour_id);

  // Exactly one aggregate remains, identified by the first profile hash and spanning the full
  // time range of the two originals.
  ASSERT_EQ(migrated_bucket.system_profile_aggregates_size(), 1);
  const SystemProfileAggregate& merged = migrated_bucket.system_profile_aggregates(0);
  EXPECT_EQ(merged.system_profile_hash(), first_profile_hash);
  EXPECT_EQ(merged.first_seen_timestamp(), 9000);
  EXPECT_EQ(merged.last_seen_timestamp(), 10001);

  // Both event code entries are present, in their original order, with their counts intact.
  ASSERT_EQ(merged.by_event_code_size(), 2);

  const EventCodesAggregateData& merged_first = merged.by_event_code(0);
  ASSERT_EQ(merged_first.event_codes_size(), 1);
  EXPECT_EQ(merged_first.event_codes(0), 1);
  EXPECT_EQ(merged_first.data().count(), 3);

  const EventCodesAggregateData& merged_second = merged.by_event_code(1);
  ASSERT_EQ(merged_second.event_codes_size(), 1);
  EXPECT_EQ(merged_second.event_codes(0), 2);
  EXPECT_EQ(merged_second.data().count(), 5);
}
} // namespace cobalt::local_aggregation