Fix a rare privacy bug in DistinctPerKey in Privacy on Beam. (#84)

The bug occurred when there were outlier users in the input who
contributed to many partitions and/or many values AND whose
contributed values were the same as values from other users (the
second part is critical: if the contributed values only come from a
single user, then the bug does not occur). In that case, the output
might not have been DP due to incorrect contribution bounding. See
the comments in the newly added tests for concrete examples of
when/how the bug used to occur.
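
As an illustration, here is a minimal sketch of the kind of
pathological input for the per-partition case, mirroring the newly
added test
TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication
(testutils.TripleWithIntValue is the helper type used in those tests):

    var triples []testutils.TripleWithIntValue
    for i := 0; i < 100; i++ {
        // 100 distinct values in partition 0, one per user.
        triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
        // One outlier user (ID=100) also contributes all 100 values.
        triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: 0, Value: i})
    }

If values were deduplicated before contribution bounding, each value
would keep one of its two contributing users at random, so about 50
values would end up attributed to user 100; per-partition bounding
(MaxContributionsPerPartition=1) would then drop all but one of them,
leaving ~50 distinct values instead of 100 and breaking the DP
guarantee.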

This is cherry-picked from the main branch commit
e149618d032f97b476fecc5839a278cc680def08.

This commit includes a minor change compared to the one on the main
branch: it removes the error outputs of the kv.Codec.Encode/Decode
calls used in aggregations.go, since the error outputs of these
functions weren't implemented in v1.0.0.
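
Concretely, call sites in this backport use the two-value form of
Decode shown first below; the main-branch form, which also returns an
error, is sketched second (an assumed shape, inferred from the
description above rather than copied from the main branch):

    // v1.0.0 (this cherry-pick): no error return.
    id, _ := fn.InputPairCodec.Decode(pair)

    // main branch (assumed): the error return must be checked.
    id, _, err := fn.InputPairCodec.Decode(pair)
    if err != nil {
        return nil, kv.Pair{}, fmt.Errorf("couldn't decode pair %v: %w", pair, err)
    }
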
diff --git a/privacy-on-beam/pbeam/aggregations.go b/privacy-on-beam/pbeam/aggregations.go
index 43d5e17..4ef8792 100644
--- a/privacy-on-beam/pbeam/aggregations.go
+++ b/privacy-on-beam/pbeam/aggregations.go
@@ -42,8 +42,11 @@
 	beam.RegisterType(reflect.TypeOf((*decodePairInt64Fn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*decodePairFloat64Fn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*dropValuesFn)(nil)))
+	beam.RegisterType(reflect.TypeOf((*encodeKVFn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*encodeIDKFn)(nil)))
+	beam.RegisterType(reflect.TypeOf((*decodeIDKFn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*expandValuesCombineFn)(nil)))
+	beam.RegisterType(reflect.TypeOf((*expandFloat64ValuesCombineFn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*decodePairArrayFloat64Fn)(nil)))
 	beam.RegisterType(reflect.TypeOf((*partitionsMapFn)(nil)).Elem())
 	beam.RegisterType(reflect.TypeOf((*prunePartitionsVFn)(nil)).Elem())
@@ -736,6 +739,26 @@
 	return id, k
 }
 
+// encodeKVFn takes a PCollection<kv.Pair{ID,K}, codedV> as input, and returns a
+// PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID has been
+// decoded.
+type encodeKVFn struct {
+	InputPairCodec *kv.Codec // Codec for the input kv.Pair{ID,K}
+}
+
+func newEncodeKVFn(idkCodec *kv.Codec) *encodeKVFn {
+	return &encodeKVFn{InputPairCodec: idkCodec}
+}
+
+func (fn *encodeKVFn) Setup() error {
+	return fn.InputPairCodec.Setup()
+}
+
+func (fn *encodeKVFn) ProcessElement(pair kv.Pair, codedV []byte) (beam.W, kv.Pair) {
+	id, _ := fn.InputPairCodec.Decode(pair)
+	return id, kv.Pair{pair.V, codedV} // pair.V is the K in PCollection<kv.Pair{ID,K}, codedV>
+}
+
 // encodeIDKFn takes a PCollection<ID,kv.Pair{K,V}> as input, and returns a
 // PCollection<kv.Pair{ID,K},V>; where ID and K have been coded, and V has been
 // decoded.
@@ -766,6 +789,36 @@
 	return kv.Pair{idBuf.Bytes(), pair.K}, v
 }
 
+// decodeIDKFn is the reverse operation of encodeIDKFn. It takes a PCollection<kv.Pair{ID,K},V>
+// as input, and returns a PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID
+// has been decoded.
+type decodeIDKFn struct {
+	VType          beam.EncodedType    // Type information of the value V
+	vEnc           beam.ElementEncoder // Encoder for the value V, set during Setup() according to VType
+	InputPairCodec *kv.Codec           // Codec for the input kv.Pair{ID,K}
+}
+
+func newDecodeIDKFn(vType typex.FullType, idkCodec *kv.Codec) *decodeIDKFn {
+	return &decodeIDKFn{
+		VType:          beam.EncodedType{vType.Type()},
+		InputPairCodec: idkCodec,
+	}
+}
+
+func (fn *decodeIDKFn) Setup() error {
+	fn.vEnc = beam.NewElementEncoder(fn.VType.T)
+	return fn.InputPairCodec.Setup()
+}
+
+func (fn *decodeIDKFn) ProcessElement(pair kv.Pair, v beam.V) (beam.W, kv.Pair, error) {
+	var vBuf bytes.Buffer
+	if err := fn.vEnc.Encode(v, &vBuf); err != nil {
+		return nil, kv.Pair{}, fmt.Errorf("pbeam.decodeIDKFn.ProcessElement: couldn't encode V %v: %w", v, err)
+	}
+	id, _ := fn.InputPairCodec.Decode(pair)
+	return id, kv.Pair{pair.V, vBuf.Bytes()}, nil // pair.V is the K in PCollection<kv.Pair{ID,K},V>
+}
+
 // decodePairArrayFloat64Fn transforms a PCollection<pairArrayFloat64<codedX,[]float64>> into a
 // PCollection<X,[]float64>.
 type decodePairArrayFloat64Fn struct {
@@ -862,21 +915,36 @@
 }
 
 type expandValuesAccum struct {
-	Values []float64
+	Values [][]byte
 }
 
-// expandValuesCombineFn converts a PCollection<K,float64> to PCollection<K,[]float64>
-// where each value corresponding to the same key are collected in a slice. Resulting
-// PCollection has a single slice for each key.
-type expandValuesCombineFn struct{}
+// expandValuesCombineFn converts a PCollection<K,V> to PCollection<K,[]V>, where all values
+// corresponding to the same key are collected into a slice. The resulting PCollection has a
+// single slice for each key.
+type expandValuesCombineFn struct {
+	VType beam.EncodedType
+	vEnc  beam.ElementEncoder
+}
+
+func newExpandValuesCombineFn(vType beam.EncodedType) *expandValuesCombineFn {
+	return &expandValuesCombineFn{VType: vType}
+}
+
+func (fn *expandValuesCombineFn) Setup() {
+	fn.vEnc = beam.NewElementEncoder(fn.VType.T)
+}
 
 func (fn *expandValuesCombineFn) CreateAccumulator() expandValuesAccum {
-	return expandValuesAccum{Values: make([]float64, 0)}
+	return expandValuesAccum{Values: make([][]byte, 0)}
 }
 
-func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value float64) expandValuesAccum {
-	a.Values = append(a.Values, value)
-	return a
+func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value beam.V) (expandValuesAccum, error) {
+	var vBuf bytes.Buffer
+	if err := fn.vEnc.Encode(value, &vBuf); err != nil {
+		return a, fmt.Errorf("pbeam.expandValuesCombineFn.AddInput: couldn't encode V %v: %w", value, err)
+	}
+	a.Values = append(a.Values, vBuf.Bytes())
+	return a, nil
 }
 
 func (fn *expandValuesCombineFn) MergeAccumulators(a, b expandValuesAccum) expandValuesAccum {
@@ -884,7 +952,34 @@
 	return a
 }
 
-func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) []float64 {
+func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) [][]byte {
+	return a.Values
+}
+
+type expandFloat64ValuesAccum struct {
+	Values []float64
+}
+
+// expandFloat64ValuesCombineFn converts a PCollection<K,float64> to PCollection<K,[]float64>,
+// where all values corresponding to the same key are collected into a slice. The resulting
+// PCollection has a single slice for each key.
+type expandFloat64ValuesCombineFn struct{}
+
+func (fn *expandFloat64ValuesCombineFn) CreateAccumulator() expandFloat64ValuesAccum {
+	return expandFloat64ValuesAccum{Values: make([]float64, 0)}
+}
+
+func (fn *expandFloat64ValuesCombineFn) AddInput(a expandFloat64ValuesAccum, value float64) expandFloat64ValuesAccum {
+	a.Values = append(a.Values, value)
+	return a
+}
+
+func (fn *expandFloat64ValuesCombineFn) MergeAccumulators(a, b expandFloat64ValuesAccum) expandFloat64ValuesAccum {
+	a.Values = append(a.Values, b.Values...)
+	return a
+}
+
+func (fn *expandFloat64ValuesCombineFn) ExtractOutput(a expandFloat64ValuesAccum) []float64 {
 	return a.Values
 }
 
diff --git a/privacy-on-beam/pbeam/coders.go b/privacy-on-beam/pbeam/coders.go
index bc6a268..eaffd80 100644
--- a/privacy-on-beam/pbeam/coders.go
+++ b/privacy-on-beam/pbeam/coders.go
@@ -33,6 +33,7 @@
 	beam.RegisterCoder(reflect.TypeOf(boundedMeanAccumFloat64{}), encodeBoundedMeanAccumFloat64, decodeBoundedMeanAccumFloat64)
 	beam.RegisterCoder(reflect.TypeOf(boundedQuantilesAccum{}), encodeBoundedQuantilesAccum, decodeBoundedQuantilesAccum)
 	beam.RegisterCoder(reflect.TypeOf(expandValuesAccum{}), encodeExpandValuesAccum, decodeExpandValuesAccum)
+	beam.RegisterCoder(reflect.TypeOf(expandFloat64ValuesAccum{}), encodeExpandFloat64ValuesAccum, decodeExpandFloat64ValuesAccum)
 	beam.RegisterCoder(reflect.TypeOf(partitionSelectionAccum{}), encodePartitionSelectionAccum, decodePartitionSelectionAccum)
 }
 
@@ -96,6 +97,16 @@
 	return ret, err
 }
 
+func encodeExpandFloat64ValuesAccum(v expandFloat64ValuesAccum) ([]byte, error) {
+	return encode(v)
+}
+
+func decodeExpandFloat64ValuesAccum(data []byte) (expandFloat64ValuesAccum, error) {
+	var ret expandFloat64ValuesAccum
+	err := decode(&ret, data)
+	return ret, err
+}
+
 func encodePartitionSelectionAccum(v partitionSelectionAccum) ([]byte, error) {
 	return encode(v)
 }
diff --git a/privacy-on-beam/pbeam/distinct_per_key.go b/privacy-on-beam/pbeam/distinct_per_key.go
index 1bec85e..cd94f43 100644
--- a/privacy-on-beam/pbeam/distinct_per_key.go
+++ b/privacy-on-beam/pbeam/distinct_per_key.go
@@ -74,7 +74,7 @@
 func DistinctPerKey(s beam.Scope, pcol PrivatePCollection, params DistinctPerKeyParams) beam.PCollection {
 	s = s.Scope("pbeam.DistinctPerKey")
 	// Obtain type information from the underlying PCollection<K,V>.
-	_, kvT := beam.ValidateKVType(pcol.col)
+	idT, kvT := beam.ValidateKVType(pcol.col)
 	if kvT.Type() != reflect.TypeOf(kv.Pair{}) {
 		log.Exitf("DistinctPerKey must be used on a PrivatePCollection of type <K,V>, got type %v instead", kvT)
 	}
@@ -103,14 +103,62 @@
 		log.Exit(err)
 	}
 
-	// Perform partition selection
+	// Do initial per- and cross-partition contribution bounding and swap kv.Pair{K,V} and ID.
+	// This is not great in terms of utility, since dropping contributions randomly might
+	// mean that we keep duplicates instead of distinct values. However, this is necessary
+	// for the current algorithm to be DP.
+	if spec.testMode != noNoiseWithoutContributionBounding {
+		// First, rekey by kv.Pair{ID,K} and do per-partition contribution bounding.
+		rekeyed := beam.ParDo(
+			s,
+			newEncodeIDKFn(idT, pcol.codec),
+			pcol.col,
+			beam.TypeDefinition{Var: beam.VType, T: pcol.codec.VType.T}) // PCollection<kv.Pair{ID,K}, V>.
+		// Keep only maxContributionsPerPartition values per (privacyKey, partitionKey) pair.
+		sampled := boundContributions(s, rekeyed, params.MaxContributionsPerPartition)
+
+		// Collect all values per kv.Pair{ID,K} in a slice.
+		combined := beam.CombinePerKey(s,
+			newExpandValuesCombineFn(pcol.codec.VType),
+			sampled) // PCollection<kv.Pair{ID,K}, []codedV>, where codedV=[]byte
+
+		_, codedVSliceType := beam.ValidateKVType(combined)
+
+		decoded := beam.ParDo(
+			s,
+			newDecodeIDKFn(codedVSliceType, kv.NewCodec(idT.Type(), pcol.codec.KType.T)),
+			combined,
+			beam.TypeDefinition{Var: beam.WType, T: idT.Type()}) // PCollection<ID, kv.Pair{K,[]codedV}>, where codedV=[]byte
+
+		// Second, do cross-partition contribution bounding.
+		decoded = boundContributions(s, decoded, params.MaxPartitionsContributed)
+
+		rekeyed = beam.ParDo(
+			s,
+			newEncodeIDKFn(idT, kv.NewCodec(pcol.codec.KType.T, codedVSliceType.Type())),
+			decoded,
+			beam.TypeDefinition{Var: beam.VType, T: codedVSliceType.Type()}) // PCollection<kv.Pair{ID,K}, []codedV>, where codedV=[]byte
+
+		flattened := beam.ParDo(s, flattenValuesFn, rekeyed) // PCollection<kv.Pair{ID,K}, codedV>, where codedV=[]byte
+
+		pcol.col = beam.ParDo(
+			s,
+			newEncodeKVFn(kv.NewCodec(idT.Type(), pcol.codec.KType.T)),
+			flattened,
+			beam.TypeDefinition{Var: beam.WType, T: idT.Type()}) // PCollection<ID, kv.Pair{K,V}>
+	}
+
+	// Perform partition selection.
+	// We do partition selection after cross-partition contribution bounding because
+	// we want to keep the same contributions across partitions for partition selection
+	// and Count.
 	noiseEpsilon, partitionSelectionEpsilon, noiseDelta, partitionSelectionDelta := splitBudget(epsilon, delta, noiseKind)
 	partitions := SelectPartitions(s, pcol, SelectPartitionsParams{Epsilon: partitionSelectionEpsilon, Delta: partitionSelectionDelta, MaxPartitionsContributed: params.MaxPartitionsContributed})
 
-	// Deduplicate (partitionKey,value) pairs across users.
-	rekeyed := beam.SwapKV(s, pcol.col) // PCollection<kv.Pair{K,V}, ID>.
-	// Only keep one privacyKey per (partitionKey,value) pair.
-	sampled := boundContributions(s, rekeyed, 1)
+	// Keep only one privacyKey per (partitionKey, value) pair
+	// (i.e. remove duplicate values for each partition).
+	swapped := beam.SwapKV(s, pcol.col) // PCollection<kv.Pair{K,V}, ID>
+	sampled := boundContributions(s, swapped, 1)
 
 	// Drop V's, each <privacyKey, partitionKey> pair now corresponds to a unique V.
 	sampled = beam.SwapKV(s, sampled) // PCollection<ID, kv.Pair{K,V}>.
diff --git a/privacy-on-beam/pbeam/distinct_per_key_test.go b/privacy-on-beam/pbeam/distinct_per_key_test.go
index 73f915a..0ad6693 100644
--- a/privacy-on-beam/pbeam/distinct_per_key_test.go
+++ b/privacy-on-beam/pbeam/distinct_per_key_test.go
@@ -30,11 +30,15 @@
 // are correctly counted (without duplicates).
 func TestDistinctPrivacyKeyNoNoise(t *testing.T) {
 	var triples []testutils.TripleWithIntValue
-	for i := 0; i < 100; i++ { // Add 400 values of which 200 are distinct to Partition 0.
+	for i := 0; i < 100; i++ { // Add 200 distinct values to Partition 0.
 		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
 		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: 100 + i})
-		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})       // Duplicate each value. Should be discarded by DistinctPerKey.
-		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: 100 + i}) // Duplicate each value. Should be discarded by DistinctPerKey.
+	}
+	for i := 100; i < 200; i++ { // Add 200 additional values, all of which are duplicates of the existing distinct values, to Partition 0.
+		// The duplicates come from users different from the 100 users above in order to not discard
+		// any distinct values during the initial per-partition contribution bounding step.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i - 100}) // Duplicate. Should be discarded by DistinctPerKey.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})       // Duplicate. Should be discarded by DistinctPerKey.
 	}
 	for i := 0; i < 50; i++ { // Add 200 values of which 100 are distinct to Partition 1.
 		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 1, Value: i})
@@ -183,6 +187,49 @@
 	}
 }
 
+// Checks that DistinctPrivacyKey bounds cross-partition contributions before doing deduplication of
+// values. This is to ensure we don't run into a contribution bounding-related privacy bug in some
+// rare cases.
+func TestDistinctPrivacyKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication(t *testing.T) {
+	var triples []testutils.TripleWithIntValue
+	for i := 0; i < 100; i++ { // Each user i contributes value=1 to partition i.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: i, Value: 1})
+	}
+	for i := 0; i < 100; i++ { // Add a user that contributes value=1 to all 100 partitions.
+		triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: i, Value: 1})
+	}
+	// Assume cross-partition contribution bounding is not done before deduplication of values.
+	// Each value=1 in each of the i ∈ {0, ..., 99} partitions would have two users associated
+	// with it: user with ID=i and user with ID=100. We pick one of these two users randomly,
+	// so in expectation about 50 of 100 partitions' deduplicated values would have user with id=100
+	// so in expectation about 50 of 100 partitions' deduplicated values would have user with ID=100
+	// left with around 50 partitions with a single distinct value each and the test would fail.
+	result := []testutils.TestInt64Metric{}
+	for i := 0; i < 100; i++ {
+		result = append(result, testutils.TestInt64Metric{i, 1})
+	}
+	p, s, col, want := ptest.CreateList2(triples, result)
+	col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
+
+	// ε=50, δ=1-10⁻¹⁵ and l1Sensitivity=1 gives a threshold of ≈2.
+	// However, since δ is very large, a partition with a single user
+	// is kept with probability almost 1.
+	// We have 100 partitions. So, to get an overall flakiness of 10⁻²³,
+	// we can have each partition fail with probability 10⁻²⁵ (k=25).
+	epsilon, delta, k, l1Sensitivity := 50.0, 1-1e-15, 25.0, 1.0
+	// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
+	pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
+	pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
+	got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 1, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 1})
+	want = beam.ParDo(s, testutils.Int64MetricToKV, want)
+	if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
+		t.Fatalf("TestDistinctPrivacyKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication: %v", err)
+	}
+	if err := ptest.Run(p); err != nil {
+		t.Errorf("TestDistinctPrivacyKeyCrossPartitionContributionBounding_IsAppliedBeforeDeduplication: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
+	}
+}
+
 // Checks that DistinctPrivacyKey bounds per-partition contributions correctly.
 func TestDistinctPrivacyKeyPerPartitionContributionBounding(t *testing.T) {
 	var triples []testutils.TripleWithIntValue
@@ -218,8 +265,6 @@
 	// ε=50, δ=10⁻¹⁰⁰ and l1Sensitivity=6 gives a threshold of ≈33.
 	// We have 3 partitions. So, to get an overall flakiness of 10⁻²³,
 	// we can have each partition fail with 1-10⁻²⁵ probability (k=25).
-	// To see the logic and the math behind flakiness and tolerance calculation,
-	// See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf.
 	epsilon, delta, k, l1Sensitivity := 50.0, 1e-100, 25.0, 6.0
 	// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
 	pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
@@ -227,10 +272,50 @@
 	got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 3, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 2})
 	want = beam.ParDo(s, testutils.Int64MetricToKV, want)
 	if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
-		t.Fatalf("TestDistinctPerKeyNoNoise: %v", err)
+		t.Fatalf("TestDistinctPrivacyKeyPerPartitionContributionBounding: %v", err)
 	}
 	if err := ptest.Run(p); err != nil {
-		t.Errorf("TestDistinctPerKeyNoNoise: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
+		t.Errorf("TestDistinctPrivacyKeyPerPartitionContributionBounding: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
+	}
+}
+
+// Checks that DistinctPrivacyKey bounds per-partition contributions before doing deduplication of
+// values. This is to ensure we don't run into a contribution bounding-related privacy bug in some
+// rare cases.
+func TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication(t *testing.T) {
+	var triples []testutils.TripleWithIntValue
+	for i := 0; i < 100; i++ { // Add 100 distinct values to Partition 0.
+		triples = append(triples, testutils.TripleWithIntValue{ID: i, Partition: 0, Value: i})
+	}
+	for i := 0; i < 100; i++ { // Add a user that contributes all these 100 distinct values to Partition 0.
+		triples = append(triples, testutils.TripleWithIntValue{ID: 100, Partition: 0, Value: i})
+	}
+	// Assume per-partition contribution bounding is not done before deduplication of values.
+	// Each value i ∈ {0, ..., 99} would have two users associated with it: user with ID=i and
+	// user with ID=100. We pick one of these two users randomly, so in expectation about 50
+	// of 100 deduplicated values would have user with ID=100 associated with them. After
+	// per-partition contribution bounding happens, we would be left with around 50 distinct
+	// values and the test would fail.
+	result := []testutils.TestInt64Metric{
+		{0, 100},
+	}
+	p, s, col, want := ptest.CreateList2(triples, result)
+	col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
+
+	// ε=50, δ=10⁻¹⁰⁰ and l1Sensitivity=1 gives a threshold of ≈6.
+	// We have 1 partition. So, to get an overall flakiness of 10⁻²³,
+	// we need to have each partition pass with 1-10⁻²³ probability (k=23).
+	epsilon, delta, k, l1Sensitivity := 50.0, 1e-100, 23.0, 1.0
+	// ε is split by 2 for noise and for partition selection, so we use 2*ε to get a Laplace noise with ε.
+	pcol := MakePrivate(s, col, NewPrivacySpec(2*epsilon, delta))
+	pcol = ParDo(s, testutils.TripleWithIntValueToKV, pcol)
+	got := DistinctPerKey(s, pcol, DistinctPerKeyParams{MaxPartitionsContributed: 1, NoiseKind: LaplaceNoise{}, MaxContributionsPerPartition: 1})
+	want = beam.ParDo(s, testutils.Int64MetricToKV, want)
+	if err := testutils.ApproxEqualsKVInt64(s, got, want, testutils.LaplaceTolerance(k, l1Sensitivity, epsilon)); err != nil {
+		t.Fatalf("TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication: %v", err)
+	}
+	if err := ptest.Run(p); err != nil {
+		t.Errorf("TestDistinctPrivacyKeyPerPartitionContributionBounding_IsAppliedBeforeDeduplication: DistinctPerKey(%v) = %v, expected %v: %v", col, got, want, err)
 	}
 }
 
diff --git a/privacy-on-beam/pbeam/mean.go b/privacy-on-beam/pbeam/mean.go
index f38ee22..58674e4 100644
--- a/privacy-on-beam/pbeam/mean.go
+++ b/privacy-on-beam/pbeam/mean.go
@@ -168,7 +168,7 @@
 	// Combine all values for <id, partition> into a slice.
 	// Result is PCollection<kv.Pair{ID,K},[]float64>.
 	combined := beam.CombinePerKey(s,
-		&expandValuesCombineFn{},
+		&expandFloat64ValuesCombineFn{},
 		converted)
 
 	// Result is PCollection<ID, pairArrayFloat64>.
diff --git a/privacy-on-beam/pbeam/quantiles.go b/privacy-on-beam/pbeam/quantiles.go
index e6ce9ed..ceeae23 100644
--- a/privacy-on-beam/pbeam/quantiles.go
+++ b/privacy-on-beam/pbeam/quantiles.go
@@ -184,7 +184,7 @@
 	// Combine all values for <id, partition> into a slice.
 	// Result is PCollection<kv.Pair{ID,K},[]float64>.
 	combined := beam.CombinePerKey(s,
-		&expandValuesCombineFn{},
+		&expandFloat64ValuesCombineFn{},
 		converted)
 
 	// Result is PCollection<ID, pairArrayFloat64>.