Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into peng/fe-remove-pu-slot
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanicky Chen committed May 20, 2024
2 parents 95ded68 + 59770f9 commit b85ec7a
Show file tree
Hide file tree
Showing 20 changed files with 77 additions and 26 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cluster_start
# Please make sure the regression is expected before increasing the timeout.
sqllogictest -p 4566 -d dev './e2e_test/streaming/**/*.slt' --junit "streaming-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/backfill/rate_limit/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/backfill/sink/different_pk_and_dist_key.slt'

echo "--- Kill cluster"
cluster_stop
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed-etcd.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
compactor-0:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
compactor-0:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-etcd.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-azblob.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-gcs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-local-fs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-obs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-oss.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-s3.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-sqlite.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.2}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.9.0}
services:
risingwave-standalone:
<<: *image
Expand Down
49 changes: 49 additions & 0 deletions e2e_test/backfill/sink/different_pk_and_dist_key.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
statement ok
create table t(v1 int, v2 int primary key, v3 int);

statement ok
create table t2(v1 int, v2 int primary key, v3 int);

# Let snapshot side pk >= upstream side
statement ok
insert into t select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);

statement ok
insert into t2 select 50000 + generate_series, 60000 + generate_series, 70000 + generate_series from generate_series(1, 10000);

statement ok
flush;

statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
set streaming_rate_limit = 1;

statement ok
set background_ddl = true;

statement ok
create sink s1 as select t.v1, t.v2, t.v3 from m1 join t using(v3) with (connector = 'blackhole');

# Let snapshot side pk >= upstream side
statement ok
insert into t select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);

statement ok
insert into t2 select 10000 + generate_series, 20000 + generate_series, 30000 + generate_series from generate_series(1, 10000);

statement ok
flush;

statement ok
drop sink s1;

statement ok
drop materialized view m1;

statement ok
drop table t;

statement ok
drop table t2;
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
if (stmt == null) {
return;
}
LOG.info("Executing statement: {}", stmt);
LOG.debug("Executing statement: {}", stmt);
stmt.executeBatch();
stmt.clearParameters();
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum ComputeVnode {
dist_key_in_pk_indices: Vec<usize>,
},
VnodeColumnIndex {
/// Indices of vnode columns.
/// Index of vnode column.
vnode_col_idx_in_pk: usize,
},
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.12", features = ["no-recursion-limit"] }
prost-reflect = "0.13"
prost-types = "0.12"
protobuf-native = "0.2.1"
protobuf-native = "0.2.2"
pulsar = { version = "6.2", default-features = false, features = [
"tokio-runtime",
"telemetry",
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ normal = ["workspace-hack"]
anyhow = "1"
async-trait = "0.1"
easy-ext = "1"
either = "1.11.0"
either = "1.12.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
http = "0.2"
hyper = "0.14" # required by tonic
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ where
&chunk,
&backfill_state,
&pk_in_output_indices,
&upstream_table,
&pk_order,
)?,
&self.output_indices,
Expand Down
14 changes: 7 additions & 7 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ use risingwave_common::util::sort_util::{cmp_datum_iter, OrderType};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTableInner;
use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
use crate::executor::{
Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};
Expand Down Expand Up @@ -336,29 +337,28 @@ pub(crate) fn mark_cdc_chunk(
/// For each row of the chunk, forward it to downstream if its pk <= `current_pos` for the
/// corresponding `vnode`, otherwise ignore it.
/// We implement it by changing the visibility bitmap.
pub(crate) fn mark_chunk_ref_by_vnode(
pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
chunk: &StreamChunk,
backfill_state: &BackfillState,
pk_in_output_indices: PkIndicesRef<'_>,
upstream_table: &ReplicatedStateTable<S, SD>,
pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
let chunk = chunk.clone();
let (data, ops) = chunk.into_parts();
let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
// Use project to avoid allocation.
for row in data.rows() {
let vnode = VirtualNode::compute_row(row, pk_in_output_indices);
let pk = row.project(pk_in_output_indices);
let vnode = upstream_table.compute_vnode_by_pk(pk);
let v = match backfill_state.get_progress(&vnode)? {
// We want to just forward the row, if the vnode has finished backfill.
BackfillProgressPerVnode::Completed { .. } => true,
// If not started, no need to forward.
BackfillProgressPerVnode::NotStarted => false,
// If in progress, we need to check row <= current_pos.
BackfillProgressPerVnode::InProgress { current_pos, .. } => {
let lhs = row.project(pk_in_output_indices);
let rhs = current_pos;
let order = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied());
match order {
match cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()) {
Ordering::Less | Ordering::Equal => true,
Ordering::Greater => false,
}
Expand Down

0 comments on commit b85ec7a

Please sign in to comment.