// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"

#include <memory>

#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/statusor/statusor.h"

namespace cobalt::local_aggregation {

// Factory that constructs the LocalAggregateStorage implementation matching |strategy|.
//
// |strategy|: selects the concrete class (ImmediateLocalAggregateStorage for
//   StorageStrategy::Immediate, DelayedLocalAggregateStorage for StorageStrategy::Delayed).
// All remaining arguments are forwarded unchanged to the selected implementation's constructor.
//
// Returns the newly constructed storage, or nullptr if |strategy| does not hold one of the known
// enumerators.
std::unique_ptr<LocalAggregateStorage> LocalAggregateStorage::New(
    StorageStrategy strategy, std::string base_directory, util::FileSystem& fs,
    const logger::ProjectContextFactory* global_project_context_factory,
    MetadataBuilder& metadata_builder, int64_t per_project_reserved_bytes) {
  switch (strategy) {
    case StorageStrategy::Immediate:
      return std::make_unique<ImmediateLocalAggregateStorage>(
          std::move(base_directory), fs, global_project_context_factory, metadata_builder,
          per_project_reserved_bytes);

    case StorageStrategy::Delayed:
      return std::make_unique<DelayedLocalAggregateStorage>(
          std::move(base_directory), fs, global_project_context_factory, metadata_builder,
          per_project_reserved_bytes);
  }

  // An enum object can hold a value that matches no enumerator (e.g. after a cast from an
  // integer). Flowing off the end of a non-void function is undefined behaviour, so fail
  // explicitly instead.
  LOG(ERROR) << "Unknown StorageStrategy value: " << static_cast<int>(strategy);
  return nullptr;
}

namespace {

// Looks up the report with id |report_id| among the reports of |metric_definition|.
//
// Returns a pointer to the first matching ReportDefinition inside |metric_definition|, or nullptr
// when no report carries that id. The pointer refers into |metric_definition| and is not owned by
// the caller.
const ReportDefinition* FindReportDefinition(const MetricDefinition& metric_definition,
                                             uint32_t report_id) {
  const int report_count = metric_definition.reports_size();
  for (int index = 0; index < report_count; ++index) {
    const ReportDefinition& candidate = metric_definition.reports(index);
    if (candidate.id() != report_id) {
      continue;
    }
    return &candidate;
  }
  return nullptr;
}

}  // namespace

// Moves the data held in |bucket|'s deprecated_by_event_code field into a newly added
// SystemProfileAggregate.
//
// |bucket|: the bucket to migrate; gains one system_profile_aggregates entry and has its
//   deprecated_by_event_code field cleared.
// |time_info|: passed to |metadata_builder| to select the system profile for the bucket.
// |timestamp|: written as both the first-seen and last-seen timestamp of the new aggregate.
// |metric_definition|, |report_def|: the metric and report the bucket belongs to.
// |metadata_builder|: source of the system profile.
void LocalAggregateStorage::MigrateAggregationPeriodBucket(
    AggregationPeriodBucket& bucket, util::TimeInfo time_info, int64_t timestamp,
    const MetricDefinition& metric_definition, const ReportDefinition& report_def,
    const MetadataBuilder& metadata_builder) {
  // Look up the system profile that was active when the aggregated events happened, then reduce
  // it to the filtered profile for this report.
  SystemProfile full_profile =
      metadata_builder.GetSystemProfile(metric_definition, report_def, time_info);
  SystemProfile filtered_profile;
  MetadataBuilder::FilteredSystemProfile(report_def, full_profile, &filtered_profile);

  // The filtered profile is identified by the Farmhash64 of its serialized form.
  const uint64_t profile_hash = util::Farmhash64(filtered_profile.SerializeAsString());

  // Create the aggregate that will hold the migrated data for this filtered profile.
  SystemProfileAggregate& migrated = *bucket.add_system_profile_aggregates();
  migrated.set_system_profile_hash(profile_hash);
  migrated.set_first_seen_timestamp(timestamp);
  migrated.set_last_seen_timestamp(timestamp);

  // Copy every deprecated entry across, then drop the deprecated field.
  const int deprecated_count = bucket.deprecated_by_event_code_size();
  for (int index = 0; index < deprecated_count; ++index) {
    *migrated.add_by_event_code() = bucket.deprecated_by_event_code(index);
  }
  bucket.clear_deprecated_by_event_code();

  // Make sure the aggregate storage also holds the filtered SystemProfile itself.
  StoreFilteredSystemProfile(profile_hash, filtered_profile);
}

// Collapses all SystemProfileAggregates of |bucket| into its first entry.
//
// Every aggregate after the first is merged, in order, into the first one using the
// AggregationProcedure obtained for (|metric_definition|, |report_def|); the merged-away entries
// are then removed so that at most one aggregate remains.
//
// Returns true if the merge ran. Returns false, leaving |bucket| untouched, when:
//   - the aggregation procedure could not be created (an error is logged),
//   - AggregationProcedure::Get yields a null procedure (non Cobalt 1.1 report type), or
//   - |bucket| holds no SystemProfileAggregates at all.
bool LocalAggregateStorage::MigrateReportAllData(AggregationPeriodBucket& bucket,
                                                 const MetricDefinition& metric_definition,
                                                 const ReportDefinition& report_def) {
  lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> procedure_or =
      AggregationProcedure::Get(metric_definition, report_def);
  if (!procedure_or.ok()) {
    LOG(ERROR) << "Failed to create aggregation procedure to migrate REPORT_ALL data for report "
               << metric_definition.id() << "-" << report_def.id() << ": " << procedure_or.status();
    return false;
  }
  std::unique_ptr<AggregationProcedure> procedure = procedure_or.ConsumeValueOrDie();
  if (procedure == nullptr) {
    // This is a non cobalt 1.1 report type, should be silently ignored.
    return false;
  }

  google::protobuf::RepeatedPtrField<SystemProfileAggregate>* system_profile_aggregates =
      bucket.mutable_system_profile_aggregates();
  if (system_profile_aggregates->empty()) {
    // Nothing to merge into; dereferencing begin() below would be undefined behaviour.
    return false;
  }

  // Merge all the other aggregates into the first entry, preserving their stored order.
  SystemProfileAggregate& merged_system_profile_aggregate = *system_profile_aggregates->begin();
  auto system_profile_aggregates_it = system_profile_aggregates->begin();
  ++system_profile_aggregates_it;
  for (; system_profile_aggregates_it != system_profile_aggregates->end();
       ++system_profile_aggregates_it) {
    procedure->MergeSystemProfileAggregates(merged_system_profile_aggregate,
                                            *system_profile_aggregates_it);
  }

  // Drop all the merged-away aggregates with one range erase. Erasing them one at a time from the
  // front would shift the remaining elements on every iteration.
  system_profile_aggregates->erase(system_profile_aggregates->begin() + 1,
                                   system_profile_aggregates->end());
  return true;
}

// Brings the stored aggregate data in |metric| up to the current storage layout.
//
// The following migrations are applied, in this order:
//   1. The deprecated_by_system_profile field is cleared.
//   2. ReportAggregates whose report id is not found in |metric_definition| are deleted.
//   3. Buckets still using deprecated_by_event_code are moved to system_profile_aggregates.
//   4. Buckets with more than one SystemProfileAggregate, belonging to a report whose
//      system_profile_selection is not REPORT_ALL, are collapsed into a single aggregate.
//   5. The deprecated_at_least_once boolean is replaced by the AtLeastOnce message.
//
// Returns true if any of the above modified |metric|.
bool LocalAggregateStorage::MigrateStoredData(MetricAggregate& metric,
                                              const MetricDefinition& metric_definition,
                                              const MetadataBuilder& metadata_builder) {
  bool changed = false;

  // NOTE(fxbug.dev/87271): This is needed to clean up excess by_system_profiles in the aggregate
  // store. Do not delete unless you are *certain* there are no devices in the wild that still
  // need this cleanup
  if (metric.deprecated_by_system_profile_size() > 0) {
    metric.clear_deprecated_by_system_profile();
    changed = true;
    LOG(INFO) << "Cleared the local aggregate data storage use of the DEPRECATED_by_system_profile "
                 "field.";
  }

  // Shared by the hourly and daily paths: when the report does not use REPORT_ALL but the bucket
  // still carries several SystemProfileAggregates, merge them into one.
  const auto collapse_if_not_report_all = [&](auto& bucket, const ReportDefinition& definition,
                                              uint32_t report_id) {
    if (definition.system_profile_selection() == REPORT_ALL ||
        bucket.system_profile_aggregates_size() <= 1) {
      return;
    }
    if (this->MigrateReportAllData(bucket, metric_definition, definition)) {
      changed = true;
      LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
                << SystemProfileSelectionPolicy_Name(definition.system_profile_selection());
    }
  };

  // Move AggregationPeriodBucket.deprecated_by_event_code data into system_profile_aggregates,
  // deleting along the way every ReportAggregate whose report is missing from the Registry.
  auto& aggregates_by_report = *metric.mutable_by_report_id();
  auto cursor = aggregates_by_report.begin();
  while (cursor != aggregates_by_report.end()) {
    const uint32_t report_id = cursor->first;
    const ReportDefinition* definition = FindReportDefinition(metric_definition, report_id);

    if (definition == nullptr) {
      LOG(WARNING) << "Found report that is not present in the registry: ("
                   << metric_definition.customer_id() << ", " << metric_definition.project_id()
                   << ", " << metric_definition.id() << ", " << report_id << "): Deleting it.";
      // erase() hands back the iterator to continue from.
      cursor = aggregates_by_report.erase(cursor);
      changed = true;
      continue;
    }

    // Grab the aggregate before advancing; the rest of the iteration works on |report| only.
    ReportAggregate& report = cursor->second;
    ++cursor;

    const auto period = report.time_period_case();
    if (period == ReportAggregate::kHourly) {
      for (auto& [hour_id, bucket] : *report.mutable_hourly()->mutable_by_hour_id()) {
        if (bucket.deprecated_by_event_code_size() > 0) {
          MigrateAggregationPeriodBucket(bucket, util::TimeInfo::FromHourId(hour_id),
                                         util::HourIdToUnixSeconds(hour_id), metric_definition,
                                         *definition, metadata_builder);
          changed = true;
          LOG(INFO) << "Migrated the local aggregate data to store SystemProfiles "
                       "for report: "
                    << report_id << " hour id: " << hour_id;
        }
        collapse_if_not_report_all(bucket, *definition, report_id);
      }
    } else if (period == ReportAggregate::kDaily) {
      for (auto& [day_index, bucket] : *report.mutable_daily()->mutable_by_day_index()) {
        if (bucket.deprecated_by_event_code_size() > 0) {
          MigrateAggregationPeriodBucket(bucket, util::TimeInfo::FromDayIndex(day_index),
                                         util::DayIndexToUnixSeconds(day_index), metric_definition,
                                         *definition, metadata_builder);
          changed = true;
          LOG(INFO) << "Migrated the local aggregate data to store SystemProfiles "
                       "for report: "
                    << report_id << " day index: " << day_index;
        }
        collapse_if_not_report_all(bucket, *definition, report_id);
      }
    }
    // Any other case is an empty ReportAggregate and is ignored.
  }

  // Replace the deprecated_at_least_once boolean with the AtLeastOnce message. This has to run
  // after the system_profile_aggregates migration above.
  for (auto& [report_id, report] : *metric.mutable_by_report_id()) {
    if (report.has_daily()) {
      const uint32_t last_day_index = report.daily().last_day_index();
      for (auto& [day_index, bucket] : *report.mutable_daily()->mutable_by_day_index()) {
        for (SystemProfileAggregate& system_profile_agg :
             *bucket.mutable_system_profile_aggregates()) {
          for (EventCodesAggregateData& entry : *system_profile_agg.mutable_by_event_code()) {
            if (!entry.data().deprecated_at_least_once()) {
              continue;
            }
            // Clear the old field first, then populate the replacement message.
            auto* payload = entry.mutable_data();
            payload->clear_deprecated_at_least_once();
            auto* at_least_once = payload->mutable_at_least_once();
            at_least_once->set_at_least_once(true);
            at_least_once->set_last_day_index(last_day_index);
            changed = true;
            LOG(INFO) << "Migrated the local aggregate data to use the new AtLeastOnce "
                         "message for report: "
                      << report_id
                      << " system profile hash: " << system_profile_agg.system_profile_hash()
                      << " day index: " << day_index;
          }
        }
      }
    }
    if (report.has_hourly()) {
      for (auto& [hour_id, bucket] : *report.mutable_hourly()->mutable_by_hour_id()) {
        for (SystemProfileAggregate& system_profile_agg :
             *bucket.mutable_system_profile_aggregates()) {
          for (EventCodesAggregateData& entry : *system_profile_agg.mutable_by_event_code()) {
            if (!entry.data().deprecated_at_least_once()) {
              continue;
            }
            // Hourly data only gets the flag; no last_day_index is written here.
            auto* payload = entry.mutable_data();
            payload->clear_deprecated_at_least_once();
            payload->mutable_at_least_once()->set_at_least_once(true);
            changed = true;
            LOG(INFO) << "Migrated the local aggregate data to use the new AtLeastOnce "
                         "message for report: "
                      << report_id
                      << " system profile hash: " << system_profile_agg.system_profile_hash()
                      << " hour id: " << hour_id;
          }
        }
      }
    }
  }
  return changed;
}

// Applies a change of |size_increase| bytes (which may be negative) to the byte counters tracked
// for project |proj|.
//
// Updates the overall total, the per-project total, and the "slush" total. A project's slush
// usage is the part of its per-project total that exceeds per_project_reserved_bytes_. A
// per-project total that would become negative is logged as an error and reset to 0.
void LocalAggregateStorage::UpdateProjectSizeBy(lib::ProjectIdentifier proj,
                                                int64_t size_increase) {
  // A zero delta leaves every counter as it is.
  if (size_increase == 0) {
    return;
  }

  auto locked = byte_tracking_.lock();
  // NOTE(review): the overall total receives the full delta even when the per-project value is
  // clamped to 0 below — confirm this is intended.
  locked->total += size_increase;

  // Single lookup; operator[] creates the entry if the project has none yet.
  auto& project_bytes = locked->total_per_project[proj];

  // Take the project's old slush contribution out of the slush total.
  if (project_bytes > per_project_reserved_bytes_) {
    locked->total_slush -= project_bytes - per_project_reserved_bytes_;
  }

  project_bytes += size_increase;
  // This shouldn't happen, but a negative per-project total is very undesirable.
  if (project_bytes < 0) {
    LOG(ERROR) << "More data was deleted from a project than was previously present. This "
                  "represents a bug in Cobalt.";
    project_bytes = 0;
  }

  // Put the project's new slush contribution back in.
  if (project_bytes > per_project_reserved_bytes_) {
    locked->total_slush += project_bytes - per_project_reserved_bytes_;
  }
}

}  // namespace cobalt::local_aggregation
