Skip to content

Commit

Permalink
refactor(source): refine source macro (#12260)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 19, 2023
1 parent ed96064 commit c8ea5ee
Show file tree
Hide file tree
Showing 14 changed files with 424 additions and 329 deletions.
2 changes: 2 additions & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub mod source;

pub mod common;

pub use paste::paste;

#[derive(Clone, Debug, Default)]
pub struct ConnectorParams {
pub connector_client: Option<ConnectorClient>,
Expand Down
318 changes: 176 additions & 142 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,152 +13,225 @@
// limitations under the License.

#[macro_export]
macro_rules! impl_split {
($({ $variant_name:ident, $connector_name:ident, $split:ty} ),*) => {
impl From<&SplitImpl> for ConnectorSplit {
fn from(split: &SplitImpl) -> Self {
match split {
$( SplitImpl::$variant_name(inner) => ConnectorSplit { split_type: String::from($connector_name), encoded_split: inner.encode_to_bytes().to_vec() }, )*
}
macro_rules! for_all_classified_sources {
($macro:path $(,$extra_args:tt)*) => {
$macro! {
// cdc sources
{
{ Mysql },
{ Postgres },
{ Citus }
},
// other sources
{
{ Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
{ Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
{ Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
{ Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
{ Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }
}
$(
,$extra_args
)*
}
$(
impl TryFrom<SplitImpl> for $split {
type Error = anyhow::Error;
};
}

fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
}
#[macro_export]
macro_rules! for_all_sources_inner {
(
{$({ $cdc_source_type:ident }),* },
{ $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
$macro:tt $(, $extra_args:tt)*
) => {
$crate::paste! {
$macro! {
{
$(
{
[< $cdc_source_type Cdc >],
$crate::source::cdc::[< $cdc_source_type CdcProperties >],
$crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
},
)*
$(
{ $source_variant, $prop_name, $split }
),*
}
$(,$extra_args)*
}
}
};
}

impl From<$split> for SplitImpl {
fn from(split: $split) -> SplitImpl {
SplitImpl::$variant_name(split)
}
}
#[macro_export]
macro_rules! for_all_sources {
($macro:path $(, $arg:tt )*) => {
$crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* }
};
}

)*
#[macro_export]
macro_rules! dispatch_source_enum_inner {
(
{$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
$enum_name:ident,
$impl:tt,
{$inner_name:ident, $prop_type_name:ident, $split_type_name:ident},
$body:expr
) => {{
match $impl {
$(
$enum_name::$source_variant($inner_name) => {
type $prop_type_name = $prop_name;
type $split_type_name = $split;
{
$body
}
},
)*
}
}}
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
type Error = anyhow::Error;
#[macro_export]
macro_rules! dispatch_source_enum {
($enum_name:ident, $impl:expr, $inner_name:tt, $body:expr) => {{
$crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_name, { $impl }, $inner_name, $body}
}};
}

fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
match split.split_type.to_lowercase().as_str() {
$( $connector_name => <$split>::restore_from_bytes(split.encoded_split.as_ref()).map(SplitImpl::$variant_name), )*
other => {
Err(anyhow!("connector '{}' is not supported", other))
#[macro_export]
macro_rules! match_source_name_str_inner {
(
{$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
$source_name_str:expr,
$prop_type_name:ident,
$body:expr,
$on_other_closure:expr
) => {{
match $source_name_str {
$(
<$prop_name>::SOURCE_NAME => {
type $prop_type_name = $prop_name;
{
$body
}
}
}
},
)*
other => ($on_other_closure)(other),
}
}}
}

impl SplitMetaData for SplitImpl {
fn id(&self) -> SplitId {
match self {
$( Self::$variant_name(inner) => inner.id(), )*
}
}
#[macro_export]
macro_rules! match_source_name_str {
($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
$crate::for_all_sources! {
$crate::match_source_name_str_inner,
{ $source_name_str },
$prop_type_name,
{ $body },
{ $on_other_closure }
}
}};
}

fn encode_to_json(&self) -> JsonbVal {
use serde_json::json;
let inner = self.encode_to_json_inner().take();
json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
}
#[macro_export]
macro_rules! dispatch_split_impl {
($impl:expr, $inner_name:ident, $prop_type_name:ident, $body:expr) => {{
use $crate::source::SplitImpl;
$crate::dispatch_source_enum! {SplitImpl, { $impl }, {$inner_name, $prop_type_name, IgnoreSplitType}, $body}
}};
}

fn restore_from_json(value: JsonbVal) -> Result<Self> {
let mut value = value.take();
let json_obj = value.as_object_mut().unwrap();
let split_type = json_obj.remove(SPLIT_TYPE_FIELD).unwrap().as_str().unwrap().to_string();
let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
Self::restore_from_json_inner(&split_type, inner_value.into())
}
#[macro_export]
macro_rules! impl_split {
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {

#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
pub enum SplitImpl {
$(
$variant_name($split),
)*
}

impl SplitImpl {
pub fn get_type(&self) -> String {
match self {
$( Self::$variant_name(_) => $connector_name, )*
}
.to_string()
}
$(
impl TryFrom<SplitImpl> for $split {
type Error = anyhow::Error;

pub fn update_in_place(&mut self, start_offset: String) -> anyhow::Result<()> {
match self {
$( Self::$variant_name(inner) => inner.update_with_offset(start_offset)?, )*
fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
}
}
Ok(())
}

pub fn encode_to_json_inner(&self) -> JsonbVal {
match self {
$( Self::$variant_name(inner) => inner.encode_to_json(), )*
impl From<$split> for SplitImpl {
fn from(split: $split) -> SplitImpl {
SplitImpl::$variant_name(split)
}
}

fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
match split_type.to_lowercase().as_str() {
$( $connector_name => <$split>::restore_from_json(value).map(SplitImpl::$variant_name), )*
other => {
Err(anyhow!("connector '{}' is not supported", other))
}
}
}
}
)*
}
}

#[macro_export]
macro_rules! dispatch_source_prop {
($impl:expr, $source_prop:tt, $body:expr) => {{
use $crate::source::ConnectorProperties;
$crate::dispatch_source_enum! {ConnectorProperties, { $impl }, {$source_prop, IgnorePropType, IgnoreSplitType}, {$body}}
}};
}

#[macro_export]
macro_rules! impl_connector_properties {
($({ $variant_name:ident, $connector_name:ident } ),*) => {
impl ConnectorProperties {
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
const UPSTREAM_SOURCE_KEY: &str = "connector";
let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX;
if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) {
ConnectorProperties::new_cdc_properties(&connector, props)
} else {
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
match connector.to_lowercase().as_str() {
$(
$connector_name => {
serde_json::from_value(json_value).map_err(|e| anyhow!(e.to_string())).map(Self::$variant_name)
},
)*
_ => {
Err(anyhow!("connector '{}' is not supported", connector,))
}
}
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
#[derive(Clone, Debug)]
pub enum ConnectorProperties {
$(
$variant_name(Box<$prop_name>),
)*
}

$(
impl From<$prop_name> for ConnectorProperties {
fn from(prop: $prop_name) -> ConnectorProperties {
ConnectorProperties::$variant_name(Box::new(prop))
}
}
}
)*
}
}

#[macro_export]
macro_rules! impl_cdc_source_type {
($({$source_type:ident, $name:expr }),*) => {
(
{$({$cdc_source_type:tt}),*},
{$($_ignore:tt),*}
) => {
$(
paste!{
$crate::paste!{
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct $source_type;
impl CdcSourceTypeTrait for $source_type {
const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
pub struct $cdc_source_type;
impl CdcSourceTypeTrait for $cdc_source_type {
const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc");
fn source_type() -> CdcSourceType {
CdcSourceType::$source_type
CdcSourceType::$cdc_source_type
}
}

pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>;
}
)*

pub enum CdcSourceType {
$(
$source_type,
$cdc_source_type,
)*
}

Expand All @@ -167,7 +240,7 @@ macro_rules! impl_cdc_source_type {
match value {
PbSourceType::Unspecified => unreachable!(),
$(
PbSourceType::$source_type => CdcSourceType::$source_type,
PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
)*
}
}
Expand All @@ -177,47 +250,8 @@ macro_rules! impl_cdc_source_type {
fn from(this: CdcSourceType) -> PbSourceType {
match this {
$(
CdcSourceType::$source_type => PbSourceType::$source_type,
)*
}
}
}

impl ConnectorProperties {
pub(crate) fn new_cdc_properties(
connector_name: &str,
properties: HashMap<String, String>,
) -> std::result::Result<Self, anyhow::Error> {
match connector_name {
$(
$source_type::CDC_CONNECTOR_NAME => paste! {
Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> {
props: properties,
..Default::default()
})))
},
)*
_ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)),
}
}

pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => {
c.table_schema = table_schema;
}
)*
_ => {}
}
}

pub fn is_cdc_connector(&self) -> bool {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](_)} => true,
CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
)*
_ => false,
}
}
}
Expand Down
Loading

0 comments on commit c8ea5ee

Please sign in to comment.