diff --git a/src/config/ci-delete-range-test.toml b/src/config/ci-delete-range-test.toml index 6a1c271dd1448..33313ca092c4d 100644 --- a/src/config/ci-delete-range-test.toml +++ b/src/config/ci-delete-range-test.toml @@ -5,7 +5,4 @@ vacuum_interval_sec = 10 telemetry_enabled = false [system] -max_concurrent_creating_streaming_jobs = 0 - -[meta.compaction_config] -level0_stop_write_threshold_sub_level_number = 1000 \ No newline at end of file +max_concurrent_creating_streaming_jobs = 0 \ No newline at end of file diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 063556ace1839..5bcb6f4faa4d7 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -27,6 +27,8 @@ use risingwave_hummock_sdk::{ SstObjectIdRange, }; use risingwave_pb::common::{HostAddress, WorkerType}; +use risingwave_pb::hummock::compact_task::TaskStatus; +use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask}; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ compact_task, CompactTask, HummockSnapshot, HummockVersion, SubscribeCompactionEventRequest, @@ -224,7 +226,7 @@ impl HummockMetaClient for MockHummockMetaClient { .compactor_manager_ref_for_test() .add_compactor(context_id); - let (request_sender, _request_receiver) = + let (request_sender, mut request_receiver) = unbounded_channel::(); self.compact_context_id.store(context_id, Ordering::Release); @@ -232,6 +234,8 @@ impl HummockMetaClient for MockHummockMetaClient { let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel(); let hummock_manager_compact = self.hummock_manager.clone(); + let mut join_handle_vec = vec![]; + let handle = tokio::spawn(async move { loop { let group_and_type = hummock_manager_compact @@ -270,11 +274,44 @@ impl HummockMetaClient for MockHummockMetaClient { } }); + join_handle_vec.push(handle); + + let hummock_manager_compact = self.hummock_manager.clone(); + let report_handle = tokio::spawn(async move { + tracing::info!("report_handle start"); + + loop { + if let Some(item) = request_receiver.recv().await { + if let Event::ReportTask(ReportTask { + task_id, + task_status, + sorted_output_ssts, + table_stats_change, + }) = item.event.unwrap() + { + if let Err(e) = hummock_manager_compact + .report_compact_task( + task_id, + TaskStatus::from_i32(task_status).unwrap(), + sorted_output_ssts, + Some(table_stats_change), + ) + .await + { + tracing::error!("report compact_tack fail {e:?}"); + } + } + } + } + }); + + join_handle_vec.push(report_handle); + Ok(( request_sender, Box::pin(CompactionEventItemStream { inner: UnboundedReceiverStream::new(task_rx), - _handle: handle, + _handle: join_handle_vec, }), )) } @@ -288,7 +325,7 @@ impl MockHummockMetaClient { pub struct CompactionEventItemStream { inner: UnboundedReceiverStream, - _handle: JoinHandle<()>, + _handle: Vec>, } impl Drop for CompactionEventItemStream { diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 01f1d992e0b49..d2f36167675e7 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -408,7 +408,6 @@ pub fn start_compactor( _ = periodic_event_interval.tick() => { let progress_list = get_task_progress(task_progress.clone()); - if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { event: Some(RequestEvent::HeartBeat( HeartBeat { diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index abeae5e418af2..5963d7b33cdea 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -91,7 +91,8 @@ pub fn start_delete_range(opts: CompactionTestOpts) -> Pin anyhow::Result<()> { let config = load_config(&opts.config_path, NoOverride); - let compaction_config = CompactionConfigBuilder::new().build(); + let compaction_config = + CompactionConfigBuilder::with_opt(&config.meta.compaction_config).build(); compaction_test( compaction_config, config, @@ -596,6 +597,7 @@ fn run_compactor_thread( await_tree_reg: None, running_task_count: Arc::new(AtomicU32::new(0)), }; + start_compactor( compactor_context, meta_client,