diff --git a/Cargo.lock b/Cargo.lock
index 9b6912afb43f..b696852c9708 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8914,38 +8914,6 @@ dependencies = [
  "zstd 0.13.0",
 ]
 
-[[package]]
-name = "parquet"
-version = "50.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750"
-dependencies = [
- "ahash 0.8.11",
- "arrow-array 50.0.0",
- "arrow-buffer 50.0.0",
- "arrow-cast 50.0.0",
- "arrow-data 50.0.0",
- "arrow-ipc 50.0.0",
- "arrow-schema 50.0.0",
- "arrow-select 50.0.0",
- "base64 0.21.7",
- "brotli 3.5.0",
- "bytes",
- "chrono",
- "flate2",
- "half 2.3.1",
- "hashbrown 0.14.3",
- "lz4_flex",
- "num",
- "num-bigint",
- "paste",
- "seq-macro",
- "snap",
- "thrift",
- "twox-hash",
- "zstd 0.13.0",
-]
-
 [[package]]
 name = "parquet"
 version = "52.0.0"
@@ -10694,6 +10662,7 @@ dependencies = [
  "memcomparable",
  "opendal",
  "parking_lot 0.12.1",
+ "parquet 52.0.0",
  "paste",
  "prometheus",
  "prost 0.12.1",
@@ -11203,7 +11172,6 @@ dependencies = [
  "opendal",
  "openssl",
  "parking_lot 0.12.1",
- "parquet 50.0.0",
  "paste",
  "pg_bigdecimal",
  "postgres-openssl",
diff --git a/Cargo.toml b/Cargo.toml
index 39a600e374d7..57fda38e8629 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -171,7 +171,7 @@ deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd
 itertools = "0.12.0"
 jsonbb = "0.1.4"
 lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
-parquet = "50"
+parquet = { version = "52", features = ["async"] }
 thiserror-ext = "0.1.2"
 tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
 tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml
index cf94b2ec838a..c3042346aff8 100644
--- a/src/batch/Cargo.toml
+++ b/src/batch/Cargo.toml
@@ -34,6 +34,7 @@ itertools = { workspace = true }
 memcomparable = "0.2"
 opendal = "0.47"
 parking_lot = { workspace = true }
+parquet = { workspace = true }
 paste = "1"
 prometheus = { version = "0.13", features = ["process"] }
 prost = "0.12"
diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs
index 2a555f18c356..9f652fad233c 100644
--- a/src/batch/src/error.rs
+++ b/src/batch/src/error.rs
@@ -17,6 +17,7 @@
 use std::sync::Arc;
 
 pub use anyhow::anyhow;
+use parquet::errors::ParquetError;
 use risingwave_common::array::ArrayError;
 use risingwave_common::error::BoxedError;
 use risingwave_common::util::value_encoding::error::ValueEncodingError;
@@ -119,6 +120,13 @@ pub enum BatchError {
         iceberg::Error,
     ),
 
+    #[error(transparent)]
+    Parquet(
+        #[from]
+        #[backtrace]
+        ParquetError,
+    ),
+
     // Make the ref-counted type to be a variant for easier code structuring.
     // TODO(error-handling): replace with `thiserror_ext::Arc`
     #[error(transparent)]
diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs
index c19bc06c141b..3e2c2a8396a0 100644
--- a/src/batch/src/executor/mod.rs
+++ b/src/batch/src/executor/mod.rs
@@ -33,6 +33,7 @@ mod order_by;
 mod project;
 mod project_set;
 mod row_seq_scan;
+mod s3_file_scan;
 mod sort_agg;
 mod sort_over_window;
 mod source;
diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs
new file mode 100644
index 000000000000..61fc2d5b5dd3
--- /dev/null
+++ b/src/batch/src/executor/s3_file_scan.rs
@@ -0,0 +1,163 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::ops::Range;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use bytes::Bytes;
+use futures_async_stream::try_stream;
+use futures_util::future::BoxFuture;
+use futures_util::stream::StreamExt;
+use futures_util::TryFutureExt;
+use hashbrown::HashMap;
+use iceberg::io::{
+    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
+};
+use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::ParquetMetaData;
+use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::catalog::Schema;
+
+use crate::error::BatchError;
+use crate::executor::{DataChunk, Executor};
+
+#[derive(PartialEq, Debug)]
+pub enum FileFormat {
+    Parquet,
+}
+
+/// S3 file scan executor. Currently only supports the parquet file format.
+pub struct S3FileScanExecutor {
+    file_format: FileFormat,
+    location: String,
+    s3_region: String,
+    s3_access_key: String,
+    s3_secret_key: String,
+    batch_size: usize,
+    schema: Schema,
+    identity: String,
+}
+
+impl Executor for S3FileScanExecutor {
+    fn schema(&self) -> &risingwave_common::catalog::Schema {
+        &self.schema
+    }
+
+    fn identity(&self) -> &str {
+        &self.identity
+    }
+
+    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
+        self.do_execute().boxed()
+    }
+}
+
+impl S3FileScanExecutor {
+    #![expect(dead_code)]
+    pub fn new(
+        file_format: FileFormat,
+        location: String,
+        s3_region: String,
+        s3_access_key: String,
+        s3_secret_key: String,
+        batch_size: usize,
+        schema: Schema,
+        identity: String,
+    ) -> Self {
+        Self {
+            file_format,
+            location,
+            s3_region,
+            s3_access_key,
+            s3_secret_key,
+            batch_size,
+            schema,
+            identity,
+        }
+    }
+
+    #[try_stream(ok = DataChunk, error = BatchError)]
+    async fn do_execute(self: Box<Self>) {
+        assert_eq!(self.file_format, FileFormat::Parquet);
+
+        let mut props = HashMap::new();
+        props.insert(S3_REGION, self.s3_region.clone());
+        props.insert(S3_ACCESS_KEY_ID, self.s3_access_key.clone());
+        props.insert(S3_SECRET_ACCESS_KEY, self.s3_secret_key.clone());
+
+        let file_io_builder = FileIOBuilder::new("s3");
+        let file_io = file_io_builder.with_props(props.into_iter()).build()?;
+        let parquet_file = file_io.new_input(&self.location)?;
+
+        let parquet_metadata = parquet_file.metadata().await?;
+        let parquet_reader = parquet_file.reader().await?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+            .await
+            .map_err(|e| anyhow!(e))?;
+
+        let arrow_schema = batch_stream_builder.schema();
+        assert_eq!(arrow_schema.fields.len(), self.schema.fields.len());
+        for (field, arrow_field) in self.schema.fields.iter().zip(arrow_schema.fields.iter()) {
+            assert_eq!(*field.name, *arrow_field.name());
+        }
+
+        batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all());
+
+        batch_stream_builder = batch_stream_builder.with_batch_size(self.batch_size);
+
+        let record_batch_stream = batch_stream_builder.build().map_err(|e| anyhow!(e))?;
+
+        #[for_await]
+        for record_batch in record_batch_stream {
+            let record_batch = record_batch.map_err(BatchError::Parquet)?;
+            let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+            debug_assert_eq!(chunk.data_types(), self.schema.data_types());
+            yield chunk;
+        }
+    }
+}
+
+struct ArrowFileReader<R: FileRead> {
+    meta: FileMetadata,
+    r: R,
+}
+
+impl<R: FileRead> ArrowFileReader<R> {
+    fn new(meta: FileMetadata, r: R) -> Self {
+        Self { meta, r }
+    }
+}
+
+impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        Box::pin(
+            self.r
+                .read(range.start as _..range.end as _)
+                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
+        )
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let file_size = self.meta.size;
+            let mut loader = MetadataLoader::load(self, file_size as usize, None).await?;
+            loader.load_page_index(false, false).await?;
+            Ok(Arc::new(loader.finish()))
+        })
+    }
+}
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 86491ae464a3..2c8be26c05b3 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -93,7 +93,6 @@ opendal = { version = "0.47", features = [
 ] }
 openssl = "0.10"
 parking_lot = { workspace = true }
-parquet = { workspace = true }
 paste = "1"
 pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
 postgres-openssl = "0.5.0"
diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs
index 26a311f26eb8..78c6ebf4cd8c 100644
--- a/src/connector/src/source/filesystem/opendal_source/mod.rs
+++ b/src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -26,7 +26,7 @@ pub mod opendal_reader;
 use self::opendal_enumerator::OpendalEnumerator;
 use self::opendal_reader::OpendalReader;
 use super::file_common::CompressionFormat;
-use super::s3::S3PropertiesCommon;
+pub use super::s3::S3PropertiesCommon;
 use super::OpendalFsSplit;
 use crate::error::ConnectorResult;
 use crate::source::{SourceProperties, UnknownFields};
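
Usage sketch (not part of the patch): the new `S3FileScanExecutor` is built like any other batch executor and drained as a stream of `DataChunk`s. This assumes the executor and `FileFormat` are reachable from the call site (the patch keeps `mod s3_file_scan` private and marks the constructor `dead_code` for now); the bucket location, region, credentials, and batch size below are placeholder values.

use futures_util::StreamExt;
use risingwave_common::catalog::Schema;

use crate::error::BatchError;
use crate::executor::{Executor, FileFormat, S3FileScanExecutor};

// Hypothetical driver: scan one parquet file from S3 and count its rows.
async fn scan_parquet_example(schema: Schema) -> Result<u64, BatchError> {
    let executor = Box::new(S3FileScanExecutor::new(
        FileFormat::Parquet,
        "s3://my-bucket/data/file.parquet".to_owned(), // placeholder location
        "us-east-1".to_owned(),                        // placeholder region
        "my-access-key".to_owned(),                    // placeholder credentials
        "my-secret-key".to_owned(),
        1024, // max rows per emitted DataChunk
        schema,
        "S3FileScanExecutor".to_owned(),
    ));

    // `execute` consumes the boxed executor and yields Result<DataChunk, BatchError>.
    let mut stream = executor.execute();
    let mut rows = 0;
    while let Some(chunk) = stream.next().await {
        rows += chunk?.cardinality() as u64;
    }
    Ok(rows)
}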