Do not create new addrConn when connection error happens (#1369)
diff --git a/clientconn.go b/clientconn.go
index e4d6a0b..f21a568 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -559,15 +559,16 @@
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
+//
+// We should never need to replace an addrConn with a new one. This function is only used
+// as newAddrConn to create new addrConn.
+// TODO rename this function and clean up the code.
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
- cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = cc.mkp
- cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
@@ -598,10 +599,7 @@
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
- // 1) a buggy Balancer notifies duplicated Addresses;
- // 2) goaway was received, a new ac will replace the old ac.
- // The old ac should be deleted from cc.conns, but the
- // underlying transport should drain rather than close.
+ // a buggy Balancer that reports duplicated Addresses.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
@@ -828,26 +826,44 @@
return ac.state, nil
}
-func (ac *addrConn) resetTransport(closeTransport bool) error {
+// resetTransport recreates a transport to the address for ac.
+// For the old transport:
+// - if drain is true, it will be gracefully closed.
+// - otherwise, it will be closed.
+func (ac *addrConn) resetTransport(drain bool) error {
+ ac.mu.Lock()
+ if ac.state == Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ ac.printf("connecting")
+ if ac.down != nil {
+ ac.down(downErrorf(false, true, "%v", errNetworkIO))
+ ac.down = nil
+ }
+ ac.state = Connecting
+ ac.stateCV.Broadcast()
+ t := ac.transport
+ ac.transport = nil
+ ac.mu.Unlock()
+ if t != nil {
+ if drain {
+ t.GracefulClose()
+ } else {
+ t.Close()
+ }
+ }
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
- ac.printf("connecting")
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
- if ac.down != nil {
- ac.down(downErrorf(false, true, "%v", errNetworkIO))
- ac.down = nil
- }
- ac.state = Connecting
- ac.stateCV.Broadcast()
- t := ac.transport
ac.mu.Unlock()
- if closeTransport && t != nil {
- t.Close()
- }
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
if timeout < sleepTime {
@@ -883,7 +899,6 @@
ac.ready = nil
}
ac.mu.Unlock()
- closeTransport = false
timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
case <-timer.C:
@@ -936,19 +951,25 @@
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
- // If GoAway happens without any network I/O error, ac is closed without shutting down the
- // underlying transport (the transport will be closed when all the pending RPCs finished or
- // failed.).
- // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
- // are closed.
- // In both cases, a new ac is created.
+ // If GoAway happens without any network I/O error, the underlying transport
+ // will be gracefully closed, and a new transport will be created.
+ // (The transport will be closed when all the pending RPCs finished or failed.)
+ // If GoAway and some network I/O error happen concurrently, the underlying transport
+ // will be closed, and a new transport will be created.
+ var drain bool
select {
case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
- ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
+ drain = true
}
- return
+ if err := ac.resetTransport(drain); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
+ return
+ }
case <-t.Error():
select {
case <-ac.ctx.Done():
@@ -956,8 +977,14 @@
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
- ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
- return
+ if err := ac.resetTransport(false); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
+ return
+ }
default:
}
ac.mu.Lock()
@@ -969,7 +996,8 @@
ac.state = TransientFailure
ac.stateCV.Broadcast()
ac.mu.Unlock()
- if err := ac.resetTransport(true); err != nil {
+ if err := ac.resetTransport(false); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()