diff --git a/src/common/src/array/compact_chunk.rs b/src/common/src/array/compact_chunk.rs
index c009621af2135..2b93e28a0be7d 100644
--- a/src/common/src/array/compact_chunk.rs
+++ b/src/common/src/array/compact_chunk.rs
@@ -121,9 +121,6 @@ impl StreamChunkCompactor {
         let mut op_row_map: OpRowMap<'_, '_> = new_prehashed_map_with_capacity(estimate_size);
         for (hash_values, c) in &mut chunks {
             for (row, mut op_row) in c.to_rows_mut() {
-                if !op_row.vis() {
-                    continue;
-                }
                 op_row.set_op(op_row.op().normalize_update());
                 let hash = hash_values[row.index()];
                 let stream_key = row.project(&key_indices);
diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs
index 3175806257e7a..d072c65ad1826 100644
--- a/src/common/src/array/stream_chunk.rs
+++ b/src/common/src/array/stream_chunk.rs
@@ -481,14 +481,16 @@ impl StreamChunkMut {
     /// get the mut reference of the stream chunk.
     pub fn to_rows_mut(&mut self) -> impl Iterator<Item = (RowRef<'_>, OpRowMutRef<'_>)> {
         unsafe {
-            (0..self.vis.len()).map(|i| {
-                let p = self as *const StreamChunkMut;
-                let p = p as *mut StreamChunkMut;
-                (
-                    RowRef::with_columns(self.columns(), i),
-                    OpRowMutRef { c: &mut *p, i },
-                )
-            })
+            (0..self.vis.len())
+                .filter(|i| self.vis.is_set(*i))
+                .map(|i| {
+                    let p = self as *const StreamChunkMut;
+                    let p = p as *mut StreamChunkMut;
+                    (
+                        RowRef::with_columns(self.columns(), i),
+                        OpRowMutRef { c: &mut *p, i },
+                    )
+                })
         }
     }
 }
diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index 2a476ca4e94b8..5e417d922e1ae 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -20,9 +20,9 @@ use futures::{FutureExt, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use prometheus::Histogram;
+use risingwave_common::array::stream_chunk::StreamChunkMut;
 use risingwave_common::array::{merge_chunk_row, Op, StreamChunk};
 use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
-use risingwave_common::types::DataType;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_connector::dispatch_sink;
 use risingwave_connector::sink::catalog::SinkType;
@@ -33,7 +33,6 @@ use risingwave_connector::sink::{
 use super::error::{StreamExecutorError, StreamExecutorResult};
 use super::{BoxedExecutor, Executor, Message, PkIndices};
 use crate::common::log_store::{LogReader, LogStoreFactory, LogStoreReadItem, LogWriter};
-use crate::common::StreamChunkBuilder;
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream};
 
@@ -57,15 +56,16 @@ struct SinkMetrics {
 }
 
 // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
-fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk> {
-    let mut builder = StreamChunkBuilder::new(chunk.cardinality() + 1, data_types);
-    for (op, row_ref) in chunk.rows() {
-        if op == Op::Insert || op == Op::UpdateInsert {
-            let none = builder.append_row(Op::Insert, row_ref);
-            assert!(none.is_none());
+fn force_append_only(c: StreamChunk) -> StreamChunk {
+    let mut c: StreamChunkMut = c.into();
+    for (_, mut r) in c.to_rows_mut() {
+        match r.op() {
+            Op::Insert => {}
+            Op::Delete | Op::UpdateDelete => r.set_vis(false),
+            Op::UpdateInsert => r.set_op(Op::Insert),
         }
     }
-    builder.take()
+    c.into()
 }
 
 impl SinkExecutor {
@@ -117,7 +117,6 @@ impl SinkExecutor {
             self.input,
             self.pk_indices,
             self.log_writer,
-            self.input_columns.clone(),
             self.sink_param.sink_type,
             self.actor_context,
         );
@@ -139,17 +138,11 @@ impl SinkExecutor {
         input: BoxedExecutor,
         stream_key: PkIndices,
         mut log_writer: impl LogWriter,
-        columns: Vec<ColumnCatalog>,
         sink_type: SinkType,
         actor_context: ActorContextRef,
     ) {
         let mut input = input.execute();
 
-        let data_types = columns
-            .iter()
-            .map(|col| col.column_desc.data_type.clone())
-            .collect_vec();
-
         let barrier = expect_first_barrier(&mut input).await?;
 
         let epoch_pair = barrier.epoch;
@@ -169,21 +162,19 @@ impl SinkExecutor {
                     // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE
                     // V->V).
                     let chunk = merge_chunk_row(chunk, &stream_key);
-                    let visible_chunk = if sink_type == SinkType::ForceAppendOnly {
+                    let chunk = if sink_type == SinkType::ForceAppendOnly {
                         // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                         // user forces the sink to be append-only while it is actually not based on
                         // the frontend derivation result.
-                        force_append_only(chunk.clone(), data_types.clone())
+                        force_append_only(chunk)
                     } else {
-                        Some(chunk.clone().compact())
+                        chunk
                     };
 
-                    if let Some(chunk) = visible_chunk {
-                        log_writer.write_chunk(chunk.clone()).await?;
+                    log_writer.write_chunk(chunk.clone()).await?;
 
-                        // Use original chunk instead of the reordered one as the executor output.
-                        yield Message::Chunk(chunk);
-                    }
+                    // Use original chunk instead of the reordered one as the executor output.
+                    yield Message::Chunk(chunk);
                 }
                 Message::Barrier(barrier) => {
                     log_writer
@@ -425,7 +416,7 @@ mod test {
 
         let chunk_msg = executor.next().await.unwrap().unwrap();
         assert_eq!(
-            chunk_msg.into_chunk().unwrap(),
+            chunk_msg.into_chunk().unwrap().compact(),
            StreamChunk::from_pretty(
                 " I I I
                 + 3 2 1",
@@ -437,7 +428,7 @@ mod test {
 
         let chunk_msg = executor.next().await.unwrap().unwrap();
        assert_eq!(
-            chunk_msg.into_chunk().unwrap(),
+            chunk_msg.into_chunk().unwrap().compact(),
             StreamChunk::from_pretty(
                 " I I I
                 + 3 4 1
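
Reviewer note on the `to_rows_mut` change: filtering on the visibility bitmap means the iterator now yields only visible rows, which is exactly what lets the `op_row.vis()` check in `compact_chunk.rs` be deleted. Below is a minimal safe-Rust sketch of that contract; `MiniChunk`, `OpMut`, and `visible_indices` are hypothetical stand-ins for `StreamChunkMut`, `OpRowMutRef`, and the filtered iterator, not RisingWave code.

```rust
// Hypothetical sketch, not RisingWave code: only indices whose visibility
// bit is set are yielded, and each handle mutates op/vis for its own index.
struct MiniChunk {
    ops: Vec<char>, // stand-in for the Op column
    vis: Vec<bool>, // stand-in for the visibility bitmap
}

struct OpMut<'a> {
    chunk: &'a mut MiniChunk,
    i: usize,
}

impl OpMut<'_> {
    fn op(&self) -> char {
        self.chunk.ops[self.i]
    }
    fn set_op(&mut self, op: char) {
        self.chunk.ops[self.i] = op;
    }
    fn set_vis(&mut self, vis: bool) {
        self.chunk.vis[self.i] = vis;
    }
}

impl MiniChunk {
    // Mirrors the `.filter(|i| self.vis.is_set(*i))` added in this diff.
    // Yielding plain indices keeps this sketch in safe Rust; the real
    // to_rows_mut needs the unsafe raw-pointer re-borrow to hand out one
    // OpRowMutRef per index from a single `&mut self`.
    fn visible_indices(&self) -> impl Iterator<Item = usize> + '_ {
        (0..self.vis.len()).filter(move |&i| self.vis[i])
    }
}

fn main() {
    let mut c = MiniChunk {
        ops: vec!['U', 'U', '-'],
        vis: vec![true, false, true],
    };
    // Collect first so the shared borrow ends before the mutable handles.
    let visible: Vec<usize> = c.visible_indices().collect();
    for i in visible {
        let mut h = OpMut { chunk: &mut c, i };
        match h.op() {
            '-' => h.set_vis(false), // hide, like Delete / UpdateDelete
            _ => h.set_op('+'),      // rewrite, like UpdateInsert -> Insert
        }
    }
    // The invisible middle row is never visited.
    assert_eq!(c.ops, vec!['+', 'U', '-']);
    assert_eq!(c.vis, vec![true, false, false]);
}
```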
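Likewise, a test-style sketch of the new `force_append_only` semantics: rows are now dropped by clearing their visibility bit rather than by rebuilding the chunk through `StreamChunkBuilder`, so the output keeps its physical rows and equality checks need `compact()` first, which is why the two assertions in `mod test` change. This sketch assumes the `U-`/`U+` (UpdateDelete/UpdateInsert) op tokens of the `from_pretty` DSL used by the tests; it is illustrative, not part of the PR.

```rust
#[test]
fn force_append_only_hides_rows() {
    let chunk = StreamChunk::from_pretty(
        "  I I
        +  1 2
        -  3 4
        U- 5 6
        U+ 5 7",
    );
    // Delete/UpdateDelete rows get their visibility bit cleared and the
    // UpdateInsert becomes a plain Insert; no column data is copied.
    let out = force_append_only(chunk);
    // The hidden rows are still physically present, so compact() is needed
    // before comparing against a freshly built chunk.
    assert_eq!(
        out.compact(),
        StreamChunk::from_pretty(
            " I I
            + 1 2
            + 5 7",
        ),
    );
}
```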