blob: 1ed30fc43cf49ba4be35460f6825aa467ea2b8ad [file] [log] [blame]
// Copyright 2020 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package pbeam
import (
log ""
func execute(ctx context.Context, p *beam.Pipeline) error {
_, err := direct.Execute(ctx, p)
return err
// Reusable between ParDo tests
var codec = kv.NewCodec(reflect.TypeOf(1), reflect.TypeOf(1))
var values = []testutils.PairII{
{17, 42},
{99, 0},
var goodResult = []testutils.PairII{
{17, 26},
{99, 5},
// Expected result for a cancelled context. The direct runner has a bug: when a
// doFn returns an error, the output is still collected (and contains the
// default value for the type).
var zeroResult = []testutils.PairII{
{17, 0},
{99, 0},
var goodResult2x1 = []testutils.PairII{
{17, 106},
{99, 1},
// testutils.PairICodedKVs and kv.Pairs shared among different tests.
// Initialized in init().
var goodResult2x2 []testutils.PairICodedKV
var valuesCodedKV []testutils.PairICodedKV
var zeroValuedCodedKV []testutils.PairICodedKV
var pairK84V22, pairK0V1, pairK0V0, pairK106V62, pairK1VMinus1, pairK42V11, pairK53V31 kv.Pair
func init() {
// We call the Setup method to supply the encoders and decoders inside codec at runtime.
var err error
pairK84V22, err = codec.Encode(84, 22)
if err != nil {
pairK0V1, err = codec.Encode(0, 1)
if err != nil {
pairK0V0, err = codec.Encode(0, 0)
if err != nil {
pairK106V62, err = codec.Encode(106, 62)
if err != nil {
pairK1VMinus1, err = codec.Encode(1, -1)
if err != nil {
pairK42V11, err = codec.Encode(42, 11)
if err != nil {
pairK53V31, err = codec.Encode(53, 31)
if err != nil {
valuesCodedKV = []testutils.PairICodedKV{
{17, pairK84V22},
{99, pairK0V1},
zeroValuedCodedKV = []testutils.PairICodedKV{
{17, pairK0V0},
{99, pairK0V0},
goodResult2x2 = []testutils.PairICodedKV{
{17, pairK106V62},
{99, pairK1VMinus1},
func compareCodecs(codec1, codec2 *kv.Codec) bool {
if codec1 == nil && codec2 == nil {
return true
if (codec1 == nil) != (codec2 == nil) {
return false
if codec1.KType == codec2.KType && codec1.VType == codec2.VType {
return true
return false
func compareTypeDefs(typeDef1, typeDef2 beam.TypeDefinition) bool {
if typeDef1.Var == typeDef2.Var && typeDef1.T == typeDef2.T {
return true
return false
func TestParDo1x1(t *testing.T) {
doFn := func(v int) int { return v/2 + 5 }
p, s, col, wantCol := ptest.CreateList2(values, goodResult)
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→26 and 99→5 in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("Got an error: %v", err)
func TestParDo1x2(t *testing.T) {
doFn := func(v int) (int, int) { return v * 2, v/2 + 1 }
p, s, col, wantCol := ptest.CreateList2(values, valuesCodedKV)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("Got an error: %v", err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("DoFn returned a PrivatePCollection with wrong codec, diff=%s", diff)
func TestParDoCtx1x2(t *testing.T) {
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
want []testutils.PairICodedKV
// good context
{"unchanged context", unchangedContext, valuesCodedKV},
// various context modifiers that will cancel the context before any work starts
{"cancel", cancel, zeroValuedCodedKV},
{"instantDeadline", instantDeadline, zeroValuedCodedKV},
{"instantTimeout", instantTimeout, zeroValuedCodedKV},
} {
doFn := func(ctx context.Context, v int) (int, int) {
if ctx.Err() != nil {
return 0, 0
return v * 2, v/2 + 1
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDo1x2Err(t *testing.T) {
hasErr := func(v int) (int, int, error) {
return 0, 0, errors.New("sample error")
noErr := func(v int) (int, int, error) {
return v * 2, v/2 + 1, nil
for _, tc := range []struct {
desc string
doFn func(v int) (int, int, error)
returnsErr bool
want []testutils.PairICodedKV
{"doFn that does not return an error", noErr, false, valuesCodedKV},
{"doFn that returns an error", hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDoCtx1x2Err(t *testing.T) {
hasErr := func(ctx context.Context, v int) (int, int, error) {
return 0, 0, errors.New("sample error")
noErr := func(ctx context.Context, v int) (int, int, error) {
if ctx.Err() != nil {
return 0, 0, nil
return v * 2, v/2 + 1, nil
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int) (int, int, error)
returnsErr bool
want []testutils.PairICodedKV
// good context
{"unchanged context and no error", unchangedContext, noErr, false, valuesCodedKV},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroValuedCodedKV},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroValuedCodedKV},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroValuedCodedKV},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDo2x1(t *testing.T) {
doFn := func(k int, v int) int { return k + v }
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, goodResult2x1)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("Got an error: %v", err)
func TestParDoCtx2x1(t *testing.T) {
for _, tc := range []struct {
desc string
// m is a function that modifies a context
m func(context.Context) context.Context
want []testutils.PairII
// good context
{"unchanged context", unchangedContext, goodResult2x1},
// various context modifiers that will cancel the context before any work starts
{"cancel", cancel, zeroResult},
{"instantDeadline", instantDeadline, zeroResult},
{"instantTimeout", instantTimeout, zeroResult},
} {
doFn := func(ctx context.Context, k, v int) int {
if ctx.Err() != nil {
return 0
return k + v
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
func TestParDo2x1Err(t *testing.T) {
noErr := func(k, v int) (int, error) {
return k + v, nil
hasErr := func(_, _ int) (int, error) {
return 0, errors.New("sample error")
for _, tc := range []struct {
desc string
doFn func(int, int) (int, error)
returnsErr bool
want []testutils.PairII
{"doFn that does not return an error", noErr, false, goodResult2x1},
{"doFn that returns an error", hasErr, true, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
func TestParDoCtx2x1Err(t *testing.T) {
noErr := func(ctx context.Context, k, v int) (int, error) {
if ctx.Err() != nil {
return 0, nil
return k + v, nil
hasErr := func(_ context.Context, _, _ int) (int, error) {
return 0, errors.New("sample error")
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int, int) (int, error)
returnsErr bool
want []testutils.PairII
// good context
{"unchanged context and no error", unchangedContext, noErr, false, goodResult2x1},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroResult},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroResult},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroResult},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
func TestParDo2x2(t *testing.T) {
doFn := func(k int, v int) (int, int) { return k + v, k - v }
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, goodResult2x2)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("Got an error: %v", err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("DoFn returned a PrivatePCollection with wrong codec, diff=%s", diff)
func TestParDoCtx2x2(t *testing.T) {
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
want []testutils.PairICodedKV
// good context
{"unchanged context", unchangedContext, goodResult2x2},
// various context modifiers that will cancel the context before any work starts
{"cancel", cancel, zeroValuedCodedKV},
{"instantDeadline", instantDeadline, zeroValuedCodedKV},
{"instantTimeout", instantTimeout, zeroValuedCodedKV},
} {
doFn := func(ctx context.Context, k, v int) (int, int) {
if ctx.Err() != nil {
return 0, 0
return k + v, k - v
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDo2x2Err(t *testing.T) {
noErr := func(k, v int) (int, int, error) {
return k + v, k - v, nil
hasErr := func(_, _ int) (int, int, error) {
return 0, 0, errors.New("sample error")
for _, tc := range []struct {
desc string
doFn func(int, int) (int, int, error)
returnsErr bool
want []testutils.PairICodedKV
{"doFn that does not return an error", noErr, false, goodResult2x2},
{"doFn that returns an error", hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDoCtx2x2Err(t *testing.T) {
noErr := func(ctx context.Context, k, v int) (int, int, error) {
if ctx.Err() != nil {
return 0, 0, nil
return k + v, k - v, nil
hasErr := func(_ context.Context, _, _ int) (int, int, error) {
return 0, 0, errors.New("sample error")
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int, int) (int, int, error)
returnsErr bool
want []testutils.PairICodedKV
// good context
{"unchanged context and no error", unchangedContext, noErr, false, goodResult2x2},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroValuedCodedKV},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroValuedCodedKV},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroValuedCodedKV},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
// Functions that modify a context.Context, so that tests can have a
// cancelled or expired context
func unchangedContext(ctx context.Context) context.Context {
return ctx
func cancel(ctx context.Context) context.Context {
newCtx, cancel := context.WithCancel(ctx)
return newCtx
// These two functions cancel the context in a goroutine
// so that the context can "expire" naturally
func instantDeadline(ctx context.Context) context.Context {
newCtx, cancel := context.WithDeadline(ctx, time.Now())
go time.AfterFunc(time.Second, cancel)
return newCtx
func instantTimeout(ctx context.Context) context.Context {
newCtx, cancel := context.WithTimeout(ctx, 0)
go time.AfterFunc(time.Second, cancel)
return newCtx
// Reusable doFn that uses contexts
func doFnWithContext(ctx context.Context, v int) int {
if ctx.Err() != nil {
return 0
return v/2 + 5
func TestParDoCtx1x1(t *testing.T) {
for _, tc := range []struct {
// m is a function that modifies a context
m func(context.Context) context.Context
want []testutils.PairII
// good context
{unchangedContext, goodResult},
// various context modifiers that will cancel the context before any work starts
{cancel, zeroResult},
{instantDeadline, zeroResult},
{instantTimeout, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// If the context was cancelled, we change that to 17→0 and 99→0 in the PrivatePCollection,
// Otherwise, we change that to 17→26 and 99→5
pcol = ParDo(s, doFnWithContext, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
// Run the beam pipeline with the modified context
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("Got an error: %v", err)
func TestParDo1x1Err(t *testing.T) {
doFn := func(v int) (int, error) {
return v/2 + 5, nil
p, s, col, wantCol := ptest.CreateList2(values, goodResult)
colKV := beam.ParDo(s, testutils.PairToKV, col)
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("Got an error: %v", err)
func TestParDo1x1ErrReturnsError(t *testing.T) {
doFn := func(v int) (int, error) {
return 0, errors.New("this function always returns an error")
p, s, col, wantCol := ptest.CreateList2(values, zeroResult)
colKV := beam.ParDo(s, testutils.PairToKV, col)
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err == nil {
t.Errorf("Expected runner to return an error, but didn't get one")
func TestParDoCtx1x1Err(t *testing.T) {
doFn := func(ctx context.Context, v int) (int, error) {
return doFnWithContext(ctx, v), nil
for _, tc := range []struct {
// m is a function that modifies a context
m func(context.Context) context.Context
want []testutils.PairII
// good context
{unchangedContext, goodResult},
// various context modifiers that will cancel the context before any work starts
{cancel, zeroResult},
{instantDeadline, zeroResult},
{instantTimeout, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// If the context was cancelled, we change that to 17→0 and 99→0 in the PrivatePCollection,
// Otherwise, we change that to 17→26 and 99→5
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
// Run the beam pipeline with the modified context
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("Got an error: %v", err)
func doFnPair(b int, emit func(int)) {
func doFnPairWithCtx(_ context.Context, b int, emit func(int)) {
func TestParDo1x1Emit(t *testing.T) {
values := []testutils.PairII{
{17, 42},
{19, 10},
{80, 99},
{99, 0},
p, s, col := ptest.CreateList(values)
colKV := beam.ParDo(s, testutils.PairToKV, col)
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
pcol = ParDo(s, doFnPair, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, col)
if err := execute(context.Background(), p); err != nil {
t.Errorf("Without context, got an error: %v", err)
// Check for values with ctx passed in doFn
pcol = MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
pcol = ParDo(s, doFnPairWithCtx, pcol)
gotCol = beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, col)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With context, got an error: %v", err)
func TestParDo1x1ErrEmit(t *testing.T) {
noErr := func(v int, emit func(int)) error {
emit(v/2 + 5)
return nil
hasErr := func(v int, emit func(int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
doFn func(int, func(int)) error
returnsErr bool
want []testutils.PairII
{"doFn that does not return an error", noErr, false, goodResult},
{"doFn that returns an error", hasErr, true, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→26 and 99→5 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
func TestParDoCtx1x1ErrEmit(t *testing.T) {
noErr := func(ctx context.Context, v int, emit func(int)) error {
if ctx.Err() != nil {
return nil
emit(v/2 + 5)
return nil
hasErr := func(ctx context.Context, v int, emit func(int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int, func(int)) error
returnsErr bool
want []testutils.PairII
// good context
{"unchanged context and no error", unchangedContext, noErr, false, goodResult},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroResult},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroResult},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroResult},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→26 and 99→5 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
func TestParDo1x2Emit(t *testing.T) {
for _, tc := range []struct {
desc string
doFn interface{}
want []testutils.PairICodedKV
{"doFn that emits only non-zero inputs",
func(v int, emit func(int, int)) {
if v != 0 {
emit(v*2, v/2+1)
{17, pairK84V22},
{"doFn that emits each input once",
func(v int, emit func(int, int)) {
emit(v*2, v/2+1)
{"doFn that emits once or twice",
func(v int, emit func(int, int)) {
emit(v*2, v/2+1)
if v != 0 {
v = v / 2
emit(v*2, v/2+1)
{17, pairK84V22},
{17, pairK42V11},
{99, pairK0V1},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDoCtx1x2Emit(t *testing.T) {
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
want []testutils.PairICodedKV
// good context
{"unchanged context", unchangedContext, valuesCodedKV},
// various context modifiers that will cancel the context before any work starts
{"cancel", cancel, zeroValuedCodedKV},
{"instantDeadline", instantDeadline, zeroValuedCodedKV},
{"instantTimeout", instantTimeout, zeroValuedCodedKV},
} {
doFn := func(ctx context.Context, v int, emit func(int, int)) {
if ctx.Err() != nil {
emit(0, 0)
} else {
emit(v*2, v/2+1)
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDo1x2ErrEmit(t *testing.T) {
noErr := func(v int, emit func(int, int)) error {
emit(v*2, v/2+1)
return nil
hasErr := func(v int, emit func(int, int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
doFn func(int, func(int, int)) error
returnsErr bool
want []testutils.PairICodedKV
{"doFn that does not return an error", noErr, false, valuesCodedKV},
{"doFn that returns an error", hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, did not expect an error, but got one: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDoCtx1x2ErrEmit(t *testing.T) {
noErr := func(ctx context.Context, v int, emit func(int, int)) error {
if ctx.Err() != nil {
emit(0, 0)
return nil
emit(v*2, v/2+1)
return nil
hasErr := func(ctx context.Context, v int, emit func(int, int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int, func(int, int)) error
returnsErr bool
want []testutils.PairICodedKV
// good context
{"unchanged context and no error", unchangedContext, noErr, false, valuesCodedKV},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroValuedCodedKV},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroValuedCodedKV},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroValuedCodedKV},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(values, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairToKV, col)
// pcol should contain 17→42 and 99→0.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDo2x1Emit(t *testing.T) {
for _, tc := range []struct {
desc string
doFn interface{}
want []testutils.PairII
{"doFn that emits only non-zero input k",
func(k, v int, emit func(int)) {
if k != 0 {
emit(k + v)
{17, 106},
{"doFn that emits each input once",
func(k, v int, emit func(int)) {
emit(k + v)
{"doFn that emits once or twice",
func(k, v int, emit func(int)) {
emit(k + v)
if k != 0 {
k = k / 2
v = v / 2
emit(k + v)
{17, 106},
{17, 53},
{99, 1},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
func TestParDoCtx2x1Emit(t *testing.T) {
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
want []testutils.PairII
// good context
{"unchanged context", unchangedContext, goodResult2x1},
// various context modifiers that will cancel the context before any work starts
{"cancel", cancel, zeroResult},
{"instantDeadline", instantDeadline, zeroResult},
{"instantTimeout", instantTimeout, zeroResult},
} {
doFn := func(ctx context.Context, k, v int, emit func(int)) {
if ctx.Err() != nil {
} else {
emit(k + v)
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
func TestParDo2x1ErrEmit(t *testing.T) {
noErr := func(k, v int, emit func(int)) error {
emit(k + v)
return nil
hasErr := func(k, v int, emit func(int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
doFn func(int, int, func(int)) error
returnsErr bool
want []testutils.PairII
{"doFn that does not return an error", noErr, false, goodResult2x1},
{"doFn that returns an error", hasErr, true, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, did not expect an error, but got on: %v", tc.desc, err)
func TestParDoCtx2x1ErrEmit(t *testing.T) {
noErr := func(ctx context.Context, k, v int, emit func(int)) error {
if ctx.Err() != nil {
return nil
emit(k + v)
return nil
hasErr := func(ctx context.Context, k, v int, emit func(int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int, int, func(int)) error
returnsErr bool
want []testutils.PairII
// good context
{"unchanged context and no error", unchangedContext, noErr, false, goodResult2x1},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroResult},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroResult},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroResult},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroResult},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<codedKV{84, 22}> and 99→<codedKV{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→106 and 99→1 in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPair, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, did not expect an error, but got on: %v", tc.desc, err)
func TestParDo2x2Emit(t *testing.T) {
for _, tc := range []struct {
desc string
doFn interface{}
want []testutils.PairICodedKV
{"doFn that emits only non-zero input k",
func(k, v int, emit func(int, int)) {
if k != 0 {
emit(k+v, k-v)
{17, pairK106V62},
{"doFn that emits each input once",
func(k, v int, emit func(int, int)) {
emit(k+v, k-v)
{"doFn that emits once or twice",
func(k, v int, emit func(int, int)) {
emit(k+v, k-v)
if k != 0 {
k = k / 2
v = v / 2
emit(k+v, k-v)
{17, pairK106V62},
{17, pairK53V31},
{99, pairK1VMinus1},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDoCtx2x2Emit(t *testing.T) {
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
want []testutils.PairICodedKV
// good context
{"unchanged context", unchangedContext, goodResult2x2},
// various context modifiers that will cancel the context before any work starts
{"cancel", cancel, zeroValuedCodedKV},
{"instantDeadline", instantDeadline, zeroValuedCodedKV},
{"instantTimeout", instantTimeout, zeroValuedCodedKV},
} {
doFn := func(ctx context.Context, k, v int, emit func(int, int)) {
if ctx.Err() != nil {
emit(0, 0)
} else {
emit(k+v, k-v)
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDo2x2ErrEmit(t *testing.T) {
noErr := func(k, v int, emit func(int, int)) error {
emit(k+v, k-v)
return nil
hasErr := func(k, v int, emit func(int, int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
doFn func(int, int, func(int, int)) error
returnsErr bool
want []testutils.PairICodedKV
{"doFn that does not return an error", noErr, false, goodResult2x2},
{"doFn that returns an error", hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(context.Background(), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(context.Background(), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
func TestParDoCtx2x2ErrEmit(t *testing.T) {
noErr := func(ctx context.Context, k, v int, emit func(int, int)) error {
if ctx.Err() != nil {
emit(0, 0)
return nil
emit(k+v, k-v)
return nil
hasErr := func(ctx context.Context, k, v int, emit func(int, int)) error {
return errors.New("sample error")
for _, tc := range []struct {
desc string
m func(context.Context) context.Context // function that modifies a context
doFn func(context.Context, int, int, func(int, int)) error
returnsErr bool
want []testutils.PairICodedKV
// good context
{"unchanged context and no error", unchangedContext, noErr, false, goodResult2x2},
// various context modifiers that will cancel the context before any work starts
{"cancel and no error", cancel, noErr, false, zeroValuedCodedKV},
{"instantDeadline and no error", instantDeadline, noErr, false, zeroValuedCodedKV},
{"instantTimeout and no error", instantTimeout, noErr, false, zeroValuedCodedKV},
{"instantTimeout and error", instantTimeout, hasErr, true, zeroValuedCodedKV},
} {
p, s, col, wantCol := ptest.CreateList2(valuesCodedKV, tc.want)
wantCodec := kv.NewCodec(reflect.TypeOf(int(0)), reflect.TypeOf(int(0)))
colKV := beam.ParDo(s, testutils.PairICodedKVToKV, col)
// pcol should contain 17→<kv.Pair{84, 22}> and 99→<kv.Pair{0, 1}>.
pcol := MakePrivate(s, colKV, NewPrivacySpec(1, 1e-10))
// We change that to 17→<kv.Pair{106, 62}> and 99→<kv.Pair{1, -1}> in the PrivatePCollection
pcol = ParDo(s, tc.doFn, pcol)
gotCol := beam.ParDo(s, testutils.KVToPairICodedKV, pcol.col)
passert.Equals(s, gotCol, wantCol)
if tc.returnsErr {
if err := execute(tc.m(context.Background()), p); err == nil {
t.Errorf("With %s, expected runner to return an error, but didn't get one", tc.desc)
if err := execute(tc.m(context.Background()), p); err != nil {
t.Errorf("With %s, got an error: %v", tc.desc, err)
if diff := cmp.Diff(pcol.codec, wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("With %s, DoFn returned a PrivatePCollection with wrong codec, diff=%s", tc.desc, diff)
// Ensure that valid DoFns built are built correctly
func TestBuildDoFn(t *testing.T) {
for _, tc := range []struct {
desc string
doFn interface{}
wantType reflect.Type
wantTypeDef beam.TypeDefinition
wantCodec *kv.Codec
{"string → int",
func(x string) int { return len(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"(context, string) → int",
func(_ context.Context, x string) int { return len(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"string → (int, error)",
func(x string) (int, error) { return len(x), nil },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"(context, string) → (int, error)",
func(_ context.Context, x string) (int, error) { return len(x), nil },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"string → (string, int)",
func(x string) (string, int) { return x, len(x) },
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string) → (string, int)",
func(_ context.Context, x string) (string, int) { return x, len(x) },
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"string→(string, int, error)",
func(x string) (string, int, error) { return x, len(x), nil },
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string) → (string, int, error)",
func(_ context.Context, x string) (string, int, error) { return x, len(x), nil },
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(string, int) → int",
func(x string, y int) int { return len(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"(context, string, int) → int",
func(_ context.Context, x string, y int) int { return len(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"(string, int) → (int, error)",
func(x string, y int) (int, error) { return len(x), nil },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"(context, string, int) → (int, error)",
func(_ context.Context, x string, y int) (int, error) { return len(x), nil },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf(int(0))},
{"(string, int) → (string, int)",
func(x string, y int) (string, int) { return x, len(x) + y },
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string, int) → (string, int)",
func(_ context.Context, x string, y int) (string, int) { return x, len(x) + y },
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(string, int) → (string, int, error)",
func(x string, y int) (string, int, error) { return x, len(x) + y, nil },
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string, int) → (string, int, error)",
func(_ context.Context, x string, y int) (string, int, error) { return x, len(x) + y, nil },
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(string, emit(string)) → <no output>",
func(x string, emit func(string)) { emit(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(context, string, emit(string)) → <no output>",
func(_ context.Context, x string, emit func(string)) { emit(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(string, emit(string)) → error",
func(x string, emit func(string)) error {
return nil
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(context, string, emit(string)) → error",
func(_ context.Context, x string, emit func(string)) error {
return nil
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(string, emit(string, int)) → <no output>",
func(x string, emit func(string, int)) { emit(x, len(x)) },
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string, emit(string, int)) → <no output>",
func(ctx context.Context, x string, emit func(string, int)) { emit(x, len(x)) },
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(string, emit(string, int)) → error",
func(x string, emit func(string, int)) error {
emit(x, len(x))
return nil
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string, emit(string, int)) → error",
func(ctx context.Context, x string, emit func(string, int)) error {
emit(x, len(x))
return nil
beam.TypeDefinition{Var: beam.ZType, T: reflect.TypeOf(kv.Pair{})},
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(string, int, emit(string)) → <no output>",
func(x string, y int, emit func(string)) { emit(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(context, string, int, emit(string)) → <no output>",
func(ctx context.Context, x string, y int, emit func(string)) { emit(x) },
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(string, int, emit(string)) → error",
func(x string, y int, emit func(string)) error {
return nil
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(context, string, int, emit(string)) → error",
func(ctx context.Context, x string, y int, emit func(string)) error {
return nil
beam.TypeDefinition{Var: beam.YType, T: reflect.TypeOf("")},
{"(string, int, emit(string, int)) → <no output>",
func(x string, y int, emit func(string, int)) { emit(x, len(x)) },
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string, int, emit(string, int)) → <no output>",
func(ctx context.Context, x string, y int, emit func(string, int)) { emit(x, len(x)) },
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(string, int, emit(string, int)) → error",
func(x string, y int, emit func(string, int)) error {
emit(x, len(x))
return nil
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
{"(context, string, int, emit(string, int)) → error",
func(ctx context.Context, x string, y int, emit func(string, int)) error {
emit(x, len(x))
return nil
kv.NewCodec(reflect.TypeOf(""), reflect.TypeOf(int(0))),
} {
got, err := buildDoFn(tc.doFn)
if err != nil {
t.Errorf("%s: buildDoFn returned error %v (expected no error)", tc.desc, err)
if got == nil {
t.Errorf("%s: buildDoFn returned nil function (wanted function of type %v)", tc.desc, tc.wantType.String())
if typ := reflect.TypeOf(got.fn); typ != tc.wantType {
t.Errorf("%s: buildDoFn returned wrong type %v (want %v)", tc.desc, typ.String(), tc.wantType.String())
if diff := cmp.Diff(got.typeDef, tc.wantTypeDef, cmp.Comparer(compareTypeDefs)); diff != "" {
t.Errorf("%s: buildDoFn returned wrong beam.TypeDefinition, diff=%s", tc.desc, diff)
if diff := cmp.Diff(got.codec, tc.wantCodec, cmp.Comparer(compareCodecs)); diff != "" {
t.Errorf("%s: buildDoFn returned wrong kv.Codec, diff=%s", tc.desc, diff)
type testStructuralDoFn struct {
state int
func (fn *testStructuralDoFn) ProcessElement(x int) int {
return fn.state + x
// Ensure that invalid DoFns return an error
func TestInvalidDoFn(t *testing.T) {
for _, tc := range []struct {
desc string
doFn interface{}
{"structural doFn", &testStructuralDoFn{1}},
// bad inputs
{"(string, string, string) → int", func(x, y, z string) int { return len(x) }},
{"(EventTime, string) → int", func(_ beam.EventTime, x string) int { return len(x) }},
{"(Window, string) → int", func(_ beam.Window, x string) int { return len(x) }},
{"(string, emit(string), unWantedEmit(string)) → <no output>", func(x string, emit func(string), unWantedEmit func(string)) { emit(x) }},
{"(context, int, string, emit(string)) → string", func(_ context.Context, _ int, x string, emit func(string)) string { return x }},
{"(string, emit(string), unWantedEmit(string)) → <no output>", func(x string, emit func(string), unWantedEmit func(string)) { emit(x) }},
{"(context, int, string, emit(string)) → string", func(_ context.Context, _ int, x string, emit func(string)) string { return x }},
// bad outputs
{"string → (EventTime, int)", func(x string) (beam.EventTime, int) { return 0, len(x) }},
{"string → (int, int, int)", func(x string) (int, int, int) { return len(x), 0, -1 }},
{"string → <no output>", func(x string) {}},
{"(string, emit(string)) → string", func(x string, emit func(string)) string { return x }},
} {
// All of these doFns should return an error
got, err := buildDoFn(tc.doFn)
if got != nil {
t.Errorf("%s: buildDoFn returned (non-nil function),%v; expected nil function and error", tc.desc, err)
if err == nil {
t.Errorf("%s: buildDoFn returned <nil function>,<nil error>; expected an error", tc.desc)
// Check that we return a human-readable error, otherwise the
// tested fn is not a valid DoFn for beam.ParDo in the first place
if err != nil && strings.Contains(err.Error(), "couldn't create funcx.Fn") {
t.Errorf("%s: buildDoFn failed with a Beam error: %v", tc.desc, err)
func TestValidateArgOrder(t *testing.T) {
for _, tc := range []struct {
desc string
order []funcx.FnParamKind
valid bool
desc: "(context)",
order: []funcx.FnParamKind{funcx.FnContext},
valid: true,
}, {
desc: "(time)",
order: []funcx.FnParamKind{funcx.FnEventTime},
valid: true,
}, {
desc: "(value)",
order: []funcx.FnParamKind{funcx.FnValue},
valid: true,
}, {
desc: "(context,time)",
order: []funcx.FnParamKind{funcx.FnContext, funcx.FnEventTime},
valid: true,
}, {
desc: "(context, value)",
order: []funcx.FnParamKind{funcx.FnContext, funcx.FnValue},
valid: true,
}, {
desc: "(time, context)",
order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnContext},
valid: false,
}, {
desc: "(time, value)",
order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnValue},
valid: true,
}, {
desc: "(value, context)",
order: []funcx.FnParamKind{funcx.FnValue, funcx.FnContext},
valid: false,
}, {
desc: "(value, time)",
order: []funcx.FnParamKind{funcx.FnValue, funcx.FnEventTime},
valid: false,
}, {
desc: "(context,time,value)",
order: []funcx.FnParamKind{funcx.FnContext, funcx.FnEventTime, funcx.FnValue},
valid: true,
}, {
desc: "(context,value,time)",
order: []funcx.FnParamKind{funcx.FnContext, funcx.FnValue, funcx.FnEventTime},
valid: false,
}, {
desc: "(time,context,value)",
order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnContext, funcx.FnValue},
valid: false,
}, {
desc: "(time,value,context)",
order: []funcx.FnParamKind{funcx.FnEventTime, funcx.FnContext, funcx.FnValue},
valid: false,
}, {
desc: "(value,context,time)",
order: []funcx.FnParamKind{funcx.FnValue, funcx.FnContext, funcx.FnEventTime},
valid: false,
}, {
desc: "(value,time,context)",
order: []funcx.FnParamKind{funcx.FnValue, funcx.FnEventTime, funcx.FnValue},
valid: false,
desc: "(emit, context,value)",
order: []funcx.FnParamKind{funcx.FnEmit, funcx.FnContext, funcx.FnValue},
valid: false,
desc: "(value, emit)",
order: []funcx.FnParamKind{funcx.FnValue, funcx.FnEmit},
valid: true,
desc: "(context,value, emit)",
order: []funcx.FnParamKind{funcx.FnContext, funcx.FnValue, funcx.FnEmit},
valid: true,
} {
var fn funcx.Fn
for _, kind := range tc.order {
fn.Param = append(fn.Param, funcx.FnParam{Kind: kind})
err := validateArgOrder(&fn)
if tc.valid {
if err != nil {
t.Errorf("Test case %s should have been valid, but got error %v", tc.desc, err)
} else if err == nil {
t.Errorf("Test case %s should not have been valid, but no error was returned", tc.desc)
func TestValidateReturnOrder(t *testing.T) {
for _, tc := range []struct {
desc string
order []funcx.ReturnKind
valid bool
desc: "(time)",
order: []funcx.ReturnKind{funcx.RetEventTime},
valid: true,
}, {
desc: "(value)",
order: []funcx.ReturnKind{funcx.RetValue},
valid: true,
}, {
desc: "(error)",
order: []funcx.ReturnKind{funcx.RetError},
valid: true,
}, {
desc: "(time,value)",
order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetValue},
valid: true,
}, {
desc: "(time,error)",
order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetError},
valid: true,
}, {
desc: "(value,time)",
order: []funcx.ReturnKind{funcx.RetValue, funcx.RetEventTime},
valid: false,
}, {
desc: "(value,error)",
order: []funcx.ReturnKind{funcx.RetValue, funcx.RetError},
valid: true,
}, {
desc: "(error,time)",
order: []funcx.ReturnKind{funcx.RetError, funcx.RetEventTime},
valid: false,
}, {
desc: "(error,value)",
order: []funcx.ReturnKind{funcx.RetError, funcx.RetValue},
valid: false,
}, {
desc: "(time,value,error)",
order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetValue, funcx.RetError},
valid: true,
}, {
desc: "(time,error,value)",
order: []funcx.ReturnKind{funcx.RetEventTime, funcx.RetError, funcx.RetValue},
valid: false,
}, {
desc: "(value, time, error)",
order: []funcx.ReturnKind{funcx.RetValue, funcx.RetEventTime, funcx.RetError},
valid: false,
}, {
desc: "(value,error,time)",
order: []funcx.ReturnKind{funcx.RetValue, funcx.RetError, funcx.RetEventTime},
valid: false,
}, {
desc: "(error,time,value)",
order: []funcx.ReturnKind{funcx.RetError, funcx.RetEventTime, funcx.RetValue},
valid: false,
}, {
desc: "(error,value,time)",
order: []funcx.ReturnKind{funcx.RetError, funcx.RetValue, funcx.RetEventTime},
valid: false,
} {
t.Run(fmt.Sprintf("function with return values %s", tc.desc), func(t *testing.T) {
var fn funcx.Fn
for _, kind := range tc.order {
fn.Ret = append(fn.Ret, funcx.ReturnParam{Kind: kind})
err := validateRetOrder(&fn)
if tc.valid != (err == nil) {
t.Errorf("Testcase %s returned error %v, valid: %v", tc.desc, err, tc.valid)