From 2fcb95f50a3499379350d503eecfdf2a9b4f7d9e Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Mon, 9 Dec 2024 20:50:57 +0800
Subject: [PATCH] fix!: fix regression caused by unbalanced partitions and splitting ranges (#5090)

* feat: assign partition ranges by rows
* feat: balance partition rows
* feat: get upper bound for part nums
* feat: only split in non-compaction seq scan
* fix: parallel scan on multiple sources
* fix: can split check
* feat: scanner prepare by request
* feat: remove scan_parallelism
* docs: update docs
* chore: update comment
* style: fix clippy
* feat: skip merge and dedup if there is only one source
* chore: Revert "feat: skip merge and dedup if there is only one source"

  Since memtable won't do dedup jobs

  This reverts commit 2fc7a54b11de1b7219e7fe2818a96b46405d3ee6.

* test: avoid compaction in sqlness window sort test
* chore: do not create semaphore if num partitions is enough
* chore: more assertions
* chore: fix typo
* fix: compaction flag not set
* chore: address review comments
---
 config/config.md                              |   2 -
 config/datanode.example.toml                  |   6 -
 config/standalone.example.toml                |   6 -
 src/cmd/tests/load_config_test.rs             |   2 -
 src/mito2/src/compaction.rs                   |   5 +-
 src/mito2/src/config.rs                       |  13 +-
 src/mito2/src/engine.rs                       |  26 +-
 src/mito2/src/engine/append_mode_test.rs      |   7 +-
 src/mito2/src/engine/merge_mode_test.rs       |   7 +-
 src/mito2/src/engine/parallel_test.rs         |   5 +-
 src/mito2/src/read/range.rs                   | 260 ++++++++++++------
 src/mito2/src/read/scan_region.rs             |  96 ++++---
 src/mito2/src/read/seq_scan.rs                | 108 +++++---
 src/mito2/src/read/unordered_scan.rs          |  13 +-
 src/query/src/optimizer/parallelize_scan.rs   | 171 ++++++++++--
 src/store-api/src/region_engine.rs            |  85 ++++--
 src/table/src/table/scan.rs                   |  15 +-
 .../common/order/windowed_sort.result         |  26 +-
 .../standalone/common/order/windowed_sort.sql |   4 +-
 19 files changed, 554 insertions(+), 303 deletions(-)

diff --git a/config/config.md b/config/config.md
index ec00eb98b730..1f034d28731d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -136,7 +136,6 @@
 | `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
 | `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
 | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
-| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | @@ -464,7 +463,6 @@ | `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | | `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. | | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. | -| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index c5fdd24ebe14..11c2794e61df 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -492,12 +492,6 @@ experimental_write_cache_ttl = "8h" ## Buffer size for SST writing. sst_write_buffer_size = "8MB" -## Parallelism to scan a region (default: 1/4 of cpu cores). -## - `0`: using the default value (1/4 of cpu cores). -## - `1`: scan in current thread. -## - `n`: scan in parallelism n. -scan_parallelism = 0 - ## Capacity of the channel to send data from parallel scan tasks to the main task. parallel_scan_channel_size = 32 diff --git a/config/standalone.example.toml b/config/standalone.example.toml index deaf8900f213..a69295af1644 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -530,12 +530,6 @@ experimental_write_cache_ttl = "8h" ## Buffer size for SST writing. sst_write_buffer_size = "8MB" -## Parallelism to scan a region (default: 1/4 of cpu cores). -## - `0`: using the default value (1/4 of cpu cores). -## - `1`: scan in current thread. -## - `n`: scan in parallelism n. -scan_parallelism = 0 - ## Capacity of the channel to send data from parallel scan tasks to the main task. parallel_scan_channel_size = 32 diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 454188141d14..c5f1111d37b6 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -69,7 +69,6 @@ fn test_load_datanode_example_config() { region_engine: vec![ RegionEngineConfig::Mito(MitoConfig { auto_flush_interval: Duration::from_secs(3600), - scan_parallelism: 0, experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)), ..Default::default() }), @@ -205,7 +204,6 @@ fn test_load_standalone_example_config() { RegionEngineConfig::Mito(MitoConfig { auto_flush_interval: Duration::from_secs(3600), experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)), - scan_parallelism: 0, ..Default::default() }), RegionEngineConfig::File(EngineConfig {}), diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index a4094af74121..5f462f33a111 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -597,9 +597,8 @@ impl<'a> CompactionSstReaderBuilder<'a> { scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?); } - SeqScan::new(scan_input) - .with_compaction() - .build_reader() + SeqScan::new(scan_input, true) + .build_reader_for_compaction() .await } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 797c42f8084c..cb4022f65e57 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -30,7 +30,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5); /// Default channel size for parallel scan task. -const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; +pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; // Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8; @@ -107,11 +107,6 @@ pub struct MitoConfig { // Other configs: /// Buffer size for SST writing. pub sst_write_buffer_size: ReadableSize, - /// Parallelism to scan a region (default: 1/4 of cpu cores). - /// - 0: using the default value (1/4 of cpu cores). - /// - 1: scan in current thread. - /// - n: scan in parallelism n. 
- pub scan_parallelism: usize, /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32). pub parallel_scan_channel_size: usize, /// Whether to allow stale entries read during replay. @@ -156,7 +151,6 @@ impl Default for MitoConfig { experimental_write_cache_size: ReadableSize::gb(1), experimental_write_cache_ttl: None, sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, - scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, index: IndexConfig::default(), @@ -229,11 +223,6 @@ impl MitoConfig { ); } - // Use default value if `scan_parallelism` is 0. - if self.scan_parallelism == 0 { - self.scan_parallelism = divide_num_cpus(4); - } - if self.parallel_scan_channel_size < 1 { self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE; warn!( diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c60b7c4107ed..a518da32535d 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -90,7 +90,7 @@ use crate::error::{ }; use crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; -use crate::read::scan_region::{ScanParallelism, ScanRegion, Scanner}; +use crate::read::scan_region::{ScanRegion, Scanner}; use crate::request::{RegionEditRequest, WorkerRequest}; use crate::wal::entry_distributor::{ build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, @@ -171,19 +171,9 @@ impl MitoEngine { self.scan_region(region_id, request)?.scanner() } - /// Returns a region scanner to scan the region for `request`. - fn region_scanner( - &self, - region_id: RegionId, - request: ScanRequest, - ) -> Result { - let scanner = self.scanner(region_id, request)?; - scanner.region_scanner() - } - /// Scans a region. fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { - self.inner.handle_query(region_id, request) + self.inner.scan_region(region_id, request) } /// Edit region's metadata by [RegionEdit] directly. Use with care. @@ -423,7 +413,7 @@ impl EngineInner { } /// Handles the scan `request` and returns a [ScanRegion]. - fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { let query_start = Instant::now(); // Reading a region doesn't need to go through the region worker thread. let region = self @@ -433,14 +423,10 @@ impl EngineInner { let version = region.version(); // Get cache. let cache_manager = self.workers.cache_manager(); - let scan_parallelism = ScanParallelism { - parallelism: self.config.scan_parallelism, - channel_size: self.config.parallel_scan_channel_size, - }; let scan_region = ScanRegion::new(version, region.access_layer.clone(), request, cache_manager) - .with_parallelism(scan_parallelism) + .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size) .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) .with_start_time(query_start); @@ -538,7 +524,9 @@ impl RegionEngine for MitoEngine { region_id: RegionId, request: ScanRequest, ) -> Result { - self.region_scanner(region_id, request) + self.scan_region(region_id, request) + .map_err(BoxedError::new)? 
+ .region_scanner() .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index ab8515aa133c..c9f61c5db3e0 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -92,7 +92,6 @@ async fn test_append_mode_compaction() { let mut env = TestEnv::new(); let engine = env .create_engine(MitoConfig { - scan_parallelism: 2, ..Default::default() }) .await; @@ -176,19 +175,19 @@ async fn test_append_mode_compaction() { | b | 1.0 | 1970-01-01T00:00:01 | +-------+---------+---------------------+"; // Scans in parallel. - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!(2, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); + scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); - // Reopens engine with parallelism 1. + // Reopens engine. let engine = env .reopen_engine( engine, MitoConfig { - scan_parallelism: 1, ..Default::default() }, ) diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 08f4d0565007..e74aba5655a3 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -92,7 +92,6 @@ async fn test_merge_mode_compaction() { let mut env = TestEnv::new(); let engine = env .create_engine(MitoConfig { - scan_parallelism: 2, ..Default::default() }) .await; @@ -190,19 +189,19 @@ async fn test_merge_mode_compaction() { | a | | 13.0 | 1970-01-01T00:00:03 | +-------+---------+---------+---------------------+"; // Scans in parallel. - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!(1, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); + scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); - // Reopens engine with parallelism 1. + // Reopens engine. 
let engine = env .reopen_engine( engine, MitoConfig { - scan_parallelism: 1, ..Default::default() }, ) diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index 53cc0dca8fb0..3d5dab3540e1 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -37,7 +37,6 @@ async fn scan_in_parallel( ) { let engine = env .open_engine(MitoConfig { - scan_parallelism: parallelism, parallel_scan_channel_size: channel_size, ..Default::default() }) @@ -57,7 +56,9 @@ async fn scan_in_parallel( .unwrap(); let request = ScanRequest::default(); - let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let mut scanner = engine.scanner(region_id, request).unwrap(); + scanner.set_target_partitions(parallelism); + let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 1944d171dd19..554751830ffc 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -34,6 +34,16 @@ use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; const ALL_ROW_GROUPS: i64 = -1; +/// Index and metadata for a memtable or file. +#[derive(Debug, Clone, Copy, PartialEq)] +pub(crate) struct SourceIndex { + /// Index of the memtable and file. + pub(crate) index: usize, + /// Total number of row groups in this source. 0 if the metadata + /// is unavailable. We use this to split files. + pub(crate) num_row_groups: u64, +} + /// Index to access a row group. #[derive(Debug, Clone, Copy, PartialEq)] pub(crate) struct RowGroupIndex { @@ -52,7 +62,7 @@ pub(crate) struct RangeMeta { /// The time range of the range. pub(crate) time_range: FileTimeRange, /// Indices to memtables or files. - indices: SmallVec<[usize; 2]>, + pub(crate) indices: SmallVec<[SourceIndex; 2]>, /// Indices to memtable/file row groups that this range scans. pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>, /// Estimated number of rows in the range. This can be 0 if the statistics are not available. @@ -81,12 +91,17 @@ impl RangeMeta { } /// Creates a list of ranges from the `input` for seq scan. - pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec { + /// If `compaction` is true, it doesn't split the ranges. + pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec { let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len()); Self::push_seq_mem_ranges(&input.memtables, &mut ranges); Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges); let ranges = group_ranges_for_seq_scan(ranges); + if compaction { + // We don't split ranges in compaction. + return ranges; + } maybe_split_ranges_for_seq_scan(ranges) } @@ -105,13 +120,13 @@ impl RangeMeta { } /// Returns true if the time range of given `meta` overlaps with the time range of this meta. - pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool { + fn overlaps(&self, meta: &RangeMeta) -> bool { overlaps(&self.time_range, &meta.time_range) } /// Merges given `meta` to this meta. /// It assumes that the time ranges overlap and they don't have the same file or memtable index. 
- pub(crate) fn merge(&mut self, mut other: RangeMeta) { + fn merge(&mut self, mut other: RangeMeta) { debug_assert!(self.overlaps(&other)); debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx))); debug_assert!(self @@ -130,22 +145,28 @@ impl RangeMeta { /// Returns true if we can split the range into multiple smaller ranges and /// still preserve the order for [SeqScan]. - pub(crate) fn can_split_preserve_order(&self) -> bool { - // Only one source and multiple row groups. - self.indices.len() == 1 && self.row_group_indices.len() > 1 + fn can_split_preserve_order(&self) -> bool { + self.indices.len() == 1 && self.indices[0].num_row_groups > 1 } /// Splits the range if it can preserve the order. - pub(crate) fn maybe_split(self, output: &mut Vec) { + fn maybe_split(self, output: &mut Vec) { if self.can_split_preserve_order() { + let num_row_groups = self.indices[0].num_row_groups; + debug_assert_eq!(1, self.row_group_indices.len()); + debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index); + output.reserve(self.row_group_indices.len()); - let num_rows = self.num_rows / self.row_group_indices.len(); + let num_rows = self.num_rows / num_row_groups as usize; // Splits by row group. - for index in self.row_group_indices { + for row_group_index in 0..num_row_groups { output.push(RangeMeta { time_range: self.time_range, indices: self.indices.clone(), - row_group_indices: smallvec![index], + row_group_indices: smallvec![RowGroupIndex { + index: self.indices[0].index, + row_group_index: row_group_index as i64, + }], num_rows, }); } @@ -165,7 +186,10 @@ impl RangeMeta { let num_rows = stats.num_rows() / stats.num_ranges(); ranges.push(RangeMeta { time_range, - indices: smallvec![memtable_index], + indices: smallvec![SourceIndex { + index: memtable_index, + num_row_groups: stats.num_ranges() as u64, + }], row_group_indices: smallvec![RowGroupIndex { index: memtable_index, row_group_index: row_group_index as i64, @@ -199,7 +223,10 @@ impl RangeMeta { let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows(); ranges.push(RangeMeta { time_range: time_range.unwrap_or_else(|| file.time_range()), - indices: smallvec![file_index], + indices: smallvec![SourceIndex { + index: file_index, + num_row_groups: file.meta_ref().num_row_groups, + }], row_group_indices: smallvec![RowGroupIndex { index: file_index, row_group_index: row_group_index as i64, @@ -212,7 +239,10 @@ impl RangeMeta { for row_group_index in 0..file.meta_ref().num_row_groups { ranges.push(RangeMeta { time_range: file.time_range(), - indices: smallvec![file_index], + indices: smallvec![SourceIndex { + index: file_index, + num_row_groups: file.meta_ref().num_row_groups, + }], row_group_indices: smallvec![RowGroupIndex { index: file_index, row_group_index: row_group_index as i64, @@ -224,7 +254,10 @@ impl RangeMeta { // If we don't known the number of row groups in advance, scan all row groups. 
ranges.push(RangeMeta { time_range: file.time_range(), - indices: smallvec![file_index], + indices: smallvec![SourceIndex { + index: file_index, + num_row_groups: 0, + }], row_group_indices: smallvec![RowGroupIndex { index: file_index, row_group_index: ALL_ROW_GROUPS, @@ -245,7 +278,10 @@ impl RangeMeta { }; ranges.push(RangeMeta { time_range, - indices: smallvec![i], + indices: smallvec![SourceIndex { + index: i, + num_row_groups: stats.num_ranges() as u64, + }], row_group_indices: smallvec![RowGroupIndex { index: i, row_group_index: ALL_ROW_GROUPS, @@ -263,31 +299,18 @@ impl RangeMeta { // For non append-only mode, each range only contains one file. for (i, file) in files.iter().enumerate() { let file_index = num_memtables + i; - if file.meta_ref().num_row_groups > 0 { - // All row groups share the same time range. - let row_group_indices = (0..file.meta_ref().num_row_groups) - .map(|row_group_index| RowGroupIndex { - index: file_index, - row_group_index: row_group_index as i64, - }) - .collect(); - ranges.push(RangeMeta { - time_range: file.time_range(), - indices: smallvec![file_index], - row_group_indices, - num_rows: file.meta_ref().num_rows as usize, - }); - } else { - ranges.push(RangeMeta { - time_range: file.time_range(), - indices: smallvec![file_index], - row_group_indices: smallvec![RowGroupIndex { - index: file_index, - row_group_index: ALL_ROW_GROUPS, - }], - num_rows: file.meta_ref().num_rows as usize, - }); - } + ranges.push(RangeMeta { + time_range: file.time_range(), + indices: smallvec![SourceIndex { + index: file_index, + num_row_groups: file.meta_ref().num_row_groups, + }], + row_group_indices: smallvec![RowGroupIndex { + index: file_index, + row_group_index: ALL_ROW_GROUPS, + }], + num_rows: file.meta_ref().num_rows as usize, + }); } } } @@ -514,7 +537,10 @@ mod tests { ); RangeMeta { time_range, - indices: smallvec![*idx], + indices: smallvec![SourceIndex { + index: *idx, + num_row_groups: 0, + }], row_group_indices: smallvec![RowGroupIndex { index: *idx, row_group_index: 0 @@ -527,7 +553,7 @@ mod tests { let actual: Vec<_> = output .iter() .map(|range| { - let indices = range.indices.to_vec(); + let indices = range.indices.iter().map(|index| index.index).collect(); let group_indices: Vec<_> = range .row_group_indices .iter() @@ -578,7 +604,10 @@ mod tests { fn test_merge_range() { let mut left = RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], + indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], row_group_indices: smallvec![ RowGroupIndex { index: 1, @@ -593,7 +622,10 @@ mod tests { }; let right = RangeMeta { time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)), - indices: smallvec![2], + indices: smallvec![SourceIndex { + index: 2, + num_row_groups: 2, + }], row_group_indices: smallvec![ RowGroupIndex { index: 2, @@ -612,7 +644,16 @@ mod tests { left, RangeMeta { time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)), - indices: smallvec![1, 2], + indices: smallvec![ + SourceIndex { + index: 1, + num_row_groups: 2 + }, + SourceIndex { + index: 2, + num_row_groups: 2 + } + ], row_group_indices: smallvec![ RowGroupIndex { index: 1, @@ -640,17 +681,14 @@ mod tests { fn test_split_range() { let range = RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], - row_group_indices: smallvec![ - RowGroupIndex { - index: 1, - row_group_index: 1 - }, - RowGroupIndex { - index: 1, - row_group_index: 2 - } - ], + 
indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: ALL_ROW_GROUPS, + }], num_rows: 5, }; @@ -663,19 +701,25 @@ mod tests { &[ RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], + indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], row_group_indices: smallvec![RowGroupIndex { index: 1, - row_group_index: 1 + row_group_index: 0 },], num_rows: 2, }, RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], + indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], row_group_indices: smallvec![RowGroupIndex { index: 1, - row_group_index: 2 + row_group_index: 1 }], num_rows: 2, } @@ -687,7 +731,16 @@ mod tests { fn test_not_split_range() { let range = RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1, 2], + indices: smallvec![ + SourceIndex { + index: 1, + num_row_groups: 1, + }, + SourceIndex { + index: 2, + num_row_groups: 1, + } + ], row_group_indices: smallvec![ RowGroupIndex { index: 1, @@ -710,32 +763,50 @@ mod tests { #[test] fn test_maybe_split_ranges() { let ranges = vec![ + RangeMeta { + time_range: (Timestamp::new_second(0), Timestamp::new_second(500)), + indices: smallvec![SourceIndex { + index: 0, + num_row_groups: 1, + }], + row_group_indices: smallvec![RowGroupIndex { + index: 0, + row_group_index: 0, + },], + num_rows: 4, + }, RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], - row_group_indices: smallvec![ - RowGroupIndex { - index: 1, - row_group_index: 0 - }, - RowGroupIndex { - index: 1, - row_group_index: 1 - } - ], + indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: ALL_ROW_GROUPS, + },], num_rows: 4, }, RangeMeta { time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)), - indices: smallvec![2, 3], + indices: smallvec![ + SourceIndex { + index: 2, + num_row_groups: 2, + }, + SourceIndex { + index: 3, + num_row_groups: 0, + } + ], row_group_indices: smallvec![ RowGroupIndex { index: 2, - row_group_index: 0 + row_group_index: ALL_ROW_GROUPS, }, RowGroupIndex { index: 3, - row_group_index: 0 + row_group_index: ALL_ROW_GROUPS, } ], num_rows: 5, @@ -745,9 +816,24 @@ mod tests { assert_eq!( output, vec![ + RangeMeta { + time_range: (Timestamp::new_second(0), Timestamp::new_second(500)), + indices: smallvec![SourceIndex { + index: 0, + num_row_groups: 1, + }], + row_group_indices: smallvec![RowGroupIndex { + index: 0, + row_group_index: 0 + },], + num_rows: 4, + }, RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], + indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], row_group_indices: smallvec![RowGroupIndex { index: 1, row_group_index: 0 @@ -756,7 +842,10 @@ mod tests { }, RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), - indices: smallvec![1], + indices: smallvec![SourceIndex { + index: 1, + num_row_groups: 2, + }], row_group_indices: smallvec![RowGroupIndex { index: 1, row_group_index: 1 @@ -765,15 +854,24 @@ mod tests { }, RangeMeta { time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)), - indices: smallvec![2, 3], + indices: smallvec![ + SourceIndex { + index: 2, + 
num_row_groups: 2 + }, + SourceIndex { + index: 3, + num_row_groups: 0, + } + ], row_group_indices: smallvec![ RowGroupIndex { index: 2, - row_group_index: 0 + row_group_index: ALL_ROW_GROUPS, }, RowGroupIndex { index: 3, - row_group_index: 0 + row_group_index: ALL_ROW_GROUPS, } ], num_rows: 5, diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 7da80806f22e..471cc1a8e5d4 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheManagerRef; +use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; use crate::memtable::MemtableRef; use crate::metrics::READ_SST_COUNT; @@ -68,15 +69,6 @@ impl Scanner { Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await, } } - - /// Returns a [RegionScanner] to scan the region. - #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] - pub(crate) fn region_scanner(self) -> Result { - match self { - Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)), - Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)), - } - } } #[cfg(test)] @@ -104,6 +96,17 @@ impl Scanner { Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(), } } + + /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner. + pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) { + use store_api::region_engine::{PrepareRequest, RegionScanner}; + + let request = PrepareRequest::default().with_target_partitions(target_partitions); + match self { + Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(), + Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] @@ -165,8 +168,8 @@ pub(crate) struct ScanRegion { request: ScanRequest, /// Cache. cache_manager: CacheManagerRef, - /// Parallelism to scan. - parallelism: ScanParallelism, + /// Capacity of the channel to send data from parallel scan tasks to the main task. + parallel_scan_channel_size: usize, /// Whether to ignore inverted index. ignore_inverted_index: bool, /// Whether to ignore fulltext index. @@ -188,17 +191,20 @@ impl ScanRegion { access_layer, request, cache_manager, - parallelism: ScanParallelism::default(), + parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, ignore_inverted_index: false, ignore_fulltext_index: false, start_time: None, } } - /// Sets parallelism. + /// Sets parallel scan task channel size. #[must_use] - pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self { - self.parallelism = parallelism; + pub(crate) fn with_parallel_scan_channel_size( + mut self, + parallel_scan_channel_size: usize, + ) -> Self { + self.parallel_scan_channel_size = parallel_scan_channel_size; self } @@ -224,7 +230,7 @@ impl ScanRegion { /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { - if self.version.options.append_mode && self.request.series_row_selector.is_none() { + if self.use_unordered_scan() { // If table is append only and there is no series row selector, we use unordered scan in query. // We still use seq scan in compaction. self.unordered_scan().map(Scanner::Unordered) @@ -233,10 +239,20 @@ impl ScanRegion { } } + /// Returns a [RegionScanner] to scan the region. 
+ #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] + pub(crate) fn region_scanner(self) -> Result { + if self.use_unordered_scan() { + self.unordered_scan().map(|scanner| Box::new(scanner) as _) + } else { + self.seq_scan().map(|scanner| Box::new(scanner) as _) + } + } + /// Scan sequentially. pub(crate) fn seq_scan(self) -> Result { let input = self.scan_input(true)?; - Ok(SeqScan::new(input)) + Ok(SeqScan::new(input, false)) } /// Unordered scan. @@ -248,7 +264,14 @@ impl ScanRegion { #[cfg(test)] pub(crate) fn scan_without_filter_deleted(self) -> Result { let input = self.scan_input(false)?; - Ok(SeqScan::new(input)) + Ok(SeqScan::new(input, false)) + } + + /// Returns true if the region can use unordered scan for current request. + fn use_unordered_scan(&self) -> bool { + // If table is append only and there is no series row selector, we use unordered scan in query. + // We still use seq scan in compaction. + self.version.options.append_mode && self.request.series_row_selector.is_none() } /// Creates a scan input. @@ -314,7 +337,7 @@ impl ScanRegion { .with_cache(self.cache_manager) .with_inverted_index_applier(inverted_index_applier) .with_fulltext_index_applier(fulltext_index_applier) - .with_parallelism(self.parallelism) + .with_parallel_scan_channel_size(self.parallel_scan_channel_size) .with_start_time(self.start_time) .with_append_mode(self.version.options.append_mode) .with_filter_deleted(filter_deleted) @@ -428,15 +451,6 @@ impl ScanRegion { } } -/// Config for parallel scan. -#[derive(Debug, Clone, Copy, Default)] -pub(crate) struct ScanParallelism { - /// Number of tasks expect to spawn to read data. - pub(crate) parallelism: usize, - /// Channel size to send batches. Only takes effect when the parallelism > 1. - pub(crate) channel_size: usize, -} - /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { @@ -466,8 +480,8 @@ pub(crate) struct ScanInput { pub(crate) cache_manager: CacheManagerRef, /// Ignores file not found error. ignore_file_not_found: bool, - /// Parallelism to scan data. - pub(crate) parallelism: ScanParallelism, + /// Capacity of the channel to send data from parallel scan tasks to the main task. + pub(crate) parallel_scan_channel_size: usize, /// Index appliers. inverted_index_applier: Option, fulltext_index_applier: Option, @@ -496,7 +510,7 @@ impl ScanInput { files: Vec::new(), cache_manager: CacheManagerRef::default(), ignore_file_not_found: false, - parallelism: ScanParallelism::default(), + parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, inverted_index_applier: None, fulltext_index_applier: None, query_start: None, @@ -549,10 +563,13 @@ impl ScanInput { self } - /// Sets scan parallelism. + /// Sets scan task channel size. #[must_use] - pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self { - self.parallelism = parallelism; + pub(crate) fn with_parallel_scan_channel_size( + mut self, + parallel_scan_channel_size: usize, + ) -> Self { + self.parallel_scan_channel_size = parallel_scan_channel_size; self } @@ -621,12 +638,15 @@ impl ScanInput { sources: Vec, semaphore: Arc, ) -> Result> { - debug_assert!(self.parallelism.parallelism > 1); + if sources.len() <= 1 { + return Ok(sources); + } + // Spawn a task for each source. 
let sources = sources .into_iter() .map(|source| { - let (sender, receiver) = mpsc::channel(self.parallelism.channel_size); + let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size); self.spawn_scan_task(source, semaphore.clone(), sender); let stream = Box::pin(ReceiverStream::new(receiver)); Source::Stream(stream) @@ -761,9 +781,9 @@ pub(crate) struct StreamContext { impl StreamContext { /// Creates a new [StreamContext] for [SeqScan]. - pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self { + pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self { let query_start = input.query_start.unwrap_or_else(Instant::now); - let ranges = RangeMeta::seq_scan_ranges(&input); + let ranges = RangeMeta::seq_scan_ranges(&input, compaction); READ_SST_COUNT.observe(input.num_files() as f64); Self { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9498078ddbc4..d8732cb93df2 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -28,7 +28,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; +use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties}; use store_api::storage::TimeSeriesRowSelector; use tokio::sync::Semaphore; @@ -51,39 +51,27 @@ pub struct SeqScan { properties: ScannerProperties, /// Context of streams. stream_ctx: Arc, - /// Semaphore to control scan parallelism of files. - /// Streams created by the scanner share the same semaphore. - semaphore: Arc, /// The scanner is used for compaction. compaction: bool, } impl SeqScan { - /// Creates a new [SeqScan]. - pub(crate) fn new(input: ScanInput) -> Self { - // TODO(yingwen): Set permits according to partition num. But we need to support file - // level parallelism. - let parallelism = input.parallelism.parallelism.max(1); + /// Creates a new [SeqScan] with the given input and compaction flag. + /// If `compaction` is true, the scanner will not attempt to split ranges. + pub(crate) fn new(input: ScanInput, compaction: bool) -> Self { let mut properties = ScannerProperties::default() .with_append_mode(input.append_mode) .with_total_rows(input.total_rows()); - let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input)); + let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction)); properties.partitions = vec![stream_ctx.partition_ranges()]; Self { properties, stream_ctx, - semaphore: Arc::new(Semaphore::new(parallelism)), - compaction: false, + compaction, } } - /// Sets the scanner to be used for compaction. - pub(crate) fn with_compaction(mut self) -> Self { - self.compaction = true; - self - } - /// Builds a stream for the query. /// /// The returned stream is not partitioned and will contains all the data. If want @@ -98,7 +86,12 @@ impl SeqScan { } /// Builds a [BoxedBatchReader] from sequential scan for compaction. - pub async fn build_reader(&self) -> Result { + /// + /// # Panics + /// Panics if the compaction flag is not set. 
+ pub async fn build_reader_for_compaction(&self) -> Result { + assert!(self.compaction); + let part_metrics = PartitionMetrics::new( self.stream_ctx.input.mapper.metadata().region_id, 0, @@ -112,23 +105,20 @@ impl SeqScan { debug_assert_eq!(1, self.properties.partitions.len()); let partition_ranges = &self.properties.partitions[0]; - let reader = Self::build_all_merge_reader( + let reader = Self::merge_all_ranges_for_compaction( &self.stream_ctx, partition_ranges, - self.semaphore.clone(), - self.compaction, &part_metrics, ) .await?; Ok(Box::new(reader)) } - /// Builds a merge reader that reads all data. - async fn build_all_merge_reader( + /// Builds a merge reader that reads all ranges. + /// Callers MUST not split ranges before calling this method. + async fn merge_all_ranges_for_compaction( stream_ctx: &Arc, partition_ranges: &[PartitionRange], - semaphore: Arc, - compaction: bool, part_metrics: &PartitionMetrics, ) -> Result { let mut sources = Vec::new(); @@ -140,27 +130,37 @@ impl SeqScan { build_sources( stream_ctx, part_range, - compaction, + true, part_metrics, range_builder_list.clone(), &mut sources, ); } - Self::build_reader_from_sources(stream_ctx, sources, semaphore).await + + common_telemetry::debug!( + "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}", + stream_ctx.input.mapper.metadata().region_id, + partition_ranges.len(), + sources.len() + ); + Self::build_reader_from_sources(stream_ctx, sources, None).await } + /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel + /// if possible. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn build_reader_from_sources( stream_ctx: &StreamContext, mut sources: Vec, - semaphore: Arc, + semaphore: Option>, ) -> Result { - if stream_ctx.input.parallelism.parallelism > 1 { - // Read sources in parallel. We always spawn a task so we can control the parallelism - // by the semaphore. - sources = stream_ctx - .input - .create_parallel_sources(sources, semaphore.clone())?; + if let Some(semaphore) = semaphore.as_ref() { + // Read sources in parallel. + if sources.len() > 1 { + sources = stream_ctx + .input + .create_parallel_sources(sources, semaphore.clone())?; + } } let mut builder = MergeReaderBuilder::from_sources(sources); @@ -207,10 +207,21 @@ impl SeqScan { } let stream_ctx = self.stream_ctx.clone(); - let semaphore = self.semaphore.clone(); + let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() { + // We can use additional tasks to read the data if we have more target partitions than actual partitions. + // This semaphore is partition level. + // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency + // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of + // files in a part range. 
+ Some(Arc::new(Semaphore::new( + self.properties.target_partitions() - self.properties.num_partitions() + 1, + ))) + } else { + None + }; let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; - let distinguish_range = self.properties.distinguish_partition_range(); + let distinguish_range = self.properties.distinguish_partition_range; let part_metrics = PartitionMetrics::new( self.stream_ctx.input.mapper.metadata().region_id, partition, @@ -325,13 +336,8 @@ impl RegionScanner for SeqScan { self.scan_partition_impl(partition) } - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError> { - self.properties.partitions = ranges; - self.properties.distinguish_partition_range = distinguish_partition_range; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); Ok(()) } @@ -375,6 +381,20 @@ fn build_sources( ) { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range.identifier]; + #[cfg(debug_assertions)] + if compaction { + // Compaction expects input sources are not been split. + debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len()); + for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() { + // It should scan all row groups. + debug_assert_eq!( + -1, row_group_idx.row_group_index, + "Expect {} range scan all row groups, given: {}", + i, row_group_idx.row_group_index, + ); + } + } + sources.reserve(range_meta.row_group_indices.len()); for index in &range_meta.row_group_indices { let stream = if stream_ctx.is_mem_range_index(*index) { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index c1ee34b08e5d..97db9b86592c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -27,7 +27,7 @@ use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; +use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties}; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::range::RangeBuilderList; @@ -144,7 +144,7 @@ impl UnorderedScan { ); let stream_ctx = self.stream_ctx.clone(); let part_ranges = self.properties.partitions[partition].clone(); - let distinguish_range = self.properties.distinguish_partition_range(); + let distinguish_range = self.properties.distinguish_partition_range; let stream = try_stream! { part_metrics.on_first_poll(); @@ -231,13 +231,8 @@ impl RegionScanner for UnorderedScan { self.stream_ctx.input.mapper.output_schema() } - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError> { - self.properties.partitions = ranges; - self.properties.distinguish_partition_range = distinguish_partition_range; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); Ok(()) } diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 02cd04df87b6..a9e0a3302436 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::BinaryHeap;
 use std::sync::Arc;
 
 use common_telemetry::debug;
@@ -93,7 +94,7 @@ impl ParallelizeScan {
 
                 // update the partition ranges
                 let new_exec = region_scan_exec
-                    .with_new_partitions(partition_ranges)
+                    .with_new_partitions(partition_ranges, expected_partition_num)
                     .map_err(|e| DataFusionError::External(e.into_inner()))?;
                 return Ok(Transformed::yes(Arc::new(new_exec)));
             }
@@ -109,21 +110,71 @@ impl ParallelizeScan {
     /// Distribute [`PartitionRange`]s to each partition.
     ///
-    /// Currently we use a simple round-robin strategy to assign ranges to partitions.
+    /// Currently we assign ranges to partitions according to their rows so each partition
+    /// has a similar number of rows.
     /// This method may return partitions with smaller number than `expected_partition_num`
     /// if the number of ranges is smaller than `expected_partition_num`. But this will
    /// return at least one partition.
     fn assign_partition_range(
-        ranges: Vec<PartitionRange>,
+        mut ranges: Vec<PartitionRange>,
         expected_partition_num: usize,
     ) -> Vec<Vec<PartitionRange>> {
-        let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
+        if ranges.is_empty() {
+            // Returns a single partition with no range.
+            return vec![vec![]];
+        }
+
+        if ranges.len() == 1 {
+            return vec![ranges];
+        }
+
+        // Sort ranges by number of rows in descending order.
+        ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
+        // Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available.
+        let max_rows = ranges[0].num_rows;
+        let total_rows = ranges.iter().map(|range| range.num_rows).sum::<usize>();
+        // Computes the partition num by the max row number. This eliminates the imbalance of the partitions.
+        let balanced_partition_num = if max_rows > 0 {
+            total_rows.div_ceil(max_rows)
+        } else {
+            ranges.len()
+        };
+        let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
         let mut partition_ranges = vec![vec![]; actual_partition_num];
 
-        // round-robin assignment
-        for (i, range) in ranges.into_iter().enumerate() {
-            let partition_idx = i % expected_partition_num;
+        #[derive(Eq, PartialEq)]
+        struct HeapNode {
+            num_rows: usize,
+            partition_idx: usize,
+        }
+
+        impl Ord for HeapNode {
+            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+                // Reverse for min-heap.
+                self.num_rows.cmp(&other.num_rows).reverse()
+            }
+        }
+
+        impl PartialOrd for HeapNode {
+            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+                Some(self.cmp(other))
+            }
+        }
+
+        let mut part_heap =
+            BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode {
+                num_rows: 0,
+                partition_idx,
+            }));
+
+        // Assigns the range to the partition with the smallest number of rows.
+        for range in ranges {
+            // Safety: actual_partition_num always > 0.
+ let mut node = part_heap.pop().unwrap(); + let partition_idx = node.partition_idx; + node.num_rows += range.num_rows; partition_ranges[partition_idx].push(range); + part_heap.push(node); } partition_ranges @@ -172,18 +223,18 @@ mod test { ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num); let expected = vec![ vec![ + PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 250, + identifier: 4, + }, PartitionRange { start: Timestamp::new(0, TimeUnit::Second), end: Timestamp::new(10, TimeUnit::Second), num_rows: 100, identifier: 1, }, - PartitionRange { - start: Timestamp::new(20, TimeUnit::Second), - end: Timestamp::new(30, TimeUnit::Second), - num_rows: 150, - identifier: 3, - }, ], vec![ PartitionRange { @@ -193,10 +244,10 @@ mod test { identifier: 2, }, PartitionRange { - start: Timestamp::new(30, TimeUnit::Second), - end: Timestamp::new(40, TimeUnit::Second), - num_rows: 250, - identifier: 4, + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, }, ], ]; @@ -207,34 +258,100 @@ mod test { let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); let expected = vec![ vec![PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 250, + identifier: 4, + }], + vec![PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + num_rows: 200, + identifier: 2, + }], + vec![ + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, + }, + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + num_rows: 100, + identifier: 1, + }, + ], + ]; + assert_eq!(result, expected); + + // assign 0 ranges to 5 partitions. Only 1 partition is returned. 
+ let result = ParallelizeScan::assign_partition_range(vec![], 5); + assert_eq!(result.len(), 1); + } + + #[test] + fn test_assign_unbalance_partition_range() { + let ranges = vec![ + PartitionRange { start: Timestamp::new(0, TimeUnit::Second), end: Timestamp::new(10, TimeUnit::Second), num_rows: 100, identifier: 1, - }], - vec![PartitionRange { + }, + PartitionRange { start: Timestamp::new(10, TimeUnit::Second), end: Timestamp::new(20, TimeUnit::Second), num_rows: 200, identifier: 2, - }], - vec![PartitionRange { + }, + PartitionRange { start: Timestamp::new(20, TimeUnit::Second), end: Timestamp::new(30, TimeUnit::Second), num_rows: 150, identifier: 3, - }], + }, + PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 2500, + identifier: 4, + }, + ]; + + // assign to 2 partitions + let expected_partition_num = 2; + let result = + ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num); + let expected = vec![ vec![PartitionRange { start: Timestamp::new(30, TimeUnit::Second), end: Timestamp::new(40, TimeUnit::Second), - num_rows: 250, + num_rows: 2500, identifier: 4, }], + vec![ + PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + num_rows: 200, + identifier: 2, + }, + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, + }, + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + num_rows: 100, + identifier: 1, + }, + ], ]; assert_eq!(result, expected); - - // assign 0 ranges to 5 partitions. Only 1 partition is returned. - let result = ParallelizeScan::assign_partition_range(vec![], 5); - assert_eq!(result.len(), 1); } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 8dd706395d1d..c9b0ac53db59 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -206,16 +206,13 @@ pub struct ScannerProperties { /// Whether to yield an empty batch to distinguish partition ranges. pub distinguish_partition_range: bool, + + /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions. + target_partitions: usize, } impl ScannerProperties { - /// Initialize partitions with given parallelism for scanner. - pub fn with_parallelism(mut self, parallelism: usize) -> Self { - self.partitions = vec![vec![]; parallelism]; - self - } - - /// Set append mode for scanner. + /// Sets append mode for scanner. pub fn with_append_mode(mut self, append_mode: bool) -> Self { self.append_mode = append_mode; self @@ -234,9 +231,24 @@ impl ScannerProperties { append_mode, total_rows, distinguish_partition_range: false, + target_partitions: 0, } } + /// Updates the properties with the given [PrepareRequest]. + pub fn prepare(&mut self, request: PrepareRequest) { + if let Some(ranges) = request.ranges { + self.partitions = ranges; + } + if let Some(distinguish_partition_range) = request.distinguish_partition_range { + self.distinguish_partition_range = distinguish_partition_range; + } + if let Some(target_partitions) = request.target_partitions { + self.target_partitions = target_partitions; + } + } + + /// Returns the number of actual partitions. 
     pub fn num_partitions(&self) -> usize {
         self.partitions.len()
     }
@@ -249,8 +261,44 @@ impl ScannerProperties {
         self.total_rows
     }
 
-    pub fn distinguish_partition_range(&self) -> bool {
-        self.distinguish_partition_range
+    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
+    pub fn target_partitions(&self) -> usize {
+        if self.target_partitions == 0 {
+            self.num_partitions()
+        } else {
+            self.target_partitions
+        }
+    }
+}
+
+/// Request to override the scanner properties.
+#[derive(Default)]
+pub struct PrepareRequest {
+    /// Assigned partition ranges.
+    pub ranges: Option<Vec<Vec<PartitionRange>>>,
+    /// Distinguishes partition range by empty batches.
+    pub distinguish_partition_range: Option<bool>,
+    /// The expected number of target partitions.
+    pub target_partitions: Option<usize>,
+}
+
+impl PrepareRequest {
+    /// Sets the ranges.
+    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
+        self.ranges = Some(ranges);
+        self
+    }
+
+    /// Sets the distinguish partition range flag.
+    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
+        self.distinguish_partition_range = Some(distinguish_partition_range);
+        self
+    }
+
+    /// Sets the target partitions.
+    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
+        self.target_partitions = Some(target_partitions);
+        self
     }
 }
@@ -271,11 +319,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
     /// Prepares the scanner with the given partition ranges.
     ///
     /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
-    fn prepare(
-        &mut self,
-        ranges: Vec<Vec<PartitionRange>>,
-        distinguish_partition_range: bool,
-    ) -> Result<(), BoxedError>;
+    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
 
     /// Scans the partition and returns a stream of record batches.
/// @@ -431,9 +475,7 @@ impl SinglePartitionScanner { Self { stream: Mutex::new(Some(stream)), schema, - properties: ScannerProperties::default() - .with_parallelism(1) - .with_append_mode(append_mode), + properties: ScannerProperties::default().with_append_mode(append_mode), metadata, } } @@ -454,13 +496,8 @@ impl RegionScanner for SinglePartitionScanner { self.schema.clone() } - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError> { - self.properties.partitions = ranges; - self.properties.distinguish_partition_range = distinguish_partition_range; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); Ok(()) } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 0eac7c0c354f..e4b47fa4fb2a 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -35,7 +35,7 @@ use datafusion_common::{ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef; use futures::{Stream, StreamExt}; -use store_api::region_engine::{PartitionRange, RegionScannerRef}; +use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef}; use crate::table::metrics::StreamMetrics; @@ -112,6 +112,7 @@ impl RegionScanExec { pub fn with_new_partitions( &self, partitions: Vec>, + target_partitions: usize, ) -> Result { if self.is_partition_set { warn!("Setting partition ranges more than once for RegionScanExec"); @@ -123,8 +124,11 @@ impl RegionScanExec { { let mut scanner = self.scanner.lock().unwrap(); - let distinguish_partition_range = scanner.properties().distinguish_partition_range(); - scanner.prepare(partitions, distinguish_partition_range)?; + scanner.prepare( + PrepareRequest::default() + .with_ranges(partitions) + .with_target_partitions(target_partitions), + )?; } Ok(Self { @@ -141,9 +145,10 @@ impl RegionScanExec { pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) { let mut scanner = self.scanner.lock().unwrap(); - let partition_ranges = scanner.properties().partitions.clone(); // set distinguish_partition_range won't fail - let _ = scanner.prepare(partition_ranges, distinguish_partition_range); + let _ = scanner.prepare( + PrepareRequest::default().with_distinguish_partition_range(distinguish_partition_range), + ); } pub fn time_index(&self) -> String { diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 13b3503fb943..3ae8f9f8469a 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -1,5 +1,5 @@ -- Test without PK, with a windowed sort query. 
-CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX); +CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); Affected Rows: 0 @@ -69,8 +69,8 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED -|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| +-+-+-+ @@ -101,9 +101,9 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED -|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=5 REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=5 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| +-+-+-+ @@ -113,7 +113,7 @@ DROP TABLE test; Affected Rows: 0 -- Test with PK, with a windowed sort query. -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); Affected Rows: 0 @@ -183,9 +183,9 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED -|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 limit=5 REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 limit=5 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| +-+-+-+ @@ -216,9 +216,9 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED -|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=2 fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 DESC num_ranges=2 limit=5 REDACTED -|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=4 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 DESC num_ranges=4 limit=5 REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| +-+-+-+ diff --git a/tests/cases/standalone/common/order/windowed_sort.sql b/tests/cases/standalone/common/order/windowed_sort.sql index e8006f74ce17..e21ae3764bdb 100644 --- a/tests/cases/standalone/common/order/windowed_sort.sql +++ 
b/tests/cases/standalone/common/order/windowed_sort.sql @@ -1,5 +1,5 @@ -- Test without PK, with a windowed sort query. -CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX); +CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3); @@ -36,7 +36,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; DROP TABLE test; -- Test with PK, with a windowed sort query. -CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX); +CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4'); INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3);