feat(spanner): add metadata to RowIterator (#3050)

Adds ResultSetMetaData to the RowIterator struct. The metadata
are available after the first call to RowIterator.Next() as long
as that call did not return any other error than iterator.Done.

Fixes #1805
diff --git a/spanner/client_test.go b/spanner/client_test.go
index c30f39e..8750541 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -1880,6 +1880,51 @@
 	}
 }
 
+func TestClient_ShouldReceiveMetadataForEmptyResultSet(t *testing.T) {
+	t.Parallel()
+
+	server, client, teardown := setupMockedTestServer(t)
+	// This creates an empty result set.
+	res := server.CreateSingleRowSingersResult(SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
+	sql := "SELECT SingerId, AlbumId, AlbumTitle FROM Albums WHERE 1=2"
+	server.TestSpanner.PutStatementResult(sql, res)
+	defer teardown()
+	ctx := context.Background()
+	iter := client.Single().Query(ctx, NewStatement(sql))
+	defer iter.Stop()
+	row, err := iter.Next()
+	if err != iterator.Done {
+		t.Errorf("Query result mismatch:\nGot: %v\nWant: <no rows>", row)
+	}
+	metadata := iter.Metadata
+	if metadata == nil {
+		t.Fatalf("Missing ResultSet Metadata")
+	}
+	if metadata.RowType == nil {
+		t.Fatalf("Missing ResultSet RowType")
+	}
+	if metadata.RowType.Fields == nil {
+		t.Fatalf("Missing ResultSet Fields")
+	}
+	if g, w := len(metadata.RowType.Fields), 3; g != w {
+		t.Fatalf("Field count mismatch\nGot: %v\nWant: %v", g, w)
+	}
+	wantFieldNames := []string{"SingerId", "AlbumId", "AlbumTitle"}
+	for i, w := range wantFieldNames {
+		g := metadata.RowType.Fields[i].Name
+		if g != w {
+			t.Fatalf("Field[%v] name mismatch\nGot: %v\nWant: %v", i, g, w)
+		}
+	}
+	wantFieldTypes := []sppb.TypeCode{sppb.TypeCode_INT64, sppb.TypeCode_INT64, sppb.TypeCode_STRING}
+	for i, w := range wantFieldTypes {
+		g := metadata.RowType.Fields[i].Type.Code
+		if g != w {
+			t.Fatalf("Field[%v] type mismatch\nGot: %v\nWant: %v", i, g, w)
+		}
+	}
+}
+
 func TestClient_EncodeCustomFieldType(t *testing.T) {
 	t.Parallel()
 
diff --git a/spanner/integration_test.go b/spanner/integration_test.go
index dd0a09c..47b266d 100644
--- a/spanner/integration_test.go
+++ b/spanner/integration_test.go
@@ -3156,6 +3156,11 @@
 	for {
 		row, err := iter.Next()
 		if err == iterator.Done {
+			if iter.Metadata == nil {
+				// All queries should always return metadata, regardless whether
+				// they return any rows or not.
+				return nil, errors.New("missing metadata from query")
+			}
 			return vals, nil
 		}
 		if err != nil {
diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go
index 0735e09..fd39d74 100644
--- a/spanner/internal/testutil/inmem_spanner_server.go
+++ b/spanner/internal/testutil/inmem_spanner_server.go
@@ -148,6 +148,10 @@
 				break
 			}
 		}
+	} else {
+		result = append(result, &spannerpb.PartialResultSet{
+			Metadata: s.ResultSet.Metadata,
+		})
 	}
 	return result, nil
 }
diff --git a/spanner/read.go b/spanner/read.go
index 268cf10..5ede3a3 100644
--- a/spanner/read.go
+++ b/spanner/read.go
@@ -102,6 +102,11 @@
 	// iterator.Done.
 	RowCount int64
 
+	// The metadata of the results of the query. The metadata are available
+	// after the first call to RowIterator.Next(), unless the first call to
+	// RowIterator.Next() returned an error that is not equal to iterator.Done.
+	Metadata *sppb.ResultSetMetadata
+
 	streamd      *resumableStreamDecoder
 	rowd         *partialResultSetDecoder
 	setTimestamp func(time.Time)
@@ -133,7 +138,11 @@
 				r.RowCount = rc
 			}
 		}
-		r.rows, r.err = r.rowd.add(prs)
+		var metadata *sppb.ResultSetMetadata
+		r.rows, metadata, r.err = r.rowd.add(prs)
+		if metadata != nil {
+			r.Metadata = metadata
+		}
 		if r.err != nil {
 			return nil, r.err
 		}
@@ -648,7 +657,7 @@
 
 // add tries to merge a new PartialResultSet into buffered Row. It returns any
 // rows that have been completed as a result.
-func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, error) {
+func (p *partialResultSetDecoder) add(r *sppb.PartialResultSet) ([]*Row, *sppb.ResultSetMetadata, error) {
 	var rows []*Row
 	if r.Metadata != nil {
 		// Metadata should only be returned in the first result.
@@ -663,20 +672,20 @@
 		}
 	}
 	if len(r.Values) == 0 {
-		return nil, nil
+		return nil, r.Metadata, nil
 	}
 	if p.chunked {
 		p.chunked = false
 		// Try to merge first value in r.Values into uncompleted row.
 		last := len(p.row.vals) - 1
 		if last < 0 { // confidence check
-			return nil, errChunkedEmptyRow()
+			return nil, nil, errChunkedEmptyRow()
 		}
 		var err error
 		// If p is chunked, then we should always try to merge p.last with
 		// r.first.
 		if p.row.vals[last], err = p.merge(p.row.vals[last], r.Values[0]); err != nil {
-			return nil, err
+			return nil, r.Metadata, err
 		}
 		r.Values = r.Values[1:]
 		// Merge is done, try to yield a complete Row.
@@ -698,7 +707,7 @@
 		// also chunked.
 		p.chunked = true
 	}
-	return rows, nil
+	return rows, r.Metadata, nil
 }
 
 // isMergeable returns if a protobuf Value can be potentially merged with other
diff --git a/spanner/read_test.go b/spanner/read_test.go
index 2b1ea48..87f6589 100644
--- a/spanner/read_test.go
+++ b/spanner/read_test.go
@@ -584,7 +584,7 @@
 		var rows []*Row
 		p := &partialResultSetDecoder{}
 		for j, v := range test.input {
-			rs, err := p.add(v)
+			rs, _, err := p.add(v)
 			if err != nil {
 				t.Errorf("test %d.%d: partialResultSetDecoder.add(%v) = %v; want nil", i, j, v, err)
 				continue nextTest