diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index ec7590a2ca01..6e32ad1ce745 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -18,6 +18,7 @@ use async_stream::try_stream; use common_error::ext::BoxedError; use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::stream::BoxStream; +use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::entry::{Entry, RawEntry}; use store_api::logstore::LogStore; @@ -55,6 +56,7 @@ pub(crate) trait RawEntryReader: Send + Sync { ) -> Result>; } +/// Implement the [RawEntryReader] for the [LogStore]. pub struct LogStoreRawEntryReader { store: Arc, } @@ -141,3 +143,34 @@ impl RawEntryReader for LogStoreRawEntryReader { Ok(Box::pin(stream)) } } + +/// A Filter implement the [RawEntryReader] +pub struct RawEntryReaderFilter { + reader: R, + filter: F, +} + +impl RawEntryReader for RawEntryReaderFilter +where + R: RawEntryReader, + F: Fn(&RawEntry) -> bool + Sync + Send, +{ + fn read<'a>( + &'a self, + ctx: LogStoreNamespace<'a>, + start_id: EntryId, + ) -> Result> { + let mut stream = self.reader.read(ctx, start_id)?; + let filter = &(self.filter); + let stream = try_stream!({ + while let Some(entry) = stream.next().await { + let entry = entry?; + if filter(&entry) { + yield entry + } + } + }); + + Ok(Box::pin(stream)) + } +}