feat(spanner): support rw-transaction with options (#3058)
* feat(spanner): support rw-transaction with options
diff --git a/spanner/client.go b/spanner/client.go
index c4e9f26..a4ffbb8 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -424,11 +424,29 @@
func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
defer func() { trace.EndSpan(ctx, err) }()
+ resp, err := c.rwTransaction(ctx, f, TransactionOptions{})
+ return resp.CommitTs, err
+}
+
+// ReadWriteTransactionWithOptions executes a read-write transaction with
+// configurable options, with retries as necessary.
+//
+// ReadWriteTransactionWithOptions is a configurable ReadWriteTransaction.
+//
+// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
+// more details.
+func (c *Client) ReadWriteTransactionWithOptions(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
+ ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransactionWithOptions")
+ defer func() { trace.EndSpan(ctx, err) }()
+ resp, err = c.rwTransaction(ctx, f, options)
+ return resp, err
+}
+
+func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
if err := checkNestedTxn(ctx); err != nil {
- return time.Time{}, err
+ return resp, err
}
var (
- ts time.Time
sh *sessionHandle
)
err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
@@ -457,13 +475,13 @@
if err = t.begin(ctx); err != nil {
return err
}
- ts, err = t.runInTransaction(ctx, f)
+ resp, err = t.runInTransaction(ctx, f)
return err
})
if sh != nil {
sh.recycle()
}
- return ts, err
+ return resp, err
}
// applyOption controls the behavior of Client.Apply.
diff --git a/spanner/transaction.go b/spanner/transaction.go
index c12863b..93087b5 100644
--- a/spanner/transaction.go
+++ b/spanner/transaction.go
@@ -74,6 +74,10 @@
qo QueryOptions
}
+// TransactionOptions provides options for a transaction.
+type TransactionOptions struct {
+}
+
// errSessionClosed returns error for using a recycled/destroyed session
func errSessionClosed(sh *sessionHandle) error {
return spannerErrorf(codes.FailedPrecondition,
@@ -954,22 +958,28 @@
return err
}
+// CommitResponse provides a response of a transaction commit in a database.
+type CommitResponse struct {
+ // CommitTs is the commit time for a transaction.
+ CommitTs time.Time
+}
+
// commit tries to commit a readwrite transaction to Cloud Spanner. It also
-// returns the commit timestamp for the transactions.
-func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
- var ts time.Time
+// returns the commit response for the transactions.
+func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, error) {
+ resp := CommitResponse{}
t.mu.Lock()
t.state = txClosed // No further operations after commit.
mPb, err := mutationsProto(t.wb)
t.mu.Unlock()
if err != nil {
- return ts, err
+ return resp, err
}
// In case that sessionHandle was destroyed but transaction body fails to
// report it.
sid, client := t.sh.getID(), t.sh.getClient()
if sid == "" || client == nil {
- return ts, errSessionClosed(t.sh)
+ return resp, errSessionClosed(t.sh)
}
res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
@@ -980,15 +990,15 @@
Mutations: mPb,
})
if e != nil {
- return ts, toSpannerErrorWithCommitInfo(e, true)
+ return resp, toSpannerErrorWithCommitInfo(e, true)
}
if tstamp := res.GetCommitTimestamp(); tstamp != nil {
- ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
+ resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
}
if isSessionNotFoundError(err) {
t.sh.destroy()
}
- return ts, err
+ return resp, err
}
// rollback is called when a commit is aborted or the transaction body runs
@@ -1014,15 +1024,15 @@
}
// runInTransaction executes f under a read-write transaction context.
-func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
+func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
var (
- ts time.Time
+ resp CommitResponse
err error
errDuringCommit bool
)
if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
// Try to commit if transaction body returns no error.
- ts, err = t.commit(ctx)
+ resp, err = t.commit(ctx)
errDuringCommit = err != nil
}
if err != nil {
@@ -1030,11 +1040,11 @@
// Retry the transaction using the same session on ABORT error.
// Cloud Spanner will create the new transaction with the previous
// one's wound-wait priority.
- return ts, err
+ return resp, err
}
if isSessionNotFoundError(err) {
t.sh.destroy()
- return ts, err
+ return resp, err
}
// Rollback the transaction unless the error occurred during the
// commit. Executing a rollback after a commit has failed will
@@ -1045,10 +1055,10 @@
if !errDuringCommit {
t.rollback(ctx)
}
- return ts, err
+ return resp, err
}
- // err == nil, return commit timestamp.
- return ts, nil
+ // err == nil, return commit response.
+ return resp, nil
}
// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
@@ -1074,6 +1084,18 @@
// For most use cases, client.ReadWriteTransaction should be used, as it will
// handle all Aborted and 'Session not found' errors automatically.
func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
+ return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
+}
+
+// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
+// with configurable options. Commit() or Rollback() must be called to end a
+// transaction. If Commit() or Rollback() is not called, the session that is
+// used by the transaction will not be returned to the pool and cause a session
+// leak.
+//
+// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
+// NewReadWriteStmtBasedTransaction.
+func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
var (
sh *sessionHandle
err error
@@ -1105,11 +1127,7 @@
// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
// returns the commit timestamp for the transactions.
func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
- var (
- ts time.Time
- err error
- )
- ts, err = t.commit(ctx)
+ resp, err := t.commit(ctx)
// Rolling back an aborted transaction is not necessary.
if err != nil && status.Code(err) != codes.Aborted {
t.rollback(ctx)
@@ -1117,7 +1135,7 @@
if t.sh != nil {
t.sh.recycle()
}
- return ts, err
+ return resp.CommitTs, err
}
// Rollback is called to cancel the ongoing transaction that has not been