// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "analyzer/report_master/report_scheduler.h"

#include <iomanip>
#include <string>
#include <utility>

#include "config/encodings.pb.h"
#include "config/metrics.pb.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "util/clock.h"
#include "util/datetime_util.h"
#include "util/log_based_metrics.h"

namespace cobalt {
namespace analyzer {

using config::ReportRegistry;
using util::SystemClock;
using util::TimeToDayIndex;

DEFINE_uint32(daily_report_makeup_days, 30,
              "The number of days in the past that the ReportMaster should "
              "look to find missed scheduled reports to make up. Must be less "
              "than 100 or we will CHECK fail.");

// Stackdriver metric constants
namespace {
const char kStartReportNowFailure[] = "report-scheduler-start-report-failure";
const char kProcessReportFailure[] =
    "report-scheduler-process-report-failure-for-";

const std::string ProcessReportFailMetricName(const std::string& suffix) {
  std::ostringstream stream;
  stream << kProcessReportFailure;
  stream << suffix;
  return stream.str();
}

}  // namespace

namespace {
// Returns a human-readable respresentation of the report config ID.
// Used in forming error messages.
// TODO(rudominer) This function has been copied multiple times throughout the
// code. We should centralize it in a utility.
std::string IdString(const ReportConfig& report_config) {
  std::ostringstream stream;
  stream << "(" << report_config.customer_id() << ","
         << report_config.project_id() << "," << report_config.id() << ")";
  return stream.str();
}

// Builds a string of the form YYYYMMDD to represent the date corresponding to
// the given day_index. This is the standard format used at Google for the
// suffix of a file name for files containing data for dated tables.
std::string DateSuffix(uint32_t day_index) {
  util::CalendarDate cd = util::DayIndexToCalendarDate(day_index);
  std::ostringstream stream;
  stream << std::setfill('0') << std::setw(4) << cd.year << std::setw(2)
         << cd.month << std::setw(2) << cd.day_of_month;
  return stream.str();
}

}  // namespace

ReportScheduler::ReportScheduler(
    std::shared_ptr<config::AnalyzerConfigManager> config_manager,
    std::shared_ptr<store::ReportStore> report_store,
    std::shared_ptr<ReportStarterInterface> report_starter,
    std::chrono::milliseconds sleep_interval)
    : clock_(new SystemClock()),
      config_manager_(config_manager),
      report_starter_(report_starter),
      report_history_(new ReportHistoryCache(
          CurrentDayIndex() - FLAGS_daily_report_makeup_days, report_store)),
      sleep_interval_(sleep_interval),
      shut_down_(false) {
  CHECK_LT(FLAGS_daily_report_makeup_days, 100);
}

ReportScheduler::~ReportScheduler() {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    shut_down_ = true;
    if (!scheduler_thread_.joinable()) {
      return;
    }
  }
  scheduler_thread_.join();
}

void ReportScheduler::Start() {
  std::lock_guard<std::mutex> lock(mutex_);
  std::thread t([this] { this->Run(); });
  scheduler_thread_ = std::move(t);
}

void ReportScheduler::Run() {
  while (!shut_down_) {
    Sleep();
    if (shut_down_) {
      return;
    }
    ProcessReports();
  }
}

void ReportScheduler::Sleep() {
  // Note: We invoke the real system clock here, not clock_->now().
  // This is because even in a test we want to use the real system clock to
  // compute wakeup_time because std::condition_varaible::wait_until() always
  // uses the real system clock. A test is able to control the sleep time by
  // setting the value of sleep_interval_.
  auto wakeup_time = std::chrono::system_clock::now() + sleep_interval_;
  VLOG(3) << "ReportScheduler sleeping for " << sleep_interval_.count() << "ms";
  std::unique_lock<std::mutex> lock(mutex_);
  // Sleep until wakeup_time or shut_down_ = true.
  shut_down_notifier_.wait_until(lock, wakeup_time,
                                 [this] { return this->shut_down_.load(); });
}

void ReportScheduler::ProcessReports() {
  static const int kTimeoutSeconds = 60;
  config_manager_->Update(kTimeoutSeconds);
  uint32_t current_day_index = CurrentDayIndex();
  std::shared_ptr<config::ReportRegistry> report_registry =
      config_manager_->GetCurrent()->report_registry();
  for (const ReportConfig& report_config : *report_registry) {
    if (shut_down_) {
      return;
    }
    ProcessOneReport(report_config, current_day_index);
  }
}

void ReportScheduler::ProcessOneReport(const ReportConfig& report_config,
                                       uint32_t current_day_index) {
  LOG(INFO) << "ReportScheduler processing report_config "
            << IdString(report_config);
  if (!report_config.has_scheduling()) {
    LOG(INFO) << "Skpping report_config " << IdString(report_config)
              << " because it has no SchedulingConfig.";
    return;
  }
  switch (report_config.scheduling().aggregation_epoch_type()) {
    case DAY:
      ProcessDailyReport(report_config, current_day_index);
      return;

    case WEEK:
      ProcessWeeklyReport(report_config, current_day_index);
      return;

    case MONTH:
      ProcessMonthlyReport(report_config, current_day_index);
      return;

    default: {
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("one"))
          << "Unrecognized aggregatoin_epoch_type: "
          << report_config.scheduling().aggregation_epoch_type()
          << "In ReportConfig " << IdString(report_config);
      return;
    }
  }
}

void ReportScheduler::ProcessDailyReport(const ReportConfig& report_config,
                                         uint32_t current_day_index) {
  // Look back a number of days equal to the maximum of daily_report_makeup_days
  // and report_finalization_days.
  auto scheduling = report_config.scheduling();
  if (report_config.scheduling().report_finalization_days() > 20) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("daily"))
        << "Invalid ReportConfig: " << IdString(report_config)
        << " report_finalization_days too large: "
        << report_config.scheduling().report_finalization_days();
    return;
  }

  uint32_t finalization_days = scheduling.report_finalization_days();
  uint32_t lookback_days = FLAGS_daily_report_makeup_days >= finalization_days
                               ? FLAGS_daily_report_makeup_days
                               : finalization_days;
  uint32_t period_start =
      (current_day_index >= lookback_days ? (current_day_index - lookback_days)
                                          : 0u);
  VLOG(4) << "ReportScheduler considering days in the interval ["
          << period_start << ", " << current_day_index << "]";
  for (uint32_t day_index = period_start; day_index <= current_day_index;
       day_index++) {
    if (shut_down_) {
      return;
    }
    if (ShouldStartDailyReportNow(report_config, day_index,
                                  current_day_index)) {
      StartReportNow(report_config, day_index, day_index);
    } else {
      VLOG(4) << "ShouldStartDailyReportNow() returned false for report_config "
              << IdString(report_config) << " day_index=" << day_index
              << " current_day_index=" << current_day_index;
    }
  }
}

void ReportScheduler::ProcessWeeklyReport(const ReportConfig& report_config,
                                          uint32_t current_day_index) {
  LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("weekly"))
      << "Scheduling of weekly reports is not yet implemented. ReportConfig: "
      << IdString(report_config);
  return;
}

void ReportScheduler::ProcessMonthlyReport(const ReportConfig& report_config,
                                           uint32_t current_day_index) {
  LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("monthly"))
      << "Scheduling of monthly reports is not yet implemented. ReportConfig: "
      << IdString(report_config);
  return;
}

bool ReportScheduler::ShouldStartDailyReportNow(
    const ReportConfig& report_config, uint32_t day_index,
    uint32_t current_day_index) {
  if (day_index > current_day_index) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, ProcessReportFailMetricName("daily"))
        << "Unexpected condition: " << day_index
        << " = day_index > current_day_index = " << current_day_index
        << " for ReportConfig " << IdString(report_config);
    return false;
  }
  if (day_index > current_day_index -
                      report_config.scheduling().report_finalization_days()) {
    // We want to generate the report repeatedly during the report finalization
    // period, but we don't want to start it again now if we previously started
    // it and that hasn't completed.
    return !report_history_->InProgress(report_config, day_index, day_index);
  }
  // After the report finalization period we only want to run the report once.
  // If it was ever successfully completed don't run it again. Also if we
  // previously started the report and that attempt hasn't finished yet,
  // don't start it again.
  return !(report_history_->CompletedSuccessfullyOrInProgress(
      report_config, day_index, day_index));
}

void ReportScheduler::StartReportNow(const ReportConfig& report_config,
                                     uint32_t first_day_index,
                                     uint32_t last_day_index) {
  const std::string export_name =
      ReportExportName(report_config, first_day_index, last_day_index);
  // We want to store the rows of the report in the report store in all cases
  // except when it is a RAW_DUMP report.
  bool in_store = (report_config.report_type() != RAW_DUMP);
  ReportId report_id;
  LOG(INFO) << "ReportScheduler starting report " << IdString(report_config)
            << " [" << first_day_index << ", " << last_day_index << "]";
  auto status = report_starter_->StartReport(report_config, first_day_index,
                                             last_day_index, export_name,
                                             in_store, &report_id);
  if (!status.ok()) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kStartReportNowFailure)
        << "ReportScheduler was unable to start a report for ReportConfig "
        << IdString(report_config) << " first_day_index=" << first_day_index
        << " last_day_index=" << last_day_index
        << " error code=" << status.error_code()
        << " error message=" << status.error_message();
    return;
  }
  report_history_->RecordStart(report_config, first_day_index, last_day_index,
                               report_id);
}

std::string ReportScheduler::ReportExportName(const ReportConfig& report_config,
                                              uint32_t first_day_index,
                                              uint32_t last_day_index) {
  std::ostringstream stream;
  stream << "report_" << report_config.customer_id() << "_"
         << report_config.project_id() << "_" << report_config.id() << "_"
         << DateSuffix(first_day_index);
  if (last_day_index != first_day_index) {
    stream << "_" << DateSuffix(last_day_index);
  }
  return stream.str();
}

uint32_t ReportScheduler::CurrentDayIndex() {
  CHECK(clock_);
  std::time_t current_time =
      std::chrono::system_clock::to_time_t(clock_->now());
  return TimeToDayIndex(current_time, Metric::UTC);
}

ReportStarter::ReportStarter(ReportMasterService* report_master_service)
    : report_master_service_(report_master_service) {}

grpc::Status ReportStarter::StartReport(const ReportConfig& report_config,
                                        uint32_t first_day_index,
                                        uint32_t last_day_index,
                                        const std::string& export_name,
                                        bool in_store,
                                        ReportId* report_id_out) {
  StartReportRequest start_request;
  start_request.set_customer_id(report_config.customer_id());
  start_request.set_project_id(report_config.project_id());
  start_request.set_report_config_id(report_config.id());
  start_request.set_first_day_index(first_day_index);
  start_request.set_last_day_index(last_day_index);
  StartReportResponse response_not_used;
  // This is not a one-off report generation. Rather it is scheduled.
  bool one_off = false;
  return report_master_service_->StartReportNoAuth(
      &start_request, one_off, export_name, in_store, report_id_out,
      &response_not_used);
}

}  // namespace analyzer
}  // namespace cobalt
