// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "analyzer/report_master/report_executor.h"

#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "glog/logging.h"
#include "util/log_based_metrics.h"

namespace cobalt {
namespace analyzer {

using store::ReportStore;

// Stackdriver metric constants
namespace {
const char kCheckReportIdChainFailure[] = "check-report-id-chain-failure";
const char kCheckQueueSizeFailure[] =
    "report-executor-check-queue-size-failure";
const char kEnqueueFailure[] = "report-executor-enqueue-failure";
const char kProcessDependencyChainFailure[] =
    "report-executor-process-dependency-chain-failure";
const char kProcessReportIdFailure[] =
    "report-executor-process-report-id-failure";
const char kGetMetadataFailure[] = "report-executor-get-metadata-failure";
const char kStartDependentReportFailure[] =
    "report-executor-start-dependent-report-failure";
const char kEndReportFailure[] = "report-executor-end-report-failure";
}  // namespace

namespace {
// If the worker queue grows larger than this we will stop accepting new
// Enqueue requests.
const size_t kMaxQueueSize = 50000;

// Checks that report_id_chain is not empty and contains only complete
// ReportIds.
grpc::Status CheckReportIdChain(const std::vector<ReportId>& report_id_chain) {
  if (report_id_chain.empty()) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckReportIdChainFailure)
        << "report_id_chain is empty";
    return grpc::Status(grpc::INVALID_ARGUMENT, "report_id_chain is empty");
  }
  for (const ReportId& report_id : report_id_chain) {
    // When a client first creates a ReportId it is incomplete because
    // instance_id and creation_time_seconds are not set. These values are only
    // set by virtue of the client invoking ReportStore::StartNewReport(),
    // thereby creating a complete ReportId.
    if (report_id.instance_id() == 0 ||
        report_id.creation_time_seconds() == 0) {
      std::ostringstream stream;
      stream << "Not a complete ReportId: " << ReportStore::ToString(report_id);
      std::string message = stream.str();
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckReportIdChainFailure)
          << message;
      return grpc::Status(grpc::INVALID_ARGUMENT, message);
    }
  }
  return grpc::Status::OK;
}

}  // namespace

ReportExecutor::ReportExecutor(
    std::shared_ptr<store::ReportStore> report_store,
    std::unique_ptr<ReportGenerator> report_generator)
    : report_store_(report_store),
      report_generator_(std::move(report_generator)),
      shut_down_(false) {}

void ReportExecutor::Start() {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    // We set idle_ to false since we are about to start the worker thread.
    // The worker thread will set idle_ to true just before it becomes
    // idle.
    idle_ = false;
  }
  std::thread t([this] { this->Run(); });
  worker_thread_ = std::move(t);
}

ReportExecutor::~ReportExecutor() {
  if (!worker_thread_.joinable()) {
    return;
  }

  {
    std::lock_guard<std::mutex> lock(mutex_);
    shut_down_ = true;
    worker_notifier_.notify_all();
  }
  worker_thread_.join();
}

grpc::Status ReportExecutor::EnqueueReportGeneration(
    std::vector<ReportId> report_id_chain) {
  auto status = CheckReportIdChain(report_id_chain);
  if (!status.ok()) {
    return status;
  }

  status = CheckQueueSize();
  if (!status.ok()) {
    return status;
  }

  return Enqueue(std::move(report_id_chain));
}

grpc::Status ReportExecutor::CheckQueueSize() {
  bool too_long = false;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    too_long = work_queue_.size() >= kMaxQueueSize;
  }
  if (too_long) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckQueueSizeFailure)
        << "Work queue too long!";
    return grpc::Status(grpc::ABORTED,
                        "Can't enqueue reports: queue too long!");
  }
  return grpc::Status::OK;
}

grpc::Status ReportExecutor::Enqueue(std::vector<ReportId> report_id_chain) {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (shut_down_) {
      std::string message = "Shutting down. Not enqueuing.";
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kEnqueueFailure) << message;
      return grpc::Status(grpc::ABORTED, message);
    }
    work_queue_.emplace_back(std::move(report_id_chain));
    // Set idle_ false because any thread that invokes WaitUntilIdle() after
    // this should wait until the |report_id_chain| just enqueued is
    // processed.
    idle_ = false;
  }
  worker_notifier_.notify_all();
  return grpc::Status::OK;
}

void ReportExecutor::Run() {
  while (!shut_down_) {
    std::vector<ReportId> dependency_chain;
    if (!WaitAndTakeFirst(&dependency_chain)) {
      return;
    }
    ProcessDependencyChain(dependency_chain);
  }
}

bool ReportExecutor::WaitAndTakeFirst(std::vector<ReportId>* chain_out) {
  CHECK(chain_out);
  std::unique_lock<std::mutex> lock(mutex_);
  if (shut_down_) {
    return false;
  }
  if (work_queue_.empty()) {
    // Notify observers that the the worker thread is now idle;
    idle_ = true;
    idle_notifier_.notify_all();

    // Wait until the condition variable is notified and either shut_down_
    // is set or the work_queue_ is not empty.
    worker_notifier_.wait(lock, [this] {
      return (this->shut_down_ || !this->work_queue_.empty());
    });
  }
  idle_ = false;
  if (shut_down_) {
    return false;
  }
  CHECK(!work_queue_.empty());
  chain_out->swap(work_queue_[0]);
  work_queue_.pop_front();
  return true;
}

void ReportExecutor::WaitUntilIdle() {
  std::unique_lock<std::mutex> lock(mutex_);
  if (idle_) {
    return;
  }
  // Wait until the condition variable is notified and idle_ is true.
  idle_notifier_.wait(lock, [this] { return (this->idle_); });
}

void ReportExecutor::ProcessDependencyChain(
    const std::vector<ReportId>& chain) {
  DCHECK(!chain.empty());
  bool chain_failed = false;
  for (const ReportId& report_id : chain) {
    if (shut_down_) {
      LOG(INFO) << "Shutting down.";
      return;
    }
    if (chain_failed) {
      std::ostringstream stream;
      stream << "Skipping report generation for report_id="
             << ReportStore::ToString(report_id)
             << " because an earlier report in its dependency chain failed.";
      std::string message = stream.str();
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kProcessDependencyChainFailure)
          << message;
      EndReport(report_id, false, message);
    } else {
      chain_failed = !ProcessReportId(report_id);
    }
  }
}

bool ReportExecutor::ProcessReportId(const ReportId& report_id) {
  ReportMetadataLite metadata;
  if (!GetMetadata(report_id, &metadata)) {
    EndReport(report_id, false, "Unable to fetch metadata for report.");
    return false;
  }

  switch (metadata.state()) {
    case WAITING_TO_START: {
      if (!StartDependentReport(report_id)) {
        EndReport(report_id, false, "Unable to start dependent report.");
        return false;
      }
      break;
    }

    case IN_PROGRESS:
      break;

    default: {
      // Already in a terminal state.
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kProcessReportIdFailure)
          << "Unexpected state: " << metadata.state()
          << " for report_id=" << ReportStore::ToString(report_id);
      return false;
    }
  }

  auto status = report_generator_->GenerateReport(report_id);
  std::string message = (status.ok() ? "" : status.error_message());

  // End the report and then return true only if both GenerateReport and
  // EndReport succeeded.
  return EndReport(report_id, status.ok(), message) && status.ok();
}

bool ReportExecutor::GetMetadata(const ReportId& report_id,
                                 ReportMetadataLite* metadata_out) {
  auto status = report_store_->GetMetadata(report_id, metadata_out);
  if (status != store::kOK) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kGetMetadataFailure)
        << "GetMetadata failed with status=" << status
        << " for report_id=" << ReportStore::ToString(report_id);
    return false;
  }
  return true;
}

bool ReportExecutor::StartDependentReport(const ReportId& report_id) {
  auto status = report_store_->StartDependentReport(report_id);
  if (status != store::kOK) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kStartDependentReportFailure)
        << "StartDependentReport failed with status=" << status
        << " for report_id=" << ReportStore::ToString(report_id);
    return false;
  }
  return true;
}

bool ReportExecutor::EndReport(const ReportId& report_id, bool success,
                               std::string message) {
  auto status =
      report_store_->EndReport(report_id, success, std::move(message));
  if (status != store::kOK) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kEndReportFailure)
        << "EndReport failed with status=" << status
        << " for report_id=" << ReportStore::ToString(report_id);
    return false;
  }

  return true;
}

}  // namespace analyzer
}  // namespace cobalt
