diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 7f3348eb7e08..281ea7130c6d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -250,6 +250,14 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to read WAL, topic: {}", topic))] + ReadKafkaWal { + topic: String, + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to decode WAL entry, region_id: {}", region_id))] DecodeWal { region_id: RegionId, @@ -742,6 +750,7 @@ impl ErrorExt for Error { | ReadParquet { .. } | WriteWal { .. } | ReadWal { .. } + | ReadKafkaWal { .. } | DeleteWal { .. } => StatusCode::StorageUnavailable, CompressObject { .. } | DecompressObject { .. } diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index aa4d5ea0e455..ec7590a2ca01 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -12,11 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use async_stream::try_stream; +use common_error::ext::BoxedError; +use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::stream::BoxStream; -use store_api::logstore::entry::RawEntry; +use snafu::ResultExt; +use store_api::logstore::entry::{Entry, RawEntry}; +use store_api::logstore::LogStore; use store_api::storage::RegionId; +use tokio_stream::StreamExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::wal::EntryId; /// A stream that yields [RawEntry]. @@ -40,5 +48,96 @@ pub(crate) enum LogStoreNamespace<'a> { /// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore]. pub(crate) trait RawEntryReader: Send + Sync { - fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result>; + fn read<'a>( + &'a self, + ctx: LogStoreNamespace<'a>, + start_id: EntryId, + ) -> Result>; +} + +pub struct LogStoreRawEntryReader { + store: Arc, +} + +impl LogStoreRawEntryReader { + pub fn new(store: Arc) -> Self { + Self { store } + } + + fn read_region(&self, ns: RaftEngineNamespace, start_id: EntryId) -> Result { + let region_id = ns.region_id; + let stream = try_stream!({ + // TODO(weny): refactor the `namespace` method. + let namespace = self.store.namespace(region_id.into(), &Default::default()); + let mut stream = self + .store + .read(&namespace, start_id) + .await + .map_err(BoxedError::new) + .context(error::ReadWalSnafu { region_id })?; + + while let Some(entries) = stream.next().await { + let entries = entries + .map_err(BoxedError::new) + .context(error::ReadWalSnafu { region_id })?; + + for entry in entries { + yield entry.into_raw_entry() + } + } + }); + + Ok(Box::pin(stream)) + } + + fn read_topic<'a>( + &'a self, + ns: KafkaNamespace<'a>, + start_id: EntryId, + ) -> Result { + let topic = ns.topic; + let stream = try_stream!({ + // TODO(weny): refactor the `namespace` method. + let namespace = self.store.namespace( + RegionId::from_u64(0).into(), + &WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + }), + ); + + let mut stream = self + .store + .read(&namespace, start_id) + .await + .map_err(BoxedError::new) + .context(error::ReadKafkaWalSnafu { topic })?; + + while let Some(entries) = stream.next().await { + let entries = entries + .map_err(BoxedError::new) + .context(error::ReadKafkaWalSnafu { topic })?; + + for entry in entries { + yield entry.into_raw_entry() + } + } + }); + + Ok(Box::pin(stream)) + } +} + +impl RawEntryReader for LogStoreRawEntryReader { + fn read<'a>( + &'a self, + ctx: LogStoreNamespace<'a>, + start_id: EntryId, + ) -> Result> { + let stream = match ctx { + LogStoreNamespace::RaftEngine(ns) => self.read_region(ns, start_id)?, + LogStoreNamespace::Kafka(ns) => self.read_topic(ns, start_id)?, + }; + + Ok(Box::pin(stream)) + } }