Skip to content

Commit

Permalink
add fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed May 19, 2024
1 parent 3984326 commit 0c5852f
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ cluster_start
# Please make sure the regression is expected before increasing the timeout.
sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/backfill/rate_limit/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'

echo "--- Kill cluster"
cluster_stop
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum ComputeVnode {
dist_key_in_pk_indices: Vec<usize>,
},
VnodeColumnIndex {
/// Indices of vnode columns.
/// Index of vnode column.
vnode_col_idx_in_pk: usize,
},
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ where
&chunk,
&backfill_state,
&pk_in_output_indices,
&upstream_table,
&pk_order,
)?,
&self.output_indices,
Expand Down
14 changes: 7 additions & 7 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTableInner;
use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
use crate::executor::{
Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};
Expand Down Expand Up @@ -336,29 +337,28 @@ pub(crate) fn mark_cdc_chunk(
/// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
/// corresponding `vnode`, otherwise ignore it.
/// We implement it by changing the visibility bitmap.
pub(crate) fn mark_chunk_ref_by_vnode(
pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
chunk: &StreamChunk,
backfill_state: &BackfillState,
pk_in_output_indices: PkIndicesRef<'_>,
upstream_table: &ReplicatedStateTable<S, SD>,
pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
let chunk = chunk.clone();
let (data, ops) = chunk.into_parts();
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
// Use project to avoid allocation.
for row in data.rows() {
let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
let pk = row.project(pk_in_output_indices);
let vnode = upstream_table.compute_vnode_by_pk(pk);
let v = match backfill_state.get_progress(&vnode)? {
// We want to just forward the row, if the vnode has finished backfill.
BackfillProgressPerVnode::Completed { .. } => true,
// If not started, no need to forward.
BackfillProgressPerVnode::NotStarted => false,
// If in progress, we need to check row <= current_pos.
BackfillProgressPerVnode::InProgress { current_pos, .. } => {
let lhs = row.project(pk_in_output_indices);
let rhs = current_pos;
let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
match order {
match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
Expand Down

0 comments on commit 0c5852f

Please sign in to comment.