// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <mutex>
#include <utility>

#include "./clearcut_extensions.pb.h"
#include "./logging.h"
#include "encoder/shipping_manager.h"

namespace cobalt {
namespace encoder {

typedef ObservationStore::EnvelopeHolder EnvelopeHolder;
using cobalt::clearcut_extensions::LogEventExtension;

namespace {

// The number of upload failures after which ShippingManager will bail out of an
// invocation of SendAllEnvelopes().
static size_t kMaxFailuresWithoutSuccess = 3;

std::string ToString(const std::chrono::system_clock::time_point& t) {
  std::time_t time_struct = std::chrono::system_clock::to_time_t(t);
  return std::ctime(&time_struct);
}

grpc::Status CobaltStatusToGrpcStatus(util::Status status) {
  return grpc::Status(static_cast<grpc::StatusCode>(status.error_code()),
                      status.error_message(), status.error_details());
}

}  // namespace

ShippingManager::ShippingManager(
    const UploadScheduler& upload_scheduler,
    ObservationStore* observation_store,
    util::EncryptedMessageMaker* encrypt_to_shuffler)
    : upload_scheduler_(upload_scheduler),
      next_scheduled_send_time_(std::chrono::system_clock::now() +
                                upload_scheduler_.Interval()),
      encrypt_to_shuffler_(encrypt_to_shuffler) {
  CHECK(observation_store);
  _mutex_protected_fields_do_not_access_directly_.observation_store =
      observation_store;
}

ShippingManager::~ShippingManager() {
  if (!worker_thread_.joinable()) {
    return;
  }
  ShutDown();
  VLOG(4) << "ShippingManager waiting for worker thread to exit...";
  worker_thread_.join();
}

void ShippingManager::Start() {
  {
    // We set idle and waiting_for_schedule to false since we are about to
    // start the worker thread. The worker thread will set these variables
    // to true at the appropriate times.
    auto locked = lock();
    locked->fields->idle = false;
    locked->fields->waiting_for_schedule = false;
  }

  std::thread t([this] { this->Run(); });
  worker_thread_ = std::move(t);
}

void ShippingManager::NotifyObservationsAdded() {
  auto locked = lock();

  if (locked->fields->observation_store->IsAlmostFull()) {
    VLOG(4) << "NotifyObservationsAdded(): observation_store "
               "IsAlmostFull.";
    RequestSendSoonLockHeld(locked->fields);
  }

  if (!locked->fields->observation_store->Empty()) {
    // Set idle false because any thread that invokes WaitUntilIdle() after this
    // should wait until the Observation just added has been sent.
    locked->fields->idle = false;
    locked->fields->add_observation_notifier.notify_all();
  }
}

// The caller must hold a lock on mutex_.
void ShippingManager::RequestSendSoonLockHeld(MutexProtectedFields* fields) {
  VLOG(5) << "ShippingManager::RequestSendSoonLockHeld()";
  fields->expedited_send_requested = true;
  fields->expedited_send_notifier.notify_all();
  // We set waiting_for_schedule_ false here so that if the calling thread
  // invokes WaitUntilWorkerWaiting() after this then it will be waiting
  // for a *subsequent* time that the worker thread enters the
  // waiting-for-schedule state.
  fields->waiting_for_schedule = false;
}

void ShippingManager::RequestSendSoon() { RequestSendSoon(SendCallback()); }

void ShippingManager::RequestSendSoon(SendCallback send_callback) {
  VLOG(4) << "ShippingManager: Expedited send requested.";
  auto locked = lock();
  RequestSendSoonLockHeld(locked->fields);

  // If we were given a SendCallback then do one of two things...
  if (send_callback) {
    if (locked->fields->observation_store->Empty() && locked->fields->idle) {
      // If the ObservationStore is empty and the ShippingManager is idle. Then
      // we can safely invoke the SendCallback immediately.
      locked->fields->expedited_send_requested = false;
      send_callback(true);
    } else {
      // Otherwise, we should put the callback into the send callback queue.
      locked->fields->send_callback_queue.push_back(send_callback);
    }
  }
}

bool ShippingManager::shut_down() { return lock()->fields->shut_down; }

void ShippingManager::ShutDown() {
  {
    auto locked = lock();
    cancel_handle_.TryCancel();
    locked->fields->shut_down = true;
    locked->fields->shutdown_notifier.notify_all();
    locked->fields->add_observation_notifier.notify_all();
    locked->fields->expedited_send_notifier.notify_all();
    locked->fields->idle_notifier.notify_all();
    locked->fields->waiting_for_schedule_notifier.notify_all();
  }
  VLOG(4) << "ShippingManager: shut-down requested.";
}

size_t ShippingManager::num_send_attempts() {
  auto locked = lock();
  return locked->fields->num_send_attempts;
}

size_t ShippingManager::num_failed_attempts() {
  auto locked = lock();
  return locked->fields->num_failed_attempts;
}

grpc::Status ShippingManager::last_send_status() {
  auto locked = lock();
  return locked->fields->last_send_status;
}

void ShippingManager::Run() {
  while (true) {
    auto locked = lock();
    if (locked->fields->shut_down) {
      return;
    }

    // We start each iteration of the loop with a sleep of
    // upload_scheduler_.MinInterval().
    // This ensures that we never send twice within one
    // upload_scheduler_.MinInterval() period.

    // Sleep for upload_scheduler_.MinInterval() or until shut_down_.
    VLOG(4) << "ShippingManager worker: sleeping for "
            << upload_scheduler_.MinInterval().count() << " seconds.";
    locked->fields->shutdown_notifier.wait_for(
        locked->lock, upload_scheduler_.MinInterval(),
        [&locked] { return (locked->fields->shut_down); });
    VLOG(4) << "ShippingManager worker: waking up from sleep. shut_down_="
            << locked->fields->shut_down;
    if (locked->fields->shut_down) {
      return;
    }

    if (locked->fields->observation_store->Empty()) {
      // There are no Observations at all in the observation_store_. Wait
      // forever until notified that one arrived or shut down.
      VLOG(5) << "ShippingManager worker: waiting for an Observation to "
                 "be added.";
      // If we are about to leave idle, we should make sure that we invoke all
      // of the SendCallbacks so they don't have to wait until the next time
      // observations are added.
      InvokeSendCallbacksLockHeld(locked->fields, true);
      locked->fields->idle = true;
      locked->fields->idle_notifier.notify_all();
      locked->fields->add_observation_notifier.wait(locked->lock, [&locked] {
        return (locked->fields->shut_down ||
                !locked->fields->observation_store->Empty());
      });
      VLOG(5) << "ShippingManager worker: Waking up because an Observation was "
                 "added.";
      locked->fields->idle = false;
    } else {
      auto now = std::chrono::system_clock::now();
      VLOG(4) << "now: " << ToString(now) << " next_scheduled_send_time_: "
              << ToString(next_scheduled_send_time_);
      if (next_scheduled_send_time_ <= now ||
          locked->fields->expedited_send_requested) {
        VLOG(4) << "ShippingManager worker: time to send now.";
        locked->fields->expedited_send_requested = false;
        locked->lock.unlock();
        SendAllEnvelopes();
        next_scheduled_send_time_ =
            std::chrono::system_clock::now() + upload_scheduler_.Interval();
        locked->lock.lock();
      } else {
        // Wait until the next scheduled send time or until notified of
        // a new request for an expedited send or we are shut down.
        auto time =
            std::chrono::system_clock::to_time_t(next_scheduled_send_time_);
        VLOG(4) << "ShippingManager worker: waiting until " << std::ctime(&time)
                << " for next scheduled send.";
        locked->fields->waiting_for_schedule = true;
        locked->fields->waiting_for_schedule_notifier.notify_all();
        locked->fields->expedited_send_notifier.wait_until(
            locked->lock, next_scheduled_send_time_, [&locked] {
              return (locked->fields->shut_down ||
                      locked->fields->expedited_send_requested);
            });
        locked->fields->waiting_for_schedule = false;
      }
    }
  }
}

void ShippingManager::SendAllEnvelopes() {
  VLOG(5) << "ShippingManager: SendAllEnvelopes().";
  bool success = true;
  size_t failures_without_success = 0;
  // Loop through all envelopes in the ObservationStore.
  while (true) {
    auto holder = lock()->fields->observation_store->TakeNextEnvelopeHolder();
    if (holder == nullptr) {
      // No more envelopes in the store, we can exit the loop.
      break;
    }
    auto failed_holder = SendEnvelopeToBackend(std::move(holder));
    if (failed_holder == nullptr) {
      // The send succeeded.
      failures_without_success = 0;
    } else {
      // The send failed. Increment failures_without_success and return the
      // failed EnvelopeHolder to the store.
      success = false;
      failures_without_success++;
      lock()->fields->observation_store->ReturnEnvelopeHolder(
          std::move(failed_holder));
    }

    if (failures_without_success >= kMaxFailuresWithoutSuccess) {
      VLOG(4) << "ShippingManager::SendAllEnvelopes(): failed too many times ("
              << failures_without_success << "). Stopping uploads.";
      break;
    }
  }

  {
    auto locked = lock();
    InvokeSendCallbacksLockHeld(locked->fields, success);
  }
}

void ShippingManager::InvokeSendCallbacksLockHeld(MutexProtectedFields* fields,
                                                  bool success) {
  fields->expedited_send_requested = false;
  std::vector<SendCallback> callbacks_to_invoke;
  callbacks_to_invoke.swap(fields->send_callback_queue);
  for (SendCallback& callback : callbacks_to_invoke) {
    callback(success);
  }
}

LegacyShippingManager::LegacyShippingManager(
    const UploadScheduler& upload_scheduler,
    ObservationStore* observation_store,
    util::EncryptedMessageMaker* encrypt_to_shuffler,
    const SendRetryerParams send_retryer_params,
    SendRetryerInterface* send_retryer)
    : ShippingManager(upload_scheduler, observation_store, encrypt_to_shuffler),
      send_retryer_params_(send_retryer_params),
      send_retryer_(send_retryer) {
  CHECK(send_retryer_);
}

std::unique_ptr<EnvelopeHolder> LegacyShippingManager::SendEnvelopeToBackend(
    std::unique_ptr<EnvelopeHolder> envelope_to_send) {
  EncryptedMessage encrypted_envelope;
  if (!encrypt_to_shuffler_->Encrypt(envelope_to_send->GetEnvelope(),
                                     &encrypted_envelope)) {
    // TODO(rudominer) log
    // Drop on floor.
    return nullptr;
  }
  VLOG(5) << "ShippingManager worker: Sending Envelope of size "
          << envelope_to_send->Size() << " bytes to legacy backend.";
  auto status = send_retryer_->SendToShuffler(
      send_retryer_params_.initial_rpc_deadline_,
      send_retryer_params_.deadline_per_send_attempt_, &cancel_handle_,
      encrypted_envelope);
  {
    auto locked = lock();
    locked->fields->num_send_attempts++;
    if (!status.ok()) {
      locked->fields->num_failed_attempts++;
    }
    locked->fields->last_send_status = status;
  }
  if (status.ok()) {
    VLOG(4) << "ShippingManager::SendOneEnvelope: OK";
    return nullptr;
  }

  VLOG(4) << "Cobalt send to Shuffler failed: (" << status.error_code() << ") "
          << status.error_message()
          << ". Observations have been re-enqueued for later.";
  return envelope_to_send;
}

ClearcutV1ShippingManager::ClearcutV1ShippingManager(
    const UploadScheduler& upload_scheduler,
    ObservationStore* observation_store,
    util::EncryptedMessageMaker* encrypt_to_shuffler,
    std::unique_ptr<clearcut::ClearcutUploader> clearcut)
    : ShippingManager(upload_scheduler, observation_store, encrypt_to_shuffler),
      clearcut_(std::move(clearcut)) {}

std::unique_ptr<EnvelopeHolder>
ClearcutV1ShippingManager::SendEnvelopeToBackend(
    std::unique_ptr<EnvelopeHolder> envelope_to_send) {
  auto log_extension = std::make_unique<LogEventExtension>();
  if (!encrypt_to_shuffler_->Encrypt(
          envelope_to_send->GetEnvelope(),
          log_extension->mutable_cobalt_encrypted_envelope())) {
    // TODO(rudominer) log
    // Drop on floor.
    return nullptr;
  }
  VLOG(5) << "ShippingManager worker: Sending Envelope of size "
          << envelope_to_send->Size() << " bytes to clearcut.";

  clearcut::LogRequest request;
  request.set_log_source(clearcut::kFuchsiaCobaltShufflerInputDevel);
  request.add_log_event()->SetAllocatedExtension(LogEventExtension::ext,
                                                 log_extension.release());

  util::Status status;
  {
    std::lock_guard<std::mutex> lock(clearcut_mutex_);
    status = clearcut_->UploadEvents(&request);
  }
  {
    auto locked = lock();
    locked->fields->num_send_attempts++;
    if (!status.ok()) {
      locked->fields->num_failed_attempts++;
    }
    locked->fields->last_send_status = CobaltStatusToGrpcStatus(status);
  }
  if (status.ok()) {
    VLOG(4) << "ShippingManager::SendEnvelopeToBackend: OK";
    return nullptr;
  }

  VLOG(4) << "Cobalt send to Shuffler failed: (" << status.error_code() << ") "
          << status.error_message()
          << ". Observations have been re-enqueued for later.";
  return envelope_to_send;
}

void ShippingManager::WaitUntilIdle(std::chrono::seconds max_wait) {
  auto locked = lock();
  if (locked->fields->shut_down || locked->fields->idle) {
    return;
  }
  locked->fields->idle_notifier.wait_for(locked->lock, max_wait, [&locked] {
    return (locked->fields->shut_down || locked->fields->idle);
  });
}

void ShippingManager::WaitUntilWorkerWaiting(std::chrono::seconds max_wait) {
  auto locked = lock();
  if (locked->fields->shut_down || locked->fields->waiting_for_schedule) {
    return;
  }
  locked->fields->waiting_for_schedule_notifier.wait_for(
      locked->lock, max_wait, [&locked] {
        return (locked->fields->shut_down ||
                locked->fields->waiting_for_schedule);
      });
}

}  // namespace encoder
}  // namespace cobalt
