diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs
index 5102cc573ccb3..f7d99141139f5 100644
--- a/src/connector/src/sink/log_store.rs
+++ b/src/connector/src/sink/log_store.rs
@@ -133,6 +133,8 @@ pub trait LogReader: Send + Sized + 'static {
     fn init(&mut self) -> impl Future> + Send + '_;
 
     /// Emit the next item.
+    ///
+    /// The implementation should ensure that the future is cancellation safe.
     fn next_item(
         &mut self,
     ) -> impl Future> + Send + '_;
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
index 49f8cd643e16d..ed1c495c81d75 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
@@ -54,8 +54,8 @@ struct LogStoreBufferInner {
     unconsumed_queue: VecDeque<(u64, LogStoreBufferItem)>,
     /// Items already read by log reader by not truncated. Newer item at the front
     consumed_queue: VecDeque<(u64, LogStoreBufferItem)>,
-    stream_chunk_count: usize,
-    max_stream_chunk_count: usize,
+    row_count: usize,
+    max_row_count: usize,
 
     truncation_list: VecDeque,
@@ -64,13 +64,16 @@ struct LogStoreBufferInner {
 
 impl LogStoreBufferInner {
     fn can_add_stream_chunk(&self) -> bool {
-        self.stream_chunk_count < self.max_stream_chunk_count
+        self.row_count < self.max_row_count
     }
 
     fn add_item(&mut self, epoch: u64, item: LogStoreBufferItem) {
         if let LogStoreBufferItem::StreamChunk { .. } = item {
            unreachable!("StreamChunk should call try_add_item")
         }
+        if let LogStoreBufferItem::Barrier { .. } = &item {
+            self.next_chunk_id = 0;
+        }
         self.unconsumed_queue.push_front((epoch, item));
     }
@@ -86,7 +89,7 @@ impl LogStoreBufferInner {
         } else {
             let chunk_id = self.next_chunk_id;
             self.next_chunk_id += 1;
-            self.stream_chunk_count += 1;
+            self.row_count += chunk.cardinality();
             self.unconsumed_queue.push_front((
                 epoch,
                 LogStoreBufferItem::StreamChunk {
@@ -326,6 +329,7 @@ impl LogStoreBufferReceiver {
                         chunk_id,
                         flushed,
                         end_seq_id,
+                        chunk,
                         ..
                    } => {
                        let chunk_offset = TruncateOffset::Chunk {
@@ -335,7 +339,7 @@ impl LogStoreBufferReceiver {
                        let flushed = *flushed;
                        let end_seq_id = *end_seq_id;
                        if chunk_offset <= offset {
-                            inner.stream_chunk_count -= 1;
+                            inner.row_count -= chunk.cardinality();
                            inner.consumed_queue.pop_back();
                            if flushed {
                                latest_offset = Some((epoch, Some(end_seq_id)));
@@ -386,13 +390,13 @@ impl LogStoreBufferReceiver {
 }
 
 pub(crate) fn new_log_store_buffer(
-    max_stream_chunk_count: usize,
+    max_row_count: usize,
 ) -> (LogStoreBufferSender, LogStoreBufferReceiver) {
     let buffer = SharedMutex::new(LogStoreBufferInner {
         unconsumed_queue: VecDeque::new(),
         consumed_queue: VecDeque::new(),
-        stream_chunk_count: 0,
-        max_stream_chunk_count,
+        row_count: 0,
+        max_row_count,
         truncation_list: VecDeque::new(),
         next_chunk_id: 0,
     });
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
index 6b24d5d6228d1..3f30b3753b37c 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
@@ -174,7 +174,7 @@ pub struct KvLogStoreFactory {
     vnodes: Option>,
 
-    max_stream_chunk_count: usize,
+    max_row_count: usize,
 
     metrics: KvLogStoreMetrics,
 }
@@ -184,14 +184,14 @@ impl KvLogStoreFactory {
         state_store: S,
         table_catalog: Table,
         vnodes: Option>,
-        max_stream_chunk_count: usize,
+        max_row_count: usize,
         metrics: KvLogStoreMetrics,
     ) -> Self {
         Self {
             state_store,
             table_catalog,
             vnodes,
-            max_stream_chunk_count,
+            max_row_count,
             metrics,
         }
     }
@@ -218,7 +218,7 @@ impl LogStoreFactory for KvLogStoreFactory {
             })
             .await;
 
-        let (tx, rx) = new_log_store_buffer(self.max_stream_chunk_count);
+        let (tx, rx) = new_log_store_buffer(self.max_row_count);
 
         let reader = KvLogStoreReader::new(
             table_id,
@@ -236,7 +236,10 @@ impl LogStoreFactory for KvLogStoreFactory {
 
 #[cfg(test)]
 mod tests {
+    use std::future::{poll_fn, Future};
+    use std::pin::pin;
     use std::sync::Arc;
+    use std::task::Poll;
 
     use risingwave_common::buffer::{Bitmap, BitmapBuilder};
     use risingwave_common::hash::VirtualNode;
@@ -251,18 +254,18 @@ mod tests {
     use crate::common::log_store_impl::kv_log_store::test_utils::{
         calculate_vnode_bitmap, check_rows_eq, check_stream_chunk_eq,
-        gen_multi_vnode_stream_chunks, gen_stream_chunk, gen_test_log_store_table,
+        gen_multi_vnode_stream_chunks, gen_stream_chunk, gen_test_log_store_table, TEST_DATA_SIZE,
     };
     use crate::common::log_store_impl::kv_log_store::{KvLogStoreFactory, KvLogStoreMetrics};
 
     #[tokio::test]
     async fn test_basic() {
         for count in 0..20 {
-            test_basic_inner(count).await
+            test_basic_inner(count * TEST_DATA_SIZE).await
         }
     }
 
-    async fn test_basic_inner(max_stream_chunk_count: usize) {
+    async fn test_basic_inner(max_row_count: usize) {
         let test_env = prepare_hummock_test_env().await;
 
         let table = gen_test_log_store_table();
@@ -277,7 +280,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(Arc::new(bitmap)),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -350,11 +353,11 @@ mod tests {
     #[tokio::test]
     async fn test_recovery() {
         for count in 0..20 {
-            test_recovery_inner(count).await
+            test_recovery_inner(count * TEST_DATA_SIZE).await
         }
     }
 
-    async fn test_recovery_inner(max_stream_chunk_count: usize) {
+    async fn test_recovery_inner(max_row_count: usize) {
         let test_env = prepare_hummock_test_env().await;
 
         let table = gen_test_log_store_table();
@@ -370,7 +373,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap.clone()),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -456,7 +459,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -514,7 +517,7 @@ mod tests {
         }
     }
 
-    async fn test_truncate_inner(max_stream_chunk_count: usize) {
+    async fn test_truncate_inner(max_row_count: usize) {
         let test_env = prepare_hummock_test_env().await;
 
         let table = gen_test_log_store_table();
@@ -538,7 +541,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap.clone()),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -648,7 +651,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -739,14 +742,14 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(vnodes1),
-            10,
+            10 * TEST_DATA_SIZE,
             KvLogStoreMetrics::for_test(),
         );
         let factory2 = KvLogStoreFactory::new(
             test_env.storage.clone(),
             table.clone(),
             Some(vnodes2),
-            10,
+            10 * TEST_DATA_SIZE,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader1, mut writer1) = factory1.build().await;
@@ -865,7 +868,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(vnodes),
-            10,
+            10 * TEST_DATA_SIZE,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -903,4 +906,70 @@ mod tests {
             _ => unreachable!(),
         }
     }
+
+    #[tokio::test]
+    async fn test_cancellation_safe() {
+        let test_env = prepare_hummock_test_env().await;
+
+        let table = gen_test_log_store_table();
+
+        test_env.register_table(table.clone()).await;
+
+        let stream_chunk1 = gen_stream_chunk(0);
+        let stream_chunk2 = gen_stream_chunk(10);
+        let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows()));
+
+        let factory = KvLogStoreFactory::new(
+            test_env.storage.clone(),
+            table.clone(),
+            Some(Arc::new(bitmap)),
+            0,
+            KvLogStoreMetrics::for_test(),
+        );
+        let (mut reader, mut writer) = factory.build().await;
+
+        let epoch1 = test_env
+            .storage
+            .get_pinned_version()
+            .version()
+            .max_committed_epoch
+            + 1;
+        writer
+            .init(EpochPair::new_test_epoch(epoch1))
+            .await
+            .unwrap();
+        writer.write_chunk(stream_chunk1.clone()).await.unwrap();
+        let epoch2 = epoch1 + 1;
+        writer.flush_current_epoch(epoch2, true).await.unwrap();
+
+        reader.init().await.unwrap();
+
+        {
+            let mut future = pin!(reader.next_item());
+            assert!(poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
+                .await
+                .is_pending());
+        }
+
+        match reader.next_item().await.unwrap() {
+            (
+                epoch,
+                LogStoreReadItem::StreamChunk {
+                    chunk: read_stream_chunk,
+                    ..
+                },
+            ) => {
+                assert_eq!(epoch, epoch1);
+                assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk));
+            }
+            _ => unreachable!(),
+        }
+        match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+                assert_eq!(epoch, epoch1);
+                assert!(is_checkpoint)
+            }
+            _ => unreachable!(),
+        }
+    }
 }
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
index 38069b5994e57..cb7fc402168d4 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -17,13 +17,15 @@ use std::pin::Pin;
 
 use anyhow::anyhow;
 use bytes::Bytes;
-use futures::future::try_join_all;
+use futures::future::{try_join_all, BoxFuture};
 use futures::stream::select_all;
+use futures::FutureExt;
+use risingwave_common::array::StreamChunk;
 use risingwave_common::cache::CachePriority;
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::VnodeBitmapExt;
 use risingwave_connector::sink::log_store::{
-    LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset,
+    ChunkId, LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset,
 };
 use risingwave_hummock_sdk::key::TableKey;
 use risingwave_storage::hummock::CachePolicy;
@@ -54,6 +56,14 @@ pub struct KvLogStoreReader {
     /// `Some` means consuming historical log data
     state_store_stream: Option>>>,
 
+    /// Store the future that attempts to read a flushed stream chunk.
+    /// This is for cancellation safety: the future returned by `next_item` may be dropped
+    /// after it takes a flushed item out of the buffer but before it finishes reading the
+    /// stream chunk from storage. Storing the future here lets the next call to `next_item`
+    /// continue reading the stream chunk where the cancelled call left off.
+    read_flushed_chunk_future:
+        Option>>,
+
     latest_offset: TruncateOffset,
 
     truncate_offset: TruncateOffset,
@@ -74,6 +84,7 @@ impl KvLogStoreReader {
             state_store,
             serde,
             rx,
+            read_flushed_chunk_future: None,
             first_write_epoch: None,
             state_store_stream: None,
             latest_offset: TruncateOffset::Barrier { epoch: 0 },
@@ -81,6 +92,20 @@ impl KvLogStoreReader {
             metrics,
         }
     }
+
+    async fn may_continue_read_flushed_chunk(
+        &mut self,
+    ) -> LogStoreResult> {
+        if let Some(future) = self.read_flushed_chunk_future.as_mut() {
+            let result = future.await;
+            self.read_flushed_chunk_future
+                .take()
+                .expect("future not None");
+            Ok(Some(result?))
+        } else {
+            Ok(None)
+        }
+    }
 }
 
 impl LogReader for KvLogStoreReader {
@@ -146,6 +171,22 @@
             }
         }
 
+        // It is possible that a previous `next_item` future was dropped after it popped a
+        // flushed item but before it finished reading the stream chunk. In that case, resume
+        // driving the stored future to finish reading the stream chunk.
+        if let Some((chunk_id, chunk, item_epoch)) = self.may_continue_read_flushed_chunk().await? {
+            let offset = TruncateOffset::Chunk {
+                epoch: item_epoch,
+                chunk_id,
+            };
+            assert!(offset > self.latest_offset);
+            self.latest_offset = offset;
+            return Ok((
+                item_epoch,
+                LogStoreReadItem::StreamChunk { chunk, chunk_id },
+            ));
+        }
+
         // Now the historical state store has been consumed.
         let (item_epoch, item) = self.rx.next_item().await;
         self.latest_offset.check_next_item_epoch(item_epoch)?;
@@ -170,46 +211,78 @@
                 end_seq_id,
                 chunk_id,
             } => {
-                let streams = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
-                    let range_start =
-                        self.serde
-                            .serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id));
-                    let range_end =
-                        self.serde
-                            .serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id));
+                let read_flushed_chunk_future = {
+                    let serde = self.serde.clone();
                     let state_store = self.state_store.clone();
                     let table_id = self.table_id;
-                    // Use u64::MAX here because the epoch to consume may be below the safe
-                    // epoch
+                    let read_metrics = self.metrics.flushed_buffer_read_metrics.clone();
                     async move {
-                        Ok::<_, anyhow::Error>(Box::pin(
-                            state_store
-                                .iter(
-                                    (Included(range_start), Included(range_end)),
-                                    u64::MAX,
-                                    ReadOptions {
-                                        prefetch_options: PrefetchOptions::new_for_exhaust_iter(),
-                                        cache_policy: CachePolicy::Fill(CachePriority::Low),
-                                        table_id,
-                                        ..Default::default()
-                                    },
-                                )
-                                .await?,
-                        ))
+                        let streams = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
+                            let range_start =
+                                serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id));
+                            let range_end =
+                                serde.serialize_log_store_pk(vnode, item_epoch, Some(end_seq_id));
+                            let state_store = &state_store;
+
+                            // Use u64::MAX here because the epoch to consume may be below the safe
+                            // epoch
+                            async move {
+                                Ok::<_, anyhow::Error>(Box::pin(
+                                    state_store
+                                        .iter(
+                                            (Included(range_start), Included(range_end)),
+                                            u64::MAX,
+                                            ReadOptions {
+                                                prefetch_options:
+                                                    PrefetchOptions::new_for_exhaust_iter(),
+                                                cache_policy: CachePolicy::Fill(CachePriority::Low),
+                                                table_id,
+                                                ..Default::default()
+                                            },
+                                        )
+                                        .await?,
+                                ))
+                            }
+                        }))
+                        .await?;
+                        let combined_stream = select_all(streams);
+
+                        let chunk = serde
+                            .deserialize_stream_chunk(
+                                combined_stream,
+                                start_seq_id,
+                                end_seq_id,
+                                item_epoch,
+                                &read_metrics,
+                            )
+                            .await?;
+
+                        Ok((chunk_id, chunk, item_epoch))
                     }
-                }))
-                .await?;
-                let combined_stream = select_all(streams);
-                let chunk = self
-                    .serde
-                    .deserialize_stream_chunk(
-                        combined_stream,
-                        start_seq_id,
-                        end_seq_id,
-                        item_epoch,
-                        &self.metrics.flushed_buffer_read_metrics,
-                    )
-                    .await?;
+                    .boxed()
+                };
+
+                // Store the future in case it is cancelled at the subsequent pending await
+                // point, which would otherwise lose a flushed item.
+                assert!(self
+                    .read_flushed_chunk_future
+                    .replace(read_flushed_chunk_future)
+                    .is_none());
+
+                // for cancellation test
+                #[cfg(test)]
+                {
+                    use std::time::Duration;
+
+                    use tokio::time::sleep;
+                    sleep(Duration::from_secs(1)).await;
+                }
+
+                let (_, chunk, _) = self
+                    .may_continue_read_flushed_chunk()
+                    .await?
+                    .expect("future just inserted, unlikely to be none");
+                let offset = TruncateOffset::Chunk {
                     epoch: item_epoch,
                     chunk_id,
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs
index 829f52a8a9d6a..809b5b42129d2 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/test_utils.rs
@@ -28,9 +28,10 @@ use risingwave_pb::catalog::PbTable;
 
 use crate::common::table::test_utils::gen_prost_table_with_dist_key;
 
 pub(crate) const TEST_TABLE_ID: TableId = TableId { table_id: 233 };
+pub(crate) const TEST_DATA_SIZE: usize = 10;
 
 pub(crate) fn gen_test_data(base: i64) -> (Vec, Vec) {
-    gen_sized_test_data(base, 10)
+    gen_sized_test_data(base, TEST_DATA_SIZE)
 }
 
 pub(crate) fn gen_sized_test_data(base: i64, max_count: usize) -> (Vec, Vec) {
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index e9c60c61b09e9..298a0642710cf 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -148,12 +148,13 @@ impl ExecutorBuilder for SinkExecutorBuilder {
             &sink_param,
             connector,
         );
 
+        // TODO: support setting max row count in config
         dispatch_state_store!(params.env.state_store(), state_store, {
             let factory = KvLogStoreFactory::new(
                 state_store,
                 node.table.as_ref().unwrap().clone(),
                 params.vnode_bitmap.clone().map(Arc::new),
-                32,
+                65536,
                 metrics,
             );
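Note on the `buffer.rs` change: back-pressure now counts buffered *rows* instead of buffered chunks, since chunk sizes vary widely. The sketch below is a minimal, self-contained illustration of that admission logic; `Buffer`, `Chunk`, and `try_add_chunk` are hypothetical stand-ins (not the RisingWave types), and `rows` plays the role of `StreamChunk::cardinality()`.

```rust
use std::collections::VecDeque;

/// Stand-in for a stream chunk; `rows` plays the role of `StreamChunk::cardinality()`.
struct Chunk {
    rows: usize,
}

struct Buffer {
    queue: VecDeque<Chunk>,
    row_count: usize,
    max_row_count: usize,
}

impl Buffer {
    fn new(max_row_count: usize) -> Self {
        Self {
            queue: VecDeque::new(),
            row_count: 0,
            max_row_count,
        }
    }

    /// Admission is checked before adding, so the buffer may overshoot by one chunk,
    /// mirroring the check-then-add flow of `can_add_stream_chunk` above.
    fn can_add_chunk(&self) -> bool {
        self.row_count < self.max_row_count
    }

    /// Returns the chunk back to the caller when the buffer is full, signalling
    /// that it should be flushed to persistent storage instead of buffered.
    fn try_add_chunk(&mut self, chunk: Chunk) -> Option<Chunk> {
        if self.can_add_chunk() {
            self.row_count += chunk.rows;
            self.queue.push_back(chunk);
            None
        } else {
            Some(chunk)
        }
    }

    fn pop_chunk(&mut self) -> Option<Chunk> {
        let chunk = self.queue.pop_front()?;
        self.row_count -= chunk.rows;
        Some(chunk)
    }
}

fn main() {
    let mut buf = Buffer::new(100);
    // A single oversized chunk is still admitted because the check happens first...
    assert!(buf.try_add_chunk(Chunk { rows: 120 }).is_none());
    // ...but the next chunk is rejected until rows are consumed again.
    assert!(buf.try_add_chunk(Chunk { rows: 10 }).is_some());
    assert_eq!(buf.pop_chunk().unwrap().rows, 120);
    assert!(buf.try_add_chunk(Chunk { rows: 10 }).is_none());
}
```

Counting rows rather than chunks also explains the test changes above, where the old chunk-count limits are scaled by `TEST_DATA_SIZE`.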
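Note on the `reader.rs` change: cancellation safety comes from stashing the in-flight storage read as a `BoxFuture` on the reader and resuming it on the next `next_item` call. The sketch below shows the same pattern in isolation; `Reader`, `resume_pending`, the `Chunk` alias, and the simulated one-poll-pending "storage read" are all assumptions for illustration, not the RisingWave API.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::task::{Context, Poll};

use futures::future::BoxFuture;
use futures::task::noop_waker_ref;
use futures::FutureExt;

/// Stand-in for the flushed stream chunk read back from storage.
type Chunk = Vec<u64>;

struct Reader {
    /// In-flight read of a flushed chunk, kept across cancelled `next_item` calls.
    pending_read: Option<BoxFuture<'static, Chunk>>,
}

impl Reader {
    fn new() -> Self {
        Self { pending_read: None }
    }

    /// Resume a previously started read, if any. Dropping this call mid-await leaves
    /// the stored future in place, so nothing is lost on cancellation.
    async fn resume_pending(&mut self) -> Option<Chunk> {
        if let Some(fut) = self.pending_read.as_mut() {
            let chunk = fut.await;
            self.pending_read = None;
            Some(chunk)
        } else {
            None
        }
    }

    async fn next_item(&mut self, item_id: u64) -> Chunk {
        // 1. Finish any read that a cancelled call left behind.
        if let Some(chunk) = self.resume_pending().await {
            return chunk;
        }
        // 2. Start a new read and stash it *before* the first await point, so that a
        //    cancellation between here and completion does not lose the item.
        let fut = async move {
            // Stand-in for the multi-step storage read: pending once, then ready.
            let mut yielded = false;
            poll_fn(move |cx| {
                if yielded {
                    Poll::Ready(())
                } else {
                    yielded = true;
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            })
            .await;
            vec![item_id, item_id + 1]
        }
        .boxed();
        assert!(self.pending_read.replace(fut).is_none());
        self.resume_pending().await.expect("future just inserted")
    }
}

fn main() {
    futures::executor::block_on(async {
        let mut reader = Reader::new();
        // Poll `next_item` once, then drop (cancel) it while the read is still pending.
        {
            let mut fut = pin!(reader.next_item(7));
            let mut cx = Context::from_waker(noop_waker_ref());
            assert!(fut.as_mut().poll(&mut cx).is_pending());
        } // the call is cancelled here, but the stored future survives on `reader`
        // The re-issued call resumes the stored read and returns the item intact.
        assert_eq!(reader.next_item(7).await, vec![7, 8]);
    });
}
```

Storing the future before the first pending await is what makes any later drop recoverable, which is also why `test_cancellation_safe` above polls `next_item` once, drops it, and still reads the chunk back on the next call.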