Skip to content

Commit

Permalink
feat: implement the LogStoreEntryReader
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 24, 2024
1 parent 466f7c6 commit 975db92
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ use crate::wal::EntryId;
pub type RawEntryStream<'a> = BoxStream<'a, Result<RawEntry>>;

// The namespace of kafka log store
#[derive(Debug, Clone, Copy)]
pub struct KafkaNamespace<'a> {
topic: &'a str,
}

// The namespace of raft engine log store
#[derive(Debug, Clone, Copy)]
pub struct RaftEngineNamespace {
region_id: RegionId,
}

/// The namespace of [RawEntryReader].
#[derive(Debug, Clone, Copy)]
pub(crate) enum LogStoreNamespace<'a> {
RaftEngine(RaftEngineNamespace),
Kafka(KafkaNamespace<'a>),
Expand Down
46 changes: 45 additions & 1 deletion src/mito2/src/wal/wal_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,57 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::WalEntry;
use async_stream::try_stream;
use futures::{StreamExt, TryStreamExt};
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::RawEntry;
use store_api::storage::RegionId;

use crate::error::Result;
use super::raw_entry_reader::RawEntryReader;
use crate::error::{DecodeWalSnafu, Result};
use crate::wal::raw_entry_reader::LogStoreNamespace;
use crate::wal::{EntryId, WalEntryStream};

pub(crate) fn decode_raw_entry(raw_entry: RawEntry) -> Result<(EntryId, WalEntry)> {
let entry_id = raw_entry.entry_id;
let wal_entry = WalEntry::decode(raw_entry.data.as_slice()).context(DecodeWalSnafu {
region_id: raw_entry.region_id,
})?;

Ok((entry_id, wal_entry))
}

/// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store.
pub(crate) trait OneshotWalEntryReader: Send + Sync {
fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<WalEntryStream>;
}

/// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry].
pub struct LogStoreEntryReader<R> {
reader: R,
}

impl<R: RawEntryReader> OneshotWalEntryReader for LogStoreEntryReader<R> {
fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<WalEntryStream> {
let mut stream = self.reader.read(ctx, start_id)?;
match ctx {
LogStoreNamespace::RaftEngine(_) => {
let stream = stream.map(|entry| decode_raw_entry(entry?));

Ok(Box::pin(stream))
}
LogStoreNamespace::Kafka(_) => {
let stream = try_stream!({
while let Some(entry) = stream.next().await {
let entry = entry?;
yield decode_raw_entry(entry)?
}
});

Ok(Box::pin(stream))
}
}
}
}

0 comments on commit 975db92

Please sign in to comment.