eds: fix priority timeout failure when EDS removes all priorities (cherry pick of #3830) (#3840)

Without this fix, when the EDS response removes all priorities, after the timeout, the priority check panics because priority is unset.
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_priority.go b/xds/internal/balancer/edsbalancer/eds_impl_priority.go
index c2ce312..fc1d26d 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_priority.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_priority.go
@@ -49,6 +49,12 @@
 	// Everything was removed by EDS.
 	if !edsImpl.priorityLowest.isSet() {
 		edsImpl.priorityInUse = newPriorityTypeUnset()
+		// Stop the init timer. This can happen if the only priority is removed
+		// shortly after it's added.
+		if timer := edsImpl.priorityInitTimer; timer != nil {
+			timer.Stop()
+			edsImpl.priorityInitTimer = nil
+		}
 		edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(errAllPrioritiesRemoved)})
 		return
 	}
@@ -116,7 +122,7 @@
 	edsImpl.priorityInitTimer = time.AfterFunc(defaultPriorityInitTimeout, func() {
 		edsImpl.priorityMu.Lock()
 		defer edsImpl.priorityMu.Unlock()
-		if !edsImpl.priorityInUse.equal(priority) {
+		if !edsImpl.priorityInUse.isSet() || !edsImpl.priorityInUse.equal(priority) {
 			return
 		}
 		edsImpl.priorityInitTimer = nil
@@ -307,14 +313,18 @@
 }
 
 func (p priorityType) equal(p2 priorityType) bool {
+	if !p.isSet() && !p2.isSet() {
+		return true
+	}
 	if !p.isSet() || !p2.isSet() {
-		panic("priority unset")
+		return false
 	}
 	return p == p2
 }
 
 func (p priorityType) higherThan(p2 priorityType) bool {
 	if !p.isSet() || !p2.isSet() {
+		// TODO(menghanl): return an appropriate value instead of panic.
 		panic("priority unset")
 	}
 	return p.p < p2.p
@@ -322,6 +332,7 @@
 
 func (p priorityType) lowerThan(p2 priorityType) bool {
 	if !p.isSet() || !p2.isSet() {
+		// TODO(menghanl): return an appropriate value instead of panic.
 		panic("priority unset")
 	}
 	return p.p > p2.p
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go b/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go
index d9b9ff4..2636ca9 100644
--- a/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go
+++ b/xds/internal/balancer/edsbalancer/eds_impl_priority_test.go
@@ -656,6 +656,46 @@
 	}
 }
 
+func (s) TestPriorityTypeEqual(t *testing.T) {
+	tests := []struct {
+		name   string
+		p1, p2 priorityType
+		want   bool
+	}{
+		{
+			name: "equal",
+			p1:   newPriorityType(12),
+			p2:   newPriorityType(12),
+			want: true,
+		},
+		{
+			name: "not equal",
+			p1:   newPriorityType(12),
+			p2:   newPriorityType(34),
+			want: false,
+		},
+		{
+			name: "one not set",
+			p1:   newPriorityType(1),
+			p2:   newPriorityTypeUnset(),
+			want: false,
+		},
+		{
+			name: "both not set",
+			p1:   newPriorityTypeUnset(),
+			p2:   newPriorityTypeUnset(),
+			want: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := tt.p1.equal(tt.p2); got != tt.want {
+				t.Errorf("equal() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
 // Test the case where the high priority contains no backends. The low priority
 // will be used.
 func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
@@ -775,3 +815,28 @@
 		t.Fatalf("want %v, got %v", want, err)
 	}
 }
+
+// Test the case where the first and only priority is removed.
+func (s) TestEDSPriority_FirstPriorityUnavailable(t *testing.T) {
+	const testPriorityInitTimeout = time.Second
+	defer func(t time.Duration) {
+		defaultPriorityInitTimeout = t
+	}(defaultPriorityInitTimeout)
+	defaultPriorityInitTimeout = testPriorityInitTimeout
+
+	cc := testutils.NewTestClientConn(t)
+	edsb := newEDSBalancerImpl(cc, nil, nil, nil)
+	edsb.enqueueChildBalancerStateUpdate = edsb.updateState
+
+	// One localities, with priorities [0], each with one backend.
+	clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
+	clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
+	edsb.handleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
+
+	// Remove the only localities.
+	clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
+	edsb.handleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
+
+	// Wait after double the init timer timeout, to ensure it doesn't fail.
+	time.Sleep(testPriorityInitTimeout * 2)
+}