Skip to content

Commit

Permalink
feat(sink): merge small chunks for sink executor (#17825)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored and chenzl25 committed Jul 29, 2024
1 parent 9f3af5a commit f8adc74
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,20 +339,32 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
} else {
chunks
};
let chunks = if re_construct_with_sink_pk {
StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
if re_construct_with_sink_pk {
let chunks = StreamChunkCompactor::new(down_stream_pk.clone(), chunks)
.reconstructed_compacted_chunks(
chunk_size,
input_data_types.clone(),
sink_type != SinkType::ForceAppendOnly,
)
);
for c in chunks {
yield Message::Chunk(c);
}
} else {
chunks
let mut chunk_builder =
StreamChunkBuilder::new(chunk_size, input_data_types.clone());
for chunk in chunks {
for (op, row) in chunk.rows() {
if let Some(c) = chunk_builder.append_row(op, row) {
yield Message::Chunk(c);
}
}
}

if let Some(c) = chunk_builder.take() {
yield Message::Chunk(c);
}
};

for c in chunks {
yield Message::Chunk(c);
}
if let Some(w) = mem::take(&mut watermark) {
yield Message::Watermark(w)
}
Expand Down

0 comments on commit f8adc74

Please sign in to comment.