// blob: 23b6c0b83646e997cfe4846eda475cb197dc898e [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#include <cstdint>
#include <optional>
#include <set>
#include <string>
#include <vector>
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/not_null.h"
#include "src/lib/util/status_builder.h"
#include "src/local_aggregation/civil_time_manager.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/event_record.h"
#include "src/logging.h"
#include "src/pb/event.pb.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
#include "src/registry/window_size.pb.h"
namespace cobalt::local_aggregation {
// Bundles the EventCodesAggregateData objects, and any other data, needed to generate a single
// observation. One instance holds the data for a single time period bucket of a report:
//
// * Hourly and 1-day reports need exactly one instance to generate the observation.
// * Multi-day reports need several instances to generate the observation.
//
// When generating observations, all AggregateDataToGenerate and all the aggregate_data they
// contain must be for the same system profile.
//
// TODO(https://fxbug.dev/322409910): Delete usage of |use_legacy_hash| after clients no longer
// store them.
struct AggregateDataToGenerate {
  // Non-owning references to the aggregate data entries; starts out empty.
  std::vector<std::reference_wrapper<EventCodesAggregateData>> aggregate_data;
  // Held by reference, not copied: the referenced field has to outlive this object.
  const google::protobuf::RepeatedPtrField<std::string> &string_hashes;
  bool use_legacy_hash;

  // The struct is move only: move construction is compiler-generated, copying is disabled.
  AggregateDataToGenerate(AggregateDataToGenerate &&) = default;
  AggregateDataToGenerate(AggregateDataToGenerate const &) = delete;
  AggregateDataToGenerate &operator=(AggregateDataToGenerate const &) = delete;

  // Binds |string_hashes| to |hashes| and stores |legacy_hash| in |use_legacy_hash|.
  explicit AggregateDataToGenerate(const google::protobuf::RepeatedPtrField<std::string> &hashes,
                                   bool legacy_hash)
      : string_hashes(hashes), use_legacy_hash(legacy_hash) {}
};
// A container for the observation, with the hash of the system profile that it is for.
//
// Still an aggregate, so `{hash, std::move(observation)}` style initialization keeps working.
struct ObservationAndSystemProfile {
  // Hash of the system profile the observation is for. Zero-initialized so that a
  // default-initialized instance never exposes an indeterminate value.
  uint64_t system_profile_hash = 0;
  // The observation itself; ownership lives with this struct.
  std::unique_ptr<Observation> observation;
};
// AggregationProcedure is the abstract interface for aggregation logic.
//
// For each local aggregation procedure, there should be an implementation of this class.
//
// An AggregationProcedure should not have any of its own state. Its role is to update the state
// within the instance of ReportAggregate it is passed in UpdateAggregate() and
// MaybeGenerateObservation().
//
// See Also:
// * LocalAggregation calls UpdateAggregate() whenever AddEvent() is called.
// * ObservationGenerator calls MaybeGenerateObservation() on a regular schedule (~once per hour).
//
// Note:
// * AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure
// instead.
class AggregationProcedure {
public:
// Construct an AggregationProcedure
//
// |metric| Used to extract MetricType for validating EventRecords.
// |report| Used for extracting report-specific parameters.
explicit AggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report);
virtual ~AggregationProcedure() = default;
// AggregationProcedure::Get returns the appropriate AggregationProcedure for the given
// metric/report.
static lib::statusor::StatusOr<util::NotNullUniquePtr<AggregationProcedure>> Get(
const std::string &customer_name, const std::string &project_name,
const MetricDefinition &metric, const ReportDefinition &report);
// Get a `ReportAggregate` from `aggregate` for a `report_id`.
//
// If the ReportAggregate doesn't exist yet, it will be created and initialized.
lib::statusor::StatusOr<ReportAggregate *> GetReportAggregate(
MetricAggregate *metric_aggregate, uint32_t report_id,
CivilTimeManager *civil_time_mgr) const;
// UpdateAggregate takes an |event_record| and adds it to the given |aggregate| object according
// to the aggregation procedure implemented by this instance of AggregationProcedure.
//
// |event_record|: The event that needs to be added to the aggregate.
// |aggregate|: A mutable ReportAggregate object.
// |system_profile_hash|: The hash of the filtered system profile this event is for.
// |system_time|: The system time that the event occurred at.
void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate &aggregate,
uint64_t system_profile_hash,
std::chrono::system_clock::time_point system_time);
// Merge two instances of SystemProfileAggregate according to this aggregation procedure.
//
// The event codes and their data from |aggregate| are merged into the event codes and data in
// |merged_aggregate|. Both system profile aggregates must be included in the same
// AggregationPeriodBucket. Each procedure's implementation of MergeAggregateData does the work of
// merging the AggregateData in each bucket.
void MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate,
const SystemProfileAggregate &aggregate);
// GenerateObservations is the public interface for generating observations. It handles
// reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and
// passing all that information down to GenerateSingleObservation.
//
// |time_info|: Time period for which the observations should be generated.
// |aggregate|: A mutable reference to a ReportAggregate object.
//
// Returns all the generated observations (multiple can be generated for REPORT_ALL reports),
// and the system profile hash to use when generating their ObservationMetadata.
//
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> GenerateObservations(
const util::TimeInfo &time_info, ReportAggregate &aggregate);
[[nodiscard]] virtual std::string DebugString() const = 0;
// IsDaily returns true if this particular report should be treated as daily data.
[[nodiscard]] virtual bool IsDaily() const = 0;
// IsExpedited returns true if this particular report can generate expedited observations during
// the current day.
[[nodiscard]] virtual bool IsExpedited() const { return false; }
// IsValidEventType returns true if the given event type can be handled by the
// AggregationProcedure.
[[nodiscard]] bool IsValidEventType(Event::TypeCase type) const;
[[nodiscard]] util::TimeInfo GetLastTimeInfo(const ReportAggregate &aggregate) const {
if (IsDaily()) {
return util::TimeInfo::FromDayIndex(aggregate.daily().last_day_index());
}
return util::TimeInfo::FromHourId(aggregate.hourly().last_hour_id());
}
// Record that the observation generation from a previous call to GenerateObservations is done.
// This will clean up any aggregate data that is no longer needed, and update the last
// time info data so that future aggregation runs don't try to regenerate it.
//
// If |system_profile_hash| is set, then it identifies the System Profile of the observations that
// have been stored. If it is not set, then GenerateObservations returned no observations for the
// |time_info|.
//
// Note: This function should maintain only the minimum amount of data required to generate future
// observations. If the ReportAggregate contains data points that cannot be useful for any future
// observations, they should be deleted. If an hour_id or a day_index is passed to this function,
// an earlier hour_id or day_index will never be passed and the data associated with that can be
// deleted.
virtual void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo time_info,
std::optional<uint64_t> system_profile_hash) const;
protected:
// UpdateAggregateData takes an |event_record| and adds it to the given |aggregate_data|
// object according to the aggregation procedure implemented by this instance of
// AggregationProcedure.
//
// |event_record|: The event that needs to be added to the aggregate data.
// |aggregate_data|: A mutable AggregateData object.
// |bucket|: A mutable AggregationPeriodBucket object containing |aggregate_data|.
virtual void UpdateAggregateData(const logger::EventRecord &event_record,
AggregateData &aggregate_data,
AggregationPeriodBucket &bucket) = 0;
// Merge two instances of the aggregate data for this procedure.
//
// The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Both
// AggregateData objects must be part of the same AggregationPeriodBucket for the fields on the
// bucket to be preserved accurately.
virtual void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) = 0;
// GenerateSingleObservation generates an observation for the given report at the given time.
//
// A non-ok Status means there was an error at some point while generating observations. An ok
// status means that the observation generation process completed successfully. The observation
// may be null if there was nothing to send for this aggregate in this time window.
//
// |buckets|: A list of AggregateDataToGenerate objects containing the AggregationData objects.
// |event_vectors|: The event vectors which should be included in the observation.
// |time_info|: The time for which the observation is being generated.
//
// Note: This method will always be called with increasing time_info values. If the report is not
// expedited, the values will always be strictly increasing. For expedited reports, the last
// (current day's) time_info can be used to make multiple calls during the day, so the
// implementation must be able to determine if an observation has already been generated for the
// data and not regenerate it.
virtual lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> &buckets,
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) = 0;
[[nodiscard]] ReportDefinition::ReportType report_type() const { return report_type_; }
[[nodiscard]] MetricDefinition::MetricType metric_type() const { return metric_type_; }
// Accumulates the data that needs to be checked for observation generation for time_info.
// The return value is a map from the system_profile_hash, to the data to use to generate the
// observation for that system profile.
std::map<uint64_t, std::vector<AggregateDataToGenerate>> GetAggregateDataToGenerate(
const util::TimeInfo &time_info, ReportAggregate &aggregate) const;
// Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged
// for |aggregates|. The |buckets| should be ordered from earliest to latest aggregation period.
[[nodiscard]] std::set<std::vector<uint32_t>> SelectEventVectorsForObservation(
const std::vector<AggregateDataToGenerate> &buckets) const;
void SetEventVectorBufferMax(uint64_t event_vector_buffer_max) {
event_vector_buffer_max_ = event_vector_buffer_max;
}
private:
// Returns a mutable reference to the AggregationPeriodBucket associated with the given time
// specifier.
//
// |time|: util::TimeInfo to fetch the bucket for.
// |aggregate|: A mutable reference to a ReportAggregate object.
AggregationPeriodBucket &GetAggregationPeriodBucket(const util::TimeInfo &time,
ReportAggregate &aggregate) const;
// Returns a pointer to the AggregateData of |bucket| that |event_record| should be added to,
// if |bucket| has capacity for the event vector of |event_record|. In addition, adds
// the event vector of |event_record| to |bucket|'s event_vectors buffer if it has capacity and
// the event vector is not yet present. Returns a non-OK status if |bucket| does not have
// capacity.
lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> GetAggregateData(
const logger::EventRecord &event_record, AggregationPeriodBucket &bucket,
uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time) const;
// Returns a pointer to the AggregateData of |system_profile_aggregate| that data for
// |event_codes| should be added to, if |system_profile_aggregate| has capacity for the event
// vector of |event_codes|. If it is not present, adds the event vector of |event_codes| to
// |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a non-OK status
// if |system_profile_aggregate| does not have capacity for new event codes.
lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> GetAggregateData(
SystemProfileAggregate &system_profile_aggregate,
google::protobuf::RepeatedField<uint32_t> event_codes) const;
// Returns the starting TimeInfo to use aggregate data from for this report, when processing the
// current_time_info.
[[nodiscard]] util::TimeInfo GetStartTimeInfo(const util::TimeInfo &current_time_info) const;
const MetricDefinition &metric_;
WindowSize window_;
MetricDefinition::MetricType metric_type_;
ReportDefinition::ReportType report_type_;
SystemProfileSelectionPolicy system_profile_selection_policy_;
uint64_t event_vector_buffer_max_;
};
class UnimplementedAggregationProcedure : public AggregationProcedure {
public:
explicit UnimplementedAggregationProcedure(std::string name)
: AggregationProcedure(MetricDefinition(), ReportDefinition()), name_(std::move(name)) {}
~UnimplementedAggregationProcedure() override = default;
void UpdateAggregateData(const logger::EventRecord & /* event_record */,
AggregateData & /*aggregate_data*/,
AggregationPeriodBucket & /* bucket */) override {
LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
}
void MergeAggregateData(AggregateData & /*merged_aggregate_data*/,
const AggregateData & /*aggregate_data*/) override {
LOG(ERROR) << "MergeAggregateData is UNIMPLEMENTED for " << DebugString();
}
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> & /* aggregates */,
const std::set<std::vector<uint32_t>> & /*event_vectors*/,
const util::TimeInfo & /*time_info*/) override {
return util::StatusBuilder(StatusCode::CANCELLED,
"GeneratingSingleObservation is UNIMPLEMENTED for ")
.AppendMsg(DebugString())
.LogError()
.Build();
}
[[nodiscard]] std::string DebugString() const override {
return absl::StrCat(name_, " (Unimplemented)");
}
[[nodiscard]] bool IsDaily() const override { return true; }
private:
std::string name_;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_