Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 18, 2024
1 parent b6ea411 commit 96eaf61
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 63 deletions.
85 changes: 22 additions & 63 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashSet, VecDeque};
use std::iter;
use std::iter::once;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -247,60 +248,6 @@ impl BarrierScheduler {
.context("failed to wait for barrier collect")?
}

/// Run multiple commands and return when the command barrier is collected. It's ensured that
/// multiple commands are executed continuously.
///
/// Returns the barrier info of each command, and a receiver for each command to check if it's
/// finished.
///
/// TODO: atomicity of multiple commands is not guaranteed.
async fn run_multiple_commands_until_collected(
&self,
commands: Vec<Command>,
) -> MetaResult<(Vec<BarrierInfo>, Vec<Receiver<MetaResult<()>>>)> {
let mut contexts = Vec::with_capacity(commands.len());
let mut finished = Vec::with_capacity(commands.len());
let mut scheduleds = Vec::with_capacity(commands.len());

for command in commands {
let (started_tx, started_rx) = oneshot::channel();
let (collect_tx, collect_rx) = oneshot::channel();
let (finish_tx, finish_rx) = oneshot::channel();

contexts.push((started_rx, collect_rx));
finished.push(finish_rx);
scheduleds.push(self.inner.new_scheduled(
command.need_checkpoint(),
command,
once(Notifier {
started: Some(started_tx),
collected: Some(collect_tx),
finished: Some(finish_tx),
}),
));
}

self.push(scheduleds)?;

let mut infos = Vec::with_capacity(contexts.len());

for (injected_rx, collect_rx) in contexts {
// Wait for this command to be injected, and record the result.
tracing::trace!("waiting for injected_rx");
let info = injected_rx.await.ok().context("failed to inject barrier")?;
infos.push(info);

tracing::trace!("waiting for collect_rx");
// Throw the error if it occurs when collecting this barrier.
collect_rx
.await
.ok()
.context("failed to collect barrier")??;
}

Ok((infos, finished))
}

/// Run multiple commands and return when they're all completely finished. It's ensured that
/// multiple commands are executed continuously.
///
Expand Down Expand Up @@ -374,15 +321,27 @@ impl BarrierScheduler {
&self,
command: Command,
) -> MetaResult<(BarrierInfo, Receiver<MetaResult<()>>)> {
let (info, finished_rx) = self
.run_multiple_commands_until_collected(vec![command])
.await?;
assert_eq!(info.len(), 1);
assert_eq!(finished_rx.len(), 1);
Ok((
info.into_iter().next().unwrap(),
finished_rx.into_iter().next().unwrap(),
))
let (started_tx, started_rx) = oneshot::channel();
let (collect_tx, collect_rx) = oneshot::channel();
let (finish_tx, finish_rx) = oneshot::channel();
let scheduled = self.inner.new_scheduled(
command.need_checkpoint(),
command,
once(Notifier {
started: Some(started_tx),
collected: Some(collect_tx),
finished: Some(finish_tx),
}),
);

self.push(iter::once(scheduled))?;
let info = started_rx.await.ok().context("failed to inject barrier")?;
collect_rx
.await
.ok()
.context("failed to collect barrier")??;

Ok((info, finish_rx))
}

/// Run a command and return when it's completely finished.
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ impl GlobalStreamManager {
.barrier_scheduler
.run_command_until_collected(command)
.await;
// TODO(kwannoel): Notify frontend of catalog here.
let result: MetaResult<()> = try {
let (_barrier_info, finish_rx) = collect_result?;
finish_rx.await.ok().context("failed to finish command")??;
Expand Down

0 comments on commit 96eaf61

Please sign in to comment.