blob: 83a1580edfcbaa399da6b4f64f70010dd36aa72c [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/lib/cobalt/cobalt.h"
#include <set>
#include <fuchsia/cpp/cobalt.h>
#include <fuchsia/cpp/component.h>
#include <lib/async/cpp/task.h>
#include <lib/async/default.h>
#include "garnet/lib/backoff/exponential_backoff.h"
#include "garnet/lib/callback/waiter.h"
#include "lib/app/cpp/connect.h"
#include "lib/fidl/cpp/clone.h"
#include "lib/fxl/functional/auto_call.h"
#include "lib/fxl/functional/make_copyable.h"
#include "lib/fxl/logging.h"
#include "lib/fxl/macros.h"
namespace cobalt {
namespace {
fidl::VectorPtr<cobalt::ObservationValue> CloneObservationValues(
const fidl::VectorPtr<cobalt::ObservationValue>& other) {
fidl::VectorPtr<cobalt::ObservationValue> result;
zx_status_t status = fidl::Clone(other, &result);
FXL_DCHECK(status == ZX_OK);
return result;
}
} // namespace
CobaltObservation::CobaltObservation(uint32_t metric_id,
uint32_t encoding_id,
Value value)
: metric_id_(metric_id) {
FXL_DCHECK(value.is_string_value() || value.is_int_value() ||
value.is_double_value() || value.is_index_value() ||
value.is_int_bucket_distribution());
parts_.push_back(cobalt::ObservationValue());
parts_->at(0).value = std::move(value);
parts_->at(0).encoding_id = encoding_id;
}
CobaltObservation::~CobaltObservation() = default;
CobaltObservation::CobaltObservation(
uint32_t metric_id,
fidl::VectorPtr<cobalt::ObservationValue> parts)
: metric_id_(metric_id), parts_(std::move(parts)) {}
CobaltObservation::CobaltObservation(const CobaltObservation& rhs)
: CobaltObservation(rhs.metric_id_, CloneObservationValues(rhs.parts_)) {}
CobaltObservation::CobaltObservation(CobaltObservation&& rhs) :
CobaltObservation(rhs.metric_id_, std::move(rhs.parts_)) {}
void CobaltObservation::Report(CobaltEncoderPtr& encoder,
std::function<void(Status)> callback) && {
if (parts_->size() == 1) {
encoder->AddObservation(metric_id_, parts_->at(0).encoding_id,
std::move(parts_->at(0).value), callback);
} else {
encoder->AddMultipartObservation(metric_id_, std::move(parts_), callback);
}
}
std::string CobaltObservation::ValueRepr() {
std::ostringstream stream;
stream << "[";
for (auto& observation_value : *parts_) {
const Value& value = observation_value.value;
switch (value.Which()) {
case Value::Tag::Invalid: {
stream << "unknown";
break;
}
case Value::Tag::kStringValue: {
stream << value.string_value();
break;
}
case Value::Tag::kDoubleValue: {
stream << value.double_value();
break;
}
case Value::Tag::kIntValue: {
stream << value.int_value();
break;
}
case Value::Tag::kIndexValue: {
stream << value.index_value();
break;
}
case Value::Tag::kIntBucketDistribution: {
stream << "bucket of size " << value.int_bucket_distribution()->size();
break;
}
}
stream << ",";
}
stream << "]";
return stream.str();
}
bool CobaltObservation::operator<(const CobaltObservation& rhs) const {
if (metric_id_ != rhs.metric_id_) {
return metric_id_ < rhs.metric_id_;
}
if (parts_->size() < rhs.parts_->size()) {
return true;
}
for (uint64_t i = 0; i < parts_->size(); i++) {
if (!CompareObservationValueLess(parts_->at(i), rhs.parts_->at(i))) {
return false;
}
}
return true;
}
bool CobaltObservation::CompareObservationValueLess(
const ObservationValue& observation_value,
const ObservationValue& rhs_observation_value) const {
if (observation_value.encoding_id != observation_value.encoding_id) {
return observation_value.encoding_id < rhs_observation_value.encoding_id;
}
const Value& value = observation_value.value;
const Value& rhs_value = rhs_observation_value.value;
if (value.Which() != rhs_value.Which()) {
return value.Which() < rhs_value.Which();
}
switch (value.Which()) {
case Value::Tag::Invalid:
return false;
case Value::Tag::kDoubleValue:
return value.double_value() < rhs_value.double_value();
case Value::Tag::kIntValue:
return value.int_value() < rhs_value.int_value();
case Value::Tag::kIndexValue:
return value.index_value() < rhs_value.index_value();
case Value::Tag::kStringValue:
return value.string_value() < rhs_value.string_value();
case Value::Tag::kIntBucketDistribution: {
if (value.int_bucket_distribution()->size() ==
rhs_value.int_bucket_distribution()->size()) {
auto i = value.int_bucket_distribution()->begin();
auto j = rhs_value.int_bucket_distribution()->begin();
while (i != value.int_bucket_distribution()->end()) {
if ((*i).index != (*j).index) {
return (*i).index < (*j).index;
}
if ((*i).count != (*j).count) {
return (*i).count < (*j).count;
}
++i;
++j;
}
return false;
}
return value.int_bucket_distribution()->size() <
rhs_value.int_bucket_distribution()->size();
}
}
}
CobaltObservation& CobaltObservation::operator=(const CobaltObservation& rhs) {
if (this != &rhs) {
metric_id_ = rhs.metric_id_;
parts_ = CloneObservationValues(rhs.parts_);
}
return *this;
}
CobaltObservation& CobaltObservation::operator=(CobaltObservation&& rhs) {
if (this != &rhs) {
metric_id_ = rhs.metric_id_;
parts_ = std::move(rhs.parts_);
}
return *this;
}
CobaltContext::CobaltContext(async_t* async,
component::ApplicationContext* app_context,
int32_t project_id)
: async_(async),
app_context_(app_context),
project_id_(project_id) {
ConnectToCobaltApplication();
}
CobaltContext::~CobaltContext() {
if (!observations_in_transit_.empty() || !observations_to_send_.empty()) {
FXL_LOG(WARNING) << "Disconnecting connection to cobalt with observation "
"still pending... Observations will be lost.";
}
}
void CobaltContext::ReportObservation(CobaltObservation observation) {
if (async_ == async_get_default()) {
ReportObservationOnMainThread(std::move(observation));
return;
}
// Hop to the main thread, and go back to the global object dispatcher.
async::PostTask(async_, [observation = std::move(observation), this]() {
::cobalt::ReportObservation(observation, this); });
}
void CobaltContext::ConnectToCobaltApplication() {
auto encoder_factory =
app_context_->ConnectToEnvironmentService<CobaltEncoderFactory>();
encoder_factory->GetEncoder(project_id_, encoder_.NewRequest());
encoder_.set_error_handler([this] { OnConnectionError(); });
SendObservations();
}
void CobaltContext::OnConnectionError() {
FXL_LOG(ERROR) << "Connection to cobalt failed. Reconnecting after a delay.";
observations_to_send_.insert(observations_in_transit_.begin(),
observations_in_transit_.end());
observations_in_transit_.clear();
encoder_.Unbind();
async::PostDelayedTask(async_,
[this] { ConnectToCobaltApplication(); },
zx::nsec(backoff_.GetNext().ToNanoseconds()));
}
void CobaltContext::ReportObservationOnMainThread(
CobaltObservation observation) {
observations_to_send_.insert(observation);
if (!encoder_ || !observations_in_transit_.empty()) {
return;
}
SendObservations();
}
void CobaltContext::SendObservations() {
FXL_DCHECK(observations_in_transit_.empty());
if (observations_to_send_.empty()) {
return;
}
observations_in_transit_ = std::move(observations_to_send_);
observations_to_send_.clear();
auto waiter = callback::CompletionWaiter::Create();
for (auto observation : observations_in_transit_) {
auto callback = waiter->NewCallback();
std::move(observation)
.Report(encoder_, [this, observation,
callback = std::move(callback)](Status status) {
AddObservationCallback(observation, status);
callback();
});
}
waiter->Finalize([this]() {
// No transient errors.
if (observations_in_transit_.empty()) {
backoff_.Reset();
// Send any observation received while |observations_in_transit_| was not
// empty.
SendObservations();
return;
}
// A transient error happened, retry after a delay.
// TODO(miguelfrde): issue if we delete the context while a retry is in
// flight.
async::PostDelayedTask(
async_,
[this]() {
observations_to_send_.insert(observations_in_transit_.begin(),
observations_in_transit_.end());
observations_in_transit_.clear();
SendObservations();
},
zx::nsec(backoff_.GetNext().ToNanoseconds()));
});
}
void CobaltContext::AddObservationCallback(CobaltObservation observation,
cobalt::Status status) {
switch (status) {
case cobalt::Status::INVALID_ARGUMENTS:
case cobalt::Status::FAILED_PRECONDITION:
FXL_DCHECK(false) << "Unexpected status: " << status;
case cobalt::Status::OBSERVATION_TOO_BIG: // fall through
// Log the failure.
FXL_LOG(WARNING)
<< "Cobalt rejected obsevation for metric: "
<< observation.metric_id()
<< " with value: " << observation.ValueRepr()
<< " with status: " << status;
case cobalt::Status::OK: // fall through
// Remove the observation from the set of observations to send.
observations_in_transit_.erase(observation);
break;
case cobalt::Status::INTERNAL_ERROR:
case cobalt::Status::SEND_FAILED:
case cobalt::Status::TEMPORARILY_FULL:
// Keep the observation for re-queueing.
break;
}
}
fxl::AutoCall<fxl::Closure> InitializeCobalt(
async_t* async,
component::ApplicationContext* app_context, int32_t project_id,
CobaltContext** cobalt_context) {
FXL_DCHECK(!*cobalt_context);
auto context = std::make_unique<CobaltContext>(async, app_context,
project_id);
*cobalt_context = context.get();
return fxl::MakeAutoCall<fxl::Closure>(fxl::MakeCopyable(
[context = std::move(context), cobalt_context]() mutable {
context.reset();
*cobalt_context = nullptr;
}));
}
void ReportObservation(CobaltObservation observation,
CobaltContext* cobalt_context) {
if (cobalt_context) {
cobalt_context->ReportObservation(observation);
}
}
} // namespace cobalt