| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_ENCODER_SHIPPING_MANAGER_H_ |
| #define COBALT_ENCODER_SHIPPING_MANAGER_H_ |
| |
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "./logging.h"
#include "encoder/envelope_maker.h"
#include "encoder/send_retryer.h"
#include "encoder/shuffler_client.h"
| |
| namespace cobalt { |
| namespace encoder { |
| |
| using send_retryer::SendRetryerInterface; |
| |
| // ShippingManager is a central coordinator for collecting encoded Observations |
| // and sending them to the Shuffler. Observations are accumulated in memory |
| // and periodically sent in batches to the Shuffler by a background worker |
| // thread on a regular schedule. ShippingManager also performs expedited |
| // off-schedule sends when too much unsent Observation data has accumulated. |
| // A client may also explicitly request an expedited send. |
| // |
| // ShippingManager uses a SendRetryer to send Observations to the Shuffler so |
| // in case a send fails it will be retried multiple times with exponential |
| // back-off. |
| // |
| // ShippingManager uses gRPC to send to the Shuffler. The unit of data sent |
| // in a single gRPC request is the *Envelope*. ShippingManager will distribute |
| // the collection of Observations it controls into one or more Envelopes |
| // in order to try to achieve an efficient size Envelope to send. |
| // |
| // Usage: Construct a ShippingManager, invoke Start() once, and repeatedly |
| // invoke AddObservation(). After Start() has been invoked calls to |
| // AddObservation() are thread safe: It may be invoked concurrently |
| // by multiple threads. Optionally invoke RequestSendSoon() to expedite a send |
| // operation. |
| // |
| // Usually a single ShippingManager will be constructed for an entire |
| // client device and all applications running on that device that wish to use |
| // Cobalt to collect metrics will make use of this single instance. |
| class ShippingManager { |
| public: |
| // Parameters passed to the ShippingManager constructor that control its |
| // behavior with respect to the size of the data stored in memory and |
| // sent using gRPC. |
| class SizeParams { |
| public: |
| // max_bytes_per_observation: AddObservation() will return |
| // kObservationTooBig if the given Observation's serialized size is bigger |
| // than this. |
| |
| // max_bytes_per_envelope: When collecting Observations into Envelopes, |
| // ShippingManager will not build an Envelope larger than this size. Since |
| // ShippingManager sends a single Envelope in a gRPC request, this value |
| // should be used to ensure that ShippingManager does not exceed the |
| // maximum gRPC message size. |
| // |
| // max_bytes_total: ShippingManager will perform an expedited send if the |
| // size of the accumulated, unsent Observation data exceeds 60% of this |
| // value. If the size of the accumulated, unsent Observation data reaches |
| // this value then ShippingManager will not accept any more Observations: |
| // AddObservation() will return kFull, until ShippingManager is able to send |
| // the accumulated Observations to the Shuffler. |
| // |
| // min_envelope_send_size: ShippingManager will attempt to combine Envelopes |
| // with sizes smaller than this value (in bytes) into Envelopes whose size |
| // exceeds this value prior to sending to the Shuffler. |
| // |
| // REQUIRED: |
| // 0 <= max_bytes_per_observation <= max_bytes_per_envelope <= |
| // max_bytes_total |
| // 0 <= min_envelope_send_size <= max_bytes_per_envelope |
| SizeParams(size_t max_bytes_per_observation, size_t max_bytes_per_envelope, |
| size_t max_bytes_total, size_t min_envelope_send_size) |
| : max_bytes_per_observation_(max_bytes_per_observation), |
| max_bytes_per_envelope_(max_bytes_per_envelope), |
| max_bytes_total_(max_bytes_total), |
| min_envelope_send_size_(min_envelope_send_size) { |
| CHECK_LE(max_bytes_per_observation_, max_bytes_per_envelope_); |
| CHECK_LE(max_bytes_per_envelope_, max_bytes_total_); |
| CHECK_LE(min_envelope_send_size_, max_bytes_per_envelope_); |
| } |
| |
| private: |
| friend class ShippingManager; |
| size_t max_bytes_per_observation_; |
| size_t max_bytes_per_envelope_; |
| size_t max_bytes_total_; |
| size_t min_envelope_send_size_; |
| }; |
| |
| // Use this constant instead of std::chrono::seconds::max() in |
| // ScheduleParams below in order to effectively set the wait time to |
| // infinity. |
| static const std::chrono::seconds kMaxSeconds; |
| |
| // Parameters passed to the ShippingManager constructor that control its |
| // behavior with respect to scheduling. |
| // |
| // schedule_interval: How frequently should ShippingManager perform regular |
| // periodic sends to the Shuffler? Set to kMaxSeconds to effectively |
| // disable periodic sends. |
| // |
| // min_interval: Because of expedited sends, ShippingManager may sometimes |
| // send to the Shuffler more frequently than |schedule_interval|. This |
| // parameter is a safety setting. ShippingManager will never perform two |
| // sends within a single period of |min_interval| seconds. |
| // |
| // REQUIRED: |
| // 0 <= min_interval <= schedule_interval <= kMaxSeconds |
| class ScheduleParams { |
| public: |
| ScheduleParams(std::chrono::seconds schedule_interval, |
| std::chrono::seconds min_interval) |
| : schedule_interval_(schedule_interval), min_interval_(min_interval) { |
| CHECK_GE(min_interval.count(), 0); |
| CHECK_LE(min_interval_.count(), schedule_interval_.count()); |
| CHECK_LE(schedule_interval.count(), kMaxSeconds.count()); |
| } |
| |
| private: |
| friend class ShippingManager; |
| std::chrono::seconds schedule_interval_; |
| std::chrono::seconds min_interval_; |
| }; |
| |
| // Parameters passed to the ShippingManager constructor that will be used |
| // to construct EnvelopeMakers. See the documentation of the |
| // EnvelopeMaker constructor. |
| class EnvelopeMakerParams { |
| public: |
| EnvelopeMakerParams(std::string analyzer_public_key_pem, |
| EncryptedMessage::EncryptionScheme analyzer_scheme, |
| std::string shuffler_public_key_pem, |
| EncryptedMessage::EncryptionScheme shuffler_scheme) |
| : analyzer_public_key_pem_(analyzer_public_key_pem), |
| analyzer_scheme_(analyzer_scheme), |
| shuffler_public_key_pem_(shuffler_public_key_pem), |
| shuffler_scheme_(shuffler_scheme) {} |
| |
| private: |
| friend class ShippingManager; |
| std::string analyzer_public_key_pem_; |
| EncryptedMessage::EncryptionScheme analyzer_scheme_; |
| std::string shuffler_public_key_pem_; |
| EncryptedMessage::EncryptionScheme shuffler_scheme_; |
| }; |
| |
| // Parameters passed to the ShippingManager constructor that will be passed |
| // to the method SendRetryer::SendToShuffler(). See the documentation of |
| // that method. |
| class SendRetryerParams { |
| public: |
| SendRetryerParams(std::chrono::seconds initial_rpc_deadline, |
| std::chrono::seconds deadline_per_send_attempt) |
| : initial_rpc_deadline_(initial_rpc_deadline), |
| deadline_per_send_attempt_(deadline_per_send_attempt) {} |
| |
| private: |
| friend class ShippingManager; |
| std::chrono::seconds initial_rpc_deadline_; |
| std::chrono::seconds deadline_per_send_attempt_; |
| }; |
| |
| // Constructor |
| // |
| // size_params: These control the ShippingManager's behavior with respect to |
| // the size of the data stored in memory and sent using gRPC. |
| // |
| // scheduling_params: These control the ShippingManager's behavior with |
| // respect to scheduling sends. |
| // |
| // envelope_maker_params: Used when the ShippingManager needs to construct |
| // and EnvelopeMaker. |
| // |
| // send_retryer_params: Used when the ShippingManager needs to invoke |
| // SendRetryer::SendToShuffler(). |
| // |
| // send_retryer: The instance of |SendRetryerInterface| encapsulated by |
| // this ShippingManager. ShippingManager does not take ownership of |
| // send_retryer which must outlive ShippingManager. |
| ShippingManager(const SizeParams& size_params, |
| const ScheduleParams& scheduling_params, |
| const EnvelopeMakerParams& envelope_maker_params, |
| const SendRetryerParams send_retryer_params, |
| SendRetryerInterface* send_retryer); |
| |
| // The destructor will stop the worker thread and wait for it to stop |
| // before exiting. |
| ~ShippingManager(); |
| |
| // Starts the worker thread. Destruct this object to stop the worker thread. |
| // This method must be invoked exactly once. |
| void Start(); |
| |
| // The status of an AddObservation() call. |
| enum Status { |
| // AddObservation() succeeded. |
| kOk = 0, |
| |
| // The Observation was not added because it is too big. |
| kObservationTooBig, |
| |
| // The Observation was not added to the Envelope because the Shipping |
| // manager has too large of a backlog of Observations that have not yet |
| // been sent. In the current version we use a pure in-memory cache not |
| // backed by a persistent cache and so we return this error if the in-memory |
| // backlog gets too large. In later versions we will have a persistent |
| // cache and so the threshold for returning this error will be much higher. |
| kFull, |
| |
| // The ShippingManager is shutting down. No more Observations will be |
| // accepted. |
| kShutDown, |
| |
| // The Observation was not added to the Envelope because the encryption |
| // failed. This should never happen. |
| kEncryptionFailed |
| }; |
| |
| // Add |observation| and its associated |metadata| to the collection of |
| // Observations controlled by this ShippingManager. Eventually the |
| // ShippingManager's worker thread will use the |SendRetryer| to send |
| // all of the accumulated, unsent Observations. |
| Status AddObservation(const Observation& observation, |
| std::unique_ptr<ObservationMetadata> metadata); |
| |
| // Register a request with the ShippingManager for an expedited send. |
| // The ShippingManager's worker thread will use the |SendRetryer| to send |
| // all of the accumulated, unsent Observations as soon as possible but not |
| // sooner than |min_interval| seconds after the previous send operation |
| // has completed. |
| void RequestSendSoon(); |
| |
| using SendCallback = std::function<void(bool)>; |
| |
| // A version of RequestSendSoon() that provides feedback about the send. |
| // |send_callback| will be invoked with the result of the requested send |
| // attempt. More precisely, send_callback will be invoked after the |
| // ShippingManager has attempted to send all of the Observations that were |
| // added prior to the invocation of RequestSendSoon(). It will be invoked |
| // with true if all such Observations were succesfully sent. It will be |
| // invoked with false if some Observations were not able to be sent, but |
| // the status of any particular Observation may not be determined. This |
| // is useful mainly in tests. |
| void RequestSendSoon(SendCallback send_callback); |
| |
| // Blocks for |max_wait| seconds or until the worker thread has |
| // successfully sent all previously added Observations and is idle, waiting |
| // for more Observations to be added. This method is most useful if it |
| // can be arranged that there are no concurrent invocations of |
| // AddObservation() (for example in a test) because such concurrent |
| // invocations may cause the idle state to never be entered. |
| void WaitUntilIdle(std::chrono::seconds max_wait); |
| |
| // Blocks for |max_wait| seconds or until the worker thread is in the state |
| // where there are Observations to be sent but it is waiting for the |
| // next scheduled send time. This method is most useful if it can be |
| // arranged that there are no concurrent invocations of RequestSendSoon() |
| // (for example in a test) because such concurrent invocations might cause |
| // that state to never be entered. |
| void WaitUntilWorkerWaiting(std::chrono::seconds max_wait); |
| |
| // Returns the active EnvelopeMaker via move leaving the active EnvelopeMaker |
| // empty. This method is most likely only useful in a test. |
| std::unique_ptr<EnvelopeMaker> TakeActiveEnvelopeMaker(); |
| |
| // These diagnostic stats are mostly useful in a testing environment but |
| // may possibly prove useful in production also. |
| size_t num_send_attempts(); |
| size_t num_failed_attempts(); |
| grpc::Status last_send_status(); |
| |
| private: |
| // Has the ShippingManager been shut down? |
| bool shut_down(); |
| |
| // Causes the ShippingManager to shut down. Any active sends by the |
| // SendRetryer will be canceled. All condition variables will be notified |
| // in order to wake up any waiting threads. The worker thread will exit as |
| // soon as it can. |
| void ShutDown(); |
| |
| // The main method run by the worker thread. Executes a loop that |
| // exits when ShutDown() is invoked. |
| void Run(); |
| |
| // Helper method used by Run(). Does not assume mutex_ lock is held. |
| void SendAllEnvelopes(); |
| |
| // Helper method used by Run(). Does not assume mutex_ lock is held. |
| void SendOneEnvelope( |
| std::deque<std::unique_ptr<EnvelopeMaker>>* envelopes_that_failed); |
| |
| const SizeParams size_params_; |
| |
| // When the active EnvelopeMaker surpasses this size (in bytes) we invoke |
| // RequestSendSoon(). This value is set to 0.6 * max_bytes_per_envelope_ |
| // so that it is unlikely that we ever need to return kFull because the |
| // active Envelope is full. |
| const size_t envelope_send_threshold_size_; |
| |
| // When the total amount of Observation data surpasses this size |
| // we invoke RequestSendSoon(). This value is set to 0.6 * max_bytes_total_ |
| // so that it is unlikely that we ever need to return kFull because the |
| // the total amount of Observation data is too great. |
| const size_t total_bytes_send_threshold_; |
| |
| const ScheduleParams schedule_params_; |
| const EnvelopeMakerParams envelope_maker_params_; |
| const SendRetryerParams send_retryer_params_; |
| |
| SendRetryerInterface* send_retryer_; // not owned |
| |
| // Variables accessed only by the worker thread. These are not |
| // protected by a mutex. |
| std::chrono::system_clock::time_point next_scheduled_send_time_; |
| |
| std::deque<std::unique_ptr<EnvelopeMaker>> envelopes_to_send_; |
| send_retryer::CancelHandle cancel_handle_; |
| |
| // The background worker thread that runs the method "Run()." |
| std::thread worker_thread_; |
| |
| // A struct that contains a mutex and all the fields we want to protect |
| // with that mutex. |
| struct MutexProtectedFields { |
| // Protects access to all variables below. |
| std::mutex mutex; |
| |
| // Variables accessed by the worker thread and the API |
| // threads. These are protected by |mutex|. |
| |
| bool expedited_send_requested = false; |
| |
| // This is the EnvelopeMaker into which new Observations are added. |
| std::unique_ptr<EnvelopeMaker> active_envelope_maker; |
| |
| // RequestSendSoon() enqueues callbacks here just in case |
| // active_envelope_maker is not empty. These callbacks will be invoked |
| // with the result of the next send attempt that includes |
| // active_envelope_maker. |
| std::vector<SendCallback> on_deck_send_callback_queue; |
| |
| // The queue of callbacks that will be invoked when the next send |
| // attempt completes. The worker thread moves callbacks from |
| // on_deck_send_callback_queue to this queue at the same time that |
| // it picks up the active_envelope_maker. RequestSendSoon() enqueues |
| // callbacks directly here rather than onto on_deck_send_callback_queue |
| // just in case active_envelope_maker is empty but |
| // envelopes_to_send_total_bytes != 0. |
| std::vector<SendCallback> current_send_callback_queue; |
| |
| // Keeps track of the sum of the sizes of all Envelopes in |
| // |envelopes_to_send_|. |
| size_t envelopes_to_send_total_bytes = 0; |
| |
| // Set shut_down to true in order to stop "Run()". |
| bool shut_down = false; |
| |
| // Setting this to true indicates that the total size of all Observations |
| // currently stored in the ShippingManager is too large. We will stop |
| // accepting any more Observations until this is set to false again. |
| bool temporarily_full = false; |
| |
| // We initialize idle_ and waiting_for_schedule_ to true because initially |
| // the worker thread isn't even started so WaitUntilIdle() and |
| // WaitUntilWorkerWaiting() should return immediately if invoked. We will |
| // set them to false in Start(). |
| bool idle = true; |
| bool waiting_for_schedule = true; |
| |
| // These diagnostic stats are mostly useful in a testing environment but |
| // may possibly prove useful in production also. |
| size_t num_send_attempts = 0; |
| size_t num_failed_attempts = 0; |
| grpc::Status last_send_status; |
| |
| std::condition_variable add_observation_notifier; |
| std::condition_variable expedited_send_notifier; |
| std::condition_variable shutdown_notifier; |
| std::condition_variable idle_notifier; |
| std::condition_variable waiting_for_schedule_notifier; |
| }; |
| |
| // Constructing a LockedFields object constructs a std::unique_lock and |
| // therefore locks the mutex in |fields|. |
| struct LockedFields { |
| explicit LockedFields(MutexProtectedFields* fields) |
| : lock(fields->mutex), fields(fields) {} |
| std::unique_lock<std::mutex> lock; |
| MutexProtectedFields* fields; // not owned. |
| }; |
| |
| // All of the fields that are accessed by multiple threads and therefore |
| // need to be protected by a mutex. Do not access this variable directly. |
| // Instead invoke the method lock() which returns a smart pointer to a |
| // |LockedFields| that wraps this field. All access to this field should |
| // be via the following idiom: |
| // |
| // auto locked = lock(); |
| // locked->fields->[field_name]. |
| MutexProtectedFields _mutex_protected_fields_do_not_access_directly_; |
| |
| // Provides access to the fields that are protected by a mutex while |
| // acquiring a lock on the mutex. Destroy the returned std::unique_ptr to |
| // release the mutex. |
| std::unique_ptr<LockedFields> lock() { |
| return std::unique_ptr<LockedFields>( |
| new LockedFields(&_mutex_protected_fields_do_not_access_directly_)); |
| } |
| |
| // Does the work of TakeActiveEnvelopeMaker() and assumes that the |
| // fields->mutex lock is held. |
| std::unique_ptr<EnvelopeMaker> TakeActiveEnvelopeMakerLockHeld( |
| MutexProtectedFields* fields); |
| |
| // Does the work of RequestSendSoon() and assumes that the fields->mutex lock |
| // is held. |
| void RequestSendSoonLockHeld(MutexProtectedFields* fields); |
| |
| // Helper method used by Run(). Assumes the fields->mutex lock is held. |
| void PrepareForSendLockHeld(MutexProtectedFields* fields); |
| }; |
| |
| } // namespace encoder |
| } // namespace cobalt |
| |
| #endif // COBALT_ENCODER_SHIPPING_MANAGER_H_ |