Skip to content

Commit

Permalink
fix latest epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 23, 2024
1 parent d2e02b0 commit 59d023c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 30 deletions.
48 changes: 25 additions & 23 deletions src/storage/hummock_sdk/src/table_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,28 @@ pub struct TableWatermarksIndex {
// later epoch at the back
pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
pub committed_watermarks: Option<Arc<TableWatermarks>>,
latest_epoch: Option<HummockEpoch>,
latest_epoch: HummockEpoch,
committed_epoch: Option<HummockEpoch>,
}

impl TableWatermarksIndex {
pub fn new(
watermark_direction: WatermarkDirection,
first_epoch: HummockEpoch,
first_vnode_watermark: Vec<VnodeWatermark>,
committed_epoch: Option<HummockEpoch>,
) -> Self {
if let Some(committed_epoch) = committed_epoch {
assert!(first_epoch > committed_epoch);
}
Self {
watermark_direction,
staging_watermarks: VecDeque::new(),
staging_watermarks: VecDeque::from_iter([(
first_epoch,
Arc::from(first_vnode_watermark),
)]),
committed_watermarks: None,
latest_epoch: committed_epoch,
latest_epoch: first_epoch,
committed_epoch,
}
}
Expand All @@ -70,7 +78,7 @@ impl TableWatermarksIndex {
watermark_direction: committed_watermarks.direction,
staging_watermarks: VecDeque::new(),
committed_epoch: Some(committed_epoch),
latest_epoch: Some(committed_epoch),
latest_epoch: committed_epoch,
committed_watermarks: Some(committed_watermarks),
}
}
Expand Down Expand Up @@ -207,11 +215,9 @@ impl TableWatermarksIndex {
vnode_watermark_list: Arc<[VnodeWatermark]>,
direction: WatermarkDirection,
) {
if let Some(prev_latest_epoch) = self.latest_epoch {
assert!(epoch > prev_latest_epoch);
}
assert!(epoch > self.latest_epoch);
assert_eq!(self.watermark_direction, direction);
self.latest_epoch = Some(epoch);
self.latest_epoch = epoch;
#[cfg(debug_assertions)]
{
let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT);
Expand Down Expand Up @@ -249,16 +255,12 @@ impl TableWatermarksIndex {
return;
}
}
if let Some(latest_epoch) = self.latest_epoch {
if latest_epoch < committed_epoch {
warn!(
latest_epoch,
committed_epoch, "committed_epoch exceed table watermark latest_epoch"
);
self.latest_epoch = Some(committed_epoch);
}
} else {
self.latest_epoch = Some(committed_epoch);
if self.latest_epoch < committed_epoch {
warn!(
latest_epoch = self.latest_epoch,
committed_epoch, "committed_epoch exceed table watermark latest_epoch"
);
self.latest_epoch = committed_epoch;
}
self.committed_epoch = Some(committed_epoch);
self.committed_watermarks = Some(committed_watermark);
Expand Down Expand Up @@ -920,11 +922,11 @@ mod tests {
watermark2: Bytes,
watermark3: Bytes,
) -> TableWatermarksIndex {
let mut index = TableWatermarksIndex::new(direction, Some(COMMITTED_EPOCH));
index.add_epoch_watermark(
EPOCH1,
vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(),
let mut index = TableWatermarksIndex::new(
direction,
EPOCH1,
vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())],
Some(COMMITTED_EPOCH),
);
index.add_epoch_watermark(
EPOCH2,
Expand Down Expand Up @@ -1076,7 +1078,7 @@ mod tests {
EPOCH1,
);
assert_eq!(EPOCH1, index.committed_epoch.unwrap());
assert_eq!(EPOCH2, index.latest_epoch.unwrap());
assert_eq!(EPOCH2, index.latest_epoch);
for vnode in 0..VirtualNode::COUNT {
let vnode = VirtualNode::from_index(vnode);
if (1..5).contains(&vnode.to_index()) {
Expand Down
21 changes: 14 additions & 7 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,27 @@ impl HummockReadVersion {
direction,
epoch,
vnode_watermarks,
} => self
.table_watermarks
.get_or_insert_with(|| {
TableWatermarksIndex::new(
} => {
if let Some(watermark_index) = &mut self.table_watermarks {
watermark_index.add_epoch_watermark(
epoch,
Arc::from(vnode_watermarks),
direction,
);
} else {
self.table_watermarks = Some(TableWatermarksIndex::new(
direction,
epoch,
vnode_watermarks,
self.committed
.version()
.state_table_info
.info()
.get(&self.table_id)
.map(|info| info.committed_epoch),
)
})
.add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction),
));
}
}
}
}

Expand Down

0 comments on commit 59d023c

Please sign in to comment.