blob: 63a9c69bc76260575e42efce8acf4a42b853ae98 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "client/collection/observations_collector.h"
namespace cobalt {
namespace client {
ObservationPart Counter::GetObservationPart() {
// Atomically swaps the value in counter_ for 0 and puts the former value of
// counter_ in value.
ValuePart value = ValuePart::MakeIntValuePart(counter_.exchange(0));
// If the undo function is called, it adds |value| back to the counter_.
return ObservationPart(part_name_, encoding_id_, value,
[this, value]() { counter_ += value.GetIntValue(); });
}
std::shared_ptr<MetricObservers> MetricObservers::Make(uint32_t id) {
// An empty string for the collection period part name disables the collection
// timer.
return std::shared_ptr<MetricObservers>(new MetricObservers(id));
}
std::shared_ptr<Counter> MetricObservers::MakeCounter(
const std::string& part_name, uint32_t encoding_id) {
if (counters_.count(part_name) != 0) {
return nullptr;
}
auto counter = Counter::Make(part_name, encoding_id);
counters_[part_name] = counter;
return counter;
}
Observation MetricObservers::GetObservation() {
Observation observation;
observation.metric_id = id_;
for (auto iter = counters_.begin(); iter != counters_.end(); iter++) {
observation.parts.push_back(iter->second->GetObservationPart());
}
return observation;
}
template <>
ValuePart Sampler<int64_t>::GetValuePart(size_t idx) {
return ValuePart::MakeIntValuePart(reservoir_[idx]);
}
std::shared_ptr<Counter> ObservationsCollector::MakeCounter(
uint32_t metric_id, const std::string& part_name, uint32_t encoding_id) {
return GetMetricObservers(metric_id)->MakeCounter(part_name, encoding_id);
}
std::shared_ptr<Counter> ObservationsCollector::MakeCounter(
uint32_t metric_id, const std::string& part_name) {
return MakeCounter(metric_id, part_name, default_encoding_id_);
}
std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
size_t samples) {
auto reservoir_sampler =
Sampler<int64_t>::Make(metric_id, part_name, encoding_id, samples);
reservoir_samplers_.push_back(
[reservoir_sampler](std::vector<Observation>* observations) {
reservoir_sampler->AppendObservations(observations);
});
return reservoir_sampler;
}
std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
uint32_t metric_id, const std::string& part_name, size_t samples) {
return MakeIntegerSampler(metric_id, part_name, default_encoding_id_,
samples);
}
std::shared_ptr<MetricObservers> ObservationsCollector::GetMetricObservers(
uint32_t metric_id) {
if (metrics_.count(metric_id) == 0) {
metrics_[metric_id] = MetricObservers::Make(metric_id);
}
return metrics_[metric_id];
}
void ObservationsCollector::Start(
std::chrono::nanoseconds collection_interval) {
collection_loop_continue_ = true;
collection_loop_ = std::thread(&ObservationsCollector::CollectLoop, this,
collection_interval);
}
void ObservationsCollector::Stop() {
collection_loop_continue_ = false;
collection_loop_.join();
}
void ObservationsCollector::CollectAll() {
std::vector<Observation> observations;
for (auto iter = metrics_.begin(); iter != metrics_.end(); iter++) {
observations.push_back(iter->second->GetObservation());
}
for (auto iter = reservoir_samplers_.begin();
iter != reservoir_samplers_.end(); iter++) {
(*iter)(&observations);
}
auto errors = send_observations_(&observations);
// Undo failed observations.
for (auto iter = errors.begin(); iter != errors.end(); iter++) {
for (auto parts_iter = observations[*iter].parts.begin();
parts_iter != observations[*iter].parts.end(); parts_iter++) {
parts_iter->undo();
}
}
}
void ObservationsCollector::CollectLoop(
std::chrono::nanoseconds collection_interval) {
while (collection_loop_continue_) {
CollectAll();
// TODO(azani): Add jitter.
std::this_thread::sleep_for(collection_interval);
}
}
} // namespace client
} // namespace cobalt