Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(compaction): remove meta client from compactor context #12020

Merged
merged 2 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ pub async fn compute_node_serve(
let memory_limiter = Arc::new(MemoryLimiter::new(
storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
));
let compactor_context = Arc::new(CompactorContext {
let compactor_context = CompactorContext {
storage_opts,
hummock_meta_client: hummock_meta_client.clone(),
sstable_store: storage.sstable_store(),
compactor_metrics: compactor_metrics.clone(),
is_share_buffer_compact: false,
Expand All @@ -229,10 +228,11 @@ pub async fn compute_node_serve(
task_progress_manager: Default::default(),
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
});
};

let (handle, shutdown_sender) = start_compactor(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
);
sub_tasks.push((handle, shutdown_sender));
Expand Down
6 changes: 3 additions & 3 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ pub async fn compactor_serve(
};
let await_tree_reg =
await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
let compactor_context = Arc::new(CompactorContext {
let compactor_context = CompactorContext {
storage_opts,
hummock_meta_client: hummock_meta_client.clone(),
sstable_store: sstable_store.clone(),
compactor_metrics,
is_share_buffer_compact: false,
Expand All @@ -211,7 +210,7 @@ pub async fn compactor_serve(
task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
running_task_count: Arc::new(AtomicU32::new(0)),
});
};
let mut sub_tasks = vec![
MetaClient::start_heartbeat_loop(
meta_client.clone(),
Expand All @@ -220,6 +219,7 @@ pub async fn compactor_serve(
),
risingwave_storage::hummock::compactor::start_compactor(
compactor_context.clone(),
hummock_meta_client.clone(),
sstable_object_id_manager.clone(),
),
];
Expand Down
26 changes: 7 additions & 19 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,27 +166,23 @@ pub(crate) mod tests {

fn get_compactor_context_with_filter_key_extractor_manager(
storage: &HummockStorage,
hummock_meta_client: &Arc<dyn HummockMetaClient>,
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
) -> CompactorContext {
get_compactor_context_with_filter_key_extractor_manager_impl(
storage.storage_opts().clone(),
storage.sstable_store(),
hummock_meta_client,
filter_key_extractor_manager,
)
}

fn get_compactor_context_with_filter_key_extractor_manager_impl(
options: Arc<StorageOpts>,
sstable_store: SstableStoreRef,
hummock_meta_client: &Arc<dyn HummockMetaClient>,
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
) -> CompactorContext {
CompactorContext {
storage_opts: options,
sstable_store,
hummock_meta_client: hummock_meta_client.clone(),
compactor_metrics: Arc::new(CompactorMetrics::unused()),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
Expand Down Expand Up @@ -235,7 +231,6 @@ pub(crate) mod tests {
};
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -291,7 +286,7 @@ pub(crate) mod tests {

let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx.clone()),
compact_ctx.clone(),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -399,7 +394,6 @@ pub(crate) mod tests {
};
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -454,7 +448,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -569,7 +563,6 @@ pub(crate) mod tests {

pub(crate) fn prepare_compactor_and_filter(
storage: &HummockStorage,
hummock_meta_client: &Arc<dyn HummockMetaClient>,
existing_table_id: u32,
) -> CompactorContext {
let rpc_filter_key_extractor_manager = match storage.filter_key_extractor_manager().clone()
Expand All @@ -586,7 +579,6 @@ pub(crate) mod tests {

get_compactor_context_with_filter_key_extractor_manager(
storage,
hummock_meta_client,
rpc_filter_key_extractor_manager,
)
}
Expand Down Expand Up @@ -700,7 +692,6 @@ pub(crate) mod tests {
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager_impl(
global_storage.storage_opts().clone(),
global_storage.sstable_store(),
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -788,7 +779,7 @@ pub(crate) mod tests {
// 4. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -887,7 +878,6 @@ pub(crate) mod tests {

let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager.clone(),
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -977,7 +967,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -1087,7 +1077,6 @@ pub(crate) mod tests {
);
let compact_ctx = get_compactor_context_with_filter_key_extractor_manager(
&storage,
&hummock_meta_client,
rpc_filter_key_extractor_manager,
);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
Expand Down Expand Up @@ -1163,7 +1152,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down Expand Up @@ -1264,8 +1253,7 @@ pub(crate) mod tests {
TableId::from(existing_table_id),
)
.await;
let compact_ctx =
prepare_compactor_and_filter(&storage, &hummock_meta_client, existing_table_id);
let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);
let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
storage
Expand Down Expand Up @@ -1322,7 +1310,7 @@ pub(crate) mod tests {
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
Arc::new(compact_ctx),
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
Expand Down
8 changes: 2 additions & 6 deletions src/storage/hummock_test/src/sync_point_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async fn test_syncpoints_test_local_notification_receiver() {

pub async fn compact_once(
hummock_manager_ref: HummockManagerRef<MemStore>,
compact_ctx: Arc<CompactorContext>,
compact_ctx: CompactorContext,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) {
// 2. get compact task
Expand Down Expand Up @@ -289,11 +289,7 @@ async fn test_syncpoints_get_in_delete_range_boundary() {
TableId::from(existing_table_id),
)
.await;
let compact_ctx = Arc::new(prepare_compactor_and_filter(
&storage,
&hummock_meta_client,
existing_table_id,
));
let compact_ctx = prepare_compactor_and_filter(&storage, existing_table_id);

let sstable_object_id_manager = Arc::new(SstableObjectIdManager::new(
hummock_meta_client.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact
pub async fn generate_splits(
sstable_infos: &Vec<SstableInfo>,
compaction_size: u64,
context: Arc<CompactorContext>,
context: CompactorContext,
) -> HummockResult<Vec<KeyRange_vec>> {
let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
if compaction_size > parallel_compact_size {
Expand Down Expand Up @@ -231,7 +231,7 @@ pub async fn generate_splits(
Ok(vec![])
}

pub fn estimate_task_output_capacity(context: Arc<CompactorContext>, task: &CompactTask) -> usize {
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
let total_input_uncompressed_file_size = task
.input_ssts
Expand Down
8 changes: 4 additions & 4 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct CompactorRunner {
impl CompactorRunner {
pub fn new(
split_index: usize,
context: Arc<CompactorContext>,
context: CompactorContext,
task: CompactTask,
object_id_getter: Box<dyn GetObjectId>,
) -> Self {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl CompactorRunner {
Self {
compactor,
compact_task: task,
sstable_store: context.sstable_store.clone(),
sstable_store: context.sstable_store,
key_range,
split_index,
}
Expand Down Expand Up @@ -235,7 +235,7 @@ impl CompactorRunner {
/// Handles a compaction task and reports its status to hummock manager.
/// Always return `Ok` and let hummock manager handle errors.
pub async fn compact(
compactor_context: Arc<CompactorContext>,
compactor_context: CompactorContext,
mut compact_task: CompactTask,
mut shutdown_rx: Receiver<()>,
object_id_getter: Box<dyn GetObjectId>,
Expand Down Expand Up @@ -543,7 +543,7 @@ pub async fn compact(
/// Fills in the compact task and tries to report the task result to meta node.
fn compact_done(
mut compact_task: CompactTask,
context: Arc<CompactorContext>,
context: CompactorContext,
output_ssts: Vec<CompactOutput>,
task_status: TaskStatus,
) -> (CompactTask, HashMap<u32, TableStats>) {
Expand Down
6 changes: 0 additions & 6 deletions src/storage/src/hummock/compactor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_rpc_client::HummockMetaClient;

use super::task_progress::TaskProgressManagerRef;
use crate::filter_key_extractor::FilterKeyExtractorManager;
Expand All @@ -32,9 +31,6 @@ pub struct CompactorContext {
/// Storage options.
pub storage_opts: Arc<StorageOpts>,

/// The meta client.
pub hummock_meta_client: Arc<dyn HummockMetaClient>,

/// Sstable store that manages the sstables.
pub sstable_store: SstableStoreRef,

Expand All @@ -61,7 +57,6 @@ impl CompactorContext {
pub fn new_local_compact_context(
storage_opts: Arc<StorageOpts>,
sstable_store: SstableStoreRef,
hummock_meta_client: Arc<dyn HummockMetaClient>,
compactor_metrics: Arc<CompactorMetrics>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> Self {
Expand All @@ -77,7 +72,6 @@ impl CompactorContext {
// not limit memory for local compact
Self {
storage_opts,
hummock_meta_client,
sstable_store,
compactor_metrics,
is_share_buffer_compact: true,
Expand Down
9 changes: 5 additions & 4 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use risingwave_pb::hummock::{
CompactTaskProgress, CompactorWorkload, SubscribeCompactionEventRequest,
SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use sysinfo::{CpuRefreshKind, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt};
use tokio::sync::oneshot::Sender;
Expand All @@ -76,7 +77,7 @@ use crate::hummock::{
/// Implementation of Hummock compaction.
pub struct Compactor {
/// The context of the compactor.
context: Arc<CompactorContext>,
context: CompactorContext,
object_id_getter: Box<dyn GetObjectId>,
task_config: TaskConfig,
options: SstableBuilderOptions,
Expand All @@ -88,7 +89,7 @@ pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
impl Compactor {
/// Create a new compactor.
pub fn new(
context: Arc<CompactorContext>,
context: CompactorContext,
options: SstableBuilderOptions,
task_config: TaskConfig,
object_id_getter: Box<dyn GetObjectId>,
Expand Down Expand Up @@ -310,10 +311,10 @@ impl Compactor {
/// manager and runs compaction tasks.
#[cfg_attr(coverage, no_coverage)]
pub fn start_compactor(
compactor_context: Arc<CompactorContext>,
compactor_context: CompactorContext,
hummock_meta_client: Arc<dyn HummockMetaClient>,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) -> (JoinHandle<()>, Sender<()>) {
let hummock_meta_client = compactor_context.hummock_meta_client.clone();
type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let stream_retry_interval = Duration::from_secs(30);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0;

/// Flush shared buffer to level0. Resulted SSTs are grouped by compaction group.
pub async fn compact(
context: Arc<CompactorContext>,
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
payload: UploadTaskPayload,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn compact(

/// For compaction from shared buffer to level 0, this is the only function gets called.
async fn compact_shared_buffer(
context: Arc<CompactorContext>,
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
mut payload: UploadTaskPayload,
) -> HummockResult<Vec<LocalSstableInfo>> {
Expand Down Expand Up @@ -453,7 +453,7 @@ impl SharedBufferCompactRunner {
pub fn new(
split_index: usize,
key_range: KeyRange,
context: Arc<CompactorContext>,
context: CompactorContext,
sub_compaction_sstable_size: usize,
split_weight_by_vnode: u32,
use_block_based_filter: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub struct HummockEventHandler {
async fn flush_imms(
payload: UploadTaskPayload,
task_info: UploadTaskInfo,
compactor_context: Arc<crate::hummock::compactor::CompactorContext>,
compactor_context: CompactorContext,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
) -> HummockResult<Vec<LocalSstableInfo>> {
for epoch in &task_info.epochs {
Expand All @@ -159,7 +159,7 @@ impl HummockEventHandler {
hummock_event_tx: mpsc::UnboundedSender<HummockEvent>,
hummock_event_rx: mpsc::UnboundedReceiver<HummockEvent>,
pinned_version: PinnedVersion,
compactor_context: Arc<CompactorContext>,
compactor_context: CompactorContext,
sstable_object_id_manager: Arc<SstableObjectIdManager>,
state_store_metrics: Arc<HummockStateStoreMetrics>,
refill_data_file_cache_levels: HashSet<u32>,
Expand Down
Loading