| # Copyright 2024 Google LLC. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Implements an (epsilon,delta)-DP test using hockey-stick divergence. |
| |
| Specifically, tries to estimate the hockey stick divergence between the |
| empirical distributions of the output of a mechanism on neighboring datasets |
| using its dual formulation as the weighted accuracy of a classifier. |
| """ |
| |
| from typing import Tuple |
| |
| from absl import logging |
| import numpy as np |
| import tensorflow as tf |
| from typing_extensions import override |
| |
| from dp_auditorium.configs import privacy_property |
| from dp_auditorium.configs import property_tester_config |
| from dp_auditorium.testers import divergence_tester |
| from dp_auditorium.testers import property_tester_utils |
| |
| |
def make_default_hs_training_config() -> property_tester_config.TrainingConfig:
    """Builds the default training configuration for the hockey-stick tester.

    Returns:
        A `TrainingConfig` with 2 training epochs, an optimizer learning rate
        of 1e-2, a batch size of 100 and verbosity 0.
    """
    default_options = dict(
        training_epochs=2,
        optimizer_learning_rate=1e-2,
        batch_size=100,
        verbose=0,
    )
    return property_tester_config.TrainingConfig(**default_options)
| |
| |
def make_default_hs_base_model() -> tf.keras.Model:
    """Builds the default (uncompiled) classifier used by the tester.

    Returns:
        A `tf.keras.Sequential` made of three 12-unit ReLU dense layers
        followed by a single-unit dense layer with no activation, so the
        model emits one raw logit per example.
    """
    dense = tf.keras.layers.Dense
    # Layers are instantiated in the same order they are stacked.
    hidden_layers = [dense(12, activation="relu") for _ in range(3)]
    logit_layer = dense(1)
    return tf.keras.Sequential([*hidden_layers, logit_layer])
| |
| |
| # Helper functions and classes for the HockeyStickPropertyTester. |
def _get_accuracy_confidence_bound(
    range_bound: float,
    n_samples: int,
    confidence: float = 0.95,
) -> float:
    r"""Computes a Hoeffding confidence margin for an empirical accuracy.

    The accuracy P(h(X) = y) is estimated by the empirical mean
    \frac{1}{n} \sum_{i=1}^n 1{h(X_i) = Y_i}; Hoeffding's inequality bounds
    how far that mean can be from its expectation.

    Args:
      range_bound: Bound on the length of the range of the averaged values.
      n_samples: Number of samples the empirical mean was computed from.
      confidence: Desired confidence level of the estimate.

    Returns:
      The one-sided error margin
      range_bound * sqrt(log(1 / (1 - confidence)) / (2 * n_samples)).
    """
    failure_probability = 1.0 - confidence
    log_term = np.log(1.0 / failure_probability)
    # Same evaluation order as the closed form: divide by 2, then by n.
    unit_range_margin = np.sqrt(log_term / 2.0 / n_samples)
    return range_bound * unit_range_margin
| |
| |
class HockeyStickPropertyTester(divergence_tester.DivergencePropertyTester):
    r"""Uses a model to estimate divergence between the outputs of a mechanism.

    Specifically, given two neighboring datasets D_0, D_1 and epsilon, generates
    samples (X_i, Y_i) as follows: Y_i \in {0, 1} where Y_i = 0
    w.p. 1/(1 + e^epsilon) and 1 otherwise, and X_i ~ Mechanism(D_{Y_i}). The
    model tries to distinguish between "positive" and "negative" examples. A
    mechanism is (epsilon, delta)-DP if and only if the accuracy of a
    classifier on this dataset is less than
    (e^epsilon + delta) / (1 + e^epsilon). The hockey stick divergence
    corresponds to delta.

    NOTE: This property tester overrides any user-specified value of
    config.training_config.model_output_coordinate_bound with 1.0
    for the sake of validity and efficiency. The override is written into the
    config object passed to the constructor.

    Attributes:
      _base_model: A keras model that discriminates between samples generated
        by a mechanism ran on two different datasets. The base_model passed
        into this class must return logits.
      _epsilon: The epsilon in the (epsilon, delta) guarantee the mechanism is
        supposed to satisfy.
      _delta: The delta in the (epsilon, delta) guarantee; also used as the
        test threshold.
    """

    def __init__(
        self,
        config: property_tester_config.HockeyStickPropertyTesterConfig,
        base_model: tf.keras.Model,
    ):
        """Initializes the instance.

        Args:
          config: Configuration for initializing property tester. Its
            `training_config.model_output_coordinate_bound` is overwritten
            with 1.0 in place.
          base_model: A keras model that discriminates between samples
            generated by a mechanism ran on two different datasets. The
            base_model passed into this class must return logits.
        """
        property_tester_utils.validate_approximate_dp_property(
            config.approximate_dp
        )
        # This constant defines the maximum output value of a `base_model` and
        # is used to get confidence intervals for the lower bound of the
        # divergence. We set it here to 1.0 given that the tester optimizes for
        # a binary classification task.
        logging.info(
            "Overwriting `model_output_coordinate_bound`; the validity and efficacy"
            " of the test is optimized for `model_output_coordinate_bound=1.0`"
        )
        config.training_config.model_output_coordinate_bound = 1.0
        property_tester_utils.validate_training_config(config.training_config)
        self._model_coordinate_bound = (
            config.training_config.model_output_coordinate_bound
        )
        self._base_model = base_model
        self._epsilon = config.approximate_dp.epsilon
        self._delta = config.approximate_dp.delta
        self._approximate_dp = config.approximate_dp
        self._training_options = config.training_config
        self._evaluation_batch_size = config.evaluation_batch_size

    @property
    def _test_threshold(self) -> float:
        """The divergence threshold of the test, i.e. the configured delta."""
        return self._delta

    @property
    def privacy_property(self) -> privacy_property.PrivacyProperty:
        """The privacy guarantee that the tester is being used to test for."""
        return privacy_property.PrivacyProperty(
            approximate_dp=self._approximate_dp
        )

    def _generate_inputs_to_model(
        self,
        samples1: np.ndarray,
        samples2: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Generates inputs to keras classifiers.

        Generates a weighted sample for the input to a classifier. It takes
        the first 1/(e^epsilon + 1) fraction of `samples1` and labels those
        rows 0, and it takes the last e^epsilon/(e^epsilon + 1) fraction of
        `samples2` and labels those rows 1. One-dimensional inputs are
        reshaped to a single feature column.

        Args:
          samples1: First set of samples.
          samples2: Second set of samples.

        Returns:
          Features and labels as described above, where features correspond
          to the output of the mechanism. Labels have shape (n, 1).

        Raises:
          ValueError: If the ranks of `samples1` and `samples2` are not equal.
        """
        sample_cutoff_fraction = 1.0 / (np.exp(self._epsilon) + 1.0)

        if len(samples1.shape) != len(samples2.shape):
            # Built from adjacent literals so that no source newlines or
            # indentation end up inside the error message.
            raise ValueError(
                "Mechanism outputs on dataset 1 and dataset 2 should have the"
                f" same rank, got {samples1.shape} and {samples2.shape}."
            )
        if len(samples1.shape) == 1:
            samples1 = samples1.reshape(-1, 1)
            samples2 = samples2.reshape(-1, 1)

        # Head of samples1 (label 0) and tail of samples2 (label 1); both cut
        # points use the same fraction of the respective sample count.
        samples1_final_ix = int(sample_cutoff_fraction * len(samples1))
        samples2_initial_ix = int(sample_cutoff_fraction * len(samples2))
        samples1 = samples1[0:samples1_final_ix, ...]
        samples2 = samples2[samples2_initial_ix:, ...]

        labels_1 = np.zeros((samples1.shape[0], 1))
        labels_2 = np.ones((samples2.shape[0], 1))
        features = np.concatenate([samples1, samples2], axis=0)
        labels = np.concatenate([labels_1, labels_2], axis=0)
        return features, labels

    @override
    def _get_optimized_divergence_estimation_model(
        self,
        samples_first_distribution: np.ndarray,
        samples_second_distribution: np.ndarray,
    ):
        """Trains a fresh clone of the base model as a binary classifier.

        Args:
          samples_first_distribution: Mechanism outputs labelled 0.
          samples_second_distribution: Mechanism outputs labelled 1.

        Returns:
          The compiled and fitted clone of `_base_model`. Its only compiled
          metric is binary accuracy on logits (threshold 0.0).
        """
        model = tf.keras.models.clone_model(self._base_model)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                self._training_options.optimizer_learning_rate
            ),
            loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
            # `metrics` is documented as a list; a bare Metric instance is
            # not accepted by every Keras version.
            metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.0)],
        )
        features, labels = self._generate_inputs_to_model(
            samples_first_distribution,
            samples_second_distribution,
        )
        model.fit(
            features,
            labels,
            shuffle=True,
            epochs=self._training_options.training_epochs,
            batch_size=self._training_options.batch_size,
            verbose=self._training_options.verbose,
        )

        return model

    @override
    def _compute_divergence_on_samples(
        self,
        model: tf.keras.Model,
        samples_first_distribution: np.ndarray,
        samples_second_distribution: np.ndarray,
        failure_probability: float,
    ) -> float:
        """Returns a lower-confidence estimate of the hockey stick divergence.

        Args:
          model: Trained classifier. Assumed to be compiled as in
            `_get_optimized_divergence_estimation_model`, so that `evaluate`
            returns `[loss, binary_accuracy]`.
          samples_first_distribution: Held-out mechanism outputs labelled 0.
          samples_second_distribution: Held-out mechanism outputs labelled 1.
          failure_probability: Allowed probability that the confidence bound
            on the accuracy fails; the confidence level used is
            1 - failure_probability.

        Returns:
          accuracy_lower_bound * (1 + e^epsilon) - e^epsilon, where
          accuracy_lower_bound is the measured accuracy minus the Hoeffding
          margin.
        """
        features, labels = self._generate_inputs_to_model(
            samples_first_distribution, samples_second_distribution
        )

        # Index 0 is the loss, index 1 the single compiled metric. Evaluation
        # follows the same verbosity setting as training.
        accuracy = model.evaluate(
            features,
            labels,
            batch_size=self._evaluation_batch_size,
            verbose=self._training_options.verbose,
        )[1]
        # The margin is computed from the smaller input sample count. When
        # both inputs have the same length this equals features.shape[0].
        test_sample_size = min(
            samples_first_distribution.shape[0],
            samples_second_distribution.shape[0],
        )
        accuracy -= _get_accuracy_confidence_bound(
            self._model_coordinate_bound,
            test_sample_size,
            1.0 - failure_probability,
        )
        # Invert accuracy = (e^epsilon + delta) / (1 + e^epsilon) for delta.
        hs_divergence = accuracy * (1.0 + np.exp(self._epsilon)) - np.exp(
            self._epsilon
        )
        return hs_divergence