// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/uploader/shipping_manager.h"

#include <mutex>
#include <utility>

#include "src/lib/util/protected_fields.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/logger_interface.h"
#include "src/logging.h"
#include "src/pb/clearcut_extensions.pb.h"
#include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h"

namespace cobalt::uploader {

using observation_store::ObservationStore;
using EnvelopeHolder = ObservationStore::EnvelopeHolder;
using cobalt::clearcut_extensions::LogEventExtension;

namespace {

// The number of upload failures after which ShippingManager will bail out of an
// invocation of SendAllEnvelopes().
constexpr size_t kMaxFailuresWithoutSuccess = 3;

std::string ToString(const std::chrono::system_clock::time_point& t) {
  std::time_t time_struct = std::chrono::system_clock::to_time_t(t);
  return std::ctime(&time_struct);
}

}  // namespace

ShippingManager::ShippingManager(const UploadScheduler& upload_scheduler,
                                 ObservationStore* observation_store,
                                 util::EncryptedMessageMaker* encrypt_to_analyzer,
                                 std::unique_ptr<ActivityListenerInterface> activity_listener)
    : encrypt_to_analyzer_(encrypt_to_analyzer),
      upload_scheduler_(upload_scheduler),
      next_scheduled_send_time_(std::chrono::system_clock::now() + upload_scheduler_.Interval()),
      activity_listener_(std::move(activity_listener)),
      protected_fields_(observation_store) {
  if (activity_listener_ != nullptr) {
    activity_listener_->Start([this](ActivityState state) {
      auto locked = protected_fields_.lock();
      if (state != ACTIVE && locked->waiting_for_idle_device) {
        locked->waiting_for_idle_device = false;
        RequestSendSoonLockHeld(&locked);
      }
    });
  }
}

ShippingManager::~ShippingManager() {
  if (worker_thread_.joinable()) {
    LOG(FATAL) << "worker_thread should be stopped in sub classes. Please call ShutDownAndJoin in "
                  "the sub class destructor.";
  }
}

void ShippingManager::ShutDownAndJoin() {
  if (!worker_thread_.joinable()) {
    return;
  }
  ShutDown();
  VLOG(4) << "ShippingManager destructor: waiting for worker thread to exit...";
  worker_thread_.join();
}

void ShippingManager::Start() {
  {
    // We set idle and waiting_for_schedule to false since we are about to
    // start the worker thread. The worker thread will set these variables
    // to true at the appropriate times.
    auto locked = protected_fields_.lock();
    locked->idle = false;
    locked->waiting_for_schedule = false;
  }

  std::thread t([this] { this->Run(); });
  worker_thread_ = std::move(t);
}

void ShippingManager::NotifyObservationsAdded() {
  auto locked = protected_fields_.lock();

  if (locked->observation_store->IsAlmostFull()) {
    VLOG(4) << name()
            << ": NotifyObservationsAdded(): observation_store "
               "IsAlmostFull.";
    RequestSendSoonLockHeld(&locked);
  }

  if (!locked->observation_store->Empty()) {
    // Set idle false because any thread that invokes WaitUntilIdle() after this
    // should wait until the Observation just added has been sent.
    locked->idle = false;
    locked->add_observation_notifier.notify_all();
  }
}

// The caller must hold a lock on mutex_.
void ShippingManager::RequestSendSoonLockHeld(ShippingManager::Fields::LockedFieldsPtr* fields) {
  auto& f = *fields;

  VLOG(5) << name() << ":RequestSendSoonLockHeld()";
  f->expedited_send_requested = true;
  f->expedited_send_notifier.notify_all();
  // We set waiting_for_schedule_ false here so that if the calling thread
  // invokes WaitUntilWorkerWaiting() after this then it will be waiting
  // for a *subsequent* time that the worker thread enters the
  // waiting-for-schedule state.
  f->waiting_for_schedule = false;
}

void ShippingManager::RequestSendSoon() { RequestSendSoon(SendCallback()); }

void ShippingManager::RequestSendSoon(const SendCallback& send_callback) {
  VLOG(4) << name() << ": Expedited send requested.";
  auto locked = protected_fields_.lock();
  RequestSendSoonLockHeld(&locked);

  // If we were given a SendCallback then do one of two things...
  if (send_callback) {
    if (locked->observation_store->Empty() && locked->idle) {
      // If the ObservationStore is empty and the ShippingManager is idle. Then
      // we can safely invoke the SendCallback immediately.
      locked->expedited_send_requested = false;
      VLOG(5) << name()
              << "::RequestSendSoon. Not waiting because there are no "
                 "observations. Invoking callback(true) now.";
      send_callback(true);
    } else {
      // Otherwise, we should put the callback into the send callback queue.
      locked->send_callback_queue.push_back(send_callback);
    }
  }
}

bool ShippingManager::shut_down() const { return protected_fields_.const_lock()->shut_down; }

bool ShippingManager::device_active() {
  if (activity_listener_ == nullptr) {
    return false;
  }
  return activity_listener_->state() == ACTIVE;
}

void ShippingManager::ShutDown() {
  {
    auto locked = protected_fields_.lock();
    locked->shut_down = true;
    locked->shutdown_notifier.notify_all();
    locked->add_observation_notifier.notify_all();
    locked->expedited_send_notifier.notify_all();
    locked->idle_notifier.notify_all();
    locked->waiting_for_schedule_notifier.notify_all();
    locked->waiting_for_idle_device_notifier.notify_all();
  }
  VLOG(4) << name() << ": shut-down requested.";
}

size_t ShippingManager::num_send_attempts() const {
  return protected_fields_.const_lock()->num_send_attempts;
}

size_t ShippingManager::num_failed_attempts() const {
  return protected_fields_.const_lock()->num_failed_attempts;
}

cobalt::util::Status ShippingManager::last_send_status() const {
  return protected_fields_.const_lock()->last_send_status;
}

size_t ShippingManager::num_delayed_sends() const {
  return protected_fields_.const_lock()->num_delayed_sends;
}

void ShippingManager::Disable(bool is_disabled) {
  LOG(INFO) << name() << ": " << (is_disabled ? "Disabling" : "Enabling")
            << " observation uploading.";
  auto locked = protected_fields_.lock();
  locked->is_disabled = is_disabled;
  if (!is_disabled) {
    // Set idle false when setting is_disabled to false to make sure that the worker thread wakes up
    // and runs through another loop. Even if there are no observations to send, we want to make
    // sure that any thread that invokes WaitUntilIdle() after this will actually wait until the
    // ShippingManager returns to the idle state.
    locked->idle = false;
    locked->add_observation_notifier.notify_all();
  }
}

void ShippingManager::Run() {
  while (true) {
    auto locked = protected_fields_.lock();
    if (locked->shut_down) {
      return;
    }

    // We start each iteration of the loop with a sleep of
    // upload_scheduler_.MinInterval().
    // This ensures that we never send twice within one
    // upload_scheduler_.MinInterval() period.

    // Sleep for upload_scheduler_.MinInterval() or until shut_down_.
    VLOG(4) << name() << " worker: sleeping for " << upload_scheduler_.MinInterval().count()
            << " seconds.";
    locked->shutdown_notifier.wait_for(locked, upload_scheduler_.MinInterval(),
                                       [&locked] { return (locked->shut_down); });

    VLOG(4) << name() << " worker: waking up from sleep. shut_down_=" << locked->shut_down;
    if (locked->shut_down) {
      return;
    }

    if (!locked->ObservationsAvailable()) {
      // There are no Observations at all in the observation_store_, or the ShippingManager is
      // disabled. Wait forever until one of the following occurs:
      //
      // 1. An observation is added.
      // 2. The value of is_disabled changes.
      // 3. The ShippingManager shuts down.
      VLOG(5) << name()
              << " worker: waiting for an Observation to "
                 "be added.";
      // If we are about to enter idle, we should make sure that we invoke all of the SendCallbacks
      // so they don't have to wait until the next time observations are added.
      InvokeSendCallbacksLockHeld(&locked, true);
      locked->idle = true;
      locked->idle_notifier.notify_all();
      locked->add_observation_notifier.wait(locked, [&locked] {
        return (
            locked->shut_down ||
            // If there are now observations available to upload, we should leave the waiting state.
            locked->ObservationsAvailable() ||
            // The ObservationStore has manually been set to non-idle by either Disable or
            // NotifyObservationAdded. We should leave the wait state and run another loop.
            !locked->idle);
      });
      VLOG(5) << name()
              << " worker: Waking up because an Observation was "
                 "added.";
      locked->idle = false;
      continue;
    }

    // Otherwise, there are Observations available in the observation_store_.
    auto now = std::chrono::system_clock::now();
    VLOG(4) << name() << ": now: " << ToString(now)
            << " next_scheduled_send_time_: " << ToString(next_scheduled_send_time_);

    if (!locked->expedited_send_requested) {
      bool should_wait_for_idle_device =
          (next_scheduled_send_time_ <= now) && device_active() && !locked->waiting_for_idle_device;
      if (should_wait_for_idle_device || next_scheduled_send_time_ > now) {
        if (should_wait_for_idle_device) {
          VLOG(4) << name() << " worker: starting to wait for idle device.";
          locked->num_delayed_sends++;
          locked->waiting_for_idle_device = true;
          locked->waiting_for_idle_device_notifier.notify_all();
        }

        // Wait until the next scheduled send.
        auto time = std::chrono::system_clock::to_time_t(next_scheduled_send_time_);
        VLOG(4) << name() << " worker: waiting until " << std::ctime(&time)
                << " for next scheduled send.";
        locked->waiting_for_schedule = true;
        locked->waiting_for_schedule_notifier.notify_all();
        locked->expedited_send_notifier.wait_until(locked, next_scheduled_send_time_, [&locked] {
          return (locked->shut_down || locked->expedited_send_requested);
        });
        locked->waiting_for_schedule = false;
        continue;
      }
    }

    if (locked->expedited_send_requested) {
      VLOG(4) << name() << " worker: expedited send requested.";
      locked->expedited_send_requested = false;
    }
    VLOG(4) << name() << " worker: time to send now.";
    locked.unlock();
    SendAllEnvelopes();
    next_scheduled_send_time_ = std::chrono::system_clock::now() + upload_scheduler_.Interval();
    locked.lock();
  }
}

void ShippingManager::SendAllEnvelopes() {
  if (protected_fields_.const_lock()->is_disabled) {
    return;
  }

  auto state = logger::IdleObservationUploadMetricDimensionDeviceState::Unreported;
  if (activity_listener_ != nullptr) {
    switch (activity_listener_->state()) {
      case ActivityState::ACTIVE:
        state = logger::IdleObservationUploadMetricDimensionDeviceState::Active;
        break;
      case ActivityState::UNKNOWN:
        state = logger::IdleObservationUploadMetricDimensionDeviceState::Unknown;
        break;
      case ActivityState::IDLE:
        state = logger::IdleObservationUploadMetricDimensionDeviceState::Idle;
        break;
    }
  }
  internal_metrics_->IdleObservationUpload(state);

  VLOG(5) << name() << ": SendAllEnvelopes().";
  bool success = true;
  size_t failures_without_success = 0;
  // Loop through all envelopes in the ObservationStore.
  while (true) {
    auto holder = protected_fields_.lock()->observation_store->TakeNextEnvelopeHolder();
    if (holder == nullptr) {
      // No more envelopes in the store, we can exit the loop.
      break;
    }
    auto failed_holder = SendEnvelopeToBackend(std::move(holder));
    if (failed_holder == nullptr) {
      // The send succeeded.
      failures_without_success = 0;
    } else {
      // The send failed. Increment failures_without_success and return the
      // failed EnvelopeHolder to the store.
      success = false;
      failures_without_success++;
      protected_fields_.lock()->observation_store->ReturnEnvelopeHolder(std::move(failed_holder));
    }

    if (failures_without_success >= kMaxFailuresWithoutSuccess) {
      VLOG(4) << name() << "::SendAllEnvelopes(): failed too many times ("
              << failures_without_success << "). Stopping uploads.";
      break;
    }
  }

  {
    auto locked = protected_fields_.lock();
    InvokeSendCallbacksLockHeld(&locked, success);
  }
}

void ShippingManager::InvokeSendCallbacksLockHeld(ShippingManager::Fields::LockedFieldsPtr* fields,
                                                  bool success) {
  auto& f = *fields;

  f->expedited_send_requested = false;
  std::vector<SendCallback> callbacks_to_invoke;
  callbacks_to_invoke.swap(f->send_callback_queue);
  for (SendCallback& callback : callbacks_to_invoke) {
    VLOG(5) << name() << ": Invoking send callback(" << success << ") now.";
    callback(success);
  }
}

ClearcutV1ShippingManager::ClearcutV1ShippingManager(
    const UploadScheduler& upload_scheduler, ObservationStore* observation_store,
    util::EncryptedMessageMaker* encrypt_to_shuffler,
    util::EncryptedMessageMaker* encrypt_to_analyzer,
    std::unique_ptr<lib::clearcut::ClearcutUploaderInterface> clearcut,
    std::unique_ptr<ActivityListenerInterface> activity_listener, int32_t log_source_id,
    size_t max_attempts_per_upload, std::string api_key)
    : ShippingManager(upload_scheduler, observation_store, encrypt_to_analyzer,
                      std::move(activity_listener)),
      max_attempts_per_upload_(max_attempts_per_upload),
      clearcut_(std::move(clearcut)),
      api_key_(std::move(api_key)),
      encrypt_to_shuffler_(encrypt_to_shuffler),
      log_source_id_(log_source_id) {}

void ClearcutV1ShippingManager::ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
  internal_metrics_.reset(internal_metrics);
  clearcut_->ResetInternalMetrics(internal_metrics);
}

std::unique_ptr<EnvelopeHolder> ClearcutV1ShippingManager::SendEnvelopeToBackend(
    std::unique_ptr<EnvelopeHolder> envelope_to_send) {
  auto envelope = envelope_to_send->GetEnvelope(encrypt_to_analyzer_);
  envelope.set_api_key(api_key_);

  util::Status status = SendEnvelopeToClearcutDestination(envelope, envelope_to_send->Size());
  if (!status.ok()) {
    VLOG(4) << name() << ": Cobalt send to Shuffler failed: (" << status.error_code() << ") "
            << status.error_message() << ". Observations have been re-enqueued for later.";
    return envelope_to_send;
  }
  return nullptr;
}

util::Status ClearcutV1ShippingManager::SendEnvelopeToClearcutDestination(const Envelope& envelope,
                                                                          size_t envelope_size) {
  auto log_extension = std::make_unique<LogEventExtension>();

  if (!encrypt_to_shuffler_->Encrypt(envelope,
                                     log_extension->mutable_cobalt_encrypted_envelope())) {
    // TODO(rudominer) log
    // Drop on floor.
    return util::Status::OK;
  }

  for (const auto& observation_batch : envelope.batch()) {
    const auto& metadata = observation_batch.meta_data();
    internal_metrics_->BytesUploaded(
        logger::PerProjectBytesUploadedMetricDimensionStatus::Attempted,
        observation_batch.ByteSizeLong(), metadata.customer_id(), metadata.project_id());
  }

  VLOG(5) << name() << " worker: Sending Envelope of size " << envelope_size
          << " bytes to clearcut.";

  lib::clearcut::LogRequest request;
  request.set_log_source(log_source_id_);
  request.add_log_event()->SetAllocatedExtension(LogEventExtension::ext, log_extension.release());

  util::Status status;
  {
    std::lock_guard<std::mutex> lock(clearcut_mutex_);
    status = clearcut_->UploadEvents(&request, max_attempts_per_upload_);
  }
  {
    auto locked = protected_fields_.lock();
    locked->num_send_attempts++;
    if (!status.ok()) {
      locked->num_failed_attempts++;
    }
    locked->last_send_status = status;
  }
  if (status.ok()) {
    VLOG(4) << name() << "::SendEnvelopeToBackend: OK";

    for (const auto& observation_batch : envelope.batch()) {
      const auto& metadata = observation_batch.meta_data();
      internal_metrics_->BytesUploaded(
          logger::PerProjectBytesUploadedMetricDimensionStatus::Succeeded,
          observation_batch.GetCachedSize(), metadata.customer_id(), metadata.project_id());
    }
  }
  return status;
}

void ShippingManager::WaitUntilIdle(std::chrono::seconds max_wait) {
  auto locked = protected_fields_.lock();
  if (locked->shut_down || locked->idle) {
    return;
  }
  locked->idle_notifier.wait_for(locked, max_wait,
                                 [&locked] { return (locked->shut_down || locked->idle); });
}

void ShippingManager::WaitUntilWorkerWaiting(std::chrono::seconds max_wait) {
  auto locked = protected_fields_.lock();
  if (locked->shut_down || locked->waiting_for_schedule) {
    return;
  }
  locked->waiting_for_schedule_notifier.wait_for(
      locked, max_wait, [&locked] { return (locked->shut_down || locked->waiting_for_schedule); });
}

void ShippingManager::WaitUntilWorkerWaitingIdleDevice(std::chrono::seconds max_wait) {
  auto locked = protected_fields_.lock();
  if (locked->shut_down || locked->waiting_for_idle_device) {
    return;
  }
  locked->waiting_for_idle_device_notifier.wait_for(locked, max_wait, [&locked] {
    return (locked->shut_down || locked->waiting_for_idle_device);
  });
}

LocalShippingManager::LocalShippingManager(observation_store::ObservationStore* observation_store,
                                           std::string output_file_path, util::FileSystem* fs)
    : LocalShippingManager(observation_store, nullptr, std::move(output_file_path), fs) {}

LocalShippingManager::LocalShippingManager(observation_store::ObservationStore* observation_store,
                                           util::EncryptedMessageMaker* encrypt_to_analyzer,
                                           std::string output_file_path, util::FileSystem* fs)
    : ShippingManager({std::chrono::seconds::zero(), std::chrono::seconds::zero()},
                      observation_store, encrypt_to_analyzer),
      output_file_path_(std::move(output_file_path)),
      fs_(fs) {
  CHECK(fs_);
}

std::unique_ptr<EnvelopeHolder> LocalShippingManager::SendEnvelopeToBackend(
    std::unique_ptr<EnvelopeHolder> envelope_to_send) {
  const Envelope& envelope = envelope_to_send->GetEnvelope(encrypt_to_analyzer_);

  VLOG(5) << name() << " worker: Saving Envelope of size " << envelope_to_send->Size()
          << " bytes to local file.";

  util::Status status = util::Status::OK;

  auto ofs = fs_->NewProtoOutputStream(output_file_path_, /*append=*/true);
  if (ofs.ok()) {
    auto proto_output_stream = std::move(ofs.ValueOrDie());
    if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(envelope,
                                                                    proto_output_stream.get())) {
      VLOG(4) << name() << ": Unable to write Envelope to local file: " << output_file_path_;
    }
  }

  {
    auto locked = protected_fields_.lock();
    locked->num_send_attempts++;
    if (!status.ok()) {
      locked->num_failed_attempts++;
    }
    locked->last_send_status = status;
  }
  if (status.ok()) {
    VLOG(4) << name() << "::SendEnvelopeToBackend: OK";
    return nullptr;
  }

  VLOG(4) << name() << ": Cobalt save to local file failed: (" << status.error_code() << ") "
          << status.error_message() << ". Observations have been re-enqueued for later.";
  return envelope_to_send;
}

}  // namespace cobalt::uploader
