Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batch): notify frontend on execution panic #15740

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,27 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

// Clone `self` to make compiler happy because of the move block.
let t_1 = self.clone();
let this = self.clone();
async fn notify_panic<C: BatchTaskContext>(
this: &BatchTaskExecution<C>,
state_tx: Option<&mut StateReporter>,
) {
let err_str = "execution panic".into();
if let Err(e) = this
.change_state_notify(TaskStatus::Failed, state_tx, Some(err_str))
.await
{
warn!(
error = %e.as_report(),
"The status receiver in FE has closed so the status push is failed",
);
}
}
// Spawn task for real execution.
let fut = async move {
trace!("Executing plan [{:?}]", task_id);
let sender = sender;
let mut state_tx = state_tx;
let mut state_tx_1 = state_tx.clone();
let batch_metrics = t_1.context.batch_metrics();

let task = |task_id: TaskId| async move {
Expand All @@ -481,7 +497,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
// close it after task error has been set.
expr_context_scope(
expr_context,
t_1.run(exec, sender, state_tx.as_mut()).instrument(span),
t_1.run(exec, sender, state_tx_1.as_mut()).instrument(span),
)
.await;
};
Expand All @@ -492,6 +508,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
AssertUnwindSafe(TaskMonitor::instrument(&monitor, task(task_id.clone())));
if let Err(error) = instrumented_task.rw_catch_unwind().await {
error!("Batch task {:?} panic: {:?}", task_id, error);
notify_panic(&this, state_tx.as_mut()).await;
}
let cumulative = monitor.cumulative();
batch_metrics
Expand All @@ -517,6 +534,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
.await
{
error!("Batch task {:?} panic: {:?}", task_id, error);
notify_panic(&this, state_tx.as_mut()).await;
}
};

Expand Down
Loading