blob: c40d07acdee7934a9483ab5789b39a362744b449 [file] [log] [blame]
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeam
import (
"fmt"
"reflect"
log "github.com/golang/glog"
"github.com/google/differential-privacy/go/v2/checks"
"github.com/google/differential-privacy/go/v2/dpagg"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
)
func init() {
beam.RegisterType(reflect.TypeOf((*partitionSelectionFn)(nil)))
beam.RegisterFunction(dropThresholdedPartitionsBoolFn)
}
// SelectPartitionsParams specifies the parameters associated with a
// SelectPartitions aggregation.
type SelectPartitionsParams struct {
// Differential privacy budget consumed by this aggregation. If there is
// only one aggregation, both Epsilon and Delta can be left 0; in that
// case, the entire budget of the PrivacySpec is consumed.
Epsilon, Delta float64
// The maximum number of distinct keys that a given privacy identifier
// can influence. If a privacy identifier is associated to more keys,
// random keys will be dropped. There is an inherent trade-off when
// choosing this parameter: a larger MaxPartitionsContributed leads to less
// data loss due to contribution bounding, but since the noise added in
// aggregations is scaled according to maxPartitionsContributed, it also
// means that more noise is added to each count.
//
// Required.
MaxPartitionsContributed int64
}
// SelectPartitions performs differentially private partition selection using
// dpagg.PreAggSelectPartitions and returns the list of partitions to keep as
// a PCollection.
//
// In a PrivatePCollection<K,V>, K is the partition key and in a PrivatePCollection<V>,
// V is the partition key. SelectPartitions transforms a PrivatePCollection<K,V> into a
// PCollection<K> and a PrivatePCollection<V> into a PCollection<V>.
func SelectPartitions(s beam.Scope, pcol PrivatePCollection, params SelectPartitionsParams) beam.PCollection {
s = s.Scope("pbeam.SelectPartitions")
// Obtain type information from the underlying PCollection<K,V>.
_, pT := beam.ValidateKVType(pcol.col)
epsilon, delta, err := pcol.privacySpec.consumeBudget(params.Epsilon, params.Delta)
if err != nil {
log.Fatalf("Couldn't consume budget for SelectPartition: %v", err)
}
spec := pcol.privacySpec
maxPartitionsContributed, err := getMaxPartitionsContributed(spec, params.MaxPartitionsContributed)
if err != nil {
log.Fatalf("Couldn't get MaxPartitionsContributed for SelectPartitions: %v", err)
}
err = checkSelectPartitionsParams(epsilon, delta, maxPartitionsContributed)
if err != nil {
log.Fatalf("pbeam.SelectPartitions: %v", err)
}
// First, we drop the values if we have (privacyKey, partitionKey, value) tuples.
// Afterwards, we will have (privacyKey, partitionKey) pairs.
// If we initially have (privacyKey, partitionKey) pairs already, we do nothing.
partitions := pcol.col
if pT.Type() == reflect.TypeOf(kv.Pair{}) {
if pcol.codec == nil {
log.Fatalf("SelectPartitions: no codec found for the input PrivatePCollection.")
}
partitions = beam.ParDo(s, &dropValuesFn{pcol.codec}, pcol.col, beam.TypeDefinition{Var: beam.WType, T: pcol.codec.VType.T})
}
// Second, we keep one contribution per user for each partition.
idT, partitionT := beam.ValidateKVType(partitions)
coded := beam.ParDo(s, kv.NewEncodeFn(idT, partitionT), partitions)
coded = filter.Distinct(s, coded)
decodeFn := kv.NewDecodeFn(idT, partitionT)
partitions = beam.ParDo(s, decodeFn, coded, beam.TypeDefinition{Var: beam.TType, T: idT.Type()}, beam.TypeDefinition{Var: beam.VType, T: partitionT.Type()})
// Third, do cross-partition contribution bounding if not in test mode without contribution bounding.
if spec.testMode != noNoiseWithoutContributionBounding {
partitions = boundContributions(s, partitions, maxPartitionsContributed)
}
// Finally, we swap the privacy and partition key and perform partition selection.
partitions = beam.SwapKV(s, partitions) // PCollection<K, ID>
partitions = beam.CombinePerKey(s, newPartitionSelectionFn(epsilon, delta, maxPartitionsContributed, spec.testMode), partitions)
return beam.ParDo(s, dropThresholdedPartitionsBoolFn, partitions)
}
func checkSelectPartitionsParams(epsilon, delta float64, maxPartitionsContributed int64) error {
err := checks.CheckEpsilon(epsilon)
if err != nil {
return err
}
return checks.CheckDeltaStrict(delta)
}
type partitionSelectionAccum struct {
SP *dpagg.PreAggSelectPartition
}
type partitionSelectionFn struct {
Epsilon float64
Delta float64
MaxPartitionsContributed int64
TestMode testMode
}
func newPartitionSelectionFn(epsilon, delta float64, maxPartitionsContributed int64, testMode testMode) *partitionSelectionFn {
return &partitionSelectionFn{Epsilon: epsilon, Delta: delta, MaxPartitionsContributed: maxPartitionsContributed, TestMode: testMode}
}
func (fn *partitionSelectionFn) CreateAccumulator() (partitionSelectionAccum, error) {
sp, err := dpagg.NewPreAggSelectPartition(&dpagg.PreAggSelectPartitionOptions{
Epsilon: fn.Epsilon,
Delta: fn.Delta,
MaxPartitionsContributed: fn.MaxPartitionsContributed})
return partitionSelectionAccum{SP: sp}, err
}
func (fn *partitionSelectionFn) AddInput(a partitionSelectionAccum, _ beam.W) (partitionSelectionAccum, error) {
err := a.SP.Increment()
return a, err
}
func (fn *partitionSelectionFn) MergeAccumulators(a, b partitionSelectionAccum) (partitionSelectionAccum, error) {
err := a.SP.Merge(b.SP)
return a, err
}
func (fn *partitionSelectionFn) ExtractOutput(a partitionSelectionAccum) (bool, error) {
if fn.TestMode.isEnabled() {
return true, nil
}
return a.SP.ShouldKeepPartition()
}
func (fn *partitionSelectionFn) String() string {
return fmt.Sprintf("%#v", fn)
}
// dropThresholdedPartitionsBoolFn drops thresholded bool partitions, i.e. those
// that have false v, by emitting only non-thresholded partitions. Differently from
// other dropThresholdedPartitionsFn's, since v only indicates whether or not a
// partition should be kept, the value is not emitted with the partition key.
func dropThresholdedPartitionsBoolFn(k beam.W, v bool, emit func(beam.W)) {
if v {
emit(k)
}
}