| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package test |
| |
| import ( |
| "context" |
| "fmt" |
| "net" |
| "sync" |
| "testing" |
| "time" |
| |
| "golang.org/x/net/http2" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/backoff" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/balancer/stub" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/resolver/manual" |
| ) |
| |
| const stateRecordingBalancerName = "state_recording_balancer" |
| |
| var testBalancerBuilder = newStateRecordingBalancerBuilder() |
| |
| func init() { |
| balancer.Register(testBalancerBuilder) |
| } |
| |
| // These tests use a pipeListener. This listener is similar to net.Listener |
| // except that it is unbuffered, so each read and write will wait for the other |
| // side's corresponding write or read. |
| func (s) TestStateTransitions_SingleAddress(t *testing.T) { |
| for _, test := range []struct { |
| desc string |
| want []connectivity.State |
| server func(net.Listener) net.Conn |
| }{ |
| { |
| desc: "When the server returns server preface, the client enters READY.", |
| want: []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.Ready, |
| }, |
| server: func(lis net.Listener) net.Conn { |
| conn, err := lis.Accept() |
| if err != nil { |
| t.Error(err) |
| return nil |
| } |
| |
| go keepReading(conn) |
| |
| framer := http2.NewFramer(conn, conn) |
| if err := framer.WriteSettings(http2.Setting{}); err != nil { |
| t.Errorf("Error while writing settings frame. %v", err) |
| return nil |
| } |
| |
| return conn |
| }, |
| }, |
| { |
| desc: "When the connection is closed before the preface is sent, the client enters TRANSIENT FAILURE.", |
| want: []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.TransientFailure, |
| }, |
| server: func(lis net.Listener) net.Conn { |
| conn, err := lis.Accept() |
| if err != nil { |
| t.Error(err) |
| return nil |
| } |
| |
| conn.Close() |
| return nil |
| }, |
| }, |
| { |
| desc: `When the server sends its connection preface, but the connection dies before the client can write its |
| connection preface, the client enters TRANSIENT FAILURE.`, |
| want: []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.TransientFailure, |
| }, |
| server: func(lis net.Listener) net.Conn { |
| conn, err := lis.Accept() |
| if err != nil { |
| t.Error(err) |
| return nil |
| } |
| |
| framer := http2.NewFramer(conn, conn) |
| if err := framer.WriteSettings(http2.Setting{}); err != nil { |
| t.Errorf("Error while writing settings frame. %v", err) |
| return nil |
| } |
| |
| conn.Close() |
| return nil |
| }, |
| }, |
| { |
| desc: `When the server reads the client connection preface but does not send its connection preface, the |
| client enters TRANSIENT FAILURE.`, |
| want: []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.TransientFailure, |
| }, |
| server: func(lis net.Listener) net.Conn { |
| conn, err := lis.Accept() |
| if err != nil { |
| t.Error(err) |
| return nil |
| } |
| |
| go keepReading(conn) |
| |
| return conn |
| }, |
| }, |
| } { |
| t.Log(test.desc) |
| testStateTransitionSingleAddress(t, test.want, test.server) |
| } |
| } |
| |
| func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) { |
| pl := testutils.NewPipeListener() |
| defer pl.Close() |
| |
| // Launch the server. |
| var conn net.Conn |
| var connMu sync.Mutex |
| go func() { |
| connMu.Lock() |
| conn = server(pl) |
| connMu.Unlock() |
| }() |
| |
| client, err := grpc.Dial("", |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), |
| grpc.WithDialer(pl.Dialer()), |
| grpc.WithConnectParams(grpc.ConnectParams{ |
| Backoff: backoff.Config{}, |
| MinConnectTimeout: 100 * time.Millisecond, |
| })) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer client.Close() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| go testutils.StayConnected(ctx, client) |
| |
| stateNotifications := testBalancerBuilder.nextStateNotifier() |
| for i := 0; i < len(want); i++ { |
| select { |
| case <-time.After(defaultTestTimeout): |
| t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) |
| case seen := <-stateNotifications: |
| if seen != want[i] { |
| t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) |
| } |
| } |
| } |
| |
| connMu.Lock() |
| defer connMu.Unlock() |
| if conn != nil { |
| err = conn.Close() |
| if err != nil { |
| t.Fatal(err) |
| } |
| } |
| } |
| |
| // When a READY connection is closed, the client enters IDLE then CONNECTING. |
| func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { |
| lis, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatalf("Error while listening. Err: %v", err) |
| } |
| defer lis.Close() |
| |
| sawReady := make(chan struct{}, 1) |
| defer close(sawReady) |
| |
| // Launch the server. |
| go func() { |
| conn, err := lis.Accept() |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| |
| go keepReading(conn) |
| |
| framer := http2.NewFramer(conn, conn) |
| if err := framer.WriteSettings(http2.Setting{}); err != nil { |
| t.Errorf("Error while writing settings frame. %v", err) |
| return |
| } |
| |
| // Prevents race between onPrefaceReceipt and onClose. |
| <-sawReady |
| |
| conn.Close() |
| }() |
| |
| client, err := grpc.Dial(lis.Addr().String(), |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName))) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer client.Close() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| go testutils.StayConnected(ctx, client) |
| |
| stateNotifications := testBalancerBuilder.nextStateNotifier() |
| |
| want := []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.Ready, |
| connectivity.Idle, |
| connectivity.Connecting, |
| } |
| for i := 0; i < len(want); i++ { |
| select { |
| case <-time.After(defaultTestTimeout): |
| t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) |
| case seen := <-stateNotifications: |
| if seen == connectivity.Ready { |
| sawReady <- struct{}{} |
| } |
| if seen != want[i] { |
| t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) |
| } |
| } |
| } |
| } |
| |
| // When the first connection is closed, the client stays in CONNECTING until it |
| // tries the second address (which succeeds, and then it enters READY). |
| func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) { |
| lis1, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatalf("Error while listening. Err: %v", err) |
| } |
| defer lis1.Close() |
| |
| lis2, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatalf("Error while listening. Err: %v", err) |
| } |
| defer lis2.Close() |
| |
| server1Done := make(chan struct{}) |
| server2Done := make(chan struct{}) |
| |
| // Launch server 1. |
| go func() { |
| conn, err := lis1.Accept() |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| |
| conn.Close() |
| close(server1Done) |
| }() |
| // Launch server 2. |
| go func() { |
| conn, err := lis2.Accept() |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| |
| go keepReading(conn) |
| |
| framer := http2.NewFramer(conn, conn) |
| if err := framer.WriteSettings(http2.Setting{}); err != nil { |
| t.Errorf("Error while writing settings frame. %v", err) |
| return |
| } |
| |
| close(server2Done) |
| }() |
| |
| rb := manual.NewBuilderWithScheme("whatever") |
| rb.InitialState(resolver.State{Addresses: []resolver.Address{ |
| {Addr: lis1.Addr().String()}, |
| {Addr: lis2.Addr().String()}, |
| }}) |
| client, err := grpc.Dial("whatever:///this-gets-overwritten", |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), |
| grpc.WithResolvers(rb)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer client.Close() |
| |
| stateNotifications := testBalancerBuilder.nextStateNotifier() |
| want := []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.Ready, |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| for i := 0; i < len(want); i++ { |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) |
| case seen := <-stateNotifications: |
| if seen != want[i] { |
| t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) |
| } |
| } |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") |
| case <-server1Done: |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2") |
| case <-server2Done: |
| } |
| } |
| |
| // When there are multiple addresses, and we enter READY on one of them, a |
| // later closure should cause the client to enter CONNECTING |
| func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { |
| lis1, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatalf("Error while listening. Err: %v", err) |
| } |
| defer lis1.Close() |
| |
| // Never actually gets used; we just want it to be alive so that the resolver has two addresses to target. |
| lis2, err := net.Listen("tcp", "localhost:0") |
| if err != nil { |
| t.Fatalf("Error while listening. Err: %v", err) |
| } |
| defer lis2.Close() |
| |
| server1Done := make(chan struct{}) |
| sawReady := make(chan struct{}, 1) |
| defer close(sawReady) |
| |
| // Launch server 1. |
| go func() { |
| conn, err := lis1.Accept() |
| if err != nil { |
| t.Error(err) |
| return |
| } |
| |
| go keepReading(conn) |
| |
| framer := http2.NewFramer(conn, conn) |
| if err := framer.WriteSettings(http2.Setting{}); err != nil { |
| t.Errorf("Error while writing settings frame. %v", err) |
| return |
| } |
| |
| <-sawReady |
| |
| conn.Close() |
| |
| close(server1Done) |
| }() |
| |
| rb := manual.NewBuilderWithScheme("whatever") |
| rb.InitialState(resolver.State{Addresses: []resolver.Address{ |
| {Addr: lis1.Addr().String()}, |
| {Addr: lis2.Addr().String()}, |
| }}) |
| client, err := grpc.Dial("whatever:///this-gets-overwritten", |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), |
| grpc.WithResolvers(rb)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer client.Close() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| go testutils.StayConnected(ctx, client) |
| |
| stateNotifications := testBalancerBuilder.nextStateNotifier() |
| want := []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.Ready, |
| connectivity.Idle, |
| connectivity.Connecting, |
| } |
| for i := 0; i < len(want); i++ { |
| select { |
| case <-ctx.Done(): |
| t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) |
| case seen := <-stateNotifications: |
| if seen == connectivity.Ready { |
| sawReady <- struct{}{} |
| } |
| if seen != want[i] { |
| t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) |
| } |
| } |
| } |
| select { |
| case <-ctx.Done(): |
| t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") |
| case <-server1Done: |
| } |
| } |
| |
| type stateRecordingBalancer struct { |
| balancer.Balancer |
| } |
| |
| func (b *stateRecordingBalancer) Close() { |
| b.Balancer.Close() |
| } |
| |
| type stateRecordingBalancerBuilder struct { |
| mu sync.Mutex |
| notifier chan connectivity.State // The notifier used in the last Balancer. |
| } |
| |
| func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder { |
| return &stateRecordingBalancerBuilder{} |
| } |
| |
| func (b *stateRecordingBalancerBuilder) Name() string { |
| return stateRecordingBalancerName |
| } |
| |
| func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { |
| stateNotifications := make(chan connectivity.State, 10) |
| b.mu.Lock() |
| b.notifier = stateNotifications |
| b.mu.Unlock() |
| return &stateRecordingBalancer{ |
| Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts), |
| } |
| } |
| |
| func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State { |
| b.mu.Lock() |
| defer b.mu.Unlock() |
| ret := b.notifier |
| b.notifier = nil |
| return ret |
| } |
| |
| type stateRecordingCCWrapper struct { |
| balancer.ClientConn |
| notifier chan<- connectivity.State |
| } |
| |
| func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { |
| oldListener := opts.StateListener |
| opts.StateListener = func(s balancer.SubConnState) { |
| ccw.notifier <- s.ConnectivityState |
| oldListener(s) |
| } |
| return ccw.ClientConn.NewSubConn(addrs, opts) |
| } |
| |
| // Keep reading until something causes the connection to die (EOF, server |
| // closed, etc). Useful as a tool for mindlessly keeping the connection |
| // healthy, since the client will error if things like client prefaces are not |
| // accepted in a timely fashion. |
| func keepReading(conn net.Conn) { |
| buf := make([]byte, 1024) |
| for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) { |
| } |
| } |
| |
| type funcConnectivityStateSubscriber struct { |
| onMsg func(connectivity.State) |
| } |
| |
| func (f *funcConnectivityStateSubscriber) OnMessage(msg any) { |
| f.onMsg(msg.(connectivity.State)) |
| } |
| |
| // TestConnectivityStateSubscriber confirms updates sent by the balancer in |
| // rapid succession are not missed by the subscriber. |
| func (s) TestConnectivityStateSubscriber(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| sendStates := []connectivity.State{ |
| connectivity.Connecting, |
| connectivity.Ready, |
| connectivity.Idle, |
| connectivity.Connecting, |
| connectivity.Idle, |
| connectivity.Connecting, |
| connectivity.Ready, |
| } |
| wantStates := append(sendStates, connectivity.Shutdown) |
| |
| const testBalName = "any" |
| bf := stub.BalancerFuncs{ |
| UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { |
| // Send the expected states in rapid succession. |
| for _, s := range sendStates { |
| t.Logf("Sending state update %s", s) |
| bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s}) |
| } |
| return nil |
| }, |
| } |
| stub.Register(testBalName, bf) |
| |
| // Create the ClientConn. |
| const testResName = "any" |
| rb := manual.NewBuilderWithScheme(testResName) |
| cc, err := grpc.Dial(testResName+":///", |
| grpc.WithResolvers(rb), |
| grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)), |
| grpc.WithTransportCredentials(insecure.NewCredentials()), |
| ) |
| if err != nil { |
| t.Fatalf("Unexpected error from grpc.Dial: %v", err) |
| } |
| |
| // Subscribe to state updates. Use a buffer size of 1 to allow the |
| // Shutdown state to go into the channel when Close()ing. |
| connCh := make(chan connectivity.State, 1) |
| s := &funcConnectivityStateSubscriber{ |
| onMsg: func(s connectivity.State) { |
| select { |
| case connCh <- s: |
| case <-ctx.Done(): |
| } |
| if s == connectivity.Shutdown { |
| close(connCh) |
| } |
| }, |
| } |
| |
| internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s) |
| |
| // Send an update from the resolver that will trigger the LB policy's UpdateClientConnState. |
| go rb.UpdateState(resolver.State{}) |
| |
| // Verify the resulting states. |
| for i, want := range wantStates { |
| if i == len(sendStates) { |
| // Trigger Shutdown to be sent by the channel. Use a goroutine to |
| // ensure the operation does not block. |
| cc.Close() |
| } |
| select { |
| case got := <-connCh: |
| if got != want { |
| t.Errorf("Update %v was %s; want %s", i, got, want) |
| } else { |
| t.Logf("Update %v was %s as expected", i, got) |
| } |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for state update %v: %s", i, want) |
| } |
| } |
| } |