blob: e6b026e500409602c28c63a315bfbd4c64040fcb [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/logger/undated_event_manager.h"
#include <memory>
#include <string>
#include <google/protobuf/stubs/map_util.h>
#include "src/lib/util/clock.h"
#include "src/logger/event_loggers.h"
#include "src/logger/event_record.h"
#include "src/logger/internal_metrics.h"
#include "src/registry/metric_definition.pb.h"
namespace cobalt::logger {
using local_aggregation::EventAggregator;
UndatedEventManager::UndatedEventManager(const Encoder& encoder, EventAggregator& event_aggregator,
local_aggregation::LocalAggregation* local_aggregation,
ObservationWriter* observation_writer,
system_data::SystemDataInterface* system_data,
util::CivilTimeConverterInterface* civil_time_converter,
int32_t max_saved_events)
: encoder_(encoder),
event_aggregator_(event_aggregator),
local_aggregation_(local_aggregation),
observation_writer_(observation_writer),
system_data_(system_data),
civil_time_converter_(civil_time_converter),
max_saved_events_(max_saved_events) {
CHECK(local_aggregation_);
CHECK(observation_writer_);
CHECK(civil_time_converter_);
steady_clock_ = std::make_unique<util::SteadyClock>();
}
Status UndatedEventManager::Save(std::unique_ptr<EventRecord> event_record) {
auto id =
std::make_pair(event_record->metric()->customer_id(), event_record->metric()->project_id());
std::unique_ptr<SavedEventRecord> saved_record = std::make_unique<SavedEventRecord>(
SavedEventRecord{steady_clock_->now(), std::move(event_record)});
auto lock = protected_saved_records_fields_.lock();
if (lock->flushed) {
LOG(WARNING) << "Event saved after the queue has been flushed for metric: "
<< saved_record->event_record->GetLogDetails();
return FlushSavedRecord(std::move(saved_record), lock->reference_system_time_,
lock->reference_monotonic_time_);
}
// Save the event record to the FIFO queue.
if (lock->saved_records_.size() >= max_saved_events_) {
auto dropped_record = std::move(lock->saved_records_.front());
lock->saved_records_.pop_front();
++lock->num_events_dropped_[std::make_pair(
dropped_record->event_record->metric()->customer_id(),
dropped_record->event_record->metric()->project_id())];
}
lock->saved_records_.emplace_back(std::move(saved_record));
lock->num_events_cached_[id] += 1;
return Status::OkStatus();
}
Status UndatedEventManager::Flush(util::SystemClockInterface* system_clock,
InternalMetrics* internal_metrics) {
// should no longer be incoming records to save as the clock is now accurate.
auto lock = protected_saved_records_fields_.lock();
// Get reference times that will be used to convert the saved event record's monotonic time to an
// accurate system time.
lock->reference_system_time_ = system_clock->now();
lock->reference_monotonic_time_ = steady_clock_->now();
std::unique_ptr<SavedEventRecord> saved_record;
while (!lock->saved_records_.empty()) {
saved_record = std::move(lock->saved_records_.front());
auto log_details = saved_record->event_record->GetLogDetails();
Status result = FlushSavedRecord(std::move(saved_record), lock->reference_system_time_,
lock->reference_monotonic_time_);
lock->saved_records_.pop_front();
if (!result.ok()) {
LOG_FIRST_N(ERROR, 10) << "Error " << result.error_message()
<< " occurred while processing a saved event for metric: "
<< log_details;
}
}
NoOpInternalMetrics noop_internal_metrics;
if (!internal_metrics) {
internal_metrics = &noop_internal_metrics;
}
// Record that we saved records due to clock inaccuracy.
for (auto cached_events : lock->num_events_cached_) {
internal_metrics->InaccurateClockEventsCached(cached_events.second);
}
lock->num_events_cached_.clear();
// Record that we dropped events due to memory constraints.
for (auto dropped_events : lock->num_events_dropped_) {
internal_metrics->InaccurateClockEventsDropped(dropped_events.second);
}
lock->num_events_dropped_.clear();
lock->flushed = true;
return Status::OkStatus();
}
Status UndatedEventManager::FlushSavedRecord(
std::unique_ptr<SavedEventRecord> saved_record,
const std::chrono::system_clock::time_point& reference_system_time,
const std::chrono::steady_clock::time_point& reference_monotonic_time) {
// Convert the recorded monotonic time to a system time.
const std::chrono::seconds& time_shift = std::chrono::duration_cast<std::chrono::seconds>(
reference_monotonic_time - saved_record->monotonic_time);
std::chrono::system_clock::time_point event_timestamp = reference_system_time - time_shift;
auto event_logger = internal::EventLogger::Create(
saved_record->event_record->metric()->metric_type(), encoder_, event_aggregator_,
local_aggregation_, observation_writer_, system_data_, civil_time_converter_);
if (event_logger == nullptr) {
return Status(StatusCode::INVALID_ARGUMENT, "No event_logger found");
}
return event_logger->Log(std::move(saved_record->event_record), event_timestamp);
}
int UndatedEventManager::NumSavedEvents() const {
return static_cast<int>(protected_saved_records_fields_.const_lock()->saved_records_.size());
}
} // namespace cobalt::logger