blob: 47f80d55e92b47e0751d34821032280a4dd3c093 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#include <cstdint>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "src/lib/statusor/statusor.h"
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/event_record.h"
#include "src/logging.h"
#include "src/pb/event.pb.h"
#include "src/pb/observation.pb.h"
#include "src/registry/aggregation_window.pb.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// AggregationProcedure is the abstract interface for aggregation logic.
//
// For each local aggregation procedure, there should be an implementation of this class.
//
// An AggregationProcedure should not have any of its own state. Its role is to update the state
// within the instance of ReportAggregate it is passed in UpdateAggregate() and
// GenerateObservation().
//
// See Also:
// * LocalAggregation calls UpdateAggregate() whenever AddEvent() is called.
// * ObservationGenerator calls GenerateObservation() on a regular schedule (~once per hour).
//
// Note:
// * AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure
// instead.
class AggregationProcedure {
 public:
  // Builds an AggregationProcedure for one metric/report pair.
  //
  // |metric|: Source of the MetricType used when validating EventRecords.
  // |report|: Source of the report-specific parameters.
  explicit AggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report);

  virtual ~AggregationProcedure() = default;

  // Factory: returns the AggregationProcedure that matches the given metric/report.
  //
  // A nullptr result should be read as "this report is not supported by cobalt 1.1" (it may be a
  // cobalt 1.0 metric/report).
  static std::unique_ptr<AggregationProcedure> Get(const MetricDefinition &metric,
                                                   const ReportDefinition &report);

  // Folds |event_record| into |aggregate| following the aggregation procedure implemented by
  // this instance.
  //
  // |event_record|: The event to add to the aggregate.
  // |aggregate|: Pointer to the mutable ReportAggregate that receives the event.
  void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate);

  // Public entry point for generating observations. It reads the EventCodeAggregates out of the
  // ReportAggregate according to |time_info| and hands that information on to
  // GenerateSingleObservation.
  //
  // |time_info|: The time period the observation should cover.
  // |aggregate|: Pointer to the mutable ReportAggregate to read from.
  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateObservation(
      const util::TimeInfo &time_info, ReportAggregate *aggregate);

  // Human-readable description of this procedure.
  [[nodiscard]] virtual std::string DebugString() const = 0;

  // True when this particular report should be treated as daily data.
  [[nodiscard]] virtual bool IsDaily() const = 0;

  // True when events of the given |type| can be handled by this AggregationProcedure.
  bool IsValidEventType(Event::TypeCase type) const;

  // Reads the last recorded time from |aggregate|: the last day index for daily reports,
  // otherwise the last hour id, wrapped in a TimeInfo.
  util::TimeInfo GetLastTimeInfo(const ReportAggregate &aggregate) const {
    if (!IsDaily()) {
      return util::TimeInfo::FromHourId(aggregate.hourly().last_hour_id());
    }
    return util::TimeInfo::FromDayIndex(aggregate.daily().last_day_index());
  }

  // Writes the last recorded time into |aggregate|: |info.day_index| for daily reports,
  // otherwise |info.hour_id|.
  void SetLastTimeInfo(ReportAggregate *aggregate, util::TimeInfo info) const {
    if (!IsDaily()) {
      aggregate->mutable_hourly()->set_last_hour_id(info.hour_id);
      return;
    }
    aggregate->mutable_daily()->set_last_day_index(info.day_index);
  }

 protected:
  // Folds |event_record| into |aggregate_data| following the aggregation procedure implemented
  // by this instance.
  //
  // |event_record|: The event to add to the aggregate data.
  // |aggregate_data|: Pointer to the mutable AggregateData to update.
  // |event_code_aggregate|: Pointer to the mutable EventCodeAggregate that contains
  //                         |aggregate_data|.
  virtual void UpdateAggregateData(const logger::EventRecord &event_record,
                                   AggregateData *aggregate_data,
                                   EventCodeAggregate *event_code_aggregate) = 0;

  // Produces one observation for the report from the supplied aggregates.
  //
  // A non-ok Status signals an error somewhere during observation generation. An ok status means
  // the observation was generated successfully, and it should never be null.
  //
  // When no data is present, implementations should emit a 'zero' observation: the same type as
  // the non-zero observations, but usually with no fields populated.
  //
  // |aggregates|: Pointers to the mutable EventCodeAggregate objects to read.
  // |event_vectors|: The event vectors that should be included in the observation.
  //
  // Note: Implementations should retain only the minimum data needed for future observations.
  // Data points in the ReportAggregate that cannot contribute to any future observation should
  // be deleted. Calls always arrive with strictly increasing time_info values: once an hour_id or
  // day_index has been passed in, no earlier hour_id or day_index will follow, so the data tied
  // to it can be deleted.
  virtual lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
      const std::vector<EventCodeAggregate *> &aggregates,
      const std::set<std::vector<uint32_t>> &event_vectors) = 0;

  // Accessors for the stored report and metric types.
  ReportDefinition::ReportType report_type() const { return report_type_; }
  MetricDefinition::MetricType metric_type() const { return metric_type_; }

  // Returns a pointer to the EventCodeAggregate for the current time window (daily or hourly,
  // depending on the report for which |event_record| was logged).
  EventCodeAggregate *GetEventCodeAggregate(const logger::EventRecord &event_record,
                                            ReportAggregate *aggregate) const;

  // Overrides the stored event vector buffer maximum.
  void SetEventVectorBufferMax(uint64_t event_vector_buffer_max) {
    event_vector_buffer_max_ = event_vector_buffer_max;
  }

 private:
  // Returns a pointer to the mutable EventCodeAggregate associated with the given time
  // specifier.
  //
  // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics).
  // |aggregate|: Pointer to the mutable ReportAggregate to look in.
  EventCodeAggregate *GetEventCodeAggregate(uint32_t time, ReportAggregate *aggregate) const;

  // Returns a pointer to the AggregateData of |aggregate| that |event_record| should be added
  // to, provided |aggregate| has capacity for the event vector of |event_record|. As a side
  // effect, the event vector of |event_record| is appended to |aggregate|'s event_vectors buffer
  // when there is capacity and the vector is not already present. Returns a null pointer when
  // |aggregate| has no capacity.
  AggregateData *GetAggregateData(const logger::EventRecord &event_record,
                                  EventCodeAggregate *aggregate) const;

  // Returns the set of the first |event_vector_buffer_max_| event vectors that were logged for
  // |aggregates|. The |aggregates| should be ordered from earliest to latest.
  std::set<std::vector<uint32_t>> SelectEventVectorsForObservation(
      const std::vector<EventCodeAggregate *> &aggregates) const;

  OnDeviceAggregationWindow window_;
  MetricDefinition::MetricType metric_type_;
  ReportDefinition::ReportType report_type_;
  uint64_t event_vector_buffer_max_;
};
// UnimplementedAggregationProcedure is a placeholder AggregationProcedure for procedures that
// have no real implementation. Each operation only logs an error; observation generation
// additionally returns a non-ok status.
class UnimplementedAggregationProcedure : public AggregationProcedure {
 public:
  // |name|: Label for the missing procedure; it is echoed back by DebugString().
  //
  // The base class is constructed from default-constructed MetricDefinition and
  // ReportDefinition objects.
  explicit UnimplementedAggregationProcedure(std::string name)
      : AggregationProcedure(MetricDefinition(), ReportDefinition()), name_(std::move(name)) {}

  ~UnimplementedAggregationProcedure() override = default;

  // Leaves its arguments untouched and logs an error naming this method.
  void UpdateAggregateData(const logger::EventRecord & /* event_record */,
                           AggregateData * /* aggregate_data */,
                           EventCodeAggregate * /* event_code_aggregate */) override {
    // The message names the method actually being called so the log line can be traced back to
    // this override.
    LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
  }

  // Logs an error and returns util::Status::CANCELLED; never produces an observation.
  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
      const std::vector<EventCodeAggregate *> & /* aggregates */,
      const std::set<std::vector<uint32_t>> & /* event_vectors */) override {
    LOG(ERROR) << "GenerateSingleObservation is UNIMPLEMENTED for " << DebugString();
    return util::Status::CANCELLED;
  }

  // Returns the name given at construction followed by " (Unimplemented)".
  [[nodiscard]] std::string DebugString() const override {
    return absl::StrCat(name_, " (Unimplemented)");
  }

  // Always reports itself as daily.
  [[nodiscard]] bool IsDaily() const override { return true; }

 private:
  // Name of the procedure this placeholder stands in for.
  std::string name_;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_