blob: e1105da10ffe27107cd98988459b30a46594bf2b [file] [log] [blame] [edit]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
futures::{channel::mpsc, StreamExt},
/// The sending half of an unbounded mpsc channel connecting each runner to the counter thread.
///
/// `CounterTx` is cloned and given to all runners. Upon a successful operation, a runner
/// sends its actor's name (a `String`) over the channel.
/// The counter thread receives the actor's name and updates the current operation count.
/// This ensures that runners are not blocked.
pub type CounterTx = mpsc::UnboundedSender<String>;
/// Starts a new thread that maintains a count of all successful operations.
///
/// The counter is created with `Task::blocking` and tallies operations per actor name
/// as well as a global total. Its loop runs only while `total < target`, so the task
/// stops counting once the target number of operations has been hit; with a `target`
/// of 0 the loop body never runs.
///
/// Returns the counter task and a [`CounterTx`], the sending half of the unbounded
/// channel created here. The receiving half is moved into the task.
pub fn start_counter(target: u64) -> (Task<()>, CounterTx) {
// `tx` is handed back to the caller; `rx` is captured by the async block below.
let (tx, mut rx) = mpsc::unbounded();
let task = Task::blocking(async move {
// Keep track of global count + individual actor contributions
// (actor name -> number of operations recorded for that actor).
let mut count_map: HashMap<String, u64> = HashMap::new();
let mut total = 0;
// Run this task until the count has been met
while total < target {
// Wait for an actor to finish an operation
// NOTE(review): the right-hand side of this assignment is missing in this view.
// Presumably it awaits the next actor name from `rx` — confirm against the
// original file, including how a closed channel is handled.
let key =;
// Update the actor's contribution: bump an existing entry, or start a new one at 1.
if let Some(value) = count_map.get_mut(&key) {
*value += 1;
} else {
// First operation seen for this actor; `key` is moved into the map here.
count_map.insert(key, 1);
// Update global count
// NOTE(review): closing braces are not visible in this view; presumably the
// `else` branch ends before this point so the total grows once per received
// name — confirm.
total += 1;
// Log the running total together with the per-actor breakdown.
debug!("Counters -> [total:{}] {:?}", total, count_map);
// Hand back the task handle and the sender that runners will clone.
(task, tx)