blob: b8db5d70652782de50404b9bbaaea9cdd665f1d4 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "client/collection/observations_collector.h"
namespace cobalt {
namespace client {
ObservationPart Counter::GetObservationPart() {
// Atomically swaps the value in counter_ for 0 and puts the former value of
// counter_ in value.
ValuePart value = ValuePart::MakeInt(counter_.exchange(0));
// If the undo function is called, it adds |value| back to the counter_.
return ObservationPart(part_name_, encoding_id_, value,
[this, value]() { counter_ += value.GetIntValue(); });
}
std::shared_ptr<Counter> MetricObservers::MakeCounter(
const std::string& part_name, uint32_t encoding_id) {
if (counters_.count(part_name) != 0) {
return nullptr;
}
auto counter = std::shared_ptr<Counter>(new Counter(part_name, encoding_id));
counters_[part_name] = counter;
return counter;
}
Observation MetricObservers::GetObservation() {
Observation observation;
observation.metric_id = id_;
for (auto iter = counters_.begin(); iter != counters_.end(); iter++) {
observation.parts.push_back(iter->second->GetObservationPart());
}
return observation;
}
Observation EventLogger::GetEventObservation(double average_time) {
Observation observation;
observation.metric_id = event_metric_id_;
auto now = std::chrono::steady_clock::now();
auto collection_duration =
std::chrono::duration_cast<std::chrono::nanoseconds>(now - start_time_);
start_time_ = now;
std::map<uint32_t, int64_t> status_histogram;
int64_t total = 0;
for (size_t status = 0; status <= max_status_; status++) {
int64_t status_num = status_histogram_[status].exchange(0);
status_histogram[status] = status_num;
total += status_num;
}
observation.parts.push_back(
ObservationPart("status", encoding_id_,
ValuePart::MakeDistribution(status_histogram), []() {}));
observation.parts.push_back(ObservationPart(
"total", encoding_id_, ValuePart::MakeInt(total), []() {}));
observation.parts.push_back(ObservationPart(
"collection_duration_ns", encoding_id_,
ValuePart::MakeInt(collection_duration.count()), []() {}));
observation.parts.push_back(
ObservationPart("average_time", encoding_id_,
ValuePart::MakeDouble(average_time), []() {}));
return observation;
}
void EventLogger::AppendObservations(std::vector<Observation>* observations) {
std::vector<Observation> sampler_observations;
timing_sampler_->AppendObservations(&sampler_observations);
int64_t total = 0;
for (auto iter = sampler_observations.begin();
sampler_observations.end() != iter; iter++) {
total += iter->parts[0].value.GetIntValue();
}
observations->insert(observations->end(), sampler_observations.begin(),
sampler_observations.end());
observations->push_back(
GetEventObservation(static_cast<double>(total) /
static_cast<double>(sampler_observations.size())));
}
template <>
ValuePart Sampler<int64_t>::GetValuePart(size_t idx) {
return ValuePart::MakeInt(reservoir_[idx]);
}
std::shared_ptr<Counter> ObservationsCollector::MakeCounter(
uint32_t metric_id, const std::string& part_name, uint32_t encoding_id) {
return GetMetricObservers(metric_id)->MakeCounter(part_name, encoding_id);
}
std::shared_ptr<Counter> ObservationsCollector::MakeCounter(
uint32_t metric_id, const std::string& part_name) {
return MakeCounter(metric_id, part_name, default_encoding_id_);
}
std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
size_t samples) {
auto reservoir_sampler = std::shared_ptr<Sampler<int64_t>>(
new Sampler<int64_t>(metric_id, part_name, encoding_id, samples));
reservoir_samplers_.push_back(
[reservoir_sampler](std::vector<Observation>* observations) {
reservoir_sampler->AppendObservations(observations);
});
return reservoir_sampler;
}
std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
uint32_t metric_id, const std::string& part_name, size_t samples) {
return MakeIntegerSampler(metric_id, part_name, default_encoding_id_,
samples);
}
std::shared_ptr<EventLogger> ObservationsCollector::MakeEventLogger(
uint32_t event_metric_id, uint32_t max_status,
uint32_t event_timing_metric_id, size_t samples) {
auto event_logger = std::shared_ptr<EventLogger>(
new EventLogger(event_metric_id, max_status, event_timing_metric_id,
default_encoding_id_, samples));
event_loggers_.push_back(event_logger);
return event_logger;
}
std::shared_ptr<MetricObservers> ObservationsCollector::GetMetricObservers(
uint32_t metric_id) {
if (metrics_.count(metric_id) == 0) {
metrics_[metric_id] =
std::shared_ptr<MetricObservers>(new MetricObservers(metric_id));
}
return metrics_[metric_id];
}
void ObservationsCollector::Start(
std::chrono::nanoseconds collection_interval) {
collection_loop_continue_ = true;
collection_loop_ = std::thread(&ObservationsCollector::CollectLoop, this,
collection_interval);
}
void ObservationsCollector::Stop() {
collection_loop_continue_ = false;
collection_loop_.join();
}
void ObservationsCollector::CollectAll() {
std::vector<Observation> observations;
for (auto iter = metrics_.begin(); iter != metrics_.end(); iter++) {
observations.push_back(iter->second->GetObservation());
}
for (auto iter = reservoir_samplers_.begin();
iter != reservoir_samplers_.end(); iter++) {
(*iter)(&observations);
}
for (auto iter = event_loggers_.begin(); event_loggers_.end() != iter;
iter++) {
(*iter)->AppendObservations(&observations);
}
auto errors = send_observations_(&observations);
// Undo failed observations.
for (auto iter = errors.begin(); iter != errors.end(); iter++) {
for (auto parts_iter = observations[*iter].parts.begin();
parts_iter != observations[*iter].parts.end(); parts_iter++) {
parts_iter->undo();
}
}
}
void ObservationsCollector::CollectLoop(
std::chrono::nanoseconds collection_interval) {
while (collection_loop_continue_) {
CollectAll();
// TODO(azani): Add jitter.
std::this_thread::sleep_for(collection_interval);
}
// Collect one more time after being told to stop.
CollectAll();
}
} // namespace client
} // namespace cobalt