blob: 1ff56c3c683fe6f5fbc20df87e0ae31c372abbc2 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// This file contains a library to be used by users of Cobalt in order to
// collect metrics at a high frequency. The main building blocks are the
// ObservationsCollector, Counter and IntegerSampler classes.
//
// Example: counting and timing function calls
//
// ObservationsCollector collector(send_to_cobalt_function_pointer,
// kDefaultEncodingId);
//
// auto foo_calls = collector.MakeCounter(kFooCallsMetricId,
// kFooCallsMetricPartName);
//
// auto bar_calls = collector.MakeCounter(kBarCallsMetricId,
// kBarCallsMetricPartName);
//
//
// auto foo_call_time_sampler = collector.MakeIntegerSampler(
// kFooCallTimeMetricId, kFooCallTimeMetricPartName, kNumberOfSamples);
//
// // Perform aggregation and send to Cobalt FIDL service every 1 second.
// collector.Start(std::chrono::seconds(1));
//
// void Foo() {
// int64_t start = getCurTime();
// foo_calls.Increment();
// DoSomeFooWork
// ...
// // Logs the amount of time Foo took to execute to the foo_call_sampler
// // which will randomly select kNumberOfSamples observations to be sent to
// // Cobalt.
// foo_call_time_sampler.LogObservation(getCurTime() - start);
// }
//
// void Bar() {
// bar_calls.Increment();
// DoSomeBarWork
// ...
// }
#ifndef COBALT_CLIENT_COLLECTION_OBSERVATIONS_COLLECTOR_H_
#define COBALT_CLIENT_COLLECTION_OBSERVATIONS_COLLECTOR_H_
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "client/collection/observation.h"
namespace cobalt {
namespace client {
// A SendObservationsFn is a callable object that takes a pointer to a vector
// of observations and returns a list of the observation indices for
// observations that failed to be sent. An empty list is returned on success.
// The expectation is that this function will send observations to a consumer
// such as sending observations to the Cobalt FIDL service on Fuchsia.
typedef std::function<std::vector<size_t>(std::vector<Observation>*)>
SendObservationsFn;
// A Counter allows you to keep track of the number of times an event has
// occured. A counter is associated with a metric part.
// Incrementing a counter is thread-safe.
class Counter {
public:
// Increments the counter by 1.
inline void Increment() { counter_++; }
private:
friend class MetricObservers;
// Make a counter with the specified part name.
static std::shared_ptr<Counter> Make(const std::string& part_name,
uint32_t encoding_id) {
return std::shared_ptr<Counter>(new Counter(part_name, encoding_id));
}
explicit Counter(const std::string& part_name, uint32_t encoding_id)
: counter_(0), part_name_(part_name), encoding_id_(encoding_id) {}
// Returns an integer ObservationPart and sets the counter's value to 0.
// If the ObservationPart undo function is called, the counter's value is
// added back on top of the counter.
ObservationPart GetObservationPart();
std::atomic<int64_t> counter_;
std::string part_name_;
uint32_t encoding_id_;
};
// A Sampler has an associated |size| passed as |samples| to the Make*Sampler()
// method on the ObservationsCollector.
// Each collection period, the Sampler will attempt to uniformly sample up to
// |size| of the logged observations. The sampled observations will be
// collected by the ObservationsCollector.
// LogObservation is thread-safe.
template <class T>
class Sampler {
public:
void LogObservation(const T& value) {
uint64_t idx = num_seen_++;
// idx should now be a unique number.
if (idx < size_) {
reservoir_[idx] = value;
num_written_++;
}
// TODO(azani): Handle the case where num_seen_ > size_.
}
private:
friend class ObservationsCollector;
static std::shared_ptr<Sampler<T>> Make(uint32_t metric_id,
const std::string& part_name,
uint32_t encoding_id,
size_t samples) {
return std::shared_ptr<Sampler<T>>(
new Sampler(metric_id, part_name, encoding_id, samples));
}
Sampler(uint32_t metric_id, const std::string& part_name,
uint32_t encoding_id, size_t samples)
: metric_id_(metric_id),
part_name_(part_name),
encoding_id_(encoding_id),
size_(samples),
reservoir_(new std::atomic<T>[size_]),
num_seen_(0),
num_written_(0) {}
ValuePart GetValuePart(size_t idx);
void AppendObservations(std::vector<Observation>* observations) {
for (size_t i = 0; i < num_written_; i++) {
Observation observation;
observation.metric_id = metric_id_;
// TODO(azani): Figure out how to do the undo function.
observation.parts.push_back(
ObservationPart(part_name_, encoding_id_, GetValuePart(i), []() {}));
observations->push_back(observation);
}
num_written_ = 0;
num_seen_ = 0;
}
uint32_t metric_id_;
std::string part_name_;
uint32_t encoding_id_;
// Reservoir size.
size_t size_;
std::unique_ptr<std::atomic<T>[]> reservoir_;
std::atomic<size_t> num_seen_;
// num_written_ is used to determin how many values are available to be read.
std::atomic<size_t> num_written_;
};
using IntegerSampler = Sampler<int64_t>;
// A MetricObservers allows you to group together several observers that
// correspond to metric parts.
class MetricObservers {
public:
// Makes a Counter associated with this metric.
// The part_name specified must be the name of an integer part.
// The encoding_id specified must be the id of an encoding in the cobalt
// config.
std::shared_ptr<Counter> MakeCounter(const std::string& part_name,
uint32_t encoding_id);
private:
friend class ObservationsCollector;
static std::shared_ptr<MetricObservers> Make(uint32_t id);
explicit MetricObservers(uint32_t id) : id_(id) {}
// Gets the Observation.
Observation GetObservation();
// MetricObservers id.
uint32_t id_;
// Map of counters part_name -> Counter.
std::map<std::string, std::shared_ptr<Counter>> counters_;
};
// A ObservationsCollector tracks various metrics, collects their values into
// observations and sends them.
class ObservationsCollector {
public:
// send_observations will be used to send the collected observations.
// default_encoding_id is the encoding id used when no other encoding id
// is used while making Counters or Samplers.
explicit ObservationsCollector(SendObservationsFn send_observations,
uint32_t default_encoding_id)
: send_observations_(send_observations),
default_encoding_id_(default_encoding_id) {}
// Makes a Counter object for the specified metric id, part name and
// encoded using the default encoding id.
std::shared_ptr<Counter> MakeCounter(uint32_t metric_id,
const std::string& part_name);
// Makes a Counter object for the specified metric id, part name and
// encoded using the specified encoding id.
std::shared_ptr<Counter> MakeCounter(uint32_t metric_id,
const std::string& part_name,
uint32_t encoding_id);
// Makes an IntegerSampler for the specified metric id, part name and
// encoded using the specified encoding id. At most, |samples| samples will be
// collected per collection period.
std::shared_ptr<IntegerSampler> MakeIntegerSampler(
uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
size_t samples);
// Makes an IntegerSampler for the specified metric id, part name and
// encoded using the default encoding id. At most, |samples| samples will be
// collected per collection period.
std::shared_ptr<IntegerSampler> MakeIntegerSampler(
uint32_t metric_id, const std::string& part_name, size_t samples);
// Starts a new thread that collects and attempts to send metrics every
// |collection_interval|.
// Calling Start more than once without first calling Stop has undefined
// behavior.
void Start(std::chrono::nanoseconds collection_interval);
// Instructs the collection thread started by Start to stop and joins that
// thread.
void Stop();
// CollectAll attempts to collect observations for all MetricObservers
// created with this collector and send them using |send_observations|.
void CollectAll();
private:
std::shared_ptr<MetricObservers> GetMetricObservers(uint32_t id);
void CollectLoop(std::chrono::nanoseconds collection_interval);
// Map of metric id -> MetricObservers.
std::map<uint32_t, std::shared_ptr<MetricObservers>> metrics_;
std::vector<std::function<void(std::vector<Observation>*)>>
reservoir_samplers_;
// Thread on which the collection loop is run.
std::thread collection_loop_;
// Set to false to stop collection.
bool collection_loop_continue_;
// Call this function to send observations.
SendObservationsFn send_observations_;
// The encoding id to be used when none is specified.
uint32_t default_encoding_id_;
};
} // namespace client
} // namespace cobalt
#endif // COBALT_CLIENT_COLLECTION_OBSERVATIONS_COLLECTOR_H_