| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_ |
| |
| #include <cstdint> |
| #include <set> |
| #include <string> |
| #include <vector> |
| |
| #include "src/lib/statusor/statusor.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/event_record.h" |
| #include "src/logging.h" |
| #include "src/pb/event.pb.h" |
| #include "src/pb/observation.pb.h" |
| #include "src/registry/aggregation_window.pb.h" |
| #include "src/registry/metric_definition.pb.h" |
| #include "src/registry/report_definition.pb.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| // AggregationProcedure is the abstract interface for aggregation logic. |
| // |
| // For each local aggregation procedure, there should be an implementation of this class. |
| // |
| // An AggregationProcedure should not have any of its own state. Its role is to update the state |
| // within the instance of ReportAggregate it is passed in UpdateAggregate() and |
| // MaybeGenerateObservation(). |
| // |
| // See Also: |
| // * LocalAggregation calls UpdateAggregate() whenever AddEvent() is called. |
| // * ObservationGenerator calls MaybeGenerateObservation() on a regular schedule (~once per hour). |
| // |
| // Note: |
| // * AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure |
| // instead. |
| class AggregationProcedure { |
| public: |
| // Construct an AggregationProcedure |
| // |
| // |metric| Used to extract MetricType for validating EventRecords. |
| // |report| Used for extracting report-specific parameters. |
| explicit AggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report); |
| virtual ~AggregationProcedure() = default; |
| |
| // AggregationProcedure::Get returns the appropriate AggregationProcedure for the given |
| // metric/report. |
| // |
| // A return value of nullptr should be interpreted as a report that is not supported by cobalt 1.1 |
| // (perhaps a cobalt 1.0 metric/report) |
| static std::unique_ptr<AggregationProcedure> Get(const MetricDefinition &metric, |
| const ReportDefinition &report); |
| |
| // UpdateAggregate takes an |event_record| and adds it to the given |aggregate| object according |
| // to the aggregation procedure implemented by this instance of AggregationProcedure. |
| // |
| // |event_record|: The event that needs to be added to the aggregate. |
| // |aggregate|: A mutable ReportAggregate object. |
| void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate); |
| |
| // GenerateObservation is the public interface for generating observations. It handles |
| // reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and |
| // passing all that information down to GenerateSingleObservation. |
| // |
| // |time_info|: Time period for which this observation should be generated. |
| // |aggregate|: A mutable reference to a ReportAggregate object. |
| lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateObservation( |
| const util::TimeInfo &time_info, ReportAggregate *aggregate); |
| |
| [[nodiscard]] virtual std::string DebugString() const = 0; |
| |
| // IsDaily returns true if this particular report should be treated as daily data. |
| [[nodiscard]] virtual bool IsDaily() const = 0; |
| |
| // IsValidEventType returns true if the given event type can be handled by the |
| // AggregationProcedure. |
| bool IsValidEventType(Event::TypeCase type) const; |
| |
| util::TimeInfo GetLastTimeInfo(const ReportAggregate &aggregate) const { |
| if (IsDaily()) { |
| return util::TimeInfo::FromDayIndex(aggregate.daily().last_day_index()); |
| } |
| return util::TimeInfo::FromHourId(aggregate.hourly().last_hour_id()); |
| } |
| |
| void SetLastTimeInfo(ReportAggregate *aggregate, util::TimeInfo info) const { |
| if (IsDaily()) { |
| aggregate->mutable_daily()->set_last_day_index(info.day_index); |
| } else { |
| aggregate->mutable_hourly()->set_last_hour_id(info.hour_id); |
| } |
| } |
| |
| protected: |
| // UpdateAggregateData takes an |event_record| and adds it to the given |aggregate_data| |
| // object according to the aggregation procedure implemented by this instance of |
| // AggregationProcedure. |
| // |
| // |event_record|: The event that needs to be added to the aggregate data. |
| // |aggregate_data|: A mutable AggregateData object. |
| // |event_code_aggregate|: A mutable EventCodeAggregate object containing |aggregate_data|. |
| virtual void UpdateAggregateData(const logger::EventRecord &event_record, |
| AggregateData *aggregate_data, |
| EventCodeAggregate *event_code_aggregate) = 0; |
| |
| // GenerateSingleObservation generates an observation for the given report at the given time. |
| // |
| // A non-ok Status means there was an error at some point while generating observations. An ok |
| // status means that the observation was generated sucessfully and should never be null. |
| // |
| // This method should generate a 'zero' observation if no data is present. The 'zero' observation |
| // should be of the same type as the non-zero observations, but usually with no fields populated. |
| // |
| // |aggregates|: A list of mutable references to EventCodeAggregate objects. |
| // |event_vectors|: The event vectors which should be included in the observation. |
| // |
| // Note: This function should maintain only the minimum amount of data required to generate future |
| // observations. If the ReportAggregate contains data points that cannot be useful for any future |
| // observations, they should be deleted. This method will always be called with strictly |
| // increasing time_info values. If an hour_id or a day_index is passed to this function, an |
| // earlier hour_id or day_index will never be passed and the data associated with that can be |
| // deleted. |
| virtual lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation( |
| const std::vector<EventCodeAggregate *> &aggregates, |
| const std::set<std::vector<uint32_t>> &event_vectors) = 0; |
| |
| ReportDefinition::ReportType report_type() const { return report_type_; } |
| MetricDefinition::MetricType metric_type() const { return metric_type_; } |
| |
| // Returns a pointer to the EventCodeAggregate for the current time window (daily or hourly, |
| // depending on the report for which |event_record| was logged). |
| EventCodeAggregate *GetEventCodeAggregate(const logger::EventRecord &event_record, |
| ReportAggregate *aggregate) const; |
| |
| void SetEventVectorBufferMax(uint64_t event_vector_buffer_max) { |
| event_vector_buffer_max_ = event_vector_buffer_max; |
| } |
| |
| private: |
| // Returns a mutable reference to the EventCodeAggregate associated with the given time specifier. |
| // |
| // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics) |
| // |aggregate|: A mutable reference to a ReportAggregate object. |
| EventCodeAggregate *GetEventCodeAggregate(uint32_t time, ReportAggregate *aggregate) const; |
| |
| // Returns a pointer to the AggregateData of |aggregate| that |event_record| should be added to, |
| // if |aggregate| has capacity for the event vector of |event_record|. In addition, adds |
| // the event vector of |event_record| to |aggregate|'s event_vectors buffer if it has capacity and |
| // the event vector is not yet present. Returns a null pointer if |aggregate| does not have |
| // capacity. |
| AggregateData *GetAggregateData(const logger::EventRecord &event_record, |
| EventCodeAggregate *aggregate) const; |
| |
| // Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged |
| // for |aggregates|. The |aggregates| should be ordered from earliest to latest. |
| std::set<std::vector<uint32_t>> SelectEventVectorsForObservation( |
| const std::vector<EventCodeAggregate *> &aggregates) const; |
| |
| OnDeviceAggregationWindow window_; |
| MetricDefinition::MetricType metric_type_; |
| ReportDefinition::ReportType report_type_; |
| uint64_t event_vector_buffer_max_; |
| }; |
| |
| class UnimplementedAggregationProcedure : public AggregationProcedure { |
| public: |
| explicit UnimplementedAggregationProcedure(std::string name) |
| : AggregationProcedure(MetricDefinition(), ReportDefinition()), name_(std::move(name)) {} |
| ~UnimplementedAggregationProcedure() override = default; |
| |
| void UpdateAggregateData(const logger::EventRecord & /* event_record */, |
| AggregateData * /*aggregate_data*/, |
| EventCodeAggregate * /* aggregate */) override { |
| LOG(ERROR) << "UpdateEventCodeAggregate is UNIMPLEMENTED for " << DebugString(); |
| } |
| |
| lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation( |
| const std::vector<EventCodeAggregate *> & /* aggregates */, |
| const std::set<std::vector<uint32_t>> & /*event_vectors*/) override { |
| LOG(ERROR) << "GenerateSingleObservation is UNIMPLEMENTED for " << DebugString(); |
| |
| return util::Status::CANCELLED; |
| } |
| |
| [[nodiscard]] std::string DebugString() const override { |
| return absl::StrCat(name_, " (Unimplemented)"); |
| } |
| |
| [[nodiscard]] bool IsDaily() const override { return true; } |
| |
| private: |
| std::string name_; |
| }; |
| |
| } // namespace cobalt::local_aggregation |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_ |