[cobalt_service] Create CobaltService

This moves a lot of the management of the different aspects of Cobalt
core out of Fuchsia (or other clients) and in to the core repository.
All that should be necessary in the client platforms is configuring this
CobaltCore object.

This has several advantages:

1. The exposed API surface from the Cobalt core repository becomes a lot
smaller. The classes in Cobalt core that are no longer needed in Fuchsia
can have their visibility limited to just the Cobalt core code.

2. Changes to the interactions between objects in the Cobalt core
repository are easier to make, since they don't affect any external
repositories. For example, adding a field to the constructor of the
EventAggregator can be done in a single Cobalt core CL, instead of 3 CLs
(1 in core, 1 in Fuchsia, then a cleanup CL in core).

3 The Cobalt core repository becomes easier to use for new customers
(that are not Fuchsia). Instead of having to copy from Fuchsia the
complex instantiation and interaction with all the Cobalt core objects,
only the new Core object needs to be created and configured correctly
for the new customer.

Bug: 39090
Change-Id: I5f039b4deaabdcf633ad7a77a2cb2d2e1b6bd790
diff --git a/src/bin/test_app/BUILD.gn b/src/bin/test_app/BUILD.gn
index 5a557c3..38114dc 100644
--- a/src/bin/test_app/BUILD.gn
+++ b/src/bin/test_app/BUILD.gn
@@ -42,7 +42,7 @@
     "$cobalt_root/src/lib/util:posix_file_system",
     "$cobalt_root/src/lib/util:proto_serialization",
     "$cobalt_root/src/logger:project_context_factory",
-    "$cobalt_root/src/observation_store",
+    "$cobalt_root/src/public:cobalt_service",
     "$cobalt_root/src/uploader:shipping_manager",
     "//third_party/abseil-cpp",
     "//third_party/grpc:grpc++",
diff --git a/src/bin/test_app/test_app.cc b/src/bin/test_app/test_app.cc
index 9a12b14..58e739f 100644
--- a/src/bin/test_app/test_app.cc
+++ b/src/bin/test_app/test_app.cc
@@ -16,53 +16,38 @@
 
 #include "gflags/gflags.h"
 #include "src/lib/clearcut/curl_http_client.h"
-#include "src/lib/statusor/statusor.h"
 #include "src/lib/util/clock.h"
-#include "src/lib/util/consistent_proto_store.h"
 #include "src/lib/util/datetime_util.h"
 #include "src/lib/util/file_util.h"
 #include "src/lib/util/posix_file_system.h"
 #include "src/lib/util/status.h"
-#include "src/local_aggregation/event_aggregator_mgr.h"
 #include "src/local_aggregation/local_aggregation.pb.h"
-#include "src/logger/encoder.h"
 #include "src/logger/project_context.h"
 #include "src/logger/project_context_factory.h"
 #include "src/logger/status.h"
-#include "src/logger/undated_event_manager.h"
 #include "src/logging.h"
-#include "src/observation_store/memory_observation_store.h"
 #include "src/pb/observation2.pb.h"
+#include "src/public/cobalt_config.h"
+#include "src/public/cobalt_service.h"
 #include "src/registry/cobalt_registry.pb.h"
 #include "src/registry/metric_definition.pb.h"
 #include "src/registry/project_configs.h"
 #include "src/registry/report_definition.pb.h"
-#include "src/system_data/system_data.h"
 #include "src/uploader/shipping_manager.h"
+#include "src/uploader/upload_scheduler.h"
 #include "third_party/protobuf/src/google/protobuf/io/zero_copy_stream_impl.h"
 
 namespace cobalt {
 
 using google::protobuf::RepeatedPtrField;
-using local_aggregation::EventAggregatorManager;
-using logger::Encoder;
 using logger::EventValuesPtr;
 using logger::HistogramPtr;
 using logger::kOK;
-using logger::Logger;
 using logger::LoggerInterface;
-using logger::ObservationWriter;
 using logger::ProjectContext;
 using logger::ProjectContextFactory;
 using logger::Status;
-using logger::UndatedEventManager;
-using observation_store::MemoryObservationStore;
 using system_data::ClientSecret;
-using system_data::SystemData;
-using system_data::SystemDataInterface;
-using uploader::ClearcutV1ShippingManager;
-using util::ConsistentProtoStore;
-using util::EncryptedMessageMaker;
 using util::FakeValidatedClock;
 using util::IncrementingSystemClock;
 using util::PosixFileSystem;
@@ -317,15 +302,9 @@
  public:
   ~RealLoggerFactory() override = default;
 
-  RealLoggerFactory(std::unique_ptr<EncryptedMessageMaker> observation_encrypter,
-                    std::unique_ptr<EncryptedMessageMaker> envelope_encrypter,
+  RealLoggerFactory(std::unique_ptr<CobaltService> cobalt_service,
                     std::unique_ptr<ProjectContextFactory> project_context_factory,
-                    std::string customer_name, std::string project_name,
-                    std::unique_ptr<MemoryObservationStore> observation_store,
-                    std::unique_ptr<ClearcutV1ShippingManager> shipping_manager,
-                    std::unique_ptr<ConsistentProtoStore> local_aggregate_proto_store,
-                    std::unique_ptr<ConsistentProtoStore> obs_history_proto_store,
-                    std::unique_ptr<SystemDataInterface> system_data);
+                    std::string customer_name, std::string project_name);
 
   std::unique_ptr<LoggerInterface> NewLogger(uint32_t day_index) override;
   size_t ObservationCount() override;
@@ -336,54 +315,23 @@
   const ProjectContext* project_context() override { return project_context_.get(); }
 
  private:
-  std::unique_ptr<EncryptedMessageMaker> observation_encrypter_;
-  std::unique_ptr<EncryptedMessageMaker> envelope_encrypter_;
+  std::unique_ptr<CobaltService> cobalt_service_;
   std::unique_ptr<ProjectContextFactory> project_context_factory_;
   std::string customer_name_;
   std::string project_name_;
   std::unique_ptr<ProjectContext> project_context_;
-  std::unique_ptr<MemoryObservationStore> observation_store_;
-  std::unique_ptr<ClearcutV1ShippingManager> shipping_manager_;
-  std::unique_ptr<ConsistentProtoStore> local_aggregate_proto_store_;
-  std::unique_ptr<ConsistentProtoStore> obs_history_proto_store_;
-  std::unique_ptr<SystemDataInterface> system_data_;
-  std::unique_ptr<Encoder> encoder_;
-  std::unique_ptr<ObservationWriter> observation_writer_;
-  std::unique_ptr<EventAggregatorManager> event_aggregator_mgr_;
-  std::shared_ptr<UndatedEventManager> undated_event_manager_;
   std::unique_ptr<FakeValidatedClock> validated_clock_;
   std::unique_ptr<SystemClockInterface> mock_clock_;
 };
 
-RealLoggerFactory::RealLoggerFactory(
-    std::unique_ptr<EncryptedMessageMaker> observation_encrypter,
-    std::unique_ptr<EncryptedMessageMaker> envelope_encrypter,
-    std::unique_ptr<ProjectContextFactory> project_context_factory, std::string customer_name,
-    std::string project_name, std::unique_ptr<MemoryObservationStore> observation_store,
-    std::unique_ptr<ClearcutV1ShippingManager> shipping_manager,
-    std::unique_ptr<ConsistentProtoStore> local_aggregate_proto_store,
-    std::unique_ptr<ConsistentProtoStore> obs_history_proto_store,
-    std::unique_ptr<SystemDataInterface> system_data)
-    : observation_encrypter_(std::move(observation_encrypter)),
-      envelope_encrypter_(std::move(envelope_encrypter)),
+RealLoggerFactory::RealLoggerFactory(std::unique_ptr<CobaltService> cobalt_service,
+                                     std::unique_ptr<ProjectContextFactory> project_context_factory,
+                                     std::string customer_name, std::string project_name)
+    : cobalt_service_(std::move(cobalt_service)),
       project_context_factory_(std::move(project_context_factory)),
       customer_name_(std::move(customer_name)),
       project_name_(std::move(project_name)),
-      project_context_(project_context_factory_->NewProjectContext(customer_name_, project_name_)),
-      observation_store_(std::move(observation_store)),
-      shipping_manager_(std::move(shipping_manager)),
-      local_aggregate_proto_store_(std::move(local_aggregate_proto_store)),
-      obs_history_proto_store_(std::move(obs_history_proto_store)),
-      system_data_(std::move(system_data)) {
-  encoder_ = std::make_unique<Encoder>(ClientSecret::GenerateNewSecret(), system_data_.get());
-  observation_writer_ = std::make_unique<ObservationWriter>(
-      observation_store_.get(), shipping_manager_.get(), observation_encrypter_.get());
-  event_aggregator_mgr_ = std::make_unique<EventAggregatorManager>(
-      encoder_.get(), observation_writer_.get(), local_aggregate_proto_store_.get(),
-      obs_history_proto_store_.get());
-  undated_event_manager_ = std::make_shared<UndatedEventManager>(
-      encoder_.get(), event_aggregator_mgr_->GetEventAggregator(), observation_writer_.get(),
-      system_data_.get());
+      project_context_(project_context_factory_->NewProjectContext(customer_name_, project_name_)) {
 }
 
 std::unique_ptr<LoggerInterface> RealLoggerFactory::NewLogger(uint32_t day_index) {
@@ -396,38 +344,69 @@
     mock_clock_ = std::make_unique<SystemClock>();
   }
   validated_clock_ = std::make_unique<FakeValidatedClock>(mock_clock_.get());
-  std::unique_ptr<Logger> logger = std::make_unique<Logger>(
-      project_context_factory_->NewProjectContext(customer_name_, project_name_), encoder_.get(),
-      event_aggregator_mgr_->GetEventAggregator(), observation_writer_.get(), system_data_.get(),
-      validated_clock_.get(), undated_event_manager_);
-  return std::move(logger);
+
+  return cobalt_service_->NewLogger(
+      project_context_factory_->NewProjectContext(customer_name_, project_name_));
 }
 
 size_t RealLoggerFactory::ObservationCount() {
-  return observation_store_->num_observations_added();
+  return cobalt_service_->observation_store()->num_observations_added();
 }
 
-void RealLoggerFactory::ResetObservationCount() { observation_store_->ResetObservationCounter(); }
+void RealLoggerFactory::ResetObservationCount() {
+  cobalt_service_->observation_store()->ResetObservationCounter();
+}
 
 // TODO(pesk): also clear the contents of the ConsistentProtoStores if we
 // implement a mode which uses them.
 void RealLoggerFactory::ResetLocalAggregation() {
-  event_aggregator_mgr_ = std::make_unique<EventAggregatorManager>(
-      encoder_.get(), observation_writer_.get(), local_aggregate_proto_store_.get(),
-      obs_history_proto_store_.get());
+  cobalt_service_->event_aggregator_manager()->Reset();
 }
 
 bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
-  return kOK == event_aggregator_mgr_->aggregate_store_->GenerateObservations(day_index);
+  return kOK == cobalt_service_->event_aggregator_manager()->aggregate_store_->GenerateObservations(
+                    day_index);
 }
 
 bool RealLoggerFactory::SendAccumulatedObservations() {
-  shipping_manager_->RequestSendSoon();
-  shipping_manager_->WaitUntilIdle(kDeadlinePerSendAttempt);
-  auto status = shipping_manager_->last_send_status();
+  cobalt_service_->shipping_manager()->RequestSendSoon();
+  cobalt_service_->shipping_manager()->WaitUntilIdle(kDeadlinePerSendAttempt);
+  auto status = cobalt_service_->shipping_manager()->last_send_status();
   return status.ok();
 }
 
+class TestAppPipeline : public TargetPipelineInterface {
+ public:
+  TestAppPipeline() : TargetPipelineInterface(system_data::Environment::DEVEL) {}
+
+  [[nodiscard]] std::optional<std::string> analyzer_encryption_key() const override {
+    if (FLAGS_analyzer_tink_keyset_file.empty() || FLAGS_shuffler_tink_keyset_file.empty()) {
+      VLOG(2) << "WARNING: Observations will not be encrypted to the Analyzer. "
+                 "Rerun with the flag -analyzer_tink_keyset_file.";
+      return std::nullopt;
+    }
+    VLOG(2) << "Reading analyzer keyset file at " << FLAGS_analyzer_tink_keyset_file;
+    return cobalt::util::ReadNonEmptyTextFile(FLAGS_analyzer_tink_keyset_file).ValueOrDie();
+  }
+
+  [[nodiscard]] std::optional<std::string> shuffler_encryption_key() const override {
+    if (FLAGS_analyzer_tink_keyset_file.empty() || FLAGS_shuffler_tink_keyset_file.empty()) {
+      VLOG(2) << "WARNING: Envelopes will not be encrypted to the Shuffler. "
+                 "Rerun with the flag -shuffler_tink_keyset_file.";
+      return std::nullopt;
+    }
+    VLOG(2) << "Reading shuffler keyset file at " << FLAGS_shuffler_tink_keyset_file;
+    return cobalt::util::ReadNonEmptyTextFile(FLAGS_shuffler_tink_keyset_file).ValueOrDie();
+  }
+
+  [[nodiscard]] std::unique_ptr<lib::clearcut::HTTPClient> TakeHttpClient() override {
+    return std::make_unique<lib::clearcut::CurlHTTPClient>();
+  }
+
+  [[nodiscard]] std::string clearcut_endpoint() const override { return FLAGS_clearcut_endpoint; }
+  [[nodiscard]] size_t clearcut_max_retries() const override { return kDefaultClearcutMaxRetries; }
+};
+
 }  // namespace internal
 
 std::unique_ptr<TestApp> TestApp::CreateFromFlagsOrDie(char* argv[]) {
@@ -446,71 +425,50 @@
 
   auto mode = ParseMode();
 
-  std::unique_ptr<EncryptedMessageMaker> observation_encrypter;
-  if (FLAGS_analyzer_tink_keyset_file.empty()) {
-    VLOG(2) << "WARNING: Observations will not be encrypted to the Analyzer. "
-               "Rerun with the flag -analyzer_tink_keyset_file.";
-    observation_encrypter = EncryptedMessageMaker::MakeUnencrypted();
-  } else {
-    VLOG(2) << "Reading analyzer keyset file at " << FLAGS_analyzer_tink_keyset_file;
-    auto key_value =
-        cobalt::util::ReadNonEmptyTextFile(FLAGS_analyzer_tink_keyset_file).ValueOrDie();
-    observation_encrypter = EncryptedMessageMaker::MakeForObservations(key_value).ValueOrDie();
-  }
+  CobaltConfig cfg = {.client_secret = system_data::ClientSecret::GenerateNewSecret()};
 
-  std::unique_ptr<EncryptedMessageMaker> envelope_encrypter;
-  if (FLAGS_shuffler_tink_keyset_file.empty()) {
-    VLOG(2) << "WARNING: Envelopes will not be encrypted to the Shuffler. "
-               "Rerun with the flag -shuffler_tink_keyset_file.";
-    observation_encrypter = EncryptedMessageMaker::MakeUnencrypted();
-    envelope_encrypter = EncryptedMessageMaker::MakeUnencrypted();
-  } else {
-    VLOG(2) << "Reading shuffler keyset file at " << FLAGS_shuffler_tink_keyset_file;
-    auto key_value =
-        cobalt::util::ReadNonEmptyTextFile(FLAGS_shuffler_tink_keyset_file).ValueOrDie();
-    envelope_encrypter = EncryptedMessageMaker::MakeForObservations(key_value).ValueOrDie();
-  }
+  cfg.product_name = "test_app";
+  cfg.version = "";
+  cfg.release_stage = ReleaseStage::DEBUG;
 
-  std::unique_ptr<SystemDataInterface> system_data(
-      new SystemData("test_app", "", ReleaseStage::DEBUG));
+  cfg.file_system = std::make_unique<util::PosixFileSystem>();
 
-  auto observation_store = std::make_unique<MemoryObservationStore>(
-      kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal);
-  auto local_aggregate_proto_store = std::make_unique<ConsistentProtoStore>(
-      FLAGS_local_aggregate_backup_file, std::make_unique<PosixFileSystem>());
-  auto obs_history_proto_store = std::make_unique<ConsistentProtoStore>(
-      FLAGS_aggregated_obs_history_backup_file, std::make_unique<PosixFileSystem>());
+  cfg.use_memory_observation_store = true;
+  cfg.max_bytes_per_event = kMaxBytesPerObservation;
+  cfg.max_bytes_per_envelope = kMaxBytesPerEnvelope;
+  cfg.max_bytes_total = kMaxBytesTotal;
+
+  cfg.local_aggregate_proto_store_path = FLAGS_local_aggregate_backup_file;
+  cfg.obs_history_proto_store_path = FLAGS_aggregated_obs_history_backup_file;
 
   // By using (kMaxSeconds, 0) here we are effectively putting the
-  // ShippingDispatcher in manual mode. It will never send
-  // automatically and it will send immediately in response to
-  // RequestSendSoon().
-  auto upload_scheduler =
-      uploader::UploadScheduler(uploader::UploadScheduler::kMaxSeconds, std::chrono::seconds(0));
+  // ShippingDispatcher in manual mode. It will never send automatically
+  // and it will send immediately in response to RequestSendSoon().
+  cfg.target_interval = uploader::UploadScheduler::kMaxSeconds;
+  cfg.min_interval = std::chrono::seconds(0);
+  cfg.initial_interval = uploader::UploadScheduler::kMaxSeconds;
+
+  cfg.target_pipeline = std::make_unique<internal::TestAppPipeline>();
+
   if (mode == TestApp::kAutomatic) {
     // In automatic mode, let the ShippingManager send to the Shuffler
     // every 10 seconds.
     const auto upload_interval = std::chrono::seconds(10);
-    upload_scheduler = uploader::UploadScheduler(upload_interval, std::chrono::seconds(1));
+    cfg.target_interval = upload_interval;
+    cfg.min_interval = std::chrono::seconds(1);
+    cfg.initial_interval = upload_interval;
   }
-  auto shipping_manager = std::make_unique<ClearcutV1ShippingManager>(
-      upload_scheduler, observation_store.get(), envelope_encrypter.get(),
-      observation_encrypter.get(),
-      std::make_unique<lib::clearcut::ClearcutUploader>(
-          FLAGS_clearcut_endpoint, std::make_unique<lib::clearcut::CurlHTTPClient>()));
-  shipping_manager->Start();
 
-  std::unique_ptr<LoggerFactory> logger_factory(new internal::RealLoggerFactory(
-      std::move(observation_encrypter), std::move(envelope_encrypter),
-      std::move(project_context_factory), FLAGS_customer_name, FLAGS_project_name,
-      std::move(observation_store), std::move(shipping_manager),
-      std::move(local_aggregate_proto_store), std::move(obs_history_proto_store),
-      std::move(system_data)));
+  auto cobalt_service = std::make_unique<CobaltService>(std::move(cfg));
+
+  std::unique_ptr<LoggerFactory> logger_factory(
+      new internal::RealLoggerFactory(std::move(cobalt_service), std::move(project_context_factory),
+                                      FLAGS_customer_name, FLAGS_project_name));
 
   std::unique_ptr<TestApp> test_app(
       new TestApp(std::move(logger_factory), FLAGS_metric_name, mode, &std::cout));
   return test_app;
-}
+}  // namespace cobalt
 
 TestApp::TestApp(std::unique_ptr<LoggerFactory> logger_factory,
                  const std::string& initial_metric_name, Mode mode, std::ostream* ostream)
diff --git a/src/lib/util/BUILD.gn b/src/lib/util/BUILD.gn
index ecd7223..945ad30 100644
--- a/src/lib/util/BUILD.gn
+++ b/src/lib/util/BUILD.gn
@@ -175,16 +175,25 @@
   configs += [ "$cobalt_root:cobalt_config" ]
 }
 
-static_library("posix_file_system") {
+source_set("file_system") {
   sources = [
     "file_system.h",
+  ]
+  configs += [ "$cobalt_root:cobalt_config" ]
+  public_deps = [
+    ":proto_serialization",
+    "$cobalt_root/src/lib/statusor",
+  ]
+}
+
+static_library("posix_file_system") {
+  sources = [
     "posix_file_system.cc",
     "posix_file_system.h",
   ]
   configs += [ "$cobalt_root:cobalt_config" ]
-  deps = [
-    ":proto_serialization",
-    "$cobalt_root/src/lib/statusor",
+  public_deps = [
+    ":file_system",
   ]
 }
 
diff --git a/src/lib/util/consistent_proto_store.cc b/src/lib/util/consistent_proto_store.cc
index 00de539..2bab676 100644
--- a/src/lib/util/consistent_proto_store.cc
+++ b/src/lib/util/consistent_proto_store.cc
@@ -18,11 +18,19 @@
 constexpr char kTmpSuffix[] = ".tmp";
 constexpr char kOverrideSuffix[] = ".override";
 
-ConsistentProtoStore::ConsistentProtoStore(std::string filename, std::unique_ptr<FileSystem> fs)
+ConsistentProtoStore::ConsistentProtoStore(std::string filename,
+                                           std::unique_ptr<FileSystem> owned_fs)
     : primary_file_(std::move(filename)),
       tmp_file_(primary_file_ + kTmpSuffix),
       override_file_(primary_file_ + kOverrideSuffix),
-      fs_(std::move(fs)) {}
+      owned_fs_(std::move(owned_fs)),
+      fs_(owned_fs_.get()) {}
+
+ConsistentProtoStore::ConsistentProtoStore(std::string filename, FileSystem *fs)
+    : primary_file_(std::move(filename)),
+      tmp_file_(primary_file_ + kTmpSuffix),
+      override_file_(primary_file_ + kOverrideSuffix),
+      fs_(fs) {}
 
 // Write uses a series of operations to write new data. The goal of each of
 // these operations is that if the operation is interrupted or fails, the
diff --git a/src/lib/util/consistent_proto_store.h b/src/lib/util/consistent_proto_store.h
index c0ca1d2..80f57ac 100644
--- a/src/lib/util/consistent_proto_store.h
+++ b/src/lib/util/consistent_proto_store.h
@@ -24,7 +24,15 @@
   //
   // |filename| the fully qualified path of the file to store data in.
   // |fs| is used for detecting the presence of, renaming, and deleting files.
-  explicit ConsistentProtoStore(std::string filename, std::unique_ptr<FileSystem> fs);
+  explicit ConsistentProtoStore(std::string filename, FileSystem *fs);
+
+  // Constructs a ConsistentProtoStore
+  //
+  // |filename| the fully qualified path of the file to store data in.
+  // |owned_fs| is used for detecting the presence of, renaming, and deleting files.
+  //
+  // DEPRECATED: Use non-owned FileSystem
+  explicit ConsistentProtoStore(std::string filename, std::unique_ptr<FileSystem> owned_fs);
 
   virtual ~ConsistentProtoStore() = default;
 
@@ -59,7 +67,8 @@
   // read instead of primary_file_.
   const std::string override_file_;
 
-  std::unique_ptr<FileSystem> fs_;
+  std::unique_ptr<FileSystem> owned_fs_;
+  FileSystem *fs_;
 };
 
 }  // namespace util
diff --git a/src/lib/util/consistent_proto_store_test.cc b/src/lib/util/consistent_proto_store_test.cc
index 7fc7e5e..71721ca 100644
--- a/src/lib/util/consistent_proto_store_test.cc
+++ b/src/lib/util/consistent_proto_store_test.cc
@@ -30,7 +30,7 @@
 class TestConsistentProtoStore : public ConsistentProtoStore {
  public:
   explicit TestConsistentProtoStore(const std::string &filename)
-      : ConsistentProtoStore(filename, std::make_unique<PosixFileSystem>()),
+      : ConsistentProtoStore(filename, &fs_),
         fail_write_tmp_(false),
         fail_move_tmp_(false),
         fail_delete_primary_(false),
@@ -76,6 +76,7 @@
     return ConsistentProtoStore::MoveOverrideToPrimary();
   }
 
+  PosixFileSystem fs_;
   bool fail_write_tmp_;
   bool fail_move_tmp_;
   bool fail_delete_primary_;
diff --git a/src/local_aggregation/BUILD.gn b/src/local_aggregation/BUILD.gn
index 8a9db2a..3ae8022 100644
--- a/src/local_aggregation/BUILD.gn
+++ b/src/local_aggregation/BUILD.gn
@@ -53,9 +53,11 @@
     "$cobalt_root/src/lib/util:clock",
     "$cobalt_root/src/lib/util:consistent_proto_store",
     "$cobalt_root/src/lib/util:datetime_util",
+    "$cobalt_root/src/lib/util:file_system",
     "$cobalt_root/src/logger:encoder",
     "$cobalt_root/src/logger:observation_writer",
     "$cobalt_root/src/logger:status",
+    "$cobalt_root/src/public:cobalt_config",
   ]
 
   configs += [ "$cobalt_root:cobalt_config" ]
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index 204fdb6..0dc0810 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -4,7 +4,11 @@
 
 #include "src/local_aggregation/event_aggregator_mgr.h"
 
+#include <memory>
+
+#include "src/lib/util/consistent_proto_store.h"
 #include "src/lib/util/datetime_util.h"
+#include "src/lib/util/file_system.h"
 
 namespace cobalt::local_aggregation {
 
@@ -15,6 +19,12 @@
 using util::SteadyClock;
 using util::TimeToDayIndex;
 
+const std::chrono::seconds EventAggregatorManager::kDefaultAggregateBackupInterval =
+    std::chrono::minutes(1);
+const std::chrono::seconds EventAggregatorManager::kDefaultGenerateObsInterval =
+    std::chrono::hours(1);
+const std::chrono::seconds EventAggregatorManager::kDefaultGCInterval = std::chrono::hours(24);
+
 EventAggregatorManager::EventAggregatorManager(const logger::Encoder* encoder,
                                                const logger::ObservationWriter* observation_writer,
                                                ConsistentProtoStore* local_aggregate_proto_store,
@@ -34,6 +44,19 @@
   steady_clock_ = std::make_unique<SteadyClock>();
 }
 
+EventAggregatorManager::EventAggregatorManager(const CobaltConfig& cfg, util::FileSystem* fs,
+                                               const logger::Encoder* encoder,
+                                               const logger::ObservationWriter* observation_writer)
+    : encoder_(encoder),
+      observation_writer_(observation_writer),
+      backfill_days_(cfg.local_aggregation_backfill_days),
+      owned_local_aggregate_proto_store_(
+          new ConsistentProtoStore(cfg.local_aggregate_proto_store_path, fs)),
+      owned_obs_history_proto_store_(
+          new ConsistentProtoStore(cfg.obs_history_proto_store_path, fs)) {
+  Reset();
+}
+
 void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
   auto locked = protected_worker_thread_controller_.lock();
   locked->shut_down = false;
@@ -144,4 +167,13 @@
   }
   return aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local);
 }
+
+void EventAggregatorManager::Reset() {
+  aggregate_store_ = std::make_unique<AggregateStore>(
+      encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
+      owned_obs_history_proto_store_.get(), backfill_days_);
+
+  event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
+}
+
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 7c1fc25..90dd243 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -6,6 +6,7 @@
 #define COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_MGR_H_
 
 #include <condition_variable>
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <thread>
@@ -19,6 +20,7 @@
 #include "src/logger/encoder.h"
 #include "src/logger/observation_writer.h"
 #include "src/logger/status.h"
+#include "src/public/cobalt_config.h"
 
 namespace cobalt {
 
@@ -81,14 +83,17 @@
   // and |gc_interval|, since each of Observation generation and garbage collection will be done at
   // the smallest multiple of |aggregate_backup_interval| which is greater than or equal to its
   // specified interval.
-  EventAggregatorManager(const logger::Encoder* encoder,
-                         const logger::ObservationWriter* observation_writer,
-                         util::ConsistentProtoStore* local_aggregate_proto_store,
-                         util::ConsistentProtoStore* obs_history_proto_store,
-                         size_t backfill_days = 0,
-                         std::chrono::seconds aggregate_backup_interval = std::chrono::minutes(1),
-                         std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
-                         std::chrono::seconds gc_interval = std::chrono::hours(kHoursInADay));
+  EventAggregatorManager(
+      const logger::Encoder* encoder, const logger::ObservationWriter* observation_writer,
+      util::ConsistentProtoStore* local_aggregate_proto_store,
+      util::ConsistentProtoStore* obs_history_proto_store, size_t backfill_days = 0,
+      std::chrono::seconds aggregate_backup_interval = kDefaultAggregateBackupInterval,
+      std::chrono::seconds generate_obs_interval = kDefaultGenerateObsInterval,
+      std::chrono::seconds gc_interval = kDefaultGCInterval);
+
+  EventAggregatorManager(const CobaltConfig& cfg, util::FileSystem* fs,
+                         const logger::Encoder* encoder,
+                         const logger::ObservationWriter* observation_writer);
 
   // Shut down the worker thread before destructing the EventAggregatorManager.
   ~EventAggregatorManager() { ShutDown(); }
@@ -101,6 +106,12 @@
   //         accurate.
   void Start(std::unique_ptr<util::SystemClockInterface> clock);
 
+  // Resets the state of the local aggregator.
+  //
+  // TODO(pesk): also clear the contents of the ConsistentProtoStores if we
+  // implement a mode which uses them.
+  void Reset();
+
   // Returns a pointer to an EventAggregator to be used for logging.
   EventAggregator* GetEventAggregator() { return event_aggregator_.get(); }
 
@@ -168,7 +179,9 @@
     std::condition_variable_any shutdown_notifier;
   };
 
-  size_t backfill_days_;
+  const logger::Encoder* encoder_;
+  const logger::ObservationWriter* observation_writer_;
+  size_t backfill_days_ = 0;
   std::chrono::seconds aggregate_backup_interval_;
   std::chrono::seconds generate_obs_interval_;
   std::chrono::seconds gc_interval_;
@@ -178,8 +191,14 @@
   std::unique_ptr<util::SteadyClockInterface> steady_clock_;
 
   std::unique_ptr<AggregateStore> aggregate_store_;
+  std::unique_ptr<util::ConsistentProtoStore> owned_local_aggregate_proto_store_;
+  std::unique_ptr<util::ConsistentProtoStore> owned_obs_history_proto_store_;
   std::unique_ptr<EventAggregator> event_aggregator_;
 
+  static const std::chrono::seconds kDefaultAggregateBackupInterval;
+  static const std::chrono::seconds kDefaultGenerateObsInterval;
+  static const std::chrono::seconds kDefaultGCInterval;
+
   std::thread worker_thread_;
   util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_;
 };
diff --git a/src/logger/logger_test_utils.h b/src/logger/logger_test_utils.h
index 7f8c9a4..dab5179 100644
--- a/src/logger/logger_test_utils.h
+++ b/src/logger/logger_test_utils.h
@@ -125,8 +125,7 @@
 class MockConsistentProtoStore : public ::cobalt::util::ConsistentProtoStore {
  public:
   explicit MockConsistentProtoStore(std::string filename)
-      : ::cobalt::util::ConsistentProtoStore(std::move(filename),
-                                             std::make_unique<::cobalt::util::PosixFileSystem>()) {
+      : ::cobalt::util::ConsistentProtoStore(std::move(filename), &fs_) {
     read_count_ = 0;
     write_count_ = 0;
   }
@@ -161,6 +160,7 @@
   }
 
  private:
+  ::cobalt::util::PosixFileSystem fs_;
   std::unique_ptr<google::protobuf::MessageLite> stored_proto_;
 };
 
diff --git a/src/observation_store/file_observation_store.cc b/src/observation_store/file_observation_store.cc
index c2a0c4e..f3872a3 100644
--- a/src/observation_store/file_observation_store.cc
+++ b/src/observation_store/file_observation_store.cc
@@ -36,11 +36,11 @@
 
 FileObservationStore::FileObservationStore(size_t max_bytes_per_observation,
                                            size_t max_bytes_per_envelope, size_t max_bytes_total,
-                                           std::unique_ptr<FileSystem> fs,
-                                           std::string root_directory, std::string name,
+                                           FileSystem *fs, std::string root_directory,
+                                           std::string name,
                                            logger::LoggerInterface *internal_logger)
     : ObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total),
-      fs_(std::move(fs)),
+      fs_(fs),
       root_directory_(std::move(root_directory)),
       active_file_name_(FullPath(kActiveFileName)),
       name_(std::move(name)),
@@ -287,7 +287,7 @@
 
   auto oldest_file_name = oldest_file_name_or.ConsumeValueOrDie();
   fields->files_taken.insert(oldest_file_name);
-  return std::make_unique<FileEnvelopeHolder>(fs_.get(), this, root_directory_, oldest_file_name);
+  return std::make_unique<FileEnvelopeHolder>(fs_, this, root_directory_, oldest_file_name);
 }
 
 void FileObservationStore::ReturnEnvelopeHolder(
diff --git a/src/observation_store/file_observation_store.h b/src/observation_store/file_observation_store.h
index a460ac1..a9bc849 100644
--- a/src/observation_store/file_observation_store.h
+++ b/src/observation_store/file_observation_store.h
@@ -119,10 +119,21 @@
   // |name| is used in log messages to distinguish this instance of
   // FileObservationStore.
   FileObservationStore(size_t max_bytes_per_observation, size_t max_bytes_per_envelope,
-                       size_t max_bytes_total, std::unique_ptr<util::FileSystem> fs,
-                       std::string root_directory, std::string name = "FileObservationStore",
+                       size_t max_bytes_total, util::FileSystem *fs, std::string root_directory,
+                       std::string name = "FileObservationStore",
                        logger::LoggerInterface *internal_logger = nullptr);
 
+  // DEPRECATED: Use non-owned FileSystem
+  FileObservationStore(size_t max_bytes_per_observation, size_t max_bytes_per_envelope,
+                       size_t max_bytes_total, std::unique_ptr<util::FileSystem> owned_fs,
+                       std::string root_directory, std::string name = "FileObservationStore",
+                       logger::LoggerInterface *internal_logger = nullptr)
+      : FileObservationStore(max_bytes_per_observation, max_bytes_per_envelope, max_bytes_total,
+                             owned_fs.get(), std::move(root_directory), std::move(name),
+                             internal_logger) {
+    owned_fs_ = std::move(owned_fs);
+  }
+
   using ObservationStore::StoreObservation;
   StoreStatus StoreObservation(std::unique_ptr<StoredObservation> observation,
                                std::unique_ptr<ObservationMetadata> metadata) override;
@@ -183,7 +194,8 @@
   google::protobuf::io::ZeroCopyOutputStream *GetActiveFile(
       util::ProtectedFields<Fields>::LockedFieldsPtr *fields);
 
-  const std::unique_ptr<util::FileSystem> fs_;
+  std::unique_ptr<util::FileSystem> owned_fs_;
+  util::FileSystem *fs_;
   const std::string root_directory_;
   const std::string active_file_name_;
   const std::string name_;
diff --git a/src/observation_store/file_observation_store_test.cc b/src/observation_store/file_observation_store_test.cc
index 570c431..edf2f1e 100644
--- a/src/observation_store/file_observation_store_test.cc
+++ b/src/observation_store/file_observation_store_test.cc
@@ -52,9 +52,8 @@
   }
 
   void MakeStore() {
-    store_ = std::make_unique<FileObservationStore>(
-        kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal,
-        std::make_unique<PosixFileSystem>(), test_dir_name_);
+    store_ = std::make_unique<FileObservationStore>(kMaxBytesPerObservation, kMaxBytesPerEnvelope,
+                                                    kMaxBytesTotal, &fs_, test_dir_name_);
   }
 
   void TearDown() override { store_->Delete(); }
@@ -76,6 +75,7 @@
   }
 
  protected:
+  PosixFileSystem fs_;
   std::string test_dir_name_;
   std::unique_ptr<FileObservationStore> store_;
   std::unique_ptr<util::EncryptedMessageMaker> encrypt_;
diff --git a/src/public/BUILD.gn b/src/public/BUILD.gn
new file mode 100644
index 0000000..48b930c
--- /dev/null
+++ b/src/public/BUILD.gn
@@ -0,0 +1,43 @@
+# Copyright 2019 The Fuchsia Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+source_set("cobalt_config") {
+  sources = [
+    "cobalt_config.cc",
+    "cobalt_config.h",
+  ]
+
+  configs += [ "$cobalt_root:cobalt_config" ]
+
+  deps = [
+    "$cobalt_root/src/lib/clearcut",
+    "$cobalt_root/src/lib/util:encrypted_message_util",
+    "$cobalt_root/src/lib/util:file_system",
+    "$cobalt_root/src/logger:project_context",
+    "$cobalt_root/src/registry:cobalt_registry_proto",
+    "$cobalt_root/src/system_data:client_secret",
+    "$cobalt_root/src/system_data:configuration_data",
+  ]
+}
+
+source_set("cobalt_service") {
+  sources = [
+    "cobalt_service.cc",
+    "cobalt_service.h",
+  ]
+
+  configs += [ "$cobalt_root:cobalt_config" ]
+
+  deps = [
+    ":cobalt_config",
+    "$cobalt_root/src/local_aggregation:event_aggregator_mgr",
+    "$cobalt_root/src/logger",
+    "$cobalt_root/src/logger:undated_event_manager",
+    "$cobalt_root/src/observation_store",
+    "$cobalt_root/src/system_data",
+    "$cobalt_root/src/system_data:client_secret",
+    "$cobalt_root/src/system_data:configuration_data",
+    "$cobalt_root/src/uploader:shipping_manager",
+  ]
+}
diff --git a/src/public/cobalt_config.cc b/src/public/cobalt_config.cc
new file mode 100644
index 0000000..eca0eec
--- /dev/null
+++ b/src/public/cobalt_config.cc
@@ -0,0 +1,7 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/public/cobalt_config.h"
+
+namespace cobalt {}  // namespace cobalt
diff --git a/src/public/cobalt_config.h b/src/public/cobalt_config.h
new file mode 100644
index 0000000..6bd4d6d
--- /dev/null
+++ b/src/public/cobalt_config.h
@@ -0,0 +1,224 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_SRC_PUBLIC_COBALT_CONFIG_H_
+#define COBALT_SRC_PUBLIC_COBALT_CONFIG_H_
+
+#include <memory>
+
+#include "src/lib/clearcut/http_client.h"
+#include "src/lib/util/clock.h"
+#include "src/lib/util/encrypted_message_util.h"
+#include "src/lib/util/file_system.h"
+#include "src/logger/project_context.h"
+#include "src/registry/metric_definition.pb.h"
+#include "src/system_data/client_secret.h"
+#include "src/system_data/configuration_data.h"
+
+namespace cobalt {
+
+constexpr char kDefaultClearcutEndpoint[] = "https://play.googleapis.com/staging/log";
+constexpr size_t kDefaultClearcutMaxRetries = 5;
+
+class TargetPipelineInterface {
+ public:
+  explicit TargetPipelineInterface(system_data::Environment environment)
+      : environment_(environment){};
+  virtual ~TargetPipelineInterface() = default;
+
+  // The target environment for the pipeline. Used to determine where to send the data.
+  [[nodiscard]] system_data::Environment environment() const { return environment_; }
+
+  // An encoded CobaltEncryptionKey proto representing the encryption key to be used for envelopes
+  // sent to the shuffler. (Or nullopt for no encryption)
+  [[nodiscard]] virtual std::optional<std::string> shuffler_encryption_key() const {
+    return std::nullopt;
+  }
+
+  // An encoded CobaltEncryptionKey proto representing the encryption key to be used for
+  // observations sent to the analyzer. (Or nullopt for no encryption)
+  [[nodiscard]] virtual std::optional<std::string> analyzer_encryption_key() const {
+    return std::nullopt;
+  }
+
+  // The URL for the desired clearcut endpoint.
+  [[nodiscard]] virtual std::string clearcut_endpoint() const { return ""; };
+
+  // Returns an implementation of lib::clearcut::HTTPClient. Will be used for uploading data in the
+  // ShippingManager.
+  [[nodiscard]] virtual std::unique_ptr<lib::clearcut::HTTPClient> TakeHttpClient() {
+    return nullptr;
+  }
+
+  // How many times should the clearcut upload be reattempted, before returning the observations to
+  // the ObservationStore.
+  [[nodiscard]] virtual size_t clearcut_max_retries() const { return 0; }
+
+ private:
+  system_data::Environment environment_;
+};
+
+class LocalPipeline : public TargetPipelineInterface {
+ public:
+  LocalPipeline() : TargetPipelineInterface(system_data::Environment::LOCAL) {}
+  ~LocalPipeline() override = default;
+};
+
+class ExtraPipeline : public TargetPipelineInterface {
+ public:
+  ExtraPipeline(system_data::Environment environment, std::string shuffler_encryption_key)
+      : TargetPipelineInterface(environment),
+        shuffler_encryption_key_(std::move(shuffler_encryption_key)) {}
+  ~ExtraPipeline() override = default;
+
+  [[nodiscard]] std::optional<std::string> shuffler_encryption_key() const override {
+    return shuffler_encryption_key_;
+  }
+
+ private:
+  std::string shuffler_encryption_key_;
+};
+
+class TargetPipeline : public TargetPipelineInterface {
+ public:
+  TargetPipeline(system_data::Environment environment, std::string shuffler_encryption_key,
+                 std::string analyzer_encryption_key,
+                 std::unique_ptr<lib::clearcut::HTTPClient> http_client,
+                 size_t clearcut_max_retries = kDefaultClearcutMaxRetries,
+                 std::string clearcut_endpoint = kDefaultClearcutEndpoint)
+      : TargetPipelineInterface(environment),
+        shuffler_encryption_key_(std::move(shuffler_encryption_key)),
+        analyzer_encryption_key_(std::move(analyzer_encryption_key)),
+        http_client_(std::move(http_client)),
+        clearcut_endpoint_(std::move(clearcut_endpoint)),
+        clearcut_max_retries_(clearcut_max_retries) {}
+  ~TargetPipeline() override = default;
+
+  [[nodiscard]] std::optional<std::string> shuffler_encryption_key() const override {
+    return shuffler_encryption_key_;
+  }
+  [[nodiscard]] std::optional<std::string> analyzer_encryption_key() const override {
+    return analyzer_encryption_key_;
+  }
+
+  [[nodiscard]] std::string clearcut_endpoint() const override { return clearcut_endpoint_; };
+  [[nodiscard]] std::unique_ptr<lib::clearcut::HTTPClient> TakeHttpClient() override {
+    return std::move(http_client_);
+  }
+  [[nodiscard]] size_t clearcut_max_retries() const override { return clearcut_max_retries_; }
+
+ private:
+  std::string shuffler_encryption_key_;
+  std::string analyzer_encryption_key_;
+  std::unique_ptr<lib::clearcut::HTTPClient> http_client_;
+  std::string clearcut_endpoint_;
+  size_t clearcut_max_retries_;
+};
+
+struct CobaltConfig {
+  // |product_name|: The value to use for the |product_name| field of the SystemProfile.
+  std::string product_name = "";
+
+  // |board_name_suggestion|: A suggestion for the value to use for the |board_name| field of the
+  // SystemProfile. This may be ignored if SystemData is able to determine the board name directly.
+  // A value of "" indicates that the caller has no information about board name, so one should be
+  // guessed.
+  std::string board_name_suggestion = "";
+
+  // |version|: The version of the running system. The use of this field is system-specific. For
+  // example on Fuchsia a possible value for |version| is "20190220_01_RC00".
+  std::string version = "";
+
+  // |release_stage|: The ReleaseStage of the running system.
+  ReleaseStage release_stage = ReleaseStage::GA;
+
+  // |file_system|: The FileSystem implementation to be used for all file system operations in
+  // Cobalt.
+  std::unique_ptr<util::FileSystem> file_system;
+
+  // |use_memory_observation_store|: If true, the ObservaitonStore used will be memory backed intead
+  // of file backed.
+  bool use_memory_observation_store = false;
+
+  // These three values are provided to the ObservationStore.
+  //
+  //|max_bytes_per_event|: Attempting to log an event that is larger than this value will result in
+  // an error code kObservationTooBig. (See observation_store.h)
+  //
+  // |max_bytes_per_envelope|: When pooling together Observaitons into an Envelope, the
+  // ObservationStore will try not to form envelopes larger than this size. This value is used to
+  // avoid sending messages over HTTP that are too large. (see observation_store.h)
+  //
+  // |max_bytes_total|: This is the maximum size of the Observations in the ObservationStore. If the
+  // size of the accumulated Observation data reaches this value, then ObservationStore will not
+  // accept any more Observations, resulting in an error code of kStoreFull. (See
+  // observaiton_store.h)
+  //
+  // REQUIRED:
+  // 0 <= max_bytes_per_event <= max_bytes_per_envelope <= max_bytes_total
+  // 0 <= max_bytes_per_envelope
+  size_t max_bytes_per_event;
+  size_t max_bytes_per_envelope;
+  size_t max_bytes_total;
+
+  // |observation_store_directory|: If |use_memory_observation_store| is false, this is the absolute
+  // path to the directory the observation_store will use to store observations.
+  std::string observation_store_directory;
+
+  // |local_aggregate_proto_store_path|: The absolute path where the local aggregate proto should be
+  // stored.
+  std::string local_aggregate_proto_store_path;
+
+  // |obs_history_proto_store_path|: The absolute path where the observation history proto should be
+  // stored.
+  std::string obs_history_proto_store_path;
+
+  // These three values are provided to the UploadScheduler of the shipping manager.
+  //
+  // |target_interval|: How frequently should ShippingManager perform regular periodic sends to the
+  // Shuffler? Set to kMaxSeconds to effectively disable periodic sends.
+  //
+  // |min_interval|: Used as the basis for exponentially increasing the upload interval. The
+  // resulting interval starts at this value, and then is multiplied by 2 each time until the value
+  // is greater than or equal to |target_interval|.
+  //
+  // REQUIRED:
+  // 0 <= min_interval <= target_interval <= kMaxSeconds
+  std::chrono::seconds target_interval;
+  std::chrono::seconds min_interval;
+  std::chrono::seconds initial_interval;
+
+  // |target_pipeline|: Used to determine where to send observations, and how to encrypt them.
+  std::unique_ptr<TargetPipelineInterface> target_pipeline;
+
+  // |extra_pipelines|: A list of extra pipelines where observations should be sent.
+  std::vector<std::unique_ptr<TargetPipelineInterface>> extra_pipelines;
+
+  // |local_shipping_manager_path|: If |environments| is equal to {LOCAL}, the observations will be
+  // written to this path, instead of being shipped to clearcut.
+  std::string local_shipping_manager_path;
+
+  // |api_key|: An API key included in each request to the Shuffler. If the API key is unrecognized
+  // on the server, the observations may be discarded.
+  std::string api_key;
+
+  // |client_secret|: The ClientSecret for this device.
+  system_data::ClientSecret client_secret;
+
+  // |internal_logger_project_context|: A ProjectContext that can be used to construct the internal
+  // Logger.
+  std::unique_ptr<logger::ProjectContext> internal_logger_project_context;
+
+  // |local_aggregation_backfill_days|: The number of past days for which the AggregateStore
+  // generates and sends Observations, in addition to a requested day index.
+  size_t local_aggregation_backfill_days;
+
+  // |validated_clock|: A reference to a ValidatedClockInterface, used to determine when the system
+  // has a clock that we can rely on.
+  util::ValidatedClockInterface* validated_clock;
+};
+
+}  // namespace cobalt
+
+#endif  // COBALT_SRC_PUBLIC_COBALT_CONFIG_H_
diff --git a/src/public/cobalt_service.cc b/src/public/cobalt_service.cc
new file mode 100644
index 0000000..01b8779
--- /dev/null
+++ b/src/public/cobalt_service.cc
@@ -0,0 +1,153 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/public/cobalt_service.h"
+
+#include <memory>
+
+#include "src/lib/clearcut/http_client.h"
+#include "src/lib/util/clock.h"
+#include "src/lib/util/encrypted_message_util.h"
+#include "src/logger/project_context.h"
+#include "src/observation_store/file_observation_store.h"
+#include "src/observation_store/memory_observation_store.h"
+#include "src/observation_store/observation_store.h"
+#include "src/system_data/configuration_data.h"
+#include "src/uploader/shipping_manager.h"
+
+namespace cobalt {
+
+namespace {
+
+std::unique_ptr<observation_store::ObservationStore> NewObservationStore(const CobaltConfig &cfg,
+                                                                         util::FileSystem *fs) {
+  if (cfg.use_memory_observation_store) {
+    return std::make_unique<observation_store::MemoryObservationStore>(
+        cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total);
+  }
+  return std::make_unique<observation_store::FileObservationStore>(
+      cfg.max_bytes_per_event, cfg.max_bytes_per_envelope, cfg.max_bytes_total, fs,
+      cfg.observation_store_directory, "V1 FileObservationStore");
+}
+
+std::unique_ptr<util::EncryptedMessageMaker> GetEncryptToAnalyzer(CobaltConfig *cfg) {
+  if (cfg->target_pipeline->environment() == system_data::Environment::LOCAL) {
+    return util::EncryptedMessageMaker::MakeUnencrypted();
+  }
+
+  if (cfg->target_pipeline->analyzer_encryption_key()) {
+    return util::EncryptedMessageMaker::MakeForObservations(
+               *cfg->target_pipeline->analyzer_encryption_key())
+        .ConsumeValueOrDie();
+  }
+
+  return util::EncryptedMessageMaker::MakeUnencrypted();
+}
+
+std::unique_ptr<util::EncryptedMessageMaker> GetEncryptToShuffler(
+    TargetPipelineInterface *pipeline) {
+  if (pipeline->environment() == system_data::Environment::LOCAL) {
+    return util::EncryptedMessageMaker::MakeUnencrypted();
+  }
+
+  if (pipeline->shuffler_encryption_key()) {
+    return util::EncryptedMessageMaker::MakeForEnvelopes(*pipeline->shuffler_encryption_key())
+        .ConsumeValueOrDie();
+  }
+
+  return util::EncryptedMessageMaker::MakeUnencrypted();
+}
+
+std::vector<std::unique_ptr<util::EncryptedMessageMaker>> GetExtraEncryptToShufflers(
+    const std::vector<std::unique_ptr<TargetPipelineInterface>> &pipelines) {
+  std::vector<std::unique_ptr<util::EncryptedMessageMaker>> retval;
+
+  retval.reserve(pipelines.size());
+  for (const auto &pipeline : pipelines) {
+    retval.emplace_back(GetEncryptToShuffler(pipeline.get()));
+  }
+
+  return retval;
+}
+
+std::unique_ptr<uploader::ShippingManager> NewShippingManager(
+    CobaltConfig *cfg, util::FileSystem *fs, observation_store::ObservationStore *observation_store,
+    util::EncryptedMessageMaker *encrypt_to_analyzer,
+    const std::unique_ptr<util::EncryptedMessageMaker> &encrypt_to_shuffler,
+    const std::vector<std::unique_ptr<util::EncryptedMessageMaker>> &extra_encrypt_to_shufflers) {
+  if (cfg->target_pipeline->environment() == system_data::Environment::LOCAL) {
+    CHECK(cfg->extra_pipelines.empty())
+        << "Only one backend environment is supported if one is LOCAL.";
+    return std::make_unique<uploader::LocalShippingManager>(observation_store,
+                                                            cfg->local_shipping_manager_path, fs);
+  }
+  auto shipping_manager = std::make_unique<uploader::ClearcutV1ShippingManager>(
+      uploader::UploadScheduler(cfg->target_interval, cfg->min_interval, cfg->initial_interval),
+      observation_store, encrypt_to_analyzer,
+      std::make_unique<lib::clearcut::ClearcutUploader>(cfg->target_pipeline->clearcut_endpoint(),
+                                                        cfg->target_pipeline->TakeHttpClient()),
+      nullptr, cfg->target_pipeline->clearcut_max_retries(), cfg->api_key);
+
+  shipping_manager->AddClearcutDestination(
+      encrypt_to_shuffler.get(),
+      system_data::ConfigurationData(cfg->target_pipeline->environment()).GetLogSourceId());
+  for (int i = 0; i < cfg->extra_pipelines.size(); i++) {
+    shipping_manager->AddClearcutDestination(
+        extra_encrypt_to_shufflers[i].get(),
+        system_data::ConfigurationData(cfg->extra_pipelines[i]->environment()).GetLogSourceId());
+  }
+
+  return std::move(shipping_manager);
+}
+
+}  // namespace
+
+CobaltService::CobaltService(CobaltConfig cfg)
+    : fs_(std::move(cfg.file_system)),
+      system_data_(cfg.product_name, cfg.board_name_suggestion, cfg.release_stage, cfg.version),
+      observation_store_(NewObservationStore(cfg, fs_.get())),
+      encrypt_to_analyzer_(GetEncryptToAnalyzer(&cfg)),
+      encrypt_to_shuffler_(GetEncryptToShuffler(cfg.target_pipeline.get())),
+      extra_encrypt_to_shufflers_(GetExtraEncryptToShufflers(cfg.extra_pipelines)),
+      shipping_manager_(NewShippingManager(&cfg, fs_.get(), observation_store_.get(),
+                                           encrypt_to_analyzer_.get(), encrypt_to_shuffler_,
+                                           extra_encrypt_to_shufflers_)),
+      logger_encoder_(cfg.client_secret, &system_data_),
+      observation_writer_(observation_store_.get(), shipping_manager_.get(),
+                          encrypt_to_analyzer_.get()),
+      event_aggregator_manager_(cfg, fs_.get(), &logger_encoder_, &observation_writer_),
+      undated_event_manager_(new logger::UndatedEventManager(
+          &logger_encoder_, event_aggregator_manager_.GetEventAggregator(), &observation_writer_,
+          &system_data_)),
+      validated_clock_(cfg.validated_clock),
+      internal_logger_(NewLogger(std::move(cfg.internal_logger_project_context))) {
+  shipping_manager_->Start();
+}
+
+std::unique_ptr<logger::Logger> CobaltService::NewLogger(
+    std::unique_ptr<logger::ProjectContext> project_context) {
+  if (undated_event_manager_) {
+    return std::make_unique<logger::Logger>(
+        std::move(project_context), &logger_encoder_,
+        event_aggregator_manager_.GetEventAggregator(), &observation_writer_, &system_data_,
+        validated_clock_.get(), undated_event_manager_, internal_logger_.get());
+  }
+  return std::make_unique<logger::Logger>(
+      std::move(project_context), &logger_encoder_, event_aggregator_manager_.GetEventAggregator(),
+      &observation_writer_, &system_data_, internal_logger_.get());
+}
+
+void CobaltService::SystemClockIsAccurate(std::unique_ptr<util::SystemClockInterface> system_clock,
+                                          bool start_event_aggregator_worker) {
+  if (undated_event_manager_) {
+    undated_event_manager_->Flush(system_clock.get(), internal_logger_.get());
+    undated_event_manager_.reset();
+  }
+
+  if (start_event_aggregator_worker) {
+    event_aggregator_manager_.Start(std::move(system_clock));
+  }
+}
+
+}  // namespace cobalt
diff --git a/src/public/cobalt_service.h b/src/public/cobalt_service.h
new file mode 100644
index 0000000..9be7e2a
--- /dev/null
+++ b/src/public/cobalt_service.h
@@ -0,0 +1,104 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_SRC_PUBLIC_COBALT_SERVICE_H_
+#define COBALT_SRC_PUBLIC_COBALT_SERVICE_H_
+
+#include <chrono>
+#include <cstddef>
+#include <memory>
+#include <string>
+
+#include "src/lib/clearcut/http_client.h"
+#include "src/lib/util/clock.h"
+#include "src/lib/util/consistent_proto_store.h"
+#include "src/lib/util/encrypted_message_util.h"
+#include "src/local_aggregation/event_aggregator_mgr.h"
+#include "src/logger/logger.h"
+#include "src/logger/observation_writer.h"
+#include "src/logger/project_context.h"
+#include "src/logger/undated_event_manager.h"
+#include "src/observation_store/observation_store.h"
+#include "src/public/cobalt_config.h"
+#include "src/system_data/client_secret.h"
+#include "src/system_data/system_data.h"
+#include "src/uploader/shipping_manager.h"
+
+namespace cobalt {
+
+// CobaltService is the primary public interface for Cobalt on client platforms.
+//
+// It is constructed using a CobaltConfig struct, which provides all available customization options
+// for the behavior of Cobalt.
+//
+// Example:
+//
+// CobaltConfig cfg;
+// cfg.product_name = "product";
+// cfg.version = "version";
+// ... continue setting config values ...
+//
+// CobaltService service(std::move(cfg));
+//
+// // Get a logger:
+//
+// auto logger = service.NewLogger(project_context);
+// logger.LogEvent(Event);
+//
+class CobaltService {
+ public:
+  explicit CobaltService(CobaltConfig cfg);
+
+  // NewLogger returns a new instance of a Logger object based on the provided |project_context|.
+  std::unique_ptr<logger::Logger> NewLogger(
+      std::unique_ptr<logger::ProjectContext> project_context);
+
+  // SystemClockIsAccurate lets CobaltService know that it no longer needs to maintain an
+  // UndatedEventManager, and can flush the data from it into the observation store.
+  //
+  // This method should be used at most once in the lifetime of a CobaltService object.
+  //
+  // |system_clock|: An instance of SystemClockInterface that is used to add a timestamp to all of
+  // the events that were received before the system clock was made accurate. It is then given to
+  // the EventAggregator if |start_event_aggregator_worker| is true.
+  void SystemClockIsAccurate(std::unique_ptr<util::SystemClockInterface> system_clock,
+                             bool start_event_aggregator_worker);
+
+  // system_data returns a pointer to the internal SystemData object. This should only be used for
+  // updating the Expirement state or channel in SystemData.
+  system_data::SystemData *system_data() { return &system_data_; }
+
+ private:
+  friend class internal::RealLoggerFactory;
+
+  observation_store::ObservationStore *observation_store() { return observation_store_.get(); }
+
+  local_aggregation::EventAggregatorManager *event_aggregator_manager() {
+    return &event_aggregator_manager_;
+  }
+
+  uploader::ShippingManager *shipping_manager() { return shipping_manager_.get(); }
+
+  logger::UndatedEventManager *undated_event_manager() { return undated_event_manager_.get(); }
+
+  void ResetLocalAggregation() { event_aggregator_manager_.Reset(); }
+
+  std::unique_ptr<util::FileSystem> fs_;
+  system_data::SystemData system_data_;
+  std::unique_ptr<observation_store::ObservationStore> observation_store_;
+  std::unique_ptr<util::EncryptedMessageMaker> encrypt_to_analyzer_;
+  std::unique_ptr<util::EncryptedMessageMaker> encrypt_to_shuffler_;
+  std::vector<std::unique_ptr<util::EncryptedMessageMaker>> extra_encrypt_to_shufflers_;
+  std::unique_ptr<uploader::ShippingManager> shipping_manager_;
+  logger::Encoder logger_encoder_;
+  logger::ObservationWriter observation_writer_;
+  local_aggregation::EventAggregatorManager event_aggregator_manager_;
+  std::shared_ptr<logger::UndatedEventManager> undated_event_manager_;
+  std::unique_ptr<util::ValidatedClockInterface> validated_clock_;
+  std::unique_ptr<logger::LoggerInterface> internal_logger_;
+};
+
+}  // namespace cobalt
+
+#endif  // COBALT_SRC_PUBLIC_COBALT_SERVICE_H_
diff --git a/src/uploader/shipping_manager.cc b/src/uploader/shipping_manager.cc
index 9dd7069..f49a08c 100644
--- a/src/uploader/shipping_manager.cc
+++ b/src/uploader/shipping_manager.cc
@@ -394,19 +394,16 @@
 }
 
 LocalShippingManager::LocalShippingManager(observation_store::ObservationStore* observation_store,
-                                           std::string output_file_path,
-                                           std::unique_ptr<util::FileSystem> fs)
-    : LocalShippingManager(observation_store, nullptr, std::move(output_file_path), std::move(fs)) {
-}
+                                           std::string output_file_path, util::FileSystem* fs)
+    : LocalShippingManager(observation_store, nullptr, std::move(output_file_path), fs) {}
 
 LocalShippingManager::LocalShippingManager(observation_store::ObservationStore* observation_store,
                                            util::EncryptedMessageMaker* encrypt_to_analyzer,
-                                           std::string output_file_path,
-                                           std::unique_ptr<util::FileSystem> fs)
+                                           std::string output_file_path, util::FileSystem* fs)
     : ShippingManager({std::chrono::seconds::zero(), std::chrono::seconds::zero()},
                       observation_store, encrypt_to_analyzer),
       output_file_path_(std::move(output_file_path)),
-      fs_(std::move(fs)) {
+      fs_(fs) {
   CHECK(fs_);
 }
 
diff --git a/src/uploader/shipping_manager.h b/src/uploader/shipping_manager.h
index b1b9f3c..78f870c 100644
--- a/src/uploader/shipping_manager.h
+++ b/src/uploader/shipping_manager.h
@@ -119,6 +119,10 @@
   size_t num_failed_attempts() const;
   grpc::Status last_send_status() const;
 
+  // Resets the internal metrics for the ShippingManager and the ClearcutUploader to use the
+  // provided logger.
+  virtual void ResetInternalMetrics(logger::LoggerInterface* internal_logger) = 0;
+
  private:
   friend class ClearcutV1ShippingManager;
   friend class LocalShippingManager;
@@ -261,7 +265,7 @@
 
   // Resets the internal metrics for the ShippingManager and the ClearcutUploader to use the
   // provided logger.
-  void ResetInternalMetrics(logger::LoggerInterface* internal_logger = nullptr);
+  void ResetInternalMetrics(logger::LoggerInterface* internal_logger) override;
 
  private:
   std::unique_ptr<observation_store::ObservationStore::EnvelopeHolder> SendEnvelopeToBackend(
@@ -291,16 +295,37 @@
 class LocalShippingManager : public ShippingManager {
  public:
   explicit LocalShippingManager(observation_store::ObservationStore* observation_store,
-                                std::string output_file_path, std::unique_ptr<util::FileSystem> fs);
+                                std::string output_file_path, util::FileSystem* fs);
+
+  // DEPRECATED: Use non-owned FileSystem
+  explicit LocalShippingManager(observation_store::ObservationStore* observation_store,
+                                std::string output_file_path,
+                                std::unique_ptr<util::FileSystem> owned_fs)
+      : LocalShippingManager(observation_store, std::move(output_file_path), owned_fs.get()) {
+    owned_fs_ = std::move(owned_fs);
+  }
 
   explicit LocalShippingManager(observation_store::ObservationStore* observation_store,
                                 util::EncryptedMessageMaker* encrypt_to_analyzer,
-                                std::string output_file_path, std::unique_ptr<util::FileSystem> fs);
+                                std::string output_file_path, util::FileSystem* fs);
+
+  // DEPRECATED: Use non-owned FileSystem
+  explicit LocalShippingManager(observation_store::ObservationStore* observation_store,
+                                util::EncryptedMessageMaker* encrypt_to_analyzer,
+                                std::string output_file_path,
+                                std::unique_ptr<util::FileSystem> owned_fs)
+      : LocalShippingManager(observation_store, encrypt_to_analyzer, std::move(output_file_path),
+                             owned_fs.get()) {
+    owned_fs_ = std::move(owned_fs);
+  }
 
   // The destructor will stop the worker thread and wait for it to stop
   // before exiting.
   ~LocalShippingManager() override = default;
 
+  // We don't want to track internal metrics for LocalShippingManager
+  void ResetInternalMetrics(logger::LoggerInterface* internal_logger) override {}
+
  private:
   std::unique_ptr<observation_store::ObservationStore::EnvelopeHolder> SendEnvelopeToBackend(
       std::unique_ptr<observation_store::ObservationStore::EnvelopeHolder> envelope_to_send)
@@ -309,7 +334,8 @@
   [[nodiscard]] std::string name() const override { return "LocalShippingManager"; }
 
   std::string output_file_path_;
-  const std::unique_ptr<util::FileSystem> fs_;
+  std::unique_ptr<util::FileSystem> owned_fs_;
+  util::FileSystem* fs_;
 };
 
 }  // namespace cobalt::uploader
diff --git a/src/uploader/shipping_manager_test.cc b/src/uploader/shipping_manager_test.cc
index 8b3f649..703f3cd 100644
--- a/src/uploader/shipping_manager_test.cc
+++ b/src/uploader/shipping_manager_test.cc
@@ -591,8 +591,8 @@
  protected:
   void SetUp() override {
     fs_.Delete(test_file_name_);
-    shipping_manager_ = std::make_unique<LocalShippingManager>(
-        &observation_store_, test_file_name_, std::make_unique<util::PosixFileSystem>());
+    shipping_manager_ =
+        std::make_unique<LocalShippingManager>(&observation_store_, test_file_name_, &fs_);
     shipping_manager_->Start();
   }