-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(batch): support file scan a directory of parquet files #17811
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,9 +23,14 @@ use futures::TryFutureExt; | |
use iceberg::io::{ | ||
FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, | ||
}; | ||
use iceberg::{Error, ErrorKind}; | ||
use opendal::layers::RetryLayer; | ||
use opendal::services::S3; | ||
use opendal::Operator; | ||
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; | ||
use parquet::arrow::ParquetRecordBatchStreamBuilder; | ||
use parquet::file::metadata::ParquetMetaData; | ||
use url::Url; | ||
|
||
pub struct ParquetFileReader<R: FileRead> { | ||
meta: FileMetadata, | ||
|
@@ -83,3 +88,45 @@ pub async fn create_parquet_stream_builder( | |
.await | ||
.map_err(|e| anyhow!(e)) | ||
} | ||
|
||
pub async fn list_s3_directory( | ||
s3_region: String, | ||
s3_access_key: String, | ||
s3_secret_key: String, | ||
dir: String, | ||
) -> Result<Vec<String>, anyhow::Error> { | ||
let url = Url::parse(&dir)?; | ||
let bucket = url.host_str().ok_or_else(|| { | ||
Error::new( | ||
ErrorKind::DataInvalid, | ||
format!("Invalid s3 url: {}, missing bucket", dir), | ||
) | ||
})?; | ||
|
||
let prefix = format!("s3://{}/", bucket); | ||
if dir.starts_with(&prefix) { | ||
let mut builder = S3::default(); | ||
builder | ||
.region(&s3_region) | ||
.access_key_id(&s3_access_key) | ||
.secret_access_key(&s3_secret_key) | ||
.bucket(bucket); | ||
let op = Operator::new(builder)? | ||
.layer(RetryLayer::default()) | ||
.finish(); | ||
|
||
op.list(&dir[prefix.len()..]) | ||
.await | ||
.map_err(|e| anyhow!(e)) | ||
.map(|list| { | ||
list.into_iter() | ||
.map(|entry| prefix.to_string() + entry.path()) | ||
.collect() | ||
}) | ||
} else { | ||
Err(Error::new( | ||
ErrorKind::DataInvalid, | ||
format!("Invalid s3 url: {}, should start with {}", dir, prefix), | ||
))? | ||
Comment on lines
+127
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why it's not an assertion? Because the code says the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we need to ensure the URL prefix is |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,12 +17,12 @@ use std::sync::{Arc, LazyLock}; | |
use itertools::Itertools; | ||
use risingwave_common::array::arrow::IcebergArrowConvert; | ||
use risingwave_common::types::{DataType, ScalarImpl, StructType}; | ||
use risingwave_connector::source::iceberg::create_parquet_stream_builder; | ||
use risingwave_connector::source::iceberg::{create_parquet_stream_builder, list_s3_directory}; | ||
pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; | ||
use risingwave_pb::expr::PbTableFunction; | ||
use tokio::runtime::Runtime; | ||
|
||
use super::{infer_type, Expr, ExprImpl, ExprRewriter, RwResult}; | ||
use super::{infer_type, Expr, ExprImpl, ExprRewriter, Literal, RwResult}; | ||
use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind}; | ||
use crate::error::ErrorCode::BindError; | ||
|
||
|
@@ -68,7 +68,7 @@ impl TableFunction { | |
|
||
/// A special table function which would be transformed into `LogicalFileScan` by `TableFunctionToFileScanRule` in the optimizer. | ||
/// select * from `file_scan`('parquet', 's3', region, ak, sk, location) | ||
pub fn new_file_scan(args: Vec<ExprImpl>) -> RwResult<Self> { | ||
pub fn new_file_scan(mut args: Vec<ExprImpl>) -> RwResult<Self> { | ||
let return_type = { | ||
// arguments: | ||
// file format e.g. parquet | ||
|
@@ -149,13 +149,43 @@ impl TableFunction { | |
.expect("failed to build file-scan runtime") | ||
}); | ||
|
||
tokio::task::block_in_place(|| { | ||
let files = if eval_args[5].ends_with('/') { | ||
let files = tokio::task::block_in_place(|| { | ||
RUNTIME.block_on(async { | ||
let files = list_s3_directory( | ||
eval_args[2].clone(), | ||
eval_args[3].clone(), | ||
eval_args[4].clone(), | ||
eval_args[5].clone(), | ||
) | ||
.await?; | ||
|
||
Ok::<Vec<String>, anyhow::Error>(files) | ||
}) | ||
})?; | ||
|
||
if files.is_empty() { | ||
return Err(BindError( | ||
"file_scan function only accepts non-empty directory".to_string(), | ||
) | ||
.into()); | ||
} | ||
|
||
Some(files) | ||
} else { | ||
None | ||
}; | ||
|
||
let schema = tokio::task::block_in_place(|| { | ||
RUNTIME.block_on(async { | ||
let parquet_stream_builder = create_parquet_stream_builder( | ||
eval_args[2].clone(), | ||
eval_args[3].clone(), | ||
eval_args[4].clone(), | ||
eval_args[5].clone(), | ||
match files.as_ref() { | ||
Some(files) => files[0].clone(), | ||
None => eval_args[5].clone(), | ||
}, | ||
) | ||
.await?; | ||
|
||
|
@@ -171,7 +201,20 @@ impl TableFunction { | |
StructType::new(rw_types), | ||
)) | ||
}) | ||
})? | ||
})?; | ||
|
||
if let Some(files) = files { | ||
// if the file location is a directory, we need to remove the last argument and add all files in the directory as arguments | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit dirty to list files in binder, but acceptable to me |
||
args.remove(5); | ||
for file in files { | ||
args.push(ExprImpl::Literal(Box::new(Literal::new( | ||
Some(ScalarImpl::Utf8(file.into())), | ||
DataType::Varchar, | ||
)))); | ||
} | ||
} | ||
|
||
schema | ||
} | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ impl Rule for TableFunctionToFileScanRule { | |
|
||
let schema = Schema::new(fields); | ||
|
||
assert!(logical_table_function.table_function().args.len() >= 6); | ||
let mut eval_args = vec![]; | ||
for arg in &logical_table_function.table_function().args { | ||
assert_eq!(arg.return_type(), DataType::Varchar); | ||
|
@@ -56,14 +57,13 @@ impl Rule for TableFunctionToFileScanRule { | |
} | ||
} | ||
} | ||
assert!(eval_args.len() == 6); | ||
assert!("parquet".eq_ignore_ascii_case(&eval_args[0])); | ||
assert!("s3".eq_ignore_ascii_case(&eval_args[1])); | ||
let s3_region = eval_args[2].clone(); | ||
let s3_access_key = eval_args[3].clone(); | ||
let s3_secret_key = eval_args[4].clone(); | ||
let file_location = eval_args[5].clone(); | ||
|
||
// The rest of the arguments are file locations | ||
let file_location = eval_args[5..].iter().cloned().collect_vec(); | ||
Comment on lines
+65
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tabVersion We set file_location here. |
||
Some( | ||
LogicalFileScan::new( | ||
logical_table_function.ctx(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see an example of how multiple file_locations are set.