revert changes to table watermark
zwang28 committed Sep 12, 2024
1 parent ba67bd1 commit 9631102
Showing 4 changed files with 188 additions and 31 deletions.
@@ -17,9 +17,10 @@ use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
safe_epoch_read_table_watermarks_impl, table_watermarks_by_table_ids_impl,
safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
};
use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, TableWatermarks};
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::HummockCompactionTaskId;
use risingwave_pb::hummock::compact_task::TaskType;

@@ -41,14 +42,16 @@ impl CompactionSelector for VnodeWatermarkCompactionSelector {
level_handlers,
developer_config,
table_watermarks,
state_table_info,
member_table_ids,
..
} = context;
let dynamic_level_core =
DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let mut picker = VnodeWatermarkCompactionPicker::new();
let table_watermarks = safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
let table_watermarks =
safe_epoch_read_table_watermarks(table_watermarks, state_table_info, member_table_ids);
let compaction_input = picker.pick_compaction(levels, level_handlers, &table_watermarks)?;
compaction_input.add_pending_task(task_id, level_handlers);
Some(create_compaction_task(
@@ -70,10 +73,12 @@ impl CompactionSelector for VnodeWatermarkCompactionSelector {

fn safe_epoch_read_table_watermarks(
table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
state_table_info: &HummockVersionStateTableInfo,
member_table_ids: &BTreeSet<TableId>,
) -> BTreeMap<TableId, ReadTableWatermark> {
safe_epoch_read_table_watermarks_impl(&table_watermarks_by_table_ids_impl(
safe_epoch_read_table_watermarks_impl(&safe_epoch_table_watermarks_impl(
table_watermarks,
state_table_info,
&member_table_ids
.iter()
.map(TableId::table_id)
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
@@ -824,7 +824,7 @@ impl HummockManager {
.await;
compact_task.table_watermarks = version
.latest_version()
.table_watermarks_by_table_ids(&compact_task.existing_table_ids);
.safe_epoch_table_watermarks(&compact_task.existing_table_ids);

if self.env.opts.enable_dropped_column_reclaim {
// TODO: get all table schemas for all tables in once call to avoid acquiring lock and await.
58 changes: 46 additions & 12 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
@@ -36,7 +36,8 @@ use crate::level::{Level, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::version::{
GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo,
IntraLevelDelta,
};
use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId};

@@ -259,26 +260,42 @@ impl HummockVersion {
.unwrap_or(0)
}

pub fn table_watermarks_by_table_ids(
pub fn safe_epoch_table_watermarks(
&self,
existing_table_ids: &[u32],
) -> BTreeMap<u32, TableWatermarks> {
table_watermarks_by_table_ids_impl(&self.table_watermarks, existing_table_ids)
safe_epoch_table_watermarks_impl(
&self.table_watermarks,
&self.state_table_info,
existing_table_ids,
)
}
}

pub fn table_watermarks_by_table_ids_impl(
pub fn safe_epoch_table_watermarks_impl(
table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
state_table_info: &HummockVersionStateTableInfo,
existing_table_ids: &[u32],
) -> BTreeMap<u32, TableWatermarks> {
fn extract_single_table_watermark(
table_watermarks: &TableWatermarks,
safe_epoch: u64,
) -> Option<TableWatermarks> {
if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
Some(TableWatermarks {
watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
direction: table_watermarks.direction,
})
assert!(
*first_epoch >= safe_epoch,
"smallest epoch {} in table watermark should be at least safe epoch {}",
first_epoch,
safe_epoch
);
if *first_epoch == safe_epoch {
Some(TableWatermarks {
watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
direction: table_watermarks.direction,
})
} else {
None
}
} else {
None
}
@@ -290,8 +307,15 @@ pub fn table_watermarks_by_table_ids_impl(
if !existing_table_ids.contains(&u32_table_id) {
None
} else {
extract_single_table_watermark(table_watermarks)
.map(|table_watermarks| (table_id.table_id, table_watermarks))
extract_single_table_watermark(
table_watermarks,
state_table_info
.info()
.get(table_id)
.expect("table should exist")
.committed_epoch,
)
.map(|table_watermarks| (table_id.table_id, table_watermarks))
}
})
.collect()
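
A minimal, standalone sketch of the rule this helper restores, not taken from the repository: a table contributes a safe-epoch watermark only when the oldest entry in its watermark history sits exactly at the committed epoch recorded for it in the state table info. The u32 table ids, plain i64 watermark values, and the missing vnode bitmaps are simplifications assumed for illustration.

use std::collections::{BTreeMap, HashMap};

// A table's watermark history: (epoch, watermark value) pairs, ascending by epoch.
type ToyWatermarks = Vec<(u64, i64)>;

// Simplified stand-in for `safe_epoch_table_watermarks_impl`: keep a table's
// watermark only if its oldest retained epoch equals the table's committed epoch.
fn toy_safe_epoch_watermarks(
    table_watermarks: &HashMap<u32, ToyWatermarks>,
    committed_epochs: &HashMap<u32, u64>, // stand-in for the state table info
    existing_table_ids: &[u32],
) -> BTreeMap<u32, (u64, i64)> {
    table_watermarks
        .iter()
        .filter(|(table_id, _)| existing_table_ids.contains(*table_id))
        .filter_map(|(table_id, watermarks)| {
            let committed = *committed_epochs.get(table_id)?;
            let &(first_epoch, value) = watermarks.first()?;
            assert!(
                first_epoch >= committed,
                "oldest epoch {first_epoch} should be at least committed epoch {committed}"
            );
            // Histories that start after the committed epoch are skipped entirely.
            (first_epoch == committed).then_some((*table_id, (first_epoch, value)))
        })
        .collect()
}

fn main() {
    let table_watermarks = HashMap::from([(1u32, vec![(10u64, 7i64)]), (2, vec![(12, 9)])]);
    let committed_epochs = HashMap::from([(1u32, 10u64), (2, 11)]);
    let result = toy_safe_epoch_watermarks(&table_watermarks, &committed_epochs, &[1, 2]);
    // Table 1's oldest entry matches its committed epoch; table 2's does not.
    assert_eq!(result, BTreeMap::from([(1, (10, 7))]));
}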
@@ -727,11 +751,22 @@ impl HummockVersion {
}
}
for (table_id, table_watermarks) in &self.table_watermarks {
let safe_epoch = if let Some(state_table_info) =
self.state_table_info.info().get(table_id)
&& let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
&& state_table_info.committed_epoch > *oldest_epoch
{
// safe epoch has progressed, need further clear.
state_table_info.committed_epoch
} else {
// safe epoch not progressed or the table has been removed. No need to truncate
continue;
};
let table_watermarks = modified_table_watermarks
.entry(*table_id)
.or_insert_with(|| Some((**table_watermarks).clone()));
if let Some(table_watermarks) = table_watermarks {
table_watermarks.clear_stale_epoch_watermark();
table_watermarks.clear_stale_epoch_watermark(safe_epoch);
}
}
// apply the staging table watermark to hummock version
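
The hunk above first decides, per table, whether truncation is needed at all. A standalone sketch of that decision under simplified types, not taken from the repository (a plain map stands in for the version's state table info):

use std::collections::HashMap;

// Returns the epoch below which a table's watermark history may be cleared,
// or None when nothing needs to change.
fn truncate_below(
    committed_epochs: &HashMap<u32, u64>,
    table_id: u32,
    watermarks: &[(u64, i64)], // (epoch, watermark value), ascending by epoch
) -> Option<u64> {
    // Table no longer in the version: skip.
    let committed = *committed_epochs.get(&table_id)?;
    // Empty history: skip.
    let &(oldest_epoch, _) = watermarks.first()?;
    if committed > oldest_epoch {
        // Safe epoch has progressed; entries below it can be cleared.
        Some(committed)
    } else {
        // Safe epoch has not moved past the oldest entry: nothing to do.
        None
    }
}

fn main() {
    let committed_epochs = HashMap::from([(1u32, 5u64), (2, 3)]);
    assert_eq!(truncate_below(&committed_epochs, 1, &[(3, 10), (5, 20)]), Some(5));
    assert_eq!(truncate_below(&committed_epochs, 2, &[(3, 10)]), None);
    assert_eq!(truncate_below(&committed_epochs, 9, &[(3, 10)]), None); // dropped table
}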
@@ -1266,7 +1301,6 @@ pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockSstableObject
/// Currently this method is only used by risectl validate-version.
pub fn validate_version(version: &HummockVersion) -> Vec<String> {
let mut res = Vec::new();

// Ensure each table maps to only one compaction group
for (group_id, levels) in &version.levels {
// Ensure compaction group id matches
148 changes: 133 additions & 15 deletions src/storage/hummock_sdk/src/table_watermark.rs
@@ -27,7 +27,7 @@ use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks};
use tracing::warn;
use tracing::{debug, warn};

use crate::key::{prefix_slice_with_vnode, vnode, TableKey, TableKeyRange};
use crate::HummockEpoch;
@@ -492,12 +497,37 @@ impl TableWatermarks {
);
}

pub fn clear_stale_epoch_watermark(&mut self) {
// retain at most 1 epoch
let mut result_epoch_watermark: Option<(HummockEpoch, Arc<[VnodeWatermark]>)> = None;
pub fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
match self.watermarks.first() {
None => {
// return on empty watermark
return;
}
Some((earliest_epoch, _)) => {
if *earliest_epoch >= safe_epoch {
// No stale epoch watermark needs to be cleared.
return;
}
}
}
debug!("clear stale table watermark below epoch {}", safe_epoch);
let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len());
let mut unset_vnode: HashSet<VirtualNode> = (0..VirtualNode::COUNT)
.map(VirtualNode::from_index)
.collect();
while let Some((epoch, _)) = self.watermarks.last() {
if *epoch >= safe_epoch {
let (epoch, watermarks) = self.watermarks.pop().expect("have check Some");
for watermark in watermarks.as_ref() {
for vnode in watermark.vnode_bitmap.iter_vnodes() {
unset_vnode.remove(&vnode);
}
}
result_epoch_watermark.push((epoch, watermarks));
} else {
break;
}
}
while !unset_vnode.is_empty()
&& let Some((_, watermarks)) = self.watermarks.pop()
{
@@ -522,7 +547,9 @@ impl TableWatermarks {
}
}
if !new_vnode_watermarks.is_empty() {
if let Some((_last_epoch, last_watermarks)) = result_epoch_watermark.as_mut() {
if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut()
&& *last_epoch == safe_epoch
{
*last_watermarks = Arc::from(
last_watermarks
.iter()
Expand All @@ -531,15 +558,17 @@ impl TableWatermarks {
.collect_vec(),
);
} else {
result_epoch_watermark = Some((0, Arc::from(new_vnode_watermarks)));
result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks)));
}
}
}
let Some(result_epoch_watermark) = result_epoch_watermark else {
return;
};
// epoch watermark are added from later epoch to earlier epoch.
// reverse to ensure that earlier epochs are at the front
result_epoch_watermark.reverse();
assert!(result_epoch_watermark
.is_sorted_by(|(first_epoch, _), (second_epoch, _)| { first_epoch < second_epoch }));
*self = TableWatermarks {
watermarks: vec![result_epoch_watermark],
watermarks: result_epoch_watermark,
direction: self.direction,
}
}
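
To see what the restored clear_stale_epoch_watermark(safe_epoch) is meant to do, here is a toy, self-contained model rather than the real implementation; vnode bitmaps and watermark direction are deliberately omitted. Entries at or above safe_epoch are kept, and the newest stale entry is re-anchored at safe_epoch so its watermark stays visible, which is roughly what the epoch4 case in the tests below asserts for vnodes not covered by newer entries.

#[derive(Debug, PartialEq)]
struct ToyTableWatermarks {
    // (epoch, watermark value) pairs, ascending by epoch.
    watermarks: Vec<(u64, i64)>,
}

impl ToyTableWatermarks {
    fn clear_stale_epoch_watermark(&mut self, safe_epoch: u64) {
        let Some(&(oldest_epoch, _)) = self.watermarks.first() else {
            return; // empty history, nothing to clear
        };
        if oldest_epoch >= safe_epoch {
            return; // no entry is older than the safe epoch
        }
        let mut carried: Option<i64> = None;
        let mut kept = Vec::with_capacity(self.watermarks.len());
        for (epoch, value) in self.watermarks.drain(..) {
            if epoch < safe_epoch {
                // Remember the newest stale value so it can be re-anchored.
                carried = Some(value);
            } else {
                kept.push((epoch, value));
            }
        }
        if let Some(value) = carried {
            if kept.first().map(|&(epoch, _)| epoch) != Some(safe_epoch) {
                // Re-anchor the newest stale watermark at the safe epoch.
                kept.insert(0, (safe_epoch, value));
            }
        }
        self.watermarks = kept;
    }
}

fn main() {
    let mut w = ToyTableWatermarks {
        watermarks: vec![(1, 10), (2, 20), (3, 30), (5, 50)],
    };
    // Clearing below epoch 4 drops epochs 1 and 2 and folds the epoch-3 value
    // down to epoch 4, while the epoch-5 entry is kept untouched.
    w.clear_stale_epoch_watermark(4);
    assert_eq!(w.watermarks, vec![(4, 30), (5, 50)]);
}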
@@ -805,17 +834,106 @@ mod tests {
);

let mut table_watermarks_checkpoint = table_watermarks.clone();
table_watermarks_checkpoint.clear_stale_epoch_watermark();
table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch1);
assert_eq!(table_watermarks_checkpoint, table_watermarks);

table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch2);
assert_eq!(
table_watermarks_checkpoint,
TableWatermarks {
watermarks: vec![
(
epoch2,
vec![VnodeWatermark::new(
build_bitmap(vec![0, 1, 2, 3]),
watermark2.clone(),
)]
.into()
),
(
epoch3,
vec![VnodeWatermark::new(
build_bitmap(0..VirtualNode::COUNT),
watermark3.clone(),
)]
.into()
),
(
epoch5,
vec![VnodeWatermark::new(
build_bitmap(vec![0, 3, 4]),
watermark4.clone(),
)]
.into()
)
],
direction,
}
);

table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch3);
assert_eq!(
table_watermarks_checkpoint,
TableWatermarks {
watermarks: vec![
(
epoch3,
vec![VnodeWatermark::new(
build_bitmap(0..VirtualNode::COUNT),
watermark3.clone(),
)]
.into()
),
(
epoch5,
vec![VnodeWatermark::new(
build_bitmap(vec![0, 3, 4]),
watermark4.clone(),
)]
.into()
)
],
direction,
}
);

table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch4);
assert_eq!(
table_watermarks_checkpoint,
TableWatermarks {
watermarks: vec![
(
epoch4,
vec![VnodeWatermark::new(
build_bitmap((1..3).chain(5..VirtualNode::COUNT)),
watermark3.clone()
)]
.into()
),
(
epoch5,
vec![VnodeWatermark::new(
build_bitmap(vec![0, 3, 4]),
watermark4.clone(),
)]
.into()
)
],
direction,
}
);

table_watermarks_checkpoint.clear_stale_epoch_watermark(epoch5);
assert_eq!(
table_watermarks_checkpoint,
TableWatermarks {
watermarks: vec![(
0,
epoch5,
vec![
VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone(),),
VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()),
VnodeWatermark::new(
build_bitmap((1..=2).chain(5..VirtualNode::COUNT)),
watermark3.clone(),
build_bitmap((1..3).chain(5..VirtualNode::COUNT)),
watermark3.clone()
)
]
.into()
