feat(spanner): support rw-transaction with options (#3058)

* feat(spanner): support rw-transaction with options
diff --git a/spanner/client.go b/spanner/client.go
index c4e9f26..a4ffbb8 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -424,11 +424,29 @@
 func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
 	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
 	defer func() { trace.EndSpan(ctx, err) }()
+	resp, err := c.rwTransaction(ctx, f, TransactionOptions{})
+	return resp.CommitTs, err
+}
+
+// ReadWriteTransactionWithOptions executes a read-write transaction with
+// configurable options, with retries as necessary.
+//
+// ReadWriteTransactionWithOptions is a configurable ReadWriteTransaction.
+//
+// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
+// more details.
+func (c *Client) ReadWriteTransactionWithOptions(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
+	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransactionWithOptions")
+	defer func() { trace.EndSpan(ctx, err) }()
+	resp, err = c.rwTransaction(ctx, f, options)
+	return resp, err
+}
+
+func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
 	if err := checkNestedTxn(ctx); err != nil {
-		return time.Time{}, err
+		return resp, err
 	}
 	var (
-		ts time.Time
 		sh *sessionHandle
 	)
 	err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
@@ -457,13 +475,13 @@
 		if err = t.begin(ctx); err != nil {
 			return err
 		}
-		ts, err = t.runInTransaction(ctx, f)
+		resp, err = t.runInTransaction(ctx, f)
 		return err
 	})
 	if sh != nil {
 		sh.recycle()
 	}
-	return ts, err
+	return resp, err
 }
 
 // applyOption controls the behavior of Client.Apply.
diff --git a/spanner/transaction.go b/spanner/transaction.go
index c12863b..93087b5 100644
--- a/spanner/transaction.go
+++ b/spanner/transaction.go
@@ -74,6 +74,10 @@
 	qo QueryOptions
 }
 
+// TransactionOptions provides options for a transaction.
+type TransactionOptions struct {
+}
+
 // errSessionClosed returns error for using a recycled/destroyed session
 func errSessionClosed(sh *sessionHandle) error {
 	return spannerErrorf(codes.FailedPrecondition,
@@ -954,22 +958,28 @@
 	return err
 }
 
+// CommitResponse provides a response of a transaction commit in a database.
+type CommitResponse struct {
+	// CommitTs is the commit time for a transaction.
+	CommitTs time.Time
+}
+
 // commit tries to commit a readwrite transaction to Cloud Spanner. It also
-// returns the commit timestamp for the transactions.
-func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
-	var ts time.Time
+// returns the commit response for the transactions.
+func (t *ReadWriteTransaction) commit(ctx context.Context) (CommitResponse, error) {
+	resp := CommitResponse{}
 	t.mu.Lock()
 	t.state = txClosed // No further operations after commit.
 	mPb, err := mutationsProto(t.wb)
 	t.mu.Unlock()
 	if err != nil {
-		return ts, err
+		return resp, err
 	}
 	// In case that sessionHandle was destroyed but transaction body fails to
 	// report it.
 	sid, client := t.sh.getID(), t.sh.getClient()
 	if sid == "" || client == nil {
-		return ts, errSessionClosed(t.sh)
+		return resp, errSessionClosed(t.sh)
 	}
 
 	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
@@ -980,15 +990,15 @@
 		Mutations: mPb,
 	})
 	if e != nil {
-		return ts, toSpannerErrorWithCommitInfo(e, true)
+		return resp, toSpannerErrorWithCommitInfo(e, true)
 	}
 	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
-		ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
+		resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
 	}
 	if isSessionNotFoundError(err) {
 		t.sh.destroy()
 	}
-	return ts, err
+	return resp, err
 }
 
 // rollback is called when a commit is aborted or the transaction body runs
@@ -1014,15 +1024,15 @@
 }
 
 // runInTransaction executes f under a read-write transaction context.
-func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
+func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
 	var (
-		ts              time.Time
+		resp            CommitResponse
 		err             error
 		errDuringCommit bool
 	)
 	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
 		// Try to commit if transaction body returns no error.
-		ts, err = t.commit(ctx)
+		resp, err = t.commit(ctx)
 		errDuringCommit = err != nil
 	}
 	if err != nil {
@@ -1030,11 +1040,11 @@
 			// Retry the transaction using the same session on ABORT error.
 			// Cloud Spanner will create the new transaction with the previous
 			// one's wound-wait priority.
-			return ts, err
+			return resp, err
 		}
 		if isSessionNotFoundError(err) {
 			t.sh.destroy()
-			return ts, err
+			return resp, err
 		}
 		// Rollback the transaction unless the error occurred during the
 		// commit. Executing a rollback after a commit has failed will
@@ -1045,10 +1055,10 @@
 		if !errDuringCommit {
 			t.rollback(ctx)
 		}
-		return ts, err
+		return resp, err
 	}
-	// err == nil, return commit timestamp.
-	return ts, nil
+	// err == nil, return commit response.
+	return resp, nil
 }
 
 // ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
@@ -1074,6 +1084,18 @@
 // For most use cases, client.ReadWriteTransaction should be used, as it will
 // handle all Aborted and 'Session not found' errors automatically.
 func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
+	return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
+}
+
+// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
+// with configurable options. Commit() or Rollback() must be called to end a
+// transaction. If Commit() or Rollback() is not called, the session that is
+// used by the transaction will not be returned to the pool and cause a session
+// leak.
+//
+// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
+// NewReadWriteStmtBasedTransaction.
+func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
 	var (
 		sh  *sessionHandle
 		err error
@@ -1105,11 +1127,7 @@
 // Commit tries to commit a readwrite transaction to Cloud Spanner. It also
 // returns the commit timestamp for the transactions.
 func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
-	var (
-		ts  time.Time
-		err error
-	)
-	ts, err = t.commit(ctx)
+	resp, err := t.commit(ctx)
 	// Rolling back an aborted transaction is not necessary.
 	if err != nil && status.Code(err) != codes.Aborted {
 		t.rollback(ctx)
@@ -1117,7 +1135,7 @@
 	if t.sh != nil {
 		t.sh.recycle()
 	}
-	return ts, err
+	return resp.CommitTs, err
 }
 
 // Rollback is called to cancel the ongoing transaction that has not been