# blob: 834d51640eb612acecf3b2c3bc191b4262acef1d [file] [log] [blame]
# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline-DP library mechanisms."""
import itertools
import numpy as np
import pipeline_dp
from dp_auditorium import interfaces
from dp_auditorium.configs import privacy_property
class AggregationMechanism(interfaces.Mechanism):
    """Pipeline DP mechanism wrapper for privacy auditing.

    Every sample is produced by one `pipeline_dp.DPEngine.aggregate` call on a
    `pipeline_dp.LocalBackend`, using a new `NaiveBudgetAccountant` that is
    given the epsilon and delta of the tested privacy property.
    """

    def __init__(
        self,
        config: pipeline_dp.AggregateParams,
        tested_privacy_property: privacy_property.ApproximateDp,
        public_partitions: list[int] | None,
    ):
        """Stores the aggregation parameters and the privacy budget.

        Args:
          config: Aggregation parameters, passed unchanged to
            `DPEngine.aggregate` for every sample.
          tested_privacy_property: Approximate DP property under test. Its
            `epsilon` and `delta` become the budget of each aggregation.
          public_partitions: Forwarded as the `public_partitions` argument of
            `DPEngine.aggregate`; may be None.
        """
        self._epsilon = tested_privacy_property.epsilon
        self._delta = tested_privacy_property.delta
        self._config = config
        self._public_partitions = public_partitions

    def _compute_aggregations(self, data: list) -> list[float]:
        """Returns one sample of a DP aggregation using the pipeline_dp library.

        Args:
          data: Records to aggregate. Each record must be indexable, holding
            the partition key at index 0, the privacy id at index 1 and the
            value at index 2 (see the extractors below).

        Returns:
          The metric values of all output partitions, flattened into a single
          list. Partition keys are dropped.
        """
        # A new accountant and engine per call, so each sample is computed
        # with the full (epsilon, delta) budget.
        budget_accountant = pipeline_dp.NaiveBudgetAccountant(
            self._epsilon, self._delta
        )
        dp_engine = pipeline_dp.DPEngine(
            budget_accountant, pipeline_dp.LocalBackend()
        )
        data_extractors = pipeline_dp.DataExtractors(
            partition_extractor=lambda x: x[0],
            privacy_id_extractor=lambda x: x[1],
            value_extractor=lambda x: x[2],
        )
        result = dp_engine.aggregate(
            data,
            self._config,
            data_extractors,
            public_partitions=self._public_partitions,
        )
        # Must stay between `aggregate` and the first read of `result`:
        # PipelineDP evaluates lazily and expects budgets to be computed
        # before the output is consumed.
        budget_accountant.compute_budgets()

        # `result` is an iterator where each item is a tuple
        # `(partition_id, MetricsTuple)`. We drop partition_id and keep only
        # the metrics.
        metrics_per_partition = (row[1] for row in result)

        # The output of this wrapper is designed for
        # `interfaces.PropertyTester`, which receives arrays of samples where
        # each sample is a one-dimensional array. The specific metric defining
        # each entry does not affect the privacy test result, so for each
        # sample we flatten all metrics across distinct partitions.
        return list(itertools.chain.from_iterable(metrics_per_partition))

    def __call__(self, data: np.ndarray, num_samples: int) -> np.ndarray:
        """Returns an array of samples of a DP aggregation using pipeline_dp.

        Args:
          data: Dataset; it is iterated along its first axis to obtain the
            records handed to PipelineDP.
          num_samples: Number of aggregation runs to perform.

        Returns:
          Array built from `num_samples` flattened aggregation outputs, in the
          order they were generated.
        """
        records = list(data)  # PipelineDP currently works with lists only.
        # NOTE(review): the rows only form a rectangular array when every run
        # yields the same number of values — confirm this holds when
        # `public_partitions` is None.
        return np.array(
            [self._compute_aggregations(records) for _ in range(num_samples)]
        )