Skip to content

Commit

Permalink
feat: suppress sink pk compaction warn && not warn for force append o…
Browse files Browse the repository at this point in the history
…nly sink (#16251)
  • Loading branch information
st1page authored Apr 11, 2024
1 parent 1cc84f7 commit 331f079
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
53 changes: 38 additions & 15 deletions src/stream/src/common/compact_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use std::mem;
use std::sync::LazyLock;

use itertools::Itertools;
use prehash::{new_prehashed_map_with_capacity, Passthru, Prehashed};
use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::log::LogSuppresser;
use risingwave_common::row::{Project, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::hash_util::Crc32FastBuilder;
Expand Down Expand Up @@ -106,15 +108,18 @@ pub enum RowOp<'a> {
/// (old_value, new_value)
Update((RowRef<'a>, RowRef<'a>)),
}
/// Lazily-initialized, file-wide [`LogSuppresser`] built with its default configuration.
///
/// The "double insert" / "double delete" warning sites in this file call `check()` on it
/// and only emit the `tracing::warn!` when that returns `Ok`, attaching the returned
/// suppressed count to the log event.
///
/// NOTE(review): the identifier is misspelled (`SUPPERSSER` instead of `SUPPRESSOR`);
/// a rename must update every use site in the same change.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);

/// Accumulates, per key, the net [`RowOp`] (insert / delete / update) seen so far.
///
/// `'a` is the lifetime of the borrowed rows; `'b` is the lifetime carried by the
/// `Project` wrapper used as the key.
pub struct RowOpMap<'a, 'b> {
/// Keyed by a pre-hashed projection of the row; the value is the pending operation for
/// that key. Uses the `Passthru` hasher — presumably so the hash stored in `Prehashed`
/// is reused instead of being recomputed (TODO confirm against the `prehash` crate).
map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
/// When `true`, a double insert or double delete for the same pk is reported through
/// `tracing::warn!`; when `false`, those cases are handled without logging.
warn_for_inconsistent_stream: bool,
}

impl<'a, 'b> RowOpMap<'a, 'b> {
fn with_capacity(estimate_size: usize) -> Self {
fn with_capacity(estimate_size: usize, warn_for_inconsistent_stream: bool) -> Self {
Self {
map: new_prehashed_map_with_capacity(estimate_size),
warn_for_inconsistent_stream,
}
}

Expand All @@ -124,19 +129,31 @@ impl<'a, 'b> RowOpMap<'a, 'b> {
Entry::Vacant(e) => {
e.insert(RowOp::Insert(v));
}
Entry::Occupied(mut e) => match e.get() {
RowOp::Delete(ref old_v) => {
e.insert(RowOp::Update((*old_v, v)));
}
RowOp::Insert(_) => {
tracing::warn!("double insert for the same pk");
e.insert(RowOp::Insert(v));
}
RowOp::Update((ref old_v, _)) => {
tracing::warn!("double insert for the same pk");
e.insert(RowOp::Update((*old_v, v)));
Entry::Occupied(mut e) => {
match e.get() {
RowOp::Delete(ref old_v) => {
e.insert(RowOp::Update((*old_v, v)));
}
RowOp::Insert(_) => {
if self.warn_for_inconsistent_stream {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
suppressed_count, "double insert for the same pk, breaking the sink's pk constraint"
);
}
}
e.insert(RowOp::Insert(v));
}
RowOp::Update((ref old_v, _)) => {
if self.warn_for_inconsistent_stream {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(suppressed_count, "double insert for the same pk, breaking the sink's pk constraint");
}
}
e.insert(RowOp::Update((*old_v, v)));
}
}
},
}
}
}

Expand All @@ -154,7 +171,11 @@ impl<'a, 'b> RowOpMap<'a, 'b> {
e.insert(RowOp::Delete(*prev));
}
RowOp::Delete(_) => {
tracing::warn!("double delete for the same pk");
if self.warn_for_inconsistent_stream {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(suppressed_count, "double delete for the same pk");
}
}
e.insert(RowOp::Delete(v));
}
},
Expand Down Expand Up @@ -271,6 +292,7 @@ impl StreamChunkCompactor {
self,
chunk_size: usize,
data_types: Vec<DataType>,
warn_for_inconsistent_stream: bool,
) -> Vec<StreamChunk> {
let (chunks, key_indices) = self.into_inner();

Expand All @@ -287,7 +309,7 @@ impl StreamChunkCompactor {
(hash_values, ops, c)
})
.collect_vec();
let mut map = RowOpMap::with_capacity(estimate_size);
let mut map = RowOpMap::with_capacity(estimate_size, warn_for_inconsistent_stream);
for (hash_values, ops, c) in &chunks {
for row in c.rows() {
let hash = hash_values[row.index()];
Expand Down Expand Up @@ -395,6 +417,7 @@ mod tests {
let chunks = compactor.reconstructed_compacted_chunks(
100,
vec![DataType::Int64, DataType::Int64, DataType::Int64],
true,
);
assert_eq!(
chunks.into_iter().next().unwrap(),
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
.reconstructed_compacted_chunks(
chunk_size,
input_data_types.clone(),
sink_type != SinkType::ForceAppendOnly,
)
} else {
chunks
Expand Down

0 comments on commit 331f079

Please sign in to comment.