Skip to content

Commit

Permalink
integrate with RoundRobinSubmission, in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
eriktaubeneck authored and cberkhoff committed Dec 6, 2024
1 parent 06c8b2e commit add7967
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 18 deletions.
42 changes: 34 additions & 8 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
fmt::Debug,
fs::{File, OpenOptions},
io,
io::{stdout, Write},
io::{stdout, BufRead, BufReader, Write},
ops::Deref,
path::{Path, PathBuf},
};
Expand All @@ -16,6 +16,7 @@ use ipa_core::{
playbook::{
make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
RoundRobinSubmission,
},
CsvSerializer, IpaQueryResult, Verbosity,
},
Expand Down Expand Up @@ -143,6 +144,10 @@ enum ReportCollectorCommand {

#[clap(flatten)]
hybrid_query_config: HybridQueryParams,

/// Number of records to aggreagte
#[clap(long, short = 'n')]
count: u32,
},
}

Expand Down Expand Up @@ -255,7 +260,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
ReportCollectorCommand::MaliciousHybrid {
ref encrypted_inputs,
hybrid_query_config,
} => hybrid(&args, hybrid_query_config, clients, encrypted_inputs).await?,
count,
} => {
hybrid(
&args,
hybrid_query_config,
clients,
encrypted_inputs,
count.try_into().expect("u32 should fit into usize"),
)
.await?
}
};

Ok(())
Expand Down Expand Up @@ -402,6 +417,7 @@ async fn hybrid(
hybrid_query_config: HybridQueryParams,
helper_clients: Vec<[IpaHttpClient<Helper>; 3]>,
encrypted_inputs: &EncryptedInputs,
count: usize,
) -> Result<(), Box<dyn Error>> {
let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

Expand All @@ -411,11 +427,17 @@ async fn hybrid(
&encrypted_inputs.enc_input_file3,
];

// despite the name, this is generic enough to work with hybrid
let encrypted_report_streams = EncryptedOprfReportStreams::from(files);
let submissions = files
.iter()
.map(|path| {
let file =
File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
})
.collect::<Vec<_>>();

let query_config = QueryConfig {
size: QuerySize::try_from(encrypted_report_streams.query_size).unwrap(),
size: QuerySize::try_from(count).unwrap(),
field_type: FieldType::Fp32BitPrime,
query_type,
};
Expand All @@ -429,9 +451,13 @@ async fn hybrid(
// the value for histogram values (BA32) must be kept in sync with the server-side
// implementation, otherwise a runtime reconstruct error will be generated.
// see ipa-core/src/query/executor.rs
let actual = run_hybrid_query_and_validate::<BA32>(
encrypted_report_streams.streams,
encrypted_report_streams.query_size,

let actual = run_hybrid_query_and_validate::<BA32, BufReader>(
submissions,
count,
args.shard_count
.try_into()
.expect("u32 should fit in usize"),
helper_clients,
query_id,
hybrid_query_config,
Expand Down
32 changes: 24 additions & 8 deletions ipa-core/src/cli/playbook/hybrid.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg(all(feature = "web-app", feature = "cli"))]
use std::{
cmp::min,
io::BufRead,
time::{Duration, Instant},
};

Expand All @@ -9,6 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::time::sleep;

use crate::{
cli::playbook::{RoundRobinSubmission, StreamingSubmission},
ff::{Serializable, U128Conversions},
helpers::{
query::{HybridQueryParams, QueryInput, QuerySize},
Expand All @@ -24,29 +26,43 @@ use crate::{
/// # Panics
/// if results are invalid
#[allow(clippy::disallowed_methods)] // allow try_join_all
pub async fn run_hybrid_query_and_validate<HV>(
inputs: [BodyStream; 3],
pub async fn run_hybrid_query_and_validate<HV, R>(
inputs: [RoundRobinSubmission<R>; 3],
query_size: usize,
shard_count: usize,
clients: Vec<[IpaHttpClient<Helper>; 3]>,
query_id: QueryId,
query_config: HybridQueryParams,
) -> HybridQueryResult
where
HV: SharedValue + U128Conversions,
AdditiveShare<HV>: Serializable,
R: BufRead + Send,
{
let mpc_time = Instant::now();
let leader_clients = clients[0].clone();

let transposed_inputs = inputs[0]
.into_byte_streams(shard_count)
.iter()
.zip(
inputs[1]
.into_byte_streams(shard_count)
.iter()
.zip(inputs[2].into_byte_streams(shard_count).iter()),
)
.map(|(i1, (i2, i3))| [i1, i2, i3])
.collect::<Vec<_>>();

// for now, submit everything to the leader. TODO: round robin submission
let leader_clients = &clients[0];
try_join_all(
inputs
transposed_inputs
.into_iter()
.zip(leader_clients)
.map(|(input_stream, client)| {
.flatten()
.zip(clients.into_iter().flatten())
.map(|(stream, client)| {
client.query_input(QueryInput {
query_id,
input_stream,
input_stream: BodyStream::from_bytes_stream(*stream),
})
}),
)
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/cli/playbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::time::sleep;
pub use self::{
hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
ipa::{playbook_oprf_ipa, run_query_and_validate},
streaming::{RoundRobinSubmission, StreamingSubmission},
};
use crate::{
cli::config_parse::HelperNetworkConfigParseExt,
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/cli/playbook/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{

/// Trait for submitting inputs as streams, rather than reading everything
/// in memory. Should provide better performance for very large inputs.
trait StreamingSubmission {
pub trait StreamingSubmission {
/// Spits itself into `count` instances of [`BytesStream`].
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
}
Expand All @@ -25,7 +25,7 @@ trait StreamingSubmission {
/// and delimited by newlines. The output streams will have
/// run-length encoding, meaning that each element will have
/// a 2 byte length prefix added to it.
struct RoundRobinSubmission<R>(R);
pub struct RoundRobinSubmission<R>(R);

impl<R: BufRead> RoundRobinSubmission<R> {
pub fn new(read_from: R) -> Self {
Expand Down
1 change: 1 addition & 0 deletions ipa-core/tests/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ fn test_hybrid() {
.args(["--network".into(), config_path.join("network.toml")])
.args(["--output-file".as_ref(), output_file.as_os_str()])
.args(["--shard-count", SHARDS.to_string().as_str()])
.args(["--count", INPUT_SIZE.to_string().as_str()])
.args(["--wait", "2"])
.arg("malicious-hybrid")
.args(["--enc-input-file1".as_ref(), enc1.as_os_str()])
Expand Down

0 comments on commit add7967

Please sign in to comment.