// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"time"

	"cloud.google.com/go/internal/optional"
	"cloud.google.com/go/internal/trace"
	bq "google.golang.org/api/bigquery/v2"
)

// A Table is a reference to a BigQuery table.
type Table struct {
	// ProjectID, DatasetID and TableID may be omitted if the Table is the destination for a query.
	// In this case the result will be stored in an ephemeral table.
	ProjectID string
	DatasetID string
	// TableID must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_).
	// The maximum length is 1,024 characters.
	TableID string

	c *Client
}

// TableMetadata contains information about a BigQuery table.
type TableMetadata struct {
	// The following fields can be set when creating a table.

	// The user-friendly name for the table.
	Name string

	// The user-friendly description of the table.
	Description string

	// The table schema. If provided on create, ViewQuery must be empty.
	Schema Schema

	// The query to use for a view. If provided on create, Schema must be nil.
	ViewQuery string

	// Use Legacy SQL for the view query.
	// At most one of UseLegacySQL and UseStandardSQL can be true.
	UseLegacySQL bool

	// Use Standard SQL for the view query. The default.
	// At most one of UseLegacySQL and UseStandardSQL can be true.
	// Deprecated: use UseLegacySQL.
	UseStandardSQL bool

	// If non-nil, the table is partitioned by time.
	TimePartitioning *TimePartitioning

	// If set to true, queries that reference this table must specify a
	// partition filter (e.g. a WHERE clause) that can be used to eliminate
	// partitions. Used to prevent unintentional full data scans on large
	// partitioned tables.
	RequirePartitionFilter bool

	// Clustering specifies the data clustering configuration for the table.
	Clustering *Clustering

	// The time when this table expires. If set, this table will expire at the
	// specified time. Expired tables will be deleted and their storage
	// reclaimed. The zero value is ignored.
	ExpirationTime time.Time

	// User-provided labels.
	Labels map[string]string

	// Information about a table stored outside of BigQuery.
	ExternalDataConfig *ExternalDataConfig

	// Custom encryption configuration (e.g., Cloud KMS keys).
	EncryptionConfig *EncryptionConfig

	// All the fields below are read-only.

	FullID           string // An opaque ID uniquely identifying the table.
	Type             TableType
	CreationTime     time.Time
	LastModifiedTime time.Time

	// The size of the table in bytes.
	// This does not include data that is being buffered during a streaming insert.
	NumBytes int64

	// The number of bytes in the table considered "long-term storage" for reduced
	// billing purposes.  See https://cloud.google.com/bigquery/pricing#long-term-storage
	// for more information.
	NumLongTermBytes int64

	// The number of rows of data in this table.
	// This does not include data that is being buffered during a streaming insert.
	NumRows uint64

	// Contains information regarding this table's streaming buffer, if one is
	// present. This field will be nil if the table is not being streamed to or if
	// there is no data in the streaming buffer.
	StreamingBuffer *StreamingBuffer

	// ETag is the ETag obtained when reading metadata. Pass it to Table.Update to
	// ensure that the metadata hasn't changed since it was read.
	ETag string
}

// TableCreateDisposition specifies the circumstances under which destination table will be created.
// Default is CreateIfNeeded.
type TableCreateDisposition string

const (
	// CreateIfNeeded will create the table if it does not already exist.
	// Tables are created atomically on successful completion of a job.
	CreateIfNeeded TableCreateDisposition = "CREATE_IF_NEEDED"

	// CreateNever ensures the table must already exist and will not be
	// automatically created.
	CreateNever TableCreateDisposition = "CREATE_NEVER"
)

// TableWriteDisposition specifies how existing data in a destination table is treated.
// Default is WriteAppend.
type TableWriteDisposition string

const (
	// WriteAppend will append to any existing data in the destination table.
	// Data is appended atomically on successful completion of a job.
	WriteAppend TableWriteDisposition = "WRITE_APPEND"

	// WriteTruncate overrides the existing data in the destination table.
	// Data is overwritten atomically on successful completion of a job.
	WriteTruncate TableWriteDisposition = "WRITE_TRUNCATE"

	// WriteEmpty fails writes if the destination table already contains data.
	WriteEmpty TableWriteDisposition = "WRITE_EMPTY"
)

// TableType is the type of table.
type TableType string

const (
	// RegularTable is a regular table.
	RegularTable TableType = "TABLE"
	// ViewTable is a table type describing that the table is view. See more
	// information at https://cloud.google.com/bigquery/docs/views.
	ViewTable TableType = "VIEW"
	// ExternalTable is a table type describing that the table is an external
	// table (also known as a federated data source). See more information at
	// https://cloud.google.com/bigquery/external-data-sources.
	ExternalTable TableType = "EXTERNAL"
)

// TimePartitioning describes the time-based date partitioning on a table.
// For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables.
type TimePartitioning struct {
	// The amount of time to keep the storage for a partition.
	// If the duration is empty (0), the data in the partitions do not expire.
	Expiration time.Duration

	// If empty, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the
	// table is partitioned by this field. The field must be a top-level TIMESTAMP or
	// DATE field. Its mode must be NULLABLE or REQUIRED.
	Field string

	// If set to true, queries that reference this table must specify a
	// partition filter (e.g. a WHERE clause) that can be used to eliminate
	// partitions. Used to prevent unintentional full data scans on large
	// partitioned tables.
	// DEPRECATED: use the top-level RequirePartitionFilter in TableMetadata.
	RequirePartitionFilter bool
}

func (p *TimePartitioning) toBQ() *bq.TimePartitioning {
	if p == nil {
		return nil
	}
	return &bq.TimePartitioning{
		Type:                   "DAY",
		ExpirationMs:           int64(p.Expiration / time.Millisecond),
		Field:                  p.Field,
		RequirePartitionFilter: p.RequirePartitionFilter,
	}
}

func bqToTimePartitioning(q *bq.TimePartitioning) *TimePartitioning {
	if q == nil {
		return nil
	}
	return &TimePartitioning{
		Expiration:             time.Duration(q.ExpirationMs) * time.Millisecond,
		Field:                  q.Field,
		RequirePartitionFilter: q.RequirePartitionFilter,
	}
}

// Clustering governs the organization of data within a partitioned table.
// For more information, see https://cloud.google.com/bigquery/docs/clustered-tables
type Clustering struct {
	Fields []string
}

func (c *Clustering) toBQ() *bq.Clustering {
	if c == nil {
		return nil
	}
	return &bq.Clustering{
		Fields: c.Fields,
	}
}

func bqToClustering(q *bq.Clustering) *Clustering {
	if q == nil {
		return nil
	}
	return &Clustering{
		Fields: q.Fields,
	}
}

// EncryptionConfig configures customer-managed encryption on tables.
type EncryptionConfig struct {
	// Describes the Cloud KMS encryption key that will be used to protect
	// destination BigQuery table. The BigQuery Service Account associated with your
	// project requires access to this encryption key.
	KMSKeyName string
}

func (e *EncryptionConfig) toBQ() *bq.EncryptionConfiguration {
	if e == nil {
		return nil
	}
	return &bq.EncryptionConfiguration{
		KmsKeyName: e.KMSKeyName,
	}
}

func bqToEncryptionConfig(q *bq.EncryptionConfiguration) *EncryptionConfig {
	if q == nil {
		return nil
	}
	return &EncryptionConfig{
		KMSKeyName: q.KmsKeyName,
	}
}

// StreamingBuffer holds information about the streaming buffer.
type StreamingBuffer struct {
	// A lower-bound estimate of the number of bytes currently in the streaming
	// buffer.
	EstimatedBytes uint64

	// A lower-bound estimate of the number of rows currently in the streaming
	// buffer.
	EstimatedRows uint64

	// The time of the oldest entry in the streaming buffer.
	OldestEntryTime time.Time
}

func (t *Table) toBQ() *bq.TableReference {
	return &bq.TableReference{
		ProjectId: t.ProjectID,
		DatasetId: t.DatasetID,
		TableId:   t.TableID,
	}
}

// FullyQualifiedName returns the ID of the table in projectID:datasetID.tableID format.
func (t *Table) FullyQualifiedName() string {
	return fmt.Sprintf("%s:%s.%s", t.ProjectID, t.DatasetID, t.TableID)
}

// implicitTable reports whether Table is an empty placeholder, which signifies that a new table should be created with an auto-generated Table ID.
func (t *Table) implicitTable() bool {
	return t.ProjectID == "" && t.DatasetID == "" && t.TableID == ""
}

// Create creates a table in the BigQuery service.
// Pass in a TableMetadata value to configure the table.
// If tm.View.Query is non-empty, the created table will be of type VIEW.
// If no ExpirationTime is specified, the table will never expire.
// After table creation, a view can be modified only if its table was initially created
// with a view.
func (t *Table) Create(ctx context.Context, tm *TableMetadata) (err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Create")
	defer func() { trace.EndSpan(ctx, err) }()

	table, err := tm.toBQ()
	if err != nil {
		return err
	}
	table.TableReference = &bq.TableReference{
		ProjectId: t.ProjectID,
		DatasetId: t.DatasetID,
		TableId:   t.TableID,
	}
	req := t.c.bqs.Tables.Insert(t.ProjectID, t.DatasetID, table).Context(ctx)
	setClientHeader(req.Header())
	_, err = req.Do()
	return err
}

func (tm *TableMetadata) toBQ() (*bq.Table, error) {
	t := &bq.Table{}
	if tm == nil {
		return t, nil
	}
	if tm.Schema != nil && tm.ViewQuery != "" {
		return nil, errors.New("bigquery: provide Schema or ViewQuery, not both")
	}
	t.FriendlyName = tm.Name
	t.Description = tm.Description
	t.Labels = tm.Labels
	if tm.Schema != nil {
		t.Schema = tm.Schema.toBQ()
	}
	if tm.ViewQuery != "" {
		if tm.UseStandardSQL && tm.UseLegacySQL {
			return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
		}
		t.View = &bq.ViewDefinition{Query: tm.ViewQuery}
		if tm.UseLegacySQL {
			t.View.UseLegacySql = true
		} else {
			t.View.UseLegacySql = false
			t.View.ForceSendFields = append(t.View.ForceSendFields, "UseLegacySql")
		}
	} else if tm.UseLegacySQL || tm.UseStandardSQL {
		return nil, errors.New("bigquery: UseLegacy/StandardSQL requires ViewQuery")
	}
	t.TimePartitioning = tm.TimePartitioning.toBQ()
	t.Clustering = tm.Clustering.toBQ()
	t.RequirePartitionFilter = tm.RequirePartitionFilter

	if !validExpiration(tm.ExpirationTime) {
		return nil, fmt.Errorf("invalid expiration time: %v.\n"+
			"Valid expiration times are after 1678 and before 2262", tm.ExpirationTime)
	}
	if !tm.ExpirationTime.IsZero() && tm.ExpirationTime != NeverExpire {
		t.ExpirationTime = tm.ExpirationTime.UnixNano() / 1e6
	}
	if tm.ExternalDataConfig != nil {
		edc := tm.ExternalDataConfig.toBQ()
		t.ExternalDataConfiguration = &edc
	}
	t.EncryptionConfiguration = tm.EncryptionConfig.toBQ()
	if tm.FullID != "" {
		return nil, errors.New("cannot set FullID on create")
	}
	if tm.Type != "" {
		return nil, errors.New("cannot set Type on create")
	}
	if !tm.CreationTime.IsZero() {
		return nil, errors.New("cannot set CreationTime on create")
	}
	if !tm.LastModifiedTime.IsZero() {
		return nil, errors.New("cannot set LastModifiedTime on create")
	}
	if tm.NumBytes != 0 {
		return nil, errors.New("cannot set NumBytes on create")
	}
	if tm.NumLongTermBytes != 0 {
		return nil, errors.New("cannot set NumLongTermBytes on create")
	}
	if tm.NumRows != 0 {
		return nil, errors.New("cannot set NumRows on create")
	}
	if tm.StreamingBuffer != nil {
		return nil, errors.New("cannot set StreamingBuffer on create")
	}
	if tm.ETag != "" {
		return nil, errors.New("cannot set ETag on create")
	}
	return t, nil
}

// Metadata fetches the metadata for the table.
func (t *Table) Metadata(ctx context.Context) (md *TableMetadata, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Metadata")
	defer func() { trace.EndSpan(ctx, err) }()

	req := t.c.bqs.Tables.Get(t.ProjectID, t.DatasetID, t.TableID).Context(ctx)
	setClientHeader(req.Header())
	var table *bq.Table
	err = runWithRetry(ctx, func() (err error) {
		table, err = req.Do()
		return err
	})
	if err != nil {
		return nil, err
	}
	return bqToTableMetadata(table)
}

func bqToTableMetadata(t *bq.Table) (*TableMetadata, error) {
	md := &TableMetadata{
		Description:            t.Description,
		Name:                   t.FriendlyName,
		Type:                   TableType(t.Type),
		FullID:                 t.Id,
		Labels:                 t.Labels,
		NumBytes:               t.NumBytes,
		NumLongTermBytes:       t.NumLongTermBytes,
		NumRows:                t.NumRows,
		ExpirationTime:         unixMillisToTime(t.ExpirationTime),
		CreationTime:           unixMillisToTime(t.CreationTime),
		LastModifiedTime:       unixMillisToTime(int64(t.LastModifiedTime)),
		ETag:                   t.Etag,
		EncryptionConfig:       bqToEncryptionConfig(t.EncryptionConfiguration),
		RequirePartitionFilter: t.RequirePartitionFilter,
	}
	if t.Schema != nil {
		md.Schema = bqToSchema(t.Schema)
	}
	if t.View != nil {
		md.ViewQuery = t.View.Query
		md.UseLegacySQL = t.View.UseLegacySql
	}
	md.TimePartitioning = bqToTimePartitioning(t.TimePartitioning)
	md.Clustering = bqToClustering(t.Clustering)
	if t.StreamingBuffer != nil {
		md.StreamingBuffer = &StreamingBuffer{
			EstimatedBytes:  t.StreamingBuffer.EstimatedBytes,
			EstimatedRows:   t.StreamingBuffer.EstimatedRows,
			OldestEntryTime: unixMillisToTime(int64(t.StreamingBuffer.OldestEntryTime)),
		}
	}
	if t.ExternalDataConfiguration != nil {
		edc, err := bqToExternalDataConfig(t.ExternalDataConfiguration)
		if err != nil {
			return nil, err
		}
		md.ExternalDataConfig = edc
	}
	return md, nil
}

// Delete deletes the table.
func (t *Table) Delete(ctx context.Context) (err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Delete")
	defer func() { trace.EndSpan(ctx, err) }()

	req := t.c.bqs.Tables.Delete(t.ProjectID, t.DatasetID, t.TableID).Context(ctx)
	setClientHeader(req.Header())
	return req.Do()
}

// Read fetches the contents of the table.
func (t *Table) Read(ctx context.Context) *RowIterator {
	return t.read(ctx, fetchPage)
}

func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator {
	return newRowIterator(ctx, t, pf)
}

// NeverExpire is a sentinel value used to remove a table'e expiration time.
var NeverExpire = time.Time{}.Add(-1)

// Update modifies specific Table metadata fields.
func (t *Table) Update(ctx context.Context, tm TableMetadataToUpdate, etag string) (md *TableMetadata, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Table.Update")
	defer func() { trace.EndSpan(ctx, err) }()

	bqt, err := tm.toBQ()
	if err != nil {
		return nil, err
	}
	call := t.c.bqs.Tables.Patch(t.ProjectID, t.DatasetID, t.TableID, bqt).Context(ctx)
	setClientHeader(call.Header())
	if etag != "" {
		call.Header().Set("If-Match", etag)
	}
	var res *bq.Table
	if err := runWithRetry(ctx, func() (err error) {
		res, err = call.Do()
		return err
	}); err != nil {
		return nil, err
	}
	return bqToTableMetadata(res)
}

func (tm *TableMetadataToUpdate) toBQ() (*bq.Table, error) {
	t := &bq.Table{}
	forceSend := func(field string) {
		t.ForceSendFields = append(t.ForceSendFields, field)
	}

	if tm.Description != nil {
		t.Description = optional.ToString(tm.Description)
		forceSend("Description")
	}
	if tm.Name != nil {
		t.FriendlyName = optional.ToString(tm.Name)
		forceSend("FriendlyName")
	}
	if tm.Schema != nil {
		t.Schema = tm.Schema.toBQ()
		forceSend("Schema")
	}
	if tm.EncryptionConfig != nil {
		t.EncryptionConfiguration = tm.EncryptionConfig.toBQ()
	}

	if !validExpiration(tm.ExpirationTime) {
		return nil, invalidTimeError(tm.ExpirationTime)
	}
	if tm.ExpirationTime == NeverExpire {
		t.NullFields = append(t.NullFields, "ExpirationTime")
	} else if !tm.ExpirationTime.IsZero() {
		t.ExpirationTime = tm.ExpirationTime.UnixNano() / 1e6
		forceSend("ExpirationTime")
	}
	if tm.TimePartitioning != nil {
		t.TimePartitioning = tm.TimePartitioning.toBQ()
		t.TimePartitioning.ForceSendFields = []string{"RequirePartitionFilter"}
		if tm.TimePartitioning.Expiration == 0 {
			t.TimePartitioning.NullFields = []string{"ExpirationMs"}
		}
	}
	if tm.RequirePartitionFilter != nil {
		t.RequirePartitionFilter = optional.ToBool(tm.RequirePartitionFilter)
		forceSend("RequirePartitionFilter")
	}
	if tm.ViewQuery != nil {
		t.View = &bq.ViewDefinition{
			Query:           optional.ToString(tm.ViewQuery),
			ForceSendFields: []string{"Query"},
		}
	}
	if tm.UseLegacySQL != nil {
		if t.View == nil {
			t.View = &bq.ViewDefinition{}
		}
		t.View.UseLegacySql = optional.ToBool(tm.UseLegacySQL)
		t.View.ForceSendFields = append(t.View.ForceSendFields, "UseLegacySql")
	}
	labels, forces, nulls := tm.update()
	t.Labels = labels
	t.ForceSendFields = append(t.ForceSendFields, forces...)
	t.NullFields = append(t.NullFields, nulls...)
	return t, nil
}

// validExpiration ensures a specified time is either the sentinel NeverExpire,
// the zero value, or within the defined range of UnixNano. Internal
// represetations of expiration times are based upon Time.UnixNano. Any time
// before 1678 or after 2262 cannot be represented by an int64 and is therefore
// undefined and invalid. See https://godoc.org/time#Time.UnixNano.
func validExpiration(t time.Time) bool {
	return t == NeverExpire || t.IsZero() || time.Unix(0, t.UnixNano()).Equal(t)
}

// invalidTimeError emits a consistent error message for failures of the
// validExpiration function.
func invalidTimeError(t time.Time) error {
	return fmt.Errorf("invalid expiration time %v. "+
		"Valid expiration times are after 1678 and before 2262", t)
}

// TableMetadataToUpdate is used when updating a table's metadata.
// Only non-nil fields will be updated.
type TableMetadataToUpdate struct {
	// The user-friendly description of this table.
	Description optional.String

	// The user-friendly name for this table.
	Name optional.String

	// The table's schema.
	// When updating a schema, you can add columns but not remove them.
	Schema Schema

	// The table's encryption configuration.  When calling Update, ensure that
	// all mutable fields of EncryptionConfig are populated.
	EncryptionConfig *EncryptionConfig

	// The time when this table expires. To remove a table's expiration,
	// set ExpirationTime to NeverExpire. The zero value is ignored.
	ExpirationTime time.Time

	// The query to use for a view.
	ViewQuery optional.String

	// Use Legacy SQL for the view query.
	UseLegacySQL optional.Bool

	// TimePartitioning allows modification of certain aspects of partition
	// configuration such as partition expiration and whether partition
	// filtration is required at query time.  When calling Update, ensure
	// that all mutable fields of TimePartitioning are populated.
	TimePartitioning *TimePartitioning

	// RequirePartitionFilter governs whether the table enforces partition
	// elimination when referenced in a query.
	RequirePartitionFilter optional.Bool

	labelUpdater
}

// labelUpdater contains common code for updating labels.
type labelUpdater struct {
	setLabels    map[string]string
	deleteLabels map[string]bool
}

// SetLabel causes a label to be added or modified on a call to Update.
func (u *labelUpdater) SetLabel(name, value string) {
	if u.setLabels == nil {
		u.setLabels = map[string]string{}
	}
	u.setLabels[name] = value
}

// DeleteLabel causes a label to be deleted on a call to Update.
func (u *labelUpdater) DeleteLabel(name string) {
	if u.deleteLabels == nil {
		u.deleteLabels = map[string]bool{}
	}
	u.deleteLabels[name] = true
}

func (u *labelUpdater) update() (labels map[string]string, forces, nulls []string) {
	if u.setLabels == nil && u.deleteLabels == nil {
		return nil, nil, nil
	}
	labels = map[string]string{}
	for k, v := range u.setLabels {
		labels[k] = v
	}
	if len(labels) == 0 && len(u.deleteLabels) > 0 {
		forces = []string{"Labels"}
	}
	for l := range u.deleteLabels {
		nulls = append(nulls, "Labels."+l)
	}
	return labels, forces, nulls
}
