From af9ac6d3292e428c34dad59549bc43912047983b Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 26 Apr 2024 01:34:47 +0800 Subject: [PATCH] feat: pause shared SourceExecutor until a downstream actor is created (#16348) Signed-off-by: xxchan --- Makefile.toml | 22 +- e2e_test/source/basic/kafka_shared_source.slt | 58 ----- .../cdc/mysql/mysql_create_drop.slt | 24 +- e2e_test/source_inline/commands.toml | 23 +- e2e_test/source_inline/kafka/b.sh | 9 + .../source_inline/kafka/consumer_group.mjs | 13 +- .../source_inline/kafka/shared_source.slt | 209 ++++++++++++++++++ src/common/src/catalog/internal_table.rs | 4 +- src/compute/tests/integration_tests.rs | 1 + .../tests/testdata/input/shared_source.yml | 2 +- .../tests/testdata/output/shared_source.yml | 2 +- .../src/optimizer/plan_node/logical_source.rs | 5 +- src/stream/src/executor/actor.rs | 1 + src/stream/src/executor/mod.rs | 46 ++++ src/stream/src/executor/mview/materialize.rs | 33 +-- .../src/executor/source/source_executor.rs | 27 ++- src/stream/src/executor/stream_reader.rs | 2 + .../src/from_proto/source/trad_source.rs | 3 + 18 files changed, 347 insertions(+), 137 deletions(-) delete mode 100644 e2e_test/source/basic/kafka_shared_source.slt create mode 100644 e2e_test/source_inline/kafka/b.sh create mode 100644 e2e_test/source_inline/kafka/shared_source.slt diff --git a/Makefile.toml b/Makefile.toml index 9f896adc00b00..84f9daab52ecf 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -131,8 +131,7 @@ rm -rf "${PREFIX_PROFILING}" [tasks.reset-rw] category = "RiseDev - Start/Stop" description = "Clean all data in the default database dev of the running RisingWave" -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c "CREATE DATABASE risedev_tmp;" @@ -568,11 +567,16 @@ if [ ! -f "${RC_ENV_FILE}" ]; then fi ''' +[tasks.check-and-load-risedev-env-file] +private = true +category = "RiseDev - Prepare" +dependencies = ["check-risedev-env-file"] +env_files = ["${PREFIX_CONFIG}/risedev-env"] + [tasks.psql-env] category = "RiseDev - Start/Stop" description = "Dump env configuration for psql" -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash cat < "${PREFIX_CONFIG}/psql-env" @@ -590,8 +594,7 @@ echo " $(tput setaf 4)source ${PREFIX_CONFIG}/psql-env$(tput sgr0)" [tasks.psql] category = "RiseDev - Start/Stop" description = "Run local psql client with default connection parameters. You can pass extra arguments to psql." -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@" @@ -600,8 +603,7 @@ psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root [tasks.ctl] category = "RiseDev - Start/Stop" description = "Start RiseCtl" -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env bash cargo run -p risingwave_cmd_all --profile "${RISINGWAVE_BUILD_PROFILE}" -- ctl "$@" @@ -1312,8 +1314,7 @@ category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] +dependencies = ["check-and-load-risedev-env-file"] command = "sqllogictest" args = ["${@}"] description = "🌟 Run SQLLogicTest" @@ -1452,6 +1453,7 @@ dependencies = ["check-risedev-env-file"] script = ''' #!/usr/bin/env bash set -euo pipefail + cat ${PREFIX_CONFIG}/risedev-env echo "Hint: To load the environment variables into the shell, you may run:" echo "$(tput setaf 4)\set -a; source ${PREFIX_CONFIG}/risedev-env; set +a$(tput sgr0)" diff --git a/e2e_test/source/basic/kafka_shared_source.slt b/e2e_test/source/basic/kafka_shared_source.slt deleted file mode 100644 index 5245d6ea68630..0000000000000 --- a/e2e_test/source/basic/kafka_shared_source.slt +++ /dev/null @@ -1,58 +0,0 @@ -control substitution on - -statement ok -SET rw_enable_shared_source TO true; - -statement ok -create source s0 (v1 int, v2 varchar) with ( - connector = 'kafka', - topic = 'kafka_4_partition_topic', - properties.bootstrap.server = '${KAFKA_BOOTSTRAP_SERVER:message_queue:29092}', - scan.startup.mode = 'earliest' -) FORMAT PLAIN ENCODE JSON; - -statement ok -create materialized view mv_1 as select * from s0; - -statement ok -SET rw_enable_shared_source TO false; - -statement ok -create materialized view mv_2 as select * from s0; - -statement ok -flush; - -# Wait enough time to ensure SourceExecutor consumes all Kafka data. -sleep 1s - -query IT rowsort -select v1, v2 from s0; ----- -1 1 -2 22 -3 333 -4 4444 - -query IT rowsort -select v1, v2 from mv_1; ----- -1 1 -2 22 -3 333 -4 4444 - -query IT rowsort -select v1, v2 from mv_2; ----- -1 1 -2 22 -3 333 -4 4444 - -# TODO: add more data to the topic and re-check the data. Currently there's no good test infra to do this... -# To test the correctness of source backfill, we might need to keep producing data during an interval, to let it go -# through the backfill stage to the forward stage. - -statement ok -drop source s0 cascade; diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt index e6ce86aaf06fa..da3d82083755f 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt @@ -45,7 +45,23 @@ create source s with ( sleep 2s -# SourceExecutor only receives new data after it's created. +# At the beginning, the source is paused. It will resume after a downstream is created. +system ok +internal_table.mjs --name s --type '' --count +---- +count: 0 + + +statement ok +create table tt1_shared (v1 int, + v2 timestamptz, + PRIMARY KEY (v1) +) from s table 'testdb1.tt1'; + +sleep 2s + +# The source is resumed. +# SourceExecutor does not handle historical data, and only receives new data after it's created. # But it can receive offset update at the beginning and periodically # via the heartbeat message. system ok @@ -114,12 +130,6 @@ create table tt5 (v1 int, table.name = 'tt5', ); -statement ok -create table tt1_shared (v1 int, - v2 timestamptz, - PRIMARY KEY (v1) -) from s table 'testdb1.tt1'; - statement ok create table tt2_shared (v1 int, v2 timestamptz, diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index f616fb40711c9..fd9163f8747f7 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -1,9 +1,5 @@ # This file contains commands used by the tests. -[tasks.source-test-hook] -private = true -dependencies = ["check-risedev-env-file"] -env_files = ["${PREFIX_CONFIG}/risedev-env"] # Note about the Kafka CLI tooling: # - Built-in Kafka console tools: @@ -16,10 +12,10 @@ env_files = ["${PREFIX_CONFIG}/risedev-env"] # - rpk: # Golang based. # Style example: RPK_BROKERS=localhost:9092 rpk topic create t -[tasks.kafka-hook] +[tasks.check-kafka] private = true description = "Check if Kafka is started by RiseDev" -dependencies = ["source-test-hook"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env sh set -e @@ -38,10 +34,11 @@ fi [tasks.clean-kafka] category = "RiseDev - Test - Source Test - Kafka" description = "Delete all kafka topics." -dependencies = ["kafka-hook"] +dependencies = ["check-and-load-risedev-env-file"] script = ''' #!/usr/bin/env sh set -e + if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then echo "Deleting all Kafka topics..." rpk topic delete -r "*" @@ -52,7 +49,7 @@ fi [tasks.kafka-topics] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -61,7 +58,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTS [tasks.kafka-produce] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -70,7 +67,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_K [tasks.kafka-consume] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -79,7 +76,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_K [tasks.kafka-consumer-groups] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = """ #!/usr/bin/env sh set -e @@ -89,7 +86,7 @@ ${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KA # rpk tools [tasks.rpk] category = "RiseDev - Test - Source Test - Kafka" -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] # check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options script = """ #!/usr/bin/env sh @@ -106,7 +103,7 @@ rpk "$@" [tasks.redpanda-console] category = "RiseDev - Test - Source Test - Kafka" description = "Start Redpanda console (Kafka GUI) at localhost:8080." -dependencies = ["kafka-hook"] +dependencies = ["check-kafka"] script = ''' #!/usr/bin/env sh set -e diff --git a/e2e_test/source_inline/kafka/b.sh b/e2e_test/source_inline/kafka/b.sh new file mode 100644 index 0000000000000..75960b7d03bf8 --- /dev/null +++ b/e2e_test/source_inline/kafka/b.sh @@ -0,0 +1,9 @@ +#!/bin/bash +for i in {0..9}; do +cat < { - return (await describe_consumer_group(group_name))["MEMBERS"] + return (await describe_consumer_group(group_name))["MEMBERS"]; }) ); } @@ -71,14 +68,14 @@ async function list_consumer_group_lags(fragment_id) { const groups = await list_consumer_groups(fragment_id); return Promise.all( groups.map(async (group_name) => { - return (await describe_consumer_group(group_name))["TOTAL-LAG"] + return (await describe_consumer_group(group_name))["TOTAL-LAG"]; }) ); } const fragment_id = await get_fragment_id_of_mv(mv); if (command == "list-groups") { - echo`${(await list_consumer_groups(fragment_id))}`; + echo`${await list_consumer_groups(fragment_id)}`; } else if (command == "list-members") { echo`${await list_consumer_group_members(fragment_id)}`; } else if (command == "list-lags") { diff --git a/e2e_test/source_inline/kafka/shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt new file mode 100644 index 0000000000000..806369f2d630d --- /dev/null +++ b/e2e_test/source_inline/kafka/shared_source.slt @@ -0,0 +1,209 @@ +control substitution on + +statement ok +SET rw_enable_shared_source TO true; + +system ok +rpk topic create shared_source -p 4 + +system ok +cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0 +0 {"v1": 1, "v2": "a"} +1 {"v1": 2, "v2": "b"} +2 {"v1": 3, "v2": "c"} +3 {"v1": 4, "v2": "d"} +EOF + +statement ok +create source s0 (v1 int, v2 varchar) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'shared_source', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + +query I +select count(*) from rw_internal_tables where name like '%s0%'; +---- +1 + +sleep 1s + +# Ingestion does not start (state table is empty), even after sleep +system ok +internal_table.mjs --name s0 --type source +---- +(empty) + + +statement ok +create materialized view mv_1 as select * from s0; + +# Wait enough time to ensure SourceExecutor consumes all Kafka data. +sleep 2s + +# Ingestion started +system ok +internal_table.mjs --name s0 --type source +---- +0,"{""split_info"": {""partition"": 0, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +1,"{""split_info"": {""partition"": 1, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" + + +# The result is non-deterministic: +# If the upstream row comes before the backfill row, it will be ignored, and the result state is Backfilling. +# If the upstream row comes after the backfill row, the result state is Finished. +# Uncomment below and run manually to see the result. + +# system ok +# internal_table.mjs --name mv_1 --type sourcebackfill +# ---- +# 0,"{""Backfilling"": ""0""}" +# 1,"{""Backfilling"": ""0""}" +# 2,"{""Backfilling"": ""0""}" +# 3,"{""Backfilling"": ""0""}" + + +# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor. +statement ok +SET rw_enable_shared_source TO false; + +statement ok +create materialized view mv_2 as select * from s0; + +sleep 2s + +query IT rowsort +select v1, v2 from s0; +---- +1 a +2 b +3 c +4 d + +query IT rowsort +select v1, v2 from mv_1; +---- +1 a +2 b +3 c +4 d + +query IT rowsort +select v1, v2 from mv_2; +---- +1 a +2 b +3 c +4 d + +system ok +cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0 +0 {"v1": 1, "v2": "aa"} +1 {"v1": 2, "v2": "bb"} +2 {"v1": 3, "v2": "cc"} +3 {"v1": 4, "v2": "dd"} +EOF + +sleep 2s + +query IT rowsort +select v1, v2 from s0; +---- +1 a +1 aa +2 b +2 bb +3 c +3 cc +4 d +4 dd + +query IT rowsort +select v1, v2 from mv_1; +---- +1 a +1 aa +2 b +2 bb +3 c +3 cc +4 d +4 dd + + +# start_offset changed to 1 +system ok +internal_table.mjs --name s0 --type source +---- +0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" + + +# Same as above, the result is still non-deterministic: Some partitions may be: "{""Backfilling"": ""1""}" +# Uncomment below and run manually to see the result. + +# system ok +# internal_table.mjs --name mv_1 --type sourcebackfill +# ---- +# 0,"{""Finished""}" +# 1,"{""Finished""}" +# 2,"{""Finished""}" +# 3,"{""Finished""}" + + +# Note: heredoc in loop in mac's sh is ok, but not in linux's sh. So we use bash here. +system ok +bash -c 'for i in {0..9}; do +cat < String { format!( "__internal_{}_{}_{}_{}", - mview_name, + job_name, fragment_id, table_type.to_lowercase(), table_id diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index ea9bd583284cc..14be0e64e7a20 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -173,6 +173,7 @@ async fn test_table_materialize() -> StreamResult<()> { barrier_rx, system_params_manager.get_params(), None, + false, ) .boxed(), ); diff --git a/src/frontend/planner_test/tests/testdata/input/shared_source.yml b/src/frontend/planner_test/tests/testdata/input/shared_source.yml index 0f68cc25f6288..952ae8dcc5aa0 100644 --- a/src/frontend/planner_test/tests/testdata/input/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/input/shared_source.yml @@ -25,7 +25,7 @@ # We use with_config_map to control the config when CREATE SOURCE, and use another SET statement to change the config for CREATE MV # # batch: All 4 plans should be the same. -# stream: StreamSourceScan (with backfill) should be used only for the last 1. All other 3 use StreamSource. +# stream: StreamSourceScan (with backfill) should be used only for the last 2. The first 2 use StreamSource. rw_enable_shared_source changes the behavior of CREATE SOURCE, but not CREATE MATERIALIZED VIEW - with_config_map: rw_enable_shared_source: false before: diff --git a/src/frontend/planner_test/tests/testdata/output/shared_source.yml b/src/frontend/planner_test/tests/testdata/output/shared_source.yml index 5bf3739f28411..39fb6b799fb83 100644 --- a/src/frontend/planner_test/tests/testdata/output/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/output/shared_source.yml @@ -73,7 +73,7 @@ StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [x, y, _row_id] } └─StreamRowIdGen { row_id_index: 3 } - └─StreamSource { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } + └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } with_config_map: rw_enable_shared_source: 'true' - before: diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 50a28494e81df..0310fdbbd439b 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -347,8 +347,9 @@ impl ToStream for LogicalSource { } SourceNodeKind::CreateMViewOrBatch => { // Create MV on source. - let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared()) - && self.ctx().session_ctx().config().rw_enable_shared_source(); + // We only check rw_enable_shared_source is true when `CREATE SOURCE`. + // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here. + let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared()); if use_shared_source { plan = StreamSourceScan::new(self.core.clone()).into(); } else { diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 750d299b55e84..ef776922d4357 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -49,6 +49,7 @@ pub struct ActorContext { pub streaming_metrics: Arc, + /// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added. pub initial_dispatch_num: usize, } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 0748fac0a6569..62740e6344cf9 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -373,6 +373,52 @@ impl Barrier { } } + /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`. + /// + /// # Use case + /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors). + /// * Pause a standalone shared `SourceExecutor`. + /// * Disable a standalone `MaterializeExecutor`'s conflict check. + /// + /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and + /// check `has_more_downstream_fragments` on barrier to see whether the optimization + /// needs to be turned off. + /// + /// ## Some special cases not included + /// + /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only + /// care about **number of downstream fragments** (more precisely, existence). + /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed. + /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed. + pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool { + let Some(mutation) = self.mutation.as_deref() else { + return false; + }; + match mutation { + // Add is for mv, index and sink creation. + Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(), + // AddAndUpdate is for sink-into-table. + Mutation::AddAndUpdate( + AddMutation { adds, .. }, + UpdateMutation { + dispatchers, + actor_new_dispatchers, + .. + }, + ) => { + adds.get(&upstream_actor_id).is_some() + || actor_new_dispatchers.get(&upstream_actor_id).is_some() + || dispatchers.get(&upstream_actor_id).is_some() + } + Mutation::Update(_) + | Mutation::Stop(_) + | Mutation::Pause + | Mutation::Resume + | Mutation::SourceChangeSplit(_) + | Mutation::Throttle(_) => false, + } + } + /// Whether this barrier requires the executor to pause its data stream on startup. pub fn is_pause_on_startup(&self) -> bool { match self.mutation.as_deref() { diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index ede0448f23887..c88b3d8257177 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -38,7 +38,6 @@ use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel}; use crate::executor::prelude::*; -use crate::executor::{AddMutation, UpdateMutation}; /// `MaterializeExecutor` materializes changes in stream into a materialized view on storage. pub struct MaterializeExecutor { @@ -239,10 +238,9 @@ impl MaterializeExecutor { } } Message::Barrier(b) => { - let mutation = b.mutation.as_deref(); // If a downstream mv depends on the current table, we need to do conflict check again. if !self.may_have_downstream - && Self::new_downstream_created(mutation, self.actor_context.id) + && b.has_more_downstream_fragments(self.actor_context.id) { self.may_have_downstream = true; } @@ -270,35 +268,6 @@ impl MaterializeExecutor { } } } - - fn new_downstream_created(mutation: Option<&Mutation>, actor_id: ActorId) -> bool { - let Some(mutation) = mutation else { - return false; - }; - match mutation { - // Add is for mv, index and sink creation. - Mutation::Add(AddMutation { adds, .. }) => adds.get(&actor_id).is_some(), - // AddAndUpdate is for sink-into-table. - Mutation::AddAndUpdate( - AddMutation { adds, .. }, - UpdateMutation { - dispatchers, - actor_new_dispatchers: actor_dispatchers, - .. - }, - ) => { - adds.get(&actor_id).is_some() - || actor_dispatchers.get(&actor_id).is_some() - || dispatchers.get(&actor_id).is_some() - } - Mutation::Update(_) - | Mutation::Stop(_) - | Mutation::Pause - | Mutation::Resume - | Mutation::SourceChangeSplit(_) - | Mutation::Throttle(_) => false, - } - } } impl MaterializeExecutor { diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 25b153fcde8fd..455b6e0fc33e4 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -67,6 +67,8 @@ pub struct SourceExecutor { /// Rate limit in rows/s. rate_limit_rps: Option, + + is_shared: bool, } impl SourceExecutor { @@ -77,6 +79,7 @@ impl SourceExecutor { barrier_receiver: UnboundedReceiver, system_params: SystemParamsReaderRef, rate_limit_rps: Option, + is_shared: bool, ) -> Self { Self { actor_ctx, @@ -85,6 +88,7 @@ impl SourceExecutor { barrier_receiver: Some(barrier_receiver), system_params, rate_limit_rps, + is_shared, } } @@ -429,6 +433,7 @@ impl SourceExecutor { // init in-memory split states with persisted state if any core.init_split_state(boot_state.clone()); + let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); @@ -447,11 +452,16 @@ impl SourceExecutor { let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); - // If the first barrier requires us to pause on startup, pause the stream. - if barrier.is_pause_on_startup() { + // - For shared source, pause until there's a MV. + // - If the first barrier requires us to pause on startup, pause the stream. + if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() { + tracing::info!( + is_shared = self.is_shared, + is_uninitialized = is_uninitialized, + "source paused on startup" + ); stream.pause_stream(); } - // TODO: for shared source, pause until there's a MV. yield Message::Barrier(barrier); @@ -482,8 +492,17 @@ impl SourceExecutor { let epoch = barrier.epoch; + if self.is_shared + && is_uninitialized + && barrier.has_more_downstream_fragments(self.actor_ctx.id) + { + stream.resume_stream(); + is_uninitialized = false; + } + if let Some(mutation) = barrier.mutation.as_deref() { match mutation { + // XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic. Mutation::Pause => stream.pause_stream(), Mutation::Resume => stream.resume_stream(), Mutation::SourceChangeSplit(actor_splits) => { @@ -801,6 +820,7 @@ mod tests { barrier_rx, system_params_manager.get_params(), None, + false, ); let mut executor = executor.boxed().execute(); @@ -889,6 +909,7 @@ mod tests { barrier_rx, system_params_manager.get_params(), None, + false, ); let mut handler = executor.boxed().execute(); diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 7fbf43a5794b0..92a31979c1c56 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -96,12 +96,14 @@ impl StreamReaderWithPause { /// Pause the data stream. pub fn pause_stream(&mut self) { assert!(!self.paused, "already paused"); + tracing::info!("data stream paused"); self.paused = true; } /// Resume the data stream. Panic if the data stream is not paused. pub fn resume_stream(&mut self) { assert!(self.paused, "not paused"); + tracing::info!("data stream resumed"); self.paused = false; } } diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 2fce0ec71f5ca..0013792d51326 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -226,6 +226,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { ) .boxed() } else { + let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared()); SourceExecutor::new( params.actor_context.clone(), Some(stream_source_core), @@ -233,6 +234,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { barrier_receiver, system_params, source.rate_limit, + is_shared, ) .boxed() } @@ -262,6 +264,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { barrier_receiver, system_params, None, + false, ); Ok((params.info, exec).into()) }