| /* |
| Copyright 2017 Google LLC |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package spanner |
| |
| import ( |
| "fmt" |
| "math/rand" |
| "reflect" |
| "time" |
| |
| sppb "cloud.google.com/go/spanner/apiv1/spannerpb" |
| "google.golang.org/grpc/codes" |
| proto3 "google.golang.org/protobuf/types/known/structpb" |
| "google.golang.org/protobuf/types/known/timestamppb" |
| ) |
| |
// op is the mutation operation. Its values are the op* constants declared
// below.
type op int

const (
	// opDelete removes a row from a table. Succeeds whether or not the
	// key was present.
	opDelete op = iota
	// opInsert inserts a row into a table. If the row already exists, the
	// write or transaction fails.
	opInsert
	// opInsertOrUpdate inserts a row into a table. If the row already
	// exists, it updates it instead. Any column values not explicitly
	// written are preserved.
	opInsertOrUpdate
	// opReplace inserts a row into a table, deleting any existing row.
	// Unlike InsertOrUpdate, this means any values not explicitly written
	// become NULL.
	opReplace
	// opUpdate updates a row in a table. If the row does not already
	// exist, the write or transaction fails.
	opUpdate
	// opSend sends a message to a queue. Users need to specify the queue
	// name, a key, and a payload, and may optionally specify a delivery
	// time for the message.
	opSend
	// opAck acks a message in a queue, effectively removing it from the
	// queue. Users need to specify the queue name and the key, and optionally
	// a bool value to ignore the error if the message does not exist.
	opAck
)
| |
// A Mutation describes a modification to one or more Cloud Spanner rows. The
// mutation represents an insert, update, delete, etc. on a table, or a send or
// ack on a queue.
//
// Many mutations can be applied in a single atomic commit. For purposes of
// constraint checking (such as foreign key constraints), the operations can be
// viewed as applying in the same order as the mutations are provided (so that,
// e.g., a row and its logical "child" can be inserted in the same commit).
//
// The Apply function applies a series of mutations. For example,
//
//	m := spanner.Insert("User",
//		[]string{"user_id", "profile"},
//		[]interface{}{UserID, profile})
//	_, err := client.Apply(ctx, []*spanner.Mutation{m})
//
// inserts a new row into the User table. The primary key
// for the new row is UserID (presuming that "user_id" has been declared as the
// primary key of the "User" table).
//
// To apply a series of mutations as part of an atomic read-modify-write
// operation, use ReadWriteTransaction.
//
// # Updating a row
//
// Changing the values of columns in an existing row is very similar to
// inserting a new row:
//
//	m := spanner.Update("User",
//		[]string{"user_id", "profile"},
//		[]interface{}{UserID, profile})
//	_, err := client.Apply(ctx, []*spanner.Mutation{m})
//
// # Deleting a row
//
// To delete a row, use spanner.Delete:
//
//	m := spanner.Delete("User", spanner.Key{UserID})
//	_, err := client.Apply(ctx, []*spanner.Mutation{m})
//
// spanner.Delete accepts a KeySet, so you can also pass in a KeyRange, or use
// the spanner.KeySets function to build any combination of Keys and KeyRanges.
//
// Note that deleting a row in a table may also delete rows from other tables
// if cascading deletes are specified in those tables' schemas. Delete does
// nothing if the named row does not exist (does not yield an error).
//
// # Deleting a field
//
// To delete/clear a field within a row, use spanner.Update with the value nil:
//
//	m := spanner.Update("User",
//		[]string{"user_id", "profile"},
//		[]interface{}{UserID, nil})
//	_, err := client.Apply(ctx, []*spanner.Mutation{m})
//
// The valid Go types and their corresponding Cloud Spanner types that can be
// used in the Insert/Update/InsertOrUpdate functions are:
//
//	string, *string, NullString - STRING
//	[]string, []*string, []NullString - STRING ARRAY
//	[]byte - BYTES
//	[][]byte - BYTES ARRAY
//	int, int64, *int64, NullInt64 - INT64
//	[]int, []int64, []*int64, []NullInt64 - INT64 ARRAY
//	bool, *bool, NullBool - BOOL
//	[]bool, []*bool, []NullBool - BOOL ARRAY
//	float64, *float64, NullFloat64 - FLOAT64
//	[]float64, []*float64, []NullFloat64 - FLOAT64 ARRAY
//	time.Time, *time.Time, NullTime - TIMESTAMP
//	[]time.Time, []*time.Time, []NullTime - TIMESTAMP ARRAY
//	Date, *Date, NullDate - DATE
//	[]Date, []*Date, []NullDate - DATE ARRAY
//	big.Rat, *big.Rat, NullNumeric - NUMERIC
//	[]big.Rat, []*big.Rat, []NullNumeric - NUMERIC ARRAY
//
// To compare two Mutations for testing purposes, use reflect.DeepEqual.
type Mutation struct {
	// op is the operation type of the mutation.
	// See documentation for spanner.op for more details.
	op op
	// table is the name of the target table to be modified.
	table string
	// keySet is a set of primary keys that names the rows
	// in a delete operation.
	keySet KeySet
	// columns names the set of columns that are going to be
	// modified by Insert, InsertOrUpdate, Replace or Update
	// operations.
	columns []string
	// values specifies the new values for the target columns
	// named by columns.
	values []interface{}

	// Queue related fields.

	// queue is the name of the target queue for send and ack operations.
	queue string
	// key is the primary key used in send or ack.
	key Key
	// payload is the content of the message.
	payload interface{}
	// deliverTime is optionally set for opSend. The zero value means no
	// delivery time is put on the request.
	deliverTime time.Time
	// ignoreNotFound is optionally set for opAck.
	ignoreNotFound bool

	// wrapped is the protobuf mutation that is the source for this Mutation.
	// This is only set if the [WrapMutation] function was used to create the Mutation.
	wrapped *sppb.Mutation
}
| |
// A MutationGroup is a list of Mutation to be committed atomically.
type MutationGroup struct {
	// Mutations holds the mutations that belong to this group.
	Mutations []*Mutation
}
| |
// mapToMutationParams converts a Go map of column name to value into the
// parallel column/value slices used by mutations.
//
// The i-th returned value belongs to the i-th returned column. The order of
// the pairs follows Go map iteration order and is therefore unspecified. Both
// results are always non-nil, even for a nil or empty map.
func mapToMutationParams(in map[string]interface{}) ([]string, []interface{}) {
	// The final length is known up front, so allocate once instead of
	// growing the slices while appending.
	cols := make([]string, 0, len(in))
	vals := make([]interface{}, 0, len(in))
	for k, v := range in {
		cols = append(cols, k)
		vals = append(vals, v)
	}
	return cols, vals
}
| |
| // errNotStruct returns error for not getting a go struct type. |
| func errNotStruct(in interface{}) error { |
| return spannerErrorf(codes.InvalidArgument, "%T is not a go struct type", in) |
| } |
| |
| // structToMutationParams converts Go struct into mutation parameters. |
| // If the input is not a valid Go struct type, structToMutationParams |
| // returns error. |
| func structToMutationParams(in interface{}) ([]string, []interface{}, error) { |
| if in == nil { |
| return nil, nil, errNotStruct(in) |
| } |
| v := reflect.ValueOf(in) |
| t := v.Type() |
| if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct { |
| // t is a pointer to a struct. |
| if v.IsNil() { |
| // Return empty results. |
| return nil, nil, nil |
| } |
| // Get the struct value that in points to. |
| v = v.Elem() |
| t = t.Elem() |
| } |
| if t.Kind() != reflect.Struct { |
| return nil, nil, errNotStruct(in) |
| } |
| fields, err := fieldCache.Fields(t) |
| if err != nil { |
| return nil, nil, ToSpannerError(err) |
| } |
| var cols []string |
| var vals []interface{} |
| for _, f := range fields { |
| if f.ParsedTag != nil { |
| if tag, ok := f.ParsedTag.(spannerTag); ok && tag.ReadOnly { |
| continue |
| } |
| } |
| cols = append(cols, f.Name) |
| vals = append(vals, v.FieldByIndex(f.Index).Interface()) |
| } |
| return cols, vals, nil |
| } |
| |
| // Insert returns a Mutation to insert a row into a table. If the row already |
| // exists, the write or transaction fails with codes.AlreadyExists. |
| func Insert(table string, cols []string, vals []interface{}) *Mutation { |
| return &Mutation{ |
| op: opInsert, |
| table: table, |
| columns: cols, |
| values: vals, |
| } |
| } |
| |
| // InsertMap returns a Mutation to insert a row into a table, specified by |
| // a map of column name to value. If the row already exists, the write or |
| // transaction fails with codes.AlreadyExists. |
| func InsertMap(table string, in map[string]interface{}) *Mutation { |
| cols, vals := mapToMutationParams(in) |
| return Insert(table, cols, vals) |
| } |
| |
| // InsertStruct returns a Mutation to insert a row into a table, specified by |
| // a Go struct. If the row already exists, the write or transaction fails with |
| // codes.AlreadyExists. |
| // |
| // The in argument must be a struct or a pointer to a struct. Its exported |
| // fields specify the column names and values. Use a field tag like `spanner:"name"` |
| // to provide an alternative column name, or use `spanner:"-"` to ignore the field. |
| func InsertStruct(table string, in interface{}) (*Mutation, error) { |
| cols, vals, err := structToMutationParams(in) |
| if err != nil { |
| return nil, err |
| } |
| return Insert(table, cols, vals), nil |
| } |
| |
| // Update returns a Mutation to update a row in a table. If the row does not |
| // already exist, the write or transaction fails. |
| func Update(table string, cols []string, vals []interface{}) *Mutation { |
| return &Mutation{ |
| op: opUpdate, |
| table: table, |
| columns: cols, |
| values: vals, |
| } |
| } |
| |
| // UpdateMap returns a Mutation to update a row in a table, specified by |
| // a map of column to value. If the row does not already exist, the write or |
| // transaction fails. |
| func UpdateMap(table string, in map[string]interface{}) *Mutation { |
| cols, vals := mapToMutationParams(in) |
| return Update(table, cols, vals) |
| } |
| |
| // UpdateStruct returns a Mutation to update a row in a table, specified by a Go |
| // struct. If the row does not already exist, the write or transaction fails. |
| func UpdateStruct(table string, in interface{}) (*Mutation, error) { |
| cols, vals, err := structToMutationParams(in) |
| if err != nil { |
| return nil, err |
| } |
| return Update(table, cols, vals), nil |
| } |
| |
| // InsertOrUpdate returns a Mutation to insert a row into a table. If the row |
| // already exists, it updates it instead. Any column values not explicitly |
| // written are preserved. |
| // |
| // For a similar example, See Update. |
| func InsertOrUpdate(table string, cols []string, vals []interface{}) *Mutation { |
| return &Mutation{ |
| op: opInsertOrUpdate, |
| table: table, |
| columns: cols, |
| values: vals, |
| } |
| } |
| |
| // InsertOrUpdateMap returns a Mutation to insert a row into a table, |
| // specified by a map of column to value. If the row already exists, it |
| // updates it instead. Any column values not explicitly written are preserved. |
| // |
| // For a similar example, See UpdateMap. |
| func InsertOrUpdateMap(table string, in map[string]interface{}) *Mutation { |
| cols, vals := mapToMutationParams(in) |
| return InsertOrUpdate(table, cols, vals) |
| } |
| |
| // InsertOrUpdateStruct returns a Mutation to insert a row into a table, |
| // specified by a Go struct. If the row already exists, it updates it instead. |
| // Any column values not explicitly written are preserved. |
| // |
| // The in argument must be a struct or a pointer to a struct. Its exported |
| // fields specify the column names and values. Use a field tag like |
| // `spanner:"name"` to provide an alternative column name, or use `spanner:"-"` to |
| // ignore the field. |
| // |
| // For a similar example, See UpdateStruct. |
| func InsertOrUpdateStruct(table string, in interface{}) (*Mutation, error) { |
| cols, vals, err := structToMutationParams(in) |
| if err != nil { |
| return nil, err |
| } |
| return InsertOrUpdate(table, cols, vals), nil |
| } |
| |
| // Replace returns a Mutation to insert a row into a table, deleting any |
| // existing row. Unlike InsertOrUpdate, this means any values not explicitly |
| // written become NULL. |
| // |
| // For a similar example, See Update. |
| func Replace(table string, cols []string, vals []interface{}) *Mutation { |
| return &Mutation{ |
| op: opReplace, |
| table: table, |
| columns: cols, |
| values: vals, |
| } |
| } |
| |
| // ReplaceMap returns a Mutation to insert a row into a table, deleting any |
| // existing row. Unlike InsertOrUpdateMap, this means any values not explicitly |
| // written become NULL. The row is specified by a map of column to value. |
| // |
| // For a similar example, See UpdateMap. |
| func ReplaceMap(table string, in map[string]interface{}) *Mutation { |
| cols, vals := mapToMutationParams(in) |
| return Replace(table, cols, vals) |
| } |
| |
| // ReplaceStruct returns a Mutation to insert a row into a table, deleting any |
| // existing row. Unlike InsertOrUpdateMap, this means any values not explicitly |
| // written become NULL. The row is specified by a Go struct. |
| // |
| // The in argument must be a struct or a pointer to a struct. Its exported |
| // fields specify the column names and values. Use a field tag like `spanner:"name"` |
| // to provide an alternative column name, or use `spanner:"-"` to ignore the field. |
| // |
| // For a similar example, See UpdateStruct. |
| func ReplaceStruct(table string, in interface{}) (*Mutation, error) { |
| cols, vals, err := structToMutationParams(in) |
| if err != nil { |
| return nil, err |
| } |
| return Replace(table, cols, vals), nil |
| } |
| |
| // Delete removes the rows described by the KeySet from the table. It succeeds |
| // whether or not the keys were present. |
| func Delete(table string, ks KeySet) *Mutation { |
| return &Mutation{ |
| op: opDelete, |
| table: table, |
| keySet: ks, |
| } |
| } |
| |
// SendOption specifies an optional field for a Send mutation. Send applies
// each option, in the order given, to the Mutation it is building.
type SendOption func(*Mutation)
| |
// AckOption specifies an optional field for an Ack mutation. Ack applies
// each option, in the order given, to the Mutation it is building.
type AckOption func(*Mutation)
| |
| // WithDeliveryTime returns an SendOption to set field `deliverTime` |
| func WithDeliveryTime(t time.Time) SendOption { |
| return func(m *Mutation) { |
| m.deliverTime = t |
| } |
| } |
| |
| // Send returns a Mutation to send a message to a queue. |
| func Send(queue string, key Key, payload interface{}, opts ...SendOption) *Mutation { |
| m := &Mutation{ |
| op: opSend, |
| queue: queue, |
| key: key, |
| payload: payload, |
| } |
| for _, opt := range opts { |
| opt(m) |
| } |
| return m |
| } |
| |
| // WithIgnoreNotFound returns an AckOption to set field `ignoreNotFound` |
| func WithIgnoreNotFound(ignoreNotFound bool) AckOption { |
| return func(m *Mutation) { |
| m.ignoreNotFound = ignoreNotFound |
| } |
| } |
| |
| // Ack returns a Mutation to acknowledge (and thus delete) a message from a queue. |
| func Ack(queue string, key Key, opts ...AckOption) *Mutation { |
| m := &Mutation{ |
| op: opAck, |
| queue: queue, |
| key: key, |
| } |
| for _, opt := range opts { |
| opt(m) |
| } |
| return m |
| } |
| |
| // WrapMutation creates a mutation that wraps an existing protobuf mutation object. |
| func WrapMutation(proto *sppb.Mutation) (*Mutation, error) { |
| if proto == nil { |
| return nil, fmt.Errorf("protobuf mutation may not be nil") |
| } |
| op, table, queue, err := getTableOrQueueAndSpannerOperation(proto) |
| if err != nil { |
| return nil, err |
| } |
| return &Mutation{ |
| op: op, |
| table: table, |
| queue: queue, |
| wrapped: proto, |
| }, nil |
| } |
| |
| func getTableOrQueueAndSpannerOperation(proto *sppb.Mutation) (op, string, string, error) { |
| switch op := proto.Operation.(type) { |
| case *sppb.Mutation_Insert: |
| return opInsert, op.Insert.Table, "", nil |
| case *sppb.Mutation_Update: |
| return opUpdate, op.Update.Table, "", nil |
| case *sppb.Mutation_Replace: |
| return opReplace, op.Replace.Table, "", nil |
| case *sppb.Mutation_Delete_: |
| return opDelete, op.Delete.Table, "", nil |
| case *sppb.Mutation_InsertOrUpdate: |
| return opInsertOrUpdate, op.InsertOrUpdate.Table, "", nil |
| case *sppb.Mutation_Send_: |
| return opSend, "", op.Send.Queue, nil |
| case *sppb.Mutation_Ack_: |
| return opAck, "", op.Ack.Queue, nil |
| } |
| return 0, "", "", spannerErrorf(codes.InvalidArgument, "unknown op type: %T", proto.Operation) |
| } |
| |
| // prepareWrite generates sppb.Mutation_Write from table name, column names |
| // and new column values. |
| func prepareWrite(table string, columns []string, vals []interface{}) (*sppb.Mutation_Write, error) { |
| v, err := encodeValueArray(vals) |
| if err != nil { |
| return nil, err |
| } |
| return &sppb.Mutation_Write{ |
| Table: table, |
| Columns: columns, |
| Values: []*proto3.ListValue{v}, |
| }, nil |
| } |
| |
| // errInvdMutationOp returns error for unrecognized mutation operation. |
| func errInvdMutationOp(m Mutation) error { |
| return spannerErrorf(codes.InvalidArgument, "Unknown op type: %d", m.op) |
| } |
| |
| // proto converts a spanner.Mutation to sppb.Mutation, in preparation to send |
| // RPCs. |
| func (m Mutation) proto() (*sppb.Mutation, error) { |
| if m.wrapped != nil { |
| return m.wrapped, nil |
| } |
| |
| var pb *sppb.Mutation |
| switch m.op { |
| case opDelete: |
| var kp *sppb.KeySet |
| if m.keySet != nil { |
| var err error |
| kp, err = m.keySet.keySetProto() |
| if err != nil { |
| return nil, err |
| } |
| } |
| pb = &sppb.Mutation{ |
| Operation: &sppb.Mutation_Delete_{ |
| Delete: &sppb.Mutation_Delete{ |
| Table: m.table, |
| KeySet: kp, |
| }, |
| }, |
| } |
| case opInsert: |
| w, err := prepareWrite(m.table, m.columns, m.values) |
| if err != nil { |
| return nil, err |
| } |
| pb = &sppb.Mutation{Operation: &sppb.Mutation_Insert{Insert: w}} |
| case opInsertOrUpdate: |
| w, err := prepareWrite(m.table, m.columns, m.values) |
| if err != nil { |
| return nil, err |
| } |
| pb = &sppb.Mutation{Operation: &sppb.Mutation_InsertOrUpdate{InsertOrUpdate: w}} |
| case opReplace: |
| w, err := prepareWrite(m.table, m.columns, m.values) |
| if err != nil { |
| return nil, err |
| } |
| pb = &sppb.Mutation{Operation: &sppb.Mutation_Replace{Replace: w}} |
| case opUpdate: |
| w, err := prepareWrite(m.table, m.columns, m.values) |
| if err != nil { |
| return nil, err |
| } |
| pb = &sppb.Mutation{Operation: &sppb.Mutation_Update{Update: w}} |
| case opSend: |
| k, err := encodeValueArray([]interface{}(m.key)) |
| if err != nil { |
| return nil, err |
| } |
| p, _, err := encodeValue(m.payload) |
| if err != nil { |
| return nil, err |
| } |
| var dt *timestamppb.Timestamp |
| if !m.deliverTime.IsZero() { |
| dt = timestamppb.New(m.deliverTime) |
| } |
| pb = &sppb.Mutation{ |
| Operation: &sppb.Mutation_Send_{ |
| Send: &sppb.Mutation_Send{ |
| Queue: m.queue, |
| Key: k, |
| Payload: p, |
| DeliverTime: dt, |
| }, |
| }, |
| } |
| case opAck: |
| k, err := encodeValueArray([]interface{}(m.key)) |
| if err != nil { |
| return nil, err |
| } |
| pb = &sppb.Mutation{ |
| Operation: &sppb.Mutation_Ack_{ |
| Ack: &sppb.Mutation_Ack{ |
| Queue: m.queue, |
| Key: k, |
| IgnoreNotFound: m.ignoreNotFound, |
| }, |
| }, |
| } |
| default: |
| return nil, errInvdMutationOp(m) |
| } |
| return pb, nil |
| } |
| |
| // mutationsProto turns a spanner.Mutation array into a sppb.Mutation array, |
| // it is convenient for sending batch mutations to Cloud Spanner. |
| func mutationsProto(ms []*Mutation) ([]*sppb.Mutation, *sppb.Mutation, error) { |
| n := len(ms) |
| out := make([]*sppb.Mutation, 0, n) |
| if n == 0 { |
| return out, nil, nil |
| } |
| maxInsertIdx := -1 |
| maxInsertVals := -1 |
| nonInsertCount := 0 |
| selectedNonInsertIdx := -1 |
| for i, m := range ms { |
| pb, err := m.proto() |
| if err != nil { |
| return nil, nil, err |
| } |
| out = append(out, pb) |
| if m.op == opInsert { |
| if v := len(m.values); v >= maxInsertVals { |
| maxInsertVals, maxInsertIdx = v, i |
| } |
| continue |
| } |
| nonInsertCount++ |
| if rand.Intn(nonInsertCount) == 0 { |
| selectedNonInsertIdx = i |
| } |
| } |
| if nonInsertCount > 0 { |
| return out, out[selectedNonInsertIdx], nil |
| } |
| if maxInsertIdx >= 0 { |
| return out, out[maxInsertIdx], nil |
| } |
| return out, nil, nil |
| } |
| |
| // mutationGroupsProto turns a spanner.MutationGroup array into a |
| // sppb.BatchWriteRequest_MutationGroup array, in preparation to send RPCs. |
| func mutationGroupsProto(mgs []*MutationGroup) ([]*sppb.BatchWriteRequest_MutationGroup, error) { |
| gs := make([]*sppb.BatchWriteRequest_MutationGroup, 0, len(mgs)) |
| for _, mg := range mgs { |
| ms, _, err := mutationsProto(mg.Mutations) |
| if err != nil { |
| return nil, err |
| } |
| gs = append(gs, &sppb.BatchWriteRequest_MutationGroup{Mutations: ms}) |
| } |
| return gs, nil |
| } |