| // |
| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| package pbeam |
| |
| import ( |
| "bytes" |
| "fmt" |
| "reflect" |
| |
| log "github.com/golang/glog" |
| "github.com/google/differential-privacy/go/v2/checks" |
| "github.com/google/differential-privacy/go/v2/dpagg" |
| "github.com/google/differential-privacy/go/v2/noise" |
| "github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" |
| ) |
| |
| func init() { |
| beam.RegisterType(reflect.TypeOf((*prepareSumFn)(nil))) |
| beam.RegisterType(reflect.TypeOf((*addNoiseToEmptyPublicPartitionsInt64Fn)(nil))) |
| beam.RegisterType(reflect.TypeOf((*addNoiseToEmptyPublicPartitionsFloat64Fn)(nil))) |
| // TODO: add tests to make sure we don't forget anything here |
| } |
| |
// SumParams specifies the parameters associated with a Sum aggregation.
type SumParams struct {
	// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
	//
	// Defaults to LaplaceNoise{}.
	NoiseKind NoiseKind
	// Differential privacy budget consumed by this aggregation. If there is
	// only one aggregation, both Epsilon and Delta can be left 0; in that
	// case, the entire budget of the PrivacySpec is consumed.
	Epsilon, Delta float64
	// The maximum number of distinct partitions (keys) that a given privacy
	// identifier can contribute to. There is an inherent trade-off when
	// choosing this parameter: a larger MaxPartitionsContributed leads to less
	// data loss due to contribution bounding, but since the noise added in
	// aggregations is scaled according to MaxPartitionsContributed, it also
	// means that more noise is added to each sum.
	//
	// Required.
	MaxPartitionsContributed int64
	// The total contribution of a given privacy identifier to a partition can
	// be at least MinValue, and at most MaxValue; otherwise it will be clamped
	// to these bounds. For example, if a privacy identifier is associated with
	// the key-value pairs [("a", -5), ("a", 2), ("b", 7), ("c", 3)] and the
	// (MinValue, MaxValue) bounds are (0, 5), the contribution for "a"
	// (-5 + 2 = -3) will be clamped up to 0, the contribution for "b" will be
	// clamped down to 5, and the contribution for "c" will be untouched. There
	// is an inherent trade-off when choosing MinValue and MaxValue: a small
	// MinValue and a large MaxValue mean that fewer records will be clamped,
	// but that more noise will be added.
	//
	// Required.
	MinValue, MaxValue float64
	// You can input the list of partitions present in the output if you know
	// them in advance. When you specify partitions, partition selection /
	// thresholding will be disabled and partitions will appear in the output
	// if and only if they appear in the set of public partitions.
	//
	// You should not derive the list of partitions non-privately from private
	// data. You should only use this in either of the following cases:
	//  1. The list of partitions is data-independent. For example, if you are
	//     aggregating a metric by hour, you could provide a list of all
	//     possible hourly periods.
	//  2. You use a differentially private operation to come up with the list
	//     of partitions. For example, you could use the output of a
	//     SelectPartitions operation or the keys of a DistinctPrivacyID
	//     operation as the list of public partitions.
	//
	// PublicPartitions needs to be a beam.PCollection, slice, or array. The
	// underlying type needs to match the partition type of the PrivatePCollection.
	//
	// Prefer slices or arrays if the list of public partitions is small and
	// can fit into memory (e.g., up to a million). Prefer beam.PCollection
	// otherwise.
	//
	// Optional.
	PublicPartitions interface{}
}
| |
// SumPerKey sums the values associated with each key in a
// PrivatePCollection<K,V>, adding differentially private noise to the sums and
// applying thresholding (partition selection) to remove sums with a low number
// of distinct privacy identifiers.
//
// It is also possible to specify the list of partitions present in the output
// via SumParams.PublicPartitions (a beam.PCollection, slice, or array), in
// which case the partition selection/thresholding step is skipped.
//
// SumPerKey transforms a PrivatePCollection<K,V> either into a
// PCollection<K,int64> or a PCollection<K,float64>, depending on whether its
// input is an integer type or a float type.
//
// Invalid inputs or parameters terminate the program via log.Fatalf during
// pipeline construction; no error is returned.
//
// Note: Do not use when your results may cause overflows for int64 and float64
// values. This aggregation is not hardened for such applications yet.
func SumPerKey(s beam.Scope, pcol PrivatePCollection, params SumParams) beam.PCollection {
	s = s.Scope("pbeam.SumPerKey")
	// Obtain & validate type information from the underlying PCollection<K,V>.
	idT, kvT := beam.ValidateKVType(pcol.col)
	if kvT.Type() != reflect.TypeOf(kv.Pair{}) {
		log.Fatalf("SumPerKey must be used on a PrivatePCollection of type <K,V>, got type %v instead", kvT)
	}
	if pcol.codec == nil {
		log.Fatalf("SumPerKey: no codec found for the input PrivatePCollection.")
	}

	// Get privacy parameters. consumeBudget returns the (epsilon, delta)
	// charged to this aggregation from the PrivacySpec; per the SumParams docs,
	// leaving both at 0 consumes the entire budget.
	spec := pcol.privacySpec
	epsilon, delta, err := spec.consumeBudget(params.Epsilon, params.Delta)
	if err != nil {
		log.Fatalf("Couldn't consume budget for SumPerKey: %v", err)
	}

	// Laplace noise is the default when no NoiseKind is given.
	var noiseKind noise.Kind
	if params.NoiseKind == nil {
		noiseKind = noise.LaplaceNoise
		log.Infof("No NoiseKind specified, using Laplace Noise by default.")
	} else {
		noiseKind = params.NoiseKind.toNoiseKind()
	}
	err = checkSumPerKeyParams(params, epsilon, delta, noiseKind, pcol.codec.KType.T)
	if err != nil {
		log.Fatalf("pbeam.SumPerKey: %v", err)
	}

	maxPartitionsContributed, err := getMaxPartitionsContributed(spec, params.MaxPartitionsContributed)
	if err != nil {
		log.Fatalf("Couldn't get MaxPartitionsContributed for SumPerKey: %v", err)
	}
	// Drop non-public partitions, if public partitions are specified.
	pcol.col, err = dropNonPublicPartitions(s, pcol, params.PublicPartitions, pcol.codec.KType.T)
	if err != nil {
		log.Fatalf("Couldn't drop non-public partitions for SumPerKey: %v", err)
	}
	// First, group together the privacy ID and the partition ID, and sum the
	// values per-privacy unit and per-partition.
	prepareSumFn := newPrepareSumFn(idT, pcol.codec)
	decoded := beam.ParDo(s,
		prepareSumFn,
		pcol.col,
		beam.TypeDefinition{Var: beam.VType, T: pcol.codec.VType.T})
	summed := stats.SumPerKey(s, decoded)
	// Second, convert the sum to int64 or float64, and re-key.
	_, sumT := beam.ValidateKVType(summed)
	convertFn, err := findConvertFn(sumT)
	if err != nil {
		log.Fatalf("Couldn't get convertFn for SumPerKey: %v", err)
	}
	// vKind is the kind (int64 or float64) that all later steps operate on.
	vKind, err := getKind(convertFn)
	if err != nil {
		log.Fatalf("Couldn't get vKind for SumPerKey: %v", err)
	}
	converted := beam.ParDo(s, convertFn, summed)
	rekeyFn, err := findRekeyFn(vKind)
	if err != nil {
		log.Fatalf("Couldn't get rekeyFn for SumPerKey: %v", err)
	}
	rekeyed := beam.ParDo(s, rekeyFn, converted)
	// Third, do cross-partition contribution bounding if not in test mode without contribution bounding.
	if spec.testMode != noNoiseWithoutContributionBounding {
		rekeyed = boundContributions(s, rekeyed, maxPartitionsContributed)
	}
	// Fourth, now that contribution bounding is done, remove the privacy keys,
	// decode the value, and do a DP sum with all the partial sums.
	partialSumPairs := beam.DropKey(s, rekeyed)
	partitionT := pcol.codec.KType.T
	decodePairFn, err := newDecodePairFn(partitionT, vKind)
	if err != nil {
		log.Fatalf("Couldn't get decodePairFn for SumPerKey: %v", err)
	}
	partialSumKV := beam.ParDo(s,
		decodePairFn,
		partialSumPairs,
		beam.TypeDefinition{Var: beam.XType, T: partitionT})
	// Add public partitions and return the aggregation output, if public partitions are specified.
	if params.PublicPartitions != nil {
		return addPublicPartitionsForSum(s, epsilon, delta, maxPartitionsContributed,
			params, noiseKind, vKind, partialSumKV, spec.testMode)
	}
	// Without public partitions, the bounded sum also performs partition
	// selection (publicPartitions argument is false).
	boundedSumFn, err := newBoundedSumFn(epsilon, delta, maxPartitionsContributed, params.MinValue, params.MaxValue, noiseKind, vKind, false, spec.testMode)
	if err != nil {
		log.Fatalf("Couldn't get boundedSumFn for SumPerKey: %v", err)
	}
	sums := beam.CombinePerKey(s,
		boundedSumFn,
		partialSumKV)
	// Drop thresholded partitions.
	dropThresholdedPartitionsFn, err := findDropThresholdedPartitionsFn(vKind)
	if err != nil {
		log.Fatalf("Couldn't get dropThresholdedPartitionsFn for SumPerKey: %v", err)
	}
	sums = beam.ParDo(s, dropThresholdedPartitionsFn, sums)
	// Clamp negative sums to zero when MinValue is non-negative: every clamped
	// contribution is then non-negative, so a negative sum can only come from
	// the added noise.
	if params.MinValue >= 0 {
		clampNegativePartitionsFn, err := findClampNegativePartitionsFn(vKind)
		if err != nil {
			log.Fatalf("Couldn't get clampNegativePartitionsFn for SumPerKey: %v", err)
		}
		sums = beam.ParDo(s, clampNegativePartitionsFn, sums)
	}
	return sums
}
| |
| func addPublicPartitionsForSum(s beam.Scope, epsilon, delta float64, maxPartitionsContributed int64, params SumParams, noiseKind noise.Kind, vKind reflect.Kind, partialSumKV beam.PCollection, testMode testMode) beam.PCollection { |
| // Calculate sums with non-public partitions dropped. Result is PCollection<partition, int64> or PCollection<partition, float64>. |
| boundedSumFn, err := newBoundedSumFn(epsilon, delta, maxPartitionsContributed, params.MinValue, params.MaxValue, noiseKind, vKind, true, testMode) |
| if err != nil { |
| log.Fatalf("Couldn't get boundedSumFn for SumPerKey: %v", err) |
| } |
| sums := beam.CombinePerKey(s, |
| boundedSumFn, |
| partialSumKV) |
| partitionT, _ := beam.ValidateKVType(sums) |
| sumsPartitions := beam.DropValue(s, sums) |
| // Create map with partitions in the data as keys. |
| partitionMap := beam.Combine(s, newPartitionsMapFn(beam.EncodedType{partitionT.Type()}), sumsPartitions) |
| // Add value of 0 to each partition key in PublicPartitions. |
| addZeroValuesToPublicPartitions, err := newAddZeroValuesToPublicPartitionsFn(vKind) |
| if err != nil { |
| log.Fatalf("Couldn't get addZeroValuesToPublicPartitions for SumPerKey: %v", err) |
| } |
| publicPartitions, isPCollection := params.PublicPartitions.(beam.PCollection) |
| if !isPCollection { |
| publicPartitions = beam.Reshuffle(s, beam.CreateList(s, params.PublicPartitions)) |
| } |
| publicPartitionsWithValues := beam.ParDo(s, addZeroValuesToPublicPartitions, publicPartitions) |
| // emptyPublicPartitions are the partitions that are public but not found in the data. |
| emptyPublicPartitions := beam.ParDo(s, newEmitPartitionsNotInTheDataFn(partitionT), publicPartitionsWithValues, beam.SideInput{Input: partitionMap}) |
| // Add noise to the empty public partitions. |
| addNoiseToEmptyPublicPartitionsFn, err := newAddNoiseToEmptyPublicPartitionsFn(epsilon, delta, maxPartitionsContributed, params.MinValue, params.MaxValue, noiseKind, vKind, testMode) |
| if err != nil { |
| log.Fatalf("Couldn't get addNoiseToEmptyPublicPartitionsFn for SumPerKey: %v", err) |
| } |
| emptySums := beam.ParDo(s, |
| addNoiseToEmptyPublicPartitionsFn, |
| emptyPublicPartitions) |
| dereferenceValueFn, err := findDereferenceValueFn(vKind) |
| if err != nil { |
| log.Fatalf("Couldn't get dereferenceValueFn for SumPerKey: %v", err) |
| } |
| sums = beam.ParDo(s, dereferenceValueFn, sums) |
| // Merge sums from data with sums from the empty public partitions. |
| allSums := beam.Flatten(s, sums, emptySums) |
| // Clamp negative counts to zero when MinValue is non-negative. |
| if params.MinValue >= 0 { |
| clampNegativePartitionsFn, err := findClampNegativePartitionsFn(vKind) |
| if err != nil { |
| log.Fatalf("Couldn't get clampNegativePartitionsFn for SumPerKey: %v", err) |
| } |
| allSums = beam.ParDo(s, clampNegativePartitionsFn, allSums) |
| } |
| return allSums |
| } |
| |
| func checkSumPerKeyParams(params SumParams, epsilon, delta float64, noiseKind noise.Kind, partitionType reflect.Type) error { |
| err := checkPublicPartitions(params.PublicPartitions, partitionType) |
| if err != nil { |
| return err |
| } |
| err = checks.CheckEpsilon(epsilon) |
| if err != nil { |
| return err |
| } |
| err = checkDelta(delta, noiseKind, params.PublicPartitions) |
| if err != nil { |
| return err |
| } |
| return checks.CheckBoundsFloat64(params.MinValue, params.MaxValue) |
| } |
| |
// prepareSumFn takes a PCollection<ID,kv.Pair{K,V}> as input, and returns a
// PCollection<kv.Pair{ID,K},V>; where ID has been encoded to bytes, K is left
// encoded, and V has been decoded.
type prepareSumFn struct {
	// IDType is the privacy ID type; Setup builds idEnc from it.
	IDType beam.EncodedType
	// idEnc encodes privacy IDs to bytes. Unexported; set in Setup.
	idEnc beam.ElementEncoder
	// InputPairCodec decodes the incoming kv.Pair into its key and value.
	InputPairCodec *kv.Codec
}
| |
| func newPrepareSumFn(idType typex.FullType, kvCodec *kv.Codec) *prepareSumFn { |
| return &prepareSumFn{ |
| IDType: beam.EncodedType{idType.Type()}, |
| InputPairCodec: kvCodec, |
| } |
| } |
| |
| func (fn *prepareSumFn) Setup() error { |
| fn.idEnc = beam.NewElementEncoder(fn.IDType.T) |
| return fn.InputPairCodec.Setup() |
| } |
| |
| func (fn *prepareSumFn) ProcessElement(id beam.W, pair kv.Pair) (kv.Pair, beam.V, error) { |
| var idBuf bytes.Buffer |
| if err := fn.idEnc.Encode(id, &idBuf); err != nil { |
| return kv.Pair{}, nil, fmt.Errorf("pbeam.prepareSumFn.ProcessElement: couldn't encode ID %v: %w", id, err) |
| } |
| _, v, err := fn.InputPairCodec.Decode(pair) |
| return kv.Pair{idBuf.Bytes(), pair.K}, v, err |
| } |
| |
| // findConvertFn gets the correct conversion to int64 or float64 function. |
| func findConvertFn(t typex.FullType) (interface{}, error) { |
| switch t.Type().String() { |
| case "int": |
| return convertIntToInt64Fn, nil |
| case "int8": |
| return convertInt8ToInt64Fn, nil |
| case "int16": |
| return convertInt16ToInt64Fn, nil |
| case "int32": |
| return convertInt32ToInt64Fn, nil |
| case "int64": |
| return convertInt64ToInt64Fn, nil |
| case "uint": |
| return convertUintToInt64Fn, nil |
| case "uint8": |
| return convertUint8ToInt64Fn, nil |
| case "uint16": |
| return convertUint16ToInt64Fn, nil |
| case "uint32": |
| return convertUint32ToInt64Fn, nil |
| case "uint64": |
| return convertUint64ToInt64Fn, nil |
| case "float32": |
| return convertFloat32ToFloat64Fn, nil |
| case "float64": |
| return convertFloat64ToFloat64Fn, nil |
| default: |
| return nil, fmt.Errorf("unexpected value type of %v", t) |
| } |
| } |
| |
| func convertIntToInt64Fn(z beam.Z, i int) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertInt8ToInt64Fn(z beam.Z, i int8) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertInt16ToInt64Fn(z beam.Z, i int16) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertInt32ToInt64Fn(z beam.Z, i int32) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertInt64ToInt64Fn(z beam.Z, i int64) (beam.Z, int64) { |
| return z, i |
| } |
| func convertUintToInt64Fn(z beam.Z, i uint) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertUint8ToInt64Fn(z beam.Z, i uint8) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertUint16ToInt64Fn(z beam.Z, i uint16) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertUint32ToInt64Fn(z beam.Z, i uint32) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| func convertUint64ToInt64Fn(z beam.Z, i uint64) (beam.Z, int64) { |
| return z, int64(i) |
| } |
| |
// getKind returns the reflect.Kind of the second return value of fn, which is
// expected to be a convert function such as convertIntToInt64Fn. It returns
// reflect.Invalid and an error if fn is nil, is not a function, or has fewer
// than two results.
func getKind(fn interface{}) (reflect.Kind, error) {
	if fn == nil {
		return reflect.Invalid, fmt.Errorf("convertFn is nil")
	}
	fnType := reflect.TypeOf(fn)
	// Case order matters: NumOut must only be called once fnType is known to
	// be a function type.
	switch {
	case fnType.Kind() != reflect.Func:
		return reflect.Invalid, fmt.Errorf("convertFn is %v, should be a function", fnType.Kind())
	case fnType.NumOut() < 2:
		return reflect.Invalid, fmt.Errorf("convertFn has %v outputs, expected at least 2", fnType.NumOut())
	default:
		return fnType.Out(1).Kind(), nil
	}
}
| |
| func newAddNoiseToEmptyPublicPartitionsFn(epsilon, delta float64, maxPartitionsContributed int64, lower, upper float64, noiseKind noise.Kind, vKind reflect.Kind, testMode testMode) (interface{}, error) { |
| var err error |
| var bsFn interface{} |
| |
| switch vKind { |
| case reflect.Int64: |
| err = checks.CheckBoundsFloat64AsInt64(lower, upper) |
| bsFn = newAddNoiseToEmptyPublicPartitionsInt64Fn(epsilon, delta, maxPartitionsContributed, int64(lower), int64(upper), noiseKind, testMode) |
| case reflect.Float64: |
| err = checks.CheckBoundsFloat64(lower, upper) |
| bsFn = newAddNoiseToEmptyPublicPartitionsFloat64Fn(epsilon, delta, maxPartitionsContributed, lower, upper, noiseKind, testMode) |
| default: |
| err = fmt.Errorf("vKind(%v) should be int64 or float64", vKind) |
| } |
| |
| return bsFn, err |
| } |
| |
// addNoiseToEmptyPublicPartitionsInt64Fn emits, for each public partition that
// has no data, an int64 DP sum computed over no values (i.e. only noise).
// Exported fields hold the configuration set by the constructor; the
// unexported noise field is populated in Setup.
type addNoiseToEmptyPublicPartitionsInt64Fn struct {
	// Privacy spec parameters (set during initial construction).
	NoiseEpsilon             float64
	NoiseDelta               float64
	MaxPartitionsContributed int64
	// Clamping bounds passed to the underlying BoundedSumInt64.
	Lower     int64
	Upper     int64
	NoiseKind noise.Kind
	noise     noise.Noise // Set during Setup phase according to NoiseKind.
	// TestMode, when enabled, makes Setup replace the noise with noNoise{}.
	TestMode testMode
}
| |
| // newAddNoiseToEmptyPublicPartitionsInt64Fn returns a addNoiseToEmptyPublicPartitionsInt64Fn with the given budget and parameters. |
| func newAddNoiseToEmptyPublicPartitionsInt64Fn(epsilon, delta float64, maxPartitionsContributed, lower, upper int64, noiseKind noise.Kind, testMode testMode) *addNoiseToEmptyPublicPartitionsInt64Fn { |
| return &addNoiseToEmptyPublicPartitionsInt64Fn{ |
| NoiseEpsilon: epsilon, |
| NoiseDelta: delta, |
| MaxPartitionsContributed: maxPartitionsContributed, |
| Lower: lower, |
| Upper: upper, |
| NoiseKind: noiseKind, |
| TestMode: testMode, |
| } |
| } |
| |
| func (fn *addNoiseToEmptyPublicPartitionsInt64Fn) Setup() { |
| fn.noise = noise.ToNoise(fn.NoiseKind) |
| if fn.TestMode.isEnabled() { |
| fn.noise = noNoise{} |
| } |
| } |
| |
| func (fn *addNoiseToEmptyPublicPartitionsInt64Fn) ProcessElement(partitionKey beam.X, _ int64) (beam.X, int64, error) { |
| bs, err := dpagg.NewBoundedSumInt64(&dpagg.BoundedSumInt64Options{ |
| Epsilon: fn.NoiseEpsilon, |
| Delta: fn.NoiseDelta, |
| MaxPartitionsContributed: fn.MaxPartitionsContributed, |
| Lower: fn.Lower, |
| Upper: fn.Upper, |
| Noise: fn.noise, |
| }) |
| if err != nil { |
| return partitionKey, 0, err |
| } |
| noisedValue, err := bs.Result() |
| return partitionKey, noisedValue, err |
| } |
| |
// addNoiseToEmptyPublicPartitionsFloat64Fn emits, for each public partition
// that has no data, a float64 DP sum computed over no values (i.e. only noise).
// Exported fields hold the configuration set by the constructor; the
// unexported noise field is populated in Setup.
type addNoiseToEmptyPublicPartitionsFloat64Fn struct {
	// Privacy spec parameters (set during initial construction).
	NoiseEpsilon             float64
	NoiseDelta               float64
	MaxPartitionsContributed int64
	// Clamping bounds passed to the underlying BoundedSumFloat64.
	Lower     float64
	Upper     float64
	NoiseKind noise.Kind
	noise     noise.Noise // Set during Setup phase according to NoiseKind.
	// TestMode, when enabled, makes Setup replace the noise with noNoise{}.
	TestMode testMode
}
| |
| // newAddNoiseToEmptyPublicPartitionsFloat64Fn returns a addNoiseToEmptyPublicPartitionsFloat64Fn with the given budget and parameters. |
| func newAddNoiseToEmptyPublicPartitionsFloat64Fn(epsilon, delta float64, maxPartitionsContributed int64, lower, upper float64, noiseKind noise.Kind, testMode testMode) *addNoiseToEmptyPublicPartitionsFloat64Fn { |
| return &addNoiseToEmptyPublicPartitionsFloat64Fn{ |
| NoiseEpsilon: epsilon, |
| NoiseDelta: delta, |
| MaxPartitionsContributed: maxPartitionsContributed, |
| Lower: lower, |
| Upper: upper, |
| NoiseKind: noiseKind, |
| TestMode: testMode, |
| } |
| } |
| |
| func (fn *addNoiseToEmptyPublicPartitionsFloat64Fn) Setup() { |
| fn.noise = noise.ToNoise(fn.NoiseKind) |
| if fn.TestMode.isEnabled() { |
| fn.noise = noNoise{} |
| } |
| } |
| |
| func (fn *addNoiseToEmptyPublicPartitionsFloat64Fn) ProcessElement(partitionKey beam.X, _ float64) (beam.X, float64, error) { |
| bs, err := dpagg.NewBoundedSumFloat64(&dpagg.BoundedSumFloat64Options{ |
| Epsilon: fn.NoiseEpsilon, |
| Delta: fn.NoiseDelta, |
| MaxPartitionsContributed: fn.MaxPartitionsContributed, |
| Lower: fn.Lower, |
| Upper: fn.Upper, |
| Noise: fn.noise, |
| }) |
| if err != nil { |
| return partitionKey, 0, err |
| } |
| noisedValue, err := bs.Result() |
| return partitionKey, noisedValue, err |
| } |