Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 14, 2024
1 parent 0fbf134 commit b6ea411
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
24 changes: 23 additions & 1 deletion src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,14 @@ impl BarrierScheduler {
.context("failed to wait for barrier collect")?
}

async fn run_multiple_commands_until_collect(
/// Run multiple commands and return when the command barrier is collected. It's ensured that
/// multiple commands are executed continuously.
///
/// Returns the barrier info of each command, and a receiver for each command to check if it's
/// finished.
///
/// TODO: atomicity of multiple commands is not guaranteed.
async fn run_multiple_commands_until_collected(
&self,
commands: Vec<Command>,
) -> MetaResult<(Vec<BarrierInfo>, Vec<Receiver<MetaResult<()>>>)> {
Expand Down Expand Up @@ -363,6 +370,21 @@ impl BarrierScheduler {
.map(|i| i[1])
}

pub async fn run_command_until_collected(
&self,
command: Command,
) -> MetaResult<(BarrierInfo, Receiver<MetaResult<()>>)> {
let (info, finished_rx) = self
.run_multiple_commands_until_collected(vec![command])
.await?;
assert_eq!(info.len(), 1);
assert_eq!(finished_rx.len(), 1);
Ok((
info.into_iter().next().unwrap(),
finished_rx.into_iter().next().unwrap(),
))
}

/// Run a command and return when it's completely finished.
///
/// Returns the barrier info of the actual command.
Expand Down
15 changes: 12 additions & 3 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
Expand Down Expand Up @@ -474,7 +475,17 @@ impl GlobalStreamManager {
create_type,
};
tracing::debug!("sending Command::CreateStreamingJob");
if let Err(err) = self.barrier_scheduler.run_command(command).await {
let collect_result = self
.barrier_scheduler
.run_command_until_collected(command)
.await;
let result: MetaResult<()> = try {
let (_barrier_info, finish_rx) = collect_result?;
finish_rx.await.ok().context("failed to finish command")??;
};
if let Err(err) = result {
// TODO(kwannoel): This will not be needed anymore,
// if we only commit the state once the barrier is finished.
if create_type == CreateType::Foreground || err.is_cancelled() {
let mut table_ids = HashSet::from_iter(std::iter::once(table_id));
if let Some(dummy_table_id) = replace_table_id {
Expand All @@ -486,10 +497,8 @@ impl GlobalStreamManager {
.await?;
}
}

return Err(err);
}

Ok(())
}

Expand Down

0 comments on commit b6ea411

Please sign in to comment.