blob: 10471184f5445d6df46b28f357a90f8374d6fa53 [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"context"
"errors"
"fmt"
"io"
"testing"
"time"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TestStorageIteratorRetry verifies how the storage-API row iterator reacts
// to errors raised either when opening the read stream or while receiving
// rows: per the table below, transient gRPC codes are expected to be retried
// away, while permanent errors (or an already-dead context) must surface as
// iterator failures.
func TestStorageIteratorRetry(t *testing.T) {
	// Pre-cancelled context, used to simulate a caller whose context is
	// already dead before the stream is processed.
	cancelledCtx, cancel := context.WithCancel(context.Background())
	cancel()
	testCases := []struct {
		ctx      context.Context // base context for the iterator; nil means context.Background()
		desc     string
		errors   []error // errors the fake client yields, in order, before succeeding
		wantFail bool    // true if the iterator is expected to end up with errors
	}{
		{
			desc:     "no error",
			errors:   []error{},
			wantFail: false,
		},
		{
			// These codes are treated as retryable: the iterator should
			// drain them and eventually succeed (wantFail: false).
			desc: "transient failures",
			errors: []error{
				status.Errorf(codes.DeadlineExceeded, "try 1"),
				status.Errorf(codes.Unavailable, "try 2"),
				status.Errorf(codes.Canceled, "try 3"),
				status.Errorf(codes.Internal, "try 4"),
			},
			wantFail: false,
		},
		{
			desc: "not enough permission",
			errors: []error{
				status.Errorf(codes.PermissionDenied, "the user does not have 'bigquery.readsessions.getData' permission"),
			},
			wantFail: true,
		},
		{
			desc: "permanent error",
			errors: []error{
				status.Errorf(codes.InvalidArgument, "invalid args"),
			},
			wantFail: true,
		},
		{
			// With an already-cancelled context, even non-status "random"
			// errors must not be retried forever; the run should fail fast.
			ctx:  cancelledCtx,
			desc: "context cancelled/deadline exceeded",
			errors: []error{
				fmt.Errorf("random error"),
				fmt.Errorf("another random error"),
				fmt.Errorf("yet another random error"),
			},
			wantFail: true,
		},
	}
	for _, tc := range testCases {
		// Fake stream whose Recv returns tc.errors one by one.
		rrc := &testReadRowsClient{
			errors: tc.errors,
		}
		// Two failure modes are exercised: errors while opening the stream
		// ("fail on first call") vs. errors from Recv on an open stream.
		readRowsFuncs := map[string]func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error){
			"readRows fail on first call": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
				// Pop one queued error per open attempt; once the queue is
				// empty, hand back a clean (immediately-EOF) stream.
				if len(tc.errors) == 0 {
					return &testReadRowsClient{}, nil
				}
				err := tc.errors[0]
				tc.errors = tc.errors[1:]
				if err != nil {
					return nil, err
				}
				return &testReadRowsClient{}, nil
			},
			"readRows fails on Recv": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
				return rrc, nil
			},
		}
		// NOTE(review): tc.errors is consumed by the "fail on first call"
		// variant and shares its backing array with rrc.errors, while map
		// iteration order is random — the two sub-cases are therefore not
		// fully independent run-to-run; consider per-variant fixtures.
		for readRowsFuncType, readRowsFunc := range readRowsFuncs {
			baseCtx := tc.ctx
			if baseCtx == nil {
				baseCtx = context.Background()
			}
			// NOTE(review): defer in a loop — every cancel runs only when the
			// test function returns. Harmless for a short test, but t.Run
			// subtests would scope the contexts more tightly.
			ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
			defer cancel()
			it, err := newRawStorageRowIterator(&readSession{
				ctx:          ctx,
				settings:     defaultReadClientSettings(),
				readRowsFunc: readRowsFunc,
				bqSession:    &storagepb.ReadSession{},
			}, Schema{})
			if err != nil {
				t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
			}
			it.processStream("test-stream")
			// A dead iterator context is acceptable only for cases that
			// expect failure; otherwise the retries burned the 5s budget.
			if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
				if tc.wantFail {
					continue
				}
				t.Fatalf("case %s(%s): deadline exceeded", tc.desc, readRowsFuncType)
			}
			if tc.wantFail && len(it.errs) == 0 {
				t.Fatalf("case %s(%s):want test to fail, but found no errors", tc.desc, readRowsFuncType)
			}
			if !tc.wantFail && len(it.errs) > 0 {
				t.Fatalf("case %s(%s):test should not fail, but found %d errors", tc.desc, readRowsFuncType, len(it.errs))
			}
		}
	}
}
// testReadRowsClient is a fake read-rows stream for tests. Embedding the
// interface satisfies storagepb.BigQueryRead_ReadRowsClient without stubbing
// every method; only Recv is overridden, so calling any other method on this
// fake would panic (nil embedded interface).
type testReadRowsClient struct {
	storagepb.BigQueryRead_ReadRowsClient
	responses []*storagepb.ReadRowsResponse // responses to replay, in order, after errors are drained
	errors    []error                       // errors returned first, one per Recv call
}
// Recv pops and returns the next queued error if any; otherwise it pops and
// returns the next queued response; once both queues are exhausted it reports
// io.EOF, signaling end-of-stream to the iterator under test.
func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) {
	if len(trrc.errors) > 0 {
		err := trrc.errors[0]
		trrc.errors = trrc.errors[1:]
		return nil, err
	}
	if len(trrc.responses) > 0 {
		r := trrc.responses[0]
		// Fix: advance past the consumed response ([1:]). The previous
		// [:1] truncated the queue TO its first element, replaying the same
		// response forever and never reaching io.EOF.
		trrc.responses = trrc.responses[1:]
		return r, nil
	}
	return nil, io.EOF
}