From 15fc8768f3bc237514c30bf5e02f4605f6af357f Mon Sep 17 00:00:00 2001
From: August <pin@singularity-data.com>
Date: Thu, 11 Jan 2024 20:04:22 +0800
Subject: [PATCH] fix resolving of actor changes for sink into table

---
 src/meta/src/barrier/command.rs          | 76 ++++++++++++++----------
 src/meta/src/manager/catalog/fragment.rs |  2 +-
 2 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index b292ce926afd2..bb31b2898b702 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -85,6 +85,31 @@ pub struct ReplaceTablePlan {
     pub init_split_assignment: SplitAssignment,
 }
 
+impl ReplaceTablePlan {
+    fn actor_changes(&self) -> CommandActorChanges {
+        let worker_actors = self.new_table_fragments.worker_actor_ids();
+        let barrier_inject_actors: &HashSet<_> = &self
+            .new_table_fragments
+            .barrier_inject_actor_ids()
+            .into_iter()
+            .collect();
+        let to_add = worker_actors
+            .into_iter()
+            .flat_map(|(node_id, actors)| {
+                actors.into_iter().map(move |actor_id| ActorDesc {
+                    id: actor_id,
+                    node_id,
+                    is_injectable: barrier_inject_actors.contains(&actor_id),
+                })
+            })
+            .collect_vec();
+        CommandActorChanges::Actor {
+            to_add,
+            to_remove: self.old_table_fragments.actor_ids().into_iter().collect(),
+        }
+    }
+}
+
 /// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
 /// it will build different barriers to send, and may do different stuffs after the barrier is
 /// collected.
@@ -188,14 +213,16 @@ impl Command {
                 to_remove: node_actors.values().flatten().cloned().collect(),
             },
             Command::CreateStreamingJob {
-                table_fragments, ..
+                table_fragments,
+                replace_table,
+                ..
             } => {
                 let worker_actors = table_fragments.worker_actor_ids();
                 let barrier_inject_actors: &HashSet<_> = &table_fragments
                     .barrier_inject_actor_ids()
                     .into_iter()
                     .collect();
-                let to_add = worker_actors
+                let mut to_add = worker_actors
                     .into_iter()
                     .flat_map(|(node_id, actors)| {
                         actors.into_iter().map(move |actor_id| ActorDesc {
@@ -206,9 +233,22 @@ impl Command {
                     })
                     .collect_vec();
 
-                CommandActorChanges::Actor {
-                    to_add,
-                    to_remove: Default::default(),
+                if let Some(plan) = replace_table {
+                    let changes = plan.actor_changes();
+                    let CommandActorChanges::Actor {
+                        to_add: to_add_plan,
+                        to_remove,
+                    } = changes
+                    else {
+                        unreachable!("replace plan should have actor changes")
+                    };
+                    to_add.extend(to_add_plan);
+                    CommandActorChanges::Actor { to_add, to_remove }
+                } else {
+                    CommandActorChanges::Actor {
+                        to_add,
+                        to_remove: Default::default(),
+                    }
                 }
             }
             Command::CancelStreamingJob(table_fragments) => CommandActorChanges::Actor {
@@ -233,31 +273,7 @@ impl Command {
 
                 CommandActorChanges::Actor { to_add, to_remove }
             }
-            Command::ReplaceTable(ReplaceTablePlan {
-                old_table_fragments,
-                new_table_fragments,
-                ..
-            }) => {
-                let worker_actors = new_table_fragments.worker_actor_ids();
-                let barrier_inject_actors: &HashSet<_> = &new_table_fragments
-                    .barrier_inject_actor_ids()
-                    .into_iter()
-                    .collect();
-                let to_add = worker_actors
-                    .into_iter()
-                    .flat_map(|(node_id, actors)| {
-                        actors.into_iter().map(move |actor_id| ActorDesc {
-                            id: actor_id,
-                            node_id,
-                            is_injectable: barrier_inject_actors.contains(&actor_id),
-                        })
-                    })
-                    .collect_vec();
-                CommandActorChanges::Actor {
-                    to_add,
-                    to_remove: old_table_fragments.actor_ids().into_iter().collect(),
-                }
-            }
+            Command::ReplaceTable(plan) => plan.actor_changes(),
             Command::SourceSplitAssignment(_) => CommandActorChanges::None,
             Command::Throttle(_) => CommandActorChanges::None,
         }
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index eb3dc162a12fc..2477f7b370dfb 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -1306,7 +1306,7 @@ impl FragmentManager {
                         if let Some(node) = downstream_actor.nodes.as_mut() {
                             update_merge_node_upstream(
                                 node,
-                                &fragment_id,
+                                fragment_id,
                                 &removed_actor_ids,
                                 &added_actor_ids,
                             );