Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query input from URL #1508

Merged
Prev Previous commit
Next Next commit
Move input-from-URL to the axum handler
This should result in it running on the HTTP runtime. Doing it in the
query processor would have it on the query runtime (absent other
changes to runtime plumbing).
  • Loading branch information
andyleiserson committed Dec 18, 2024
commit 2a82205d6e51aabaa7a83038b12f1cca1935a5bc
25 changes: 12 additions & 13 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ use crate::{
},
hpke::{KeyRegistry, PrivateKeyOnly},
protocol::QueryId,
query::{NewQueryError, QueryInputError, QueryProcessor, QueryStatus},
query::{NewQueryError, QueryProcessor, QueryStatus},
sharding::ShardIndex,
sync::Arc,
utils::NonZeroU32PowerOfTwo,
@@ -139,9 +139,12 @@ impl HelperApp {
pub fn execute_query(&self, input: QueryInput) -> Result<(), ApiError> {
let mpc_transport = self.inner.mpc_transport.clone_ref();
let shard_transport = self.inner.shard_transport.clone_ref();
let QueryInput::Inline { query_id, input_stream } = input else {
panic!("this client does not support pulling query input from a URL");
};
self.inner
.query_processor
.receive_inputs(mpc_transport, shard_transport, input)?;
.receive_inputs(mpc_transport, shard_transport, query_id, input_stream)?;
Ok(())
}

@@ -250,17 +253,13 @@ impl RequestHandler<HelperIdentity> for Inner {
)
}
RouteId::QueryInput => {
HelperResponse::from(
QueryInput::from_addr(req, data)
.ok_or(QueryInputError::BadRequest)
.and_then(|input| {
qp.receive_inputs(
Transport::clone_ref(&self.mpc_transport),
Transport::clone_ref(&self.shard_transport),
input,
)
})?
)
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.receive_inputs(
Transport::clone_ref(&self.mpc_transport),
Transport::clone_ref(&self.shard_transport),
query_id,
data,
)?)
}
RouteId::QueryStatus => {
let query_id = ext_query_id(&req)?;
6 changes: 6 additions & 0 deletions ipa-core/src/helpers/transport/query/mod.rs
Original file line number Diff line number Diff line change
@@ -185,6 +185,7 @@ impl RouteParams<RouteId, QueryId, NoStep> for &PrepareQuery {
}


/*
pub enum QueryInputRequest {
FromUrl {
query_id: QueryId,
@@ -194,6 +195,7 @@ pub enum QueryInputRequest {
query_id: QueryId,
},
}
*/

pub enum QueryInput {
FromUrl {
@@ -245,13 +247,15 @@ impl QueryInput {
}
}

/*
impl QueryInputRequest {
pub fn query_id(&self) -> QueryId {
match self {
Self::FromUrl { query_id, .. } | Self::Inline { query_id, ..} => *query_id,
}
}
}
*/

impl Debug for QueryInput {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@@ -271,6 +275,7 @@ impl Debug for QueryInput {
}
}

/*
impl RouteParams<RouteId, QueryId, NoStep> for QueryInputRequest {
type Params = String;

@@ -294,6 +299,7 @@ impl RouteParams<RouteId, QueryId, NoStep> for QueryInputRequest {
}
}
}
*/

#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Eq))]
6 changes: 4 additions & 2 deletions ipa-core/src/net/query_input.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ use hyper::Uri;
use hyper_rustls::HttpsConnectorBuilder;
use hyper_util::{client::legacy::Client, rt::{TokioExecutor, TokioTimer}};

use crate::{helpers::{query::QueryInput, BodyStream}, net::Error};
use crate::{helpers::BodyStream, net::Error};

/// Download query input from a remote URL.
async fn stream_query_input_from_url(uri: &Uri) -> Result<BodyStream, Error> {
pub async fn stream_query_input_from_url(uri: &Uri) -> Result<BodyStream, Error> {
let mut builder = Client::builder(TokioExecutor::new());
// the following timer is necessary for http2, in particular for any timeouts
// and waits the clients will need to make
@@ -43,9 +43,11 @@ async fn stream_query_input_from_url(uri: &Uri) -> Result<BodyStream, Error> {
Ok(BodyStream::from_bytes_stream(resp.into_body().map_err(BoxError::from).into_data_stream()))
}

/*
pub async fn stream_query_input(query_input: QueryInput) -> Result<BodyStream, Error> {
match query_input {
QueryInput::Inline { input_stream, .. } => Ok(input_stream),
QueryInput::FromUrl { url, .. } => stream_query_input_from_url(&url).await,
}
}
*/
38 changes: 9 additions & 29 deletions ipa-core/src/net/server/handlers/query/input.rs
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@ use axum::{extract::Path, routing::post, Extension, Router};
use hyper::StatusCode;

use crate::{
helpers::{query::QueryInputRequest, BodyStream},
net::{http_serde::{self, query::input::QueryInputUrl}, transport::MpcHttpTransport, Error},
helpers::{routing::RouteId, BodyStream},
net::{http_serde::{self, query::input::QueryInputUrl}, query_input::stream_query_input_from_url, transport::MpcHttpTransport, Error},
protocol::QueryId,
};

@@ -13,13 +13,13 @@ async fn handler(
input_url: QueryInputUrl,
input_stream: BodyStream,
) -> Result<(), Error> {
let query_input = if let Some(url) = input_url.into() {
QueryInputRequest::FromUrl { query_id, url }
let input_stream = if let Some(url) = input_url.into() {
stream_query_input_from_url(&url).await?
} else {
QueryInputRequest::Inline { query_id }
input_stream
};
let _ = transport
.dispatch(query_input, input_stream)
.dispatch((RouteId::QueryInput, query_id), input_stream)
.await
.map_err(|e| Error::application(StatusCode::INTERNAL_SERVER_ERROR, e))?;

@@ -81,29 +81,9 @@ mod tests {
assert_success_with(req, req_handler).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn input_from_url() {
let expected_query_id = QueryId;
let expected_url = "https://storage.example/ipa-reports";
let req = http_serde::query::input::Request::new(QueryInput::FromUrl {
query_id: expected_query_id,
url: expected_url.parse().unwrap(),
});
let req_handler = make_owned_handler(move |addr, _body| async move {
let RouteId::QueryInput = addr.route else {
panic!("unexpected call");
};

assert_eq!(addr.query_id, Some(expected_query_id));
assert_eq!(addr.params, expected_url);

Ok(HelperResponse::ok())
});
let req = req
.try_into_http_request(Scheme::HTTP, Authority::from_static("localhost"))
.unwrap();
assert_success_with(req, req_handler).await;
}
// It is not possible to test input from URL with this style of test, because these
// tests don't actually invoke the handler defined in this file, and that is where
// we handle URL input.

struct OverrideReq {
query_id: String,
9 changes: 5 additions & 4 deletions ipa-core/src/query/processor.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ use crate::{
error::Error as ProtocolError,
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, routing::RouteId, BodyStream, BroadcastError, Gateway, GatewayConfig, MpcTransportError, MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport
query::{CompareStatusRequest, PrepareQuery, QueryConfig}, routing::RouteId, BodyStream, BroadcastError, Gateway, GatewayConfig, MpcTransportError, MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport
},
hpke::{KeyRegistry, PrivateKeyOnly},
protocol::QueryId,
@@ -299,11 +299,12 @@ impl Processor {
&self,
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
input: QueryInput,
query_id: QueryId,
input_stream: BodyStream,
) -> Result<(), QueryInputError> {
let mut queries = self.queries.inner.lock().unwrap();
let query_id = input.query_id();
let input_stream = input.input_stream().unwrap_or_else(|| BodyStream::empty());
//let query_id = input.query_id();
//let input_stream = input.input_stream().unwrap_or_else(|| BodyStream::empty());
match queries.entry(query_id) {
Entry::Occupied(entry) => {
let state = entry.remove();