Use stream buffering in report collector #1504

Open · wants to merge 2 commits into base: main

Changes from 1 commit
10 changes: 7 additions & 3 deletions ipa-core/src/bin/report_collector.rs
@@ -5,6 +5,7 @@
fs::{File, OpenOptions},
io,
io::{stdout, BufReader, Write},
num::NonZeroUsize,
ops::Deref,
path::{Path, PathBuf},
};
@@ -15,8 +16,8 @@
cli::{
playbook::{
make_clients, make_sharded_clients, playbook_oprf_ipa, run_hybrid_query_and_validate,
run_query_and_validate, validate, validate_dp, HybridQueryResult, InputSource,
RoundRobinSubmission, StreamingSubmission,
run_query_and_validate, validate, validate_dp, BufferedRoundRobinSubmission,
HybridQueryResult, InputSource, StreamingSubmission,
},
CsvSerializer, IpaQueryResult, Verbosity,
},
@@ -430,6 +431,9 @@
count: usize,
set_fixed_polling_ms: Option<u64>,
) -> Result<(), Box<dyn Error>> {
// twice the size of the TCP MSS. This may get thrown off if TCP options are used,
// which is not in our control, but hopefully fragmentation is not too bad
const BUF_SIZE: NonZeroUsize = NonZeroUsize::new(1072).unwrap();
**Collaborator (Author):**

@andyleiserson do you have any suggestion on picking the buffer size?

**Collaborator:**

The tokio and std buffered I/O helpers use 8 kB, so maybe use that? I don't think it's necessary to try to match this to the TCP MSS; I might not even expose the buffer size from `BufferedBytesStream` until we identify a need to tune it for individual uses.

**Collaborator:**

I think what's important is that the kernel has at least a full packet's worth of data, so that it can send full packets.
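To make the trade-off in this thread concrete, here is a simplified, synchronous sketch of the re-chunking behavior being discussed (the real `BufferedBytesStream` is an async `Stream`, and `rechunk` is an illustrative name, not part of the codebase): whatever `BUF_SIZE` is chosen, every chunk except possibly the last comes out exactly that size, so the kernel usually has a full buffer's worth of data to send.

```rust
// Simplified, synchronous sketch (not the PR's async implementation) of
// buffered re-chunking: coalesce arbitrarily sized input slices into
// fixed-size chunks. Only the final chunk may be smaller than `buf_size`.
fn rechunk(inputs: &[&[u8]], buf_size: usize) -> Vec<Vec<u8>> {
    assert!(buf_size > 0);
    let mut out = Vec::new();
    let mut buf: Vec<u8> = Vec::with_capacity(buf_size);
    for input in inputs {
        for &b in *input {
            buf.push(b);
            if buf.len() == buf_size {
                // release a full chunk and start filling a fresh buffer
                out.push(std::mem::take(&mut buf));
            }
        }
    }
    if !buf.is_empty() {
        out.push(buf); // trailing partial chunk
    }
    out
}
```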

let query_type = QueryType::MaliciousHybrid(hybrid_query_config);

let [h1_streams, h2_streams, h3_streams] = [
@@ -439,7 +443,7 @@
]
.map(|path| {
let file = File::open(path).unwrap_or_else(|e| panic!("unable to open file {path:?}. {e}"));
RoundRobinSubmission::new(BufReader::new(file))
BufferedRoundRobinSubmission::new(BufReader::new(file), BUF_SIZE)

> Codecov / codecov/patch — ipa-core/src/bin/report_collector.rs#L446: Added line #L446 was not covered by tests.
})
.map(|s| s.into_byte_streams(args.shard_count));

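The call above splits one newline-delimited input file into `args.shard_count` byte streams. A minimal sketch of the round-robin assignment performed by `into_byte_streams` (`round_robin_lines` is an illustrative helper, not the actual API):

```rust
// Sketch of round-robin sharding of newline-delimited input: line i goes to
// shard i % count. Illustrative only; the real code streams hex-decoded bytes.
fn round_robin_lines(input: &str, count: usize) -> Vec<Vec<String>> {
    assert!(count > 0);
    let mut shards: Vec<Vec<String>> = vec![Vec::new(); count];
    for (i, line) in input.lines().enumerate() {
        shards[i % count].push(line.to_string());
    }
    shards
}
```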
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/mod.rs
@@ -22,7 +22,7 @@ use tokio::time::sleep;
pub use self::{
hybrid::{run_hybrid_query_and_validate, HybridQueryResult},
ipa::{playbook_oprf_ipa, run_query_and_validate},
streaming::{RoundRobinSubmission, StreamingSubmission},
streaming::{BufferedRoundRobinSubmission, StreamingSubmission},
};
use crate::{
cli::config_parse::HelperNetworkConfigParseExt,
155 changes: 144 additions & 11 deletions ipa-core/src/cli/playbook/streaming.rs
@@ -1,5 +1,6 @@
use std::{
io::BufRead,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll, Waker},
};
@@ -9,7 +10,7 @@ use futures::Stream;

use crate::{
error::BoxError,
helpers::BytesStream,
helpers::{BufferedBytesStream, BytesStream},
sync::{Arc, Mutex},
};

@@ -20,6 +21,34 @@ pub trait StreamingSubmission {
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream>;
}

/// Same as [`RoundRobinSubmission`] but buffers each destination stream
/// until it accumulates at least `buf_size` bytes of data
pub struct BufferedRoundRobinSubmission<R> {
inner: R,
buf_size: NonZeroUsize,
}

impl<R: BufRead> BufferedRoundRobinSubmission<R> {
/// Creates a new instance with the specified buffer size. All streams created
/// using [`StreamingSubmission::into_byte_streams`] will have their own buffer set.
pub fn new(read_from: R, buf_size: NonZeroUsize) -> Self {
Self {
inner: read_from,
buf_size,
}
}
}

impl<R: BufRead + Send> StreamingSubmission for BufferedRoundRobinSubmission<R> {
fn into_byte_streams(self, count: usize) -> Vec<impl BytesStream> {
RoundRobinSubmission::new(self.inner)
.into_byte_streams(count)
.into_iter()
.map(|s| BufferedBytesStream::new(s, self.buf_size))
.collect()
}
}

/// Round-Robin strategy to read off the provided buffer
/// and distribute them. Inputs is expected to be hex-encoded
/// and delimited by newlines. The output streams will have
@@ -149,6 +178,7 @@ impl<R: BufRead> State<R> {
#[cfg(all(test, unit_test))]
mod tests {
use std::{
collections::HashSet,
fs::File,
io::{BufReader, Write},
iter,
@@ -159,24 +189,98 @@ mod tests {
use tempfile::TempDir;

use crate::{
cli::playbook::streaming::{RoundRobinSubmission, StreamingSubmission},
cli::playbook::streaming::{
BufferedRoundRobinSubmission, RoundRobinSubmission, StreamingSubmission,
},
helpers::BytesStream,
test_executor::run,
};

async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
async fn drain_all_buffered<S: BytesStream>(
streams: Vec<S>,
buf_size: Option<usize>,
) -> Vec<Vec<u8>> {
let mut futs = FuturesOrdered::default();
for s in streams {
futs.push_back(s.try_fold(String::new(), |mut acc, chunk| async move {
// strip the length prefix (undo the length-delimited encoding)
let len = usize::from(u16::from_le_bytes(chunk[..2].try_into().unwrap()));
assert_eq!(len, chunk.len() - 2);
acc.push_str(&String::from_utf8_lossy(&chunk[2..]));
Ok(acc)
}));
futs.push_back(s.try_fold(
(Vec::new(), HashSet::new(), 0, 0),
|(mut acc, mut sizes, mut leftover, mut pending_len), mut chunk| async move {
// keep track of chunk sizes we've seen from the stream. Only the last chunk
// may have a size that is not equal to `buf_size`
sizes.insert(chunk.len());

// if we have a leftover from previous buffer, push it first
if leftover > 0 {
let next_chunk = std::cmp::min(leftover, chunk.len());
leftover -= next_chunk;
acc.extend(&chunk.split_to(next_chunk));
}

while !chunk.is_empty() {
// strip the length prefix (undo the length-delimited encoding)
let len = if pending_len > 0 {
// len (2-byte value) can be fragmented as well
let next_byte =
u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
let r = u16::from_le_bytes([pending_len, next_byte]);
pending_len = 0;
r
} else if chunk.len() > 1 {
let len =
u16::from_le_bytes(chunk.split_to(2).as_ref().try_into().unwrap());
len
} else {
pending_len =
u8::from_le_bytes(chunk.split_to(1).as_ref().try_into().unwrap());
assert!(chunk.is_empty());
break;
};

let len = usize::from(len);

// the next item may span across multiple buffers
let take_len = if len > chunk.len() {
leftover = len - chunk.len();
chunk.len()
} else {
len
};
acc.extend(&chunk.split_to(take_len));
}

Ok((acc, sizes, leftover, pending_len))
},
));
}
futs.try_collect::<Vec<_>>()
.await
.unwrap()
.into_iter()
.map(|(s, sizes, leftover, pending_len)| {
assert_eq!(0, leftover);
assert_eq!(0, pending_len);

// Only the final chunk may be smaller than `buf_size`, so we can see at
// most two distinct chunk sizes. If there is more than one, `buf_size`
// itself must be among them.
if let Some(buf_size) = buf_size {
assert!(sizes.len() <= 2);
if sizes.len() > 1 {
assert!(sizes.contains(&buf_size));
}
}

s
})
.collect()
}

futs.try_collect::<Vec<_>>().await.unwrap()
async fn drain_all<S: BytesStream>(streams: Vec<S>) -> Vec<String> {
drain_all_buffered(streams, None)
.await
.into_iter()
.map(|v| String::from_utf8_lossy(&v).to_string())
.collect()
}

fn encoded<I: IntoIterator<Item: AsRef<[u8]>>>(input: I) -> Vec<String> {
@@ -188,6 +292,12 @@
run(|| verify_one(vec!["foo", "bar", "baz", "qux", "quux"], 3));
}

#[test]
fn basic_buffered() {
run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 1, 1));
run(|| verify_buffered(vec!["foo", "bar", "baz", "qux", "quux"], 3, 5));
}

#[test]
#[should_panic(expected = "InvalidHexCharacter")]
fn non_hex() {
@@ -272,12 +382,35 @@
assert_eq!(expected, drain_all(streams).await);
}

/// The reason we work with bytes is that a string character may span multiple
/// bytes, so [`String::from_utf8`] would behave incorrectly here: UTF-8
/// decoding is not commutative with buffering.
async fn verify_buffered<R: AsRef<[u8]>>(input: Vec<R>, count: usize, buf_size: usize) {
assert!(count > 0);
let data = encoded(input.iter().map(AsRef::as_ref)).join("\n");
let streams =
BufferedRoundRobinSubmission::new(data.as_bytes(), buf_size.try_into().unwrap())
.into_byte_streams(count);
let mut expected: Vec<Vec<u8>> = vec![vec![]; count];
for (i, next) in input.into_iter().enumerate() {
expected[i % count].extend(next.as_ref());
}
assert_eq!(expected, drain_all_buffered(streams, Some(buf_size)).await);
}

proptest! {
#[test]
fn proptest_round_robin(input: Vec<String>, count in 1_usize..953) {
run(move || async move {
verify_one(input, count).await;
});
}

#[test]
fn proptest_round_robin_buffered(input: Vec<Vec<u8>>, count in 1_usize..953, buf_size in 1_usize..1024) {
run(move || async move {
verify_buffered(input, count, buf_size).await;
});
}
}
}
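The test helpers above decode records framed with a 2-byte little-endian length prefix (what the comments loosely call "RLE"). A standalone sketch of that framing, with illustrative function names (not the PR's API):

```rust
// Sketch of 2-byte little-endian length-prefix framing, as decoded by the
// `drain_all_buffered` test helper. Names are illustrative only.
fn encode_records(records: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    for r in records {
        let len = u16::try_from(r.len()).expect("record longer than u16::MAX");
        out.extend_from_slice(&len.to_le_bytes()); // 2-byte length prefix
        out.extend_from_slice(r);                  // payload
    }
    out
}

fn decode_records(mut bytes: &[u8]) -> Vec<Vec<u8>> {
    let mut out = Vec::new();
    while !bytes.is_empty() {
        // read the 2-byte length, then that many payload bytes
        let len = usize::from(u16::from_le_bytes([bytes[0], bytes[1]]));
        out.push(bytes[2..2 + len].to_vec());
        bytes = &bytes[2 + len..];
    }
    out
}
```

The test has to handle the harder case where buffering splits a frame (or even the 2-byte prefix itself) across chunk boundaries; this sketch assumes the whole encoded input is available at once.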
10 changes: 5 additions & 5 deletions ipa-core/src/helpers/mod.rs
@@ -75,11 +75,11 @@ pub use transport::{
InMemoryTransportError,
};
pub use transport::{
make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BytesStream,
HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity, LengthDelimitedStream,
LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding, ReceiveRecords,
RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding, StreamCollection,
StreamKey, Transport, WrappedBoxBodyStream,
make_owned_handler, query, routing, ApiError, BodyStream, BroadcastError, BufferedBytesStream,
BytesStream, HandlerBox, HandlerRef, HelperResponse, Identity as TransportIdentity,
LengthDelimitedStream, LogErrors, NoQueryId, NoResourceIdentifier, NoStep, QueryIdBinding,
ReceiveRecords, RecordsStream, RequestHandler, RouteParams, SingleRecordStream, StepBinding,
StreamCollection, StreamKey, Transport, WrappedBoxBodyStream,
};
use typenum::{Const, ToUInt, Unsigned, U8};
use x25519_dalek::PublicKey;
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/transport/mod.rs
@@ -35,8 +35,8 @@ pub use receive::{LogErrors, ReceiveRecords};
#[cfg(feature = "web-app")]
pub use stream::WrappedAxumBodyStream;
pub use stream::{
BodyStream, BytesStream, LengthDelimitedStream, RecordsStream, SingleRecordStream,
StreamCollection, StreamKey, WrappedBoxBodyStream,
BodyStream, BufferedBytesStream, BytesStream, LengthDelimitedStream, RecordsStream,
SingleRecordStream, StreamCollection, StreamKey, WrappedBoxBodyStream,
};

/// An identity of a peer that can be communicated with using [`Transport`]. There are currently two
10 changes: 5 additions & 5 deletions ipa-core/src/helpers/transport/stream/buffered.rs
@@ -6,7 +6,7 @@ use std::{
};

use bytes::Bytes;
use futures::Stream;
use futures::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;

use crate::helpers::BytesStream;
@@ -19,7 +19,7 @@ use crate::helpers::BytesStream;
pub struct BufferedBytesStream<S> {
/// Inner stream to poll
#[pin]
inner: S,
inner: Fuse<S>,
/// Buffer of bytes pending release
buffer: Vec<u8>,
/// Number of bytes released per single poll.
@@ -28,10 +28,10 @@
sz: usize,
}

impl<S> BufferedBytesStream<S> {
fn new(inner: S, buf_size: NonZeroUsize) -> Self {
impl<S: BytesStream> BufferedBytesStream<S> {
pub fn new(inner: S, buf_size: NonZeroUsize) -> Self {
Self {
inner,
inner: inner.fuse(),
buffer: Vec::with_capacity(buf_size.get()),
sz: buf_size.get(),
}
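The switch to `Fuse<S>` guards against polling the inner stream after it has already finished: `BufferedBytesStream` may still hold buffered bytes to flush on later polls, and a plain `Stream`, like a plain `Iterator`, makes no guarantee about behavior after exhaustion. A std-only analogy using `Iterator::fuse` and a deliberately misbehaving iterator (illustrative, not from the PR):

```rust
// A deliberately misbehaving iterator: after returning None once, it yields
// again. `Stream` gives the same weak post-completion guarantee, which is
// why BufferedBytesStream wraps its inner stream in Fuse.
struct Flaky {
    exhausted: bool,
}

impl Iterator for Flaky {
    type Item = u8;
    fn next(&mut self) -> Option<u8> {
        self.exhausted = !self.exhausted;
        if self.exhausted {
            None // reports exhaustion...
        } else {
            Some(42) // ...but then produces an item again
        }
    }
}
```

`Fuse` remembers the first `None` and returns `None` forever after, so the wrapper can poll freely while draining its buffer.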
2 changes: 1 addition & 1 deletion ipa-core/src/helpers/transport/stream/mod.rs
@@ -1,7 +1,6 @@
#[cfg(feature = "web-app")]
mod axum_body;
mod box_body;
#[allow(dead_code)]
mod buffered;
mod collection;
mod input;
@@ -14,6 +13,7 @@ use std::{
#[cfg(feature = "web-app")]
pub use axum_body::WrappedAxumBodyStream;
pub use box_body::WrappedBoxBodyStream;
pub use buffered::BufferedBytesStream;
use bytes::Bytes;
pub use collection::{StreamCollection, StreamKey};
use futures::{stream::iter, Stream};