blob: ee5fe02e548f4b2785e06585f70ac858ae64581b [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <stdlib.h>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include "lib/app/cpp/application_context.h"
#include "lib/cobalt/fidl/cobalt.fidl.h"
#include "lib/cobalt/fidl/cobalt_controller.fidl.h"
#include "garnet/bin/cobalt/config.h"
#include "grpc++/grpc++.h"
#include "lib/fidl/cpp/bindings/binding.h"
#include "lib/fxl/command_line.h"
#include "lib/fxl/log_settings_command_line.h"
#include "lib/fxl/logging.h"
#include "lib/fxl/macros.h"
#include "lib/fsl/tasks/message_loop.h"
#include "third_party/cobalt/config/metric_config.h"
#include "third_party/cobalt/config/report_config.h"
#include "third_party/cobalt/encoder/client_secret.h"
#include "third_party/cobalt/encoder/encoder.h"
#include "third_party/cobalt/encoder/project_context.h"
#include "third_party/cobalt/encoder/send_retryer.h"
#include "third_party/cobalt/encoder/shipping_manager.h"
#include "third_party/cobalt/encoder/shuffler_client.h"
namespace {
using cobalt::CobaltController;
using cobalt::CobaltEncoder;
using cobalt::CobaltEncoderFactory;
using cobalt::EncryptedMessage;
using cobalt::Status;
using cobalt::config::EncodingRegistry;
using cobalt::config::MetricRegistry;
using cobalt::encoder::ClientSecret;
using cobalt::encoder::Encoder;
using cobalt::encoder::ProjectContext;
using cobalt::encoder::ShippingManager;
using cobalt::encoder::ShufflerClient;
using cobalt::encoder::SystemData;
using cobalt::encoder::send_retryer::SendRetryer;
// Command-line flags
// Used to override kScheduleIntervalDefault;
constexpr fxl::StringView kScheduleIntervalSecondsFlagName =
"schedule_interval_seconds";
// Used to override kMinIntervalDefault;
constexpr fxl::StringView kMinIntervalSecondsFlagName = "min_interval_seconds";
const char kCloudShufflerUri[] = "shuffler.cobalt-api.fuchsia.com:443";
const int32_t kFuchsiaCustomerId = 1;
const size_t kMaxBytesPerEnvelope = 512 * 1024; // 0.5 MiB.
const size_t kMaxBytesTotal = 1024 * 1024; // 1 MiB
const size_t kMinEnvelopeSendSize = 10 * 1024; // 10 K
// Because we don't yet persist Observations to local, non-volatile storage,
// we send accumulated Observations every 10 seconds. After persistence is
// implemented this value should be changed to something more like one hour.
const std::chrono::seconds kScheduleIntervalDefault(10);
// We send Observations to the Shuffler more frequently than kScheduleInterval
// under some circumstances, namely, if there is memory pressure or if we
// are explicitly asked to do so via the RequestSendSoon() method. This value
// is a safety parameter. We do not make two attempts within a period of this
// specified length.
const std::chrono::seconds kMinIntervalDefault(1);
// Each "send attempt" is actually a cycle of potential retries. These
// two parameters configure the SendRetryer.
const std::chrono::seconds kInitialRpcDeadline(10);
const std::chrono::seconds kDeadlinePerSendAttempt(60);
// Maps a ShippingManager::Status to a cobalt::Status.
cobalt::Status ToCobaltStatus(ShippingManager::Status s) {
switch (s) {
case ShippingManager::kOk:
return Status::OK;
case ShippingManager::kObservationTooBig:
return Status::OBSERVATION_TOO_BIG;
case ShippingManager::kFull:
return Status::TEMPORARILY_FULL;
case ShippingManager::kShutDown:
case ShippingManager::kEncryptionFailed:
return Status::INTERNAL_ERROR;
}
}
//////////////////////////////////////////////////////////
// class CobaltEncoderImpl
///////////////////////////////////////////////////////////
class CobaltEncoderImpl : public CobaltEncoder {
public:
// Does not take ownership of |shipping_manager|.
CobaltEncoderImpl(std::unique_ptr<ProjectContext> project_context,
ClientSecret client_secret,
ShippingManager* shipping_manager);
private:
void AddStringObservation(uint32_t metric_id,
uint32_t encoding_id,
const fidl::String& observation,
const AddStringObservationCallback& callback);
void AddIndexObservation(uint32_t metric_id,
uint32_t encoding_id,
uint32_t index,
const AddIndexObservationCallback& callback);
void SendObservations(const SendObservationsCallback& callback);
Encoder encoder_;
ShippingManager* shipping_manager_; // not owned
FXL_DISALLOW_COPY_AND_ASSIGN(CobaltEncoderImpl);
};
CobaltEncoderImpl::CobaltEncoderImpl(
std::unique_ptr<ProjectContext> project_context,
ClientSecret client_secret,
ShippingManager* shipping_manager)
: encoder_(std::move(project_context), std::move(client_secret)),
shipping_manager_(shipping_manager) {}
void CobaltEncoderImpl::AddStringObservation(
uint32_t metric_id,
uint32_t encoding_id,
const fidl::String& observation,
const AddStringObservationCallback& callback) {
auto result = encoder_.EncodeString(metric_id, encoding_id, observation);
switch (result.status) {
case Encoder::kOK:
break;
case Encoder::kInvalidArguments:
callback(Status::INVALID_ARGUMENTS);
return;
case Encoder::kInvalidConfig:
case Encoder::kEncodingFailed:
callback(Status::INTERNAL_ERROR);
FXL_LOG(WARNING) << "Cobalt internal error: " << result.status;
return;
}
Status status = ToCobaltStatus(shipping_manager_->AddObservation(
*result.observation, std::move(result.metadata)));
callback(status);
}
void CobaltEncoderImpl::AddIndexObservation(
uint32_t metric_id,
uint32_t encoding_id,
uint32_t index,
const AddIndexObservationCallback& callback) {
auto result = encoder_.EncodeIndex(metric_id, encoding_id, index);
switch (result.status) {
case Encoder::kOK:
break;
case Encoder::kInvalidArguments:
callback(Status::INVALID_ARGUMENTS);
return;
case Encoder::kInvalidConfig:
case Encoder::kEncodingFailed:
callback(Status::INTERNAL_ERROR);
FXL_LOG(WARNING) << "Cobalt internal error: " << result.status;
return;
}
Status status = ToCobaltStatus(shipping_manager_->AddObservation(
*result.observation, std::move(result.metadata)));
callback(status);
}
void CobaltEncoderImpl::SendObservations(
const SendObservationsCallback& callback) {
callback(Status::OK);
}
///////////////////////////////////////////////////////////
// class CobaltControllerImpl
///////////////////////////////////////////////////////////
class CobaltControllerImpl : public CobaltController {
public:
// Does not take ownerhsip of |shipping_manager|.
CobaltControllerImpl(fxl::RefPtr<fxl::TaskRunner> task_runner,
ShippingManager* shipping_manager);
private:
void RequestSendSoon(const RequestSendSoonCallback& callback);
void BlockUntilEmpty(uint32_t max_wait_seconds,
const BlockUntilEmptyCallback& callback);
void NumSendAttempts(const NumSendAttemptsCallback& callback);
void FailedSendAttempts(const FailedSendAttemptsCallback& callback);
fxl::RefPtr<fxl::TaskRunner> task_runner_;
ShippingManager* shipping_manager_; // not owned
FXL_DISALLOW_COPY_AND_ASSIGN(CobaltControllerImpl);
};
CobaltControllerImpl::CobaltControllerImpl(
fxl::RefPtr<fxl::TaskRunner> task_runner,
ShippingManager* shipping_manager)
: task_runner_(std::move(task_runner)),
shipping_manager_(shipping_manager) {}
void CobaltControllerImpl::RequestSendSoon(
const RequestSendSoonCallback& callback) {
// callback_adapter invokes |callback| on the main thread.
std::function<void(bool)> callback_adapter = [this, callback](bool success) {
task_runner_->PostTask([callback, success]() { callback(success); });
};
shipping_manager_->RequestSendSoon(callback_adapter);
}
void CobaltControllerImpl::BlockUntilEmpty(
uint32_t max_wait_seconds,
const BlockUntilEmptyCallback& callback) {
shipping_manager_->WaitUntilIdle(std::chrono::seconds(max_wait_seconds));
callback();
}
void CobaltControllerImpl::NumSendAttempts(
const NumSendAttemptsCallback& callback) {
callback(shipping_manager_->num_send_attempts());
}
void CobaltControllerImpl::FailedSendAttempts(
const FailedSendAttemptsCallback& callback) {
callback(shipping_manager_->num_failed_attempts());
}
///////////////////////////////////////////////////////////
// class CobaltEncoderFactoryImpl
///////////////////////////////////////////////////////////
class CobaltEncoderFactoryImpl : public CobaltEncoderFactory {
public:
// Does not take ownerhsip of |shipping_manager|.
CobaltEncoderFactoryImpl(std::shared_ptr<MetricRegistry> metric_registry,
std::shared_ptr<EncodingRegistry> encoding_registry,
ClientSecret client_secret,
ShippingManager* shipping_manager);
private:
void GetEncoder(int32_t project_id,
fidl::InterfaceRequest<CobaltEncoder> request);
std::shared_ptr<MetricRegistry> metric_registry_;
std::shared_ptr<EncodingRegistry> encoding_registry_;
ClientSecret client_secret_;
fidl::BindingSet<CobaltEncoder, std::unique_ptr<CobaltEncoder>>
cobalt_encoder_bindings_;
ShippingManager* shipping_manager_; // not owned
FXL_DISALLOW_COPY_AND_ASSIGN(CobaltEncoderFactoryImpl);
};
CobaltEncoderFactoryImpl::CobaltEncoderFactoryImpl(
std::shared_ptr<MetricRegistry> metric_registry,
std::shared_ptr<EncodingRegistry> encoding_registry,
ClientSecret client_secret,
ShippingManager* shipping_manager)
: metric_registry_(metric_registry),
encoding_registry_(encoding_registry),
client_secret_(std::move(client_secret)),
shipping_manager_(shipping_manager) {}
void CobaltEncoderFactoryImpl::GetEncoder(
int32_t project_id,
fidl::InterfaceRequest<CobaltEncoder> request) {
std::unique_ptr<ProjectContext> project_context(new ProjectContext(
kFuchsiaCustomerId, project_id, metric_registry_, encoding_registry_));
std::unique_ptr<CobaltEncoderImpl> cobalt_encoder_impl(new CobaltEncoderImpl(
std::move(project_context), client_secret_, shipping_manager_));
cobalt_encoder_bindings_.AddBinding(std::move(cobalt_encoder_impl),
std::move(request));
}
///////////////////////////////////////////////////////////
// class CobaltApp
///////////////////////////////////////////////////////////
class CobaltApp {
public:
CobaltApp(fxl::RefPtr<fxl::TaskRunner> task_runner,
std::chrono::seconds schedule_interval,
std::chrono::seconds min_interval);
private:
static ClientSecret getClientSecret();
SystemData system_data_;
std::unique_ptr<app::ApplicationContext> context_;
ShufflerClient shuffler_client_;
SendRetryer send_retryer_;
ShippingManager shipping_manager_;
std::shared_ptr<MetricRegistry> metric_registry_;
std::shared_ptr<EncodingRegistry> encoding_registry_;
std::unique_ptr<CobaltController> controller_impl_;
fidl::BindingSet<CobaltController> controller_bindings_;
std::unique_ptr<CobaltEncoderFactory> factory_impl_;
fidl::BindingSet<CobaltEncoderFactory> factory_bindings_;
FXL_DISALLOW_COPY_AND_ASSIGN(CobaltApp);
};
CobaltApp::CobaltApp(fxl::RefPtr<fxl::TaskRunner> task_runner,
std::chrono::seconds schedule_interval,
std::chrono::seconds min_interval)
: context_(app::ApplicationContext::CreateFromStartupInfo()),
shuffler_client_(kCloudShufflerUri, true),
send_retryer_(&shuffler_client_),
shipping_manager_(
ShippingManager::SizeParams(cobalt::kMaxBytesPerObservation,
kMaxBytesPerEnvelope,
kMaxBytesTotal,
kMinEnvelopeSendSize),
ShippingManager::ScheduleParams(schedule_interval, min_interval),
// TODO(rudominer): Enable encryption.
ShippingManager::EnvelopeMakerParams(&system_data_,
"",
EncryptedMessage::NONE,
"",
EncryptedMessage::NONE),
ShippingManager::SendRetryerParams(kInitialRpcDeadline,
kDeadlinePerSendAttempt),
&send_retryer_),
controller_impl_(new CobaltControllerImpl(std::move(task_runner),
&shipping_manager_)) {
shipping_manager_.Start();
// Parse the metric config string
auto metric_parse_result =
MetricRegistry::FromString(cobalt::kMetricConfigText, nullptr);
// TODO(rudominer) Checkfailing is probably not the right thing to do.
FXL_CHECK(cobalt::config::kOK == metric_parse_result.second);
metric_registry_.reset(metric_parse_result.first.release());
// Parse the encoding config string
auto encoding_parse_result =
EncodingRegistry::FromString(cobalt::kEncodingConfigText, nullptr);
FXL_CHECK(cobalt::config::kOK == encoding_parse_result.second);
encoding_registry_.reset(encoding_parse_result.first.release());
factory_impl_.reset(
new CobaltEncoderFactoryImpl(metric_registry_, encoding_registry_,
getClientSecret(), &shipping_manager_));
context_->outgoing_services()->AddService<CobaltEncoderFactory>(
[this](fidl::InterfaceRequest<CobaltEncoderFactory> request) {
factory_bindings_.AddBinding(factory_impl_.get(), std::move(request));
});
context_->outgoing_services()->AddService<CobaltController>(
[this](fidl::InterfaceRequest<CobaltController> request) {
controller_bindings_.AddBinding(controller_impl_.get(),
std::move(request));
});
}
ClientSecret CobaltApp::getClientSecret() {
// TODO(rudominer): Generate a client secret only once, store it
// persistently and reuse it in future instances.
return ClientSecret::GenerateNewSecret();
}
} // namespace
int main(int argc, const char** argv) {
setenv(
"GRPC_DEFAULT_SSL_ROOTS_FILE_PATH", "/system/data/boringssl/cert.pem", 1);
// Parse the flags.
const auto command_line = fxl::CommandLineFromArgcArgv(argc, argv);
fxl::SetLogSettingsFromCommandLine(command_line);
// Parse the schedule_interval_seconds flag.
std::chrono::seconds schedule_interval = kScheduleIntervalDefault;
std::string flag_value;
if (command_line.GetOptionValue(kScheduleIntervalSecondsFlagName,
&flag_value)) {
int num_seconds = std::stoi(flag_value);
if (num_seconds > 0) {
schedule_interval = std::chrono::seconds(num_seconds);
}
}
// Parse the min_interval_seconds flag.
std::chrono::seconds min_interval = kMinIntervalDefault;
flag_value.clear();
if (command_line.GetOptionValue(kMinIntervalSecondsFlagName, &flag_value)) {
int num_seconds = std::stoi(flag_value);
// We allow min_interval = 0.
if (num_seconds >= 0) {
min_interval = std::chrono::seconds(num_seconds);
}
}
FXL_LOG(INFO) << "Cobalt client schedule params: schedule_interval="
<< schedule_interval.count()
<< " seconds, min_interval=" << min_interval.count()
<< " seconds.";
fsl::MessageLoop loop;
CobaltApp app(loop.task_runner(), schedule_interval, min_interval);
loop.Run();
return 0;
}