From 8effb67021a762b252e15d379e61bc5e00a75118 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 5 Dec 2024 14:36:51 -0800 Subject: [PATCH] integrate with RoundRobinSubmission, in progress --- ipa-core/src/bin/report_collector.rs | 42 +++++++++++++++++++++----- ipa-core/src/cli/playbook/hybrid.rs | 32 +++++++++++++++----- ipa-core/src/cli/playbook/mod.rs | 1 + ipa-core/src/cli/playbook/streaming.rs | 4 +-- ipa-core/tests/hybrid.rs | 1 + 5 files changed, 62 insertions(+), 18 deletions(-) diff --git a/ipa-core/src/bin/report_collector.rs b/ipa-core/src/bin/report_collector.rs index aa6da0bb4..98461ad7e 100644 --- a/ipa-core/src/bin/report_collector.rs +++ b/ipa-core/src/bin/report_collector.rs @@ -4,7 +4,7 @@ use std::{ fmt::Debug, fs::{File, OpenOptions}, io, - io::{stdout, Write}, + io::{stdout, BufRead, BufReader, Write}, ops::Deref, path::{Path, PathBuf}, }; @@ -16,6 +16,7 @@ use ipa_core::{ playbook::{ make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate, run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource, + RoundRobinSubmission, }, CsvSerializer, IpaQueryResult, Verbosity, }, @@ -143,6 +144,10 @@ enum ReportCollectorCommand { #[clap(flatten)] hybrid_query_config: HybridQueryParams, + + /// Number of records to aggreagte + #[clap(long, short = 'n')] + count: u32, }, } @@ -255,7 +260,17 @@ async fn main() -> Result<(), Box> { ReportCollectorCommand::MaliciousHybrid { ref encrypted_inputs, hybrid_query_config, - } => hybrid(&args, hybrid_query_config, clients, encrypted_inputs).await?, + count, + } => { + hybrid( + &args, + hybrid_query_config, + clients, + encrypted_inputs, + count.try_into().expect("u32 should fit into usize"), + ) + .await? + } }; Ok(()) @@ -402,6 +417,7 @@ async fn hybrid( hybrid_query_config: HybridQueryParams, helper_clients: Vec<[IpaHttpClient; 3]>, encrypted_inputs: &EncryptedInputs, + count: usize, ) -> Result<(), Box> { let query_type = QueryType::MaliciousHybrid(hybrid_query_config); @@ -411,11 +427,17 @@ async fn hybrid( &encrypted_inputs.enc_input_file3, ]; - // despite the name, this is generic enough to work with hybrid - let encrypted_report_streams = EncryptedOprfReportStreams::from(files); + let submissions = files + .iter() + .map(|path| { + let file = + File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}")); + RoundRobinSubmission::new(BufReader::new(file)) + }) + .collect::>(); let query_config = QueryConfig { - size: QuerySize::try_from(encrypted_report_streams.query_size).unwrap(), + size: QuerySize::try_from(count).unwrap(), field_type: FieldType::Fp32BitPrime, query_type, }; @@ -429,9 +451,13 @@ async fn hybrid( // the value for histogram values (BA32) must be kept in sync with the server-side // implementation, otherwise a runtime reconstruct error will be generated. // see ipa-core/src/query/executor.rs - let actual = run_hybrid_query_and_validate::( - encrypted_report_streams.streams, - encrypted_report_streams.query_size, + + let actual = run_hybrid_query_and_validate::( + submissions, + count, + args.shard_count + .try_into() + .expect("u32 should fit in usize"), helper_clients, query_id, hybrid_query_config, diff --git a/ipa-core/src/cli/playbook/hybrid.rs b/ipa-core/src/cli/playbook/hybrid.rs index a8c80cffd..1fb9b293c 100644 --- a/ipa-core/src/cli/playbook/hybrid.rs +++ b/ipa-core/src/cli/playbook/hybrid.rs @@ -1,6 +1,7 @@ #![cfg(all(feature = "web-app", feature = "cli"))] use std::{ cmp::min, + io::BufRead, time::{Duration, Instant}, }; @@ -9,6 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio::time::sleep; use crate::{ + cli::playbook::{RoundRobinSubmission, StreamingSubmission}, ff::{Serializable, U128Conversions}, helpers::{ query::{HybridQueryParams, QueryInput, QuerySize}, @@ -24,9 +26,10 @@ use crate::{ /// # Panics /// if results are invalid #[allow(clippy::disallowed_methods)] // allow try_join_all -pub async fn run_hybrid_query_and_validate( - inputs: [BodyStream; 3], +pub async fn run_hybrid_query_and_validate( + inputs: [RoundRobinSubmission; 3], query_size: usize, + shard_count: usize, clients: Vec<[IpaHttpClient; 3]>, query_id: QueryId, query_config: HybridQueryParams, @@ -34,19 +37,32 @@ pub async fn run_hybrid_query_and_validate( where HV: SharedValue + U128Conversions, AdditiveShare: Serializable, + R: BufRead + Send, { let mpc_time = Instant::now(); + let leader_clients = clients[0].clone(); + + let transposed_inputs = inputs[0] + .into_byte_streams(shard_count) + .iter() + .zip( + inputs[1] + .into_byte_streams(shard_count) + .iter() + .zip(inputs[2].into_byte_streams(shard_count).iter()), + ) + .map(|(i1, (i2, i3))| [i1, i2, i3]) + .collect::>(); - // for now, submit everything to the leader. TODO: round robin submission - let leader_clients = &clients[0]; try_join_all( - inputs + transposed_inputs .into_iter() - .zip(leader_clients) - .map(|(input_stream, client)| { + .flatten() + .zip(clients.into_iter().flatten()) + .map(|(stream, client)| { client.query_input(QueryInput { query_id, - input_stream, + input_stream: BodyStream::from_bytes_stream(*stream), }) }), ) diff --git a/ipa-core/src/cli/playbook/mod.rs b/ipa-core/src/cli/playbook/mod.rs index dc10806e7..fc8b46e9e 100644 --- a/ipa-core/src/cli/playbook/mod.rs +++ b/ipa-core/src/cli/playbook/mod.rs @@ -22,6 +22,7 @@ use tokio::time::sleep; pub use self::{ hybrid::{run_hybrid_query_and_validate, HybridQueryResult}, ipa::{playbook_oprf_ipa, run_query_and_validate}, + streaming::{RoundRobinSubmission, StreamingSubmission}, }; use crate::{ cli::config_parse::HelperNetworkConfigParseExt, diff --git a/ipa-core/src/cli/playbook/streaming.rs b/ipa-core/src/cli/playbook/streaming.rs index 141b25922..f246582ee 100644 --- a/ipa-core/src/cli/playbook/streaming.rs +++ b/ipa-core/src/cli/playbook/streaming.rs @@ -15,7 +15,7 @@ use crate::{ /// Trait for submitting inputs as streams, rather than reading everything /// in memory. Should provide better performance for very large inputs. -trait StreamingSubmission { +pub trait StreamingSubmission { /// Spits itself into `count` instances of [`BytesStream`]. fn into_byte_streams(self, count: usize) -> Vec; } @@ -25,7 +25,7 @@ trait StreamingSubmission { /// and delimited by newlines. The output streams will have /// run-length encoding, meaning that each element will have /// a 2 byte length prefix added to it. -struct RoundRobinSubmission(R); +pub struct RoundRobinSubmission(R); impl RoundRobinSubmission { pub fn new(read_from: R) -> Self { diff --git a/ipa-core/tests/hybrid.rs b/ipa-core/tests/hybrid.rs index 5b1beed61..c57783286 100644 --- a/ipa-core/tests/hybrid.rs +++ b/ipa-core/tests/hybrid.rs @@ -88,6 +88,7 @@ fn test_hybrid() { .args(["--network".into(), config_path.join("network.toml")]) .args(["--output-file".as_ref(), output_file.as_os_str()]) .args(["--shard-count", SHARDS.to_string().as_str()]) + .args(["--count", INPUT_SIZE.to_string().as_str()]) .args(["--wait", "2"]) .arg("malicious-hybrid") .args(["--enc-input-file1".as_ref(), enc1.as_os_str()])