diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index 278436a884c7..dfcd05a46048 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -61,6 +61,7 @@ user streaming_max_parallelism user streaming_over_window_cache_policy user streaming_parallelism user streaming_use_arrangement_backfill +user streaming_use_shared_source user streaming_use_snapshot_backfill user synchronize_seqscans user timezone diff --git a/e2e_test/source_inline/kafka/shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt index ca429781604e..b66bf84fbd80 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt +++ b/e2e_test/source_inline/kafka/shared_source.slt @@ -1,7 +1,7 @@ control substitution on statement ok -SET enable_shared_source TO true; +SET streaming_use_shared_source TO true; system ok rpk topic create shared_source -p 4 @@ -86,7 +86,7 @@ internal_table.mjs --name mv_1 --type sourcebackfill # This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. statement ok -SET enable_shared_source TO false; +SET streaming_use_shared_source TO false; statement ok create materialized view mv_2 as select * from s0; diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 455588ffa3e4..fa183bd746cd 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1034,8 +1034,7 @@ pub struct StreamingDeveloperConfig { /// Enable arrangement backfill /// If false, the arrangement backfill will be disabled, /// even if session variable set. - /// If true, it will be enabled by default, but session variable - /// can override it. + /// If true, it's decided by session variable `streaming_use_arrangement_backfill` (default true) pub enable_arrangement_backfill: bool, #[serde(default = "default::developer::stream_high_join_amplification_threshold")] @@ -1055,6 +1054,13 @@ pub struct StreamingDeveloperConfig { /// A flag to allow disabling the auto schema change handling #[serde(default = "default::developer::stream_enable_auto_schema_change")] pub enable_auto_schema_change: bool, + + #[serde(default = "default::developer::enable_shared_source")] + /// Enable shared source + /// If false, the shared source will be disabled, + /// even if session variable set. + /// If true, it's decided by session variable `streaming_use_shared_source` (default true) + pub enable_shared_source: bool, } /// The subsections `[batch.developer]`. @@ -1939,6 +1945,10 @@ pub mod default { true } + pub fn enable_shared_source() -> bool { + true + } + pub fn stream_high_join_amplification_threshold() -> usize { 2048 } diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 43ac67363dd9..3d7f40618247 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -287,8 +287,8 @@ pub struct SessionConfig { /// /// When enabled, `CREATE SOURCE` will create a source streaming job, and `CREATE MATERIALIZED VIEWS` from the source /// will forward the data from the same source streaming job, and also backfill prior data from the external source. - #[parameter(default = false, alias = "rw_enable_shared_source")] - enable_shared_source: bool, + #[parameter(default = true, alias = "rw_enable_shared_source")] + streaming_use_shared_source: bool, /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time. #[parameter(default = SERVER_ENCODING)] diff --git a/src/config/example.toml b/src/config/example.toml index 21a63d1de90a..4b0e08c21319 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -129,6 +129,7 @@ stream_high_join_amplification_threshold = 2048 stream_enable_actor_tokio_metrics = false stream_exchange_connection_pool_size = 1 stream_enable_auto_schema_change = true +stream_enable_shared_source = true [storage] share_buffers_sync_parallelism = 1 diff --git a/src/frontend/planner_test/tests/testdata/input/shared_source.yml b/src/frontend/planner_test/tests/testdata/input/shared_source.yml index 71c87ab2e3ce..4d684422d9fd 100644 --- a/src/frontend/planner_test/tests/testdata/input/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/input/shared_source.yml @@ -9,7 +9,7 @@ ) FORMAT PLAIN ENCODE JSON; expected_outputs: [] - with_config_map: - enable_shared_source: true + streaming_use_shared_source: true sql: | /* The shared source config doesn't affect table with connector. */ EXPLAIN CREATE TABLE s(x int,y int) @@ -25,43 +25,43 @@ # We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV # # batch: All 4 plans should be the same. -# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW +# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. streaming_use_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW - with_config_map: - enable_shared_source: false + streaming_use_shared_source: false before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; expected_outputs: - batch_plan - stream_plan - with_config_map: - enable_shared_source: false + streaming_use_shared_source: false before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; expected_outputs: - batch_plan - stream_plan - with_config_map: - enable_shared_source: true + streaming_use_shared_source: true before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; expected_outputs: - batch_plan - stream_plan - with_config_map: - enable_shared_source: true + streaming_use_shared_source: true before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; expected_outputs: - batch_plan diff --git a/src/frontend/planner_test/tests/testdata/output/shared_source.yml b/src/frontend/planner_test/tests/testdata/output/shared_source.yml index 5083c23952f3..83fde26bfc7d 100644 --- a/src/frontend/planner_test/tests/testdata/output/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/output/shared_source.yml @@ -27,11 +27,11 @@ └─StreamDml { columns: [x, y, _row_id] } └─StreamSource with_config_map: - enable_shared_source: 'true' + streaming_use_shared_source: 'true' - before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -43,11 +43,11 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] } with_config_map: - enable_shared_source: 'false' + streaming_use_shared_source: 'false' - before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -59,11 +59,11 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id] } with_config_map: - enable_shared_source: 'false' + streaming_use_shared_source: 'false' - before: - create_source sql: | - SET enable_shared_source = false; + SET streaming_use_shared_source = false; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -75,11 +75,11 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } with_config_map: - enable_shared_source: 'true' + streaming_use_shared_source: 'true' - before: - create_source sql: | - SET enable_shared_source = true; + SET streaming_use_shared_source = true; select * from s; batch_plan: |- BatchExchange { order: [], dist: Single } @@ -91,4 +91,4 @@ └─StreamRowIdGen { row_id_index: 3 } └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } with_config_map: - enable_shared_source: 'true' + streaming_use_shared_source: 'true' diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index d6338b89456a..f2720d111cc4 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1662,7 +1662,12 @@ pub async fn handle_create_source( let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); let is_shared = create_cdc_source_job || (with_properties.is_shareable_non_cdc_connector() - && session.config().enable_shared_source()); + && session + .env() + .streaming_config() + .developer + .enable_shared_source + && session.config().streaming_use_shared_source()); let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 5bbf1a021641..81dc1128c2a4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -354,7 +354,7 @@ impl ToStream for LogicalSource { } SourceNodeKind::CreateMViewOrBatch => { // Create MV on source. - // We only check enable_shared_source is true when `CREATE SOURCE`. + // We only check streaming_use_shared_source is true when `CREATE SOURCE`. // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here. let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared()); if use_shared_source { diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 3b41c82afab0..2a6118970ccd 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -158,7 +158,7 @@ impl Configuration { pub fn for_scale_shared_source() -> Self { let mut conf = Self::for_scale(); - conf.per_session_queries = vec!["SET ENABLE_SHARED_SOURCE = true;".into()].into(); + conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into(); conf }