// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
#define COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_

#include <cstdint>
#include <set>
#include <string>
#include <vector>

#include "src/lib/statusor/statusor.h"
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/event_record.h"
#include "src/logging.h"
#include "src/pb/event.pb.h"
#include "src/pb/observation.pb.h"
#include "src/registry/aggregation_window.pb.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"

namespace cobalt::local_aggregation {

// AggregationProcedure is the abstract interface for aggregation logic.
//
// For each local aggregation procedure, there should be an implementation of this class.
//
// An AggregationProcedure should not have any of its own state. Its role is to update the state
// within the instance of ReportAggregate it is passed in UpdateAggregate() and
// MaybeGenerateObservation().
//
// See Also:
// * LocalAggregation calls UpdateAggregate() whenever AddEvent() is called.
// * ObservationGenerator calls MaybeGenerateObservation() on a regular schedule (~once per hour).
//
// Note:
// * AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure
//   instead.
class AggregationProcedure {
 public:
  // Construct an AggregationProcedure
  //
  // |metric| Used to extract MetricType for validating EventRecords.
  // |report| Used for extracting report-specific parameters.
  explicit AggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report);
  virtual ~AggregationProcedure() = default;

  // AggregationProcedure::Get returns the appropriate AggregationProcedure for the given
  // metric/report.
  //
  // A return value of nullptr should be interpreted as a report that is not supported by cobalt 1.1
  // (perhaps a cobalt 1.0 metric/report)
  static std::unique_ptr<AggregationProcedure> Get(const MetricDefinition &metric,
                                                   const ReportDefinition &report);

  // UpdateAggregate takes an |event_record| and adds it to the given |aggregate| object according
  // to the aggregation procedure implemented by this instance of AggregationProcedure.
  //
  // |event_record|: The event that needs to be added to the aggregate.
  // |aggregate|: A mutable ReportAggregate object.
  void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate);

  // GenerateObservation is the public interface for generating observations. It handles
  // reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and
  // passing all that information down to GenerateSingleObservation.
  //
  // |time_info|: Time period for which this observation should be generated.
  // |aggregate|: A mutable reference to a ReportAggregate object.
  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateObservation(
      const util::TimeInfo &time_info, ReportAggregate *aggregate);

  [[nodiscard]] virtual std::string DebugString() const = 0;

  // IsDaily returns true if this particular report should be treated as daily data.
  [[nodiscard]] virtual bool IsDaily() const = 0;

  // IsValidEventType returns true if the given event type can be handled by the
  // AggregationProcedure.
  bool IsValidEventType(Event::TypeCase type) const;

  util::TimeInfo GetLastTimeInfo(const ReportAggregate &aggregate) const {
    if (IsDaily()) {
      return util::TimeInfo::FromDayIndex(aggregate.daily().last_day_index());
    }
    return util::TimeInfo::FromHourId(aggregate.hourly().last_hour_id());
  }

  void SetLastTimeInfo(ReportAggregate *aggregate, util::TimeInfo info) const {
    if (IsDaily()) {
      aggregate->mutable_daily()->set_last_day_index(info.day_index);
    } else {
      aggregate->mutable_hourly()->set_last_hour_id(info.hour_id);
    }
  }

 protected:
  // UpdateAggregateData takes an |event_record| and adds it to the given |aggregate_data|
  // object according to the aggregation procedure implemented by this instance of
  // AggregationProcedure.
  //
  // |event_record|: The event that needs to be added to the aggregate data.
  // |aggregate_data|: A mutable AggregateData object.
  // |event_code_aggregate|: A mutable EventCodeAggregate object containing |aggregate_data|.
  virtual void UpdateAggregateData(const logger::EventRecord &event_record,
                                   AggregateData *aggregate_data,
                                   EventCodeAggregate *event_code_aggregate) = 0;

  // GenerateSingleObservation generates an observation for the given report at the given time.
  //
  // A non-ok Status means there was an error at some point while generating observations. An ok
  // status means that the observation was generated sucessfully and should never be null.
  //
  // This method should generate a 'zero' observation if no data is present. The 'zero' observation
  // should be of the same type as the non-zero observations, but usually with no fields populated.
  //
  // |aggregates|: A list of mutable references to EventCodeAggregate objects.
  // |event_vectors|: The event vectors which should be included in the observation.
  //
  // Note: This function should maintain only the minimum amount of data required to generate future
  // observations. If the ReportAggregate contains data points that cannot be useful for any future
  // observations, they should be deleted. This method will always be called with strictly
  // increasing time_info values. If an hour_id or a day_index is passed to this function, an
  // earlier hour_id or day_index will never be passed and the data associated with that can be
  // deleted.
  virtual lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
      const std::vector<EventCodeAggregate *> &aggregates,
      const std::set<std::vector<uint32_t>> &event_vectors) = 0;

  ReportDefinition::ReportType report_type() const { return report_type_; }
  MetricDefinition::MetricType metric_type() const { return metric_type_; }

  // Returns a pointer to the EventCodeAggregate for the current time window (daily or hourly,
  // depending on the report for which |event_record| was logged).
  EventCodeAggregate *GetEventCodeAggregate(const logger::EventRecord &event_record,
                                            ReportAggregate *aggregate) const;

  void SetEventCodeBufferMax(uint64_t event_code_buffer_max) {
    event_code_buffer_max_ = event_code_buffer_max;
  }

 private:
  // Returns a mutable reference to the EventCodeAggregate associated with the given time specifier.
  //
  // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics)
  // |aggregate|: A mutable reference to a ReportAggregate object.
  EventCodeAggregate *GetEventCodeAggregate(uint32_t time, ReportAggregate *aggregate) const;

  // Returns a pointer to the AggregateData of |aggregate| that |event_record| should be added to,
  // if |aggregate| has capacity for the event vector of |event_record|. In addition, adds
  // the event vector of |event_record| to |aggregate|'s event_vectors buffer if it has capacity and
  // the event vector is not yet present. Returns a null pointer if |aggregate| does not have
  // capacity.
  AggregateData *GetAggregateData(const logger::EventRecord &event_record,
                                  EventCodeAggregate *aggregate) const;

  // Returns a set containing the first |event_code_buffer_max_| event vectors which were logged for
  // |aggregates|. The |aggregates| should be ordered from earliest to latest.
  std::set<std::vector<uint32_t>> SelectEventVectorsForObservation(
      const std::vector<EventCodeAggregate *> &aggregates) const;

  OnDeviceAggregationWindow window_;
  MetricDefinition::MetricType metric_type_;
  ReportDefinition::ReportType report_type_;
  uint64_t event_code_buffer_max_;
};

class UnimplementedAggregationProcedure : public AggregationProcedure {
 public:
  explicit UnimplementedAggregationProcedure(std::string name)
      : AggregationProcedure(MetricDefinition(), ReportDefinition()), name_(std::move(name)) {}
  ~UnimplementedAggregationProcedure() override = default;

  void UpdateAggregateData(const logger::EventRecord & /* event_record */,
                           AggregateData * /*aggregate_data*/,
                           EventCodeAggregate * /* aggregate */) override {
    LOG(ERROR) << "UpdateEventCodeAggregate is UNIMPLEMENTED for " << DebugString();
  }

  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
      const std::vector<EventCodeAggregate *> & /* aggregates */,
      const std::set<std::vector<uint32_t>> & /*event_vectors*/) override {
    LOG(ERROR) << "GenerateSingleObservation is UNIMPLEMENTED for " << DebugString();

    return util::Status::CANCELLED;
  }

  [[nodiscard]] std::string DebugString() const override {
    return absl::StrCat(name_, " (Unimplemented)");
  }

  [[nodiscard]] bool IsDaily() const override { return true; }

 private:
  std::string name_;
};

}  // namespace cobalt::local_aggregation

#endif  // COBALT_SRC_LOCAL_AGGREGATION_1_1_AGGREGATION_PROCEDURES_AGGREGATION_PROCEDURE_H_
