// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "analyzer/report_master/histogram_analysis_engine.h"

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "./observation.pb.h"
#include "algorithms/forculus/forculus_analyzer.h"
#include "algorithms/rappor/basic_rappor_analyzer.h"
#include "algorithms/rappor/rappor_analyzer.h"
#include "config/buckets_config.h"
#include "glog/logging.h"
#include "util/log_based_metrics.h"

namespace cobalt {
namespace analyzer {

using config::AnalyzerConfig;
using forculus::ForculusAnalyzer;
using rappor::BasicRapporAnalyzer;
using rappor::RapporAnalyzer;
using store::ObservationStore;
using store::ReportStore;

// Stackdriver metric constants
namespace {
const char kCheckConsistentEncodingFailure[] =
    "check-consistent-encoding-failure";
const char kForculusAdapterPerformAnalysisFailure[] =
    "forculus-adapter-perform-analysis-failure";
const char kRapporAdapterPerformAnalysisFailure[] =
    "rappor-adapter-perform-analysis-failure";
const char kNoOpAdapterProcessObservationPartFailure[] =
    "no-op-adapter-process-observation-part-failure";
const char kNoOpIntBucketDistributionAdapterProcessObservationPartFailure[] =
    "no-op-int-bucket-distribution-adapter-process-observation-part-failure";
const char kPerformAnalysisFailure[] =
    "histogram-analysis-engine-perform-analysis-failure";
const char kGetDecoderFailure[] =
    "histogram-analysis-engine-get-decoder-failure";
const char kNewDecoderFailure[] =
    "histogram-analysis-engine-new-decoder-failure";
}  // namespace

namespace {

// Checks that the type of encoding used by the observation_part is the
// one specified by the encoding_config.
bool CheckConsistentEncoding(const EncodingConfig& encoding_config,
                             const ObservationPart& observation_part,
                             const ReportId& report_id) {
  bool consistent = true;
  switch (observation_part.value_case()) {
    case ObservationPart::kForculus:
      consistent = encoding_config.has_forculus();
      break;
    case ObservationPart::kBasicRappor:
      consistent = encoding_config.has_basic_rappor();
      break;
    case ObservationPart::kRappor:
      consistent = encoding_config.has_rappor();
      break;
    case ObservationPart::kUnencoded:
      consistent = encoding_config.has_no_op_encoding();
      break;
    case ObservationPart::VALUE_NOT_SET:
      consistent = false;
      break;

    default:
      LOG(FATAL) << "Unexpected case " << observation_part.value_case();
  }
  if (!consistent) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kCheckConsistentEncodingFailure)
        << "Bad ObservationPart! Value uses encoding "
        << observation_part.value_case() << " but "
        << encoding_config.config_case() << " expected."
        << " For report_id=" << ReportStore::ToString(report_id);
  }

  return consistent;
}

}  // namespace

////////////////////////////////////////////////////////////////////////////
/// class ForculusAdapter
//
// A concrete subclass of DecoderAdapter that adapts to a ForculsAnalyzer.
///////////////////////////////////////////////////////////////////////////
class ForculusAdapter : public DecoderAdapter {
 public:
  ForculusAdapter(const ReportId& report_id,
                  const cobalt::ForculusConfig& config)
      : report_id_(report_id), analyzer_(new ForculusAnalyzer(config)) {}

  bool ProcessObservationPart(uint32_t day_index,
                              const ObservationPart& obs) override {
    return analyzer_->AddObservation(day_index, obs.forculus());
  }

  grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
    auto result_map = analyzer_->TakeResults();
    results->clear();
    for (const auto& pair : result_map) {
      ValuePart value_part;
      if (!value_part.ParseFromString(pair.first)) {
        LOG_STACKDRIVER_COUNT_METRIC(ERROR,
                                     kForculusAdapterPerformAnalysisFailure)
            << "Bad value. Could not parse as ValuePart: " << pair.first
            << "report_id=" << ReportStore::ToString(report_id_);
        continue;
      }
      results->emplace_back();
      HistogramReportRow* row = results->back().mutable_histogram();
      row->mutable_value()->Swap(&value_part);
      row->set_count_estimate(pair.second->total_count);
      // TODO(rudominer) We are not using some of the data that the
      // ForculusAnalyzer can return to us such as observation_errors().
      // Consider adding monitoring around this.
    }

    return grpc::Status::OK;
  }

 private:
  ReportId report_id_;
  std::unique_ptr<ForculusAnalyzer> analyzer_;
};

////////////////////////////////////////////////////////////////////////////
/// class RapporAdapter
//
// A concrete subclass of DecoderAdapter that adapts to a
// StringRapporAnalyzer.
//
// NOTE: String RAPPOR analysis is not yet fully implemented in Cobalt.
///////////////////////////////////////////////////////////////////////////
class RapporAdapter : public DecoderAdapter {
 public:
  RapporAdapter(const ReportId& report_id, const RapporConfig& config,
                const RapporCandidateList* candidates)
      : report_id_(report_id),
        analyzer_(new RapporAnalyzer(config, candidates)),
        candidates_(candidates) {}

  bool ProcessObservationPart(uint32_t day_index,
                              const ObservationPart& obs) override {
    return analyzer_->AddObservation(obs.rappor());
  }

  grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
    std::vector<rappor::CandidateResult> candidate_results;
    auto status = analyzer_->Analyze(&candidate_results);
    if (!status.ok()) {
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRapporAdapterPerformAnalysisFailure)
          << "String RAPPOR analysis failed with status=("
          << status.error_code() << ") " << status.error_message()
          << " For report_id=" << ReportStore::ToString(report_id_);
      return status;
    }
    // If candidates_ is null or empty then analyzer_->Analyze() will return
    // INVALID_ARGUMENT. If we are here then RAPPR analysis succeeded.
    CHECK((int)candidate_results.size() == candidates_->candidates_size());
    size_t candidate_index = 0;
    for (auto& candidate_result : candidate_results) {
      results->emplace_back();
      HistogramReportRow* row = results->back().mutable_histogram();
      ValuePart v;
      v.set_string_value(candidates_->candidates(candidate_index++));
      row->mutable_value()->Swap(&v);
      row->set_count_estimate(candidate_result.count_estimate);
      row->set_std_error(candidate_result.std_error);
    }
    // TODO(rudominer) We are not using some of the data that the
    // RapporAnalyzer can return to us such as observation_errors().
    // Consider adding monitoring around this.
    return grpc::Status::OK;
  }

 private:
  ReportId report_id_;
  std::unique_ptr<RapporAnalyzer> analyzer_;
  const RapporCandidateList* candidates_;  // not owned.
};

////////////////////////////////////////////////////////////////////////////
/// class BasicRapporAdapter
//
// A concrete subclass of DecoderAdapter that adapts to a BasicRapporAnalyzer.
///////////////////////////////////////////////////////////////////////////
class BasicRapporAdapter : public DecoderAdapter {
 public:
  BasicRapporAdapter(const ReportId& report_id,
                     const cobalt::BasicRapporConfig& config,
                     const IndexLabels* index_labels)
      : report_id_(report_id),
        analyzer_(new BasicRapporAnalyzer(config)),
        index_labels_(index_labels) {}

  bool ProcessObservationPart(uint32_t day_index,
                              const ObservationPart& obs) override {
    return analyzer_->AddObservation(obs.basic_rappor());
  }

  grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
    auto category_results = analyzer_->Analyze();
    for (auto& category_result : category_results) {
      results->emplace_back();
      HistogramReportRow* row = results->back().mutable_histogram();
      row->mutable_value()->Swap(&category_result.category);
      // If the value is of type INDEX, meaning that it represents an index
      // into some enumerated set defined outside of the Cobalt configuration,
      // then check whether we were given an index label for this index and
      // if so attach the label to the report row.
      if (index_labels_ != nullptr &&
          row->value().data_case() == ValuePart::kIndexValue) {
        auto iter = index_labels_->labels().find(row->value().index_value());
        if (iter != index_labels_->labels().end()) {
          row->set_label(iter->second);
        }
      }
      row->set_count_estimate(category_result.count_estimate);
      row->set_std_error(category_result.std_error);
    }
    // TODO(rudominer) We are not using some of the data that the
    // BasicRapporAnalyzer can return to us such as observation_errors().
    // Consider adding monitoring around this.
    return grpc::Status::OK;
  }

 private:
  ReportId report_id_;
  std::unique_ptr<BasicRapporAnalyzer> analyzer_;
  const IndexLabels* index_labels_;  // not owned.
};

////////////////////////////////////////////////////////////////////////////
/// class NoOpAdapter
//
// A concrete subclass of DecoderAdapter that collects counts of
// UnencodedObservations in a hash map.
///////////////////////////////////////////////////////////////////////////
class NoOpAdapter : public DecoderAdapter {
 public:
  NoOpAdapter(const ReportId& report_id,
              const cobalt::NoOpEncodingConfig& config,
              const IndexLabels* index_labels)
      : report_id_(report_id), config_(config), index_labels_(index_labels) {}

  bool ProcessObservationPart(uint32_t day_index,
                              const ObservationPart& obs) override {
    std::string serialized_value;
    if (!obs.unencoded().unencoded_value().SerializeToString(
            &serialized_value)) {
      return false;
    }
    if (VLOG_IS_ON(5)) {
      std::ostringstream str;
      const auto& value = obs.unencoded().unencoded_value();
      switch (value.data_case()) {
        case ValuePart::kStringValue:
          str << value.string_value();
          break;
        case ValuePart::kIntValue:
          str << value.int_value();
          break;
        case ValuePart::kIndexValue:
          str << "index=" << value.index_value();
          break;
        case ValuePart::kDoubleValue:
          str << value.double_value();
        default:
          str << "[UNKNOWN DATA TYPE]";
      }
      VLOG(5) << "NoOpAdapter::ProcessObservationPart: " << str.str();
    }
    // For safety we will accept only up to 10,000 different values.
    static const size_t kMaxNumValues = 10000;
    if (counts_.size() >= kMaxNumValues) {
      LOG_STACKDRIVER_COUNT_METRIC(ERROR,
                                   kNoOpAdapterProcessObservationPartFailure)
          << "Report truncated! May not exceed " << kMaxNumValues
          << " different values."
          << " report_id=" << ReportStore::ToString(report_id_);
      return false;
    }
    counts_[serialized_value]++;
    return true;
  }

  grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
    for (const auto& pair : counts_) {
      results->emplace_back();
      HistogramReportRow* row = results->back().mutable_histogram();
      row->mutable_value()->ParseFromString(pair.first);
      row->set_count_estimate(pair.second);
      row->set_std_error(0);
      // If the value is of type INDEX, meaning that it represents an index
      // into some enumerated set defined outside of the Cobalt configuration,
      // then check whether we were given an index label for this index and
      // if so attach the label to the report row.
      if (index_labels_ != nullptr &&
          row->value().data_case() == ValuePart::kIndexValue) {
        auto iter = index_labels_->labels().find(row->value().index_value());
        if (iter != index_labels_->labels().end()) {
          row->set_label(iter->second);
        }
      }
    }
    return grpc::Status::OK;
  }

 private:
  ReportId report_id_;
  cobalt::NoOpEncodingConfig config_;
  std::map<std::string, size_t> counts_;
  const IndexLabels* index_labels_;  // not owned.
};

////////////////////////////////////////////////////////////////////////////
/// class NoOpIntBucketDistributionAdapter
//
// A concrete subclass of DecoderAdapter that collects counts of bucketed
// integer observations and merges int bucket distribution observations.
///////////////////////////////////////////////////////////////////////////
class NoOpIntBucketDistributionAdapter : public DecoderAdapter {
 public:
  NoOpIntBucketDistributionAdapter(
      const ReportId& report_id, const cobalt::NoOpEncodingConfig& config,
      std::unique_ptr<cobalt::config::IntegerBucketConfig> int_bucket_config)
      : report_id_(report_id),
        config_(config),
        int_bucket_config_(std::move(int_bucket_config)) {}

  bool ProcessObservationPart(uint32_t day_index,
                              const ObservationPart& obs) override {
    if (obs.value_case() != ObservationPart::kUnencoded) {
      LOG_STACKDRIVER_COUNT_METRIC(
          ERROR, kNoOpIntBucketDistributionAdapterProcessObservationPartFailure)
          << "Encoded observation ignored. report_id="
          << ReportStore::ToString(report_id_);
      return false;
    }

    const auto& value = obs.unencoded().unencoded_value();
    // If the value provided is an integer, we bucket it and increment the
    // corresponding bucket.
    if (ValuePart::kIntValue == value.data_case()) {
      counts_[int_bucket_config_->BucketIndex(value.int_value())] += 1;
      return true;
    }

    if (ValuePart::kIntBucketDistribution == value.data_case()) {
      // First, we check that all the indices correspond to valid buckets.
      for (auto iter = value.int_bucket_distribution().counts().begin();
           value.int_bucket_distribution().counts().end() != iter; iter++) {
        if (iter->first > int_bucket_config_->OverflowBucket()) {
          return false;
        }
      }

      for (auto iter = value.int_bucket_distribution().counts().begin();
           value.int_bucket_distribution().counts().end() != iter; iter++) {
        counts_[iter->first] += iter->second;
      }
      return true;
    }
    return false;
  }

  grpc::Status PerformAnalysis(std::vector<ReportRow>* results) override {
    for (const auto& pair : counts_) {
      results->emplace_back();
      HistogramReportRow* row = results->back().mutable_histogram();
      row->mutable_value()->set_index_value(pair.first);
      row->set_count_estimate(pair.second);
      row->set_std_error(0);
      // TODO(azani): Generate labels.
    }
    return grpc::Status::OK;
  }

 private:
  ReportId report_id_;
  cobalt::NoOpEncodingConfig config_;
  std::map<uint32_t, size_t> counts_;
  std::unique_ptr<cobalt::config::IntegerBucketConfig> int_bucket_config_;
};

////////////////////////////////////////////////////////////////////////////
/// HistogramAnalysisEngine methods.
///////////////////////////////////////////////////////////////////////////
HistogramAnalysisEngine::HistogramAnalysisEngine(
    const ReportId& report_id, const ReportVariable* report_variable,
    const MetricPart* metric_part,
    std::shared_ptr<AnalyzerConfig> analyzer_config)
    : report_id_(report_id),
      report_variable_(report_variable),
      metric_part_(metric_part),
      analyzer_config_(analyzer_config) {}

bool HistogramAnalysisEngine::ProcessObservationPart(
    uint32_t day_index, const ObservationPart& obs,
    std::unique_ptr<SystemProfile> profile) {
  DecoderAdapter* decoder = GetDecoder(obs, std::move(profile));
  if (!decoder) {
    return false;
  }
  return decoder->ProcessObservationPart(day_index, obs);
}

// Note that despite the comments in histogram_analysis_engine.h, version 0.1 of
// Cobalt does not yet support reports that are heterogeneous with respect
// to encoding. In this version the purpose of the HistogramAnalysisEngine is to
// ensure that in fact the set of observations is not heterogeneous.
grpc::Status HistogramAnalysisEngine::PerformAnalysis(
    std::vector<ReportRow>* results) {
  CHECK(results);

  if (grouped_decoders_.size() == 0) {
    LOG(INFO)
        << "Empty HISTOGRAM report. No valid observations found for report_id="
        << ReportStore::ToString(report_id_);
    return grpc::Status::OK;
  }

  grpc::Status status;
  for (auto& decoder_group : grouped_decoders_) {
    if (decoder_group.second.decoders.size() > 1) {
      std::ostringstream stream;
      stream << "Analysis aborted because more than one encoding_config_id was "
                "found among the observations: ";
      bool first = true;
      for (const auto& id : decoder_group.second.decoders) {
        if (!first) {
          stream << ", ";
        }
        stream << id.first;
        first = false;
      }
      stream << ". This version of Cobalt does not support heterogeneous "
                "reports. report_id="
             << ReportStore::ToString(report_id_);
      std::string message = stream.str();
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kPerformAnalysisFailure) << message;
      return grpc::Status(grpc::UNIMPLEMENTED, message);
    }

    auto decoder = decoder_group.second.decoders.begin();
    std::vector<ReportRow> sub_results;
    status = decoder->second->PerformAnalysis(&sub_results);
    if (!status.ok()) {
      return status;
    }

    for (auto& row : sub_results) {
      // This should always be true since this is the HistogramAnalysisEngine.
      if (row.has_histogram() && decoder_group.second.profile != nullptr) {
        *row.mutable_histogram()->mutable_system_profile() =
            *decoder_group.second.profile;
      }
    }

    results->insert(results->end(), sub_results.begin(), sub_results.end());
  }

  return status;
}

DecoderAdapter* HistogramAnalysisEngine::GetDecoder(
    const ObservationPart& observation_part,
    std::unique_ptr<SystemProfile> profile) {
  uint32_t encoding_config_id = observation_part.encoding_config_id();
  const EncodingConfig* encoding_config = analyzer_config_->EncodingConfig(
      report_id_.customer_id(), report_id_.project_id(), encoding_config_id);
  if (!encoding_config) {
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kGetDecoderFailure)
        << "Bad ObservationPart! Contains invalid encoding_config_id "
        << encoding_config_id
        << " for report_id=" << ReportStore::ToString(report_id_);
    return nullptr;
  }
  if (!CheckConsistentEncoding(*encoding_config, observation_part,
                               report_id_)) {
    return nullptr;
  }

  std::string group_by;
  if (profile != nullptr) {
    profile->SerializeToString(&group_by);
  }

  auto group = grouped_decoders_.find(group_by);
  if (group == grouped_decoders_.end()) {
    // This is the first time we are seeing this SystemProfile. Create a new
    // DecoderGroup and move the SystemProfile into it.
    grouped_decoders_[group_by].profile = std::move(profile);
  }

  auto iter = grouped_decoders_[group_by].decoders.find(encoding_config_id);
  if (iter != grouped_decoders_[group_by].decoders.end()) {
    return iter->second.get();
  }

  // This is the first time we have seen the pair (|group_by|,
  // |encoding_config_id|). Make a new decoder/analyzer for it.
  grouped_decoders_[group_by].decoders[encoding_config_id] =
      NewDecoder(encoding_config);

  return grouped_decoders_[group_by].decoders[encoding_config_id].get();
}

std::unique_ptr<DecoderAdapter> HistogramAnalysisEngine::NewDecoder(
    const EncodingConfig* encoding_config) {
  if (metric_part_->has_int_buckets()) {
    return std::unique_ptr<DecoderAdapter>(new NoOpIntBucketDistributionAdapter(
        report_id_, encoding_config->no_op_encoding(),
        cobalt::config::IntegerBucketConfig::CreateFromProto(
            metric_part_->int_buckets())));
  }

  const IndexLabels* index_labels = nullptr;
  if (report_variable_->has_index_labels()) {
    index_labels = &(report_variable_->index_labels());
  }
  switch (encoding_config->config_case()) {
    case EncodingConfig::kForculus:
      return std::unique_ptr<DecoderAdapter>(
          new ForculusAdapter(report_id_, encoding_config->forculus()));
    case EncodingConfig::kRappor: {
      const RapporCandidateList* rappor_candidates = nullptr;
      if (report_variable_->has_rappor_candidates()) {
        rappor_candidates = &(report_variable_->rappor_candidates());
      } else {
        LOG_STACKDRIVER_COUNT_METRIC(ERROR, kNewDecoderFailure)
            << "HistogramAnalysisEngine: Received an observation with "
               "encoding_config_id="
            << encoding_config->id()
            << " for String RAPPOR but no RAPPOR candidates are "
               "specified for report_id="
            << ReportStore::ToString(report_id_);
      }
      return std::unique_ptr<DecoderAdapter>(new RapporAdapter(
          report_id_, encoding_config->rappor(), rappor_candidates));
    }
    case EncodingConfig::kBasicRappor: {
      return std::unique_ptr<DecoderAdapter>(new BasicRapporAdapter(
          report_id_, encoding_config->basic_rappor(), index_labels));
    }
    case EncodingConfig::kNoOpEncoding: {
      return std::unique_ptr<DecoderAdapter>(new NoOpAdapter(
          report_id_, encoding_config->no_op_encoding(), index_labels));
    }
    default:
      LOG(FATAL) << "Unexpected EncodingConfig type "
                 << encoding_config->config_case();
  }
}

}  // namespace analyzer
}  // namespace cobalt
