Skip to content

Commit

Permalink
fix cancel bug + cleanup of creating_job_info
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 27, 2023
1 parent 92eb5f0 commit 407a301
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl CreatingStreamingJobInfo {
if shutdown_tx.send(CreatingState::Canceling { finish_tx: tx }).await.is_ok() {
receivers.insert(job_id, rx);
} else {
tracing::warn!("failed to send canceling state");
tracing::warn!(id=?job_id, "failed to send canceling state");
}
} else {
// If these job ids do not exist in streaming_jobs,
Expand Down Expand Up @@ -267,9 +267,12 @@ impl GlobalStreamManager {
while let Some(state) = receiver.recv().await {
match state {
CreatingState::Failed { reason } => {
tracing::debug!(id=?table_id, "stream job failed");
self.creating_job_info.delete_job(table_id).await;
return Err(reason);
}
CreatingState::Canceling { finish_tx } => {
tracing::debug!(id=?table_id, "cancelling streaming job");
if let Ok(table_fragments) = self
.fragment_manager
.select_table_fragments_by_table_id(&table_id)
Expand Down Expand Up @@ -327,15 +330,18 @@ impl GlobalStreamManager {
let _ = finish_tx.send(()).inspect_err(|_| {
tracing::warn!("failed to notify cancelled: {table_id}")
});
self.creating_job_info.delete_job(table_id).await;
return Err(MetaError::cancelled("create".into()));
}
}
CreatingState::Created => return Ok(()),
CreatingState::Created => {
self.creating_job_info.delete_job(table_id).await;
return Ok(());
}
}
}
};

self.creating_job_info.delete_job(table_id).await;
res
}

Expand Down Expand Up @@ -596,22 +602,29 @@ impl GlobalStreamManager {
// NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command,
// since Barrier manager manages the recovered stream jobs.
let futures = recovered_job_ids.into_iter().map(|id| async move {
tracing::debug!(?id, "cancelling recovered streaming job");
let result: MetaResult<()> = try {
let fragment = self
.fragment_manager
.select_table_fragments_by_table_id(&id)
.await?;
if fragment.is_created() {
Err(MetaError::invalid_parameter(format!(
"streaming job {} is already created",
id
)))?;
}
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(fragment))
.await?;
};
match result {
Ok(_) => {
tracing::info!("cancelled recovered streaming job {id}");
tracing::info!(?id, "cancelled recovered streaming job");
Some(id)
},
Err(_) => {
tracing::error!("failed to cancel recovered streaming job {id}, does {id} correspond to any jobs in `SHOW JOBS`?");
tracing::error!(?id, "failed to cancel recovered streaming job, does it correspond to any jobs in `SHOW JOBS`?");
None
},
}
Expand Down

0 comments on commit 407a301

Please sign in to comment.