diff --git a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
index cfe7404786ece..85926a82373da 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
@@ -199,6 +199,16 @@ impl LogStoreBufferInner {
         self.update_unconsumed_buffer_metrics();
     }
 
+    fn add_truncate_offset(&mut self, (epoch, seq_id): ReaderTruncationOffsetType) {
+        if let Some((prev_epoch, ref mut prev_seq_id)) = self.truncation_list.back_mut()
+            && *prev_epoch == epoch
+        {
+            *prev_seq_id = seq_id;
+        } else {
+            self.truncation_list.push_back((epoch, seq_id));
+        }
+    }
+
     fn rewind(&mut self) {
         while let Some((epoch, item)) = self.consumed_queue.pop_front() {
             self.unconsumed_queue.push_back((epoch, item));
@@ -371,7 +381,7 @@ impl LogStoreBufferReceiver {
         }
     }
 
-    pub(crate) fn truncate(&mut self, offset: TruncateOffset) {
+    pub(crate) fn truncate_buffer(&mut self, offset: TruncateOffset) {
         let mut inner = self.buffer.inner();
         let mut latest_offset: Option<ReaderTruncationOffsetType> = None;
         while let Some((epoch, item)) = inner.consumed_queue.back() {
@@ -431,17 +441,16 @@ impl LogStoreBufferReceiver {
                 }
             }
         }
-        if let Some((epoch, seq_id)) = latest_offset {
-            if let Some((prev_epoch, ref mut prev_seq_id)) = inner.truncation_list.back_mut()
-                && *prev_epoch == epoch
-            {
-                *prev_seq_id = seq_id;
-            } else {
-                inner.truncation_list.push_back((epoch, seq_id));
-            }
+        if let Some(offset) = latest_offset {
+            inner.add_truncate_offset(offset);
         }
     }
 
+    pub(crate) fn truncate_historical(&mut self, epoch: u64) {
+        let mut inner = self.buffer.inner();
+        inner.add_truncate_offset((epoch, None));
+    }
+
     pub(crate) fn rewind(&self) {
         self.buffer.inner().rewind()
     }
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
index d1574a71debc0..3db9723e2ba14 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
@@ -1555,4 +1555,229 @@ mod tests {
         let chunk_ids = check_reader_last_unsealed(&mut reader, empty()).await;
         assert!(chunk_ids.is_empty());
     }
+
+    async fn validate_reader(
+        reader: &mut impl LogReader,
+        expected: impl IntoIterator<Item = (u64, LogStoreReadItem)>,
+    ) {
+        for (expected_epoch, expected_item) in expected {
+            let (epoch, item) = reader.next_item().await.unwrap();
+            assert_eq!(expected_epoch, epoch);
+            match (expected_item, item) {
+                (
+                    LogStoreReadItem::StreamChunk {
+                        chunk: expected_chunk,
+                        ..
+                    },
+                    LogStoreReadItem::StreamChunk { chunk, .. },
+                ) => {
+                    check_stream_chunk_eq(&expected_chunk, &chunk);
+                }
+                (
+                    LogStoreReadItem::Barrier {
+                        is_checkpoint: expected_is_checkpoint,
+                    },
+                    LogStoreReadItem::Barrier { is_checkpoint },
+                ) => {
+                    assert_eq!(expected_is_checkpoint, is_checkpoint);
+                }
+                _ => unreachable!(),
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_truncate_historical() {
+        #[expect(deprecated)]
+        test_truncate_historical_inner(
+            10,
+            &crate::common::log_store_impl::kv_log_store::v1::KV_LOG_STORE_V1_INFO,
+        )
+        .await;
+        test_truncate_historical_inner(10, &KV_LOG_STORE_V2_INFO).await;
+    }
+
+    async fn test_truncate_historical_inner(
+        max_row_count: usize,
+        pk_info: &'static KvLogStorePkInfo,
+    ) {
+        let gen_stream_chunk = |base| gen_stream_chunk_with_info(base, pk_info);
+        let test_env = prepare_hummock_test_env().await;
+
+        let table = gen_test_log_store_table(pk_info);
+
+        test_env.register_table(table.clone()).await;
+
+        let stream_chunk1 = gen_stream_chunk(0);
+        let stream_chunk2 = gen_stream_chunk(10);
+        let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows()));
+        let bitmap = Arc::new(bitmap);
+
+        let factory = KvLogStoreFactory::new(
+            test_env.storage.clone(),
+            table.clone(),
+            Some(bitmap.clone()),
+            max_row_count,
+            KvLogStoreMetrics::for_test(),
+            "test",
+            pk_info,
+        );
+        let (mut reader, mut writer) = factory.build().await;
+
+        let epoch1 = test_env
+            .storage
+            .get_pinned_version()
+            .version()
+            .max_committed_epoch
+            .next_epoch();
+        writer
+            .init(EpochPair::new_test_epoch(epoch1), false)
+            .await
+            .unwrap();
+        writer.write_chunk(stream_chunk1.clone()).await.unwrap();
+        let epoch2 = epoch1.next_epoch();
+        writer.flush_current_epoch(epoch2, false).await.unwrap();
+        writer.write_chunk(stream_chunk2.clone()).await.unwrap();
+        let epoch3 = epoch2.next_epoch();
+        writer.flush_current_epoch(epoch3, true).await.unwrap();
+
+        test_env.storage.seal_epoch(epoch1, false);
+        test_env.commit_epoch(epoch2).await;
+
+        reader.init().await.unwrap();
+        validate_reader(
+            &mut reader,
+            [
+                (
+                    epoch1,
+                    LogStoreReadItem::StreamChunk {
+                        chunk: stream_chunk1.clone(),
+                        chunk_id: 0,
+                    },
+                ),
+                (
+                    epoch1,
+                    LogStoreReadItem::Barrier {
+                        is_checkpoint: false,
+                    },
+                ),
+                (
+                    epoch2,
+                    LogStoreReadItem::StreamChunk {
+                        chunk: stream_chunk2.clone(),
+                        chunk_id: 0,
+                    },
+                ),
+                (
+                    epoch2,
+                    LogStoreReadItem::Barrier {
+                        is_checkpoint: true,
+                    },
+                ),
+            ],
+        )
+        .await;
+
+        drop(writer);
+
+        // Recovery
+        test_env.storage.clear_shared_buffer(epoch2).await;
+
+        // Rebuild log reader and writer in recovery
+        let factory = KvLogStoreFactory::new(
+            test_env.storage.clone(),
+            table.clone(),
+            Some(bitmap.clone()),
+            max_row_count,
+            KvLogStoreMetrics::for_test(),
+            "test",
+            pk_info,
+        );
+        let (mut reader, mut writer) = factory.build().await;
+        writer
+            .init(EpochPair::new_test_epoch(epoch3), false)
+            .await
+            .unwrap();
+        reader.init().await.unwrap();
+        validate_reader(
+            &mut reader,
+            [
+                (
+                    epoch1,
+                    LogStoreReadItem::StreamChunk {
+                        chunk: stream_chunk1.clone(),
+                        chunk_id: 0,
+                    },
+                ),
+                (
+                    epoch1,
+                    LogStoreReadItem::Barrier {
+                        is_checkpoint: false,
+                    },
+                ),
+                (
+                    epoch2,
+                    LogStoreReadItem::StreamChunk {
+                        chunk: stream_chunk2.clone(),
+                        chunk_id: 0,
+                    },
+                ),
+                (
+                    epoch2,
+                    LogStoreReadItem::Barrier {
+                        is_checkpoint: true,
+                    },
+                ),
+            ],
+        )
+        .await;
+        // The truncate should take effect
+        reader
+            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
+            .unwrap();
+        let epoch4 = epoch3.next_epoch();
+        writer.flush_current_epoch(epoch4, true).await.unwrap();
+        test_env.commit_epoch(epoch3).await;
+
+        drop(writer);
+
+        // Recovery
+        test_env.storage.clear_shared_buffer(epoch3).await;
+
+        // Rebuild log reader and writer in recovery
+        let factory = KvLogStoreFactory::new(
+            test_env.storage.clone(),
+            table.clone(),
+            Some(bitmap),
+            max_row_count,
+            KvLogStoreMetrics::for_test(),
+            "test",
+            pk_info,
+        );
+        let (mut reader, mut writer) = factory.build().await;
+        writer
+            .init(EpochPair::new_test_epoch(epoch4), false)
+            .await
+            .unwrap();
+        reader.init().await.unwrap();
+        validate_reader(
+            &mut reader,
+            [
+                (
+                    epoch2,
+                    LogStoreReadItem::StreamChunk {
+                        chunk: stream_chunk2.clone(),
+                        chunk_id: 0,
+                    },
+                ),
+                (
+                    epoch2,
+                    LogStoreReadItem::Barrier {
+                        is_checkpoint: true,
+                    },
+                ),
+            ],
+        )
+        .await;
+    }
 }
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
index 7cad46b5b5fc0..21ee99ec91d08 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -465,11 +465,11 @@ impl LogReader for KvLogStoreReader {
             }
         }
         if offset.epoch() >= self.first_write_epoch.expect("should have init") {
-            self.rx.truncate(offset);
+            self.rx.truncate_buffer(offset);
         } else {
             // For historical data, no need to truncate at seq id level. Only truncate at barrier.
-            if let TruncateOffset::Barrier { .. } = &offset {
-                self.rx.truncate(offset);
+            if let TruncateOffset::Barrier { epoch } = &offset {
+                self.rx.truncate_historical(*epoch);
             }
         }
         self.truncate_offset = Some(offset);
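Not part of the patch: a minimal, self-contained sketch of the merge-by-epoch behavior that the new `add_truncate_offset` helper centralizes and that both `truncate_buffer` and `truncate_historical` now route through. The `Epoch`, `SeqId`, and `TruncationOffset` aliases are hypothetical stand-ins for the log store's own `ReaderTruncationOffsetType`, and a `match` with a guard replaces the patch's let-chain so the example compiles on stable Rust.

use std::collections::VecDeque;

// Hypothetical stand-ins for the kv log store's epoch type, SeqIdType, and
// ReaderTruncationOffsetType; `None` plays the role of a barrier-level
// truncation, which is what `truncate_historical` enqueues as `(epoch, None)`.
type Epoch = u64;
type SeqId = i32;
type TruncationOffset = (Epoch, Option<SeqId>);

#[derive(Default)]
struct TruncationList {
    list: VecDeque<TruncationOffset>,
}

impl TruncationList {
    // Same idea as `add_truncate_offset`: keep at most one pending entry per
    // epoch by advancing the newest entry in place when the epoch matches,
    // and appending a fresh entry otherwise.
    fn add(&mut self, (epoch, seq_id): TruncationOffset) {
        match self.list.back_mut() {
            Some((prev_epoch, prev_seq_id)) if *prev_epoch == epoch => {
                *prev_seq_id = seq_id;
            }
            _ => self.list.push_back((epoch, seq_id)),
        }
    }
}

fn main() {
    let mut truncations = TruncationList::default();
    truncations.add((1, Some(10))); // chunk-level offset within epoch 1
    truncations.add((1, None)); // barrier-level offset for epoch 1 replaces it in place
    truncations.add((2, Some(5))); // a different epoch gets its own entry
    assert_eq!(
        truncations.list,
        VecDeque::from([(1, None), (2, Some(5))])
    );
}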