Skip to content

Commit

Permalink
fix(storage): fix delete range test timeout (#12568)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 28, 2023
1 parent 8f220de commit dc6865c
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 9 deletions.
5 changes: 1 addition & 4 deletions src/config/ci-delete-range-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@ vacuum_interval_sec = 10
telemetry_enabled = false

[system]
max_concurrent_creating_streaming_jobs = 0

[meta.compaction_config]
level0_stop_write_threshold_sub_level_number = 1000
max_concurrent_creating_streaming_jobs = 0
43 changes: 40 additions & 3 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use risingwave_hummock_sdk::{
SstObjectIdRange,
};
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTask};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
compact_task, CompactTask, HummockSnapshot, HummockVersion, SubscribeCompactionEventRequest,
Expand Down Expand Up @@ -224,14 +226,16 @@ impl HummockMetaClient for MockHummockMetaClient {
.compactor_manager_ref_for_test()
.add_compactor(context_id);

let (request_sender, _request_receiver) =
let (request_sender, mut request_receiver) =
unbounded_channel::<SubscribeCompactionEventRequest>();

self.compact_context_id.store(context_id, Ordering::Release);

let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();

let hummock_manager_compact = self.hummock_manager.clone();
let mut join_handle_vec = vec![];

let handle = tokio::spawn(async move {
loop {
let group_and_type = hummock_manager_compact
Expand Down Expand Up @@ -270,11 +274,44 @@ impl HummockMetaClient for MockHummockMetaClient {
}
});

join_handle_vec.push(handle);

let hummock_manager_compact = self.hummock_manager.clone();
let report_handle = tokio::spawn(async move {
tracing::info!("report_handle start");

loop {
if let Some(item) = request_receiver.recv().await {
if let Event::ReportTask(ReportTask {
task_id,
task_status,
sorted_output_ssts,
table_stats_change,
}) = item.event.unwrap()
{
if let Err(e) = hummock_manager_compact
.report_compact_task(
task_id,
TaskStatus::from_i32(task_status).unwrap(),
sorted_output_ssts,
Some(table_stats_change),
)
.await
{
tracing::error!("report compact_tack fail {e:?}");
}
}
}
}
});

join_handle_vec.push(report_handle);

Ok((
request_sender,
Box::pin(CompactionEventItemStream {
inner: UnboundedReceiverStream::new(task_rx),
_handle: handle,
_handle: join_handle_vec,
}),
))
}
Expand All @@ -288,7 +325,7 @@ impl MockHummockMetaClient {

pub struct CompactionEventItemStream {
inner: UnboundedReceiverStream<CompactionEventItem>,
_handle: JoinHandle<()>,
_handle: Vec<JoinHandle<()>>,
}

impl Drop for CompactionEventItemStream {
Expand Down
1 change: 0 additions & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ pub fn start_compactor(
_ = periodic_event_interval.tick() => {
let progress_list = get_task_progress(task_progress.clone());


if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
event: Some(RequestEvent::HeartBeat(
HeartBeat {
Expand Down
4 changes: 3 additions & 1 deletion src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub fn start_delete_range(opts: CompactionTestOpts) -> Pin<Box<dyn Future<Output
}
pub async fn compaction_test_main(opts: CompactionTestOpts) -> anyhow::Result<()> {
let config = load_config(&opts.config_path, NoOverride);
let compaction_config = CompactionConfigBuilder::new().build();
let compaction_config =
CompactionConfigBuilder::with_opt(&config.meta.compaction_config).build();
compaction_test(
compaction_config,
config,
Expand Down Expand Up @@ -596,6 +597,7 @@ fn run_compactor_thread(
await_tree_reg: None,
running_task_count: Arc::new(AtomicU32::new(0)),
};

start_compactor(
compactor_context,
meta_client,
Expand Down

0 comments on commit dc6865c

Please sign in to comment.