blob: 6ff2cbb4f1b64ac1a8df22df04d4a3cadf4883e0 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/observation_store/memory_observation_store.h"
#include <utility>
#include "src/logger/logger_interface.h"
#include "src/logging.h"
#include "src/public/lib/registry_identifiers.h"
namespace cobalt::observation_store {
namespace {

// Scale factor applied to the per-envelope byte limit when the constructor
// computes envelope_send_threshold_size_. Despite the "Percent" in its name,
// the value is a fraction (0.6), and it is multiplied directly with the limit.
constexpr float kSendThresholdPercent = 0.6F;

}  // namespace
// Constructs an in-memory observation store.
//
// The three byte limits are forwarded unchanged to the ObservationStore base.
// The send threshold is derived from the per-envelope limit scaled by
// kSendThresholdPercent and rounded with std::lround. The store starts out
// with a fresh EnvelopeMaker as its current envelope, and |internal_metrics|
// is handed to ResetInternalMetrics().
MemoryObservationStore::MemoryObservationStore(size_t max_bytes_per_observation,
                                               size_t max_bytes_per_envelope,
                                               size_t max_bytes_total,
                                               logger::InternalMetrics* internal_metrics)
    : ObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total),
      envelope_send_threshold_size_(
          std::lround(kSendThresholdPercent * static_cast<float>(max_bytes_per_envelope_))),
      current_envelope_(
          std::make_unique<EnvelopeMaker>(max_bytes_per_observation, max_bytes_per_envelope)) {
  ResetInternalMetrics(internal_metrics);
}
// Attempts to add |observation| (described by |metadata|) to the current
// envelope.
//
// Returns:
//   - OK without storing anything when the store is disabled.
//   - RESOURCE_EXHAUSTED when the store's current size already exceeds
//     max_bytes_total_.
//   - Otherwise, whatever the current envelope's StoreObservation() returns.
//
// Byte-count metrics are reported once for the attempt and once more when the
// observation was stored successfully.
Status MemoryObservationStore::StoreObservation(std::unique_ptr<StoredObservation> observation,
                                                std::unique_ptr<ObservationMetadata> metadata) {
  if (IsDisabled()) {
    return Status::OkStatus();
  }

  std::unique_lock<std::mutex> lock(envelope_mutex_);

  // Read the ids now: |metadata| is moved into the envelope further down.
  const uint32_t customer_id = metadata->customer_id();
  const uint32_t project_id = metadata->project_id();

  internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Attempted,
                                 SizeLocked(), customer_id, project_id);

  const size_t bytes_in_store = SizeLocked();
  if (bytes_in_store > max_bytes_total_) {
    return Status(StatusCode::RESOURCE_EXHAUSTED,
                  "MemoryObservationStore::StoreObservation(): Rejecting observation because "
                  "the store is full. (" +
                      std::to_string(bytes_in_store) + " > " + std::to_string(max_bytes_total_) +
                      ")");
  }

  // If the current envelope reports it has no room for this observation,
  // queue it for sending and continue with an empty one.
  const Status can_add = current_envelope_->CanAddObservation(*observation);
  const bool envelope_exhausted = (can_add.error_code() == StatusCode::RESOURCE_EXHAUSTED);
  if (envelope_exhausted) {
    VLOG(4) << "MemoryObservationStore::StoreObservation(): Current "
               "envelope would return kStoreFull. Swapping it out for "
               "a new EnvelopeMaker";
    AddEnvelopeToSend(std::move(current_envelope_));
    current_envelope_ = NewEnvelopeMaker();
  }

  // The report identifier must be built before |metadata| is moved away.
  lib::ReportIdentifier report_id(*metadata);
  Status store_status =
      current_envelope_->StoreObservation(std::move(observation), std::move(metadata));
  if (!store_status.ok()) {
    return store_status;
  }

  num_obs_per_report_[report_id]++;
  internal_metrics_->BytesStored(logger::PerProjectBytesStoredMetricDimensionStatus::Succeeded,
                                 SizeLocked(), customer_id, project_id);
  return store_status;
}
// Creates an empty EnvelopeMaker configured with this store's per-observation
// and per-envelope byte limits.
std::unique_ptr<EnvelopeMaker> MemoryObservationStore::NewEnvelopeMaker() {
  auto fresh_maker =
      std::make_unique<EnvelopeMaker>(max_bytes_per_observation_, max_bytes_per_envelope_);
  return fresh_maker;
}
// Removes the envelope at the front of finalized_envelopes_ and returns it,
// reducing finalized_envelopes_size_ by that envelope's size.
//
// The queue must be non-empty when this is called. The only caller visible in
// this file invokes it while holding envelope_mutex_.
std::unique_ptr<ObservationStore::EnvelopeHolder>
MemoryObservationStore::TakeOldestEnvelopeHolderLocked() {
  auto oldest = std::move(finalized_envelopes_.front());
  finalized_envelopes_.pop_front();

  // Saturate at zero rather than letting the unsigned running total wrap.
  const auto removed_bytes = oldest->Size();
  finalized_envelopes_size_ = (removed_bytes > finalized_envelopes_size_)
                                  ? 0
                                  : finalized_envelopes_size_ - removed_bytes;
  return oldest;
}
// Queues |holder| among the finalized envelopes and adds its size to
// finalized_envelopes_size_. When |back| is true the envelope goes to the back
// of the queue; otherwise it goes to the front.
void MemoryObservationStore::AddEnvelopeToSend(std::unique_ptr<EnvelopeHolder> holder, bool back) {
  // Account for the bytes first: |holder| is moved from below.
  finalized_envelopes_size_ += holder->Size();

  if (!back) {
    finalized_envelopes_.push_front(std::move(holder));
    return;
  }
  finalized_envelopes_.push_back(std::move(holder));
}
// Builds the next envelope to hand out by merging stored envelopes together.
//
// Finalized envelopes are merged oldest-first. The first one is always taken;
// later ones are taken only while the merged size stays within
// max_bytes_per_envelope_. The current envelope is merged in as well (and
// replaced by a fresh one) if it also fits. Returns nullptr when the merged
// result has size zero.
std::unique_ptr<ObservationStore::EnvelopeHolder> MemoryObservationStore::TakeNextEnvelopeHolder() {
  std::unique_lock<std::mutex> lock(envelope_mutex_);

  auto merged = NewEnvelopeMaker();
  size_t merged_size = 0;

  while (!finalized_envelopes_.empty()) {
    // Once something has been merged, stop as soon as the next envelope would
    // push the result over the per-envelope limit.
    if (merged_size != 0 &&
        merged_size + finalized_envelopes_.front()->Size() > max_bytes_per_envelope_) {
      break;
    }
    merged->MergeWith(TakeOldestEnvelopeHolderLocked());
    merged_size = merged->Size();
  }

  const bool current_fits = (merged_size + current_envelope_->Size() <= max_bytes_per_envelope_);
  if (current_fits) {
    merged->MergeWith(std::move(current_envelope_));
    current_envelope_ = NewEnvelopeMaker();
  }

  if (merged->Size() == 0) {
    return nullptr;
  }
  return merged;
}
// Puts |envelope| back into the queue of finalized envelopes. Acquires
// envelope_mutex_ and delegates to AddEnvelopeToSend() using that function's
// default queue position.
void MemoryObservationStore::ReturnEnvelopeHolder(
    std::unique_ptr<ObservationStore::EnvelopeHolder> envelope) {
  std::unique_lock<std::mutex> guard(envelope_mutex_);
  AddEnvelopeToSend(std::move(envelope));
}
// Returns the combined size of the current envelope and all finalized
// envelopes. Takes no lock itself; the callers visible in this file hold
// envelope_mutex_ when invoking it.
size_t MemoryObservationStore::SizeLocked() const {
  const size_t current_bytes = current_envelope_->Size();
  return current_bytes + finalized_envelopes_size_;
}
// Returns the store's total size as computed by SizeLocked(), acquiring
// envelope_mutex_ for the duration of the read.
size_t MemoryObservationStore::Size() const {
  std::unique_lock<std::mutex> guard(envelope_mutex_);
  const size_t total_bytes = SizeLocked();
  return total_bytes;
}
// Returns true only when the current envelope reports Empty() and there are no
// finalized envelopes queued. Acquires envelope_mutex_ while checking.
bool MemoryObservationStore::Empty() const {
  std::unique_lock<std::mutex> guard(envelope_mutex_);
  if (!current_envelope_->Empty()) {
    return false;
  }
  return finalized_envelopes_.empty();
}
// Discards everything held by the store: the finalized-envelope queue is
// cleared, its byte counter is zeroed, and the current envelope is replaced by
// a fresh one. Logs at verbosity 4 before taking envelope_mutex_.
void MemoryObservationStore::DeleteData() {
  VLOG(4) << "MemoryObservationStore: Deleting stored data";

  std::unique_lock<std::mutex> guard(envelope_mutex_);
  finalized_envelopes_.clear();
  finalized_envelopes_size_ = 0;
  current_envelope_ = NewEnvelopeMaker();
}
} // namespace cobalt::observation_store