blob: 4bacf2c01a9e2f03e306de874c5133d6ec45d725 [file] [log] [blame]
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"errors"
"fmt"
"golang.org/x/net/context"
"google.golang.org/api/iterator"
)
// A pageFetcher returns a page of rows, starting from the row specified by token.
type pageFetcher interface {
fetch(ctx context.Context, s service, token string) (*readDataResult, error)
setPaging(*pagingConf)
}
// Iterator provides access to the result of a BigQuery lookup.
// Next must be called before the first call to Get.
type Iterator struct {
service service
err error // contains any error encountered during calls to Next.
// Once Next has been called at least once, schema has the result schema, rs contains the current
// page of data, and nextToken contains the token for fetching the next
// page (empty if there is no more data to be fetched).
schema Schema
rs [][]Value
nextToken string
// The remaining fields contain enough information to fetch the current
// page of data, and determine which row of data from this page is the
// current row.
pf pageFetcher
pageToken string
// The offset from the start of the current page to the current row.
// For a new iterator, this is -1.
offset int64
}
func newIterator(s service, pf pageFetcher) *Iterator {
return &Iterator{
service: s,
pf: pf,
offset: -1,
}
}
// fetchPage loads the current page of data from the server.
// The contents of rs and nextToken are replaced with the loaded data.
// If there is an error while fetching, the error is stored in it.err and false is returned.
func (it *Iterator) fetchPage(ctx context.Context) bool {
var res *readDataResult
var err error
for {
res, err = it.pf.fetch(ctx, it.service, it.pageToken)
if err != errIncompleteJob {
break
}
}
if err != nil {
it.err = err
return false
}
it.schema = res.schema
it.rs = res.rows
it.nextToken = res.pageToken
return true
}
// getEnoughData loads new data into rs until offset no longer points beyond the end of rs.
func (it *Iterator) getEnoughData(ctx context.Context) bool {
if len(it.rs) == 0 {
// Either we have not yet fetched any pages, or we are iterating over an empty dataset.
// In the former case, we should fetch a page of data, so that we can depend on the resultant nextToken.
// In the latter case, it is harmless to fetch a page of data.
if !it.fetchPage(ctx) {
return false
}
}
for it.offset >= int64(len(it.rs)) {
// If offset is still outside the bounds of the loaded data,
// but there are no more pages of data to fetch, then we have
// failed to satisfy the offset.
if it.nextToken == "" {
return false
}
// offset cannot be satisfied with the currently loaded data,
// so we fetch the next page. We no longer need the existing
// cached rows, so we remove them and update the offset to be
// relative to the new page that we're about to fetch.
// NOTE: we can't just set offset to 0, because after
// marshalling/unmarshalling, it's possible for the offset to
// point arbitrarily far beyond the end of rs.
// This can happen if the server returns a different size
// results page before and after marshalling.
it.offset -= int64(len(it.rs))
it.pageToken = it.nextToken
if !it.fetchPage(ctx) {
return false
}
}
return true
}
// Next advances the Iterator to the next row, making that row available
// via the Get method.
// Next must be called before the first call to Get or Schema, and blocks until data is available.
// Next returns false when there are no more rows available, either because
// the end of the output was reached, or because there was an error (consult
// the Err method to determine which).
func (it *Iterator) Next(ctx context.Context) bool {
if it.err != nil {
return false
}
// Advance offset to where we want it to be for the next call to Get.
it.offset++
// offset may now point beyond the end of rs, so we fetch data
// until offset is within its bounds again. If there are no more
// results available, offset will be left pointing beyond the bounds
// of rs.
// At the end of this method, rs will contain at least one element
// unless the dataset we are iterating over is empty.
return it.getEnoughData(ctx)
}
// Err returns the last error encountered by Next, or nil for no error.
func (it *Iterator) Err() error {
return it.err
}
// verifyState checks that the iterator is pointing to a valid row.
func (it *Iterator) verifyState() error {
if it.err != nil {
return fmt.Errorf("called on iterator in error state: %v", it.err)
}
// If Next has been called, then offset should always index into a
// valid row in rs, as long as there is still data available.
if it.offset >= int64(len(it.rs)) || it.offset < 0 {
return errors.New("called without preceding successful call to Next")
}
return nil
}
// Get loads the current row into dst, which must implement ValueLoader.
func (it *Iterator) Get(dst interface{}) error {
if err := it.verifyState(); err != nil {
return fmt.Errorf("Get %v", err)
}
if dst, ok := dst.(ValueLoader); ok {
return dst.Load(it.rs[it.offset])
}
return errors.New("Get called with unsupported argument type")
}
// Schema returns the schema of the result rows.
func (it *Iterator) Schema() (Schema, error) {
if err := it.verifyState(); err != nil {
return nil, fmt.Errorf("Schema %v", err)
}
return it.schema, nil
}
////////////////////////////////////////////////////////////////
// New iterator implementation: will replace the old
// TODO(jba) replace Job.Read with Job.readRows.
func (j *Job) readRows(ctx context.Context) (*RowIterator, error) {
conf := &readQueryConf{}
if err := j.customizeReadQuery(conf); err != nil {
return nil, err
}
return newRowIterator(ctx, j.service, conf), nil
}
// TODO(jba) replace Table.Read with Table.readRows.
// Note: no error return.
func (t *Table) readRows(ctx context.Context) *RowIterator {
conf := &readTableConf{}
t.customizeReadSrc(conf)
return newRowIterator(ctx, t.c.service, conf)
}
func newRowIterator(ctx context.Context, s service, pf pageFetcher) *RowIterator {
it := &RowIterator{
ctx: ctx,
service: s,
pf: pf,
schemaErr: errors.New("called without preceding successful call to Next"),
}
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.rows) },
func() interface{} { r := it.rows; it.rows = nil; return r })
return it
}
// A RowIterator provides access to the result of a BigQuery lookup.
type RowIterator struct {
ctx context.Context
service service
pf pageFetcher
pageInfo *iterator.PageInfo
nextFunc func() error
// StartIndex can be set before the first call to Next. If PageInfo().PageToken
// is also set, StartIndex is ignored.
StartIndex uint64
rows [][]Value
schema Schema // populated on first call to fech
schemaErr error
}
// Next loads the next row into dst. Its return value is iterator.Done if there
// are no more results. Once Next returns iterator.Done, all subsequent calls
// will return iterator.Done.
func (it *RowIterator) Next(dst ValueLoader) error {
if err := it.nextFunc(); err != nil {
return err
}
row := it.rows[0]
it.rows = it.rows[1:]
return dst.Load(row)
}
// PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
// Schema returns the schema of the result rows.
func (it *RowIterator) Schema() (Schema, error) {
if it.schemaErr != nil {
return nil, fmt.Errorf("Schema %v", it.schemaErr)
}
return it.schema, nil
}
func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
pc := &pagingConf{}
if pageSize > 0 {
pc.recordsPerRequest = int64(pageSize)
pc.setRecordsPerRequest = true
}
if pageToken == "" {
pc.startIndex = it.StartIndex
}
it.pf.setPaging(pc)
res, err := it.pf.fetch(it.ctx, it.service, pageToken)
if err != nil {
it.schemaErr = err
return "", err
}
it.rows = append(it.rows, res.rows...)
it.schema = res.schema
it.schemaErr = nil
return res.pageToken, nil
}