diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs
index 670205850b89d..d8b77df0dbe92 100644
--- a/src/storage/hummock_test/src/snapshot_tests.rs
+++ b/src/storage/hummock_test/src/snapshot_tests.rs
@@ -32,7 +32,7 @@ use risingwave_storage::store::{
 use risingwave_storage::StateStore;
 
 use crate::local_state_store_test_utils::LocalStateStoreTestExt;
-use crate::test_utils::{gen_key_from_bytes, with_hummock_storage_v2, TestIngestBatch};
+use crate::test_utils::{gen_key_from_bytes, with_hummock_storage, TestIngestBatch};
 
 macro_rules! assert_count_range_scan {
     (
@@ -402,36 +402,36 @@ async fn test_snapshot_range_scan_inner(
 
 #[tokio::test]
 async fn test_snapshot_v2() {
-    let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
+    let (storage, meta_client) = with_hummock_storage(Default::default()).await;
     test_snapshot_inner(storage, meta_client, false, false).await;
 }
 
 #[tokio::test]
 async fn test_snapshot_with_sync_v2() {
-    let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
+    let (storage, meta_client) = with_hummock_storage(Default::default()).await;
     test_snapshot_inner(storage, meta_client, true, false).await;
 }
 
 #[tokio::test]
 async fn test_snapshot_with_commit_v2() {
-    let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
+    let (storage, meta_client) = with_hummock_storage(Default::default()).await;
     test_snapshot_inner(storage, meta_client, true, true).await;
 }
 
 #[tokio::test]
 async fn test_snapshot_range_scan_v2() {
-    let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
+    let (storage, meta_client) = with_hummock_storage(Default::default()).await;
     test_snapshot_range_scan_inner(storage, meta_client, false, false).await;
 }
 
 #[tokio::test]
 async fn test_snapshot_range_scan_with_sync_v2() {
-    let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
+    let (storage, meta_client) = with_hummock_storage(Default::default()).await;
     test_snapshot_range_scan_inner(storage, meta_client, true, false).await;
 }
 
 #[tokio::test]
 async fn test_snapshot_range_scan_with_commit_v2() {
-    let (storage, meta_client) = with_hummock_storage_v2(Default::default()).await;
+    let (storage, meta_client) = with_hummock_storage(Default::default()).await;
     test_snapshot_range_scan_inner(storage, meta_client, true, true).await;
 }
diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs
index 914517ff83336..1fd6f6ae0de25 100644
--- a/src/storage/hummock_test/src/state_store_tests.rs
+++ b/src/storage/hummock_test/src/state_store_tests.rs
@@ -26,7 +26,7 @@ use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH, MAX_EPOCH};
-use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
+use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKeyRange};
 use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo, SyncResult};
 use risingwave_meta::hummock::test_utils::setup_compute_env;
 use risingwave_meta::hummock::CommitEpochInfo;
@@ -44,11 +44,11 @@ use risingwave_storage::store_impl::verify::VerifyStateStore;
 
 use crate::get_notification_client_for_test;
 use crate::local_state_store_test_utils::LocalStateStoreTestExt;
-use crate::test_utils::{gen_key_from_str, with_hummock_storage_v2, TestIngestBatch};
+use crate::test_utils::{gen_key_from_str, with_hummock_storage, TestIngestBatch}; #[tokio::test] -async fn test_empty_read_v2() { - let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; +async fn test_empty_read() { + let (hummock_storage, _meta_client) = with_hummock_storage(Default::default()).await; assert!(hummock_storage .get( gen_key_from_str(VirtualNode::ZERO, "test_key"), @@ -82,8 +82,8 @@ async fn test_empty_read_v2() { } #[tokio::test] -async fn test_basic_v2() { - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; +async fn test_basic() { + let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; let anchor = gen_key_from_str(VirtualNode::ZERO, "aa"); // First batch inserts the anchor and others. @@ -418,8 +418,8 @@ async fn test_basic_v2() { } #[tokio::test] -async fn test_state_store_sync_v2() { - let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; +async fn test_state_store_sync() { + let (hummock_storage, _meta_client) = with_hummock_storage(Default::default()).await; let mut epoch = INVALID_EPOCH.next_epoch(); @@ -544,7 +544,7 @@ async fn test_reload_storage() { let sstable_store = mock_sstable_store().await; let hummock_options = Arc::new(default_opts_for_test()); let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await; - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; + let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; let anchor = gen_key_from_str(VirtualNode::ZERO, "aa"); // First batch inserts the anchor and others. @@ -733,8 +733,8 @@ async fn test_reload_storage() { // Keep this test case's codes for future reference // #[tokio::test] -// async fn test_write_anytime_v2() { -// let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; +// async fn test_write_anytime() { +// let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; // test_write_anytime_inner(hummock_storage, meta_client).await; // } @@ -1032,8 +1032,8 @@ async fn test_reload_storage() { // } #[tokio::test] -async fn test_delete_get_v2() { - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; +async fn test_delete_get() { + let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); @@ -1122,8 +1122,8 @@ async fn test_delete_get_v2() { } #[tokio::test] -async fn test_multiple_epoch_sync_v2() { - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; +async fn test_multiple_epoch_sync() { + let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; let initial_epoch = INVALID_EPOCH; let epoch1 = initial_epoch.next_epoch(); @@ -1287,7 +1287,7 @@ async fn test_multiple_epoch_sync_v2() { #[tokio::test] async fn test_clear_shared_buffer() { - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; + let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; let mut local_hummock_storage = hummock_storage .new_local(NewLocalOptions::for_test(Default::default())) .await; @@ -1367,7 +1367,7 @@ async fn test_clear_shared_buffer() { async fn test_replicated_local_hummock_storage() { const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; - let (hummock_storage, 
meta_client) = with_hummock_storage_v2(TEST_TABLE_ID).await; + let (hummock_storage, meta_client) = with_hummock_storage(TEST_TABLE_ID).await; let epoch0 = meta_client .hummock_manager_ref() @@ -1568,7 +1568,7 @@ async fn test_replicated_local_hummock_storage() { #[tokio::test] async fn test_iter_log() { let table_id = TableId::new(233); - let (hummock_storage, meta_client) = with_hummock_storage_v2(table_id).await; + let (hummock_storage, meta_client) = with_hummock_storage(table_id).await; let epoch_count = 10; let key_count = 10000; @@ -1692,3 +1692,264 @@ async fn test_iter_log() { verify_iter_log((start_bound, Unbounded)).await; } } + +#[tokio::test] +async fn test_get_keyed_row() { + let (hummock_storage, meta_client) = with_hummock_storage(Default::default()).await; + let table_id = TableId::default(); + let anchor = gen_key_from_str(VirtualNode::ZERO, "aa"); + + // First batch inserts the anchor and others. + let batch1 = vec![ + (anchor.clone(), StorageValue::new_put("111")), + ( + gen_key_from_str(VirtualNode::ZERO, "bb"), + StorageValue::new_put("222"), + ), + ]; + + // Second batch modifies the anchor. + let batch2 = vec![ + (anchor.clone(), StorageValue::new_put("111111")), + ( + gen_key_from_str(VirtualNode::ZERO, "cc"), + StorageValue::new_put("333"), + ), + ]; + + // Third batch deletes the anchor + let batch3 = vec![ + (anchor.clone(), StorageValue::new_delete()), + ( + gen_key_from_str(VirtualNode::ZERO, "dd"), + StorageValue::new_put("444"), + ), + ( + gen_key_from_str(VirtualNode::ZERO, "ee"), + StorageValue::new_put("555"), + ), + ]; + + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; + + // epoch 0 is reserved by storage service + let epoch1 = test_epoch(1); + hummock_storage.start_epoch(epoch1, HashSet::from_iter([Default::default()])); + local.init_for_test(epoch1).await.unwrap(); + + // try to write an empty batch, and hummock should write nothing + let size = local + .ingest_batch( + vec![], + WriteOptions { + epoch: epoch1, + table_id, + }, + ) + .await + .unwrap(); + assert_eq!(size, 0); + + // Write the first batch. + local + .ingest_batch( + batch1, + WriteOptions { + epoch: epoch1, + table_id, + }, + ) + .await + .unwrap(); + + let epoch2 = epoch1.next_epoch(); + hummock_storage.start_epoch(epoch2, HashSet::from_iter([Default::default()])); + local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + + // Get the value after flushing to remote. + let (key, value) = hummock_storage + .get_keyed_row( + anchor.clone(), + epoch1, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(key, FullKey::new(table_id, anchor.clone(), epoch1)); + assert_eq!(value, Bytes::from("111")); + let (key, value) = hummock_storage + .get_keyed_row( + gen_key_from_str(VirtualNode::ZERO, "bb"), + epoch1, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + key, + FullKey::new(table_id, gen_key_from_str(VirtualNode::ZERO, "bb"), epoch1) + ); + assert_eq!(value, Bytes::from("222")); + + // Test looking for a nonexistent key. `next()` would return the next key. 
+ let res = hummock_storage + .get_keyed_row( + gen_key_from_str(VirtualNode::ZERO, "ab"), + epoch1, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(res, None); + + // Write the second batch. + local + .ingest_batch( + batch2, + WriteOptions { + epoch: epoch2, + table_id, + }, + ) + .await + .unwrap(); + + let epoch3 = epoch2.next_epoch(); + hummock_storage.start_epoch(epoch3, HashSet::from_iter([Default::default()])); + local.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test()); + + // Get the value after flushing to remote. + let (key, value) = hummock_storage + .get_keyed_row( + anchor.clone(), + epoch2, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(key, FullKey::new(table_id, anchor.clone(), epoch2)); + assert_eq!(value, Bytes::from("111111")); + + // Write the third batch. + local + .ingest_batch( + batch3, + WriteOptions { + epoch: epoch3, + table_id, + }, + ) + .await + .unwrap(); + + local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); + + // Get the value after flushing to remote. + let res = hummock_storage + .get_keyed_row( + anchor.clone(), + epoch3, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap(); + assert_eq!(res, None); + + // Get the anchor value at the first snapshot + let (key, value) = hummock_storage + .get_keyed_row( + anchor.clone(), + epoch1, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(key, FullKey::new(table_id, anchor.clone(), epoch1)); + assert_eq!(value, Bytes::from("111")); + + // Get the anchor value at the second snapshot + let (key, value) = hummock_storage + .get_keyed_row( + anchor.clone(), + epoch2, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(key, FullKey::new(table_id, anchor.clone(), epoch2)); + assert_eq!(value, Bytes::from("111111")); + + let res = hummock_storage + .seal_and_sync_epoch(epoch1, HashSet::from_iter([local.table_id()])) + .await + .unwrap(); + let is_log_store = false; + meta_client + .commit_epoch(epoch1, res, is_log_store) + .await + .unwrap(); + hummock_storage + .try_wait_epoch( + HummockReadEpoch::Committed(epoch1), + TryWaitEpochOptions::for_test(local.table_id()), + ) + .await + .unwrap(); + let (key, value) = hummock_storage + .get_keyed_row( + gen_key_from_str(VirtualNode::ZERO, "bb"), + epoch2, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!( + key, + FullKey::new(table_id, gen_key_from_str(VirtualNode::ZERO, "bb"), epoch1) + ); + assert_eq!(value, Bytes::from("222")); + let res = hummock_storage + .get_keyed_row( + gen_key_from_str(VirtualNode::ZERO, "dd"), + epoch2, + ReadOptions { + cache_policy: CachePolicy::Fill(CacheContext::Default), + ..Default::default() + }, + ) + .await + .unwrap(); + assert!(res.is_none()); +} diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index c403917938fb5..5b0f90df73bee 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -118,7 +118,7 @@ impl TestIngestBatch for S { } 
 }
 
-pub async fn with_hummock_storage_v2(
+pub async fn with_hummock_storage(
     table_id: TableId,
 ) -> (HummockStorage, Arc<MockHummockMetaClient>) {
     let sstable_store = mock_sstable_store().await;
diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs
index 74ecff472a5d5..efc4989b8e6ec 100644
--- a/src/storage/src/hummock/store/hummock_storage.rs
+++ b/src/storage/src/hummock/store/hummock_storage.rs
@@ -258,7 +258,7 @@ impl HummockStorage {
         key: TableKey<Bytes>,
         epoch: HummockEpoch,
         read_options: ReadOptions,
-    ) -> StorageResult<Option<Bytes>> {
+    ) -> StorageResult<Option<StateStoreKeyedRow>> {
         let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));
 
         let (key_range, read_version_tuple) = self
@@ -579,12 +579,12 @@ impl StateStoreRead for HummockStorage {
     type Iter = HummockStorageIterator;
     type RevIter = HummockStorageRevIterator;
 
-    fn get(
+    fn get_keyed_row(
         &self,
         key: TableKey<Bytes>,
         epoch: u64,
         read_options: ReadOptions,
-    ) -> impl Future<Output = StorageResult<Option<Bytes>>> + '_ {
+    ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
         self.get_inner(key, epoch, read_options)
     }
 
diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs
index b53dc6ee27c73..bedc903283358 100644
--- a/src/storage/src/hummock/store/local_hummock_storage.rs
+++ b/src/storage/src/hummock/store/local_hummock_storage.rs
@@ -112,7 +112,7 @@ impl LocalHummockStorage {
         table_key: TableKey<Bytes>,
         epoch: u64,
         read_options: ReadOptions,
-    ) -> StorageResult<Option<Bytes>> {
+    ) -> StorageResult<Option<StateStoreKeyedRow>> {
         let table_key_range = (
             Bound::Included(table_key.clone()),
             Bound::Included(table_key.clone()),
@@ -248,12 +248,12 @@ impl StateStoreRead for LocalHummockStorage {
     type Iter = HummockStorageIterator;
     type RevIter = HummockStorageRevIterator;
 
-    fn get(
+    fn get_keyed_row(
         &self,
         key: TableKey<Bytes>,
         epoch: u64,
         read_options: ReadOptions,
-    ) -> impl Future<Output = StorageResult<Option<Bytes>>> + '_ {
+    ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
         assert!(epoch <= self.epoch());
         self.get_inner(key, epoch, read_options)
     }
@@ -305,7 +305,10 @@ impl LocalStateStore for LocalHummockStorage {
         read_options: ReadOptions,
     ) -> StorageResult<Option<Bytes>> {
         match self.mem_table.buffer.get(&key) {
-            None => self.get_inner(key, self.epoch(), read_options).await,
+            None => self
+                .get_inner(key, self.epoch(), read_options)
+                .await
+                .map(|e| e.map(|item| item.1)),
             Some(op) => match op {
                 KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())),
                 KeyOp::Delete(_) => Ok(None),
@@ -742,7 +745,7 @@ pub struct HummockStorageIteratorInner<'a> {
 }
 
 impl<'a> StateStoreIter for HummockStorageIteratorInner<'a> {
-    async fn try_next<'b>(&'b mut self) -> StorageResult<Option<StateStoreIterItemRef<'b>>> {
+    async fn try_next<'b>(&'b mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'b>>> {
         let iter = &mut self.inner;
         if !self.initial_read {
             self.initial_read = true;
@@ -824,7 +827,7 @@ pub struct HummockStorageRevIteratorInner<'a> {
 }
 
 impl<'a> StateStoreIter for HummockStorageRevIteratorInner<'a> {
-    async fn try_next<'b>(&'b mut self) -> StorageResult<Option<StateStoreIterItemRef<'b>>> {
+    async fn try_next<'b>(&'b mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'b>>> {
         let iter = &mut self.inner;
         if !self.initial_read {
             self.initial_read = true;
diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs
index d38424820c703..ce1173fe8262a 100644
--- a/src/storage/src/hummock/store/version.rs
+++ b/src/storage/src/hummock/store/version.rs
@@ -66,7 +66,7 @@ use crate::mem_table::{
 use crate::monitor::{
     GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
 };
-use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions};
+use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions, StateStoreKeyedRow};
 
 pub type CommittedVersion = PinnedVersion;
 
@@ -530,7 +530,7 @@ impl HummockVersionReader {
         epoch: u64,
         read_options: ReadOptions,
         read_version_tuple: ReadVersionTuple,
-    ) -> StorageResult<Option<Bytes>> {
+    ) -> StorageResult<Option<StateStoreKeyedRow>> {
         let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
 
         let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
@@ -558,7 +558,16 @@ impl HummockVersionReader {
                 return Ok(if data_epoch.pure_epoch() < min_epoch {
                     None
                 } else {
-                    data.into_user_value()
+                    data.into_user_value().map(|v| {
+                        (
+                            FullKey::new_with_gap_epoch(
+                                read_options.table_id,
+                                table_key.clone(),
+                                data_epoch,
+                            ),
+                            v,
+                        )
+                    })
                 });
             }
         }
@@ -590,7 +599,16 @@ impl HummockVersionReader {
                 return Ok(if data_epoch.pure_epoch() < min_epoch {
                     None
                 } else {
-                    data.into_user_value()
+                    data.into_user_value().map(|v| {
+                        (
+                            FullKey::new_with_gap_epoch(
+                                read_options.table_id,
+                                table_key.clone(),
+                                data_epoch,
+                            ),
+                            v,
+                        )
+                    })
                 });
             }
         }
@@ -626,7 +644,16 @@ impl HummockVersionReader {
                 return Ok(if data_epoch.pure_epoch() < min_epoch {
                     None
                 } else {
-                    data.into_user_value()
+                    data.into_user_value().map(|v| {
+                        (
+                            FullKey::new_with_gap_epoch(
+                                read_options.table_id,
+                                table_key.clone(),
+                                data_epoch,
+                            ),
+                            v,
+                        )
+                    })
                 });
             }
         }
@@ -661,7 +688,16 @@ impl HummockVersionReader {
                 return Ok(if data_epoch.pure_epoch() < min_epoch {
                     None
                 } else {
-                    data.into_user_value()
+                    data.into_user_value().map(|v| {
+                        (
+                            FullKey::new_with_gap_epoch(
+                                read_options.table_id,
+                                table_key.clone(),
+                                data_epoch,
+                            ),
+                            v,
+                        )
+                    })
                 });
             }
         }
diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs
index 50984052fc5bc..86fe4ecd3f6ec 100644
--- a/src/storage/src/mem_table.rs
+++ b/src/storage/src/mem_table.rs
@@ -427,10 +427,10 @@ impl KeyOp {
     }
 }
 
-#[try_stream(ok = StateStoreIterItem, error = StorageError)]
+#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
 pub(crate) async fn merge_stream<'a>(
     mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
-    inner_stream: impl Stream<Item = StorageResult<StateStoreIterItem>> + 'static,
+    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
     table_id: TableId,
     epoch: u64,
     rev: bool,
diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs
index 9558811a2bdb0..6f6de5a47dd0f 100644
--- a/src/storage/src/memory.rs
+++ b/src/storage/src/memory.rs
@@ -608,19 +608,22 @@ impl<R: RangeKv> StateStoreRead for RangeKvStateStore<R> {
     type RevIter = RangeKvStateStoreRevIter<R>;
 
     #[allow(clippy::unused_async)]
-    async fn get(
+    async fn get_keyed_row(
         &self,
         key: TableKey<Bytes>,
         epoch: u64,
         read_options: ReadOptions,
-    ) -> StorageResult<Option<Bytes>> {
+    ) -> StorageResult<Option<StateStoreKeyedRow>> {
         let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
         // We do not really care about vnodes here, so we just use the default value.
let res = self.scan(range_bounds, epoch, read_options.table_id, Some(1))?; Ok(match res.as_slice() { [] => None, - [(_, value)] => Some(value.clone()), + [(key, value)] => Some(( + FullKey::decode(key.as_ref()).to_vec().into_bytes(), + value.clone(), + )), _ => unreachable!(), }) } @@ -767,7 +770,7 @@ pub struct RangeKvStateStoreIter { last_key: Option>, - item_buffer: Option, + item_buffer: Option, } impl RangeKvStateStoreIter { @@ -788,7 +791,7 @@ impl RangeKvStateStoreIter { impl StateStoreIter for RangeKvStateStoreIter { #[allow(clippy::unused_async)] - async fn try_next(&mut self) -> StorageResult>> { + async fn try_next(&mut self) -> StorageResult>> { self.next_inner()?; Ok(self .item_buffer @@ -826,7 +829,7 @@ pub struct RangeKvStateStoreRevIter { epoch: HummockEpoch, is_inclusive_epoch: bool, - item_buffer: VecDeque, + item_buffer: VecDeque, } impl RangeKvStateStoreRevIter { @@ -846,7 +849,7 @@ impl RangeKvStateStoreRevIter { impl StateStoreIter for RangeKvStateStoreRevIter { #[allow(clippy::unused_async)] - async fn try_next(&mut self) -> StorageResult>> { + async fn try_next(&mut self) -> StorageResult>> { self.next_inner()?; Ok(self .item_buffer diff --git a/src/storage/src/monitor/monitored_storage_metrics.rs b/src/storage/src/monitor/monitored_storage_metrics.rs index 5813c20b4e9ef..8bd7ef64b6b83 100644 --- a/src/storage/src/monitor/monitored_storage_metrics.rs +++ b/src/storage/src/monitor/monitored_storage_metrics.rs @@ -34,7 +34,7 @@ use risingwave_common::{ }; use crate::store::{ - ChangeLogValue, IterItem, StateStoreIterItem, StateStoreIterItemRef, StateStoreReadLogItem, + ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem, StateStoreReadLogItemRef, }; @@ -500,7 +500,7 @@ impl StateStoreIterStats { } impl StateStoreIterStatsTrait for StateStoreIterStats { - type Item = StateStoreIterItem; + type Item = StateStoreKeyedRow; fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self { Self::for_table_metrics(table_id, metrics, |metrics| { @@ -511,7 +511,7 @@ impl StateStoreIterStatsTrait for StateStoreIterStats { } } - fn observe(&mut self, (key, value): StateStoreIterItemRef<'_>) { + fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) { self.inner.total_items += 1; self.inner.total_size += key.encoded_len() + value.len(); } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index a7be71307ffec..e5e3ceaaca016 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -147,6 +147,30 @@ impl MonitoredStateStore { Ok(value) } + + async fn monitored_get_keyed_row( + &self, + get_keyed_row_future: impl Future>>, + table_id: TableId, + key_len: usize, + ) -> StorageResult> { + let mut stats = + MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone()); + + let value = get_keyed_row_future + .verbose_instrument_await("store_get_keyed_row") + .instrument(tracing::trace_span!("store_get_keyed_row")) + .await + .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?; + + stats.get_key_size = key_len; + if let Some((_, value)) = value.as_ref() { + stats.get_value_size = value.len(); + } + stats.report(); + + Ok(value) + } } impl StateStoreRead for MonitoredStateStore { @@ -154,15 +178,19 @@ impl StateStoreRead for MonitoredStateStore { type Iter = impl StateStoreReadIter; type RevIter = impl StateStoreReadIter; - fn get( + fn get_keyed_row( &self, key: TableKey, epoch: 
u64, read_options: ReadOptions, - ) -> impl Future>> + '_ { + ) -> impl Future>> + '_ { let table_id = read_options.table_id; let key_len = key.len(); - self.monitored_get(self.inner.get(key, epoch, read_options), table_id, key_len) + self.monitored_get_keyed_row( + self.inner.get_keyed_row(key, epoch, read_options), + table_id, + key_len, + ) } fn iter( diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 8bd8013ba3810..f06c5634a5220 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -105,6 +105,29 @@ impl TracedStateStore { ))); res } + + async fn traced_get_keyed_row( + &self, + key: TableKey, + epoch: Option, + read_options: ReadOptions, + get_future: impl Future>>, + ) -> StorageResult> { + let span = TraceSpan::new_get_span( + key.0.clone(), + epoch, + read_options.clone().into(), + self.storage_type, + ); + + let res = get_future.await; + + span.may_send_result(OperationResult::Get(TraceResult::from( + res.as_ref() + .map(|o| o.as_ref().map(|(_, v)| TracedBytes::from(v.clone()))), + ))); + res + } } impl LocalStateStore for TracedStateStore { @@ -279,17 +302,17 @@ impl StateStoreRead for TracedStateStore { type Iter = impl StateStoreReadIter; type RevIter = impl StateStoreReadIter; - fn get( + fn get_keyed_row( &self, key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { - self.traced_get( + ) -> impl Future>> + Send + '_ { + self.traced_get_keyed_row( key.clone(), Some(epoch), read_options.clone(), - self.inner.get(key, epoch, read_options), + self.inner.get_keyed_row(key, epoch, read_options), ) } @@ -373,7 +396,7 @@ impl TracedStateStoreIter { } impl StateStoreIter for TracedStateStoreIter { - async fn try_next(&mut self) -> StorageResult>> { + async fn try_next(&mut self) -> StorageResult>> { if let Some((key, value)) = self .inner .try_next() diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 03b0471f90446..ee1f8ebfbbf43 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -35,16 +35,16 @@ pub struct PanicStateStore; impl StateStoreRead for PanicStateStore { type ChangeLogIter = PanicStateStoreIter; - type Iter = PanicStateStoreIter; - type RevIter = PanicStateStoreIter; + type Iter = PanicStateStoreIter; + type RevIter = PanicStateStoreIter; #[allow(clippy::unused_async)] - async fn get( + async fn get_keyed_row( &self, _key: TableKey, _epoch: u64, _read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult> { panic!("should not read from the state store!"); } @@ -90,8 +90,8 @@ impl StateStoreWrite for PanicStateStore { } impl LocalStateStore for PanicStateStore { - type Iter<'a> = PanicStateStoreIter; - type RevIter<'a> = PanicStateStoreIter; + type Iter<'a> = PanicStateStoreIter; + type RevIter<'a> = PanicStateStoreIter; #[allow(clippy::unused_async)] async fn get( diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 6eadf5ba8a64a..bf93639b3d70e 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -22,7 +22,7 @@ use std::ops::Bound; use std::sync::{Arc, LazyLock}; use bytes::Bytes; -use futures::{Stream, TryStreamExt}; +use futures::{Stream, TryFutureExt, TryStreamExt}; use futures_async_stream::try_stream; use prost::Message; use risingwave_common::array::Op; @@ -51,25 +51,25 @@ pub trait IterItem: Send + 'static { type ItemRef<'a>: Send + Copy + 'a; } -impl IterItem for StateStoreIterItem { - type ItemRef<'a> = 
StateStoreIterItemRef<'a>; +impl IterItem for StateStoreKeyedRow { + type ItemRef<'a> = StateStoreKeyedRowRef<'a>; } impl IterItem for StateStoreReadLogItem { type ItemRef<'a> = StateStoreReadLogItemRef<'a>; } -pub trait StateStoreIter: Send { +pub trait StateStoreIter: Send { fn try_next( &mut self, ) -> impl Future>>> + Send + '_; } -pub fn to_owned_item((key, value): StateStoreIterItemRef<'_>) -> StorageResult { +pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult { Ok((key.copy_into(), Bytes::copy_from_slice(value))) } -pub trait StateStoreIterExt: StateStoreIter + Sized { +pub trait StateStoreIterExt: StateStoreIter + Sized { type ItemStream Fn(T::ItemRef<'a>) -> StorageResult>: Stream> + Send; @@ -101,7 +101,7 @@ async fn into_stream_inner< pub struct FromStreamStateStoreIter { inner: S, - item_buffer: Option, + item_buffer: Option, } impl FromStreamStateStoreIter { @@ -113,10 +113,10 @@ impl FromStreamStateStoreIter { } } -impl> + Unpin + Send> StateStoreIter +impl> + Unpin + Send> StateStoreIter for FromStreamStateStoreIter { - async fn try_next(&mut self) -> StorageResult>> { + async fn try_next(&mut self) -> StorageResult>> { self.item_buffer = self.inner.try_next().await?; Ok(self .item_buffer @@ -167,8 +167,8 @@ impl> StateStoreIterExt for I { } } -pub type StateStoreIterItemRef<'a> = (FullKey<&'a [u8]>, &'a [u8]); -pub type StateStoreIterItem = (FullKey, Bytes); +pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]); +pub type StateStoreKeyedRow = (FullKey, Bytes); pub trait StateStoreReadIter = StateStoreIter + 'static; #[derive(Clone, Copy, Eq, PartialEq, Debug)] @@ -247,12 +247,26 @@ pub trait StateStoreRead: StaticSendSync { /// Point gets a value from the state store. /// The result is based on a snapshot corresponding to the given `epoch`. + /// Both full key and the value are returned. + fn get_keyed_row( + &self, + key: TableKey, + epoch: u64, + read_options: ReadOptions, + ) -> impl Future>> + Send + '_; + + /// Point gets a value from the state store. + /// The result is based on a snapshot corresponding to the given `epoch`. + /// Only the value is returned. 
fn get( &self, key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl Future>> + Send + '_ { + self.get_keyed_row(key, epoch, read_options) + .map_ok(|v| v.map(|(_, v)| v)) + } /// Opens and returns an iterator for given `prefix_hint` and `full_key_range` /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and @@ -295,7 +309,7 @@ pub trait StateStoreReadExt: StaticSendSync { epoch: u64, limit: Option, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl Future>> + Send + '_; } impl StateStoreReadExt for S { @@ -305,7 +319,7 @@ impl StateStoreReadExt for S { epoch: u64, limit: Option, mut read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult> { if limit.is_some() { read_options.prefetch_options.prefetch = false; } diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 2008047378479..f59395d26db7f 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -279,18 +279,18 @@ pub mod verify { type Iter = impl StateStoreReadIter; type RevIter = impl StateStoreReadIter; - async fn get( + async fn get_keyed_row( &self, key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult> { let actual = self .actual - .get(key.clone(), epoch, read_options.clone()) + .get_keyed_row(key.clone(), epoch, read_options.clone()) .await; if let Some(expected) = &self.expected { - let expected = expected.get(key, epoch, read_options).await; + let expected = expected.get_keyed_row(key, epoch, read_options).await; assert_result_eq(&actual, &expected); } actual @@ -316,7 +316,7 @@ pub mod verify { None }; - Ok(verify_iter::(actual, expected)) + Ok(verify_iter::(actual, expected)) } } @@ -338,7 +338,7 @@ pub mod verify { None }; - Ok(verify_iter::(actual, expected)) + Ok(verify_iter::(actual, expected)) } } @@ -455,7 +455,7 @@ pub mod verify { None }; - Ok(verify_iter::(actual, expected)) + Ok(verify_iter::(actual, expected)) } } @@ -476,7 +476,7 @@ pub mod verify { None }; - Ok(verify_iter::(actual, expected)) + Ok(verify_iter::(actual, expected)) } } @@ -871,17 +871,17 @@ pub mod boxed_state_store { // For StateStoreRead - pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreIterItem>; + pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>; pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>; #[async_trait::async_trait] pub trait DynamicDispatchedStateStoreRead: StaticSendSync { - async fn get( + async fn get_keyed_row( &self, key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> StorageResult>; + ) -> StorageResult>; async fn iter( &self, @@ -907,13 +907,13 @@ pub mod boxed_state_store { #[async_trait::async_trait] impl DynamicDispatchedStateStoreRead for S { - async fn get( + async fn get_keyed_row( &self, key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> StorageResult> { - self.get(key, epoch, read_options).await + ) -> StorageResult> { + self.get_keyed_row(key, epoch, read_options).await } async fn iter( @@ -949,7 +949,7 @@ pub mod boxed_state_store { } // For LocalStateStore - pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreIterItem>; + pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>; #[async_trait::async_trait] pub trait DynamicDispatchedLocalStateStore: StaticSendSync { async fn get( @@ -1200,13 +1200,13 @@ pub mod boxed_state_store { type Iter = 
BoxStateStoreReadIter; type RevIter = BoxStateStoreReadIter; - fn get( + fn get_keyed_row( &self, key: TableKey, epoch: u64, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { - self.deref().get(key, epoch, read_options) + ) -> impl Future>> + Send + '_ { + self.deref().get_keyed_row(key, epoch, read_options) } fn iter( diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index b2b19c45bd0a5..9dcc0453bc7db 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -38,7 +38,7 @@ use risingwave_hummock_sdk::HummockEpoch; use risingwave_storage::error::StorageResult; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::store::{ - PrefetchOptions, ReadOptions, StateStoreIterItemRef, StateStoreRead, + PrefetchOptions, ReadOptions, StateStoreKeyedRowRef, StateStoreRead, }; use risingwave_storage::{StateStore, StateStoreIter}; use tokio::sync::watch; @@ -298,7 +298,7 @@ use timeout_auto_rebuild::*; impl bool + Send> StateStoreIter for AutoRebuildStateStoreReadIter { - async fn try_next(&mut self) -> StorageResult>> { + async fn try_next(&mut self) -> StorageResult>> { let should_rebuild = (self.should_rebuild)(); if should_rebuild { let Some((key, _value)) = self.iter.try_next().await? else { @@ -318,7 +318,7 @@ impl bool + Send> StateStoreIter ) .await?; self.iter = new_iter; - let item: Option> = self.iter.try_next().await?; + let item: Option> = self.iter.try_next().await?; if let Some((key, value)) = item { assert_eq!( key.user_key.table_key.0, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 4bfe82aa7c41d..21e2c48f3858f 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -947,7 +947,7 @@ mod tests { use risingwave_hummock_sdk::key::FullKey; use risingwave_storage::error::StorageResult; use risingwave_storage::store::{ - FromStreamStateStoreIter, StateStoreIterItem, StateStoreReadIter, + FromStreamStateStoreIter, StateStoreKeyedRow, StateStoreReadIter, }; use risingwave_storage::table::SINGLETON_VNODE; use tokio::sync::oneshot; @@ -1168,7 +1168,7 @@ mod tests { epoch: u64, seq_id: &mut SeqIdType, ) -> ( - impl Stream>, + impl Stream>, Sender<()>, ) { let (tx, rx) = oneshot::channel(); @@ -1197,7 +1197,7 @@ mod tests { seq_id: &mut SeqIdType, base: i64, ) -> ( - impl Stream>, + impl Stream>, oneshot::Sender<()>, oneshot::Sender<()>, Vec,