[archivist] Migrate log budget tests to v2

Bug: 76502
Tested: archivist-integration-tests-v2
Multiply: archivist-integration-tests-v2

Change-Id: Icaa9713fad69237d534ba03d3c5c284eba073c5f
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/532604
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Fuchsia-Auto-Submit: Miguel Flores <miguelfrde@google.com>
Reviewed-by: Christopher Johnson <crjohns@google.com>
API-Review: Christopher Johnson <crjohns@google.com>
diff --git a/src/diagnostics/archivist/tests/BUILD.gn b/src/diagnostics/archivist/tests/BUILD.gn
index ba825ed..e07adba 100644
--- a/src/diagnostics/archivist/tests/BUILD.gn
+++ b/src/diagnostics/archivist/tests/BUILD.gn
@@ -10,7 +10,6 @@
   deps = [
     ":archivist-integration-tests",
     "logs-basic-integration",
-    "logs-budget",
     "logs/cpp:test-spec",
     "logs/cpp:test-spec-structured",
     "v2:tests",
diff --git a/src/diagnostics/archivist/tests/logs-budget/BUILD.gn b/src/diagnostics/archivist/tests/logs-budget/BUILD.gn
deleted file mode 100644
index 153a692..0000000
--- a/src/diagnostics/archivist/tests/logs-budget/BUILD.gn
+++ /dev/null
@@ -1,107 +0,0 @@
-# Copyright 2020 The Fuchsia Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-import("//build/components.gni")
-import("//build/fidl/fidl.gni")
-import("//build/rust/rustc_binary.gni")
-
-group("logs-budget") {
-  testonly = true
-  deps = [ ":package" ]
-}
-
-rustc_binary("test") {
-  testonly = true
-  output_name = "logs_budget"
-
-  deps = [
-    ":fidl-rustc",
-    "//sdk/fidl/fuchsia.diagnostics:fuchsia.diagnostics-rustc",
-    "//sdk/fidl/fuchsia.logger:fuchsia.logger-rustc",
-    "//sdk/fidl/fuchsia.sys:fuchsia.sys-rustc",
-    "//src/diagnostics/archivist:lib",
-    "//src/lib/diagnostics/data/rust",
-    "//src/lib/diagnostics/hierarchy/rust",
-    "//src/lib/diagnostics/reader/rust",
-    "//src/lib/fidl/rust/fidl",
-    "//src/lib/fuchsia-async",
-    "//src/lib/fuchsia-component",
-    "//src/lib/syslog/rust",
-    "//src/lib/zircon/rust:fuchsia-zircon",
-    "//third_party/rust_crates:futures",
-    "//third_party/rust_crates:rand",
-    "//third_party/rust_crates:tracing",
-  ]
-
-  sources = [ "src/main.rs" ]
-}
-
-fuchsia_component("component") {
-  testonly = true
-  component_name = "test-logs-budget"
-  manifest = "meta/test-logs-budget.cmx"
-  deps = [ ":test" ]
-}
-
-fuchsia_component("archivist-with-small-caches") {
-  testonly = true
-  manifest = "//src/diagnostics/archivist/meta/archivist-for-embedding.cmx"
-  deps = [
-    ":small-caches-archivist-config",
-    "//src/diagnostics/archivist:bin",
-  ]
-}
-
-resource("small-caches-archivist-config") {
-  sources = [ "config/small-caches-config.json" ]
-  outputs = "data/embedding-config.json"
-}
-
-fidl("fidl") {
-  testonly = true
-  name = "test.logs.budget"
-  sources = [ "fidl/socket_puppet.test.fidl" ]
-}
-
-rustc_binary("puppet") {
-  testonly = true
-  name = "socket_puppet"
-  source_root = "src/puppet.rs"
-
-  deps = [
-    ":fidl-rustc",
-    "//sdk/fidl/fuchsia.logger:fuchsia.logger-rustc",
-    "//src/lib/fidl/rust/fidl",
-    "//src/lib/fuchsia-async",
-    "//src/lib/fuchsia-component",
-    "//src/lib/zircon/rust:fuchsia-zircon",
-    "//third_party/rust_crates:futures",
-  ]
-
-  sources = [ "src/puppet.rs" ]
-}
-
-puppet_ids = [
-  0,
-  1,
-  2,
-  3,
-  4,
-]
-puppets = []
-foreach(puppet_id, puppet_ids) {
-  puppet_name = "socket-puppet" + puppet_id
-  puppets += [ ":$puppet_name" ]
-  fuchsia_component(puppet_name) {
-    testonly = true
-    manifest = "meta/socket-puppet.cmx"
-    deps = [ ":puppet" ]
-  }
-}
-
-fuchsia_test_package("package") {
-  package_name = "test-logs-budget"
-  test_components = [ ":component" ]
-  deps = [ ":archivist-with-small-caches" ] + puppets
-}
diff --git a/src/diagnostics/archivist/tests/logs-budget/meta/socket-puppet.cmx b/src/diagnostics/archivist/tests/logs-budget/meta/socket-puppet.cmx
deleted file mode 100644
index c04e7e2..0000000
--- a/src/diagnostics/archivist/tests/logs-budget/meta/socket-puppet.cmx
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-    "include": [
-        "syslog/client.shard.cmx"
-    ],
-    "program": {
-        "binary": "bin/socket_puppet"
-    },
-    "sandbox": {
-        "services": [
-            "test.logs.budget.SocketPuppetController"
-        ]
-    }
-}
diff --git a/src/diagnostics/archivist/tests/logs-budget/meta/test-logs-budget.cmx b/src/diagnostics/archivist/tests/logs-budget/meta/test-logs-budget.cmx
deleted file mode 100644
index 6074a2f..0000000
--- a/src/diagnostics/archivist/tests/logs-budget/meta/test-logs-budget.cmx
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-    "include": [
-        "syslog/client.shard.cmx"
-    ],
-    "program": {
-        "binary": "bin/logs_budget"
-    },
-    "sandbox": {
-        "services": [
-            "fuchsia.sys.Environment",
-            "fuchsia.sys.Launcher",
-            "fuchsia.sys.Loader"
-        ]
-    }
-}
diff --git a/src/diagnostics/archivist/tests/logs/cpp/meta/cpp-logs.cml b/src/diagnostics/archivist/tests/logs/cpp/meta/cpp-logs.cml
new file mode 100644
index 0000000..af0ab7b
--- /dev/null
+++ b/src/diagnostics/archivist/tests/logs/cpp/meta/cpp-logs.cml
@@ -0,0 +1,10 @@
+{
+    include: [ "sdk/lib/diagnostics/syslog/client.shard.cml" ],
+    program: {
+        runner: "elf",
+        binary: "bin/logs_integration_cpp_tests",
+    },
+    use: [
+        { protocol: "fuchsia.logger.Log" },
+    ],
+}
diff --git a/src/diagnostics/archivist/tests/v2/BUILD.gn b/src/diagnostics/archivist/tests/v2/BUILD.gn
index 2100432..625e39b 100644
--- a/src/diagnostics/archivist/tests/v2/BUILD.gn
+++ b/src/diagnostics/archivist/tests/v2/BUILD.gn
@@ -21,6 +21,7 @@
     "//sdk/fidl/fuchsia.sys.internal:fuchsia.sys.internal-rustc",
     "//sdk/fidl/fuchsia.sys2:fuchsia.sys2-rustc",
     "//src/diagnostics/archivist:lib",
+    "//src/diagnostics/archivist/tests/v2/components/fidl:fidl-rustc",
     "//src/lib/diagnostics/data/rust",
     "//src/lib/diagnostics/hierarchy/rust",
     "//src/lib/diagnostics/reader/rust",
@@ -38,6 +39,7 @@
     "//third_party/rust_crates:difference",
     "//third_party/rust_crates:futures",
     "//third_party/rust_crates:lazy_static",
+    "//third_party/rust_crates:rand",
     "//third_party/rust_crates:serde_json",
     "//third_party/rust_crates:tracing",
   ]
@@ -51,6 +53,7 @@
     "src/inspect/truncation.rs",
     "src/lib.rs",
     "src/logs/attribution.rs",
+    "src/logs/budget.rs",
     "src/logs/crash.rs",
     "src/logs/lifecycle.rs",
     "src/logs/mod.rs",
@@ -58,6 +61,7 @@
     "src/logs/sorting.rs",
     "src/logs/utils.rs",
     "src/test_topology.rs",
+    "src/utils.rs",
   ]
   inputs = [
     "test_data/unified_reader_all_golden.json",
@@ -83,6 +87,13 @@
   manifest = "meta/archivist_for_integration.cml"
 }
 
+fuchsia_component("archivist-with-small-caches") {
+  component_name = "archivist-with-small-caches"
+  testonly = true
+  deps = [ "//src/diagnostics/archivist:bin" ]
+  manifest = "meta/archivist-with-small-caches.cml"
+}
+
 fuchsia_component("archivist_integration_tests") {
   testonly = true
   deps = [ ":integration_tests_bin" ]
@@ -94,6 +105,11 @@
   outputs = [ "data/config/archivist_config.json" ]
 }
 
+resource("small-caches-config") {
+  sources = [ "configs/small-caches-config.json" ]
+  outputs = [ "data/config/small-caches-config.json" ]
+}
+
 resource("filter_feedback") {
   sources = [ "configs/static_selectors.cfg" ]
   outputs = [ "data/config/pipelines/feedback/static_selectors.cfg" ]
@@ -107,8 +123,10 @@
 fuchsia_test_package("archivist-integration-tests-v2") {
   test_components = [ ":archivist_integration_tests" ]
   deps = [
+    ":archivist-with-small-caches",
     ":archivist_for_integration",
     ":archivist_for_integration_with_log_connector",
+    ":small-caches-config",
     ":test_archivist_config",
     "components",
     "//src/diagnostics/iquery/test/test_component",
diff --git a/src/diagnostics/archivist/tests/v2/components/BUILD.gn b/src/diagnostics/archivist/tests/v2/components/BUILD.gn
index 7037390..64b583c 100644
--- a/src/diagnostics/archivist/tests/v2/components/BUILD.gn
+++ b/src/diagnostics/archivist/tests/v2/components/BUILD.gn
@@ -78,12 +78,37 @@
   manifest = "meta/log-and-crash.cml"
 }
 
+rustc_binary("socket_puppet_bin") {
+  testonly = true
+  name = "socket_puppet"
+  source_root = "src/socket_puppet.rs"
+
+  deps = [
+    "//sdk/fidl/fuchsia.logger:fuchsia.logger-rustc",
+    "//src/diagnostics/archivist/tests/v2/components/fidl:fidl-rustc",
+    "//src/lib/fidl/rust/fidl",
+    "//src/lib/fuchsia-async",
+    "//src/lib/fuchsia-component",
+    "//src/lib/zircon/rust:fuchsia-zircon",
+    "//third_party/rust_crates:futures",
+  ]
+
+  sources = [ "src/socket_puppet.rs" ]
+}
+
+fuchsia_component("socket-puppet") {
+  testonly = true
+  manifest = "meta/socket-puppet.cml"
+  deps = [ ":socket_puppet_bin" ]
+}
+
 group("components") {
   testonly = true
   deps = [
     ":component_with_children",
     ":log_and_crash_component",
     ":log_and_exit_component",
+    ":socket-puppet",
     ":stub_inspect_component",
   ]
 }
diff --git a/src/diagnostics/archivist/tests/v2/components/fidl/BUILD.gn b/src/diagnostics/archivist/tests/v2/components/fidl/BUILD.gn
new file mode 100644
index 0000000..1c06b39
--- /dev/null
+++ b/src/diagnostics/archivist/tests/v2/components/fidl/BUILD.gn
@@ -0,0 +1,11 @@
+# Copyright 2021 The Fuchsia Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//build/fidl/fidl.gni")
+
+fidl("fidl") {
+  testonly = true
+  name = "fuchsia.archivist.tests"
+  sources = [ "socket_puppet.test.fidl" ]
+}
diff --git a/src/diagnostics/archivist/tests/logs-budget/fidl/socket_puppet.test.fidl b/src/diagnostics/archivist/tests/v2/components/fidl/socket_puppet.test.fidl
similarity index 93%
rename from src/diagnostics/archivist/tests/logs-budget/fidl/socket_puppet.test.fidl
rename to src/diagnostics/archivist/tests/v2/components/fidl/socket_puppet.test.fidl
index a3cc77c..61cce20 100644
--- a/src/diagnostics/archivist/tests/logs-budget/fidl/socket_puppet.test.fidl
+++ b/src/diagnostics/archivist/tests/v2/components/fidl/socket_puppet.test.fidl
@@ -1,7 +1,7 @@
 // Copyright 2020 The Fuchsia Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
-library test.logs.budget;
+library fuchsia.archivist.tests;
 
 @discoverable
 protocol SocketPuppetController {
diff --git a/src/diagnostics/archivist/tests/v2/components/meta/socket-puppet.cml b/src/diagnostics/archivist/tests/v2/components/meta/socket-puppet.cml
new file mode 100644
index 0000000..7836fa4
--- /dev/null
+++ b/src/diagnostics/archivist/tests/v2/components/meta/socket-puppet.cml
@@ -0,0 +1,10 @@
+{
+    include: [ "syslog/client.shard.cml" ],
+    program: {
+        runner: "elf",
+        binary: "bin/socket_puppet",
+    },
+    use: [
+        { protocol: "fuchsia.archivist.tests.SocketPuppetController" },
+    ],
+}
diff --git a/src/diagnostics/archivist/tests/logs-budget/src/puppet.rs b/src/diagnostics/archivist/tests/v2/components/src/socket_puppet.rs
similarity index 96%
rename from src/diagnostics/archivist/tests/logs-budget/src/puppet.rs
rename to src/diagnostics/archivist/tests/v2/components/src/socket_puppet.rs
index cfddb7f..1a58517 100644
--- a/src/diagnostics/archivist/tests/logs-budget/src/puppet.rs
+++ b/src/diagnostics/archivist/tests/v2/components/src/socket_puppet.rs
@@ -3,10 +3,10 @@
 // found in the LICENSE file.
 
 use fidl::endpoints::create_request_stream;
-use fidl_fuchsia_logger::LogSinkMarker;
-use fidl_test_logs_budget::{
+use fidl_fuchsia_archivist_tests::{
     SocketPuppetControllerMarker, SocketPuppetMarker, SocketPuppetRequest,
 };
+use fidl_fuchsia_logger::LogSinkMarker;
 use fuchsia_component::client::connect_to_protocol;
 use fuchsia_zircon as zx;
 use futures::StreamExt;
diff --git a/src/diagnostics/archivist/tests/logs-budget/config/small-caches-config.json b/src/diagnostics/archivist/tests/v2/configs/small-caches-config.json
similarity index 100%
rename from src/diagnostics/archivist/tests/logs-budget/config/small-caches-config.json
rename to src/diagnostics/archivist/tests/v2/configs/small-caches-config.json
diff --git a/src/diagnostics/archivist/tests/v2/meta/archivist-with-small-caches.cml b/src/diagnostics/archivist/tests/v2/meta/archivist-with-small-caches.cml
new file mode 100644
index 0000000..26e0490
--- /dev/null
+++ b/src/diagnostics/archivist/tests/v2/meta/archivist-with-small-caches.cml
@@ -0,0 +1,17 @@
+{
+    include: [
+        "//src/diagnostics/archivist/meta/common.shard.cml",
+        "//src/diagnostics/archivist/tests/v2/meta/test.shard.cml",
+        "syslog/client.shard.cml",
+    ],
+    program: {
+        binary: "bin/archivist",
+        args: [
+            "--disable-klog",
+            "--disable-log-connector",
+            "--disable-component-event-provider",
+            "--config-path",
+            "/pkg/data/config/small-caches-config.json",
+        ],
+    },
+}
diff --git a/src/diagnostics/archivist/tests/v2/meta/archivist_integration_tests.cml b/src/diagnostics/archivist/tests/v2/meta/archivist_integration_tests.cml
index d5dd260..293d4ba 100644
--- a/src/diagnostics/archivist/tests/v2/meta/archivist_integration_tests.cml
+++ b/src/diagnostics/archivist/tests/v2/meta/archivist_integration_tests.cml
@@ -35,5 +35,20 @@
             to: "#fuchsia_component_test_collection",
             filter: { name: "fuchsia.logger.LogSink" },
         },
+        {
+            event: "stopped",
+            from: "framework",
+            to: "#fuchsia_component_test_collection",
+        },
+        {
+            event: "started",
+            from: "framework",
+            to: "#fuchsia_component_test_collection",
+        },
+        {
+            event: "running",
+            from: "framework",
+            to: "#fuchsia_component_test_collection",
+        },
     ],
 }
diff --git a/src/diagnostics/archivist/tests/v2/src/constants.rs b/src/diagnostics/archivist/tests/v2/src/constants.rs
index 2a1e50f..b114139 100644
--- a/src/diagnostics/archivist/tests/v2/src/constants.rs
+++ b/src/diagnostics/archivist/tests/v2/src/constants.rs
@@ -6,12 +6,16 @@
     "fuchsia-pkg://fuchsia.com/archivist-with-feedback-filtering#meta/archivist.cm";
 pub const ARCHIVIST_WITH_FEEDBACK_FILTERING_DISABLED: &str =
     "fuchsia-pkg://fuchsia.com/archivist-with-feedback-filtering-disabled#meta/archivist.cm";
+pub const ARCHIVIST_WITH_SMALL_CACHES: &str =
+    "fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/archivist-with-small-caches.cm";
 pub const COMPONENT_WITH_CHILDREN_URL: &str =
     "fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/component_with_children.cm";
 pub const IQUERY_TEST_COMPONENT_URL: &str =
     "fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/test_component.cm";
 pub const LOG_AND_CRASH_COMPONENT_URL: &str =
     "fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/log-and-crash.cm";
+pub const SOCKET_PUPPET_COMPONENT_URL: &str =
+    "fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/socket-puppet.cm";
 pub const LOG_AND_EXIT_COMPONENT_URL: &str =
     "fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/log-and-exit.cm";
 pub const STUB_INSPECT_COMPONENT_URL: &str =
diff --git a/src/diagnostics/archivist/tests/v2/src/lib.rs b/src/diagnostics/archivist/tests/v2/src/lib.rs
index 5475724..0477ac2 100644
--- a/src/diagnostics/archivist/tests/v2/src/lib.rs
+++ b/src/diagnostics/archivist/tests/v2/src/lib.rs
@@ -6,3 +6,4 @@
 mod inspect;
 mod logs;
 mod test_topology;
+mod utils;
diff --git a/src/diagnostics/archivist/tests/logs-budget/src/main.rs b/src/diagnostics/archivist/tests/v2/src/logs/budget.rs
similarity index 70%
rename from src/diagnostics/archivist/tests/logs-budget/src/main.rs
rename to src/diagnostics/archivist/tests/v2/src/logs/budget.rs
index 66f2491a..ceae93c 100644
--- a/src/diagnostics/archivist/tests/logs-budget/src/main.rs
+++ b/src/diagnostics/archivist/tests/v2/src/logs/budget.rs
@@ -1,26 +1,26 @@
-// Copyright 2020 The Fuchsia Authors. All rights reserved.
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+use crate::{constants::*, test_topology, utils};
+use anyhow::Error;
 use archivist_lib::{
     configs::parse_config,
     logs::message::{fx_log_packet_t, METADATA_SIZE},
 };
+use component_events::{events::*, matcher::ExitStatusMatcher};
 use diagnostics_data::{Data, LogError, Logs, Severity};
 use diagnostics_hierarchy::trie::TrieIterableNode;
 use diagnostics_reader::{ArchiveReader, Inspect, SubscriptionResultsStream};
-use fidl::endpoints::ProtocolMarker;
-use fidl_fuchsia_diagnostics::ArchiveAccessorMarker;
-use fidl_fuchsia_logger::LogSinkMarker;
-use fidl_fuchsia_sys::{ComponentControllerEvent::OnTerminated, LauncherProxy};
-use fidl_test_logs_budget::{
+use fidl_fuchsia_archivist_tests::{
     SocketPuppetControllerRequest, SocketPuppetControllerRequestStream, SocketPuppetProxy,
 };
+use fidl_fuchsia_diagnostics::ArchiveAccessorMarker;
+use fidl_fuchsia_io::DirectoryMarker;
+use fidl_fuchsia_sys2::{ChildRef, EventSourceMarker, RealmMarker};
 use fuchsia_async::{Task, Timer};
-use fuchsia_component::{
-    client::{launch, launch_with_options, App, LaunchOptions},
-    server::ServiceFs,
-};
+use fuchsia_component::{client, server::ServiceFs};
+use fuchsia_component_test::{builder::*, mock, RealmInstance};
 use fuchsia_zircon as zx;
 use futures::{
     channel::mpsc::{self, Receiver},
@@ -30,34 +30,32 @@
 use std::{collections::BTreeMap, ops::Deref, time::Duration};
 use tracing::{debug, info, trace};
 
-const ARCHIVIST_URL: &str =
-    "fuchsia-pkg://fuchsia.com/test-logs-budget#meta/archivist-with-small-caches.cmx";
-
 const TEST_PACKET_LEN: usize = 49;
+const MAX_PUPPETS: usize = 5;
 
-#[fuchsia_async::run_singlethreaded]
-async fn main() {
+#[fuchsia_async::run_singlethreaded(test)]
+async fn test_budget() {
     fuchsia_syslog::init().unwrap();
     fuchsia_syslog::set_severity(fuchsia_syslog::levels::DEBUG);
 
     info!("testing that the archivist's log buffers correctly enforce their budget");
 
     info!("creating nested environment for collecting diagnostics");
-    let mut env = PuppetEnv::create().await;
+    let mut env = PuppetEnv::create(MAX_PUPPETS).await;
 
     info!("check that archivist log state is clean");
     env.assert_archivist_state_matches_expected().await;
 
-    for i in 0..5 {
+    for i in 0..MAX_PUPPETS {
         env.launch_puppet(i).await;
     }
     env.validate().await;
 }
 
 struct PuppetEnv {
-    launcher: LauncherProxy,
+    max_puppets: usize,
+    instance: RealmInstance,
     controllers: Receiver<SocketPuppetControllerRequestStream>,
-    _archivist: App,
     messages_allowed_in_cache: usize,
     messages_sent: Vec<MessageReceipt>,
     launched_monikers: Vec<String>,
@@ -66,57 +64,65 @@
     log_reader: ArchiveReader,
     log_subscription: SubscriptionResultsStream<Logs>,
     rng: StdRng,
-    _serve_fs: Task<()>,
     _log_errors: Task<()>,
 }
 
 impl PuppetEnv {
-    async fn create() -> Self {
-        let (mut sender, controllers) = mpsc::channel(1);
-        let mut fs = ServiceFs::new();
-        fs.add_fidl_service(move |requests: SocketPuppetControllerRequestStream| {
-            debug!("got controller request, forwarding back to main");
-            sender.start_send(requests).unwrap();
-        });
+    async fn create(max_puppets: usize) -> Self {
+        let (sender, controllers) = mpsc::channel(1);
+        let mut builder = test_topology::create(test_topology::Options {
+            archivist_url: ARCHIVIST_WITH_SMALL_CACHES,
+        })
+        .await
+        .expect("create base topology");
+        builder
+            .add_component(
+                "mocks-server",
+                ComponentSource::Mock(mock::Mock::new(move |mock_handles: mock::MockHandles| {
+                    Box::pin(run_mocks(mock_handles, sender.clone()))
+                })),
+            )
+            .await
+            .unwrap();
 
-        let env = fs.create_salted_nested_environment("diagnostics").unwrap();
-        let launcher = env.launcher().clone();
-        let _serve_fs = Task::spawn(async move {
-            let _env = env; // move env into the task so it stays alive
-            fs.collect::<()>().await
-        });
-
-        // creating a proxy to logsink in our own environment, otherwise embedded archivist just
-        // eats its own logs via logconnector
-        let options = {
-            let mut options = LaunchOptions::new();
-            let (dir_client, dir_server) = zx::Channel::create().unwrap();
-            let mut fs = ServiceFs::new();
-            fs.add_proxy_service::<LogSinkMarker, _>().serve_connection(dir_server).unwrap();
-            Task::spawn(fs.collect()).detach();
-            options.set_additional_services(vec![LogSinkMarker::NAME.to_string()], dir_client);
-            options
-        };
-
-        info!("starting our archivist");
-        let _archivist =
-            launch_with_options(&launcher, ARCHIVIST_URL.to_string(), None, options).unwrap();
-        let config = parse_config("/pkg/data/embedding-config.json").unwrap();
-
-        let mut archivist_events = _archivist.controller().take_event_stream();
-        if let OnTerminated { .. } = archivist_events.next().await.unwrap().unwrap() {
-            panic!("archivist terminated early");
+        for i in 0..max_puppets {
+            let name = format!("test/puppet-{}", i);
+            builder
+                .add_component(name.clone(), ComponentSource::url(SOCKET_PUPPET_COMPONENT_URL))
+                .await
+                .unwrap()
+                .add_route(CapabilityRoute {
+                    capability: Capability::protocol(
+                        "fuchsia.archivist.tests.SocketPuppetController",
+                    ),
+                    source: RouteEndpoint::component("mocks-server"),
+                    targets: vec![RouteEndpoint::component(name.clone())],
+                })
+                .unwrap()
+                .add_route(CapabilityRoute {
+                    capability: Capability::protocol("fuchsia.logger.LogSink"),
+                    source: RouteEndpoint::component("test/archivist"),
+                    targets: vec![RouteEndpoint::component(name)],
+                })
+                .unwrap();
         }
 
+        info!("starting our instance");
+        let mut realm = builder.build();
+        test_topology::expose_test_realm_protocol(&mut realm).await;
+        let instance = realm.create().await.expect("create instance");
+
+        let config = parse_config("/pkg/data/config/small-caches-config.json").unwrap();
         let messages_allowed_in_cache = config.logs.max_cached_original_bytes / TEST_PACKET_LEN;
 
-        let archive = || _archivist.connect_to_protocol::<ArchiveAccessorMarker>().unwrap();
+        let archive =
+            || instance.root.connect_to_protocol_at_exposed_dir::<ArchiveAccessorMarker>().unwrap();
         let mut inspect_reader = ArchiveReader::new();
         inspect_reader
             .with_archive(archive())
             .with_minimum_schema_count(1) // we only request inspect from our archivist
-            .add_selector("archivist-with-small-caches.cmx:root/logs_buffer")
-            .add_selector("archivist-with-small-caches.cmx:root/sources");
+            .add_selector("archivist:root/logs_buffer")
+            .add_selector("archivist:root/sources");
         let mut log_reader = ArchiveReader::new();
         log_reader
             .with_archive(archive())
@@ -132,9 +138,9 @@
         });
 
         Self {
-            launcher,
+            max_puppets,
             controllers,
-            _archivist,
+            instance,
             messages_allowed_in_cache,
             messages_sent: vec![],
             launched_monikers: vec![],
@@ -143,16 +149,18 @@
             log_reader,
             log_subscription,
             rng: StdRng::seed_from_u64(0xA455),
-            _serve_fs,
             _log_errors,
         }
     }
 
     async fn launch_puppet(&mut self, id: usize) {
-        let url =
-            format!("fuchsia-pkg://fuchsia.com/test-logs-budget#meta/socket-puppet{}.cmx", id);
-        info!(%url, "launching puppet");
-        let app = launch(&self.launcher, url, None).unwrap();
+        assert!(id < self.max_puppets);
+        let mut child_ref = ChildRef { name: format!("puppet-{}", id), collection: None };
+
+        let (_client_end, server_end) =
+            fidl::endpoints::create_endpoints::<DirectoryMarker>().unwrap();
+        let realm = self.instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
+        realm.bind_child(&mut child_ref, server_end).await.unwrap().unwrap();
 
         debug!("waiting for controller request");
         let mut controller = self.controllers.next().await.unwrap();
@@ -169,14 +177,18 @@
             _ => panic!("did not expect that"),
         };
 
-        let moniker = format!("socket-puppet{}.cmx", id);
-        let puppet = Puppet { app, moniker, proxy };
+        let moniker = format!(
+            "fuchsia_component_test_collection:{}/test/puppet-{}",
+            self.instance.root.child_name(),
+            id
+        );
+        let puppet = Puppet { id, moniker: moniker.clone(), proxy };
 
         info!("having the puppet connect to LogSink");
         puppet.connect_to_log_sink().await.unwrap();
 
         info!("observe the puppet appears in archivist's inspect output");
-        self.launched_monikers.push(puppet.moniker.clone());
+        self.launched_monikers.push(moniker);
         self.running_puppets.push(puppet);
 
         // wait for archivist to catch up with what we launched
@@ -264,8 +276,7 @@
         let mut dropped_message_warnings = BTreeMap::new();
         for observed in observed_logs {
             if observed.metadata.errors.is_some() {
-                let moniker = observed.moniker.split(":").next().unwrap().to_string();
-                dropped_message_warnings.insert(moniker, observed);
+                dropped_message_warnings.insert(observed.moniker.clone(), observed);
             } else {
                 let expected = expected_logs.next().unwrap();
                 assert_eq!(expected, &observed);
@@ -293,16 +304,31 @@
         // messages for the stopped component and actually dropping it
         let iteration_for_killing_a_puppet = self.messages_allowed_in_cache;
 
+        let event_source =
+            EventSource::from_proxy(client::connect_to_protocol::<EventSourceMarker>().unwrap());
+        let mut event_stream = event_source
+            .subscribe(vec![EventSubscription::new(vec![Stopped::NAME], EventMode::Async)])
+            .await
+            .unwrap();
+
         info!("having the puppets log packets until overflow");
         for i in 0..overall_messages_to_log {
             trace!(i, "loop ticked");
             if i == iteration_for_killing_a_puppet {
-                let mut to_stop = self.running_puppets.pop().unwrap();
+                let to_stop = self.running_puppets.pop().unwrap();
                 let receipt = to_stop.emit_packet().await;
                 self.check_receipt(receipt).await;
 
-                to_stop.app.kill().unwrap();
-                to_stop.app.wait().await.unwrap();
+                let id = to_stop.id;
+                drop(to_stop);
+
+                utils::wait_for_component_stopped_event(
+                    &self.instance.root.child_name(),
+                    &format!("puppet-{}", id),
+                    ExitStatusMatcher::Clean,
+                    &mut event_stream,
+                )
+                .await;
             }
 
             let puppet = self.running_puppets.choose(&mut self.rng).unwrap();
@@ -330,7 +356,7 @@
 struct Puppet {
     proxy: SocketPuppetProxy,
     moniker: String,
-    app: App,
+    id: usize,
 }
 
 impl std::fmt::Debug for Puppet {
@@ -358,6 +384,19 @@
     }
 }
 
+async fn run_mocks(
+    mock_handles: mock::MockHandles,
+    mut sender: mpsc::Sender<SocketPuppetControllerRequestStream>,
+) -> Result<(), Error> {
+    let mut fs = ServiceFs::new();
+    fs.dir("svc").add_fidl_service(move |stream: SocketPuppetControllerRequestStream| {
+        sender.start_send(stream).unwrap();
+    });
+    fs.serve_connection(mock_handles.outgoing_dir.into_channel())?;
+    fs.collect::<()>().await;
+    Ok(())
+}
+
 #[derive(Clone, Copy, Debug, PartialEq)]
 struct Count {
     total: usize,
diff --git a/src/diagnostics/archivist/tests/v2/src/logs/crash.rs b/src/diagnostics/archivist/tests/v2/src/logs/crash.rs
index dc98867..cbbc7b2 100644
--- a/src/diagnostics/archivist/tests/v2/src/logs/crash.rs
+++ b/src/diagnostics/archivist/tests/v2/src/logs/crash.rs
@@ -2,14 +2,13 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use crate::{constants::*, test_topology};
-use component_events::{events::*, matcher::*};
+use crate::{constants::*, test_topology, utils};
+use component_events::matcher::ExitStatusMatcher;
 use diagnostics_reader::{assert_data_tree, ArchiveReader, Logs, Severity};
 use fidl_fuchsia_diagnostics::ArchiveAccessorMarker;
 use fidl_fuchsia_io::DirectoryMarker;
-use fidl_fuchsia_sys2::{ChildRef, EventSourceMarker, RealmMarker};
+use fidl_fuchsia_sys2::{ChildRef, RealmMarker};
 use fuchsia_async::Task;
-use fuchsia_component::client;
 use futures::prelude::*;
 
 #[fuchsia::test]
@@ -36,29 +35,18 @@
         }
     });
 
-    let event_source =
-        EventSource::from_proxy(client::connect_to_protocol::<EventSourceMarker>().unwrap());
-    let mut event_stream = event_source
-        .subscribe(vec![EventSubscription::new(vec![Stopped::NAME], EventMode::Async)])
-        .await
-        .unwrap();
-
     let mut child_ref = ChildRef { name: "log_and_crash".to_string(), collection: None };
-    reader.retry_if_empty(true);
     // launch our child and wait for it to exit before asserting on its logs
     let (_client_end, server_end) = fidl::endpoints::create_endpoints::<DirectoryMarker>().unwrap();
     let realm = instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
     realm.bind_child(&mut child_ref, server_end).await.unwrap().unwrap();
 
-    EventMatcher::ok()
-        .stop(Some(ExitStatusMatcher::AnyCrash))
-        .moniker(format!(
-            "./fuchsia_component_test_collection:{}:\\d+/test:\\d+/log_and_crash:\\d+",
-            instance.root.child_name()
-        ))
-        .wait::<Stopped>(&mut event_stream)
-        .await
-        .unwrap();
+    utils::wait_for_component_stopped(
+        &instance.root.child_name(),
+        "log_and_crash",
+        ExitStatusMatcher::AnyCrash,
+    )
+    .await;
 
     let crasher_info = logs.next().await.unwrap();
     assert_eq!(crasher_info.metadata.severity, Severity::Info);
diff --git a/src/diagnostics/archivist/tests/v2/src/logs/lifecycle.rs b/src/diagnostics/archivist/tests/v2/src/logs/lifecycle.rs
index 5946c78..72a618b 100644
--- a/src/diagnostics/archivist/tests/v2/src/logs/lifecycle.rs
+++ b/src/diagnostics/archivist/tests/v2/src/logs/lifecycle.rs
@@ -5,6 +5,7 @@
 use crate::{
     constants::*,
     test_topology::{self, expose_test_realm_protocol},
+    utils,
 };
 use component_events::{events::*, matcher::*};
 use diagnostics_reader::{assert_data_tree, ArchiveReader, Data, Logs};
@@ -158,7 +159,6 @@
 
     let event_source =
         EventSource::from_proxy(client::connect_to_protocol::<EventSourceMarker>().unwrap());
-
     let mut event_stream = event_source
         .subscribe(vec![EventSubscription::new(vec![Stopped::NAME], EventMode::Async)])
         .await
@@ -173,15 +173,13 @@
         let realm = instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
         realm.bind_child(&mut child_ref, server_end).await.unwrap().unwrap();
 
-        EventMatcher::ok()
-            .stop(Some(ExitStatusMatcher::Clean))
-            .moniker(format!(
-                "./fuchsia_component_test_collection:{}:\\d+/test:\\d+/log_and_exit:\\d+",
-                instance.root.child_name()
-            ))
-            .wait::<Stopped>(&mut event_stream)
-            .await
-            .unwrap();
+        utils::wait_for_component_stopped_event(
+            &instance.root.child_name(),
+            "log_and_exit",
+            ExitStatusMatcher::Clean,
+            &mut event_stream,
+        )
+        .await;
 
         check_message(&moniker, subscription.next().await.unwrap());
 
diff --git a/src/diagnostics/archivist/tests/v2/src/logs/mod.rs b/src/diagnostics/archivist/tests/v2/src/logs/mod.rs
index 41fc4af..8815e23 100644
--- a/src/diagnostics/archivist/tests/v2/src/logs/mod.rs
+++ b/src/diagnostics/archivist/tests/v2/src/logs/mod.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 mod attribution;
+mod budget;
 mod crash;
 mod lifecycle;
 mod redaction;
diff --git a/src/diagnostics/archivist/tests/v2/src/test_topology.rs b/src/diagnostics/archivist/tests/v2/src/test_topology.rs
index 56d42d2..b8c7690 100644
--- a/src/diagnostics/archivist/tests/v2/src/test_topology.rs
+++ b/src/diagnostics/archivist/tests/v2/src/test_topology.rs
@@ -45,17 +45,17 @@
         })?
         .add_route(CapabilityRoute {
             capability: Capability::Event(Event::Started, cm_rust::EventMode::Async),
-            source: RouteEndpoint::component("test"),
+            source: RouteEndpoint::AboveRoot,
             targets: vec![RouteEndpoint::component("test/archivist")],
         })?
         .add_route(CapabilityRoute {
             capability: Capability::Event(Event::Stopped, cm_rust::EventMode::Async),
-            source: RouteEndpoint::component("test"),
+            source: RouteEndpoint::AboveRoot,
             targets: vec![RouteEndpoint::component("test/archivist")],
         })?
         .add_route(CapabilityRoute {
             capability: Capability::Event(Event::Running, cm_rust::EventMode::Async),
-            source: RouteEndpoint::component("test"),
+            source: RouteEndpoint::AboveRoot,
             targets: vec![RouteEndpoint::component("test/archivist")],
         })?
         .add_route(CapabilityRoute {
diff --git a/src/diagnostics/archivist/tests/v2/src/utils.rs b/src/diagnostics/archivist/tests/v2/src/utils.rs
new file mode 100644
index 0000000..ec8c1e2
--- /dev/null
+++ b/src/diagnostics/archivist/tests/v2/src/utils.rs
@@ -0,0 +1,45 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use component_events::{events::*, matcher::*};
+use fidl_fuchsia_sys2::EventSourceMarker;
+use fuchsia_component::client;
+
+pub async fn wait_for_component_stopped(
+    instance_child_name: &str,
+    component: &str,
+    status_match: ExitStatusMatcher,
+) {
+    let event_source =
+        EventSource::from_proxy(client::connect_to_protocol::<EventSourceMarker>().unwrap());
+    let mut event_stream = event_source
+        .subscribe(vec![EventSubscription::new(vec![Stopped::NAME], EventMode::Async)])
+        .await
+        .unwrap();
+    wait_for_component_stopped_event(
+        instance_child_name,
+        component,
+        status_match,
+        &mut event_stream,
+    )
+    .await;
+}
+
+pub async fn wait_for_component_stopped_event(
+    instance_child_name: &str,
+    component: &str,
+    status_match: ExitStatusMatcher,
+    event_stream: &mut EventStream,
+) {
+    let moniker_for_match = format!(
+        "./fuchsia_component_test_collection:{}:\\d+/test:\\d+/{}:\\d+",
+        instance_child_name, component
+    );
+    EventMatcher::ok()
+        .stop(Some(status_match))
+        .moniker(moniker_for_match)
+        .wait::<Stopped>(event_stream)
+        .await
+        .unwrap();
+}