diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 7f3348eb7e08..281ea7130c6d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -250,6 +250,14 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to read WAL, topic: {}", topic))] + ReadKafkaWal { + topic: String, + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to decode WAL entry, region_id: {}", region_id))] DecodeWal { region_id: RegionId, @@ -742,6 +750,7 @@ impl ErrorExt for Error { | ReadParquet { .. } | WriteWal { .. } | ReadWal { .. } + | ReadKafkaWal { .. } | DeleteWal { .. } => StatusCode::StorageUnavailable, CompressObject { .. } | DecompressObject { .. } diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index aa4d5ea0e455..57cee5845e50 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -12,11 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use async_stream::try_stream; +use common_error::ext::BoxedError; +use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::stream::BoxStream; -use store_api::logstore::entry::RawEntry; +use futures::TryStreamExt; +use snafu::ResultExt; +use store_api::logstore::entry::{Entry, RawEntry}; +use store_api::logstore::LogStore; use store_api::storage::RegionId; +use tokio_stream::StreamExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::wal::EntryId; /// A stream that yields [RawEntry]. @@ -32,6 +41,12 @@ pub struct RaftEngineNamespace { region_id: RegionId, } +impl RaftEngineNamespace { + pub fn new(region_id: RegionId) -> Self { + Self { region_id } + } +} + /// The namespace of [RawEntryReader]. pub(crate) enum LogStoreNamespace<'a> { RaftEngine(RaftEngineNamespace), @@ -40,5 +55,298 @@ pub(crate) enum LogStoreNamespace<'a> { /// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore]. pub(crate) trait RawEntryReader: Send + Sync { - fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result>; + fn read<'a>( + &'a self, + ctx: LogStoreNamespace<'a>, + start_id: EntryId, + ) -> Result>; +} + +/// Implement the [RawEntryReader] for the [LogStore]. +pub struct LogStoreRawEntryReader { + store: Arc, +} + +impl LogStoreRawEntryReader { + pub fn new(store: Arc) -> Self { + Self { store } + } + + fn read_region(&self, ns: RaftEngineNamespace, start_id: EntryId) -> Result { + let region_id = ns.region_id; + let stream = try_stream!({ + // TODO(weny): refactor the `namespace` method. + let namespace = self.store.namespace(region_id.into(), &Default::default()); + let mut stream = self + .store + .read(&namespace, start_id) + .await + .map_err(BoxedError::new) + .context(error::ReadWalSnafu { region_id })?; + + while let Some(entries) = stream.next().await { + let entries = entries + .map_err(BoxedError::new) + .context(error::ReadWalSnafu { region_id })?; + + for entry in entries { + yield entry.into_raw_entry() + } + } + }); + + Ok(Box::pin(stream)) + } + + fn read_topic<'a>( + &'a self, + ns: KafkaNamespace<'a>, + start_id: EntryId, + ) -> Result { + let topic = ns.topic; + let stream = try_stream!({ + // TODO(weny): refactor the `namespace` method. + let namespace = self.store.namespace( + RegionId::from_u64(0).into(), + &WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + }), + ); + + let mut stream = self + .store + .read(&namespace, start_id) + .await + .map_err(BoxedError::new) + .context(error::ReadKafkaWalSnafu { topic })?; + + while let Some(entries) = stream.next().await { + let entries = entries + .map_err(BoxedError::new) + .context(error::ReadKafkaWalSnafu { topic })?; + + for entry in entries { + yield entry.into_raw_entry() + } + } + }); + + Ok(Box::pin(stream)) + } +} + +impl RawEntryReader for LogStoreRawEntryReader { + fn read<'a>( + &'a self, + ctx: LogStoreNamespace<'a>, + start_id: EntryId, + ) -> Result> { + let stream = match ctx { + LogStoreNamespace::RaftEngine(ns) => self.read_region(ns, start_id)?, + LogStoreNamespace::Kafka(ns) => self.read_topic(ns, start_id)?, + }; + + Ok(Box::pin(stream)) + } +} + +/// A filter implement the [RawEntryReader] +pub struct RawEntryReaderFilter { + reader: R, + filter: F, +} + +impl RawEntryReaderFilter +where + R: RawEntryReader, + F: Fn(&RawEntry) -> bool + Sync + Send, +{ + pub fn new(reader: R, filter: F) -> Self { + Self { reader, filter } + } +} + +impl RawEntryReader for RawEntryReaderFilter +where + R: RawEntryReader, + F: Fn(&RawEntry) -> bool + Sync + Send, +{ + fn read<'a>( + &'a self, + ctx: LogStoreNamespace<'a>, + start_id: EntryId, + ) -> Result> { + let mut stream = self.reader.read(ctx, start_id)?; + let filter = &(self.filter); + let stream = try_stream!({ + while let Some(entry) = stream.next().await { + let entry = entry?; + if filter(&entry) { + yield entry + } + } + }); + + Ok(Box::pin(stream)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_wal::options::WalOptions; + use futures::stream; + use store_api::logstore::entry::{Entry, RawEntry}; + use store_api::logstore::entry_stream::SendableEntryStream; + use store_api::logstore::namespace::Namespace; + use store_api::logstore::{ + AppendBatchResponse, AppendResponse, EntryId, LogStore, NamespaceId, + }; + use store_api::storage::RegionId; + + use super::*; + use crate::error; + + #[derive(Debug)] + struct MockLogStore { + entries: Vec, + } + + #[derive(Debug, Eq, PartialEq, Clone, Copy, Default, Hash)] + struct MockNamespace; + + impl Namespace for MockNamespace { + fn id(&self) -> NamespaceId { + 0 + } + } + + #[async_trait::async_trait] + impl LogStore for MockLogStore { + type Entry = RawEntry; + type Error = error::Error; + type Namespace = MockNamespace; + + async fn stop(&self) -> Result<(), Self::Error> { + unreachable!() + } + + async fn append(&self, entry: Self::Entry) -> Result { + unreachable!() + } + + async fn append_batch( + &self, + entries: Vec, + ) -> Result { + unreachable!() + } + + async fn read( + &self, + ns: &Self::Namespace, + id: EntryId, + ) -> Result, Self::Error> { + Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())]))) + } + + async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> { + unreachable!() + } + + async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error> { + unreachable!() + } + + async fn list_namespaces(&self) -> Result, Self::Error> { + unreachable!() + } + + async fn obsolete( + &self, + ns: Self::Namespace, + entry_id: EntryId, + ) -> Result<(), Self::Error> { + unreachable!() + } + + fn entry(&self, data: &mut Vec, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry { + unreachable!() + } + + fn namespace(&self, _ns_id: NamespaceId, _wal_options: &WalOptions) -> Self::Namespace { + MockNamespace + } + } + + #[tokio::test] + async fn test_raw_entry_reader() { + let expected_entries = vec![RawEntry { + region_id: RegionId::new(1024, 1), + entry_id: 1, + data: vec![], + }]; + let store = MockLogStore { + entries: expected_entries.clone(), + }; + + let reader = LogStoreRawEntryReader::new(Arc::new(store)); + let entries = reader + .read( + LogStoreNamespace::RaftEngine(RaftEngineNamespace::new(RegionId::new(1024, 1))), + 0, + ) + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert_eq!(expected_entries, entries); + } + + #[tokio::test] + async fn test_raw_entry_reader_filter() { + let all_entries = vec![ + RawEntry { + region_id: RegionId::new(1024, 1), + entry_id: 1, + data: vec![1], + }, + RawEntry { + region_id: RegionId::new(1024, 2), + entry_id: 2, + data: vec![2], + }, + RawEntry { + region_id: RegionId::new(1024, 3), + entry_id: 3, + data: vec![3], + }, + ]; + let store = MockLogStore { + entries: all_entries.clone(), + }; + + let expected_region_id = RegionId::new(1024, 3); + let reader = + RawEntryReaderFilter::new(LogStoreRawEntryReader::new(Arc::new(store)), |entry| { + entry.region_id == expected_region_id + }); + let entries = reader + .read( + LogStoreNamespace::RaftEngine(RaftEngineNamespace::new(RegionId::new(1024, 1))), + 0, + ) + .unwrap() + .try_collect::>() + .await + .unwrap(); + assert_eq!( + all_entries + .into_iter() + .filter(|entry| entry.region_id == expected_region_id) + .collect::>(), + entries + ); + } } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 50e58a38fe43..09daa2e1abb9 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -19,12 +19,35 @@ use crate::storage::RegionId; pub type Id = u64; /// The raw Wal entry. +#[derive(Debug, Clone, PartialEq, Eq)] pub struct RawEntry { pub region_id: RegionId, pub entry_id: Id, pub data: Vec, } +impl Entry for RawEntry { + fn into_raw_entry(self) -> RawEntry { + self + } + + fn data(&self) -> &[u8] { + &self.data + } + + fn id(&self) -> Id { + self.entry_id + } + + fn region_id(&self) -> RegionId { + self.region_id + } + + fn estimated_size(&self) -> usize { + std::mem::size_of_val(self) + } +} + /// Entry is the minimal data storage unit through which users interact with the log store. /// The log store implementation may have larger or smaller data storage unit than an entry. pub trait Entry: Send + Sync {