xds: Add logical dns cluster support to XdsDepManager

ClusterResolverLb gets the NameResolverRegistry from
LoadBalancer.Helper, so a new API was added in NameResover.Args to
propagate the same object to the name resolver tree.

RetryingNameResolver was exposed to xds. This is expected to be
temporary, as the retrying is being removed from ManagedChannelImpl and
moved into the resolvers. At that point, DnsNameResolverProvider would
wrap DnsNameResolver with a similar API to RetryingNameResolver and xds
would no longer be responsible.
diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java
index 21064d7..9483f5f 100644
--- a/api/src/main/java/io/grpc/NameResolver.java
+++ b/api/src/main/java/io/grpc/NameResolver.java
@@ -303,6 +303,7 @@
     @Nullable private final Executor executor;
     @Nullable private final String overrideAuthority;
     @Nullable private final MetricRecorder metricRecorder;
+    @Nullable private final NameResolverRegistry nameResolverRegistry;
     @Nullable private final IdentityHashMap<Key<?>, Object> customArgs;
 
     private Args(Builder builder) {
@@ -316,6 +317,7 @@
       this.executor = builder.executor;
       this.overrideAuthority = builder.overrideAuthority;
       this.metricRecorder = builder.metricRecorder;
+      this.nameResolverRegistry = builder.nameResolverRegistry;
       this.customArgs = cloneCustomArgs(builder.customArgs);
     }
 
@@ -447,6 +449,18 @@
       return metricRecorder;
     }
 
+    /**
+     * Returns the {@link NameResolverRegistry} that the Channel uses to look for {@link
+     * NameResolver}s.
+     *
+     * @since 1.74.0
+     */
+    public NameResolverRegistry getNameResolverRegistry() {
+      if (nameResolverRegistry == null) {
+        throw new IllegalStateException("NameResolverRegistry is not set in Builder");
+      }
+      return nameResolverRegistry;
+    }
 
     @Override
     public String toString() {
@@ -461,6 +475,7 @@
           .add("executor", executor)
           .add("overrideAuthority", overrideAuthority)
           .add("metricRecorder", metricRecorder)
+          .add("nameResolverRegistry", nameResolverRegistry)
           .toString();
     }
 
@@ -480,6 +495,7 @@
       builder.setOffloadExecutor(executor);
       builder.setOverrideAuthority(overrideAuthority);
       builder.setMetricRecorder(metricRecorder);
+      builder.setNameResolverRegistry(nameResolverRegistry);
       builder.customArgs = cloneCustomArgs(customArgs);
       return builder;
     }
@@ -508,6 +524,7 @@
       private Executor executor;
       private String overrideAuthority;
       private MetricRecorder metricRecorder;
+      private NameResolverRegistry nameResolverRegistry;
       private IdentityHashMap<Key<?>, Object> customArgs;
 
       Builder() {
@@ -615,6 +632,16 @@
       }
 
       /**
+       * See {@link Args#getNameResolverRegistry}.  This is an optional field.
+       *
+       * @since 1.74.0
+       */
+      public Builder setNameResolverRegistry(NameResolverRegistry registry) {
+        this.nameResolverRegistry = registry;
+        return this;
+      }
+
+      /**
        * Builds an {@link Args}.
        *
        * @since 1.21.0
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 9b756c3..e8f106c 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -597,7 +597,8 @@
             .setChannelLogger(channelLogger)
             .setOffloadExecutor(this.offloadExecutorHolder)
             .setOverrideAuthority(this.authorityOverride)
-            .setMetricRecorder(this.metricRecorder);
+            .setMetricRecorder(this.metricRecorder)
+            .setNameResolverRegistry(builder.nameResolverRegistry);
     builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
     this.nameResolverArgs = nameResolverArgsBuilder.build();
     this.nameResolver = getNameResolver(
@@ -685,11 +686,7 @@
     // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
     // TODO: After a transition period, all NameResolver implementations that need retry should use
     //       RetryingNameResolver directly and this step can be removed.
-    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
-          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
-              nameResolverArgs.getScheduledExecutorService(),
-              nameResolverArgs.getSynchronizationContext()),
-          nameResolverArgs.getSynchronizationContext());
+    NameResolver usedNameResolver = RetryingNameResolver.wrap(resolver, nameResolverArgs);
 
     if (overrideAuthority == null) {
       return usedNameResolver;
diff --git a/core/src/main/java/io/grpc/internal/RetryingNameResolver.java b/core/src/main/java/io/grpc/internal/RetryingNameResolver.java
index 55fedea..90827fa 100644
--- a/core/src/main/java/io/grpc/internal/RetryingNameResolver.java
+++ b/core/src/main/java/io/grpc/internal/RetryingNameResolver.java
@@ -27,13 +27,22 @@
  *
  * <p>The {@link NameResolver} used with this
  */
-final class RetryingNameResolver extends ForwardingNameResolver {
+public final class RetryingNameResolver extends ForwardingNameResolver {
+  public static NameResolver wrap(NameResolver retriedNameResolver, Args args) {
+    // For migration, this might become conditional
+    return new RetryingNameResolver(
+        retriedNameResolver,
+        new BackoffPolicyRetryScheduler(
+            new ExponentialBackoffPolicy.Provider(),
+            args.getScheduledExecutorService(),
+            args.getSynchronizationContext()),
+        args.getSynchronizationContext());
+  }
 
   private final NameResolver retriedNameResolver;
   private final RetryScheduler retryScheduler;
   private final SynchronizationContext syncContext;
 
-
   /**
    * Creates a new {@link RetryingNameResolver}.
    *
diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
index 130c01d..f5be078 100644
--- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
+++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
@@ -207,14 +207,7 @@
 
     // In practice the DNS name resolver provider always wraps the resolver in a
     // RetryingNameResolver which adds retry capabilities to it. We use the same setup here.
-    return new RetryingNameResolver(
-        dnsResolver,
-        new BackoffPolicyRetryScheduler(
-            new ExponentialBackoffPolicy.Provider(),
-            fakeExecutor.getScheduledExecutorService(),
-            syncContext
-        ),
-        syncContext);
+    return (RetryingNameResolver) RetryingNameResolver.wrap(dnsResolver, args);
   }
 
   @Before
diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
index 82cfd9c..cc419e6 100644
--- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
+++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
@@ -23,18 +23,27 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.EquivalentAddressGroup;
 import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
 import io.grpc.Status;
 import io.grpc.StatusOr;
 import io.grpc.SynchronizationContext;
+import io.grpc.internal.RetryingNameResolver;
+import io.grpc.xds.Endpoints.LocalityLbEndpoints;
 import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
 import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
 import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
 import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
 import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
+import io.grpc.xds.client.Locality;
 import io.grpc.xds.client.XdsClient;
 import io.grpc.xds.client.XdsClient.ResourceWatcher;
 import io.grpc.xds.client.XdsResourceType;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -44,7 +53,6 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.Nullable;
 
 /**
@@ -55,7 +63,7 @@
  */
 final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
   private enum TrackedWatcherTypeEnum {
-    LDS, RDS, CDS, EDS
+    LDS, RDS, CDS, EDS, DNS
   }
 
   private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
@@ -66,12 +74,19 @@
       new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
   private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
       new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);
+  private static final TrackedWatcherType<List<EquivalentAddressGroup>> DNS_TYPE =
+      new TrackedWatcherType<>(TrackedWatcherTypeEnum.DNS);
+
+  // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
+  // to an empty locality.
+  private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
 
   private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
   private final String listenerName;
   private final XdsClient xdsClient;
   private final SynchronizationContext syncContext;
   private final String dataPlaneAuthority;
+  private final NameResolver.Args nameResolverArgs;
   private XdsConfigWatcher xdsConfigWatcher;
 
   private StatusOr<XdsConfig> lastUpdate = null;
@@ -79,16 +94,17 @@
       new EnumMap<>(TrackedWatcherTypeEnum.class);
   private final Set<ClusterSubscription> subscriptions = new HashSet<>();
 
-  XdsDependencyManager(XdsClient xdsClient,
-                       SynchronizationContext syncContext, String dataPlaneAuthority,
-                       String listenerName, NameResolver.Args nameResolverArgs,
-                       ScheduledExecutorService scheduler) {
+  XdsDependencyManager(
+      XdsClient xdsClient,
+      SynchronizationContext syncContext,
+      String dataPlaneAuthority,
+      String listenerName,
+      NameResolver.Args nameResolverArgs) {
     this.listenerName = checkNotNull(listenerName, "listenerName");
     this.xdsClient = checkNotNull(xdsClient, "xdsClient");
     this.syncContext = checkNotNull(syncContext, "syncContext");
     this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
-    checkNotNull(nameResolverArgs, "nameResolverArgs");
-    checkNotNull(scheduler, "scheduler");
+    this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
   }
 
   public static String toContextStr(String typeName, String resourceName) {
@@ -120,6 +136,18 @@
     return subscription;
   }
 
+  /**
+   * For all logical dns clusters refresh their results.
+   */
+  public void requestReresolution() {
+    syncContext.execute(() -> {
+      for (TrackedWatcher<List<EquivalentAddressGroup>> watcher : getWatchers(DNS_TYPE).values()) {
+        DnsWatcher dnsWatcher = (DnsWatcher) watcher;
+        dnsWatcher.refresh();
+      }
+    });
+  }
+
   private <T extends ResourceUpdate> void addWatcher(
       TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
     syncContext.throwIfNotInThisSynchronizationContext();
@@ -335,9 +363,9 @@
         }
         break;
       case LOGICAL_DNS:
-        // TODO get the resolved endpoint configuration
-        child = new EndpointConfig(StatusOr.fromStatus(
-            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
+        TrackedWatcher<List<EquivalentAddressGroup>> dnsWatcher =
+            tracer.getWatcher(DNS_TYPE, cdsUpdate.dnsHostName());
+        child = new EndpointConfig(dnsToEdsUpdate(dnsWatcher.getData(), cdsUpdate.dnsHostName()));
         break;
       default:
         child = new EndpointConfig(StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
@@ -352,6 +380,24 @@
         new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
   }
 
+  private static StatusOr<XdsEndpointResource.EdsUpdate> dnsToEdsUpdate(
+      StatusOr<List<EquivalentAddressGroup>> dnsData, String dnsHostName) {
+    if (!dnsData.hasValue()) {
+      return StatusOr.fromStatus(dnsData.getStatus());
+    }
+
+    List<Endpoints.LbEndpoint> endpoints = new ArrayList<>();
+    for (EquivalentAddressGroup eag : dnsData.getValue()) {
+      endpoints.add(Endpoints.LbEndpoint.create(eag, 1, true, dnsHostName, ImmutableMap.of()));
+    }
+    LocalityLbEndpoints lbEndpoints =
+        LocalityLbEndpoints.create(endpoints, 1, 0, ImmutableMap.of());
+    return StatusOr.fromValue(new XdsEndpointResource.EdsUpdate(
+        "fakeEds_logicalDns",
+        Collections.singletonMap(LOGICAL_DNS_CLUSTER_LOCALITY, lbEndpoints),
+        new ArrayList<>()));
+  }
+
   private void addRdsWatcher(String resourceName) {
     if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
       return;
@@ -376,6 +422,17 @@
     addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
   }
 
+  private void addDnsWatcher(String dnsHostName) {
+    syncContext.throwIfNotInThisSynchronizationContext();
+    if (getWatchers(DNS_TYPE).containsKey(dnsHostName)) {
+      return;
+    }
+
+    DnsWatcher watcher = new DnsWatcher(dnsHostName, nameResolverArgs);
+    getWatchers(DNS_TYPE).put(dnsHostName, watcher);
+    watcher.start();
+  }
+
   private void updateRoutes(List<VirtualHost> virtualHosts) {
     VirtualHost virtualHost =
         RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
@@ -411,6 +468,33 @@
     return clusters;
   }
 
+  private static NameResolver createNameResolver(
+      String dnsHostName,
+      NameResolver.Args nameResolverArgs) {
+    URI uri;
+    try {
+      uri = new URI("dns", "", "/" + dnsHostName, null);
+    } catch (URISyntaxException e) {
+      return new FailingNameResolver(
+          Status.INTERNAL.withDescription("Bug, invalid URI creation: " + dnsHostName)
+            .withCause(e));
+    }
+
+    NameResolverProvider provider =
+        nameResolverArgs.getNameResolverRegistry().getProviderForScheme("dns");
+    if (provider == null) {
+      return new FailingNameResolver(
+          Status.INTERNAL.withDescription("Could not find dns name resolver"));
+    }
+
+    NameResolver bareResolver = provider.newNameResolver(uri, nameResolverArgs);
+    if (bareResolver == null) {
+      return new FailingNameResolver(
+          Status.INTERNAL.withDescription("DNS name resolver provider returned null: " + uri));
+    }
+    return RetryingNameResolver.wrap(bareResolver, nameResolverArgs);
+  }
+
   private static class TypeWatchers<T> {
     // Key is resource name
     final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
@@ -722,7 +806,7 @@
           addEdsWatcher(getEdsServiceName());
           break;
         case LOGICAL_DNS:
-          // no eds needed
+          addDnsWatcher(update.dnsHostName());
           break;
         case AGGREGATE:
           update.prioritizedClusterNames()
@@ -751,4 +835,101 @@
     @Override
     public void subscribeToChildren(XdsEndpointResource.EdsUpdate update) {}
   }
+
+  private final class DnsWatcher implements TrackedWatcher<List<EquivalentAddressGroup>> {
+    private final NameResolver resolver;
+    @Nullable
+    private StatusOr<List<EquivalentAddressGroup>> data;
+    private boolean cancelled;
+
+    public DnsWatcher(String dnsHostName, NameResolver.Args nameResolverArgs) {
+      this.resolver = createNameResolver(dnsHostName, nameResolverArgs);
+    }
+
+    public void start() {
+      resolver.start(new NameResolverListener());
+    }
+
+    public void refresh() {
+      if (cancelled) {
+        return;
+      }
+      resolver.refresh();
+    }
+
+    @Override
+    @Nullable
+    public StatusOr<List<EquivalentAddressGroup>> getData() {
+      return data;
+    }
+
+    @Override
+    public void close() {
+      if (cancelled) {
+        return;
+      }
+      cancelled = true;
+      resolver.shutdown();
+    }
+
+    private class NameResolverListener extends NameResolver.Listener2 {
+      @Override
+      public void onResult(final NameResolver.ResolutionResult resolutionResult) {
+        syncContext.execute(() -> onResult2(resolutionResult));
+      }
+
+      @Override
+      public Status onResult2(final NameResolver.ResolutionResult resolutionResult) {
+        if (cancelled) {
+          return Status.OK;
+        }
+        data = resolutionResult.getAddressesOrError();
+        maybePublishConfig();
+        return resolutionResult.getAddressesOrError().getStatus();
+      }
+
+      @Override
+      public void onError(final Status error) {
+        syncContext.execute(new Runnable() {
+          @Override
+          public void run() {
+            if (cancelled) {
+              return;
+            }
+            // DnsNameResolver cannot distinguish between address-not-found and transient errors.
+            // Assume it is a transient error.
+            // TODO: Once the resolution note API is available, don't throw away the error if
+            // hasDataValue(); pass it as the note instead
+            if (!hasDataValue()) {
+              data = StatusOr.fromStatus(error);
+              maybePublishConfig();
+            }
+          }
+        });
+      }
+    }
+  }
+
+  private static final class FailingNameResolver extends NameResolver {
+    private final Status status;
+
+    public FailingNameResolver(Status status) {
+      checkNotNull(status, "status");
+      checkArgument(!status.isOk(), "Status must not be OK");
+      this.status = status;
+    }
+
+    @Override
+    public void start(Listener2 listener) {
+      listener.onError(status);
+    }
+
+    @Override
+    public String getServiceAuthority() {
+      return "bug-if-you-see-this-authority";
+    }
+
+    @Override
+    public void shutdown() {}
+  }
 }
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
index 25918fa..c71e4dc 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
@@ -655,7 +655,7 @@
       authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
       xdsDependencyManager =
           new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
-              nameResolverArgs, scheduler);
+              nameResolverArgs);
     }
 
     void start() {
diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
index f9a09f7..258e290 100644
--- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
+++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
@@ -66,6 +66,7 @@
 import io.grpc.LoadBalancerProvider;
 import io.grpc.LoadBalancerRegistry;
 import io.grpc.NameResolver;
+import io.grpc.NameResolverRegistry;
 import io.grpc.Status;
 import io.grpc.Status.Code;
 import io.grpc.SynchronizationContext;
@@ -176,6 +177,7 @@
         .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
         .setChannelLogger(mock(ChannelLogger.class))
         .setScheduledExecutorService(fakeClock.getScheduledExecutorService())
+        .setNameResolverRegistry(new NameResolverRegistry())
         .build();
 
     xdsDepManager = new XdsDependencyManager(
@@ -183,8 +185,7 @@
         syncContext,
         SERVER_NAME,
         SERVER_NAME,
-        nameResolverArgs,
-        fakeClock.getScheduledExecutorService());
+        nameResolverArgs);
 
     controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(
         SERVER_NAME, ControlPlaneRule.buildClientListener(SERVER_NAME, "my-route")));
diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
index 8ac0f8c..5a897de 100644
--- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
@@ -42,12 +42,19 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Message;
 import io.envoyproxy.envoy.config.cluster.v3.Cluster;
+import io.envoyproxy.envoy.config.core.v3.Address;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
 import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
+import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
+import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
+import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
 import io.envoyproxy.envoy.config.listener.v3.Listener;
 import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
 import io.grpc.BindableService;
 import io.grpc.ChannelLogger;
+import io.grpc.EquivalentAddressGroup;
 import io.grpc.NameResolver;
+import io.grpc.NameResolverRegistry;
 import io.grpc.Status;
 import io.grpc.StatusOr;
 import io.grpc.StatusOrMatcher;
@@ -56,10 +63,12 @@
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.internal.FakeClock;
 import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.testing.FakeNameResolverProvider;
 import io.grpc.testing.GrpcCleanupRule;
 import io.grpc.xds.XdsClusterResource.CdsUpdate;
 import io.grpc.xds.XdsConfig.XdsClusterConfig;
 import io.grpc.xds.XdsEndpointResource.EdsUpdate;
+import io.grpc.xds.client.Locality;
 import io.grpc.xds.client.XdsClient;
 import io.grpc.xds.client.XdsClient.ResourceMetadata;
 import io.grpc.xds.client.XdsResourceType;
@@ -74,7 +83,6 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
@@ -121,6 +129,7 @@
   private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService();
   private final BindableService lrsService =
       XdsTestUtils.createLrsService(lrsEnded, loadReportCalls);
+  private final NameResolverRegistry nameResolverRegistry = new NameResolverRegistry();
 
   @Rule
   public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@@ -138,11 +147,11 @@
       .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
       .setChannelLogger(mock(ChannelLogger.class))
       .setScheduledExecutorService(fakeClock.getScheduledExecutorService())
+      .setNameResolverRegistry(nameResolverRegistry)
       .build();
 
-  private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
   private XdsDependencyManager xdsDependencyManager = new XdsDependencyManager(
-      xdsClient, syncContext, serverName, serverName, nameResolverArgs, scheduler);
+      xdsClient, syncContext, serverName, serverName, nameResolverArgs);
 
   @Before
   public void setUp() throws Exception {
@@ -369,7 +378,7 @@
   public void testMissingLds() {
     String ldsName = "badLdsName";
     xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
-        serverName, ldsName, nameResolverArgs, scheduler);
+        serverName, ldsName, nameResolverArgs);
     xdsDependencyManager.start(xdsConfigWatcher);
 
     fakeClock.forwardTime(16, TimeUnit.SECONDS);
@@ -434,7 +443,7 @@
         "xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
 
     xdsDependencyManager = new XdsDependencyManager(xdsClient, syncContext,
-        serverName, ldsResourceName, nameResolverArgs, scheduler);
+        serverName, ldsResourceName, nameResolverArgs);
     xdsDependencyManager.start(xdsConfigWatcher);
 
     verify(xdsConfigWatcher).onUpdate(
@@ -739,6 +748,75 @@
   }
 
   @Test
+  public void testLogicalDns_success() {
+    FakeSocketAddress fakeAddress = new FakeSocketAddress();
+    nameResolverRegistry.register(new FakeNameResolverProvider(
+        "dns:///dns.example.com:1111", fakeAddress));
+    Cluster cluster = Cluster.newBuilder()
+        .setName(CLUSTER_NAME)
+        .setType(Cluster.DiscoveryType.LOGICAL_DNS)
+        .setLoadAssignment(ClusterLoadAssignment.newBuilder()
+          .addEndpoints(LocalityLbEndpoints.newBuilder()
+            .addLbEndpoints(LbEndpoint.newBuilder()
+              .setEndpoint(Endpoint.newBuilder()
+                .setAddress(Address.newBuilder()
+                  .setSocketAddress(SocketAddress.newBuilder()
+                    .setAddress("dns.example.com")
+                    .setPortValue(1111)))))))
+        .build();
+    controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS,
+        ImmutableMap.of(CLUSTER_NAME, cluster));
+    xdsDependencyManager.start(xdsConfigWatcher);
+
+    verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
+    XdsConfig config = xdsUpdateCaptor.getValue().getValue();
+    XdsClusterConfig.ClusterChild clusterChild =
+        config.getClusters().get(CLUSTER_NAME).getValue().getChildren();
+    assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
+    StatusOr<EdsUpdate> endpointOr = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
+    assertThat(endpointOr.getStatus()).isEqualTo(Status.OK);
+    assertThat(endpointOr.getValue()).isEqualTo(new EdsUpdate(
+        "fakeEds_logicalDns",
+        ImmutableMap.of(
+            Locality.create("", "", ""),
+            Endpoints.LocalityLbEndpoints.create(
+                Arrays.asList(Endpoints.LbEndpoint.create(
+                    new EquivalentAddressGroup(fakeAddress),
+                    1, true, "dns.example.com:1111", ImmutableMap.of())),
+                1, 0, ImmutableMap.of())),
+        Arrays.asList()));
+  }
+
+  @Test
+  public void testLogicalDns_noDnsNr() {
+    Cluster cluster = Cluster.newBuilder()
+        .setName(CLUSTER_NAME)
+        .setType(Cluster.DiscoveryType.LOGICAL_DNS)
+        .setLoadAssignment(ClusterLoadAssignment.newBuilder()
+          .addEndpoints(LocalityLbEndpoints.newBuilder()
+            .addLbEndpoints(LbEndpoint.newBuilder()
+              .setEndpoint(Endpoint.newBuilder()
+                .setAddress(Address.newBuilder()
+                  .setSocketAddress(SocketAddress.newBuilder()
+                    .setAddress("dns.example.com")
+                    .setPortValue(1111)))))))
+        .build();
+    controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS,
+        ImmutableMap.of(CLUSTER_NAME, cluster));
+    xdsDependencyManager.start(xdsConfigWatcher);
+
+    verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
+    XdsConfig config = xdsUpdateCaptor.getValue().getValue();
+    XdsClusterConfig.ClusterChild clusterChild =
+        config.getClusters().get(CLUSTER_NAME).getValue().getChildren();
+    assertThat(clusterChild).isInstanceOf(XdsClusterConfig.EndpointConfig.class);
+    StatusOr<EdsUpdate> endpointOr = ((XdsClusterConfig.EndpointConfig) clusterChild).getEndpoint();
+    assertThat(endpointOr.getStatus().getCode()).isEqualTo(Status.Code.INTERNAL);
+    assertThat(endpointOr.getStatus().getDescription())
+        .isEqualTo("Could not find dns name resolver");
+  }
+
+  @Test
   public void testCdsError() throws IOException {
     controlPlaneService.setXdsConfig(
         ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME,
@@ -943,4 +1021,6 @@
           && xdsConfig.getClusters().keySet().containsAll(expectedNames);
     }
   }
+
+  private static class FakeSocketAddress extends java.net.SocketAddress {}
 }