// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
"cloud.google.com/go/bigquery/internal"
cloudinternal "cloud.google.com/go/internal"
"cloud.google.com/go/internal/detect"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gax "github.com/googleapis/gax-go/v2"
bq "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
)
const (
// Scope is the OAuth2 scope for the service.
// For relevant BigQuery scopes, see:
// https://developers.google.com/identity/protocols/googlescopes#bigqueryv2
Scope = "https://www.googleapis.com/auth/bigquery"
userAgentPrefix = "gcloud-golang-bigquery"
)
var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), internal.Version)
func setClientHeader(headers http.Header) {
headers.Set("x-goog-api-client", xGoogHeader)
}
// Client may be used to perform BigQuery operations.
type Client struct {
// Location, if set, will be used as the default location for all subsequent
// dataset creation and job operations. A location specified directly in one of
// those operations will override this value.
Location string
projectID string
bqs *bq.Service
rc *readClient
// governs use of preview query features.
enableQueryPreview bool
}
// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically, a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"
// NewClient constructs a new Client which can perform BigQuery operations.
// Operations performed via the client are billed to the specified GCP project.
//
// If the project ID is set to DetectProjectID, NewClient will attempt to detect
// the project ID from credentials.
//
// This client supports enabling query-related preview features via environment
// variables. Setting the environment variable QUERY_PREVIEW_ENABLED to the string
// "TRUE" enables preview features, though behavior may still be
// controlled via the BigQuery service as well. Currently, the features in scope
// include stateless queries (query execution without corresponding job metadata).
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
o := []option.ClientOption{
option.WithScopes(Scope),
option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)),
}
o = append(o, opts...)
bqs, err := bq.NewService(ctx, o...)
if err != nil {
return nil, fmt.Errorf("bigquery: constructing client: %w", err)
}
// Handle project autodetection.
projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
if err != nil {
return nil, err
}
var preview bool
if v, ok := os.LookupEnv("QUERY_PREVIEW_ENABLED"); ok {
if strings.ToUpper(v) == "TRUE" {
preview = true
}
}
c := &Client{
projectID: projectID,
bqs: bqs,
enableQueryPreview: preview,
}
return c, nil
}
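
// A minimal usage sketch from a caller's perspective (illustrative only; the
// project ID detection and the preview toggle shown here are optional):
//
//	ctx := context.Background()
//	// Optionally opt in to query preview features before constructing the client:
//	// os.Setenv("QUERY_PREVIEW_ENABLED", "TRUE")
//	client, err := bigquery.NewClient(ctx, bigquery.DetectProjectID)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()
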
// EnableStorageReadClient sets up a Storage API connection to be used when fetching
// large datasets from tables, jobs, or queries.
// Currently, pagination features such as PageInfo().Token and RowIterator.StartIndex
// are not supported when the Storage API is enabled.
// Calling this method twice will return an error.
func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error {
if c.isStorageReadAvailable() {
return fmt.Errorf("failed: storage read client already set up")
}
rc, err := newReadClient(ctx, c.projectID, opts...)
if err != nil {
return err
}
c.rc = rc
return nil
}
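
// A minimal usage sketch for enabling the Storage API read path (illustrative;
// "my-project-id" is a placeholder):
//
//	ctx := context.Background()
//	client, err := bigquery.NewClient(ctx, "my-project-id")
//	if err != nil {
//		// TODO: handle error.
//	}
//	if err := client.EnableStorageReadClient(ctx); err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()
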
func (c *Client) isStorageReadAvailable() bool {
return c.rc != nil
}
// Project returns the project ID or number for this instance of the client, which may have
// either been explicitly specified or autodetected.
func (c *Client) Project() string {
return c.projectID
}
// Close closes any resources held by the client.
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
func (c *Client) Close() error {
if c.isStorageReadAvailable() {
err := c.rc.close()
if err != nil {
return err
}
}
return nil
}
// insertJob calls the Jobs.Insert RPC and returns a Job.
func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader, mediaOpts ...googleapi.MediaOption) (*Job, error) {
call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx)
setClientHeader(call.Header())
if media != nil {
call.Media(media, mediaOpts...)
}
var res *bq.Job
var err error
invoke := func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}
// A job with a client-generated ID can be retried; the presence of the
// ID makes the insert operation idempotent.
// We don't retry if there is media, because it is an io.Reader. We'd
// have to read the contents and keep it in memory, and that could be expensive.
// TODO(jba): Look into retrying if media != nil.
if job.JobReference != nil && media == nil {
// We deviate from default retries due to BigQuery wanting to retry structured internal job errors.
err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
} else {
err = invoke()
}
if err != nil {
return nil, err
}
return bqToJob(res, c)
}
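
// An illustrative sketch of the idempotency condition above: a job whose
// JobReference carries a client-generated job ID can be safely re-inserted,
// so such requests are retried. The ID values here are placeholders.
//
//	job := &bq.Job{
//		JobReference: &bq.JobReference{
//			ProjectId: "my-project-id",
//			JobId:     "client-generated-job-id",
//		},
//	}
//	// With JobReference set and media == nil, insertJob retries using jobRetryReasons.
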
// runQuery invokes the optimized query path.
// Due to differences in options it supports, it cannot be used for all existing
// jobs.insert requests that are query jobs.
func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*bq.QueryResponse, error) {
call := c.bqs.Jobs.Query(c.projectID, queryRequest).Context(ctx)
setClientHeader(call.Header())
var res *bq.QueryResponse
var err error
invoke := func() error {
sCtx := trace.StartSpan(ctx, "bigquery.jobs.query")
res, err = call.Do()
trace.EndSpan(sCtx, err)
return err
}
// We control request ID, so we can always runWithRetry.
err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
if err != nil {
return nil, err
}
return res, nil
}
// unixMillisToTime converts a number of milliseconds since the Unix epoch to a time.Time.
// It treats an input of zero specially, converting it to the zero time
// rather than to the start of the epoch.
func unixMillisToTime(m int64) time.Time {
if m == 0 {
return time.Time{}
}
return time.Unix(0, m*1e6)
}
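
// A couple of worked conversions (values chosen for illustration):
//
//	unixMillisToTime(0)             // the zero time.Time, not 1970-01-01T00:00:00Z
//	unixMillisToTime(1500000000000) // the instant 2017-07-14 02:40:00 UTC
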
// runWithRetry calls the function until it returns nil or a non-retryable error, or
// the context is done.
// See the similar function in ../storage/invoke.go. The main difference is the
// reason for retrying.
func runWithRetry(ctx context.Context, call func() error) error {
return runWithRetryExplicit(ctx, call, defaultRetryReasons)
}
func runWithRetryExplicit(ctx context.Context, call func() error, allowedReasons []string) error {
// These parameters match the suggestions in https://cloud.google.com/bigquery/sla.
backoff := gax.Backoff{
Initial: 1 * time.Second,
Max: 32 * time.Second,
Multiplier: 2,
}
return cloudinternal.Retry(ctx, backoff, func() (stop bool, err error) {
err = call()
if err == nil {
return true, nil
}
return !retryableError(err, allowedReasons), err
})
}
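
// An illustrative sketch of how these helpers are used elsewhere in the package;
// the API call in the closure is a placeholder:
//
//	var res *bq.Dataset
//	err := runWithRetry(ctx, func() (err error) {
//		res, err = call.Do() // any idempotent API call
//		return err
//	})
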
var (
defaultRetryReasons = []string{"backendError", "rateLimitExceeded"}
// These reasons are used exclusively for enqueuing jobs (jobs.insert and jobs.query).
// Using them for polling may cause unwanted retries until context deadline/cancellation/etc.
jobRetryReasons = []string{"backendError", "rateLimitExceeded", "jobRateLimitExceeded", "internalError"}
retry5xxCodes = []int{
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
}
)
// retryableError is the unary retry predicate for this library. In addition to structured error
// reasons, it treats some HTTP codes (500, 502, 503, 504) and network/transport failures as retryable.
func retryableError(err error, allowedReasons []string) bool {
if err == nil {
return false
}
if err == io.ErrUnexpectedEOF {
return true
}
// Special case due to http2: https://github.com/googleapis/google-cloud-go/issues/1793
// Due to Go's default being higher for streams-per-connection than is accepted by the
// BQ backend, it's possible to get streams refused immediately after a connection is
// started but before we receive the SETTINGS frame from the backend. This generally only
// happens when we try to enqueue > 100 requests onto a newly initiated connection.
if err.Error() == "http2: stream closed" {
return true
}
switch e := err.(type) {
case *googleapi.Error:
// We received a structured error from backend.
var reason string
if len(e.Errors) > 0 {
reason = e.Errors[0].Reason
for _, r := range allowedReasons {
if reason == r {
return true
}
}
}
for _, code := range retry5xxCodes {
if e.Code == code {
return true
}
}
case *url.Error:
retryable := []string{"connection refused", "connection reset"}
for _, s := range retryable {
if strings.Contains(e.Error(), s) {
return true
}
}
case interface{ Temporary() bool }:
if e.Temporary() {
return true
}
}
// Check wrapped error.
return retryableError(errors.Unwrap(err), allowedReasons)
}
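
// Illustrative cases (error values constructed here only for demonstration):
//
//	retryableError(&googleapi.Error{Code: http.StatusServiceUnavailable}, defaultRetryReasons) // true: 503 is a retried code
//	retryableError(&googleapi.Error{
//		Code:   http.StatusBadRequest,
//		Errors: []googleapi.ErrorItem{{Reason: "rateLimitExceeded"}},
//	}, defaultRetryReasons) // true: structured reason is in the allowed list
//	retryableError(errors.New("invalidQuery"), defaultRetryReasons) // false: no retryable reason or code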