From 4ad19403b10dcd9b8595694f06e73c2f843d374f Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Mon, 25 Dec 2023 17:43:23 +0800 Subject: [PATCH] feat(storage): pass per-vnode watermark to hummock (#13429) --- .../hummock_sdk/src/table_watermark.rs | 115 +++- .../hummock_test/src/hummock_storage_tests.rs | 497 +++++++++++++++++- src/storage/hummock_trace/src/opts.rs | 5 +- .../event_handler/hummock_event_handler.rs | 12 +- .../src/hummock/event_handler/uploader.rs | 1 - .../hummock/store/local_hummock_storage.rs | 13 +- src/storage/src/hummock/store/version.rs | 29 +- src/storage/src/mem_table.rs | 6 +- src/storage/src/store.rs | 65 ++- .../log_store_impl/kv_log_store/reader.rs | 2 +- .../log_store_impl/kv_log_store/serde.rs | 24 +- .../log_store_impl/kv_log_store/writer.rs | 27 +- src/stream/src/common/table/state_table.rs | 68 +-- 13 files changed, 774 insertions(+), 90 deletions(-) diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index e4967b6e3950..2f76cff6cc5e 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -23,7 +23,7 @@ use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark}; -use tracing::debug; +use tracing::{debug, warn}; use crate::key::{prefix_slice_with_vnode, vnode_range, TableKey, TableKeyRange}; use crate::HummockEpoch; @@ -82,6 +82,10 @@ impl TableWatermarksIndex { } } + pub fn index(&self) -> &HashMap> { + &self.index + } + pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option { self.index.get(&vnode).and_then(|epoch_watermarks| { epoch_watermarks @@ -171,6 +175,54 @@ impl TableWatermarksIndex { } } + pub fn filter_regress_watermarks(&self, watermarks: &mut Vec) { + let mut ret = Vec::with_capacity(watermarks.len()); + for watermark in watermarks.drain(..) { + let mut regress_vnodes = None; + for vnode in watermark.vnode_bitmap.iter_vnodes() { + if let Some(prev_watermark) = self.latest_watermark(vnode) { + let is_regress = match self.direction() { + WatermarkDirection::Ascending => prev_watermark > watermark.watermark, + WatermarkDirection::Descending => prev_watermark < watermark.watermark, + }; + if is_regress { + warn!( + "table watermark regress: {:?} {} {:?} {:?}", + self.direction(), + vnode, + watermark.watermark, + prev_watermark + ); + regress_vnodes + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .set(vnode.to_index(), true); + } + } + } + if let Some(regress_vnodes) = regress_vnodes { + let mut bitmap_builder = None; + for vnode in watermark.vnode_bitmap.iter_vnodes() { + let vnode_index = vnode.to_index(); + if !regress_vnodes.is_set(vnode_index) { + bitmap_builder + .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .set(vnode_index, true); + } + } + if let Some(bitmap_builder) = bitmap_builder { + ret.push(VnodeWatermark::new( + Arc::new(bitmap_builder.finish()), + watermark.watermark, + )); + } + } else { + // no vnode has regress watermark + ret.push(watermark); + } + } + *watermarks = ret; + } + pub fn direction(&self) -> WatermarkDirection { self.watermark_direction } @@ -238,7 +290,7 @@ impl WatermarkDirection { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct VnodeWatermark { vnode_bitmap: Arc, watermark: Bytes, @@ -826,6 +878,9 @@ mod tests { prefixed_range_with_vnode(range, TEST_SINGLE_VNODE) } + /// Build and return a watermark index with the following watermarks + /// EPOCH1 bitmap(0, 1, 2, 3) watermark1 + /// EPOCH2 bitmap(1, 2, 3, 4) watermark2 fn build_and_test_watermark_index( direction: WatermarkDirection, watermark1: Bytes, @@ -1030,4 +1085,60 @@ mod tests { } } } + + #[test] + fn test_filter_regress_watermark() { + let watermark1 = Bytes::from_static(b"watermark1"); + let watermark2 = Bytes::from_static(b"watermark2"); + let watermark3 = Bytes::from_static(b"watermark3"); + let index = build_and_test_watermark_index( + WatermarkDirection::Ascending, + watermark1.clone(), + watermark2.clone(), + watermark3.clone(), + ); + + let mut new_watermarks = vec![ + // Partial regress + VnodeWatermark { + vnode_bitmap: build_bitmap(0..2), + watermark: watermark1.clone(), + }, + // All not regress + VnodeWatermark { + vnode_bitmap: build_bitmap(2..4), + watermark: watermark3.clone(), + }, + // All regress + VnodeWatermark { + vnode_bitmap: build_bitmap(4..5), + watermark: watermark1.clone(), + }, + // All newly set vnode + VnodeWatermark { + vnode_bitmap: build_bitmap(5..6), + watermark: watermark3.clone(), + }, + ]; + + index.filter_regress_watermarks(&mut new_watermarks); + + assert_eq!( + new_watermarks, + vec![ + VnodeWatermark { + vnode_bitmap: build_bitmap(0..1), + watermark: watermark1, + }, + VnodeWatermark { + vnode_bitmap: build_bitmap(2..4), + watermark: watermark3.clone(), + }, + VnodeWatermark { + vnode_bitmap: build_bitmap(5..6), + watermark: watermark3, + }, + ] + ); + } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 5826ef5c8bb6..9c5e7fac402a 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -12,18 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::ops::Range; use std::sync::Arc; use bytes::{BufMut, Bytes}; use futures::TryStreamExt; +use itertools::Itertools; use parking_lot::RwLock; +use risingwave_common::buffer::BitmapBuilder; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; -use risingwave_hummock_sdk::key::{FullKey, TableKey, TABLE_PREFIX_LEN}; +use risingwave_common::range::RangeBoundsExt; +use risingwave_hummock_sdk::key::{ + gen_key_from_bytes, prefix_slice_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, +}; +use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_rpc_client::HummockMetaClient; +use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::{read_filter_for_batch, read_filter_for_local}; -use risingwave_storage::hummock::CachePolicy; +use risingwave_storage::hummock::{CachePolicy, HummockStorage, LocalHummockStorage}; use risingwave_storage::storage_value::StorageValue; use risingwave_storage::store::*; use risingwave_storage::StateStore; @@ -1780,3 +1788,488 @@ async fn test_get_with_min_epoch() { assert!(v.is_none()); } } + +#[tokio::test] +async fn test_table_watermark() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + let test_env = prepare_hummock_test_env().await; + test_env.register_table_id(TEST_TABLE_ID).await; + let mut local1 = test_env + .storage + .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) + .await; + + let mut local2 = test_env + .storage + .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) + .await; + + let vnode1 = VirtualNode::from_index(1); + let vnode_bitmap1 = { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + builder.set(1, true); + builder.finish() + }; + let vnode2 = VirtualNode::from_index(2); + let vnode_bitmap2 = { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + builder.set(2, true); + builder.finish() + }; + + let epoch1 = (31 * 1000) << 16; + local1.init_for_test(epoch1).await.unwrap(); + local2.init_for_test(epoch1).await.unwrap(); + + fn gen_inner_key(index: usize) -> Bytes { + Bytes::copy_from_slice(format!("key_{:05}", index).as_bytes()) + } + + fn gen_key(vnode: VirtualNode, index: usize) -> TableKey { + gen_key_from_bytes(vnode, &gen_inner_key(index)) + } + + fn gen_val(index: usize) -> Bytes { + Bytes::copy_from_slice(format!("val_{}", index).as_bytes()) + } + + fn gen_range() -> Range { + 0..30 + } + + fn gen_batch( + vnode: VirtualNode, + index: impl Iterator, + ) -> Vec<(TableKey, Bytes)> { + index + .map(|index| (gen_key(vnode, index), gen_val(index))) + .collect_vec() + } + + let epoch1_indexes = || gen_range().filter(|index| index % 3 == 0); + + // epoch 1 write + let batch1_epoch1 = gen_batch(vnode1, epoch1_indexes()); + let batch2_epoch1 = gen_batch(vnode2, epoch1_indexes()); + + for (local, batch) in [(&mut local1, batch1_epoch1), (&mut local2, batch2_epoch1)] { + for (key, value) in batch { + local.insert(key, value, None).unwrap(); + } + } + + // test read after write + { + for (local, vnode) in [(&local1, vnode1), (&local2, vnode2)] { + for index in epoch1_indexes() { + let value = risingwave_storage::store::LocalStateStore::get( + local, + gen_key(vnode, index), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(value.unwrap(), gen_val(index)); + } + let result = risingwave_storage::store::LocalStateStore::iter( + local, + RangeBoundsExt::map(&gen_range(), |index| gen_key(vnode, *index)), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .map_ok(|(full_key, value)| (full_key.user_key, value)) + .try_collect::>() + .await + .unwrap(); + let expected = epoch1_indexes() + .map(|index| { + ( + UserKey::new(TEST_TABLE_ID, gen_key(vnode, index)), + gen_val(index), + ) + }) + .collect_vec(); + assert_eq!(expected, result); + } + } + + let watermark1 = 10; + + let epoch2 = (32 * 1000) << 16; + for (local, vnode_bitmap) in [ + (&mut local1, vnode_bitmap1.clone()), + (&mut local2, vnode_bitmap2.clone()), + ] { + local.flush(vec![]).await.unwrap(); + local.seal_current_epoch( + epoch2, + SealCurrentEpochOptions::new( + vec![VnodeWatermark::new( + Arc::new(vnode_bitmap), + gen_inner_key(watermark1), + )], + WatermarkDirection::Ascending, + ), + ); + } + + // test read after seal with watermark1 + { + for (local, vnode) in [(&local1, vnode1), (&local2, vnode2)] { + for index in epoch1_indexes() { + let value = risingwave_storage::store::LocalStateStore::get( + local, + gen_key(vnode, index), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap(); + if index < watermark1 { + assert!(value.is_none()); + } else { + assert_eq!(value.unwrap(), gen_val(index)); + } + } + + // iter full range + { + let result = risingwave_storage::store::LocalStateStore::iter( + local, + RangeBoundsExt::map(&gen_range(), |index| gen_key(vnode, *index)), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .map_ok(|(full_key, value)| (full_key.user_key, value)) + .try_collect::>() + .await + .unwrap(); + let expected = epoch1_indexes() + .filter(|index| index >= &watermark1) + .map(|index| { + ( + UserKey::new(TEST_TABLE_ID, gen_key(vnode, index)), + gen_val(index), + ) + }) + .collect_vec(); + assert_eq!(expected, result); + } + + // iter below watermark + { + let result = risingwave_storage::store::LocalStateStore::iter( + local, + ( + Included(gen_key(vnode, 0)), + Included(gen_key(vnode, watermark1 - 1)), + ), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert!(result.is_empty()); + } + } + } + + let epoch2_indexes = || { + gen_range() + .filter(|index| index % 3 == 1) + .filter(|index| index >= &watermark1) + }; + + // epoch 2 write + let batch1_epoch2 = gen_batch(vnode1, epoch2_indexes()); + let batch2_epoch2 = gen_batch(vnode2, epoch2_indexes()); + + let epoch3 = (33 * 1000) << 16; + + for (local, batch) in [(&mut local1, batch1_epoch2), (&mut local2, batch2_epoch2)] { + for (key, value) in batch { + local.insert(key, value, None).unwrap(); + } + local.flush(vec![]).await.unwrap(); + local.seal_current_epoch(epoch3, SealCurrentEpochOptions::no_watermark()); + } + + let indexes_after_epoch2 = || gen_range().filter(|index| index % 3 == 0 || index % 3 == 1); + + let test_after_epoch2 = |local1: LocalHummockStorage, local2: LocalHummockStorage| async { + for (local, vnode) in [(&local1, vnode1), (&local2, vnode2)] { + for index in indexes_after_epoch2() { + let value = risingwave_storage::store::LocalStateStore::get( + local, + gen_key(vnode, index), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap(); + if index < watermark1 { + assert!(value.is_none()); + } else { + assert_eq!(value.unwrap(), gen_val(index)); + } + } + + // iter full range + { + let result = risingwave_storage::store::LocalStateStore::iter( + local, + RangeBoundsExt::map(&gen_range(), |index| gen_key(vnode, *index)), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .map_ok(|(full_key, value)| (full_key.user_key, value)) + .try_collect::>() + .await + .unwrap(); + let expected = indexes_after_epoch2() + .filter(|index| index >= &watermark1) + .map(|index| { + ( + UserKey::new(TEST_TABLE_ID, gen_key(vnode, index)), + gen_val(index), + ) + }) + .collect_vec(); + assert_eq!(expected, result); + } + + // iter below watermark + { + let result = risingwave_storage::store::LocalStateStore::iter( + local, + ( + Included(gen_key(vnode, 0)), + Included(gen_key(vnode, watermark1 - 1)), + ), + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert!(result.is_empty()); + } + } + (local1, local2) + }; + + let (local1, local2) = test_after_epoch2(local1, local2).await; + + let check_version_table_watermark = |version: PinnedVersion| { + let table_watermarks = version.table_watermark_index().get(&TEST_TABLE_ID).unwrap(); + assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction()); + let index = table_watermarks.index(); + assert_eq!(2, index.len()); + let vnode1_watermark = index.get(&vnode1).unwrap(); + assert_eq!(1, vnode1_watermark.len()); + assert_eq!( + &gen_inner_key(watermark1), + vnode1_watermark.get(&epoch1).unwrap() + ); + let vnode2_watermark = index.get(&vnode2).unwrap(); + assert_eq!(1, vnode2_watermark.len()); + assert_eq!( + &gen_inner_key(watermark1), + vnode2_watermark.get(&epoch1).unwrap() + ); + }; + + test_env.commit_epoch(epoch1).await; + test_env.storage.try_wait_epoch_for_test(epoch1).await; + + let test_global_read = |storage: HummockStorage, epoch: u64| async move { + // inner vnode read + for vnode in [vnode1, vnode2] { + for index in indexes_after_epoch2() { + let value = storage + .get( + gen_key(vnode, index), + epoch, + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap(); + if index < watermark1 { + assert!(value.is_none()); + } else { + assert_eq!(value.unwrap(), gen_val(index)); + } + } + + // iter full range + { + let result = storage + .iter( + RangeBoundsExt::map(&gen_range(), |index| gen_key(vnode, *index)), + epoch, + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .map_ok(|(full_key, value)| (full_key.user_key, value)) + .try_collect::>() + .await + .unwrap(); + let expected = indexes_after_epoch2() + .filter(|index| index >= &watermark1) + .map(|index| { + ( + UserKey::new(TEST_TABLE_ID, gen_key(vnode, index)), + gen_val(index), + ) + }) + .collect_vec(); + assert_eq!(expected, result); + } + + // iter below watermark + { + let result = storage + .iter( + ( + Included(gen_key(vnode, 0)), + Included(gen_key(vnode, watermark1 - 1)), + ), + epoch, + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert!(result.is_empty()); + } + } + + // cross vnode read + let result = storage + .iter( + ( + Included(TableKey(prefix_slice_with_vnode( + vnode1, + &gen_inner_key(gen_range().start), + ))), + Included(TableKey(prefix_slice_with_vnode( + vnode2, + &gen_inner_key(gen_range().end), + ))), + ), + epoch, + ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }, + ) + .await + .unwrap() + .map_ok(|(full_key, value)| (full_key.user_key, value)) + .try_collect::>() + .await + .unwrap(); + let expected = [vnode1, vnode2] + .into_iter() + .flat_map(|vnode| { + gen_range() + .filter(|index| index % 3 == 0 || index % 3 == 1) + .filter(|index| index >= &watermark1) + .map(move |index| { + ( + UserKey::new(TEST_TABLE_ID, gen_key(vnode, index)), + gen_val(index), + ) + }) + }) + .collect_vec(); + assert_eq!(expected, result); + }; + + test_global_read(test_env.storage.clone(), epoch2).await; + + check_version_table_watermark(test_env.storage.get_pinned_version()); + + let (local1, local2) = test_after_epoch2(local1, local2).await; + + test_env.commit_epoch(epoch2).await; + test_env.storage.try_wait_epoch_for_test(epoch2).await; + + test_global_read(test_env.storage.clone(), epoch2).await; + + check_version_table_watermark(test_env.storage.get_pinned_version()); + + let (mut local1, mut local2) = test_after_epoch2(local1, local2).await; + + let epoch4 = (34 * 1000) << 16; + + for (local, vnode_bitmap) in [ + (&mut local1, vnode_bitmap1.clone()), + (&mut local2, vnode_bitmap2.clone()), + ] { + // regress watermark + local.seal_current_epoch( + epoch4, + SealCurrentEpochOptions::new( + vec![VnodeWatermark::new( + Arc::new(vnode_bitmap), + gen_inner_key(5), + )], + WatermarkDirection::Ascending, + ), + ); + } + + test_global_read(test_env.storage.clone(), epoch3).await; + + let (local1, local2) = test_after_epoch2(local1, local2).await; + + test_env.commit_epoch(epoch3).await; + test_env.storage.try_wait_epoch_for_test(epoch3).await; + + check_version_table_watermark(test_env.storage.get_pinned_version()); + + let (_local1, _local2) = test_after_epoch2(local1, local2).await; + + test_global_read(test_env.storage.clone(), epoch3).await; +} diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index b9eb6fd3ed69..dfa3ba46ac4e 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -219,4 +219,7 @@ pub struct TracedInitOptions { } #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] -pub struct TracedSealCurrentEpochOptions {} +pub struct TracedSealCurrentEpochOptions { + // The watermark is serialized into protobuf + pub table_watermarks: Option<(bool, Vec>)>, +} diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f8fc574a54ad..498c41c6e635 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -578,7 +578,17 @@ impl HummockEventHandler { } } - HummockEvent::LocalSealEpoch { .. } => {} + HummockEvent::LocalSealEpoch { + epoch, + opts, + table_id, + .. + } => { + if let Some((direction, watermarks)) = opts.table_watermarks { + self.uploader + .add_table_watermarks(epoch, table_id, watermarks, direction) + } + } #[cfg(any(test, feature = "test"))] HummockEvent::FlushEvent(sender) => { diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index ba6cb511259b..66357753fd03 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -759,7 +759,6 @@ impl HummockUploader { .push_front(imm); } - #[expect(dead_code)] pub(crate) fn add_table_watermarks( &mut self, epoch: u64, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 258694121028..72c24cc5b1cb 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -394,7 +394,7 @@ impl LocalStateStore for LocalHummockStorage { Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { + fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) { assert!(!self.is_dirty()); let prev_epoch = self .epoch @@ -407,6 +407,17 @@ impl LocalStateStore for LocalHummockStorage { next_epoch, prev_epoch ); + if let Some((direction, watermarks)) = &mut opts.table_watermarks { + let mut read_version = self.read_version.write(); + read_version.filter_regress_watermarks(watermarks); + if !watermarks.is_empty() { + read_version.update(VersionUpdate::NewTableWatermark { + direction: *direction, + epoch: prev_epoch, + vnode_watermarks: watermarks.clone(), + }); + } + } self.event_sender .send(HummockEvent::LocalSealEpoch { instance_id: self.instance_id(), diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 5496e94b9e75..8fcd81d6b75d 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -27,7 +27,9 @@ use risingwave_hummock_sdk::key::{ bound_table_key_range, is_empty_key_range, FullKey, TableKey, TableKeyRange, UserKey, }; use risingwave_hummock_sdk::key_range::KeyRangeCommon; -use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, TableWatermarksIndex}; +use risingwave_hummock_sdk::table_watermark::{ + ReadTableWatermark, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, +}; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::{HummockVersionDelta, LevelType, SstableInfo}; use sync_point::sync_point; @@ -124,6 +126,11 @@ pub enum VersionUpdate { Staging(StagingData), CommittedDelta(HummockVersionDelta), CommittedSnapshot(CommittedVersion), + NewTableWatermark { + direction: WatermarkDirection, + epoch: HummockEpoch, + vnode_watermarks: Vec, + }, } #[derive(Clone)] @@ -407,6 +414,16 @@ impl HummockReadVersion { } } } + VersionUpdate::NewTableWatermark { + direction, + epoch, + vnode_watermarks, + } => self + .table_watermarks + .get_or_insert_with(|| { + TableWatermarksIndex::new(direction, self.committed.max_committed_epoch()) + }) + .add_epoch_watermark(epoch, &vnode_watermarks, direction), } } @@ -418,6 +435,16 @@ impl HummockReadVersion { &self.committed } + /// We have assumption that the watermark is increasing monotonically. Therefore, + /// here if the upper layer usage has passed an regressed watermark, we should + /// filter out the regressed watermark. Currently the kv log store may write + /// regressed watermark + pub fn filter_regress_watermarks(&self, watermarks: &mut Vec) { + if let Some(watermark_index) = &self.table_watermarks { + watermark_index.filter_regress_watermarks(watermarks) + } + } + pub fn clear_uncommitted(&mut self) { self.staging.imm.clear(); self.staging.merged_imm.clear(); diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 25342f2b163e..eca66bfba2f7 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -26,6 +26,7 @@ use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; use thiserror::Error; +use tracing::warn; use crate::error::{StorageError, StorageResult}; use crate::hummock::iterator::{FromRustIterator, RustIteratorBuilder}; @@ -600,7 +601,7 @@ impl LocalStateStore for MemtableLocalState Ok(()) } - fn seal_current_epoch(&mut self, next_epoch: u64, _opts: SealCurrentEpochOptions) { + fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) { assert!(!self.is_dirty()); let prev_epoch = self .epoch @@ -612,6 +613,9 @@ impl LocalStateStore for MemtableLocalState next_epoch, prev_epoch ); + if opts.table_watermarks.is_some() { + warn!("table watermark only supported in hummock state store"); + } } async fn try_flush(&mut self) -> StorageResult<()> { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 7564df7a7dfc..cf5211d7069e 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -21,10 +21,13 @@ use std::sync::Arc; use bytes::Bytes; use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; +use prost::Message; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; -use risingwave_hummock_sdk::table_watermark::TableWatermarks; +use risingwave_hummock_sdk::table_watermark::{ + TableWatermarks, VnodeWatermark, WatermarkDirection, +}; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; use risingwave_hummock_trace::{ TracedInitOptions, TracedNewLocalOptions, TracedPrefetchOptions, TracedReadOptions, @@ -503,30 +506,64 @@ impl From for InitOptions { } #[derive(Clone, Debug)] -pub struct SealCurrentEpochOptions {} +pub struct SealCurrentEpochOptions { + pub table_watermarks: Option<(WatermarkDirection, Vec)>, +} impl From for TracedSealCurrentEpochOptions { - fn from(_value: SealCurrentEpochOptions) -> Self { - TracedSealCurrentEpochOptions {} + fn from(value: SealCurrentEpochOptions) -> Self { + TracedSealCurrentEpochOptions { + table_watermarks: value.table_watermarks.map(|(direction, watermarks)| { + ( + direction == WatermarkDirection::Ascending, + watermarks + .iter() + .map(|watermark| Message::encode_to_vec(&watermark.to_protobuf())) + .collect(), + ) + }), + } } } -impl TryInto for TracedSealCurrentEpochOptions { - type Error = anyhow::Error; - - fn try_into(self) -> Result { - Ok(SealCurrentEpochOptions {}) +impl From for SealCurrentEpochOptions { + fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions { + SealCurrentEpochOptions { + table_watermarks: value.table_watermarks.map(|(is_ascending, watermarks)| { + ( + if is_ascending { + WatermarkDirection::Ascending + } else { + WatermarkDirection::Descending + }, + watermarks + .iter() + .map(|serialized_watermark| { + Message::decode(serialized_watermark.as_slice()) + .map(|pb| VnodeWatermark::from_protobuf(&pb)) + .expect("should not failed") + }) + .collect(), + ) + }), + } } } impl SealCurrentEpochOptions { - #[expect(clippy::new_without_default)] - pub fn new() -> Self { - Self {} + pub fn new(watermarks: Vec, direction: WatermarkDirection) -> Self { + Self { + table_watermarks: Some((direction, watermarks)), + } + } + + pub fn no_watermark() -> Self { + Self { + table_watermarks: None, + } } - #[cfg(any(test, feature = "test"))] pub fn for_test() -> Self { - Self::new() + Self::no_watermark() } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index dede4e21a61d..8cb126f64b7f 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -482,6 +482,6 @@ impl LogReader for KvLogStoreReader { } self.rx.rewind(); - Ok((true, Some(self.serde.vnodes().clone()))) + Ok((true, Some((**self.serde.vnodes()).clone()))) } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 22d5958d7409..7ff0661f9795 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -183,8 +183,8 @@ impl LogStoreRowSerde { self.vnodes = vnodes; } - pub(crate) fn vnodes(&self) -> &Bitmap { - self.vnodes.as_ref() + pub(crate) fn vnodes(&self) -> &Arc { + &self.vnodes } pub(crate) fn encode_epoch(epoch: u64) -> i64 { @@ -272,14 +272,16 @@ impl LogStoreRowSerde { pub(crate) fn serialize_truncation_offset_watermark( &self, - vnode: VirtualNode, offset: ReaderTruncationOffsetType, ) -> Bytes { let (epoch, seq_id) = offset; - let curr_offset = self.serialize_log_store_pk(vnode, epoch, seq_id); - let ret = Bytes::from(next_key(&curr_offset)); - assert!(!ret.is_empty()); - ret + Bytes::from(next_key(&serialize_pk( + [ + Some(ScalarImpl::Int64(Self::encode_epoch(epoch))), + seq_id.map(ScalarImpl::Int32), + ], + &self.pk_serde, + ))) } } @@ -781,9 +783,7 @@ mod tests { fn remove_vnode_prefix(key: &Bytes) -> Bytes { key.slice(VirtualNode::SIZE..) } - let delete_range_right1 = remove_vnode_prefix( - &serde.serialize_truncation_offset_watermark(DEFAULT_VNODE, (epoch, None)), - ); + let delete_range_right1 = serde.serialize_truncation_offset_watermark((epoch, None)); for (op, row) in stream_chunk.rows() { let (_, key, value) = serde.serialize_data_row(epoch, seq_id, op, row); @@ -820,9 +820,7 @@ mod tests { seq_id = 1; epoch += 1; - let delete_range_right2 = remove_vnode_prefix( - &serde.serialize_truncation_offset_watermark(DEFAULT_VNODE, (epoch, None)), - ); + let delete_range_right2 = serde.serialize_truncation_offset_watermark((epoch, None)); for (op, row) in stream_chunk.rows() { let (_, key, value) = serde.serialize_data_row(epoch, seq_id, op, row); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 7b215e772324..fa380a77b6ae 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Bound::{Excluded, Included}; use std::sync::Arc; use anyhow::anyhow; -use bytes::Bytes; +use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; @@ -24,6 +23,7 @@ use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; +use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_storage::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions}; use tokio::sync::watch; @@ -150,19 +150,20 @@ impl LogWriter for KvLogStoreWriter { Ok(()) })?; flush_info.report(&self.metrics); - let mut delete_range = Vec::with_capacity(self.serde.vnodes().count_ones()); + let mut watermark = None; if let Some(truncation_offset) = self.tx.pop_truncation(epoch) { - for vnode in self.serde.vnodes().iter_vnodes() { - let range_begin = Bytes::from(vnode.to_be_bytes().to_vec()); - let range_end = self - .serde - .serialize_truncation_offset_watermark(vnode, truncation_offset); - delete_range.push((Included(range_begin), Excluded(range_end))); - } + watermark = Some(VnodeWatermark::new( + self.serde.vnodes().clone(), + self.serde + .serialize_truncation_offset_watermark(truncation_offset), + )); } - self.state_store.flush(delete_range).await?; - self.state_store - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::new()); + self.state_store.flush(vec![]).await?; + let watermark = watermark.into_iter().collect_vec(); + self.state_store.seal_current_epoch( + next_epoch, + SealCurrentEpochOptions::new(watermark, WatermarkDirection::Ascending), + ); self.tx.barrier(epoch, is_checkpoint, next_epoch); self.seq_id = FIRST_SEQ_ID; Ok(()) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index b5cf00eceb78..d88768468697 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -40,9 +40,10 @@ use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; use risingwave_common::util::value_encoding::BasicSerde; use risingwave_hummock_sdk::key::{ - end_bound_of_prefix, next_key, prefixed_range_with_vnode, range_of_prefix, + end_bound_of_prefix, prefixed_range_with_vnode, range_of_prefix, start_bound_of_excluded_prefix, TableKey, TableKeyRange, }; +use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; use risingwave_pb::catalog::Table; use risingwave_storage::error::{ErrorKind, StorageError, StorageResult}; use risingwave_storage::hummock::CachePolicy; @@ -1040,7 +1041,7 @@ where if !self.is_dirty() { // If the state table is not modified, go fast path. self.local_store - .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new()); + .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark()); return Ok(()); } else { self.seal_current_epoch(new_epoch.curr) @@ -1109,7 +1110,7 @@ where // per epoch. self.watermark_buffer_strategy.tick(); self.local_store - .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new()); + .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::no_watermark()); } /// Write to state store. @@ -1119,8 +1120,6 @@ where trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning"); }); - let mut delete_ranges = Vec::new(); - let prefix_serializer = if self.pk_indices().is_empty() { None } else { @@ -1156,10 +1155,11 @@ where ) }); + let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark)> = None; + // Compute Delete Ranges if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix - && let Some(first_byte) = watermark_suffix.first() { trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{ self.vnodes.iter_vnodes().collect_vec() @@ -1172,36 +1172,21 @@ where .unwrap() .is_ascending() { - // We either serialize null into `0u8`, data into `(1u8 || scalar)`, or serialize null - // into `1u8`, data into `(0u8 || scalar)`. We do not want to delete null - // here, so `range_begin_suffix` cannot be `vec![]` when null is represented as `0u8`. - let range_begin_suffix = vec![*first_byte]; - for vnode in self.vnodes.iter_vnodes() { - let mut range_begin = vnode.to_be_bytes().to_vec(); - let mut range_end = range_begin.clone(); - range_begin.extend(&range_begin_suffix); - range_end.extend(&watermark_suffix); - delete_ranges.push(( - Bound::Included(Bytes::from(range_begin)), - Bound::Excluded(Bytes::from(range_end)), - )); - } + seal_watermark = Some(( + WatermarkDirection::Ascending, + VnodeWatermark::new( + self.vnodes.clone(), + Bytes::copy_from_slice(watermark_suffix.as_ref()) + ) + )); } else { - assert_ne!(*first_byte, u8::MAX); - let following_bytes = next_key(&watermark_suffix[1..]); - if !following_bytes.is_empty() { - for vnode in self.vnodes.iter_vnodes() { - let mut range_begin = vnode.to_be_bytes().to_vec(); - let mut range_end = range_begin.clone(); - range_begin.push(*first_byte); - range_begin.extend(&following_bytes); - range_end.push(first_byte + 1); - delete_ranges.push(( - Bound::Included(Bytes::from(range_begin)), - Bound::Excluded(Bytes::from(range_end)), - )); - } - } + seal_watermark = Some(( + WatermarkDirection::Descending, + VnodeWatermark::new( + self.vnodes.clone(), + Bytes::copy_from_slice(watermark_suffix.as_ref()) + ) + )); } } self.prev_cleaned_watermark = watermark; @@ -1213,13 +1198,18 @@ where // 2. Mark the cache as not_synced, so we can still refill it later. // 3. When refilling the cache, // we just refill from the largest value of the cache, as the lower bound. - if USE_WATERMARK_CACHE && !delete_ranges.is_empty() { + if USE_WATERMARK_CACHE && seal_watermark.is_some() { self.watermark_cache.clear(); } - self.local_store.flush(delete_ranges).await?; - self.local_store - .seal_current_epoch(next_epoch, SealCurrentEpochOptions::new()); + self.local_store.flush(vec![]).await?; + let seal_opt = match seal_watermark { + Some((direction, watermark)) => { + SealCurrentEpochOptions::new(vec![watermark], direction) + } + None => SealCurrentEpochOptions::no_watermark(), + }; + self.local_store.seal_current_epoch(next_epoch, seal_opt); Ok(()) }