Skip to content

Commit

Permalink
feat: implement the WalEntryDistributor and WalEntryReceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed May 30, 2024
1 parent 6e9a9dc commit 9b20d49
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 17 deletions.
10 changes: 9 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid wal read request, {}", reason))]
InvalidWalReadRequest {
reason: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to convert array to vector"))]
ConvertVector {
#[snafu(implicit)]
Expand Down Expand Up @@ -787,7 +794,8 @@ impl ErrorExt for Error {
| ConvertColumnDataType { .. }
| ColumnNotFound { .. }
| InvalidMetadata { .. }
| InvalidRegionOptions { .. } => StatusCode::InvalidArguments,
| InvalidRegionOptions { .. }
| InvalidWalReadRequest { .. } => StatusCode::InvalidArguments,

InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,

Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod meta_util;
pub mod scheduler_util;
pub mod sst_util;
pub mod version_util;
pub mod wal_util;

use std::collections::HashMap;
use std::path::Path;
Expand Down
34 changes: 34 additions & 0 deletions src/mito2/src/test_util/wal_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_stream::stream;
use futures::stream;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::EntryId;

use crate::error::Result;
use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader};

pub(crate) struct MockRawEntryStream {
pub(crate) entries: Vec<Entry>,
}

impl RawEntryReader for MockRawEntryStream {
fn read(&self, ns: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
let entries = self.entries.clone().into_iter().map(Ok);

Ok(Box::pin(stream::iter(entries)))
}
}
9 changes: 6 additions & 3 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod raw_entry_reader;
pub(crate) mod entry_distributor;
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod entry_reader;
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod wal_entry_reader;
pub(crate) mod raw_entry_reader;

use std::collections::HashMap;
use std::mem;
Expand All @@ -36,8 +39,8 @@ use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::storage::RegionId;

use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
use crate::wal::wal_entry_reader::{LogStoreEntryReader, WalEntryReader};

/// WAL entry id.
pub type EntryId = store_api::logstore::entry::Id;
Expand Down
Loading

0 comments on commit 9b20d49

Please sign in to comment.