diff --git a/docker/report_collector.Dockerfile b/docker/report_collector.Dockerfile
index d0dff1a44..165b85266 100644
--- a/docker/report_collector.Dockerfile
+++ b/docker/report_collector.Dockerfile
@@ -7,14 +7,16 @@ ARG SOURCES_DIR
 WORKDIR "$SOURCES_DIR"
 COPY . .
 RUN set -eux; \
-    cargo build --bin report_collector --release --no-default-features --features "cli test-fixture web-app real-world-infra compact-gate"
+    cargo build --bin in_the_clear --bin crypto_util --bin report_collector --release --no-default-features --features "cli test-fixture web-app real-world-infra compact-gate"

 # Copy them to the final image
 FROM rust:slim-bookworm
-ENV RC_BIN_PATH=/usr/local/bin/report_collector
+ENV RC_BIN_PATH=/usr/local/bin
 ENV CONF_DIR=/etc/ipa
 ARG SOURCES_DIR

 RUN apt-get update && apt-get install -y curl procps ca-certificates && rm -rf /var/lib/apt/lists/*

-COPY --from=builder ${SOURCES_DIR}/target/release/report_collector $RC_BIN_PATH
+COPY --from=builder ${SOURCES_DIR}/target/release/in_the_clear $RC_BIN_PATH/in_the_clear
+COPY --from=builder ${SOURCES_DIR}/target/release/crypto_util $RC_BIN_PATH/crypto_util
+COPY --from=builder ${SOURCES_DIR}/target/release/report_collector $RC_BIN_PATH/report_collector
diff --git a/ipa-core/src/bin/report_collector.rs b/ipa-core/src/bin/report_collector.rs
index aa6da0bb4..2ec06cf0d 100644
--- a/ipa-core/src/bin/report_collector.rs
+++ b/ipa-core/src/bin/report_collector.rs
@@ -4,7 +4,7 @@ use std::{
     fmt::Debug,
     fs::{File, OpenOptions},
     io,
-    io::{stdout, Write},
+    io::{stdout, BufReader, Write},
     ops::Deref,
     path::{Path, PathBuf},
 };
@@ -16,6 +16,7 @@ use ipa_core::{
         playbook::{
            make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
            run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
+           RoundRobinSubmission,
         },
         CsvSerializer, IpaQueryResult, Verbosity,
     },
@@ -33,6 +34,8 @@ use ipa_core::{
 };
 use rand::{distributions::Alphanumeric, rngs::StdRng, thread_rng, Rng};
 use rand_core::SeedableRng;
+use ipa_core::cli::playbook::StreamingSubmission;
+use ipa_core::helpers::BodyStream;

 #[derive(Debug, Parser)]
 #[clap(name = "rc", about = "Report Collector CLI")]
@@ -143,6 +146,10 @@ enum ReportCollectorCommand {
         #[clap(flatten)]
         hybrid_query_config: HybridQueryParams,
+
+        /// Number of records to aggregate
+        #[clap(long, short = 'n')]
+        count: u32,
     },
 }

@@ -255,7 +262,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
         ReportCollectorCommand::MaliciousHybrid {
             ref encrypted_inputs,
             hybrid_query_config,
-        } => hybrid(&args, hybrid_query_config, clients, encrypted_inputs).await?,
+            count,
+        } => {
+            hybrid(
+                &args,
+                hybrid_query_config,
+                clients,
+                encrypted_inputs,
+                count.try_into().expect("u32 should fit into usize"),
+            )
+            .await?
+        }
     };

     Ok(())
 }
@@ -402,20 +419,31 @@ async fn hybrid(
     hybrid_query_config: HybridQueryParams,
     helper_clients: Vec<[IpaHttpClient; 3]>,
     encrypted_inputs: &EncryptedInputs,
+    count: usize,
 ) -> Result<(), Box<dyn Error>> {
     let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

-    let files = [
+    let [h1_streams, h2_streams, h3_streams] = [
         &encrypted_inputs.enc_input_file1,
         &encrypted_inputs.enc_input_file2,
         &encrypted_inputs.enc_input_file3,
-    ];
-
-    // despite the name, this is generic enough to work with hybrid
-    let encrypted_report_streams = EncryptedOprfReportStreams::from(files);
+    ].map(|path| {
+        let file = File::open(path)
+            .unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
+        RoundRobinSubmission::new(BufReader::new(file))
+    }).map(|s| s.into_byte_streams(args.shard_count));
+
+    // create byte streams for each shard
+    let submissions = h1_streams.into_iter()
+        .zip(h2_streams.into_iter())
+        .zip(h3_streams.into_iter())
+        .map(|((s1, s2), s3)| {
+            [BodyStream::from_bytes_stream(s1), BodyStream::from_bytes_stream(s2), BodyStream::from_bytes_stream(s3)]
+        })
+        .collect::<Vec<_>>();

     let query_config = QueryConfig {
-        size: QuerySize::try_from(encrypted_report_streams.query_size).unwrap(),
+        size: QuerySize::try_from(count).unwrap(),
         field_type: FieldType::Fp32BitPrime,
         query_type,
     };
@@ -429,9 +457,10 @@ async fn hybrid(
     // the value for histogram values (BA32) must be kept in sync with the server-side
     // implementation, otherwise a runtime reconstruct error will be generated.
     // see ipa-core/src/query/executor.rs
+
     let actual = run_hybrid_query_and_validate::<BA32>(
-        encrypted_report_streams.streams,
-        encrypted_report_streams.query_size,
+        submissions,
+        count,
         helper_clients,
         query_id,
         hybrid_query_config,
diff --git a/ipa-core/src/cli/crypto/hybrid_encrypt.rs b/ipa-core/src/cli/crypto/hybrid_encrypt.rs
index e0c749faf..7db024681 100644
--- a/ipa-core/src/cli/crypto/hybrid_encrypt.rs
+++ b/ipa-core/src/cli/crypto/hybrid_encrypt.rs
@@ -58,16 +58,16 @@ impl HybridEncryptArgs {
         let mut key_registries = KeyRegistries::default();

         let network =
-            NetworkConfig::from_toml_str(&read_to_string(&self.network).unwrap_or_else(|e| {
-                panic!("Failed to open network file: {:?}. {}", &self.network, e)
-            }))
+            NetworkConfig::from_toml_str_sharded(&read_to_string(&self.network).unwrap_or_else(
+                |e| panic!("Failed to open network file: {:?}. {}", &self.network, e),
+            ))
             .unwrap_or_else(|e| {
                 panic!(
                     "Failed to parse network file into toml: {:?}. {}",
                     &self.network, e
                 )
             });
-        let Some(key_registries) = key_registries.init_from(&network) else {
+        let Some(key_registries) = key_registries.init_from(&network[0]) else {
             panic!("could not load network file")
         };
diff --git a/ipa-core/src/cli/playbook/hybrid.rs b/ipa-core/src/cli/playbook/hybrid.rs
index a8c80cffd..b4c358bda 100644
--- a/ipa-core/src/cli/playbook/hybrid.rs
+++ b/ipa-core/src/cli/playbook/hybrid.rs
@@ -3,7 +3,7 @@ use std::{
     cmp::min,
     time::{Duration, Instant},
 };
-
+use std::iter::zip;
 use futures_util::future::try_join_all;
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
@@ -25,7 +25,7 @@ use crate::{
 /// if results are invalid
 #[allow(clippy::disallowed_methods)] // allow try_join_all
 pub async fn run_hybrid_query_and_validate<HV>(
-    inputs: [BodyStream; 3],
+    inputs: Vec<[BodyStream; 3]>,
     query_size: usize,
     clients: Vec<[IpaHttpClient; 3]>,
     query_id: QueryId,
@@ -36,28 +36,31 @@ where
     AdditiveShare<HV>: Serializable,
 {
     let mpc_time = Instant::now();
+    assert_eq!(clients.len(), inputs.len());
+    // submit inputs to each shard
+    let _ = try_join_all(zip(clients.iter(), inputs.into_iter())
+        .map(|(shard_clients, shard_inputs)| {
+            try_join_all(shard_clients
+                .iter()
+                .zip(shard_inputs.into_iter())
+                .map(|(client, input)|
+                    {
+                        client.query_input(QueryInput {
+                            query_id,
+                            input_stream: input
+                        })
+                    }
+                )
+            )
+        })).await.unwrap();

-    // for now, submit everything to the leader. TODO: round robin submission
     let leader_clients = &clients[0];
-    try_join_all(
-        inputs
-            .into_iter()
-            .zip(leader_clients)
-            .map(|(input_stream, client)| {
-                client.query_input(QueryInput {
-                    query_id,
-                    input_stream,
-                })
-            }),
-    )
-    .await
-    .unwrap();

     let mut delay = Duration::from_millis(125);
     loop {
         if try_join_all(
             leader_clients
-                .iter()
+                .each_ref()
                 .map(|client| client.query_status(query_id)),
         )
         .await
diff --git a/ipa-core/src/cli/playbook/mod.rs b/ipa-core/src/cli/playbook/mod.rs
index dc10806e7..fc8b46e9e 100644
--- a/ipa-core/src/cli/playbook/mod.rs
+++ b/ipa-core/src/cli/playbook/mod.rs
@@ -22,6 +22,7 @@ use tokio::time::sleep;
 pub use self::{
     hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
     ipa::{playbook_oprf_ipa, run_query_and_validate},
+    streaming::{RoundRobinSubmission, StreamingSubmission},
 };
 use crate::{
     cli::config_parse::HelperNetworkConfigParseExt,
diff --git a/ipa-core/src/cli/playbook/streaming.rs b/ipa-core/src/cli/playbook/streaming.rs
index 141b25922..f246582ee 100644
--- a/ipa-core/src/cli/playbook/streaming.rs
+++ b/ipa-core/src/cli/playbook/streaming.rs
@@ -15,7 +15,7 @@ use crate::{

 /// Trait for submitting inputs as streams, rather than reading everything
 /// in memory. Should provide better performance for very large inputs.
-trait StreamingSubmission {
+pub trait StreamingSubmission {
     /// Spits itself into `count` instances of [`BytesStream`].
     fn into_byte_streams(self, count: usize) -> Vec<BytesStream>;
 }
@@ -25,7 +25,7 @@ trait StreamingSubmission {
 /// and delimited by newlines. The output streams will have
 /// run-length encoding, meaning that each element will have
 /// a 2 byte length prefix added to it.
-struct RoundRobinSubmission<R>(R);
+pub struct RoundRobinSubmission<R>(R);

 impl<R> RoundRobinSubmission<R> {
     pub fn new(read_from: R) -> Self {
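As a rough illustration of the framing described by the `RoundRobinSubmission` doc comment above (newline-delimited records, a 2-byte length prefix per record, records dealt round-robin across `count` output streams), the following self-contained sketch shows the same idea over in-memory buffers. The function name, the little-endian prefix, and the buffered output are illustrative assumptions, not the crate's implementation, which splits lazily into streams rather than buffering.

    use std::io::{BufRead, BufReader, Read};

    /// Split newline-delimited records across `count` buffers, round-robin, and
    /// frame each record with a 2-byte length prefix (assumed little-endian here).
    fn round_robin_frames<R: Read>(input: R, count: usize) -> std::io::Result<Vec<Vec<u8>>> {
        let mut outputs = vec![Vec::new(); count];
        for (i, line) in BufReader::new(input).lines().enumerate() {
            let record = line?;
            let bytes = record.as_bytes();
            let len = u16::try_from(bytes.len()).expect("record fits in a 2-byte length prefix");
            let out: &mut Vec<u8> = &mut outputs[i % count];
            out.extend_from_slice(&len.to_le_bytes());
            out.extend_from_slice(bytes);
        }
        Ok(outputs)
    }

    fn main() -> std::io::Result<()> {
        // Three records dealt across two "shards": records 0 and 2 go to shard 0, record 1 to shard 1.
        let shards = round_robin_frames("a\nbb\nccc\n".as_bytes(), 2)?;
        assert_eq!(shards[0], [1, 0, b'a', 3, 0, b'c', b'c', b'c']);
        assert_eq!(shards[1], [2, 0, b'b', b'b']);
        Ok(())
    }

In the patch itself this splitting happens through `into_byte_streams(args.shard_count)`, which hands back one `BytesStream` per shard that `BodyStream::from_bytes_stream` then wraps for submission.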
diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs
index 2b9dad085..c7c3eb1a4 100644
--- a/ipa-core/src/net/http_serde.rs
+++ b/ipa-core/src/net/http_serde.rs
@@ -132,6 +132,10 @@ pub mod query {
                     let Query(q) = req.extract().await?;
                     Ok(QueryType::MaliciousOprfIpa(q))
                 }
+                QueryType::MALICIOUS_HYBRID_STR => {
+                    let Query(q) = req.extract().await?;
+                    Ok(QueryType::MaliciousHybrid(q))
+                }
                 other => Err(Error::bad_query_value("query_type", other)),
             }?;
             Ok(QueryConfigQueryParams(QueryConfig {
diff --git a/ipa-core/src/query/executor.rs b/ipa-core/src/query/executor.rs
index 47a0d3bf5..d626e12bf 100644
--- a/ipa-core/src/query/executor.rs
+++ b/ipa-core/src/query/executor.rs
@@ -39,7 +39,7 @@ use crate::{
         Gate,
     },
     query::{
-        runner::{OprfIpaQuery, QueryResult},
+        runner::{execute_hybrid_protocol, OprfIpaQuery, QueryResult},
         state::RunningQuery,
     },
     sync::Arc,
@@ -165,7 +165,22 @@ pub fn execute(
                 )
             },
         ),
-        (QueryType::MaliciousHybrid(_), _) => todo!(),
+        (QueryType::MaliciousHybrid(ipa_config), _) => do_query(
+            runtime,
+            config,
+            gateway,
+            input,
+            move |prss, gateway, config, input| {
+                Box::pin(execute_hybrid_protocol(
+                    prss,
+                    gateway,
+                    input,
+                    ipa_config,
+                    config,
+                    key_registry,
+                ))
+            },
+        ),
     }
 }
diff --git a/ipa-core/src/query/runner/hybrid.rs b/ipa-core/src/query/runner/hybrid.rs
index 199ee385f..9cc5294d8 100644
--- a/ipa-core/src/query/runner/hybrid.rs
+++ b/ipa-core/src/query/runner/hybrid.rs
@@ -8,31 +8,35 @@ use std::{
 use futures::{stream::iter, StreamExt, TryStreamExt};
 use generic_array::ArrayLength;

+use super::QueryResult;
 use crate::{
     error::{Error, LengthError},
     ff::{
         boolean::Boolean,
-        boolean_array::{BooleanArray, BA3, BA8},
+        boolean_array::{BooleanArray, BA16, BA3, BA8},
         curve_points::RP25519,
         ec_prime_field::Fp25519,
         Serializable, U128Conversions,
     },
     helpers::{
-        query::{DpMechanism, HybridQueryParams, QuerySize},
-        BodyStream, LengthDelimitedStream,
+        query::{DpMechanism, HybridQueryParams, QueryConfig, QuerySize},
+        setup_cross_shard_prss, BodyStream, Gateway, LengthDelimitedStream,
     },
     hpke::PrivateKeyRegistry,
     protocol::{
         basics::{shard_fin::FinalizerContext, BooleanArrayMul, BooleanProtocols, Reveal},
-        context::{DZKPUpgraded, MacUpgraded, ShardedContext, UpgradableContext},
+        context::{
+            DZKPUpgraded, MacUpgraded, ShardedContext, ShardedMaliciousContext, UpgradableContext,
+        },
         hybrid::{
             hybrid_protocol,
             oprf::{CONV_CHUNK, PRF_CHUNK},
             step::HybridStep,
         },
         ipa_prf::{oprf_padding::PaddingParameters, prf_eval::PrfSharing, shuffle::Shuffle},
-        prss::FromPrss,
+        prss::{Endpoint, FromPrss},
         step::ProtocolStep::Hybrid,
+        Gate,
     },
     query::runner::reshard_tag::reshard_aad,
     report::hybrid::{
@@ -42,6 +46,7 @@ use crate::{
         replicated::semi_honest::AdditiveShare as Replicated,
         BitDecomposed, TransposeFrom, Vectorizable,
     },
+    sharding::{ShardConfiguration, Sharded},
 };

 #[allow(dead_code)]
@@ -165,6 +170,32 @@ where
     }
 }

+pub async fn execute_hybrid_protocol<'a, R: PrivateKeyRegistry>(
+    prss: &'a Endpoint,
+    gateway: &'a Gateway,
+    input: BodyStream,
+    ipa_config: HybridQueryParams,
+    config: &QueryConfig,
+    key_registry: Arc<R>,
+) -> QueryResult {
+    let gate = Gate::default();
+    let cross_shard_prss =
+        setup_cross_shard_prss(gateway, &gate, prss.indexed(&gate), gateway).await?;
+    let sharded = Sharded {
+        shard_id: gateway.shard_id(),
+        shard_count: gateway.shard_count(),
+        prss: Arc::new(cross_shard_prss),
+    };
+
+    let ctx = ShardedMaliciousContext::new_with_gate(prss, gateway, gate, sharded);
+
+    Ok(Box::new(
+        Query::<_, BA16, R>::new(ipa_config, key_registry)
+            .execute(ctx, config.size, input)
+            .await?,
+    ))
+}
+
 #[cfg(all(test, unit_test, feature = "in-memory-infra"))]
 mod tests {
     use std::{
diff --git a/ipa-core/src/query/runner/mod.rs b/ipa-core/src/query/runner/mod.rs
index 83f033fe4..642e2a846 100644
--- a/ipa-core/src/query/runner/mod.rs
+++ b/ipa-core/src/query/runner/mod.rs
@@ -15,7 +15,7 @@ pub(super) use sharded_shuffle::execute_sharded_shuffle;
 #[cfg(any(test, feature = "cli", feature = "test-fixture"))]
 pub(super) use test_multiply::execute_test_multiply;

-pub use self::oprf_ipa::OprfIpaQuery;
+pub use self::{hybrid::execute_hybrid_protocol, oprf_ipa::OprfIpaQuery};
 use crate::{error::Error, query::ProtocolResult};

 pub(super) type QueryResult = Result<Box<dyn ProtocolResult>, Error>;
diff --git a/ipa-core/tests/common/mod.rs b/ipa-core/tests/common/mod.rs
index 8e1c014d7..7eab1c864 100644
--- a/ipa-core/tests/common/mod.rs
+++ b/ipa-core/tests/common/mod.rs
@@ -209,7 +209,9 @@ pub fn spawn_shards(
         command.args(["--shard-server-socket-fd", &shard.as_raw_fd().to_string()]);

         // something went wrong if command is terminated at this point.
-        let mut child = command.silent().spawn().unwrap();
+        // let mut child = command.silent().spawn().unwrap();
+        // TODO: make this silent again
+        let mut child = command.spawn().unwrap();
         match child.try_wait() {
             Ok(Some(status)) => {
                 panic!("Helper binary terminated early with status = {status}")
diff --git a/ipa-core/tests/hybrid.rs b/ipa-core/tests/hybrid.rs
index 858a25dde..819854ac6 100644
--- a/ipa-core/tests/hybrid.rs
+++ b/ipa-core/tests/hybrid.rs
@@ -17,7 +17,6 @@ pub const IN_THE_CLEAR_BIN: &str = env!("CARGO_BIN_EXE_in_the_clear");
 // this currently only generates data and runs in the clear
 // eventaully we'll want to add the MPC as well
 #[test]
-#[ignore]
 fn test_hybrid() {
     const INPUT_SIZE: usize = 100;
     const SHARDS: usize = 5;
@@ -34,6 +33,7 @@ fn test_hybrid() {

     // Gen inputs
     let input_file = dir.path().join("ipa_inputs.txt");
+    let in_the_clear_output_file = dir.path().join("ipa_output_in_the_clear.json");
     let output_file = dir.path().join("ipa_output.json");

     let mut command = Command::new(TEST_RC_BIN);
@@ -51,25 +51,31 @@ fn test_hybrid() {
     let mut command = Command::new(IN_THE_CLEAR_BIN);
     command
         .args(["--input-file".as_ref(), input_file.as_os_str()])
-        .args(["--output-file".as_ref(), output_file.as_os_str()])
+        .args([
+            "--output-file".as_ref(),
+            in_the_clear_output_file.as_os_str(),
+        ])
         .silent()
         .stdin(Stdio::piped());
     command.status().unwrap_status();

-    // set to true to always keep the temp dir after test finishes
-    let dir = TempDir::new_delete_on_drop();
-    let path = dir.path();
+    println!(
+        "In the clear results: {}",
+        &std::fs::read_to_string(&in_the_clear_output_file)
+            .expect("IPA in the clear results file exists")
+    );

-    let sockets = test_sharded_setup::<SHARDS>(path);
-    let _helpers = spawn_shards(path, &sockets, true);
+    let config_path = dir.path().join("config");
+    let sockets = test_sharded_setup::<SHARDS>(&config_path);
+    let _helpers = spawn_shards(&config_path, &sockets, true);

     // encrypt input
     let mut command = Command::new(CRYPTO_UTIL_BIN);
     command
         .arg("hybrid-encrypt")
         .args(["--input-file".as_ref(), input_file.as_os_str()])
-        .args(["--output-dir".as_ref(), path.as_os_str()])
-        .args(["--network".into(), dir.path().join("network.toml")])
+        .args(["--output-dir".as_ref(), dir.path().as_os_str()])
+        .args(["--network".into(), config_path.join("network.toml")])
         .stdin(Stdio::piped());
     command.status().unwrap_status();
     let enc1 = dir.path().join("helper1.enc");
@@ -79,10 +85,12 @@ fn test_hybrid() {
     // Run Hybrid
     let mut command = Command::new(TEST_RC_BIN);
     command
-        .args(["--network".into(), dir.path().join("network.toml")])
+        .args(["--network".into(), config_path.join("network.toml")])
         .args(["--output-file".as_ref(), output_file.as_os_str()])
+        .args(["--shard-count", SHARDS.to_string().as_str()])
         .args(["--wait", "2"])
-        .arg("hybrid")
+        .arg("malicious-hybrid")
+        .args(["--count", INPUT_SIZE.to_string().as_str()])
         .args(["--enc-input-file1".as_ref(), enc1.as_os_str()])
         .args(["--enc-input-file2".as_ref(), enc2.as_os_str()])
         .args(["--enc-input-file3".as_ref(), enc3.as_os_str()])
@@ -100,11 +108,17 @@ fn test_hybrid() {
     }
     command.stdin(Stdio::piped());

-    let _test_mpc = command.spawn().unwrap().terminate_on_drop();
+    let test_mpc = command.spawn().unwrap().terminate_on_drop();
+    test_mpc.wait().unwrap_status();

     // basic output checks - output should have the exact size as number of breakdowns
+    println!(
+        "{}",
+        &std::fs::read_to_string(&output_file).expect("IPA results file should exist")
+    );
+
     let output = serde_json::from_str::<Value>(
-        &std::fs::read_to_string(&output_file).expect("IPA results file exists"),
+        &std::fs::read_to_string(&output_file).expect("IPA results file should exist"),
     )
     .expect("IPA results file is valid JSON");
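Taken together, the client-side flow after this change is roughly: open each helper's ciphertext file once, split it into one byte stream per shard, and bundle the three helpers' streams for each shard into a `[BodyStream; 3]` submission. The sketch below condenses what the `hybrid` function in report_collector.rs now does; the helper-function name is hypothetical, and only the paths and calls that appear in the hunks above are taken from the source.

    use std::{fs::File, io::BufReader, path::PathBuf};

    use ipa_core::{
        cli::playbook::{RoundRobinSubmission, StreamingSubmission},
        helpers::BodyStream,
    };

    // Hypothetical condensation of the report_collector change: one RoundRobinSubmission
    // per helper file, split into `shard_count` byte streams, then regrouped per shard.
    fn per_shard_submissions(files: [PathBuf; 3], shard_count: usize) -> Vec<[BodyStream; 3]> {
        let [h1, h2, h3] = files.map(|path| {
            let file = File::open(&path).expect("encrypted input file should open");
            RoundRobinSubmission::new(BufReader::new(file)).into_byte_streams(shard_count)
        });
        h1.into_iter()
            .zip(h2)
            .zip(h3)
            .map(|((s1, s2), s3)| {
                [
                    BodyStream::from_bytes_stream(s1),
                    BodyStream::from_bytes_stream(s2),
                    BodyStream::from_bytes_stream(s3),
                ]
            })
            .collect()
    }

Each element of the returned vector is then submitted to the matching shard by `run_hybrid_query_and_validate`, which now fans the three streams out to that shard's helpers instead of sending everything to the leader. The record total is supplied separately through the new `--count` flag, presumably because the lazily split streams no longer expose an overall record count the way `EncryptedOprfReportStreams::query_size` did.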