blob: d706209a6a94a4c2415502cafb53c5fb7df7c3e2 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The report_master periodically scans the database, decodes any observations,
// and
// publishes them.
#include "analyzer/report_master/report_master_service.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "analyzer/report_master/report_executor.h"
#include "analyzer/report_master/report_generator.h"
#include "analyzer/store/bigtable_store.h"
#include "analyzer/store/data_store.h"
#include "config/analyzer_config.h"
#include "glog/logging.h"
#include "util/crypto_util/base64.h"
namespace cobalt {
namespace analyzer {
using config::AnalyzerConfig;
using crypto::Base64Decode;
using crypto::Base64Encode;
using grpc::ServerContext;
using grpc::ServerWriter;
using grpc::WriteOptions;
using store::BigtableStore;
using store::DataStore;
using store::ObservationStore;
using store::ReportStore;
DEFINE_int32(port, 0,
"The port that the ReportMaster Service should listen on.");
DEFINE_string(tls_info, "", "TBD: Some info about TLS.");
namespace {
// Builds the string form of a report_id used in the public ReportMasterService
// API from the ReportId message used in the internal API to ReportStore.
grpc::Status ReportIdToString(const ReportId& report_id,
std::string* id_string_out) {
std::string serialized_id;
if (!report_id.SerializeToString(&serialized_id)) {
// Note(rudominer) This is just for completeness. I expect this to never
// happen.
LOG(ERROR) << "ReportId serialization failed: "
<< ReportStore::ToString(report_id);
return grpc::Status(grpc::ABORTED, "Unable to build report_id string");
}
if (!Base64Encode(serialized_id, id_string_out)) {
// Note(rudominer) This is just for completeness. I expect this to never
// happen.
LOG(ERROR) << "Base64Encode failed: " << ReportStore::ToString(report_id);
return grpc::Status(grpc::ABORTED, "Unable to build report_id string");
}
return grpc::Status::OK;
}
// Builds the ReportId message used in the internal ReportStore API from the
// string form of a report_id used in the public ReportMaster API.
grpc::Status ReportIdFromString(const std::string& id_string,
ReportId* report_id_out) {
std::string serialized_id;
if (!Base64Decode(id_string, &serialized_id)) {
LOG(ERROR) << "Base64Encode failed: " << id_string;
return grpc::Status(grpc::INVALID_ARGUMENT, "Bad report_id.");
}
if (!report_id_out->ParseFromString(serialized_id)) {
LOG(ERROR) << "ParseFromString failed: " << id_string;
return grpc::Status(grpc::INVALID_ARGUMENT, "Bad report_id.");
}
return grpc::Status::OK;
}
// Builds a ReportMetadata to be returned to a client of the public
// ReportMaster API, extracting data from the arguments. The |metadata_lite|
// argument will be modified as some data will be swapped out of it.
// Returns OK on success or an error status.
grpc::Status MakeReportMetadata(const std::string& report_id_string,
const ReportId& report_id,
const ReportConfig* report_config,
ReportMetadataLite* metadata_lite,
ReportMetadata* metadata) {
metadata->set_report_id(report_id_string);
metadata->set_customer_id(report_id.customer_id());
metadata->set_project_id(report_id.project_id());
metadata->set_report_config_id(report_id.report_config_id());
metadata->set_state(metadata_lite->state());
metadata->mutable_creation_time()->set_seconds(
report_id.creation_time_seconds());
// Copy the start_time and finish_time as appropriate.
switch (metadata->state()) {
case WAITING_TO_START:
break;
case IN_PROGRESS:
metadata->mutable_start_time()->set_seconds(
metadata_lite->start_time_seconds());
break;
case COMPLETED_SUCCESSFULLY:
case TERMINATED:
metadata->mutable_start_time()->set_seconds(
metadata_lite->start_time_seconds());
metadata->mutable_finish_time()->set_seconds(
metadata_lite->finish_time_seconds());
break;
default: {
std::ostringstream stream;
stream << "Bad metadata found for report_id="
<< ReportStore::ToString(report_id)
<< ". Unrecognized state: " << metadata->state();
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
}
metadata->set_first_day_index(metadata_lite->first_day_index());
metadata->set_last_day_index(metadata_lite->last_day_index());
metadata->set_report_type(metadata_lite->report_type());
if (metadata_lite->variable_indices_size() == 0) {
std::ostringstream stream;
stream << "Invalid metadata, no variable indices for report_id="
<< ReportStore::ToString(report_id);
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
// Set the metric parts.
for (int index : metadata_lite->variable_indices()) {
if (index >= report_config->variable_size()) {
std::ostringstream stream;
stream << "Invalid variable index encountered while processing report_id="
<< ReportStore::ToString(report_id) << ". index=" << index
<< ". variable_size=" << report_config->variable_size();
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
metadata->add_metric_parts(report_config->variable(index).metric_part());
}
// Add the associated_report_ids as appropriate. Currently we do this only
// in the case tht the report type is JOINT. In this case the ReportId's
// sequence_num should be 2 and we add as associated reports the ReportIDs
// with sequence_nums 0 and 1 which should be the two one-way marginals.
if (metadata->report_type() == JOINT) {
if (report_id.sequence_num() != 2) {
std::ostringstream stream;
stream << "Inconsistent metadata encountered while processing report_id="
<< ReportStore::ToString(report_id)
<< ". sequence_num=" << report_id.sequence_num()
<< " but report_type == JOINT.";
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
ReportId associated_id = report_id;
// Make the ID for the marginal report for variable 1.
associated_id.set_sequence_num(0);
auto status =
ReportIdToString(associated_id, metadata->add_associated_report_ids());
if (!status.ok()) {
// This is just for completeness. We don't expect it to ever happen.
LOG(ERROR) << "ReportIdToString failed unexpectedly.";
return status;
}
// Make the ID for the marginal report for variable 2.
associated_id.set_sequence_num(1);
status =
ReportIdToString(associated_id, metadata->add_associated_report_ids());
if (!status.ok()) {
// This is just for completeness. We don't expect it to ever happen.
LOG(ERROR) << "ReportIdToString failed unexpectedly.";
return status;
}
}
metadata->set_one_off(metadata_lite->one_off());
metadata->mutable_info_messages()->Swap(
metadata_lite->mutable_info_messages());
return grpc::Status::OK;
}
} // namespace
std::unique_ptr<ReportMasterService>
ReportMasterService::CreateFromFlagsOrDie() {
std::shared_ptr<DataStore> data_store(
BigtableStore::CreateFromFlagsOrDie().release());
std::shared_ptr<ObservationStore> observation_store(
new ObservationStore(data_store));
std::shared_ptr<ReportStore> report_store(new ReportStore(data_store));
std::shared_ptr<AnalyzerConfig> analyzer_config(
AnalyzerConfig::CreateFromFlagsOrDie().release());
CHECK(FLAGS_port) << "--port is a mandatory flag";
std::shared_ptr<grpc::ServerCredentials> server_credentials;
if (FLAGS_tls_info.empty()) {
LOG(WARNING) << "WARNING: Using insecure server credentials. Pass "
"-tls_info to enable TLS.";
server_credentials = grpc::InsecureServerCredentials();
} else {
grpc::SslServerCredentialsOptions options;
// TODO(rudominer) Set up options based on FLAGS_tls_info.
server_credentials = grpc::SslServerCredentials(options);
}
return std::unique_ptr<ReportMasterService>(
new ReportMasterService(FLAGS_port, observation_store, report_store,
analyzer_config, server_credentials));
}
ReportMasterService::ReportMasterService(
int port, std::shared_ptr<store::ObservationStore> observation_store,
std::shared_ptr<store::ReportStore> report_store,
std::shared_ptr<config::AnalyzerConfig> analyzer_config,
std::shared_ptr<grpc::ServerCredentials> server_credentials)
: port_(port),
observation_store_(observation_store),
report_store_(report_store),
analyzer_config_(analyzer_config),
report_executor_(new ReportExecutor(
report_store_,
std::unique_ptr<ReportGenerator>(new ReportGenerator(
analyzer_config_, observation_store_, report_store_)))),
server_credentials_(server_credentials) {}
void ReportMasterService::Start() {
// Start the ReportExecutor worker thread.
StartWorkerThread();
grpc::ServerBuilder builder;
char local_address[1024];
// We use 0.0.0.0 to indicate the wildcard interface.
snprintf(local_address, sizeof(local_address), "0.0.0.0:%d", port_);
builder.AddListeningPort(local_address, server_credentials_);
builder.RegisterService(this);
server_ = builder.BuildAndStart();
LOG(INFO) << "Starting ReportMaster service on port " << port_;
}
void ReportMasterService::Shutdown() {
// TODO(rudominer) Stop accepting further requests during shutdown.
// Wait until all current report generation finishes.
WaitUntilIdle();
// Stop the ReportExecutor worker thread.
report_executor_.reset();
server_->Shutdown();
}
void ReportMasterService::Wait() { server_->Wait(); }
grpc::Status ReportMasterService::StartReport(ServerContext* context,
const StartReportRequest* request,
StartReportResponse* response) {
CHECK(request);
CHECK(response);
response->Clear();
uint32_t customer_id = request->customer_id();
uint32_t project_id = request->project_id();
uint32_t report_config_id = request->report_config_id();
// Fetch the ReportConfig from the registry and validate it.
const ReportConfig* report_config;
auto status = GetAndValidateReportConfig(customer_id, project_id,
report_config_id, &report_config);
if (!status.ok()) {
return status;
}
// Set up the fields of the ReportId.
ReportId report_id;
report_id.set_customer_id(customer_id);
report_id.set_project_id(project_id);
report_id.set_report_config_id(report_config_id);
switch (report_config->report_type()) {
case HISTOGRAM:
return StartHistogramReport(*request, &report_id, response);
break;
case JOINT:
return StartJointReport(*request, &report_id, response);
break;
default:
std::ostringstream stream;
stream << "Bad ReportConfig found with id=" << report_config_id
<< ". Unrecognized report type: " << report_config->report_type();
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
}
grpc::Status ReportMasterService::StartHistogramReport(
const StartReportRequest& request, ReportId* report_id,
StartReportResponse* response) {
// We will be creating and starting one report only.
report_id->set_sequence_num(0);
std::vector<uint32_t> variable_indices = {0};
auto status = StartNewReport(request, HISTOGRAM, variable_indices, report_id);
if (!status.ok()) {
return status;
}
// Build the public report_id string to return in the repsonse.
status = ReportIdToString(*report_id, response->mutable_report_id());
if (!status.ok()) {
return status;
}
// Finally enqueue the chain of one report to be generated.
std::vector<ReportId> report_chain(1);
report_chain[0] = *report_id;
return report_executor_->EnqueueReportGeneration(report_chain);
}
grpc::Status ReportMasterService::StartJointReport(
const StartReportRequest& request, ReportId* report_id,
StartReportResponse* response) {
// We will be creating three reports all together and starting the first one.
std::vector<ReportId> report_chain(3);
// First we create and start the HISTOGRAM report for the first marginal.
report_id->set_sequence_num(0);
std::vector<uint32_t> variable_indices = {0}; // Specify the first variable.
auto status = StartNewReport(request, HISTOGRAM, variable_indices, report_id);
if (!status.ok()) {
return status;
}
report_chain[0] = *report_id;
// Second we create, but don't yet start, the HISTOGRAM report for the second
// marginal.
variable_indices = {1}; // Specify the second variable
size_t sequence_number = 1;
// This call will modify report_id to specify the new sequence_number.
status = CreateDependentReport(sequence_number, HISTOGRAM, variable_indices,
report_id);
if (!status.ok()) {
return status;
}
report_chain[1] = *report_id;
// Third we create, but don't yet start, the JOINT report.
variable_indices = {0, 1}; // Specify both variables.
sequence_number = 2;
// This call will modify report_id to specify the new sequence_number.
status = CreateDependentReport(sequence_number, JOINT, variable_indices,
report_id);
if (!status.ok()) {
return status;
}
report_chain[2] = *report_id;
// Build the public report_id string to return in the repsonse. We return the
// report_id of the joint report as this is the primary report the user is
// interested in. He can learn the IDs of the marginal reports by invoking
// GetReport() on the primary report and inspecting the
// |associated_report_ids| in the ReportMetadata in that response.
status = ReportIdToString(*report_id, response->mutable_report_id());
if (!status.ok()) {
return status;
}
// Finally enqueue the chain of reports to be generated.
return report_executor_->EnqueueReportGeneration(report_chain);
}
grpc::Status ReportMasterService::GetReport(ServerContext* context,
const GetReportRequest* request,
Report* response) {
CHECK(request);
CHECK(response);
response->Clear();
// Parse the report_id.
ReportId report_id;
auto status = ReportIdFromString(request->report_id(), &report_id);
if (!status.ok()) {
return status;
}
// Fetch the metadata and possibly the rows from the ReportStore.
ReportMetadataLite metadata_lite;
ReportRows report_rows;
status = GetReport(report_id, &metadata_lite, &report_rows);
if (!status.ok()) {
return status;
}
// Fetch the ReportConfig from the registry and validate it.
const ReportConfig* report_config;
status = GetAndValidateReportConfig(
report_id.customer_id(), report_id.project_id(),
report_id.report_config_id(), &report_config);
if (!status.ok()) {
return status;
}
// Build the ReportMetadata in the response.
auto metadata = response->mutable_metadata();
status = MakeReportMetadata(request->report_id(), report_id, report_config,
&metadata_lite, metadata);
if (!status.ok()) {
return status;
}
// Swap over the actual report rows if the report completed successfully.
if (metadata->state() == COMPLETED_SUCCESSFULLY) {
response->mutable_rows()->Swap(&report_rows);
}
return grpc::Status::OK;
}
grpc::Status ReportMasterService::QueryReports(
ServerContext* context, const QueryReportsRequest* request,
ServerWriter<QueryReportsResponse>* writer) {
return QueryReportsInternal(context, request, writer);
}
grpc::Status ReportMasterService::QueryReportsInternal(
ServerContext* context, const QueryReportsRequest* request,
grpc::WriterInterface<QueryReportsResponse>* writer) {
CHECK(request);
CHECK(writer);
// The max number of ReportMetadata we send back in each QueryReportsResponse.
static const size_t kBatchSize = 100;
// Extract the fields of the request.
uint32_t customer_id = request->customer_id();
uint32_t project_id = request->project_id();
uint32_t report_config_id = request->report_config_id();
int64_t interval_start_time_seconds = request->first_timestamp().seconds();
int64_t interval_limit_time_seconds = request->limit_timestamp().seconds();
if (request->limit_timestamp().nanos() > 0) {
interval_limit_time_seconds++;
}
// Query the store and return the results in batches of size kBatchSize.
ReportStore::QueryReportsResponse store_response;
do {
// Query one batch from the store, passing in the pagination_token from
// the previous time through this loop.
store_response = report_store_->QueryReports(
customer_id, project_id, report_config_id, interval_start_time_seconds,
interval_limit_time_seconds, kBatchSize,
store_response.pagination_token);
if (store_response.status != store::kOK) {
LOG(ERROR) << "Read failed during QueryReports.";
return grpc::Status(grpc::ABORTED, "Read failed.");
}
// Iterate through the batch, building up |rpc_response|.
QueryReportsResponse rpc_response;
for (auto& store_result : store_response.results) {
// Build the public report_id string.
std::string public_report_id_string;
auto status =
ReportIdToString(store_result.report_id, &public_report_id_string);
if (!status.ok()) {
return status;
}
// Fetch the ReportConfig from the registry and validate it.
const ReportConfig* report_config;
status = GetAndValidateReportConfig(customer_id, project_id,
report_config_id, &report_config);
if (!status.ok()) {
return status;
}
// Build the ReportMetadata in the response.
status = MakeReportMetadata(
public_report_id_string, store_result.report_id, report_config,
&store_result.report_metadata, rpc_response.add_reports());
if (!status.ok()) {
return status;
}
}
// Send |rpc_response| containing the current batch back to the client.
if (!writer->Write(rpc_response)) {
LOG(ERROR) << "Stream closed while writing response from QueryReports.";
return grpc::Status(grpc::ABORTED, "Stream closed.");
}
} while (!store_response.pagination_token.empty());
return grpc::Status::OK;
}
/////////// private methods ////////////////
grpc::Status ReportMasterService::GetAndValidateReportConfig(
uint32_t customer_id, uint32_t project_id, uint32_t report_config_id,
const ReportConfig** report_config_out) {
// Fetch the ReportConfig from the registry.
*report_config_out =
analyzer_config_->ReportConfig(customer_id, project_id, report_config_id);
if (!*report_config_out) {
std::ostringstream stream;
stream << "No ReportConfig found with id=(" << customer_id << ", "
<< project_id << ", " << report_config_id << ")";
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::NOT_FOUND, message);
}
// Make sure it has either one or two variables.
size_t num_variables = (*report_config_out)->variable_size();
if (num_variables == 0 || num_variables > 2) {
std::ostringstream stream;
stream << "The ReportConfig with id=(" << customer_id << ", " << project_id
<< ", " << report_config_id
<< ") is invalid. Number of variables=" << num_variables
<< ". Cobalt ReportConfigs may have either one or two variables.";
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::FAILED_PRECONDITION, message);
}
return grpc::Status::OK;
}
grpc::Status ReportMasterService::StartNewReport(
const StartReportRequest& request, ReportType report_type,
const std::vector<uint32_t>& variable_indices, ReportId* report_id) {
// Invoke ReportStore::StartNewReport().
auto store_status = report_store_->StartNewReport(
request.first_day_index(), request.last_day_index(), true, report_type,
variable_indices, report_id);
// Log(ERROR) if not OK.
if (store_status != store::kOK) {
std::ostringstream stream;
stream << "StartNewReport failed with status=" << store_status
<< " for report_id=" << ReportStore::ToString(*report_id);
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::ABORTED, message);
}
return grpc::Status::OK;
}
grpc::Status ReportMasterService::CreateDependentReport(
uint32_t sequence_number, ReportType report_type,
const std::vector<uint32_t>& variable_indices, ReportId* report_id) {
auto store_status = report_store_->CreateDependentReport(
sequence_number, report_type, variable_indices, report_id);
// LOG(ERROR) if not OK.
if (store_status != store::kOK) {
std::ostringstream stream;
stream << "CreateDependentReport failed with status=" << store_status
<< " for report_id=" << ReportStore::ToString(*report_id);
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::ABORTED, message);
}
return grpc::Status::OK;
}
grpc::Status ReportMasterService::GetReport(const ReportId& report_id,
ReportMetadataLite* metadata_out,
ReportRows* report_out) {
// Invoke ReportStore::GetMetadata
auto store_status =
report_store_->GetReport(report_id, metadata_out, report_out);
// LOG(ERROR) if not OK.
if (store_status != store::kOK) {
std::ostringstream stream;
stream << "GetReport failed with status=" << store_status
<< " for report_id=" << ReportStore::ToString(report_id);
std::string message = stream.str();
LOG(ERROR) << message;
return grpc::Status(grpc::ABORTED, message);
}
return grpc::Status::OK;
}
std::string ReportMasterService::MakeStringReportId(const ReportId& report_id) {
std::string string_id_out;
ReportIdToString(report_id, &string_id_out);
return string_id_out;
}
void ReportMasterService::StartWorkerThread() { report_executor_->Start(); }
void ReportMasterService::WaitUntilIdle() { report_executor_->WaitUntilIdle(); }
} // namespace analyzer
} // namespace cobalt