Skip to content

Commit

Permalink
refactor(source): unify s3_v2 handling in ConnectorProperties::extract
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 25, 2023
1 parent 64b9ed0 commit 1ff6db4
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ macro_rules! match_source_name_str_inner {
}}
}

/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic.
#[macro_export]
macro_rules! match_source_name_str {
($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
Expand Down
28 changes: 1 addition & 27 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use super::nexmark::source::message::NexmarkMeta;
use super::OPENDAL_S3_CONNECTOR;
use crate::parser::ParserConfig;
pub(crate) use crate::source::common::CommonSplitReader;
use crate::source::filesystem::opendal_source::OpendalS3Properties;
use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties};
use crate::source::filesystem::FsPageItem;
use crate::source::monitor::EnumeratorMetrics;
use crate::{
dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
Expand Down Expand Up @@ -382,31 +381,6 @@ impl ConnectorProperties {

impl ConnectorProperties {
pub fn extract(mut with_properties: HashMap<String, String>) -> Result<Self> {
if Self::is_new_fs_connector_hash_map(&with_properties) {
let connector = with_properties
.remove(UPSTREAM_SOURCE_KEY)
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
match connector.as_str() {
"s3_v2" => {
let assume_role = with_properties.get("s3.assume_role").cloned();
return Ok(ConnectorProperties::OpendalS3(Box::new(
OpendalS3Properties {
s3_properties: S3Properties::try_from_hashmap(with_properties)?,
assume_role,
},
)));
}
"gcs" => {
return Ok(ConnectorProperties::Gcs(Box::new(
GcsProperties::try_from_hashmap(with_properties)?,
)));
}
_ => {
unreachable!()
}
}
}

let connector = with_properties
.remove(UPSTREAM_SOURCE_KEY)
.ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl OpendalSource for OpendalGcs {

#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3Properties,
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,
Expand Down

0 comments on commit 1ff6db4

Please sign in to comment.