| // |
| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| // This file contains methods & ParDos used by multiple DP aggregations. |
| |
| package pbeam |
| |
| import ( |
| "bytes" |
| "fmt" |
| "math" |
| "math/rand" |
| "reflect" |
| |
| "github.com/google/differential-privacy/go/v3/checks" |
| "github.com/google/differential-privacy/go/v3/dpagg" |
| "github.com/google/differential-privacy/go/v3/noise" |
| "github.com/google/differential-privacy/privacy-on-beam/v3/internal/kv" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/register" |
| "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" |
| ) |
| |
| func init() { |
| register.Combiner3[boundedSumAccumInt64, int64, *int64](&boundedSumInt64Fn{}) |
| register.Combiner3[boundedSumAccumFloat64, float64, *float64](&boundedSumFloat64Fn{}) |
| register.Combiner3[expandValuesAccum, beam.V, [][]byte](&expandValuesCombineFn{}) |
| register.Combiner3[expandFloat64ValuesAccum, float64, []float64](&expandFloat64ValuesCombineFn{}) |
| |
| register.DoFn1x3[pairInt64, beam.W, int64, error](&decodePairInt64Fn{}) |
| register.DoFn1x3[pairFloat64, beam.W, float64, error](&decodePairFloat64Fn{}) |
| register.DoFn2x3[beam.U, kv.Pair, beam.U, beam.W, error](&dropValuesFn{}) |
| register.DoFn2x3[kv.Pair, []byte, beam.W, kv.Pair, error](&encodeKVFn{}) |
| register.DoFn2x3[beam.W, kv.Pair, kv.Pair, beam.V, error](&encodeIDKFn{}) |
| register.DoFn2x3[kv.Pair, beam.V, beam.W, kv.Pair, error](&decodeIDKFn{}) |
| register.DoFn1x3[pairArrayFloat64, beam.W, []float64, error](&decodePairArrayFloat64Fn{}) |
| |
| register.Function2x1[beam.V, beam.V, bool](randBool) |
| register.Function3x0[beam.W, []beam.V, func(beam.W, beam.V)](flattenValues) |
| register.Emitter2[beam.W, beam.V]() |
| register.Function2x2[kv.Pair, int64, []byte, pairInt64](rekeyInt64) |
| register.Function2x2[kv.Pair, float64, []byte, pairFloat64](rekeyFloat64) |
| register.Function2x2[kv.Pair, []float64, []byte, pairArrayFloat64](rekeyArrayFloat64) |
| register.Function2x2[beam.W, int64, beam.W, int64](clampNegativePartitionsInt64) |
| register.Function2x2[beam.W, float64, beam.W, float64](clampNegativePartitionsFloat64) |
| register.Function3x0[beam.V, *int64, func(beam.V, int64)](dropThresholdedPartitionsInt64) |
| register.Emitter2[beam.V, int64]() |
| register.Function3x0[beam.V, *float64, func(beam.V, float64)](dropThresholdedPartitionsFloat64) |
| register.Emitter2[beam.V, float64]() |
| register.Function3x0[beam.V, []float64, func(beam.V, []float64)](dropThresholdedPartitionsFloat64Slice) |
| register.Emitter2[beam.V, []float64]() |
| register.Function2x2[beam.W, *int64, beam.W, int64](dereferenceValueInt64) |
| register.Function2x2[beam.W, *float64, beam.W, float64](dereferenceValueFloat64) |
| register.Function2x3[kv.Pair, beam.V, kv.Pair, int64, error](convertToInt64Fn) |
| register.Function2x3[kv.Pair, beam.V, kv.Pair, float64, error](convertToFloat64Fn) |
| } |
| |
| // randBool returns a uniformly random boolean. The randomness used here is not |
| // cryptographically secure, and using this with top.LargestPerKey doesn't |
| // necessarily result in a uniformly random permutation: the distribution of |
| // the permutation depends on the exact sorting algorithm used by Beam and the |
| // order in which the input values are processed within the pipeline. |
| // |
| // The fact that the resulting permutation is not necessarily uniformly random is |
| // not a problem, since all we require from this function to satisfy DP properties |
| // is that it doesn't depend on the data. More specifically, in order to satisfy DP |
| // properties, a privacy unit's data should not influence another privacy unit's |
| // permutation of contributions. We assume that the order Beam processes the |
| // input values for a privacy unit is independent of other privacy units' |
| // inputs, in which case this requirement is satisfied. |
| func randBool(_, _ beam.V) bool { |
| return rand.Uint32()%2 == 0 |
| } |
| |
| // boundContributions takes a PCollection<K,V> as input, and for each key, selects and returns |
| // at most contributionLimit records with this key. The selection is "mostly random": |
| // the records returned are selected randomly, but the randomness isn't secure. |
| // This is fine to use in the cross-partition bounding stage or in the per-partition bounding stage, |
| // since the privacy guarantee doesn't depend on the privacy unit contributions being selected randomly. |
| // |
| // In order to do the cross-partition contribution bounding we need: |
| // 1. the key to be the privacy ID. |
| // 2. the value to be the partition ID or the pair = {partition ID, aggregated statistic}, |
| // where aggregated statistic is either array of values which are associated with the given id |
| // and partition, or sum/count/etc of these values. |
| // |
| // In order to do the per-partition contribution bounding we need: |
| // 1. the key to be the pair = {privacy ID, partition ID}. |
| // 2. the value to be just the value which is associated with that {privacy ID, partition ID} pair |
| // (there could be multiple entries with the same key). |
| func boundContributions(s beam.Scope, kvCol beam.PCollection, contributionLimit int64) beam.PCollection { |
| s = s.Scope("boundContributions") |
| // Transform the PCollection<K,V> into a PCollection<K,[]V>, where |
| // there are at most contributionLimit elements per slice, chosen randomly. To |
| // do that, the easiest solution seems to be to use the LargestPerKey |
| // function (that returns the contributionLimit "largest" elements), except |
| // the function used to sort elements is random. |
| sampled := top.LargestPerKey(s, kvCol, int(contributionLimit), randBool) |
| // Flatten the values for each key to get back a PCollection<K,V>. |
| return beam.ParDo(s, flattenValues, sampled) |
| } |
| |
| // Given a PCollection<K,[]V>, flattens the second argument to return a PCollection<K,V>. |
| func flattenValues(key beam.W, values []beam.V, emit func(beam.W, beam.V)) { |
| for _, v := range values { |
| emit(key, v) |
| } |
| } |
| |
| func findRekeyFn(kind reflect.Kind) (any, error) { |
| switch kind { |
| case reflect.Int64: |
| return rekeyInt64, nil |
| case reflect.Float64: |
| return rekeyFloat64, nil |
| default: |
| return nil, fmt.Errorf("kind(%v) should be int64 or float64", kind) |
| } |
| } |
| |
// pairInt64 couples an encoded byte slice with an int64 metric. rekeyInt64
// builds it from the V half of a kv.Pair, and decodePairInt64Fn decodes K back
// into a typed value.
type pairInt64 struct {
	// K is the encoded bytes.
	K []byte
	// M is the metric carried alongside K.
	M int64
}
| |
| // rekeyInt64 transforms a PCollection<kv.Pair<codedK,codedV>,int64> into a |
| // PCollection<codedK,pairInt64<codedV,int>>. |
| func rekeyInt64(kv kv.Pair, m int64) ([]byte, pairInt64) { |
| return kv.K, pairInt64{kv.V, m} |
| } |
| |
// pairFloat64 couples an encoded byte slice with a float64 metric. rekeyFloat64
// builds it from the V half of a kv.Pair, and decodePairFloat64Fn decodes K
// back into a typed value.
type pairFloat64 struct {
	// K is the encoded bytes.
	K []byte
	// M is the metric carried alongside K.
	M float64
}
| |
| // rekeyFloat64 transforms a PCollection<kv.Pair<codedK,codedV>,float64> into a |
| // PCollection<codedK,pairFloat64<codedV,int>>. |
| func rekeyFloat64(kv kv.Pair, m float64) ([]byte, pairFloat64) { |
| return kv.K, pairFloat64{kv.V, m} |
| } |
| |
// pairArrayFloat64 couples an encoded byte slice with a slice of float64
// metrics. rekeyArrayFloat64 builds it from the V half of a kv.Pair, and
// decodePairArrayFloat64Fn decodes K back into a typed value.
type pairArrayFloat64 struct {
	// K is the encoded bytes.
	K []byte
	// M is the slice of metrics carried alongside K.
	M []float64
}
| |
| // rekeyArrayFloat64 transforms a PCollection<kv.Pair<codedK,codedV>,[]float64> into a |
| // PCollection<codedK,pairArrayFloat64<codedV,[]float64>>. |
| func rekeyArrayFloat64(kv kv.Pair, m []float64) ([]byte, pairArrayFloat64) { |
| return kv.K, pairArrayFloat64{kv.V, m} |
| } |
| |
| func newDecodePairFn(t reflect.Type, kind reflect.Kind) (any, error) { |
| switch kind { |
| case reflect.Int64: |
| return newDecodePairInt64Fn(t), nil |
| case reflect.Float64: |
| return newDecodePairFloat64Fn(t), nil |
| default: |
| return nil, fmt.Errorf("kind(%v) should be int64 or float64", kind) |
| } |
| } |
| |
| // decodePairInt64Fn transforms a PCollection<pairInt64<KX,int64>> into a |
| // PCollection<K,int64>. |
| type decodePairInt64Fn struct { |
| KType beam.EncodedType |
| kDec beam.ElementDecoder |
| } |
| |
| func newDecodePairInt64Fn(t reflect.Type) *decodePairInt64Fn { |
| return &decodePairInt64Fn{KType: beam.EncodedType{t}} |
| } |
| |
| func (fn *decodePairInt64Fn) Setup() { |
| fn.kDec = beam.NewElementDecoder(fn.KType.T) |
| } |
| |
| func (fn *decodePairInt64Fn) ProcessElement(pair pairInt64) (beam.W, int64, error) { |
| k, err := fn.kDec.Decode(bytes.NewBuffer(pair.K)) |
| if err != nil { |
| return nil, 0, fmt.Errorf("pbeam.decodePairInt64Fn.ProcessElement: couldn't decode pair %v: %w", pair, err) |
| } |
| return k, pair.M, nil |
| } |
| |
| // decodePairFloat64Fn transforms a PCollection<pairFloat64<codedK,float64>> into a |
| // PCollection<K,float64>. |
| type decodePairFloat64Fn struct { |
| KType beam.EncodedType |
| kDec beam.ElementDecoder |
| } |
| |
| func newDecodePairFloat64Fn(t reflect.Type) *decodePairFloat64Fn { |
| return &decodePairFloat64Fn{KType: beam.EncodedType{t}} |
| } |
| |
| func (fn *decodePairFloat64Fn) Setup() { |
| fn.kDec = beam.NewElementDecoder(fn.KType.T) |
| } |
| |
| func (fn *decodePairFloat64Fn) ProcessElement(pair pairFloat64) (beam.W, float64, error) { |
| k, err := fn.kDec.Decode(bytes.NewBuffer(pair.K)) |
| if err != nil { |
| return nil, 0.0, fmt.Errorf("pbeam.decodePairFloat64Fn.ProcessElement: couldn't decode pair %v: %w", pair, err) |
| } |
| return k, pair.M, nil |
| } |
| |
| // decodePairArrayFloat64Fn transforms a PCollection<pairArrayFloat64<codedK,[]float64>> into a |
| // PCollection<K,[]float64>. |
| type decodePairArrayFloat64Fn struct { |
| KType beam.EncodedType |
| kDec beam.ElementDecoder |
| } |
| |
| func newDecodePairArrayFloat64Fn(t reflect.Type) *decodePairArrayFloat64Fn { |
| return &decodePairArrayFloat64Fn{KType: beam.EncodedType{t}} |
| } |
| |
| func (fn *decodePairArrayFloat64Fn) Setup() { |
| fn.kDec = beam.NewElementDecoder(fn.KType.T) |
| } |
| |
| func (fn *decodePairArrayFloat64Fn) ProcessElement(pair pairArrayFloat64) (beam.W, []float64, error) { |
| k, err := fn.kDec.Decode(bytes.NewBuffer(pair.K)) |
| if err != nil { |
| return nil, nil, fmt.Errorf("pbeam.decodePairArrayFloat64Fn.ProcessElement: couldn't decode pair %v: %w", pair, err) |
| } |
| return k, pair.M, nil |
| } |
| |
| // newBoundedSumFn returns a boundedSumInt64Fn or boundedSumFloat64Fn depending on vKind. |
| func newBoundedSumFn(spec PrivacySpec, params SumParams, noiseKind noise.Kind, vKind reflect.Kind, publicPartitions bool) (any, error) { |
| var err, checkErr error |
| var bsFn any |
| switch vKind { |
| case reflect.Int64: |
| checkErr = checks.CheckBoundsFloat64AsInt64(params.MinValue, params.MaxValue) |
| if checkErr != nil { |
| return nil, checkErr |
| } |
| bsFn, err = newBoundedSumInt64Fn(spec, params, noiseKind, publicPartitions) |
| case reflect.Float64: |
| checkErr = checks.CheckBoundsFloat64(params.MinValue, params.MaxValue) |
| if checkErr != nil { |
| return nil, checkErr |
| } |
| bsFn, err = newBoundedSumFloat64Fn(spec, params, noiseKind, publicPartitions) |
| default: |
| err = fmt.Errorf("vKind(%v) should be int64 or float64", vKind) |
| } |
| |
| return bsFn, err |
| } |
| |
| type boundedSumAccumInt64 struct { |
| BS *dpagg.BoundedSumInt64 |
| SP *dpagg.PreAggSelectPartition |
| PublicPartitions bool |
| } |
| |
| // boundedSumInt64Fn is a differentially private combineFn for summing values. Do not |
| // initialize it yourself, use newBoundedSumInt64Fn to create a boundedSumInt64Fn instance. |
| type boundedSumInt64Fn struct { |
| // Privacy spec parameters (set during initial construction). |
| NoiseEpsilon float64 |
| PartitionSelectionEpsilon float64 |
| NoiseDelta float64 |
| PartitionSelectionDelta float64 |
| PreThreshold int64 |
| MaxPartitionsContributed int64 |
| Lower int64 |
| Upper int64 |
| NoiseKind noise.Kind |
| noise noise.Noise // Set during Setup phase according to NoiseKind. |
| PublicPartitions bool |
| TestMode TestMode |
| } |
| |
| // newBoundedSumInt64Fn returns a boundedSumInt64Fn with the given budget and parameters. |
| func newBoundedSumInt64Fn(spec PrivacySpec, params SumParams, noiseKind noise.Kind, publicPartitions bool) (*boundedSumInt64Fn, error) { |
| if noiseKind != noise.GaussianNoise && noiseKind != noise.LaplaceNoise { |
| return nil, fmt.Errorf("unknown noise.Kind (%v) is specified. Please specify a valid noise", noiseKind) |
| } |
| return &boundedSumInt64Fn{ |
| NoiseEpsilon: params.AggregationEpsilon, |
| NoiseDelta: params.AggregationDelta, |
| PartitionSelectionEpsilon: params.PartitionSelectionParams.Epsilon, |
| PartitionSelectionDelta: params.PartitionSelectionParams.Delta, |
| PreThreshold: spec.preThreshold, |
| MaxPartitionsContributed: params.MaxPartitionsContributed, |
| Lower: int64(params.MinValue), |
| Upper: int64(params.MaxValue), |
| NoiseKind: noiseKind, |
| PublicPartitions: publicPartitions, |
| TestMode: spec.testMode, |
| }, nil |
| } |
| |
| func (fn *boundedSumInt64Fn) Setup() { |
| fn.noise = noise.ToNoise(fn.NoiseKind) |
| if fn.TestMode.isEnabled() { |
| fn.noise = noNoise{} |
| } |
| } |
| |
| func (fn *boundedSumInt64Fn) CreateAccumulator() (boundedSumAccumInt64, error) { |
| if fn.TestMode == TestModeWithoutContributionBounding { |
| fn.Lower = math.MinInt64 |
| fn.Upper = math.MaxInt64 |
| } |
| var bs *dpagg.BoundedSumInt64 |
| var err error |
| bs, err = dpagg.NewBoundedSumInt64(&dpagg.BoundedSumInt64Options{ |
| Epsilon: fn.NoiseEpsilon, |
| Delta: fn.NoiseDelta, |
| MaxPartitionsContributed: fn.MaxPartitionsContributed, |
| Lower: fn.Lower, |
| Upper: fn.Upper, |
| Noise: fn.noise, |
| }) |
| if err != nil { |
| return boundedSumAccumInt64{}, err |
| } |
| accum := boundedSumAccumInt64{BS: bs, PublicPartitions: fn.PublicPartitions} |
| if !fn.PublicPartitions { |
| accum.SP, err = dpagg.NewPreAggSelectPartition(&dpagg.PreAggSelectPartitionOptions{ |
| Epsilon: fn.PartitionSelectionEpsilon, |
| Delta: fn.PartitionSelectionDelta, |
| PreThreshold: fn.PreThreshold, |
| MaxPartitionsContributed: fn.MaxPartitionsContributed, |
| }) |
| } |
| return accum, err |
| } |
| |
| func (fn *boundedSumInt64Fn) AddInput(a boundedSumAccumInt64, value int64) (boundedSumAccumInt64, error) { |
| err := a.BS.Add(value) |
| if err != nil { |
| return a, err |
| } |
| if !fn.PublicPartitions { |
| err := a.SP.Increment() |
| if err != nil { |
| return a, err |
| } |
| } |
| return a, nil |
| } |
| |
| func (fn *boundedSumInt64Fn) MergeAccumulators(a, b boundedSumAccumInt64) (boundedSumAccumInt64, error) { |
| err := a.BS.Merge(b.BS) |
| if err != nil { |
| return a, err |
| } |
| if !fn.PublicPartitions { |
| err := a.SP.Merge(b.SP) |
| if err != nil { |
| return a, err |
| } |
| } |
| return a, nil |
| } |
| |
| func (fn *boundedSumInt64Fn) ExtractOutput(a boundedSumAccumInt64) (*int64, error) { |
| if fn.TestMode.isEnabled() { |
| a.BS.Noise = noNoise{} |
| } |
| var err error |
| shouldKeepPartition := fn.TestMode.isEnabled() || a.PublicPartitions // If in test mode or public partitions are specified, we always keep the partition. |
| if !shouldKeepPartition { // If not, we need to perform private partition selection. |
| shouldKeepPartition, err = a.SP.ShouldKeepPartition() |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| if shouldKeepPartition { |
| result, err := a.BS.Result() |
| return &result, err |
| } |
| return nil, nil |
| } |
| |
| func (fn *boundedSumInt64Fn) String() string { |
| return fmt.Sprintf("%#v", fn) |
| } |
| |
| type boundedSumAccumFloat64 struct { |
| BS *dpagg.BoundedSumFloat64 |
| SP *dpagg.PreAggSelectPartition |
| PublicPartitions bool |
| } |
| |
| // boundedSumFloat64Fn is a differentially private combineFn for summing values. Do not |
| // initialize it yourself, use newBoundedSumFloat64Fn to create a boundedSumFloat64Fn instance. |
| type boundedSumFloat64Fn struct { |
| // Privacy spec parameters (set during initial construction). |
| NoiseEpsilon float64 |
| PartitionSelectionEpsilon float64 |
| NoiseDelta float64 |
| PartitionSelectionDelta float64 |
| PreThreshold int64 |
| MaxPartitionsContributed int64 |
| Lower float64 |
| Upper float64 |
| NoiseKind noise.Kind |
| // Noise, set during Setup phase according to NoiseKind. |
| noise noise.Noise |
| PublicPartitions bool |
| TestMode TestMode |
| } |
| |
| // newBoundedSumFloat64Fn returns a boundedSumFloat64Fn with the given budget and parameters. |
| func newBoundedSumFloat64Fn(spec PrivacySpec, params SumParams, noiseKind noise.Kind, publicPartitions bool) (*boundedSumFloat64Fn, error) { |
| if noiseKind != noise.GaussianNoise && noiseKind != noise.LaplaceNoise { |
| return nil, fmt.Errorf("unknown noise.Kind (%v) is specified. Please specify a valid noise", noiseKind) |
| } |
| return &boundedSumFloat64Fn{ |
| NoiseEpsilon: params.AggregationEpsilon, |
| NoiseDelta: params.AggregationDelta, |
| PartitionSelectionEpsilon: params.PartitionSelectionParams.Epsilon, |
| PartitionSelectionDelta: params.PartitionSelectionParams.Delta, |
| PreThreshold: spec.preThreshold, |
| MaxPartitionsContributed: params.MaxPartitionsContributed, |
| Lower: params.MinValue, |
| Upper: params.MaxValue, |
| NoiseKind: noiseKind, |
| PublicPartitions: publicPartitions, |
| TestMode: spec.testMode, |
| }, nil |
| } |
| |
| func (fn *boundedSumFloat64Fn) Setup() { |
| fn.noise = noise.ToNoise(fn.NoiseKind) |
| if fn.TestMode.isEnabled() { |
| fn.noise = noNoise{} |
| } |
| } |
| |
| func (fn *boundedSumFloat64Fn) CreateAccumulator() (boundedSumAccumFloat64, error) { |
| if fn.TestMode == TestModeWithoutContributionBounding { |
| fn.Lower = math.Inf(-1) |
| fn.Upper = math.Inf(1) |
| } |
| var bs *dpagg.BoundedSumFloat64 |
| var err error |
| bs, err = dpagg.NewBoundedSumFloat64(&dpagg.BoundedSumFloat64Options{ |
| Epsilon: fn.NoiseEpsilon, |
| Delta: fn.NoiseDelta, |
| MaxPartitionsContributed: fn.MaxPartitionsContributed, |
| Lower: fn.Lower, |
| Upper: fn.Upper, |
| Noise: fn.noise, |
| }) |
| if err != nil { |
| return boundedSumAccumFloat64{}, err |
| } |
| accum := boundedSumAccumFloat64{BS: bs, PublicPartitions: fn.PublicPartitions} |
| if !fn.PublicPartitions { |
| accum.SP, err = dpagg.NewPreAggSelectPartition(&dpagg.PreAggSelectPartitionOptions{ |
| Epsilon: fn.PartitionSelectionEpsilon, |
| Delta: fn.PartitionSelectionDelta, |
| PreThreshold: fn.PreThreshold, |
| MaxPartitionsContributed: fn.MaxPartitionsContributed, |
| }) |
| } |
| return accum, err |
| } |
| |
| func (fn *boundedSumFloat64Fn) AddInput(a boundedSumAccumFloat64, value float64) (boundedSumAccumFloat64, error) { |
| var err error |
| err = a.BS.Add(value) |
| if err != nil { |
| return a, err |
| } |
| if !fn.PublicPartitions { |
| err = a.SP.Increment() |
| } |
| return a, err |
| } |
| |
| func (fn *boundedSumFloat64Fn) MergeAccumulators(a, b boundedSumAccumFloat64) (boundedSumAccumFloat64, error) { |
| var err error |
| err = a.BS.Merge(b.BS) |
| if err != nil { |
| return a, err |
| } |
| if !fn.PublicPartitions { |
| err = a.SP.Merge(b.SP) |
| } |
| return a, err |
| } |
| |
| func (fn *boundedSumFloat64Fn) ExtractOutput(a boundedSumAccumFloat64) (*float64, error) { |
| if fn.TestMode.isEnabled() { |
| a.BS.Noise = noNoise{} |
| } |
| var err error |
| shouldKeepPartition := fn.TestMode.isEnabled() || a.PublicPartitions // If in test mode or public partitions are specified, we always keep the partition. |
| if !shouldKeepPartition { // If not, we need to perform private partition selection. |
| shouldKeepPartition, err = a.SP.ShouldKeepPartition() |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| if shouldKeepPartition { |
| result, err := a.BS.Result() |
| return &result, err |
| } |
| return nil, nil |
| } |
| |
| func (fn *boundedSumFloat64Fn) String() string { |
| return fmt.Sprintf("%#v", fn) |
| } |
| |
| // findDereferenceValueFn dereferences a *int64 to int64 or *float64 to float64. |
| func findDereferenceValueFn(kind reflect.Kind) (any, error) { |
| switch kind { |
| case reflect.Int64: |
| return dereferenceValueInt64, nil |
| case reflect.Float64: |
| return dereferenceValueFloat64, nil |
| default: |
| return nil, fmt.Errorf("kind(%v) should be int64 or float64", kind) |
| } |
| } |
| |
| func dereferenceValueInt64(key beam.W, value *int64) (k beam.W, v int64) { |
| return key, *value |
| } |
| |
| func dereferenceValueFloat64(key beam.W, value *float64) (k beam.W, v float64) { |
| return key, *value |
| } |
| |
| func findDropThresholdedPartitionsFn(kind reflect.Kind) (any, error) { |
| switch kind { |
| case reflect.Int64: |
| return dropThresholdedPartitionsInt64, nil |
| case reflect.Float64: |
| return dropThresholdedPartitionsFloat64, nil |
| default: |
| return nil, fmt.Errorf("kind(%v) should be int64 or float64", kind) |
| } |
| } |
| |
| // dropThresholdedPartitionsInt64 drops thresholded int partitions, i.e. those |
| // that have nil r, by emitting only non-thresholded partitions. |
| func dropThresholdedPartitionsInt64(v beam.V, r *int64, emit func(beam.V, int64)) { |
| if r != nil { |
| emit(v, *r) |
| } |
| } |
| |
| // dropThresholdedPartitionsFloat64 drops thresholded float partitions, i.e. those |
| // that have nil r, by emitting only non-thresholded partitions. |
| func dropThresholdedPartitionsFloat64(v beam.V, r *float64, emit func(beam.V, float64)) { |
| if r != nil { |
| emit(v, *r) |
| } |
| } |
| |
| // dropThresholdedPartitionsFloat64Slice drops thresholded []float64 partitions, i.e. |
| // those that have nil r, by emitting only non-thresholded partitions. |
| func dropThresholdedPartitionsFloat64Slice(v beam.V, r []float64, emit func(beam.V, []float64)) { |
| if len(r) != 0 { |
| emit(v, r) |
| } |
| } |
| |
| func findClampNegativePartitionsFn(kind reflect.Kind) (any, error) { |
| switch kind { |
| case reflect.Int64: |
| return clampNegativePartitionsInt64, nil |
| case reflect.Float64: |
| return clampNegativePartitionsFloat64, nil |
| default: |
| return nil, fmt.Errorf("kind(%v) should be int64 or float64", kind) |
| } |
| } |
| |
| // Clamp negative partitions to zero for int64 partitions, e.g., as a post aggregation step for Count. |
| func clampNegativePartitionsInt64(k beam.W, r int64) (beam.W, int64) { |
| if r < 0 { |
| return k, 0 |
| } |
| return k, r |
| } |
| |
| // Clamp negative partitions to zero for float64 partitions. |
| func clampNegativePartitionsFloat64(k beam.W, r float64) (beam.W, float64) { |
| if r < 0 { |
| return k, 0 |
| } |
| return k, r |
| } |
| |
| type dropValuesFn struct { |
| Codec *kv.Codec |
| } |
| |
| func (fn *dropValuesFn) Setup() { |
| fn.Codec.Setup() |
| } |
| |
| func (fn *dropValuesFn) ProcessElement(id beam.U, kv kv.Pair) (beam.U, beam.W, error) { |
| k, _, err := fn.Codec.Decode(kv) |
| return id, k, err |
| } |
| |
| // encodeKVFn takes a PCollection<kv.Pair{ID,K}, codedV> as input, and returns a |
| // PCollection<ID, kv.Pair{K,V}>; where K and V have been coded, and ID has been |
| // decoded. |
| type encodeKVFn struct { |
| InputPairCodec *kv.Codec // Codec for the input kv.Pair{ID,K} |
| } |
| |
| func newEncodeKVFn(idkCodec *kv.Codec) *encodeKVFn { |
| return &encodeKVFn{InputPairCodec: idkCodec} |
| } |
| |
| func (fn *encodeKVFn) Setup() error { |
| return fn.InputPairCodec.Setup() |
| } |
| |
| func (fn *encodeKVFn) ProcessElement(pair kv.Pair, codedV []byte) (beam.W, kv.Pair, error) { |
| id, _, err := fn.InputPairCodec.Decode(pair) |
| return id, kv.Pair{pair.V, codedV}, err // pair.V is the K in PCollection<kv.Pair{ID,K}, codedV> |
| } |
| |
| // encodeIDKFn takes a PCollection<ID,kv.Pair{K,V}> as input, and returns a |
| // PCollection<kv.Pair{ID,K},V>; where ID and K have been coded, and V has been |
| // decoded. |
| type encodeIDKFn struct { |
| IDType beam.EncodedType // Type information of the privacy ID |
| idEnc beam.ElementEncoder // Encoder for privacy ID, set during Setup() according to IDType |
| InputPairCodec *kv.Codec // Codec for the input kv.Pair{K,V} |
| } |
| |
| func newEncodeIDKFn(idType typex.FullType, kvCodec *kv.Codec) *encodeIDKFn { |
| return &encodeIDKFn{ |
| IDType: beam.EncodedType{idType.Type()}, |
| InputPairCodec: kvCodec, |
| } |
| } |
| |
| func (fn *encodeIDKFn) Setup() error { |
| fn.idEnc = beam.NewElementEncoder(fn.IDType.T) |
| return fn.InputPairCodec.Setup() |
| } |
| |
| func (fn *encodeIDKFn) ProcessElement(id beam.W, pair kv.Pair) (kv.Pair, beam.V, error) { |
| var idBuf bytes.Buffer |
| if err := fn.idEnc.Encode(id, &idBuf); err != nil { |
| return kv.Pair{}, nil, fmt.Errorf("pbeam.encodeIDKFn.ProcessElement: couldn't encode ID %v: %w", id, err) |
| } |
| _, v, err := fn.InputPairCodec.Decode(pair) |
| return kv.Pair{idBuf.Bytes(), pair.K}, v, err |
| } |
| |
| // decodeIDKFn is the reverse operation of encodeIDKFn. It takes a PCollection<kv.Pair{ID,K},V> |
| // as input, and returns a PCollection<ID, kv.Pair{K,V}>; where K and V has been coded, and ID |
| // has been decoded. |
| type decodeIDKFn struct { |
| VType beam.EncodedType // Type information of the value V |
| vEnc beam.ElementEncoder // Encoder for privacy ID, set during Setup() according to VType |
| InputPairCodec *kv.Codec // Codec for the input kv.Pair{ID,K} |
| } |
| |
| func newDecodeIDKFn(vType typex.FullType, idkCodec *kv.Codec) *decodeIDKFn { |
| return &decodeIDKFn{ |
| VType: beam.EncodedType{vType.Type()}, |
| InputPairCodec: idkCodec, |
| } |
| } |
| |
| func (fn *decodeIDKFn) Setup() error { |
| fn.vEnc = beam.NewElementEncoder(fn.VType.T) |
| return fn.InputPairCodec.Setup() |
| } |
| |
| func (fn *decodeIDKFn) ProcessElement(pair kv.Pair, v beam.V) (beam.W, kv.Pair, error) { |
| var vBuf bytes.Buffer |
| if err := fn.vEnc.Encode(v, &vBuf); err != nil { |
| return nil, kv.Pair{}, fmt.Errorf("pbeam.decodeIDKFn.ProcessElement: couldn't encode V %v: %w", v, err) |
| } |
| id, _, err := fn.InputPairCodec.Decode(pair) |
| return id, kv.Pair{pair.V, vBuf.Bytes()}, err // pair.V is the K in PCollection<kv.Pair{ID,K},V> |
| } |
| |
| func convertToInt64Fn(idk kv.Pair, i beam.V) (kv.Pair, int64, error) { |
| v := reflect.ValueOf(i) |
| if !v.Type().ConvertibleTo(reflect.TypeOf(int64(0))) { |
| return kv.Pair{}, 0, fmt.Errorf("unexpected value type of %v", v.Type()) |
| } |
| return idk, v.Convert(reflect.TypeOf(int64(0))).Int(), nil |
| } |
| |
| func convertToFloat64Fn(idk kv.Pair, i beam.V) (kv.Pair, float64, error) { |
| v := reflect.ValueOf(i) |
| if !v.Type().ConvertibleTo(reflect.TypeOf(float64(0))) { |
| return kv.Pair{}, 0, fmt.Errorf("unexpected value type of %v", v.Type()) |
| } |
| return idk, v.Convert(reflect.TypeOf(float64(0))).Float(), nil |
| } |
| |
// expandValuesAccum is the accumulator of expandValuesCombineFn.
type expandValuesAccum struct {
	// Values holds one encoded value per input added so far.
	Values [][]byte
}
| |
| // expandValuesCombineFn converts a PCollection<K,V> to PCollection<K,[]V> where each value |
| // corresponding to the same key are collected in a slice. Resulting PCollection has a |
| // single slice for each key. |
| type expandValuesCombineFn struct { |
| VType beam.EncodedType |
| vEnc beam.ElementEncoder |
| } |
| |
| func newExpandValuesCombineFn(vType beam.EncodedType) *expandValuesCombineFn { |
| return &expandValuesCombineFn{VType: vType} |
| } |
| |
| func (fn *expandValuesCombineFn) Setup() { |
| fn.vEnc = beam.NewElementEncoder(fn.VType.T) |
| } |
| |
| func (fn *expandValuesCombineFn) CreateAccumulator() expandValuesAccum { |
| return expandValuesAccum{Values: make([][]byte, 0)} |
| } |
| |
| func (fn *expandValuesCombineFn) AddInput(a expandValuesAccum, value beam.V) (expandValuesAccum, error) { |
| var vBuf bytes.Buffer |
| if err := fn.vEnc.Encode(value, &vBuf); err != nil { |
| return a, fmt.Errorf("pbeam.expandValuesCombineFn.AddInput: couldn't encode V %v: %w", value, err) |
| } |
| a.Values = append(a.Values, vBuf.Bytes()) |
| return a, nil |
| } |
| |
| func (fn *expandValuesCombineFn) MergeAccumulators(a, b expandValuesAccum) expandValuesAccum { |
| a.Values = append(a.Values, b.Values...) |
| return a |
| } |
| |
| func (fn *expandValuesCombineFn) ExtractOutput(a expandValuesAccum) [][]byte { |
| return a.Values |
| } |
| |
// expandFloat64ValuesAccum is the accumulator of expandFloat64ValuesCombineFn.
type expandFloat64ValuesAccum struct {
	// Values holds every input added so far.
	Values []float64
}

// expandFloat64ValuesCombineFn converts a PCollection<K,float64> to a
// PCollection<K,[]float64> by collecting all values that share a key into one
// slice. The resulting PCollection has a single slice per key.
type expandFloat64ValuesCombineFn struct{}

// CreateAccumulator returns an accumulator with an empty, non-nil slice.
func (fn *expandFloat64ValuesCombineFn) CreateAccumulator() expandFloat64ValuesAccum {
	return expandFloat64ValuesAccum{Values: []float64{}}
}

// AddInput appends value to the accumulator.
func (fn *expandFloat64ValuesCombineFn) AddInput(a expandFloat64ValuesAccum, value float64) expandFloat64ValuesAccum {
	extended := append(a.Values, value)
	a.Values = extended
	return a
}

// MergeAccumulators appends the values of b after those of a.
func (fn *expandFloat64ValuesCombineFn) MergeAccumulators(a, b expandFloat64ValuesAccum) expandFloat64ValuesAccum {
	merged := append(a.Values, b.Values...)
	a.Values = merged
	return a
}

// ExtractOutput returns the collected values.
func (fn *expandFloat64ValuesCombineFn) ExtractOutput(a expandFloat64ValuesAccum) []float64 {
	collected := a.Values
	return collected
}
| |
| // checkAggregationEpsilon returns an error if the AggregationEpsilon parameter of an aggregation is not valid. |
| // AggregationEpsilon is valid if 0 < AggregationEpsilon < +∞. |
| func checkAggregationEpsilon(epsilon float64) error { |
| return checks.CheckEpsilonStrict(epsilon, "AggregationEpsilon") |
| } |
| |
| // checkPartitionSelectionEpsilon returns an error if the PartitionSelectionEpsilon parameter of an aggregation is not valid. |
| // PartitionSelectionEpsilon is valid in the following cases: |
| // |
| // PartitionSelectionEpsilon == 0; if public partitions are used |
| // 0 < PartitionSelectionEpsilon < +∞; otherwise |
| func checkPartitionSelectionEpsilon(epsilon float64, publicPartitions any) error { |
| if publicPartitions != nil { |
| if epsilon != 0 { |
| return fmt.Errorf("PartitionSelectionEpsilon is %e, using public partitions requires setting PartitionSelectionEpsilon to 0", epsilon) |
| } |
| return nil |
| } |
| return checks.CheckEpsilonStrict(epsilon, "PartitionSelectionEpsilon") |
| } |
| |
| // checkAggregationDelta returns an error if the AggregationDelta parameter of an aggregation is not valid. |
| // AggregationDelta is valid in the following cases: |
| // |
| // AggregationDelta == 0; when laplace noise is used |
| // 0 < AggregationDelta < 1; otherwise |
| func checkAggregationDelta(delta float64, noiseKind noise.Kind) error { |
| if noiseKind == noise.LaplaceNoise { |
| return checks.CheckNoDelta(delta, "AggregationDelta") |
| } |
| return checks.CheckDeltaStrict(delta, "AggregationDelta") |
| } |
| |
| // checkPartitionSelectionDelta returns an error if the PartitionSelectionDelta parameter of an aggregation is not valid. |
| // PartitionSelectionDelta is valid in the following cases: |
| // |
| // PartitionSelectionDelta == 0; if public partitions are used |
| // 0 < PartitionSelectionDelta < 1; otherwise |
| func checkPartitionSelectionDelta(delta float64, publicPartitions any) error { |
| if publicPartitions != nil { |
| return checks.CheckNoDelta(delta, "PartitionSelectionDelta") |
| } |
| return checks.CheckDeltaStrict(delta, "PartitionSelectionDelta") |
| } |
| |
// checkMaxPartitionsContributed returns an error unless the
// maxPartitionsContributed parameter of an aggregation is strictly positive.
func checkMaxPartitionsContributed(maxPartitionsContributed int64) error {
	if maxPartitionsContributed > 0 {
		return nil
	}
	return fmt.Errorf("MaxPartitionsContributed must be set to a positive value, was %d instead", maxPartitionsContributed)
}
| |
// checkMaxPartitionsContributedPartitionSelection returns an error if the
// maxPartitionsContributed parameter of a PartitionSelectionParams is set to
// anything other than 0.
func checkMaxPartitionsContributedPartitionSelection(maxPartitionsContributed int64) error {
	if maxPartitionsContributed == 0 {
		return nil
	}
	return fmt.Errorf("separate contribution bounding for partition selection is not supported: "+
		"PartitionSelectionParams.MaxPartitionsContributed must be unset (i.e. 0), was %d instead", maxPartitionsContributed)
}
| |
| // checkNumericType returns an error if t is not a numeric type. |
| func checkNumericType(t typex.FullType) error { |
| switch t.Type().Kind() { |
| case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: |
| return nil |
| case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: |
| return nil |
| case reflect.Float32, reflect.Float64: |
| return nil |
| default: |
| return fmt.Errorf("unexpected value type of %v", t) |
| } |
| } |