diff --git a/Cargo.lock b/Cargo.lock index 1968f3e88403..6c33dc8acec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1219,6 +1219,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-batch" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f444c45a1cb86f2a7e301469fd50a82084a60dadc25d94529a8312276ecb71a" +dependencies = [ + "futures", + "futures-timer", + "pin-utils", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -1290,6 +1301,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -1592,6 +1609,7 @@ dependencies = [ "crdts", "criterion", "futures", + "futures-batch", "getrandom 0.2.8", "hdrhistogram", "hydroflow_cli_integration", diff --git a/hydro_cli/src/core/hydroflow_crate/ports.rs b/hydro_cli/src/core/hydroflow_crate/ports.rs index 3f492e47522c..8583a72e2c99 100644 --- a/hydro_cli/src/core/hydroflow_crate/ports.rs +++ b/hydro_cli/src/core/hydroflow_crate/ports.rs @@ -526,7 +526,6 @@ impl ServerConfig { } ServerConfig::MergeSelect(underlying, key) => { - dbg!(underlying); let key = *key; underlying .load_instantiated( diff --git a/hydro_cli/src/lib.rs b/hydro_cli/src/lib.rs index 4d98f4f4b949..49df422cf2cd 100644 --- a/hydro_cli/src/lib.rs +++ b/hydro_cli/src/lib.rs @@ -30,8 +30,9 @@ struct SafeCancelToken { impl SafeCancelToken { fn safe_cancel(&mut self) { if let Some(token) = self.cancel_tx.take() { - eprintln!("Received cancellation, cleaning up..."); - token.send(()).unwrap(); + if token.send(()).is_ok() { + eprintln!("Received cancellation, cleaning up..."); + } } else { eprintln!("Already received cancellation, please be patient!"); } @@ -52,13 +53,16 @@ async def coroutine_to_safely_cancellable(c, cancel_token): while True: try: ok, cancel = await asyncio.shield(c) + is_done = True except asyncio.CancelledError: cancel_token.safe_cancel() + is_done = False - if not cancel: - return ok - else: - raise asyncio.CancelledError() + if is_done: + if not cancel: + return ok + else: + raise asyncio.CancelledError() "#, "coro_converter", "coro_converter", diff --git a/hydro_cli_examples/examples/dedalus_sender/main.rs b/hydro_cli_examples/examples/dedalus_sender/main.rs index cb2ff4a3803f..111e4ef73037 100644 --- a/hydro_cli_examples/examples/dedalus_sender/main.rs +++ b/hydro_cli_examples/examples/dedalus_sender/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use hydroflow::{ tokio_stream::wrappers::IntervalStream, util::{ @@ -26,12 +28,15 @@ async fn main() { (format!("world {sender_i}"),), ]; + let batch_size = 8; + let batch_delay = Duration::from_millis(1); + let df = datalog!( r#" .input repeated `repeat_iter_external(to_repeat.iter().cloned())` .input periodic `source_stream(periodic) -> map(|_| ())` .input peers `repeat_iter(peers.clone()) -> map(|p| (p,))` - .async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(broadcast_sink)` `null::<(String,)>()` + .async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink_chunked(broadcast_sink, batch_size, batch_delay)` `null::<(String,)>()` broadcast@n(x) :~ repeated(x), periodic(), peers(n) "# diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index 941661e0839b..054a0d3de8d6 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [features] default = [ "async", "macros" ] -async = [ "futures" ] +async = [ "futures", "futures-batch" ] macros = [ "hydroflow_macro", "hydroflow_datalog" ] hydroflow_macro = [ "dep:hydroflow_macro" ] hydroflow_datalog = [ "dep:hydroflow_datalog" ] @@ -17,6 +17,7 @@ bincode = "1.3" byteorder = "1.4.3" bytes = "1.1.0" futures = { version = "0.3", optional = true } +futures-batch = { version = "0.6.1", optional = true } hydroflow_datalog = { optional = true, path = "../hydroflow_datalog" } hydroflow_lang = { path = "../hydroflow_lang" } hydroflow_macro = { optional = true, path = "../hydroflow_macro" } diff --git a/hydroflow/src/lib.rs b/hydroflow/src/lib.rs index 712bd02e54cb..724928f67bfc 100644 --- a/hydroflow/src/lib.rs +++ b/hydroflow/src/lib.rs @@ -14,6 +14,7 @@ pub mod util; pub use bincode; pub use bytes; pub use futures; +pub use futures_batch; pub use pusherator; pub use rustc_hash; pub use serde; diff --git a/hydroflow/src/util/mod.rs b/hydroflow/src/util/mod.rs index 45bf9b239349..70f8125f4fb6 100644 --- a/hydroflow/src/util/mod.rs +++ b/hydroflow/src/util/mod.rs @@ -18,9 +18,10 @@ pub mod cli; use std::net::SocketAddr; use std::task::{Context, Poll}; +use std::time::Duration; use bincode; -use futures::Stream; +use futures::{Sink, SinkExt, Stream}; use serde::{Deserialize, Serialize}; pub fn unbounded_channel() -> ( @@ -131,6 +132,36 @@ where slice.sort_unstable_by(|a, b| f(a).cmp(f(b))) } +pub fn batched_sink + Send + 'static>( + s: S, + cap: usize, + timeout: Duration, +) -> impl Sink + Unpin { + let (send, recv) = tokio::sync::mpsc::unbounded_channel::(); + + use futures::{stream, StreamExt}; + use futures_batch::ChunksTimeoutStreamExt; + + tokio::spawn(async move { + let recv_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(recv); + let mut batched_recv = recv_stream.chunks_timeout(cap, timeout); + let mut s = Box::pin(s); + + while let Some(batch) = batched_recv.next().await { + if s.send_all(&mut stream::iter(batch).map(|v| Ok(v))) + .await + .is_err() + { + panic!("Batched sink failed") + } + } + }); + + Box::pin(futures::sink::unfold(send, |send, item| async move { + send.send(item).map(|_| send).map_err(|_| ()) + })) +} + #[cfg(test)] mod test { use super::*; diff --git a/hydroflow_lang/src/graph/ops/dest_sink_chunked.rs b/hydroflow_lang/src/graph/ops/dest_sink_chunked.rs new file mode 100644 index 000000000000..426af5b625c0 --- /dev/null +++ b/hydroflow_lang/src/graph/ops/dest_sink_chunked.rs @@ -0,0 +1,98 @@ +use super::{make_missing_runtime_msg, FlowProperties, FlowPropertyVal}; + +use super::{ + OperatorConstraints, OperatorInstance, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1, +}; + +use quote::quote_spanned; + +/// The same as `dest_sink`, but takes two additional parameters controlling +/// when the data is actually flushed. +#[hydroflow_internalmacro::operator_docgen] +pub const DEST_SINK_CHUNKED: OperatorConstraints = OperatorConstraints { + name: "dest_sink_chunked", + hard_range_inn: RANGE_1, + soft_range_inn: RANGE_1, + hard_range_out: RANGE_0, + soft_range_out: RANGE_0, + num_args: 3, + persistence_args: RANGE_0, + type_args: RANGE_0, + is_external_input: false, + ports_inn: None, + ports_out: None, + properties: FlowProperties { + deterministic: FlowPropertyVal::Preserve, + monotonic: FlowPropertyVal::Preserve, + inconsistency_tainted: false, + }, + input_delaytype_fn: |_| None, + write_fn: |wc @ &WriteContextArgs { + root, + hydroflow, + op_span, + ident, + op_name, + op_inst: OperatorInstance { arguments, .. }, + .. + }, + _| { + let sink_arg = &arguments[0]; + let chunk_size_arg = &arguments[1]; + let chunk_delay_arg = &arguments[2]; + + let send_ident = wc.make_ident("item_send"); + let recv_ident = wc.make_ident("item_recv"); + + let missing_runtime_msg = make_missing_runtime_msg(op_name); + + let write_prologue = quote_spanned! {op_span=> + let (#send_ident, #recv_ident) = #root::tokio::sync::mpsc::unbounded_channel(); + { + /// Function is needed so `Item` is so no ambiguity for what `Item` is used + /// when calling `.flush()`. + async fn sink_feed_flush( + recv: #root::tokio::sync::mpsc::UnboundedReceiver, + mut sink: Sink, + chunk_size: usize, + delay: ::std::time::Duration, + ) where + Sink: ::std::marker::Unpin + #root::futures::Sink, + Sink::Error: ::std::fmt::Debug, + { + use #root::futures::SinkExt; + use #root::futures::StreamExt; + use #root::futures_batch::ChunksTimeoutStreamExt; + + let recv_stream = #root::tokio_stream::wrappers::UnboundedReceiverStream::new(recv); + let mut batched_recv = ::std::boxed::Box::pin(recv_stream.chunks_timeout(chunk_size, delay)); + + while let Some(batch) = batched_recv.next().await { + for item in batch { + sink.feed(item) + .await + .expect("Error processing async sink item."); + } + + sink.flush().await.expect("Failed to flush sink."); + } + } + #hydroflow + .spawn_task(sink_feed_flush(#recv_ident, #sink_arg, #chunk_size_arg, #chunk_delay_arg)) + .expect(#missing_runtime_msg); + } + }; + + let write_iterator = quote_spanned! {op_span=> + let #ident = #root::pusherator::for_each::ForEach::new(|item| { + #send_ident.send(item).expect("Failed to send async write item for processing."); + }); + }; + + Ok(OperatorWriteOutput { + write_prologue, + write_iterator, + ..Default::default() + }) + }, +}; diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index bbbf5696b565..59a10dff3fd3 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -232,6 +232,7 @@ declare_ops![ demux::DEMUX, dest_file::DEST_FILE, dest_sink::DEST_SINK, + dest_sink_chunked::DEST_SINK_CHUNKED, dest_sink_serde::DEST_SINK_SERDE, difference::DIFFERENCE, filter::FILTER,