| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <stdint.h> |
| #include <string.h> |
| #include <threads.h> |
| #include <unistd.h> |
| |
| #include <cobalt-client/cpp/counter-internal.h> |
| #include <cobalt-client/cpp/counter.h> |
| #include <fbl/auto_call.h> |
| #include <fbl/string.h> |
| #include <fbl/string_printf.h> |
| #include <fuchsia/cobalt/c/fidl.h> |
| #include <lib/sync/completion.h> |
| #include <lib/zx/time.h> |
| #include <unittest/unittest.h> |
| |
| namespace cobalt_client { |
| namespace { |
| |
| // Encoder Id used for setting up metrics parts in this test. |
| constexpr uint32_t kEncodingId = 20; |
| |
| constexpr uint64_t kMetricId = 1; |
| |
| // Name used for metric returned by metadata parts. |
| constexpr char kPartName[] = "SomeName"; |
| |
| // Name used for metric returned by ObservationBuffer->GetMutableMetric() |
| constexpr char kMetricName[] = "SomeMetricName"; |
| |
| // Number of threads spawned for multi-threaded tests. |
| constexpr uint64_t kThreads = 20; |
| } // namespace |
| |
| namespace internal { |
| namespace { |
| |
| ObservationValue MakeObservation(const char* name, Value value) { |
| ObservationValue obs; |
| obs.name.size = strlen(name) + 1; |
| obs.name.data = const_cast<char*>(name); |
| obs.value = value; |
| obs.encoding_id = kEncodingId; |
| return obs; |
| } |
| |
| fbl::Vector<ObservationValue>& GetMetadata() { |
| static fbl::Vector<ObservationValue> metadata = {MakeObservation(kPartName, IntValue(2)), |
| MakeObservation(kPartName, IntValue(3))}; |
| return metadata; |
| } |
| |
| RemoteCounter MakeRemoteCounter() { |
| return RemoteCounter(kMetricName, kMetricId, kEncodingId, GetMetadata()); |
| } |
| |
| // Verify that increments increases the underlying count by 1. |
| // This is veryfying the default behaviour. |
| bool TestIncrement() { |
| BEGIN_TEST; |
| BaseCounter counter; |
| |
| ASSERT_EQ(counter.Load(), 0); |
| counter.Increment(); |
| ASSERT_EQ(counter.Load(), 1); |
| counter.Increment(); |
| ASSERT_EQ(counter.Load(), 2); |
| END_TEST; |
| } |
| |
| // Verify that increments increases the underlying count by val. |
| bool TestIncrementByVal() { |
| BEGIN_TEST; |
| BaseCounter counter; |
| |
| ASSERT_EQ(counter.Load(), 0); |
| counter.Increment(23); |
| ASSERT_EQ(counter.Load(), 23); |
| END_TEST; |
| } |
| |
| // Verify that exchangest the underlying count to 0, and returns the current value. |
| // This is veryfying the default behaviour. |
| bool TestExchange() { |
| BEGIN_TEST; |
| BaseCounter counter; |
| |
| counter.Increment(24); |
| ASSERT_EQ(counter.Load(), 24); |
| EXPECT_EQ(counter.Exchange(), 24); |
| ASSERT_EQ(counter.Load(), 0); |
| END_TEST; |
| } |
| |
| // Verify that exchangest the underlying count to 0, and returns the current value. |
| // This is veryfying the default behaviour. |
| bool TestExchangeByVal() { |
| BEGIN_TEST; |
| BaseCounter counter; |
| |
| counter.Increment(4); |
| ASSERT_EQ(counter.Load(), 4); |
| EXPECT_EQ(counter.Exchange(3), 4); |
| ASSERT_EQ(counter.Load(), 3); |
| EXPECT_EQ(counter.Exchange(2), 3); |
| ASSERT_EQ(counter.Load(), 2); |
| END_TEST; |
| } |
| |
| struct IncrementArgs { |
| // Counter to be operated on. |
| BaseCounter* counter; |
| |
| // Wait for main thread to signal before we start. |
| sync_completion_t* start; |
| |
| // Amount to increment the counter with. |
| BaseCounter::Type value; |
| }; |
| |
| int IncrementFn(void* args) { |
| IncrementArgs* increment_args = static_cast<IncrementArgs*>(args); |
| sync_completion_wait(increment_args->start, zx::sec(20).get()); |
| for (uint64_t i = 0; i < increment_args->value; ++i) { |
| increment_args->counter->Increment(increment_args->value); |
| } |
| return thrd_success; |
| } |
| |
| bool TestIncrementMultiThread() { |
| BEGIN_TEST; |
| sync_completion_t start; |
| BaseCounter counter; |
| fbl::Vector<thrd_t> thread_ids; |
| IncrementArgs args[kThreads]; |
| |
| thread_ids.reserve(kThreads); |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| thread_ids.push_back({}); |
| } |
| |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| auto& thread_id = thread_ids[i]; |
| args[i].counter = &counter; |
| args[i].value = static_cast<BaseCounter::Type>(i + 1); |
| args[i].start = &start; |
| ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success); |
| } |
| |
| // Notify threads to start incrementing the count. |
| sync_completion_signal(&start); |
| |
| // Wait for all threads to finish. |
| for (const auto& thread_id : thread_ids) { |
| thrd_join(thread_id, nullptr); |
| } |
| |
| // Each thread should increase the counter by a total of value^2, which yields a total of: |
| // kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2 |
| ASSERT_EQ(counter.Load(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6); |
| END_TEST; |
| } |
| |
| struct ExchangeArgs { |
| // Counter to be operated on. |
| BaseCounter* counter; |
| |
| // Accumulated value of exchanged values in the counter. |
| fbl::atomic<BaseCounter::Type>* accumulated; |
| |
| // Wait for main thread to signal before we start. |
| sync_completion_t* start; |
| |
| // Amount to increment the counter with. |
| BaseCounter::Type value; |
| }; |
| |
| // After all threads exit, all but one value has been added to the accumulated var, |
| // this is the last thread to call exchange, which is why test should add the current |
| // value of the counter to the accumulated atomic obtain a deterministic result. |
| int ExchangeFn(void* args) { |
| ExchangeArgs* exchange_args = static_cast<ExchangeArgs*>(args); |
| sync_completion_wait(exchange_args->start, zx::sec(20).get()); |
| BaseCounter::Type value = exchange_args->counter->Exchange(exchange_args->value); |
| exchange_args->accumulated->fetch_add(value, fbl::memory_order_relaxed); |
| return thrd_success; |
| } |
| |
| // Verify that when exchanging all intermediate values are seen by exactly 1 thread. |
| // Everythread will exhange the seen value with their value, and add it to an atomic, |
| // the result should be the same as above except that we need to add counter.Load() + |
| // accumulated_value. |
| bool TestExchangeMultiThread() { |
| BEGIN_TEST; |
| sync_completion_t start; |
| BaseCounter counter; |
| fbl::atomic<BaseCounter::Type> accumulated(0); |
| fbl::Vector<thrd_t> thread_ids; |
| ExchangeArgs args[kThreads]; |
| |
| thread_ids.reserve(kThreads); |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| thread_ids.push_back({}); |
| } |
| |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| auto& thread_id = thread_ids[i]; |
| args[i].counter = &counter; |
| args[i].value = static_cast<BaseCounter::Type>(i + 1); |
| args[i].start = &start; |
| args[i].accumulated = &accumulated; |
| ASSERT_EQ(thrd_create(&thread_id, ExchangeFn, &args[i]), thrd_success); |
| } |
| |
| // Notify threads to start incrementing the count. |
| sync_completion_signal(&start); |
| |
| // Wait for all threads to finish. |
| for (const auto& thread_id : thread_ids) { |
| thrd_join(thread_id, nullptr); |
| } |
| |
| // Each thread should increase the counter by a total of value, which yields a total of: |
| // kThreads * (kThreads + 1)/ 2 = Sum(i=1, kThreads) i |
| ASSERT_EQ(counter.Load() + accumulated.load(fbl::memory_order_relaxed), |
| kThreads * (kThreads + 1) / 2); |
| END_TEST; |
| } |
| |
| bool ObservationValuesEq(ObservationValue actual, ObservationValue expected) { |
| BEGIN_HELPER; |
| EXPECT_EQ(actual.encoding_id, expected.encoding_id); |
| EXPECT_EQ(actual.name.size, expected.name.size); |
| EXPECT_STR_EQ(actual.name.data, expected.name.data); |
| EXPECT_EQ(actual.value.tag, expected.value.tag); |
| EXPECT_EQ(actual.value.int_value, expected.value.int_value); |
| END_HELPER; |
| } |
| |
| // Verify that the metadata used to create the counter is part of the flushes observation |
| // and that the current value of the counter is correct, plus resets to 0 after flush. |
| bool TestFlush() { |
| BEGIN_TEST; |
| fbl::Vector<ObservationValue>& metadata = GetMetadata(); |
| RemoteCounter counter = MakeRemoteCounter(); |
| RemoteCounter::FlushCompleteFn mark_complete; |
| counter.Increment(20); |
| uint64_t actual_metric_id; |
| fidl::VectorView<ObservationValue> actual_values; |
| |
| // Check that all data is present, we abuse some implementation details which guarantee |
| // that metadata is first in the flushed values, and the last element is the metric we |
| // are measuring, which adds some restrictions to the internal implementation, but makes the |
| // test cleaner and readable. |
| ASSERT_TRUE( |
| counter.Flush([&](uint64_t metric_id, const fidl::VectorView<ObservationValue>& values, |
| RemoteCounter::FlushCompleteFn complete_fn) { |
| actual_metric_id = metric_id; |
| actual_values = values; |
| mark_complete = fbl::move(complete_fn); |
| })); |
| // We capture the values and then verify outside to avoid having to pass flag around, |
| // and have more descriptive messages on errors. |
| ASSERT_EQ(actual_metric_id, kMetricId); |
| ASSERT_EQ(actual_values.count(), metadata.size() + 1); |
| // All metadata is present. |
| for (size_t i = 0; i < metadata.size(); ++i) { |
| ASSERT_TRUE(ObservationValuesEq(actual_values[i], metadata[i])); |
| } |
| |
| const ObservationValue& metric_obs = actual_values[actual_values.count() - 1]; |
| // Check that the last value has the expected int_value and tag. |
| ASSERT_TRUE(ObservationValuesEq(metric_obs, MakeObservation(kMetricName, IntValue(20)))); |
| |
| // We haven't 'completed' the flush, so another call should return false. |
| ASSERT_FALSE(counter.Flush(RemoteCounter::FlushFn())); |
| mark_complete(); |
| ASSERT_EQ(counter.Load(), 0); |
| ASSERT_TRUE(counter.Flush([](uint64_t metric_id, const fidl::VectorView<ObservationValue>& val, |
| RemoteCounter::FlushCompleteFn flush) {})); |
| END_TEST; |
| } |
| |
| struct FlushArgs { |
| // Counter to be incremented or flushed by a given thread. |
| RemoteCounter* counter; |
| |
| // Used to make the threads wait until all have been initialized. |
| sync_completion_t* start; |
| |
| // Flushed accumulated value. |
| fbl::atomic<RemoteCounter::Type>* accumulated; |
| |
| // Number of times to perform the operation. |
| size_t operation_count = 0; |
| |
| // Whether the thread should flush or increment. |
| bool flush = false; |
| }; |
| |
| int FlushFn(void* args) { |
| FlushArgs* flush_args = static_cast<FlushArgs*>(args); |
| sync_completion_wait(flush_args->start, zx::sec(20).get()); |
| for (size_t i = 0; i < flush_args->operation_count; ++i) { |
| if (flush_args->flush) { |
| flush_args->counter->Flush([&flush_args](uint64_t metric_id, |
| const fidl::VectorView<ObservationValue>& vals, |
| RemoteCounter::FlushCompleteFn complete_fn) { |
| const ObservationValue& val = vals[vals.count() - 1]; |
| flush_args->accumulated->fetch_add(val.value.int_value, fbl::memory_order_relaxed); |
| complete_fn(); |
| }); |
| } else { |
| flush_args->counter->Increment(); |
| } |
| } |
| return thrd_success; |
| } |
| |
| // Verify the consistency calling flush from multiple threads. There will be kThreads incrementing |
| // the counter, kThreads flushing, and at the end we flush again, and the accumulated counter should |
| // be equal to the total |kThreads| (|kThreads| + 1) / 2. |
| bool TestFlushMultithread() { |
| BEGIN_TEST; |
| sync_completion_t start; |
| RemoteCounter counter = MakeRemoteCounter(); |
| fbl::atomic<BaseCounter::Type> accumulated(0); |
| fbl::Vector<thrd_t> thread_ids; |
| FlushArgs args[kThreads]; |
| |
| thread_ids.reserve(kThreads); |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| thread_ids.push_back({}); |
| } |
| |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| auto& thread_id = thread_ids[i]; |
| args[i].counter = &counter; |
| args[i].operation_count = static_cast<BaseCounter::Type>(i + 1); |
| args[i].start = &start; |
| args[i].accumulated = &accumulated; |
| args[i].flush = i % 2; |
| ASSERT_EQ(thrd_create(&thread_id, FlushFn, &args[i]), thrd_success); |
| } |
| |
| // Notify threads to start incrementing the count. |
| sync_completion_signal(&start); |
| |
| // Wait for all threads to finish. |
| for (const auto& thread_id : thread_ids) { |
| thrd_join(thread_id, nullptr); |
| } |
| |
| // The total number of increment is the sum of odd numbers from 1 to 20 so |
| // |ceil(kThreads/2)|^2. |
| constexpr size_t ceil_threads = (kThreads / 2) + kThreads % 2; |
| |
| // Since the last thread to finish might not have flushed, we verify that the total of whats |
| // left, plust what we have accumulated equals the expected amount. |
| ASSERT_EQ(counter.Load() + accumulated.load(fbl::memory_order_relaxed), |
| ceil_threads * ceil_threads); |
| END_TEST; |
| } |
| |
| BEGIN_TEST_CASE(BaseCounterTest) |
| RUN_TEST(TestIncrement) |
| RUN_TEST(TestIncrementByVal) |
| RUN_TEST(TestExchange) |
| RUN_TEST(TestExchangeByVal) |
| RUN_TEST(TestIncrementMultiThread) |
| RUN_TEST(TestExchangeMultiThread) |
| END_TEST_CASE(BaseCounterTest) |
| |
| BEGIN_TEST_CASE(RemoteCounterTest) |
| RUN_TEST(TestFlush) |
| RUN_TEST(TestFlushMultithread) |
| END_TEST_CASE(RemoteCounterTest) |
| |
| } // namespace |
| } // namespace internal |
| |
| namespace { |
| |
| bool TestIncrement() { |
| BEGIN_TEST; |
| internal::RemoteCounter remote_counter = internal::MakeRemoteCounter(); |
| Counter counter(&remote_counter); |
| |
| ASSERT_EQ(counter.GetRemoteCount(), 0); |
| counter.Increment(); |
| ASSERT_EQ(counter.GetRemoteCount(), 1); |
| counter.Increment(24); |
| ASSERT_EQ(counter.GetRemoteCount(), 25); |
| END_TEST; |
| } |
| |
| struct IncrementArgs { |
| // Counter to be incremented. |
| Counter* counter; |
| |
| // Number of times to call increment. |
| size_t times = 0; |
| |
| // Signals threads to start incrementing. |
| sync_completion_t* start; |
| }; |
| |
| int IncrementFn(void* args) { |
| IncrementArgs* increment_args = static_cast<IncrementArgs*>(args); |
| sync_completion_wait(increment_args->start, zx::sec(20).get()); |
| |
| for (size_t i = 0; i < increment_args->times; ++i) { |
| increment_args->counter->Increment(increment_args->times); |
| } |
| return thrd_success; |
| } |
| |
| bool TestIncrementMultiThread() { |
| BEGIN_TEST; |
| sync_completion_t start; |
| internal::RemoteCounter remote_counter = internal::MakeRemoteCounter(); |
| Counter counter(&remote_counter); |
| fbl::Vector<thrd_t> thread_ids; |
| IncrementArgs args[kThreads]; |
| |
| thread_ids.reserve(kThreads); |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| thread_ids.push_back({}); |
| } |
| |
| for (uint64_t i = 0; i < kThreads; ++i) { |
| auto& thread_id = thread_ids[i]; |
| args[i].counter = &counter; |
| args[i].times = static_cast<Counter::Count>(i + 1); |
| args[i].start = &start; |
| ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success); |
| } |
| |
| // Notify threads to start incrementing the count. |
| sync_completion_signal(&start); |
| |
| // Wait for all threads to finish. |
| for (const auto& thread_id : thread_ids) { |
| thrd_join(thread_id, nullptr); |
| } |
| |
| // Each thread should increase the counter by a total of value^2, which yields a total of: |
| // kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2 |
| ASSERT_EQ(counter.GetRemoteCount(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6); |
| END_TEST; |
| } |
| |
| BEGIN_TEST_CASE(CounterTest) |
| RUN_TEST(TestIncrement) |
| RUN_TEST(TestIncrementMultiThread) |
| END_TEST_CASE(CounterTest) |
| |
| } // namespace |
| } // namespace cobalt_client |