# Copyright 2021 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parameters for the clustering algorithm and data input to cluster."""
import dataclasses
import enum
import typing
from absl import logging
import numpy as np


@enum.unique
class PrivacyModel(enum.Enum):
  """Designates the model of privacy to use or simulate.

  CENTRAL: Uses the central model of privacy, where a central aggregator has
    access to the data and only the output needs to be differentially private.
  """
  CENTRAL = "central"


@dataclasses.dataclass
class DifferentialPrivacyParam():
  """User parameters for differential privacy."""
  epsilon: float = 1.0
  delta: float = 1e-6
  privacy_model: PrivacyModel = PrivacyModel.CENTRAL
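
# Illustrative usage (a sketch in comments, not part of the original module):
# constructing the default central-model parameters, or overriding epsilon.
#
#   dp_param = DifferentialPrivacyParam()  # epsilon=1.0, delta=1e-6
#   tighter = DifferentialPrivacyParam(epsilon=0.5)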


@dataclasses.dataclass
class PrivacyBudgetSplit():
  """How to split epsilon between the computations.

  Attributes:
    frac_sum: The fraction of epsilon to use when computing the sum of points
      for noisy averaging.
    frac_group_count: The fraction of epsilon to use when counting the number
      of points with a particular hash value.
  """
  frac_sum: float = 0.8
  frac_group_count: float = 0.2

  def __post_init__(self):
    total = self.frac_sum + self.frac_group_count
    if total > 1.0:
      raise ValueError(
          f"The provided privacy budget split ({total}) was greater than 1.0.")


@dataclasses.dataclass
class PrivacyCalculatorMultiplier():
  """Multipliers to be used by mechanism calibration."""
  gaussian_std_dev_multiplier: float = 1.0
  laplace_param_multiplier: float = 20.0

  def get_gaussian_std_dev(self, alpha: float, sensitivity: float) -> float:
    """Returns the Gaussian standard deviation based on alpha.

    Args:
      alpha: Parameter varied in mechanism calibration.
      sensitivity: Sensitivity of the dataset for the sum operations.
    """
    return self.gaussian_std_dev_multiplier * alpha * sensitivity

  def get_alpha(self, gaussian_std_dev: float, sensitivity: float) -> float:
    """Returns alpha based on the Gaussian standard deviation and sensitivity.

    This must be the inverse of get_gaussian_std_dev with respect to alpha.

    Args:
      gaussian_std_dev: Standard deviation to calculate alpha for.
      sensitivity: Sensitivity of the dataset for the sum operations.
    """
    return gaussian_std_dev / (sensitivity * self.gaussian_std_dev_multiplier)

  def get_laplace_param(self, alpha: float) -> float:
    """Returns the Laplace parameter based on alpha.

    Args:
      alpha: Parameter varied in mechanism calibration.
    """
    inverse_laplace_param = self.laplace_param_multiplier * alpha
    # The Laplace parameter increases as noise decreases, so we scale it
    # inversely with alpha.
    return 1.0 / inverse_laplace_param
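
# Illustrative check (a sketch in comments, not part of the original module):
# get_alpha inverts get_gaussian_std_dev with respect to alpha, as required
# by mechanism calibration.
#
#   m = PrivacyCalculatorMultiplier()
#   std_dev = m.get_gaussian_std_dev(alpha=0.5, sensitivity=2.0)  # 1.0
#   assert m.get_alpha(std_dev, sensitivity=2.0) == 0.5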


@dataclasses.dataclass
class TreeParam():
  """Thresholds for constructing a tree.

  Attributes:
    min_num_points_in_branching_node: The minimum number of points that must
      be included for a node to branch.
    min_num_points_in_node: The minimum number of points that must be included
      to appear in the tree.
    max_depth: Maximum depth for the tree.
  """
  min_num_points_in_branching_node: int
  min_num_points_in_node: int
  max_depth: int

  def __post_init__(self):
    if self.min_num_points_in_node < 1:
      raise ValueError("Threshold for a node to be included in the tree must "
                       "be at least 1.")
    if self.min_num_points_in_branching_node < self.min_num_points_in_node:
      raise ValueError(
          "Threshold for branching should be at least the threshold for a "
          "node to be included in the tree.")


# Numpy array where the rows are points.
Points = np.ndarray


@dataclasses.dataclass(frozen=True)
class Data():
  """Dataset and metadata needed by the clustering algorithm.

  Attributes:
    datapoints: Datapoints where each row is a datapoint.
    radius: Bound on the distance of each point from the origin in datapoints.
    labels: Labels, if any, for the data (None by default).
    num_points: Number of datapoints, inferred from the number of rows in
      datapoints.
    dim: Dimension of each datapoint, inferred from the number of columns in
      datapoints.
  """
  datapoints: Points
  radius: float
  labels: typing.Optional[np.ndarray] = None
  num_points: int = dataclasses.field(init=False)
  dim: int = dataclasses.field(init=False)

  def __post_init__(self):
    (num_points, dim) = np.shape(self.datapoints)
    object.__setattr__(self, "num_points", num_points)
    object.__setattr__(self, "dim", dim)
    if self.labels is not None and self.labels.shape[0] != self.num_points:
      raise ValueError(
          f"Number of labels ({self.labels.shape[0]}) is different from "
          f"number of datapoints ({self.num_points})")

  def clip_by_radius(self,
                     points: typing.Optional[Points] = None) -> np.ndarray:
    """Returns clipped points based on self.radius.

    If a point already has norm at most the radius, we leave it unchanged.
    Otherwise, we rescale it so that its norm is exactly the radius.

    Args:
      points: Points to clip. If None, self.datapoints are used. Does not
        modify self.datapoints.
    """
    if points is None:
      points = self.datapoints
    scale = self.radius / np.maximum(
        np.linalg.norm(points, axis=-1), self.radius).reshape(-1, 1)
    if np.min(scale) < 1.0:
      logging.debug(
          "Found %s points outside of radius provided, rescaling them to "
          "have norm exactly equal to the radius.", np.sum(scale < 1.0))
    return points * scale
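
# Illustrative usage (a sketch in comments, not part of the original module):
# points with norm at most the radius are unchanged; points outside are
# rescaled onto the sphere of norm exactly `radius`.
#
#   data = Data(datapoints=np.array([[3.0, 4.0], [0.1, 0.0]]), radius=1.0)
#   data.clip_by_radius()
#   # -> [[0.6, 0.8], [0.1, 0.0]]: the first point (norm 5.0) is scaled by
#   #    1/5; the second (norm 0.1 <= 1.0) is left unchanged.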