diff --git a/src/batch/executors/src/executor/s3_file_scan.rs b/src/batch/executors/src/executor/s3_file_scan.rs
index 82a80a17dc74..8140011dfcfc 100644
--- a/src/batch/executors/src/executor/s3_file_scan.rs
+++ b/src/batch/executors/src/executor/s3_file_scan.rs
@@ -16,7 +16,9 @@ use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{Field, Schema};
-use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file};
+use risingwave_connector::source::iceberg::{
+    extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
+};
 use risingwave_pb::batch_plan::file_scan_node;
 use risingwave_pb::batch_plan::file_scan_node::StorageType;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -82,13 +84,15 @@ impl S3FileScanExecutor {
     async fn do_execute(self: Box<Self>) {
         assert_eq!(self.file_format, FileFormat::Parquet);
         for file in self.file_location {
+            let (bucket, file_name) = extract_bucket_and_file_name(&file)?;
             let op = new_s3_operator(
                 self.s3_region.clone(),
                 self.s3_access_key.clone(),
                 self.s3_secret_key.clone(),
-                file.clone(),
+                bucket.clone(),
             )?;
-            let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?;
+            let chunk_stream =
+                read_parquet_file(op, file_name, None, None, self.batch_size, 0).await?;
             #[for_await]
             for stream_chunk in chunk_stream {
                 let stream_chunk = stream_chunk?;
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
index 71c82b7ee579..49b6d9a42527 100644
--- a/src/connector/src/source/iceberg/parquet_file_handler.rs
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -108,10 +108,9 @@ pub fn new_s3_operator(
     s3_region: String,
     s3_access_key: String,
     s3_secret_key: String,
-    location: String,
+    bucket: String,
 ) -> ConnectorResult<Operator> {
     // Create s3 builder.
-    let bucket = extract_bucket(&location);
     let mut builder = S3::default().bucket(&bucket).region(&s3_region);
     builder = builder.secret_access_key(&s3_access_key);
     builder = builder.secret_access_key(&s3_secret_key);
@@ -120,8 +119,6 @@ pub fn new_s3_operator(
         bucket, s3_region
     ));
-    builder = builder.disable_config_load();
-
     let op: Operator = Operator::new(builder)?
         .layer(LoggingLayer::default())
         .layer(RetryLayer::default())
@@ -130,13 +127,20 @@ pub fn new_s3_operator(
     Ok(op)
 }
 
-fn extract_bucket(location: &str) -> String {
-    let prefix = "s3://";
-    let start = prefix.len();
-    let end = location[start..]
-        .find('/')
-        .unwrap_or(location.len() - start);
-    location[start..start + end].to_string()
+pub fn extract_bucket_and_file_name(location: &str) -> ConnectorResult<(String, String)> {
+    let url = Url::parse(location)?;
+    let bucket = url
+        .host_str()
+        .ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid s3 url: {}, missing bucket", location),
+            )
+        })?
+        .to_owned();
+    let prefix = format!("s3://{}/", bucket);
+    let file_name = location[prefix.len()..].to_string();
+    Ok((bucket, file_name))
 }
 
 pub async fn list_s3_directory(
@@ -145,14 +149,7 @@ pub async fn list_s3_directory(
     s3_region: String,
     s3_secret_key: String,
     dir: String,
 ) -> Result<Vec<String>, anyhow::Error> {
-    let url = Url::parse(&dir)?;
-    let bucket = url.host_str().ok_or_else(|| {
-        Error::new(
-            ErrorKind::DataInvalid,
-            format!("Invalid s3 url: {}, missing bucket", dir),
-        )
-    })?;
-
+    let (bucket, file_name) = extract_bucket_and_file_name(&dir)?;
     let prefix = format!("s3://{}/", bucket);
     if dir.starts_with(&prefix) {
         let mut builder = S3::default();
@@ -160,12 +157,16 @@ pub async fn list_s3_directory(
             .region(&s3_region)
             .access_key_id(&s3_access_key)
             .secret_access_key(&s3_secret_key)
-            .bucket(bucket);
+            .bucket(&bucket);
+        builder = builder.endpoint(&format!(
+            "https://{}.s3.{}.amazonaws.com",
+            bucket, s3_region
+        ));
         let op = Operator::new(builder)?
             .layer(RetryLayer::default())
             .finish();
 
-        op.list(&dir[prefix.len()..])
+        op.list(&file_name)
             .await
             .map_err(|e| anyhow!(e))
             .map(|list| {
@@ -197,44 +198,39 @@ pub async fn list_s3_directory(
 /// Parquet file schema that match the requested schema. If an error occurs during processing,
 /// it returns an appropriate error.
 pub fn extract_valid_column_indices(
-    columns: Option<Vec<Column>>,
+    rw_columns: Vec<Column>,
     metadata: &FileMetaData,
 ) -> ConnectorResult<Vec<usize>> {
-    match columns {
-        Some(rw_columns) => {
-            let parquet_column_names = metadata
-                .schema_descr()
-                .columns()
-                .iter()
-                .map(|c| c.name())
-                .collect_vec();
+    let parquet_column_names = metadata
+        .schema_descr()
+        .columns()
+        .iter()
+        .map(|c| c.name())
+        .collect_vec();
 
-            let converted_arrow_schema =
-                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
-                    .map_err(anyhow::Error::from)?;
+    let converted_arrow_schema =
+        parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
+            .map_err(anyhow::Error::from)?;
 
-            let valid_column_indices: Vec<usize> = rw_columns
-                .iter()
-                .filter_map(|column| {
-                    parquet_column_names
-                        .iter()
-                        .position(|&name| name == column.name)
-                        .and_then(|pos| {
-                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type();
-                            let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
+    let valid_column_indices: Vec<usize> = rw_columns
+        .iter()
+        .filter_map(|column| {
+            parquet_column_names
+                .iter()
+                .position(|&name| name == column.name)
+                .and_then(|pos| {
+                    let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type();
+                    let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
 
-                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
-                                Some(pos)
-                            } else {
-                                None
-                            }
-                        })
-                })
-                .collect();
-            Ok(valid_column_indices)
-        }
-        None => Ok(vec![]),
-    }
+                    if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
+                        Some(pos)
+                    } else {
+                        None
+                    }
+                })
+        })
+        .collect();
+    Ok(valid_column_indices)
 }
 
 /// Reads a specified Parquet file and converts its content into a stream of chunks.
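Reviewer note, not part of the patch: extract_valid_column_indices now assumes the caller has already unwrapped the optional column list; the None case moves into read_parquet_file (next hunk). The name-to-position matching itself can be sketched without the parquet crate. The Column type and the type-compatibility check below are illustrative stand-ins, not the connector's real definitions.

/// Illustrative stand-in for the connector's Column type.
struct Column {
    name: String,
}

/// Stand-in for is_parquet_schema_match_source_schema: the real code compares
/// the Arrow type derived from the Parquet schema with the RisingWave data type.
fn types_match(_parquet_col: &str, _rw_col: &Column) -> bool {
    true
}

/// Map requested columns to their positions in the Parquet schema, skipping
/// columns that are missing or type-incompatible, as extract_valid_column_indices does.
fn valid_column_indices(rw_columns: &[Column], parquet_column_names: &[&str]) -> Vec<usize> {
    rw_columns
        .iter()
        .filter_map(|column| {
            parquet_column_names
                .iter()
                .position(|&name| name == column.name)
                .filter(|&pos| types_match(parquet_column_names[pos], column))
        })
        .collect()
}

fn main() {
    let requested = vec![
        Column { name: "id".into() },
        Column { name: "missing".into() },
        Column { name: "name".into() },
    ];
    let parquet_cols = ["id", "name", "ts"];
    // "missing" is skipped silently; "id" -> 0, "name" -> 1.
    assert_eq!(valid_column_indices(&requested, &parquet_cols), vec![0, 1]);
}

As in the original code, a requested column that is absent from the file is dropped from the projection rather than raised as an error; the refactor preserves that behavior and only changes who handles the "no columns requested" case.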
@@ -258,8 +254,14 @@ pub async fn read_parquet_file(
     let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
 
     let file_metadata = parquet_metadata.file_metadata();
-    let column_indices = extract_valid_column_indices(rw_columns, file_metadata)?;
-    let projection_mask = ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);
+    let projection_mask = match rw_columns {
+        Some(columns) => {
+            let column_indices = extract_valid_column_indices(columns, file_metadata)?;
+            ProjectionMask::leaves(file_metadata.schema_descr(), column_indices)
+        }
+        None => ProjectionMask::all(),
+    };
+
     // For the Parquet format, we directly convert from a record batch to a stream chunk.
     // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file.
     let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
@@ -289,7 +291,6 @@
             })
             .collect(),
     };
-
     let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
     let msg_stream: Pin<
         Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs
index b3cf4e286096..d49b4332b117 100644
--- a/src/frontend/src/expr/table_function.rs
+++ b/src/frontend/src/expr/table_function.rs
@@ -21,7 +21,7 @@ use mysql_async::prelude::*;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::types::{DataType, ScalarImpl, StructType};
 use risingwave_connector::source::iceberg::{
-    get_parquet_fields, list_s3_directory, new_s3_operator,
+    extract_bucket_and_file_name, get_parquet_fields, list_s3_directory, new_s3_operator,
 };
 pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
 use risingwave_pb::expr::PbTableFunction;
@@ -177,23 +177,19 @@ impl TableFunction {
 
         let schema = tokio::task::block_in_place(|| {
             FRONTEND_RUNTIME.block_on(async {
+                let location = match files.as_ref() {
+                    Some(files) => files[0].clone(),
+                    None => eval_args[5].clone(),
+                };
+                let (bucket, file_name) = extract_bucket_and_file_name(&location)?;
                 let op = new_s3_operator(
                     eval_args[2].clone(),
                     eval_args[3].clone(),
                     eval_args[4].clone(),
-                    match files.as_ref() {
-                        Some(files) => files[0].clone(),
-                        None => eval_args[5].clone(),
-                    },
+                    bucket.clone(),
                 )?;
-                let fields = get_parquet_fields(
-                    op,
-                    match files.as_ref() {
-                        Some(files) => files[0].clone(),
-                        None => eval_args[5].clone(),
-                    },
-                )
-                .await?;
+
+                let fields = get_parquet_fields(op, file_name).await?;
 
                 let mut rw_types = vec![];
                 for field in &fields {
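Reviewer note, not part of the patch: both the batch executor and the frontend now split the S3 URL once via extract_bucket_and_file_name, passing the bucket to new_s3_operator and the object key to the read path. A minimal standalone sketch of that split, assuming only the url crate and simplified String errors instead of ConnectorResult, is:

use url::Url;

/// Illustrative stand-in for the connector helper: split "s3://bucket/key"
/// into (bucket, key). The error type is simplified for the sketch.
fn split_s3_location(location: &str) -> Result<(String, String), String> {
    let url = Url::parse(location).map_err(|e| e.to_string())?;
    let bucket = url
        .host_str()
        .ok_or_else(|| format!("Invalid s3 url: {}, missing bucket", location))?
        .to_owned();
    // Everything after "s3://<bucket>/" is treated as the object key,
    // mirroring the prefix-stripping done in the patch.
    let prefix = format!("s3://{}/", bucket);
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

fn main() {
    let (bucket, key) = split_s3_location("s3://my-bucket/dir/data.parquet").unwrap();
    assert_eq!(bucket, "my-bucket");
    assert_eq!(key, "dir/data.parquet");
}

One caveat visible in the sketch: a location with no key at all (for example "s3://my-bucket" with no trailing slash) makes the prefix slice go out of bounds, so callers are expected to pass full object or directory URLs, which is what the executor and the table function do here.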