// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation/observation_generator.h"

#include <chrono>
#include <map>
#include <optional>
#include <thread>

#include "absl/strings/str_cat.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation/backfill_manager.h"
#include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/observation_writer.h"
#include "src/pb/metadata_builder.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/status_codes.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/window_size.pb.h"
#include "src/system_data/system_data.h"

namespace cobalt::local_aggregation {

// Default spacing between two scheduled observation generation runs: one hour.
const std::chrono::seconds kDefaultGenerateObsInterval{std::chrono::hours{1}};
// Delay between startup and the first observation generation run: three minutes.
// Keeping this short lets expedited device count metrics be logged and sent quickly on devices
// that will not stay up for very long.
const std::chrono::seconds kDefaultInitialWaitToGenerate{std::chrono::minutes{3}};

// Builds an ObservationGenerator from its collaborators.
//
// |aggregate_storage|, |global_project_context_factory|, |system_data|, |observation_writer| and
// |civil_time_converter| are handed to the corresponding members; |privacy_encoder| is moved in,
// so the generator takes ownership of it.
//
// |generate_observations_with_current_system_profile| and |test_dont_backfill_empty_reports| are
// stored as-is and consulted later by GetSystemProfile() and GenerateObservationsOnce()
// respectively. |backfill_manager_days| is forwarded to the backfill manager's constructor.
//
// The constructor does not start the worker thread; it only allocates a util::SteadyClock, selects
// the default generation interval and schedules the first generation.
ObservationGenerator::ObservationGenerator(
    LocalAggregateStorage& aggregate_storage,
    const logger::ProjectContextFactory& global_project_context_factory,
    system_data::SystemDataInterface& system_data,
    const logger::ObservationWriter& observation_writer,
    std::unique_ptr<logger::PrivacyEncoder> privacy_encoder,
    util::CivilTimeConverterInterface& civil_time_converter,
    bool generate_observations_with_current_system_profile, bool test_dont_backfill_empty_reports,
    uint32_t backfill_manager_days)
    : aggregate_storage_(aggregate_storage),
      global_project_context_factory_(global_project_context_factory),
      system_data_(system_data),
      observation_writer_(observation_writer),
      steady_clock_(new util::SteadyClock()),
      generate_obs_interval_(kDefaultGenerateObsInterval),
      backfill_manager_(backfill_manager_days),
      privacy_encoder_(std::move(privacy_encoder)),
      civil_time_converter_(civil_time_converter),
      generate_observations_with_current_system_profile_(
          generate_observations_with_current_system_profile),
      test_dont_backfill_empty_reports_(test_dont_backfill_empty_reports) {
  // The first generation is due kDefaultInitialWaitToGenerate after construction, measured on the
  // steady clock. This is assigned in the body because it reads steady_clock_.
  next_generate_obs_ = steady_clock_->now() + kDefaultInitialWaitToGenerate;
}

// Clears the shutdown flag and launches the worker thread, which executes Run(clock).
//
// |clock| is captured by pointer and read from the worker thread, so it has to stay valid until
// that thread has been joined (see ShutDown()).
void ObservationGenerator::Start(util::SystemClockInterface* clock) {
  // The lock is held for the whole function; Run() acquires the same lock before doing anything
  // beyond logging.
  auto fields = protected_fields_.lock();
  fields->shut_down = false;
  LOG(INFO) << "Starting ObservationGenerator Worker Thread";
  worker_thread_ = std::thread([this, clock] { Run(clock); });
}

// Requests a shutdown. When a worker thread is running, it is woken up and joined before this
// function returns; otherwise only the shutdown flag is set.
void ObservationGenerator::ShutDown() {
  if (!worker_thread_.joinable()) {
    // No worker to stop: just record the request.
    protected_fields_.lock()->shut_down = true;
    return;
  }

  {
    // Scope the lock so it is released before join(); the worker needs it to observe the flag.
    auto fields = protected_fields_.lock();
    fields->shut_down = true;
    fields->wakeup_notifier.notify_all();
  }
  worker_thread_.join();
}

// Body of the worker thread: repeatedly waits for the next scheduled generation time (or a
// shutdown request) and then calls GenerateObservations(). Returns once a shutdown request is
// seen at the start of an iteration.
//
// |clock| supplies the system time passed to GenerateObservations().
void ObservationGenerator::Run(util::SystemClockInterface* clock) {
  LOG(INFO) << "ObservationGenerator Worker Thread running";
  auto fields = protected_fields_.lock();
  // The shutdown flag is only checked here, so one more GenerateObservations() call is made after
  // a wake-up caused by ShutDown(); that call is a no-op unless a generation is already due.
  while (!fields->shut_down) {
    // Block until the next scheduled generation, waking early only when shutdown is requested.
    fields->wakeup_notifier.wait_until(fields, next_generate_obs_,
                                       [&fields]() { return fields->shut_down; });

    const Status result = GenerateObservations(clock->now(), steady_clock_->now());
    if (!result.ok()) {
      LOG(ERROR) << "Error occurred while generating observations: " << result;
    }
  }
}

// Runs one generation pass if |steady_time| has reached the scheduled generation time.
//
// When a pass is due, the schedule is advanced by one generation interval *before* the pass runs
// and the pass's status is returned. Otherwise nothing happens and OK is returned.
// |system_time| is forwarded unchanged to GenerateObservationsOnce().
Status ObservationGenerator::GenerateObservations(
    std::chrono::system_clock::time_point system_time,
    std::chrono::steady_clock::time_point steady_time) {
  VLOG(5) << "ObservationGenerator::GenerateObservations";

  // Not due yet: nothing to do.
  if (steady_time < next_generate_obs_) {
    return Status::OkStatus();
  }

  next_generate_obs_ += generate_obs_interval_;
  return GenerateObservationsOnce(system_time);
}

// Performs a single observation-generation pass over every project, metric and report exposed by
// the global project context factory, then garbage collects the aggregate storage.
//
// |system_time| seeds the CivilTimeManager that is passed on to
// GenerateObservationsForReportAggregate() for every report.
//
// Errors never abort the pass. A failure to fetch a metric's aggregate, to obtain a report's
// aggregation procedure, to generate observations for a report, or to garbage collect is recorded
// and processing continues with the next item. This way one bad report cannot prevent the
// remaining aggregates from being processed and saved.
//
// Returns OK when nothing failed. Otherwise returns a status carrying the error code of the first
// recorded error, a message with the number of errors, and every error attached as context.
Status ObservationGenerator::GenerateObservationsOnce(
    std::chrono::system_clock::time_point system_time) {
  // NOTE(review): presumably flushes internal metrics when it goes out of scope -- confirm
  // against logger::InternalMetrics.
  logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher();
  CivilTimeManager civil_time_mgr(system_time, civil_time_converter_);
  util::TimeInfo utc_time_info = civil_time_mgr.GetInitialUtc();
  LOG(INFO) << "Generating aggregated observations for periods ending before system time: "
            << std::chrono::system_clock::to_time_t(system_time) << " (day index "
            << utc_time_info.day_index << ", hour ID " << utc_time_info.hour_id << " in UTC)";
  std::vector<Status> generation_errors;
  for (lib::ProjectIdentifier project_identifier : global_project_context_factory_.ListProjects()) {
    std::unique_ptr<logger::ProjectContext> project =
        global_project_context_factory_.NewProjectContext(project_identifier);

    for (lib::MetricIdentifier metric_identifier : project->ListMetrics()) {
      const MetricDefinition* metric = project->GetMetric(metric_identifier);
      lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> aggregate_or =
          aggregate_storage_.GetMetricAggregate(metric_identifier);
      if (!aggregate_or.ok()) {
        generation_errors.push_back(aggregate_or.status());
        continue;
      }
      LocalAggregateStorage::MetricAggregateRef aggregate = std::move(aggregate_or.value());
      logger::MetricRef metric_ref = project->RefMetric(metric);

      for (const ReportDefinition& report : metric->reports()) {
        // Record a procedure lookup failure like every other error of this pass instead of
        // returning. An early return here would skip aggregate.Save() for this metric, all
        // remaining reports, metrics and projects, and the garbage collection below.
        auto procedure_or = AggregationProcedure::Get(*metric, report);
        if (!procedure_or.ok()) {
          generation_errors.push_back(procedure_or.status());
          continue;
        }
        std::unique_ptr<AggregationProcedure> procedure = std::move(procedure_or.value());

        // A null procedure means there is nothing to generate for this report.
        if (!procedure) {
          continue;
        }

        // Test-only switch: skip reports that have no stored aggregate yet. The check must come
        // before the subscript below, which presumably creates the entry when it is missing.
        if (test_dont_backfill_empty_reports_ &&
            aggregate.aggregate()->mutable_by_report_id()->count(report.id()) == 0) {
          continue;
        }

        ReportAggregate& report_aggregate =
            (*aggregate.aggregate()->mutable_by_report_id())[report.id()];

        Status status =
            GenerateObservationsForReportAggregate(aggregate, report_aggregate, *procedure,
                                                   &civil_time_mgr, *metric, metric_ref, report);

        if (!status.ok()) {
          generation_errors.push_back(status);
        }
      }

      // Save the aggregate after all of the metric's reports were visited, including when some of
      // them failed.
      aggregate.Save();
    }
  }

  Status status = aggregate_storage_.GarbageCollection();
  if (!status.ok()) {
    generation_errors.push_back(status);
  }

  if (!generation_errors.empty()) {
    // Set the status code to the error that first occurred during the generation of observations.
    util::StatusBuilder sb(generation_errors.at(0).error_code(), "Encountered ");
    sb.AppendMsg(generation_errors.size()).AppendMsg(" errors while generating observations");
    for (const Status& error : generation_errors) {
      sb.WithContext("error", error);
    }
    return sb.Build();
  }

  return Status::OkStatus();
}

// Generates and writes the observations of a single report for every time period the backfill
// manager says is still outstanding.
//
// For each period the aggregation procedure produces zero or more observations. Each of them is
// passed through the privacy encoder and written together with the system profile resolved from
// its system profile hash. When a period yields no observation, the privacy encoder is still
// invoked (with a null observation) so that it may fabricate observations; those are written
// with the current filtered system profile.
//
// Returns the first error encountered (remaining periods are then skipped), or OK.
Status ObservationGenerator::GenerateObservationsForReportAggregate(
    const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate,
    AggregationProcedure& procedure, CivilTimeManager* civil_time_mgr,
    const MetricDefinition& metric, const logger::MetricRef& metric_ref,
    const ReportDefinition& report) {
  // The current system profile, filtered for this report. It is both the fallback used by
  // GetSystemProfile() and the profile attached to fabricated observations.
  SystemProfile current_profile;
  MetadataBuilder::FilteredSystemProfile(report, system_data_.system_profile(), &current_profile);

  // Profiles already resolved during this call, keyed by system profile hash.
  std::map<uint64_t, SystemProfile> profiles_by_hash;

  CB_ASSIGN_OR_RETURN(std::vector<util::TimeInfo> pending_periods,
                      backfill_manager_.CalculateBackfill(
                          procedure.GetLastTimeInfo(report_aggregate), civil_time_mgr, metric,
                          procedure.IsDaily(), procedure.IsExpedited()));

  for (util::TimeInfo period : pending_periods) {
    CB_ASSIGN_OR_RETURN(std::vector<ObservationAndSystemProfile> generated,
                        procedure.GenerateObservations(period, report_aggregate));

    if (generated.empty()) {
      // Allow the privacy encoder a chance to create a fabricated observation.
      CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> fabricated,
                          privacy_encoder_->MaybeMakePrivateObservations(nullptr, metric, report));

      // Use the current system profile if any fabricated private observations are created.
      // TODO(fxbug.dev/92955): choose plausible SystemProfiles for fabricated observations.
      CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, period, metric_ref, report,
                                           fabricated, current_profile, std::nullopt));
      continue;
    }

    for (ObservationAndSystemProfile& entry : generated) {
      // Resolve the profile first; the observation itself is moved into the encoder below.
      const SystemProfile* resolved_profile = GetSystemProfile(
          entry.system_profile_hash, &profiles_by_hash, aggregate, current_profile);

      CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> encoded,
                          privacy_encoder_->MaybeMakePrivateObservations(
                              std::move(entry.observation), metric, report));

      CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, period, metric_ref, report,
                                           encoded, *resolved_profile,
                                           entry.system_profile_hash));
    }
  }

  return Status::OkStatus();
}

// Writes every non-null entry of |private_observations| through the observation writer, then
// tells |procedure| that the observations for |time_info| have been committed.
//
// Metadata for each write is built from |metric_ref|, |report|, the day index of |time_info| and
// |system_profile|. Only the first observation actually written is flagged as a contribution.
// |commit_system_profile_hash| is forwarded unchanged to ObservationsCommitted().
//
// Returns the writer's error as soon as a write fails (nothing is committed in that case), or OK.
Status ObservationGenerator::WriteObservations(
    ReportAggregate& report_aggregate, AggregationProcedure& procedure, util::TimeInfo time_info,
    const logger::MetricRef& metric_ref, const ReportDefinition& report,
    const std::vector<std::unique_ptr<Observation>>& private_observations,
    const SystemProfile& system_profile, std::optional<uint64_t> commit_system_profile_hash) {
  // True until the first successful write.
  bool contribution_pending = true;
  for (const auto& candidate : private_observations) {
    if (candidate != nullptr) {
      CB_RETURN_IF_ERROR(observation_writer_.WriteObservation(
          candidate,
          MetadataBuilder::BuildFromProfile(metric_ref, report, time_info.day_index,
                                            system_profile),
          contribution_pending));
      contribution_pending = false;
    }
  }

  // Mark the aggregated data as having been sent.
  procedure.ObservationsCommitted(report_aggregate, time_info, commit_system_profile_hash);
  return Status::OkStatus();
}

// Returns the system profile to attach to observations recorded under |system_profile_hash|.
//
// Results are memoised in |system_profile_cache|; the returned pointer refers to the cached entry.
// On a cache miss the profile is retrieved from |aggregate|. If that fails, an error is logged
// and |current_filtered_system_profile| is used in its place. When the generator was configured
// to generate observations with the current system profile, the cached value is a copy of
// |current_filtered_system_profile| with the retrieved profile merged into it (see
// MergeSystemProfiles()); otherwise the retrieved profile is cached unchanged.
const SystemProfile* ObservationGenerator::GetSystemProfile(
    uint64_t system_profile_hash, std::map<uint64_t, SystemProfile>* system_profile_cache,
    const LocalAggregateStorage::MetricAggregateRef& aggregate,
    const SystemProfile& current_filtered_system_profile) const {
  if (auto cached = system_profile_cache->find(system_profile_hash);
      cached != system_profile_cache->end()) {
    return &cached->second;
  }

  lib::statusor::StatusOr<SystemProfile> retrieved_or =
      aggregate.RetrieveFilteredSystemProfile(system_profile_hash);
  if (!retrieved_or.ok()) {
    LOG(ERROR) << "Failed to retrieve system profile from local aggregation, falling back to using "
                  "current system profile for metric: "
               << aggregate.DebugString();
  }
  SystemProfile retrieved = retrieved_or.ValueOr(current_filtered_system_profile);

  // The hash is known to be absent from the cache here, so each emplace below inserts and its
  // returned iterator designates the new entry.
  if (!generate_observations_with_current_system_profile_) {
    return &system_profile_cache->emplace(system_profile_hash, std::move(retrieved)).first->second;
  }

  SystemProfile merged = current_filtered_system_profile;
  MergeSystemProfiles(&merged, retrieved);
  return &system_profile_cache->emplace(system_profile_hash, std::move(merged)).first->second;
}

// Merges |from| into |to|, except that a non-empty experiment_ids list in |from| replaces the
// list in |to| rather than being appended to it.
void ObservationGenerator::MergeSystemProfiles(SystemProfile* to, const SystemProfile& from) {
  // Per the protobuf documentation for Message::MergeFrom
  // (https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.message#Message.MergeFrom.details):
  // singular fields set in |from| overwrite those in |to|, embedded messages are merged, and
  // repeated fields are concatenated.
  //
  // Concatenation is not wanted for experiment_ids, so drop the destination's list first whenever
  // the source is going to contribute one.
  const bool source_has_experiment_ids = from.experiment_ids_size() != 0;
  if (source_has_experiment_ids) {
    to->clear_experiment_ids();
  }
  to->MergeFrom(from);
}

}  // namespace cobalt::local_aggregation
