| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "analyzer/report_master/raw_dump_reports.h" |
| |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "glog/logging.h" |
| #include "util/log_based_metrics.h" |
| |
| namespace cobalt { |
| namespace analyzer { |
| |
| using config::AnalyzerConfig; |
| using store::ObservationStore; |
| |
| // Stackdriver metric constants |
| namespace { |
| const char kRawDumpReportError[] = "raw-dump-report-error"; |
| } // namespace |
| |
| RawDumpReportRowIterator::RawDumpReportRowIterator( |
| uint32_t customer_id, uint32_t project_id, uint32_t metric_id, |
| uint32_t start_day_index, uint32_t end_day_index, |
| std::vector<std::string> parts, |
| const SystemProfileFields& included_system_profile_fields, |
| std::string report_id_string, |
| std::shared_ptr<store::ObservationStore> observation_store, |
| std::shared_ptr<AnalyzerConfig> analyzer_config) |
| : customer_id_(customer_id), |
| project_id_(project_id), |
| metric_id_(metric_id), |
| report_id_string_(std::move(report_id_string)), |
| start_day_index_(start_day_index), |
| end_day_index_(end_day_index), |
| parts_(std::move(parts)), |
| included_system_profile_fields_(included_system_profile_fields), |
| observation_store_(observation_store) { |
| std::ostringstream stream; |
| stream << "(" << customer_id << ", " << project_id << ", " << metric_id |
| << ")"; |
| std::string metric_id_string = stream.str(); |
| const Metric* metric = |
| analyzer_config->Metric(customer_id_, project_id_, metric_id_); |
| if (!metric) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Metric " << metric_id_string |
| << " not found in the the AnalyzerConfig, when initializing a " |
| "RawDumpReportRowIterator for report_id=" |
| << report_id_string_; |
| return; |
| } |
| for (const auto& part_name : parts_) { |
| auto it = metric->parts().find(part_name); |
| if (it == metric->parts().cend()) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Metric part '" << part_name << "' not found in Metric " |
| << metric_id_string |
| << " when initializing a RawDumpReportRowIterator for report_id=" |
| << report_id_string_; |
| return; |
| } |
| expected_data_types_.push_back(it->second.data_type()); |
| } |
| std::string parts_string; |
| if (VLOG_IS_ON(1)) { |
| std::ostringstream stream; |
| bool first = true; |
| for (const std::string& part : parts_) { |
| if (first) { |
| first = false; |
| } else { |
| stream << ", "; |
| } |
| stream << part; |
| } |
| parts_string.assign(stream.str()); |
| } |
| VLOG(1) << "RawDumpReportRowIterator: Initialized for report_id=" |
| << report_id_string_ << "with metric " << metric_id_string |
| << " day range =[" << start_day_index << ", " << end_day_index |
| << "] parts=[" << parts_string << "]"; |
| } |
| |
| grpc::Status RawDumpReportRowIterator::Reset() { |
| have_query_response_ = false; |
| have_next_row_ = false; |
| eof_ = false; |
| result_index_ = -1; |
| return grpc::Status::OK; |
| } |
| |
| grpc::Status RawDumpReportRowIterator::NextRow(const ReportRow** row) { |
| TryEnsureHaveNextRow(); |
| if (have_next_row_) { |
| current_row_.Swap(&next_row_); |
| have_next_row_ = false; |
| *row = ¤t_row_; |
| return grpc::Status::OK; |
| } |
| if (query_response_.status != store::kOK) { |
| return grpc::Status(grpc::INTERNAL, |
| "QueryObservations() returned error status."); |
| } |
| return grpc::Status(grpc::NOT_FOUND, "eof"); |
| } |
| |
| grpc::Status RawDumpReportRowIterator::HasMoreRows(bool* b) { |
| CHECK(b); |
| TryEnsureHaveNextRow(); |
| if (have_next_row_) { |
| *b = true; |
| return grpc::Status::OK; |
| } |
| if (query_response_.status != store::kOK) { |
| return grpc::Status(grpc::INTERNAL, |
| "QueryObservations() returned error status."); |
| } |
| *b = false; |
| return grpc::Status::OK; |
| } |
| |
| void RawDumpReportRowIterator::TryEnsureHaveNextRow() { |
| ValidateState(); |
| if (have_next_row_ || eof_) { |
| return; |
| } |
| if (!have_query_response_) { |
| QueryObservations(""); |
| } |
| if (query_response_.status != store::kOK) { |
| have_next_row_ = false; |
| return; |
| } |
| // Keep trying to build next_row_ until we succeed or reach EOF or encounter |
| // a query error. In the case that we encounter an invalid input row (one |
| // that cannot be converted to next_row_) then we LOG(ERROR) but keep going. |
| while (true) { |
| // Advance to the next input result. |
| result_index_++; |
| // If we have used up all the results in the current query_response_ |
| // then perform another query. |
| if (result_index_ >= static_cast<int>(query_response_.results.size())) { |
| if (query_response_.pagination_token.empty()) { |
| eof_ = true; |
| return; |
| } |
| QueryObservations(query_response_.pagination_token); |
| if (query_response_.status != store::kOK) { |
| return; |
| } |
| if (query_response_.results.empty()) { |
| eof_ = true; |
| return; |
| } |
| result_index_ = 0; |
| } |
| TryBuildNextRow(); |
| if (have_next_row_) { |
| return; |
| } |
| } |
| } |
| |
| void RawDumpReportRowIterator::TryBuildNextRow() { |
| have_next_row_ = false; |
| if (!have_query_response_ || |
| result_index_ >= static_cast<int>(query_response_.results.size())) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Internal logic error. " |
| "TryBuildNextRow() invoked with no available input row."; |
| return; |
| } |
| next_row_.Clear(); |
| RawDumpReportRow* dump = next_row_.mutable_raw_dump(); |
| Observation& observation = query_response_.results[result_index_].observation; |
| dump->set_allocated_system_profile( |
| query_response_.results[result_index_].metadata.release_system_profile()); |
| size_t part_index = 0; |
| for (const auto& part_name : parts_) { |
| auto part_iter = observation.parts().find(part_name); |
| if (part_iter == observation.parts().cend()) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Encountered an Observation that was " |
| "missing a part while processing a RAW_DUMP report." |
| << " For report_id=" << report_id_string_ << ", part=" << part_name; |
| return; |
| } |
| auto observation_part = part_iter->second; |
| if (observation_part.value_case() != ObservationPart::kUnencoded) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Encountered an ObservationPart that " |
| "did not use the no-op encoding while processing a " |
| "RAW_DUMP report." |
| << " For report_id=" << report_id_string_ << ", part=" << part_name |
| << ". value_case=" << observation_part.value_case(); |
| return; |
| } |
| ValuePart* unencoded_value = |
| observation_part.mutable_unencoded()->mutable_unencoded_value(); |
| MetricPart::DataType value_data_type; |
| switch (unencoded_value->data_case()) { |
| case ValuePart::kStringValue: |
| value_data_type = MetricPart::STRING; |
| break; |
| |
| case ValuePart::kIntValue: |
| value_data_type = MetricPart::INT; |
| break; |
| |
| case ValuePart::kDoubleValue: |
| value_data_type = MetricPart::DOUBLE; |
| break; |
| |
| case ValuePart::kBlobValue: |
| value_data_type = MetricPart::BLOB; |
| break; |
| |
| case ValuePart::kIndexValue: { |
| value_data_type = MetricPart::INDEX; |
| break; |
| } |
| default: { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Encountered an unrecognized " |
| "ValuePart data type while processing a RAW_DUMP report. " |
| "For report_id=" |
| << report_id_string_ << ", part=" << part_name; |
| return; |
| } |
| } |
| auto expected_type = expected_data_types_[part_index++]; |
| if (value_data_type != expected_type) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Encountered the wrong ValuePart " |
| "data type while processing a RAW_DUMP report. For " |
| "report_id=" |
| << report_id_string_ << ", part=" << part_name |
| << " expected type=" << expected_type |
| << " value type=" << value_data_type; |
| return; |
| } |
| dump->add_values()->Swap(unencoded_value); |
| } |
| |
| have_next_row_ = true; |
| } |
| |
| void RawDumpReportRowIterator::QueryObservations(std::string pagination_token) { |
| static const size_t kMaxResultsPerQuery = 1000; |
| query_response_ = observation_store_->QueryObservations( |
| customer_id_, project_id_, metric_id_, start_day_index_, end_day_index_, |
| parts_, included_system_profile_fields_, kMaxResultsPerQuery, |
| pagination_token); |
| if (query_response_.status != store::kOK) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "QueryObservations() returned error " |
| "status: " |
| << query_response_.status << ". For report_id=" << report_id_string_; |
| } |
| have_query_response_ = true; |
| } |
| |
| void RawDumpReportRowIterator::ValidateState() { |
| bool valid = true; |
| if (parts_.empty()) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Config for RAW_DUMP report did not specify any variables to dump." |
| ". For report_id=" |
| << report_id_string_; |
| valid = false; |
| } |
| if (expected_data_types_.size() != parts_.size()) { |
| LOG_STACKDRIVER_COUNT_METRIC(ERROR, kRawDumpReportError) |
| << "Not all of the specified metric parts were found in the Metric " |
| "when initializing this RawDumpReportRowIterator for report_id=" |
| << report_id_string_ |
| << ". num found parts=" << expected_data_types_.size() |
| << ", num expected parts=" << parts_.size(); |
| valid = false; |
| } |
| if (!valid) { |
| have_next_row_ = false; |
| eof_ = false; |
| have_query_response_ = true; |
| query_response_.status = store::kOperationFailed; |
| } |
| } |
| |
| } // namespace analyzer |
| } // namespace cobalt |