diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh
index b6bf0673e6ae4..ef11428ad5d00 100755
--- a/ci/scripts/run-e2e-test.sh
+++ b/ci/scripts/run-e2e-test.sh
@@ -81,6 +81,7 @@ cluster_start
 # Please make sure the regression is expected before increasing the timeout.
 sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
 sqllogictest -p 4566 -d dev './e2e_test/backfill/rate_limit/*.slt'
+sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'
 
 echo "--- Kill cluster"
 cluster_stop
diff --git a/e2e_test/backfill/sink/different_pk_and_dist_key.slt b/e2e_test/backfill/sink/different_pk_and_dist_key.slt
new file mode 100644
index 0000000000000..bc8256b28e62a
--- /dev/null
+++ b/e2e_test/backfill/sink/different_pk_and_dist_key.slt
@@ -0,0 +1,49 @@
+statement ok
+create table t(v1 int, v2 int primary key, v3 int);
+
+statement ok
+create table t2(v1 int, v2 int primary key, v3 int);
+
+# Seed both tables with large pk values (60000+) so the snapshot side's pk stays >= the upstream side's later inserts
+statement ok
+insert into t select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);
+
+statement ok
+insert into t2 select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);
+
+statement ok
+flush;
+
+statement ok
+create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);
+
+statement ok
+set streaming_rate_limit = 1;
+
+statement ok
+set background_ddl = true;
+
+statement ok
+create sink s1 as select t.v1, t.v2, t.v3 from m1 join t using(v3) with (connector = 'blackhole');
+
+# These rows use smaller pk values (20000+), keeping the snapshot side's pk >= the upstream side's while backfill is in progress
+statement ok
+insert into t select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);
+
+statement ok
+insert into t2 select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);
+
+statement ok
+flush;
+
+statement ok
+drop sink s1;
+
+statement ok
+drop materialized view m1;
+
+statement ok
+drop table t;
+
+statement ok
+drop table t2;
diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs
index fdf9543a8cd23..c69c9e9a43aec 100644
--- a/src/common/src/hash/table_distribution.rs
+++ b/src/common/src/hash/table_distribution.rs
@@ -37,7 +37,7 @@ enum ComputeVnode {
         dist_key_in_pk_indices: Vec<usize>,
     },
     VnodeColumnIndex {
-        /// Indices of vnode columns.
+        /// Index of vnode column.
         vnode_col_idx_in_pk: usize,
     },
 }
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 8624551042796..d300f3320f9c2 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -445,6 +445,7 @@ where
                                 &chunk,
                                 &backfill_state,
                                 &pk_in_output_indices,
+                                &upstream_table,
                                 &pk_order,
                             )?,
                             &self.output_indices,
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index 1d469b311fd26..1746924a08dfe 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
 use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
+use risingwave_storage::row_serde::value_serde::ValueRowSerde;
 use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
 use risingwave_storage::StateStore;
 
-use crate::common::table::state_table::StateTableInner;
+use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
 use crate::executor::{
     Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
 };
@@ -336,10 +337,11 @@ pub(crate) fn mark_cdc_chunk(
 /// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
 /// corresponding `vnode`, otherwise ignore it.
 /// We implement it by changing the visibility bitmap.
-pub(crate) fn mark_chunk_ref_by_vnode(
+pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
     chunk: &StreamChunk,
     backfill_state: &BackfillState,
     pk_in_output_indices: PkIndicesRef<'_>,
+    upstream_table: &ReplicatedStateTable<S, SD>,
     pk_order: &[OrderType],
 ) -> StreamExecutorResult<StreamChunk> {
     let chunk = chunk.clone();
@@ -347,7 +349,8 @@ pub(crate) fn mark_chunk_ref_by_vnode(
     let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
     // Use project to avoid allocation.
     for row in data.rows() {
-        let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
+        let pk = row.project(pk_in_output_indices);
+        let vnode = upstream_table.compute_vnode_by_pk(pk);
         let v = match backfill_state.get_progress(&vnode)? {
             // We want to just forward the row, if the vnode has finished backfill.
             BackfillProgressPerVnode::Completed { .. } => true,
@@ -355,10 +358,7 @@ pub(crate) fn mark_chunk_ref_by_vnode(
             BackfillProgressPerVnode::NotStarted => false,
             // If in progress, we need to check row <= current_pos.
             BackfillProgressPerVnode::InProgress { current_pos, .. } => {
-                let lhs = row.project(pk_in_output_indices);
-                let rhs = current_pos;
-                let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
-                match order {
+                match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) {
                     Ordering::Less | Ordering::Equal => true,
                     Ordering::Greater => false,
                 }