blob: be5cfdfce599c512e4cece2f7dbc4cddf1fcfbc7 [file] [log] [blame]
//
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeamtest
import (
"testing"
"github.com/google/differential-privacy/privacy-on-beam/v2/pbeam"
"github.com/google/differential-privacy/privacy-on-beam/v2/pbeam/testutils"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
func TestMain(m *testing.M) {
ptest.Main(m)
}
const (
// Low ε & δ (i.e. high noise) ensures that we would add noise if test PrivacySpec does not work as intended.
tinyEpsilon = 1e-10
tinyDelta = 1e-200
// Zero δ is used when public partitions are specified.
zeroDelta = 0.0
)
// Tests that DistinctPrivacyID bounds per-partition and cross-partition contributions
// correctly, adds no noise and keeps all partitions in test mode.
func TestDistinctPrivacyIDTestMode(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes to 10 partitions, which implies that count of each
// partition is 1. With a max contribution of 3, 7 partitions should be dropped. The sum
// of all counts must then be 3. This also ensures that no partitions (each with a single
// privacy id) gets thresholded.
want: 3,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes to 10 partitions, which implies that count of each
// partition is 1. Contribution bounding is disabled. The sum of all counts must then be 10.
// This also ensures that no partitions (each with a single privacy id) gets thresholded.
want: 10,
},
} {
// pairs{privacy_id, partition_key} contains {0,0}, {0,0}, {0,1}, {0,1}, {0,2}, {0,2}, …, {0,9}, {0,9}.
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...) // Duplicate contributions should be dropped for DistinctPrivacyID.
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(pairs, wantMetric)
col = beam.ParDo(s, testutils.PairToKV, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.DistinctPrivacyID(s, pcol, pbeam.DistinctPrivacyIDParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
NoiseKind: pbeam.LaplaceNoise{}})
counts := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, counts)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("DistinctPrivacyID: %s did not bound contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that DistinctPrivacyID with public partitions bounds per-partition and cross-partition
// contributions correctly, adds no noise and respects public partitions (keeps only public partitions)
// in test mode.
func TestDistinctPrivacyIDWithPartitionsTestMode(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes to 10 partitions, which implies that count of each
// partition is 1. With a max contribution of 3, 2 out of 5 public partitions should be
// dropped. The sum of all counts must then be 3.
want: 3,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes to 10 partitions, which implies that count of each
// partition is 1. Contribution bounding is disabled and 5 out of 10 partitions are
// specified as public partitions. The sum of all counts must then be 5.
want: 5,
},
} {
// pairs{privacy_id, partition_key} contains {0,0}, {0,0}, {0,1}, {0,1}, {0,2}, {0,2}, …, {0,9}, {0,9}.
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...) // Duplicate contributions should be dropped for DistinctPrivacyID.
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(pairs, wantMetric)
col = beam.ParDo(s, testutils.PairToKV, col)
partitions := []int{0, 1, 2, 3, 4}
// Create partition PCollection.
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.DistinctPrivacyID(s, pcol, pbeam.DistinctPrivacyIDParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
counts := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, counts)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("DistinctPrivacyID: %s with partitions did not bound contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that DistinctPrivacyID with public partitions adds empty partitions not found in the data
// but are in the list of public partitions in test mode.
func TestDistinctPrivacyIDWithPartitionsTestModeAddsEmptyPartitions(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
},
} {
// pairs{privacy_id, partition_key} contains {0,0}, {0,1}, {0,2}, …, {0,9}.
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
pairs = append(pairs, testutils.PairII{1, i})
}
wantMetric := []testutils.TestInt64Metric{
{9, 1}, // Keep partition 9.
{10, 0}, // Add partition 10.
}
p, s, col, want := ptest.CreateList2(pairs, wantMetric)
col = beam.ParDo(s, testutils.PairToKV, col)
partitions := []int{9, 10}
// Create partition PCollection.
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.DistinctPrivacyID(s, pcol, pbeam.DistinctPrivacyIDParams{
MaxPartitionsContributed: 1,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("DistinctPrivacyID: %s with partitions did not add empty partitions: %v", tc.desc, err)
}
}
}
// Tests that Count bounds per-partition and cross-partition contributions correctly,
// adds no noise and keeps all partitions in test mode.
func TestCountTestMode(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
maxValue int64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
maxValue: 2,
// The same privacy ID contributes twice (third contribution is dropped due per-partition
// contribution bounding) to 10 partitions, which implies that count of each partition is 2.
// With a max contribution of 3, 7 partitions should be dropped. The sum of all counts must
// then be 6. This also ensures that no partitions (each with a single privacy id) gets
// thresholded.
want: 6,
},
{
desc: "test mode without contribution bounding",
maxPartitionsContributed: 3,
maxValue: 2,
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
// The same privacy ID contributes thrice to 10 partitions, which implies that count of each
// partition is 3. Contribution bounding is disabled. The sum of all counts must then be 30.
// This also ensures that no partitions (each with a single privacy id) gets thresholded.
want: 30,
},
} {
// pairs{privacy_id, partition_key} contains {0,0}, {0,0}, {0,0}, {0,1}, {0,1}, {0,1}, {0,2}, {0,2}, {0,2}, …, {0,9}, {0,9}, {0,9}.
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
// When contribution bounding is enabled, one of the three contributions should be dropped since MaxValue is 2.
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(pairs, wantMetric)
col = beam.ParDo(s, testutils.PairToKV, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.Count(s, pcol, pbeam.CountParams{
MaxValue: tc.maxValue,
MaxPartitionsContributed: tc.maxPartitionsContributed,
NoiseKind: pbeam.LaplaceNoise{}})
counts := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, counts)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("Count: %s did not bound contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that Count with public partitions bounds per-partition and cross-partition contributions
// correctly, adds no noise and respects public partitions (keeps only public partitions) in test
// mode.
func TestCountWithPartitionsTestMode(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
maxValue int64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
maxValue: 2,
// The same privacy ID contributes twice (third contribution is dropped due per-partition
// contribution bounding) to 10 partitions, which implies that count of each partition is 2.
// With a max contribution of 3, 2 out of 5 public partitions should be dropped. The sum of
// all counts must then be 6.
want: 6,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
maxValue: 2,
// The same privacy ID contributes thrice to 10 partitions, which implies that count of each
// partition is 3. Contribution bounding is disabled and 5 out of 10 partitions are specified
// as public partitions. The sum of all counts must then be 15.
want: 15,
},
} {
// pairs{privacy_id, partition_key} contains {0,0}, {0,0}, {0,0}, {0,1}, {0,1}, {0,1}, {0,2}, {0,2}, {0,2}, …, {0,9}, {0,9}, {0,9}.
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
// When contribution bounding is enabled, one of the three contributions should be dropped since MaxValue is 2.
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(pairs, wantMetric)
col = beam.ParDo(s, testutils.PairToKV, col)
partitions := []int{0, 1, 2, 3, 4}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.Count(s, pcol, pbeam.CountParams{
MaxValue: tc.maxValue,
MaxPartitionsContributed: tc.maxPartitionsContributed,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
counts := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, counts)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("Count: %s with partitions did not bound contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that Count with public partitions adds empty partitions not found in the data
// but are in the list of public partitions in test mode.
func TestCountWithPartitionsTestModeAddsEmptyPartitions(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
},
} {
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
pairs = append(pairs, testutils.PairII{1, i})
}
wantMetric := []testutils.TestInt64Metric{
{9, 1}, // Keep partition 9.
{10, 0}, // Add partition 10.
}
p, s, col, want := ptest.CreateList2(pairs, wantMetric)
col = beam.ParDo(s, testutils.PairToKV, col)
partitions := []int{9, 10}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.Count(s, pcol, pbeam.CountParams{
MaxValue: 2,
MaxPartitionsContributed: 1,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("Count: %s with partitions did not add empty partitions: %v", tc.desc, err)
}
}
}
// Tests that SumPerKey bounds per-partition and cross-partition contributions correctly,
// adds no noise and keeps all partitions in test mode with ints.
func TestSumPerKeyTestModeInt(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
minValue float64
maxValue float64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "1" ("2" is clamped to "1" since MaxValue is 1) to 10
// partitions, which implies that sum of each partition is 1. With a max contribution of 3,
// 7 partitions should be dropped. The sum of all sum must then be 3. This also ensures that
// no partitions (each with a single privacy id) gets thresholded.
want: 3,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "2" to 10 partitions, which implies that sum of each
// partition is 2. Contribution bounding is disabled. The sum of all sums must then be 20.
// This also ensures that no partitions (each with a single privacy id) gets thresholded.
want: 20,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,0,1}, {0,1,1}, {0,0,1}, {0,2,1}, {0,2,1}, …, {0,9,1}, {0,9,1}.
var triples []testutils.TripleWithIntValue
for i := 0; i < 10; i++ {
// The sum total contributions per user per partition is 2. When contribution bounding is
// enabled, this will be clamped to 1 since MaxValue is 1.
triples = append(triples, testutils.MakeSampleTripleWithIntValue(1, i)...)
triples = append(triples, testutils.MakeSampleTripleWithIntValue(1, i)...)
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := pbeam.SumPerKey(s, pcol, pbeam.SumParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
NoiseKind: pbeam.LaplaceNoise{}})
sums := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, sums)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("SumPerKey: %s did not bound contributions correctly for ints: %v", tc.desc, err)
}
}
}
// Tests that SumPerKey with public partitions bounds per-partition and cross-partition contributions
// correctly, adds no noise and respects public partitions (keeps only public partitions) in test
// mode with ints.
func TestSumPerKeyWithPartitionsTestModeInt(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
minValue float64
maxValue float64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "1" ("2" is clamped to "1" since MaxValue is 1) to 10
// partitions, which implies that sum of each partition is 1. With a max contribution of 3,
// 2 out of 5 public partitions should be dropped. The sum of all sums must then be 3.
want: 3,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "2" to 10 partitions, which implies that sum of each
// partition is 2. Contribution bounding is disabled and 5 out of 10 partitions are specified
// as public partitions. The sum of all sums must then be 10.
want: 10,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,0,1}, {0,1,1}, {0,0,1}, {0,2,1}, {0,2,1}, …, {0,9,1}, {0,9,1}.
var triples []testutils.TripleWithIntValue
for i := 0; i < 10; i++ {
// The sum total contributions per user per partition is 2. When contribution bounding is
// enabled, this will be clamped to 1 since MaxValue is 1.
triples = append(triples, testutils.MakeSampleTripleWithIntValue(1, i)...)
triples = append(triples, testutils.MakeSampleTripleWithIntValue(1, i)...)
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
partitions := []int{0, 1, 2, 3, 4}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := pbeam.SumPerKey(s, pcol, pbeam.SumParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
sums := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, sums)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("SumPerKey: %s with partitions did not bound contributions correctly for ints: %v", tc.desc, err)
}
}
}
// Tests that SumPerKey with public partitions adds empty partitions not found in the
// data but are in the list of public partitions in test mode with ints.
func TestSumPerKeyWithPartitionsTestModeAddsEmptyPartitionsInt(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,1,1}, {0,2,1}, …, {0,9,1}.
var triples []testutils.TripleWithIntValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.MakeSampleTripleWithIntValue(1, i)...)
}
wantMetric := []testutils.TestInt64Metric{
{9, 1}, // Keep partition 9.
{10, 0}, // Add partition 10.
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
partitions := []int{9, 10}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := pbeam.SumPerKey(s, pcol, pbeam.SumParams{
MaxPartitionsContributed: 1,
MinValue: 0,
MaxValue: 1,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("SumPerKey: %s with partitions did not add empty partitions for ints: %v", tc.desc, err)
}
}
}
// Tests that SumPerKey bounds per-partition and cross-partition contributions correctly,
// adds no noise and keeps all partitions in test mode with floats.
func TestSumPerKeyTestModeFloat(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "1.0" ("2.0" is clamped to "1.0" since MaxValue is 1) to 10
// partitions, which implies that sum of each partition is 1.0. With a max contribution of 3,
// 7 partitions should be dropped. The sum of all sum must then be 3.0. This also ensures that
// no partitions (each with a single privacy id) gets thresholded.
want: 3.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "2.0" to 10 partitions, which implies that sum of each
// partition is 2.0. Contribution bounding is disabled. The sum of all sums must then be 20.0.
// This also ensures that no partitions (each with a single privacy id) gets thresholded.
want: 20.0,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,0,1}, {0,1,1}, {0,0,1}, {0,2,1}, {0,2,1}, …, {0,9,1}, {0,9,1}.
var triples []testutils.TripleWithFloatValue
for i := 0; i < 10; i++ {
// The sum total contributions per user per partition is 2.0. When contribution bounding is
// enabled, this will be clamped to 1.0 since MaxValue is 1.
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
}
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.SumPerKey(s, pcol, pbeam.SumParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
NoiseKind: pbeam.LaplaceNoise{}})
sums := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, sums)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.EqualsKVFloat64(s, got, want); err != nil {
t.Fatalf("EqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("SumPerKey: %s did not bound contributions correctly for floats: %v", tc.desc, err)
}
}
}
// Tests that SumPerKey with public partitions bounds per-partition and cross-partition contributions
// correctly, adds no noise and respects public partitions (keeps only public partitions) in test
// mode with floats.
func TestSumPerKeyWithPartitionsTestModeFloat(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "1.0" ("2.0" is clamped to "1.0" since MaxValue is 1) to 10
// partitions, which implies that sum of each partition is 1.0. With a max contribution of 3,
// 2 out of 5 public partitions should be dropped. The sum of all sums must then be 3.0.
want: 3.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
minValue: 0.0,
maxValue: 1.0,
// The same privacy ID contributes "2.0" to 10 partitions, which implies that sum of each
// partition is 2. Contribution bounding is disabled and 5 out of 10 partitions are specified
// as public partitions. The sum of all sums must then be 10.0.
want: 10.0,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,0,1}, {0,1,1}, {0,0,1}, {0,2,1}, {0,2,1}, …, {0,9,1}, {0,9,1}.
var triples []testutils.TripleWithFloatValue
for i := 0; i < 10; i++ {
// The sum total contributions per user per partition is 2.0. When contribution bounding is
// enabled, this will be clamped to 1.0 since MaxValue is 1.
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
}
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
partitions := []int{0, 1, 2, 3, 4}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.SumPerKey(s, pcol, pbeam.SumParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
sums := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, sums)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.EqualsKVFloat64(s, got, want); err != nil {
t.Fatalf("EqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("SumPerKey: %s with partitions did not bound contributions correctly for floats: %v", tc.desc, err)
}
}
}
// Tests that SumPerKey with public partitions adds empty partitions not found in the
// data but are in the list of public partitions in test mode with floats.
func TestSumPerKeyWithPartitionsTestModeAddsEmptyPartitionsFloat(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,1,1}, {0,2,1}, …, {0,9,1}.
var triples []testutils.TripleWithFloatValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
}
wantMetric := []testutils.TestFloat64Metric{
{9, 1.0}, // Keep partition 9.
{10, 0.0}, // Add partition 10.
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
partitions := []int{9, 10}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.SumPerKey(s, pcol, pbeam.SumParams{MaxPartitionsContributed: 1,
MinValue: 0,
MaxValue: 1,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.EqualsKVFloat64(s, got, want); err != nil {
t.Fatalf("EqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("SumPerKey: %s with partitions did not add empty partitions for floats: %v", tc.desc, err)
}
}
}
// Tests that MeanPerKey bounds cross-partition contributions correctly, adds no noise
// and keeps all partitions in test mode.
func TestMeanPerKeyTestModeCrossPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes "1.0" to 10 partitions, which implies that mean of each
// partition is 1.0. With a max contribution of 3, 7 partitions should be dropped. The sum
// of all means must then be 3.0. This also ensures that no partitions (each with a single
// privacy id) gets thresholded.
want: 3.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes "1.0" to 10 partitions, which implies that mean of each
// partition is 1.0. Cross-partition contribution bounding is disabled. The sum of all means
// must then be 10.0. This also ensures that no partitions (each with a single privacy id)
// gets thresholded.
want: 10.0,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,1,1}, {0,2,1}, …, {0,9,1}.
var triples []testutils.TripleWithFloatValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
}
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.MeanPerKey(s, pcol, pbeam.MeanParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
MaxContributionsPerPartition: 1,
MinValue: 0,
MaxValue: 1,
NoiseKind: pbeam.LaplaceNoise{}})
means := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, means)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.EqualsKVFloat64(s, got, want); err != nil {
t.Fatalf("EqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("MeanPerKey: %s did not do cross-partition contribution bounding correctly: %v", tc.desc, err)
}
}
}
// Tests that MeanPerKey bounds per-partition contributions correctly, adds no noise
// and keeps all partitions in test mode.
func TestMeanPerKeyTestModePerPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionPerPartition int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 50.0,
// MaxContributionsPerPartition = 1, but id = 0 contributes 3 times to partition 0.
// There will be a per-partition contribution bounding stage.
// In this stage the algorithm will arbitrarily chose one of these 3 contributions.
// The mean should be equal to 50/50 = 1.0 (not 150/52 ≈ 2.88, if no per-partition contribution bounding is done).
want: 1.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 50.0,
// There will not be a per-partition contribution bounding stage.
// The mean should be equal to 150/52 = 2.88461538462.
want: 2.88461538462,
},
} {
var triples []testutils.TripleWithFloatValue
// triples{privacy_id, partition_key, value} contains {0,0,50}, {0,0,50}, {0,0,50}, {1,0,0}, {2,0,0},…, {49,0,0}.
triples = append(triples, testutils.MakeTripleWithFloatValue(1, 0, 50)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(1, 0, 50)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(1, 0, 50)...)
triples = append(triples, testutils.MakeTripleWithFloatValueStartingFromKey(1, 49, 0, 0)...)
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.MeanPerKey(s, pcol, pbeam.MeanParams{
MaxPartitionsContributed: 3,
MaxContributionsPerPartition: tc.maxContributionPerPartition,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
NoiseKind: pbeam.LaplaceNoise{}})
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
tolerance := 1e-10 // Using a small tolerance to make up for the rounding errors due to summation & division.
if err := testutils.ApproxEqualsKVFloat64(s, got, want, tolerance); err != nil {
t.Fatalf("ApproxEqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("MeanPerKey: %s did not do per-partition contribution bounding correctly: %v", tc.desc, err)
}
}
}
// Tests that MeanPerKey with public partitions bounds cross-partition contributions correctly,
// adds no noise and respects public partitions (keeps only public partitions) in test mode.
func TestMeanPerKeyWithPartitionsTestModeCrossPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes "1.0" to 10 partitions, which implies that mean of each
// partition is 1.0. With a max contribution of 3, 2 out of 5 public partitions should be
// dropped. The sum of all means must then be 3.0.
want: 3.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes "1.0" to 10 partitions, which implies that mean of each
// partition is 1.0. Cross-partition contribution bounding is disabled and 5 out of 10 partitions
// are specified as public partitions. The sum of all means must then be 5.0.
want: 5.0,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,1,1}, {0,2,1},…, {0,9,1}.
var triples []testutils.TripleWithFloatValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.MakeSampleTripleWithFloatValue(1, i)...)
}
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
partitions := []int{0, 1, 2, 3, 4}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
// Setting MinValue to -1 and MaxValue to 1 so that empty partitions have a mean of 0.
got := pbeam.MeanPerKey(s, pcol, pbeam.MeanParams{MaxPartitionsContributed: tc.maxPartitionsContributed,
MaxContributionsPerPartition: 1,
MinValue: -1,
MaxValue: 1,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
means := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, means)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.EqualsKVFloat64(s, got, want); err != nil {
t.Fatalf("EqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("MeanPerKey: %s with partitions did not do cross-partition contribution bounding correctly: %v", tc.desc, err)
}
}
}
// Tests that MeanPerKey with public partitions bounds per-partition contributions correctly,
// adds no noise and respects public partitions (keeps public partitions and adds empty
// partitions) in test mode.
func TestMeanPerKeyWithPartitionsTestModePerPartitionContributionBoundingAddsEmptyPartitions(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionPerPartition int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 50.0,
// MaxContributionsPerPartition = 1, but id = 0 contributes 3 times to partition 0.
// There will be a per-partition contribution bounding stage.
// In this stage the algorithm will arbitrarily chose one of these 3 contributions.
// The mean should be equal to 50/50 = 1.0 (not 150/52 ≈ 2.88, if no per-partition contribution bounding is done).
want: 1.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 50.0,
// There will not be a per-partition contribution bounding stage.
// The mean should be equal to 150/52 = 2.88461538462.
want: 2.88461538462,
},
} {
var triples []testutils.TripleWithFloatValue
// triples{privacy_id, partition_key, value} contains {0,0,50}, {0,0,50}, {0,0,50}, {1,0,0}, {2,0,0},…, {49,0,0}.
triples = append(triples, testutils.MakeTripleWithFloatValue(1, 0, 50)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(1, 0, 50)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(1, 0, 50)...)
triples = append(triples, testutils.MakeTripleWithFloatValueStartingFromKey(1, 49, 0, 0)...)
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
{1, 25.0}, // Empty partition (output is midpoint of MinValue and MaxValue).
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
partitions := []int{0, 1}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.MeanPerKey(s, pcol, pbeam.MeanParams{
MaxPartitionsContributed: 1,
MaxContributionsPerPartition: tc.maxContributionPerPartition,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
NoiseKind: pbeam.LaplaceNoise{},
PublicPartitions: publicPartitions})
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
tolerance := 1e-10 // Using a small tolerance to make up for the rounding errors due to summation & division.
if err := testutils.ApproxEqualsKVFloat64(s, got, want, tolerance); err != nil {
t.Fatalf("EqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("MeanPerKey: %s with partitions did not do per-partition contribution bounding correctly or added empty partitions: %v", tc.desc, err)
}
}
}
// Tests that QuantilesPerKey bounds cross-partitions contributions correctly, adds no
// noise and keeps all partitions in test mode.
func TestQuantilesPerKeyTestModeCrossPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionsPerPartition int64
maxPartitionsContributed int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxContributionsPerPartition: 20,
maxPartitionsContributed: 1,
minValue: 0.0,
maxValue: 1.0,
// 10 distinct privacy IDs contribute 0.0 to partition 0 and another 10 distinct
// privacy IDs contribute 0.0 to partition 1. A single privacy ID (different from
// these 20 privacy IDs) then contributes 20 "1.0"s to both partition 0 and partition 1.
// MaxPartitionsContributed is 1, so contributions to only one of these partitions will
// be kept. The median (rank=0.50) of one of these partitions must then be 0.0 and the other
// 1.0. The sum of these medians must then equal 1.0 (as opposed to 2.0 if no contribution
// bounding takes place). This also ensures that no partitions get thresholded.
want: 1.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxContributionsPerPartition: 20,
maxPartitionsContributed: 1,
minValue: 0.0,
maxValue: 1.0,
// 10 distinct privacy IDs contribute 0.0 to partition 0 and another 10 distinct
// privacy IDs contribute 0.0 to partition 1. A single privacy ID (different from
// these 20 privacy IDs) then contributes 20 "1.0"s to both partition 0 and partition 1.
// Cross-partition contribution bounding is disabled, so the median (rank=0.50) of both of
// these partitions must then be 1.0. The sum of these medians must then equal 2.0.
want: 2.0,
},
} {
triples := testutils.ConcatenateTriplesWithFloatValue(
testutils.MakeTripleWithFloatValue(10, 0, 0.0),
testutils.MakeTripleWithFloatValueStartingFromKey(10, 10, 1, 0.0))
for i := 0; i < 20; i++ {
triples = append(triples, testutils.TripleWithFloatValue{ID: 20, Partition: 0, Value: 1.0})
triples = append(triples, testutils.TripleWithFloatValue{ID: 20, Partition: 1, Value: 1.0})
}
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.QuantilesPerKey(s, pcol, pbeam.QuantilesParams{
MaxContributionsPerPartition: tc.maxContributionsPerPartition,
MaxPartitionsContributed: tc.maxPartitionsContributed,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
Ranks: []float64{0.50},
NoiseKind: pbeam.LaplaceNoise{}})
got = beam.ParDo(s, testutils.DereferenceFloat64Slice, got)
medians := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, medians)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
// Tolerance is multiplied by 2 because we sum over 2 partitions.
tolerance := QuantilesTolerance(tc.minValue, tc.maxValue) * 2
if err := testutils.ApproxEqualsKVFloat64(s, got, want, tolerance); err != nil {
t.Fatalf("ApproxEqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("QuantilesPerKey: %s did not bound cross-partition contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that QuantilesPerKey with partitions bounds cross-partitions contributions correctly
// and adds no noise in test mode.
func TestQuantilesPerKeyWithPartitionsTestModeCrossPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionsPerPartition int64
maxPartitionsContributed int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxContributionsPerPartition: 20,
maxPartitionsContributed: 1,
minValue: 0.0,
maxValue: 1.0,
// 10 distinct privacy IDs contribute 0.0 to partition 0 and another 10 distinct
// privacy IDs contribute 0.0 to partition 1. A single privacy ID (different from
// these 20 privacy IDs) then contributes 20 "1.0"s to both partition 0 and partition 1.
// MaxPartitionsContributed is 1, so contributions to only one of these partitions will
// be kept. The median (rank=0.50) of one of these partitions must then be 0.0 and the other
// 1.0. The sum of these medians must then equal 1.0 (as opposed to 2.0 if no contribution
// bounding takes place).
want: 1.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxContributionsPerPartition: 20,
maxPartitionsContributed: 1,
minValue: 0.0,
maxValue: 1.0,
// 10 distinct privacy IDs contribute 0.0 to partition 0 and another 10 distinct
// privacy IDs contribute 0.0 to partition 1. A single privacy ID (different from
// these 20 privacy IDs) then contributes 20 "1.0"s to both partition 0 and partition 1.
// Cross-partition contribution bounding is disabled, so the median (rank=0.50) of both of
// these partitions must then be 1.0. The sum of these medians must then equal 2.0.
want: 2.0,
},
} {
triples := testutils.ConcatenateTriplesWithFloatValue(
testutils.MakeTripleWithFloatValue(10, 0, 0.0),
testutils.MakeTripleWithFloatValueStartingFromKey(10, 10, 1, 0.0))
for i := 0; i < 20; i++ {
triples = append(triples, testutils.TripleWithFloatValue{ID: 200, Partition: 0, Value: 1.0})
triples = append(triples, testutils.TripleWithFloatValue{ID: 200, Partition: 1, Value: 1.0})
}
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
partitions := []int{0, 1}
publicPartitions := beam.CreateList(s, partitions)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.QuantilesPerKey(s, pcol, pbeam.QuantilesParams{
MaxContributionsPerPartition: tc.maxContributionsPerPartition,
MaxPartitionsContributed: tc.maxPartitionsContributed,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
Ranks: []float64{0.50},
PublicPartitions: publicPartitions,
NoiseKind: pbeam.LaplaceNoise{}})
got = beam.ParDo(s, testutils.DereferenceFloat64Slice, got)
medians := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, medians)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
// Tolerance is multiplied by 2 because we sum over 2 partitions.
tolerance := QuantilesTolerance(tc.minValue, tc.maxValue) * 2
if err := testutils.ApproxEqualsKVFloat64(s, got, want, tolerance); err != nil {
t.Fatalf("ApproxEqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("QuantilesPerKey: %s with partitions did not bound cross-partition contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that QuantilesPerKey bounds per-partition contributions correctly, adds no noise
// and keeps all partitions in test mode.
func TestQuantilesPerKeyTestModePerPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionPerPartition int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 1.0,
// First 50 privacy IDs contribute "0.0" 3 times to partition 0 and the next 50 privacy IDs
// contribute "1.0" to the same partition.
// There will be a per-partition contribution bounding stage. MaxContributionsPerPartition=1, so
// the algorithm will arbitrarily keep one of these 3 contributions for the first 50 privacy IDs.
// There will be equal number of "0.0"s and "1.0", so rank=0.6 should be equal to 1.0 (not 0.0,
// if no per-partition contribution bounding is done)
want: 1.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 1.0,
// First 50 privacy IDs contribute "0.0" 3 times to partition 0 and the next 50 privacy IDs
// contribute "1.0" to the same partition.
// There will not be a per-partition contribution bounding stage, meaning that there will be 150
// "0.0"s and 50 "1.0"s. rank=0.6 should be equal to 0.0.
want: 0.0,
},
} {
var triples []testutils.TripleWithFloatValue
// triples{privacy_id, partition_key, value} contains {0,0,0}, {0,0,0}, {0,0,0}, …, {49,0,0}, {49,0,0}, {49,0,0}, {50,0,1}, {51,0,1}, …, {99, 0, 1}.
triples = append(triples, testutils.MakeTripleWithFloatValue(50, 0, 0.0)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(50, 0, 0.0)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(50, 0, 0.0)...)
triples = append(triples, testutils.MakeTripleWithFloatValueStartingFromKey(50, 50, 0, 1.0)...)
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.QuantilesPerKey(s, pcol, pbeam.QuantilesParams{
MaxContributionsPerPartition: tc.maxContributionPerPartition,
MaxPartitionsContributed: 1,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
Ranks: []float64{0.6},
NoiseKind: pbeam.LaplaceNoise{}})
got = beam.ParDo(s, testutils.DereferenceFloat64Slice, got)
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.ApproxEqualsKVFloat64(s, got, want, QuantilesTolerance(tc.minValue, tc.maxValue)); err != nil {
t.Fatalf("ApproxEqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("QuantilesPerKey: %s did not do per-partition contribution bounding correctly: %v", tc.desc, err)
}
}
}
// Tests that QuantilesPerKey with partition bounds per-partition contributions correctly,
// adds no noise and keeps all partitions in test mode.
func TestQuantilesPerKeyWithPartitionsTestModePerPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionPerPartition int64
minValue float64
maxValue float64
want float64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 1.0,
// First 50 privacy IDs contribute "0.0" 3 times to partition 0 and the next 50 privacy IDs
// contribute "1.0" to the same partition.
// There will be a per-partition contribution bounding stage. MaxContributionsPerPartition=1, so
// the algorithm will arbitrarily keep one of these 3 contributions for the first 50 privacy IDs.
// There will be equal number of "0.0"s and "1.0", so rank=0.6 should be equal to 1.0 (not 0.0,
// if no per-partition contribution bounding is done)
want: 1.0,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
maxContributionPerPartition: 1,
minValue: 0.0,
maxValue: 1.0,
// First 50 privacy IDs contribute "0.0" 3 times to partition 0 and the next 50 privacy IDs
// contribute "1.0" to the same partition.
// There will not be a per-partition contribution bounding stage, meaning that there will be 150
// "0.0"s and 50 "1.0"s. rank=0.6 should be equal to 0.0.
want: 0.0,
},
} {
var triples []testutils.TripleWithFloatValue
// triples{privacy_id, partition_key, value} contains {0,0,0}, {0,0,0}, {0,0,0}, …, {49,0,0}, {49,0,0}, {49,0,0}, {50,0,1}, {51,0,1}, …, {99, 0, 1}.
triples = append(triples, testutils.MakeTripleWithFloatValue(50, 0, 0.0)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(50, 0, 0.0)...)
triples = append(triples, testutils.MakeTripleWithFloatValue(50, 0, 0.0)...)
triples = append(triples, testutils.MakeTripleWithFloatValueStartingFromKey(50, 50, 0, 1.0)...)
wantMetric := []testutils.TestFloat64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
publicPartitions := beam.CreateList(s, []int{0})
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.QuantilesPerKey(s, pcol, pbeam.QuantilesParams{
MaxContributionsPerPartition: tc.maxContributionPerPartition,
MaxPartitionsContributed: 1,
MinValue: tc.minValue,
MaxValue: tc.maxValue,
Ranks: []float64{0.6},
PublicPartitions: publicPartitions,
NoiseKind: pbeam.LaplaceNoise{}})
got = beam.ParDo(s, testutils.DereferenceFloat64Slice, got)
want = beam.ParDo(s, testutils.Float64MetricToKV, want)
if err := testutils.ApproxEqualsKVFloat64(s, got, want, QuantilesTolerance(tc.minValue, tc.maxValue)); err != nil {
t.Fatalf("ApproxEqualsKVFloat64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("QuantilesPerKey: %s with partitions did not do per-partition contribution bounding correctly: %v", tc.desc, err)
}
}
}
// Checks that QuantilesPerKey with partitions applies public partitions correctly in test mode.
func TestQuantilesPerKeyWithPartitionsAppliesPublicPartitions(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, zeroDelta),
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, zeroDelta),
},
} {
triples := testutils.ConcatenateTriplesWithFloatValue(
testutils.MakeTripleWithFloatValue(100, 0, 1.0),
testutils.MakeTripleWithFloatValue(100, 0, 4.0),
testutils.MakeTripleWithFloatValueStartingFromKey(100, 100, 1, 1.0))
wantMetric := []testutils.TestFloat64SliceMetric{
{0, []float64{1.0, 1.0, 4.0, 4.0}},
// Partition 1 is not in the list of public partitions, so it will be dropped.
{2, []float64{0.5, 1.25, 3.75, 4.5}}, // Empty partition is linearly interpolated.
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithFloatValue, col)
lower := 0.0
upper := 5.0
ranks := []float64{0.10, 0.25, 0.75, 0.90}
publicPartitions := beam.CreateList(s, []int{0, 2})
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithFloatValueToKV, pcol)
got := pbeam.QuantilesPerKey(s, pcol, pbeam.QuantilesParams{
MaxPartitionsContributed: 1,
MaxContributionsPerPartition: 2,
MinValue: lower,
MaxValue: upper,
Ranks: ranks,
PublicPartitions: publicPartitions,
})
want = beam.ParDo(s, testutils.Float64SliceMetricToKV, want)
if err := testutils.ApproxEqualsKVFloat64Slice(s, got, want, QuantilesTolerance(lower, upper)); err != nil {
t.Fatalf("ApproxEqualsKVFloat64Slice: got error %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("QuantilesPerKey did apply public partitions correctly: %v", err)
}
}
}
func TestQuantilesTolerance(t *testing.T) {
for _, tc := range []struct {
minValue float64
maxValue float64
wantTolerance float64
}{
{-5.0, 5.0, 0.00015258789},
{0.0, 1000.0, 0.01525878906},
} {
got := QuantilesTolerance(tc.minValue, tc.maxValue)
if !cmp.Equal(got, tc.wantTolerance, cmpopts.EquateApprox(0, 1e-9)) { // Allow for floating point arithmetic errors.
t.Errorf("QuantilesTolerance: with minValue=%f maxValue=%f got tolerance=%f, want=%f", tc.minValue, tc.maxValue, got, tc.wantTolerance)
}
}
}
// Tests that SelectPartitions bounds cross-partition contributions correctly and keeps
// all partitions in test mode with PrivatePCollection<V> inputs.
func TestSelectPartitionsTestModeCrossPartitionContributionBoundingV(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want int
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 1,
// With a max contribution of 1, only 1 partition should be outputted.
want: 1,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 1,
// Cross-partition contribution bounding is disabled, so all 10 partitions should be outputted.
want: 10,
},
} {
// Create 10 partitions with a single privacy ID contributing to each.
var pairs []testutils.PairII
for i := 0; i < 10; i++ {
pairs = append(pairs, testutils.MakePairsWithFixedV(1, i)...)
}
p, s, col := ptest.CreateList(pairs)
col = beam.ParDo(s, testutils.PairToKV, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
got := pbeam.SelectPartitions(s, pcol, pbeam.SelectPartitionsParams{MaxPartitionsContributed: tc.maxPartitionsContributed})
testutils.CheckNumPartitions(s, got, tc.want)
if err := ptest.Run(p); err != nil {
t.Errorf("SelectPartitions: %s did not bound cross-partition contributions correctly for PrivatePCollection<V> inputs: %v", tc.desc, err)
}
}
}
// Tests that SelectPartitions bounds cross-partition contributions correctly and keeps
// all partitions in test mode with PrivatePCollection<K,V> inputs.
func TestSelectPartitionsTestModeCrossPartitionContributionBoundingKV(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want int
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 1,
// With a max contribution of 1, only 1 partition should be outputted.
want: 1,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 1,
// Cross-partition contribution bounding is disabled, so all 10 partitions should be outputted.
want: 10,
},
} {
// Create 10 partitions with a single privacy ID contributing to each.
var triples []testutils.TripleWithIntValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.MakeTripleWithIntValue(1, i, 0)...)
}
p, s, col := ptest.CreateList(triples)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := pbeam.SelectPartitions(s, pcol, pbeam.SelectPartitionsParams{MaxPartitionsContributed: tc.maxPartitionsContributed})
testutils.CheckNumPartitions(s, got, tc.want)
if err := ptest.Run(p); err != nil {
t.Errorf("SelectPartitions: %s did not bound cross-partition contributions correctly for PrivatePCollection<K,V> inputs: %v", tc.desc, err)
}
}
}
// Tests that DistinctPerKey bounds cross-partition contributions correctly, adds no
// noise and keeps all partitions in test mode.
func TestDistinctPerKeyTestModeCrossPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxPartitionsContributed int64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes once to 10 partitions, which implies that count of each
// partition is 1. With a max contribution of 3, 7 partitions should be dropped. The sum of
// all counts must then be 3. This also ensures that no partitions (each with a single
// privacy id) gets thresholded.
want: 3,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxPartitionsContributed: 3,
// The same privacy ID contributes once to 10 partitions, which implies that count of each
// partition is 3. Cross-partition contribution bounding is disabled. The sum of all counts
// must then be 10. This also ensures that no partitions (each with a single privacy id)
// gets thresholded.
want: 10,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,1}, {0,1,1}, {0,2,1}, …, {0,9,1}.
var triples []testutils.TripleWithIntValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.MakeSampleTripleWithIntValue(1, i)...)
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := pbeam.DistinctPerKey(s, pcol, pbeam.DistinctPerKeyParams{
MaxPartitionsContributed: tc.maxPartitionsContributed,
MaxContributionsPerPartition: 1,
NoiseKind: pbeam.LaplaceNoise{}})
sums := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, sums)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("DistinctPerKey: %s did not bound cross-partition contributions correctly: %v", tc.desc, err)
}
}
}
// Tests that DistinctPerKey bounds per-partition contributions correctly, adds no
// noise and keeps all partitions in test mode.
func TestDistinctPerKeyTestModePerPartitionContributionBounding(t *testing.T) {
for _, tc := range []struct {
desc string
privacySpec *pbeam.PrivacySpec
maxContributionsPerPartition int64
want int64
}{
{
desc: "test mode with contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithContributionBounding(tinyEpsilon, tinyDelta),
maxContributionsPerPartition: 3,
// MaxContributionsPerPartition = 3, but id = 0 contributes 10 distinct values to partition 0.
// There will be a per-partition contribution bounding stage and only 3 of 10 distinct values
// will be kept. The count of partition 0 must then be 3. This also ensures that partition 0
// (with a single privacy id) does not get thresholded.
want: 3,
},
{
desc: "test mode without contribution bounding",
privacySpec: NewPrivacySpecNoNoiseWithoutContributionBounding(tinyEpsilon, tinyDelta),
maxContributionsPerPartition: 3,
// MaxContributionsPerPartition = 3, but id = 0 contributes 10 distinct values to partition 0.
// There will not be a per-partition contribution bounding stage, so all 10 distinct values will
// be kept. The count of partition 0 must then be 10. This also ensures that partition 0 (with
// a single privacy id) does not get thresholded.
want: 10,
},
} {
// triples{privacy_id, partition_key, value} contains {0,0,0}, {0,0,1}, {0,0,2}, …, {0,0,9}.
var triples []testutils.TripleWithIntValue
for i := 0; i < 10; i++ {
triples = append(triples, testutils.TripleWithIntValue{ID: 0, Partition: 0, Value: i})
}
wantMetric := []testutils.TestInt64Metric{
{0, tc.want},
}
p, s, col, want := ptest.CreateList2(triples, wantMetric)
col = beam.ParDo(s, testutils.ExtractIDFromTripleWithIntValue, col)
pcol := pbeam.MakePrivate(s, col, tc.privacySpec)
pcol = pbeam.ParDo(s, testutils.TripleWithIntValueToKV, pcol)
got := pbeam.DistinctPerKey(s, pcol, pbeam.DistinctPerKeyParams{
MaxPartitionsContributed: 1,
MaxContributionsPerPartition: tc.maxContributionsPerPartition,
NoiseKind: pbeam.LaplaceNoise{}})
sums := beam.DropKey(s, got)
sumOverPartitions := stats.Sum(s, sums)
got = beam.AddFixedKey(s, sumOverPartitions) // Adds a fixed key of 0.
want = beam.ParDo(s, testutils.Int64MetricToKV, want)
if err := testutils.EqualsKVInt64(s, got, want); err != nil {
t.Fatalf("EqualsKVInt64: %v", err)
}
if err := ptest.Run(p); err != nil {
t.Errorf("DistinctPerKey: %s did not bound per-partition contributions correctly: %v", tc.desc, err)
}
}
}