feat(sink): do not compact chunk's vis in sink (#12428)
st1page authored Sep 19, 2023
1 parent c8ea5ee commit a6ab35f
Showing 3 changed files with 27 additions and 37 deletions.
3 changes: 0 additions & 3 deletions src/common/src/array/compact_chunk.rs
@@ -121,9 +121,6 @@ impl StreamChunkCompactor {
         let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
         for (hash_values, c) in &mut chunks {
             for (row, mut op_row) in c.to_rows_mut() {
-                if !op_row.vis() {
-                    continue;
-                }
                 op_row.set_op(op_row.op().normalize_update());
                 let hash = hash_values[row.index()];
                 let stream_key = row.project(&key_indices);
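(The guard removed here is not lost: the next file moves it into to_rows_mut itself; see the sketch after that diff.)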
18 changes: 10 additions & 8 deletions src/common/src/array/stream_chunk.rs
@@ -481,14 +481,16 @@ impl StreamChunkMut {
     /// get the mut reference of the stream chunk.
     pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
         unsafe {
-            (0..self.vis.len()).map(|i| {
-                let p = self as *const StreamChunkMut;
-                let p = p as *mut StreamChunkMut;
-                (
-                    RowRef::with_columns(self.columns(), i),
-                    OpRowMutRef { c: &mut *p, i },
-                )
-            })
+            (0..self.vis.len())
+                .filter(|i| self.vis.is_set(*i))
+                .map(|i| {
+                    let p = self as *const StreamChunkMut;
+                    let p = p as *mut StreamChunkMut;
+                    (
+                        RowRef::with_columns(self.columns(), i),
+                        OpRowMutRef { c: &mut *p, i },
+                    )
+                })
         }
     }
 }
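This filter is the behavioral core of the commit: to_rows_mut now yields only rows whose visibility bit is set, which is what lets the caller in compact_chunk.rs above drop its own op_row.vis() guard. Below is a minimal, self-contained sketch of the same pattern; ToyChunk and its Vec-based fields are invented stand-ins for the real StreamChunkMut, Bitmap, and OpRowMutRef types, not the actual API.

struct ToyChunk {
    vis: Vec<bool>, // stand-in for the visibility bitmap: false = hidden row
    ops: Vec<char>, // stand-in for the per-row ops
}

impl ToyChunk {
    // Yield (index, mutable op) for visible rows only, mirroring the
    // .filter(|i| self.vis.is_set(*i)) added to to_rows_mut above.
    fn rows_mut(&mut self) -> impl Iterator<Item = (usize, &mut char)> {
        let vis = self.vis.clone();
        self.ops
            .iter_mut()
            .enumerate()
            .filter(move |(i, _)| vis[*i])
    }
}

fn main() {
    let mut c = ToyChunk {
        vis: vec![true, false, true],
        ops: vec!['+', '-', '+'],
    };
    // The hidden middle row is skipped entirely.
    let visible: Vec<usize> = c.rows_mut().map(|(i, _)| i).collect();
    assert_eq!(visible, vec![0, 2]);
}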
43 changes: 17 additions & 26 deletions src/stream/src/executor/sink.rs
@@ -20,9 +20,9 @@ use futures::{FutureExt, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use prometheus::Histogram;
+use risingwave_common::array::stream_chunk::StreamChunkMut;
 use risingwave_common::array::{merge_chunk_row, Op, StreamChunk};
 use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
-use risingwave_common::types::DataType;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_connector::dispatch_sink;
 use risingwave_connector::sink::catalog::SinkType;
@@ -33,7 +33,6 @@ use risingwave_connector::sink::{
 use super::error::{StreamExecutorError, StreamExecutorResult};
 use super::{BoxedExecutor, Executor, Message, PkIndices};
 use crate::common::log_store::{LogReader, LogStoreFactory, LogStoreReadItem, LogWriter};
-use crate::common::StreamChunkBuilder;
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream};

@@ -57,15 +56,16 @@ struct SinkMetrics {
 }

 // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
-fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> {
-    let mut builder = StreamChunkBuilder::new(chunk.cardinality() + 1, data_types);
-    for (op, row_ref) in chunk.rows() {
-        if op == Op::Insert || op == Op::UpdateInsert {
-            let none = builder.append_row(Op::Insert, row_ref);
-            assert!(none.is_none());
+fn force_append_only(c: StreamChunk) -> StreamChunk {
+    let mut c: StreamChunkMut = c.into();
+    for (_, mut r) in c.to_rows_mut() {
+        match r.op() {
+            Op::Insert => {}
+            Op::Delete | Op::UpdateDelete => r.set_vis(false),
+            Op::UpdateInsert => r.set_op(Op::Insert),
         }
     }
-    builder.take()
+    c.into()
 }

 impl<F: LogStoreFactory> SinkExecutor<F> {
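force_append_only is where the commit title plays out. The old version copied the surviving rows into a fresh chunk through StreamChunkBuilder and returned Option<StreamChunk> (None when nothing survived); the new one converts the chunk into a StreamChunkMut and only rewrites ops and visibility bits in place, so dropped rows stay in the chunk but become invisible. A compilable toy model of that rewrite rule follows; the Op enum and the (op, visible) row pairs here are simplified stand-ins, not RisingWave's types.

#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

// Hiding a row instead of dropping it keeps the chunk's size and row order
// intact, which is the point of this commit.
fn force_append_only(rows: &mut [(Op, bool)]) {
    for (op, vis) in rows.iter_mut() {
        match *op {
            Op::Insert => {}
            Op::Delete | Op::UpdateDelete => *vis = false, // hide, don't drop
            Op::UpdateInsert => *op = Op::Insert,          // keep as a plain insert
        }
    }
}

fn main() {
    let mut rows = vec![
        (Op::Insert, true),
        (Op::UpdateDelete, true),
        (Op::UpdateInsert, true),
    ];
    force_append_only(&mut rows);
    assert_eq!(
        rows,
        vec![
            (Op::Insert, true),
            (Op::UpdateDelete, false), // still present, no longer visible
            (Op::Insert, true),
        ]
    );
}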
@@ -117,7 +117,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
             self.input,
             self.pk_indices,
             self.log_writer,
-            self.input_columns.clone(),
             self.sink_param.sink_type,
             self.actor_context,
         );
@@ -139,17 +138,11 @@
         input: BoxedExecutor,
         stream_key: PkIndices,
         mut log_writer: impl LogWriter,
-        columns: Vec<ColumnCatalog>,
         sink_type: SinkType,
         actor_context: ActorContextRef,
     ) {
         let mut input = input.execute();

-        let data_types = columns
-            .iter()
-            .map(|col| col.column_desc.data_type.clone())
-            .collect_vec();
-
         let barrier = expect_first_barrier(&mut input).await?;

         let epoch_pair = barrier.epoch;
@@ -169,21 +162,19 @@
                     // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
                     // V->V).
                     let chunk = merge_chunk_row(chunk, &stream_key);
-                    let visible_chunk = if sink_type == SinkType::ForceAppendOnly {
+                    let chunk = if sink_type == SinkType::ForceAppendOnly {
                         // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                         // user forces the sink to be append-only while it is actually not based on
                         // the frontend derivation result.
-                        force_append_only(chunk.clone(), data_types.clone())
+                        force_append_only(chunk)
                     } else {
-                        Some(chunk.clone().compact())
+                        chunk
                     };

-                    if let Some(chunk) = visible_chunk {
-                        log_writer.write_chunk(chunk.clone()).await?;
+                    log_writer.write_chunk(chunk.clone()).await?;

-                        // Use original chunk instead of the reordered one as the executor output.
-                        yield Message::Chunk(chunk);
-                    }
+                    // Use original chunk instead of the reordered one as the executor output.
+                    yield Message::Chunk(chunk);
                 }
                 Message::Barrier(barrier) => {
                     log_writer
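Note what happened to the non-append-only branch: the sink used to write chunk.clone().compact(), materializing a copy with every hidden row physically removed, and it now forwards the chunk with its visibility bitmap intact, leaving compaction to downstream consumers that actually need it. A rough sketch of the compaction step that was dropped, using a toy (op, visible) row representation like the one above:

// Toy model only: the real StreamChunk::compact also rebuilds the column
// arrays, not just the op list.
fn compact(rows: &[(char, bool)]) -> Vec<char> {
    rows.iter()
        .filter(|(_, visible)| *visible)
        .map(|(op, _)| *op)
        .collect()
}

fn main() {
    let rows = vec![('+', true), ('-', false), ('+', true)];
    // The hidden '-' row disappears only after compaction.
    assert_eq!(compact(&rows), vec!['+', '+']);
}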
@@ -425,7 +416,7 @@ mod test {

         let chunk_msg = executor.next().await.unwrap().unwrap();
         assert_eq!(
-            chunk_msg.into_chunk().unwrap(),
+            chunk_msg.into_chunk().unwrap().compact(),
             StreamChunk::from_pretty(
                 " I I I
                 + 3 2 1",
@@ -437,7 +428,7 @@

         let chunk_msg = executor.next().await.unwrap().unwrap();
         assert_eq!(
-            chunk_msg.into_chunk().unwrap(),
+            chunk_msg.into_chunk().unwrap().compact(),
             StreamChunk::from_pretty(
                 " I I I
                 + 3 4 1
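The test updates are the visible consequence of all this: since force_append_only now hides rows rather than removing them, the sink's output chunk can still physically contain the dropped DELETE/UPDATE rows, so the assertions call .compact() before comparing against the hand-written expected chunks.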
