diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 6962fc506ccfb..5c6e1607de926 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -530,6 +530,9 @@ async fn test_state_store_sync() { .await .unwrap(); + let epoch3 = epoch2.next_epoch(); + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env .meta_client @@ -829,14 +832,15 @@ async fn test_delete_get() { .await .unwrap(); + let epoch2 = epoch1.next_epoch(); + hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap(); test_env .meta_client .commit_epoch(epoch1, res) .await .unwrap(); - let epoch2 = epoch1.next_epoch(); - hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), @@ -851,6 +855,7 @@ async fn test_delete_get() { ) .await .unwrap(); + hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); test_env .meta_client @@ -1005,6 +1010,8 @@ async fn test_multiple_epoch_sync() { }; test_get().await; + let epoch4 = epoch3.next_epoch(); + hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test()); test_env.storage.seal_epoch(epoch1, false); let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap(); @@ -1079,6 +1086,9 @@ async fn test_iter_with_min_epoch() { .await .unwrap(); + let epoch3 = (33 * 1000) << 16; + hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + { // test before sync { @@ -1329,6 +1339,9 @@ async fn test_hummock_version_reader() { .await .unwrap(); + let epoch4 = (34 * 1000) << 16; + hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test()); + { // test before sync { @@ -1739,6 +1752,8 @@ async fn test_get_with_min_epoch() { .await .unwrap(); + hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + { // test before sync let k = gen_key(0); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 4e14e006f009f..fe129deb52245 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1046,11 +1046,12 @@ async fn test_delete_get_v2() { ) .await .unwrap(); - let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); - meta_client.commit_epoch(epoch1, res).await.unwrap(); let epoch2 = epoch1.next_epoch(); local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap(); + meta_client.commit_epoch(epoch1, res).await.unwrap(); + let batch2 = vec![( gen_key_from_str(VirtualNode::ZERO, "bb"), StorageValue::new_delete(), diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index c01a97563237e..277984d3545dd 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -196,7 +196,7 @@ 
pub struct HummockEventHandler { version_update_rx: UnboundedReceiver, read_version_mapping: Arc>, /// A copy of `read_version_mapping` but owned by event handler - local_read_version_mapping: HashMap, + local_read_version_mapping: HashMap, version_update_notifier_tx: Arc>, pinned_version: Arc>, @@ -455,7 +455,7 @@ impl HummockEventHandler { let mut pending = VecDeque::new(); let mut total_count = 0; for instance_id in instances { - let Some(read_version) = self.local_read_version_mapping.get(&instance_id) else { + let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else { continue; }; total_count += 1; @@ -475,7 +475,7 @@ impl HummockEventHandler { const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1); while let Some(instance_id) = pending.pop_front() { - let read_version = self + let (_, read_version) = self .local_read_version_mapping .get(&instance_id) .expect("have checked exist before"); @@ -520,7 +520,6 @@ impl HummockEventHandler { prev_epoch, max_committed_epoch = self.uploader.max_committed_epoch(), max_synced_epoch = self.uploader.max_synced_epoch(), - max_sealed_epoch = self.uploader.max_sealed_epoch(), "handle clear event" ); @@ -588,7 +587,7 @@ impl HummockEventHandler { "read version mapping not empty when clear. remaining tables: {:?}", self.local_read_version_mapping .values() - .map(|read_version| read_version.read().table_id()) + .map(|(_, read_version)| read_version.read().table_id()) .collect_vec() ); @@ -784,6 +783,18 @@ impl HummockEventHandler { HummockEvent::Shutdown => { unreachable!("shutdown is handled specially") } + HummockEvent::InitEpoch { + instance_id, + init_epoch, + } => { + let table_id = self + .local_read_version_mapping + .get(&instance_id) + .expect("should exist") + .0; + self.uploader + .init_instance(instance_id, table_id, init_epoch); + } HummockEvent::ImmToUploader { instance_id, imm } => { assert!( self.local_read_version_mapping.contains_key(&instance_id), @@ -795,29 +806,13 @@ impl HummockEventHandler { self.uploader.may_flush(); } - HummockEvent::SealEpoch { - epoch, - is_checkpoint: _, - } => { - self.uploader.seal_epoch(epoch); - } - HummockEvent::LocalSealEpoch { - epoch, + next_epoch, opts, - table_id, instance_id, } => { - assert!( - self.local_read_version_mapping - .contains_key(&instance_id), - "seal epoch from non-existing read version instance: instance_id: {}, table_id: {}, epoch: {}", - instance_id, table_id, epoch, - ); - if let Some((direction, watermarks)) = opts.table_watermarks { - self.uploader - .add_table_watermarks(epoch, table_id, watermarks, direction) - } + self.uploader + .local_seal_epoch(instance_id, next_epoch, opts); } #[cfg(any(test, feature = "test"))] @@ -852,7 +847,7 @@ impl HummockEventHandler { { self.local_read_version_mapping - .insert(instance_id, basic_read_version.clone()); + .insert(instance_id, (table_id, basic_read_version.clone())); let mut read_version_mapping_guard = self.read_version_mapping.write(); read_version_mapping_guard @@ -876,33 +871,29 @@ impl HummockEventHandler { table_id, instance_id ); guard.event_sender.take().expect("sender is just set"); - self.destroy_read_version(table_id, instance_id); + self.destroy_read_version(instance_id); } } } - HummockEvent::DestroyReadVersion { - table_id, - instance_id, - } => { - self.destroy_read_version(table_id, instance_id); + HummockEvent::DestroyReadVersion { instance_id } => { + self.uploader.may_destroy_instance(instance_id); + self.destroy_read_version(instance_id); } } } - fn destroy_read_version(&mut 
self, table_id: TableId, instance_id: LocalInstanceId) { + fn destroy_read_version(&mut self, instance_id: LocalInstanceId) { { { - debug!( - "read version deregister: table_id: {}, instance_id: {}", - table_id, instance_id - ); - self.local_read_version_mapping + debug!("read version deregister: instance_id: {}", instance_id); + let (table_id, _) = self + .local_read_version_mapping .remove(&instance_id) .unwrap_or_else(|| { panic!( - "DestroyHummockInstance inexist instance table_id {} instance_id {}", - table_id, instance_id + "DestroyHummockInstance on non-existent instance: instance_id {}", + instance_id ) }); let mut read_version_mapping_guard = self.read_version_mapping.write(); @@ -994,6 +985,7 @@ mod tests { use crate::hummock::test_utils::default_opts_for_test; use crate::hummock::HummockError; use crate::monitor::HummockStateStoreMetrics; + use crate::store::SealCurrentEpochOptions; #[tokio::test] async fn test_clear_shared_buffer() { @@ -1197,6 +1189,11 @@ rx.await.unwrap() }; + send_event(HummockEvent::InitEpoch { + instance_id: guard.instance_id, + init_epoch: epoch1, + }); + let imm1 = gen_imm(epoch1).await; read_version .write() @@ -1207,6 +1204,12 @@ imm: imm1, }); + send_event(HummockEvent::LocalSealEpoch { + instance_id: guard.instance_id, + next_epoch: epoch2, + opts: SealCurrentEpochOptions::for_test(), + }); + let imm2 = gen_imm(epoch2).await; read_version .write() @@ -1217,20 +1220,19 @@ imm: imm2, }); - send_event(HummockEvent::SealEpoch { - epoch: epoch1, - is_checkpoint: true, + let epoch3 = epoch2.next_epoch(); + send_event(HummockEvent::LocalSealEpoch { + instance_id: guard.instance_id, + next_epoch: epoch3, + opts: SealCurrentEpochOptions::for_test(), }); + let (tx1, mut rx1) = oneshot::channel(); send_event(HummockEvent::SyncEpoch { new_sync_epoch: epoch1, sync_result_sender: tx1, }); assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await); - send_event(HummockEvent::SealEpoch { - epoch: epoch2, - is_checkpoint: true, - }); let (tx2, mut rx2) = oneshot::channel(); send_event(HummockEvent::SyncEpoch { new_sync_epoch: epoch2, diff --git a/src/storage/src/hummock/event_handler/mod.rs index c28dd6d25c3a4..bbf69ae194f72 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -72,15 +72,14 @@ pub enum HummockEvent { imm: ImmutableMemtable, }, - SealEpoch { - epoch: HummockEpoch, - is_checkpoint: bool, + InitEpoch { + instance_id: LocalInstanceId, + init_epoch: HummockEpoch, }, LocalSealEpoch { instance_id: LocalInstanceId, - table_id: TableId, - epoch: HummockEpoch, + next_epoch: HummockEpoch, opts: SealCurrentEpochOptions, }, @@ -97,7 +96,6 @@ pub enum HummockEvent { }, DestroyReadVersion { - table_id: TableId, instance_id: LocalInstanceId, }, } @@ -116,27 +114,25 @@ impl HummockEvent { HummockEvent::Shutdown => "Shutdown".to_string(), + HummockEvent::InitEpoch { + instance_id, + init_epoch, + } => { + format!("InitEpoch {} {}", instance_id, init_epoch) + } + HummockEvent::ImmToUploader { instance_id, imm } => { format!("ImmToUploader {} {}", instance_id, imm.batch_id()) } - HummockEvent::SealEpoch { - epoch, - is_checkpoint, - } => format!( - "SealEpoch epoch {:?} is_checkpoint {:?}", - epoch, is_checkpoint - ), - HummockEvent::LocalSealEpoch { - epoch, instance_id, - table_id, + next_epoch, opts, } => { format!( - "LocalSealEpoch epoch: {}, table_id: {}, instance_id: {}, opts: {:?}", - epoch, table_id.table_id,
instance_id, opts + "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}", + next_epoch, instance_id, opts ) } @@ -150,13 +146,9 @@ impl HummockEvent { table_id, is_replicated ), - HummockEvent::DestroyReadVersion { - table_id, - instance_id, - } => format!( - "DestroyReadVersion table_id {:?} instance_id {:?}", - table_id, instance_id - ), + HummockEvent::DestroyReadVersion { instance_id } => { + format!("DestroyReadVersion instance_id {:?}", instance_id) + } #[cfg(any(test, feature = "test"))] HummockEvent::FlushEvent(_) => "FlushEvent".to_string(), @@ -210,7 +202,6 @@ impl Drop for LocalInstanceGuard { // need to handle failure sender .send(HummockEvent::DestroyReadVersion { - table_id: self.table_id, instance_id: self.instance_id, }) .unwrap_or_else(|err| { diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 1feaf017b2f02..f768aa23dcd89 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; +use std::cmp::Ordering; +use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; use std::future::{poll_fn, Future}; -use std::mem::{replace, take}; +use std::mem::{replace, swap, take}; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -38,7 +39,7 @@ use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo, use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::task::JoinHandle; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::hummock::event_handler::hummock_event_handler::{send_sync_result, BufferTracker}; use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; @@ -50,6 +51,17 @@ use crate::hummock::{HummockError, HummockResult, ImmutableMemtable}; use crate::mem_table::ImmId; use crate::monitor::HummockStateStoreMetrics; use crate::opts::StorageOpts; +use crate::store::SealCurrentEpochOptions; + +/// Take epoch data inclusively before `epoch` out from `data` +fn take_before_epoch( + data: &mut BTreeMap, + epoch: HummockEpoch, +) -> BTreeMap { + let mut before_epoch_data = data.split_off(&(epoch + 1)); + swap(&mut before_epoch_data, data); + before_epoch_data +} type UploadTaskInput = HashMap>; pub type UploadTaskPayload = HashMap>; @@ -329,11 +341,6 @@ struct SpilledData { } impl SpilledData { - #[cfg(test)] - fn is_empty(&self) -> bool { - self.uploading_tasks.is_empty() && self.uploaded_data.is_empty() - } - fn add_task(&mut self, task: UploadingTask) { self.uploading_tasks.push_front(task); } @@ -360,20 +367,16 @@ impl SpilledData { } #[derive(Default, Debug)] -struct UnsealedEpochData { - // newer data at the front - imms: HashMap>, +struct EpochData { spilled_data: SpilledData, - - table_watermarks: HashMap, BitmapBuilder)>, } -impl UnsealedEpochData { - fn flush(&mut self, context: &UploaderContext) -> usize { - let imms: HashMap<_, _> = take(&mut self.imms) - .into_iter() - .map(|(id, imms)| (id, imms.into_iter().collect_vec())) - .collect(); +impl EpochData { + fn flush( + &mut self, + context: &UploaderContext, + imms: HashMap>, + ) -> usize { if !imms.is_empty() { let task = UploadingTask::new(imms, context); context.stats.spill_task_counts_from_unsealed.inc(); @@ -389,10 +392,12 @@ impl 
UnsealedEpochData { 0 } } +} +impl TableUnsyncData { fn add_table_watermarks( &mut self, - table_id: TableId, + epoch: HummockEpoch, table_watermarks: Vec, direction: WatermarkDirection, ) { @@ -411,45 +416,50 @@ impl UnsealedEpochData { } } } - match self.table_watermarks.entry(table_id) { - Entry::Occupied(mut entry) => { - let (prev_direction, prev_watermarks, vnode_bitmap) = entry.get_mut(); + match &mut self.table_watermarks { + Some((prev_direction, prev_watermarks)) => { assert_eq!( *prev_direction, direction, "table id {} new watermark direction not match with previous", - table_id + self.table_id ); - apply_new_vnodes(vnode_bitmap, &table_watermarks); - prev_watermarks.extend(table_watermarks); + match prev_watermarks.entry(epoch) { + Entry::Occupied(mut entry) => { + let (prev_watermarks, vnode_bitmap) = entry.get_mut(); + apply_new_vnodes(vnode_bitmap, &table_watermarks); + prev_watermarks.extend(table_watermarks); + } + Entry::Vacant(entry) => { + let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); + entry.insert((table_watermarks, vnode_bitmap)); + } + } } - Entry::Vacant(entry) => { + None => { let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); - entry.insert((direction, table_watermarks, vnode_bitmap)); + self.table_watermarks = Some(( + direction, + BTreeMap::from_iter([(epoch, (table_watermarks, vnode_bitmap))]), + )); } } } } #[derive(Default)] -/// Data at the sealed stage. We will ensure that data in `imms` are newer than the data in the -/// `spilled_data`, and that data in the `uploading_tasks` in `spilled_data` are newer than data in -/// the `uploaded_data` in `spilled_data`. -struct SealedData { +struct SyncDataBuilder { // newer epochs come first epochs: VecDeque, - // Sealed imms grouped by table shard. - // newer data (larger imm id) at the front - imms_by_table_shard: HashMap>, - spilled_data: SpilledData, table_watermarks: HashMap, } -impl SealedData { - /// Add the data of a newly sealed epoch. +impl SyncDataBuilder { + /// Add the data of a new epoch. /// /// Note: it may happen that, for example, currently we hold `imms` and `spilled_data` of epoch /// 3, and after we add the spilled data of epoch 4, both `imms` and `spilled_data` hold data @@ -459,9 +469,9 @@ impl SealedData { /// data of `imms` must not overlap with the epoch 4 data of `spilled_data`. The explanation is /// as followed: /// - /// First, unsealed data has 3 stages, from earlier to later, imms, uploading task, and - /// uploaded. When we try to spill unsealed data, we first pick the imms of older epoch until - /// the imms of older epoch are all picked. When we try to poll the uploading tasks of unsealed + /// First, unsync data has 3 stages, from earlier to later, imms, uploading task, and + /// uploaded. When we try to spill unsync data, we first pick the imms of older epoch until + /// the imms of older epoch are all picked. When we try to poll the uploading tasks of unsync /// data, we first poll the task of older epoch, until there is no uploading task in older /// epoch. 
Therefore, we can reach that, if two data are in the same stage, but /// different epochs, data in the older epoch will always enter the next stage earlier than data @@ -475,28 +485,19 @@ /// Based on the two points above, we can reach that, if two data of a same key appear in /// different epochs, the data of older epoch will not appear at a later stage than the data /// of newer epoch. Therefore, we can safely merge the data of each stage when we seal an epoch. - fn seal_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: UnsealedEpochData) { - if let Some(prev_max_sealed_epoch) = self.epochs.front() { + fn add_new_epoch(&mut self, epoch: HummockEpoch, mut unseal_epoch_data: EpochData) { + if let Some(prev_max_epoch) = self.epochs.front() { assert!( - epoch > *prev_max_sealed_epoch, - "epoch {} to seal not greater than prev max sealed epoch {}", + epoch > *prev_max_epoch, + "epoch {} to seal not greater than prev max epoch {}", epoch, - prev_max_sealed_epoch + prev_max_epoch ); } - // rearrange sealed imms by table shard and in epoch descending order - for (instance_id, imms) in unseal_epoch_data.imms { - let queue = self.imms_by_table_shard.entry(instance_id).or_default(); - for imm in imms.into_iter().rev() { - if let Some(front) = queue.front() { - assert_gt!(imm.batch_id(), front.batch_id()); - } - queue.push_front(imm); - } - } - self.epochs.push_front(epoch); + // for each local instance, earlier data must be spilled at earlier epoch. Therefore, since we add spill data from old epoch + // to new epoch, the spilled data of the newly added epoch is always newer and simply goes to the front of the queue. unseal_epoch_data .spilled_data .uploading_tasks .append(&mut self.spilled_data.uploading_tasks); unseal_epoch_data .spilled_data .uploaded_data .append(&mut self.spilled_data.uploaded_data); self.spilled_data.uploading_tasks = unseal_epoch_data.spilled_data.uploading_tasks; self.spilled_data.uploaded_data = unseal_epoch_data.spilled_data.uploaded_data; - for (table_id, (direction, watermarks, _)) in unseal_epoch_data.table_watermarks { - match self.table_watermarks.entry(table_id) { - Entry::Occupied(mut entry) => { - entry.get_mut().add_new_epoch_watermarks( + } + + fn add_table_watermarks( + &mut self, + table_id: TableId, + direction: WatermarkDirection, + watermarks: impl Iterator)>, + ) { + let mut table_watermarks: Option = None; + for (epoch, watermarks) in watermarks { + match &mut table_watermarks { + Some(prev_watermarks) => { + prev_watermarks.add_new_epoch_watermarks( epoch, Arc::from(watermarks), direction, ); } - Entry::Vacant(entry) => { - entry.insert(TableWatermarks::single_epoch(epoch, watermarks, direction)); + None => { + table_watermarks = + Some(TableWatermarks::single_epoch(epoch, watermarks, direction)); } - }; + } + } + if let Some(table_watermarks) = table_watermarks { + assert!(self + .table_watermarks + .insert(table_id, table_watermarks) + .is_none()); } } - // Flush can be triggered by either a sync_epoch or a spill (`may_flush`) request. - fn flush(&mut self, context: &UploaderContext, is_spilled: bool) -> usize { - let payload: HashMap<_, Vec<_>> = take(&mut self.imms_by_table_shard) - .into_iter() - .map(|(id, imms)| (id, imms.into_iter().collect())) - .collect(); - + fn flush(&mut self, context: &UploaderContext, payload: UploadTaskInput) { if !payload.is_empty() { let task = UploadingTask::new(payload, context); - let size = task.task_info.task_size; - if is_spilled { - context.stats.spill_task_counts_from_sealed.inc(); - context - .stats - .spill_task_size_from_sealed - .inc_by(task.task_info.task_size as u64); - info!("Spill sealed data. 
Task: {}", task.get_task_info()); - } self.spilled_data.add_task(task); - size + } + } +} + +struct LocalInstanceEpochData { + epoch: HummockEpoch, + // newer data comes first. + imms: VecDeque, + has_spilled: bool, +} + +impl LocalInstanceEpochData { + fn new(epoch: HummockEpoch) -> Self { + Self { + epoch, + imms: VecDeque::new(), + has_spilled: false, + } + } + + fn epoch(&self) -> HummockEpoch { + self.epoch + } + + fn add_imm(&mut self, imm: UploaderImm) { + assert_eq!(imm.max_epoch(), imm.min_epoch()); + assert_eq!(self.epoch, imm.min_epoch()); + if let Some(prev_imm) = self.imms.front() { + assert_gt!(imm.batch_id(), prev_imm.batch_id()); + } + self.imms.push_front(imm); + } + + fn is_empty(&self) -> bool { + self.imms.is_empty() + } +} + +struct LocalInstanceUnsyncData { + table_id: TableId, + instance_id: LocalInstanceId, + // None means that the current instance should have stopped advancing + current_epoch_data: Option, + // newer data comes first. + sealed_data: VecDeque, + // newer data comes first + flushing_imms: VecDeque, +} + +impl LocalInstanceUnsyncData { + fn new(table_id: TableId, instance_id: LocalInstanceId, init_epoch: HummockEpoch) -> Self { + Self { + table_id, + instance_id, + current_epoch_data: Some(LocalInstanceEpochData::new(init_epoch)), + sealed_data: VecDeque::new(), + flushing_imms: Default::default(), + } + } + + fn add_imm(&mut self, imm: UploaderImm) { + assert_eq!(self.table_id, imm.table_id); + self.current_epoch_data + .as_mut() + .expect("should be Some when adding new imm") + .add_imm(imm); + } + + fn local_seal_epoch(&mut self, next_epoch: HummockEpoch) -> HummockEpoch { + let data = self + .current_epoch_data + .as_mut() + .expect("should be Some when seal new epoch"); + let current_epoch = data.epoch; + debug!( + instance_id = self.instance_id, + next_epoch, current_epoch, "local seal epoch" + ); + assert_gt!(next_epoch, current_epoch); + let epoch_data = replace(data, LocalInstanceEpochData::new(next_epoch)); + if !epoch_data.is_empty() { + self.sealed_data.push_front(epoch_data); + } + current_epoch + } + + // imm_ids from old to new, which means in ascending order + fn ack_flushed(&mut self, imm_ids: impl Iterator) { + for imm_id in imm_ids { + assert_eq!(self.flushing_imms.pop_back().expect("should exist"), imm_id); + } + } + + fn spill(&mut self, epoch: HummockEpoch) -> Vec { + let imms = if let Some(oldest_sealed_epoch) = self.sealed_data.back() { + match oldest_sealed_epoch.epoch.cmp(&epoch) { + Ordering::Less => { + unreachable!( + "should not spill at this epoch because there \ + is unspilled data in previous epoch: prev epoch {}, spill epoch {}", + oldest_sealed_epoch.epoch, epoch + ); + } + Ordering::Equal => { + let epoch_data = self.sealed_data.pop_back().unwrap(); + assert_eq!(epoch, epoch_data.epoch); + epoch_data.imms + } + Ordering::Greater => VecDeque::new(), + } } else { - 0 + let Some(current_epoch_data) = &mut self.current_epoch_data else { + return Vec::new(); + }; + match current_epoch_data.epoch.cmp(&epoch) { + Ordering::Less => { + assert!( + current_epoch_data.imms.is_empty(), + "should not spill at this epoch because there \ + is unspilled data in current epoch epoch {}, spill epoch {}", + current_epoch_data.epoch, + epoch + ); + VecDeque::new() + } + Ordering::Equal => { + if !current_epoch_data.imms.is_empty() { + current_epoch_data.has_spilled = true; + take(&mut current_epoch_data.imms) + } else { + VecDeque::new() + } + } + Ordering::Greater => VecDeque::new(), + } + }; + 
self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id())); + imms.into_iter().collect() + } + + fn add_flushing_imm(&mut self, imm_ids: impl Iterator) { + for imm_id in imm_ids { + if let Some(prev_imm_id) = self.flushing_imms.front() { + assert_gt!(imm_id, *prev_imm_id); + } + self.flushing_imms.push_front(imm_id); } } - /// Clear self and return the current sealed data - fn drain(&mut self) -> SealedData { - take(self) + // start syncing the imms inclusively before the `epoch` + // returning data with newer data coming first + fn sync(&mut self, epoch: HummockEpoch) -> Vec { + // firstly added from old to new + let mut ret = Vec::new(); + while let Some(epoch_data) = self.sealed_data.back() + && epoch_data.epoch() <= epoch + { + let imms = self.sealed_data.pop_back().expect("checked exist").imms; + self.add_flushing_imm(imms.iter().rev().map(|imm| imm.batch_id())); + ret.extend(imms.into_iter().rev()); + } + // reverse so that newer data comes first + ret.reverse(); + if let Some(latest_epoch_data) = &self.current_epoch_data { + if latest_epoch_data.epoch <= epoch { + assert!(self.sealed_data.is_empty()); + assert!(latest_epoch_data.is_empty()); + assert!(!latest_epoch_data.has_spilled); + if cfg!(debug_assertions) { + panic!("sync epoch exceeds latest epoch, and the current instance should have been archived"); + } + warn!( + instance_id = self.instance_id, + table_id = self.table_id.table_id, + "sync epoch exceeds latest epoch, and the current instance should have been archived" + ); + self.current_epoch_data = None; + } + } + ret + } +} - #[cfg(test)] - fn imm_count(&self) -> usize { - self.imms_by_table_shard - .values() - .map(|imms| imms.len()) - .sum() +struct TableUnsyncData { + table_id: TableId, + instance_data: HashMap, + #[expect(clippy::type_complexity)] + table_watermarks: Option<( + WatermarkDirection, + BTreeMap, BitmapBuilder)>, + )>, +} + +impl TableUnsyncData { + fn new(table_id: TableId) -> Self { + Self { + table_id, + instance_data: Default::default(), + table_watermarks: None, + } + } + + fn sync( + &mut self, + epoch: HummockEpoch, + ) -> ( + impl Iterator)> + '_, + Option<( + WatermarkDirection, + impl Iterator)>, + )>, + ) { + ( + self.instance_data + .iter_mut() + .map(move |(instance_id, data)| (*instance_id, data.sync(epoch))), + self.table_watermarks + .as_mut() + .map(|(direction, watermarks)| { + let watermarks = take_before_epoch(watermarks, epoch); + ( + *direction, + watermarks + .into_iter() + .map(|(epoch, (watermarks, _))| (epoch, watermarks)), + ) + }), + ) + } +} + +#[derive(Default)] +/// Unsync data, can be either imm or spilled sst, and some aggregated epoch information. +/// +/// `instance_data` holds the imm of each individual local instance, and data are first added here. +/// The aggregated epoch information (table watermarks, etc.) and the spilled sst will be added to `epoch_data`. 
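+/// On sync, data of epochs inclusively before the synced epoch is taken out of both maps and merged into a `SyncDataBuilder`.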
+struct UnsyncData { + table_data: HashMap, + // An index as a mapping from instance id to its table id + instance_table_id: HashMap, + epoch_data: BTreeMap, +} + +impl UnsyncData { + fn init_instance( + &mut self, + table_id: TableId, + instance_id: LocalInstanceId, + init_epoch: HummockEpoch, + ) { + debug!( + table_id = table_id.table_id, + instance_id, init_epoch, "init epoch" + ); + let table_data = self + .table_data + .entry(table_id) + .or_insert_with(|| TableUnsyncData::new(table_id)); + assert!(table_data + .instance_data + .insert( + instance_id, + LocalInstanceUnsyncData::new(table_id, instance_id, init_epoch) + ) + .is_none()); + assert!(self + .instance_table_id + .insert(instance_id, table_id) + .is_none()); + self.epoch_data.entry(init_epoch).or_default(); + } + + fn instance_data( + &mut self, + instance_id: LocalInstanceId, + ) -> Option<&mut LocalInstanceUnsyncData> { + self.instance_table_id + .get_mut(&instance_id) + .cloned() + .map(move |table_id| { + self.table_data + .get_mut(&table_id) + .expect("should exist") + .instance_data + .get_mut(&instance_id) + .expect("should exist") + }) + } + + fn add_imm(&mut self, instance_id: LocalInstanceId, imm: UploaderImm) { + self.instance_data(instance_id) + .expect("should exist") + .add_imm(imm); + } + + fn local_seal_epoch( + &mut self, + instance_id: LocalInstanceId, + next_epoch: HummockEpoch, + opts: SealCurrentEpochOptions, + ) { + let table_id = self.instance_table_id[&instance_id]; + let table_data = self.table_data.get_mut(&table_id).expect("should exist"); + let instance_data = table_data + .instance_data + .get_mut(&instance_id) + .expect("should exist"); + let epoch = instance_data.local_seal_epoch(next_epoch); + self.epoch_data.entry(next_epoch).or_default(); + if let Some((direction, table_watermarks)) = opts.table_watermarks { + table_data.add_table_watermarks(epoch, table_watermarks, direction); + } + } + + fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) { + if let Some(table_id) = self.instance_table_id.remove(&instance_id) { + debug!(instance_id, "destroy instance"); + let table_data = self.table_data.get_mut(&table_id).expect("should exist"); + assert!(table_data.instance_data.remove(&instance_id).is_some()); + if table_data.instance_data.is_empty() { + self.table_data.remove(&table_id); + } + } + } + + fn sync(&mut self, epoch: HummockEpoch, context: &UploaderContext) -> SyncDataBuilder { + let sync_epoch_data = take_before_epoch(&mut self.epoch_data, epoch); + + let mut sync_data = SyncDataBuilder::default(); + for (epoch, epoch_data) in sync_epoch_data { + sync_data.add_new_epoch(epoch, epoch_data); + } + + let mut flush_payload = HashMap::new(); + for (table_id, table_data) in &mut self.table_data { + let (unflushed_payload, table_watermarks) = table_data.sync(epoch); + for (instance_id, payload) in unflushed_payload { + if !payload.is_empty() { + flush_payload.insert(instance_id, payload); + } + } + if let Some((direction, watermarks)) = table_watermarks { + sync_data.add_table_watermarks(*table_id, direction, watermarks); + } + } + sync_data.flush(context, flush_payload); + sync_data + } + + fn ack_flushed(&mut self, sstable_info: &StagingSstableInfo) { + for (instance_id, imm_ids) in sstable_info.imm_ids() { + if let Some(instance_data) = self.instance_data(*instance_id) { + // take `rev` to let old imm id goes first + instance_data.ack_flushed(imm_ids.iter().rev().cloned()); + } + } } } struct SyncingData { - // newer epochs come first - epochs: Vec, + sync_epoch: HummockEpoch, // 
TODO: may replace `TryJoinAll` with a future that will abort other join handles once // one join handle failed. // None means there is no pending uploading tasks @@ -575,13 +915,7 @@ struct SyncingData { sync_result_sender: oneshot::Sender>, } -impl SyncingData { - fn sync_epoch(&self) -> HummockEpoch { - *self.epochs.first().expect("non-empty") - } -} - -pub(super) struct SyncedData { +pub struct SyncedData { pub newly_upload_ssts: Vec, pub uploaded_ssts: VecDeque, pub table_watermarks: HashMap, @@ -615,12 +949,7 @@ impl UploaderContext { #[derive(Default)] struct UploaderData { - /// Data that are not sealed yet. `epoch` satisfies `epoch > max_sealed_epoch`. - unsealed_data: BTreeMap, - - /// Data that are sealed but not synced yet. `epoch` satisfies - /// `max_syncing_epoch < epoch <= max_sealed_epoch`. - sealed_data: SealedData, + unsync_data: UnsyncData, /// Data that has started syncing but not synced yet. `epoch` satisfies /// `max_synced_epoch < epoch <= max_syncing_epoch`. @@ -630,9 +959,8 @@ struct UploaderData { impl UploaderData { fn abort(self, err: impl Fn() -> HummockError) { - self.sealed_data.spilled_data.abort(); - for (_, unsealed_data) in self.unsealed_data { - unsealed_data.spilled_data.abort(); + for (_, epoch_data) in self.unsync_data.epoch_data { + epoch_data.spilled_data.abort(); } // TODO: call `abort` on the uploading task join handle of syncing_data for syncing_data in self.syncing_data { @@ -651,20 +979,17 @@ enum UploaderState { /// An uploader for hummock data. /// -/// Data have 4 sequential stages: unsealed, sealed, syncing, synced. +/// Data have 3 sequential stages: unsync (inside each local instance, data can be unsealed, sealed), syncing, synced. /// -/// The 4 stages are divided by 3 marginal epochs: `max_sealed_epoch`, `max_syncing_epoch`, +/// The 3 stages are divided by 2 marginal epochs: `max_syncing_epoch`, /// `max_synced_epoch`. Epochs satisfy the following inequality. /// /// (epochs of `synced_data`) <= `max_synced_epoch` < (epochs of `syncing_data`) <= -/// `max_syncing_epoch` < (epochs of `sealed_data`) <= `max_sealed_epoch` < (epochs of -/// `unsealed_data`) +/// `max_syncing_epoch` < (epochs of `unsync_data`) /// /// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data /// order. Data at the front represents ***newer*** data. 
pub struct HummockUploader { - /// The maximum epoch that is sealed - max_sealed_epoch: HummockEpoch, /// The maximum epoch that has started syncing max_syncing_epoch: HummockEpoch, /// The maximum epoch that has been synced @@ -685,12 +1010,10 @@ impl HummockUploader { ) -> Self { let initial_epoch = pinned_version.version().max_committed_epoch; Self { - max_sealed_epoch: initial_epoch, max_syncing_epoch: initial_epoch, max_synced_epoch: initial_epoch, state: UploaderState::Working(UploaderData { - unsealed_data: Default::default(), - sealed_data: Default::default(), + unsync_data: Default::default(), syncing_data: Default::default(), }), context: UploaderContext::new( @@ -707,10 +1030,6 @@ impl HummockUploader { &self.context.buffer_tracker } - pub(super) fn max_sealed_epoch(&self) -> HummockEpoch { - self.max_sealed_epoch - } - pub(super) fn max_synced_epoch(&self) -> HummockEpoch { self.max_synced_epoch } @@ -727,78 +1046,36 @@ impl HummockUploader { let UploaderState::Working(data) = &mut self.state else { return; }; - let epoch = imm.min_epoch(); - assert!( - epoch > self.max_sealed_epoch, - "imm epoch {} older than max sealed epoch {}", - epoch, - self.max_sealed_epoch - ); - let unsealed_data = data.unsealed_data.entry(epoch).or_default(); - unsealed_data - .imms - .entry(instance_id) - .or_default() - .push_front(UploaderImm::new(imm, &self.context)); + let imm = UploaderImm::new(imm, &self.context); + data.unsync_data.add_imm(instance_id, imm); } - pub(super) fn add_table_watermarks( + pub(super) fn init_instance( &mut self, - epoch: u64, + instance_id: LocalInstanceId, table_id: TableId, - table_watermarks: Vec, - direction: WatermarkDirection, + init_epoch: HummockEpoch, ) { let UploaderState::Working(data) = &mut self.state else { return; }; - assert!( - epoch > self.max_sealed_epoch, - "imm epoch {} older than max sealed epoch {}", - epoch, - self.max_sealed_epoch - ); - data.unsealed_data - .entry(epoch) - .or_default() - .add_table_watermarks(table_id, table_watermarks, direction); + assert_gt!(init_epoch, self.max_syncing_epoch); + data.unsync_data + .init_instance(table_id, instance_id, init_epoch); } - pub(super) fn seal_epoch(&mut self, epoch: HummockEpoch) { + pub(super) fn local_seal_epoch( + &mut self, + instance_id: LocalInstanceId, + next_epoch: HummockEpoch, + opts: SealCurrentEpochOptions, + ) { let UploaderState::Working(data) = &mut self.state else { return; }; - debug!("epoch {} is sealed", epoch); - assert!( - epoch > self.max_sealed_epoch, - "sealing a sealed epoch {}. 
{}", - epoch, - self.max_sealed_epoch - ); - self.max_sealed_epoch = epoch; - let unsealed_data = - if let Some((&smallest_unsealed_epoch, _)) = data.unsealed_data.first_key_value() { - assert!( - smallest_unsealed_epoch >= epoch, - "some epoch {} older than epoch to seal {}", - smallest_unsealed_epoch, - epoch - ); - if smallest_unsealed_epoch == epoch { - let (_, unsealed_data) = data - .unsealed_data - .pop_first() - .expect("we have checked non-empty"); - unsealed_data - } else { - debug!("epoch {} to seal has no data", epoch); - UnsealedEpochData::default() - } - } else { - debug!("epoch {} to seal has no data", epoch); - UnsealedEpochData::default() - }; - data.sealed_data.seal_new_epoch(epoch, unsealed_data); + assert_gt!(next_epoch, self.max_syncing_epoch); + data.unsync_data + .local_seal_epoch(instance_id, next_epoch, opts); } pub(super) fn start_sync_epoch( @@ -827,20 +1104,12 @@ impl HummockUploader { epoch, self.max_syncing_epoch ); - assert_eq!( - epoch, self.max_sealed_epoch, - "we must start syncing all the sealed data", - ); self.max_syncing_epoch = epoch; - // flush imms to SST file, the output SSTs will be uploaded to object store - // return unfinished merging task - data.sealed_data.flush(&self.context, false); + let sync_data = data.unsync_data.sync(epoch, &self.context); - let SealedData { - epochs, - imms_by_table_shard, + let SyncDataBuilder { spilled_data: SpilledData { uploading_tasks, @@ -848,14 +1117,7 @@ impl HummockUploader { }, table_watermarks, .. - } = data.sealed_data.drain(); - - assert!( - imms_by_table_shard.is_empty(), - "after flush, imms must be empty" - ); - - assert_eq!(epoch, *epochs.front().expect("non-empty epoch")); + } = sync_data; let try_join_all_upload_task = if uploading_tasks.is_empty() { None @@ -864,7 +1126,7 @@ impl HummockUploader { }; data.syncing_data.push_front(SyncingData { - epochs: epochs.into_iter().collect(), + sync_epoch: epoch, uploading_tasks: try_join_all_upload_task, uploaded: uploaded_data, table_watermarks, @@ -902,34 +1164,21 @@ impl HummockUploader { self.context.pinned_version = pinned_version; if self.max_synced_epoch < max_committed_epoch { self.max_synced_epoch = max_committed_epoch; - if let UploaderState::Working(data) = &mut self.state { - if let Some(syncing_data) = data.syncing_data.back() { - // there must not be any syncing data below MCE - assert_gt!( - *syncing_data - .epochs - .last() - .expect("epoch should not be empty"), - max_committed_epoch - ); - } - }; } if self.max_syncing_epoch < max_committed_epoch { self.max_syncing_epoch = max_committed_epoch; - if let UploaderState::Working(data) = &mut self.state { - // there must not be any sealed data below MCE - if let Some(&epoch) = data.sealed_data.epochs.back() { - assert_gt!(epoch, max_committed_epoch); - } - } - } - if self.max_sealed_epoch < max_committed_epoch { - self.max_sealed_epoch = max_committed_epoch; - if let UploaderState::Working(data) = &mut self.state { - // there must not be any unsealed data below MCE - if let Some((&epoch, _)) = data.unsealed_data.first_key_value() { - assert_gt!(epoch, max_committed_epoch); + if let UploaderState::Working(data) = &self.state { + for instance_data in data + .unsync_data + .table_data + .values() + .flat_map(|data| data.instance_data.values()) + { + if let Some(oldest_epoch) = instance_data.sealed_data.back() { + assert_gt!(oldest_epoch.epoch, max_committed_epoch); + } else if let Some(current_epoch) = &instance_data.current_epoch_data { + assert_gt!(current_epoch.epoch, max_committed_epoch); + } } 
} } @@ -941,26 +1190,28 @@ impl HummockUploader { }; if self.context.buffer_tracker.need_flush() { let mut curr_batch_flush_size = 0; - if self.context.buffer_tracker.need_flush() { - curr_batch_flush_size += data.sealed_data.flush(&self.context, true); - } - - if self - .context - .buffer_tracker - .need_more_flush(curr_batch_flush_size) - { - // iterate from older epoch to newer epoch - for unsealed_data in data.unsealed_data.values_mut() { - curr_batch_flush_size += unsealed_data.flush(&self.context); - if !self - .context - .buffer_tracker - .need_more_flush(curr_batch_flush_size) - { - break; + // iterate from older epoch to newer epoch + for (epoch, epoch_data) in &mut data.unsync_data.epoch_data { + if !self + .context + .buffer_tracker + .need_more_flush(curr_batch_flush_size) + { + break; + } + let mut payload = HashMap::new(); + for (instance_id, instance_data) in data + .unsync_data + .table_data + .values_mut() + .flat_map(|data| data.instance_data.iter_mut()) + { + let instance_payload = instance_data.spill(*epoch); + if !instance_payload.is_empty() { + payload.insert(*instance_id, instance_payload); } } + curr_batch_flush_size += epoch_data.flush(&self.context, payload); } curr_batch_flush_size > 0 } else { @@ -972,7 +1223,6 @@ impl HummockUploader { let max_committed_epoch = self.context.pinned_version.max_committed_epoch(); self.max_synced_epoch = max_committed_epoch; self.max_syncing_epoch = max_committed_epoch; - self.max_sealed_epoch = max_committed_epoch; if let UploaderState::Working(data) = replace( &mut self.state, UploaderState::Working(UploaderData::default()), @@ -984,6 +1234,13 @@ impl HummockUploader { self.context.stats.uploader_syncing_epoch_count.set(0); } + + pub(crate) fn may_destroy_instance(&mut self, instance_id: LocalInstanceId) { + let UploaderState::Working(data) = &mut self.state else { + return; + }; + data.unsync_data.may_destroy_instance(instance_id); + } } impl UploaderData { @@ -1015,12 +1272,18 @@ impl UploaderData { .stats .uploader_syncing_epoch_count .set(self.syncing_data.len() as _); - let epoch = syncing_data.sync_epoch(); + let epoch = syncing_data.sync_epoch; - let result = result.map(|newly_uploaded_sstable_infos| SyncedData { - newly_upload_ssts: newly_uploaded_sstable_infos, - uploaded_ssts: syncing_data.uploaded, - table_watermarks: syncing_data.table_watermarks, + let result = result.map(|newly_uploaded_sstable_infos| { + // take `rev` so that old data is acked first + for sstable_info in newly_uploaded_sstable_infos.iter().rev() { + self.unsync_data.ack_flushed(sstable_info); + } + SyncedData { + newly_upload_ssts: newly_uploaded_sstable_infos, + uploaded_ssts: syncing_data.uploaded, + table_watermarks: syncing_data.table_watermarks, + } }); Poll::Ready(Some((epoch, result, syncing_data.sync_result_sender))) @@ -1029,23 +1292,15 @@ impl UploaderData { } } - /// Poll the success of the oldest spilled task of sealed data. Return `Poll::Ready(None)` if - /// there is no spilling task. - fn poll_sealed_spill_task(&mut self, cx: &mut Context<'_>) -> Poll> { - self.sealed_data.spilled_data.poll_success_spill(cx) - } - - /// Poll the success of the oldest spilled task of unsealed data. Return `Poll::Ready(None)` if + /// Poll the success of the oldest spilled task of unsync spill data. Return `Poll::Ready(None)` if /// there is no spilling task. 
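+ /// Spill tasks are polled from older epochs to newer epochs, so that spilled data is acked in epoch order.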
- fn poll_unsealed_spill_task( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_spill_task(&mut self, cx: &mut Context<'_>) -> Poll> { // iterator from older epoch to new epoch so that the spill task are finished in epoch order - for unsealed_data in self.unsealed_data.values_mut() { - // if None, there is no spilling task. Search for the unsealed data of the next epoch in + for epoch_data in self.unsync_data.epoch_data.values_mut() { + // if None, there is no spilling task. Search for the unsync data of the next epoch in // the next iteration. - if let Some(sstable_info) = ready!(unsealed_data.spilled_data.poll_success_spill(cx)) { + if let Some(sstable_info) = ready!(epoch_data.spilled_data.poll_success_spill(cx)) { + self.unsync_data.ack_flushed(&sstable_info); return Poll::Ready(Some(sstable_info)); } } @@ -1093,17 +1348,12 @@ impl HummockUploader { data.abort(|| { HummockError::other(format!("previous epoch {} failed to sync", epoch)) }); - return Poll::Pending; } - } - } - - if let Some(sstable_info) = ready!(data.poll_sealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + }; } - if let Some(sstable_info) = ready!(data.poll_unsealed_spill_task(cx)) { + if let Some(sstable_info) = ready!(data.poll_spill_task(cx)) { return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); } @@ -1156,6 +1406,7 @@ pub(crate) mod tests { use crate::mem_table::{ImmId, ImmutableMemtable}; use crate::monitor::HummockStateStoreMetrics; use crate::opts::StorageOpts; + use crate::store::SealCurrentEpochOptions; const INITIAL_EPOCH: HummockEpoch = test_epoch(5); pub(crate) const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; @@ -1313,6 +1564,16 @@ pub(crate) mod tests { )]) } + impl HummockUploader { + fn local_seal_epoch_for_test(&mut self, instance_id: LocalInstanceId, epoch: HummockEpoch) { + self.local_seal_epoch( + instance_id, + epoch.next_epoch(), + SealCurrentEpochOptions::for_test(), + ); + } + } + #[tokio::test] pub async fn test_uploading_task_future() { let uploader_context = test_uploader_context(dummy_success_upload_future); @@ -1389,36 +1650,15 @@ pub(crate) mod tests { let mut uploader = test_uploader(dummy_success_upload_future); let epoch1 = INITIAL_EPOCH.next_epoch(); let imm = gen_imm(epoch1).await; - + uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm.clone()); - assert_eq!(1, uploader.data().unsealed_data.len()); - assert_eq!( - epoch1 as HummockEpoch, - *uploader.data().unsealed_data.first_key_value().unwrap().0 - ); - assert_eq!( - 1, - uploader - .data() - .unsealed_data - .first_key_value() - .unwrap() - .1 - .imms - .len() - ); - uploader.seal_epoch(epoch1); - assert_eq!(epoch1, uploader.max_sealed_epoch); - assert!(uploader.data().unsealed_data.is_empty()); - assert_eq!(1, uploader.data().sealed_data.imm_count()); + uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); uploader.start_sync_epoch_for_test(epoch1); assert_eq!(epoch1 as HummockEpoch, uploader.max_syncing_epoch); - assert_eq!(0, uploader.data().sealed_data.imm_count()); - assert!(uploader.data().sealed_data.spilled_data.is_empty()); assert_eq!(1, uploader.data().syncing_data.len()); let syncing_data = uploader.data().syncing_data.front().unwrap(); - assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch()); + assert_eq!(epoch1 as HummockEpoch, syncing_data.sync_epoch); assert!(syncing_data.uploaded.is_empty()); assert!(syncing_data.uploading_tasks.is_some()); @@ -1456,6 +1696,32 @@ 
pub(crate) mod tests { assert_eq!(epoch1, uploader.max_committed_epoch()); } + #[tokio::test] + async fn test_empty_uploader_sync() { + let mut uploader = test_uploader(dummy_success_upload_future); + let epoch1 = INITIAL_EPOCH.next_epoch(); + + uploader.start_sync_epoch_for_test(epoch1); + assert_eq!(epoch1, uploader.max_syncing_epoch); + + match uploader.next_event().await { + UploaderEvent::SyncFinish(finished_epoch, data) => { + assert_eq!(epoch1, finished_epoch); + assert!(data.uploaded_ssts.is_empty()); + assert!(data.newly_upload_ssts.is_empty()); + } + _ => unreachable!(), + }; + assert_eq!(epoch1, uploader.max_synced_epoch()); + let new_pinned_version = uploader + .context + .pinned_version + .new_pin_version(test_hummock_version(epoch1)); + uploader.update_pinned_version(new_pinned_version); + assert!(uploader.data().syncing_data.is_empty()); + assert_eq!(epoch1, uploader.max_committed_epoch()); + } + #[tokio::test] async fn test_uploader_empty_epoch() { let mut uploader = test_uploader(dummy_success_upload_future); @@ -1463,9 +1729,9 @@ pub(crate) mod tests { let epoch2 = epoch1.next_epoch(); let imm = gen_imm(epoch2).await; // epoch1 is empty while epoch2 is not. Going to seal empty epoch1. + uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch1); + uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch1); uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); - uploader.seal_epoch(epoch1); - assert_eq!(epoch1, uploader.max_sealed_epoch); uploader.start_sync_epoch_for_test(epoch1); assert_eq!(epoch1, uploader.max_syncing_epoch); @@ -1495,12 +1761,7 @@ pub(crate) mod tests { assert!(poll_fn(|cx| data.poll_syncing_task(cx, &uploader.context)) .await .is_none()); - assert!(poll_fn(|cx| data.poll_sealed_spill_task(cx)) - .await - .is_none()); - assert!(poll_fn(|cx| data.poll_unsealed_spill_task(cx)) - .await - .is_none()); + assert!(poll_fn(|cx| data.poll_spill_task(cx)).await.is_none()); } #[tokio::test] @@ -1521,27 +1782,23 @@ pub(crate) mod tests { uploader.update_pinned_version(version1); assert_eq!(epoch1, uploader.max_synced_epoch); assert_eq!(epoch1, uploader.max_syncing_epoch); - assert_eq!(epoch1, uploader.max_sealed_epoch); + uploader.init_instance(TEST_LOCAL_INSTANCE_ID, TEST_TABLE_ID, epoch6); uploader.add_imm(TEST_LOCAL_INSTANCE_ID, gen_imm(epoch6).await); uploader.update_pinned_version(version2); assert_eq!(epoch2, uploader.max_synced_epoch); assert_eq!(epoch2, uploader.max_syncing_epoch); - assert_eq!(epoch2, uploader.max_sealed_epoch); - uploader.seal_epoch(epoch6); - assert_eq!(epoch6, uploader.max_sealed_epoch); + uploader.local_seal_epoch_for_test(TEST_LOCAL_INSTANCE_ID, epoch6); uploader.update_pinned_version(version3); assert_eq!(epoch3, uploader.max_synced_epoch); assert_eq!(epoch3, uploader.max_syncing_epoch); - assert_eq!(epoch6, uploader.max_sealed_epoch); uploader.start_sync_epoch_for_test(epoch6); assert_eq!(epoch6, uploader.max_syncing_epoch); uploader.update_pinned_version(version4); assert_eq!(epoch4, uploader.max_synced_epoch); assert_eq!(epoch6, uploader.max_syncing_epoch); - assert_eq!(epoch6, uploader.max_sealed_epoch); match uploader.next_event().await { UploaderEvent::SyncFinish(epoch, _) => { @@ -1552,7 +1809,6 @@ pub(crate) mod tests { uploader.update_pinned_version(version5); assert_eq!(epoch6, uploader.max_synced_epoch); assert_eq!(epoch6, uploader.max_syncing_epoch); - assert_eq!(epoch6, uploader.max_sealed_epoch); } fn prepare_uploader_order_test( @@ -1648,6 +1904,9 @@ pub(crate) mod tests { let instance_id1 = 1; let 
instance_id2 = 2; + uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1); + uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2); + // imm2 contains data in newer epoch, but added first let imm2 = gen_imm_with_limiter(epoch2, memory_limiter).await; uploader.add_imm(instance_id2, imm2.clone()); @@ -1700,18 +1959,19 @@ pub(crate) mod tests { let epoch1_sync_payload = HashMap::from_iter([(instance_id1, vec![imm1_4.clone()])]); let (await_start1_4, finish_tx1_4) = new_task_notifier(get_payload_imm_ids(&epoch1_sync_payload)); - uploader.seal_epoch(epoch1); + uploader.local_seal_epoch_for_test(instance_id1, epoch1); uploader.start_sync_epoch_for_test(epoch1); await_start1_4.await; + let epoch3 = epoch2.next_epoch(); - uploader.seal_epoch(epoch2); + uploader.local_seal_epoch_for_test(instance_id1, epoch2); + uploader.local_seal_epoch_for_test(instance_id2, epoch2); // current uploader state: // unsealed: empty // sealed: epoch2: uploaded sst([imm2]) // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1]) - let epoch3 = epoch2.next_epoch(); let imm3_1 = gen_imm_with_limiter(epoch3, memory_limiter).await; let epoch3_spill_payload1 = HashMap::from_iter([(instance_id1, vec![imm3_1.clone()])]); uploader.add_imm(instance_id1, imm3_1.clone()); @@ -1735,6 +1995,7 @@ pub(crate) mod tests { // syncing: epoch1: uploading: [imm1_4], [imm1_3], uploaded: sst([imm1_2, imm1_1]) let epoch4 = epoch3.next_epoch(); + uploader.local_seal_epoch_for_test(instance_id1, epoch3); let imm4 = gen_imm_with_limiter(epoch4, memory_limiter).await; uploader.add_imm(instance_id1, imm4.clone()); assert_uploader_pending(&mut uploader).await; @@ -1800,7 +2061,7 @@ pub(crate) mod tests { // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) // epoch2: sst([imm2]) - uploader.seal_epoch(epoch3); + uploader.local_seal_epoch_for_test(instance_id2, epoch3); if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { assert_eq!(&get_payload_imm_ids(&epoch3_spill_payload1), sst.imm_ids()); } else { @@ -1814,7 +2075,8 @@ pub(crate) mod tests { // synced: epoch1: sst([imm1_4]), sst([imm1_3]), sst([imm1_2, imm1_1]) // epoch2: sst([imm2]) - uploader.seal_epoch(epoch4); + uploader.local_seal_epoch_for_test(instance_id1, epoch4); + uploader.local_seal_epoch_for_test(instance_id2, epoch4); let epoch4_sync_payload = HashMap::from_iter([(instance_id1, vec![imm4, imm3_3])]); let (await_start4_with_3_3, finish_tx4_with_3_3) = new_task_notifier(get_payload_imm_ids(&epoch4_sync_payload)); @@ -1877,9 +2139,14 @@ pub(crate) mod tests { let epoch1 = INITIAL_EPOCH.next_epoch(); let epoch2 = epoch1.next_epoch(); + let instance_id1 = 1; + let instance_id2 = 2; let flush_threshold = buffer_tracker.flush_threshold(); let memory_limiter = buffer_tracker.get_memory_limiter().clone(); + uploader.init_instance(instance_id1, TEST_TABLE_ID, epoch1); + uploader.init_instance(instance_id2, TEST_TABLE_ID, epoch2); + // imm2 contains data in newer epoch, but added first let mut total_memory = 0; while total_memory < flush_threshold { @@ -1888,15 +2155,15 @@ pub(crate) mod tests { if total_memory > flush_threshold { break; } - uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); + uploader.add_imm(instance_id2, imm); } let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await; - uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); + uploader.add_imm(instance_id1, imm); assert!(uploader.may_flush()); for _ in 0..10 { let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await; - 
uploader.add_imm(TEST_LOCAL_INSTANCE_ID, imm); + uploader.add_imm(instance_id1, imm); assert!(!uploader.may_flush()); } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs index e52b87c7d8aba..2d89fdd401fa2 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -583,12 +583,6 @@ impl StateStore for HummockStorage { MemOrdering::SeqCst, ); } - self.hummock_event_sender - .send(HummockEvent::SealEpoch { - epoch, - is_checkpoint, - }) - .expect("should send success"); StoreLocalStatistic::flush_all(); } diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs index 2f0ad3437efce..a14f3b450adff 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -513,6 +513,14 @@ impl LocalStateStore for LocalHummockStorage { "local state store of table id {:?} is init for more than once", self.table_id ); + if !self.is_replicated { + self.event_sender + .send(HummockEvent::InitEpoch { + instance_id: self.instance_id(), + init_epoch: options.epoch.curr, + }) + .expect("should succeed"); + } Ok(()) } @@ -544,14 +552,15 @@ }); } } - self.event_sender - .send(HummockEvent::LocalSealEpoch { - instance_id: self.instance_id(), - table_id: self.table_id, - epoch: prev_epoch, - opts, - }) - .expect("should be able to send") + if !self.is_replicated { + self.event_sender + .send(HummockEvent::LocalSealEpoch { + instance_id: self.instance_id(), + next_epoch, + opts, + }) + .expect("should be able to send"); + } } fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { diff --git a/src/stream/src/executor/actor.rs index 249546031e229..1c73a3aeddad6 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -223,14 +223,15 @@ where ) .into())); - // Collect barriers to local barrier manager - self.barrier_manager.collect(id, &barrier); - // Then stop this actor if asked if barrier.is_stop(id) { - break Ok(()); + debug!(actor_id = id, epoch = ?barrier.epoch, "stop at barrier"); + break Ok(barrier); } + // Collect barriers to local barrier manager + self.barrier_manager.collect(id, &barrier); + // Tracing related work last_epoch = Some(barrier.epoch); span = barrier.tracing_context().attach(new_span(last_epoch)); @@ -238,7 +239,12 @@ spawn_blocking_drop_stream(stream).await; - tracing::trace!(actor_id = id, "actor exit"); + let result = result.map(|stop_barrier| { + // Collect the stop barrier after the stream has been dropped to ensure that all resources of the actor have been released. + self.barrier_manager.collect(id, &stop_barrier); + }); + + tracing::debug!(actor_id = id, ok = result.is_ok(), "actor exit"); result } } diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs index 557dda4f535ef..7920e8dceee80 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -551,7 +551,8 @@ where // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { if is_completely_finished { - // If already finished, no need to persist any state. 
But we need to advance the epoch anyway. + self.state_table.commit(barrier.epoch).await?; } else { // If snapshot was empty, we do not need to backfill, // but we still need to persist the finished state. @@ -595,6 +596,10 @@ where #[for_await] for msg in upstream { if let Some(msg) = mapping_message(msg?, &self.output_indices) { + if let Message::Barrier(barrier) = &msg { + // If already finished, no need to persist any state, but we need to advance the epoch of the state table anyway. + self.state_table.commit(barrier.epoch).await?; + } yield msg; } } diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs index bd130fd8f52a2..e368086a97737 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -498,7 +498,10 @@ where // If not finished then we need to update state, otherwise no need. if let Message::Barrier(barrier) = &msg { if is_finished { - // If already finished, no need persist any state. + // If already finished, no need to persist any state, but we need to advance the epoch of the state table anyway. + if let Some(table) = &mut self.state_table { + table.commit(barrier.epoch).await?; + } } else { // If snapshot was empty, we do not need to backfill, // but we still need to persist the finished state. @@ -564,6 +567,13 @@ where #[for_await] for msg in upstream { if let Some(msg) = mapping_message(msg?, &self.output_indices) { + if let Message::Barrier(barrier) = &msg { + // If already finished, no need to persist any state, but we need to advance the epoch of the state table anyway. + if let Some(table) = &mut self.state_table { + table.commit(barrier.epoch).await?; + } + } + yield msg; } }