blob: 7eb88bf0be34aed75d7c2bd6f539a8a1219c3731 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <atomic>
#include <stdint.h>
#include <string.h>
#include <threads.h>
#include <unistd.h>
#include <cobalt-client/cpp/counter-internal.h>
#include <cobalt-client/cpp/counter.h>
#include <fbl/auto_call.h>
#include <fbl/auto_lock.h>
#include <fbl/mutex.h>
#include <fbl/string.h>
#include <fbl/string_printf.h>
#include <fuchsia/cobalt/c/fidl.h>
#include <lib/fidl/cpp/vector_view.h>
#include <lib/sync/completion.h>
#include <lib/zx/time.h>
#include <unittest/unittest.h>
#include "fake_logger.h"
namespace cobalt_client {
namespace {
// Metric id assigned to the options built by MakeMetricOptions().
constexpr uint64_t kMetricId{1};
// How many worker threads each multi-threaded test spawns.
constexpr uint64_t kThreads{20};
// Component name assigned to the options built by MakeMetricOptions().
constexpr char kComponent[] = "SomeRamdomCounterComponent";
// Event code assigned to the options built by MakeMetricOptions().
constexpr uint32_t kEventCode{2};
} // namespace
namespace internal {
namespace {
// Builds the MetricOptions shared by the tests in this file: remote mode, tagged with
// |kMetricId|, |kComponent| and |kEventCode|.
MetricOptions MakeMetricOptions() {
    MetricOptions remote_options;
    remote_options.SetMode(MetricOptions::Mode::kRemote);
    remote_options.event_code = kEventCode;
    remote_options.component = kComponent;
    remote_options.metric_id = kMetricId;
    return remote_options;
}
// Derives the RemoteMetricInfo that corresponds to the options from MakeMetricOptions().
RemoteMetricInfo MakeRemoteMetricInfo() {
    const MetricOptions test_options = MakeMetricOptions();
    return RemoteMetricInfo::From(test_options);
}
// Creates a RemoteCounter described by the metric info from MakeRemoteMetricInfo().
RemoteCounter MakeRemoteCounter() {
    const RemoteMetricInfo test_metric_info = MakeRemoteMetricInfo();
    return RemoteCounter(test_metric_info);
}
// Verify the default behaviour of Increment(): a fresh counter reads zero and every
// argument-less call raises the stored count by exactly one.
bool TestIncrement() {
    BEGIN_TEST;
    BaseCounter<uint64_t> counter;
    ASSERT_EQ(counter.Load(), 0);
    for (uint64_t expected = 1; expected <= 2; ++expected) {
        counter.Increment();
        ASSERT_EQ(counter.Load(), expected);
    }
    END_TEST;
}
// Verify that Increment(val) raises the stored count by |val| in a single call.
bool TestIncrementByVal() {
    BEGIN_TEST;
    constexpr uint64_t kDelta = 23;
    BaseCounter<uint64_t> counter;
    ASSERT_EQ(counter.Load(), 0);
    counter.Increment(kDelta);
    ASSERT_EQ(counter.Load(), kDelta);
    END_TEST;
}
// Verify the default behaviour of Exchange(): it returns the value currently stored and
// leaves the counter at zero.
bool TestExchange() {
    BEGIN_TEST;
    constexpr uint64_t kInitial = 24;
    BaseCounter<uint64_t> counter;
    counter.Increment(kInitial);
    ASSERT_EQ(counter.Load(), kInitial);
    EXPECT_EQ(counter.Exchange(), kInitial);
    ASSERT_EQ(counter.Load(), 0);
    END_TEST;
}
// Verify that Exchange(val) returns the value currently stored and leaves |val| in the
// counter instead of zero.
bool TestExchangeByVal() {
    BEGIN_TEST;
    BaseCounter<uint64_t> counter;
    counter.Increment(4);
    ASSERT_EQ(counter.Load(), 4);
    // Step the stored value down 4 -> 3 -> 2; each exchange must hand back the previous one.
    for (uint64_t replacement = 3; replacement >= 2; --replacement) {
        EXPECT_EQ(counter.Exchange(replacement), replacement + 1);
        ASSERT_EQ(counter.Load(), replacement);
    }
    END_TEST;
}
// Arguments handed to each worker thread running IncrementFn. Every member has a default so
// that an entry which has not been filled in yet never carries indeterminate values.
struct IncrementArgs {
    // Counter to be operated on.
    BaseCounter<uint64_t>* counter = nullptr;
    // Wait for main thread to signal before we start.
    sync_completion_t* start = nullptr;
    // Amount to increment the counter with; IncrementFn also uses it as the number of
    // increments to perform.
    BaseCounter<uint64_t>::Type value = 0;
};
// Thread entry point. |args| must point to an IncrementArgs. Waits on |start| (with a 20
// second timeout) and then adds |value| to the counter, |value| times.
int IncrementFn(void* args) {
    auto* params = static_cast<IncrementArgs*>(args);
    sync_completion_wait(params->start, zx::sec(20).get());
    uint64_t round = 0;
    while (round < params->value) {
        params->counter->Increment(params->value);
        ++round;
    }
    return thrd_success;
}
bool TestIncrementMultiThread() {
BEGIN_TEST;
sync_completion_t start;
BaseCounter<uint64_t> counter;
fbl::Vector<thrd_t> thread_ids;
IncrementArgs args[kThreads];
thread_ids.reserve(kThreads);
for (uint64_t i = 0; i < kThreads; ++i) {
thread_ids.push_back({});
}
for (uint64_t i = 0; i < kThreads; ++i) {
auto& thread_id = thread_ids[i];
args[i].counter = &counter;
args[i].value = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
args[i].start = &start;
ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
}
// Notify threads to start incrementing the count.
sync_completion_signal(&start);
// Wait for all threads to finish.
for (const auto& thread_id : thread_ids) {
thrd_join(thread_id, nullptr);
}
// Each thread should increase the counter by a total of value^2, which yields a total of:
// kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2
ASSERT_EQ(counter.Load(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6);
END_TEST;
}
// Arguments handed to each worker thread running ExchangeFn. Every member has a default so
// that an entry which has not been filled in yet never carries indeterminate values.
struct ExchangeArgs {
    // Counter to be operated on.
    BaseCounter<uint64_t>* counter = nullptr;
    // Accumulated value of exchanged values in the counter.
    std::atomic<BaseCounter<uint64_t>::Type>* accumulated = nullptr;
    // Wait for main thread to signal before we start.
    sync_completion_t* start = nullptr;
    // Value this thread exchanges into the counter.
    BaseCounter<uint64_t>::Type value = 0;
};
// After all threads exit, all but one value has been added to the accumulated var,
// this is the last thread to call exchange, which is why test should add the current
// value of the counter to the accumulated atomic obtain a deterministic result.
int ExchangeFn(void* args) {
ExchangeArgs* exchange_args = static_cast<ExchangeArgs*>(args);
sync_completion_wait(exchange_args->start, zx::sec(20).get());
BaseCounter<uint64_t>::Type value = exchange_args->counter->Exchange(exchange_args->value);
exchange_args->accumulated->fetch_add(value, std::memory_order_relaxed);
return thrd_success;
}
// Verify that when exchanging all intermediate values are seen by exactly 1 thread.
// Everythread will exhange the seen value with their value, and add it to an atomic,
// the result should be the same as above except that we need to add counter.Load() +
// accumulated_value.
bool TestExchangeMultiThread() {
BEGIN_TEST;
sync_completion_t start;
BaseCounter<uint64_t> counter;
std::atomic<BaseCounter<uint64_t>::Type> accumulated(0);
fbl::Vector<thrd_t> thread_ids;
ExchangeArgs args[kThreads];
thread_ids.reserve(kThreads);
for (uint64_t i = 0; i < kThreads; ++i) {
thread_ids.push_back({});
}
for (uint64_t i = 0; i < kThreads; ++i) {
auto& thread_id = thread_ids[i];
args[i].counter = &counter;
args[i].value = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
args[i].start = &start;
args[i].accumulated = &accumulated;
ASSERT_EQ(thrd_create(&thread_id, ExchangeFn, &args[i]), thrd_success);
}
// Notify threads to start incrementing the count.
sync_completion_signal(&start);
// Wait for all threads to finish.
for (const auto& thread_id : thread_ids) {
thrd_join(thread_id, nullptr);
}
// Each thread should increase the counter by a total of value, which yields a total of:
// kThreads * (kThreads + 1)/ 2 = Sum(i=1, kThreads) i
ASSERT_EQ(counter.Load() + accumulated.load(std::memory_order_relaxed),
kThreads * (kThreads + 1) / 2);
END_TEST;
}
// Verify flushed values match and the delta is set to 0.
bool TestFlush() {
BEGIN_TEST;
RemoteCounter counter = MakeRemoteCounter();
counter.Increment(20);
RemoteMetricInfo actual_metric_info;
RemoteMetricInfo expected_metric_info = MakeRemoteMetricInfo();
FakeLogger logger;
logger.set_should_fail(false);
int64_t actual_count;
// Check that all data is present, we abuse some implementation details which guarantee
// that metadata is first in the flushed values, and the last element is the event_data we
// are measuring, which adds some restrictions to the internal implementation, but makes the
// test cleaner and readable.
ASSERT_TRUE(counter.Flush(&logger));
actual_metric_info = logger.logged_counts()[0].metric_info;
actual_count = logger.logged_counts()[0].count;
// We capture the values and then verify outside to avoid having to pass flag around,
// and have more descriptive messages on errors.
ASSERT_TRUE(actual_metric_info == expected_metric_info);
ASSERT_EQ(actual_count, 20);
ASSERT_EQ(counter.Load(), 0);
END_TEST;
}
// Verify that UndoFlush() restores the value the counter held before the preceding
// successful Flush() reset it to zero.
bool TestUndoFlush() {
    BEGIN_TEST;
    constexpr int64_t kPending = 20;
    RemoteCounter counter = MakeRemoteCounter();
    FakeLogger logger;
    logger.set_should_fail(false);
    counter.Increment(kPending);

    ASSERT_TRUE(counter.Flush(&logger));
    ASSERT_EQ(counter.Load(), 0);

    counter.UndoFlush();
    ASSERT_EQ(counter.Load(), kPending);
    END_TEST;
}
// Arguments handed to each worker thread running FlushFn. Every member has a default, so an
// entry that has not been filled in yet never carries indeterminate values.
struct FlushArgs {
    // Counter to be incremented or flushed by a given thread.
    RemoteCounter* counter = nullptr;
    // Used to make the threads wait until all have been initialized.
    sync_completion_t* start = nullptr;
    // Flushed accumulated value.
    std::atomic<RemoteCounter::Type>* accumulated = nullptr;
    // Number of times to perform the operation.
    size_t operation_count = 0;
    // Flush operations are thread-compatible and we use a mutex
    // to provide a barrier. Interaction between other operations
    // and flushes should remain thread-safe and eventually consistent.
    fbl::Mutex* flush_mutex = nullptr;
    // Whether the thread should flush or increment.
    bool flush = false;
};
// Thread entry point. |args| must point to a FlushArgs. Waits on |start| (with a 20 second
// timeout) and then performs |operation_count| operations: flushes into a thread-local
// FakeLogger while holding |flush_mutex| when |flush| is set, plain increments otherwise.
// A flushing thread finally adds every count its logger recorded to |accumulated|.
int FlushFn(void* args) {
    FlushArgs* flush_args = static_cast<FlushArgs*>(args);
    sync_completion_wait(flush_args->start, zx::sec(20).get());
    FakeLogger logger;
    for (size_t i = 0; i < flush_args->operation_count; ++i) {
        if (flush_args->flush) {
            // Flushes are serialized against each other; increments are not.
            fbl::AutoLock lock(flush_args->flush_mutex);
            flush_args->counter->Flush(&logger);
        } else {
            flush_args->counter->Increment();
        }
    }
    if (flush_args->flush) {
        // Iterate by const reference: only |count| is read, so copying each entry is waste.
        for (const auto& count_entry : logger.logged_counts()) {
            flush_args->accumulated->fetch_add(count_entry.count, std::memory_order_relaxed);
        }
    }
    return thrd_success;
}
// Verify the consistency calling flush from multiple threads. There will be kThreads incrementing
// the counter, kThreads flushing, and at the end we flush again, and the accumulated counter should
// be equal to the total |kThreads| (|kThreads| + 1) / 2.
bool TestFlushMultithread() {
BEGIN_TEST;
sync_completion_t start;
RemoteCounter counter = MakeRemoteCounter();
std::atomic<BaseCounter<int64_t>::Type> accumulated(0);
fbl::Vector<thrd_t> thread_ids;
fbl::Mutex flush_mutex;
FlushArgs args[kThreads];
thread_ids.reserve(kThreads);
for (uint64_t i = 0; i < kThreads; ++i) {
thread_ids.push_back({});
}
for (uint64_t i = 0; i < kThreads; ++i) {
auto& thread_id = thread_ids[i];
args[i].counter = &counter;
args[i].operation_count = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
args[i].start = &start;
args[i].accumulated = &accumulated;
args[i].flush = i % 2;
args[i].flush_mutex = &flush_mutex;
ASSERT_EQ(thrd_create(&thread_id, FlushFn, &args[i]), thrd_success);
}
// Notify threads to start incrementing the count.
sync_completion_signal(&start);
// Wait for all threads to finish.
for (const auto& thread_id : thread_ids) {
thrd_join(thread_id, nullptr);
}
// The total number of increment is the sum of odd numbers from 1 to 20 so
// |ceil(kThreads/2)|^2.
constexpr size_t ceil_threads = (kThreads / 2) + kThreads % 2;
// Since the last thread to finish might not have flushed, we verify that the total of whats
// left, plust what we have accumulated equals the expected amount.
ASSERT_EQ(counter.Load() + accumulated.load(std::memory_order_relaxed),
ceil_threads * ceil_threads);
END_TEST;
}
// Test case grouping the BaseCounter<uint64_t> tests defined above in this namespace.
BEGIN_TEST_CASE(BaseCounterTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementByVal)
RUN_TEST(TestExchange)
RUN_TEST(TestExchangeByVal)
RUN_TEST(TestIncrementMultiThread)
RUN_TEST(TestExchangeMultiThread)
END_TEST_CASE(BaseCounterTest)
// Test case grouping the RemoteCounter flush tests defined above in this namespace.
BEGIN_TEST_CASE(RemoteCounterTest)
RUN_TEST(TestFlush)
RUN_TEST(TestUndoFlush)
RUN_TEST(TestFlushMultithread)
END_TEST_CASE(RemoteCounterTest)
} // namespace
} // namespace internal
namespace {
bool TestIncrement() {
BEGIN_TEST;
Counter counter(internal::MakeMetricOptions());
ASSERT_EQ(counter.GetRemoteCount(), 0);
counter.Increment();
ASSERT_EQ(counter.GetRemoteCount(), 1);
counter.Increment(24);
ASSERT_EQ(counter.GetRemoteCount(), 25);
END_TEST;
}
// Arguments handed to each worker thread running IncrementFn. Every member has a default, so
// an entry that has not been filled in yet never carries indeterminate values.
struct IncrementArgs {
    // Counter to be incremented.
    Counter* counter = nullptr;
    // Number of times to call increment; IncrementFn also passes it as the increment amount.
    size_t times = 0;
    // Signals threads to start incrementing.
    sync_completion_t* start = nullptr;
};
// Thread entry point. |args| must point to an IncrementArgs. Waits on |start| (with a 20
// second timeout) and then adds |times| to the counter, |times| times.
int IncrementFn(void* args) {
    auto* params = static_cast<IncrementArgs*>(args);
    sync_completion_wait(params->start, zx::sec(20).get());
    size_t round = 0;
    while (round < params->times) {
        params->counter->Increment(params->times);
        ++round;
    }
    return thrd_success;
}
bool TestIncrementMultiThread() {
BEGIN_TEST;
sync_completion_t start;
Counter counter(internal::MakeMetricOptions());
fbl::Vector<thrd_t> thread_ids;
IncrementArgs args[kThreads];
thread_ids.reserve(kThreads);
for (uint64_t i = 0; i < kThreads; ++i) {
thread_ids.push_back({});
}
for (uint64_t i = 0; i < kThreads; ++i) {
auto& thread_id = thread_ids[i];
args[i].counter = &counter;
args[i].times = static_cast<Counter::Count>(i + 1);
args[i].start = &start;
ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
}
// Notify threads to start incrementing the count.
sync_completion_signal(&start);
// Wait for all threads to finish.
for (const auto& thread_id : thread_ids) {
thrd_join(thread_id, nullptr);
}
// Each thread should increase the counter by a total of value^2, which yields a total of:
// kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2
ASSERT_EQ(counter.GetRemoteCount(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6);
END_TEST;
}
// Test case grouping the public Counter tests defined above in this namespace.
BEGIN_TEST_CASE(CounterTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementMultiThread)
END_TEST_CASE(CounterTest)
} // namespace
} // namespace cobalt_client