From 8ca35a4a1ad66dcbe06622833381a09427b18237 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 3 Sep 2024 21:42:38 +0800 Subject: [PATCH] fix: use number of partitions as parallilism in region scanner (#4669) * fix: use number of partitions as parallilism in region scanner Signed-off-by: Ruihang Xia * add sqlness Signed-off-by: Ruihang Xia Co-authored-by: Lei HUANG * order by ts Signed-off-by: Ruihang Xia * debug pring time range Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: Lei HUANG --- src/mito2/src/read/scan_region.rs | 3 +- src/mito2/src/read/seq_scan.rs | 19 +-- src/mito2/src/read/unordered_scan.rs | 11 +- .../common/select/flush_append_only.result | 108 ++++++++++++++++++ .../common/select/flush_append_only.sql | 58 ++++++++++ 5 files changed, 186 insertions(+), 13 deletions(-) create mode 100644 tests/cases/standalone/common/select/flush_append_only.result create mode 100644 tests/cases/standalone/common/select/flush_append_only.sql diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index b13068dbec5a..a13bdf697265 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -284,9 +284,10 @@ impl ScanRegion { .collect(); debug!( - "Scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", + "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", self.version.metadata.region_id, self.request, + time_range, memtables.len(), files.len(), self.version.options.append_mode, diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 80fbb3189f43..e80fc106e24f 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -112,6 +112,7 @@ impl SeqScan { self.semaphore.clone(), &mut metrics, self.compaction, + self.properties.num_partitions(), ) .await?; // Safety: `build_merge_reader()` always returns a reader if partition is None. @@ -184,10 +185,11 @@ impl SeqScan { semaphore: Arc, metrics: &mut ScannerMetrics, compaction: bool, + parallelism: usize, ) -> Result> { // initialize parts list let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?; let parts_len = parts.0.len(); let mut sources = Vec::with_capacity(parts_len); @@ -211,11 +213,12 @@ impl SeqScan { semaphore: Arc, metrics: &mut ScannerMetrics, compaction: bool, + parallelism: usize, ) -> Result> { let mut sources = Vec::new(); let build_start = { let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?; let Some(part) = parts.0.get_part(range_id) else { return Ok(None); @@ -311,12 +314,13 @@ impl SeqScan { let semaphore = self.semaphore.clone(); let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; + let parallelism = self.properties.num_partitions(); let stream = try_stream! { let first_poll = stream_ctx.query_start.elapsed(); for partition_range in partition_ranges { let maybe_reader = - Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction) + Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction, parallelism) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -390,6 +394,7 @@ impl SeqScan { let stream_ctx = self.stream_ctx.clone(); let semaphore = self.semaphore.clone(); let compaction = self.compaction; + let parallelism = self.properties.num_partitions(); // build stream let stream = try_stream! { @@ -398,7 +403,7 @@ impl SeqScan { // init parts let parts_len = { let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await + Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism).await .map_err(BoxedError::new) .context(ExternalSnafu)?; parts.0.len() @@ -411,6 +416,7 @@ impl SeqScan { semaphore.clone(), &mut metrics, compaction, + parallelism ) .await .map_err(BoxedError::new) @@ -467,6 +473,7 @@ impl SeqScan { input: &ScanInput, part_list: &mut (ScanPartList, Duration), metrics: &mut ScannerMetrics, + parallelism: usize, ) -> Result<()> { if part_list.0.is_none() { let now = Instant::now(); @@ -477,9 +484,7 @@ impl SeqScan { Some(input.mapper.column_ids()), input.predicate.clone(), ); - part_list - .0 - .set_parts(distributor.build_parts(input.parallelism.parallelism)); + part_list.0.set_parts(distributor.build_parts(parallelism)); let build_part_cost = now.elapsed(); part_list.1 = build_part_cost; diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 3bca8e0afea8..ec43654e09c2 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -59,10 +59,11 @@ impl UnorderedScan { /// Creates a new [UnorderedScan]. pub(crate) fn new(input: ScanInput) -> Self { let parallelism = input.parallelism.parallelism.max(1); - let properties = ScannerProperties::default() + let mut properties = ScannerProperties::default() .with_parallelism(parallelism) .with_append_mode(input.append_mode) .with_total_rows(input.total_rows()); + properties.partitions = vec![input.partition_ranges()]; let stream_ctx = Arc::new(StreamContext::new(input)); Self { @@ -148,12 +149,13 @@ impl RegionScanner for UnorderedScan { ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); + let parallelism = self.properties.num_partitions(); let stream = try_stream! { let first_poll = stream_ctx.query_start.elapsed(); let part = { let mut parts = stream_ctx.parts.lock().await; - maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics) + maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -260,6 +262,7 @@ async fn maybe_init_parts( input: &ScanInput, part_list: &mut (ScanPartList, Duration), metrics: &mut ScannerMetrics, + parallelism: usize, ) -> Result<()> { if part_list.0.is_none() { let now = Instant::now(); @@ -270,9 +273,7 @@ async fn maybe_init_parts( Some(input.mapper.column_ids()), input.predicate.clone(), ); - part_list - .0 - .set_parts(distributor.build_parts(input.parallelism.parallelism)); + part_list.0.set_parts(distributor.build_parts(parallelism)); let build_part_cost = now.elapsed(); part_list.1 = build_part_cost; diff --git a/tests/cases/standalone/common/select/flush_append_only.result b/tests/cases/standalone/common/select/flush_append_only.result new file mode 100644 index 000000000000..38f221aa93c7 --- /dev/null +++ b/tests/cases/standalone/common/select/flush_append_only.result @@ -0,0 +1,108 @@ +create table + t ( + ts timestamp time index, + host string primary key, + not_pk string, + val double, + ) +with + ( + append_mode = 'true', + 'compaction.type' = 'twcs', + 'compaction.twcs.max_active_window_files' = '8', + 'compaction.twcs.max_inactive_window_files' = '8' + ); + +Affected Rows: 0 + +insert into + t +values + (0, 'a', '🌕', 1.0), + (1, 'b', '🌖', 2.0), + (1, 'a', '🌗', 3.0), + (1, 'c', '🌘', 4.0), + (2, 'a', '🌑', 5.0), + (2, 'b', '🌒', 6.0), + (2, 'a', '🌓', 7.0), + (3, 'c', '🌔', 8.0), + (3, 'd', '🌕', 9.0); + +Affected Rows: 9 + +admin flush_table ('t'); + ++------------------------+ +| ADMIN flush_table('t') | ++------------------------+ +| 0 | ++------------------------+ + +insert into + t +values + (10, 'a', '🌕', 1.0), + (11, 'b', '🌖', 2.0), + (11, 'a', '🌗', 3.0), + (11, 'c', '🌘', 4.0), + (12, 'a', '🌑', 5.0), + (12, 'b', '🌒', 6.0), + (12, 'a', '🌓', 7.0), + (13, 'c', '🌔', 8.0), + (13, 'd', '🌕', 9.0); + +Affected Rows: 9 + +admin flush_table ('t'); + ++------------------------+ +| ADMIN flush_table('t') | ++------------------------+ +| 0 | ++------------------------+ + +select + count(ts) +from + t; + ++-------------+ +| COUNT(t.ts) | ++-------------+ +| 18 | ++-------------+ + +select + ts +from + t +order by + ts; + ++-------------------------+ +| ts | ++-------------------------+ +| 1970-01-01T00:00:00 | +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.001 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.002 | +| 1970-01-01T00:00:00.003 | +| 1970-01-01T00:00:00.003 | +| 1970-01-01T00:00:00.010 | +| 1970-01-01T00:00:00.011 | +| 1970-01-01T00:00:00.011 | +| 1970-01-01T00:00:00.011 | +| 1970-01-01T00:00:00.012 | +| 1970-01-01T00:00:00.012 | +| 1970-01-01T00:00:00.012 | +| 1970-01-01T00:00:00.013 | +| 1970-01-01T00:00:00.013 | ++-------------------------+ + +drop table t; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/select/flush_append_only.sql b/tests/cases/standalone/common/select/flush_append_only.sql new file mode 100644 index 000000000000..e8d6defea2a6 --- /dev/null +++ b/tests/cases/standalone/common/select/flush_append_only.sql @@ -0,0 +1,58 @@ +create table + t ( + ts timestamp time index, + host string primary key, + not_pk string, + val double, + ) +with + ( + append_mode = 'true', + 'compaction.type' = 'twcs', + 'compaction.twcs.max_active_window_files' = '8', + 'compaction.twcs.max_inactive_window_files' = '8' + ); + +insert into + t +values + (0, 'a', '🌕', 1.0), + (1, 'b', '🌖', 2.0), + (1, 'a', '🌗', 3.0), + (1, 'c', '🌘', 4.0), + (2, 'a', '🌑', 5.0), + (2, 'b', '🌒', 6.0), + (2, 'a', '🌓', 7.0), + (3, 'c', '🌔', 8.0), + (3, 'd', '🌕', 9.0); + +admin flush_table ('t'); + +insert into + t +values + (10, 'a', '🌕', 1.0), + (11, 'b', '🌖', 2.0), + (11, 'a', '🌗', 3.0), + (11, 'c', '🌘', 4.0), + (12, 'a', '🌑', 5.0), + (12, 'b', '🌒', 6.0), + (12, 'a', '🌓', 7.0), + (13, 'c', '🌔', 8.0), + (13, 'd', '🌕', 9.0); + +admin flush_table ('t'); + +select + count(ts) +from + t; + +select + ts +from + t +order by + ts; + +drop table t;