| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
#include "src/lib/metrics_buffer/metrics_buffer.h"

#include <fidl/fuchsia.io/cpp/hlcpp_conversion.h>
#include <inttypes.h>
#include <lib/async-loop/default.h>
#include <lib/async-loop/loop.h>
#include <lib/async/cpp/task.h>
#include <zircon/assert.h>
#include <zircon/types.h>

#include <algorithm>
#include <memory>
#include <mutex>
#include <utility>

#include "fidl/fuchsia.io/cpp/markers.h"
#include "lib/fidl/cpp/wire/internal/transport_channel.h"
#include "lib/sync/completion.h"
#include "lib/syslog/global.h"
#include "log.h"
#include "src/lib/metrics_buffer/metrics_impl.h"
| |
| namespace cobalt { |
| |
namespace {

template <class T>
constexpr bool sfinae_false_v = false;

// Maps `val` to its histogram bucket index, given ascending bucket floors.
// Bucket 0 is the underflow bucket (val < floors[0]); bucket floors.size() is the
// overflow bucket (val >= floors.back()).
//
// This is essentially the same mapping as cobalt buckets_config BucketIndex, but uses
// binary search (resolving TODO(b/278918086) that suggested switching from a linear scan).
uint32_t BucketIndex(const std::vector<int64_t>& floors, int64_t val) {
  // std::upper_bound returns the first floor strictly greater than val; its index is
  // exactly the bucket number: 0 for underflow, floors.size() for overflow, and i when
  // floors[i - 1] <= val < floors[i] — identical to the previous linear scan.
  auto first_greater = std::upper_bound(floors.begin(), floors.end(), val);
  return static_cast<uint32_t>(first_greater - floors.begin());
}

}  // namespace
| |
| // static |
| std::shared_ptr<MetricsBuffer> MetricsBuffer::Create(uint32_t project_id) { |
| return std::shared_ptr<MetricsBuffer>(new MetricsBuffer(project_id)); |
| } |
| |
| // static |
| std::shared_ptr<MetricsBuffer> MetricsBuffer::Create( |
| uint32_t project_id, std::shared_ptr<sys::ServiceDirectory> service_directory) { |
| return std::shared_ptr<MetricsBuffer>(new MetricsBuffer(project_id, service_directory)); |
| } |
| |
// Constructor used when no service directory is available yet; metrics accumulate in
// memory until SetServiceDirectory() is later called with a non-null directory.
MetricsBuffer::MetricsBuffer(uint32_t project_id) : project_id_(project_id) {
  // Taking lock_ here is presumably to satisfy lock annotations on the guarded members for
  // the assert below (confirm in the header); no contention is possible during construction.
  std::lock_guard<std::mutex> lock(lock_);
  ZX_DEBUG_ASSERT(!loop_ && !cobalt_logger_);
}
| |
| MetricsBuffer::MetricsBuffer(uint32_t project_id, |
| std::shared_ptr<sys::ServiceDirectory> service_directory) |
| : project_id_(project_id) { |
| SetServiceDirectory(service_directory); |
| } |
| |
| MetricsBuffer::~MetricsBuffer() { SetServiceDirectory(nullptr); } |
| |
// Swaps in a new cobalt logger backed by `service_directory`, or tears logging down
// entirely when `service_directory` is null (as the destructor does). Safe to call
// repeatedly; any previously-attached logger/loop is destroyed on its own dispatcher.
void MetricsBuffer::SetServiceDirectory(std::shared_ptr<sys::ServiceDirectory> service_directory) {
  LOG(INFO, "SetServiceDirectory is called");
  // The old logger and loop (if any) are moved out under lock_ but destroyed only after
  // lock_ is released — see the deadlock-avoidance comment below.
  std::unique_ptr<cobalt::MetricsImpl> logger_to_delete_outside_lock;
  std::unique_ptr<async::Loop> loop_to_stop_outside_lock;
  {  // scope lock
    std::lock_guard<std::mutex> lock(lock_);
    // Invariant: loop_ and cobalt_logger_ are either both set or both unset.
    ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
    if (cobalt_logger_) {
      LOG(INFO, "cobalt_logger already exists");
      ZX_DEBUG_ASSERT(loop_);
      // Clean these up after we've released lock_, to avoid potential deadlock waiting on a thread
      // that may be trying to get lock_.
      loop_to_stop_outside_lock = std::move(loop_);
      logger_to_delete_outside_lock = std::move(cobalt_logger_);
    }
    ZX_DEBUG_ASSERT(!loop_ && !cobalt_logger_);
    if (service_directory) {
      LOG(INFO, "Creating new cobalt_logger");
      std::unique_ptr<cobalt::MetricsImpl> new_logger;
      // MetricsImpl takes a natural-bindings client end; convert the HLCPP channel clone.
      fidl::ClientEnd<fuchsia_io::Directory> directory =
          fidl::HLCPPToNatural(service_directory->CloneChannel());
      // Dedicated loop + thread so metric logging never runs on a caller's thread.
      auto loop = std::make_unique<async::Loop>(&kAsyncLoopConfigNoAttachToCurrentThread);
      zx_status_t status = loop->StartThread("MetricsBuffer");
      if (status != ZX_OK) {
        LOG(WARNING, "MetricsBuffer::SetServiceDirectory() thread creation failed.");
        // NOTE(review): on this early return any old logger moved into
        // logger_to_delete_outside_lock above is destroyed on the calling thread rather
        // than via the PostTask-to-its-dispatcher path below — confirm that's intended.
        // ~loop
        // ~service_directory
        return;
      }
      sync_completion_t finished;
      // Must create fuchsia_metrics::MetricEventLogger on same dispatcher that it'll use.
      async::PostTask(loop->dispatcher(), [this, &loop, &directory, &new_logger, &finished] {
        // MetricsImpl will internally use the directory to reconnect as needed, should we ever lose
        // connection.
        new_logger = std::make_unique<cobalt::MetricsImpl>(loop->dispatcher(), std::move(directory),
                                                           project_id_);
        sync_completion_signal(&finished);
      });
      // Block until the logger exists on the loop thread before publishing it to members.
      sync_completion_wait(&finished, ZX_TIME_INFINITE);
      loop_ = std::move(loop);
      cobalt_logger_ = std::move(new_logger);
      ZX_DEBUG_ASSERT(!!loop_ && !!cobalt_logger_);
      if (!pending_counts_.empty()) {
        // Counts accumulated while no logger was attached get flushed via the new logger.
        LOG(INFO, "MetricsBuffer::SetServiceDirectory() flushing counts soon.");
        TryPostFlushCountsLocked();
      }
    }
    ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_);
  }  // ~lock
  ZX_DEBUG_ASSERT(!!loop_to_stop_outside_lock == !!logger_to_delete_outside_lock);
  if (loop_to_stop_outside_lock) {
    // Need to delete the old MetricsImpl with the same dispatcher that created the old MetricsImpl.
    sync_completion_t finished;
    async::PostTask(loop_to_stop_outside_lock->dispatcher(),
                    [&logger_to_delete_outside_lock, &finished] {
                      logger_to_delete_outside_lock = nullptr;
                      sync_completion_signal(&finished);
                    });
    ZX_ASSERT(ZX_OK == sync_completion_wait(&finished, ZX_TIME_INFINITE));
    ZX_DEBUG_ASSERT(!logger_to_delete_outside_lock);
    // Stop and join the old loop's thread before destroying the loop itself.
    loop_to_stop_outside_lock->Quit();
    loop_to_stop_outside_lock->JoinThreads();
    loop_to_stop_outside_lock->Shutdown();
    // Delete here for clarity.
    loop_to_stop_outside_lock = nullptr;
  }
}
| |
| void MetricsBuffer::SetMinLoggingPeriod(zx::duration min_logging_period) { |
| std::lock_guard<std::mutex> lock(lock_); |
| ZX_DEBUG_ASSERT(last_flushed_ == zx::time::infinite_past()); |
| min_logging_period_ = min_logging_period; |
| } |
| |
| void MetricsBuffer::LogEventCount(uint32_t metric_id, std::vector<uint32_t> dimension_values, |
| uint32_t count) { |
| std::lock_guard<std::mutex> lock(lock_); |
| ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_); |
| bool was_empty = pending_counts_.empty(); |
| MetricKey key(metric_id, std::move(dimension_values)); |
| pending_counts_[key] += count; |
| if (was_empty) { |
| // We don't try to process locally, because if we're logging infrequently then the optimization |
| // wouldn't matter, and if we're logging frequently then we need to post in order to delay |
| // anyway. So we opt to keep the code simpler and always post even if the deadline is in the |
| // past. |
| TryPostFlushCountsLocked(); |
| } |
| } |
| |
// Convenience wrapper: logging an event is logging a count of 1.
void MetricsBuffer::LogEvent(uint32_t metric_id, std::vector<uint32_t> dimension_values) {
  LogEventCount(metric_id, std::move(dimension_values), 1);
}
| |
| void MetricsBuffer::LogString(uint32_t metric_id, std::vector<uint32_t> dimension_values, |
| std::string string_value) { |
| std::lock_guard<std::mutex> lock(lock_); |
| ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_); |
| bool was_empty = pending_strings_.empty(); |
| MetricKey key(metric_id, std::move(dimension_values)); |
| // By design, this emplace can fail if a matching string is already in the set. |
| pending_strings_[key].emplace(std::move(string_value)); |
| if (was_empty) { |
| TryPostFlushCountsLocked(); |
| } |
| } |
| |
| void MetricsBuffer::LogHistogramValue(const HistogramInfo& histogram_info, |
| std::vector<int64_t>& floors, |
| std::vector<uint32_t> dimension_values, int64_t value) { |
| std::lock_guard<std::mutex> lock(lock_); |
| ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_); |
| bool was_empty = pending_histograms_.empty(); |
| MetricKey key(histogram_info.metric_id, std::move(dimension_values)); |
| uint32_t bucket_index = BucketIndex(floors, value); |
| pending_histograms_[key][bucket_index]++; |
| if (was_empty) { |
| TryPostFlushCountsLocked(); |
| } |
| } |
| |
| void MetricsBuffer::FlushPendingEventCounts() { |
| std::lock_guard<std::mutex> lock(lock_); |
| ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_); |
| if (!cobalt_logger_) { |
| // In some testing scenarios, we may not have access to a real MetricEventLoggerFactory, and we |
| // can end up here if SetServiceDirectory() hit an error while (or shortly after) switching from |
| // an old loop_ and cobalt_logger_ to a new loop_ and cobalt_logger_. |
| // |
| // If later we get a new cobalt_logger_ from a new SetServiceDirectory(), this method will run |
| // again. |
| return; |
| } |
| last_flushed_ = zx::clock::get_monotonic(); |
| |
| constexpr uint32_t kMaxBatchSize = 64; |
| std::vector<fuchsia_metrics::MetricEvent> batch; |
| |
| PendingCounts snapped_pending_event_counts; |
| snapped_pending_event_counts.swap(pending_counts_); |
| for (auto& [key, count] : snapped_pending_event_counts) { |
| batch.emplace_back(key.metric_id(), key.dimension_values(), |
| fuchsia_metrics::MetricEventPayload::WithCount(count)); |
| ZX_DEBUG_ASSERT(batch.size() <= kMaxBatchSize); |
| if (batch.size() == kMaxBatchSize) { |
| cobalt_logger_->LogMetricEvents(std::move(batch)); |
| batch.clear(); |
| } |
| } |
| |
| PendingStrings snapped_pending_strings; |
| snapped_pending_strings.swap(pending_strings_); |
| for (auto& [key, strings] : snapped_pending_strings) { |
| for (auto& string : strings) { |
| batch.emplace_back(key.metric_id(), key.dimension_values(), |
| fuchsia_metrics::MetricEventPayload::WithStringValue(string)); |
| ZX_DEBUG_ASSERT(batch.size() <= kMaxBatchSize); |
| if (batch.size() == kMaxBatchSize) { |
| cobalt_logger_->LogMetricEvents(std::move(batch)); |
| batch.clear(); |
| } |
| } |
| } |
| |
| PendingHistograms snapped_pending_histograms; |
| snapped_pending_histograms.swap(pending_histograms_); |
| for (auto& [histogram_key, pending_buckets] : snapped_pending_histograms) { |
| std::vector<fuchsia_metrics::HistogramBucket> buckets; |
| auto bucket_iter = pending_buckets.begin(); |
| while (bucket_iter != pending_buckets.end()) { |
| auto [bucket_index, tally] = *bucket_iter; |
| bucket_iter++; |
| fuchsia_metrics::HistogramBucket bucket; |
| bucket.index() = bucket_index; |
| bucket.count() = tally; |
| buckets.emplace_back(std::move(bucket)); |
| if (buckets.size() == fuchsia_metrics::kMaxHistogramBuckets || |
| bucket_iter == pending_buckets.end()) { |
| batch.emplace_back(histogram_key.metric_id(), histogram_key.dimension_values(), |
| fuchsia_metrics::MetricEventPayload::WithHistogram(std::move(buckets))); |
| buckets.clear(); |
| ZX_DEBUG_ASSERT(batch.size() <= kMaxBatchSize); |
| if (batch.size() == kMaxBatchSize) { |
| cobalt_logger_->LogMetricEvents(std::move(batch)); |
| batch.clear(); |
| } |
| } |
| } |
| } |
| |
| ZX_DEBUG_ASSERT(batch.size() < kMaxBatchSize); |
| if (!batch.empty()) { |
| cobalt_logger_->LogMetricEvents(std::move(batch)); |
| batch.clear(); |
| } |
| } |
| |
| void MetricsBuffer::TryPostFlushCountsLocked() { |
| ZX_DEBUG_ASSERT(!!loop_ == !!cobalt_logger_); |
| if (cobalt_logger_) { |
| ZX_DEBUG_ASSERT(loop_); |
| async::PostTaskForTime( |
| loop_->dispatcher(), [this] { FlushPendingEventCounts(); }, |
| last_flushed_ + min_logging_period_); |
| } |
| } |
| |
// Key identifying a metric instance: metric id plus its dimension (event code) values.
MetricsBuffer::MetricKey::MetricKey(uint32_t metric_id, std::vector<uint32_t> dimension_values)
    : metric_id_(metric_id), dimension_values_(std::move(dimension_values)) {}
| |
| uint32_t MetricsBuffer::MetricKey::metric_id() const { return metric_id_; } |
| |
// Returns the dimension values, in the order they were supplied at construction.
const std::vector<uint32_t>& MetricsBuffer::MetricKey::dimension_values() const {
  return dimension_values_;
}
| |
| size_t MetricsBuffer::MetricKeyHash::operator()(const MetricKey& key) const noexcept { |
| // Rely on size_t being unsigned so it'll wrap without being undefined behavior. |
| size_t hash = hash_uint32_(key.metric_id()); |
| for (auto value : key.dimension_values()) { |
| hash += hash_uint32_(value); |
| } |
| return hash; |
| } |
| |
| bool MetricsBuffer::MetricKeyEqual::operator()(const MetricKey& lhs, |
| const MetricKey& rhs) const noexcept { |
| if (lhs.metric_id() != rhs.metric_id()) { |
| return false; |
| } |
| if (lhs.dimension_values().size() != rhs.dimension_values().size()) { |
| return false; |
| } |
| size_t size = lhs.dimension_values().size(); |
| for (uint32_t i = 0; i < size; ++i) { |
| if (lhs.dimension_values()[i] != rhs.dimension_values()[i]) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
// Creates a per-metric wrapper bound to this MetricsBuffer (kept alive via shared_ptr).
MetricBuffer MetricsBuffer::CreateMetricBuffer(uint32_t metric_id) {
  return MetricBuffer(shared_from_this(), metric_id);
}
| |
// Creates a per-metric wrapper for string metrics, bound to this MetricsBuffer.
StringMetricBuffer MetricsBuffer::CreateStringMetricBuffer(uint32_t metric_id) {
  return StringMetricBuffer(shared_from_this(), metric_id);
}
| |
// Creates a per-metric wrapper for histogram metrics; the wrapper's constructor precomputes
// the bucket floors from histogram_info.
HistogramMetricBuffer MetricsBuffer::CreateHistogramMetricBuffer(HistogramInfo histogram_info) {
  return HistogramMetricBuffer(shared_from_this(), std::move(histogram_info));
}
| |
| MetricBuffer::MetricBuffer(std::shared_ptr<MetricsBuffer> parent, uint32_t metric_id) |
| : parent_(parent), metric_id_(metric_id) {} |
| |
// Logs one occurrence of this buffer's metric with the given dimensions.
void MetricBuffer::LogEvent(std::vector<uint32_t> dimension_values) {
  parent_->LogEvent(metric_id_, std::move(dimension_values));
}
| |
// Adds `count` occurrences of this buffer's metric with the given dimensions.
void MetricBuffer::LogEventCount(std::vector<uint32_t> dimension_values, uint32_t count) {
  parent_->LogEventCount(metric_id_, std::move(dimension_values), count);
}
| |
| StringMetricBuffer::StringMetricBuffer(std::shared_ptr<MetricsBuffer> parent, uint32_t metric_id) |
| : parent_(parent), metric_id_(metric_id) {} |
| |
// Buffers `string_value` for this buffer's metric; the parent collapses duplicate strings
// pending under the same key.
void StringMetricBuffer::LogString(std::vector<uint32_t> dimension_values,
                                   std::string string_value) {
  parent_->LogString(metric_id_, std::move(dimension_values), std::move(string_value));
}
| |
| HistogramMetricBuffer::HistogramMetricBuffer(std::shared_ptr<MetricsBuffer> parent, |
| HistogramInfo histogram_info) |
| : parent_(parent), histogram_info_(std::move(histogram_info)) { |
| std::visit([this](auto&& info){ |
| using InfoT = std::decay_t<decltype(info)>; |
| if constexpr (std::is_same_v<InfoT, LinearIntegerBuckets>) { |
| floors_.resize(info.num_buckets + 1); |
| for (int64_t i = 0; i < info.num_buckets + 1; i++) { |
| floors_[i] = info.floor + i * info.step_size; |
| } |
| } else if constexpr (std::is_same_v<InfoT, ExponentialIntegerBuckets>) { |
| floors_.resize(info.num_buckets + 1); |
| floors_[0] = info.floor; |
| int64_t offset = info.initial_step; |
| for (uint32_t i = 1; i < info.num_buckets + 1; i++) { |
| floors_[i] = info.floor + offset; |
| offset *= info.step_multiplier; |
| } |
| } else { |
| static_assert(sfinae_false_v<InfoT>, "non-exhaustive"); |
| } |
| }, histogram_info_.buckets); |
| } |
| |
// Tallies `value` into this histogram metric using the floors precomputed in the ctor.
void HistogramMetricBuffer::LogValue(std::vector<uint32_t> dimension_values, int64_t value) {
  parent_->LogHistogramValue(histogram_info_, floors_, std::move(dimension_values), value);
}
| |
| } // namespace cobalt |