blob: 83b9ff9711227a5c602912ffa7af18cad01cff0d [file] [log] [blame]
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeam
import (
"fmt"
"reflect"
log "github.com/golang/glog"
"github.com/google/differential-privacy/go/v2/checks"
"github.com/google/differential-privacy/go/v2/dpagg"
"github.com/google/differential-privacy/go/v2/noise"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
)
func init() {
beam.RegisterType(reflect.TypeOf((*countFn)(nil)))
// TODO: add tests to make sure we don't forget anything here
}
// DistinctPrivacyIDParams specifies the parameters associated with a
// DistinctPrivacyID aggregation.
type DistinctPrivacyIDParams struct {
// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
//
// Defaults to LaplaceNoise{}.
NoiseKind NoiseKind
// Differential privacy budget consumed by this aggregation. If there is
// only one aggregation, both Epsilon and Delta can be left 0; in that
// case, the entire budget of the PrivacySpec is consumed.
Epsilon, Delta float64
// The maximum number of distinct values that a given privacy identifier
// can influence. If a privacy identifier is associated with more values,
// random values will be dropped. There is an inherent trade-off when
// choosing this parameter: a larger MaxPartitionsContributed leads to less
// data loss due to contribution bounding, but since the noise added in
// aggregations is scaled according to maxPartitionsContributed, it also
// means that more noise is added to each count.
//
// Required.
MaxPartitionsContributed int64
// You can input the list of partitions present in the output if you know
// them in advance. When you specify partitions, partition selection /
// thresholding will be disabled and partitions will appear in the output
// if and only if they appear in the set of public partitions.
//
// You should not derive the list of partitions non-privately from private
// data. You should only use this in either of the following cases:
// 1. The list of partitions is data-independent. For example, if you are
// aggregating a metric by hour, you could provide a list of all possible
// hourly period.
// 2. You use a differentially private operation to come up with the list of
// partitions. For example, you could use the output of a SelectPartitions
// operation or the keys of a DistinctPrivacyID operation as the list of
// public partitions.
//
// PublicPartitions needs to be a beam.PCollection, slice, or array. The
// underlying type needs to match the partition type of the PrivatePCollection.
//
// Prefer slices or arrays if the list of public partitions is small and
// can fit into memory (e.g., up to a million). Prefer beam.PCollection
// otherwise.
//
// Optional.
PublicPartitions interface{}
}
// DistinctPrivacyID counts the number of distinct privacy identifiers
// associated with each value in a PrivatePCollection, adding differentially
// private noise to the counts and doing post-aggregation thresholding to
// remove low counts. It is conceptually equivalent to calling Count with
// MaxValue=1, but is specifically optimized for this use case.
//
// It is also possible to manually specify the list of partitions
// present in the output, in which case the partition selection/thresholding
// step is skipped.
//
// DistinctPrivacyID transforms a PrivatePCollection<V> into a
// PCollection<V,int64>.
//
// Note: Do not use when your results may cause overflows for int64 values.
// This aggregation is not hardened for such applications yet.
func DistinctPrivacyID(s beam.Scope, pcol PrivatePCollection, params DistinctPrivacyIDParams) beam.PCollection {
s = s.Scope("pbeam.DistinctPrivacyID")
// Obtain type information from the underlying PCollection<K,V>.
idT, partitionT := beam.ValidateKVType(pcol.col)
var noiseKind noise.Kind
if params.NoiseKind == nil {
noiseKind = noise.LaplaceNoise
log.Infof("No NoiseKind specified, using Laplace Noise by default.")
} else {
noiseKind = params.NoiseKind.toNoiseKind()
}
// Get privacy parameters.
spec := pcol.privacySpec
epsilon, delta, err := spec.consumeBudget(params.Epsilon, params.Delta)
if err != nil {
log.Fatalf("Couldn't consume budget for DistinctPrivacyID: %v", err)
}
err = checkDistinctPrivacyIDParams(params, epsilon, delta, noiseKind, partitionT.Type())
if err != nil {
log.Fatalf("pbeam.DistinctPrivacyID: %v", err)
}
maxPartitionsContributed, err := getMaxPartitionsContributed(spec, params.MaxPartitionsContributed)
if err != nil {
log.Fatalf("Couldn't get MaxPartitionsContributed for DistinctPrivacyID: %v", err)
}
// Drop non-public partitions, if public partitions are specified.
pcol.col, err = dropNonPublicPartitions(s, pcol, params.PublicPartitions, partitionT.Type())
if err != nil {
log.Fatalf("Couldn't drop non-public partitions for DistinctPrivacyID: %v", err)
}
// First, deduplicate KV pairs by encoding them and calling Distinct.
coded := beam.ParDo(s, kv.NewEncodeFn(idT, partitionT), pcol.col)
distinct := filter.Distinct(s, coded)
decodeFn := kv.NewDecodeFn(idT, partitionT)
decoded := beam.ParDo(s,
decodeFn,
distinct,
beam.TypeDefinition{Var: beam.TType, T: idT.Type()},
beam.TypeDefinition{Var: beam.VType, T: partitionT.Type()})
// Second, do cross-partition contribution bounding if not in test mode without contribution bounding.
if spec.testMode != noNoiseWithoutContributionBounding {
decoded = boundContributions(s, decoded, maxPartitionsContributed)
}
// Third, now that KV pairs are deduplicated and contribution bounding is
// done, remove the keys and count how many times each value appears.
values := beam.DropKey(s, decoded)
emptyCounts := beam.ParDo(s, addOneValueFn, values)
// Add public partitions and return the aggregation output, if public partitions are specified.
if params.PublicPartitions != nil {
return addPublicPartitionsForDistinctID(s, params, epsilon, delta, maxPartitionsContributed, noiseKind, emptyCounts, spec.testMode)
}
countFn, err := newCountFn(epsilon, delta, maxPartitionsContributed, noiseKind, false, spec.testMode)
if err != nil {
log.Fatalf("pbeam.DistinctPrivacyID: %v", err)
}
noisedCounts := beam.CombinePerKey(s, countFn, emptyCounts)
// Drop thresholded partitions.
counts := beam.ParDo(s, dropThresholdedPartitionsInt64Fn, noisedCounts)
// Clamp negative counts to zero and return.
return beam.ParDo(s, clampNegativePartitionsInt64Fn, counts)
}
func addPublicPartitionsForDistinctID(s beam.Scope, params DistinctPrivacyIDParams, epsilon, delta float64,
maxPartitionsContributed int64, noiseKind noise.Kind, countsKV beam.PCollection, testMode testMode) beam.PCollection {
publicPartitions, isPCollection := params.PublicPartitions.(beam.PCollection)
if !isPCollection {
publicPartitions = beam.Reshuffle(s, beam.CreateList(s, params.PublicPartitions))
}
prepareAddPublicPartitions := beam.ParDo(s, addZeroValuesToPublicPartitionsInt64Fn, publicPartitions)
// Merge countsKV and prepareAddPublicPartitions.
allAddPartitions := beam.Flatten(s, countsKV, prepareAddPublicPartitions)
countFn, err := newCountFn(epsilon, delta, maxPartitionsContributed, noiseKind, true, testMode)
if err != nil {
log.Fatalf("pbeam.DistinctPrivacyID: %v", err)
}
noisedCounts := beam.CombinePerKey(s, countFn, allAddPartitions)
finalPartitions := beam.ParDo(s, dereferenceValueToInt64Fn, noisedCounts)
// Clamp negative counts to zero and return.
return beam.ParDo(s, clampNegativePartitionsInt64Fn, finalPartitions)
}
func checkDistinctPrivacyIDParams(params DistinctPrivacyIDParams, epsilon, delta float64, noiseKind noise.Kind, partitionType reflect.Type) error {
err := checkPublicPartitions(params.PublicPartitions, partitionType)
if err != nil {
return err
}
err = checks.CheckEpsilon(epsilon)
if err != nil {
return err
}
err = checkDelta(delta, noiseKind, params.PublicPartitions)
return err
}
func addOneValueFn(v beam.V) (beam.V, int64) {
return v, 1
}
// countFn is the accumulator used for counting the number of values in a partition.
type countFn struct {
// Privacy spec parameters (set during initial construction).
Epsilon float64
NoiseDelta float64
ThresholdDelta float64
MaxPartitionsContributed int64
NoiseKind noise.Kind
noise noise.Noise // Set during Setup phase according to NoiseKind.
PublicPartitions bool
TestMode testMode
}
// newCountFn returns a newCountFn with the given budget and parameters.
func newCountFn(epsilon, delta float64, maxPartitionsContributed int64, noiseKind noise.Kind, publicPartitions bool, testMode testMode) (*countFn, error) {
fn := &countFn{
MaxPartitionsContributed: maxPartitionsContributed,
NoiseKind: noiseKind,
PublicPartitions: publicPartitions,
TestMode: testMode,
}
fn.Epsilon = epsilon
if fn.PublicPartitions {
fn.NoiseDelta = delta
return fn, nil
}
switch noiseKind {
case noise.GaussianNoise:
fn.NoiseDelta = delta / 2
case noise.LaplaceNoise:
fn.NoiseDelta = 0
default:
return nil, fmt.Errorf("unknown noise.Kind (%v) is specified. Please specify a valid noise", noiseKind)
}
fn.ThresholdDelta = delta - fn.NoiseDelta
return fn, nil
}
func (fn *countFn) Setup() {
fn.noise = noise.ToNoise(fn.NoiseKind)
if fn.TestMode.isEnabled() {
fn.noise = noNoise{}
}
}
type countAccum struct {
C *dpagg.Count
PublicPartitions bool
}
func (fn *countFn) CreateAccumulator() (countAccum, error) {
c, err := dpagg.NewCount(&dpagg.CountOptions{
Epsilon: fn.Epsilon,
Delta: fn.NoiseDelta,
MaxPartitionsContributed: fn.MaxPartitionsContributed,
Noise: fn.noise,
})
return countAccum{C: c, PublicPartitions: fn.PublicPartitions}, err
}
// AddInput increments the count by one for each contribution. Does nothing when the
// the value is 0, which is the case only for empty public partitions.
func (fn *countFn) AddInput(a countAccum, value int64) (countAccum, error) {
var err error
if value != 0 {
err = a.C.Increment()
}
return a, err
}
func (fn *countFn) MergeAccumulators(a, b countAccum) (countAccum, error) {
err := a.C.Merge(b.C)
return a, err
}
func (fn *countFn) ExtractOutput(a countAccum) (*int64, error) {
if fn.TestMode.isEnabled() {
a.C.Noise = noNoise{}
}
if a.PublicPartitions {
result, err := a.C.Result()
return &result, err
}
return a.C.ThresholdedResult(fn.ThresholdDelta)
}
func (fn *countFn) String() string {
return fmt.Sprintf("%#v", fn)
}