blob: 9940cadcedba877a2be2bea037fc13ca34c6eb1d [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "analyzer/report_master/report_exporter.h"
#include <memory>
#include <sstream>
#include <thread>
#include "analyzer/report_master/report_serializer.h"
#include "analyzer/report_master/report_stream.h"
#include "glog/logging.h"
#include "util/log_based_metrics.h"
namespace cobalt {
namespace analyzer {
using util::gcs::GcsUtil;
// Stackdriver metric constants
namespace {
const char kExportReportFailure[] = "report-exporter-export-report-failure";
const char kUploadToGCSError[] = "gcs-uploader-upload-to-gcs-failure";
const char kPingBucketFailure[] = "gcs-uploader-ping-bucket-failure";
} // namespace
namespace {
std::string ExtensionForMimeType(const std::string& mime_type) {
if (mime_type == "text/csv") {
return "csv";
}
return "";
}
} // namespace
ReportExporter::ReportExporter(std::shared_ptr<GcsUploadInterface> uploader)
: uploader_(uploader) {}
grpc::Status ReportExporter::ExportReport(const ReportConfig& report_config,
const ReportMetadataLite& metadata,
ReportRowIterator* row_iterator) {
if (metadata.export_name().empty()) {
// If we were not told to export this report, there is nothing to do.
return grpc::Status::OK;
}
grpc::Status overall_status = grpc::Status::OK;
bool first_export = true;
for (const auto& export_config : report_config.export_configs()) {
if (first_export) {
first_export = false;
} else {
auto status = row_iterator->Reset();
if (!status.ok()) {
return status;
}
}
auto status =
ExportReportOnce(report_config, metadata, export_config, row_iterator);
if (!status.ok()) {
overall_status = status;
}
}
return overall_status;
}
grpc::Status ReportExporter::ExportReportOnce(
const ReportConfig& report_config, const ReportMetadataLite& metadata,
const ReportExportConfig& export_config, ReportRowIterator* row_iterator) {
ReportSerializer serializer(&report_config, &metadata, &export_config);
ReportStream report_stream(&serializer, row_iterator);
auto status = report_stream.Start();
if (!status.ok()) {
return status;
}
auto location_case = export_config.export_location_case();
switch (location_case) {
case ReportExportConfig::kGcs:
return ExportReportToGCS(report_config, export_config.gcs(), metadata,
report_stream.mime_type(), &report_stream);
break;
default: {
std::ostringstream stream;
stream << "Unrecognized export_location: " << location_case;
std::string message = stream.str();
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kExportReportFailure) << message;
return grpc::Status(grpc::INTERNAL, message);
}
}
}
grpc::Status ReportExporter::ExportReportToGCS(
const ReportConfig& report_config, const GCSExportLocation& location,
const ReportMetadataLite& metadata, const std::string& mime_type,
ReportStream* report_stream) {
if (location.bucket().empty()) {
std::string message = "CSVExportLocation has empty |bucket|";
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kExportReportFailure) << message;
return grpc::Status(grpc::INVALID_ARGUMENT, message);
}
return uploader_->UploadToGCS(location.bucket(),
GcsPath(report_config, metadata, mime_type),
mime_type, report_stream);
}
std::string ReportExporter::GcsPath(const ReportConfig& report_config,
const ReportMetadataLite& metadata,
const std::string& mime_type) {
std::ostringstream stream;
stream << report_config.customer_id() << "_" << report_config.project_id()
<< "_" << report_config.id() << "/" << metadata.export_name();
if (metadata.export_name().find('.') == std::string::npos) {
std::string extension = ExtensionForMimeType(mime_type);
if (!extension.empty()) {
stream << "." << extension;
}
}
return stream.str();
}
grpc::Status GcsUploader::UploadToGCS(const std::string& bucket,
const std::string& path,
const std::string& mime_type,
ReportStream* report_stream) {
if (!gcs_util_) {
gcs_util_.reset(new GcsUtil());
if (!gcs_util_->InitFromDefaultPaths()) {
gcs_util_.reset();
std::string message = "Unable to initialize GcsUtil.";
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kUploadToGCSError) << message;
return grpc::Status(grpc::INTERNAL, message);
}
}
// We perform a "Ping" before performing an upload. A ping uses
// a GET request whereas an upload uses a POST request. If the
// currently cached OAUTH token needs to be refreshed, then the
// server will return a 401 UNAUTHORIZED causing the google-api-cpp-client
// to refresh the OAUTH token and then retry the original request. We are
// working around a bug in that code in which it fails to reset the content
// input stream when it does that retry. The symptom is either that an empty
// file is uploaded, overwriting the previously uploaded serialized report,
// or else a timeout occurs, depending on which implementation of input
// stream is used. When using our ReportInputStream it is the former.
// Although we have fixed this bug in our copy of google-api-cpp-client we
// still choose to use the ping here in order to avoid that code path out of
// caution.
auto status = PingBucket(bucket);
if (!status.ok()) {
gcs_util_.reset();
return status;
}
int seconds_to_sleep = 1;
for (int i = 0; i < 5; i++) {
// We will allow up to 15 minutes to upload a single report to GCS.
static const uint32_t kReportUploadTimeoutSeconds = 60 * 15;
if (gcs_util_->Upload(bucket, path, mime_type, report_stream,
kReportUploadTimeoutSeconds) &&
report_stream->status().ok()) {
return grpc::Status::OK;
}
if (i < 4) {
LOG(WARNING) << "Upload to GCS at " << bucket << "|" << path
<< " failed. Sleeping for " << seconds_to_sleep
<< " seconds before trying again.";
std::this_thread::sleep_for(std::chrono::seconds(seconds_to_sleep));
seconds_to_sleep *= 2;
}
}
gcs_util_.reset();
std::ostringstream stream;
stream << "Upload to GCS at " << bucket << "|" << path
<< " failed five times. Giving up.";
std::string message = stream.str();
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kUploadToGCSError) << message;
return grpc::Status(grpc::INTERNAL, message);
}
grpc::Status GcsUploader::PingBucket(const std::string& bucket) {
if (!gcs_util_) {
gcs_util_.reset(new GcsUtil());
if (!gcs_util_->InitFromDefaultPaths()) {
gcs_util_.reset();
std::string message = "Unable to initialize GcsUtil.";
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kPingBucketFailure) << message;
return grpc::Status(grpc::INTERNAL, message);
}
}
int seconds_to_sleep = 1;
for (int i = 0; i < 5; i++) {
if (gcs_util_->Ping(bucket)) {
return grpc::Status::OK;
}
if (i < 4) {
LOG(WARNING) << "Pinging " << bucket << " failed. Sleeping for "
<< seconds_to_sleep << " seconds before trying again.";
std::this_thread::sleep_for(std::chrono::seconds(seconds_to_sleep));
seconds_to_sleep *= 2;
}
}
gcs_util_.reset();
std::ostringstream stream;
stream << "Pinging " << bucket << " failed five times. Giving up.";
std::string message = stream.str();
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kPingBucketFailure) << message;
return grpc::Status(grpc::INTERNAL, message);
}
} // namespace analyzer
} // namespace cobalt