blob: 8f5a69de27af4d13330e2eddba68d14fd2fa8e9a [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/public/cobalt_service.h"
#include <memory>
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/encrypted_message_util.h"
#include "src/lib/util/not_null.h"
#include "src/lib/util/posix_file_system.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/project_context.h"
#include "src/logger/project_context_factory.h"
#include "src/observation_store/file_observation_store.h"
#include "src/observation_store/memory_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/public/lib/clock_interfaces.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/system_data/configuration_data.h"
#include "src/uploader/shipping_manager.h"
namespace cobalt {
namespace {
util::NotNullUniquePtr<observation_store::ObservationStore> NewObservationStore(
const CobaltConfig &cfg, util::FileSystem &fs, DiagnosticsInterface *diagnostics) {
if (cfg.use_memory_observation_store) {
return util::MakeNotNullUniquePtr<observation_store::MemoryObservationStore>(
cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total);
}
return util::MakeNotNullUniquePtr<observation_store::FileObservationStore>(
cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total, fs, diagnostics,
cfg.observation_store_directory, "V1 FileObservationStore");
}
util::NotNullUniquePtr<util::EncryptedMessageMaker> GetEncryptToAnalyzer(CobaltConfig *cfg) {
if (auto key = cfg->target_pipeline->analyzer_encryption_key()) {
return util::EncryptedMessageMaker::MakeForObservations(*key).value();
}
return util::EncryptedMessageMaker::MakeUnencrypted();
}
util::NotNullUniquePtr<util::EncryptedMessageMaker> GetEncryptToShuffler(
TargetPipelineInterface *pipeline) {
if (auto key = pipeline->shuffler_encryption_key()) {
return util::EncryptedMessageMaker::MakeForEnvelopes(*key).value();
}
return util::EncryptedMessageMaker::MakeUnencrypted();
}
util::NotNullUniquePtr<uploader::ShippingManager> NewShippingManager(
CobaltConfig *cfg, util::FileSystem &fs, observation_store::ObservationStore &observation_store,
DiagnosticsInterface *diagnostics, util::EncryptedMessageMaker *encrypt_to_analyzer,
const util::PinnedUniquePtr<util::EncryptedMessageMaker> &encrypt_to_shuffler,
const uploader::UploadScheduler &upload_scheduler) {
if (cfg->target_pipeline->environment() == system_data::Environment::LOCAL) {
return util::MakeNotNullUniquePtr<uploader::LocalShippingManager>(
observation_store, encrypt_to_analyzer, cfg->local_shipping_manager_path, fs);
}
std::unique_ptr<lib::clearcut::ClearcutUploaderInterface> clearcut_uploader =
cfg->target_pipeline->TakeClearcutUploader();
if (!clearcut_uploader) {
clearcut_uploader = std::make_unique<lib::clearcut::ClearcutUploader>(
cfg->target_pipeline->clearcut_endpoint(),
util::WrapNotNullUniquePtrOrDefault<lib::EmptyHTTPClient>(
cfg->target_pipeline->TakeHttpClient()));
}
return {util::MakeNotNullUniquePtr<uploader::ClearcutV1ShippingManager>(
upload_scheduler, observation_store, encrypt_to_shuffler.get(), encrypt_to_analyzer,
std::move(clearcut_uploader), diagnostics,
system_data::ConfigurationData(cfg->target_pipeline->environment()).GetLogSourceId(),
cfg->target_pipeline->clearcut_max_retries(), cfg->api_key)};
}
} // namespace
lib::statusor::StatusOr<std::unique_ptr<CobaltService>> CobaltService::Create(CobaltConfig cfg) {
lib::statusor::StatusOr<util::NotNullUniquePtr<CobaltRegistry>> global_registry =
util::WrapNotNullUniquePtr(std::move(cfg.global_registry));
if (!global_registry.ok()) {
return util::StatusBuilder(global_registry.status())
.SetCode(StatusCode::INVALID_ARGUMENT)
.AppendMsg(": CobaltService requires a non-null global_registry")
.Build();
}
CB_ASSIGN_OR_RETURN(uploader::UploadScheduler upload_scheduler,
uploader::UploadScheduler::Create(cfg.upload_schedule_cfg));
logger::ProjectContextFactory global_project_context_factory(std::move(global_registry.value()));
return std::unique_ptr<CobaltService>(new CobaltService(
std::move(cfg), std::move(global_project_context_factory), upload_scheduler));
}
lib::statusor::StatusOr<std::unique_ptr<ClocklessCobaltServiceInterface>>
CobaltService::CreateClockless(CobaltConfig cfg) {
return Create(std::move(cfg));
}
void CobaltService::CleanUpLegacyFileOrDirectory(const std::string &file_or_directory) {
if (!file_or_directory.empty() && fs_->FileExists(file_or_directory)) {
if (!fs_->DeleteRecursive(file_or_directory)) {
LOG(ERROR) << "Unable to clean up legacy path " << file_or_directory;
}
}
}
CobaltService::CobaltService(CobaltConfig cfg,
logger::ProjectContextFactory &&global_project_context_factory,
const uploader::UploadScheduler &upload_scheduler)
: fs_(util::WrapNotNullUniquePtrOrDefault<util::PosixFileSystem>(std::move(cfg.file_system))),
system_data_(cfg.product_name, cfg.board_name_suggestion, cfg.release_stage, cfg.version,
cfg.build_type, cfg.experiment_ids),
global_project_context_factory_(std::move(global_project_context_factory)),
diagnostics_(std::move(cfg.diagnostics)),
observation_store_(NewObservationStore(cfg, *fs_, diagnostics_.get())),
encrypt_to_analyzer_(GetEncryptToAnalyzer(&cfg)),
encrypt_to_shuffler_(GetEncryptToShuffler(cfg.target_pipeline.get())),
shipping_manager_(NewShippingManager(&cfg, *fs_, *observation_store_, diagnostics_.get(),
encrypt_to_analyzer_.get(), encrypt_to_shuffler_,
upload_scheduler)),
observation_writer_(*observation_store_, shipping_manager_.get(), encrypt_to_analyzer_.get()),
civil_time_converter_(util::WrapNotNullUniquePtrOrDefault<util::UtcTimeConverter>(
std::move(cfg.civil_time_converter))),
local_aggregation_(cfg, global_project_context_factory_, system_data_, *fs_,
observation_writer_, *civil_time_converter_),
undated_event_manager_(new logger::UndatedEventManager(
local_aggregation_, observation_writer_, system_data_, *civil_time_converter_)),
validated_clock_(cfg.validated_clock),
internal_logger_(
NewLogger(cobalt::logger::kCustomerId, cobalt::logger::kProjectId, false, {})),
internal_metrics_(
logger::InternalMetrics::NewWithLogger(internal_logger_.get(), diagnostics_.get())),
start_worker_threads_(cfg.start_worker_threads) {
CleanUpLegacyFileOrDirectory(cfg.local_aggregate_proto_store_path);
CleanUpLegacyFileOrDirectory(cfg.obs_history_proto_store_path);
CleanUpLegacyFileOrDirectory(cfg.system_data_cache_path);
if (!internal_logger_) {
LOG(ERROR) << "The global_registry provided does not include the expected internal metrics "
"project. Cobalt-measuring-cobalt will be disabled.";
} else {
// Set up internal metrics for various components.
shipping_manager_->ResetInternalMetrics(internal_metrics_.get());
observation_store_->ResetInternalMetrics(internal_metrics_.get());
system_data_.ResetInternalMetrics(internal_metrics_.get());
local_aggregation_.ResetInternalMetrics(internal_metrics_.get());
}
if (start_worker_threads_) {
shipping_manager_->Start();
}
}
std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(
std::unique_ptr<logger::ProjectContext> project_context) {
return NewLogger(std::move(project_context), true, {});
}
std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(uint32_t customer_id,
uint32_t project_id) {
return NewLogger(customer_id, project_id, true, {});
}
std::unique_ptr<logger::AtTimePointLoggerInterface> CobaltService::NewAtTimePointLogger(
uint32_t customer_id, uint32_t project_id) {
return NewLogger(global_project_context_factory_.NewProjectContext(
lib::CustomerIdentifier(customer_id).ForProject(project_id)),
true, {});
}
std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(
uint32_t customer_id, uint32_t project_id, std::vector<uint32_t> experiment_ids) {
return NewLogger(customer_id, project_id, true, std::move(experiment_ids));
}
std::unique_ptr<logger::Logger> CobaltService::NewLogger(uint32_t customer_id, uint32_t project_id,
bool include_internal_logger,
std::vector<uint32_t> experiment_ids) {
return NewLogger(global_project_context_factory_.NewProjectContext(
lib::CustomerIdentifier(customer_id).ForProject(project_id)),
include_internal_logger, std::move(experiment_ids));
}
std::unique_ptr<logger::Logger> CobaltService::NewLogger(
std::unique_ptr<logger::ProjectContext> project_context_in, bool include_internal_logger,
std::vector<uint32_t> experiment_ids) {
lib::statusor::StatusOr<util::NotNullUniquePtr<logger::ProjectContext>> not_null_project_context =
util::WrapNotNullUniquePtr(std::move(project_context_in));
if (!not_null_project_context.ok()) {
return nullptr;
}
util::NotNullUniquePtr<logger::ProjectContext> project_context =
std::move(not_null_project_context.value());
cobalt::logger::InternalMetrics *metrics =
include_internal_logger ? internal_metrics_.get() : nullptr;
if (undated_event_manager_) {
return std::make_unique<logger::Logger>(std::move(project_context), local_aggregation_,
observation_writer_, system_data_, validated_clock_,
*civil_time_converter_, undated_event_manager_,
std::move(experiment_ids), metrics);
}
return std::make_unique<logger::Logger>(std::move(project_context), local_aggregation_,
observation_writer_, system_data_, *civil_time_converter_,
std::move(experiment_ids), metrics);
}
void CobaltService::SystemClockIsAccurate(std::unique_ptr<util::SystemClockInterface> system_clock,
bool start_event_aggregator_worker) {
if (undated_event_manager_) {
undated_event_manager_->Flush(system_clock.get(), internal_metrics_.get());
undated_event_manager_.reset();
}
if (start_event_aggregator_worker && start_worker_threads_) {
local_aggregation_.Start(std::make_unique<util::SystemClockRef>(system_clock.get()));
}
system_clock_ = std::move(system_clock);
}
void CobaltService::SetDataCollectionPolicy(CobaltService::DataCollectionPolicy policy) {
switch (policy) {
case CobaltService::DataCollectionPolicy::COLLECT_AND_UPLOAD:
LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy COLLECT_AND_UPLOAD";
observation_store_->Disable(false);
local_aggregation_.Disable(false);
shipping_manager_->Disable(false);
break;
case CobaltService::DataCollectionPolicy::DO_NOT_UPLOAD:
LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy DO_NOT_UPLOAD";
observation_store_->Disable(false);
local_aggregation_.Disable(false);
shipping_manager_->Disable(true);
break;
case CobaltService::DataCollectionPolicy::DO_NOT_COLLECT:
LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy DO_NOT_COLLECT";
observation_store_->Disable(true);
observation_store_->DeleteData();
local_aggregation_.Disable(true);
local_aggregation_.DeleteData();
shipping_manager_->Disable(true);
break;
}
}
Status CobaltService::GenerateAggregatedObservations() {
std::chrono::system_clock::time_point now = system_clock_->now();
Status status = local_aggregation_.GenerateAggregatedObservations(now);
if (!status.ok()) {
LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: "
<< status.error_message();
return status;
}
return Status::OkStatus();
}
Status CobaltService::GenerateAggregatedObservations(uint32_t final_day_index_utc) {
// In order to generate observations for day indices less than or equal to `final_day_index_utc`,
// the time passed to `GenerateAggregatedObservations` must belong to the following day. Use the
// first hour of the day after `final_day_index_utc` to generate observations.
Status status = local_aggregation_.GenerateAggregatedObservations(util::FromUnixSeconds(
util::HourIdToUnixSeconds(util::DayIndexToHourIdNoDst(final_day_index_utc + 1))));
if (!status.ok()) {
LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: " << status;
return status;
}
return Status::OkStatus();
}
Status CobaltService::GenerateAggregatedObservations(
const std::chrono::system_clock::time_point &timestamp) {
Status status = local_aggregation_.GenerateAggregatedObservations(timestamp);
if (!status.ok()) {
LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: "
<< status.error_message();
return status;
}
return Status::OkStatus();
}
} // namespace cobalt