From c298a736ed9c7f41ba0987ce2341026671869459 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 26 Aug 2024 20:47:26 +0800 Subject: [PATCH] feat: inject pause/resume for creating sink into table (#17651) (#18250) Signed-off-by: Shanicky Chen Co-authored-by: Shanicky Chen --- src/meta/src/stream/stream_manager.rs | 12 +++++++++++- src/stream/src/executor/mod.rs | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 220f69bc0a58d..612fc754da24a 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -427,6 +427,8 @@ impl GlobalStreamManager { "built actors finished" ); + let need_pause = replace_table_job_info.is_some(); + if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { self.build_actors( &table_fragments, @@ -516,7 +518,15 @@ impl GlobalStreamManager { } }; let result: MetaResult = try { - self.barrier_scheduler.run_command(command).await?; + if need_pause { + // Special handling is required when creating sink into table, we need to pause the stream to avoid data loss. + self.barrier_scheduler + .run_config_change_command_with_pause(command) + .await?; + } else { + self.barrier_scheduler.run_command(command).await?; + } + tracing::debug!(?streaming_job, "first barrier collected for stream job"); let result = self .metadata_manager diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index ff32af10d1198..5ade4c8243e03 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -462,8 +462,11 @@ impl Barrier { Some( Mutation::Update { .. } // new actors for scaling | Mutation::Add(AddMutation { pause: true, .. }) // new streaming job, or recovery - | Mutation::AddAndUpdate(AddMutation { pause: true, .. }, _) // new actors for replacing table ) => true, + Some(Mutation::AddAndUpdate(AddMutation { pause, ..}, _)) => { + assert!(pause); + true + }, _ => false, } }