diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index a07757a5510a9..b0fcc5c246ee9 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -68,7 +68,9 @@ impl LogWriter for KvLogStoreWriter { } async fn write_chunk(&mut self, chunk: StreamChunk) -> LogStoreResult<()> { - assert!(chunk.cardinality() > 0); + if chunk.cardinality() == 0 { + return Ok(()); + } let epoch = self.state_store.epoch(); let start_seq_id = self.seq_id; self.seq_id += chunk.cardinality() as SeqIdType; diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 14394073e8df7..11a5ae11e120a 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::mem; use std::sync::Arc; use anyhow::anyhow; @@ -20,7 +21,7 @@ use futures::{FutureExt, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::stream_chunk::StreamChunkMut; -use risingwave_common::array::{merge_chunk_row, Op, StreamChunk}; +use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkCompactor}; use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::dispatch_sink; @@ -65,6 +66,19 @@ fn force_append_only(c: StreamChunk) -> StreamChunk { c.into() } +// Drop all the INSERT messages in this chunk and convert UPDATE DELETE into DELETE. +fn force_delete_only(c: StreamChunk) -> StreamChunk { + let mut c: StreamChunkMut = c.into(); + for (_, mut r) in c.to_rows_mut() { + match r.op() { + Op::Delete => {} + Op::Insert | Op::UpdateInsert => r.set_vis(false), + Op::UpdateDelete => r.set_op(Op::Delete), + } + } + c.into() +} + impl SinkExecutor { #[allow(clippy::too_many_arguments)] pub async fn new( @@ -101,12 +115,21 @@ impl SinkExecutor { } fn execute_inner(self) -> BoxedMessageStream { + let stream_key = self.pk_indices; + + let stream_key_sink_pk_mismatch = { + stream_key + .iter() + .any(|i| !self.sink_param.downstream_pk.contains(i)) + }; + let write_log_stream = Self::execute_write_log( self.input, - self.pk_indices, + stream_key, self.log_writer, self.sink_param.sink_type, self.actor_context, + stream_key_sink_pk_mismatch, ); dispatch_sink!(self.sink, sink, { @@ -127,6 +150,7 @@ impl SinkExecutor { mut log_writer: impl LogWriter, sink_type: SinkType, actor_context: ActorContextRef, + stream_key_sink_pk_mismatch: bool, ) { let mut input = input.execute(); @@ -141,36 +165,115 @@ impl SinkExecutor { // Propagate the first barrier yield Message::Barrier(barrier); - #[for_await] - for msg in input { - match msg? { - Message::Watermark(w) => yield Message::Watermark(w), - Message::Chunk(chunk) => { - // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE - // V->V). - let chunk = merge_chunk_row(chunk, &stream_key); - let chunk = if sink_type == SinkType::ForceAppendOnly { - // Force append-only by dropping UPDATE/DELETE messages. We do this when the - // user forces the sink to be append-only while it is actually not based on - // the frontend derivation result. - force_append_only(chunk) - } else { - chunk - }; - - log_writer.write_chunk(chunk.clone()).await?; - - // Use original chunk instead of the reordered one as the executor output. - yield Message::Chunk(chunk); + // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order + // stream key: a,b + // sink pk: a + + // original: + // (1,1) -> (1,2) + // (1,2) -> (1,3) + + // mv fragment 1: + // delete (1,1) + + // mv fragment 2: + // insert (1,2) + // delete (1,2) + + // mv fragment 3: + // insert (1,3) + + // merge to sink fragment: + // insert (1,3) + // insert (1,2) + // delete (1,2) + // delete (1,1) + // So we do additional compaction in the sink executor per barrier. + + // 1. compact all the chanes with the stream key. + // 2. sink all the delete events and then sink all insert evernt. + + // after compacting with the stream key, the two event with the same used defined sink pk must have different stream key. + // So the delete event is not to delete the inserted record in our internal streaming SQL semantic. + if stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly { + let mut chunk_buffer = StreamChunkCompactor::new(stream_key.clone()); + let mut watermark = None; + #[for_await] + for msg in input { + match msg? { + Message::Watermark(w) => watermark = Some(w), + Message::Chunk(c) => { + chunk_buffer.push_chunk(c); + } + Message::Barrier(barrier) => { + let mut delete_chunks = vec![]; + let mut insert_chunks = vec![]; + for c in mem::replace( + &mut chunk_buffer, + StreamChunkCompactor::new(stream_key.clone()), + ) + .into_compacted_chunks() + { + if sink_type != SinkType::ForceAppendOnly { + // Force append-only by dropping UPDATE/DELETE messages. We do this when the + // user forces the sink to be append-only while it is actually not based on + // the frontend derivation result. + delete_chunks.push(force_delete_only(c.clone())); + } + insert_chunks.push(force_append_only(c)); + } + + for c in delete_chunks.into_iter().chain(insert_chunks.into_iter()) { + log_writer.write_chunk(c.clone()).await?; + yield Message::Chunk(c); + } + if let Some(w) = mem::take(&mut watermark) { + yield Message::Watermark(w) + } + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) + .await?; + if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) + { + log_writer.update_vnode_bitmap(vnode_bitmap); + } + yield Message::Barrier(barrier); + } } - Message::Barrier(barrier) => { - log_writer - .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) - .await?; - if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) { - log_writer.update_vnode_bitmap(vnode_bitmap); + } + } else { + #[for_await] + for msg in input { + match msg? { + Message::Watermark(w) => yield Message::Watermark(w), + Message::Chunk(chunk) => { + // Compact the chunk to eliminate any useless intermediate result (e.g. UPDATE + // V->V). + let chunk = merge_chunk_row(chunk, &stream_key); + let chunk = if sink_type == SinkType::ForceAppendOnly { + // Force append-only by dropping UPDATE/DELETE messages. We do this when the + // user forces the sink to be append-only while it is actually not based on + // the frontend derivation result. + force_append_only(chunk) + } else { + chunk + }; + + log_writer.write_chunk(chunk.clone()).await?; + + // Use original chunk instead of the reordered one as the executor output. + yield Message::Chunk(chunk); + } + Message::Barrier(barrier) => { + log_writer + .flush_current_epoch(barrier.epoch.curr, barrier.kind.is_checkpoint()) + .await?; + if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) + { + log_writer.update_vnode_bitmap(vnode_bitmap); + } + yield Message::Barrier(barrier); } - yield Message::Barrier(barrier); } } } @@ -352,6 +455,147 @@ mod test { executor.next().await.unwrap().unwrap(); } + #[tokio::test] + async fn stream_key_sink_pk_mismatch() { + use risingwave_common::array::stream_chunk::StreamChunk; + use risingwave_common::array::StreamChunkTestExt; + use risingwave_common::types::DataType; + + use crate::executor::Barrier; + + let properties = maplit::hashmap! { + "connector".into() => "blackhole".into(), + }; + + // We have two visible columns and one hidden column. The hidden column will be pruned out + // within the sink executor. + let columns = vec![ + ColumnCatalog { + column_desc: ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64), + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc::unnamed(ColumnId::new(1), DataType::Int64), + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64), + is_hidden: true, + }, + ]; + let schema: Schema = columns + .iter() + .map(|column| Field::from(column.column_desc.clone())) + .collect(); + + let mock = MockSource::with_messages( + schema, + vec![0, 1], + vec![ + Message::Barrier(Barrier::new_test_barrier(1)), + Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( + " I I I + + 1 1 10", + ))), + Message::Barrier(Barrier::new_test_barrier(2)), + Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( + " I I I + + 1 3 30", + ))), + Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( + " I I I + + 1 2 20 + - 1 2 20", + ))), + Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( + " I I I + - 1 1 10", + ))), + Message::Barrier(Barrier::new_test_barrier(3)), + ], + ); + + let sink_param = SinkParam { + sink_id: 0.into(), + properties, + columns: columns + .iter() + .filter(|col| !col.is_hidden) + .map(|col| col.column_desc.clone()) + .collect(), + downstream_pk: vec![0], + sink_type: SinkType::Upsert, + db_name: "test".into(), + sink_from_name: "test".into(), + }; + + let sink_executor = SinkExecutor::new( + Box::new(mock), + Arc::new(StreamingMetrics::unused()), + SinkWriterParam::default(), + sink_param, + columns.clone(), + ActorContext::create(0), + BoundedInMemLogStoreFactory::new(1), + vec![0, 1], + ) + .await + .unwrap(); + + let mut executor = SinkExecutor::execute(Box::new(sink_executor)); + + // Barrier message. + executor.next().await.unwrap().unwrap(); + + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); + + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " I I I + + 1 1 10", + ) + ); + + // Barrier message. + executor.next().await.unwrap().unwrap(); + + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); + + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " I I I + - 1 1 10", + ) + ); + + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " I I I + + 1 3 30", + ) + ); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); + let chunk_msg = executor.next().await.unwrap().unwrap(); + assert_eq!(chunk_msg.into_chunk().unwrap().cardinality(), 0); + + // Should not receive the third stream chunk message because the force-append-only sink + // executor will drop all DELETE messages. + + // The last barrier message. + executor.next().await.unwrap().unwrap(); + } + #[tokio::test] async fn test_empty_barrier_sink() { use risingwave_common::types::DataType; diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index ad8eb8a4d9cf2..aa8c8725b3815 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -177,6 +177,7 @@ async fn test_sink_basic() -> Result<()> { sleep(Duration::from_millis(10)).await; } } + sleep(Duration::from_millis(10000)).await; assert_eq!(6, parallelism_counter.load(Relaxed)); assert_eq!(count, row_counter.load(Relaxed));