| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_ |
| |
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "src/lib/util/datetime_util.h"
#include "src/lib/util/status_builder.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/event_record.h"
#include "src/logging.h"
#include "src/pb/event.pb.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/aggregation_window.pb.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
| |
| namespace cobalt::local_aggregation { |
| |
| // The EventCodesAggregateData objects, and any other data, needed to generate a single observation. |
| // An instance of this object corresponds to the data for a single time period bucket of a report. |
| // For hourly or 1-day reports, this object is all the data needed to generate the observation. |
| // For multi-day reports, multiple of these objects are needed to generate the observation. When |
| // generating observations, all AggregateDataToGenerate and all the aggregate_data they contain must |
| // be for the same system profile. |
| struct AggregateDataToGenerate { |
| std::vector<EventCodesAggregateData *> aggregate_data; |
| const google::protobuf::RepeatedPtrField<std::string> &string_hashes; |
| }; |
| |
| // A container for the observation, with the hash of the system profile that it is for. |
| struct ObservationAndSystemProfile { |
| uint64_t system_profile_hash; |
| std::unique_ptr<Observation> observation; |
| }; |
| |
| // AggregationProcedure is the abstract interface for aggregation logic. |
| // |
| // For each local aggregation procedure, there should be an implementation of this class. |
| // |
| // An AggregationProcedure should not have any of its own state. Its role is to update the state |
| // within the instance of ReportAggregate it is passed in UpdateAggregate() and |
| // MaybeGenerateObservation(). |
| // |
| // See Also: |
| // * LocalAggregation calls UpdateAggregate() whenever AddEvent() is called. |
| // * ObservationGenerator calls MaybeGenerateObservation() on a regular schedule (~once per hour). |
| // |
| // Note: |
| // * AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure |
| // instead. |
| class AggregationProcedure { |
| public: |
| // Construct an AggregationProcedure |
| // |
| // |metric| Used to extract MetricType for validating EventRecords. |
| // |report| Used for extracting report-specific parameters. |
| explicit AggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report); |
| virtual ~AggregationProcedure() = default; |
| |
| // AggregationProcedure::Get returns the appropriate AggregationProcedure for the given |
| // metric/report. |
| static lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> Get( |
| const MetricDefinition &metric, const ReportDefinition &report); |
| |
| // UpdateAggregate takes an |event_record| and adds it to the given |aggregate| object according |
| // to the aggregation procedure implemented by this instance of AggregationProcedure. |
| // |
| // |event_record|: The event that needs to be added to the aggregate. |
| // |aggregate|: A mutable ReportAggregate object. |
| // |system_profile_hash|: The hash of the filtered system profile this event is for. |
| // |system_time|: The system time that the event occurred at. |
| void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate, |
| uint64_t system_profile_hash, |
| std::chrono::system_clock::time_point system_time); |
| |
| // GenerateObservations is the public interface for generating observations. It handles |
| // reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and |
| // passing all that information down to GenerateSingleObservation. |
| // |
| // |time_info|: Time period for which the observations should be generated. |
| // |aggregate|: A mutable reference to a ReportAggregate object. |
| // |
| // Returns all the generated observations (multiple can be generated for REPORT_ALL reports), |
| // and the system profile hash to use when generating their ObservationMetadata. |
| // |
| lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> GenerateObservations( |
| const util::TimeInfo &time_info, ReportAggregate *aggregate); |
| |
| [[nodiscard]] virtual std::string DebugString() const = 0; |
| |
| // IsDaily returns true if this particular report should be treated as daily data. |
| [[nodiscard]] virtual bool IsDaily() const = 0; |
| |
| // IsExpedited returns true if this particular report can generate expedited observations during |
| // the current day. |
| [[nodiscard]] virtual bool IsExpedited() const { return false; } |
| |
| // IsValidEventType returns true if the given event type can be handled by the |
| // AggregationProcedure. |
| bool IsValidEventType(Event::TypeCase type) const; |
| |
| util::TimeInfo GetLastTimeInfo(const ReportAggregate &aggregate) const { |
| if (IsDaily()) { |
| return util::TimeInfo::FromDayIndex(aggregate.daily().last_day_index()); |
| } |
| return util::TimeInfo::FromHourId(aggregate.hourly().last_hour_id()); |
| } |
| |
| // Record that the observations generated from a previous call to GeneratedObservation, have been |
| // stored. This will clean up any aggregate data that is no longer needed, and update the last |
| // time info data so that future aggregation runs don't try to regenerate it. |
| // |
| // Note: This function should maintain only the minimum amount of data required to generate future |
| // observations. If the ReportAggregate contains data points that cannot be useful for any future |
| // observations, they should be deleted. If an hour_id or a day_index is passed to this function, |
| // an earlier hour_id or day_index will never be passed and the data associated with that can be |
| // deleted. |
| virtual void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo time_info, |
| uint64_t system_profile_hash) const; |
| |
| protected: |
| // UpdateAggregateData takes an |event_record| and adds it to the given |aggregate_data| |
| // object according to the aggregation procedure implemented by this instance of |
| // AggregationProcedure. |
| // |
| // |event_record|: The event that needs to be added to the aggregate data. |
| // |aggregate_data|: A mutable AggregateData object. |
| // |bucket|: A mutable AggregationPeriodBucket object containing |aggregate_data|. |
| virtual void UpdateAggregateData(const logger::EventRecord &event_record, |
| AggregateData *aggregate_data, |
| AggregationPeriodBucket *bucket) = 0; |
| |
| // Merge two instances of the aggregate data for this procedure. |
| // |
| // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Any fields |
| // in the |bucket| (which contains |aggregate_data|) are also merged into |merged_bucket| (which |
| // contains |merged_aggregate_data|). |
| // |
| // TODO(fxbug.dev/91520): implement this in all subclasses and make it pure virtual. |
| virtual void MergeAggregateData(AggregateData *merged_aggregate_data, |
| AggregationPeriodBucket *merged_bucket, |
| const AggregateData &aggregate_data, |
| const AggregationPeriodBucket &bucket) {} |
| |
| // GenerateSingleObservation generates an observation for the given report at the given time. |
| // |
| // A non-ok Status means there was an error at some point while generating observations. An ok |
| // status means that the observation generation process completed successfully. The observation |
| // may be null if there was nothing to send for this aggregate in this time window. |
| // |
| // |buckets|: A list of AggregateDataToGenerate objects containing the AggregationData objects. |
| // |event_vectors|: The event vectors which should be included in the observation. |
| // |time_info|: The time for which the observation is being generated. |
| // |
| // Note: This method will always be called with increasing time_info values. If the report is not |
| // expedited, the values will always be strictly increasing. For expedited reports, the last |
| // (current day's) time_info can be used to make multiple calls during the day, so the |
| // implementation must be able to determine if an observation has already been generated for the |
| // data and not regenerate it. |
| virtual lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation( |
| const std::vector<AggregateDataToGenerate> &buckets, |
| const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) = 0; |
| |
| ReportDefinition::ReportType report_type() const { return report_type_; } |
| MetricDefinition::MetricType metric_type() const { return metric_type_; } |
| |
| // Accumulates the data that needs to be checked for observation generation for time_info. |
| // The return value is a map from the system_profile_hash, to the data to use to generate the |
| // observation for that system profile. |
| std::map<uint64_t, std::vector<AggregateDataToGenerate>> GetAggregateDataToGenerate( |
| const util::TimeInfo &time_info, ReportAggregate *aggregate) const; |
| |
| // Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged |
| // for |aggregates|. The |buckets| should be ordered from earliest to latest aggregation period. |
| std::set<std::vector<uint32_t>> SelectEventVectorsForObservation( |
| const std::vector<AggregateDataToGenerate> &buckets) const; |
| |
| void SetEventVectorBufferMax(uint64_t event_vector_buffer_max) { |
| event_vector_buffer_max_ = event_vector_buffer_max; |
| } |
| |
| private: |
| // Returns a mutable reference to the AggregationPeriodBucket associated with the given time |
| // specifier. |
| // |
| // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics) |
| // |aggregate|: A mutable reference to a ReportAggregate object. |
| AggregationPeriodBucket *GetAggregationPeriodBucket(uint32_t time, |
| ReportAggregate *aggregate) const; |
| |
| // Returns a pointer to the AggregateData of |bucket| that |event_record| should be added to, |
| // if |bucket| has capacity for the event vector of |event_record|. In addition, adds |
| // the event vector of |event_record| to |bucket|'s event_vectors buffer if it has capacity and |
| // the event vector is not yet present. Returns a null pointer if |bucket| does not have |
| // capacity. |
| AggregateData *GetAggregateData(const logger::EventRecord &event_record, |
| AggregationPeriodBucket *bucket, uint64_t system_profile_hash, |
| std::chrono::system_clock::time_point system_time) const; |
| |
| // Returns the starting TimeInfo to use aggregate data from for this report, when processing the |
| // current_time_info. |
| util::TimeInfo GetStartTimeInfo(const util::TimeInfo ¤t_time_info) const; |
| |
| OnDeviceAggregationWindow window_; |
| MetricDefinition::MetricType metric_type_; |
| ReportDefinition::ReportType report_type_; |
| SystemProfileSelectionPolicy system_profile_selection_policy_; |
| uint64_t event_vector_buffer_max_; |
| }; |
| |
| class UnimplementedAggregationProcedure : public AggregationProcedure { |
| public: |
| explicit UnimplementedAggregationProcedure(std::string name) |
| : AggregationProcedure(MetricDefinition(), ReportDefinition()), name_(std::move(name)) {} |
| ~UnimplementedAggregationProcedure() override = default; |
| |
| void UpdateAggregateData(const logger::EventRecord & /* event_record */, |
| AggregateData * /*aggregate_data*/, |
| AggregationPeriodBucket * /* bucket */) override { |
| LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString(); |
| } |
| |
| lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation( |
| const std::vector<AggregateDataToGenerate> & /* aggregates */, |
| const std::set<std::vector<uint32_t>> & /*event_vectors*/, |
| const util::TimeInfo & /*time_info*/) override { |
| return util::StatusBuilder(StatusCode::CANCELLED, |
| "GeneratingSingleObservation is UNIMPLEMENTED for ") |
| .AppendMsg(DebugString()) |
| .LogError() |
| .Build(); |
| } |
| |
| [[nodiscard]] std::string DebugString() const override { |
| return absl::StrCat(name_, " (Unimplemented)"); |
| } |
| |
| [[nodiscard]] bool IsDaily() const override { return true; } |
| |
| private: |
| std::string name_; |
| }; |
| |
| } // namespace cobalt::local_aggregation |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_ |