Remove the ability to write to multiple backend envoironments.
This depends on fxr/354572 which removes the use of this functionality
from Fuchsia.
Bug: 37134
Change-Id: Icab51c8ab2364640536ef77c74fba55a2d4e05c8
diff --git a/src/public/cobalt_config.h b/src/public/cobalt_config.h
index 6bd4d6d..09bfb81 100644
--- a/src/public/cobalt_config.h
+++ b/src/public/cobalt_config.h
@@ -65,21 +65,6 @@
~LocalPipeline() override = default;
};
-class ExtraPipeline : public TargetPipelineInterface {
- public:
- ExtraPipeline(system_data::Environment environment, std::string shuffler_encryption_key)
- : TargetPipelineInterface(environment),
- shuffler_encryption_key_(std::move(shuffler_encryption_key)) {}
- ~ExtraPipeline() override = default;
-
- [[nodiscard]] std::optional<std::string> shuffler_encryption_key() const override {
- return shuffler_encryption_key_;
- }
-
- private:
- std::string shuffler_encryption_key_;
-};
-
class TargetPipeline : public TargetPipelineInterface {
public:
TargetPipeline(system_data::Environment environment, std::string shuffler_encryption_key,
@@ -192,9 +177,6 @@
// |target_pipeline|: Used to determine where to send observations, and how to encrypt them.
std::unique_ptr<TargetPipelineInterface> target_pipeline;
- // |extra_pipelines|: A list of extra pipelines where observations should be sent.
- std::vector<std::unique_ptr<TargetPipelineInterface>> extra_pipelines;
-
// |local_shipping_manager_path|: If |environments| is equal to {LOCAL}, the observations will be
// written to this path, instead of being shipped to clearcut.
std::string local_shipping_manager_path;
diff --git a/src/public/cobalt_service.cc b/src/public/cobalt_service.cc
index 43fbd17..8c4c1c7 100644
--- a/src/public/cobalt_service.cc
+++ b/src/public/cobalt_service.cc
@@ -6,7 +6,6 @@
#include <memory>
-#include "src/lib/clearcut/http_client.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/encrypted_message_util.h"
#include "src/logger/project_context.h"
@@ -59,44 +58,21 @@
return util::EncryptedMessageMaker::MakeUnencrypted();
}
-std::vector<std::unique_ptr<util::EncryptedMessageMaker>> GetExtraEncryptToShufflers(
- const std::vector<std::unique_ptr<TargetPipelineInterface>> &pipelines) {
- std::vector<std::unique_ptr<util::EncryptedMessageMaker>> retval;
-
- retval.reserve(pipelines.size());
- for (const auto &pipeline : pipelines) {
- retval.emplace_back(GetEncryptToShuffler(pipeline.get()));
- }
-
- return retval;
-}
-
std::unique_ptr<uploader::ShippingManager> NewShippingManager(
CobaltConfig *cfg, util::FileSystem *fs, observation_store::ObservationStore *observation_store,
util::EncryptedMessageMaker *encrypt_to_analyzer,
- const std::unique_ptr<util::EncryptedMessageMaker> &encrypt_to_shuffler,
- const std::vector<std::unique_ptr<util::EncryptedMessageMaker>> &extra_encrypt_to_shufflers) {
+ const std::unique_ptr<util::EncryptedMessageMaker> &encrypt_to_shuffler) {
if (cfg->target_pipeline->environment() == system_data::Environment::LOCAL) {
- CHECK(cfg->extra_pipelines.empty())
- << "Only one backend environment is supported if one is LOCAL.";
return std::make_unique<uploader::LocalShippingManager>(observation_store,
cfg->local_shipping_manager_path, fs);
}
auto shipping_manager = std::make_unique<uploader::ClearcutV1ShippingManager>(
uploader::UploadScheduler(cfg->target_interval, cfg->min_interval, cfg->initial_interval),
- observation_store, encrypt_to_analyzer,
+ observation_store, encrypt_to_shuffler.get(), encrypt_to_analyzer,
std::make_unique<lib::clearcut::ClearcutUploader>(cfg->target_pipeline->clearcut_endpoint(),
cfg->target_pipeline->TakeHttpClient()),
- nullptr, cfg->target_pipeline->clearcut_max_retries(), cfg->api_key);
-
- shipping_manager->AddClearcutDestination(
- encrypt_to_shuffler.get(),
- system_data::ConfigurationData(cfg->target_pipeline->environment()).GetLogSourceId());
- for (int i = 0; i < cfg->extra_pipelines.size(); i++) {
- shipping_manager->AddClearcutDestination(
- extra_encrypt_to_shufflers[i].get(),
- system_data::ConfigurationData(cfg->extra_pipelines[i]->environment()).GetLogSourceId());
- }
+ system_data::ConfigurationData(cfg->target_pipeline->environment()).GetLogSourceId(), nullptr,
+ cfg->target_pipeline->clearcut_max_retries(), cfg->api_key);
return std::move(shipping_manager);
}
@@ -109,10 +85,8 @@
observation_store_(NewObservationStore(cfg, fs_.get())),
encrypt_to_analyzer_(GetEncryptToAnalyzer(&cfg)),
encrypt_to_shuffler_(GetEncryptToShuffler(cfg.target_pipeline.get())),
- extra_encrypt_to_shufflers_(GetExtraEncryptToShufflers(cfg.extra_pipelines)),
shipping_manager_(NewShippingManager(&cfg, fs_.get(), observation_store_.get(),
- encrypt_to_analyzer_.get(), encrypt_to_shuffler_,
- extra_encrypt_to_shufflers_)),
+ encrypt_to_analyzer_.get(), encrypt_to_shuffler_)),
logger_encoder_(cfg.client_secret, &system_data_),
observation_writer_(observation_store_.get(), shipping_manager_.get(),
encrypt_to_analyzer_.get()),
diff --git a/src/public/cobalt_service.h b/src/public/cobalt_service.h
index 2014832..5b49062 100644
--- a/src/public/cobalt_service.h
+++ b/src/public/cobalt_service.h
@@ -112,7 +112,6 @@
std::unique_ptr<observation_store::ObservationStore> observation_store_;
std::unique_ptr<util::EncryptedMessageMaker> encrypt_to_analyzer_;
std::unique_ptr<util::EncryptedMessageMaker> encrypt_to_shuffler_;
- std::vector<std::unique_ptr<util::EncryptedMessageMaker>> extra_encrypt_to_shufflers_;
std::unique_ptr<uploader::ShippingManager> shipping_manager_;
logger::Encoder logger_encoder_;
logger::ObservationWriter observation_writer_;
diff --git a/src/uploader/shipping_manager.cc b/src/uploader/shipping_manager.cc
index 62cca21..79e1596 100644
--- a/src/uploader/shipping_manager.cc
+++ b/src/uploader/shipping_manager.cc
@@ -304,27 +304,13 @@
util::EncryptedMessageMaker* encrypt_to_analyzer,
std::unique_ptr<lib::clearcut::ClearcutUploader> clearcut, int32_t log_source_id,
logger::LoggerInterface* internal_logger, size_t max_attempts_per_upload, std::string api_key)
- : ClearcutV1ShippingManager(upload_scheduler, observation_store, encrypt_to_analyzer,
- std::move(clearcut), internal_logger, max_attempts_per_upload,
- std::move(api_key)) {
- AddClearcutDestination(encrypt_to_shuffler, log_source_id);
-}
-
-ClearcutV1ShippingManager::ClearcutV1ShippingManager(
- const UploadScheduler& upload_scheduler, ObservationStore* observation_store,
- util::EncryptedMessageMaker* encrypt_to_analyzer,
- std::unique_ptr<lib::clearcut::ClearcutUploader> clearcut,
- logger::LoggerInterface* internal_logger, size_t max_attempts_per_upload, std::string api_key)
: ShippingManager(upload_scheduler, observation_store, encrypt_to_analyzer),
max_attempts_per_upload_(max_attempts_per_upload),
clearcut_(std::move(clearcut)),
internal_metrics_(logger::InternalMetrics::NewWithLogger(internal_logger)),
- api_key_(std::move(api_key)) {}
-
-void ClearcutV1ShippingManager::AddClearcutDestination(
- util::EncryptedMessageMaker* encrypt_to_shuffler, int32_t log_source_id) {
- clearcut_destinations_.emplace_back(ClearcutDestination({encrypt_to_shuffler, log_source_id}));
-}
+ api_key_(std::move(api_key)),
+ encrypt_to_shuffler_(encrypt_to_shuffler),
+ log_source_id_(log_source_id) {}
void ClearcutV1ShippingManager::ResetInternalMetrics(logger::LoggerInterface* internal_logger) {
internal_metrics_ = logger::InternalMetrics::NewWithLogger(internal_logger);
@@ -336,29 +322,21 @@
auto envelope = envelope_to_send->GetEnvelope(encrypt_to_analyzer_);
envelope.set_api_key(api_key_);
- bool error_occurred = false;
- for (const auto& clearcut_destination : clearcut_destinations_) {
- util::Status status =
- SendEnvelopeToClearcutDestination(envelope, envelope_to_send->Size(), clearcut_destination);
- if (!status.ok()) {
- error_occurred = true;
- VLOG(4) << name() << ": Cobalt send to Shuffler failed: (" << status.error_code() << ") "
- << status.error_message() << ". Observations have been re-enqueued for later.";
- }
- }
- if (error_occurred) {
+ util::Status status = SendEnvelopeToClearcutDestination(envelope, envelope_to_send->Size());
+ if (!status.ok()) {
+ VLOG(4) << name() << ": Cobalt send to Shuffler failed: (" << status.error_code() << ") "
+ << status.error_message() << ". Observations have been re-enqueued for later.";
return envelope_to_send;
}
return nullptr;
}
-util::Status ClearcutV1ShippingManager::SendEnvelopeToClearcutDestination(
- const Envelope& envelope, size_t envelope_size,
- const ClearcutDestination& clearcut_destination) {
+util::Status ClearcutV1ShippingManager::SendEnvelopeToClearcutDestination(const Envelope& envelope,
+ size_t envelope_size) {
auto log_extension = std::make_unique<LogEventExtension>();
- if (!clearcut_destination.encrypt_to_shuffler_->Encrypt(
- envelope, log_extension->mutable_cobalt_encrypted_envelope())) {
+ if (!encrypt_to_shuffler_->Encrypt(envelope,
+ log_extension->mutable_cobalt_encrypted_envelope())) {
// TODO(rudominer) log
// Drop on floor.
return util::Status::OK;
@@ -375,7 +353,7 @@
<< " bytes to clearcut.";
lib::clearcut::LogRequest request;
- request.set_log_source(clearcut_destination.log_source_id_);
+ request.set_log_source(log_source_id_);
request.add_log_event()->SetAllocatedExtension(LogEventExtension::ext, log_extension.release());
util::Status status;
diff --git a/src/uploader/shipping_manager.h b/src/uploader/shipping_manager.h
index c7732ae..2e31291 100644
--- a/src/uploader/shipping_manager.h
+++ b/src/uploader/shipping_manager.h
@@ -245,35 +245,10 @@
size_t max_attempts_per_upload = lib::clearcut::kMaxRetries,
std::string api_key = "cobalt-default-api-key");
- // Create a shipping manager that can upload data to Clearcut.
- //
- // Call AddClearcutDestination to add Clearcut log sources to write to and the encryption key to
- // use when doing so.
- //
- // TODO(camrdale): remove this once the log source transition is complete.
- ClearcutV1ShippingManager(const UploadScheduler& upload_scheduler,
- observation_store::ObservationStore* observation_store,
- util::EncryptedMessageMaker* encrypt_to_analyzer,
- std::unique_ptr<lib::clearcut::ClearcutUploader> clearcut,
- logger::LoggerInterface* internal_logger = nullptr,
- size_t max_attempts_per_upload = lib::clearcut::kMaxRetries,
- std::string api_key = "cobalt-default-api-key");
-
// The destructor will stop the worker thread and wait for it to stop
// before exiting.
~ClearcutV1ShippingManager() override = default;
- // Add a Clearcut log source to write to, and the encryption key to use when doing so.
- //
- // encrypt_to_shuffler: An EncryptedMessageMaker used to encrypt
- // Envelopes to the shuffler.
- //
- // log_source_id: the Clearcut log source to write to.
- //
- // TODO(camrdale): remove this once the log source transition is complete.
- void AddClearcutDestination(util::EncryptedMessageMaker* encrypt_to_shuffler,
- int32_t log_source_id);
-
// Resets the internal metrics for the ShippingManager and the ClearcutUploader to use the
// provided logger.
void ResetInternalMetrics(logger::LoggerInterface* internal_logger) override;
@@ -283,13 +258,7 @@
std::unique_ptr<observation_store::ObservationStore::EnvelopeHolder> envelope_to_send)
override;
- struct ClearcutDestination {
- util::EncryptedMessageMaker* encrypt_to_shuffler_;
- const int32_t log_source_id_;
- };
-
- util::Status SendEnvelopeToClearcutDestination(const Envelope& envelope, size_t envelope_size,
- const ClearcutDestination& clearcut_destination);
+ util::Status SendEnvelopeToClearcutDestination(const Envelope& envelope, size_t envelope_size);
[[nodiscard]] std::string name() const override { return "ClearcutV1ShippingManager"; }
@@ -299,7 +268,8 @@
std::unique_ptr<lib::clearcut::ClearcutUploader> clearcut_;
std::unique_ptr<logger::InternalMetrics> internal_metrics_;
const std::string api_key_;
- std::vector<ClearcutDestination> clearcut_destinations_;
+ util::EncryptedMessageMaker* encrypt_to_shuffler_;
+ const int32_t log_source_id_;
};
// A concrete subclass of ShippingManager for capturing data locally to a file.