blob: ab2053b3f8869cc6c8a94500ab4765bbea67152b [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <mutex>
#include <utility>
#include "./logging.h"
#include "encoder/shipping_manager.h"
namespace cobalt {
namespace encoder {
namespace {
std::string ToString(const std::chrono::system_clock::time_point& t) {
std::time_t time_struct = std::chrono::system_clock::to_time_t(t);
return std::ctime(&time_struct);
}
} // namespace
// Definition of the static constant declared in shipping_manager.h.
// This must be less than 2^31. There appears to be a bug in
// std::condition_variable::wait_for() in which setting the wait time to
// std::chrono::seconds::max() effectively sets the wait time to zero.
const std::chrono::seconds ShippingManager::kMaxSeconds(999999999);
ShippingManager::ShippingManager(
const SizeParams& size_params, const ScheduleParams& schedule_params,
const EnvelopeMakerParams& envelope_maker_params,
const SendRetryerParams send_retryer_params,
SendRetryerInterface* send_retryer)
: size_params_(size_params),
envelope_send_threshold_size_(
size_t(0.6 * size_params.max_bytes_per_envelope_)),
total_bytes_send_threshold_(size_t(0.6 * size_params.max_bytes_total_)),
schedule_params_(schedule_params),
envelope_maker_params_(envelope_maker_params),
send_retryer_params_(send_retryer_params),
send_retryer_(send_retryer),
next_scheduled_send_time_(std::chrono::system_clock::now() +
schedule_params_.schedule_interval_) {
CHECK(send_retryer);
_mutex_protected_fields_do_not_access_directly_.active_envelope_maker.reset(
new EnvelopeMaker(envelope_maker_params.analyzer_public_key_pem_,
envelope_maker_params.analyzer_scheme_,
envelope_maker_params.shuffler_public_key_pem_,
envelope_maker_params.shuffler_scheme_,
size_params.max_bytes_per_observation_,
size_params_.max_bytes_per_envelope_));
}
ShippingManager::~ShippingManager() {
if (!worker_thread_.joinable()) {
return;
}
ShutDown();
VLOG(4) << "ShippingManager waiting for worker thread to exit...";
worker_thread_.join();
}
void ShippingManager::Start() {
{
// We set idle and waiting_for_schedule to false since we are about to
// start the worker thread. The worker thread will set these variables
// to true at the appropriate times.
auto locked = lock();
locked->fields->idle = false;
locked->fields->waiting_for_schedule = false;
}
std::thread t([this] { this->Run(); });
worker_thread_ = std::move(t);
}
ShippingManager::Status ShippingManager::AddObservation(
const Observation& observation,
std::unique_ptr<ObservationMetadata> metadata) {
auto locked = lock();
if (locked->fields->shut_down) {
return kShutDown;
}
if (locked->fields->temporarily_full) {
// Not just the current EnvelopeMaker, but the ShippingManager in
// general is full. This should be very rare. Unless there is a problem
// with sending Observations to the server we should never be full.
// The dynamics of when this might happen will be different once we
// implement local persistence of Observations.
return kFull;
}
switch (locked->fields->active_envelope_maker->AddObservation(
observation, std::move(metadata))) {
case EnvelopeMaker::kOk:
VLOG(4) << "ShippingManager::AddObservation: OK";
// Set idle_ false because any thread that invokes WaitUntilIdle() after
// this should wait until the Observatoin just added has been sent.
locked->fields->idle = false;
break;
case EnvelopeMaker::kObservationTooBig:
return kObservationTooBig;
case EnvelopeMaker::kEnvelopeFull:
// This should be very rare because of the fact that we invoke
// RequestSendSoon() below when size() >= envelope_send_threshold_size_.
// This should prevent us from getting to the point that the active
// EnvelopeMaker is ever full.
RequestSendSoonLockHeld(locked->fields);
return kFull;
case EnvelopeMaker::kEncryptionFailed:
return kEncryptionFailed;
}
if (locked->fields->active_envelope_maker->size() >=
envelope_send_threshold_size_) {
// The active EnvelopeMaker is starting to get too large. Initiate
// a send.
RequestSendSoonLockHeld(locked->fields);
}
size_t new_total_bytes = locked->fields->envelopes_to_send_total_bytes +
locked->fields->active_envelope_maker->size();
if (new_total_bytes > total_bytes_send_threshold_) {
RequestSendSoonLockHeld(locked->fields);
if (new_total_bytes > size_params_.max_bytes_total_) {
// Not just the current EnvelopeMaker, but the ShippingManager in general
// is now temporarily full. This should be very rare. Unless there is
// a problem with sending Observations to the server we should never
// be full.
locked->fields->temporarily_full = true;
}
}
locked->fields->add_observation_notifier.notify_all();
return kOk;
}
// The caller must hold a lock on mutex_.
void ShippingManager::RequestSendSoonLockHeld(MutexProtectedFields* fields) {
VLOG(6) << "ShippingManager::RequestSendSoonLockHeld()";
fields->expedited_send_requested = true;
fields->expedited_send_notifier.notify_all();
// We set waiting_for_schedule_ false here so that if the calling thread
// invokes WaitUntilWorkerWaiting() after this then it will be waiting
// for a *subsequent* time that the worker thread enters the
// waiting-for-schedule state.
fields->waiting_for_schedule = false;
}
void ShippingManager::RequestSendSoon() { RequestSendSoon(SendCallback()); }
void ShippingManager::RequestSendSoon(SendCallback send_callback) {
VLOG(4) << "ShippingManager: Expedited send requested.";
auto locked = lock();
RequestSendSoonLockHeld(locked->fields);
// If we were given a SendCallback then do one of three things...
if (send_callback) {
if (locked->fields->active_envelope_maker->size() > 0) {
// If the active EnvelopeMaker is not empty put the SendCallback
// onto the on-deck queue so that it gets invoked after the next
// send that includes the active EnvelopeMaker.
locked->fields->on_deck_send_callback_queue.push_back(send_callback);
} else if (locked->fields->envelopes_to_send_total_bytes > 0) {
// If the active EnvelopeMaker is empty but the worker thread has
// some EnvelopeMakers it is currently dealing with then put the
// SendCallback directly onto the current queue so that it gets invoked
// after the next send attempt.
locked->fields->current_send_callback_queue.push_back(send_callback);
} else {
// Otherwise the ShippingManager has no Observations so invoke the
// SendCallback immediately and clear expedited_send_requested.
locked->fields->expedited_send_requested = false;
send_callback(true);
}
}
}
bool ShippingManager::shut_down() { return lock()->fields->shut_down; }
void ShippingManager::ShutDown() {
{
auto locked = lock();
cancel_handle_.TryCancel();
locked->fields->shut_down = true;
locked->fields->shutdown_notifier.notify_all();
locked->fields->add_observation_notifier.notify_all();
locked->fields->expedited_send_notifier.notify_all();
locked->fields->idle_notifier.notify_all();
locked->fields->waiting_for_schedule_notifier.notify_all();
}
VLOG(4) << "ShippingManager: shut-down requested.";
}
std::unique_ptr<EnvelopeMaker> ShippingManager::TakeActiveEnvelopeMaker() {
auto locked = lock();
return TakeActiveEnvelopeMakerLockHeld(locked->fields);
}
size_t ShippingManager::num_send_attempts() {
auto locked = lock();
return locked->fields->num_send_attempts;
}
size_t ShippingManager::num_failed_attempts() {
auto locked = lock();
return locked->fields->num_failed_attempts;
}
grpc::Status ShippingManager::last_send_status() {
auto locked = lock();
return locked->fields->last_send_status;
}
void ShippingManager::Run() {
while (true) {
auto locked = lock();
if (locked->fields->shut_down) {
return;
}
// We start each iteration of the loop with a sleep of
// schedule_params_.min_interval.
// This ensures that we never send twice within one
// schedule_params_.min_interval period.
// Sleep for schedule_params_.min_interval or until shut_down_.
VLOG(4) << "ShippingManager worker: sleeping for "
<< schedule_params_.min_interval_.count() << " seconds.";
locked->fields->shutdown_notifier.wait_for(
locked->lock, schedule_params_.min_interval_,
[&locked] { return (locked->fields->shut_down); });
VLOG(4) << "ShippingManager worker: waking up from sleep. shut_down_="
<< locked->fields->shut_down;
if (locked->fields->shut_down) {
return;
}
if (locked->fields->active_envelope_maker->Empty() &&
locked->fields->envelopes_to_send_total_bytes == 0) {
// There are no Observations at all in the ShippingManager. Wait
// forever until notified that one arrived or shut down.
VLOG(4) << "ShippingManager worker: waiting for an Observation to "
"arrive.";
locked->fields->idle = true;
locked->fields->idle_notifier.notify_all();
locked->fields->add_observation_notifier.wait(locked->lock, [&locked] {
return (locked->fields->shut_down ||
!locked->fields->active_envelope_maker->Empty());
});
locked->fields->idle = false;
} else {
auto now = std::chrono::system_clock::now();
VLOG(4) << "now: " << ToString(now) << " next_scheduled_send_time_: "
<< ToString(next_scheduled_send_time_);
if (next_scheduled_send_time_ <= now ||
locked->fields->expedited_send_requested) {
VLOG(4) << "ShippingManager worker: time to send now.";
PrepareForSendLockHeld(locked->fields);
locked->lock.unlock();
SendAllEnvelopes();
next_scheduled_send_time_ = std::chrono::system_clock::now() +
schedule_params_.schedule_interval_;
locked->lock.lock();
} else {
// Wait until the next scheduled send time or until notified of
// a new request for an expedited send or we are shut down.
VLOG(4) << "ShippingManager worker: waiting "
<< schedule_params_.schedule_interval_.count()
<< " seconds for next scheduled send.";
locked->fields->waiting_for_schedule = true;
locked->fields->waiting_for_schedule_notifier.notify_all();
locked->fields->expedited_send_notifier.wait_until(
locked->lock, next_scheduled_send_time_, [&locked] {
return (locked->fields->shut_down ||
locked->fields->expedited_send_requested);
});
locked->fields->waiting_for_schedule = false;
}
}
}
}
// A lock on mutex_ should be held by the caller.
std::unique_ptr<EnvelopeMaker> ShippingManager::TakeActiveEnvelopeMakerLockHeld(
MutexProtectedFields* fields) {
std::unique_ptr<EnvelopeMaker> latest_envelope_maker(
new EnvelopeMaker(envelope_maker_params_.analyzer_public_key_pem_,
envelope_maker_params_.analyzer_scheme_,
envelope_maker_params_.shuffler_public_key_pem_,
envelope_maker_params_.shuffler_scheme_,
size_params_.max_bytes_per_observation_,
size_params_.max_bytes_per_envelope_));
fields->active_envelope_maker.swap(latest_envelope_maker);
return latest_envelope_maker;
}
// A lock on mutex_ should be held by the caller.
void ShippingManager::PrepareForSendLockHeld(MutexProtectedFields* fields) {
fields->expedited_send_requested = false;
auto latest_envelope_maker = TakeActiveEnvelopeMakerLockHeld(fields);
fields->envelopes_to_send_total_bytes += latest_envelope_maker->size();
envelopes_to_send_.emplace_front(std::move(latest_envelope_maker));
// Copy on_deck_send_callback_queue onto the end of
// current_send_callback_queue and then clear current_send_callback_queue.
fields->current_send_callback_queue.insert(
fields->current_send_callback_queue.end(),
fields->on_deck_send_callback_queue.begin(),
fields->on_deck_send_callback_queue.end());
fields->on_deck_send_callback_queue.clear();
}
void ShippingManager::SendAllEnvelopes() {
std::deque<std::unique_ptr<EnvelopeMaker>> envelopes_that_failed;
while (!envelopes_to_send_.empty()) {
SendOneEnvelope(&envelopes_that_failed);
}
bool success = envelopes_that_failed.empty();
envelopes_to_send_ = std::move(envelopes_that_failed);
size_t envelopes_to_send_total_bytes = 0;
for (const auto& env : envelopes_to_send_) {
envelopes_to_send_total_bytes += env->size();
}
VLOG(5) << "ShippingManager: envelopes_to_send_total_bytes="
<< envelopes_to_send_total_bytes;
std::vector<SendCallback> callbacks_to_invoke;
{
auto locked = lock();
locked->fields->envelopes_to_send_total_bytes =
envelopes_to_send_total_bytes;
if (envelopes_to_send_total_bytes +
locked->fields->active_envelope_maker->size() <
size_params_.max_bytes_total_) {
locked->fields->temporarily_full = false;
}
callbacks_to_invoke.swap(locked->fields->current_send_callback_queue);
}
for (SendCallback& callback : callbacks_to_invoke) {
callback(success);
}
}
void ShippingManager::SendOneEnvelope(
std::deque<std::unique_ptr<EnvelopeMaker>>* envelopes_that_failed) {
std::unique_ptr<EnvelopeMaker> envelope_to_send;
size_t envelope_to_send_size = 0;
while (!envelopes_to_send_.empty() &&
envelope_to_send_size < size_params_.min_envelope_send_size_ &&
envelope_to_send_size + envelopes_to_send_.front()->size() <=
size_params_.max_bytes_per_envelope_) {
std::unique_ptr<EnvelopeMaker> next = std::move(envelopes_to_send_.front());
envelopes_to_send_.pop_front();
if (!envelope_to_send) {
envelope_to_send = std::move(next);
} else {
envelope_to_send->MergeOutOf(next.get());
}
envelope_to_send_size = envelope_to_send->size();
}
if (!envelope_to_send || envelope_to_send_size == 0) {
VLOG(3) << "ShippingManager worker: There are no Observations to send.";
return;
}
EncryptedMessage encrypted_envelope;
if (!envelope_to_send->MakeEncryptedEnvelope(&encrypted_envelope)) {
// TODO(rudominer) log
// Drop on floor.
return;
}
VLOG(5) << "ShippingManager worker: Sending Envelope of size "
<< envelope_to_send->size() << " bytes.";
auto status = send_retryer_->SendToShuffler(
send_retryer_params_.initial_rpc_deadline_,
send_retryer_params_.deadline_per_send_attempt_, &cancel_handle_,
encrypted_envelope);
{
auto locked = lock();
locked->fields->num_send_attempts++;
if (!status.ok()) {
locked->fields->num_failed_attempts++;
}
locked->fields->last_send_status = status;
}
if (status.ok()) {
VLOG(4) << "ShippingManager::SendOneEnvelope: OK";
return;
}
VLOG(1) << "Cobalt send to Shuffler failed: (" << status.error_code() << ") "
<< status.error_message()
<< ". Observations have been re-enqueued for later.";
envelopes_that_failed->emplace_back(std::move(envelope_to_send));
}
void ShippingManager::WaitUntilIdle(std::chrono::seconds max_wait) {
auto locked = lock();
if (locked->fields->shut_down || locked->fields->idle) {
return;
}
locked->fields->idle_notifier.wait_for(locked->lock, max_wait, [&locked] {
return (locked->fields->shut_down || locked->fields->idle);
});
}
void ShippingManager::WaitUntilWorkerWaiting(std::chrono::seconds max_wait) {
auto locked = lock();
if (locked->fields->shut_down || locked->fields->waiting_for_schedule) {
return;
}
locked->fields->waiting_for_schedule_notifier.wait_for(
locked->lock, max_wait, [&locked] {
return (locked->fields->shut_down ||
locked->fields->waiting_for_schedule);
});
}
} // namespace encoder
} // namespace cobalt