Skip to content

Commit

Permalink
refactor(compaction): remove meta client from compactor context (#12020)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Sep 6, 2023
1 parent b5dd428 commit 50792cc
Show file tree
Hide file tree
Showing 12 changed files with 37 additions and 60 deletions.
6 changes: 3 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,8 @@ pub async fn compute_node_serve(
let memory_limiter = Arc::new(MemoryLimiter::new(
storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
));
let compactor_context = Arc::new(CompactorContext {
let compactor_context = CompactorContext {
storage_opts,
hummock_meta_client: hummock_meta_client.clone(),
sstable_store: storage.sstable_store(),
compactor_metrics: compactor_metrics.clone(),
is_share_buffer_compact: false,
Expand All @@ -231,10 +230,11 @@ pub async fn compute_node_serve(
task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
});
};

let (handle, shutdown_sender) = start_compactor(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
);
sub_tasks.push((handle, shutdown_sender));
Expand Down
6 changes: 3 additions & 3 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ pub async fn compactor_serve(
};
let await_tree_reg =
await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
let compactor_context = Arc::new(CompactorContext {
let compactor_context = CompactorContext {
storage_opts,
hummock_meta_client: hummock_meta_client.clone(),
sstable_store: sstable_store.clone(),
compactor_metrics,
is_share_buffer_compact: false,
Expand All @@ -211,7 +210,7 @@ pub async fn compactor_serve(
task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
running_task_count: Arc::new(AtomicU32::new(0)),
});
};
let mut sub_tasks = vec![
MetaClient::start_heartbeat_loop(
meta_client.clone(),
Expand All @@ -220,6 +219,7 @@ pub async fn compactor_serve(
),
risingwave_storage::hummock::compactor::start_compactor(
compactor_context.clone(),
hummock_meta_client.clone(),
sstable_object_id_manager.clone(),
),
];
Expand Down
26 changes: 7 additions & 19 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,27 +167,23 @@ pub(crate) mod tests {

fn get_compactor_context_with_filter_key_extractor_manager(
storage: &HummockStorage,
hummock_meta_client: &Arc<dyn HummockMetaClient>,
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
) -> CompactorContext {
get_compactor_context_with_filter_key_extractor_manager_impl(
storage.storage_opts().clone(),
storage.sstable_store(),
hummock_meta_client,
filter_key_extractor_manager,
)
}

fn get_compactor_context_with_filter_key_extractor_manager_impl(
options: Arc<StorageOpts>,
sstable_store: SstableStoreRef,
hummock_meta_client: &Arc<dyn HummockMetaClient>,
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
) -> CompactorContext {
CompactorContext {
storage_opts: options,
sstable_store,
hummock_meta_client: hummock_meta_client.clone(),
compactor_metrics: Arc::new(CompactorMetrics::unused()),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
Expand Down Expand Up @@ -236,7 +232,6 @@ pub(crate) mod tests {
};
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -292,7 +287,7 @@ pub(crate) mod tests {

let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx.clone()),
compact_ctx.clone(),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -400,7 +395,6 @@ pub(crate) mod tests {
};
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -455,7 +449,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -570,7 +564,6 @@ pub(crate) mod tests {

pub(crate) fn prepare_compactor_and_filter(
storage: &HummockStorage,
hummock_meta_client: &Arc<dyn HummockMetaClient>,
existing_table_id: u32,
) -> CompactorContext {
let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone()
Expand All @@ -587,7 +580,6 @@ pub(crate) mod tests {

get_compactor_context_with_filter_key_extractor_manager(
storage,
hummock_meta_client,
rpc_filter_key_extractor_manager,
)
}
Expand Down Expand Up @@ -701,7 +693,6 @@ pub(crate) mod tests {
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager_impl(
global_storage.storage_opts().clone(),
global_storage.sstable_store(),
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -789,7 +780,7 @@ pub(crate) mod tests {
// 4. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -888,7 +879,6 @@ pub(crate) mod tests {

let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager.clone(),
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -978,7 +968,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -1088,7 +1078,6 @@ pub(crate) mod tests {
);
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -1164,7 +1153,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -1265,8 +1254,7 @@ pub(crate) mod tests {
TableId::from(existing_table_id),
)
.await;
let compact_ctx =
prepare_compactor_and_filter(&storage, &hummock_meta_client, existing_table_id);
let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
Expand Down Expand Up @@ -1323,7 +1311,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down
8 changes: 2 additions & 6 deletions src/storage/hummock_test/src/sync_point_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async fn test_syncpoints_test_local_notification_receiver() {

pub async fn compact_once(
hummock_manager_ref: HummockManagerRef<MemStore>,
compact_ctx: Arc<CompactorContext>,
compact_ctx: CompactorContext,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) {
// 2. get compact task
Expand Down Expand Up @@ -290,11 +290,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
TableId::from(existing_table_id),
)
.await;
let compact_ctx = Arc::new(prepare_compactor_and_filter(
&storage,
&hummock_meta_client,
existing_table_id,
));
let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact
pub async fn generate_splits(
sstable_infos: &Vec<SstableInfo>,
compaction_size: u64,
context: Arc<CompactorContext>,
context: CompactorContext,
) -> HummockResult<Vec<KeyRange_vec>> {
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
if compaction_size > parallel_compact_size {
Expand Down Expand Up @@ -231,7 +231,7 @@ pub async fn generate_splits(
Ok(vec![])
}

pub fn estimate_task_output_capacity(context: Arc<CompactorContext>, task: &CompactTask) -> usize {
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
let total_input_uncompressed_file_size = task
.input_ssts
Expand Down
8 changes: 4 additions & 4 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct CompactorRunner {
impl CompactorRunner {
pub fn new(
split_index: usize,
context: Arc<CompactorContext>,
context: CompactorContext,
task: CompactTask,
object_id_getter: Box<dyn GetObjectId>,
) -> Self {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl CompactorRunner {
Self {
compactor,
compact_task: task,
sstable_store: context.sstable_store.clone(),
sstable_store: context.sstable_store,
key_range,
split_index,
}
Expand Down Expand Up @@ -235,7 +235,7 @@ impl CompactorRunner {
/// Handles a compaction task and reports its status to hummock manager.
/// Always return `Ok` and let hummock manager handle errors.
pub async fn compact(
compactor_context: Arc<CompactorContext>,
compactor_context: CompactorContext,
mut compact_task: CompactTask,
mut shutdown_rx: Receiver<()>,
object_id_getter: Box<dyn GetObjectId>,
Expand Down Expand Up @@ -543,7 +543,7 @@ pub async fn compact(
/// Fills in the compact task and tries to report the task result to meta node.
fn compact_done(
mut compact_task: CompactTask,
context: Arc<CompactorContext>,
context: CompactorContext,
output_ssts: Vec<CompactOutput>,
task_status: TaskStatus,
) -> (CompactTask, HashMap<u32, TableStats>) {
Expand Down
6 changes: 0 additions & 6 deletions src/storage/src/hummock/compactor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_rpc_client::HummockMetaClient;

use super::task_progress::TaskProgressManagerRef;
use crate::filter_key_extractor::FilterKeyExtractorManager;
Expand All @@ -32,9 +31,6 @@ pub struct CompactorContext {
/// Storage options.
pub storage_opts: Arc<StorageOpts>,

/// The meta client.
pub hummock_meta_client: Arc<dyn HummockMetaClient>,

/// Sstable store that manages the sstables.
pub sstable_store: SstableStoreRef,

Expand All @@ -61,7 +57,6 @@ impl CompactorContext {
pub fn new_local_compact_context(
storage_opts: Arc<StorageOpts>,
sstable_store: SstableStoreRef,
hummock_meta_client: Arc<dyn HummockMetaClient>,
compactor_metrics: Arc<CompactorMetrics>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> Self {
Expand All @@ -77,7 +72,6 @@ impl CompactorContext {
// not limit memory for local compact
Self {
storage_opts,
hummock_meta_client,
sstable_store,
compactor_metrics,
is_share_buffer_compact: true,
Expand Down
9 changes: 5 additions & 4 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use risingwave_pb::hummock::{
CompactTaskProgress, CompactorWorkload, SubscribeCompactionEventRequest,
SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use sysinfo::{CpuRefreshKind, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt};
use tokio::sync::oneshot::Sender;
Expand All @@ -76,7 +77,7 @@ use crate::hummock::{
/// Implementation of Hummock compaction.
pub struct Compactor {
/// The context of the compactor.
context: Arc<CompactorContext>,
context: CompactorContext,
object_id_getter: Box<dyn GetObjectId>,
task_config: TaskConfig,
options: SstableBuilderOptions,
Expand All @@ -88,7 +89,7 @@ pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
impl Compactor {
/// Create a new compactor.
pub fn new(
context: Arc<CompactorContext>,
context: CompactorContext,
options: SstableBuilderOptions,
task_config: TaskConfig,
object_id_getter: Box<dyn GetObjectId>,
Expand Down Expand Up @@ -310,10 +311,10 @@ impl Compactor {
/// manager and runs compaction tasks.
#[cfg_attr(coverage, no_coverage)]
pub fn start_compactor(
compactor_context: Arc<CompactorContext>,
compactor_context: CompactorContext,
hummock_meta_client: Arc<dyn HummockMetaClient>,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) -> (JoinHandle<()>, Sender<()>) {
let hummock_meta_client = compactor_context.hummock_meta_client.clone();
type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let stream_retry_interval = Duration::from_secs(30);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0;

/// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group.
pub async fn compact(
context: Arc<CompactorContext>,
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
payload: UploadTaskPayload,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn compact(

/// For compaction from shared buffer to level 0, this is the only function gets called.
async fn compact_shared_buffer(
context: Arc<CompactorContext>,
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
mut payload: UploadTaskPayload,
) -> HummockResult<Vec<LocalSstableInfo>> {
Expand Down Expand Up @@ -453,7 +453,7 @@ impl SharedBufferCompactRunner {
pub fn new(
split_index: usize,
key_range: KeyRange,
context: Arc<CompactorContext>,
context: CompactorContext,
sub_compaction_sstable_size: usize,
split_weight_by_vnode: u32,
use_block_based_filter: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub struct HummockEventHandler {
async fn flush_imms(
payload: UploadTaskPayload,
task_info: UploadTaskInfo,
compactor_context: Arc<crate::hummock::compactor::CompactorContext>,
compactor_context: CompactorContext,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) -> HummockResult<Vec<LocalSstableInfo>> {
for epoch in &task_info.epochs {
Expand All @@ -158,7 +158,7 @@ impl HummockEventHandler {
hummock_event_tx: mpsc::UnboundedSender<HummockEvent>,
hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
pinned_version: PinnedVersion,
compactor_context: Arc<CompactorContext>,
compactor_context: CompactorContext,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
state_store_metrics: Arc<HummockStateStoreMetrics>,
cache_refill_config: CacheRefillConfig,
Expand Down
Loading

0 comments on commit 50792cc

Please sign in to comment.