Add in-cluster test with DP #3314

Merged (2 commits) on Jul 25, 2024
21 changes: 12 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -40,7 +40,7 @@ console-subscriber = "0.2.0"
 deadpool = "0.12.1"
 deadpool-postgres = "0.13.2"
 derivative = "2.2.0"
-divviup-client = "0.2"
+divviup-client = "0.4"
 fixed = "1.27"
 fixed-macro = "1.1.1"
 futures = "0.3.30"
182 changes: 168 additions & 14 deletions integration_tests/tests/integration/in_cluster.rs
@@ -1,13 +1,17 @@
 #![cfg(feature = "in-cluster")]
 
 use crate::{
-    common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext},
+    common::{
+        build_test_task, collect_aggregate_result_generic,
+        submit_measurements_and_verify_aggregate, submit_measurements_generic, TestContext,
+    },
     initialize_rustls,
 };
 use chrono::prelude::*;
 use clap::{CommandFactory, FromArgMatches, Parser};
 use divviup_client::{
-    Client, DivviupClient, Histogram, HpkeConfig, NewAggregator, NewSharedAggregator, NewTask, Vdaf,
+    Client, DivviupClient, Histogram, HpkeConfig, NewAggregator, NewSharedAggregator, NewTask,
+    SumVec, Vdaf,
 };
 use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType};
 #[cfg(feature = "ohttp")]
@@ -25,7 +29,14 @@ use janus_core::{
 };
 use janus_integration_tests::{client::ClientBackend, TaskParameters};
 use janus_messages::{Duration as JanusDuration, TaskId};
-use std::{env, str::FromStr, time::Duration};
+use prio::{
+    dp::{
+        distributions::PureDpDiscreteLaplace, DifferentialPrivacyStrategy, PureDpBudget, Rational,
+    },
+    field::{Field128, FieldElementWithInteger},
+    vdaf::prio3::Prio3,
+};
+use std::{env, iter, str::FromStr, time::Duration};
 use trillium_rustls::RustlsConfig;
 use trillium_tokio::ClientConfig;
 use url::Url;
@@ -408,20 +419,30 @@ impl InClusterJanusPair {
                     bits,
                     length,
                     chunk_length,
-                    dp_strategy: _,
-                } => Vdaf::SumVec {
-                    bits: bits.try_into().unwrap(),
-                    length: length.try_into().unwrap(),
-                    chunk_length: Some(chunk_length.try_into().unwrap()),
-                },
+                    dp_strategy,
+                } => {
+                    let dp_strategy =
+                        serde_json::from_value(serde_json::to_value(dp_strategy).unwrap()).unwrap();
+                    Vdaf::SumVec(SumVec::new(
+                        bits.try_into().unwrap(),
+                        length.try_into().unwrap(),
+                        Some(chunk_length.try_into().unwrap()),
+                        dp_strategy,
+                    ))
+                }
                 VdafInstance::Prio3Histogram {
                     length,
                     chunk_length,
-                    dp_strategy: _,
-                } => Vdaf::Histogram(Histogram::Length {
-                    length: length.try_into().unwrap(),
-                    chunk_length: Some(chunk_length.try_into().unwrap()),
-                }),
+                    dp_strategy,
+                } => {
+                    let dp_strategy =
+                        serde_json::from_value(serde_json::to_value(dp_strategy).unwrap()).unwrap();
+                    Vdaf::Histogram(Histogram::Length {
+                        length: length.try_into().unwrap(),
+                        chunk_length: Some(chunk_length.try_into().unwrap()),
+                        dp_strategy,
+                    })
+                }
                 other => panic!("unsupported vdaf {other:?}"),
             },
             min_batch_size: task.min_batch_size(),
@@ -897,3 +918,136 @@ mod rate_limits {
         .await
     }
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn in_cluster_histogram_dp_noise() {
+    static TEST_NAME: &str = "in_cluster_histogram_dp_noise";
+    const HISTOGRAM_LENGTH: usize = 100;
+    const CHUNK_LENGTH: usize = 10;
+
+    install_test_trace_subscriber();
+    initialize_rustls();
+
+    // Start port forwards and set up task.
+    let epsilon = Rational::from_unsigned(1u128, 10u128).unwrap();
+    let janus_pair = InClusterJanusPair::new(
+        VdafInstance::Prio3Histogram {
+            length: HISTOGRAM_LENGTH,
+            chunk_length: CHUNK_LENGTH,
+            dp_strategy: vdaf_dp_strategies::Prio3Histogram::PureDpDiscreteLaplace(
+                PureDpDiscreteLaplace::from_budget(PureDpBudget::new(epsilon).unwrap()),
+            ),
+        },
+        QueryType::FixedSize {
+            max_batch_size: Some(110),
+            batch_time_window_size: Some(JanusDuration::from_hours(8).unwrap()),
+        },
+    )
+    .await;
+    let vdaf = Prio3::new_histogram_multithreaded(2, HISTOGRAM_LENGTH, CHUNK_LENGTH).unwrap();
+
+    let total_measurements: usize = janus_pair
+        .task_parameters
+        .min_batch_size
+        .try_into()
+        .unwrap();
+    let measurements = iter::repeat(0).take(total_measurements).collect::<Vec<_>>();
+    let client_implementation = ClientBackend::InProcess
+        .build(
+            TEST_NAME,
+            &janus_pair.task_parameters,
+            (janus_pair.leader.port(), janus_pair.helper.port()),
+            vdaf.clone(),
+        )
+        .await
+        .unwrap();
+    let before_timestamp = submit_measurements_generic(&measurements, &client_implementation).await;
+    let (report_count, aggregate_result) = collect_aggregate_result_generic(
+        &janus_pair.task_parameters,
+        janus_pair.leader.port(),
+        vdaf,
+        before_timestamp,
+        &(),
+    )
+    .await;
+    assert_eq!(report_count, janus_pair.task_parameters.min_batch_size);
+
+    let mut un_noised_result = [0u128; HISTOGRAM_LENGTH];
+    un_noised_result[0] = report_count.into();
+    // Smoke test: Just confirm that some noise was added. Since epsilon is small, the noise will be
+    // large (drawn from Laplace_Z(20) + Laplace_Z(20)), and it is highly unlikely that all 100
+    // noise values will be zero simultaneously.
+    assert_ne!(aggregate_result, un_noised_result);
Member
Are there any useful bounds on the amount of noise added such that we could check that the actual result is close enough to the non-noised result to be plausibly the result of aggregation + DP noise? "Useful" in this case meaning the condition is unlikely to be satisfied if DP noise is not being added correctly.

@divergentdave (Contributor Author) replied on Jul 25, 2024:
I could check that each coordinate is not in the middle of the field's range (p*1/4 to p*3/4). That'll be more than far enough out that false positives won't be a problem, while still catching errors in secret sharing/unsharding with a probability of 1/2 on each coordinate.
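
A minimal standalone sketch of that argument (an editorial illustration, not code from this PR; the toy MODULUS constant and helper are hypothetical stand-ins for Field128 and the assertion in the diff below):

// A correctly noised zero coordinate is |noise| reduced mod p: either a small
// value, or (for negative noise) a value just below p. A uniformly random
// field element, which is what broken secret sharing/unsharding would yield,
// lands in the middle half [p/4, 3p/4] with probability 1/2 per coordinate.
const MODULUS: u128 = 1 << 100; // toy stand-in for Field128::modulus()

fn is_plausible_noised_zero(x: u128) -> bool {
    x < MODULUS / 4 || x > MODULUS / 4 * 3
}

fn main() {
    assert!(is_plausible_noised_zero(37)); // small positive noise
    assert!(is_plausible_noised_zero(MODULUS - 37)); // negative noise, wrapped around
    assert!(!is_plausible_noised_zero(MODULUS / 2)); // mid-range value: implausible
    // Across 100 independent coordinates, a uniformly random result passes
    // every check only with probability 2^-100.
}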


+
+    assert!(aggregate_result
+        .iter()
+        .all(|x| *x < Field128::modulus() / 4 || *x > Field128::modulus() / 4 * 3));
+}

+#[tokio::test(flavor = "multi_thread")]
+async fn in_cluster_sumvec_dp_noise() {
+    static TEST_NAME: &str = "in_cluster_sumvec_dp_noise";
+    const VECTOR_LENGTH: usize = 50;
+    const BITS: usize = 2;
+    const CHUNK_LENGTH: usize = 10;
+
+    install_test_trace_subscriber();
+    initialize_rustls();
+
+    // Start port forwards and set up task.
+    let epsilon = Rational::from_unsigned(1u128, 10u128).unwrap();
+    let janus_pair = InClusterJanusPair::new(
+        VdafInstance::Prio3SumVec {
+            bits: BITS,
+            length: VECTOR_LENGTH,
+            chunk_length: CHUNK_LENGTH,
+            dp_strategy: vdaf_dp_strategies::Prio3SumVec::PureDpDiscreteLaplace(
+                PureDpDiscreteLaplace::from_budget(PureDpBudget::new(epsilon).unwrap()),
+            ),
+        },
+        QueryType::FixedSize {
+            max_batch_size: Some(110),
+            batch_time_window_size: Some(JanusDuration::from_hours(8).unwrap()),
+        },
+    )
+    .await;
+    let vdaf = Prio3::new_sum_vec_multithreaded(2, BITS, VECTOR_LENGTH, CHUNK_LENGTH).unwrap();
+
+    let total_measurements: usize = janus_pair
+        .task_parameters
+        .min_batch_size
+        .try_into()
+        .unwrap();
+    let measurements = iter::repeat(vec![0; VECTOR_LENGTH])
+        .take(total_measurements)
+        .collect::<Vec<_>>();
+    let client_implementation = ClientBackend::InProcess
+        .build(
+            TEST_NAME,
+            &janus_pair.task_parameters,
+            (janus_pair.leader.port(), janus_pair.helper.port()),
+            vdaf.clone(),
+        )
+        .await
+        .unwrap();
+    let before_timestamp = submit_measurements_generic(&measurements, &client_implementation).await;
+    let (report_count, aggregate_result) = collect_aggregate_result_generic(
+        &janus_pair.task_parameters,
+        janus_pair.leader.port(),
+        vdaf,
+        before_timestamp,
+        &(),
+    )
+    .await;
+    assert_eq!(report_count, janus_pair.task_parameters.min_batch_size);
+
+    let un_noised_result = [0u128; VECTOR_LENGTH];
+    // Smoke test: Just confirm that some noise was added. Since epsilon is small, the noise will be
+    // large (drawn from Laplace_Z(150) + Laplace_Z(150)), and it is highly unlikely that all 50
+    // noise values will be zero simultaneously.
+    assert_ne!(aggregate_result, un_noised_result);
+
+    assert!(aggregate_result
+        .iter()
+        .all(|x| *x < Field128::modulus() / 4 || *x > Field128::modulus() / 4 * 3));
+}
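
The dp_strategy conversion in the InClusterJanusPair hunk above round-trips through serde_json instead of a hand-written From impl, relying on Janus's vdaf_dp_strategies types and divviup-client's types sharing a serde representation. A minimal sketch of that pattern, under that same assumption (the OursDp/TheirsDp enums and their field names below are hypothetical stand-ins for the two crates' real types):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "dp_strategy")]
enum OursDp {
    NoDifferentialPrivacy,
    PureDpDiscreteLaplace { budget_numerator: u64, budget_denominator: u64 },
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "dp_strategy")]
enum TheirsDp {
    NoDifferentialPrivacy,
    PureDpDiscreteLaplace { budget_numerator: u64, budget_denominator: u64 },
}

fn convert(ours: &OursDp) -> TheirsDp {
    // The unwraps panic if the two serde representations ever diverge, which
    // is an acceptable failure mode in test setup code.
    serde_json::from_value(serde_json::to_value(ours).unwrap()).unwrap()
}

fn main() {
    let theirs = convert(&OursDp::PureDpDiscreteLaplace {
        budget_numerator: 1,
        budget_denominator: 10,
    });
    assert_eq!(
        theirs,
        TheirsDp::PureDpDiscreteLaplace { budget_numerator: 1, budget_denominator: 10 }
    );
}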
9 changes: 9 additions & 0 deletions integration_tests/tests/integration/janus.rs
@@ -22,6 +22,7 @@ use prio::{
     dp::{
         distributions::PureDpDiscreteLaplace, DifferentialPrivacyStrategy, PureDpBudget, Rational,
     },
+    field::{Field128, FieldElementWithInteger},
     vdaf::{dummy, prio3::Prio3},
 };
 use std::{iter, time::Duration};
@@ -514,6 +515,10 @@ async fn janus_in_process_histogram_dp_noise() {
     // large (drawn from Laplace_Z(20) + Laplace_Z(20)), and it is highly unlikely that all 100
     // noise values will be zero simultaneously.
     assert_ne!(aggregate_result, un_noised_result);
+
+    assert!(aggregate_result
+        .iter()
+        .all(|x| *x < Field128::modulus() / 4 || *x > Field128::modulus() / 4 * 3));
 }
 
 #[tokio::test(flavor = "multi_thread")]
@@ -574,4 +579,8 @@ async fn janus_in_process_sumvec_dp_noise() {
     // large (drawn from Laplace_Z(150) + Laplace_Z(150)), and it is highly unlikely that all 50
     // noise values will be zero simultaneously.
     assert_ne!(aggregate_result, un_noised_result);
+
+    assert!(aggregate_result
+        .iter()
+        .all(|x| *x < Field128::modulus() / 4 || *x > Field128::modulus() / 4 * 3));
 }
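
One closing sanity check on the Laplace_Z(20) figure quoted in the histogram comments: under the standard pure-DP Laplace mechanism, each aggregator's noise scale is the query's L1 sensitivity divided by epsilon. A histogram report moves two buckets by one each (sensitivity 2), and these tests use epsilon = 1/10, giving scale 20. A sketch of that arithmetic, assuming that is indeed how the mechanism derives its scale (the variable names are illustrative):

fn main() {
    // epsilon = 1/10 as a rational, matching Rational::from_unsigned(1u128, 10u128) above.
    let (eps_num, eps_den) = (1u128, 10u128);
    // L1 sensitivity of a one-hot histogram contribution: replacing one
    // report changes two buckets by 1 each.
    let sensitivity = 2u128;
    // scale = sensitivity / epsilon = sensitivity * eps_den / eps_num
    let scale = sensitivity * eps_den / eps_num;
    assert_eq!(scale, 20); // matches the Laplace_Z(20) + Laplace_Z(20) comment
}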