// blob: 92678973c6dd7af89a9ffbb9d676cf79c8cb7956 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "src/lib/util/datetime_util.h"
#include "src/lib/util/status_builder.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/event_record.h"
#include "src/logging.h"
#include "src/pb/event.pb.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/aggregation_window.pb.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Bundles the EventCodesAggregateData objects, and any other data, needed to generate a single
// observation. One instance corresponds to the data for a single time period bucket of a report.
//
// * Hourly and 1-day reports: a single instance holds all the data needed for the observation.
// * Multi-day reports: several instances are needed to generate the observation.
//
// When generating observations, every AggregateDataToGenerate, and all of the aggregate_data each
// one contains, must be for the same system profile.
struct AggregateDataToGenerate {
  // The EventCodesAggregateData objects that belong to this time period bucket.
  std::vector<EventCodesAggregateData *> aggregate_data;

  // The string hashes associated with this bucket. This is a reference member, so the referenced
  // repeated field must outlive this object.
  const google::protobuf::RepeatedPtrField<std::string> &string_hashes;
};
// A container for the observation, with the hash of the system profile that it is for.
struct ObservationAndSystemProfile {
  // Hash of the system profile the observation is for. Zero-initialized so that a
  // default-constructed instance never carries an indeterminate value.
  uint64_t system_profile_hash = 0;

  // The observation itself. Default-constructs to a null pointer.
  std::unique_ptr<Observation> observation;
};
// AggregationProcedure is the abstract interface for aggregation logic.
//
// For each local aggregation procedure, there should be an implementation of this class.
//
// An AggregationProcedure should not have any of its own state. Its role is to update the state
// within the instance of ReportAggregate it is passed in UpdateAggregate() and
// GenerateObservations().
//
// See Also:
// * LocalAggregation calls UpdateAggregate() whenever AddEvent() is called.
// * ObservationGenerator calls GenerateObservations() on a regular schedule (~once per hour).
//
// Note:
// * AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure
//   instead.
class AggregationProcedure {
 public:
  // Construct an AggregationProcedure
  //
  // |metric| Used to extract MetricType for validating EventRecords.
  // |report| Used for extracting report-specific parameters.
  explicit AggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report);

  virtual ~AggregationProcedure() = default;

  // AggregationProcedure::Get returns the appropriate AggregationProcedure for the given
  // metric/report.
  static lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> Get(
      const MetricDefinition &metric, const ReportDefinition &report);

  // UpdateAggregate takes an |event_record| and adds it to the given |aggregate| object according
  // to the aggregation procedure implemented by this instance of AggregationProcedure.
  //
  // |event_record|: The event that needs to be added to the aggregate.
  // |aggregate|: A mutable ReportAggregate object.
  // |system_profile_hash|: The hash of the filtered system profile this event is for.
  // |system_time|: The system time that the event occurred at.
  void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate,
                       uint64_t system_profile_hash,
                       std::chrono::system_clock::time_point system_time);

  // GenerateObservations is the public interface for generating observations. It handles
  // reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and
  // passing all that information down to GenerateSingleObservation.
  //
  // |time_info|: Time period for which the observations should be generated.
  // |aggregate|: A pointer to a mutable ReportAggregate object.
  //
  // Returns all the generated observations (multiple can be generated for REPORT_ALL reports),
  // and the system profile hash to use when generating their ObservationMetadata.
  lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> GenerateObservations(
      const util::TimeInfo &time_info, ReportAggregate *aggregate);

  // DebugString returns a human-readable description of this procedure. It is used in log and
  // status messages.
  [[nodiscard]] virtual std::string DebugString() const = 0;

  // IsDaily returns true if this particular report should be treated as daily data.
  [[nodiscard]] virtual bool IsDaily() const = 0;

  // IsExpedited returns true if this particular report can generate expedited observations during
  // the current day. Defaults to false.
  [[nodiscard]] virtual bool IsExpedited() const { return false; }

  // IsValidEventType returns true if the given event type can be handled by the
  // AggregationProcedure.
  bool IsValidEventType(Event::TypeCase type) const;

  // GetLastTimeInfo returns the last TimeInfo recorded in |aggregate|: built from the daily
  // last_day_index when IsDaily() is true, and from the hourly last_hour_id otherwise.
  util::TimeInfo GetLastTimeInfo(const ReportAggregate &aggregate) const {
    if (IsDaily()) {
      return util::TimeInfo::FromDayIndex(aggregate.daily().last_day_index());
    }
    return util::TimeInfo::FromHourId(aggregate.hourly().last_hour_id());
  }

  // Record that the observations generated from a previous call to GenerateObservations have been
  // stored. This will clean up any aggregate data that is no longer needed, and update the last
  // time info data so that future aggregation runs don't try to regenerate it.
  //
  // Note: This function should maintain only the minimum amount of data required to generate future
  // observations. If the ReportAggregate contains data points that cannot be useful for any future
  // observations, they should be deleted. If an hour_id or a day_index is passed to this function,
  // an earlier hour_id or day_index will never be passed and the data associated with that can be
  // deleted.
  virtual void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo time_info,
                                     uint64_t system_profile_hash) const;

 protected:
  // UpdateAggregateData takes an |event_record| and adds it to the given |aggregate_data|
  // object according to the aggregation procedure implemented by this instance of
  // AggregationProcedure.
  //
  // |event_record|: The event that needs to be added to the aggregate data.
  // |aggregate_data|: A mutable AggregateData object.
  // |bucket|: A mutable AggregationPeriodBucket object containing |aggregate_data|.
  virtual void UpdateAggregateData(const logger::EventRecord &event_record,
                                   AggregateData *aggregate_data,
                                   AggregationPeriodBucket *bucket) = 0;

  // Merge two instances of the aggregate data for this procedure.
  //
  // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Any fields
  // in the |bucket| (which contains |aggregate_data|) are also merged into |merged_bucket| (which
  // contains |merged_aggregate_data|).
  //
  // The default implementation is a no-op; its parameter names are commented out because they are
  // intentionally unused.
  //
  // TODO(fxbug.dev/91520): implement this in all subclasses and make it pure virtual.
  virtual void MergeAggregateData(AggregateData * /*merged_aggregate_data*/,
                                  AggregationPeriodBucket * /*merged_bucket*/,
                                  const AggregateData & /*aggregate_data*/,
                                  const AggregationPeriodBucket & /*bucket*/) {}

  // GenerateSingleObservation generates an observation for the given report at the given time.
  //
  // A non-ok Status means there was an error at some point while generating observations. An ok
  // status means that the observation generation process completed successfully. The observation
  // may be null if there was nothing to send for this aggregate in this time window.
  //
  // |buckets|: A list of AggregateDataToGenerate objects containing the AggregationData objects.
  // |event_vectors|: The event vectors which should be included in the observation.
  // |time_info|: The time for which the observation is being generated.
  //
  // Note: This method will always be called with increasing time_info values. If the report is not
  // expedited, the values will always be strictly increasing. For expedited reports, the last
  // (current day's) time_info can be used to make multiple calls during the day, so the
  // implementation must be able to determine if an observation has already been generated for the
  // data and not regenerate it.
  virtual lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
      const std::vector<AggregateDataToGenerate> &buckets,
      const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) = 0;

  // Accessors for the report type and metric type stored in this procedure.
  ReportDefinition::ReportType report_type() const { return report_type_; }
  MetricDefinition::MetricType metric_type() const { return metric_type_; }

  // Accumulates the data that needs to be checked for observation generation for time_info.
  // The return value is a map from the system_profile_hash, to the data to use to generate the
  // observation for that system profile.
  std::map<uint64_t, std::vector<AggregateDataToGenerate>> GetAggregateDataToGenerate(
      const util::TimeInfo &time_info, ReportAggregate *aggregate) const;

  // Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged
  // for |buckets|. The |buckets| should be ordered from earliest to latest aggregation period.
  std::set<std::vector<uint32_t>> SelectEventVectorsForObservation(
      const std::vector<AggregateDataToGenerate> &buckets) const;

  // Overrides the maximum number of event vectors (|event_vector_buffer_max_|).
  void SetEventVectorBufferMax(uint64_t event_vector_buffer_max) {
    event_vector_buffer_max_ = event_vector_buffer_max;
  }

 private:
  // Returns a pointer to the mutable AggregationPeriodBucket associated with the given time
  // specifier.
  //
  // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics)
  // |aggregate|: A pointer to a mutable ReportAggregate object.
  AggregationPeriodBucket *GetAggregationPeriodBucket(uint32_t time,
                                                      ReportAggregate *aggregate) const;

  // Returns a pointer to the AggregateData of |bucket| that |event_record| should be added to,
  // if |bucket| has capacity for the event vector of |event_record|. In addition, adds
  // the event vector of |event_record| to |bucket|'s event_vectors buffer if it has capacity and
  // the event vector is not yet present. Returns a null pointer if |bucket| does not have
  // capacity.
  AggregateData *GetAggregateData(const logger::EventRecord &event_record,
                                  AggregationPeriodBucket *bucket, uint64_t system_profile_hash,
                                  std::chrono::system_clock::time_point system_time) const;

  // Returns the starting TimeInfo to use aggregate data from for this report, when processing the
  // current_time_info.
  util::TimeInfo GetStartTimeInfo(const util::TimeInfo &current_time_info) const;

  OnDeviceAggregationWindow window_;
  MetricDefinition::MetricType metric_type_;
  ReportDefinition::ReportType report_type_;
  SystemProfileSelectionPolicy system_profile_selection_policy_;
  uint64_t event_vector_buffer_max_;
};
// UnimplementedAggregationProcedure is a placeholder AggregationProcedure for procedures that have
// no real implementation. It is constructed from default MetricDefinition/ReportDefinition
// objects, never updates aggregate data, and always fails observation generation.
class UnimplementedAggregationProcedure : public AggregationProcedure {
 public:
  // |name|: The name of the unimplemented procedure, used by DebugString().
  explicit UnimplementedAggregationProcedure(std::string name)
      : AggregationProcedure(MetricDefinition(), ReportDefinition()), name_(std::move(name)) {}

  ~UnimplementedAggregationProcedure() override = default;

  // Logs an error and leaves the aggregate data and bucket untouched.
  void UpdateAggregateData(const logger::EventRecord & /* event_record */,
                           AggregateData * /*aggregate_data*/,
                           AggregationPeriodBucket * /* bucket */) override {
    LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
  }

  // Always returns (and logs) a non-ok status. The status code is UNIMPLEMENTED so that it agrees
  // with the message; CANCELLED would wrongly suggest that a caller cancelled the operation.
  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
      const std::vector<AggregateDataToGenerate> & /* aggregates */,
      const std::set<std::vector<uint32_t>> & /*event_vectors*/,
      const util::TimeInfo & /*time_info*/) override {
    return util::StatusBuilder(StatusCode::UNIMPLEMENTED,
                               "GeneratingSingleObservation is UNIMPLEMENTED for ")
        .AppendMsg(DebugString())
        .LogError()
        .Build();
  }

  // Returns the procedure name followed by " (Unimplemented)".
  [[nodiscard]] std::string DebugString() const override {
    return absl::StrCat(name_, " (Unimplemented)");
  }

  // Unimplemented procedures always report themselves as daily.
  [[nodiscard]] bool IsDaily() const override { return true; }

 private:
  // Name reported by DebugString().
  std::string name_;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_