From f8adc74918551c383aaa54ba98a80c45adc8f32a Mon Sep 17 00:00:00 2001 From: Dylan Date: Mon, 29 Jul 2024 21:28:31 +0800 Subject: [PATCH] feat(sink): merge small chunks for sink executor (#17825) --- src/stream/src/executor/sink.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index dc280cd94f1f9..708460c907f35 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -339,20 +339,32 @@ impl SinkExecutor { } else { chunks }; - let chunks = if re_construct_with_sink_pk { - StreamChunkCompactor::new(down_stream_pk.clone(), chunks) + if re_construct_with_sink_pk { + let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks) .reconstructed_compacted_chunks( chunk_size, input_data_types.clone(), sink_type != SinkType::ForceAppendOnly, - ) + ); + for c in chunks { + yield Message::Chunk(c); + } } else { - chunks + let mut chunk_builder = + StreamChunkBuilder::new(chunk_size, input_data_types.clone()); + for chunk in chunks { + for (op, row) in chunk.rows() { + if let Some(c) = chunk_builder.append_row(op, row) { + yield Message::Chunk(c); + } + } + } + + if let Some(c) = chunk_builder.take() { + yield Message::Chunk(c); + } }; - for c in chunks { - yield Message::Chunk(c); - } if let Some(w) = mem::take(&mut watermark) { yield Message::Watermark(w) }