# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements an (epsilon,delta)-DP test using hockey-stick divergence.
Specifically, tries to estimate the hockey stick divergence between the
empirical distributions of the output of a mechanism on neighboring datasets
using its dual formulation as the weighted accuracy of a classifier.
"""
from typing import Tuple
from absl import logging
import numpy as np
import tensorflow as tf
from typing_extensions import override
from dp_auditorium.configs import privacy_property
from dp_auditorium.configs import property_tester_config
from dp_auditorium.testers import divergence_tester
from dp_auditorium.testers import property_tester_utils
def make_default_hs_training_config() -> property_tester_config.TrainingConfig:
return property_tester_config.TrainingConfig(
training_epochs=2,
optimizer_learning_rate=1e-2,
batch_size=100,
verbose=0,
)
def make_default_hs_base_model() -> tf.keras.Model:
return tf.keras.Sequential([
tf.keras.layers.Dense(12, activation="relu"),
tf.keras.layers.Dense(12, activation="relu"),
tf.keras.layers.Dense(12, activation="relu"),
tf.keras.layers.Dense(1),
])
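# A minimal usage sketch (illustrative, not executed): building a tester for an
# (epsilon, delta)-DP claim from the defaults above. Only `approximate_dp`,
# `training_config` and `evaluation_batch_size` are read from the config in
# this module; the remaining constructor names below (`ApproximateDp` and the
# `HockeyStickPropertyTesterConfig` field layout) are assumptions.
#
#   tester = HockeyStickPropertyTester(
#       config=property_tester_config.HockeyStickPropertyTesterConfig(
#           approximate_dp=privacy_property.ApproximateDp(epsilon=1.0, delta=1e-2),
#           training_config=make_default_hs_training_config(),
#           evaluation_batch_size=1000,
#       ),
#       base_model=make_default_hs_base_model(),
#   )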
# Helper functions and classes for the HockeyStickDivergenceTester
def _get_accuracy_confidence_bound(
range_bound: float, n_samples: int, confidence: float = 0.95,
) -> float:
r"""Returns a confidence bound on the estimate of P(h(X) = y).
Uses Hoeffding's inequality to estimate this using
\frac{1}{n} \sum_{i=1}^n {h(X_i) = Y_i}.
Args:
range_bound: A bound on the length of the range of the estimated values.
n_samples: Number of samples used in the estimate.
confidence: The level of confidence we want the estimate to have.
Returns:
The one-sided confidence error around the estimate.
"""
delta = 1.0 - confidence
return range_bound * np.sqrt(np.log(1.0 / delta) / 2.0 / n_samples)
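# For intuition (illustrative numbers): with range_bound=1.0, n_samples=1000
# and confidence=0.95 the bound is sqrt(ln(1/0.05) / (2 * 1000)) ~= 0.039, so
# the empirical accuracy is within ~0.039 of its expectation with probability
# at least 0.95.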
class HockeyStickPropertyTester(divergence_tester.DivergencePropertyTester):
r"""Uses a model to estimate divergence between the outputs of a mechanism.
Specifically, given two neighboring datasets D_0, D_1 and epsilon, generates
samples (X_i, Y_i) as follows: Y_i \in {0, 1} where Y_i = 0
w.p. 1/(1 + e^epsilon) and 1 otherwise, and X_i ~ Mechanism(D_{Y_i}). The model
tries to distinguish between "positive" and "negative" examples. A mechanism
is (epsilon, delta)-DP if and only if the accuracy of any classifier on this
dataset is at most (e^epsilon + delta) / (1 + e^epsilon). The hockey-stick
divergence corresponds to delta.
NOTE: This property tester overrides any user-specified value of
config.training_config.model_output_coordinate_bound with 1.0
for the sake of validity and efficiency.
Attributes:
_base_model: A keras model that discriminates between samples generated by a
mechanism run on two different datasets. The base_model passed into this
class must return logits.
_epsilon: The epsilon in the (epsilon, delta) guarantee the mechanism is
supposed to satisfy.
"""
def __init__(
self,
config: property_tester_config.HockeyStickPropertyTesterConfig,
base_model: tf.keras.Model,
):
"""Initializes the instance.
Args:
config: Configuration for initializing property tester.
base_model: A keras model that discriminates between samples generated by
a mechanism run on two different datasets. The base_model passed into
this class must return logits.
"""
property_tester_utils.validate_approximate_dp_property(
config.approximate_dp
)
# This constant defines the maximum output value of a `base_model` and is
# used to get confidence intervals for the lower bound of the divergence.
# We set it here to 1.0 given that the tester optimizes for a binary
# classification task.
logging.info(
"Overwriting `model_output_coordinate_bound`; the validity and efficacy"
" of the test is optimized for `model_output_coordinate_bound=1.0`"
)
config.training_config.model_output_coordinate_bound = 1.0
property_tester_utils.validate_training_config(config.training_config)
self._model_coordinate_bound = (
config.training_config.model_output_coordinate_bound
)
self._base_model = base_model
self._epsilon = config.approximate_dp.epsilon
self._delta = config.approximate_dp.delta
self._approximate_dp = config.approximate_dp
self._training_options = config.training_config
self._evaluation_batch_size = config.evaluation_batch_size
@property
def _test_threshold(self) -> float:
return self._delta
@property
def privacy_property(self) -> privacy_property.PrivacyProperty:
"""The privacy guarantee that the tester is being used to test for."""
return privacy_property.PrivacyProperty(approximate_dp=self._approximate_dp)
def _generate_inputs_to_model(
self,
samples1: np.ndarray,
samples2: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""Generates inputs to keras classifiers.
Generates a weighted sample for the input to a classifier.
It takes the first 1/(e^epsilon + 1) fraction of samples from samples1 and
labels them as 0, and the last e^epsilon/(e^epsilon + 1) fraction of samples
from samples2 and labels them as 1.
Args:
samples1: First set of samples.
samples2: Second set of samples.
Returns:
Features and labels as described above, where features correspond to the
output of the mechanism.
Raises:
ValueError: If the ranks of samples1 and samples2 are not equal.
"""
sample_cutoff_fraction = 1.0 / (np.exp(self._epsilon) + 1.0)
if len(samples1.shape) != len(samples2.shape):
raise ValueError(f"""Mechanism outputs on dataset 1 and dataset 2 should
have the same rank got {samples1.shape} and
{samples2.shape}""")
if len(samples1.shape) == 1:
samples1 = samples1.reshape(-1, 1)
samples2 = samples2.reshape(-1, 1)
samples1_final_ix = int(sample_cutoff_fraction * len(samples1))
samples2_initial_ix = int(sample_cutoff_fraction * len(samples2))
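# Illustrative split: with epsilon = ln(3) the cutoff fraction is 1/4, so for
# 100 samples from each distribution the first 25 rows of `samples1` are kept
# with label 0 and the last 75 rows of `samples2` are kept with label 1.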
samples1 = samples1[0:samples1_final_ix, ...]
samples2 = samples2[samples2_initial_ix:, ...]
labels_1 = np.zeros((samples1.shape[0], 1))
labels_2 = np.ones((samples2.shape[0], 1))
features = np.concatenate([samples1, samples2], axis=0)
labels = np.concatenate([labels_1, labels_2], axis=0)
return features, labels
@override
def _get_optimized_divergence_estimation_model(
self,
samples_first_distribution: np.ndarray,
samples_second_distribution: np.ndarray,
):
model = tf.keras.models.clone_model(self._base_model)
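# The cloned model is trained as a binary classifier on logits: the loss uses
# `from_logits=True`, and accuracy is measured at the logit threshold 0.0,
# which corresponds to a predicted probability of 0.5.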
model.compile(
optimizer=tf.keras.optimizers.Adam(
self._training_options.optimizer_learning_rate
),
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=tf.keras.metrics.BinaryAccuracy(threshold=0.0),
)
features, labels = self._generate_inputs_to_model(
samples_first_distribution,
samples_second_distribution,
)
model.fit(
features,
labels,
shuffle=True,
epochs=self._training_options.training_epochs,
batch_size=self._training_options.batch_size,
verbose=self._training_options.verbose,
)
return model
@override
def _compute_divergence_on_samples(
self,
model: tf.keras.Model,
samples_first_distribution: np.ndarray,
samples_second_distribution: np.ndarray,
failure_probability: float,
) -> float:
features, labels = self._generate_inputs_to_model(
samples_first_distribution, samples_second_distribution
)
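# `model.evaluate` returns [loss, binary_accuracy]; index 1 keeps the accuracy.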
accuracy = model.evaluate(
features, labels, batch_size=self._evaluation_batch_size
)[1]
test_sample_size = min(
samples_first_distribution.shape[0],
samples_second_distribution.shape[0],
)
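# Subtract a one-sided Hoeffding correction (computed with the smaller of the
# two sample counts) so that, with probability at least 1 - failure_probability,
# the corrected accuracy does not overestimate the classifier's true weighted
# accuracy.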
accuracy -= _get_accuracy_confidence_bound(
self._model_coordinate_bound,
test_sample_size,
1.0 - failure_probability,
)
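# Invert the threshold relation accuracy <= (e^epsilon + delta) / (1 + e^epsilon)
# to read off a lower bound on the hockey-stick divergence:
# delta >= accuracy * (1 + e^epsilon) - e^epsilon.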
hs_divergence = accuracy * (1.0 + np.exp(self._epsilon)) - np.exp(
self._epsilon
)
return hs_divergence