//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package pbeam
import (
"fmt"
"math"
"reflect"
log "github.com/golang/glog"
"github.com/google/differential-privacy/go/checks"
"github.com/google/differential-privacy/go/dpagg"
"github.com/google/differential-privacy/go/noise"
"github.com/google/differential-privacy/privacy-on-beam/internal/kv"
"github.com/apache/beam/sdks/go/pkg/beam"
)
func init() {
beam.RegisterType(reflect.TypeOf((*boundedMeanFloat64Fn)(nil)))
}
// MeanParams specifies the parameters associated with a Mean aggregation.
type MeanParams struct {
// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
//
// Defaults to LaplaceNoise{}.
NoiseKind NoiseKind
// Differential privacy budget consumed by this aggregation. If there is
// only one aggregation, both Epsilon and Delta can be left 0; in that
// case, the entire budget of the PrivacySpec is consumed.
Epsilon, Delta float64
// The maximum number of distinct keys (i.e., partitions) that a given
// privacy identifier can influence. There is an inherent trade-off when
// choosing this parameter: a larger MaxPartitionsContributed leads to less
// data loss due to contribution bounding, but since the noise added in
// aggregations is scaled according to MaxPartitionsContributed, it also
// means that more noise is added to each mean.
//
// Required.
MaxPartitionsContributed int64
// The maximum number of contributions from a given privacy identifier
// for each key. There is an inherent trade-off when choosing this
// parameter: a larger MaxContributionsPerPartition leads to less data loss due
// to contribution bounding, but since the noise added in aggregations is
// scaled according to MaxContributionsPerPartition, it also means that more
// noise is added to each mean.
//
// Required.
MaxContributionsPerPartition int64
// The total contribution of a given privacy identifier to a partition can
// be at least MinValue, and at most MaxValue; otherwise it will be clamped
// to these bounds. For example, if a privacy identifier is associated with
// the key-value pairs [("a", -5), ("a", 2), ("b", 7), ("c", 3)] and the
// (MinValue, MaxValue) bounds are (0, 5), the contribution for "a" will be
// clamped up to 0, the contribution for "b" will be clamped down to 5, and
// the contribution for "c" will be untouched. There is an inherent
// trade-off when choosing MinValue and MaxValue: a small MinValue and a
// large MaxValue mean that fewer records will be clamped, but that more
// noise will be added.
//
// Required.
MinValue, MaxValue float64
// You can provide the list of partitions present in the output if you know
// them in advance. When you specify partitions, partition selection /
// thresholding will be disabled and partitions will appear in the output
// if and only if they appear in the set of public partitions.
//
// You should not derive the list of partitions non-privately from private
// data. You should only use this in either of the following cases:
// 1. The list of partitions is data-independent. For example, if you are
// aggregating a metric by hour, you could provide a list of all possible
// hourly periods.
// 2. You use a differentially private operation to come up with the list of
// partitions. For example, you could use the keys of a DistinctPrivacyID
// operation as the list of public partitions.
//
// Note that, due to current implementation limitations, only up to millions
// of public partitions are supported.
//
// Optional.
PublicPartitions beam.PCollection
}
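// For illustration, a MeanParams value for a pipeline with a single
// aggregation might be constructed as follows (a minimal sketch: the bounds
// and contribution limits are placeholders rather than recommendations, and
// Epsilon and Delta are left at 0 so that the entire PrivacySpec budget is
// consumed):
//
//    params := pbeam.MeanParams{
//        MaxPartitionsContributed:     1,
//        MaxContributionsPerPartition: 1,
//        MinValue:                     0,
//        MaxValue:                     5,
//        NoiseKind:                    pbeam.LaplaceNoise{},
//    }
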
// MeanPerKey obtains the mean of the values associated with each key in a
// PrivatePCollection<K,V>, adding differentially private noise to the means and
// doing pre-aggregation thresholding to remove means with a low number of
// distinct privacy identifiers. Clients can also specify a PCollection of public partitions.
//
// Note: Do not use when your results may cause overflows for float64 values.
// This aggregation is not hardened for such applications yet.
//
// MeanPerKey transforms a PrivatePCollection<K,V> into a PCollection<K,float64>.
func MeanPerKey(s beam.Scope, pcol PrivatePCollection, params MeanParams) beam.PCollection {
s = s.Scope("pbeam.MeanPerKey")
// Obtain & validate type information from the underlying PCollection<K,V>.
idT, kvT := beam.ValidateKVType(pcol.col)
if kvT.Type() != reflect.TypeOf(kv.Pair{}) {
log.Exitf("MeanPerKey must be used on a PrivatePCollection of type <K,V>, got type %v instead", kvT)
}
if pcol.codec == nil {
log.Exitf("MeanPerKey: no codec found for the input PrivatePCollection.")
}
// Get privacy parameters.
spec := pcol.privacySpec
epsilon, delta, err := spec.consumeBudget(params.Epsilon, params.Delta)
if err != nil {
log.Exitf("Couldn't consume budget for Mean: %v", err)
}
var noiseKind noise.Kind
if params.NoiseKind == nil {
noiseKind = noise.LaplaceNoise
log.Infof("No NoiseKind specified, using Laplace Noise by default.")
} else {
noiseKind = params.NoiseKind.toNoiseKind()
}
err = checkMeanPerKeyParams(params, epsilon, delta, noiseKind)
if err != nil {
log.Exit(err)
}
// Drop non-public partitions, if public partitions are specified.
if (params.PublicPartitions).IsValid() {
if pcol.codec.KType.T != (params.PublicPartitions).Type().Type() {
log.Exitf("Public partitions must be of type %v. Got type %v instead.",
pcol.codec.KType.T, (params.PublicPartitions).Type().Type())
}
pcol.col = dropNonPublicPartitionsKVFn(s, params.PublicPartitions, pcol, pcol.codec.KType)
}
// First, group together the privacy ID and the partition ID and do per-partition contribution bounding.
// Result is PCollection<kv.Pair{ID,K},V>
encodeIDKFn := newEncodeIDKFn(idT, pcol.codec)
decoded := beam.ParDo(s,
encodeIDKFn,
pcol.col,
beam.TypeDefinition{Var: beam.VType, T: pcol.codec.VType.T})
maxContributionsPerPartition := getMaxContributionsPerPartition(params.MaxContributionsPerPartition)
// Don't do per-partition contribution bounding if in test mode without contribution bounding.
if spec.testMode != noNoiseWithoutContributionBounding {
decoded = boundContributions(s, decoded, maxContributionsPerPartition)
}
// Convert value to float64.
// Result is PCollection<kv.Pair{ID,K},float64>.
_, valueT := beam.ValidateKVType(decoded)
convertFn, err := findConvertToFloat64Fn(valueT)
if err != nil {
log.Exit(err)
}
converted := beam.ParDo(s, convertFn, decoded)
// Combine all values for <id, partition> into a slice.
// Result is PCollection<kv.Pair{ID,K},[]float64>.
combined := beam.CombinePerKey(s,
&expandFloat64ValuesCombineFn{},
converted)
// Result is PCollection<ID, pairArrayFloat64>.
maxPartitionsContributed := getMaxPartitionsContributed(spec, params.MaxPartitionsContributed)
rekeyed := beam.ParDo(s, rekeyArrayFloat64Fn, combined)
// Second, do cross-partition contribution bounding if not in test mode without contribution bounding.
if spec.testMode != noNoiseWithoutContributionBounding {
rekeyed = boundContributions(s, rekeyed, maxPartitionsContributed)
}
// Now that the cross-partition contribution bounding is done, remove the privacy keys and decode the values.
// Result is PCollection<partition, []float64>.
partialPairs := beam.DropKey(s, rekeyed)
partitionT := pcol.codec.KType.T
partialKV := beam.ParDo(s,
newDecodePairArrayFloat64Fn(partitionT),
partialPairs,
beam.TypeDefinition{Var: beam.XType, T: partitionT})
// Add public partitions and return the aggregation output, if public partitions are specified.
if (params.PublicPartitions).IsValid() {
return addPublicPartitionsForMean(s, epsilon, delta, maxPartitionsContributed,
params, noiseKind, partialKV, spec.testMode)
}
// Compute the mean for each partition. Result is PCollection<partition, float64>.
means := beam.CombinePerKey(s,
newBoundedMeanFloat64Fn(epsilon, delta, maxPartitionsContributed, params.MaxContributionsPerPartition, params.MinValue, params.MaxValue, noiseKind, false, spec.testMode, false),
partialKV)
// Finally, drop thresholded partitions.
return beam.ParDo(s, dropThresholdedPartitionsFloat64Fn, means)
}
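// A minimal usage sketch for MeanPerKey, assuming that s is a beam.Scope and
// that pcol is an existing PrivatePCollection<string,float64> built earlier in
// the pipeline (all parameter values below are placeholders):
//
//    means := pbeam.MeanPerKey(s, pcol, pbeam.MeanParams{
//        Epsilon:                      1,
//        Delta:                        1e-5,
//        MaxPartitionsContributed:     1,
//        MaxContributionsPerPartition: 1,
//        MinValue:                     0,
//        MaxValue:                     100,
//    })
//    // means is a PCollection<string,float64> of noisy per-key means.
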
func addPublicPartitionsForMean(s beam.Scope, epsilon, delta float64, maxPartitionsContributed int64, params MeanParams, noiseKind noise.Kind, partialKV beam.PCollection, testMode testMode) beam.PCollection {
// Compute the mean for each partition with non-public partitions dropped. Result is PCollection<partition, float64>.
means := beam.CombinePerKey(s,
newBoundedMeanFloat64Fn(epsilon, delta, maxPartitionsContributed, params.MaxContributionsPerPartition, params.MinValue, params.MaxValue, noiseKind, true, testMode, false),
partialKV)
partitionT, _ := beam.ValidateKVType(means)
meansPartitions := beam.DropValue(s, means)
// Create map with partitions in the data as keys.
partitionMap := beam.Combine(s, newPartitionsMapFn(beam.EncodedType{partitionT.Type()}), meansPartitions)
partitionsCol := params.PublicPartitions
// Attach an empty slice value to each partition key in PublicPartitions.
publicPartitionsWithValues := beam.ParDo(s, addDummyValuesToPublicPartitionsFloat64SliceFn, partitionsCol)
// emptyPublicPartitions are the partitions that are public but not found in the data.
emptyPublicPartitions := beam.ParDo(s, newEmitPartitionsNotInTheDataFn(partitionT), publicPartitionsWithValues, beam.SideInput{Input: partitionMap})
// Add noise to the empty public partitions.
emptyMeans := beam.CombinePerKey(s,
newBoundedMeanFloat64Fn(epsilon, delta, maxPartitionsContributed, params.MaxContributionsPerPartition, params.MinValue, params.MaxValue, noiseKind, true, testMode, true),
emptyPublicPartitions)
means = beam.ParDo(s, dereferenceValueToFloat64, means)
emptyMeans = beam.ParDo(s, dereferenceValueToFloat64, emptyMeans)
// Merge means from data with means from the empty public partitions.
allMeans := beam.Flatten(s, means, emptyMeans)
return allMeans
}
func checkMeanPerKeyParams(params MeanParams, epsilon, delta float64, noiseKind noise.Kind) error {
err := checks.CheckEpsilon("pbeam.MeanPerKey", epsilon)
if err != nil {
return err
}
if (params.PublicPartitions).IsValid() && noiseKind == noise.LaplaceNoise {
err = checks.CheckNoDelta("pbeam.MeanPerKey", delta)
} else {
err = checks.CheckDeltaStrict("pbeam.MeanPerKey", delta)
}
if err != nil {
return err
}
err = checks.CheckBoundsFloat64("pbeam.MeanPerKey", params.MinValue, params.MaxValue)
if err != nil {
return err
}
return checks.CheckMaxPartitionsContributed("pbeam.MeanPerKey", params.MaxPartitionsContributed)
}
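// boundedMeanAccumFloat64 is the accumulator used by boundedMeanFloat64Fn. It
// wraps a BoundedMeanFloat64 aggregator and, when partitions are not public,
// a PreAggSelectPartition used for partition selection.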
type boundedMeanAccumFloat64 struct {
BM *dpagg.BoundedMeanFloat64
SP *dpagg.PreAggSelectPartition
PublicPartitions bool
}
// boundedMeanFloat64Fn is a differentially private combineFn for obtaining the mean of values. Do not
// initialize it yourself, use newBoundedMeanFloat64Fn to create a boundedMeanFloat64Fn instance.
type boundedMeanFloat64Fn struct {
// Privacy spec parameters (set during initial construction).
NoiseEpsilon float64
PartitionSelectionEpsilon float64
NoiseDelta float64
PartitionSelectionDelta float64
MaxPartitionsContributed int64
MaxContributionsPerPartition int64
Lower float64
Upper float64
NoiseKind noise.Kind
noise noise.Noise // Set during Setup phase according to NoiseKind.
PublicPartitions bool
TestMode testMode
EmptyPartitions bool // Set to true if this combineFn is for adding noise to empty public partitions.
}
// newBoundedMeanFloat64Fn returns a boundedMeanFloat64Fn with the given budget and parameters.
func newBoundedMeanFloat64Fn(epsilon, delta float64, maxPartitionsContributed, maxContributionsPerPartition int64, lower, upper float64, noiseKind noise.Kind, publicPartitions bool, testMode testMode, emptyPartitions bool) *boundedMeanFloat64Fn {
fn := &boundedMeanFloat64Fn{
MaxPartitionsContributed: maxPartitionsContributed,
MaxContributionsPerPartition: maxContributionsPerPartition,
Lower: lower,
Upper: upper,
NoiseKind: noiseKind,
PublicPartitions: publicPartitions,
TestMode: testMode,
EmptyPartitions: emptyPartitions,
}
if fn.PublicPartitions {
fn.NoiseEpsilon = epsilon
fn.NoiseDelta = delta
return fn
}
fn.NoiseEpsilon = epsilon / 2
fn.PartitionSelectionEpsilon = epsilon - fn.NoiseEpsilon
switch noiseKind {
case noise.GaussianNoise:
fn.NoiseDelta = delta / 2
case noise.LaplaceNoise:
fn.NoiseDelta = 0
default:
// TODO: return error instead
log.Exitf("newBoundedMeanFloat64Fn: unknown noise.Kind (%v) is specified. Please specify a valid noise.", noiseKind)
}
fn.PartitionSelectionDelta = delta - fn.NoiseDelta
return fn
}
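// As a worked example of the budget split above (assuming no public
// partitions): with epsilon = 1, delta = 1e-5 and Gaussian noise, the
// resulting parameters are NoiseEpsilon = 0.5, PartitionSelectionEpsilon = 0.5,
// NoiseDelta = 5e-6 and PartitionSelectionDelta = 5e-6; with Laplace noise,
// NoiseDelta = 0 and all of delta goes to partition selection. With public
// partitions, the full epsilon and delta are used for noise.
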
func (fn *boundedMeanFloat64Fn) Setup() {
fn.noise = noise.ToNoise(fn.NoiseKind)
if fn.TestMode.isEnabled() {
fn.noise = noNoise{}
}
}
func (fn *boundedMeanFloat64Fn) CreateAccumulator() boundedMeanAccumFloat64 {
if fn.TestMode == noNoiseWithoutContributionBounding && !fn.EmptyPartitions {
fn.Lower = math.Inf(-1)
fn.Upper = math.Inf(1)
}
accum := boundedMeanAccumFloat64{
BM: dpagg.NewBoundedMeanFloat64(&dpagg.BoundedMeanFloat64Options{
Epsilon: fn.NoiseEpsilon,
Delta: fn.NoiseDelta,
MaxPartitionsContributed: fn.MaxPartitionsContributed,
MaxContributionsPerPartition: fn.MaxContributionsPerPartition,
Lower: fn.Lower,
Upper: fn.Upper,
Noise: fn.noise,
}), PublicPartitions: fn.PublicPartitions}
if !fn.PublicPartitions {
accum.SP = dpagg.NewPreAggSelectPartition(&dpagg.PreAggSelectPartitionOptions{
Epsilon: fn.PartitionSelectionEpsilon,
Delta: fn.PartitionSelectionDelta,
MaxPartitionsContributed: fn.MaxPartitionsContributed,
})
}
return accum
}
func (fn *boundedMeanFloat64Fn) AddInput(a boundedMeanAccumFloat64, values []float64) boundedMeanAccumFloat64 {
// We can have multiple values for each (privacy_key, partition_key) pair.
// We need to add each value to BoundedMean as input but we need to add a single input
// for each privacy_key to SelectPartition.
for _, v := range values {
a.BM.Add(v)
}
if !fn.PublicPartitions {
a.SP.Increment()
}
return a
}
func (fn *boundedMeanFloat64Fn) MergeAccumulators(a, b boundedMeanAccumFloat64) boundedMeanAccumFloat64 {
a.BM.Merge(b.BM)
if !fn.PublicPartitions {
a.SP.Merge(b.SP)
}
return a
}
func (fn *boundedMeanFloat64Fn) ExtractOutput(a boundedMeanAccumFloat64) *float64 {
if fn.TestMode.isEnabled() {
a.BM.NormalizedSum.Noise = noNoise{}
a.BM.Count.Noise = noNoise{}
}
if fn.TestMode.isEnabled() || a.PublicPartitions || a.SP.ShouldKeepPartition() {
result := a.BM.Result()
return &result
}
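// The partition is thresholded out: the nil result is removed downstream by
// dropThresholdedPartitionsFloat64Fn.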
return nil
}
func (fn *boundedMeanFloat64Fn) String() string {
return fmt.Sprintf("%#v", fn)
}