Consolidate remaining aggregation tests
Some tests calculate the value, anotherValue, and vector aggregations in separate tests. Refactor them into a single test that calculates everything at once. Additionally, remove some redundant tests.
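
The consolidated tests all follow the same shape, sketched here against the
LocalApiTest variant (the SparkDataFrameApiTest variant differs only in that
columns are referenced by name via ColumnNames; data and publicGroups come from
the createInputData and createPublicGroups test helpers, and the exact bounds
and budgets vary per test):

  val query =
    LocalQueryBuilder.from(
        data,
        { it.privacyUnit },
        ContributionBoundingLevel.DATASET_LEVEL(
          maxGroupsContributed = 1,
          maxContributionsPerGroup = 2,
        ),
      )
      .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(publicGroups))
      .countDistinctPrivacyUnits("pidCnt")
      .count("cnt")
      // All value aggregations over the first value column.
      .aggregateValue(
        { it.value },
        ValueAggregationsBuilder()
          .sum("sumValue")
          .mean("meanValue")
          .variance("varianceValue")
          .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"),
        ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)),
      )
      // The same aggregations over the second value column.
      .aggregateValue(
        { it.anotherValue },
        ValueAggregationsBuilder()
          .sum("sumAnotherValue")
          .mean("meanAnotherValue")
          .variance("varianceAnotherValue")
          .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"),
        ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 3.0)),
      )
      // Vector aggregation over both columns in the same query.
      .aggregateVector(
        { listOf(it.value, it.anotherValue) },
        vectorSize = 2,
        VectorAggregationsBuilder().vectorSum("vectorSumResult"),
        VectorContributionBounds(
          maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
        ),
      )
      .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)

The BeamApiTest and SparkApiTest hunks only relax the variance tolerances from
0.05 to 0.1.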
PiperOrigin-RevId: 826416536
diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt
index 5dce372..50b36a7 100644
--- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt
+++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/BeamApiTest.kt
@@ -843,11 +843,11 @@
"sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
"meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
// (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5)
- "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05),
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
"quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
"sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
"meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
"quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
@@ -928,11 +928,11 @@
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
"sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
"meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
- "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05),
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
"quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
"sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
"meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
"quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt
index 9a1ddc3..c14e4ec 100644
--- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt
+++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/LocalApiTest.kt
@@ -765,14 +765,14 @@
}
@Test
- fun run_publicGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() {
+ fun run_publicGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() {
val data =
createInputData(
listOf(
- TestDataRow("group1", "pid1", 1.0),
- TestDataRow("group1", "pid1", 1.5),
- TestDataRow("group1", "pid2", 2.0),
- TestDataRow("nonPublicGroup", "pid2", 2.0),
+ TestDataRow("group1", "pid1", 1.0, 2.0),
+ TestDataRow("group1", "pid1", 0.5, 2.5),
+ TestDataRow("group1", "pid2", 1.5, 0.0),
+ TestDataRow("nonPublicGroup", "pid2", 3.0),
)
)
val publicGroups = createPublicGroups(listOf("group1"))
@@ -791,59 +791,21 @@
.aggregateValue(
{ it.value },
ValueAggregationsBuilder()
- .sum("sumResult")
- .mean("meanResult")
- .variance("varianceResult")
- .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"),
- ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)),
+ .sum("sumValue")
+ .mean("meanValue")
+ .variance("varianceValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)),
)
- .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)
-
- val result: Sequence<QueryPerGroupResult<String>> = query.run()
-
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
- "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6)
- "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05),
- "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- ),
- vectorAggregationResults = mapOf(),
+ .aggregateValue(
+ { it.anotherValue },
+ ValueAggregationsBuilder()
+ .sum("sumAnotherValue")
+ .mean("meanAnotherValue")
+ .variance("varianceAnotherValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 3.0)),
)
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_publicGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 0.5, 2.5),
- TestDataRow("group1", "pid2", 1.0, 0.0),
- TestDataRow("nonPublicGroup", "pid2", 3.0),
- )
- )
- val publicGroups = createPublicGroups(listOf("group1"))
- val query =
- LocalQueryBuilder.from(
- data,
- { it.privacyUnit },
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 1,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(publicGroups))
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
.aggregateVector(
{ listOf(it.value, it.anotherValue) },
vectorSize = 2,
@@ -863,15 +825,24 @@
mapOf(
"pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
+ "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
+ // (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5)
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
+ "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
+ "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
+ "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
+ "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
+ // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 =>
+ // clip it (1.5, 2.0).
+ // pid2: (1.5, 0.0), L_INF norm is 1.5 => no clipping.
+ // result: (1.5, 2.0) + (1.5, 0.0) = (3.0, 2.0)
"vectorSumResult" to
- // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 =>
- // clip it (1.5, 2.0).
- // pid2: (1.0, 0.0), L_INF norm is 1.0 => no clipping.
- // result: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0)
listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ DoubleWithTolerance(value = 3.0, tolerance = 0.5),
DoubleWithTolerance(value = 2.0, tolerance = 0.5),
)
),
@@ -881,7 +852,7 @@
}
@Test
- fun run_publicGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
+ fun run_privateGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() {
val data =
createInputData(
listOf(
@@ -891,238 +862,6 @@
TestDataRow("nonPublicGroup", "pid2", 3.0),
)
)
- val publicGroups = createPublicGroups(listOf("group1"))
- val query =
- LocalQueryBuilder.from(
- data,
- { it.privacyUnit },
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 1,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy({ it.groupKey }, GroupsType.PublicGroups.create(publicGroups))
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
- .aggregateValue(
- { it.value },
- ValueAggregationsBuilder().sum("sumValue"),
- ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
- )
- .aggregateValue(
- { it.anotherValue },
- ValueAggregationsBuilder().sum("sumAnotherValue"),
- ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
- )
- .aggregateVector(
- { listOf(it.value, it.anotherValue) },
- vectorSize = 2,
- VectorAggregationsBuilder().vectorSum("vectorSumResult"),
- VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
- ),
- )
- .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)
-
- val result: Sequence<QueryPerGroupResult<String>> = query.run()
-
- // sumValue: pid1 contributes 1.5, pid2 contributes 1.0. Total 2.5
- // sumAnotherValue:
- // pid1 contributes 2.0 + 2.5 = 4.5. Bounded by [0.0, 3.0], so clipped to 3.0
- // pid2 contributes 0.0. Bounded by [0.0, 3.0], so it is 0.0
- // Total sumAnotherValue = 3.0 + 0.0 = 3.0
-
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- ),
- mapOf(
- "vectorSumResult" to
- listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- )
- ),
- )
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 0.5, 2.5),
- TestDataRow("group1", "pid2", 1.0, 0.0),
- TestDataRow("nonPublicGroup", "pid2", 3.0),
- )
- )
- val query =
- LocalQueryBuilder.from(
- data,
- { it.privacyUnit },
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 2,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy({ it.groupKey }, GroupsType.PrivateGroups())
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
- .aggregateValue(
- { it.value },
- ValueAggregationsBuilder().sum("sumValue"),
- ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
- )
- .aggregateValue(
- { it.anotherValue },
- ValueAggregationsBuilder().sum("sumAnotherValue"),
- ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
- )
- .aggregateVector(
- { listOf(it.value, it.anotherValue) },
- vectorSize = 2,
- VectorAggregationsBuilder().vectorSum("vectorSumResult"),
- VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
- ),
- )
- .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
-
- val result: Sequence<QueryPerGroupResult<String>> = query.run()
-
- // sumValue:
- // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5.
- // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0.
- // sumAnotherValue:
- // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total
- // 3.0.
- // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0.
- // vectorSumResult:
- // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0,
- // 0.0),
- // not clipped.
- // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0).
- // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0).
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- ),
- mapOf(
- "vectorSumResult" to
- listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- )
- ),
- )
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_noPidCount_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 0.5, 2.5),
- TestDataRow("group1", "pid2", 1.0, 0.0),
- TestDataRow("nonPublicGroup", "pid2", 3.0),
- )
- )
- val query =
- LocalQueryBuilder.from(
- data,
- { it.privacyUnit },
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 2,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy({ it.groupKey }, GroupsType.PrivateGroups())
- .count("cnt")
- .aggregateValue(
- { it.value },
- ValueAggregationsBuilder().sum("sumValue"),
- ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
- )
- .aggregateValue(
- { it.anotherValue },
- ValueAggregationsBuilder().sum("sumAnotherValue"),
- ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
- )
- .aggregateVector(
- { listOf(it.value, it.anotherValue) },
- vectorSize = 2,
- VectorAggregationsBuilder().vectorSum("vectorSumResult"),
- VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
- ),
- )
- .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
-
- val result: Sequence<QueryPerGroupResult<String>> = query.run()
-
- // sumValue:
- // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5.
- // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0.
- // sumAnotherValue:
- // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total
- // 3.0.
- // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0.
- // vectorSumResult:
- // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0,
- // 0.0),
- // not clipped.
- // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0).
- // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0).
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- ),
- mapOf(
- "vectorSumResult" to
- listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- )
- ),
- )
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0),
- TestDataRow("group1", "pid1", 1.5),
- TestDataRow("group1", "pid2", 2.0),
- TestDataRow("group2", "pid1", 1.0),
- )
- )
val query =
LocalQueryBuilder.from(
data,
@@ -1138,64 +877,27 @@
.aggregateValue(
{ it.value },
ValueAggregationsBuilder()
- .sum("sumResult")
- .mean("meanResult")
- .variance("varianceResult")
- .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"),
- ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)),
+ .sum("sumValue")
+ .mean("meanValue")
+ .variance("varianceValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)),
)
- .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
-
- val result: Sequence<QueryPerGroupResult<String>> = query.run()
-
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
- "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6)
- "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05),
- "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- ),
- vectorAggregationResults = mapOf(),
+ .aggregateValue(
+ { it.anotherValue },
+ ValueAggregationsBuilder()
+ .sum("sumAnotherValue")
+ .mean("meanAnotherValue")
+ .variance("varianceAnotherValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 2.5)),
)
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 1.5, 2.5),
- TestDataRow("group1", "pid2", 3.0, -1.0),
- TestDataRow("group2", "pid1", -1.0, -3.0),
- )
- )
- val query =
- LocalQueryBuilder.from(
- data,
- { it.privacyUnit },
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 2,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy({ it.groupKey }, GroupsType.PrivateGroups())
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
.aggregateVector(
{ listOf(it.value, it.anotherValue) },
vectorSize = 2,
VectorAggregationsBuilder().vectorSum("vectorSumResult"),
VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 5.0)
+ maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 2.0)
),
)
.build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
@@ -1209,16 +911,25 @@
mapOf(
"pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
+ "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
+ "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
+ "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
+ "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
+ "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
+ // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L1 norm is 6 =>
+ // clip it to (1.5, 4.5) * 2.0 / 6 = (0.5, 1.5)
+ // pid2: (1.0, 0.0), L1 norm is 1.0 => no clipping.
+ // result: (0.5, 1.5) + (1.0, 0.0) = (1.5, 1.5)
+ // nonPublicGroup: pid2 contributes (3.0, 0.0), L1-clipped to (2.0, 0.0).
"vectorSumResult" to
- // pid1: (1.0, 2.0) + (1.5, 2.5) = (2.5, 4.5), L1 norm is 7 =>
- // clip it to (2.5, 4.5) * 5.0 / 7.0 = (1.8, 3.2)
- // pid2: (3.0, -1.0), L1 norm is 4.0 => no clipping.
- // result: (1.8, 3.2) + (3.0, -1.0) = (4.8, 2.2)
listOf(
- DoubleWithTolerance(value = 4.8, tolerance = 0.5),
- DoubleWithTolerance(value = 2.2, tolerance = 0.5),
+ DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ DoubleWithTolerance(value = 1.5, tolerance = 0.5),
)
),
)
@@ -1228,14 +939,14 @@
// When counting distinct privacy units different group selection mechanism is used.
@Test
- fun run_privateGroups_noCountDistinctPrivacyUnits_calculatesStatisticsCorrectly() {
+ fun run_privateGroups_noCountDistinctPrivacyUnits_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
val data =
createInputData(
listOf(
- TestDataRow("group1", "pid1", 1.0),
- TestDataRow("group1", "pid1", 1.5),
- TestDataRow("group1", "pid2", 2.0),
- TestDataRow("group2", "pid1", 1.0),
+ TestDataRow("group1", "pid1", 1.0, 2.0),
+ TestDataRow("group1", "pid1", 0.5, 2.5),
+ TestDataRow("group1", "pid2", 1.0, 0.0),
+ TestDataRow("nonPublicGroup", "pid2", 3.0),
)
)
val query =
@@ -1251,30 +962,55 @@
.count("cnt")
.aggregateValue(
{ it.value },
- ValueAggregationsBuilder()
- .sum("sumResult")
- .mean("meanResult")
- .variance("varianceResult")
- .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"),
- ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)),
+ ValueAggregationsBuilder().sum("sumValue"),
+ ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
)
- .build(TotalBudget(epsilon = 3000.0, delta = 0.001), NoiseKind.GAUSSIAN)
+ .aggregateValue(
+ { it.anotherValue },
+ ValueAggregationsBuilder().sum("sumAnotherValue"),
+ ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
+ )
+ .aggregateVector(
+ { listOf(it.value, it.anotherValue) },
+ vectorSize = 2,
+ VectorAggregationsBuilder().vectorSum("vectorSumResult"),
+ VectorContributionBounds(
+ maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
+ ),
+ )
+ .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
val result: Sequence<QueryPerGroupResult<String>> = query.run()
+ // sumValue:
+ // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5.
+ // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0.
+ // sumAnotherValue:
+ // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total
+ // 3.0.
+ // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0.
+ // vectorSumResult:
+ // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0,
+ // 0.0),
+ // not clipped.
+ // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0).
+ // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0).
val expected =
listOf(
QueryPerGroupResultWithTolerance(
"group1",
mapOf(
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
- "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6)
- "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05),
- "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
),
- vectorAggregationResults = mapOf(),
+ mapOf(
+ "vectorSumResult" to
+ listOf(
+ DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ DoubleWithTolerance(value = 2.0, tolerance = 0.5),
+ )
+ ),
)
)
assertEquals(result, expected)
diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt
index d46ebd6..31e4b78 100644
--- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt
+++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkApiTest.kt
@@ -833,11 +833,11 @@
"sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
"meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
// (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5)
- "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05),
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
"quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
"sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
"meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
"quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
@@ -918,11 +918,11 @@
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
"sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
"meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
- "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.05),
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
"quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
"sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
"meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.05),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
"quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
diff --git a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt
index 06fb707..e4d895c 100644
--- a/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt
+++ b/pipelinedp4j/tests/com/google/privacy/differentialprivacy/pipelinedp4j/api/SparkDataFrameApiTest.kt
@@ -847,14 +847,14 @@
}
@Test
- fun run_publicGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() {
+ fun run_publicGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() {
val data =
createInputData(
listOf(
- TestDataRow("group1", "pid1", 1.0),
- TestDataRow("group1", "pid1", 1.5),
- TestDataRow("group1", "pid2", 2.0),
- TestDataRow("nonPublicGroup", "pid2", 2.0),
+ TestDataRow("group1", "pid1", 1.0, 2.0),
+ TestDataRow("group1", "pid1", 0.5, 2.5),
+ TestDataRow("group1", "pid2", 1.5, 0.0),
+ TestDataRow("nonPublicGroup", "pid2", 3.0),
)
)
val publicGroups = createPublicGroups(listOf("group1"))
@@ -873,59 +873,21 @@
.aggregateValue(
"value",
ValueAggregationsBuilder()
- .sum("sumResult")
- .mean("meanResult")
- .variance("varianceResult")
- .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"),
- ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)),
+ .sum("sumValue")
+ .mean("meanValue")
+ .variance("varianceValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)),
)
- .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)
-
- val result: SparkDataFrame = query.run()
-
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
- "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6)
- "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05),
- "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- ),
- vectorAggregationResults = mapOf(),
+ .aggregateValue(
+ "anotherValue",
+ ValueAggregationsBuilder()
+ .sum("sumAnotherValue")
+ .mean("meanAnotherValue")
+ .variance("varianceAnotherValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 3.0)),
)
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_publicGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 0.5, 2.5),
- TestDataRow("group1", "pid2", 1.0, 0.0),
- TestDataRow("nonPublicGroup", "pid2", 3.0),
- )
- )
- val publicGroups = createPublicGroups(listOf("group1"))
- val query =
- SparkDataFrameQueryBuilder.from(
- data,
- ColumnNames("privacyUnit"),
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 1,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy(ColumnNames("groupKey"), GroupsType.PublicGroups.createForDataFrame(publicGroups))
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
.aggregateVector(
ColumnNames("value", "anotherValue"),
vectorSize = 2,
@@ -945,15 +907,24 @@
mapOf(
"pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
+ "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
+ // (1^2+(0.5)^2+1^2)/3-((1.0+0.5+1.0)/3)^2 = 0.0(5)
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
+ "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
+ "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
+ "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
+ "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
// pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L_INF norm is 4.5 =>
// clip it (1.5, 2.0).
- // pid2: (1.0, 0.0), L_INF norm is 1.0 => no clipping.
- // result: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0)
+ // pid2: (1.5, 0.0), L_INF norm is 1.5 => no clipping.
+ // result: (1.5, 2.0) + (1.5, 0.0) = (3.0, 2.0)
"vectorSumResult" to
listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ DoubleWithTolerance(value = 3.0, tolerance = 0.5),
DoubleWithTolerance(value = 2.0, tolerance = 0.5),
)
),
@@ -963,7 +934,7 @@
}
@Test
- fun run_publicGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
+ fun run_privateGroups_allPossibleAggregationsOverMultipleValues_calculatesStatisticsCorrectly() {
val data =
createInputData(
listOf(
@@ -973,238 +944,6 @@
TestDataRow("nonPublicGroup", "pid2", 3.0),
)
)
- val publicGroups = createPublicGroups(listOf("group1"))
- val query =
- SparkDataFrameQueryBuilder.from(
- data,
- ColumnNames("privacyUnit"),
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 1,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy(ColumnNames("groupKey"), GroupsType.PublicGroups.createForDataFrame(publicGroups))
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
- .aggregateValue(
- "value",
- ValueAggregationsBuilder().sum("sumValue"),
- ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
- )
- .aggregateValue(
- "anotherValue",
- ValueAggregationsBuilder().sum("sumAnotherValue"),
- ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
- )
- .aggregateVector(
- ColumnNames("value", "anotherValue"),
- vectorSize = 2,
- VectorAggregationsBuilder().vectorSum("vectorSumResult"),
- VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
- ),
- )
- .build(TotalBudget(epsilon = 1000.0), NoiseKind.LAPLACE)
-
- val result: SparkDataFrame = query.run()
-
- // sumValue: pid1 contributes 1.5, pid2 contributes 1.0. Total 2.5
- // sumAnotherValue:
- // pid1 contributes 2.0 + 2.5 = 4.5. Bounded by [0.0, 3.0], so clipped to 3.0
- // pid2 contributes 0.0. Bounded by [0.0, 3.0], so it is 0.0
- // Total sumAnotherValue = 3.0 + 0.0 = 3.0
-
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- ),
- mapOf(
- "vectorSumResult" to
- listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- )
- ),
- )
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 0.5, 2.5),
- TestDataRow("group1", "pid2", 1.0, 0.0),
- TestDataRow("nonPublicGroup", "pid2", 3.0),
- )
- )
- val query =
- SparkDataFrameQueryBuilder.from(
- data,
- ColumnNames("privacyUnit"),
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 2,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy(ColumnNames("groupKey"), GroupsType.PrivateGroups())
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
- .aggregateValue(
- "value",
- ValueAggregationsBuilder().sum("sumValue"),
- ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
- )
- .aggregateValue(
- "anotherValue",
- ValueAggregationsBuilder().sum("sumAnotherValue"),
- ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
- )
- .aggregateVector(
- ColumnNames("value", "anotherValue"),
- vectorSize = 2,
- VectorAggregationsBuilder().vectorSum("vectorSumResult"),
- VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
- ),
- )
- .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
-
- val result: SparkDataFrame = query.run()
-
- // sumValue:
- // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5.
- // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0.
- // sumAnotherValue:
- // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total
- // 3.0.
- // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0.
- // vectorSumResult:
- // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0,
- // 0.0),
- // not clipped.
- // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0).
- // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0).
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- ),
- mapOf(
- "vectorSumResult" to
- listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- )
- ),
- )
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_noPidCount_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 0.5, 2.5),
- TestDataRow("group1", "pid2", 1.0, 0.0),
- TestDataRow("nonPublicGroup", "pid2", 3.0),
- )
- )
- val query =
- SparkDataFrameQueryBuilder.from(
- data,
- ColumnNames("privacyUnit"),
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 2,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy(ColumnNames("groupKey"), GroupsType.PrivateGroups())
- .count("cnt")
- .aggregateValue(
- "value",
- ValueAggregationsBuilder().sum("sumValue"),
- ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
- )
- .aggregateValue(
- "anotherValue",
- ValueAggregationsBuilder().sum("sumAnotherValue"),
- ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
- )
- .aggregateVector(
- ColumnNames("value", "anotherValue"),
- vectorSize = 2,
- VectorAggregationsBuilder().vectorSum("vectorSumResult"),
- VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
- ),
- )
- .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
-
- val result: SparkDataFrame = query.run()
-
- // sumValue:
- // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5.
- // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0.
- // sumAnotherValue:
- // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total
- // 3.0.
- // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0.
- // vectorSumResult:
- // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0,
- // 0.0),
- // not clipped.
- // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0).
- // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0).
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- ),
- mapOf(
- "vectorSumResult" to
- listOf(
- DoubleWithTolerance(value = 2.5, tolerance = 0.5),
- DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- )
- ),
- )
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_allPossibleValueAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0),
- TestDataRow("group1", "pid1", 1.5),
- TestDataRow("group1", "pid2", 2.0),
- TestDataRow("group2", "pid1", 1.0),
- )
- )
val query =
SparkDataFrameQueryBuilder.from(
data,
@@ -1220,64 +959,27 @@
.aggregateValue(
"value",
ValueAggregationsBuilder()
- .sum("sumResult")
- .mean("meanResult")
- .variance("varianceResult")
- .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"),
- ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)),
+ .sum("sumValue")
+ .mean("meanValue")
+ .variance("varianceValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.5, maxValue = 1.0)),
)
- .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
-
- val result: SparkDataFrame = query.run()
-
- val expected =
- listOf(
- QueryPerGroupResultWithTolerance(
- "group1",
- mapOf(
- "pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
- "cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
- "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6)
- "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05),
- "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- ),
- vectorAggregationResults = mapOf(),
+ .aggregateValue(
+ "anotherValue",
+ ValueAggregationsBuilder()
+ .sum("sumAnotherValue")
+ .mean("meanAnotherValue")
+ .variance("varianceAnotherValue")
+ .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesAnotherValue"),
+ ContributionBounds(valueBounds = Bounds(minValue = 0.0, maxValue = 2.5)),
)
- )
- assertEquals(result, expected)
- }
-
- @Test
- fun run_privateGroups_allPossibleVectorAggregations_calculatesStatisticsCorrectly() {
- val data =
- createInputData(
- listOf(
- TestDataRow("group1", "pid1", 1.0, 2.0),
- TestDataRow("group1", "pid1", 1.5, 2.5),
- TestDataRow("group1", "pid2", 3.0, -1.0),
- TestDataRow("group2", "pid1", -1.0, -3.0),
- )
- )
- val query =
- SparkDataFrameQueryBuilder.from(
- data,
- ColumnNames("privacyUnit"),
- ContributionBoundingLevel.DATASET_LEVEL(
- maxGroupsContributed = 2,
- maxContributionsPerGroup = 2,
- ),
- )
- .groupBy(ColumnNames("groupKey"), GroupsType.PrivateGroups())
- .countDistinctPrivacyUnits("pidCnt")
- .count("cnt")
.aggregateVector(
ColumnNames("value", "anotherValue"),
vectorSize = 2,
VectorAggregationsBuilder().vectorSum("vectorSumResult"),
VectorContributionBounds(
- maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 5.0)
+ maxVectorTotalNorm = VectorNorm(normKind = NormKind.L1, value = 2.0)
),
)
.build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
@@ -1291,16 +993,25 @@
mapOf(
"pidCnt" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
+ "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ "meanValue" to DoubleWithTolerance(value = 0.83, tolerance = 0.5),
+ "varianceValue" to DoubleWithTolerance(value = 0.05, tolerance = 0.1),
+ "quantilesValue_0.5" to DoubleWithTolerance(value = 1.0, tolerance = 0.5),
+ "sumAnotherValue" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
+ "meanAnotherValue" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ "varianceAnotherValue" to DoubleWithTolerance(value = 1.16, tolerance = 0.1),
+ "quantilesAnotherValue_0.5" to DoubleWithTolerance(value = 2.0, tolerance = 0.5),
),
mapOf(
+ // pid1: (1.0, 2.0) + (0.5, 2.5) = (1.5, 4.5), L1 norm is 6 =>
+ // clip it to (1.5, 4.5) * 2.0 / 6 = (0.5, 1.5)
+ // pid2: (1.0, 0.0), L1 norm is 1.0 => no clipping.
+ // result: (0.5, 1.5) + (1.0, 0.0) = (1.5, 1.5)
+ // nonPublicGroup: pid2 contributes (3.0, 0.0), L1-clipped to (2.0, 0.0).
"vectorSumResult" to
- // pid1: (1.0, 2.0) + (1.5, 2.5) = (2.5, 4.5), L1 norm is 7 =>
- // clip it to (2.5, 4.5) * 5.0 / 7.0 = (1.8, 3.2)
- // pid2: (3.0, -1.0), L1 norm is 4.0 => no clipping.
- // result: (1.8, 3.2) + (3.0, -1.0) = (4.8, 2.2)
listOf(
- DoubleWithTolerance(value = 4.8, tolerance = 0.5),
- DoubleWithTolerance(value = 2.2, tolerance = 0.5),
+ DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ DoubleWithTolerance(value = 1.5, tolerance = 0.5),
)
),
)
@@ -1310,14 +1021,14 @@
// When counting distinct privacy units different group selection mechanism is used.
@Test
- fun run_privateGroups_noCountDistinctPrivacyUnits_calculatesStatisticsCorrectly() {
+ fun run_privateGroups_noCountDistinctPrivacyUnits_multipleValueAndVectorAggregations_calculatesStatisticsCorrectly() {
val data =
createInputData(
listOf(
- TestDataRow("group1", "pid1", 1.0),
- TestDataRow("group1", "pid1", 1.5),
- TestDataRow("group1", "pid2", 2.0),
- TestDataRow("group2", "pid1", 1.0),
+ TestDataRow("group1", "pid1", 1.0, 2.0),
+ TestDataRow("group1", "pid1", 0.5, 2.5),
+ TestDataRow("group1", "pid2", 1.0, 0.0),
+ TestDataRow("nonPublicGroup", "pid2", 3.0),
)
)
val query =
@@ -1333,30 +1044,55 @@
.count("cnt")
.aggregateValue(
"value",
- ValueAggregationsBuilder()
- .sum("sumResult")
- .mean("meanResult")
- .variance("varianceResult")
- .quantiles(ranks = listOf(0.5), outputColumnName = "quantilesResult"),
- ContributionBounds(valueBounds = Bounds(minValue = 1.0, maxValue = 2.0)),
+ ValueAggregationsBuilder().sum("sumValue"),
+ ContributionBounds(totalValueBounds = Bounds(1.0, 2.0)),
)
- .build(TotalBudget(epsilon = 3000.0, delta = 0.001), NoiseKind.GAUSSIAN)
+ .aggregateValue(
+ "anotherValue",
+ ValueAggregationsBuilder().sum("sumAnotherValue"),
+ ContributionBounds(totalValueBounds = Bounds(0.0, 3.0)),
+ )
+ .aggregateVector(
+ ColumnNames("value", "anotherValue"),
+ vectorSize = 2,
+ VectorAggregationsBuilder().vectorSum("vectorSumResult"),
+ VectorContributionBounds(
+ maxVectorTotalNorm = VectorNorm(normKind = NormKind.L_INF, value = 2.0)
+ ),
+ )
+ .build(TotalBudget(epsilon = 3500.0, delta = 0.001), NoiseKind.LAPLACE)
val result: SparkDataFrame = query.run()
+ // sumValue:
+ // group1: pid1 contributes 1.5 (in [1,2]), pid2 contributes 1.0 (in [1,2]). Total 2.5.
+ // nonPublicGroup: pid2 contributes 3.0, clipped to 2.0 by [1,2]. Total 2.0.
+ // sumAnotherValue:
+ // group1: pid1 contributes 4.5, clipped to 3.0 by [0,3]. pid2 contributes 0.0 (in [0,3]). Total
+ // 3.0.
+ // nonPublicGroup: pid2 contributes 0.0 (in [0,3]). Total 0.0.
+ // vectorSumResult:
+ // group1: pid1 contributes (1.5, 4.5), L_INF-clipped to (1.5, 2.0). pid2 contributes (1.0,
+ // 0.0),
+ // not clipped.
+ // Total for group1: (1.5, 2.0) + (1.0, 0.0) = (2.5, 2.0).
+ // nonPublicGroup: pid2 contributes (3.0, 0.0), L_INF-clipped to (2.0, 0.0).
val expected =
listOf(
QueryPerGroupResultWithTolerance(
"group1",
mapOf(
"cnt" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
- "sumResult" to DoubleWithTolerance(value = 4.5, tolerance = 0.5),
- "meanResult" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
- // (1^2+(1.5)^2+2^2)/3-((1.0+1.5+2)/3)^2 = 0.1(6)
- "varianceResult" to DoubleWithTolerance(value = 0.16, tolerance = 0.05),
- "quantilesResult_0.5" to DoubleWithTolerance(value = 1.5, tolerance = 0.5),
+ "sumValue" to DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ "sumAnotherValue" to DoubleWithTolerance(value = 3.0, tolerance = 0.5),
),
- vectorAggregationResults = mapOf(),
+ mapOf(
+ "vectorSumResult" to
+ listOf(
+ DoubleWithTolerance(value = 2.5, tolerance = 0.5),
+ DoubleWithTolerance(value = 2.0, tolerance = 0.5),
+ )
+ ),
)
)
assertEquals(result, expected)