fix: cherry-pick rebuild log store iter when exists for a timeout (#17009) #17055

Merged 1 commit on Jun 1, 2024
271 changes: 251 additions & 20 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -13,12 +13,14 @@
// limitations under the License.

use std::future::Future;
use std::ops::Bound;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::pin::Pin;
use std::time::Duration;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::memory::CacheContext;
use futures::future::{try_join_all, BoxFuture};
use futures::{FutureExt, TryFutureExt};
@@ -31,11 +33,14 @@ use risingwave_common::util::epoch::EpochExt;
use risingwave_connector::sink::log_store::{
ChunkId, LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset,
};
use risingwave_hummock_sdk::key::prefixed_range_with_vnode;
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_storage::error::StorageResult;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::store::{PrefetchOptions, ReadOptions};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
PrefetchOptions, ReadOptions, StateStoreIterItemRef, StateStoreRead,
};
use risingwave_storage::{StateStore, StateStoreIter};
use tokio::sync::watch;
use tokio::time::sleep;
use tokio_stream::StreamExt;
@@ -113,7 +118,7 @@ pub struct KvLogStoreReader<S: StateStore> {
first_write_epoch: Option<u64>,

/// `Some` means consuming historical log data
state_store_stream: Option<Pin<Box<LogStoreItemMergeStream<S::Iter>>>>,
state_store_stream: Option<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>,

/// Store the future that attempts to read a flushed stream chunk.
/// This is for cancellation safety. Since it is possible that the future of `next_item`
@@ -180,12 +185,141 @@ impl<S: StateStore> KvLogStoreReader<S> {
}
}

struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
state_store: S,
iter: S::Iter,
// called to decide whether to rebuild the iter. Once it returns true, the closure should reset itself.
should_rebuild: F,
end_bound: Bound<TableKey<Bytes>>,
epoch: HummockEpoch,
options: ReadOptions,
}

impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F> {
async fn new(
state_store: S,
should_rebuild: F,
range: TableKeyRange,
epoch: HummockEpoch,
options: ReadOptions,
) -> StorageResult<Self> {
let (start_bound, end_bound) = range;
let iter = state_store
.iter((start_bound, end_bound.clone()), epoch, options.clone())
.await?;
Ok(Self {
state_store,
iter,
should_rebuild,
end_bound,
epoch,
options,
})
}
}
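
A minimal count-based sketch of a closure that satisfies this rebuild contract (a hypothetical helper, not part of the PR; the time-based closure in `iter_with_timeout_rebuild` below is the real one):

fn rebuild_every_n_calls(n: usize) -> impl FnMut() -> bool + Send {
    let mut calls = 0usize;
    move || {
        calls += 1;
        if calls == n {
            calls = 0; // reset after signalling a rebuild, per the contract on `should_rebuild`
            true
        } else {
            false
        }
    }
}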

type TimeoutAutoRebuildIter<S: StateStoreRead> =
AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;

async fn iter_with_timeout_rebuild<S: StateStoreRead>(
state_store: S,
range: TableKeyRange,
epoch: HummockEpoch,
options: ReadOptions,
timeout: Duration,
) -> StorageResult<TimeoutAutoRebuildIter<S>> {
const CHECK_TIMEOUT_PERIOD: usize = 100;
// use a non-Copy struct here to avoid an accidental copy (instead of a move) of a primitive usize
struct Count(usize);
let mut check_count = Count(0);
let mut total_count = Count(0);
let mut curr_iter_item_count = Count(0);
let mut start_time = Instant::now();
let initial_start_time = start_time;
AutoRebuildStateStoreReadIter::new(
state_store,
move || {
check_count.0 += 1;
curr_iter_item_count.0 += 1;
total_count.0 += 1;
if check_count.0 == CHECK_TIMEOUT_PERIOD {
check_count.0 = 0;
if start_time.elapsed() > timeout {
let prev_iter_item_count = curr_iter_item_count.0;
curr_iter_item_count.0 = 0;
start_time = Instant::now();
info!(
table_id = options.table_id.table_id,
iter_exist_time_secs = initial_start_time.elapsed().as_secs(),
prev_iter_item_count,
total_iter_item_count = total_count.0,
"kv log store iter is rebuilt"
);
true
} else {
false
}
} else {
false
}
},
range,
epoch,
options,
)
.await
}
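
An aside on the `Count` wrapper above, as a standalone sketch (not in the PR): a bare `usize` is `Copy`, so a `move` closure silently captures a copy, and later reads of the outer variable still compile but observe a stale value. A non-Copy wrapper turns the same mistake into a compile error.

fn main() {
    let mut n: usize = 0;
    let mut bump = move || {
        n += 1; // mutates the closure's private copy of `n`
        n
    };
    assert_eq!(bump(), 1);
    assert_eq!(n, 0); // compiles, but silently reads the stale outer copy

    struct Count(usize);
    let mut c = Count(0);
    let mut bump_count = move || {
        c.0 += 1;
        c.0
    };
    assert_eq!(bump_count(), 1);
    // assert_eq!(c.0, 1); // would not compile: `c` was moved, not copied
}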

impl<S: StateStoreRead, F: FnMut() -> bool + Send> StateStoreIter
for AutoRebuildStateStoreReadIter<S, F>
{
async fn try_next(&mut self) -> StorageResult<Option<StateStoreIterItemRef<'_>>> {
let should_rebuild = (self.should_rebuild)();
if should_rebuild {
let Some((key, _value)) = self.iter.try_next().await? else {
return Ok(None);
};
let key: FullKey<&[u8]> = key;
let range_start = Bytes::copy_from_slice(key.user_key.table_key.as_ref());
let new_iter = self
.state_store
.iter(
(
Included(TableKey(range_start.clone())),
self.end_bound.clone(),
),
self.epoch,
self.options.clone(),
)
.await?;
self.iter = new_iter;
let item: Option<StateStoreIterItemRef<'_>> = self.iter.try_next().await?;
if let Some((key, value)) = item {
assert_eq!(
key.user_key.table_key.0,
range_start.as_ref(),
"the first key should be the previous key"
);
Ok(Some((key, value)))
} else {
unreachable!(
"the first key should be the previous key {:?}, but get None",
range_start
)
}
} else {
self.iter.try_next().await
}
}
}
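
The rebuild path above relies on one invariant: after taking the next unread key from the old iter, a new scan opened with an inclusive lower bound at that key must yield exactly that key first (hence the assert_eq!). A self-contained sketch of that invariant, with BTreeMap standing in for the state store:

use std::collections::BTreeMap;
use std::ops::Bound::{Included, Unbounded};

fn main() {
    let map: BTreeMap<u32, &str> = (0..5u32).map(|k| (k, "v")).collect();

    let mut old_iter = map.range((Included(0u32), Unbounded));
    old_iter.next(); // key 0 was already consumed by the caller
    let (&resume_key, _) = old_iter.next().unwrap(); // take key 1 from the old iter

    // rebuild: the new scan starts inclusively at the taken key
    let mut new_iter = map.range((Included(resume_key), Unbounded));
    let (&first, _) = new_iter.next().unwrap();
    assert_eq!(first, resume_key); // mirrors the assert_eq! in try_next above
}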

impl<S: StateStore> KvLogStoreReader<S> {
fn read_persisted_log_store(
&self,
last_persisted_epoch: Option<u64>,
) -> impl Future<Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<S::Iter>>>>> + Send
{
) -> impl Future<
Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>,
> + Send {
let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch {
// start from the epoch after last_persisted_epoch
Included(
@@ -210,19 +344,21 @@
);
let state_store = self.state_store.clone();
async move {
state_store
.iter(
key_range,
HummockEpoch::MAX,
ReadOptions {
// This stream lives for a long time, so the prefetch object's connection may break. Use a short-connection prefetch instead.
prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow),
table_id,
..Default::default()
},
)
.await
// rebuild the iter every 10 minutes to avoid pinning the hummock version for too long
iter_with_timeout_rebuild(
state_store,
key_range,
HummockEpoch::MAX,
ReadOptions {
// This stream lives for a long time, so the prefetch object's connection may break. Use a short-connection prefetch instead.
prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow),
table_id,
..Default::default()
},
Duration::from_secs(10 * 60),
)
.await
}
}));

@@ -500,3 +636,98 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
Ok((true, Some((**self.serde.vnodes()).clone())))
}
}

#[cfg(test)]
mod tests {
use std::ops::Bound::Unbounded;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::hummock::iterator::test_utils::{
iterator_test_table_key_of, iterator_test_value_of,
};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{ReadOptions, StateStoreRead, StateStoreWrite, WriteOptions};
use risingwave_storage::StateStoreIter;

use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter;
use crate::common::log_store_impl::kv_log_store::test_utils::TEST_TABLE_ID;

#[tokio::test]
async fn test_auto_rebuild_iter() {
let state_store = MemoryStateStore::new();
let key_count = 100;
let pairs = (0..key_count)
.map(|i| {
let key = iterator_test_table_key_of(i);
let value = iterator_test_value_of(i);
(TableKey(Bytes::from(key)), StorageValue::new_put(value))
})
.collect_vec();
let epoch = test_epoch(1);
state_store
.ingest_batch(
pairs.clone(),
vec![],
WriteOptions {
epoch,
table_id: TEST_TABLE_ID,
},
)
.unwrap();

async fn validate(
mut kv_iter: impl Iterator<Item = (TableKey<Bytes>, StorageValue)>,
mut iter: impl StateStoreIter,
) {
while let Some((key, value)) = iter.try_next().await.unwrap() {
let (k, v) = kv_iter.next().unwrap();
assert_eq!(key.user_key.table_key, k.to_ref());
assert_eq!(v.user_value.as_deref(), Some(value));
}
assert!(kv_iter.next().is_none());
}

let read_options = ReadOptions {
table_id: TEST_TABLE_ID,
..Default::default()
};

let kv_iter = pairs.clone().into_iter();
let iter = state_store
.iter((Unbounded, Unbounded), epoch, read_options.clone())
.await
.unwrap();
validate(kv_iter, iter).await;

let kv_iter = pairs.clone().into_iter();
let mut count = 0;
let count_mut_ref = &mut count;
let rebuild_period = 8;
let mut rebuild_count = 0;
let rebuild_count_mut_ref = &mut rebuild_count;
let iter = AutoRebuildStateStoreReadIter::new(
state_store,
move || {
*count_mut_ref += 1;
if *count_mut_ref % rebuild_period == 0 {
*rebuild_count_mut_ref += 1;
true
} else {
false
}
},
(Unbounded, Unbounded),
epoch,
read_options,
)
.await
.unwrap();
validate(kv_iter, iter).await;
assert_eq!(count, key_count + 1); // with an extra call on the last None
assert_eq!(rebuild_count, key_count / rebuild_period);
}
}