diff --git a/proto/hummock.proto b/proto/hummock.proto index faf9ee4f375e..b7f80d911a40 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -328,6 +328,7 @@ message CompactTask { JOIN_HANDLE_FAILED = 11; TRACK_SST_OBJECT_ID_FAILED = 12; NO_AVAIL_CPU_RESOURCE_CANCELED = 13; + HEARTBEAT_PROGRESS_CANCELED = 14; } // SSTs to be compacted, which will be removed from LSM after compaction repeated InputLevel input_ssts = 1; diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index fcbac63817ff..c3e6c10325f6 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -101,6 +101,13 @@ impl Compactor { Ok(()) } + pub fn cancel_tasks(&self, task_ids: Vec) -> MetaResult<()> { + for task_id in task_ids { + self.cancel_task(task_id)?; + } + Ok(()) + } + pub fn context_id(&self) -> HummockContextId { self.context_id } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 1b1f99df38fb..a8661dab7737 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2612,16 +2612,41 @@ impl HummockManager { // progress (meta + compactor) // 2. meta periodically scans the task and performs a cancel on // the meta side for tasks that are not updated by heartbeat - for task in compactor_manager.get_heartbeat_expired_tasks() { + // for task in compactor_manager.get_heartbeat_expired_tasks() { + // if let Err(e) = hummock_manager + // .cancel_compact_task( + // task.task_id, + // TaskStatus::HeartbeatCanceled, + // ) + // .await + // { + // tracing::error!( + // task_id = task.task_id, + // error = %e.as_report(), + // "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat + // until we can successfully report its status", + // ); + // } + // } + let expired_tasks: Vec = compactor_manager + .get_heartbeat_expired_tasks() + .into_iter() + .map(|task| task.task_id) + .collect(); + if !expired_tasks.is_empty() { + tracing::info!( + expired_tasks = ?expired_tasks, + "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.", + ); if let Err(e) = hummock_manager - .cancel_compact_task( - task.task_id, + .cancel_compact_tasks( + expired_tasks.clone(), TaskStatus::HeartbeatCanceled, ) .await { tracing::error!( - task_id = task.task_id, + expired_tasks = ?expired_tasks, error = %e.as_report(), "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat until we can successfully report its status", @@ -2993,24 +3018,20 @@ impl HummockManager { progress, }) => { let compactor_manager = hummock_manager.compactor_manager.clone(); - let cancel_tasks = compactor_manager.update_task_heartbeats(&progress); if let Some(compactor) = compactor_manager.get_compactor(context_id) { - // TODO: task cancellation can be batched - for task in cancel_tasks { + let cancel_tasks = compactor_manager.update_task_heartbeats(&progress).into_iter().map(|task|task.task_id).collect::>(); + if !cancel_tasks.is_empty() { tracing::info!( - "Task with group_id {} task_id {} with context_id {} has expired due to lack of visible progress", - task.compaction_group_id, - task.task_id, + "Tasks cancel with task_ids {:?} with context_id {} has expired due to lack of visible progress", + cancel_tasks, context_id, ); - if let Err(e) = - hummock_manager - .cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled) + if let Err(e) = hummock_manager + .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled) .await { tracing::error!( - task_id = task.task_id, error = %e.as_report(), "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat until we can successfully report its status." @@ -3020,13 +3041,45 @@ impl HummockManager { // Forcefully cancel the task so that it terminates // early on the compactor // node. - let _ = compactor.cancel_task(task.task_id); + let _ = compactor.cancel_tasks(cancel_tasks.clone()); tracing::info!( - "CancelTask operation for task_id {} has been sent to node with context_id {}", - context_id, - task.task_id + "CancelTask operation for task_id {:?} has been sent to node with context_id {}", + cancel_tasks, + context_id ); } + + // for task in cancel_tasks { + // tracing::info!( + // "Task cancel with group_id {} task_id {} with context_id {} has expired due to lack of visible progress", + // task.compaction_group_id, + // task.task_id, + // context_id, + // ); + + // if let Err(e) = + // hummock_manager + // .cancel_compact_task(task.task_id, TaskStatus::HeartbeatProgressCanceled) + // .await + // { + // tracing::error!( + // task_id = task.task_id, + // error = %e.as_report(), + // "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat + // until we can successfully report its status." + // ); + // } + + // // Forcefully cancel the task so that it terminates + // // early on the compactor + // // node. + // let _ = compactor.cancel_task(task.task_id); + // tracing::info!( + // "CancelTask operation for task_id {} has been sent to node with context_id {}", + // context_id, + // task.task_id + // ); + // } } else { // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed. // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index ad1f9dd555e6..a94c2b51e501 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -510,7 +510,7 @@ pub fn start_compactor( &request_sender, ); - continue 'start_stream; + continue 'consume_stream; } running_task_parallelism