Skip to content

Commit

Permalink
refactor(source): refactor s3 source WITH options handling (#14190)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Dec 27, 2023
1 parent 4695ad1 commit 476daec
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 38 deletions.
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ macro_rules! match_source_name_str_inner {
}}
}

/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic.
#[macro_export]
macro_rules! match_source_name_str {
($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
Expand Down
28 changes: 1 addition & 27 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use super::nexmark::source::message::NexmarkMeta;
use super::OPENDAL_S3_CONNECTOR;
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::opendal_source::OpendalS3Properties;
use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties};
use crate::source::filesystem::FsPageItem;
use crate::source::monitor::EnumeratorMetrics;
use crate::{
dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
Expand Down Expand Up @@ -382,31 +381,6 @@ impl ConnectorProperties {

impl ConnectorProperties {
pub fn extract(mut with_properties: HashMap<String, String>) -> Result<Self> {
if Self::is_new_fs_connector_hash_map(&with_properties) {
let connector = with_properties
.remove(UPSTREAM_SOURCE_KEY)
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
match connector.as_str() {
"s3_v2" => {
let assume_role = with_properties.get("s3.assume_role").cloned();
return Ok(ConnectorProperties::OpendalS3(Box::new(
OpendalS3Properties {
s3_properties: S3Properties::try_from_hashmap(with_properties)?,
assume_role,
},
)));
}
"gcs" => {
return Ok(ConnectorProperties::Gcs(Box::new(
GcsProperties::try_from_hashmap(with_properties)?,
)));
}
_ => {
unreachable!()
}
}
}

let connector = with_properties
.remove(UPSTREAM_SOURCE_KEY)
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
Expand Down
8 changes: 6 additions & 2 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ pub mod opendal_reader;

use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::{OpendalFsSplit, S3Properties};
use super::s3::S3PropertiesCommon;
use super::OpendalFsSplit;
use crate::source::SourceProperties;

pub const GCS_CONNECTOR: &str = "gcs";
Expand Down Expand Up @@ -79,7 +80,10 @@ impl OpendalSource for OpendalGcs {

#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct OpendalS3Properties {
pub s3_properties: S3Properties,
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,

// The following are only supported by s3_v2 (opendal) source.
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use opendal::Operator;
use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::source::filesystem::s3::enumerator::get_prefix;
use crate::source::filesystem::S3Properties;
use crate::source::filesystem::s3::S3PropertiesCommon;

impl<Src: OpendalSource> OpendalEnumerator<Src> {
/// create opendal s3 source.
pub fn new_s3_source(
s3_properties: S3Properties,
s3_properties: S3PropertiesCommon,
assume_role: Option<String>,
) -> anyhow::Result<Self> {
// Create s3 builder.
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl SplitEnumerator for S3SplitEnumerator {
let config = AwsAuthProps::from(&properties);
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
let properties = properties.common;
let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
Expand Down Expand Up @@ -125,11 +126,12 @@ mod tests {
}

use super::*;
use crate::source::filesystem::s3::S3PropertiesCommon;
use crate::source::SourceEnumeratorContext;
#[tokio::test]
#[ignore]
async fn test_s3_split_enumerator() {
let props = S3Properties {
let props = S3PropertiesCommon {
region_name: "ap-southeast-1".to_owned(),
bucket_name: "mingchao-s3-source".to_owned(),
match_pattern: Some("happy[0-9].csv".to_owned()),
Expand All @@ -138,7 +140,7 @@ mod tests {
endpoint_url: None,
};
let mut enumerator =
S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::default().into())
.await
.unwrap();
let splits = enumerator.list_splits().await.unwrap();
Expand Down
16 changes: 15 additions & 1 deletion src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use crate::source::SourceProperties;

pub const S3_CONNECTOR: &str = "s3";

/// These are supported by both `s3` and `s3_v2` (opendal) sources.
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct S3Properties {
pub struct S3PropertiesCommon {
#[serde(rename = "s3.region_name")]
pub region_name: String,
#[serde(rename = "s3.bucket_name")]
Expand All @@ -40,6 +41,18 @@ pub struct S3Properties {
pub endpoint_url: Option<String>,
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct S3Properties {
#[serde(flatten)]
pub common: S3PropertiesCommon,
}

impl From<S3PropertiesCommon> for S3Properties {
fn from(common: S3PropertiesCommon) -> Self {
Self { common }
}
}

impl SourceProperties for S3Properties {
type Split = FsSplit;
type SplitEnumerator = S3SplitEnumerator;
Expand All @@ -50,6 +63,7 @@ impl SourceProperties for S3Properties {

impl From<&S3Properties> for AwsAuthProps {
fn from(props: &S3Properties) -> Self {
let props = &props.common;
Self {
region: Some(props.region_name.clone()),
endpoint: props.endpoint_url.clone(),
Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/source/filesystem/s3/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl SplitReader for S3FileReader {

let sdk_config = config.build_config().await?;

let bucket_name = props.bucket_name;
let bucket_name = props.common.bucket_name;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));

let s3_file_reader = S3FileReader {
Expand Down Expand Up @@ -252,20 +252,22 @@ mod tests {
CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties,
SpecificParserConfig,
};
use crate::source::filesystem::{S3Properties, S3SplitEnumerator};
use crate::source::filesystem::s3::S3PropertiesCommon;
use crate::source::filesystem::S3SplitEnumerator;
use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator};

#[tokio::test]
#[ignore]
async fn test_s3_split_reader() {
let props = S3Properties {
let props: S3Properties = S3PropertiesCommon {
region_name: "ap-southeast-1".to_owned(),
bucket_name: "mingchao-s3-source".to_owned(),
match_pattern: None,
access: None,
secret: None,
endpoint_url: None,
};
}
.into();
let mut enumerator =
S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
.await
Expand Down

0 comments on commit 476daec

Please sign in to comment.