// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{identity::ComponentIdentity, logs::container::LogsArtifactsContainer};
use futures::channel::mpsc;
use parking_lot::Mutex;
use std::sync::{Arc, Weak};
use tracing::{debug, error};
pub struct BudgetManager {
state: Arc<Mutex<BudgetState>>,
impl BudgetManager {
pub fn new(capacity: usize) -> Self {
Self {
state: Arc::new(Mutex::new(BudgetState {
current: 0,
containers: vec![],
remover: None,
pub fn set_remover(&self, remover: mpsc::UnboundedSender<Arc<ComponentIdentity>>) {
self.state.lock().remover = Some(remover);
pub fn add_container(&self, container: Arc<LogsArtifactsContainer>) {
pub fn handle(&self) -> BudgetHandle {
BudgetHandle { state: Arc::downgrade(&self.state) }
struct BudgetState {
current: usize,
capacity: usize,
remover: Option<mpsc::UnboundedSender<Arc<ComponentIdentity>>>,
/// Log containers are stored in a `Vec` which is regularly sorted instead of a `BinaryHeap`
/// because `BinaryHeap`s are broken with interior mutability in the contained type which would
/// affect the `Ord` impl's results.
/// To use a BinaryHeap, we would have to make the container's `Ord` impl call
/// `oldest_timestamp()`, but the value of that changes every time `pop()` is called. At the
/// time of writing, `pop()` does not require a mutable reference. While it's only called from
/// this module, we don't have a way to statically enforce that. This means that in a
/// future change we could introduce incorrect and likely flakey behavior without any warning.
containers: Vec<Arc<LogsArtifactsContainer>>,
impl BudgetState {
fn allocate(&mut self, size: usize) {
self.current += size;
while self.current > self.capacity {
// find the container with the oldest log message
self.containers.sort_unstable_by_key(|c| c.oldest_timestamp().unwrap_or(std::i64::MAX));
let container_with_oldest = self
.expect("containers are added to budget before they can call allocate")
let oldest_message = container_with_oldest
.expect("if we need to free space, we have messages to remove");
self.current -= oldest_message.size();
// now we need to remove any containers that are no longer needed. this will usually only
// fire for components from which we've just dropped a message, but it also serves to clean
// up containers which may not have been removable when we first received the stop event.
// the below code is ~equivalent to the unstable drain_filter
let mut i = 0;
while i != self.containers.len() {
if !self.containers[i].should_retain() {
let container = self.containers.remove(i);
debug!(identity = %container.identity, "Removing now that we've popped the last message.");
if let Some(remover) = &self.remover {
remover.unbounded_send(container.identity.clone()).unwrap_or_else(|err| {
error!(%err, identity = %container.identity, "Failed to send identity for removal");
} else {
i += 1;
fn terminate(&self) {
for container in &self.containers {
pub struct BudgetHandle {
/// We keep a weak pointer to the budget state to avoid this ownership cycle:
/// `BudgetManager -> BudgetState -> LogsArtifactsContainer -> BudgetHandle -> BudgetState`
state: Weak<Mutex<BudgetState>>,
impl BudgetHandle {
pub fn allocate(&self, size: usize) {
self.state.upgrade().expect("budgetmanager outlives all containers").lock().allocate(size);
/// Terminate the log buffers of all components here in case we have some that have been
/// removed from the data repo but we haven't dropped ourselves.
pub fn terminate(&self) {
self.state.upgrade().expect("budgetmanager outlives all containers").lock().terminate();
mod tests {
use super::*;
use crate::{
container::LogsArtifactsContainer, multiplex::PinStream, stats::LogStreamStats,
use diagnostics_data::{LogsData, Severity};
use diagnostics_log_encoding::{
encode::Encoder, Argument, Record, Severity as StreamSeverity, Value,
use fidl_fuchsia_diagnostics::StreamMode;
use futures::{Stream, StreamExt};
use std::{
task::{Context, Poll},
struct CursorWrapper(PinStream<Arc<LogsData>>);
impl Stream for CursorWrapper {
type Item = Arc<LogsData>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
async fn verify_container_is_terminated_on_removal() {
let manager = BudgetManager::new(128);
let container_a = Arc::new(LogsArtifactsContainer::new(
let container_b = Arc::new(LogsArtifactsContainer::new(
assert_eq!(manager.state.lock().containers.len(), 2);
// Add a few test messages
let mut cursor = CursorWrapper(container_b.cursor(StreamMode::SnapshotThenSubscribe));
assert_eq!(, Some(Arc::new(fake_message(1))));
// This allocation exceeds capacity, so the B container is dropped and terminated.
assert_eq!(manager.state.lock().containers.len(), 1);
// The container was terminated too.
assert_eq!(container_b.buffer().final_entry(), 1);
// Container is terminated, the cursor should give None.
assert_eq!(, None);
fn fake_message_bytes(timestamp: i64) -> StoredMessage {
let record = Record {
severity: StreamSeverity::Debug,
arguments: vec![
Argument { name: "pid".to_string(), value: Value::UnsignedInt(123) },
Argument { name: "tid".to_string(), value: Value::UnsignedInt(456) },
let mut buffer = Cursor::new(vec![0u8; 1024]);
let mut encoder = Encoder::new(&mut buffer);
let encoded = &buffer.get_ref()[..buffer.position() as usize];
StoredMessage::structured(encoded, Default::default()).unwrap()
fn fake_message(timestamp: i64) -> LogsData {
diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
timestamp_nanos: timestamp.into(),
component_url: Some(TEST_IDENTITY.url.clone()),
moniker: TEST_IDENTITY.to_string(),
severity: Severity::Debug,