Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add RawEntryReader and OneshotWalEntryReader trait #4027

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ pub(crate) mod util;
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::Namespace;

use crate::error::Error;
use store_api::storage::RegionId;

/// Kafka Namespace implementation.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -56,7 +55,13 @@ pub struct EntryImpl {
}

impl Entry for EntryImpl {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: self.region_id(),
entry_id: self.id(),
data: self.data,
}
}

fn data(&self) -> &[u8] {
&self.data
Expand All @@ -66,6 +71,10 @@ impl Entry for EntryImpl {
self.id
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(self.ns.region_id)
}

fn estimated_size(&self) -> usize {
size_of::<Self>() + self.data.capacity() * size_of::<u8>() + self.ns.topic.capacity()
}
Expand Down
15 changes: 13 additions & 2 deletions src/log-store/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

use common_wal::options::WalOptions;
use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore};
use store_api::storage::RegionId;

use crate::error::{Error, Result};

Expand All @@ -36,7 +37,13 @@ impl Namespace for NamespaceImpl {
}

impl Entry for EntryImpl {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: self.region_id(),
entry_id: self.id(),
data: vec![],
}
}

fn data(&self) -> &[u8] {
&[]
Expand All @@ -46,6 +53,10 @@ impl Entry for EntryImpl {
0
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(0)
}

fn estimated_size(&self) -> usize {
0
}
Expand Down
16 changes: 13 additions & 3 deletions src/log-store/src/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
use std::hash::{Hash, Hasher};
use std::mem::size_of;

use store_api::logstore::entry::{Entry, Id as EntryId};
use store_api::logstore::entry::{Entry, Id as EntryId, RawEntry};
use store_api::logstore::namespace::{Id as NamespaceId, Namespace};
use store_api::storage::RegionId;

use crate::error::Error;
use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};

mod backend;
Expand Down Expand Up @@ -67,7 +67,13 @@ impl Namespace for NamespaceImpl {
}

impl Entry for EntryImpl {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: self.region_id(),
entry_id: self.id(),
data: self.data,
}
}

fn data(&self) -> &[u8] {
self.data.as_slice()
Expand All @@ -77,6 +83,10 @@ impl Entry for EntryImpl {
self.id
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(self.id)
}

fn estimated_size(&self) -> usize {
self.data.len() + size_of::<u64>() + size_of::<u64>()
}
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

//! Write ahead log of the engine.

/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod raw_entry_reader;
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod wal_entry_reader;

use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
Expand Down
44 changes: 44 additions & 0 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::stream::BoxStream;
use store_api::logstore::entry::RawEntry;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::wal::EntryId;

/// A stream that yields [RawEntry].
pub type RawEntryStream<'a> = BoxStream<'a, Result<RawEntry>>;

// The namespace of kafka log store
pub struct KafkaNamespace<'a> {
topic: &'a str,
}

// The namespace of raft engine log store
pub struct RaftEngineNamespace {
region_id: RegionId,
}

/// The namespace of [RawEntryReader].
pub(crate) enum LogStoreNamespace<'a> {
RaftEngine(RaftEngineNamespace),
Kafka(KafkaNamespace<'a>),
}

/// [RawEntryReader] provides the ability to read [RawEntry] from the underlying [LogStore].
pub(crate) trait RawEntryReader: Send + Sync {
fn read(&self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<RawEntryStream<'static>>;
}
24 changes: 24 additions & 0 deletions src/mito2/src/wal/wal_entry_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use store_api::storage::RegionId;

use crate::error::Result;
use crate::wal::raw_entry_reader::LogStoreNamespace;
use crate::wal::{EntryId, WalEntryStream};

/// [OneshotWalEntryReader] provides the ability to read and decode entries from the underlying store.
pub(crate) trait OneshotWalEntryReader: Send + Sync {
fn read(self, ctx: LogStoreNamespace, start_id: EntryId) -> Result<WalEntryStream>;
}
15 changes: 13 additions & 2 deletions src/store-api/src/logstore/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::ErrorExt;
use crate::storage::RegionId;

/// An entry's id.
/// Different log store implementations may interpret the id to different meanings.
pub type Id = u64;

/// The raw Wal entry.
pub struct RawEntry {
pub region_id: RegionId,
pub entry_id: Id,
pub data: Vec<u8>,
}

/// Entry is the minimal data storage unit through which users interact with the log store.
/// The log store implementation may have larger or smaller data storage unit than an entry.
pub trait Entry: Send + Sync {
type Error: ErrorExt + Send + Sync;
/// Consumes [Entry] and converts to [RawEntry].
fn into_raw_entry(self) -> RawEntry;

/// Returns the contained data of the entry.
fn data(&self) -> &[u8];
Expand All @@ -30,6 +38,9 @@ pub trait Entry: Send + Sync {
/// Usually the namespace id is identical with the region id.
fn id(&self) -> Id;

/// Returns the [RegionId]
fn region_id(&self) -> RegionId;

/// Computes the estimated encoded size.
fn estimated_size(&self) -> usize;
}
14 changes: 13 additions & 1 deletion src/store-api/src/logstore/entry_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ mod tests {

use super::*;
pub use crate::logstore::entry::Id;
use crate::logstore::entry::RawEntry;
use crate::storage::RegionId;

pub struct SimpleEntry {
/// Binary data of current entry
Expand All @@ -64,7 +66,13 @@ mod tests {
}

impl Entry for SimpleEntry {
type Error = Error;
fn into_raw_entry(self) -> RawEntry {
RawEntry {
region_id: RegionId::from_u64(0),
entry_id: 0,
data: vec![],
}
}

fn data(&self) -> &[u8] {
&self.data
Expand All @@ -74,6 +82,10 @@ mod tests {
0u64
}

fn region_id(&self) -> RegionId {
RegionId::from_u64(0)
}

fn estimated_size(&self) -> usize {
self.data.len()
}
Expand Down