diff --git a/Cargo.lock b/Cargo.lock index b8eafcbdcbcf..46dd2146115c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4898,6 +4898,7 @@ dependencies = [ "strum 0.25.0", "table", "tokio", + "tokio-stream", "tokio-util", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index a5ec4a69ff10..b24f725332be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,6 +127,7 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6 strum = { version = "0.25", features = ["derive"] } tempfile = "3" tokio = { version = "1.28", features = ["full"] } +tokio-stream = { version = "0.1" } tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.7" tonic = { version = "0.10", features = ["tls"] } @@ -168,7 +169,6 @@ frontend = { path = "src/frontend" } log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } -mito = { path = "src/mito" } mito2 = { path = "src/mito2" } object-store = { path = "src/object-store" } operator = { path = "src/operator" } diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 9b24fd8dcff8..a0cc3601906e 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -89,6 +89,13 @@ vector_cache_size = "512MB" page_cache_size = "512MB" # Buffer size for SST writing. sst_write_buffer_size = "8MB" +# Parallelism to scan a region (default: 1/4 of cpu cores). +# - 0: using the default value (1/4 of cpu cores). +# - 1: scan in current thread. +# - n: scan in parallelism n. +scan_parallelism = 0 +# Capacity of the channel to send data from parallel scan tasks to the main task (default 32). +parallel_scan_channel_size = 32 # Log options, see `standalone.example.toml` # [logging] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0e412c4a6313..fd6965ab3946 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -157,6 +157,13 @@ vector_cache_size = "512MB" page_cache_size = "512MB" # Buffer size for SST writing. sst_write_buffer_size = "8MB" +# Parallelism to scan a region (default: 1/4 of cpu cores). +# - 0: using the default value (1/4 of cpu cores). +# - 1: scan in current thread. +# - n: scan in parallelism n. +scan_parallelism = 0 +# Capacity of the channel to send data from parallel scan tasks to the main task (default 32). +parallel_scan_channel_size = 32 # Log options # [logging] diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 840ab07b57ec..276ee43cf81b 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -35,7 +35,7 @@ prost.workspace = true rand.workspace = true session.workspace = true snafu.workspace = true -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true, features = ["net"] } tokio.workspace = true tonic.workspace = true diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 019430919521..4a51fd3a8a64 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -62,7 +62,7 @@ sql.workspace = true store-api.workspace = true substrait.workspace = true table.workspace = true -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true, features = ["net"] } tokio.workspace = true toml.workspace = true tonic.workspace = true diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 43f12cc9b36a..56765db3337f 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -20,7 +20,7 @@ serde.workspace = true serde_json.workspace = true snafu.workspace = true table.workspace = true -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true, features = ["net"] } tokio.workspace = true tonic.workspace = true diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index f38f82be64bd..2df19d8bea0e 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -49,7 +49,7 @@ snafu.workspace = true store-api.workspace = true strum.workspace = true table.workspace = true -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true, features = ["net"] } tokio.workspace = true toml.workspace = true tonic.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 7f340b04a724..0e2bd5ad39ca 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -58,6 +58,7 @@ snafu.workspace = true store-api.workspace = true strum.workspace = true table.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index edb7d3dd0d65..2e779b760260 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -24,13 +24,16 @@ use serde::{Deserialize, Serialize}; const DEFAULT_MAX_BG_JOB: usize = 4; const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5); +/// Default channel size for parallel scan task. +const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; /// Configuration for [MitoEngine](crate::engine::MitoEngine). #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] pub struct MitoConfig { // Worker configs: - /// Number of region workers (default 1). + /// Number of region workers (default: 1/2 of cpu cores). + /// Sets to 0 to use the default value. pub num_workers: usize, /// Request channel size of each worker (default 128). pub worker_channel_size: usize, @@ -68,12 +71,19 @@ pub struct MitoConfig { // Other configs: /// Buffer size for SST writing. pub sst_write_buffer_size: ReadableSize, + /// Parallelism to scan a region (default: 1/4 of cpu cores). + /// - 0: using the default value (1/4 of cpu cores). + /// - 1: scan in current thread. + /// - n: scan in parallelism n. + pub scan_parallelism: usize, + /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32). + pub parallel_scan_channel_size: usize, } impl Default for MitoConfig { fn default() -> Self { MitoConfig { - num_workers: num_cpus::get() / 2, + num_workers: divide_num_cpus(2), worker_channel_size: 128, worker_request_batch_size: 64, manifest_checkpoint_distance: 10, @@ -86,6 +96,8 @@ impl Default for MitoConfig { vector_cache_size: ReadableSize::mb(512), page_cache_size: ReadableSize::mb(512), sst_write_buffer_size: ReadableSize::mb(8), + scan_parallelism: divide_num_cpus(4), + parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, } } } @@ -93,16 +105,9 @@ impl Default for MitoConfig { impl MitoConfig { /// Sanitize incorrect configurations. pub(crate) fn sanitize(&mut self) { - // Sanitize worker num. - let num_workers_before = self.num_workers; + // Use default value if `num_workers` is 0. if self.num_workers == 0 { - self.num_workers = (num_cpus::get() / 2).max(1); - } - if num_workers_before != self.num_workers { - warn!( - "Sanitize worker num {} to {}", - num_workers_before, self.num_workers - ); + self.num_workers = divide_num_cpus(2); } // Sanitize channel size. @@ -131,5 +136,27 @@ impl MitoConfig { self.sst_write_buffer_size ); } + + // Use default value if `scan_parallelism` is 0. + if self.scan_parallelism == 0 { + self.scan_parallelism = divide_num_cpus(4); + } + + if self.parallel_scan_channel_size < 1 { + self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE; + warn!( + "Sanitize scan channel size to {}", + self.parallel_scan_channel_size + ); + } } } + +/// Divide cpu num by a non-zero `divisor` and returns at least 1. +fn divide_num_cpus(divisor: usize) -> usize { + debug_assert!(divisor > 0); + let cores = num_cpus::get(); + debug_assert!(cores > 0); + + (cores + divisor - 1) / divisor +} diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index f1a7e4455d04..dfca088f2d96 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -33,6 +33,8 @@ pub mod listener; #[cfg(test)] mod open_test; #[cfg(test)] +mod parallel_test; +#[cfg(test)] mod projection_test; #[cfg(test)] mod prune_test; @@ -40,6 +42,7 @@ mod prune_test; mod set_readonly_test; #[cfg(test)] mod truncate_test; + use std::sync::Arc; use async_trait::async_trait; @@ -56,7 +59,7 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; use crate::metrics::HANDLE_REQUEST_ELAPSED; -use crate::read::scan_region::{ScanRegion, Scanner}; +use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner}; use crate::region::RegionUsage; use crate::request::WorkerRequest; use crate::worker::WorkerGroup; @@ -114,6 +117,8 @@ impl MitoEngine { struct EngineInner { /// Region workers group. workers: WorkerGroup, + /// Config of the engine. + config: Arc, } impl EngineInner { @@ -123,8 +128,10 @@ impl EngineInner { log_store: Arc, object_store_manager: ObjectStoreManagerRef, ) -> EngineInner { + let config = Arc::new(config); EngineInner { - workers: WorkerGroup::start(config, log_store, object_store_manager), + workers: WorkerGroup::start(config.clone(), log_store, object_store_manager), + config, } } @@ -171,12 +178,18 @@ impl EngineInner { let version = region.version(); // Get cache. let cache_manager = self.workers.cache_manager(); + let scan_parallelism = ScanParallism { + parallelism: self.config.scan_parallelism, + channel_size: self.config.parallel_scan_channel_size, + }; + let scan_region = ScanRegion::new( version, region.access_layer.clone(), request, Some(cache_manager), - ); + ) + .with_parallelism(scan_parallelism); scan_region.scanner() } @@ -303,15 +316,17 @@ impl MitoEngine { ) -> MitoEngine { config.sanitize(); + let config = Arc::new(config); MitoEngine { inner: Arc::new(EngineInner { workers: WorkerGroup::start_for_test( - config, + config.clone(), log_store, object_store_manager, write_buffer_manager, listener, ), + config, }), } } diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs new file mode 100644 index 000000000000..3bed94f98a47 --- /dev/null +++ b/src/mito2/src/engine/parallel_test.rs @@ -0,0 +1,133 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for parallel scan. + +use std::collections::HashMap; + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{RegionOpenRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::{ + build_delete_rows_for_key, build_rows_for_key, delete_rows, delete_rows_schema, flush_region, + put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; + +async fn scan_in_parallel( + env: &mut TestEnv, + region_id: RegionId, + region_dir: &str, + parallelism: usize, + channel_size: usize, +) { + let engine = env + .open_engine(MitoConfig { + scan_parallelism: parallelism, + parallel_scan_channel_size: channel_size, + ..Default::default() + }) + .await; + + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir: region_dir.to_string(), + options: HashMap::default(), + }), + ) + .await + .unwrap(); + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | +| b | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_parallel_scan() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + + let column_schemas = rows_schema(&request); + let delete_schema = delete_rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + // SST0 + flush_region(&engine, region_id, None).await; + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 3, 0), + }; + put_rows(&engine, region_id, rows).await; + // SST1 + flush_region(&engine, region_id, None).await; + + // Delete (a, 2) + let rows = Rows { + schema: delete_schema.clone(), + rows: build_delete_rows_for_key("a", 2, 3), + }; + delete_rows(&engine, region_id, rows).await; + // SST2 + flush_region(&engine, region_id, None).await; + + // Delete (b, 0), (b, 1) + let rows = Rows { + schema: delete_schema, + rows: build_delete_rows_for_key("b", 0, 2), + }; + delete_rows(&engine, region_id, rows).await; + + engine.stop().await.unwrap(); + + scan_in_parallel(&mut env, region_id, ®ion_dir, 0, 1).await; + + scan_in_parallel(&mut env, region_id, ®ion_dir, 1, 1).await; + + scan_in_parallel(&mut env, region_id, ®ion_dir, 2, 1).await; + + scan_in_parallel(&mut env, region_id, ®ion_dir, 2, 8).await; + + scan_in_parallel(&mut env, region_id, ®ion_dir, 4, 8).await; + + scan_in_parallel(&mut env, region_id, ®ion_dir, 8, 2).await; +} diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 1ef80b3bb1f2..c3ce229780fb 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -39,6 +39,8 @@ use datatypes::vectors::{ TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef, }; +use futures::stream::BoxStream; +use futures::TryStreamExt; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, SequenceNumber}; @@ -668,6 +670,8 @@ pub enum Source { Reader(BoxedBatchReader), /// Source from a [BoxedBatchIterator]. Iter(BoxedBatchIterator), + /// Source from a [BoxedBatchStream]. + Stream(BoxedBatchStream), } impl Source { @@ -676,6 +680,7 @@ impl Source { match self { Source::Reader(reader) => reader.next_batch().await, Source::Iter(iter) => iter.next().transpose(), + Source::Stream(stream) => stream.try_next().await, } } } @@ -698,6 +703,9 @@ pub trait BatchReader: Send { /// Pointer to [BatchReader]. pub type BoxedBatchReader = Box; +/// Pointer to a stream that yields [Batch]. +pub type BoxedBatchStream = BoxStream<'static, Result>; + #[async_trait::async_trait] impl BatchReader for Box { async fn next_batch(&mut self) -> Result> { diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index e152956714f8..ae9dca224c91 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -329,6 +329,11 @@ impl MergeReaderBuilder { MergeReaderBuilder::default() } + /// Creates a builder from sources. + pub fn from_sources(sources: Vec) -> MergeReaderBuilder { + MergeReaderBuilder { sources } + } + /// Pushes a batch reader to sources. pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self { self.sources.push(Source::Reader(reader)); @@ -365,6 +370,8 @@ struct Metrics { num_output_rows: usize, /// Number of deleted rows. num_deleted_rows: usize, + /// Cost to fetch batches from sources. + fetch_cost: Duration, } /// A `Node` represent an individual input data source to be merged. @@ -383,7 +390,9 @@ impl Node { /// It tries to fetch one batch from the `source`. async fn new(mut source: Source, metrics: &mut Metrics) -> Result { // Ensures batch is not empty. + let start = Instant::now(); let current_batch = source.next_batch().await?.map(CompareFirst); + metrics.fetch_cost += start.elapsed(); metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0); Ok(Node { @@ -420,8 +429,10 @@ impl Node { /// Panics if the node has reached EOF. async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result { let current = self.current_batch.take().unwrap(); + let start = Instant::now(); // Ensures batch is not empty. self.current_batch = self.source.next_batch().await?.map(CompareFirst); + metrics.fetch_cost += start.elapsed(); metrics.num_input_rows += self .current_batch .as_ref() diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2238573248bd..4a8c7028357b 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -115,6 +115,8 @@ pub(crate) struct ScanRegion { request: ScanRequest, /// Cache. cache_manager: Option, + /// Parallelism to scan. + parallelism: ScanParallism, } impl ScanRegion { @@ -130,9 +132,17 @@ impl ScanRegion { access_layer, request, cache_manager, + parallelism: ScanParallism::default(), } } + /// Sets parallelism. + #[must_use] + pub(crate) fn with_parallelism(mut self, parallelism: ScanParallism) -> Self { + self.parallelism = parallelism; + self + } + /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { self.seq_scan().map(Scanner::Seq) @@ -196,7 +206,8 @@ impl ScanRegion { .with_predicate(Some(predicate)) .with_memtables(memtables) .with_files(files) - .with_cache(self.cache_manager); + .with_cache(self.cache_manager) + .with_parallelism(self.parallelism); Ok(seq_scan) } @@ -215,6 +226,22 @@ impl ScanRegion { } } +/// Config for parallel scan. +#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct ScanParallism { + /// Number of tasks expect to spawn to read data. + pub(crate) parallelism: usize, + /// Channel size to send batches. Only takes effect when the parallelism > 1. + pub(crate) channel_size: usize, +} + +impl ScanParallism { + /// Returns true if we allow parallel scan. + pub(crate) fn allow_parallel_scan(&self) -> bool { + self.parallelism > 1 + } +} + /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index b0a86072c1ac..31956a0c2d22 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -25,6 +25,8 @@ use common_telemetry::{debug, error}; use common_time::range::TimestampRange; use snafu::ResultExt; use table::predicate::Predicate; +use tokio::sync::{mpsc, Semaphore}; +use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::{CacheManager, CacheManagerRef}; @@ -34,7 +36,8 @@ use crate::metrics::READ_STAGE_ELAPSED; use crate::read::compat::{self, CompatReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::projection::ProjectionMapper; -use crate::read::{BatchReader, BoxedBatchReader}; +use crate::read::scan_region::ScanParallism; +use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source}; use crate::sst::file::FileHandle; /// Scans a region and returns rows in a sorted sequence. @@ -57,6 +60,8 @@ pub struct SeqScan { cache_manager: Option, /// Ignores file not found error. ignore_file_not_found: bool, + /// Parallelism to scan data. + parallelism: ScanParallism, } impl SeqScan { @@ -72,6 +77,7 @@ impl SeqScan { files: Vec::new(), cache_manager: None, ignore_file_not_found: false, + parallelism: ScanParallism::default(), } } @@ -117,18 +123,32 @@ impl SeqScan { self } + /// Sets scan parallelism. + #[must_use] + pub(crate) fn with_parallelism(mut self, parallelism: ScanParallism) -> Self { + self.parallelism = parallelism; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { let start = Instant::now(); + let mut metrics = Metrics::default(); + let use_parallel = self.use_parallel_reader(); // Scans all memtables and SSTs. Builds a merge reader to merge results. - let mut reader = self.build_reader().await?; - let mut metrics = Metrics { - scan_cost: start.elapsed(), + let mut reader = if use_parallel { + self.build_parallel_reader().await? + } else { + self.build_reader().await? }; + let elapsed = start.elapsed(); + metrics.build_reader_cost = elapsed; + metrics.scan_cost = elapsed; // Creates a stream to poll the batch reader and convert batch into record batch. let mapper = self.mapper.clone(); let cache_manager = self.cache_manager.clone(); + let parallelism = self.parallelism.parallelism; let stream = try_stream! { let cache = cache_manager.as_ref().map(|cache| cache.as_ref()); while let Some(batch) = @@ -137,7 +157,10 @@ impl SeqScan { yield batch; } - debug!("Seq scan finished, region_id: {:?}, metrics: {:?}", mapper.metadata().region_id, metrics); + debug!( + "Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}, parallelism: {}", + mapper.metadata().region_id, metrics, use_parallel, parallelism, + ); // Update metrics. READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64()); }; @@ -152,10 +175,35 @@ impl SeqScan { /// Builds a [BoxedBatchReader] from sequential scan. pub async fn build_reader(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. - let mut builder = MergeReaderBuilder::new(); + let sources = self.build_sources().await?; + let mut builder = MergeReaderBuilder::from_sources(sources); + Ok(Box::new(builder.build().await?)) + } + + /// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel. + async fn build_parallel_reader(&self) -> Result { + assert!(self.parallelism.allow_parallel_scan()); + // Scall all memtables and SSTs. + let sources = self.build_sources().await?; + let semaphore = Arc::new(Semaphore::new(self.parallelism.parallelism)); + // Spawn a task for each source. + let sources = sources + .into_iter() + .map(|source| { + let stream = self.spawn_scan_task(source, semaphore.clone()); + Source::Stream(stream) + }) + .collect(); + let mut builder = MergeReaderBuilder::from_sources(sources); + Ok(Box::new(builder.build().await?)) + } + + /// Builds and returns sources to read. + async fn build_sources(&self) -> Result> { + let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len()); for mem in &self.memtables { let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone()); - builder.push_batch_iter(iter); + sources.push(Source::Iter(iter)); } for file in &self.files { let maybe_reader = self @@ -179,16 +227,50 @@ impl SeqScan { } }; if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) { - builder.push_batch_reader(Box::new(reader)); + sources.push(Source::Reader(Box::new(reader))); } else { // They have different schema. We need to adapt the batch first so the - // mapper can convert the it. + // mapper can convert it. let compat_reader = CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?; - builder.push_batch_reader(Box::new(compat_reader)); + sources.push(Source::Reader(Box::new(compat_reader))); } } - Ok(Box::new(builder.build().await?)) + + Ok(sources) + } + + /// Returns whether to use a parallel reader. + fn use_parallel_reader(&self) -> bool { + self.parallelism.allow_parallel_scan() && (self.files.len() + self.memtables.len()) > 1 + } + + /// Scan the input source in another task. + fn spawn_scan_task(&self, mut input: Source, semaphore: Arc) -> BoxedBatchStream { + let (sender, receiver) = mpsc::channel(self.parallelism.channel_size); + tokio::spawn(async move { + loop { + // We release the permit before sending result to avoid the task waiting on + // the channel with the permit holded + let maybe_batch = { + // Safety: We never close the semaphore. + let _permit = semaphore.acquire().await.unwrap(); + input.next_batch().await + }; + match maybe_batch { + Ok(Some(batch)) => { + let _ = sender.send(Ok(batch)).await; + } + Ok(None) => break, + Err(e) => { + let _ = sender.send(Err(e)).await; + break; + } + } + } + }); + + Box::pin(ReceiverStream::new(receiver)) } /// Fetch a batch from the reader and convert it into a record batch. @@ -211,7 +293,9 @@ impl SeqScan { return Ok(None); }; + let convert_start = Instant::now(); let record_batch = mapper.convert(&batch, cache)?; + metrics.convert_cost += convert_start.elapsed(); metrics.scan_cost += start.elapsed(); Ok(Some(record_batch)) @@ -221,8 +305,12 @@ impl SeqScan { /// Metrics for [SeqScan]. #[derive(Debug, Default)] struct Metrics { + /// Duration to build the reader. + build_reader_cost: Duration, /// Duration to scan data. scan_cost: Duration, + /// Duration to convert batches. + convert_cost: Duration, } #[cfg(test)] diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 6804577d02af..8ab5394773ca 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -196,6 +196,15 @@ impl TestEnv { ) } + /// Open the engine. + pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine { + MitoEngine::new( + config, + self.logstore.clone().unwrap(), + self.object_store_manager.clone().unwrap(), + ) + } + /// Only initializes the object store manager, returns the default object store. pub fn init_object_store_manager(&mut self) -> ObjectStore { self.object_store_manager = Some(Arc::new(self.create_object_store_manager())); @@ -206,7 +215,11 @@ impl TestEnv { pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; - WorkerGroup::start(config, Arc::new(log_store), Arc::new(object_store_manager)) + WorkerGroup::start( + Arc::new(config), + Arc::new(log_store), + Arc::new(object_store_manager), + ) } async fn create_log_and_object_store_manager( diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2ffc86d48f1e..750e29b56751 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -111,11 +111,10 @@ impl WorkerGroup { /// /// The number of workers should be power of two. pub(crate) fn start( - config: MitoConfig, + config: Arc, log_store: Arc, object_store_manager: ObjectStoreManagerRef, ) -> WorkerGroup { - let config = Arc::new(config); let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( config.global_write_buffer_size.as_bytes() as usize, )); @@ -205,13 +204,12 @@ impl WorkerGroup { /// /// The number of workers should be power of two. pub(crate) fn start_for_test( - config: MitoConfig, + config: Arc, log_store: Arc, object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, ) -> WorkerGroup { - let config = Arc::new(config); let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| { Arc::new(WriteBufferManagerImpl::new( config.global_write_buffer_size.as_bytes() as usize, diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index ecffc1b95ed6..2e61d7dd0a55 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -72,4 +72,4 @@ stats-cli = "3.0" store-api.workspace = true streaming-stats = "0.2" table = { workspace = true, features = ["testing"] } -tokio-stream = "0.1" +tokio-stream.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 4d4078c5d1a8..1a8f8d5e3956 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -86,7 +86,7 @@ sql.workspace = true strum.workspace = true table.workspace = true tokio-rustls = "0.24" -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { workspace = true, features = ["net"] } tokio.workspace = true tonic-reflection = "0.10" tonic.workspace = true diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 07f1b903c3c4..c687dd36d07a 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -717,7 +717,6 @@ providers = [] [[datanode.region_engine]] [datanode.region_engine.mito] -num_workers = {} worker_channel_size = 128 worker_request_batch_size = 64 manifest_checkpoint_distance = 10 @@ -730,6 +729,7 @@ sst_meta_cache_size = "128MiB" vector_cache_size = "512MiB" page_cache_size = "512MiB" sst_write_buffer_size = "8MiB" +parallel_scan_channel_size = 32 [[datanode.region_engine]] @@ -741,27 +741,38 @@ enable_otlp_tracing = false [logging] enable_otlp_tracing = false"#, store_type, - num_cpus::get() / 2 ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); assert_eq!(body_text, expected_toml_str); } fn drop_lines_with_inconsistent_results(input: String) -> String { + let inconsistent_results = [ + "dir =", + "data_home =", + "bucket =", + "root =", + "endpoint =", + "region =", + "cache_path =", + "cache_capacity =", + "sas_token =", + "scope =", + "num_workers =", + "scan_parallelism =", + ]; + input .lines() .filter(|line| { // ignores - !line.trim().starts_with("dir =") - && !line.trim().starts_with("data_home =") - && !line.trim().starts_with("bucket =") - && !line.trim().starts_with("root =") - && !line.trim().starts_with("endpoint =") - && !line.trim().starts_with("region =") - && !line.trim().starts_with("cache_path =") - && !line.trim().starts_with("cache_capacity =") - && !line.trim().starts_with("sas_token =") - && !line.trim().starts_with("scope =") + let line = line.trim(); + for prefix in inconsistent_results { + if line.starts_with(prefix) { + return false; + } + } + true }) .collect::>() .join(