xds: XdsDepManager should ignore updates after shutdown
This prevents a NPE and subsequent channel panic when trying to build a
config (because there are no watchers, so waitingOnResource==false)
without any listener and route.
```
java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null
at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295)
at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266)
at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899)
at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888)
at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929)
at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837)
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96)
```
I think this fully-fixes the problem today, but not tomorrow.
subscribeToCluster() is racy as well, but not yet used.
This was noticed when idleTimeout was firing, with some other code
calling getState(true) to wake the channel back up. That may have made
this panic more visible than it would be otherwise, but that has not
been investigated.
b/412474567diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
index 8cd3119..d804954 100644
--- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
+++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
@@ -199,6 +199,7 @@
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
watcherEntry.getValue());
+ watcherEntry.getValue().cancelled = true;
}
}
@@ -591,6 +592,9 @@
@Override
public void onError(Status error) {
checkNotNull(error, "error");
+ if (cancelled) {
+ return;
+ }
// Don't update configuration on error, if we've already received configuration
if (!hasDataValue()) {
setDataAsStatus(Status.UNAVAILABLE.withDescription(
@@ -659,6 +663,9 @@
@Override
public void onChanged(XdsListenerResource.LdsUpdate update) {
checkNotNull(update, "update");
+ if (cancelled) {
+ return;
+ }
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
List<VirtualHost> virtualHosts;
@@ -787,6 +794,9 @@
@Override
public void onChanged(RdsUpdate update) {
checkNotNull(update, "update");
+ if (cancelled) {
+ return;
+ }
List<VirtualHost> oldVirtualHosts = hasDataValue()
? getData().getValue().virtualHosts
: Collections.emptyList();
@@ -815,6 +825,9 @@
@Override
public void onChanged(XdsClusterResource.CdsUpdate update) {
checkNotNull(update, "update");
+ if (cancelled) {
+ return;
+ }
switch (update.clusterType()) {
case EDS:
setData(update);
@@ -895,6 +908,9 @@
@Override
public void onChanged(XdsEndpointResource.EdsUpdate update) {
+ if (cancelled) {
+ return;
+ }
setData(checkNotNull(update, "update"));
maybePublishConfig();
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
index 2af04a3..1f3d851 100644
--- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
@@ -41,6 +41,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
@@ -65,7 +66,7 @@
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
-import io.grpc.xds.client.XdsClientImpl;
+import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable;
@@ -115,7 +116,7 @@
});
private ManagedChannel channel;
- private XdsClientImpl xdsClient;
+ private XdsClient xdsClient;
private XdsDependencyManager xdsDependencyManager;
private TestWatcher xdsConfigWatcher;
private Server xdsServer;
@@ -715,6 +716,138 @@
assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME);
}
+ @Test
+ public void ldsUpdateAfterShutdown() {
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+
+ xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
+ serverName, serverName, nameResolverArgs, scheduler);
+
+ verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
+
+ @SuppressWarnings("unchecked")
+ XdsClient.ResourceWatcher<XdsListenerResource.LdsUpdate> resourceWatcher =
+ mock(XdsClient.ResourceWatcher.class);
+ xdsClient.watchXdsResource(
+ XdsListenerResource.getInstance(),
+ serverName,
+ resourceWatcher,
+ MoreExecutors.directExecutor());
+ verify(resourceWatcher, timeout(5000)).onChanged(any());
+
+ syncContext.execute(() -> {
+ // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
+ // Runnable returns
+ xdsDependencyManager.shutdown();
+
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+ verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
+ xdsClient.cancelXdsResourceWatch(
+ XdsListenerResource.getInstance(), serverName, resourceWatcher);
+ });
+ }
+
+ @Test
+ public void rdsUpdateAfterShutdown() {
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+
+ xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
+ serverName, serverName, nameResolverArgs, scheduler);
+
+ verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
+
+ @SuppressWarnings("unchecked")
+ XdsClient.ResourceWatcher<XdsRouteConfigureResource.RdsUpdate> resourceWatcher =
+ mock(XdsClient.ResourceWatcher.class);
+ xdsClient.watchXdsResource(
+ XdsRouteConfigureResource.getInstance(),
+ "RDS",
+ resourceWatcher,
+ MoreExecutors.directExecutor());
+ verify(resourceWatcher, timeout(5000)).onChanged(any());
+
+ syncContext.execute(() -> {
+ // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
+ // Runnable returns
+ xdsDependencyManager.shutdown();
+
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+ verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
+ xdsClient.cancelXdsResourceWatch(
+ XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher);
+ });
+ }
+
+ @Test
+ public void cdsUpdateAfterShutdown() {
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+
+ xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
+ serverName, serverName, nameResolverArgs, scheduler);
+
+ verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
+
+ @SuppressWarnings("unchecked")
+ XdsClient.ResourceWatcher<XdsClusterResource.CdsUpdate> resourceWatcher =
+ mock(XdsClient.ResourceWatcher.class);
+ xdsClient.watchXdsResource(
+ XdsClusterResource.getInstance(),
+ "CDS",
+ resourceWatcher,
+ MoreExecutors.directExecutor());
+ verify(resourceWatcher, timeout(5000)).onChanged(any());
+
+ syncContext.execute(() -> {
+ // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
+ // Runnable returns
+ xdsDependencyManager.shutdown();
+
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+ verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
+ xdsClient.cancelXdsResourceWatch(
+ XdsClusterResource.getInstance(), serverName, resourceWatcher);
+ });
+ }
+
+ @Test
+ public void edsUpdateAfterShutdown() {
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
+ ENDPOINT_HOSTNAME, ENDPOINT_PORT);
+
+ xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
+ serverName, serverName, nameResolverArgs, scheduler);
+
+ verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
+
+ @SuppressWarnings("unchecked")
+ XdsClient.ResourceWatcher<XdsEndpointResource.EdsUpdate> resourceWatcher =
+ mock(XdsClient.ResourceWatcher.class);
+ xdsClient.watchXdsResource(
+ XdsEndpointResource.getInstance(),
+ "EDS",
+ resourceWatcher,
+ MoreExecutors.directExecutor());
+ verify(resourceWatcher, timeout(5000)).onChanged(any());
+
+ syncContext.execute(() -> {
+ // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
+ // Runnable returns
+ xdsDependencyManager.shutdown();
+
+ XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
+ ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
+ verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
+ xdsClient.cancelXdsResourceWatch(
+ XdsEndpointResource.getInstance(), serverName, resourceWatcher);
+ });
+ }
+
private Listener buildInlineClientListener(String rdsName, String clusterName) {
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
}