blob: 971c5edfe71dff3827c79cf50a09ca5e65c39663 [file] [log] [blame]
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeam
import (
"fmt"
"reflect"
log "github.com/golang/glog"
"github.com/google/differential-privacy/go/v2/checks"
"github.com/google/differential-privacy/go/v2/noise"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
// CountParams specifies the parameters associated with a Count aggregation.
type CountParams struct {
// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
//
// Defaults to LaplaceNoise{}.
NoiseKind NoiseKind
// Differential privacy budget consumed by this aggregation. If there is
// only one aggregation, both Epsilon and Delta can be left 0; in that
// case, the entire budget of the PrivacySpec is consumed.
Epsilon, Delta float64
// The maximum number of distinct values that a given privacy identifier
// can influence. If a privacy identifier is associated with more values,
// random values will be dropped. There is an inherent trade-off when
// choosing this parameter: a larger MaxPartitionsContributed leads to less
// data loss due to contribution bounding, but since the noise added in
// aggregations is scaled according to maxPartitionsContributed, it also
// means that more noise is added to each count.
//
// Required.
MaxPartitionsContributed int64
// The maximum number of times that a privacy identifier can contribute to
// a single count (or, equivalently, the maximum value that a privacy
// identifier can add to a single count in total). If MaxValue=10 and a
// privacy identifier is associated with the same value in 15 records, Count
// ignores 5 of these records and only adds 10 to the count for this value.
// There is an inherent trade-off when choosing MaxValue: a larger
// parameter means that fewer records are lost, but a larger noise is added.
//
// Required.
MaxValue int64
// You can input the list of partitions present in the output if you know
// them in advance. When you specify partitions, partition selection /
// thresholding will be disabled and partitions will appear in the output
// if and only if they appear in the set of public partitions.
//
// You should not derive the list of partitions non-privately from private
// data. You should only use this in either of the following cases:
// 1. The list of partitions is data-independent. For example, if you are
// aggregating a metric by hour, you could provide a list of all possible
// hourly period.
// 2. You use a differentially private operation to come up with the list of
// partitions. For example, you could use the output of a SelectPartitions
// operation or the keys of a DistinctPrivacyID operation as the list of
// public partitions.
//
// PublicPartitions needs to be a beam.PCollection, slice, or array. The
// underlying type needs to match the partition type of the PrivatePCollection.
//
// Prefer slices or arrays if the list of public partitions is small and
// can fit into memory (e.g., up to a million). Prefer beam.PCollection
// otherwise.
//
// Optional.
PublicPartitions interface{}
}
// Count counts the number of times a value appears in a PrivatePCollection,
// adding differentially private noise to the counts and doing pre-aggregation
// thresholding to remove counts with a low number of distinct privacy
// identifiers.
//
// It is also possible to manually specify the list of partitions
// present in the output, in which case the partition selection/thresholding
// step is skipped.
//
// Note: Do not use when your results may cause overflows for int64 values.
// This aggregation is not hardened for such applications yet.
//
// Count transforms a PrivatePCollection<V> into a PCollection<V, int64>.
func Count(s beam.Scope, pcol PrivatePCollection, params CountParams) beam.PCollection {
s = s.Scope("pbeam.Count")
// Obtain type information from the underlying PCollection<K,V>.
idT, partitionT := beam.ValidateKVType(pcol.col)
// Get privacy parameters.
spec := pcol.privacySpec
epsilon, delta, err := spec.consumeBudget(params.Epsilon, params.Delta)
if err != nil {
log.Fatalf("Couldn't consume budget for Count: %v", err)
}
var noiseKind noise.Kind
if params.NoiseKind == nil {
noiseKind = noise.LaplaceNoise
log.Infof("No NoiseKind specified, using Laplace Noise by default.")
} else {
noiseKind = params.NoiseKind.toNoiseKind()
}
err = checkCountParams(params, epsilon, delta, noiseKind, partitionT.Type())
if err != nil {
log.Fatalf("pbeam.Count: %v", err)
}
maxPartitionsContributed, err := getMaxPartitionsContributed(spec, params.MaxPartitionsContributed)
if err != nil {
log.Fatalf("Couldn't get MaxPartitionsContributed for Count: %v", err)
}
// Drop non-public partitions, if public partitions are specified.
pcol.col, err = dropNonPublicPartitions(s, pcol, params.PublicPartitions, partitionT.Type())
if err != nil {
log.Fatalf("Couldn't drop non-public partitions for Count: %v", err)
}
// First, encode KV pairs, count how many times each one appears,
// and re-key by the original privacy key.
coded := beam.ParDo(s, kv.NewEncodeFn(idT, partitionT), pcol.col)
kvCounts := stats.Count(s, coded)
counts64 := beam.ParDo(s, vToInt64Fn, kvCounts)
rekeyed := beam.ParDo(s, rekeyInt64Fn, counts64)
// Second, do cross-partition contribution bounding if not in test mode without contribution bounding.
if spec.testMode != noNoiseWithoutContributionBounding {
rekeyed = boundContributions(s, rekeyed, maxPartitionsContributed)
}
// Third, now that contribution bounding is done, remove the privacy keys,
// decode the value, and sum all the counts bounded by maxCountContrib.
countPairs := beam.DropKey(s, rekeyed)
decodePairInt64Fn := newDecodePairInt64Fn(partitionT.Type())
countsKV := beam.ParDo(s,
decodePairInt64Fn,
countPairs,
beam.TypeDefinition{Var: beam.XType, T: partitionT.Type()})
// Add public partitions and return the aggregation output, if public partitions are specified.
if params.PublicPartitions != nil {
return addPublicPartitionsForCount(s, epsilon, delta, maxPartitionsContributed, params, noiseKind, countsKV, spec.testMode)
}
boundedSumInt64Fn, err := newBoundedSumInt64Fn(epsilon, delta, maxPartitionsContributed, 0, params.MaxValue, noiseKind, false, spec.testMode)
if err != nil {
log.Fatalf("Couldn't get boundedSumInt64Fn for Count: %v", err)
}
sums := beam.CombinePerKey(s,
boundedSumInt64Fn,
countsKV)
// Drop thresholded partitions.
counts := beam.ParDo(s, dropThresholdedPartitionsInt64Fn, sums)
// Clamp negative counts to zero and return.
return beam.ParDo(s, clampNegativePartitionsInt64Fn, counts)
}
func checkCountParams(params CountParams, epsilon, delta float64, noiseKind noise.Kind, partitionType reflect.Type) error {
err := checkPublicPartitions(params.PublicPartitions, partitionType)
if err != nil {
return err
}
err = checks.CheckEpsilon(epsilon)
if err != nil {
return err
}
err = checkDelta(delta, noiseKind, params.PublicPartitions)
if err != nil {
return err
}
if params.MaxValue <= 0 {
return fmt.Errorf("MaxValue should be strictly positive, got %d", params.MaxValue)
}
return nil
}
func addPublicPartitionsForCount(s beam.Scope, epsilon, delta float64, maxPartitionsContributed int64, params CountParams, noiseKind noise.Kind, countsKV beam.PCollection, testMode testMode) beam.PCollection {
// Turn PublicPartitions from PCollection<K> into PCollection<K, int64> by adding
// the value zero to each K.
publicPartitions, isPCollection := params.PublicPartitions.(beam.PCollection)
if !isPCollection {
publicPartitions = beam.Reshuffle(s, beam.CreateList(s, params.PublicPartitions))
}
emptyCounts := beam.ParDo(s, addZeroValuesToPublicPartitionsInt64Fn, publicPartitions)
// Merge countsKV and emptyCounts.
allPartitions := beam.Flatten(s, emptyCounts, countsKV)
// Sum and add noise.
boundedSumInt64Fn, err := newBoundedSumInt64Fn(epsilon, delta, maxPartitionsContributed, 0, params.MaxValue, noiseKind, true, testMode)
if err != nil {
log.Fatalf("Couldn't get boundedSumInt64Fn for Count: %v", err)
}
sums := beam.CombinePerKey(s, boundedSumInt64Fn, allPartitions)
finalPartitions := beam.ParDo(s, dereferenceValueToInt64Fn, sums)
// Clamp negative counts to zero and return.
return beam.ParDo(s, clampNegativePartitionsInt64Fn, finalPartitions)
}