feat(log-store): support truncating the log store at a given offset (#12263)
wenym1 authored Sep 20, 2023
1 parent 869ef90 commit 8c05180
Showing 7 changed files with 653 additions and 208 deletions.
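
For reference while reading this diff: the `TruncateOffset` type used below is introduced elsewhere in this commit (in `src/stream/src/common/log_store/mod.rs`, one of the seven changed files, not shown on this page). The following is only a sketch of its likely shape, not the committed definition; the manual `Ord` is an assumption inferred from the `>=`/`>` comparisons in `truncate` below, ordering offsets by epoch first, with a barrier sorting after every chunk of its epoch.

use std::cmp::Ordering;

pub type ChunkId = usize;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: ChunkId },
    Barrier { epoch: u64 },
}

impl TruncateOffset {
    fn epoch(&self) -> u64 {
        match self {
            TruncateOffset::Chunk { epoch, .. } | TruncateOffset::Barrier { epoch } => *epoch,
        }
    }
}

impl PartialOrd for TruncateOffset {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TruncateOffset {
    fn cmp(&self, other: &Self) -> Ordering {
        use TruncateOffset::*;
        // Compare epochs first; within one epoch, chunks order by chunk_id
        // and the barrier sorts after every chunk of that epoch.
        self.epoch()
            .cmp(&other.epoch())
            .then_with(|| match (self, other) {
                (Chunk { chunk_id: a, .. }, Chunk { chunk_id: b, .. }) => a.cmp(b),
                (Chunk { .. }, Barrier { .. }) => Ordering::Less,
                (Barrier { .. }, Chunk { .. }) => Ordering::Greater,
                (Barrier { .. }, Barrier { .. }) => Ordering::Equal,
            })
    }
}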
154 changes: 128 additions & 26 deletions src/stream/src/common/log_store/in_mem.rs
@@ -26,6 +26,7 @@ use tokio::sync::oneshot;
use crate::common::log_store::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
use crate::common::log_store::{
LogReader, LogStoreError, LogStoreFactory, LogStoreReadItem, LogStoreResult, LogWriter,
TruncateOffset,
};

enum InMemLogStoreItem {
@@ -79,6 +80,12 @@ pub struct BoundedInMemLogStoreReader {

/// Sender of consumed epoch to the log writer
truncated_epoch_tx: UnboundedSender<u64>,

/// Offset of the latest emitted item
latest_offset: TruncateOffset,

/// Offset of the latest truncated item
truncate_offset: TruncateOffset,
}

pub struct BoundedInMemLogStoreFactory {
Expand All @@ -104,6 +111,8 @@ impl LogStoreFactory for BoundedInMemLogStoreFactory {
init_epoch_rx: Some(init_epoch_rx),
item_rx,
truncated_epoch_tx,
latest_offset: TruncateOffset::Barrier { epoch: 0 },
truncate_offset: TruncateOffset::Barrier { epoch: 0 },
};
let writer = BoundedInMemLogStoreWriter {
curr_epoch: None,
Expand All @@ -126,6 +135,8 @@ impl LogReader for BoundedInMemLogStoreReader {
.map_err(|e| anyhow!("unable to get init epoch: {:?}", e))?;
assert_eq!(self.epoch_progress, UNINITIALIZED);
self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
self.latest_offset = TruncateOffset::Barrier { epoch: epoch - 1 };
self.truncate_offset = TruncateOffset::Barrier { epoch: epoch - 1 };
Ok(())
}
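
Why `epoch - 1`? Initializing both offsets to a barrier of the previous epoch gives a sentinel that sorts strictly below every real offset of the first consumed epoch, so the first chunk passes the `epoch < current_epoch` assertion in `next_item` below and the first `truncate` call is accepted. A small illustration using the `TruncateOffset` sketch from the top of this page:

// The sentinel barrier of epoch - 1 sorts below every offset of the
// current epoch, for both chunk and barrier offsets.
let init_epoch = 5u64;
let sentinel = TruncateOffset::Barrier { epoch: init_epoch - 1 };
assert!(sentinel < TruncateOffset::Chunk { epoch: init_epoch, chunk_id: 0 });
assert!(sentinel < TruncateOffset::Barrier { epoch: init_epoch });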

@@ -134,7 +145,29 @@
Some(item) => match self.epoch_progress {
Consuming(current_epoch) => match item {
InMemLogStoreItem::StreamChunk(chunk) => {
-                        Ok((current_epoch, LogStoreReadItem::StreamChunk(chunk)))
+                        let chunk_id = match self.latest_offset {
+                            TruncateOffset::Chunk { epoch, chunk_id } => {
+                                assert_eq!(epoch, current_epoch);
+                                chunk_id + 1
+                            }
+                            TruncateOffset::Barrier { epoch } => {
+                                assert!(
+                                    epoch < current_epoch,
+                                    "prev offset at barrier {} but current epoch {}",
+                                    epoch,
+                                    current_epoch
+                                );
+                                0
+                            }
+                        };
+                        self.latest_offset = TruncateOffset::Chunk {
+                            epoch: current_epoch,
+                            chunk_id,
+                        };
+                        Ok((
+                            current_epoch,
+                            LogStoreReadItem::StreamChunk { chunk, chunk_id },
+                        ))
}
InMemLogStoreItem::Barrier {
is_checkpoint,
@@ -148,35 +181,60 @@
} else {
self.epoch_progress = Consuming(next_epoch);
}
self.latest_offset = TruncateOffset::Barrier {
epoch: current_epoch,
};
Ok((current_epoch, LogStoreReadItem::Barrier { is_checkpoint }))
}
InMemLogStoreItem::UpdateVnodeBitmap(vnode_bitmap) => Ok((
current_epoch,
LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap),
)),
},
-                AwaitingTruncate { .. } => {
-                    unreachable!("should not be awaiting for when barrier comes")
-                }
+                AwaitingTruncate { .. } => Err(anyhow!(
+                    "should not call next_item on checkpoint barrier for in-mem log store"
+                )
+                .into()),
},
None => Err(LogStoreError::EndOfLogStream),
}
}
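
To make the chunk id bookkeeping above concrete: chunk ids restart from 0 whenever a new epoch begins, and each epoch is closed out by its barrier, so the emitted offsets form a strictly increasing sequence. A small illustration, again using the `TruncateOffset` sketch from the top of this page:

// Offsets emitted for: chunk, chunk, barrier (epoch 1), then chunk, barrier (epoch 2).
let offsets = [
    TruncateOffset::Chunk { epoch: 1, chunk_id: 0 },
    TruncateOffset::Chunk { epoch: 1, chunk_id: 1 },
    TruncateOffset::Barrier { epoch: 1 },
    TruncateOffset::Chunk { epoch: 2, chunk_id: 0 },
    TruncateOffset::Barrier { epoch: 2 },
];
// Emission order coincides with offset order.
assert!(offsets.windows(2).all(|w| w[0] < w[1]));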

-    async fn truncate(&mut self) -> LogStoreResult<()> {
-        let sealed_epoch = match self.epoch_progress {
-            Consuming(_) => unreachable!("should be awaiting truncate"),
-            AwaitingTruncate {
-                sealed_epoch,
-                next_epoch,
-            } => {
-                self.epoch_progress = Consuming(next_epoch);
-                sealed_epoch
-            }
-        };
-        self.truncated_epoch_tx
-            .send(sealed_epoch)
-            .map_err(|_| anyhow!("unable to send sealed epoch"))?;
+    async fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
+        // Check that the truncate offset is greater than the previous truncate offset.
+        if self.truncate_offset >= offset {
+            return Err(anyhow!(
+                "truncate offset {:?} but prev truncate offset is {:?}",
+                offset,
+                self.truncate_offset
+            )
+            .into());
+        }
+
+        // Check that the truncate offset does not exceed the latest emitted offset.
+        if offset > self.latest_offset {
+            return Err(anyhow!(
+                "truncate at {:?} but latest offset is {:?}",
+                offset,
+                self.latest_offset
+            )
+            .into());
+        }
+
+        if let AwaitingTruncate {
+            sealed_epoch,
+            next_epoch,
+        } = &self.epoch_progress
+        {
+            if let TruncateOffset::Barrier { epoch } = offset && epoch == *sealed_epoch {
+                let sealed_epoch = *sealed_epoch;
+                self.epoch_progress = Consuming(*next_epoch);
+                self.truncated_epoch_tx
+                    .send(sealed_epoch)
+                    .map_err(|_| anyhow!("unable to send sealed epoch"))?;
+            }
+        }
+        self.truncate_offset = offset;
         Ok(())
     }
}
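
Taken together, the reader contract after this change is: consume items via `next_item`, remember the offset of the last item the downstream has made durable, and report it back through `truncate`. Below is a sketch of a consumer loop against the trait as it appears in this diff; `persist_chunk` is a hypothetical downstream sink used only for illustration, and truncating after every single chunk is a policy choice, not something the commit mandates.

async fn consume(reader: &mut impl LogReader) -> LogStoreResult<()> {
    reader.init().await?;
    loop {
        let (epoch, item) = reader.next_item().await?;
        match item {
            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                persist_chunk(epoch, chunk).await?;
                // Safe only once the chunk is durable downstream.
                reader
                    .truncate(TruncateOffset::Chunk { epoch, chunk_id })
                    .await?;
            }
            LogStoreReadItem::Barrier { is_checkpoint } => {
                if is_checkpoint {
                    reader.truncate(TruncateOffset::Barrier { epoch }).await?;
                }
            }
            LogStoreReadItem::UpdateVnodeBitmap(_) => {}
        }
    }
}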
@@ -240,12 +298,18 @@ impl LogWriter for BoundedInMemLogStoreWriter {

#[cfg(test)]
mod tests {
use std::future::poll_fn;
use std::task::Poll;

use futures::FutureExt;
use risingwave_common::array::Op;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;

use crate::common::log_store::in_mem::BoundedInMemLogStoreFactory;
-    use crate::common::log_store::{LogReader, LogStoreFactory, LogStoreReadItem, LogWriter};
+    use crate::common::log_store::{
+        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
+    };
use crate::common::StreamChunkBuilder;

#[tokio::test]
@@ -273,7 +337,7 @@ mod tests {
let stream_chunk = builder.take().unwrap();
let stream_chunk_clone = stream_chunk.clone();

-        let join_handle = tokio::spawn(async move {
+        let mut join_handle = tokio::spawn(async move {
writer
.init(EpochPair::new_test_epoch(init_epoch))
.await
@@ -282,19 +346,33 @@
.write_chunk(stream_chunk_clone.clone())
.await
.unwrap();
writer
.write_chunk(stream_chunk_clone.clone())
.await
.unwrap();
writer.flush_current_epoch(epoch1, false).await.unwrap();
writer.write_chunk(stream_chunk_clone).await.unwrap();
writer.flush_current_epoch(epoch2, true).await.unwrap();
});

reader.init().await.unwrap();
-        match reader.next_item().await.unwrap() {
-            (epoch, LogStoreReadItem::StreamChunk(chunk)) => {
+        let _chunk_id1_1 = match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(epoch, init_epoch);
                assert_eq!(&chunk, &stream_chunk);
+                chunk_id
            }
            _ => unreachable!(),
-        }
+        };

let chunk_id1_2 = match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
assert_eq!(epoch, init_epoch);
assert_eq!(&chunk, &stream_chunk);
chunk_id
}
_ => unreachable!(),
};

match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
@@ -304,13 +382,14 @@
_ => unreachable!(),
}

-        match reader.next_item().await.unwrap() {
-            (epoch, LogStoreReadItem::StreamChunk(chunk)) => {
+        let chunk_id2_1 = match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                assert_eq!(&chunk, &stream_chunk);
                assert_eq!(epoch, epoch1);
+                chunk_id
            }
            _ => unreachable!(),
-        }
+        };

match reader.next_item().await.unwrap() {
(epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
@@ -320,7 +399,30 @@
_ => unreachable!(),
}

-        reader.truncate().await.unwrap();
+        reader
+            .truncate(TruncateOffset::Chunk {
+                epoch: init_epoch,
+                chunk_id: chunk_id1_2,
+            })
+            .await
+            .unwrap();
+        assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
+            .await
+            .is_pending());
+        reader
+            .truncate(TruncateOffset::Chunk {
+                epoch: epoch1,
+                chunk_id: chunk_id2_1,
+            })
+            .await
+            .unwrap();
+        assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
+            .await
+            .is_pending());
+        reader
+            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
+            .await
+            .unwrap();
join_handle.await.unwrap();
}
}
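
A note on the test pattern used above: `poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))` polls the writer task exactly once and immediately returns the resulting `Poll`, so `is_pending()` asserts that the writer is still blocked; it only completes after the final `truncate(TruncateOffset::Barrier { epoch: epoch1 })` releases the checkpoint barrier. The one-shot poll can be factored into a helper; a sketch (the name `poll_once_is_pending` is introduced here, not part of the commit):

use std::future::{poll_fn, Future};
use std::task::Poll;

use futures::FutureExt;

// Poll a future exactly once and report whether it is still pending,
// equivalent to the inline pattern in the test above.
async fn poll_once_is_pending<F: Future + Unpin>(fut: &mut F) -> bool {
    poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await
}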