feat(spanner/spannertest): implement FULL JOIN (#3218)

This doesn't fit into the same structure as the other JOINs, but is
approximately implemented as a LEFT JOIN with a catchup phase of
emitting the unmatched RHS rows.
diff --git a/spanner/spannertest/README.md b/spanner/spannertest/README.md
index d05f9d2..7fc451a 100644
--- a/spanner/spannertest/README.md
+++ b/spanner/spannertest/README.md
@@ -21,7 +21,7 @@
 - more aggregation functions
 - SELECT HAVING
 - case insensitivity
-- FULL JOIN, multiple joins
+- multiple joins
 - alternate literal types (esp. strings)
 - STRUCT types
 - transaction simulation
diff --git a/spanner/spannertest/db_query.go b/spanner/spannertest/db_query.go
index b9bbc01..a773533 100644
--- a/spanner/spannertest/db_query.go
+++ b/spanner/spannertest/db_query.go
@@ -741,7 +741,10 @@
 		ji.primary, ji.secondaryOrig = rhs, lhs
 		ji.primaryOffset, ji.secondaryOffset = len(rhsEC.cols), 0
 	case spansql.FullJoin:
-		return nil, evalContext{}, fmt.Errorf("TODO: can't yet evaluate FULL JOIN")
+		// FULL JOIN is implemented as a LEFT JOIN with tracking for which rows of the RHS
+		// have been used. Then, at the end of the iteration, the unused RHS rows are emitted.
+		ji.nullPad = true
+		ji.used = make([]bool, 0, 10) // arbitrary preallocation
 	}
 	ji.ec.cols, ji.ec.row = nil, nil
 
@@ -781,11 +784,12 @@
 		}
 		return b != nil && *b, nil
 	}
-	ji.zero = func(primary row) {
+	ji.zero = func(primary, secondary row) {
 		for i := range ji.ec.row {
 			ji.ec.row[i] = nil
 		}
 		copy(ji.ec.row[ji.primaryOffset:], primary)
+		copy(ji.ec.row[ji.secondaryOffset:], secondary)
 	}
 }
 
@@ -844,11 +848,18 @@
 		return r[i]
 	}
 	// populate writes the data to ji.ec.row in the correct positions.
-	populate := func(primary, secondary row) { // secondary may be nil
+	populate := func(primary, secondary row) { // either may be nil
 		j := 0
-		for _, pi := range primaryUsing {
-			ji.ec.row[j] = primary[pi]
-			j++
+		if primary != nil {
+			for _, pi := range primaryUsing {
+				ji.ec.row[j] = primary[pi]
+				j++
+			}
+		} else {
+			for _, si := range secondaryUsing {
+				ji.ec.row[j] = secondary[si]
+				j++
+			}
 		}
 		lhs, rhs := primary, secondary
 		if flipped {
@@ -873,8 +884,8 @@
 		populate(primary, secondary)
 		return true, nil
 	}
-	ji.zero = func(primary row) {
-		populate(primary, nil)
+	ji.zero = func(primary, secondary row) {
+		populate(primary, secondary)
 	}
 	return nil
 }
@@ -894,16 +905,24 @@
 	// should be yielded with null padding (e.g. OUTER JOINs).
 	nullPad bool
 
-	primaryRow row      // current row from primary, or nil if it is time to advance
-	secondary  *rawIter // current clone of secondary
-	any        bool     // true if any secondary rows have matched primaryRow
+	primaryRow    row      // current row from primary, or nil if it is time to advance
+	secondary     *rawIter // current clone of secondary
+	secondaryRead int      // number of rows already read from secondary
+	any           bool     // true if any secondary rows have matched primaryRow
 
 	// cond reports whether the primary and secondary rows "join" (e.g. the ON clause is true).
 	// It populates ec.row with the output.
 	cond func(primary, secondary row) (bool, error)
-	// zero populates ec.row with the primary row and sets the remainder to NULL.
-	// This is used when nullPad is true and a primary row doesn't match any secondary row.
-	zero func(primary row)
+	// zero populates ec.row with the primary or secondary row data (either of which may be nil),
+	// and sets the remainder to NULL.
+	// This is used when nullPad is true and a primary or secondary row doesn't match.
+	zero func(primary, secondary row)
+
+	// For FULL JOIN, this tracks the secondary rows that have been used.
+	// It is non-nil when being used.
+	used        []bool
+	zeroUnused  bool // set when emitting unused secondary rows
+	unusedIndex int  // next index of used to check
 }
 
 func (ji *joinIter) Cols() []colInfo { return ji.ec.cols }
@@ -915,16 +934,49 @@
 		return err
 	}
 	ji.secondary = ji.secondaryOrig.clone()
+	ji.secondaryRead = 0
 	ji.any = false
 	return nil
 }
 
 func (ji *joinIter) Next() (row, error) {
-	if ji.primaryRow == nil {
-		if err := ji.nextPrimary(); err != nil {
+	if ji.primaryRow == nil && !ji.zeroUnused {
+		err := ji.nextPrimary()
+		if err == io.EOF && ji.used != nil {
+			// Drop down to emitting unused secondary rows.
+			ji.zeroUnused = true
+			ji.secondary = nil
+			goto scanJiUsed
+		}
+		if err != nil {
 			return nil, err
 		}
 	}
+scanJiUsed:
+	if ji.zeroUnused {
+		if ji.secondary == nil {
+			ji.secondary = ji.secondaryOrig.clone()
+			ji.secondaryRead = 0
+		}
+		for ji.unusedIndex < len(ji.used) && ji.used[ji.unusedIndex] {
+			ji.unusedIndex++
+		}
+		if ji.unusedIndex >= len(ji.used) || ji.secondaryRead >= len(ji.used) {
+			// Truly finished.
+			return nil, io.EOF
+		}
+		var secondaryRow row
+		for ji.secondaryRead <= ji.unusedIndex {
+			var err error
+			secondaryRow, err = ji.secondary.Next()
+			if err != nil {
+				return nil, err
+			}
+			ji.secondaryRead++
+		}
+		ji.zero(nil, secondaryRow)
+		return ji.ec.row, nil
+	}
 
 	for {
 		secondaryRow, err := ji.secondary.Next()
@@ -932,13 +984,19 @@
 			// Finished the current primary row.
 
 			if !ji.any && ji.nullPad {
-				ji.zero(ji.primaryRow)
+				ji.zero(ji.primaryRow, nil)
 				ji.primaryRow = nil
 				return ji.ec.row, nil
 			}
 
 			// Advance to next one.
-			if err := ji.nextPrimary(); err != nil {
+			err := ji.nextPrimary()
+			if err == io.EOF && ji.used != nil {
+				ji.zeroUnused = true
+				ji.secondary = nil
+				goto scanJiUsed
+			}
+			if err != nil {
 				return nil, err
 			}
 			continue
@@ -946,6 +1004,12 @@
 		if err != nil {
 			return nil, err
 		}
+		ji.secondaryRead++
+		if ji.used != nil {
+			for len(ji.used) < ji.secondaryRead {
+				ji.used = append(ji.used, false)
+			}
+		}
 
 		// We have a pair of rows to consider.
 		match, err := ji.cond(ji.primaryRow, secondaryRow)
@@ -956,6 +1020,10 @@
 			continue
 		}
 		ji.any = true
+		if ji.used != nil {
+			// Make a note that we used this secondary row.
+			ji.used[ji.secondaryRead-1] = true
+		}
 		return ji.ec.row, nil
 	}
 }
diff --git a/spanner/spannertest/integration_test.go b/spanner/spannertest/integration_test.go
index f70b01f..365dd78 100644
--- a/spanner/spannertest/integration_test.go
+++ b/spanner/spannertest/integration_test.go
@@ -960,6 +960,34 @@
 			},
 		},
 		{
+			// Same as in docs, but with a weird ORDER BY clause to match the row ordering.
+			`SELECT * FROM JoinA FULL OUTER JOIN JoinB ON JoinA.w = JoinB.y ORDER BY w IS NULL, w, x, y, z`,
+			nil,
+			[][]interface{}{
+				{int64(1), "a", nil, nil},
+				{int64(2), "b", int64(2), "k"},
+				{int64(3), "c", int64(3), "m"},
+				{int64(3), "c", int64(3), "n"},
+				{int64(3), "d", int64(3), "m"},
+				{int64(3), "d", int64(3), "n"},
+				{nil, nil, int64(4), "p"},
+			},
+		},
+		{
+			// Same as the previous, but using a USING clause instead of an ON clause.
+			`SELECT * FROM JoinC FULL OUTER JOIN JoinD USING (x) ORDER BY x, y, z`,
+			nil,
+			[][]interface{}{
+				{int64(1), "a", nil},
+				{int64(2), "b", "k"},
+				{int64(3), "c", "m"},
+				{int64(3), "c", "n"},
+				{int64(3), "d", "m"},
+				{int64(3), "d", "n"},
+				{int64(4), nil, "p"},
+			},
+		},
+		{
 			`SELECT * FROM JoinA LEFT OUTER JOIN JoinB AS B ON JoinA.w = B.y ORDER BY w, x, y, z`,
 			nil,
 			[][]interface{}{