blob: 1ed30fc43cf49ba4be35460f6825aa467ea2b8ad [file] [log] [blame]
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeam
import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"testing"
"time"
log "github.com/golang/glog"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/generated"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv"
"github.com/google/differential-privacy/privacy-on-beam/v2/pbeam/testutils"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/google/go-cmp/cmp"
)
// execute runs pipeline p on the Beam direct runner under ctx. Only the error
// is reported; the runner's other return value is discarded.
func execute(ctx context.Context, p *beam.Pipeline) error {
	if _, err := direct.Execute(ctx, p); err != nil {
		return err
	}
	return nil
}
// Fixtures that are reused between the ParDo tests.
var (
	// codec encodes (int, int) pairs. Its encoders and decoders are supplied
	// in init().
	codec = kv.NewCodec(reflect.TypeOf(1), reflect.TypeOf(1))

	// values is the common single-value input: 17→42 and 99→0.
	values = []testutils.PairII{{17, 42}, {99, 0}}

	// goodResult is values after applying v/2 + 5.
	goodResult = []testutils.PairII{{17, 26}, {99, 5}}

	// zeroResult is the expected result for a cancelled context. The direct
	// runner has a bug: when a doFn returns an error, the output is still
	// collected (and contains the default value for the type).
	zeroResult = []testutils.PairII{{17, 0}, {99, 0}}

	// goodResult2x1 is valuesCodedKV after applying k + v.
	goodResult2x1 = []testutils.PairII{{17, 106}, {99, 1}}

	// testutils.PairICodedKVs and kv.Pairs shared among different tests.
	// They are populated in init().
	goodResult2x2     []testutils.PairICodedKV
	valuesCodedKV     []testutils.PairICodedKV
	zeroValuedCodedKV []testutils.PairICodedKV

	pairK84V22, pairK0V1, pairK0V0, pairK106V62, pairK1VMinus1, pairK42V11, pairK53V31 kv.Pair
)
// init prepares the shared codec and fills in the encoded fixtures. Any
// encoding failure aborts the test binary through log.Exit.
func init() {
	// We call the Setup method to supply the encoders and decoders inside codec at runtime.
	codec.Setup()

	// Each entry encodes (k, v) with the shared codec and stores the result
	// in dst, in the same order as the variables are declared.
	for _, enc := range []struct {
		dst  *kv.Pair
		k, v int
	}{
		{&pairK84V22, 84, 22},
		{&pairK0V1, 0, 1},
		{&pairK0V0, 0, 0},
		{&pairK106V62, 106, 62},
		{&pairK1VMinus1, 1, -1},
		{&pairK42V11, 42, 11},
		{&pairK53V31, 53, 31},
	} {
		pair, err := codec.Encode(enc.k, enc.v)
		if err != nil {
			log.Exit(err)
		}
		*enc.dst = pair
	}

	// Input with encoded (k, v) values.
	valuesCodedKV = []testutils.PairICodedKV{
		{17, pairK84V22},
		{99, pairK0V1},
	}
	// Expected output when the doFn produced only zero values.
	zeroValuedCodedKV = []testutils.PairICodedKV{
		{17, pairK0V0},
		{99, pairK0V0},
	}
	// Expected output of (k + v, k - v) applied to valuesCodedKV.
	goodResult2x2 = []testutils.PairICodedKV{
		{17, pairK106V62},
		{99, pairK1VMinus1},
	}
}
// compareCodecs reports whether two codecs are equivalent: either both are
// nil, or both are non-nil with identical KType and VType.
func compareCodecs(codec1, codec2 *kv.Codec) bool {
	switch {
	case codec1 == nil || codec2 == nil:
		// With at least one nil pointer, they match only if both are nil.
		return codec1 == codec2
	default:
		return codec1.KType == codec2.KType && codec1.VType == codec2.VType
	}
}
// compareTypeDefs reports whether two beam.TypeDefinitions have the same Var
// and the same T.
func compareTypeDefs(typeDef1, typeDef2 beam.TypeDefinition) bool {
	sameVar := typeDef1.Var == typeDef2.Var
	return sameVar && typeDef1.T == typeDef2.T
}
// TestParDo1x1 checks ParDo with a doFn that takes one value and returns one
// value.
func TestParDo1x1(t *testing.T) {
	halveAndAddFive := func(v int) int { return v/2 + 5 }

	pipeline, scope, input, want := ptest.CreateList2(values, goodResult)
	// The private collection starts out as 17→42 and 99→0.
	private := MakePrivate(scope, beam.ParDo(scope, testutils.PairToKV, input), NewPrivacySpec(1, 1e-10))
	// The doFn turns it into 17→26 and 99→5.
	private = ParDo(scope, halveAndAddFive, private)

	got := beam.ParDo(scope, testutils.KVToPair, private.col)
	passert.Equals(scope, got, want)

	err := execute(context.Background(), pipeline)
	if err != nil {
		t.Errorf("Got an error: %v", err)
	}
}
// TestParDo1x2 checks ParDo with a doFn that takes one value and returns a
// (key, value) pair, and verifies the codec of the resulting
// PrivatePCollection.
func TestParDo1x2(t *testing.T) {
	intType := reflect.TypeOf(int(0))
	doubleAndHalve := func(v int) (int, int) { return v * 2, v/2 + 1 }

	pipeline, scope, input, want := ptest.CreateList2(values, valuesCodedKV)
	wantCodec := kv.NewCodec(intType, intType)
	// The private collection starts out as 17→42 and 99→0.
	private := MakePrivate(scope, beam.ParDo(scope, testutils.PairToKV, input), NewPrivacySpec(1, 1e-10))
	// The doFn turns it into 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
	private = ParDo(scope, doubleAndHalve, private)

	got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
	passert.Equals(scope, got, want)

	err := execute(context.Background(), pipeline)
	if err != nil {
		t.Errorf("Got an error: %v", err)
	}
	diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs))
	if diff != "" {
		t.Errorf("DoFn returned a PrivatePCollection with wrong codec, diff=%s", diff)
	}
}
// TestParDoCtx1x2 checks ParDo with a context-aware doFn that takes one value
// and returns a (key, value) pair, under live and already-cancelled contexts.
func TestParDoCtx1x2(t *testing.T) {
	// The doFn yields zeros once the context is done.
	doFn := func(ctx context.Context, v int) (int, int) {
		if ctx.Err() == nil {
			return v * 2, v/2 + 1
		}
		return 0, 0
	}

	cases := []struct {
		desc string
		m    func(context.Context) context.Context // function that modifies a context
		want []testutils.PairICodedKV
	}{
		// Live context.
		{desc: "unchanged context", m: unchangedContext, want: valuesCodedKV},
		// Contexts that are already done before any work starts.
		{desc: "cancel", m: cancel, want: zeroValuedCodedKV},
		{desc: "instantDeadline", m: instantDeadline, want: zeroValuedCodedKV},
		{desc: "instantTimeout", m: instantTimeout, want: zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context it becomes 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private = ParDo(scope, doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		ctx := tc.m(context.Background())
		if err := execute(ctx, pipeline); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDo1x2Err checks ParDo with a doFn that takes one value and returns a
// (key, value) pair plus an error.
func TestParDo1x2Err(t *testing.T) {
	succeeds := func(v int) (int, int, error) {
		return v * 2, v/2 + 1, nil
	}
	fails := func(_ int) (int, int, error) {
		return 0, 0, errors.New("sample error")
	}

	cases := []struct {
		desc       string
		doFn       func(v int) (int, int, error)
		returnsErr bool
		want       []testutils.PairICodedKV
	}{
		{desc: "doFn that does not return an error", doFn: succeeds, returnsErr: false, want: valuesCodedKV},
		{desc: "doFn that returns an error", doFn: fails, returnsErr: true, want: zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// A successful doFn makes it 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		err := execute(context.Background(), pipeline)
		if tc.returnsErr {
			// A failing doFn must fail the pipeline; the codec is not checked.
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDoCtx1x2Err checks ParDo with a context-aware doFn that takes one
// value and returns a (key, value) pair plus an error.
func TestParDoCtx1x2Err(t *testing.T) {
	// succeeds never returns an error; it yields zeros once the context is done.
	succeeds := func(ctx context.Context, v int) (int, int, error) {
		if ctx.Err() == nil {
			return v * 2, v/2 + 1, nil
		}
		return 0, 0, nil
	}
	fails := func(_ context.Context, _ int) (int, int, error) {
		return 0, 0, errors.New("sample error")
	}

	cases := []struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int) (int, int, error)
		returnsErr bool
		want       []testutils.PairICodedKV
	}{
		// Live context.
		{"unchanged context and no error", unchangedContext, succeeds, false, valuesCodedKV},
		// Contexts that are already done before any work starts.
		{"cancel and no error", cancel, succeeds, false, zeroValuedCodedKV},
		{"instantDeadline and no error", instantDeadline, succeeds, false, zeroValuedCodedKV},
		{"instantTimeout and no error", instantTimeout, succeeds, false, zeroValuedCodedKV},
		{"instantTimeout and error", instantTimeout, fails, true, zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context and no error it becomes
		// 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		err := execute(tc.m(context.Background()), pipeline)
		if tc.returnsErr {
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDo2x1 checks ParDo with a doFn that takes a (key, value) pair and
// returns a single value.
func TestParDo2x1(t *testing.T) {
	sum := func(k int, v int) int { return k + v }

	pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, goodResult2x1)
	// The private collection starts out as 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
	private := MakePrivate(scope, beam.ParDo(scope, testutils.PairICodedKVToKV, input), NewPrivacySpec(1, 1e-10))
	// The doFn turns it into 17→106 and 99→1.
	private = ParDo(scope, sum, private)

	got := beam.ParDo(scope, testutils.KVToPair, private.col)
	passert.Equals(scope, got, want)

	err := execute(context.Background(), pipeline)
	if err != nil {
		t.Errorf("Got an error: %v", err)
	}
}
// TestParDoCtx2x1 checks ParDo with a context-aware doFn that takes a
// (key, value) pair and returns a single value, under live and
// already-cancelled contexts.
func TestParDoCtx2x1(t *testing.T) {
	// The doFn yields zero once the context is done.
	doFn := func(ctx context.Context, k, v int) int {
		if ctx.Err() == nil {
			return k + v
		}
		return 0
	}

	cases := []struct {
		desc string
		// m is a function that modifies a context
		m    func(context.Context) context.Context
		want []testutils.PairII
	}{
		// Live context.
		{desc: "unchanged context", m: unchangedContext, want: goodResult2x1},
		// Contexts that are already done before any work starts.
		{desc: "cancel", m: cancel, want: zeroResult},
		{desc: "instantDeadline", m: instantDeadline, want: zeroResult},
		{desc: "instantTimeout", m: instantTimeout, want: zeroResult},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, tc.want)
		kvCol := beam.ParDo(scope, testutils.PairICodedKVToKV, input)
		// The private collection starts out as 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context it becomes 17→106 and 99→1.
		private = ParDo(scope, doFn, private)
		got := beam.ParDo(scope, testutils.KVToPair, private.col)
		passert.Equals(scope, got, want)

		ctx := tc.m(context.Background())
		if err := execute(ctx, pipeline); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
	}
}
// TestParDo2x1Err checks ParDo with a doFn that takes a (key, value) pair and
// returns a single value plus an error.
func TestParDo2x1Err(t *testing.T) {
	succeeds := func(k, v int) (int, error) { return k + v, nil }
	fails := func(_, _ int) (int, error) { return 0, errors.New("sample error") }

	cases := []struct {
		desc       string
		doFn       func(int, int) (int, error)
		returnsErr bool
		want       []testutils.PairII
	}{
		{desc: "doFn that does not return an error", doFn: succeeds, returnsErr: false, want: goodResult2x1},
		{desc: "doFn that returns an error", doFn: fails, returnsErr: true, want: zeroResult},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, tc.want)
		kvCol := beam.ParDo(scope, testutils.PairICodedKVToKV, input)
		// The private collection starts out as 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// A successful doFn makes it 17→106 and 99→1.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPair, private.col)
		passert.Equals(scope, got, want)

		err := execute(context.Background(), pipeline)
		if tc.returnsErr {
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
	}
}
// TestParDoCtx2x1Err checks ParDo with a context-aware doFn that takes a
// (key, value) pair and returns a single value plus an error.
func TestParDoCtx2x1Err(t *testing.T) {
	// succeeds never returns an error; it yields zero once the context is done.
	succeeds := func(ctx context.Context, k, v int) (int, error) {
		if ctx.Err() == nil {
			return k + v, nil
		}
		return 0, nil
	}
	fails := func(_ context.Context, _, _ int) (int, error) {
		return 0, errors.New("sample error")
	}

	cases := []struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int, int) (int, error)
		returnsErr bool
		want       []testutils.PairII
	}{
		// Live context.
		{"unchanged context and no error", unchangedContext, succeeds, false, goodResult2x1},
		// Contexts that are already done before any work starts.
		{"cancel and no error", cancel, succeeds, false, zeroResult},
		{"instantDeadline and no error", instantDeadline, succeeds, false, zeroResult},
		{"instantTimeout and no error", instantTimeout, succeeds, false, zeroResult},
		{"instantTimeout and error", instantTimeout, fails, true, zeroResult},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, tc.want)
		kvCol := beam.ParDo(scope, testutils.PairICodedKVToKV, input)
		// The private collection starts out as 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context and no error it becomes 17→106 and 99→1.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPair, private.col)
		passert.Equals(scope, got, want)

		err := execute(tc.m(context.Background()), pipeline)
		if tc.returnsErr {
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
	}
}
// TestParDo2x2 checks ParDo with a doFn that takes a (key, value) pair and
// returns a (key, value) pair, and verifies the codec of the resulting
// PrivatePCollection.
func TestParDo2x2(t *testing.T) {
	intType := reflect.TypeOf(int(0))
	sumAndDiff := func(k int, v int) (int, int) { return k + v, k - v }

	pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, goodResult2x2)
	wantCodec := kv.NewCodec(intType, intType)
	// The private collection starts out as 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
	private := MakePrivate(scope, beam.ParDo(scope, testutils.PairICodedKVToKV, input), NewPrivacySpec(1, 1e-10))
	// The doFn turns it into 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}>.
	private = ParDo(scope, sumAndDiff, private)

	got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
	passert.Equals(scope, got, want)

	err := execute(context.Background(), pipeline)
	if err != nil {
		t.Errorf("Got an error: %v", err)
	}
	diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs))
	if diff != "" {
		t.Errorf("DoFn returned a PrivatePCollection with wrong codec, diff=%s", diff)
	}
}
// TestParDoCtx2x2 checks ParDo with a context-aware doFn that takes a
// (key, value) pair and returns a (key, value) pair, under live and
// already-cancelled contexts.
func TestParDoCtx2x2(t *testing.T) {
	// The doFn yields zeros once the context is done.
	doFn := func(ctx context.Context, k, v int) (int, int) {
		if ctx.Err() == nil {
			return k + v, k - v
		}
		return 0, 0
	}

	cases := []struct {
		desc string
		m    func(context.Context) context.Context // function that modifies a context
		want []testutils.PairICodedKV
	}{
		// Live context.
		{desc: "unchanged context", m: unchangedContext, want: goodResult2x2},
		// Contexts that are already done before any work starts.
		{desc: "cancel", m: cancel, want: zeroValuedCodedKV},
		{desc: "instantDeadline", m: instantDeadline, want: zeroValuedCodedKV},
		{desc: "instantTimeout", m: instantTimeout, want: zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairICodedKVToKV, input)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context it becomes 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}>.
		private = ParDo(scope, doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		ctx := tc.m(context.Background())
		if err := execute(ctx, pipeline); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDo2x2Err checks ParDo with a doFn that takes a (key, value) pair and
// returns a (key, value) pair plus an error.
func TestParDo2x2Err(t *testing.T) {
	succeeds := func(k, v int) (int, int, error) { return k + v, k - v, nil }
	fails := func(_, _ int) (int, int, error) { return 0, 0, errors.New("sample error") }

	cases := []struct {
		desc       string
		doFn       func(int, int) (int, int, error)
		returnsErr bool
		want       []testutils.PairICodedKV
	}{
		{desc: "doFn that does not return an error", doFn: succeeds, returnsErr: false, want: goodResult2x2},
		{desc: "doFn that returns an error", doFn: fails, returnsErr: true, want: zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairICodedKVToKV, input)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// A successful doFn makes it 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}>.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		err := execute(context.Background(), pipeline)
		if tc.returnsErr {
			// A failing doFn must fail the pipeline; the codec is not checked.
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDoCtx2x2Err checks ParDo with a context-aware doFn that takes a
// (key, value) pair and returns a (key, value) pair plus an error.
func TestParDoCtx2x2Err(t *testing.T) {
	// succeeds never returns an error; it yields zeros once the context is done.
	succeeds := func(ctx context.Context, k, v int) (int, int, error) {
		if ctx.Err() == nil {
			return k + v, k - v, nil
		}
		return 0, 0, nil
	}
	fails := func(_ context.Context, _, _ int) (int, int, error) {
		return 0, 0, errors.New("sample error")
	}

	cases := []struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int, int) (int, int, error)
		returnsErr bool
		want       []testutils.PairICodedKV
	}{
		// Live context.
		{"unchanged context and no error", unchangedContext, succeeds, false, goodResult2x2},
		// Contexts that are already done before any work starts.
		{"cancel and no error", cancel, succeeds, false, zeroValuedCodedKV},
		{"instantDeadline and no error", instantDeadline, succeeds, false, zeroValuedCodedKV},
		{"instantTimeout and no error", instantTimeout, succeeds, false, zeroValuedCodedKV},
		{"instantTimeout and error", instantTimeout, fails, true, zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(valuesCodedKV, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairICodedKVToKV, input)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context and no error it becomes
		// 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}>.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		err := execute(tc.m(context.Background()), pipeline)
		if tc.returnsErr {
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// Context modifiers: each takes a context and returns the context a test
// should run its pipeline with, so that tests can use a cancelled or expired
// context.

// unchangedContext is the identity modifier; it returns ctx as is.
func unchangedContext(ctx context.Context) context.Context { return ctx }
// cancel returns a child of ctx that has already been cancelled.
func cancel(ctx context.Context) context.Context {
	cancelled, stop := context.WithCancel(ctx)
	stop()
	return cancelled
}
// instantDeadline returns a child of ctx whose deadline is the moment of the
// call, so the context expires through the deadline mechanism rather than
// through an explicit cancel.
//
// The cancel function is still scheduled to run one second later. AfterFunc
// already invokes its callback in its own goroutine, so no extra `go`
// statement is needed.
func instantDeadline(ctx context.Context) context.Context {
	newCtx, cancel := context.WithDeadline(ctx, time.Now())
	time.AfterFunc(time.Second, cancel)
	return newCtx
}
// instantTimeout returns a child of ctx with a zero timeout, so the context
// expires through the timeout mechanism rather than through an explicit
// cancel.
//
// The cancel function is still scheduled to run one second later. AfterFunc
// already invokes its callback in its own goroutine, so no extra `go`
// statement is needed.
func instantTimeout(ctx context.Context) context.Context {
	newCtx, cancel := context.WithTimeout(ctx, 0)
	time.AfterFunc(time.Second, cancel)
	return newCtx
}
// doFnWithContext is a reusable context-aware doFn. It returns v/2 + 5 while
// ctx is still live, and 0 once ctx reports an error.
func doFnWithContext(ctx context.Context, v int) int {
	if ctx.Err() == nil {
		return v/2 + 5
	}
	return 0
}
// TestParDoCtx1x1 checks ParDo with a context-aware doFn that takes one value
// and returns one value, under live and already-cancelled contexts.
//
// Every case carries a description that is included in the failure message,
// so a failure identifies which context modifier triggered it.
func TestParDoCtx1x1(t *testing.T) {
	for _, tc := range []struct {
		desc string
		// m is a function that modifies a context
		m    func(context.Context) context.Context
		want []testutils.PairII
	}{
		// good context
		{"unchanged context", unchangedContext, goodResult},
		// various context modifiers that will cancel the context before any work starts
		{"cancel", cancel, zeroResult},
		{"instantDeadline", instantDeadline, zeroResult},
		{"instantTimeout", instantTimeout, zeroResult},
	} {
		p, s, col, wantCol := ptest.CreateList2(values, tc.want)
		colKV := beam.ParDo(s, testutils.PairToKV, col)
		// pcol should contain 17→42 and 99→0.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// If the context was cancelled, we change that to 17→0 and 99→0 in the PrivatePCollection,
		// Otherwise, we change that to 17→26 and 99→5
		pcol = ParDo(s, doFnWithContext, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
		passert.Equals(s, gotCol, wantCol)
		// Run the beam pipeline with the modified context
		if err := execute(tc.m(context.Background()), p); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
	}
}
// TestParDo1x1Err checks ParDo with a doFn that takes one value and returns
// one value plus a nil error.
func TestParDo1x1Err(t *testing.T) {
	halveAndAddFive := func(v int) (int, error) { return v/2 + 5, nil }

	pipeline, scope, input, want := ptest.CreateList2(values, goodResult)
	private := MakePrivate(scope, beam.ParDo(scope, testutils.PairToKV, input), NewPrivacySpec(1, 1e-10))
	private = ParDo(scope, halveAndAddFive, private)

	got := beam.ParDo(scope, testutils.KVToPair, private.col)
	passert.Equals(scope, got, want)

	err := execute(context.Background(), pipeline)
	if err != nil {
		t.Errorf("Got an error: %v", err)
	}
}
// TestParDo1x1ErrReturnsError checks that the pipeline fails when a doFn
// taking one value and returning (value, error) always returns an error.
func TestParDo1x1ErrReturnsError(t *testing.T) {
	alwaysFails := func(_ int) (int, error) {
		return 0, errors.New("this function always returns an error")
	}

	pipeline, scope, input, want := ptest.CreateList2(values, zeroResult)
	private := MakePrivate(scope, beam.ParDo(scope, testutils.PairToKV, input), NewPrivacySpec(1, 1e-10))
	private = ParDo(scope, alwaysFails, private)

	got := beam.ParDo(scope, testutils.KVToPair, private.col)
	passert.Equals(scope, got, want)

	err := execute(context.Background(), pipeline)
	if err == nil {
		t.Errorf("Expected runner to return an error, but didn't get one")
	}
}
// TestParDoCtx1x1Err checks ParDo with a context-aware doFn that takes one
// value and returns one value plus a nil error, under live and
// already-cancelled contexts.
//
// Every case carries a description that is included in the failure message,
// so a failure identifies which context modifier triggered it.
func TestParDoCtx1x1Err(t *testing.T) {
	doFn := func(ctx context.Context, v int) (int, error) {
		return doFnWithContext(ctx, v), nil
	}
	for _, tc := range []struct {
		desc string
		// m is a function that modifies a context
		m    func(context.Context) context.Context
		want []testutils.PairII
	}{
		// good context
		{"unchanged context", unchangedContext, goodResult},
		// various context modifiers that will cancel the context before any work starts
		{"cancel", cancel, zeroResult},
		{"instantDeadline", instantDeadline, zeroResult},
		{"instantTimeout", instantTimeout, zeroResult},
	} {
		p, s, col, wantCol := ptest.CreateList2(values, tc.want)
		colKV := beam.ParDo(s, testutils.PairToKV, col)
		// pcol should contain 17→42 and 99→0.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// If the context was cancelled, we change that to 17→0 and 99→0 in the PrivatePCollection,
		// Otherwise, we change that to 17→26 and 99→5
		pcol = ParDo(s, doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
		passert.Equals(s, gotCol, wantCol)
		// Run the beam pipeline with the modified context
		if err := execute(tc.m(context.Background()), p); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
	}
}
// doFnPair is an emitting doFn that passes its input through unchanged: it
// calls emit exactly once, with b.
func doFnPair(b int, emit func(int)) { emit(b) }
// doFnPairWithCtx is the context-accepting twin of doFnPair: it ignores the
// context and calls emit exactly once, with b.
func doFnPairWithCtx(_ context.Context, b int, emit func(int)) { emit(b) }
// TestParDo1x1Emit checks ParDo with pass-through emitting doFns, with and
// without a context parameter; in both cases the output must equal the input.
func TestParDo1x1Emit(t *testing.T) {
	input := []testutils.PairII{
		{17, 42},
		{19, 10},
		{80, 99},
		{99, 0},
	}
	pipeline, scope, col := ptest.CreateList(input)
	kvCol := beam.ParDo(scope, testutils.PairToKV, col)

	// First, the doFn without a context parameter.
	plain := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
	plain = ParDo(scope, doFnPair, plain)
	gotPlain := beam.ParDo(scope, testutils.KVToPair, plain.col)
	passert.Equals(scope, gotPlain, col)
	if err := execute(context.Background(), pipeline); err != nil {
		t.Errorf("Without context, got an error: %v", err)
	}

	// Then the doFn that takes a context, added to the same pipeline, which
	// is executed a second time.
	withCtx := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
	withCtx = ParDo(scope, doFnPairWithCtx, withCtx)
	gotWithCtx := beam.ParDo(scope, testutils.KVToPair, withCtx.col)
	passert.Equals(scope, gotWithCtx, col)
	if err := execute(context.Background(), pipeline); err != nil {
		t.Errorf("With context, got an error: %v", err)
	}
}
// TestParDo1x1ErrEmit checks ParDo with a doFn that takes one value, emits
// single values and returns an error.
func TestParDo1x1ErrEmit(t *testing.T) {
	succeeds := func(v int, emit func(int)) error {
		emit(v/2 + 5)
		return nil
	}
	// fails emits nothing and always returns an error.
	fails := func(_ int, _ func(int)) error {
		return errors.New("sample error")
	}

	cases := []struct {
		desc       string
		doFn       func(int, func(int)) error
		returnsErr bool
		want       []testutils.PairII
	}{
		{desc: "doFn that does not return an error", doFn: succeeds, returnsErr: false, want: goodResult},
		{desc: "doFn that returns an error", doFn: fails, returnsErr: true, want: zeroResult},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// A successful doFn makes it 17→26 and 99→5.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPair, private.col)
		passert.Equals(scope, got, want)

		err := execute(context.Background(), pipeline)
		if tc.returnsErr {
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
	}
}
// TestParDoCtx1x1ErrEmit checks ParDo with a context-aware doFn that takes
// one value, emits single values and returns an error.
func TestParDoCtx1x1ErrEmit(t *testing.T) {
	// succeeds never returns an error; it emits 0 once the context is done.
	succeeds := func(ctx context.Context, v int, emit func(int)) error {
		if ctx.Err() == nil {
			emit(v/2 + 5)
			return nil
		}
		emit(0)
		return nil
	}
	// fails emits nothing and always returns an error.
	fails := func(_ context.Context, _ int, _ func(int)) error {
		return errors.New("sample error")
	}

	cases := []struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int, func(int)) error
		returnsErr bool
		want       []testutils.PairII
	}{
		// Live context.
		{"unchanged context and no error", unchangedContext, succeeds, false, goodResult},
		// Contexts that are already done before any work starts.
		{"cancel and no error", cancel, succeeds, false, zeroResult},
		{"instantDeadline and no error", instantDeadline, succeeds, false, zeroResult},
		{"instantTimeout and no error", instantTimeout, succeeds, false, zeroResult},
		{"instantTimeout and error", instantTimeout, fails, true, zeroResult},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context and no error it becomes 17→26 and 99→5.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPair, private.col)
		passert.Equals(scope, got, want)

		err := execute(tc.m(context.Background()), pipeline)
		if tc.returnsErr {
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
	}
}
// TestParDo1x2Emit checks ParDo with doFns that take one value and emit zero,
// one or two (key, value) pairs per input, and verifies the codec of the
// resulting PrivatePCollection.
func TestParDo1x2Emit(t *testing.T) {
	// Emits nothing for a zero input.
	skipZero := func(v int, emit func(int, int)) {
		if v != 0 {
			emit(v*2, v/2+1)
		}
	}
	// Emits exactly one pair per input.
	emitOnce := func(v int, emit func(int, int)) {
		emit(v*2, v/2+1)
	}
	// Emits a second pair, computed from the halved value, for non-zero inputs.
	emitOnceOrTwice := func(v int, emit func(int, int)) {
		emit(v*2, v/2+1)
		if v != 0 {
			v = v / 2
			emit(v*2, v/2+1)
		}
	}

	cases := []struct {
		desc string
		doFn interface{}
		want []testutils.PairICodedKV
	}{
		{
			desc: "doFn that emits only non-zero inputs",
			doFn: skipZero,
			want: []testutils.PairICodedKV{
				{17, pairK84V22},
			},
		},
		{
			desc: "doFn that emits each input once",
			doFn: emitOnce,
			want: valuesCodedKV,
		},
		{
			desc: "doFn that emits once or twice",
			doFn: emitOnceOrTwice,
			want: []testutils.PairICodedKV{
				{17, pairK84V22},
				{17, pairK42V11},
				{99, pairK0V1},
			},
		},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// The doFn replaces each value with the encoded pairs it emits.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		if err := execute(context.Background(), pipeline); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDoCtx1x2Emit checks ParDo with a context-aware doFn that takes one
// value and emits (key, value) pairs, under live and already-cancelled
// contexts.
func TestParDoCtx1x2Emit(t *testing.T) {
	// The doFn emits (0, 0) once the context is done.
	doFn := func(ctx context.Context, v int, emit func(int, int)) {
		if ctx.Err() == nil {
			emit(v*2, v/2+1)
			return
		}
		emit(0, 0)
	}

	cases := []struct {
		desc string
		m    func(context.Context) context.Context // function that modifies a context
		want []testutils.PairICodedKV
	}{
		// Live context.
		{desc: "unchanged context", m: unchangedContext, want: valuesCodedKV},
		// Contexts that are already done before any work starts.
		{desc: "cancel", m: cancel, want: zeroValuedCodedKV},
		{desc: "instantDeadline", m: instantDeadline, want: zeroValuedCodedKV},
		{desc: "instantTimeout", m: instantTimeout, want: zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// With a live context it becomes 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private = ParDo(scope, doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		ctx := tc.m(context.Background())
		if err := execute(ctx, pipeline); err != nil {
			t.Errorf("With %s, got an error: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDo1x2ErrEmit checks ParDo with a doFn that takes one value, emits
// (key, value) pairs and returns an error.
func TestParDo1x2ErrEmit(t *testing.T) {
	succeeds := func(v int, emit func(int, int)) error {
		emit(v*2, v/2+1)
		return nil
	}
	// fails emits nothing and always returns an error.
	fails := func(_ int, _ func(int, int)) error {
		return errors.New("sample error")
	}

	cases := []struct {
		desc       string
		doFn       func(int, func(int, int)) error
		returnsErr bool
		want       []testutils.PairICodedKV
	}{
		{desc: "doFn that does not return an error", doFn: succeeds, returnsErr: false, want: valuesCodedKV},
		{desc: "doFn that returns an error", doFn: fails, returnsErr: true, want: zeroValuedCodedKV},
	}
	for _, tc := range cases {
		pipeline, scope, input, want := ptest.CreateList2(values, tc.want)
		wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
		kvCol := beam.ParDo(scope, testutils.PairToKV, input)
		// The private collection starts out as 17→42 and 99→0.
		private := MakePrivate(scope, kvCol, NewPrivacySpec(1, 1e-10))
		// A successful doFn makes it 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		private = ParDo(scope, tc.doFn, private)
		got := beam.ParDo(scope, testutils.KVToPairICodedKV, private.col)
		passert.Equals(scope, got, want)

		err := execute(context.Background(), pipeline)
		if tc.returnsErr {
			// A failing doFn must fail the pipeline; the codec is not checked.
			if err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
		if diff := cmp.Diff(private.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
		}
	}
}
// TestParDoCtx1x2ErrEmit exercises ParDo with DoFns of the shape
// func(context.Context, int, emit func(int, int)) error, running the pipeline
// under an unchanged context and under contexts that are already done.
func TestParDoCtx1x2ErrEmit(t *testing.T) {
	// succeeding emits the zero pair when the context is already done, and
	// the transformed pair otherwise. It never returns an error.
	succeeding := func(ctx context.Context, v int, emit func(int, int)) error {
		if ctx.Err() == nil {
			emit(v*2, v/2+1)
		} else {
			emit(0, 0)
		}
		return nil
	}
	failing := func(ctx context.Context, v int, emit func(int, int)) error {
		return errors.New("sample error")
	}

	type testCase struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int, func(int, int)) error
		returnsErr bool
		want       []testutils.PairICodedKV
	}
	cases := []testCase{
		// good context
		{desc: "unchanged context and no error", m: unchangedContext, doFn: succeeding, returnsErr: false, want: valuesCodedKV},
		// various context modifiers that will cancel the context before any work starts
		{desc: "cancel and no error", m: cancel, doFn: succeeding, returnsErr: false, want: zeroValuedCodedKV},
		{desc: "instantDeadline and no error", m: instantDeadline, doFn: succeeding, returnsErr: false, want: zeroValuedCodedKV},
		{desc: "instantTimeout and no error", m: instantTimeout, doFn: succeeding, returnsErr: false, want: zeroValuedCodedKV},
		{desc: "instantTimeout and error", m: instantTimeout, doFn: failing, returnsErr: true, want: zeroValuedCodedKV},
	}

	intType := reflect.TypeOf(int(0))
	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(values, c.want)
		wantCodec := kv.NewCodec(intType, intType)
		colKV := beam.ParDo(s, testutils.PairToKV, col)
		// The private collection starts out as 17→42 and 99→0.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// With a live context the succeeding DoFn turns that into
		// 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
		pcol = ParDo(s, c.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(c.m(context.Background()), p)
		if c.returnsErr {
			if runErr == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", c.desc)
			}
			continue
		}
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
		if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", c.desc, diff)
		}
	}
}
// TestParDo2x1Emit exercises ParDo with DoFns of the shape
// func(int, int, emit func(int)) that emit zero, one or two outputs per input.
func TestParDo2x1Emit(t *testing.T) {
	// Emits k+v, but only for inputs whose k is non-zero.
	skipZeroK := func(k, v int, emit func(int)) {
		if k == 0 {
			return
		}
		emit(k + v)
	}
	// Emits k+v for every input.
	emitOnce := func(k, v int, emit func(int)) {
		emit(k + v)
	}
	// Emits k+v for every input, plus k/2+v/2 when k is non-zero.
	emitOnceOrTwice := func(k, v int, emit func(int)) {
		emit(k + v)
		if k == 0 {
			return
		}
		emit(k/2 + v/2)
	}

	cases := []struct {
		desc string
		doFn interface{}
		want []testutils.PairII
	}{
		{
			desc: "doFn that emits only non-zero input k",
			doFn: skipZeroK,
			want: []testutils.PairII{
				{17, 106},
			},
		},
		{
			desc: "doFn that emits each input once",
			doFn: emitOnce,
			want: goodResult2x1,
		},
		{
			desc: "doFn that emits once or twice",
			doFn: emitOnceOrTwice,
			want: []testutils.PairII{
				{17, 106},
				{17, 53},
				{99, 1},
			},
		},
	}

	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, c.want)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and
		// 99→<kv.Pair{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// Emitting each input once yields 17→106 and 99→1.
		pcol = ParDo(s, c.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(context.Background(), p)
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
	}
}
// TestParDoCtx2x1Emit exercises ParDo with a DoFn of the shape
// func(context.Context, int, int, emit func(int)), running the pipeline under
// an unchanged context and under contexts that are already done.
func TestParDoCtx2x1Emit(t *testing.T) {
	// sumOrZero emits k+v under a live context and 0 once the context is done.
	sumOrZero := func(ctx context.Context, k, v int, emit func(int)) {
		if ctx.Err() == nil {
			emit(k + v)
			return
		}
		emit(0)
	}

	cases := []struct {
		desc string
		m    func(context.Context) context.Context // function that modifies a context
		want []testutils.PairII
	}{
		// good context
		{desc: "unchanged context", m: unchangedContext, want: goodResult2x1},
		// various context modifiers that will cancel the context before any work starts
		{desc: "cancel", m: cancel, want: zeroResult},
		{desc: "instantDeadline", m: instantDeadline, want: zeroResult},
		{desc: "instantTimeout", m: instantTimeout, want: zeroResult},
	}

	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, c.want)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and
		// 99→<kv.Pair{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// With a live context that becomes 17→106 and 99→1.
		pcol = ParDo(s, sumOrZero, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(c.m(context.Background()), p)
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
	}
}
// TestParDo2x1ErrEmit exercises ParDo with DoFns of the shape
// func(int, int, emit func(int)) error: one that emits k+v and succeeds, and
// one that emits nothing and returns an error.
func TestParDo2x1ErrEmit(t *testing.T) {
	noErr := func(k, v int, emit func(int)) error {
		emit(k + v)
		return nil
	}
	hasErr := func(k, v int, emit func(int)) error {
		return errors.New("sample error")
	}
	for _, tc := range []struct {
		desc       string
		doFn       func(int, int, func(int)) error
		returnsErr bool
		want       []testutils.PairII
	}{
		{"doFn that does not return an error", noErr, false, goodResult2x1},
		{"doFn that returns an error", hasErr, true, zeroResult},
	} {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// We change that to 17→106 and 99→1 in the PrivatePCollection
		// (when the doFn does not return an error).
		pcol = ParDo(s, tc.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
		passert.Equals(s, gotCol, wantCol)
		if tc.returnsErr {
			// An error returned by the doFn must make the pipeline fail;
			// nothing else is checked for this case.
			if err := execute(context.Background(), p); err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err := execute(context.Background(), p); err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
	}
}
// TestParDoCtx2x1ErrEmit exercises ParDo with DoFns of the shape
// func(context.Context, int, int, emit func(int)) error, running the pipeline
// under an unchanged context and under contexts that are already done.
func TestParDoCtx2x1ErrEmit(t *testing.T) {
	// noErr emits 0 when the context is already done and k+v otherwise; it
	// never returns an error.
	noErr := func(ctx context.Context, k, v int, emit func(int)) error {
		if ctx.Err() != nil {
			emit(0)
			return nil
		}
		emit(k + v)
		return nil
	}
	hasErr := func(ctx context.Context, k, v int, emit func(int)) error {
		return errors.New("sample error")
	}
	for _, tc := range []struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int, int, func(int)) error
		returnsErr bool
		want       []testutils.PairII
	}{
		// good context
		{"unchanged context and no error", unchangedContext, noErr, false, goodResult2x1},
		// various context modifiers that will cancel the context before any work starts
		{"cancel and no error", cancel, noErr, false, zeroResult},
		{"instantDeadline and no error", instantDeadline, noErr, false, zeroResult},
		{"instantTimeout and no error", instantTimeout, noErr, false, zeroResult},
		{"instantTimeout and error", instantTimeout, hasErr, true, zeroResult},
	} {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// We change that to 17→106 and 99→1 in the PrivatePCollection
		// (with a live context and a doFn that does not return an error).
		pcol = ParDo(s, tc.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
		passert.Equals(s, gotCol, wantCol)
		if tc.returnsErr {
			// An error returned by the doFn must make the pipeline fail;
			// nothing else is checked for this case.
			if err := execute(tc.m(context.Background()), p); err == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
			}
			continue
		}
		if err := execute(tc.m(context.Background()), p); err != nil {
			t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
		}
	}
}
// TestParDo2x2Emit exercises ParDo with DoFns of the shape
// func(int, int, emit func(int, int)) that emit zero, one or two output pairs
// per input, and checks the codec of the resulting PrivatePCollection.
func TestParDo2x2Emit(t *testing.T) {
	// Emits (k+v, k-v), but only for inputs whose k is non-zero.
	skipZeroK := func(k, v int, emit func(int, int)) {
		if k == 0 {
			return
		}
		emit(k+v, k-v)
	}
	// Emits (k+v, k-v) for every input.
	emitOnce := func(k, v int, emit func(int, int)) {
		emit(k+v, k-v)
	}
	// Emits (k+v, k-v) for every input, and again on the halved k and v when
	// k is non-zero.
	emitOnceOrTwice := func(k, v int, emit func(int, int)) {
		emit(k+v, k-v)
		if k == 0 {
			return
		}
		halfK, halfV := k/2, v/2
		emit(halfK+halfV, halfK-halfV)
	}

	cases := []struct {
		desc string
		doFn interface{}
		want []testutils.PairICodedKV
	}{
		{
			desc: "doFn that emits only non-zero input k",
			doFn: skipZeroK,
			want: []testutils.PairICodedKV{
				{17, pairK106V62},
			},
		},
		{
			desc: "doFn that emits each input once",
			doFn: emitOnce,
			want: goodResult2x2,
		},
		{
			desc: "doFn that emits once or twice",
			doFn: emitOnceOrTwice,
			want: []testutils.PairICodedKV{
				{17, pairK106V62},
				{17, pairK53V31},
				{99, pairK1VMinus1},
			},
		},
	}

	intType := reflect.TypeOf(int(0))
	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, c.want)
		wantCodec := kv.NewCodec(intType, intType)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and
		// 99→<kv.Pair{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// Emitting each input once yields 17→<kv.Pair{106, 62}> and
		// 99→<kv.Pair{1, -1}>.
		pcol = ParDo(s, c.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(context.Background(), p)
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
		if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", c.desc, diff)
		}
	}
}
// TestParDoCtx2x2Emit exercises ParDo with a DoFn of the shape
// func(context.Context, int, int, emit func(int, int)), running the pipeline
// under an unchanged context and under contexts that are already done.
func TestParDoCtx2x2Emit(t *testing.T) {
	// pairOrZero emits (k+v, k-v) under a live context and (0, 0) once the
	// context is done.
	pairOrZero := func(ctx context.Context, k, v int, emit func(int, int)) {
		if ctx.Err() == nil {
			emit(k+v, k-v)
			return
		}
		emit(0, 0)
	}

	cases := []struct {
		desc string
		m    func(context.Context) context.Context // function that modifies a context
		want []testutils.PairICodedKV
	}{
		// good context
		{desc: "unchanged context", m: unchangedContext, want: goodResult2x2},
		// various context modifiers that will cancel the context before any work starts
		{desc: "cancel", m: cancel, want: zeroValuedCodedKV},
		{desc: "instantDeadline", m: instantDeadline, want: zeroValuedCodedKV},
		{desc: "instantTimeout", m: instantTimeout, want: zeroValuedCodedKV},
	}

	intType := reflect.TypeOf(int(0))
	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, c.want)
		wantCodec := kv.NewCodec(intType, intType)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and
		// 99→<kv.Pair{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// With a live context that becomes 17→<kv.Pair{106, 62}> and
		// 99→<kv.Pair{1, -1}>.
		pcol = ParDo(s, pairOrZero, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(c.m(context.Background()), p)
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
		if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", c.desc, diff)
		}
	}
}
// TestParDo2x2ErrEmit exercises ParDo with DoFns of the shape
// func(int, int, emit func(int, int)) error: one that emits a pair and
// succeeds, and one that emits nothing and returns an error.
func TestParDo2x2ErrEmit(t *testing.T) {
	succeeding := func(k, v int, emit func(int, int)) error {
		emit(k+v, k-v)
		return nil
	}
	failing := func(k, v int, emit func(int, int)) error {
		return errors.New("sample error")
	}

	type testCase struct {
		desc       string
		doFn       func(int, int, func(int, int)) error
		returnsErr bool
		want       []testutils.PairICodedKV
	}
	cases := []testCase{
		{desc: "doFn that does not return an error", doFn: succeeding, returnsErr: false, want: goodResult2x2},
		{desc: "doFn that returns an error", doFn: failing, returnsErr: true, want: zeroValuedCodedKV},
	}

	intType := reflect.TypeOf(int(0))
	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, c.want)
		wantCodec := kv.NewCodec(intType, intType)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and
		// 99→<kv.Pair{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// The succeeding DoFn turns that into 17→<kv.Pair{106, 62}> and
		// 99→<kv.Pair{1, -1}>.
		pcol = ParDo(s, c.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(context.Background(), p)
		if c.returnsErr {
			if runErr == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", c.desc)
			}
			continue
		}
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
		if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", c.desc, diff)
		}
	}
}
// TestParDoCtx2x2ErrEmit exercises ParDo with DoFns of the shape
// func(context.Context, int, int, emit func(int, int)) error, running the
// pipeline under an unchanged context and under contexts that are already
// done.
func TestParDoCtx2x2ErrEmit(t *testing.T) {
	// succeeding emits (0, 0) when the context is already done and
	// (k+v, k-v) otherwise. It never returns an error.
	succeeding := func(ctx context.Context, k, v int, emit func(int, int)) error {
		if ctx.Err() == nil {
			emit(k+v, k-v)
		} else {
			emit(0, 0)
		}
		return nil
	}
	failing := func(ctx context.Context, k, v int, emit func(int, int)) error {
		return errors.New("sample error")
	}

	type testCase struct {
		desc       string
		m          func(context.Context) context.Context // function that modifies a context
		doFn       func(context.Context, int, int, func(int, int)) error
		returnsErr bool
		want       []testutils.PairICodedKV
	}
	cases := []testCase{
		// good context
		{desc: "unchanged context and no error", m: unchangedContext, doFn: succeeding, returnsErr: false, want: goodResult2x2},
		// various context modifiers that will cancel the context before any work starts
		{desc: "cancel and no error", m: cancel, doFn: succeeding, returnsErr: false, want: zeroValuedCodedKV},
		{desc: "instantDeadline and no error", m: instantDeadline, doFn: succeeding, returnsErr: false, want: zeroValuedCodedKV},
		{desc: "instantTimeout and no error", m: instantTimeout, doFn: succeeding, returnsErr: false, want: zeroValuedCodedKV},
		{desc: "instantTimeout and error", m: instantTimeout, doFn: failing, returnsErr: true, want: zeroValuedCodedKV},
	}

	intType := reflect.TypeOf(int(0))
	for _, c := range cases {
		p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, c.want)
		wantCodec := kv.NewCodec(intType, intType)
		colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
		// The private collection starts out as 17→<kv.Pair{84, 22}> and
		// 99→<kv.Pair{0, 1}>.
		pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
		// With a live context the succeeding DoFn turns that into
		// 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}>.
		pcol = ParDo(s, c.doFn, pcol)
		gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
		passert.Equals(s, gotCol, wantCol)

		runErr := execute(c.m(context.Background()), p)
		if c.returnsErr {
			if runErr == nil {
				t.Errorf("With %s, expected runner to return an error, but didn't get one", c.desc)
			}
			continue
		}
		if runErr != nil {
			t.Errorf("With %s, got an error: %v", c.desc, runErr)
		}
		if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", c.desc, diff)
		}
	}
}
// TestBuildDoFn ensures that valid DoFns are built correctly.
//
// For every supported function shape it checks that buildDoFn returns no
// error, wraps the function in the expected generated transform type, and
// reports the expected beam.TypeDefinition and kv.Codec (nil when the
// expected codec is nil).
func TestBuildDoFn(t *testing.T) {
	for _, tc := range []struct {
		desc        string
		doFn        interface{}
		wantType    reflect.Type
		wantTypeDef beam.TypeDefinition
		wantCodec   *kv.Codec
	}{
		{"string → int",
			func(x string) int { return len(x) },
			reflect.TypeOf(&generated.TransformFn1x1{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"(context, string) → int",
			func(_ context.Context, x string) int { return len(x) },
			reflect.TypeOf(&generated.TransformFnCtx1x1{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"string → (int, error)",
			func(x string) (int, error) { return len(x), nil },
			reflect.TypeOf(&generated.TransformFn1x1Err{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"(context, string) → (int, error)",
			func(_ context.Context, x string) (int, error) { return len(x), nil },
			reflect.TypeOf(&generated.TransformFnCtx1x1Err{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"string → (string, int)",
			func(x string) (string, int) { return x, len(x) },
			reflect.TypeOf(&generated.TransformFn1x2{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string) → (string, int)",
			func(_ context.Context, x string) (string, int) { return x, len(x) },
			reflect.TypeOf(&generated.TransformFnCtx1x2{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"string→(string, int, error)",
			func(x string) (string, int, error) { return x, len(x), nil },
			reflect.TypeOf(&generated.TransformFn1x2Err{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string) → (string, int, error)",
			func(_ context.Context, x string) (string, int, error) { return x, len(x), nil },
			reflect.TypeOf(&generated.TransformFnCtx1x2Err{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(string, int) → int",
			func(x string, y int) int { return len(x) },
			reflect.TypeOf(&generated.TransformFn2x1{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"(context, string, int) → int",
			func(_ context.Context, x string, y int) int { return len(x) },
			reflect.TypeOf(&generated.TransformFnCtx2x1{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"(string, int) → (int, error)",
			func(x string, y int) (int, error) { return len(x), nil },
			reflect.TypeOf(&generated.TransformFn2x1Err{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"(context, string, int) → (int, error)",
			func(_ context.Context, x string, y int) (int, error) { return len(x), nil },
			reflect.TypeOf(&generated.TransformFnCtx2x1Err{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
			nil,
		},
		{"(string, int) → (string, int)",
			func(x string, y int) (string, int) { return x, len(x) + y },
			reflect.TypeOf(&generated.TransformFn2x2{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string, int) → (string, int)",
			func(_ context.Context, x string, y int) (string, int) { return x, len(x) + y },
			reflect.TypeOf(&generated.TransformFnCtx2x2{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(string, int) → (string, int, error)",
			func(x string, y int) (string, int, error) { return x, len(x) + y, nil },
			reflect.TypeOf(&generated.TransformFn2x2Err{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string, int) → (string, int, error)",
			func(_ context.Context, x string, y int) (string, int, error) { return x, len(x) + y, nil },
			reflect.TypeOf(&generated.TransformFnCtx2x2Err{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(string, emit(string)) → <no output>",
			func(x string, emit func(string)) { emit(x) },
			reflect.TypeOf(&generated.TransformFn1x1Emit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(context, string, emit(string)) → <no output>",
			func(_ context.Context, x string, emit func(string)) { emit(x) },
			reflect.TypeOf(&generated.TransformFnCtx1x1Emit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(string, emit(string)) → error",
			func(x string, emit func(string)) error {
				emit(x)
				return nil
			},
			reflect.TypeOf(&generated.TransformFn1x1ErrEmit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(context, string, emit(string)) → error",
			func(_ context.Context, x string, emit func(string)) error {
				emit(x)
				return nil
			},
			reflect.TypeOf(&generated.TransformFnCtx1x1ErrEmit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(string, emit(string, int)) → <no output>",
			func(x string, emit func(string, int)) { emit(x, len(x)) },
			reflect.TypeOf(&generated.TransformFn1x2Emit{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string, emit(string, int)) → <no output>",
			func(ctx context.Context, x string, emit func(string, int)) { emit(x, len(x)) },
			reflect.TypeOf(&generated.TransformFnCtx1x2Emit{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(string, emit(string, int)) → error",
			func(x string, emit func(string, int)) error {
				emit(x, len(x))
				return nil
			},
			reflect.TypeOf(&generated.TransformFn1x2ErrEmit{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string, emit(string, int)) → error",
			func(ctx context.Context, x string, emit func(string, int)) error {
				emit(x, len(x))
				return nil
			},
			reflect.TypeOf(&generated.TransformFnCtx1x2ErrEmit{}),
			beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(string, int, emit(string)) → <no output>",
			func(x string, y int, emit func(string)) { emit(x) },
			reflect.TypeOf(&generated.TransformFn2x1Emit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(context, string, int, emit(string)) → <no output>",
			func(ctx context.Context, x string, y int, emit func(string)) { emit(x) },
			reflect.TypeOf(&generated.TransformFnCtx2x1Emit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(string, int, emit(string)) → error",
			func(x string, y int, emit func(string)) error {
				emit(x)
				return nil
			},
			reflect.TypeOf(&generated.TransformFn2x1ErrEmit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(context, string, int, emit(string)) → error",
			func(ctx context.Context, x string, y int, emit func(string)) error {
				emit(x)
				return nil
			},
			reflect.TypeOf(&generated.TransformFnCtx2x1ErrEmit{}),
			beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
			nil,
		},
		{"(string, int, emit(string, int)) → <no output>",
			func(x string, y int, emit func(string, int)) { emit(x, len(x)) },
			reflect.TypeOf(&generated.TransformFn2x2Emit{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string, int, emit(string, int)) → <no output>",
			func(ctx context.Context, x string, y int, emit func(string, int)) { emit(x, len(x)) },
			reflect.TypeOf(&generated.TransformFnCtx2x2Emit{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(string, int, emit(string, int)) → error",
			func(x string, y int, emit func(string, int)) error {
				emit(x, len(x))
				return nil
			},
			reflect.TypeOf(&generated.TransformFn2x2ErrEmit{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
		{"(context, string, int, emit(string, int)) → error",
			func(ctx context.Context, x string, y int, emit func(string, int)) error {
				emit(x, len(x))
				return nil
			},
			reflect.TypeOf(&generated.TransformFnCtx2x2ErrEmit{}),
			beam.TypeDefinition{},
			kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
		},
	} {
		got, err := buildDoFn(tc.doFn)
		if err != nil {
			t.Errorf("%s: buildDoFn returned error %v (expected no error)", tc.desc, err)
		}
		if got == nil {
			t.Errorf("%s: buildDoFn returned nil function (wanted function of type %v)", tc.desc, tc.wantType.String())
			// The remaining checks dereference got; skip them so a failure
			// is reported as a test error rather than a nil-pointer panic.
			continue
		}
		if typ := reflect.TypeOf(got.fn); typ != tc.wantType {
			t.Errorf("%s: buildDoFn returned wrong type %v (want %v)", tc.desc, typ.String(), tc.wantType.String())
		}
		if diff := cmp.Diff(got.typeDef, tc.wantTypeDef, cmp.Comparer(compareTypeDefs)); diff != "" {
			t.Errorf("%s: buildDoFn returned wrong beam.TypeDefinition, diff=%s", tc.desc, diff)
		}
		if diff := cmp.Diff(got.codec, tc.wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
			t.Errorf("%s: buildDoFn returned wrong kv.Codec, diff=%s", tc.desc, diff)
		}
	}
}
// testStructuralDoFn is a struct-based DoFn carrying an integer of state. It
// is passed to buildDoFn by TestInvalidDoFn as the "structural doFn" case,
// which expects buildDoFn to reject it.
type testStructuralDoFn struct {
	state int
}

// ProcessElement returns x offset by the stored state.
func (d *testStructuralDoFn) ProcessElement(x int) int {
	sum := x + d.state
	return sum
}
// TestInvalidDoFn ensures that invalid DoFns return an error.
//
// Every case must make buildDoFn return a nil function together with a
// non-nil error, and that error must not be the raw Beam
// "couldn't create funcx.Fn" failure.
func TestInvalidDoFn(t *testing.T) {
	for _, tc := range []struct {
		desc string
		doFn interface{}
	}{
		{"structural doFn", &testStructuralDoFn{1}},
		// bad inputs
		{"(string, string, string) → int", func(x, y, z string) int { return len(x) }},
		{"(EventTime, string) → int", func(_ beam.EventTime, x string) int { return len(x) }},
		{"(Window, string) → int", func(_ beam.Window, x string) int { return len(x) }},
		{"(string, emit(string), unWantedEmit(string)) → <no output>", func(x string, emit func(string), unWantedEmit func(string)) { emit(x) }},
		{"(context, int, string, emit(string)) → string", func(_ context.Context, _ int, x string, emit func(string)) string { return x }},
		// bad outputs
		{"string → (EventTime, int)", func(x string) (beam.EventTime, int) { return 0, len(x) }},
		{"string → (int, int, int)", func(x string) (int, int, int) { return len(x), 0, -1 }},
		{"string → <no output>", func(x string) {}},
		{"(string, emit(string)) → string", func(x string, emit func(string)) string { return x }},
	} {
		// All of these doFns should return an error
		got, err := buildDoFn(tc.doFn)
		if got != nil {
			t.Errorf("%s: buildDoFn returned (non-nil function),%v; expected nil function and error", tc.desc, err)
		}
		if err == nil {
			t.Errorf("%s: buildDoFn returned <nil function>,<nil error>; expected an error", tc.desc)
		}
		// Check that we return a human-readable error, otherwise the
		// tested fn is not a valid DoFn for beam.ParDo in the first place
		if err != nil && strings.Contains(err.Error(), "couldn't create funcx.Fn") {
			t.Errorf("%s: buildDoFn failed with a Beam error: %v", tc.desc, err)
		}
	}
}
// TestValidateArgOrder checks which orderings of parameter kinds
// validateArgOrder accepts and which it rejects with an error.
//
// Each case builds a funcx.Fn whose Param slice carries only the listed
// kinds, in order, and compares the presence of an error against valid.
func TestValidateArgOrder(t *testing.T) {
	for _, tc := range []struct {
		desc  string
		order []funcx.FnParamKind
		valid bool
	}{
		{
			desc:  "(context)",
			order: []funcx.FnParamKind{funcx.FnContext},
			valid: true,
		}, {
			desc:  "(time)",
			order: []funcx.FnParamKind{funcx.FnEventTime},
			valid: true,
		}, {
			desc:  "(value)",
			order: []funcx.FnParamKind{funcx.FnValue},
			valid: true,
		}, {
			desc:  "(context,time)",
			order: []funcx.FnParamKind{funcx.FnContext, funcx.FnEventTime},
			valid: true,
		}, {
			desc:  "(context, value)",
			order: []funcx.FnParamKind{funcx.FnContext, funcx.FnValue},
			valid: true,
		}, {
			desc:  "(time, context)",
			order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnContext},
			valid: false,
		}, {
			desc:  "(time, value)",
			order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnValue},
			valid: true,
		}, {
			desc:  "(value, context)",
			order: []funcx.FnParamKind{funcx.FnValue, funcx.FnContext},
			valid: false,
		}, {
			desc:  "(value, time)",
			order: []funcx.FnParamKind{funcx.FnValue, funcx.FnEventTime},
			valid: false,
		}, {
			desc:  "(context,time,value)",
			order: []funcx.FnParamKind{funcx.FnContext, funcx.FnEventTime, funcx.FnValue},
			valid: true,
		}, {
			desc:  "(context,value,time)",
			order: []funcx.FnParamKind{funcx.FnContext, funcx.FnValue, funcx.FnEventTime},
			valid: false,
		}, {
			desc:  "(time,context,value)",
			order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnContext, funcx.FnValue},
			valid: false,
		}, {
			// The order now matches the description; it previously repeated
			// the (time, context, value) input of the case above.
			desc:  "(time,value,context)",
			order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnValue, funcx.FnContext},
			valid: false,
		}, {
			desc:  "(value,context,time)",
			order: []funcx.FnParamKind{funcx.FnValue, funcx.FnContext, funcx.FnEventTime},
			valid: false,
		}, {
			// The order now matches the description; it previously ended in
			// a second value instead of the context.
			desc:  "(value,time,context)",
			order: []funcx.FnParamKind{funcx.FnValue, funcx.FnEventTime, funcx.FnContext},
			valid: false,
		}, {
			desc:  "(emit, context,value)",
			order: []funcx.FnParamKind{funcx.FnEmit, funcx.FnContext, funcx.FnValue},
			valid: false,
		}, {
			desc:  "(value, emit)",
			order: []funcx.FnParamKind{funcx.FnValue, funcx.FnEmit},
			valid: true,
		}, {
			desc:  "(context,value, emit)",
			order: []funcx.FnParamKind{funcx.FnContext, funcx.FnValue, funcx.FnEmit},
			valid: true,
		},
	} {
		var fn funcx.Fn
		for _, kind := range tc.order {
			fn.Param = append(fn.Param, funcx.FnParam{Kind: kind})
		}
		err := validateArgOrder(&fn)
		if tc.valid {
			if err != nil {
				t.Errorf("Test case %s should have been valid, but got error %v", tc.desc, err)
			}
		} else if err == nil {
			t.Errorf("Test case %s should not have been valid, but no error was returned", tc.desc)
		}
	}
}
// TestValidateReturnOrder checks which orderings of return kinds
// validateRetOrder accepts and which it rejects with an error.
//
// Each case runs as its own subtest against a funcx.Fn whose Ret slice
// carries only the listed kinds, in order.
func TestValidateReturnOrder(t *testing.T) {
	type testCase struct {
		desc  string
		order []funcx.ReturnKind
		valid bool
	}
	cases := []testCase{
		{desc: "(time)", order: []funcx.ReturnKind{funcx.RetEventTime}, valid: true},
		{desc: "(value)", order: []funcx.ReturnKind{funcx.RetValue}, valid: true},
		{desc: "(error)", order: []funcx.ReturnKind{funcx.RetError}, valid: true},
		{desc: "(time,value)", order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetValue}, valid: true},
		{desc: "(time,error)", order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetError}, valid: true},
		{desc: "(value,time)", order: []funcx.ReturnKind{funcx.RetValue, funcx.RetEventTime}, valid: false},
		{desc: "(value,error)", order: []funcx.ReturnKind{funcx.RetValue, funcx.RetError}, valid: true},
		{desc: "(error,time)", order: []funcx.ReturnKind{funcx.RetError, funcx.RetEventTime}, valid: false},
		{desc: "(error,value)", order: []funcx.ReturnKind{funcx.RetError, funcx.RetValue}, valid: false},
		{desc: "(time,value,error)", order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetValue, funcx.RetError}, valid: true},
		{desc: "(time,error,value)", order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetError, funcx.RetValue}, valid: false},
		{desc: "(value, time, error)", order: []funcx.ReturnKind{funcx.RetValue, funcx.RetEventTime, funcx.RetError}, valid: false},
		{desc: "(value,error,time)", order: []funcx.ReturnKind{funcx.RetValue, funcx.RetError, funcx.RetEventTime}, valid: false},
		{desc: "(error,time,value)", order: []funcx.ReturnKind{funcx.RetError, funcx.RetEventTime, funcx.RetValue}, valid: false},
		{desc: "(error,value,time)", order: []funcx.ReturnKind{funcx.RetError, funcx.RetValue, funcx.RetEventTime}, valid: false},
	}

	for _, c := range cases {
		name := fmt.Sprintf("function with return values %s", c.desc)
		t.Run(name, func(t *testing.T) {
			var fn funcx.Fn
			for i := range c.order {
				fn.Ret = append(fn.Ret, funcx.ReturnParam{Kind: c.order[i]})
			}
			err := validateRetOrder(&fn)
			accepted := err == nil
			if accepted != c.valid {
				t.Errorf("Testcase %s returned error %v, valid: %v", c.desc, err, c.valid)
			}
		})
	}
}