refactor(compactor): refactor
Li0k committed May 17, 2024
1 parent b90250c commit 65e48ab
Showing 6 changed files with 33 additions and 93 deletions.
49 changes: 1 addition & 48 deletions src/meta/src/hummock/manager/mod.rs
@@ -2612,22 +2612,7 @@ impl HummockManager {
// progress (meta + compactor)
// 2. meta periodically scans the task and performs a cancel on
// the meta side for tasks that are not updated by heartbeat
// for task in compactor_manager.get_heartbeat_expired_tasks() {
// if let Err(e) = hummock_manager
// .cancel_compact_task(
// task.task_id,
// TaskStatus::HeartbeatCanceled,
// )
// .await
// {
// tracing::error!(
// task_id = task.task_id,
// error = %e.as_report(),
// "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
// until we can successfully report its status",
// );
// }
// }

let expired_tasks: Vec<u64> = compactor_manager
.get_heartbeat_expired_tasks()
.into_iter()
@@ -3048,38 +3033,6 @@ impl HummockManager {
context_id
);
}

// for task in cancel_tasks {
// tracing::info!(
// "Task cancel with group_id {} task_id {} with context_id {} has expired due to lack of visible progress",
// task.compaction_group_id,
// task.task_id,
// context_id,
// );

// if let Err(e) =
// hummock_manager
// .cancel_compact_task(task.task_id, TaskStatus::HeartbeatProgressCanceled)
// .await
// {
// tracing::error!(
// task_id = task.task_id,
// error = %e.as_report(),
// "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
// until we can successfully report its status."
// );
// }

// // Forcefully cancel the task so that it terminates
// // early on the compactor
// // node.
// let _ = compactor.cancel_task(task.task_id);
// tracing::info!(
// "CancelTask operation for task_id {} has been sent to node with context_id {}",
// context_id,
// task.task_id
// );
// }
} else {
// Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
// Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
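The hunks above drop the old per-task cancellation loop (the commented-out blocks being deleted) in favor of first collecting the heartbeat-expired task IDs and then cancelling them on the meta side. A standalone sketch of that scan pattern; the types and helper below are illustrative stand-ins, not the repository's, and the actual cancellation still goes through `cancel_compact_task` as in the removed code:

// Standalone sketch (illustrative types, not the repository's): collect the
// IDs of tasks whose heartbeat is older than the timeout, then cancel them.
struct Task {
    task_id: u64,
    last_heartbeat_secs: u64, // seconds since an arbitrary epoch
}

fn heartbeat_expired_task_ids(tasks: &[Task], now_secs: u64, timeout_secs: u64) -> Vec<u64> {
    tasks
        .iter()
        .filter(|t| now_secs.saturating_sub(t.last_heartbeat_secs) > timeout_secs)
        .map(|t| t.task_id)
        .collect()
}

fn main() {
    let tasks = vec![
        Task { task_id: 1, last_heartbeat_secs: 995 },
        Task { task_id: 2, last_heartbeat_secs: 800 },
    ];
    // With now = 1000 and a 60 s timeout, only task 2 has expired and would be
    // cancelled on the meta side (HeartbeatCanceled in the removed code above).
    for task_id in heartbeat_expired_task_ids(&tasks, 1000, 60) {
        println!("cancelling compaction task {task_id}");
    }
}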
9 changes: 2 additions & 7 deletions src/storage/hummock_sdk/src/compact.rs
@@ -185,7 +185,6 @@ pub fn estimate_memory_for_compact_task(
block_size: u64,
recv_buffer_size: u64,
sst_capacity: u64,
support_streaming_upload: bool,
) -> u64 {
let mut result = 0;
// When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek
@@ -223,13 +222,9 @@
// output
// builder will maintain SstableInfo + block_builder(block) + writer (block to vec)
let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
// if support_streaming_upload {
// result += estimated_meta_size + 2 * block_size
// } else {
// result += estimated_meta_size + sst_capacity; // Use sst_capacity to avoid BatchUploader
// // memory bursts.
// }

// FIXME: sst_capacity is the upper bound of the memory usage of the streaming sstable uploader
// A more reasonable memory limit method needs to be adopted, this is just a temporary fix.
result += estimated_meta_size + sst_capacity;

result
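The branch on `support_streaming_upload` is gone: the estimate now always reserves the estimated meta size plus the full `sst_capacity`, which the FIXME flags as a temporary upper bound for the streaming sstable uploader. A self-contained sketch of just this output-side term; the real `estimate_memory_for_compact_task` also accounts for input iterators (collapsed above), and the wrapper function and example numbers here are illustrative:

/// Output-side portion of the estimate shown above: reserve the estimated SST
/// meta size plus the whole `sst_capacity`, the upper bound the FIXME refers to.
fn estimate_output_memory(sst_capacity: u64, task_max_sst_meta_ratio: u64) -> u64 {
    let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
    estimated_meta_size + sst_capacity
}

fn main() {
    // e.g. a 256 MiB target SST with a 5% meta ratio reserves roughly 268.8 MiB.
    let bytes = estimate_output_memory(256 * 1024 * 1024, 5);
    println!("reserve {bytes} bytes per output split");
}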
1 change: 0 additions & 1 deletion src/storage/src/hummock/compactor/compactor_runner.rs
@@ -383,7 +383,6 @@ pub async fn compact(
.object_store_recv_buffer_size
.unwrap_or(6 * 1024 * 1024) as u64,
capacity as u64,
context.sstable_store.store().support_streaming_upload(),
) * compact_task.splits.len() as u64;

tracing::info!(
23 changes: 16 additions & 7 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -505,17 +505,26 @@ impl CompactorRunner {
);

let statistic = self.executor.take_statistics();
let outputs = self.executor.builder.finish().await?;
let ssts = Compactor::report_progress(
let output_ssts = self
.executor
.builder
.finish()
.await?
.into_iter()
.map(|split_table_output| split_table_output.sst_info)
.collect();
Compactor::report_progress(
self.metrics.clone(),
Some(self.executor.task_progress.clone()),
outputs,
&output_ssts,
false,
)
.await?;
let sst_infos = ssts.iter().map(|sst| sst.sst_info.clone()).collect_vec();
);
let sst_infos = output_ssts
.iter()
.map(|sst| sst.sst_info.clone())
.collect_vec();
assert!(can_concat(&sst_infos));
Ok((ssts, statistic))
Ok((output_ssts, statistic))
}
}

41 changes: 14 additions & 27 deletions src/storage/src/hummock/compactor/mod.rs
@@ -86,8 +86,8 @@ use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::multi_builder::SplitTableOutput;
use crate::hummock::vacuum::Vacuum;
use crate::hummock::{
validate_ssts, BlockedXor16FilterBuilder, FilterBuilder, HummockError,
SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
validate_ssts, BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager,
SstableWriterFactory, UnifiedSstableWriterFactory,
};
use crate::monitor::CompactorMetrics;

@@ -175,13 +175,17 @@ impl Compactor {

compact_timer.observe_duration();

let ssts = Self::report_progress(
let ssts = split_table_outputs
.into_iter()
.map(|x| x.sst_info)
.collect();

Self::report_progress(
self.context.compactor_metrics.clone(),
task_progress,
split_table_outputs,
&ssts,
self.context.is_share_buffer_compact,
)
.await?;
);

self.context
.compactor_metrics
@@ -204,27 +208,14 @@
Ok((ssts, table_stats_map))
}

pub async fn report_progress(
pub fn report_progress(
metrics: Arc<CompactorMetrics>,
task_progress: Option<Arc<TaskProgress>>,
split_table_outputs: Vec<SplitTableOutput>,
ssts: &Vec<LocalSstableInfo>,
is_share_buffer_compact: bool,
) -> HummockResult<Vec<LocalSstableInfo>> {
let mut ssts = Vec::with_capacity(split_table_outputs.len());
// let mut rets = vec![];

for SplitTableOutput {
sst_info,
// upload_join_handle,
} in split_table_outputs
{
) {
for sst_info in ssts {
let sst_size = sst_info.file_size();
ssts.push(sst_info);
// let ret = upload_join_handle
// .verbose_instrument_await("upload")
// .await
// .map_err(HummockError::sstable_upload_error);
// rets.push(ret);
if let Some(tracker) = &task_progress {
tracker.inc_ssts_uploaded();
tracker.dec_num_pending_write_io();
@@ -235,10 +226,6 @@
metrics.compaction_upload_sst_counts.inc();
}
}
// for ret in rets {
// ret??;
// }
Ok(ssts)
}

async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
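`report_progress` is no longer `async` and no longer owns the `SplitTableOutput`s: callers map them down to `LocalSstableInfo` first, and the upload join handles are awaited earlier, inside the builder's `finish()` (see the `multi_builder.rs` hunk below). A minimal sketch of the new call shape with stand-in types; the real function also updates compactor metrics and the task-progress tracker:

// Stand-ins for the repository's types, to keep the sketch self-contained.
struct LocalSstableInfo {
    file_size: u64,
}

struct SplitTableOutput {
    sst_info: LocalSstableInfo,
}

// The refactored shape: synchronous, borrowing SSTs whose upload has already
// completed inside `finish()`; only bookkeeping happens here.
fn report_progress(ssts: &Vec<LocalSstableInfo>) {
    for sst_info in ssts {
        let sst_size = sst_info.file_size;
        println!("uploaded sst of {sst_size} bytes");
    }
}

fn main() {
    // Callers first strip `SplitTableOutput` down to its `sst_info`...
    let outputs = vec![SplitTableOutput {
        sst_info: LocalSstableInfo { file_size: 42 },
    }];
    let ssts: Vec<LocalSstableInfo> = outputs.into_iter().map(|o| o.sst_info).collect();
    // ...then report progress synchronously, with no `.await`.
    report_progress(&ssts);
}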
3 changes: 0 additions & 3 deletions src/storage/src/hummock/sstable/multi_builder.rs
@@ -45,7 +45,6 @@ pub trait TableBuilderFactory {

pub struct SplitTableOutput {
pub sst_info: LocalSstableInfo,
// pub upload_join_handle: UploadJoinHandle,
}

/// A wrapper for [`SstableBuilder`] which automatically split key-value pairs into multiple tables,
@@ -304,15 +303,13 @@ where
}
}

// TEST join upload handle before push
builder_output
.writer_output
.verbose_instrument_await("upload")
.await
.map_err(HummockError::sstable_upload_error)??;

self.sst_outputs.push(SplitTableOutput {
// upload_join_handle: builder_output.writer_output,
sst_info: builder_output.sst_info,
});
}
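The design choice in this last hunk: the upload join handle is awaited as soon as a builder output is produced, so `SplitTableOutput` only needs to carry `sst_info` and the later reporting path stays synchronous. An illustrative sketch of that ordering, assuming the tokio runtime with its macros feature; the types and the `seal_builder_output` helper are hypothetical, not the repository's API:

// Illustrative tokio sketch (not the repository's code): await the upload task
// when the builder output is sealed, so the output struct only needs the SST
// info and later stages stay synchronous.
use tokio::task::JoinHandle;

struct SstInfo {
    object_id: u64,
}

struct SplitTableOutput {
    sst_info: SstInfo,
}

async fn seal_builder_output(
    sst_info: SstInfo,
    upload: JoinHandle<Result<(), String>>,
    outputs: &mut Vec<SplitTableOutput>,
) -> Result<(), String> {
    // Join the upload handle *before* pushing, mirroring the multi_builder change:
    // a failed upload surfaces here instead of in report_progress.
    upload.await.map_err(|e| e.to_string())??;
    outputs.push(SplitTableOutput { sst_info });
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let upload = tokio::spawn(async { Ok::<(), String>(()) });
    let mut outputs = Vec::new();
    seal_builder_output(SstInfo { object_id: 1 }, upload, &mut outputs).await?;
    println!("sealed sst {} ({} output(s))", outputs[0].sst_info.object_id, outputs.len());
    Ok(())
}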
