feat(log-store): log store support truncates at given offset #12263

Merged · 11 commits · Sep 20, 2023
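The `truncate` API now takes an explicit `TruncateOffset` instead of implicitly truncating at the sealed epoch. The type itself is defined in the parent `log_store` module and is not part of this diff; based on how the code below constructs and compares it (`Chunk { epoch, chunk_id }` and `Barrier { epoch }` variants, with `>=`/`>` comparisons that order all chunks of an epoch before that epoch's barrier), a plausible sketch of its definition is:

```rust
// Hedged sketch of the TruncateOffset type this diff imports; the real
// definition lives in the parent log_store module and may differ in detail.
pub type ChunkId = usize;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TruncateOffset {
    Chunk { epoch: u64, chunk_id: ChunkId },
    Barrier { epoch: u64 },
}

impl PartialOrd for TruncateOffset {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TruncateOffset {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;
        use TruncateOffset::{Barrier, Chunk};
        // Compare by epoch first; within the same epoch, every chunk sorts
        // before the barrier, and chunks sort by chunk_id.
        match (self, other) {
            (
                Chunk { epoch: e1, chunk_id: id1 },
                Chunk { epoch: e2, chunk_id: id2 },
            ) => e1.cmp(e2).then(id1.cmp(id2)),
            (Barrier { epoch: e1 }, Barrier { epoch: e2 }) => e1.cmp(e2),
            (Chunk { epoch: e1, .. }, Barrier { epoch: e2 }) => {
                e1.cmp(e2).then(Ordering::Less)
            }
            (Barrier { epoch: e1 }, Chunk { epoch: e2, .. }) => {
                e1.cmp(e2).then(Ordering::Greater)
            }
        }
    }
}
```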
154 changes: 128 additions & 26 deletions src/stream/src/common/log_store/in_mem.rs
@@ -26,6 +26,7 @@ use tokio::sync::oneshot;
 use crate::common::log_store::in_mem::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
 use crate::common::log_store::{
     LogReader, LogStoreError, LogStoreFactory, LogStoreReadItem, LogStoreResult, LogWriter,
+    TruncateOffset,
 };
 
 enum InMemLogStoreItem {
@@ -79,6 +80,12 @@ pub struct BoundedInMemLogStoreReader {
 
     /// Sender of consumed epoch to the log writer
     truncated_epoch_tx: UnboundedSender<u64>,
+
+    /// Offset of the latest emitted item
+    latest_offset: TruncateOffset,
+
+    /// Offset of the latest truncated item
+    truncate_offset: TruncateOffset,
 }
 
 pub struct BoundedInMemLogStoreFactory {
@@ -104,6 +111,8 @@ impl LogStoreFactory for BoundedInMemLogStoreFactory {
             init_epoch_rx: Some(init_epoch_rx),
             item_rx,
             truncated_epoch_tx,
+            latest_offset: TruncateOffset::Barrier { epoch: 0 },
+            truncate_offset: TruncateOffset::Barrier { epoch: 0 },
         };
         let writer = BoundedInMemLogStoreWriter {
             curr_epoch: None,
@@ -126,6 +135,8 @@ impl LogReader for BoundedInMemLogStoreReader {
             .map_err(|e| anyhow!("unable to get init epoch: {:?}", e))?;
         assert_eq!(self.epoch_progress, UNINITIALIZED);
         self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
+        self.latest_offset = TruncateOffset::Barrier { epoch: epoch - 1 };
+        self.truncate_offset = TruncateOffset::Barrier { epoch: epoch - 1 };
         Ok(())
     }
 
@@ -134,7 +145,29 @@ impl LogReader for BoundedInMemLogStoreReader {
             Some(item) => match self.epoch_progress {
                 Consuming(current_epoch) => match item {
                     InMemLogStoreItem::StreamChunk(chunk) => {
-                        Ok((current_epoch, LogStoreReadItem::StreamChunk(chunk)))
+                        let chunk_id = match self.latest_offset {
+                            TruncateOffset::Chunk { epoch, chunk_id } => {
+                                assert_eq!(epoch, current_epoch);
+                                chunk_id + 1
+                            }
+                            TruncateOffset::Barrier { epoch } => {
+                                assert!(
+                                    epoch < current_epoch,
+                                    "prev offset at barrier {} but current epoch {}",
+                                    epoch,
+                                    current_epoch
+                                );
+                                0
+                            }
+                        };
+                        self.latest_offset = TruncateOffset::Chunk {
+                            epoch: current_epoch,
+                            chunk_id,
+                        };
+                        Ok((
+                            current_epoch,
+                            LogStoreReadItem::StreamChunk { chunk, chunk_id },
+                        ))
                     }
                     InMemLogStoreItem::Barrier {
                         is_checkpoint,
@@ -148,35 +181,60 @@ impl LogReader for BoundedInMemLogStoreReader {
                         } else {
                             self.epoch_progress = Consuming(next_epoch);
                         }
+                        self.latest_offset = TruncateOffset::Barrier {
+                            epoch: current_epoch,
+                        };
                         Ok((current_epoch, LogStoreReadItem::Barrier { is_checkpoint }))
                     }
                     InMemLogStoreItem::UpdateVnodeBitmap(vnode_bitmap) => Ok((
                         current_epoch,
                         LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap),
                     )),
                 },
-                AwaitingTruncate { .. } => {
-                    unreachable!("should not be awaiting for when barrier comes")
-                }
+                AwaitingTruncate { .. } => Err(anyhow!(
+                    "should not call next_item on checkpoint barrier for in-mem log store"
+                )
+                .into()),
             },
             None => Err(LogStoreError::EndOfLogStream),
         }
     }
 
-    async fn truncate(&mut self) -> LogStoreResult<()> {
-        let sealed_epoch = match self.epoch_progress {
-            Consuming(_) => unreachable!("should be awaiting truncate"),
-            AwaitingTruncate {
-                sealed_epoch,
-                next_epoch,
-            } => {
-                self.epoch_progress = Consuming(next_epoch);
-                sealed_epoch
+    async fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
+        // check that the truncate offset is higher than the previous truncate offset
+        if self.truncate_offset >= offset {
+            return Err(anyhow!(
+                "truncate offset {:?} but prev truncate offset is {:?}",
+                offset,
+                self.truncate_offset
+            )
+            .into());
+        }
+
+        // check that the truncate offset does not exceed the latest possible offset
+        if offset > self.latest_offset {
+            return Err(anyhow!(
+                "truncate at {:?} but latest offset is {:?}",
+                offset,
+                self.latest_offset
+            )
+            .into());
+        }
+
+        if let AwaitingTruncate {
+            sealed_epoch,
+            next_epoch,
+        } = &self.epoch_progress
+        {
+            if let TruncateOffset::Barrier { epoch } = offset && epoch == *sealed_epoch {
+                let sealed_epoch = *sealed_epoch;
+                self.epoch_progress = Consuming(*next_epoch);
+                self.truncated_epoch_tx
+                    .send(sealed_epoch)
+                    .map_err(|_| anyhow!("unable to send sealed epoch"))?;
             }
-        };
-        self.truncated_epoch_tx
-            .send(sealed_epoch)
-            .map_err(|_| anyhow!("unable to send sealed epoch"))?;
+        }
+        self.truncate_offset = offset;
         Ok(())
     }
 }
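The reworked `truncate` enforces two invariants: successive truncate offsets must be strictly increasing, and an offset may never run ahead of the latest offset returned by `next_item`. The sealed epoch is only forwarded to the writer when the caller truncates at that epoch's barrier offset. A minimal consumer loop against this API might look as follows (a sketch, assuming the `LogReader` trait exposes exactly the async `next_item`/`truncate` methods used in this diff, via async-fn-in-trait):

```rust
// Sketch of a consumer that acknowledges items back to the log store.
async fn consume(reader: &mut impl LogReader) -> LogStoreResult<()> {
    loop {
        let (epoch, item) = reader.next_item().await?;
        match item {
            LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
                let _ = chunk; // deliver the chunk downstream here
                // Chunks may be truncated one by one, in offset order.
                reader
                    .truncate(TruncateOffset::Chunk { epoch, chunk_id })
                    .await?;
            }
            LogStoreReadItem::Barrier { is_checkpoint: _ } => {
                // Truncating at the barrier offset is what releases a writer
                // blocked on a checkpoint barrier.
                reader.truncate(TruncateOffset::Barrier { epoch }).await?;
            }
            LogStoreReadItem::UpdateVnodeBitmap(_) => {}
        }
    }
}
```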
@@ -240,12 +298,18 @@ impl LogWriter for BoundedInMemLogStoreWriter {
 
 #[cfg(test)]
 mod tests {
+    use std::future::poll_fn;
+    use std::task::Poll;
+
+    use futures::FutureExt;
     use risingwave_common::array::Op;
     use risingwave_common::types::{DataType, ScalarImpl};
     use risingwave_common::util::epoch::EpochPair;
 
     use crate::common::log_store::in_mem::BoundedInMemLogStoreFactory;
-    use crate::common::log_store::{LogReader, LogStoreFactory, LogStoreReadItem, LogWriter};
+    use crate::common::log_store::{
+        LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
+    };
    use crate::common::StreamChunkBuilder;
 
     #[tokio::test]
@@ -273,7 +337,7 @@ mod tests {
         let stream_chunk = builder.take().unwrap();
         let stream_chunk_clone = stream_chunk.clone();
 
-        let join_handle = tokio::spawn(async move {
+        let mut join_handle = tokio::spawn(async move {
             writer
                 .init(EpochPair::new_test_epoch(init_epoch))
                 .await
@@ -282,19 +346,33 @@ mod tests {
                 .write_chunk(stream_chunk_clone.clone())
                 .await
                 .unwrap();
+            writer
+                .write_chunk(stream_chunk_clone.clone())
+                .await
+                .unwrap();
             writer.flush_current_epoch(epoch1, false).await.unwrap();
             writer.write_chunk(stream_chunk_clone).await.unwrap();
             writer.flush_current_epoch(epoch2, true).await.unwrap();
         });
 
         reader.init().await.unwrap();
-        match reader.next_item().await.unwrap() {
-            (epoch, LogStoreReadItem::StreamChunk(chunk)) => {
+        let _chunk_id1_1 = match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                 assert_eq!(epoch, init_epoch);
                 assert_eq!(&chunk, &stream_chunk);
+                chunk_id
             }
             _ => unreachable!(),
-        }
+        };
+
+        let chunk_id1_2 = match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
+                assert_eq!(epoch, init_epoch);
+                assert_eq!(&chunk, &stream_chunk);
+                chunk_id
+            }
+            _ => unreachable!(),
+        };
 
         match reader.next_item().await.unwrap() {
             (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
@@ -304,13 +382,14 @@ mod tests {
             _ => unreachable!(),
         }
 
-        match reader.next_item().await.unwrap() {
-            (epoch, LogStoreReadItem::StreamChunk(chunk)) => {
+        let chunk_id2_1 = match reader.next_item().await.unwrap() {
+            (epoch, LogStoreReadItem::StreamChunk { chunk, chunk_id }) => {
                 assert_eq!(&chunk, &stream_chunk);
                 assert_eq!(epoch, epoch1);
+                chunk_id
             }
             _ => unreachable!(),
-        }
+        };
 
         match reader.next_item().await.unwrap() {
             (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
@@ -320,7 +399,30 @@ mod tests {
             _ => unreachable!(),
         }
 
-        reader.truncate().await.unwrap();
+        reader
+            .truncate(TruncateOffset::Chunk {
+                epoch: init_epoch,
+                chunk_id: chunk_id1_2,
+            })
+            .await
+            .unwrap();
+        assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
+            .await
+            .is_pending());
+        reader
+            .truncate(TruncateOffset::Chunk {
+                epoch: epoch1,
+                chunk_id: chunk_id2_1,
+            })
+            .await
+            .unwrap();
+        assert!(poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))
+            .await
+            .is_pending());
+        reader
+            .truncate(TruncateOffset::Barrier { epoch: epoch1 })
+            .await
+            .unwrap();
         join_handle.await.unwrap();
     }
 }
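The `poll_fn(|cx| Poll::Ready(join_handle.poll_unpin(cx)))` calls in the test poll the writer task exactly once without awaiting it to completion: the task must still be pending after the two chunk truncations, because `flush_current_epoch(epoch2, true)` blocks until the checkpoint epoch is truncated, and only the final `Barrier { epoch: epoch1 }` truncation lets `join_handle.await` finish. The same one-shot-poll idiom can be factored into a helper (a sketch, not part of this PR):

```rust
use std::future::{poll_fn, Future};
use std::task::Poll;

use futures::FutureExt;

// Poll a future exactly once and assert it has not completed yet.
async fn assert_pending<F: Future + Unpin>(fut: &mut F) {
    assert!(poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx))).await.is_pending());
}
```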