blob: 112e299d02d80cc9773930ea9160322982d568fd [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/cobalt/bin/app/aggregate_and_upload_impl.h"
#include <chrono>
#include <cmath>
#include <future>
#include <memory>
#include "fuchsia/cobalt/cpp/fidl.h"
#include "src/lib/util/datetime_util.h"
#include "src/public/lib/status.h"
#include "src/public/lib/status_codes.h"
#include "zircon/system/public/zircon/time.h"
#include "zircon/third_party/ulib/musl/include/unistd.h"
namespace cobalt {
// The base delay that the exponential backoff will use.
const uint32_t kExponentialBackoffMicroseconds = 1000000; // 1000000 usec = 1 sec
AggregateAndUploadImpl::AggregateAndUploadImpl(
CobaltServiceInterface* cobalt_service,
MetricEventLoggerFactoryImpl* metric_event_logger_factory_impl)
: cobalt_service_(cobalt_service),
metric_event_logger_factory_impl_(metric_event_logger_factory_impl) {}
void AggregateAndUploadImpl::AggregateAndUploadMetricEvents(
AggregateAndUploadMetricEventsCallback callback) {
// Shutdown loggers and background aggregator threads.
ShutdownLoggersAndBackgroundAggregators();
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
uint32_t utc_day_index = util::TimePointToDayIndexUtc(now);
uint32_t exp_backoff_multiplier = 0;
bool should_retry = false;
uint32_t retry_attempts = 0;
do {
if (should_retry) {
FX_LOGS(WARNING) << "Aggregated observation generation retry attempt: " << ++retry_attempts
<< ".";
if (exp_backoff_multiplier > 0) {
// As the exp_backoff_multiplier increments for every retry with exponential backoff, the
// sleep will double.
// For example, if we retried 5 times with exponential backoff, the delay would look like
// this 1s...2s...4s...8s...16s.
usleep(static_cast<uint32_t>(std::pow(2, exp_backoff_multiplier - 1) *
kExponentialBackoffMicroseconds));
}
}
// Aggregate
cobalt::Status status = cobalt_service_->GenerateAggregatedObservations(utc_day_index);
if (status.ok()) {
should_retry = false;
} else {
StatusCode error_code = status.error_code();
FX_LOGS(WARNING) << "Aggregated observation generation failed. Failed with " << error_code
<< ".";
switch (error_code) {
case StatusCode::RESOURCE_EXHAUSTED:
// The RESOURCE_EXHAUSTED StatusCode means that the Observation store is full and we
// should retry immediately to avoid data loss. Setting the exp_backoff_multiplier_ to
// zero will run the retry immediately.
exp_backoff_multiplier = 0;
should_retry = true;
FX_LOGS(WARNING) << "Attempting to retry immediately.";
break;
case StatusCode::DATA_LOSS:
case StatusCode::ABORTED:
case StatusCode::INTERNAL:
case StatusCode::UNAVAILABLE:
// For these StatusCodes we want to retry with exponential backoff.
exp_backoff_multiplier++;
should_retry = true;
FX_LOGS(WARNING) << "Attempting to retry with exponential backoff.";
break;
default:
// For all other StatusCodes never retry and exit the loop.
should_retry = false;
FX_LOGS(ERROR) << "Will not retry! Error details: " << status.error_details() << ".";
break;
}
}
// Upload
std::promise<bool> send_result;
cobalt_service_->ShippingRequestSendSoon(
[&send_result](bool success) mutable { send_result.set_value(success); });
if (!send_result.get_future().get()) {
should_retry = true;
exp_backoff_multiplier++;
FX_LOGS(ERROR)
<< "There was a failure while sending Observations to Cobalt! Will attempt to retry.";
}
} while (should_retry);
callback();
}
void AggregateAndUploadImpl::ShutdownLoggersAndBackgroundAggregators() {
// Shutdown logger
FX_LOGS(INFO) << "Shutting down running loggers.";
metric_event_logger_factory_impl_->ShutDown();
FX_LOGS(INFO) << "Running loggers have been shut down.";
// Shutdown background aggregator threads
FX_LOGS(INFO) << "Shutting down other background aggregator threads.";
cobalt_service_->ShutDown();
FX_LOGS(INFO) << "Other background aggregator threads have been shut down.";
}
} // namespace cobalt