Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 10, 2024
1 parent c613991 commit 1e82a19
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ struct CheckpointControl {

/// Command that has been collected but is still completing.
/// The join handle of the completing future is stored.
completing_command: CompletingCommand,
completing_task: CompletingTask,

hummock_version_stats: HummockVersionStats,

Expand All @@ -235,7 +235,7 @@ impl CheckpointControl {
Self {
command_ctx_queue: Default::default(),
creating_streaming_job_controls: Default::default(),
completing_command: CompletingCommand::None,
completing_task: CompletingTask::None,
hummock_version_stats: context.hummock_manager.get_version_stats().await,
create_mview_tracker,
context,
Expand All @@ -244,8 +244,8 @@ impl CheckpointControl {

fn total_command_num(&self) -> usize {
self.command_ctx_queue.len()
+ match &self.completing_command {
CompletingCommand::GlobalStreamingGraph { .. } => 1,
+ match &self.completing_task {
CompletingTask::Completing { .. } => 1,
_ => 0,
}
}
Expand Down Expand Up @@ -398,9 +398,9 @@ impl CheckpointControl {
.command_ctx_queue
.last_key_value()
.map(|(_, x)| &x.command_ctx)
.or(match &self.completing_command {
CompletingCommand::None | CompletingCommand::Err(_) => None,
CompletingCommand::GlobalStreamingGraph { command_ctx, .. } => command_ctx.as_ref(),
.or(match &self.completing_task {
CompletingTask::None | CompletingTask::Err(_) => None,
CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(),
})
.map(|command_ctx| command_ctx.command.should_pause_inject_barrier())
.unwrap_or(false);
Expand All @@ -409,10 +409,9 @@ impl CheckpointControl {
.values()
.map(|node| &node.command_ctx)
.chain(
match &self.completing_command {
CompletingCommand::None | CompletingCommand::Err(_) => None,
CompletingCommand::GlobalStreamingGraph { command_ctx, .. } =>
command_ctx.as_ref(),
match &self.completing_task {
CompletingTask::None | CompletingTask::Err(_) => None,
CompletingTask::Completing { command_ctx, .. } => command_ctx.as_ref(),
}
.into_iter()
)
Expand All @@ -426,9 +425,9 @@ impl CheckpointControl {
/// We need to make sure there are no changes when doing recovery
pub async fn clear_on_err(&mut self, err: &MetaError) {
// join spawned completing command to finish no matter it succeeds or not.
let is_err = match replace(&mut self.completing_command, CompletingCommand::None) {
CompletingCommand::None => false,
CompletingCommand::GlobalStreamingGraph { join_handle, .. } => {
let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
CompletingTask::None => false,
CompletingTask::Completing { join_handle, .. } => {
info!("waiting for completing command to finish in recovery");
match join_handle.await {
Err(e) => {
Expand All @@ -442,7 +441,7 @@ impl CheckpointControl {
Ok(Ok(_)) => false,
}
}
CompletingCommand::Err(_) => true,
CompletingTask::Err(_) => true,
};
if !is_err {
// continue to finish the pending collected barrier.
Expand Down Expand Up @@ -509,9 +508,9 @@ impl BarrierEpochState {
}
}

enum CompletingCommand {
enum CompletingTask {
None,
GlobalStreamingGraph {
Completing {
command_ctx: Option<Arc<CommandContext>>,
table_ids_to_finish: HashSet<TableId>,
creating_job_epochs: Vec<(TableId, u64)>,
Expand Down Expand Up @@ -1369,7 +1368,7 @@ impl CheckpointControl {
) -> impl Future<Output = MetaResult<BarrierCompleteOutput>> + 'a {
// If there is no completing barrier, try to start completing the earliest barrier if
// it has been collected.
if let CompletingCommand::None = &self.completing_command {
if let CompletingTask::None = &self.completing_task {
if let Some(task) = self.next_complete_barrier_task(Some(scheduled_barriers)) {
{
let command_ctx = task
Expand All @@ -1379,7 +1378,7 @@ impl CheckpointControl {
let table_ids_to_finish = task.table_ids_to_finish.clone();
let creating_job_epochs = task.creating_job_epochs.clone();
let join_handle = tokio::spawn(self.context.clone().complete_barrier(task));
self.completing_command = CompletingCommand::GlobalStreamingGraph {
self.completing_task = CompletingTask::Completing {
command_ctx,
join_handle,
table_ids_to_finish,
Expand All @@ -1393,9 +1392,7 @@ impl CheckpointControl {
}

async fn next_completed_barrier_inner(&mut self) -> MetaResult<BarrierCompleteOutput> {
let CompletingCommand::GlobalStreamingGraph { join_handle, .. } =
&mut self.completing_command
else {
let CompletingTask::Completing { join_handle, .. } = &mut self.completing_task else {
return pending().await;
};

Expand All @@ -1409,15 +1406,15 @@ impl CheckpointControl {
// It's important to reset the completing_command after await no matter the result is err
// or not, and otherwise the join handle will be polled again after ready.
let next_completing_command_status = if let Err(e) = &join_result {
CompletingCommand::Err(e.clone())
CompletingTask::Err(e.clone())
} else {
CompletingCommand::None
CompletingTask::None
};
let completed_command =
replace(&mut self.completing_command, next_completing_command_status);
replace(&mut self.completing_task, next_completing_command_status);
self.hummock_version_stats = join_result?;

must_match!(completed_command, CompletingCommand::GlobalStreamingGraph {
must_match!(completed_command, CompletingTask::Completing {
table_ids_to_finish,
creating_job_epochs,
..
Expand Down

0 comments on commit 1e82a19

Please sign in to comment.