Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): refine source macro #12260

Merged
merged 22 commits into from
Sep 19, 2023
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5bd5f44
refactor source with trait
wenym1 Sep 4, 2023
a205e09
refactor(source): specify cdc generic type parameter for different cd…
wenym1 Sep 4, 2023
6282937
Merge branch 'yiming/refactor-cdc-type' into yiming/refactor-source-w…
wenym1 Sep 4, 2023
074a57c
refactor(source): implement the common split reader as a generic func…
wenym1 Sep 4, 2023
d824b76
Merge branch 'yiming/extract-source-common-reader' into yiming/refact…
wenym1 Sep 4, 2023
8167f25
Merge branch 'main' into yiming/extract-source-common-reader
wenym1 Sep 4, 2023
07a6e82
Merge branch 'main' into yiming/extract-source-common-reader
wenym1 Sep 5, 2023
c11d850
Merge branch 'yiming/extract-source-common-reader' into yiming/refact…
wenym1 Sep 5, 2023
8cdc90e
extract more common cdc logic
wenym1 Sep 5, 2023
dbaf522
Merge branch 'yiming/refactor-cdc-type' into yiming/refactor-source-w…
wenym1 Sep 5, 2023
a1c9cf6
Merge branch 'main' into yiming/refactor-cdc-type
wenym1 Sep 8, 2023
14e0792
Merge branch 'yiming/refactor-cdc-type' into yiming/refactor-source-w…
wenym1 Sep 8, 2023
bdb20b4
make code compile
wenym1 Sep 8, 2023
73ae14c
refactor(source): refine source macro
wenym1 Sep 11, 2023
6be9d4f
Merge branch 'main' into yiming/refactor-source-with-trait
wenym1 Sep 11, 2023
2206d07
fmt
wenym1 Sep 11, 2023
c61f588
Merge branch 'main' into yiming/refactor-source-with-trait
wenym1 Sep 13, 2023
c03e7f0
Merge branch 'yiming/refactor-source-with-trait' into yiming/refactor…
wenym1 Sep 13, 2023
8ae946f
Merge branch 'main' into yiming/refactor-source-macro
wenym1 Sep 19, 2023
0fc9b65
extract update_with_offset to split trait
wenym1 Sep 19, 2023
9a3b9b0
extract init_from_pb_source
wenym1 Sep 19, 2023
d7ca600
Merge branch 'main' into yiming/refactor-source-macro
wenym1 Sep 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(source): refine source macro
  • Loading branch information
wenym1 committed Sep 11, 2023
commit 73ae14cded102a4fcd53fdc64dae0f69e62ba076
2 changes: 2 additions & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -49,6 +49,8 @@ pub mod source;

pub mod common;

pub use paste::paste;

#[derive(Clone, Debug, Default)]
pub struct ConnectorParams {
pub connector_client: Option<ConnectorClient>,
318 changes: 176 additions & 142 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
@@ -13,152 +13,225 @@
// limitations under the License.

#[macro_export]
macro_rules! impl_split {
($({ $variant_name:ident, $connector_name:ident, $split:ty} ),*) => {
impl From<&SplitImpl> for ConnectorSplit {
fn from(split: &SplitImpl) -> Self {
match split {
$( SplitImpl::$variant_name(inner) => ConnectorSplit { split_type: String::from($connector_name), encoded_split: inner.encode_to_bytes().to_vec() }, )*
}
macro_rules! for_all_classified_sources {
($macro:path $(,$extra_args:tt)*) => {
$macro! {
// cdc sources
{
{ Mysql },
{ Postgres },
{ Citus }
},
// other sources
{
{ Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit },
{ Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit },
{ Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit },
{ Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
{ Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }
}
$(
,$extra_args
)*
}
$(
impl TryFrom<SplitImpl> for $split {
type Error = anyhow::Error;
};
}

fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
}
#[macro_export]
macro_rules! for_all_sources_inner {
(
{$({ $cdc_source_type:ident }),* },
{ $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
$macro:tt $(, $extra_args:tt)*
) => {
$crate::paste! {
$macro! {
{
$(
{
[< $cdc_source_type Cdc >],
$crate::source::cdc::[< $cdc_source_type CdcProperties >],
$crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
},
)*
$(
{ $source_variant, $prop_name, $split }
),*
}
$(,$extra_args)*
}
}
};
}

impl From<$split> for SplitImpl {
fn from(split: $split) -> SplitImpl {
SplitImpl::$variant_name(split)
}
}
#[macro_export]
macro_rules! for_all_sources {
($macro:path $(, $arg:tt )*) => {
$crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* }
};
}

)*
#[macro_export]
macro_rules! dispatch_source_enum_inner {
(
{$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
$enum_name:ident,
$impl:tt,
{$inner_name:ident, $prop_type_name:ident, $split_type_name:ident},
$body:expr
) => {{
match $impl {
$(
$enum_name::$source_variant($inner_name) => {
type $prop_type_name = $prop_name;
type $split_type_name = $split;
{
$body
}
},
)*
}
}}
}

impl TryFrom<&ConnectorSplit> for SplitImpl {
type Error = anyhow::Error;
#[macro_export]
macro_rules! dispatch_source_enum {
($enum_name:ident, $impl:expr, $inner_name:tt, $body:expr) => {{
$crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_name, { $impl }, $inner_name, $body}
}};
}

fn try_from(split: &ConnectorSplit) -> std::result::Result<Self, Self::Error> {
match split.split_type.to_lowercase().as_str() {
$( $connector_name => <$split>::restore_from_bytes(split.encoded_split.as_ref()).map(SplitImpl::$variant_name), )*
other => {
Err(anyhow!("connector '{}' is not supported", other))
#[macro_export]
macro_rules! match_source_name_str_inner {
(
{$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
$source_name_str:expr,
$prop_type_name:ident,
$body:expr,
$on_other_closure:expr
) => {{
match $source_name_str {
$(
<$prop_name>::SOURCE_NAME => {
type $prop_type_name = $prop_name;
{
$body
}
}
}
},
)*
other => ($on_other_closure)(other),
}
}}
}

impl SplitMetaData for SplitImpl {
fn id(&self) -> SplitId {
match self {
$( Self::$variant_name(inner) => inner.id(), )*
}
}
#[macro_export]
macro_rules! match_source_name_str {
($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{
$crate::for_all_sources! {
$crate::match_source_name_str_inner,
{ $source_name_str },
$prop_type_name,
{ $body },
{ $on_other_closure }
}
}};
}

fn encode_to_json(&self) -> JsonbVal {
use serde_json::json;
let inner = self.encode_to_json_inner().take();
json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
}
#[macro_export]
macro_rules! dispatch_split_impl {
($impl:expr, $inner_name:ident, $prop_type_name:ident, $body:expr) => {{
use $crate::source::SplitImpl;
$crate::dispatch_source_enum! {SplitImpl, { $impl }, {$inner_name, $prop_type_name, IgnoreSplitType}, $body}
}};
}

fn restore_from_json(value: JsonbVal) -> Result<Self> {
let mut value = value.take();
let json_obj = value.as_object_mut().unwrap();
let split_type = json_obj.remove(SPLIT_TYPE_FIELD).unwrap().as_str().unwrap().to_string();
let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
Self::restore_from_json_inner(&split_type, inner_value.into())
}
#[macro_export]
macro_rules! impl_split {
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {

#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
pub enum SplitImpl {
$(
$variant_name($split),
)*
}

impl SplitImpl {
pub fn get_type(&self) -> String {
match self {
$( Self::$variant_name(_) => $connector_name, )*
}
.to_string()
}
$(
impl TryFrom<SplitImpl> for $split {
type Error = anyhow::Error;

pub fn update_in_place(&mut self, start_offset: String) -> anyhow::Result<()> {
match self {
$( Self::$variant_name(inner) => inner.update_with_offset(start_offset)?, )*
fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
}
}
Ok(())
}

pub fn encode_to_json_inner(&self) -> JsonbVal {
match self {
$( Self::$variant_name(inner) => inner.encode_to_json(), )*
impl From<$split> for SplitImpl {
fn from(split: $split) -> SplitImpl {
SplitImpl::$variant_name(split)
}
}

fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
match split_type.to_lowercase().as_str() {
$( $connector_name => <$split>::restore_from_json(value).map(SplitImpl::$variant_name), )*
other => {
Err(anyhow!("connector '{}' is not supported", other))
}
}
}
}
)*
}
}

#[macro_export]
macro_rules! dispatch_source_prop {
($impl:expr, $source_prop:tt, $body:expr) => {{
use $crate::source::ConnectorProperties;
$crate::dispatch_source_enum! {ConnectorProperties, { $impl }, {$source_prop, IgnorePropType, IgnoreSplitType}, {$body}}
Copy link
Contributor

@StrikeW StrikeW Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I revisit this pr to want to understand how does dispatch_source_prop! work, but it is hard to get it. Because these macros are nested instead of in a flatten structure.
For example to understand the third arg {$source_prop, IgnorePropType, IgnoreSplitType}, I need to dive into other macros level by level. What's worse is that IDE can't expand these macro_rules! directly without calling it in a function. Is there a way to show the expanded code of a macro_rules! without calling it just like a function definition? Or coud we flatten these macros a bit? cc @wenym1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to show the expanded code of a macro_rules! without calling it just like a function definition

You can write an example in a random function and use cargo expand to see the expanded code.

coud we flatten these macros a bit

This main idea of this PR is to declare all source related information in only one place so that when developing a new source, we don't have to declare these information in places previously scattered around in our codebase.

}};
}

#[macro_export]
macro_rules! impl_connector_properties {
($({ $variant_name:ident, $connector_name:ident } ),*) => {
impl ConnectorProperties {
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
const UPSTREAM_SOURCE_KEY: &str = "connector";
let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX;
if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) {
ConnectorProperties::new_cdc_properties(&connector, props)
} else {
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
match connector.to_lowercase().as_str() {
$(
$connector_name => {
serde_json::from_value(json_value).map_err(|e| anyhow!(e.to_string())).map(Self::$variant_name)
},
)*
_ => {
Err(anyhow!("connector '{}' is not supported", connector,))
}
}
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
#[derive(Clone, Debug)]
pub enum ConnectorProperties {
$(
$variant_name(Box<$prop_name>),
)*
}

$(
impl From<$prop_name> for ConnectorProperties {
fn from(prop: $prop_name) -> ConnectorProperties {
ConnectorProperties::$variant_name(Box::new(prop))
}
}
}
)*
}
}

#[macro_export]
macro_rules! impl_cdc_source_type {
($({$source_type:ident, $name:expr }),*) => {
(
{$({$cdc_source_type:tt}),*},
{$($_ignore:tt),*}
) => {
$(
paste!{
$crate::paste!{
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct $source_type;
impl CdcSourceTypeTrait for $source_type {
const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
pub struct $cdc_source_type;
impl CdcSourceTypeTrait for $cdc_source_type {
const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc");
fn source_type() -> CdcSourceType {
CdcSourceType::$source_type
CdcSourceType::$cdc_source_type
}
}

pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>;
}
)*

pub enum CdcSourceType {
$(
$source_type,
$cdc_source_type,
)*
}

@@ -167,7 +240,7 @@ macro_rules! impl_cdc_source_type {
match value {
PbSourceType::Unspecified => unreachable!(),
$(
PbSourceType::$source_type => CdcSourceType::$source_type,
PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type,
)*
}
}
@@ -177,47 +250,8 @@ macro_rules! impl_cdc_source_type {
fn from(this: CdcSourceType) -> PbSourceType {
match this {
$(
CdcSourceType::$source_type => PbSourceType::$source_type,
)*
}
}
}

impl ConnectorProperties {
pub(crate) fn new_cdc_properties(
connector_name: &str,
properties: HashMap<String, String>,
) -> std::result::Result<Self, anyhow::Error> {
match connector_name {
$(
$source_type::CDC_CONNECTOR_NAME => paste! {
Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> {
props: properties,
..Default::default()
})))
},
)*
_ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)),
}
}

pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => {
c.table_schema = table_schema;
}
)*
_ => {}
}
}

pub fn is_cdc_connector(&self) -> bool {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](_)} => true,
CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type,
)*
_ => false,
}
}
}
Loading