| /* |
| * |
| * Copyright 2016 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package grpclb |
| |
| import ( |
| "fmt" |
| "sync" |
| "time" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/resolver" |
| ) |
| |
| // The parent ClientConn should re-resolve when grpclb loses connection to the |
| // remote balancer. When the ClientConn inside grpclb gets a TransientFailure, |
| // it calls lbManualResolver.ResolveNow(), which calls parent ClientConn's |
| // ResolveNow, and eventually results in re-resolve happening in parent |
| // ClientConn's resolver (DNS for example). |
| // |
| // parent |
| // ClientConn |
| // +-----------------------------------------------------------------+ |
| // | parent +---------------------------------+ | |
| // | DNS ClientConn | grpclb | | |
| // | resolver balancerWrapper | | | |
| // | + + | grpclb grpclb | | |
| // | | | | ManualResolver ClientConn | | |
| // | | | | + + | | |
| // | | | | | | Transient | | |
| // | | | | | | Failure | | |
| // | | | | | <--------- | | | |
| // | | | <--------------- | ResolveNow | | | |
| // | | <--------- | ResolveNow | | | | | |
| // | | ResolveNow | | | | | | |
| // | | | | | | | | |
| // | + + | + + | | |
| // | +---------------------------------+ | |
| // +-----------------------------------------------------------------+ |
| |
| // lbManualResolver is used by the ClientConn inside grpclb. It's a manual |
| // resolver with a special ResolveNow() function. |
| // |
| // When ResolveNow() is called, it calls ResolveNow() on the parent ClientConn, |
| // so when grpclb client lose contact with remote balancers, the parent |
| // ClientConn's resolver will re-resolve. |
| type lbManualResolver struct { |
| scheme string |
| ccr resolver.ClientConn |
| |
| ccb balancer.ClientConn |
| } |
| |
| func (r *lbManualResolver) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { |
| r.ccr = cc |
| return r, nil |
| } |
| |
| func (r *lbManualResolver) Scheme() string { |
| return r.scheme |
| } |
| |
| // ResolveNow calls resolveNow on the parent ClientConn. |
| func (r *lbManualResolver) ResolveNow(o resolver.ResolveNowOptions) { |
| r.ccb.ResolveNow(o) |
| } |
| |
| // Close is a noop for Resolver. |
| func (*lbManualResolver) Close() {} |
| |
| // UpdateState calls cc.UpdateState. |
| func (r *lbManualResolver) UpdateState(s resolver.State) { |
| r.ccr.UpdateState(s) |
| } |
| |
| const subConnCacheTime = time.Second * 10 |
| |
| // lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache. |
| // SubConns will be kept in cache for subConnCacheTime before being removed. |
| // |
| // Its new and remove methods are updated to do cache first. |
| type lbCacheClientConn struct { |
| cc balancer.ClientConn |
| timeout time.Duration |
| |
| mu sync.Mutex |
| // subConnCache only keeps subConns that are being deleted. |
| subConnCache map[resolver.Address]*subConnCacheEntry |
| subConnToAddr map[balancer.SubConn]resolver.Address |
| } |
| |
| type subConnCacheEntry struct { |
| sc balancer.SubConn |
| |
| cancel func() |
| abortDeleting bool |
| } |
| |
| func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn { |
| return &lbCacheClientConn{ |
| cc: cc, |
| timeout: subConnCacheTime, |
| subConnCache: make(map[resolver.Address]*subConnCacheEntry), |
| subConnToAddr: make(map[balancer.SubConn]resolver.Address), |
| } |
| } |
| |
| func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { |
| if len(addrs) != 1 { |
| return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) |
| } |
| addrWithoutMD := addrs[0] |
| addrWithoutMD.Metadata = nil |
| |
| ccc.mu.Lock() |
| defer ccc.mu.Unlock() |
| if entry, ok := ccc.subConnCache[addrWithoutMD]; ok { |
| // If entry is in subConnCache, the SubConn was being deleted. |
| // cancel function will never be nil. |
| entry.cancel() |
| delete(ccc.subConnCache, addrWithoutMD) |
| return entry.sc, nil |
| } |
| |
| scNew, err := ccc.cc.NewSubConn(addrs, opts) |
| if err != nil { |
| return nil, err |
| } |
| |
| ccc.subConnToAddr[scNew] = addrWithoutMD |
| return scNew, nil |
| } |
| |
| func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { |
| ccc.mu.Lock() |
| defer ccc.mu.Unlock() |
| addr, ok := ccc.subConnToAddr[sc] |
| if !ok { |
| return |
| } |
| |
| if entry, ok := ccc.subConnCache[addr]; ok { |
| if entry.sc != sc { |
| // This could happen if NewSubConn was called multiple times for the |
| // same address, and those SubConns are all removed. We remove sc |
| // immediately here. |
| delete(ccc.subConnToAddr, sc) |
| ccc.cc.RemoveSubConn(sc) |
| } |
| return |
| } |
| |
| entry := &subConnCacheEntry{ |
| sc: sc, |
| } |
| ccc.subConnCache[addr] = entry |
| |
| timer := time.AfterFunc(ccc.timeout, func() { |
| ccc.mu.Lock() |
| defer ccc.mu.Unlock() |
| if entry.abortDeleting { |
| return |
| } |
| ccc.cc.RemoveSubConn(sc) |
| delete(ccc.subConnToAddr, sc) |
| delete(ccc.subConnCache, addr) |
| }) |
| entry.cancel = func() { |
| if !timer.Stop() { |
| // If stop was not successful, the timer has fired (this can only |
| // happen in a race). But the deleting function is blocked on ccc.mu |
| // because the mutex was held by the caller of this function. |
| // |
| // Set abortDeleting to true to abort the deleting function. When |
| // the lock is released, the deleting function will acquire the |
| // lock, check the value of abortDeleting and return. |
| entry.abortDeleting = true |
| } |
| } |
| } |
| |
| func (ccc *lbCacheClientConn) UpdateState(s balancer.State) { |
| ccc.cc.UpdateState(s) |
| } |
| |
| func (ccc *lbCacheClientConn) close() { |
| ccc.mu.Lock() |
| // Only cancel all existing timers. There's no need to remove SubConns. |
| for _, entry := range ccc.subConnCache { |
| entry.cancel() |
| } |
| ccc.mu.Unlock() |
| } |