refactor(source): refactor s3 source WITH options handling #14190

Merged
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
@@ -131,6 +131,7 @@ macro_rules! match_source_name_str_inner {
     }}
 }

+/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic.
 #[macro_export]
 macro_rules! match_source_name_str {
     ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
28 changes: 1 addition & 27 deletions src/connector/src/source/base.rs
@@ -45,8 +45,7 @@ use super::nexmark::source::message::NexmarkMeta;
 use super::OPENDAL_S3_CONNECTOR;
 use crate::parser::ParserConfig;
 pub(crate) use crate::source::common::CommonSplitReader;
-use crate::source::filesystem::opendal_source::OpendalS3Properties;
-use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties};
+use crate::source::filesystem::FsPageItem;
 use crate::source::monitor::EnumeratorMetrics;
 use crate::{
     dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
@@ -382,31 +381,6 @@ impl ConnectorProperties {

 impl ConnectorProperties {
     pub fn extract(mut with_properties: HashMap<String, String>) -> Result<Self> {
-        if Self::is_new_fs_connector_hash_map(&with_properties) {
-            let connector = with_properties
-                .remove(UPSTREAM_SOURCE_KEY)
-                .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
-            match connector.as_str() {
-                "s3_v2" => {
-                    let assume_role = with_properties.get("s3.assume_role").cloned();
-                    return Ok(ConnectorProperties::OpendalS3(Box::new(
-                        OpendalS3Properties {
-                            s3_properties: S3Properties::try_from_hashmap(with_properties)?,
-                            assume_role,
-                        },
-                    )));
-                }
-                "gcs" => {
-                    return Ok(ConnectorProperties::Gcs(Box::new(
-                        GcsProperties::try_from_hashmap(with_properties)?,
-                    )));
-                }
Comment on lines -389 to -403
Contributor

Where is the dispatch logic going?

Member Author

... Do you remember why the dispatch is needed here?

Member Author

IIUC, we previously needed the if here because s3_v2 and s3 shared the same config struct, so the macro could not dispatch correctly. Now that the types are separate, the special treatment seems unnecessary and confusing.

I mentioned this in the PR description.
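
To make the point about separate types concrete, here is a minimal sketch using only the standard library. It is not the PR's code: `CommonProps`, `LegacyProps`, `V2Props`, `NamedSource`, `required`, and `parse_common` are hypothetical names, and the hand-written map parsing stands in for the serde-based deserialization (with `#[serde(flatten)]`) that the diff uses. The idea it illustrates is that once each connector has its own wrapper type, each type can carry its own connector name, so a name-based dispatcher no longer needs a special case.

```rust
use std::collections::HashMap;

/// Fields shared by both connectors (hypothetical stand-in for a common struct).
#[derive(Clone, Debug, PartialEq)]
pub struct CommonProps {
    pub region_name: String,
    pub bucket_name: String,
}

/// Hypothetical trait: every properties type carries its own connector name.
pub trait NamedSource: Sized {
    const SOURCE_NAME: &'static str;
    fn from_map(map: &HashMap<String, String>) -> Result<Self, String>;
}

/// Wrapper for the legacy connector: only the common fields.
#[derive(Clone, Debug, PartialEq)]
pub struct LegacyProps {
    pub common: CommonProps,
}

/// Wrapper for the v2 connector: common fields plus one extra option.
#[derive(Clone, Debug, PartialEq)]
pub struct V2Props {
    pub common: CommonProps,
    pub assume_role: Option<String>,
}

impl From<CommonProps> for LegacyProps {
    fn from(common: CommonProps) -> Self {
        Self { common }
    }
}

/// Looks up a mandatory option and reports which key is missing.
fn required(map: &HashMap<String, String>, key: &str) -> Result<String, String> {
    map.get(key)
        .cloned()
        .ok_or_else(|| format!("missing option '{}'", key))
}

/// Parses the shared fields once; both wrappers reuse this.
fn parse_common(map: &HashMap<String, String>) -> Result<CommonProps, String> {
    Ok(CommonProps {
        region_name: required(map, "s3.region_name")?,
        bucket_name: required(map, "s3.bucket_name")?,
    })
}

impl NamedSource for LegacyProps {
    const SOURCE_NAME: &'static str = "s3";
    fn from_map(map: &HashMap<String, String>) -> Result<Self, String> {
        Ok(parse_common(map)?.into())
    }
}

impl NamedSource for V2Props {
    const SOURCE_NAME: &'static str = "s3_v2";
    fn from_map(map: &HashMap<String, String>) -> Result<Self, String> {
        Ok(Self {
            common: parse_common(map)?,
            // Optional, v2-only option; absent keys simply become `None`.
            assume_role: map.get("s3.assume_role").cloned(),
        })
    }
}
```

With a single shared struct there would be only one `SOURCE_NAME` to match on, which is presumably why the old code branched by hand; with two wrappers, `LegacyProps::from_map` and `V2Props::from_map` can each be selected by name.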

Member Author

match_source_name_str takes care of all cases. I think at the lowest level it goes through these entries:

{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
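
As a rough illustration of how a name-matching macro can walk a list of types like the one above, here is a small self-contained sketch. It is an assumption-laden simplification, not the project's macro: `SourceProps`, `S3Props`, `GcsProps`, `OpendalS3Props`, `match_name!`, and `describe_connector` are hypothetical, and the type list is passed explicitly rather than coming from a shared table.

```rust
/// Hypothetical stand-in for a properties trait: one name per type.
pub trait SourceProps {
    const SOURCE_NAME: &'static str;
    fn describe() -> String;
}

pub struct S3Props;
pub struct GcsProps;
pub struct OpendalS3Props;

impl SourceProps for S3Props {
    const SOURCE_NAME: &'static str = "s3";
    fn describe() -> String {
        "legacy S3 reader".to_owned()
    }
}

impl SourceProps for GcsProps {
    const SOURCE_NAME: &'static str = "gcs";
    fn describe() -> String {
        "OpenDAL-based GCS reader".to_owned()
    }
}

impl SourceProps for OpendalS3Props {
    const SOURCE_NAME: &'static str = "s3_v2";
    fn describe() -> String {
        "OpenDAL-based S3 reader".to_owned()
    }
}

/// Tries each listed type in order. For the first type whose `SOURCE_NAME`
/// equals the given string, `$alias` is bound to that type and `$body` is
/// evaluated; if nothing matches, `$on_other` is called with the name.
macro_rules! match_name {
    ($name:expr, $alias:ident, $body:expr, $on_other:expr, [$($ty:ty),+]) => {{
        let name: &str = $name;
        $(
            if name == <$ty as SourceProps>::SOURCE_NAME {
                type $alias = $ty;
                $body
            } else
        )+
        {
            $on_other(name)
        }
    }};
}

/// Dispatches on the connector name without any per-connector special case.
pub fn describe_connector(name: &str) -> Result<String, String> {
    match_name!(
        name,
        Props,
        Ok(Props::describe()),
        |other: &str| Err(format!("unknown connector '{}'", other)),
        [S3Props, GcsProps, OpendalS3Props]
    )
}
```

Because every type in the list has a distinct `SOURCE_NAME`, `describe_connector("s3")` and `describe_connector("s3_v2")` reach different types through the same code path, and an unlisted name falls through to the closure.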

Member Author

shipshipship

Contributor

Thanks for helping me remember the structure. No more questions.

-                _ => {
-                    unreachable!()
-                }
-            }
-        }
-
         let connector = with_properties
             .remove(UPSTREAM_SOURCE_KEY)
             .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
8 changes: 6 additions & 2 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
@@ -23,7 +23,8 @@ pub mod opendal_reader;

 use self::opendal_enumerator::OpendalEnumerator;
 use self::opendal_reader::OpendalReader;
-use super::{OpendalFsSplit, S3Properties};
+use super::s3::S3PropertiesCommon;
+use super::OpendalFsSplit;
 use crate::source::SourceProperties;

 pub const GCS_CONNECTOR: &str = "gcs";
@@ -80,7 +81,10 @@ impl OpendalSource for OpendalGcs {

 #[derive(Clone, Debug, Deserialize, PartialEq)]
 pub struct OpendalS3Properties {
-    pub s3_properties: S3Properties,
+    #[serde(flatten)]
+    pub s3_properties: S3PropertiesCommon,
+
+    // The following are only supported by s3_v2 (opendal) source.
     #[serde(rename = "s3.assume_role", default)]
     pub assume_role: Option<String>,
 }
Original file line number Diff line number Diff line change
@@ -22,12 +22,12 @@ use opendal::Operator;
 use super::opendal_enumerator::OpendalEnumerator;
 use super::OpendalSource;
 use crate::source::filesystem::s3::enumerator::get_prefix;
-use crate::source::filesystem::S3Properties;
+use crate::source::filesystem::s3::S3PropertiesCommon;

 impl<Src: OpendalSource> OpendalEnumerator<Src> {
     /// create opendal s3 source.
     pub fn new_s3_source(
-        s3_properties: S3Properties,
+        s3_properties: S3PropertiesCommon,
         assume_role: Option<String>,
     ) -> anyhow::Result<Self> {
         // Create s3 builder.
6 changes: 4 additions & 2 deletions src/connector/src/source/filesystem/s3/enumerator.rs
@@ -79,6 +79,7 @@ impl SplitEnumerator for S3SplitEnumerator {
         let config = AwsAuthProps::from(&properties);
         let sdk_config = config.build_config().await?;
         let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
+        let properties = properties.common;
         let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() {
             let prefix = get_prefix(pattern);
             let matcher = glob::Pattern::new(pattern)
@@ -125,11 +126,12 @@ mod tests {
     }

     use super::*;
+    use crate::source::filesystem::s3::S3PropertiesCommon;
     use crate::source::SourceEnumeratorContext;
     #[tokio::test]
     #[ignore]
     async fn test_s3_split_enumerator() {
-        let props = S3Properties {
+        let props = S3PropertiesCommon {
             region_name: "ap-southeast-1".to_owned(),
             bucket_name: "mingchao-s3-source".to_owned(),
             match_pattern: Some("happy[0-9].csv".to_owned()),
@@ -138,7 +140,7 @@ mod tests {
             endpoint_url: None,
         };
         let mut enumerator =
-            S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
+            S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::default().into())
                 .await
                 .unwrap();
         let splits = enumerator.list_splits().await.unwrap();
16 changes: 15 additions & 1 deletion src/connector/src/source/filesystem/s3/mod.rs
@@ -24,8 +24,9 @@ use crate::source::SourceProperties;

 pub const S3_CONNECTOR: &str = "s3";

+/// These are supported by both `s3` and `s3_v2` (opendal) sources.
 #[derive(Clone, Debug, Deserialize, PartialEq)]
-pub struct S3Properties {
+pub struct S3PropertiesCommon {
     #[serde(rename = "s3.region_name")]
     pub region_name: String,
     #[serde(rename = "s3.bucket_name")]
@@ -40,6 +41,18 @@ pub struct S3Properties {
     pub endpoint_url: Option<String>,
 }

+#[derive(Clone, Debug, Deserialize, PartialEq)]
+pub struct S3Properties {
+    #[serde(flatten)]
+    pub common: S3PropertiesCommon,
+}
+
+impl From<S3PropertiesCommon> for S3Properties {
+    fn from(common: S3PropertiesCommon) -> Self {
+        Self { common }
+    }
+}
+
 impl SourceProperties for S3Properties {
     type Split = FsSplit;
     type SplitEnumerator = S3SplitEnumerator;
@@ -50,6 +63,7 @@ impl SourceProperties for S3Properties {

 impl From<&S3Properties> for AwsAuthProps {
     fn from(props: &S3Properties) -> Self {
+        let props = &props.common;
         Self {
             region: Some(props.region_name.clone()),
             endpoint: props.endpoint_url.clone(),
10 changes: 6 additions & 4 deletions src/connector/src/source/filesystem/s3/source/reader.rs
@@ -184,7 +184,7 @@ impl SplitReader for S3FileReader {

         let sdk_config = config.build_config().await?;

-        let bucket_name = props.bucket_name;
+        let bucket_name = props.common.bucket_name;
         let s3_client = s3_client(&sdk_config, Some(default_conn_config()));

         let s3_file_reader = S3FileReader {
@@ -252,20 +252,22 @@ mod tests {
         CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties,
         SpecificParserConfig,
     };
-    use crate::source::filesystem::{S3Properties, S3SplitEnumerator};
+    use crate::source::filesystem::s3::S3PropertiesCommon;
+    use crate::source::filesystem::S3SplitEnumerator;
     use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator};

     #[tokio::test]
     #[ignore]
     async fn test_s3_split_reader() {
-        let props = S3Properties {
+        let props: S3Properties = S3PropertiesCommon {
             region_name: "ap-southeast-1".to_owned(),
             bucket_name: "mingchao-s3-source".to_owned(),
             match_pattern: None,
             access: None,
             secret: None,
             endpoint_url: None,
-        };
+        }
+        .into();
         let mut enumerator =
             S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
                 .await