blob: 46e88c155c62c502401d98db41c477b9e0bbf7da [file] [log] [blame]
//
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package pbeam_test
import (
	"context"
	"fmt"
	"strconv"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
	"github.com/google/differential-privacy/privacy-on-beam/v2/pbeam"
	"github.com/google/differential-privacy/privacy-on-beam/v2/pbeam/pbeamtest"
)
// This example demonstrates how to write test pipelines for pbeam using package
// pbeamtest, with which pbeam adds no noise, disables partition selection, and
// enables or disables contribution bounding depending on the test mode used.
//
// This mirrors the default example with two differences:
// 1. pbeamtest is used to create a PrivacySpec instead of pbeam.
// 2. Code comments are different.
//
// Package pbeamtest does not provide any privacy guarantees and is only meant
// to be used in test code. DO NOT use this for production code.
func Example_testPipelines() {
	// This example computes the "Sum-up revenue per day of the week" example
	// from the Go Differential Privacy Library documentation, available at
	// https://github.com/google/differential-privacy/go/README.md.
	//
	// It assumes that the input file, "week_data.csv", has the same format as
	// the data used in the above example:
	// https://github.com/google/differential-privacy/go/examples/data/week_data.csv

	// visit contains the data corresponding to a single restaurant visit.
	type visit struct {
		visitorID  string
		eurosSpent int
		weekday    int
	}

	// Initialize the pipeline.
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	// Load the data and parse each visit, silently dropping lines that cannot
	// be parsed.
	icol := textio.Read(s, "week_data.csv")
	icol = beam.ParDo(s, func(line string, emit func(visit)) {
		// Each line is expected to hold exactly three comma-separated fields:
		// visitor ID, euros spent and weekday; whitespace around each field is
		// ignored. fmt.Sscanf with "%s, %d, %d" cannot parse such lines: %s
		// reads up to the next space, so it either swallows the commas or
		// stops before them, and every line would be rejected.
		fields := strings.Split(line, ",")
		if len(fields) != 3 {
			return
		}
		visitorID := strings.TrimSpace(fields[0])
		if visitorID == "" {
			return
		}
		euros, err := strconv.Atoi(strings.TrimSpace(fields[1]))
		if err != nil {
			return
		}
		weekday, err := strconv.Atoi(strings.TrimSpace(fields[2]))
		if err != nil {
			return
		}
		emit(visit{visitorID, euros, weekday})
	}, icol)

	// Transform the input PCollection into a PrivatePCollection.

	// ε and δ are the differential privacy parameters that quantify the privacy
	// provided by the pipeline. Even though noise will not be added since we are
	// using pbeamtest, ε and δ are still used to validate parameters, so use the
	// same values you use in production.
	const ε, δ = 1, 1e-3

	// Instead of calling pbeam.NewPrivacySpec(), we call the corresponding
	// function in package pbeamtest. This is the only difference from a
	// production pipeline with privacy that uses pbeam.NewPrivacySpec();
	// everything else remains the same.
	//
	// This enables per-partition and cross-partition contribution bounding. If
	// you wish to disable both types of contribution bounding altogether, use
	// pbeamtest.NewPrivacySpecNoNoiseWithoutContributionBounding() instead.
	privacySpec := pbeamtest.NewPrivacySpecNoNoiseWithContributionBounding(ε, δ)
	pcol := pbeam.MakePrivateFromStruct(s, icol, privacySpec, "visitorID")
	// pcol is now a PrivatePCollection<visit>.

	// Compute the revenue sum per weekday (non-private here, since pbeamtest is
	// used). To do so, we extract a KV pair whose key is the weekday and whose
	// value is the money spent.
	pWeekdayEuros := pbeam.ParDo(s, func(v visit) (int, int) {
		return v.weekday, v.eurosSpent
	}, pcol)
	sumParams := pbeam.SumParams{
		// There is only a single differentially private aggregation in this
		// pipeline, so the entire privacy budget will be consumed (ε=1 and
		// δ=10⁻³). If multiple aggregations were present, we would need to
		// manually specify the privacy budget used by each.

		// If a visitor of the restaurant is present in more than 4 weekdays,
		// some of these contributions will be randomly dropped.
		// Larger values let you keep more contributions (more of the raw data)
		// but lead to more noise in the output because the noise will be scaled
		// by the value. See the relevant section in the codelab for details:
		// https://codelabs.developers.google.com/codelabs/privacy-on-beam/#8
		MaxPartitionsContributed: 4,
		// If a visitor of the restaurant spends more than 50 euros, or less
		// than 0 euros, their contribution will be clamped.
		// Similar to MaxPartitionsContributed, a larger interval lets you keep
		// more of the raw data but leads to more noise in the output because
		// the noise will be scaled by max(|MinValue|,|MaxValue|).
		MinValue: 0,
		MaxValue: 50,
	}
	// Since pbeamtest is used, this will produce a non-differentially private
	// sum of revenue per day.
	ocol := pbeam.SumPerKey(s, pWeekdayEuros, sumParams)

	// ocol is a regular PCollection; it can be written to disk.
	formatted := beam.ParDo(s, func(weekday int, sum int64) string {
		return fmt.Sprintf("Weekday n°%d: total spend is %d euros", weekday, sum)
	}, ocol)
	textio.Write(s, "spend_per_weekday.txt", formatted)

	// Execute the pipeline.
	if _, err := direct.Execute(context.Background(), p); err != nil {
		fmt.Printf("Pipeline failed: %v\n", err)
	}
}