blob: 470de79cb4408bc09845202d17db956ad3079067 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "tools/test_app/test_app.h"
#include <libgen.h>
#include <chrono>
#include <fstream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "./observation.pb.h"
#include "analyzer/analyzer_service/analyzer.grpc.pb.h"
#include "config/cobalt_config.pb.h"
#include "config/encoding_config.h"
#include "encoder/encoder.h"
#include "encoder/envelope_maker.h"
#include "encoder/project_context.h"
#include "encoder/send_retryer.h"
#include "encoder/shuffler_client.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "grpc++/grpc++.h"
#include "util/pem_util.h"
namespace cobalt {
using analyzer::Analyzer;
using config::EncodingRegistry;
using config::MetricRegistry;
using encoder::ClientSecret;
using encoder::Encoder;
using encoder::EnvelopeMaker;
using encoder::ProjectContext;
using encoder::ShippingManager;
using encoder::ShufflerClient;
using encoder::ShufflerClientInterface;
using encoder::SystemData;
using encoder::send_retryer::SendRetryer;
using google::protobuf::Empty;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using shuffler::Shuffler;
using util::PemUtil;
// There are three modes of operation of the Cobalt TestClient program
// determined by the value of this flag.
// - interactive: The program runs an interactive command-loop.
// - send-once: The program sends a single Envelope described by flags.
// - automatic: The program runs forever sending many Envelopes with randomly
// generated values.
//
// This flag is read in CreateFromFlagsOrDie() and used to invoke
// set_mode().
DEFINE_string(mode, "interactive",
"This program may be used in 3 modes: 'interactive', "
"'send-once', 'automatic'");
// The remainder of the flags fall into three categories.
// Category 1: Flags read by CreateFromFlagsOrDie() that set immutable
// values used in all three modes.
DEFINE_uint32(customer, 1, "Customer ID");
DEFINE_uint32(project, 1, "Project ID");
DEFINE_string(analyzer_uri, "",
"The URI of the Analyzer. Necessary only if sending observations "
"to the Analyzer.");
DEFINE_string(shuffler_uri, "",
"The URI of the Shuffler. Necessary only if sending observations "
"to the Shuffler.");
DEFINE_string(analyzer_pk_pem_file, "",
"Path to a file containing a PEM encoding of the public key of "
"the Analyzer used for Cobalt's internal encryption scheme. If "
"not specified then no encryption will be used.");
DEFINE_string(shuffler_pk_pem_file, "",
"Path to a file containing a PEM encoding of the public key of "
"the Shuffler used for Cobalt's internal encryption scheme. If "
"not specified then no encryption will be used.");
DEFINE_bool(
use_tls, false,
"Should tls be used for the connection to the shuffler or the analyzer?");
DEFINE_string(root_certs_pem_file, "",
"Full path to a file containing a PEM encoding of the TLS root "
"certificates to be used by the gRPC client.");
DEFINE_uint32(deadline_seconds, 10, "RPC deadline.");
DEFINE_string(config_bin_proto_path, "",
"Path to the serialized CobaltConfig proto from which the "
"configuration is to be read. (Optional)");
// Category 2: Flags consumed by CreateFromFlagsOrDie() that set values that
// may be overidden by a set command in interactive mode.
DEFINE_uint32(metric, 1, "Metric ID to use.");
DEFINE_bool(skip_shuffler, false,
"If true send Observations directly to the analyzer.");
// Category 3: Flags used only in send-once or automatic modes. These are not
// consumed by CreateFromFlagsOrDie().
DEFINE_uint32(num_clients, 1,
"Number of clients to simulate in the non-interactive modes.");
DEFINE_string(
values, "",
"A comma-separated list of colon-delimited triples of the form"
" <part>:<val>:<encoding> where <part> is the name of a metric part and"
" <val> is a string or int value and <encoding> is an EncodingConfig id."
" Used only in send-once mode to specify the multi-part value to"
" encode and the encodings to use.");
DEFINE_uint32(repeat, 1,
"Number of times to repeat the add-send cycle in the "
"non-interactive mode.");
DEFINE_uint32(num_adds_per_observation, 1,
"Number of times each Observation should be added to the "
"envelope. Setting this to more than 1 allows us to test "
"idempotency.");
DEFINE_string(override_board_name, "",
"A board name to override the default one");
namespace {
const size_t kMaxBytesPerObservation = 100 * 1024;
const size_t kMaxBytesPerEnvelope = 1024 * 1024;
const size_t kMaxBytesTotal = 10 * 1024 * 1024;
const size_t kMinEnvelopeSendSize = 1024;
const std::chrono::seconds kInitialRpcDeadline(FLAGS_deadline_seconds);
const std::chrono::seconds kDeadlinePerSendAttempt(60);
// Prints help for the interactive mode.
void PrintHelp(std::ostream* ostream) {
*ostream << std::endl;
*ostream << "Cobalt command-line testing client" << std::endl;
*ostream << "----------------------------------" << std::endl;
*ostream << "help \tPrint this help message."
<< std::endl;
*ostream << "encode <num> <val> \tEncode <num> independent copies "
"of the string or integer value <val>, or index <n> if "
"<val>='index=<n>'"
<< std::endl;
*ostream << std::endl;
*ostream << "encode <num> <part>:<val>:<encoding> "
"<part>:<val>:<encoding>..."
<< std::endl;
*ostream << " \tEncode <num> independent copies of "
"a multi-part value. Each <part> is a part name."
<< std::endl;
*ostream << " \tEach <val> is an int or string "
"value or an index <n> if <val>='index=<n>'."
<< std::endl;
*ostream << " \tEach <encoding> is an EncodingConfig "
"id."
<< std::endl;
*ostream << std::endl;
*ostream << "ls \tList current values of "
"parameters."
<< std::endl;
*ostream << "send \tSend all previously encoded "
"observations and clear the observation cache."
<< std::endl;
*ostream << "set encoding <id> \tSet encoding config id." << std::endl;
*ostream << "set metric <id> \tSet metric id." << std::endl;
*ostream << "set skip_shuffler <bool> \tSet skip_shuffler." << std::endl;
*ostream << "show config \tDisplay the current Metric and "
"Encoding configurations."
<< std::endl;
*ostream << "quit \tQuit." << std::endl;
*ostream << std::endl;
}
// Returns the path to the standard Cobalt configuration based on the presumed
// location of this binary.
std::string FindCobaltConfigProto(char* argv[]) {
char path[PATH_MAX], path2[PATH_MAX];
// Get the directory of this binary.
if (!realpath(argv[0], path)) {
LOG(FATAL) << "realpath(): " << argv[0];
}
char* dir = dirname(path);
// Set the relative path to the registry.
snprintf(path2, sizeof(path2),
"%s/../../config/third_party/config/cobalt_config.binproto", dir);
// Get the absolute path to the registry.
if (!realpath(path2, path)) {
LOG(FATAL) << "Computed path to serialized CobaltConfig is invalid: "
<< path;
}
return path;
}
// Parses the mode flag.
TestApp::Mode ParseMode() {
if (FLAGS_mode == "interactive") {
return TestApp::kInteractive;
}
if (FLAGS_mode == "send-once") {
return TestApp::kSendOnce;
}
if (FLAGS_mode == "automatic") {
return TestApp::kAutomatic;
}
LOG(FATAL) << "Unrecognized mode: " << FLAGS_mode;
}
// Reads the PEM file at the specified path and writes the contents into
// |*pem_out|. Returns true for success or false for failure.
bool ReadPublicKeyPem(const std::string& pem_file, std::string* pem_out) {
VLOG(2) << "Reading PEM file at " << pem_file;
if (PemUtil::ReadTextFile(pem_file, pem_out)) {
return true;
}
LOG(ERROR) << "Unable to open PEM file at " << pem_file
<< ". Skipping encryption!";
return false;
}
// Reads the specified serialized CobaltConfig proto. Returns a ProjectContext
// containing the read config and the values of the -customer and
// -project flags.
std::shared_ptr<ProjectContext> LoadProjectContext(
const std::string& config_bin_proto_path) {
VLOG(2) << "Loading Cobalt configuration from " << config_bin_proto_path;
std::ifstream config_file_stream;
config_file_stream.open(config_bin_proto_path);
CHECK(config_file_stream)
<< "Could not open cobalt config proto file: " << config_bin_proto_path;
// Parse the cobalt config file.
cobalt::CobaltConfig cobalt_config;
CHECK(cobalt_config.ParseFromIstream(&config_file_stream))
<< "Could not parse the cobalt config proto file: "
<< config_bin_proto_path;
// Load the encoding registry.
cobalt::RegisteredEncodings registered_encodings;
registered_encodings.mutable_element()->Swap(
cobalt_config.mutable_encoding_configs());
auto encodings = EncodingRegistry::TakeFrom(&registered_encodings, nullptr);
if (encodings.second != config::kOK) {
LOG(FATAL) << "Can't load encodings configuration";
}
std::shared_ptr<EncodingRegistry> encoding_registry(
encodings.first.release());
// Load the metrics registry.
cobalt::RegisteredMetrics registered_metrics;
registered_metrics.mutable_element()->Swap(
cobalt_config.mutable_metric_configs());
auto metrics = MetricRegistry::TakeFrom(&registered_metrics, nullptr);
if (metrics.second != config::kOK) {
LOG(FATAL) << "Can't load metrics configuration";
}
std::shared_ptr<MetricRegistry> metric_registry(metrics.first.release());
CHECK(FLAGS_project < 100) << "-project=" << FLAGS_project
<< " not allowed. Project ID must be less than "
"100 because this tool is not "
"intended to mutate real customer projects.";
return std::shared_ptr<ProjectContext>(new ProjectContext(
FLAGS_customer, FLAGS_project, metric_registry, encoding_registry));
}
bool ParseBool(const std::string& str) {
return (str == "true" || str == "True" || str == "1");
}
// Given a |line| of text, breaks it into tokens separated by white space.
std::vector<std::string> Tokenize(const std::string& line) {
std::istringstream line_stream(line);
std::vector<std::string> tokens;
do {
std::string token;
line_stream >> token;
std::remove(token.begin(), token.end(), ' ');
if (!token.empty()) {
tokens.push_back(token);
}
} while (line_stream);
return tokens;
}
// Given a |line| of text, breaks it into cells separated by commas.
std::vector<std::string> ParseCSV(const std::string& line) {
std::stringstream line_stream(line);
std::vector<std::string> cells;
std::string cell;
while (std::getline(line_stream, cell, ',')) {
if (!cell.empty()) {
cells.push_back(cell);
}
}
return cells;
}
std::shared_ptr<grpc::ChannelCredentials> CreateChannelCredentials(
bool use_tls, const char* pem_root_certs = nullptr) {
if (use_tls) {
auto opts = grpc::SslCredentialsOptions();
if (pem_root_certs) {
opts.pem_root_certs = pem_root_certs;
}
return grpc::SslCredentials(opts);
} else {
return grpc::InsecureChannelCredentials();
}
}
template <class T>
std::string ToString(std::vector<T> v) {
std::ostringstream stream;
for (const T& i : v) {
stream << i << " ";
}
return stream.str();
}
} // namespace
// An implementation of AnalyzerClientInterface that actually sends Envelopes.
class AnalyzerClient : public AnalyzerClientInterface {
public:
// The mode is used only to determine whether to print error messages to
// the logs or the the console.
AnalyzerClient(std::unique_ptr<analyzer::Analyzer::Stub> analyzer_stub,
TestApp::Mode mode)
: analyzer_stub_(std::move(analyzer_stub)), mode_(mode) {}
virtual ~AnalyzerClient() = default;
private:
void SendToAnalyzer(const Envelope& envelope) {
if (!analyzer_stub_) {
if (mode_ == TestApp::kInteractive) {
std::cout
<< "The flag -analyzer_uri was not specified so you cannot "
"send directly to the analyzer. Try 'set skip_shuffler false'."
<< std::endl;
} else {
LOG(ERROR) << "-analyzer_uri was not specified.";
}
return;
}
if (envelope.batch_size() == 0) {
if (mode_ == TestApp::kInteractive) {
std::cout << "There are no Observations to send yet." << std::endl;
} else {
LOG(ERROR) << "Not sending to analyzer. No observations were "
"successfully encoded.";
}
return;
}
Empty resp;
for (const ObservationBatch& batch : envelope.batch()) {
if (mode_ == TestApp::kInteractive) {
} else {
VLOG(2) << "Sending to analyzer with deadline = "
<< FLAGS_deadline_seconds << " seconds...";
}
std::unique_ptr<grpc::ClientContext> context(new grpc::ClientContext());
context->set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(FLAGS_deadline_seconds));
auto status =
analyzer_stub_->AddObservations(context.get(), batch, &resp);
if (status.ok()) {
if (mode_ == TestApp::kInteractive) {
std::cout << "Sent to Analyzer" << std::endl;
} else {
VLOG(2) << "Sent to Analyzer";
}
} else {
if (mode_ == TestApp::kInteractive) {
std::cout << "Send to analyzer failed with status="
<< status.error_code() << " " << status.error_message()
<< std::endl;
} else {
LOG(ERROR) << "Send to analyzer failed with status="
<< status.error_code() << " " << status.error_message();
}
return;
}
}
}
std::unique_ptr<analyzer::Analyzer::Stub> analyzer_stub_;
TestApp::Mode mode_;
};
void TestApp::SendToShuffler() {
if (!shuffler_client_) {
if (mode_ == TestApp::kInteractive) {
std::cout << "The flag -shuffler_uri was not specified so you cannot "
"send to the shuffler. Try 'set skip_shuffler true'."
<< std::endl;
} else {
LOG(ERROR) << "-shuffler_uri was not specified.";
}
return;
}
if (mode_ != TestApp::kInteractive) {
VLOG(2) << "Sending to shuffler with deadline = " << FLAGS_deadline_seconds
<< " seconds...";
}
shipping_manager_->RequestSendSoon();
shipping_manager_->WaitUntilIdle(kDeadlinePerSendAttempt);
auto status = shipping_manager_->last_send_status();
if (status.ok()) {
if (mode_ == TestApp::kInteractive) {
std::cout << "Sent to Shuffler." << std::endl;
} else {
VLOG(2) << "Sent to Shuffler";
}
return;
} else {
if (mode_ == TestApp::kInteractive) {
std::cout << "Send to shuffler failed with status=" << status.error_code()
<< " " << status.error_message() << std::endl;
return;
} else {
LOG(ERROR) << "Send to shuffler failed with status="
<< status.error_code() << " " << status.error_message();
}
}
}
std::unique_ptr<TestApp> TestApp::CreateFromFlagsOrDie(int argc, char* argv[]) {
std::string config_bin_proto_path = FLAGS_config_bin_proto_path;
// If no path is given, try to deduce it from the binary location.
if (config_bin_proto_path == "") {
config_bin_proto_path = FindCobaltConfigProto(argv);
}
std::shared_ptr<encoder::ProjectContext> project_context =
LoadProjectContext(config_bin_proto_path);
CHECK(!FLAGS_analyzer_uri.empty() || !FLAGS_shuffler_uri.empty())
<< "You must specify either -shuffler_uri or -analyzer_uri";
std::unique_ptr<analyzer::Analyzer::Stub> analyzer_stub;
if (!FLAGS_analyzer_uri.empty()) {
analyzer_stub = Analyzer::NewStub(grpc::CreateChannel(
FLAGS_analyzer_uri, CreateChannelCredentials(FLAGS_use_tls)));
}
auto mode = ParseMode();
std::shared_ptr<AnalyzerClientInterface> analyzer_client(
new AnalyzerClient(std::move(analyzer_stub), mode));
std::shared_ptr<encoder::ShufflerClient> shuffler_client;
if (!FLAGS_shuffler_uri.empty()) {
VLOG(2) << "Connecting to Shuffler at " << FLAGS_shuffler_uri;
const char* pem_root_certs = nullptr;
std::string pem_root_certs_str;
if (FLAGS_use_tls) {
VLOG(2) << "Using TLS.";
if (!FLAGS_root_certs_pem_file.empty()) {
VLOG(2) << "Reading root certs from " << FLAGS_root_certs_pem_file;
CHECK(PemUtil::ReadTextFile(FLAGS_root_certs_pem_file,
&pem_root_certs_str));
pem_root_certs = pem_root_certs_str.c_str();
}
} else {
VLOG(2) << "NOT using TLS.";
}
shuffler_client.reset(
new ShufflerClient(FLAGS_shuffler_uri, FLAGS_use_tls, pem_root_certs));
}
auto analyzer_encryption_scheme = EncryptedMessage::NONE;
std::string analyzer_public_key_pem = "";
if (FLAGS_analyzer_pk_pem_file.empty()) {
VLOG(2) << "WARNING: Encryption of Observations to the Analzyer not being "
"used. Pass the flag -analyzer_pk_pem_file";
} else if (ReadPublicKeyPem(FLAGS_analyzer_pk_pem_file,
&analyzer_public_key_pem)) {
analyzer_encryption_scheme = EncryptedMessage::HYBRID_ECDH_V1;
}
auto shuffler_encryption_scheme = EncryptedMessage::NONE;
std::string shuffler_public_key_pem = "";
if (FLAGS_shuffler_pk_pem_file.empty()) {
VLOG(2) << "WARNING: Encryption of Envelopes to the Shuffler not being "
"used. Pass the flag -shuffler_pk_pem_file";
} else if (ReadPublicKeyPem(FLAGS_shuffler_pk_pem_file,
&shuffler_public_key_pem)) {
shuffler_encryption_scheme = EncryptedMessage::HYBRID_ECDH_V1;
}
std::unique_ptr<SystemData> system_data(new SystemData("test_app"));
if (!FLAGS_override_board_name.empty()) {
SystemProfile profile;
profile.set_os(SystemProfile::FUCHSIA);
profile.set_arch(SystemProfile::X86_64);
profile.set_board_name(FLAGS_override_board_name);
system_data->OverrideSystemProfile(profile);
}
auto test_app = std::unique_ptr<TestApp>(new TestApp(
project_context, analyzer_client, shuffler_client, std::move(system_data),
analyzer_public_key_pem, analyzer_encryption_scheme,
shuffler_public_key_pem, shuffler_encryption_scheme, &std::cout));
test_app->set_metric(FLAGS_metric);
test_app->set_skip_shuffler(FLAGS_skip_shuffler);
test_app->set_mode(mode);
return test_app;
}
TestApp::TestApp(
std::shared_ptr<ProjectContext> project_context,
std::shared_ptr<AnalyzerClientInterface> analyzer_client,
std::shared_ptr<encoder::ShufflerClientInterface> shuffler_client,
std::unique_ptr<encoder::SystemData> system_data,
const std::string& analyzer_public_key_pem,
EncryptedMessage::EncryptionScheme analyzer_scheme,
const std::string& shuffler_public_key_pem,
EncryptedMessage::EncryptionScheme shuffler_scheme, std::ostream* ostream)
: customer_id_(project_context->customer_id()),
project_id_(project_context->project_id()),
project_context_(project_context),
analyzer_client_(analyzer_client),
shuffler_client_(shuffler_client),
send_retryer_(new SendRetryer(shuffler_client_.get())),
system_data_(std::move(system_data)),
shipping_manager_(new ShippingManager(
ShippingManager::SizeParams(kMaxBytesPerObservation,
kMaxBytesPerEnvelope, kMaxBytesTotal,
kMinEnvelopeSendSize),
// By using (kMaxSeconds, 0) here we are effectively putting the
// ShippingManager in manual mode. It will never send automatically
// and it will send immediately in response to RequestSendSoon().
ShippingManager::ScheduleParams(ShippingManager::kMaxSeconds,
std::chrono::seconds(0)),
ShippingManager::EnvelopeMakerParams(
analyzer_public_key_pem, analyzer_scheme, shuffler_public_key_pem,
shuffler_scheme),
ShippingManager::SendRetryerParams(kInitialRpcDeadline,
kDeadlinePerSendAttempt),
send_retryer_.get())),
ostream_(ostream) {
shipping_manager_->Start();
}
void TestApp::Run() {
switch (mode_) {
case kInteractive:
CommandLoop();
break;
case kSendOnce:
SendAndQuit();
break;
case kAutomatic:
RunAutomatic();
break;
}
}
void TestApp::RunAutomatic() {
// TODO(rudominer) Implement automatic mode.
LOG(FATAL) << "automatic mode is not yet implemented.";
}
void TestApp::SendAndQuit() {
VLOG(1) << "--values=" << FLAGS_values;
auto value_triples = ParseCSV(FLAGS_values);
if (value_triples.empty()) {
LOG(ERROR) << "--values was not set.";
return;
}
std::vector<std::string> part_names;
std::vector<std::string> values;
std::vector<uint32_t> encoding_config_ids;
for (const auto& triple : value_triples) {
part_names.emplace_back();
values.emplace_back();
encoding_config_ids.emplace_back();
if (!ParsePartValueEncodingTriple(triple, &part_names.back(),
&values.back(),
&encoding_config_ids.back())) {
LOG(ERROR)
<< "Malformed <part>:<value>:<encoding> triple in --values flag: "
<< triple;
return;
}
}
for (uint32_t i = 0; i < FLAGS_repeat; i++) {
VLOG(2) << "encoding_config_ids=" << ToString(encoding_config_ids)
<< " part_names=" << ToString(part_names)
<< " values=" << ToString(values);
Encode(encoding_config_ids, part_names, values);
SendAccumulatedObservations();
}
}
void TestApp::SendAccumulatedObservations() {
if (skip_shuffler_) {
auto envelope_maker = shipping_manager_->TakeActiveEnvelopeMaker();
analyzer_client_->SendToAnalyzer(envelope_maker->envelope());
} else {
SendToShuffler();
}
}
void TestApp::CommandLoop() {
std::string command_line;
while (true) {
*ostream_ << "Command or 'help': ";
getline(std::cin, command_line);
if (!ProcessCommandLine(command_line)) {
break;
}
}
}
// Generates FLAGS_num_clients independent Observations by encoding the
// multi-part value specified by the arguments and adds the Observations
// to the EnvelopeMaker.
void TestApp::Encode(const std::vector<uint32_t> encoding_config_ids,
const std::vector<std::string>& metric_parts,
const std::vector<std::string>& values) {
for (size_t i = 0; i < FLAGS_num_clients; i++) {
if (!EncodeAsNewClient(encoding_config_ids, metric_parts, values)) {
break;
}
}
}
// Generates a new ClientSecret, constructs a new Encoder using that secret,
// uses this Encoder to encode the multi-part value specified by the
// arguments, and adds the resulting Observation to the EnvelopeMaker.
bool TestApp::EncodeAsNewClient(const std::vector<uint32_t> encoding_config_ids,
const std::vector<std::string>& metric_parts,
const std::vector<std::string>& values) {
size_t num_parts = metric_parts.size();
CHECK_EQ(num_parts, values.size());
CHECK_EQ(num_parts, encoding_config_ids.size());
// Build the |Value|.
Encoder::Value value;
for (size_t i = 0; i < num_parts; i++) {
int64_t int_val;
uint32_t index;
if (ParseInt(values[i], false, &int_val)) {
value.AddIntPart(encoding_config_ids[i], metric_parts[i], int_val);
} else if (ParseIndex(values[i], &index)) {
value.AddIndexPart(encoding_config_ids[i], metric_parts[i], index);
} else {
value.AddStringPart(encoding_config_ids[i], metric_parts[i], values[i]);
}
}
// Construct a new Encoder.
std::unique_ptr<Encoder> encoder(new Encoder(
project_context_, ClientSecret::GenerateNewSecret(), system_data_.get()));
// Use the Encoder to encode the Value.
auto result = encoder->Encode(metric_, value);
if (result.status != Encoder::kOK) {
LOG(ERROR) << "Encode() failed with status " << result.status
<< ". metric_id=" << metric_ << ". Multi-part value:";
for (size_t i = 0; i < num_parts; i++) {
LOG(ERROR) << metric_parts[i] << ":" << values[i]
<< " encoding=" << encoding_config_ids[i];
}
return false;
}
// Add the observation to the EnvelopeMaker. For the sake of testing
// idempotency of the AddObservation() operation, we add the same Observation
// multiple times.
ShippingManager::Status status;
for (size_t i = 0; i < FLAGS_num_adds_per_observation; i++) {
uint64_t random_id;
std::memcpy(&random_id, result.observation->random_id().data(),
sizeof(random_id));
VLOG(5) << "Adding observation with random_id=" << random_id;
status = shipping_manager_->AddObservation(
*result.observation, std::unique_ptr<ObservationMetadata>(
new ObservationMetadata(*result.metadata)));
}
if (status != ShippingManager::kOk) {
LOG(ERROR) << "AddObservation() failed with status " << status
<< ". metric_id=" << metric_;
return false;
}
return true;
}
// Generates FLAGS_num_clients independent Observations by encoding the
// string value specified by the argument and adds the Observations
// to the ShippingManager.
void TestApp::EncodeString(const std::string value) {
for (size_t i = 0; i < FLAGS_num_clients; i++) {
if (!EncodeStringAsNewClient(value)) {
break;
}
}
}
// Generates a new ClientSecret, constructs a new Encoder using that secret,
// uses this Encoder to encode the string value specified by the
// argument, and adds the resulting Observation to the ShippingManager.
bool TestApp::EncodeStringAsNewClient(const std::string value) {
std::unique_ptr<Encoder> encoder(new Encoder(
project_context_, ClientSecret::GenerateNewSecret(), system_data_.get()));
auto result = encoder->EncodeString(metric_, encoding_config_id_, value);
if (result.status != Encoder::kOK) {
LOG(ERROR) << "EncodeString() failed with status " << result.status
<< ". metric_id=" << metric_
<< ". encoding_config_id=" << encoding_config_id_
<< ". value=" << value;
return false;
}
// Add the observation to the ShippingManager.
auto status = shipping_manager_->AddObservation(*result.observation,
std::move(result.metadata));
if (status != ShippingManager::kOk) {
LOG(ERROR) << "AddObservation() failed with status " << status
<< ". metric_id=" << metric_;
return false;
}
return true;
}
// Generates FLAGS_num_clients independent Observations by encoding the
// int value specified by the argument and adds the Observations
// to the ShippingManager.
void TestApp::EncodeInt(int64_t value) {
for (size_t i = 0; i < FLAGS_num_clients; i++) {
if (!EncodeIntAsNewClient(value)) {
break;
}
}
}
// Generates a new ClientSecret, constructs a new Encoder using that secret,
// uses this Encoder to encode the int value specified by the
// argument, and adds the resulting Observation to the ShippingManager.
bool TestApp::EncodeIntAsNewClient(int64_t value) {
std::unique_ptr<Encoder> encoder(new Encoder(
project_context_, ClientSecret::GenerateNewSecret(), system_data_.get()));
auto result = encoder->EncodeInt(metric_, encoding_config_id_, value);
if (result.status != Encoder::kOK) {
LOG(ERROR) << "EncodeInt() failed with status " << result.status
<< ". metric_id=" << metric_
<< ". encoding_config_id=" << encoding_config_id_
<< ". value=" << value;
return false;
}
// Add the observation to the ShippingManager.
auto status = shipping_manager_->AddObservation(*result.observation,
std::move(result.metadata));
if (status != ShippingManager::kOk) {
LOG(ERROR) << "AddObservation() failed with status " << status
<< ". metric_id=" << metric_;
return false;
}
return true;
}
void TestApp::EncodeIndex(uint32_t index) {
for (size_t i = 0; i < FLAGS_num_clients; i++) {
if (!EncodeIndexAsNewClient(index)) {
break;
}
}
}
bool TestApp::EncodeIndexAsNewClient(uint32_t index) {
std::unique_ptr<Encoder> encoder(new Encoder(
project_context_, ClientSecret::GenerateNewSecret(), system_data_.get()));
auto result = encoder->EncodeIndex(metric_, encoding_config_id_, index);
if (result.status != Encoder::kOK) {
LOG(ERROR) << "EncodeIndex() failed with status " << result.status
<< ". metric_id=" << metric_
<< ". encoding_config_id=" << encoding_config_id_
<< ". index=" << index;
return false;
}
// Add the observation to the ShippingManager.
auto status = shipping_manager_->AddObservation(*result.observation,
std::move(result.metadata));
if (status != ShippingManager::kOk) {
LOG(ERROR) << "AddObservation() failed with status " << status
<< ". metric_id=" << metric_;
return false;
}
return true;
}
bool TestApp::ProcessCommandLine(const std::string command_line) {
return ProcessCommand(Tokenize(command_line));
}
bool TestApp::ProcessCommand(const std::vector<std::string>& command) {
if (command.empty()) {
return true;
}
if (command[0] == "help") {
PrintHelp(ostream_);
return true;
}
if (command[0] == "encode") {
Encode(command);
return true;
}
if (command[0] == "ls") {
ListParameters();
return true;
}
if (command[0] == "send") {
Send(command);
return true;
}
if (command[0] == "set") {
SetParameter(command);
return true;
}
if (command[0] == "show") {
Show(command);
return true;
}
if (command[0] == "quit") {
return false;
}
*ostream_ << "Unrecognized command: " << command[0] << std::endl;
return true;
}
void TestApp::Encode(const std::vector<std::string>& command) {
if (command.size() < 3) {
*ostream_ << "Malformed encode command. Expected 2 additional arguments."
<< std::endl;
return;
}
if (command.size() > 3 || IsTriple(command[2])) {
EncodeMulti(command);
return;
}
int64_t num_clients;
if (!ParseInt(command[1], true, &num_clients)) {
return;
}
if (num_clients <= 0) {
*ostream_ << "<num> must be a positive integer: " << num_clients
<< std::endl;
return;
}
FLAGS_num_clients = num_clients;
int64_t int_val;
uint32_t index;
if (ParseInt(command[2], false, &int_val)) {
EncodeInt(int_val);
} else if (ParseIndex(command[2], &index)) {
EncodeIndex(index);
} else {
EncodeString(command[2]);
}
}
void TestApp::EncodeMulti(const std::vector<std::string>& command) {
CHECK_GE(command.size(), 3u);
int64_t num_clients;
if (!ParseInt(command[1], true, &num_clients)) {
return;
}
if (num_clients <= 0) {
*ostream_ << "<num> must be a positive integer: " << num_clients
<< std::endl;
}
FLAGS_num_clients = num_clients;
std::vector<std::string> part_names;
std::vector<std::string> values;
std::vector<uint32_t> encoding_config_ids;
for (size_t i = 2; i < command.size(); i++) {
part_names.emplace_back();
values.emplace_back();
encoding_config_ids.emplace_back();
if (!ParsePartValueEncodingTriple(command[i], &part_names.back(),
&values.back(),
&encoding_config_ids.back())) {
*ostream_
<< "Malformed <part>:<value>:<encoding> triple in encode command: "
<< command[i] << std::endl;
return;
}
}
Encode(encoding_config_ids, part_names, values);
}
void TestApp::ListParameters() {
*ostream_ << std::endl;
*ostream_ << "Settable values" << std::endl;
*ostream_ << "---------------" << std::endl;
*ostream_ << "Metric ID: " << metric_ << std::endl;
*ostream_ << "Encoding Config ID: " << encoding_config_id_ << std::endl;
*ostream_ << "Skip Shuffler: " << skip_shuffler_ << std::endl;
*ostream_ << std::endl;
*ostream_ << "Values set by flag at startup." << std::endl;
*ostream_ << "-----------------------------" << std::endl;
*ostream_ << "Customer ID: " << customer_id_ << std::endl;
*ostream_ << "Project ID: " << project_id_ << std::endl;
*ostream_ << "Analyzer URI: " << FLAGS_analyzer_uri << std::endl;
*ostream_ << "Shuffler URI: " << FLAGS_shuffler_uri << std::endl;
*ostream_ << std::endl;
}
void TestApp::SetParameter(const std::vector<std::string>& command) {
if (command.size() != 3) {
*ostream_ << "Malformed set command. Expected 2 additional arguments."
<< std::endl;
return;
}
if (command[1] == "metric") {
int64_t id;
if (!ParseInt(command[2], true, &id)) {
return;
}
if (id <= 0) {
*ostream_ << "<id> must be a positive integer";
return;
}
metric_ = id;
} else if (command[1] == "encoding") {
int64_t id;
if (!ParseInt(command[2], true, &id)) {
return;
}
if (id <= 0) {
*ostream_ << "<id> must be a positive integer";
return;
}
encoding_config_id_ = id;
} else if (command[1] == "skip_shuffler") {
skip_shuffler_ = ParseBool(command[2]);
} else {
*ostream_ << command[1] << " is not a settable parameter." << std::endl;
}
}
void TestApp::Send(const std::vector<std::string>& command) {
if (command.size() != 1) {
*ostream_ << "The send command doesn't take any arguments." << std::endl;
return;
}
SendAccumulatedObservations();
}
void TestApp::Show(const std::vector<std::string>& command) {
// show config is currently the only show command.
if (command.size() != 2 || command[1] != "config") {
*ostream_ << "Expected 'show config'." << std::endl;
return;
}
auto* metric = project_context_->Metric(metric_);
if (!metric) {
*ostream_ << "There is no metric with id=" << metric_ << "." << std::endl;
} else {
*ostream_ << "Metric " << metric->id() << std::endl;
*ostream_ << "-----------" << std::endl;
ShowMetric(*metric);
*ostream_ << std::endl;
}
auto* encoding = project_context_->EncodingConfig(encoding_config_id_);
if (!encoding) {
*ostream_ << "There is no encoding config with id=" << encoding_config_id_
<< "." << std::endl;
} else {
*ostream_ << "Encoding Config " << encoding->id() << std::endl;
*ostream_ << "--------------------" << std::endl;
ShowEncodingConfig(*encoding);
*ostream_ << std::endl;
}
}
void TestApp::ShowMetric(const Metric& metric) {
*ostream_ << metric.name() << std::endl;
*ostream_ << metric.description() << std::endl;
for (const auto& pair : metric.parts()) {
const std::string& name = pair.first;
const MetricPart& part = pair.second;
std::string data_type;
switch (part.data_type()) {
case MetricPart::STRING:
data_type = "string";
break;
case MetricPart::INT:
data_type = "int";
break;
case MetricPart::INDEX:
data_type = "indexed";
break;
case MetricPart::BLOB:
data_type = "blob";
break;
default:
data_type = "<missing case>";
break;
}
*ostream_ << "One " << data_type << " part named \"" << name
<< "\": " << part.description() << std::endl;
}
}
void TestApp::ShowEncodingConfig(const EncodingConfig& encoding) {
switch (encoding.config_case()) {
case EncodingConfig::kForculus:
ShowForculusConfig(encoding.forculus());
return;
case EncodingConfig::kRappor:
ShowRapporConfig(encoding.rappor());
return;
case EncodingConfig::kBasicRappor:
ShowBasicRapporConfig(encoding.basic_rappor());
return;
case EncodingConfig::kNoOpEncoding:
*ostream_ << "NoOp encoding";
return;
case EncodingConfig::CONFIG_NOT_SET:
*ostream_ << "Invalid Encoding!";
return;
}
}
void TestApp::ShowForculusConfig(const ForculusConfig& config) {
*ostream_ << "Forculus threshold=" << config.threshold() << std::endl;
}
void TestApp::ShowRapporConfig(const RapporConfig& config) {
*ostream_ << "String Rappor" << std::endl;
}
void TestApp::ShowBasicRapporConfig(const BasicRapporConfig& config) {
*ostream_ << "Basic Rappor " << std::endl;
*ostream_ << "p=" << config.prob_0_becomes_1()
<< ", q=" << config.prob_1_stays_1() << std::endl;
*ostream_ << "Categories:" << std::endl;
switch (config.categories_case()) {
case BasicRapporConfig::kStringCategories: {
for (const std::string& s : config.string_categories().category()) {
*ostream_ << s << std::endl;
}
return;
}
case BasicRapporConfig::kIntRangeCategories: {
*ostream_ << config.int_range_categories().first() << " - "
<< config.int_range_categories().last();
return;
}
case BasicRapporConfig::kIndexedCategories: {
*ostream_ << "num_categories: "
<< config.indexed_categories().num_categories();
return;
}
case BasicRapporConfig::CATEGORIES_NOT_SET:
*ostream_ << "Invalid Encoding!";
return;
}
}
bool TestApp::ParseInt(const std::string& str, bool complain, int64_t* x) {
CHECK(x);
std::istringstream iss(str);
*x = 0;
iss >> *x;
char c;
if (*x == 0 || iss.fail() || iss.get(c)) {
if (complain) {
if (mode_ == kInteractive) {
*ostream_ << "Expected positive integer instead of " << str << "."
<< std::endl;
} else {
LOG(ERROR) << "Expected positive integer instead of " << str;
}
}
return false;
}
return true;
}
bool TestApp::ParseIndex(const std::string& str, uint32_t* index) {
CHECK(index);
if (str.size() < 7) {
return false;
}
if (str.substr(0, 6) != "index=") {
return false;
}
auto index_string = str.substr(6);
std::istringstream iss(index_string);
int64_t possible_index;
iss >> possible_index;
char c;
if (iss.fail() || iss.get(c) || possible_index < 0 ||
possible_index > UINT32_MAX) {
if (mode_ == kInteractive) {
*ostream_ << "Expected small non-negative integer instead of "
<< index_string << "." << std::endl;
} else {
LOG(ERROR) << "Expected small non-negative integer instead of "
<< index_string;
}
return false;
}
*index = possible_index;
return true;
}
// Parses a string of the form <part>:<value>:<encoding> and writes <part> into
// |part_name| and <value> into |value| and <encoding> into encoding_config_id.
// Returns true if and only if this succeeds.
bool TestApp::ParsePartValueEncodingTriple(const std::string& triple,
std::string* part_name,
std::string* value,
uint32_t* encoding_config_id) {
CHECK(part_name);
CHECK(value);
if (triple.size() < 5) {
return false;
}
auto last_pos = triple.size() - 1;
auto index1 = triple.find(':');
if (index1 == std::string::npos || index1 == 0 || index1 > last_pos - 3) {
return false;
}
auto index2 = triple.find(':', index1 + 2);
if (index2 == std::string::npos || index2 > last_pos - 1) {
return false;
}
*part_name = std::string(triple, 0, index1);
*value = std::string(triple, index1 + 1, index2 - index1 - 1);
std::string int_string = std::string(triple, index2 + 1);
int64_t id;
if (!ParseInt(int_string, true, &id)) {
return false;
}
if (id < 0) {
if (mode_ == kInteractive) {
*ostream_ << "<encoding> must be positive: " << id << std::endl;
} else {
LOG(ERROR) << "<encoding> must be positive: " << id;
}
return false;
}
*encoding_config_id = id;
return true;
}
// Determines whether or not |str| is a triple of the kind that may be
// parsed by ParsePartValueEncodingTriple.
bool TestApp::IsTriple(const std::string str) {
std::string part_name;
std::string value;
uint32_t encoding_config_id;
return ParsePartValueEncodingTriple(str, &part_name, &value,
&encoding_config_id);
}
} // namespace cobalt