// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/public/cobalt_service.h"

#include <memory>

#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/encrypted_message_util.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/project_context.h"
#include "src/observation_store/file_observation_store.h"
#include "src/observation_store/memory_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/clock_interfaces.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/system_data/configuration_data.h"
#include "src/uploader/shipping_manager.h"

namespace cobalt {

namespace {

// Builds the ObservationStore variant selected by |cfg|.
//
// If cfg.use_memory_observation_store is set, a MemoryObservationStore is returned and |fs|,
// |diagnostics| and cfg.observation_store_directory are not used. Otherwise a FileObservationStore
// is constructed with |fs|, |diagnostics|, cfg.observation_store_directory and the name
// "V1 FileObservationStore". Both variants receive the same three byte limits from |cfg|.
std::unique_ptr<observation_store::ObservationStore> NewObservationStore(
    const CobaltConfig &cfg, util::FileSystem *fs, DiagnosticsInterface *diagnostics) {
  if (!cfg.use_memory_observation_store) {
    // File-backed store.
    return std::make_unique<observation_store::FileObservationStore>(
        cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total, fs, diagnostics,
        cfg.observation_store_directory, "V1 FileObservationStore");
  }

  // In-memory store.
  return std::make_unique<observation_store::MemoryObservationStore>(
      cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total);
}

// Selects the EncryptedMessageMaker used for messages destined for the analyzer.
//
// An encrypting maker is produced only when the target pipeline's environment is not LOCAL and
// the pipeline supplies an analyzer encryption key; ConsumeValueOrDie() is applied to the result
// of MakeForObservations(). In every other case an unencrypted maker is returned.
std::unique_ptr<util::EncryptedMessageMaker> GetEncryptToAnalyzer(CobaltConfig *cfg) {
  auto *const pipeline = cfg->target_pipeline.get();

  const bool is_local = pipeline->environment() == system_data::Environment::LOCAL;
  if (!is_local && pipeline->analyzer_encryption_key()) {
    return util::EncryptedMessageMaker::MakeForObservations(*pipeline->analyzer_encryption_key())
        .ConsumeValueOrDie();
  }

  // LOCAL environment, or no analyzer key configured.
  return util::EncryptedMessageMaker::MakeUnencrypted();
}

// Selects the EncryptedMessageMaker used for messages destined for the shuffler.
//
// An encrypting maker is produced only when |pipeline|'s environment is not LOCAL and |pipeline|
// supplies a shuffler encryption key; ConsumeValueOrDie() is applied to the result of
// MakeForEnvelopes(). In every other case an unencrypted maker is returned.
std::unique_ptr<util::EncryptedMessageMaker> GetEncryptToShuffler(
    TargetPipelineInterface *pipeline) {
  const bool is_local = pipeline->environment() == system_data::Environment::LOCAL;
  if (!is_local && pipeline->shuffler_encryption_key()) {
    return util::EncryptedMessageMaker::MakeForEnvelopes(*pipeline->shuffler_encryption_key())
        .ConsumeValueOrDie();
  }

  // LOCAL environment, or no shuffler key configured.
  return util::EncryptedMessageMaker::MakeUnencrypted();
}

// Builds the ShippingManager for the pipeline configured in |cfg|.
//
// For a LOCAL environment a LocalShippingManager is returned, built from |observation_store|,
// cfg->local_shipping_manager_path and |fs|; nothing is taken from |cfg| in that case.
//
// Otherwise a ClearcutV1ShippingManager is returned. This path consumes parts of |cfg|: the
// pipeline's Clearcut uploader is taken (or, if none is provided, a ClearcutUploader is built
// from the pipeline's endpoint and its HTTP client, which is taken as well), and
// cfg->activity_listener is moved from. The raw pointers |observation_store|, |diagnostics|,
// |encrypt_to_analyzer| and encrypt_to_shuffler.get() are handed to the shipping manager as-is.
std::unique_ptr<uploader::ShippingManager> NewShippingManager(
    CobaltConfig *cfg, util::FileSystem *fs, observation_store::ObservationStore *observation_store,
    DiagnosticsInterface *diagnostics, util::EncryptedMessageMaker *encrypt_to_analyzer,
    const std::unique_ptr<util::EncryptedMessageMaker> &encrypt_to_shuffler) {
  auto *const pipeline = cfg->target_pipeline.get();

  const bool ship_locally = pipeline->environment() == system_data::Environment::LOCAL;
  if (ship_locally) {
    return std::make_unique<uploader::LocalShippingManager>(observation_store,
                                                            cfg->local_shipping_manager_path, fs);
  }

  // Prefer an uploader supplied by the pipeline; fall back to a default one otherwise.
  std::unique_ptr<lib::clearcut::ClearcutUploaderInterface> clearcut_client =
      pipeline->TakeClearcutUploader();
  if (clearcut_client == nullptr) {
    clearcut_client = std::make_unique<lib::clearcut::ClearcutUploader>(
        pipeline->clearcut_endpoint(), pipeline->TakeHttpClient());
  }

  return std::make_unique<uploader::ClearcutV1ShippingManager>(
      uploader::UploadScheduler(cfg->upload_schedule_cfg), observation_store,
      encrypt_to_shuffler.get(), encrypt_to_analyzer, std::move(clearcut_client),
      std::move(cfg->activity_listener), diagnostics,
      system_data::ConfigurationData(pipeline->environment()).GetLogSourceId(),
      pipeline->clearcut_max_retries(), cfg->api_key);
}

}  // namespace

// Constructs the service and all of its components from |cfg|.
//
// |cfg| is taken by value and is partially consumed: the initializers below move
// cfg.file_system, cfg.global_registry, cfg.diagnostics and cfg.civil_time_converter out of it,
// and NewShippingManager() additionally takes the pipeline's uploader/HTTP client and
// cfg.activity_listener on the non-LOCAL path. Other initializers read |cfg| after those moves,
// so they must not depend on the moved-from fields.
//
// NOTE(review): C++ initializes members in the order they are declared in the class (the header
// is not visible here), not in the order written in this list. Several initializers use members
// that appear earlier in this list (e.g. fs_.get(), diagnostics_.get(), observation_store_.get(),
// &metadata_builder_), so confirm the declaration order matches.
//
// After member initialization the constructor wires the internal metrics into the components
// (only when an internal logger exists) and, if cfg.start_worker_threads was set, starts the
// shipping manager.
CobaltService::CobaltService(CobaltConfig cfg)
    : enable_replacement_metrics_(cfg.enable_replacement_metrics),
      fs_(std::move(cfg.file_system)),
      system_data_(cfg.product_name, cfg.board_name_suggestion, cfg.release_stage, cfg.version,
                   cfg.build_type, cfg.experiment_tokens, cfg.experiment_ids),
      // No factory (nullptr) when the config carries no global registry.
      global_project_context_factory_(
          cfg.global_registry
              ? std::make_unique<logger::ProjectContextFactory>(std::move(cfg.global_registry))
              : nullptr),
      diagnostics_(std::move(cfg.diagnostics)),
      observation_store_(NewObservationStore(cfg, fs_.get(), diagnostics_.get())),
      encrypt_to_analyzer_(GetEncryptToAnalyzer(&cfg)),
      encrypt_to_shuffler_(GetEncryptToShuffler(cfg.target_pipeline.get())),
      shipping_manager_(NewShippingManager(&cfg, fs_.get(), observation_store_.get(),
                                           diagnostics_.get(), encrypt_to_analyzer_.get(),
                                           encrypt_to_shuffler_)),
      metadata_builder_(&system_data_, cfg.validated_clock, cfg.system_data_cache_path, fs_.get()),
      logger_encoder_(cfg.client_secret, &metadata_builder_),
      observation_writer_(observation_store_.get(), shipping_manager_.get(),
                          encrypt_to_analyzer_.get()),
      // Falls back to a UtcTimeConverter when the config does not supply a converter.
      civil_time_converter_(cfg.civil_time_converter ? std::move(cfg.civil_time_converter)
                                                     : std::make_unique<util::UtcTimeConverter>()),
      event_aggregator_manager_(cfg, fs_.get(), &logger_encoder_, &observation_writer_,
                                &metadata_builder_),
      local_aggregation_(cfg, global_project_context_factory_.get(), &system_data_,
                         &metadata_builder_, fs_.get(), &observation_writer_,
                         civil_time_converter()),
      undated_event_manager_(new logger::UndatedEventManager(
          &logger_encoder_, event_aggregator_manager_.GetEventAggregator(), &local_aggregation_,
          &observation_writer_, &system_data_, civil_time_converter())),
      validated_clock_(cfg.validated_clock),
      // The internal logger is created through the member function NewLogger() while this object
      // is still being constructed; that overload reads the members initialized above
      // (logger_encoder_, event_aggregator_manager_, local_aggregation_, ...). It is built with
      // include_internal_logger=false and no experiment ids, and is left null when there is no
      // project context factory.
      internal_logger_(
          (global_project_context_factory_)
              ? NewLogger(cobalt::logger::kCustomerId, cobalt::logger::kProjectId, false, {})
              : nullptr),
      internal_metrics_(
          logger::InternalMetrics::NewWithLogger(internal_logger_.get(), diagnostics_.get())),
      start_worker_threads_(cfg.start_worker_threads) {
  // internal_logger_ is null either because no global registry was provided or because
  // NewLogger() returned nullptr for the internal project; the message below is logged in both
  // cases.
  if (!internal_logger_) {
    LOG(ERROR) << "The global_registry provided does not include the expected internal metrics "
                  "project. Cobalt-measuring-cobalt will be disabled.";
  } else {
    // Set up internal metrics for various components.
    shipping_manager_->ResetInternalMetrics(internal_metrics_.get());
    observation_store_->ResetInternalMetrics(internal_metrics_.get());
    system_data_.ResetInternalMetrics(internal_metrics_.get());
    event_aggregator_manager_.ResetInternalMetrics(internal_metrics_.get());
    local_aggregation_.ResetInternalMetrics(internal_metrics_.get());
  }
  // Only the shipping manager is started here; the aggregators are started from
  // SystemClockIsAccurate().
  if (start_worker_threads_) {
    shipping_manager_->Start();
  }
}

// Creates a logger for |project_context| with internal metrics enabled and no experiment ids.
// Forwards to the (project_context, include_internal_logger, experiment_ids) overload.
std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(
    std::unique_ptr<logger::ProjectContext> project_context) {
  constexpr bool kIncludeInternalLogger = true;
  return NewLogger(std::move(project_context), kIncludeInternalLogger, /*experiment_ids=*/{});
}

// Creates a logger for the project identified by |customer_id| and |project_id|, with internal
// metrics enabled and no experiment ids. Forwards to the four-argument overload.
std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(uint32_t customer_id,
                                                                  uint32_t project_id) {
  constexpr bool kIncludeInternalLogger = true;
  return NewLogger(customer_id, project_id, kIncludeInternalLogger, /*experiment_ids=*/{});
}

std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(
    uint32_t customer_id, uint32_t project_id, std::vector<uint32_t> experiment_ids) {
  return NewLogger(customer_id, project_id, true, std::move(experiment_ids));
}

std::unique_ptr<logger::Logger> CobaltService::NewLogger(uint32_t customer_id, uint32_t project_id,
                                                         bool include_internal_logger,
                                                         std::vector<uint32_t> experiment_ids) {
  CHECK(global_project_context_factory_)
      << "No global_registry provided. NewLogger with customer/project id is not supported";
  return NewLogger(global_project_context_factory_->NewProjectContext(
                       lib::CustomerIdentifier(customer_id).ForProject(project_id)),
                   include_internal_logger, std::move(experiment_ids));
}

// Creates a logger for |project_context|.
//
// Returns nullptr when |project_context| is null. The logger reports to the service's internal
// metrics only if |include_internal_logger| is true. While undated_event_manager_ is still set,
// the logger is additionally given validated_clock_ and the undated event manager; once it has
// been reset (see SystemClockIsAccurate()), the constructor without those two arguments is used.
std::unique_ptr<logger::Logger> CobaltService::NewLogger(
    std::unique_ptr<logger::ProjectContext> project_context, bool include_internal_logger,
    std::vector<uint32_t> experiment_ids) {
  if (!project_context) {
    return nullptr;
  }

  cobalt::logger::InternalMetrics *metrics = nullptr;
  if (include_internal_logger) {
    metrics = internal_metrics_.get();
  }

  if (!undated_event_manager_) {
    // The undated event manager is gone: build the logger without it and without the validated
    // clock.
    return std::make_unique<logger::Logger>(
        std::move(project_context), &logger_encoder_,
        event_aggregator_manager_.GetEventAggregator(), &local_aggregation_, &observation_writer_,
        &system_data_, civil_time_converter(), std::move(experiment_ids),
        enable_replacement_metrics_, metrics);
  }

  return std::make_unique<logger::Logger>(
      std::move(project_context), &logger_encoder_, event_aggregator_manager_.GetEventAggregator(),
      &local_aggregation_, &observation_writer_, &system_data_, validated_clock_,
      civil_time_converter(), undated_event_manager_, std::move(experiment_ids),
      enable_replacement_metrics_, metrics);
}

// Notifies the service that |system_clock| can now be trusted.
//
// In order: snapshots the system data, flushes and then drops the undated event manager (if it
// still exists), optionally starts the two aggregators, and finally takes ownership of
// |system_clock|. The aggregators are started only when both |start_event_aggregator_worker| and
// the start_worker_threads_ setting are true; each receives a SystemClockRef wrapping the raw
// clock pointer, not ownership of the clock.
void CobaltService::SystemClockIsAccurate(std::unique_ptr<util::SystemClockInterface> system_clock,
                                          bool start_event_aggregator_worker) {
  util::SystemClockInterface *const clock = system_clock.get();

  // The snapshot comes first so that a system profile is available to anything below that can
  // generate observations (flushing cached events, starting the aggregators).
  metadata_builder_.SnapshotSystemData();

  if (undated_event_manager_) {
    undated_event_manager_->Flush(clock, internal_metrics_.get());
    undated_event_manager_.reset();
  }

  const bool start_aggregators = start_event_aggregator_worker && start_worker_threads_;
  if (start_aggregators) {
    local_aggregation_.Start(std::make_unique<util::SystemClockRef>(clock));
    event_aggregator_manager_.Start(std::make_unique<util::SystemClockRef>(clock));
  }

  // Keep the clock alive for the references handed out above.
  system_clock_ = std::move(system_clock);
}

// Applies |policy| to the observation store, both aggregators and the shipping manager.
//
//   COLLECT_AND_UPLOAD: everything enabled.
//   DO_NOT_UPLOAD:      collection enabled, shipping manager disabled.
//   DO_NOT_COLLECT:     everything disabled, and the data held by the observation store and both
//                       aggregators is deleted.
//
// A value that matches none of the cases leaves every component untouched.
void CobaltService::SetDataCollectionPolicy(CobaltService::DataCollectionPolicy policy) {
  // Enables/disables the components in a fixed order. When collection is disabled, each
  // collecting component has its data deleted right after it is disabled.
  const auto apply = [this](bool disable_collection, bool disable_upload) {
    observation_store_->Disable(disable_collection);
    if (disable_collection) {
      observation_store_->DeleteData();
    }

    event_aggregator_manager_.Disable(disable_collection);
    if (disable_collection) {
      event_aggregator_manager_.DeleteData();
    }

    local_aggregation_.Disable(disable_collection);
    if (disable_collection) {
      local_aggregation_.DeleteData();
    }

    shipping_manager_->Disable(disable_upload);
  };

  switch (policy) {
    case CobaltService::DataCollectionPolicy::COLLECT_AND_UPLOAD:
      LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy COLLECT_AND_UPLOAD";
      apply(/*disable_collection=*/false, /*disable_upload=*/false);
      break;
    case CobaltService::DataCollectionPolicy::DO_NOT_UPLOAD:
      LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy DO_NOT_UPLOAD";
      apply(/*disable_collection=*/false, /*disable_upload=*/true);
      break;
    case CobaltService::DataCollectionPolicy::DO_NOT_COLLECT:
      LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy DO_NOT_COLLECT";
      apply(/*disable_collection=*/true, /*disable_upload=*/true);
      break;
  }
}

// Generates aggregated observations as of the current time reported by system_clock_.
//
// The event aggregator is asked to generate observations for the UTC and LOCAL day indices
// immediately preceding the current one; local aggregation is then run with the current time
// point. The first failing status is returned (a local-aggregation failure is also logged);
// otherwise an OK status is returned.
//
// NOTE(review): system_clock_ is dereferenced unconditionally. In this file it is only assigned
// in SystemClockIsAccurate(), so this presumably must not be called before that - confirm.
Status CobaltService::GenerateAggregatedObservations() {
  const std::chrono::system_clock::time_point now = system_clock_->now();
  const std::time_t now_seconds = std::chrono::system_clock::to_time_t(now);

  // One less than today's index in each time zone policy.
  const uint32_t utc_day_index = util::TimeToDayIndex(now_seconds, MetricDefinition::UTC) - 1;
  const uint32_t local_day_index = util::TimeToDayIndex(now_seconds, MetricDefinition::LOCAL) - 1;

  Status event_aggregator_status =
      event_aggregator_manager_.GenerateObservationsNoWorker(utc_day_index, local_day_index);
  if (!event_aggregator_status.ok()) {
    return event_aggregator_status;
  }

  Status local_aggregation_status = local_aggregation_.GenerateAggregatedObservations(now);
  if (!local_aggregation_status.ok()) {
    LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: "
               << local_aggregation_status.error_message();
    return local_aggregation_status;
  }

  return Status::OkStatus();
}

// Generates aggregated observations up to and including the UTC day |final_day_index_utc|.
//
// The event aggregator is invoked with |final_day_index_utc| directly. Local aggregation takes a
// time point instead of a day index, so it is given the first hour of the following day. The
// first failing status is returned (a local-aggregation failure is also logged); otherwise an OK
// status is returned.
Status CobaltService::GenerateAggregatedObservations(uint32_t final_day_index_utc) {
  Status event_aggregator_status =
      event_aggregator_manager_.GenerateObservationsNoWorker(final_day_index_utc);
  if (!event_aggregator_status.ok()) {
    return event_aggregator_status;
  }

  // To cover day indices <= `final_day_index_utc`, the time handed to local aggregation has to
  // fall on the next day: convert (day + 1) -> hour id -> unix seconds -> time point.
  const auto next_day_first_hour = util::DayIndexToHourId(final_day_index_utc + 1);
  const auto next_day_unix_seconds = util::HourIdToUnixSeconds(next_day_first_hour);
  const auto generation_time = util::FromUnixSeconds(next_day_unix_seconds);

  Status local_aggregation_status =
      local_aggregation_.GenerateAggregatedObservations(generation_time);
  if (!local_aggregation_status.ok()) {
    LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: "
               << local_aggregation_status;
    return local_aggregation_status;
  }

  return Status::OkStatus();
}

}  // namespace cobalt
