Basic reservoir sampler takes the first N samples.
This is just to try out the API and atomics.
Change-Id: I200945dcd06f88a38298f041a1af45bd3349782d
diff --git a/client/collection/observations_collector.cc b/client/collection/observations_collector.cc
index 1caa037..63a9c69 100644
--- a/client/collection/observations_collector.cc
+++ b/client/collection/observations_collector.cc
@@ -43,6 +43,11 @@
return observation;
}
+// Sampler<int64_t> keeps its values in std::atomic<int64_t> slots; this reads
+// reservoir slot |idx| and wraps the value in an integer ValuePart.
+template <>
+ValuePart Sampler<int64_t>::GetValuePart(size_t idx) {
+  const int64_t sampled_value = reservoir_[idx].load();
+  return ValuePart::MakeIntValuePart(sampled_value);
+}
+
std::shared_ptr<Counter> ObservationsCollector::MakeCounter(
uint32_t metric_id, const std::string& part_name, uint32_t encoding_id) {
return GetMetricObservers(metric_id)->MakeCounter(part_name, encoding_id);
@@ -53,6 +58,24 @@
return MakeCounter(metric_id, part_name, default_encoding_id_);
}
+// Creates an IntegerSampler and registers it with this collector so that its
+// kept values are appended to the outgoing observations on every collection.
+std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
+    uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
+    size_t samples) {
+  std::shared_ptr<IntegerSampler> sampler =
+      IntegerSampler::Make(metric_id, part_name, encoding_id, samples);
+  // The closure holds its own reference, so the collector keeps the sampler
+  // alive for as long as the closure is registered.
+  auto drain = [sampler](std::vector<Observation>* observations) {
+    sampler->AppendObservations(observations);
+  };
+  reservoir_samplers_.push_back(drain);
+  return sampler;
+}
+
+// Delegates to the four-argument MakeIntegerSampler, using the collector's
+// default encoding id.
+std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
+    uint32_t metric_id, const std::string& part_name, size_t samples) {
+  return MakeIntegerSampler(metric_id, part_name,
+                            /*encoding_id=*/default_encoding_id_, samples);
+}
+
std::shared_ptr<MetricObservers> ObservationsCollector::GetMetricObservers(
uint32_t metric_id) {
if (metrics_.count(metric_id) == 0) {
@@ -78,6 +101,11 @@
for (auto iter = metrics_.begin(); iter != metrics_.end(); iter++) {
observations.push_back(iter->second->GetObservation());
}
+
+ for (auto iter = reservoir_samplers_.begin();
+ iter != reservoir_samplers_.end(); iter++) {
+ (*iter)(&observations);
+ }
auto errors = send_observations_(&observations);
// Undo failed observations.
diff --git a/client/collection/observations_collector.h b/client/collection/observations_collector.h
index 5aea1c1..1ff56c3 100644
--- a/client/collection/observations_collector.h
+++ b/client/collection/observations_collector.h
@@ -4,23 +4,35 @@
// This file contains a library to be used by users of Cobalt in order to
// collect metrics at a high frequency. The main building blocks are the
-// ObservationsCollector and Counter classes.
+// ObservationsCollector, Counter and IntegerSampler classes.
//
-// Example: counting function calls
+// Example: counting and timing function calls
//
// ObservationsCollector collector(send_to_cobalt_function_pointer,
// kDefaultEncodingId);
//
-// auto foo_calls = collector.MakeCounter("foo_calls");
-// auto foo_calls = collector.MakeCounter("bar_calls");
+// auto foo_calls = collector.MakeCounter(kFooCallsMetricId,
+// kFooCallsMetricPartName);
+//
+// auto bar_calls = collector.MakeCounter(kBarCallsMetricId,
+// kBarCallsMetricPartName);
+//
+//
+// auto foo_call_time_sampler = collector.MakeIntegerSampler(
+// kFooCallTimeMetricId, kFooCallTimeMetricPartName, kNumberOfSamples);
//
// // Perform aggregation and send to Cobalt FIDL service every 1 second.
// collector.Start(std::chrono::seconds(1));
//
// void Foo() {
+// int64_t start = getCurTime();
// foo_calls.Increment();
// DoSomeFooWork
// ...
+// // Logs the amount of time Foo took to execute to foo_call_time_sampler,
+// // which samples up to kNumberOfSamples of the logged values per collection
+// // period to be sent to Cobalt.
+// foo_call_time_sampler->LogObservation(getCurTime() - start);
// }
//
// void Bar() {
@@ -49,15 +61,17 @@
namespace cobalt {
namespace client {
-// An SendObservationsFn is a callable object that takes a pointer to a vector
+// A SendObservationsFn is a callable object that takes a pointer to a vector
// of observations and returns a list of the observation indices for
-// observations that failed to be sent.
+// observations that failed to be sent. An empty list is returned on success.
+// This function is expected to send the observations to a consumer, such as
+// the Cobalt FIDL service on Fuchsia.
typedef std::function<std::vector<size_t>(std::vector<Observation>*)>
SendObservationsFn;
// A Counter allows you to keep track of the number of times an event has
-// occured. Every counter has an associated metric part.
-// A Counter can be incremented from an arbitrary number of threads.
+// occurred. A counter is associated with a metric part.
+// Incrementing a counter is thread-safe.
class Counter {
public:
// Increments the counter by 1.
@@ -85,12 +99,82 @@
uint32_t encoding_id_;
};
+// A Sampler keeps up to |size| of the values logged during each collection
+// period, where |size| is the |samples| argument passed to the Make*Sampler()
+// method on the ObservationsCollector. At each collection the kept values are
+// turned into observations for the ObservationsCollector and a new, empty
+// period begins.
+// NOTE: this basic version keeps the FIRST |size| values logged in a period;
+// later values are dropped. Uniform (reservoir) sampling is still a TODO.
+// LogObservation is thread-safe. Values logged while a collection is in
+// progress are dropped.
+template <class T>
+class Sampler {
+ public:
+  // Records |value| if the reservoir for the current period is not yet full.
+  void LogObservation(const T& value) {
+    // The atomic fetch-and-increment hands every caller a unique index.
+    const size_t idx = num_seen_++;
+    if (idx < size_) {
+      reservoir_[idx] = value;
+      // Publish the slot only after its value is stored; AppendObservations
+      // waits for this count before reading any slot.
+      num_written_++;
+    }
+
+    // TODO(azani): Handle the case where num_seen_ > size_.
+  }
+
+ private:
+  friend class ObservationsCollector;
+
+  static std::shared_ptr<Sampler<T>> Make(uint32_t metric_id,
+                                          const std::string& part_name,
+                                          uint32_t encoding_id,
+                                          size_t samples) {
+    // The constructor is private, so std::make_shared cannot be used here.
+    return std::shared_ptr<Sampler<T>>(
+        new Sampler(metric_id, part_name, encoding_id, samples));
+  }
+
+  Sampler(uint32_t metric_id, const std::string& part_name,
+          uint32_t encoding_id, size_t samples)
+      : metric_id_(metric_id),
+        part_name_(part_name),
+        encoding_id_(encoding_id),
+        size_(samples),
+        // "()" value-initializes the slots so they never hold garbage.
+        reservoir_(new std::atomic<T>[samples]()),
+        num_seen_(0),
+        num_written_(0) {}
+
+  // Returns the value in reservoir slot |idx| as a ValuePart. Defined as an
+  // explicit specialization for each supported T.
+  ValuePart GetValuePart(size_t idx);
+
+  // Appends one observation per value kept in the current period to
+  // |observations|, then starts a new, empty period.
+  void AppendObservations(std::vector<Observation>* observations) {
+    // Park num_seen_ at size_: concurrent LogObservation calls now get an
+    // index >= size_ and skip the reservoir while it is read and reset.
+    const size_t seen = num_seen_.exchange(size_);
+    const size_t claimed = seen < size_ ? seen : size_;
+    // Writers may finish out of order, so wait until every claimed slot has
+    // been stored before reading any of them.
+    while (num_written_ < claimed) {
+      std::this_thread::yield();
+    }
+
+    for (size_t i = 0; i < claimed; i++) {
+      Observation observation;
+      observation.metric_id = metric_id_;
+      // TODO(azani): Figure out how to do the undo function.
+      observation.parts.push_back(
+          ObservationPart(part_name_, encoding_id_, GetValuePart(i), []() {}));
+      observations->push_back(observation);
+    }
+
+    // Reset the write count while writers are still locked out, then reopen
+    // the reservoir for the next period.
+    num_written_ = 0;
+    num_seen_ = 0;
+  }
+
+  uint32_t metric_id_;
+  std::string part_name_;
+  uint32_t encoding_id_;
+  // Reservoir size: the maximum number of values kept per period.
+  size_t size_;
+  std::unique_ptr<std::atomic<T>[]> reservoir_;
+  // Next reservoir index handed out by LogObservation. AppendObservations
+  // parks it at size_ while it reads and resets the reservoir.
+  std::atomic<size_t> num_seen_;
+  // Number of claimed slots whose value has been fully stored; used to
+  // determine how many values are available to be read.
+  std::atomic<size_t> num_written_;
+};
+
+using IntegerSampler = Sampler<int64_t>;
+
+// The int64_t specialization is defined in observations_collector.cc. Declaring
+// it here stops any translation unit from implicitly instantiating the
+// primary (undefined) template for it.
+template <>
+ValuePart Sampler<int64_t>::GetValuePart(size_t idx);
+
// A MetricObservers allows you to group together several observers that
// correspond to metric parts.
class MetricObservers {
public:
// Makes a Counter associated with this metric.
- // The part_name specified must correspond to an integer part name.
+ // The part_name specified must be the name of an integer part.
// The encoding_id specified must be the id of an encoding in the cobalt
// config.
std::shared_ptr<Counter> MakeCounter(const std::string& part_name,
@@ -117,24 +201,37 @@
class ObservationsCollector {
public:
// send_observations will be used to send the collected observations.
- // default_encoding_id is the encoding id used unless another one is
- // specified.
+ // default_encoding_id is the encoding id used when no encoding id is
+ // specified while making Counters or Samplers.
explicit ObservationsCollector(SendObservationsFn send_observations,
uint32_t default_encoding_id)
: send_observations_(send_observations),
default_encoding_id_(default_encoding_id) {}
- // Makes a Counter object for the specified metric id, part name and to be
+ // Makes a Counter object for the specified metric id and part name, whose values are
// encoded using the default encoding id.
std::shared_ptr<Counter> MakeCounter(uint32_t metric_id,
const std::string& part_name);
- // Makes a Counter object for the specified metric id, part name and to be
+ // Makes a Counter object for the specified metric id and part name, whose values are
// encoded using the specified encoding id.
std::shared_ptr<Counter> MakeCounter(uint32_t metric_id,
const std::string& part_name,
uint32_t encoding_id);
+ // Makes an IntegerSampler for the specified metric id and part name, whose
+ // values are encoded using the specified encoding id. At most |samples|
+ // samples will be collected per collection period.
+ std::shared_ptr<IntegerSampler> MakeIntegerSampler(
+ uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
+ size_t samples);
+
+ // Makes an IntegerSampler for the specified metric id and part name, whose
+ // values are encoded using the default encoding id. At most |samples|
+ // samples will be collected per collection period.
+ std::shared_ptr<IntegerSampler> MakeIntegerSampler(
+ uint32_t metric_id, const std::string& part_name, size_t samples);
+
// Starts a new thread that collects and attempts to send metrics every
// |collection_interval|.
// Calling Start more than once without first calling Stop has undefined
@@ -156,6 +253,8 @@
// Map of metric id -> MetricObservers.
std::map<uint32_t, std::shared_ptr<MetricObservers>> metrics_;
+ std::vector<std::function<void(std::vector<Observation>*)>>
+ reservoir_samplers_;
// Thread on which the collection loop is run.
std::thread collection_loop_;
// Set to false to stop collection.
diff --git a/client/collection/observations_collector_test.cc b/client/collection/observations_collector_test.cc
index 5d0fc5d..c0fba38 100644
--- a/client/collection/observations_collector_test.cc
+++ b/client/collection/observations_collector_test.cc
@@ -30,13 +30,15 @@
// Sink is used to gather all the observations sent by the MetricFactory.
struct Sink {
+ explicit Sink(bool with_errors) : with_errors(with_errors) {}
+
std::vector<size_t> SendObservations(std::vector<Observation>* obs) {
std::vector<size_t> errors;
for (auto iter = std::make_move_iterator(obs->begin());
std::make_move_iterator(obs->end()) != iter; iter++) {
// Randomly fail to "send" some observations.
- if (std::rand() % 5 == 0) {
+ if (with_errors && std::rand() % 5 == 0) {
errors.push_back(
std::distance(std::make_move_iterator(obs->begin()), iter));
continue;
@@ -47,6 +49,7 @@
}
std::vector<Observation> observations;
+ bool with_errors;
};
} // namespace
@@ -55,7 +58,7 @@
TEST(Counter, Normal) {
// Metric id.
const int64_t id = 10;
- Sink sink;
+ Sink sink(true);
ObservationsCollector collector(
std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
auto counter = collector.MakeCounter(id, "part_name");
@@ -94,6 +97,37 @@
EXPECT_EQ(expected, actual);
}
+// Checks which values an IntegerSampler keeps per collection period and that
+// each collection starts a fresh period.
+TEST(Sampler, IntegerNoErrors) {
+  // Metric id.
+  const uint32_t id = 10;
+  // Maximum number of values the sampler keeps per collection period.
+  const size_t kSamples = 10;
+  Sink sink(false);
+  ObservationsCollector collector(
+      std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
+
+  auto int_sampler = collector.MakeIntegerSampler(id, "part_name", kSamples);
+
+  // Distinct primes: the product of the reported values identifies exactly
+  // which values were kept.
+  const int64_t primes[] = {2,  3,  5,  7,  11, 13, 17, 19, 23, 29, 31, 37,
+                            41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83};
+  // The basic sampler keeps the first kSamples values logged in a period.
+  int64_t expected = 1;
+  for (size_t i = 0; i < kSamples; i++) {
+    expected *= primes[i];
+  }
+
+  // Log more values than the sampler can keep.
+  for (size_t i = 0; i < 15; i++) {
+    int_sampler->LogObservation(primes[i]);
+  }
+
+  collector.CollectAll();
+
+  ASSERT_EQ(kSamples, sink.observations.size());
+  int64_t actual = 1;
+  for (auto& observation : sink.observations) {
+    EXPECT_EQ(id, observation.metric_id);
+    actual *= observation.parts[0].value.GetIntValue();
+  }
+  EXPECT_EQ(expected, actual);
+
+  // The next period starts empty: only values logged after the previous
+  // collection are reported.
+  sink.observations.clear();
+  int_sampler->LogObservation(primes[20]);
+  collector.CollectAll();
+  ASSERT_EQ(1u, sink.observations.size());
+  EXPECT_EQ(primes[20], sink.observations[0].parts[0].value.GetIntValue());
+}
+
// Check that the integer value part work correctly.
TEST(ValuePart, IntValuePart) {
ValuePart value = ValuePart::MakeIntValuePart(10);