// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"

#include <chrono>

#include "absl/strings/str_cat.h"
#include "src/lib/statusor/statusor.h"
#include "src/lib/util/file_system.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"

namespace cobalt::local_aggregation {

// Default interval between writeback-thread wakeups: five seconds.
constexpr std::chrono::milliseconds DelayedLocalAggregateStorage::kDefaultWritebackFrequency{
    std::chrono::seconds{5}};

DelayedLocalAggregateStorage::DelayedLocalAggregateStorage(
    std::string base_directory, util::FileSystem *fs,
    const logger::ProjectContextFactory *global_project_context_factory,
    std::chrono::milliseconds writeback_frequency)
    : proto_store_(std::move(base_directory), fs),
      global_project_context_factory_(global_project_context_factory),
      writeback_frequency_(writeback_frequency) {
  // Load whatever was previously persisted, then prune entries that the registry no longer
  // contains. Both steps touch aggregates_ without taking data_mutex_, which is fine here because
  // the writeback thread has not been started yet.
  ReadPersistentStore();
  DeleteOutdatedMetrics();

  // Start the background writeback loop last, once aggregates_ is fully initialized.
  writeback_thread_ = std::thread([this] { Run(); });
}

DelayedLocalAggregateStorage::~DelayedLocalAggregateStorage() {
  // Only a still-joinable writeback thread needs to be stopped and reaped; otherwise there is
  // nothing to tear down.
  if (writeback_thread_.joinable()) {
    ShutDown();
    VLOG(4) << "~DelayedLocalAggregateStorage: waiting for writeback thread to exit...";
    writeback_thread_.join();
  }
}

void DelayedLocalAggregateStorage::ShutDown() {
  WaitUntilSave(writeback_frequency_);
  {
    auto locked = state_.lock();
    locked->shut_down = true;
    locked->shutdown_notifier.notify_all();
    locked->data_save_notifier.notify_all();
  }
  VLOG(4) << "DelayedLocalAggregateStorage: shut-down requested.";
}

void DelayedLocalAggregateStorage::ReadPersistentStore() {
  // Populate the in-memory aggregates from the persistent proto store. The result of Read() is
  // not inspected here.
  proto_store_.Read(&aggregates_);
}

void DelayedLocalAggregateStorage::DeleteData() {
  aggregates_.clear_by_customer_id();
  state_.lock()->data_modified = true;
  WaitUntilSave(writeback_frequency_ * 2);
}

// Drops aggregates for (customer, project) pairs and metrics that the registry behind
// global_project_context_factory_ no longer knows about, logging a warning for each one.
//
// This only changes the in-memory aggregates_. It does not set data_modified, so the pruned state
// reaches disk only when the next write is triggered by some other modification.
void DelayedLocalAggregateStorage::DeleteOutdatedMetrics() {
  std::vector<std::tuple<uint32_t, uint32_t>> projects_to_delete;
  std::vector<std::tuple<uint32_t, uint32_t, uint32_t>> metrics_to_delete;

  // Bind map entries by const reference: a by-value `auto [k, v]` binding would deep-copy every
  // CustomerAggregates / ProjectAggregates message on each iteration.
  for (const auto &[customer_id, customer] : aggregates_.by_customer_id()) {
    for (const auto &[project_id, project] : customer.by_project_id()) {
      std::unique_ptr<logger::ProjectContext> ctx =
          global_project_context_factory_->NewProjectContext(customer_id, project_id);
      if (!ctx) {
        LOG(WARNING) << "Found customer/project that is not present in the registry: ("
                     << customer_id << ", " << project_id << "): Deleting it.";
        projects_to_delete.emplace_back(customer_id, project_id);
        // The whole project goes away, so its individual metrics need no separate check.
        continue;
      }

      for (const auto &[metric_id, _] : project.by_metric_id()) {
        if (!ctx->GetMetric(metric_id)) {
          LOG(WARNING) << "Found metric that is not present in the registry: (" << customer_id
                       << ", " << project_id << ", " << metric_id << "): Deleting it.";
          metrics_to_delete.emplace_back(customer_id, project_id, metric_id);
        }
      }
    }
  }

  // Erase only after iteration has finished, so no map is modified while it is being ranged over.
  for (const auto &[customer_id, project_id] : projects_to_delete) {
    aggregates_.mutable_by_customer_id()
        ->at(customer_id)
        .mutable_by_project_id()
        ->erase(project_id);
  }

  for (const auto &[customer_id, project_id, metric_id] : metrics_to_delete) {
    aggregates_.mutable_by_customer_id()
        ->at(customer_id)
        .mutable_by_project_id()
        ->at(project_id)
        .mutable_by_metric_id()
        ->erase(metric_id);
  }
}

// Blocks the caller until the writeback thread has persisted all pending modifications (i.e.
// data_modified is false), until the storage has been shut down, or until `max_wait` elapses,
// whichever comes first. Returns immediately if there is nothing pending or if already shut down.
//
// Once shut_down is set, Run() returns without writing, so no further save can ever clear
// data_modified. Waiting in that state would always run out the full timeout. ShutDown() notifies
// data_save_notifier after setting shut_down, and with this predicate that releases blocked
// callers.
void DelayedLocalAggregateStorage::WaitUntilSave(std::chrono::milliseconds max_wait) {
  auto locked = state_.lock();
  if (!locked->shut_down && locked->data_modified) {
    locked->data_save_notifier.wait_for(
        locked, max_wait, [&locked] { return locked->shut_down || !locked->data_modified; });
  }
}

// Blocks until data_save_start_notifier is signalled or until `max_wait` elapses. Run() signals it
// right before it begins writing modified data.
// Returns true if the wait ended for a reason other than a timeout. No predicate is checked, so,
// as with a plain condition-variable wait, a notification sent before this call is not observed.
bool DelayedLocalAggregateStorage::WaitUntilSaveStart(std::chrono::milliseconds max_wait) {
  auto locked_state = state_.lock();
  const auto wait_status =
      locked_state->data_save_start_notifier.wait_for(locked_state, max_wait);
  return wait_status == std::cv_status::no_timeout;
}

// Body of the writeback thread started by the constructor.
//
// Each iteration sleeps for writeback_frequency_ (cut short only by shut-down). If any aggregate
// has been marked modified (data_modified), the thread then:
//   - notifies data_save_start_notifier,
//   - reports the serialized size of aggregates_ via internal_metrics_->TrackDiskUsage(),
//   - writes aggregates_ to proto_store_,
//   - notifies data_save_notifier and clears data_modified.
// The thread returns as soon as it observes shut_down, without performing a final write. Pending
// data must therefore be flushed before shut_down is set (ShutDown() calls WaitUntilSave() first).
//
// Lock ordering: state_ is never held while acquiring data_mutex_; see the comment inside the
// data_modified branch.
void DelayedLocalAggregateStorage::Run() {
  while (true) {
    auto locked_state = state_.lock();
    if (locked_state->shut_down) {
      return;
    }

    VLOG(4) << "DelayedLocalAggregateStorage worker: sleeping for " << writeback_frequency_.count()
            << " milliseconds.";
    // We start each iteration of the loop with a sleep of writeback_frequency_. This ensures that
    // we never write to disk twice within one writeback_frequency_ period. The wait only ends
    // early when shut_down becomes true.
    locked_state->shutdown_notifier.wait_for(locked_state, writeback_frequency_,
                                             [&locked_state] { return locked_state->shut_down; });

    VLOG(4) << "DelayedLocalAggregateStorage worker: waking up from sleep. shut_down="
            << locked_state->shut_down;
    if (locked_state->shut_down) {
      return;
    }

    if (locked_state->data_modified) {
      locked_state->data_save_start_notifier.notify_all();
      // Code paths that hold both locks acquire data_mutex_ before state_. For example,
      // GetMetricAggregate() locks data_mutex_ at its start and hands that lock to the returned
      // MetricAggregateRef. Later, once the data has been modified, MetricAggregateRef::Save() is
      // called, which calls down to SaveMetricAggregate(), which locks state_ to update
      // data_modified. That code path therefore always locks data_mutex_ first, then state_.
      // Here, we unlock locked_state before acquiring data_mutex_ so that this code path acquires
      // the two locks in the same order, avoiding any potential for deadlock.
      locked_state.unlock();

      // While everything is unlocked, track the current storage usage.
      // This cannot cause an infinite loop, since we are already in the 'data_modified' state,
      // which will be reset to false at the end of this block.
      int64_t size;
      {
        // Take the data lock briefly so we can measure the size of aggregates_ without it being
        // modified.
        std::scoped_lock<std::mutex> data_lock(data_mutex_);
        size = static_cast<int64_t>(aggregates_.ByteSizeLong());
      }
      internal_metrics_->TrackDiskUsage(
          logger::InternalMetrics::StorageClass::LocalAggregateStorage, size);

      // Lock the class so no new calls to GetMetricAggregate will return until the data has been
      // written. data_mutex_ is taken before state_ is re-locked, preserving the lock order.
      std::scoped_lock<std::mutex> data_lock(data_mutex_);
      locked_state.lock();

      VLOG(4) << "DelayedLocalAggregateStorage worker: data has been modified, writing to disk.";
      util::Status status = proto_store_.Write(aggregates_);
      if (!status.ok()) {
        LOG(WARNING) << "Failed to snapshot GlobalAggregates to disk: " << status.error_message();
      }
      // data_modified is cleared even if the write failed, so a failed write is not retried until
      // some aggregate is modified again.
      locked_state->data_save_notifier.notify_all();
      locked_state->data_modified = false;
    }
  }
}

// Reports whether aggregates_ already holds an entry for (customer_id, project_id, metric_id).
// It takes no lock itself. GetMetricAggregate() calls it while holding data_mutex_; other callers
// presumably need to do the same.
bool DelayedLocalAggregateStorage::HasMetricAggregate(uint32_t customer_id, uint32_t project_id,
                                                      uint32_t metric_id) {
  const auto &customers = aggregates_.by_customer_id();
  const auto customer_it = customers.find(customer_id);
  if (customer_it == customers.end()) {
    return false;
  }

  const auto &projects = customer_it->second.by_project_id();
  const auto project_it = projects.find(project_id);
  if (project_it == projects.end()) {
    return false;
  }

  return project_it->second.by_metric_id().count(metric_id) > 0;
}

// Returns a MetricAggregateRef to the aggregate for (customer_id, project_id, metric_id), creating
// an empty entry if the metric exists in the registry but has no stored aggregate yet.
//
// data_mutex_ is locked here and the lock is moved into the returned MetricAggregateRef, so the
// aggregate stays protected for as long as the caller holds the ref.
//
// Returns NOT_FOUND, without modifying aggregates_, if the project or the metric is not present in
// the registry served by global_project_context_factory_.
lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef>
DelayedLocalAggregateStorage::GetMetricAggregate(uint32_t customer_id, uint32_t project_id,
                                                 uint32_t metric_id) {
  std::unique_lock<std::mutex> data_lock(data_mutex_);

  if (HasMetricAggregate(customer_id, project_id, metric_id)) {
    return MetricAggregateRef(customer_id, project_id, metric_id,
                              &aggregates_.mutable_by_customer_id()
                                   ->at(customer_id)
                                   .mutable_by_project_id()
                                   ->at(project_id)
                                   .mutable_by_metric_id()
                                   ->at(metric_id),
                              this, std::move(data_lock));
  }

  std::unique_ptr<logger::ProjectContext> ctx =
      global_project_context_factory_->NewProjectContext(customer_id, project_id);
  if (!ctx) {
    return util::Status(
        util::StatusCode::NOT_FOUND,
        absl::StrCat("Project (", customer_id, ", ", project_id, ") not found in registry"));
  }

  // Validate the metric before touching aggregates_: operator[] below inserts entries, and an
  // unknown metric must not leave behind empty customer/project aggregates.
  if (!ctx->GetMetric(metric_id)) {
    return util::Status(
        util::StatusCode::NOT_FOUND,
        absl::StrCat("Metric id `", metric_id, "` not found in registry for project (", customer_id,
                     ", ", project_id, ")"));
  }

  CustomerAggregates &customer = (*aggregates_.mutable_by_customer_id())[customer_id];
  ProjectAggregates &project = (*customer.mutable_by_project_id())[project_id];
  return MetricAggregateRef(customer_id, project_id, metric_id,
                            &(*project.mutable_by_metric_id())[metric_id], this,
                            std::move(data_lock));
}

// Marks the store as modified; the writeback thread then writes all of aggregates_ on its next
// cycle. Because the whole store is written, the identifying ids are not needed here.
util::Status DelayedLocalAggregateStorage::SaveMetricAggregate(uint32_t /* customer_id */,
                                                               uint32_t /* project_id */,
                                                               uint32_t /* metric_id */) {
  {
    auto locked_state = state_.lock();
    locked_state->data_modified = true;
  }
  return util::Status::OK;
}

}  // namespace cobalt::local_aggregation
