From e44b49f7b9de845031b62893d61db824b42b9090 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 25 Dec 2023 20:32:49 +0800 Subject: [PATCH] extract a common struct for s3 sources --- .../src/source/filesystem/opendal_source/mod.rs | 7 +++++-- .../filesystem/opendal_source/s3_source.rs | 4 ++-- .../src/source/filesystem/s3/enumerator.rs | 6 ++++-- src/connector/src/source/filesystem/s3/mod.rs | 16 +++++++++++++++- .../src/source/filesystem/s3/source/reader.rs | 10 ++++++---- 5 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index f09a74077214c..707136f98a96d 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -23,7 +23,8 @@ pub mod opendal_reader; use self::opendal_enumerator::OpendalEnumerator; use self::opendal_reader::OpendalReader; -use super::{OpendalFsSplit, S3Properties}; +use super::s3::S3PropertiesCommon; +use super::OpendalFsSplit; use crate::source::SourceProperties; pub const GCS_CONNECTOR: &str = "gcs"; @@ -81,7 +82,9 @@ impl OpendalSource for OpendalGcs { #[derive(Clone, Debug, Deserialize, PartialEq)] pub struct OpendalS3Properties { #[serde(flatten)] - pub s3_properties: S3Properties, + pub s3_properties: S3PropertiesCommon, + + // The following are only supported by s3_v2 (opendal) source. 
#[serde(rename = "s3.assume_role", default)] pub assume_role: Option<String>, } diff --git a/src/connector/src/source/filesystem/opendal_source/s3_source.rs b/src/connector/src/source/filesystem/opendal_source/s3_source.rs index 22bac20be8dc0..51e5f4a68e5cf 100644 --- a/src/connector/src/source/filesystem/opendal_source/s3_source.rs +++ b/src/connector/src/source/filesystem/opendal_source/s3_source.rs @@ -22,12 +22,12 @@ use opendal::Operator; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::source::filesystem::s3::enumerator::get_prefix; -use crate::source::filesystem::S3Properties; +use crate::source::filesystem::s3::S3PropertiesCommon; impl<Src: OpendalSource> OpendalEnumerator<Src> { /// create opendal s3 source. pub fn new_s3_source( - s3_properties: S3Properties, + s3_properties: S3PropertiesCommon, assume_role: Option<String>, ) -> anyhow::Result<Self> { // Create s3 builder. diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index aa0f11d279578..03d29a56c1c11 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -79,6 +79,7 @@ impl SplitEnumerator for S3SplitEnumerator { let config = AwsAuthProps::from(&properties); let sdk_config = config.build_config().await?; let s3_client = s3_client(&sdk_config, Some(default_conn_config())); + let properties = properties.common; let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() { let prefix = get_prefix(pattern); let matcher = glob::Pattern::new(pattern) @@ -125,11 +126,12 @@ mod tests { } use super::*; + use crate::source::filesystem::s3::S3PropertiesCommon; use crate::source::SourceEnumeratorContext; #[tokio::test] #[ignore] async fn test_s3_split_enumerator() { - let props = S3Properties { + let props = S3PropertiesCommon { region_name: "ap-southeast-1".to_owned(), bucket_name: "mingchao-s3-source".to_owned(), match_pattern:
Some("happy[0-9].csv".to_owned()), @@ -138,7 +140,7 @@ mod tests { endpoint_url: None, }; let mut enumerator = - S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) + S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::default().into()) .await .unwrap(); let splits = enumerator.list_splits().await.unwrap(); diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index c60f8994f76f1..4ea444d889d12 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -24,8 +24,9 @@ use crate::source::SourceProperties; pub const S3_CONNECTOR: &str = "s3"; +/// These are supported by both `s3` and `s3_v2` (opendal) sources. #[derive(Clone, Debug, Deserialize, PartialEq)] -pub struct S3Properties { +pub struct S3PropertiesCommon { #[serde(rename = "s3.region_name")] pub region_name: String, #[serde(rename = "s3.bucket_name")] @@ -40,6 +41,18 @@ pub struct S3Properties { pub endpoint_url: Option<String>, } +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct S3Properties { + #[serde(flatten)] + pub common: S3PropertiesCommon, +} + +impl From<S3PropertiesCommon> for S3Properties { + fn from(common: S3PropertiesCommon) -> Self { + Self { common } + } +} + impl SourceProperties for S3Properties { type Split = FsSplit; type SplitEnumerator = S3SplitEnumerator; @@ -50,6 +63,7 @@ impl SourceProperties for S3Properties { impl From<&S3Properties> for AwsAuthProps { fn from(props: &S3Properties) -> Self { + let props = &props.common; Self { region: Some(props.region_name.clone()), endpoint: props.endpoint_url.clone(), diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 6cf4387ae3e92..a2421de1d4782 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -184,7 +184,7 @@ impl SplitReader for S3FileReader { let
sdk_config = config.build_config().await?; - let bucket_name = props.bucket_name; + let bucket_name = props.common.bucket_name; let s3_client = s3_client(&sdk_config, Some(default_conn_config())); let s3_file_reader = S3FileReader { @@ -252,20 +252,22 @@ mod tests { CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties, SpecificParserConfig, }; - use crate::source::filesystem::{S3Properties, S3SplitEnumerator}; + use crate::source::filesystem::s3::S3PropertiesCommon; + use crate::source::filesystem::S3SplitEnumerator; use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator}; #[tokio::test] #[ignore] async fn test_s3_split_reader() { - let props = S3Properties { + let props: S3Properties = S3PropertiesCommon { region_name: "ap-southeast-1".to_owned(), bucket_name: "mingchao-s3-source".to_owned(), match_pattern: None, access: None, secret: None, endpoint_url: None, - }; + } + .into(); let mut enumerator = S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) .await