| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <stdint.h> |
| #include <string.h> |
| #include <threads.h> |
| #include <unistd.h> |
| |
| #include <cobalt-client/cpp/collector-internal.h> |
| #include <cobalt-client/cpp/collector.h> |
| #include <fbl/unique_ptr.h> |
| #include <fbl/vector.h> |
| #include <lib/sync/completion.h> |
| #include <unittest/unittest.h> |
| |
| namespace cobalt_client { |
| namespace internal { |
| namespace { |
| |
| // Number of threads to spawn for multi threaded tests. |
| constexpr size_t kThreads = 20; |
| static_assert(kThreads % 2 == 0, "Use even number of threads for simplcity"); |
| |
| // Number of times to perform an operation in a given thread. |
| constexpr size_t kOperations = 50; |
| |
| // Fake storage used by our FakeLogger. |
| template <typename T> |
| class FakeStorage { |
| public: |
| T* GetOrNull(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index) { |
| size_t index = 0; |
| if (!Find(metric_id, event_type, event_type_index, &index)) { |
| return nullptr; |
| } |
| return entries_[index].data.get(); |
| }; |
| |
| void InsertOrUpdateEntry(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index, |
| const fbl::Function<void(fbl::unique_ptr<T>*)>& update) { |
| size_t index = 0; |
| if (!Find(metric_id, event_type, event_type_index, &index)) { |
| entries_.push_back({.metric_id = metric_id, |
| .event_type = event_type, |
| .event_type_index = event_type_index, |
| .data = nullptr}); |
| index = entries_.size() - 1; |
| } |
| update(&entries_[index].data); |
| } |
| |
| private: |
| bool Find(uint64_t metric_id, uint32_t event_type, uint32_t event_type_index, |
| size_t* index) const { |
| *index = 0; |
| for (auto& entry : entries_) { |
| if (entry.metric_id == metric_id && entry.event_type == event_type && |
| entry.event_type_index == event_type_index) { |
| return true; |
| } |
| ++(*index); |
| } |
| return false; |
| } |
| |
| // Help to identify event data logged. |
| struct Entry { |
| uint64_t metric_id; |
| uint32_t event_type; |
| uint32_t event_type_index; |
| fbl::unique_ptr<T> data; |
| }; |
| fbl::Vector<Entry> entries_; |
| }; |
| |
| // Logger for to verify that the Collector behavior is correct. |
| class TestLogger : public Logger { |
| public: |
| TestLogger(FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters) |
| : histograms_(histograms), counters_(counters), fail_(false) {} |
| TestLogger(const TestLogger&) = delete; |
| TestLogger(TestLogger&&) = delete; |
| TestLogger& operator=(const TestLogger&) = delete; |
| TestLogger& operator=(TestLogger&&) = delete; |
| ~TestLogger() override = default; |
| |
| // Returns true if the histogram was persisted. |
| bool Log(uint64_t metric_id, const RemoteHistogram::EventBuffer& histogram) override { |
| if (!fail_.load()) { |
| histograms_->InsertOrUpdateEntry( |
| metric_id, histogram.metadata()[0].event_type, |
| histogram.metadata()[0].event_type_index, |
| [&histogram](fbl::unique_ptr<BaseHistogram>* persisted) { |
| if (*persisted == nullptr) { |
| persisted->reset(new BaseHistogram( |
| static_cast<uint32_t>(histogram.event_data().count()))); |
| } |
| for (auto& bucket : histogram.event_data()) { |
| (*persisted)->IncrementCount(bucket.index, bucket.count); |
| } |
| }); |
| } |
| return !fail_.load(); |
| } |
| |
| // Returns true if the counter was persisted. |
| bool Log(uint64_t metric_id, const RemoteCounter::EventBuffer& counter) override { |
| if (!fail_.load()) { |
| counters_->InsertOrUpdateEntry(metric_id, counter.metadata()[0].event_type, |
| counter.metadata()[0].event_type_index, |
| [&counter](fbl::unique_ptr<BaseCounter>* persisted) { |
| if (*persisted == nullptr) { |
| persisted->reset(new BaseCounter()); |
| } |
| (*persisted)->Increment(counter.event_data()); |
| }); |
| } |
| return !fail_.load(); |
| } |
| |
| void set_fail(bool should_fail) { fail_.store(should_fail); } |
| |
| private: |
| FakeStorage<BaseHistogram>* histograms_; |
| FakeStorage<BaseCounter>* counters_; |
| fbl::atomic<bool> fail_; |
| }; |
| |
| Collector MakeCollector(size_t max_histograms, size_t max_counters, |
| FakeStorage<BaseHistogram>* histograms, FakeStorage<BaseCounter>* counters, |
| TestLogger** test_logger = nullptr) { |
| fbl::unique_ptr<TestLogger> logger = fbl::make_unique<TestLogger>(histograms, counters); |
| CollectorOptions options; |
| options.max_counters = max_counters; |
| options.max_histograms = max_histograms; |
| |
| if (test_logger != nullptr) { |
| *test_logger = logger.get(); |
| } |
| |
| return fbl::move(Collector(options, fbl::move(logger))); |
| } |
| |
| HistogramOptions MakeOptions() { |
| // | .....| ....| ...| .... | |
| // -inf -2 0 2 +inf |
| HistogramOptions options = |
| HistogramOptions::Linear(/*bucket_count=*/2, /*scalar=*/2, /*offset=*/-2); |
| return fbl::move(options); |
| } |
| |
| bool AddCounterTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| Collector collector = |
| MakeCollector(/*max_histograms=*/0, /*max_counters=*/1, &histograms, &counters); |
| auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1); |
| counter.Increment(5); |
| ASSERT_EQ(counter.GetRemoteCount(), 5); |
| END_TEST; |
| } |
| |
| // Sanity check that different counters do not interfere with each other. |
| bool AddCounterMultipleTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| Collector collector = |
| MakeCollector(/*max_histograms=*/0, /*max_counters=*/3, &histograms, &counters); |
| auto counter = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/1); |
| auto counter_2 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/2); |
| auto counter_3 = collector.AddCounter(/*metric_id=*/1, /*event_type_index=*/3); |
| counter.Increment(5); |
| counter_2.Increment(3); |
| counter_3.Increment(2); |
| ASSERT_EQ(counter.GetRemoteCount(), 5); |
| ASSERT_EQ(counter_2.GetRemoteCount(), 3); |
| ASSERT_EQ(counter_3.GetRemoteCount(), 2); |
| END_TEST; |
| } |
| |
| bool AddHistogramTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| Collector collector = |
| MakeCollector(/*max_histograms=*/1, /*max_counters=*/0, &histograms, &counters); |
| auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions()); |
| histogram.Add(-4, 2); |
| ASSERT_EQ(histogram.GetRemoteCount(-4), 2); |
| END_TEST; |
| } |
| |
| // Sanity check that different histograms do not interfere with each other. |
| bool AddHistogramMultipleTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| Collector collector = |
| MakeCollector(/*max_histograms=*/3, /*max_counters=*/0, &histograms, &counters); |
| auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, MakeOptions()); |
| auto histogram_2 = |
| collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, MakeOptions()); |
| auto histogram_3 = |
| collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 3, MakeOptions()); |
| histogram.Add(-4, 2); |
| histogram_2.Add(-1, 3); |
| histogram_3.Add(1, 4); |
| EXPECT_EQ(histogram.GetRemoteCount(-4), 2); |
| EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3); |
| EXPECT_EQ(histogram_3.GetRemoteCount(1), 4); |
| END_TEST; |
| } |
| |
| // Verify that flushed data matches the logged data. This means that the FakeStorage has the right |
| // values for the right metric and event_type_index. |
| bool FlushTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| HistogramOptions options = MakeOptions(); |
| Collector collector = |
| MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters); |
| auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options); |
| auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options); |
| auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1); |
| auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2); |
| |
| histogram.Add(-4, 2); |
| histogram_2.Add(-1, 3); |
| counter.Increment(5); |
| counter_2.Increment(3); |
| |
| collector.Flush(); |
| |
| // Verify reset of local data. |
| EXPECT_EQ(histogram.GetRemoteCount(-4), 0); |
| EXPECT_EQ(histogram_2.GetRemoteCount(-1), 0); |
| EXPECT_EQ(counter.GetRemoteCount(), 0); |
| EXPECT_EQ(counter_2.GetRemoteCount(), 0); |
| |
| // Verify 'persisted' data matches what the local data used to be. |
| // Note: for now event_type is 0 for all metrics. |
| |
| // -4 goes to underflow bucket(0) |
| EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1) |
| ->GetCount(options.map_fn(-4, options)), |
| 2); |
| |
| // -1 goes to first non underflow bucket(1) |
| EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 3); |
| |
| EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5); |
| EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 3); |
| END_TEST; |
| } |
| |
| // Verify that when the logger fails to persist data, the flushed values are restored. |
| bool FlushFailTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| TestLogger* logger; |
| HistogramOptions options = MakeOptions(); |
| Collector collector = |
| MakeCollector(/*max_histograms=*/2, /*max_counters=*/2, &histograms, &counters, &logger); |
| auto histogram = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 1, options); |
| auto histogram_2 = collector.AddHistogram(/*metric_id*/ 1, /*event_type_index*/ 2, options); |
| auto counter = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/1); |
| auto counter_2 = collector.AddCounter(/*metric_id=*/2, /*event_type_index=*/2); |
| |
| histogram.Add(-4, 2); |
| counter.Increment(5); |
| collector.Flush(); |
| logger->set_fail(/*should_fail=*/true); |
| |
| histogram_2.Add(-1, 3); |
| counter_2.Increment(3); |
| |
| collector.Flush(); |
| |
| // Verify reset of local data. |
| EXPECT_EQ(histogram.GetRemoteCount(-4), 0); |
| EXPECT_EQ(histogram_2.GetRemoteCount(-1), 3); |
| EXPECT_EQ(counter.GetRemoteCount(), 0); |
| EXPECT_EQ(counter_2.GetRemoteCount(), 3); |
| |
| // Verify 'persisted' data matches what the local data used to be. |
| // Note: for now event_type is 0 for all metrics. |
| |
| // -4 goes to underflow bucket(0) |
| EXPECT_EQ(histograms.GetOrNull(/*metric_id=*/1, /*event_type=*/0, /*event_type_index=*/1) |
| ->GetCount(options.map_fn(-4, options)), |
| 2); |
| |
| // -1 goes to first non underflow bucket(1), and its expected to be 0 because the logger failed. |
| EXPECT_EQ(histograms.GetOrNull(1, 0, 2)->GetCount(options.map_fn(-1, options)), 0); |
| |
| EXPECT_EQ(counters.GetOrNull(2, 0, 1)->Load(), 5); |
| |
| // Expected to be 0, because the logger failed. |
| EXPECT_EQ(counters.GetOrNull(2, 0, 2)->Load(), 0); |
| END_TEST; |
| } |
| |
| // All histograms have the same shape bucket for simplicity, |
| // and we either operate on even or odd buckets. |
| struct ObserveFnArgs { |
| // List of histograms to operate on. |
| fbl::Vector<Histogram> histograms; |
| |
| // List of counters to operate on. |
| fbl::Vector<Counter> counters; |
| |
| // Number of observations to register. |
| size_t count; |
| |
| // Notify the thread when to start executing. |
| sync_completion_t* start; |
| }; |
| |
| int ObserveFn(void* vargs) { |
| ObserveFnArgs* args = reinterpret_cast<ObserveFnArgs*>(vargs); |
| static HistogramOptions options = MakeOptions(); |
| sync_completion_wait(args->start, zx::sec(20).get()); |
| size_t curr = 0; |
| for (auto& hist : args->histograms) { |
| for (size_t bucket_index = 0; bucket_index < options.bucket_count + 2; ++bucket_index) { |
| for (size_t i = 0; i < args->count; ++i) { |
| hist.Add(options.reverse_map_fn(static_cast<uint32_t>(bucket_index), options), |
| curr + bucket_index); |
| } |
| } |
| ++curr; |
| } |
| |
| curr = 0; |
| for (auto& counter : args->counters) { |
| for (size_t i = 0; i < args->count; ++i) { |
| counter.Increment(curr); |
| } |
| ++curr; |
| } |
| return thrd_success; |
| } |
| |
| struct FlushFnArgs { |
| // Target collector to be flushed. |
| Collector* collector; |
| |
| // Number of times to flush. |
| size_t count; |
| |
| // Notify thread start. |
| sync_completion_t* start; |
| }; |
| |
| int FlushFn(void* vargs) { |
| FlushFnArgs* args = reinterpret_cast<FlushFnArgs*>(vargs); |
| |
| sync_completion_wait(args->start, zx::sec(20).get()); |
| for (size_t i = 0; i < args->count; ++i) { |
| args->collector->Flush(); |
| } |
| |
| return thrd_success; |
| } |
| |
| // Verify that if we flush while the histograms and counters are being updated, |
| // no data is lost, meaning that the sum of the persisted data and the local data |
| // is equal to the expected value. |
| template <bool should_fail> |
| bool FlushMultithreadTest() { |
| BEGIN_TEST; |
| FakeStorage<BaseHistogram> histograms; |
| FakeStorage<BaseCounter> counters; |
| HistogramOptions options = MakeOptions(); |
| sync_completion_t start; |
| |
| ObserveFnArgs observe_args; |
| observe_args.start = &start; |
| observe_args.count = kOperations; |
| TestLogger* logger; |
| |
| Collector collector = |
| MakeCollector(/*max_histograms=*/9, /*max_counters=*/9, &histograms, &counters, &logger); |
| |
| for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) { |
| for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) { |
| observe_args.histograms.push_back( |
| collector.AddHistogram(2 * metric_id, event_type_index, options)); |
| observe_args.counters.push_back( |
| collector.AddCounter(2 * metric_id + 1, event_type_index)); |
| } |
| } |
| // Add empty entries to the fake storage. |
| collector.Flush(); |
| // Set the logger to either fail to persist or succeed. |
| logger->set_fail(should_fail); |
| |
| FlushFnArgs flush_args; |
| flush_args.collector = &collector; |
| flush_args.count = kOperations; |
| flush_args.start = &start; |
| |
| fbl::Vector<thrd_t> thread_ids; |
| |
| thread_ids.reserve(kThreads); |
| for (size_t i = 0; i < kThreads; ++i) { |
| thrd_t thread_id; |
| if (i % 2 == 0) { |
| thrd_create(&thread_id, &ObserveFn, &observe_args); |
| } else { |
| thrd_create(&thread_id, &FlushFn, &flush_args); |
| } |
| thread_ids.push_back(thread_id); |
| } |
| |
| // Start all threads. |
| sync_completion_signal(&start); |
| |
| for (auto thread_id : thread_ids) { |
| ASSERT_EQ(thrd_join(thread_id, nullptr), thrd_success); |
| } |
| |
| // Verify that all histograms buckets and counters have exactly |kOperations| * |kThreads| / |
| // 2 count. |
| constexpr size_t target_count = kThreads * kOperations / 2; |
| for (uint64_t metric_id = 0; metric_id < 3; ++metric_id) { |
| for (uint32_t event_type_index = 0; event_type_index < 3; ++event_type_index) { |
| size_t index = 3 * metric_id + event_type_index; |
| auto* tmp_hist = histograms.GetOrNull(2 * metric_id, 0, event_type_index); |
| // Each bucket is increased |index| + |i| for each thread recording observations. |
| for (uint32_t i = 0; i < 4; ++i) { |
| ASSERT_TRUE(tmp_hist != nullptr); |
| EXPECT_EQ(tmp_hist->GetCount(i) + observe_args.histograms[index].GetRemoteCount( |
| options.reverse_map_fn(i, options)), |
| target_count * (i + index)); |
| } |
| |
| auto* tmp_counter = counters.GetOrNull(2 * metric_id + 1, 0, event_type_index); |
| ASSERT_TRUE(tmp_counter != nullptr); |
| // Each counter is increased by |index| for each thread recording observations. |
| EXPECT_EQ(tmp_counter->Load() + observe_args.counters[index].GetRemoteCount(), |
| target_count * index); |
| } |
| } |
| END_TEST; |
| } |
| |
| BEGIN_TEST_CASE(CollectorTest) |
| RUN_TEST(AddCounterTest) |
| RUN_TEST(AddCounterMultipleTest) |
| RUN_TEST(AddHistogramTest) |
| RUN_TEST(AddHistogramMultipleTest) |
| RUN_TEST(FlushTest) |
| RUN_TEST(FlushFailTest) |
| RUN_TEST_LARGE(FlushMultithreadTest<false>) |
| RUN_TEST_LARGE(FlushMultithreadTest<true>) |
| END_TEST_CASE(CollectorTest) |
| |
| } // namespace |
| } // namespace internal |
| } // namespace cobalt_client |