diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index f103346bf14fa..0d1a015f97663 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -50,6 +50,8 @@ pub mod source; pub mod common; +pub use paste::paste; + #[derive(Clone, Debug, Default)] pub struct ConnectorParams { pub connector_client: Option, diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index f18bb18462749..792e2066abcca 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -13,152 +13,225 @@ // limitations under the License. #[macro_export] -macro_rules! impl_split { - ($({ $variant_name:ident, $connector_name:ident, $split:ty} ),*) => { - impl From<&SplitImpl> for ConnectorSplit { - fn from(split: &SplitImpl) -> Self { - match split { - $( SplitImpl::$variant_name(inner) => ConnectorSplit { split_type: String::from($connector_name), encoded_split: inner.encode_to_bytes().to_vec() }, )* - } +macro_rules! for_all_classified_sources { + ($macro:path $(,$extra_args:tt)*) => { + $macro! { + // cdc sources + { + { Mysql }, + { Postgres }, + { Citus } + }, + // other sources + { + { Kafka, $crate::source::kafka::KafkaProperties, $crate::source::kafka::KafkaSplit }, + { Pulsar, $crate::source::pulsar::PulsarProperties, $crate::source::pulsar::PulsarSplit }, + { Kinesis, $crate::source::kinesis::KinesisProperties, $crate::source::kinesis::split::KinesisSplit }, + { Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit }, + { Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit }, + { GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit }, + { Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit }, + { S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit } } + $( + ,$extra_args + )* } - $( - impl TryFrom for $split { - type Error = anyhow::Error; + }; +} - fn try_from(split: SplitImpl) -> std::result::Result { - match split { - SplitImpl::$variant_name(inner) => Ok(inner), - other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other)) - } +#[macro_export] +macro_rules! for_all_sources_inner { + ( + {$({ $cdc_source_type:ident }),* }, + { $({ $source_variant:ident, $prop_name:ty, $split:ty }),* }, + $macro:tt $(, $extra_args:tt)* + ) => { + $crate::paste! { + $macro! { + { + $( + { + [< $cdc_source_type Cdc >], + $crate::source::cdc::[< $cdc_source_type CdcProperties >], + $crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type> + }, + )* + $( + { $source_variant, $prop_name, $split } + ),* } + $(,$extra_args)* } + } + }; +} - impl From<$split> for SplitImpl { - fn from(split: $split) -> SplitImpl { - SplitImpl::$variant_name(split) - } - } +#[macro_export] +macro_rules! for_all_sources { + ($macro:path $(, $arg:tt )*) => { + $crate::for_all_classified_sources! {$crate::for_all_sources_inner, $macro $(,$arg)* } + }; +} - )* +#[macro_export] +macro_rules! dispatch_source_enum_inner { + ( + {$({$source_variant:ident, $prop_name:ty, $split:ty }),*}, + $enum_name:ident, + $impl:tt, + {$inner_name:ident, $prop_type_name:ident, $split_type_name:ident}, + $body:expr + ) => {{ + match $impl { + $( + $enum_name::$source_variant($inner_name) => { + type $prop_type_name = $prop_name; + type $split_type_name = $split; + { + $body + } + }, + )* + } + }} +} - impl TryFrom<&ConnectorSplit> for SplitImpl { - type Error = anyhow::Error; +#[macro_export] +macro_rules! dispatch_source_enum { + ($enum_name:ident, $impl:expr, $inner_name:tt, $body:expr) => {{ + $crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_name, { $impl }, $inner_name, $body} + }}; +} - fn try_from(split: &ConnectorSplit) -> std::result::Result { - match split.split_type.to_lowercase().as_str() { - $( $connector_name => <$split>::restore_from_bytes(split.encoded_split.as_ref()).map(SplitImpl::$variant_name), )* - other => { - Err(anyhow!("connector '{}' is not supported", other)) +#[macro_export] +macro_rules! match_source_name_str_inner { + ( + {$({$source_variant:ident, $prop_name:ty, $split:ty }),*}, + $source_name_str:expr, + $prop_type_name:ident, + $body:expr, + $on_other_closure:expr + ) => {{ + match $source_name_str { + $( + <$prop_name>::SOURCE_NAME => { + type $prop_type_name = $prop_name; + { + $body } - } - } + }, + )* + other => ($on_other_closure)(other), } + }} +} - impl SplitMetaData for SplitImpl { - fn id(&self) -> SplitId { - match self { - $( Self::$variant_name(inner) => inner.id(), )* - } - } +#[macro_export] +macro_rules! match_source_name_str { + ($source_name_str:expr, $prop_type_name:ident, $body:expr, $on_other_closure:expr) => {{ + $crate::for_all_sources! { + $crate::match_source_name_str_inner, + { $source_name_str }, + $prop_type_name, + { $body }, + { $on_other_closure } + } + }}; +} - fn encode_to_json(&self) -> JsonbVal { - use serde_json::json; - let inner = self.encode_to_json_inner().take(); - json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into() - } +#[macro_export] +macro_rules! dispatch_split_impl { + ($impl:expr, $inner_name:ident, $prop_type_name:ident, $body:expr) => {{ + use $crate::source::SplitImpl; + $crate::dispatch_source_enum! {SplitImpl, { $impl }, {$inner_name, $prop_type_name, IgnoreSplitType}, $body} + }}; +} - fn restore_from_json(value: JsonbVal) -> Result { - let mut value = value.take(); - let json_obj = value.as_object_mut().unwrap(); - let split_type = json_obj.remove(SPLIT_TYPE_FIELD).unwrap().as_str().unwrap().to_string(); - let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap(); - Self::restore_from_json_inner(&split_type, inner_value.into()) - } +#[macro_export] +macro_rules! impl_split { + ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => { + + #[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)] + pub enum SplitImpl { + $( + $variant_name($split), + )* } - impl SplitImpl { - pub fn get_type(&self) -> String { - match self { - $( Self::$variant_name(_) => $connector_name, )* - } - .to_string() - } + $( + impl TryFrom for $split { + type Error = anyhow::Error; - pub fn update_in_place(&mut self, start_offset: String) -> anyhow::Result<()> { - match self { - $( Self::$variant_name(inner) => inner.update_with_offset(start_offset)?, )* + fn try_from(split: SplitImpl) -> std::result::Result { + match split { + SplitImpl::$variant_name(inner) => Ok(inner), + other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other)) + } } - Ok(()) } - pub fn encode_to_json_inner(&self) -> JsonbVal { - match self { - $( Self::$variant_name(inner) => inner.encode_to_json(), )* + impl From<$split> for SplitImpl { + fn from(split: $split) -> SplitImpl { + SplitImpl::$variant_name(split) } } - fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result { - match split_type.to_lowercase().as_str() { - $( $connector_name => <$split>::restore_from_json(value).map(SplitImpl::$variant_name), )* - other => { - Err(anyhow!("connector '{}' is not supported", other)) - } - } - } - } + )* } } +#[macro_export] +macro_rules! dispatch_source_prop { + ($impl:expr, $source_prop:tt, $body:expr) => {{ + use $crate::source::ConnectorProperties; + $crate::dispatch_source_enum! {ConnectorProperties, { $impl }, {$source_prop, IgnorePropType, IgnoreSplitType}, {$body}} + }}; +} + #[macro_export] macro_rules! impl_connector_properties { - ($({ $variant_name:ident, $connector_name:ident } ),*) => { - impl ConnectorProperties { - pub fn extract(mut props: HashMap) -> Result { - const UPSTREAM_SOURCE_KEY: &str = "connector"; - let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; - use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX; - if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) { - ConnectorProperties::new_cdc_properties(&connector, props) - } else { - let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?; - match connector.to_lowercase().as_str() { - $( - $connector_name => { - serde_json::from_value(json_value).map_err(|e| anyhow!(e.to_string())).map(Self::$variant_name) - }, - )* - _ => { - Err(anyhow!("connector '{}' is not supported", connector,)) - } - } + ({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => { + #[derive(Clone, Debug)] + pub enum ConnectorProperties { + $( + $variant_name(Box<$prop_name>), + )* + } + + $( + impl From<$prop_name> for ConnectorProperties { + fn from(prop: $prop_name) -> ConnectorProperties { + ConnectorProperties::$variant_name(Box::new(prop)) } } - } + )* } } #[macro_export] macro_rules! impl_cdc_source_type { - ($({$source_type:ident, $name:expr }),*) => { + ( + {$({$cdc_source_type:tt}),*}, + {$($_ignore:tt),*} + ) => { $( - paste!{ + $crate::paste!{ #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] - pub struct $source_type; - impl CdcSourceTypeTrait for $source_type { - const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc"); + pub struct $cdc_source_type; + impl CdcSourceTypeTrait for $cdc_source_type { + const CDC_CONNECTOR_NAME: &'static str = concat!(stringify!([<$cdc_source_type:lower>]), "-cdc"); fn source_type() -> CdcSourceType { - CdcSourceType::$source_type + CdcSourceType::$cdc_source_type } } - - pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>; + pub type [<$cdc_source_type CdcProperties>] = CdcProperties<$cdc_source_type>; } )* pub enum CdcSourceType { $( - $source_type, + $cdc_source_type, )* } @@ -167,7 +240,7 @@ macro_rules! impl_cdc_source_type { match value { PbSourceType::Unspecified => unreachable!(), $( - PbSourceType::$source_type => CdcSourceType::$source_type, + PbSourceType::$cdc_source_type => CdcSourceType::$cdc_source_type, )* } } @@ -177,47 +250,8 @@ macro_rules! impl_cdc_source_type { fn from(this: CdcSourceType) -> PbSourceType { match this { $( - CdcSourceType::$source_type => PbSourceType::$source_type, - )* - } - } - } - - impl ConnectorProperties { - pub(crate) fn new_cdc_properties( - connector_name: &str, - properties: HashMap, - ) -> std::result::Result { - match connector_name { - $( - $source_type::CDC_CONNECTOR_NAME => paste! { - Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> { - props: properties, - ..Default::default() - }))) - }, - )* - _ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)), - } - } - - pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) { - match self { - $( - paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => { - c.table_schema = table_schema; - } - )* - _ => {} - } - } - - pub fn is_cdc_connector(&self) -> bool { - match self { - $( - paste! {ConnectorProperties:: [< $source_type Cdc >](_)} => true, + CdcSourceType::$cdc_source_type => PbSourceType::$cdc_source_type, )* - _ => false, } } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index da11310d78d56..d02645385b81f 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -27,41 +27,46 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::TableId; use risingwave_common::error::{ErrorSuppressor, RwError}; use risingwave_common::types::{JsonbVal, Scalar}; +use risingwave_pb::catalog::PbSource; use risingwave_pb::source::ConnectorSplit; use risingwave_rpc_client::ConnectorClient; +use serde::de::DeserializeOwned; use super::datagen::DatagenMeta; -use super::filesystem::{FsSplit, S3Properties, S3_CONNECTOR}; +use super::filesystem::FsSplit; use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; use super::nexmark::source::message::NexmarkMeta; use crate::parser::ParserConfig; -use crate::source::cdc::{ - CdcProperties, Citus, DebeziumCdcSplit, Mysql, Postgres, CITUS_CDC_CONNECTOR, - MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, -}; pub(crate) use crate::source::common::CommonSplitReader; -use crate::source::datagen::{DatagenProperties, DatagenSplit, DATAGEN_CONNECTOR}; -use crate::source::google_pubsub::{PubsubProperties, PubsubSplit, GOOGLE_PUBSUB_CONNECTOR}; -use crate::source::kafka::{KafkaProperties, KafkaSplit, KAFKA_CONNECTOR}; -use crate::source::kinesis::split::KinesisSplit; -use crate::source::kinesis::{KinesisProperties, KINESIS_CONNECTOR}; use crate::source::monitor::EnumeratorMetrics; -use crate::source::nats::source::NatsSplit; -use crate::source::nats::{NatsProperties, NATS_CONNECTOR}; -use crate::source::nexmark::{NexmarkProperties, NexmarkSplit, NEXMARK_CONNECTOR}; -use crate::source::pulsar::{PulsarProperties, PulsarSplit, PULSAR_CONNECTOR}; -use crate::{impl_connector_properties, impl_split}; +use crate::{ + dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, + impl_split, match_source_name_str, +}; const SPLIT_TYPE_FIELD: &str = "split_type"; const SPLIT_INFO_FIELD: &str = "split_info"; -pub trait SourceProperties: Clone { +pub trait TryFromHashmap: Sized { + fn try_from_hashmap(props: HashMap) -> Result; +} + +pub trait SourceProperties: TryFromHashmap + Clone { const SOURCE_NAME: &'static str; type Split: SplitMetaData + TryFrom + Into; type SplitEnumerator: SplitEnumerator; type SplitReader: SplitReader; + + fn init_from_pb_source(&mut self, _source: &PbSource) {} +} + +impl TryFromHashmap for P { + fn try_from_hashmap(props: HashMap) -> Result { + let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?; + serde_json::from_value::

(json_value).map_err(|e| anyhow!(e.to_string())) + } } pub async fn create_split_reader( @@ -289,61 +294,60 @@ pub trait SplitReader: Sized + Send { fn into_stream(self) -> BoxSourceWithStateStream; } -#[derive(Clone, Debug)] -pub enum ConnectorProperties { - Kafka(Box), - Pulsar(Box), - Kinesis(Box), - Nexmark(Box), - Datagen(Box), - S3(Box), - MysqlCdc(Box>), - PostgresCdc(Box>), - CitusCdc(Box>), - GooglePubsub(Box), - Nats(Box), -} - -#[macro_export] -macro_rules! dispatch_source_prop { - ($impl:expr, $source_prop:ident, $body:tt) => {{ - use $crate::source::base::ConnectorProperties; - - match $impl { - ConnectorProperties::Kafka($source_prop) => $body, - ConnectorProperties::Pulsar($source_prop) => $body, - ConnectorProperties::Kinesis($source_prop) => $body, - ConnectorProperties::Nexmark($source_prop) => $body, - ConnectorProperties::Datagen($source_prop) => $body, - ConnectorProperties::S3($source_prop) => $body, - ConnectorProperties::MysqlCdc($source_prop) => $body, - ConnectorProperties::PostgresCdc($source_prop) => $body, - ConnectorProperties::CitusCdc($source_prop) => $body, - ConnectorProperties::GooglePubsub($source_prop) => $body, - ConnectorProperties::Nats($source_prop) => $body, - } - }}; -} +for_all_sources!(impl_connector_properties); impl ConnectorProperties { + pub fn extract(mut props: HashMap) -> Result { + const UPSTREAM_SOURCE_KEY: &str = "connector"; + let connector = props + .remove(UPSTREAM_SOURCE_KEY) + .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; + match_source_name_str!( + connector.to_lowercase().as_str(), + PropType, + PropType::try_from_hashmap(props).map(ConnectorProperties::from), + |other| Err(anyhow!("connector '{}' is not supported", other)) + ) + } + + pub fn init_from_pb_source(&mut self, source: &PbSource) { + dispatch_source_prop!(self, prop, prop.init_from_pb_source(source)) + } + pub fn support_multiple_splits(&self) -> bool { matches!(self, ConnectorProperties::Kafka(_)) } } -#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)] -pub enum SplitImpl { - Kafka(KafkaSplit), - Pulsar(PulsarSplit), - Kinesis(KinesisSplit), - Nexmark(NexmarkSplit), - Datagen(DatagenSplit), - GooglePubsub(PubsubSplit), - MysqlCdc(DebeziumCdcSplit), - PostgresCdc(DebeziumCdcSplit), - CitusCdc(DebeziumCdcSplit), - Nats(NatsSplit), - S3(FsSplit), +for_all_sources!(impl_split); + +impl From<&SplitImpl> for ConnectorSplit { + fn from(split: &SplitImpl) -> Self { + dispatch_split_impl!(split, inner, SourcePropType, { + ConnectorSplit { + split_type: String::from(SourcePropType::SOURCE_NAME), + encoded_split: inner.encode_to_bytes().to_vec(), + } + }) + } +} + +impl TryFrom<&ConnectorSplit> for SplitImpl { + type Error = anyhow::Error; + + fn try_from(split: &ConnectorSplit) -> std::result::Result { + match_source_name_str!( + split.split_type.to_lowercase().as_str(), + PropType, + { + ::Split::restore_from_bytes( + split.encoded_split.as_ref(), + ) + .map(Into::into) + }, + |other| Err(anyhow!("connector '{}' is not supported", other)) + ) + } } // for the `FsSourceExecutor` @@ -364,29 +368,68 @@ impl SplitImpl { } } -impl_connector_properties! { - { Kafka, KAFKA_CONNECTOR }, - { Pulsar, PULSAR_CONNECTOR }, - { Kinesis, KINESIS_CONNECTOR }, - { Nexmark, NEXMARK_CONNECTOR }, - { Datagen, DATAGEN_CONNECTOR }, - { S3, S3_CONNECTOR }, - { GooglePubsub, GOOGLE_PUBSUB_CONNECTOR}, - { Nats, NATS_CONNECTOR } +impl SplitImpl { + fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result { + match_source_name_str!( + split_type.to_lowercase().as_str(), + PropType, + ::Split::restore_from_json(value).map(Into::into), + |other| Err(anyhow!("connector '{}' is not supported", other)) + ) + } } -impl_split! { - { Kafka, KAFKA_CONNECTOR, KafkaSplit }, - { Pulsar, PULSAR_CONNECTOR, PulsarSplit }, - { Kinesis, KINESIS_CONNECTOR, KinesisSplit }, - { Nexmark, NEXMARK_CONNECTOR, NexmarkSplit }, - { Datagen, DATAGEN_CONNECTOR, DatagenSplit }, - { GooglePubsub, GOOGLE_PUBSUB_CONNECTOR, PubsubSplit }, - { MysqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit }, - { PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit }, - { CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit }, - { S3, S3_CONNECTOR, FsSplit }, - { Nats, NATS_CONNECTOR, NatsSplit } +impl SplitMetaData for SplitImpl { + fn id(&self) -> SplitId { + dispatch_split_impl!(self, inner, IgnoreType, inner.id()) + } + + fn encode_to_json(&self) -> JsonbVal { + use serde_json::json; + let inner = self.encode_to_json_inner().take(); + json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into() + } + + fn restore_from_json(value: JsonbVal) -> Result { + let mut value = value.take(); + let json_obj = value.as_object_mut().unwrap(); + let split_type = json_obj + .remove(SPLIT_TYPE_FIELD) + .unwrap() + .as_str() + .unwrap() + .to_string(); + let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap(); + Self::restore_from_json_inner(&split_type, inner_value.into()) + } + + fn update_with_offset(&mut self, start_offset: String) -> Result<()> { + dispatch_split_impl!( + self, + inner, + IgnoreType, + inner.update_with_offset(start_offset) + ) + } +} + +impl SplitImpl { + pub fn get_type(&self) -> String { + dispatch_split_impl!(self, _ignored, PropType, { + PropType::SOURCE_NAME.to_string() + }) + } + + pub fn update_in_place(&mut self, start_offset: String) -> Result<()> { + dispatch_split_impl!(self, inner, IgnoreType, { + inner.update_with_offset(start_offset)? + }); + Ok(()) + } + + pub fn encode_to_json_inner(&self) -> JsonbVal { + dispatch_split_impl!(self, inner, IgnoreType, inner.encode_to_json()) + } } pub type DataType = risingwave_common::types::DataType; @@ -447,6 +490,7 @@ pub trait SplitMetaData: Sized { fn encode_to_json(&self) -> JsonbVal; fn restore_from_json(value: JsonbVal) -> Result; + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()>; } /// [`ConnectorState`] maintains the consuming splits' info. In specific split readers, @@ -461,7 +505,8 @@ mod tests { use nexmark::event::EventType; use super::*; - use crate::source::cdc::MySqlCdcSplit; + use crate::source::cdc::{DebeziumCdcSplit, MySqlCdcSplit}; + use crate::source::kafka::KafkaSplit; #[test] fn test_split_impl_get_fn() -> Result<()> { diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index e7054060a6f2b..1d795a7141e84 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -19,14 +19,15 @@ use std::collections::HashMap; use std::marker::PhantomData; pub use enumerator::*; -use paste::paste; +use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; +use risingwave_pb::catalog::PbSource; use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema}; pub use source::*; pub use split::*; -use crate::impl_cdc_source_type; -use crate::source::{ConnectorProperties, SourceProperties, SplitImpl}; +use crate::source::{SourceProperties, SplitImpl, TryFromHashmap}; +use crate::{for_all_classified_sources, impl_cdc_source_type}; pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc"; @@ -39,7 +40,7 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static { fn source_type() -> CdcSourceType; } -impl_cdc_source_type!({ Mysql, "mysql" }, { Postgres, "postgres" }, { Citus, "citus" }); +for_all_classified_sources!(impl_cdc_source_type); #[derive(Clone, Debug, Default)] pub struct CdcProperties { @@ -52,6 +53,16 @@ pub struct CdcProperties { pub _phantom: PhantomData, } +impl TryFromHashmap for CdcProperties { + fn try_from_hashmap(props: HashMap) -> anyhow::Result { + Ok(CdcProperties { + props, + table_schema: Default::default(), + _phantom: PhantomData, + }) + } +} + impl SourceProperties for CdcProperties where DebeziumCdcSplit: TryFrom + Into, @@ -62,6 +73,31 @@ where type SplitReader = CdcSplitReader; const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME; + + fn init_from_pb_source(&mut self, source: &PbSource) { + let pk_indices = source + .pk_column_ids + .iter() + .map(|&id| { + source + .columns + .iter() + .position(|col| col.column_desc.as_ref().unwrap().column_id == id) + .unwrap() as u32 + }) + .collect_vec(); + + let table_schema = PbTableSchema { + columns: source + .columns + .iter() + .flat_map(|col| &col.column_desc) + .cloned() + .collect(), + pk_indices, + }; + self.table_schema = table_schema; + } } impl CdcProperties { diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index 9d13a3c5a6eac..3bd24992f75f0 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -132,6 +132,7 @@ pub struct DebeziumCdcSplit { impl SplitMetaData for DebeziumCdcSplit { fn id(&self) -> SplitId { + // TODO: may check T to get the specific cdc type assert!(self.mysql_split.is_some() || self.pg_split.is_some()); if let Some(split) = &self.mysql_split { return format!("{}", split.inner.split_id).into(); @@ -149,6 +150,17 @@ impl SplitMetaData for DebeziumCdcSplit { fn restore_from_json(value: JsonbVal) -> anyhow::Result { serde_json::from_value(value.take()).map_err(|e| anyhow!(e)) } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + // TODO: may check T to get the specific cdc type + assert!(self.mysql_split.is_some() || self.pg_split.is_some()); + if let Some(split) = &mut self.mysql_split { + split.update_with_offset(start_offset)? + } else if let Some(split) = &mut self.pg_split { + split.update_with_offset(start_offset)? + } + Ok(()) + } } impl DebeziumCdcSplit { @@ -196,14 +208,4 @@ impl DebeziumCdcSplit { } unreachable!("invalid debezium split") } - - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - assert!(self.mysql_split.is_some() || self.pg_split.is_some()); - if let Some(split) = &mut self.mysql_split { - split.update_with_offset(start_offset)? - } else if let Some(split) = &mut self.pg_split { - split.update_with_offset(start_offset)? - } - Ok(()) - } } diff --git a/src/connector/src/source/datagen/split.rs b/src/connector/src/source/datagen/split.rs index 08babee97fcb9..617b933728837 100644 --- a/src/connector/src/source/datagen/split.rs +++ b/src/connector/src/source/datagen/split.rs @@ -39,6 +39,11 @@ impl SplitMetaData for DatagenSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + self.start_offset = Some(start_offset.as_str().parse::().unwrap()); + Ok(()) + } } impl DatagenSplit { @@ -49,9 +54,4 @@ impl DatagenSplit { start_offset, } } - - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - self.start_offset = Some(start_offset.as_str().parse::().unwrap()); - Ok(()) - } } diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index 4711a5080c5ae..d4328289b547f 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -38,6 +38,12 @@ impl SplitMetaData for FsSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + let offset = start_offset.parse().unwrap(); + self.offset = offset; + Ok(()) + } } impl FsSplit { @@ -48,10 +54,4 @@ impl FsSplit { size, } } - - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - let offset = start_offset.parse().unwrap(); - self.offset = offset; - Ok(()) - } } diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs index d623a9337cf33..1f598eb6852d4 100644 --- a/src/connector/src/source/google_pubsub/split.rs +++ b/src/connector/src/source/google_pubsub/split.rs @@ -35,13 +35,6 @@ pub struct PubsubSplit { pub(crate) stop_offset: Option, } -impl PubsubSplit { - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - self.start_offset = Some(start_offset); - Ok(()) - } -} - impl SplitMetaData for PubsubSplit { fn restore_from_json(value: JsonbVal) -> anyhow::Result { serde_json::from_value(value.take()).map_err(|e| anyhow!(e)) @@ -54,4 +47,9 @@ impl SplitMetaData for PubsubSplit { fn id(&self) -> SplitId { format!("{}-{}", self.subscription, self.index).into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + self.start_offset = Some(start_offset); + Ok(()) + } } diff --git a/src/connector/src/source/kafka/split.rs b/src/connector/src/source/kafka/split.rs index 9d67b4496fe52..31c834d5f1609 100644 --- a/src/connector/src/source/kafka/split.rs +++ b/src/connector/src/source/kafka/split.rs @@ -39,6 +39,11 @@ impl SplitMetaData for KafkaSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + self.start_offset = Some(start_offset.as_str().parse::().unwrap()); + Ok(()) + } } impl KafkaSplit { @@ -56,11 +61,6 @@ impl KafkaSplit { } } - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - self.start_offset = Some(start_offset.as_str().parse::().unwrap()); - Ok(()) - } - pub fn get_topic_and_partition(&self) -> (String, i32) { (self.topic.clone(), self.partition) } diff --git a/src/connector/src/source/kinesis/split.rs b/src/connector/src/source/kinesis/split.rs index 09138d842b8ea..e03ecb59bfcd0 100644 --- a/src/connector/src/source/kinesis/split.rs +++ b/src/connector/src/source/kinesis/split.rs @@ -46,6 +46,17 @@ impl SplitMetaData for KinesisSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + let start_offset = if start_offset.is_empty() { + KinesisOffset::Earliest + } else { + KinesisOffset::SequenceNumber(start_offset) + }; + + self.start_position = start_offset; + Ok(()) + } } impl KinesisSplit { @@ -60,15 +71,4 @@ impl KinesisSplit { end_position, } } - - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - let start_offset = if start_offset.is_empty() { - KinesisOffset::Earliest - } else { - KinesisOffset::SequenceNumber(start_offset) - }; - - self.start_position = start_offset; - Ok(()) - } } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index 4072de230d983..d9b3e11b98f87 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -50,6 +50,16 @@ impl SplitMetaData for NatsSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { + let start_sequence = if start_sequence.is_empty() { + NatsOffset::Earliest + } else { + NatsOffset::SequenceNumber(start_sequence) + }; + self.start_sequence = start_sequence; + Ok(()) + } } impl NatsSplit { @@ -60,14 +70,4 @@ impl NatsSplit { start_sequence, } } - - pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { - let start_sequence = if start_sequence.is_empty() { - NatsOffset::Earliest - } else { - NatsOffset::SequenceNumber(start_sequence) - }; - self.start_sequence = start_sequence; - Ok(()) - } } diff --git a/src/connector/src/source/nexmark/split.rs b/src/connector/src/source/nexmark/split.rs index e1730993f5fbc..221fa20cbfe48 100644 --- a/src/connector/src/source/nexmark/split.rs +++ b/src/connector/src/source/nexmark/split.rs @@ -38,6 +38,11 @@ impl SplitMetaData for NexmarkSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + self.start_offset = Some(start_offset.as_str().parse::().unwrap()); + Ok(()) + } } impl NexmarkSplit { @@ -48,9 +53,4 @@ impl NexmarkSplit { start_offset, } } - - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - self.start_offset = Some(start_offset.as_str().parse::().unwrap()); - Ok(()) - } } diff --git a/src/connector/src/source/pulsar/split.rs b/src/connector/src/source/pulsar/split.rs index 3f7f1424f0ea3..5e546c0473519 100644 --- a/src/connector/src/source/pulsar/split.rs +++ b/src/connector/src/source/pulsar/split.rs @@ -26,19 +26,6 @@ pub struct PulsarSplit { pub(crate) start_offset: PulsarEnumeratorOffset, } -impl PulsarSplit { - pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { - let start_offset = if start_offset.is_empty() { - PulsarEnumeratorOffset::Earliest - } else { - PulsarEnumeratorOffset::MessageId(start_offset) - }; - - self.start_offset = start_offset; - Ok(()) - } -} - impl SplitMetaData for PulsarSplit { fn id(&self) -> SplitId { // TODO: should avoid constructing a string every time @@ -52,4 +39,15 @@ impl SplitMetaData for PulsarSplit { fn encode_to_json(&self) -> JsonbVal { serde_json::to_value(self.clone()).unwrap().into() } + + fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { + let start_offset = if start_offset.is_empty() { + PulsarEnumeratorOffset::Earliest + } else { + PulsarEnumeratorOffset::MessageId(start_offset) + }; + + self.start_offset = start_offset; + Ok(()) + } } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index cae431aa5b188..d6a7377f19928 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -29,7 +29,6 @@ use risingwave_connector::source::{ SplitEnumerator, SplitId, SplitImpl, SplitMetaData, }; use risingwave_pb::catalog::Source; -use risingwave_pb::connector_service::PbTableSchema; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_rpc_client::ConnectorClient; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; @@ -77,30 +76,7 @@ struct ConnectorSourceWorker { fn extract_prop_from_source(source: &Source) -> MetaResult { let mut properties = ConnectorProperties::extract(source.properties.clone())?; - if properties.is_cdc_connector() { - let pk_indices = source - .pk_column_ids - .iter() - .map(|&id| { - source - .columns - .iter() - .position(|col| col.column_desc.as_ref().unwrap().column_id == id) - .unwrap() as u32 - }) - .collect_vec(); - - let table_schema = PbTableSchema { - columns: source - .columns - .iter() - .flat_map(|col| &col.column_desc) - .cloned() - .collect(), - pk_indices, - }; - properties.init_cdc_properties(table_schema); - } + properties.init_from_pb_source(source); Ok(properties) } @@ -936,6 +912,10 @@ mod tests { fn restore_from_json(value: JsonbVal) -> anyhow::Result { serde_json::from_value(value.take()).map_err(|e| anyhow!(e)) } + + fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> { + Ok(()) + } } fn check_all_splits(