From 7dbb1768426a8f79ddf8cd9e9eec5f225590aeac Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 26 Jul 2024 17:29:03 +0800 Subject: [PATCH] merge small chunks for sink executor --- src/stream/src/executor/sink.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 726fc3ccad55..ca4fea1fdde4 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -362,20 +362,32 @@ impl SinkExecutor { } else { chunks }; - let chunks = if re_construct_with_sink_pk { - StreamChunkCompactor::new(down_stream_pk.clone(), chunks) + if re_construct_with_sink_pk { + let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks) .reconstructed_compacted_chunks( chunk_size, input_data_types.clone(), sink_type != SinkType::ForceAppendOnly, - ) + ); + for c in chunks { + yield Message::Chunk(c); + } } else { - chunks + let mut chunk_builder = + StreamChunkBuilder::new(chunk_size, input_data_types.clone()); + for chunk in chunks { + for (op, row) in chunk.rows() { + if let Some(c) = chunk_builder.append_row(op, row) { + yield Message::Chunk(c); + } + } + } + + if let Some(c) = chunk_builder.take() { + yield Message::Chunk(c); + } }; - for c in chunks { - yield Message::Chunk(c); - } if let Some(w) = mem::take(&mut watermark) { yield Message::Watermark(w) }