diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs
index 0fd907e371673..7c56788f85ae0 100644
--- a/src/batch/src/executor/s3_file_scan.rs
+++ b/src/batch/src/executor/s3_file_scan.rs
@@ -18,7 +18,7 @@ use futures_util::stream::StreamExt;
 use parquet::arrow::ProjectionMask;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::catalog::Schema;
-use risingwave_connector::source::iceberg::arrow_file_reader::create_parquet_stream_builder;
+use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder;
 
 use crate::error::BatchError;
 use crate::executor::{DataChunk, Executor};
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 2c8be26c05b33..86491ae464a3f 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -93,6 +93,7 @@ opendal = { version = "0.47", features = [
 ] }
 openssl = "0.10"
 parking_lot = { workspace = true }
+parquet = { workspace = true }
 paste = "1"
 pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
 postgres-openssl = "0.5.0"
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 51dffd1873826..d1d79d8e3cba3 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod arrow_file_reader;
+pub mod parquet_file_reader;
 
 use std::collections::HashMap;
 
 use anyhow::anyhow;
-pub use arrow_file_reader::*;
+pub use parquet_file_reader::*;
 use async_trait::async_trait;
 use futures::StreamExt;
 use iceberg::spec::{DataContentType, ManifestList};
diff --git a/src/connector/src/source/iceberg/arrow_file_reader.rs b/src/connector/src/source/iceberg/parquet_file_reader.rs
similarity index 87%
rename from src/connector/src/source/iceberg/arrow_file_reader.rs
rename to src/connector/src/source/iceberg/parquet_file_reader.rs
index e66d84f7ad9e5..d61eab039da09 100644
--- a/src/connector/src/source/iceberg/arrow_file_reader.rs
+++ b/src/connector/src/source/iceberg/parquet_file_reader.rs
@@ -27,18 +27,18 @@ use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::file::metadata::ParquetMetaData;
 
-pub struct ArrowFileReader<R: FileRead> {
+pub struct ParquetFileReader<R: FileRead> {
     meta: FileMetadata,
     r: R,
 }
 
-impl<R: FileRead> ArrowFileReader<R> {
+impl<R: FileRead> ParquetFileReader<R> {
     pub fn new(meta: FileMetadata, r: R) -> Self {
         Self { meta, r }
     }
 }
 
-impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         Box::pin(
             self.r
@@ -62,7 +62,7 @@ pub async fn create_parquet_stream_builder(
     s3_access_key: String,
     s3_secret_key: String,
     location: String,
-) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead>>, anyhow::Error> {
+) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
     let mut props = HashMap::new();
     props.insert(S3_REGION, s3_region.clone());
     props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
@@ -77,9 +77,9 @@ pub async fn create_parquet_stream_builder(
     let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
     let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
 
-    let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);
 
-    ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
         .await
         .map_err(|e| anyhow!(e))
 }