diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 7d633765a0d9..2a3a190a207b 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -31,8 +31,6 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; const DEFAULT_MAX_BG_JOB: usize = 4; const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5); -/// Default channel size for parallel scan task. -const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; // Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8; @@ -99,8 +97,6 @@ pub struct MitoConfig { /// - 1: scan in current thread. /// - n: scan in parallelism n. pub scan_parallelism: usize, - /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32). - pub parallel_scan_channel_size: usize, /// Whether to allow stale entries read during replay. pub allow_stale_entries: bool, @@ -132,7 +128,6 @@ impl Default for MitoConfig { experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60)), sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, scan_parallelism: divide_num_cpus(4), - parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, inverted_index: InvertedIndexConfig::default(), memtable: MemtableConfig::default(), @@ -189,14 +184,6 @@ impl MitoConfig { self.scan_parallelism = divide_num_cpus(4); } - if self.parallel_scan_channel_size < 1 { - self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE; - warn!( - "Sanitize scan channel size to {}", - self.parallel_scan_channel_size - ); - } - // Sets write cache path if it is empty. 
if self.experimental_write_cache_path.is_empty() { self.experimental_write_cache_path = join_dir(data_home, "write_cache"); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 16e079afd5ec..d5cfbed310d5 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -265,7 +265,6 @@ impl EngineInner { let cache_manager = self.workers.cache_manager(); let scan_parallelism = ScanParallism { parallelism: self.config.scan_parallelism, - channel_size: self.config.parallel_scan_channel_size, }; let scan_region = ScanRegion::new( diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index cc5d98291230..eec923b42732 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -33,12 +33,10 @@ async fn scan_in_parallel( region_id: RegionId, region_dir: &str, parallelism: usize, - channel_size: usize, ) { let engine = env .open_engine(MitoConfig { scan_parallelism: parallelism, - parallel_scan_channel_size: channel_size, ..Default::default() }) .await; @@ -120,15 +118,13 @@ async fn test_parallel_scan() { engine.stop().await.unwrap(); - scan_in_parallel(&mut env, region_id, &region_dir, 0, 1).await; + scan_in_parallel(&mut env, region_id, &region_dir, 0).await; - scan_in_parallel(&mut env, region_id, &region_dir, 1, 1).await; + scan_in_parallel(&mut env, region_id, &region_dir, 1).await; - scan_in_parallel(&mut env, region_id, &region_dir, 2, 1).await; + scan_in_parallel(&mut env, region_id, &region_dir, 2).await; - scan_in_parallel(&mut env, region_id, &region_dir, 2, 8).await; + scan_in_parallel(&mut env, region_id, &region_dir, 4).await; - scan_in_parallel(&mut env, region_id, &region_dir, 4, 8).await; - - scan_in_parallel(&mut env, region_id, &region_dir, 8, 2).await; + scan_in_parallel(&mut env, region_id, &region_dir, 8).await; } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 2f634aa4fb40..f7259cc3406a 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -747,6 +747,8 @@ impl 
BatchReader for Box { } /// Metrics for scanners. +// We allow dead code here because we only read some fields in formatting `Debug`. +#[allow(dead_code)] #[derive(Debug, Default)] pub(crate) struct ScannerMetrics { /// Duration to prepare the scan task. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 6c2c26f62ab2..550515492968 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -23,11 +23,9 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; use common_time::Timestamp; -use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; +use store_api::region_engine::RegionScannerRef; use store_api::storage::ScanRequest; use table::predicate::{build_time_range_predicate, Predicate}; -use tokio::sync::{mpsc, Semaphore}; -use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::FileCacheRef; @@ -39,7 +37,7 @@ use crate::read::compat::{CompatBatch, CompatReader}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::read::unordered_scan::UnorderedScan; -use crate::read::{compat, Batch, Source}; +use crate::read::{compat, Source}; use crate::region::version::VersionRef; use crate::sst::file::{FileHandle, FileMeta}; use crate::sst::index::applier::builder::SstIndexApplierBuilder; @@ -353,15 +351,6 @@ impl ScanRegion { pub(crate) struct ScanParallism { /// Number of tasks expect to spawn to read data. pub(crate) parallelism: usize, - /// Channel size to send batches. Only takes effect when the parallelism > 1. - pub(crate) channel_size: usize, -} - -impl ScanParallism { - /// Returns true if we allow parallel scan. - pub(crate) fn allow_parallel_scan(&self) -> bool { - self.parallelism > 1 - } } /// Returns true if the time range of a SST `file` matches the `predicate`. 
@@ -548,28 +537,7 @@ impl ScanInput { Ok(sources) } - /// Scans sources in parallel. - /// - /// # Panics if the input doesn't allow parallel scan. - pub(crate) async fn build_parallel_sources(&self) -> Result<Vec<Source>> { - assert!(self.parallelism.allow_parallel_scan()); - // Scall all memtables and SSTs. - let sources = self.build_sources().await?; - let semaphore = Arc::new(Semaphore::new(self.parallelism.parallelism)); - // Spawn a task for each source. - let sources = sources - .into_iter() - .map(|source| { - let (sender, receiver) = mpsc::channel(self.parallelism.channel_size); - self.spawn_scan_task(source, semaphore.clone(), sender); - let stream = Box::pin(ReceiverStream::new(receiver)); - Source::Stream(stream) - }) - .collect(); - Ok(sources) - } - - /// Prunes file ranges to scan and adds them tothe `collector`. + /// Prunes file ranges to scan and adds them to the `collector`. pub(crate) async fn prune_file_ranges( &self, collector: &mut impl FileRangeCollector, @@ -623,36 +591,6 @@ impl ScanInput { Ok(()) } - - /// Scans the input source in another task and sends batches to the sender. - pub(crate) fn spawn_scan_task( - &self, - mut input: Source, - semaphore: Arc<Semaphore>, - sender: mpsc::Sender<Result<Batch>>, - ) { - common_runtime::spawn_read(async move { - loop { - // We release the permit before sending result to avoid the task waiting on - // the channel with the permit holded - let maybe_batch = { - // Safety: We never close the semaphore. 
- let _permit = semaphore.acquire().await.unwrap(); - input.next_batch().await - }; - match maybe_batch { - Ok(Some(batch)) => { - let _ = sender.send(Ok(batch)).await; - } - Ok(None) => break, - Err(e) => { - let _ = sender.send(Err(e)).await; - break; - } - } - } - }); - } } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index c429d5f9bcd9..eb9bbb624766 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -570,7 +570,7 @@ impl RowGroupReaderBuilder { &self.file_path } - /// Returns the file handle. + /// Handle of the file to read. pub(crate) fn file_handle(&self) -> &FileHandle { &self.file_handle }