Skip to content

Commit

Permalink
fix(storage): use per-table committed epoch in read version update
Browse files (browse the repository at this point in the history)
  • Loading branch information
wenym1 committed Jul 18, 2024
1 parent 10220ed commit 8e356b3
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 45 deletions.
31 changes: 19 additions & 12 deletions src/storage/hummock_sdk/src/table_watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ pub struct TableWatermarksIndex {
// later epoch at the back
pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>,
pub committed_watermarks: Option<Arc<TableWatermarks>>,
latest_epoch: HummockEpoch,
committed_epoch: HummockEpoch,
latest_epoch: Option<HummockEpoch>,
committed_epoch: Option<HummockEpoch>,
}

impl TableWatermarksIndex {
pub fn new(watermark_direction: WatermarkDirection, committed_epoch: HummockEpoch) -> Self {
pub fn new(
watermark_direction: WatermarkDirection,
committed_epoch: Option<HummockEpoch>,
) -> Self {
Self {
watermark_direction,
staging_watermarks: VecDeque::new(),
Expand All @@ -66,8 +69,8 @@ impl TableWatermarksIndex {
Self {
watermark_direction: committed_watermarks.direction,
staging_watermarks: VecDeque::new(),
committed_epoch,
latest_epoch: committed_epoch,
committed_epoch: Some(committed_epoch),
latest_epoch: Some(committed_epoch),
committed_watermarks: Some(committed_watermarks),
}
}
Expand Down Expand Up @@ -204,9 +207,11 @@ impl TableWatermarksIndex {
vnode_watermark_list: Arc<[VnodeWatermark]>,
direction: WatermarkDirection,
) {
assert!(epoch > self.latest_epoch);
if let Some(prev_latest_epoch) = self.latest_epoch {
assert!(epoch > prev_latest_epoch);
}
assert_eq!(self.watermark_direction, direction);
self.latest_epoch = epoch;
self.latest_epoch = Some(epoch);
#[cfg(debug_assertions)]
{
let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT);
Expand Down Expand Up @@ -238,11 +243,13 @@ impl TableWatermarksIndex {
committed_epoch: HummockEpoch,
) {
assert_eq!(self.watermark_direction, committed_watermark.direction);
assert!(self.committed_epoch <= committed_epoch);
if self.committed_epoch == committed_epoch {
return;
if let Some(prev_committed_epoch) = self.committed_epoch {
assert!(prev_committed_epoch <= committed_epoch);
if prev_committed_epoch == committed_epoch {
return;
}
}
self.committed_epoch = committed_epoch;
self.committed_epoch = Some(committed_epoch);
self.committed_watermarks = Some(committed_watermark);
// keep only watermark higher than committed epoch
while let Some((old_epoch, _)) = self.staging_watermarks.front()
Expand Down Expand Up @@ -902,7 +909,7 @@ mod tests {
watermark2: Bytes,
watermark3: Bytes,
) -> TableWatermarksIndex {
let mut index = TableWatermarksIndex::new(direction, COMMITTED_EPOCH);
let mut index = TableWatermarksIndex::new(direction, Some(COMMITTED_EPOCH));
index.add_epoch_watermark(
EPOCH1,
vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(),
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ impl HummockUploader {
let UploaderState::Working(data) = &mut self.state else {
return;
};
debug!(epoch, ?table_ids, "start epoch");
for table_id in &table_ids {
let table_data = data
.unsync_data
Expand Down
101 changes: 68 additions & 33 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,13 @@ impl HummockReadVersion {
.map(|table_watermarks| {
TableWatermarksIndex::new_committed(
table_watermarks.clone(),
committed_version.max_committed_epoch(),
committed_version
.version()
.state_table_info
.info()
.get(&table_id)
.expect("should exist")
.committed_epoch,
)
}),
staging: StagingVersion {
Expand Down Expand Up @@ -313,17 +319,28 @@ impl HummockReadVersion {

// old data comes first
for imm_id in imms.iter().rev() {
let valid = match self.staging.imm.pop_back() {
None => false,
Some(prev_imm_id) => prev_imm_id.batch_id() == *imm_id,
let check_err = match self.staging.imm.pop_back() {
None => Some("empty".to_string()),
Some(prev_imm_id) => {
if prev_imm_id.batch_id() == *imm_id {
None
} else {
Some(format!(
"miss match id {} {}",
prev_imm_id.batch_id(),
*imm_id
))
}
}
};
assert!(
valid,
check_err.is_none(),
"should be valid staging_sst.size {},
staging_sst.imm_ids {:?},
staging_sst.epochs {:?},
local_imm_ids {:?},
instance_id {}",
instance_id {}
check_err {:?}",
staging_sst_ref.imm_size,
staging_sst_ref.imm_ids,
staging_sst_ref.epochs,
Expand All @@ -333,6 +350,7 @@ impl HummockReadVersion {
.map(|imm| imm.batch_id())
.collect_vec(),
self.instance_id,
check_err
);
}

Expand All @@ -341,43 +359,52 @@ impl HummockReadVersion {
},

VersionUpdate::CommittedSnapshot(committed_version) => {
let max_committed_epoch = committed_version.max_committed_epoch();
self.committed = committed_version;

if let Some(info) = committed_version
.version()
.state_table_info
.info()
.get(&self.table_id)
{
// TODO: remove it when support update staging local_sst
self.staging
.imm
.retain(|imm| imm.min_epoch() > max_committed_epoch);
let committed_epoch = info.committed_epoch;
self.staging.imm.retain(|imm| {
if self.is_replicated {
imm.min_epoch() > committed_epoch
} else {
assert!(imm.min_epoch() > committed_epoch);
true
}
});

self.staging.sst.retain(|sst| {
sst.epochs.first().expect("epochs not empty") > &max_committed_epoch
sst.epochs.first().expect("epochs not empty") > &committed_epoch
});

// check epochs.last() > the table's committed epoch
assert!(self.staging.sst.iter().all(|sst| {
sst.epochs.last().expect("epochs not empty") > &max_committed_epoch
sst.epochs.last().expect("epochs not empty") > &committed_epoch
}));
}

if let Some(committed_watermarks) = self
.committed
.version()
.table_watermarks
.get(&self.table_id)
{
if let Some(watermark_index) = &mut self.table_watermarks {
watermark_index.apply_committed_watermarks(
committed_watermarks.clone(),
self.committed.max_committed_epoch(),
);
} else {
self.table_watermarks = Some(TableWatermarksIndex::new_committed(
committed_watermarks.clone(),
self.committed.max_committed_epoch(),
));
if let Some(committed_watermarks) = self
.committed
.version()
.table_watermarks
.get(&self.table_id)
{
if let Some(watermark_index) = &mut self.table_watermarks {
watermark_index.apply_committed_watermarks(
committed_watermarks.clone(),
committed_epoch,
);
} else {
self.table_watermarks = Some(TableWatermarksIndex::new_committed(
committed_watermarks.clone(),
committed_epoch,
));
}
}
}

self.committed = committed_version;
}
VersionUpdate::NewTableWatermark {
direction,
Expand All @@ -386,7 +413,15 @@ impl HummockReadVersion {
} => self
.table_watermarks
.get_or_insert_with(|| {
TableWatermarksIndex::new(direction, self.committed.max_committed_epoch())
TableWatermarksIndex::new(
direction,
self.committed
.version()
.state_table_info
.info()
.get(&self.table_id)
.map(|info| info.committed_epoch),
)
})
.add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction),
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{can_concat, HummockEpoch};
use risingwave_pb::hummock::SstableInfo;
use tokio::sync::oneshot::{channel, Receiver, Sender};
use tracing::error;

use super::{HummockError, HummockResult};
use crate::error::StorageResult;
Expand Down Expand Up @@ -623,6 +624,7 @@ pub(crate) async fn wait_for_epoch(
let mut receiver = notifier.subscribe();
// avoid unnecessary check in the loop if the value does not change
let max_committed_epoch = *receiver.borrow_and_update();
error!(max_committed_epoch, wait_epoch, "wait_for_epoch");
if max_committed_epoch >= wait_epoch {
return Ok(());
}
Expand Down

0 comments on commit 8e356b3

Please sign in to comment.