Commit

fix(log-store): ensure cancellation safety for log reader next_item call (#12725) (#12744)

Co-authored-by: William Wen <[email protected]>
github-actions[bot] and wenym1 authored Oct 10, 2023
1 parent cb5d5a3 commit 9c1192d
Showing 6 changed files with 216 additions and 66 deletions.
src/connector/src/sink/log_store.rs (2 additions, 0 deletions)
@@ -133,6 +133,8 @@ pub trait LogReader: Send + Sized + 'static {
     fn init(&mut self) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
 
     /// Emit the next item.
+    ///
+    /// The implementation should ensure that the future is cancellation safe.
     fn next_item(
         &mut self,
     ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;
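Note on the new doc comment: a future is cancellation safe when dropping it before completion leaves its owner in a consistent state, so the call can simply be retried. Callers of `next_item` typically race it against other events (barriers, shutdown) and drop the losing future. Below is a minimal toy sketch of the hazard and the fix; the types are illustrative stand-ins, not RisingWave's reader, and tokio is assumed as the runtime.

    use std::collections::VecDeque;

    use tokio::sync::mpsc;
    use tokio::time::{sleep, Duration};

    struct Reader {
        rx: mpsc::Receiver<u64>,
        staged: VecDeque<u64>,
    }

    impl Reader {
        /// NOT cancellation safe: after `recv` completes, the item lives only
        /// inside this future, so dropping the future during the second await
        /// loses the item.
        #[allow(dead_code)]
        async fn next_item_unsafe(&mut self) -> Option<u64> {
            let item = self.rx.recv().await?;
            sleep(Duration::from_millis(10)).await; // stand-in for an async state-store read
            Some(item)
        }

        /// Cancellation safe: progress is stashed in `self` before any further
        /// await, so a re-created future resumes without losing the item.
        async fn next_item_safe(&mut self) -> Option<u64> {
            if self.staged.is_empty() {
                let item = self.rx.recv().await?; // `recv` itself is cancellation safe
                self.staged.push_back(item);
            }
            sleep(Duration::from_millis(10)).await; // remaining work can be redone on retry
            self.staged.pop_front()
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(4);
        let mut reader = Reader { rx, staged: VecDeque::new() };
        tx.send(42).await.unwrap();

        // Race the read against a timer; the timer wins, and the in-flight
        // `next_item_safe` future is dropped mid-await.
        tokio::select! {
            biased;
            _ = sleep(Duration::from_millis(1)) => println!("read cancelled"),
            item = reader.next_item_safe() => println!("got {item:?}"),
        }

        // The staged item survived the cancelled future; `next_item_unsafe`
        // would have lost it here.
        assert_eq!(reader.next_item_safe().await, Some(42));
    }

The `test_cancellation_safe` test added at the end of this commit checks the same property against the real kv log store reader.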
src/stream/src/common/log_store_impl/kv_log_store/buffer.rs (12 additions, 8 deletions)
@@ -54,8 +54,8 @@ struct LogStoreBufferInner {
     unconsumed_queue: VecDeque<(u64, LogStoreBufferItem)>,
     /// Items already read by log reader but not truncated. Newer item at the front
     consumed_queue: VecDeque<(u64, LogStoreBufferItem)>,
-    stream_chunk_count: usize,
-    max_stream_chunk_count: usize,
+    row_count: usize,
+    max_row_count: usize,
 
     truncation_list: VecDeque<ReaderTruncationOffsetType>,

@@ -64,13 +64,16 @@ struct LogStoreBufferInner {
 
 impl LogStoreBufferInner {
     fn can_add_stream_chunk(&self) -> bool {
-        self.stream_chunk_count < self.max_stream_chunk_count
+        self.row_count < self.max_row_count
     }
 
     fn add_item(&mut self, epoch: u64, item: LogStoreBufferItem) {
         if let LogStoreBufferItem::StreamChunk { .. } = item {
             unreachable!("StreamChunk should call try_add_item")
         }
+        if let LogStoreBufferItem::Barrier { .. } = &item {
+            self.next_chunk_id = 0;
+        }
         self.unconsumed_queue.push_front((epoch, item));
     }

@@ -86,7 +89,7 @@ impl LogStoreBufferInner {
         } else {
             let chunk_id = self.next_chunk_id;
             self.next_chunk_id += 1;
-            self.stream_chunk_count += 1;
+            self.row_count += chunk.cardinality();
             self.unconsumed_queue.push_front((
                 epoch,
                 LogStoreBufferItem::StreamChunk {
@@ -326,6 +329,7 @@ impl LogStoreBufferReceiver {
                     chunk_id,
                     flushed,
                     end_seq_id,
+                    chunk,
                     ..
                 } => {
                     let chunk_offset = TruncateOffset::Chunk {
@@ -335,7 +339,7 @@
                     let flushed = *flushed;
                     let end_seq_id = *end_seq_id;
                     if chunk_offset <= offset {
-                        inner.stream_chunk_count -= 1;
+                        inner.row_count -= chunk.cardinality();
                         inner.consumed_queue.pop_back();
                         if flushed {
                             latest_offset = Some((epoch, Some(end_seq_id)));
@@ -386,13 +390,13 @@
 }
 
 pub(crate) fn new_log_store_buffer(
-    max_stream_chunk_count: usize,
+    max_row_count: usize,
 ) -> (LogStoreBufferSender, LogStoreBufferReceiver) {
     let buffer = SharedMutex::new(LogStoreBufferInner {
         unconsumed_queue: VecDeque::new(),
         consumed_queue: VecDeque::new(),
-        stream_chunk_count: 0,
-        max_stream_chunk_count,
+        row_count: 0,
+        max_row_count,
         truncation_list: VecDeque::new(),
         next_chunk_id: 0,
     });
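Two behavioral changes in this file are worth calling out: the buffer budget is now counted in rows (via `StreamChunk::cardinality()`) rather than in chunks, since chunks vary widely in size, and `next_chunk_id` now resets to 0 at every barrier so chunk ids restart per epoch. The row accounting works roughly like the sketch below; the names are illustrative, not the actual `LogStoreBufferInner`.

    use std::collections::VecDeque;

    struct RowBudgetBuffer<T> {
        queue: VecDeque<(usize, T)>, // (row count, item), newest at the front
        row_count: usize,
        max_row_count: usize,
    }

    impl<T> RowBudgetBuffer<T> {
        // Mirrors `can_add_stream_chunk`: admit while strictly under budget, so
        // one oversized chunk can still enter an otherwise empty buffer.
        fn try_push(&mut self, rows: usize, item: T) -> Result<(), T> {
            if self.row_count >= self.max_row_count {
                return Err(item); // over budget: the real writer flushes to the state store instead
            }
            self.row_count += rows;
            self.queue.push_front((rows, item));
            Ok(())
        }

        // Truncation returns the rows to the budget, as the
        // `row_count -= chunk.cardinality()` hunk above does.
        fn truncate_one(&mut self) -> Option<T> {
            let (rows, item) = self.queue.pop_back()?;
            self.row_count -= rows;
            Some(item)
        }
    }

    fn main() {
        let mut buf = RowBudgetBuffer {
            queue: VecDeque::new(),
            row_count: 0,
            max_row_count: 100,
        };
        assert!(buf.try_push(120, "big chunk").is_ok()); // admitted: count was 0 < 100
        assert!(buf.try_push(1, "next chunk").is_err()); // rejected: 120 >= 100
        assert_eq!(buf.truncate_one(), Some("big chunk"));
    }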
src/stream/src/common/log_store_impl/kv_log_store/mod.rs (87 additions, 18 deletions)
@@ -174,7 +174,7 @@ pub struct KvLogStoreFactory<S: StateStore> {
 
     vnodes: Option<Arc<Bitmap>>,
 
-    max_stream_chunk_count: usize,
+    max_row_count: usize,
 
     metrics: KvLogStoreMetrics,
 }
@@ -184,14 +184,14 @@ impl<S: StateStore> KvLogStoreFactory<S> {
         state_store: S,
         table_catalog: Table,
         vnodes: Option<Arc<Bitmap>>,
-        max_stream_chunk_count: usize,
+        max_row_count: usize,
         metrics: KvLogStoreMetrics,
     ) -> Self {
         Self {
             state_store,
             table_catalog,
             vnodes,
-            max_stream_chunk_count,
+            max_row_count,
             metrics,
         }
     }
@@ -218,7 +218,7 @@ impl<S: StateStore> LogStoreFactory for KvLogStoreFactory<S> {
             })
             .await;
 
-        let (tx, rx) = new_log_store_buffer(self.max_stream_chunk_count);
+        let (tx, rx) = new_log_store_buffer(self.max_row_count);
 
         let reader = KvLogStoreReader::new(
             table_id,
@@ -236,7 +236,10 @@ impl<S: StateStore> LogStoreFactory for KvLogStoreFactory<S> {
 
 #[cfg(test)]
 mod tests {
+    use std::future::{poll_fn, Future};
+    use std::pin::pin;
     use std::sync::Arc;
+    use std::task::Poll;
 
     use risingwave_common::buffer::{Bitmap, BitmapBuilder};
     use risingwave_common::hash::VirtualNode;
@@ -251,18 +254,18 @@ mod tests {
 
     use crate::common::log_store_impl::kv_log_store::test_utils::{
         calculate_vnode_bitmap, check_rows_eq, check_stream_chunk_eq,
-        gen_multi_vnode_stream_chunks, gen_stream_chunk, gen_test_log_store_table,
+        gen_multi_vnode_stream_chunks, gen_stream_chunk, gen_test_log_store_table, TEST_DATA_SIZE,
     };
     use crate::common::log_store_impl::kv_log_store::{KvLogStoreFactory, KvLogStoreMetrics};
 
     #[tokio::test]
     async fn test_basic() {
         for count in 0..20 {
-            test_basic_inner(count).await
+            test_basic_inner(count * TEST_DATA_SIZE).await
         }
     }
 
-    async fn test_basic_inner(max_stream_chunk_count: usize) {
+    async fn test_basic_inner(max_row_count: usize) {
         let test_env = prepare_hummock_test_env().await;
 
         let table = gen_test_log_store_table();
@@ -277,7 +280,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(Arc::new(bitmap)),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -350,11 +353,11 @@ mod tests {
     #[tokio::test]
     async fn test_recovery() {
         for count in 0..20 {
-            test_recovery_inner(count).await
+            test_recovery_inner(count * TEST_DATA_SIZE).await
         }
     }
 
-    async fn test_recovery_inner(max_stream_chunk_count: usize) {
+    async fn test_recovery_inner(max_row_count: usize) {
         let test_env = prepare_hummock_test_env().await;
 
         let table = gen_test_log_store_table();
@@ -370,7 +373,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap.clone()),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -456,7 +459,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -514,7 +517,7 @@ mod tests {
         }
     }
 
-    async fn test_truncate_inner(max_stream_chunk_count: usize) {
+    async fn test_truncate_inner(max_row_count: usize) {
         let test_env = prepare_hummock_test_env().await;
 
         let table = gen_test_log_store_table();
@@ -538,7 +541,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap.clone()),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -648,7 +651,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(bitmap),
-            max_stream_chunk_count,
+            max_row_count,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -739,14 +742,14 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(vnodes1),
-            10,
+            10 * TEST_DATA_SIZE,
             KvLogStoreMetrics::for_test(),
         );
         let factory2 = KvLogStoreFactory::new(
             test_env.storage.clone(),
             table.clone(),
             Some(vnodes2),
-            10,
+            10 * TEST_DATA_SIZE,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader1, mut writer1) = factory1.build().await;
@@ -865,7 +868,7 @@ mod tests {
             test_env.storage.clone(),
             table.clone(),
             Some(vnodes),
-            10,
+            10 * TEST_DATA_SIZE,
             KvLogStoreMetrics::for_test(),
         );
         let (mut reader, mut writer) = factory.build().await;
@@ -903,4 +906,70 @@ mod tests {
             _ => unreachable!(),
         }
     }
+
+    #[tokio::test]
+    async fn test_cancellation_safe() {
+        let test_env = prepare_hummock_test_env().await;
+
+        let table = gen_test_log_store_table();
+
+        test_env.register_table(table.clone()).await;
+
+        let stream_chunk1 = gen_stream_chunk(0);
+        let stream_chunk2 = gen_stream_chunk(10);
+        let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows()));
+
+        let factory = KvLogStoreFactory::new(
+            test_env.storage.clone(),
+            table.clone(),
+            Some(Arc::new(bitmap)),
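+            // Row budget of 0 (max_row_count): no chunk can be buffered, so the
+            // writer flushes to the state store and the reader's first poll below
+            // has to do async I/O, which is what lets the test observe Pending.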
+            0,
+            KvLogStoreMetrics::for_test(),
+        );
+        let (mut reader, mut writer) = factory.build().await;
+
+        let epoch1 = test_env
+            .storage
+            .get_pinned_version()
+            .version()
+            .max_committed_epoch
+            + 1;
+        writer
+            .init(EpochPair::new_test_epoch(epoch1))
+            .await
+            .unwrap();
+        writer.write_chunk(stream_chunk1.clone()).await.unwrap();
+        let epoch2 = epoch1 + 1;
+        writer.flush_current_epoch(epoch2, true).await.unwrap();
+
+        reader.init().await.unwrap();
+
+        {
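+            // Poll the read future exactly once, assert it is still pending, and
+            // drop it at the end of this block to simulate cancellation.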
+            let mut future = pin!(reader.next_item());
+            assert!(poll_fn(|cx| Poll::Ready(future.as_mut().poll(cx)))
+                .await
+                .is_pending());
+        }
+
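+        // A fresh `next_item` call must still yield the first chunk: the
+        // cancelled future must not have consumed or lost any buffered state.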
+        match reader.next_item().await.unwrap() {
+            (
+                epoch,
+                LogStoreReadItem::StreamChunk {
+                    chunk: read_stream_chunk,
+                    ..
+                },
+            ) => {
+                assert_eq!(epoch, epoch1);
+                assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk));
+            }
+            _ => unreachable!(),
+        }
+        match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+                assert_eq!(epoch, epoch1);
+                assert!(is_checkpoint)
+            }
+            _ => unreachable!(),
+        }
+    }
 }
