blob: d36d285906470efd1285fa8a64f224b6351fe1a8 [file] [edit]
/*
Copyright 2015 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"bytes"
"context"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"reflect"
"strings"
"testing"
"time"
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"cloud.google.com/go/civil"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2/apierror"
"google.golang.org/api/option"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/genproto/googleapis/type/date"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}}
var emulatorUnsupported = "the emulator does not currently support"
func TestPrefix(t *testing.T) {
for _, test := range []struct {
prefix, succ string
}{
{"", ""},
{"\xff", ""}, // when used, "" means Infinity
{"x\xff", "y"},
{"\xfe", "\xff"},
} {
got := prefixSuccessor(test.prefix)
if got != test.succ {
t.Errorf("prefixSuccessor(%q) = %q, want %s", test.prefix, got, test.succ)
continue
}
r := PrefixRange(test.prefix)
if test.succ == "" && r.end != "" {
t.Errorf("PrefixRange(%q) got end %q", test.prefix, r.end)
}
if test.succ != "" && r.end != test.succ {
t.Errorf("PrefixRange(%q) got end %q, want %q", test.prefix, r.end, test.succ)
}
}
}
func TestNewClosedOpenRange(t *testing.T) {
start := "b"
limit := "b\x01"
r := NewClosedOpenRange(start, limit)
for _, test := range []struct {
k string
contains bool
}{
{"a", false},
{"b", true},
{"b\x00", true},
{"b\x01", false},
} {
if want, got := test.contains, r.Contains(test.k); want != got {
t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
}
}
for _, test := range []struct {
start, limit string
valid bool
}{
{"a", "a", false},
{"b", "a", false},
{"a", "a\x00", true},
{"a", "b", true},
} {
r := NewClosedOpenRange(test.start, test.limit)
if want, got := test.valid, r.valid(); want != got {
t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
}
}
}
func TestNewOpenClosedRange(t *testing.T) {
start := "b"
limit := "b\x01"
r := NewOpenClosedRange(start, limit)
for _, test := range []struct {
k string
contains bool
}{
{"a", false},
{"b", false},
{"b\x00", true},
{"b\x01", true},
{"b\x01\x00", false},
} {
if want, got := test.contains, r.Contains(test.k); want != got {
t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
}
}
for _, test := range []struct {
start, limit string
valid bool
}{
{"a", "a", false},
{"b", "a", false},
{"a", "a\x00", true},
{"a", "b", true},
} {
r := NewOpenClosedRange(test.start, test.limit)
if want, got := test.valid, r.valid(); want != got {
t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
}
}
}
func TestNewClosedRange(t *testing.T) {
start := "b"
limit := "b"
r := NewClosedRange(start, limit)
for _, test := range []struct {
k string
contains bool
}{
{"a", false},
{"b", true},
{"b\x01", false},
} {
if want, got := test.contains, r.Contains(test.k); want != got {
t.Errorf("NewClosedRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains)
}
}
for _, test := range []struct {
start, limit string
valid bool
}{
{"a", "b", true},
{"b", "b", true},
{"b", "b\x00", true},
{"b\x00", "b", false},
} {
r := NewClosedRange(test.start, test.limit)
if want, got := test.valid, r.valid(); want != got {
t.Errorf("NewClosedRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want)
}
}
}
func TestNewOpenRange(t *testing.T) {
start := "b"
limit := "b\x01"
r := NewOpenRange(start, limit)
for _, test := range []struct {
k string
contains bool
}{
{"a", false},
{"b", false},
{"b\x00", true},
{"b\x01", false},
} {
if want, got := test.contains, r.Contains(test.k); want != got {
t.Errorf("NewOpenRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains)
}
}
for _, test := range []struct {
start, limit string
valid bool
}{
{"a", "a", false},
{"a", "b", true},
{"a", "a\x00", true},
{"a", "a\x01", true},
} {
r := NewOpenRange(test.start, test.limit)
if want, got := test.valid, r.valid(); want != got {
t.Errorf("NewOpenRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want)
}
}
}
func TestInfiniteRange(t *testing.T) {
r := InfiniteRange("b")
for _, test := range []struct {
k string
contains bool
}{
{"a", false},
{"b", true},
{"b\x00", true},
{"z", true},
} {
if want, got := test.contains, r.Contains(test.k); want != got {
t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
}
}
for _, test := range []struct {
start string
valid bool
}{
{"a", true},
{"", true},
} {
r := InfiniteRange(test.start)
if want, got := test.valid, r.valid(); want != got {
t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
}
}
}
func TestInfiniteReverseRange(t *testing.T) {
r := InfiniteReverseRange("z")
for _, test := range []struct {
k string
contains bool
}{
{"a", true},
{"z", true},
{"z\x00", false},
} {
if want, got := test.contains, r.Contains(test.k); want != got {
t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
}
}
for _, test := range []struct {
start string
valid bool
}{
{"a", true},
{"", true},
} {
r := InfiniteReverseRange(test.start)
if want, got := test.valid, r.valid(); want != got {
t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
}
}
}
func TestApplyErrors(t *testing.T) {
ctx := context.Background()
table := &Table{
c: &Client{
project: "P",
instance: "I",
metricsTracerFactory: &builtinMetricsTracerFactory{},
},
table: "t",
}
f := ColumnFilter("C")
m := NewMutation()
m.DeleteRow()
// Test nested conditional mutations.
cm := NewCondMutation(f, NewCondMutation(f, m, nil), nil)
if err := table.Apply(ctx, "x", cm); err == nil {
t.Error("got nil, want error")
}
cm = NewCondMutation(f, nil, NewCondMutation(f, m, nil))
if err := table.Apply(ctx, "x", cm); err == nil {
t.Error("got nil, want error")
}
}
func TestGroupEntries(t *testing.T) {
for _, test := range []struct {
desc string
in []*entryErr
size int
want [][]*entryErr
}{
{
desc: "one entry less than max size is one group",
in: []*entryErr{buildEntry(5)},
size: 10,
want: [][]*entryErr{{buildEntry(5)}},
},
{
desc: "one entry equal to max size is one group",
in: []*entryErr{buildEntry(10)},
size: 10,
want: [][]*entryErr{{buildEntry(10)}},
},
{
desc: "one entry greater than max size is one group",
in: []*entryErr{buildEntry(15)},
size: 10,
want: [][]*entryErr{{buildEntry(15)}},
},
{
desc: "all entries fitting within max size are one group",
in: []*entryErr{buildEntry(10), buildEntry(10)},
size: 20,
want: [][]*entryErr{{buildEntry(10), buildEntry(10)}},
},
{
desc: "entries each under max size and together over max size are grouped separately",
in: []*entryErr{buildEntry(10), buildEntry(10)},
size: 15,
want: [][]*entryErr{{buildEntry(10)}, {buildEntry(10)}},
},
{
desc: "entries together over max size are grouped by max size",
in: []*entryErr{buildEntry(5), buildEntry(5), buildEntry(5)},
size: 10,
want: [][]*entryErr{{buildEntry(5), buildEntry(5)}, {buildEntry(5)}},
},
{
desc: "one entry over max size and one entry under max size are two groups",
in: []*entryErr{buildEntry(15), buildEntry(5)},
size: 10,
want: [][]*entryErr{{buildEntry(15)}, {buildEntry(5)}},
},
} {
t.Run(test.desc, func(t *testing.T) {
if got, want := groupEntries(test.in, test.size), test.want; !cmp.Equal(mutationCounts(got), mutationCounts(want)) {
t.Fatalf("[%s] want = %v, got = %v", test.desc, mutationCounts(want), mutationCounts(got))
}
})
}
}
func buildEntry(numMutations int) *entryErr {
var muts []*btpb.Mutation
for i := 0; i < numMutations; i++ {
muts = append(muts, &btpb.Mutation{})
}
return &entryErr{Entry: &btpb.MutateRowsRequest_Entry{Mutations: muts}}
}
func mutationCounts(batched [][]*entryErr) []int {
var res []int
for _, entries := range batched {
var count int
for _, e := range entries {
count += len(e.Entry.Mutations)
}
res = append(res, count)
}
return res
}
type requestCountingInterceptor struct {
grpc.ClientStream
requestCallback func()
}
func (i *requestCountingInterceptor) SendMsg(m interface{}) error {
i.requestCallback()
return i.ClientStream.SendMsg(m)
}
func (i *requestCountingInterceptor) RecvMsg(m interface{}) error {
return i.ClientStream.RecvMsg(m)
}
func requestCallback(callback func()) func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return &requestCountingInterceptor{
ClientStream: clientStream,
requestCallback: callback,
}, err
}
}
func TestRowRangeProto(t *testing.T) {
for _, test := range []struct {
desc string
rr RowRange
proto *btpb.RowSet
}{
{
desc: "RowRange proto start and end",
rr: NewClosedOpenRange("a", "b"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
}}},
},
{
desc: "RowRange proto start but empty end",
rr: NewClosedOpenRange("a", ""),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
}}},
},
{
desc: "RowRange proto unbound",
rr: NewClosedOpenRange("", ""),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}},
},
{
desc: "RowRange proto unbound with no start or end",
rr: InfiniteRange(""),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}},
},
{
desc: "RowRange proto open closed",
rr: NewOpenClosedRange("a", "b"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte("a")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
}}},
},
{
desc: "RowRange proto open closed and empty start",
rr: NewOpenClosedRange("", "b"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
}}},
},
{
desc: "RowRange proto open closed and empty start",
rr: NewOpenClosedRange("", "b"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
}}},
},
{
desc: "RowRange proto closed open",
rr: NewClosedOpenRange("a", "b"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
}}},
},
} {
t.Run(test.desc, func(t *testing.T) {
got := test.rr.proto()
want := test.proto
if !reflect.DeepEqual(got, want) {
t.Errorf("Bad proto for %s: got %v, want %v", test.rr.String(), got, want)
}
})
}
}
func TestRowRangeRetainRowsBefore(t *testing.T) {
for _, test := range []struct {
desc string
rr RowSet
proto *btpb.RowSet
}{
{
desc: "retain rows before",
rr: NewRange("a", "c").retainRowsBefore("b"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
}}},
},
{
desc: "retain rows before empty key",
rr: NewRange("a", "c").retainRowsBefore(""),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")},
}}},
},
{
desc: "retain rows before key greater than range end",
rr: NewClosedRange("a", "c").retainRowsBefore("d"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("c")},
}}},
},
{
desc: "retain rows before key same as closed end key",
rr: NewClosedRange("a", "c").retainRowsBefore("c"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")},
}}},
},
{
desc: "retain rows before on unbounded range",
rr: InfiniteRange("").retainRowsBefore("z"),
proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("z")},
}}},
},
} {
t.Run(test.desc, func(t *testing.T) {
got := test.rr.proto()
want := test.proto
if !reflect.DeepEqual(got, want) {
t.Errorf("Bad retain rows before proto: got %v, want %v", got, want)
}
})
}
}
func TestRowRangeString(t *testing.T) {
for _, test := range []struct {
desc string
rr RowRange
str string
}{
{
desc: "RowRange closed open",
rr: NewClosedOpenRange("a", "b"),
str: "[\"a\",\"b\")",
},
{
desc: "RowRange open open",
rr: NewOpenRange("c", "d"),
str: "(\"c\",\"d\")",
},
{
desc: "RowRange closed closed",
rr: NewClosedRange("e", "f"),
str: "[\"e\",\"f\"]",
},
{
desc: "RowRange open closed",
rr: NewOpenClosedRange("g", "h"),
str: "(\"g\",\"h\"]",
},
{
desc: "RowRange unbound unbound",
rr: InfiniteRange(""),
str: "(∞,∞)",
},
{
desc: "RowRange closed unbound",
rr: InfiniteRange("b"),
str: "[\"b\",∞)",
},
{
desc: "RowRange unbound closed",
rr: InfiniteReverseRange("c"),
str: "(∞,\"c\"]",
},
} {
t.Run(test.desc, func(t *testing.T) {
got := test.rr.String()
want := test.str
if !reflect.DeepEqual(got, want) {
t.Errorf("Bad String(): got %v, want %v", got, want)
}
})
}
}
// TestReadRowsInvalidRowSet verifies that the client doesn't send ReadRows() requests with invalid RowSets.
func TestReadRowsInvalidRowSet(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
var requestCount int
incrementRequestCount := func() { requestCount++ }
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
grpc.WithStreamInterceptor(requestCallback(incrementRequestCount)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
if err := adminClient.CreateTable(ctx, testEnv.config.Table); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
tests := []struct {
rr RowSet
valid bool
}{
{
rr: RowRange{startBound: rangeUnbounded, endBound: rangeUnbounded},
valid: true,
},
{
rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeUnbounded},
valid: true,
},
{
rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "c"},
valid: true,
},
{
rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "a"},
valid: false,
},
{
rr: RowList{"a"},
valid: true,
},
{
rr: RowList{},
valid: false,
},
}
for _, test := range tests {
requestCount = 0
err = table.ReadRows(ctx, test.rr, func(r Row) bool { return true })
if err != nil {
t.Fatalf("ReadRows(%v) failed: %v", test.rr, err)
}
requestValid := requestCount != 0
if requestValid != test.valid {
t.Errorf("%s: got %v, want %v", test.rr, requestValid, test.valid)
}
}
}
func TestReadRowsRequestStats(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
Families: map[string]GCPolicy{
"f": NoGcPolicy(),
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
m := NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row1", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}
m = NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
m.Set("f", "q2", ServerTime, []byte("value2"))
if err = table.Apply(ctx, "row2", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}
m = NewMutation()
m.Set("f", "excluded", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row3", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}
statsChannel := make(chan FullReadStats, 1)
readStart := time.Now()
if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil {
t.Fatalf("NewClient failed: %v", err)
}
readElapsed := time.Since(readStart)
got := <-statsChannel
wantIter := ReadIterationStats{
RowsSeenCount: 3,
RowsReturnedCount: 2,
CellsSeenCount: 4,
CellsReturnedCount: 3,
}
if diff := cmp.Diff(wantIter, got.ReadIterationStats); diff != "" {
t.Errorf("ReadRows RequestStats are incorrect (-want +got):\n%s", diff)
}
if got.RequestLatencyStats.FrontendServerLatency > readElapsed || got.RequestLatencyStats.FrontendServerLatency <= 0 {
t.Fatalf("ReadRows FrontendServerLatency should be in range 0, %v", readElapsed)
}
}
func TestReadRowsLimit(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
Families: map[string]GCPolicy{
"f": NoGcPolicy(),
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
m := NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row1", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}
m = NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
m.Set("f", "q2", ServerTime, []byte("value2"))
if err = table.Apply(ctx, "row2", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}
m = NewMutation()
m.Set("f", "excluded", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row3", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}
for _, test := range []struct {
desc string
limit *int64
wantRowCount int64
wantErr error
}{
{
desc: "No limit",
wantRowCount: 3,
},
{
desc: "Limit less than number of rows in table",
limit: ptr(int64(2)),
wantRowCount: 2,
},
{
desc: "Limit greater than number of rows in table",
limit: ptr(int64(5)),
wantRowCount: 3,
},
{
desc: "Negative row limit",
limit: ptr(int64(-1)),
wantErr: errNegativeRowLimit,
},
} {
gotRowCount := int64(0)
t.Run(test.desc, func(t *testing.T) {
opts := []ReadOption{}
if test.limit != nil {
opts = append(opts, LimitRows(*test.limit))
}
if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
gotRowCount++
return true
}, opts...); !errors.Is(err, test.wantErr) {
t.Errorf("ReadRows err got: %v, want: %v", err, test.wantErr)
}
if gotRowCount != test.wantRowCount {
t.Errorf("ReadRows returned %d rows, want %d", gotRowCount, test.wantRowCount)
}
})
}
}
// ptr returns a pointer to its argument.
// It can be used to initialize pointer fields:
func ptr[T any](t T) *T { return &t }
// TestHeaderPopulatedWithAppProfile verifies that request params header is populated with table name and app profile
func TestHeaderPopulatedWithAppProfile(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx := context.Background()
opt := option.WithGRPCConn(conn)
config := ClientConfig{
AppProfile: "my-app-profile",
}
client, err := NewClientWithConfig(ctx, "my-project", "my-instance", config, opt)
if err != nil {
t.Fatalf("Failed to create client %v", err)
}
table := client.Open("my-table")
if table == nil {
t.Fatal("Failed to open table")
}
resourcePrefixHeaderValue := table.md.Get(resourcePrefixHeader)
if got, want := len(resourcePrefixHeaderValue), 1; got != want {
t.Fatalf("Incorrect number of header values in resourcePrefixHeader. Got %d, want %d", got, want)
}
if got, want := resourcePrefixHeaderValue[0], "projects/my-project/instances/my-instance/tables/my-table"; got != want {
t.Errorf("Incorrect value in resourcePrefixHeader. Got %s, want %s", got, want)
}
requestParamsHeaderValue := table.md.Get(requestParamsHeader)
if got, want := len(requestParamsHeaderValue), 1; got != want {
t.Fatalf("Incorrect number of header values in requestParamsHeader. Got %d, want %d", got, want)
}
if got, want := requestParamsHeaderValue[0], "table_name=projects%2Fmy-project%2Finstances%2Fmy-instance%2Ftables%2Fmy-table&app_profile_id=my-app-profile"; got != want {
t.Errorf("Incorrect value in resourcePrefixHeader. Got %s, want %s", got, want)
}
}
func TestMutateRowsWithAggregates_AddToCell(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
m := NewMutation()
m.AddIntToCell("f", "q", 0, 1000)
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}
m = NewMutation()
m.AddIntToCell("f", "q", 0, 2000)
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}
row, err := table.ReadRow(ctx, "row1")
if !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) {
t.Error()
}
}
func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, ClientConfig{MetricsProvider: NoopMetricsProvider{}}, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
m := NewMutation()
m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 1000))
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}
m = NewMutation()
m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 2000))
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}
row, err := table.ReadRow(ctx, "row1")
if !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) {
t.Error()
}
}
type rowKeyCheckingInterceptor struct {
grpc.ClientStream
failRow string
failErr error // error to use while sending failed response for fail row
requestCounter *int
}
func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error {
*i.requestCounter = *i.requestCounter + 1
if req, ok := m.(*btpb.MutateRowsRequest); ok {
for _, entry := range req.Entries {
if string(entry.RowKey) == i.failRow {
return i.failErr
}
}
}
return i.ClientStream.SendMsg(m)
}
func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error {
return i.ClientStream.RecvMsg(m)
}
// Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service
// This test validates that even if one of the group receives error, requests are sent for further groups
func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) {
testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{})
if gotErr != nil {
t.Fatalf("NewEmulatedEnv failed: %v", gotErr)
}
// Add interceptor to fail rows
failedRow := "row2"
failErr := status.Error(codes.InvalidArgument, "Invalid row key")
reqCount := 0
conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return &rowKeyCheckingInterceptor{
ClientStream: clientStream,
failRow: failedRow,
requestCounter: &reqCount,
failErr: failErr,
}, err
}),
)
if gotErr != nil {
t.Fatalf("grpc.Dial failed: %v", gotErr)
}
// Create client and table
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClient failed: %v", gotErr)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClientWithConfig failed: %v", gotErr)
}
defer client.Close()
table := client.Open(testEnv.config.Table)
// Override maxMutations to break mutations into smaller groups
origMaxMutations := maxMutations
t.Cleanup(func() {
maxMutations = origMaxMutations
})
maxMutations = 2
// Create mutations
m1 := NewMutation()
m1.AddIntToCell("f", "q", 0, 1000)
m2 := NewMutation()
m2.AddIntToCell("f", "q", 0, 2000)
// Perform ApplyBulk operation and compare errors
rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"}
var wantErr error
wantErrs := []error{nil, nil, failErr, failErr, nil, nil}
gotErrs, gotErr := table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2})
// Assert overall error
if !equalErrs(gotErr, wantErr) {
t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr)
}
// Assert individual muation errors
if len(gotErrs) != len(wantErrs) {
t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs)
}
for i := range gotErrs {
if !equalErrs(gotErrs[i], wantErrs[i]) {
t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i])
}
}
// Assert number of requests sent
wantReqCount := len(rowKeys) / maxMutations
if reqCount != wantReqCount {
t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount)
}
// Assert individual mutation apply success/failure by reading rows
gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool {
rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000))
if rowMutated && row.Key() == failedRow {
t.Error("Expected mutation to fail for row " + row.Key())
}
if !rowMutated && row.Key() != failedRow {
t.Error("Expected mutation to succeed for row " + row.Key())
}
return true
})
if gotErr != nil {
t.Fatalf("ReadRows failed: %v", gotErr)
}
}
func TestAnySQLTypeToPbVal(t *testing.T) {
testTime := time.Now()
testDate := civil.DateOf(time.Now())
tests := []struct {
testName string
paramVal any
psType SQLType
wantPbVal *btpb.Value
wantErr bool
wantErrMsg string
}{
{
testName: "BytesSQLType success",
paramVal: []byte("test"),
psType: BytesSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_BytesType{
BytesType: &btpb.Type_Bytes{},
},
},
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("test"),
},
},
},
{
testName: "BytesSQLType nil success",
psType: BytesSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_BytesType{
BytesType: &btpb.Type_Bytes{},
},
},
},
},
{
testName: "BytesSQLType type mismatch",
paramVal: "test",
psType: BytesSQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "test", psType: BytesSQLType{}}).Error(),
},
{
testName: "StringSQLType success",
paramVal: "test",
psType: StringSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_StringType{
StringType: &btpb.Type_String{},
},
},
Kind: &btpb.Value_StringValue{
StringValue: "test",
},
},
},
{
testName: "StringSQLType nil success",
psType: StringSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_StringType{
StringType: &btpb.Type_String{},
},
},
},
},
{
testName: "StringSQLType type mismatch",
paramVal: 123,
psType: StringSQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: 123, psType: StringSQLType{}}).Error(),
},
{
testName: "Int64SQLType success",
paramVal: int64(123),
psType: Int64SQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
Kind: &btpb.Value_IntValue{
IntValue: int64(123),
},
},
},
{
testName: "Int64SQLType nil success",
psType: Int64SQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
},
},
{
testName: "Int64SQLType type mismatch",
paramVal: "123",
psType: Int64SQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "123", psType: Int64SQLType{}}).Error(),
},
{
testName: "Float32SQLType success",
paramVal: float32(1.23),
psType: Float32SQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_Float32Type{
Float32Type: &btpb.Type_Float32{},
},
},
Kind: &btpb.Value_FloatValue{
FloatValue: float64(1.23),
},
},
},
{
testName: "Float32SQLType nil success",
psType: Float32SQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_Float32Type{
Float32Type: &btpb.Type_Float32{},
},
},
},
},
{
testName: "Float32SQLType type mismatch - string",
paramVal: "1.23",
psType: Float32SQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "1.23", psType: Float32SQLType{}}).Error(),
},
{
testName: "Float32SQLType type mismatch - float64",
paramVal: float64(1.23),
psType: Float32SQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: float64(1.23), psType: Float32SQLType{}}).Error(),
},
{
testName: "Float64SQLType success",
paramVal: float64(1.23),
psType: Float64SQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_Float64Type{
Float64Type: &btpb.Type_Float64{},
},
},
Kind: &btpb.Value_FloatValue{
FloatValue: float64(1.23),
},
},
},
{
testName: "Float64SQLType nil success",
psType: Float64SQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_Float64Type{
Float64Type: &btpb.Type_Float64{},
},
},
},
},
{
testName: "Float64SQLType type mismatch - string",
paramVal: "1.23",
psType: Float64SQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "1.23", psType: Float64SQLType{}}).Error(),
},
{
testName: "Float64SQLType type mismatch - float32",
paramVal: float32(1.23),
psType: Float64SQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: float32(1.23), psType: Float64SQLType{}}).Error(),
},
{
testName: "BoolSQLType success",
paramVal: true,
psType: BoolSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_BoolType{
BoolType: &btpb.Type_Bool{},
},
},
Kind: &btpb.Value_BoolValue{
BoolValue: true,
},
},
},
{
testName: "BoolSQLType nil success",
psType: BoolSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_BoolType{
BoolType: &btpb.Type_Bool{},
},
},
},
},
{
testName: "BoolSQLType type mismatch",
paramVal: "true",
psType: BoolSQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "true", psType: BoolSQLType{}}).Error(),
},
{
testName: "TimestampSQLType success",
paramVal: testTime,
psType: TimestampSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_TimestampType{
TimestampType: &btpb.Type_Timestamp{},
},
},
Kind: &btpb.Value_TimestampValue{
TimestampValue: timestamppb.New(testTime),
},
},
},
{
testName: "TimestampSQLType nil success",
psType: TimestampSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_TimestampType{
TimestampType: &btpb.Type_Timestamp{},
},
},
},
},
{
testName: "TimestampSQLType type mismatch",
paramVal: "2024-01-01",
psType: TimestampSQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "2024-01-01", psType: TimestampSQLType{}}).Error(),
},
{
testName: "DateSQLType success",
paramVal: testDate,
psType: DateSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_DateType{
DateType: &btpb.Type_Date{},
},
},
Kind: &btpb.Value_DateValue{
DateValue: &date.Date{Year: int32(testDate.Year), Month: int32(testDate.Month), Day: int32(testDate.Day)},
},
},
},
{
testName: "DateSQLType nil success",
psType: DateSQLType{},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_DateType{
DateType: &btpb.Type_Date{},
},
},
},
},
{
testName: "DateSQLType type mismatch",
paramVal: "2024-01-01",
psType: DateSQLType{},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "2024-01-01", psType: DateSQLType{}}).Error(),
},
{
testName: "ArraySQLType success concrete type",
paramVal: []int64{1, 2, 3},
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_ArrayType{
ArrayType: &btpb.Type_Array{
ElementType: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
},
},
},
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_IntValue{
IntValue: int64(1),
},
},
{
Kind: &btpb.Value_IntValue{
IntValue: int64(2),
},
},
{
Kind: &btpb.Value_IntValue{
IntValue: int64(3),
},
},
},
},
},
},
},
{
testName: "ArraySQLType success any type with nil",
paramVal: []any{1, 2, 3, nil},
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_ArrayType{
ArrayType: &btpb.Type_Array{
ElementType: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
},
},
},
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_IntValue{
IntValue: int64(1),
},
},
{
Kind: &btpb.Value_IntValue{
IntValue: int64(2),
},
},
{
Kind: &btpb.Value_IntValue{
IntValue: int64(3),
},
},
{},
},
},
},
},
},
{
testName: "ArraySQLType success int32 in int64",
paramVal: []int32{1, 2, 3},
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_ArrayType{
ArrayType: &btpb.Type_Array{
ElementType: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
},
},
},
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_IntValue{
IntValue: int64(1),
},
},
{
Kind: &btpb.Value_IntValue{
IntValue: int64(2),
},
},
{
Kind: &btpb.Value_IntValue{
IntValue: int64(3),
},
},
},
},
},
},
},
{
testName: "ArraySQLType nil success",
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_ArrayType{
ArrayType: &btpb.Type_Array{
ElementType: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
},
},
},
},
},
{
testName: "ArraySQLType empty array success",
paramVal: []int64{},
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantPbVal: &btpb.Value{
Type: &btpb.Type{
Kind: &btpb.Type_ArrayType{
ArrayType: &btpb.Type_Array{
ElementType: &btpb.Type{
Kind: &btpb.Type_Int64Type{
Int64Type: &btpb.Type_Int64{},
},
},
},
},
},
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{},
},
},
},
},
{
testName: "ArraySQLType type mismatch",
paramVal: "not an array",
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "not an array", psType: ArraySQLType{ElemType: Int64SQLType{}}}).Error(),
},
{
testName: "ArraySQLType element type mismatch",
paramVal: []any{int64(1), "not an int", int64(3)},
psType: ArraySQLType{ElemType: Int64SQLType{}},
wantErr: true,
wantErrMsg: ptr(errTypeMismatch{value: "not an int", psType: Int64SQLType{}}).Error(),
},
{
testName: "ArraySQLType unsupported ElemType",
paramVal: []int64{1, 2, 3},
psType: ArraySQLType{ElemType: ArraySQLType{ElemType: Int64SQLType{}}},
wantErr: true,
wantErrMsg: "bigtable: unsupported ElemType: bigtable.ArraySQLType",
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
got, err := anySQLTypeToPbVal(tt.paramVal, tt.psType)
if (err != nil) != tt.wantErr {
t.Errorf("error got: %v, want: nil", err)
return
}
if tt.wantErr {
if err != nil && !strings.Contains(err.Error(), tt.wantErrMsg) {
t.Errorf("error got: %v, want: %v", err, tt.wantErrMsg)
}
return
}
if !cmp.Equal(got, tt.wantPbVal, cmpOptionsBtpbValue()...) {
t.Errorf("SQLType value got: %+v, want: %+v, diff: %v", got, tt.wantPbVal, cmp.Diff(got, tt.wantPbVal, cmpOptionsBtpbValue()...))
}
})
}
}
func cmpOptionsBtpbValue() []cmp.Option {
return []cmp.Option{cmpopts.IgnoreUnexported(btpb.Value{}, btpb.Type{},
btpb.Type_BytesType{}, btpb.Type_Bytes{},
btpb.Type_StringType{}, btpb.Type_String{},
btpb.Type_Int64Type{}, btpb.Type_Int64{},
btpb.Type_Float32Type{}, btpb.Type_Float32{},
btpb.Type_Float64Type{}, btpb.Type_Float64{},
btpb.Type_BoolType{}, btpb.Type_Bool{},
btpb.Type_TimestampType{}, btpb.Type_Timestamp{},
btpb.Type_DateType{}, btpb.Type_Date{},
btpb.Type_ArrayType{}, btpb.Type_Array{},
btpb.Value_BytesValue{},
btpb.Value_StringValue{},
btpb.Value_IntValue{},
btpb.Value_FloatValue{},
btpb.Value_BoolValue{},
btpb.Value_TimestampValue{}, timestamppb.Timestamp{},
btpb.Value_DateValue{}, date.Date{},
btpb.Value_ArrayValue{}, btpb.ArrayValue{}),
cmpopts.IgnoreFields(btpb.Value_FloatValue{}, "FloatValue")}
}
func TestPreparedStatementBind(t *testing.T) {
tests := []struct {
testName string
query string
paramTypes map[string]SQLType
values map[string]any
wantErr bool
wantErrMsg string
}{
{
testName: "no parameter error",
paramTypes: map[string]SQLType{},
values: map[string]any{"param1": "value1"},
wantErr: true,
wantErrMsg: "bigtable: no parameter with name param1 in prepared statement",
},
{
testName: "not bound error - single missing",
paramTypes: map[string]SQLType{"param1": StringSQLType{}, "param2": StringSQLType{}},
values: map[string]any{"param1": "value1"},
wantErr: true,
wantErrMsg: "bigtable: parameter \"param2\" not bound in call to Bind",
},
{
testName: "not bound error - all missing",
paramTypes: map[string]SQLType{"param1": StringSQLType{}, "param2": StringSQLType{}},
values: nil,
wantErr: true,
wantErrMsg: "not bound in call to Bind",
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
ps := PreparedStatement{
paramTypes: tt.paramTypes,
}
_, err := ps.Bind(tt.values)
if err == nil && tt.wantErr {
t.Fatalf("Bind: err got: nil, want: %v", tt.wantErrMsg)
}
if err != nil && !strings.Contains(err.Error(), tt.wantErrMsg) {
t.Fatalf("Bind: err got: %v, want: %v", err, tt.wantErrMsg)
}
})
}
}
func TestExecuteQuery(t *testing.T) {
var gotPrepReqCount int
var mockPrepQueryResps []prepareQueryResp
var gotRecvMsgCount int
var mockRecvMsgResps []recvMsgResp
var gotSendMsgReqs []*btpb.ExecuteQueryRequest
const defaultPreparedQueryTTL = 10 * time.Second
var testPreparedQueryTTL = defaultPreparedQueryTTL
// start emulated server
testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{})
if gotErr != nil {
t.Fatalf("NewEmulatedEnv failed: %v", gotErr)
}
// Create client
conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
grpc.WithStreamInterceptor(
newStreamClientInterceptor(&gotRecvMsgCount, &gotSendMsgReqs, &mockRecvMsgResps)),
grpc.WithUnaryInterceptor(
newUnaryClientInterceptor(&gotPrepReqCount, &mockPrepQueryResps, &testPreparedQueryTTL)),
)
if gotErr != nil {
t.Fatalf("grpc.Dial failed: %v", gotErr)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
defer cancel()
client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClientWithConfig failed: %v", gotErr)
}
defer client.Close()
stPcfIa, _ := status.New(codes.InvalidArgument, "invalid argument").WithDetails(&errdetails.PreconditionFailure{
Violations: []*errdetails.PreconditionFailure_Violation{
{
Type: queryExpiredViolationType,
Description: "The prepared query has expired. Please re-issue the ExecuteQuery with a valid prepared query.",
},
},
})
aePcfIa, _ := apierror.FromError(stPcfIa.Err())
stPcfFp, _ := status.New(codes.FailedPrecondition, "failed precondition").WithDetails(&errdetails.PreconditionFailure{
Violations: []*errdetails.PreconditionFailure_Violation{
{
Type: queryExpiredViolationType,
Description: "The prepared query has expired. Please re-issue the ExecuteQuery with a valid prepared query.",
},
},
})
aePcfFp, _ := apierror.FromError(stPcfFp.Err())
preparedQuery1 := "first mock prepared query"
preparedQuery2 := "second mock prepared query"
for _, tc := range []struct {
desc string
testTTL time.Duration
mockPrepQueryResps []prepareQueryResp
mockRecvMsgResps []recvMsgResp
wantExecReqPrepQuerys [][]byte
wantResultRowValues [][]*btpb.Value
wantExecErr error
}{
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets first half of a batch of data with reset true
4. RecvMsg - gets second half of a batch of data with reset false
5. RecvMsg - gets resume token
6. RecvMsg - gets EOF
*/
desc: "success with single batch received across multiple responses",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress, colFamInfo),
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespPartialBatchFirstHalf(true /* reset */, nil /* sleep */, []string{colFamAddress}),
newExecQueryRespPartialBatchSecondHalf(false /* reset */, nil /* sleep */, []string{colFamAddress}, []string{colFamInfo}),
newExecQueryRespResumeToken(),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddress, colFamInfo)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets first half of a batch of data with reset true - half1
3. RecvMsg - receives Unavailable error
3. RecvMsg - gets first half of a batch of data with reset true - half2
4. RecvMsg - gets second half of a batch of data with reset false - half3
5. RecvMsg - gets resume token
6. RecvMsg - gets EOF
The resulting row should contain only half2 and half3
*/
desc: "success with single batch received across multiple responses with reset",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress, colFamInfo),
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespPartialBatchFirstHalf(true /* reset */, nil /* sleep */, []string{colFamAddress}),
{err: status.Error(codes.Unavailable, "mock unavailable error")},
newExecQueryRespPartialBatchFirstHalf(true /* reset */, nil /* sleep */, []string{colFamAddressNew}),
newExecQueryRespPartialBatchSecondHalf(false /* reset */, nil /* sleep */, []string{colFamAddressNew}, []string{colFamInfo}),
newExecQueryRespResumeToken(),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
[]byte(preparedQuery1),
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddressNew, colFamInfo)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets a batch of data with reset true
4. RecvMsg - gets EOF
*/
desc: "server stream ended without ResumeToken",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress),
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress}),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
},
wantExecErr: errors.New("bigtable: server stream ended without sending a resume token"),
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - receives FailedPrecondition error
3. PrepareQuery
4. ExecuteQuery
5. RecvMsg - gets a batch of data with reset true
6. RecvMsg - gets resume token
7. RecvMsg - gets EOF
*/
desc: "retry on expired query FailedPrecondition error",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress),
newPrepareQueryResp(preparedQuery2, colFamAddress),
},
mockRecvMsgResps: []recvMsgResp{
{err: aePcfFp},
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress}),
newExecQueryRespResumeToken(),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
[]byte(preparedQuery2),
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddress)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - receives InvalidArgument with PrecondtionFailure error
*/
desc: "should not retry on expired query when code is not FailedPrecondition",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress),
},
mockRecvMsgResps: []recvMsgResp{
{err: aePcfIa},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
},
wantExecErr: status.Error(codes.InvalidArgument, "invalid argument"),
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets a batch of data with reset true
4. RecvMsg - gets resume token
5. RecvMsg - gets Unavailable error
6. ExecuteQuery
7. RecvMsg - gets EOF
*/
desc: "transient error after receiving first resume token should not refresh query",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress), // PrepareStatement
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress}),
newExecQueryRespResumeToken(),
{err: status.Error(codes.Unavailable, "transient error")},
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
[]byte(preparedQuery1),
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddress)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets a batch of data with reset true
4. RecvMsg - receives DeadlineExceeded error after query TTL has passed
5. PrepareQuery
6. ExecuteQuery
7. RecvMsg - gets resume token
8. RecvMsg - gets EOF
*/
desc: "retry on time-based expired query",
testTTL: 50 * time.Millisecond,
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress), // PrepareStatement
newPrepareQueryResp(preparedQuery1, colFamAddress), // From Execute, because expired query
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress}),
{
sleep: ptr(150 * time.Millisecond),
err: status.Error(codes.DeadlineExceeded, "context deadline exceeded"), // retryable
},
newExecQueryRespResumeToken(),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
[]byte(preparedQuery1),
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddress)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - receives FailedPrecondition error
4. PrepareQuery - receives DeadlineExceeded error
5. PrepareQuery
6. ExecuteQuery
7. RecvMsg - gets a batch of data with reset true
8. RecvMsg - gets resume token
9. RecvMsg - gets EOF
*/
desc: "retryable error from PrepareQuery should retry PrepareQuery and Execute",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress), // PrepareStatement
{err: status.Error(codes.DeadlineExceeded, "context deadline exceeded")}, // Execute, retryable error
newPrepareQueryResp(preparedQuery1, colFamAddress),
},
mockRecvMsgResps: []recvMsgResp{
{err: aePcfFp},
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress}),
newExecQueryRespResumeToken(),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
[]byte(preparedQuery1),
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddress)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets first batch of data with reset true
4. RecvMsg - gets second batch of data with reset false
5. RecvMsg - gets Unavailable error
6. ExecuteQuery
7. RecvMsg - gets query expired error
8. PrepareQuery - receives changed metadata
9. ExecuteQuery
10. RecvMsg - gets first batch of new data with reset true
11. RecvMsg - gets resume token
12. RecvMsg - gets EOF error
*/
desc: "batch should be discarded if metadata changed and reset true",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress), // Step 1
newPrepareQueryResp(preparedQuery2, colFamAddress, colFamInfo), // Step 8
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress}), // Step 3
newExecQueryRespFullBatch(false, nil /* sleep */, []string{colFamAddress}), // Step 4
{err: status.Error(codes.Unavailable, "mock unavailable error")}, // Step 5, retryable error
{err: aePcfFp}, // Step 7, retryable error
newExecQueryRespFullBatch(true, nil /* sleep */, []string{colFamAddress, colFamInfo}), // Step 10
newExecQueryRespResumeToken(), // Step 11
{err: io.EOF}, // Step 12
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1), // Step 2
[]byte(preparedQuery1), // Step 6
[]byte(preparedQuery2), // Step 9
},
wantResultRowValues: [][]*btpb.Value{newProtoRowValuesWithKey(colFamAddress, colFamInfo)},
},
{
/*
1. PrepareQuery
2. ExecuteQuery
3. RecvMsg - gets resume token
4. RecvMsg - gets resume token
5. RecvMsg - gets EOF error
*/
desc: "multiple resume tokens with no data",
mockPrepQueryResps: []prepareQueryResp{
newPrepareQueryResp(preparedQuery1, colFamAddress),
},
mockRecvMsgResps: []recvMsgResp{
newExecQueryRespResumeToken(),
newExecQueryRespResumeToken(),
{err: io.EOF},
},
wantExecReqPrepQuerys: [][]byte{
[]byte(preparedQuery1),
},
},
} {
mockPrepQueryResps = tc.mockPrepQueryResps
mockRecvMsgResps = tc.mockRecvMsgResps
if tc.testTTL != 0 {
testPreparedQueryTTL = tc.testTTL
} else {
testPreparedQueryTTL = defaultPreparedQueryTTL
}
// Reset vars for the test
gotPrepReqCount = 0
gotRecvMsgCount = 0
gotSendMsgReqs = []*btpb.ExecuteQueryRequest{}
gotRowCount := 0
t.Run(tc.desc, func(t *testing.T) {
// Prepare query
ps, err := client.PrepareStatement(ctx, "SELECT * FROM users;", nil)
if err != nil {
t.Fatalf("PrepareStatement: %v", err)
}
bs, err := ps.Bind(nil)
if err != nil {
t.Fatalf("Bind: %v", err)
}
// Execute query
err = bs.Execute(ctx, func(rr ResultRow) bool {
vals := rr.pbValues
// Compare row values
if gotRowCount > len(tc.wantResultRowValues) ||
!cmp.Equal(vals, tc.wantResultRowValues[gotRowCount], cmpOptionsBtpbValue()...) {
t.Errorf("#%d ResultRow.values: got: %+v, want: %+v, diff: %+v", gotRowCount, vals, tc.wantResultRowValues[gotRowCount],
cmp.Diff(vals, tc.wantResultRowValues[gotRowCount], cmpOptionsBtpbValue()...))
return false
}
return true
})
if tc.wantExecErr != nil && err != nil && err.Error() != tc.wantExecErr.Error() {
t.Fatalf("Execute: err: got: %v, want: %v", err, tc.wantExecErr)
} else if (err != nil && tc.wantExecErr == nil) || (err == nil && tc.wantExecErr != nil) {
t.Fatalf("Execute: err got: %v, want: %v", err, tc.wantExecErr)
}
if gotPrepReqCount != len(tc.mockPrepQueryResps) {
t.Fatalf("PrepareQuery request count: got: %v, want: %v", gotPrepReqCount, len(tc.mockPrepQueryResps))
}
if gotRecvMsgCount != len(tc.mockRecvMsgResps) {
t.Fatalf("RecvMsg request count: got: %v, want: %v", gotRecvMsgCount, len(tc.mockRecvMsgResps))
}
if len(tc.wantExecReqPrepQuerys) != len(gotSendMsgReqs) {
t.Fatalf("ExecuteQuery request count: got: %v, want: %v", len(gotSendMsgReqs), len(tc.wantExecReqPrepQuerys))
}
for i, wantPrepQueryInExecReq := range tc.wantExecReqPrepQuerys {
if string(gotSendMsgReqs[i].PreparedQuery) != string(wantPrepQueryInExecReq) {
t.Fatalf("%v: PreparedQuery in ExecuteQuery request: got: %v, want: %v",
i,
string(gotSendMsgReqs[i].PreparedQuery), string(wantPrepQueryInExecReq))
}
}
})
}
}
const colFamAddress = "address"
const colFamAddressNew = "address-new" // Used only for values and not metadata
const colFamInfo = "info"
var cfToValues = map[string][]*btpb.Value{
colFamAddress: {
{
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("city"),
},
},
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("San Francisco"),
},
},
},
},
},
},
{
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("state"),
},
},
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("CA"),
},
},
},
},
},
},
},
colFamAddressNew: {
{
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("city"),
},
},
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("Kirkland"),
},
},
},
},
},
},
{
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("state"),
},
},
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("WA"),
},
},
},
},
},
},
},
colFamInfo: {
{
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: []*btpb.Value{
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("greeting"),
},
},
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("Hey there"),
},
},
},
},
},
},
},
}
type recvMsgResp struct {
results *btpb.ExecuteQueryResponse_Results
sleep *time.Duration
err error
}
type prepareQueryResp struct {
resp *btpb.PrepareQueryResponse
err error
}
func newPrepareQueryResp(preparedQuery string, colFams ...string) prepareQueryResp {
bytesType := &btpb.Type{
Kind: &btpb.Type_BytesType{
BytesType: &btpb.Type_Bytes{},
},
}
columns := []*btpb.ColumnMetadata{
{
Name: "_key",
Type: bytesType,
},
}
for _, cf := range colFams {
columns = append(columns, &btpb.ColumnMetadata{
Name: cf,
Type: &btpb.Type{
Kind: &btpb.Type_MapType{
MapType: &btpb.Type_Map{
KeyType: bytesType,
ValueType: bytesType,
},
},
},
})
}
return prepareQueryResp{
resp: &btpb.PrepareQueryResponse{
PreparedQuery: []byte(preparedQuery),
Metadata: &btpb.ResultSetMetadata{
Schema: &btpb.ResultSetMetadata_ProtoSchema{
ProtoSchema: &btpb.ProtoSchema{
Columns: columns,
},
},
},
},
}
}
func newRecvMsgResp(reset bool, protoRows, checksumProtoRows *btpb.ProtoRows, blockTime *time.Duration) recvMsgResp {
marshalled, _ := proto.Marshal(protoRows)
var checksum *uint32
if checksumProtoRows != nil {
marshalledCk, _ := proto.Marshal(checksumProtoRows)
checksum = ptr(crc32.Checksum(marshalledCk, crc32cTable))
}
return recvMsgResp{
results: &btpb.ExecuteQueryResponse_Results{
Results: &btpb.PartialResultSet{
PartialRows: &btpb.PartialResultSet_ProtoRowsBatch{
ProtoRowsBatch: &btpb.ProtoRowsBatch{
BatchData: marshalled,
},
},
BatchChecksum: checksum,
Reset_: reset,
},
},
sleep: blockTime,
}
}
func newExecQueryRespResumeToken() recvMsgResp {
return recvMsgResp{
results: &btpb.ExecuteQueryResponse_Results{
Results: &btpb.PartialResultSet{
ResumeToken: []byte("resume-token"),
},
},
}
}
func newExecQueryRespFullBatch(reset bool, blockTime *time.Duration, colFams []string) recvMsgResp {
protoRows := &btpb.ProtoRows{
Values: newProtoRowValuesWithKey(colFams...),
}
return newRecvMsgResp(reset, protoRows, protoRows, blockTime)
}
func newExecQueryRespPartialBatchFirstHalf(reset bool, blockTime *time.Duration, colFams []string) recvMsgResp {
protoRows := &btpb.ProtoRows{
Values: newProtoRowValuesWithKey(colFams...),
}
return newRecvMsgResp(reset, protoRows, nil, blockTime)
}
func newExecQueryRespPartialBatchSecondHalf(reset bool, blockTime *time.Duration, colFamsFirstHalf, colFamsSecondHalf []string) recvMsgResp {
protoRows := &btpb.ProtoRows{
Values: newProtoRowValues(colFamsSecondHalf...),
}
checksumProtoRows := &btpb.ProtoRows{
Values: append(newProtoRowValuesWithKey(colFamsFirstHalf...), newProtoRowValues(colFamsSecondHalf...)...),
}
return newRecvMsgResp(reset, protoRows, checksumProtoRows, blockTime)
}
func newProtoRowValuesWithKey(colFams ...string) []*btpb.Value {
values := []*btpb.Value{
{
Kind: &btpb.Value_BytesValue{
BytesValue: []byte("row-01"),
},
},
}
values = append(values, newProtoRowValues(colFams...)...)
return values
}
func newProtoRowValues(colFams ...string) []*btpb.Value {
values := []*btpb.Value{}
for _, cf := range colFams {
values = append(values, &btpb.Value{
Kind: &btpb.Value_ArrayValue{
ArrayValue: &btpb.ArrayValue{
Values: cfToValues[cf],
},
},
})
}
return values
}
// wrappedClientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
// SendMsg method call.
type wrappedClientStream struct {
recvMsgCount *int
reqRecorder *[]*btpb.ExecuteQueryRequest
respPtrs *[]recvMsgResp
grpc.ClientStream
}
func (w *wrappedClientStream) RecvMsg(m any) error {
defer func() { *w.recvMsgCount++ }()
err := w.ClientStream.RecvMsg(m)
resp, ok := m.(*btpb.ExecuteQueryResponse)
if !ok {
return err
}
resps := *w.respPtrs
if resps[*w.recvMsgCount].sleep != nil {
time.Sleep(*resps[*w.recvMsgCount].sleep)
}
resp.Response = resps[*w.recvMsgCount].results
return resps[*w.recvMsgCount].err
}
func (w *wrappedClientStream) SendMsg(m any) error {
execReq, _ := m.(*btpb.ExecuteQueryRequest)
*w.reqRecorder = append(*w.reqRecorder, execReq)
return w.ClientStream.SendMsg(m)
}
func newWrappedClientStream(s grpc.ClientStream, recvMsgCount *int,
reqPtrs *[]*btpb.ExecuteQueryRequest, respPtrs *[]recvMsgResp) grpc.ClientStream {
return &wrappedClientStream{
ClientStream: s,
recvMsgCount: recvMsgCount,
reqRecorder: reqPtrs,
respPtrs: respPtrs,
}
}
func newStreamClientInterceptor(recvMsgCount *int, reqPtrs *[]*btpb.ExecuteQueryRequest, respPtrs *[]recvMsgResp) func(
ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedClientStream(s, recvMsgCount, reqPtrs, respPtrs), nil
}
}
func newUnaryClientInterceptor(prepReqCount *int, respPtrs *[]prepareQueryResp, ttl *time.Duration) func(
ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := invoker(ctx, method, req, reply, cc, opts...)
defer func() { *prepReqCount++ }()
_, isPrepReq := req.(*btpb.PrepareQueryRequest)
if !isPrepReq || (err != nil && !strings.Contains(err.Error(), emulatorUnsupported)) {
return err
}
resps := *respPtrs
currErr := resps[*prepReqCount].err
if currErr != nil {
return currErr
}
pqr, _ := reply.(*btpb.PrepareQueryResponse)
pqr.PreparedQuery = resps[*prepReqCount].resp.PreparedQuery
pqr.ValidUntil = timestamppb.New(time.Now().Add(*ttl))
pqr.Metadata = resps[*prepReqCount].resp.Metadata
return nil
}
}