Skip to content

Commit

Permalink
Add in-cluster test with DP
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave committed Jul 24, 2024
1 parent 84a789d commit cbc912c
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 24 deletions.
21 changes: 12 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ console-subscriber = "0.2.0"
deadpool = "0.12.1"
deadpool-postgres = "0.13.2"
derivative = "2.2.0"
divviup-client = "0.2"
divviup-client = "0.4"
fixed = "1.27"
fixed-macro = "1.1.1"
futures = "0.3.30"
Expand Down
173 changes: 159 additions & 14 deletions integration_tests/tests/integration/in_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
#![cfg(feature = "in-cluster")]

use crate::{
common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext},
common::{
build_test_task, collect_aggregate_result_generic,
submit_measurements_and_verify_aggregate, submit_measurements_generic, TestContext,
},
initialize_rustls,
};
use chrono::prelude::*;
use clap::{CommandFactory, FromArgMatches, Parser};
use divviup_client::{
Client, DivviupClient, Histogram, HpkeConfig, NewAggregator, NewSharedAggregator, NewTask, Vdaf,
Client, DivviupClient, Histogram, HpkeConfig, NewAggregator, NewSharedAggregator, NewTask,
SumVec, Vdaf,
};
use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType};
#[cfg(feature = "ohttp")]
Expand All @@ -25,7 +29,13 @@ use janus_core::{
};
use janus_integration_tests::{client::ClientBackend, TaskParameters};
use janus_messages::{Duration as JanusDuration, TaskId};
use std::{env, str::FromStr, time::Duration};
use prio::{
dp::{
distributions::PureDpDiscreteLaplace, DifferentialPrivacyStrategy, PureDpBudget, Rational,
},
vdaf::prio3::Prio3,
};
use std::{env, iter, str::FromStr, time::Duration};
use trillium_rustls::RustlsConfig;
use trillium_tokio::ClientConfig;
use url::Url;
Expand Down Expand Up @@ -408,20 +418,30 @@ impl InClusterJanusPair {
bits,
length,
chunk_length,
dp_strategy: _,
} => Vdaf::SumVec {
bits: bits.try_into().unwrap(),
length: length.try_into().unwrap(),
chunk_length: Some(chunk_length.try_into().unwrap()),
},
dp_strategy,
} => {
let dp_strategy =
serde_json::from_value(serde_json::to_value(dp_strategy).unwrap()).unwrap();
Vdaf::SumVec(SumVec::new(
bits.try_into().unwrap(),
length.try_into().unwrap(),
Some(chunk_length.try_into().unwrap()),
dp_strategy,
))
}
VdafInstance::Prio3Histogram {
length,
chunk_length,
dp_strategy: _,
} => Vdaf::Histogram(Histogram::Length {
length: length.try_into().unwrap(),
chunk_length: Some(chunk_length.try_into().unwrap()),
}),
dp_strategy,
} => {
let dp_strategy =
serde_json::from_value(serde_json::to_value(dp_strategy).unwrap()).unwrap();
Vdaf::Histogram(Histogram::Length {
length: length.try_into().unwrap(),
chunk_length: Some(chunk_length.try_into().unwrap()),
dp_strategy,
})
}
other => panic!("unsupported vdaf {other:?}"),
},
min_batch_size: task.min_batch_size(),
Expand Down Expand Up @@ -897,3 +917,128 @@ mod rate_limits {
.await
}
}

/// Checks that differential-privacy noise is actually applied to a
/// Prio3Histogram aggregate when the task is configured with a pure-DP
/// discrete Laplace strategy.
#[tokio::test(flavor = "multi_thread")]
async fn in_cluster_histogram_dp_noise() {
    static TEST_NAME: &str = "in_cluster_histogram_dp_noise";
    const HISTOGRAM_LENGTH: usize = 100;
    const CHUNK_LENGTH: usize = 10;

    install_test_trace_subscriber();
    initialize_rustls();

    // Start port forwards and set up a task whose VDAF carries a pure-DP
    // discrete Laplace strategy with a deliberately small epsilon of 1/10,
    // so the added noise is large and easy to detect.
    let privacy_budget =
        PureDpBudget::new(Rational::from_unsigned(1u128, 10u128).unwrap()).unwrap();
    let pair = InClusterJanusPair::new(
        VdafInstance::Prio3Histogram {
            length: HISTOGRAM_LENGTH,
            chunk_length: CHUNK_LENGTH,
            dp_strategy: vdaf_dp_strategies::Prio3Histogram::PureDpDiscreteLaplace(
                PureDpDiscreteLaplace::from_budget(privacy_budget),
            ),
        },
        QueryType::FixedSize {
            max_batch_size: Some(110),
            batch_time_window_size: Some(JanusDuration::from_hours(8).unwrap()),
        },
    )
    .await;
    let vdaf = Prio3::new_histogram_multithreaded(2, HISTOGRAM_LENGTH, CHUNK_LENGTH).unwrap();

    // Upload exactly one minimum-size batch of all-zero measurements.
    let report_total = usize::try_from(pair.task_parameters.min_batch_size).unwrap();
    let uploads = iter::repeat(0).take(report_total).collect::<Vec<_>>();
    let client = ClientBackend::InProcess
        .build(
            TEST_NAME,
            &pair.task_parameters,
            (pair.leader.port(), pair.helper.port()),
            vdaf.clone(),
        )
        .await
        .unwrap();
    let upload_start = submit_measurements_generic(&uploads, &client).await;
    let (report_count, aggregate_result) = collect_aggregate_result_generic(
        &pair.task_parameters,
        pair.leader.port(),
        vdaf,
        upload_start,
        &(),
    )
    .await;
    assert_eq!(report_count, pair.task_parameters.min_batch_size);

    // Every measurement was zero, so absent noise the only nonzero bucket
    // would be bucket 0, holding the report count.
    let mut expected_without_noise = [0u128; HISTOGRAM_LENGTH];
    expected_without_noise[0] = report_count.into();
    // Smoke test: Just confirm that some noise was added. Since epsilon is
    // small, the noise will be large (drawn from Laplace_Z(20) +
    // Laplace_Z(20)), and it is highly unlikely that all 100 noise values
    // will be zero simultaneously.
    assert_ne!(aggregate_result, expected_without_noise);
}

/// Checks that differential-privacy noise is actually applied to a
/// Prio3SumVec aggregate when the task is configured with a pure-DP
/// discrete Laplace strategy.
#[tokio::test(flavor = "multi_thread")]
async fn in_cluster_sumvec_dp_noise() {
    static TEST_NAME: &str = "in_cluster_sumvec_dp_noise";
    const VECTOR_LENGTH: usize = 50;
    const BITS: usize = 2;
    const CHUNK_LENGTH: usize = 10;

    install_test_trace_subscriber();
    initialize_rustls();

    // Start port forwards and set up a task whose VDAF carries a pure-DP
    // discrete Laplace strategy with a deliberately small epsilon of 1/10,
    // so the added noise is large and easy to detect.
    let privacy_budget =
        PureDpBudget::new(Rational::from_unsigned(1u128, 10u128).unwrap()).unwrap();
    let pair = InClusterJanusPair::new(
        VdafInstance::Prio3SumVec {
            bits: BITS,
            length: VECTOR_LENGTH,
            chunk_length: CHUNK_LENGTH,
            dp_strategy: vdaf_dp_strategies::Prio3SumVec::PureDpDiscreteLaplace(
                PureDpDiscreteLaplace::from_budget(privacy_budget),
            ),
        },
        QueryType::FixedSize {
            max_batch_size: Some(110),
            batch_time_window_size: Some(JanusDuration::from_hours(8).unwrap()),
        },
    )
    .await;
    let vdaf = Prio3::new_sum_vec_multithreaded(2, BITS, VECTOR_LENGTH, CHUNK_LENGTH).unwrap();

    // Upload exactly one minimum-size batch of all-zero vector measurements.
    let report_total = usize::try_from(pair.task_parameters.min_batch_size).unwrap();
    let uploads = iter::repeat(vec![0; VECTOR_LENGTH])
        .take(report_total)
        .collect::<Vec<_>>();
    let client = ClientBackend::InProcess
        .build(
            TEST_NAME,
            &pair.task_parameters,
            (pair.leader.port(), pair.helper.port()),
            vdaf.clone(),
        )
        .await
        .unwrap();
    let upload_start = submit_measurements_generic(&uploads, &client).await;
    let (report_count, aggregate_result) = collect_aggregate_result_generic(
        &pair.task_parameters,
        pair.leader.port(),
        vdaf,
        upload_start,
        &(),
    )
    .await;
    assert_eq!(report_count, pair.task_parameters.min_batch_size);

    // Every measurement was the zero vector, so absent noise the aggregate
    // would be all zeroes.
    let expected_without_noise = [0u128; VECTOR_LENGTH];
    // Smoke test: Just confirm that some noise was added. Since epsilon is
    // small, the noise will be large (drawn from Laplace_Z(150) +
    // Laplace_Z(150)), and it is highly unlikely that all 50 noise values
    // will be zero simultaneously.
    assert_ne!(aggregate_result, expected_without_noise);
}

0 comments on commit cbc912c

Please sign in to comment.