[logs] Add an out-of-band BudgetManager.

Prefactor for fxrev.dev/470522, lets us manage multiple buffers
centrally. Notably does not introduce asynchrony to the ingestion flow,
as we've previously discovered it's easy to blow our heap up a lot
with that.

Bug: 47661
Change-Id: I4efd4b05f2f21529d28f80ce6b6d4845626048ff
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/475865
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Reviewed-by: Miguel Flores <miguelfrde@google.com>
Fuchsia-Auto-Submit: Adam Perry <adamperry@google.com>
diff --git a/src/diagnostics/archivist/BUILD.gn b/src/diagnostics/archivist/BUILD.gn
index d861b8d..7d61c24 100644
--- a/src/diagnostics/archivist/BUILD.gn
+++ b/src/diagnostics/archivist/BUILD.gn
@@ -102,6 +102,7 @@
     "src/lib.rs",
     "src/lifecycle/container.rs",
     "src/lifecycle/mod.rs",
+    "src/logs/budget.rs",
     "src/logs/buffer.rs",
     "src/logs/buffer/arc_list.rs",
     "src/logs/container.rs",
diff --git a/src/diagnostics/archivist/src/archivist.rs b/src/diagnostics/archivist/src/archivist.rs
index b55e808..722711b 100644
--- a/src/diagnostics/archivist/src/archivist.rs
+++ b/src/diagnostics/archivist/src/archivist.rs
@@ -17,7 +17,9 @@
                 EventSource,
             },
         },
-        logs::{redact::Redactor, socket::LogMessageSocket},
+        logs::{
+            budget::BudgetManager, buffer::ArcList, redact::Redactor, socket::LogMessageSocket,
+        },
         pipeline::Pipeline,
         repository::DataRepo,
     },
@@ -141,10 +143,12 @@
             && feedback_config.has_error())
             || (Path::new("/config/data/legacy_metrics").is_dir() && legacy_config.has_error()));
 
-        let diagnostics_repo = DataRepo::new(
+        let logs_buffer = ArcList::default();
+        let logs_budget = BudgetManager::new(
             archivist_configuration.logs.max_cached_original_bytes,
-            diagnostics::root(),
+            &logs_buffer,
         );
+        let diagnostics_repo = DataRepo::new(logs_buffer, &logs_budget, diagnostics::root());
 
         // The Inspect Repository offered to the ALL_ACCESS pipeline. This
         // repository is unique in that it has no statically configured
@@ -562,10 +566,7 @@
                 diagnostics_ready_data.metadata.timestamp.clone(),
             )
             .unwrap_or_else(|e| {
-                warn!(
-                    component = ?identity, ?e,
-                    "Failed to add inspect artifacts to repository"
-                );
+                warn!(%identity, ?e, "Failed to add inspect artifacts to repository");
             });
 
         // Let each pipeline know that a new component arrived, and allow the pipeline
@@ -574,10 +575,7 @@
         for pipeline in &locked_state.diagnostics_pipelines {
             pipeline.write().add_inspect_artifacts(&identity.relative_moniker).unwrap_or_else(
                 |e| {
-                    warn!(
-                        component = ?identity, ?e,
-                        "Failed to add inspect artifacts to pipeline wrapper"
-                    );
+                    warn!(%identity, ?e, "Failed to add inspect artifacts to pipeline wrapper");
                 },
             );
         }
@@ -595,7 +593,7 @@
             event_timestamp,
             component_start_time,
         ) {
-            error!(?identity, ?e, "Failed to add new component to repository");
+            error!(%identity, ?e, "Failed to add new component to repository");
         }
     }
 
@@ -613,13 +611,13 @@
         match event {
             ComponentEvent::Start(start) => {
                 let archived_metadata = start.metadata.clone();
-                debug!(identity = ?start.metadata.identity, "Adding new component.");
+                debug!(identity = %start.metadata.identity, "Adding new component.");
                 self.add_new_component(start.metadata.identity, start.metadata.timestamp, None);
                 self.archive_event("START", archived_metadata).await
             }
             ComponentEvent::Running(running) => {
                 let archived_metadata = running.metadata.clone();
-                debug!(identity = ?running.metadata.identity, "Component is running.");
+                debug!(identity = %running.metadata.identity, "Component is running.");
                 self.add_new_component(
                     running.metadata.identity,
                     running.metadata.timestamp,
@@ -630,13 +628,13 @@
             ComponentEvent::Stop(stop) => {
                 // TODO(fxbug.dev/53939): Get inspect data from repository before removing
                 // for post-mortem inspection.
-                debug!("Component stopped. id={:?}", &stop.metadata.identity);
+                debug!(identity = %stop.metadata.identity, "Component stopped.");
                 self.remove_from_inspect_repo(&stop.metadata.identity);
                 self.archive_event("STOP", stop.metadata).await
             }
             ComponentEvent::DiagnosticsReady(diagnostics_ready) => {
                 debug!(
-                    identity = ?diagnostics_ready.metadata.identity,
+                    identity = %diagnostics_ready.metadata.identity,
                     "Diagnostics directory is ready.",
                 );
                 self.populate_inspect_repo(diagnostics_ready).await;
diff --git a/src/diagnostics/archivist/src/container.rs b/src/diagnostics/archivist/src/container.rs
index 6d447ff..0627fc2 100644
--- a/src/diagnostics/archivist/src/container.rs
+++ b/src/diagnostics/archivist/src/container.rs
@@ -10,8 +10,8 @@
         inspect::container::InspectArtifactsContainer,
         lifecycle::container::LifecycleArtifactsContainer,
         logs::{
-            buffer::AccountedBuffer, container::LogsArtifactsContainer, stats::LogStreamStats,
-            Message,
+            budget::BudgetManager, buffer::ArcList, container::LogsArtifactsContainer,
+            stats::LogStreamStats, Message,
         },
     },
     diagnostics_data as schema,
@@ -22,7 +22,6 @@
     fuchsia_inspect::reader::snapshot::{Snapshot, SnapshotTree},
     fuchsia_inspect_derive::WithInspect,
     fuchsia_zircon as zx,
-    parking_lot::Mutex,
     std::{convert::TryFrom, sync::Arc},
 };
 
@@ -148,7 +147,8 @@
     pub fn logs(
         &mut self,
         // TODO(fxbug.dev/47611) remove this and construct a local buffer in this function
-        buffer: &Arc<Mutex<AccountedBuffer<Message>>>,
+        buffer: &ArcList<Message>,
+        budget: &BudgetManager,
         interest_selectors: &[LogInterestSelector],
     ) -> Arc<LogsArtifactsContainer> {
         if let Some(logs) = &self.logs {
@@ -162,6 +162,7 @@
                 interest_selectors,
                 stats,
                 buffer.clone(),
+                budget.handle(),
             ));
             self.logs = Some(container.clone());
             container
diff --git a/src/diagnostics/archivist/src/logs/budget.rs b/src/diagnostics/archivist/src/logs/budget.rs
new file mode 100644
index 0000000..26e4f6f
--- /dev/null
+++ b/src/diagnostics/archivist/src/logs/budget.rs
@@ -0,0 +1,58 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::logs::{buffer::ArcList, message::Message};
+use parking_lot::Mutex;
+use std::sync::Arc;
+
+#[derive(Clone)]
+pub struct BudgetManager {
+    state: Arc<Mutex<BudgetState>>,
+}
+
+impl BudgetManager {
+    pub fn new(capacity: usize, buffer: &ArcList<Message>) -> Self {
+        Self {
+            state: Arc::new(Mutex::new(BudgetState {
+                buffer: buffer.clone(),
+                capacity,
+                current: 0,
+            })),
+        }
+    }
+
+    pub fn handle(&self) -> BudgetHandle {
+        BudgetHandle { state: self.state.clone() }
+    }
+}
+
+struct BudgetState {
+    buffer: ArcList<Message>,
+    current: usize,
+    capacity: usize,
+}
+
+impl BudgetState {
+    fn allocate(&mut self, size: usize) {
+        self.current += size;
+        while self.current > self.capacity {
+            self.current -= self
+                .buffer
+                .pop_front()
+                .expect("if we need to reduce capacity, there are messages")
+                .metadata
+                .size_bytes;
+        }
+    }
+}
+
+pub struct BudgetHandle {
+    state: Arc<Mutex<BudgetState>>,
+}
+
+impl BudgetHandle {
+    pub fn allocate(&self, size: usize) {
+        self.state.lock().allocate(size);
+    }
+}
diff --git a/src/diagnostics/archivist/src/logs/buffer.rs b/src/diagnostics/archivist/src/logs/buffer.rs
index 5891f06..97ceda8 100644
--- a/src/diagnostics/archivist/src/logs/buffer.rs
+++ b/src/diagnostics/archivist/src/logs/buffer.rs
@@ -2,149 +2,5 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use fidl_fuchsia_diagnostics::StreamMode;
-use fuchsia_inspect as inspect;
-use fuchsia_inspect_derive::Inspect;
-use inspect::NumericProperty;
-
 mod arc_list;
-use arc_list::ArcList;
-pub use arc_list::{Cursor, LazyItem};
-
-/// A value which knows approximately how many bytes it requires to send or store.
-pub trait Accounted {
-    /// Bytes used by this value.
-    fn bytes_used(&self) -> usize;
-}
-
-/// A Memory bounded buffer. Sizes are calculated by items' implementation of `Accounted`.
-#[derive(Inspect)]
-pub struct AccountedBuffer<T> {
-    #[inspect(skip)]
-    buffer: ArcList<T>,
-    #[inspect(skip)]
-    total_size: usize,
-    #[inspect(skip)]
-    capacity: usize,
-    inspect_node: inspect::Node,
-    rolled_out_entries: inspect::UintProperty,
-}
-
-impl<T> AccountedBuffer<T>
-where
-    T: Accounted,
-{
-    pub fn new(capacity: usize) -> AccountedBuffer<T> {
-        assert!(capacity > 0, "capacity should be more than 0");
-        Self {
-            buffer: ArcList::default(),
-            capacity: capacity,
-            total_size: 0,
-            inspect_node: inspect::Node::default(),
-            rolled_out_entries: inspect::UintProperty::default(),
-        }
-    }
-
-    /// Add an item to the buffer.
-    ///
-    /// If adding the item overflows the capacity, oldest item(s) are deleted until under the limit.
-    pub fn push(&mut self, item: T) {
-        let size = item.bytes_used();
-        self.buffer.push_back(item);
-        self.total_size += size;
-        while self.total_size > self.capacity {
-            let bytes_freed =
-                self.buffer.pop_front().expect("there are items if reducing size").bytes_used();
-            self.total_size -= bytes_freed;
-            self.rolled_out_entries.add(1);
-        }
-    }
-
-    /// Return a lazy cursor over items in the buffer.
-    pub fn cursor(&self, mode: StreamMode) -> Cursor<T> {
-        self.buffer.cursor(mode)
-    }
-
-    /// Stop accepting new messages, flush cursors.
-    pub fn terminate(&mut self) {
-        self.buffer.terminate();
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use fuchsia_inspect::assert_inspect_tree;
-    use fuchsia_inspect_derive::WithInspect;
-    use futures::StreamExt;
-    use tracing::error;
-
-    impl Accounted for (i32, usize) {
-        fn bytes_used(&self) -> usize {
-            self.1
-        }
-    }
-
-    impl<T> AccountedBuffer<T>
-    where
-        T: Accounted + Clone,
-    {
-        /// Returns a Vec of the current items.
-        pub async fn collect(&self) -> Vec<T> {
-            let mut items = vec![];
-            let mut snapshot = self.buffer.cursor(StreamMode::Snapshot);
-
-            while let Some(item) = snapshot.next().await {
-                match item {
-                    arc_list::LazyItem::Next(i) => items.push((*i).clone()),
-                    arc_list::LazyItem::ItemsDropped(n) => {
-                        error!(%n, "dropped messages while collecting a backfill snapshot");
-                    }
-                }
-            }
-
-            items
-        }
-    }
-
-    const TEST_BUFFER_CAPACITY: usize = 12;
-
-    #[fuchsia_async::run_singlethreaded(test)]
-    async fn test_simple() {
-        let inspector = inspect::Inspector::new();
-        let mut m = AccountedBuffer::new(TEST_BUFFER_CAPACITY)
-            .with_inspect(inspector.root(), "buffer_stats")
-            .unwrap();
-        m.push((1, 4));
-        m.push((2, 4));
-        m.push((3, 4));
-        assert_eq!(&m.collect().await[..], &[(1, 4), (2, 4), (3, 4)]);
-        assert_inspect_tree!(inspector,
-        root: {
-            buffer_stats: {
-                rolled_out_entries: 0u64,
-            }
-        });
-    }
-
-    #[fuchsia_async::run_singlethreaded(test)]
-    async fn test_bound() {
-        let inspector = inspect::Inspector::new();
-        let mut m = AccountedBuffer::new(TEST_BUFFER_CAPACITY)
-            .with_inspect(inspector.root(), "buffer_stats")
-            .unwrap();
-        m.push((1, 4));
-        m.push((2, 4));
-        m.push((3, 5));
-        assert_eq!(&m.collect().await[..], &[(2, 4), (3, 5)]);
-        m.push((4, 4));
-        m.push((5, 4));
-        assert_eq!(&m.collect().await[..], &[(4, 4), (5, 4)]);
-        assert_inspect_tree!(inspector,
-        root: {
-            buffer_stats: {
-                rolled_out_entries: 3u64,
-            }
-        });
-    }
-}
+pub use arc_list::*;
diff --git a/src/diagnostics/archivist/src/logs/container.rs b/src/diagnostics/archivist/src/logs/container.rs
index 72cd3a60..622d936 100644
--- a/src/diagnostics/archivist/src/logs/container.rs
+++ b/src/diagnostics/archivist/src/logs/container.rs
@@ -5,7 +5,8 @@
 use crate::{
     container::ComponentIdentity,
     logs::{
-        buffer::AccountedBuffer,
+        budget::BudgetHandle,
+        buffer::ArcList,
         error::StreamError,
         socket::{Encoding, LogMessageSocket},
         stats::LogStreamStats,
@@ -30,8 +31,11 @@
     /// Inspect instrumentation.
     pub stats: Arc<LogStreamStats>,
 
+    /// Handle to the overall budget for log retention.
+    budget: BudgetHandle,
+
     /// Buffer for all log messages.
-    buffer: Arc<Mutex<AccountedBuffer<Message>>>,
+    buffer: ArcList<Message>,
 
     /// Mutable state for the container.
     state: Mutex<ContainerState>,
@@ -50,9 +54,11 @@
         identity: Arc<ComponentIdentity>,
         interest_selectors: &[LogInterestSelector],
         stats: LogStreamStats,
-        buffer: Arc<Mutex<AccountedBuffer<Message>>>,
+        buffer: ArcList<Message>,
+        budget: BudgetHandle,
     ) -> Self {
         let new = Self {
+            budget,
             buffer,
             identity,
             state: Mutex::new(ContainerState {
@@ -151,8 +157,9 @@
 
     /// Updates log stats in inspect and push the message onto the container's buffer.
     pub fn ingest_message(&self, message: Message) {
+        self.budget.allocate(message.metadata.size_bytes);
         self.stats.ingest_message(&message);
-        self.buffer.lock().push(message);
+        self.buffer.push_back(message);
     }
 
     /// Set the `Interest` for this component, calling `LogSink/OnRegisterInterest` with all
diff --git a/src/diagnostics/archivist/src/logs/listener.rs b/src/diagnostics/archivist/src/logs/listener.rs
index 9860c9d..d4ebf81 100644
--- a/src/diagnostics/archivist/src/logs/listener.rs
+++ b/src/diagnostics/archivist/src/logs/listener.rs
@@ -1,7 +1,7 @@
 // Copyright 2020 The Fuchsia Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
 
-use super::{buffer::Accounted, message::Message};
+use super::message::Message;
 use fidl::endpoints::ClientEnd;
 use fidl_fuchsia_logger::{
     LogFilterOptions, LogListenerSafeMarker, LogListenerSafeProxy, LogMessage,
@@ -100,7 +100,7 @@
         let mut batch_size = 0;
         let mut filtered_batch = vec![];
         for msg in messages {
-            let size = msg.bytes_used();
+            let size = msg.metadata.size_bytes;
             if self.filter.should_send(&msg) {
                 if batch_size + size > fidl_fuchsia_logger::MAX_LOG_MANY_SIZE_BYTES as usize {
                     self.send_filtered_logs(&mut filtered_batch).await;
diff --git a/src/diagnostics/archivist/src/logs/message.rs b/src/diagnostics/archivist/src/logs/message.rs
index 7124ef0..e37855e 100644
--- a/src/diagnostics/archivist/src/logs/message.rs
+++ b/src/diagnostics/archivist/src/logs/message.rs
@@ -4,7 +4,7 @@
 use crate::{
     container::ComponentIdentity,
     events::types::ComponentIdentifier,
-    logs::{buffer::Accounted, error::StreamError, stats::LogStreamStats},
+    logs::{error::StreamError, stats::LogStreamStats},
 };
 use byteorder::{ByteOrder, LittleEndian};
 use diagnostics_data::{LogError, Timestamp};
@@ -77,12 +77,6 @@
     }
 }
 
-impl Accounted for Message {
-    fn bytes_used(&self) -> usize {
-        self.metadata.size_bytes
-    }
-}
-
 impl Message {
     pub fn new(
         timestamp: impl Into<Timestamp>,
diff --git a/src/diagnostics/archivist/src/logs/mod.rs b/src/diagnostics/archivist/src/logs/mod.rs
index 0bc47ff..e6d15bc 100644
--- a/src/diagnostics/archivist/src/logs/mod.rs
+++ b/src/diagnostics/archivist/src/logs/mod.rs
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+pub mod budget;
 pub mod buffer;
 pub mod container;
 pub mod debuglog;
diff --git a/src/diagnostics/archivist/src/logs/testing.rs b/src/diagnostics/archivist/src/logs/testing.rs
index 6129e2d..9da4a86 100644
--- a/src/diagnostics/archivist/src/logs/testing.rs
+++ b/src/diagnostics/archivist/src/logs/testing.rs
@@ -5,7 +5,11 @@
 use crate::{
     container::ComponentIdentity,
     events::types::{ComponentEvent, LogSinkRequestedEvent},
-    logs::message::{fx_log_packet_t, EMPTY_IDENTITY, MAX_DATAGRAM_LEN},
+    logs::{
+        budget::BudgetManager,
+        buffer::ArcList,
+        message::{fx_log_packet_t, EMPTY_IDENTITY, MAX_DATAGRAM_LEN},
+    },
     repository::DataRepo,
 };
 use async_trait::async_trait;
@@ -82,7 +86,9 @@
 
     fn make(hold_sinks: bool) -> Self {
         let inspector = Inspector::new();
-        let log_manager = DataRepo::new(1_000_000 /* ~1mb */, inspector.root());
+        let buffer = ArcList::default();
+        let budget = BudgetManager::new(1_000_000, &buffer);
+        let log_manager = DataRepo::new(buffer, &budget, inspector.root());
 
         let (listen_sender, listen_receiver) = mpsc::unbounded();
         let (log_proxy, log_stream) =
@@ -398,7 +404,9 @@
         .detach();
 
     let inspector = Inspector::new();
-    let lm = DataRepo::new(1_000_000 /* ~1mb */, inspector.root());
+    let buffer = ArcList::default();
+    let budget = BudgetManager::new(1_000_000, &buffer);
+    let lm = DataRepo::new(buffer, &budget, inspector.root());
     let (log_proxy, log_stream) = fidl::endpoints::create_proxy_and_stream::<LogMarker>().unwrap();
     lm.clone().handle_log(log_stream, log_sender);
     fasync::Task::spawn(lm.drain_debuglog(debug_log)).detach();
diff --git a/src/diagnostics/archivist/src/repository.rs b/src/diagnostics/archivist/src/repository.rs
index c8006ec..c42abd3 100644
--- a/src/diagnostics/archivist/src/repository.rs
+++ b/src/diagnostics/archivist/src/repository.rs
@@ -9,7 +9,8 @@
         inspect::container::{InspectArtifactsContainer, UnpopulatedInspectDataContainer},
         lifecycle::container::{LifecycleArtifactsContainer, LifecycleDataContainer},
         logs::{
-            buffer::{AccountedBuffer, LazyItem},
+            budget::BudgetManager,
+            buffer::{ArcList, LazyItem},
             container::LogsArtifactsContainer,
             debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY},
             error::LogsError,
@@ -24,13 +25,11 @@
     fidl_fuchsia_io::{DirectoryProxy, CLONE_FLAG_SAME_RIGHTS},
     fidl_fuchsia_logger::{LogInterestSelector, LogMarker, LogRequest, LogRequestStream},
     fuchsia_async::Task,
-    fuchsia_inspect as inspect,
-    fuchsia_inspect_derive::WithInspect,
-    fuchsia_zircon as zx,
+    fuchsia_inspect as inspect, fuchsia_zircon as zx,
     futures::channel::mpsc,
     futures::prelude::*,
     io_util,
-    parking_lot::{Mutex, RwLock},
+    parking_lot::RwLock,
     selectors,
     std::collections::HashMap,
     std::sync::Arc,
@@ -54,18 +53,20 @@
 #[cfg(test)]
 impl Default for DataRepo {
     fn default() -> Self {
-        DataRepo {
-            inner: DataRepoState::new(
-                crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
-                &Default::default(),
-            ),
-        }
+        let buffer = ArcList::default();
+        let budget =
+            BudgetManager::new(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES, &buffer);
+        DataRepo { inner: DataRepoState::new(buffer, &budget, &Default::default()) }
     }
 }
 
 impl DataRepo {
-    pub fn new(logs_capacity: usize, parent: &fuchsia_inspect::Node) -> Self {
-        DataRepo { inner: DataRepoState::new(logs_capacity, parent) }
+    pub fn new(
+        logs_buffer: ArcList<Message>,
+        logs_budget: &BudgetManager,
+        parent: &fuchsia_inspect::Node,
+    ) -> Self {
+        DataRepo { inner: DataRepoState::new(logs_buffer, logs_budget, parent) }
     }
 
     /// Drain the kernel's debug log. The returned future completes once
@@ -166,7 +167,7 @@
     }
 
     pub fn cursor(&self, mode: StreamMode) -> impl Stream<Item = Arc<Message>> {
-        self.read().logs_buffer.lock().cursor(mode).map(|item| match item {
+        self.read().logs_buffer.cursor(mode).map(|item| match item {
             LazyItem::Next(m) => m,
             LazyItem::ItemsDropped(n) => Arc::new(Message::for_dropped(n)),
         })
@@ -175,7 +176,7 @@
     /// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
     /// consuming any messages received before this call.
     pub fn terminate_logs(&self) {
-        self.read().logs_buffer.lock().terminate();
+        self.read().logs_buffer.terminate();
     }
 }
 
@@ -183,20 +184,22 @@
     pub data_directories: trie::Trie<String, ComponentDiagnostics>,
     inspect_node: inspect::Node,
     logs_interest: Vec<LogInterestSelector>,
-    logs_buffer: Arc<Mutex<AccountedBuffer<Message>>>,
+    logs_budget: BudgetManager,
+    logs_buffer: ArcList<Message>,
 }
 
 impl DataRepoState {
-    fn new(logs_capacity: usize, parent: &fuchsia_inspect::Node) -> Arc<RwLock<Self>> {
+    fn new(
+        logs_buffer: ArcList<Message>,
+        logs_budget: &BudgetManager,
+        parent: &fuchsia_inspect::Node,
+    ) -> Arc<RwLock<Self>> {
         Arc::new(RwLock::new(Self {
             inspect_node: parent.create_child("sources"),
             data_directories: trie::Trie::new(),
             logs_interest: vec![],
-            logs_buffer: Arc::new(Mutex::new(
-                AccountedBuffer::new(logs_capacity)
-                    .with_inspect(parent, "logs_buffer")
-                    .expect("failed to attach inspect"),
-            )),
+            logs_budget: logs_budget.clone(),
+            logs_buffer,
         }))
     }
 
@@ -285,7 +288,8 @@
             () => {{
                 let mut to_insert =
                     ComponentDiagnostics::empty(Arc::new(identity), &self.inspect_node);
-                let logs = to_insert.logs(&self.logs_buffer, &self.logs_interest);
+                let logs =
+                    to_insert.logs(&self.logs_buffer, &self.logs_budget, &self.logs_interest);
                 self.data_directories.insert(trie_key, to_insert);
                 logs
             }};
@@ -294,7 +298,9 @@
         match self.data_directories.get_mut(trie_key.clone()) {
             Some(component) => match &mut component.get_values_mut()[..] {
                 [] => insert_component!(),
-                [existing] => existing.logs(&self.logs_buffer, &self.logs_interest),
+                [existing] => {
+                    existing.logs(&self.logs_buffer, &self.logs_budget, &self.logs_interest)
+                }
                 _ => unreachable!("invariant: each trie node has 0-1 entries"),
             },
             None => insert_component!(),