| // Copyright 2015 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bigquery |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "time" |
| |
| "cloud.google.com/go/internal/trace" |
| "cloud.google.com/go/internal/uid" |
| bq "google.golang.org/api/bigquery/v2" |
| ) |
| |
| // QueryConfig holds the configuration for a query job. |
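| // |
| // QueryConfig is normally populated through the embedded field on Query. A |
| // minimal sketch (the project, dataset and table names are placeholders): |
| // |
| //    q := client.Query("SELECT name, num FROM `myproject.mydataset.mytable`") |
| //    q.QueryConfig.Dst = client.Dataset("mydataset").Table("results") |
| //    q.QueryConfig.WriteDisposition = bigquery.WriteTruncate |
| //    job, err := q.Run(ctx) |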
| type QueryConfig struct { |
| // Dst is the table into which the results of the query will be written. |
| // If this field is nil, a temporary table will be created. |
| Dst *Table |
| |
| // The query to execute. See https://cloud.google.com/bigquery/query-reference for details. |
| Q string |
| |
| // DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query. |
| // If DefaultProjectID is set, DefaultDatasetID must also be set. |
| DefaultProjectID string |
| DefaultDatasetID string |
| |
| // TableDefinitions describes data sources outside of BigQuery. |
| // The map keys may be used as table names in the query string. |
| // |
| // When a QueryConfig is returned from Job.Config, the map values |
| // are always of type *ExternalDataConfig. |
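| // |
| // For example, a CSV file in Cloud Storage can be queried as "ext" (a sketch; |
| // the bucket and object names are placeholders): |
| // |
| //    edc := &bigquery.ExternalDataConfig{ |
| //        SourceFormat: bigquery.CSV, |
| //        SourceURIs:   []string{"gs://my-bucket/data.csv"}, |
| //        AutoDetect:   true, |
| //    } |
| //    q := client.Query("SELECT * FROM ext") |
| //    q.TableDefinitions = map[string]bigquery.ExternalData{"ext": edc} |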
| TableDefinitions map[string]ExternalData |
| |
| // CreateDisposition specifies the circumstances under which the destination table will be created. |
| // The default is CreateIfNeeded. |
| CreateDisposition TableCreateDisposition |
| |
| // WriteDisposition specifies how existing data in the destination table is treated. |
| // The default is WriteEmpty. |
| WriteDisposition TableWriteDisposition |
| |
| // DisableQueryCache prevents results from being fetched from the query cache. |
| // If this field is false, results are fetched from the cache if they are available. |
| // The query cache is a best-effort cache that is flushed whenever tables in the query are modified. |
| // Cached results are only available when TableID is unspecified in the query's destination Table. |
| // For more information, see https://cloud.google.com/bigquery/querying-data#querycaching |
| DisableQueryCache bool |
| |
| // DisableFlattenedResults prevents results from being flattened. |
| // If this field is false, results from nested and repeated fields are flattened. |
| // DisableFlattenedResults implies AllowLargeResults. |
| // For more information, see https://cloud.google.com/bigquery/docs/data#nested |
| DisableFlattenedResults bool |
| |
| // AllowLargeResults allows the query to produce arbitrarily large result tables. |
| // The destination must be a table. |
| // When using this option, queries will take longer to execute, even if the result set is small. |
| // For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults |
| AllowLargeResults bool |
| |
| // Priority specifies the priority with which to schedule the query. |
| // The default priority is InteractivePriority. |
| // For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries |
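| // |
| // For example, to schedule a query with batch priority (a sketch; the table |
| // name is a placeholder): |
| // |
| //    q := client.Query("SELECT COUNT(*) FROM `mydataset.mytable`") |
| //    q.Priority = bigquery.BatchPriority |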
| Priority QueryPriority |
| |
| // MaxBillingTier sets the maximum billing tier for a Query. |
| // Queries that have resource usage beyond this tier will fail (without |
| // incurring a charge). If this field is zero, the project default will be used. |
| MaxBillingTier int |
| |
| // MaxBytesBilled limits the number of bytes billed for |
| // this job. Queries that would exceed this limit will fail (without incurring |
| // a charge). |
| // If this field is less than 1, the project default will be |
| // used. |
| MaxBytesBilled int64 |
| |
| // UseStandardSQL causes the query to use standard SQL. The default. |
| // Deprecated: use UseLegacySQL. |
| UseStandardSQL bool |
| |
| // UseLegacySQL causes the query to use legacy SQL. |
| UseLegacySQL bool |
| |
| // Parameters is a list of query parameters. The presence of parameters |
| // implies the use of standard SQL. |
| // If the query uses positional syntax ("?"), then no parameter may have a name. |
| // If the query uses named syntax ("@p"), then all parameters must have names. |
| // It is illegal to mix positional and named syntax. |
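| // |
| // For example, using named parameter syntax (a sketch; the table name is a |
| // placeholder): |
| // |
| //    q := client.Query("SELECT name FROM `mydataset.mytable` WHERE num > @min") |
| //    q.Parameters = []bigquery.QueryParameter{ |
| //        {Name: "min", Value: 10}, |
| //    } |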
| Parameters []QueryParameter |
| |
| // TimePartitioning specifies time-based partitioning |
| // for the destination table. |
| TimePartitioning *TimePartitioning |
| |
| // RangePartitioning specifies integer range-based partitioning |
| // for the destination table. |
| RangePartitioning *RangePartitioning |
| |
| // Clustering specifies the data clustering configuration for the destination table. |
| Clustering *Clustering |
| |
| // The labels associated with this job. |
| Labels map[string]string |
| |
| // If true, don't actually run this job. A valid query will return a mostly |
| // empty response with some processing statistics, while an invalid query will |
| // return the same error it would if it wasn't a dry run. |
| // |
| // Query.Read will fail with dry-run queries. Call Query.Run instead, and then |
| // call LastStatus on the returned job to get statistics. Calling Status on a |
| // dry-run job will fail. |
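| // |
| // A dry-run sketch (the table name is a placeholder): |
| // |
| //    q := client.Query("SELECT num FROM `mydataset.mytable`") |
| //    q.DryRun = true |
| //    job, err := q.Run(ctx) |
| //    if err != nil { |
| //        // TODO: handle error. |
| //    } |
| //    // Dry-run jobs complete immediately; inspect the processing statistics. |
| //    fmt.Println(job.LastStatus().Statistics.TotalBytesProcessed) |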
| DryRun bool |
| |
| // Custom encryption configuration (e.g., Cloud KMS keys). |
| DestinationEncryptionConfig *EncryptionConfig |
| |
| // Allows the schema of the destination table to be updated as a side effect of |
| // the query job. |
| SchemaUpdateOptions []string |
| |
| // CreateSession will trigger creation of a new session when true. |
| CreateSession bool |
| |
| // ConnectionProperties are optional key-value settings. |
| ConnectionProperties []*ConnectionProperty |
| |
| // Sets a best-effort deadline on a specific job. If job execution exceeds this |
| // timeout, BigQuery may attempt to cancel this work automatically. |
| // |
| // This deadline cannot be adjusted or removed once the job is created. Consider |
| // using Job.Cancel in situations where you need more dynamic behavior. |
| // |
| // Experimental: this option is experimental and may be modified or removed in future versions, |
| // regardless of any other documented package stability guarantees. |
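| // |
| // For example, to ask BigQuery to cancel work that runs longer than five |
| // minutes (a sketch): |
| // |
| //    q.JobTimeout = 5 * time.Minute |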
| JobTimeout time.Duration |
| } |
| |
| func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) { |
| qconf := &bq.JobConfigurationQuery{ |
| Query: qc.Q, |
| CreateDisposition: string(qc.CreateDisposition), |
| WriteDisposition: string(qc.WriteDisposition), |
| AllowLargeResults: qc.AllowLargeResults, |
| Priority: string(qc.Priority), |
| MaximumBytesBilled: qc.MaxBytesBilled, |
| TimePartitioning: qc.TimePartitioning.toBQ(), |
| RangePartitioning: qc.RangePartitioning.toBQ(), |
| Clustering: qc.Clustering.toBQ(), |
| DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(), |
| SchemaUpdateOptions: qc.SchemaUpdateOptions, |
| CreateSession: qc.CreateSession, |
| } |
| if len(qc.TableDefinitions) > 0 { |
| qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration) |
| } |
| for name, data := range qc.TableDefinitions { |
| qconf.TableDefinitions[name] = data.toBQ() |
| } |
| if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" { |
| qconf.DefaultDataset = &bq.DatasetReference{ |
| DatasetId: qc.DefaultDatasetID, |
| ProjectId: qc.DefaultProjectID, |
| } |
| } |
| if tier := int64(qc.MaxBillingTier); tier > 0 { |
| qconf.MaximumBillingTier = &tier |
| } |
| f := false |
| if qc.DisableQueryCache { |
| qconf.UseQueryCache = &f |
| } |
| if qc.DisableFlattenedResults { |
| qconf.FlattenResults = &f |
| // DisableFlattenedResults implies AllowLargeResults. |
| qconf.AllowLargeResults = true |
| } |
| if qc.UseStandardSQL && qc.UseLegacySQL { |
| return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL") |
| } |
| if len(qc.Parameters) > 0 && qc.UseLegacySQL { |
| return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL") |
| } |
| ptrue := true |
| pfalse := false |
| if qc.UseLegacySQL { |
| qconf.UseLegacySql = &ptrue |
| } else { |
| qconf.UseLegacySql = &pfalse |
| } |
| if qc.Dst != nil && !qc.Dst.implicitTable() { |
| qconf.DestinationTable = qc.Dst.toBQ() |
| } |
| for _, p := range qc.Parameters { |
| qp, err := p.toBQ() |
| if err != nil { |
| return nil, err |
| } |
| qconf.QueryParameters = append(qconf.QueryParameters, qp) |
| } |
| if len(qc.ConnectionProperties) > 0 { |
| bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties)) |
| for k, v := range qc.ConnectionProperties { |
| bqcp[k] = v.toBQ() |
| } |
| qconf.ConnectionProperties = bqcp |
| } |
| jc := &bq.JobConfiguration{ |
| Labels: qc.Labels, |
| DryRun: qc.DryRun, |
| Query: qconf, |
| } |
| if qc.JobTimeout > 0 { |
| jc.JobTimeoutMs = qc.JobTimeout.Milliseconds() |
| } |
| return jc, nil |
| } |
| |
| func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) { |
| qq := q.Query |
| qc := &QueryConfig{ |
| Labels: q.Labels, |
| DryRun: q.DryRun, |
| JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond, |
| Q: qq.Query, |
| CreateDisposition: TableCreateDisposition(qq.CreateDisposition), |
| WriteDisposition: TableWriteDisposition(qq.WriteDisposition), |
| AllowLargeResults: qq.AllowLargeResults, |
| Priority: QueryPriority(qq.Priority), |
| MaxBytesBilled: qq.MaximumBytesBilled, |
| UseLegacySQL: qq.UseLegacySql == nil || *qq.UseLegacySql, |
| TimePartitioning: bqToTimePartitioning(qq.TimePartitioning), |
| RangePartitioning: bqToRangePartitioning(qq.RangePartitioning), |
| Clustering: bqToClustering(qq.Clustering), |
| DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration), |
| SchemaUpdateOptions: qq.SchemaUpdateOptions, |
| CreateSession: qq.CreateSession, |
| } |
| qc.UseStandardSQL = !qc.UseLegacySQL |
| |
| if len(qq.TableDefinitions) > 0 { |
| qc.TableDefinitions = make(map[string]ExternalData) |
| } |
| for name, qedc := range qq.TableDefinitions { |
| edc, err := bqToExternalDataConfig(&qedc) |
| if err != nil { |
| return nil, err |
| } |
| qc.TableDefinitions[name] = edc |
| } |
| if qq.DefaultDataset != nil { |
| qc.DefaultProjectID = qq.DefaultDataset.ProjectId |
| qc.DefaultDatasetID = qq.DefaultDataset.DatasetId |
| } |
| if qq.MaximumBillingTier != nil { |
| qc.MaxBillingTier = int(*qq.MaximumBillingTier) |
| } |
| if qq.UseQueryCache != nil && !*qq.UseQueryCache { |
| qc.DisableQueryCache = true |
| } |
| if qq.FlattenResults != nil && !*qq.FlattenResults { |
| qc.DisableFlattenedResults = true |
| } |
| if qq.DestinationTable != nil { |
| qc.Dst = bqToTable(qq.DestinationTable, c) |
| } |
| for _, qp := range qq.QueryParameters { |
| p, err := bqToQueryParameter(qp) |
| if err != nil { |
| return nil, err |
| } |
| qc.Parameters = append(qc.Parameters, p) |
| } |
| if len(qq.ConnectionProperties) > 0 { |
| props := make([]*ConnectionProperty, len(qq.ConnectionProperties)) |
| for k, v := range qq.ConnectionProperties { |
| props[k] = bqToConnectionProperty(v) |
| } |
| qc.ConnectionProperties = props |
| } |
| return qc, nil |
| } |
| |
| // QueryPriority specifies a priority with which a query is to be executed. |
| type QueryPriority string |
| |
| const ( |
| // BatchPriority specifies that the query should be scheduled with the |
| // batch priority. BigQuery queues each batch query on your behalf, and |
| // starts the query as soon as idle resources are available, usually within |
| // a few minutes. If BigQuery hasn't started the query within 24 hours, |
| // BigQuery changes the job priority to interactive. Batch queries don't |
| // count towards your concurrent rate limit, which can make it easier to |
| // start many queries at once. |
| // |
| // More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries. |
| BatchPriority QueryPriority = "BATCH" |
| // InteractivePriority specifies that the query should be scheduled with |
| // interactive priority, which means that the query is executed as soon as |
| // possible. Interactive queries count towards your concurrent rate limit |
| // and your daily limit. It is the default priority with which queries get |
| // executed. |
| // |
| // More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries. |
| InteractivePriority QueryPriority = "INTERACTIVE" |
| ) |
| |
| // A Query queries data from a BigQuery table. Use Client.Query to create a Query. |
| type Query struct { |
| JobIDConfig |
| QueryConfig |
| client *Client |
| } |
| |
| // Query creates a query with string q. |
| // The returned Query may optionally be further configured before its Run method is called. |
| func (c *Client) Query(q string) *Query { |
| return &Query{ |
| client: c, |
| QueryConfig: QueryConfig{Q: q}, |
| } |
| } |
| |
| // Run initiates a query job. |
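| // |
| // A typical pattern is to Run the query and then Wait for the job to finish |
| // (a sketch; the query text is a placeholder): |
| // |
| //    q := client.Query("SELECT 17") |
| //    job, err := q.Run(ctx) |
| //    if err != nil { |
| //        // TODO: handle error. |
| //    } |
| //    status, err := job.Wait(ctx) |
| //    if err != nil { |
| //        // TODO: handle error. |
| //    } |
| //    if err := status.Err(); err != nil { |
| //        // The job completed, but unsuccessfully. |
| //    } |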
| func (q *Query) Run(ctx context.Context) (j *Job, err error) { |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run") |
| defer func() { trace.EndSpan(ctx, err) }() |
| |
| job, err := q.newJob() |
| if err != nil { |
| return nil, err |
| } |
| j, err = q.client.insertJob(ctx, job, nil) |
| if err != nil { |
| return nil, err |
| } |
| return j, nil |
| } |
| |
| func (q *Query) newJob() (*bq.Job, error) { |
| config, err := q.QueryConfig.toBQ() |
| if err != nil { |
| return nil, err |
| } |
| return &bq.Job{ |
| JobReference: q.JobIDConfig.createJobRef(q.client), |
| Configuration: config, |
| }, nil |
| } |
| |
| // Read submits a query for execution and returns the results via a RowIterator. |
| // If the request can be satisfied by the optimized query path (jobs.query), that |
| // path is used in place of the jobs.insert path; note that the optimized path |
| // does not expose a job object. |
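| // |
| // Results can be iterated with the returned RowIterator (a sketch; the table |
| // name is a placeholder, and iterator is google.golang.org/api/iterator): |
| // |
| //    it, err := client.Query("SELECT name, num FROM `mydataset.mytable`").Read(ctx) |
| //    if err != nil { |
| //        // TODO: handle error. |
| //    } |
| //    for { |
| //        var row []bigquery.Value |
| //        err := it.Next(&row) |
| //        if err == iterator.Done { |
| //            break |
| //        } |
| //        if err != nil { |
| //            // TODO: handle error. |
| //        } |
| //        fmt.Println(row) |
| //    } |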
| func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) { |
| if q.QueryConfig.DryRun { |
| return nil, errors.New("bigquery: cannot evaluate Query.Read() for dry-run queries") |
| } |
| ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Read") |
| defer func() { trace.EndSpan(ctx, err) }() |
| queryRequest, err := q.probeFastPath() |
| if err != nil { |
| // Any error means we fall back to the older mechanism. |
| job, err := q.Run(ctx) |
| if err != nil { |
| return nil, err |
| } |
| return job.Read(ctx) |
| } |
| // We have a query request, so run it on the fast path. |
| resp, err := q.client.runQuery(ctx, queryRequest) |
| if err != nil { |
| return nil, err |
| } |
| // construct a minimal job for backing the row iterator. |
| minimalJob := &Job{ |
| c: q.client, |
| jobID: resp.JobReference.JobId, |
| location: resp.JobReference.Location, |
| projectID: resp.JobReference.ProjectId, |
| } |
| if resp.JobComplete { |
| rowSource := &rowSource{ |
| j: minimalJob, |
| // Seed the RowIterator with these cached results to save a lookup. |
| cachedRows: resp.Rows, |
| cachedSchema: resp.Schema, |
| cachedNextToken: resp.PageToken, |
| } |
| return newRowIterator(ctx, rowSource, fetchPage), nil |
| } |
| // We're on the fast path, but we need to poll because the job is incomplete. |
| // Fall back to job-based Read(). |
| // |
| // (Issue 2937) In order to satisfy basic probing of the job in the classic path, |
| // we need to supply additional config, which is probed for presence, not contents. |
| // |
| minimalJob.config = &bq.JobConfiguration{ |
| Query: &bq.JobConfigurationQuery{}, |
| } |
| |
| return minimalJob.Read(ctx) |
| } |
| |
| // probeFastPath attempts to construct a jobs.query request from the user's Query |
| // configuration. If all the options set on the query are supported on the faster |
| // query path, this method returns a QueryRequest suitable for execution. |
| func (q *Query) probeFastPath() (*bq.QueryRequest, error) { |
| // This is a denylist of settings which prevent us from composing an equivalent |
| // bq.QueryRequest due to differences between configuration parameters accepted |
| // by jobs.insert vs jobs.query. |
| if q.QueryConfig.Dst != nil || |
| q.QueryConfig.TableDefinitions != nil || |
| q.QueryConfig.CreateDisposition != "" || |
| q.QueryConfig.WriteDisposition != "" || |
| !(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) || |
| q.QueryConfig.UseLegacySQL || |
| q.QueryConfig.MaxBillingTier != 0 || |
| q.QueryConfig.TimePartitioning != nil || |
| q.QueryConfig.RangePartitioning != nil || |
| q.QueryConfig.Clustering != nil || |
| q.QueryConfig.DestinationEncryptionConfig != nil || |
| q.QueryConfig.SchemaUpdateOptions != nil || |
| q.QueryConfig.JobTimeout != 0 || |
| // User has defined the jobID generation behavior |
| q.JobIDConfig.JobID != "" { |
| return nil, fmt.Errorf("QueryConfig incompatible with fastPath") |
| } |
| pfalse := false |
| qRequest := &bq.QueryRequest{ |
| Query: q.QueryConfig.Q, |
| CreateSession: q.CreateSession, |
| Location: q.Location, |
| UseLegacySql: &pfalse, |
| MaximumBytesBilled: q.QueryConfig.MaxBytesBilled, |
| RequestId: uid.NewSpace("request", nil).New(), |
| Labels: q.Labels, |
| } |
| if q.QueryConfig.DisableQueryCache { |
| qRequest.UseQueryCache = &pfalse |
| } |
| // Convert query parameters |
| for _, p := range q.QueryConfig.Parameters { |
| qp, err := p.toBQ() |
| if err != nil { |
| return nil, err |
| } |
| qRequest.QueryParameters = append(qRequest.QueryParameters, qp) |
| } |
| if q.QueryConfig.DefaultDatasetID != "" { |
| qRequest.DefaultDataset = &bq.DatasetReference{ |
| ProjectId: q.QueryConfig.DefaultProjectID, |
| DatasetId: q.QueryConfig.DefaultDatasetID, |
| } |
| } |
| return qRequest, nil |
| } |
| |
| // ConnectionProperty represents a single key and value pair that can be sent alongside a query request. |
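| // |
| // For example, a query's session time zone might be set this way (a sketch; |
| // the set of supported keys is defined by the BigQuery service): |
| // |
| //    q.ConnectionProperties = []*bigquery.ConnectionProperty{ |
| //        {Key: "time_zone", Value: "America/Los_Angeles"}, |
| //    } |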
| type ConnectionProperty struct { |
| // Name of the connection property to set. |
| Key string |
| // Value of the connection property. |
| Value string |
| } |
| |
| func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty { |
| if cp == nil { |
| return nil |
| } |
| return &bq.ConnectionProperty{ |
| Key: cp.Key, |
| Value: cp.Value, |
| } |
| } |
| |
| func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty { |
| if in == nil { |
| return nil |
| } |
| return &ConnectionProperty{ |
| Key: in.Key, |
| Value: in.Value, |
| } |
| } |