diff --git a/e2e_test/s3/fs_parquet_source_and_sink.py b/e2e_test/s3/fs_parquet_source_and_sink.py
index 033cb73ffbe70..6425ef1d3a9d6 100644
--- a/e2e_test/s3/fs_parquet_source_and_sink.py
+++ b/e2e_test/s3/fs_parquet_source_and_sink.py
@@ -116,6 +116,7 @@ def _table():
         return 's3_test_parquet'
 
     # Execute a SELECT statement
+    cur.execute(f'''set sink_decouple = false;''')
    cur.execute(f'''CREATE sink test_file_sink as select
        id,
        name,
diff --git a/e2e_test/source/opendal/posix_fs.slt b/e2e_test/source/opendal/posix_fs.slt
index 3fc572a1a1cc8..1bf026aed2744 100644
--- a/e2e_test/source/opendal/posix_fs.slt
+++ b/e2e_test/source/opendal/posix_fs.slt
@@ -2,21 +2,22 @@ statement ok
 SET RW_IMPLICIT_FLUSH TO true;
 
 statement ok
-CREATE TABLE diamonds (
+CREATE TABLE diamonds_recursive_read (
     carat FLOAT,
     cut TEXT,
     color TEXT,
     depth FLOAT,
 ) WITH (
-  connector = 'posix_fs',
-  match_pattern = 'data*.csv',
-  posix_fs.root = 'e2e_test/source/opendal/data',
+    connector = 'posix_fs',
+    match_pattern = 'data*.csv',
+    posix_fs.root = 'e2e_test/source/opendal/data',
+    recursive_scan = 'true',
 ) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');
 
 sleep 10s
 
 query TTTT rowsort
-select * from diamonds;
+select * from diamonds_recursive_read;
 ----
 0.22 Premium I 62
 0.23 Very Good H 57.5
@@ -29,5 +30,26 @@ select * from diamonds;
 1.28 Good J 63.1
 1.3 Fair E 64.7
 
+statement ok
+CREATE TABLE diamonds (
+    carat FLOAT,
+    cut TEXT,
+    color TEXT,
+    depth FLOAT,
+) WITH (
+    connector = 'posix_fs',
+    match_pattern = 'data*.csv',
+    posix_fs.root = 'e2e_test/source/opendal',
+) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');
+
+sleep 10s
+
+query TTTT rowsort
+select * from diamonds;
+----
+
 statement ok
 DROP TABLE diamonds;
+
+statement ok
+DROP TABLE diamonds_recursive_read;
diff --git a/src/connector/src/source/filesystem/opendal_source/azblob_source.rs b/src/connector/src/source/filesystem/opendal_source/azblob_source.rs
index 2ee050f21f812..8c6dac01ab87b 100644
--- a/src/connector/src/source/filesystem/opendal_source/azblob_source.rs
+++ b/src/connector/src/source/filesystem/opendal_source/azblob_source.rs
@@ -66,7 +66,6 @@ impl OpendalEnumerator<OpendalAzblob> {
         };
 
         let compression_format = azblob_properties.compression_format;
-
         Ok(Self {
             op,
             prefix,
diff --git a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs
index 768f19fc36722..9a6d883f3c922 100644
--- a/src/connector/src/source/filesystem/opendal_source/gcs_source.rs
+++ b/src/connector/src/source/filesystem/opendal_source/gcs_source.rs
@@ -60,7 +60,6 @@ impl OpendalEnumerator<OpendalGcs> {
         };
 
         let compression_format = gcs_properties.compression_format;
-
         Ok(Self {
             op,
             prefix,
diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs
index cbb3c2a9c7b85..cea4972def92c 100644
--- a/src/connector/src/source/filesystem/opendal_source/mod.rs
+++ b/src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -47,6 +47,10 @@ pub struct FsSourceCommon {
     #[serde(rename = "refresh.interval.sec")]
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub refresh_interval_sec: Option<u64>,
+
+    #[serde(rename = "recursive_scan", default)]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub recursive_scan: Option<bool>,
 }
 
 #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
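The `recursive_scan` option added to `FsSourceCommon` above goes through `serde_with`, because WITH-clause values arrive as strings (`recursive_scan = 'true'` in the slt test) and `DisplayFromStr` handles the string-to-bool conversion. A minimal, self-contained sketch of that parsing pattern; the `FsOptions` struct and the JSON input are illustrative stand-ins, not RisingWave code:

```rust
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Stand-in for FsSourceCommon: the option value arrives as a string,
// and DisplayFromStr parses "true"/"false" into an Option<bool>.
#[serde_as]
#[derive(Debug, Deserialize)]
struct FsOptions {
    #[serde(rename = "recursive_scan", default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    recursive_scan: Option<bool>,
}

fn main() {
    let set: FsOptions = serde_json::from_str(r#"{"recursive_scan": "true"}"#).unwrap();
    let unset: FsOptions = serde_json::from_str("{}").unwrap();

    // Call sites in this diff collapse "unset" to false via unwrap_or_default().
    assert_eq!(set.recursive_scan.unwrap_or_default(), true);
    assert_eq!(unset.recursive_scan.unwrap_or_default(), false);
}
```

Keeping the field as `Option<bool>` rather than `bool` preserves the distinction between unset and explicitly false, though the call sites below treat the two identically.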
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
index 7396eac2ea38e..a9cb4b6c3f7f0 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs
@@ -66,13 +66,13 @@ impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
 }
 
 impl<Src: OpendalSource> OpendalEnumerator<Src> {
-    pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
+    pub async fn list(&self, recursive_scan: bool) -> ConnectorResult<ObjectMetadataIter> {
         let prefix = self.prefix.as_deref().unwrap_or("/");
 
         let object_lister = self
             .op
             .lister_with(prefix)
-            .recursive(false)
+            .recursive(recursive_scan)
             .metakey(Metakey::ContentLength | Metakey::LastModified)
             .await?;
         let stream = stream::unfold(object_lister, |mut object_lister| async move {
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index 95764792c0025..9a7cb1e440e9f 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -93,27 +93,47 @@ impl SourceReader {
         match config {
             ConnectorProperties::Gcs(prop) => {
                 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
                 let lister: OpendalEnumerator<OpendalGcs> =
                     OpendalEnumerator::new_gcs_source(*prop)?;
-                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
+                Ok(build_opendal_fs_list_stream(
+                    lister,
+                    list_interval_sec,
+                    recursive_scan,
+                ))
             }
             ConnectorProperties::OpendalS3(prop) => {
                 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
                 let lister: OpendalEnumerator<OpendalS3> =
                     OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
-                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
+                Ok(build_opendal_fs_list_stream(
+                    lister,
+                    list_interval_sec,
+                    recursive_scan,
+                ))
             }
             ConnectorProperties::Azblob(prop) => {
                 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
                 let lister: OpendalEnumerator<OpendalAzblob> =
                     OpendalEnumerator::new_azblob_source(*prop)?;
-                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
+                Ok(build_opendal_fs_list_stream(
+                    lister,
+                    list_interval_sec,
+                    recursive_scan,
+                ))
             }
             ConnectorProperties::PosixFs(prop) => {
                 list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
                 let lister: OpendalEnumerator<OpendalPosixFs> =
                     OpendalEnumerator::new_posix_fs_source(*prop)?;
-                Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
+                Ok(build_opendal_fs_list_stream(
+                    lister,
+                    list_interval_sec,
+                    recursive_scan,
+                ))
             }
             other => bail!("Unsupported source: {:?}", other),
         }
@@ -264,10 +284,11 @@ impl SourceReader {
 async fn build_opendal_fs_list_stream<Src: OpendalSource>(
     lister: OpendalEnumerator<Src>,
     list_interval_sec: u64,
+    recursive_scan: bool,
 ) {
     loop {
         let matcher = lister.get_matcher();
-        let mut object_metadata_iter = lister.list().await?;
+        let mut object_metadata_iter = lister.list(recursive_scan).await?;
 
         while let Some(list_res) = object_metadata_iter.next().await {
             match list_res {
@@ -294,9 +315,12 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(
 }
 
 #[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
-pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
+pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(
+    lister: OpendalEnumerator<Src>,
+    recursive_scan: bool,
+) {
     let matcher = lister.get_matcher();
-    let mut object_metadata_iter = lister.list().await?;
+    let mut object_metadata_iter = lister.list(recursive_scan).await?;
 
     while let Some(list_res) = object_metadata_iter.next().await {
         match list_res {
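`OpendalEnumerator::list` now forwards the flag straight into OpenDAL's lister builder. Below is a runnable sketch (assuming `tokio`, `futures`, and an `opendal` 0.4x-style builder API; the `Fs` backend and `/tmp/demo` root are illustrative choices) of what that toggle changes; the enumerator above does the same thing via `lister_with(prefix).recursive(recursive_scan)`:

```rust
use futures::TryStreamExt;
use opendal::{services::Fs, Operator};

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Local-filesystem backend; /tmp/demo is an assumed scratch directory.
    let mut builder = Fs::default();
    builder.root("/tmp/demo");
    let op = Operator::new(builder)?.finish();

    // recursive(false) lists only the top level under the prefix;
    // recursive(true) walks into subdirectories and yields every file beneath it.
    let mut lister = op.lister_with("/").recursive(true).await?;
    while let Some(entry) = lister.try_next().await? {
        println!("{}", entry.path());
    }
    Ok(())
}
```

This is also why the second `diamonds` table in the slt test above, rooted one level higher at `e2e_test/source/opendal` without `recursive_scan`, expects an empty result: non-recursive listing never descends into the `data` subdirectory.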
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index a6a19e80c89a3..695a2aeaa1c14 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -24,6 +24,10 @@ AzblobProperties:
   - name: refresh.interval.sec
     field_type: u64
     required: false
+  - name: recursive_scan
+    field_type: bool
+    required: false
+    default: Default::default
   - name: compression_format
     field_type: CompressionFormat
     required: false
@@ -75,6 +79,10 @@ GcsProperties:
   - name: refresh.interval.sec
     field_type: u64
     required: false
+  - name: recursive_scan
+    field_type: bool
+    required: false
+    default: Default::default
   - name: compression_format
     field_type: CompressionFormat
     required: false
@@ -840,6 +848,10 @@ OpendalS3Properties:
   - name: refresh.interval.sec
     field_type: u64
     required: false
+  - name: recursive_scan
+    field_type: bool
+    required: false
+    default: Default::default
 PosixFsProperties:
   fields:
   - name: posix_fs.root
     field_type: String
     required: true
@@ -854,6 +866,10 @@ PosixFsProperties:
   - name: refresh.interval.sec
     field_type: u64
     required: false
+  - name: recursive_scan
+    field_type: bool
+    required: false
+    default: Default::default
   - name: compression_format
     field_type: CompressionFormat
     required: false
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 6777f9373b841..63b6eef38da71 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -310,9 +310,11 @@ impl SourceScanInfo {
                 Ok(SourceScanInfo::Complete(split_info))
             }
             ConnectorProperties::OpendalS3(prop) => {
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
+
                 let lister: OpendalEnumerator<OpendalS3> =
                     OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
-                let stream = build_opendal_fs_list_for_batch(lister);
+                let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
 
                 let batch_res: Vec<_> = stream.try_collect().await?;
                 let res = batch_res
@@ -323,18 +325,22 @@ impl SourceScanInfo {
                 Ok(SourceScanInfo::Complete(res))
             }
             ConnectorProperties::Gcs(prop) => {
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
+
                 let lister: OpendalEnumerator<OpendalGcs> =
                     OpendalEnumerator::new_gcs_source(*prop)?;
-                let stream = build_opendal_fs_list_for_batch(lister);
+                let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
                 let batch_res: Vec<_> = stream.try_collect().await?;
                 let res = batch_res.into_iter().map(SplitImpl::Gcs).collect_vec();
 
                 Ok(SourceScanInfo::Complete(res))
             }
             ConnectorProperties::Azblob(prop) => {
+                let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
+
                 let lister: OpendalEnumerator<OpendalAzblob> =
                     OpendalEnumerator::new_azblob_source(*prop)?;
-                let stream = build_opendal_fs_list_for_batch(lister);
+                let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
                 let batch_res: Vec<_> = stream.try_collect().await?;
                 let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec();
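In the batch (`plan_fragmenter`) path, `recursive_scan` is likewise resolved with `unwrap_or_default()`, so an unset option stays `false` and existing non-recursive sources keep their behavior. A small runnable sketch (assuming `tokio` and `futures`) of the collection step these arms share; the `Split` type here is a hypothetical stand-in for `OpendalFsSplit`:

```rust
use futures::stream::{self, TryStreamExt};

// Hypothetical stand-in for OpendalFsSplit in the batch path above.
#[derive(Debug)]
struct Split {
    path: String,
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // A fallible stream of splits, as build_opendal_fs_list_for_batch yields.
    let listed = stream::iter(vec![
        Ok::<Split, std::io::Error>(Split { path: "data/a.csv".into() }),
        Ok(Split { path: "data/nested/b.csv".into() }),
    ]);

    // Mirrors `let batch_res: Vec<_> = stream.try_collect().await?;` above:
    // gathers every item into a Vec, short-circuiting on the first error.
    let batch_res: Vec<Split> = listed.try_collect().await?;
    assert_eq!(batch_res.len(), 2);
    println!("{batch_res:?}");
    Ok(())
}
```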