blob: e4030f51bff87713bf03a7cd471771766fbc32a7 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "analyzer/report_master/raw_dump_reports.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "glog/logging.h"
#include "util/log_based_metrics.h"
namespace cobalt {
namespace analyzer {
using config::AnalyzerConfig;
using store::ObservationStore;
// Stackdriver metric constants
namespace {
const char kRawDumpReportError[] = "raw-dump-report-error";
} // namespace
RawDumpReportRowIterator::RawDumpReportRowIterator(
uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
uint32_t start_day_index, uint32_t end_day_index,
std::vector<std::string> parts,
const SystemProfileFields& included_system_profile_fields,
std::string report_id_string,
std::shared_ptr<store::ObservationStore> observation_store,
std::shared_ptr<AnalyzerConfig> analyzer_config)
: customer_id_(customer_id),
project_id_(project_id),
metric_id_(metric_id),
report_id_string_(std::move(report_id_string)),
start_day_index_(start_day_index),
end_day_index_(end_day_index),
parts_(std::move(parts)),
included_system_profile_fields_(included_system_profile_fields),
observation_store_(observation_store) {
std::ostringstream stream;
stream << "(" << customer_id << ", " << project_id << ", " << metric_id
<< ")";
std::string metric_id_string = stream.str();
const Metric* metric =
analyzer_config->Metric(customer_id_, project_id_, metric_id_);
if (!metric) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Metric " << metric_id_string
<< " not found in the the AnalyzerConfig, when initializing a "
"RawDumpReportRowIterator for report_id="
<< report_id_string_;
return;
}
for (const auto& part_name : parts_) {
auto it = metric->parts().find(part_name);
if (it == metric->parts().cend()) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Metric part '" << part_name << "' not found in Metric "
<< metric_id_string
<< " when initializing a RawDumpReportRowIterator for report_id="
<< report_id_string_;
return;
}
expected_data_types_.push_back(it->second.data_type());
}
std::string parts_string;
if (VLOG_IS_ON(1)) {
std::ostringstream stream;
bool first = true;
for (const std::string& part : parts_) {
if (first) {
first = false;
} else {
stream << ", ";
}
stream << part;
}
parts_string.assign(stream.str());
}
VLOG(1) << "RawDumpReportRowIterator: Initialized for report_id="
<< report_id_string_ << "with metric " << metric_id_string
<< " day range =[" << start_day_index << ", " << end_day_index
<< "] parts=[" << parts_string << "]";
}
grpc::Status RawDumpReportRowIterator::Reset() {
have_query_response_ = false;
have_next_row_ = false;
eof_ = false;
result_index_ = -1;
return grpc::Status::OK;
}
grpc::Status RawDumpReportRowIterator::NextRow(const ReportRow** row) {
TryEnsureHaveNextRow();
if (have_next_row_) {
current_row_.Swap(&next_row_);
have_next_row_ = false;
*row = &current_row_;
return grpc::Status::OK;
}
if (query_response_.status != store::kOK) {
return grpc::Status(grpc::INTERNAL,
"QueryObservations() returned error status.");
}
return grpc::Status(grpc::NOT_FOUND, "eof");
}
grpc::Status RawDumpReportRowIterator::HasMoreRows(bool* b) {
CHECK(b);
TryEnsureHaveNextRow();
if (have_next_row_) {
*b = true;
return grpc::Status::OK;
}
if (query_response_.status != store::kOK) {
return grpc::Status(grpc::INTERNAL,
"QueryObservations() returned error status.");
}
*b = false;
return grpc::Status::OK;
}
void RawDumpReportRowIterator::TryEnsureHaveNextRow() {
ValidateState();
if (have_next_row_ || eof_) {
return;
}
if (!have_query_response_) {
QueryObservations("");
}
if (query_response_.status != store::kOK) {
have_next_row_ = false;
return;
}
// Keep trying to build next_row_ until we succeed or reach EOF or encounter
// a query error. In the case that we encounter an invalid input row (one
// that cannot be converted to next_row_) then we LOG(ERROR) but keep going.
while (true) {
// Advance to the next input result.
result_index_++;
// If we have used up all the results in the current query_response_
// then perform another query.
if (result_index_ >= static_cast<int>(query_response_.results.size())) {
if (query_response_.pagination_token.empty()) {
eof_ = true;
return;
}
QueryObservations(query_response_.pagination_token);
if (query_response_.status != store::kOK) {
return;
}
if (query_response_.results.empty()) {
eof_ = true;
return;
}
result_index_ = 0;
}
TryBuildNextRow();
if (have_next_row_) {
return;
}
}
}
void RawDumpReportRowIterator::TryBuildNextRow() {
have_next_row_ = false;
if (!have_query_response_ ||
result_index_ >= static_cast<int>(query_response_.results.size())) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Internal logic error. "
"TryBuildNextRow() invoked with no available input row.";
return;
}
next_row_.Clear();
RawDumpReportRow* dump = next_row_.mutable_raw_dump();
Observation& observation = query_response_.results[result_index_].observation;
dump->set_allocated_system_profile(
query_response_.results[result_index_].metadata.release_system_profile());
size_t part_index = 0;
for (const auto& part_name : parts_) {
auto part_iter = observation.parts().find(part_name);
if (part_iter == observation.parts().cend()) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Encountered an Observation that was "
"missing a part while processing a RAW_DUMP report."
<< " For report_id=" << report_id_string_ << ", part=" << part_name;
return;
}
auto observation_part = part_iter->second;
if (observation_part.value_case() != ObservationPart::kUnencoded) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Encountered an ObservationPart that "
"did not use the no-op encoding while processing a "
"RAW_DUMP report."
<< " For report_id=" << report_id_string_ << ", part=" << part_name
<< ". value_case=" << observation_part.value_case();
return;
}
ValuePart* unencoded_value =
observation_part.mutable_unencoded()->mutable_unencoded_value();
MetricPart::DataType value_data_type;
switch (unencoded_value->data_case()) {
case ValuePart::kStringValue:
value_data_type = MetricPart::STRING;
break;
case ValuePart::kIntValue:
value_data_type = MetricPart::INT;
break;
case ValuePart::kDoubleValue:
value_data_type = MetricPart::DOUBLE;
break;
case ValuePart::kBlobValue:
value_data_type = MetricPart::BLOB;
break;
case ValuePart::kIndexValue: {
value_data_type = MetricPart::INDEX;
break;
}
default: {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Encountered an unrecognized "
"ValuePart data type while processing a RAW_DUMP report. "
"For report_id="
<< report_id_string_ << ", part=" << part_name;
return;
}
}
auto expected_type = expected_data_types_[part_index++];
if (value_data_type != expected_type) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Encountered the wrong ValuePart "
"data type while processing a RAW_DUMP report. For "
"report_id="
<< report_id_string_ << ", part=" << part_name
<< " expected type=" << expected_type
<< " value type=" << value_data_type;
return;
}
dump->add_values()->Swap(unencoded_value);
}
have_next_row_ = true;
}
void RawDumpReportRowIterator::QueryObservations(std::string pagination_token) {
static const size_t kMaxResultsPerQuery = 1000;
query_response_ = observation_store_->QueryObservations(
customer_id_, project_id_, metric_id_, start_day_index_, end_day_index_,
parts_, included_system_profile_fields_, kMaxResultsPerQuery,
pagination_token);
if (query_response_.status != store::kOK) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "QueryObservations() returned error "
"status: "
<< query_response_.status << ". For report_id=" << report_id_string_;
}
have_query_response_ = true;
}
void RawDumpReportRowIterator::ValidateState() {
bool valid = true;
if (parts_.empty()) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Config for RAW_DUMP report did not specify any variables to dump."
". For report_id="
<< report_id_string_;
valid = false;
}
if (expected_data_types_.size() != parts_.size()) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError)
<< "Not all of the specified metric parts were found in the Metric "
"when initializing this RawDumpReportRowIterator for report_id="
<< report_id_string_
<< ". num found parts=" << expected_data_types_.size()
<< ", num expected parts=" << parts_.size();
valid = false;
}
if (!valid) {
have_next_row_ = false;
eof_ = false;
have_query_response_ = true;
query_response_.status = store::kOperationFailed;
}
}
} // namespace analyzer
} // namespace cobalt