Skip to content

Commit

Permalink
feat: implement the LogStoreRawEntryReader
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 24, 2024
1 parent 466f7c6 commit e5b3f2d
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("Failed to read WAL, topic: {}", topic))]
ReadKafkaWal {
topic: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to decode WAL entry, region_id: {}", region_id))]
DecodeWal {
region_id: RegionId,
Expand Down Expand Up @@ -742,6 +750,7 @@ impl ErrorExt for Error {
| ReadParquet { .. }
| WriteWal { .. }
| ReadWal { .. }
| ReadKafkaWal { .. }
| DeleteWal { .. } => StatusCode::StorageUnavailable,
CompressObject { .. }
| DecompressObject { .. }
Expand Down
105 changes: 102 additions & 3 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_wal::options::{KafkaWalOptions, WalOptions};
use futures::stream::BoxStream;
use store_api::logstore::entry::RawEntry;
use snafu::ResultExt;
use store_api::logstore::entry::{Entry, RawEntry};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio_stream::StreamExt;

use crate::error::Result;
use crate::error::{self, Result};
use crate::wal::EntryId;

/// A stream that yields [RawEntry].
Expand All @@ -40,5 +48,96 @@ pub(crate) enum LogStoreNamespace<'a> {

/// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore].
pub(crate) trait RawEntryReader: Send + Sync {
fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<RawEntryStream<'static>>;
fn read<'a>(
&'a self,
ctx: LogStoreNamespace<'a>,
start_id: EntryId,
) -> Result<RawEntryStream<'a>>;
}

pub struct LogStoreRawEntryReader<S> {
store: Arc<S>,
}

impl<S: LogStore> LogStoreRawEntryReader<S> {
pub fn new(store: Arc<S>) -> Self {
Self { store }
}

fn read_region(&self, ns: RaftEngineNamespace, start_id: EntryId) -> Result<RawEntryStream> {
let region_id = ns.region_id;
let stream = try_stream!({
// TODO(weny): refactor the `namespace` method.
let namespace = self.store.namespace(region_id.into(), &Default::default());
let mut stream = self
.store
.read(&namespace, start_id)
.await
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { region_id })?;

while let Some(entries) = stream.next().await {
let entries = entries
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { region_id })?;

for entry in entries {
yield entry.into_raw_entry()
}
}
});

Ok(Box::pin(stream))
}

fn read_topic<'a>(
&'a self,
ns: KafkaNamespace<'a>,
start_id: EntryId,
) -> Result<RawEntryStream> {
let topic = ns.topic;
let stream = try_stream!({
// TODO(weny): refactor the `namespace` method.
let namespace = self.store.namespace(
RegionId::from_u64(0).into(),
&WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
}),
);

let mut stream = self
.store
.read(&namespace, start_id)
.await
.map_err(BoxedError::new)
.context(error::ReadKafkaWalSnafu { topic })?;

while let Some(entries) = stream.next().await {
let entries = entries
.map_err(BoxedError::new)
.context(error::ReadKafkaWalSnafu { topic })?;

for entry in entries {
yield entry.into_raw_entry()
}
}
});

Ok(Box::pin(stream))
}
}

impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
fn read<'a>(
&'a self,
ctx: LogStoreNamespace<'a>,
start_id: EntryId,
) -> Result<RawEntryStream<'a>> {
let stream = match ctx {
LogStoreNamespace::RaftEngine(ns) => self.read_region(ns, start_id)?,
LogStoreNamespace::Kafka(ns) => self.read_topic(ns, start_id)?,
};

Ok(Box::pin(stream))
}
}

0 comments on commit e5b3f2d

Please sign in to comment.