From be65e587cfc09455638fd88236cf96c96246f42d Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 25 Dec 2023 16:42:58 +0800 Subject: [PATCH] refactor(source): unify s3_v2 handling in ConnectorProperties::extract --- src/connector/src/macros.rs | 1 + src/connector/src/source/base.rs | 28 +------------------ .../source/filesystem/opendal_source/mod.rs | 1 + 3 files changed, 3 insertions(+), 27 deletions(-) diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index d777faf9bb2cc..e4538246b56e2 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -131,6 +131,7 @@ macro_rules! match_source_name_str_inner { }} } +/// Matches against `SourceProperties::SOURCE_NAME` to dispatch logic. #[macro_export] macro_rules! match_source_name_str { ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{ diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 8db65bb9681bf..d0600a904174c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -45,8 +45,7 @@ use super::nexmark::source::message::NexmarkMeta; use super::OPENDAL_S3_CONNECTOR; use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; -use crate::source::filesystem::opendal_source::OpendalS3Properties; -use crate::source::filesystem::{FsPageItem, GcsProperties, S3Properties}; +use crate::source::filesystem::FsPageItem; use crate::source::monitor::EnumeratorMetrics; use crate::{ dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, @@ -382,31 +381,6 @@ impl ConnectorProperties { impl ConnectorProperties { pub fn extract(mut with_properties: HashMap) -> Result { - if Self::is_new_fs_connector_hash_map(&with_properties) { - let connector = with_properties - .remove(UPSTREAM_SOURCE_KEY) - .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; - match connector.as_str() { - "s3_v2" => { - let assume_role = with_properties.get("s3.assume_role").cloned(); - return Ok(ConnectorProperties::OpendalS3(Box::new( - OpendalS3Properties { - s3_properties: S3Properties::try_from_hashmap(with_properties)?, - assume_role, - }, - ))); - } - "gcs" => { - return Ok(ConnectorProperties::Gcs(Box::new( - GcsProperties::try_from_hashmap(with_properties)?, - ))); - } - _ => { - unreachable!() - } - } - } - let connector = with_properties .remove(UPSTREAM_SOURCE_KEY) .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index d72f94badb85c..f09a74077214c 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -80,6 +80,7 @@ impl OpendalSource for OpendalGcs { #[derive(Clone, Debug, Deserialize, PartialEq)] pub struct OpendalS3Properties { + #[serde(flatten)] pub s3_properties: S3Properties, #[serde(rename = "s3.assume_role", default)] pub assume_role: Option,