blob: 01156e03d03e29edbe4d675d9c350c7bf7479299 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include <chrono>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "src/lib/util/hash.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/cobalt_registry.pb.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
using lib::statusor::StatusOr;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
// Upper bound passed to the storage's WaitUntilSave()/WaitUntilSaveStart() calls in these tests.
constexpr std::chrono::seconds kMaxWait{5};
namespace {
std::unique_ptr<CobaltRegistry> GetRegistry() {
std::string bytes;
if (!absl::Base64Unescape(kCobaltRegistryBase64, &bytes)) {
LOG(ERROR) << "Unable to decode Base64 String";
return nullptr;
}
auto registry = std::make_unique<CobaltRegistry>();
if (!registry->ParseFromString(bytes)) {
LOG(ERROR) << "Unable to parse registry from bytes";
return nullptr;
}
return registry;
}
} // namespace
// Test fixture that owns a DelayedLocalAggregateStorage together with the fakes it is built from.
// The storage is constructed over local_aggregation_store_path() using fs().
class DelayedLocalAggregateStorageTest : public util::testing::TestWithFiles {
 private:
  // Creates the test folder and builds a storage using InitStorage()'s default arguments.
  void SetUp() override {
    MakeTestFolder();
    InitStorage();
  }

 public:
  // (Re)creates |storage_| and every object it is constructed from.
  //
  // |registry| is handed to a new ProjectContextFactory (defaults to GetRegistry()).
  // |per_project_reserved_bytes| and |writeback_delay| are forwarded to the storage constructor.
  void InitStorage(std::unique_ptr<CobaltRegistry> registry = GetRegistry(),
                   size_t per_project_reserved_bytes = 0,
                   std::chrono::milliseconds writeback_delay = std::chrono::milliseconds(200)) {
    // Destroy the previous storage before replacing the objects it was given raw pointers to.
    storage_ = nullptr;

    system_data_.ResetChangeCallbacks();
    system_data_.SetVersion("100");

    global_project_context_factory_ =
        std::make_unique<logger::ProjectContextFactory>(std::move(registry));

    mock_clock_ =
        std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0));
    mock_clock_->increment_by(std::chrono::hours(24 * 100));
    validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get());
    validated_clock_->SetAccurate(true);

    metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(),
                                                          system_data_cache_path(), fs());
    metadata_builder_->SnapshotSystemData();

    storage_ = std::make_unique<DelayedLocalAggregateStorage>(
        local_aggregation_store_path(), fs(), global_project_context_factory_.get(),
        metadata_builder_.get(), per_project_reserved_bytes, writeback_delay);
  }

 protected:
  // Members are destroyed in reverse declaration order, so each object is declared AFTER the
  // objects it holds raw pointers to. In particular |storage_| comes last: it is destroyed first,
  // while the metadata builder and project context factory it was given are still alive.
  system_data::FakeSystemData system_data_;
  std::unique_ptr<util::IncrementingSystemClock> mock_clock_;
  std::unique_ptr<util::FakeValidatedClock> validated_clock_;  // Points at |mock_clock_|.
  std::unique_ptr<MetadataBuilder> metadata_builder_;  // Points at the system data and clock.
  std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
  std::unique_ptr<DelayedLocalAggregateStorage> storage_;  // Points at the factory and builder.
};
// An aggregate saved by one storage instance must be readable by a new instance created over the
// same store path.
TEST_F(DelayedLocalAggregateStorageTest, CanReadAlreadyWrittenFiles) {
  // Yields a fresh identifier for the metric exercised by this test.
  const auto metric = [] { return lib::CustomerIdentifier(123).ForProject(100).ForMetric(1); };

  // Stamp the aggregate with a version and save it.
  {
    StatusOr<MetricAggregateRef> written_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(written_or.ok());
    MetricAggregateRef written = written_or.ConsumeValueOrDie();
    written.aggregate()->set_version(100);
    ASSERT_TRUE(written.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  // Recreate the storage and check that the version was read back.
  InitStorage();
  {
    StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(reloaded_or.ok());
    MetricAggregateRef reloaded = reloaded_or.ConsumeValueOrDie();
    ASSERT_EQ(reloaded.aggregate()->version(), 100);
    ASSERT_TRUE(reloaded.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);
}
// Stored data for a report that has been removed from the registry must not be present after the
// storage is recreated with the trimmed registry.
TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldReports) {
  // Yields a fresh identifier for the occurrence metric from the test registry.
  const auto occurrence_metric = [] {
    return lib::CustomerIdentifier(kCustomerId)
        .ForProject(kProjectId)
        .ForMetric(kOccurrenceMetricMetricId);
  };

  // Store one daily bucket for the 7-day unique device counts report.
  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_TRUE(seeded_or.ok());
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();
    seeded.aggregate()->set_version(100);

    auto* reports_by_id = seeded.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*reports_by_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(100);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[100];
    SystemProfileAggregate* profile_agg = day_bucket.add_system_profile_aggregates();
    EventCodesAggregateData* event_data = profile_agg->add_by_event_code();
    event_data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);

    ASSERT_TRUE(seeded.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  // Drop the report definition from the registry and rebuild the storage with it.
  std::unique_ptr<CobaltRegistry> trimmed_registry = GetRegistry();
  google::protobuf::RepeatedPtrField<ReportDefinition>* report_defs =
      trimmed_registry->mutable_customers(0)
          ->mutable_projects(0)
          ->mutable_metrics(kOccurrenceMetricMetricIndex)
          ->mutable_reports();
  report_defs->erase(report_defs->begin() +
                     kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex);
  InitStorage(std::move(trimmed_registry));

  // The metric is still available, but the removed report's data is gone.
  StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(occurrence_metric());
  ASSERT_EQ(reloaded_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef reloaded = reloaded_or.ConsumeValueOrDie();
  EXPECT_FALSE(reloaded.aggregate()->by_report_id().contains(
      kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId));
}
// Once a metric definition is removed from the registry, a recreated storage must refuse to hand
// out an aggregate for it.
TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldMetrics) {
  // Yields a fresh identifier for the metric exercised by this test.
  // NOTE(review): presumably this is the metric found at kOccurrenceMetricMetricIndex - confirm.
  const auto metric = [] { return lib::CustomerIdentifier(123).ForProject(100).ForMetric(1); };

  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(seeded_or.ok());
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();
    seeded.aggregate()->set_version(100);
    ASSERT_TRUE(seeded.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  // Remove the metric definition and rebuild the storage with the trimmed registry.
  std::unique_ptr<CobaltRegistry> trimmed_registry = GetRegistry();
  google::protobuf::RepeatedPtrField<MetricDefinition>* metric_defs =
      trimmed_registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics();
  metric_defs->erase(metric_defs->begin() + kOccurrenceMetricMetricIndex);
  InitStorage(std::move(trimmed_registry));

  ASSERT_FALSE(storage_->GetMetricAggregate(metric()).ok());
}
// Once the last project of the first customer is removed from the registry, a recreated storage
// must refuse to hand out the aggregate that was stored for customer 123 / project 100.
TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldProjects) {
  // Yields a fresh identifier for the metric exercised by this test.
  const auto metric = [] { return lib::CustomerIdentifier(123).ForProject(100).ForMetric(1); };

  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(seeded_or.ok());
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();
    seeded.aggregate()->set_version(100);
    ASSERT_TRUE(seeded.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  // Drop the project from the registry and rebuild the storage with it.
  std::unique_ptr<CobaltRegistry> trimmed_registry = GetRegistry();
  auto* project_defs = trimmed_registry->mutable_customers(0)->mutable_projects();
  project_defs->RemoveLast();
  InitStorage(std::move(trimmed_registry));

  ASSERT_FALSE(storage_->GetMetricAggregate(metric()).ok());
}
// Entries in the deprecated_by_system_profile field of a stored MetricAggregate must be gone after
// the storage re-reads the file.
TEST_F(DelayedLocalAggregateStorageTest, ClearBySystemProfiles) {
  // Yields a fresh identifier for the occurrence metric from the test registry.
  const auto occurrence_metric = [] {
    return lib::CustomerIdentifier(kCustomerId)
        .ForProject(kProjectId)
        .ForMetric(kOccurrenceMetricMetricId);
  };

  // Have the storage create an aggregate file with a single MetricAggregate in it.
  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_EQ(seeded_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();
    seeded.aggregate()->set_version(100);
    ASSERT_EQ(seeded.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);

  // Load the written global aggregate file.
  GlobalAggregates on_disk;
  Status read_status = fs()->Read(local_aggregation_store_path(), &on_disk);
  ASSERT_EQ(read_status.error_code(), StatusCode::OK);

  // Add an entry to the DEPRECATED_by_system_profile field of the stored metric aggregate.
  auto& customer_aggs = on_disk.mutable_by_customer_id()->at(kCustomerId);
  auto& project_aggs = customer_aggs.mutable_by_project_id()->at(kProjectId);
  MetricAggregate& stored_metric =
      project_aggs.mutable_by_metric_id()->at(kOccurrenceMetricMetricId);
  stored_metric.add_deprecated_by_system_profile();
  EXPECT_EQ(stored_metric.deprecated_by_system_profile_size(), 1);

  // Write the updated file back and recreate the storage so that it reads the file.
  Status write_status = fs()->Write(local_aggregation_store_path(), on_disk);
  ASSERT_EQ(write_status.error_code(), StatusCode::OK);
  InitStorage();

  StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(occurrence_metric());
  ASSERT_EQ(reloaded_or.status().error_code(), StatusCode::OK);
  EXPECT_EQ(reloaded_or.ConsumeValueOrDie().aggregate()->deprecated_by_system_profile_size(), 0);
}
// Daily buckets that still use the deprecated_by_event_code field must be migrated into a single
// SystemProfileAggregate when the storage re-reads the file.
TEST_F(DelayedLocalAggregateStorageTest, MigrateDailyToSystemProfileAggregates) {
  // Have the storage create an aggregate file with a single MetricAggregate in it.
  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                         .ForProject(kProjectId)
                                         .ForMetric(kOccurrenceMetricMetricId));
    ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);

  // Load the written global aggregate file.
  GlobalAggregates aggregates;
  Status status = fs()->Read(local_aggregation_store_path(), &aggregates);
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Update the stored local aggregate data to use the deprecated_by_event_code field.
  uint32_t day_index = cobalt::util::TimeToDayIndex(
      std::chrono::system_clock::to_time_t(mock_clock_->now()), MetricDefinition::UTC);
  MetricAggregate& metric_agg = aggregates.mutable_by_customer_id()
                                    ->at(kCustomerId)
                                    .mutable_by_project_id()
                                    ->at(kProjectId)
                                    .mutable_by_metric_id()
                                    ->at(kOccurrenceMetricMetricId);
  ReportAggregate& report_agg =
      (*metric_agg.mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
  report_agg.mutable_daily()->set_last_day_index(day_index);
  AggregationPeriodBucket& mutable_bucket =
      (*report_agg.mutable_daily()->mutable_by_day_index())[day_index];
  EventCodesAggregateData* data = mutable_bucket.add_deprecated_by_event_code();
  data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);

  // Write the updated local aggregate storage file and recreate the storage so it reads the file.
  status = fs()->Write(local_aggregation_store_path(), aggregates);
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  InitStorage();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                       .ForProject(kProjectId)
                                       .ForMetric(kOccurrenceMetricMetricId));
  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();

  // Inspect the migrated report in place instead of assigning it through |report_agg|, which
  // refers to the pre-migration data in |aggregates|. Map lookups are guarded with fatal
  // assertions because at() on a missing key aborts the whole test binary.
  const auto& migrated_reports = migrated_metric_agg.aggregate()->by_report_id();
  ASSERT_TRUE(migrated_reports.contains(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId));
  const ReportAggregate& migrated_report_agg =
      migrated_reports.at(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);
  ASSERT_TRUE(migrated_report_agg.daily().by_day_index().contains(day_index));
  const AggregationPeriodBucket& bucket = migrated_report_agg.daily().by_day_index().at(day_index);
  EXPECT_EQ(bucket.deprecated_by_event_code_size(), 0);
  ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);

  const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
  EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(),
            util::DayIndexToUnixSeconds(day_index));
  EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), util::DayIndexToUnixSeconds(day_index));
  // Make sure the element exists before indexing into the repeated field.
  ASSERT_GT(system_profile_aggregate.by_event_code_size(), 0);
  EXPECT_TRUE(system_profile_aggregate.by_event_code(0).data().at_least_once().at_least_once());

  // The system profile referenced by the migrated aggregate must be retrievable.
  StatusOr<SystemProfile> system_profile_or = migrated_metric_agg.RetrieveFilteredSystemProfile(
      system_profile_aggregate.system_profile_hash());
  ASSERT_EQ(system_profile_or.status().error_code(), StatusCode::OK);
  SystemProfile system_profile = system_profile_or.ConsumeValueOrDie();
  EXPECT_EQ(system_profile.system_version(), system_data_.system_profile().system_version());
  EXPECT_EQ(system_profile.os(), system_data_.system_profile().os());
  EXPECT_EQ(system_profile.board_name(), "");
}
// Hourly buckets that still use the deprecated_by_event_code field must be migrated into a single
// SystemProfileAggregate when the storage re-reads the file.
TEST_F(DelayedLocalAggregateStorageTest, MigrateHourlyToSystemProfileAggregates) {
  // Have the storage create an aggregate file with a single MetricAggregate in it.
  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                         .ForProject(kProjectId)
                                         .ForMetric(kOccurrenceMetricMetricId));
    ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);

  // Load the written global aggregate file.
  GlobalAggregates aggregates;
  Status status = fs()->Read(local_aggregation_store_path(), &aggregates);
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Update the stored local aggregate data to use the deprecated_by_event_code field.
  uint32_t hour_id = cobalt::util::TimeToHourId(
      std::chrono::system_clock::to_time_t(mock_clock_->now()), MetricDefinition::UTC);
  MetricAggregate& metric_agg = aggregates.mutable_by_customer_id()
                                    ->at(kCustomerId)
                                    .mutable_by_project_id()
                                    ->at(kProjectId)
                                    .mutable_by_metric_id()
                                    ->at(kOccurrenceMetricMetricId);
  ReportAggregate& report_agg =
      (*metric_agg
            .mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
  report_agg.mutable_hourly()->set_last_hour_id(hour_id);
  AggregationPeriodBucket& mutable_bucket =
      (*report_agg.mutable_hourly()->mutable_by_hour_id())[hour_id];
  EventCodesAggregateData* data = mutable_bucket.add_deprecated_by_event_code();
  data->mutable_data()->set_count(10);

  // Write the updated local aggregate storage file and recreate the storage so it reads the file.
  status = fs()->Write(local_aggregation_store_path(), aggregates);
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  InitStorage();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                       .ForProject(kProjectId)
                                       .ForMetric(kOccurrenceMetricMetricId));
  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();

  // Inspect the migrated report in place instead of assigning it through |report_agg|, which
  // refers to the pre-migration data in |aggregates|. Map lookups are guarded with fatal
  // assertions because at() on a missing key aborts the whole test binary.
  const auto& migrated_reports = migrated_metric_agg.aggregate()->by_report_id();
  ASSERT_TRUE(migrated_reports.contains(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId));
  const ReportAggregate& migrated_report_agg =
      migrated_reports.at(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId);
  ASSERT_TRUE(migrated_report_agg.hourly().by_hour_id().contains(hour_id));
  const AggregationPeriodBucket& bucket = migrated_report_agg.hourly().by_hour_id().at(hour_id);
  EXPECT_EQ(bucket.deprecated_by_event_code_size(), 0);
  ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);

  const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
  EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), util::HourIdToUnixSeconds(hour_id));
  EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), util::HourIdToUnixSeconds(hour_id));
  // Make sure the element exists before indexing into the repeated field.
  ASSERT_GT(system_profile_aggregate.by_event_code_size(), 0);
  EXPECT_EQ(system_profile_aggregate.by_event_code(0).data().count(), 10);

  // The system profile referenced by the migrated aggregate must be retrievable.
  StatusOr<SystemProfile> system_profile_or = migrated_metric_agg.RetrieveFilteredSystemProfile(
      system_profile_aggregate.system_profile_hash());
  ASSERT_EQ(system_profile_or.status().error_code(), StatusCode::OK);
  SystemProfile system_profile = system_profile_or.ConsumeValueOrDie();
  EXPECT_EQ(system_profile.system_version(), system_data_.system_profile().system_version());
  EXPECT_EQ(system_profile.os(), system_data_.system_profile().os());
  EXPECT_EQ(system_profile.board_name(), "");
}
// Data stored with the deprecated_at_least_once field must be migrated to the at_least_once
// message (including its last_day_index) when the storage re-reads the file.
TEST_F(DelayedLocalAggregateStorageTest, MigrateAtLeastOnce) {
  // Have the storage create an aggregate file with a single MetricAggregate in it.
  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                         .ForProject(kProjectId)
                                         .ForMetric(kOccurrenceMetricMetricId));
    ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);

  // Load the written global aggregate file.
  GlobalAggregates aggregates;
  Status status = fs()->Read(local_aggregation_store_path(), &aggregates);
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Update the stored local aggregate data to use the deprecated_at_least_once type.
  uint32_t day_index = 100;
  MetricAggregate& metric_agg = aggregates.mutable_by_customer_id()
                                    ->at(kCustomerId)
                                    .mutable_by_project_id()
                                    ->at(kProjectId)
                                    .mutable_by_metric_id()
                                    ->at(kOccurrenceMetricMetricId);
  ReportAggregate& report_agg =
      (*metric_agg.mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
  report_agg.mutable_daily()->set_last_day_index(day_index);
  AggregationPeriodBucket& mutable_bucket =
      (*report_agg.mutable_daily()->mutable_by_day_index())[day_index];
  SystemProfileAggregate* system_profile_aggregate = mutable_bucket.add_system_profile_aggregates();
  EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
  data->mutable_data()->set_deprecated_at_least_once(true);

  // Write the updated local aggregate storage file and recreate the storage so it reads the file.
  status = fs()->Write(local_aggregation_store_path(), aggregates);
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  InitStorage();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                       .ForProject(kProjectId)
                                       .ForMetric(kOccurrenceMetricMetricId));
  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
  // Keep the aggregate ref in a named variable so the report can be inspected in place rather
  // than being assigned through |report_agg|, which refers to the pre-migration data.
  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();

  // Map lookups and repeated-field indexing are guarded with fatal assertions: at() on a missing
  // key aborts the whole test binary, and indexing past the end is not checked in all builds.
  const auto& migrated_reports = migrated_metric_agg.aggregate()->by_report_id();
  ASSERT_TRUE(migrated_reports.contains(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId));
  const ReportAggregate& migrated_report_agg =
      migrated_reports.at(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);
  ASSERT_TRUE(migrated_report_agg.daily().by_day_index().contains(day_index));
  const AggregationPeriodBucket& bucket = migrated_report_agg.daily().by_day_index().at(day_index);
  ASSERT_GT(bucket.system_profile_aggregates_size(), 0);
  ASSERT_GT(bucket.system_profile_aggregates(0).by_event_code_size(), 0);

  EXPECT_FALSE(
      bucket.system_profile_aggregates(0).by_event_code(0).data().has_deprecated_at_least_once());
  EXPECT_TRUE(
      bucket.system_profile_aggregates(0).by_event_code(0).data().at_least_once().at_least_once());
  EXPECT_EQ(
      bucket.system_profile_aggregates(0).by_event_code(0).data().at_least_once().last_day_index(),
      day_index);
}
// After DeleteData() the storage must no longer report the previously saved metric aggregate.
TEST_F(DelayedLocalAggregateStorageTest, DeleteDataWorks) {
  // Yields a fresh identifier for the metric exercised by this test.
  const auto metric = [] { return lib::CustomerIdentifier(123).ForProject(100).ForMetric(1); };

  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(seeded_or.ok());
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();
    seeded.aggregate()->set_version(100);
    ASSERT_TRUE(seeded.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  storage_->DeleteData();

  ASSERT_FALSE(storage_->HasMetricAggregate(metric()));
}
// Data saved shortly before the storage is destroyed must be written out even when the
// configured writeback delay is far longer than the test.
TEST_F(DelayedLocalAggregateStorageTest, ShutDownIsFast) {
  // Yields a fresh identifier for the metric exercised by this test.
  const auto metric = [] { return lib::CustomerIdentifier(123).ForProject(100).ForMetric(1); };
  // An extremely long writeback frequency.
  const std::chrono::hours very_long_delay(99999);

  InitStorage(GetRegistry(), 0, very_long_delay);
  {
    StatusOr<MetricAggregateRef> written_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(written_or.ok());
    MetricAggregateRef written = written_or.ConsumeValueOrDie();
    written.aggregate()->set_version(100);
    ASSERT_TRUE(written.Save().ok());
  }

  // Destroy the storage. Immediate writeback should occur.
  storage_ = nullptr;

  // A new storage over the same path must see the version written above.
  InitStorage(GetRegistry(), 0, very_long_delay);
  {
    StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(metric());
    ASSERT_TRUE(reloaded_or.ok());
    MetricAggregateRef reloaded = reloaded_or.ConsumeValueOrDie();
    ASSERT_EQ(reloaded.aggregate()->version(), 100);
  }
}
// A filtered system profile stored next to aggregate data that references it must still be
// retrievable after a save followed by garbage collection.
TEST_F(DelayedLocalAggregateStorageTest, StoreAndSave) {
  // Yields a fresh identifier for the occurrence metric from the test registry.
  const auto occurrence_metric = [] {
    return lib::CustomerIdentifier(kCustomerId)
        .ForProject(kProjectId)
        .ForMetric(kOccurrenceMetricMetricId);
  };

  SystemProfile profile;
  profile.set_os(SystemProfile::FUCHSIA);
  profile.set_system_version("100");
  const uint64_t profile_hash = util::Farmhash64(profile.SerializeAsString());

  // Have the storage create an aggregate file with a single MetricAggregate in it.
  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_EQ(seeded_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();

    // Add some aggregate data broken down by system profile for the metric.
    const uint32_t day_index = 100;
    auto* reports_by_id = seeded.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*reports_by_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(day_index - 1);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[day_index];
    SystemProfileAggregate* profile_agg = day_bucket.add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->set_last_seen_timestamp(1000);
    profile_agg->set_first_seen_timestamp(1000);
    EventCodesAggregateData* event_data = profile_agg->add_by_event_code();
    event_data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    seeded.StoreFilteredSystemProfile(profile_hash, profile);

    // Save all the changes to the metric aggregate.
    ASSERT_EQ(seeded.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);

  // Allow the system to garbage collect unused system profiles.
  storage_->GarbageCollection();

  // The profile is referenced by the stored aggregate, so it must still be there.
  StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(occurrence_metric());
  ASSERT_EQ(reloaded_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef reloaded = reloaded_or.ConsumeValueOrDie();
  StatusOr<SystemProfile> stored_profile_or = reloaded.RetrieveFilteredSystemProfile(profile_hash);
  ASSERT_EQ(stored_profile_or.status().error_code(), StatusCode::OK);
  EXPECT_EQ(stored_profile_or.ConsumeValueOrDie().system_version(), profile.system_version());
}
// Stored system profiles must survive garbage collection while aggregate data references them and
// must be removed once nothing references them any more.
TEST_F(DelayedLocalAggregateStorageTest, GarbageCollection) {
  // Yields a fresh identifier for the occurrence metric from the test registry.
  const auto occurrence_metric = [] {
    return lib::CustomerIdentifier(kCustomerId)
        .ForProject(kProjectId)
        .ForMetric(kOccurrenceMetricMetricId);
  };

  const uint32_t day_index = 100;
  SystemProfile original_profile;
  original_profile.set_os(SystemProfile::FUCHSIA);
  original_profile.set_system_version("100");
  const uint64_t original_hash = util::Farmhash64(original_profile.SerializeAsString());

  // Step 1: have the storage create an aggregate file with a single MetricAggregate in it.
  {
    StatusOr<MetricAggregateRef> seeded_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_EQ(seeded_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef seeded = seeded_or.ConsumeValueOrDie();

    // Add some aggregate data broken down by system profile for the metric.
    auto* reports_by_id = seeded.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*reports_by_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(day_index - 1);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[day_index];
    SystemProfileAggregate* profile_agg = day_bucket.add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(original_hash);
    profile_agg->set_last_seen_timestamp(1000);
    profile_agg->set_first_seen_timestamp(1000);
    EventCodesAggregateData* event_data = profile_agg->add_by_event_code();
    event_data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    seeded.StoreFilteredSystemProfile(original_hash, original_profile);

    // Save all the changes to the metric aggregate.
    ASSERT_EQ(seeded.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);
  // Allow the system to garbage collect unused system profiles.
  storage_->GarbageCollection();

  // Update the system profile due to an OTA to a new version.
  SystemProfile updated_profile(original_profile);
  updated_profile.set_system_version("101");
  const uint64_t updated_hash = util::Farmhash64(updated_profile.SerializeAsString());

  // Step 2: the original profile is still referenced; switch the data to the updated profile.
  {
    StatusOr<MetricAggregateRef> current_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_EQ(current_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef current = current_or.ConsumeValueOrDie();

    StatusOr<SystemProfile> original_or = current.RetrieveFilteredSystemProfile(original_hash);
    ASSERT_EQ(original_or.status().error_code(), StatusCode::OK);
    EXPECT_EQ(original_or.ConsumeValueOrDie().system_version(),
              original_profile.system_version());

    // Update the aggregate data to use the updated system profile.
    auto* reports_by_id = current.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*reports_by_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    AggregationPeriodBucket& day_bucket =
        (*report.mutable_daily()->mutable_by_day_index())[day_index];
    SystemProfileAggregate* profile_agg = day_bucket.mutable_system_profile_aggregates(0);
    profile_agg->set_system_profile_hash(updated_hash);
    profile_agg->set_last_seen_timestamp(1001);
    current.StoreFilteredSystemProfile(updated_hash, updated_profile);

    // Save all the changes to the metric aggregate.
    ASSERT_EQ(current.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);
  // Allow the system to garbage collect unused system profiles.
  storage_->GarbageCollection();

  // Step 3: only the updated profile remains; drop the data that references it.
  {
    StatusOr<MetricAggregateRef> current_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_EQ(current_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef current = current_or.ConsumeValueOrDie();

    StatusOr<SystemProfile> original_or = current.RetrieveFilteredSystemProfile(original_hash);
    EXPECT_EQ(original_or.status().error_code(), StatusCode::NOT_FOUND);

    StatusOr<SystemProfile> updated_or = current.RetrieveFilteredSystemProfile(updated_hash);
    ASSERT_EQ(updated_or.status().error_code(), StatusCode::OK);
    EXPECT_EQ(updated_or.ConsumeValueOrDie().system_version(), updated_profile.system_version());

    // Remove the aggregate data that used the updated system profile.
    auto* reports_by_id = current.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*reports_by_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    report.mutable_daily()->mutable_by_day_index()->erase(day_index);

    // Save all the changes to the metric aggregate.
    ASSERT_EQ(current.Save().error_code(), StatusCode::OK);
  }
  storage_->WaitUntilSave(kMaxWait);
  // Allow the system to garbage collect unused system profiles.
  storage_->GarbageCollection();

  // Step 4: nothing references the updated profile any more, so it must be gone as well.
  {
    StatusOr<MetricAggregateRef> current_or = storage_->GetMetricAggregate(occurrence_metric());
    ASSERT_EQ(current_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef current = current_or.ConsumeValueOrDie();

    StatusOr<SystemProfile> updated_or = current.RetrieveFilteredSystemProfile(updated_hash);
    EXPECT_EQ(updated_or.status().error_code(), StatusCode::NOT_FOUND);
  }
}
// Checks the byte accounting reported by AmountStored() and SlushUsed() as aggregates grow, and
// that the numbers are recomputed when the storage is recreated over the same file.
TEST_F(DelayedLocalAggregateStorageTest, UsageTrackingWorks) {
  // Set per-project-reserved-bytes to 2
  InitStorage(GetRegistry(), 2);
  lib::ProjectIdentifier proj = lib::CustomerIdentifier(123).ForProject(100);
  ASSERT_EQ(storage_->AmountStored(), 0);
  ASSERT_EQ(storage_->AmountStored(proj), 0);

  {
    StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);  // Aggregate should now take up 2 bytes
    // The usage numbers asserted below are only meaningful if the save succeeded.
    ASSERT_TRUE(agg.Save().ok());
  }
  ASSERT_EQ(storage_->SlushUsed(), 0);
  ASSERT_EQ(storage_->AmountStored(), 2);
  ASSERT_EQ(storage_->AmountStored(proj), 2);

  {
    StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(1000000);  // Aggregate should now take up 4 bytes.
    ASSERT_TRUE(agg.Save().ok());
  }
  ASSERT_EQ(storage_->SlushUsed(), 2);
  ASSERT_EQ(storage_->AmountStored(), 4);
  ASSERT_EQ(storage_->AmountStored(proj), 4);

  // Add another 4 bytes. All should be counted by slush.
  {
    StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(2));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(1000000);  // Aggregate should now take up 4 bytes.
    ASSERT_TRUE(agg.Save().ok());
  }
  ASSERT_EQ(storage_->SlushUsed(), 6);
  ASSERT_EQ(storage_->AmountStored(), 8);
  ASSERT_EQ(storage_->AmountStored(proj), 8);

  // Reloading the store from disk, the usage counts should remain the same.
  storage_->WaitUntilSave(kMaxWait);
  InitStorage(GetRegistry(), 2);
  ASSERT_EQ(storage_->SlushUsed(), 6);
  ASSERT_EQ(storage_->AmountStored(), 8);
  ASSERT_EQ(storage_->AmountStored(proj), 8);

  // Reloading with different slush size should only update SlushUsed.
  storage_->WaitUntilSave(kMaxWait);
  InitStorage(GetRegistry(), 4);
  ASSERT_EQ(storage_->SlushUsed(), 4);
  ASSERT_EQ(storage_->AmountStored(), 8);
  ASSERT_EQ(storage_->AmountStored(proj), 8);
}
// Bare fixture for the NoDeadlock test. It adds nothing to util::testing::TestWithFiles; the test
// constructs its own DelayedLocalAggregateStorage inside a worker thread.
class DelayedLocalAggregateStorageTestDeadlock : public util::testing::TestWithFiles {};
// Saving an aggregate while the storage has started its own writeback must not hang.
TEST_F(DelayedLocalAggregateStorageTestDeadlock, NoDeadlock) {
  std::promise<bool> complete;
  // Take the future first: the promise itself is moved into the worker below.
  std::future<bool> finished = complete.get_future();

  // Run the test in a thread in case it hangs.
  //
  // The worker owns the promise, which has two effects:
  //  * If the worker is detached after a timeout it never touches a destroyed stack variable.
  //  * If one of the ASSERT_* checks fails, the worker returns early and the promise is destroyed
  //    unfulfilled. That makes |finished| ready, so the failure is reported right away instead of
  //    after a timeout with a misleading "Deadlock found" message.
  // NOTE(review): the worker still uses |this| (fs(), local_aggregation_store_path()), so a
  // detached worker that eventually resumes may outlive the fixture.
  auto thread = std::thread([this, complete = std::move(complete)]() mutable {
    logger::ProjectContextFactory global_project_context_factory(GetRegistry());
    DelayedLocalAggregateStorage storage(local_aggregation_store_path(), fs(),
                                         &global_project_context_factory, nullptr, 0,
                                         std::chrono::milliseconds(100));
    StatusOr<MetricAggregateRef> agg_or =
        storage.GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    ASSERT_TRUE(agg.Save().ok());
    ASSERT_TRUE(storage.WaitUntilSaveStart(kMaxWait));
    ASSERT_TRUE(agg.Save().ok());  // Save while storage_ is attempting to save should not hang.
    complete.set_value(true);
  });

  // If the promise isn't resolved after 5 seconds, a deadlock has likely occurred.
  if (finished.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
    ADD_FAILURE() << "Deadlock found";
    // The worker is presumed stuck; joining it would hang the test binary as well.
    thread.detach();
    return;
  }
  thread.join();
}
} // namespace cobalt::local_aggregation