diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh
index b6bf0673e6ae4..ef11428ad5d00 100755
--- a/ci/scripts/run-e2e-test.sh
+++ b/ci/scripts/run-e2e-test.sh
@@ -81,6 +81,7 @@ cluster_start
 # Please make sure the regression is expected before increasing the timeout.
 sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
 sqllogictest -p 4566 -d dev './e2e_test/backfill/rate_limit/*.slt'
+sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'
 
 echo "--- Kill cluster"
 cluster_stop
diff --git a/e2e_test/backfill/sink/different_pk_and_dist_key.slt b/e2e_test/backfill/sink/different_pk_and_dist_key.slt
new file mode 100644
index 0000000000000..bc8256b28e62a
--- /dev/null
+++ b/e2e_test/backfill/sink/different_pk_and_dist_key.slt
@@ -0,0 +1,49 @@
+statement ok
+create table t(v1 int, v2 int primary key, v3 int);
+
+statement ok
+create table t2(v1 int, v2 int primary key, v3 int);
+
+# Let snapshot side pk >= upstream side
+statement ok
+insert into t select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);
+
+statement ok
+insert into t2 select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);
+
+statement ok
+flush;
+
+statement ok
+create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);
+
+statement ok
+set streaming_rate_limit = 1;
+
+statement ok
+set background_ddl = true;
+
+statement ok
+create sink s1 as select t.v1, t.v2, t.v3 from m1 join t using(v3) with (connector = 'blackhole');
+
+# Let snapshot side pk >= upstream side
+statement ok
+insert into t select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);
+
+statement ok
+insert into t2 select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);
+
+statement ok
+flush;
+
+statement ok
+drop sink s1;
+
+statement ok
+drop materialized view m1;
+
+statement ok
+drop table t;
+
+statement ok
+drop table t2;
\ No newline at end of file
diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs
index fdf9543a8cd23..c69c9e9a43aec 100644
--- a/src/common/src/hash/table_distribution.rs
+++ b/src/common/src/hash/table_distribution.rs
@@ -37,7 +37,7 @@ enum ComputeVnode {
         dist_key_in_pk_indices: Vec<usize>,
     },
     VnodeColumnIndex {
-        /// Indices of vnode columns.
+        /// Index of vnode column.
         vnode_col_idx_in_pk: usize,
     },
 }
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 8624551042796..d300f3320f9c2 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -445,6 +445,7 @@ where
                                 &chunk,
                                 &backfill_state,
                                 &pk_in_output_indices,
+                                &upstream_table,
                                 &pk_order,
                             )?,
                             &self.output_indices,
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index 1d469b311fd26..1746924a08dfe 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
 use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
+use risingwave_storage::row_serde::value_serde::ValueRowSerde;
 use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
 use risingwave_storage::StateStore;
 
-use crate::common::table::state_table::StateTableInner;
+use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
 use crate::executor::{
     Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
 };
@@ -336,10 +337,11 @@ pub(crate) fn mark_cdc_chunk(
 /// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
 /// corresponding `vnode`, otherwise ignore it.
 /// We implement it by changing the visibility bitmap.
-pub(crate) fn mark_chunk_ref_by_vnode(
+pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
     chunk: &StreamChunk,
     backfill_state: &BackfillState,
     pk_in_output_indices: PkIndicesRef<'_>,
+    upstream_table: &ReplicatedStateTable<S, SD>,
     pk_order: &[OrderType],
 ) -> StreamExecutorResult<StreamChunk> {
     let chunk = chunk.clone();
@@ -347,7 +349,8 @@ pub(crate) fn mark_chunk_ref_by_vnode(
     let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
     // Use project to avoid allocation.
     for row in data.rows() {
-        let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
+        let pk = row.project(pk_in_output_indices);
+        let vnode = upstream_table.compute_vnode_by_pk(pk);
         let v = match backfill_state.get_progress(&vnode)? {
             // We want to just forward the row, if the vnode has finished backfill.
             BackfillProgressPerVnode::Completed { .. } => true,
@@ -355,10 +358,7 @@
             BackfillProgressPerVnode::NotStarted => false,
             // If in progress, we need to check row <= current_pos.
             BackfillProgressPerVnode::InProgress { current_pos, .. } => {
-                let lhs = row.project(pk_in_output_indices);
-                let rhs = current_pos;
-                let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
-                match order {
+                match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) {
                     Ordering::Less | Ordering::Equal => true,
                     Ordering::Greater => false,
                 }
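
Note on the root cause: `mark_chunk_ref_by_vnode` used to derive each row's vnode by hashing the projected pk columns (`VirtualNode::compute_row(row, pk_in_output_indices)`), which is only correct when the table's distribution key equals its pk. The new e2e test sets up the other case (hence its name, different_pk_and_dist_key), so backfill progress, which is tracked per vnode, was looked up under the wrong vnode. The fix delegates to `upstream_table.compute_vnode_by_pk(pk)`, which applies the table's actual distribution (the dist-key indices within the pk). Below is a minimal, self-contained Rust sketch of the mismatch; the `hash64` helper, the 256-vnode count, and the plain-integer rows are illustrative assumptions, not RisingWave's actual hashing scheme or row types.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const VNODE_COUNT: u64 = 256; // assumed vnode count, for illustration only

fn hash64<T: Hash>(v: &T) -> u64 {
    let mut h = DefaultHasher::new();
    v.hash(&mut h);
    h.finish()
}

/// How the table is actually placed: hash only the distribution-key columns.
fn vnode_by_dist_key(row: &[i64], dist_key_indices: &[usize]) -> u64 {
    let key: Vec<i64> = dist_key_indices.iter().map(|&i| row[i]).collect();
    hash64(&key) % VNODE_COUNT
}

/// What the old code effectively did: hash the pk columns instead.
fn vnode_by_pk(row: &[i64], pk_indices: &[usize]) -> u64 {
    let key: Vec<i64> = pk_indices.iter().map(|&i| row[i]).collect();
    hash64(&key) % VNODE_COUNT
}

fn main() {
    // Hypothetical row (v1, v2, v3) with pk = v2 but distribution key = v3,
    // loosely mirroring the table shapes in the test above.
    let row: [i64; 3] = [50001, 60001, 70001];
    let correct = vnode_by_dist_key(&row, &[2]);
    let wrong = vnode_by_pk(&row, &[1]);
    // When dist key != pk the two generally disagree, so the progress check
    // (`pk <= current_pos`) ran against another vnode's `current_pos`,
    // letting rows be forwarded or dropped incorrectly during backfill.
    println!("dist-key vnode = {correct}, pk-hash vnode = {wrong}");
}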