diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 220f69bc0a58..612fc754da24 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -427,6 +427,8 @@ impl GlobalStreamManager { "built actors finished" ); + let need_pause = replace_table_job_info.is_some(); + if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { self.build_actors( &table_fragments, @@ -516,7 +518,15 @@ impl GlobalStreamManager { } }; let result: MetaResult = try { - self.barrier_scheduler.run_command(command).await?; + if need_pause { + // Special handling is required when creating sink into table, we need to pause the stream to avoid data loss. + self.barrier_scheduler + .run_config_change_command_with_pause(command) + .await?; + } else { + self.barrier_scheduler.run_command(command).await?; + } + tracing::debug!(?streaming_job, "first barrier collected for stream job"); let result = self .metadata_manager diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index ff32af10d119..5ade4c8243e0 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -462,8 +462,11 @@ impl Barrier { Some( Mutation::Update { .. } // new actors for scaling | Mutation::Add(AddMutation { pause: true, .. }) // new streaming job, or recovery - | Mutation::AddAndUpdate(AddMutation { pause: true, .. }, _) // new actors for replacing table ) => true, + Some(Mutation::AddAndUpdate(AddMutation { pause, ..}, _)) => { + assert!(pause); + true + }, _ => false, } }