| // |
| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| // Package testutils provides helper functions, structs, etc. for testing |
| // Privacy on Beam pipelines. |
| package testutils |
| |
| import ( |
| "errors" |
| "fmt" |
| "math" |
| "math/big" |
| "reflect" |
| "sort" |
| "testing" |
| |
| "github.com/google/differential-privacy/go/v4/dpagg" |
| "github.com/google/differential-privacy/go/v4/noise" |
| "github.com/google/differential-privacy/privacy-on-beam/v4/internal/kv" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/register" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| ) |
| |
| func init() { |
| register.DoFn3x1[beam.W, func(*int64) bool, func(*int64) bool, string](&diffInt64Fn{}) |
| register.Iter1[int64]() |
| register.DoFn3x1[int, func(*float64) bool, func(*float64) bool, string](&diffFloat64Fn{}) |
| register.Iter1[float64]() |
| register.DoFn3x1[int, func(*[]float64) bool, func(*[]float64) bool, string](&diffFloat64SliceFn{}) |
| register.Iter1[[]float64]() |
| register.DoFn1x1[PairIF64, error](&checkFloat64MetricsAreNoisyFn{}) |
| register.DoFn1x1[PairII64, error](&checkInt64MetricsAreNoisyFn{}) |
| register.DoFn1x1[int, error](&checkSomePartitionsAreDroppedFn{}) |
| register.DoFn3x1[int, func(*int) bool, func(*int) bool, error](&gotExpectedNumPartitionsFn{}) |
| register.Iter1[int]() |
| |
| register.Function1x1[int64, *int64](Int64Ptr) |
| register.Function1x1[float64, *float64](Float64Ptr) |
| register.Function1x1[beam.V, int](OneFn) |
| register.Function1x1[int64, error](CheckNoNegativeValuesInt64) |
| register.Function1x1[float64, error](CheckNoNegativeValuesFloat64) |
| register.Function1x1[float64, error](CheckAllValuesNegativeFloat64) |
| register.Function2x1[int, int, PairII](KVToPair) |
| register.Function1x2[PairII, int, int](PairToKV) |
| register.Function2x1[int, int64, PairII64](KVToPairII64) |
| register.Function1x2[PairII64, int, int64](PairII64ToKV) |
| register.Function2x1[int, float64, PairIF64](KVToPairIF64) |
| register.Function1x2[PairIF64, int, float64](PairIF64ToKV) |
| register.Function2x1[int, []float64, PairIF64Slice](KVToPairIF64Slice) |
| register.Function1x2[PairIF64Slice, int, []float64](PairIF64SliceToKV) |
| register.Function2x1[int, kv.Pair, PairICodedKV](KVToPairICodedKV) |
| register.Function1x2[PairICodedKV, int, kv.Pair](PairICodedKVToKV) |
| register.Function2x3[beam.V, []float64, beam.V, float64, error](DereferenceFloat64Slice) |
| register.Function1x2[TripleWithIntValue, int, int](TripleWithIntValueToKV) |
| register.Function1x2[TripleWithIntValue, int, TripleWithIntValue](ExtractIDFromTripleWithIntValue) |
| register.Function1x2[TripleWithFloatValue, int, float32](TripleWithFloatValueToKV) |
| register.Function1x2[TripleWithFloatValue, int, TripleWithFloatValue](ExtractIDFromTripleWithFloatValue) |
| register.Function3x1[int, func(*float64) bool, func(*float64) bool, string](lessThanOrEqualTo) |
| register.Function2x1[string, string, string](CombineDiffs) |
| register.Function1x1[string, error](ReportDiffs) |
| register.Function1x1[string, error](reportEquals) |
| register.Function1x1[string, error](reportGreaterThan) |
| register.Function3x1[beam.X, func(*int) bool, func(*int) bool, string](diffIntFn) |
| register.Function1x1[int64, bool](isNegativeInt64) |
| register.Function1x1[int, error](checkNumNegativeElemCountIsPositive) |
| } |
| |
| // PairII, PairII64, PairIF64, PairICodedKV and the related functions are helpers |
| // necessary to get a PCollection of KV type as input of a test Beam pipeline. |
| |
| // PairII holds a key-value pair of type (int, int). |
| type PairII struct { |
| Key int |
| Value int |
| } |
| |
| // PairToKV transforms a PairII into an (int, int) key-value pair. |
| func PairToKV(p PairII) (k, v int) { |
| return p.Key, p.Value |
| } |
| |
| // KVToPair transforms an (int, int) key-value pair into a PairII. |
| func KVToPair(k, v int) PairII { |
| return PairII{k, v} |
| } |
| |
| // PairII64 holds a key-value pair of type (int, int64). |
| type PairII64 struct { |
| Key int |
| Value int64 |
| } |
| |
| // KVToPairII64 transforms an (int, int64) key-value pair into a PairII64. |
| func KVToPairII64(v int, m int64) PairII64 { |
| return PairII64{v, m} |
| } |
| |
| // PairII64ToKV transforms a PairII64 into an (int, int64) key-value pair. |
| func PairII64ToKV(tm PairII64) (int, int64) { |
| return tm.Key, tm.Value |
| } |
| |
| // PairIF64 holds a key-value pair of type (int, float64). |
| type PairIF64 struct { |
| Key int |
| Value float64 |
| } |
| |
| // KVToPairIF64 transforms an (int, float64) key-value pair into a PairIF64. |
| func KVToPairIF64(v int, m float64) PairIF64 { |
| return PairIF64{v, m} |
| } |
| |
| // PairIF64ToKV transforms a PairIF64 into an (int, float64) key-value pair. |
| func PairIF64ToKV(tm PairIF64) (int, float64) { |
| return tm.Key, tm.Value |
| } |
| |
| // PairIF64Slice holds a key-value pair of type (int, []float64). |
| type PairIF64Slice struct { |
| Key int |
| Value []float64 |
| } |
| |
| // PairIF64SliceToKV transforms a PairIF64Slice into an (int, []float64) key-value pair. |
| func PairIF64SliceToKV(tm PairIF64Slice) (int, []float64) { |
| return tm.Key, tm.Value |
| } |
| |
| // KVToPairIF64Slice transforms an (int, []float64) key-value pair into a PairIF64Slice. |
| func KVToPairIF64Slice(v int, m []float64) PairIF64Slice { |
| return PairIF64Slice{v, m} |
| } |
| |
| // PairICodedKV holds a key-value pair of type (int, kv.Pair). |
| type PairICodedKV struct { |
| Key int |
| Value kv.Pair |
| } |
| |
| // PairICodedKVToKV transforms a PairICodedKV into an (int, kv.Pair) key-value pair. |
| func PairICodedKVToKV(p PairICodedKV) (k int, v kv.Pair) { |
| return p.Key, p.Value |
| } |
| |
| // KVToPairICodedKV transforms an (int, kv.Pair) key-value pair into a PairICodedKV. |
| func KVToPairICodedKV(k int, v kv.Pair) PairICodedKV { |
| return PairICodedKV{k, v} |
| } |
| |
| // MakePairsWithFixedV returns sample data where the same value is associated with |
| // multiple privacy keys: it returns a slice of pairs {0, v}, {1, v}, ..., {numKeys-1, v}. |
| func MakePairsWithFixedV(numKeys, v int) []PairII { |
| s := make([]PairII, numKeys) |
| for k := 0; k < numKeys; k++ { |
| s[k] = PairII{k, v} |
| } |
| return s |
| } |
| |
| // MakePairsWithFixedVStartingFromKey returns sample data where the same value is associated with |
| // multiple privacy keys: it returns a slice of pairs {0, v}, {1, v}, ..., {numKeys-1, v}. |
| // Privacy keys start from kOffset. |
| func MakePairsWithFixedVStartingFromKey(kOffset, numKeys, v int) []PairII { |
| s := make([]PairII, numKeys) |
| for k := 0; k < numKeys; k++ { |
| s[k] = PairII{k + kOffset, v} |
| } |
| return s |
| } |
| |
| // TripleWithIntValue contains a privacy ID, a partition ID, and an int value. |
| type TripleWithIntValue struct { |
| ID int |
| Partition int |
| Value int |
| } |
| |
| // MakeSampleTripleWithIntValue returns sample int data where the same partition ID is |
| // associated with multiple privacy keys, every time with the value 1: it returns |
| // a slice of tripleInts {0,p,1}, {1,p,1}, ..., {numKeys-1,p,1}. |
| func MakeSampleTripleWithIntValue(numKeys, p int) []TripleWithIntValue { |
| return MakeTripleWithIntValue(numKeys, p, 1) |
| } |
| |
| // MakeTripleWithIntValueStartingFromKey returns int data where the same partition ID is |
| // associated with multiple privacy keys (starting from provided key), to the given value v: it returns |
| // a slice of tripleInts {kOffset,p,v}, {kOffset + 1,p,v}, ..., {numKeys + kOffset - 1,p,v}. |
| // Privacy keys start from kOffset. |
| func MakeTripleWithIntValueStartingFromKey(kOffset, numKeys, p, v int) []TripleWithIntValue { |
| s := make([]TripleWithIntValue, numKeys) |
| for k := 0; k < numKeys; k++ { |
| s[k] = TripleWithIntValue{k + kOffset, p, v} |
| } |
| return s |
| } |
| |
| // MakeTripleWithIntValue returns int data where the same partition ID is |
| // associated with multiple privacy keys, to the given value v: it returns |
| // a slice of tripleInts {0,p,v}, {1,p,v}, ..., {numKeys-1,p,v}. |
| func MakeTripleWithIntValue(numKeys, p, v int) []TripleWithIntValue { |
| s := make([]TripleWithIntValue, numKeys) |
| for k := 0; k < numKeys; k++ { |
| s[k] = TripleWithIntValue{k, p, v} |
| } |
| return s |
| } |
| |
| // TripleWithIntValueToKV extracts the partition ID and the value from a tripleWithIntValue. It is |
| // used once the PrivatePCollection has been initialized, to transform it into a |
| // PrivatePCollection<partitionID,value>. |
| func TripleWithIntValueToKV(t TripleWithIntValue) (int, int) { |
| return t.Partition, t.Value |
| } |
| |
| // ExtractIDFromTripleWithIntValue extracts and returns the ID from a tripleWithIntValue. It is used to |
| // initialize PrivatePCollections. |
| func ExtractIDFromTripleWithIntValue(t TripleWithIntValue) (int, TripleWithIntValue) { |
| return t.ID, t |
| } |
| |
| // ConcatenateTriplesWithIntValue concatenates tripleWithIntValue slices. |
| func ConcatenateTriplesWithIntValue(slices ...[]TripleWithIntValue) []TripleWithIntValue { |
| var t []TripleWithIntValue |
| for _, slice := range slices { |
| t = append(t, slice...) |
| } |
| return t |
| } |
| |
| // TripleWithFloatValue contains a privacy ID, a partition ID, and a float value. |
| type TripleWithFloatValue struct { |
| ID int |
| Partition int |
| Value float32 |
| } |
| |
| // MakeSampleTripleWithFloatValue returns sample float data where the same partition ID is |
| // associated with multiple privacy keys, every time with the value 1.0: it returns |
| // a slice of tripleFloats {0,p,1}, {1,p,1}, ..., {numKeys-1,p,1}. |
| func MakeSampleTripleWithFloatValue(numKeys, p int) []TripleWithFloatValue { |
| return MakeTripleWithFloatValue(numKeys, p, 1.0) |
| } |
| |
| // MakeTripleWithFloatValue returns float data where the same partition ID is |
| // associated with multiple privacy keys, to the given value v: it returns |
| // a slice of tripleInts {0,p,v}, {1,p,v}, ..., {numKeys-1,p,v}. |
| func MakeTripleWithFloatValue(numKeys, p int, v float32) []TripleWithFloatValue { |
| s := make([]TripleWithFloatValue, numKeys) |
| for k := 0; k < numKeys; k++ { |
| s[k] = TripleWithFloatValue{k, p, v} |
| } |
| return s |
| } |
| |
| // MakeTripleWithFloatValueStartingFromKey returns float data where the same partition ID is |
| // associated with multiple privacy keys (starting from provided key), to the given value v: it returns |
| // a slice of tripleFloats {kOffset,p,v}, {kOffset + 1,p,v}, ..., {numKeys + kOffset - 1,p,v}. |
| // Privacy keys start from kOffset. |
| func MakeTripleWithFloatValueStartingFromKey(kOffset, numKeys, p int, v float32) []TripleWithFloatValue { |
| s := make([]TripleWithFloatValue, numKeys) |
| for k := 0; k < numKeys; k++ { |
| s[k] = TripleWithFloatValue{k + kOffset, p, v} |
| } |
| return s |
| } |
| |
| // ConcatenateTriplesWithFloatValue concatenates tripleWithFloatValue slices. |
| func ConcatenateTriplesWithFloatValue(slices ...[]TripleWithFloatValue) []TripleWithFloatValue { |
| var t []TripleWithFloatValue |
| for _, slice := range slices { |
| t = append(t, slice...) |
| } |
| return t |
| } |
| |
| // ExtractIDFromTripleWithFloatValue extracts and returns the ID from a tripleWithFloatValue. It is used to |
| // initialize PrivatePCollections. |
| func ExtractIDFromTripleWithFloatValue(t TripleWithFloatValue) (int, TripleWithFloatValue) { |
| return t.ID, t |
| } |
| |
| // TripleWithFloatValueToKV extracts the partition ID and the value from a tripleWithFloatValue. It is |
| // used once the PrivatePCollection has been initialized, to transform it into a |
| // PrivatePCollection<partitionID,value>. |
| func TripleWithFloatValueToKV(t TripleWithFloatValue) (int, float32) { |
| return t.Partition, t.Value |
| } |
| |
| // ConcatenatePairs concatenates pairII slices. |
| func ConcatenatePairs(slices ...[]PairII) []PairII { |
| var s []PairII |
| for _, slice := range slices { |
| s = append(s, slice...) |
| } |
| return s |
| } |
| |
| // EqualsKVInt checks that two PCollections col1 and col2 of type |
| // <K,int> are exactly equal. |
| func EqualsKVInt(t *testing.T, s beam.Scope, col1, col2 beam.PCollection) { |
| t.Helper() |
| wantV := reflect.TypeOf(int(0)) |
| if err := checkValueType(col1, wantV); err != nil { |
| t.Fatalf("EqualsKVInt: unexpected value type for col1: %v", err) |
| } |
| if err := checkValueType(col2, wantV); err != nil { |
| t.Fatalf("EqualsKVInt: unexpected value type for col2: %v", err) |
| } |
| |
| coGroupToValue := beam.CoGroupByKey(s, col1, col2) |
| diffs := beam.ParDo(s, diffIntFn, coGroupToValue) |
| combinedDiff := beam.Combine(s, CombineDiffs, diffs) |
| beam.ParDo0(s, ReportDiffs, combinedDiff) |
| } |
| |
| // EqualsKVInt64 checks that two PCollections col1 and col2 of type |
| // <K,int64> are exactly equal. Each key can only hold a single value. |
| func EqualsKVInt64(t *testing.T, s beam.Scope, col1, col2 beam.PCollection) { |
| ApproxEqualsKVInt64(t, s, col1, col2, 0.0) |
| } |
| |
| // EqualsKVFloat64 checks that two PCollections col1 and col2 of type |
| // <K,float64> are exactly equal. Each key can only hold a single value. |
| func EqualsKVFloat64(t *testing.T, s beam.Scope, col1, col2 beam.PCollection) { |
| ApproxEqualsKVFloat64(t, s, col1, col2, 0.0) |
| } |
| |
| // NotEqualsFloat64 checks that two PCollections col1 and col2 of type |
| // <K,float64> are different. Each key can only hold a single value. |
| func NotEqualsFloat64(t *testing.T, s beam.Scope, col1, col2 beam.PCollection) { |
| t.Helper() |
| wantV := reflect.TypeOf(float64(0)) |
| if err := checkValueType(col1, wantV); err != nil { |
| t.Fatalf("NotEqualsFloat64: unexpected value type for col1: %v", err) |
| } |
| if err := checkValueType(col2, wantV); err != nil { |
| t.Fatalf("NotEqualsFloat64: unexpected value type for col2: %v", err) |
| } |
| coGroupToValue := beam.CoGroupByKey(s, col1, col2) |
| diffs := beam.ParDo(s, &diffFloat64Fn{Tolerance: 0.0}, coGroupToValue) |
| combinedDiff := beam.Combine(s, CombineDiffs, diffs) |
| beam.ParDo0(s, reportEquals, combinedDiff) |
| } |
| |
| // ApproxEqualsKVInt64 checks that two PCollections col1 and col2 of type |
| // <K,int64> are approximately equal, where "approximately equal" means |
| // "the keys are the same in both col1 and col2, and the value associated with |
| // key k in col1 is within the specified tolerance of the value associated with k |
| // in col2". Each key can only hold a single value. |
| func ApproxEqualsKVInt64(t *testing.T, s beam.Scope, col1, col2 beam.PCollection, tolerance float64) { |
| t.Helper() |
| wantV := reflect.TypeOf(int64(0)) |
| if err := checkValueType(col1, wantV); err != nil { |
| t.Fatalf("ApproxEqualsKVInt64: unexpected value type for col1: %v", err) |
| } |
| if err := checkValueType(col2, wantV); err != nil { |
| t.Fatalf("ApproxEqualsKVInt64: unexpected value type for col2: %v", err) |
| } |
| |
| coGroupToValue := beam.CoGroupByKey(s, col1, col2) |
| diffs := beam.ParDo(s, &diffInt64Fn{Tolerance: tolerance}, coGroupToValue) |
| combinedDiff := beam.Combine(s, CombineDiffs, diffs) |
| beam.ParDo0(s, ReportDiffs, combinedDiff) |
| } |
| |
| // ApproxEqualsKVFloat64 checks that two PCollections col1 and col2 of type |
| // <K,float64> are approximately equal, where "approximately equal" means |
| // "the keys are the same in both col1 and col2, and the value associated with |
| // key k in col1 is within the specified tolerance of the value associated with k |
| // in col2". Each key can only hold a single value. |
| func ApproxEqualsKVFloat64(t *testing.T, s beam.Scope, col1, col2 beam.PCollection, tolerance float64) { |
| t.Helper() |
| wantV := reflect.TypeOf(float64(0)) |
| if err := checkValueType(col1, wantV); err != nil { |
| t.Fatalf("ApproxEqualsKVFloat64: unexpected value type for col1: %v", err) |
| } |
| if err := checkValueType(col2, wantV); err != nil { |
| t.Fatalf("ApproxEqualsKVFloat64: unexpected value type for col2: %v", err) |
| } |
| |
| coGroupToValue := beam.CoGroupByKey(s, col1, col2) |
| diffs := beam.ParDo(s, &diffFloat64Fn{Tolerance: tolerance}, coGroupToValue) |
| combinedDiff := beam.Combine(s, CombineDiffs, diffs) |
| beam.ParDo0(s, ReportDiffs, combinedDiff) |
| } |
| |
| // LessThanOrEqualToKVFloat64 checks that for PCollections col1 and col2 of type |
| // <K,float64>, for each key k, value corresponding to col1 is less than or equal |
| // to the value corresponding in col2. Each key can only hold a single value. |
| func LessThanOrEqualToKVFloat64(t *testing.T, s beam.Scope, col1, col2 beam.PCollection) { |
| t.Helper() |
| wantV := reflect.TypeOf(float64(0)) |
| if err := checkValueType(col1, wantV); err != nil { |
| t.Fatalf("LessThanOrEqualToKVFloat64: unexpected value type for col1: %v", err) |
| } |
| if err := checkValueType(col2, wantV); err != nil { |
| t.Fatalf("LessThanOrEqualToKVFloat64: unexpected value type for col2: %v", err) |
| } |
| |
| coGroupToValue := beam.CoGroupByKey(s, col1, col2) |
| diffs := beam.ParDo(s, lessThanOrEqualTo, coGroupToValue) |
| combinedDiff := beam.Combine(s, CombineDiffs, diffs) |
| beam.ParDo0(s, reportGreaterThan, combinedDiff) |
| } |
| |
| // ApproxEqualsKVFloat64Slice checks that two PCollections col1 and col2 of type |
| // <K,[]float64> are approximately equal, where "approximately equal" means |
| // "the keys are the same in both col1 and col2, and each value in the slice |
| // associated with key k in col1 is within the specified tolerance of each value |
| // in the slice associated with k in col2". Each key can only hold a single slice. |
| func ApproxEqualsKVFloat64Slice(t *testing.T, s beam.Scope, col1, col2 beam.PCollection, tolerance float64) { |
| t.Helper() |
| wantV := reflect.TypeOf([]float64{0.0}) |
| if err := checkValueType(col1, wantV); err != nil { |
| t.Fatalf("ApproxEqualsKVFloat64Slice: unexpected value type for col1: %v", err) |
| } |
| if err := checkValueType(col2, wantV); err != nil { |
| t.Fatalf("ApproxEqualsKVFloat64Slice: unexpected value type for col2: %v", err) |
| } |
| |
| coGroupToValue := beam.CoGroupByKey(s, col1, col2) |
| diffs := beam.ParDo(s, &diffFloat64SliceFn{Tolerance: tolerance}, coGroupToValue) |
| combinedDiff := beam.Combine(s, CombineDiffs, diffs) |
| beam.ParDo0(s, ReportDiffs, combinedDiff) |
| } |
| |
| func reportEquals(diffs string) error { |
| if diffs != "" { |
| return nil |
| } |
| return fmt.Errorf("collections are equal") |
| } |
| |
| // ReportDiffs returns an error if diffs is not empty. |
| func ReportDiffs(diffs string) error { |
| if diffs != "" { |
| return fmt.Errorf("collections are not approximately equal. Diff (-got, +want):\n%s", diffs) |
| } |
| return nil |
| } |
| |
| func reportGreaterThan(errors string) error { |
| if errors != "" { |
| return fmt.Errorf("col1 is not less than or equal to col2: %s", errors) |
| } |
| return nil |
| } |
| |
| // CombineDiffs concatenates two diff strings into a single string. |
| func CombineDiffs(diff1, diff2 string) string { |
| if diff2 == "" { |
| return fmt.Sprintf("%s", diff1) |
| } |
| return fmt.Sprintf("%s\n%s", diff1, diff2) |
| } |
| |
| // SliceFromIter returns a slice of type T from an iterator. |
| func SliceFromIter[T any](iter func(*T) bool) []T { |
| var result []T |
| var value T |
| for iter(&value) { |
| result = append(result, value) |
| } |
| return result |
| } |
| |
| type diffInt64Fn struct { |
| Tolerance float64 |
| } |
| |
| // ProcessElement returns a diff between values associated with a key. It |
| // returns an empty string if the values are approximately equal. |
| func (fn *diffInt64Fn) ProcessElement(k beam.W, v1Iter, v2Iter func(*int64) bool) string { |
| var v1 = int64PtrToSlice(v1Iter) |
| var v2 = int64PtrToSlice(v2Iter) |
| if diff := cmp.Diff(v1, v2, cmpopts.EquateApprox(0, fn.Tolerance)); diff != "" { |
| return fmt.Sprintf("For k=%v: diff=%s", k, diff) |
| } |
| return "" |
| } |
| |
| // diffIntFn returns a diff between values associated with a key. It |
| // returns an empty string if the values are approximately equal. |
| func diffIntFn(k beam.X, v1Iter, v2Iter func(*int) bool) string { |
| var v1 = intPtrToSlice(v1Iter) |
| var v2 = intPtrToSlice(v2Iter) |
| if diff := cmp.Diff(v1, v2); diff != "" { |
| return fmt.Sprintf("For k=%d: diff=%s", k, diff) |
| } |
| return "" |
| } |
| |
| func int64PtrToSlice(vIter func(*int64) bool) []float64 { |
| var vSlice []float64 |
| var v int64 |
| for vIter(&v) { |
| vSlice = append(vSlice, float64(v)) |
| } |
| return vSlice |
| } |
| |
| func intPtrToSlice(vIter func(*int) bool) []float64 { |
| var vSlice []float64 |
| var v int |
| for vIter(&v) { |
| vSlice = append(vSlice, float64(v)) |
| } |
| sort.Float64s(vSlice) |
| return vSlice |
| } |
| |
| func lessThanOrEqualTo(k int, v1Iter, v2Iter func(*float64) bool) string { |
| var v1 = SliceFromIter(v1Iter) |
| var v2 = SliceFromIter(v2Iter) |
| if len(v1) != 1 { |
| return fmt.Sprintf("For k=%d, col1 has %d values, it needs to have exactly 1 value", k, len(v1)) |
| } |
| if len(v2) != 1 { |
| return fmt.Sprintf("For k=%d, col2 has %d values, it needs to have exactly 1 value", k, len(v2)) |
| } |
| if v1[0] > v2[0] { |
| return fmt.Sprintf("For k=%d, v1=%f is greater than v2=%f", k, v1[0], v2[0]) |
| } |
| |
| return "" |
| } |
| |
| type diffFloat64Fn struct { |
| Tolerance float64 |
| } |
| |
| func (fn *diffFloat64Fn) ProcessElement(k int, v1Iter, v2Iter func(*float64) bool) string { |
| var v1 = SliceFromIter(v1Iter) |
| var v2 = SliceFromIter(v2Iter) |
| if diff := cmp.Diff(v1, v2, cmpopts.EquateApprox(0, fn.Tolerance)); diff != "" { |
| return fmt.Sprintf("For k=%d: diff=%s", k, diff) |
| } |
| |
| return "" // No diff |
| } |
| |
| type diffFloat64SliceFn struct { |
| Tolerance float64 |
| } |
| |
| func (fn *diffFloat64SliceFn) ProcessElement(k int, v1Iter, v2Iter func(*[]float64) bool) string { |
| var v1 = float64SlicePtrToSlice(v1Iter) |
| var v2 = float64SlicePtrToSlice(v2Iter) |
| if diff := cmp.Diff(v1, v2, cmpopts.EquateApprox(0, fn.Tolerance)); diff != "" { |
| return fmt.Sprintf("For k=%d: diff=%s", k, diff) |
| } |
| |
| return "" // No diff |
| } |
| |
| func float64SlicePtrToSlice(vIter func(*[]float64) bool) []float64 { |
| var vSlice []float64 |
| vIter(&vSlice) // We are expecting a single slice. |
| return vSlice |
| } |
| |
| func checkValueType(col beam.PCollection, wantValueType reflect.Type) error { |
| _, vFullType := beam.ValidateKVType(col) |
| vType := vFullType.Type() |
| if vType != wantValueType { |
| return fmt.Errorf("PCollection has (K,V) type with V=%v, want %v", vType, wantValueType) |
| } |
| return nil |
| } |
| |
| // LaplaceTolerance returns tolerance to be used in approxEquals or in threshold |
| // computations for tests with Laplace Noise to pass with 10⁻ᵏ flakiness. |
| // flakinessK is the parameter used to specify this. |
| // |
| // l1Sensitivity and epsilon are the DP parameters of the test. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf. |
| func LaplaceTolerance(flakinessK, l1Sensitivity, epsilon float64) float64 { |
| return l1Sensitivity * flakinessK * math.Log(10) / epsilon |
| } |
| |
| // ComplementaryLaplaceTolerance returns tolerance to be used in checkMetricsAreNoisy |
| // for tests with Laplace Noise to pass with 10⁻ᵏ flakiness. flakinessK is the |
| // parameter used to specify this. |
| // |
| // l1Sensitivity, epsilon and delta are the DP parameters of the test. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func ComplementaryLaplaceTolerance(flakinessK, l1Sensitivity, epsilon float64) float64 { |
| // We need arbitrary precision arithmetics here because ln(1-10⁻ᵏ) evaluates to |
| // 0 with float64, making the output 0. |
| sum := big.NewFloat(math.Pow(10, -flakinessK)).SetMode(big.AwayFromZero) // 10⁻ᵏ |
| sum.Neg(sum) // -10⁻ᵏ |
| sum.SetMode(big.ToZero).Add(sum, big.NewFloat(1)) // 1-10⁻ᵏ |
| log, _ := sum.Float64() |
| log = math.Log(log) // ln(1-10⁻ᵏ) |
| return -l1Sensitivity * log / epsilon |
| } |
| |
| // RoundedLaplaceTolerance rounds laplace tolerance value to the nearest integer, |
| // in order to work with tests for integer-valued aggregations. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func RoundedLaplaceTolerance(flakinessK, l1Sensitivity, epsilon float64) float64 { |
| return math.Round(LaplaceTolerance(flakinessK, l1Sensitivity, epsilon)) |
| } |
| |
| // GaussianTolerance returns tolerance to be used in approxEquals or in threshold |
| // computations for tests with Gaussian Noise to pass with 10⁻ᵏ flakiness. |
| // flakinessK is the parameter used to specify this. |
| // |
| // l0Sensitivity, lInfSensitivity, epsilon and delta are the DP parameters of the test. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func GaussianTolerance(flakinessK, l0Sensitivity, lInfSensitivity, epsilon, delta float64) float64 { |
| // We need arbitrary precision arithmetics here because (1-10⁻ᵏ) evaluates to |
| // 1 with float64, making the output Inf. |
| sum := big.NewFloat(math.Pow(10, -flakinessK)).SetMode(big.AwayFromZero) // 10⁻ᵏ |
| sum.Neg(sum) // -10⁻ᵏ |
| sum.SetMode(big.ToZero).Add(sum, big.NewFloat(1)) // 1-10⁻ᵏ |
| erfinv, _ := sum.Float64() |
| erfinv = math.Erfinv(erfinv) // Erfinv(1-10⁻ᵏ) |
| return erfinv * noise.SigmaForGaussian(int64(l0Sensitivity), lInfSensitivity, epsilon, delta) * math.Sqrt(2) |
| } |
| |
| // ComplementaryGaussianTolerance returns tolerance to be used in checkMetricsAreNoisy |
| // for tests with Gaussian Noise to pass with 10⁻ᵏ flakiness. flakinessK is the |
| // parameter used to specify this. |
| // |
| // l0Sensitivity, lInfSensitivity, epsilon and delta are the DP parameters of the test. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func ComplementaryGaussianTolerance(flakinessK, l0Sensitivity, lInfSensitivity, epsilon, delta float64) float64 { |
| return math.Erfinv(math.Pow(10, -flakinessK)) * noise.SigmaForGaussian(int64(l0Sensitivity), lInfSensitivity, epsilon, delta) * math.Sqrt(2) |
| } |
| |
| // GaussianToleranceForMean returns tolerances to be used in approxEquals for tests |
| // for mean using Gaussian Noise to pass with 10⁻ᵏ flakiness. |
| // |
| // The return values include the tolerances for count, normalized sum, and mean. |
| // |
| // - flakinessK: parameter used to specify k in the flakiness. |
| // - lower: minimum possible value of the input entities. |
| // - upper: maximum possible value of the input entities. |
| // - epsilon: the differential privacy parameter epsilon. |
| // - stats.NormalizedSum: \sum { clamp(x_i, lower, upper) - midPoint }. |
| // |
| // distanceFromMidPoint = upper - midPoint, where midPoint = (lower + upper)/2. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func GaussianToleranceForMean( |
| flakinessK, lower, upper float64, maxContributionsPerPartition, maxPartitionsContributed int64, |
| epsilon float64, delta float64, stats VarianceStatistics, |
| ) (VarianceStatistics, error) { |
| getToleranceOfNoise := func(flakinessK, eps, delta float64, sensitivify sensitivity) float64 { |
| return GaussianTolerance(flakinessK, sensitivify.L0, sensitivify.LInf, eps, delta) |
| } |
| return toleranceForNoisyMeanImpl( |
| flakinessK, lower, upper, maxContributionsPerPartition, maxPartitionsContributed, |
| epsilon, delta, stats, getToleranceOfNoise) |
| } |
| |
| // LaplaceToleranceForMean returns tolerances to be used in approxEquals for tests |
| // for mean using Laplace Noise to pass with 10⁻ᵏ flakiness. |
| // |
| // The return values include the tolerances for count, normalized sum, and mean. |
| // |
| // - flakinessK: parameter used to specify k in the flakiness. |
| // - lower: minimum possible value of the input entities. |
| // - upper: maximum possible value of the input entities. |
| // - epsilon: the differential privacy parameter epsilon. |
| // - stats.NormalizedSum: \sum { clamp(x_i, lower, upper) - midPoint }. |
| // |
| // distanceFromMidPoint = upper - midPoint, where midPoint = (lower + upper)/2. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func LaplaceToleranceForMean( |
| flakinessK, lower, upper float64, maxContributionsPerPartition, maxPartitionsContributed int64, |
| epsilon float64, stats VarianceStatistics, |
| ) (VarianceStatistics, error) { |
| getToleranceOfNoise := func(flakinessK, eps, delta float64, sensitivify sensitivity) float64 { |
| return LaplaceTolerance(flakinessK, sensitivify.L1, eps) |
| } |
| const delta float64 = 0 // Not used. |
| return toleranceForNoisyMeanImpl( |
| flakinessK, lower, upper, maxContributionsPerPartition, maxPartitionsContributed, |
| epsilon, delta, stats, getToleranceOfNoise) |
| } |
| |
| // toleranceForNoisyMeanImpl calculates the tolerance for noisy mean of a certain noise kind. |
| // |
| // The return values include the tolerances for count, normalized sum, and mean. |
| // |
| // - getToleranceOfNoise calculates the noise specific tolerance given flakiness and sensitivity. |
| func toleranceForNoisyMeanImpl( |
| flakinessK, lower, upper float64, |
| maxContributionsPerPartition, maxPartitionsContributed int64, |
| epsilon, delta float64, stats VarianceStatistics, |
| getToleranceOfNoise func(flakinessK, eps, delta float64, sensitivify sensitivity) float64, |
| ) (VarianceStatistics, error) { |
| // The term below is equivalent to -log_10(1-sqrt(1-1e-k)). |
| // It is formulated this way to increase precision and to avoid having this term go to infinity. |
| // Count and normalized sum uses the same following flakiness for simplicity. |
| newFlakinessK := -math.Log10(-math.Expm1(0.5 * math.Log1p(-math.Pow(10, -flakinessK)))) |
| halfEpsilon := epsilon / 2 |
| halfDelta := delta / 2 |
| |
| computer := sensitivityComputer{ |
| Lower: lower, |
| Upper: upper, |
| MaxContributionsPerPartition: maxContributionsPerPartition, |
| MaxPartitionsContributed: maxPartitionsContributed, |
| } |
| countSens := computer.SensitivitiesForCount() |
| normalizedSumSens := computer.SensitivitiesForNormalizedSum() |
| |
| countTolerance := math.Ceil(getToleranceOfNoise(newFlakinessK, halfEpsilon, halfDelta, countSens)) |
| normalizedSumTolerance := getToleranceOfNoise( |
| newFlakinessK, halfEpsilon, halfDelta, normalizedSumSens) |
| |
| tolerances := VarianceStatistics{ |
| Count: countTolerance, |
| NormalizedSum: normalizedSumTolerance, |
| } |
| |
| meanTolerance, err := ToleranceForMean(lower, upper, stats, tolerances) |
| if err != nil { |
| return VarianceStatistics{}, fmt.Errorf("ToleranceForMean: %w", err) |
| } |
| tolerances.Mean = meanTolerance |
| |
| return tolerances, nil |
| } |
| |
| // ToleranceForMean returns tolerance for mean to be used in approxEquals or checkMetricsAreNoisy |
| // for tests, under the pre-calculated tolerances for count and normalized sum so that the tests |
| // pass with 10⁻ᵏ flakiness. |
| // |
| // - lower: minimum possible value of the input entities. |
| // - upper: maximum possible value of the input entities. |
| // - exactStats: Count, NormalizedSum, and Mean of the input entities. NormalizedSumOfSquares is |
| // not needed. |
| // - tolerances: tolerances for count and normalized sum. The tolerance for normalized sum of |
| // squares is not needed. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // see https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf. |
| func ToleranceForMean( |
| lower, upper float64, |
| exactStats, tolerances VarianceStatistics, |
| ) (float64, error) { |
| |
| midPoint := lower + (upper-lower)/2.0 |
| |
| minNoisyCount := math.Max(1.0, exactStats.Count-tolerances.Count) // c_- |
| maxNoisyCount := math.Max(1.0, exactStats.Count+tolerances.Count) // c_+ |
| minNoisyNormalizedSum := exactStats.NormalizedSum - tolerances.NormalizedSum // s_- |
| maxNoisyNormalizedSum := exactStats.NormalizedSum + tolerances.NormalizedSum // s_+ |
| |
| // Find m_- and m_+ such that {s \in [s_-, s_+] and c \in [c_-, c_+]} implies {m \in [m_-, m_+]}. |
| // |
| // 1. For m_-: |
| // - If s_- >= 0, then m_- = s_-/c_+. |
| // - Otherwise, m_- = s_-/c_-. |
| // 2. For m_+: |
| // - If s_+ >= 0, then m_+ = s_+/c_-. |
| // - Otherwise, m_+ = s_+/c_+. |
| |
| getMBound := func(a, b, c float64) (float64, error) { |
| // If the numerator (min/max noisy normalized sum) of the mean is negative, |
| // min/max noisy counts should switch places to find min/max noisy mean. |
| normalizedBound := a / b |
| if a < 0 { |
| normalizedBound = a / c |
| } |
| return dpagg.ClampFloat64(normalizedBound+midPoint, lower, upper) |
| } |
| // Get M_- = m_- + midPoint. |
| minNoisyMean, err := getMBound(minNoisyNormalizedSum, maxNoisyCount, minNoisyCount) |
| if err != nil { |
| return 0, err |
| } |
| // Get M_+ = m_+ + midPoint. |
| maxNoisyMean, err := getMBound(maxNoisyNormalizedSum, minNoisyCount, maxNoisyCount) |
| if err != nil { |
| return 0, err |
| } |
| |
| // Return the tolerance as max(|exactMean - M_-| , |exactMean - M_+|). |
| return math.Max( |
| distanceBetween(exactStats.Mean, minNoisyMean), |
| distanceBetween(maxNoisyMean, exactStats.Mean), |
| ), nil |
| } |
| |
| // VarianceStatistics is a struct that contains the statistics related to mean or variance |
| // aggregation. |
| // |
| // When used in tests for variance, all fields are used. |
| // |
| // When used in tests for mean, only Count, NormalizedSum, and Mean are used. |
| type VarianceStatistics struct { |
| Count float64 |
| NormalizedSum float64 |
| NormalizedSumOfSquares float64 |
| Mean float64 |
| Variance float64 |
| } |
| |
| // ComputeMean computes the mean field based on the count and normalized sum fields in |
| // the original struct, plus the given bounds. |
| // Remember to call this function after having Count and NormalizedSum, |
| // and do not pass in inf or -inf as bounds. |
| // If the count is zero, the mean is set to the midPoint. |
| func (s *VarianceStatistics) ComputeMean(lower, upper float64) { |
| midPoint := (lower + upper) / 2 |
| if s.Count == 0 { |
| s.Mean = midPoint |
| } else { |
| normalizedMean := s.NormalizedSum / s.Count |
| s.Mean = normalizedMean + midPoint |
| } |
| } |
| |
| // ComputeMeanVariance computes the mean and variance fields based on the other fields in |
| // the original struct, plus the given bounds. |
| // Remember to call this function after having Count, NormalizedSum, and NormalizedSumOfSquares, |
| // and do not pass in inf or -inf as bounds. |
| // If the count is zero, the mean is set to the midPoint, and variance is set to zero. |
| func (s *VarianceStatistics) ComputeMeanVariance(lower, upper float64) { |
| midPoint := (lower + upper) / 2 |
| if s.Count == 0 { |
| s.Mean = midPoint |
| s.Variance = 0 |
| } else { |
| normalizedMean := s.NormalizedSum / s.Count |
| s.Mean = normalizedMean + midPoint |
| s.Variance = s.NormalizedSumOfSquares/s.Count - normalizedMean*normalizedMean |
| } |
| } |
| |
| // PerPartitionVarianceStatistics calculates the variance related statistics of each partition and |
| // returns a map of partition to varianceStatistics. |
| func PerPartitionVarianceStatistics( |
| minValue, maxValue float64, contributions []TripleWithFloatValue, |
| ) map[int]VarianceStatistics { |
| midPoint := (minValue + maxValue) / 2 |
| m := make(map[int]VarianceStatistics) |
| for _, triple := range contributions { |
| partition := triple.Partition |
| normalizedValue := min(maxValue, max(minValue, float64(triple.Value))) - midPoint |
| var newStats VarianceStatistics |
| if stats, ok := m[partition]; ok { |
| newStats = stats |
| } |
| |
| // Insert or update the statistics for the partition. |
| newStats.Count++ |
| newStats.NormalizedSum += normalizedValue |
| newStats.NormalizedSumOfSquares += normalizedValue * normalizedValue |
| m[partition] = newStats |
| } |
| for partition, stats := range m { |
| stats.ComputeMeanVariance(minValue, maxValue) |
| m[partition] = stats |
| } |
| return m |
| } |
| |
| // PerPartitionVarianceStatisticsInt is similar to PerPartitionVarianceStatistics but for input |
| // with TripleWithIntValue type. |
| func PerPartitionVarianceStatisticsInt( |
| minValue, maxValue float64, contributions []TripleWithIntValue, |
| ) map[int]VarianceStatistics { |
| var floatTriples []TripleWithFloatValue |
| for _, t := range contributions { |
| floatTriples = append(floatTriples, TripleWithFloatValue{ |
| ID: t.ID, |
| Partition: t.Partition, |
| Value: float32(t.Value), |
| }) |
| } |
| return PerPartitionVarianceStatistics(minValue, maxValue, floatTriples) |
| } |
| |
| // LaplaceToleranceForVariance returns tolerances to be used in approxEquals for tests |
| // for variance to pass with 10⁻ᵏ flakiness. |
| // |
| // The return values include the tolerances for count, normalized sum, normalized sum of squares, |
| // mean, and variance. |
| // |
| // - flakinessK: parameter used to specify k in the flakiness. |
| // - lower: minimum possible value of the input entities. |
| // - upper: maximum possible value of the input entities. |
| // - epsilon: the differential privacy parameter epsilon. |
| // - stats.NormalizedSumOfSquares: \sum { (clamp(x_i, lower, upper) - midPoint) ^ 2 } |
| // - stats.NormalizedSum: \sum { clamp(x_i, lower, upper) - midPoint }. |
| // |
| // distanceFromMidPoint = upper - midPoint, where midPoint = (lower + upper)/2. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // See https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf |
| func LaplaceToleranceForVariance( |
| flakinessK, lower, upper float64, maxContributionsPerPartition, maxPartitionsContributed int64, |
| epsilon float64, stats VarianceStatistics, |
| ) (VarianceStatistics, error) { |
| // The term below is equivalent to -log_10(1-cbrt(1-1e-k)). |
| // It is formulated this way to increase precision and to avoid having this term go to infinity. |
| // Count, normalized sum, and normalized square sum uses the same following flakiness for simplicity. |
| newFlakinessK := -math.Log10(-math.Expm1(1 / 3. * math.Log1p(-math.Pow(10, -flakinessK)))) |
| newEpsilon := epsilon / 3 |
| |
| computer := sensitivityComputer{ |
| Lower: lower, |
| Upper: upper, |
| MaxContributionsPerPartition: maxContributionsPerPartition, |
| MaxPartitionsContributed: maxPartitionsContributed, |
| } |
| l1Count := computer.SensitivitiesForCount().L1 |
| l1NormalizedSum := computer.SensitivitiesForNormalizedSum().L1 |
| l1NormalizedSumOfSquares := computer.SensitivitiesForNormalizedSumOfSquares().L1 |
| |
| countTolerance := math.Ceil(LaplaceTolerance(newFlakinessK, l1Count, newEpsilon)) |
| normalizedSumTolerance := LaplaceTolerance(newFlakinessK, l1NormalizedSum, newEpsilon) |
| normalizedSumOfSquaresTolerance := LaplaceTolerance(newFlakinessK, l1NormalizedSumOfSquares, newEpsilon) |
| |
| tolerances := VarianceStatistics{ |
| Count: countTolerance, |
| NormalizedSum: normalizedSumTolerance, |
| NormalizedSumOfSquares: normalizedSumOfSquaresTolerance, |
| } |
| |
| meanTolerance, err := ToleranceForMean(lower, upper, stats, tolerances) |
| if err != nil { |
| return VarianceStatistics{}, fmt.Errorf("ToleranceForMean: %w", err) |
| } |
| |
| varianceTolerance, err := ToleranceForVariance(lower, upper, stats, tolerances) |
| if err != nil { |
| return VarianceStatistics{}, fmt.Errorf("ToleranceForVariance: %w", err) |
| } |
| |
| tolerances.Mean = meanTolerance |
| tolerances.Variance = varianceTolerance |
| |
| return tolerances, nil |
| } |
| |
| // ToleranceForVariance returns tolerance for variance to be used in approxEquals or |
| // checkMetricsAreNoisy for tests, under the pre-calculated tolerances for count, normalized sum, |
| // and normalized sum of squares so that the tests pass with 10⁻ᵏ flakiness. |
| // |
| // - lower: minimum possible value of the input entities. |
| // - upper: maximum possible value of the input entities. |
| // - exactStats: Count, NormalizedSum, NormalizedSumOfSquares, and variance of the input entities. |
| // - tolerances: tolerances for count, normalized sum, and normalized sum of squares. |
| // |
| // To see the logic and the math behind flakiness and tolerance calculation, |
| // see https://github.com/google/differential-privacy/blob/main/privacy-on-beam/docs/Tolerance_Calculation.pdf. |
| func ToleranceForVariance( |
| lower, upper float64, |
| exactStats, tolerances VarianceStatistics, |
| ) (float64, error) { |
| |
| minNoisyC := math.Max(1.0, exactStats.Count-tolerances.Count) // c_- |
| maxNoisyC := math.Max(1.0, exactStats.Count+tolerances.Count) // c_+ |
| minNoisyNS := exactStats.NormalizedSum - tolerances.NormalizedSum // s_- |
| maxNoisyNS := exactStats.NormalizedSum + tolerances.NormalizedSum // s_+ |
| minNoisyNSS := exactStats.NormalizedSumOfSquares - tolerances.NormalizedSumOfSquares // ss_- |
| maxNoisyNSS := exactStats.NormalizedSumOfSquares + tolerances.NormalizedSumOfSquares // ss_+ |
| |
| // Let mm denote the mean of noisy normalized squares, i.e. mm = ss/c. |
| // Find the lower and upper bounds of mm, i.e. mm_- <= mm <= mm_+, using the following rules: |
| // |
| // 1. For mm_-: |
| // - If ss_- >= 0, then mm_- = ss_-/c_+. |
| // - Otherwise, mm_- = ss_-/c_-. |
| // 2. For mm_+: |
| // - Since ss_+ is always non-negative, mm_+ = ss_+/c_-. |
| |
| // Calculate mm_-. |
| minNoisyNMM := minNoisyNSS / maxNoisyC |
| if minNoisyNSS < 0 { |
| minNoisyNMM = minNoisyNSS / minNoisyC |
| } |
| // Calculate mm_+ |
| maxNoisyNMM := maxNoisyNSS / minNoisyC |
| |
| // Let m denote the mean of noisy normalized values, i.e. m = s/c. |
| // Find the lower and upper bounds of m^2, i.e. (m2_-)^2 <= m^2 <= (m2_+)^2, as follows: |
| // |
| // 1. If s_- <= s_+ <= 0, then |
| // - m2_- = (s_+/c_+)^2. |
| // - m2_+ = (s_-/c_-)^2. |
| // 2. If s_- <= 0 <= s_+, then |
| // - m2_- = 0. |
| // - m2_+ = ( max(|s_-|, |s_+|)/c_- )^2. |
| // 3. If 0 <= s_- <= s_+, then |
| // - m2_- = (s_-/c_+ )^2. |
| // - m2_+ = (s_+/c_- )^2. |
| |
| // Calculate m2_- and m2_+. |
| minNoisyNM2, maxNoisyNM2 := math.Pow(maxNoisyNS/maxNoisyC, 2), math.Pow(minNoisyNS/minNoisyC, 2) |
| if minNoisyNS <= 0 && 0 <= maxNoisyNS { |
| minNoisyNM2 = 0 |
| maxNoisyNM2 = math.Pow(math.Max(-minNoisyNS, maxNoisyNS)/minNoisyC, 2) |
| } else if minNoisyNS >= 0 { |
| minNoisyNM2 = math.Pow(minNoisyNS/maxNoisyC, 2) |
| maxNoisyNM2 = math.Pow(maxNoisyNS/minNoisyC, 2) |
| } |
| |
| // Because shifting the element value by midPoint does not change variance, |
| // that is we have the noisy variance V = MM - M2 = mm - m2. |
| // Given the bounds of mm and m2, we have (mm_- - m2_+) <= V <= (mm_+ - m2_-). |
| |
| // Return the tolerance as max(|exactVariance - V_-| , |exactVariance - V_+|). |
| // However, we first clamp V_- and V_+ to the range [0, maxVariance], |
| // where maxVariance = (upper - lower)^2/4. |
| maxVariance := math.Pow(upper-lower, 2) / 4 |
| minNoisyV := minNoisyNMM - maxNoisyNM2 |
| minNoisyVariance, err := dpagg.ClampFloat64(minNoisyV, 0, maxVariance) |
| if err != nil { |
| return 0, fmt.Errorf("clamping minNoisyVariance(%v, %v, %v): %w", |
| minNoisyV, 0, maxVariance, err) |
| } |
| maxNoisyV := maxNoisyNMM - minNoisyNM2 |
| maxNoisyVariance, err := dpagg.ClampFloat64(maxNoisyV, 0, maxVariance) |
| if err != nil { |
| return 0, fmt.Errorf("clamping maxNoisyVariance(%v, %v, %v): %w", |
| maxNoisyV, 0, maxVariance, err) |
| } |
| |
| return math.Max( |
| distanceBetween(exactStats.Variance, minNoisyVariance), |
| distanceBetween(exactStats.Variance, maxNoisyVariance), |
| ), nil |
| } |
| |
| // QuantilesTolerance returns tolerance to be used in approxEquals for tests |
| // for quantiles to pass with negligible flakiness. |
| // |
| // When no noise is added, the quantiles should return a value that differs from the true |
| // quantile by no more than the size of the buckets the range is partitioned into, i.e., |
| // (upper - lower) / (branchingFactor^treeHeight - 1). |
| // |
| // The tests don't disable noise, hence we multiply the tolerance by a reasonably small number, |
| // in this case 5, to account for the noise addition. |
| func QuantilesTolerance(lower, upper float64) float64 { |
| return 5 * (upper - lower) / (math.Pow(float64(dpagg.DefaultBranchingFactor), float64(dpagg.DefaultTreeHeight)) - 1.0) |
| } |
| |
| func distanceBetween(a, b float64) float64 { |
| return math.Abs(a - b) |
| } |
| |
| type sensitivityComputer struct { |
| Lower, Upper float64 |
| MaxContributionsPerPartition int64 |
| MaxPartitionsContributed int64 |
| } |
| |
| type sensitivity struct { |
| L0 float64 // L0 sensitivity |
| LInf float64 // LInf sensitivity |
| L1 float64 // L1 sensitivity |
| } |
| |
| func (c *sensitivityComputer) MaxDistFromMidPoint() float64 { |
| midPoint := c.Lower + (c.Upper-c.Lower)/2.0 |
| return c.Upper - midPoint |
| } |
| |
| func (c *sensitivityComputer) MaxContributions() float64 { |
| return float64(c.MaxContributionsPerPartition) * float64(c.MaxPartitionsContributed) |
| } |
| |
| func (c *sensitivityComputer) SensitivitiesForNormalizedSumOfSquares() sensitivity { |
| maxDistFromMidPoint := c.MaxDistFromMidPoint() |
| return sensitivity{ |
| L0: float64(c.MaxPartitionsContributed), |
| LInf: math.Pow(maxDistFromMidPoint, 2) * float64(c.MaxContributionsPerPartition), |
| L1: math.Pow(maxDistFromMidPoint, 2) * c.MaxContributions(), |
| } |
| } |
| |
| func (c *sensitivityComputer) SensitivitiesForNormalizedSum() sensitivity { |
| maxDistFromMidpoint := c.MaxDistFromMidPoint() |
| return sensitivity{ |
| L0: float64(c.MaxPartitionsContributed), |
| LInf: maxDistFromMidpoint * float64(c.MaxContributionsPerPartition), |
| L1: maxDistFromMidpoint * c.MaxContributions(), |
| } |
| } |
| |
| func (c *sensitivityComputer) SensitivitiesForCount() sensitivity { |
| return sensitivity{ |
| L0: float64(c.MaxPartitionsContributed), |
| LInf: float64(c.MaxContributionsPerPartition), |
| L1: float64(c.MaxContributionsPerPartition) * float64(c.MaxPartitionsContributed), |
| } |
| } |
| |
| // Int64Ptr transforms an int64 into an *int64. |
| func Int64Ptr(i int64) *int64 { |
| return &i |
| } |
| |
| // Float64Ptr transforms a float64 into a *float64. |
| func Float64Ptr(f float64) *float64 { |
| return &f |
| } |
| |
| // CheckFloat64MetricsAreNoisy checks that no values in a PCollection<pairIF64> |
| // (where pairIF64 contains the aggregate statistic) is equal to exactMetric. |
| func CheckFloat64MetricsAreNoisy(s beam.Scope, col beam.PCollection, exactMetric, tolerance float64) { |
| beam.ParDo0(s, &checkFloat64MetricsAreNoisyFn{exactMetric, tolerance}, col) |
| } |
| |
| type checkFloat64MetricsAreNoisyFn struct { |
| ExactMetric float64 |
| Tolerance float64 |
| } |
| |
| func (fn *checkFloat64MetricsAreNoisyFn) ProcessElement(m PairIF64) error { |
| if cmp.Equal(m.Value, fn.ExactMetric, cmpopts.EquateApprox(0, fn.Tolerance)) { |
| return fmt.Errorf("found a non-noisy output of %f for (key, exactOutput)=(%d, %f)", m.Value, m.Key, fn.ExactMetric) |
| } |
| return nil |
| } |
| |
| // CheckInt64MetricsAreNoisy checks that no values in a PCollection<pairII64> |
| // (where pairII64 contains the aggregate statistic) is equal to exactMetric. |
| func CheckInt64MetricsAreNoisy(s beam.Scope, col beam.PCollection, exactMetric int, tolerance float64) { |
| beam.ParDo0(s, &checkInt64MetricsAreNoisyFn{exactMetric, tolerance}, col) |
| } |
| |
| type checkInt64MetricsAreNoisyFn struct { |
| ExactMetric int |
| Tolerance float64 |
| } |
| |
| func (fn *checkInt64MetricsAreNoisyFn) ProcessElement(m PairII64) error { |
| if cmp.Equal(float64(m.Value), float64(fn.ExactMetric), cmpopts.EquateApprox(0, fn.Tolerance)) { |
| return fmt.Errorf("found a non-noisy output of %d for (key, exactOutput)=(%d, %d)", m.Value, m.Key, fn.ExactMetric) |
| } |
| return nil |
| } |
| |
| // OneFn always returns 1. |
| func OneFn(beam.V) int { return 1 } |
| |
| // CheckSomePartitionsAreDropped checks that the number of values in the PCollection |
| // is smaller than numPartitions, but larger than 0. |
| func CheckSomePartitionsAreDropped(s beam.Scope, col beam.PCollection, numPartitions int) { |
| ones := beam.ParDo(s, OneFn, col) |
| sum := stats.Sum(s, ones) |
| beam.ParDo0(s, &checkSomePartitionsAreDroppedFn{numPartitions}, sum) |
| } |
| |
| type checkSomePartitionsAreDroppedFn struct { |
| NumPartitions int |
| } |
| |
| func (fn *checkSomePartitionsAreDroppedFn) ProcessElement(i int) error { |
| if i <= 0 { |
| return fmt.Errorf("got %d emitted partitions, want a positive number", i) |
| } |
| if i >= fn.NumPartitions { |
| return fmt.Errorf("got %d emitted partitions (all of them), want some partitions to be dropped", i) |
| } |
| return nil |
| } |
| |
| // CheckNoNegativeValuesInt64 returns an error if an int64 value is negative. |
| func CheckNoNegativeValuesInt64(v int64) error { |
| if v < 0 { |
| return fmt.Errorf("unexpected negative element: %v", v) |
| } |
| return nil |
| } |
| |
| func isNegativeInt64(v int64) bool { |
| return v < 0 |
| } |
| |
| func checkNumNegativeElemCountIsPositive(elemCount int) error { |
| if elemCount == 0 { |
| return errors.New("want at least one negative value, but got 0") |
| } |
| return nil |
| } |
| |
| // CheckAtLeastOneValueNegativeInt64 operates on a PCollection<int64> and will |
| // return an error during runtime if none of the int64 values is negative. |
| func CheckAtLeastOneValueNegativeInt64(s beam.Scope, col beam.PCollection) { |
| negativeValues := filter.Include(s, col, isNegativeInt64) |
| numNegativeValues := stats.CountElms(s, negativeValues) |
| beam.ParDo0(s, checkNumNegativeElemCountIsPositive, numNegativeValues) |
| } |
| |
| // CheckNoNegativeValuesFloat64 returns an error if an float64 value is negative. |
| func CheckNoNegativeValuesFloat64(v float64) error { |
| if v < 0 { |
| return fmt.Errorf("unexpected negative element: %v", v) |
| } |
| return nil |
| } |
| |
| // CheckAllValuesNegativeFloat64 returns an error if an float64 value is non-negative. |
| func CheckAllValuesNegativeFloat64(v float64) error { |
| if v >= 0 { |
| return fmt.Errorf("unexpected non-negative element: %v", v) |
| } |
| return nil |
| } |
| |
| // ApproxEquals returns true if x and y are approximately equal within |
| // a tolerance of 1e-10. |
| func ApproxEquals(x, y float64) bool { |
| return cmp.Equal(x, y, cmpopts.EquateApprox(0, 1e-10)) |
| } |
| |
| // CheckNumPartitions checks that col has expected number of partitions. |
| func CheckNumPartitions(s beam.Scope, col beam.PCollection, expected int) { |
| CheckApproxNumPartitions(s, col, expected, 0) |
| } |
| |
| // CheckApproxNumPartitions checks that col has approximately expected number of partitions. |
| // col is allowed to have number of partitions within tolerance of expected. |
| func CheckApproxNumPartitions(s beam.Scope, col beam.PCollection, expected, tolerance int) { |
| ones := beam.ParDo(s, OneFn, col) |
| numPartitions := stats.Sum(s, ones) |
| numPartitions = beam.AddFixedKey(s, numPartitions) |
| |
| want := beam.Create(s, expected) |
| want = beam.AddFixedKey(s, want) |
| coGroupToValue := beam.CoGroupByKey(s, numPartitions, want) |
| beam.ParDo0(s, &gotExpectedNumPartitionsFn{tolerance}, coGroupToValue) |
| } |
| |
| type gotExpectedNumPartitionsFn struct { |
| Tolerance int |
| } |
| |
| func (fn *gotExpectedNumPartitionsFn) ProcessElement(_ int, v1Iter, v2Iter func(*int) bool) error { |
| got := getNumPartitions(v1Iter) |
| want := getNumPartitions(v2Iter) |
| if math.Abs(float64(got-want)) > float64(fn.Tolerance) { |
| if fn.Tolerance != 0 { |
| return fmt.Errorf("got %d emitted partitions, want %d +- %d", got, want, fn.Tolerance) |
| } |
| return fmt.Errorf("got %d emitted partitions, want %d", got, want) |
| } |
| return nil |
| } |
| |
| func getNumPartitions(vIter func(*int) bool) (v int) { |
| ok := vIter(&v) |
| if !ok { |
| return 0 |
| } |
| return v |
| } |
| |
| // DereferenceFloat64Slice returns the first and only element of the slice for |
| // each key in a PCollection<K, []float64>. Returns an error if the slice |
| // does not contain exactly 1 element. |
| func DereferenceFloat64Slice(v beam.V, r []float64) (beam.V, float64, error) { |
| if len(r) != 1 { |
| return v, 0.0, fmt.Errorf("dereferenceFloat64: r=%v does not contain a single element", r) |
| } |
| return v, r[0], nil |
| } |
| |
| // MeanVarianceAPIAddsNoiseTestParams contains the privacy parameters for the Mean and Variance API |
| // tests which checks that noise is added. |
| // |
| // We use small epsilon and delta to avoid flakiness due to rounding count noises to 0. |
| var MeanVarianceAPIAddsNoiseTestParams = struct { |
| AggEpsGaussian float64 |
| AggDelGaussian float64 |
| AggEpsLaplace float64 |
| }{ |
| AggEpsGaussian: 1 * math.Pow10(-13), |
| AggDelGaussian: 0.005 * math.Pow10(-13), |
| AggEpsLaplace: 9.8e-11, |
| } |