Skip to content

Commit

Permalink
feat(source): configure whether to recursively scan file source (#18324)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Sep 10, 2024
1 parent dc07da0 commit 596138e
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 19 deletions.
1 change: 1 addition & 0 deletions e2e_test/s3/fs_parquet_source_and_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def _table():
return 's3_test_parquet'

# Execute a SELECT statement
cur.execute(f'''set sink_decouple = false;''')
cur.execute(f'''CREATE sink test_file_sink as select
id,
name,
Expand Down
32 changes: 27 additions & 5 deletions e2e_test/source/opendal/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@ statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
CREATE TABLE diamonds (
CREATE TABLE diamonds_recursive_read (
carat FLOAT,
cut TEXT,
color TEXT,
depth FLOAT,
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal/data',
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal/data',
recursive_scan = 'true',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s

query TTTT rowsort
select * from diamonds;
select * from diamonds_recursive_read;
----
0.22 Premium I 62
0.23 Very Good H 57.5
Expand All @@ -29,5 +30,26 @@ select * from diamonds;
1.28 Good J 63.1
1.3 Fair E 64.7

statement ok
CREATE TABLE diamonds (
carat FLOAT,
cut TEXT,
color TEXT,
depth FLOAT,
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s

query TTTT rowsort
select * from diamonds;
----

statement ok
DROP TABLE diamonds;

statement ok
DROP TABLE diamonds_recursive_read;
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
};

let compression_format = azblob_properties.compression_format;

Ok(Self {
op,
prefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
};

let compression_format = gcs_properties.compression_format;

Ok(Self {
op,
prefix,
Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub struct FsSourceCommon {
#[serde(rename = "refresh.interval.sec")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub refresh_interval_sec: Option<u64>,

#[serde(rename = "recursive_scan", default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub recursive_scan: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
}

impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
pub async fn list(&self, recursive_scan: bool) -> ConnectorResult<ObjectMetadataIter> {
let prefix = self.prefix.as_deref().unwrap_or("/");

let object_lister = self
.op
.lister_with(prefix)
.recursive(false)
.recursive(recursive_scan)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
Expand Down
38 changes: 31 additions & 7 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,47 @@ impl SourceReader {
match config {
ConnectorProperties::Gcs(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
}
ConnectorProperties::OpendalS3(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
}
ConnectorProperties::Azblob(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
}
ConnectorProperties::PosixFs(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
}
other => bail!("Unsupported source: {:?}", other),
}
Expand Down Expand Up @@ -264,10 +284,11 @@ impl SourceReader {
async fn build_opendal_fs_list_stream<Src: OpendalSource>(
lister: OpendalEnumerator<Src>,
list_interval_sec: u64,
recursive_scan: bool,
) {
loop {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list().await?;
let mut object_metadata_iter = lister.list(recursive_scan).await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Expand All @@ -294,9 +315,12 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(
}

#[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(
lister: OpendalEnumerator<Src>,
recursive_scan: bool,
) {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list().await?;
let mut object_metadata_iter = lister.list(recursive_scan).await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Expand Down
16 changes: 16 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ AzblobProperties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
Expand Down Expand Up @@ -75,6 +79,10 @@ GcsProperties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
Expand Down Expand Up @@ -840,6 +848,10 @@ OpendalS3Properties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
PosixFsProperties:
fields:
- name: posix_fs.root
Expand All @@ -854,6 +866,10 @@ PosixFsProperties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
Expand Down
12 changes: 9 additions & 3 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,11 @@ impl SourceScanInfo {
Ok(SourceScanInfo::Complete(split_info))
}
ConnectorProperties::OpendalS3(prop) => {
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();

let lister: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
let stream = build_opendal_fs_list_for_batch(lister);
let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);

let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res
Expand All @@ -323,18 +325,22 @@ impl SourceScanInfo {
Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Gcs(prop) => {
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();

let lister: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
let stream = build_opendal_fs_list_for_batch(lister);
let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res.into_iter().map(SplitImpl::Gcs).collect_vec();

Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Azblob(prop) => {
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();

let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
let stream = build_opendal_fs_list_for_batch(lister);
let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec();

Expand Down

0 comments on commit 596138e

Please sign in to comment.