From 6b6207bf837ba8fd3a69a59f0025f4026aeeed27 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sun, 19 May 2024 23:44:22 +0800 Subject: [PATCH] add fix --- src/common/src/hash/table_distribution.rs | 2 +- .../src/executor/backfill/arrangement_backfill.rs | 1 + src/stream/src/executor/backfill/utils.rs | 14 +++++++------- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index fdf9543a8cd23..c69c9e9a43aec 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -37,7 +37,7 @@ enum ComputeVnode { dist_key_in_pk_indices: Vec, }, VnodeColumnIndex { - /// Indices of vnode columns. + /// Index of vnode column. vnode_col_idx_in_pk: usize, }, } diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 8624551042796..d300f3320f9c2 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -445,6 +445,7 @@ where &chunk, &backfill_state, &pk_in_output_indices, + &upstream_table, &pk_order, )?, &self.output_indices, diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 1d469b311fd26..1746924a08dfe 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType}; use risingwave_common::util::value_encoding::BasicSerde; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc}; +use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow}; use risingwave_storage::StateStore; -use crate::common::table::state_table::StateTableInner; +use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner}; use crate::executor::{ Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark, }; @@ -336,10 +337,11 @@ pub(crate) fn mark_cdc_chunk( /// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the /// corresponding `vnode`, otherwise ignore it. /// We implement it by changing the visibility bitmap. -pub(crate) fn mark_chunk_ref_by_vnode( +pub(crate) fn mark_chunk_ref_by_vnode( chunk: &StreamChunk, backfill_state: &BackfillState, pk_in_output_indices: PkIndicesRef<'_>, + upstream_table: &ReplicatedStateTable, pk_order: &[OrderType], ) -> StreamExecutorResult { let chunk = chunk.clone(); @@ -347,7 +349,8 @@ pub(crate) fn mark_chunk_ref_by_vnode( let mut new_visibility = BitmapBuilder::with_capacity(ops.len()); // Use project to avoid allocation. for row in data.rows() { - let vnode = VirtualNode::compute_row(row, pk_in_output_indices); + let pk = row.project(pk_in_output_indices); + let vnode = upstream_table.compute_vnode_by_pk(pk); let v = match backfill_state.get_progress(&vnode)? { // We want to just forward the row, if the vnode has finished backfill. BackfillProgressPerVnode::Completed { .. } => true, @@ -355,10 +358,7 @@ pub(crate) fn mark_chunk_ref_by_vnode( BackfillProgressPerVnode::NotStarted => false, // If in progress, we need to check row <= current_pos. BackfillProgressPerVnode::InProgress { current_pos, .. } => { - let lhs = row.project(pk_in_output_indices); - let rhs = current_pos; - let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()); - match order { + match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) { Ordering::Less | Ordering::Equal => true, Ordering::Greater => false, }