| /* |
| * |
| * Copyright 2023 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package grpc_test |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "runtime" |
| "strings" |
| "testing" |
| "time" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/balancer/stub" |
| "google.golang.org/grpc/internal/channelz" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| ) |
| |
| // TestResolverBalancerInteraction tests: |
| // 1. resolver.Builder.Build() -> |
| // 2. resolver.ClientConn.UpdateState() -> |
| // 3. balancer.Balancer.UpdateClientConnState() -> |
| // 4. balancer.ClientConn.ResolveNow() -> |
| // 5. resolver.Resolver.ResolveNow() -> |
| func (s) TestResolverBalancerInteraction(t *testing.T) { |
| name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) |
| fmt.Println(name) |
| bf := stub.BalancerFuncs{ |
| UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { |
| bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{}) |
| return nil |
| }, |
| } |
| stub.Register(name, bf) |
| |
| rb := manual.NewBuilderWithScheme(name) |
| rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { |
| sc := cc.ParseServiceConfig(`{"loadBalancingConfig": [{"` + name + `":{}}]}`) |
| cc.UpdateState(resolver.State{ |
| Addresses: []resolver.Address{{Addr: "test"}}, |
| ServiceConfig: sc, |
| }) |
| } |
| rnCh := make(chan struct{}) |
| rb.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(rnCh) } |
| resolver.Register(rb) |
| |
| cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.Dial error: %v", err) |
| } |
| defer cc.Close() |
| select { |
| case <-rnCh: |
| case <-time.After(defaultTestTimeout): |
| t.Fatalf("timed out waiting for resolver.ResolveNow") |
| } |
| } |
| |
| type resolverBuilderWithErr struct { |
| resolver.Resolver |
| errCh <-chan error |
| scheme string |
| } |
| |
| func (b *resolverBuilderWithErr) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { |
| if err := <-b.errCh; err != nil { |
| return nil, err |
| } |
| return b, nil |
| } |
| |
| func (b *resolverBuilderWithErr) Scheme() string { |
| return b.scheme |
| } |
| |
| func (b *resolverBuilderWithErr) Close() {} |
| |
| // TestResolverBuildFailure tests: |
| // 1. resolver.Builder.Build() passes. |
| // 2. Channel enters idle mode. |
| // 3. An RPC happens. |
| // 4. resolver.Builder.Build() fails. |
| func (s) TestResolverBuildFailure(t *testing.T) { |
| enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) |
| name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) |
| resErrCh := make(chan error, 1) |
| resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name}) |
| |
| resErrCh <- nil |
| cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.Dial error: %v", err) |
| } |
| defer cc.Close() |
| enterIdle(cc) |
| const errStr = "test error from resolver builder" |
| t.Log("pushing res err") |
| resErrCh <- errors.New(errStr) |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := cc.Invoke(ctx, "/a/b", nil, nil); err == nil || !strings.Contains(err.Error(), errStr) { |
| t.Fatalf("Invoke = %v; want %v", err, errStr) |
| } |
| } |
| |
| // TestEnterIdleDuringResolverUpdateState tests a scenario that used to deadlock |
| // while calling UpdateState at the same time as the resolver being closed while |
| // the channel enters idle mode. |
| func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) { |
| enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) |
| name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) |
| |
| // Create a manual resolver that spams UpdateState calls until it is closed. |
| rb := manual.NewBuilderWithScheme(name) |
| var cancel context.CancelFunc |
| rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { |
| var ctx context.Context |
| ctx, cancel = context.WithCancel(context.Background()) |
| go func() { |
| for ctx.Err() == nil { |
| cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) |
| } |
| }() |
| } |
| rb.CloseCallback = func() { |
| cancel() |
| } |
| resolver.Register(rb) |
| |
| cc, err := grpc.NewClient(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("grpc.NewClient error: %v", err) |
| } |
| defer cc.Close() |
| |
| // Enter/exit idle mode repeatedly. |
| for i := 0; i < 2000; i++ { |
| // Start a timer so we panic out of the deadlock and can see all the |
| // stack traces to debug the problem. |
| p := time.AfterFunc(time.Second, func() { |
| buf := make([]byte, 8192) |
| buf = buf[0:runtime.Stack(buf, true)] |
| t.Error("Timed out waiting for enterIdle") |
| panic(fmt.Sprint("Stack trace:\n", string(buf))) |
| }) |
| enterIdle(cc) |
| p.Stop() |
| cc.Connect() |
| } |
| } |
| |
| // TestEnterIdleDuringBalancerUpdateState tests calling UpdateState at the same |
| // time as the balancer being closed while the channel enters idle mode. |
| func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) { |
| enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) |
| name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) |
| |
| // Create a balancer that calls UpdateState once asynchronously, attempting |
| // to make the channel appear ready even after entering idle. |
| bf := stub.BalancerFuncs{ |
| UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { |
| go func() { |
| bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) |
| }() |
| return nil |
| }, |
| } |
| stub.Register(name, bf) |
| |
| rb := manual.NewBuilderWithScheme(name) |
| rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { |
| cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) |
| } |
| resolver.Register(rb) |
| |
| cc, err := grpc.NewClient( |
| name+":///", |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) |
| if err != nil { |
| t.Fatalf("grpc.NewClient error: %v", err) |
| } |
| defer cc.Close() |
| |
| // Enter/exit idle mode repeatedly. |
| for i := 0; i < 2000; i++ { |
| enterIdle(cc) |
| if got, want := cc.GetState(), connectivity.Idle; got != want { |
| t.Fatalf("cc state = %v; want %v", got, want) |
| } |
| cc.Connect() |
| } |
| } |
| |
| // TestEnterIdleDuringBalancerNewSubConn tests calling NewSubConn at the same |
| // time as the balancer being closed while the channel enters idle mode. |
| func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) { |
| channelz.TurnOn() |
| defer internal.ChannelzTurnOffForTesting() |
| enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) |
| name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) |
| |
| // Create a balancer that calls NewSubConn once asynchronously, attempting |
| // to create a subchannel after going idle. |
| bf := stub.BalancerFuncs{ |
| UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { |
| go func() { |
| bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "test"}}, balancer.NewSubConnOptions{}) |
| }() |
| return nil |
| }, |
| } |
| stub.Register(name, bf) |
| |
| rb := manual.NewBuilderWithScheme(name) |
| rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { |
| cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) |
| } |
| resolver.Register(rb) |
| |
| cc, err := grpc.NewClient( |
| name+":///", |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) |
| if err != nil { |
| t.Fatalf("grpc.NewClient error: %v", err) |
| } |
| defer cc.Close() |
| |
| // Enter/exit idle mode repeatedly. |
| for i := 0; i < 2000; i++ { |
| enterIdle(cc) |
| tcs, _ := channelz.GetTopChannels(0, 0) |
| if len(tcs) != 1 { |
| t.Fatalf("Found channels: %v; expected 1 entry", tcs) |
| } |
| if got := tcs[0].SubChans(); len(got) != 0 { |
| t.Fatalf("Found subchannels: %v; expected 0 entries", got) |
| } |
| cc.Connect() |
| } |
| } |