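Delete QueuePool. Remove the obsolete `QueuePool` class and simplify the public API of `EagerRequestBatcher` by accepting `maxBatchSize` directly as an integer. The `maxBatchSize >= 1` check previously performed by the `QueuePool` constructor now lives in the `EagerRequestBatcher` constructor, and the tests that exercised pool sharing are removed or renamed accordingly. PiperOrigin-RevId: 926216029 Change-Id: I99072ea8e0e12c2031ffd320fb1a5cf88c4c235a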
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcher.java b/src/main/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcher.java index bdf3e95..978dcdf 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcher.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcher.java
@@ -117,12 +117,12 @@ public static <RequestT, ResponseT> EagerRequestBatcher<RequestT, ResponseT> create( Multiplexer<RequestT, ResponseT> multiplexer, Executor responseDistributionExecutor, - QueuePool<RequestT, ResponseT> pool, + int maxBatchSize, int targetConcurrentRequests, Executor executor) { return new EagerRequestBatcher<>( RequestBatching.createBatchExecutionStrategy(multiplexer, responseDistributionExecutor), - pool, + maxBatchSize, targetConcurrentRequests, executor); } @@ -141,12 +141,12 @@ public static <RequestT, ResponseT> EagerRequestBatcher<RequestT, ResponseT> createWithCallbackMultiplexer( CallbackMultiplexer<RequestT, ResponseT> multiplexer, - QueuePool<RequestT, ResponseT> pool, + int maxBatchSize, int targetConcurrentRequests, Executor executor) { return new EagerRequestBatcher<>( RequestBatching.createCallbackBatchExecutionStrategy(multiplexer), - pool, + maxBatchSize, targetConcurrentRequests, executor); } @@ -165,12 +165,12 @@ public static <RequestT, ResponseT> EagerRequestBatcher<RequestT, ResponseT> createWithFutureMultiplexer( FutureMultiplexer<RequestT, ResponseT> multiplexer, - QueuePool<RequestT, ResponseT> pool, + int maxBatchSize, int targetConcurrentRequests, Executor executor) { return new EagerRequestBatcher<>( RequestBatching.createFutureBatchExecutionStrategy(multiplexer), - pool, + maxBatchSize, targetConcurrentRequests, executor); } @@ -179,12 +179,12 @@ @VisibleForTesting EagerRequestBatcher( BatchExecutionStrategy<RequestT, ResponseT> batchExecutionStrategy, - // Kept for API compatibility. To be removed in a follow up change. 
- QueuePool<RequestT, ResponseT> pool, + int maxBatchSize, int targetConcurrentRequests, Executor executor) { + checkArgument(maxBatchSize >= 1, "maxBatchSize must be >= 1"); this.batchExecutionStrategy = batchExecutionStrategy; - this.maxBatchSize = pool.getMaxBatchSize(); + this.maxBatchSize = maxBatchSize; checkArgument(targetConcurrentRequests >= 1, "targetConcurrentRequests must be >= 1"); this.targetConcurrentRequests = targetConcurrentRequests; this.executor = executor;
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QueuePool.java b/src/main/java/com/google/devtools/build/lib/concurrent/QueuePool.java deleted file mode 100644 index 2b8b214..0000000 --- a/src/main/java/com/google/devtools/build/lib/concurrent/QueuePool.java +++ /dev/null
@@ -1,59 +0,0 @@ -// Copyright 2026 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.concurrent; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.devtools.build.lib.concurrent.RequestBatching.Operation; -import java.util.ArrayList; -import java.util.List; - -/** - * A shared, thread-local pool of {@link ArrayList} containing {@code Operation} instances. - * - * <p>This pool is designed to be shared across multiple {@link EagerRequestBatcher} instances. It - * eliminates churn by reusing the same thread-local list allocations. - */ -public final class QueuePool<RequestT, ResponseT> { - private final ThreadLocal<List<Operation<RequestT, ResponseT>>> pool; - private final int maxBatchSize; - - public QueuePool(int maxBatchSize) { - checkArgument(maxBatchSize >= 1, "maxBatchSize must be >= 1"); - this.maxBatchSize = maxBatchSize; - this.pool = - ThreadLocal.withInitial(() -> new ArrayList<Operation<RequestT, ResponseT>>(maxBatchSize)); - } - - /** - * Gets a list from the pool for the current thread. - * - * <p>IMPORTANT: if the caller modifies or takes ownership of this list, it must recycle a - * different, unowned, list. Otherwise, a later call to {@code getQueue} could return the same - * list and cause an aliasing bug. 
- */ - List<Operation<RequestT, ResponseT>> getQueue() { - return pool.get(); - } - - /** Clears the list and returns it to the pool for the current thread. */ - void recycleQueue(List<Operation<RequestT, ResponseT>> queue) { - queue.clear(); - pool.set(queue); - } - - public int getMaxBatchSize() { - return maxBatchSize; - } -}
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcherTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcherTest.java index e040735..4210403 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcherTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/EagerRequestBatcherTest.java
@@ -58,7 +58,7 @@ EagerRequestBatcher.<Request, Response>create( requests -> immediateFuture(respondTo(requests)), directExecutor(), - new QueuePool<Request, Response>(10), + /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, directExecutor()); ListenableFuture<Response> response = batcher.submit(new Request(1)); @@ -71,10 +71,7 @@ var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(10), - /* targetConcurrentRequests= */ 2, - directExecutor()); + strategy, /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 2, directExecutor()); // Scenario A: Eager sending due to low concurrency. // State established: @@ -178,10 +175,7 @@ RequestBatching.createBatchExecutionStrategy(faultyMultiplexer, directExecutor()); var batcher = new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(10), - /* targetConcurrentRequests= */ 1, - directExecutor()); + strategy, /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, directExecutor()); ListenableFuture<Response> response = batcher.submit(new Request(1)); @@ -195,7 +189,7 @@ var goodBatcher = new EagerRequestBatcher<>( goodStrategy, - new QueuePool<Request, Response>(10), + /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, directExecutor()); @@ -221,10 +215,7 @@ var batcher = new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(10), - /* targetConcurrentRequests= */ 1, - recordingExecutor); + strategy, /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, recordingExecutor); ListenableFuture<Response> r1 = batcher.submit(new Request(1)); BatchedOperations batch1 = multiplexer.queue.take(); @@ -251,7 +242,7 @@ } @Test - public void queuePool_safety_nestedSubmissions() throws Exception { + public void nestedSubmissions_workCorrectly() throws Exception { var multiplexer = new SettableMultiplexer(); var batcherRef = new 
AtomicReference<EagerRequestBatcher<Request, Response>>(); var nestedResponseRef = new AtomicReference<ListenableFuture<Response>>(); @@ -276,7 +267,7 @@ var batcher = new EagerRequestBatcher<>( goodStrategy, - new QueuePool<Request, Response>(10), + /* maxBatchSize= */ 10, /* targetConcurrentRequests= */ 1, directExecutor()); batcherRef.set(batcher); @@ -324,7 +315,7 @@ var batcher = EagerRequestBatcher.<Request, Response>createWithCallbackMultiplexer( callbackMultiplexer, - new QueuePool<Request, Response>(2), + /* maxBatchSize= */ 2, /* targetConcurrentRequests= */ 1, directExecutor()); @@ -346,7 +337,7 @@ var batcher = EagerRequestBatcher.<Request, Response>createWithFutureMultiplexer( futureMultiplexer, - new QueuePool<Request, Response>(2), + /* maxBatchSize= */ 2, /* targetConcurrentRequests= */ 1, directExecutor()); @@ -354,61 +345,28 @@ assertThat(r1.get()).isEqualTo(new Response(1)); } - @Test - public void sharedQueuePool_worksWithoutIssues() throws Exception { - var pool = new QueuePool<Request, Response>(10); - var multiplexer1 = new SettableMultiplexer(); - var multiplexer2 = new SettableMultiplexer(); - var strategy1 = RequestBatching.createBatchExecutionStrategy(multiplexer1, directExecutor()); - var strategy2 = RequestBatching.createBatchExecutionStrategy(multiplexer2, directExecutor()); - - var batcher1 = - new EagerRequestBatcher<>( - strategy1, pool, /* targetConcurrentRequests= */ 1, directExecutor()); - var batcher2 = - new EagerRequestBatcher<>( - strategy2, pool, /* targetConcurrentRequests= */ 1, directExecutor()); - - var testThread = - new Thread( - () -> { - try { - ListenableFuture<Response> r1 = batcher1.submit(new Request(1)); - BatchedOperations batch1 = multiplexer1.queue.take(); - batch1.setSimpleResponses(); - r1.get(); - - ListenableFuture<Response> r2 = batcher2.submit(new Request(2)); - BatchedOperations batch2 = multiplexer2.queue.take(); - batch2.setSimpleResponses(); - r2.get(); - } catch (InterruptedException e) { - 
Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - }); - testThread.start(); - testThread.join(); - } @Test public void parameterValidation() { - assertThrows(IllegalArgumentException.class, () -> new QueuePool<Object, Object>(0)); - assertThrows(IllegalArgumentException.class, () -> new QueuePool<Object, Object>(-1)); - - var pool = new QueuePool<Request, Response>(10); var strategy = RequestBatching.createBatchExecutionStrategy(new SettableMultiplexer(), directExecutor()); + // Validate maxBatchSize assertThrows( IllegalArgumentException.class, - () -> new EagerRequestBatcher<>(strategy, pool, 0, directExecutor())); + () -> new EagerRequestBatcher<>(strategy, 0, 1, directExecutor())); assertThrows( IllegalArgumentException.class, - () -> new EagerRequestBatcher<>(strategy, pool, -1, directExecutor())); + () -> new EagerRequestBatcher<>(strategy, -1, 1, directExecutor())); + + // Validate targetConcurrentRequests + assertThrows( + IllegalArgumentException.class, + () -> new EagerRequestBatcher<>(strategy, 10, 0, directExecutor())); + assertThrows( + IllegalArgumentException.class, + () -> new EagerRequestBatcher<>(strategy, 10, -1, directExecutor())); } private static class SettableMultiplexer implements Multiplexer<Request, Response> { @@ -452,11 +410,7 @@ LatencyMultiplexer multiplexer = new LatencyMultiplexer(latencyMs, multiplexerExecutor); var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = - new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(maxBatchSize), - targetConcurrency, - directExecutor()); + new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor()); CyclicBarrier barrier = new CyclicBarrier(numThreads); CountDownLatch latch = new CountDownLatch(numThreads); @@ -508,11 +462,7 @@ var multiplexer = new SettableMultiplexer(); var strategy = 
RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = - new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(maxBatchSize), - targetConcurrency, - directExecutor()); + new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor()); List<ListenableFuture<Response>> futures = new ArrayList<>(); for (int i = 1; i <= 50; i++) { @@ -594,11 +544,7 @@ var multiplexer = new MockExceptionMultiplexer(); var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = - new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(maxBatchSize), - targetConcurrency, - directExecutor()); + new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor()); ListenableFuture<Response> r1 = batcher.submit(new Request(1)); assertThat(batcher.getInFlightCount()).isEqualTo(0); @@ -638,11 +584,7 @@ var multiplexer = new SettableMultiplexer(); var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = - new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(maxBatchSize), - targetConcurrency, - directExecutor()); + new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor()); ListenableFuture<Response> r1 = batcher.submit(new Request(1)); assertThat(batcher.getInFlightCount()).isEqualTo(1); @@ -759,11 +701,7 @@ AsyncCountingMultiplexer multiplexer = new AsyncCountingMultiplexer(multiplexerExecutor); var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = - new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(maxBatchSize), - targetConcurrency, - directExecutor()); + new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor()); CyclicBarrier barrier = new CyclicBarrier(numThreads); CountDownLatch latch = new CountDownLatch(numThreads); @@ -818,11 
+756,7 @@ var multiplexer = new SettableMultiplexer(); var strategy = RequestBatching.createBatchExecutionStrategy(multiplexer, directExecutor()); var batcher = - new EagerRequestBatcher<>( - strategy, - new QueuePool<Request, Response>(maxBatchSize), - targetConcurrency, - directExecutor()); + new EagerRequestBatcher<>(strategy, maxBatchSize, targetConcurrency, directExecutor()); ListenableFuture<Response> r1 = batcher.submit(new Request(1)); assertThat(batcher.getInFlightCount()).isEqualTo(1);