blob: e284ea654b67ed4ce9ea90967e11ef580d1d9223 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "client/collection/observations_collector.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "third_party/googletest/googletest/include/gtest/gtest.h"
// Workload parameters shared by the multi-threaded tests in this file.
// Inner-loop iteration count of each worker function.
constexpr int64_t kPeriodSize{1000};
// Outer-loop iteration count of each worker function.
constexpr int64_t kPeriodCount{1000};
// Number of worker threads started by the Counter and IntegerSampler tests.
constexpr int64_t kThreadNum{100};
// Inclusive upper bound of the uniform integer distributions sampled below.
constexpr int64_t kSamplerMaxInt{20};
namespace cobalt {
namespace client {
namespace {
// Worker body for the Counter test threads: increments a counter
// kPeriodSize * kPeriodCount times in kPeriodCount increments with some random
// jitter in between. The visible loop nest is bounded by kPeriodCount (outer)
// and kPeriodSize (inner).
//
// counter: shared handle to the Counter under test. It is taken by value, so
//          every thread started with this function owns its own shared_ptr
//          copy.
//
// NOTE(review): the statement that actually updates `counter` and the closing
// braces of the loops and of the function are not visible in this excerpt.
void DoIncrement(std::shared_ptr<Counter> counter) {
for (int64_t i = 0; i < kPeriodCount; i++) {
for (int64_t j = 0; j < kPeriodSize; j++) {
// Introduce jitter to test: sleep for a pseudo-random 0-99 microseconds.
// NOTE(review): std::rand() is called from many threads at once here; the
// standard leaves its thread-safety implementation-defined. Consider a
// per-thread engine from <random> instead.
std::this_thread::sleep_for(std::chrono::microseconds(std::rand() % 100));
// Sink gathers the observations handed to its SendObservations callback. In
// the tests in this file that callback is bound with std::bind and passed to
// an ObservationsCollector. When built with with_errors == true it randomly
// pretends that some observations could not be sent.
struct Sink {
// with_errors: when true, SendObservations treats roughly one observation in
// five (std::rand() % 5 == 0) as a failed send.
explicit Sink(bool with_errors) : with_errors(with_errors) {}
// Callback that walks `obs` through move iterators and returns `errors`, a
// vector of size_t.
// NOTE(review): only the std::distance(...) argument of the failure branch is
// visible in this excerpt; presumably the position of each failed observation
// is appended to `errors` and every other observation is moved into
// `observations` -- confirm against the complete file.
std::vector<size_t> SendObservations(std::vector<Observation>* obs) {
std::vector<size_t> errors;
for (auto iter = std::make_move_iterator(obs->begin());
std::make_move_iterator(obs->end()) != iter; iter++) {
// Randomly fail to "send" some observations.
if (with_errors && std::rand() % 5 == 0) {
// Offset of the current element from the start of `obs`.
std::distance(std::make_move_iterator(obs->begin()), iter));
return errors;
// Observations accumulated by this sink; the tests read this vector directly.
std::vector<Observation> observations;
// Whether SendObservations should simulate random send failures.
bool with_errors;
} // namespace
// Checks that Counters work correctly with many threads updating them: the
// integer values of all observations that reach the sink must add up to
// kPeriodSize * kPeriodCount * kThreadNum.
TEST(Counter, Normal) {
// Metric id.
const int64_t kMetricId = 10;
// The sink is created with with_errors == true, so it simulates random send
// failures while the test still expects the exact total below.
Sink sink(true);
// NOTE(review): the meaning of the trailing `1` is defined by the
// ObservationsCollector constructor, which is not visible in this file.
ObservationsCollector collector(
std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
auto counter = collector.MakeCounter(kMetricId, "part_name");
// Each thread will add kPeriodSize * kPeriodCount to the counter.
int64_t expected = kPeriodSize * kPeriodCount * kThreadNum;
std::vector<std::thread> threads;
// Start all the incrementer threads.
for (int i = 0; i < kThreadNum; i++) {
threads.push_back(std::thread(DoIncrement, counter));
// Start the collection thread.
// NOTE(review): the call that starts collection is not visible in this
// excerpt.
// Wait until all the incrementer threads have finished.
// NOTE(review): the loop body (presumably a join on each thread) is not
// visible in this excerpt.
for (auto iter = threads.begin(); iter != threads.end(); iter++) {
// Stop the collection thread.
// NOTE(review): the call that stops collection is not visible in this
// excerpt.
// Add up all the observations in the sink.
int64_t actual = 0;
for (auto iter = sink.observations.begin(); sink.observations.end() != iter;
iter++) {
// Only the first part of every observation is read.
actual += (*iter).parts[0].value.GetIntValue();
EXPECT_EQ(expected, actual);
// Worker body for the IntegerSampler test threads: logs
// kPeriodSize * kPeriodCount random integers to an IntegerSampler in
// kPeriodCount increments with some random jitter in between.
//
// int_sampler: shared handle to the IntegerSampler under test, taken by value.
//
// NOTE(review): the statement that draws from `dis` and logs the value, and
// the closing braces, are not visible in this excerpt.
void DoLogObservation(std::shared_ptr<IntegerSampler> int_sampler) {
// Each call seeds its own engine from std::random_device.
std::random_device rd;
std::default_random_engine gen(rd());
// Uniformly sample from the set [0...kSamplerMaxInt].
std::uniform_int_distribution<> dis(0, kSamplerMaxInt);
for (int64_t i = 0; i < kPeriodCount; i++) {
for (int64_t j = 0; j < kPeriodSize; j++) {
// Introduce jitter to test: sleep for a pseudo-random 0-99 microseconds.
// NOTE(review): the jitter uses std::rand(), whose thread-safety is
// implementation-defined, although the thread-local `gen` is available.
std::this_thread::sleep_for(std::chrono::microseconds(std::rand() % 100));
// Test that the average of the samples is within an expected range of the
// theoretical average.
TEST(IntegerSampler, IntegerAverage) {
// Metric id.
const int64_t kMetricId = 10;
// No simulated send failures in this test.
Sink sink(false);
// NOTE(review): the meaning of the trailing `1` is defined by the
// ObservationsCollector constructor, which is not visible in this file.
ObservationsCollector collector(
std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
// The IntegerSampler will collect at most 100 elements at a time.
size_t sample_size = 100;
auto int_sampler =
collector.MakeIntegerSampler(kMetricId, "part_name", sample_size);
// Each thread will log kPeriodSize * kPeriodCount times to the
// IntegerSampler.
std::vector<std::thread> threads;
for (int i = 0; i < kThreadNum; i++) {
threads.push_back(std::thread(DoLogObservation, int_sampler));
// Start the collection thread.
// NOTE(review): the call that starts collection is not visible in this
// excerpt.
// Wait until all the logging threads have finished.
// NOTE(review): the loop body (presumably a join on each thread) is not
// visible in this excerpt.
for (auto iter = threads.begin(); iter != threads.end(); iter++) {
// Stop the collection thread.
// In the worst case scenario, the number of collected observations is
// kPeriodSize * kPeriodCount * kThreadNum (10^8).
// In the worst case scenario, where every generated number is 20, the total
// is 2*10^9. This comfortably fits in a 64 bits signed integer.
int64_t total = 0;
int64_t num_obs = 0;
for (auto iter = sink.observations.begin(); sink.observations.end() != iter;
iter++) {
// Only the first part of every observation is read.
// NOTE(review): the increment of num_obs is not visible in this excerpt;
// without it the division below would divide by zero.
total += (*iter).parts[0].value.GetIntValue();
double sample_mean =
static_cast<double>(total) / static_cast<double>(num_obs);
// We sample num_obs elements from the uniform distribution
// [0...kSamplerMaxInt]. The central limit theorem tells us the average of
// these num_obs samples will be normally distributed with the same mean as
// the uniform distribution and a variance equal to 1/num_obs times the
// uniform distribution's variance.
// NOTE(review): kSamplerMaxInt / 2 is an integer division; it is exact only
// because kSamplerMaxInt (20) is even. Dividing by 2.0 would be robust.
double expected_mean = kSamplerMaxInt / 2;
// NOTE(review): the variance of a discrete uniform distribution over the
// integers 0..n is n * (n + 2) / 12, whereas the expression below evaluates
// (n + 1) * n / 12 (35 instead of ~36.67 for n == 20), so the tolerance is
// slightly tighter than the comments suggest -- confirm which is intended.
double expected_stddev =
std::sqrt((kSamplerMaxInt + 1) * (kSamplerMaxInt) / 12.0 / num_obs);
// We test to see if the sample mean is within 4.5 standard deviations of
// the expected mean. This should prevent false positives at least 99.999% of
// the time.
EXPECT_NEAR(expected_mean, sample_mean, expected_stddev * 4.5);
// Test that if a source of data has a very strong time-dependent bias, the
// IntegerSampler does not reflect that time-dependency.
TEST(IntegerSampler, CheckUniformity) {
// Metric id.
const int64_t kMetricId = 10;
// No simulated send failures in this test.
Sink sink(false);
// NOTE(review): the meaning of the trailing `1` is defined by the
// ObservationsCollector constructor, which is not visible in this file.
ObservationsCollector collector(
std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
// The IntegerSampler will collect at most 100 elements at a time.
size_t sample_size = 100;
auto int_sampler =
collector.MakeIntegerSampler(kMetricId, "part_name", sample_size);
// We log integers in increasing order. This is to check that the ordering of
// the logged observations is not related to the distribution of the sampled
// observations.
// The loops visit every value 0..kSamplerMaxInt, sample_size * 10 times each.
// NOTE(review): the logging call inside the inner loop and the step that
// collects the observations into the sink are not visible in this excerpt.
for (int64_t val = 0; val <= kSamplerMaxInt; val++) {
for (size_t i = 0; i < sample_size * 10; i++) {
int64_t total = 0;
int64_t num_obs = 0;
for (auto iter = sink.observations.begin(); sink.observations.end() != iter;
iter++) {
// Only the first part of every observation is read.
// NOTE(review): the increment of num_obs is not visible in this excerpt;
// without it the division below would divide by zero.
total += (*iter).parts[0].value.GetIntValue();
double sample_mean =
static_cast<double>(total) / static_cast<double>(num_obs);
// We sample num_obs elements from the uniform distribution
// [0...kSamplerMaxInt]. The central limit theorem tells us the average of
// these num_obs samples will be normally distributed with the same mean as
// the uniform distribution and a variance equal to 1/num_obs times the
// uniform distribution's variance.
// NOTE(review): kSamplerMaxInt / 2 is an integer division; it is exact only
// because kSamplerMaxInt (20) is even. Dividing by 2.0 would be robust.
double expected_mean = kSamplerMaxInt / 2;
// NOTE(review): the variance of a discrete uniform distribution over the
// integers 0..n is n * (n + 2) / 12, whereas the expression below evaluates
// (n + 1) * n / 12 (35 instead of ~36.67 for n == 20), so the tolerance is
// slightly tighter than the comments suggest -- confirm which is intended.
double expected_stddev =
std::sqrt((kSamplerMaxInt + 1) * (kSamplerMaxInt) / 12.0 / num_obs);
// We test to see if the sample mean is within 4.5 standard deviations of
// the expected mean. This should prevent false positives at least 99.999% of
// the time.
EXPECT_NEAR(expected_mean, sample_mean, expected_stddev * 4.5);
// Worker body for the EventLogger test threads: calls LogEvent with a random
// integer and a fixed status inside a kPeriodCount x kPeriodSize loop nest,
// with some random jitter.
//
// logger: shared handle to the EventLogger under test, taken by value.
// status: status value passed unchanged to every LogEvent call.
//
// NOTE(review): the closing braces are not visible in this excerpt, so it
// cannot be told from here whether the jitter sleep runs once per inner or
// once per outer iteration.
void DoLogEvent(std::shared_ptr<EventLogger> logger, uint32_t status) {
// Each call seeds its own engine from std::random_device.
std::random_device rd;
std::default_random_engine gen(rd());
// Uniformly sample from the set [0...kSamplerMaxInt].
std::uniform_int_distribution<> time(0, kSamplerMaxInt);
for (int64_t i = 0; i < kPeriodCount; i++) {
for (int64_t j = 0; j < kPeriodSize; j++) {
// First argument is a fresh draw from `time`; presumably the event's timing
// value -- confirm against EventLogger::LogEvent.
logger->LogEvent(time(gen), status);
// Introduce jitter to test: sleep for a pseudo-random 0-99 microseconds.
// NOTE(review): the jitter uses std::rand(), whose thread-safety is
// implementation-defined, although the thread-local `gen` is available.
std::this_thread::sleep_for(std::chrono::microseconds(std::rand() % 100));
// Checks that an EventLogger fed by many threads reports, for every status in
// 0..kMaxStatus, a summed distribution count equal to the number of events
// logged with that status.
TEST(EventLogger, Normal) {
const uint32_t kEventMetricId = 1;
const uint32_t kMaxStatus = 4;
const uint32_t kEventTimingMetricId = 2;
// NOTE(review): this local constant shadows the file-level kThreadNum and has
// a different type (size_t instead of int64_t).
const size_t kThreadNum = 100;
// Threads are assigned statuses round-robin below, so an even split requires
// kThreadNum to be a multiple of kMaxStatus + 1.
ASSERT_EQ(kThreadNum % (kMaxStatus + 1), size_t(0))
<< "kThreadNum must be divisible by the number of statuses.";
size_t samples = 100;
// No simulated send failures in this test.
Sink sink(false);
// NOTE(review): the meaning of the trailing `1` is defined by the
// ObservationsCollector constructor, which is not visible in this file.
ObservationsCollector collector(
std::bind(&Sink::SendObservations, &sink, std::placeholders::_1), 1);
auto logger = collector.MakeEventLogger(kEventMetricId, kMaxStatus,
kEventTimingMetricId, samples);
std::vector<std::thread> threads;
for (size_t i = 0; i < kThreadNum; i++) {
// Thread i always logs status i mod (kMaxStatus + 1).
uint32_t status = i % (kMaxStatus + 1);
threads.push_back(std::thread(DoLogEvent, logger, status));
// Start the collection thread.
// NOTE(review): the call that starts collection is not visible in this
// excerpt.
// Wait until all the logging threads have finished.
// NOTE(review): the loop body (presumably a join on each thread) is not
// visible in this excerpt.
for (auto iter = threads.begin(); iter != threads.end(); iter++) {
// Stop the collection thread.
// We sum up the distributions and check the sum is as expected.
// The five explicit zeros match kMaxStatus + 1 == 5 slots.
int64_t histogram[kMaxStatus + 1] = {0, 0, 0, 0, 0};
for (auto iter = sink.observations.begin(); sink.observations.end() != iter;
iter++) {
// Only observations of the event metric are counted; all others are skipped.
if (iter->metric_id != kEventMetricId) continue;
for (auto part_iter = iter->parts.begin(); iter->parts.end() != part_iter;
part_iter++) {
if (part_iter->part_name == "status") {
auto dist = part_iter->value.GetDistribution();
for (auto status_iter = dist.begin(); dist.end() != status_iter;
status_iter++) {
// NOTE(review): the key is used as an array index without a bounds check; a
// key above kMaxStatus would write outside `histogram`.
histogram[status_iter->first] += status_iter->second;
// Each status is logged by kThreadNum / (kMaxStatus + 1) threads, and the
// DoLogEvent loop nest runs kPeriodSize * kPeriodCount times per thread.
int64_t expected_histogram_value =
kThreadNum / (kMaxStatus + 1) * kPeriodSize * kPeriodCount;
for (size_t status = 0; status <= kMaxStatus; status++) {
EXPECT_EQ(histogram[status], expected_histogram_value);
// Check that the integer value part works correctly: a ValuePart made from
// 10 returns 10 from GetIntValue() and reports ValuePart::INT from Which().
TEST(ValuePart, IntValuePart) {
ValuePart value = ValuePart::Make(10);
EXPECT_EQ(10, value.GetIntValue());
EXPECT_EQ(ValuePart::INT, value.Which());
// Check that the distribution value part works correctly: a ValuePart made
// from a std::map<uint32_t, int64_t> reports ValuePart::DISTRIBUTION and
// returns a distribution with the same number of entries, both directly and
// after being copied.
TEST(ValuePart, DistributionValuePart) {
std::map<uint32_t, int64_t> distribution = {{0, 1}, {1, 2}, {2, 4}};
ValuePart value = ValuePart::Make(distribution);
// Only the sizes are compared, not the individual entries.
EXPECT_EQ(distribution.size(), value.GetDistribution().size());
EXPECT_EQ(ValuePart::DISTRIBUTION, value.Which());
// Check that the copy constructor works.
ValuePart copy = value;
EXPECT_EQ(distribution.size(), copy.GetDistribution().size());
EXPECT_EQ(ValuePart::DISTRIBUTION, copy.Which());
} // namespace client
} // namespace cobalt