Skip to content

Commit

Permalink
refactor: simplify RawEntryReaderFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 25, 2024
1 parent e955ba7 commit 15264cd
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
10 changes: 4 additions & 6 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ use store_api::logstore::namespace::LogStoreNamespace;
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::storage::RegionId;

use self::raw_entry_reader::RawEntryReaderFilter;
use self::wal_entry_reader::WalEntryReader;
use crate::error::{DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
use crate::wal::raw_entry_reader::LogStoreRawEntryReader;
use crate::wal::wal_entry_reader::LogStoreEntryReader;
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
use crate::wal::wal_entry_reader::{LogStoreEntryReader, WalEntryReader};

/// WAL entry id.
pub type EntryId = store_api::logstore::entry::Id;
Expand Down Expand Up @@ -88,9 +86,9 @@ impl<S: LogStore> Wal<S> {
LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
.read(namespace, start_id)
}
LogStoreNamespace::Kafka(_) => LogStoreEntryReader::new(RawEntryReaderFilter::new(
LogStoreNamespace::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
move |entry| entry.region_id == region_id,
region_id,
))
.read(namespace, start_id),
}
Expand Down
31 changes: 13 additions & 18 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,38 +114,33 @@ impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
}
}

/// A filter implement the [RawEntryReader]
pub struct RawEntryReaderFilter<R, F> {
/// A [RawEntryReader] reads [RawEntry] belongs to a specific region.
pub struct RegionRawEntryReader<R> {
reader: R,
filter: Arc<F>,
region_id: RegionId,
}

impl<R, F> RawEntryReaderFilter<R, F>
impl<R> RegionRawEntryReader<R>
where
R: RawEntryReader,
F: Fn(&RawEntry) -> bool + Sync + Send,
{
pub fn new(reader: R, filter: F) -> Self {
Self {
reader,
filter: Arc::new(filter),
}
pub fn new(reader: R, region_id: RegionId) -> Self {
Self { reader, region_id }
}
}

impl<R, F> RawEntryReader for RawEntryReaderFilter<R, F>
impl<R> RawEntryReader for RegionRawEntryReader<R>
where
R: RawEntryReader,
F: Fn(&RawEntry) -> bool + Sync + Send + 'static,
{
fn read(&self, ctx: &LogStoreNamespace, start_id: EntryId) -> Result<RawEntryStream<'static>> {
let mut stream = self.reader.read(ctx, start_id)?;
let filter = self.filter.clone();
let region_id = self.region_id;

let stream = try_stream!({
while let Some(entry) = stream.next().await {
let entry = entry?;
if filter(&entry) {
if entry.region_id == region_id {
yield entry
}
}
Expand Down Expand Up @@ -293,10 +288,10 @@ mod tests {
};

let expected_region_id = RegionId::new(1024, 3);
let reader =
RawEntryReaderFilter::new(LogStoreRawEntryReader::new(Arc::new(store)), move |entry| {
entry.region_id == expected_region_id
});
let reader = RegionRawEntryReader::new(
LogStoreRawEntryReader::new(Arc::new(store)),
expected_region_id,
);
let entries = reader
.read(
&LogStoreNamespace::raft_engine_namespace(RegionId::new(1024, 1).as_u64()),
Expand Down

0 comments on commit 15264cd

Please sign in to comment.