| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/observation_store/memory_observation_store.h" |
| |
| #include <utility> |
| |
| #include "src/logger/logger_interface.h" |
| #include "src/logging.h" |
| #include "src/public/lib/registry_identifiers.h" |
| |
| namespace cobalt::observation_store { |
| |
| namespace { |
| |
| constexpr float kSendThresholdPercent = 0.6; |
| |
| } |
| |
| MemoryObservationStore::MemoryObservationStore(size_t max_bytes_per_observation, |
| size_t max_bytes_per_envelope, |
| size_t max_bytes_total, |
| logger::InternalMetrics* internal_metrics) |
| : ObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total), |
| envelope_send_threshold_size_( |
| std::lround(kSendThresholdPercent * static_cast<float>(max_bytes_per_envelope_))), |
| current_envelope_(new EnvelopeMaker(max_bytes_per_observation, max_bytes_per_envelope)) { |
| ResetInternalMetrics(internal_metrics); |
| } |
| |
| Status MemoryObservationStore::StoreObservation(std::unique_ptr<StoredObservation> observation, |
| std::unique_ptr<ObservationMetadata> metadata) { |
| if (IsDisabled()) { |
| return Status::OkStatus(); |
| } |
| |
| std::unique_lock<std::mutex> lock(envelope_mutex_); |
| |
| internal_metrics_->BytesStored( |
| logger::PerProjectBytesStoredMigratedMetricDimensionStatus::Attempted, SizeLocked(), |
| lib::ProjectIdentifier(lib::CustomerIdentifier(metadata->customer_id()), |
| metadata->project_id())); |
| |
| if (SizeLocked() > max_bytes_total_) { |
| return Status(StatusCode::RESOURCE_EXHAUSTED, |
| "MemoryObservationStore::StoreObservation(): Rejecting observation because " |
| "the store is full. (" + |
| std::to_string(SizeLocked()) + " > " + std::to_string(max_bytes_total_) + |
| ")"); |
| } |
| |
| Status status_peek = current_envelope_->CanAddObservation(*observation); |
| |
| if (status_peek.error_code() == StatusCode::RESOURCE_EXHAUSTED) { |
| VLOG(4) << "MemoryObservationStore::StoreObservation(): Current " |
| "envelope would return kStoreFull. Swapping it out for " |
| "a new EnvelopeMaker"; |
| AddEnvelopeToSend(std::move(current_envelope_)); |
| current_envelope_ = NewEnvelopeMaker(); |
| } |
| |
| uint32_t customer_id = metadata->customer_id(); |
| uint32_t project_id = metadata->project_id(); |
| lib::ReportIdentifier report_id(*metadata); |
| |
| Status status = current_envelope_->StoreObservation(std::move(observation), std::move(metadata)); |
| if (status.ok()) { |
| num_obs_per_report_[report_id]++; |
| internal_metrics_->BytesStored( |
| logger::PerProjectBytesStoredMigratedMetricDimensionStatus::Succeeded, SizeLocked(), |
| lib::ProjectIdentifier(lib::CustomerIdentifier(customer_id), project_id)); |
| } |
| return status; |
| } |
| |
| std::unique_ptr<EnvelopeMaker> MemoryObservationStore::NewEnvelopeMaker() { |
| return std::make_unique<EnvelopeMaker>(max_bytes_per_observation_, max_bytes_per_envelope_); |
| } |
| |
| std::unique_ptr<ObservationStore::EnvelopeHolder> |
| MemoryObservationStore::TakeOldestEnvelopeHolderLocked() { |
| auto retval = std::move(finalized_envelopes_.front()); |
| finalized_envelopes_.pop_front(); |
| if (retval->Size() > finalized_envelopes_size_) { |
| finalized_envelopes_size_ = 0; |
| } else { |
| finalized_envelopes_size_ -= retval->Size(); |
| } |
| return retval; |
| } |
| |
| void MemoryObservationStore::AddEnvelopeToSend(std::unique_ptr<EnvelopeHolder> holder, bool back) { |
| finalized_envelopes_size_ += holder->Size(); |
| if (back) { |
| finalized_envelopes_.push_back(std::move(holder)); |
| } else { |
| finalized_envelopes_.push_front(std::move(holder)); |
| } |
| } |
| |
| std::unique_ptr<ObservationStore::EnvelopeHolder> MemoryObservationStore::TakeNextEnvelopeHolder() { |
| std::unique_lock<std::mutex> lock(envelope_mutex_); |
| |
| auto retval = NewEnvelopeMaker(); |
| size_t retval_size = 0; |
| while (!finalized_envelopes_.empty() && |
| (retval_size == 0 || |
| (retval_size + finalized_envelopes_.front()->Size() <= max_bytes_per_envelope_))) { |
| retval->MergeWith(TakeOldestEnvelopeHolderLocked()); |
| retval_size = retval->Size(); |
| } |
| |
| if (retval_size + current_envelope_->Size() <= max_bytes_per_envelope_) { |
| retval->MergeWith(std::move(current_envelope_)); |
| current_envelope_ = NewEnvelopeMaker(); |
| } |
| |
| if (retval->Size() == 0) { |
| return nullptr; |
| } |
| |
| return retval; |
| } |
| |
| void MemoryObservationStore::ReturnEnvelopeHolder( |
| std::unique_ptr<ObservationStore::EnvelopeHolder> envelope) { |
| std::unique_lock<std::mutex> lock(envelope_mutex_); |
| AddEnvelopeToSend(std::move(envelope)); |
| } |
| |
| size_t MemoryObservationStore::SizeLocked() const { |
| return current_envelope_->Size() + finalized_envelopes_size_; |
| } |
| |
| size_t MemoryObservationStore::Size() const { |
| std::unique_lock<std::mutex> lock(envelope_mutex_); |
| return SizeLocked(); |
| } |
| |
| bool MemoryObservationStore::Empty() const { |
| std::unique_lock<std::mutex> lock(envelope_mutex_); |
| return current_envelope_->Empty() && finalized_envelopes_.empty(); |
| } |
| |
| void MemoryObservationStore::DeleteData() { |
| VLOG(4) << "MemoryObservationStore: Deleting stored data"; |
| |
| std::unique_lock<std::mutex> lock(envelope_mutex_); |
| current_envelope_ = NewEnvelopeMaker(); |
| finalized_envelopes_.clear(); |
| finalized_envelopes_size_ = 0; |
| } |
| |
| } // namespace cobalt::observation_store |