| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/public/cobalt_service.h" |
| |
| #include <memory> |
| |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/encrypted_message_util.h" |
| #include "src/lib/util/not_null.h" |
| #include "src/lib/util/posix_file_system.h" |
| #include "src/logger/internal_metrics_config.cb.h" |
| #include "src/logger/project_context.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/observation_store/file_observation_store.h" |
| #include "src/observation_store/memory_observation_store.h" |
| #include "src/observation_store/observation_store.h" |
| #include "src/public/lib/clock_interfaces.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/statusor/statusor.h" |
| #include "src/system_data/configuration_data.h" |
| #include "src/uploader/shipping_manager.h" |
| |
| namespace cobalt { |
| |
| namespace { |
| |
| util::NotNullUniquePtr<observation_store::ObservationStore> NewObservationStore( |
| const CobaltConfig &cfg, util::FileSystem &fs, DiagnosticsInterface *diagnostics) { |
| if (cfg.use_memory_observation_store) { |
| return util::MakeNotNullUniquePtr<observation_store::MemoryObservationStore>( |
| cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total); |
| } |
| return util::MakeNotNullUniquePtr<observation_store::FileObservationStore>( |
| cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total, fs, diagnostics, |
| cfg.observation_store_directory, "V1 FileObservationStore"); |
| } |
| |
| util::NotNullUniquePtr<util::EncryptedMessageMaker> GetEncryptToAnalyzer(CobaltConfig *cfg) { |
| if (auto key = cfg->target_pipeline->analyzer_encryption_key()) { |
| return util::EncryptedMessageMaker::MakeForObservations(*key).value(); |
| } |
| |
| return util::EncryptedMessageMaker::MakeUnencrypted(); |
| } |
| |
| util::NotNullUniquePtr<util::EncryptedMessageMaker> GetEncryptToShuffler( |
| TargetPipelineInterface *pipeline) { |
| if (auto key = pipeline->shuffler_encryption_key()) { |
| return util::EncryptedMessageMaker::MakeForEnvelopes(*key).value(); |
| } |
| |
| return util::EncryptedMessageMaker::MakeUnencrypted(); |
| } |
| |
| util::NotNullUniquePtr<uploader::ShippingManager> NewShippingManager( |
| CobaltConfig *cfg, util::FileSystem &fs, observation_store::ObservationStore &observation_store, |
| DiagnosticsInterface *diagnostics, util::EncryptedMessageMaker *encrypt_to_analyzer, |
| const util::PinnedUniquePtr<util::EncryptedMessageMaker> &encrypt_to_shuffler, |
| const uploader::UploadScheduler &upload_scheduler) { |
| if (cfg->target_pipeline->environment() == system_data::Environment::LOCAL) { |
| return util::MakeNotNullUniquePtr<uploader::LocalShippingManager>( |
| observation_store, encrypt_to_analyzer, cfg->local_shipping_manager_path, fs); |
| } |
| |
| std::unique_ptr<lib::clearcut::ClearcutUploaderInterface> clearcut_uploader = |
| cfg->target_pipeline->TakeClearcutUploader(); |
| if (!clearcut_uploader) { |
| clearcut_uploader = std::make_unique<lib::clearcut::ClearcutUploader>( |
| cfg->target_pipeline->clearcut_endpoint(), |
| util::WrapNotNullUniquePtrOrDefault<lib::EmptyHTTPClient>( |
| cfg->target_pipeline->TakeHttpClient())); |
| } |
| |
| return {util::MakeNotNullUniquePtr<uploader::ClearcutV1ShippingManager>( |
| upload_scheduler, observation_store, encrypt_to_shuffler.get(), encrypt_to_analyzer, |
| std::move(clearcut_uploader), diagnostics, |
| system_data::ConfigurationData(cfg->target_pipeline->environment()).GetLogSourceId(), |
| cfg->target_pipeline->clearcut_max_retries(), cfg->api_key)}; |
| } |
| |
| } // namespace |
| |
| lib::statusor::StatusOr<std::unique_ptr<CobaltService>> CobaltService::Create(CobaltConfig cfg) { |
| lib::statusor::StatusOr<util::NotNullUniquePtr<CobaltRegistry>> global_registry = |
| util::WrapNotNullUniquePtr(std::move(cfg.global_registry)); |
| if (!global_registry.ok()) { |
| return util::StatusBuilder(global_registry.status()) |
| .SetCode(StatusCode::INVALID_ARGUMENT) |
| .AppendMsg(": CobaltService requires a non-null global_registry") |
| .Build(); |
| } |
| |
| CB_ASSIGN_OR_RETURN(uploader::UploadScheduler upload_scheduler, |
| uploader::UploadScheduler::Create(cfg.upload_schedule_cfg)); |
| logger::ProjectContextFactory global_project_context_factory(std::move(global_registry.value())); |
| |
| return std::unique_ptr<CobaltService>(new CobaltService( |
| std::move(cfg), std::move(global_project_context_factory), upload_scheduler)); |
| } |
| |
| lib::statusor::StatusOr<std::unique_ptr<ClocklessCobaltServiceInterface>> |
| CobaltService::CreateClockless(CobaltConfig cfg) { |
| return Create(std::move(cfg)); |
| } |
| |
| void CobaltService::CleanUpLegacyFileOrDirectory(const std::string &file_or_directory) { |
| if (!file_or_directory.empty() && fs_->FileExists(file_or_directory)) { |
| if (!fs_->DeleteRecursive(file_or_directory)) { |
| LOG(ERROR) << "Unable to clean up legacy path " << file_or_directory; |
| } |
| } |
| } |
| |
| CobaltService::CobaltService(CobaltConfig cfg, |
| logger::ProjectContextFactory &&global_project_context_factory, |
| const uploader::UploadScheduler &upload_scheduler) |
| : fs_(util::WrapNotNullUniquePtrOrDefault<util::PosixFileSystem>(std::move(cfg.file_system))), |
| system_data_(cfg.product_name, cfg.board_name_suggestion, cfg.release_stage, cfg.version, |
| cfg.build_type, cfg.experiment_ids), |
| global_project_context_factory_(std::move(global_project_context_factory)), |
| diagnostics_(std::move(cfg.diagnostics)), |
| observation_store_(NewObservationStore(cfg, *fs_, diagnostics_.get())), |
| encrypt_to_analyzer_(GetEncryptToAnalyzer(&cfg)), |
| encrypt_to_shuffler_(GetEncryptToShuffler(cfg.target_pipeline.get())), |
| shipping_manager_(NewShippingManager(&cfg, *fs_, *observation_store_, diagnostics_.get(), |
| encrypt_to_analyzer_.get(), encrypt_to_shuffler_, |
| upload_scheduler)), |
| observation_writer_(*observation_store_, shipping_manager_.get(), encrypt_to_analyzer_.get()), |
| civil_time_converter_(util::WrapNotNullUniquePtrOrDefault<util::UtcTimeConverter>( |
| std::move(cfg.civil_time_converter))), |
| local_aggregation_(cfg, global_project_context_factory_, system_data_, *fs_, |
| observation_writer_, *civil_time_converter_), |
| undated_event_manager_(new logger::UndatedEventManager( |
| local_aggregation_, observation_writer_, system_data_, *civil_time_converter_)), |
| validated_clock_(cfg.validated_clock), |
| internal_logger_( |
| NewLogger(cobalt::logger::kCustomerId, cobalt::logger::kProjectId, false, {})), |
| internal_metrics_( |
| logger::InternalMetrics::NewWithLogger(internal_logger_.get(), diagnostics_.get())), |
| start_worker_threads_(cfg.start_worker_threads) { |
| CleanUpLegacyFileOrDirectory(cfg.local_aggregate_proto_store_path); |
| CleanUpLegacyFileOrDirectory(cfg.obs_history_proto_store_path); |
| CleanUpLegacyFileOrDirectory(cfg.system_data_cache_path); |
| |
| if (!internal_logger_) { |
| LOG(ERROR) << "The global_registry provided does not include the expected internal metrics " |
| "project. Cobalt-measuring-cobalt will be disabled."; |
| } else { |
| // Set up internal metrics for various components. |
| shipping_manager_->ResetInternalMetrics(internal_metrics_.get()); |
| observation_store_->ResetInternalMetrics(internal_metrics_.get()); |
| system_data_.ResetInternalMetrics(internal_metrics_.get()); |
| local_aggregation_.ResetInternalMetrics(internal_metrics_.get()); |
| } |
| if (start_worker_threads_) { |
| shipping_manager_->Start(); |
| } |
| } |
| |
| std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger( |
| std::unique_ptr<logger::ProjectContext> project_context) { |
| return NewLogger(std::move(project_context), true, {}); |
| } |
| |
| std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger(uint32_t customer_id, |
| uint32_t project_id) { |
| return NewLogger(customer_id, project_id, true, {}); |
| } |
| |
| std::unique_ptr<logger::AtTimePointLoggerInterface> CobaltService::NewAtTimePointLogger( |
| uint32_t customer_id, uint32_t project_id) { |
| return NewLogger(global_project_context_factory_.NewProjectContext( |
| lib::CustomerIdentifier(customer_id).ForProject(project_id)), |
| true, {}); |
| } |
| |
| std::unique_ptr<logger::LoggerInterface> CobaltService::NewLogger( |
| uint32_t customer_id, uint32_t project_id, std::vector<uint32_t> experiment_ids) { |
| return NewLogger(customer_id, project_id, true, std::move(experiment_ids)); |
| } |
| |
| std::unique_ptr<logger::Logger> CobaltService::NewLogger(uint32_t customer_id, uint32_t project_id, |
| bool include_internal_logger, |
| std::vector<uint32_t> experiment_ids) { |
| return NewLogger(global_project_context_factory_.NewProjectContext( |
| lib::CustomerIdentifier(customer_id).ForProject(project_id)), |
| include_internal_logger, std::move(experiment_ids)); |
| } |
| |
| std::unique_ptr<logger::Logger> CobaltService::NewLogger( |
| std::unique_ptr<logger::ProjectContext> project_context_in, bool include_internal_logger, |
| std::vector<uint32_t> experiment_ids) { |
| lib::statusor::StatusOr<util::NotNullUniquePtr<logger::ProjectContext>> not_null_project_context = |
| util::WrapNotNullUniquePtr(std::move(project_context_in)); |
| |
| if (!not_null_project_context.ok()) { |
| return nullptr; |
| } |
| |
| util::NotNullUniquePtr<logger::ProjectContext> project_context = |
| std::move(not_null_project_context.value()); |
| |
| cobalt::logger::InternalMetrics *metrics = |
| include_internal_logger ? internal_metrics_.get() : nullptr; |
| if (undated_event_manager_) { |
| return std::make_unique<logger::Logger>(std::move(project_context), local_aggregation_, |
| observation_writer_, system_data_, validated_clock_, |
| *civil_time_converter_, undated_event_manager_, |
| std::move(experiment_ids), metrics); |
| } |
| return std::make_unique<logger::Logger>(std::move(project_context), local_aggregation_, |
| observation_writer_, system_data_, *civil_time_converter_, |
| std::move(experiment_ids), metrics); |
| } |
| |
| void CobaltService::SystemClockIsAccurate(std::unique_ptr<util::SystemClockInterface> system_clock, |
| bool start_event_aggregator_worker) { |
| if (undated_event_manager_) { |
| undated_event_manager_->Flush(system_clock.get(), internal_metrics_.get()); |
| undated_event_manager_.reset(); |
| } |
| |
| if (start_event_aggregator_worker && start_worker_threads_) { |
| local_aggregation_.Start(std::make_unique<util::SystemClockRef>(system_clock.get())); |
| } |
| |
| system_clock_ = std::move(system_clock); |
| } |
| |
| void CobaltService::SetDataCollectionPolicy(CobaltService::DataCollectionPolicy policy) { |
| switch (policy) { |
| case CobaltService::DataCollectionPolicy::COLLECT_AND_UPLOAD: |
| LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy COLLECT_AND_UPLOAD"; |
| observation_store_->Disable(false); |
| local_aggregation_.Disable(false); |
| shipping_manager_->Disable(false); |
| break; |
| case CobaltService::DataCollectionPolicy::DO_NOT_UPLOAD: |
| LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy DO_NOT_UPLOAD"; |
| observation_store_->Disable(false); |
| local_aggregation_.Disable(false); |
| shipping_manager_->Disable(true); |
| break; |
| case CobaltService::DataCollectionPolicy::DO_NOT_COLLECT: |
| LOG(INFO) << "CobaltService: Switch to DataCollectionPolicy DO_NOT_COLLECT"; |
| observation_store_->Disable(true); |
| observation_store_->DeleteData(); |
| |
| local_aggregation_.Disable(true); |
| local_aggregation_.DeleteData(); |
| |
| shipping_manager_->Disable(true); |
| break; |
| } |
| } |
| |
| Status CobaltService::GenerateAggregatedObservations() { |
| std::chrono::system_clock::time_point now = system_clock_->now(); |
| |
| Status status = local_aggregation_.GenerateAggregatedObservations(now); |
| if (!status.ok()) { |
| LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: " |
| << status.error_message(); |
| return status; |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status CobaltService::GenerateAggregatedObservations(uint32_t final_day_index_utc) { |
| // In order to generate observations for day indices less than or equal to `final_day_index_utc`, |
| // the time passed to `GenerateAggregatedObservations` must belong to the following day. Use the |
| // first hour of the day after `final_day_index_utc` to generate observations. |
| Status status = local_aggregation_.GenerateAggregatedObservations(util::FromUnixSeconds( |
| util::HourIdToUnixSeconds(util::DayIndexToHourIdNoDst(final_day_index_utc + 1)))); |
| |
| if (!status.ok()) { |
| LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: " << status; |
| return status; |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status CobaltService::GenerateAggregatedObservations( |
| const std::chrono::system_clock::time_point ×tamp) { |
| Status status = local_aggregation_.GenerateAggregatedObservations(timestamp); |
| if (!status.ok()) { |
| LOG(ERROR) << "Got error while trying to GenerateAggregatedObservations: " |
| << status.error_message(); |
| return status; |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| } // namespace cobalt |