blob: 71186511697f170bd29b6f299c63502b15015599 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/uploader/shipping_manager.h"
#include <mutex>
#include <utility>
#include "src/lib/util/protected_fields.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/logger_interface.h"
#include "src/logging.h"
#include "src/pb/clearcut_extensions.pb.h"
#include "src/public/lib/registry_identifiers.h"
#include "third_party/protobuf/src/google/protobuf/util/delimited_message_util.h"
namespace cobalt::uploader {
using observation_store::ObservationStore;
using EnvelopeHolder = ObservationStore::EnvelopeHolder;
using cobalt::clearcut_extensions::LogEventExtension;
namespace {
// The number of upload failures after which ShippingManager will bail out of an
// invocation of SendAllEnvelopes().
constexpr size_t kMaxFailuresWithoutSuccess = 3;

// Formats |t| as a human-readable local time for log messages.
//
// std::ctime() terminates its result with '\n', which would break the surrounding log line in
// two, so the trailing newline is stripped. std::ctime() may also return nullptr when the time
// cannot be represented; constructing a std::string from a null pointer is undefined behavior,
// so a placeholder is returned instead.
//
// NOTE(review): std::ctime() writes to a shared static buffer and is not guaranteed to be
// thread-safe; concurrent callers on different threads may race.
std::string ToString(const std::chrono::system_clock::time_point& t) {
  std::time_t time_struct = std::chrono::system_clock::to_time_t(t);
  const char* formatted = std::ctime(&time_struct);
  if (formatted == nullptr) {
    return "<invalid time>";
  }
  std::string result(formatted);
  if (!result.empty() && result.back() == '\n') {
    result.pop_back();
  }
  return result;
}
}  // namespace
// Constructs the shared ShippingManager state. The first scheduled send is set one
// upload_scheduler_.Interval() after construction. If an activity listener is supplied, it is
// started with a callback that reacts to device-activity changes (see below).
ShippingManager::ShippingManager(const UploadScheduler& upload_scheduler,
                                 ObservationStore& observation_store,
                                 util::EncryptedMessageMaker* encrypt_to_analyzer,
                                 std::unique_ptr<ActivityListenerInterface> activity_listener)
    : encrypt_to_analyzer_(encrypt_to_analyzer),
      upload_scheduler_(upload_scheduler),
      // NOTE(review): this reads the member upload_scheduler_, so it relies on upload_scheduler_
      // being declared before next_scheduled_send_time_ in the header.
      next_scheduled_send_time_(std::chrono::system_clock::now() + upload_scheduler_.Interval()),
      activity_listener_(std::move(activity_listener)),
      protected_fields_(observation_store) {
  if (activity_listener_ != nullptr) {
    // When the device leaves the ACTIVE state while the worker is deferring a send for an idle
    // device, clear the deferral flag and ask for an expedited send.
    // NOTE(review): the callback captures |this|; confirm the listener stops invoking it before
    // this object is destroyed.
    activity_listener_->Start([this](ActivityState state) {
      auto locked = protected_fields_.lock();
      if (state != ACTIVE && locked->waiting_for_idle_device) {
        locked->waiting_for_idle_device = false;
        RequestSendSoonLockHeld(&locked);
      }
    });
  }
}
// The worker thread ends up calling SendEnvelopeToBackend(), which the subclasses in this file
// override. By the time this base-class destructor runs, the subclass part of the object is
// already destroyed. The worker must therefore have been joined (via ShutDownAndJoin()) from the
// subclass destructor; a still-running worker is treated as a fatal programming error.
ShippingManager::~ShippingManager() {
  const bool worker_still_running = worker_thread_.joinable();
  if (!worker_still_running) {
    return;
  }
  LOG(FATAL) << "worker_thread should be stopped in sub classes. Please call ShutDownAndJoin in "
                "the sub class destructor.";
}
// Requests shutdown and blocks until the worker thread has exited. Does nothing if the worker
// thread was never started or has already been joined.
void ShippingManager::ShutDownAndJoin() {
  if (worker_thread_.joinable()) {
    ShutDown();
    VLOG(4) << "ShippingManager destructor: waiting for worker thread to exit...";
    worker_thread_.join();
  }
}
void ShippingManager::Start() {
{
// We set idle and waiting_for_schedule to false since we are about to
// start the worker thread. The worker thread will set these variables
// to true at the appropriate times.
auto locked = protected_fields_.lock();
locked->idle = false;
locked->waiting_for_schedule = false;
}
std::thread t([this] { this->Run(); });
worker_thread_ = std::move(t);
}
// Called after Observations are added to the store. Requests an expedited send when the store is
// almost full. If the store is non-empty, marks the manager busy and wakes the worker thread.
void ShippingManager::NotifyObservationsAdded() {
  auto locked = protected_fields_.lock();
  if (locked->observation_store.IsAlmostFull()) {
    VLOG(4) << name()
            << ": NotifyObservationsAdded(): observation_store "
               "IsAlmostFull.";
    RequestSendSoonLockHeld(&locked);
  }
  if (locked->observation_store.Empty()) {
    return;
  }
  // Clearing idle ensures that a WaitUntilIdle() issued after this call waits until the newly
  // added Observation has been sent.
  locked->idle = false;
  locked->add_observation_notifier.notify_all();
}
// Flags an expedited send and wakes the worker if it is waiting for the next scheduled send.
// The caller must already hold the lock on protected_fields_ and pass that LockedFieldsPtr in
// |fields|.
void ShippingManager::RequestSendSoonLockHeld(ShippingManager::Fields::LockedFieldsPtr* fields) {
  VLOG(5) << name() << ":RequestSendSoonLockHeld()";
  auto& locked = *fields;
  locked->expedited_send_requested = true;
  locked->expedited_send_notifier.notify_all();
  // Clearing waiting_for_schedule means that a later WaitUntilWorkerWaiting() from the calling
  // thread waits for the *next* time the worker enters the waiting-for-schedule state, not a
  // previous one.
  locked->waiting_for_schedule = false;
}
// Requests an expedited send without waiting for its outcome.
void ShippingManager::RequestSendSoon() {
  // An empty callback means nobody is notified when the send finishes.
  RequestSendSoon(SendCallback{});
}
// Asks the worker thread to send as soon as possible.
//
// If |send_callback| is non-empty, it receives the outcome:
//  - If the store is empty and the manager is idle, it is invoked immediately with true.
//  - Otherwise, it is queued. Queued callbacks are invoked by InvokeSendCallbacksLockHeld(),
//    either after SendAllEnvelopes() or when the worker goes idle.
//
// The immediate callback runs only after the lock on protected_fields_ has been released. A
// callback that calls back into this ShippingManager (e.g. RequestSendSoon() or
// num_send_attempts()) therefore does not try to re-acquire a lock its own thread still holds.
void ShippingManager::RequestSendSoon(const SendCallback& send_callback) {
  VLOG(4) << name() << ": Expedited send requested.";
  bool invoke_callback_now = false;
  {
    auto locked = protected_fields_.lock();
    RequestSendSoonLockHeld(&locked);
    if (send_callback) {
      if (locked->observation_store.Empty() && locked->idle) {
        // Nothing is pending and the worker is idle, so the send trivially succeeded. Withdraw
        // the expedited-send request that was just set.
        locked->expedited_send_requested = false;
        invoke_callback_now = true;
      } else {
        locked->send_callback_queue.push_back(send_callback);
      }
    }
  }
  if (invoke_callback_now) {
    VLOG(5) << name()
            << "::RequestSendSoon. Not waiting because there are no "
               "observations. Invoking callback(true) now.";
    send_callback(true);
  }
}
// Returns whether ShutDown() has been called.
bool ShippingManager::shut_down() const {
  auto locked = protected_fields_.const_lock();
  return locked->shut_down;
}
// Returns true only when an activity listener is installed and it reports the device as ACTIVE.
bool ShippingManager::device_active() {
  return activity_listener_ != nullptr && activity_listener_->state() == ACTIVE;
}
// Marks the manager as shut down and wakes every waiter, so that the worker thread and any thread
// blocked in a WaitUntil*() call can observe shut_down. Does not join the worker thread.
void ShippingManager::ShutDown() {
  {
    auto fields = protected_fields_.lock();
    fields->shut_down = true;
    // Broadcast on every notifier; each waiter's predicate checks shut_down.
    fields->shutdown_notifier.notify_all();
    fields->add_observation_notifier.notify_all();
    fields->expedited_send_notifier.notify_all();
    fields->idle_notifier.notify_all();
    fields->waiting_for_schedule_notifier.notify_all();
    fields->waiting_for_idle_device_notifier.notify_all();
  }
  VLOG(4) << name() << ": shut-down requested.";
}
// Returns the number of send attempts made so far.
size_t ShippingManager::num_send_attempts() const {
  auto locked = protected_fields_.const_lock();
  return locked->num_send_attempts;
}
// Returns the number of send attempts that ended with a non-OK status.
size_t ShippingManager::num_failed_attempts() const {
  auto locked = protected_fields_.const_lock();
  return locked->num_failed_attempts;
}
// Returns a copy of the status recorded for the most recent send attempt.
cobalt::Status ShippingManager::last_send_status() const {
  auto locked = protected_fields_.const_lock();
  return locked->last_send_status;
}
// Returns how many times the worker began deferring a scheduled send because the device was
// active.
size_t ShippingManager::num_delayed_sends() const {
  auto locked = protected_fields_.const_lock();
  return locked->num_delayed_sends;
}
// Turns observation uploading off (|is_disabled| == true) or back on.
void ShippingManager::Disable(bool is_disabled) {
  LOG(INFO) << name() << ": " << (is_disabled ? "Disabling" : "Enabling")
            << " observation uploading.";
  auto locked = protected_fields_.lock();
  locked->is_disabled = is_disabled;
  if (is_disabled) {
    return;
  }
  // On re-enable, force the worker through one more loop iteration, even if nothing is queued.
  // Clearing idle also means a WaitUntilIdle() issued after this call waits until the manager
  // has become idle again.
  locked->idle = false;
  locked->add_observation_notifier.notify_all();
}
// Body of the worker thread started by Start(). Each iteration first sleeps for
// upload_scheduler_.MinInterval(). It then does one of the following:
//  - waits (idle) until Observations become available,
//  - waits until the next scheduled send time or an expedited-send request, or
//  - uploads everything via SendAllEnvelopes().
// Returns as soon as shut_down is observed.
void ShippingManager::Run() {
  while (true) {
    auto locked = protected_fields_.lock();
    if (locked->shut_down) {
      return;
    }
    // We start each iteration of the loop with a sleep of
    // upload_scheduler_.MinInterval().
    // This ensures that we never send twice within one
    // upload_scheduler_.MinInterval() period.
    // Sleep for upload_scheduler_.MinInterval() or until shut_down_.
    // (In this file only ShutDown() notifies shutdown_notifier, so an expedited-send request does
    // not cut this sleep short.)
    VLOG(4) << name() << " worker: sleeping for " << upload_scheduler_.MinInterval().count()
            << " seconds.";
    locked->shutdown_notifier.wait_for(locked, upload_scheduler_.MinInterval(),
                                       [&locked] { return (locked->shut_down); });
    VLOG(4) << name() << " worker: waking up from sleep. shut_down_=" << locked->shut_down;
    if (locked->shut_down) {
      return;
    }
    if (!locked->ObservationsAvailable()) {
      // There are no Observations at all in the observation_store_, or the ShippingManager is
      // disabled. Wait forever until one of the following occurs:
      //
      // 1. An observation is added.
      // 2. The value of is_disabled changes.
      // 3. The ShippingManager shuts down.
      VLOG(5) << name()
              << " worker: waiting for an Observation to "
                 "be added.";
      // If we are about to enter idle, we should make sure that we invoke all of the SendCallbacks
      // so they don't have to wait until the next time observations are added.
      InvokeSendCallbacksLockHeld(&locked, true);
      locked->idle = true;
      locked->idle_notifier.notify_all();
      locked->add_observation_notifier.wait(locked, [&locked] {
        return (
            locked->shut_down ||
            // If there are now observations available to upload, we should leave the waiting state.
            locked->ObservationsAvailable() ||
            // The ShippingManager has been marked non-idle by either Disable() or
            // NotifyObservationsAdded(). We should leave the wait state and run another loop.
            !locked->idle);
      });
      VLOG(5) << name()
              << " worker: Waking up because an Observation was "
                 "added.";
      locked->idle = false;
      continue;
    }
    // Otherwise, there are Observations available in the observation_store_.
    auto now = std::chrono::system_clock::now();
    VLOG(4) << name() << ": now: " << ToString(now)
            << " next_scheduled_send_time_: " << ToString(next_scheduled_send_time_);
    if (!locked->expedited_send_requested) {
      // When the scheduled time has arrived but the device is in use, defer once by setting
      // waiting_for_idle_device. In this file that flag is cleared only by the activity-listener
      // callback installed in the constructor, when the device leaves ACTIVE.
      // NOTE(review): in the deferral case next_scheduled_send_time_ <= now, so the wait_until
      // below presumably returns without blocking. The effective deferral is then the
      // MinInterval() sleep at the top of the next iteration. After that sleep,
      // waiting_for_idle_device is still true, so the send proceeds.
      bool should_wait_for_idle_device =
          (next_scheduled_send_time_ <= now) && device_active() && !locked->waiting_for_idle_device;
      if (should_wait_for_idle_device || next_scheduled_send_time_ > now) {
        if (should_wait_for_idle_device) {
          VLOG(4) << name() << " worker: starting to wait for idle device.";
          locked->num_delayed_sends++;
          locked->waiting_for_idle_device = true;
          locked->waiting_for_idle_device_notifier.notify_all();
        }
        // Wait until the next scheduled send.
        // NOTE(review): std::ctime() output ends with '\n', so this log line is split in two.
        auto time = std::chrono::system_clock::to_time_t(next_scheduled_send_time_);
        VLOG(4) << name() << " worker: waiting until " << std::ctime(&time)
                << " for next scheduled send.";
        locked->waiting_for_schedule = true;
        locked->waiting_for_schedule_notifier.notify_all();
        locked->expedited_send_notifier.wait_until(locked, next_scheduled_send_time_, [&locked] {
          return (locked->shut_down || locked->expedited_send_requested);
        });
        locked->waiting_for_schedule = false;
        continue;
      }
    }
    if (locked->expedited_send_requested) {
      VLOG(4) << name() << " worker: expedited send requested.";
      locked->expedited_send_requested = false;
    }
    VLOG(4) << name() << " worker: time to send now.";
    // SendAllEnvelopes() locks protected_fields_ itself, so the lock must be released around it.
    // This also keeps other threads from blocking on the lock during the upload.
    locked.unlock();
    SendAllEnvelopes();
    // next_scheduled_send_time_ is updated without the lock. As far as this file shows, only the
    // constructor and this worker thread touch it.
    next_scheduled_send_time_ = std::chrono::system_clock::now() + upload_scheduler_.Interval();
    locked.lock();
  }
}
void ShippingManager::SendAllEnvelopes() {
if (protected_fields_.const_lock()->is_disabled) {
return;
}
auto state = logger::IdleObservationUploadMigratedMetricDimensionDeviceState::Unreported;
if (activity_listener_ != nullptr) {
switch (activity_listener_->state()) {
case ActivityState::ACTIVE:
state = logger::IdleObservationUploadMigratedMetricDimensionDeviceState::Active;
break;
case ActivityState::UNKNOWN:
state = logger::IdleObservationUploadMigratedMetricDimensionDeviceState::Unknown;
break;
case ActivityState::IDLE:
state = logger::IdleObservationUploadMigratedMetricDimensionDeviceState::Idle;
break;
}
}
internal_metrics_->IdleObservationUpload(state);
LOG(INFO) << name() << ": Sending all pending envelopes";
bool success = true;
size_t failures_without_success = 0;
// Loop through all envelopes in the ObservationStore.
while (true) {
auto holder = protected_fields_.lock()->observation_store.TakeNextEnvelopeHolder();
if (holder == nullptr) {
// No more envelopes in the store, we can exit the loop.
break;
}
auto failed_holder = SendEnvelopeToBackend(std::move(holder));
if (failed_holder == nullptr) {
// The send succeeded.
failures_without_success = 0;
} else {
// The send failed. Increment failures_without_success and return the
// failed EnvelopeHolder to the store.
success = false;
failures_without_success++;
protected_fields_.lock()->observation_store.ReturnEnvelopeHolder(std::move(failed_holder));
}
if (failures_without_success >= kMaxFailuresWithoutSuccess) {
VLOG(4) << name() << "::SendAllEnvelopes(): failed too many times ("
<< failures_without_success << "). Stopping uploads.";
break;
}
}
{
auto locked = protected_fields_.lock();
InvokeSendCallbacksLockHeld(&locked, success);
}
}
// Clears any pending expedited-send request. Then empties send_callback_queue and invokes each
// callback that was in it with |success|, in queue order. The caller must hold the lock on
// protected_fields_ and pass it in |fields|.
// NOTE(review): the callbacks run while that lock is held. A callback that calls back into a
// ShippingManager method that locks protected_fields_ would presumably deadlock; confirm that
// callers' callbacks never do so.
void ShippingManager::InvokeSendCallbacksLockHeld(ShippingManager::Fields::LockedFieldsPtr* fields,
                                                  bool success) {
  auto& locked = *fields;
  locked->expedited_send_requested = false;
  // Take the whole queue, leaving it empty, before running any callback.
  auto pending = std::exchange(locked->send_callback_queue, {});
  for (auto& callback : pending) {
    VLOG(5) << name() << ": Invoking send callback(" << success << ") now.";
    callback(success);
  }
}
// Constructs a ShippingManager that uploads envelopes to Clearcut.
//  - |encrypt_to_shuffler| encrypts each envelope into the LogEventExtension that is uploaded.
//  - |encrypt_to_analyzer| is handed to the base class and passed to
//    EnvelopeHolder::GetEnvelope().
//  - |clearcut| performs the upload, and is retried up to |max_attempts_per_upload| times per
//    request.
//  - |diagnostics| may be null; when non-null, it is told the result of every send.
//  - |log_source_id| is the Clearcut log source set on each LogRequest.
//  - |api_key| is attached to every envelope sent.
ClearcutV1ShippingManager::ClearcutV1ShippingManager(
    const UploadScheduler& upload_scheduler, ObservationStore& observation_store,
    // TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
    util::EncryptedMessageMaker* encrypt_to_shuffler,
    util::EncryptedMessageMaker* encrypt_to_analyzer,
    std::unique_ptr<lib::clearcut::ClearcutUploaderInterface> clearcut,
    std::unique_ptr<ActivityListenerInterface> activity_listener, DiagnosticsInterface* diagnostics,
    // TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
    int32_t log_source_id, size_t max_attempts_per_upload, std::string api_key)
    : ShippingManager(upload_scheduler, observation_store, encrypt_to_analyzer,
                      std::move(activity_listener)),
      max_attempts_per_upload_(max_attempts_per_upload),
      clearcut_(std::move(clearcut)),
      diagnostics_(diagnostics),
      api_key_(std::move(api_key)),
      encrypt_to_shuffler_(encrypt_to_shuffler),
      log_source_id_(log_source_id) {}
// Points both this manager's internal metrics and the Clearcut uploader's internal metrics at
// |internal_metrics|.
void ClearcutV1ShippingManager::ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
  // This manager's handle is updated first, then the same pointer is forwarded to the uploader.
  internal_metrics_.reset(internal_metrics);
  clearcut_->ResetInternalMetrics(internal_metrics);
}
std::unique_ptr<EnvelopeHolder> ClearcutV1ShippingManager::SendEnvelopeToBackend(
std::unique_ptr<EnvelopeHolder> envelope_to_send) {
auto envelope = envelope_to_send->GetEnvelope(encrypt_to_analyzer_);
envelope.set_api_key(api_key_);
Status status = SendEnvelopeToClearcutDestination(envelope, envelope_to_send->Size());
if (diagnostics_ != nullptr) {
diagnostics_->SentObservationResult(status);
}
if (!status.ok()) {
VLOG(4) << name() << ": Cobalt send to Shuffler failed: (" << status.error_code() << ") "
<< status.error_message() << ". Observations have been re-enqueued for later.";
return envelope_to_send;
}
return nullptr;
}
// Encrypts |envelope| to the shuffler and wraps it in a Clearcut LogRequest for log_source_id_.
// Uploads the request, then updates the send statistics in protected_fields_. Per-project
// "Attempted" and "Succeeded" byte counts are reported to internal_metrics_.
// |envelope_size| is used only for logging.
//
// Returns the upload status. If encryption fails, the envelope is dropped and OK is returned, so
// the caller does not re-enqueue it.
Status ClearcutV1ShippingManager::SendEnvelopeToClearcutDestination(const Envelope& envelope,
                                                                   size_t envelope_size) {
  auto log_extension = std::make_unique<LogEventExtension>();
  if (!encrypt_to_shuffler_->Encrypt(envelope,
                                     log_extension->mutable_cobalt_encrypted_envelope())) {
    LOG_FIRST_N(ERROR, 10) << "Failed to encrypt an envelope to the shuffler. Dropping envelope.";
    return Status::OkStatus();
  }

  // Builds the ProjectIdentifier named by an ObservationBatch's metadata.
  auto project_of = [](const auto& batch) {
    const auto& metadata = batch.meta_data();
    return lib::ProjectIdentifier(lib::CustomerIdentifier(metadata.customer_id()),
                                  metadata.project_id());
  };

  for (const auto& batch : envelope.batch()) {
    internal_metrics_->BytesUploaded(
        logger::PerProjectBytesUploadedMigratedMetricDimensionStatus::Attempted,
        batch.ByteSizeLong(), project_of(batch));
  }

  VLOG(5) << name() << " worker: Sending Envelope of size " << envelope_size
          << " bytes to clearcut.";
  lib::clearcut::LogRequest request;
  request.set_log_source(log_source_id_);
  request.add_log_event()->SetAllocatedExtension(LogEventExtension::ext, log_extension.release());

  Status status;
  {
    // Only one upload through clearcut_ may be in flight at a time.
    std::lock_guard<std::mutex> upload_lock(clearcut_mutex_);
    status = clearcut_->UploadEvents(&request, static_cast<int32_t>(max_attempts_per_upload_));
  }

  {
    auto locked = protected_fields_.lock();
    ++locked->num_send_attempts;
    if (!status.ok()) {
      ++locked->num_failed_attempts;
    }
    locked->last_send_status = status;
  }

  if (!status.ok()) {
    return status;
  }
  VLOG(4) << name() << "::SendEnvelopeToBackend: OK";
  // ByteSizeLong() was called on every batch above, and the envelope is not modified in this
  // function, so the cached sizes are still valid.
  for (const auto& batch : envelope.batch()) {
    internal_metrics_->BytesUploaded(
        logger::PerProjectBytesUploadedMigratedMetricDimensionStatus::Succeeded,
        batch.GetCachedSize(), project_of(batch));
  }
  return status;
}
// Blocks for at most |max_wait| until the manager is idle or has been shut down.
void ShippingManager::WaitUntilIdle(std::chrono::seconds max_wait) {
  auto locked = protected_fields_.lock();
  auto done = [&locked] { return locked->shut_down || locked->idle; };
  if (done()) {
    return;
  }
  locked->idle_notifier.wait_for(locked, max_wait, done);
}
// Blocks for at most |max_wait| until the worker is waiting for its next scheduled send, or the
// manager has been shut down.
void ShippingManager::WaitUntilWorkerWaiting(std::chrono::seconds max_wait) {
  auto locked = protected_fields_.lock();
  auto done = [&locked] { return locked->shut_down || locked->waiting_for_schedule; };
  if (done()) {
    return;
  }
  locked->waiting_for_schedule_notifier.wait_for(locked, max_wait, done);
}
// Blocks for at most |max_wait| until the worker has started deferring a send for an idle device,
// or the manager has been shut down.
void ShippingManager::WaitUntilWorkerWaitingIdleDevice(std::chrono::seconds max_wait) {
  auto locked = protected_fields_.lock();
  auto done = [&locked] { return locked->shut_down || locked->waiting_for_idle_device; };
  if (done()) {
    return;
  }
  locked->waiting_for_idle_device_notifier.wait_for(locked, max_wait, done);
}
// Constructs a ShippingManager that appends envelopes to the local file |output_file_path|
// through |fs|, instead of uploading them. Both upload-scheduler intervals are zero, and no
// activity listener is passed.
// NOTE(review): the UploadScheduler here is a temporary bound to the base constructor's const
// reference. This is only safe if ShippingManager stores upload_scheduler_ by value; confirm in
// the header.
LocalShippingManager::LocalShippingManager(observation_store::ObservationStore& observation_store,
                                           util::EncryptedMessageMaker* encrypt_to_analyzer,
                                           std::string output_file_path, util::FileSystem& fs)
    : ShippingManager({std::chrono::seconds::zero(), std::chrono::seconds::zero()},
                      observation_store, encrypt_to_analyzer),
      output_file_path_(std::move(output_file_path)),
      fs_(fs) {}
std::unique_ptr<EnvelopeHolder> LocalShippingManager::SendEnvelopeToBackend(
std::unique_ptr<EnvelopeHolder> envelope_to_send) {
const Envelope& envelope = envelope_to_send->GetEnvelope(encrypt_to_analyzer_);
VLOG(5) << name() << " worker: Saving Envelope of size " << envelope_to_send->Size()
<< " bytes to local file.";
Status status = Status::OkStatus();
auto ofs = fs_.NewProtoOutputStream(output_file_path_, /*append=*/true);
if (ofs.ok()) {
auto proto_output_stream = std::move(ofs.value());
if (!google::protobuf::util::SerializeDelimitedToZeroCopyStream(envelope,
proto_output_stream.get())) {
VLOG(4) << name() << ": Unable to write Envelope to local file: " << output_file_path_;
}
}
{
auto locked = protected_fields_.lock();
locked->num_send_attempts++;
if (!status.ok()) {
locked->num_failed_attempts++;
}
locked->last_send_status = status;
}
if (status.ok()) {
VLOG(4) << name() << "::SendEnvelopeToBackend: OK";
return nullptr;
}
VLOG(4) << name() << ": Cobalt save to local file failed: (" << status.error_code() << ") "
<< status.error_message() << ". Observations have been re-enqueued for later.";
return envelope_to_send;
}
} // namespace cobalt::uploader