From 8e356b3097fb00e2167db47c26fd3b1ec68f1beb Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 18 Jul 2024 17:24:56 +0800 Subject: [PATCH 1/3] fix(storage): use per-table committed epoch in read version update --- .../hummock_sdk/src/table_watermark.rs | 31 +++--- .../src/hummock/event_handler/uploader/mod.rs | 1 + src/storage/src/hummock/store/version.rs | 101 ++++++++++++------ src/storage/src/hummock/utils.rs | 2 + 4 files changed, 90 insertions(+), 45 deletions(-) diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 1fef491cacedc..3e4b24cf3a701 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -44,12 +44,15 @@ pub struct TableWatermarksIndex { // later epoch at the back pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub committed_watermarks: Option>, - latest_epoch: HummockEpoch, - committed_epoch: HummockEpoch, + latest_epoch: Option, + committed_epoch: Option, } impl TableWatermarksIndex { - pub fn new(watermark_direction: WatermarkDirection, committed_epoch: HummockEpoch) -> Self { + pub fn new( + watermark_direction: WatermarkDirection, + committed_epoch: Option, + ) -> Self { Self { watermark_direction, staging_watermarks: VecDeque::new(), @@ -66,8 +69,8 @@ impl TableWatermarksIndex { Self { watermark_direction: committed_watermarks.direction, staging_watermarks: VecDeque::new(), - committed_epoch, - latest_epoch: committed_epoch, + committed_epoch: Some(committed_epoch), + latest_epoch: Some(committed_epoch), committed_watermarks: Some(committed_watermarks), } } @@ -204,9 +207,11 @@ impl TableWatermarksIndex { vnode_watermark_list: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { - assert!(epoch > self.latest_epoch); + if let Some(prev_latest_epoch) = self.latest_epoch { + assert!(epoch > prev_latest_epoch); + } assert_eq!(self.watermark_direction, direction); - self.latest_epoch = epoch; + self.latest_epoch = Some(epoch); #[cfg(debug_assertions)] { let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); @@ -238,11 +243,13 @@ impl TableWatermarksIndex { committed_epoch: HummockEpoch, ) { assert_eq!(self.watermark_direction, committed_watermark.direction); - assert!(self.committed_epoch <= committed_epoch); - if self.committed_epoch == committed_epoch { - return; + if let Some(prev_committed_epoch) = self.committed_epoch { + assert!(prev_committed_epoch <= committed_epoch); + if prev_committed_epoch == committed_epoch { + return; + } } - self.committed_epoch = committed_epoch; + self.committed_epoch = Some(committed_epoch); self.committed_watermarks = Some(committed_watermark); // keep only watermark higher than committed epoch while let Some((old_epoch, _)) = self.staging_watermarks.front() @@ -902,7 +909,7 @@ mod tests { watermark2: Bytes, watermark3: Bytes, ) -> TableWatermarksIndex { - let mut index = TableWatermarksIndex::new(direction, COMMITTED_EPOCH); + let mut index = TableWatermarksIndex::new(direction, Some(COMMITTED_EPOCH)); index.add_epoch_watermark( EPOCH1, vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(), diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 101f54541fede..24b21d4face9b 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -1160,6 +1160,7 @@ impl HummockUploader { let UploaderState::Working(data) = &mut self.state else { return; }; + debug!(epoch, ?table_ids, "start epoch"); for table_id in &table_ids { let table_data = data .unsync_data diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 7db6e1edf5e99..7bb37ede2abb5 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -252,7 +252,13 @@ impl HummockReadVersion { .map(|table_watermarks| { TableWatermarksIndex::new_committed( table_watermarks.clone(), - committed_version.max_committed_epoch(), + committed_version + .version() + .state_table_info + .info() + .get(&table_id) + .expect("should exist") + .committed_epoch, ) }), staging: StagingVersion { @@ -313,17 +319,28 @@ impl HummockReadVersion { // old data comes first for imm_id in imms.iter().rev() { - let valid = match self.staging.imm.pop_back() { - None => false, - Some(prev_imm_id) => prev_imm_id.batch_id() == *imm_id, + let check_err = match self.staging.imm.pop_back() { + None => Some("empty".to_string()), + Some(prev_imm_id) => { + if prev_imm_id.batch_id() == *imm_id { + None + } else { + Some(format!( + "miss match id {} {}", + prev_imm_id.batch_id(), + *imm_id + )) + } + } }; assert!( - valid, + check_err.is_none(), "should be valid staging_sst.size {}, staging_sst.imm_ids {:?}, staging_sst.epochs {:?}, local_imm_ids {:?}, - instance_id {}", + instance_id {} + check_err {:?}", staging_sst_ref.imm_size, staging_sst_ref.imm_ids, staging_sst_ref.epochs, @@ -333,6 +350,7 @@ impl HummockReadVersion { .map(|imm| imm.batch_id()) .collect_vec(), self.instance_id, + check_err ); } @@ -341,43 +359,52 @@ impl HummockReadVersion { }, VersionUpdate::CommittedSnapshot(committed_version) => { - let max_committed_epoch = committed_version.max_committed_epoch(); - self.committed = committed_version; - + if let Some(info) = committed_version + .version() + .state_table_info + .info() + .get(&self.table_id) { - // TODO: remove it when support update staging local_sst - self.staging - .imm - .retain(|imm| imm.min_epoch() > max_committed_epoch); + let committed_epoch = info.committed_epoch; + self.staging.imm.retain(|imm| { + if self.is_replicated { + imm.min_epoch() > committed_epoch + } else { + assert!(imm.min_epoch() > committed_epoch); + true + } + }); self.staging.sst.retain(|sst| { - sst.epochs.first().expect("epochs not empty") > &max_committed_epoch + sst.epochs.first().expect("epochs not empty") > &committed_epoch }); // check epochs.last() > MCE assert!(self.staging.sst.iter().all(|sst| { - sst.epochs.last().expect("epochs not empty") > &max_committed_epoch + sst.epochs.last().expect("epochs not empty") > &committed_epoch })); - } - if let Some(committed_watermarks) = self - .committed - .version() - .table_watermarks - .get(&self.table_id) - { - if let Some(watermark_index) = &mut self.table_watermarks { - watermark_index.apply_committed_watermarks( - committed_watermarks.clone(), - self.committed.max_committed_epoch(), - ); - } else { - self.table_watermarks = Some(TableWatermarksIndex::new_committed( - committed_watermarks.clone(), - self.committed.max_committed_epoch(), - )); + if let Some(committed_watermarks) = self + .committed + .version() + .table_watermarks + .get(&self.table_id) + { + if let Some(watermark_index) = &mut self.table_watermarks { + watermark_index.apply_committed_watermarks( + committed_watermarks.clone(), + committed_epoch, + ); + } else { + self.table_watermarks = Some(TableWatermarksIndex::new_committed( + committed_watermarks.clone(), + committed_epoch, + )); + } } } + + self.committed = committed_version; } VersionUpdate::NewTableWatermark { direction, @@ -386,7 +413,15 @@ impl HummockReadVersion { } => self .table_watermarks .get_or_insert_with(|| { - TableWatermarksIndex::new(direction, self.committed.max_committed_epoch()) + TableWatermarksIndex::new( + direction, + self.committed + .version() + .state_table_info + .info() + .get(&self.table_id) + .map(|info| info.committed_epoch), + ) }) .add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction), } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 4c270ee736b97..0ec2200e6ef0e 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -32,6 +32,7 @@ use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use risingwave_pb::hummock::SstableInfo; use tokio::sync::oneshot::{channel, Receiver, Sender}; +use tracing::error; use super::{HummockError, HummockResult}; use crate::error::StorageResult; @@ -623,6 +624,7 @@ pub(crate) async fn wait_for_epoch( let mut receiver = notifier.subscribe(); // avoid unnecessary check in the loop if the value does not change let max_committed_epoch = *receiver.borrow_and_update(); + error!(max_committed_epoch, wait_epoch, "wait_for_epoch"); if max_committed_epoch >= wait_epoch { return Ok(()); } From d645decae2ce94a535321dba7fabdc8d2620d85c Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 18 Jul 2024 18:18:03 +0800 Subject: [PATCH 2/3] fix --- src/storage/hummock_sdk/src/table_watermark.rs | 15 +++++++++++++-- src/storage/src/hummock/utils.rs | 2 -- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 3e4b24cf3a701..5a5b47c05cb10 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -249,6 +249,17 @@ impl TableWatermarksIndex { return; } } + if let Some(latest_epoch) = self.latest_epoch { + if latest_epoch < committed_epoch { + warn!( + latest_epoch, + committed_epoch, "committed_epoch exceed table watermark latest_epoch" + ); + self.latest_epoch = Some(committed_epoch); + } + } else { + self.latest_epoch = Some(committed_epoch); + } self.committed_epoch = Some(committed_epoch); self.committed_watermarks = Some(committed_watermark); // keep only watermark higher than committed epoch @@ -1064,8 +1075,8 @@ mod tests { .clone(), EPOCH1, ); - assert_eq!(EPOCH1, index.committed_epoch); - assert_eq!(EPOCH2, index.latest_epoch); + assert_eq!(EPOCH1, index.committed_epoch.unwrap()); + assert_eq!(EPOCH2, index.latest_epoch.unwrap()); for vnode in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 0ec2200e6ef0e..4c270ee736b97 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -32,7 +32,6 @@ use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use risingwave_pb::hummock::SstableInfo; use tokio::sync::oneshot::{channel, Receiver, Sender}; -use tracing::error; use super::{HummockError, HummockResult}; use crate::error::StorageResult; @@ -624,7 +623,6 @@ pub(crate) async fn wait_for_epoch( let mut receiver = notifier.subscribe(); // avoid unnecessary check in the loop if the value does not change let max_committed_epoch = *receiver.borrow_and_update(); - error!(max_committed_epoch, wait_epoch, "wait_for_epoch"); if max_committed_epoch >= wait_epoch { return Ok(()); } From 59d023c535f6621037575a12e17d2d8ddc5142d6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 23 Jul 2024 13:04:58 +0800 Subject: [PATCH 3/3] fix latest epoch --- .../hummock_sdk/src/table_watermark.rs | 48 ++++++++++--------- src/storage/src/hummock/store/version.rs | 21 +++++--- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 5a5b47c05cb10..51b45e70199d7 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -44,20 +44,28 @@ pub struct TableWatermarksIndex { // later epoch at the back pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>, pub committed_watermarks: Option>, - latest_epoch: Option, + latest_epoch: HummockEpoch, committed_epoch: Option, } impl TableWatermarksIndex { pub fn new( watermark_direction: WatermarkDirection, + first_epoch: HummockEpoch, + first_vnode_watermark: Vec, committed_epoch: Option, ) -> Self { + if let Some(committed_epoch) = committed_epoch { + assert!(first_epoch > committed_epoch); + } Self { watermark_direction, - staging_watermarks: VecDeque::new(), + staging_watermarks: VecDeque::from_iter([( + first_epoch, + Arc::from(first_vnode_watermark), + )]), committed_watermarks: None, - latest_epoch: committed_epoch, + latest_epoch: first_epoch, committed_epoch, } } @@ -70,7 +78,7 @@ impl TableWatermarksIndex { watermark_direction: committed_watermarks.direction, staging_watermarks: VecDeque::new(), committed_epoch: Some(committed_epoch), - latest_epoch: Some(committed_epoch), + latest_epoch: committed_epoch, committed_watermarks: Some(committed_watermarks), } } @@ -207,11 +215,9 @@ impl TableWatermarksIndex { vnode_watermark_list: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { - if let Some(prev_latest_epoch) = self.latest_epoch { - assert!(epoch > prev_latest_epoch); - } + assert!(epoch > self.latest_epoch); assert_eq!(self.watermark_direction, direction); - self.latest_epoch = Some(epoch); + self.latest_epoch = epoch; #[cfg(debug_assertions)] { let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); @@ -249,16 +255,12 @@ impl TableWatermarksIndex { return; } } - if let Some(latest_epoch) = self.latest_epoch { - if latest_epoch < committed_epoch { - warn!( - latest_epoch, - committed_epoch, "committed_epoch exceed table watermark latest_epoch" - ); - self.latest_epoch = Some(committed_epoch); - } - } else { - self.latest_epoch = Some(committed_epoch); + if self.latest_epoch < committed_epoch { + warn!( + latest_epoch = self.latest_epoch, + committed_epoch, "committed_epoch exceed table watermark latest_epoch" + ); + self.latest_epoch = committed_epoch; } self.committed_epoch = Some(committed_epoch); self.committed_watermarks = Some(committed_watermark); @@ -920,11 +922,11 @@ mod tests { watermark2: Bytes, watermark3: Bytes, ) -> TableWatermarksIndex { - let mut index = TableWatermarksIndex::new(direction, Some(COMMITTED_EPOCH)); - index.add_epoch_watermark( - EPOCH1, - vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(), + let mut index = TableWatermarksIndex::new( direction, + EPOCH1, + vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())], + Some(COMMITTED_EPOCH), ); index.add_epoch_watermark( EPOCH2, @@ -1076,7 +1078,7 @@ mod tests { EPOCH1, ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); - assert_eq!(EPOCH2, index.latest_epoch.unwrap()); + assert_eq!(EPOCH2, index.latest_epoch); for vnode in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 7bb37ede2abb5..c1ce855f4c48d 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -410,20 +410,27 @@ impl HummockReadVersion { direction, epoch, vnode_watermarks, - } => self - .table_watermarks - .get_or_insert_with(|| { - TableWatermarksIndex::new( + } => { + if let Some(watermark_index) = &mut self.table_watermarks { + watermark_index.add_epoch_watermark( + epoch, + Arc::from(vnode_watermarks), + direction, + ); + } else { + self.table_watermarks = Some(TableWatermarksIndex::new( direction, + epoch, + vnode_watermarks, self.committed .version() .state_table_info .info() .get(&self.table_id) .map(|info| info.committed_epoch), - ) - }) - .add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction), + )); + } + } } }