Implement grpc.lb.backend_service optional label

This completes gRFC A89. 7162d2d66 and fc86084df had already implemented
the LB plumbing for the optional label on RPC metrics. This observes the
value in OpenTelemetry and adds it to WRR metrics as well.

https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md
diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java
index e829536..21064d7 100644
--- a/api/src/main/java/io/grpc/NameResolver.java
+++ b/api/src/main/java/io/grpc/NameResolver.java
@@ -275,6 +275,11 @@
   @Documented
   public @interface ResolutionResultAttr {}
 
+  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11989")
+  @ResolutionResultAttr
+  public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
+      Attributes.Key.create("io.grpc.NameResolver.ATTR_BACKEND_SERVICE");
+
   /**
    * Information that a {@link Factory} uses to create a {@link NameResolver}.
    *
diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
index b6cf09d..82ad41e 100644
--- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
+++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
@@ -17,6 +17,7 @@
 package io.grpc.opentelemetry;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
 import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
 import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
 import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
@@ -70,7 +71,6 @@
  */
 final class OpenTelemetryMetricsModule {
   private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
-  private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
   public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
       ImmutableSet.of(
           "grpc.client.attempt.started",
@@ -90,6 +90,7 @@
   private final OpenTelemetryMetricsResource resource;
   private final Supplier<Stopwatch> stopwatchSupplier;
   private final boolean localityEnabled;
+  private final boolean backendServiceEnabled;
   private final ImmutableList<OpenTelemetryPlugin> plugins;
 
   OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
@@ -97,7 +98,8 @@
       List<OpenTelemetryPlugin> plugins) {
     this.resource = checkNotNull(resource, "resource");
     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
-    this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
+    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
+    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
     this.plugins = ImmutableList.copyOf(plugins);
   }
 
@@ -162,6 +164,7 @@
     volatile long outboundWireSize;
     volatile long inboundWireSize;
     volatile String locality;
+    volatile String backendService;
     long attemptNanos;
     Code statusCode;
 
@@ -206,9 +209,12 @@
 
     @Override
     public void addOptionalLabel(String key, String value) {
-      if (LOCALITY_LABEL_NAME.equals(key)) {
+      if ("grpc.lb.locality".equals(key)) {
         locality = value;
       }
+      if ("grpc.lb.backend_service".equals(key)) {
+        backendService = value;
+      }
     }
 
     @Override
@@ -248,6 +254,13 @@
         }
         builder.put(LOCALITY_KEY, savedLocality);
       }
+      if (module.backendServiceEnabled) {
+        String savedBackendService = backendService;
+        if (savedBackendService == null) {
+          savedBackendService = "";
+        }
+        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
+      }
       for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
         plugin.addLabels(builder);
       }
diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java
index 081e376..5214804 100644
--- a/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java
+++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java
@@ -33,6 +33,9 @@
   public static final AttributeKey<String> LOCALITY_KEY =
       AttributeKey.stringKey("grpc.lb.locality");
 
+  public static final AttributeKey<String> BACKEND_SERVICE_KEY =
+      AttributeKey.stringKey("grpc.lb.backend_service");
+
   public static final List<Double> LATENCY_BUCKETS =
       ImmutableList.of(
           0d,     0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,
diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java
index 6003599..77f0268 100644
--- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java
+++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java
@@ -51,6 +51,7 @@
 import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory;
 import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
 import io.grpc.testing.GrpcServerRule;
+import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
 import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@@ -1071,6 +1072,140 @@
   }
 
   @Test
+  public void clientBackendServiceMetrics_present() {
+    String target = "target:///";
+    OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
+        enabledMetricsMap, disableDefaultMetrics);
+    OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
+        fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
+        emptyList());
+    OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
+        new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());
+
+    ClientStreamTracer tracer =
+        callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
+    tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
+    tracer.addOptionalLabel("grpc.lb.backend_service", "should-be-overwritten");
+    tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
+    tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
+    tracer.streamClosed(Status.OK);
+    callAttemptsTracerFactory.callEnded(Status.OK);
+
+    io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
+        TARGET_KEY, target,
+        METHOD_KEY, method.getFullMethodName());
+
+    io.opentelemetry.api.common.Attributes clientAttributes
+        = io.opentelemetry.api.common.Attributes.of(
+        TARGET_KEY, target,
+        METHOD_KEY, method.getFullMethodName(),
+        STATUS_KEY, Status.Code.OK.toString());
+
+    io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
+        = clientAttributes.toBuilder()
+        .put(AttributeKey.stringKey("grpc.lb.backend_service"), "the-moon")
+        .build();
+
+    assertThat(openTelemetryTesting.getMetrics())
+        .satisfiesExactlyInAnyOrder(
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
+                    .hasLongSumSatisfying(
+                        longSum -> longSum.hasPointsSatisfying(
+                            point -> point.hasAttributes(attributes))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributesWithBackendService))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributesWithBackendService))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributesWithBackendService))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_CALL_DURATION)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributes))));
+  }
+
+  @Test
+  public void clientBackendServiceMetrics_missing() {
+    String target = "target:///";
+    OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
+        enabledMetricsMap, disableDefaultMetrics);
+    OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
+        fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
+        emptyList());
+    OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
+        new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());
+
+    ClientStreamTracer tracer =
+        callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
+    tracer.streamClosed(Status.OK);
+    callAttemptsTracerFactory.callEnded(Status.OK);
+
+    io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
+        TARGET_KEY, target,
+        METHOD_KEY, method.getFullMethodName());
+
+    io.opentelemetry.api.common.Attributes clientAttributes
+        = io.opentelemetry.api.common.Attributes.of(
+        TARGET_KEY, target,
+        METHOD_KEY, method.getFullMethodName(),
+        STATUS_KEY, Status.Code.OK.toString());
+
+    io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
+        = clientAttributes.toBuilder()
+        .put(AttributeKey.stringKey("grpc.lb.backend_service"), "")
+        .build();
+
+    assertThat(openTelemetryTesting.getMetrics())
+        .satisfiesExactlyInAnyOrder(
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
+                    .hasLongSumSatisfying(
+                        longSum -> longSum.hasPointsSatisfying(
+                            point -> point.hasAttributes(attributes))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributesWithBackendService))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributesWithBackendService))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributesWithBackendService))),
+            metric ->
+                assertThat(metric)
+                    .hasName(CLIENT_CALL_DURATION)
+                    .hasHistogramSatisfying(
+                        histogram -> histogram.hasPointsSatisfying(
+                            point -> point.hasAttributes(clientAttributes))));
+  }
+
+  @Test
   public void serverBasicMetrics() {
     OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
         enabledMetricsMap, disableDefaultMetrics);
diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
index fd4f49f..034cdee 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
@@ -32,6 +32,7 @@
 import io.grpc.InternalLogId;
 import io.grpc.LoadBalancer;
 import io.grpc.Metadata;
+import io.grpc.NameResolver;
 import io.grpc.Status;
 import io.grpc.internal.ForwardingClientStreamTracer;
 import io.grpc.internal.GrpcUtil;
@@ -150,7 +151,9 @@
 
     childSwitchLb.handleResolvedAddresses(
         resolvedAddresses.toBuilder()
-            .setAttributes(attributes)
+            .setAttributes(attributes.toBuilder()
+              .set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
+              .build())
             .setLoadBalancingPolicyConfig(config.childConfig)
             .build());
     return Status.OK;
diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
index b7b4fd5..c3d36f7 100644
--- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
@@ -102,32 +102,44 @@
   private final long infTime;
   private final Ticker ticker;
   private String locality = "";
+  private String backendService = "";
   private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
 
   // The metric instruments are only registered once and shared by all instances of this LB.
   static {
     MetricInstrumentRegistry metricInstrumentRegistry
         = MetricInstrumentRegistry.getDefaultRegistry();
-    RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
+    RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
+        "grpc.lb.wrr.rr_fallback",
         "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
             + "with valid weight, which caused the WRR policy to fall back to RR behavior",
-        "{update}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
+        "{update}",
+        Lists.newArrayList("grpc.target"),
+        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
         false);
     ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
-        "grpc.lb.wrr.endpoint_weight_not_yet_usable", "EXPERIMENTAL. Number of endpoints "
-            + "from each scheduler update that don't yet have usable weight information",
-        "{endpoint}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
+        "grpc.lb.wrr.endpoint_weight_not_yet_usable",
+        "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
+            + "weight information",
+        "{endpoint}",
+        Lists.newArrayList("grpc.target"),
+        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
         false);
     ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
         "grpc.lb.wrr.endpoint_weight_stale",
         "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
-            + "older than the expiration period", "{endpoint}", Lists.newArrayList("grpc.target"),
-        Lists.newArrayList("grpc.lb.locality"), false);
+            + "older than the expiration period",
+        "{endpoint}",
+        Lists.newArrayList("grpc.target"),
+        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
+        false);
     ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
         "grpc.lb.wrr.endpoint_weights",
         "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
-        "{weight}", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
-        Lists.newArrayList("grpc.lb.locality"),
+        "{weight}",
+        Lists.newArrayList(),
+        Lists.newArrayList("grpc.target"),
+        Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
         false);
   }
 
@@ -168,6 +180,13 @@
     } else {
       this.locality = "";
     }
+    String backendService
+        = resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
+    if (backendService != null) {
+      this.backendService = backendService;
+    } else {
+      this.backendService = "";
+    }
     config =
             (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
 
@@ -232,7 +251,7 @@
       helper.getMetricRecorder()
           .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
               ImmutableList.of(helper.getChannelTarget()),
-              ImmutableList.of(locality));
+              ImmutableList.of(locality, backendService));
       newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
     }
 
@@ -240,18 +259,19 @@
       helper.getMetricRecorder()
           .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
               ImmutableList.of(helper.getChannelTarget()),
-              ImmutableList.of(locality));
+              ImmutableList.of(locality, backendService));
     }
     if (notYetUsableEndpoints.get() > 0) {
       helper.getMetricRecorder()
           .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
-              ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
+              ImmutableList.of(helper.getChannelTarget()),
+              ImmutableList.of(locality, backendService));
     }
     boolean weightsEffective = picker.updateWeight(newWeights);
     if (!weightsEffective) {
       helper.getMetricRecorder()
           .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
-              ImmutableList.of(locality));
+              ImmutableList.of(locality, backendService));
     }
   }
 
diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
index 09a1abb..7df0630 100644
--- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
@@ -50,6 +50,7 @@
 import io.grpc.LoadBalancerRegistry;
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
+import io.grpc.NameResolver;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.SynchronizationContext;
@@ -198,6 +199,7 @@
     assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig);
     assertThat(childBalancer.attributes.get(XdsAttributes.XDS_CLIENT_POOL))
         .isSameInstanceAs(xdsClientPool);
+    assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
   }
 
   /**
diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java
index 9ab7832..bf23c65 100644
--- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java
@@ -58,6 +58,7 @@
 import io.grpc.Metadata;
 import io.grpc.MetricRecorder;
 import io.grpc.MetricSink;
+import io.grpc.NameResolver;
 import io.grpc.NoopMetricSink;
 import io.grpc.ServerCall;
 import io.grpc.ServerServiceDefinition;
@@ -161,6 +162,7 @@
 
   private String channelTarget = "channel-target";
   private String locality = "locality";
+  private String backendService = "the-backend-service";
 
   public WeightedRoundRobinLoadBalancerTest() {
     testHelperInstance = new TestHelper();
@@ -1119,7 +1121,9 @@
   public void metrics() {
     // Give WRR some valid addresses to work with.
     Attributes attributesWithLocality = Attributes.newBuilder()
-        .set(WeightedTargetLoadBalancer.CHILD_NAME, locality).build();
+        .set(WeightedTargetLoadBalancer.CHILD_NAME, locality)
+        .set(NameResolver.ATTR_BACKEND_SERVICE, backendService)
+        .build();
     syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
         .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
         .setAttributes(attributesWithLocality).build()));
@@ -1269,7 +1273,7 @@
         argThat((instr) -> instr.getName().equals("grpc.lb.wrr.rr_fallback")),
         eq(1L),
         eq(Arrays.asList("directaddress:///wrr-metrics")),
-        eq(Arrays.asList("")));
+        eq(Arrays.asList("", "")));
   }
 
   // Verifies that the MetricRecorder has been called to record a long counter value of 1 for the
@@ -1281,7 +1285,10 @@
           public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
             return longCounterInstrument.getName().equals(name);
           }
-        }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
+        }),
+        eq(value),
+        eq(Lists.newArrayList(channelTarget)),
+        eq(Lists.newArrayList(locality, backendService)));
   }
 
   // Verifies that the MetricRecorder has been called to record a given double histogram value the
@@ -1293,7 +1300,10 @@
           public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) {
             return doubleHistogramInstrument.getName().equals(name);
           }
-        }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
+        }),
+        eq(value),
+        eq(Lists.newArrayList(channelTarget)),
+        eq(Lists.newArrayList(locality, backendService)));
   }
 
   private int getNumFilteredPendingTasks() {