Skip to content

Commit

Permalink
feat: implement the RawEntryReaderFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 24, 2024
1 parent e5b3f2d commit 628e6a2
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_wal::options::{KafkaWalOptions, WalOptions};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::entry::{Entry, RawEntry};
use store_api::logstore::LogStore;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub(crate) trait RawEntryReader: Send + Sync {
) -> Result<RawEntryStream<'a>>;
}

/// Implement the [RawEntryReader] for the [LogStore].
pub struct LogStoreRawEntryReader<S> {
store: Arc<S>,
}
Expand Down Expand Up @@ -141,3 +143,34 @@ impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
Ok(Box::pin(stream))
}
}

/// A Filter implement the [RawEntryReader]
pub struct RawEntryReaderFilter<R, F> {
reader: R,
filter: F,
}

impl<R, F> RawEntryReader for RawEntryReaderFilter<R, F>
where
R: RawEntryReader,
F: Fn(&RawEntry) -> bool + Sync + Send,
{
fn read<'a>(
&'a self,
ctx: LogStoreNamespace<'a>,
start_id: EntryId,
) -> Result<RawEntryStream<'a>> {
let mut stream = self.reader.read(ctx, start_id)?;
let filter = &(self.filter);
let stream = try_stream!({
while let Some(entry) = stream.next().await {
let entry = entry?;
if filter(&entry) {
yield entry
}
}
});

Ok(Box::pin(stream))
}
}

0 comments on commit 628e6a2

Please sign in to comment.