Skip to content

Commit

Permalink
fix(log-store): write truncate offset when consuming persisted data (
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored May 29, 2024
1 parent 2016c32 commit d5ea645
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 12 deletions.
27 changes: 18 additions & 9 deletions src/stream/src/common/log_store_impl/kv_log_store/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ impl LogStoreBufferInner {
self.update_unconsumed_buffer_metrics();
}

fn add_truncate_offset(&mut self, (epoch, seq_id): ReaderTruncationOffsetType) {
if let Some((prev_epoch, ref mut prev_seq_id)) = self.truncation_list.back_mut()
&& *prev_epoch == epoch
{
*prev_seq_id = seq_id;
} else {
self.truncation_list.push_back((epoch, seq_id));
}
}

fn rewind(&mut self) {
while let Some((epoch, item)) = self.consumed_queue.pop_front() {
self.unconsumed_queue.push_back((epoch, item));
Expand Down Expand Up @@ -371,7 +381,7 @@ impl LogStoreBufferReceiver {
}
}

pub(crate) fn truncate(&mut self, offset: TruncateOffset) {
pub(crate) fn truncate_buffer(&mut self, offset: TruncateOffset) {
let mut inner = self.buffer.inner();
let mut latest_offset: Option<ReaderTruncationOffsetType> = None;
while let Some((epoch, item)) = inner.consumed_queue.back() {
Expand Down Expand Up @@ -431,17 +441,16 @@ impl LogStoreBufferReceiver {
}
}
}
if let Some((epoch, seq_id)) = latest_offset {
if let Some((prev_epoch, ref mut prev_seq_id)) = inner.truncation_list.back_mut()
&& *prev_epoch == epoch
{
*prev_seq_id = seq_id;
} else {
inner.truncation_list.push_back((epoch, seq_id));
}
if let Some(offset) = latest_offset {
inner.add_truncate_offset(offset);
}
}

/// Records a barrier-level truncation at `epoch` for historical (already
/// persisted) data. Unlike `truncate_buffer`, there are no in-memory buffer
/// items to pop, so only the truncation offset itself is written; the seq id
/// is `None` because historical data is truncated at barrier granularity.
pub(crate) fn truncate_historical(&mut self, epoch: u64) {
    self.buffer.inner().add_truncate_offset((epoch, None));
}

pub(crate) fn rewind(&self) {
self.buffer.inner().rewind()
}
Expand Down
225 changes: 225 additions & 0 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1555,4 +1555,229 @@ mod tests {
let chunk_ids = check_reader_last_unsealed(&mut reader, empty()).await;
assert!(chunk_ids.is_empty());
}

/// Drains `reader` and asserts that the `(epoch, item)` pairs it yields match
/// `expected` exactly, in order.
///
/// Panics (failing the test) on an epoch mismatch, a chunk/barrier content
/// mismatch, or when the item kinds differ.
async fn validate_reader(
    reader: &mut impl LogReader,
    expected: impl IntoIterator<Item = (u64, LogStoreReadItem)>,
) {
    for (expected_epoch, expected_item) in expected {
        let (epoch, item) = reader.next_item().await.unwrap();
        assert_eq!(expected_epoch, epoch);
        match (expected_item, item) {
            (
                LogStoreReadItem::StreamChunk {
                    chunk: expected_chunk,
                    ..
                },
                LogStoreReadItem::StreamChunk { chunk, .. },
            ) => {
                // NOTE(review): `check_stream_chunk_eq` returns a bool; the
                // previous bare call discarded it, so a chunk mismatch never
                // failed the test. Assert on the result instead.
                assert!(check_stream_chunk_eq(&expected_chunk, &chunk));
            }
            (
                LogStoreReadItem::Barrier {
                    is_checkpoint: expected_is_checkpoint,
                },
                LogStoreReadItem::Barrier { is_checkpoint },
            ) => {
                assert_eq!(expected_is_checkpoint, is_checkpoint);
            }
            // Kind mismatch (chunk vs. barrier, or an unexpected variant)
            // means the read sequence diverged from the expectation.
            _ => unreachable!(),
        }
    }
}

#[tokio::test]
async fn test_truncate_historical() {
    // Same buffer capacity for both runs so the two formats are exercised
    // under identical conditions.
    const MAX_ROW_COUNT: usize = 10;
    // Exercise both the deprecated v1 pk layout and the current v2 layout.
    #[expect(deprecated)]
    test_truncate_historical_inner(
        MAX_ROW_COUNT,
        &crate::common::log_store_impl::kv_log_store::v1::KV_LOG_STORE_V1_INFO,
    )
    .await;
    test_truncate_historical_inner(MAX_ROW_COUNT, &KV_LOG_STORE_V2_INFO).await;
}

/// Shared body for `test_truncate_historical`, parameterized over the buffer
/// row capacity and the pk layout (v1 vs. v2).
///
/// Scenario: write two epochs and persist them, recover, truncate at the
/// epoch-1 barrier while that data is *historical* (already persisted, not in
/// the in-memory buffer), then recover again and verify only epoch-2 data is
/// replayed — i.e. the truncate offset was durably written for persisted data.
async fn test_truncate_historical_inner(
    max_row_count: usize,
    pk_info: &'static KvLogStorePkInfo,
) {
    let gen_stream_chunk = |base| gen_stream_chunk_with_info(base, pk_info);
    let test_env = prepare_hummock_test_env().await;

    let table = gen_test_log_store_table(pk_info);

    test_env.register_table(table.clone()).await;

    let stream_chunk1 = gen_stream_chunk(0);
    let stream_chunk2 = gen_stream_chunk(10);
    // The vnode bitmap must cover the rows of both chunks so the log store
    // reads them all back.
    let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows()));
    let bitmap = Arc::new(bitmap);

    let factory = KvLogStoreFactory::new(
        test_env.storage.clone(),
        table.clone(),
        Some(bitmap.clone()),
        max_row_count,
        KvLogStoreMetrics::for_test(),
        "test",
        pk_info,
    );
    let (mut reader, mut writer) = factory.build().await;

    // Start from the first epoch after the currently committed one.
    let epoch1 = test_env
        .storage
        .get_pinned_version()
        .version()
        .max_committed_epoch
        .next_epoch();
    writer
        .init(EpochPair::new_test_epoch(epoch1), false)
        .await
        .unwrap();
    // Epoch 1: chunk1 + non-checkpoint barrier. Epoch 2: chunk2 + checkpoint barrier.
    writer.write_chunk(stream_chunk1.clone()).await.unwrap();
    let epoch2 = epoch1.next_epoch();
    writer.flush_current_epoch(epoch2, false).await.unwrap();
    writer.write_chunk(stream_chunk2.clone()).await.unwrap();
    let epoch3 = epoch2.next_epoch();
    writer.flush_current_epoch(epoch3, true).await.unwrap();

    // Persist both epochs so that after recovery the data is "historical".
    test_env.storage.seal_epoch(epoch1, false);
    test_env.commit_epoch(epoch2).await;

    // First read: everything comes back in write order.
    reader.init().await.unwrap();
    validate_reader(
        &mut reader,
        [
            (
                epoch1,
                LogStoreReadItem::StreamChunk {
                    chunk: stream_chunk1.clone(),
                    chunk_id: 0,
                },
            ),
            (
                epoch1,
                LogStoreReadItem::Barrier {
                    is_checkpoint: false,
                },
            ),
            (
                epoch2,
                LogStoreReadItem::StreamChunk {
                    chunk: stream_chunk2.clone(),
                    chunk_id: 0,
                },
            ),
            (
                epoch2,
                LogStoreReadItem::Barrier {
                    is_checkpoint: true,
                },
            ),
        ],
    )
    .await;

    drop(writer);

    // Recovery
    test_env.storage.clear_shared_buffer(epoch2).await;

    // Rebuild log reader and writer in recovery
    let factory = KvLogStoreFactory::new(
        test_env.storage.clone(),
        table.clone(),
        Some(bitmap.clone()),
        max_row_count,
        KvLogStoreMetrics::for_test(),
        "test",
        pk_info,
    );
    let (mut reader, mut writer) = factory.build().await;
    writer
        .init(EpochPair::new_test_epoch(epoch3), false)
        .await
        .unwrap();
    reader.init().await.unwrap();
    // After recovery the reader replays the persisted data of both epochs.
    validate_reader(
        &mut reader,
        [
            (
                epoch1,
                LogStoreReadItem::StreamChunk {
                    chunk: stream_chunk1.clone(),
                    chunk_id: 0,
                },
            ),
            (
                epoch1,
                LogStoreReadItem::Barrier {
                    is_checkpoint: false,
                },
            ),
            (
                epoch2,
                LogStoreReadItem::StreamChunk {
                    chunk: stream_chunk2.clone(),
                    chunk_id: 0,
                },
            ),
            (
                epoch2,
                LogStoreReadItem::Barrier {
                    is_checkpoint: true,
                },
            ),
        ],
    )
    .await;
    // The truncate should take effect
    // This truncation targets *historical* (persisted) data — the behavior
    // under test is that the offset is durably recorded for it.
    reader
        .truncate(TruncateOffset::Barrier { epoch: epoch1 })
        .unwrap();
    let epoch4 = epoch3.next_epoch();
    writer.flush_current_epoch(epoch4, true).await.unwrap();
    test_env.commit_epoch(epoch3).await;

    drop(writer);

    // Recovery
    test_env.storage.clear_shared_buffer(epoch3).await;

    // Rebuild log reader and writer in recovery
    let factory = KvLogStoreFactory::new(
        test_env.storage.clone(),
        table.clone(),
        Some(bitmap),
        max_row_count,
        KvLogStoreMetrics::for_test(),
        "test",
        pk_info,
    );
    let (mut reader, mut writer) = factory.build().await;
    writer
        .init(EpochPair::new_test_epoch(epoch4), false)
        .await
        .unwrap();
    reader.init().await.unwrap();
    // Epoch-1 data must be gone after the second recovery; only the epoch-2
    // chunk and its checkpoint barrier remain.
    validate_reader(
        &mut reader,
        [
            (
                epoch2,
                LogStoreReadItem::StreamChunk {
                    chunk: stream_chunk2.clone(),
                    chunk_id: 0,
                },
            ),
            (
                epoch2,
                LogStoreReadItem::Barrier {
                    is_checkpoint: true,
                },
            ),
        ],
    )
    .await;
}
}
6 changes: 3 additions & 3 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,11 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
}
}
if offset.epoch() >= self.first_write_epoch.expect("should have init") {
self.rx.truncate(offset);
self.rx.truncate_buffer(offset);
} else {
// For historical data, no need to truncate at seq id level. Only truncate at barrier.
if let TruncateOffset::Barrier { .. } = &offset {
self.rx.truncate(offset);
if let TruncateOffset::Barrier { epoch } = &offset {
self.rx.truncate_historical(*epoch);
}
}
self.truncate_offset = Some(offset);
Expand Down

0 comments on commit d5ea645

Please sign in to comment.