diff --git a/.config/hakari.toml b/.config/hakari.toml index e3e58e80cbf16..50000f6cc7586 100644 --- a/.config/hakari.toml +++ b/.config/hakari.toml @@ -36,4 +36,6 @@ third-party = [ { name = "criterion" }, { name = "console" }, { name = "similar" }, + # FYI: https://github.com/risingwavelabs/risingwave/issues/12315 + { name = "tikv-jemalloc-sys", git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }, ] diff --git a/Cargo.lock b/Cargo.lock index 9e73822d5a77d..75e4d922d5631 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10114,7 +10114,6 @@ dependencies = [ "subtle", "syn 1.0.109", "syn 2.0.33", - "tikv-jemalloc-sys", "tikv-jemallocator", "time", "time-macros", diff --git a/proto/hummock.proto b/proto/hummock.proto index db99cbe5f8509..04611907ebab0 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -387,8 +387,13 @@ message SubscribeCompactionEventRequest { // ReportTask provides the compact task to report to the meta. message ReportTask { - CompactTask compact_task = 2; + reserved 2; + reserved "compact_task"; map table_stats_change = 3; + + uint64 task_id = 4; + CompactTask.TaskStatus task_status = 5; + repeated SstableInfo sorted_output_ssts = 6; } // HeartBeat provides the progress status of all tasks on the Compactor. diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 8fa1aea32115f..9af1f9e384e54 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -475,6 +475,7 @@ impl HummockManager { ))); } } + if table_ids.len() == parent_group.member_table_ids.len() { return Err(Error::CompactionGroup(format!( "invalid split attempt for group {}: all member tables are moved", @@ -593,8 +594,10 @@ impl HummockManager { new_compaction_group_id } }; + let mut current_version = versioning.current_version.clone(); let sst_split_info = current_version.apply_version_delta(&new_version_delta); + let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts); let mut trx = Transaction::default(); new_version_delta.apply_to_txn(&mut trx)?; @@ -652,10 +655,17 @@ impl HummockManager { } } } - for mut task in canceled_tasks { - task.set_task_status(TaskStatus::ManualCanceled); + + for task in canceled_tasks { if !self - .report_compact_task_impl(&mut task, &mut compaction_guard, None) + .report_compact_task_impl( + task.task_id, + None, + TaskStatus::ManualCanceled, + vec![], + &mut compaction_guard, + None, + ) .await .unwrap_or(false) { diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 99f0c41d696d3..e91f4565b9590 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -54,7 +54,7 @@ use risingwave_pb::hummock::{ version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta, HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion, HummockVersionCheckpoint, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats, - IntraLevelDelta, SubscribeCompactionEventRequest, TableOption, + IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -837,7 +837,8 @@ impl HummockManager { return Ok(None); } - let can_trivial_move = matches!(selector.task_type(), compact_task::TaskType::Dynamic); + let can_trivial_move: bool = + matches!(selector.task_type(), compact_task::TaskType::Dynamic); let mut stats = LocalSelectorStatistic::default(); let member_table_ids = ¤t_version @@ -863,6 +864,7 @@ impl HummockManager { } Some(task) => task, }; + compact_task.watermark = watermark; compact_task.existing_table_ids = current_version .levels @@ -875,8 +877,15 @@ impl HummockManager { if is_trivial_reclaim { compact_task.set_task_status(TaskStatus::Success); - self.report_compact_task_impl(&mut compact_task, &mut compaction_guard, None) - .await?; + self.report_compact_task_impl( + task_id, + Some(compact_task.clone()), + TaskStatus::Success, + vec![], + &mut compaction_guard, + None, + ) + .await?; tracing::debug!( "TrivialReclaim for compaction group {}: remove {} sstables, cost time: {:?}", compaction_group_id, @@ -888,11 +897,20 @@ impl HummockManager { start_time.elapsed() ); } else if is_trivial_move && can_trivial_move { - compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); + // compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); // this task has been finished and `trivial_move_task` does not need to be schedule. compact_task.set_task_status(TaskStatus::Success); - self.report_compact_task_impl(&mut compact_task, &mut compaction_guard, None) - .await?; + compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); + self.report_compact_task_impl( + task_id, + Some(compact_task.clone()), + TaskStatus::Success, + compact_task.input_ssts[0].table_infos.clone(), + &mut compaction_guard, + None, + ) + .await?; + tracing::debug!( "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?}", compaction_group_id, @@ -1006,24 +1024,30 @@ impl HummockManager { } /// Cancels a compaction task no matter it's assigned or unassigned. - pub async fn cancel_compact_task( - &self, - compact_task: &mut CompactTask, - task_status: TaskStatus, - ) -> Result { - compact_task.set_task_status(task_status); + pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore err") ))); - self.cancel_compact_task_impl(compact_task).await + self.cancel_compact_task_impl(task_id, task_status).await } #[named] - pub async fn cancel_compact_task_impl(&self, compact_task: &mut CompactTask) -> Result { - assert!(CANCEL_STATUS_SET.contains(&compact_task.task_status())); + pub async fn cancel_compact_task_impl( + &self, + task_id: u64, + task_status: TaskStatus, + ) -> Result { + assert!(CANCEL_STATUS_SET.contains(&task_status)); let mut compaction_guard = write_lock!(self, compaction).await; let ret = self - .report_compact_task_impl(compact_task, &mut compaction_guard, None) + .report_compact_task_impl( + task_id, + None, + task_status, + vec![], + &mut compaction_guard, + None, + ) .await?; #[cfg(test)] { @@ -1110,19 +1134,21 @@ impl HummockManager { #[named] pub async fn report_compact_task( &self, - compact_task: &mut CompactTask, + task_id: u64, + task_status: TaskStatus, + sorted_output_ssts: Vec, table_stats_change: Option, ) -> Result { let mut guard = write_lock!(self, compaction).await; - let ret = self - .report_compact_task_impl(compact_task, &mut guard, table_stats_change) - .await?; - #[cfg(test)] - { - drop(guard); - self.check_state_consistency().await; - } - Ok(ret) + self.report_compact_task_impl( + task_id, + None, + task_status, + sorted_output_ssts, + &mut guard, + table_stats_change, + ) + .await } /// Finishes or cancels a compaction task, according to `task_status`. @@ -1135,7 +1161,10 @@ impl HummockManager { #[named] pub async fn report_compact_task_impl( &self, - compact_task: &mut CompactTask, + task_id: u64, + compact_task: Option, + task_status: TaskStatus, + sorted_output_ssts: Vec, compaction_guard: &mut RwLockWriteGuard<'_, Compaction>, table_stats_change: Option, ) -> Result { @@ -1147,18 +1176,28 @@ impl HummockManager { let mut compact_task_assignment = BTreeMapTransaction::new(&mut compaction.compact_task_assignment); - let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(compact_task); - let is_trivial_move = CompactStatus::is_trivial_move_task(compact_task); - // remove task_assignment - if compact_task_assignment - .remove(compact_task.task_id) - .is_none() - && !(is_trivial_reclaim || is_trivial_move) + let mut compact_task = if let Some(input_task) = compact_task { + input_task + } else { + match compact_task_assignment.remove(task_id) { + Some(compact_task) => compact_task.compact_task.unwrap(), + None => { + tracing::warn!("{}", format!("compact task {} not found", task_id)); + return Ok(false); + } + } + }; + { - return Ok(false); + // apply result + compact_task.set_task_status(task_status); + compact_task.sorted_output_ssts = sorted_output_ssts; } + let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); + let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); + { // The compaction task is finished. let mut versioning_guard = write_lock!(self, versioning).await; @@ -1173,7 +1212,7 @@ impl HummockManager { match compact_statuses.get_mut(compact_task.compaction_group_id) { Some(mut compact_status) => { - compact_status.report_compact_task(compact_task); + compact_status.report_compact_task(&compact_task); } None => { compact_task.set_task_status(TaskStatus::InvalidGroupCanceled); @@ -1197,7 +1236,7 @@ impl HummockManager { let is_success = if let TaskStatus::Success = compact_task.task_status() { // if member_table_ids changes, the data of sstable may stale. let is_expired = - Self::is_compact_task_expired(compact_task, &versioning.branched_ssts); + Self::is_compact_task_expired(&compact_task, &versioning.branched_ssts); if is_expired { compact_task.set_task_status(TaskStatus::InputOutdatedCanceled); false @@ -1212,7 +1251,7 @@ impl HummockManager { compact_task.set_task_status(TaskStatus::InvalidGroupCanceled); warn!( "The task may be expired because of group split, task:\n {:?}", - compact_task_to_string(compact_task) + compact_task_to_string(&compact_task) ); } input_exist @@ -1228,7 +1267,7 @@ impl HummockManager { &mut hummock_version_deltas, &mut branched_ssts, ¤t_version, - compact_task, + &compact_task, deterministic_mode, ); let mut version_stats = VarTransaction::new(&mut versioning.version_stats); @@ -1298,7 +1337,7 @@ impl HummockManager { tracing::trace!( "Reported compaction task. {}. cost time: {:?}", - compact_task_to_string(compact_task), + compact_task_to_string(&compact_task), start_time.elapsed(), ); @@ -1477,6 +1516,7 @@ impl HummockManager { .into_iter() .map(|ExtendedSstableInfo { sst_info, .. }| sst_info) .collect_vec(); + let group_deltas = &mut new_version_delta .group_deltas .entry(compaction_group_id) @@ -1909,10 +1949,12 @@ impl HummockManager { Ok(()) } + #[cfg(any(test, feature = "test"))] pub fn compactor_manager_ref_for_test(&self) -> CompactorManagerRef { self.compactor_manager.clone() } + #[cfg(any(test, feature = "test"))] #[named] pub async fn compaction_task_from_assignment_for_test( &self, @@ -1923,6 +1965,28 @@ impl HummockManager { assignment_ref.get(&task_id).cloned() } + #[cfg(any(test, feature = "test"))] + #[named] + pub async fn report_compact_task_for_test( + &self, + task_id: u64, + compact_task: Option, + task_status: TaskStatus, + sorted_output_ssts: Vec, + table_stats_change: Option, + ) -> Result { + let mut guard = write_lock!(self, compaction).await; + self.report_compact_task_impl( + task_id, + compact_task, + task_status, + sorted_output_ssts, + &mut guard, + table_stats_change, + ) + .await + } + pub fn cluster_manager(&self) -> &ClusterManagerRef { &self.cluster_manager } @@ -2192,13 +2256,12 @@ impl HummockManager { // side, and meta is just used as a last resort to clean up the // tasks that compactor has expired. - // - for mut task in + for task in compactor_manager.get_expired_tasks(Some(INTERVAL_SEC)) { if let Err(e) = hummock_manager .cancel_compact_task( - &mut task, + task.task_id, TaskStatus::HeartbeatCanceled, ) .await @@ -2614,15 +2677,14 @@ impl HummockManager { }, RequestEvent::ReportTask(ReportTask { - compact_task, + task_id, + task_status, + sorted_output_ssts, table_stats_change }) => { - if let Some(mut compact_task) = compact_task { - if let Err(e) = hummock_manager - .report_compact_task(&mut compact_task, Some(table_stats_change)) + if let Err(e) = hummock_manager.report_compact_task(task_id, TaskStatus::from_i32(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change)) .await { tracing::error!("report compact_tack fail {e:?}"); - } } }, @@ -2633,7 +2695,7 @@ impl HummockManager { let cancel_tasks = compactor_manager.update_task_heartbeats(&progress); // TODO: task cancellation can be batched - for mut task in cancel_tasks { + for task in cancel_tasks { tracing::info!( "Task with task_id {} with context_id {} has expired due to lack of visible progress", context_id, @@ -2642,7 +2704,7 @@ impl HummockManager { if let Err(e) = hummock_manager - .cancel_compact_task(&mut task, TaskStatus::HeartbeatCanceled) + .cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled) .await { tracing::error!("Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat @@ -2824,6 +2886,7 @@ fn gen_version_delta<'a>( group_deltas.push(group_delta); version_delta.gc_object_ids.append(&mut gc_object_ids); version_delta.safe_epoch = std::cmp::max(old_version.safe_epoch, compact_task.watermark); + // Don't persist version delta generated by compaction to meta store in deterministic mode. // Because it will override existing version delta that has same ID generated in the data // ingestion phase. diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 596149df3b8aa..44c90c7855ec9 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -189,7 +189,7 @@ async fn test_hummock_compaction_task() { .unwrap(); // Get a compaction task. - let mut compact_task = hummock_manager + let compact_task = hummock_manager .get_compact_task( StaticCompactionGroupId::StateDefault.into(), &mut default_level_selector(), @@ -209,12 +209,12 @@ async fn test_hummock_compaction_task() { // Cancel the task and succeed. assert!(hummock_manager - .cancel_compact_task(&mut compact_task, TaskStatus::ManualCanceled) + .cancel_compact_task(compact_task.task_id, TaskStatus::ManualCanceled) .await .unwrap()); // Get a compaction task. - let mut compact_task = hummock_manager + let compact_task = hummock_manager .get_compact_task( StaticCompactionGroupId::StateDefault.into(), &mut default_level_selector(), @@ -224,10 +224,9 @@ async fn test_hummock_compaction_task() { .unwrap(); assert_eq!(compact_task.get_task_id(), 3); // Finish the task and succeed. - compact_task.set_task_status(TaskStatus::Success); assert!(hummock_manager - .report_compact_task(&mut compact_task, None) + .report_compact_task(compact_task.task_id, TaskStatus::Success, vec![], None) .await .unwrap()); } @@ -847,15 +846,6 @@ async fn test_trigger_manual_compaction() { assert!(result.is_ok()); } - let task_id: u64 = 4; - let compact_task = hummock_manager - .compaction_task_from_assignment_for_test(task_id) - .await - .unwrap() - .compact_task - .unwrap(); - assert_eq!(task_id, compact_task.task_id); - { let option = ManualCompactionOption::default(); // all sst pending , test no compaction avail @@ -915,7 +905,7 @@ async fn test_hummock_compaction_task_heartbeat() { .unwrap(); // Get a compaction task. - let mut compact_task = hummock_manager + let compact_task = hummock_manager .get_compact_task( StaticCompactionGroupId::StateDefault.into(), &mut default_level_selector(), @@ -946,14 +936,18 @@ async fn test_hummock_compaction_task_heartbeat() { } // Cancel the task immediately and succeed. - compact_task.set_task_status(TaskStatus::ExecuteFailed); assert!(hummock_manager - .report_compact_task(&mut compact_task, None) + .report_compact_task( + compact_task.task_id, + TaskStatus::ExecuteFailed, + vec![], + None + ) .await .unwrap()); // Get a compaction task. - let mut compact_task = hummock_manager + let compact_task = hummock_manager .get_compact_task( StaticCompactionGroupId::StateDefault.into(), &mut default_level_selector(), @@ -965,14 +959,18 @@ async fn test_hummock_compaction_task_heartbeat() { assert_eq!(compact_task.get_task_id(), 3); // Cancel the task after heartbeat has triggered and fail. - compact_task.set_task_status(TaskStatus::ExecuteFailed); // do not send heartbeats to the task for 30s seconds (ttl = 1s, heartbeat check freq. = 1s) // default_interval = 30s tokio::time::sleep(std::time::Duration::from_secs(32)).await; assert!(!hummock_manager - .report_compact_task(&mut compact_task, None) + .report_compact_task( + compact_task.task_id, + TaskStatus::ExecuteFailed, + vec![], + None + ) .await .unwrap()); shutdown_tx.send(()).unwrap(); @@ -1193,7 +1191,7 @@ async fn test_version_stats() { .compactor_manager_ref_for_test() .add_compactor(worker_node.id); - let mut compact_task = hummock_manager + let compact_task = hummock_manager .get_compact_task( StaticCompactionGroupId::StateDefault.into(), &mut default_level_selector(), @@ -1201,7 +1199,7 @@ async fn test_version_stats() { .await .unwrap() .unwrap(); - compact_task.task_status = TaskStatus::Success as _; + // compact_task.task_status = TaskStatus::Success as _; let compact_table_stats_change = TableStatsMap::from([ ( 2, @@ -1222,7 +1220,9 @@ async fn test_version_stats() { ]); hummock_manager .report_compact_task( - &mut compact_task, + compact_task.task_id, + TaskStatus::Success, + vec![], Some(to_prost_table_stats_map(compact_table_stats_change)), ) .await @@ -1632,15 +1632,17 @@ async fn test_split_compaction_group_trivial_expired() { .register_table_ids(&[(102, 2)]) .await .unwrap(); - let mut task = hummock_manager + let task = hummock_manager .get_compact_task(2, &mut default_level_selector()) .await .unwrap() .unwrap(); + hummock_manager .split_compaction_group(2, &[100]) .await .unwrap(); + let mut selector: Box = Box::::default(); let reclaim_task = hummock_manager .get_compact_task_impl(2, &mut selector) @@ -1666,30 +1668,32 @@ async fn test_split_compaction_group_trivial_expired() { vec![100] ); - let mut task2 = hummock_manager + let task2 = hummock_manager .get_compact_task(new_group_id, &mut default_level_selector()) .await .unwrap() .unwrap(); - task2.sorted_output_ssts = vec![SstableInfo { - object_id: 12, - sst_id: 12, - key_range: None, - table_ids: vec![100], - min_epoch: 20, - max_epoch: 20, - ..Default::default() - }]; - // delete all reference of sst-10 - task2.task_status = TaskStatus::Success as i32; + let ret = hummock_manager - .report_compact_task(&mut task2, None) + .report_compact_task( + task2.task_id, + TaskStatus::Success, + vec![SstableInfo { + object_id: 12, + sst_id: 12, + key_range: None, + table_ids: vec![100], + min_epoch: 20, + max_epoch: 20, + ..Default::default() + }], + None, + ) .await .unwrap(); assert!(ret); - task.task_status = TaskStatus::Success as i32; let ret = hummock_manager - .report_compact_task(&mut task, None) + .report_compact_task(task.task_id, TaskStatus::Success, vec![], None) .await .unwrap(); // the task has been canceld @@ -1750,37 +1754,41 @@ async fn test_split_compaction_group_on_demand_bottom_levels() { .await .unwrap(); // Construct data via manual compaction - let mut compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; let base_level: usize = 6; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); assert_eq!(compaction_task.target_level, base_level as u32); - compaction_task.sorted_output_ssts = vec![ - SstableInfo { - object_id: 11, - sst_id: 11, - table_ids: vec![100, 101], - key_range: Some(KeyRange { - left: iterator_test_key_of_epoch(1, 1, 1), - right: iterator_test_key_of_epoch(1, 1, 1), - right_exclusive: false, - }), - ..Default::default() - }, - SstableInfo { - object_id: 12, - sst_id: 12, - table_ids: vec![100], - key_range: Some(KeyRange { - left: iterator_test_key_of_epoch(1, 2, 2), - right: iterator_test_key_of_epoch(1, 2, 2), - right_exclusive: false, - }), - ..Default::default() - }, - ]; - compaction_task.task_status = TaskStatus::Success.into(); + assert!(hummock_manager - .report_compact_task(&mut compaction_task, None) + .report_compact_task( + compaction_task.task_id, + TaskStatus::Success, + vec![ + SstableInfo { + object_id: 11, + sst_id: 11, + table_ids: vec![100, 101], + key_range: Some(KeyRange { + left: iterator_test_key_of_epoch(1, 1, 1), + right: iterator_test_key_of_epoch(1, 1, 1), + right_exclusive: false, + }), + ..Default::default() + }, + SstableInfo { + object_id: 12, + sst_id: 12, + table_ids: vec![100], + key_range: Some(KeyRange { + left: iterator_test_key_of_epoch(1, 2, 2), + right: iterator_test_key_of_epoch(1, 2, 2), + right_exclusive: false, + }), + ..Default::default() + }, + ], + None + ) .await .unwrap()); let current_version = hummock_manager.get_current_version().await; @@ -1911,7 +1919,7 @@ async fn test_compaction_task_expiration_due_to_split_group() { .await .unwrap(); - let mut compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); hummock_manager .split_compaction_group(2, &[100]) @@ -1919,9 +1927,9 @@ async fn test_compaction_task_expiration_due_to_split_group() { .unwrap(); let version_1 = hummock_manager.get_current_version().await; - compaction_task.task_status = TaskStatus::Success.into(); + // compaction_task.task_status = TaskStatus::Success.into(); assert!(!hummock_manager - .report_compact_task(&mut compaction_task, None) + .report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None) .await .unwrap()); let version_2 = hummock_manager.get_current_version().await; @@ -1930,11 +1938,10 @@ async fn test_compaction_task_expiration_due_to_split_group() { "version should not change because compaction task has been cancelled" ); - let mut compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 2); - compaction_task.task_status = TaskStatus::Success.into(); hummock_manager - .report_compact_task(&mut compaction_task, None) + .report_compact_task(compaction_task.task_id, TaskStatus::Success, vec![], None) .await .unwrap(); @@ -1968,18 +1975,21 @@ async fn test_move_tables_between_compaction_group() { .await .unwrap(); // Construct data via manual compaction - let mut compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; + let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await; let base_level: usize = 6; assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); assert_eq!(compaction_task.target_level, base_level as u32); - compaction_task.sorted_output_ssts = vec![ - gen_sstable_info(11, 1, vec![100]), - gen_sstable_info(12, 2, vec![100, 101]), - gen_sstable_info(13, 3, vec![101, 102]), - ]; - compaction_task.task_status = TaskStatus::Success.into(); assert!(hummock_manager - .report_compact_task(&mut compaction_task, None) + .report_compact_task( + compaction_task.task_id, + TaskStatus::Success, + vec![ + gen_sstable_info(11, 1, vec![100]), + gen_sstable_info(12, 2, vec![100, 101]), + gen_sstable_info(13, 3, vec![101, 102]), + ], + None + ) .await .unwrap()); let sst_2 = gen_extend_sstable_info(14, 2, 1, vec![101, 102]); @@ -2023,7 +2033,7 @@ async fn test_move_tables_between_compaction_group() { let mut selector: Box = Box::::default(); - let mut compaction_task = hummock_manager + let compaction_task = hummock_manager .get_compact_task(2, &mut selector) .await .unwrap() @@ -2031,11 +2041,14 @@ async fn test_move_tables_between_compaction_group() { assert_eq!(compaction_task.existing_table_ids, vec![101, 102]); assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1); assert_eq!(compaction_task.input_ssts[0].table_infos[0].object_id, 12); - compaction_task.sorted_output_ssts = vec![gen_sstable_info(20, 2, vec![101])]; - compaction_task.task_status = TaskStatus::Success.into(); let ret = hummock_manager - .report_compact_task(&mut compaction_task, None) + .report_compact_task( + compaction_task.task_id, + TaskStatus::Success, + vec![gen_sstable_info(20, 2, vec![101])], + None, + ) .await .unwrap(); assert!(ret); diff --git a/src/meta/src/hummock/manager/worker.rs b/src/meta/src/hummock/manager/worker.rs index 8a43ddc87247b..9f9b0fd911bd4 100644 --- a/src/meta/src/hummock/manager/worker.rs +++ b/src/meta/src/hummock/manager/worker.rs @@ -98,58 +98,29 @@ impl HummockManager { let retry_strategy = ExponentialBackoff::from_millis(10) .max_delay(Duration::from_secs(60)) .map(jitter); - match notification { - LocalNotification::WorkerNodeDeleted(worker_node) => { - if worker_node.get_type().unwrap() == WorkerType::Compactor { - self.compactor_manager.remove_compactor(worker_node.id); - } - tokio_retry::RetryIf::spawn( - retry_strategy.clone(), - || async { - if let Err(err) = self.release_contexts(vec![worker_node.id]).await { - tracing::warn!( - "Failed to release hummock context {}. {}. Will retry.", - worker_node.id, - err - ); - return Err(err); - } - Ok(()) - }, - RetryableError::default(), - ) - .await - .expect("retry until success"); - tracing::info!("Released hummock context {}", worker_node.id); - sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC"); + if let LocalNotification::WorkerNodeDeleted(worker_node) = notification { + if worker_node.get_type().unwrap() == WorkerType::Compactor { + self.compactor_manager.remove_compactor(worker_node.id); } - // TODO move `CompactionTaskNeedCancel` to `handle_hummock_manager_event` - // TODO extract retry boilerplate code - LocalNotification::CompactionTaskNeedCancel(compact_task) => { - let task_id = compact_task.task_id; - tokio_retry::RetryIf::spawn( - retry_strategy.clone(), - || async { - let mut compact_task_mut = compact_task.clone(); - if let Err(err) = self.cancel_compact_task_impl(&mut compact_task_mut).await - { - tracing::warn!( - "Failed to cancel compaction task {}. {}. Will retry.", - compact_task.task_id, - err - ); - return Err(err); - } - Ok(()) - }, - RetryableError::default(), - ) - .await - .expect("retry until success"); - tracing::info!("Cancelled compaction task {}", task_id); - sync_point!("AFTER_CANCEL_COMPACTION_TASK_ASYNC"); - } - _ => {} + tokio_retry::RetryIf::spawn( + retry_strategy.clone(), + || async { + if let Err(err) = self.release_contexts(vec![worker_node.id]).await { + tracing::warn!( + "Failed to release hummock context {}. {}. Will retry.", + worker_node.id, + err + ); + return Err(err); + } + Ok(()) + }, + RetryableError::default(), + ) + .await + .expect("retry until success"); + tracing::info!("Released hummock context {}", worker_node.id); + sync_point!("AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC"); } } } diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 632d56ca2c400..70c055d387c85 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -114,10 +114,15 @@ pub async fn add_test_tables( .unwrap(); assert_eq!(compactor.context_id(), context_id); } - compact_task.sorted_output_ssts = test_tables_2.clone(); - compact_task.set_task_status(TaskStatus::Success); + let ret = hummock_manager - .report_compact_task(&mut compact_task, None) + .report_compact_task_for_test( + compact_task.task_id, + Some(compact_task), + TaskStatus::Success, + test_tables_2.clone(), + None, + ) .await .unwrap(); assert!(ret); diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 5e4172911ba70..69ab29f835699 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::common::{WorkerNode, WorkerType}; -use risingwave_pb::hummock::CompactTask; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{ @@ -41,7 +40,6 @@ pub type NotificationVersion = u64; pub enum LocalNotification { WorkerNodeDeleted(WorkerNode), WorkerNodeActivated(WorkerNode), - CompactionTaskNeedCancel(CompactTask), SystemParamsChange(SystemParamsReader), FragmentMappingsUpsert(Vec), FragmentMappingsDelete(Vec), diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 5864fa9c0a484..3b289b6511f89 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -160,6 +160,7 @@ pub(crate) mod tests { .await .unwrap() .uncommitted_ssts; + hummock_meta_client.commit_epoch(epoch, ssts).await.unwrap(); } } @@ -221,7 +222,6 @@ pub(crate) mod tests { Default::default(), ) .await; - let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone() { FilterKeyExtractorManager::RpcFilterKeyExtractorManager( @@ -285,7 +285,7 @@ pub(crate) mod tests { compact_task.current_epoch_time = 0; let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx.clone(), compact_task.clone(), rx, @@ -294,7 +294,13 @@ pub(crate) mod tests { .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task_for_test( + result_task.task_id, + Some(compact_task), + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); } @@ -434,7 +440,7 @@ pub(crate) mod tests { { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx.clone(), compact_task.clone(), rx, @@ -443,7 +449,12 @@ pub(crate) mod tests { .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task( + result_task.task_id, + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); } @@ -601,6 +612,7 @@ pub(crate) mod tests { ) .await .unwrap(); + assert!(compact_task.is_none()); // 3. get the latest version and check @@ -754,7 +766,7 @@ pub(crate) mod tests { // 4. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx, compact_task.clone(), rx, @@ -763,7 +775,12 @@ pub(crate) mod tests { .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task( + result_task.task_id, + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); @@ -942,7 +959,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx, compact_task.clone(), rx, @@ -951,7 +968,12 @@ pub(crate) mod tests { .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task( + result_task.task_id, + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); @@ -1127,7 +1149,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx, compact_task.clone(), rx, @@ -1136,7 +1158,12 @@ pub(crate) mod tests { .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task( + result_task.task_id, + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); @@ -1285,7 +1312,7 @@ pub(crate) mod tests { // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx, compact_task.clone(), rx, @@ -1294,7 +1321,12 @@ pub(crate) mod tests { .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task( + result_task.task_id, + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 7194dd2d963ea..c75f7d8410581 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -179,54 +179,6 @@ async fn test_syncpoints_test_failpoints_fetch_ids() { } } -#[tokio::test] -#[cfg(feature = "sync_point")] -#[serial] -async fn test_syncpoints_test_local_notification_receiver() { - let (env, hummock_manager, _cluster_manager, worker_node) = setup_compute_env(80).await; - let context_id = worker_node.id; - - register_table_ids_to_compaction_group( - hummock_manager.as_ref(), - &[1], - StaticCompactionGroupId::StateDefault.into(), - ) - .await; - // Test cancel compaction task - let _sst_infos = add_ssts(1, hummock_manager.as_ref(), context_id).await; - let mut task = hummock_manager - .get_compact_task( - StaticCompactionGroupId::StateDefault.into(), - &mut default_level_selector(), - ) - .await - .unwrap() - .unwrap(); - task.task_status = TaskStatus::ManualCanceled as i32; - assert_eq!(hummock_manager.list_all_tasks_ids().await.len(), 1); - env.notification_manager() - .notify_local_subscribers(LocalNotification::CompactionTaskNeedCancel(task)) - .await; - sync_point::wait_timeout( - "AFTER_CANCEL_COMPACTION_TASK_ASYNC", - Duration::from_secs(10), - ) - .await - .unwrap(); - assert_eq!(hummock_manager.list_all_tasks_ids().await.len(), 0); - - // Test release hummock contexts - env.notification_manager() - .notify_local_subscribers(LocalNotification::WorkerNodeDeleted(worker_node)) - .await; - sync_point::wait_timeout( - "AFTER_RELEASE_HUMMOCK_CONTEXTS_ASYNC", - Duration::from_secs(10), - ) - .await - .unwrap(); -} - pub async fn compact_once( hummock_manager_ref: HummockManagerRef, compact_ctx: CompactorContext, @@ -247,12 +199,15 @@ pub async fn compact_once( .unwrap() .unwrap(); compact_task.gc_delete_keys = false; + hummock_manager_ref + .set_assignment_for_test(compact_task.clone()) + .await; let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN; compact_task.compaction_filter_mask = compaction_filter_flag.bits(); // 3. compact let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( + let (result_task, task_stats) = compact( compact_ctx, compact_task.clone(), rx, @@ -261,7 +216,12 @@ pub async fn compact_once( .await; hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .report_compact_task( + result_task.task_id, + result_task.task_status(), + result_task.sorted_output_ssts, + Some(to_prost_table_stats_map(task_stats)), + ) .await .unwrap(); } diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c9d9d43c38785..c10e85a2da6d2 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -518,7 +518,9 @@ pub fn start_compactor( if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { event: Some(RequestEvent::ReportTask( ReportTask { - compact_task: Some(compact_task), + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task.sorted_output_ssts, table_stats_change:to_prost_table_stats_map(table_stats), } )), diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 63cf30bf854df..cec5ff69ec8fa 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -228,7 +228,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -246,7 +245,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -264,7 +262,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -282,7 +279,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -300,7 +296,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -318,7 +313,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -336,7 +330,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] } @@ -354,7 +347,6 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka", rev = "8 rustix = { version = "0.38", features = ["fs", "termios"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } serde_json = { version = "1", default-features = false, features = ["raw_value"] } -tikv-jemalloc-sys = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tower-http = { version = "0.4", features = ["add-extension", "cors", "fs"] } zstd-sys = { version = "2", features = ["std"] }