From 975db92de1841af023a8600b64224dcfb78460a9 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 24 May 2024 06:57:21 +0000 Subject: [PATCH] feat: implement the `LogStoreEntryReader` --- src/mito2/src/wal/raw_entry_reader.rs | 3 ++ src/mito2/src/wal/wal_entry_reader.rs | 46 ++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index aa4d5ea0e455..c9c658217e09 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -23,16 +23,19 @@ use crate::wal::EntryId; pub type RawEntryStream<'a> = BoxStream<'a, Result>; // The namespace of kafka log store +#[derive(Debug, Clone, Copy)] pub struct KafkaNamespace<'a> { topic: &'a str, } // The namespace of raft engine log store +#[derive(Debug, Clone, Copy)] pub struct RaftEngineNamespace { region_id: RegionId, } /// The namespace of [RawEntryReader]. +#[derive(Debug, Clone, Copy)] pub(crate) enum LogStoreNamespace<'a> { RaftEngine(RaftEngineNamespace), Kafka(KafkaNamespace<'a>), diff --git a/src/mito2/src/wal/wal_entry_reader.rs b/src/mito2/src/wal/wal_entry_reader.rs index 8c3e16122254..cef2359f76b3 100644 --- a/src/mito2/src/wal/wal_entry_reader.rs +++ b/src/mito2/src/wal/wal_entry_reader.rs @@ -12,13 +12,57 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::WalEntry; +use async_stream::try_stream; +use futures::{StreamExt, TryStreamExt}; +use prost::Message; +use snafu::ResultExt; +use store_api::logstore::entry::RawEntry; use store_api::storage::RegionId; -use crate::error::Result; +use super::raw_entry_reader::RawEntryReader; +use crate::error::{DecodeWalSnafu, Result}; use crate::wal::raw_entry_reader::LogStoreNamespace; use crate::wal::{EntryId, WalEntryStream}; +pub(crate) fn decode_raw_entry(raw_entry: RawEntry) -> Result<(EntryId, WalEntry)> { + let entry_id = raw_entry.entry_id; + let wal_entry = WalEntry::decode(raw_entry.data.as_slice()).context(DecodeWalSnafu { + region_id: raw_entry.region_id, + })?; + + Ok((entry_id, wal_entry)) +} + /// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store. pub(crate) trait OneshotWalEntryReader: Send + Sync { fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result; } + +/// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry]. +pub struct LogStoreEntryReader { + reader: R, +} + +impl OneshotWalEntryReader for LogStoreEntryReader { + fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result { + let mut stream = self.reader.read(ctx, start_id)?; + match ctx { + LogStoreNamespace::RaftEngine(_) => { + let stream = stream.map(|entry| decode_raw_entry(entry?)); + + Ok(Box::pin(stream)) + } + LogStoreNamespace::Kafka(_) => { + let stream = try_stream!({ + while let Some(entry) = stream.next().await { + let entry = entry?; + yield decode_raw_entry(entry)? + } + }); + + Ok(Box::pin(stream)) + } + } + } +}