Skip to content

Commit

Permalink
chore: remove unused codes
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed May 27, 2024
1 parent 020ee2c commit eb242be
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 89 deletions.
13 changes: 0 additions & 13 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const DEFAULT_MAX_BG_JOB: usize = 4;

const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;

// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
Expand Down Expand Up @@ -99,8 +97,6 @@ pub struct MitoConfig {
/// - 1: scan in current thread.
/// - n: scan in parallelism n.
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,

Expand Down Expand Up @@ -132,7 +128,6 @@ impl Default for MitoConfig {
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60)),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
memtable: MemtableConfig::default(),
Expand Down Expand Up @@ -189,14 +184,6 @@ impl MitoConfig {
self.scan_parallelism = divide_num_cpus(4);
}

if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(
"Sanitize scan channel size to {}",
self.parallel_scan_channel_size
);
}

// Sets write cache path if it is empty.
if self.experimental_write_cache_path.is_empty() {
self.experimental_write_cache_path = join_dir(data_home, "write_cache");
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ impl EngineInner {
let cache_manager = self.workers.cache_manager();
let scan_parallelism = ScanParallism {
parallelism: self.config.scan_parallelism,
channel_size: self.config.parallel_scan_channel_size,
};

let scan_region = ScanRegion::new(
Expand Down
14 changes: 5 additions & 9 deletions src/mito2/src/engine/parallel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ async fn scan_in_parallel(
region_id: RegionId,
region_dir: &str,
parallelism: usize,
channel_size: usize,
) {
let engine = env
.open_engine(MitoConfig {
scan_parallelism: parallelism,
parallel_scan_channel_size: channel_size,
..Default::default()
})
.await;
Expand Down Expand Up @@ -120,15 +118,13 @@ async fn test_parallel_scan() {

engine.stop().await.unwrap();

scan_in_parallel(&mut env, region_id, &region_dir, 0, 1).await;
scan_in_parallel(&mut env, region_id, &region_dir, 0).await;

scan_in_parallel(&mut env, region_id, &region_dir, 1, 1).await;
scan_in_parallel(&mut env, region_id, &region_dir, 1).await;

scan_in_parallel(&mut env, region_id, &region_dir, 2, 1).await;
scan_in_parallel(&mut env, region_id, &region_dir, 2).await;

scan_in_parallel(&mut env, region_id, &region_dir, 2, 8).await;
scan_in_parallel(&mut env, region_id, &region_dir, 4).await;

scan_in_parallel(&mut env, region_id, &region_dir, 4, 8).await;

scan_in_parallel(&mut env, region_id, &region_dir, 8, 2).await;
scan_in_parallel(&mut env, region_id, &region_dir, 8).await;
}
2 changes: 2 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,8 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
}

/// Metrics for scanners.
// We allow dead code here because we only read some fields in formatting `Debug`.
#[allow(dead_code)]
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
/// Duration to prepare the scan task.
Expand Down
68 changes: 3 additions & 65 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
use store_api::region_engine::RegionScannerRef;
use store_api::storage::ScanRequest;
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
Expand All @@ -39,7 +37,7 @@ use crate::read::compat::{CompatBatch, CompatReader};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{compat, Batch, Source};
use crate::read::{compat, Source};
use crate::region::version::VersionRef;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
Expand Down Expand Up @@ -353,15 +351,6 @@ impl ScanRegion {
pub(crate) struct ScanParallism {
/// Number of tasks expect to spawn to read data.
pub(crate) parallelism: usize,
/// Channel size to send batches. Only takes effect when the parallelism > 1.
pub(crate) channel_size: usize,
}

impl ScanParallism {
/// Returns true if we allow parallel scan.
pub(crate) fn allow_parallel_scan(&self) -> bool {
self.parallelism > 1
}
}

/// Returns true if the time range of a SST `file` matches the `predicate`.
Expand Down Expand Up @@ -548,28 +537,7 @@ impl ScanInput {
Ok(sources)
}

/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
pub(crate) async fn build_parallel_sources(&self) -> Result<Vec<Source>> {
assert!(self.parallelism.allow_parallel_scan());
// Scall all memtables and SSTs.
let sources = self.build_sources().await?;
let semaphore = Arc::new(Semaphore::new(self.parallelism.parallelism));
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
self.spawn_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
})
.collect();
Ok(sources)
}

/// Prunes file ranges to scan and adds them tothe `collector`.
/// Prunes file ranges to scan and adds them to the `collector`.
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
Expand Down Expand Up @@ -623,36 +591,6 @@ impl ScanInput {

Ok(())
}

/// Scans the input source in another task and sends batches to the sender.
pub(crate) fn spawn_scan_task(
&self,
mut input: Source,
semaphore: Arc<Semaphore>,
sender: mpsc::Sender<Result<Batch>>,
) {
common_runtime::spawn_read(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit holded
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Ok(None) => break,
Err(e) => {
let _ = sender.send(Err(e)).await;
break;
}
}
}
});
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ impl RowGroupReaderBuilder {
&self.file_path
}

/// Returns the file handle.
/// Handle of the file to read.
pub(crate) fn file_handle(&self) -> &FileHandle {
&self.file_handle
}
Expand Down

0 comments on commit eb242be

Please sign in to comment.