blob: 00b2adc9eca99ba5cfd4a769881aab47ce40d5d8 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/observation_store/memory_observation_store.h"
#include <utility>
#include "src/logger/logger_interface.h"
#include "src/logging.h"
namespace cobalt::observation_store {
namespace {
constexpr float kSendThresholdPercent = 0.6;
}
MemoryObservationStore::MemoryObservationStore(size_t max_bytes_per_observation,
size_t max_bytes_per_envelope,
size_t max_bytes_total,
logger::InternalMetrics* internal_metrics)
: ObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total),
envelope_send_threshold_size_(size_t(kSendThresholdPercent * max_bytes_per_envelope_)),
current_envelope_(new EnvelopeMaker(max_bytes_per_observation, max_bytes_per_envelope)),
finalized_envelopes_size_(0) {
ResetInternalMetrics(internal_metrics);
}
ObservationStore::StoreStatus MemoryObservationStore::StoreObservation(
std::unique_ptr<StoredObservation> observation, std::unique_ptr<ObservationMetadata> metadata) {
if (IsDisabled()) {
return kOk;
}
std::unique_lock<std::mutex> lock(envelope_mutex_);
internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Attempted,
SizeLocked(), metadata->customer_id(), metadata->project_id());
if (SizeLocked() > max_bytes_total_) {
VLOG(4) << "MemoryObservationStore::StoreObservation(): Rejecting "
"observation because the store is full. ("
<< SizeLocked() << " > " << max_bytes_total_ << ")";
return kStoreFull;
}
ObservationStore::StoreStatus status_peek = current_envelope_->CanAddObservation(*observation);
if (status_peek == kStoreFull) {
VLOG(4) << "MemoryObservationStore::StoreObservation(): Current "
"envelope would return kStoreFull. Swapping it out for "
"a new EnvelopeMaker";
AddEnvelopeToSend(std::move(current_envelope_));
current_envelope_ = NewEnvelopeMaker();
}
uint32_t customer_id = metadata->customer_id();
uint32_t project_id = metadata->project_id();
uint32_t metric_id = metadata->metric_id();
uint32_t report_id = metadata->report_id();
ObservationStore::StoreStatus status =
current_envelope_->StoreObservation(std::move(observation), std::move(metadata));
if (status == kOk) {
num_obs_per_report_[{.customer_id = customer_id,
.project_id = project_id,
.metric_id = metric_id,
.report_id = report_id}]++;
internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Succeeded,
SizeLocked(), customer_id, project_id);
}
return status;
}
std::unique_ptr<EnvelopeMaker> MemoryObservationStore::NewEnvelopeMaker() {
return std::make_unique<EnvelopeMaker>(max_bytes_per_observation_, max_bytes_per_envelope_);
}
std::unique_ptr<ObservationStore::EnvelopeHolder>
MemoryObservationStore::TakeOldestEnvelopeHolderLocked() {
auto retval = std::move(finalized_envelopes_.front());
finalized_envelopes_.pop_front();
if (retval->Size() > finalized_envelopes_size_) {
finalized_envelopes_size_ = 0;
} else {
finalized_envelopes_size_ -= retval->Size();
}
return retval;
}
void MemoryObservationStore::AddEnvelopeToSend(std::unique_ptr<EnvelopeHolder> holder, bool back) {
finalized_envelopes_size_ += holder->Size();
if (back) {
finalized_envelopes_.push_back(std::move(holder));
} else {
finalized_envelopes_.push_front(std::move(holder));
}
}
std::unique_ptr<ObservationStore::EnvelopeHolder> MemoryObservationStore::TakeNextEnvelopeHolder() {
std::unique_lock<std::mutex> lock(envelope_mutex_);
auto retval = NewEnvelopeMaker();
size_t retval_size = 0;
while (!finalized_envelopes_.empty() &&
(retval_size == 0 ||
(retval_size + finalized_envelopes_.front()->Size() <= max_bytes_per_envelope_))) {
retval->MergeWith(TakeOldestEnvelopeHolderLocked());
retval_size = retval->Size();
}
if (retval_size + current_envelope_->Size() <= max_bytes_per_envelope_) {
retval->MergeWith(std::move(current_envelope_));
current_envelope_ = NewEnvelopeMaker();
}
if (retval->Size() == 0) {
return nullptr;
}
return retval;
}
void MemoryObservationStore::ReturnEnvelopeHolder(
std::unique_ptr<ObservationStore::EnvelopeHolder> envelope) {
std::unique_lock<std::mutex> lock(envelope_mutex_);
AddEnvelopeToSend(std::move(envelope));
}
size_t MemoryObservationStore::SizeLocked() const {
return current_envelope_->Size() + finalized_envelopes_size_;
}
size_t MemoryObservationStore::Size() const {
std::unique_lock<std::mutex> lock(envelope_mutex_);
return SizeLocked();
}
bool MemoryObservationStore::Empty() const {
std::unique_lock<std::mutex> lock(envelope_mutex_);
return current_envelope_->Empty() && finalized_envelopes_.empty();
}
void MemoryObservationStore::DeleteData() {
VLOG(4) << "MemoryObservationStore: Deleting stored data";
std::unique_lock<std::mutex> lock(envelope_mutex_);
current_envelope_ = NewEnvelopeMaker();
finalized_envelopes_.clear();
finalized_envelopes_size_ = 0;
}
} // namespace cobalt::observation_store