diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 908bb45a43fc4..ad94e74cdcb26 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -88,6 +88,16 @@ impl BufferTracker { ) } + #[cfg(test)] + fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self { + Self::new( + usize::MAX, + flush_threshold, + GenericGauge::new("test", "test").unwrap(), + min_batch_flush_size, + ) + } + fn new( capacity: usize, flush_threshold: usize, @@ -192,6 +202,7 @@ pub struct HummockEventHandler { read_version_mapping: Arc>, /// A copy of `read_version_mapping` but owned by event handler local_read_version_mapping: HashMap, + table_read_versions: HashMap>, version_update_notifier_tx: Arc>, recent_versions: Arc>, @@ -232,12 +243,24 @@ impl HummockEventHandler { let upload_compactor_context = compactor_context.clone(); let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone(); let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone(); + let recent_versions = RecentVersions::new( + pinned_version, + compactor_context + .storage_opts + .max_cached_recent_versions_number, + state_store_metrics.clone(), + ); + let buffer_tracker = BufferTracker::from_storage_opts( + &compactor_context.storage_opts, + state_store_metrics.uploader_uploading_task_size.clone(), + ); Self::new_inner( version_update_rx, - pinned_version, compactor_context.sstable_store.clone(), state_store_metrics, - &compactor_context.storage_opts, + CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts), + recent_versions, + buffer_tracker, Arc::new(move |payload, task_info| { static NEXT_UPLOAD_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); @@ -288,22 +311,20 @@ impl HummockEventHandler { fn new_inner( version_update_rx: UnboundedReceiver, - pinned_version: PinnedVersion, sstable_store: SstableStoreRef, state_store_metrics: Arc, - storage_opts: &StorageOpts, + refill_config: CacheRefillConfig, + recent_versions: RecentVersions, + buffer_tracker: BufferTracker, spawn_upload_task: SpawnUploadTask, spawn_refill_task: SpawnRefillTask, ) -> Self { let (hummock_event_tx, hummock_event_rx) = event_channel(state_store_metrics.event_handler_pending_event.clone()); - let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.clone()); + let (version_update_notifier_tx, _) = + tokio::sync::watch::channel(recent_versions.latest_version().clone()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); let read_version_mapping = Arc::new(RwLock::new(HashMap::default())); - let buffer_tracker = BufferTracker::from_storage_opts( - storage_opts, - state_store_metrics.uploader_uploading_task_size.clone(), - ); let metrics = HummockEventHandlerMetrics { event_handler_on_upload_finish_latency: state_store_metrics @@ -319,29 +340,21 @@ impl HummockEventHandler { let uploader = HummockUploader::new( state_store_metrics.clone(), - pinned_version.clone(), + recent_versions.latest_version().clone(), spawn_upload_task, buffer_tracker, - storage_opts, - ); - let refiller = CacheRefiller::new( - CacheRefillConfig::from_storage_opts(storage_opts), - sstable_store, - spawn_refill_task, ); + let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task); Self { hummock_event_tx, hummock_event_rx, version_update_rx, version_update_notifier_tx, - recent_versions: Arc::new(ArcSwap::from_pointee(RecentVersions::new( - pinned_version, - storage_opts.max_cached_recent_versions_number, - state_store_metrics, - ))), + recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)), read_version_mapping, local_read_version_mapping: Default::default(), + table_read_versions: Default::default(), uploader, refiller, last_instance_id: 0, @@ -465,14 +478,19 @@ impl HummockEventHandler { .start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } - async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, version_id: HummockVersionId) { + async fn handle_clear( + &mut self, + notifier: oneshot::Sender<()>, + version_id: HummockVersionId, + table_ids: Option>, + ) { info!( ?version_id, current_version_id = ?self.uploader.hummock_version().id(), "handle clear event" ); - self.uploader.clear(); + self.uploader.clear(table_ids.clone()); let current_version = self.uploader.hummock_version(); @@ -526,14 +544,28 @@ impl HummockEventHandler { ); } - assert!( - self.local_read_version_mapping.is_empty(), - "read version mapping not empty when clear. remaining tables: {:?}", - self.local_read_version_mapping - .values() - .map(|(_, read_version)| read_version.read().table_id()) - .collect_vec() - ); + if let Some(table_ids) = &table_ids { + assert!( + table_ids + .iter() + .all(|table_id| !self.table_read_versions.contains_key(table_id)), + "read version not empty when clearing tables: {:?}. remaining tables: {:?}", + table_ids, + table_ids + .iter() + .filter(|table_id| self.table_read_versions.contains_key(*table_id)) + .collect_vec() + ); + } else { + assert!( + self.local_read_version_mapping.is_empty(), + "read version mapping not empty when clear. remaining tables: {:?}", + self.local_read_version_mapping + .values() + .map(|(_, read_version)| read_version.read().table_id()) + .collect_vec() + ); + } // Notify completion of the Clear event. let _ = notifier.send(()).inspect_err(|e| { @@ -646,8 +678,8 @@ impl HummockEventHandler { event = pin!(self.hummock_event_rx.recv()) => { let Some(event) = event else { break }; match event { - HummockEvent::Clear(notifier, version_id) => { - self.handle_clear(notifier, version_id).await + HummockEvent::Clear(notifier, version_id, table_ids) => { + self.handle_clear(notifier, version_id, table_ids).await }, HummockEvent::Shutdown => { info!("event handler shutdown"); @@ -690,7 +722,7 @@ impl HummockEventHandler { } => { self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids); } - HummockEvent::Clear(_, _) => { + HummockEvent::Clear(_, _, _) => { unreachable!("clear is handled in separated async context") } HummockEvent::Shutdown => { @@ -764,6 +796,11 @@ impl HummockEventHandler { { self.local_read_version_mapping .insert(instance_id, (table_id, basic_read_version.clone())); + assert!(self + .table_read_versions + .entry(table_id) + .or_default() + .insert(instance_id)); let mut read_version_mapping_guard = self.read_version_mapping.write(); read_version_mapping_guard @@ -819,6 +856,14 @@ impl HummockEventHandler { instance_id ) }); + let table_read_versions = self + .table_read_versions + .get_mut(&table_id) + .expect("should exist"); + assert!(table_read_versions.remove(&instance_id)); + if table_read_versions.is_empty() { + self.table_read_versions.remove(&table_id); + } let mut read_version_mapping_guard = self.read_version_mapping.write(); let entry = read_version_mapping_guard .get_mut(&table_id) @@ -893,6 +938,7 @@ mod tests { use futures::FutureExt; use parking_lot::Mutex; use risingwave_common::bitmap::Bitmap; + use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; @@ -902,15 +948,23 @@ mod tests { use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot; - use crate::hummock::event_handler::refiller::CacheRefiller; - use crate::hummock::event_handler::uploader::test_utils::{gen_imm, TEST_TABLE_ID}; + use crate::hummock::event_handler::hummock_event_handler::BufferTracker; + use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller}; + use crate::hummock::event_handler::uploader::test_utils::{ + gen_imm, gen_imm_inner, prepare_uploader_order_test_spawn_task_fn, TEST_TABLE_ID, + }; use crate::hummock::event_handler::uploader::UploadTaskOutput; - use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, HummockVersionUpdate}; + use crate::hummock::event_handler::{ + HummockEvent, HummockEventHandler, HummockReadVersionRef, HummockVersionUpdate, + LocalInstanceGuard, + }; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::local_version::pinned_version::PinnedVersion; + use crate::hummock::local_version::recent_versions::RecentVersions; use crate::hummock::store::version::{StagingData, VersionUpdate}; use crate::hummock::test_utils::default_opts_for_test; use crate::hummock::HummockError; + use crate::mem_table::ImmutableMemtable; use crate::monitor::HummockStateStoreMetrics; use crate::store::SealCurrentEpochOptions; @@ -932,13 +986,19 @@ mod tests { let (refill_task_tx, mut refill_task_rx) = unbounded_channel(); let refill_task_tx_clone = refill_task_tx.clone(); + let storage_opt = default_opts_for_test(); + let metrics = Arc::new(HummockStateStoreMetrics::unused()); let event_handler = HummockEventHandler::new_inner( version_update_rx, - initial_version.clone(), mock_sstable_store().await, - Arc::new(HummockStateStoreMetrics::unused()), - &default_opts_for_test(), + metrics.clone(), + CacheRefillConfig::from_storage_opts(&storage_opt), + RecentVersions::new(initial_version.clone(), 10, metrics.clone()), + BufferTracker::from_storage_opts( + &storage_opt, + metrics.uploader_uploading_task_size.clone(), + ), Arc::new(|_, _| unreachable!("should not spawn upload task")), Arc::new(move |_, _, old_version, new_version| { let (tx, rx) = oneshot::channel(); @@ -957,7 +1017,9 @@ mod tests { let send_clear = |version_id| { let (tx, rx) = oneshot::channel(); - event_tx.send(HummockEvent::Clear(tx, version_id)).unwrap(); + event_tx + .send(HummockEvent::Clear(tx, version_id, None)) + .unwrap(); rx }; @@ -1066,12 +1128,19 @@ mod tests { let (tx, rx) = oneshot::channel(); let rx = Arc::new(Mutex::new(Some(rx))); + let storage_opt = default_opts_for_test(); + let metrics = Arc::new(HummockStateStoreMetrics::unused()); + let event_handler = HummockEventHandler::new_inner( version_update_rx, - initial_version.clone(), mock_sstable_store().await, - Arc::new(HummockStateStoreMetrics::unused()), - &default_opts_for_test(), + metrics.clone(), + CacheRefillConfig::from_storage_opts(&storage_opt), + RecentVersions::new(initial_version.clone(), 10, metrics.clone()), + BufferTracker::from_storage_opts( + &storage_opt, + metrics.uploader_uploading_task_size.clone(), + ), Arc::new(move |_, info| { assert_eq!(info.epochs.len(), 1); let epoch = info.epochs[0]; @@ -1100,7 +1169,7 @@ mod tests { let send_event = |event| event_tx.send(event).unwrap(); - let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); + let join_handle = spawn(event_handler.start_hummock_event_handler_worker()); let (read_version, guard) = { let (tx, rx) = oneshot::channel(); @@ -1183,5 +1252,282 @@ mod tests { tx.send(()).unwrap(); rx1.await.unwrap().unwrap_err(); rx2.await.unwrap().unwrap_err(); + + send_event(HummockEvent::Shutdown); + join_handle.await.unwrap(); + } + + #[tokio::test] + async fn test_clear_tables() { + let table_id1 = TableId::new(1); + let table_id2 = TableId::new(2); + let epoch0 = test_epoch(233); + + let initial_version = PinnedVersion::new( + HummockVersion::from_rpc_protobuf(&PbHummockVersion { + id: 1, + state_table_info: HashMap::from_iter([ + ( + table_id1.table_id, + StateTableInfo { + committed_epoch: epoch0, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + ), + ( + table_id2.table_id, + StateTableInfo { + committed_epoch: epoch0, + compaction_group_id: StaticCompactionGroupId::StateDefault as _, + }, + ), + ]), + ..Default::default() + }), + unbounded_channel().0, + ); + + let (_version_update_tx, version_update_rx) = unbounded_channel(); + + let epoch1 = epoch0.next_epoch(); + let epoch2 = epoch1.next_epoch(); + let epoch3 = epoch2.next_epoch(); + + let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0, None).await.size(); + + // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained. + let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1); + let memory_limiter = buffer_tracker.get_memory_limiter().clone(); + + let gen_imm = |table_id, epoch, spill_offset| { + let imm = gen_imm_inner(table_id, epoch, spill_offset, Some(&*memory_limiter)) + .now_or_never() + .unwrap(); + assert_eq!(imm.size(), imm_size); + imm + }; + let imm1_1 = gen_imm(table_id1, epoch1, 0); + let imm1_2_1 = gen_imm(table_id1, epoch2, 0); + + let storage_opt = default_opts_for_test(); + let metrics = Arc::new(HummockStateStoreMetrics::unused()); + + let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false); + + let event_handler = HummockEventHandler::new_inner( + version_update_rx, + mock_sstable_store().await, + metrics.clone(), + CacheRefillConfig::from_storage_opts(&storage_opt), + RecentVersions::new(initial_version.clone(), 10, metrics.clone()), + buffer_tracker, + spawn_task, + CacheRefiller::default_spawn_refill_task(), + ); + + let event_tx = event_handler.event_sender(); + + let send_event = |event| event_tx.send(event).unwrap(); + let flush_event = || async { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::FlushEvent(tx)); + rx.await.unwrap(); + }; + let start_epoch = |table_id, epoch| { + send_event(HummockEvent::StartEpoch { + epoch, + table_ids: HashSet::from_iter([table_id]), + }) + }; + let init_epoch = |instance: &LocalInstanceGuard, init_epoch| { + send_event(HummockEvent::InitEpoch { + instance_id: instance.instance_id, + init_epoch, + }) + }; + let write_imm = |read_version: &HummockReadVersionRef, + instance: &LocalInstanceGuard, + imm: &ImmutableMemtable| { + read_version + .write() + .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); + + send_event(HummockEvent::ImmToUploader { + instance_id: instance.instance_id, + imm: imm.clone(), + }); + }; + let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| { + send_event(HummockEvent::LocalSealEpoch { + instance_id: instance.instance_id, + next_epoch, + opts: SealCurrentEpochOptions::for_test(), + }) + }; + let sync_epoch = |table_id, new_sync_epoch| { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::SyncEpoch { + new_sync_epoch, + sync_result_sender: tx, + table_ids: HashSet::from_iter([table_id]), + }); + rx + }; + + let join_handle = spawn(event_handler.start_hummock_event_handler_worker()); + + let (read_version1, guard1) = { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::RegisterReadVersion { + table_id: table_id1, + new_read_version_sender: tx, + is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), + }); + rx.await.unwrap() + }; + + let (read_version2, guard2) = { + let (tx, rx) = oneshot::channel(); + send_event(HummockEvent::RegisterReadVersion { + table_id: table_id2, + new_read_version_sender: tx, + is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), + }); + rx.await.unwrap() + }; + + // prepare data of table1 + let (task1_1_finish_tx, task1_1_rx) = { + start_epoch(table_id1, epoch1); + + init_epoch(&guard1, epoch1); + + write_imm(&read_version1, &guard1, &imm1_1); + + start_epoch(table_id1, epoch2); + + seal_epoch(&guard1, epoch2); + + let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([( + guard1.instance_id, + vec![imm1_1.batch_id()], + )])); + + let mut rx = sync_epoch(table_id1, epoch1); + wait_task_start.await; + assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await); + + write_imm(&read_version1, &guard1, &imm1_2_1); + flush_event().await; + + (task_finish_tx, rx) + }; + // by now, the state in uploader of table_id1 + // unsync: epoch2 -> [imm1_2] + // syncing: epoch1 -> [imm1_1] + + let (task1_2_finish_tx, _finish_txs) = { + let mut finish_txs = vec![]; + let imm2_1_1 = gen_imm(table_id2, epoch1, 0); + start_epoch(table_id2, epoch1); + init_epoch(&guard2, epoch1); + let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([( + guard1.instance_id, + vec![imm1_2_1.batch_id()], + )])); + write_imm(&read_version2, &guard2, &imm2_1_1); + wait_task_start.await; + + let imm2_1_2 = gen_imm(table_id2, epoch1, 1); + let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([( + guard2.instance_id, + vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()], + )])); + finish_txs.push(finish_tx); + write_imm(&read_version2, &guard2, &imm2_1_2); + wait_task_start.await; + + let imm2_1_3 = gen_imm(table_id2, epoch1, 2); + write_imm(&read_version2, &guard2, &imm2_1_3); + start_epoch(table_id2, epoch2); + seal_epoch(&guard2, epoch2); + let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([( + guard2.instance_id, + vec![imm2_1_3.batch_id()], + )])); + finish_txs.push(finish_tx); + let _sync_rx = sync_epoch(table_id2, epoch1); + wait_task_start.await; + + let imm2_2_1 = gen_imm(table_id2, epoch2, 0); + write_imm(&read_version2, &guard2, &imm2_2_1); + flush_event().await; + let imm2_2_2 = gen_imm(table_id2, epoch2, 1); + write_imm(&read_version2, &guard2, &imm2_2_2); + let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([( + guard2.instance_id, + vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()], + )])); + finish_txs.push(finish_tx); + wait_task_start.await; + + let imm2_2_3 = gen_imm(table_id2, epoch2, 2); + write_imm(&read_version2, &guard2, &imm2_2_3); + + // by now, the state in uploader of table_id2 + // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3] + // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3] + // the state in uploader of table_id1 + // unsync: epoch2 -> spilling [imm1_2] + // syncing: epoch1 -> [imm1_1] + + drop(guard2); + let (clear_tx, clear_rx) = oneshot::channel(); + send_event(HummockEvent::Clear( + clear_tx, + initial_version.id, + Some(HashSet::from_iter([table_id2])), + )); + clear_rx.await.unwrap(); + (task1_2_finish_tx, finish_txs) + }; + + let imm1_2_2 = gen_imm(table_id1, epoch2, 1); + write_imm(&read_version1, &guard1, &imm1_2_2); + start_epoch(table_id1, epoch3); + seal_epoch(&guard1, epoch3); + + let (tx2, mut sync_rx2) = oneshot::channel(); + let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([( + guard1.instance_id, + vec![imm1_2_2.batch_id()], + )])); + send_event(HummockEvent::SyncEpoch { + new_sync_epoch: epoch2, + sync_result_sender: tx2, + table_ids: HashSet::from_iter([table_id1]), + }); + wait_task_start.await; + assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await); + + task1_1_finish_tx.send(()).unwrap(); + let sync_data1 = task1_1_rx.await.unwrap().unwrap(); + sync_data1 + .uploaded_ssts + .iter() + .all(|sst| sst.epochs() == &vec![epoch1]); + task1_2_finish_tx.send(()).unwrap(); + assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await); + task1_2_2_finish_tx.send(()).unwrap(); + let sync_data2 = sync_rx2.await.unwrap().unwrap(); + sync_data2 + .uploaded_ssts + .iter() + .all(|sst| sst.epochs() == &vec![epoch2]); + + send_event(HummockEvent::Shutdown); + join_handle.await.unwrap(); } } diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index f0a4b2a899874..1406753c17068 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -65,7 +65,11 @@ pub enum HummockEvent { }, /// Clear shared buffer and reset all states - Clear(oneshot::Sender<()>, HummockVersionId), + Clear( + oneshot::Sender<()>, + HummockVersionId, + Option>, + ), Shutdown, @@ -122,7 +126,9 @@ impl HummockEvent { table_ids, } => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids), - HummockEvent::Clear(_, version_id) => format!("Clear {}", version_id), + HummockEvent::Clear(_, version_id, table_ids) => { + format!("Clear {} {:?}", version_id, table_ids) + } HummockEvent::Shutdown => "Shutdown".to_string(), diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 8326b3b876890..96b565c00ef49 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -53,7 +53,6 @@ use crate::hummock::store::version::StagingSstableInfo; use crate::hummock::{HummockError, HummockResult, ImmutableMemtable}; use crate::mem_table::ImmId; use crate::monitor::HummockStateStoreMetrics; -use crate::opts::StorageOpts; use crate::store::SealCurrentEpochOptions; /// Take epoch data inclusively before `epoch` out from `data` @@ -784,6 +783,7 @@ struct UnsyncData { // An index as a mapping from instance id to its table id instance_table_id: HashMap, unsync_epochs: HashMap>, + spilled_data: HashMap, HashSet)>, } impl UnsyncData { @@ -903,6 +903,43 @@ impl UnsyncData { None } } + + fn clear_tables(&mut self, table_ids: &HashSet, task_manager: &mut TaskManager) { + for table_id in table_ids { + if let Some(table_unsync_data) = self.table_data.remove(table_id) { + for task_id in table_unsync_data.spill_tasks.into_values().flatten() { + if let Some(task_status) = task_manager.abort_task(task_id) { + must_match!(task_status, UploadingTaskStatus::Spilling(spill_table_ids) => { + assert!(spill_table_ids.is_subset(table_ids)); + }); + } + if let Some((_, spill_table_ids)) = self.spilled_data.remove(&task_id) { + assert!(spill_table_ids.is_subset(table_ids)); + } + } + assert!( + table_unsync_data.instance_data.is_empty(), + "should be clear when dropping the read version instance" + ); + } + } + debug_assert!(self + .spilled_data + .values() + .all(|(_, spill_table_ids)| spill_table_ids.is_disjoint(table_ids))); + self.unsync_epochs.retain(|_, unsync_epoch_table_ids| { + if !unsync_epoch_table_ids.is_disjoint(table_ids) { + assert!(unsync_epoch_table_ids.is_subset(table_ids)); + false + } else { + true + } + }); + assert!(self + .instance_table_id + .values() + .all(|table_id| !table_ids.contains(table_id))); + } } impl UploaderData { @@ -958,7 +995,7 @@ impl UploaderData { ); } for task_id in task_ids { - if self.spilled_data.contains_key(&task_id) { + if self.unsync_data.spilled_data.contains_key(&task_id) { spilled_tasks.insert(task_id); } else { uploading_tasks.insert(task_id); @@ -988,8 +1025,11 @@ impl UploaderData { .iter() .rev() .map(|task_id| { - let (sst, spill_table_ids) = - self.spilled_data.remove(task_id).expect("should exist"); + let (sst, spill_table_ids) = self + .unsync_data + .spilled_data + .remove(task_id) + .expect("should exist"); assert!( spill_table_ids.is_subset(&table_ids), "spilled tabled ids {:?} not a subset of sync table id {:?}", @@ -1057,7 +1097,6 @@ impl UploaderContext { pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, buffer_tracker: BufferTracker, - _config: &StorageOpts, stats: Arc, ) -> Self { UploaderContext { @@ -1079,20 +1118,52 @@ struct UploaderData { syncing_data: BTreeMap, task_manager: TaskManager, - spilled_data: HashMap, HashSet)>, next_sync_id: usize, } impl UploaderData { fn abort(self, err: impl Fn() -> HummockError) { - self.task_manager.abort(); + self.task_manager.abort_all_tasks(); for syncing_data in self.syncing_data.into_values() { send_sync_result(syncing_data.sync_result_sender, Err(err())); } } + fn clear_tables(&mut self, table_ids: HashSet) { + if table_ids.is_empty() { + return; + } + self.unsync_data + .clear_tables(&table_ids, &mut self.task_manager); + self.syncing_data.retain(|sync_id, syncing_data| { + if !syncing_data.table_ids.is_disjoint(&table_ids) { + assert!(syncing_data.table_ids.is_subset(&table_ids)); + for task_id in &syncing_data.remaining_uploading_tasks { + match self + .task_manager + .abort_task(*task_id) + .expect("should exist") + { + UploadingTaskStatus::Spilling(spill_table_ids) => { + assert!(spill_table_ids.is_subset(&table_ids)); + } + UploadingTaskStatus::Sync(task_sync_id) => { + assert_eq!(sync_id, &task_sync_id); + } + } + } + false + } else { + true + } + }); + + self.check_upload_task_consistency(); + } + fn min_uncommitted_sst_id(&self) -> Option { - self.spilled_data + self.unsync_data + .spilled_data .values() .map(|(s, _)| s) .chain(self.syncing_data.values().flat_map(|s| s.uploaded.iter())) @@ -1141,7 +1212,6 @@ impl HummockUploader { pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, buffer_tracker: BufferTracker, - config: &StorageOpts, ) -> Self { Self { state: UploaderState::Working(UploaderData::default()), @@ -1149,7 +1219,6 @@ impl HummockUploader { pinned_version, spawn_upload_task, buffer_tracker, - config, state_store_metrics, ), } @@ -1308,15 +1377,21 @@ impl HummockUploader { } } - pub(crate) fn clear(&mut self) { - if let UploaderState::Working(data) = replace( - &mut self.state, - UploaderState::Working(UploaderData::default()), - ) { - data.abort(|| HummockError::other("uploader is reset")); - } + pub(crate) fn clear(&mut self, table_ids: Option>) { + if let Some(table_ids) = table_ids { + if let UploaderState::Working(data) = &mut self.state { + data.clear_tables(table_ids); + } + } else { + if let UploaderState::Working(data) = replace( + &mut self.state, + UploaderState::Working(UploaderData::default()), + ) { + data.abort(|| HummockError::other("uploader is reset")); + } - self.context.stats.uploader_syncing_epoch_count.set(0); + self.context.stats.uploader_syncing_epoch_count.set(0); + } } pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) { @@ -1331,10 +1406,11 @@ impl HummockUploader { .into_values() .flat_map(|task_ids| task_ids.into_iter()) .filter(|task_id| { - if let Some((_, table_ids)) = data.spilled_data.get_mut(task_id) { + if let Some((_, table_ids)) = data.unsync_data.spilled_data.get_mut(task_id) + { assert!(table_ids.remove(&removed_table_data.table_id)); if table_ids.is_empty() { - data.spilled_data.remove(task_id); + data.unsync_data.spilled_data.remove(task_id); } false } else { @@ -1422,7 +1498,7 @@ impl UploaderData { .collect(); let mut spill_task_table_id_from_manager: HashMap<_, HashSet<_>> = HashMap::new(); - for (task_id, (_, table_ids)) in &self.spilled_data { + for (task_id, (_, table_ids)) in &self.unsync_data.spilled_data { spill_task_table_id_from_manager.insert(*task_id, table_ids.clone()); } let mut syncing_task_from_manager: HashMap<_, HashSet<_>> = HashMap::new(); @@ -1473,7 +1549,9 @@ impl HummockUploader { data.may_notify_sync_task(&self.context); } UploadingTaskStatus::Spilling(table_ids) => { - data.spilled_data.insert(task_id, (sst.clone(), table_ids)); + data.unsync_data + .spilled_data + .insert(task_id, (sst.clone(), table_ids)); } } data.check_upload_task_consistency(); diff --git a/src/storage/src/hummock/event_handler/uploader/spiller.rs b/src/storage/src/hummock/event_handler/uploader/spiller.rs index ba04d85856ace..4e560c36eacf0 100644 --- a/src/storage/src/hummock/event_handler/uploader/spiller.rs +++ b/src/storage/src/hummock/event_handler/uploader/spiller.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use risingwave_common::catalog::TableId; @@ -75,7 +76,20 @@ impl<'a> Spiller<'a> { if let Some(unsync_epoch_id) = self .epoch_info .iter() - .max_by_key(|(_, info)| info.payload_size) + .max_by( + |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| { + info1.payload_size.cmp(&info2.payload_size).then_with(|| { + if !cfg!(test) { + Ordering::Equal + } else { + assert_ne!(table1, table2); + // enforce deterministic spill order in test + // smaller table id will be spilled first. + table2.cmp(table1) + } + }) + }, + ) .map(|(unsync_epoch_id, _)| *unsync_epoch_id) { let spill_epoch = unsync_epoch_id.epoch(); diff --git a/src/storage/src/hummock/event_handler/uploader/task_manager.rs b/src/storage/src/hummock/event_handler/uploader/task_manager.rs index 2347be1ed57eb..fd53fae1db322 100644 --- a/src/storage/src/hummock/event_handler/uploader/task_manager.rs +++ b/src/storage/src/hummock/event_handler/uploader/task_manager.rs @@ -97,12 +97,21 @@ impl TaskManager { } } - pub(super) fn abort(self) { + pub(super) fn abort_all_tasks(self) { for task in self.tasks.into_values() { task.task.join_handle.abort(); } } + pub(super) fn abort_task(&mut self, task_id: UploadingTaskId) -> Option { + self.tasks.remove(&task_id).map(|entry| { + entry.task.join_handle.abort(); + self.task_order + .retain(|inflight_task_id| *inflight_task_id != task_id); + entry.status + }) + } + pub(super) fn spill( &mut self, context: &UploaderContext, diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index 6eb41bda52071..3e7b92624109a 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -45,8 +45,9 @@ use tokio::task::yield_now; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::uploader::{ - HummockUploader, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, - UploaderContext, UploaderData, UploaderState, UploadingTask, UploadingTaskId, + HummockUploader, SpawnUploadTask, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, + UploadTaskPayload, UploaderContext, UploaderData, UploaderState, UploadingTask, + UploadingTaskId, }; use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -121,7 +122,7 @@ pub(super) async fn gen_imm_with_limiter( gen_imm_inner(TEST_TABLE_ID, epoch, 0, limiter).await } -pub(super) async fn gen_imm_inner( +pub(crate) async fn gen_imm_inner( table_id: TableId, epoch: HummockEpoch, spill_offset: u16, @@ -176,12 +177,10 @@ where Fut: UploadOutputFuture, F: UploadFn, { - let config = StorageOpts::default(); UploaderContext::new( initial_pinned_version(), Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), BufferTracker::for_test(), - &config, Arc::new(HummockStateStoreMetrics::unused()), ) } @@ -191,15 +190,11 @@ where Fut: UploadOutputFuture, F: UploadFn, { - let config = StorageOpts { - ..Default::default() - }; HummockUploader::new( Arc::new(HummockStateStoreMetrics::unused()), initial_pinned_version(), Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), BufferTracker::for_test(), - &config, ) } @@ -272,16 +267,12 @@ impl HummockUploader { } #[expect(clippy::type_complexity)] -pub(crate) fn prepare_uploader_order_test( - config: &StorageOpts, +pub(crate) fn prepare_uploader_order_test_spawn_task_fn( skip_schedule: bool, ) -> ( - BufferTracker, - HummockUploader, + SpawnUploadTask, impl Fn(HashMap>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), ) { - let gauge = GenericGauge::new("test", "test").unwrap(); - let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); // (the started task send the imm ids of payload, the started task wait for finish notify) #[allow(clippy::type_complexity)] let task_notifier_holder: Arc< @@ -304,35 +295,51 @@ pub(crate) fn prepare_uploader_order_test( (await_start_future, finish_tx) } }; + let spawn_fn = Arc::new({ + move |_, task_info: UploadTaskInfo| { + let task_notifier_holder = task_notifier_holder.clone(); + let task_item = task_notifier_holder.lock().pop_back(); + let start_epoch = *task_info.epochs.last().unwrap(); + let end_epoch = *task_info.epochs.first().unwrap(); + assert!(end_epoch >= start_epoch); + spawn(async move { + let ssts = gen_sstable_info(start_epoch, end_epoch); + if !skip_schedule { + let (start_tx, finish_rx) = task_item.unwrap(); + start_tx.send(task_info).unwrap(); + finish_rx + .await + .map_err(|_| HummockError::other("failed to receive rx"))?; + } + Ok(UploadTaskOutput { + new_value_ssts: ssts, + old_value_ssts: vec![], + wait_poll_timer: None, + }) + }) + } + }); + (spawn_fn, new_task_notifier) +} + +#[expect(clippy::type_complexity)] +pub(crate) fn prepare_uploader_order_test( + config: &StorageOpts, + skip_schedule: bool, +) -> ( + BufferTracker, + HummockUploader, + impl Fn(HashMap>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), +) { + let (spawn_fn, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(skip_schedule); + let gauge = GenericGauge::new("test", "test").unwrap(); + let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); - let config = StorageOpts::default(); let uploader = HummockUploader::new( Arc::new(HummockStateStoreMetrics::unused()), initial_pinned_version(), - Arc::new({ - move |_, task_info: UploadTaskInfo| { - let task_notifier_holder = task_notifier_holder.clone(); - let task_item = task_notifier_holder.lock().pop_back(); - let start_epoch = *task_info.epochs.last().unwrap(); - let end_epoch = *task_info.epochs.first().unwrap(); - assert!(end_epoch >= start_epoch); - spawn(async move { - let ssts = gen_sstable_info(start_epoch, end_epoch); - if !skip_schedule { - let (start_tx, finish_rx) = task_item.unwrap(); - start_tx.send(task_info).unwrap(); - finish_rx.await.unwrap(); - } - Ok(UploadTaskOutput { - new_value_ssts: ssts, - old_value_ssts: vec![], - wait_poll_timer: None, - }) - }) - } - }), + spawn_fn, buffer_tracker.clone(), - &config, ); (buffer_tracker, uploader, new_task_notifier) } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 073dd9e1dc1c4..1d3d5b43f8fb9 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -513,11 +513,21 @@ impl HummockStorage { pub async fn clear_shared_buffer(&self, version_id: HummockVersionId) { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx, version_id)) + .send(HummockEvent::Clear(tx, version_id, None)) .expect("should send success"); rx.await.expect("should wait success"); } + pub async fn clear_tables(&self, version_id: HummockVersionId, table_ids: HashSet) { + if !table_ids.is_empty() { + let (tx, rx) = oneshot::channel(); + self.hummock_event_sender + .send(HummockEvent::Clear(tx, version_id, Some(table_ids))) + .expect("should send success"); + rx.await.expect("should wait success"); + } + } + /// Declare the start of an epoch. This information is provided for spill so that the spill task won't /// include data of two or more syncs. // TODO: remove this method when we support spill task that can include data of more two or more syncs