diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 3e9e9ad4833e8..7ac4885d2ad7a 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -13,11 +13,13 @@ // limitations under the License. use std::future::Future; +use std::ops::Bound; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::anyhow; +use bytes::Bytes; use foyer::memory::CacheContext; use futures::future::{try_join_all, BoxFuture}; use futures::{FutureExt, TryFutureExt}; @@ -30,11 +32,14 @@ use risingwave_common::util::epoch::EpochExt; use risingwave_connector::sink::log_store::{ ChunkId, LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset, }; -use risingwave_hummock_sdk::key::prefixed_range_with_vnode; +use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockEpoch; +use risingwave_storage::error::StorageResult; use risingwave_storage::hummock::CachePolicy; -use risingwave_storage::store::{PrefetchOptions, ReadOptions}; -use risingwave_storage::StateStore; +use risingwave_storage::store::{ + PrefetchOptions, ReadOptions, StateStoreIterItemRef, StateStoreRead, +}; +use risingwave_storage::{StateStore, StateStoreIter}; use tokio::sync::watch; use tokio::time::sleep; use tokio_stream::StreamExt; @@ -112,7 +117,7 @@ pub struct KvLogStoreReader { first_write_epoch: Option, /// `Some` means consuming historical log data - state_store_stream: Option>>>, + state_store_stream: Option>>>>, /// Store the future that attempts to read a flushed stream chunk. /// This is for cancellation safety. Since it is possible that the future of `next_item` @@ -179,12 +184,141 @@ impl KvLogStoreReader { } } +struct AutoRebuildStateStoreReadIter { + state_store: S, + iter: S::Iter, + // call to get whether to rebuild the iter. Once return true, the closure should reset itself. + should_rebuild: F, + end_bound: Bound>, + epoch: HummockEpoch, + options: ReadOptions, +} + +impl bool> AutoRebuildStateStoreReadIter { + async fn new( + state_store: S, + should_rebuild: F, + range: TableKeyRange, + epoch: HummockEpoch, + options: ReadOptions, + ) -> StorageResult { + let (start_bound, end_bound) = range; + let iter = state_store + .iter((start_bound, end_bound.clone()), epoch, options.clone()) + .await?; + Ok(Self { + state_store, + iter, + should_rebuild, + end_bound, + epoch, + options, + }) + } +} + +type TimeoutAutoRebuildIter = + AutoRebuildStateStoreReadIter bool + Send>; + +async fn iter_with_timeout_rebuild( + state_store: S, + range: TableKeyRange, + epoch: HummockEpoch, + options: ReadOptions, + timeout: Duration, +) -> StorageResult> { + const CHECK_TIMEOUT_PERIOD: usize = 100; + // use a struct here to avoid accidental copy instead of move on primitive usize + struct Count(usize); + let mut check_count = Count(0); + let mut total_count = Count(0); + let mut curr_iter_item_count = Count(0); + let mut start_time = Instant::now(); + let initial_start_time = start_time; + AutoRebuildStateStoreReadIter::new( + state_store, + move || { + check_count.0 += 1; + curr_iter_item_count.0 += 1; + total_count.0 += 1; + if check_count.0 == CHECK_TIMEOUT_PERIOD { + check_count.0 = 0; + if start_time.elapsed() > timeout { + let prev_iter_item_count = curr_iter_item_count.0; + curr_iter_item_count.0 = 0; + start_time = Instant::now(); + info!( + table_id = options.table_id.table_id, + iter_exist_time_secs = initial_start_time.elapsed().as_secs(), + prev_iter_item_count, + total_iter_item_count = total_count.0, + "kv log store iter is rebuilt" + ); + true + } else { + false + } + } else { + false + } + }, + range, + epoch, + options, + ) + .await +} + +impl bool + Send> StateStoreIter + for AutoRebuildStateStoreReadIter +{ + async fn try_next(&mut self) -> StorageResult>> { + let should_rebuild = (self.should_rebuild)(); + if should_rebuild { + let Some((key, _value)) = self.iter.try_next().await? else { + return Ok(None); + }; + let key: FullKey<&[u8]> = key; + let range_start = Bytes::copy_from_slice(key.user_key.table_key.as_ref()); + let new_iter = self + .state_store + .iter( + ( + Included(TableKey(range_start.clone())), + self.end_bound.clone(), + ), + self.epoch, + self.options.clone(), + ) + .await?; + self.iter = new_iter; + let item: Option> = self.iter.try_next().await?; + if let Some((key, value)) = item { + assert_eq!( + key.user_key.table_key.0, + range_start.as_ref(), + "the first key should be the previous key" + ); + Ok(Some((key, value))) + } else { + unreachable!( + "the first key should be the previous key {:?}, but get None", + range_start + ) + } + } else { + self.iter.try_next().await + } + } +} + impl KvLogStoreReader { fn read_persisted_log_store( &self, last_persisted_epoch: Option, - ) -> impl Future>>>> + Send - { + ) -> impl Future< + Output = LogStoreResult>>>>, + > + Send { let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch { // start from the next epoch of last_persisted_epoch Included( @@ -209,19 +343,21 @@ impl KvLogStoreReader { ); let state_store = self.state_store.clone(); async move { - state_store - .iter( - key_range, - HummockEpoch::MAX, - ReadOptions { - // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch. - prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(), - cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow), - table_id, - ..Default::default() - }, - ) - .await + // rebuild the iter every 10 minutes to avoid pinning hummock version for too long + iter_with_timeout_rebuild( + state_store, + key_range, + HummockEpoch::MAX, + ReadOptions { + // This stream lives too long, the connection of prefetch object may break. So use a short connection prefetch. + prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(), + cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow), + table_id, + ..Default::default() + }, + Duration::from_secs(10 * 60), + ) + .await } })); @@ -488,3 +624,98 @@ impl LogReader for KvLogStoreReader { Ok((true, Some((**self.serde.vnodes()).clone()))) } } + +#[cfg(test)] +mod tests { + use std::ops::Bound::Unbounded; + + use bytes::Bytes; + use itertools::Itertools; + use risingwave_common::util::epoch::test_epoch; + use risingwave_hummock_sdk::key::TableKey; + use risingwave_storage::hummock::iterator::test_utils::{ + iterator_test_table_key_of, iterator_test_value_of, + }; + use risingwave_storage::memory::MemoryStateStore; + use risingwave_storage::storage_value::StorageValue; + use risingwave_storage::store::{ReadOptions, StateStoreRead, StateStoreWrite, WriteOptions}; + use risingwave_storage::StateStoreIter; + + use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter; + use crate::common::log_store_impl::kv_log_store::test_utils::TEST_TABLE_ID; + + #[tokio::test] + async fn test_auto_rebuild_iter() { + let state_store = MemoryStateStore::new(); + let key_count = 100; + let pairs = (0..key_count) + .map(|i| { + let key = iterator_test_table_key_of(i); + let value = iterator_test_value_of(i); + (TableKey(Bytes::from(key)), StorageValue::new_put(value)) + }) + .collect_vec(); + let epoch = test_epoch(1); + state_store + .ingest_batch( + pairs.clone(), + vec![], + WriteOptions { + epoch, + table_id: TEST_TABLE_ID, + }, + ) + .unwrap(); + + async fn validate( + mut kv_iter: impl Iterator, StorageValue)>, + mut iter: impl StateStoreIter, + ) { + while let Some((key, value)) = iter.try_next().await.unwrap() { + let (k, v) = kv_iter.next().unwrap(); + assert_eq!(key.user_key.table_key, k.to_ref()); + assert_eq!(v.user_value.as_deref(), Some(value)); + } + assert!(kv_iter.next().is_none()); + } + + let read_options = ReadOptions { + table_id: TEST_TABLE_ID, + ..Default::default() + }; + + let kv_iter = pairs.clone().into_iter(); + let iter = state_store + .iter((Unbounded, Unbounded), epoch, read_options.clone()) + .await + .unwrap(); + validate(kv_iter, iter).await; + + let kv_iter = pairs.clone().into_iter(); + let mut count = 0; + let count_mut_ref = &mut count; + let rebuild_period = 8; + let mut rebuild_count = 0; + let rebuild_count_mut_ref = &mut rebuild_count; + let iter = AutoRebuildStateStoreReadIter::new( + state_store, + move || { + *count_mut_ref += 1; + if *count_mut_ref % rebuild_period == 0 { + *rebuild_count_mut_ref += 1; + true + } else { + false + } + }, + (Unbounded, Unbounded), + epoch, + read_options, + ) + .await + .unwrap(); + validate(kv_iter, iter).await; + assert_eq!(count, key_count + 1); // with an extra call on the last None + assert_eq!(rebuild_count, key_count / rebuild_period); + } +}