From afe5279e8680debf608b8e09bd0b3df3772f39a6 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 20 Apr 2024 17:48:51 +0800 Subject: [PATCH] fix Signed-off-by: xxchan --- .../kafka/shared_source.slt} | 16 +++++++++++++--- .../tests/testdata/input/shared_source.yml | 2 +- .../tests/testdata/output/shared_source.yml | 2 +- .../src/optimizer/plan_node/logical_source.rs | 5 +++-- .../src/executor/source/source_executor.rs | 1 + src/stream/src/from_proto/source/trad_source.rs | 2 +- 6 files changed, 20 insertions(+), 8 deletions(-) rename e2e_test/{source/basic/kafka_shared_source.slt => source_inline/kafka/shared_source.slt} (70%) diff --git a/e2e_test/source/basic/kafka_shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt similarity index 70% rename from e2e_test/source/basic/kafka_shared_source.slt rename to e2e_test/source_inline/kafka/shared_source.slt index 5245d6ea68630..0bf8944306709 100644 --- a/e2e_test/source/basic/kafka_shared_source.slt +++ b/e2e_test/source_inline/kafka/shared_source.slt @@ -3,17 +3,27 @@ control substitution on statement ok SET rw_enable_shared_source TO true; +system ok +rpk topic create shared_source -p 4 + +system ok +cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0 +0 {"v1": 1, "v2": "1"} +1 {"v1": 2, "v2": "22"} +2 {"v1": 3, "v2": "333"} +3 {"v1": 4, "v2": "4444"} + statement ok create source s0 (v1 int, v2 varchar) with ( - connector = 'kafka', - topic = 'kafka_4_partition_topic', - properties.bootstrap.server = '${KAFKA_BOOTSTRAP_SERVER:message_queue:29092}', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source', scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE JSON; statement ok create materialized view mv_1 as select * from s0; +# This does not the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. statement ok SET rw_enable_shared_source TO false; diff --git a/src/frontend/planner_test/tests/testdata/input/shared_source.yml b/src/frontend/planner_test/tests/testdata/input/shared_source.yml index 0f68cc25f6288..952ae8dcc5aa0 100644 --- a/src/frontend/planner_test/tests/testdata/input/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/input/shared_source.yml @@ -25,7 +25,7 @@ # We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV # # batch: All 4 plans should be the same. -# stream: StreamSourceScan (with backfill) should be used only for the last 1. All other 3 use StreamSource. +# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. rw_enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW - with_config_map: rw_enable_shared_source: false before: diff --git a/src/frontend/planner_test/tests/testdata/output/shared_source.yml b/src/frontend/planner_test/tests/testdata/output/shared_source.yml index 5bf3739f28411..39fb6b799fb83 100644 --- a/src/frontend/planner_test/tests/testdata/output/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/output/shared_source.yml @@ -73,7 +73,7 @@ StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [x, y, _row_id] } └─StreamRowIdGen { row_id_index: 3 } - └─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } + └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } with_config_map: rw_enable_shared_source: 'true' - before: diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 8a8fd37bd66b3..141956e7f118c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -343,8 +343,9 @@ impl ToStream for LogicalSource { } SourceNodeKind::CreateMViewOrBatch => { // Create MV on source. - let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared()) - && self.ctx().session_ctx().config().rw_enable_shared_source(); + // We only check rw_enable_shared_source is true when `CREATE SOURCE`. + // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here. + let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared()); if use_shared_source { plan = StreamSourceScan::new(self.core.clone()).into(); } else { diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 764cc3177f33a..d51a64debcd35 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -498,6 +498,7 @@ impl SourceExecutor { // - For shared source, pause until there's a MV. // - If the first barrier requires us to pause on startup, pause the stream. if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() { + tracing::info!(is_shared=%self.is_shared, is_uninitialized=%is_uninitialized, "source paused on startup"); stream.pause_stream(); } diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index ae60ebb0c726a..0aad2425149c0 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -191,7 +191,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { .collect(); let state_table_handler = SourceStateTableHandler::from_table_catalog( - source.state_table.as_ref.unwrap(), + source.state_table.as_ref().unwrap(), store.clone(), ) .await;