blob: 0bd9627c17c8e0b63c9d6bd95e9fcdc095db8fbf [file] [edit]
/*
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bigtable
import (
"context"
"encoding/base64"
"encoding/binary"
"flag"
"fmt"
"log"
"math"
"math/rand"
"os"
"os/exec"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
cryptorand "crypto/rand"
btapb "cloud.google.com/go/bigtable/admin/apiv2/adminpb"
"cloud.google.com/go/civil"
"cloud.google.com/go/iam"
"cloud.google.com/go/internal"
"cloud.google.com/go/internal/optional"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
timeUntilResourceCleanup = time.Hour * 12 // 12 hours
prefixOfInstanceResources = "bt-it-"
prefixOfClusterResources = "bt-c-"
maxCreateAttempts = 10
retryCreateSleep = 10 * time.Second
)
var (
// Backoffs: 10s 13s 16.9s 21.97s 28.561s 37.12s ......
retryCreateBackoff = gax.Backoff{
Initial: retryCreateSleep, // 10s
Max: time.Minute,
Multiplier: 1.30,
}
presidentsSocialGraph = map[string][]string{
"wmckinley": {"tjefferson"},
"gwashington": {"j§adams"},
"tjefferson": {"gwashington", "j§adams"},
"j§adams": {"gwashington", "tjefferson"},
}
clusterUIDSpace = uid.NewSpace(prefixOfClusterResources, &uid.Options{Short: true})
tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true})
myTableNameSpace = uid.NewSpace("mytable", &uid.Options{Short: true})
myOtherTableNameSpace = uid.NewSpace("myothertable", &uid.Options{Short: true})
)
/*
| | follows |
| _key |------------------------------------|
| | tjefferson | j§adams | gwashington |
|-------------|------------|---------|-------------|
| wmckinley | 1 | | |
| gwashington | | 1 | |
| tjefferson | | 1 | 1 |
| j§adams | 1 | | 1 |
*/
func populatePresidentsGraph(table *Table) error {
ctx := context.Background()
for rowKey, ss := range presidentsSocialGraph {
mut := NewMutation()
for _, name := range ss {
mut.Set("follows", name, 1000, []byte("1"))
}
if err := table.Apply(ctx, rowKey, mut); err != nil {
return fmt.Errorf("Mutating row %q: %v", rowKey, err)
}
}
return nil
}
func generateNewInstanceName() string {
return fmt.Sprintf("%v%d", prefixOfInstanceResources, time.Now().Unix())
}
var instanceToCreate string
func init() {
if runCreateInstanceTests {
instanceToCreate = generateNewInstanceName()
}
}
func TestMain(m *testing.M) {
flag.Parse()
env, err := NewIntegrationEnv()
if err != nil {
panic(fmt.Sprintf("there was an issue creating an integration env: %v", err))
}
c := env.Config()
if c.UseProd {
fmt.Printf(
"Note: when using prod, you must first create an instance:\n"+
"cbt createinstance %s %s %s %s %s SSD\n",
c.Instance, c.Instance,
c.Cluster, "us-central1-b", "1",
)
}
exit := m.Run()
if err := cleanup(c); err != nil {
log.Printf("Post-test cleanup failed: %v", err)
}
os.Exit(exit)
}
func cleanup(c IntegrationTestConfig) error {
// Cleanup resources marked with bt-it- after a time delay
if !c.UseProd {
return nil
}
ctx := context.Background()
iac, err := NewInstanceAdminClient(ctx, c.Project, c.ClientOpts...)
if err != nil {
return err
}
instances, err := iac.Instances(ctx)
if err != nil {
return err
}
for _, instanceInfo := range instances {
if strings.HasPrefix(instanceInfo.Name, prefixOfInstanceResources) {
timestamp := instanceInfo.Name[len(prefixOfInstanceResources):]
t, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return err
}
uT := time.Unix(t, 0)
if time.Now().After(uT.Add(timeUntilResourceCleanup)) {
iac.DeleteInstance(ctx, instanceInfo.Name)
}
} else {
// Delete clusters created in existing instances
clusters, err := iac.Clusters(ctx, instanceInfo.Name)
if err != nil {
return err
}
for _, clusterInfo := range clusters {
if strings.HasPrefix(clusterInfo.Name, prefixOfClusterResources) {
iac.DeleteCluster(ctx, instanceInfo.Name, clusterInfo.Name)
}
}
}
}
return nil
}
func TestIntegration_TieredStorage(t *testing.T) {
ctx := context.Background()
testEnv, _, adminClient, _, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support TieredStorage")
}
tableName := tableNameSpace.New()
conf := &TableConf{
TableID: tableName,
TieredStorageConfig: &TieredStorageConfig{
InfrequentAccess: &TieredStorageIncludeIfOlderThan{
Duration: 30 * 24 * time.Hour,
},
},
}
if err := adminClient.CreateTableFromConf(ctx, conf); err != nil {
t.Fatalf("CreateTableFromConf failed: %v", err)
}
defer adminClient.DeleteTable(ctx, tableName)
ti, err := adminClient.TableInfo(ctx, tableName)
if err != nil {
t.Fatalf("TableInfo failed: %v", err)
}
if ti.TieredStorageConfig == nil {
t.Fatal("TieredStorageConfig is nil")
}
rule, ok := ti.TieredStorageConfig.InfrequentAccess.(*TieredStorageIncludeIfOlderThan)
if !ok {
t.Fatalf("Unexpected rule type: %T", ti.TieredStorageConfig.InfrequentAccess)
}
if optional.ToDuration(rule.Duration) != 30*24*time.Hour {
t.Errorf("Unexpected IncludeIfOlderThan: %v, expected %v", optional.ToDuration(rule.Duration), 30*24*time.Hour)
}
// Update tiered storage config
newDuration := 45 * 24 * time.Hour
newConfig := TieredStorageConfig{
InfrequentAccess: &TieredStorageIncludeIfOlderThan{
Duration: newDuration,
},
}
if err := adminClient.UpdateTableWithTieredStorageConfig(ctx, tableName, &newConfig); err != nil {
t.Fatalf("UpdateTableWithTieredStorageConfig failed: %v", err)
}
ti, err = adminClient.TableInfo(ctx, tableName)
if err != nil {
t.Fatalf("TableInfo failed after update: %v", err)
}
rule, ok = ti.TieredStorageConfig.InfrequentAccess.(*TieredStorageIncludeIfOlderThan)
if !ok {
t.Fatalf("Unexpected rule type after update: %T", ti.TieredStorageConfig.InfrequentAccess)
}
if optional.ToDuration(rule.Duration) != newDuration {
t.Errorf("Unexpected IncludeIfOlderThan after update: %v, expected %v", optional.ToDuration(rule.Duration), newDuration)
}
// Remove tiered storage config
if err := adminClient.UpdateTableRemoveTieredStorageConfig(ctx, tableName); err != nil {
t.Fatalf("UpdateTableRemoveTieredStorageConfig failed: %v", err)
}
ti, err = adminClient.TableInfo(ctx, tableName)
if err != nil {
t.Fatalf("TableInfo failed after removal: %v", err)
}
if ti.TieredStorageConfig != nil {
t.Errorf("TieredStorageConfig should be nil after removal, got %+v", ti.TieredStorageConfig)
}
}
func TestIntegration_Pinger(t *testing.T) {
ctx := context.Background()
testEnv, client, _, _, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { cleanup() })
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support PingAndWarm")
}
if err := client.PingAndWarm(ctx); err != nil {
t.Fatalf("pinger failed. got %v, want %v", err, nil)
}
}
func TestIntegration_UpdateFamilyValueType(t *testing.T) {
ctx := context.Background()
_, _, adminClient, _, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
t.Cleanup(cleanup)
familyName := "new_family"
// Create a new column family
if err = createColumnFamily(ctx, t, adminClient, tableName, familyName, nil); err != nil {
t.Fatalf("Failed to create column family: %v", err)
}
// the type of the family is not aggregate
table, err := adminClient.getTable(ctx, tableName, btapb.Table_SCHEMA_VIEW)
if err != nil {
t.Fatalf("Failed to get table: %v", err)
}
family := table.GetColumnFamilies()[familyName]
if family.ValueType.GetAggregateType() != nil {
t.Fatalf("New column family cannot be aggregate type")
}
// Update column family type to string type should be successful
update := Family{
ValueType: StringType{
Encoding: StringUtf8BytesEncoding{},
},
}
err = retry(func() error { return adminClient.UpdateFamily(ctx, tableName, familyName, update) }, nil)
if err != nil {
t.Fatalf("Failed to update value type of family %s with current type %v: %v", familyName, family.ValueType, err)
}
// Get FamilyInfo to check if the type is updated
table, err = adminClient.getTable(ctx, tableName, btapb.Table_SCHEMA_VIEW)
if err != nil {
t.Fatalf("Failed to get table info: %v", err)
}
family = table.GetColumnFamilies()[familyName]
if !testutil.Equal(family.ValueType, update.ValueType.proto()) {
t.Fatalf("got %v, want %v", family.ValueType, update.ValueType.proto())
}
}
func TestIntegration_Aggregates(t *testing.T) {
ctx := context.Background()
_, _, ac, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
key := "some-key"
family := "sum"
column := "col"
mut := NewMutation()
mut.AddIntToCell(family, column, 1000, 5)
// Add 5 to empty cell.
if err := table.Apply(ctx, key, mut); err != nil {
t.Fatalf("Mutating row %q: %v", key, err)
}
row, err := table.ReadRow(ctx, key)
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
wantRow := Row{
family: []ReadItem{
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 5)},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}
// Add 5 again.
if err := table.Apply(ctx, key, mut); err != nil {
t.Fatalf("Mutating row %q: %v", key, err)
}
row, err = table.ReadRow(ctx, key)
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
wantRow = Row{
family: []ReadItem{
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 10)},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}
// Merge 5, which translates in the backend to adding 5 for sum column families.
mut2 := NewMutation()
mut2.MergeBytesToCell(family, column, 1000, binary.BigEndian.AppendUint64([]byte{}, 5))
if err := table.Apply(ctx, key, mut); err != nil {
t.Fatalf("Mutating row %q: %v", key, err)
}
row, err = table.ReadRow(ctx, key)
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
wantRow = Row{
family: []ReadItem{
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 15)},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}
err = ac.UpdateFamily(ctx, tableName, family, Family{ValueType: StringType{}})
if err == nil {
t.Fatalf("Expected UpdateFamily to fail, but it didn't")
}
wantError := "Immutable fields 'value_type.aggregate_type' cannot be updated"
if !strings.Contains(err.Error(), wantError) {
t.Errorf("Wrong error. Expected to containt %q but was %v", wantError, err)
}
}
func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
ctx := context.Background()
_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}
if err := createColumnFamily(ctx, t, adminClient, tableName, "ts", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
// Do highly concurrent reads/writes.
const maxConcurrency = 1000
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
switch r := rand.Intn(100); { // r ∈ [0,100)
case 0 <= r && r < 30:
// Do a read.
_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
if err != nil {
t.Errorf("Concurrent read: %v", err)
}
case 30 <= r && r < 100:
// Do a write.
mut := NewMutation()
mut.Set("ts", "col", 1000, []byte("data"))
if err := table.Apply(ctx, "testrow", mut); err != nil {
t.Errorf("Concurrent write: %v", err)
}
}
}()
}
wg.Wait()
}
func TestIntegration_NoopMetricsProvider(t *testing.T) {
ctx := context.Background()
testEnv, _, adminClient, _, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if testing.Short() || !testEnv.Config().UseProd {
t.Skip("Skip long running tests in short mode or non-prod environments")
}
family := "export"
if err := createColumnFamily(ctx, t, adminClient, tableName, family, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
noopClient, err := testEnv.NewClientWithConfig(ClientConfig{MetricsProvider: NoopMetricsProvider{}})
if err != nil {
t.Fatalf("NewClientWithConfig: %v", err)
}
noopTable := noopClient.Open(tableName)
for i := 0; i < 10; i++ {
mut := NewMutation()
mut.Set(family, "col", 1000, []byte("test"))
if err := noopTable.Apply(ctx, fmt.Sprintf("row-%v", i), mut); err != nil {
t.Fatalf("Apply: %v", err)
}
}
err = noopTable.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)
}
}
func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
// record start time
testStartTime := time.Now()
tsListStart := &timestamppb.Timestamp{
Seconds: testStartTime.Unix(),
Nanos: int32(testStartTime.Nanosecond()),
}
origSamplePeriod := defaultSamplePeriod
defaultSamplePeriod = 10 * time.Second
defer func() {
defaultSamplePeriod = origSamplePeriod
}()
ctx := context.Background()
testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if testing.Short() || !testEnv.Config().UseProd {
t.Skip("Skip long running tests in short mode or non-prod environments")
}
family := "export"
if err := createColumnFamily(ctx, t, adminClient, tableName, family, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
for i := 0; i < 10; i++ {
mut := NewMutation()
mut.Set(family, "col", 1000, []byte("test"))
if err := table.Apply(ctx, fmt.Sprintf("row-%v", i), mut); err != nil {
t.Fatalf("Apply: %v", err)
}
}
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)
}
// Validate that metrics are exported
elapsedTime := time.Since(testStartTime)
if elapsedTime < 2*defaultSamplePeriod {
// Ensure at least 2 datapoints are recorded
time.Sleep(2*defaultSamplePeriod - elapsedTime)
}
// Sleep some more
time.Sleep(5 * time.Second)
monitoringClient, err := monitoring.NewMetricClient(ctx, testEnv.Config().ClientOpts...)
if err != nil {
t.Errorf("Failed to create metric client: %v", err)
}
metricNamesValidate := []string{
metricNameOperationLatencies,
metricNameAttemptLatencies,
metricNameServerLatencies,
metricNameFirstRespLatencies,
}
// Try for 5m with 10s sleep between retries
testutil.Retry(t, 10, 5*time.Second, func(r *testutil.R) {
for _, metricName := range metricNamesValidate {
timeListEnd := time.Now()
tsListEnd := &timestamppb.Timestamp{
Seconds: timeListEnd.Unix(),
Nanos: int32(timeListEnd.Nanosecond()),
}
// ListTimeSeries can list only one metric type at a time.
// So, call ListTimeSeries with different metric names
iter := monitoringClient.ListTimeSeries(ctx, &monitoringpb.ListTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", testEnv.Config().Project),
Interval: &monitoringpb.TimeInterval{
StartTime: tsListStart,
EndTime: tsListEnd,
},
Filter: fmt.Sprintf("metric.type = starts_with(\"bigtable.googleapis.com/client/%v\")", metricName),
})
// Assert at least 1 datapoint was exported
_, err := iter.Next()
if err != nil {
r.Errorf("%v not exported\n", metricName)
}
}
})
}
func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
ctx := context.Background()
testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if testing.Short() {
t.Skip("Skip long running tests in short mode")
}
ts := uid.NewSpace("ts", &uid.Options{Short: true}).New()
if err := createColumnFamily(ctx, t, adminClient, tableName, ts, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
nonsense := []byte("lorem ipsum dolor sit amet, ")
fill(bigBytes, nonsense)
mut := NewMutation()
mut.Set(ts, "col", 1000, bigBytes)
if err := table.Apply(ctx, "bigrow", mut); err != nil {
t.Fatalf("Big write: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
r, err := table.ReadRow(ctx, "bigrow")
if err != nil {
t.Fatalf("Big read: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
wantRow := Row{ts: []ReadItem{
{Row: "bigrow", Column: fmt.Sprintf("%s:col", ts), Timestamp: 1000, Value: bigBytes},
}}
if !testutil.Equal(r, wantRow) {
t.Fatalf("Big read returned incorrect bytes: %v", r)
}
var wg sync.WaitGroup
// Now write 1000 rows, each with 82 KB values, then scan them all.
medBytes := make([]byte, 82<<10)
fill(medBytes, nonsense)
sem := make(chan int, 50) // do up to 50 mutations at a time.
for i := 0; i < 1000; i++ {
mut := NewMutation()
mut.Set(ts, "big-scan", 1000, medBytes)
row := fmt.Sprintf("row-%d", i)
wg.Add(1)
go func() {
defer wg.Done()
defer func() { <-sem }()
sem <- 1
if err := table.Apply(ctx, row, mut); err != nil {
t.Errorf("Preparing large scan: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
}()
}
wg.Wait()
n := 0
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
for _, ris := range r {
for _, ri := range ris {
n += len(ri.Value)
}
}
return true
}, RowFilter(ColumnFilter("big-scan")))
if err != nil {
t.Fatalf("Doing large scan: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
if want := 1000 * len(medBytes); n != want {
t.Fatalf("Large scan returned %d bytes, want %d", n, want)
}
// Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption.
rc := 0
wantRc := 3
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
rc++
return true
}, LimitRows(int64(wantRc)))
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(testEnv, t)
if rc != wantRc {
t.Fatalf("Scan with row end returned %d rows, want %d", rc, wantRc)
}
// Test bulk mutations
bulk := uid.NewSpace("bulk", &uid.Options{Short: true}).New()
if err := createColumnFamily(ctx, t, adminClient, tableName, bulk, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
bulkData := map[string][]string{
"red sox": {"2004", "2007", "2013"},
"patriots": {"2001", "2003", "2004", "2014"},
"celtics": {"1981", "1984", "1986", "2008"},
}
var rowKeys []string
var muts []*Mutation
for row, ss := range bulkData {
mut := NewMutation()
for _, name := range ss {
mut.Set(bulk, name, 1000, []byte("1"))
}
rowKeys = append(rowKeys, row)
muts = append(muts, mut)
}
status, err := table.ApplyBulk(ctx, rowKeys, muts)
if err != nil {
t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
}
verifyDirectPathRemoteAddress(testEnv, t)
if status != nil {
t.Fatalf("non-nil errors: %v", err)
}
// Read each row back
for rowKey, ss := range bulkData {
row, err := table.ReadRow(ctx, rowKey)
if err != nil {
t.Fatalf("Reading a bulk row: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
var wantItems []ReadItem
for _, val := range ss {
c := fmt.Sprintf("%s:%s", bulk, val)
wantItems = append(wantItems, ReadItem{Row: rowKey, Column: c, Timestamp: 1000, Value: []byte("1")})
}
wantRow := Row{bulk: wantItems}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}
}
// Test bulk write errors.
// Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error.
badMut := NewMutation()
badMut.Set("badfamily", "col", ServerTime, nil)
badMut2 := NewMutation()
badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1"))
status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2})
if err != nil {
t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err)
}
verifyDirectPathRemoteAddress(testEnv, t)
if status == nil {
t.Fatalf("No errors for bad bulk mutation")
} else if status[0] == nil || status[1] == nil {
t.Fatalf("No error for bad bulk mutation")
}
}
func TestIntegration_Presidents(t *testing.T) {
ctx := context.Background()
testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}
t.Run("PartialReadRows", func(t *testing.T) {
// Do a scan and stop part way through.
// Verify that the ReadRows callback doesn't keep running.
stopped := false
err = table.ReadRows(ctx, RowRange{}, func(r Row) bool {
if r.Key() < "h" {
return true
}
if !stopped {
stopped = true
return false
}
t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
return false
})
if err != nil {
t.Fatalf("Partial ReadRows: %v", err)
}
})
t.Run("ReadRowList", func(t *testing.T) {
// Read a RowList
var elt []string
keys := RowList{"wmckinley", "gwashington", "j§adams"}
want := "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,wmckinley-tjefferson-1"
err = table.ReadRows(ctx, keys, func(r Row) bool {
for _, ris := range r {
for _, ri := range ris {
elt = append(elt, formatReadItem(ri))
}
}
return true
})
if err != nil {
t.Fatalf("read RowList: %v", err)
}
if got := strings.Join(elt, ","); got != want {
t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
}
})
t.Run("ReadRowListReverse", func(t *testing.T) {
// Read a RowList
var elt []string
rowRange := NewOpenClosedRange("gwashington", "wmckinley")
want := "wmckinley-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1"
err = table.ReadRows(ctx, rowRange, func(r Row) bool {
for _, ris := range r {
for _, ri := range ris {
elt = append(elt, formatReadItem(ri))
}
}
return true
}, ReverseScan())
if err != nil {
t.Fatalf("read RowList: %v", err)
}
if got := strings.Join(elt, ","); got != want {
t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
}
})
t.Run("ConditionalMutations", func(t *testing.T) {
// Do a conditional mutation with a complex filter.
mutTrue := NewMutation()
mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
mut := NewCondMutation(filter, mutTrue, nil)
if err := table.Apply(ctx, "tjefferson", mut); err != nil {
t.Fatalf("Conditionally mutating row: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
// Do a second condition mutation with a filter that does not match,
// and thus no changes should be made.
mutTrue = NewMutation()
mutTrue.DeleteRow()
filter = ColumnFilter("snoop.dogg")
mut = NewCondMutation(filter, mutTrue, nil)
if err := table.Apply(ctx, "tjefferson", mut); err != nil {
t.Fatalf("Conditionally mutating row: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
// Fetch a row.
row, err := table.ReadRow(ctx, "j§adams")
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
wantRow := Row{
"follows": []ReadItem{
{Row: "j§adams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
{Row: "j§adams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}
})
t.Run("ReadModifyWrite", func(t *testing.T) {
if err := createColumnFamily(ctx, t, adminClient, tableName, "counter", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
appendRMW := func(b []byte) *ReadModifyWrite {
rmw := NewReadModifyWrite()
rmw.AppendValue("counter", "likes", b)
return rmw
}
incRMW := func(n int64) *ReadModifyWrite {
rmw := NewReadModifyWrite()
rmw.Increment("counter", "likes", n)
return rmw
}
rmwSeq := []struct {
desc string
rmw *ReadModifyWrite
want []byte
}{
{
desc: "append #1",
rmw: appendRMW([]byte{0, 0, 0}),
want: []byte{0, 0, 0},
},
{
desc: "append #2",
rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
},
{
desc: "increment",
rmw: incRMW(8),
want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
},
}
for _, step := range rmwSeq {
row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
if err != nil {
t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
}
verifyDirectPathRemoteAddress(testEnv, t)
// Make sure the modified cell returned by the RMW operation has a timestamp.
if row["counter"][0].Timestamp == 0 {
t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
}
clearTimestamps(row)
wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
if !testutil.Equal(row, wantRow) {
t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
}
}
// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
if err != nil {
t.Fatalf("ApplyReadModifyWrite null string: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
if err != nil {
t.Fatalf("ApplyReadModifyWrite null string: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
// Get only the correct row back on read.
r, err := table.ReadRow(ctx, "issue-723-1")
if err != nil {
t.Fatalf("Reading row: %v", err)
}
verifyDirectPathRemoteAddress(testEnv, t)
if r.Key() != "issue-723-1" {
t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
}
})
t.Run("DeleteRow", func(t *testing.T) {
// Delete a row and check it goes away.
mut := NewMutation()
mut.DeleteRow()
if err := table.Apply(ctx, "wmckinley", mut); err != nil {
t.Fatalf("Apply DeleteRow: %v", err)
}
row, err := table.ReadRow(ctx, "wmckinley")
if err != nil {
t.Fatalf("Reading a row after DeleteRow: %v", err)
}
if len(row) != 0 {
t.Fatalf("Read non-zero row after DeleteRow: %v", row)
}
})
}
func TestIntegration_FullReadStats(t *testing.T) {
ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
// Insert some data.
initialData := map[string][]string{
"wmckinley": {"tjefferson"},
"gwashington": {"j§adams"},
"tjefferson": {"gwashington", "j§adams", "wmckinley"},
"j§adams": {"gwashington", "tjefferson"},
}
for row, ss := range initialData {
mut := NewMutation()
for _, name := range ss {
mut.Set("follows", name, 1000, []byte("1"))
}
if err := table.Apply(ctx, row, mut); err != nil {
t.Fatalf("Mutating row %q: %v", row, err)
}
verifyDirectPathRemoteAddress(testEnv, t)
}
for _, test := range []struct {
desc string
rr RowSet
filter Filter // may be nil
limit ReadOption // may be nil
reverseScan bool
// We do the read and grab all the stats.
cellsReturnedCount int64
rowsReturnedCount int64
}{
{
desc: "read all, unfiltered",
rr: RowRange{},
cellsReturnedCount: 7,
rowsReturnedCount: 4,
},
{
desc: "read with InfiniteRange, unfiltered",
rr: InfiniteRange("tjefferson"),
cellsReturnedCount: 4,
rowsReturnedCount: 2,
},
{
desc: "read with NewRange, unfiltered",
rr: NewRange("gargamel", "hubbard"),
cellsReturnedCount: 1,
rowsReturnedCount: 1,
},
{
desc: "read with NewRange, no results",
rr: NewRange("zany", "zebra"), // no matches
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with PrefixRange, unfiltered",
rr: PrefixRange("j§ad"),
cellsReturnedCount: 2,
rowsReturnedCount: 1,
},
{
desc: "read with SingleRow, unfiltered",
rr: SingleRow("wmckinley"),
cellsReturnedCount: 1,
rowsReturnedCount: 1,
},
{
desc: "read all, with ColumnFilter",
rr: RowRange{},
filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
cellsReturnedCount: 4,
rowsReturnedCount: 4,
},
{
desc: "read all, with ColumnFilter, prefix",
rr: RowRange{},
filter: ColumnFilter("j"), // no matches
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read range, with ColumnRangeFilter",
rr: RowRange{},
filter: ColumnRangeFilter("follows", "h", "k"),
cellsReturnedCount: 2,
rowsReturnedCount: 2,
},
{
desc: "read range from empty, with ColumnRangeFilter",
rr: RowRange{},
filter: ColumnRangeFilter("follows", "", "u"),
cellsReturnedCount: 6,
rowsReturnedCount: 4,
},
{
desc: "read range from start to empty, with ColumnRangeFilter",
rr: RowRange{},
filter: ColumnRangeFilter("follows", "h", ""),
cellsReturnedCount: 5,
rowsReturnedCount: 4,
},
{
desc: "read with RowKeyFilter",
rr: RowRange{},
filter: RowKeyFilter(".*wash.*"),
cellsReturnedCount: 1,
rowsReturnedCount: 1,
},
{
desc: "read with RowKeyFilter unicode",
rr: RowRange{},
filter: RowKeyFilter(".*j§.*"),
cellsReturnedCount: 2,
rowsReturnedCount: 1,
},
{
desc: "read with RowKeyFilter escaped",
rr: RowRange{},
filter: RowKeyFilter(`.*j\xC2\xA7.*`),
cellsReturnedCount: 2,
rowsReturnedCount: 1,
},
{
desc: "read with RowKeyFilter, prefix",
rr: RowRange{},
filter: RowKeyFilter("gwash"),
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with RowKeyFilter, no matches",
rr: RowRange{},
filter: RowKeyFilter(".*xxx.*"),
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with FamilyFilter, no matches",
rr: RowRange{},
filter: FamilyFilter(".*xxx.*"),
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with ColumnFilter + row end",
rr: RowRange{},
filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
limit: LimitRows(2),
cellsReturnedCount: 2,
rowsReturnedCount: 2,
},
{
desc: "apply labels to the result rows",
rr: RowRange{},
filter: LabelFilter("test-label"),
limit: LimitRows(2),
cellsReturnedCount: 3,
rowsReturnedCount: 2,
},
{
desc: "read all, strip values",
rr: RowRange{},
filter: StripValueFilter(),
cellsReturnedCount: 7,
rowsReturnedCount: 4,
},
{
desc: "read with ColumnFilter + row end + strip values",
rr: RowRange{},
filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
limit: LimitRows(2),
cellsReturnedCount: 2,
rowsReturnedCount: 2,
},
{
desc: "read with condition, strip values on true",
rr: RowRange{},
filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil),
cellsReturnedCount: 7,
rowsReturnedCount: 4,
},
{
desc: "read with condition, strip values on false",
rr: RowRange{},
filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()),
cellsReturnedCount: 7,
rowsReturnedCount: 4,
},
{
desc: "read with ValueRangeFilter + row end",
rr: RowRange{},
filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
limit: LimitRows(2),
cellsReturnedCount: 3,
rowsReturnedCount: 2,
},
{
desc: "read with ValueRangeFilter, no match on exclusive end",
rr: RowRange{},
filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with ValueRangeFilter, no matches",
rr: RowRange{},
filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with InterleaveFilter, no matches on all filters",
rr: RowRange{},
filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")),
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "read with InterleaveFilter, no duplicate cells",
rr: RowRange{},
filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")),
cellsReturnedCount: 6,
rowsReturnedCount: 4,
},
{
desc: "read with InterleaveFilter, with duplicate cells",
rr: RowRange{},
filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")),
cellsReturnedCount: 4,
rowsReturnedCount: 2,
},
{
desc: "read with a RowRangeList and no filter",
rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")},
cellsReturnedCount: 2,
rowsReturnedCount: 2,
},
{
desc: "chain that excludes rows and matches nothing, in a condition",
rr: RowRange{},
filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil),
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "chain that ends with an interleave that has no match. covers #804",
rr: RowRange{},
filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
{
desc: "reverse read all, unfiltered",
rr: RowRange{},
reverseScan: true,
cellsReturnedCount: 7,
rowsReturnedCount: 4,
},
{
desc: "reverse read with InfiniteRange, unfiltered",
rr: InfiniteReverseRange("wmckinley"),
reverseScan: true,
cellsReturnedCount: 7,
rowsReturnedCount: 4,
},
} {
t.Run(test.desc, func(t *testing.T) {
var opts []ReadOption
if test.filter != nil {
opts = append(opts, RowFilter(test.filter))
}
if test.limit != nil {
opts = append(opts, test.limit)
}
if test.reverseScan {
opts = append(opts, ReverseScan())
}
// Define a callback for validating request stats.
callbackInvoked := false
statsValidator := WithFullReadStats(
func(stats *FullReadStats) {
if callbackInvoked {
t.Fatalf("The request stats callback was invoked more than once. It should be invoked exactly once.")
}
readStats := stats.ReadIterationStats
callbackInvoked = true
if readStats.CellsReturnedCount != test.cellsReturnedCount {
t.Errorf("CellsReturnedCount did not match. got: %d, want: %d",
readStats.CellsReturnedCount, test.cellsReturnedCount)
}
if readStats.RowsReturnedCount != test.rowsReturnedCount {
t.Errorf("RowsReturnedCount did not match. got: %d, want: %d",
readStats.RowsReturnedCount, test.rowsReturnedCount)
}
// We use lenient checks for CellsSeenCount and RowsSeenCount. Exact checks would be brittle.
// Note that the emulator and prod sometimes yield different values:
// - Sometimes prod scans fewer cells due to optimizations that allow prod to skip cells.
// - Sometimes prod scans more cells due to filters that must rescan cells.
// Similar issues apply for RowsSeenCount.
if got, want := readStats.CellsSeenCount, readStats.CellsReturnedCount; got < want {
t.Errorf("CellsSeenCount should be greater than or equal to CellsReturnedCount. got: %d < want: %d",
got, want)
}
if got, want := readStats.RowsSeenCount, readStats.RowsReturnedCount; got < want {
t.Errorf("RowsSeenCount should be greater than or equal to RowsReturnedCount. got: %d < want: %d",
got, want)
}
})
opts = append(opts, statsValidator)
err := table.ReadRows(ctx, test.rr, func(r Row) bool { return true }, opts...)
if err != nil {
t.Fatal(err)
}
if !callbackInvoked {
t.Fatalf("The request stats callback was not invoked. It should be invoked exactly once.")
}
verifyDirectPathRemoteAddress(testEnv, t)
})
}
}
func TestIntegration_SampleRowKeys(t *testing.T) {
ctx := context.Background()
testEnv, client, adminClient, _, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix())
if err := createPresplitTable(ctx, adminClient, presplitTable, []string{"follows"}); err != nil {
t.Fatal(err)
}
defer adminClient.DeleteTable(ctx, presplitTable)
cf := uid.NewSpace("follows", &uid.Options{Short: true}).New()
if err := createColumnFamily(ctx, t, adminClient, presplitTable, cf, nil); err != nil {
t.Fatal(err)
}
table := client.Open(presplitTable)
// Insert some data.
initialData := map[string][]string{
"wmckinley11": {"tjefferson11"},
"gwashington77": {"j§adams77"},
"tjefferson0": {"gwashington0", "j§adams0"},
}
for row, ss := range initialData {
mut := NewMutation()
for _, name := range ss {
mut.Set(cf, name, 1000, []byte("1"))
}
if err := table.Apply(ctx, row, mut); err != nil {
t.Fatalf("Mutating row %q: %v", row, err)
}
verifyDirectPathRemoteAddress(testEnv, t)
}
sampleKeys, err := table.SampleRowKeys(context.Background())
if err != nil {
t.Fatalf("%s: %v", "SampleRowKeys:", err)
}
if len(sampleKeys) == 0 {
t.Error("SampleRowKeys length 0")
}
}
// testing if deletionProtection works properly e.g. when set to Protected, column family and table cannot be deleted;
// then update the deletionProtection to Unprotected and check if deleting the column family and table works properly.
func TestIntegration_TableDeletionProtection(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
myTableName := myTableNameSpace.New()
tableConf := TableConf{
TableID: myTableName,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
DeletionProtection: Protected,
}
if err := createTableFromConf(ctx, adminClient, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}
table, err := adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.DeletionProtection != Protected {
t.Errorf("Expect table deletion protection to be enabled for table: %v", tableConf.TableID)
}
// Check if the deletion protection works properly
if err = adminClient.DeleteColumnFamily(ctx, tableConf.TableID, "fam1"); err == nil {
t.Errorf("We shouldn't be able to delete the column family fam1 when the deletion protection is enabled for table %v", myTableName)
}
if err = adminClient.DeleteTable(ctx, tableConf.TableID); err == nil {
t.Errorf("We shouldn't be able to delete the table when the deletion protection is enabled for table %v", myTableName)
}
if err := adminClient.UpdateTableWithDeletionProtection(ctx, tableConf.TableID, Unprotected); err != nil {
t.Fatalf("Update table from config: %v", err)
}
table, err = adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.DeletionProtection != Unprotected {
t.Errorf("Expect table deletion protection to be disabled for table: %v", tableConf.TableID)
}
if err := adminClient.DeleteColumnFamily(ctx, tableConf.TableID, "fam1"); err != nil {
t.Errorf("Delete column family does not work properly while deletion protection bit is disabled: %v", err)
}
if err = adminClient.DeleteTable(ctx, tableConf.TableID); err != nil {
t.Errorf("Deleting the table does not work properly while deletion protection bit is disabled: %v", err)
}
}
// testing if change stream works properly i.e. can create table with change
// stream and disable change stream on existing table and delete fails if change
// stream is enabled.
func TestIntegration_EnableChangeStream(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
changeStreamRetention, err := time.ParseDuration("24h")
if err != nil {
t.Fatalf("ChangeStreamRetention not valid: %v", err)
}
myTableName := myTableNameSpace.New()
tableConf := TableConf{
TableID: myTableName,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
ChangeStreamRetention: changeStreamRetention,
}
if err := createTableFromConf(ctx, adminClient, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}
table, err := adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.ChangeStreamRetention != changeStreamRetention {
t.Errorf("Expect table change stream to be enabled for table: %v has info: %v", tableConf.TableID, table)
}
// Update retention
changeStreamRetention, err = time.ParseDuration("70h")
if err != nil {
t.Fatalf("ChangeStreamRetention not valid: %v", err)
}
if err := adminClient.UpdateTableWithChangeStream(ctx, tableConf.TableID, changeStreamRetention); err != nil {
t.Fatalf("Update table from config: %v", err)
}
table, err = adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.ChangeStreamRetention != changeStreamRetention {
t.Errorf("Expect table change stream to be enabled for table: %v has info: %v", tableConf.TableID, table)
}
// Disable change stream
if err := adminClient.UpdateTableDisableChangeStream(ctx, tableConf.TableID); err != nil {
t.Fatalf("Update table from config: %v", err)
}
table, err = adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.ChangeStreamRetention != nil {
t.Errorf("Expect table change stream to be disabled for table: %v has info: %v", tableConf.TableID, table)
}
if err = adminClient.DeleteTable(ctx, tableConf.TableID); err != nil {
t.Errorf("Deleting the table failed when change stream is disabled: %v", err)
}
}
func equalOptionalDuration(a, b optional.Duration) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return int64(a.(time.Duration).Seconds()) == int64(b.(time.Duration).Seconds())
}
// Testing if automated backups works properly i.e.
// - Can create table with Automated Backups configured
// - Can update Automated Backup Policy on an existing table
// - Can disable Automated Backups on an existing table
func TestIntegration_AutomatedBackups(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support Automated Backups")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
retentionPeriod, err := time.ParseDuration("72h")
if err != nil {
t.Fatalf("RetentionPeriod not valid: %v", err)
}
frequency, err := time.ParseDuration("24h")
if err != nil {
t.Fatalf("Frequency not valid: %v", err)
}
automatedBackupPolicy := TableAutomatedBackupPolicy{RetentionPeriod: retentionPeriod, Frequency: frequency}
myTableName := myTableNameSpace.New()
tableConf := TableConf{
TableID: myTableName,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
AutomatedBackupConfig: &automatedBackupPolicy,
}
if err := createTableFromConf(ctx, adminClient, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}
defer deleteTable(ctx, t, adminClient, tableConf.TableID)
table, err := adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.AutomatedBackupConfig == nil {
t.Errorf("Expect Automated Backup Policy to be enabled for table: %v has info: %v", tableConf.TableID, table)
}
tableAbp := table.AutomatedBackupConfig.(*TableAutomatedBackupPolicy)
if !equalOptionalDuration(tableAbp.Frequency, automatedBackupPolicy.Frequency) {
t.Errorf("Expect automated backup policy frequency to be set for table: %v has info: %v", tableConf.TableID, table)
}
if !equalOptionalDuration(tableAbp.RetentionPeriod, automatedBackupPolicy.RetentionPeriod) {
t.Errorf("Expect automated backup policy retention period to be set for table: %v has info: %v", tableConf.TableID, table)
}
// Test update automated backup policy
retentionPeriod, err = time.ParseDuration("72h")
if err != nil {
t.Fatalf("RetentionPeriod not valid: %v", err)
}
frequency, err = time.ParseDuration("24h")
if err != nil {
t.Fatalf("Frequency not valid: %v", err)
}
for _, testcase := range []struct {
desc string
bkpPolicy TableAutomatedBackupPolicy
}{
{
desc: "Update automated backup policy, just frequency",
bkpPolicy: TableAutomatedBackupPolicy{Frequency: frequency},
},
{
desc: "Update automated backup policy, just retention period",
bkpPolicy: TableAutomatedBackupPolicy{RetentionPeriod: retentionPeriod},
},
{
desc: "Update automated backup policy, all fields",
bkpPolicy: TableAutomatedBackupPolicy{RetentionPeriod: retentionPeriod, Frequency: frequency},
},
} {
if gotErr := adminClient.UpdateTableWithAutomatedBackupPolicy(ctx, tableConf.TableID, testcase.bkpPolicy); err != nil {
t.Fatalf("%v: Update table from config: %v", testcase.desc, gotErr)
}
gotTable, gotErr := adminClient.TableInfo(ctx, tableConf.TableID)
if gotErr != nil {
t.Fatalf("%v: Getting table info: %v", testcase.desc, gotErr)
}
if gotTable.AutomatedBackupConfig == nil {
t.Errorf("%v: Expect Automated Backup Policy to be enabled for table: %v has info: %v", testcase.desc, tableConf.TableID, gotTable)
}
gotTableAbp := gotTable.AutomatedBackupConfig.(*TableAutomatedBackupPolicy)
if testcase.bkpPolicy.Frequency != nil && !equalOptionalDuration(gotTableAbp.Frequency, testcase.bkpPolicy.Frequency) {
t.Errorf("%v: Expect automated backup policy frequency to be set for table: %v has info: %v", testcase.desc, tableConf.TableID, table)
}
if testcase.bkpPolicy.RetentionPeriod != nil && !equalOptionalDuration(gotTableAbp.RetentionPeriod, testcase.bkpPolicy.RetentionPeriod) {
t.Errorf("%v: Expect automated backup policy retention period to be set for table: %v has info: %v", testcase.desc, tableConf.TableID, table)
}
}
// Test disable automated backups
if err := adminClient.UpdateTableDisableAutomatedBackupPolicy(ctx, tableConf.TableID); err != nil {
t.Fatalf("Update table from config: %v", err)
}
table, err = adminClient.TableInfo(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
if table.AutomatedBackupConfig != nil {
t.Errorf("Expect table automated backups to be disabled for table: %v has info: %v", tableConf.TableID, table)
}
}
func TestIntegration_CreateTableWithRowKeySchema(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support row key schema")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
testCases := []struct {
desc string
rks StructType
errorExpected bool
}{
{
desc: "Create fail with conflict family and row key column",
rks: StructType{
Fields: []StructField{{FieldName: "fam1", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
Encoding: StructOrderedCodeBytesEncoding{},
},
errorExpected: true,
},
{
desc: "Create fail with missing encoding in struct type",
rks: StructType{
Fields: []StructField{{FieldName: "myfield", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
},
errorExpected: true,
},
{
desc: "Create fail on DelimitedBytes missing delimiter",
rks: StructType{
Fields: []StructField{{FieldName: "myfield", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}}},
Encoding: StructDelimitedBytesEncoding{},
},
errorExpected: true,
},
{
desc: "Create with Singleton failed with more than 1 field",
rks: StructType{
Fields: []StructField{
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
{FieldName: "myfield2", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
},
Encoding: StructSingletonEncoding{},
},
errorExpected: true,
},
{
desc: "Create with Singleton ok",
rks: StructType{
Fields: []StructField{
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
},
Encoding: StructSingletonEncoding{},
},
},
{
desc: "Create with OrderedCode ok",
rks: StructType{
Fields: []StructField{
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
{FieldName: "myfield2", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
},
Encoding: StructOrderedCodeBytesEncoding{},
},
},
{
desc: "Create with DelimitedBytes ok",
rks: StructType{
Fields: []StructField{
{FieldName: "myfield1", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}},
{FieldName: "myfield2", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
},
Encoding: StructDelimitedBytesEncoding{
Delimiter: []byte{'#'},
},
},
},
}
for _, tc := range testCases {
myTableName := myTableNameSpace.New()
tableConf := TableConf{
TableID: myTableName,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
tableConf.RowKeySchema = &tc.rks
err := adminClient.CreateTableFromConf(ctx, &tableConf)
if tc.errorExpected && err == nil {
t.Fatalf("Want error from test: '%v', got nil", tc.desc)
}
if !tc.errorExpected && err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// get the table and see the new schema is updated
tbl, err := adminClient.TableInfo(ctx, tableConf.TableID)
if !tc.errorExpected && tbl.RowKeySchema == nil {
t.Errorf("Expecting row key schema %v to be created in table, got nil", tc.rks)
}
if tbl != nil {
// clean up table
err = adminClient.DeleteTable(ctx, tableConf.TableID)
if err != nil {
t.Fatalf("Unexpected error trying to clean up table: %v", err)
}
}
}
}
func TestIntegration_UpdateRowKeySchemaInTable(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support Automated Backups")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
testCases := []struct {
desc string
updateRks StructType
errorExpected bool
currentRks *StructType
}{
{
desc: "Update fail with conflicting family name",
updateRks: StructType{
Fields: []StructField{{FieldName: "fam1", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
Encoding: StructSingletonEncoding{},
},
errorExpected: true,
currentRks: nil,
},
{
desc: "Update fail for table with existing row key schema",
updateRks: StructType{
Fields: []StructField{{FieldName: "mycol", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
Encoding: StructSingletonEncoding{},
},
currentRks: &StructType{
Fields: []StructField{{FieldName: "myfirstcol", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}}},
Encoding: StructDelimitedBytesEncoding{Delimiter: []byte{'#'}},
},
errorExpected: true,
},
{
desc: "Update ok",
updateRks: StructType{
Fields: []StructField{
{FieldName: "myfield", FieldType: Int64Type{Encoding: BigEndianBytesEncoding{}}},
{FieldName: "myfield2", FieldType: StringType{Encoding: StringUtf8BytesEncoding{}}}},
Encoding: StructDelimitedBytesEncoding{
Delimiter: []byte{'#'},
},
},
currentRks: nil,
},
}
for _, tc := range testCases {
myTableName := myTableNameSpace.New()
tableConf := TableConf{
TableID: myTableName,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
},
}
if tc.currentRks != nil {
tableConf.RowKeySchema = tc.currentRks
}
if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil {
t.Fatalf("Unexpected error trying to create table: %v", err)
}
defer adminClient.DeleteTable(ctx, tableConf.TableID)
err = adminClient.UpdateTableWithRowKeySchema(ctx, tableConf.TableID, tc.updateRks)
if tc.errorExpected && err == nil {
t.Fatalf("Expecting error from test '%v', got nil", tc.desc)
}
if !tc.errorExpected && err != nil {
t.Fatalf("Unexpected error from test '%v': %v", tc.desc, err)
}
// Get the table to check if the schema is updated
tbl, err := adminClient.TableInfo(ctx, tableConf.TableID)
if !tc.errorExpected && tbl.RowKeySchema == nil {
t.Errorf("Expecting row key schema %v to be updated in table, got: %v", tc.updateRks, tbl)
}
// Clear schema ok
if err = adminClient.UpdateTableRemoveRowKeySchema(ctx, tableConf.TableID); err != nil {
t.Errorf("Unexpected error trying to clear row key schema: %v", err)
}
}
}
func TestIntegration_Admin(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
if iAdminClient != nil {
defer iAdminClient.Close()
iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
if err != nil {
t.Errorf("InstanceInfo: %v", err)
}
if iInfo.Name != adminClient.instance {
t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
}
}
list := func() []string {
tbls, err := adminClient.Tables(ctx)
if err != nil {
t.Fatalf("Fetching list of tables: %v", err)
}
sort.Strings(tbls)
return tbls
}
containsAll := func(got, want []string) bool {
gotSet := make(map[string]bool)
for _, s := range got {
gotSet[s] = true
}
for _, s := range want {
if !gotSet[s] {
return false
}
}
return true
}
myTableName := myTableNameSpace.New()
defer deleteTable(ctx, t, adminClient, myTableName)
if err := createTable(ctx, adminClient, myTableName); err != nil {
t.Fatalf("Creating table: %v", err)
}
myOtherTableName := myOtherTableNameSpace.New()
defer deleteTable(ctx, t, adminClient, myOtherTableName)
if err := createTable(ctx, adminClient, myOtherTableName); err != nil {
t.Fatalf("Creating table: %v", err)
}
if got, want := list(), []string{myOtherTableName, myTableName}; !containsAll(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
}
must(adminClient.WaitForReplication(ctx, myTableName))
if err := adminClient.DeleteTable(ctx, myOtherTableName); err != nil {
t.Fatalf("Deleting table: %v", err)
}
tables := list()
if got, want := tables, []string{myTableName}; !containsAll(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
}
if got, unwanted := tables, []string{myOtherTableName}; containsAll(got, unwanted) {
t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
}
uniqueID := make([]byte, 4)
rand.Read(uniqueID)
tableID := fmt.Sprintf("conftable%x", uniqueID)
tblConf := TableConf{
TableID: tableID,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Getting table info: %v", err)
}
sort.Strings(tblInfo.Families)
wantFams := []string{"fam1", "fam2"}
if !testutil.Equal(tblInfo.Families, wantFams) {
t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
}
// Populate mytable and drop row ranges
if err = createColumnFamily(ctx, t, adminClient, myTableName, "cf", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
client, err := testEnv.NewClient()
if err != nil {
t.Fatalf("NewClient: %v", err)
}
defer client.Close()
tbl := client.Open(myTableName)
prefixes := []string{"a", "b", "c"}
for _, prefix := range prefixes {
for i := 0; i < 5; i++ {
mut := NewMutation()
mut.Set("cf", "col", 1000, []byte("1"))
if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
t.Fatalf("Mutating row: %v", err)
}
}
}
if err = adminClient.DropRowRange(ctx, myTableName, "a"); err != nil {
t.Errorf("DropRowRange a: %v", err)
}
if err = adminClient.DropRowRange(ctx, myTableName, "c"); err != nil {
t.Errorf("DropRowRange c: %v", err)
}
if err = adminClient.DropRowRange(ctx, myTableName, "x"); err != nil {
t.Errorf("DropRowRange x: %v", err)
}
var gotRowCount int
must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
gotRowCount++
if !strings.HasPrefix(row.Key(), "b") {
t.Errorf("Invalid row after dropping range: %v", row)
}
return true
}))
if gotRowCount != 5 {
t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
}
if err = adminClient.DropAllRows(ctx, myTableName); err != nil {
t.Errorf("DropAllRows mytable: %v", err)
}
gotRowCount = 0
must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
gotRowCount++
return true
}))
if gotRowCount != 0 {
t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0)
}
// Validate Encryption Info configured to default. (not supported by emulator)
if testEnv.Config().UseProd {
encryptionInfo, err := adminClient.EncryptionInfo(ctx, myTableName)
if err != nil {
t.Fatalf("EncryptionInfo: %v", err)
}
wantLen := 1
if testEnv.Config().Cluster2 != "" {
wantLen++
}
if got, want := len(encryptionInfo), wantLen; !cmp.Equal(got, want) {
t.Fatalf("Number of Clusters with Encryption Info: %v, want: %v", got, want)
}
clusterEncryptionInfo := encryptionInfo[testEnv.Config().Cluster][0]
if clusterEncryptionInfo.KMSKeyVersion != "" {
t.Errorf("Encryption Info mismatch, got %v, want %v", clusterEncryptionInfo.KMSKeyVersion, 0)
}
if clusterEncryptionInfo.Type != GoogleDefaultEncryption {
t.Errorf("Encryption Info mismatch, got %v, want %v", clusterEncryptionInfo.Type, GoogleDefaultEncryption)
}
}
}
func TestIntegration_TableIam(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support IAM Policy creation")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
myTableName := myTableNameSpace.New()
defer deleteTable(ctx, t, adminClient, myTableName)
if err := createTable(ctx, adminClient, myTableName); err != nil {
t.Fatalf("Creating table: %v", err)
}
// Verify that the IAM Controls work for Tables.
iamHandle := adminClient.TableIAM(myTableName)
p, err := iamHandle.Policy(ctx)
if err != nil {
t.Fatalf("Iam GetPolicy mytable: %v", err)
}
if err = iamHandle.SetPolicy(ctx, p); err != nil {
t.Errorf("Iam SetPolicy mytable: %v", err)
}
if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
t.Errorf("Iam TestPermissions mytable: %v", err)
}
}
func TestIntegration_BackupIAM(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support IAM Policy creation")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
table := testEnv.Config().Table
cluster := testEnv.Config().Cluster
defer deleteTable(ctx, t, adminClient, table)
if err := createTable(ctx, adminClient, table); err != nil {
t.Fatalf("Creating table: %v", err)
}
// Create backup.
opts := &uid.Options{Sep: '_'}
backupUUID := uid.NewSpace("backup", opts)
backup := backupUUID.New()
defer adminClient.DeleteBackup(ctx, cluster, backup)
if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
iamHandle := adminClient.BackupIAM(cluster, backup)
// Get backup policy.
p, err := iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
// The resource is new, so the policy should be empty.
if got := p.Roles(); len(got) > 0 {
t.Errorf("got roles %v, want none", got)
}
// Set backup policy.
member := "domain:google.com"
// Add a member, set the policy, then check that the member is present.
p.Add(member, iam.Viewer)
if err = iamHandle.SetPolicy(ctx, p); err != nil {
t.Errorf("iamHandle.SetPolicy: %v", err)
}
p, err = iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
}
// Test backup permissions.
permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
_, err = iamHandle.TestPermissions(ctx, permissions)
if err != nil {
t.Errorf("iamHandle.TestPermissions: %v", err)
}
}
func TestIntegration_AuthorizedViewIAM(t *testing.T) {
t.Parallel()
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support IAM Policy creation")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
table := tableNameSpace.New()
defer deleteTable(ctx, t, adminClient, table)
if err := createTable(ctx, adminClient, table); err != nil {
t.Fatalf("Creating table: %v", err)
}
// Create authorized view.
opts := &uid.Options{Sep: '_'}
authorizedViewUUID := uid.NewSpace("authorizedView", opts)
authorizedView := authorizedViewUUID.New()
defer adminClient.DeleteAuthorizedView(ctx, table, authorizedView)
if err = adminClient.CreateAuthorizedView(ctx, &AuthorizedViewConf{
TableID: table,
AuthorizedViewID: authorizedView,
AuthorizedView: &SubsetViewConf{},
DeletionProtection: Unprotected,
}); err != nil {
t.Fatalf("Creating authorizedView: %v", err)
}
iamHandle := adminClient.AuthorizedViewIAM(table, authorizedView)
// Get authorized view policy.
p, err := iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
// The resource is new, so the policy should be empty.
if got := p.Roles(); len(got) > 0 {
t.Errorf("got roles %v, want none", got)
}
// Set authorized view policy.
member := "domain:google.com"
// Add a member, set the policy, then check that the member is present.
p.Add(member, iam.Viewer)
if err = iamHandle.SetPolicy(ctx, p); err != nil {
t.Errorf("iamHandle.SetPolicy: %v", err)
}
p, err = iamHandle.Policy(ctx)
if err != nil {
t.Errorf("iamHandle.Policy: %v", err)
}
if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
}
// Test authorized view permissions.
permissions := []string{"bigtable.authorizedViews.get", "bigtable.authorizedViews.update"}
_, err = iamHandle.TestPermissions(ctx, permissions)
if err != nil {
t.Errorf("iamHandle.TestPermissions: %v", err)
}
}
func TestIntegration_AdminCreateInstance(t *testing.T) {
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance creation testing")
}
instanceToCreate += "0"
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support instance creation")
}
timeout := 7 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
clusterID := instanceToCreate + "-cluster"
// Create a development instance
conf := &InstanceConf{
InstanceId: instanceToCreate,
ClusterId: clusterID,
DisplayName: "test instance",
Zone: instanceToCreateZone,
InstanceType: DEVELOPMENT,
Labels: map[string]string{"test-label-key": "test-label-value"},
}
// CreateInstance can be flaky; retry before marking as failing.
if err := createInstance(ctx, iAdminClient, conf); err != nil {
t.Fatalf("CreateInstance: %v", err)
}
defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
if err != nil {
t.Fatalf("InstanceInfo: %v", err)
}
// Basic return values are tested elsewhere, check instance type
if iInfo.InstanceType != DEVELOPMENT {
t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
}
if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
t.Fatalf("Labels: %v, want: %v", got, want)
}
// Update everything we can about the instance in one call.
confWithClusters := &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
DisplayName: "new display name",
InstanceType: PRODUCTION,
Labels: map[string]string{"new-label-key": "new-label-value"},
Clusters: []ClusterConfig{
{ClusterID: clusterID, NumNodes: 5},
},
}
if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
t.Fatalf("UpdateInstanceWithClusters: %v", err)
}
iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
if err != nil {
t.Fatalf("InstanceInfo: %v", err)
}
if iInfo.InstanceType != PRODUCTION {
t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
}
if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
t.Fatalf("Labels: %v, want: %v", got, want)
}
if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
t.Fatalf("Display name: %q, want: %q", got, want)
}
cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
if cInfo.ServeNodes != 5 {
t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
}
if cInfo.KMSKeyName != "" {
t.Fatalf("KMSKeyName: %v, want: %v", cInfo.KMSKeyName, "")
}
}
func TestIntegration_AdminEncryptionInfo(t *testing.T) {
t.Skip("flaky - https://github.com/googleapis/google-cloud-go/issues/11268")
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance creation testing")
}
instanceToCreate += "1"
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support instance creation")
}
// adjust test environment to use our cluster to create
c := testEnv.Config()
c.Instance = instanceToCreate
testEnv, err = NewProdEnv(c)
if err != nil {
t.Fatalf("NewProdEnv: %v", err)
}
timeout := 10 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
table := instanceToCreate + "-table"
clusterID := instanceToCreate + "-cluster"
keyRingName := os.Getenv("GCLOUD_TESTS_BIGTABLE_KEYRING")
if keyRingName == "" {
// try to fall back on GOLANG keyring
keyRingName = os.Getenv("GCLOUD_TESTS_GOLANG_KEYRING")
if keyRingName == "" {
t.Fatal("GCLOUD_TESTS_BIGTABLE_KEYRING or GCLOUD_TESTS_GOLANG_KEYRING must be set. See CONTRIBUTING.md for details")
}
}
kmsKeyName := keyRingName + "/cryptoKeys/key1"
conf := &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
DisplayName: "test instance",
Clusters: []ClusterConfig{
{
ClusterID: clusterID,
KMSKeyName: kmsKeyName,
Zone: instanceToCreateZone,
NumNodes: 1,
},
},
}
defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
err = retry(func() error { return iAdminClient.CreateInstanceWithClusters(ctx, conf) },
func() error { return iAdminClient.DeleteInstance(ctx, conf.InstanceID) })
if err != nil {
t.Fatalf("CreateInstanceWithClusters: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, table)
if err := createTable(ctx, adminClient, table); err != nil {
t.Fatalf("Creating table: %v", err)
}
var encryptionKeyVersion string
// The encryption info can take 30-500s (currently about 120-190s) to
// become ready.
for i := 0; i < 50; i++ {
encryptionInfo, err := adminClient.EncryptionInfo(ctx, table)
if err != nil {
t.Fatalf("EncryptionInfo: %v", err)
}
encryptionKeyVersion = encryptionInfo[clusterID][0].KMSKeyVersion
if encryptionKeyVersion != "" {
break
}
time.Sleep(time.Second * 2)
}
if encryptionKeyVersion == "" {
t.Fatalf("Encryption Key not created within allotted time end")
}
// Validate Encryption Info under getTable
table2, err := adminClient.getTable(ctx, table, btapb.Table_ENCRYPTION_VIEW)
if err != nil {
t.Fatalf("Getting Table: %v", err)
}
if got, want := len(table2.ClusterStates), 1; !cmp.Equal(got, want) {
t.Fatalf("Table Cluster States %v, want: %v", got, want)
}
clusterState := table2.ClusterStates[clusterID]
if got, want := len(clusterState.EncryptionInfo), 1; !cmp.Equal(got, want) {
t.Fatalf("Table Encryption Info Length: %v, want: %v", got, want)
}
tableEncInfo := clusterState.EncryptionInfo[0]
if got, want := int(tableEncInfo.EncryptionStatus.Code), 0; !cmp.Equal(got, want) {
t.Fatalf("EncryptionStatus: %v, want: %v", got, want)
}
// NOTE: this EncryptionType is btapb.EncryptionInfo_EncryptionType
if got, want := tableEncInfo.EncryptionType, btapb.EncryptionInfo_CUSTOMER_MANAGED_ENCRYPTION; !cmp.Equal(got, want) {
t.Fatalf("EncryptionType: %v, want: %v", got, want)
}
if got, want := tableEncInfo.KmsKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) {
t.Fatalf("KMS Key Version: %v, want: %v", got, want)
}
// Validate Encryption Info retrieved via EncryptionInfo
encryptionInfo, err := adminClient.EncryptionInfo(ctx, table)
if err != nil {
t.Fatalf("EncryptionInfo: %v", err)
}
if got, want := len(encryptionInfo), 1; !cmp.Equal(got, want) {
t.Fatalf("Number of Clusters with Encryption Info: %v, want: %v", got, want)
}
encryptionInfos := encryptionInfo[clusterID]
if got, want := len(encryptionInfos), 1; !cmp.Equal(got, want) {
t.Fatalf("Encryption Info Length: %v, want: %v", got, want)
}
if len(encryptionInfos) != 1 {
t.Fatalf("Expected Single EncryptionInfo")
}
v := encryptionInfos[0]
if got, want := int(v.Status.Code), 0; !cmp.Equal(got, want) {
t.Fatalf("EncryptionStatus: %v, want: %v", got, want)
}
// NOTE: this EncryptionType is EncryptionType
if got, want := v.Type, CustomerManagedEncryption; !cmp.Equal(got, want) {
t.Fatalf("EncryptionType: %v, want: %v", got, want)
}
if got, want := v.KMSKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) {
t.Fatalf("KMS Key Version: %v, want: %v", got, want)
}
// Validate CMEK on Cluster Info
cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
if got, want := cInfo.KMSKeyName, kmsKeyName; !cmp.Equal(got, want) {
t.Fatalf("KMSKeyName: %v, want: %v", got, want)
}
// Create a backup with CMEK enabled, verify backup encryption info
backupName := "backupCMEK"
defer adminClient.DeleteBackup(ctx, clusterID, backupName)
if err = adminClient.CreateBackup(ctx, table, clusterID, backupName, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
backup, err := adminClient.BackupInfo(ctx, clusterID, backupName)
if err != nil {
t.Fatalf("BackupInfo: %v", backup)
}
if got, want := backup.EncryptionInfo.Type, CustomerManagedEncryption; !cmp.Equal(got, want) {
t.Fatalf("Backup Encryption EncryptionType: %v, want: %v", got, want)
}
if got, want := backup.EncryptionInfo.KMSKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) {
t.Fatalf("Backup Encryption KMSKeyVersion: %v, want: %v", got, want)
}
if got, want := int(backup.EncryptionInfo.Status.Code), 2; !cmp.Equal(got, want) {
t.Fatalf("Backup EncryptionStatus: %v, want: %v", got, want)
}
}
func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) {
// Check the environments
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance creation testing")
}
instanceToCreate += "2"
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support instance creation")
}
// Create an instance admin client
timeout := 7 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
// Create a test instance
conf := &InstanceConf{
InstanceId: instanceToCreate,
ClusterId: instanceToCreate + "-cluster",
DisplayName: "test instance",
InstanceType: DEVELOPMENT,
Zone: instanceToCreateZone,
}
defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
if err := createInstance(ctx, iAdminClient, conf); err != nil {
t.Fatalf("CreateInstance: %v", err)
}
// Check the created test instances
iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
if err != nil {
t.Fatalf("InstanceInfo: %v", err)
}
if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
t.Fatalf("Labels: %v, want: %v", got, want)
}
// Test patterns to update instance labels
tests := []struct {
name string
in map[string]string
out map[string]string
}{
{
name: "update labels",
in: map[string]string{"test-label-key": "test-label-value"},
out: map[string]string{"test-label-key": "test-label-value"},
},
{
name: "update multiple labels",
in: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
},
{
name: "not update existing labels",
in: nil, // nil map
out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"},
},
{
name: "delete labels",
in: map[string]string{}, // empty map
out: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
confWithClusters := &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
Labels: tt.in,
}
if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil {
t.Fatalf("UpdateInstanceWithClusters: %v", err)
}
iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
if err != nil {
t.Fatalf("InstanceInfo: %v", err)
}
if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) {
t.Fatalf("Labels: %v, want: %v", got, want)
}
})
}
}
func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) {
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance update testing")
}
instanceToCreate += "3"
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support instance creation")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
clusterID := clusterUIDSpace.New()
// Create a development instance
conf := &InstanceConf{
InstanceId: instanceToCreate,
ClusterId: clusterID,
DisplayName: "test instance",
Zone: instanceToCreateZone,
InstanceType: DEVELOPMENT,
Labels: map[string]string{"test-label-key": "test-label-value"},
}
defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
if err := createInstance(ctx, iAdminClient, conf); err != nil {
t.Fatalf("CreateInstance: %v", err)
}
iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate)
if err != nil {
t.Fatalf("InstanceInfo: %v", err)
}
// Basic return values are tested elsewhere, check instance type
if iInfo.InstanceType != DEVELOPMENT {
t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType)
}
if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) {
t.Fatalf("Labels: %v, want: %v", got, want)
}
// Update everything we can about the instance in one call.
confWithClusters := &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
DisplayName: "new display name",
InstanceType: PRODUCTION,
Labels: map[string]string{"new-label-key": "new-label-value"},
Clusters: []ClusterConfig{
{ClusterID: clusterID, NumNodes: 5},
},
}
results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
if err != nil {
t.Fatalf("UpdateInstanceAndSyncClusters: %v", err)
}
wantResults := UpdateInstanceResults{
InstanceUpdated: true,
UpdatedClusters: []string{clusterID},
}
if diff := testutil.Diff(*results, wantResults); diff != "" {
t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
}
iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate)
if err != nil {
t.Fatalf("InstanceInfo: %v", err)
}
if iInfo.InstanceType != PRODUCTION {
t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType)
}
if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) {
t.Fatalf("Labels: %v, want: %v", got, want)
}
if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want {
t.Fatalf("Display name: %q, want: %q", got, want)
}
cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
if cInfo.ServeNodes != 5 {
t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5)
}
// Now add a second cluster as the only change. The first cluster must also be provided so
// it is not removed.
clusterID2 := clusterUIDSpace.New()
confWithClusters = &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
Clusters: []ClusterConfig{
{ClusterID: clusterID},
{ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2},
},
}
results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
if err != nil {
t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
}
wantResults = UpdateInstanceResults{
InstanceUpdated: false,
CreatedClusters: []string{clusterID2},
}
if diff := testutil.Diff(*results, wantResults); diff != "" {
t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
}
// Now update one cluster and delete the other
confWithClusters = &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
Clusters: []ClusterConfig{
{ClusterID: clusterID, NumNodes: 4},
},
}
results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters)
if err != nil {
t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err)
}
wantResults = UpdateInstanceResults{
InstanceUpdated: false,
UpdatedClusters: []string{clusterID},
DeletedClusters: []string{clusterID2},
}
if diff := testutil.Diff(*results, wantResults); diff != "" {
t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff)
}
// Make sure the instance looks as we would expect
clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId)
if err != nil {
t.Fatalf("Clusters: %v", err)
}
if len(clusters) != 1 {
t.Fatalf("Clusters length %v, want: 1", len(clusters))
}
wantCluster := &ClusterInfo{
Name: clusterID,
Zone: instanceToCreateZone,
ServeNodes: 4,
State: "READY",
}
if diff := testutil.Diff(clusters[0], wantCluster); diff != "" {
t.Fatalf("InstanceEquality: got - want +\n%s", diff)
}
}
func TestIntegration_Autoscaling(t *testing.T) {
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance update testing")
}
instanceToCreate += "4"
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support instance creation")
}
timeout := 5 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
clusterID := instanceToCreate + "-cluster"
t.Log("creating an instance with autoscaling ON (Min = 3, Max = 4)")
conf := &InstanceConf{
InstanceId: instanceToCreate,
ClusterId: clusterID,
DisplayName: "test instance",
Zone: instanceToCreateZone,
InstanceType: PRODUCTION,
AutoscalingConfig: &AutoscalingConfig{
MinNodes: 3,
MaxNodes: 4,
CPUTargetPercent: 60,
},
}
defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
if err := createInstance(ctx, iAdminClient, conf); err != nil {
t.Fatalf("CreateInstance: %v", err)
}
cluster, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
wantNodes := 3
if gotNodes := cluster.ServeNodes; gotNodes != wantNodes {
t.Fatalf("want cluster nodes = %v, got = %v", wantNodes, gotNodes)
}
wantMin := 3
if gotMin := cluster.AutoscalingConfig.MinNodes; gotMin != wantMin {
t.Fatalf("want cluster autoscaling min = %v, got = %v", wantMin, gotMin)
}
wantMax := 4
if gotMax := cluster.AutoscalingConfig.MaxNodes; gotMax != wantMax {
t.Fatalf("want cluster autoscaling max = %v, got = %v", wantMax, gotMax)
}
wantCPU := 60
if gotCPU := cluster.AutoscalingConfig.CPUTargetPercent; gotCPU != wantCPU {
t.Fatalf("want cluster autoscaling CPU target = %v, got = %v", wantCPU, gotCPU)
}
serveNodes := 1
t.Logf("setting autoscaling OFF and setting serve nodes to %v", serveNodes)
err = retry(
func() error {
return iAdminClient.UpdateCluster(ctx, instanceToCreate, clusterID, int32(serveNodes))
}, nil)
if err != nil {
t.Fatalf("UpdateCluster: %v", err)
}
cluster, err = iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
wantNodes = 1
if gotNodes := cluster.ServeNodes; gotNodes != wantNodes {
t.Fatalf("want cluster nodes = %v, got = %v", wantNodes, gotNodes)
}
if gotAsc := cluster.AutoscalingConfig; gotAsc != nil {
t.Fatalf("want cluster autoscaling = nil, got = %v", gotAsc)
}
ac := AutoscalingConfig{
MinNodes: 3,
MaxNodes: 4,
CPUTargetPercent: 80,
}
t.Logf("setting autoscaling ON (Min = %v, Max = %v)", ac.MinNodes, ac.MaxNodes)
err = iAdminClient.SetAutoscaling(ctx, instanceToCreate, clusterID, ac)
if err != nil {
t.Fatalf("SetAutoscaling: %v", err)
}
cluster, err = iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
wantMin = ac.MinNodes
if gotMin := cluster.AutoscalingConfig.MinNodes; gotMin != wantMin {
t.Fatalf("want cluster autoscaling min = %v, got = %v", wantMin, gotMin)
}
wantMax = ac.MaxNodes
if gotMax := cluster.AutoscalingConfig.MaxNodes; gotMax != wantMax {
t.Fatalf("want cluster autoscaling max = %v, got = %v", wantMax, gotMax)
}
wantCPU = ac.CPUTargetPercent
if gotCPU := cluster.AutoscalingConfig.CPUTargetPercent; gotCPU != wantCPU {
t.Fatalf("want cluster autoscaling CPU target = %v, got = %v", wantCPU, gotCPU)
}
}
// instanceAdminClientMock is used to test FailedLocations field processing.
type instanceAdminClientMock struct {
Clusters []*btapb.Cluster
UnavailableLocations []string
// Imbedding the interface allows test writers to override just the methods
// that are interesting for a test and ignore the rest.
btapb.BigtableInstanceAdminClient
}
func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) {
res := btapb.ListClustersResponse{
Clusters: iacm.Clusters,
FailedLocations: iacm.UnavailableLocations,
}
return &res, nil
}
func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support snapshots")
}
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
cluster1 := btapb.Cluster{Name: "cluster1"}
failedLoc := "euro1"
iAdminClient.iClient = &instanceAdminClientMock{
Clusters: []*btapb.Cluster{&cluster1},
UnavailableLocations: []string{failedLoc},
}
cis, err := iAdminClient.Clusters(context.Background(), "instance-id")
convertedErr, ok := err.(ErrPartiallyUnavailable)
if !ok {
t.Fatalf("want error ErrPartiallyUnavailable, got other")
}
if got, want := len(convertedErr.Locations), 1; got != want {
t.Fatalf("want %v failed locations, got %v", want, got)
}
if got, want := convertedErr.Locations[0], failedLoc; got != want {
t.Fatalf("want failed location %v, got %v", want, got)
}
if got, want := len(cis), 1; got != want {
t.Fatalf("want %v failed locations, got %v", want, got)
}
if got, want := cis[0].Name, cluster1.Name; got != want {
t.Fatalf("want cluster %v, got %v", want, got)
}
}
func TestIntegration_Granularity(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
list := func() []string {
tbls, err := adminClient.Tables(ctx)
if err != nil {
t.Fatalf("Fetching list of tables: %v", err)
}
sort.Strings(tbls)
return tbls
}
containsAll := func(got, want []string) bool {
gotSet := make(map[string]bool)
for _, s := range got {
gotSet[s] = true
}
for _, s := range want {
if !gotSet[s] {
return false
}
}
return true
}
myTableName := myTableNameSpace.New()
defer deleteTable(ctx, t, adminClient, myTableName)
if err := createTable(ctx, adminClient, myTableName); err != nil {
t.Fatalf("Creating table: %v", err)
}
tables := list()
if got, want := tables, []string{myTableName}; !containsAll(got, want) {
t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
}
// calling ModifyColumnFamilies to check the granularity of table
prefix := adminClient.instancePrefix()
req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + myTableName,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "cf",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
}},
}
table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
if err != nil {
t.Fatalf("Creating column family: %v", err)
}
if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) {
t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS))
}
}
func TestIntegration_InstanceAdminClient_CreateAppProfile(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
if iAdminClient == nil {
return
}
defer iAdminClient.Close()
profileIDPrefix := "app_profile_id"
uniqueID := make([]byte, 4)
wantProfiles := map[string]struct{}{"default": {}}
gotProfiles := []*btapb.AppProfile{}
for _, testcase := range []struct {
desc string
profileConf ProfileConf
wantProfile *btapb.AppProfile
}{
{
desc: "SingleClusterRouting",
profileConf: ProfileConf{
RoutingPolicy: SingleClusterRouting,
ClusterID: testEnv.Config().Cluster,
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "MultiClusterRouting",
profileConf: ProfileConf{
RoutingPolicy: MultiClusterRouting,
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_MultiClusterRoutingUseAny_{
MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "MultiClusterRoutingUseAnyConfig no affinity",
profileConf: ProfileConf{
RoutingConfig: &MultiClusterRoutingUseAnyConfig{
ClusterIDs: []string{testEnv.Config().Cluster},
},
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_MultiClusterRoutingUseAny_{
MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{
ClusterIds: []string{testEnv.Config().Cluster},
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "MultiClusterRoutingUseAnyConfig row affinity",
profileConf: ProfileConf{
RoutingConfig: &MultiClusterRoutingUseAnyConfig{
ClusterIDs: []string{testEnv.Config().Cluster},
Affinity: &RowAffinity{},
},
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_MultiClusterRoutingUseAny_{
MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{
ClusterIds: []string{testEnv.Config().Cluster},
Affinity: &btapb.AppProfile_MultiClusterRoutingUseAny_RowAffinity_{},
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "SingleClusterRoutingConfig no Isolation",
profileConf: ProfileConf{
RoutingConfig: &SingleClusterRoutingConfig{
ClusterID: testEnv.Config().Cluster,
AllowTransactionalWrites: true,
},
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
AllowTransactionalWrites: true,
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "SingleClusterRoutingConfig and low priority standard Isolation",
profileConf: ProfileConf{
RoutingConfig: &SingleClusterRoutingConfig{
ClusterID: testEnv.Config().Cluster,
AllowTransactionalWrites: true,
},
Isolation: &StandardIsolation{
Priority: AppProfilePriorityLow,
},
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
AllowTransactionalWrites: true,
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_LOW,
},
},
},
},
{
desc: "SingleClusterRoutingConfig and DataBoost Isolation HostPays ComputeBillingOwner",
profileConf: ProfileConf{
RoutingConfig: &SingleClusterRoutingConfig{
ClusterID: testEnv.Config().Cluster,
},
Isolation: &DataBoostIsolationReadOnly{
ComputeBillingOwner: HostPays,
},
},
wantProfile: &btapb.AppProfile{
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
},
},
Isolation: &btapb.AppProfile_DataBoostIsolationReadOnly_{
DataBoostIsolationReadOnly: &btapb.AppProfile_DataBoostIsolationReadOnly{
ComputeBillingOwner: ptr(btapb.AppProfile_DataBoostIsolationReadOnly_HOST_PAYS),
},
},
},
},
} {
t.Run(testcase.desc, func(t *testing.T) {
cryptorand.Read(uniqueID)
profileID := fmt.Sprintf("%s%x", profileIDPrefix, uniqueID)
testcase.profileConf.ProfileID = profileID
testcase.profileConf.InstanceID = adminClient.instance
testcase.profileConf.Description = testcase.desc
_, err := iAdminClient.CreateAppProfile(ctx, testcase.profileConf)
if err != nil {
t.Fatalf("Creating app profile: %v", err)
}
gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, profileID)
if err != nil {
t.Fatalf("Get app profile: %v", err)
}
gotProfiles = append(gotProfiles, gotProfile)
defer func() {
err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, profileID)
if err != nil {
t.Fatalf("Delete app profile: %v", err)
}
}()
testcase.wantProfile.Name = appProfilePath(testEnv.Config().Project, adminClient.instance, profileID)
testcase.wantProfile.Description = testcase.desc
if !proto.Equal(testcase.wantProfile, gotProfile) {
t.Fatalf("profile: got: %s, want: %s", gotProfile, testcase.wantProfile)
}
wantProfiles[profileID] = struct{}{}
})
}
}
func TestIntegration_InstanceAdminClient_UpdateAppProfile(t *testing.T) {
testEnv, gotErr := NewIntegrationEnv()
if gotErr != nil {
t.Fatalf("IntegrationEnv: %v", gotErr)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, gotErr := testEnv.NewAdminClient()
if gotErr != nil {
t.Fatalf("NewAdminClient: %v", gotErr)
}
defer adminClient.Close()
iAdminClient, gotErr := testEnv.NewInstanceAdminClient()
if gotErr != nil {
t.Fatalf("NewInstanceAdminClient: %v", gotErr)
}
if iAdminClient == nil {
return
}
defer iAdminClient.Close()
uniqueID := make([]byte, 4)
rand.Read(uniqueID)
profileID := fmt.Sprintf("app_profile_id%x", uniqueID)
profile := ProfileConf{
ProfileID: profileID,
InstanceID: adminClient.instance,
ClusterID: testEnv.Config().Cluster,
Description: "creating new app profile 1",
RoutingPolicy: SingleClusterRouting,
}
createdProfile, gotErr := iAdminClient.CreateAppProfile(ctx, profile)
if gotErr != nil {
t.Fatalf("Creating app profile: %v", gotErr)
}
gotProfile, gotErr := iAdminClient.GetAppProfile(ctx, adminClient.instance, profileID)
if gotErr != nil {
t.Fatalf("Get app profile: %v", gotErr)
}
defer func() {
gotErr = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, profileID)
if gotErr != nil {
t.Fatalf("Delete app profile: %v", gotErr)
}
}()
if !proto.Equal(createdProfile, gotProfile) {
t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
}
list := func(instanceID string) ([]*btapb.AppProfile, error) {
profiles := []*btapb.AppProfile(nil)
it := iAdminClient.ListAppProfiles(ctx, instanceID)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
profiles = append(profiles, s)
}
return profiles, gotErr
}
profiles, gotErr := list(adminClient.instance)
if gotErr != nil {
t.Fatalf("List app profile: %v", gotErr)
}
// Ensure the profiles we require exist. profiles ⊂ allProfiles
verifyProfilesSubset := func(allProfiles []*btapb.AppProfile, profiles map[string]struct{}) {
for _, profile := range allProfiles {
segs := strings.Split(profile.Name, "/")
delete(profiles, segs[len(segs)-1])
}
if len(profiles) > 0 {
t.Fatalf("Initial app profile list missing profile: %v : %v", profiles, allProfiles)
}
}
// App Profile list should contain default, app_profile1
wantProfiles := map[string]struct{}{"default": {}, profileID: {}}
verifyProfilesSubset(profiles, wantProfiles)
for _, test := range []struct {
desc string
uattrs ProfileAttrsToUpdate
wantProfile *btapb.AppProfile
wantErrMsg string
skip bool
}{
{
desc: "empty update",
uattrs: ProfileAttrsToUpdate{},
wantErrMsg: "A non-empty 'update_mask' must be specified",
},
{
desc: "empty description update",
uattrs: ProfileAttrsToUpdate{Description: ""},
wantProfile: &btapb.AppProfile{
Name: gotProfile.Name,
Description: "",
RoutingPolicy: gotProfile.RoutingPolicy,
Etag: gotProfile.Etag,
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "routing update SingleClusterRouting",
uattrs: ProfileAttrsToUpdate{
RoutingPolicy: SingleClusterRouting,
ClusterID: testEnv.Config().Cluster,
},
wantProfile: &btapb.AppProfile{
Name: gotProfile.Name,
Description: "",
Etag: gotProfile.Etag,
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "routing only update MultiClusterRoutingUseAnyConfig",
uattrs: ProfileAttrsToUpdate{
RoutingConfig: &MultiClusterRoutingUseAnyConfig{
ClusterIDs: []string{testEnv.Config().Cluster},
},
},
wantProfile: &btapb.AppProfile{
Name: gotProfile.Name,
Etag: gotProfile.Etag,
RoutingPolicy: &btapb.AppProfile_MultiClusterRoutingUseAny_{
MultiClusterRoutingUseAny: &btapb.AppProfile_MultiClusterRoutingUseAny{
ClusterIds: []string{testEnv.Config().Cluster},
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "routing only update SingleClusterRoutingConfig",
uattrs: ProfileAttrsToUpdate{
RoutingConfig: &SingleClusterRoutingConfig{
ClusterID: testEnv.Config().Cluster,
},
},
wantProfile: &btapb.AppProfile{
Name: gotProfile.Name,
Etag: gotProfile.Etag,
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
},
},
Isolation: &btapb.AppProfile_StandardIsolation_{
StandardIsolation: &btapb.AppProfile_StandardIsolation{
Priority: btapb.AppProfile_PRIORITY_HIGH,
},
},
},
},
{
desc: "isolation only update DataBoost",
uattrs: ProfileAttrsToUpdate{
Isolation: &DataBoostIsolationReadOnly{
ComputeBillingOwner: HostPays,
},
},
wantProfile: &btapb.AppProfile{
Name: gotProfile.Name,
Etag: gotProfile.Etag,
RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
ClusterId: testEnv.Config().Cluster,
},
},
Isolation: &btapb.AppProfile_DataBoostIsolationReadOnly_{
DataBoostIsolationReadOnly: &btapb.AppProfile_DataBoostIsolationReadOnly{
ComputeBillingOwner: ptr(btapb.AppProfile_DataBoostIsolationReadOnly_HOST_PAYS),
},
},
},
skip: true,
},
} {
if test.skip {
t.Logf("skipping test: %s", test.desc)
continue
}
gotErr = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, profileID, test.uattrs)
if gotErr == nil && test.wantErrMsg != "" {
t.Fatalf("%s: UpdateAppProfile: got: nil, want: error: %v", test.desc, test.wantErrMsg)
}
if gotErr != nil && test.wantErrMsg == "" {
t.Fatalf("%s: UpdateAppProfile: got: %v, want: nil", test.desc, gotErr)
}
if gotErr != nil {
continue
}
// Retry to see if the update has been completed
testutil.Retry(t, 10, 10*time.Second, func(r *testutil.R) {
got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, profileID)
if !proto.Equal(got, test.wantProfile) {
r.Errorf("%s: got profile: %v,\n want profile: %v", test.desc, gotProfile, test.wantProfile)
}
})
}
}
func TestIntegration_NodeScalingFactor(t *testing.T) {
if instanceToCreate == "" {
t.Skip("instanceToCreate not set, skipping instance update testing")
}
instanceToCreate += "5"
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support instance creation")
}
timeout := 10 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
clusterID := instanceToCreate + "-cluster"
wantNodeScalingFactor := NodeScalingFactor2X
t.Log("creating an instance with node scaling factor")
conf := &InstanceWithClustersConfig{
InstanceID: instanceToCreate,
DisplayName: "test instance",
Clusters: []ClusterConfig{
{
ClusterID: clusterID,
NumNodes: 2,
NodeScalingFactor: wantNodeScalingFactor,
Zone: instanceToCreateZone,
},
},
}
defer iAdminClient.DeleteInstance(ctx, instanceToCreate)
err = retry(func() error { return iAdminClient.CreateInstanceWithClusters(ctx, conf) },
func() error { return iAdminClient.DeleteInstance(ctx, conf.InstanceID) })
if err != nil {
t.Fatalf("CreateInstanceWithClusters: %v", err)
}
cluster, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID)
if err != nil {
t.Fatalf("GetCluster: %v", err)
}
if gotNodeScalingFactor := cluster.NodeScalingFactor; gotNodeScalingFactor != wantNodeScalingFactor {
t.Fatalf("NodeScalingFactor: got: %v, want: %v", gotNodeScalingFactor, wantNodeScalingFactor)
}
}
func TestIntegration_InstanceUpdate(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
timeout := 2 * time.Second
if testEnv.Config().UseProd {
timeout = 5 * time.Minute
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
if iAdminClient == nil {
return
}
defer iAdminClient.Close()
iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
if err != nil {
t.Errorf("InstanceInfo: %v", err)
}
if iInfo.Name != adminClient.instance {
t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
}
if iInfo.DisplayName != adminClient.instance {
t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.DisplayName, adminClient.instance)
}
const numNodes = 4
// update cluster nodes
if err := retry(
func() error {
return iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes))
}, nil); err != nil {
t.Errorf("UpdateCluster: %v", err)
}
// get cluster after updating
cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
if err != nil {
t.Errorf("GetCluster %v", err)
}
if cis.ServeNodes != int(numNodes) {
t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
}
}
func createRandomInstance(ctx context.Context, iAdminClient *InstanceAdminClient) (string, string, error) {
newConf := InstanceConf{
InstanceId: generateNewInstanceName(),
ClusterId: clusterUIDSpace.New(),
DisplayName: "different test sourceInstance",
Zone: instanceToCreateZone2,
InstanceType: DEVELOPMENT,
Labels: map[string]string{"test-label-key-diff": "test-label-value-diff"},
}
err := createInstance(ctx, iAdminClient, &newConf)
return newConf.InstanceId, newConf.ClusterId, err
}
func createInstance(ctx context.Context, iAdminClient *InstanceAdminClient, iConf *InstanceConf) error {
return retry(func() error { return iAdminClient.CreateInstance(ctx, iConf) },
func() error { return iAdminClient.DeleteInstance(ctx, iConf.InstanceId) },
)
}
func TestIntegration_AdminCopyBackup(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support backups")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// Create source clients
srcAdminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer srcAdminClient.Close()
srcIAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer srcIAdminClient.Close()
// Create table
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
defer deleteTable(ctx, t, srcAdminClient, tblConf.TableID)
if err := createTableFromConf(ctx, srcAdminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Create source backup
copyBackupUID := uid.NewSpace(prefixOfInstanceResources, &uid.Options{})
backupUID := uid.NewSpace(prefixOfInstanceResources, &uid.Options{})
srcBackupName := backupUID.New()
srcCluster := testEnv.Config().Cluster
defer srcAdminClient.DeleteBackup(ctx, srcCluster, srcBackupName)
if err = srcAdminClient.CreateBackup(ctx, tblConf.TableID, srcCluster, srcBackupName, time.Now().Add(100*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
wantSourceBackup := srcAdminClient.instancePrefix() + "/clusters/" + srcCluster + "/backups/" + srcBackupName
destProj1 := testEnv.Config().Project
destProj1Inst1 := testEnv.Config().Instance // 1st instance in 1st destination project
destProj1Inst1Cl1 := srcCluster // 1st cluster in 1st instance in 1st destination project
type testcase struct {
desc string
destProject string
destInstance string
destCluster string
}
testcases := []testcase{
{
desc: "Copy backup to same project, same instance, same cluster",
destProject: destProj1,
destInstance: destProj1Inst1,
destCluster: destProj1Inst1Cl1,
},
}
for _, testcase := range testcases {
// Create destination client
destCtx, destOpts, err := testEnv.AdminClientOptions()
if err != nil {
t.Fatalf("%v: AdminClientOptions: %v", testcase.desc, err)
}
desc := testcase.desc
destProject := testcase.destProject
destInstance := testcase.destInstance
destCluster := testcase.destCluster
destAdminClient, err := NewAdminClient(destCtx, destProject, destInstance, destOpts...)
if err != nil {
t.Fatalf("%v: NewAdminClient: %v", desc, err)
}
defer destAdminClient.Close()
// Copy Backup
destBackupName := copyBackupUID.New()
defer destAdminClient.DeleteBackup(destCtx, destCluster, destBackupName)
err = srcAdminClient.CopyBackup(destCtx, srcCluster, srcBackupName, destProject, destInstance, destCluster,
destBackupName, time.Now().Add(24*time.Hour))
if err != nil {
t.Fatalf("%v: CopyBackup: %v", desc, err)
}
// Verify source backup field in backup info
gotBackupInfo, err := destAdminClient.BackupInfo(ctx, destCluster, destBackupName)
if err != nil {
t.Fatalf("%v: BackupInfo: %v", desc, err)
}
if gotBackupInfo.SourceBackup != wantSourceBackup {
t.Fatalf("%v: SourceBackup: got: %v, want: %v", desc, gotBackupInfo.SourceBackup, wantSourceBackup)
}
}
}
func TestIntegration_AdminBackup(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support backups")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
sourceInstance := testEnv.Config().Instance
sourceCluster := testEnv.Config().Cluster
iAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer iAdminClient.Close()
list := func(cluster string) ([]*BackupInfo, error) {
infos := []*BackupInfo(nil)
it := adminClient.Backups(ctx, cluster)
for {
s, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
infos = append(infos, s)
}
return infos, err
}
// Create standard backup
if err != nil {
t.Fatalf("Failed to generate a unique ID: %v", err)
}
backupUID := uid.NewSpace("mybackup-", &uid.Options{})
stdBkpName := backupUID.New()
defer adminClient.DeleteBackup(ctx, sourceCluster, stdBkpName)
if err = adminClient.CreateBackup(ctx, tblConf.TableID, sourceCluster, stdBkpName, time.Now().Add(8*time.Hour)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
// Create hot backup with hot_to_standard_time
hotBkpName1 := backupUID.New()
defer adminClient.DeleteBackup(ctx, sourceCluster, hotBkpName1)
wantHtsTime := time.Now().Truncate(time.Second).Add(48 * time.Hour)
if err = adminClient.CreateBackupWithOptions(ctx, tblConf.TableID, sourceCluster, hotBkpName1,
WithExpiry(time.Now().Add(8*time.Hour)), WithHotToStandardBackup(wantHtsTime)); err != nil {
t.Fatalf("Creating backup: %v", err)
}
// Create hot backup without hot_to_standard_time
hotBkpName2 := backupUID.New()
defer adminClient.DeleteBackup(ctx, sourceCluster, hotBkpName2)
if err = adminClient.CreateBackupWithOptions(ctx, tblConf.TableID, sourceCluster, hotBkpName2,
WithExpiry(time.Now().Add(8*time.Hour)), WithHotBackup()); err != nil {
t.Fatalf("Creating backup: %v", err)
}
// List backup
var gotBackups []*BackupInfo
testutil.Retry(t, 20, 30*time.Second, func(r *testutil.R) {
var err error
gotBackups, err = list(sourceCluster)
if err != nil {
r.Fatalf("Listing backups: %v", err)
}
wantBackups := map[string]struct {
HotToStandardTime *time.Time
BackupType BackupType
}{
stdBkpName: {
BackupType: BackupTypeStandard,
},
hotBkpName1: {
BackupType: BackupTypeHot,
HotToStandardTime: &wantHtsTime,
},
hotBkpName2: {
BackupType: BackupTypeHot,
},
}
foundBackups := map[string]bool{}
for _, gotBackup := range gotBackups {
wantBackup, ok := wantBackups[gotBackup.Name]
if !ok {
continue
}
foundBackups[gotBackup.Name] = true
if got, want := gotBackup.SourceTable, tblConf.TableID; got != want {
r.Errorf("%v SourceTable got: %s, want: %s", gotBackup.Name, got, want)
}
if got, want := gotBackup.ExpireTime, gotBackup.StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
r.Errorf("%v ExpireTime got: %s, want: %s", gotBackup.Name, got, want)
}
if got, want := gotBackup.BackupType, wantBackup.BackupType; got != want {
r.Errorf("%v BackupType got: %v, want: %v", gotBackup.Name, got, want)
}
if got, want := gotBackup.HotToStandardTime, wantBackup.HotToStandardTime; (got != nil && !got.Equal(*want)) ||
(got == nil && got != want) || (want == nil && got != want) {
r.Errorf("%v HotToStandardTime got: %v, want: %v", gotBackup.Name, got, want)
}
}
if len(foundBackups) != len(wantBackups) {
r.Errorf("foundBackups: %+v, wantBackups: %+v", foundBackups, wantBackups)
}
})
// Get BackupInfo
gotBackupInfo, err := adminClient.BackupInfo(ctx, sourceCluster, stdBkpName)
if err != nil {
t.Fatalf("BackupInfo: %v", gotBackupInfo)
}
if got, want := *gotBackupInfo, *gotBackups[0]; cmp.Equal(got, &want) {
t.Errorf("BackupInfo: %v, want: %v", got, want)
}
// Update backup
newExpireTime := time.Now().Add(10 * time.Hour)
err = adminClient.UpdateBackup(ctx, sourceCluster, stdBkpName, newExpireTime)
if err != nil {
t.Fatalf("UpdateBackup failed: %v", err)
}
// Check that updated backup has the correct expire time
updatedBackup, err := adminClient.BackupInfo(ctx, sourceCluster, stdBkpName)
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
gotBackupInfo.ExpireTime = newExpireTime
// Server clock and local clock may not be perfectly sync'ed.
if got, want := *updatedBackup, *gotBackupInfo; got.ExpireTime.Sub(want.ExpireTime) > time.Minute {
t.Errorf("BackupInfo: %v, want: %v", got, want)
}
// Restore backup
restoredTable := tblConf.TableID + "-restored"
defer deleteTable(ctx, t, adminClient, restoredTable)
if err = adminClient.RestoreTable(ctx, restoredTable, sourceCluster, stdBkpName); err != nil {
t.Fatalf("RestoreTable: %v", err)
}
if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
t.Fatalf("Restored TableInfo: %v", err)
}
// If 'it.run-create-instance-tests' flag is set while running the tests,
// instanceToCreate will be non-empty string.
// Add more testcases if instanceToCreate is non-empty string
if instanceToCreate != "" {
// Create different instance to restore table.
diffInstance, diffCluster, err := createRandomInstance(ctx, iAdminClient)
if err != nil {
t.Fatalf("CreateInstance: %v", err)
}
defer iAdminClient.DeleteInstance(ctx, diffInstance)
// Restore backup to different instance
restoreTableName := tblConf.TableID + "-diff-restored"
diffConf := IntegrationTestConfig{
Project: testEnv.Config().Project,
Instance: diffInstance,
Cluster: diffCluster,
Table: restoreTableName,
}
env := &ProdEnv{
config: diffConf,
}
dAdminClient, err := env.NewAdminClient()
if err != nil {
t.Errorf("NewAdminClient: %v", err)
}
defer dAdminClient.Close()
defer deleteTable(ctx, t, dAdminClient, restoreTableName)
if err = dAdminClient.RestoreTableFrom(ctx, sourceInstance, restoreTableName, sourceCluster, stdBkpName); err != nil {
t.Fatalf("RestoreTableFrom: %v", err)
}
tblInfo, err := dAdminClient.TableInfo(ctx, restoreTableName)
if err != nil {
t.Fatalf("Restored to different sourceInstance failed, TableInfo: %v", err)
}
families := tblInfo.Families
sort.Strings(tblInfo.Families)
wantFams := []string{"fam1", "fam2"}
if !testutil.Equal(families, wantFams) {
t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
}
}
// Delete backup
if err = adminClient.DeleteBackup(ctx, sourceCluster, stdBkpName); err != nil {
t.Fatalf("DeleteBackup: %v", err)
}
gotBackups, err = list(sourceCluster)
if err != nil {
t.Fatalf("List after Delete: %v", err)
}
// Verify the backup was deleted.
for _, backup := range gotBackups {
if backup.Name == stdBkpName {
t.Errorf("Backup '%v' was not deleted", backup.Name)
break
}
}
}
func TestIntegration_AdminUpdateBackupHotToStandardTime(t *testing.T) {
// Setup test environment
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support backups")
}
// Create context
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// Create table
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Create hot backup with hot_to_standard_time 2 days from now
backupUID := uid.NewSpace("mybackup-", &uid.Options{})
bkpName := backupUID.New()
defer adminClient.DeleteBackup(ctx, testEnv.Config().Cluster, bkpName)
if err = adminClient.CreateBackupWithOptions(ctx, tblConf.TableID, testEnv.Config().Cluster, bkpName,
WithExpiry(time.Now().Add(8*time.Hour)), WithHotToStandardBackup(time.Now().Truncate(time.Second).Add(2*24*time.Hour))); err != nil {
t.Fatalf("Creating backup: %v", err)
}
fiveDaysLater := time.Now().Truncate(time.Second).Add(5 * 24 * time.Hour)
for _, test := range []struct {
wantHtsTime *time.Time
desc string
}{
{
desc: "Unset hot_to_standard_time",
wantHtsTime: nil,
},
{
desc: "Set hot_to_standard_time to 5 days from now",
wantHtsTime: &fiveDaysLater,
},
} {
t.Run(test.desc, func(t *testing.T) {
// Update hot_to_standard_time
if test.wantHtsTime == nil {
err = adminClient.UpdateBackupRemoveHotToStandardTime(ctx, testEnv.Config().Cluster, bkpName)
if err != nil {
t.Fatalf("UpdateBackupRemoveHotToStandardTime failed: %v", err)
}
} else {
err = adminClient.UpdateBackupHotToStandardTime(ctx, testEnv.Config().Cluster, bkpName, *test.wantHtsTime)
if err != nil {
t.Fatalf("UpdateBackupHotToStandardTime failed: %v", err)
}
}
// Check that updated backup has the correct hot_to_standard_time
updatedBackup, err := adminClient.BackupInfo(ctx, testEnv.Config().Cluster, bkpName)
if err != nil {
t.Fatalf("BackupInfo: %v", err)
}
gotHtsTime := updatedBackup.HotToStandardTime
if (test.wantHtsTime == nil && gotHtsTime != nil) ||
(test.wantHtsTime != nil && gotHtsTime == nil) ||
(test.wantHtsTime != nil && !test.wantHtsTime.Equal(*gotHtsTime)) {
t.Errorf("hot_to_standard_time got: %v, want: %v", gotHtsTime, test.wantHtsTime)
}
})
}
}
func TestIntegration_AdminAuthorizedView(t *testing.T) {
t.Skip("flaky https://github.com/googleapis/google-cloud-go/issues/13398")
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support authorizedViews")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
// Create authorized view
authorizedViewUUID := uid.NewSpace("authorizedView-", &uid.Options{})
authorizedView := authorizedViewUUID.New()
defer adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView)
authorizedViewConf := AuthorizedViewConf{
TableID: tblConf.TableID,
AuthorizedViewID: authorizedView,
AuthorizedView: &SubsetViewConf{
RowPrefixes: [][]byte{[]byte("r1")},
},
DeletionProtection: Protected,
}
if err = adminClient.CreateAuthorizedView(ctx, &authorizedViewConf); err != nil {
t.Fatalf("Creating authorized view: %v", err)
}
// List authorized views
authorizedViews, err := adminClient.AuthorizedViews(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Listing authorized views: %v", err)
}
if got, want := len(authorizedViews), 1; got != want {
t.Fatalf("Listing authorized views count: %d, want: != %d", got, want)
}
if got, want := authorizedViews[0], authorizedView; got != want {
t.Errorf("AuthorizedView Name: %s, want: %s", got, want)
}
// Get authorized view
avInfo, err := adminClient.AuthorizedViewInfo(ctx, tblConf.TableID, authorizedView)
if err != nil {
t.Fatalf("Getting authorized view: %v", err)
}
if got, want := avInfo.AuthorizedView.(*SubsetViewInfo), authorizedViewConf.AuthorizedView.(*SubsetViewConf); cmp.Equal(got, want) {
t.Errorf("SubsetViewConf: %v, want: %v", got, want)
}
// Cannot delete the authorized view because it is deletion protected
if err = adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView); err == nil {
t.Fatalf("Expect error when deleting authorized view")
}
// Update authorized view
newAuthorizedViewConf := AuthorizedViewConf{
TableID: tblConf.TableID,
AuthorizedViewID: authorizedView,
DeletionProtection: Unprotected,
}
err = adminClient.UpdateAuthorizedView(ctx, UpdateAuthorizedViewConf{
AuthorizedViewConf: newAuthorizedViewConf,
})
if err != nil {
t.Fatalf("UpdateAuthorizedView failed: %v", err)
}
// Check that updated authorized view has the correct deletion protection
avInfo, err = adminClient.AuthorizedViewInfo(ctx, tblConf.TableID, authorizedView)
if err != nil {
t.Fatalf("Getting authorized view: %v", err)
}
if got, want := avInfo.DeletionProtection, Unprotected; got != want {
t.Errorf("AuthorizedView deletion protection: %v, want: %v", got, want)
}
// Check that the subset_view field doesn't change
if got, want := avInfo.AuthorizedView.(*SubsetViewInfo), authorizedViewConf.AuthorizedView.(*SubsetViewConf); cmp.Equal(got, want) {
t.Errorf("SubsetViewConf: %v, want: %v", got, want)
}
// Delete authorized view
if err = adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView); err != nil {
t.Fatalf("DeleteAuthorizedView: %v", err)
}
// Verify the authorized view was deleted.
authorizedViews, err = adminClient.AuthorizedViews(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Listing authorized views: %v", err)
}
if got, want := len(authorizedViews), 0; got != want {
t.Fatalf("Listing authorized views count: %d, want: != %d", got, want)
}
}
func TestIntegration_DataAuthorizedView(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support authorizedViews")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
// Create authorized view
authorizedViewUUID := uid.NewSpace("authorizedView-", &uid.Options{})
authorizedView := authorizedViewUUID.New()
defer adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView)
authorizedViewConf := AuthorizedViewConf{
TableID: tblConf.TableID,
AuthorizedViewID: authorizedView,
AuthorizedView: &SubsetViewConf{
RowPrefixes: [][]byte{[]byte("r1")},
FamilySubsets: map[string]FamilySubset{
"fam1": {
QualifierPrefixes: [][]byte{[]byte("col")},
},
"fam2": {
Qualifiers: [][]byte{[]byte("col")},
},
},
},
DeletionProtection: Unprotected,
}
if err = adminClient.CreateAuthorizedView(ctx, &authorizedViewConf); err != nil {
t.Fatalf("Creating authorized view: %v", err)
}
client, err := testEnv.NewClient()
if err != nil {
t.Fatalf("NewClient: %v", err)
}
defer client.Close()
av := client.OpenAuthorizedView(tblConf.TableID, authorizedView)
tbl := client.OpenTable(tblConf.TableID)
prefix1 := "r1"
prefix2 := "r2" // outside of the authorized view
mut1 := NewMutation()
mut1.Set("fam1", "col1", 1000, []byte("1"))
mut2 := NewMutation()
mut2.Set("fam1", "col2", 1000, []byte("1"))
mut3 := NewMutation()
mut3.Set("fam2", "column", 1000, []byte("1")) // outside of the authorized view
// Test mutation
if err := av.Apply(ctx, prefix1, mut1); err != nil {
t.Fatalf("Mutating row from an authorized view: %v", err)
}
if err := av.Apply(ctx, prefix2, mut1); err == nil {
t.Fatalf("Expect error when mutating a row outside of the authorized view: %v", err)
}
if err := tbl.Apply(ctx, prefix2, mut1); err != nil {
t.Fatalf("Mutating row from a table: %v", err)
}
// Test bulk mutations
status, err := av.ApplyBulk(ctx, []string{prefix1, prefix2, prefix1}, []*Mutation{mut2, mut2, mut3})
if err != nil {
t.Fatalf("Mutating rows from an authorized view: %v", err)
}
if status == nil {
t.Fatalf("Expect error for bad bulk mutation outside of the authorized view")
} else if status[0] != nil || status[1] == nil || status[2] == nil {
t.Fatalf("Expect error for bad bulk mutation outside of the authorized view")
}
// Test ReadRow
gotRow, err := av.ReadRow(ctx, "r1")
if err != nil {
t.Fatalf("Reading row from an authorized view: %v", err)
}
wantRow := Row{
"fam1": []ReadItem{
{Row: "r1", Column: "fam1:col1", Timestamp: 1000, Value: []byte("1")},
{Row: "r1", Column: "fam1:col2", Timestamp: 1000, Value: []byte("1")},
},
}
if !testutil.Equal(gotRow, wantRow) {
t.Fatalf("Error reading row from authorized view.\n Got %v\n Want %v", gotRow, wantRow)
}
gotRow, err = av.ReadRow(ctx, "r2")
if err != nil {
t.Fatalf("Reading row from an authorized view: %v", err)
}
if len(gotRow) != 0 {
t.Fatalf("Expect empty result when reading row from outside an authorized view")
}
gotRow, err = tbl.ReadRow(ctx, "r2")
if err != nil {
t.Fatalf("Reading row from a table: %v", err)
}
if len(gotRow) != 1 {
t.Fatalf("Invalid row count when reading from a table: %d, want: != %d", len(gotRow), 1)
}
// Test ReadRows
var elt []string
f := func(row Row) bool {
for _, ris := range row {
for _, ri := range ris {
elt = append(elt, formatReadItem(ri))
}
}
return true
}
if err = av.ReadRows(ctx, RowRange{}, f); err != nil {
t.Fatalf("Reading rows from an authorized view: %v", err)
}
want := "r1-col1-1,r1-col2-1"
if got := strings.Join(elt, ","); got != want {
t.Fatalf("Error bulk reading from authorized view.\n Got %v\n Want %v", got, want)
}
elt = nil
if err = tbl.ReadRows(ctx, RowRange{}, f); err != nil {
t.Fatalf("Reading rows from a table: %v", err)
}
want = "r1-col1-1,r1-col2-1,r2-col1-1"
if got := strings.Join(elt, ","); got != want {
t.Fatalf("Error bulk reading from table.\n Got %v\n Want %v", got, want)
}
// Test ReadModifyWrite
rmw := NewReadModifyWrite()
rmw.AppendValue("fam1", "col1", []byte("1"))
gotRow, err = av.ApplyReadModifyWrite(ctx, "r1", rmw)
if err != nil {
t.Fatalf("Applying ReadModifyWrite from an authorized view: %v", err)
}
wantRow = Row{
"fam1": []ReadItem{
{Row: "r1", Column: "fam1:col1", Value: []byte("11")},
},
}
// Make sure the modified cell returned by the RMW operation has a timestamp.
if gotRow["fam1"][0].Timestamp == 0 {
t.Fatalf("RMW returned cell timestamp: got %v, want > 0", gotRow["fam1"][0].Timestamp)
}
clearTimestamps(gotRow)
if !testutil.Equal(gotRow, wantRow) {
t.Fatalf("Error applying ReadModifyWrite from authorized view.\n Got %v\n Want %v", gotRow, wantRow)
}
if _, err = av.ApplyReadModifyWrite(ctx, "r2", rmw); err == nil {
t.Fatalf("Expect error applying ReadModifyWrite from outside an authorized view")
}
// Test SampleRowKeys
presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix())
if err := createPresplitTable(ctx, adminClient, presplitTable, []string{"r0", "r11", "r12", "r2"}); err != nil {
t.Fatal(err)
}
defer adminClient.DeleteTable(ctx, presplitTable)
if err := createColumnFamily(ctx, t, adminClient, presplitTable, "fam1", nil); err != nil {
t.Fatal(err)
}
defer adminClient.DeleteAuthorizedView(ctx, presplitTable, authorizedView)
if err = adminClient.CreateAuthorizedView(ctx, &AuthorizedViewConf{
TableID: presplitTable,
AuthorizedViewID: authorizedView,
AuthorizedView: &SubsetViewConf{
RowPrefixes: [][]byte{[]byte("r1")},
},
DeletionProtection: Unprotected,
}); err != nil {
t.Fatalf("Creating authorized view: %v", err)
}
av = client.OpenAuthorizedView(presplitTable, authorizedView)
sampleKeys, err := av.SampleRowKeys(ctx)
if err != nil {
t.Fatalf("Sampling row keys from an authorized view: %v", err)
}
want = "r11,r12,r2"
if got := strings.Join(sampleKeys, ","); got != want {
t.Fatalf("Error sample row keys from an authorized view.\n Got %v\n Want %v", got, want)
}
}
func TestIntegration_AdminSchemaBundle(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support schemaBundles")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
// Create schema bundle
schemaBundleUUID := uid.NewSpace("schemaBundle-", &uid.Options{})
schemaBundle := schemaBundleUUID.New()
defer adminClient.DeleteSchemaBundle(ctx, tblConf.TableID, schemaBundle)
content, err := os.ReadFile("testdata/proto_schema_bundle.pb")
if err != nil {
t.Fatalf("Error reading the file: %v", err)
}
schemaBundleConf := SchemaBundleConf{
TableID: tblConf.TableID,
SchemaBundleID: schemaBundle,
ProtoSchema: &ProtoSchemaInfo{
ProtoDescriptors: content,
},
}
if err = adminClient.CreateSchemaBundle(ctx, &schemaBundleConf); err != nil {
t.Fatalf("Creating schema bundle: %v", err)
}
// List schema bundles
schemaBundles, err := adminClient.SchemaBundles(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Listing schema bundles: %v", err)
}
if got, want := len(schemaBundles), 1; got != want {
t.Fatalf("Listing schema bundles count: %d, want: != %d", got, want)
}
if got, want := schemaBundles[0], schemaBundle; got != want {
t.Errorf("SchemaBundle Name: %s, want: %s", got, want)
}
// Get schema bundle
sbInfo, err := adminClient.GetSchemaBundle(ctx, tblConf.TableID, schemaBundle)
if err != nil {
t.Fatalf("Getting schema bundle: %v", err)
}
if got, want := sbInfo.SchemaBundle, content; !reflect.DeepEqual(got, want) {
t.Errorf("ProtoSchema: %v, want: %v", got, want)
}
content, err = os.ReadFile("testdata/updated_proto_schema_bundle.pb")
if err != nil {
t.Fatalf("Error reading the file: %v", err)
}
// Update schema bundle
newSchemaBundleConf := SchemaBundleConf{
TableID: tblConf.TableID,
SchemaBundleID: schemaBundle,
Etag: sbInfo.Etag,
ProtoSchema: &ProtoSchemaInfo{
ProtoDescriptors: content,
},
}
err = adminClient.UpdateSchemaBundle(ctx, UpdateSchemaBundleConf{
SchemaBundleConf: newSchemaBundleConf,
})
if err != nil {
t.Fatalf("UpdateSchemaBundle failed: %v", err)
}
// Get schema bundle
sbInfo, err = adminClient.GetSchemaBundle(ctx, tblConf.TableID, schemaBundle)
if err != nil {
t.Fatalf("Getting schema bundle: %v", err)
}
if got, want := sbInfo.SchemaBundle, content; !reflect.DeepEqual(got, want) {
t.Errorf("ProtoSchema: %v, want: %v", got, want)
}
// Delete schema bundle
if err = adminClient.DeleteSchemaBundle(ctx, tblConf.TableID, schemaBundle); err != nil {
t.Fatalf("DeleteSchemaBundle: %v", err)
}
// Verify the schema bundle was deleted.
schemaBundles, err = adminClient.SchemaBundles(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Listing schema bundles: %v", err)
}
if got, want := len(schemaBundles), 0; got != want {
t.Fatalf("Listing schema bundles count: %d, want: != %d", got, want)
}
}
func TestIntegration_DataMaterializedView(t *testing.T) {
t.Skip("flaky https://github.com/googleapis/google-cloud-go/issues/12686")
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support materializedViews")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
instanceAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer instanceAdminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
client, err := testEnv.NewClient()
if err != nil {
t.Fatalf("NewClient: %v", err)
}
defer client.Close()
// Populate table
tbl := client.OpenTable(tblConf.TableID)
mut := NewMutation()
mut.Set("fam1", "col1", 1000, []byte("1"))
if err := tbl.Apply(ctx, "r1", mut); err != nil {
t.Fatalf("Mutating row: %v", err)
}
// Create materialized view
materializedViewUUID := uid.NewSpace("materializedView-", &uid.Options{})
materializedView := materializedViewUUID.New()
defer instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView)
materializedViewInfo := MaterializedViewInfo{
MaterializedViewID: materializedView,
Query: fmt.Sprintf("SELECT _key, count(fam1['col1']) as `result.count` FROM `%s` GROUP BY _key", tblConf.TableID),
DeletionProtection: Unprotected,
}
if err = instanceAdminClient.CreateMaterializedView(ctx, testEnv.Config().Instance, &materializedViewInfo); err != nil {
t.Fatalf("Creating materialized view: %v", err)
}
mv := client.OpenMaterializedView(materializedView)
// Test ReadRow
gotRow, err := mv.ReadRow(ctx, "r1")
if err != nil {
t.Fatalf("Reading row from a materialized view: %v", err)
}
wantRow := Row{
"result": []ReadItem{
{Row: "r1", Column: "result:count", Timestamp: 0, Value: binary.BigEndian.AppendUint64([]byte{}, 1)},
},
"default": []ReadItem{
{Row: "r1", Column: "default:", Timestamp: 0},
},
}
if !testutil.Equal(gotRow, wantRow) {
t.Errorf("Error reading row from materialized view.\n Got %#v\n Want %#v", gotRow, wantRow)
}
gotRow, err = mv.ReadRow(ctx, "r2")
if err != nil {
t.Fatalf("Reading row from an materialized view: %v", err)
}
if len(gotRow) != 0 {
t.Errorf("Expect empty result when reading row from outside an materialized view")
}
// Test ReadRows
var elt []string
f := func(row Row) bool {
for _, ris := range row {
for _, ri := range ris {
elt = append(elt, formatReadItem(ri))
}
}
return true
}
if err = mv.ReadRows(ctx, RowRange{}, f); err != nil {
t.Fatalf("Reading rows from an materialized view: %v", err)
}
want := "r1--,r1-count-" + string(binary.BigEndian.AppendUint64([]byte{}, 1))
sort.Strings(elt)
if got := strings.Join(elt, ","); got != want {
t.Errorf("Error bulk reading from materialized view.\n Got %q\n Want %q", got, want)
}
// Test SampleRowKeys
if _, err := mv.SampleRowKeys(ctx); err != nil {
t.Errorf("Sampling row keys from an materialized view: %v", err)
}
}
func TestIntegration_AdminLogicalView(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support logicalViews")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
instanceAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer instanceAdminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
// Create logical view
logicalViewUUID := uid.NewSpace("logicalView-", &uid.Options{})
logicalView := logicalViewUUID.New()
defer instanceAdminClient.DeleteLogicalView(ctx, testEnv.Config().Instance, logicalView)
logicalViewInfo := LogicalViewInfo{
LogicalViewID: logicalView,
Query: fmt.Sprintf("SELECT _key, fam1['col1'] as col FROM `%s`", tblConf.TableID),
DeletionProtection: Protected,
}
if err = instanceAdminClient.CreateLogicalView(ctx, testEnv.Config().Instance, &logicalViewInfo); err != nil {
t.Fatalf("Creating logical view: %v", err)
}
// List logical views
logicalViews, err := instanceAdminClient.LogicalViews(ctx, testEnv.Config().Instance)
if err != nil {
t.Fatalf("Listing logical views: %v", err)
}
if got, want := len(logicalViews), 1; got != want {
t.Fatalf("Listing logical views count: %d, want: != %d", got, want)
}
if got, want := logicalViews[0].LogicalViewID, logicalView; got != want {
t.Errorf("LogicalView Name: %s, want: %s", got, want)
}
if got, want := logicalViews[0].Query, logicalViewInfo.Query; got != want {
t.Errorf("LogicalView Query: %q, want: %q", got, want)
}
if got, want := logicalViews[0].DeletionProtection, logicalViewInfo.DeletionProtection; got != want {
t.Errorf("LogicalView DeletionProtection: %v, want: %v", got, want)
}
// Get logical view
lvInfo, err := instanceAdminClient.LogicalViewInfo(ctx, testEnv.Config().Instance, logicalView)
if err != nil {
t.Fatalf("Getting logical view: %v", err)
}
if got, want := lvInfo.Query, logicalViewInfo.Query; got != want {
t.Errorf("LogicalView Query: %q, want: %q", got, want)
}
if got, want := lvInfo.DeletionProtection, logicalViewInfo.DeletionProtection; got != want {
t.Errorf("LogicalView DeletionProtection: %v, want: %v", got, want)
}
// Update logical view
newLogicalViewInfo := LogicalViewInfo{
LogicalViewID: logicalView,
Query: fmt.Sprintf("SELECT _key, fam2['col1'] as col FROM `%s`", tblConf.TableID),
DeletionProtection: Unprotected,
}
err = instanceAdminClient.UpdateLogicalView(ctx, testEnv.Config().Instance, newLogicalViewInfo)
if err != nil {
t.Fatalf("UpdateLogicalView failed: %v", err)
}
// Check that updated logical view has the correct deletion protection
lvInfo, err = instanceAdminClient.LogicalViewInfo(ctx, testEnv.Config().Instance, logicalView)
if err != nil {
t.Fatalf("Getting logical view: %v", err)
}
if got, want := lvInfo.Query, newLogicalViewInfo.Query; got != want {
t.Errorf("LogicalView Query: %q, want: %q", got, want)
}
if got, want := lvInfo.DeletionProtection, newLogicalViewInfo.DeletionProtection; got != want {
t.Errorf("LogicalView DeletionProtection: %v, want: %v", got, want)
}
// Delete logical view
if err = instanceAdminClient.DeleteLogicalView(ctx, testEnv.Config().Instance, logicalView); err != nil {
t.Fatalf("DeleteLogicalView: %v", err)
}
// Verify the logical view was deleted.
logicalViews, err = instanceAdminClient.LogicalViews(ctx, testEnv.Config().Instance)
if err != nil {
t.Fatalf("Listing logical views: %v", err)
}
if got, want := len(logicalViews), 0; got != want {
t.Fatalf("Listing logical views count: %d, want: != %d", got, want)
}
}
func TestIntegration_AdminMaterializedView(t *testing.T) {
t.Skip("broken https://github.com/googleapis/google-cloud-go/issues/12415")
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support materializedViews")
}
timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()
instanceAdminClient, err := testEnv.NewInstanceAdminClient()
if err != nil {
t.Fatalf("NewInstanceAdminClient: %v", err)
}
defer instanceAdminClient.Close()
tblConf := TableConf{
TableID: testEnv.Config().Table,
Families: map[string]GCPolicy{
"fam1": MaxVersionsPolicy(1),
"fam2": MaxVersionsPolicy(2),
},
}
if err := createTableFromConf(ctx, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
// in case the client fails
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
// Create materialized view
materializedViewUUID := uid.NewSpace("materializedView-", &uid.Options{})
materializedView := materializedViewUUID.New()
defer instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView)
materializedViewInfo := MaterializedViewInfo{
MaterializedViewID: materializedView,
Query: fmt.Sprintf("SELECT _key, count(fam1['col1']) as count FROM `%s` GROUP BY _key", tblConf.TableID),
DeletionProtection: Protected,
}
if err = instanceAdminClient.CreateMaterializedView(ctx, testEnv.Config().Instance, &materializedViewInfo); err != nil {
t.Fatalf("Creating materialized view: %v", err)
}
// List materialized views
materializedViews, err := instanceAdminClient.MaterializedViews(ctx, testEnv.Config().Instance)
if err != nil {
t.Fatalf("Listing materialized views: %v", err)
}
if got, want := len(materializedViews), 1; got < want {
t.Fatalf("Listing materialized views count: %d, want: >= %d", got, want)
}
for _, mv := range materializedViews {
if mv.MaterializedViewID == materializedView {
if got, want := mv.Query, materializedViewInfo.Query; got != want {
t.Errorf("MaterializedView Query: %q, want: %q", got, want)
}
}
}
// Get materialized view
mvInfo, err := instanceAdminClient.MaterializedViewInfo(ctx, testEnv.Config().Instance, materializedView)
if err != nil {
t.Fatalf("Getting materialized view: %v", err)
}
if got, want := mvInfo.Query, materializedViewInfo.Query; got != want {
t.Errorf("MaterializedView Query: %q, want: %q", got, want)
}
// Cannot delete the materialized view because it is deletion protected
if err = instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView); err == nil {
t.Fatalf("DeleteMaterializedView: %v", err)
}
// Update materialized view
newMaterializedViewInfo := MaterializedViewInfo{
MaterializedViewID: materializedView,
DeletionProtection: Unprotected,
}
err = instanceAdminClient.UpdateMaterializedView(ctx, testEnv.Config().Instance, newMaterializedViewInfo)
if err != nil {
t.Fatalf("UpdateMaterializedView failed: %v", err)
}
// Check that updated materialized view has the correct deletion protection
mvInfo, err = instanceAdminClient.MaterializedViewInfo(ctx, testEnv.Config().Instance, materializedView)
if err != nil {
t.Fatalf("Getting materialized view: %v", err)
}
if got, want := mvInfo.DeletionProtection, Unprotected; got != want {
t.Errorf("MaterializedViewInfo deletion protection: %v, want: %v", got, want)
}
// Check that the subset_view field doesn't change
if got, want := mvInfo.Query, materializedViewInfo.Query; !cmp.Equal(got, want) {
t.Errorf("Query: %q, want: %q", got, want)
}
// Delete materialized view
if err = instanceAdminClient.DeleteMaterializedView(ctx, testEnv.Config().Instance, materializedView); err != nil {
t.Fatalf("DeleteMaterializedView: %v", err)
}
// Verify the materialized view was deleted.
materializedViews, err = instanceAdminClient.MaterializedViews(ctx, testEnv.Config().Instance)
if err != nil {
t.Fatalf("Listing materialized views: %v", err)
}
for _, mv := range materializedViews {
if mv.MaterializedViewID == materializedView {
t.Errorf("Found view %q that was meant to be deleted", materializedView)
}
}
}
// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
if !testEnv.Config().AttemptDirectPath {
t.Skip()
}
if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}
if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}
// Precondition: wait for DirectPath to connect.
dpEnabled := examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {
t.Fatalf("Failed to observe RPCs over DirectPath")
}
// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeDirectPath(testEnv, t)
dpDisabled := examineTraffic(ctx, testEnv, table, true)
if !dpDisabled {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
// Disable the blackhole, and client should use DirectPath again.
allowDirectPath(testEnv, t)
dpEnabled = examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}
func TestIntegration_Execute(t *testing.T) {
// Set up table and clients
ctx := context.Background()
testEnv, client, adminClient, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { cleanup() })
if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support PrepareQuery")
}
colFam := "address"
err = createColumnFamily(ctx, t, adminClient, table.table, colFam, map[codes.Code]bool{codes.Unavailable: true})
if err != nil {
t.Fatal("createColumnFamily: " + err.Error())
}
// Add data to table
v1Timestamp := Time(time.Now().Add(-time.Minute))
v2Timestamp := Time(time.Now())
populateAddresses(ctx, t, table, colFam, v1Timestamp, v2Timestamp)
base64City := base64.StdEncoding.EncodeToString([]byte("city"))
base64State := base64.StdEncoding.EncodeToString([]byte("state"))
// While writing, "timestamp": s"2025-03-31 03:26:33.489256 +0000 UTC"
// When reading back, "timestamp": s"2025-03-31 03:26:33.489 +0000 UTC"
ignoreMillisecondDiff := cmpopts.EquateApproxTime(time.Millisecond)
cmpOpts := []cmp.Option{ignoreMillisecondDiff, cmp.AllowUnexported(Struct{})}
tsParamValue := time.Now()
type col struct {
name string
gotDest any
wantDest any
}
// Run test cases
for _, tc := range []struct {
desc string
psQuery string
psParamTypes map[string]SQLType
bsParamValues map[string]any
rows [][]col
}{
{
desc: "select *",
psQuery: "SELECT * FROM `" + table.table + "` ORDER BY _key LIMIT 5",
rows: [][]col{
{
{
name: "_key",
gotDest: []byte{},
wantDest: []byte("row-01"),
},
{
name: "address",
gotDest: map[string][]byte{},
wantDest: map[string][]byte{
base64City: []byte("San Francisco"),
base64State: []byte("CA"),
},
},
{
name: "follows",
gotDest: map[string][]byte{},
wantDest: map[string][]byte{},
},
{
name: "sum",
gotDest: map[string]int64{},
wantDest: map[string]int64{},
},
},
{
{
name: "_key",
gotDest: []byte{},
wantDest: []byte("row-02"),
},
{
name: "address",
gotDest: map[string][]byte{},
wantDest: map[string][]byte{
base64City: []byte("Phoenix"),
base64State: []byte("AZ"),
},
},
{
name: "follows",
gotDest: map[string][]byte{},
wantDest: map[string][]byte{},
},
{
name: "sum",
gotDest: map[string]int64{},
wantDest: map[string]int64{},
},
},
},
},
{
desc: "WITH_HISTORY key and column",
psQuery: "SELECT _key, " + colFam + "['state'] AS state FROM `" + table.table + "`(WITH_HISTORY=>TRUE) LIMIT 5",
rows: [][]col{
{
{
name: "_key",
gotDest: []byte{},
wantDest: []byte("row-01"),
},
{
name: "state",
gotDest: []Struct{},
wantDest: []Struct{
{
fields: []structFieldWithValue{
{
Name: "timestamp",
Value: v2Timestamp.Time(),
},
{
Name: "value",
Value: []byte("CA"),
},
},
nameToIndex: map[string][]int{"timestamp": {0}, "value": {1}},
},
{
fields: []structFieldWithValue{
{
Name: "timestamp",
Value: v1Timestamp.Time(),
},
{
Name: "value",
Value: []byte("WA"),
},
},
nameToIndex: map[string][]int{"timestamp": {0}, "value": {1}},
},
},
},
},
{
{
name: "_key",
gotDest: []byte{},
wantDest: []byte("row-02"),
},
{
name: "state",
gotDest: []Struct{},
wantDest: []Struct{
{
fields: []structFieldWithValue{
{
Name: "timestamp",
Value: v1Timestamp.Time(),
},
{
Name: "value",
Value: []byte("AZ"),
},
},
nameToIndex: map[string][]int{"timestamp": {0}, "value": {1}},
},
},
},
},
},
},
{
desc: "WITH_HISTORY column family",
psQuery: "SELECT " + colFam + " FROM `" + table.table + "`(WITH_HISTORY=>TRUE) WHERE _key='row-01'",
rows: [][]col{
{
{
name: "address",
gotDest: map[string][]Struct{},
wantDest: map[string][]Struct{
base64State: {
{
fields: []structFieldWithValue{
{
Name: "timestamp",
Value: v2Timestamp.Time(),
},
{
Name: "value",
Value: []byte("CA"),
},
},
nameToIndex: map[string][]int{"timestamp": {0}, "value": {1}},
},
{
fields: []structFieldWithValue{
{
Name: "timestamp",
Value: v1Timestamp.Time(),
},
{
Name: "value",
Value: []byte("WA"),
},
},
nameToIndex: map[string][]int{"timestamp": {0}, "value": {1}},
},
},
base64City: {
{
fields: []structFieldWithValue{
{
Name: "timestamp",
Value: v2Timestamp.Time(),
},
{
Name: "value",
Value: []byte("San Francisco"),
},
},
nameToIndex: map[string][]int{"timestamp": {0}, "value": {1}},
},
},
},
},
},
},
},
{
desc: "all types in result set",
psQuery: "SELECT 'stringVal' AS strCol, b'foo' as bytesCol, 1 AS intCol, CAST(1.2 AS FLOAT32) as f32Col, " +
"CAST(1.3 AS FLOAT64) as f64Col, true as boolCol, TIMESTAMP_FROM_UNIX_MILLIS(1000) AS tsCol, " +
"DATE(2024, 06, 01) as dateCol, STRUCT(1 as a, \"foo\" as b) AS structCol, [1,2,3] AS arrCol, " +
colFam +
" as mapCol FROM `" +
table.table +
"` WHERE _key='row-01' LIMIT 1",
rows: [][]col{
{
{
name: "strCol",
gotDest: "",
wantDest: "stringVal",
},
{
name: "bytesCol",
gotDest: []byte{},
wantDest: []byte("foo"),
},
{
name: "intCol",
gotDest: int64(0),
wantDest: int64(1),
},
{
name: "f32Col",
gotDest: float32(0),
wantDest: float32(1.2),
},
{
name: "f64Col",
gotDest: float64(0),
wantDest: float64(1.3),
},
{
name: "boolCol",
gotDest: false,
wantDest: true,
},
{
name: "tsCol",
gotDest: time.Time{},
wantDest: time.Unix(1, 0),
},
{
name: "dateCol",
gotDest: civil.Date{},
wantDest: civil.Date{Year: 2024, Month: 06, Day: 01},
},
{
name: "structCol",
gotDest: Struct{},
wantDest: Struct{
fields: []structFieldWithValue{{Name: "a", Value: int64(1)}, {Name: "b", Value: string("foo")}},
nameToIndex: map[string][]int{"a": {0}, "b": {1}},
},
},
{
name: "arrCol",
gotDest: []int64{},
wantDest: []int64{1, 2, 3},
},
{
name: "mapCol",
gotDest: map[string][]byte{},
wantDest: map[string][]byte{
base64City: []byte("San Francisco"),
base64State: []byte("CA"),
},
},
},
},
},
{
desc: "all types in query parameters",
psQuery: "SELECT @bytesParam as bytesCol, @stringParam AS strCol, @int64Param AS int64Col, " +
"@float32Param AS float32Col, @float64Param AS float64Col, @boolParam AS boolCol, " +
"@tsParam AS tsCol, @dateParam AS dateCol, @bytesArrayParam AS bytesArrayCol, " +
"@stringArrayParam AS stringArrayCol, @int64ArrayParam AS int64ArrayCol, " +
"@float32ArrayParam AS float32ArrayCol, @float64ArrayParam AS float64ArrayCol, " +
"@boolArrayParam AS boolArrayCol, @tsArrayParam AS tsArrayCol, " +
"@dateArrayParam AS dateArrayCol",
psParamTypes: map[string]SQLType{
"bytesParam": BytesSQLType{},
"stringParam": StringSQLType{},
"int64Param": Int64SQLType{},
"float32Param": Float32SQLType{},
"float64Param": Float64SQLType{},
"boolParam": BoolSQLType{},
"tsParam": TimestampSQLType{},
"dateParam": DateSQLType{},
"bytesArrayParam": ArraySQLType{
ElemType: BytesSQLType{},
},
"stringArrayParam": ArraySQLType{
ElemType: StringSQLType{},
},
"int64ArrayParam": ArraySQLType{
ElemType: Int64SQLType{},
},
"float32ArrayParam": ArraySQLType{
ElemType: Float32SQLType{},
},
"float64ArrayParam": ArraySQLType{
ElemType: Float64SQLType{},
},
"boolArrayParam": ArraySQLType{
ElemType: BoolSQLType{},
},
"tsArrayParam": ArraySQLType{
ElemType: TimestampSQLType{},
},
"dateArrayParam": ArraySQLType{
ElemType: DateSQLType{},
},
},
bsParamValues: map[string]any{
"bytesParam": []byte("foo"),
"stringParam": "stringVal",
"int64Param": int64(1),
"float32Param": float32(1.3),
"float64Param": float64(1.4),
"boolParam": true,
"tsParam": tsParamValue,
"dateParam": civil.DateOf(tsParamValue),
"bytesArrayParam": [][]byte{[]byte("foo"), nil, []byte("bar")},
"stringArrayParam": []string{"baz", "qux"},
"int64ArrayParam": []any{int64(1), nil, int64(2)},
"float32ArrayParam": []any{float32(1.3), nil, float32(2.3)},
"float64ArrayParam": []float64{1.4, 2.4, 3.4},
"boolArrayParam": []any{true, nil, false},
"tsArrayParam": []any{tsParamValue, nil},
"dateArrayParam": []civil.Date{civil.DateOf(tsParamValue), civil.DateOf(tsParamValue.Add(24 * time.Hour))},
},
rows: [][]col{
{
{
name: "bytesCol",
gotDest: []byte{},
wantDest: []byte("foo"),
},
{
name: "strCol",
gotDest: "",
wantDest: "stringVal",
},
{
name: "int64Col",
gotDest: int64(0),
wantDest: int64(1),
},
{
name: "float32Col",
gotDest: float32(0),
wantDest: float32(1.3),
},
{
name: "float64Col",
gotDest: float64(0),
wantDest: float64(1.4),
},
{
name: "boolCol",
gotDest: false,
wantDest: true,
},
{
name: "tsCol",
gotDest: time.Time{},
wantDest: tsParamValue,
},
{
name: "dateCol",
gotDest: civil.Date{},
wantDest: civil.DateOf(tsParamValue),
},
{
name: "bytesArrayCol",
gotDest: [][]byte{},
wantDest: [][]byte{[]byte("foo"), nil, []byte("bar")},
},
{
name: "stringArrayCol",
gotDest: []string{},
wantDest: []string{"baz", "qux"},
},
{
name: "int64ArrayCol",
gotDest: []any{},
wantDest: []any{int64(1), nil, int64(2)},
},
{
name: "float32ArrayCol",
gotDest: []any{},
wantDest: []any{float32(1.3), nil, float32(2.3)},
},
{
name: "float64ArrayCol",
gotDest: []float64{},
wantDest: []float64{1.4, 2.4, 3.4},
},
{
name: "boolArrayCol",
gotDest: []any{},
wantDest: []any{true, nil, false},
},
{
name: "tsArrayCol",
gotDest: []any{},
wantDest: []any{tsParamValue, nil},
},
{
name: "dateArrayCol",
gotDest: []civil.Date{civil.DateOf(tsParamValue), civil.DateOf(tsParamValue.Add(24 * time.Hour))},
wantDest: []civil.Date{civil.DateOf(tsParamValue), civil.DateOf(tsParamValue.Add(24 * time.Hour))},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
ps, err := client.PrepareStatement(ctx, tc.psQuery, tc.psParamTypes)
if err != nil {
t.Fatal("PrepareStatement: " + err.Error())
}
bs, err := ps.Bind(tc.bsParamValues)
if err != nil {
t.Fatal("Bind: " + err.Error())
}
gotRowCount := 0
if err = bs.Execute(ctx, func(rr ResultRow) bool {
foundErr := false
// Assert that rr has correct values
if (tc.rows) != nil {
// more rows than expected
if gotRowCount >= len(tc.rows) {
t.Fatalf("#%v: Unexpected row returned from Execute. gotRow: %#v", gotRowCount, rr)
}
var wantColCount int
for wantColCount < len(tc.rows[gotRowCount]) {
gotCurrCol := tc.rows[gotRowCount][wantColCount]
destType := reflect.TypeOf(gotCurrCol.gotDest)
// Assert GetByName returns correct value
gotDestPtrReflectValue := reflect.New(destType)
gotDestPtrInterface := gotDestPtrReflectValue.Interface()
errGet := rr.GetByName(gotCurrCol.name, gotDestPtrInterface)
if errGet != nil {
t.Errorf("Row #%d: GetByName(name='%s', dest type='%T') failed: %v", gotRowCount, gotCurrCol.name, gotDestPtrInterface, errGet)
foundErr = true
}
gotDest := reflect.ValueOf(gotDestPtrInterface).Elem().Interface()
if diff := testutil.Diff(gotCurrCol.wantDest, gotDest, cmpOpts...); diff != "" {
t.Errorf("[Row:%v Column:%v] GetByName: got: %#v, want: %#v, diff (-want +got):\n %+v",
gotRowCount, wantColCount, gotDest, gotCurrCol.wantDest, diff)
foundErr = true
}
// Assert GetByIndex returns correct value
gotDestPtrReflectValue = reflect.New(destType)
gotDestPtrInterface = gotDestPtrReflectValue.Interface()
errGet = rr.GetByIndex(wantColCount, gotDestPtrInterface)
if errGet != nil {
t.Errorf("Row #%d: GetByIndex(index='%d', destType='%T') failed: %v", gotRowCount, wantColCount, gotDest, errGet)
foundErr = true
}
gotDest = reflect.ValueOf(gotDestPtrInterface).Elem().Interface()
if diff := testutil.Diff(gotCurrCol.wantDest, gotDest, cmpOpts...); diff != "" {
t.Errorf("[Row:%v Column:%v] GetByIndex: got: %#v, want: %#v, diff (-want +got):\n %+v",
gotRowCount, wantColCount, gotDest, gotCurrCol.wantDest, diff)
foundErr = true
}
wantColCount++
}
if len(rr.pbValues) != len(tc.rows[gotRowCount]) {
t.Errorf("[Row:%v] Number of columns: got: %v, want: %v", gotRowCount, len(rr.pbValues), len(tc.rows[gotRowCount]))
// more columns than expected
if len(rr.pbValues) > len(tc.rows[gotRowCount]) {
i := len(tc.rows[gotRowCount])
for i < len(rr.pbValues) {
t.Errorf("[Row:%v Column:%v]: Unexpected column with value: %v", gotRowCount, i, rr.pbValues[i])
i++
}
foundErr = true
}
// lesser columns than expected
if len(rr.pbValues) < len(tc.rows[gotRowCount]) {
i := len(rr.pbValues)
for i < len(tc.rows[gotRowCount]) {
t.Errorf("[Row:%v Column:%v]: Missing column with value: %v", gotRowCount, i, tc.rows[gotRowCount][i])
i++
}
foundErr = true
}
}
if foundErr {
return false // Stop processing on error
}
}
gotRowCount++
return true
}); err != nil {
t.Fatal("Execute: " + err.Error())
}
// lesser rows than expected
if gotRowCount < len(tc.rows) {
t.Errorf("Number of rows: got: %v, want: %v", gotRowCount, len(tc.rows))
i := gotRowCount
for i < len(tc.rows) {
t.Errorf("#%v: Row missing in Execute response: %#v", i, tc.rows[gotRowCount])
i++
}
}
})
}
}
func populateAddresses(ctx context.Context, t *testing.T, table *Table, colFam string, v1Timestamp, v2Timestamp Timestamp) {
type cell struct {
Ts Timestamp
Value []byte
}
muts := []*Mutation{}
rowKeys := []string{}
for rowKey, mutData := range map[string]map[string]any{
"row-01": {
"state": []cell{
{
Ts: v1Timestamp,
Value: []byte("WA"),
},
{
Ts: v2Timestamp,
Value: []byte("CA"),
},
},
"city": []cell{
{
Ts: v2Timestamp,
Value: []byte("San Francisco"),
},
},
},
"row-02": {
"state": []cell{
{
Ts: v1Timestamp,
Value: []byte("AZ"),
},
},
"city": []cell{
{
Ts: v1Timestamp,
Value: []byte("Phoenix"),
},
},
},
} {
mut := NewMutation()
for col, v := range mutData {
cells, ok := v.([]cell)
if ok {
for _, cell := range cells {
mut.Set(colFam, col, cell.Ts, cell.Value)
}
}
}
muts = append(muts, mut)
rowKeys = append(rowKeys, rowKey)
}
rowErrs, err := table.ApplyBulk(ctx, rowKeys, muts)
if err != nil || rowErrs != nil {
t.Fatal("ApplyBulk: ", err)
}
}
// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
numCount := 0
const (
numRPCsToSend = 20
minCompleteRPC = 40
)
start := time.Now()
for time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "j§adams")
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
numCount++
if numCount >= minCompleteRPC {
return true
}
}
time.Sleep(100 * time.Millisecond)
}
}
return false
}
func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
return nil, nil, nil, nil, "", nil, err
}
var timeout time.Duration
if testEnv.Config().UseProd {
timeout = 10 * time.Minute
t.Logf("Running test against production")
} else {
timeout = 5 * time.Minute
t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
}
ctx, cancel := context.WithTimeout(ctx, timeout)
_ = cancel // ignore for test
// Reduce sampling period for faster test runs
origSamplePeriod := defaultSamplePeriod
defaultSamplePeriod = time.Minute
client, err := testEnv.NewClient()
if err != nil {
t.Logf("Error creating client: %v", err)
return nil, nil, nil, nil, "", nil, err
}
adminClient, err := testEnv.NewAdminClient()
if err != nil {
cancel()
client.Close()
t.Logf("Error creating admin client: %v", err)
return nil, nil, nil, nil, "", nil, err
}
if testEnv.Config().UseProd {
// TODO: tables may not be successfully deleted in some cases, and will
// become obsolete. We may need a way to automatically delete them.
tableName = tableNameSpace.New()
} else {
tableName = testEnv.Config().Table
}
if err := createTable(ctx, adminClient, tableName); err != nil {
cancel()
client.Close()
adminClient.Close()
t.Logf("Error creating table: %v", err)
return nil, nil, nil, nil, "", nil, err
}
err = createColumnFamily(ctx, t, adminClient, tableName, "follows", map[codes.Code]bool{codes.Unavailable: true})
if err != nil {
if deleteErr := adminClient.DeleteTable(ctx, tableName); deleteErr != nil {
t.Logf("DeleteTable got error %v", deleteErr)
}
cancel()
client.Close()
adminClient.Close()
t.Logf("Error creating column family: %v", err)
return nil, nil, nil, nil, "", nil, err
}
err = createColumnFamilyWithConfig(ctx, t, adminClient, tableName, "sum", &Family{ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
}}, map[codes.Code]bool{codes.Unavailable: true})
if err != nil {
if deleteErr := deleteTable(ctx, t, adminClient, tableName); deleteErr != nil {
t.Logf("DeleteTable got error %v", deleteErr)
}
cancel()
client.Close()
adminClient.Close()
t.Logf("Error creating aggregate column family: %v", err)
return nil, nil, nil, nil, "", nil, err
}
return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
defaultSamplePeriod = origSamplePeriod
if err := deleteTable(ctx, t, adminClient, tableName); err != nil {
t.Errorf("DeleteTable got error %v", err)
}
cancel()
client.Close()
adminClient.Close()
}, nil
}
func createPresplitTable(ctx context.Context, adminClient *AdminClient, tableName string, splitKeys []string) error {
return retry(func() error { return adminClient.CreatePresplitTable(ctx, tableName, splitKeys) },
func() error { return adminClient.DeleteTable(ctx, tableName) })
}
func createTableFromConf(ctx context.Context, adminClient *AdminClient, conf *TableConf) error {
return retry(func() error { return adminClient.CreateTableFromConf(ctx, conf) },
func() error { return adminClient.DeleteTable(ctx, conf.TableID) })
}
func createTable(ctx context.Context, adminClient *AdminClient, tableName string) error {
return retry(func() error { return adminClient.CreateTable(ctx, tableName) },
func() error { return adminClient.DeleteTable(ctx, tableName) })
}
// retry 'f' and runs 'onExists' if 'f' returns AlreadyExists error
// onExists can be nil
func retry(f func() error, onExists func() error) error {
if f == nil {
return nil
}
// Error seen on last attempt
var lastErr error
attemptsDone := 0
internal.Retry(context.Background(), retryCreateBackoff, func() (bool, error) {
currErr := f()
lastErr = currErr
if currErr != nil {
s, ok := status.FromError(lastErr)
if ok && s.Code() == codes.AlreadyExists && onExists != nil {
lastErr = onExists()
}
}
attemptsDone++
return lastErr == nil || attemptsDone == maxCreateAttempts, lastErr
})
return lastErr
}
func createColumnFamily(ctx context.Context, t *testing.T, adminClient *AdminClient, table, family string, retryableCodes map[codes.Code]bool) error {
return createColumnFamilyWithConfig(ctx, t, adminClient, table, family, nil, retryableCodes)
}
func createColumnFamilyWithConfig(ctx context.Context, t *testing.T, adminClient *AdminClient, table, family string, config *Family, retryableCodes map[codes.Code]bool) error {
// Error seen on last create attempt
var err error
testutil.Retry(t, maxCreateAttempts, retryCreateSleep, func(r *testutil.R) {
var createErr error
if config != nil {
createErr = adminClient.CreateColumnFamilyWithConfig(ctx, table, family, *config)
} else {
createErr = adminClient.CreateColumnFamily(ctx, table, family)
}
err = createErr
if createErr != nil {
r.Errorf("%+v", createErr.Error())
s, ok := status.FromError(err)
if ok && retryableCodes != nil && !retryableCodes[s.Code()] {
r.Fatalf("%+v", createErr.Error())
}
if ok && s.Code() == codes.AlreadyExists {
// delete before retry
err = adminClient.DeleteColumnFamily(ctx, table, family)
}
}
})
return err
}
func formatReadItem(ri ReadItem) string {
// Use the column qualifier only to make the test data briefer.
col := ri.Column[strings.Index(ri.Column, ":")+1:]
return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
}
func fill(b, sub []byte) {
for len(b) > len(sub) {
n := copy(b, sub)
b = b[n:]
}
}
func clearTimestamps(r Row) {
for _, ris := range r {
for i := range ris {
ris[i].Timestamp = 0
}
}
}
func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) error {
bo := gax.Backoff{
Initial: 100 * time.Millisecond,
Max: 2 * time.Second,
Multiplier: 1.2,
}
ctx, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
err := internal.Retry(ctx, bo, func() (bool, error) {
err := ac.DeleteTable(ctx, name)
if err != nil {
return false, err
}
return true, nil
})
if err != nil {
t.Logf("DeleteTable: %v", err)
}
return err
}
func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) {
t.Helper()
if !testEnv.Config().AttemptDirectPath {
return
}
if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res {
if testEnv.Config().DirectPathIPV4Only {
t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
} else {
t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
}
}
}
func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
remoteIP := testEnv.Peer().Addr.String()
// DirectPath ipv4-only can only use ipv4 traffic.
if testEnv.Config().DirectPathIPV4Only {
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
}
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}
func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf("%+v", string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf("%+v", string(out))
}
func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf("%+v", string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf("%+v", string(out))
}