[cobalt-client]: Add Collector.

Collector is the peer class in charge of collecting the
recorded data. This class uses a Logger to persist the data.

A CobaltLogger will be provided, and will be the default
for anyone instantiating the collector, through a factory
method. This abstractions simply lets us inject testing
loggers to verify what is persisted.

We require that the number of Histograms and counters
be defined upfront, to improve locality.

TEST=cobalt-client-tests

Change-Id: I78154a83e33df94e696e4cf3a24c90276ed4f01a
diff --git a/system/ulib/cobalt-client/collector.cpp b/system/ulib/cobalt-client/collector.cpp
new file mode 100644
index 0000000..118643f
--- /dev/null
+++ b/system/ulib/cobalt-client/collector.cpp
@@ -0,0 +1,127 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <threads.h>
+
+#include <cobalt-client/cpp/collector-internal.h>
+#include <cobalt-client/cpp/collector.h>
+#include <cobalt-client/cpp/counter-internal.h>
+#include <cobalt-client/cpp/histogram-internal.h>
+#include <cobalt-client/cpp/types-internal.h>
+#include <fuchsia/cobalt/c/fidl.h>
+#include <lib/fidl/cpp/vector_view.h>
+#include <lib/zx/channel.h>
+
+namespace cobalt_client {
+namespace {
+
+using internal::Logger;
+using internal::Metadata;
+using internal::RemoteCounter;
+using internal::RemoteHistogram;
+
+Metadata MakeMetadata(uint32_t event_type_index) {
+    Metadata metadata;
+    metadata.event_type = 0;
+    metadata.event_type_index = event_type_index;
+
+    return metadata;
+}
+
+} // namespace
+
+Collector::Collector(const CollectorOptions& options, fbl::unique_ptr<internal::Logger> logger)
+    : logger_(fbl::move(logger)) {
+    flushing_.store(false);
+    remote_counters_.reserve(options.max_counters);
+    remote_histograms_.reserve(options.max_histograms);
+    histogram_options_.reserve(options.max_histograms);
+}
+
+Collector::Collector(Collector&& other)
+    : histogram_options_(fbl::move(other.histogram_options_)),
+      remote_histograms_(fbl::move(other.remote_histograms_)),
+      remote_counters_(fbl::move(other.remote_counters_)), logger_(fbl::move(other.logger_)),
+      flushing_(other.flushing_.load()) {}
+
+Collector::~Collector() {
+    if (logger_ != nullptr) {
+        Flush();
+    }
+};
+
+Histogram Collector::AddHistogram(uint64_t metric_id, uint32_t event_type_index,
+                                  const HistogramOptions& options) {
+    ZX_DEBUG_ASSERT_MSG(remote_histograms_.size() < remote_histograms_.capacity(),
+                        "Exceeded pre-allocated histogram capacity.");
+    remote_histograms_.push_back(
+        RemoteHistogram(options.bucket_count + 2, metric_id, {MakeMetadata(event_type_index)}));
+    histogram_options_.push_back(options);
+    size_t index = remote_histograms_.size() - 1;
+    return Histogram(&histogram_options_[index], &remote_histograms_[index]);
+}
+
+Counter Collector::AddCounter(uint64_t metric_id, uint32_t event_type_index) {
+    ZX_DEBUG_ASSERT_MSG(remote_counters_.size() < remote_counters_.capacity(),
+                        "Exceeded pre-allocated counter capacity.");
+    remote_counters_.push_back(RemoteCounter(metric_id, {MakeMetadata(event_type_index)}));
+    size_t index = remote_counters_.size() - 1;
+    return Counter(&remote_counters_[index]);
+}
+
+void Collector::Flush() {
+    // If we are already flushing we just return and do nothing.
+    // First come first serve.
+    if (flushing_.exchange(true)) {
+        return;
+    }
+
+    for (auto& histogram : remote_histograms_) {
+        LogHistogram(&histogram);
+    }
+
+    for (auto& counter : remote_counters_) {
+        LogCounter(&counter);
+    }
+
+    // Once we are finished we allow flushing again.
+    flushing_.store(false);
+}
+
+void Collector::LogHistogram(RemoteHistogram* histogram) {
+    histogram->Flush([this, histogram](uint64_t metric_id,
+                                       const RemoteHistogram::EventBuffer& buffer,
+                                       RemoteHistogram::FlushCompleteFn complete_fn) {
+        if (!logger_->Log(metric_id, buffer)) {
+            // If we failed to log the data, then add the values again to the histogram, so they may
+            // be flushed in the future, and we dont need to keep a buffer around for retrying or
+            // anything.
+            for (auto& bucket : buffer.event_data()) {
+                if (bucket.count > 0) {
+                    histogram->IncrementCount(bucket.index, bucket.count);
+                }
+            }
+        }
+
+        // Make the buffer writeable again.
+        complete_fn();
+    });
+}
+
+void Collector::LogCounter(RemoteCounter* counter) {
+    counter->Flush([this, counter](uint64_t metric_id, const RemoteCounter::EventBuffer& buffer,
+                                   RemoteCounter::FlushCompleteFn complete_fn) {
+        // Attempt to log data, if we fail, we increase the in process counter by the amount
+        // flushed.
+        if (!logger_->Log(metric_id, buffer)) {
+            if (buffer.event_data() > 0) {
+                counter->Increment(buffer.event_data());
+            }
+        }
+        // Make the buffer writeable again.
+        complete_fn();
+    });
+}
+
+} // namespace cobalt_client
diff --git a/system/ulib/cobalt-client/counter.cpp b/system/ulib/cobalt-client/counter.cpp
index 413561b..14c02e5 100644
--- a/system/ulib/cobalt-client/counter.cpp
+++ b/system/ulib/cobalt-client/counter.cpp
@@ -27,8 +27,7 @@
     }
     // Write the current value of the counter to the buffer, and reset it to 0.
     *buffer_.mutable_event_data() = static_cast<uint32_t>(this->Exchange());
-    flush_handler(metric_id_, buffer_,
-                  fbl::BindMember(&buffer_, &EventBuffer<uint32_t>::CompleteFlush));
+    flush_handler(metric_id_, buffer_, fbl::BindMember(&buffer_, &EventBuffer::CompleteFlush));
     return true;
 }
 } // namespace internal
diff --git a/system/ulib/cobalt-client/event_buffer.cpp b/system/ulib/cobalt-client/event_buffer.cpp
index bcc801e..1154fc9 100644
--- a/system/ulib/cobalt-client/event_buffer.cpp
+++ b/system/ulib/cobalt-client/event_buffer.cpp
@@ -19,6 +19,7 @@
 template <typename T> EventBuffer<T>::EventBuffer(EventBuffer&& other) {
     flushing_.store(other.flushing_.load());
     buffer_ = fbl::move(other.buffer_);
+    metadata_ = fbl::move(other.metadata_);
 }
 
 template <typename T> EventBuffer<T>::~EventBuffer() = default;
diff --git a/system/ulib/cobalt-client/histogram.cpp b/system/ulib/cobalt-client/histogram.cpp
index 4823005..a24c387 100644
--- a/system/ulib/cobalt-client/histogram.cpp
+++ b/system/ulib/cobalt-client/histogram.cpp
@@ -66,6 +66,24 @@
     return unshifted_bucket + 1;
 }
 
+void LoadExponential(HistogramOptions* options) {
+    double max_value =
+        options->scalar * pow(options->base, options->bucket_count) + options->offset;
+    options->map_fn = [max_value](double val, const HistogramOptions& options) {
+        return internal::GetExponentialBucket(val, options, max_value);
+    };
+    options->reverse_map_fn = internal::GetExponentialBucketValue;
+}
+
+void LoadLinear(HistogramOptions* options) {
+    double max_value =
+        static_cast<double>(options->scalar * options->bucket_count + options->offset);
+    options->map_fn = [max_value](double val, const HistogramOptions& options) {
+        return internal::GetLinearBucket(val, options, max_value);
+    };
+    options->reverse_map_fn = internal::GetLinearBucketValue;
+}
+
 } // namespace
 
 BaseHistogram::BaseHistogram(uint32_t num_buckets) {
@@ -107,13 +125,21 @@
         bucket_buffer_[bucket_index].count = buckets_[bucket_index].Exchange();
     }
 
-    flush_handler(
-        metric_id_, buffer_,
-        fbl::BindMember(&buffer_, &EventBuffer<fidl::VectorView<HistogramBucket>>::CompleteFlush));
+    flush_handler(metric_id_, buffer_, fbl::BindMember(&buffer_, &EventBuffer::CompleteFlush));
     return true;
 }
 } // namespace internal
 
+HistogramOptions::HistogramOptions(const HistogramOptions& other)
+    : base(other.base), scalar(other.scalar), offset(other.offset),
+      bucket_count(other.bucket_count), type(other.type) {
+    if (type == Type::kLinear) {
+        internal::LoadLinear(this);
+    } else {
+        internal::LoadExponential(this);
+    }
+}
+
 HistogramOptions HistogramOptions::Exponential(uint32_t bucket_count, uint32_t base,
                                                uint32_t scalar, int64_t offset) {
     HistogramOptions options;
@@ -122,11 +148,7 @@
     options.scalar = scalar;
     options.offset = static_cast<double>(offset - scalar);
     options.type = Type::kExponential;
-    double max_value = scalar * pow(base, bucket_count) + options.offset;
-    options.map_fn = [max_value](double val, const HistogramOptions& options) {
-        return internal::GetExponentialBucket(val, options, max_value);
-    };
-    options.reverse_map_fn = internal::GetExponentialBucketValue;
+    internal::LoadExponential(&options);
     return options;
 }
 
@@ -136,11 +158,7 @@
     options.scalar = scalar;
     options.offset = static_cast<double>(offset);
     options.type = Type::kLinear;
-    double max_value = static_cast<double>(scalar * bucket_count + offset);
-    options.map_fn = [max_value](double val, const HistogramOptions& options) {
-        return internal::GetLinearBucket(val, options, max_value);
-    };
-    options.reverse_map_fn = internal::GetLinearBucketValue;
+    internal::LoadLinear(&options);
     return options;
 }
 
@@ -153,15 +171,13 @@
 Histogram& Histogram::operator=(Histogram&&) = default;
 Histogram::~Histogram() = default;
 
-template <typename ValueType>
-void Histogram::Add(ValueType value, Histogram::Count times) {
+template <typename ValueType> void Histogram::Add(ValueType value, Histogram::Count times) {
     double dbl_value = static_cast<double>(value);
     uint32_t bucket = options_->map_fn(dbl_value, *options_);
     remote_histogram_->IncrementCount(bucket, times);
 }
 
-template <typename ValueType>
-Histogram::Count Histogram::GetRemoteCount(ValueType value) const {
+template <typename ValueType> Histogram::Count Histogram::GetRemoteCount(ValueType value) const {
     double dbl_value = static_cast<double>(value);
     uint32_t bucket = options_->map_fn(dbl_value, *options_);
     return remote_histogram_->GetCount(bucket);
diff --git a/system/ulib/cobalt-client/include/cobalt-client/cpp/collector-internal.h b/system/ulib/cobalt-client/include/cobalt-client/cpp/collector-internal.h
new file mode 100644
index 0000000..34c84ea
--- /dev/null
+++ b/system/ulib/cobalt-client/include/cobalt-client/cpp/collector-internal.h
@@ -0,0 +1,34 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <cobalt-client/cpp/counter-internal.h>
+#include <cobalt-client/cpp/histogram-internal.h>
+
+namespace cobalt_client {
+namespace internal {
+// Interface for persisting collected data.
+class Logger {
+public:
+    Logger(const Logger&) = delete;
+    Logger(Logger&&) = delete;
+    Logger& operator=(const Logger&) = delete;
+    Logger& operator=(Logger&&) = delete;
+    virtual ~Logger() = default;
+
+    // Returns true if the histogram was persisted.
+    virtual bool Log(uint64_t metric_id, const RemoteHistogram::EventBuffer& histogram) = 0;
+
+    // Returns true if the counter was persisted.
+    virtual bool Log(uint64_t metric_id, const RemoteCounter::EventBuffer& counter) = 0;
+
+protected:
+    Logger() = default;
+};
+
+} // namespace internal
+} // namespace cobalt_client
diff --git a/system/ulib/cobalt-client/include/cobalt-client/cpp/collector.h b/system/ulib/cobalt-client/include/cobalt-client/cpp/collector.h
new file mode 100644
index 0000000..377e771
--- /dev/null
+++ b/system/ulib/cobalt-client/include/cobalt-client/cpp/collector.h
@@ -0,0 +1,92 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#pragma once
+
+#include <cobalt-client/cpp/counter.h>
+#include <cobalt-client/cpp/histogram.h>
+#include <fbl/atomic.h>
+#include <fbl/function.h>
+#include <fbl/string.h>
+#include <fbl/unique_ptr.h>
+#include <fbl/vector.h>
+#include <lib/zx/vmo.h>
+
+namespace cobalt_client {
+namespace internal {
+// Forward Declarations.
+class RemoteHistogram;
+class RemoteCounter;
+struct Metadata;
+class Logger;
+} // namespace internal
+
+// Defines the options for initializing the Collector.
+struct CollectorOptions {
+    // Callback used when reading the config to create a cobalt logger.
+    // Returns true when the write was successfull. The VMO will be transferred
+    // to the cobalt service.
+    fbl::Function<bool(zx::vmo*)> load_config;
+
+    // We need this information for pre-allocating storage
+    // and guaranteeing no dangling pointers, plus contiguos
+    // memory for cache friendlyness.
+
+    // Number of histograms to be used.
+    size_t max_histograms;
+
+    // Number of counters to be used.
+    size_t max_counters;
+};
+
+// This class acts as a peer for instantiating Hisotgrams and Counters. All
+// objects instantiated through this class act as a view, which means that
+// their lifetime is coupled to this object's lifetime. This class does require
+// the number of different configurations on construction.
+//
+// The Sink provides an API for persisiting the supported data types. This is
+// exposed to simplify testing.
+//
+// This class is moveable, but not copyable or assignable.
+// This class is thread-compatible.
+class Collector {
+public:
+    // TODO(gevalentino): Once the cobalt client is written add a factory method to return
+    // an instance of |Collector|. The cobalt client will implement the logger interface,
+    // we do this to simplify testing.
+    // static Collector Create(const Collector& options);
+
+    Collector(const CollectorOptions& options, fbl::unique_ptr<internal::Logger> logger);
+    Collector(const Collector&) = delete;
+    Collector(Collector&&);
+    Collector& operator=(const Collector&) = delete;
+    Collector& operator=(Collector&&) = delete;
+    ~Collector();
+
+    // Returns a histogram to log events for a given |metric_id|, |event_type_index|
+    // on a histogram described by |options|.
+    Histogram AddHistogram(uint64_t metric_id, uint32_t event_type_index,
+                           const HistogramOptions& options);
+
+    // Returns a counter to log events for a given |metric_id| and |event_type_index|
+    // as a raw counter.
+    Counter AddCounter(uint64_t metric_id, uint32_t event_type_index);
+
+    // Flushes the content of all flushable metrics into |sink_|. The |sink_| is
+    // in charge of persisting the data.
+    void Flush();
+
+private:
+    void LogHistogram(internal::RemoteHistogram* histogram);
+    void LogCounter(internal::RemoteCounter* counter);
+
+    fbl::Vector<HistogramOptions> histogram_options_;
+    fbl::Vector<internal::RemoteHistogram> remote_histograms_;
+    fbl::Vector<internal::RemoteCounter> remote_counters_;
+
+    fbl::unique_ptr<internal::Logger> logger_;
+    fbl::atomic<bool> flushing_;
+};
+
+} // namespace cobalt_client
diff --git a/system/ulib/cobalt-client/include/cobalt-client/cpp/counter-internal.h b/system/ulib/cobalt-client/include/cobalt-client/cpp/counter-internal.h
index e6105bb..8fde965 100644
--- a/system/ulib/cobalt-client/include/cobalt-client/cpp/counter-internal.h
+++ b/system/ulib/cobalt-client/include/cobalt-client/cpp/counter-internal.h
@@ -63,9 +63,12 @@
     // writeable again(this is buffer where the counter and its metadata are flushed).
     using FlushCompleteFn = fbl::Function<void()>;
 
+    // Alias for the specific buffer instantiation.
+    using EventBuffer = internal::EventBuffer<uint32_t>;
+
     // Function in charge persisting or processing the ObservationValue buffer.
-    using FlushFn = fbl::Function<void(uint64_t metric_id, const EventBuffer<uint32_t>&,
-                                       FlushCompleteFn complete)>;
+    using FlushFn =
+        fbl::Function<void(uint64_t metric_id, const EventBuffer&, FlushCompleteFn complete)>;
 
     RemoteCounter() = delete;
     RemoteCounter(uint64_t metric_id, const fbl::Vector<Metadata>& metadata);
@@ -83,7 +86,7 @@
 
 private:
     // The buffer containing the data to be flushed.
-    EventBuffer<uint32_t> buffer_;
+    EventBuffer buffer_;
 
     // Unique-Id representing this metric in the backend.
     uint64_t metric_id_;
diff --git a/system/ulib/cobalt-client/include/cobalt-client/cpp/counter.h b/system/ulib/cobalt-client/include/cobalt-client/cpp/counter.h
index 17dd544..ae5b39f 100644
--- a/system/ulib/cobalt-client/include/cobalt-client/cpp/counter.h
+++ b/system/ulib/cobalt-client/include/cobalt-client/cpp/counter.h
@@ -25,7 +25,7 @@
 
     Counter() = delete;
     Counter(internal::RemoteCounter* remote_counter);
-    Counter(const Counter&) = default;
+    Counter(const Counter& other) : remote_counter_(other.remote_counter_){};
     Counter(Counter&&) = default;
     ~Counter() = default;
 
diff --git a/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-internal.h b/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-internal.h
index ef4dbe8..d4f2fc4 100644
--- a/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-internal.h
+++ b/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-internal.h
@@ -41,7 +41,9 @@
 
     // Increases the count of the |bucket| bucket by 1.
     void IncrementCount(uint32_t bucket, Count val = 1) {
-        ZX_DEBUG_ASSERT_MSG(bucket < buckets_.size(), "IncrementCount bucket out of range.");
+        ZX_DEBUG_ASSERT_MSG(bucket < buckets_.size(),
+                            "IncrementCount bucket(%u) out of range(%lu).", bucket,
+                            buckets_.size());
         buckets_[bucket].Increment(val);
     }
 
@@ -67,10 +69,12 @@
     // writeable again(this is buffer where the histogram is flushed).
     using FlushCompleteFn = fbl::Function<void()>;
 
+    // Alias for the EventBuffer used for histogram.
+    using EventBuffer = internal::EventBuffer<fidl::VectorView<HistogramBucket>>;
+
     // Function in charge persisting or processing the EventValue buffer.
-    using FlushFn = fbl::Function<void(uint64_t metric_id,
-                                       const EventBuffer<fidl::VectorView<HistogramBucket>>&,
-                                       FlushCompleteFn complete)>;
+    using FlushFn =
+        fbl::Function<void(uint64_t metric_id, const EventBuffer&, FlushCompleteFn complete)>;
 
     RemoteHistogram() = delete;
     RemoteHistogram(uint32_t num_buckets, uint64_t metric_id,
@@ -88,10 +92,10 @@
     // | Metadata | Histogram|
     bool Flush(const FlushFn& flush_handler);
 
-private:
     // Keeps a buffer for the metadata and the metric.
-    EventBuffer<fidl::VectorView<HistogramBucket>> buffer_;
+    EventBuffer buffer_;
 
+private:
     // Buffer for out of line allocation for the data being sent
     // through fidl. This buffer is rewritten on every flush, and contains
     // an entry for each bucket.
diff --git a/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-options.h b/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-options.h
index 7bc23dc..f8948c8 100644
--- a/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-options.h
+++ b/system/ulib/cobalt-client/include/cobalt-client/cpp/histogram-options.h
@@ -47,6 +47,9 @@
     // |scalar| * current_step + offset = lowerbound(current_step).
     static HistogramOptions Linear(uint32_t bucket_count, uint32_t scalar, int64_t offset);
 
+    HistogramOptions() = default;
+    HistogramOptions(const HistogramOptions&);
+
     // Sanity check.
     bool IsValid() const {
         switch (type) {
diff --git a/system/ulib/cobalt-client/rules.mk b/system/ulib/cobalt-client/rules.mk
index d4e200c..0c311ea 100644
--- a/system/ulib/cobalt-client/rules.mk
+++ b/system/ulib/cobalt-client/rules.mk
@@ -9,6 +9,7 @@
 MODULE_TYPE := userlib
 
 MODULE_SRCS += \
+    $(LOCAL_DIR)/collector.cpp \
     $(LOCAL_DIR)/counter.cpp \
     $(LOCAL_DIR)/histogram.cpp \
     $(LOCAL_DIR)/event_buffer.cpp \
diff --git a/system/utest/cobalt-client/collector_test.cpp b/system/utest/cobalt-client/collector_test.cpp
new file mode 100644
index 0000000..c6567a1
--- /dev/null
+++ b/system/utest/cobalt-client/collector_test.cpp
@@ -0,0 +1,466 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <stdint.h>
+#include <string.h>
+#include <threads.h>
+#include <unistd.h>
+
+#include <cobalt-client/cpp/collector-internal.h>
+#include <cobalt-client/cpp/collector.h>
+#include <fbl/unique_ptr.h>
+#include <fbl/vector.h>
+#include <lib/sync/completion.h>
+#include <unittest/unittest.h>
+
+namespace cobalt_client {
+namespace internal {
+namespace {
+
+// Number of threads to spawn for multi threaded tests.
+constexpr size_t kThreads = 20;
+static_assert(kThreads % 2 == 0, "Use even number of threads for simplcity");
+
+// Number of times to perform an operation in a given thread.
+constexpr size_t kOperations = 50;
+
+// Fake storage used by our FakeLogger.
+template <typename T>
+class FakeStorage {
+public:
+    T* GetOrNull(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index) {
+        size_t index = 0;
+        if (!Find(metric_id, event_type, event_type_index, &index)) {
+            return nullptr;
+        }
+        return entries_[index].data.get();
+    };
+
+    void InsertOrUpdateEntry(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index,
+                             const fbl::Function<void(fbl::unique_ptr<T>*)>& update) {
+        size_t index = 0;
+        if (!Find(metric_id, event_type, event_type_index, &index)) {
+            entries_.push_back({.metric_id = metric_id,
+                                .event_type = event_type,
+                                .event_type_index = event_type_index,
+                                .data = nullptr});
+            index = entries_.size() - 1;
+        }
+        update(&entries_[index].data);
+    }
+
+private:
+    bool Find(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index,
+              size_t* index) const {
+        *index = 0;
+        for (auto& entry : entries_) {
+            if (entry.metric_id == metric_id && entry.event_type == event_type &&
+                entry.event_type_index == event_type_index) {
+                return true;
+            }
+            ++(*index);
+        }
+        return false;
+    }
+
+    // Help to identify event data logged.
+    struct Entry {
+        uint64_t metric_id;
+        uint32_t event_type;
+        uint32_t event_type_index;
+        fbl::unique_ptr<T> data;
+    };
+    fbl::Vector<Entry> entries_;
+};
+
+// Logger for to verify that the Collector behavior is correct.
+class TestLogger : public Logger {
+public:
+    TestLogger(FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters)
+        : histograms_(histograms), counters_(counters), fail_(false) {}
+    TestLogger(const TestLogger&) = delete;
+    TestLogger(TestLogger&&) = delete;
+    TestLogger& operator=(const TestLogger&) = delete;
+    TestLogger& operator=(TestLogger&&) = delete;
+    ~TestLogger() override = default;
+
+    // Returns true if the histogram was persisted.
+    bool Log(uint64_t metric_id, const RemoteHistogram::EventBuffer& histogram) override {
+        if (!fail_.load()) {
+            histograms_->InsertOrUpdateEntry(
+                metric_id, histogram.metadata()[0].event_type,
+                histogram.metadata()[0].event_type_index,
+                [&histogram](fbl::unique_ptr<BaseHistogram>* persisted) {
+                    if (*persisted == nullptr) {
+                        persisted->reset(new BaseHistogram(
+                            static_cast<uint32_t>(histogram.event_data().count())));
+                    }
+                    for (auto& bucket : histogram.event_data()) {
+                        (*persisted)->IncrementCount(bucket.index, bucket.count);
+                    }
+                });
+        }
+        return !fail_.load();
+    }
+
+    // Returns true if the counter was persisted.
+    bool Log(uint64_t metric_id, const RemoteCounter::EventBuffer& counter) override {
+        if (!fail_.load()) {
+            counters_->InsertOrUpdateEntry(metric_id, counter.metadata()[0].event_type,
+                                           counter.metadata()[0].event_type_index,
+                                           [&counter](fbl::unique_ptr<BaseCounter>* persisted) {
+                                               if (*persisted == nullptr) {
+                                                   persisted->reset(new BaseCounter());
+                                               }
+                                               (*persisted)->Increment(counter.event_data());
+                                           });
+        }
+        return !fail_.load();
+    }
+
+    void set_fail(bool should_fail) { fail_.store(should_fail); }
+
+private:
+    FakeStorage<BaseHistogram>* histograms_;
+    FakeStorage<BaseCounter>* counters_;
+    fbl::atomic<bool> fail_;
+};
+
+Collector MakeCollector(size_t max_histograms, size_t max_counters,
+                        FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters,
+                        TestLogger** test_logger = nullptr) {
+    fbl::unique_ptr<TestLogger> logger = fbl::make_unique<TestLogger>(histograms, counters);
+    CollectorOptions options;
+    options.max_counters = max_counters;
+    options.max_histograms = max_histograms;
+
+    if (test_logger != nullptr) {
+        *test_logger = logger.get();
+    }
+
+    return fbl::move(Collector(options, fbl::move(logger)));
+}
+
+HistogramOptions MakeOptions() {
+    // | .....| ....| ...| .... |
+    // -inf  -2     0    2    +inf
+    HistogramOptions options =
+        HistogramOptions::Linear(/*bucket_count=*/2, /*scalar=*/2, /*offset=*/-2);
+    return fbl::move(options);
+}
+
+bool AddCounterTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    Collector collector =
+        MakeCollector(/*max_histograms=*/0, /*max_counters=*/1, &histograms, &counters);
+    auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1);
+    counter.Increment(5);
+    ASSERT_EQ(counter.GetRemoteCount(), 5);
+    END_TEST;
+}
+
+// Sanity check that different counters do not interfere with each other.
+bool AddCounterMultipleTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    Collector collector =
+        MakeCollector(/*max_histograms=*/0, /*max_counters=*/3, &histograms, &counters);
+    auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1);
+    auto counter_2 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/2);
+    auto counter_3 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/3);
+    counter.Increment(5);
+    counter_2.Increment(3);
+    counter_3.Increment(2);
+    ASSERT_EQ(counter.GetRemoteCount(), 5);
+    ASSERT_EQ(counter_2.GetRemoteCount(), 3);
+    ASSERT_EQ(counter_3.GetRemoteCount(), 2);
+    END_TEST;
+}
+
+bool AddHistogramTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    Collector collector =
+        MakeCollector(/*max_histograms=*/1, /*max_counters=*/0, &histograms, &counters);
+    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions());
+    histogram.Add(-4, 2);
+    ASSERT_EQ(histogram.GetRemoteCount(-4), 2);
+    END_TEST;
+}
+
+// Sanity check that different histograms do not interfere with each other.
+bool AddHistogramMultipleTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    Collector collector =
+        MakeCollector(/*max_histograms=*/3, /*max_counters=*/0, &histograms, &counters);
+    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions());
+    auto histogram_2 =
+        collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, MakeOptions());
+    auto histogram_3 =
+        collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 3, MakeOptions());
+    histogram.Add(-4, 2);
+    histogram_2.Add(-1, 3);
+    histogram_3.Add(1, 4);
+    EXPECT_EQ(histogram.GetRemoteCount(-4), 2);
+    EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3);
+    EXPECT_EQ(histogram_3.GetRemoteCount(1), 4);
+    END_TEST;
+}
+
+// Verify that flushed data matches the logged data. This means that the FakeStorage has the right
+// values for the right metric and event_type_index.
+bool FlushTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    HistogramOptions options = MakeOptions();
+    Collector collector =
+        MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters);
+    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options);
+    auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options);
+    auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1);
+    auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2);
+
+    histogram.Add(-4, 2);
+    histogram_2.Add(-1, 3);
+    counter.Increment(5);
+    counter_2.Increment(3);
+
+    collector.Flush();
+
+    // Verify reset of local data.
+    EXPECT_EQ(histogram.GetRemoteCount(-4), 0);
+    EXPECT_EQ(histogram_2.GetRemoteCount(-1), 0);
+    EXPECT_EQ(counter.GetRemoteCount(), 0);
+    EXPECT_EQ(counter_2.GetRemoteCount(), 0);
+
+    // Verify 'persisted' data matches what the local data used to be.
+    // Note: for now event_type is 0 for all metrics.
+
+    // -4 goes to underflow bucket(0)
+    EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1)
+                  ->GetCount(options.map_fn(-4, options)),
+              2);
+
+    // -1 goes to first non underflow bucket(1)
+    EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 3);
+
+    EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5);
+    EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 3);
+    END_TEST;
+}
+
+// Verify that when the logger fails to persist data, the flushed values are restored.
+bool FlushFailTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    TestLogger* logger;
+    HistogramOptions options = MakeOptions();
+    Collector collector =
+        MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters, &logger);
+    auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options);
+    auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options);
+    auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1);
+    auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2);
+
+    histogram.Add(-4, 2);
+    counter.Increment(5);
+    collector.Flush();
+    logger->set_fail(/*should_fail=*/true);
+
+    histogram_2.Add(-1, 3);
+    counter_2.Increment(3);
+
+    collector.Flush();
+
+    // Verify reset of local data.
+    EXPECT_EQ(histogram.GetRemoteCount(-4), 0);
+    EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3);
+    EXPECT_EQ(counter.GetRemoteCount(), 0);
+    EXPECT_EQ(counter_2.GetRemoteCount(), 3);
+
+    // Verify 'persisted' data matches what the local data used to be.
+    // Note: for now event_type is 0 for all metrics.
+
+    // -4 goes to underflow bucket(0)
+    EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1)
+                  ->GetCount(options.map_fn(-4, options)),
+              2);
+
+    // -1 goes to first non underflow bucket(1), and its expected to be 0 because the logger failed.
+    EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 0);
+
+    EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5);
+
+    // Expected to be 0, because the logger failed.
+    EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 0);
+    END_TEST;
+}
+
+// All histograms have the same shape bucket for simplicity,
+// and we either operate on even or odd buckets.
+struct ObserveFnArgs {
+    // List of histograms to operate on.
+    fbl::Vector<Histogram> histograms;
+
+    // List of counters to operate on.
+    fbl::Vector<Counter> counters;
+
+    // Number of observations to register.
+    size_t count;
+
+    // Notify the thread when to start executing.
+    sync_completion_t* start;
+};
+
+int ObserveFn(void* vargs) {
+    ObserveFnArgs* args = reinterpret_cast<ObserveFnArgs*>(vargs);
+    static HistogramOptions options = MakeOptions();
+    sync_completion_wait(args->start, zx::sec(20).get());
+    size_t curr = 0;
+    for (auto& hist : args->histograms) {
+        for (size_t bucket_index = 0; bucket_index < options.bucket_count + 2; ++bucket_index) {
+            for (size_t i = 0; i < args->count; ++i) {
+                hist.Add(options.reverse_map_fn(static_cast<uint32_t>(bucket_index), options),
+                         curr + bucket_index);
+            }
+        }
+        ++curr;
+    }
+
+    curr = 0;
+    for (auto& counter : args->counters) {
+        for (size_t i = 0; i < args->count; ++i) {
+            counter.Increment(curr);
+        }
+        ++curr;
+    }
+    return thrd_success;
+}
+
+struct FlushFnArgs {
+    // Target collector to be flushed.
+    Collector* collector;
+
+    // Number of times to flush.
+    size_t count;
+
+    // Notify thread start.
+    sync_completion_t* start;
+};
+
+int FlushFn(void* vargs) {
+    FlushFnArgs* args = reinterpret_cast<FlushFnArgs*>(vargs);
+
+    sync_completion_wait(args->start, zx::sec(20).get());
+    for (size_t i = 0; i < args->count; ++i) {
+        args->collector->Flush();
+    }
+
+    return thrd_success;
+}
+
+// Verify that if we flush while the histograms and counters are being updated,
+// no data is lost, meaning that the sum of the persisted data and the local data
+// is equal to the expected value.
+template <bool should_fail>
+bool FlushMultithreadTest() {
+    BEGIN_TEST;
+    FakeStorage<BaseHistogram> histograms;
+    FakeStorage<BaseCounter> counters;
+    HistogramOptions options = MakeOptions();
+    sync_completion_t start;
+
+    ObserveFnArgs observe_args;
+    observe_args.start = &start;
+    observe_args.count = kOperations;
+    TestLogger* logger;
+
+    Collector collector =
+        MakeCollector(/*max_histograms=*/9, /*max_counters=*/9, &histograms, &counters, &logger);
+
+    for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) {
+        for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) {
+            observe_args.histograms.push_back(
+                collector.AddHistogram(2 * metric_id, event_type_index, options));
+            observe_args.counters.push_back(
+                collector.AddCounter(2 * metric_id + 1, event_type_index));
+        }
+    }
+    // Add empty entries to the fake storage.
+    collector.Flush();
+    // Set the logger to either fail to persist or succeed.
+    logger->set_fail(should_fail);
+
+    FlushFnArgs flush_args;
+    flush_args.collector = &collector;
+    flush_args.count = kOperations;
+    flush_args.start = &start;
+
+    fbl::Vector<thrd_t> thread_ids;
+
+    thread_ids.reserve(kThreads);
+    for (size_t i = 0; i < kThreads; ++i) {
+        thrd_t thread_id;
+        if (i % 2 == 0) {
+            thrd_create(&thread_id, &ObserveFn, &observe_args);
+        } else {
+            thrd_create(&thread_id, &FlushFn, &flush_args);
+        }
+        thread_ids.push_back(thread_id);
+    }
+
+    // Start all threads.
+    sync_completion_signal(&start);
+
+    for (auto thread_id : thread_ids) {
+        ASSERT_EQ(thrd_join(thread_id, nullptr), thrd_success);
+    }
+
+    // Verify that all histograms buckets and counters have exactly |kOperations| * |kThreads| /
+    // 2 count.
+    constexpr size_t target_count = kThreads * kOperations / 2;
+    for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) {
+        for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) {
+            size_t index = 3 * metric_id + event_type_index;
+            auto* tmp_hist = histograms.GetOrNull(2 * metric_id, 0, event_type_index);
+            // Each bucket is increased |index| + |i| for each thread recording observations.
+            for (uint32_t i = 0; i < 4; ++i) {
+                ASSERT_TRUE(tmp_hist != nullptr);
+                EXPECT_EQ(tmp_hist->GetCount(i) + observe_args.histograms[index].GetRemoteCount(
+                                                      options.reverse_map_fn(i, options)),
+                          target_count * (i + index));
+            }
+
+            auto* tmp_counter = counters.GetOrNull(2 * metric_id + 1, 0, event_type_index);
+            ASSERT_TRUE(tmp_counter != nullptr);
+            // Each counter is increased by |index| for each thread recording observations.
+            EXPECT_EQ(tmp_counter->Load() + observe_args.counters[index].GetRemoteCount(),
+                      target_count * index);
+        }
+    }
+    END_TEST;
+}
+
+BEGIN_TEST_CASE(CollectorTest)
+RUN_TEST(AddCounterTest)
+RUN_TEST(AddCounterMultipleTest)
+RUN_TEST(AddHistogramTest)
+RUN_TEST(AddHistogramMultipleTest)
+RUN_TEST(FlushTest)
+RUN_TEST(FlushFailTest)
+RUN_TEST_LARGE(FlushMultithreadTest<false>)
+RUN_TEST_LARGE(FlushMultithreadTest<true>)
+END_TEST_CASE(CollectorTest)
+
+} // namespace
+} // namespace internal
+} // namespace cobalt_client
diff --git a/system/utest/cobalt-client/rules.mk b/system/utest/cobalt-client/rules.mk
index 677ece6..cbfe4c7 100644
--- a/system/utest/cobalt-client/rules.mk
+++ b/system/utest/cobalt-client/rules.mk
@@ -10,8 +10,9 @@
 MODULE_TYPE := usertest
 
 MODULE_SRCS += \
-    $(LOCAL_DIR)/histogram_options_test.cpp \
+    $(LOCAL_DIR)/collector_test.cpp \
     $(LOCAL_DIR)/counter_test.cpp \
+    $(LOCAL_DIR)/histogram_options_test.cpp \
     $(LOCAL_DIR)/histogram_test.cpp \
     $(LOCAL_DIR)/test_main.cpp \
     $(LOCAL_DIR)/types_internal_test.cpp \