// blob: 8662abe0a07f4bacac771ea1f35d0c04dbbfab46 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/lib/metrics_buffer/metrics_buffer.h"
#include <inttypes.h>
#include <lib/async-loop/default.h>
#include <lib/async-loop/loop.h>
//#include <lib/syslog/global.h>
#include <lib/async/cpp/task.h>
#include <zircon/assert.h>
#include <zircon/types.h>
#include <mutex>
#include <src/lib/cobalt/cpp/cobalt_event_builder.h>
#include "log.h"
namespace cobalt {
// static
std::shared_ptr<MetricsBuffer> MetricsBuffer::Create(uint32_t project_id) {
  // Builds a MetricsBuffer that has no service directory yet. Logged counts accumulate in
  // pending_counts_ until SetServiceDirectory() provides a logger to flush them to.
  std::shared_ptr<MetricsBuffer> buffer(new MetricsBuffer(project_id));
  return buffer;
}
// static
std::shared_ptr<MetricsBuffer> MetricsBuffer::Create(
    uint32_t project_id, std::shared_ptr<sys::ServiceDirectory> service_directory) {
  // Builds a MetricsBuffer and immediately attaches it to |service_directory|.
  //
  // |service_directory| is already our own copy (taken by value), so hand it on with std::move
  // rather than paying for another atomic ref-count increment/decrement pair.
  return std::shared_ptr<MetricsBuffer>(
      new MetricsBuffer(project_id, std::move(service_directory)));
}
MetricsBuffer::MetricsBuffer(uint32_t project_id)
    : project_id_(project_id) {
  // Nothing to set up beyond the project id: this overload starts without a loop or logger.
  // The lock is held only so the sanity check below reads loop_ and cobalt_logger_ consistently
  // (presumably these members are guarded by lock_ - confirm against the header).
  std::lock_guard<std::mutex> lock(lock_);
  ZX_DEBUG_ASSERT(!loop_ && !cobalt_logger_);
}
MetricsBuffer::MetricsBuffer(uint32_t project_id,
                             std::shared_ptr<sys::ServiceDirectory> service_directory)
    : project_id_(project_id) {
  // SetServiceDirectory() takes its argument by value, so move our by-value parameter into it
  // instead of copying the shared_ptr a second time.
  SetServiceDirectory(std::move(service_directory));
}
MetricsBuffer::~MetricsBuffer() {
  // Passing nullptr makes SetServiceDirectory() stop and destroy the current loop and logger (if
  // any) without creating replacements, so no posted flush task can run after destruction.
  SetServiceDirectory(nullptr);
}
// Replaces the current loop and CobaltLogger with new ones created from |service_directory|.
//
// Any existing loop/logger pair is detached under lock_ and torn down after lock_ is released.
// If |service_directory| is null, no replacement is created and the buffer is left without a
// logger. If creating the replacement fails, a warning is logged (where possible) and the buffer
// is likewise left without a logger. On success, any counts already pending are scheduled to be
// flushed.
void MetricsBuffer::SetServiceDirectory(std::shared_ptr<sys::ServiceDirectory> service_directory) {
  std::unique_ptr<cobalt::CobaltLogger> logger_to_delete_outside_lock;
  std::unique_ptr<async::Loop> loop_to_stop_outside_lock;
  {  // scope lock
    std::lock_guard<std::mutex> lock(lock_);
    ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
    if (cobalt_logger_) {
      ZX_DEBUG_ASSERT(loop_);
      // Clean these up after we've released lock_, to avoid potential deadlock waiting on a thread
      // that may be trying to get lock_.
      loop_to_stop_outside_lock = std::move(loop_);
      logger_to_delete_outside_lock = std::move(cobalt_logger_);
    }
    ZX_DEBUG_ASSERT(!loop_ && !cobalt_logger_);
    if (service_directory) {
      std::unique_ptr<cobalt::CobaltLogger> new_logger;
      auto loop = std::make_unique<async::Loop>(&kAsyncLoopConfigNoAttachToCurrentThread);
      zx_status_t status = loop->StartThread("MetricsBuffer");
      if (status != ZX_OK) {
        LOG(WARNING, "MetricsBuffer::SetServiceDirectory() thread creation failed.");
        // ~loop
        // ~service_directory
        return;
      }
      zx::event cobalt_logger_creation_done;
      status = zx::event::create(/*options=*/0, &cobalt_logger_creation_done);
      if (status != ZX_OK) {
        LOG(WARNING, "zx::event::create() failed - status: %d", status);
        // ~loop
        // ~service_directory
        return;
      }
      // Must create cobalt::CobaltLogger on same dispatcher that it'll use.
      //
      // The task captures locals by reference, so this function must not let those locals go out
      // of scope while the task can still run.
      async::PostTask(loop->dispatcher(),
                      [this, &loop, &service_directory, &new_logger, &cobalt_logger_creation_done] {
                        new_logger = cobalt::NewCobaltLoggerFromProjectId(
                            loop->dispatcher(), service_directory, project_id_);
                        cobalt_logger_creation_done.signal(0, ZX_EVENT_SIGNALED);
                      });
      zx_signals_t observed = 0;
      status =
          cobalt_logger_creation_done.wait_one(ZX_EVENT_SIGNALED, zx::time::infinite(), &observed);
      if (status != ZX_OK) {
        LOG(WARNING, "cobalt_logger_creation_done.wait_one() failed - status: %d", status);
        // The creation task may not have run yet. Shut the loop down before returning so the task
        // has either completed or been discarded before the locals it references (in particular
        // cobalt_logger_creation_done, which is destroyed before loop) go away.
        loop->Shutdown();
        // ~loop
        // ~new_logger
        return;
      }
      ZX_DEBUG_ASSERT((observed & ZX_EVENT_SIGNALED) != 0);
      loop_ = std::move(loop);
      cobalt_logger_ = std::move(new_logger);
      ZX_DEBUG_ASSERT(!!loop_ && !!cobalt_logger_);
      if (!pending_counts_.empty()) {
        LOG(INFO, "MetricsBuffer::SetServiceDirectory() flushing counts soon.");
        TryPostFlushCountsLocked();
      }
    }
    ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
  }  // ~lock
  ZX_DEBUG_ASSERT(!!loop_to_stop_outside_lock == !!logger_to_delete_outside_lock);
  if (loop_to_stop_outside_lock) {
    // Stop the loop first, to avoid any async tasks queued by the CobaltLogger outlasting the
    // CobaltLogger.
    loop_to_stop_outside_lock->Quit();
    loop_to_stop_outside_lock->JoinThreads();
    loop_to_stop_outside_lock->Shutdown();
    // Delete here for clarity.
    loop_to_stop_outside_lock = nullptr;
    // Now it's safe to delete the CobaltLogger, which we do here manually for clarity.
    logger_to_delete_outside_lock = nullptr;
  }
}
void MetricsBuffer::SetMinLoggingPeriod(zx::duration min_logging_period) {
  // Sets the delay added to last_flushed_ when scheduling the next flush (see
  // TryPostFlushCountsLocked()).
  std::lock_guard<std::mutex> lock(lock_);
  // Debug-only check that last_flushed_ has not moved off zx::time::infinite_past(), i.e. that
  // FlushPendingEventCounts() has not yet updated it (presumably infinite_past() is the initial
  // value - confirm against the header).
  ZX_DEBUG_ASSERT(last_flushed_ == zx::time::infinite_past());
  min_logging_period_ = min_logging_period;
}
// Adds |count| to the pending total for (|metric_id|, |dimension_values|). If this is the first
// pending entry since the last flush, also schedules a flush (when a logger is available).
void MetricsBuffer::LogEventCount(uint32_t metric_id, std::vector<uint32_t> dimension_values,
                                  uint32_t count) {
  std::lock_guard<std::mutex> lock(lock_);
  ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
  const bool was_empty = pending_counts_.empty();
  // Pass the key as an rvalue so that, when a new entry has to be inserted, the map can take the
  // key (and its vector) by move instead of copying it.
  pending_counts_[PendingCountsKey(metric_id, std::move(dimension_values))] += count;
  if (was_empty) {
    // We don't try to process locally, because if we're logging infrequently then the optimization
    // wouldn't matter, and if we're logging frequently then we need to post in order to delay
    // anyway.  So we opt to keep the code simpler and always post even if the deadline is in the
    // past.
    TryPostFlushCountsLocked();
  }
}
void MetricsBuffer::LogEvent(uint32_t metric_id, std::vector<uint32_t> dimension_values) {
  // A single event is recorded as a count of one.
  constexpr uint32_t kSingleEventCount = 1;
  LogEventCount(metric_id, std::move(dimension_values), kSingleEventCount);
}
void MetricsBuffer::FlushPendingEventCounts() {
std::lock_guard<std::mutex> lock(lock_);
ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
if (!cobalt_logger_) {
// In some testing scenarios, we may not have access to a real LoggerFactory, and we can end up
// here if SetServiceDirectory() hit an error while (or shortly after) switching from an old
// loop_ and cobalt_logger_ to a new loop_ and cobalt_logger_.
//
// If later we get a new cobalt_logger_ from a new SetServiceDirectory(), this method will run
// again.
return;
}
last_flushed_ = zx::clock::get_monotonic();
PendingCounts snapped_pending_event_counts;
snapped_pending_event_counts.swap(pending_counts_);
auto iter = snapped_pending_event_counts.begin();
constexpr uint32_t kMaxBatchSize = 64;
std::vector<fuchsia::cobalt::CobaltEvent> batch;
while (iter != snapped_pending_event_counts.end()) {
auto [key, count] = *iter;
iter++;
batch.emplace_back(cobalt::CobaltEventBuilder(key.metric_id())
.with_event_codes(key.dimension_values())
.as_count_event(/*period_duration_micros=*/0, count));
ZX_DEBUG_ASSERT(batch.size() <= kMaxBatchSize);
if (batch.size() == kMaxBatchSize || iter == snapped_pending_event_counts.end()) {
cobalt_logger_->LogCobaltEvents(std::move(batch));
ZX_DEBUG_ASSERT(batch.empty());
}
}
}
void MetricsBuffer::TryPostFlushCountsLocked() {
  // Schedules FlushPendingEventCounts() on the loop's dispatcher, no earlier than
  // min_logging_period_ after the previous flush. Without a logger there is nowhere to flush to,
  // so nothing is posted. The "Locked" suffix suggests callers must hold lock_; the visible
  // callers do.
  ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
  if (!cobalt_logger_) {
    return;
  }
  ZX_DEBUG_ASSERT(loop_);
  const auto flush_deadline = last_flushed_ + min_logging_period_;
  async::PostTaskForTime(
      loop_->dispatcher(), [this] { FlushPendingEventCounts(); }, flush_deadline);
}
// Key identifying one pending counter: a metric id plus its dimension values.
//
// |dimension_values| is taken by value as a sink parameter, so it is moved into the member rather
// than copied a second time.
MetricsBuffer::PendingCountsKey::PendingCountsKey(uint32_t metric_id,
                                                  std::vector<uint32_t> dimension_values)
    : metric_id_(metric_id), dimension_values_(std::move(dimension_values)) {}
// Returns the metric id this key was constructed with.
uint32_t MetricsBuffer::PendingCountsKey::metric_id() const {
  return metric_id_;
}
// Returns the dimension values this key was constructed with. The reference is to the key's own
// member, so it is only valid while the key is alive.
const std::vector<uint32_t>& MetricsBuffer::PendingCountsKey::dimension_values() const {
  return dimension_values_;
}
// Hashes a PendingCountsKey from its metric id and its dimension values, in order.
//
// The running hash is multiplied before each value is added, so the result depends on the
// position of each value. A plain sum would give the same hash to every permutation of the same
// values (and to keys that merely swap the metric id with a dimension value), which needlessly
// piles distinct keys into the same bucket.
size_t MetricsBuffer::PendingCountsKeyHash::operator()(const PendingCountsKey& key) const noexcept {
  // Rely on size_t being unsigned so it'll wrap without being undefined behavior.
  size_t hash = hash_uint32_(key.metric_id());
  for (const uint32_t value : key.dimension_values()) {
    hash = hash * 31 + hash_uint32_(value);
  }
  return hash;
}
// Two keys are equal when they have the same metric id and the same dimension values in the same
// order.
//
// std::vector's operator== already compares sizes first and then elements pairwise, so there is
// no need for a hand-written loop (whose uint32_t index was narrower than the size_t size).
bool MetricsBuffer::PendingCountsKeyEqual::operator()(const PendingCountsKey& lhs,
                                                      const PendingCountsKey& rhs) const noexcept {
  return lhs.metric_id() == rhs.metric_id() && lhs.dimension_values() == rhs.dimension_values();
}
MetricBuffer MetricsBuffer::CreateMetricBuffer(uint32_t metric_id) {
  // The returned MetricBuffer holds a shared_ptr to this MetricsBuffer and forwards its events
  // here with |metric_id| filled in.
  auto self = shared_from_this();
  return MetricBuffer(std::move(self), metric_id);
}
// Binds |metric_id| to |parent| so callers can log events without repeating the metric id.
//
// |parent| is taken by value, so move it into the member to avoid a second atomic ref-count
// increment/decrement pair.
MetricBuffer::MetricBuffer(std::shared_ptr<MetricsBuffer> parent, uint32_t metric_id)
    : parent_(std::move(parent)), metric_id_(metric_id) {}
void MetricBuffer::LogEvent(std::vector<uint32_t> dimension_values) {
parent_->LogEvent(metric_id_, std::move(dimension_values));
}
void MetricBuffer::LogEventCount(std::vector<uint32_t> dimension_values, uint32_t count) {
parent_->LogEventCount(metric_id_, std::move(dimension_values), count);
}
} // namespace cobalt