Skip to content

Commit

Permalink
fix(compactor): fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed May 16, 2024
1 parent 7063dee commit b90250c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 19 deletions.
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ message CompactTask {
JOIN_HANDLE_FAILED = 11;
TRACK_SST_OBJECT_ID_FAILED = 12;
NO_AVAIL_CPU_RESOURCE_CANCELED = 13;
HEARTBEAT_PROGRESS_CANCELED = 14;
}
// SSTs to be compacted, which will be removed from LSM after compaction
repeated InputLevel input_ssts = 1;
Expand Down
7 changes: 7 additions & 0 deletions src/meta/src/hummock/compactor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ impl Compactor {
Ok(())
}

/// Calls [`Self::cancel_task`] once for every id in `task_ids`, in order.
///
/// # Errors
///
/// Returns the first error produced by `cancel_task`. Iteration stops at
/// that point, so ids after the failing one are not attempted.
pub fn cancel_tasks(&self, task_ids: Vec<u64>) -> MetaResult<()> {
    task_ids.into_iter().try_for_each(|id| {
        // `?` keeps the same error conversion as a plain loop would.
        self.cancel_task(id)?;
        Ok(())
    })
}

pub fn context_id(&self) -> HummockContextId {
self.context_id
}
Expand Down
89 changes: 71 additions & 18 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2612,16 +2612,41 @@ impl HummockManager {
// progress (meta + compactor)
// 2. meta periodically scans the task and performs a cancel on
// the meta side for tasks that are not updated by heartbeat
for task in compactor_manager.get_heartbeat_expired_tasks() {
// for task in compactor_manager.get_heartbeat_expired_tasks() {
// if let Err(e) = hummock_manager
// .cancel_compact_task(
// task.task_id,
// TaskStatus::HeartbeatCanceled,
// )
// .await
// {
// tracing::error!(
// task_id = task.task_id,
// error = %e.as_report(),
// "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
// until we can successfully report its status",
// );
// }
// }
let expired_tasks: Vec<u64> = compactor_manager
.get_heartbeat_expired_tasks()
.into_iter()
.map(|task| task.task_id)
.collect();
if !expired_tasks.is_empty() {
tracing::info!(
expired_tasks = ?expired_tasks,
"Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
);
if let Err(e) = hummock_manager
.cancel_compact_task(
task.task_id,
.cancel_compact_tasks(
expired_tasks.clone(),
TaskStatus::HeartbeatCanceled,
)
.await
{
tracing::error!(
task_id = task.task_id,
expired_tasks = ?expired_tasks,
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status",
Expand Down Expand Up @@ -2993,24 +3018,20 @@ impl HummockManager {
progress,
}) => {
let compactor_manager = hummock_manager.compactor_manager.clone();
let cancel_tasks = compactor_manager.update_task_heartbeats(&progress);
if let Some(compactor) = compactor_manager.get_compactor(context_id) {
// TODO: task cancellation can be batched
for task in cancel_tasks {
let cancel_tasks = compactor_manager.update_task_heartbeats(&progress).into_iter().map(|task|task.task_id).collect::<Vec<_>>();
if !cancel_tasks.is_empty() {
tracing::info!(
"Task with group_id {} task_id {} with context_id {} has expired due to lack of visible progress",
task.compaction_group_id,
task.task_id,
"Tasks cancel with task_ids {:?} with context_id {} has expired due to lack of visible progress",
cancel_tasks,
context_id,
);

if let Err(e) =
hummock_manager
.cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled)
if let Err(e) = hummock_manager
.cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
.await
{
tracing::error!(
task_id = task.task_id,
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status."
Expand All @@ -3020,13 +3041,45 @@ impl HummockManager {
// Forcefully cancel the task so that it terminates
// early on the compactor
// node.
let _ = compactor.cancel_task(task.task_id);
let _ = compactor.cancel_tasks(cancel_tasks.clone());
tracing::info!(
"CancelTask operation for task_id {} has been sent to node with context_id {}",
context_id,
task.task_id
"CancelTask operation for task_id {:?} has been sent to node with context_id {}",
cancel_tasks,
context_id
);
}

// for task in cancel_tasks {
// tracing::info!(
// "Task cancel with group_id {} task_id {} with context_id {} has expired due to lack of visible progress",
// task.compaction_group_id,
// task.task_id,
// context_id,
// );

// if let Err(e) =
// hummock_manager
// .cancel_compact_task(task.task_id, TaskStatus::HeartbeatProgressCanceled)
// .await
// {
// tracing::error!(
// task_id = task.task_id,
// error = %e.as_report(),
// "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
// until we can successfully report its status."
// );
// }

// // Forcefully cancel the task so that it terminates
// // early on the compactor
// // node.
// let _ = compactor.cancel_task(task.task_id);
// tracing::info!(
// "CancelTask operation for task_id {} has been sent to node with context_id {}",
// context_id,
// task.task_id
// );
// }
} else {
// Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
// Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ pub fn start_compactor(
&request_sender,
);

continue 'start_stream;
continue 'consume_stream;
}

running_task_parallelism
Expand Down

0 comments on commit b90250c

Please sign in to comment.