blob: 4d808575ba022ba112833ad7ae365eea22f7fa9d [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The analyzer collector process receives reports via gRPC and stores them
// persistently.
#include "analyzer/analyzer_service/analyzer_service.h"
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <string>
#include <vector>
#include "./observation.pb.h"
#include "analyzer/store/bigtable_store.h"
#include "analyzer/store/data_store.h"
#include "util/encrypted_message_util.h"
#include "util/log_based_metrics.h"
#include "util/pem_util.h"
namespace cobalt {
namespace analyzer {
using store::BigtableStore;
using store::DataStore;
using store::ObservationStore;
using util::MessageDecrypter;
using util::PemUtil;
// Stackdriver metric constants
namespace {
const char kAddObservationsFailure[] =
"analyzer-service-add-observations-failure";
} // namespace
DEFINE_int32(port, 0, "The port that the Analyzer Service should listen on.");
DEFINE_string(
private_key_pem_file, "",
"Path to a file containing a PEM encoding of the private key of "
"the Analyzer used for Cobalt's internal encryption scheme. If "
"not specified then the Analyzer will not support encrypted Observations.");
std::unique_ptr<AnalyzerServiceImpl>
AnalyzerServiceImpl::CreateFromFlagsOrDie() {
std::shared_ptr<DataStore> data_store(
BigtableStore::CreateFromFlagsOrDie().release());
std::shared_ptr<ObservationStore> observation_store(
new ObservationStore(data_store));
CHECK(FLAGS_port) << "--port is a mandatory flag";
std::shared_ptr<grpc::ServerCredentials> server_credentials;
// TODO(rudominer) Currently there is not a compelling reason to protect the
// analyzer gRPC endpoint using TLS because we do not expose the endpoint
// to the internet. If we chose to so protect it then see
// report_master_service.cc for an example of how to do it.
server_credentials = grpc::InsecureServerCredentials();
std::string private_key_pem;
PemUtil::ReadTextFile(FLAGS_private_key_pem_file, &private_key_pem);
if (private_key_pem.empty()) {
LOG(WARNING) << "WARNING: No valid private key PEM was read. The Analyzer "
"will not be able to decrypt encrypted Observations.";
LOG(WARNING) << "-private_key_pem_file=" << FLAGS_private_key_pem_file;
} else {
LOG(INFO) << "Analyzer private key was read from file "
<< FLAGS_private_key_pem_file;
}
return std::unique_ptr<AnalyzerServiceImpl>(new AnalyzerServiceImpl(
observation_store, FLAGS_port, server_credentials, private_key_pem));
}
AnalyzerServiceImpl::AnalyzerServiceImpl(
std::shared_ptr<store::ObservationStore> observation_store, int port,
std::shared_ptr<grpc::ServerCredentials> server_credentials,
const std::string& private_key_pem)
: observation_store_(observation_store),
port_(port),
server_credentials_(server_credentials),
message_decrypter_(private_key_pem) {}
void AnalyzerServiceImpl::Start() {
grpc::ServerBuilder builder;
char local_address[1024];
// We use 0.0.0.0 to indicate the wildcard interface.
snprintf(local_address, sizeof(local_address), "0.0.0.0:%d", port_);
builder.AddListeningPort(local_address, server_credentials_);
builder.RegisterService(this);
server_ = builder.BuildAndStart();
LOG(INFO) << "Starting Analyzer service on port " << port_;
}
void AnalyzerServiceImpl::Shutdown() { server_->Shutdown(); }
void AnalyzerServiceImpl::Wait() { server_->Wait(); }
grpc::Status AnalyzerServiceImpl::AddObservations(
grpc::ServerContext* context, const ObservationBatch* batch,
google::protobuf::Empty* empty) {
VLOG(3) << "Received batch of " << batch->encrypted_observation_size()
<< " observations for metric (" << batch->meta_data().customer_id()
<< ", " << batch->meta_data().project_id() << ", "
<< batch->meta_data().metric_id() << ")";
std::vector<Observation> observations(batch->encrypted_observation_size());
size_t index = 0;
for (const EncryptedMessage& em : batch->encrypted_observation()) {
if (!message_decrypter_.DecryptMessage(em, &observations[index++])) {
std::string error_message = "Decryption of an Observation failed.";
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kAddObservationsFailure)
<< error_message;
return grpc::Status(grpc::INVALID_ARGUMENT, error_message);
}
}
auto add_status =
observation_store_->AddObservationBatch(batch->meta_data(), observations);
if (add_status != store::kOK) {
LOG_STACKDRIVER_COUNT_METRIC(ERROR, kAddObservationsFailure)
<< "AddObservationBatch() failed with status code " << add_status;
switch (add_status) {
case store::kInvalidArguments:
return grpc::Status(grpc::INVALID_ARGUMENT, "");
default:
return grpc::Status(grpc::INTERNAL, "");
}
}
return grpc::Status::OK;
}
} // namespace analyzer
} // namespace cobalt