blob: e40cfdc03a7a19d2c15d7692d8c8efad20bcdb15 [file] [log] [blame]
//
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeam
import (
"fmt"
"reflect"
log "github.com/golang/glog"
"github.com/google/differential-privacy/go/v2/checks"
"github.com/google/differential-privacy/go/v2/dpagg"
"github.com/google/differential-privacy/go/v2/noise"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
)
func init() {
beam.RegisterType(reflect.TypeOf((*boundedQuantilesFn)(nil)))
}
// QuantilesParams specifies the parameters associated with a Quantiles aggregation.
type QuantilesParams struct {
// Noise type (which is either LaplaceNoise{} or GaussianNoise{}).
//
// Defaults to LaplaceNoise{}.
NoiseKind NoiseKind
// Differential privacy budget consumed by this aggregation. If there is
// only one aggregation, both Epsilon and Delta can be left 0; in that
// case, the entire budget of the PrivacySpec is consumed.
Epsilon, Delta float64
// The maximum number of distinct keys (partitions) that a given privacy
// identifier can influence. There is an inherent trade-off when choosing
// this parameter: a larger MaxPartitionsContributed leads to less data loss
// due to contribution bounding, but since the noise added in aggregations is
// scaled according to MaxPartitionsContributed, it also means that more
// noise is added to each quantile.
//
// Required.
MaxPartitionsContributed int64
// The maximum number of contributions from a given privacy identifier
// for each key. There is an inherent trade-off when choosing this
// parameter: a larger MaxContributionsPerPartition leads to less data loss
// due to contribution bounding, but since the noise added in aggregations is
// scaled according to MaxContributionsPerPartition, it also means that more
// noise is added to each quantile.
//
// Required.
MaxContributionsPerPartition int64
// A single contribution of a given privacy identifier to a partition can be
// at least MinValue and at most MaxValue; otherwise it will be clamped to
// these bounds. There is an inherent trade-off when choosing MinValue and
// MaxValue: a small MinValue and a large MaxValue mean that fewer records
// will be clamped, but that more noise will be added.
// For example, if a privacy identifier is associated with the key-value
// pairs [("a", -5), ("a", 2), ("b", 7), ("c", 3)] and the (MinValue, MaxValue)
// bounds are (0, 5), the contribution ("a", -5) will be clamped up to 0,
// the contribution ("b", 7) will be clamped down to 5, and the contributions
// ("a", 2) and ("c", 3) will be untouched.
//
// Required.
MinValue, MaxValue float64
// Percentile ranks that the quantiles should be computed for. Each rank must
// be between zero and one. The DP quantile operation returns a list of
// quantile values corresponding to the respective ranks. E.g., a percentile
// rank of 0.2 yields a quantile value that is greater than 20% and less than
// 80% of the values in the data set.
//
// Note that computing multiple quantiles does not consume extra privacy budget,
// i.e. computing multiple quantiles does not make each quantile less accurate
// for a fixed privacy budget.
//
// Required: at least one rank must be specified.
Ranks []float64
// You can input the list of partitions present in the output if you know
// them in advance. When you specify partitions, partition selection /
// thresholding will be disabled and partitions will appear in the output
// if and only if they appear in the set of public partitions.
//
// You should not derive the list of partitions non-privately from private
// data. You should only use this in either of the following cases:
// 1. The list of partitions is data-independent. For example, if you are
// aggregating a metric by hour, you could provide a list of all possible
// hourly periods.
// 2. You use a differentially private operation to come up with the list of
// partitions. For example, you could use the output of a SelectPartitions
// operation or the keys of a DistinctPrivacyID operation as the list of
// public partitions.
//
// PublicPartitions needs to be a beam.PCollection, slice, or array. The
// underlying type needs to match the partition type of the PrivatePCollection.
//
// Prefer slices or arrays if the list of public partitions is small and
// can fit into memory (e.g., up to a million). Prefer beam.PCollection
// otherwise.
//
// Optional.
PublicPartitions interface{}
}
// QuantilesPerKey computes one or more differentially private quantiles of the
// values associated with each key of a PrivatePCollection<K,V>. Noise is added
// to every quantile, and partitions with a low number of distinct privacy
// identifiers are removed through pre-aggregation thresholding.
//
// If params.PublicPartitions is set, the partition selection/thresholding step
// is skipped and the output contains exactly the given partitions.
//
// QuantilesPerKey transforms a PrivatePCollection<K,V> into a PCollection<K,[]float64>.
// Invalid input or parameters abort the program via log.Fatalf.
//
// Because of how the internal Quantiles algorithm is implemented, two caveats
// apply when QuantilesPerKey is used together with pbeamtest:
//
//  1. The output is slightly noisy even without DP noise. Use
//     pbeamtest.QuantilesTolerance() to account for that.
//  2. Values are still clamped to Min/MaxValue when using
//     pbeamtest.NewPrivacySpecNoNoiseWithoutContributionBounding(); only the
//     MaxContributionsPerPartition and MaxPartitionsContributed contribution
//     bounding is disabled.
func QuantilesPerKey(s beam.Scope, pcol PrivatePCollection, params QuantilesParams) beam.PCollection {
	s = s.Scope("pbeam.QuantilesPerKey")

	// The underlying collection has to be a PCollection<ID, kv.Pair> that
	// carries a codec for the encoded key/value pair.
	idT, pairT := beam.ValidateKVType(pcol.col)
	if pairT.Type() != reflect.TypeOf(kv.Pair{}) {
		log.Fatalf("QuantilesPerKey must be used on a PrivatePCollection of type <K,V>, got type %v instead", pairT)
	}
	if pcol.codec == nil {
		log.Fatalf("QuantilesPerKey: no codec found for the input PrivatePCollection.")
	}

	// Take this aggregation's share of the privacy budget.
	spec := pcol.privacySpec
	epsilon, delta, err := spec.consumeBudget(params.Epsilon, params.Delta)
	if err != nil {
		log.Fatalf("Couldn't consume budget for Quantiles: %v", err)
	}

	// Resolve the noise kind; Laplace is used when none was specified.
	var noiseKind noise.Kind
	if params.NoiseKind != nil {
		noiseKind = params.NoiseKind.toNoiseKind()
	} else {
		noiseKind = noise.LaplaceNoise
		log.Infof("No NoiseKind specified, using Laplace Noise by default.")
	}

	partitionT := pcol.codec.KType.T
	if err = checkQuantilesPerKeyParams(params, epsilon, delta, noiseKind, partitionT); err != nil {
		log.Fatalf("pbeam.QuantilesPerKey: %v", err)
	}

	// When public partitions are given, discard every other partition up front.
	pcol.col, err = dropNonPublicPartitions(s, pcol, params.PublicPartitions, partitionT)
	if err != nil {
		log.Fatalf("Couldn't drop non-public partitions for Quantiles: %v", err)
	}

	// Step 1: key each value by (privacy ID, partition) and bound the number
	// of contributions per partition.
	// Result is PCollection<kv.Pair{ID,K},V>.
	perIDAndPartition := beam.ParDo(s,
		newEncodeIDKFn(idT, pcol.codec),
		pcol.col,
		beam.TypeDefinition{Var: beam.VType, T: pcol.codec.VType.T})
	// Contribution bounding is skipped in the test mode that disables it.
	boundingEnabled := spec.testMode != noNoiseWithoutContributionBounding
	if boundingEnabled {
		perIDAndPartition = boundContributions(s, perIDAndPartition, params.MaxContributionsPerPartition)
	}

	// Convert the values to float64.
	// Result is PCollection<kv.Pair{ID,K},float64>.
	_, valueT := beam.ValidateKVType(perIDAndPartition)
	toFloat64Fn, err := findConvertToFloat64Fn(valueT)
	if err != nil {
		log.Fatalf("Couldn't get convertFn for QuantilesPerKey: %v", err)
	}
	asFloat64 := beam.ParDo(s, toFloat64Fn, perIDAndPartition)

	// Gather all values of each <id, partition> pair into one slice.
	// Result is PCollection<kv.Pair{ID,K},[]float64>.
	grouped := beam.CombinePerKey(s, &expandFloat64ValuesCombineFn{}, asFloat64)

	maxPartitionsContributed, err := getMaxPartitionsContributed(spec, params.MaxPartitionsContributed)
	if err != nil {
		log.Fatalf("Couldn't get MaxPartitionsContributed for QuantilesPerKey: %v", err)
	}

	// Step 2: rekey by privacy ID and bound the number of partitions each ID
	// contributes to.
	// Result is PCollection<ID, pairArrayFloat64>.
	perID := beam.ParDo(s, rekeyArrayFloat64Fn, grouped)
	if boundingEnabled {
		perID = boundContributions(s, perID, maxPartitionsContributed)
	}

	// Contribution bounding is complete: strip the privacy IDs and decode the
	// partition/values pairs.
	// Result is PCollection<partition, []float64>.
	encodedPartials := beam.DropKey(s, perID)
	perPartition := beam.ParDo(s,
		newDecodePairArrayFloat64Fn(partitionT),
		encodedPartials,
		beam.TypeDefinition{Var: beam.XType, T: partitionT})

	// With public partitions, the dedicated helper produces the final output.
	if params.PublicPartitions != nil {
		return addPublicPartitionsForQuantiles(s, epsilon, delta, maxPartitionsContributed,
			params, noiseKind, perPartition, spec.testMode)
	}

	// Otherwise compute the quantiles per partition together with private
	// partition selection. Result is PCollection<partition, []float64>.
	quantilesFn, err := newBoundedQuantilesFn(boundedQuantilesFnParams{
		epsilon:                      epsilon,
		delta:                        delta,
		maxPartitionsContributed:     maxPartitionsContributed,
		maxContributionsPerPartition: params.MaxContributionsPerPartition,
		minValue:                     params.MinValue,
		maxValue:                     params.MaxValue,
		noiseKind:                    noiseKind,
		ranks:                        params.Ranks,
		publicPartitions:             false,
		testMode:                     spec.testMode,
		emptyPartitions:              false})
	if err != nil {
		log.Fatalf("Couldn't get boundedQuantilesFn for QuantilesPerKey: %v", err)
	}
	perPartitionQuantiles := beam.CombinePerKey(s, quantilesFn, perPartition)

	// Finally, remove the partitions that did not pass thresholding.
	return beam.ParDo(s, dropThresholdedPartitionsFloat64SliceFn, perPartitionQuantiles)
}
// addPublicPartitionsForQuantiles computes the quantiles for the case where
// public partitions are specified.
//
// partialKV holds the per-partition values; the caller is expected to have
// dropped non-public partitions already. Public partitions that do not occur
// in partialKV are fed through a second combine step with an empty slice as
// their value, so that they still show up in the output.
//
// The result merges both sets and is a PCollection<partition, []float64>.
// Failing to build a combine function aborts the program via log.Fatalf.
func addPublicPartitionsForQuantiles(s beam.Scope, epsilon, delta float64, maxPartitionsContributed int64, params QuantilesParams, noiseKind noise.Kind, partialKV beam.PCollection, testMode testMode) beam.PCollection {
	// Both combine steps use the same settings and only differ in emptyPartitions.
	fnParams := boundedQuantilesFnParams{
		epsilon:                      epsilon,
		delta:                        delta,
		maxPartitionsContributed:     maxPartitionsContributed,
		maxContributionsPerPartition: params.MaxContributionsPerPartition,
		minValue:                     params.MinValue,
		maxValue:                     params.MaxValue,
		noiseKind:                    noiseKind,
		ranks:                        params.Ranks,
		publicPartitions:             true,
		testMode:                     testMode,
		emptyPartitions:              false,
	}

	// Compute the quantiles for each partition present in the data.
	// Result is PCollection<partition, []float64>.
	dataQuantilesFn, err := newBoundedQuantilesFn(fnParams)
	if err != nil {
		log.Fatalf("Couldn't get boundedQuantilesFn for QuantilesPerKey: %v", err)
	}
	quantiles := beam.CombinePerKey(s, dataQuantilesFn, partialKV)

	// Create map with partitions in the data as keys.
	partitionT, _ := beam.ValidateKVType(quantiles)
	quantilesPartitions := beam.DropValue(s, quantiles)
	partitionMap := beam.Combine(s, newPartitionsMapFn(beam.EncodedType{T: partitionT.Type()}), quantilesPartitions)

	// Public partitions given as a slice or array are turned into a PCollection first.
	publicPartitions, isPCollection := params.PublicPartitions.(beam.PCollection)
	if !isPCollection {
		publicPartitions = beam.Reshuffle(s, beam.CreateList(s, params.PublicPartitions))
	}

	// Add value of empty array to each partition key in PublicPartitions.
	publicPartitionsWithValues := beam.ParDo(s, addEmptySliceToPublicPartitionsFloat64Fn, publicPartitions)
	// emptyPublicPartitions are the partitions that are public but not found in the data.
	emptyPublicPartitions := beam.ParDo(s, newEmitPartitionsNotInTheDataFn(partitionT), publicPartitionsWithValues, beam.SideInput{Input: partitionMap})

	// Compute DP quantiles for empty public partitions.
	fnParams.emptyPartitions = true
	emptyQuantilesFn, err := newBoundedQuantilesFn(fnParams)
	if err != nil {
		log.Fatalf("Couldn't get boundedQuantilesFn for QuantilesPerKey: %v", err)
	}
	emptyQuantiles := beam.CombinePerKey(s, emptyQuantilesFn, emptyPublicPartitions)

	// Merge quantiles from data with quantiles from the empty public partitions.
	return beam.Flatten(s, quantiles, emptyQuantiles)
}
// checkQuantilesPerKeyParams validates the parameters of a QuantilesPerKey
// aggregation and returns the first problem found, or nil if all checks pass.
//
// It checks, in order: the public partitions against partitionType, epsilon,
// delta (given the noise kind and public partitions), the MinValue/MaxValue
// bounds, that at least one rank is given and every rank lies in [0, 1], and
// finally MaxContributionsPerPartition.
func checkQuantilesPerKeyParams(params QuantilesParams, epsilon, delta float64, noiseKind noise.Kind, partitionType reflect.Type) error {
	if err := checkPublicPartitions(params.PublicPartitions, partitionType); err != nil {
		return err
	}
	if err := checks.CheckEpsilon(epsilon); err != nil {
		return err
	}
	if err := checkDelta(delta, noiseKind, params.PublicPartitions); err != nil {
		return err
	}
	if err := checks.CheckBoundsFloat64(params.MinValue, params.MaxValue); err != nil {
		return err
	}
	if err := checks.CheckBoundsNotEqual(params.MinValue, params.MaxValue); err != nil {
		return err
	}
	if len(params.Ranks) == 0 {
		return fmt.Errorf("there should at least be one rank to compute")
	}
	for i, rank := range params.Ranks {
		// Deliberately written as a negated range check: every ordered
		// comparison with NaN is false, so "rank < 0 || rank > 1" would
		// silently accept a NaN rank.
		if !(rank >= 0.0 && rank <= 1.0) {
			return fmt.Errorf("Ranks[%d]=%f must be >= 0 and <= 1", i, rank)
		}
	}
	return checks.CheckMaxContributionsPerPartition(params.MaxContributionsPerPartition)
}
// boundedQuantilesAccum is the accumulator used by boundedQuantilesFn.
type boundedQuantilesAccum struct {
// BQ receives every value added to the accumulator and produces the quantile results.
BQ *dpagg.BoundedQuantiles
// SP is incremented once per AddInput call and decides whether the partition
// is kept. It is only created when public partitions are not used.
SP *dpagg.PreAggSelectPartition
// PublicPartitions is copied from boundedQuantilesFn.PublicPartitions when
// the accumulator is created.
PublicPartitions bool
}
// boundedQuantilesFn is a differentially private combineFn for computing quantiles of values. Do not
// initialize it yourself, use newBoundedQuantilesFn to create a boundedQuantilesFn instance.
type boundedQuantilesFn struct {
// Privacy spec parameters (set during initial construction).
// NoiseEpsilon and NoiseDelta are the budget passed to dpagg.BoundedQuantiles;
// PartitionSelectionEpsilon and PartitionSelectionDelta are the budget passed
// to dpagg.PreAggSelectPartition and stay zero when public partitions are used.
NoiseEpsilon float64
PartitionSelectionEpsilon float64
NoiseDelta float64
PartitionSelectionDelta float64
// Contribution bounds forwarded to the dpagg aggregations.
MaxPartitionsContributed int64
MaxContributionsPerPartition int64
// Lower and Upper are forwarded as the bounds of dpagg.BoundedQuantiles.
Lower float64
Upper float64
NoiseKind noise.Kind
noise noise.Noise // Set during Setup phase according to NoiseKind.
// Ranks are the ranks for which ExtractOutput returns a quantile, in this order.
Ranks []float64
PublicPartitions bool // Set to true if public partitions are used.
// TestMode replaces the noise with noNoise{} and keeps every partition when enabled.
TestMode testMode
EmptyPartitions bool // Set to true if this combineFn is for adding noise to empty public partitions.
}
// boundedQuantilesFnParams contains the parameters for creating a new boundedQuantilesFn.
type boundedQuantilesFnParams struct {
// epsilon and delta are the total budget of the aggregation. newBoundedQuantilesFn
// splits them between noise and partition selection unless publicPartitions is set.
epsilon float64
delta float64
// Contribution bounds copied into the boundedQuantilesFn.
maxPartitionsContributed int64
maxContributionsPerPartition int64
// minValue and maxValue become the Lower and Upper bounds of the boundedQuantilesFn.
minValue float64
maxValue float64
noiseKind noise.Kind
// ranks are the ranks for which quantiles are computed.
ranks []float64
publicPartitions bool // Set to true if public partitions are used.
testMode testMode
emptyPartitions bool // Set to true if the boundedQuantilesFn is for adding noise to empty public partitions.
}
// newBoundedQuantilesFn builds a boundedQuantilesFn from the given budget and
// parameters.
//
// With public partitions, the whole epsilon and delta go to noise. Otherwise
// epsilon is split evenly between noise and partition selection, and delta is
// split evenly for Gaussian noise or given entirely to partition selection for
// Laplace noise. Any other noise kind yields an error in that case.
func newBoundedQuantilesFn(params boundedQuantilesFnParams) (*boundedQuantilesFn, error) {
	// Start from the public-partitions allocation: everything is spent on noise.
	noiseEpsilon, noiseDelta := params.epsilon, params.delta
	var selectionEpsilon, selectionDelta float64

	if !params.publicPartitions {
		noiseEpsilon = params.epsilon / 2
		selectionEpsilon = params.epsilon - noiseEpsilon
		switch params.noiseKind {
		case noise.GaussianNoise:
			noiseDelta = params.delta / 2
		case noise.LaplaceNoise:
			noiseDelta = 0
		default:
			return nil, fmt.Errorf("unknown noise.Kind (%v) is specified. Please specify a valid noise", params.noiseKind)
		}
		selectionDelta = params.delta - noiseDelta
	}

	return &boundedQuantilesFn{
		NoiseEpsilon:                 noiseEpsilon,
		PartitionSelectionEpsilon:    selectionEpsilon,
		NoiseDelta:                   noiseDelta,
		PartitionSelectionDelta:      selectionDelta,
		MaxPartitionsContributed:     params.maxPartitionsContributed,
		MaxContributionsPerPartition: params.maxContributionsPerPartition,
		Lower:                        params.minValue,
		Upper:                        params.maxValue,
		NoiseKind:                    params.noiseKind,
		Ranks:                        params.ranks,
		PublicPartitions:             params.publicPartitions,
		TestMode:                     params.testMode,
		EmptyPartitions:              params.emptyPartitions,
	}, nil
}
// Setup selects the noise implementation for fn: the one matching NoiseKind,
// or noNoise{} when the test mode is enabled.
func (fn *boundedQuantilesFn) Setup() {
	// noise.ToNoise is always invoked, exactly as before, even if its result
	// ends up being replaced.
	selected := noise.ToNoise(fn.NoiseKind)
	if fn.TestMode.isEnabled() {
		selected = noNoise{}
	}
	fn.noise = selected
}
// CreateAccumulator returns a fresh accumulator configured from fn. The
// partition selection aggregation is only created when public partitions are
// not used. Any error from constructing the dpagg aggregations is returned.
func (fn *boundedQuantilesFn) CreateAccumulator() (boundedQuantilesAccum, error) {
	quantiles, err := dpagg.NewBoundedQuantiles(&dpagg.BoundedQuantilesOptions{
		Epsilon:                      fn.NoiseEpsilon,
		Delta:                        fn.NoiseDelta,
		MaxPartitionsContributed:     fn.MaxPartitionsContributed,
		MaxContributionsPerPartition: fn.MaxContributionsPerPartition,
		Lower:                        fn.Lower,
		Upper:                        fn.Upper,
		Noise:                        fn.noise,
	})
	if err != nil {
		return boundedQuantilesAccum{}, err
	}

	accum := boundedQuantilesAccum{BQ: quantiles, PublicPartitions: fn.PublicPartitions}
	if fn.PublicPartitions {
		// No partition selection is needed for public partitions.
		return accum, nil
	}

	accum.SP, err = dpagg.NewPreAggSelectPartition(&dpagg.PreAggSelectPartitionOptions{
		Epsilon:                  fn.PartitionSelectionEpsilon,
		Delta:                    fn.PartitionSelectionDelta,
		MaxPartitionsContributed: fn.MaxPartitionsContributed,
	})
	return accum, err
}
// AddInput adds the values of one (privacy_key, partition_key) pair to the
// accumulator. Every value is added to BoundedQuantiles, whereas
// SelectPartition is incremented only once per call, i.e. once per privacy
// key, and only when public partitions are not used.
func (fn *boundedQuantilesFn) AddInput(a boundedQuantilesAccum, values []float64) (boundedQuantilesAccum, error) {
	for _, value := range values {
		if err := a.BQ.Add(value); err != nil {
			return a, err
		}
	}
	if fn.PublicPartitions {
		return a, nil
	}
	return a, a.SP.Increment()
}
// MergeAccumulators merges b into a and returns a. The partition selection
// state is merged only when public partitions are not used.
func (fn *boundedQuantilesFn) MergeAccumulators(a, b boundedQuantilesAccum) (boundedQuantilesAccum, error) {
	if err := a.BQ.Merge(b.BQ); err != nil {
		return a, err
	}
	if fn.PublicPartitions {
		return a, nil
	}
	return a, a.SP.Merge(b.SP)
}
// ExtractOutput returns one quantile per entry of fn.Ranks, in the same order.
// It returns a nil slice when the partition is dropped by private partition
// selection. Partitions are always kept in test mode or when the accumulator
// was created for public partitions.
func (fn *boundedQuantilesFn) ExtractOutput(a boundedQuantilesAccum) ([]float64, error) {
	if fn.TestMode.isEnabled() {
		a.BQ.Noise = noNoise{}
	}

	// Private partition selection is consulted only if nothing forces the
	// partition to be kept.
	keep := fn.TestMode.isEnabled() || a.PublicPartitions
	if !keep {
		var err error
		if keep, err = a.SP.ShouldKeepPartition(); err != nil {
			return nil, err
		}
	}
	if !keep {
		return nil, nil
	}

	quantiles := make([]float64, 0, len(fn.Ranks))
	for _, rank := range fn.Ranks {
		quantile, err := a.BQ.Result(rank)
		if err != nil {
			return nil, err
		}
		quantiles = append(quantiles, quantile)
	}
	return quantiles, nil
}
// String returns the Go-syntax (%#v) representation of fn.
func (fn *boundedQuantilesFn) String() string {
	const goSyntaxVerb = "%#v"
	return fmt.Sprintf(goSyntaxVerb, fn)
}