| // Copyright 2026 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bigtable |
| |
| import ( |
| "context" |
| "fmt" |
| "net/url" |
| "os" |
| "strconv" |
| "time" |
| |
| btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" |
| btopt "cloud.google.com/go/bigtable/internal/option" |
| btransport "cloud.google.com/go/bigtable/internal/transport" |
| "cloud.google.com/go/internal/trace" |
| gax "github.com/googleapis/gax-go/v2" |
| "google.golang.org/api/option" |
| "google.golang.org/api/option/internaloption" |
| gtransport "google.golang.org/api/transport/grpc" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/metadata" |
| ) |
| |
| // Client is a client for reading and writing data to tables in an instance. |
| // |
| // A Client is safe to use concurrently, except for its Close method. |
| type Client struct { |
| connPool gtransport.ConnPool |
| client btpb.BigtableClient |
| project, instance string |
| appProfile string |
| metricsTracerFactory *builtinMetricsTracerFactory |
| disableRetryInfo bool |
| retryOption gax.CallOption |
| executeQueryRetryOption gax.CallOption |
| enableDirectAccess bool |
| featureFlagsMD metadata.MD // Pre-computed feature flags metadata to be sent with each request. |
| dynamicScaleMonitor *btransport.DynamicScaleMonitor |
| connsRecycler *btransport.ConnectionRecycler |
| } |
| |
| // ClientConfig has configurations for the client. |
| type ClientConfig struct { |
| // The id of the app profile to associate with all data operations sent from this client. |
| // If unspecified, the default app profile for the instance will be used. |
| AppProfile string |
| |
| // If not set or set to nil, client side metrics will be collected and exported |
| // |
| // To disable client side metrics, set 'MetricsProvider' to 'NoopMetricsProvider' |
| // |
| // TODO: support user provided meter provider |
| MetricsProvider MetricsProvider |
| |
| // DisableDynamicChannelPool disables the dynamic channel resizing based on load |
| // Dynamic channel resizing is enabled by default to resize based on load and avoid queuing of requests. |
| DisableDynamicChannelPool bool |
| |
| // DisableConnectionRecycler disables the automatic preemptive refresh of connection. |
| // Preemptive connection is default to true |
| DisableConnectionRecycler bool |
| } |
| |
// MetricsProvider is a wrapper for the built-in metrics meter provider.
//
// Its only method is unexported, so implementations can only be declared
// inside this package.
type MetricsProvider interface {
	// isMetricsProvider is a marker method with no behavior of its own.
	isMetricsProvider()
}
| |
// NoopMetricsProvider can be used to disable built-in metrics: pass it as
// ClientConfig.MetricsProvider.
type NoopMetricsProvider struct{}

// isMetricsProvider marks NoopMetricsProvider as a MetricsProvider.
func (NoopMetricsProvider) isMetricsProvider() {}
| |
| // NewClient creates a new Client for a given project and instance. |
| // The default ClientConfig will be used. |
| func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) { |
| return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...) |
| } |
| |
| // NewClientWithConfig creates a new client with the given config. |
| func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) { |
| clientCreationTimestamp := time.Now() |
| metricsProvider := config.MetricsProvider |
| if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" { |
| // Do not emit metrics when emulator is being used |
| metricsProvider = NoopMetricsProvider{} |
| } |
| |
| // Create a OpenTelemetry metrics configuration |
| metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...) |
| if err != nil { |
| return nil, err |
| } |
| |
| o, err := btopt.DefaultClientOptions(prodAddr, mtlsProdAddr, Scope, clientUserAgent) |
| if err != nil { |
| return nil, err |
| } |
| // for otel metrics |
| if metricsTracerFactory.enabled { |
| if len(metricsTracerFactory.clientOpts) > 0 { |
| o = append(o, metricsTracerFactory.clientOpts...) |
| } |
| } |
| |
| // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. |
| o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) |
| o = append(o, option.WithGRPCDialOption(grpc.WithStatsHandler(sharedLatencyStatsHandler))) |
| // Default to a small connection pool that can be overridden. |
| o = append(o, |
| option.WithGRPCConnectionPool(4), |
| // Set the max size to correspond to server-side limits. |
| option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))), |
| ) |
| |
| enableDirectAccess, _ := strconv.ParseBool(os.Getenv("CBT_ENABLE_DIRECTPATH")) |
| if enableDirectAccess { |
| o = append(o, internaloption.EnableDirectPath(true), internaloption.EnableDirectPathXds()) |
| if disableBoundToken, _ := strconv.ParseBool(os.Getenv("CBT_DISABLE_DIRECTPATH_BOUND_TOKEN")); !disableBoundToken { |
| o = append(o, internaloption.AllowHardBoundTokens("ALTS")) |
| } |
| } |
| |
| // Allow non-default service account in DirectPath. |
| o = append(o, internaloption.AllowNonDefaultServiceAccount(true)) |
| o = append(o, opts...) |
| |
| // TODO(b/372244283): Remove after b/358175516 has been fixed |
| o = append(o, internaloption.EnableAsyncRefreshDryRun(metricsTracerFactory.newAsyncRefreshErrHandler())) |
| |
| disableRetryInfo := false |
| |
| // If DISABLE_RETRY_INFO=1, library does not base retry decision and back off time on server returned RetryInfo value. |
| disableRetryInfoEnv := os.Getenv("DISABLE_RETRY_INFO") |
| disableRetryInfo = disableRetryInfoEnv == "1" |
| retryOption := defaultRetryOption |
| executeQueryRetryOption := defaultExecuteQueryRetryOption |
| if disableRetryInfo { |
| retryOption = clientOnlyRetryOption |
| executeQueryRetryOption = clientOnlyExecuteQueryRetryOption |
| } |
| |
| // Create the feature flags metadata once |
| ffMD := createFeatureFlagsMD(metricsTracerFactory.enabled, disableRetryInfo, enableDirectAccess) |
| |
| var connPool gtransport.ConnPool |
| var connPoolErr error |
| var dsm *btransport.DynamicScaleMonitor |
| var connRecycler *btransport.ConnectionRecycler |
| |
| enableBigtableConnPool := btopt.EnableBigtableConnectionPool() |
| if enableBigtableConnPool { |
| fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance) |
| |
| btPool, err := btransport.NewBigtableChannelPool(ctx, |
| defaultBigtableConnPoolSize, |
| btopt.BigtableLoadBalancingStrategy(), |
| func() (*btransport.BigtableConn, error) { |
| grpcConn, err := gtransport.Dial(ctx, o...) |
| if err != nil { |
| return nil, err |
| } |
| return btransport.NewBigtableConn(grpcConn), nil |
| }, |
| clientCreationTimestamp, |
| // options |
| btransport.WithInstanceName(fullInstanceName), |
| btransport.WithAppProfile(config.AppProfile), |
| btransport.WithFeatureFlagsMetadata(ffMD), |
| btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()), |
| btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider), |
| ) |
| |
| if err != nil { |
| connPoolErr = err |
| } else { |
| connPool = btPool |
| |
| // Validate dynamic config early if enabled |
| if !config.DisableDynamicChannelPool { |
| if err := btransport.ValidateDynamicConfig(btopt.DefaultDynamicChannelPoolConfig(), defaultBigtableConnPoolSize); err != nil { |
| return nil, fmt.Errorf("invalid DynamicChannelPoolConfig: %w", err) |
| } |
| |
| dsm = btransport.NewDynamicScaleMonitor(btopt.DefaultDynamicChannelPoolConfig(), btPool) |
| dsm.Start(ctx) // Start the monitor's background goroutine |
| } |
| // connection recyler. |
| if !config.DisableConnectionRecycler { |
| connRecycler = btransport.NewConnectionRecycler(btopt.DefaultConnectionRecycleConfig(), btPool) |
| connRecycler.Start(ctx) // Start the monitor's background goroutine |
| } |
| |
| } |
| |
| } else { |
| // use to regular ConnPool |
| connPool, connPoolErr = gtransport.DialPool(ctx, o...) |
| } |
| |
| if connPoolErr != nil { |
| return nil, connPoolErr |
| } |
| |
| return &Client{ |
| connPool: connPool, |
| client: btpb.NewBigtableClient(connPool), |
| project: project, |
| instance: instance, |
| appProfile: config.AppProfile, |
| metricsTracerFactory: metricsTracerFactory, |
| disableRetryInfo: disableRetryInfo, |
| retryOption: retryOption, |
| executeQueryRetryOption: executeQueryRetryOption, |
| enableDirectAccess: enableDirectAccess, |
| featureFlagsMD: ffMD, |
| dynamicScaleMonitor: dsm, |
| connsRecycler: connRecycler, |
| }, nil |
| } |
| |
| // Close closes the Client. |
| func (c *Client) Close() error { |
| if c.dynamicScaleMonitor != nil { |
| c.dynamicScaleMonitor.Stop() |
| } |
| if c.metricsTracerFactory != nil { |
| c.metricsTracerFactory.shutdown() |
| } |
| if c.connsRecycler != nil { |
| c.connsRecycler.Stop() |
| } |
| return c.connPool.Close() |
| } |
| |
| func (c *Client) fullInstanceName() string { |
| return fmt.Sprintf("projects/%s/instances/%s", c.project, c.instance) |
| } |
| |
| func (c *Client) fullTableName(table string) string { |
| return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table) |
| } |
| |
| func (c *Client) fullAuthorizedViewName(table string, authorizedView string) string { |
| return fmt.Sprintf("projects/%s/instances/%s/tables/%s/authorizedViews/%s", c.project, c.instance, table, authorizedView) |
| } |
| |
| func (c *Client) fullMaterializedViewName(materializedView string) string { |
| return fmt.Sprintf("projects/%s/instances/%s/materializedViews/%s", c.project, c.instance, materializedView) |
| } |
| |
| func (c *Client) reqParamsHeaderValTable(table string) string { |
| return fmt.Sprintf("table_name=%s&app_profile_id=%s", url.QueryEscape(c.fullTableName(table)), url.QueryEscape(c.appProfile)) |
| } |
| |
| func (c *Client) reqParamsHeaderValInstance() string { |
| return fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(c.fullInstanceName()), url.QueryEscape(c.appProfile)) |
| } |
| |
| // Open opens a table. |
| func (c *Client) Open(table string) *Table { |
| return &Table{ |
| c: c, |
| table: table, |
| md: metadata.Join(metadata.Pairs( |
| resourcePrefixHeader, c.fullTableName(table), |
| requestParamsHeader, c.reqParamsHeaderValTable(table), |
| ), c.featureFlagsMD), |
| } |
| } |
| |
| // OpenTable opens a table. |
| func (c *Client) OpenTable(table string) TableAPI { |
| return &tableImpl{Table{ |
| c: c, |
| table: table, |
| md: metadata.Join(metadata.Pairs( |
| resourcePrefixHeader, c.fullTableName(table), |
| requestParamsHeader, c.reqParamsHeaderValTable(table), |
| ), c.featureFlagsMD), |
| }} |
| } |
| |
| // OpenAuthorizedView opens an authorized view. |
| func (c *Client) OpenAuthorizedView(table, authorizedView string) TableAPI { |
| return &tableImpl{Table{ |
| c: c, |
| table: table, |
| md: metadata.Join(metadata.Pairs( |
| resourcePrefixHeader, c.fullAuthorizedViewName(table, authorizedView), |
| requestParamsHeader, c.reqParamsHeaderValTable(table), |
| ), c.featureFlagsMD), |
| authorizedView: authorizedView, |
| }} |
| } |
| |
| // OpenMaterializedView opens a materialized view. |
| func (c *Client) OpenMaterializedView(materializedView string) TableAPI { |
| return &tableImpl{Table{ |
| c: c, |
| md: metadata.Join(metadata.Pairs( |
| resourcePrefixHeader, c.fullMaterializedViewName(materializedView), |
| requestParamsHeader, c.reqParamsHeaderValTable(materializedView), |
| ), c.featureFlagsMD), |
| materializedView: materializedView, |
| }} |
| } |
| |
| // PingAndWarm pings the server and warms up the connection. |
| func (c *Client) PingAndWarm(ctx context.Context) (err error) { |
| md := metadata.Join(metadata.Pairs( |
| resourcePrefixHeader, c.fullInstanceName(), |
| requestParamsHeader, c.reqParamsHeaderValInstance(), |
| ), c.featureFlagsMD) |
| |
| ctx = mergeOutgoingMetadata(ctx, md) |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/PingAndWarm") |
| defer func() { trace.EndSpan(ctx, err) }() |
| mt := c.newBuiltinMetricsTracer(ctx, "", false) |
| defer mt.recordOperationCompletion() |
| |
| err = c.pingerWithMetadata(ctx, mt) |
| statusCode, statusErr := convertToGrpcStatusErr(err) |
| mt.currOp.setStatus(statusCode.String()) |
| return statusErr |
| } |
| |
| func (c *Client) pingerWithMetadata(ctx context.Context, mt *builtinMetricsTracer) (err error) { |
| req := &btpb.PingAndWarmRequest{ |
| Name: c.fullInstanceName(), |
| AppProfileId: c.appProfile, |
| } |
| err = gaxInvokeWithRecorder(ctx, mt, "PingAndWarm", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { |
| var err error |
| _, err = c.client.PingAndWarm(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) |
| return err |
| }) |
| |
| return err |
| |
| } |
| |
| func (c *Client) newBuiltinMetricsTracer(ctx context.Context, table string, isStreaming bool) *builtinMetricsTracer { |
| mt := c.metricsTracerFactory.createBuiltinMetricsTracer(ctx, table, isStreaming) |
| return &mt |
| } |