blob: a66aa04b0bc2c5d8147e1d899d51b3f7be5dd901 [file]
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* An end-to-end example of how to compute DP metrics on a Netflix dataset using the library.
*
* <p>See README for details including how to run the example.
*/
package com.google.privacy.differentialprivacy.pipelinedp4j.examples;
import static java.lang.Math.round;
import static java.util.stream.Collectors.toCollection;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.NoiseKind;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryBuilder;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryPerGroupResult;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.TotalBudget;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.IntStream;
import kotlin.jvm.functions.Function1;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
/**
 * An end-to-end example of how to compute DP metrics on a Netflix dataset using the library on
 * Beam.
 *
 * <p>The example reads movie views from a local text file, computes per-movie anonymized metrics
 * (number of viewers, number of views and average rating) and writes them to a local text file.
 *
 * <p>See README for details including how to run the example.
 */
@Command(
    name = "BeamExample",
    version = {"BeamExample 1.0"},
    mixinStandardHelpOptions = true)
public class BeamExample implements Runnable {
  /** First movie id (inclusive) of the range used as public groups. */
  private static final int FIRST_PUBLIC_MOVIE_ID = 4500;

  /** Last movie id (inclusive) of the range used as public groups. */
  private static final int LAST_PUBLIC_MOVIE_ID = 4509;

  // The description is built from the same constants that publiclyKnownMovieIds() uses, so the
  // help text cannot drift away from the ids that are actually emitted.
  @Option(
      names = "--use-public-groups",
      description =
          "If true we will assume in the example that movie ids are publicly known and are from "
              + FIRST_PUBLIC_MOVIE_ID
              + " to "
              + LAST_PUBLIC_MOVIE_ID
              + ". Default is false, i.e. we will choose movie ids in a differentially"
              + " private way.",
      defaultValue = "false")
  private boolean usePublicGroups = false;

  @Option(
      names = "--local-input-file-path",
      description =
          "Input file. For using as input file you can download data from"
              + " https://www.kaggle.com/datasets/netflix-inc/netflix-prize-data. Use only part of"
              + " it to speed up the calculations.",
      required = true)
  private String localInputFilePath;

  @Option(
      names = "--local-output-file-path",
      description = "Output file.",
      defaultValue = "/tmp/anonymized_output.txt")
  private String localOutputFilePath;

  /**
   * Parses the command line arguments, runs the example and exits the JVM with the exit code
   * returned by the command line execution.
   *
   * @param args command line arguments, see the {@code @Option} fields of this class
   */
  public static void main(String[] args) {
    int exitCode = new CommandLine(new BeamExample()).execute(args);
    System.exit(exitCode);
  }

  /**
   * Builds and runs the Beam pipeline: reads the input, computes the anonymized metrics per movie
   * and writes them to the output file. Blocks until the pipeline has finished.
   */
  @Override
  public void run() {
    System.out.println("Starting calculations...");

    var pipeline = initBeam();

    // Read the input data, these are movie views that contain movie id, user id and rating.
    PCollection<MovieView> data = readData(pipeline);

    // Define the query: the privacy unit is the user and the group key is the movie.
    var query =
        QueryBuilder.from(data, /* privacyIdExtractor= */ new UserIdExtractor())
            .groupBy(
                /* groupKeyExtractor= */ new MovieIdExtractor(),
                /* maxGroupsContributed= */ 3,
                /* maxContributionsPerGroup= */ 1,
                // A null value here means no public groups are supplied.
                usePublicGroups ? publiclyKnownMovieIds(pipeline) : null)
            .countDistinctPrivacyUnits("numberOfViewers")
            .count(/* outputColumnName= */ "numberOfViews")
            .mean(
                new RatingExtractor(),
                /* minValue= */ 1.0,
                /* maxValue= */ 5.0,
                /* outputColumnName= */ "averageOfRatings",
                /* budget= */ null)
            .build();

    // Run the query with DP parameters.
    PCollection<QueryPerGroupResult> result =
        query.run(new TotalBudget(/* epsilon= */ 1.1, /* delta= */ 1e-10), NoiseKind.LAPLACE);

    // Convert the result to better representation, i.e. to MovieMetrics.
    var movieMetricsCoder = AvroCoder.of(MovieMetrics.class);
    SerializableFunction<QueryPerGroupResult, MovieMetrics> mapToMovieMetricsFn =
        BeamExample::toMovieMetrics;

    // We now have our anonymized metrics of movie views.
    PCollection<MovieMetrics> anonymizedMovieMetrics =
        result
            .apply(
                "Map query result to MovieMetrics",
                MapElements.into(movieMetricsCoder.getEncodedTypeDescriptor())
                    .via(mapToMovieMetricsFn))
            .setCoder(movieMetricsCoder);

    // Save the result to a file.
    writeOutput(anonymizedMovieMetrics);

    // Run the scheduled calculations in the pipeline.
    pipeline.run().waitUntilFinish();
    System.out.println("Finished calculations.");
  }

  /**
   * Converts a single per-group query result into {@link MovieMetrics}.
   *
   * <p>The aggregation results are looked up by the output column names configured on the query in
   * {@link #run()}. The two counts are rounded to the nearest {@code long}.
   *
   * @param perGroupResult result of the query for one movie
   * @return the metrics of that movie
   */
  private static MovieMetrics toMovieMetrics(QueryPerGroupResult perGroupResult) {
    String movieId = perGroupResult.getGroupKey();
    var aggregationResults = perGroupResult.getAggregationResults();
    long numberOfViewers = round(aggregationResults.get("numberOfViewers"));
    long numberOfViews = round(aggregationResults.get("numberOfViews"));
    double averageOfRatings = aggregationResults.get("averageOfRatings");
    return new MovieMetrics(movieId, numberOfViewers, numberOfViews, averageOfRatings);
  }

  // Data extractors. They always have to implement the Function1 and Serializable interfaces. If
  // an extractor doesn't implement Serializable, it will fail on Beam. If it doesn't implement
  // Function1, it will fail at compile time due to a type mismatch. Do not use lambdas for data
  // extractors as they won't be serializable.

  /** Extracts the user id, which is used as the privacy id, from a movie view. */
  static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getUserId();
    }
  }

  /** Extracts the movie id, which is used as the group key, from a movie view. */
  static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getMovieId();
    }
  }

  /** Extracts the rating, which is the value that is averaged, from a movie view. */
  static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
    @Override
    public Double invoke(MovieView movieView) {
      return movieView.getRating();
    }
  }

  /** Creates a Beam pipeline with default pipeline options. */
  private static Pipeline initBeam() {
    var options = PipelineOptionsFactory.create();
    return Pipeline.create(options);
  }

  /**
   * Reads the input file line by line and parses every line into a {@link MovieView}.
   *
   * @param pipeline the pipeline the read and parse steps are added to
   * @return the parsed movie views
   */
  private PCollection<MovieView> readData(Pipeline pipeline) {
    PCollection<String> inputPCollection =
        pipeline.apply("Read input", TextIO.read().from(localInputFilePath));
    var coder = AvroCoder.of(MovieView.class);
    SerializableFunction<String, MovieView> parseFunction = MovieView::parseView;
    return inputPCollection
        .apply("Parse input", MapElements.into(coder.getEncodedTypeDescriptor()).via(parseFunction))
        .setCoder(coder);
  }

  /**
   * Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. The public
   * groups are the ids from {@link #FIRST_PUBLIC_MOVIE_ID} to {@link #LAST_PUBLIC_MOVIE_ID}, both
   * inclusive.
   *
   * @param pipeline the pipeline the public groups are created in
   * @return the publicly known movie ids as strings
   */
  private static PCollection<String> publiclyKnownMovieIds(Pipeline pipeline) {
    var publicGroupsAsJavaList =
        IntStream.rangeClosed(FIRST_PUBLIC_MOVIE_ID, LAST_PUBLIC_MOVIE_ID)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayList::new));
    return pipeline.apply("Create public groups", Create.of(publicGroupsAsJavaList));
  }

  /**
   * Converts every {@link MovieMetrics} to its string form and writes the lines to the output
   * file, without sharding the output.
   *
   * @param result the anonymized metrics to write
   */
  private void writeOutput(PCollection<MovieMetrics> result) {
    SerializableFunction<MovieMetrics, String> toStringFunction = MovieMetrics::toString;
    var lines =
        result.apply(
            "Map MovieMetrics to string",
            MapElements.into(StringUtf8Coder.of().getEncodedTypeDescriptor())
                .via(toStringFunction));
    lines.apply("Write output to file", TextIO.write().withoutSharding().to(localOutputFilePath));
  }
}