diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 1a1dda80ecc7e..2500e9d9fe704 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{HashSet, VecDeque}; +use std::iter; use std::iter::once; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -247,60 +248,6 @@ impl BarrierScheduler { .context("failed to wait for barrier collect")? } - /// Run multiple commands and return when the command barrier is collected. It's ensured that - /// multiple commands are executed continuously. - /// - /// Returns the barrier info of each command, and a receiver for each command to check if it's - /// finished. - /// - /// TODO: atomicity of multiple commands is not guaranteed. - async fn run_multiple_commands_until_collected( - &self, - commands: Vec, - ) -> MetaResult<(Vec, Vec>>)> { - let mut contexts = Vec::with_capacity(commands.len()); - let mut finished = Vec::with_capacity(commands.len()); - let mut scheduleds = Vec::with_capacity(commands.len()); - - for command in commands { - let (started_tx, started_rx) = oneshot::channel(); - let (collect_tx, collect_rx) = oneshot::channel(); - let (finish_tx, finish_rx) = oneshot::channel(); - - contexts.push((started_rx, collect_rx)); - finished.push(finish_rx); - scheduleds.push(self.inner.new_scheduled( - command.need_checkpoint(), - command, - once(Notifier { - started: Some(started_tx), - collected: Some(collect_tx), - finished: Some(finish_tx), - }), - )); - } - - self.push(scheduleds)?; - - let mut infos = Vec::with_capacity(contexts.len()); - - for (injected_rx, collect_rx) in contexts { - // Wait for this command to be injected, and record the result. - tracing::trace!("waiting for injected_rx"); - let info = injected_rx.await.ok().context("failed to inject barrier")?; - infos.push(info); - - tracing::trace!("waiting for collect_rx"); - // Throw the error if it occurs when collecting this barrier. - collect_rx - .await - .ok() - .context("failed to collect barrier")??; - } - - Ok((infos, finished)) - } - /// Run multiple commands and return when they're all completely finished. It's ensured that /// multiple commands are executed continuously. /// @@ -374,15 +321,27 @@ impl BarrierScheduler { &self, command: Command, ) -> MetaResult<(BarrierInfo, Receiver>)> { - let (info, finished_rx) = self - .run_multiple_commands_until_collected(vec![command]) - .await?; - assert_eq!(info.len(), 1); - assert_eq!(finished_rx.len(), 1); - Ok(( - info.into_iter().next().unwrap(), - finished_rx.into_iter().next().unwrap(), - )) + let (started_tx, started_rx) = oneshot::channel(); + let (collect_tx, collect_rx) = oneshot::channel(); + let (finish_tx, finish_rx) = oneshot::channel(); + let scheduled = self.inner.new_scheduled( + command.need_checkpoint(), + command, + once(Notifier { + started: Some(started_tx), + collected: Some(collect_tx), + finished: Some(finish_tx), + }), + ); + + self.push(iter::once(scheduled))?; + let info = started_rx.await.ok().context("failed to inject barrier")?; + collect_rx + .await + .ok() + .context("failed to collect barrier")??; + + Ok((info, finish_rx)) } /// Run a command and return when it's completely finished. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index e235622c8b818..ed20efd2b6939 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -479,6 +479,7 @@ impl GlobalStreamManager { .barrier_scheduler .run_command_until_collected(command) .await; + // TODO(kwannoel): Notify frontend of catalog here. let result: MetaResult<()> = try { let (_barrier_info, finish_rx) = collect_result?; finish_rx.await.ok().context("failed to finish command")??;