// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "analyzer/report_master/report_exporter.h"

#include <memory>
#include <sstream>
#include <thread>

#include "analyzer/report_master/report_serializer.h"
#include "analyzer/report_master/report_stream.h"
#include "glog/logging.h"
#include "util/log_based_metrics.h"

namespace cobalt {
namespace analyzer {

using util::gcs::GcsUtil;

// Stackdriver metric constants
namespace {
const char kExportReportFailure[] = "report-exporter-export-report-failure";
const char kUploadToGCSError[] = "gcs-uploader-upload-to-gcs-failure";
const char kPingBucketFailure[] = "gcs-uploader-ping-bucket-failure";
}  // namespace

namespace {

std::string ExtensionForMimeType(const std::string& mime_type) {
  if (mime_type == "text/csv") {
    return "csv";
  }
  return "";
}

}  // namespace

ReportExporter::ReportExporter(std::shared_ptr<GcsUploadInterface> uploader)
    : uploader_(uploader) {}

grpc::Status ReportExporter::ExportReport(const ReportConfig& report_config,
                                          const ReportMetadataLite& metadata,
                                          ReportRowIterator* row_iterator) {
  if (metadata.export_name().empty()) {
    // If we were not told to export this report, there is nothing to do.
    return grpc::Status::OK;
  }

  grpc::Status overall_status = grpc::Status::OK;
  bool first_export = true;
  for (const auto& export_config : report_config.export_configs()) {
    if (first_export) {
      first_export = false;
    } else {
      auto status = row_iterator->Reset();
      if (!status.ok()) {
        return status;
      }
    }
    auto status =
        ExportReportOnce(report_config, metadata, export_config, row_iterator);
    if (!status.ok()) {
      overall_status = status;
    }
  }
  return overall_status;
}

grpc::Status ReportExporter::ExportReportOnce(
    const ReportConfig& report_config, const ReportMetadataLite& metadata,
    const ReportExportConfig& export_config, ReportRowIterator* row_iterator) {
  ReportSerializer serializer(&report_config, &metadata, &export_config);
  ReportStream report_stream(&serializer, row_iterator);
  auto status = report_stream.Start();
  if (!status.ok()) {
    return status;
  }
  auto location_case = export_config.export_location_case();
  switch (location_case) {
    case ReportExportConfig::kGcs:
      return ExportReportToGCS(report_config, export_config.gcs(), metadata,
                               report_stream.mime_type(), &report_stream);
      break;

    default: {
      std::ostringstream stream;
      stream << "Unrecognized export_location: " << location_case;
      std::string message = stream.str();
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kExportReportFailure) << message;
      return grpc::Status(grpc::INTERNAL, message);
    }
  }
}

grpc::Status ReportExporter::ExportReportToGCS(
    const ReportConfig& report_config, const GCSExportLocation& location,
    const ReportMetadataLite& metadata, const std::string& mime_type,
    ReportStream* report_stream) {
  if (location.bucket().empty()) {
    std::string message = "CSVExportLocation has empty |bucket|";
    LOG_STACKDRIVER_COUNT_METRIC(ERROR, kExportReportFailure) << message;
    return grpc::Status(grpc::INVALID_ARGUMENT, message);
  }

  return uploader_->UploadToGCS(location.bucket(),
                                GcsPath(report_config, metadata, mime_type),
                                mime_type, report_stream);
}

std::string ReportExporter::GcsPath(const ReportConfig& report_config,
                                    const ReportMetadataLite& metadata,
                                    const std::string& mime_type) {
  std::ostringstream stream;
  stream << report_config.customer_id() << "_" << report_config.project_id()
         << "_" << report_config.id() << "/" << metadata.export_name();
  if (metadata.export_name().find('.') == std::string::npos) {
    std::string extension = ExtensionForMimeType(mime_type);
    if (!extension.empty()) {
      stream << "." << extension;
    }
  }
  return stream.str();
}

grpc::Status GcsUploader::UploadToGCS(const std::string& bucket,
                                      const std::string& path,
                                      const std::string& mime_type,
                                      ReportStream* report_stream) {
  if (!gcs_util_) {
    gcs_util_.reset(new GcsUtil());
    if (!gcs_util_->InitFromDefaultPaths()) {
      gcs_util_.reset();
      std::string message = "Unable to initialize GcsUtil.";
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kUploadToGCSError) << message;
      return grpc::Status(grpc::INTERNAL, message);
    }
  }
  // We perform a "Ping" before performing an upload. A ping uses
  // a GET request whereas an upload uses a POST request. If the
  // currently cached OAUTH token needs to be refreshed, then the
  // server will return a 401 UNAUTHORIZED causing the google-api-cpp-client
  // to refresh the OAUTH token and then retry the original request. We are
  // working around a bug in that code in which it fails to reset the content
  // input stream when it does that retry. The symptom is either that an empty
  // file is uploaded, overwriting the previously uploaded serialized report,
  // or else a timeout occurs, depending on which implementation of input
  // stream is used. When using our ReportInputStream it is the former.
  // Although we have fixed this bug in our copy of google-api-cpp-client we
  // still choose to use the ping here in order to avoid that code path out of
  // caution.
  auto status = PingBucket(bucket);
  if (!status.ok()) {
    gcs_util_.reset();
    return status;
  }
  int seconds_to_sleep = 1;
  for (int i = 0; i < 5; i++) {
    // We will allow up to 15 minutes to upload a single report to GCS.
    static const uint32_t kReportUploadTimeoutSeconds = 60 * 15;
    if (gcs_util_->Upload(bucket, path, mime_type, report_stream,
                          kReportUploadTimeoutSeconds) &&
        report_stream->status().ok()) {
      return grpc::Status::OK;
    }
    if (i < 4) {
      LOG(WARNING) << "Upload to GCS at " << bucket << "|" << path
                   << " failed. Sleeping for " << seconds_to_sleep
                   << " seconds before trying again.";
      std::this_thread::sleep_for(std::chrono::seconds(seconds_to_sleep));
      seconds_to_sleep *= 2;
    }
  }
  gcs_util_.reset();
  std::ostringstream stream;
  stream << "Upload to GCS at " << bucket << "|" << path
         << " failed five times. Giving up.";
  std::string message = stream.str();
  LOG_STACKDRIVER_COUNT_METRIC(ERROR, kUploadToGCSError) << message;
  return grpc::Status(grpc::INTERNAL, message);
}

grpc::Status GcsUploader::PingBucket(const std::string& bucket) {
  if (!gcs_util_) {
    gcs_util_.reset(new GcsUtil());
    if (!gcs_util_->InitFromDefaultPaths()) {
      gcs_util_.reset();
      std::string message = "Unable to initialize GcsUtil.";
      LOG_STACKDRIVER_COUNT_METRIC(ERROR, kPingBucketFailure) << message;
      return grpc::Status(grpc::INTERNAL, message);
    }
  }
  int seconds_to_sleep = 1;
  for (int i = 0; i < 5; i++) {
    if (gcs_util_->Ping(bucket)) {
      return grpc::Status::OK;
    }

    if (i < 4) {
      LOG(WARNING) << "Pinging " << bucket << " failed. Sleeping for "
                   << seconds_to_sleep << " seconds before trying again.";
      std::this_thread::sleep_for(std::chrono::seconds(seconds_to_sleep));
      seconds_to_sleep *= 2;
    }
  }
  gcs_util_.reset();
  std::ostringstream stream;
  stream << "Pinging " << bucket << " failed five times. Giving up.";
  std::string message = stream.str();
  LOG_STACKDRIVER_COUNT_METRIC(ERROR, kPingBucketFailure) << message;
  return grpc::Status(grpc::INTERNAL, message);
}

}  // namespace analyzer
}  // namespace cobalt
