Basic reservoir sampler takes the first N samples.

This is just to try out the API and atomics.

Change-Id: I200945dcd06f88a38298f041a1af45bd3349782d
diff --git a/client/collection/observations_collector.cc b/client/collection/observations_collector.cc
index 1caa037..63a9c69 100644
--- a/client/collection/observations_collector.cc
+++ b/client/collection/observations_collector.cc
@@ -43,6 +43,11 @@
   return observation;
 }
 
+template <>
+ValuePart Sampler<int64_t>::GetValuePart(size_t idx) {
+  return ValuePart::MakeIntValuePart(reservoir_[idx]);
+}
+
 std::shared_ptr<Counter> ObservationsCollector::MakeCounter(
     uint32_t metric_id, const std::string& part_name, uint32_t encoding_id) {
   return GetMetricObservers(metric_id)->MakeCounter(part_name, encoding_id);
@@ -53,6 +58,24 @@
   return MakeCounter(metric_id, part_name, default_encoding_id_);
 }
 
+std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
+    uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
+    size_t samples) {
+  auto reservoir_sampler =
+      Sampler<int64_t>::Make(metric_id, part_name, encoding_id, samples);
+  reservoir_samplers_.push_back(
+      [reservoir_sampler](std::vector<Observation>* observations) {
+        reservoir_sampler->AppendObservations(observations);
+      });
+  return reservoir_sampler;
+}
+
+std::shared_ptr<IntegerSampler> ObservationsCollector::MakeIntegerSampler(
+    uint32_t metric_id, const std::string& part_name, size_t samples) {
+  return MakeIntegerSampler(metric_id, part_name, default_encoding_id_,
+                            samples);
+}
+
 std::shared_ptr<MetricObservers> ObservationsCollector::GetMetricObservers(
     uint32_t metric_id) {
   if (metrics_.count(metric_id) == 0) {
@@ -78,6 +101,11 @@
   for (auto iter = metrics_.begin(); iter != metrics_.end(); iter++) {
     observations.push_back(iter->second->GetObservation());
   }
+
+  for (auto iter = reservoir_samplers_.begin();
+       iter != reservoir_samplers_.end(); iter++) {
+    (*iter)(&observations);
+  }
   auto errors = send_observations_(&observations);
 
   // Undo failed observations.
diff --git a/client/collection/observations_collector.h b/client/collection/observations_collector.h
index 5aea1c1..1ff56c3 100644
--- a/client/collection/observations_collector.h
+++ b/client/collection/observations_collector.h
@@ -4,23 +4,35 @@
 
 // This file contains a library to be used by users of Cobalt in order to
 // collect metrics at a high frequency. The main building blocks are the
-// ObservationsCollector and Counter classes.
+// ObservationsCollector, Counter and IntegerSampler classes.
 //
-// Example: counting function calls
+// Example: counting and timing function calls
 //
 // ObservationsCollector collector(send_to_cobalt_function_pointer,
 //                                 kDefaultEncodingId);
 //
-// auto foo_calls = collector.MakeCounter("foo_calls");
-// auto foo_calls = collector.MakeCounter("bar_calls");
+// auto foo_calls = collector.MakeCounter(kFooCallsMetricId,
+//                                        kFooCallsMetricPartName);
+//
+// auto bar_calls = collector.MakeCounter(kBarCallsMetricId,
+//                                        kBarCallsMetricPartName);
+//
+//
+// auto foo_call_time_sampler = collector.MakeIntegerSampler(
+//     kFooCallTimeMetricId, kFooCallTimeMetricPartName, kNumberOfSamples);
 //
 // // Perform aggregation and send to Cobalt FIDL service every 1 second.
 // collector.Start(std::chrono::seconds(1));
 //
 // void Foo() {
+//   int64_t start = getCurTime();
 //   foo_calls.Increment();
 //   DoSomeFooWork
 //   ...
+//   // Logs the amount of time Foo took to execute to the foo_call_time_sampler
+//   // which will randomly select kNumberOfSamples observations to be sent to
+//   // Cobalt.
+//   foo_call_time_sampler.LogObservation(getCurTime() - start);
 // }
 //
 // void Bar() {
@@ -49,15 +61,17 @@
 namespace cobalt {
 namespace client {
 
-// An SendObservationsFn is a callable object that takes a pointer to a vector
+// A SendObservationsFn is a callable object that takes a pointer to a vector
 // of observations and returns a list of the observation indices for
-// observations that failed to be sent.
+// observations that failed to be sent. An empty list is returned on success.
+// The expectation is that this function will send the observations to a
+// consumer, such as the Cobalt FIDL service on Fuchsia.
 typedef std::function<std::vector<size_t>(std::vector<Observation>*)>
     SendObservationsFn;
 
 // A Counter allows you to keep track of the number of times an event has
-// occured. Every counter has an associated metric part.
-// A Counter can be incremented from an arbitrary number of threads.
+// occurred. A counter is associated with a metric part.
+// Incrementing a counter is thread-safe.
 class Counter {
  public:
   // Increments the counter by 1.
@@ -85,12 +99,82 @@
   uint32_t encoding_id_;
 };
 
+// A Sampler has an associated |size| passed as |samples| to the Make*Sampler()
+// method on the ObservationsCollector.
+// Each collection period, the Sampler keeps up to |size| of the logged
+// observations (currently the first |size|; uniform sampling is still a TODO)
+// and hands them to the ObservationsCollector when it collects.
+// LogObservation is thread-safe.
+template <class T>
+class Sampler {
+ public:
+  void LogObservation(const T& value) {
+    uint64_t idx = num_seen_++;
+    // idx should now be a unique number.
+
+    if (idx < size_) {
+      reservoir_[idx] = value;
+      num_written_++;
+    }
+
+    // TODO(azani): Handle the case where num_seen_ > size_.
+  }
+
+ private:
+  friend class ObservationsCollector;
+
+  static std::shared_ptr<Sampler<T>> Make(uint32_t metric_id,
+                                          const std::string& part_name,
+                                          uint32_t encoding_id,
+                                          size_t samples) {
+    return std::shared_ptr<Sampler<T>>(
+        new Sampler(metric_id, part_name, encoding_id, samples));
+  }
+
+  Sampler(uint32_t metric_id, const std::string& part_name,
+          uint32_t encoding_id, size_t samples)
+      : metric_id_(metric_id),
+        part_name_(part_name),
+        encoding_id_(encoding_id),
+        size_(samples),
+        reservoir_(new std::atomic<T>[size_]),
+        num_seen_(0),
+        num_written_(0) {}
+
+  ValuePart GetValuePart(size_t idx);
+
+  void AppendObservations(std::vector<Observation>* observations) {
+    for (size_t i = 0; i < num_written_; i++) {
+      Observation observation;
+      observation.metric_id = metric_id_;
+      // TODO(azani): Figure out how to do the undo function.
+      observation.parts.push_back(
+          ObservationPart(part_name_, encoding_id_, GetValuePart(i), []() {}));
+      observations->push_back(observation);
+    }
+    num_written_ = 0;
+    num_seen_ = 0;
+  }
+
+  uint32_t metric_id_;
+  std::string part_name_;
+  uint32_t encoding_id_;
+  // Reservoir size.
+  size_t size_;
+  std::unique_ptr<std::atomic<T>[]> reservoir_;
+  std::atomic<size_t> num_seen_;
+  // num_written_ is used to determine how many values are available to be read.
+  std::atomic<size_t> num_written_;
+};
+
+using IntegerSampler = Sampler<int64_t>;
+
 // A MetricObservers allows you to group together several observers that
 // correspond to metric parts.
 class MetricObservers {
  public:
   // Makes a Counter associated with this metric.
-  // The part_name specified must correspond to an integer part name.
+  // The part_name specified must be the name of an integer part.
   // The encoding_id specified must be the id of an encoding in the cobalt
   // config.
   std::shared_ptr<Counter> MakeCounter(const std::string& part_name,
@@ -117,24 +201,37 @@
 class ObservationsCollector {
  public:
   // send_observations will be used to send the collected observations.
-  // default_encoding_id is the encoding id used unless another one is
-  // specified.
+  // default_encoding_id is the encoding id used when no encoding id is
+  // specified while making Counters or Samplers.
   explicit ObservationsCollector(SendObservationsFn send_observations,
                                  uint32_t default_encoding_id)
       : send_observations_(send_observations),
         default_encoding_id_(default_encoding_id) {}
 
-  // Makes a Counter object for the specified metric id, part name and to be
+  // Makes a Counter object for the specified metric id, part name and
   // encoded using the default encoding id.
   std::shared_ptr<Counter> MakeCounter(uint32_t metric_id,
                                        const std::string& part_name);
 
-  // Makes a Counter object for the specified metric id, part name and to be
+  // Makes a Counter object for the specified metric id, part name and
   // encoded using the specified encoding id.
   std::shared_ptr<Counter> MakeCounter(uint32_t metric_id,
                                        const std::string& part_name,
                                        uint32_t encoding_id);
 
+  // Makes an IntegerSampler for the specified metric id, part name and
+  // encoded using the specified encoding id. At most, |samples| samples will be
+  // collected per collection period.
+  std::shared_ptr<IntegerSampler> MakeIntegerSampler(
+      uint32_t metric_id, const std::string& part_name, uint32_t encoding_id,
+      size_t samples);
+
+  // Makes an IntegerSampler for the specified metric id, part name and
+  // encoded using the default encoding id. At most, |samples| samples will be
+  // collected per collection period.
+  std::shared_ptr<IntegerSampler> MakeIntegerSampler(
+      uint32_t metric_id, const std::string& part_name, size_t samples);
+
   // Starts a new thread that collects and attempts to send metrics every
   // |collection_interval|.
   // Calling Start more than once without first calling Stop has undefined
@@ -156,6 +253,8 @@
 
   // Map of metric id -> MetricObservers.
   std::map<uint32_t, std::shared_ptr<MetricObservers>> metrics_;
+  std::vector<std::function<void(std::vector<Observation>*)>>
+      reservoir_samplers_;
   // Thread on which the collection loop is run.
   std::thread collection_loop_;
   // Set to false to stop collection.
diff --git a/client/collection/observations_collector_test.cc b/client/collection/observations_collector_test.cc
index 5d0fc5d..c0fba38 100644
--- a/client/collection/observations_collector_test.cc
+++ b/client/collection/observations_collector_test.cc
@@ -30,13 +30,15 @@
 
 // Sink is used to gather all the observations sent by the MetricFactory.
 struct Sink {
+  explicit Sink(bool with_errors) : with_errors(with_errors) {}
+
   std::vector<size_t> SendObservations(std::vector<Observation>* obs) {
     std::vector<size_t> errors;
 
     for (auto iter = std::make_move_iterator(obs->begin());
          std::make_move_iterator(obs->end()) != iter; iter++) {
       // Randomly fail to "send" some observations.
-      if (std::rand() % 5 == 0) {
+      if (with_errors && std::rand() % 5 == 0) {
         errors.push_back(
             std::distance(std::make_move_iterator(obs->begin()), iter));
         continue;
@@ -47,6 +49,7 @@
   }
 
   std::vector<Observation> observations;
+  bool with_errors;
 };
 
 }  // namespace
@@ -55,7 +58,7 @@
 TEST(Counter, Normal) {
   // Metric id.
   const int64_t id = 10;
-  Sink sink;
+  Sink sink(true);
   ObservationsCollector collector(
       std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
   auto counter = collector.MakeCounter(id, "part_name");
@@ -94,6 +97,37 @@
   EXPECT_EQ(expected, actual);
 }
 
+TEST(Sampler, IntegerNoErrors) {
+  // Metric id.
+  const int64_t id = 10;
+  Sink sink(false);
+  ObservationsCollector collector(
+      std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
+
+  auto int_sampler = collector.MakeIntegerSampler(id, "part_name", 10);
+
+  int64_t primes[] = {2,  3,  5,  7,  11, 13, 17, 19, 23, 29, 31, 37,
+                      41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83};
+  int64_t expected = 1;
+  for (size_t i = 0; i < 10; i++) {
+    expected *= primes[i];
+  }
+
+  for (size_t i = 0; i < 15; i++) {
+    int_sampler->LogObservation(primes[i]);
+  }
+
+  collector.CollectAll();
+
+  int64_t actual = 1;
+  for (auto iter = sink.observations.begin(); sink.observations.end() != iter;
+       iter++) {
+    actual *= (*iter).parts[0].value.GetIntValue();
+  }
+
+  EXPECT_EQ(expected, actual);
+}
+
 // Check that the integer value part work correctly.
 TEST(ValuePart, IntValuePart) {
   ValuePart value = ValuePart::MakeIntValuePart(10);