From 331f079fb2e3ffa1d1f762b9fc2e77df63b5bd67 Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Thu, 11 Apr 2024 15:36:01 +0800 Subject: [PATCH] feat: suppress sink pk compaction warn && not warn for force append only sink (#16251) --- src/stream/src/common/compact_chunk.rs | 53 ++++++++++++++++++-------- src/stream/src/executor/sink.rs | 1 + 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/src/stream/src/common/compact_chunk.rs b/src/stream/src/common/compact_chunk.rs index 24e194bdb0c82..32dc4d0a996a6 100644 --- a/src/stream/src/common/compact_chunk.rs +++ b/src/stream/src/common/compact_chunk.rs @@ -16,6 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::hash::BuildHasherDefault; use std::mem; +use std::sync::LazyLock; use itertools::Itertools; use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed}; @@ -23,6 +24,7 @@ use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut}; use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::stream_record::Record; use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::log::LogSuppresser; use risingwave_common::row::{Project, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::hash_util::Crc32FastBuilder; @@ -106,15 +108,18 @@ pub enum RowOp<'a> { /// (old_value, new_value) Update((RowRef<'a>, RowRef<'a>)), } +static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); pub struct RowOpMap<'a, 'b> { map: HashMap>>, RowOp<'a>, BuildHasherDefault>, + warn_for_inconsistent_stream: bool, } impl<'a, 'b> RowOpMap<'a, 'b> { - fn with_capacity(estimate_size: usize) -> Self { + fn with_capacity(estimate_size: usize, warn_for_inconsistent_stream: bool) -> Self { Self { map: new_prehashed_map_with_capacity(estimate_size), + warn_for_inconsistent_stream, } } @@ -124,19 +129,31 @@ impl<'a, 'b> RowOpMap<'a, 'b> { Entry::Vacant(e) => { e.insert(RowOp::Insert(v)); } - Entry::Occupied(mut e) => match e.get() { - RowOp::Delete(ref old_v) => { - e.insert(RowOp::Update((*old_v, v))); - } - RowOp::Insert(_) => { - tracing::warn!("double insert for the same pk"); - e.insert(RowOp::Insert(v)); - } - RowOp::Update((ref old_v, _)) => { - tracing::warn!("double insert for the same pk"); - e.insert(RowOp::Update((*old_v, v))); + Entry::Occupied(mut e) => { + match e.get() { + RowOp::Delete(ref old_v) => { + e.insert(RowOp::Update((*old_v, v))); + } + RowOp::Insert(_) => { + if self.warn_for_inconsistent_stream { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::warn!( + suppressed_count, "double insert for the same pk, breaking the sink's pk constraint" + ); + } + } + e.insert(RowOp::Insert(v)); + } + RowOp::Update((ref old_v, _)) => { + if self.warn_for_inconsistent_stream { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::warn!(suppressed_count, "double insert for the same pk, breaking the sink's pk constraint"); + } + } + e.insert(RowOp::Update((*old_v, v))); + } } - }, + } } } @@ -154,7 +171,11 @@ impl<'a, 'b> RowOpMap<'a, 'b> { e.insert(RowOp::Delete(*prev)); } RowOp::Delete(_) => { - tracing::warn!("double delete for the same pk"); + if self.warn_for_inconsistent_stream { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::warn!(suppressed_count, "double delete for the same pk"); + } + } e.insert(RowOp::Delete(v)); } }, @@ -271,6 +292,7 @@ impl StreamChunkCompactor { self, chunk_size: usize, data_types: Vec, + warn_for_inconsistent_stream: bool, ) -> Vec { let (chunks, key_indices) = self.into_inner(); @@ -287,7 +309,7 @@ impl StreamChunkCompactor { (hash_values, ops, c) }) .collect_vec(); - let mut map = RowOpMap::with_capacity(estimate_size); + let mut map = RowOpMap::with_capacity(estimate_size, warn_for_inconsistent_stream); for (hash_values, ops, c) in &chunks { for row in c.rows() { let hash = hash_values[row.index()]; @@ -395,6 +417,7 @@ mod tests { let chunks = compactor.reconstructed_compacted_chunks( 100, vec![DataType::Int64, DataType::Int64, DataType::Int64], + true, ); assert_eq!( chunks.into_iter().next().unwrap(), diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index bdfd6c1cfc551..f552f8ba8e184 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -343,6 +343,7 @@ impl SinkExecutor { .reconstructed_compacted_chunks( chunk_size, input_data_types.clone(), + sink_type != SinkType::ForceAppendOnly, ) } else { chunks