Skip to content

Commit

Permalink
chore: remove unused code (#4135)
Browse files Browse the repository at this point in the history
* chore: remove unused code

* Update src/mito2/src/wal/entry_reader.rs

Co-authored-by: Ruihang Xia <[email protected]>

---------

Co-authored-by: Ruihang Xia <[email protected]>
  • Loading branch information
WenyXu and waynexia authored Jun 12, 2024
1 parent 65f8b72 commit 14a2d83
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 47 deletions.
6 changes: 0 additions & 6 deletions src/mito2/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,8 @@

//! Write ahead log of the engine.
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod entry_distributor;
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod entry_reader;
/// TODO(weny): remove it
#[allow(unused)]
pub(crate) mod raw_entry_reader;

use std::collections::HashMap;
Expand Down
24 changes: 8 additions & 16 deletions src/mito2/src/wal/entry_distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::WalEntry;
use async_stream::stream;
use common_telemetry::{debug, error};
use futures::future::join_all;
use snafu::{ensure, OptionExt};
use snafu::OptionExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -99,30 +97,24 @@ impl WalEntryDistributor {
/// Receives the Wal entries from [WalEntryDistributor].
#[derive(Debug)]
pub(crate) struct WalEntryReceiver {
region_id: RegionId,
/// Receives the [Entry] from the [WalEntryDistributor].
entry_receiver: Option<Receiver<Entry>>,
/// Sends the `start_id` to the [WalEntryDistributor].
arg_sender: Option<oneshot::Sender<EntryId>>,
}

impl WalEntryReceiver {
pub fn new(
region_id: RegionId,
entry_receiver: Receiver<Entry>,
arg_sender: oneshot::Sender<EntryId>,
) -> Self {
pub fn new(entry_receiver: Receiver<Entry>, arg_sender: oneshot::Sender<EntryId>) -> Self {
Self {
region_id,
entry_receiver: Some(entry_receiver),
arg_sender: Some(arg_sender),
}
}
}

impl WalEntryReader for WalEntryReceiver {
fn read(&mut self, provider: &Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
let mut arg_sender =
fn read(&mut self, _provider: &Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
let arg_sender =
self.arg_sender
.take()
.with_context(|| error::InvalidWalReadRequestSnafu {
Expand Down Expand Up @@ -205,7 +197,7 @@ pub fn build_wal_entry_distributor_and_receivers(

senders.insert(region_id, entry_sender);
arg_receivers.push((region_id, arg_receiver));
readers.push(WalEntryReceiver::new(region_id, entry_receiver, arg_sender));
readers.push(WalEntryReceiver::new(entry_receiver, arg_sender));
}

(
Expand All @@ -223,7 +215,7 @@ pub fn build_wal_entry_distributor_and_receivers(
mod tests {
use std::assert_matches::assert_matches;

use api::v1::{Mutation, OpType};
use api::v1::{Mutation, OpType, WalEntry};
use futures::{stream, TryStreamExt};
use prost::Message;
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
Expand All @@ -244,7 +236,7 @@ mod tests {
}

impl RawEntryReader for MockRawEntryReader {
fn read(&self, provider: &Provider, _start_id: EntryId) -> Result<EntryStream<'static>> {
fn read(&self, _provider: &Provider, _start_id: EntryId) -> Result<EntryStream<'static>> {
let stream = stream::iter(self.entries.clone().into_iter().map(Ok));
Ok(Box::pin(stream))
}
Expand Down
8 changes: 2 additions & 6 deletions src/mito2/src/wal/entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@

use api::v1::WalEntry;
use async_stream::stream;
use common_telemetry::info;
use futures::StreamExt;
use prost::Message;
use snafu::{ensure, ResultExt};
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::storage::RegionId;

use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
use crate::wal::raw_entry_reader::RawEntryReader;
Expand Down Expand Up @@ -90,17 +88,15 @@ mod tests {
use std::assert_matches::assert_matches;

use api::v1::{Mutation, OpType, WalEntry};
use futures::{stream, TryStreamExt};
use futures::TryStreamExt;
use prost::Message;
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader};
use store_api::logstore::provider::Provider;
use store_api::storage::RegionId;

use crate::error::{self, Result};
use crate::error;
use crate::test_util::wal_util::MockRawEntryStream;
use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader};
use crate::wal::EntryId;

#[tokio::test]
async fn test_tail_corrupted_stream() {
Expand Down
33 changes: 14 additions & 19 deletions src/mito2/src/wal/raw_entry_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use std::sync::Arc;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_wal::options::{KafkaWalOptions, WalOptions};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::{KafkaProvider, Provider, RaftEngineProvider};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -119,12 +117,9 @@ where
mod tests {
use std::sync::Arc;

use common_wal::options::WalOptions;
use futures::stream;
use futures::{stream, TryStreamExt};
use store_api::logstore::entry::{Entry, NaiveEntry};
use store_api::logstore::{
AppendBatchResponse, AppendResponse, EntryId, LogStore, SendableEntryStream,
};
use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream};
use store_api::storage::RegionId;

use super::*;
Expand All @@ -145,24 +140,24 @@ mod tests {

async fn append_batch(
&self,
entries: Vec<Entry>,
_entries: Vec<Entry>,
) -> Result<AppendBatchResponse, Self::Error> {
unreachable!()
}

async fn read(
&self,
provider: &Provider,
id: EntryId,
_provider: &Provider,
_id: EntryId,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())])))
}

async fn create_namespace(&self, ns: &Provider) -> Result<(), Self::Error> {
async fn create_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> {
unreachable!()
}

async fn delete_namespace(&self, ns: &Provider) -> Result<(), Self::Error> {
async fn delete_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> {
unreachable!()
}

Expand All @@ -172,18 +167,18 @@ mod tests {

async fn obsolete(
&self,
provider: &Provider,
entry_id: EntryId,
_provider: &Provider,
_entry_id: EntryId,
) -> Result<(), Self::Error> {
unreachable!()
}

fn entry(
&self,
data: &mut Vec<u8>,
entry_id: EntryId,
region_id: RegionId,
provider: &Provider,
_data: &mut Vec<u8>,
_entry_id: EntryId,
_region_id: RegionId,
_provider: &Provider,
) -> Result<Entry, Self::Error> {
unreachable!()
}
Expand Down

0 comments on commit 14a2d83

Please sign in to comment.