blob: 831de43a3070a3f4399cd9be5d6767360b6b0113 [file] [log] [blame] [edit]
/*
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"context"
"fmt"
"sync"
"testing"
"time"
vkit "cloud.google.com/go/spanner/apiv1"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type testSessionCreateError struct {
err error
num int32
}
type testConsumer struct {
numExpected int32
mu sync.Mutex
sessions []*session
errors []*testSessionCreateError
numErr int32
receivedAll chan struct{}
}
func (tc *testConsumer) sessionReady(_ context.Context, s *session) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.sessions = append(tc.sessions, s)
tc.checkReceivedAll()
}
func (tc *testConsumer) sessionCreationFailed(_ context.Context, err error) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.errors = append(tc.errors, &testSessionCreateError{
err: err,
num: 1,
})
tc.numErr++
tc.checkReceivedAll()
}
func (tc *testConsumer) checkReceivedAll() {
if int32(len(tc.sessions))+tc.numErr == tc.numExpected {
close(tc.receivedAll)
}
}
func newTestConsumer(numExpected int32) *testConsumer {
return &testConsumer{
numExpected: numExpected,
receivedAll: make(chan struct{}),
}
}
func TestNextClient(t *testing.T) {
t.Parallel()
if useGRPCgcp {
// For GCPMultiEndpoint the nextClient is indirectly tested via TestBatchCreateAndCloseSession.
t.Skip("GCPMultiEndpoint does not provide a connection via Connection().")
}
n := 4
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
DisableNativeMetrics: true,
NumChannels: n,
SessionPoolConfig: SessionPoolConfig{
MinOpened: 0,
MaxOpened: 100,
},
})
defer teardown()
sc := client.sm.sc
connections := make(map[*grpc.ClientConn]int)
for i := 0; i < n; i++ {
client, err := sc.nextClient()
if err != nil {
t.Fatalf("Error getting a gapic client from the session client\nGot: %v", err)
}
conn1 := client.Connection()
conn2 := client.Connection()
if conn1 != conn2 {
t.Fatalf("Client connection mismatch. Expected to get two equal connections.\nGot: %v and %v", conn1, conn2)
}
if index, ok := connections[conn1]; ok {
t.Fatalf("Same connection used multiple times for different clients.\nClient 1: %v\nClient 2: %v", index, i)
}
connections[conn1] = i
}
// Pass through all the clients once more. This time the exact same
// connections should be found.
for i := 0; i < n; i++ {
client, err := sc.nextClient()
if err != nil {
t.Fatalf("Error getting a gapic client from the session client\nGot: %v", err)
}
conn := client.Connection()
if _, ok := connections[conn]; !ok {
t.Fatalf("Connection not found for index %v", i)
}
}
}
func TestCreateAndCloseSession(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
DisableNativeMetrics: true,
})
defer teardown()
s, err := client.sc.createSession(context.Background())
if err != nil {
t.Fatalf("batch.next() return error mismatch\ngot: %v\nwant: nil", err)
}
if s == nil {
t.Fatalf("batch.next() return value mismatch\ngot: %v\nwant: any session", s)
}
if server.TestSpanner.TotalSessionsCreated() != uint(2) {
t.Fatalf("number of sessions created mismatch\ngot: %v\nwant: %v", server.TestSpanner.TotalSessionsCreated(), 2)
}
}
func TestCreateSessionWithDatabaseRole(t *testing.T) {
// Make sure that there is always only one session in the sm.
sc := SessionPoolConfig{
MinOpened: 0,
MaxOpened: 1,
}
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{DisableNativeMetrics: true, SessionPoolConfig: sc, DatabaseRole: "test"})
defer teardown()
ctx := context.Background()
s, err := client.sc.createSession(ctx)
if err != nil {
t.Fatalf("batch.next() return error mismatch\ngot: %v\nwant: nil", err)
}
if s == nil {
t.Fatalf("batch.next() return value mismatch\ngot: %v\nwant: any session", s)
}
if g, w := server.TestSpanner.TotalSessionsCreated(), uint(2); g != w {
t.Fatalf("number of sessions created mismatch\ngot: %v\nwant: %v", g, w)
}
resp, err := server.TestSpanner.GetSession(ctx, &sppb.GetSessionRequest{Name: s.id})
if err != nil {
t.Fatalf("Failed to get session unexpectedly: %v", err)
}
if g, w := resp.CreatorRole, "test"; g != w {
t.Fatalf("database role mismatch.\nGot: %v\nWant: %v", g, w)
}
}
func TestClientIDGenerator(t *testing.T) {
cidGen = newClientIDGenerator()
for _, tt := range []struct {
database string
clientID string
}{
{"db", "client-1"},
{"db-new", "client-1"},
{"db", "client-2"},
} {
if got, want := cidGen.nextID(tt.database), tt.clientID; got != want {
t.Fatalf("Generate wrong client ID: got %v, want %v", got, want)
}
}
}
func TestMergeCallOptions(t *testing.T) {
a := &vkit.CallOptions{
CreateSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable, codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 16000 * time.Millisecond,
Multiplier: 1.0,
})
}),
},
GetSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable, codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 250 * time.Millisecond,
Max: 32000 * time.Millisecond,
Multiplier: 1.30,
})
}),
},
}
b := &vkit.CallOptions{
CreateSession: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable,
}, gax.Backoff{
Initial: 250 * time.Millisecond,
Max: 32000 * time.Millisecond,
Multiplier: 1.30,
})
}),
},
BatchCreateSessions: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Unavailable,
}, gax.Backoff{
Initial: 250 * time.Millisecond,
Max: 32000 * time.Millisecond,
Multiplier: 1.30,
})
}),
}}
merged := mergeCallOptions(b, a)
cs := &gax.CallSettings{}
// We can't access the fields of Retryer so we have test the result by
// comparing strings.
merged.CreateSession[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
merged.CreateSession[1].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{100000000 16000000000 1 0} [14 4]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
merged.GetSession[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14 4]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
merged.BatchCreateSessions[0].Resolve(cs)
if got, want := fmt.Sprintf("%v", cs.Retry()), "&{{250000000 32000000000 1.3 0} [14]}"; got != want {
t.Fatalf("merged CallOptions is incorrect: got %v, want %v", got, want)
}
}