Skip to content

Commit

Permalink
refactor: revert "#18324 configure whether to recursively scan file source" (#18517)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Sep 14, 2024
1 parent 21345cb commit c80351b
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 88 deletions.
26 changes: 2 additions & 24 deletions e2e_test/source/opendal/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
CREATE TABLE diamonds_recursive_read (
CREATE TABLE diamonds (
carat FLOAT,
cut TEXT,
color TEXT,
Expand All @@ -11,13 +11,12 @@ CREATE TABLE diamonds_recursive_read (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal/data',
recursive_scan = 'true',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s

query TTTT rowsort
select * from diamonds_recursive_read;
select * from diamonds;
----
0.22 Premium I 62
0.23 Very Good H 57.5
Expand All @@ -30,26 +29,5 @@ select * from diamonds_recursive_read;
1.28 Good J 63.1
1.3 Fair E 64.7

statement ok
CREATE TABLE diamonds (
carat FLOAT,
cut TEXT,
color TEXT,
depth FLOAT,
) WITH (
connector = 'posix_fs',
match_pattern = 'data*.csv',
posix_fs.root = 'e2e_test/source/opendal',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',');

sleep 10s

query TTTT rowsort
select * from diamonds;
----

statement ok
DROP TABLE diamonds;

statement ok
DROP TABLE diamonds_recursive_read;
5 changes: 0 additions & 5 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ pub struct FsSourceCommon {
#[serde(rename = "refresh.interval.sec")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub refresh_interval_sec: Option<u64>,

#[serde(rename = "recursive_scan", default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub recursive_scan: Option<bool>,
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
#[serde(rename = "gcs.bucket_name")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ impl<Src: OpendalSource> SplitEnumerator for OpendalEnumerator<Src> {
}

impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub async fn list(&self, recursive_scan: bool) -> ConnectorResult<ObjectMetadataIter> {
pub async fn list(&self) -> ConnectorResult<ObjectMetadataIter> {
let prefix = self.prefix.as_deref().unwrap_or("/");

let object_lister = self
.op
.lister_with(prefix)
.recursive(recursive_scan)
.recursive(true)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
Expand Down Expand Up @@ -108,5 +108,9 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub fn get_matcher(&self) -> &Option<glob::Pattern> {
&self.matcher
}

/// Returns the prefix stored on this enumerator, or `"/"` when none is set.
pub fn get_prefix(&self) -> &str {
    match self.prefix.as_deref() {
        Some(prefix) => prefix,
        None => "/",
    }
}
}
pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult<FsPageItem>>;
39 changes: 7 additions & 32 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,47 +93,27 @@ impl SourceReader {
match config {
ConnectorProperties::Gcs(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
ConnectorProperties::OpendalS3(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
ConnectorProperties::Azblob(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
ConnectorProperties::PosixFs(prop) => {
list_interval_sec = get_list_interval_sec(prop.fs_common.refresh_interval_sec);
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();
let lister: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
Ok(build_opendal_fs_list_stream(
lister,
list_interval_sec,
recursive_scan,
))
Ok(build_opendal_fs_list_stream(lister, list_interval_sec))
}
other => bail!("Unsupported source: {:?}", other),
}
Expand Down Expand Up @@ -284,11 +264,10 @@ impl SourceReader {
async fn build_opendal_fs_list_stream<Src: OpendalSource>(
lister: OpendalEnumerator<Src>,
list_interval_sec: u64,
recursive_scan: bool,
) {
loop {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list(recursive_scan).await?;
let mut object_metadata_iter = lister.list().await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Expand All @@ -300,7 +279,6 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(
{
yield res
} else {
// Currently, due to the lack of prefix list, we just skip the unmatched files.
continue;
}
}
Expand All @@ -315,12 +293,9 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(
}

#[try_stream(boxed, ok = OpendalFsSplit<Src>, error = crate::error::ConnectorError)]
pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(
lister: OpendalEnumerator<Src>,
recursive_scan: bool,
) {
pub async fn build_opendal_fs_list_for_batch<Src: OpendalSource>(lister: OpendalEnumerator<Src>) {
let matcher = lister.get_matcher();
let mut object_metadata_iter = lister.list(recursive_scan).await?;
let mut object_metadata_iter = lister.list().await?;

while let Some(list_res) = object_metadata_iter.next().await {
match list_res {
Expand Down
16 changes: 0 additions & 16 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ AzblobProperties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
Expand Down Expand Up @@ -79,10 +75,6 @@ GcsProperties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
Expand Down Expand Up @@ -848,10 +840,6 @@ OpendalS3Properties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
PosixFsProperties:
fields:
- name: posix_fs.root
Expand All @@ -866,10 +854,6 @@ PosixFsProperties:
- name: refresh.interval.sec
field_type: u64
required: false
- name: recursive_scan
field_type: bool
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
Expand Down
12 changes: 3 additions & 9 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,9 @@ impl SourceScanInfo {
Ok(SourceScanInfo::Complete(split_info))
}
ConnectorProperties::OpendalS3(prop) => {
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();

let lister: OpendalEnumerator<OpendalS3> =
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
let stream = build_opendal_fs_list_for_batch(lister);

let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res
Expand All @@ -325,22 +323,18 @@ impl SourceScanInfo {
Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Gcs(prop) => {
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();

let lister: OpendalEnumerator<OpendalGcs> =
OpendalEnumerator::new_gcs_source(*prop)?;
let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
let stream = build_opendal_fs_list_for_batch(lister);
let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res.into_iter().map(SplitImpl::Gcs).collect_vec();

Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Azblob(prop) => {
let recursive_scan = prop.fs_common.recursive_scan.unwrap_or_default();

let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
let stream = build_opendal_fs_list_for_batch(lister, recursive_scan);
let stream = build_opendal_fs_list_for_batch(lister);
let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec();

Expand Down

0 comments on commit c80351b

Please sign in to comment.