From a7b7bab78b22a37209c96a60e9c6a0d8cde2b029 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 25 Dec 2023 16:42:58 +0800 Subject: [PATCH 1/2] refactor(source): unify s3_v2 handling in ConnectorProperties::extract --- src/connector/src/macros.rs | 1 + src/connector/src/source/base.rs | 28 +------------------ .../source/filesystem/opendal_source/mod.rs | 1 + 3 files changed, 3 insertions(+), 27 deletions(-) diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index d777faf9bb2cc..e4538246b56e2 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -131,6 +131,7 @@ macro_rules! match_source_name_str_inner { }} } +/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic. #[macro_export] macro_rules! match_source_name_str { ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{ diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 8db65bb9681bf..d0600a904174c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -45,8 +45,7 @@ use super::nexmark::source::message::NexmarkMeta; use super::OPENDAL_S3_CONNECTOR; use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; -use crate::source::filesystem::opendal_source::OpendalS3Properties; -use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties}; +use crate::source::filesystem::FsPageItem; use crate::source::monitor::EnumeratorMetrics; use crate::{ dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, @@ -382,31 +381,6 @@ impl ConnectorProperties { impl ConnectorProperties { pub fn extract(mut with_properties: HashMap) -> Result { - if Self::is_new_fs_connector_hash_map(&with_properties) { - let connector = with_properties - .remove(UPSTREAM_SOURCE_KEY) - .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; - match connector.as_str() { - "s3_v2" => { - let assume_role = with_properties.get("s3.assume_role").cloned(); - return Ok(ConnectorProperties::OpendalS3(Box::new( - OpendalS3Properties { - s3_properties: S3Properties::try_from_hashmap(with_properties)?, - assume_role, - }, - ))); - } - "gcs" => { - return Ok(ConnectorProperties::Gcs(Box::new( - GcsProperties::try_from_hashmap(with_properties)?, - ))); - } - _ => { - unreachable!() - } - } - } - let connector = with_properties .remove(UPSTREAM_SOURCE_KEY) .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index d72f94badb85c..f09a74077214c 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -80,6 +80,7 @@ impl OpendalSource for OpendalGcs { #[derive(Clone, Debug, Deserialize, PartialEq)] pub struct OpendalS3Properties { + #[serde(flatten)] pub s3_properties: S3Properties, #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, From e44b49f7b9de845031b62893d61db824b42b9090 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 25 Dec 2023 20:32:49 +0800 Subject: [PATCH 2/2] extract a common struct for s3 sources --- .../src/source/filesystem/opendal_source/mod.rs | 7 +++++-- .../filesystem/opendal_source/s3_source.rs | 4 ++-- .../src/source/filesystem/s3/enumerator.rs | 6 ++++-- src/connector/src/source/filesystem/s3/mod.rs | 16 +++++++++++++++- .../src/source/filesystem/s3/source/reader.rs | 10 ++++++---- 5 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index f09a74077214c..707136f98a96d 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -23,7 +23,8 @@ pub mod opendal_reader; use self::opendal_enumerator::OpendalEnumerator; use self::opendal_reader::OpendalReader; -use super::{OpendalFsSplit, S3Properties}; +use super::s3::S3PropertiesCommon; +use super::OpendalFsSplit; use crate::source::SourceProperties; pub const GCS_CONNECTOR: &str = "gcs"; @@ -81,7 +82,9 @@ impl OpendalSource for OpendalGcs { #[derive(Clone, Debug, Deserialize, PartialEq)] pub struct OpendalS3Properties { #[serde(flatten)] - pub s3_properties: S3Properties, + pub s3_properties: S3PropertiesCommon, + + // The following are only supported by s3_v2 (opendal) source. #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, } diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index 22bac20be8dc0..51e5f4a68e5cf 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -22,12 +22,12 @@ use opendal::Operator; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::source::filesystem::s3::enumerator::get_prefix; -use crate::source::filesystem::S3Properties; +use crate::source::filesystem::s3::S3PropertiesCommon; impl OpendalEnumerator { /// create opendal s3 source. pub fn new_s3_source( - s3_properties: S3Properties, + s3_properties: S3PropertiesCommon, assume_role: Option, ) -> anyhow::Result { // Create s3 builder. diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index aa0f11d279578..03d29a56c1c11 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -79,6 +79,7 @@ impl SplitEnumerator for S3SplitEnumerator { let config = AwsAuthProps::from(&properties); let sdk_config = config.build_config().await?; let s3_client = s3_client(&sdk_config, Some(default_conn_config())); + let properties = properties.common; let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() { let prefix = get_prefix(pattern); let matcher = glob::Pattern::new(pattern) @@ -125,11 +126,12 @@ mod tests { } use super::*; + use crate::source::filesystem::s3::S3PropertiesCommon; use crate::source::SourceEnumeratorContext; #[tokio::test] #[ignore] async fn test_s3_split_enumerator() { - let props = S3Properties { + let props = S3PropertiesCommon { region_name: "ap-southeast-1".to_owned(), bucket_name: "mingchao-s3-source".to_owned(), match_pattern: Some("happy[0-9].csv".to_owned()), @@ -138,7 +140,7 @@ mod tests { endpoint_url: None, }; let mut enumerator = - S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) + S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::default().into()) .await .unwrap(); let splits = enumerator.list_splits().await.unwrap(); diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index c60f8994f76f1..4ea444d889d12 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -24,8 +24,9 @@ use crate::source::SourceProperties; pub const S3_CONNECTOR: &str = "s3"; +/// These are supported by both `s3` and `s3_v2` (opendal) sources. #[derive(Clone, Debug, Deserialize, PartialEq)] -pub struct S3Properties { +pub struct S3PropertiesCommon { #[serde(rename = "s3.region_name")] pub region_name: String, #[serde(rename = "s3.bucket_name")] @@ -40,6 +41,18 @@ pub struct S3Properties { pub endpoint_url: Option, } +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct S3Properties { + #[serde(flatten)] + pub common: S3PropertiesCommon, +} + +impl From for S3Properties { + fn from(common: S3PropertiesCommon) -> Self { + Self { common } + } +} + impl SourceProperties for S3Properties { type Split = FsSplit; type SplitEnumerator = S3SplitEnumerator; @@ -50,6 +63,7 @@ impl SourceProperties for S3Properties { impl From<&S3Properties> for AwsAuthProps { fn from(props: &S3Properties) -> Self { + let props = &props.common; Self { region: Some(props.region_name.clone()), endpoint: props.endpoint_url.clone(), diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 6cf4387ae3e92..a2421de1d4782 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -184,7 +184,7 @@ impl SplitReader for S3FileReader { let sdk_config = config.build_config().await?; - let bucket_name = props.bucket_name; + let bucket_name = props.common.bucket_name; let s3_client = s3_client(&sdk_config, Some(default_conn_config())); let s3_file_reader = S3FileReader { @@ -252,20 +252,22 @@ mod tests { CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties, SpecificParserConfig, }; - use crate::source::filesystem::{S3Properties, S3SplitEnumerator}; + use crate::source::filesystem::s3::S3PropertiesCommon; + use crate::source::filesystem::S3SplitEnumerator; use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator}; #[tokio::test] #[ignore] async fn test_s3_split_reader() { - let props = S3Properties { + let props: S3Properties = S3PropertiesCommon { region_name: "ap-southeast-1".to_owned(), bucket_name: "mingchao-s3-source".to_owned(), match_pattern: None, access: None, secret: None, endpoint_url: None, - }; + } + .into(); let mut enumerator = S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) .await