Commit d3a9ba3

fix(stream): arrangement backfill should use dist key instead of pk to derive vnode (#16815)
kwannoel authored and chenzl25 committed May 20, 2024
1 parent 9749037 commit d3a9ba3
Showing 5 changed files with 59 additions and 8 deletions.
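
Context for the change: during arrangement backfill, each upstream row's vnode was derived by hashing its primary-key columns, but upstream shuffles rows by the table's distribution key. When the pk and dist key differ, backfill progress is looked up under the wrong vnode. The toy Rust sketch below is illustrative only; `compute_vnode`, the hash, and the column indices are assumptions, not RisingWave's actual API:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const VNODE_COUNT: u64 = 256;

/// Hash the selected columns of a row into a vnode bucket.
fn compute_vnode(row: &[i64], key_indices: &[usize]) -> u64 {
    let mut hasher = DefaultHasher::new();
    for &i in key_indices {
        row[i].hash(&mut hasher);
    }
    hasher.finish() % VNODE_COUNT
}

fn main() {
    // A row of t(v1, v2, v3) with dist key [v1] (index 0) and pk [v2] (index 1).
    let row = [50001_i64, 60001, 70001];

    // Correct: the vnode the upstream actually shuffled this row to.
    let by_dist_key = compute_vnode(&row, &[0]);
    // Buggy: deriving the vnode from the pk columns instead.
    let by_pk = compute_vnode(&row, &[1]);

    // When pk != dist key these generally disagree, so per-vnode backfill
    // progress was consulted under the wrong vnode.
    println!("dist-key vnode = {by_dist_key}, pk vnode = {by_pk}");
}
```

The fix in `utils.rs` below switches the derivation to `upstream_table.compute_vnode_by_pk(pk)`, which locates the dist-key columns within the pk before hashing.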
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
@@ -81,6 +81,7 @@ cluster_start
 # Please make sure the regression is expected before increasing the timeout.
 sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
 sqllogictest -p 4566 -d dev './e2e_test/backfill/rate_limit/*.slt'
+sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'
 
 echo "--- Kill cluster"
 cluster_stop
49 changes: 49 additions & 0 deletions e2e_test/backfill/sink/different_pk_and_dist_key.slt
@@ -0,0 +1,49 @@
statement ok
create table t(v1 int, v2 int primary key, v3 int);

statement ok
create table t2(v1 int, v2 int primary key, v3 int);

# Let snapshot side pk >= upstream side
statement ok
insert into t select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);

statement ok
insert into t2 select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);

statement ok
flush;

statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
set streaming_rate_limit = 1;

statement ok
set background_ddl = true;

statement ok
create sink s1 as select t.v1, t.v2, t.v3 from m1 join t using(v3) with (connector = 'blackhole');

# Let snapshot side pk >= upstream side
statement ok
insert into t select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);

statement ok
insert into t2 select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);

statement ok
flush;

statement ok
drop sink s1;

statement ok
drop materialized view m1;

statement ok
drop table t;

statement ok
drop table t2;
2 changes: 1 addition & 1 deletion src/common/src/hash/table_distribution.rs
@@ -37,7 +37,7 @@ enum ComputeVnode {
         dist_key_in_pk_indices: Vec<usize>,
     },
     VnodeColumnIndex {
-        /// Indices of vnode columns.
+        /// Index of vnode column.
         vnode_col_idx_in_pk: usize,
     },
 }
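
For orientation, `ComputeVnode` chooses how a table distribution derives a row's vnode from its pk. A simplified model under assumed shapes (the variant name `DistKeyIndices`, `i64` datums, and the hashing are stand-ins for the real implementation):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const VNODE_COUNT: u64 = 256;

enum ComputeVnode {
    /// Hash the dist-key columns, located at these positions within the pk.
    DistKeyIndices { dist_key_in_pk_indices: Vec<usize> },
    /// The vnode is precomputed and stored as a pk column at this index.
    VnodeColumnIndex { vnode_col_idx_in_pk: usize },
}

impl ComputeVnode {
    /// Derive a row's vnode from its pk, per the chosen strategy.
    fn compute_vnode_by_pk(&self, pk: &[i64]) -> u64 {
        match self {
            ComputeVnode::DistKeyIndices { dist_key_in_pk_indices } => {
                let mut hasher = DefaultHasher::new();
                for &i in dist_key_in_pk_indices {
                    pk[i].hash(&mut hasher);
                }
                hasher.finish() % VNODE_COUNT
            }
            ComputeVnode::VnodeColumnIndex { vnode_col_idx_in_pk } => {
                pk[*vnode_col_idx_in_pk] as u64
            }
        }
    }
}

fn main() {
    // pk = (v1, v2) where the dist key is just v1 (position 0 in the pk).
    let strategy = ComputeVnode::DistKeyIndices { dist_key_in_pk_indices: vec![0] };
    println!("vnode = {}", strategy.compute_vnode_by_pk(&[42, 7]));
}
```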
1 change: 1 addition & 0 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -445,6 +445,7 @@ where
             &chunk,
             &backfill_state,
             &pk_in_output_indices,
+            &upstream_table,
             &pk_order,
         )?,
         &self.output_indices,
14 changes: 7 additions & 7 deletions src/stream/src/executor/backfill/utils.rs
@@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
 use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
+use risingwave_storage::row_serde::value_serde::ValueRowSerde;
 use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
 use risingwave_storage::StateStore;
 
-use crate::common::table::state_table::StateTableInner;
+use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
 use crate::executor::{
     Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
 };
@@ -336,29 +337,28 @@ pub(crate) fn mark_cdc_chunk(
 /// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
 /// corresponding `vnode`, otherwise ignore it.
 /// We implement it by changing the visibility bitmap.
-pub(crate) fn mark_chunk_ref_by_vnode(
+pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
     chunk: &StreamChunk,
     backfill_state: &BackfillState,
     pk_in_output_indices: PkIndicesRef<'_>,
+    upstream_table: &ReplicatedStateTable<S, SD>,
     pk_order: &[OrderType],
 ) -> StreamExecutorResult<StreamChunk> {
     let chunk = chunk.clone();
     let (data, ops) = chunk.into_parts();
     let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
     // Use project to avoid allocation.
     for row in data.rows() {
-        let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
+        let pk = row.project(pk_in_output_indices);
+        let vnode = upstream_table.compute_vnode_by_pk(pk);
         let v = match backfill_state.get_progress(&vnode)? {
             // We want to just forward the row, if the vnode has finished backfill.
             BackfillProgressPerVnode::Completed { .. } => true,
             // If not started, no need to forward.
             BackfillProgressPerVnode::NotStarted => false,
             // If in progress, we need to check row <= current_pos.
             BackfillProgressPerVnode::InProgress { current_pos, .. } => {
-                let lhs = row.project(pk_in_output_indices);
-                let rhs = current_pos;
-                let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
-                match order {
+                match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) {
                     Ordering::Less | Ordering::Equal => true,
                     Ordering::Greater => false,
                 }
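
Putting the pieces together, here is a minimal sketch of the fixed filtering logic described by the doc comment above: project out the pk, derive the vnode from it, look up that vnode's backfill progress, and set the row's visibility bit. Types are simplified (`i64` datums, ascending pk order assumed, and a closure stands in for `upstream_table.compute_vnode_by_pk`):

```rust
use std::collections::HashMap;

enum Progress {
    NotStarted,
    InProgress { current_pos: Vec<i64> },
    Completed,
}

/// One visibility bit per row: true = forward the row downstream.
fn mark_rows(
    rows: &[Vec<i64>],
    pk_in_output_indices: &[usize],
    // Stand-in for `upstream_table.compute_vnode_by_pk(pk)` in the real code.
    compute_vnode_by_pk: impl Fn(&[i64]) -> u64,
    backfill_state: &HashMap<u64, Progress>,
) -> Vec<bool> {
    rows.iter()
        .map(|row| {
            let pk: Vec<i64> = pk_in_output_indices.iter().map(|&i| row[i]).collect();
            // The fix: the vnode comes from the table's dist key (via the
            // upstream table), not from hashing the pk columns directly.
            let vnode = compute_vnode_by_pk(&pk);
            match backfill_state.get(&vnode) {
                Some(Progress::Completed) => true,
                None | Some(Progress::NotStarted) => false,
                // Forward only rows at or before this vnode's watermark
                // (assumes ascending order on every pk column).
                Some(Progress::InProgress { current_pos }) => pk <= *current_pos,
            }
        })
        .collect()
}

fn main() {
    let state = HashMap::from([(0u64, Progress::InProgress { current_pos: vec![15] })]);
    let rows = vec![vec![1, 10], vec![2, 20]];
    // pk is column 1; everything maps to vnode 0 in this toy setup.
    let vis = mark_rows(&rows, &[1], |_pk: &[i64]| 0, &state);
    assert_eq!(vis, vec![true, false]); // pk 10 <= 15 passes, pk 20 > 15 is masked
}
```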
