// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COBALT_ENCODER_SHIPPING_MANAGER_H_
#define COBALT_ENCODER_SHIPPING_MANAGER_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "./logging.h"
#include "encoder/envelope_maker.h"
#include "encoder/send_retryer.h"
#include "encoder/shuffler_client.h"

namespace cobalt {
namespace encoder {

using send_retryer::SendRetryerInterface;

// ShippingManager is a central coordinator for collecting encoded Observations
// and sending them to the Shuffler. Observations are accumulated in memory
// and periodically sent in batches to the Shuffler by a background worker
// thread on a regular schedule. ShippingManager also performs expedited
// off-schedule sends when too much unsent Observation data has accumulated.
// A client may also explicitly request an expedited send.
//
// ShippingManager uses a SendRetryer to send Observations to the Shuffler so
// in case a send fails it will be retried multiple times with exponential
// back-off.
//
// ShippingManager uses gRPC to send to the Shuffler. The unit of data sent
// in a single gRPC request is the *Envelope*. ShippingManager will distribute
// the collection of Observations it controls into one or more Envelopes
// in order to try to achieve an efficient size Envelope to send.
//
// Usage: Construct a ShippingManager, invoke Start() once, and repeatedly
// invoke AddObservation(). After Start() has been invoked calls to
// AddObservation() are thread safe: It may be invoked concurrently
// by multiple threads. Optionally invoke RequestSendSoon() to expedite a send
// operation.
//
// Usually a single ShippingManager will be constructed for an entire
// client device and all applications running on that device that wish to use
// Cobalt to collect metrics will make use of this single instance.
class ShippingManager {
 public:
  // Parameters passed to the ShippingManager constructor that control its
  // behavior with respect to the size of the data stored in memory and
  // sent using gRPC.
  class SizeParams {
   public:
    // max_bytes_per_observation: AddObservation() will return
    // kObservationTooBig if the given Observation's serialized size is bigger
    // than this.

    // max_bytes_per_envelope: When collecting Observations into Envelopes,
    // ShippingManager will not build an Envelope larger than this size. Since
    // ShippingManager sends a single Envelope in a gRPC request, this value
    // should be used to ensure that ShippingManager does not exceed the
    // maximum gRPC message size.
    //
    // max_bytes_total: ShippingManager will perform an expedited send if the
    // size of the accumulated, unsent Observation data exceeds 60% of this
    // value. If the size of the accumulated, unsent Observation data reaches
    // this value then ShippingManager will not accept any more Observations:
    // AddObservation() will return kFull, until ShippingManager is able to send
    // the accumulated Observations to the Shuffler.
    //
    // min_envelope_send_size: ShippingManager will attempt to combine Envelopes
    // with sizes smaller than this value (in bytes) into Envelopes whose size
    // exceeds this value prior to sending to the Shuffler.
    //
    // REQUIRED:
    // 0 <= max_bytes_per_observation <= max_bytes_per_envelope <=
    // max_bytes_total
    // 0 <= min_envelope_send_size <= max_bytes_per_envelope
    SizeParams(size_t max_bytes_per_observation, size_t max_bytes_per_envelope,
               size_t max_bytes_total, size_t min_envelope_send_size)
        : max_bytes_per_observation_(max_bytes_per_observation),
          max_bytes_per_envelope_(max_bytes_per_envelope),
          max_bytes_total_(max_bytes_total),
          min_envelope_send_size_(min_envelope_send_size) {
      CHECK_LE(max_bytes_per_observation_, max_bytes_per_envelope_);
      CHECK_LE(max_bytes_per_envelope_, max_bytes_total_);
      CHECK_LE(min_envelope_send_size_, max_bytes_per_envelope_);
    }

   private:
    friend class ShippingManager;
    size_t max_bytes_per_observation_;
    size_t max_bytes_per_envelope_;
    size_t max_bytes_total_;
    size_t min_envelope_send_size_;
  };

  // Use this constant instead of std::chrono::seconds::max() in
  // ScheduleParams below in order to effectively set the wait time to
  // infinity.
  static const std::chrono::seconds kMaxSeconds;

  // Parameters passed to the ShippingManager constructor that control its
  // behavior with respect to scheduling.
  //
  // schedule_interval: How frequently should ShippingManager perform regular
  // periodic sends to the Shuffler? Set to kMaxSeconds to effectively
  // disable periodic sends.
  //
  // min_interval: Because of expedited sends, ShippingManager may sometimes
  // send to the Shuffler more frequently than |schedule_interval|. This
  // parameter is a safety setting. ShippingManager will never perform two
  // sends within a single period of |min_interval| seconds.
  //
  // REQUIRED:
  // 0 <= min_interval <= schedule_interval <= kMaxSeconds
  class ScheduleParams {
   public:
    ScheduleParams(std::chrono::seconds schedule_interval,
                   std::chrono::seconds min_interval)
        : schedule_interval_(schedule_interval), min_interval_(min_interval) {
      CHECK_GE(min_interval.count(), 0);
      CHECK_LE(min_interval_.count(), schedule_interval_.count());
      CHECK_LE(schedule_interval.count(), kMaxSeconds.count());
    }

   private:
    friend class ShippingManager;
    std::chrono::seconds schedule_interval_;
    std::chrono::seconds min_interval_;
  };

  // Parameters passed to the ShippingManager constructor that will be used
  // to construct EnvelopeMakers. See the documentation of the
  // EnvelopeMaker constructor.
  class EnvelopeMakerParams {
   public:
    EnvelopeMakerParams(std::string analyzer_public_key_pem,
                        EncryptedMessage::EncryptionScheme analyzer_scheme,
                        std::string shuffler_public_key_pem,
                        EncryptedMessage::EncryptionScheme shuffler_scheme)
        : analyzer_public_key_pem_(analyzer_public_key_pem),
          analyzer_scheme_(analyzer_scheme),
          shuffler_public_key_pem_(shuffler_public_key_pem),
          shuffler_scheme_(shuffler_scheme) {}

   private:
    friend class ShippingManager;
    std::string analyzer_public_key_pem_;
    EncryptedMessage::EncryptionScheme analyzer_scheme_;
    std::string shuffler_public_key_pem_;
    EncryptedMessage::EncryptionScheme shuffler_scheme_;
  };

  // Parameters passed to the ShippingManager constructor that will be passed
  // to the method SendRetryer::SendToShuffler(). See the documentation of
  // that method.
  class SendRetryerParams {
   public:
    SendRetryerParams(std::chrono::seconds initial_rpc_deadline,
                      std::chrono::seconds deadline_per_send_attempt)
        : initial_rpc_deadline_(initial_rpc_deadline),
          deadline_per_send_attempt_(deadline_per_send_attempt) {}

   private:
    friend class ShippingManager;
    std::chrono::seconds initial_rpc_deadline_;
    std::chrono::seconds deadline_per_send_attempt_;
  };

  // Constructor
  //
  // size_params: These control the ShippingManager's behavior with respect to
  // the size of the data stored in memory and sent using gRPC.
  //
  // scheduling_params: These control the ShippingManager's behavior with
  // respect to scheduling sends.
  //
  // envelope_maker_params: Used when the ShippingManager needs to construct
  // and EnvelopeMaker.
  //
  // send_retryer_params: Used when the ShippingManager needs to invoke
  // SendRetryer::SendToShuffler().
  //
  // send_retryer: The instance of |SendRetryerInterface| encapsulated by
  // this ShippingManager. ShippingManager does not take ownership of
  // send_retryer which must outlive ShippingManager.
  ShippingManager(const SizeParams& size_params,
                  const ScheduleParams& scheduling_params,
                  const EnvelopeMakerParams& envelope_maker_params,
                  const SendRetryerParams send_retryer_params,
                  SendRetryerInterface* send_retryer);

  // The destructor will stop the worker thread and wait for it to stop
  // before exiting.
  ~ShippingManager();

  // Starts the worker thread. Destruct this object to stop the worker thread.
  // This method must be invoked exactly once.
  void Start();

  // The status of an AddObservation() call.
  enum Status {
    // AddObservation() succeeded.
    kOk = 0,

    // The Observation was not added because it is too big.
    kObservationTooBig,

    // The Observation was not added to the Envelope because the Shipping
    // manager has too large of a backlog of Observations that have not yet
    // been sent. In the current version we use a pure in-memory cache not
    // backed by a persistent cache and so we return this error if the in-memory
    // backlog gets too large. In later versions we will have a persistent
    // cache and so the threshold for returning this error will be much higher.
    kFull,

    // The ShippingManager is shutting down. No more Observations will be
    // accepted.
    kShutDown,

    // The Observation was not added to the Envelope because the encryption
    // failed. This should never happen.
    kEncryptionFailed
  };

  // Add |observation| and its associated |metadata| to the collection of
  // Observations controlled by this ShippingManager. Eventually the
  // ShippingManager's worker thread will use the |SendRetryer| to send
  // all of the accumulated, unsent Observations.
  Status AddObservation(const Observation& observation,
                        std::unique_ptr<ObservationMetadata> metadata);

  // Register a request with the ShippingManager for an expedited send.
  // The ShippingManager's worker thread will use the |SendRetryer| to send
  // all of the accumulated, unsent Observations as soon as possible but not
  // sooner than |min_interval| seconds after the previous send operation
  // has completed.
  void RequestSendSoon();

  using SendCallback = std::function<void(bool)>;

  // A version of RequestSendSoon() that provides feedback about the send.
  // |send_callback| will be invoked with the result of the requested send
  // attempt. More precisely, send_callback will be invoked after the
  // ShippingManager has attempted to send all of the Observations that were
  // added prior to the invocation of RequestSendSoon(). It will be invoked
  // with true if all such Observations were succesfully sent. It will be
  // invoked with false if some Observations were not able to be sent, but
  // the status of any particular Observation may not be determined. This
  // is useful mainly in tests.
  void RequestSendSoon(SendCallback send_callback);

  // Blocks for |max_wait| seconds or until the worker thread has
  // successfully sent all previously added Observations and is idle, waiting
  // for more Observations to be added. This method is most useful if it
  // can be arranged that there are no concurrent invocations of
  // AddObservation() (for example in a test) because such concurrent
  // invocations may cause the idle state to never be entered.
  void WaitUntilIdle(std::chrono::seconds max_wait);

  // Blocks for |max_wait| seconds or until the worker thread is in the state
  // where there are Observations to be sent but it is waiting for the
  // next scheduled send time. This method is most useful if it can be
  // arranged that there are no concurrent invocations of RequestSendSoon()
  // (for example in a test) because such concurrent invocations might cause
  // that state to never be entered.
  void WaitUntilWorkerWaiting(std::chrono::seconds max_wait);

  // Returns the active EnvelopeMaker via move leaving the active EnvelopeMaker
  // empty. This method is most likely only useful in a test.
  std::unique_ptr<EnvelopeMaker> TakeActiveEnvelopeMaker();

  // These diagnostic stats are mostly useful in a testing environment but
  // may possibly prove useful in production also.
  size_t num_send_attempts();
  size_t num_failed_attempts();
  grpc::Status last_send_status();

 private:
  // Has the ShippingManager been shut down?
  bool shut_down();

  // Causes the ShippingManager to shut down. Any active sends by the
  // SendRetryer will be canceled. All condition variables will be notified
  // in order to wake up any waiting threads. The worker thread will exit as
  // soon as it can.
  void ShutDown();

  // The main method run by the worker thread. Executes a loop that
  // exits when ShutDown() is invoked.
  void Run();

  // Helper method used by Run(). Does not assume mutex_ lock is held.
  void SendAllEnvelopes();

  // Helper method used by Run(). Does not assume mutex_ lock is held.
  void SendOneEnvelope(
      std::deque<std::unique_ptr<EnvelopeMaker>>* envelopes_that_failed);

  const SizeParams size_params_;

  // When the active EnvelopeMaker surpasses this size (in bytes) we invoke
  // RequestSendSoon(). This value is set to 0.6 * max_bytes_per_envelope_
  // so that it is unlikely that we ever need to return kFull because the
  // active Envelope is full.
  const size_t envelope_send_threshold_size_;

  // When the total amount of Observation data surpasses this size
  // we invoke RequestSendSoon(). This value is set to 0.6 * max_bytes_total_
  // so that it is unlikely that we ever need to return kFull because the
  // the total amount of Observation data is too great.
  const size_t total_bytes_send_threshold_;

  const ScheduleParams schedule_params_;
  const EnvelopeMakerParams envelope_maker_params_;
  const SendRetryerParams send_retryer_params_;

  SendRetryerInterface* send_retryer_;  // not owned

  // Variables accessed only by the worker thread. These are not
  // protected by a mutex.
  std::chrono::system_clock::time_point next_scheduled_send_time_;

  std::deque<std::unique_ptr<EnvelopeMaker>> envelopes_to_send_;
  send_retryer::CancelHandle cancel_handle_;

  // The background worker thread that runs the method "Run()."
  std::thread worker_thread_;

  // A struct that contains a mutex and all the fields we want to protect
  // with that mutex.
  struct MutexProtectedFields {
    // Protects access to all variables below.
    std::mutex mutex;

    // Variables accessed by the worker thread and the API
    // threads. These are protected by |mutex|.

    bool expedited_send_requested = false;

    // This is the EnvelopeMaker into which new Observations are added.
    std::unique_ptr<EnvelopeMaker> active_envelope_maker;

    // RequestSendSoon() enqueues callbacks here just in case
    // active_envelope_maker is not empty. These callbacks will be invoked
    // with the result of the next send attempt that includes
    // active_envelope_maker.
    std::vector<SendCallback> on_deck_send_callback_queue;

    // The queue of callbacks that will be invoked when the next send
    // attempt completes. The worker thread moves callbacks from
    // on_deck_send_callback_queue to this queue at the same time that
    // it picks up the active_envelope_maker. RequestSendSoon() enqueues
    // callbacks directly here rather than onto on_deck_send_callback_queue
    // just in case active_envelope_maker is empty but
    // envelopes_to_send_total_bytes != 0.
    std::vector<SendCallback> current_send_callback_queue;

    // Keeps track of the sum of the sizes of all Envelopes in
    // |envelopes_to_send_|.
    size_t envelopes_to_send_total_bytes = 0;

    // Set shut_down to true in order to stop "Run()".
    bool shut_down = false;

    // Setting this to true indicates that the total size of all Observations
    // currently stored in the ShippingManager is too large. We will stop
    // accepting any more Observations until this is set to false again.
    bool temporarily_full = false;

    // We initialize idle_ and waiting_for_schedule_ to true because initially
    // the worker thread isn't even started so WaitUntilIdle() and
    // WaitUntilWorkerWaiting() should return immediately if invoked. We will
    // set them to false in Start().
    bool idle = true;
    bool waiting_for_schedule = true;

    // These diagnostic stats are mostly useful in a testing environment but
    // may possibly prove useful in production also.
    size_t num_send_attempts = 0;
    size_t num_failed_attempts = 0;
    grpc::Status last_send_status;

    std::condition_variable add_observation_notifier;
    std::condition_variable expedited_send_notifier;
    std::condition_variable shutdown_notifier;
    std::condition_variable idle_notifier;
    std::condition_variable waiting_for_schedule_notifier;
  };

  // Constructing a LockedFields object constructs a std::unique_lock and
  // therefore locks the mutex in |fields|.
  struct LockedFields {
    explicit LockedFields(MutexProtectedFields* fields)
        : lock(fields->mutex), fields(fields) {}
    std::unique_lock<std::mutex> lock;
    MutexProtectedFields* fields;  // not owned.
  };

  // All of the fields that are accessed by multiple threads and therefore
  // need to be protected by a mutex. Do not access this variable directly.
  // Instead invoke the method lock() which returns a smart pointer to a
  // |LockedFields| that wraps this field. All access to this field should
  // be via the following idiom:
  //
  // auto locked = lock();
  // locked->fields->[field_name].
  MutexProtectedFields _mutex_protected_fields_do_not_access_directly_;

  // Provides access to the fields that are protected by a mutex while
  // acquiring a lock on the mutex. Destroy the returned std::unique_ptr to
  // release the mutex.
  std::unique_ptr<LockedFields> lock() {
    return std::unique_ptr<LockedFields>(
        new LockedFields(&_mutex_protected_fields_do_not_access_directly_));
  }

  // Does the work of TakeActiveEnvelopeMaker() and assumes that the
  // fields->mutex lock is held.
  std::unique_ptr<EnvelopeMaker> TakeActiveEnvelopeMakerLockHeld(
      MutexProtectedFields* fields);

  // Does the work of RequestSendSoon() and assumes that the fields->mutex lock
  // is held.
  void RequestSendSoonLockHeld(MutexProtectedFields* fields);

  // Helper method used by Run(). Assumes the fields->mutex lock is held.
  void PrepareForSendLockHeld(MutexProtectedFields* fields);
};

}  // namespace encoder
}  // namespace cobalt

#endif  // COBALT_ENCODER_SHIPPING_MANAGER_H_
