blob: d052e2493e1add6bd9f2609e88f0ebc8f95c282d [file] [log] [blame]
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Package pbeam provides an API for building differentially private data
// processing pipelines using Apache Beam (https://beam.apache.org) with its
// Go SDK (https://godoc.org/github.com/apache/beam/sdks/v2/go/pkg/beam).
//
// It introduces the concept of a PrivatePCollection, an interface mirroring
// Apache Beam's PCollection concept. PrivatePCollection implements additional
// restrictions and aggregations to facilitate differentially private analysis.
// This API is meant to be used by developers without differential privacy
// expertise.
//
// For a step-by-step introduction to differential privacy, Apache Beam, and
// example usage of this library, see:
// https://codelabs.developers.google.com/codelabs/privacy-on-beam/index.html;
// a codelab meant for developers who want to get started on using this library
// and generating differentially private metrics.
//
// The rest of this package-level comment goes into more detail about the
// precise guarantees offered by this API, and assumes some familiarity with
// the Apache Beam model, its Go SDK, and differential privacy.
//
// To understand the main API contract provided by PrivatePCollection, consider
// the following example pipeline.
//
// p := beam.NewPipeline()
// s := p.Root()
// // The input is a series of files in which each line contains the data of a privacy unit (e.g. an individual).
// input := textio.Read(s, "/path/to/files/*.txt") // input is a PCollection<string>
// // Extracts the privacy ID and the data associated with each line: extractID is a func(string) (userID,data).
// icol := beam.ParDo(s, input, extractID) // icol is a PCollection<privacyUnitID,data>
// // Transforms the input PCollection into a PrivatePCollection with parameters ε=1 and δ=10⁻¹⁰.
// // The privacy ID is "hidden" by the operation: pcol behaves as if it were a PCollection<data>.
// pcol := MakePrivate(s, icol, NewPrivacySpec(1, 1e-10)) // pcol is a PrivatePCollection<data>
// // Arbitrary transformations can be applied to the data…
// pcol = ParDo(s, pcol, someDoFn)
// pcol = ParDo(s, pcol, otherDoFn)
// // …and to retrieve PCollection outputs, differentially private aggregations must be used.
// // For example, assuming pcol is now a PrivatePCollection<field,float64>:
// sumParams := SumParams{MaxPartitionsContributed: 10, MaxValue: 5}
// ocol := SumPerKey(s, pcol, sumParams) // ocol is a PCollection<field,float64>
// // And it is now possible to output this data.
// textio.Write(s, "/path/to/output/file", ocol)
//
// The behavior of PrivatePCollection is similar to the behavior of PCollection.
// In particular, it implements arbitrary per-record transformations via ParDo.
// However, the contents of a PrivatePCollection cannot be written to disk.
// For example, there is no equivalent of:
//
// textio.Write(s, "/path/to/output/file", pcol)
//
// In order to retrieve data encapsulated in a PrivatePCollection, it is
// necessary to use one of the differentially private aggregations provided with
// this library (e.g., count, sum, mean), which transforms the
// PrivatePCollection back into a PCollection.
//
// This is because of the API contract provided by this library: once data is
// encapsulated in a PrivatePCollection, all its outputs are differentially
// private. More precisely, suppose a PrivatePCollection pcol is created from a
// PCollection<K,V> icol with privacy parameters (ε,δ), and output in one or
// several PCollections (ocol1, ocol2, ocol3). Let f be the corresponding
// randomized transformation, associating icol with (ocol1, ocol2, ocol3). Then
// f is (ε,δ)-differentially private in the following sense. Let icol' be the
// PCollection obtained by removing all records associated with a given value of
// K in icol. Then, for any set S of possible outputs:
//
// P[f(icol) ∈ S] ≤ exp(ε) * P[f(icol') ∈ S] + δ.
//
// The K, in the example above, is userID, representing a user identifier. This
// means that the full list of contributions of any given user is protected. However, this does not need
// to be the case; the protected property might be different from a user
// identifier. In this library, we use the more general terminology of "privacy
// unit" to refer to the type of this identifier (for example, user ID, event
// ID, a pair (user ID, day)); and "privacy identifier" to refer to a
// particular instance of this identifier (for example, user n°4217, event n°99,
// or the pair (user n°4127,2020-06-24)).
//
// Note that the interface contract of PrivatePCollection has limitations. This
// library assumes that the user of the library is trusted with access to the
// underlying raw data. This intended user is a well-meaning developer trying to
// produce anonymized metrics about data using differential privacy. The API
// tries to make it easy to anonymize metrics that are safe to publish to
// untrusted parties, and difficult to break the differential privacy
// guarantees by mistake.
//
// However, this API does not attempt to protect against malicious library
// users. In particular, nothing prevents a user of this library from adding a
// side-effect to a ParDo function to leak raw data and bypass differential
// privacy guarantees. Similarly, ParDo functions are allowed to return errors
// that crash the pipeline, which could be abused to leak raw data. There is no
// protection against timing or side-channel attacks, as we assume that the only
// thing malicious users have access to is the output data.
package pbeam
import (
"bytes"
"fmt"
"math"
"reflect"
"strings"
"sync"
log "github.com/golang/glog"
"github.com/google/differential-privacy/go/v2/noise"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/kv"
"github.com/google/differential-privacy/privacy-on-beam/v2/internal/testoption"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)
func init() {
beam.RegisterType(reflect.TypeOf((*extractProtoFieldFn)(nil)))
beam.RegisterType(reflect.TypeOf((*extractStructFieldFn)(nil)))
// TODO: add tests to make sure we don't forget anything here
}
// PrivacySpec contains information about the privacy parameters used in
// a PrivatePCollection. It encapsulates a privacy budget that must be shared
// between all aggregations on PrivatePCollections using this PrivacySpec. If
// you have multiple pipelines in the same binary, and want them to use
// different privacy budgets, call NewPrivacySpec multiple times and give a
// different PrivacySpec to each PrivatePCollection.
//
// A PrivacySpec contains a sync.Mutex and is handed out as a pointer by
// NewPrivacySpec; it must not be copied after first use.
type PrivacySpec struct {
	epsilon float64 // ε budget available for this PrivatePCollection. Decreased by consumeBudget.
	delta float64 // δ budget available for this PrivatePCollection. Decreased by consumeBudget.
	partiallyConsumed bool // Whether some privacy budget has already been consumed from this PrivacySpec.
	testMode testMode // Used for test pipelines, disabled by default.
	// Held by getBudget and consumeBudget while they read or update the budget
	// fields above.
	mux sync.Mutex
}
// getBudget computes, without modifying ps, the differential privacy budget
// (ε,δ) that would be consumed for a request of (epsilon, delta), using the
// same rules as consumeBudget. A request of (0, 0) stands for the entire
// budget, which can only be granted if no budget has been consumed from ps
// yet.
//
// It returns the budget to consume, or an error if the request cannot be
// satisfied.
//
// Warning: this only computes the budget; use consumeBudget to actually
// consume it.
func (ps *PrivacySpec) getBudget(epsilon, delta float64) (eps, del float64, err error) {
	ps.mux.Lock()
	defer ps.mux.Unlock()
	eps, del, err = ps.getBudgetThreadUnsafe(epsilon, delta)
	return eps, del, err
}
// getBudgetThreadUnsafe routes a budget request. Any non-zero epsilon or delta
// is a partial request handled by getPartialBudget; (0, 0) asks for the entire
// budget and is handled by getEntireBudget.
//
// It does not lock ps.mux and should not be used directly; use getBudget or
// consumeBudget instead.
func (ps *PrivacySpec) getBudgetThreadUnsafe(epsilon, delta float64) (eps, del float64, err error) {
	switch {
	case epsilon != 0 || delta != 0:
		return ps.getPartialBudget(epsilon, delta)
	default:
		return ps.getEntireBudget()
	}
}
// consumeBudget consumes a differential privacy budget (ε,δ) from ps and
// returns the amounts consumed. If epsilon and delta are both 0, it consumes
// the entire budget, which is only possible if no budget has been consumed
// from ps yet. Otherwise, a request that exceeds the remaining budget by no
// more than a rounding error is clamped to what is left.
//
// If the request cannot be satisfied, it returns an error and leaves ps
// unchanged. In particular, a failed request does not mark ps as partially
// consumed, so the entire budget can still be requested afterwards.
//
// ps.mux is held for the whole operation.
func (ps *PrivacySpec) consumeBudget(epsilon, delta float64) (eps, del float64, err error) {
	ps.mux.Lock()
	defer ps.mux.Unlock()
	eps, del, err = ps.getBudgetThreadUnsafe(epsilon, delta)
	if err != nil {
		// Nothing was granted: do not touch the budget or the consumption flag.
		return 0, 0, err
	}
	ps.epsilon -= eps
	ps.delta -= del
	ps.partiallyConsumed = true
	return eps, del, nil
}
// getEntireBudget returns the whole (ε,δ) budget of ps. This is only allowed
// while nothing has been consumed from ps; otherwise it returns (0, 0) and an
// error. It neither locks ps.mux nor modifies ps.
func (ps *PrivacySpec) getEntireBudget() (eps, del float64, err error) {
	if !ps.partiallyConsumed {
		return ps.epsilon, ps.delta, nil
	}
	return 0, 0, fmt.Errorf("trying to consume entire budget of PrivacySpec, but it has already been partially or fully consumed: %+v ", ps)
}
// getPartialBudget checks that (epsilon, delta) can be taken from the
// remaining budget of ps and returns the amounts to consume. If a request
// exceeds the remaining ε (resp. δ) by no more than a rounding error (see
// budgetSlightlyTooLarge), it is clamped to the remaining amount and the
// correction is logged. An error is returned if the request still exceeds the
// remaining budget.
//
// It neither locks ps.mux nor modifies ps. It is reached through
// getBudgetThreadUnsafe, whose callers hold the lock.
func (ps *PrivacySpec) getPartialBudget(epsilon, delta float64) (eps, del float64, err error) {
	if budgetSlightlyTooLarge(ps.epsilon, epsilon) {
		log.Infof("corrected rounding error for epsilon budget allocation (requested: %f, available: %f, difference: %e)", epsilon, ps.epsilon, epsilon-ps.epsilon)
		epsilon = ps.epsilon
	}
	if budgetSlightlyTooLarge(ps.delta, delta) {
		// Report the δ values (not ε) being corrected.
		log.Infof("corrected rounding error for delta budget allocation (requested: %e, available: %e, difference: %e)", delta, ps.delta, delta-ps.delta)
		delta = ps.delta
	}
	if ps.epsilon < epsilon || ps.delta < delta {
		return 0, 0, fmt.Errorf("not enough budget left for PrivacySpec: trying to consume epsilon=%f and delta=%e out of remaining epsilon=%f and delta=%e. Did you forget to split your budget among aggregations?", epsilon, delta, ps.epsilon, ps.delta)
	}
	return epsilon, delta, nil
}
// eqBudgetRelTol is the inverse of the relative tolerance used by
// budgetSlightlyTooLarge. A request that exceeds the remaining budget by at
// most remaining/eqBudgetRelTol is treated as a rounding error, and the
// request then consumes all remaining budget.
const eqBudgetRelTol = 1e9

// budgetSlightlyTooLarge reports whether requested is strictly larger than
// remaining, but by no more than a rounding error of remaining/eqBudgetRelTol.
// It returns false when requested fits within remaining.
func budgetSlightlyTooLarge(remaining, requested float64) bool {
	if shortfall := remaining - requested; shortfall < 0 {
		// The request exceeds what is left; accept it only if the excess is tiny.
		return math.Abs(shortfall) <= remaining/eqBudgetRelTol
	}
	return false
}
// PrivacySpecOption is used for customizing PrivacySpecs. In the typical use
// case, PrivacySpecOptions are passed into the NewPrivacySpec constructor to
// create a further customized PrivacySpec.
//
// Options are applied by evaluatePrivacySpecOption, which silently ignores
// values it does not recognize.
type PrivacySpecOption interface{}
// evaluatePrivacySpecOption applies a single option to spec. The recognized
// options are the test-only no-noise options from the testoption package,
// which set spec.testMode. Any other value leaves spec unchanged.
func evaluatePrivacySpecOption(opt PrivacySpecOption, spec *PrivacySpec) {
	if opt == (testoption.EnableNoNoiseWithContributionBounding{}) {
		spec.testMode = noNoiseWithContributionBounding
	} else if opt == (testoption.EnableNoNoiseWithoutContributionBounding{}) {
		spec.testMode = noNoiseWithoutContributionBounding
	}
}
// getMaxPartitionsContributed returns maxPartitionsContributed if it is
// strictly positive, and an error otherwise. The spec argument is currently
// unused.
func getMaxPartitionsContributed(spec *PrivacySpec, maxPartitionsContributed int64) (int64, error) {
	if maxPartitionsContributed <= 0 {
		// No trailing punctuation: error strings get wrapped into longer messages.
		return 0, fmt.Errorf("MaxPartitionsContributed must be set to a positive value, was %d instead", maxPartitionsContributed)
	}
	return maxPartitionsContributed, nil
}
// NoiseKind represents the kind of noise to be used in an aggregation.
// GaussianNoise and LaplaceNoise implement it. Because its only method is
// unexported, it cannot be implemented outside this package.
type NoiseKind interface {
	// toNoiseKind returns the corresponding noise.Kind of the noise package.
	toNoiseKind() noise.Kind
}
// GaussianNoise is an aggregation parameter that makes the aggregation use
// Gaussian noise (noise.GaussianNoise).
type GaussianNoise struct{}

// toNoiseKind maps GaussianNoise to noise.GaussianNoise.
func (GaussianNoise) toNoiseKind() noise.Kind { return noise.GaussianNoise }
// LaplaceNoise is an aggregation parameter that makes the aggregation use
// Laplace noise (noise.LaplaceNoise).
type LaplaceNoise struct{}

// toNoiseKind maps LaplaceNoise to noise.LaplaceNoise.
func (LaplaceNoise) toNoiseKind() noise.Kind { return noise.LaplaceNoise }
// NewPrivacySpec creates a new PrivacySpec with the specified privacy budget
// and options.
//
// The epsilon and delta arguments are the total (ε,δ)-differential privacy
// budget for the pipeline. If there is only one aggregation, the entire budget
// will be used for this aggregation. Otherwise, the user must specify how the
// privacy budget is split across aggregations.
//
// Options are applied in order by evaluatePrivacySpecOption, and unrecognized
// options are ignored. The budget values are not validated here.
func NewPrivacySpec(epsilon, delta float64, options ...PrivacySpecOption) *PrivacySpec {
	spec := new(PrivacySpec)
	spec.epsilon, spec.delta = epsilon, delta
	for i := range options {
		evaluatePrivacySpecOption(options[i], spec)
	}
	return spec
}
// A PrivatePCollection embeds a PCollection, associating each element to a
// privacy identifier, and ensures that its content can only be written to a
// sink after being anonymized using differentially private aggregations.
//
// We call "privacy identifier" the value of the identifier associated with a
// record (e.g. 62934947), and "privacy unit" the semantic type of this
// identifier (e.g. "user ID"). Typical choices for privacy units include user
// IDs or session IDs. This choice determines the privacy unit protected by
// differential privacy. For example, if the privacy unit is user ID, then the
// output of aggregations will be (ε,δ)-indistinguishable from the output
// obtained via PrivatePCollection in which all records associated with a
// single user ID have been removed, or modified.
//
// Some operations on PCollections are also available on PrivatePCollection,
// for example a limited subset of ParDo operations. They transparently
// propagate privacy identifiers, preserving the privacy guarantees of the
// PrivatePCollection.
type PrivatePCollection struct {
	// PCollection<ID,X>, where ID is the privacy unit
	col beam.PCollection
	// If this PrivatePCollection is of <K,V> type, we store each pair as a
	// kv.Pair; and this is the codec that can be used to decode it.
	// MakePrivate, MakePrivateFromStruct and MakePrivateFromProto leave it nil.
	codec *kv.Codec
	// Privacy budget and parameters attached to this PrivatePCollection.
	// This is a pointer, so all PrivatePCollections holding the same
	// *PrivacySpec draw from the same budget.
	privacySpec *PrivacySpec
}
// MakePrivate transforms a PCollection<K,V> into a PrivatePCollection<V>,
// where <K> is the privacy unit.
//
// The scope argument is unused. MakePrivate calls log.Fatalf if col is not a
// KV PCollection. The returned PrivatePCollection draws its privacy budget
// from spec.
func MakePrivate(_ beam.Scope, col beam.PCollection, spec *PrivacySpec) PrivatePCollection {
	pcol := PrivatePCollection{privacySpec: spec}
	if isKV := typex.IsKV(col.Type()); !isKV {
		log.Fatalf("MakePrivate: PCollection col=%v must be of KV type", col)
	}
	pcol.col = col
	return pcol
}
// MakePrivateFromStruct creates a PrivatePCollection from a PCollection of
// structs. The privacy key is the struct field designated by idFieldPath, a
// qualified path of field names separated by ".".
// For example, given:
//
//	type exampleStruct1 struct {
//		IntField    int
//		StructField exampleStruct2
//	}
//
//	type exampleStruct2 struct {
//		StringField string
//	}
//
// and a PCollection of exampleStruct1, both "IntField" and
// "StructField.StringField" are valid values for idFieldPath.
//
// Caution
//
// The privacy key field must be a simple type (e.g. int, string, etc.), or
// a pointer to a simple type, and all its parents must be structs or
// pointers to structs.
//
// If the privacy key field is not set, all elements without a set field
// will be attributed to the same (default) privacy unit, likely degrading utility
// of future DP aggregations. Similarly, if the idFieldPath or any of its
// parents are nil, those elements will be attributed to the same (default)
// privacy unit as well.
//
// MakePrivateFromStruct calls log.Fatalf if col is a KV PCollection or if its
// elements are not structs. Problems with idFieldPath itself (e.g. a missing
// field) are only reported as errors when elements are processed.
func MakePrivateFromStruct(s beam.Scope, col beam.PCollection, spec *PrivacySpec, idFieldPath string) PrivatePCollection {
	s = s.Scope("pbeam.MakePrivateFromStruct")
	elemType := col.Type()
	if typex.IsKV(elemType) {
		log.Fatalf("MakePrivateFromStruct: PCollection col=%v cannot be of KV type", col)
	}
	if kind := elemType.Type().Kind(); kind != reflect.Struct {
		log.Fatalf("MakePrivateFromStruct: PCollection col=%v must be composed of structs", col)
	}
	keyed := beam.ParDo(s, &extractStructFieldFn{IDFieldPath: idFieldPath}, col)
	return PrivatePCollection{col: keyed, privacySpec: spec}
}
// extractStructFieldFn is the DoFn used by MakePrivateFromStruct. It keys each
// struct element by the value of the field at IDFieldPath.
type extractStructFieldFn struct {
	// Dot-separated path of the ID field, e.g. "StructField.StringField".
	IDFieldPath string
}

// ProcessElement emits (id, v). Here id is the ID field of v formatted with %#v,
// and v is the element, unchanged. It returns an error, wrapping the cause, if
// the ID field cannot be retrieved (see getIDField).
func (ext *extractStructFieldFn) ProcessElement(v beam.V) (string, beam.V, error) {
	idField, err := ext.getIDField(v)
	if err != nil {
		// %w keeps the cause inspectable with errors.Is/As, like extractProtoFieldFn does.
		return "", nil, fmt.Errorf("couldn't retrieve ID field %s: %w", ext.IDFieldPath, err)
	}
	// We use %#v to guarantee two different keys map to different strings
	// (e.g. string values are quoted).
	return fmt.Sprintf("%#v", idField), v, nil
}
// getIDField retrieves the ID field designated by ext.IDFieldPath (a
// "."-separated path of field names) from s, which must be a struct or a
// pointer to a struct.
//
// Nil pointers along the path, including the ID field itself, are replaced by
// the zero value of the pointed-to type, so all such elements yield the same
// default ID. It returns an error in these cases:
//   - a value on the path is not a struct (or a pointer to one);
//   - a field on the path does not exist;
//   - the ID field is not a simple type (see checkSimpleType);
//   - the ID field cannot be read because it, or one of its parents, is
//     unexported.
func (ext *extractStructFieldFn) getIDField(s interface{}) (interface{}, error) {
	subFieldNames := strings.Split(ext.IDFieldPath, ".")
	subField := reflect.ValueOf(s)
	var subFieldPath bytes.Buffer
	for _, subFieldName := range subFieldNames {
		subField = ext.getPointedValue(subField) // Retrieve the pointed value if subField is a pointer, no-op otherwise.
		if subField.Kind() != reflect.Struct {
			return nil, fmt.Errorf("%s (%v) should be a struct or a pointer to a struct", subFieldPath.String(), subField.Kind())
		}
		parentType := subField.Type()
		subField = subField.FieldByName(subFieldName)
		subFieldPath.WriteString(subFieldName + ".")
		if !subField.IsValid() {
			// subField is now the invalid zero Value, so report the struct type
			// that was searched instead of its (always "invalid") kind.
			return nil, fmt.Errorf("no such field %s in %v", strings.TrimSuffix(subFieldPath.String(), "."), parentType)
		}
	}
	subField = ext.getPointedValue(subField) // Retrieve the pointed value if subField is a pointer, no-op otherwise.
	if err := ext.checkSimpleType(subField); err != nil {
		return nil, err
	}
	// Values reached through unexported fields make Interface panic; return an error instead.
	if !subField.CanInterface() {
		return nil, fmt.Errorf("ID field %s must be exported, and so must all its parents", ext.IDFieldPath)
	}
	// TODO Set the ID field to default value.
	return subField.Interface(), nil
}
// getPointedValue dereferences v if it is a pointer. If v is a nil pointer, it
// returns the zero value of the pointed-to type. If v is not a pointer, it
// returns v unchanged; this includes the invalid zero reflect.Value (e.g. from
// a nil interface), so the caller's Kind checks can report it instead of
// panicking here.
func (ext *extractStructFieldFn) getPointedValue(v reflect.Value) reflect.Value {
	if v.Kind() != reflect.Ptr {
		return v
	}
	if v.IsNil() {
		return reflect.Zero(v.Type().Elem())
	}
	return v.Elem()
}
// checkSimpleType returns nil if v has a boolean, string, signed or unsigned
// integer, floating-point or complex kind, and an error naming v's kind
// otherwise. Note that reflect.Uintptr is not accepted.
func (ext *extractStructFieldFn) checkSimpleType(v reflect.Value) error {
	kind := v.Kind()
	switch kind {
	case reflect.Bool, reflect.String:
		return nil
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return nil
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return nil
	case reflect.Float32, reflect.Float64, reflect.Complex64, reflect.Complex128:
		return nil
	}
	return fmt.Errorf("ID field must be a simple type (e.g. int, string), got type %v instead", kind)
}
// MakePrivateFromProto creates a PrivatePCollection from a PCollection of
// proto messages. The privacy key is the field designated by idFieldPath, its
// qualified ("."-separated) name relative to the message.
// The field and all its parents must be non-repeated, and the field itself
// cannot be a submessage. These constraints are not checked here; violations
// are returned as errors by the DoFn when elements are processed. The ID field
// is currently kept in the output messages.
//
// MakePrivateFromProto calls log.Fatalf if col is a KV PCollection or if its
// element type does not implement proto.Message.
func MakePrivateFromProto(s beam.Scope, col beam.PCollection, spec *PrivacySpec, idFieldPath string) PrivatePCollection {
	s = s.Scope("pbeam.MakePrivateFromProto")
	msgTypex := col.Type()
	if typex.IsKV(msgTypex) {
		log.Fatalf("MakePrivateFromProto: PCollection col=%v cannot be of KV type", col)
	}
	msgType := msgTypex.Type()
	protoMessageType := reflect.TypeOf((*proto.Message)(nil)).Elem()
	if !msgType.Implements(protoMessageType) {
		log.Fatalf("MakePrivateFromProto: PCollection col=%v must be composed of proto messages", col)
	}
	extractFn := &extractProtoFieldFn{
		IDFieldPath: idFieldPath,
		// Keyed field: an unkeyed literal of an imported struct type is flagged by go vet.
		MsgType: beam.EncodedType{T: msgType},
	}
	return PrivatePCollection{
		col:         beam.ParDo(s, extractFn, col),
		privacySpec: spec,
	}
}
// extractProtoFieldFn is the DoFn used by MakePrivateFromProto. It keys each
// proto message by the string value of the field at IDFieldPath.
type extractProtoFieldFn struct {
	// Dot-separated path of the ID field, relative to the message.
	IDFieldPath string
	// Type of the input messages. Not read in this file; presumably kept so the
	// message type survives DoFn serialization — TODO confirm.
	MsgType beam.EncodedType
	// Descriptor of the input message type. Unexported; ProcessElement sets it
	// lazily from the first element it sees.
	desc protoreflect.MessageDescriptor
}
// ProcessElement emits (id, msg) for each proto message v. Here id is the
// string form of the ID field (see extractField), and msg is the message
// itself. It returns an error if v is not a proto.Message or if the ID field
// cannot be extracted.
func (ext *extractProtoFieldFn) ProcessElement(v beam.V) (string, beam.V, error) {
	pb, ok := v.(proto.Message)
	if !ok {
		// Report only the type: the element's contents are raw, non-anonymized data.
		return "", nil, fmt.Errorf("expected a proto.Message element, got %T", v)
	}
	reflectPb := pb.ProtoReflect()
	// If ext.desc hasn't been initialized, initialize it now. This assumes all
	// elements share the PCollection's message type.
	if ext.desc == nil {
		ext.desc = reflectPb.Descriptor()
	}
	idField, err := ext.extractField(reflectPb)
	if err != nil {
		return "", nil, fmt.Errorf("couldn't extract field %s from proto: %w", ext.IDFieldPath, err)
	}
	return fmt.Sprint(idField), reflectPb.Interface(), nil
}
// extractField returns, as a string, the value of the field of pb designated
// by ext.IDFieldPath. The path is a "."-separated list of field names,
// resolved starting from the message described by ext.desc. Unset
// submessages along the path are treated as empty messages, so the default
// value of the ID field is returned in that case.
//
// It returns an error if a field on the path does not exist or is repeated,
// or if the path ends on a message or group field.
//
// Note: the field is not removed from pb (see the TODO below).
func (ext *extractProtoFieldFn) extractField(pb protoreflect.Message) (interface{}, error) {
	parts := strings.Split(ext.IDFieldPath, ".")
	curPb := pb
	curDesc := ext.desc
	for i, part := range parts {
		fieldDesc := curDesc.Fields().ByName(protoreflect.Name(part))
		if fieldDesc == nil {
			return nil, fmt.Errorf("couldn't get field %s from the proto message", strings.Join(parts[:i+1], "."))
		}
		switch {
		case fieldDesc.Cardinality() == protoreflect.Repeated:
			return nil, fmt.Errorf("repeated field %s found in the proto message", strings.Join(parts[:i+1], "."))
		case fieldDesc.Kind() == protoreflect.MessageKind || fieldDesc.Kind() == protoreflect.GroupKind:
			// Descend into the submessage; use an empty one if it is unset.
			curDesc = fieldDesc.Message()
			if curPb.Has(fieldDesc) {
				curPb = curPb.Get(fieldDesc).Message()
			} else {
				curPb = curPb.NewField(fieldDesc).Message()
			}
		default:
			// Scalar (non-message) field: return its value as a string.
			// TODO Remove the ID field.
			return curPb.Get(fieldDesc).String(), nil
		}
	}
	// The loop only ends without returning when the last part is a message/group field.
	return nil, fmt.Errorf("submessage field %s found in the proto message", ext.IDFieldPath)
}