// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <mutex>
#include <utility>

#include "./logging.h"
#include "encoder/shipping_manager.h"

namespace cobalt {
namespace encoder {

namespace {
std::string ToString(const std::chrono::system_clock::time_point& t) {
  std::time_t time_struct = std::chrono::system_clock::to_time_t(t);
  return std::ctime(&time_struct);
}
}  // namespace

// Definition of the static constant declared in shipping_manager.h.
// This must be less than 2^31. There appears to be a bug in
// std::condition_variable::wait_for() in which setting the wait time to
// std::chrono::seconds::max() effectively sets the wait time to zero.
const std::chrono::seconds ShippingManager::kMaxSeconds(999999999);

ShippingManager::ShippingManager(
    const SizeParams& size_params, const ScheduleParams& schedule_params,
    const EnvelopeMakerParams& envelope_maker_params,
    const SendRetryerParams send_retryer_params,
    SendRetryerInterface* send_retryer)
    : size_params_(size_params),
      envelope_send_threshold_size_(
          size_t(0.6 * size_params.max_bytes_per_envelope_)),
      total_bytes_send_threshold_(size_t(0.6 * size_params.max_bytes_total_)),
      schedule_params_(schedule_params),
      envelope_maker_params_(envelope_maker_params),
      send_retryer_params_(send_retryer_params),
      send_retryer_(send_retryer),
      next_scheduled_send_time_(std::chrono::system_clock::now() +
                                schedule_params_.schedule_interval_) {
  CHECK(send_retryer);
  _mutex_protected_fields_do_not_access_directly_.active_envelope_maker.reset(
      new EnvelopeMaker(envelope_maker_params.analyzer_public_key_pem_,
                        envelope_maker_params.analyzer_scheme_,
                        envelope_maker_params.shuffler_public_key_pem_,
                        envelope_maker_params.shuffler_scheme_,
                        size_params.max_bytes_per_observation_,
                        size_params_.max_bytes_per_envelope_));
}

ShippingManager::~ShippingManager() {
  if (!worker_thread_.joinable()) {
    return;
  }
  ShutDown();
  VLOG(4) << "ShippingManager waiting for worker thread to exit...";
  worker_thread_.join();
}

void ShippingManager::Start() {
  {
    // We set idle and waiting_for_schedule to false since we are about to
    // start the worker thread. The worker thread will set these variables
    // to true at the appropriate times.
    auto locked = lock();
    locked->fields->idle = false;
    locked->fields->waiting_for_schedule = false;
  }

  std::thread t([this] { this->Run(); });
  worker_thread_ = std::move(t);
}

ShippingManager::Status ShippingManager::AddObservation(
    const Observation& observation,
    std::unique_ptr<ObservationMetadata> metadata) {
  auto locked = lock();
  if (locked->fields->shut_down) {
    return kShutDown;
  }
  if (locked->fields->temporarily_full) {
    // Not just the current EnvelopeMaker, but the ShippingManager in
    // general is full. This should be very rare. Unless there is a problem
    // with sending Observations to the server we should never be full.
    // The dynamics of when this might happen will be different once we
    // implement local persistence of Observations.
    return kFull;
  }
  switch (locked->fields->active_envelope_maker->AddObservation(
      observation, std::move(metadata))) {
    case EnvelopeMaker::kOk:
      VLOG(4) << "ShippingManager::AddObservation: OK";
      // Set idle_ false because any thread that invokes WaitUntilIdle() after
      // this should wait until the Observatoin just added has been sent.
      locked->fields->idle = false;
      break;

    case EnvelopeMaker::kObservationTooBig:
      return kObservationTooBig;

    case EnvelopeMaker::kEnvelopeFull:
      // This should be very rare because of the fact that we invoke
      // RequestSendSoon() below when size() >= envelope_send_threshold_size_.
      // This should prevent us from getting to the point that the active
      // EnvelopeMaker is ever full.
      RequestSendSoonLockHeld(locked->fields);
      return kFull;

    case EnvelopeMaker::kEncryptionFailed:
      return kEncryptionFailed;
  }
  if (locked->fields->active_envelope_maker->size() >=
      envelope_send_threshold_size_) {
    // The active EnvelopeMaker is starting to get too large. Initiate
    // a send.
    RequestSendSoonLockHeld(locked->fields);
  }

  size_t new_total_bytes = locked->fields->envelopes_to_send_total_bytes +
                           locked->fields->active_envelope_maker->size();

  if (new_total_bytes > total_bytes_send_threshold_) {
    RequestSendSoonLockHeld(locked->fields);
    if (new_total_bytes > size_params_.max_bytes_total_) {
      // Not just the current EnvelopeMaker, but the ShippingManager in general
      // is now temporarily full. This should be very rare. Unless there is
      // a problem with sending Observations to the server we should never
      // be full.
      locked->fields->temporarily_full = true;
    }
  }

  locked->fields->add_observation_notifier.notify_all();
  return kOk;
}

// The caller must hold a lock on mutex_.
void ShippingManager::RequestSendSoonLockHeld(MutexProtectedFields* fields) {
  VLOG(6) << "ShippingManager::RequestSendSoonLockHeld()";
  fields->expedited_send_requested = true;
  fields->expedited_send_notifier.notify_all();
  // We set waiting_for_schedule_ false here so that if the calling thread
  // invokes WaitUntilWorkerWaiting() after this then it will be waiting
  // for a *subsequent* time that the worker thread enters the
  // waiting-for-schedule state.
  fields->waiting_for_schedule = false;
}

void ShippingManager::RequestSendSoon() { RequestSendSoon(SendCallback()); }

void ShippingManager::RequestSendSoon(SendCallback send_callback) {
  VLOG(4) << "ShippingManager: Expedited send requested.";
  auto locked = lock();
  RequestSendSoonLockHeld(locked->fields);

  // If we were given a SendCallback then do one of three things...
  if (send_callback) {
    if (locked->fields->active_envelope_maker->size() > 0) {
      // If the active EnvelopeMaker is not empty put the SendCallback
      // onto the on-deck queue so that it gets invoked after the next
      // send that includes the active EnvelopeMaker.
      locked->fields->on_deck_send_callback_queue.push_back(send_callback);
    } else if (locked->fields->envelopes_to_send_total_bytes > 0) {
      // If the active EnvelopeMaker is empty but the worker thread has
      // some EnvelopeMakers it is currently dealing with then put the
      // SendCallback directly onto the current queue so that it gets invoked
      // after the next send attempt.
      locked->fields->current_send_callback_queue.push_back(send_callback);
    } else {
      // Otherwise the ShippingManager has no Observations so invoke the
      // SendCallback immediately and clear expedited_send_requested.
      locked->fields->expedited_send_requested = false;
      send_callback(true);
    }
  }
}

bool ShippingManager::shut_down() { return lock()->fields->shut_down; }

void ShippingManager::ShutDown() {
  {
    auto locked = lock();
    cancel_handle_.TryCancel();
    locked->fields->shut_down = true;
    locked->fields->shutdown_notifier.notify_all();
    locked->fields->add_observation_notifier.notify_all();
    locked->fields->expedited_send_notifier.notify_all();
    locked->fields->idle_notifier.notify_all();
    locked->fields->waiting_for_schedule_notifier.notify_all();
  }
  VLOG(4) << "ShippingManager: shut-down requested.";
}

std::unique_ptr<EnvelopeMaker> ShippingManager::TakeActiveEnvelopeMaker() {
  auto locked = lock();
  return TakeActiveEnvelopeMakerLockHeld(locked->fields);
}

size_t ShippingManager::num_send_attempts() {
  auto locked = lock();
  return locked->fields->num_send_attempts;
}

size_t ShippingManager::num_failed_attempts() {
  auto locked = lock();
  return locked->fields->num_failed_attempts;
}

grpc::Status ShippingManager::last_send_status() {
  auto locked = lock();
  return locked->fields->last_send_status;
}

void ShippingManager::Run() {
  while (true) {
    auto locked = lock();
    if (locked->fields->shut_down) {
      return;
    }

    // We start each iteration of the loop with a sleep of
    // schedule_params_.min_interval.
    // This ensures that we never send twice within one
    // schedule_params_.min_interval period.

    // Sleep for schedule_params_.min_interval or until shut_down_.
    VLOG(4) << "ShippingManager worker: sleeping for "
            << schedule_params_.min_interval_.count() << " seconds.";
    locked->fields->shutdown_notifier.wait_for(
        locked->lock, schedule_params_.min_interval_,
        [&locked] { return (locked->fields->shut_down); });
    VLOG(4) << "ShippingManager worker: waking up from sleep. shut_down_="
            << locked->fields->shut_down;
    if (locked->fields->shut_down) {
      return;
    }

    if (locked->fields->active_envelope_maker->Empty() &&
        locked->fields->envelopes_to_send_total_bytes == 0) {
      // There are no Observations at all in the ShippingManager. Wait
      // forever until notified that one arrived or shut down.
      VLOG(4) << "ShippingManager worker: waiting for an Observation to "
                 "arrive.";
      locked->fields->idle = true;
      locked->fields->idle_notifier.notify_all();
      locked->fields->add_observation_notifier.wait(locked->lock, [&locked] {
        return (locked->fields->shut_down ||
                !locked->fields->active_envelope_maker->Empty());
      });
      locked->fields->idle = false;
    } else {
      auto now = std::chrono::system_clock::now();
      VLOG(4) << "now: " << ToString(now) << " next_scheduled_send_time_: "
              << ToString(next_scheduled_send_time_);
      if (next_scheduled_send_time_ <= now ||
          locked->fields->expedited_send_requested) {
        VLOG(4) << "ShippingManager worker: time to send now.";
        PrepareForSendLockHeld(locked->fields);
        locked->lock.unlock();
        SendAllEnvelopes();
        next_scheduled_send_time_ = std::chrono::system_clock::now() +
                                    schedule_params_.schedule_interval_;
        locked->lock.lock();
      } else {
        // Wait until the next scheduled send time or until notified of
        // a new request for an expedited send or we are shut down.
        VLOG(4) << "ShippingManager worker: waiting "
                << schedule_params_.schedule_interval_.count()
                << " seconds for next scheduled send.";
        locked->fields->waiting_for_schedule = true;
        locked->fields->waiting_for_schedule_notifier.notify_all();
        locked->fields->expedited_send_notifier.wait_until(
            locked->lock, next_scheduled_send_time_, [&locked] {
              return (locked->fields->shut_down ||
                      locked->fields->expedited_send_requested);
            });
        locked->fields->waiting_for_schedule = false;
      }
    }
  }
}

// A lock on mutex_ should be held by the caller.
std::unique_ptr<EnvelopeMaker> ShippingManager::TakeActiveEnvelopeMakerLockHeld(
    MutexProtectedFields* fields) {
  std::unique_ptr<EnvelopeMaker> latest_envelope_maker(
      new EnvelopeMaker(envelope_maker_params_.analyzer_public_key_pem_,
                        envelope_maker_params_.analyzer_scheme_,
                        envelope_maker_params_.shuffler_public_key_pem_,
                        envelope_maker_params_.shuffler_scheme_,
                        size_params_.max_bytes_per_observation_,
                        size_params_.max_bytes_per_envelope_));
  fields->active_envelope_maker.swap(latest_envelope_maker);
  return latest_envelope_maker;
}

// A lock on mutex_ should be held by the caller.
void ShippingManager::PrepareForSendLockHeld(MutexProtectedFields* fields) {
  fields->expedited_send_requested = false;
  auto latest_envelope_maker = TakeActiveEnvelopeMakerLockHeld(fields);
  fields->envelopes_to_send_total_bytes += latest_envelope_maker->size();
  envelopes_to_send_.emplace_front(std::move(latest_envelope_maker));
  // Copy on_deck_send_callback_queue onto the end of
  // current_send_callback_queue and then clear current_send_callback_queue.
  fields->current_send_callback_queue.insert(
      fields->current_send_callback_queue.end(),
      fields->on_deck_send_callback_queue.begin(),
      fields->on_deck_send_callback_queue.end());
  fields->on_deck_send_callback_queue.clear();
}

void ShippingManager::SendAllEnvelopes() {
  std::deque<std::unique_ptr<EnvelopeMaker>> envelopes_that_failed;
  while (!envelopes_to_send_.empty()) {
    SendOneEnvelope(&envelopes_that_failed);
  }
  bool success = envelopes_that_failed.empty();
  envelopes_to_send_ = std::move(envelopes_that_failed);
  size_t envelopes_to_send_total_bytes = 0;
  for (const auto& env : envelopes_to_send_) {
    envelopes_to_send_total_bytes += env->size();
  }
  VLOG(5) << "ShippingManager: envelopes_to_send_total_bytes="
          << envelopes_to_send_total_bytes;

  std::vector<SendCallback> callbacks_to_invoke;
  {
    auto locked = lock();
    locked->fields->envelopes_to_send_total_bytes =
        envelopes_to_send_total_bytes;
    if (envelopes_to_send_total_bytes +
            locked->fields->active_envelope_maker->size() <
        size_params_.max_bytes_total_) {
      locked->fields->temporarily_full = false;
    }
    callbacks_to_invoke.swap(locked->fields->current_send_callback_queue);
  }
  for (SendCallback& callback : callbacks_to_invoke) {
    callback(success);
  }
}

void ShippingManager::SendOneEnvelope(
    std::deque<std::unique_ptr<EnvelopeMaker>>* envelopes_that_failed) {
  std::unique_ptr<EnvelopeMaker> envelope_to_send;
  size_t envelope_to_send_size = 0;
  while (!envelopes_to_send_.empty() &&
         envelope_to_send_size < size_params_.min_envelope_send_size_ &&
         envelope_to_send_size + envelopes_to_send_.front()->size() <=
             size_params_.max_bytes_per_envelope_) {
    std::unique_ptr<EnvelopeMaker> next = std::move(envelopes_to_send_.front());
    envelopes_to_send_.pop_front();
    if (!envelope_to_send) {
      envelope_to_send = std::move(next);
    } else {
      envelope_to_send->MergeOutOf(next.get());
    }
    envelope_to_send_size = envelope_to_send->size();
  }
  if (!envelope_to_send || envelope_to_send_size == 0) {
    VLOG(3) << "ShippingManager worker: There are no Observations to send.";
    return;
  }
  EncryptedMessage encrypted_envelope;
  if (!envelope_to_send->MakeEncryptedEnvelope(&encrypted_envelope)) {
    // TODO(rudominer) log
    // Drop on floor.
    return;
  }

  VLOG(5) << "ShippingManager worker: Sending Envelope of size "
          << envelope_to_send->size() << " bytes.";
  auto status = send_retryer_->SendToShuffler(
      send_retryer_params_.initial_rpc_deadline_,
      send_retryer_params_.deadline_per_send_attempt_, &cancel_handle_,
      encrypted_envelope);
  {
    auto locked = lock();
    locked->fields->num_send_attempts++;
    if (!status.ok()) {
      locked->fields->num_failed_attempts++;
    }
    locked->fields->last_send_status = status;
  }
  if (status.ok()) {
    VLOG(4) << "ShippingManager::SendOneEnvelope: OK";
    return;
  }

  VLOG(1) << "Cobalt send to Shuffler failed: (" << status.error_code() << ") "
          << status.error_message()
          << ". Observations have been re-enqueued for later.";
  envelopes_that_failed->emplace_back(std::move(envelope_to_send));
}

void ShippingManager::WaitUntilIdle(std::chrono::seconds max_wait) {
  auto locked = lock();
  if (locked->fields->shut_down || locked->fields->idle) {
    return;
  }
  locked->fields->idle_notifier.wait_for(locked->lock, max_wait, [&locked] {
    return (locked->fields->shut_down || locked->fields->idle);
  });
}

void ShippingManager::WaitUntilWorkerWaiting(std::chrono::seconds max_wait) {
  auto locked = lock();
  if (locked->fields->shut_down || locked->fields->waiting_for_schedule) {
    return;
  }
  locked->fields->waiting_for_schedule_notifier.wait_for(
      locked->lock, max_wait, [&locked] {
        return (locked->fields->shut_down ||
                locked->fields->waiting_for_schedule);
      });
}

}  // namespace encoder
}  // namespace cobalt
