Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): arrangement backfill should use dist key instead of pk to derive vnode #16815

Merged
merged 2 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cluster_start
# Please make sure the regression is expected before increasing the timeout.
sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/backfill/rate_limit/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'

echo "--- Kill cluster"
cluster_stop
Expand Down
49 changes: 49 additions & 0 deletions e2e_test/backfill/sink/different_pk_and_dist_key.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
statement ok
create table t(v1 int, v2 int primary key, v3 int);

statement ok
create table t2(v1 int, v2 int primary key, v3 int);

# Let snapshot side pk >= upstream side
statement ok
insert into t select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);

statement ok
insert into t2 select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);

statement ok
flush;

statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
set streaming_rate_limit = 1;

statement ok
set background_ddl = true;

statement ok
create sink s1 as select t.v1, t.v2, t.v3 from m1 join t using(v3) with (connector = 'blackhole');

# Let snapshot side pk >= upstream side
statement ok
insert into t select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);

statement ok
insert into t2 select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);

statement ok
flush;

statement ok
drop sink s1;

statement ok
drop materialized view m1;

statement ok
drop table t;

statement ok
drop table t2;
2 changes: 1 addition & 1 deletion src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum ComputeVnode {
dist_key_in_pk_indices: Vec<usize>,
},
VnodeColumnIndex {
/// Indices of vnode columns.
/// Index of vnode column.
vnode_col_idx_in_pk: usize,
},
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ where
&chunk,
&backfill_state,
&pk_in_output_indices,
&upstream_table,
&pk_order,
)?,
&self.output_indices,
Expand Down
14 changes: 7 additions & 7 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTableInner;
use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
use crate::executor::{
Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};
Expand Down Expand Up @@ -336,29 +337,28 @@ pub(crate) fn mark_cdc_chunk(
/// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
/// corresponding `vnode`, otherwise ignore it.
/// We implement it by changing the visibility bitmap.
pub(crate) fn mark_chunk_ref_by_vnode(
pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
chunk: &StreamChunk,
backfill_state: &BackfillState,
pk_in_output_indices: PkIndicesRef<'_>,
upstream_table: &ReplicatedStateTable<S, SD>,
pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
let chunk = chunk.clone();
let (data, ops) = chunk.into_parts();
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
// Use project to avoid allocation.
for row in data.rows() {
let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
let pk = row.project(pk_in_output_indices);
let vnode = upstream_table.compute_vnode_by_pk(pk);
let v = match backfill_state.get_progress(&vnode)? {
// We want to just forward the row, if the vnode has finished backfill.
BackfillProgressPerVnode::Completed { .. } => true,
// If not started, no need to forward.
BackfillProgressPerVnode::NotStarted => false,
// If in progress, we need to check row <= current_pos.
BackfillProgressPerVnode::InProgress { current_pos, .. } => {
let lhs = row.project(pk_in_output_indices);
let rhs = current_pos;
let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
match order {
match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
Expand Down
Loading