From a1d97fcae278ca63c2036a19bb0fefa11921f2a5 Mon Sep 17 00:00:00 2001 From: Li0k Date: Sun, 4 Feb 2024 12:02:23 +0800 Subject: [PATCH] refactor(storage): refactor compaction event loop (#14631) --- src/meta/node/src/server.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 350 ++++++++++++++++------------ 2 files changed, 198 insertions(+), 154 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index f8bfd4f8b4079..9f06b8deb51ae 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -688,7 +688,7 @@ pub async fn start_service_as_election_leader( )); } sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone())); - sub_tasks.push(HummockManager::compaction_event_loop( + sub_tasks.extend(HummockManager::compaction_event_loop( hummock_manager, compactor_streams_change_rx, )); diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index f43408df23d44..8984aff7af042 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -24,7 +24,7 @@ use arc_swap::ArcSwap; use bytes::Bytes; use fail::fail_point; use function_name::named; -use futures::future::Either; +use futures::future::{Either, Shared}; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; @@ -54,7 +54,7 @@ use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config; use risingwave_pb::hummock::subscribe_compaction_event_request::{ - Event as RequestEvent, HeartBeat, PullTask, ReportTask, + self, Event as RequestEvent, HeartBeat, PullTask, ReportTask, }; use risingwave_pb::hummock::subscribe_compaction_event_response::{ Event as ResponseEvent, PullTaskAck, @@ -67,8 +67,8 @@ use risingwave_pb::hummock::{ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use rw_futures_util::{pending_on_none, select_all}; use thiserror_ext::AsReport; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot::Sender; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender}; use tokio::sync::RwLockWriteGuard; use tokio::task::JoinHandle; use tokio_stream::wrappers::IntervalStream; @@ -241,14 +241,6 @@ pub static CANCEL_STATUS_SET: LazyLock> = LazyLock::new(|| { .collect() }); -#[derive(Debug)] -pub enum CompactionResumeTrigger { - /// The addition (re-subscription) of compactors - CompactorAddition { context_id: HummockContextId }, - /// A compaction task is reported when all compactors are not idle. 
- TaskReport { original_task_num: usize }, -} - pub struct CommitEpochInfo { pub sstables: Vec, pub new_table_watermarks: HashMap, @@ -2694,18 +2686,37 @@ impl HummockManager { *self.group_to_table_vnode_partition.write() = group_to_table_vnode_partition; } - #[named] pub fn compaction_event_loop( hummock_manager: Arc, mut compactor_streams_change_rx: UnboundedReceiver<( u32, Streaming, )>, - ) -> (JoinHandle<()>, Sender<()>) { + ) -> Vec<(JoinHandle<()>, Sender<()>)> { let mut compactor_request_streams = FuturesUnordered::new(); let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + let (shutdown_tx_dedicated, shutdown_rx_dedicated) = tokio::sync::oneshot::channel(); let shutdown_rx_shared = shutdown_rx.shared(); - let mut compaction_selectors = init_selectors(); + let shutdown_rx_dedicated_shared = shutdown_rx_dedicated.shared(); + + let (tx, rx) = unbounded_channel(); + + let mut join_handle_vec = Vec::default(); + + let hummock_manager_dedicated = hummock_manager.clone(); + let compact_task_event_handler_join_handle = tokio::spawn(async move { + Self::compact_task_dedicated_event_handler( + hummock_manager_dedicated, + rx, + shutdown_rx_dedicated_shared, + ) + .await; + }); + + join_handle_vec.push(( + compact_task_event_handler_join_handle, + shutdown_tx_dedicated, + )); let join_handle = tokio::spawn(async move { let push_stream = @@ -2720,10 +2731,10 @@ impl HummockManager { }; let mut event_loop_iteration_now = Instant::now(); + loop { let shutdown_rx_shared = shutdown_rx_shared.clone(); - - // report + let hummock_manager = hummock_manager.clone(); hummock_manager .metrics .compaction_event_loop_iteration_latency @@ -2731,9 +2742,7 @@ impl HummockManager { event_loop_iteration_now = Instant::now(); tokio::select! { - _ = shutdown_rx_shared => { - return; - }, + _ = shutdown_rx_shared => { return; }, compactor_stream = compactor_streams_change_rx.recv() => { if let Some((context_id, stream)) = compactor_stream { @@ -2744,23 +2753,20 @@ impl HummockManager { result = pending_on_none(compactor_request_streams.next()) => { let mut compactor_alive = true; - let (context_id, compactor_stream_req) = result; + + let (context_id, compactor_stream_req): (_, (std::option::Option>, _)) = result; let (event, create_at, stream) = match compactor_stream_req { (Some(Ok(req)), stream) => { (req.event.unwrap(), req.create_at, stream) } (Some(Err(err)), _stream) => { - tracing::warn!(error = %err.as_report(), "compactor {} leaving the cluster with err", context_id); - hummock_manager.compactor_manager - .remove_compactor(context_id); + tracing::warn!(error = %err.as_report(), "compactor stream {} poll with err, recv stream may be destroyed", context_id); continue } _ => { - tracing::warn!("compactor {} leaving the cluster", context_id); - hummock_manager.compactor_manager - .remove_compactor(context_id); + tracing::warn!("compactor stream {} poll err, recv stream may be destroyed", context_id); continue }, }; @@ -2778,131 +2784,34 @@ impl HummockManager { } match event { - RequestEvent::PullTask(PullTask { - pull_task_count, - }) => { - assert_ne!(0, pull_task_count); - if let Some(compactor) = hummock_manager.compactor_manager.get_compactor(context_id) { - if let Some((group, task_type)) = hummock_manager.auto_pick_compaction_group_and_type().await { - let selector: &mut Box = { - let versioning_guard = read_lock!(hummock_manager, versioning).await; - let versioning = versioning_guard.deref(); - - if versioning.write_limit.contains_key(&group) { - let enable_emergency_picker = match 
hummock_manager - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(group) - { - Some(config) =>{ config.compaction_config.enable_emergency_picker }, - None => { unreachable!("compaction-group {} not exist", group) } - }; - - if enable_emergency_picker { - compaction_selectors.get_mut(&TaskType::Emergency).unwrap() - } else { - compaction_selectors.get_mut(&task_type).unwrap() - } - } else { - compaction_selectors.get_mut(&task_type).unwrap() - } - }; - for _ in 0..pull_task_count { - let compact_task = - hummock_manager - .get_compact_task(group, selector) - .await; - - match compact_task { - Ok(Some(compact_task)) => { - let task_id = compact_task.task_id; - if let Err(e) = compactor.send_event( - ResponseEvent::CompactTask(compact_task) - ) { - tracing::warn!( - error = %e.as_report(), - "Failed to send task {} to {}", - task_id, - compactor.context_id(), - ); - - compactor_alive = false; - break; - } - }, - Ok(None) => { - // no compact_task to be picked - hummock_manager.compaction_state.unschedule(group, task_type); - break; - } - Err(err) => { - tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); - break; - } - }; - } - } - - // ack to compactor - if compactor_alive { - if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})){ - tracing::warn!( - error = %e.as_report(), - "Failed to send ask to {}", - context_id, - ); - - compactor_alive = false; - } - } - } else { - compactor_alive = false; - - } - }, - - RequestEvent::ReportTask(ReportTask { - task_id, - task_status, - sorted_output_ssts, - table_stats_change - }) => { - if let Err(e) = hummock_manager.report_compact_task(task_id, TaskStatus::try_from(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change)) - .await { - tracing::error!(error = %e.as_report(), "report compact_tack fail"); - } - }, - RequestEvent::HeartBeat(HeartBeat { progress, }) => { let compactor_manager = hummock_manager.compactor_manager.clone(); let cancel_tasks = compactor_manager.update_task_heartbeats(&progress); - - // TODO: task cancellation can be batched - for task in cancel_tasks { - tracing::info!( - "Task with group_id {} task_id {} with context_id {} has expired due to lack of visible progress", - task.compaction_group_id, - task.task_id, - context_id, - ); - - if let Err(e) = - hummock_manager - .cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled) - .await - { - tracing::error!( - task_id = task.task_id, - error = %e.as_report(), - "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat - until we can successfully report its status." + if let Some(compactor) = compactor_manager.get_compactor(context_id) { + // TODO: task cancellation can be batched + for task in cancel_tasks { + tracing::info!( + "Task with group_id {} task_id {} with context_id {} has expired due to lack of visible progress", + task.compaction_group_id, + task.task_id, + context_id, ); - } - if let Some(compactor) = compactor_manager.get_compactor(context_id) { + if let Err(e) = + hummock_manager + .cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled) + .await + { + tracing::error!( + task_id = task.task_id, + error = %e.as_report(), + "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat + until we can successfully report its status." + ); + } + // Forcefully cancel the task so that it terminates // early on the compactor // node. 
@@ -2912,31 +2821,36 @@ impl HummockManager { context_id, task.task_id ); - } else { - compactor_alive = false; } + } else { + // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed. + // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager + compactor_alive = false; } }, RequestEvent::Register(_) => { unreachable!() } - } + e @ (RequestEvent::PullTask(_) | RequestEvent::ReportTask(_)) => { + let _ = tx.send((context_id, e)); + } + } if compactor_alive { push_stream(context_id, stream, &mut compactor_request_streams); } else { - tracing::warn!("compactor {} leaving the cluster since it's not alive", context_id); - hummock_manager.compactor_manager - .remove_compactor(context_id); + tracing::warn!("compactor stream {} error, send stream may be destroyed", context_id); } - } + }, } } }); - (join_handle, shutdown_tx) + join_handle_vec.push((join_handle, shutdown_tx)); + + join_handle_vec } pub fn add_compactor_stream( @@ -3175,6 +3089,136 @@ impl HummockManager { Ok(()) } + + /// dedicated event runtime for CPU/IO bound event + #[named] + async fn compact_task_dedicated_event_handler( + hummock_manager: Arc, + mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>, + shutdown_rx_shared: Shared>, + ) { + let mut compaction_selectors = init_selectors(); + + tokio::select! { + _ = shutdown_rx_shared => {} + + _ = async { + while let Some((context_id, event)) = rx.recv().await { + match event { + RequestEvent::PullTask(PullTask { pull_task_count }) => { + assert_ne!(0, pull_task_count); + if let Some(compactor) = + hummock_manager.compactor_manager.get_compactor(context_id) + { + if let Some((group, task_type)) = + hummock_manager.auto_pick_compaction_group_and_type().await + { + let selector: &mut Box = { + let versioning_guard = + read_lock!(hummock_manager, versioning).await; + let versioning = versioning_guard.deref(); + + if versioning.write_limit.contains_key(&group) { + let enable_emergency_picker = match hummock_manager + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(group) + { + Some(config) => { + config.compaction_config.enable_emergency_picker + } + None => { + unreachable!("compaction-group {} not exist", group) + } + }; + + if enable_emergency_picker { + compaction_selectors + .get_mut(&TaskType::Emergency) + .unwrap() + } else { + compaction_selectors.get_mut(&task_type).unwrap() + } + } else { + compaction_selectors.get_mut(&task_type).unwrap() + } + }; + for _ in 0..pull_task_count { + let compact_task = + hummock_manager.get_compact_task(group, selector).await; + + match compact_task { + Ok(Some(compact_task)) => { + let task_id = compact_task.task_id; + if let Err(e) = compactor.send_event( + ResponseEvent::CompactTask(compact_task), + ) { + tracing::warn!( + error = %e.as_report(), + "Failed to send task {} to {}", + task_id, + compactor.context_id(), + ); + + hummock_manager.compactor_manager.remove_compactor(context_id); + break; + } + } + Ok(None) => { + // no compact_task to be picked + hummock_manager + .compaction_state + .unschedule(group, task_type); + break; + } + Err(err) => { + tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); + break; + } + }; + } + } + + // ack to compactor + if let Err(e) = + compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) + { + tracing::warn!( + error = %e.as_report(), + "Failed to 
send ask to {}", + context_id, + ); + hummock_manager.compactor_manager.remove_compactor(context_id); + } + } + } + + RequestEvent::ReportTask(ReportTask { + task_id, + task_status, + sorted_output_ssts, + table_stats_change, + }) => { + if let Err(e) = hummock_manager + .report_compact_task( + task_id, + TaskStatus::try_from(task_status).unwrap(), + sorted_output_ssts, + Some(table_stats_change), + ) + .await + { + tracing::error!(error = %e.as_report(), "report compact_tack fail") + } + } + + _ => unreachable!(), + } + } + } => {} + } + } } // This structure describes how hummock handles sst switching in a compaction group. A better sst cut will result in better data alignment, which in turn will improve the efficiency of the compaction.