diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index a061afa727df9..1a1dda80ecc7e 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -247,7 +247,14 @@ impl BarrierScheduler {
             .context("failed to wait for barrier collect")?
     }
 
-    async fn run_multiple_commands_until_collect(
+    /// Run multiple commands and return when the command barrier is collected. It's ensured that
+    /// multiple commands are executed continuously.
+    ///
+    /// Returns the barrier info of each command, and a receiver for each command to check if it's
+    /// finished.
+    ///
+    /// TODO: atomicity of multiple commands is not guaranteed.
+    async fn run_multiple_commands_until_collected(
         &self,
         commands: Vec<Command>,
     ) -> MetaResult<(Vec<BarrierInfo>, Vec<Receiver<MetaResult<()>>>)> {
@@ -363,6 +370,21 @@ impl BarrierScheduler {
             .map(|i| i[1])
     }
 
+    pub async fn run_command_until_collected(
+        &self,
+        command: Command,
+    ) -> MetaResult<(BarrierInfo, Receiver<MetaResult<()>>)> {
+        let (info, finished_rx) = self
+            .run_multiple_commands_until_collected(vec![command])
+            .await?;
+        assert_eq!(info.len(), 1);
+        assert_eq!(finished_rx.len(), 1);
+        Ok((
+            info.into_iter().next().unwrap(),
+            finished_rx.into_iter().next().unwrap(),
+        ))
+    }
+
     /// Run a command and return when it's completely finished.
     ///
     /// Returns the barrier info of the actual command.
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index d582095367fdc..e235622c8b818 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -15,6 +15,7 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::sync::Arc;
 
+use anyhow::Context;
 use futures::future::join_all;
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
@@ -474,7 +475,17 @@ impl GlobalStreamManager {
             create_type,
         };
         tracing::debug!("sending Command::CreateStreamingJob");
-        if let Err(err) = self.barrier_scheduler.run_command(command).await {
+        let collect_result = self
+            .barrier_scheduler
+            .run_command_until_collected(command)
+            .await;
+        let result: MetaResult<()> = try {
+            let (_barrier_info, finish_rx) = collect_result?;
+            finish_rx.await.ok().context("failed to finish command")??;
+        };
+        if let Err(err) = result {
+            // TODO(kwannoel): This will not be needed anymore,
+            // if we only commit the state once the barrier is finished.
             if create_type == CreateType::Foreground || err.is_cancelled() {
                 let mut table_ids = HashSet::from_iter(std::iter::once(table_id));
                 if let Some(dummy_table_id) = replace_table_id {
@@ -486,10 +497,8 @@ impl GlobalStreamManager {
                         .await?;
                 }
             }
-
             return Err(err);
         }
-
         Ok(())
     }
 