diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 5a5b47c05cb10..51b45e70199d7 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -44,20 +44,28 @@ pub struct TableWatermarksIndex { // later epoch at the back pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub committed_watermarks: Option>, - latest_epoch: Option, + latest_epoch: HummockEpoch, committed_epoch: Option, } impl TableWatermarksIndex { pub fn new( watermark_direction: WatermarkDirection, + first_epoch: HummockEpoch, + first_vnode_watermark: Vec, committed_epoch: Option, ) -> Self { + if let Some(committed_epoch) = committed_epoch { + assert!(first_epoch > committed_epoch); + } Self { watermark_direction, - staging_watermarks: VecDeque::new(), + staging_watermarks: VecDeque::from_iter([( + first_epoch, + Arc::from(first_vnode_watermark), + )]), committed_watermarks: None, - latest_epoch: committed_epoch, + latest_epoch: first_epoch, committed_epoch, } } @@ -70,7 +78,7 @@ impl TableWatermarksIndex { watermark_direction: committed_watermarks.direction, staging_watermarks: VecDeque::new(), committed_epoch: Some(committed_epoch), - latest_epoch: Some(committed_epoch), + latest_epoch: committed_epoch, committed_watermarks: Some(committed_watermarks), } } @@ -207,11 +215,9 @@ impl TableWatermarksIndex { vnode_watermark_list: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { - if let Some(prev_latest_epoch) = self.latest_epoch { - assert!(epoch > prev_latest_epoch); - } + assert!(epoch > self.latest_epoch); assert_eq!(self.watermark_direction, direction); - self.latest_epoch = Some(epoch); + self.latest_epoch = epoch; #[cfg(debug_assertions)] { let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); @@ -249,16 +255,12 @@ impl TableWatermarksIndex { return; } } - if let Some(latest_epoch) = self.latest_epoch { - if latest_epoch < committed_epoch { - warn!( - latest_epoch, - committed_epoch, "committed_epoch exceed table watermark latest_epoch" - ); - self.latest_epoch = Some(committed_epoch); - } - } else { - self.latest_epoch = Some(committed_epoch); + if self.latest_epoch < committed_epoch { + warn!( + latest_epoch = self.latest_epoch, + committed_epoch, "committed_epoch exceed table watermark latest_epoch" + ); + self.latest_epoch = committed_epoch; } self.committed_epoch = Some(committed_epoch); self.committed_watermarks = Some(committed_watermark); @@ -920,11 +922,11 @@ mod tests { watermark2: Bytes, watermark3: Bytes, ) -> TableWatermarksIndex { - let mut index = TableWatermarksIndex::new(direction, Some(COMMITTED_EPOCH)); - index.add_epoch_watermark( - EPOCH1, - vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(), + let mut index = TableWatermarksIndex::new( direction, + EPOCH1, + vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())], + Some(COMMITTED_EPOCH), ); index.add_epoch_watermark( EPOCH2, @@ -1076,7 +1078,7 @@ mod tests { EPOCH1, ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); - assert_eq!(EPOCH2, index.latest_epoch.unwrap()); + assert_eq!(EPOCH2, index.latest_epoch); for vnode in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 7bb37ede2abb5..c1ce855f4c48d 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -410,20 +410,27 @@ impl HummockReadVersion { direction, epoch, vnode_watermarks, - } => self - .table_watermarks - .get_or_insert_with(|| { - TableWatermarksIndex::new( + } => { + if let Some(watermark_index) = &mut self.table_watermarks { + watermark_index.add_epoch_watermark( + epoch, + Arc::from(vnode_watermarks), + direction, + ); + } else { + self.table_watermarks = Some(TableWatermarksIndex::new( direction, + epoch, + vnode_watermarks, self.committed .version() .state_table_info .info() .get(&self.table_id) .map(|info| info.committed_epoch), - ) - }) - .add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction), + )); + } + } } }