blob: a85ea4c6bb090ee90bdced72299198989d99b897 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdint.h>
#include <string.h>
#include <threads.h>
#include <unistd.h>
#include <cobalt-client/cpp/histogram-internal.h>
#include <fbl/auto_call.h>
#include <fbl/string.h>
#include <fbl/string_printf.h>
#include <fuchsia/cobalt/c/fidl.h>
#include <lib/sync/completion.h>
#include <lib/zx/time.h>
#include <unittest/unittest.h>
namespace cobalt_client {
namespace internal {
namespace {
// Number of threads to use for multithreading sanity check.
constexpr uint32_t kThreads = 20;
// Scalar used in UpdateHistogramFn to calculate the number of operations
// per bucket.
constexpr uint64_t kUpdateScalar = 100;
constexpr uint64_t kMetricId = 5;
constexpr uint32_t kEncodingId = 1;
constexpr uint32_t kBuckets = 20;
constexpr char kName[] = "SomeName";
bool Contains(const fbl::Vector<uint64_t>& container, uint64_t val) {
for (auto el : container) {
if (el == val) {
return true;
}
}
return false;
}
ObservationValue MakeObservationValue(const char* name, Value value) {
ObservationValue obs;
obs.name.size = strlen(name) + 1;
obs.name.data = const_cast<char*>(name);
obs.value = value;
obs.encoding_id = kEncodingId;
return obs;
}
const fbl::Vector<ObservationValue>& GetMetadata() {
static const char part_name[] = "part";
static const fbl::Vector<ObservationValue> metadata = {
MakeObservationValue(part_name, IntValue(24)),
MakeObservationValue(part_name, DoubleValue(0.125)),
};
return metadata;
}
BaseHistogram MakeHistogram() {
BaseHistogram histogram(kName, GetMetadata(), kBuckets, kMetricId, kEncodingId);
return histogram;
}
struct CheckContentsFlushFn {
void operator()(uint64_t metric_id, const fidl::VectorView<ObservationValue>& observations,
BaseHistogram::FlushCompleteFn complete_fn) const {
const fbl::Vector<ObservationValue>& metadata = GetMetadata();
auto complete_flush = fbl::MakeAutoCall(fbl::move(complete_fn));
size_t expected_size = metadata.size() + 1;
if (observations.count() < expected_size) {
*error = fbl::StringPrintf("observations.count()(%lu) != expected_size(%lu)\n",
observations.count(), expected_size);
return;
}
fbl::Vector<size_t> visited_obs;
for (size_t curr_meta = 0; curr_meta < metadata.size(); ++curr_meta) {
const auto& meta_obs = metadata[curr_meta];
bool found_match = false;
for (size_t current_obs = 0; current_obs < metadata.size(); ++current_obs) {
if (Contains(visited_obs, current_obs)) {
continue;
}
const ObservationValue& value = observations[current_obs];
if (value.encoding_id != encoding_id) {
*error = fbl::StringPrintf(
"observations[%lu].encoding_id(%u) != expected_encoding_id(%u)\n",
current_obs, meta_obs.encoding_id, encoding_id);
return;
}
if (memcmp(meta_obs.name.data, value.name.data, value.name.size) == 0 &&
memcmp(&meta_obs.value, &value.value, sizeof(Value)) == 0) {
found_match = true;
visited_obs.push_back(current_obs);
break;
}
}
if (!found_match) {
*error = fbl::StringPrintf("metadata[%lu] is not in observations.\n", curr_meta);
return;
}
}
size_t histogram_index = metadata.size();
const ObservationValue& hist_obs = observations[histogram_index];
if (hist_obs.encoding_id != encoding_id) {
*error =
fbl::StringPrintf("observations[%lu].encoding_id(%u) != expected_encoding_id(%u)\n",
histogram_index, hist_obs.encoding_id, encoding_id);
return;
}
if (memcmp(histogram_name.c_str(), hist_obs.name.data, hist_obs.name.size) != 0) {
*error = fbl::StringPrintf("observations[%lu].name(%*s) != histogram_name(%s)\n",
histogram_index, static_cast<int>(hist_obs.name.size),
hist_obs.name.data, histogram_name.c_str());
return;
}
// Check bucket values.
if (hist_obs.value.tag != fuchsia_cobalt_ValueTagint_bucket_distribution) {
*error = fbl::StringPrintf(
"observations[%lu].value not IntBucketDistribution. tag(%u) != %u\n",
histogram_index, hist_obs.value.tag,
fuchsia_cobalt_ValueTagint_bucket_distribution);
return;
}
if (hist_obs.value.int_bucket_distribution.count != bucket_values.size()) {
*error =
fbl::StringPrintf("observations[%lu].value.int_bucket_distribution.count(%lu) "
"!= bucket_values.size()(%lu)",
histogram_index, hist_obs.value.int_bucket_distribution.count,
bucket_values.size());
return;
}
BucketDistributionEntry* buckets =
static_cast<BucketDistributionEntry*>(hist_obs.value.int_bucket_distribution.data);
for (size_t bucket_index = 0; bucket_index < bucket_values.size(); ++bucket_index) {
bool index_found = false;
for (size_t bucket = 0; bucket < bucket_values.size(); ++bucket) {
if (buckets[bucket].index == bucket_index) {
index_found = true;
if (buckets[bucket].count != bucket_values[bucket_index]) {
*error = fbl::StringPrintf(
"bucket_value[%lu](%lu) != buckets[%lu].count(%lu), but index match!\n",
bucket_index, bucket_values[bucket], bucket, buckets[bucket].count);
return;
}
break;
}
}
if (!index_found) {
*error = fbl::StringPrintf(
"bucket at index %lu is missing from the observed buckets.\n", bucket_index);
return;
}
}
}
fbl::Vector<uint64_t> bucket_values;
fbl::String histogram_name;
fbl::String* error;
uint32_t encoding_id;
};
// Add an observation and verify the bucket is updated.
bool AddObservationTest() {
BEGIN_TEST;
BaseHistogram histogram = MakeHistogram();
ASSERT_EQ(histogram.GetCount(/*bucket=*/10), 0);
histogram.IncrementCount(/*bucket=*/10);
ASSERT_EQ(histogram.GetCount(10), 1);
END_TEST;
}
// Verifies the order and the data in the elements when flushed, everything in the histogram
// should be flushed. This is true only for single threaded environment, since operation reordering,
// might send stale version of the bucket, but everything will eventually be flushed, so eventually
// all data will handled in multi-threaded environment.
bool FlushTest() {
BEGIN_TEST;
fbl::String error;
CheckContentsFlushFn handler;
handler.histogram_name = kName;
handler.encoding_id = kEncodingId;
handler.error = &error;
BaseHistogram histogram = MakeHistogram();
// bucket[i] = i
for (uint64_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
for (uint64_t op = 0; op < bucket_index; ++op) {
histogram.IncrementCount(/*bucket=*/bucket_index);
}
handler.bucket_values.push_back(bucket_index);
}
ASSERT_TRUE(histogram.Flush(fbl::move(handler)));
ASSERT_TRUE(error.empty(), error.c_str());
END_TEST;
}
bool FlushWhileFlushingTest() {
BEGIN_TEST;
BaseHistogram::FlushCompleteFn complete_cb;
BaseHistogram histogram = MakeHistogram();
ASSERT_TRUE(histogram.Flush([&complete_cb](uint64_t metric_id,
const fidl::VectorView<ObservationValue>& obs,
BaseHistogram::FlushCompleteFn complete_fn) {
complete_cb = fbl::move(complete_fn);
}));
ASSERT_FALSE(histogram.Flush(BaseHistogram::FlushFn()));
complete_cb();
ASSERT_TRUE(histogram.Flush([](uint64_t, const fidl::VectorView<ObservationValue>&,
BaseHistogram::FlushCompleteFn) {}));
ASSERT_FALSE(histogram.Flush(BaseHistogram::FlushFn()));
END_TEST;
}
struct UpdateHistogramFnArgs {
BaseHistogram* histogram;
sync_completion_t* completion;
};
int UpdateHistogramFn(void* void_args) {
UpdateHistogramFnArgs* args = static_cast<UpdateHistogramFnArgs*>(void_args);
// Wait for call.
zx_status_t res = sync_completion_wait(args->completion, zx::sec(20).get());
if (res != ZX_OK) {
return thrd_error;
}
for (uint64_t bucket = 0; bucket < kBuckets; ++bucket) {
for (uint64_t i = 0; i < kUpdateScalar * bucket; ++i) {
args->histogram->IncrementCount(bucket);
}
}
return thrd_success;
}
// Verify that incrementing each bucket |kUpdateScalar|*|bucket_index| times per thread in
// |kThreads| threads, have a consistent view of the data, which allows the histogram to be
// thread-safe through this operations.
bool MultiThreadCountOpsConsistencyTest() {
BEGIN_TEST;
BaseHistogram histogram = MakeHistogram();
fbl::Vector<thrd_t> thread_ids;
sync_completion_t wait_for_start;
UpdateHistogramFnArgs args;
args.histogram = &histogram;
args.completion = &wait_for_start;
// Initialize all threads then signal start, and join all fo them.
thread_ids.reserve(kThreads);
for (uint32_t i = 0; i < kThreads; ++i) {
thread_ids.push_back(0);
}
for (auto& thread_id : thread_ids) {
ASSERT_EQ(thrd_create(&thread_id, UpdateHistogramFn, &args), thrd_success);
}
sync_completion_signal(&wait_for_start);
for (auto& thread_id : thread_ids) {
int res = thrd_success;
ASSERT_EQ(thrd_join(thread_id, &res), thrd_success);
ASSERT_EQ(res, thrd_success);
}
// Verify the result is coherent.
// bucket[i] = i * kUpdateScalar * kThreads
for (uint64_t bucket_index = 0; bucket_index < kBuckets; ++bucket_index) {
ASSERT_EQ(histogram.GetCount(bucket_index), bucket_index * kUpdateScalar * kThreads);
}
END_TEST;
}
struct WaitBeforeCompleteFlushHandler {
void operator()(uint64_t metric_id, const fidl::VectorView<ObservationValue>& observations,
BaseHistogram::FlushCompleteFn complete_fn) {
ZX_ASSERT(thread_ids != nullptr);
ZX_ASSERT(flushing_thread != nullptr);
for (auto thread_id : *thread_ids) {
if (thread_id != thrd_current()) {
int res;
thrd_join(thread_id, &res);
}
}
*flushing_thread = thrd_current();
counter->Increment();
complete_fn();
sync_completion_signal(completion);
}
// List of all threads that were spawned to flush.
fbl::Vector<thrd_t>* thread_ids;
// Notify main thread that all threads finished while this thread was flushing.
sync_completion_t* completion;
// Counter for the number of times Flush was actually called.
BaseCounter* counter;
// Main thread will join this thread.
thrd_t* flushing_thread;
};
struct FlushHistogramFnArgs {
BaseHistogram* histogram;
// FlushHandler
WaitBeforeCompleteFlushHandler flush_handler;
// Notify to start the threads together.
sync_completion_t* start;
};
// This verifies that while a thread is flushing, all other threads fail to flush the contents
// of the histogram thus any action that needs to be taken over the observation buffer is safe,
// as long as.
int FlushHistogramFn(void* void_args) {
FlushHistogramFnArgs* args = static_cast<FlushHistogramFnArgs*>(void_args);
sync_completion_wait(args->start, zx::sec(20).get());
args->histogram->Flush(fbl::move(args->flush_handler));
return thrd_success;
}
bool MultiThreadFlushOpsConsistencyTest() {
BEGIN_TEST;
BaseHistogram histogram = MakeHistogram();
FlushHistogramFnArgs args[kThreads];
fbl::Vector<thrd_t> thread_ids;
sync_completion_t wait_for_start, wait_for_completion;
thrd_t flushing_thread;
BaseCounter flushes;
// Initialize all threads then signal start, and join all fo them.
thread_ids.reserve(kThreads);
for (uint32_t i = 0; i < kThreads; ++i) {
thread_ids.push_back(0);
}
for (uint64_t thread = 0; thread < kThreads; ++thread) {
auto& thread_id = thread_ids[thread];
auto handler = WaitBeforeCompleteFlushHandler();
handler.thread_ids = &thread_ids;
handler.completion = &wait_for_completion;
handler.counter = &flushes;
handler.flushing_thread = &flushing_thread;
args[thread].flush_handler = fbl::move(handler);
args[thread].histogram = &histogram;
args[thread].start = &wait_for_start;
ASSERT_EQ(thrd_create(&thread_id, FlushHistogramFn, &args), thrd_success);
}
sync_completion_signal(&wait_for_start);
sync_completion_wait(&wait_for_completion, zx::sec(20).get());
// Wait for the flushing thread to finish.
thrd_join(flushing_thread, nullptr);
// Verify the number of times it was flushed, must be one.
ASSERT_EQ(flushes.Load(), 1);
// Verify that if we flush again it succeeds.
ASSERT_TRUE(histogram.Flush([](uint64_t, const fidl::VectorView<ObservationValue>&,
BaseHistogram::FlushCompleteFn) {}));
ASSERT_FALSE(histogram.Flush([](uint64_t, const fidl::VectorView<ObservationValue>&,
BaseHistogram::FlushCompleteFn) {}));
END_TEST;
}
BEGIN_TEST_CASE(BaseHistogramTest)
RUN_TEST(AddObservationTest)
RUN_TEST(FlushTest)
RUN_TEST(FlushWhileFlushingTest)
RUN_TEST(MultiThreadCountOpsConsistencyTest)
RUN_TEST(MultiThreadFlushOpsConsistencyTest)
END_TEST_CASE(BaseHistogramTest)
} // namespace
} // namespace internal
} // namespace cobalt_client