diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 1fef491cacedc..51b45e70199d7 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -45,16 +45,27 @@ pub struct TableWatermarksIndex { pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub committed_watermarks: Option>, latest_epoch: HummockEpoch, - committed_epoch: HummockEpoch, + committed_epoch: Option, } impl TableWatermarksIndex { - pub fn new(watermark_direction: WatermarkDirection, committed_epoch: HummockEpoch) -> Self { + pub fn new( + watermark_direction: WatermarkDirection, + first_epoch: HummockEpoch, + first_vnode_watermark: Vec, + committed_epoch: Option, + ) -> Self { + if let Some(committed_epoch) = committed_epoch { + assert!(first_epoch > committed_epoch); + } Self { watermark_direction, - staging_watermarks: VecDeque::new(), + staging_watermarks: VecDeque::from_iter([( + first_epoch, + Arc::from(first_vnode_watermark), + )]), committed_watermarks: None, - latest_epoch: committed_epoch, + latest_epoch: first_epoch, committed_epoch, } } @@ -66,7 +77,7 @@ impl TableWatermarksIndex { Self { watermark_direction: committed_watermarks.direction, staging_watermarks: VecDeque::new(), - committed_epoch, + committed_epoch: Some(committed_epoch), latest_epoch: committed_epoch, committed_watermarks: Some(committed_watermarks), } @@ -238,11 +249,20 @@ impl TableWatermarksIndex { committed_epoch: HummockEpoch, ) { assert_eq!(self.watermark_direction, committed_watermark.direction); - assert!(self.committed_epoch <= committed_epoch); - if self.committed_epoch == committed_epoch { - return; + if let Some(prev_committed_epoch) = self.committed_epoch { + assert!(prev_committed_epoch <= committed_epoch); + if prev_committed_epoch == committed_epoch { + return; + } + } + if self.latest_epoch < committed_epoch { + warn!( + latest_epoch = self.latest_epoch, + committed_epoch, "committed_epoch exceed table watermark latest_epoch" + ); + self.latest_epoch = committed_epoch; } - self.committed_epoch = committed_epoch; + self.committed_epoch = Some(committed_epoch); self.committed_watermarks = Some(committed_watermark); // keep only watermark higher than committed epoch while let Some((old_epoch, _)) = self.staging_watermarks.front() @@ -902,11 +922,11 @@ mod tests { watermark2: Bytes, watermark3: Bytes, ) -> TableWatermarksIndex { - let mut index = TableWatermarksIndex::new(direction, COMMITTED_EPOCH); - index.add_epoch_watermark( - EPOCH1, - vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(), + let mut index = TableWatermarksIndex::new( direction, + EPOCH1, + vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())], + Some(COMMITTED_EPOCH), ); index.add_epoch_watermark( EPOCH2, @@ -1057,7 +1077,7 @@ mod tests { .clone(), EPOCH1, ); - assert_eq!(EPOCH1, index.committed_epoch); + assert_eq!(EPOCH1, index.committed_epoch.unwrap()); assert_eq!(EPOCH2, index.latest_epoch); for vnode in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(vnode); diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 6c7ded7b3bf03..884b29de5edd3 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1165,6 +1165,7 @@ impl HummockUploader { let UploaderState::Working(data) = &mut self.state else { return; }; + debug!(epoch, ?table_ids, "start epoch"); for table_id in &table_ids { let table_data = data .unsync_data diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 51f52be6d669e..87114f83c7d0a 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -252,7 +252,13 @@ impl HummockReadVersion { .map(|table_watermarks| { TableWatermarksIndex::new_committed( table_watermarks.clone(), - committed_version.max_committed_epoch(), + committed_version + .version() + .state_table_info + .info() + .get(&table_id) + .expect("should exist") + .committed_epoch, ) }), staging: StagingVersion { @@ -313,17 +319,28 @@ impl HummockReadVersion { // old data comes first for imm_id in imms.iter().rev() { - let valid = match self.staging.imm.pop_back() { - None => false, - Some(prev_imm_id) => prev_imm_id.batch_id() == *imm_id, + let check_err = match self.staging.imm.pop_back() { + None => Some("empty".to_string()), + Some(prev_imm_id) => { + if prev_imm_id.batch_id() == *imm_id { + None + } else { + Some(format!( + "miss match id {} {}", + prev_imm_id.batch_id(), + *imm_id + )) + } + } }; assert!( - valid, + check_err.is_none(), "should be valid staging_sst.size {}, staging_sst.imm_ids {:?}, staging_sst.epochs {:?}, local_imm_ids {:?}, - instance_id {}", + instance_id {} + check_err {:?}", staging_sst_ref.imm_size, staging_sst_ref.imm_ids, staging_sst_ref.epochs, @@ -333,6 +350,7 @@ impl HummockReadVersion { .map(|imm| imm.batch_id()) .collect_vec(), self.instance_id, + check_err ); } @@ -341,54 +359,78 @@ impl HummockReadVersion { }, VersionUpdate::CommittedSnapshot(committed_version) => { - let max_committed_epoch = committed_version.max_committed_epoch(); - self.committed = committed_version; - + if let Some(info) = committed_version + .version() + .state_table_info + .info() + .get(&self.table_id) { - // TODO: remove it when support update staging local_sst - self.staging - .imm - .retain(|imm| imm.min_epoch() > max_committed_epoch); + let committed_epoch = info.committed_epoch; + self.staging.imm.retain(|imm| { + if self.is_replicated { + imm.min_epoch() > committed_epoch + } else { + assert!(imm.min_epoch() > committed_epoch); + true + } + }); self.staging.sst.retain(|sst| { - sst.epochs.first().expect("epochs not empty") > &max_committed_epoch + sst.epochs.first().expect("epochs not empty") > &committed_epoch }); // check epochs.last() > MCE assert!(self.staging.sst.iter().all(|sst| { - sst.epochs.last().expect("epochs not empty") > &max_committed_epoch + sst.epochs.last().expect("epochs not empty") > &committed_epoch })); - } - if let Some(committed_watermarks) = self - .committed - .version() - .table_watermarks - .get(&self.table_id) - { - if let Some(watermark_index) = &mut self.table_watermarks { - watermark_index.apply_committed_watermarks( - committed_watermarks.clone(), - self.committed.max_committed_epoch(), - ); - } else { - self.table_watermarks = Some(TableWatermarksIndex::new_committed( - committed_watermarks.clone(), - self.committed.max_committed_epoch(), - )); + if let Some(committed_watermarks) = self + .committed + .version() + .table_watermarks + .get(&self.table_id) + { + if let Some(watermark_index) = &mut self.table_watermarks { + watermark_index.apply_committed_watermarks( + committed_watermarks.clone(), + committed_epoch, + ); + } else { + self.table_watermarks = Some(TableWatermarksIndex::new_committed( + committed_watermarks.clone(), + committed_epoch, + )); + } } } + + self.committed = committed_version; } VersionUpdate::NewTableWatermark { direction, epoch, vnode_watermarks, - } => self - .table_watermarks - .get_or_insert_with(|| { - TableWatermarksIndex::new(direction, self.committed.max_committed_epoch()) - }) - .add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction), + } => { + if let Some(watermark_index) = &mut self.table_watermarks { + watermark_index.add_epoch_watermark( + epoch, + Arc::from(vnode_watermarks), + direction, + ); + } else { + self.table_watermarks = Some(TableWatermarksIndex::new( + direction, + epoch, + vnode_watermarks, + self.committed + .version() + .state_table_info + .info() + .get(&self.table_id) + .map(|info| info.committed_epoch), + )); + } + } } }