blob: f5979305d0bec4daad226cc304e01145cd203393 [file] [log] [blame]
# Copyright 2022 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for PLDPmf."""
import unittest
from absl.testing import parameterized
import numpy as np
from dp_accounting.pld import common
from dp_accounting.pld import pld_pmf
from dp_accounting.pld import test_util
class PLDPmfTest(parameterized.TestCase):
def _create_pmf(self,
discretization: float,
dense: bool,
infinity_mass: float = 0.0,
lower_loss: int = 0,
probs: np.ndarray = np.array([1.0]),
pessimistic_estimate: bool = True) -> pld_pmf.PLDPmf:
"""Helper function for creating PLD for testing."""
if dense:
return pld_pmf.DensePLDPmf(discretization, lower_loss, probs,
infinity_mass, pessimistic_estimate)
loss_probs = common.list_to_dictionary(probs, lower_loss)
return pld_pmf.SparsePLDPmf(loss_probs, discretization, infinity_mass,
pessimistic_estimate)
def _check_dense_probs(self, dense_pmf: pld_pmf.DensePLDPmf,
expected_lower_loss: int, expected_probs: np.ndarray):
"""Checks that resulting dense pmf satisfies expectations."""
self.assertEqual(expected_lower_loss, dense_pmf._lower_loss)
self.assertSequenceAlmostEqual(expected_probs, dense_pmf._probs)
def _check_sparse_probs(self, sparse_pmf: pld_pmf.SparsePLDPmf,
expected_lower_loss: int, expected_probs: np.ndarray):
"""Checks that resulting sparse pmf satisfies expectations."""
expected_loss_probs = common.list_to_dictionary(expected_probs,
expected_lower_loss)
test_util.assert_dictionary_almost_equal(self, expected_loss_probs,
sparse_pmf._loss_probs)
@parameterized.parameters(False, True)
def test_delta_for_epsilon(self, dense: bool):
discretization = 0.1
infinity_mass = 0.1
lower_loss = -1
probs = np.array([0.2, 0.3, 0, 0.4])
pmf = self._create_pmf(discretization, dense, infinity_mass, lower_loss,
probs)
self.assertAlmostEqual(0.1, pmf.get_delta_for_epsilon(3)) # infinity_mass
self.assertAlmostEqual(0.1 + 0.4 * (1 - np.exp(-0.1)),
pmf.get_delta_for_epsilon(0.1))
self.assertAlmostEqual(1, pmf.get_delta_for_epsilon(-20))
self.assertEqual(infinity_mass, pmf.get_delta_for_epsilon(np.inf))
self.assertAlmostEqual(1, pmf.get_delta_for_epsilon(-np.inf))
@parameterized.parameters(False, True)
def test_delta_for_epsilon_vectorized(self, dense: bool):
discretization = 0.1
infinity_mass = 0.1
lower_loss = -1
probs = np.array([0.2, 0.3, 0, 0.4])
pmf = self._create_pmf(discretization, dense, infinity_mass, lower_loss,
probs)
epsilon = [-np.inf, -20, 0.1, np.inf]
expected_delta = [
1, 1, infinity_mass + 0.4 * (1 - np.exp(-0.1)), infinity_mass
]
self.assertSequenceAlmostEqual(expected_delta,
pmf.get_delta_for_epsilon(epsilon))
def test_delta_for_epsilon_not_sorted(self):
pmf = self._create_pmf(discretization=0.1, dense=True, infinity_mass=0)
epsilon = [2.0, 3.0, 1.0] # not sorted
with self.assertRaisesRegex(
ValueError,
'Epsilons in get_delta_for_epsilon must be sorted in ascending order'):
pmf.get_delta_for_epsilon(epsilon)
@parameterized.parameters(False, True)
def test_get_delta_for_epsilon_for_composed_pld(self, dense):
discretization = 0.1
infinity_mass1, lower_loss1, probs1 = 0.1, -1, np.array(
[0.2, 0.3, 0, 0.1, 0.3])
infinity_mass2, lower_loss2, probs2 = 0.2, -2, np.array([0.1, 0, 0.4, 0.3])
pmf1 = self._create_pmf(discretization, dense, infinity_mass1, lower_loss1,
probs1)
pmf2 = self._create_pmf(discretization, dense, infinity_mass2, lower_loss2,
probs2)
pmf_composed = pmf1.compose(pmf2)
for epsilon in np.linspace(-10, 10, num=100):
delta1 = pmf1.get_delta_for_epsilon_for_composed_pld(pmf2, epsilon)
delta2 = pmf_composed.get_delta_for_epsilon(epsilon)
self.assertAlmostEqual(delta1, delta2, msg=f'{epsilon}')
@parameterized.parameters(False, True)
def test_epsilon_for_delta(self, dense):
discretization = 0.1
lower_loss = -1 # loss_value
probs = np.array([0.2, 0.3, 0, 0.4]) # probs for losses -0.1, 0, 0.1, 0.2
infinity_mass = 0.1
pmf = self._create_pmf(discretization, dense, infinity_mass, lower_loss,
probs)
self.assertEqual(np.inf, pmf.get_epsilon_for_delta(0.05)) # <infinity_mass
self.assertAlmostEqual(0.2, pmf.get_epsilon_for_delta(0.1)) # infinity_mass
epsilon = 0.15
delta = pmf.get_delta_for_epsilon(epsilon)
self.assertAlmostEqual(epsilon, pmf.get_epsilon_for_delta(delta))
self.assertAlmostEqual(np.inf, pmf.get_epsilon_for_delta(0))
@parameterized.product(
(
{
'tail_mass_truncation': 0,
'expected_lower_loss': -3,
'expected_probs': np.array([0.06, 0.31, 0.35]),
'expected_truncated_to_inf_mass': 0
},
{
'tail_mass_truncation':
0.1, # no truncation 0.1/2 < min(0.06, 0.35)
'expected_lower_loss': -3,
'expected_probs': np.array([0.06, 0.31, 0.35]),
'expected_truncated_to_inf_mass': 0
},
{
'tail_mass_truncation': 0.15, # truncation from left.
'expected_lower_loss': -2,
'expected_probs': np.array([0.37, 0.35]),
'expected_truncated_to_inf_mass': 0
},
{
'tail_mass_truncation': 0.72, # truncation from both sides.
'expected_lower_loss': -2,
'expected_probs': np.array([0.37]),
'expected_truncated_to_inf_mass':
0.35 # last element goes to inf.
},
),
dense=(False, True),
)
def test_compose_pessimistic(self, tail_mass_truncation, expected_lower_loss,
expected_probs, expected_truncated_to_inf_mass,
dense):
discretization = 0.1
pmf1 = self._create_pmf(
discretization,
lower_loss=-1,
probs=np.array([0.2, 0.7]),
infinity_mass=0.1,
dense=dense)
pmf2 = self._create_pmf(
discretization,
lower_loss=-2,
probs=np.array([0.3, 0.5]),
infinity_mass=0.2,
dense=dense)
pmf = pmf1.compose(pmf2, tail_mass_truncation)
self.assertEqual(discretization, pmf._discretization)
if dense:
self._check_dense_probs(pmf, expected_lower_loss, expected_probs)
else:
self._check_sparse_probs(pmf, expected_lower_loss, expected_probs)
expected_inf_mass = 1 - (1 - 0.1) * (1 -
0.2) + expected_truncated_to_inf_mass
self.assertAlmostEqual(expected_inf_mass, pmf._infinity_mass)
@parameterized.product(
(
{
'tail_mass_truncation': 0,
'expected_lower_loss': -3,
'expected_probs': np.array([0.06, 0.31, 0.35]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': True
},
{
'tail_mass_truncation': 0,
'expected_lower_loss': -3,
'expected_probs': np.array([0.06, 0.31, 0.35]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': False
},
{
'tail_mass_truncation':
0.1, # no truncation 0.1/2 < min(0.06, 0.35)
'expected_lower_loss': -3,
'expected_probs': np.array([0.06, 0.31, 0.35]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': True
},
{
'tail_mass_truncation':
0.1, # no truncation 0.1/2 < min(0.06, 0.35)
'expected_lower_loss': -3,
'expected_probs': np.array([0.06, 0.31, 0.35]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': False
},
{
'tail_mass_truncation': 0.15, # truncation the left tail.
'expected_lower_loss': -2,
'expected_probs': np.array([0.37, 0.35]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': True
},
{
'tail_mass_truncation': 0.15, # truncation the left tail.
'expected_lower_loss': -2,
'expected_probs': np.array([0.31, 0.35]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': False
},
{
'tail_mass_truncation': 0.72, # truncation both tails.
'expected_lower_loss': -2,
'expected_probs': np.array([0.37]),
'expected_truncated_to_inf_mass': 0.35,
'pessimistic_estimate': True
},
{
'tail_mass_truncation': 0.72, # truncation both tails.
'expected_lower_loss': -2,
'expected_probs': np.array([0.66]),
'expected_truncated_to_inf_mass': 0,
'pessimistic_estimate': False
},
),
dense=(False, True))
def test_compose(self, tail_mass_truncation: float, expected_lower_loss: int,
expected_probs: np.ndarray,
expected_truncated_to_inf_mass: float,
pessimistic_estimate: bool, dense: bool):
discretization = 0.1
pmf1 = self._create_pmf(
discretization,
lower_loss=-1,
probs=np.array([0.2, 0.7]),
infinity_mass=0.1,
dense=dense,
pessimistic_estimate=pessimistic_estimate)
pmf2 = self._create_pmf(
discretization,
lower_loss=-2,
probs=np.array([0.3, 0.5]),
infinity_mass=0.2,
dense=dense,
pessimistic_estimate=pessimistic_estimate)
pmf = pmf1.compose(pmf2, tail_mass_truncation)
self.assertEqual(discretization, pmf._discretization)
if dense:
self._check_dense_probs(pmf, expected_lower_loss, expected_probs)
else:
self._check_sparse_probs(pmf, expected_lower_loss, expected_probs)
expected_inf_mass = 1 - (1 - 0.1) * (1 -
0.2) + expected_truncated_to_inf_mass
self.assertAlmostEqual(expected_inf_mass, pmf._infinity_mass)
@parameterized.parameters(False, True)
def test_compose_different_discretization(self, dense: bool):
pmf1 = self._create_pmf(discretization=0.1, dense=dense)
pmf2 = self._create_pmf(discretization=0.2, dense=dense)
with self.assertRaisesRegex(
ValueError, 'Discretization intervals are different: 0.1 != 0.2'):
pmf1.compose(pmf2)
with self.assertRaisesRegex(
ValueError, 'Discretization intervals are different: 0.1 != 0.2'):
pmf1.get_delta_for_epsilon_for_composed_pld(pmf2, 1)
@parameterized.parameters(False, True)
def test_compose_different_estimation(self, dense: bool):
pmf1 = self._create_pmf(
discretization=0.1, pessimistic_estimate=True, dense=dense)
pmf2 = self._create_pmf(
discretization=0.1, pessimistic_estimate=False, dense=dense)
with self.assertRaisesRegex(ValueError, 'Estimation types are different'):
pmf1.compose(pmf2)
with self.assertRaisesRegex(ValueError, 'Estimation types are different'):
pmf1.get_delta_for_epsilon_for_composed_pld(pmf2, 1)
@parameterized.product(
(
{
'num_times': 2,
'tail_mass_truncation': 0,
'expected_lower_loss': -2,
'expected_probs': np.array([0.04, 0.28, 0.49]),
'expected_truncated_to_inf_mass': 0
},
{
'num_times':
5,
'tail_mass_truncation':
0,
'expected_lower_loss':
-5,
'expected_probs':
np.array([0.00032, 0.0056, 0.0392, 0.1372, 0.2401, 0.16807]),
'expected_truncated_to_inf_mass':
0
},
{
'num_times': 2,
'tail_mass_truncation': 0.1,
'expected_lower_loss': -1,
'expected_probs': np.array([0.32, 0.49]),
'expected_truncated_to_inf_mass': 0
},
{
'num_times':
5,
'tail_mass_truncation':
0.01, # truncation left tail.
'expected_lower_loss':
-4,
'expected_probs':
np.array([0.00032 + 0.0056, 0.0392, 0.1372, 0.2401, 0.16807]),
'expected_truncated_to_inf_mass':
0
},
),
dense=(False, True),
)
def test_self_compose(self, num_times, tail_mass_truncation,
expected_lower_loss, expected_probs,
expected_truncated_to_inf_mass, dense):
discretization = 0.1
pmf_input = self._create_pmf(
discretization,
lower_loss=-1,
probs=np.array([0.2, 0.7]),
infinity_mass=0.1,
dense=dense)
pmf_result = pmf_input.self_compose(num_times, tail_mass_truncation)
self.assertEqual(discretization, pmf_result._discretization)
expected_inf_mass = 1 - (1 -
0.1)**num_times + expected_truncated_to_inf_mass
self.assertAlmostEqual(expected_inf_mass, pmf_result._infinity_mass)
if dense:
self._check_dense_probs(pmf_result, expected_lower_loss, expected_probs)
else:
self._check_sparse_probs(pmf_result, expected_lower_loss, expected_probs)
def test_self_compose_many_times_dense(self):
discretization = 0.1
num_times = 50
tail_mass_truncation = 1e-2
expected_lower_loss = -24
expected_probs = np.array([
0.00550859, 0.00668396, 0.01281092, 0.02267679, 0.03703876, 0.05575728,
0.07724706, 0.09831444, 0.11470018, 0.12234686, 0.11894834, 0.10501745,
0.08382972, 0.06018545, 0.03861902, 0.02197841, 0.01098969, 0.00477262
])
expected_truncated_to_inf_mass = 0.00235534610580374
pmf_input = self._create_pmf(
discretization,
lower_loss=-1,
probs=np.array([0.3, 0.7]),
infinity_mass=0,
dense=True)
pmf_result = pmf_input.self_compose(num_times, tail_mass_truncation)
self.assertEqual(discretization, pmf_result._discretization)
self.assertAlmostEqual(expected_truncated_to_inf_mass,
pmf_result._infinity_mass)
self._check_dense_probs(pmf_result, expected_lower_loss, expected_probs)
@parameterized.parameters((1, True), (100, True), (1000, True), (1001, False))
def test_pmf_creation(self, num_points: int, is_sparse: bool):
probs = np.ones(num_points) / num_points
lower_loss = -1
loss_probs = common.list_to_dictionary(probs, lower_loss)
discretization = 0.01
infinity_mass = 0
pessimistic_estimate = True
pmf = pld_pmf.create_pmf(loss_probs, discretization, infinity_mass,
pessimistic_estimate)
if is_sparse:
self.assertIsInstance(pmf, pld_pmf.SparsePLDPmf)
else:
self.assertIsInstance(pmf, pld_pmf.DensePLDPmf)
self.assertEqual(num_points, pmf.size)
@parameterized.parameters((1, 100, True), (10, 100, True), (1, 1001, False),
(10, 101, False), (1001, 1, False),
(1001, 1001, False))
def test_compose_pmfs(self, num_points1, num_points2, is_sparse):
lower_loss = -1
discretization = 0.01
infinity_mass = 0
pessimistic_estimate = True
probs1 = np.ones(num_points1) / num_points1
loss_probs1 = common.list_to_dictionary(probs1, lower_loss)
pmf1 = pld_pmf.create_pmf(loss_probs1, discretization, infinity_mass,
pessimistic_estimate)
probs2 = np.ones(num_points2) / num_points2
loss_probs2 = common.list_to_dictionary(probs2, lower_loss)
pmf2 = pld_pmf.create_pmf(loss_probs2, discretization, infinity_mass,
pessimistic_estimate)
pmf = pld_pmf.compose_pmfs(pmf1, pmf2)
if is_sparse:
self.assertIsInstance(pmf, pld_pmf.SparsePLDPmf)
else:
self.assertIsInstance(pmf, pld_pmf.DensePLDPmf)
if __name__ == '__main__':
unittest.main()