feat(spanner/spannertest): implement FULL JOIN (#3218)
This doesn't fit into the same structure as the other JOINs, but is
approximately implemented as a LEFT JOIN with a catchup phase of
emitting the unmatched RHS rows.
diff --git a/spanner/spannertest/README.md b/spanner/spannertest/README.md
index d05f9d2..7fc451a 100644
--- a/spanner/spannertest/README.md
+++ b/spanner/spannertest/README.md
@@ -21,7 +21,7 @@
- more aggregation functions
- SELECT HAVING
- case insensitivity
-- FULL JOIN, multiple joins
+- multiple joins
- alternate literal types (esp. strings)
- STRUCT types
- transaction simulation
diff --git a/spanner/spannertest/db_query.go b/spanner/spannertest/db_query.go
index b9bbc01..a773533 100644
--- a/spanner/spannertest/db_query.go
+++ b/spanner/spannertest/db_query.go
@@ -741,7 +741,10 @@
ji.primary, ji.secondaryOrig = rhs, lhs
ji.primaryOffset, ji.secondaryOffset = len(rhsEC.cols), 0
case spansql.FullJoin:
- return nil, evalContext{}, fmt.Errorf("TODO: can't yet evaluate FULL JOIN")
+ // FULL JOIN is implemented as a LEFT JOIN with tracking for which rows of the RHS
+ // have been used. Then, at the end of the iteration, the unused RHS rows are emitted.
+ ji.nullPad = true
+ ji.used = make([]bool, 0, 10) // arbitrary preallocation
}
ji.ec.cols, ji.ec.row = nil, nil
@@ -781,11 +784,12 @@
}
return b != nil && *b, nil
}
- ji.zero = func(primary row) {
+ ji.zero = func(primary, secondary row) {
for i := range ji.ec.row {
ji.ec.row[i] = nil
}
copy(ji.ec.row[ji.primaryOffset:], primary)
+ copy(ji.ec.row[ji.secondaryOffset:], secondary)
}
}
@@ -844,11 +848,18 @@
return r[i]
}
// populate writes the data to ji.ec.row in the correct positions.
- populate := func(primary, secondary row) { // secondary may be nil
+ populate := func(primary, secondary row) { // either may be nil
j := 0
- for _, pi := range primaryUsing {
- ji.ec.row[j] = primary[pi]
- j++
+ if primary != nil {
+ for _, pi := range primaryUsing {
+ ji.ec.row[j] = primary[pi]
+ j++
+ }
+ } else {
+ for _, si := range secondaryUsing {
+ ji.ec.row[j] = secondary[si]
+ j++
+ }
}
lhs, rhs := primary, secondary
if flipped {
@@ -873,8 +884,8 @@
populate(primary, secondary)
return true, nil
}
- ji.zero = func(primary row) {
- populate(primary, nil)
+ ji.zero = func(primary, secondary row) {
+ populate(primary, secondary)
}
return nil
}
@@ -894,16 +905,24 @@
// should be yielded with null padding (e.g. OUTER JOINs).
nullPad bool
- primaryRow row // current row from primary, or nil if it is time to advance
- secondary *rawIter // current clone of secondary
- any bool // true if any secondary rows have matched primaryRow
+ primaryRow row // current row from primary, or nil if it is time to advance
+ secondary *rawIter // current clone of secondary
+ secondaryRead int // number of rows already read from secondary
+ any bool // true if any secondary rows have matched primaryRow
// cond reports whether the primary and secondary rows "join" (e.g. the ON clause is true).
// It populates ec.row with the output.
cond func(primary, secondary row) (bool, error)
- // zero populates ec.row with the primary row and sets the remainder to NULL.
- // This is used when nullPad is true and a primary row doesn't match any secondary row.
- zero func(primary row)
+ // zero populates ec.row with the primary or secondary row data (either of which may be nil),
+ // and sets the remainder to NULL.
+ // This is used when nullPad is true and a primary or secondary row doesn't match.
+ zero func(primary, secondary row)
+
+ // For FULL JOIN, this tracks the secondary rows that have been used.
+ // It is non-nil when being used.
+ used []bool
+ zeroUnused bool // set when emitting unused secondary rows
+ unusedIndex int // next index of used to check
}
func (ji *joinIter) Cols() []colInfo { return ji.ec.cols }
@@ -915,16 +934,49 @@
return err
}
ji.secondary = ji.secondaryOrig.clone()
+ ji.secondaryRead = 0
ji.any = false
return nil
}
func (ji *joinIter) Next() (row, error) {
- if ji.primaryRow == nil {
- if err := ji.nextPrimary(); err != nil {
+ if ji.primaryRow == nil && !ji.zeroUnused {
+ err := ji.nextPrimary()
+ if err == io.EOF && ji.used != nil {
+ // Drop down to emitting unused secondary rows.
+ ji.zeroUnused = true
+ ji.secondary = nil
+ goto scanJiUsed
+ }
+ if err != nil {
return nil, err
}
}
+scanJiUsed:
+ if ji.zeroUnused {
+ if ji.secondary == nil {
+ ji.secondary = ji.secondaryOrig.clone()
+ ji.secondaryRead = 0
+ }
+ for ji.unusedIndex < len(ji.used) && ji.used[ji.unusedIndex] {
+ ji.unusedIndex++
+ }
+ if ji.unusedIndex >= len(ji.used) || ji.secondaryRead >= len(ji.used) {
+ // Truly finished.
+ return nil, io.EOF
+ }
+ var secondaryRow row
+ for ji.secondaryRead <= ji.unusedIndex {
+ var err error
+ secondaryRow, err = ji.secondary.Next()
+ if err != nil {
+ return nil, err
+ }
+ ji.secondaryRead++
+ }
+ ji.zero(nil, secondaryRow)
+ return ji.ec.row, nil
+ }
for {
secondaryRow, err := ji.secondary.Next()
@@ -932,13 +984,19 @@
// Finished the current primary row.
if !ji.any && ji.nullPad {
- ji.zero(ji.primaryRow)
+ ji.zero(ji.primaryRow, nil)
ji.primaryRow = nil
return ji.ec.row, nil
}
// Advance to next one.
- if err := ji.nextPrimary(); err != nil {
+ err := ji.nextPrimary()
+ if err == io.EOF && ji.used != nil {
+ ji.zeroUnused = true
+ ji.secondary = nil
+ goto scanJiUsed
+ }
+ if err != nil {
return nil, err
}
continue
@@ -946,6 +1004,12 @@
if err != nil {
return nil, err
}
+ ji.secondaryRead++
+ if ji.used != nil {
+ for len(ji.used) < ji.secondaryRead {
+ ji.used = append(ji.used, false)
+ }
+ }
// We have a pair of rows to consider.
match, err := ji.cond(ji.primaryRow, secondaryRow)
@@ -956,6 +1020,10 @@
continue
}
ji.any = true
+ if ji.used != nil {
+ // Make a note that we used this secondary row.
+ ji.used[ji.secondaryRead-1] = true
+ }
return ji.ec.row, nil
}
}
diff --git a/spanner/spannertest/integration_test.go b/spanner/spannertest/integration_test.go
index f70b01f..365dd78 100644
--- a/spanner/spannertest/integration_test.go
+++ b/spanner/spannertest/integration_test.go
@@ -960,6 +960,34 @@
},
},
{
+ // Same as in docs, but with a weird ORDER BY clause to match the row ordering.
+ `SELECT * FROM JoinA FULL OUTER JOIN JoinB ON JoinA.w = JoinB.y ORDER BY w IS NULL, w, x, y, z`,
+ nil,
+ [][]interface{}{
+ {int64(1), "a", nil, nil},
+ {int64(2), "b", int64(2), "k"},
+ {int64(3), "c", int64(3), "m"},
+ {int64(3), "c", int64(3), "n"},
+ {int64(3), "d", int64(3), "m"},
+ {int64(3), "d", int64(3), "n"},
+ {nil, nil, int64(4), "p"},
+ },
+ },
+ {
+ // Same as the previous, but using a USING clause instead of an ON clause.
+ `SELECT * FROM JoinC FULL OUTER JOIN JoinD USING (x) ORDER BY x, y, z`,
+ nil,
+ [][]interface{}{
+ {int64(1), "a", nil},
+ {int64(2), "b", "k"},
+ {int64(3), "c", "m"},
+ {int64(3), "c", "n"},
+ {int64(3), "d", "m"},
+ {int64(3), "d", "n"},
+ {int64(4), nil, "p"},
+ },
+ },
+ {
`SELECT * FROM JoinA LEFT OUTER JOIN JoinB AS B ON JoinA.w = B.y ORDER BY w, x, y, z`,
nil,
[][]interface{}{