diff --git a/.typos.toml b/.typos.toml
index 4d4bbfca1c082..498d954a55d88 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -36,4 +36,5 @@ extend-exclude = [
     # We don't want to fix "fals" here, but may want in other places.
     # Ideally, we should just ignore that line: https://github.com/crate-ci/typos/issues/316
     "src/common/src/cast/mod.rs",
+    "src/tests/simulation/tests/integration_tests/scale/shared_source.rs",
 ]
diff --git a/Cargo.lock b/Cargo.lock
index c8bb3bb7afa86..1d9abe6ead5ba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11207,6 +11207,7 @@ dependencies = [
  "comfy-table",
  "crepe",
  "easy-ext",
+ "educe",
  "either",
  "enum-as-inner 0.6.0",
  "expect-test",
@@ -11554,6 +11555,7 @@ dependencies = [
  "madsim-etcd-client",
  "madsim-rdkafka",
  "madsim-tokio",
+ "maplit",
  "paste",
  "pin-project",
  "pretty_assertions",
diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh
index 56a06ac756931..29f2a0ac7b5ce 100755
--- a/ci/scripts/e2e-source-test.sh
+++ b/ci/scripts/e2e-source-test.sh
@@ -130,7 +130,7 @@ echo "> inserted new rows into postgres"
 
 # start cluster w/o clean-data
 unset RISINGWAVE_CI
-export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info" \
+export RUST_LOG="risingwave_stream=debug,risingwave_batch=info,risingwave_storage=info"
 
 risedev dev ci-1cn-1fe-with-recovery
 echo "> wait for cluster recovery finish"
diff --git a/e2e_test/commands/risectl b/e2e_test/commands/risectl
new file mode 100755
index 0000000000000..2bb462d83fbab
--- /dev/null
+++ b/e2e_test/commands/risectl
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
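+# Run risectl from the local RisingWave installation with logging limited to errors,
+# so that test output stays free of log noise.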
+RUST_LOG="error" .risingwave/bin/risingwave/risectl "$@"
diff --git a/e2e_test/source_inline/kafka/shared_source.slt b/e2e_test/source_inline/kafka/shared_source.slt
index c481e609ffccd..44ed45857f4a3 100644
--- a/e2e_test/source_inline/kafka/shared_source.slt
+++ b/e2e_test/source_inline/kafka/shared_source.slt
@@ -21,7 +21,7 @@ create source s0 (v1 int, v2 varchar) with (
   scan.startup.mode = 'earliest'
 ) FORMAT PLAIN ENCODE JSON;
 
-query I
+query ?
 select count(*) from rw_internal_tables where name like '%s0%';
 ----
 1
@@ -67,7 +67,7 @@ create materialized view mv_2 as select * from s0;
 
 sleep 2s
 
-query IT rowsort
+query ?? rowsort
 select v1, v2 from s0;
 ----
 1 a
@@ -75,7 +75,7 @@ select v1, v2 from s0;
 3 c
 4 d
 
-query IT rowsort
+query ?? rowsort
 select v1, v2 from mv_1;
 ----
 1 a
@@ -83,7 +83,7 @@ select v1, v2 from mv_1;
 3 c
 4 d
 
-query IT rowsort
+query ?? rowsort
 select v1, v2 from mv_2;
 ----
 1 a
@@ -111,7 +111,7 @@ internal_table.mjs --name s0 --type source
 3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
 
 
-query IT rowsort
+query ?? rowsort
 select v1, v2 from s0;
 ----
 1 a
@@ -123,7 +123,7 @@ select v1, v2 from s0;
 4 d
 4 dd
 
-query IT rowsort
+query ?? rowsort
 select v1, v2 from mv_1;
 ----
 1 a
@@ -173,7 +173,7 @@ done
 
 sleep 3s
 
-query IT rowsort
+query ?? rowsort
 select v1, count(*) from s0 group by v1;
 ----
 1 12
@@ -181,7 +181,7 @@ select v1, count(*) from s0 group by v1;
 3 12
 4 12
 
-query IT rowsort
+query ?? rowsort
 select v1, count(*) from mv_1 group by v1;
 ----
 1 12
@@ -210,5 +210,86 @@ internal_table.mjs --name mv_1 --type sourcebackfill
 3,"""Finished"""
 
 
+# # Note: the parallelism depends on the risedev profile,
+# # so the scale tests below are commented out.
+
+# query ???
+# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
+# ----
+# mv_1	{MVIEW,SOURCE_SCAN}	5
+# mv_2	{MVIEW,SOURCE_SCAN}	5
+# s0	{SOURCE}	5
+
+
+# system ok
+# risectl meta source-split-info --ignore-id
+# ----
+# Table
+# 	Fragment (Source)
+# 		Actor (1 splits): [0]
+# 		Actor (1 splits): [2]
+# 		Actor (1 splits): [3]
+# 		Actor (1 splits): [1]
+# 		Actor (0 splits): []
+# Table
+# 	Fragment (SourceScan)
+# 		Actor (1 splits): [0] <- Upstream Actor #1055: [0]
+# 		Actor (1 splits): [2] <- Upstream Actor #1056: [2]
+# 		Actor (1 splits): [3] <- Upstream Actor #1057: [3]
+# 		Actor (1 splits): [1] <- Upstream Actor #1058: [1]
+# 		Actor (0 splits): [] <- Upstream Actor #1059: []
+# Table
+# 	Fragment (SourceScan)
+# 		Actor (1 splits): [0] <- Upstream Actor #1055: [0]
+# 		Actor (1 splits): [2] <- Upstream Actor #1056: [2]
+# 		Actor (1 splits): [3] <- Upstream Actor #1057: [3]
+# 		Actor (1 splits): [1] <- Upstream Actor #1058: [1]
+# 		Actor (0 splits): [] <- Upstream Actor #1059: []
+
+
+# # scale down
+# statement ok
+# ALTER MATERIALIZED VIEW mv_1 SET PARALLELISM TO 2;
+
+# # This should have no effect because of NoShuffle.
+# # TODO: support ALTER SOURCE SET PARALLELISM, then we can alter the parallelism here.
+# query ???
+# select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;
+# ----
+# mv_1	{MVIEW,SOURCE_SCAN}	5
+# mv_2	{MVIEW,SOURCE_SCAN}	5
+# s0	{SOURCE}	5
+
+# system ok
+# risectl meta source-split-info --ignore-id
+# ----
+# Table
+# 	Fragment (Source)
+# 		Actor (1 splits): [0]
+# 		Actor (1 splits): [2]
+# 		Actor (1 splits): [3]
+# 		Actor (1 splits): [1]
+# 		Actor (0 splits): []
+# Table
+# 	Fragment (SourceScan)
+# 		Actor (1 splits): [0] <- Upstream Actor #1055: [0]
+# 		Actor (1 splits): [2] <- Upstream Actor #1056: [2]
+# 		Actor (1 splits): [3] <- Upstream Actor #1057: [3]
+# 		Actor (1 splits): [1] <- Upstream Actor #1058: [1]
+# 		Actor (0 splits): [] <- Upstream Actor #1059: []
+# Table
+# 	Fragment (SourceScan)
+# 		Actor (1 splits): [0] <- Upstream Actor #1055: [0]
+# 		Actor (1 splits): [2] <- Upstream Actor #1056: [2]
+# 		Actor (1 splits): [3] <- Upstream Actor #1057: [3]
+# 		Actor (1 splits): [1] <- Upstream Actor #1058: [1]
+# 		Actor (0 splits): [] <- Upstream Actor #1059: []
+
+
+# # Manual test: change the parallelism of the compute node, kill and restart it, and then check:
+# # risedev ctl meta source-split-info --ignore-id
+# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"
+
+
 statement ok
 drop source s0 cascade;
diff --git a/src/ctl/src/cmd_impl/meta/cluster_info.rs b/src/ctl/src/cmd_impl/meta/cluster_info.rs
index cbc21ca6ec610..42178814a0bc7 100644
--- a/src/ctl/src/cmd_impl/meta/cluster_info.rs
+++ b/src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -31,7 +31,7 @@ pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetCluster
     Ok(response)
 }
 
-pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
+pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
     let GetClusterInfoResponse {
         worker_nodes: _,
         source_infos: _,
@@ -40,37 +40,113 @@ pub async fn source_split_info(context: &CtlContext) -> anyhow::Result<()> {
         revision: _,
     } = get_cluster_info(context).await?;
 
+    let mut actor_splits_map: BTreeMap<u32, String> = BTreeMap::new();
+
+    // First pass: collect the splits assigned to each Source/SourceScan actor into `actor_splits_map`.
     for table_fragment in &table_fragments {
         if table_fragment.actor_splits.is_empty() {
             continue;
         }
 
-        println!("Table #{}", table_fragment.table_id);
-
         for fragment in table_fragment.fragments.values() {
             let fragment_type_mask = fragment.fragment_type_mask;
             if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
-                || fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0
+                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
             {
+                // Neither a Source nor a SourceScan (source backfill) fragment; skip it.
+                continue;
+            }
+            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
                 // skip dummy source for dml fragment
                 continue;
             }
 
-            println!("\tFragment #{}", fragment.fragment_id);
             for actor in &fragment.actors {
                 if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
                     let splits = splits
                         .iter()
                         .map(|split| SplitImpl::try_from(split).unwrap())
                         .map(|split| split.id())
-                        .collect_vec();
+                        .collect_vec()
+                        .join(",");
+                    actor_splits_map.insert(actor.actor_id, splits);
+                }
+            }
+        }
+    }
 
+    // Print in a second pass; otherwise the upstream splits info would not be available yet.
+    for table_fragment in &table_fragments {
+        if table_fragment.actor_splits.is_empty() {
+            continue;
+        }
+        if ignore_id {
+            println!("Table");
+        } else {
+            println!("Table #{}", table_fragment.table_id);
+        }
+        for fragment in table_fragment.fragments.values() {
+            let fragment_type_mask = fragment.fragment_type_mask;
+            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
+                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
+            {
+                // Neither a Source nor a SourceScan (source backfill) fragment; skip it.
+                continue;
+            }
+            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
+                // skip dummy source for dml fragment
+                continue;
+            }
+
+            println!(
+                "\tFragment{} ({})",
+                if ignore_id {
+                    "".to_string()
+                } else {
+                    format!(" #{}", fragment.fragment_id)
+                },
+                if fragment_type_mask == FragmentTypeFlag::Source as u32 {
+                    "Source"
+                } else {
+                    "SourceScan"
+                }
+            );
+            for actor in &fragment.actors {
+                if let Some(splits) = actor_splits_map.get(&actor.actor_id) {
                     println!(
-                        "\t\tActor #{:<3} ({}): [{}]",
-                        actor.actor_id,
+                        "\t\tActor{} ({} splits): [{}]{}",
+                        if ignore_id {
+                            "".to_string()
+                        } else {
+                            format!(" #{:<3}", actor.actor_id,)
+                        },
                         splits.len(),
-                        splits.join(",")
+                        splits,
+                        if !actor.upstream_actor_id.is_empty() {
+                            assert!(
+                                actor.upstream_actor_id.len() == 1,
+                                "should have only one upstream actor, got {actor:?}"
+                            );
+                            let upstream_splits =
+                                actor_splits_map.get(&actor.upstream_actor_id[0]).unwrap();
+                            format!(
+                                " <- Upstream Actor{}: [{}]",
+                                if ignore_id {
+                                    "".to_string()
+                                } else {
+                                    format!(" #{}", actor.upstream_actor_id[0])
+                                },
+                                upstream_splits
+                            )
+                        } else {
+                            "".to_string()
+                        }
                     );
+                } else {
+                    println!(
+                        "\t\tError: Actor #{:<3} (not found in actor_splits)",
+                        actor.actor_id,
+                    )
                 }
             }
         }
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index d1deba4f99140..34c5be6ace21b 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -404,7 +404,10 @@ enum MetaCommands {
     /// get cluster info
     ClusterInfo,
     /// get source split info
-    SourceSplitInfo,
+    SourceSplitInfo {
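+        /// Hide table/fragment/actor ids so that the output is stable across runs
+        /// and can be compared against expected output in tests.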
+        #[clap(long)]
+        ignore_id: bool,
+    },
     /// Reschedule the actors in the stream graph
     ///
     /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
@@ -808,8 +811,8 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
         Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
         Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
         Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
-        Commands::Meta(MetaCommands::SourceSplitInfo) => {
-            cmd_impl::meta::source_split_info(context).await?
+        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
+            cmd_impl::meta::source_split_info(context, ignore_id).await?
         }
         Commands::Meta(MetaCommands::Reschedule {
             from,
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 4511e9f61d894..a7f37bf505910 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -28,6 +28,7 @@ clap = { workspace = true }
 comfy-table = "7"
 crepe = "0.1"
 easy-ext = "1"
+educe = "0.6"
 either = "1"
 enum-as-inner = "0.6"
 etcd-client = { workspace = true }
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index feb7a959083bb..e1605b0aa61dc 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -368,12 +368,14 @@ impl DdlController {
         }
     }
 
+    #[tracing::instrument(skip(self), level = "debug")]
     pub async fn alter_parallelism(
         &self,
         table_id: u32,
         parallelism: PbTableParallelism,
         mut deferred: bool,
     ) -> MetaResult<()> {
+        tracing::info!("alter parallelism");
         if self.barrier_manager.check_status_running().is_err() {
             tracing::info!(
                 "alter parallelism is set to deferred mode because the system is in recovery state"
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 634cca76f0d7e..5891faf4dc31c 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -183,17 +183,26 @@ impl CustomFragmentInfo {
     }
 }
 
+use educe::Educe;
+
+// The Debug implementation is arbitrary; it is only used in debug logs.
+#[derive(Educe)]
+#[educe(Debug)]
 pub struct RescheduleContext {
     /// Meta information for all Actors
+    #[educe(Debug(ignore))]
     actor_map: HashMap<ActorId, CustomActorInfo>,
     /// Status of all Actors, used to find the location of the `Actor`
     actor_status: BTreeMap<ActorId, WorkerId>,
     /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor`
+    #[educe(Debug(ignore))]
     fragment_map: HashMap<FragmentId, CustomFragmentInfo>,
     /// Index of all `Actor` upstreams, specific to `Dispatcher`
     upstream_dispatchers: HashMap<ActorId, Vec<(FragmentId, DispatcherId, DispatcherType)>>,
-    /// Fragments with stream source
+    /// Fragments with `StreamSource`
     stream_source_fragment_ids: HashSet<FragmentId>,
+    /// Fragments with `StreamSourceBackfill`
+    stream_source_backfill_fragment_ids: HashSet<FragmentId>,
     /// Target fragments in `NoShuffle` relation
     no_shuffle_target_fragment_ids: HashSet<FragmentId>,
     /// Source fragments in `NoShuffle` relation
@@ -770,6 +779,7 @@ impl ScaleController {
         }
 
         let mut stream_source_fragment_ids = HashSet::new();
+        let mut stream_source_backfill_fragment_ids = HashSet::new();
         let mut no_shuffle_reschedule = HashMap::new();
         for (fragment_id, WorkerReschedule { worker_actor_diff }) in &*reschedule {
             let fragment = fragment_map
@@ -798,6 +808,7 @@ impl ScaleController {
             // correspondence, so we need to clone the reschedule plan to the downstream of all
             // cascading relations.
             if no_shuffle_source_fragment_ids.contains(fragment_id) {
+                // This fragment is the upstream of a NoShuffle relation.
                 let mut queue: VecDeque<_> = fragment_dispatcher_map
                     .get(fragment_id)
                     .unwrap()
@@ -887,6 +898,17 @@ impl ScaleController {
                 "reschedule plan rewritten with NoShuffle reschedule {:?}",
                 no_shuffle_reschedule
             );
+
+            for noshuffle_downstream in no_shuffle_reschedule.keys() {
+                let fragment = fragment_map.get(noshuffle_downstream).unwrap();
+                // SourceScan is always a NoShuffle downstream, rescheduled together with the upstream Source.
+                if (fragment.get_fragment_type_mask() & FragmentTypeFlag::SourceScan as u32) != 0 {
+                    let stream_node = fragment.actor_template.nodes.as_ref().unwrap();
+                    if stream_node.find_source_backfill().is_some() {
+                        stream_source_backfill_fragment_ids.insert(fragment.fragment_id);
+                    }
+                }
+            }
         }
 
         // Modifications for NoShuffle downstream.
@@ -898,6 +920,7 @@ impl ScaleController {
             fragment_map,
             upstream_dispatchers,
             stream_source_fragment_ids,
+            stream_source_backfill_fragment_ids,
             no_shuffle_target_fragment_ids,
             no_shuffle_source_fragment_ids,
             fragment_dispatcher_map,
@@ -924,9 +947,11 @@ impl ScaleController {
         HashMap<FragmentId, Reschedule>,
         HashMap<FragmentId, HashSet<ActorId>>,
     )> {
+        tracing::debug!("build_reschedule_context, reschedules: {:#?}", reschedules);
         let ctx = self
             .build_reschedule_context(&mut reschedules, options, table_parallelisms)
             .await?;
+        tracing::debug!("reschedule context: {:#?}", ctx);
         let reschedules = reschedules;
 
         // Here, the plan for both upstream and downstream of the NO_SHUFFLE Fragment should already have been populated.
@@ -1264,9 +1289,9 @@ impl ScaleController {
             }
         }
 
-        // For stream source fragments, we need to reallocate the splits.
+        // For stream source & source backfill fragments, we need to reallocate the splits.
         // Because we are in the Pause state, so it's no problem to reallocate
-        let mut fragment_stream_source_actor_splits = HashMap::new();
+        let mut fragment_actor_splits = HashMap::new();
         for fragment_id in reschedules.keys() {
             let actors_after_reschedule =
                 fragment_actors_after_reschedule.get(fragment_id).unwrap();
@@ -1284,13 +1309,51 @@ impl ScaleController {
 
                 let actor_splits = self
                     .source_manager
-                    .migrate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids)
+                    .migrate_splits_for_source_actors(
+                        *fragment_id,
+                        &prev_actor_ids,
+                        &curr_actor_ids,
+                    )
                     .await?;
 
-                fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits);
+                tracing::debug!(
+                    "source actor splits: {:?}, fragment_id: {}",
+                    actor_splits,
+                    fragment_id
+                );
+                fragment_actor_splits.insert(*fragment_id, actor_splits);
+            }
+        }
+        // We use two iterations to make sure source actors are migrated first, and then the backfill actors are aligned with them.
+        if !ctx.stream_source_backfill_fragment_ids.is_empty() {
+            for fragment_id in reschedules.keys() {
+                let actors_after_reschedule =
+                    fragment_actors_after_reschedule.get(fragment_id).unwrap();
+
+                if ctx
+                    .stream_source_backfill_fragment_ids
+                    .contains(fragment_id)
+                {
+                    let fragment = ctx.fragment_map.get(fragment_id).unwrap();
+
+                    let curr_actor_ids = actors_after_reschedule.keys().cloned().collect_vec();
+
+                    let actor_splits = self.source_manager.migrate_splits_for_backfill_actors(
+                        *fragment_id,
+                        &fragment.upstream_fragment_ids,
+                        &curr_actor_ids,
+                        &fragment_actor_splits,
+                        &no_shuffle_upstream_actor_map,
+                    )?;
+                    tracing::debug!(
+                        "source backfill actor splits: {:?}, fragment_id: {}",
+                        actor_splits,
+                        fragment_id
+                    );
+                    fragment_actor_splits.insert(*fragment_id, actor_splits);
+                }
             }
         }
-        // TODO: support migrate splits for SourceBackfill
 
         // Generate fragment reschedule plan
         let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
@@ -1428,7 +1491,7 @@ impl ScaleController {
             let upstream_fragment_dispatcher_ids =
                 upstream_fragment_dispatcher_set.into_iter().collect_vec();
 
-            let actor_splits = fragment_stream_source_actor_splits
+            let actor_splits = fragment_actor_splits
                 .get(&fragment_id)
                 .cloned()
                 .unwrap_or_default();
@@ -1479,6 +1542,8 @@ impl ScaleController {
             .pre_apply_reschedules(fragment_created_actors)
             .await;
 
+        tracing::debug!("analyze_reschedule_plan result: {:#?}", reschedule_fragment);
+
         Ok((reschedule_fragment, applied_reschedules))
     }
 
@@ -1651,6 +1716,7 @@ impl ScaleController {
             .cloned()
             .collect_vec();
 
+        // TODO: Does SourceBackfill need this?
         fn replace_merge_node_upstream(
             stream_node: &mut StreamNode,
             applied_upstream_fragment_actor_ids: &HashMap<FragmentId, Vec<ActorId>>,
@@ -1867,12 +1933,12 @@ impl ScaleController {
             actor_location: &mut HashMap<ActorId, WorkerId>,
             table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
             fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
-            table_fragments: &BTreeMap<TableId, TableFragments>,
+            all_table_fragments: &BTreeMap<TableId, TableFragments>,
         ) -> MetaResult<()> {
             // This is only for assertion purposes and will be removed once the dispatcher_id is guaranteed to always correspond to the downstream fragment_id,
             // such as through the foreign key constraints in the SQL backend.
             let mut actor_fragment_id_map_for_check = HashMap::new();
-            for table_fragments in table_fragments.values() {
+            for table_fragments in all_table_fragments.values() {
                 for (fragment_id, fragment) in &table_fragments.fragments {
                     for actor in &fragment.actors {
                         let prev =
@@ -1883,7 +1949,7 @@ impl ScaleController {
                 }
             }
 
-            for (table_id, table_fragments) in table_fragments {
+            for (table_id, table_fragments) in all_table_fragments {
                 for (fragment_id, fragment) in &table_fragments.fragments {
                     for actor in &fragment.actors {
                         fragment_actor_id_map
@@ -1911,8 +1977,15 @@ impl ScaleController {
                                         dispatcher.dispatcher_id as FragmentId
                                     );
                                 } else {
+                                    tracing::error!(
+                                        "downstream actor id {} from actor {} (fragment {}) not found in actor_fragment_id_map_for_check: {actor_fragment_id_map_for_check:?}\n\ndispatchers: {:#?}",
+                                        downstream_actor_id,
+                                        actor.actor_id,
+                                        actor.fragment_id,
+                                        actor.dispatcher
+                                    );
                                     bail!(
-                                        "downstream actor id {} from actor {} not found in fragment_actor_id_map",
+                                        "downstream actor id {} from actor {} not found",
                                         downstream_actor_id,
                                         actor.actor_id,
                                     );
@@ -2029,6 +2102,17 @@ impl ScaleController {
                 .await?;
             }
         }
+        tracing::debug!(
+            ?worker_ids,
+            ?table_parallelisms,
+            ?no_shuffle_source_fragment_ids,
+            ?no_shuffle_target_fragment_ids,
+            ?fragment_distribution_map,
+            ?actor_location,
+            ?table_fragment_id_map,
+            ?fragment_actor_id_map,
+            "generate_table_resize_plan, after build_index"
+        );
 
         let mut target_plan = HashMap::new();
 
@@ -2149,7 +2233,10 @@ impl ScaleController {
         }
 
         target_plan.retain(|_, plan| !plan.worker_actor_diff.is_empty());
-
+        tracing::debug!(
+            ?target_plan,
+            "generate_table_resize_plan finished target_plan"
+        );
         Ok(target_plan)
     }
 
@@ -2380,6 +2467,7 @@ impl ScaleController {
 
 /// At present, for table level scaling, we use the strategy `TableResizePolicy`.
 /// Currently, this is used as an internal interface, so it won’t be included in Protobuf.
+#[derive(Debug)]
 pub struct TableResizePolicy {
     pub(crate) worker_ids: BTreeSet<WorkerId>,
     pub(crate) table_parallelisms: HashMap<u32, TableParallelism>,
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index a383bfee8e46a..751ee92beebc1 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -231,7 +231,8 @@ pub struct SourceManagerCore {
     /// `source_id` -> `(fragment_id, upstream_fragment_id)`
     backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
 
-    /// Splits assigned per actor
+    /// Splits assigned per actor,
+    /// including both `Source` and `SourceBackfill` actors.
     actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
 }
 
@@ -468,13 +469,13 @@ impl Default for SplitDiffOptions {
 }
 
 /// Reassigns splits if there are new splits or dropped splits,
-/// i.e., `actor_splits` and `discovered_splits` differ.
+/// i.e., `actor_splits` and `discovered_splits` differ, or actors are rescheduled.
 ///
 /// The existing splits will remain unmoved in their currently assigned actor.
 ///
 /// If an actor has an upstream actor, it should be a backfill executor,
-/// and its splits should be aligned with the upstream actor. `reassign_splits` should not be used in this case.
-/// Use `align_backfill_splits` instead.
+/// and its splits should be aligned with the upstream actor. **`reassign_splits` should not be used in this case.
+/// Use `align_backfill_splits` instead.**
 ///
 /// - `fragment_id`: just for logging
 ///
@@ -790,11 +791,10 @@ impl SourceManager {
 
     /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
     ///
-    /// Very occasionally split removal may happen
-    /// during scaling, in which case we need to use the old splits for reallocation instead of the
-    /// latest splits (which may be missing), so that we can resolve the split removal in the next
-    /// command.
-    pub async fn migrate_splits(
+    /// Very occasionally split removal may happen during scaling, in which case we need to
+    /// use the old splits for reallocation instead of the latest splits (which may be missing),
+    /// so that we can resolve the split removal in the next command.
+    pub async fn migrate_splits_for_source_actors(
         &self,
         fragment_id: FragmentId,
         prev_actor_ids: &[ActorId],
@@ -817,7 +817,7 @@ impl SourceManager {
             fragment_id,
             empty_actor_splits,
             &prev_splits,
-            // pre-allocate splits is the first time getting splits and it does not have scale in scene
+            // Pre-allocating splits is the first time we get splits, and it does not involve a scale-in scenario.
             SplitDiffOptions::default(),
         )
         .unwrap_or_default();
@@ -825,6 +825,43 @@ impl SourceManager {
         Ok(diff)
     }
 
+    /// Migrates splits for backfill actors of a rescheduled fragment by aligning them with the splits of their upstream source actors.
+    pub fn migrate_splits_for_backfill_actors(
+        &self,
+        fragment_id: FragmentId,
+        upstream_fragment_ids: &Vec<FragmentId>,
+        curr_actor_ids: &[ActorId],
+        fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>,
+        no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>,
+    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
+        // Align splits for the backfill fragment with its (single) upstream source fragment.
+        debug_assert!(upstream_fragment_ids.len() == 1);
+        let upstream_fragment_id = upstream_fragment_ids[0];
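+        // For each new backfill actor, look up its single NoShuffle upstream source actor;
+        // the splits will be copied from that actor's assignment.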
+        let actors = no_shuffle_upstream_actor_map
+            .iter()
+            .filter(|(id, _)| curr_actor_ids.contains(id))
+            .map(|(id, upstream_fragment_actors)| {
+                debug_assert!(upstream_fragment_actors.len() == 1);
+                (
+                    *id,
+                    vec![*upstream_fragment_actors.get(&upstream_fragment_id).unwrap()],
+                )
+            });
+        let upstream_assignment = fragment_actor_splits.get(&upstream_fragment_id).unwrap();
+        tracing::info!(
+            fragment_id,
+            upstream_fragment_id,
+            ?upstream_assignment,
+            "migrate_splits_for_backfill_actors"
+        );
+        Ok(align_backfill_splits(
+            actors,
+            upstream_assignment,
+            fragment_id,
+            upstream_fragment_id,
+        )?)
+    }
+
     /// Allocates splits to actors for a newly created source executor.
     pub async fn allocate_splits(&self, table_id: &TableId) -> MetaResult<SplitAssignment> {
         let core = self.core.lock().await;
diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml
index 8729207c0d025..c82f2b7d5911e 100644
--- a/src/tests/simulation/Cargo.toml
+++ b/src/tests/simulation/Cargo.toml
@@ -25,6 +25,7 @@ glob = "0.3"
 itertools = { workspace = true }
 lru = { workspace = true }
 madsim = "0.2.30"
+maplit = "1"
 paste = "1"
 pin-project = "1.1"
 pretty_assertions = "1"
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index 26fdc3a8757e1..a9ffba0063562 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -158,27 +158,16 @@ impl Configuration {
     /// Provides a configuration for scale test which ensures that the arrangement backfill is disabled,
     /// so table scan will use `no_shuffle`.
     pub fn for_scale_no_shuffle() -> Self {
-        // Embed the config file and create a temporary file at runtime. The file will be deleted
-        // automatically when it's dropped.
-        let config_path = {
-            let mut file =
-                tempfile::NamedTempFile::new().expect("failed to create temp config file");
-            file.write_all(include_bytes!("risingwave-scale.toml"))
-                .expect("failed to write config file");
-            file.into_temp_path()
-        };
+        let mut conf = Self::for_scale();
+        conf.per_session_queries =
+            vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
+        conf
+    }
 
-        Configuration {
-            config_path: ConfigPath::Temp(config_path.into()),
-            frontend_nodes: 2,
-            compute_nodes: 3,
-            meta_nodes: 3,
-            compactor_nodes: 2,
-            compute_node_cores: 2,
-            per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()]
-                .into(),
-            ..Default::default()
-        }
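+    /// Provides a configuration for scale tests of shared sources
+    /// (enables `RW_ENABLE_SHARED_SOURCE` per session).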
+    pub fn for_scale_shared_source() -> Self {
+        let mut conf = Self::for_scale();
+        conf.per_session_queries = vec!["SET RW_ENABLE_SHARED_SOURCE = true;".into()].into();
+        conf
     }
 
     pub fn for_auto_parallelism(
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 9b57673e49c16..3986a826e21e7 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![cfg_attr(not(madsim), expect(unused_imports))]
-
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::ffi::OsString;
 use std::fmt::Write;
 use std::sync::Arc;
@@ -23,17 +21,17 @@ use anyhow::{anyhow, Result};
 use cfg_or_panic::cfg_or_panic;
 use clap::Parser;
 use itertools::Itertools;
-use rand::seq::{IteratorRandom, SliceRandom};
+use rand::seq::IteratorRandom;
 use rand::{thread_rng, Rng};
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::WorkerSlotId;
+use risingwave_connector::source::{SplitImpl, SplitMetaData};
 use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
 use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::table_fragments::PbFragment;
 use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
 use risingwave_pb::meta::GetClusterInfoResponse;
 use risingwave_pb::stream_plan::StreamNode;
-use serde::de::IntoDeserializer;
 
 use self::predicate::BoxedPredicate;
 use crate::cluster::Cluster;
@@ -76,7 +74,7 @@ pub mod predicate {
         Box::new(p)
     }
 
-    /// There exists operators whose identity contains `s` in the fragment.
+    /// There exist operators whose identity contains `s` in the fragment (case-insensitive).
     pub fn identity_contains(s: impl Into<String>) -> BoxedPredicate {
         let s: String = s.into();
         let p = move |f: &PbFragment| {
@@ -363,6 +361,30 @@ impl Cluster {
         Ok(response)
     }
 
+    /// Returns `table_id -> actor_id -> splits` (split ids joined by `,`).
+    pub async fn list_source_splits(&self) -> Result<BTreeMap<u32, BTreeMap<u32, String>>> {
+        let info = self.get_cluster_info().await?;
+        let mut res = BTreeMap::new();
+
+        for table in info.table_fragments {
+            let mut table_actor_splits = BTreeMap::new();
+
+            for (actor_id, splits) in table.actor_splits {
+                let splits = splits
+                    .splits
+                    .iter()
+                    .map(|split| SplitImpl::try_from(split).unwrap())
+                    .map(|split| split.id())
+                    .collect_vec()
+                    .join(",");
+                table_actor_splits.insert(actor_id, splits);
+            }
+            res.insert(table.table_id, table_actor_splits);
+        }
+
+        Ok(res)
+    }
+
     // update node schedulability
     #[cfg_or_panic(madsim)]
     async fn update_worker_node_schedulability(
diff --git a/src/tests/simulation/tests/integration_tests/scale/mod.rs b/src/tests/simulation/tests/integration_tests/scale/mod.rs
index f6940f072409e..3c7a702dc6290 100644
--- a/src/tests/simulation/tests/integration_tests/scale/mod.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/mod.rs
@@ -20,6 +20,7 @@ mod nexmark_q4;
 mod nexmark_source;
 mod no_shuffle;
 mod schedulability;
+mod shared_source;
 mod singleton_migration;
 mod sink;
 mod streaming_parallelism;
diff --git a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
new file mode 100644
index 0000000000000..175b3a043100c
--- /dev/null
+++ b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs
@@ -0,0 +1,192 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+
+use anyhow::Result;
+use itertools::Itertools;
+use maplit::{convert_args, hashmap};
+use risingwave_common::hash::WorkerSlotId;
+use risingwave_pb::meta::table_fragments::Fragment;
+use risingwave_simulation::cluster::{Cluster, Configuration};
+use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};
+
+const CREATE_SOURCE: &str = r#"
+CREATE SOURCE s(v1 int, v2 varchar) WITH (
+    connector='kafka',
+    properties.bootstrap.server='192.168.11.1:29092',
+    topic='shared_source'
+) FORMAT PLAIN ENCODE JSON;"#;
+
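+/// Collects `(actor_id, upstream_actor_ids)` pairs for all actors in the fragment.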
+fn actor_upstream(fragment: &Fragment) -> Vec<(u32, Vec<u32>)> {
+    fragment
+        .actors
+        .iter()
+        .map(|actor| (actor.actor_id, actor.upstream_actor_id.clone()))
+        .collect_vec()
+}
+
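+/// Asserts that each `SourceBackfill` actor is assigned exactly the same splits
+/// as its single upstream `Source` actor.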
+async fn validate_splits_aligned(cluster: &mut Cluster) -> Result<()> {
+    let source_backfill_fragment = cluster
+        .locate_one_fragment([identity_contains("StreamSourceScan")])
+        .await?;
+    // The result of scaling is non-deterministic,
+    // so we just print it here instead of asserting against a fixed value.
+    let actor_upstream = actor_upstream(&source_backfill_fragment.inner);
+    tracing::info!(
+        "{}",
+        actor_upstream
+            .iter()
+            .format_with("\n", |(actor_id, upstream), f| f(&format_args!(
+                "{} <- {:?}",
+                actor_id, upstream
+            )))
+    );
+    let splits = cluster.list_source_splits().await?;
+    tracing::info!("{:#?}", splits);
+    let actor_splits: BTreeMap<u32, String> = splits
+        .values()
+        .flat_map(|m| m.clone().into_iter())
+        .collect();
+    for (actor, upstream) in actor_upstream {
+        assert!(upstream.len() == 1, "invalid upstream: {:?}", upstream);
+        let upstream_actor = upstream[0];
+        assert_eq!(
+            actor_splits.get(&actor).unwrap(),
+            actor_splits.get(&upstream_actor).unwrap()
+        );
+    }
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_shared_source() -> Result<()> {
+    tracing_subscriber::fmt::Subscriber::builder()
+        .with_max_level(tracing::Level::ERROR)
+        .with_env_filter("risingwave_stream::executor::source::source_backfill_executor=DEBUG,integration_tests=DEBUG")
+        .init();
+
+    let mut cluster = Cluster::start(Configuration::for_scale_shared_source()).await?;
+    cluster.create_kafka_topics(convert_args!(hashmap!(
+        "shared_source" => 4,
+    )));
+    let mut session = cluster.start_session();
+
+    session.run("set rw_implicit_flush = true;").await?;
+
+    session.run(CREATE_SOURCE).await?;
+    session
+        .run("create materialized view mv as select count(*) from s group by v1;")
+        .await?;
+    let source_fragment = cluster
+        .locate_one_fragment([
+            identity_contains("Source"),
+            no_identity_contains("StreamSourceScan"),
+        ])
+        .await?;
+    let source_workers = source_fragment.all_worker_count().into_keys().collect_vec();
+    let source_backfill_fragment = cluster
+        .locate_one_fragment([identity_contains("StreamSourceScan")])
+        .await?;
+    let source_backfill_workers = source_backfill_fragment
+        .all_worker_count()
+        .into_keys()
+        .collect_vec();
+    let hash_agg_fragment = cluster
+        .locate_one_fragment([identity_contains("hashagg")])
+        .await?;
+    let hash_agg_workers = hash_agg_fragment
+        .all_worker_count()
+        .into_keys()
+        .collect_vec();
+    validate_splits_aligned(&mut cluster).await?;
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 6
+        2 3 HASH {4,3} {3} {MVIEW} 6
+        3 3 HASH {5} {1} {SOURCE_SCAN} 6"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        1 CREATED ADAPTIVE
+        3 CREATED ADAPTIVE"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+
+    // SourceBackfill cannot be scaled because of NoShuffle.
+    assert!(
+        &cluster
+            .reschedule(
+                source_backfill_fragment
+                    .reschedule([WorkerSlotId::new(source_backfill_workers[0], 0)], []),
+            )
+            .await.unwrap_err().to_string().contains("rescheduling NoShuffle downstream fragment (maybe Chain fragment) is forbidden, please use NoShuffle upstream fragment (like Materialized fragment) to scale"),
+    );
+
+    // hash agg can be scaled independently
+    cluster
+        .reschedule(hash_agg_fragment.reschedule([WorkerSlotId::new(hash_agg_workers[0], 0)], []))
+        .await
+        .unwrap();
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 6
+        2 3 HASH {4,3} {3} {MVIEW} 5
+        3 3 HASH {5} {1} {SOURCE_SCAN} 6"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+
+    // Source is the NoShuffle upstream. It can be scaled, and the downstream SourceBackfill will be scaled together with it.
+    cluster
+        .reschedule(source_fragment.reschedule(
+            [
+                WorkerSlotId::new(source_workers[0], 0),
+                WorkerSlotId::new(source_workers[0], 1),
+                WorkerSlotId::new(source_workers[2], 0),
+            ],
+            [],
+        ))
+        .await
+        .unwrap();
+    validate_splits_aligned(&mut cluster).await?;
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 3
+        2 3 HASH {4,3} {3} {MVIEW} 5
+        3 3 HASH {5} {1} {SOURCE_SCAN} 3"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+    expect_test::expect![[r#"
+        1 CREATED CUSTOM
+        3 CREATED CUSTOM"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+
+    // Rescheduling the backfill fragment with resolve_no_shuffle is OK; it will scale the upstream Source fragment together with it.
+    cluster
+        .reschedule_resolve_no_shuffle(source_backfill_fragment.reschedule(
+            [],
+            [
+                WorkerSlotId::new(source_workers[0], 0),
+                WorkerSlotId::new(source_workers[0], 1),
+                WorkerSlotId::new(source_workers[2], 0),
+                WorkerSlotId::new(source_workers[2], 1),
+            ],
+        ))
+        .await
+        .unwrap();
+    validate_splits_aligned(&mut cluster).await?;
+    expect_test::expect![[r#"
+        1 1 HASH {2} {} {SOURCE} 7
+        2 3 HASH {4,3} {3} {MVIEW} 5
+        3 3 HASH {5} {1} {SOURCE_SCAN} 7"#]]
+    .assert_eq(&cluster.run("select * from rw_fragments;").await?);
+    expect_test::expect![[r#"
+1 CREATED CUSTOM
+3 CREATED CUSTOM"#]]
+    .assert_eq(&cluster.run("select * from rw_table_fragments;").await?);
+    Ok(())
+}