| /* | 
 |  * | 
 |  * Copyright 2018 gRPC authors. | 
 |  * | 
 |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
 |  * you may not use this file except in compliance with the License. | 
 |  * You may obtain a copy of the License at | 
 |  * | 
 |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  * | 
 |  */ | 
 |  | 
 | package grpc | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"net" | 
 | 	"sync" | 
 | 	"testing" | 
 | 	"time" | 
 |  | 
 | 	"golang.org/x/net/http2" | 
 | 	"google.golang.org/grpc/balancer" | 
 | 	"google.golang.org/grpc/connectivity" | 
 | 	"google.golang.org/grpc/internal/testutils" | 
 | 	"google.golang.org/grpc/resolver" | 
 | 	"google.golang.org/grpc/resolver/manual" | 
 | ) | 
 |  | 
 | const stateRecordingBalancerName = "state_recoding_balancer" | 
 |  | 
 | var testBalancerBuilder = newStateRecordingBalancerBuilder() | 
 |  | 
 | func init() { | 
 | 	balancer.Register(testBalancerBuilder) | 
 | } | 
 |  | 
 | // These tests use a pipeListener. This listener is similar to net.Listener | 
 | // except that it is unbuffered, so each read and write will wait for the other | 
 | // side's corresponding write or read. | 
 | func (s) TestStateTransitions_SingleAddress(t *testing.T) { | 
 | 	for _, test := range []struct { | 
 | 		desc   string | 
 | 		want   []connectivity.State | 
 | 		server func(net.Listener) net.Conn | 
 | 	}{ | 
 | 		{ | 
 | 			desc: "When the server returns server preface, the client enters READY.", | 
 | 			want: []connectivity.State{ | 
 | 				connectivity.Connecting, | 
 | 				connectivity.Ready, | 
 | 			}, | 
 | 			server: func(lis net.Listener) net.Conn { | 
 | 				conn, err := lis.Accept() | 
 | 				if err != nil { | 
 | 					t.Error(err) | 
 | 					return nil | 
 | 				} | 
 |  | 
 | 				go keepReading(conn) | 
 |  | 
 | 				framer := http2.NewFramer(conn, conn) | 
 | 				if err := framer.WriteSettings(http2.Setting{}); err != nil { | 
 | 					t.Errorf("Error while writing settings frame. %v", err) | 
 | 					return nil | 
 | 				} | 
 |  | 
 | 				return conn | 
 | 			}, | 
 | 		}, | 
 | 		{ | 
 | 			desc: "When the connection is closed, the client enters TRANSIENT FAILURE.", | 
 | 			want: []connectivity.State{ | 
 | 				connectivity.Connecting, | 
 | 				connectivity.TransientFailure, | 
 | 			}, | 
 | 			server: func(lis net.Listener) net.Conn { | 
 | 				conn, err := lis.Accept() | 
 | 				if err != nil { | 
 | 					t.Error(err) | 
 | 					return nil | 
 | 				} | 
 |  | 
 | 				conn.Close() | 
 | 				return nil | 
 | 			}, | 
 | 		}, | 
 | 		{ | 
 | 			desc: `When the server sends its connection preface, but the connection dies before the client can write its | 
 | connection preface, the client enters TRANSIENT FAILURE.`, | 
 | 			want: []connectivity.State{ | 
 | 				connectivity.Connecting, | 
 | 				connectivity.TransientFailure, | 
 | 			}, | 
 | 			server: func(lis net.Listener) net.Conn { | 
 | 				conn, err := lis.Accept() | 
 | 				if err != nil { | 
 | 					t.Error(err) | 
 | 					return nil | 
 | 				} | 
 |  | 
 | 				framer := http2.NewFramer(conn, conn) | 
 | 				if err := framer.WriteSettings(http2.Setting{}); err != nil { | 
 | 					t.Errorf("Error while writing settings frame. %v", err) | 
 | 					return nil | 
 | 				} | 
 |  | 
 | 				conn.Close() | 
 | 				return nil | 
 | 			}, | 
 | 		}, | 
 | 		{ | 
 | 			desc: `When the server reads the client connection preface but does not send its connection preface, the | 
 | client enters TRANSIENT FAILURE.`, | 
 | 			want: []connectivity.State{ | 
 | 				connectivity.Connecting, | 
 | 				connectivity.TransientFailure, | 
 | 			}, | 
 | 			server: func(lis net.Listener) net.Conn { | 
 | 				conn, err := lis.Accept() | 
 | 				if err != nil { | 
 | 					t.Error(err) | 
 | 					return nil | 
 | 				} | 
 |  | 
 | 				go keepReading(conn) | 
 |  | 
 | 				return conn | 
 | 			}, | 
 | 		}, | 
 | 	} { | 
 | 		t.Log(test.desc) | 
 | 		testStateTransitionSingleAddress(t, test.want, test.server) | 
 | 	} | 
 | } | 
 |  | 
 | func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) { | 
 | 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | 
 | 	defer cancel() | 
 |  | 
 | 	pl := testutils.NewPipeListener() | 
 | 	defer pl.Close() | 
 |  | 
 | 	// Launch the server. | 
 | 	var conn net.Conn | 
 | 	var connMu sync.Mutex | 
 | 	go func() { | 
 | 		connMu.Lock() | 
 | 		conn = server(pl) | 
 | 		connMu.Unlock() | 
 | 	}() | 
 |  | 
 | 	client, err := DialContext(ctx, | 
 | 		"", | 
 | 		WithInsecure(), | 
 | 		WithBalancerName(stateRecordingBalancerName), | 
 | 		WithDialer(pl.Dialer()), | 
 | 		withBackoff(noBackoff{}), | 
 | 		withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 })) | 
 | 	if err != nil { | 
 | 		t.Fatal(err) | 
 | 	} | 
 | 	defer client.Close() | 
 |  | 
 | 	stateNotifications := testBalancerBuilder.nextStateNotifier() | 
 |  | 
 | 	timeout := time.After(5 * time.Second) | 
 |  | 
 | 	for i := 0; i < len(want); i++ { | 
 | 		select { | 
 | 		case <-timeout: | 
 | 			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) | 
 | 		case seen := <-stateNotifications: | 
 | 			if seen != want[i] { | 
 | 				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) | 
 | 			} | 
 | 		} | 
 | 	} | 
 |  | 
 | 	connMu.Lock() | 
 | 	defer connMu.Unlock() | 
 | 	if conn != nil { | 
 | 		err = conn.Close() | 
 | 		if err != nil { | 
 | 			t.Fatal(err) | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | // When a READY connection is closed, the client enters CONNECTING. | 
 | func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { | 
 | 	want := []connectivity.State{ | 
 | 		connectivity.Connecting, | 
 | 		connectivity.Ready, | 
 | 		connectivity.Connecting, | 
 | 	} | 
 |  | 
 | 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | 
 | 	defer cancel() | 
 |  | 
 | 	lis, err := net.Listen("tcp", "localhost:0") | 
 | 	if err != nil { | 
 | 		t.Fatalf("Error while listening. Err: %v", err) | 
 | 	} | 
 | 	defer lis.Close() | 
 |  | 
 | 	sawReady := make(chan struct{}) | 
 |  | 
 | 	// Launch the server. | 
 | 	go func() { | 
 | 		conn, err := lis.Accept() | 
 | 		if err != nil { | 
 | 			t.Error(err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		go keepReading(conn) | 
 |  | 
 | 		framer := http2.NewFramer(conn, conn) | 
 | 		if err := framer.WriteSettings(http2.Setting{}); err != nil { | 
 | 			t.Errorf("Error while writing settings frame. %v", err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		// Prevents race between onPrefaceReceipt and onClose. | 
 | 		<-sawReady | 
 |  | 
 | 		conn.Close() | 
 | 	}() | 
 |  | 
 | 	client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName)) | 
 | 	if err != nil { | 
 | 		t.Fatal(err) | 
 | 	} | 
 | 	defer client.Close() | 
 |  | 
 | 	stateNotifications := testBalancerBuilder.nextStateNotifier() | 
 |  | 
 | 	timeout := time.After(5 * time.Second) | 
 |  | 
 | 	for i := 0; i < len(want); i++ { | 
 | 		select { | 
 | 		case <-timeout: | 
 | 			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) | 
 | 		case seen := <-stateNotifications: | 
 | 			if seen == connectivity.Ready { | 
 | 				close(sawReady) | 
 | 			} | 
 | 			if seen != want[i] { | 
 | 				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) | 
 | 			} | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | // When the first connection is closed, the client stays in CONNECTING until it | 
 | // tries the second address (which succeeds, and then it enters READY). | 
 | func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) { | 
 | 	want := []connectivity.State{ | 
 | 		connectivity.Connecting, | 
 | 		connectivity.Ready, | 
 | 	} | 
 |  | 
 | 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | 
 | 	defer cancel() | 
 |  | 
 | 	lis1, err := net.Listen("tcp", "localhost:0") | 
 | 	if err != nil { | 
 | 		t.Fatalf("Error while listening. Err: %v", err) | 
 | 	} | 
 | 	defer lis1.Close() | 
 |  | 
 | 	lis2, err := net.Listen("tcp", "localhost:0") | 
 | 	if err != nil { | 
 | 		t.Fatalf("Error while listening. Err: %v", err) | 
 | 	} | 
 | 	defer lis2.Close() | 
 |  | 
 | 	server1Done := make(chan struct{}) | 
 | 	server2Done := make(chan struct{}) | 
 |  | 
 | 	// Launch server 1. | 
 | 	go func() { | 
 | 		conn, err := lis1.Accept() | 
 | 		if err != nil { | 
 | 			t.Error(err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		conn.Close() | 
 | 		close(server1Done) | 
 | 	}() | 
 | 	// Launch server 2. | 
 | 	go func() { | 
 | 		conn, err := lis2.Accept() | 
 | 		if err != nil { | 
 | 			t.Error(err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		go keepReading(conn) | 
 |  | 
 | 		framer := http2.NewFramer(conn, conn) | 
 | 		if err := framer.WriteSettings(http2.Setting{}); err != nil { | 
 | 			t.Errorf("Error while writing settings frame. %v", err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		close(server2Done) | 
 | 	}() | 
 |  | 
 | 	rb := manual.NewBuilderWithScheme("whatever") | 
 | 	rb.InitialState(resolver.State{Addresses: []resolver.Address{ | 
 | 		{Addr: lis1.Addr().String()}, | 
 | 		{Addr: lis2.Addr().String()}, | 
 | 	}}) | 
 | 	client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb)) | 
 | 	if err != nil { | 
 | 		t.Fatal(err) | 
 | 	} | 
 | 	defer client.Close() | 
 |  | 
 | 	stateNotifications := testBalancerBuilder.nextStateNotifier() | 
 |  | 
 | 	timeout := time.After(5 * time.Second) | 
 |  | 
 | 	for i := 0; i < len(want); i++ { | 
 | 		select { | 
 | 		case <-timeout: | 
 | 			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) | 
 | 		case seen := <-stateNotifications: | 
 | 			if seen != want[i] { | 
 | 				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) | 
 | 			} | 
 | 		} | 
 | 	} | 
 | 	select { | 
 | 	case <-timeout: | 
 | 		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") | 
 | 	case <-server1Done: | 
 | 	} | 
 | 	select { | 
 | 	case <-timeout: | 
 | 		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2") | 
 | 	case <-server2Done: | 
 | 	} | 
 | } | 
 |  | 
 | // When there are multiple addresses, and we enter READY on one of them, a | 
 | // later closure should cause the client to enter CONNECTING | 
 | func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { | 
 | 	want := []connectivity.State{ | 
 | 		connectivity.Connecting, | 
 | 		connectivity.Ready, | 
 | 		connectivity.Connecting, | 
 | 	} | 
 |  | 
 | 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | 
 | 	defer cancel() | 
 |  | 
 | 	lis1, err := net.Listen("tcp", "localhost:0") | 
 | 	if err != nil { | 
 | 		t.Fatalf("Error while listening. Err: %v", err) | 
 | 	} | 
 | 	defer lis1.Close() | 
 |  | 
 | 	// Never actually gets used; we just want it to be alive so that the resolver has two addresses to target. | 
 | 	lis2, err := net.Listen("tcp", "localhost:0") | 
 | 	if err != nil { | 
 | 		t.Fatalf("Error while listening. Err: %v", err) | 
 | 	} | 
 | 	defer lis2.Close() | 
 |  | 
 | 	server1Done := make(chan struct{}) | 
 | 	sawReady := make(chan struct{}) | 
 |  | 
 | 	// Launch server 1. | 
 | 	go func() { | 
 | 		conn, err := lis1.Accept() | 
 | 		if err != nil { | 
 | 			t.Error(err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		go keepReading(conn) | 
 |  | 
 | 		framer := http2.NewFramer(conn, conn) | 
 | 		if err := framer.WriteSettings(http2.Setting{}); err != nil { | 
 | 			t.Errorf("Error while writing settings frame. %v", err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		<-sawReady | 
 |  | 
 | 		conn.Close() | 
 |  | 
 | 		_, err = lis1.Accept() | 
 | 		if err != nil { | 
 | 			t.Error(err) | 
 | 			return | 
 | 		} | 
 |  | 
 | 		close(server1Done) | 
 | 	}() | 
 |  | 
 | 	rb := manual.NewBuilderWithScheme("whatever") | 
 | 	rb.InitialState(resolver.State{Addresses: []resolver.Address{ | 
 | 		{Addr: lis1.Addr().String()}, | 
 | 		{Addr: lis2.Addr().String()}, | 
 | 	}}) | 
 | 	client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb)) | 
 | 	if err != nil { | 
 | 		t.Fatal(err) | 
 | 	} | 
 | 	defer client.Close() | 
 |  | 
 | 	stateNotifications := testBalancerBuilder.nextStateNotifier() | 
 |  | 
 | 	timeout := time.After(2 * time.Second) | 
 |  | 
 | 	for i := 0; i < len(want); i++ { | 
 | 		select { | 
 | 		case <-timeout: | 
 | 			t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) | 
 | 		case seen := <-stateNotifications: | 
 | 			if seen == connectivity.Ready { | 
 | 				close(sawReady) | 
 | 			} | 
 | 			if seen != want[i] { | 
 | 				t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen) | 
 | 			} | 
 | 		} | 
 | 	} | 
 | 	select { | 
 | 	case <-timeout: | 
 | 		t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") | 
 | 	case <-server1Done: | 
 | 	} | 
 | } | 
 |  | 
 | type stateRecordingBalancer struct { | 
 | 	notifier chan<- connectivity.State | 
 | 	balancer.Balancer | 
 | } | 
 |  | 
 | func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { | 
 | 	b.notifier <- s.ConnectivityState | 
 | 	b.Balancer.UpdateSubConnState(sc, s) | 
 | } | 
 |  | 
 | func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { | 
 | 	b.notifier = r | 
 | } | 
 |  | 
 | func (b *stateRecordingBalancer) Close() { | 
 | 	b.Balancer.Close() | 
 | } | 
 |  | 
 | type stateRecordingBalancerBuilder struct { | 
 | 	mu       sync.Mutex | 
 | 	notifier chan connectivity.State // The notifier used in the last Balancer. | 
 | } | 
 |  | 
 | func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder { | 
 | 	return &stateRecordingBalancerBuilder{} | 
 | } | 
 |  | 
 | func (b *stateRecordingBalancerBuilder) Name() string { | 
 | 	return stateRecordingBalancerName | 
 | } | 
 |  | 
 | func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { | 
 | 	stateNotifications := make(chan connectivity.State, 10) | 
 | 	b.mu.Lock() | 
 | 	b.notifier = stateNotifications | 
 | 	b.mu.Unlock() | 
 | 	return &stateRecordingBalancer{ | 
 | 		notifier: stateNotifications, | 
 | 		Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts), | 
 | 	} | 
 | } | 
 |  | 
 | func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State { | 
 | 	b.mu.Lock() | 
 | 	defer b.mu.Unlock() | 
 | 	ret := b.notifier | 
 | 	b.notifier = nil | 
 | 	return ret | 
 | } | 
 |  | 
 | type noBackoff struct{} | 
 |  | 
 | func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) } | 
 |  | 
 | // Keep reading until something causes the connection to die (EOF, server | 
 | // closed, etc). Useful as a tool for mindlessly keeping the connection | 
 | // healthy, since the client will error if things like client prefaces are not | 
 | // accepted in a timely fashion. | 
 | func keepReading(conn net.Conn) { | 
 | 	buf := make([]byte, 1024) | 
 | 	for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) { | 
 | 	} | 
 | } |