# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Renyi divergence calculators.
Functions to estimate Renyi divergence between samples of two distributions.
"""

from typing import Dict, Optional

import numpy as np
import tensorflow as tf
from typing_extensions import override

from dp_auditorium.configs import privacy_property
from dp_auditorium.configs import property_tester_config
from dp_auditorium.testers import divergence_tester
from dp_auditorium.testers import property_tester_utils


def make_default_renyi_base_model() -> tf.keras.Model:
  """Returns a default Keras model to parametrize the divergence estimator."""
  return tf.keras.models.Sequential([
      tf.keras.layers.Dense(100, activation=tf.keras.activations.tanh),
      tf.keras.layers.Dense(100, activation=tf.keras.activations.tanh),
      tf.keras.layers.Dense(1),
  ])


def _compute_error_from_samples(
    num_samples: int,
    failure_probability: float,
    model_output_coordinate_bound: float,
    alpha: float,
) -> float:
  """Returns the error tolerance implied by the number of samples.

  Args:
    num_samples: Number of samples used to estimate the divergence.
    failure_probability: Probability that the Chernoff bound fails.
    model_output_coordinate_bound: Bound on the outputs of the function class
      used to estimate the Renyi divergence.
    alpha: Order of the Renyi divergence.

  Returns:
    Error tolerance for the divergence estimated from `num_samples` samples.
  """
error_1 = np.sqrt(
3
* np.log(2 / failure_probability)
* np.exp(2 * (alpha - 1) * model_output_coordinate_bound)
/ num_samples
)
error_2 = np.sqrt(
2
* np.log(2 / failure_probability)
* np.exp(2 * alpha * model_output_coordinate_bound)
/ num_samples
)
gamma = max(error_1, error_2)
error_from_gamma = np.log((1 + gamma) / (1 - gamma))
return error_from_gamma


class RenyiModel(tf.keras.Model):
  """Model to estimate the Renyi divergence via a variational formulation."""

  def __init__(self, nn_model: tf.keras.Model, alpha: float):
    super().__init__()
    self.nn_model = nn_model
    self.alpha = alpha

  def train_step(
      self, data: tuple[np.ndarray, np.ndarray]
  ) -> Dict[str, tf.Tensor]:
    """Runs one optimizer step that maximizes the estimated divergence."""
    with tf.GradientTape() as tape:
      divergence = self(data, training=True)
      # Gradient ascent on the divergence, phrased as descent on its negative.
      loss = -divergence
    trainable_vars = self.nn_model.trainable_variables
    d_loss = tape.gradient(loss, trainable_vars)
    self.optimizer.apply_gradients(zip(d_loss, trainable_vars))
    return {'divergence': divergence}

  def call(
      self,
      data: tuple[np.ndarray, np.ndarray],
      training: Optional[bool] = None,
  ) -> tf.Tensor:
    """Estimates the Renyi divergence from samples and the current nn_model.

    This function unpacks samples x_p and x_q drawn from two distributions and
    uses the function parametrized by nn_model to estimate the Renyi divergence
    following the variational representation in
    https://arxiv.org/abs/2007.03814. Letting t1 and t2 be the averages of
    exp((alpha - 1) * nn_model(x_p)) and exp(alpha * nn_model(x_q))
    respectively, the divergence is estimated as
    alpha / (alpha - 1) * log(t1) - log(t2).

    Args:
      data: Tuple of two float arrays with samples from the two distributions.
      training: Indicates if the model is being used for training or inference.

    Returns:
      Estimated divergence under the current nn_model parameters.
    """
    x_p, x_q = data
    g_p = self.nn_model(x_p, training=training)
    g_q = self.nn_model(x_q, training=training)
    t1 = tf.math.reduce_mean(tf.exp((self.alpha - 1) * g_p))
    t2 = tf.math.reduce_mean(tf.exp(self.alpha * g_q))
    divergence = (self.alpha / (self.alpha - 1)) * tf.math.log(
        t1
    ) - tf.math.log(t2)
    return divergence
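

# For reference, a plain-NumPy sketch of the estimate computed in
# RenyiModel.call above. It is illustrative only and not used by the tester;
# `witness_fn` (a hypothetical stand-in) plays the role of the trained
# nn_model.
def _renyi_divergence_numpy_sketch(x_p, x_q, alpha, witness_fn):
  t1 = np.mean(np.exp((alpha - 1) * witness_fn(x_p)))
  t2 = np.mean(np.exp(alpha * witness_fn(x_q)))
  return alpha / (alpha - 1) * np.log(t1) - np.log(t2)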


class RenyiPropertyTester(divergence_tester.DivergencePropertyTester):
  """Main class for the Renyi property tester.

  RenyiPropertyTester computes a lower bound for the Renyi divergence using
  Algorithm 2 in https://arxiv.org/abs/2307.05608. It first uses training
  samples to find a suitable function parametrized by `renyi_model`, and then
  uses test samples to estimate the lower end point of a confidence interval
  for the divergence.
  """

  def __init__(
      self,
      config: property_tester_config.RenyiPropertyTesterConfig,
      base_model: tf.keras.Model,
  ):
    """Initializes RenyiPropertyTester from a config and a base Keras model."""
    # Get privacy parameters.
if config.privacy_property.renyi_dp is not None:
property_tester_utils.validate_renyi_dp_property(
config.privacy_property.renyi_dp
)
privacy_type = 'renyi_dp'
epsilon = config.privacy_property.renyi_dp.epsilon
alpha = config.privacy_property.renyi_dp.alpha
      if config.alpha != alpha:
        raise ValueError(
            'The alpha parameter for Renyi DP should be specified in'
            ' config.privacy_property. It was specified in config.alpha,'
            ' which is only used for pure DP tests.'
        )
elif config.privacy_property.pure_dp is not None:
property_tester_utils.validate_pure_dp_property(
config.privacy_property.pure_dp
)
privacy_type = 'pure_dp'
epsilon = config.privacy_property.pure_dp.epsilon
alpha = config.alpha
else:
raise ValueError(
'The specified privacy_property is not supported by'
' RenyiPropertyTester.'
)
property_tester_utils.validate_training_config(config.training_config)
    if privacy_type == 'renyi_dp':
      self._initial_test_threshold = epsilon
    else:
      # Pure DP claims are tested against a Renyi divergence threshold of
      # min(epsilon, 2 * alpha * epsilon**2) for the configured order alpha.
      self._initial_test_threshold = min(epsilon, 2 * alpha * epsilon**2)
self._tested_property = config.privacy_property
self._alpha = alpha
self._training_config = config.training_config
self._model_output_coordinate_bound = (
config.training_config.model_output_coordinate_bound
)

    def scaled_tanh(x):
      return self._model_output_coordinate_bound * tf.keras.activations.tanh(x)

    # Bound the model outputs so that the error tolerance computed in
    # _compute_error_from_samples applies.
    base_model.add(tf.keras.layers.Activation(scaled_tanh))
self._renyi_model = RenyiModel(base_model, self._alpha)
self._renyi_model.compile(
optimizer=tf.keras.optimizers.Adam(
config.training_config.optimizer_learning_rate
),
)
    self._divergence_train = []

  @property
  def _test_threshold(self) -> float:
    return self._initial_test_threshold

  @property
  def privacy_property(self) -> privacy_property.PrivacyProperty:
    return self._tested_property

  def _reset_model_weights(self):
    """Re-initializes the kernels and biases of the wrapped Keras model."""
    for layer in self._renyi_model.nn_model.layers:
      if hasattr(layer, 'kernel'):
        if layer.kernel is not None and hasattr(layer, 'kernel_initializer'):
          layer.kernel.assign(layer.kernel_initializer(tf.shape(layer.kernel)))
      if hasattr(layer, 'bias'):
        if layer.bias is not None and hasattr(layer, 'bias_initializer'):
          layer.bias.assign(layer.bias_initializer(tf.shape(layer.bias)))

  @override
  def _get_optimized_divergence_estimation_model(
      self,
      samples_first_distribution: np.ndarray,
      samples_second_distribution: np.ndarray,
  ) -> tf.keras.Model:
    # Re-initialize the weights and fit the divergence estimator on the
    # training samples.
    self._reset_model_weights()
    self._renyi_model.fit(
        samples_first_distribution,
        samples_second_distribution,
        batch_size=self._training_config.batch_size,
        epochs=self._training_config.training_epochs,
        verbose=self._training_config.verbose,
    )
    return self._renyi_model

  @override
  def _compute_divergence_on_samples(
      self,
      model: tf.keras.Model,
      samples1_test: np.ndarray,
      samples2_test: np.ndarray,
      failure_probability: float,
  ) -> float:
    divergence_test = float(model((samples1_test, samples2_test)))
    # Calculate the lower end of the confidence interval.
    num_samples = min(samples1_test.shape[0], samples2_test.shape[0])
error = _compute_error_from_samples(
num_samples=num_samples,
failure_probability=failure_probability,
model_output_coordinate_bound=self._model_output_coordinate_bound,
alpha=self._alpha,
)
divergence_test_lower_bound = divergence_test - error
return divergence_test_lower_bound
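

if __name__ == '__main__':
  # Minimal usage sketch of the pieces defined in this module, following the
  # train/estimate split described in the RenyiPropertyTester docstring.
  # The synthetic Gaussian samples and all numeric values are illustrative
  # assumptions, not recommended settings.
  rng = np.random.default_rng(0)
  train_p, test_p = rng.normal(0.0, 1.0, (2, 5000, 1)).astype(np.float32)
  train_q, test_q = rng.normal(1.0, 1.0, (2, 5000, 1)).astype(np.float32)

  output_bound = 1.0
  base = make_default_renyi_base_model()
  # Bound the model outputs, mirroring the scaled tanh added in
  # RenyiPropertyTester.__init__.
  base.add(
      tf.keras.layers.Activation(
          lambda x: output_bound * tf.keras.activations.tanh(x)
      )
  )
  model = RenyiModel(base, alpha=2.0)
  model.compile(optimizer=tf.keras.optimizers.Adam(1e-3))

  # Fit the estimator on training samples, then compute the divergence and its
  # error tolerance on held-out samples to obtain a lower confidence bound.
  model.fit(train_p, train_q, batch_size=100, epochs=5, verbose=0)
  divergence = float(model((test_p, test_q)))
  error = _compute_error_from_samples(
      num_samples=5000,
      failure_probability=0.05,
      model_output_coordinate_bound=output_bound,
      alpha=2.0,
  )
  print('Divergence lower bound:', divergence - error)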