blob: 3ce36814551a2885718c1f279ca5ff9d5b81e5ae [file] [log] [blame]
/*
Copyright 2017 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"math"
"math/big"
"os"
"os/exec"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
"cloud.google.com/go/civil"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
database "cloud.google.com/go/spanner/admin/database/apiv1"
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
v1 "cloud.google.com/go/spanner/apiv1"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/internal"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
const (
directPathIPV6Prefix = "[2001:4860:8040"
directPathIPV4Prefix = "34.126"
singerDDLStatements = "SINGER_DDL_STATEMENTS"
simpleDDLStatements = "SIMPLE_DDL_STATEMENTS"
readDDLStatements = "READ_DDL_STATEMENTS"
backupDDLStatements = "BACKUP_DDL_STATEMENTS"
testTableDDLStatements = "TEST_TABLE_DDL_STATEMENTS"
fkdcDDLStatements = "FKDC_DDL_STATEMENTS"
testTableBitReversedSeqStatements = "TEST_TABLE_BIT_REVERSED_SEQUENCE_STATEMENTS"
)
var (
// testProjectID specifies the project used for testing. It can be changed
// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
testProjectID = testutil.ProjID()
// testDialect specifies the dialect used for testing.
testDialect adminpb.DatabaseDialect
// spannerHost specifies the spanner API host used for testing. It can be changed
// by setting the environment variable GCLOUD_TESTS_GOLANG_SPANNER_HOST
spannerHost = getSpannerHost()
// instanceConfig specifies the instance config used to create an instance for testing.
// It can be changed by setting the environment variable
// GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG.
instanceConfig = getInstanceConfig()
dbNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
instanceNameSpace = uid.NewSpace("gotest", &uid.Options{Sep: '-', Short: true})
backupIDSpace = uid.NewSpace("gotest", &uid.Options{Sep: '_', Short: true})
testInstanceID = instanceNameSpace.New()
testTable = "TestTable"
testTableIndex = "TestTableByValue"
testTableColumns = []string{"Key", "StringValue"}
databaseAdmin *database.DatabaseAdminClient
instanceAdmin *instance.InstanceAdminClient
dpConfig directPathTestConfig
peerInfo *peer.Peer
singerDBPGStatements = []string{
`CREATE TABLE Singers (
SingerId INT8 NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA,
numeric NUMERIC,
float8 FLOAT8,
PRIMARY KEY(SingerId)
)`,
`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
`CREATE TABLE Accounts (
AccountId BIGINT NOT NULL,
Nickname VARCHAR(100),
Balance BIGINT NOT NULL,
PRIMARY KEY(AccountId)
)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname)`,
`CREATE TABLE Types (
RowID BIGINT PRIMARY KEY,
String VARCHAR,
Bytes BYTEA,
Int64a BIGINT,
Bool BOOL,
Float64 DOUBLE PRECISION,
Numeric NUMERIC,
JSONB jsonb
)`,
}
singerDBStatements = []string{
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
`CREATE TABLE Accounts (
AccountId INT64 NOT NULL,
Nickname STRING(100),
Balance INT64 NOT NULL,
) PRIMARY KEY (AccountId)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
`CREATE TABLE Types (
RowID INT64 NOT NULL,
String STRING(MAX),
StringArray ARRAY<STRING(MAX)>,
Bytes BYTES(MAX),
BytesArray ARRAY<BYTES(MAX)>,
Int64a INT64,
Int64Array ARRAY<INT64>,
Bool BOOL,
BoolArray ARRAY<BOOL>,
Float64 FLOAT64,
Float64Array ARRAY<FLOAT64>,
Date DATE,
DateArray ARRAY<DATE>,
Timestamp TIMESTAMP,
TimestampArray ARRAY<TIMESTAMP>,
Numeric NUMERIC,
NumericArray ARRAY<NUMERIC>
) PRIMARY KEY (RowID)`,
}
readDBStatements = []string{
`CREATE TABLE TestTable (
Key STRING(MAX) NOT NULL,
StringValue STRING(MAX)
) PRIMARY KEY (Key)`,
`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
}
readDBPGStatements = []string{
`CREATE TABLE TestTable (
Key VARCHAR PRIMARY KEY,
StringValue VARCHAR
)`,
`CREATE INDEX TestTableByValue ON TestTable(StringValue)`,
`CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)`,
}
simpleDBStatements = []string{
`CREATE TABLE test (
a STRING(1024),
b STRING(1024),
) PRIMARY KEY (a)`,
}
simpleDBPGStatements = []string{
`CREATE TABLE test (
a VARCHAR(1024) PRIMARY KEY,
b VARCHAR(1024)
)`,
}
simpleDBTableColumns = []string{"a", "b"}
ctsDBStatements = []string{
`CREATE TABLE TestTable (
Key STRING(MAX) NOT NULL,
Ts TIMESTAMP OPTIONS (allow_commit_timestamp = true),
) PRIMARY KEY (Key)`,
}
backupDBStatements = []string{
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
`CREATE TABLE Accounts (
AccountId INT64 NOT NULL,
Nickname STRING(100),
Balance INT64 NOT NULL,
) PRIMARY KEY (AccountId)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
`CREATE TABLE Types (
RowID INT64 NOT NULL,
String STRING(MAX),
StringArray ARRAY<STRING(MAX)>,
Bytes BYTES(MAX),
BytesArray ARRAY<BYTES(MAX)>,
Int64a INT64,
Int64Array ARRAY<INT64>,
Bool BOOL,
BoolArray ARRAY<BOOL>,
Float64 FLOAT64,
Float64Array ARRAY<FLOAT64>,
Date DATE,
DateArray ARRAY<DATE>,
Timestamp TIMESTAMP,
TimestampArray ARRAY<TIMESTAMP>,
Numeric NUMERIC,
NumericArray ARRAY<NUMERIC>
) PRIMARY KEY (RowID)`,
}
backupDBPGStatements = []string{
`CREATE TABLE Singers (
SingerId BIGINT PRIMARY KEY,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA
)`,
`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
`CREATE TABLE Accounts (
AccountId BIGINT PRIMARY KEY,
Nickname VARCHAR(100),
Balance BIGINT NOT NULL
)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname)`,
`CREATE TABLE Types (
RowID BIGINT PRIMARY KEY,
String VARCHAR,
Bytes BYTEA,
Int64a BIGINT,
Bool BOOL,
Float64 DOUBLE PRECISION,
Numeric NUMERIC
)`,
}
fkdcDBStatements = []string{
`CREATE TABLE Customers (
CustomerId INT64 NOT NULL,
CustomerName STRING(62) NOT NULL,
) PRIMARY KEY (CustomerId)`,
`CREATE TABLE ShoppingCarts (
CartId INT64 NOT NULL,
CustomerId INT64 NOT NULL,
CustomerName STRING(62) NOT NULL,
CONSTRAINT FKShoppingCartsCustomerId FOREIGN KEY (CustomerId)
REFERENCES Customers (CustomerId) ON DELETE CASCADE
) PRIMARY KEY (CartId)`,
}
fkdcDBPGStatements = []string{
`CREATE TABLE Customers (
CustomerId BIGINT,
CustomerName VARCHAR(62) NOT NULL,
PRIMARY KEY (CustomerId))`,
`CREATE TABLE ShoppingCarts (
CartId BIGINT,
CustomerId BIGINT NOT NULL,
CustomerName VARCHAR(62) NOT NULL,
CONSTRAINT "FKShoppingCartsCustomerId" FOREIGN KEY (CustomerId)
REFERENCES Customers (CustomerId) ON DELETE CASCADE,
PRIMARY KEY (CartId))`,
}
bitReverseSeqDBStatments = []string{
`CREATE SEQUENCE seqT OPTIONS (sequence_kind = "bit_reversed_positive")`,
`CREATE TABLE T (
id INT64 DEFAULT (GET_NEXT_SEQUENCE_VALUE(Sequence seqT)),
value INT64,
counter INT64 DEFAULT (GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)),
br_id INT64 AS (BIT_REVERSE(id, true)) STORED,
CONSTRAINT id_gt_0 CHECK (id > 0),
CONSTRAINT counter_gt_br_id CHECK (counter >= br_id),
CONSTRAINT br_id_true CHECK (id = BIT_REVERSE(br_id, true)),
) PRIMARY KEY (id)`,
}
bitReverseSeqDBPGStatments = []string{
`CREATE SEQUENCE seqT BIT_REVERSED_POSITIVE`,
`CREATE TABLE T (
id BIGINT DEFAULT nextval('seqT'),
value BIGINT,
counter BIGINT DEFAULT spanner.get_internal_sequence_state('seqT'),
br_id bigint GENERATED ALWAYS AS (spanner.bit_reverse(id, true)) STORED,
CONSTRAINT id_gt_0 CHECK (id > 0),
CONSTRAINT counter_gt_br_id CHECK (counter >= br_id),
CONSTRAINT br_id_true CHECK (id = spanner.bit_reverse(br_id, true)),
PRIMARY KEY (id)
)`,
}
statements = map[adminpb.DatabaseDialect]map[string][]string{
adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL: {
singerDDLStatements: singerDBStatements,
simpleDDLStatements: simpleDBStatements,
readDDLStatements: readDBStatements,
backupDDLStatements: backupDBStatements,
testTableDDLStatements: readDBStatements,
fkdcDDLStatements: fkdcDBStatements,
testTableBitReversedSeqStatements: bitReverseSeqDBStatments,
},
adminpb.DatabaseDialect_POSTGRESQL: {
singerDDLStatements: singerDBPGStatements,
simpleDDLStatements: simpleDBPGStatements,
readDDLStatements: readDBPGStatements,
backupDDLStatements: backupDBPGStatements,
testTableDDLStatements: readDBPGStatements,
fkdcDDLStatements: fkdcDBPGStatements,
testTableBitReversedSeqStatements: bitReverseSeqDBPGStatments,
},
}
validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")
blackholeDpv6Cmd string
blackholeDpv4Cmd string
allowDpv6Cmd string
allowDpv4Cmd string
)
func init() {
flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
peerInfo = &peer.Peer{}
// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
}
type directPathTestConfig struct {
attemptDirectPath bool
directPathIPv4Only bool
}
func parseInstanceName(inst string) (project, instance string, err error) {
matches := validInstancePattern.FindStringSubmatch(inst)
if len(matches) == 0 {
return "", "", fmt.Errorf("Failed to parse instance name from %q according to pattern %q",
inst, validInstancePattern.String())
}
return matches[1], matches[2], nil
}
func getSpannerHost() string {
return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_HOST")
}
func getInstanceConfig() string {
return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG")
}
const (
str1 = "alice"
str2 = "a@example.com"
)
func TestMain(m *testing.M) {
cleanup := initIntegrationTests()
defer cleanup()
for _, dialect := range []adminpb.DatabaseDialect{adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL, adminpb.DatabaseDialect_POSTGRESQL} {
if isEmulatorEnvSet() && dialect == adminpb.DatabaseDialect_POSTGRESQL {
// PG tests are not supported in emulator
continue
}
testDialect = dialect
res := m.Run()
if res != 0 {
cleanup()
os.Exit(res)
}
}
}
var grpcHeaderChecker = testutil.DefaultHeadersEnforcer()
func initIntegrationTests() (cleanup func()) {
ctx := context.Background()
flag.Parse() // Needed for testing.Short().
noop := func() {}
if testing.Short() {
log.Println("Integration tests skipped in -short mode.")
return noop
}
if testProjectID == "" {
log.Println("Integration tests skipped: GCLOUD_TESTS_GOLANG_PROJECT_ID is missing")
return noop
}
opts := grpcHeaderChecker.CallOptions()
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
var err error
// Create InstanceAdmin and DatabaseAdmin clients.
instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
log.Fatalf("cannot create instance databaseAdmin client: %v", err)
}
databaseAdmin, err = database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
log.Fatalf("cannot create databaseAdmin client: %v", err)
}
var configName string
if instanceConfig != "" {
configName = fmt.Sprintf("projects/%s/instanceConfigs/%s", testProjectID, instanceConfig)
} else {
// Get the list of supported instance configs for the project that is used
// for the integration tests. The supported instance configs can differ per
// project. The integration tests will use the first instance config that
// is returned by Cloud Spanner. This will normally be the regional config
// that is physically the closest to where the request is coming from.
configIterator := instanceAdmin.ListInstanceConfigs(ctx, &instancepb.ListInstanceConfigsRequest{
Parent: fmt.Sprintf("projects/%s", testProjectID),
})
config, err := configIterator.Next()
if err != nil {
log.Fatalf("Cannot get any instance configurations.\nPlease make sure the Cloud Spanner API is enabled for the test project.\nGet error: %v", err)
}
configName = config.Name
}
log.Printf("Running test by using the instance config: %s\n", configName)
// First clean up any old test instances before we start the actual testing
// as these might cause this test run to fail.
cleanupInstances()
// Create a test instance to use for this test run.
op, err := instanceAdmin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
Parent: fmt.Sprintf("projects/%s", testProjectID),
InstanceId: testInstanceID,
Instance: &instancepb.Instance{
Config: configName,
DisplayName: testInstanceID,
NodeCount: 1,
},
})
if err != nil {
log.Fatalf("could not create instance with id %s: %v", fmt.Sprintf("projects/%s/instances/%s", testProjectID, testInstanceID), err)
}
// Wait for the instance creation to finish.
i, err := op.Wait(ctx)
if err != nil {
log.Fatalf("waiting for instance creation to finish failed: %v", err)
}
if i.State != instancepb.Instance_READY {
log.Printf("instance state is not READY, it might be that the test instance will cause problems during tests. Got state %v\n", i.State)
}
return func() {
// Delete this test instance.
instanceName := fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID)
deleteInstanceAndBackups(ctx, instanceName)
// Delete other test instances that may be lingering around.
cleanupInstances()
databaseAdmin.Close()
instanceAdmin.Close()
}
}
func TestIntegration_InitSessionPool(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up an empty testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
defer cleanup()
sp := client.idleSessions
sp.mu.Lock()
want := sp.MinOpened
sp.mu.Unlock()
var numOpened int
loop:
for {
select {
case <-ctx.Done():
t.Fatalf("timed out, got %d session(s), want %d", numOpened, want)
default:
sp.mu.Lock()
numOpened = sp.idleList.Len()
sp.mu.Unlock()
if uint64(numOpened) == want {
break loop
}
}
}
// Delete all sessions in the pool on the backend and then try to execute a
// simple query. The 'Session not found' error should cause an automatic
// retry of the read-only transaction.
sp.mu.Lock()
s := sp.idleList.Front()
for {
if s == nil {
break
}
// This will delete the session on the backend without removing it
// from the pool.
s.Value.(*session).delete(context.Background())
s = s.Next()
}
sp.mu.Unlock()
sql := "SELECT 1, 'FOO', 'BAR'"
tx := client.ReadOnlyTransaction()
defer tx.Close()
iter := tx.Query(context.Background(), NewStatement(sql))
rows, err := readAll(iter)
if err != nil {
t.Fatalf("Unexpected error for query %q: %v", sql, err)
}
if got, want := len(rows), 1; got != want {
t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
}
if got, want := len(rows[0]), 3; got != want {
t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
}
if got, want := rows[0][0].(int64), int64(1); got != want {
t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
}
}
// Test SingleUse transaction.
func TestIntegration_SingleUse(t *testing.T) {
t.Parallel()
skipEmulatorTestForPG(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
// Try to write four rows through the Apply API.
for i, w := range writes {
var err error
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
}
// Calculate time difference between Cloud Spanner server and localhost to
// use to determine the exact staleness value to use.
timeDiff := maxDuration(time.Now().Sub(writes[0].ts), 0)
// Test reading rows with different timestamp bounds.
for i, test := range []struct {
name string
want [][]interface{}
tb TimestampBound
checkTs func(time.Time) error
skipForPG bool
}{
{
name: "strong",
want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
tb: StrongRead(),
checkTs: func(ts time.Time) error {
// writes[3] is the last write, all subsequent strong read
// should have a timestamp larger than that.
if ts.Before(writes[3].ts) {
return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
}
return nil
},
},
{
name: "min_read_timestamp",
want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
tb: MinReadTimestamp(writes[3].ts),
checkTs: func(ts time.Time) error {
if ts.Before(writes[3].ts) {
return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
}
return nil
},
},
{
name: "max_staleness",
want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
tb: MaxStaleness(time.Second),
checkTs: func(ts time.Time) error {
if ts.Before(writes[3].ts) {
return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[3].ts)
}
return nil
},
},
{
name: "read_timestamp",
want: [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
tb: ReadTimestamp(writes[2].ts),
checkTs: func(ts time.Time) error {
if ts != writes[2].ts {
return fmt.Errorf("read got timestamp %v, want %v", ts, writes[2].ts)
}
return nil
},
},
{
name: "exact_staleness",
// PG query with exact_staleness returns error code
// "InvalidArgument", desc = "[ERROR] relation \"singers\" does not exist
skipForPG: true,
want: nil,
// Specify a staleness which should be already before this test.
tb: ExactStaleness(time.Now().Sub(writes[0].ts) + timeDiff + 30*time.Second),
checkTs: func(ts time.Time) error {
if !ts.Before(writes[0].ts) {
return fmt.Errorf("read got timestamp %v, want it to be earlier than %v", ts, writes[0].ts)
}
return nil
},
},
} {
t.Run(test.name, func(t *testing.T) {
singersQuery := "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@p1, @p2, @p3) ORDER BY SingerId"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
if test.skipForPG {
t.Skip("Skipping testing of unsupported tests in Postgres dialect.")
}
singersQuery = "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = $1 OR SingerId = $2 OR SingerId = $3 ORDER BY SingerId"
}
// SingleUse.Query
su := client.Single().WithTimestampBound(test.tb)
got, err := readAll(su.Query(
ctx,
Statement{
singersQuery,
map[string]interface{}{"p1": int64(1), "p2": int64(3), "p3": int64(4)},
}))
if err != nil {
t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
rts, err := su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
}
if err := test.checkTs(rts); err != nil {
t.Fatalf("%d: SingleUse.Query doesn't return expected timestamp: %v", i, err)
}
if !testEqual(got, test.want) {
t.Fatalf("%d: got unexpected result from SingleUse.Query: %v, want %v", i, got, test.want)
}
// SingleUse.Read
su = client.Single().WithTimestampBound(test.tb)
got, err = readAll(su.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
if err != nil {
t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
rts, err = su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
}
if err := test.checkTs(rts); err != nil {
t.Fatalf("%d: SingleUse.Read doesn't return expected timestamp: %v", i, err)
}
if !testEqual(got, test.want) {
t.Fatalf("%d: got unexpected result from SingleUse.Read: %v, want %v", i, got, test.want)
}
// SingleUse.ReadRow
got = nil
for _, k := range []Key{{1}, {3}, {4}} {
su = client.Single().WithTimestampBound(test.tb)
r, err := su.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
continue
}
verifyDirectPathRemoteAddress(t)
v, err := rowToValues(r)
if err != nil {
continue
}
got = append(got, v)
rts, err = su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
}
if err := test.checkTs(rts); err != nil {
t.Fatalf("%d: SingleUse.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
}
}
if !testEqual(got, test.want) {
t.Fatalf("%d: got unexpected results from SingleUse.ReadRow: %v, want %v", i, got, test.want)
}
// SingleUse.ReadUsingIndex
su = client.Single().WithTimestampBound(test.tb)
got, err = readAll(su.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
if err != nil {
t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
// The results from ReadUsingIndex is sorted by the index rather than primary key.
if len(got) != len(test.want) {
t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
}
for j, g := range got {
if j > 0 {
prev := got[j-1][1].(string) + got[j-1][2].(string)
curr := got[j][1].(string) + got[j][2].(string)
if strings.Compare(prev, curr) > 0 {
t.Fatalf("%d: SingleUse.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
}
}
found := false
for _, w := range test.want {
if testEqual(g, w) {
found = true
}
}
if !found {
t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
}
}
rts, err = su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
}
if err := test.checkTs(rts); err != nil {
t.Fatalf("%d: SingleUse.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
}
// SingleUse.ReadRowUsingIndex
got = nil
for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
su = client.Single().WithTimestampBound(test.tb)
r, err := su.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
continue
}
verifyDirectPathRemoteAddress(t)
v, err := rowToValues(r)
if err != nil {
continue
}
got = append(got, v)
rts, err = su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
}
if err := test.checkTs(rts); err != nil {
t.Fatalf("%d: SingleUse.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
}
}
if !testEqual(got, test.want) {
t.Fatalf("%d: got unexpected results from SingleUse.ReadRowUsingIndex: %v, want %v", i, got, test.want)
}
})
}
}
// Test custom query options provided on query-level configuration.
func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
// Try to write four rows through the Apply API.
for i, w := range writes {
var err error
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
}
singersQuery := "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@p1, @p2, @p3)"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
singersQuery = "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = $1 OR SingerId = $2 OR SingerId = $3"
}
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
OptimizerVersion: "1",
OptimizerStatisticsPackage: "latest",
}}
got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
singersQuery,
map[string]interface{}{"p1": int64(1), "p2": int64(3), "p3": int64(4)},
}, qo))
if err != nil {
t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err)
}
want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
if !testEqual(got, want) {
t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want)
}
}
func TestIntegration_TransactionWasStartedInDifferentSession(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
attempts := 0
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, transaction *ReadWriteTransaction) error {
attempts++
if attempts == 1 {
deleteTestSession(ctx, t, transaction.sh.getID())
}
if _, err := readAll(transaction.Query(ctx, NewStatement("select * from singers"))); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
if g, w := attempts, 2; g != w {
t.Fatalf("attempts mismatch\nGot: %v\nWant: %v", g, w)
}
}
func deleteTestSession(ctx context.Context, t *testing.T, sessionName string) {
var opts []option.ClientOption
if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
emulatorOpts := []option.ClientOption{
option.WithEndpoint(emulatorAddr),
option.WithGRPCDialOption(grpc.WithInsecure()),
option.WithoutAuthentication(),
internaloption.SkipDialSettingsValidation(),
}
opts = append(emulatorOpts, opts...)
}
gapic, err := v1.NewClient(ctx, opts...)
if err != nil {
t.Fatalf("could not create gapic client: %v", err)
}
defer gapic.Close()
if err := gapic.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sessionName}); err != nil {
t.Fatal(err)
}
}
func TestIntegration_BatchWrite(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
mgs := make([]*MutationGroup, len(writes))
// Try to write four rows through the BatchWrite API.
for i, w := range writes {
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
ms := make([]*Mutation, 1)
ms[0] = m
mgs[i] = &MutationGroup{Mutations: ms}
}
// Records the mutation group indexes received in the response.
seen := make(map[int32]int32)
numMutationGroups := len(mgs)
validate := func(res *sppb.BatchWriteResponse) error {
if status := status.ErrorProto(res.GetStatus()); status != nil {
t.Fatalf("Invalid status: %v", status)
}
if ts := res.GetCommitTimestamp(); ts == nil {
t.Fatal("Invalid commit timestamp")
}
for _, idx := range res.GetIndexes() {
if idx >= 0 && idx < int32(numMutationGroups) {
seen[idx]++
} else {
t.Fatalf("Index %v out of range. Expected range [%v,%v]", idx, 0, numMutationGroups-1)
}
}
return nil
}
iter := client.BatchWrite(ctx, mgs)
if err := iter.Do(validate); err != nil {
t.Fatal(err)
}
// Validate that each mutation group index is seen exactly once.
if numMutationGroups != len(seen) {
t.Fatalf("Expected %v indexes, got %v indexes", numMutationGroups, len(seen))
}
for idx, ct := range seen {
if ct != 1 {
t.Fatalf("Index %v seen %v times instead of exactly once", idx, ct)
}
}
// Verify the writes by reading the database.
singersQuery := "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@p1, @p2, @p3)"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
singersQuery = "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = $1 OR SingerId = $2 OR SingerId = $3"
}
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
OptimizerVersion: "1",
OptimizerStatisticsPackage: "latest",
}}
got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
singersQuery,
map[string]interface{}{"p1": int64(1), "p2": int64(3), "p3": int64(4)},
}, qo))
if err != nil {
t.Errorf("ReadOnlyTransaction.QueryWithOptions returns error %v, want nil", err)
}
want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
if !testEqual(got, want) {
t.Errorf("got unexpected result from ReadOnlyTransaction.QueryWithOptions: %v, want %v", got, want)
}
}
func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
// Try to write four rows through the Apply API.
for i, w := range writes {
var err error
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
}
su := client.Single()
const limit = 1
gotRows, err := readAll(su.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}),
[]string{"SingerId", "FirstName", "LastName"}, &ReadOptions{Limit: limit}))
if err != nil {
t.Errorf("SingleUse.ReadWithOptions returns error %v, want nil", err)
}
if got, want := len(gotRows), limit; got != want {
t.Errorf("got %d, want %d", got, want)
}
}
// Test ReadOnlyTransaction. The testsuite is mostly like SingleUse, except it
// also tests for a single timestamp across multiple reads.
func TestIntegration_ReadOnlyTransaction(t *testing.T) {
t.Parallel()
ctxTimeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
// Try to write four rows through the Apply API.
for i, w := range writes {
var err error
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
}
// For testing timestamp bound staleness.
<-time.After(time.Second)
// Test reading rows with different timestamp bounds.
for i, test := range []struct {
want [][]interface{}
tb TimestampBound
checkTs func(time.Time) error
skipForPG bool
}{
// Note: min_read_timestamp and max_staleness are not supported by
// ReadOnlyTransaction. See API document for more details.
{
// strong
[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}},
StrongRead(),
func(ts time.Time) error {
if ts.Before(writes[3].ts) {
return fmt.Errorf("read got timestamp %v, want it to be no later than %v", ts, writes[3].ts)
}
return nil
},
false,
},
{
// read_timestamp
[][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}},
ReadTimestamp(writes[2].ts),
func(ts time.Time) error {
if ts != writes[2].ts {
return fmt.Errorf("read got timestamp %v, expect %v", ts, writes[2].ts)
}
return nil
},
false,
},
{
// exact_staleness
nil,
// Specify a staleness which should be already before this test.
ExactStaleness(ctxTimeout + 1),
func(ts time.Time) error {
if ts.After(writes[0].ts) {
return fmt.Errorf("read got timestamp %v, want it to be no earlier than %v", ts, writes[0].ts)
}
return nil
},
// PG query with exact_staleness returns Table not found error
true,
},
} {
// ReadOnlyTransaction.Query
ro := client.ReadOnlyTransaction().WithTimestampBound(test.tb)
singersQuery := "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@p1, @p2, @p3) ORDER BY SingerId"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
if test.skipForPG {
t.Skip("Skipping testing of unsupported tests in Postgres dialect.")
}
singersQuery = "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = $1 OR SingerId = $2 OR SingerId = $3 ORDER BY SingerId"
}
got, err := readAll(ro.Query(
ctx,
Statement{
singersQuery,
map[string]interface{}{"p1": int64(1), "p2": int64(3), "p3": int64(4)},
}))
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.Query returns error %v, want nil", i, err)
}
if !testEqual(got, test.want) {
t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Query: %v, want %v", i, got, test.want)
}
rts, err := ro.Timestamp()
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.Query doesn't return a timestamp, error: %v", i, err)
}
if err := test.checkTs(rts); err != nil {
t.Errorf("%d: ReadOnlyTransaction.Query doesn't return expected timestamp: %v", i, err)
}
roTs := rts
// ReadOnlyTransaction.Read
got, err = readAll(ro.Read(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}))
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.Read returns error %v, want nil", i, err)
}
if !testEqual(got, test.want) {
t.Errorf("%d: got unexpected result from ReadOnlyTransaction.Read: %v, want %v", i, got, test.want)
}
rts, err = ro.Timestamp()
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.Read doesn't return a timestamp, error: %v", i, err)
}
if err := test.checkTs(rts); err != nil {
t.Errorf("%d: ReadOnlyTransaction.Read doesn't return expected timestamp: %v", i, err)
}
if roTs != rts {
t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
}
// ReadOnlyTransaction.ReadRow
got = nil
for _, k := range []Key{{1}, {3}, {4}} {
r, err := ro.ReadRow(ctx, "Singers", k, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
continue
}
v, err := rowToValues(r)
if err != nil {
continue
}
got = append(got, v)
rts, err = ro.Timestamp()
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return a timestamp, error: %v", i, k, err)
}
if err := test.checkTs(rts); err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadRow(%v) doesn't return expected timestamp: %v", i, k, err)
}
if roTs != rts {
t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
}
}
if !testEqual(got, test.want) {
t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRow: %v, want %v", i, got, test.want)
}
// SingleUse.ReadUsingIndex
got, err = readAll(ro.ReadUsingIndex(ctx, "Singers", "SingerByName", KeySets(Key{"Marc", "Foo"}, Key{"Alpha", "Beta"}, Key{"Last", "End"}), []string{"SingerId", "FirstName", "LastName"}))
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex returns error %v, want nil", i, err)
}
// The results from ReadUsingIndex is sorted by the index rather than
// primary key.
if len(got) != len(test.want) {
t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
}
for j, g := range got {
if j > 0 {
prev := got[j-1][1].(string) + got[j-1][2].(string)
curr := got[j][1].(string) + got[j][2].(string)
if strings.Compare(prev, curr) > 0 {
t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex fails to order rows by index keys, %v should be after %v", i, got[j-1], got[j])
}
}
found := false
for _, w := range test.want {
if testEqual(g, w) {
found = true
}
}
if !found {
t.Errorf("%d: got unexpected result from ReadOnlyTransaction.ReadUsingIndex: %v, want %v", i, got, test.want)
break
}
}
rts, err = ro.Timestamp()
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return a timestamp, error: %v", i, err)
}
if err := test.checkTs(rts); err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadUsingIndex doesn't return expected timestamp: %v", i, err)
}
if roTs != rts {
t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
}
// ReadOnlyTransaction.ReadRowUsingIndex
got = nil
for _, k := range []Key{{"Marc", "Foo"}, {"Alpha", "Beta"}, {"Last", "End"}} {
r, err := ro.ReadRowUsingIndex(ctx, "Singers", "SingerByName", k, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
continue
}
v, err := rowToValues(r)
if err != nil {
continue
}
got = append(got, v)
rts, err = ro.Timestamp()
if err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return a timestamp, error: %v", i, k, err)
}
if err := test.checkTs(rts); err != nil {
t.Errorf("%d: ReadOnlyTransaction.ReadRowUsingIndex(%v) doesn't return expected timestamp: %v", i, k, err)
}
if roTs != rts {
t.Errorf("%d: got two read timestamps: %v, %v, want ReadOnlyTransaction to return always the same read timestamp", i, roTs, rts)
}
}
if !testEqual(got, test.want) {
t.Errorf("%d: got unexpected results from ReadOnlyTransaction.ReadRowUsingIndex: %v, want %v", i, got, test.want)
}
ro.Close()
}
}
// Test ReadOnlyTransaction with different timestamp bound when there's an
// update at the same time.
func TestIntegration_UpdateDuringRead(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
for i, tb := range []TimestampBound{
StrongRead(),
ReadTimestamp(time.Now().Add(-time.Minute * 30)), // version GC is 1 hour
ExactStaleness(time.Minute * 30),
} {
ro := client.ReadOnlyTransaction().WithTimestampBound(tb)
_, err := ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
if ErrCode(err) != codes.NotFound {
t.Errorf("%d: ReadOnlyTransaction.ReadRow before write returns error: %v, want NotFound", i, err)
}
m := InsertOrUpdate("Singers", []string{"SingerId"}, []interface{}{i})
if _, err := client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
_, err = ro.ReadRow(ctx, "Singers", Key{i}, []string{"SingerId"})
if ErrCode(err) != codes.NotFound {
t.Errorf("%d: ReadOnlyTransaction.ReadRow after write returns error: %v, want NotFound", i, err)
}
}
}
// Test ReadWriteTransaction.
func TestIntegration_ReadWriteTransaction(t *testing.T) {
t.Parallel()
// Give a longer deadline because of transaction backoffs.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
// Set up two accounts
accounts := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
wg := sync.WaitGroup{}
readBalance := func(iter *RowIterator) (int64, error) {
defer iter.Stop()
var bal int64
for {
row, err := iter.Next()
if err == iterator.Done {
return bal, nil
}
if err != nil {
return 0, err
}
if err := row.Column(0, &bal); err != nil {
return 0, err
}
}
}
queryAccountByID := "SELECT Balance FROM Accounts WHERE AccountId = @p1"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
queryAccountByID = "SELECT Balance FROM Accounts WHERE AccountId = $1"
}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(iter int) {
defer wg.Done()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Query Foo's balance and Bar's balance.
bf, e := readBalance(tx.Query(ctx,
Statement{queryAccountByID, map[string]interface{}{"p1": int64(1)}}))
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if bf <= 0 {
return nil
}
bf--
bb++
return tx.BufferWrite([]*Mutation{
Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
})
})
if err != nil {
t.Errorf("%d: failed to execute transaction: %v", iter, err)
}
verifyDirectPathRemoteAddress(t)
}(i)
}
// Because of context timeout, all goroutines will eventually return.
wg.Wait()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
var bf, bb int64
r, e := tx.ReadRow(ctx, "Accounts", Key{int64(1)}, []string{"Balance"})
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if ce := r.Column(0, &bf); ce != nil {
return ce
}
// reading non-indexed column from index is not supported in PG
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
bb, e = readBalance(tx.Read(ctx, "Accounts", Key{int64(2)}, []string{"Balance"}))
if e != nil {
return e
}
} else {
bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
if e != nil {
return e
}
}
verifyDirectPathRemoteAddress(t)
if bf != 30 || bb != 21 {
t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
}
return nil
})
if err != nil {
t.Errorf("failed to check balances: %v", err)
}
verifyDirectPathRemoteAddress(t)
}
// Test ReadWriteTransactionWithOptions.
func TestIntegration_ReadWriteTransactionWithOptions(t *testing.T) {
t.Parallel()
skipEmulatorTest(t)
// Give a longer deadline because of transaction backoffs.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
// Set up two accounts
accounts := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
readBalance := func(iter *RowIterator) (int64, error) {
defer iter.Stop()
var bal int64
for {
row, err := iter.Next()
if err == iterator.Done {
return bal, nil
}
if err != nil {
return 0, err
}
if err := row.Column(0, &bal); err != nil {
return 0, err
}
}
}
duration, _ := time.ParseDuration("100ms")
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration}}
resp, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
// Query Foo's balance and Bar's balance.
queryAccountByID := "SELECT Balance FROM Accounts WHERE AccountId = @p1"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
queryAccountByID = "SELECT Balance FROM Accounts WHERE AccountId = $1"
}
bf, e := readBalance(tx.Query(ctx,
Statement{queryAccountByID, map[string]interface{}{"p1": int64(1)}}))
if e != nil {
return e
}
bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
if e != nil {
return e
}
if bf <= 0 {
return nil
}
bf--
bb++
return tx.BufferWrite([]*Mutation{
Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), bf}),
Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), bb}),
})
}, txOpts)
if err != nil {
t.Fatalf("Failed to execute transaction: %v", err)
}
if resp.CommitStats == nil {
t.Fatal("Missing commit stats in commit response")
}
expectedMutationCount := int64(8)
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
// for PG mutation count for Balance column is not added for AccountName index
expectedMutationCount = int64(4)
}
if got, want := resp.CommitStats.MutationCount, expectedMutationCount; got != want {
t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want)
}
}
func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
t.Parallel()
skipEmulatorTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
// Set up two accounts
accounts := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
if err != nil {
return 0, err
}
var balance int64
if err := row.Column(0, &balance); err != nil {
return 0, err
}
return balance, nil
}
statements := func(txn *ReadWriteStmtBasedTransaction) error {
outBalance, err := getBalance(txn, Key{1})
if err != nil {
return err
}
const transferAmt = 20
if outBalance >= transferAmt {
inBalance, err := getBalance(txn, Key{2})
if err != nil {
return err
}
inBalance += transferAmt
outBalance -= transferAmt
cols := []string{"AccountId", "Balance"}
txn.BufferWrite([]*Mutation{
Update("Accounts", cols, []interface{}{1, outBalance}),
Update("Accounts", cols, []interface{}{2, inBalance}),
})
}
return nil
}
for {
tx, err := NewReadWriteStmtBasedTransaction(ctx, client)
if err != nil {
t.Fatalf("failed to begin a transaction: %v", err)
}
err = statements(tx)
if err != nil && status.Code(err) != codes.Aborted {
tx.Rollback(ctx)
t.Fatalf("failed to execute statements: %v", err)
} else if err == nil {
_, err = tx.Commit(ctx)
if err == nil {
break
} else if status.Code(err) != codes.Aborted {
t.Fatalf("failed to commit a transaction: %v", err)
}
}
// Set a default sleep time if the server delay is absent.
delay := 10 * time.Millisecond
if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
delay = serverDelay
}
time.Sleep(delay)
}
// Query the updated values.
stmt := Statement{SQL: `SELECT AccountId, Nickname, Balance FROM Accounts ORDER BY AccountId`}
iter := client.Single().Query(ctx, stmt)
got, err := readAllAccountsTable(iter)
if err != nil {
t.Fatalf("failed to read data: %v", err)
}
want := [][]interface{}{
{int64(1), "Foo", int64(30)},
{int64(2), "Bar", int64(21)},
}
if !testEqual(got, want) {
t.Errorf("\ngot %v\nwant%v", got, want)
}
}
func TestIntegration_ReadWriteTransaction_StatementBasedWithOptions(t *testing.T) {
t.Parallel()
skipEmulatorTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
// Set up two accounts
accounts := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
getBalance := func(txn *ReadWriteStmtBasedTransaction, key Key) (int64, error) {
row, err := txn.ReadRow(ctx, "Accounts", key, []string{"Balance"})
if err != nil {
return 0, err
}
var balance int64
if err := row.Column(0, &balance); err != nil {
return 0, err
}
return balance, nil
}
statements := func(txn *ReadWriteStmtBasedTransaction) error {
outBalance, err := getBalance(txn, Key{1})
if err != nil {
return err
}
const transferAmt = 20
if outBalance >= transferAmt {
inBalance, err := getBalance(txn, Key{2})
if err != nil {
return err
}
inBalance += transferAmt
outBalance -= transferAmt
cols := []string{"AccountId", "Balance"}
txn.BufferWrite([]*Mutation{
Update("Accounts", cols, []interface{}{1, outBalance}),
Update("Accounts", cols, []interface{}{2, inBalance}),
})
}
return nil
}
var resp CommitResponse
txOpts := TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}}
for {
tx, err := NewReadWriteStmtBasedTransactionWithOptions(ctx, client, txOpts)
if err != nil {
t.Fatalf("failed to begin a transaction: %v", err)
}
err = statements(tx)
if err != nil && status.Code(err) != codes.Aborted {
tx.Rollback(ctx)
t.Fatalf("failed to execute statements: %v", err)
} else if err == nil {
resp, err = tx.CommitWithReturnResp(ctx)
if err == nil {
break
} else if status.Code(err) != codes.Aborted {
t.Fatalf("failed to commit a transaction: %v", err)
}
}
// Set a default sleep time if the server delay is absent.
delay := 10 * time.Millisecond
if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
delay = serverDelay
}
time.Sleep(delay)
}
if resp.CommitStats == nil {
t.Fatal("Missing commit stats in commit response")
}
expectedMutationCount := int64(8)
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
// for PG mutation count for Balance column is not added for AccountName index
expectedMutationCount = int64(4)
}
if got, want := resp.CommitStats.MutationCount, expectedMutationCount; got != want {
t.Errorf("Mismatch mutation count - got: %v, want: %v", got, want)
}
}
func TestIntegration_Reads(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
_, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
defer cleanup()
client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig, Compression: "gzip"})
if err != nil {
t.Fatal(err)
}
defer client.Close()
// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
var ms []*Mutation
for i := 0; i < 15; i++ {
ms = append(ms, InsertOrUpdate(testTable,
testTableColumns,
[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
}
// Don't use ApplyAtLeastOnce, so we can test the other code path.
if _, err := client.Apply(ctx, ms); err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
// Empty read.
rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
if got, want := len(rows), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
// Index empty read.
rows, err = readAllTestTable(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex,
KeyRange{Start: Key{"v99"}, End: Key{"z"}}, testTableColumns))
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
if got, want := len(rows), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
// Point read.
row, err := client.Single().ReadRow(ctx, testTable, Key{"k1"}, testTableColumns)
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
var got testTableRow
if err := row.ToStruct(&got); err != nil {
t.Fatal(err)
}
if want := (testTableRow{"k1", "v1"}); got != want {
t.Errorf("got %v, want %v", got, want)
}
// Point read not found.
_, err = client.Single().ReadRow(ctx, testTable, Key{"k999"}, testTableColumns)
if ErrCode(err) != codes.NotFound {
t.Fatalf("got %v, want NotFound", err)
}
verifyDirectPathRemoteAddress(t)
// Index point read.
rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
var gotIndex testTableRow
if err := rowIndex.ToStruct(&gotIndex); err != nil {
t.Fatal(err)
}
if wantIndex := (testTableRow{"k1", "v1"}); gotIndex != wantIndex {
t.Errorf("got %v, want %v", gotIndex, wantIndex)
}
// Index point read not found.
_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v999"}, testTableColumns)
if ErrCode(err) != codes.NotFound {
t.Fatalf("got %v, want NotFound", err)
}
verifyDirectPathRemoteAddress(t)
rangeReads(ctx, t, client)
indexRangeReads(ctx, t, client)
}
func TestIntegration_EarlyTimestamp(t *testing.T) {
t.Parallel()
// Test that we can get the timestamp from a read-only transaction as
// soon as we have read at least one row.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
defer cleanup()
var ms []*Mutation
for i := 0; i < 3; i++ {
ms = append(ms, InsertOrUpdate(testTable,
testTableColumns,
[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
}
if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
txn := client.Single()
iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
defer iter.Stop()
// In single-use transaction, we should get an error before reading anything.
if _, err := txn.Timestamp(); err == nil {
t.Error("wanted error, got nil")
}
// After reading one row, the timestamp should be available.
_, err := iter.Next()
if err != nil {
t.Fatal(err)
}
if _, err := txn.Timestamp(); err != nil {
t.Errorf("got %v, want nil", err)
}
txn = client.ReadOnlyTransaction()
defer txn.Close()
iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
defer iter.Stop()
// In an ordinary read-only transaction, the timestamp should be
// available immediately.
if _, err := txn.Timestamp(); err != nil {
t.Errorf("got %v, want nil", err)
}
}
func TestIntegration_NestedTransaction(t *testing.T) {
t.Parallel()
// You cannot use a transaction from inside a read-write transaction.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := client.ReadWriteTransaction(ctx,
func(context.Context, *ReadWriteTransaction) error { return nil })
if ErrCode(err) != codes.FailedPrecondition {
t.Fatalf("got %v, want FailedPrecondition", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
if ErrCode(err) != codes.FailedPrecondition {
t.Fatalf("got %v, want FailedPrecondition", err)
}
rot := client.ReadOnlyTransaction()
defer rot.Close()
_, err = rot.ReadRow(ctx, "Singers", Key{1}, []string{"SingerId"})
if ErrCode(err) != codes.FailedPrecondition {
t.Fatalf("got %v, want FailedPrecondition", err)
}
return nil
})
if err != nil {
t.Fatal(err)
}
}
func TestIntegration_CreateDBRetry(t *testing.T) {
t.Parallel()
skipUnsupportedPGTest(t)
if databaseAdmin == nil {
t.Skip("Integration tests skipped")
}
skipEmulatorTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
dbName := dbNameSpace.New()
// Simulate an Unavailable error on the CreateDatabase RPC.
hasReturnedUnavailable := false
interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := invoker(ctx, method, req, reply, cc, opts...)
if !hasReturnedUnavailable && err == nil {
hasReturnedUnavailable = true
return status.Error(codes.Unavailable, "Injected unavailable error")
}
return err
}
// Pass spanner host as options for running builds against different environments
opts := []option.ClientOption{option.WithEndpoint(spannerHost), option.WithGRPCDialOption(grpc.WithUnaryInterceptor(interceptor))}
dbAdmin, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
log.Fatalf("cannot create dbAdmin client: %v", err)
}
op, err := dbAdmin.CreateDatabaseWithRetry(ctx, &adminpb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
CreateStatement: "CREATE DATABASE " + dbName,
DatabaseDialect: testDialect,
})
if err != nil {
t.Fatalf("failed to create database: %v", err)
}
_, err = op.Wait(ctx)
if err != nil {
t.Fatalf("failed to create database: %v", err)
}
if !hasReturnedUnavailable {
t.Fatalf("interceptor did not return Unavailable")
}
}
// Test client recovery on database recreation.
func TestIntegration_DbRemovalRecovery(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Create a client with MinOpened=0 to prevent the session pool maintainer
// from repeatedly trying to create sessions for the invalid database.
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, SessionPoolConfig{}, statements[testDialect][singerDDLStatements])
defer cleanup()
// Drop the testing database.
if err := databaseAdmin.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbPath}); err != nil {
t.Fatalf("failed to drop testing database %v: %v", dbPath, err)
}
// Now, send the query.
iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
defer iter.Stop()
if _, err := iter.Next(); err == nil {
t.Errorf("client sends query to removed database successfully, want it to fail")
}
verifyDirectPathRemoteAddress(t)
// Recreate database and table.
dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
req := &adminpb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
CreateStatement: "CREATE DATABASE " + dbName,
ExtraStatements: []string{
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
},
DatabaseDialect: testDialect,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
req.ExtraStatements = []string{}
}
op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, req)
if err != nil {
t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
}
if _, err := op.Wait(ctx); err != nil {
t.Fatalf("cannot recreate testing DB %v: %v", dbPath, err)
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{`
CREATE TABLE Singers (
SingerId INT8 NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA,
PRIMARY KEY(SingerId)
)`},
})
if err != nil {
t.Fatalf("cannot create singers table %v: %v", dbPath, err)
}
if err := op.Wait(ctx); err != nil {
t.Fatalf("timeout creating singers table %v: %v", dbPath, err)
}
}
// Now, send the query again.
iter = client.Single().Query(ctx, Statement{SQL: "SELECT SingerId FROM Singers"})
defer iter.Stop()
_, err = iter.Next()
if err != nil && err != iterator.Done {
t.Errorf("failed to send query to database %v: %v", dbPath, err)
}
verifyDirectPathRemoteAddress(t)
}
// Test encoding/decoding non-struct Cloud Spanner types.
func TestIntegration_BasicTypes(t *testing.T) {
t.Parallel()
skipUnsupportedPGTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stmts := []string{
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`CREATE INDEX SingerByName ON Singers(FirstName, LastName)`,
`CREATE TABLE Accounts (
AccountId INT64 NOT NULL,
Nickname STRING(100),
Balance INT64 NOT NULL,
) PRIMARY KEY (AccountId)`,
`CREATE INDEX AccountByNickname ON Accounts(Nickname) STORING (Balance)`,
`CREATE TABLE Types (
RowID INT64 NOT NULL,
String STRING(MAX),
StringArray ARRAY<STRING(MAX)>,
Bytes BYTES(MAX),
BytesArray ARRAY<BYTES(MAX)>,
Int64a INT64,
Int64Array ARRAY<INT64>,
Bool BOOL,
BoolArray ARRAY<BOOL>,
Float64 FLOAT64,
Float64Array ARRAY<FLOAT64>,
Date DATE,
DateArray ARRAY<DATE>,
Timestamp TIMESTAMP,
TimestampArray ARRAY<TIMESTAMP>,
Numeric NUMERIC,
NumericArray ARRAY<NUMERIC>,
JSON JSON,
JSONArray ARRAY<JSON>
) PRIMARY KEY (RowID)`,
}
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
defer cleanup()
t1, _ := time.Parse(time.RFC3339Nano, "2016-11-15T15:04:05.999999999Z")
// Boundaries
t2, _ := time.Parse(time.RFC3339Nano, "0001-01-01T00:00:00.000000000Z")
t3, _ := time.Parse(time.RFC3339Nano, "9999-12-31T23:59:59.999999999Z")
d1, _ := civil.ParseDate("2016-11-15")
// Boundaries
d2, _ := civil.ParseDate("0001-01-01")
d3, _ := civil.ParseDate("9999-12-31")
n0 := big.Rat{}
n1p, _ := (&big.Rat{}).SetString("123456789")
n2p, _ := (&big.Rat{}).SetString("123456789/1000000000")
n1 := *n1p
n2 := *n2p
type Message struct {
Name string
Body string
Time int64
FloatValue interface{}
}
msg := Message{"Alice", "Hello", 145688415796432520, json.Number("0.39240506000000003")}
unmarshalledJSONStructUsingNumber := map[string]interface{}{
"Name": "Alice",
"Body": "Hello",
"Time": json.Number("145688415796432520"),
"FloatValue": json.Number("0.39240506000000003"),
}
unmarshalledJSONStruct := map[string]interface{}{
"Name": "Alice",
"Body": "Hello",
"Time": 1.456884157964325e+17,
"FloatValue": 0.39240506,
}
tests := []struct {
col string
val interface{}
wantWithDefaultConfig interface{}
wantWithNumber interface{}
}{
{col: "String", val: ""},
{col: "String", val: "", wantWithDefaultConfig: NullString{"", true}, wantWithNumber: NullString{"", true}},
{col: "String", val: "foo"},
{col: "String", val: "foo", wantWithDefaultConfig: NullString{"foo", true}, wantWithNumber: NullString{"foo", true}},
{col: "String", val: NullString{"bar", true}, wantWithDefaultConfig: "bar", wantWithNumber: "bar"},
{col: "String", val: NullString{"bar", false}, wantWithDefaultConfig: NullString{"", false}, wantWithNumber: NullString{"", false}},
{col: "StringArray", val: []string(nil), wantWithDefaultConfig: []NullString(nil), wantWithNumber: []NullString(nil)},
{col: "StringArray", val: []string{}, wantWithDefaultConfig: []NullString{}, wantWithNumber: []NullString{}},
{col: "StringArray", val: []string{"foo", "bar"}, wantWithDefaultConfig: []NullString{{"foo", true}, {"bar", true}}, wantWithNumber: []NullString{{"foo", true}, {"bar", true}}},
{col: "StringArray", val: []NullString(nil)},
{col: "StringArray", val: []NullString{}},
{col: "StringArray", val: []NullString{{"foo", true}, {}}},
{col: "Bytes", val: []byte{}},
{col: "Bytes", val: []byte{1, 2, 3}},
{col: "Bytes", val: []byte(nil)},
{col: "BytesArray", val: [][]byte(nil)},
{col: "BytesArray", val: [][]byte{}},
{col: "BytesArray", val: [][]byte{{1}, {2, 3}}},
{col: "Int64a", val: 0, wantWithDefaultConfig: int64(0), wantWithNumber: int64(0)},
{col: "Int64a", val: -1, wantWithDefaultConfig: int64(-1), wantWithNumber: int64(-1)},
{col: "Int64a", val: 2, wantWithDefaultConfig: int64(2), wantWithNumber: int64(2)},
{col: "Int64a", val: int64(3)},
{col: "Int64a", val: 4, wantWithDefaultConfig: NullInt64{4, true}, wantWithNumber: NullInt64{4, true}},
{col: "Int64a", val: NullInt64{5, true}, wantWithDefaultConfig: int64(5), wantWithNumber: int64(5)},
{col: "Int64a", val: NullInt64{6, true}, wantWithDefaultConfig: int64(6), wantWithNumber: int64(6)},
{col: "Int64a", val: NullInt64{7, false}, wantWithDefaultConfig: NullInt64{0, false}, wantWithNumber: NullInt64{0, false}},
{col: "Int64Array", val: []int(nil), wantWithDefaultConfig: []NullInt64(nil), wantWithNumber: []NullInt64(nil)},
{col: "Int64Array", val: []int{}, wantWithDefaultConfig: []NullInt64{}, wantWithNumber: []NullInt64{}},
{col: "Int64Array", val: []int{1, 2}, wantWithDefaultConfig: []NullInt64{{1, true}, {2, true}}, wantWithNumber: []NullInt64{{1, true}, {2, true}}},
{col: "Int64Array", val: []int64(nil), wantWithDefaultConfig: []NullInt64(nil), wantWithNumber: []NullInt64(nil)},
{col: "Int64Array", val: []int64{}, wantWithDefaultConfig: []NullInt64{}, wantWithNumber: []NullInt64{}},
{col: "Int64Array", val: []int64{1, 2}, wantWithDefaultConfig: []NullInt64{{1, true}, {2, true}}, wantWithNumber: []NullInt64{{1, true}, {2, true}}},
{col: "Int64Array", val: []NullInt64(nil)},
{col: "Int64Array", val: []NullInt64{}},
{col: "Int64Array", val: []NullInt64{{1, true}, {}}},
{col: "Bool", val: false},
{col: "Bool", val: true},
{col: "Bool", val: false, wantWithDefaultConfig: NullBool{false, true}, wantWithNumber: NullBool{false, true}},
{col: "Bool", val: true, wantWithDefaultConfig: NullBool{true, true}, wantWithNumber: NullBool{true, true}},
{col: "Bool", val: NullBool{true, true}},
{col: "Bool", val: NullBool{false, false}},
{col: "BoolArray", val: []bool(nil), wantWithDefaultConfig: []NullBool(nil), wantWithNumber: []NullBool(nil)},
{col: "BoolArray", val: []bool{}, wantWithDefaultConfig: []NullBool{}, wantWithNumber: []NullBool{}},
{col: "BoolArray", val: []bool{true, false}, wantWithDefaultConfig: []NullBool{{true, true}, {false, true}}, wantWithNumber: []NullBool{{true, true}, {false, true}}},
{col: "BoolArray", val: []NullBool(nil)},
{col: "BoolArray", val: []NullBool{}},
{col: "BoolArray", val: []NullBool{{false, true}, {true, true}, {}}},
{col: "Float64", val: 0.0},
{col: "Float64", val: 3.14},
{col: "Float64", val: math.NaN()},
{col: "Float64", val: math.Inf(1)},
{col: "Float64", val: math.Inf(-1)},
{col: "Float64", val: 2.78, wantWithDefaultConfig: NullFloat64{2.78, true}, wantWithNumber: NullFloat64{2.78, true}},
{col: "Float64", val: NullFloat64{2.71, true}, wantWithDefaultConfig: 2.71, wantWithNumber: 2.71},
{col: "Float64", val: NullFloat64{1.41, true}, wantWithDefaultConfig: NullFloat64{1.41, true}, wantWithNumber: NullFloat64{1.41, true}},
{col: "Float64", val: NullFloat64{0, false}},
{col: "Float64Array", val: []float64(nil), wantWithDefaultConfig: []NullFloat64(nil), wantWithNumber: []NullFloat64(nil)},
{col: "Float64Array", val: []float64{}, wantWithDefaultConfig: []NullFloat64{}, wantWithNumber: []NullFloat64{}},
{col: "Float64Array", val: []float64{2.72, 3.14, math.Inf(1)}, wantWithDefaultConfig: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}, wantWithNumber: []NullFloat64{{2.72, true}, {3.14, true}, {math.Inf(1), true}}},
{col: "Float64Array", val: []NullFloat64(nil)},
{col: "Float64Array", val: []NullFloat64{}},
{col: "Float64Array", val: []NullFloat64{{2.72, true}, {math.Inf(1), true}, {}}},
{col: "Date", val: d1},
{col: "Date", val: d1, wantWithDefaultConfig: NullDate{d1, true}, wantWithNumber: NullDate{d1, true}},
{col: "Date", val: NullDate{d1, true}},
{col: "Date", val: NullDate{d1, true}, wantWithDefaultConfig: d1, wantWithNumber: d1},
{col: "Date", val: NullDate{civil.Date{}, false}},
{col: "DateArray", val: []civil.Date(nil), wantWithDefaultConfig: []NullDate(nil), wantWithNumber: []NullDate(nil)},
{col: "DateArray", val: []civil.Date{}, wantWithDefaultConfig: []NullDate{}, wantWithNumber: []NullDate{}},
{col: "DateArray", val: []civil.Date{d1, d2, d3}, wantWithDefaultConfig: []NullDate{{d1, true}, {d2, true}, {d3, true}}, wantWithNumber: []NullDate{{d1, true}, {d2, true}, {d3, true}}},
{col: "Timestamp", val: t1},
{col: "Timestamp", val: t1, wantWithDefaultConfig: NullTime{t1, true}, wantWithNumber: NullTime{t1, true}},
{col: "Timestamp", val: NullTime{t1, true}},
{col: "Timestamp", val: NullTime{t1, true}, wantWithDefaultConfig: t1, wantWithNumber: t1},
{col: "Timestamp", val: NullTime{}},
{col: "TimestampArray", val: []time.Time(nil), wantWithDefaultConfig: []NullTime(nil), wantWithNumber: []NullTime(nil)},
{col: "TimestampArray", val: []time.Time{}, wantWithDefaultConfig: []NullTime{}, wantWithNumber: []NullTime{}},
{col: "TimestampArray", val: []time.Time{t1, t2, t3}, wantWithDefaultConfig: []NullTime{{t1, true}, {t2, true}, {t3, true}}, wantWithNumber: []NullTime{{t1, true}, {t2, true}, {t3, true}}},
{col: "Numeric", val: n1},
{col: "Numeric", val: n2},
{col: "Numeric", val: n1, wantWithDefaultConfig: NullNumeric{n1, true}, wantWithNumber: NullNumeric{n1, true}},
{col: "Numeric", val: n2, wantWithDefaultConfig: NullNumeric{n2, true}, wantWithNumber: NullNumeric{n2, true}},
{col: "Numeric", val: NullNumeric{n1, true}, wantWithDefaultConfig: n1, wantWithNumber: n1},
{col: "Numeric", val: NullNumeric{n1, true}, wantWithDefaultConfig: NullNumeric{n1, true}, wantWithNumber: NullNumeric{n1, true}},
{col: "Numeric", val: NullNumeric{n0, false}},
{col: "NumericArray", val: []big.Rat(nil), wantWithDefaultConfig: []NullNumeric(nil), wantWithNumber: []NullNumeric(nil)},
{col: "NumericArray", val: []big.Rat{}, wantWithDefaultConfig: []NullNumeric{}, wantWithNumber: []NullNumeric{}},
{col: "NumericArray", val: []big.Rat{n1, n2}, wantWithDefaultConfig: []NullNumeric{{n1, true}, {n2, true}}, wantWithNumber: []NullNumeric{{n1, true}, {n2, true}}},
{col: "NumericArray", val: []NullNumeric(nil)},
{col: "NumericArray", val: []NullNumeric{}},
{col: "NumericArray", val: []NullNumeric{{n1, true}, {n2, true}, {}}},
{col: "JSON", val: NullJSON{msg, true}, wantWithDefaultConfig: NullJSON{unmarshalledJSONStruct, true}, wantWithNumber: NullJSON{unmarshalledJSONStructUsingNumber, true}},
{col: "JSON", val: NullJSON{msg, false}, wantWithDefaultConfig: NullJSON{}, wantWithNumber: NullJSON{}},
{col: "JSONArray", val: []NullJSON(nil)},
{col: "JSONArray", val: []NullJSON{}},
{col: "JSONArray", val: []NullJSON{{msg, true}, {msg, true}, {}}, wantWithDefaultConfig: []NullJSON{{unmarshalledJSONStruct, true}, {unmarshalledJSONStruct, true}, {}}, wantWithNumber: []NullJSON{{unmarshalledJSONStructUsingNumber, true}, {unmarshalledJSONStructUsingNumber, true}, {}}},
{col: "String", val: nil, wantWithDefaultConfig: NullString{}, wantWithNumber: NullString{}},
{col: "StringArray", val: nil, wantWithDefaultConfig: []NullString(nil), wantWithNumber: []NullString(nil)},
{col: "Bytes", val: nil, wantWithDefaultConfig: []byte(nil), wantWithNumber: []byte(nil)},
{col: "BytesArray", val: nil, wantWithDefaultConfig: [][]byte(nil), wantWithNumber: [][]byte(nil)},
{col: "Int64a", val: nil, wantWithDefaultConfig: NullInt64{}, wantWithNumber: NullInt64{}},
{col: "Int64Array", val: nil, wantWithDefaultConfig: []NullInt64(nil), wantWithNumber: []NullInt64(nil)},
{col: "Bool", val: nil, wantWithDefaultConfig: NullBool{}, wantWithNumber: NullBool{}},
{col: "BoolArray", val: nil, wantWithDefaultConfig: []NullBool(nil), wantWithNumber: []NullBool(nil)},
{col: "Float64", val: nil, wantWithDefaultConfig: NullFloat64{}, wantWithNumber: NullFloat64{}},
{col: "Float64Array", val: nil, wantWithDefaultConfig: []NullFloat64(nil), wantWithNumber: []NullFloat64(nil)},
{col: "Numeric", val: nil, wantWithDefaultConfig: NullNumeric{}, wantWithNumber: NullNumeric{}},
{col: "NumericArray", val: nil, wantWithDefaultConfig: []NullNumeric(nil), wantWithNumber: []NullNumeric(nil)},
{col: "JSON", val: nil, wantWithDefaultConfig: NullJSON{}, wantWithNumber: NullJSON{}},
{col: "JSONArray", val: nil, wantWithDefaultConfig: []NullJSON(nil), wantWithNumber: []NullJSON(nil)},
}
// See https://github.com/GoogleCloudPlatform/cloud-spanner-emulator/issues/31
if !isEmulatorEnvSet() {
tests = append(tests, []struct {
col string
val interface{}
wantWithDefaultConfig interface{}
wantWithNumber interface{}
}{
{col: "Date", val: nil, wantWithDefaultConfig: NullDate{}, wantWithNumber: NullDate{}},
{col: "Timestamp", val: nil, wantWithDefaultConfig: NullTime{}, wantWithNumber: NullTime{}},
}...)
}
for _, withNumberConfigOption := range []bool{false, true} {
if withNumberConfigOption {
UseNumberWithJSONDecoderEncoder(withNumberConfigOption)
defer UseNumberWithJSONDecoderEncoder(!withNumberConfigOption)
}
// Write rows into table first using DML.
statements := make([]Statement, 0)
for i, test := range tests {
stmt := NewStatement(fmt.Sprintf("INSERT INTO Types (RowId, `%s`) VALUES (@id, @value)", test.col))
// Note: We are not setting the parameter type here to ensure that it
// can be automatically recognized when it is actually needed.
stmt.Params["id"] = i
stmt.Params["value"] = test.val
statements = append(statements, stmt)
}
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
rowCounts, err := tx.BatchUpdate(ctx, statements)
if err != nil {
return err
}
if len(rowCounts) != len(tests) {
return fmt.Errorf("rowCounts length mismatch\nGot: %v\nWant: %v", len(rowCounts), len(tests))
}
for i, c := range rowCounts {
if c != 1 {
return fmt.Errorf("row count mismatch for row %v:\nGot: %v\nWant: %v", i, c, 1)
}
}
return nil
})
if err != nil {
t.Fatalf("failed to insert values using DML: %v", err)
}
// Delete all the rows so we can insert them using mutations as well.
_, err = client.Apply(ctx, []*Mutation{Delete("Types", AllKeys())})
if err != nil {
t.Fatalf("failed to delete all rows: %v", err)
}
// Verify that we can insert the rows using mutations.
var muts []*Mutation
for i, test := range tests {
muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
}
if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
for i, test := range tests {
row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
if err != nil {
t.Fatalf("Unable to fetch row %v: %v", i, err)
}
verifyDirectPathRemoteAddress(t)
want := test.wantWithDefaultConfig
if withNumberConfigOption {
want = test.wantWithNumber
}
if want == nil {
want = test.val
}
gotp := reflect.New(reflect.TypeOf(want))
if err := row.Column(0, gotp.Interface()); err != nil {
t.Errorf("%v-%d: col:%v val:%#v, %v", withNumberConfigOption, i, test.col, test.val, err)
continue
}
got := reflect.Indirect(gotp).Interface()
// One of the test cases is checking NaN handling. Given
// NaN!=NaN, we can't use reflect to test for it.
if isNaN(got) && isNaN(want) {
continue
}
// Check non-NaN cases.
if !testEqual(got, want) {
t.Errorf("%v-%d: col:%v val:%#v, got %#v, want %#v", withNumberConfigOption, i, test.col, test.val, got, want)
continue
}
}
// cleanup
_, err = client.Apply(ctx, []*Mutation{Delete("Types", AllKeys())})
require.NoError(t, err)
}
}
// Test decoding Cloud Spanner STRUCT type.
func TestIntegration_StructTypes(t *testing.T) {
t.Parallel()
skipUnsupportedPGTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()
tests := []struct {
q Statement
want func(r *Row) error
}{
{
q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1, 2))`},
want: func(r *Row) error {
// Test STRUCT ARRAY decoding to []NullRow.
var rows []NullRow
if err := r.Column(0, &rows); err != nil {
return err
}
if len(rows) != 1 {
return fmt.Errorf("len(rows) = %d; want 1", len(rows))
}
if !rows[0].Valid {
return fmt.Errorf("rows[0] is NULL")
}
var i, j int64
if err := rows[0].Row.Columns(&i, &j); err != nil {
return err
}
if i != 1 || j != 2 {
return fmt.Errorf("got (%d,%d), want (1,2)", i, j)
}
return nil
},
},
{
q: Statement{SQL: `SELECT ARRAY(SELECT STRUCT(1 as foo, 2 as bar)) as col1`},
want: func(r *Row) error {
// Test Row.ToStruct.
s := struct {
Col1 []*struct {
Foo int64 `spanner:"foo"`
Bar int64 `spanner:"bar"`
} `spanner:"col1"`
}{}
if err := r.ToStruct(&s); err != nil {
return err
}
want := struct {
Col1 []*struct {
Foo int64 `spanner:"foo"`
Bar int64 `spanner:"bar"`
} `spanner:"col1"`
}{
Col1: []*struct {
Foo int64 `spanner:"foo"`
Bar int64 `spanner:"bar"`
}{
{
Foo: 1,
Bar: 2,
},
},
}
if !testEqual(want, s) {
return fmt.Errorf("unexpected decoding result: %v, want %v", s, want)
}
return nil
},
},
}
for i, test := range tests {
iter := client.Single().Query(ctx, test.q)
defer iter.Stop()
row, err := iter.Next()
if err != nil {
t.Errorf("%d: %v", i, err)
continue
}
if err := test.want(row); err != nil {
t.Errorf("%d: %v", i, err)
continue
}
}
}
func TestIntegration_StructParametersUnsupported(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
skipUnsupportedPGTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
defer cleanup()
for _, test := range []struct {
param interface{}
wantCode codes.Code
wantMsgPart string
}{
{
struct {
Field int
}{10},
codes.Unimplemented,
"Unsupported query shape: " +
"A struct value cannot be returned as a column value. " +
"Rewrite the query to flatten the struct fields in the result.",
},
{
[]struct {
Field int
}{{10}, {20}},
codes.Unimplemented,
"Unsupported query shape: " +
"This query can return a null-valued array of struct, " +
"which is not supported by Spanner.",
},
} {
iter := client.Single().Query(ctx, Statement{
SQL: "SELECT @p",
Params: map[string]interface{}{"p": test.param},
})
_, err := iter.Next()
iter.Stop()
if msg, ok := matchError(err, test.wantCode, test.wantMsgPart); !ok {
t.Fatal(msg)
}
}
}
// Test queries of the form "SELECT expr".
func TestIntegration_QueryExpressions(t *testing.T) {
t.Parallel()
skipUnsupportedPGTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
defer cleanup()
newRow := func(vals []interface{}) *Row {
row, err := NewRow(make([]string, len(vals)), vals)
if err != nil {
t.Fatal(err)
}
return row
}
tests := []struct {
expr string
want interface{}
}{
{"1", int64(1)},
{"[1, 2, 3]", []NullInt64{{1, true}, {2, true}, {3, true}}},
{"[1, NULL, 3]", []NullInt64{{1, true}, {0, false}, {3, true}}},
{"IEEE_DIVIDE(1, 0)", math.Inf(1)},
{"IEEE_DIVIDE(-1, 0)", math.Inf(-1)},
{"IEEE_DIVIDE(0, 0)", math.NaN()},
// TODO(jba): add IEEE_DIVIDE(0, 0) to the following array when we have a better equality predicate.
{"[IEEE_DIVIDE(1, 0), IEEE_DIVIDE(-1, 0)]", []NullFloat64{{math.Inf(1), true}, {math.Inf(-1), true}}},
{"ARRAY(SELECT AS STRUCT * FROM (SELECT 'a', 1) WHERE 0 = 1)", []NullRow{}},
{"ARRAY(SELECT STRUCT(1, 2))", []NullRow{{Row: *newRow([]interface{}{1, 2}), Valid: true}}},
}
for _, test := range tests {
iter := client.Single().Query(ctx, Statement{SQL: "SELECT " + test.expr})
defer iter.Stop()
row, err := iter.Next()
if err != nil {
t.Errorf("%q: %v", test.expr, err)
continue
}
// Create new instance of type of test.want.
gotp := reflect.New(reflect.TypeOf(test.want))
if err := row.Column(0, gotp.Interface()); err != nil {
t.Errorf("%q: Column returned error %v", test.expr, err)
continue
}
got := reflect.Indirect(gotp).Interface()
// TODO(jba): remove isNaN special case when we have a better equality predicate.
if isNaN(got) && isNaN(test.want) {
continue
}
if !testEqual(got, test.want) {
t.Errorf("%q\n got %#v\nwant %#v", test.expr, got, test.want)
}
}
}
func TestIntegration_QueryStats(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
accounts := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
const sql = "SELECT Balance FROM Accounts"
qp, err := client.Single().AnalyzeQuery(ctx, Statement{sql, nil})
if err != nil {
t.Fatal(err)
}
if len(qp.PlanNodes) == 0 {
t.Error("got zero plan nodes, expected at least one")
}
iter := client.Single().QueryWithStats(ctx, Statement{sql, nil})
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
t.Fatal(err)
}
}
if iter.QueryPlan == nil {
t.Error("got nil QueryPlan, expected one")
}
if iter.QueryStats == nil {
t.Error("got nil QueryStats, expected some")
}
}
func TestIntegration_InvalidDatabase(t *testing.T) {
t.Parallel()
if databaseAdmin == nil {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
c, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}})
// Client creation should succeed even if the database is invalid.
if err != nil {
t.Fatal(err)
}
_, err = c.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"col1"})
if msg, ok := matchError(err, codes.NotFound, ""); !ok {
t.Fatal(msg)
}
}
func TestIntegration_ReadErrors(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][readDDLStatements])
defer cleanup()
var ms []*Mutation
for i := 0; i < 2; i++ {
ms = append(ms, InsertOrUpdate(testTable,
testTableColumns,
[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v")}))
}
if _, err := client.Apply(ctx, ms); err != nil {
t.Fatal(err)
}
// Read over invalid table fails
_, err := client.Single().ReadRow(ctx, "badTable", Key{1}, []string{"StringValue"})
if msg, ok := matchError(err, codes.NotFound, "badTable"); !ok {
t.Error(msg)
}
// Read over invalid column fails
_, err = client.Single().ReadRow(ctx, "TestTable", Key{1}, []string{"badcol"})
if msg, ok := matchError(err, codes.NotFound, "badcol"); !ok {
t.Error(msg)
}
// Invalid query fails
iter := client.Single().Query(ctx, Statement{SQL: "SELECT Apples AND Oranges"})
defer iter.Stop()
_, err = iter.Next()
errorMessage := "unrecognized name"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
errorMessage = "does not exist"
}
if msg, ok := matchError(err, codes.InvalidArgument, errorMessage); !ok {
t.Error(msg)
}
// Read should fail on cancellation.
cctx, cancel := context.WithCancel(ctx)
cancel()
_, err = client.Single().ReadRow(cctx, "TestTable", Key{1}, []string{"StringValue"})
if msg, ok := matchError(err, codes.Canceled, ""); !ok {
t.Error(msg)
}
// Read should fail if deadline exceeded.
dctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
defer cancel()
<-dctx.Done()
_, err = client.Single().ReadRow(dctx, "TestTable", Key{1}, []string{"StringValue"})
if msg, ok := matchError(err, codes.DeadlineExceeded, ""); !ok {
t.Error(msg)
}
// Read should fail if there are multiple rows returned.
_, err = client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v"}, testTableColumns)
wantMsgPart := fmt.Sprintf("more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", testTable, Key{"v"}, testTableIndex)
if msg, ok := matchError(err, codes.FailedPrecondition, wantMsgPart); !ok {
t.Error(msg)
}
}
// Test TransactionRunner. Test that transactions are aborted and retried as
// expected.
func TestIntegration_TransactionRunner(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
// Test 1: User error should abort the transaction.
_, _ = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
tx.BufferWrite([]*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)})})
return errors.New("user error")
})
// Empty read.
rows, err := readAllTestTable(client.Single().Read(ctx, "Accounts", Key{1}, []string{"AccountId", "Nickname", "Balance"}))
if err != nil {
t.Fatal(err)
}
if got, want := len(rows), 0; got != want {
t.Errorf("Empty read, got %d, want %d.", got, want)
}
// Test 2: Expect abort and retry.
// We run two ReadWriteTransactions concurrently and make txn1 abort txn2 by
// committing writes to the column txn2 have read, and expect the following
// read to abort and txn2 retries.
// Set up two accounts
accounts := []*Mutation{
Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(0)}),
Insert("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
var (
cTxn1Start = make(chan struct{})
cTxn1Commit = make(chan struct{})
cTxn2Start = make(chan struct{})
wg sync.WaitGroup
)
// read balance, check error if we don't expect abort.
readBalance := func(tx interface {
ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error)
}, key int64, expectAbort bool) (int64, error) {
var b int64
r, e := tx.ReadRow(ctx, "Accounts", Key{int64(key)}, []string{"Balance"})
if e != nil {
if expectAbort && !isAbortedErr(e) {
t.Errorf("ReadRow got %v, want Abort error.", e)
}
// Verify that we received and are able to extract retry info from
// the aborted error.
if _, hasRetryInfo := ExtractRetryDelay(e); !hasRetryInfo {
t.Errorf("Got Abort error without RetryInfo\nGot: %v", e)
}
return b, e
}
if ce := r.Column(0, &b); ce != nil {
return b, ce
}
return b, nil
}
wg.Add(2)
// Txn 1
go func() {
defer wg.Done()
var once sync.Once
_, e := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
b, e := readBalance(tx, 1, false)
if e != nil {
return e
}
// txn 1 can abort, in that case we skip closing the channel on
// retry.
once.Do(func() { close(cTxn1Start) })
e = tx.BufferWrite([]*Mutation{
Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(1), int64(b + 1)})})
if e != nil {
return e
}
// Wait for second transaction.
<-cTxn2Start
return nil
})
close(cTxn1Commit)
if e != nil {
t.Errorf("Transaction 1 commit, got %v, want nil.", e)
}
}()
// Txn 2
go func() {
// Wait until txn 1 starts.
<-cTxn1Start
defer wg.Done()
var (
once sync.Once
b1 int64
b2 int64
e error
)
_, e = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
if b1, e = readBalance(tx, 1, false); e != nil {
return e
}
// Skip closing channel on retry.
once.Do(func() { close(cTxn2Start) })
// Wait until txn 1 successfully commits.
<-cTxn1Commit
// Txn1 has committed and written a balance to the account. Now this
// transaction (txn2) reads and re-writes the balance. The first
// time through, it will abort because it overlaps with txn1. Then
// it will retry after txn1 commits, and succeed.
if b2, e = readBalance(tx, 2, true); e != nil {
return e
}
return tx.BufferWrite([]*Mutation{
Update("Accounts", []string{"AccountId", "Balance"}, []interface{}{int64(2), int64(b1 + b2)})})
})
if e != nil {
t.Errorf("Transaction 2 commit, got %v, want nil.", e)
}
}()
wg.Wait()
// Check that both transactions' effects are visible.
for i := int64(1); i <= int64(2); i++ {
if b, e := readBalance(client.Single(), i, false); e != nil {
t.Fatalf("ReadBalance for key %d error %v.", i, e)
} else if b != i {
t.Errorf("Balance for key %d, got %d, want %d.", i, b, i)
}
}
}
func TestIntegration_QueryWithRoles(t *testing.T) {
t.Parallel()
// Database roles are not currently available in emulator
skipEmulatorTest(t)
// Set up testing environment.
var (
row *Row
client, clientWithRole *Client
iter *RowIterator
err error
id int64
firstName, lastName string
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stmts := []string{
`CREATE ROLE singers_reader`,
`CREATE ROLE singers_unauthorized`,
`CREATE ROLE singers_reader_revoked`,
`CREATE ROLE dropped`,
`DROP ROLE dropped`,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId BIGINT NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA,
PRIMARY KEY (SingerId)
)`,
`GRANT SELECT(SingerId, FirstName, LastName) ON Singers TO singers_reader`,
`GRANT SELECT(SingerId, FirstName) ON TABLE Singers TO singers_unauthorized`,
`GRANT SELECT(SingerId, FirstName, LastName) ON TABLE Singers TO singers_reader_revoked`,
`REVOKE SELECT(LastName) ON TABLE Singers FROM singers_reader_revoked`)
} else {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`GRANT SELECT(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_reader`,
`GRANT SELECT(SingerId, FirstName) ON TABLE Singers TO ROLE singers_unauthorized`,
`GRANT SELECT(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_reader_revoked`,
`REVOKE SELECT(LastName) ON TABLE Singers FROM ROLE singers_reader_revoked`)
}
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
defer cleanup()
singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
queryStmt := Statement{SQL: `SELECT SingerId, FirstName, LastName FROM Singers`}
// A request with sufficient privileges should return all rows
for _, dbRole := range []string{
"",
"singers_reader",
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
iter = clientWithRole.Single().Query(ctx, queryStmt)
defer iter.Stop()
row, err = iter.Next()
if err != nil {
t.Fatalf("Could not read row. Got error %v", err)
}
if err = row.Columns(&id, &firstName, &lastName); err != nil {
t.Fatalf("failed to parse row %v", err)
}
if id != 1 || firstName != "Marc" || lastName != "Richards" {
t.Fatalf("execution didn't return expected values")
}
_, err = iter.Next()
if err != iterator.Done {
t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
}
}
// A request with insufficient privileges should return permission denied
for _, test := range []struct {
dbRole string
errMsg string
}{
{
"singers_unauthorized",
"Role singers_unauthorized does not have required privileges on table Singers.",
},
{
"singers_reader_revoked",
"Role singers_reader_revoked does not have required privileges on table Singers.",
},
{
"nonexistent",
"Role not found: nonexistent.",
},
{
"dropped",
"Role not found: dropped.",
},
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, test.dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
iter = clientWithRole.Single().Query(ctx, queryStmt)
defer iter.Stop()
_, err = iter.Next()
if err == nil {
t.Fatal("expected err, got nil")
}
if msg, ok := matchError(err, codes.PermissionDenied, test.errMsg); !ok {
t.Fatal(msg)
}
}
}
func TestIntegration_ReadWithRoles(t *testing.T) {
t.Parallel()
// Database roles are not currently available in emulator
skipEmulatorTest(t)
// Set up testing environment.
var (
row *Row
client, clientWithRole *Client
iter *RowIterator
err error
id int64
firstName, lastName string
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stmts := []string{
`CREATE ROLE singers_reader`,
`CREATE ROLE singers_unauthorized`,
`CREATE ROLE singers_reader_revoked`,
`CREATE ROLE dropped`,
`DROP ROLE dropped`,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId BIGINT NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA,
PRIMARY KEY (SingerId)
)`,
`GRANT SELECT(SingerId, FirstName, LastName) ON Singers TO singers_reader`,
`GRANT SELECT(SingerId, FirstName) ON TABLE Singers TO singers_unauthorized`,
`GRANT SELECT(SingerId, FirstName, LastName) ON TABLE Singers TO singers_reader_revoked`,
`REVOKE SELECT(LastName) ON TABLE Singers FROM singers_reader_revoked`)
} else {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`GRANT SELECT(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_reader`,
`GRANT SELECT(SingerId, FirstName) ON TABLE Singers TO ROLE singers_unauthorized`,
`GRANT SELECT(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_reader_revoked`,
`REVOKE SELECT(LastName) ON TABLE Singers FROM ROLE singers_reader_revoked`)
}
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
defer cleanup()
singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
// A request with sufficient privileges should return all rows
for _, dbRole := range []string{
"",
"singers_reader",
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
iter = clientWithRole.Single().Read(ctx, "Singers", AllKeys(), singerColumns)
defer iter.Stop()
row, err = iter.Next()
if err != nil {
t.Fatalf("Could not read row. Got error %v", err)
}
if err = row.Columns(&id, &firstName, &lastName); err != nil {
t.Fatalf("failed to parse row %v", err)
}
if id != 1 || firstName != "Marc" || lastName != "Richards" {
t.Fatalf("execution didn't return expected values")
}
_, err = iter.Next()
if err != iterator.Done {
t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
}
}
// A request with insufficient privileges should return permission denied
for _, test := range []struct {
dbRole string
errMsg string
}{
{
"singers_unauthorized",
"Role singers_unauthorized does not have required privileges on table Singers.",
},
{
"singers_reader_revoked",
"Role singers_reader_revoked does not have required privileges on table Singers.",
},
{
"nonexistent",
"Role not found: nonexistent.",
},
{
"dropped",
"Role not found: dropped.",
},
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, test.dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
iter = clientWithRole.Single().Read(ctx, "Singers", AllKeys(), singerColumns)
defer iter.Stop()
_, err = iter.Next()
if err == nil {
t.Fatal("expected err, got nil")
}
if msg, ok := matchError(err, codes.PermissionDenied, test.errMsg); !ok {
t.Fatal(msg)
}
}
}
func TestIntegration_DMLWithRoles(t *testing.T) {
t.Parallel()
// Database roles are not currently available in emulator
skipEmulatorTest(t)
// Set up testing environment.
var (
client, clientWithRole *Client
err error
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stmts := []string{
`CREATE ROLE singers_updater`,
`CREATE ROLE singers_unauthorized`,
`CREATE ROLE singers_creator`,
`CREATE ROLE singers_deleter`,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId BIGINT NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA,
PRIMARY KEY (SingerId)
)`,
`GRANT SELECT(SingerId), UPDATE(FirstName, LastName) ON Singers TO singers_updater`,
`GRANT SELECT(SingerId), UPDATE(FirstName) ON TABLE Singers TO singers_unauthorized`,
`GRANT INSERT(SingerId, FirstName, LastName) ON TABLE Singers TO singers_creator`,
`GRANT SELECT(SingerId), DELETE ON TABLE Singers TO singers_deleter`)
} else {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`GRANT SELECT(SingerId), UPDATE(FirstName, LastName) ON TABLE Singers TO ROLE singers_updater`,
`GRANT SELECT(SingerId), UPDATE(FirstName) ON TABLE Singers TO ROLE singers_unauthorized`,
`GRANT INSERT(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_creator`,
`GRANT SELECT(SingerId), DELETE ON TABLE Singers TO ROLE singers_deleter`)
}
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
defer cleanup()
singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
updateStmt := Statement{SQL: `UPDATE Singers SET FirstName = 'Mark', LastName = 'Richards' WHERE SingerId = 1`}
// A request with sufficient privileges should update the row
for _, dbRole := range []string{
"",
"singers_updater",
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.ReadWriteTransaction(ctx, func(ctx context.Context, txn *ReadWriteTransaction) error {
_, err := txn.Update(ctx, updateStmt)
return err
})
if err != nil {
t.Fatalf("Could not update row. Got error %v", err)
}
}
// A request with insufficient privileges should return permission denied
for _, test := range []struct {
dbRole string
errMsg string
}{
{
"singers_unauthorized",
"Role singers_unauthorized does not have required privileges on table Singers.",
},
{
"nonexistent",
"Role not found: nonexistent.",
},
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, test.dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.ReadWriteTransaction(ctx, func(ctx context.Context, txn *ReadWriteTransaction) error {
_, err := txn.Update(ctx, updateStmt)
return err
})
if err == nil {
t.Fatal("expected err, got nil")
}
if msg, ok := matchError(err, codes.PermissionDenied, test.errMsg); !ok {
t.Fatal(msg)
}
}
// A request with sufficient privileges should insert the row
getInsertStmt := func(vals []interface{}) Statement {
sql := fmt.Sprintf(`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (%d, '%s', '%s')`, vals...)
return Statement{SQL: sql}
}
for _, test := range []struct {
dbRole string
vals []interface{}
}{
{
"",
[]interface{}{2, "Catalina", "Smith"},
},
{
"singers_creator",
[]interface{}{3, "Alice", "Trentor"},
},
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, test.dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.ReadWriteTransaction(ctx, func(ctx context.Context, txn *ReadWriteTransaction) error {
_, err := txn.Update(ctx, getInsertStmt(test.vals))
return err
})
if err != nil {
t.Fatalf("Could not insert row. Got error %v", err)
}
}
// A request with sufficient privileges should delete the row
deleteStmt := Statement{SQL: `DELETE FROM Singers WHERE TRUE`}
for _, dbRole := range []string{
"",
"singers_deleter",
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.ReadWriteTransaction(ctx, func(ctx context.Context, txn *ReadWriteTransaction) error {
_, err := txn.Update(ctx, deleteStmt)
return err
})
if err != nil {
t.Fatalf("Could not delete row. Got error %v", err)
}
}
}
func TestIntegration_MutationWithRoles(t *testing.T) {
t.Parallel()
// Database roles are not currently available in emulator
skipEmulatorTest(t)
// Set up testing environment.
var (
client, clientWithRole *Client
err error
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stmts := []string{
`CREATE ROLE singers_updater`,
`CREATE ROLE singers_unauthorized`,
`CREATE ROLE singers_creator`,
`CREATE ROLE singers_deleter`,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId BIGINT NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
SingerInfo BYTEA,
PRIMARY KEY (SingerId)
)`,
`GRANT SELECT(SingerId), UPDATE(SingerId, FirstName, LastName) ON Singers TO singers_updater`,
`GRANT SELECT(SingerId), UPDATE(SingerId, FirstName) ON TABLE Singers TO singers_unauthorized`,
`GRANT INSERT(SingerId, FirstName, LastName) ON TABLE Singers TO singers_creator`,
`GRANT SELECT(SingerId), DELETE ON TABLE Singers TO singers_deleter`)
} else {
stmts = append(stmts,
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
FirstName STRING(1024),
LastName STRING(1024),
SingerInfo BYTES(MAX)
) PRIMARY KEY (SingerId)`,
`GRANT SELECT(SingerId), UPDATE(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_updater`,
`GRANT SELECT(SingerId), UPDATE(SingerId, FirstName) ON TABLE Singers TO ROLE singers_unauthorized`,
`GRANT INSERT(SingerId, FirstName, LastName) ON TABLE Singers TO ROLE singers_creator`,
`GRANT SELECT(SingerId), DELETE ON TABLE Singers TO ROLE singers_deleter`)
}
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
defer cleanup()
singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
// A request with sufficient privileges should update the row
for _, dbRole := range []string{
"",
"singers_updater",
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.Apply(ctx, []*Mutation{
Update("Singers", singerColumns, []interface{}{1, "Mark", "Richards"}),
})
if err != nil {
t.Fatalf("Could not update row. Got error %v", err)
}
}
// A request with insufficient privileges should return permission denied
for _, test := range []struct {
dbRole string
errMsg string
}{
{
"singers_unauthorized",
"Role singers_unauthorized does not have required privileges on table Singers.",
},
{
"nonexistent",
"Role not found: nonexistent.",
},
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, test.dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.Apply(ctx, []*Mutation{
Update("Singers", singerColumns, []interface{}{1, "Mark", "Richards"}),
})
if err == nil {
t.Fatal("expected err, got nil")
}
if msg, ok := matchError(err, codes.PermissionDenied, test.errMsg); !ok {
t.Fatal(msg)
}
}
// A request with sufficient privileges should insert the row
for _, test := range []struct {
dbRole string
vals []interface{}
}{
{
"",
[]interface{}{2, "Catalina", "Smith"},
},
{
"singers_creator",
[]interface{}{3, "Alice", "Trentor"},
},
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, test.dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.Apply(ctx, []*Mutation{
Insert("Singers", singerColumns, test.vals),
})
if err != nil {
t.Fatalf("Could not insert row. Got error %v", err)
}
}
// A request with sufficient privileges should delete the row
for _, dbRole := range []string{
"",
"singers_deleter",
} {
if clientWithRole, err = createClientWithRole(ctx, dbPath, SessionPoolConfig{}, dbRole); err != nil {
t.Fatal(err)
}
defer clientWithRole.Close()
_, err = clientWithRole.Apply(ctx, []*Mutation{
Delete("Singers", Key{1}),
})
if err != nil {
t.Fatalf("Could not delete row. Got error %v", err)
}
}
}
func TestIntegration_ListDatabaseRoles(t *testing.T) {
t.Parallel()
// Database roles are not currently available in emulator
skipEmulatorTest(t)
// Set up testing environment.
var (
err error
iter *database.DatabaseRoleIterator
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stmts := []string{
`CREATE ROLE a`,
`CREATE ROLE z`,
}
_, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, stmts)
defer cleanup()
iter = databaseAdmin.ListDatabaseRoles(ctx, &adminpb.ListDatabaseRolesRequest{
Parent: dbPath,
})
roles, err := readDatabaseRoles(iter)
if err != nil {
t.Fatalf("cannot list database roles in %v: %v", dbPath, err)
}
var got []string
rolePrefix := dbPath + "/databaseRoles/"
for _, role := range roles {
if !strings.HasPrefix(role.Name, rolePrefix) {
t.Fatalf("Role %v does not have prefix %v", role.Name, rolePrefix)
}
got = append(got, strings.TrimPrefix(role.Name, rolePrefix))
}
want := []string{"a", "public", "spanner_info_reader", "spanner_sys_reader", "z"}
if !testEqual(got, want) {
t.Fatalf("Database role mismatch\nGot: %v, Want: %v", got, want)
}
}
func readDatabaseRoles(iter *database.DatabaseRoleIterator) ([]*adminpb.DatabaseRole, error) {
var vals []*adminpb.DatabaseRole
for {
v, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
vals = append(vals, v)
}
}
// Test PartitionQuery of BatchReadOnlyTransaction, create partitions then
// serialize and deserialize both transaction and partition to be used in
// execution on another client, and compare results.
func TestIntegration_BatchQuery(t *testing.T) {
t.Parallel()
// Set up testing environment.
var (
client2 *Client
err error
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][simpleDDLStatements])
defer cleanup()
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
// PartitionQuery
var (
txn *BatchReadOnlyTransaction
partitions []*Partition
stmt = Statement{SQL: "SELECT * FROM test;"}
)
if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if partitions, err = txn.PartitionQueryWithOptions(ctx, stmt, PartitionOptions{0, 3}, QueryOptions{DataBoostEnabled: true}); err != nil {
t.Fatal(err)
}
// Reconstruct BatchReadOnlyTransactionID and execute partitions
var (
tid2 BatchReadOnlyTransactionID
data []byte
gotResult bool // if we get matching result from two separate txns
)
if data, err = txn.ID.MarshalBinary(); err != nil {
t.Fatalf("encoding failed %v", err)
}
if err = tid2.UnmarshalBinary(data); err != nil {
t.Fatalf("decoding failed %v", err)
}
txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
// Execute Partitions and compare results
for i, p := range partitions {
iter := txn.Execute(ctx, p)
defer iter.Stop()
p2 := serdesPartition(t, i, p)
iter2 := txn2.Execute(ctx, &p2)
defer iter2.Stop()
row1, err1 := iter.Next()
row2, err2 := iter2.Next()
if err1 != err2 {
t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
}
if !testEqual(row1, row2) {
t.Fatalf("execution returned different values: %v, %v", row1, row2)
}
if row1 == nil {
continue
}
var a, b string
if err = row1.Columns(&a, &b); err != nil {
t.Fatalf("failed to parse row %v", err)
}
if a == str1 && b == str2 {
gotResult = true
}
}
if !gotResult {
t.Fatalf("execution didn't return expected values")
}
}
// Test PartitionRead of BatchReadOnlyTransaction, similar to TestBatchQuery
func TestIntegration_BatchRead(t *testing.T) {
t.Parallel()
// skipping PG test because of rpc error: code = InvalidArgument desc = [ERROR] syntax error at or near "." in PartitionRead
skipUnsupportedPGTest(t)
// Set up testing environment.
var (
client2 *Client
err error
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][simpleDDLStatements])
defer cleanup()
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
// PartitionRead
var (
txn *BatchReadOnlyTransaction
partitions []*Partition
)
if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if partitions, err = txn.PartitionReadWithOptions(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}, ReadOptions{DataBoostEnabled: true}); err != nil {
t.Fatal(err)
}
// Reconstruct BatchReadOnlyTransactionID and execute partitions.
var (
tid2 BatchReadOnlyTransactionID
data []byte
gotResult bool // if we get matching result from two separate txns
)
if data, err = txn.ID.MarshalBinary(); err != nil {
t.Fatalf("encoding failed %v", err)
}
if err = tid2.UnmarshalBinary(data); err != nil {
t.Fatalf("decoding failed %v", err)
}
txn2 := client2.BatchReadOnlyTransactionFromID(tid2)
// Execute Partitions and compare results.
for i, p := range partitions {
iter := txn.Execute(ctx, p)
defer iter.Stop()
p2 := serdesPartition(t, i, p)
iter2 := txn2.Execute(ctx, &p2)
defer iter2.Stop()
row1, err1 := iter.Next()
row2, err2 := iter2.Next()
if err1 != err2 {
t.Fatalf("execution failed for different reasons: %v, %v", err1, err2)
continue
}
if !testEqual(row1, row2) {
t.Fatalf("execution returned different values: %v, %v", row1, row2)
}
if row1 == nil {
continue
}
var a, b string
if err = row1.Columns(&a, &b); err != nil {
t.Fatalf("failed to parse row %v", err)
}
if a == str1 && b == str2 {
gotResult = true
}
}
if !gotResult {
t.Fatalf("execution didn't return expected values")
}
}
// Test normal txReadEnv method on BatchReadOnlyTransaction.
func TestIntegration_BROTNormal(t *testing.T) {
t.Parallel()
// skipping PG test because of rpc error: code = InvalidArgument desc = [ERROR] syntax error at or near "." in PartitionRead
skipUnsupportedPGTest(t)
// Set up testing environment and create txn.
var (
txn *BatchReadOnlyTransaction
err error
row *Row
i int64
)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][simpleDDLStatements])
defer cleanup()
if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
t.Fatal(err)
}
defer txn.Cleanup(ctx)
if _, err := txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil {
t.Fatal(err)
}
// Normal query should work with BatchReadOnlyTransaction.
stmt2 := Statement{SQL: "SELECT 1"}
iter := txn.Query(ctx, stmt2)
defer iter.Stop()
row, err = iter.Next()
if err != nil {
t.Errorf("query failed with %v", err)
}
if err = row.Columns(&i); err != nil {
t.Errorf("failed to parse row %v", err)
}
}
func TestIntegration_CommitTimestamp(t *testing.T) {
t.Parallel()
skipUnsupportedPGTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, ctsDBStatements)
defer cleanup()
type testTableRow struct {
Key string
Ts NullTime
}
var (
cts1, cts2, ts1, ts2 time.Time
err error
)
// Apply mutation in sequence, expect to see commit timestamp in good order,
// check also the commit timestamp returned
for _, it := range []struct {
k string
t *time.Time
}{
{"a", &cts1},
{"b", &cts2},
} {
tt := testTableRow{Key: it.k, Ts: NullTime{CommitTimestamp, true}}
m, err := InsertStruct("TestTable", tt)
if err != nil {
t.Fatal(err)
}
*it.t, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce())
if err != nil {
t.Fatal(err)
}
}
txn := client.ReadOnlyTransaction()
for _, it := range []struct {
k string
t *time.Time
}{
{"a", &ts1},
{"b", &ts2},
} {
if r, e := txn.ReadRow(ctx, "TestTable", Key{it.k}, []string{"Ts"}); e != nil {
t.Fatal(err)
} else {
var got testTableRow
if err := r.ToStruct(&got); err != nil {
t.Fatal(err)
}
*it.t = got.Ts.Time
}
}
if !cts1.Equal(ts1) {
t.Errorf("Expect commit timestamp returned and read to match for txn1, got %v and %v.", cts1, ts1)
}
if !cts2.Equal(ts2) {
t.Errorf("Expect commit timestamp returned and read to match for txn2, got %v and %v.", cts2, ts2)
}
// Try writing a timestamp in the future to commit timestamp, expect error.
_, err = client.Apply(ctx, []*Mutation{InsertOrUpdate("TestTable", []string{"Key", "Ts"}, []interface{}{"a", time.Now().Add(time.Hour)})}, ApplyAtLeastOnce())
if msg, ok := matchError(err, codes.FailedPrecondition, "Cannot write timestamps in the future"); !ok {
t.Error(msg)
}
}
func TestIntegration_DML(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
// Function that reads a single row's first name from within a transaction.
readFirstName := func(tx *ReadWriteTransaction, key int) (string, error) {
row, err := tx.ReadRow(ctx, "Singers", Key{key}, []string{"FirstName"})
if err != nil {
return "", err
}
var fn string
if err := row.Column(0, &fn); err != nil {
return "", err
}
return fn, nil
}
// Function that reads multiple rows' first names from outside a read/write
// transaction.
readFirstNames := func(keys ...int) []string {
var ks []KeySet
for _, k := range keys {
ks = append(ks, Key{k})
}
iter := client.Single().Read(ctx, "Singers", KeySets(ks...), []string{"FirstName"})
var got []string
var fn string
err := iter.Do(func(row *Row) error {
if err := row.Column(0, &fn); err != nil {
return err
}
got = append(got, fn)
return nil
})
if err != nil {
t.Fatalf("readFirstNames(%v): %v", keys, err)
}
return got
}
singersQuery := []string{`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, "Umm", "Kulthum")`,
`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (2, "Eduard", "Khil")`,
`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
`UPDATE Singers SET FirstName = "Oum" WHERE SingerId = 1`,
`UPDATE Singers SET FirstName = "Eddie" WHERE SingerId = 2`,
`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, "Audra", "McDonald")`,
`SELECT FirstName FROM Singers`,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
singersQuery = []string{`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (1, 'Umm', 'Kulthum')`,
`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (2, 'Eduard', 'Khil')`,
`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, 'Audra', 'McDonald')`,
`UPDATE Singers SET FirstName = 'Oum' WHERE SingerId = 1`,
`UPDATE Singers SET FirstName = 'Eddie' WHERE SingerId = 2`,
`INSERT INTO Singers (SingerId, FirstName, LastName) VALUES (3, 'Audra', 'McDonald')`,
`SELECT FirstName FROM Singers`,
}
}
// Use ReadWriteTransaction.Query to execute a DML statement.
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, Statement{
SQL: singersQuery[0],
})
defer iter.Stop()
row, err := iter.Next()
if ErrCode(err) == codes.Aborted {
return err
}
if err != iterator.Done {
t.Fatalf("got results from iterator, want none: %#v, err = %v\n", row, err)
}
if iter.RowCount != 1 {
t.Errorf("row count: got %d, want 1", iter.RowCount)
}
// The results of the DML statement should be visible to the transaction.
got, err := readFirstName(tx, 1)
if err != nil {
return err
}
if want := "Umm"; got != want {
t.Errorf("got %q, want %q", got, want)
}
return nil
})
if err != nil {
t.Fatal(err)
}
// Use ReadWriteTransaction.Update to execute a DML statement.
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
count, err := tx.Update(ctx, Statement{
SQL: singersQuery[1],
})
if err != nil {
return err
}
if count != 1 {
t.Errorf("row count: got %d, want 1", count)
}
got, err := readFirstName(tx, 2)
if err != nil {
return err
}
if want := "Eduard"; got != want {
t.Errorf("got %q, want %q", got, want)
}
return nil
})
if err != nil {
t.Fatal(err)
}
// Roll back a DML statement and confirm that it didn't happen.
var fail = errors.New("fail")
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{
SQL: singersQuery[2],
})
if err != nil {
return err
}
return fail
})
if err != fail {
t.Fatalf("rolling back: got error %v, want the error 'fail'", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
if got, want := ErrCode(err), codes.NotFound; got != want {
t.Errorf("got %s, want %s", got, want)
}
// Run two DML statements in the same transaction.
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{SQL: singersQuery[3]})
if err != nil {
return err
}
_, err = tx.Update(ctx, Statement{SQL: singersQuery[4]})
if err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}
got := readFirstNames(1, 2)
want := []string{"Oum", "Eddie"}
if !testEqual(got, want) {
t.Errorf("got %v, want %v", got, want)
}
// Run a DML statement and an ordinary mutation in the same transaction.
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{
SQL: singersQuery[5],
})
if err != nil {
return err
}
return tx.BufferWrite([]*Mutation{
Insert("Singers", []string{"SingerId", "FirstName", "LastName"},
[]interface{}{4, "Andy", "Irvine"}),
})
})
if err != nil {
t.Fatal(err)
}
got = readFirstNames(3, 4)
want = []string{"Audra", "Andy"}
if !testEqual(got, want) {
t.Errorf("got %v, want %v", got, want)
}
// Attempt to run a query using update.
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{SQL: singersQuery[6]})
return err
})
if got, want := ErrCode(err), codes.InvalidArgument; got != want {
t.Errorf("got %s, want %s", got, want)
}
}
func TestIntegration_StructParametersBind(t *testing.T) {
t.Parallel()
skipUnsupportedPGTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, nil)
defer cleanup()
type tRow []interface{}
type tRows []struct{ trow tRow }
type allFields struct {
Stringf string
Intf int
Boolf bool
Floatf float64
Bytef []byte
Timef time.Time
Datef civil.Date
}
allColumns := []string{
"Stringf",
"Intf",
"Boolf",
"Floatf",
"Bytef",
"Timef",
"Datef",
}
s1 := allFields{"abc", 300, false, 3.45, []byte("foo"), t1, d1}
s2 := allFields{"def", -300, false, -3.45, []byte("bar"), t2, d2}
dynamicStructType := reflect.StructOf([]reflect.StructField{
{Name: "A", Type: reflect.TypeOf(t1), Tag: `spanner:"ff1"`},
})
s3 := reflect.New(dynamicStructType)
s3.Elem().Field(0).Set(reflect.ValueOf(t1))
for i, test := range []struct {
param interface{}
sql string
cols []string
trows tRows
}{
// Struct value.
{
s1,
"SELECT" +
" @p.Stringf," +
" @p.Intf," +
" @p.Boolf," +
" @p.Floatf," +
" @p.Bytef," +
" @p.Timef," +
" @p.Datef",
allColumns,
tRows{
{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
},
},
// Array of struct value.
{
[]allFields{s1, s2},
"SELECT * FROM UNNEST(@p)",
allColumns,
tRows{
{tRow{"abc", 300, false, 3.45, []byte("foo"), t1, d1}},
{tRow{"def", -300, false, -3.45, []byte("bar"), t2, d2}},
},
},
// Null struct.
{
(*allFields)(nil),
"SELECT @p IS NULL",
[]string{""},
tRows{
{tRow{true}},
},
},
// Null Array of struct.
{
[]allFields(nil),
"SELECT @p IS NULL",
[]string{""},
tRows{
{tRow{true}},
},
},
// Empty struct.
{
struct{}{},
"SELECT @p IS NULL ",
[]string{""},
tRows{
{tRow{false}},
},
},
// Empty array of struct.
{
[]allFields{},
"SELECT * FROM UNNEST(@p) ",
allColumns,
tRows{},
},
// Struct with duplicate fields.
{
struct {
A int `spanner:"field"`
B int `spanner:"field"`
}{10, 20},
"SELECT * FROM UNNEST([@p]) ",
[]string{"field", "field"},
tRows{
{tRow{10, 20}},
},
},
// Struct with unnamed fields.
{
struct {
A string `spanner:""`
}{"hello"},
"SELECT * FROM UNNEST([@p]) ",
[]string{""},
tRows{
{tRow{"hello"}},
},
},
// Mixed struct.
{
struct {
DynamicStructField interface{} `spanner:"f1"`
ArrayStructField []*allFields `spanner:"f2"`
}{
DynamicStructField: s3.Interface(),
ArrayStructField: []*allFields{nil},
},
"SELECT @p.f1.ff1, ARRAY_LENGTH(@p.f2), @p.f2[OFFSET(0)] IS NULL ",
[]string{"ff1", "", ""},
tRows{
{tRow{t1, 1, true}},
},
},
} {
iter := client.Single().Query(ctx, Statement{
SQL: test.sql,
Params: map[string]interface{}{"p": test.param},
})
var gotRows []*Row
err := iter.Do(func(r *Row) error {
gotRows = append(gotRows, r)
return nil
})
if err != nil {
t.Errorf("Failed to execute test case %d, error: %v", i, err)
}
var wantRows []*Row
for j, row := range test.trows {
r, err := NewRow(test.cols, row.trow)
if err != nil {
t.Errorf("Invalid row %d in test case %d", j, i)
}
wantRows = append(wantRows, r)
}
if !testEqual(gotRows, wantRows) {
t.Errorf("%d: Want result %v, got result %v", i, wantRows, gotRows)
}
}
}
func TestIntegration_PDML(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
columns := []string{"SingerId", "FirstName", "LastName"}
// Populate the Singers table.
var muts []*Mutation
for _, row := range [][]interface{}{
{1, "Umm", "Kulthum"},
{2, "Eduard", "Khil"},
{3, "Audra", "McDonald"},
{4, "Enrique", "Iglesias"},
{5, "Shakira", "Ripoll"},
} {
muts = append(muts, Insert("Singers", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
t.Fatal(err)
}
query := `UPDATE Singers SET Singers.FirstName = "changed" WHERE Singers.SingerId >= 1 AND Singers.SingerId <= @p1`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
query = `UPDATE Singers SET FirstName = 'changed' WHERE SingerId >= 1 AND SingerId <= $1`
}
// Identifiers in PDML statements must be fully qualified.
// TODO(jba): revisit the above.
count, err := client.PartitionedUpdate(ctx, Statement{
SQL: query,
Params: map[string]interface{}{
"p1": 3,
},
})
if err != nil {
t.Fatal(err)
}
if want := int64(3); count != want {
t.Fatalf("got %d, want %d", count, want)
}
got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
if err != nil {
t.Fatal(err)
}
want := [][]interface{}{
{int64(1), "changed", "Kulthum"},
{int64(2), "changed", "Khil"},
{int64(3), "changed", "McDonald"},
{int64(4), "Enrique", "Iglesias"},
{int64(5), "Shakira", "Ripoll"},
}
if !testEqual(got, want) {
t.Errorf("\ngot %v\nwant%v", got, want)
}
}
func TestIntegration_BatchDML(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
columns := []string{"SingerId", "FirstName", "LastName"}
// Populate the Singers table.
var muts []*Mutation
for _, row := range [][]interface{}{
{1, "Umm", "Kulthum"},
{2, "Eduard", "Khil"},
{3, "Audra", "McDonald"},
} {
muts = append(muts, Insert("Singers", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
t.Fatal(err)
}
singersQuery := []string{`UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`,
`UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`,
`UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`,
}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
singersQuery = []string{`UPDATE Singers SET FirstName = 'changed 1' WHERE SingerId = 1`,
`UPDATE Singers SET FirstName = 'changed 2' WHERE SingerId = 2`,
`UPDATE Singers SET FirstName = 'changed 3' WHERE SingerId = 3`,
}
}
var counts []int64
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
counts, err = tx.BatchUpdate(ctx, []Statement{
{SQL: singersQuery[0]},
{SQL: singersQuery[1]},
{SQL: singersQuery[2]},
})
return err
})
if err != nil {
t.Fatal(err)
}
if want := []int64{1, 1, 1}; !testEqual(counts, want) {
t.Fatalf("got %d, want %d", counts, want)
}
got, err := readAll(client.Single().Read(ctx, "Singers", AllKeys(), columns))
if err != nil {
t.Fatal(err)
}
want := [][]interface{}{
{int64(1), "changed 1", "Kulthum"},
{int64(2), "changed 2", "Khil"},
{int64(3), "changed 3", "McDonald"},
}
if !testEqual(got, want) {
t.Errorf("\ngot %v\nwant%v", got, want)
}
}
func TestIntegration_BatchDML_NoStatements(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
_, err = tx.BatchUpdate(ctx, []Statement{})
return err
})
if err == nil {
t.Fatal("expected error, got nil")
}
if s, ok := status.FromError(err); ok {
if s.Code() != codes.InvalidArgument {
t.Fatalf("expected InvalidArgument, got %v", err)
}
} else {
t.Fatalf("expected InvalidArgument, got %v", err)
}
}
func TestIntegration_BatchDML_TwoStatements(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
columns := []string{"SingerId", "FirstName", "LastName"}
// Populate the Singers table.
var muts []*Mutation
for _, row := range [][]interface{}{
{1, "Umm", "Kulthum"},
{2, "Eduard", "Khil"},
{3, "Audra", "McDonald"},
} {
muts = append(muts, Insert("Singers", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
t.Fatal(err)
}
singersQuery := []string{`UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`,
`UPDATE Singers SET Singers.FirstName = "changed 2" WHERE Singers.SingerId = 2`,
`UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`,
`UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
singersQuery = []string{`UPDATE Singers SET FirstName = 'changed 1' WHERE SingerId = 1`,
`UPDATE Singers SET FirstName = 'changed 2' WHERE SingerId = 2`,
`UPDATE Singers SET FirstName = 'changed 3' WHERE SingerId = 3`,
`UPDATE Singers SET FirstName = 'changed 1' WHERE SingerId = 1`}
}
var updateCount int64
var batchCounts []int64
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
batchCounts, err = tx.BatchUpdate(ctx, []Statement{
{SQL: singersQuery[0]},
{SQL: singersQuery[1]},
{SQL: singersQuery[2]},
})
if err != nil {
return err
}
updateCount, err = tx.Update(ctx, Statement{SQL: singersQuery[3]})
return err
})
if err != nil {
t.Fatal(err)
}
if want := []int64{1, 1, 1}; !testEqual(batchCounts, want) {
t.Fatalf("got %d, want %d", batchCounts, want)
}
if updateCount != 1 {
t.Fatalf("got %v, want 1", updateCount)
}
}
func TestIntegration_BatchDML_Error(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
columns := []string{"SingerId", "FirstName", "LastName"}
// Populate the Singers table.
var muts []*Mutation
for _, row := range [][]interface{}{
{1, "Umm", "Kulthum"},
{2, "Eduard", "Khil"},
{3, "Audra", "McDonald"},
} {
muts = append(muts, Insert("Singers", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
t.Fatal(err)
}
singersQuery := []string{`UPDATE Singers SET Singers.FirstName = "changed 1" WHERE Singers.SingerId = 1`,
`some illegal statement`,
`UPDATE Singers SET Singers.FirstName = "changed 3" WHERE Singers.SingerId = 3`}
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
singersQuery = []string{`UPDATE Singers SET FirstName = 'changed 1' WHERE SingerId = 1`,
`some illegal statement`,
`UPDATE Singers SET FirstName = 'changed 3' WHERE SingerId = 3`}
}
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
counts, err := tx.BatchUpdate(ctx, []Statement{
{SQL: singersQuery[0]},
{SQL: singersQuery[1]},
{SQL: singersQuery[2]},
})
if err == nil {
t.Fatal("expected err, got nil")
}
// The statement may also have been aborted by Spanner, and in that
// case we should just let the client library retry the transaction.
if status.Code(err) == codes.Aborted {
return err
}
if want := []int64{1}; !testEqual(counts, want) {
t.Fatalf("got %d, want %d", counts, want)
}
got, err := readAll(tx.Read(ctx, "Singers", AllKeys(), columns))
// Aborted error is ok, just retry the transaction.
if status.Code(err) == codes.Aborted {
return err
}
if err != nil {
t.Fatal(err)
}
want := [][]interface{}{
{int64(1), "changed 1", "Kulthum"},
{int64(2), "Eduard", "Khil"},
{int64(3), "Audra", "McDonald"},
}
if !testEqual(got, want) {
t.Errorf("\ngot %v\nwant%v", got, want)
}
return nil
})
if err != nil {
t.Fatal(err)
}
}
func TestIntegration_PGNumeric(t *testing.T) {
onlyRunForPGTest(t)
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTestForPG(ctx, t, DefaultSessionPoolConfig, singerDBPGStatements)
defer cleanup()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
count, err := tx.Update(ctx, Statement{
SQL: `INSERT INTO Singers (SingerId, numeric, float8) VALUES ($1, $2, $3)`,
Params: map[string]interface{}{
"p1": int64(123),
"p2": PGNumeric{"123.456789", true},
"p3": float64(123.456),
},
})
if err != nil {
return err
}
if count != 1 {
t.Errorf("row count: got %d, want 1", count)
}
count, err = tx.Update(ctx, Statement{
SQL: `INSERT INTO Singers (SingerId, numeric, float8) VALUES ($1, $2, $3)`,
Params: map[string]interface{}{
"p1": int64(456),
"p2": PGNumeric{"NaN", true},
"p3": float64(12345.6),
},
})
if err != nil {
return err
}
if count != 1 {
t.Errorf("row count: got %d, want 1", count)
}
return nil
})
if err != nil {
t.Fatal(err)
}
iter := client.Single().Query(ctx, Statement{SQL: "SELECT SingerId, numeric, float8 FROM Singers"})
got, err := readPGSingerTable(iter)
if err != nil {
t.Fatalf("failed to read data: %v", err)
}
want := [][]interface{}{
{int64(123), PGNumeric{"123.456789", true}, float64(123.456)},
{int64(456), PGNumeric{"NaN", true}, float64(12345.6)},
}
if !testEqual(got, want) {
t.Errorf("\ngot %v\nwant%v", got, want)
}
}
func TestIntegration_PGJSONB(t *testing.T) {
onlyRunForPGTest(t)
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
client, _, cleanup := prepareIntegrationTestForPG(ctx, t, DefaultSessionPoolConfig, singerDBPGStatements)
defer cleanup()
type Message struct {
Name string
Body string
Time int64
}
msg := Message{"Alice", "Hello", 1294706395881547000}
jsonStr := `{"Name":"Alice","Body":"Hello","Time":1294706395881547000}`
var unmarshalledJSONstruct interface{}
json.Unmarshal([]byte(jsonStr), &unmarshalledJSONstruct)
tests := []struct {
col string
val interface{}
want interface{}
}{
{col: "JSONB", val: PGJsonB{Value: msg, Valid: true}, want: PGJsonB{Value: unmarshalledJSONstruct, Valid: true}},
{col: "JSONB", val: PGJsonB{Value: msg, Valid: false}, want: PGJsonB{}},
}
// Write rows into table first using DML.
statements := make([]Statement, 0)
for i, test := range tests {
stmt := NewStatement(fmt.Sprintf("INSERT INTO Types (RowId, %s) VALUES ($1, $2)", test.col))
// Note: We are not setting the parameter type here to ensure that it
// can be automatically recognized when it is actually needed.
stmt.Params["p1"] = i
stmt.Params["p2"] = test.val
statements = append(statements, stmt)
}
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
rowCounts, err := tx.BatchUpdate(ctx, statements)
if err != nil {
return err
}
if len(rowCounts) != len(tests) {
return fmt.Errorf("rowCounts length mismatch\nGot: %v\nWant: %v", len(rowCounts), len(tests))
}
for i, c := range rowCounts {
if c != 1 {
return fmt.Errorf("row count mismatch for row %v:\nGot: %v\nWant: %v", i, c, 1)
}
}
return nil
})
if err != nil {
t.Fatalf("failed to insert values using DML: %v", err)
}
// Delete all the rows so we can insert them using mutations as well.
_, err = client.Apply(ctx, []*Mutation{Delete("Types", AllKeys())})
if err != nil {
t.Fatalf("failed to delete all rows: %v", err)
}
// Verify that we can insert the rows using mutations.
var muts []*Mutation
for i, test := range tests {
muts = append(muts, InsertOrUpdate("Types", []string{"RowID", test.col}, []interface{}{i, test.val}))
}
if _, err := client.Apply(ctx, muts, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
for i, test := range tests {
row, err := client.Single().ReadRow(ctx, "Types", []interface{}{i}, []string{test.col})
if err != nil {
t.Fatalf("Unable to fetch row %v: %v", i, err)
}
verifyDirectPathRemoteAddress(t)
// Create new instance of type of test.want.
want := test.want
if want == nil {
want = test.val
}
gotp := reflect.New(reflect.TypeOf(want))
if err := row.Column(0, gotp.Interface()); err != nil {
t.Errorf("%d: col:%v val:%#v, %v", i, test.col, test.val, err)
continue
}
got := reflect.Indirect(gotp).Interface()
// One of the test cases is checking NaN handling. Given
// NaN!=NaN, we can't use reflect to test for it.
if isNaN(got) && isNaN(want) {
continue
}
// Check non-NaN cases.
if !testEqual(got, want) {
t.Errorf("%d: col:%v val:%#v, got %#v, want %#v", i, test.col, test.val, got, want)
continue
}
}
}
func readPGSingerTable(iter *RowIterator) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
for {
row, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
var id int64
var numeric PGNumeric
var float8 float64
err = row.Columns(&id, &numeric, &float8)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, numeric, float8})
}
}
func TestIntegration_StartBackupOperation(t *testing.T) {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/6200")
skipEmulatorTest(t)
skipUnsupportedPGTest(t)
t.Parallel()
startTime := time.Now()
// Backups can be slow, so use 1 hour timeout.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
defer cancel()
client, testDatabaseName, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][backupDDLStatements])
defer cleanup()
// Set up 1 singer to have backup size greater than zero
singers := []*Mutation{
Insert("Singers", []string{"SingerId", "FirstName", "LastName"}, []interface{}{int64(1), "test", "test"}),
}
if _, err := client.Apply(ctx, singers, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
backupID := backupIDSpace.New()
backupName := fmt.Sprintf("projects/%s/instances/%s/backups/%s", testProjectID, testInstanceID, backupID)
// Minimum expiry time is 6 hours
expires := time.Now().Add(time.Hour * 7)
respLRO, err := databaseAdmin.StartBackupOperation(ctx, backupID, testDatabaseName, expires)
if err != nil {
t.Fatal(err)
}
_, err = respLRO.Wait(ctx)
if err != nil {
t.Fatal(err)
}
t.Logf("create backup operation took: %v\n", time.Since(startTime))
respMetadata, err := respLRO.Metadata()
if err != nil {
t.Fatalf("backup response metadata, got error %v, want nil", err)
}
if respMetadata.Database != testDatabaseName {
t.Fatalf("backup database name, got %s, want %s", respMetadata.Database, testDatabaseName)
}
if respMetadata.Progress.ProgressPercent != 100 {
t.Fatalf("backup progress percent, got %d, want 100", respMetadata.Progress.ProgressPercent)
}
respCheck, err := databaseAdmin.GetBackup(ctx, &adminpb.GetBackupRequest{Name: backupName})
if err != nil {
t.Fatalf("backup metadata, got error %v, want nil", err)
}
if respCheck.CreateTime == nil {
t.Fatal("backup create time, got nil, want non-nil")
}
if respCheck.State != adminpb.Backup_READY {
t.Fatalf("backup state, got %v, want %v", respCheck.State, adminpb.Backup_READY)
}
if respCheck.SizeBytes == 0 {
t.Fatalf("backup size, got %d, want non-zero", respCheck.SizeBytes)
}
}
// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][readDDLStatements])
defer cleanup()
if !dpConfig.attemptDirectPath {
return
}
if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}
// Precondition: wait for DirectPath to connect.
countEnough := examineTraffic(ctx, client /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to observe RPCs over DirectPath")
}
// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeOrAllowDirectPath(t /*blackholeDP = */, true)
countEnough = examineTraffic(ctx, client /*blackholeDP = */, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
// Disable the blackhole, and client should use DirectPath again.
blackholeOrAllowDirectPath(t /*blackholeDP = */, false)
countEnough = examineTraffic(ctx, client /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}
func TestIntegration_Foreign_Key_Delete_Cascade_Action(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// create table with foreign key actions
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][fkdcDDLStatements])
defer cleanup()
var tests = []struct {
name string
test func() error
validate func()
wantErr error
}{
{
name: "add delete cascade constraint",
test: func() error {
constraintName := "FKShoppingCartsCustomerName"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
constraintName = `"FKShoppingCartsCustomerName"`
}
addConstraintDDL := fmt.Sprintf("ALTER TABLE ShoppingCarts ADD CONSTRAINT %s FOREIGN KEY (CustomerName) REFERENCES Customers(CustomerName) ON DELETE CASCADE", constraintName)
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{addConstraintDDL},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
return nil
},
validate: func() {
constraintName := `"FKShoppingCartsCustomerName"`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
constraintName = `'FKShoppingCartsCustomerName'`
}
got, err := readAll(client.Single().Query(ctx, Statement{
fmt.Sprintf(`SELECT 1, '', DELETE_RULE FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS WHERE CONSTRAINT_NAME = %s`, constraintName),
map[string]interface{}{}}))
if err != nil {
t.Fatalf("Expect to read the delete_rule from information_schema, got: %v", err)
}
if !testEqual(got, [][]interface{}{{int64(1), "", "CASCADE"}}) {
t.Error("DELETE_RULE is not CASCADE")
}
},
},
{
name: "drop delete cascade constraint",
test: func() error {
constraintName := "FKShoppingCartsCustomerName"
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
constraintName = `"FKShoppingCartsCustomerName"`
}
dropConstraintDDL := fmt.Sprintf("ALTER TABLE ShoppingCarts DROP CONSTRAINT %s", constraintName)
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{dropConstraintDDL},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
return nil
},
validate: func() {
constraintName := `"FKShoppingCartsCustomerName"`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
constraintName = `'FKShoppingCartsCustomerName'`
}
got, err := readAll(client.Single().Query(ctx, Statement{
fmt.Sprintf(`SELECT 1, '', DELETE_RULE FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS WHERE CONSTRAINT_NAME = %s`, constraintName),
map[string]interface{}{}}))
if err != nil {
t.Fatalf("Expect to read the delete_rule from information_schema, got: %v", err)
}
var want [][]interface{}
if !testEqual(got, want) {
t.Error("DELETE_RULE should be empty")
}
},
},
{
name: "success: insert a row in referencing table",
test: func() error {
// Populate the parent table.
columns := []string{"CustomerId", "CustomerName"}
var muts []*Mutation
for _, row := range [][]interface{}{
{1, "FKCustomer1"},
{2, "FKCustomer2"},
{3, "FKCustomer3"},
} {
muts = append(muts, Insert("Customers", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
return err
}
// Populate the referencing table.
columns = []string{"CartId", "CustomerId", "CustomerName"}
// Populate the parent table.
muts = []*Mutation{}
for _, row := range [][]interface{}{
{1, 1, "FKCustomer1"},
{2, 1, "FKCustomer1"},
{3, 2, "FKCustomer2"},
} {
muts = append(muts, Insert("ShoppingCarts", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
return err
}
return nil
},
validate: func() {},
},
{
name: "success: deleting a row in referenced table should delete all rows referencing it",
test: func() error {
_, err := client.Apply(ctx, []*Mutation{Delete("Customers", Key{1})})
return err
},
validate: func() {
if _, err := client.Single().ReadRow(ctx, "ShoppingCarts", Key{1}, []string{"CartId"}); err == nil {
t.Fatalf("expected to return not found error after deleting in referenced table")
}
if _, err := client.Single().ReadRow(ctx, "ShoppingCarts", Key{2}, []string{"CartId"}); err == nil {
t.Fatalf("expected to return not found error after deleting in referenced table")
}
},
},
{
name: "failure: conflicting insert and delete of referenced key",
test: func() error {
columns := []string{"CustomerId", "CustomerName"}
var muts []*Mutation
for _, row := range [][]interface{}{
{3, "FKCustomer3"},
} {
muts = append(muts, Insert("Customers", columns, row))
}
muts = append(muts, Delete("Customers", Key{3}))
_, err := client.Apply(ctx, muts)
return err
},
wantErr: spannerErrorf(codes.FailedPrecondition, "Cannot write a value for the referenced column `Customers.CustomerId` and delete it in the same transaction."),
validate: func() {},
},
{
name: "failure: insert in child table and deleting referenced key from parent table",
test: func() error {
columns := []string{"CartId", "CustomerId", "CustomerName"}
// Populate the parent table.
muts := []*Mutation{}
for _, row := range [][]interface{}{
{4, 3, "FKCustomer3"},
} {
muts = append(muts, Insert("ShoppingCarts", columns, row))
}
muts = append(muts, Delete("Customers", Key{3}))
_, err := client.Apply(ctx, muts)
return err
},
wantErr: spannerErrorf(codes.FailedPrecondition, "Foreign key constraint `FKShoppingCartsCustomerId` is violated on table `ShoppingCarts`. Cannot find referenced values in Customers(CustomerId)."),
validate: func() {},
},
{
name: "failure: insert a row in referencing table with reference key not present",
test: func() error {
// inset in the referencing table.
columns := []string{"CartId", "CustomerId", "CustomerName"}
muts := []*Mutation{}
for _, row := range [][]interface{}{
{1, 100, "FKCustomer1"},
} {
muts = append(muts, Insert("ShoppingCarts", columns, row))
}
if _, err := client.Apply(ctx, muts); err != nil {
return err
}
return nil
},
wantErr: spannerErrorf(codes.FailedPrecondition, "Foreign key constraint `FKShoppingCartsCustomerId` is violated on table `ShoppingCarts`. Cannot find referenced values in Customers(CustomerId)."),
validate: func() {},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotErr := tt.test()
// convert the error to lower case because resource names are in lower case for PG dialect.
if gotErr != nil && strings.ToLower(gotErr.Error()) != strings.ToLower(tt.wantErr.Error()) {
t.Errorf("FKDC error=%v, wantErr: %v", gotErr, tt.wantErr)
} else {
tt.validate()
}
})
}
}
func TestIntegration_GFE_Latency(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
setGFELatencyMetricsFlag(true)
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
_, err := client.Apply(ctx, ms)
if err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
t.Fatalf("Could not read row. Got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) > 0 {
return nil
}
}
return waitErr
})
var viewMap = map[string]bool{statsPrefix + "gfe_latency": false,
statsPrefix + "gfe_header_missing_count": false,
}
for {
if viewMap[statsPrefix+"gfe_latency"] || viewMap[statsPrefix+"gfe_header_missing_count"] {
break
}
select {
case stat := <-te.Stats:
if len(stat.Rows) == 0 {
t.Fatal("No metrics are exported")
}
if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" {
t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count")
} else {
viewMap[stat.View.Measure.Name()] = true
}
for _, row := range stat.Rows {
m := getTagMap(row.Tags)
checkCommonTagsGFELatency(t, m)
var data string
switch row.Data.(type) {
default:
data = fmt.Sprintf("%v", row.Data)
case *view.CountData:
data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value)
case *view.LastValueData:
data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value)
case *view.DistributionData:
data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count)
}
if got, want := fmt.Sprintf("%v", data), "0"; got <= want {
t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name())
}
}
case <-time.After(10 * time.Second):
if !viewMap[statsPrefix+"gfe_latency"] && !viewMap[statsPrefix+"gfe_header_missing_count"] {
t.Fatal("no stats were exported before timeout")
}
}
}
DisableGfeLatencyAndHeaderMissingCountViews()
}
func TestIntegration_Bit_Reversed_Sequence(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// create table with bit reverse seq options
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableBitReversedSeqStatements])
defer cleanup()
returningSQL := `THEN RETURN`
internalStateSQL := `GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
returningSQL = `RETURNING`
internalStateSQL = `spanner.get_internal_sequence_state('seqT')`
}
var tests = []struct {
name string
test func() error
wantErr error
}{
{
name: "success: inserted rows should have auto generated keys",
test: func() error {
var (
values [][]interface{}
err error
)
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES (100), (200), (300) "+returningSQL+" id, counter"))
values, err = readAllBitReversedSeqTable(iter, true)
return err
})
if len(values) != 3 {
return errors.New("expected 3 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 3; i++ {
newID, newCounter := values[i][0].(int64), values[i][1].(int64)
if newID <= 0 {
return errors.New("expected id1, id2, id3 > 0")
}
if newCounter < counter {
return errors.New("expected c3 >= c2 >= c1")
}
counter = newCounter
}
iter := client.Single().Query(ctx, NewStatement("SELECT "+internalStateSQL))
r, err := iter.Next()
if err != nil {
return err
}
var c3 int64
err = r.Columns(&c3)
if err != nil {
return err
}
if c3 > counter {
return errors.New("expected c3 <= SELECT GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)")
}
return err
},
},
{
name: "success: reduce ranges to half",
test: func() error {
ddl := `ALTER SEQUENCE seqT SET OPTIONS (
skip_range_min = 0,
skip_range_max = 4611686018427387904
)`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
ddl = `ALTER SEQUENCE seqT SKIP RANGE 0 4611686018427387904`
}
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{ddl},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
var values [][]interface{}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
var v string
for i := 1; i <= 100; i++ {
v = v + " (" + strconv.Itoa(i) + ")"
if i != 100 {
v = v + ", "
}
}
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES "+v+" "+returningSQL+" id, counter"))
values, err = readAllBitReversedSeqTable(iter, true)
return err
})
if err != nil {
return err
}
if len(values) != 100 {
return errors.New("expected 100 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 100; i++ {
newID, newCounter := values[i][0].(int64), values[i][1].(int64)
if newID <= 0 || newID < 4611686018427387904 {
return errors.New("expected d1, id2, id3, …., id100 > 4611686018427387904")
}
if newCounter < counter {
return errors.New("expected c3 >= c2 >= c1")
}
counter = newCounter
}
return err
},
},
{
name: "success: move start with counter forward",
test: func() error {
ddl := `ALTER SEQUENCE seqT SET OPTIONS (start_with_counter=10001)`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
ddl = `ALTER SEQUENCE seqT RESTART COUNTER WITH 10001`
}
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{ddl},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
var values [][]interface{}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES (4), (5), (6) "+returningSQL+" id, br_id, counter"))
values, err = readAllBitReversedSeqTable(iter, false)
return err
})
if err != nil {
return err
}
if len(values) != 3 {
return errors.New("expected 3 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 3; i++ {
newID, newBrID, newCounter := values[i][0].(int64), values[i][1].(int64), values[i][2].(int64)
if newID <= 0 {
return errors.New("expected id > 0")
}
if newBrID < 10001 {
return errors.New("expected br_idi >= 10001")
}
if newCounter < 10001 {
return errors.New("expected ci >= 10001")
}
if counter < newCounter {
counter = newCounter
}
}
iter := client.Single().Query(ctx, NewStatement("SELECT "+internalStateSQL))
r, err := iter.Next()
if err != nil {
return err
}
var c3 int64
err = r.Columns(&c3)
if err != nil {
return err
}
if c3 > counter {
return errors.New("expected max(ci) <= SELECT GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)")
}
return err
},
},
{
name: "success: drop the sequences",
test: func() error {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{
"ALTER TABLE T ALTER COLUMN id DROP DEFAULT",
"ALTER TABLE T ALTER COLUMN counter DROP DEFAULT",
"ALTER TABLE T DROP CONSTRAINT id_gt_0",
"ALTER TABLE T DROP CONSTRAINT counter_gt_br_id",
"ALTER TABLE T DROP CONSTRAINT br_id_true",
"DROP SEQUENCE seqT",
},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
return nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotErr := tt.test()
if gotErr != nil && strings.ToLower(gotErr.Error()) != strings.ToLower(tt.wantErr.Error()) {
t.Errorf("BIT REVERSED SEQUENECES error=%v, wantErr: %v", gotErr, tt.wantErr)
}
})
}
}
func TestIntegration_WithDirectedReadOptions_ReadOnlyTransaction(t *testing.T) {
t.Parallel()
// DirectedReadOptions for PG is supported, however we test only for Google SQL.
skipUnsupportedPGTest(t)
skipEmulatorTest(t)
ctxTimeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
directedReadOptions := &sppb.DirectedReadOptions{
Replicas: &sppb.DirectedReadOptions_IncludeReplicas_{
IncludeReplicas: &sppb.DirectedReadOptions_IncludeReplicas{
ReplicaSelections: []*sppb.DirectedReadOptions_ReplicaSelection{
{
Location: "us-west1",
Type: sppb.DirectedReadOptions_ReplicaSelection_READ_ONLY,
},
},
AutoFailoverDisabled: true,
},
},
}
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
// Try to write four rows through the Apply API.
for i, w := range writes {
var err error
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
}
want := [][]interface{}{{int64(1), "Marc", "Foo"}, {int64(3), "Alpha", "Beta"}, {int64(4), "Last", "End"}}
// Test DirectedReadOptions for ReadOnlyTransaction.ReadWithOptions
got, err := readAll(client.ReadOnlyTransaction().ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"},
&ReadOptions{DirectedReadOptions: directedReadOptions}))
if err != nil {
t.Errorf("DirectedReadOptions using ReadOptions returns error %v, want nil", err)
}
if !testEqual(got, want) {
t.Errorf("got unexpected result in DirectedReadOptions test: %v, want %v", got, want)
}
// Test DirectedReadOptions for ReadOnlyTransaction.QueryWithOptions
singersQuery := "SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@p1, @p2, @p3) ORDER BY SingerId"
got, err = readAll(client.Single().QueryWithOptions(ctx, Statement{
singersQuery,
map[string]interface{}{"p1": int64(1), "p2": int64(3), "p3": int64(4)},
}, QueryOptions{DirectedReadOptions: directedReadOptions}))
if err != nil {
t.Errorf("DirectedReadOptions using QueryOptions returns error %v, want nil", err)
}
if !testEqual(got, want) {
t.Errorf("got unexpected result in DirectedReadOptions test: %v, want %v", got, want)
}
}
func TestIntegration_WithDirectedReadOptions_ReadWriteTransaction_ShouldThrowError(t *testing.T) {
t.Parallel()
// DirectedReadOptions for PG is supported, however we test only for Google SQL.
skipUnsupportedPGTest(t)
skipEmulatorTest(t)
ctxTimeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][singerDDLStatements])
defer cleanup()
directedReadOptions := &sppb.DirectedReadOptions{
Replicas: &sppb.DirectedReadOptions_IncludeReplicas_{
IncludeReplicas: &sppb.DirectedReadOptions_IncludeReplicas{
ReplicaSelections: []*sppb.DirectedReadOptions_ReplicaSelection{
{
Location: "us-west1",
Type: sppb.DirectedReadOptions_ReplicaSelection_READ_ONLY,
},
},
AutoFailoverDisabled: true,
},
},
}
writes := []struct {
row []interface{}
ts time.Time
}{
{row: []interface{}{1, "Marc", "Foo"}},
{row: []interface{}{2, "Tars", "Bar"}},
{row: []interface{}{3, "Alpha", "Beta"}},
{row: []interface{}{4, "Last", "End"}},
}
// Try to write four rows through the Apply API.
for i, w := range writes {
var err error
m := InsertOrUpdate("Singers",
[]string{"SingerId", "FirstName", "LastName"},
w.row)
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
}
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) (err error) {
_, err = readAll(tx.ReadWithOptions(ctx, "Singers", KeySets(Key{1}, Key{3}, Key{4}), []string{"SingerId", "FirstName", "LastName"}, &ReadOptions{DirectedReadOptions: directedReadOptions}))
return err
})
if err == nil {
t.Fatal("expected err, got nil")
}
if msg, ok := matchError(err, codes.InvalidArgument, "Directed reads can only be performed in a read-only transaction"); !ok {
t.Fatal(msg)
}
}
// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
return prepareDBAndClient(ctx, t, spc, statements, testDialect)
}
func prepareIntegrationTestForPG(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
return prepareDBAndClient(ctx, t, spc, statements, adminpb.DatabaseDialect_POSTGRESQL)
}
func prepareDBAndClient(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string, dbDialect adminpb.DatabaseDialect) (*Client, string, func()) {
if databaseAdmin == nil {
t.Skip("Integration tests skipped")
}
// Construct a unique test DB name.
dbName := dbNameSpace.New()
dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", testProjectID, testInstanceID, dbName)
// Create database and tables.
req := &adminpb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%v/instances/%v", testProjectID, testInstanceID),
CreateStatement: "CREATE DATABASE " + dbName,
ExtraStatements: statements,
DatabaseDialect: dbDialect,
}
if dbDialect == adminpb.DatabaseDialect_POSTGRESQL {
req.ExtraStatements = []string{}
}
op, err := databaseAdmin.CreateDatabaseWithRetry(ctx, req)
if err != nil {
t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
}
if _, err := op.Wait(ctx); err != nil {
t.Fatalf("cannot create testing DB %v: %v", dbPath, err)
}
if dbDialect == adminpb.DatabaseDialect_POSTGRESQL && len(statements) > 0 {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: statements,
})
if err != nil {
t.Fatalf("cannot create testing table %v: %v", dbPath, err)
}
if err := op.Wait(ctx); err != nil {
t.Fatalf("timeout creating testing table %v: %v", dbPath, err)
}
}
client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
if err != nil {
t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
}
return client, dbPath, func() {
client.Close()
}
}
func cleanupInstances() {
if instanceAdmin == nil {
// Integration tests skipped.
return
}
ctx := context.Background()
parent := fmt.Sprintf("projects/%v", testProjectID)
iter := instanceAdmin.ListInstances(ctx, &instancepb.ListInstancesRequest{
Parent: parent,
Filter: "name:gotest-",
})
expireAge := 24 * time.Hour
for {
inst, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
panic(err)
}
_, instID, err := parseInstanceName(inst.Name)
if err != nil {
log.Printf("Error: %v\n", err)
continue
}
if instanceNameSpace.Older(instID, expireAge) {
log.Printf("Deleting instance %s", inst.Name)
deleteInstanceAndBackups(ctx, inst.Name)
}
}
}
func deleteInstanceAndBackups(ctx context.Context, instanceName string) {
// First delete any lingering backups that might have been left on
// the instance.
backups := databaseAdmin.ListBackups(ctx, &adminpb.ListBackupsRequest{Parent: instanceName})
for {
backup, err := backups.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Printf("failed to retrieve backups from instance %s because of error %v", instanceName, err)
break
}
if err := databaseAdmin.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{Name: backup.Name}); err != nil {
log.Printf("failed to delete backup %s (error %v)", backup.Name, err)
}
}
if err := instanceAdmin.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{Name: instanceName}); err != nil {
log.Printf("failed to delete instance %s (error %v), might need a manual removal",
instanceName, err)
}
}
func rangeReads(ctx context.Context, t *testing.T, client *Client) {
checkRange := func(ks KeySet, wantNums ...int) {
if msg, ok := compareRows(client.Single().Read(ctx, testTable, ks, testTableColumns), wantNums); !ok {
t.Errorf("key set %+v: %s", ks, msg)
}
}
checkRange(Key{"k1"}, 1)
checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedOpen}, 3, 4)
checkRange(KeyRange{Key{"k3"}, Key{"k5"}, ClosedClosed}, 3, 4, 5)
checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenClosed}, 4, 5)
checkRange(KeyRange{Key{"k3"}, Key{"k5"}, OpenOpen}, 4)
// Partial key specification.
checkRange(KeyRange{Key{"k7"}, Key{}, ClosedClosed}, 7, 8, 9)
checkRange(KeyRange{Key{"k7"}, Key{}, OpenClosed}, 8, 9)
checkRange(KeyRange{Key{}, Key{"k11"}, ClosedOpen}, 0, 1, 10)
checkRange(KeyRange{Key{}, Key{"k11"}, ClosedClosed}, 0, 1, 10, 11)
// The following produce empty ranges.
// TODO(jba): Consider a multi-part key to illustrate partial key behavior.
// checkRange(KeyRange{Key{"k7"}, Key{}, ClosedOpen})
// checkRange(KeyRange{Key{"k7"}, Key{}, OpenOpen})
// checkRange(KeyRange{Key{}, Key{"k11"}, OpenOpen})
// checkRange(KeyRange{Key{}, Key{"k11"}, OpenClosed})
// Prefix is component-wise, not string prefix.
checkRange(Key{"k1"}.AsPrefix(), 1)
checkRange(KeyRange{Key{"k1"}, Key{"k2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
}
func indexRangeReads(ctx context.Context, t *testing.T, client *Client) {
checkRange := func(ks KeySet, wantNums ...int) {
if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, testTableIndex, ks, testTableColumns),
wantNums); !ok {
t.Errorf("key set %+v: %s", ks, msg)
}
}
checkRange(Key{"v1"}, 1)
checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedOpen}, 3, 4)
checkRange(KeyRange{Key{"v3"}, Key{"v5"}, ClosedClosed}, 3, 4, 5)
checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenClosed}, 4, 5)
checkRange(KeyRange{Key{"v3"}, Key{"v5"}, OpenOpen}, 4)
// // Partial key specification.
checkRange(KeyRange{Key{"v7"}, Key{}, ClosedClosed}, 7, 8, 9)
checkRange(KeyRange{Key{"v7"}, Key{}, OpenClosed}, 8, 9)
checkRange(KeyRange{Key{}, Key{"v11"}, ClosedOpen}, 0, 1, 10)
checkRange(KeyRange{Key{}, Key{"v11"}, ClosedClosed}, 0, 1, 10, 11)
// // The following produce empty ranges.
// checkRange(KeyRange{Key{"v7"}, Key{}, ClosedOpen})
// checkRange(KeyRange{Key{"v7"}, Key{}, OpenOpen})
// checkRange(KeyRange{Key{}, Key{"v11"}, OpenOpen})
// checkRange(KeyRange{Key{}, Key{"v11"}, OpenClosed})
// // Prefix is component-wise, not string prefix.
checkRange(Key{"v1"}.AsPrefix(), 1)
checkRange(KeyRange{Key{"v1"}, Key{"v2"}, ClosedOpen}, 1, 10, 11, 12, 13, 14)
checkRange(AllKeys(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
// Read from an index with DESC ordering.
wantNums := []int{14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0}
if msg, ok := compareRows(client.Single().ReadUsingIndex(ctx, testTable, "TestTableByValueDesc", AllKeys(), testTableColumns),
wantNums); !ok {
t.Errorf("desc: %s", msg)
}
}
type testTableRow struct{ Key, StringValue string }
func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
rows, err := readAllTestTable(iter)
if err != nil {
return err.Error(), false
}
want := map[string]string{}
for _, n := range wantNums {
want[fmt.Sprintf("k%d", n)] = fmt.Sprintf("v%d", n)
}
got := map[string]string{}
for _, r := range rows {
got[r.Key] = r.StringValue
}
if !testEqual(got, want) {
return fmt.Sprintf("got %v, want %v", got, want), false
}
return "", true
}
func isNaN(x interface{}) bool {
f, ok := x.(float64)
if !ok {
return false
}
return math.IsNaN(f)
}
// createClient creates Cloud Spanner data client.
func createClient(ctx context.Context, dbPath string, config ClientConfig) (client *Client, err error) {
opts := grpcHeaderChecker.CallOptions()
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, config, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
}
return client, nil
}
func createClientWithRole(ctx context.Context, dbPath string, spc SessionPoolConfig, role string) (client *Client, err error) {
opts := grpcHeaderChecker.CallOptions()
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc, DatabaseRole: role}, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
}
return client, nil
}
// populate prepares the database with some data.
func populate(ctx context.Context, client *Client) error {
// Populate data
var err error
m := InsertMap("test", map[string]interface{}{
"a": str1,
"b": str2,
})
_, err = client.Apply(ctx, []*Mutation{m})
return err
}
func matchError(got error, wantCode codes.Code, wantMsgPart string) (string, bool) {
if ErrCode(got) != wantCode || !strings.Contains(strings.ToLower(ErrDesc(got)), strings.ToLower(wantMsgPart)) {
return fmt.Sprintf("got error <%v>\n"+`want <code = %q, "...%s...">`, got, wantCode, wantMsgPart), false
}
return "", true
}
func rowToValues(r *Row) ([]interface{}, error) {
var x int64
var y, z string
if err := r.Column(0, &x); err != nil {
return nil, err
}
if err := r.Column(1, &y); err != nil {
return nil, err
}
if err := r.Column(2, &z); err != nil {
return nil, err
}
return []interface{}{x, y, z}, nil
}
func readAll(iter *RowIterator) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
for {
row, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
v, err := rowToValues(row)
if err != nil {
return nil, err
}
vals = append(vals, v)
}
}
func readAllTestTable(iter *RowIterator) ([]testTableRow, error) {
defer iter.Stop()
var vals []testTableRow
for {
row, err := iter.Next()
if err == iterator.Done {
if iter.Metadata == nil {
// All queries should always return metadata, regardless whether
// they return any rows or not.
return nil, errors.New("missing metadata from query")
}
return vals, nil
}
if err != nil {
return nil, err
}
var ttr testTableRow
if err := row.ToStruct(&ttr); err != nil {
return nil, err
}
vals = append(vals, ttr)
}
}
func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
for {
row, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
var id, balance int64
var nickname string
err = row.Columns(&id, &nickname, &balance)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, nickname, balance})
}
}
func readAllBitReversedSeqTable(iter *RowIterator, onlyIDCounter bool) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
for {
row, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
var id, brID, counter int64
if onlyIDCounter {
err = row.Columns(&id, &counter)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, counter})
continue
}
err = row.Columns(&id, &brID, &counter)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, brID, counter})
}
}
func maxDuration(a, b time.Duration) time.Duration {
if a > b {
return a
}
return b
}
func isEmulatorEnvSet() bool {
return os.Getenv("SPANNER_EMULATOR_HOST") != ""
}
func skipEmulatorTest(t *testing.T) {
if isEmulatorEnvSet() {
t.Skip("Skipping testing against the emulator.")
}
}
func skipEmulatorTestForPG(t *testing.T) {
if isEmulatorEnvSet() && testDialect == adminpb.DatabaseDialect_POSTGRESQL {
t.Skip("Skipping PG testing against the emulator.")
}
}
func skipUnsupportedPGTest(t *testing.T) {
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
t.Skip("Skipping testing of unsupported tests in Postgres dialect.")
}
}
func onlyRunForPGTest(t *testing.T) {
if testDialect != adminpb.DatabaseDialect_POSTGRESQL {
t.Skip("Skipping tests supported only in Postgres dialect.")
}
}
func skipForPGTest(t *testing.T) {
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
t.Skip("Skipping tests non needed for Postgres dialect.")
}
}
func verifyDirectPathRemoteAddress(t *testing.T) {
t.Helper()
if !dpConfig.attemptDirectPath {
return
}
if remoteIP, res := isDirectPathRemoteAddress(); !res {
if dpConfig.directPathIPv4Only {
t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
} else {
t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
}
}
}
func isDirectPathRemoteAddress() (_ string, _ bool) {
remoteIP := peerInfo.Addr.String()
// DirectPath ipv4-only can only use ipv4 traffic.
if dpConfig.directPathIPv4Only {
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
}
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}
// examineTraffic counts RPCs use DirectPath or CFE traffic.
func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool {
var numCount uint64
const (
numRPCsToSend = 20
minCompleteRPC = 40
)
countEnough := false
start := time.Now()
for !countEnough && time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns))
if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP {
numCount++
}
time.Sleep(100 * time.Millisecond)
countEnough = numCount >= minCompleteRPC
}
}
return countEnough
}
func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) {
if dpConfig.directPathIPv4Only {
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
}
return
}
// DirectPath supports both ipv4 and ipv6
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}
func checkCommonTagsGFELatency(t *testing.T, m map[tag.Key]string) {
// We only check prefix because client ID increases if we create
// multiple clients for the same database.
if !strings.HasPrefix(m[tagKeyClientID], "client") {
t.Fatalf("Incorrect client ID: %v", m[tagKeyClientID])
}
if m[tagKeyLibVersion] != internal.Version {
t.Fatalf("Incorrect library version: %v", m[tagKeyLibVersion])
}
}