blob: 2696eaa7c0936e306866e083ccb6de992c8a501e [file] [log] [blame] [edit]
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigtable
import (
"bytes"
"context"
"errors"
"fmt"
"hash/crc32"
"io"
"sync"
"time"
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"cloud.google.com/go/internal/trace"
gax "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
	// queryExpiredViolationType is the PreconditionFailure violation type that
	// isQueryExpiredViolation looks for (together with codes.FailedPrecondition)
	// to recognise an expired prepared query.
	queryExpiredViolationType = "PREPARED_QUERY_EXPIRED"

	// preparedQueryExpireEarlyDuration is how long before validUntil a prepared
	// query stops counting as "validEarly" and becomes due for a background
	// refresh.
	preparedQueryExpireEarlyDuration = time.Second
)
// PreparedStatement stores the results of query preparation that can be used to
// create [BoundStatement]s to execute queries.
//
// Whenever possible this should be shared across different instances of the same query,
// in order to amortize query preparation costs.
type PreparedStatement struct {
	// c is the client used to prepare, re-prepare and execute the query.
	c *Client
	// query, paramTypes and opts are the arguments the statement was prepared
	// with; refresh reuses them to re-prepare the query.
	query      string
	paramTypes map[string]SQLType
	opts       []PrepareOption
	// data is the current prepared query; refresh replaces it with a new value.
	data *preparedQueryData
	// refreshMutex serializes calls to refresh.
	// NOTE(review): data is also read without this lock (valid,
	// newPreparedQueryData) while refresh writes it under the lock — confirm
	// with -race whether this is a data race.
	refreshMutex sync.Mutex
}
// preparedQueryData is one version of a prepared query as returned by the
// PrepareQuery RPC, plus the row metadata derived from it.
type preparedQueryData struct {
	// metadata describes the rows of the `ExecuteQueryResponse` stream
	// produced by executing preparedQuery.
	metadata *btpb.ResultSetMetadata
	// preparedQuery is the opaque serialized query that is sent back in
	// `ExecuteQueryRequest`.
	preparedQuery []byte
	// validUntil is when the prepared query token stops being valid. The token
	// can become invalid earlier (e.g. the data being read changed), so this is
	// a guideline for refreshing query plans ahead of time.
	validUntil *timestamppb.Timestamp
	// Metadata is the parsed form of metadata; nil until
	// initializeMetadataAndMap succeeds.
	Metadata *ResultRowMetadata
}
// initializeMetadataAndMap parses pqd.metadata with newResultRowMetadata and
// stores the result in pqd.Metadata. On failure pqd.Metadata is left untouched
// and the parse error is returned.
func (pqd *preparedQueryData) initializeMetadataAndMap() error {
	parsed, err := newResultRowMetadata(pqd.metadata)
	if err == nil {
		pqd.Metadata = parsed
	}
	return err
}
// PrepareOption is an optional argument to [Client.PrepareStatement].
// Any value satisfies it.
type PrepareOption any
// PrepareStatement prepares a query for execution. If possible, this should be called once and
// reused across requests. This will amortize the cost of query preparation.
//
// The query string can be a parameterized query containing placeholders in the form of @ followed
// by the parameter name. Parameter names may consist of any combination of letters, numbers, and
// underscores.
//
// Parameters can appear anywhere that a literal value is expected. The same parameter name can
// be used more than once, for example: WHERE cf["qualifier1"] = @value OR cf["qualifier2"] = @value
func (c *Client) PrepareStatement(ctx context.Context, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (preparedStatement *PreparedStatement, err error) {
	// Attach the instance routing headers and feature flags, then delegate.
	routing := metadata.Pairs(
		resourcePrefixHeader, c.fullInstanceName(),
		requestParamsHeader, c.reqParamsHeaderValInstance(),
	)
	ctx = mergeOutgoingMetadata(ctx, metadata.Join(routing, c.featureFlagsMD))
	return c.prepareStatementWithMetadata(ctx, query, paramTypes, opts...)
}
// prepareStatementWithMetadata prepares query like PrepareStatement, but
// expects ctx to already carry the resource-prefix, request-params and
// feature-flag metadata. The RPC runs inside a "PrepareQuery" trace span and a
// builtin-metrics operation whose status is the gRPC code of the returned error.
func (c *Client) prepareStatementWithMetadata(ctx context.Context, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (preparedStatement *PreparedStatement, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.PrepareQuery")
	defer func() { trace.EndSpan(ctx, err) }()

	tracer := c.newBuiltinMetricsTracer(ctx, "", false)
	defer tracer.recordOperationCompletion()

	ps, rawErr := c.prepareStatement(ctx, tracer, query, paramTypes, opts...)
	code, grpcErr := convertToGrpcStatusErr(rawErr)
	tracer.setCurrOpStatus(code)
	return ps, grpcErr
}
// prepareStatement issues the PrepareQuery RPC for query and wraps the result
// in a PreparedStatement bound to c.
//
// Every value in paramTypes must be a non-nil SQLType accepted by
// isValidPrepareParamType; its proto form is sent as the request's parameter
// types. The RPC goes through gaxInvokeWithRecorder with c.retryOption and is
// recorded on mt.
//
// The returned statement keeps its own copy of paramTypes. PreparedStatements
// are meant to be shared, so later changes to the caller's map must not change
// what Bind accepts relative to the types the query was actually prepared with.
func (c *Client) prepareStatement(ctx context.Context, mt *builtinMetricsTracer, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (*PreparedStatement, error) {
	reqParamTypes := make(map[string]*btpb.Type, len(paramTypes))
	ownParamTypes := make(map[string]SQLType, len(paramTypes))
	for k, v := range paramTypes {
		if v == nil {
			return nil, errors.New("bigtable: invalid SQLType: nil")
		}
		if !v.isValidPrepareParamType() {
			return nil, fmt.Errorf("bigtable: %T cannot be used as parameter type", v)
		}
		tpb, err := v.typeProto()
		if err != nil {
			return nil, err
		}
		reqParamTypes[k] = tpb
		ownParamTypes[k] = v
	}
	req := &btpb.PrepareQueryRequest{
		InstanceName: c.fullInstanceName(),
		AppProfileId: c.appProfile,
		Query:        query,
		DataFormat: &btpb.PrepareQueryRequest_ProtoFormat{
			ProtoFormat: &btpb.ProtoFormat{},
		},
		ParamTypes: reqParamTypes,
	}
	var res *btpb.PrepareQueryResponse
	err := gaxInvokeWithRecorder(ctx, mt, "PrepareQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
		var err error
		res, err = c.client.PrepareQuery(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
		return err
	}, c.retryOption)
	if err != nil {
		return nil, err
	}
	return &PreparedStatement{
		c: c,
		data: &preparedQueryData{
			metadata:      res.Metadata,
			preparedQuery: res.PreparedQuery,
			validUntil:    res.ValidUntil,
		},
		query:      query,
		paramTypes: ownParamTypes,
		opts:       opts,
	}, nil
}
// Bind binds a set of parameters to a prepared statement.
//
// Every parameter declared when the statement was prepared must be present in
// values, and values may not contain names that were not declared. Each value
// is converted to its proto form using the declared SQLType.
//
// Allowed parameter value types are []byte, string, int64, float32, float64, bool,
// time.Time, civil.Date, array, slice and nil
func (ps *PreparedStatement) Bind(values map[string]any) (*BoundStatement, error) {
	if ps == nil {
		return nil, errors.New("bigtable: nil prepared statement")
	}
	// Reject missing parameters first.
	for name := range ps.paramTypes {
		if _, ok := values[name]; !ok {
			return nil, fmt.Errorf("bigtable: parameter %q not bound in call to Bind", name)
		}
	}
	// Then reject unknown names and convert the rest.
	params := make(map[string]*btpb.Value, len(values))
	for name, val := range values {
		declared, ok := ps.paramTypes[name]
		if !ok {
			return nil, errors.New("bigtable: no parameter with name " + name + " in prepared statement")
		}
		pb, err := anySQLTypeToPbVal(val, declared)
		if err != nil {
			return nil, err
		}
		params[name] = pb
	}
	return &BoundStatement{ps: ps, params: params}, nil
}
// refreshIfInvalid makes sure ps holds a usable prepared query token before an
// execution starts:
//
//	| valid | validEarly | behaviour            |
//	|-------|------------|----------------------|
//	| true  | true       | nil                  |
//	| false | true       | impossible condition |
//	| true  | false      | async refresh token  |
//	| false | false      | sync refresh token   |
//
// The synchronous refresh's error is returned. The asynchronous refresh is
// best-effort: its error is dropped, the next call in the early-expiry window
// tries again, and once the token really expires the synchronous path runs.
//
// NOTE(review): ps.valid reads ps.data without refreshMutex while refresh
// replaces ps.data under it — looks like a data race; confirm with -race.
func (ps *PreparedStatement) refreshIfInvalid(ctx context.Context) error {
	valid, validEarly := ps.valid()
	if validEarly {
		// Token valid and not close to expiring.
		return nil
	}
	if !valid {
		// Token already expired: refresh before executing.
		ps.refreshMutex.Lock()
		defer ps.refreshMutex.Unlock()
		// Another goroutine may have refreshed while we waited for the lock.
		if stillValid, _ := ps.valid(); stillValid {
			return nil
		}
		return ps.refresh(ctx)
	}
	// Token about to expire: refresh in the background.
	// NOTE(review): ctx belongs to the caller's request; if it is canceled
	// before the refresh RPC completes the refresh fails (and is retried later).
	go func() {
		ps.refreshMutex.Lock()
		defer ps.refreshMutex.Unlock()
		// Skip if another goroutine already refreshed. This must test
		// validEarly: the token is still valid here by construction, so
		// re-testing valid would always skip the refresh.
		if _, fresh := ps.valid(); fresh {
			return
		}
		_ = ps.refresh(ctx) // best-effort; see doc comment
	}()
	return nil
}
// valid reports where the current prepared query token stands relative to
// now: valid is true while validUntil is still in the future, and validEarly
// is true while more than preparedQueryExpireEarlyDuration remains before
// validUntil. A nil validUntil converts (via timestamppb's AsTime) to the Unix
// epoch, so it always reports expired.
func (ps *PreparedStatement) valid() (valid bool, validEarly bool) {
	now := time.Now()
	// validUntil carries no monotonic reading, so this is a wall-clock difference.
	remaining := ps.data.validUntil.AsTime().Sub(now)
	return remaining > 0, remaining > preparedQueryExpireEarlyDuration
}
// refresh re-prepares the statement's query with its original parameter types
// and options and, on success, replaces ps.data with the new prepared query.
// On error ps.data is left unchanged. Every caller in this file holds
// ps.refreshMutex while calling it.
func (ps *PreparedStatement) refresh(ctx context.Context) error {
	fresh, err := ps.c.prepareStatementWithMetadata(ctx, ps.query, ps.paramTypes, ps.opts...)
	if err != nil {
		return err
	}
	// fresh.data holds only metadata, preparedQuery and validUntil (Metadata is
	// still nil), and fresh itself is discarded, so it can be adopted directly.
	ps.data = fresh.data
	return nil
}
// BoundStatement is a statement that has been bound to a set of parameters.
// It is created by calling [PreparedStatement.Bind].
type BoundStatement struct {
	// ps is the prepared statement the parameters were bound against.
	ps *PreparedStatement
	// params maps each parameter name to its value, already converted to the
	// proto representation sent in ExecuteQueryRequest.
	params map[string]*btpb.Value
}
// ExecuteOption is an optional argument to [BoundStatement.Execute].
// Any value satisfies it.
type ExecuteOption any
// Execute executes a previously prepared query. f is called for each row in result set.
// If f returns false, the stream is shut down and Execute returns.
// f owns its argument, and f is called serially in order of results returned.
// f will be executed in the same goroutine as the caller.
//
// opts is currently not consulted. The call runs inside an "ExecuteQuery" trace
// span and a builtin-metrics operation; the returned error is converted to a
// gRPC status error.
func (bs *BoundStatement) Execute(ctx context.Context, f func(ResultRow) bool, opts ...ExecuteOption) (err error) {
	client := bs.ps.c
	routing := metadata.Pairs(
		resourcePrefixHeader, client.fullInstanceName(),
		requestParamsHeader, client.reqParamsHeaderValInstance(),
	)
	ctx = mergeOutgoingMetadata(ctx, metadata.Join(routing, client.featureFlagsMD))

	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ExecuteQuery")
	defer func() { trace.EndSpan(ctx, err) }()

	tracer := client.newBuiltinMetricsTracer(ctx, "", true)
	defer tracer.recordOperationCompletion()

	code, statusErr := convertToGrpcStatusErr(bs.execute(ctx, f, tracer))
	tracer.setCurrOpStatus(code)
	return statusErr
}
// newPreparedQueryData returns a shallow copy of ps.data. Because refresh
// swaps in a new ps.data rather than mutating it, the copy keeps describing
// the prepared query that was current at the time of the call.
func newPreparedQueryData(ps *PreparedStatement) *preparedQueryData {
	snapshot := new(preparedQueryData)
	*snapshot = *ps.data
	return snapshot
}
// execute runs the ExecuteQuery streaming RPC for bs and calls f, serially and
// in order, for every row of the result. It stops early, without error, when f
// returns false.
//
// Per PartialResultSet:
//   - proto_rows_batch bytes are appended to ongoingResultBatch;
//   - when batch_checksum is present the accumulated batch is checked with
//     crc32cTable, unmarshalled into ProtoRows and its values appended to
//     valuesBuffer;
//   - when resume_token is present, valuesBuffer is cut into rows of
//     len(columns) values and each row is handed to f;
//   - reset discards everything buffered since the last resume_token.
//
// Attempts are retried according to bs.ps.c.executeQueryRetryOption. Each
// attempt sends the last resume_token received, so the server can resume after
// the rows already handed to f, and discards whatever the previous attempt had
// buffered since that token (the server sends it again). A
// PREPARED_QUERY_EXPIRED failure triggers a synchronous refresh of the prepared
// statement before the next attempt.
func (bs *BoundStatement) execute(ctx context.Context, f func(ResultRow) bool, mt *builtinMetricsTracer) error {
	// Bytes of the proto_rows batch currently being received.
	var ongoingResultBatch bytes.Buffer
	// Values decoded since the last non-empty resume_token, not yet given to f.
	valuesBuffer := []*btpb.Value{}
	// Last non-empty resume_token received; sent with every subsequent attempt.
	var resumeToken []byte
	receivedResumeToken := false
	// Error that ended the previous attempt (nil before the first attempt).
	var prevError error
	// Metadata could change on planned query refresh.
	// E.g.
	// 1. 'SELECT *' request with ps started at t1
	// 2. A column family is added to the table
	// 3. Some other request triggers refresh of ps at t2
	// 4. If the metadata from the refreshed ps at t2 is used, metadata contains the new column family,
	// the responses do not (because the request used the plan from t1)
	//
	// So, do not use latest metadata from `bs.ps`: a snapshot is pinned when the
	// first resume_token arrives and used for the rest of the execution.
	var finalizedStmt *preparedQueryData
	err := gaxInvokeWithRecorder(ctx, mt, "ExecuteQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
		// This attempt restarts from resumeToken (or from the beginning if there
		// is none yet), so the server re-sends anything a failed attempt left
		// buffered; keeping it would duplicate values or corrupt the checksum.
		valuesBuffer = []*btpb.Value{}
		ongoingResultBatch.Reset()

		if isQueryExpiredViolation(prevError) {
			// Query could have other expiry conditions apart from time based expiry.
			// So, it is possible that the query does not get refreshed in `refreshIfInvalid`.
			// Hold the lock only for the refresh itself: refreshIfInvalid below may
			// need to take it, and other executions of this statement must not wait
			// for this whole stream.
			bs.ps.refreshMutex.Lock()
			err := bs.ps.refresh(ctx)
			bs.ps.refreshMutex.Unlock()
			if err != nil {
				prevError = err
				return err
			}
		}
		if !receivedResumeToken {
			// Once we have a resume token we need the prepared query to never change.
			// The Bigtable service will only send the query expired error for requests
			// without a token (before sending any responses).
			// We don't want the plan to change on a transient error once we've already
			// received a token.
			if err := bs.ps.refreshIfInvalid(ctx); err != nil {
				prevError = err
				return err
			}
		}
		candFinalizedStmt := finalizedStmt
		if candFinalizedStmt == nil {
			candFinalizedStmt = newPreparedQueryData(bs.ps)
		}
		// ResumeToken is nil until the first token arrives; afterwards it lets the
		// server continue after the rows already handed to f.
		req := &btpb.ExecuteQueryRequest{
			InstanceName:  bs.ps.c.fullInstanceName(),
			AppProfileId:  bs.ps.c.appProfile,
			PreparedQuery: candFinalizedStmt.preparedQuery,
			Params:        bs.params,
			ResumeToken:   resumeToken,
		}
		// Cancelable context used only to abort this stream. It is derived after
		// the refresh checks so a background refresh started by refreshIfInvalid
		// is not canceled as soon as this attempt returns.
		streamCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		stream, err := bs.ps.c.client.ExecuteQuery(streamCtx, req)
		if err != nil {
			prevError = err
			return err
		}
		// Ignore error since header is only being used to record builtin metrics.
		// Failure to record metrics should not fail the operation.
		*headerMD, _ = stream.Header()
		eqResp := new(btpb.ExecuteQueryResponse)
		for {
			proto.Reset(eqResp)
			if err := stream.RecvMsg(eqResp); err != nil {
				// io.EOF on a clean end, otherwise a stream failure; the retry
				// option decides whether another attempt follows.
				return handleExecuteStreamEnd(stream, trailerMD, valuesBuffer, err, &prevError)
			}
			results, ok := eqResp.GetResponse().(*btpb.ExecuteQueryResponse_Results)
			if !ok {
				prevError = errors.New("bigtable: unexpected response type")
				return prevError
			}
			partialResultSet := results.Results
			if partialResultSet.GetReset_() {
				valuesBuffer = []*btpb.Value{}
				ongoingResultBatch.Reset()
			}
			if batch := partialResultSet.GetProtoRowsBatch(); batch != nil {
				ongoingResultBatch.Write(batch.GetBatchData())
			}
			if partialResultSet.BatchChecksum != nil {
				// Current batch is now complete: validate its checksum, then parse it.
				currBatchChecksum := crc32.Checksum(ongoingResultBatch.Bytes(), crc32cTable)
				if *partialResultSet.BatchChecksum != currBatchChecksum {
					prevError = errors.New("bigtable: batch_checksum mismatch")
					return prevError
				}
				protoRows := new(btpb.ProtoRows)
				if err := proto.Unmarshal(ongoingResultBatch.Bytes(), protoRows); err != nil {
					prevError = err
					return err
				}
				valuesBuffer = append(valuesBuffer, protoRows.GetValues()...)
				// Prepare to receive next batch of results.
				ongoingResultBatch.Reset()
			}
			if partialResultSet.GetResumeToken() == nil {
				continue
			}
			// Buffered values can now be yielded to the caller.
			// If `resume_token` is non-empty and any data has been received since the
			// last one, BatchChecksum is guaranteed to be non-empty. In other words, a batch will
			// never cross a `resume_token` boundary. It is an error otherwise.
			if ongoingResultBatch.Len() != 0 && partialResultSet.BatchChecksum == nil {
				prevError = errors.New("bigtable: received resume_token with buffered data and no batch_checksum")
				return prevError
			}
			if !receivedResumeToken {
				// First resume_token: pin the plan this execution started with.
				finalizedStmt = candFinalizedStmt
				receivedResumeToken = true
			}
			// Save ResumeToken for subsequent requests.
			resumeToken = partialResultSet.GetResumeToken()
			if finalizedStmt.metadata == nil || finalizedStmt.metadata.GetProtoSchema() == nil {
				prevError = errors.New("bigtable: metadata missing")
				return prevError
			}
			if finalizedStmt.Metadata == nil {
				// Parse the pinned metadata once; rows cannot be decoded without it.
				if err := finalizedStmt.initializeMetadataAndMap(); err != nil {
					prevError = err
					return err
				}
			}
			numCols := len(finalizedStmt.metadata.GetProtoSchema().GetColumns())
			// Each row consumes exactly numCols values from the front of valuesBuffer.
			for len(valuesBuffer) != 0 {
				// With numCols == 0 no value would ever be consumed (infinite loop).
				if numCols == 0 || len(valuesBuffer) < numCols {
					prevError = fmt.Errorf("bigtable: metadata and data mismatch: %d columns in metadata but received %d values", numCols, len(valuesBuffer))
					return prevError
				}
				var completeRowValues []*btpb.Value
				completeRowValues, valuesBuffer = valuesBuffer[:numCols], valuesBuffer[numCols:]
				rr, err := newResultRow(completeRowValues, finalizedStmt.metadata, finalizedStmt.Metadata)
				if err != nil {
					prevError = err
					return err
				}
				if !f(*rr) {
					// Cancel and drain stream. The error that ends the stream is not
					// reported because the caller intentionally interrupted the scan.
					cancel()
					for {
						proto.Reset(eqResp)
						if err := stream.RecvMsg(eqResp); err != nil {
							_ = handleExecuteStreamEnd(stream, trailerMD, valuesBuffer, err, &prevError)
							return nil
						}
					}
				}
			}
		}
	}, bs.ps.c.executeQueryRetryOption)
	return err
}
// handleExecuteStreamEnd finishes one ExecuteQuery attempt after RecvMsg
// returned err (io.EOF on a clean end).
//
// It records err in *prevError and copies the stream trailer into *trailerMD
// for builtin metrics. It returns err when err is neither nil nor io.EOF. On a
// clean end it returns an error if values are still buffered without a
// resume token, and nil otherwise.
func handleExecuteStreamEnd(stream btpb.Bigtable_ExecuteQueryClient, trailerMD *metadata.MD, valuesBuffer []*btpb.Value, err error, prevError *error) error {
	*prevError = err
	// gRPC makes the trailer available once RecvMsg has returned any non-nil
	// error (including io.EOF), so capture it for failed attempts too.
	*trailerMD = stream.Trailer()
	if err != nil && err != io.EOF {
		return err
	}
	if len(valuesBuffer) != 0 {
		return errors.New("bigtable: server stream ended without sending a resume token")
	}
	return nil
}
// clientOnlyExecuteQueryRetry is the retry predicate for ExecuteQuery: an
// expired prepared query is always retried after backoff.Pause() (execute
// refreshes the plan before the next attempt); any other error is delegated to
// clientOnlyRetry.
func clientOnlyExecuteQueryRetry(backoff *gax.Backoff, err error) (time.Duration, bool) {
	if !isQueryExpiredViolation(err) {
		return clientOnlyRetry(backoff, err)
	}
	return backoff.Pause(), true
}
// isQueryExpiredViolation reports whether err is a FailedPrecondition API error
// whose PreconditionFailure details contain a violation of type
// queryExpiredViolationType. A nil error reports false.
func isQueryExpiredViolation(err error) bool {
	apiErr, ok := apierror.FromError(err)
	if !ok || apiErr == nil || status.Code(err) != codes.FailedPrecondition {
		return false
	}
	failure := apiErr.Details().PreconditionFailure
	if failure == nil {
		return false
	}
	for _, v := range failure.GetViolations() {
		if v != nil && v.GetType() == queryExpiredViolationType {
			return true
		}
	}
	return false
}