blob: d057f1ece889974fdda60ac68e088b67498432b0 [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"context"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/xds/internal"
cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
adsgrpc "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
"google.golang.org/grpc/balancer/xds/lrs"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
)
const (
// cdsType is the type URL placed in CDS requests and compared against the
// type URL of incoming ADS responses to recognize CDS responses.
cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
// edsType is the type URL placed in EDS requests.
edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
// endpointRequired is the node metadata key sent in EDS requests; its bool
// value is the client's enableCDS setting.
endpointRequired = "endpoints_required"
)
var (
// defaultBackoffConfig is the backoff strategy installed by newXDSClient.
// Only MaxDelay is set here (120 seconds); other fields keep their zero
// values.
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
)
// client is responsible for connecting to the specified traffic director, passing the received
// ADS response from the traffic director, and sending notification when communication with the
// traffic director is lost.
type client struct {
// ctx is created in newXDSClient and cancelled by close(). It is used for
// dialing, is the parent of every ADS stream context, and is the context
// handed to the newADS, loseContact and load-reporting calls.
ctx context.Context
// cancel cancels ctx; it is called first thing in close().
cancel context.CancelFunc
// cli is the ADS stub, created from cc at the start of makeADSCall.
cli adsgrpc.AggregatedDiscoveryServiceClient
// opts are the balancer build options; dial reads DialCreds, Dialer and
// ChannelzParentID from them.
opts balancer.BuildOptions
balancerName string // the traffic director name
serviceName string // the user dial target name
// enableCDS, when true, makes each ADS attempt send a CDS request before
// the EDS request and expect a CDS response before any EDS response. When
// false, receiving a CDS response ends the current attempt.
enableCDS bool
// newADS is called with the message unmarshaled from the first resource of
// each ADS response. A non-nil error ends the current ADS attempt.
newADS func(ctx context.Context, resp proto.Message) error
// loseContact is called every time an ADS attempt returns.
loseContact func(ctx context.Context)
// cleanup is called as the last step of close().
cleanup func()
// backoff yields the delay before retrying after an ADS attempt that
// received no response.
backoff backoff.Strategy
// loadStore, if non-nil, has ReportTo started in a goroutine after the
// first ADS response is processed successfully.
loadStore lrs.Store
// loadReportOnce ensures load reporting is started at most once.
loadReportOnce sync.Once
// mu is held while dial assigns cc and while close() closes it.
mu sync.Mutex
// cc is the connection to the traffic director; it stays nil if the client
// was closed before dial could store the connection.
cc *grpc.ClientConn
}
// run dials the traffic director and then runs the ADS call loop. It returns
// only when makeADSCall returns, which happens once the client's context is
// cancelled.
func (c *client) run() {
c.dial()
c.makeADSCall()
}
// close shuts the client down: it cancels the client context, closes the
// ClientConn if dial stored one, and finally runs the cleanup callback.
func (c *client) close() {
	c.cancel()

	// The connection is closed while holding mu so that this cannot interleave
	// with dial storing a freshly created ClientConn.
	func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		if c.cc == nil {
			return
		}
		c.cc.Close()
	}()

	c.cleanup()
}
// dial creates a non-blocking ClientConn to the traffic director named by
// c.balancerName and stores it in c.cc.
//
// Transport security: when c.opts.DialCreds is set and its server name can be
// overridden to c.balancerName, those credentials are used; in every other
// case the connection is insecure. pick_first is always used as the balancer,
// and the custom dialer and channelz parent ID from c.opts are applied when
// available.
//
// If the client has been closed (c.ctx cancelled) by the time the dial
// returns, no connection is stored: a successfully created one is closed
// immediately, and a dial error caused by the cancellation is ignored. Any
// other dial error is fatal.
func (c *client) dial() {
	var dopts []grpc.DialOption
	if creds := c.opts.DialCreds; creds != nil {
		if err := creds.OverrideServerName(c.balancerName); err == nil {
			dopts = append(dopts, grpc.WithTransportCredentials(creds))
		} else {
			// NOTE(review): this silently downgrades to an insecure connection
			// when the server name cannot be overridden.
			grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
			dopts = append(dopts, grpc.WithInsecure())
		}
	} else {
		dopts = append(dopts, grpc.WithInsecure())
	}
	if c.opts.Dialer != nil {
		dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
	}
	// Explicitly set pickfirst as the balancer.
	dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
	if channelz.IsOn() {
		dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
	}

	cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
	if err != nil {
		// DialContext reports the context's error once c.ctx is cancelled,
		// which happens when close() races with dial(). That is an ordinary
		// shutdown and must not terminate the whole process.
		if c.ctx.Err() != nil {
			return
		}
		// This is a non-blocking dial, so a failure here is caused by a
		// serious, non-network-related error.
		grpclog.Fatalf("xds: failed to dial: %v", err)
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	select {
	case <-c.ctx.Done():
		// The client was closed while dialing; close() will never see this
		// connection, so release it here.
		cc.Close()
	default:
		// only assign c.cc when xds client has not been closed, to prevent ClientConn leak.
		c.cc = cc
	}
}
// newCDSRequest builds the CDS discovery request. The node metadata carries
// the user's dial target under the internal.GrpcHostname key, and the request
// type URL is cdsType.
func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
	hostname := &structpb.Value{
		Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
	}
	metadata := &structpb.Struct{
		Fields: map[string]*structpb.Value{
			internal.GrpcHostname: hostname,
		},
	}
	return &discoverypb.DiscoveryRequest{
		Node:    &basepb.Node{Metadata: metadata},
		TypeUrl: cdsType,
	}
}
// newEDSRequest builds the EDS discovery request. The node metadata carries
// the user's dial target under the internal.GrpcHostname key and the value of
// c.enableCDS under the endpointRequired key; the request type URL is edsType.
func (c *client) newEDSRequest() *discoverypb.DiscoveryRequest {
	hostname := &structpb.Value{
		Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
	}
	required := &structpb.Value{
		Kind: &structpb.Value_BoolValue{BoolValue: c.enableCDS},
	}
	metadata := &structpb.Struct{
		Fields: map[string]*structpb.Value{
			internal.GrpcHostname: hostname,
			endpointRequired:      required,
		},
	}
	// TODO: the expected ResourceName could be in a different format from
	// dial target. (test_service.test_namespace.traffic_director.com vs
	// test_namespace:test_service).
	//
	// The solution today is to always include GrpcHostname in metadata,
	// with the value set to dial target.
	//
	// A future solution could be: always do CDS, get cluster name from CDS
	// response, and use it here.
	// `ResourceNames: []string{c.clusterName},`
	return &discoverypb.DiscoveryRequest{
		Node:    &basepb.Node{Metadata: metadata},
		TypeUrl: edsType,
	}
}
// makeADSCall creates the ADS stub from c.cc and then runs ADS call attempts
// in a loop until the client context is cancelled.
//
// An attempt that ends without having received any response is followed by a
// backoff delay before the next attempt; the delay grows with the number of
// consecutive backoffs taken. An attempt that received at least one response
// resets that count and is retried without delay. loseContact is invoked
// after every attempt.
func (c *client) makeADSCall() {
	c.cli = adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)

	var (
		backoffs    int  // number of consecutive backoff delays taken so far
		needBackoff bool // whether the previous attempt received no response
	)
	for c.ctx.Err() == nil {
		if needBackoff {
			timer := time.NewTimer(c.backoff.Backoff(backoffs))
			select {
			case <-c.ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
			backoffs++
		}

		needBackoff = !c.adsCallAttempt()
		if !needBackoff {
			backoffs = 0
		}
		c.loseContact(c.ctx)
	}
}
// adsCallAttempt runs a single ADS streaming RPC until it fails or has to be
// restarted, and reports whether at least one response was received on it.
//
// The stream is opened with wait-for-ready semantics. A CDS request is sent
// first when CDS is enabled, followed by an EDS request. Each response then
// has its first resource unmarshaled and passed to c.newADS. The attempt ends
// (so that the caller can start a new stream) when:
//   - opening the stream, sending a request or receiving a response fails;
//   - a response carries no resources;
//   - a CDS response arrives while CDS is disabled;
//   - the first resource cannot be unmarshaled;
//   - an EDS response arrives while a CDS response is still expected;
//   - c.newADS returns an error.
//
// The stream's context is cancelled when this function returns.
func (c *client) adsCallAttempt() (firstRespReceived bool) {
	streamCtx, cancelStream := context.WithCancel(c.ctx)
	defer cancelStream()

	stream, err := c.cli.StreamAggregatedResources(streamCtx, grpc.WaitForReady(true))
	if err != nil {
		grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
		return false
	}

	// A send failure means the stream is broken; the caller starts a new one.
	if c.enableCDS {
		if sendErr := stream.Send(c.newCDSRequest()); sendErr != nil {
			grpclog.Infof("xds: ads RPC failed due to err: %v, when sending the CDS request ", sendErr)
			return false
		}
	}
	if sendErr := stream.Send(c.newEDSRequest()); sendErr != nil {
		grpclog.Infof("xds: ads RPC failed due to err: %v, when sending the EDS request", sendErr)
		return false
	}

	// With CDS enabled, a CDS response must be seen before any EDS response.
	awaitingCDS := c.enableCDS
	for {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			// The stream is broken; the caller starts a new one.
			grpclog.Infof("xds: ads RPC failed due to err: %v, when receiving the response", recvErr)
			return firstRespReceived
		}
		// From here on a response has been received, whatever its content.
		firstRespReceived = true

		resources := msg.GetResources()
		if len(resources) == 0 {
			// The server misbehaved by sending a response without resources.
			grpclog.Warning("xds: ADS response contains 0 resource info.")
			return true
		}
		if !c.enableCDS && msg.GetTypeUrl() == cdsType {
			// A CDS response is not acceptable in EDS-only mode.
			grpclog.Warning("xds: received CDS response in custom plugin mode.")
			return true
		}

		// Only the first resource of the response is decoded and processed.
		var decoded ptypes.DynamicAny
		if unmarshalErr := ptypes.UnmarshalAny(resources[0], &decoded); unmarshalErr != nil {
			grpclog.Warningf("xds: failed to unmarshal resources due to %v.", unmarshalErr)
			return true
		}

		switch decoded.Message.(type) {
		case *edspb.ClusterLoadAssignment:
			if awaitingCDS {
				grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
				return true
			}
		case *cdspb.Cluster:
			awaitingCDS = false
		}

		if handleErr := c.newADS(c.ctx, decoded.Message); handleErr != nil {
			grpclog.Warningf("xds: processing new ADS message failed due to %v.", handleErr)
			return true
		}

		// Load reporting starts only after an ADS response has been processed
		// successfully, and at most once for the lifetime of the client.
		c.loadReportOnce.Do(func() {
			if c.loadStore == nil {
				return
			}
			go c.loadStore.ReportTo(c.ctx, c.cc)
		})
	}
}
// newXDSClient returns a client for the traffic director balancerName.
//
// The service name is taken from opts.Target.Endpoint and the backoff
// strategy is defaultBackoffConfig. newADS, loseContact and exitCleanup are
// stored as the client's callbacks, and loadStore (which may be nil) as its
// load store. The client gets its own cancellable context derived from
// context.Background(). Nothing is dialed here.
func newXDSClient(balancerName string, enableCDS bool, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
	ctx, cancel := context.WithCancel(context.Background())
	return &client{
		ctx:          ctx,
		cancel:       cancel,
		opts:         opts,
		balancerName: balancerName,
		serviceName:  opts.Target.Endpoint,
		enableCDS:    enableCDS,
		newADS:       newADS,
		loseContact:  loseContact,
		cleanup:      exitCleanup,
		backoff:      defaultBackoffConfig,
		loadStore:    loadStore,
	}
}