/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
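
// stateRecordingBalancerName is the name the tests plug into their default
// service config to select the state-recording balancer registered below,
// which produces a config of the form:
//
//	{"loadBalancingConfig": [{"state_recording_balancer":{}}]}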
const stateRecordingBalancerName = "state_recording_balancer"
var testBalancerBuilder = newStateRecordingBalancerBuilder()
func init() {
balancer.Register(testBalancerBuilder)
}
// These tests use a pipeListener. This listener is similar to net.Listener
// except that it is unbuffered, so each read and write will wait for the other
// side's corresponding write or read.
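//
// A rough sketch of that interaction (illustrative only; the tests below
// drive the client side through grpc.Dial rather than dialing by hand):
//
//	pl := testutils.NewPipeListener()
//	go func() {
//		conn, _ := pl.Accept()     // returns once the dialer connects
//		conn.Read(make([]byte, 1)) // blocks until the client writes
//	}()
//	conn, _ := pl.Dialer()("ignored", time.Second)
//	conn.Write([]byte{0}) // unblocks the server's Read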
func (s) TestStateTransitions_SingleAddress(t *testing.T) {
for _, test := range []struct {
desc string
want []connectivity.State
server func(net.Listener) net.Conn
}{
{
desc: "When the server returns server preface, the client enters READY.",
want: []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
},
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return nil
}
go keepReading(conn)
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return nil
}
return conn
},
},
{
desc: "When the connection is closed before the preface is sent, the client enters TRANSIENT FAILURE.",
want: []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
},
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return nil
}
conn.Close()
return nil
},
},
{
desc: `When the server sends its connection preface, but the connection dies before the client can write its
connection preface, the client enters TRANSIENT FAILURE.`,
want: []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
},
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return nil
}
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return nil
}
conn.Close()
return nil
},
},
{
desc: `When the server reads the client connection preface but does not send its connection preface, the
client enters TRANSIENT FAILURE.`,
want: []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
},
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return nil
}
go keepReading(conn)
return conn
},
},
} {
t.Log(test.desc)
testStateTransitionSingleAddress(t, test.want, test.server)
}
}
func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
pl := testutils.NewPipeListener()
defer pl.Close()
// Launch the server.
var conn net.Conn
var connMu sync.Mutex
go func() {
connMu.Lock()
conn = server(pl)
connMu.Unlock()
}()
client, err := grpc.Dial("",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
grpc.WithDialer(pl.Dialer()),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{},
MinConnectTimeout: 100 * time.Millisecond,
}))
if err != nil {
t.Fatal(err)
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go testutils.StayConnected(ctx, client)
stateNotifications := testBalancerBuilder.nextStateNotifier()
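	// Require each state in want to be reported by the balancer, in order.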
for i := 0; i < len(want); i++ {
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
}
}
}
connMu.Lock()
defer connMu.Unlock()
if conn != nil {
err = conn.Close()
if err != nil {
t.Fatal(err)
}
}
}
// When a READY connection is closed, the client enters IDLE then CONNECTING.
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
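	// sawReady lets the server delay closing the connection until the client
	// has observed READY.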
sawReady := make(chan struct{}, 1)
defer close(sawReady)
// Launch the server.
go func() {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return
}
go keepReading(conn)
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return
}
// Prevents race between onPrefaceReceipt and onClose.
<-sawReady
conn.Close()
}()
client, err := grpc.Dial(lis.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
if err != nil {
t.Fatal(err)
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go testutils.StayConnected(ctx, client)
stateNotifications := testBalancerBuilder.nextStateNotifier()
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
for i := 0; i < len(want); i++ {
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
sawReady <- struct{}{}
}
if seen != want[i] {
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
}
}
}
}
// When the first connection is closed, the client stays in CONNECTING until it
// tries the second address (which succeeds, and then it enters READY).
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis1.Close()
lis2, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis2.Close()
server1Done := make(chan struct{})
server2Done := make(chan struct{})
// Launch server 1.
go func() {
conn, err := lis1.Accept()
if err != nil {
t.Error(err)
return
}
conn.Close()
close(server1Done)
}()
// Launch server 2.
go func() {
conn, err := lis2.Accept()
if err != nil {
t.Error(err)
return
}
go keepReading(conn)
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return
}
close(server2Done)
}()
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := grpc.Dial("whatever:///this-gets-overwritten",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
grpc.WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()
stateNotifications := testBalancerBuilder.nextStateNotifier()
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
}
}
}
select {
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
select {
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
case <-server2Done:
}
}
// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter CONNECTING.
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis1.Close()
	// Never actually gets used; we just want it to be alive so that the
	// resolver has two addresses to target.
lis2, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis2.Close()
server1Done := make(chan struct{})
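	// sawReady lets server 1 delay closing its connection until the client
	// has observed READY.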
sawReady := make(chan struct{}, 1)
defer close(sawReady)
// Launch server 1.
go func() {
conn, err := lis1.Accept()
if err != nil {
t.Error(err)
return
}
go keepReading(conn)
framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return
}
<-sawReady
conn.Close()
close(server1Done)
}()
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialState(resolver.State{Addresses: []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := grpc.Dial("whatever:///this-gets-overwritten",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
grpc.WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
go testutils.StayConnected(ctx, client)
stateNotifications := testBalancerBuilder.nextStateNotifier()
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
for i := 0; i < len(want); i++ {
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
sawReady <- struct{}{}
}
if seen != want[i] {
t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
}
}
}
select {
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
}
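
// stateRecordingBalancer wraps a pick_first balancer and, via the
// stateRecordingCCWrapper it is built with, records every SubConn
// connectivity state transition on the builder's notifier channel.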
type stateRecordingBalancer struct {
balancer.Balancer
}
func (b *stateRecordingBalancer) Close() {
b.Balancer.Close()
}
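
// stateRecordingBalancerBuilder builds stateRecordingBalancers, keeping the
// notifier channel from the most recent Build so a test can claim it through
// nextStateNotifier.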
type stateRecordingBalancerBuilder struct {
mu sync.Mutex
notifier chan connectivity.State // The notifier used in the last Balancer.
}
func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
return &stateRecordingBalancerBuilder{}
}
func (b *stateRecordingBalancerBuilder) Name() string {
return stateRecordingBalancerName
}
func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
stateNotifications := make(chan connectivity.State, 10)
b.mu.Lock()
b.notifier = stateNotifications
b.mu.Unlock()
return &stateRecordingBalancer{
Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
}
}
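
// nextStateNotifier returns the notifier from the most recently built
// balancer and clears it, so each channel is handed out at most once.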
func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
b.mu.Lock()
defer b.mu.Unlock()
ret := b.notifier
b.notifier = nil
return ret
}
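
// stateRecordingCCWrapper intercepts NewSubConn and hooks each SubConn's
// StateListener so that every connectivity update is copied to notifier
// before the original listener runs.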
type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}
func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}
// keepReading reads from conn until something causes the connection to die
// (EOF, server closed, etc.). It is useful for mindlessly keeping the
// connection healthy, since the client will error if things like client
// prefaces are not accepted in a timely fashion.
func keepReading(conn net.Conn) {
buf := make([]byte, 1024)
for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
}
}
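
// funcConnectivityStateSubscriber adapts a plain function to the
// grpcsync.Subscriber interface.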
type funcConnectivityStateSubscriber struct {
onMsg func(connectivity.State)
}
func (f *funcConnectivityStateSubscriber) OnMessage(msg any) {
f.onMsg(msg.(connectivity.State))
}
// TestConnectivityStateSubscriber confirms updates sent by the balancer in
// rapid succession are not missed by the subscriber.
func (s) TestConnectivityStateSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
sendStates := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
connectivity.Idle,
connectivity.Connecting,
connectivity.Ready,
}
wantStates := append(sendStates, connectivity.Shutdown)
const testBalName = "any"
bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
// Send the expected states in rapid succession.
for _, s := range sendStates {
t.Logf("Sending state update %s", s)
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s})
}
return nil
},
}
stub.Register(testBalName, bf)
// Create the ClientConn.
const testResName = "any"
rb := manual.NewBuilderWithScheme(testResName)
cc, err := grpc.Dial(testResName+":///",
grpc.WithResolvers(rb),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("Unexpected error from grpc.Dial: %v", err)
}
	// Subscribe to state updates. Use a buffer size of 1 so the final
	// Shutdown state can enter the channel while Close is in progress.
connCh := make(chan connectivity.State, 1)
s := &funcConnectivityStateSubscriber{
onMsg: func(s connectivity.State) {
select {
case connCh <- s:
case <-ctx.Done():
}
if s == connectivity.Shutdown {
close(connCh)
}
},
}
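	// internal.SubscribeToConnectivityStateChanges is exported as an any;
	// assert it to its function type before calling it.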
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
	// Send an update from the resolver that will trigger the LB policy's
	// UpdateClientConnState.
go rb.UpdateState(resolver.State{})
// Verify the resulting states.
for i, want := range wantStates {
if i == len(sendStates) {
			// Close the ClientConn to trigger the channel's final Shutdown
			// update; the buffered connCh keeps delivery from blocking.
cc.Close()
}
select {
case got := <-connCh:
if got != want {
t.Errorf("Update %v was %s; want %s", i, got, want)
} else {
t.Logf("Update %v was %s as expected", i, got)
}
case <-ctx.Done():
t.Fatalf("Timed out waiting for state update %v: %s", i, want)
}
}
}