From a7349b573b63fae2c590889938c616a7bd69daad Mon Sep 17 00:00:00 2001 From: Wei <15172118655@163.com> Date: Fri, 22 Dec 2023 16:47:55 +0800 Subject: [PATCH 1/2] feat: support fetch ranges in concurrent (#2959) * feat: concurrent fetch ranges * chore: cr comment * chore: cr comment * chore: clippy --- src/mito2/src/sst/parquet/reader.rs | 12 +-- src/mito2/src/sst/parquet/row_group.rs | 108 +++++++++++++++++++++++-- 2 files changed, 109 insertions(+), 11 deletions(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 45e36786d41d..0882ef82c7e3 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use common_telemetry::debug; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; -use object_store::{ObjectStore, Reader}; +use object_store::ObjectStore; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; @@ -178,7 +178,7 @@ impl ParquetReaderBuilder { file_handle: self.file_handle.clone(), file_path, parquet_meta, - file_reader: reader, + object_store: self.object_store.clone(), projection: projection_mask, field_levels, cache_manager: self.cache_manager.clone(), @@ -285,8 +285,8 @@ struct RowGroupReaderBuilder { file_path: String, /// Metadata of the parquet file. parquet_meta: Arc, - /// Reader to get data. - file_reader: BufReader, + /// Object store as an Operator. + object_store: ObjectStore, /// Projection mask. projection: ProjectionMask, /// Field levels to read. @@ -309,10 +309,12 @@ impl RowGroupReaderBuilder { &self.parquet_meta, row_group_idx, self.cache_manager.clone(), + &self.file_path, + self.object_store.clone(), ); // Fetches data into memory. row_group - .fetch(&mut self.file_reader, &self.projection, None) + .fetch(&self.projection, None) .await .context(ReadParquetSnafu { path: &self.file_path, diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 827db8999ae8..b24413e43f69 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -14,11 +14,12 @@ //! Ports private structs from [parquet crate](https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/arrow/async_reader/mod.rs#L644-L650). +use std::ops::Range; use std::sync::Arc; use bytes::{Buf, Bytes}; +use object_store::ObjectStore; use parquet::arrow::arrow_reader::{RowGroups, RowSelection}; -use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::ProjectionMask; use parquet::column::page::{PageIterator, PageReader}; use parquet::errors::{ParquetError, Result}; @@ -46,6 +47,9 @@ pub struct InMemoryRowGroup<'a> { /// /// `column_cached_pages.len()` equals to `column_chunks.len()`. column_cached_pages: Vec>>, + file_path: &'a str, + /// Object store. + object_store: ObjectStore, } impl<'a> InMemoryRowGroup<'a> { @@ -59,6 +63,8 @@ impl<'a> InMemoryRowGroup<'a> { parquet_meta: &'a ParquetMetaData, row_group_idx: usize, cache_manager: Option, + file_path: &'a str, + object_store: ObjectStore, ) -> Self { let metadata = parquet_meta.row_group(row_group_idx); // `page_locations` is always `None` if we don't set @@ -78,13 +84,14 @@ impl<'a> InMemoryRowGroup<'a> { row_group_idx, cache_manager, column_cached_pages: vec![None; metadata.columns().len()], + file_path, + object_store, } } /// Fetches the necessary column data into memory - pub async fn fetch( + pub async fn fetch( &mut self, - input: &mut T, projection: &ProjectionMask, selection: Option<&RowSelection>, ) -> Result<()> { @@ -93,7 +100,7 @@ impl<'a> InMemoryRowGroup<'a> { // `RowSelection` let mut page_start_offsets: Vec> = vec![]; - let fetch_ranges = self + let fetch_ranges: Vec<_> = self .column_chunks .iter() .zip(self.metadata.columns()) @@ -119,8 +126,11 @@ impl<'a> InMemoryRowGroup<'a> { ranges }) .collect(); + let mut chunk_data = + fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges) + .await? + .into_iter(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); let mut page_start_offsets = page_start_offsets.into_iter(); for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { @@ -165,7 +175,10 @@ impl<'a> InMemoryRowGroup<'a> { return Ok(()); } - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = + fetch_byte_ranges(self.file_path, self.object_store.clone(), fetch_ranges) + .await? + .into_iter(); for (idx, (chunk, cached_pages)) in self .column_chunks @@ -336,3 +349,86 @@ impl Iterator for ColumnChunkIterator { } impl PageIterator for ColumnChunkIterator {} + +/// Fetches data from object store. +/// If the object store supports blocking, use sequence blocking read. +/// Otherwise, use concurrent read. +async fn fetch_byte_ranges( + file_path: &str, + object_store: ObjectStore, + ranges: Vec>, +) -> Result> { + let ranges: Vec<_> = ranges + .iter() + .map(|range| range.start as u64..range.end as u64) + .collect(); + if object_store.info().full_capability().blocking { + fetch_ranges_seq(file_path, object_store, ranges).await + } else { + fetch_ranges_concurrent(file_path, object_store, ranges).await + } +} + +/// Fetches data from object store sequentially +async fn fetch_ranges_seq( + file_path: &str, + object_store: ObjectStore, + ranges: Vec>, +) -> Result> { + let block_object_store = object_store.blocking(); + let file_path = file_path.to_string(); + + let f = move || -> Result> { + ranges + .into_iter() + .map(|range| { + let data = block_object_store + .read_with(&file_path) + .range(range) + .call() + .map_err(|e| ParquetError::External(Box::new(e)))?; + Ok::<_, ParquetError>(Bytes::from(data)) + }) + .collect::>>() + }; + + maybe_spawn_blocking(f).await +} + +/// Fetches data from object store concurrently. +async fn fetch_ranges_concurrent( + file_path: &str, + object_store: ObjectStore, + ranges: Vec>, +) -> Result> { + // TODO(QuenKar): may merge small ranges to a bigger range to optimize. + let mut handles = Vec::with_capacity(ranges.len()); + for range in ranges { + let future_read = object_store.read_with(file_path); + handles.push(async move { + let data = future_read + .range(range.start..range.end) + .await + .map_err(|e| ParquetError::External(Box::new(e)))?; + Ok::<_, ParquetError>(Bytes::from(data)) + }); + } + let results = futures::future::try_join_all(handles).await?; + Ok(results) +} + +// Port from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83 +/// Takes a function and spawns it to a tokio blocking pool if available +pub async fn maybe_spawn_blocking(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + match tokio::runtime::Handle::try_current() { + Ok(runtime) => runtime + .spawn_blocking(f) + .await + .map_err(|e| ParquetError::External(Box::new(e)))?, + Err(_) => f(), + } +} From 7d509e97f6d421234847732e8af00dece80c97c1 Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 17:13:18 +0800 Subject: [PATCH 2/2] chore: move some commonly referenced crates to workspace Cargo.toml (#2981) fix: resolve conflicts --- Cargo.lock | 4 ++++ Cargo.toml | 2 ++ src/catalog/Cargo.toml | 4 ++-- src/common/config/Cargo.toml | 2 +- src/common/decimal/Cargo.toml | 2 +- src/common/grpc/Cargo.toml | 2 +- src/common/meta/Cargo.toml | 2 +- src/common/procedure/Cargo.toml | 2 +- src/common/time/Cargo.toml | 2 +- src/datanode/Cargo.toml | 4 ++-- src/datatypes/Cargo.toml | 2 +- src/file-engine/Cargo.toml | 2 +- src/frontend/Cargo.toml | 2 +- src/log-store/Cargo.toml | 4 ++++ src/meta-srv/Cargo.toml | 4 ++-- src/mito2/Cargo.toml | 2 +- src/operator/Cargo.toml | 2 +- src/partition/Cargo.toml | 2 +- src/query/Cargo.toml | 2 +- src/servers/Cargo.toml | 4 ++-- src/store-api/Cargo.toml | 2 +- tests-integration/Cargo.toml | 2 +- 22 files changed, 33 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b426112a4ce6..89e77773b2e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4476,12 +4476,16 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "dashmap", "futures", "futures-util", "protobuf", "protobuf-build", "raft-engine", "rand", + "rskafka", + "serde", + "serde_json", "snafu", "store-api", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 87985d74935b..9d6508b0d569 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ bitflags = "2.4.1" bytemuck = "1.12" bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } +dashmap = "5.4" datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } @@ -116,6 +117,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls-native-roots", "stream", ] } +rskafka = "0.5" rust_decimal = "1.33" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index cb37e048b0c8..79f3603f4f19 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -24,7 +24,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion.workspace = true datatypes.workspace = true futures = "0.3" @@ -38,7 +38,7 @@ paste = "1.0" prometheus.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true session.workspace = true snafu.workspace = true store-api.workspace = true diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index d43626b7e960..ce779d904313 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] common-base.workspace = true humantime-serde.workspace = true -rskafka = "0.5" +rskafka.workspace = true serde.workspace = true serde_json.workspace = true serde_with = "3" diff --git a/src/common/decimal/Cargo.toml b/src/common/decimal/Cargo.toml index dd0ba90c440f..adf9b08446a8 100644 --- a/src/common/decimal/Cargo.toml +++ b/src/common/decimal/Cargo.toml @@ -11,5 +11,5 @@ common-error.workspace = true common-macro.workspace = true rust_decimal.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index f323af866079..9c71d5786039 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -16,7 +16,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion.workspace = true datatypes.workspace = true flatbuffers = "23.1" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 6f95109196d8..5a15581f41c6 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -34,7 +34,7 @@ prometheus.workspace = true prost.workspace = true rand.workspace = true regex.workspace = true -rskafka = "0.5" +rskafka.workspace = true serde.workspace = true serde_json.workspace = true serde_with = "3" diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index ece649434b88..795df7eea9ea 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -19,7 +19,7 @@ futures.workspace = true humantime-serde.workspace = true object-store.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true smallvec.workspace = true snafu.workspace = true tokio.workspace = true diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index 23be6519f4c2..5bdba94a7e9b 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -11,7 +11,7 @@ chrono.workspace = true common-error.workspace = true common-macro.workspace = true serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true [dev-dependencies] diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index ee568a3c0602..85afd709e4b8 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -32,7 +32,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datafusion.workspace = true @@ -55,7 +55,7 @@ query.workspace = true reqwest.workspace = true secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 79ec3099367b..c7674d9973ae 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -26,5 +26,5 @@ ordered-float = { version = "3.0", features = ["serde"] } paste = "1.0" rust_decimal = "1.32.0" serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml index 514a064508b4..f0938c545bad 100644 --- a/src/file-engine/Cargo.toml +++ b/src/file-engine/Cargo.toml @@ -26,7 +26,7 @@ datatypes.workspace = true futures.workspace = true object-store.workspace = true serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true store-api.workspace = true table.workspace = true diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a95b557961ce..6df16182511c 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -62,7 +62,7 @@ raft-engine.workspace = true regex.workspace = true script = { workspace = true, features = ["python"], optional = true } serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index b3ce74640a21..7941cd11e918 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -21,10 +21,14 @@ common-macro.workspace = true common-meta.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +dashmap.workspace = true futures-util.workspace = true futures.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true +rskafka.workspace = true +serde.workspace = true +serde_json.workspace = true snafu.workspace = true store-api.workspace = true tokio-util.workspace = true diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 2df19d8bea0e..01450f861d11 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -26,7 +26,7 @@ common-procedure.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true @@ -43,7 +43,7 @@ prost.workspace = true rand.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true snafu.workspace = true store-api.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0ee17bb85861..8c3ef50ec2c7 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -32,7 +32,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-test-util = { workspace = true, optional = true } common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datafusion.workspace = true diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index a0d627b872d9..a2de8e0102ae 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -43,7 +43,7 @@ prometheus.workspace = true query.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index 9fd2321f178e..cf9ba0a16f2f 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -22,7 +22,7 @@ meta-client.workspace = true moka = { workspace = true, features = ["future"] } prometheus.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true store-api.workspace = true table.workspace = true diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 2e61d7dd0a55..24010dd94011 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -47,7 +47,7 @@ promql-parser = "0.1.1" promql.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true session.workspace = true snafu.workspace = true sql.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 45621a26e170..e81dfaf593c8 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -80,7 +80,7 @@ rustls-pki-types = "1.0" schemars = "0.8" secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true session.workspace = true sha1 = "0.10" snafu.workspace = true @@ -112,7 +112,7 @@ mysql_async = { version = "0.33", default-features = false, features = [ ] } rand.workspace = true script = { workspace = true, features = ["python"] } -serde_json = "1.0" +serde_json.workspace = true session = { workspace = true, features = ["testing"] } table.workspace = true tokio-postgres = "0.7" diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 293302dcfde5..31b140ef7bdd 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -26,5 +26,5 @@ strum.workspace = true [dev-dependencies] async-stream.workspace = true -serde_json = "1.0" +serde_json.workspace = true tokio.workspace = true diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 20812d397fba..1ba6f8e05d36 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -48,7 +48,7 @@ rstest = "0.17" rstest_reuse = "0.5" secrecy = "0.8" serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers = { workspace = true, features = ["testing"] } session.workspace = true snafu.workspace = true