diff --git a/src/common/src/config.rs b/src/common/src/config.rs index a5cb55445c070..91939e8f7f00b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -618,6 +618,7 @@ pub struct StorageConfig { /// The threshold for the number of immutable memtables to merge to a new imm. #[serde(default = "default::storage::imm_merge_threshold")] + #[deprecated] pub imm_merge_threshold: usize, /// Whether to enable write conflict detection diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 993ad88d0bf33..2329970b485f5 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::ops::Bound; use std::sync::Arc; @@ -26,6 +27,7 @@ use risingwave_hummock_sdk::key::{key_with_epoch, map_table_key_range}; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_meta::hummock::test_utils::setup_compute_env; use risingwave_pb::hummock::{KeyRange, SstableInfo}; +use risingwave_storage::hummock::event_handler::TEST_LOCAL_INSTANCE_ID; use risingwave_storage::hummock::iterator::test_utils::{ iterator_test_table_key_of, iterator_test_user_key_of, }; @@ -48,7 +50,12 @@ async fn test_read_version_basic() { let mut epoch = test_epoch(1); let table_id = 0; let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); - let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version, vnodes); + let mut read_version = HummockReadVersion::new( + TableId::from(table_id), + TEST_LOCAL_INSTANCE_ID, + pinned_version, + vnodes, + ); { // single imm @@ -178,7 +185,7 @@ async fn test_read_version_basic() { ], vec![], epoch_id_vec_for_clear, - batch_id_vec_for_clear, + HashMap::from_iter([(TEST_LOCAL_INSTANCE_ID, batch_id_vec_for_clear)]), 1, )); @@ -267,6 +274,7 @@ async fn test_read_filter_basic() { let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), + TEST_LOCAL_INSTANCE_ID, pinned_version, vnodes.clone(), ))); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 682586f789955..e68760f3c419f 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; -use std::ops::DerefMut; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::pin::pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, LazyLock}; +use std::time::Duration; use arc_swap::ArcSwap; use await_tree::InstrumentAwait; @@ -43,8 +43,8 @@ use crate::hummock::compactor::{await_tree_key, compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ - default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData, - UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, UploaderEvent, + HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, + UploadTaskPayload, UploaderEvent, }; use crate::hummock::event_handler::{ HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping, @@ -258,7 +258,10 @@ impl HummockEventHandler { let future = async move { let _timer = upload_task_latency.start_timer(); let mut output = flush_imms( - payload, + payload + .values() + .flat_map(|imms| imms.iter().cloned()) + .collect(), task_info, upload_compactor_context.clone(), filter_key_extractor_manager.clone(), @@ -281,10 +284,6 @@ impl HummockEventHandler { } }) }), - default_spawn_merging_task( - compactor_context.compaction_executor.clone(), - compactor_context.await_tree_reg.clone(), - ), CacheRefiller::default_spawn_refill_task(), ) } @@ -297,7 +296,6 @@ impl HummockEventHandler { state_store_metrics: Arc, storage_opts: &StorageOpts, spawn_upload_task: SpawnUploadTask, - spawn_merging_task: SpawnMergingTask, spawn_refill_task: SpawnRefillTask, ) -> Self { let (hummock_event_tx, hummock_event_rx) = @@ -331,7 +329,6 @@ impl HummockEventHandler { state_store_metrics, pinned_version.clone(), spawn_upload_task, - spawn_merging_task, buffer_tracker, storage_opts, ); @@ -388,20 +385,29 @@ impl HummockEventHandler { newly_uploaded_sstables: Vec, ) { debug!("epoch has been synced: {}.", epoch); + let newly_uploaded_sstables = newly_uploaded_sstables + .into_iter() + .map(Arc::new) + .collect_vec(); if !newly_uploaded_sstables.is_empty() { - newly_uploaded_sstables - .into_iter() - // Take rev because newer data come first in `newly_uploaded_sstables` but we apply - // older data first - .rev() - .for_each(|staging_sstable_info| { - let staging_sstable_info_ref = Arc::new(staging_sstable_info); - self.for_each_read_version(|read_version| { - read_version.update(VersionUpdate::Staging(StagingData::Sst( - staging_sstable_info_ref.clone(), - ))) + let related_instance_ids: HashSet<_> = newly_uploaded_sstables + .iter() + .flat_map(|sst| sst.imm_ids().keys().cloned()) + .collect(); + self.for_each_read_version(related_instance_ids, |instance_id, read_version| { + newly_uploaded_sstables + .iter() + // Take rev because newer data come first in `newly_uploaded_sstables` but we apply + // older data first + .rev() + .for_each(|staging_sstable_info| { + if staging_sstable_info.imm_ids().contains_key(&instance_id) { + read_version.update(VersionUpdate::Staging(StagingData::Sst( + staging_sstable_info.clone(), + ))); + } }); - }); + }); } let result = self .uploader @@ -434,21 +440,74 @@ impl HummockEventHandler { /// This function will be performed under the protection of the `read_version_mapping` read /// lock, and add write lock on each `read_version` operation - fn for_each_read_version(&self, mut f: impl FnMut(&mut HummockReadVersion)) { - self.local_read_version_mapping - .values() - .for_each(|read_version: &HummockReadVersionRef| f(read_version.write().deref_mut())); + fn for_each_read_version( + &self, + instances: impl IntoIterator, + mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion), + ) { + let instances = { + #[cfg(debug_assertion)] + { + // check duplication on debug_mode + use std::collections::HashSet; + let mut id_set = HashSet::new(); + for instance in instances { + assert!(id_set.insert(instance)); + } + id_set + } + #[cfg(not(debug_assertion))] + { + instances + } + }; + let mut pending = VecDeque::new(); + let mut total_count = 0; + for instance_id in instances { + let Some(read_version) = self.local_read_version_mapping.get(&instance_id) else { + continue; + }; + total_count += 1; + if let Some(mut write_guard) = read_version.try_write() { + f(instance_id, &mut write_guard); + } else { + pending.push_back(instance_id); + } + } + if !pending.is_empty() { + warn!( + pending_count = pending.len(), + total_count, "cannot acquire lock for all read version" + ); + } + + const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1); + + while let Some(instance_id) = pending.pop_front() { + let read_version = self + .local_read_version_mapping + .get(&instance_id) + .expect("have checked exist before"); + if let Some(mut write_guard) = read_version.try_write_for(TRY_LOCK_TIMEOUT) { + f(instance_id, &mut write_guard); + } else { + warn!(instance_id, "failed to get lock again for instance"); + pending.push_back(instance_id); + } + } } fn handle_data_spilled(&mut self, staging_sstable_info: StagingSstableInfo) { - // todo: do some prune for version update let staging_sstable_info = Arc::new(staging_sstable_info); - self.for_each_read_version(|read_version| { - trace!("data_spilled. SST size {}", staging_sstable_info.imm_size()); - read_version.update(VersionUpdate::Staging(StagingData::Sst( - staging_sstable_info.clone(), - ))) - }) + trace!("data_spilled. SST size {}", staging_sstable_info.imm_size()); + self.for_each_read_version( + staging_sstable_info.imm_ids().keys().cloned(), + |_, read_version| { + read_version.update(VersionUpdate::Staging(StagingData::Sst( + staging_sstable_info.clone(), + ))) + }, + ) } fn handle_await_sync_epoch( @@ -679,9 +738,13 @@ impl HummockEventHandler { .store(Arc::new(new_pinned_version.clone())); { - self.for_each_read_version(|read_version| { - read_version.update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone())) - }); + self.for_each_read_version( + self.local_read_version_mapping.keys().cloned(), + |_, read_version| { + read_version + .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone())) + }, + ); } let prev_max_committed_epoch = pinned_version.max_committed_epoch(); @@ -769,26 +832,6 @@ impl HummockEventHandler { let _timer = self.metrics.event_handler_on_spilled_latency.start_timer(); self.handle_data_spilled(staging_sstable_info); } - - UploaderEvent::ImmMerged(merge_output) => { - // update read version for corresponding table shards - if let Some(read_version) = self - .local_read_version_mapping - .get(&merge_output.instance_id) - { - read_version - .write() - .update(VersionUpdate::Staging(StagingData::MergedImmMem( - merge_output.merged_imm, - merge_output.imm_ids, - ))); - } else { - warn!( - "handle ImmMerged: table instance not found. table {:?}, instance {}", - &merge_output.table_id, &merge_output.instance_id - ) - } - } } } @@ -830,9 +873,6 @@ impl HummockEventHandler { if is_checkpoint { self.uploader.start_sync_epoch(epoch); - } else { - // start merging task on non-checkpoint epochs sealed - self.uploader.start_merge_imms(epoch); } } @@ -868,17 +908,17 @@ impl HummockEventHandler { vnodes, } => { let pinned_version = self.pinned_version.load(); + let instance_id = self.generate_instance_id(); let basic_read_version = Arc::new(RwLock::new( HummockReadVersion::new_with_replication_option( table_id, + instance_id, (**pinned_version).clone(), is_replicated, vnodes, ), )); - let instance_id = self.generate_instance_id(); - debug!( "new read version registered: table_id: {}, instance_id: {}", table_id, instance_id @@ -1011,209 +1051,22 @@ fn to_sync_result(result: &HummockResult) -> HummockResult(); - spawn_upload_task_tx.send(tx).unwrap(); - spawn(async move { - // wait for main thread to notify returning error - rx.await.unwrap(); - Err(HummockError::other("".to_string())) - }) - }), - Arc::new(move |_, _, imms, _| { - let (tx, rx) = oneshot::channel::<()>(); - let (finish_tx, finish_rx) = oneshot::channel::<()>(); - spawn_merging_task_tx.send((tx, finish_rx)).unwrap(); - spawn(async move { - rx.await.unwrap(); - finish_tx.send(()).unwrap(); - imms[0].clone() - }) - }), - CacheRefiller::default_spawn_refill_task(), - ); - - let tx = event_handler.event_sender(); - - let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); - - let (read_version_tx, read_version_rx) = oneshot::channel(); - - tx.send(HummockEvent::RegisterReadVersion { - table_id, - new_read_version_sender: read_version_tx, - is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), - }) - .unwrap(); - let (read_version, guard) = read_version_rx.await.unwrap(); - let instance_id = guard.instance_id; - - let build_batch = |epoch, spill_offset| { - SharedBufferBatch::build_shared_buffer_batch( - epoch, - spill_offset, - vec![(TableKey(Bytes::from("key")), SharedBufferValue::Delete)], - None, - 10, - table_id, - instance_id, - None, - ) - }; - - let epoch1 = epoch0.next_epoch(); - let imm1 = build_batch(epoch1, 0); - read_version - .write() - .update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone()))); - tx.send(HummockEvent::ImmToUploader(imm1.clone())).unwrap(); - tx.send(HummockEvent::SealEpoch { - epoch: epoch1, - is_checkpoint: true, - }) - .unwrap(); - let (sync_tx, mut sync_rx) = oneshot::channel(); - tx.send(HummockEvent::AwaitSyncEpoch { - new_sync_epoch: epoch1, - sync_result_sender: sync_tx, - }) - .unwrap(); - - let upload_finish_tx = spawn_upload_task_rx.recv().await.unwrap(); - assert!(poll_fn(|cx| Poll::Ready(sync_rx.poll_unpin(cx))) - .await - .is_pending()); - - let epoch2 = epoch1.next_epoch(); - let mut imm_ids = Vec::new(); - for i in 0..10 { - let imm = build_batch(epoch2, i); - imm_ids.push(imm.batch_id()); - read_version - .write() - .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); - tx.send(HummockEvent::ImmToUploader(imm)).unwrap(); - } - - for (staging_imm, imm_id) in read_version - .read() - .staging() - .imm - .iter() - .zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id()))) - { - assert_eq!(staging_imm.batch_id(), imm_id); - } - - // should start merging task - tx.send(HummockEvent::SealEpoch { - epoch: epoch2, - is_checkpoint: false, - }) - .unwrap(); - - println!("before wait spawn merging task"); - - let (merging_start_tx, merging_finish_rx) = spawn_merging_task_rx.recv().await.unwrap(); - merging_start_tx.send(()).unwrap(); - - println!("after wait spawn merging task"); - - // yield to possibly poll the merging task, though it shouldn't poll it because there is unfinished syncing task - yield_now().await; - - for (staging_imm, imm_id) in read_version - .read() - .staging() - .imm - .iter() - .zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id()))) - { - assert_eq!(staging_imm.batch_id(), imm_id); - } - - upload_finish_tx.send(()).unwrap(); - assert!(sync_rx.await.unwrap().is_err()); - - merging_finish_rx.await.unwrap(); - - // yield to poll the merging task, and then it should have finished. - for _ in 0..10 { - yield_now().await; - } - - assert_eq!( - read_version - .read() - .staging() - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec(), - vec![*imm_ids.last().unwrap(), imm1.batch_id()] - ); - } - #[tokio::test] async fn test_clear_shared_buffer() { let epoch0 = 233; @@ -1243,7 +1096,6 @@ mod tests { Arc::new(HummockStateStoreMetrics::unused()), &default_opts_for_test(), Arc::new(|_, _| unreachable!("should not spawn upload task")), - Arc::new(|_, _, _, _| unreachable!("should not spawn merging task")), Arc::new(move |_, _, old_version, new_version| { let (tx, rx) = oneshot::channel(); refill_task_tx_clone diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 9e98fac4ee3b1..efbae0ac21544 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -170,6 +170,7 @@ impl std::fmt::Debug for HummockEvent { } pub type LocalInstanceId = u64; +pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233; pub type HummockReadVersionRef = Arc>; pub type ReadVersionMappingType = HashMap>; pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef; diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index e4b5e6806c28b..c80bb0a9577cc 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -13,26 +13,23 @@ // limitations under the License. use std::collections::hash_map::Entry; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::future::{poll_fn, Future}; -use std::mem::swap; +use std::mem::take; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use std::task::{ready, Context, Poll}; use futures::future::{try_join_all, TryJoinAll}; use futures::FutureExt; use itertools::Itertools; -use more_asserts::{assert_ge, assert_gt, assert_le}; +use more_asserts::{assert_ge, assert_gt}; use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{HistogramTimer, IntGauge}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_hummock_sdk::key::EPOCH_LEN; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; @@ -41,19 +38,16 @@ use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tracing::{debug, error, info}; -use crate::hummock::compactor::{ - await_tree_key, merge_imms_in_memory, CompactionAwaitTreeRegRef, CompactionExecutor, -}; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::store::version::StagingSstableInfo; -use crate::hummock::utils::MemoryTracker; use crate::hummock::{HummockError, HummockResult, ImmutableMemtable}; use crate::mem_table::ImmId; use crate::monitor::HummockStateStoreMetrics; use crate::opts::StorageOpts; +pub type UploadTaskInput = HashMap>; pub type UploadTaskPayload = Vec; #[derive(Debug)] @@ -63,64 +57,17 @@ pub struct UploadTaskOutput { pub wait_poll_timer: Option, } pub type SpawnUploadTask = Arc< - dyn Fn(UploadTaskPayload, UploadTaskInfo) -> JoinHandle> - + Send - + Sync - + 'static, ->; - -pub type SpawnMergingTask = Arc< - dyn Fn( - TableId, - LocalInstanceId, - Vec, - Option, - ) -> JoinHandle + dyn Fn(UploadTaskInput, UploadTaskInfo) -> JoinHandle> + Send + Sync + 'static, >; -pub(crate) fn default_spawn_merging_task( - compaction_executor: Arc, - await_tree_reg: Option, -) -> SpawnMergingTask { - Arc::new(move |table_id, instance_id, imms, tracker| { - compaction_executor.spawn({ - static NEXT_MERGING_TASK_ID: LazyLock = - LazyLock::new(|| AtomicUsize::new(0)); - let tree_root = await_tree_reg.as_ref().map(|reg| { - let merging_task_id = NEXT_MERGING_TASK_ID.fetch_add(1, Relaxed); - reg.register( - await_tree_key::MergingTask { - id: merging_task_id, - }, - format!( - "Merging Imm {:?} {:?} {:?}", - table_id, - instance_id, - imms.iter() - .flat_map(|imm| imm.epochs().iter()) - .copied() - .collect::>() - ), - ) - }); - let future = merge_imms_in_memory(table_id, instance_id, imms, tracker); - if let Some(root) = tree_root { - root.instrument(future).left_future() - } else { - future.right_future() - } - }) - }) -} - #[derive(Clone)] pub struct UploadTaskInfo { pub task_size: usize, pub epochs: Vec, - pub imm_ids: Vec, + pub imm_ids: HashMap>, pub compaction_group_index: Arc>, } @@ -147,7 +94,8 @@ impl Debug for UploadTaskInfo { /// A wrapper for a uploading task that compacts and uploads the imm payload. Task context are /// stored so that when the task fails, it can be re-tried. struct UploadingTask { - payload: UploadTaskPayload, + // newer data at the front + payload: UploadTaskInput, join_handle: JoinHandle>, task_info: UploadTaskInfo, spawn_upload_task: SpawnUploadTask, @@ -155,75 +103,6 @@ struct UploadingTask { task_count_guard: IntGauge, } -pub struct MergeImmTaskOutput { - /// Input imm ids of the merging task. Larger imm ids at the front. - pub imm_ids: Vec, - pub table_id: TableId, - pub instance_id: LocalInstanceId, - pub merged_imm: ImmutableMemtable, -} - -// A future that merges multiple immutable memtables to a single immutable memtable. -struct MergingImmTask { - table_id: TableId, - instance_id: LocalInstanceId, - input_imms: Vec, - join_handle: JoinHandle, -} - -impl Debug for MergingImmTask { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MergingImmTask") - .field("table_id", &self.table_id) - .field("instance_id", &self.instance_id) - .field("input_imms", &self.input_imms) - .finish() - } -} - -impl MergingImmTask { - fn new( - table_id: TableId, - instance_id: LocalInstanceId, - imms: Vec, - memory_tracker: Option, - context: &UploaderContext, - ) -> Self { - assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted()); - let input_imms = imms.clone(); - let join_handle = (context.spawn_merging_task)(table_id, instance_id, imms, memory_tracker); - - MergingImmTask { - table_id, - instance_id, - input_imms, - join_handle, - } - } - - /// Poll the result of the merge task - fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll { - Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { - Ok(task_result) => task_result, - Err(err) => { - panic!( - "failed to join merging task: {:?} {:?}", - err.as_report(), - self - ); - } - }) - } -} - -impl Future for MergingImmTask { - type Output = ImmutableMemtable; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.poll_result(cx) - } -} - impl Drop for UploadingTask { fn drop(&mut self) { self.task_size_guard.sub(self.task_info.task_size as u64); @@ -244,19 +123,30 @@ impl UploadingTask { // INFO logs will be enabled for task with size exceeding 50MB. const LOG_THRESHOLD_FOR_UPLOAD_TASK_SIZE: usize = 50 * (1 << 20); - fn new(payload: UploadTaskPayload, context: &UploaderContext) -> Self { + fn new(payload: UploadTaskInput, context: &UploaderContext) -> Self { assert!(!payload.is_empty()); let mut epochs = payload .iter() - .flat_map(|imm| imm.epochs().clone()) + .flat_map(|(_, imms)| imms.iter().flat_map(|imm| imm.epochs().iter().cloned())) .sorted() .dedup() .collect_vec(); // reverse to make newer epochs comes first epochs.reverse(); - let imm_ids = payload.iter().map(|imm| imm.batch_id()).collect_vec(); - let task_size = payload.iter().map(|imm| imm.size()).sum(); + let imm_ids = payload + .iter() + .map(|(instance_id, imms)| { + ( + *instance_id, + imms.iter().map(|imm| imm.batch_id()).collect_vec(), + ) + }) + .collect(); + let task_size = payload + .values() + .map(|imms| imms.iter().map(|imm| imm.size()).sum::()) + .sum(); let task_info = UploadTaskInfo { task_size, epochs, @@ -393,7 +283,7 @@ impl SpilledData { #[derive(Default, Debug)] struct UnsealedEpochData { // newer data at the front - imms: VecDeque, + imms: HashMap>, spilled_data: SpilledData, table_watermarks: HashMap, BitmapBuilder)>, @@ -401,7 +291,10 @@ struct UnsealedEpochData { impl UnsealedEpochData { fn flush(&mut self, context: &UploaderContext) { - let imms = self.imms.drain(..).collect_vec(); + let imms: HashMap<_, _> = take(&mut self.imms) + .into_iter() + .map(|(id, imms)| (id, imms.into_iter().collect_vec())) + .collect(); if !imms.is_empty() { let task = UploadingTask::new(imms, context); context.stats.spill_task_counts_from_unsealed.inc(); @@ -463,16 +356,9 @@ struct SealedData { // newer epochs come first epochs: VecDeque, - // store the output of merge task that will be feed into `flush_imms` procedure - merged_imms: VecDeque, - // Sealed imms grouped by table shard. // newer data (larger imm id) at the front - imms_by_table_shard: HashMap<(TableId, LocalInstanceId), VecDeque>, - - // Merging tasks generated from sealed imms - // it should be safe to directly drop these tasks - merging_tasks: VecDeque, + imms_by_table_shard: HashMap>, spilled_data: SpilledData, @@ -482,9 +368,6 @@ struct SealedData { impl SealedData { fn clear(&mut self) { self.spilled_data.clear(); - self.merging_tasks - .drain(..) - .for_each(|task| task.join_handle.abort()); *self = Self::default(); } @@ -525,15 +408,14 @@ impl SealedData { } // rearrange sealed imms by table shard and in epoch descending order - for imm in unseal_epoch_data.imms.into_iter().rev() { - let queue = self - .imms_by_table_shard - .entry((imm.table_id, imm.instance_id)) - .or_default(); - if let Some(front) = queue.front() { - assert_gt!(imm.batch_id(), front.batch_id()); + for (instance_id, imms) in unseal_epoch_data.imms { + let queue = self.imms_by_table_shard.entry(instance_id).or_default(); + for imm in imms.into_iter().rev() { + if let Some(front) = queue.front() { + assert_gt!(imm.batch_id(), front.batch_id()); + } + queue.push_front(imm); } - queue.push_front(imm); } self.epochs.push_front(epoch); @@ -563,51 +445,12 @@ impl SealedData { } } - fn add_merged_imm(&mut self, merged_imm: &ImmutableMemtable) { - // add merged_imm to merged_imms - self.merged_imms.push_front(merged_imm.clone()); - } - - fn drop_merging_tasks(&mut self) { - // pop from newest merging task to restore candidate imms back - while let Some(task) = self.merging_tasks.pop_front() { - // cancel the task - task.join_handle.abort(); - self.imms_by_table_shard - .get_mut(&(task.table_id, task.instance_id)) - .unwrap() - .extend(task.input_imms.into_iter()); - } - } - // Flush can be triggered by either a sync_epoch or a spill (`may_flush`) request. fn flush(&mut self, context: &UploaderContext, is_spilled: bool) { - // drop unfinished merging tasks - self.drop_merging_tasks(); - - // group imms by epoch and order by epoch - let mut imms_by_epoch: BTreeMap> = BTreeMap::new(); - self.imms_by_table_shard.drain().for_each(|(_, imms)| { - for imm in imms { - debug_assert_eq!(imm.max_epoch(), imm.min_epoch()); - imms_by_epoch.entry(imm.max_epoch()).or_default().push(imm); - } - }); - - // When ImmMerged, we have removed those imms that have been merged from the - // `self.imms`, thus we need to feed merged_imms into the `flush` procedure - // to complete a checkpoint. - let merged_imms = self.merged_imms.drain(..); - - // newer epoch comes first, the order is matter for cleaning the staging imms - // when adding a staging sst to the `StagingVersion` - let payload = imms_by_epoch + let payload: HashMap<_, _> = take(&mut self.imms_by_table_shard) .into_iter() - .rev() - // in `imms`, newer data comes first - .flat_map(|(_epoch, imms)| imms) - .chain(merged_imms) - .collect_vec(); + .map(|(id, imms)| (id, imms.into_iter().collect())) + .collect(); if !payload.is_empty() { let task = UploadingTask::new(payload, context); @@ -623,30 +466,9 @@ impl SealedData { } } - fn poll_success_merge_imm(&mut self, cx: &mut Context<'_>) -> Poll> { - // only poll the oldest merge task if there is any - if let Some(task) = self.merging_tasks.back_mut() { - let merged_imm = ready!(task.poll_unpin(cx)); - - // pop the finished task - let task = self.merging_tasks.pop_back().expect("must exist"); - - Poll::Ready(Some(MergeImmTaskOutput { - imm_ids: task.input_imms.iter().map(|imm| imm.batch_id()).collect(), - table_id: task.table_id, - instance_id: task.instance_id, - merged_imm, - })) - } else { - Poll::Ready(None) - } - } - /// Clear self and return the current sealed data fn drain(&mut self) -> SealedData { - let mut ret = SealedData::default(); - swap(&mut ret, self); - ret + take(self) } #[cfg(test)] @@ -656,21 +478,6 @@ impl SealedData { .map(|imms| imms.len()) .sum() } - - #[cfg(test)] - fn imms_by_epoch(&self) -> BTreeMap> { - let mut imms_by_epoch: BTreeMap> = BTreeMap::new(); - self.imms_by_table_shard.iter().for_each(|(_, imms)| { - for imm in imms { - debug_assert!(imm.max_epoch() == imm.min_epoch()); - imms_by_epoch - .entry(imm.max_epoch()) - .or_default() - .push(imm.clone()); - } - }); - imms_by_epoch - } } struct SyncingData { @@ -703,12 +510,7 @@ struct UploaderContext { pinned_version: PinnedVersion, /// When called, it will spawn a task to flush the imm into sst and return the join handle. spawn_upload_task: SpawnUploadTask, - spawn_merging_task: SpawnMergingTask, buffer_tracker: BufferTracker, - /// The number of immutable memtables that will be merged into a new imm. - /// When the number of imms of a table shard exceeds this threshold, uploader will generate - /// merging tasks to merge them. - imm_merge_threshold: usize, stats: Arc, } @@ -717,17 +519,14 @@ impl UploaderContext { fn new( pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, - spawn_merging_task: SpawnMergingTask, buffer_tracker: BufferTracker, - config: &StorageOpts, + _config: &StorageOpts, stats: Arc, ) -> Self { UploaderContext { pinned_version, spawn_upload_task, - spawn_merging_task, buffer_tracker, - imm_merge_threshold: config.imm_merge_threshold, stats, } } @@ -778,7 +577,6 @@ impl HummockUploader { state_store_metrics: Arc, pinned_version: PinnedVersion, spawn_upload_task: SpawnUploadTask, - spawn_merging_task: SpawnMergingTask, buffer_tracker: BufferTracker, config: &StorageOpts, ) -> Self { @@ -794,7 +592,6 @@ impl HummockUploader { context: UploaderContext::new( pinned_version, spawn_upload_task, - spawn_merging_task, buffer_tracker, config, state_store_metrics, @@ -802,11 +599,6 @@ impl HummockUploader { } } - #[cfg(test)] - pub(crate) fn imm_merge_threshold(&self) -> usize { - self.context.imm_merge_threshold - } - pub(crate) fn buffer_tracker(&self) -> &BufferTracker { &self.context.buffer_tracker } @@ -841,7 +633,11 @@ impl HummockUploader { self.max_sealed_epoch ); let unsealed_data = self.unsealed_data.entry(epoch).or_default(); - unsealed_data.imms.push_front(imm); + unsealed_data + .imms + .entry(imm.instance_id) + .or_default() + .push_front(imm); } pub(crate) fn add_table_watermarks( @@ -897,60 +693,6 @@ impl HummockUploader { self.sealed_data.seal_new_epoch(epoch, unsealed_data); } - pub(crate) fn start_merge_imms(&mut self, sealed_epoch: HummockEpoch) { - // skip merging if merge threshold is 1 - if self.context.imm_merge_threshold <= 1 { - return; - } - - let memory_limiter = self.context.buffer_tracker.get_memory_limiter(); - // scan imms of each table shard to generate merging task - // when the number of imms exceeds the merge threshold - for ((table_id, shard_id), imms) in self - .sealed_data - .imms_by_table_shard - .iter_mut() - .filter(|(_, imms)| imms.len() >= self.context.imm_merge_threshold) - { - let imms_to_merge = imms.drain(..).collect_vec(); - let mut value_count = 0; - let mut imm_size = 0; - imms_to_merge.iter().for_each(|imm| { - // ensure imms are sealed - assert_le!(imm.max_epoch(), sealed_epoch); - value_count += imm.value_count(); - imm_size += imm.size(); - }); - - // acquire memory before generate merge task - // if acquire memory failed, the task will not be generated - let memory_sz = (imm_size + value_count * EPOCH_LEN) as u64; - if let Some(tracker) = memory_limiter.try_require_memory(memory_sz) { - self.sealed_data - .merging_tasks - .push_front(MergingImmTask::new( - *table_id, - *shard_id, - imms_to_merge, - Some(tracker), - &self.context, - )); - } else { - tracing::warn!( - "fail to acqiure memory {} B, skip merging imms for table {}, shard {}", - memory_sz, - table_id, - shard_id - ); - imms.extend(imms_to_merge); - } - } - } - - pub(crate) fn update_sealed_data(&mut self, merged_imm: &ImmutableMemtable) { - self.sealed_data.add_merged_imm(merged_imm); - } - pub(crate) fn start_sync_epoch(&mut self, epoch: HummockEpoch) { debug!("start sync epoch: {}", epoch); assert!( @@ -1161,40 +903,12 @@ impl HummockUploader { } Poll::Ready(None) } - - fn poll_sealed_merge_imm_task( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - let poll_ret = self.sealed_data.poll_success_merge_imm(cx); - if let Poll::Ready(Some(output)) = &poll_ret { - let table_id_label = output.table_id.to_string(); - - // monitor finished task - self.context - .stats - .merge_imm_task_counts - .with_label_values(&[table_id_label.as_str()]) - .inc(); - // monitor merge imm memory size - // we should also add up the size of EPOCH stored in each entry - self.context - .stats - .merge_imm_batch_memory_sz - .with_label_values(&[table_id_label.as_str()]) - .inc_by( - (output.merged_imm.size() + output.merged_imm.value_count() * EPOCH_LEN) as _, - ); - } - poll_ret - } } pub(crate) enum UploaderEvent { // staging sstable info of newer data comes first SyncFinish(HummockEpoch, Vec), DataSpilled(StagingSstableInfo), - ImmMerged(MergeImmTaskOutput), } impl HummockUploader { @@ -1212,11 +926,6 @@ impl HummockUploader { return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); } - if let Some(merge_output) = ready!(self.poll_sealed_merge_imm_task(cx)) { - // add the merged imm into sealed data - self.update_sealed_data(&merge_output.merged_imm); - return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); - } Poll::Pending }) } @@ -1231,7 +940,6 @@ mod tests { use std::sync::atomic::Ordering::SeqCst; use std::sync::Arc; use std::task::Poll; - use std::time::Duration; use bytes::Bytes; use futures::future::BoxFuture; @@ -1249,19 +957,15 @@ mod tests { use tokio::sync::oneshot; use tokio::task::yield_now; - use crate::hummock::compactor::CompactionExecutor; use crate::hummock::event_handler::hummock_event_handler::BufferTracker; use crate::hummock::event_handler::uploader::{ - default_spawn_merging_task, HummockUploader, MergingImmTask, UploadTaskInfo, - UploadTaskOutput, UploadTaskPayload, UploaderContext, UploaderEvent, UploadingTask, - }; - use crate::hummock::event_handler::LocalInstanceId; - use crate::hummock::iterator::test_utils::{ - iterator_test_table_key_of, transform_shared_buffer, + HummockUploader, UploadTaskInfo, UploadTaskInput, UploadTaskOutput, UploaderContext, + UploaderEvent, UploadingTask, }; + use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatch, SharedBufferValue, + SharedBufferBatch, SharedBufferBatchId, SharedBufferValue, }; use crate::hummock::{HummockError, HummockResult, MemoryLimiter}; use crate::mem_table::{ImmId, ImmutableMemtable}; @@ -1274,7 +978,7 @@ mod tests { pub trait UploadOutputFuture = Future> + Send + 'static; pub trait UploadFn = - Fn(UploadTaskPayload, UploadTaskInfo) -> Fut + Send + Sync + 'static; + Fn(UploadTaskInput, UploadTaskInfo) -> Fut + Send + Sync + 'static; fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { HummockVersion { @@ -1315,7 +1019,7 @@ mod tests { None, size, TEST_TABLE_ID, - LocalInstanceId::default(), + TEST_LOCAL_INSTANCE_ID, tracker, ) } @@ -1350,11 +1054,9 @@ mod tests { F: UploadFn, { let config = StorageOpts::default(); - let compaction_executor = Arc::new(CompactionExecutor::new(None)); UploaderContext::new( initial_pinned_version(), Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), - default_spawn_merging_task(compaction_executor, None), BufferTracker::for_test(), &config, Arc::new(HummockStateStoreMetrics::unused()), @@ -1367,15 +1069,12 @@ mod tests { F: UploadFn, { let config = StorageOpts { - imm_merge_threshold: 4, ..Default::default() }; - let compaction_executor = Arc::new(CompactionExecutor::new(None)); HummockUploader::new( Arc::new(HummockStateStoreMetrics::unused()), initial_pinned_version(), Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), - default_spawn_merging_task(compaction_executor, None), BufferTracker::for_test(), &config, ) @@ -1391,7 +1090,7 @@ mod tests { #[allow(clippy::unused_async)] async fn dummy_success_upload_future( - _: UploadTaskPayload, + _: UploadTaskInput, _: UploadTaskInfo, ) -> HummockResult { Ok(dummy_success_upload_output()) @@ -1399,22 +1098,42 @@ mod tests { #[allow(clippy::unused_async)] async fn dummy_fail_upload_future( - _: UploadTaskPayload, + _: UploadTaskInput, _: UploadTaskInfo, ) -> HummockResult { Err(HummockError::other("failed")) } + impl UploadingTask { + fn from_vec(imms: Vec, context: &UploaderContext) -> Self { + let mut input: HashMap<_, Vec<_>> = HashMap::new(); + for imm in imms { + input.entry(imm.instance_id).or_default().push(imm); + } + Self::new(input, context) + } + } + + fn get_imm_ids<'a>( + imms: impl IntoIterator, + ) -> HashMap> { + let mut ret: HashMap<_, Vec<_>> = HashMap::new(); + for imm in imms { + ret.entry(imm.instance_id).or_default().push(imm.batch_id()) + } + ret + } + #[tokio::test] pub async fn test_uploading_task_future() { let uploader_context = test_uploader_context(dummy_success_upload_future); let imm = gen_imm(INITIAL_EPOCH).await; let imm_size = imm.size(); - let imm_id = imm.batch_id(); - let task = UploadingTask::new(vec![imm], &uploader_context); + let imm_ids = get_imm_ids(vec![&imm]); + let task = UploadingTask::from_vec(vec![imm], &uploader_context); assert_eq!(imm_size, task.task_info.task_size); - assert_eq!(vec![imm_id], task.task_info.imm_ids); + assert_eq!(imm_ids, task.task_info.imm_ids); assert_eq!(vec![INITIAL_EPOCH], task.task_info.epochs); let output = task.await.unwrap(); assert_eq!( @@ -1422,18 +1141,20 @@ mod tests { &dummy_success_upload_output().new_value_ssts ); assert_eq!(imm_size, output.imm_size()); - assert_eq!(&vec![imm_id], output.imm_ids()); + assert_eq!(&imm_ids, output.imm_ids()); assert_eq!(&vec![INITIAL_EPOCH], output.epochs()); let uploader_context = test_uploader_context(dummy_fail_upload_future); - let task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); + let imm = gen_imm(INITIAL_EPOCH).await; + let task = UploadingTask::from_vec(vec![imm], &uploader_context); let _ = task.await.unwrap_err(); } #[tokio::test] pub async fn test_uploading_task_poll_result() { let uploader_context = test_uploader_context(dummy_success_upload_future); - let mut task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); + let mut task = + UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); let output = poll_fn(|cx| task.poll_result(cx)).await.unwrap(); assert_eq!( output.sstable_infos(), @@ -1441,7 +1162,8 @@ mod tests { ); let uploader_context = test_uploader_context(dummy_fail_upload_future); - let mut task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); + let mut task = + UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); let _ = poll_fn(|cx| task.poll_result(cx)).await.unwrap_err(); } @@ -1463,7 +1185,8 @@ mod tests { ret } }); - let mut task = UploadingTask::new(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); + let mut task = + UploadingTask::from_vec(vec![gen_imm(INITIAL_EPOCH).await], &uploader_context); let output = poll_fn(|cx| task.poll_ok_with_retry(cx)).await; assert_eq!(fail_num + 1, run_count_clone.load(SeqCst)); assert_eq!( @@ -1514,7 +1237,7 @@ mod tests { assert_eq!(1, ssts.len()); let staging_sst = ssts.first().unwrap(); assert_eq!(&vec![epoch1], staging_sst.epochs()); - assert_eq!(&vec![imm.batch_id()], staging_sst.imm_ids()); + assert_eq!(&get_imm_ids([&imm]), staging_sst.imm_ids()); assert_eq!( &dummy_success_upload_output().new_value_ssts, staging_sst.sstable_infos() @@ -1528,7 +1251,7 @@ mod tests { assert_eq!(1, ssts.len()); let staging_sst = ssts.first().unwrap(); assert_eq!(&vec![epoch1], staging_sst.epochs()); - assert_eq!(&vec![imm.batch_id()], staging_sst.imm_ids()); + assert_eq!(&get_imm_ids([&imm]), staging_sst.imm_ids()); assert_eq!( &dummy_success_upload_output().new_value_ssts, staging_sst.sstable_infos() @@ -1543,107 +1266,6 @@ mod tests { assert_eq!(epoch1, uploader.max_committed_epoch()); } - #[tokio::test] - async fn test_uploader_merge_imms_without_flush() { - let mut uploader = test_uploader(dummy_success_upload_future); - let mut all_imms = VecDeque::new(); - // assume a chckpoint consists of 11 epochs - let ckpt_intervals = 11; - let imm_merge_threshold: usize = uploader.imm_merge_threshold(); - - // For each epoch, we gen imm for 2 shards and add them to uploader and seal the epoch - // afterward. check uploader's state after each epoch has been sealed - // When we get IMM_MERGE_THRESHOLD epochs, there should be merging task started for sealed - // data. Then we await the merging task and check the uploader's state again. - let mut merged_imms = VecDeque::new(); - - let mut epoch = INITIAL_EPOCH; - for i in 1..=ckpt_intervals { - epoch.inc_epoch(); - let mut imm1 = gen_imm(epoch).await; - let mut imm2 = gen_imm(epoch).await; - - imm1.instance_id = 1 as LocalInstanceId; - imm2.instance_id = 2 as LocalInstanceId; - - uploader.add_imm(imm1.clone()); - uploader.add_imm(imm2.clone()); - - // newer imm comes in front - all_imms.push_front(imm1); - all_imms.push_front(imm2); - - uploader.seal_epoch(epoch); - - assert_eq!(epoch, uploader.max_sealed_epoch); - // check sealed data has two imms - let imms_by_epoch = uploader.sealed_data.imms_by_epoch(); - if let Some((e, imms)) = imms_by_epoch.last_key_value() - && *e == epoch - { - assert_eq!(2, imms.len()); - } - - let epoch_cnt = i; - - if epoch_cnt < imm_merge_threshold { - assert!(uploader.sealed_data.merging_tasks.is_empty()); - assert!(uploader.sealed_data.spilled_data.is_empty()); - assert_eq!(epoch_cnt, uploader.sealed_data.epochs.len()); - } else { - assert_eq!(epoch_cnt, uploader.sealed_data.epochs.len()); - - let unmerged_imm_cnt: usize = epoch_cnt - imm_merge_threshold * merged_imms.len(); - - if unmerged_imm_cnt < imm_merge_threshold { - continue; - } - - let imms_by_shard = &mut uploader.sealed_data.imms_by_table_shard; - // check shard 1 - if let Some(imms) = imms_by_shard.get(&(TEST_TABLE_ID, 1 as LocalInstanceId)) { - assert_eq!(imm_merge_threshold, imms.len()); - } - - // check shard 2 - if let Some(imms) = imms_by_shard.get(&(TEST_TABLE_ID, 2 as LocalInstanceId)) { - assert_eq!(imm_merge_threshold, imms.len()); - } - - // we have enough sealed imms, start merging task - println!("start merging task for epoch {}", epoch); - uploader.start_merge_imms(epoch); - assert!(!uploader.sealed_data.merging_tasks.is_empty()); - assert!(uploader.sealed_data.spilled_data.is_empty()); - - // check after generate merging task - if let Some(imms) = uploader - .sealed_data - .imms_by_table_shard - .get(&(TEST_TABLE_ID, 1 as LocalInstanceId)) - { - assert_eq!(0, imms.len()); - } - if let Some(imms) = uploader - .sealed_data - .imms_by_table_shard - .get(&(TEST_TABLE_ID, 2 as LocalInstanceId)) - { - assert_eq!(0, imms.len()); - } - - // poll the merging task and check the result - match uploader.next_event().await { - UploaderEvent::ImmMerged(output) => { - println!("merging task success for epoch {}", epoch); - merged_imms.push_front(output.merged_imm); - } - _ => unreachable!(), - }; - } - } - } - #[tokio::test] async fn test_uploader_empty_epoch() { let mut uploader = test_uploader(dummy_success_upload_future); @@ -1675,90 +1297,6 @@ mod tests { assert_eq!(epoch1, uploader.max_committed_epoch()); } - #[tokio::test] - async fn test_drop_success_merging_task() { - let table_id = TableId { table_id: 1004 }; - let shared_buffer_items1: Vec<(Vec, SharedBufferValue)> = vec![ - ( - iterator_test_table_key_of(1), - SharedBufferValue::Insert(Bytes::from("value1")), - ), - ( - iterator_test_table_key_of(2), - SharedBufferValue::Insert(Bytes::from("value2")), - ), - ( - iterator_test_table_key_of(3), - SharedBufferValue::Insert(Bytes::from("value3")), - ), - ]; - let epoch = test_epoch(1); - let imm1 = SharedBufferBatch::for_test( - transform_shared_buffer(shared_buffer_items1.clone()), - epoch, - table_id, - ); - let shared_buffer_items2: Vec<(Vec, SharedBufferValue)> = vec![ - ( - iterator_test_table_key_of(1), - SharedBufferValue::Insert(Bytes::from("value12")), - ), - ( - iterator_test_table_key_of(2), - SharedBufferValue::Insert(Bytes::from("value22")), - ), - ( - iterator_test_table_key_of(3), - SharedBufferValue::Insert(Bytes::from("value32")), - ), - ]; - let epoch = test_epoch(2); - let imm2 = SharedBufferBatch::for_test( - transform_shared_buffer(shared_buffer_items2.clone()), - epoch, - table_id, - ); - - let shared_buffer_items3: Vec<(Vec, SharedBufferValue)> = vec![ - ( - iterator_test_table_key_of(1), - SharedBufferValue::Insert(Bytes::from("value13")), - ), - ( - iterator_test_table_key_of(2), - SharedBufferValue::Insert(Bytes::from("value23")), - ), - ( - iterator_test_table_key_of(3), - SharedBufferValue::Insert(Bytes::from("value33")), - ), - ]; - let epoch = test_epoch(3); - let imm3 = SharedBufferBatch::for_test( - transform_shared_buffer(shared_buffer_items3.clone()), - epoch, - table_id, - ); - - // newer data comes first - let imms = vec![imm3, imm2, imm1]; - let context = test_uploader_context(dummy_success_upload_future); - let mut task = MergingImmTask::new(table_id, 0, imms, None, &context); - let sleep = tokio::time::sleep(Duration::from_millis(500)); - tokio::select! { - _ = sleep => { - println!("sleep timeout") - } - imm = &mut task => { - println!("merging task success"); - assert_eq!(table_id, imm.table_id); - assert_eq!(9, imm.value_count()); - } - } - task.join_handle.abort(); - println!("merging task abort success"); - } - #[tokio::test] async fn test_uploader_poll_empty() { let mut uploader = test_uploader(dummy_success_upload_future); @@ -1816,7 +1354,6 @@ mod tests { assert_eq!(epoch6, epoch); } UploaderEvent::DataSpilled(_) => unreachable!(), - UploaderEvent::ImmMerged(_) => unreachable!(), } uploader.update_pinned_version(version5); assert_eq!(epoch6, uploader.max_synced_epoch); @@ -1827,7 +1364,7 @@ mod tests { fn prepare_uploader_order_test() -> ( BufferTracker, HummockUploader, - impl Fn(Vec) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), + impl Fn(HashMap>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), ) { // flush threshold is 0. Flush anyway let buffer_tracker = @@ -1840,7 +1377,7 @@ mod tests { let new_task_notifier = { let task_notifier_holder = task_notifier_holder.clone(); - move |imm_ids: Vec| { + move |imm_ids: HashMap>| { let (start_tx, start_rx) = oneshot::channel(); let (finish_tx, finish_rx) = oneshot::channel(); task_notifier_holder @@ -1856,12 +1393,11 @@ mod tests { }; let config = StorageOpts::default(); - let compaction_executor = Arc::new(CompactionExecutor::new(None)); let uploader = HummockUploader::new( Arc::new(HummockStateStoreMetrics::unused()), initial_pinned_version(), Arc::new({ - move |_: UploadTaskPayload, task_info: UploadTaskInfo| { + move |_, task_info: UploadTaskInfo| { let task_notifier_holder = task_notifier_holder.clone(); let (start_tx, finish_rx) = task_notifier_holder.lock().pop_back().unwrap(); let start_epoch = *task_info.epochs.last().unwrap(); @@ -1879,7 +1415,6 @@ mod tests { }) } }), - default_spawn_merging_task(compaction_executor, None), buffer_tracker.clone(), &config, ); @@ -1915,9 +1450,8 @@ mod tests { uploader.add_imm(imm1_2.clone()); // imm1 will be spilled first - let (await_start1, finish_tx1) = - new_task_notifier(vec![imm1_2.batch_id(), imm1_1.batch_id()]); - let (await_start2, finish_tx2) = new_task_notifier(vec![imm2.batch_id()]); + let (await_start1, finish_tx1) = new_task_notifier(get_imm_ids([&imm1_2, &imm1_1])); + let (await_start2, finish_tx2) = new_task_notifier(get_imm_ids([&imm2])); uploader.may_flush(); await_start1.await; await_start2.await; @@ -1929,14 +1463,14 @@ mod tests { finish_tx1.send(()).unwrap(); if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { - assert_eq!(&vec![imm1_2.batch_id(), imm1_1.batch_id()], sst.imm_ids()); + assert_eq!(&get_imm_ids([&imm1_2, &imm1_1]), sst.imm_ids()); assert_eq!(&vec![epoch1], sst.epochs()); } else { unreachable!("") } if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { - assert_eq!(&vec![imm2.batch_id()], sst.imm_ids()); + assert_eq!(&get_imm_ids([&imm2]), sst.imm_ids()); assert_eq!(&vec![epoch2], sst.epochs()); } else { unreachable!("") @@ -1944,12 +1478,12 @@ mod tests { let imm1_3 = gen_imm_with_limiter(epoch1, memory_limiter).await; uploader.add_imm(imm1_3.clone()); - let (await_start1_3, finish_tx1_3) = new_task_notifier(vec![imm1_3.batch_id()]); + let (await_start1_3, finish_tx1_3) = new_task_notifier(get_imm_ids([&imm1_3])); uploader.may_flush(); await_start1_3.await; let imm1_4 = gen_imm_with_limiter(epoch1, memory_limiter).await; uploader.add_imm(imm1_4.clone()); - let (await_start1_4, finish_tx1_4) = new_task_notifier(vec![imm1_4.batch_id()]); + let (await_start1_4, finish_tx1_4) = new_task_notifier(get_imm_ids([&imm1_4])); uploader.seal_epoch(epoch1); uploader.start_sync_epoch(epoch1); await_start1_4.await; @@ -1964,12 +1498,12 @@ mod tests { let epoch3 = epoch2.next_epoch(); let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await; uploader.add_imm(imm3_1.clone()); - let (await_start3_1, finish_tx3_1) = new_task_notifier(vec![imm3_1.batch_id()]); + let (await_start3_1, finish_tx3_1) = new_task_notifier(get_imm_ids([&imm3_1])); uploader.may_flush(); await_start3_1.await; let imm3_2 = gen_imm_with_limiter(epoch3, memory_limiter).await; uploader.add_imm(imm3_2.clone()); - let (await_start3_2, finish_tx3_2) = new_task_notifier(vec![imm3_2.batch_id()]); + let (await_start3_2, finish_tx3_2) = new_task_notifier(get_imm_ids([&imm3_2])); uploader.may_flush(); await_start3_2.await; let imm3_3 = gen_imm_with_limiter(epoch3, memory_limiter).await; @@ -2000,8 +1534,8 @@ mod tests { if let UploaderEvent::SyncFinish(epoch, newly_upload_sst) = uploader.next_event().await { assert_eq!(epoch1, epoch); assert_eq!(2, newly_upload_sst.len()); - assert_eq!(&vec![imm1_4.batch_id()], newly_upload_sst[0].imm_ids()); - assert_eq!(&vec![imm1_3.batch_id()], newly_upload_sst[1].imm_ids()); + assert_eq!(&get_imm_ids([&imm1_4]), newly_upload_sst[0].imm_ids()); + assert_eq!(&get_imm_ids([&imm1_3]), newly_upload_sst[1].imm_ids()); } else { unreachable!("should be sync finish"); } @@ -2013,12 +1547,9 @@ mod tests { .unwrap() .staging_ssts; assert_eq!(3, synced_data1.len()); - assert_eq!(&vec![imm1_4.batch_id()], synced_data1[0].imm_ids()); - assert_eq!(&vec![imm1_3.batch_id()], synced_data1[1].imm_ids()); - assert_eq!( - &vec![imm1_2.batch_id(), imm1_1.batch_id()], - synced_data1[2].imm_ids() - ); + assert_eq!(&get_imm_ids([&imm1_4]), synced_data1[0].imm_ids()); + assert_eq!(&get_imm_ids([&imm1_3]), synced_data1[1].imm_ids()); + assert_eq!(&get_imm_ids([&imm1_2, &imm1_1]), synced_data1[2].imm_ids()); // current uploader state: // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1] @@ -2042,7 +1573,7 @@ mod tests { .unwrap() .staging_ssts; assert_eq!(1, synced_data2.len()); - assert_eq!(&vec![imm2.batch_id()], synced_data2[0].imm_ids()); + assert_eq!(&get_imm_ids([&imm2]), synced_data2[0].imm_ids()); // current uploader state: // unsealed: epoch3: imm: imm3_3, uploading: [imm3_2], [imm3_1] @@ -2054,7 +1585,7 @@ mod tests { uploader.seal_epoch(epoch3); if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { - assert_eq!(&vec![imm3_1.batch_id()], sst.imm_ids()); + assert_eq!(&get_imm_ids([&imm3_1]), sst.imm_ids()); } else { unreachable!("should be data spilled"); } @@ -2068,7 +1599,7 @@ mod tests { uploader.seal_epoch(epoch4); let (await_start4_with_3_3, finish_tx4_with_3_3) = - new_task_notifier(vec![imm4.batch_id(), imm3_3.batch_id()]); + new_task_notifier(get_imm_ids([&imm4, &imm3_3])); uploader.start_sync_epoch(epoch4); await_start4_with_3_3.await; @@ -2088,10 +1619,10 @@ mod tests { assert_eq!(epoch4, epoch); assert_eq!(2, newly_upload_sst.len()); assert_eq!( - &vec![imm4.batch_id(), imm3_3.batch_id()], + &get_imm_ids([&imm4, &imm3_3]), newly_upload_sst[0].imm_ids() ); - assert_eq!(&vec![imm3_2.batch_id()], newly_upload_sst[1].imm_ids()); + assert_eq!(&get_imm_ids([&imm3_2]), newly_upload_sst[1].imm_ids()); } else { unreachable!("should be sync finish"); } @@ -2104,12 +1635,9 @@ mod tests { .staging_ssts; assert_eq!(3, synced_data4.len()); assert_eq!(&vec![epoch4, epoch3], synced_data4[0].epochs()); - assert_eq!( - &vec![imm4.batch_id(), imm3_3.batch_id()], - synced_data4[0].imm_ids() - ); - assert_eq!(&vec![imm3_2.batch_id()], synced_data4[1].imm_ids()); - assert_eq!(&vec![imm3_1.batch_id()], synced_data4[2].imm_ids()); + assert_eq!(&get_imm_ids([&imm4, &imm3_3]), synced_data4[0].imm_ids()); + assert_eq!(&get_imm_ids([&imm3_2]), synced_data4[1].imm_ids()); + assert_eq!(&get_imm_ids([&imm3_1]), synced_data4[2].imm_ids()); // current uploader state: // unsealed: empty diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index ecffa11871000..4df3b8bc20854 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -578,10 +578,11 @@ impl SharedBufferBatch { ) -> Self { let inner = SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None); + use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID; SharedBufferBatch { inner: Arc::new(inner), table_id, - instance_id: LocalInstanceId::default(), + instance_id: TEST_LOCAL_INSTANCE_ID, } } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 9f94895e2bba7..50ce72746c435 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -14,7 +14,7 @@ use std::cmp::Ordering; use std::collections::vec_deque::VecDeque; -use std::collections::HashSet; +use std::collections::HashMap; use std::iter::once; use std::ops::Bound::Included; use std::sync::Arc; @@ -39,17 +39,19 @@ use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::{EpochNewChangeLog, LevelType, SstableInfo}; use sync_point::sync_point; +use tracing::warn; use super::StagingDataIterator; use crate::error::StorageResult; +use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::iterator::change_log::ChangeLogIterator; use crate::hummock::iterator::{ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::utils::{ - check_subset_preserve_order, filter_single_sst, prune_nonoverlapping_ssts, - prune_overlapping_ssts, range_overlap, search_sst_idx, + filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap, + search_sst_idx, }; use crate::hummock::{ get_from_batch, get_from_sstable_info, hit_sstable_bloom_filter, HummockError, HummockResult, @@ -77,7 +79,8 @@ pub struct StagingSstableInfo { /// Epochs whose data are included in the Sstable. The newer epoch comes first. /// The field must not be empty. epochs: Vec, - imm_ids: Vec, + // newer data at the front + imm_ids: HashMap>, imm_size: usize, } @@ -86,7 +89,7 @@ impl StagingSstableInfo { sstable_infos: Vec, old_value_sstable_infos: Vec, epochs: Vec, - imm_ids: Vec, + imm_ids: HashMap>, imm_size: usize, ) -> Self { // the epochs are sorted from higher epoch to lower epoch @@ -116,7 +119,7 @@ impl StagingSstableInfo { &self.epochs } - pub fn imm_ids(&self) -> &Vec { + pub fn imm_ids(&self) -> &HashMap> { &self.imm_ids } } @@ -124,7 +127,6 @@ impl StagingSstableInfo { #[derive(Clone)] pub enum StagingData { ImmMem(ImmutableMemtable), - MergedImmMem(ImmutableMemtable, Vec), Sst(Arc), } @@ -208,6 +210,7 @@ impl StagingVersion { /// A container of information required for reading from hummock. pub struct HummockReadVersion { table_id: TableId, + instance_id: LocalInstanceId, /// Local version for staging data. staging: StagingVersion, @@ -231,6 +234,7 @@ pub struct HummockReadVersion { impl HummockReadVersion { pub fn new_with_replication_option( table_id: TableId, + instance_id: LocalInstanceId, committed_version: CommittedVersion, is_replicated: bool, vnodes: Arc, @@ -241,6 +245,7 @@ impl HummockReadVersion { assert!(committed_version.is_valid()); Self { table_id, + instance_id, table_watermarks: committed_version .version() .table_watermarks @@ -265,10 +270,11 @@ impl HummockReadVersion { pub fn new( table_id: TableId, + instance_id: LocalInstanceId, committed_version: CommittedVersion, vnodes: Arc, ) -> Self { - Self::new_with_replication_option(table_id, committed_version, false, vnodes) + Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes) } pub fn table_id(&self) -> TableId { @@ -297,77 +303,41 @@ impl HummockReadVersion { self.staging.imm.push_front(imm) } - StagingData::MergedImmMem(merged_imm, imm_ids) => { - self.add_merged_imm(merged_imm, imm_ids); - } StagingData::Sst(staging_sst_ref) => { - // The following properties must be ensured: - // 1) self.staging.imm is sorted by imm id descendingly - // 2) staging_sst.imm_ids preserves the imm id partial - // ordering of the participating read version imms. Example: - // If staging_sst contains two read versions r1: [i1, i3] and r2: [i2, i4], - // then [i2, i1, i3, i4] is valid while [i3, i1, i2, i4] is invalid. - // 3) The intersection between staging_sst.imm_ids and self.staging.imm - // are always the suffix of self.staging.imm - - // Check 1) - debug_assert!(self - .staging - .imm - .iter() - .rev() - .is_sorted_by_key(|imm| imm.batch_id())); - - // Calculate intersection - let staging_imm_ids_from_imms: HashSet = - self.staging.imm.iter().map(|imm| imm.batch_id()).collect(); - - // intersected batch_id order from oldest to newest - let intersect_imm_ids = staging_sst_ref - .imm_ids - .iter() - .rev() - .copied() - .filter(|id| staging_imm_ids_from_imms.contains(id)) - .collect_vec(); - - if !intersect_imm_ids.is_empty() { - // Check 2) - debug_assert!(check_subset_preserve_order( - intersect_imm_ids.iter().copied(), - self.staging.imm.iter().map(|imm| imm.batch_id()).rev(), - )); - - // Check 3) and replace imms with a staging sst - for imm_id in &intersect_imm_ids { - if let Some(imm) = self.staging.imm.back() { - if *imm_id == imm.batch_id() { - self.staging.imm.pop_back(); - } - } else { - let local_imm_ids = self - .staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec(); - - unreachable!( - "should not reach here staging_sst.size {}, + let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else { + warn!( + instance_id = self.instance_id, + "no related imm in sst input" + ); + return; + }; + + // old data comes first + for imm_id in imms.iter().rev() { + let valid = match self.staging.imm.pop_back() { + None => false, + Some(prev_imm_id) => prev_imm_id.batch_id() == *imm_id, + }; + assert!( + valid, + "should be valid staging_sst.size {}, staging_sst.imm_ids {:?}, staging_sst.epochs {:?}, local_imm_ids {:?}, - intersect_imm_ids {:?}", - staging_sst_ref.imm_size, - staging_sst_ref.imm_ids, - staging_sst_ref.epochs, - local_imm_ids, - intersect_imm_ids, - ); - } - } - self.staging.sst.push_front(staging_sst_ref); + instance_id {}", + staging_sst_ref.imm_size, + staging_sst_ref.imm_ids, + staging_sst_ref.epochs, + self.staging + .imm + .iter() + .map(|imm| imm.batch_id()) + .collect_vec(), + self.instance_id, + ); } + + self.staging.sst.push_front(staging_sst_ref); } }, @@ -445,92 +415,6 @@ impl HummockReadVersion { } } - /// `imm_ids` is the list of imm ids that are merged into this batch - /// This field is immutable. Larger imm id at the front. - pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable, imm_ids: Vec) { - assert!(imm_ids.iter().rev().is_sorted()); - let min_imm_id = *imm_ids.last().expect("non-empty"); - - let back = self.staging.imm.back().expect("should not be empty"); - - // pop and save imms that are written earlier than the oldest imm if there is any - let earlier_imms = if back.batch_id() < min_imm_id { - let mut earlier_imms = VecDeque::with_capacity(self.staging.imm.len()); - loop { - let batch_id = self - .staging - .imm - .back() - .expect("should not be empty") - .batch_id(); - match batch_id.cmp(&min_imm_id) { - Ordering::Less => { - let imm = self.staging.imm.pop_back().unwrap(); - earlier_imms.push_front(imm); - } - Ordering::Equal => { - break; - } - Ordering::Greater => { - let remaining_staging_imm_ids = self - .staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec(); - let earlier_imm_ids = - earlier_imms.iter().map(|imm| imm.batch_id()).collect_vec(); - - unreachable!( - "must have break in equal: {:?} {:?} {:?}", - remaining_staging_imm_ids, earlier_imm_ids, imm_ids - ) - } - } - } - Some(earlier_imms) - } else { - assert_eq!( - back.batch_id(), - min_imm_id, - "{:?} {:?}", - { - self.staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec() - }, - imm_ids - ); - None - }; - - // iter from smaller imm and take the older imm at the back. - for imm_id in imm_ids.iter().rev() { - let imm = self.staging.imm.pop_back().expect("should exist"); - assert_eq!( - imm.batch_id(), - *imm_id, - "{:?} {:?} {}", - { - self.staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec() - }, - imm_ids, - imm_id, - ); - } - - self.staging.imm.push_back(merged_imm); - if let Some(earlier_imms) = earlier_imms { - self.staging.imm.extend(earlier_imms); - } - } - pub fn is_replicated(&self) -> bool { self.is_replicated } diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 8feaa10cbfe88..85d8d5c772a06 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -39,8 +39,6 @@ pub struct StorageOpts { /// The shared buffer will start flushing data to object when the ratio of memory usage to the /// shared buffer capacity exceed such ratio. pub shared_buffer_flush_ratio: f32, - /// The threshold for the number of immutable memtables to merge to a new imm. - pub imm_merge_threshold: usize, /// Remote directory for storing data and metadata objects. pub data_directory: String, /// Whether to enable write conflict detection @@ -160,7 +158,6 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt .share_buffer_compaction_worker_threads_number, shared_buffer_capacity_mb: s.shared_buffer_capacity_mb, shared_buffer_flush_ratio: c.storage.shared_buffer_flush_ratio, - imm_merge_threshold: c.storage.imm_merge_threshold, data_directory: p.data_directory().to_string(), write_conflict_detection_enabled: c.storage.write_conflict_detection_enabled, block_cache_capacity_mb: s.block_cache_capacity_mb,