From a205e093e715fefe03214bba464c76e4fdf1f854 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 4 Sep 2023 18:53:36 +0800
Subject: [PATCH 1/3] refactor(source): specify cdc generic type parameter for
 different cdc sources

---
 Cargo.lock                                    |   1 +
 src/connector/Cargo.toml                      |   1 +
 src/connector/src/macros.rs                   |  77 +++-------
 src/connector/src/source/base.rs              |  70 +++++-----
 .../src/source/cdc/enumerator/mod.rs          | 132 +++++++++++-------
 src/connector/src/source/cdc/mod.rs           |  81 +++++++++--
 src/connector/src/source/cdc/source/reader.rs |  46 +++---
 src/connector/src/source/cdc/split.rs         |  12 +-
 src/connector/src/source/common.rs            |  76 ++++++++++
 .../src/source/datagen/source/reader.rs       |  19 +--
 .../src/source/google_pubsub/source/reader.rs |  13 +-
 .../src/source/kafka/source/reader.rs         |  16 ++-
 .../src/source/kinesis/source/reader.rs       |  26 ++--
 src/connector/src/source/mod.rs               |   2 +
 .../src/source/pulsar/source/reader.rs        |  16 +--
 15 files changed, 361 insertions(+), 227 deletions(-)
 create mode 100644 src/connector/src/source/common.rs

diff --git a/Cargo.lock b/Cargo.lock
index dbf61ce31ee26..68242a7f36bb5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6566,6 +6566,7 @@ dependencies = [
  "num-bigint",
  "opendal",
  "parking_lot 0.12.1",
+ "paste",
  "prometheus",
  "prost",
  "prost-reflect",
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 5ab001cfe7a11..315c32b677a8a 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -57,6 +57,7 @@ nexmark = { version = "0.2", features = ["serde"] }
 num-bigint = "0.4"
 opendal = "0.39"
 parking_lot = "0.12"
+paste = "1"
 prometheus = { version = "0.13", features = ["process"] }
 prost = { version = "0.11.9", features = ["no-recursion-limit"] }
 prost-reflect = "0.11.4"
diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index 70fb6130b8717..4291978e18117 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -37,7 +37,7 @@ macro_rules! impl_split_enumerator {
                             .map(SplitImpl::$variant_name)
                             .collect_vec()
                     })
-                    .map_err(|e| ErrorCode::ConnectorError(e.into()).into()),
+                    .map_err(|e| risingwave_common::error::ErrorCode::ConnectorError(e.into()).into()),
             )*
             }
         }
@@ -55,6 +55,25 @@ macro_rules! impl_split {
                 }
             }
         }
+        $(
+            impl TryFrom<SplitImpl> for $split {
+                type Error = anyhow::Error;
+
+                fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
+                    match split {
+                        SplitImpl::$variant_name(inner) => Ok(inner),
+                        other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
+                    }
+                }
+            }
+
+            impl From<$split> for SplitImpl {
+                fn from(split: $split) -> SplitImpl {
+                    SplitImpl::$variant_name(split)
+                }
+            }
+
+        )*

         impl TryFrom<&ConnectorSplit> for SplitImpl {
             type Error = anyhow::Error;
@@ -161,7 +180,7 @@ macro_rules! impl_connector_properties {
             pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
                 const UPSTREAM_SOURCE_KEY: &str = "connector";
                 let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
-                if connector.ends_with("cdc") {
+                if connector.ends_with("-cdc") {
                     ConnectorProperties::new_cdc_properties(&connector, props)
                 } else {
                     let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
@@ -180,57 +199,3 @@ macro_rules! impl_connector_properties {
         }
     }
 }
-
-#[macro_export]
-macro_rules! 
impl_common_split_reader_logic {
-    ($reader:ty, $props:ty) => {
-        impl $reader {
-            #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = risingwave_common::error::RwError)]
-            pub(crate) async fn into_chunk_stream(self) {
-                let parser_config = self.parser_config.clone();
-                let actor_id = self.source_ctx.source_info.actor_id.to_string();
-                let source_id = self.source_ctx.source_info.source_id.to_string();
-                let metrics = self.source_ctx.metrics.clone();
-                let source_ctx = self.source_ctx.clone();
-
-                let data_stream = self.into_data_stream();
-
-                let data_stream = data_stream
-                    .inspect_ok(move |data_batch| {
-                        let mut by_split_id = std::collections::HashMap::new();
-
-                        for msg in data_batch {
-                            by_split_id
-                                .entry(msg.split_id.as_ref())
-                                .or_insert_with(Vec::new)
-                                .push(msg);
-                        }
-
-                        for (split_id, msgs) in by_split_id {
-                            metrics
-                                .partition_input_count
-                                .with_label_values(&[&actor_id, &source_id, split_id])
-                                .inc_by(msgs.len() as u64);
-
-                            let sum_bytes = msgs
-                                .iter()
-                                .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
-                                .sum();
-
-                            metrics
-                                .partition_input_bytes
-                                .with_label_values(&[&actor_id, &source_id, &split_id])
-                                .inc_by(sum_bytes);
-                        }
-                    }).boxed();
-
-                let parser =
-                    $crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
-                #[for_await]
-                for msg_batch in parser.into_stream(data_stream) {
-                    yield msg_batch?;
-                }
-            }
-        }
-    };
-}
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 8b5b3287fb445..1178cf68939ec 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -25,28 +25,30 @@ use itertools::Itertools;
 use parking_lot::Mutex;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::TableId;
-use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
+use risingwave_common::error::{ErrorSuppressor, Result as RwResult, RwError};
 use risingwave_common::types::{JsonbVal, Scalar};
 use risingwave_pb::connector_service::PbTableSchema;
 use risingwave_pb::source::ConnectorSplit;
 use risingwave_rpc_client::ConnectorClient;
-use serde::{Deserialize, Serialize};
 
 use super::datagen::DatagenMeta;
-use super::filesystem::{FsSplit, S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};
+use super::filesystem::{FsSplit, S3Properties, S3_CONNECTOR};
 use super::google_pubsub::GooglePubsubMeta;
 use super::kafka::KafkaMeta;
 use super::monitor::SourceMetrics;
 use super::nexmark::source::message::NexmarkMeta;
 use crate::parser::ParserConfig;
 use crate::source::cdc::{
-    CdcProperties, CdcSplitReader, DebeziumCdcSplit, DebeziumSplitEnumerator, CITUS_CDC_CONNECTOR,
-    MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
+    CdcProperties, CdcSplitReader, Citus, CitusDebeziumSplitEnumerator, DebeziumCdcSplit,
+    DebeziumSplitEnumerator, Mysql, MysqlDebeziumSplitEnumerator, Postgres,
+    PostgresDebeziumSplitEnumerator, CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR,
+    POSTGRES_CDC_CONNECTOR,
 };
 use crate::source::datagen::{
     DatagenProperties, DatagenSplit, DatagenSplitEnumerator, DatagenSplitReader, DATAGEN_CONNECTOR,
 };
 use crate::source::dummy_connector::DummySplitReader;
+use crate::source::filesystem::{S3FileReader, S3SplitEnumerator};
 use crate::source::google_pubsub::{
     PubsubProperties, PubsubSplit, PubsubSplitEnumerator, PubsubSplitReader,
     GOOGLE_PUBSUB_CONNECTOR,
@@ -287,7 +289,7 @@ pub trait SplitReader: Sized {
     fn into_stream(self) -> BoxSourceWithStateStream;
 }
 
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug)]
 pub enum ConnectorProperties {
     Kafka(Box<KafkaProperties>),
     Pulsar(Box<PulsarProperties>),
@@ -295,9 +297,9 @@ pub enum ConnectorProperties {
     Nexmark(Box<NexmarkProperties>),
     Datagen(Box<DatagenProperties>),
     S3(Box<S3Properties>),
-    MySqlCdc(Box<CdcProperties>),
-    PostgresCdc(Box<CdcProperties>),
-    CitusCdc(Box<CdcProperties>),
+    MySqlCdc(Box<CdcProperties<Mysql>>),
+    PostgresCdc(Box<CdcProperties<Postgres>>),
+    CitusCdc(Box<CdcProperties<Citus>>),
     GooglePubsub(Box<PubsubProperties>),
     Dummy(Box<()>),
 }
@@ -308,19 +310,16 @@ impl ConnectorProperties {
         properties: HashMap<String, String>,
     ) -> Result<Self> {
         match connector_name {
-            MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties {
+            MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties::<Mysql> {
                 props: properties,
-                source_type: "mysql".to_string(),
                 ..Default::default()
             }))),
-            POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties {
+            POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties::<Postgres> {
                 props: properties,
-                source_type: "postgres".to_string(),
                 ..Default::default()
             }))),
-            CITUS_CDC_CONNECTOR => Ok(Self::CitusCdc(Box::new(CdcProperties {
+            CITUS_CDC_CONNECTOR => Ok(Self::CitusCdc(Box::new(CdcProperties::<Citus> {
                 props: properties,
-                source_type: "citus".to_string(),
                 ..Default::default()
             }))),
             _ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)),
@@ -329,9 +328,13 @@ impl ConnectorProperties {
 
     pub fn init_cdc_properties(&mut self, table_schema: Option<PbTableSchema>) {
         match self {
-            ConnectorProperties::MySqlCdc(c)
-            | ConnectorProperties::PostgresCdc(c)
-            | ConnectorProperties::CitusCdc(c) => {
+            ConnectorProperties::MySqlCdc(c) => {
+                c.table_schema = table_schema;
+            }
+            ConnectorProperties::PostgresCdc(c) => {
+                c.table_schema = table_schema;
+            }
+            ConnectorProperties::CitusCdc(c) => {
                 c.table_schema = table_schema;
             }
             _ => {}
@@ -352,7 +355,7 @@ impl ConnectorProperties {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, EnumAsInner, PartialEq, Hash)]
+#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
 pub enum SplitImpl {
     Kafka(KafkaSplit),
     Pulsar(PulsarSplit),
@@ -360,9 +363,9 @@ pub enum SplitImpl {
     Nexmark(NexmarkSplit),
     Datagen(DatagenSplit),
     GooglePubsub(PubsubSplit),
-    MySqlCdc(DebeziumCdcSplit),
-    PostgresCdc(DebeziumCdcSplit),
-    CitusCdc(DebeziumCdcSplit),
+    MySqlCdc(DebeziumCdcSplit<Mysql>),
+    PostgresCdc(DebeziumCdcSplit<Postgres>),
+    CitusCdc(DebeziumCdcSplit<Citus>),
     S3(FsSplit),
 }
 
@@ -392,9 +395,9 @@ pub enum SplitReaderImpl {
     Nexmark(Box<NexmarkSplitReader>),
     Pulsar(Box<PulsarSplitReader>),
     Datagen(Box<DatagenSplitReader>),
-    MySqlCdc(Box<CdcSplitReader>),
-    PostgresCdc(Box<CdcSplitReader>),
-    CitusCdc(Box<CdcSplitReader>),
+    MySqlCdc(Box<CdcSplitReader<Mysql>>),
+    PostgresCdc(Box<CdcSplitReader<Postgres>>),
+    CitusCdc(Box<CdcSplitReader<Citus>>),
     GooglePubsub(Box<PubsubSplitReader>),
 }
 
@@ -404,9 +407,9 @@ pub enum SplitEnumeratorImpl {
     Kinesis(KinesisSplitEnumerator),
     Nexmark(NexmarkSplitEnumerator),
     Datagen(DatagenSplitEnumerator),
-    MySqlCdc(DebeziumSplitEnumerator),
-    PostgresCdc(DebeziumSplitEnumerator),
-    CitusCdc(DebeziumSplitEnumerator),
+    MySqlCdc(MysqlDebeziumSplitEnumerator),
+    PostgresCdc(PostgresDebeziumSplitEnumerator),
+    CitusCdc(CitusDebeziumSplitEnumerator),
     GooglePubsub(PubsubSplitEnumerator),
     S3(S3SplitEnumerator),
 }
 
@@ -418,9 +421,6 @@ impl_connector_properties! {
     { Nexmark, NEXMARK_CONNECTOR },
     { Datagen, DATAGEN_CONNECTOR },
     { S3, S3_CONNECTOR },
-    { MySqlCdc, MYSQL_CDC_CONNECTOR },
-    { PostgresCdc, POSTGRES_CDC_CONNECTOR },
-    { CitusCdc, CITUS_CDC_CONNECTOR },
     { GooglePubsub, GOOGLE_PUBSUB_CONNECTOR}
 }
 
@@ -444,9 +444,9 @@ impl_split! {
     { Nexmark, NEXMARK_CONNECTOR, NexmarkSplit },
     { Datagen, DATAGEN_CONNECTOR, DatagenSplit },
     { GooglePubsub, GOOGLE_PUBSUB_CONNECTOR, PubsubSplit },
-    { MySqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit },
-    { PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit },
-    { CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit },
+    { MySqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
+    { PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit<Postgres> },
+    { CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit<Citus> },
     { S3, S3_CONNECTOR, FsSplit }
 }
 
@@ -645,7 +645,6 @@ mod tests {
 
         let conn_props = ConnectorProperties::extract(user_props_mysql).unwrap();
         if let ConnectorProperties::MySqlCdc(c) = conn_props {
-            assert_eq!(c.source_type, "mysql");
             assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
             assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
             assert_eq!(c.props.get("database.port").unwrap(), "3306");
@@ -659,7 +658,6 @@ mod tests {
 
         let conn_props = ConnectorProperties::extract(user_props_postgres).unwrap();
         if let ConnectorProperties::PostgresCdc(c) = conn_props {
-            assert_eq!(c.source_type, "postgres");
            assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
            assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.props.get("database.port").unwrap(), "5432");
diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs
index 91d2bf0bb7bbf..6ced5ee79da1e 100644
--- a/src/connector/src/source/cdc/enumerator/mod.rs
+++ b/src/connector/src/source/cdc/enumerator/mod.rs
@@ -12,38 +12,43 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::marker::PhantomData;
 use std::str::FromStr;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
 use itertools::Itertools;
 use risingwave_common::util::addr::HostAddr;
-use risingwave_pb::connector_service::SourceType as PbSourceType;
+use risingwave_pb::connector_service::SourceType;
 
 use crate::source::cdc::{
-    CdcProperties, CdcSplitBase, DebeziumCdcSplit, MySqlCdcSplit, PostgresCdcSplit,
+    CdcProperties, CdcSourceTypeTrait, CdcSplitBase, Citus, DebeziumCdcSplit, MySqlCdcSplit, Mysql,
+    Postgres, PostgresCdcSplit,
 };
 use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
 
 pub const DATABASE_SERVERS_KEY: &str = "database.servers";
 
 #[derive(Debug)]
-pub struct DebeziumSplitEnumerator {
+pub struct DebeziumSplitEnumerator<T: CdcSourceTypeTrait> {
     /// The source_id in the catalog
     source_id: u32,
-    source_type: PbSourceType,
     worker_node_addrs: Vec<HostAddr>,
+    _phantom: PhantomData<T>,
 }
 
 #[async_trait]
-impl SplitEnumerator for DebeziumSplitEnumerator {
-    type Properties = CdcProperties;
-    type Split = DebeziumCdcSplit;
+impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
+where
+    Self: ListCdcSplits<CdcSourceType = T>,
+{
+    type Properties = CdcProperties<T>;
+    type Split = DebeziumCdcSplit<T>;
 
     async fn new(
-        props: CdcProperties,
+        props: CdcProperties<T>,
         context: SourceEnumeratorContextRef,
-    ) -> anyhow::Result<DebeziumSplitEnumerator> {
+    ) -> anyhow::Result<Self> {
         let connector_client = context.connector_client.clone().ok_or_else(|| {
             anyhow!("connector node endpoint not specified or unable to connect to connector node")
         })?;
@@ -59,12 +64,16 @@ impl SplitEnumerator for DebeziumSplitEnumerator {
         let server_addrs = props
             .props
             .get(DATABASE_SERVERS_KEY)
             .map(|s| {
                 s.split(',')
                     .map(HostAddr::from_str)
                     .collect::<Result<Vec<_>, _>>()
             })
             .transpose()?
             .unwrap_or_default();
 
-        let source_type = props.get_source_type_pb()?;
+        assert_eq!(
+            props.get_source_type_pb(),
+            SourceType::from(T::source_type())
+        );
+
         // validate connector properties
         connector_client
             .validate_source_properties(
                 context.info.source_id as u64,
-                props.get_source_type_pb()?,
+                props.get_source_type_pb(),
                 props.props,
                 props.table_schema,
             )
             .await?;
@@ -73,54 +82,73 @@ impl SplitEnumerator for DebeziumSplitEnumerator {
         tracing::debug!("validate cdc source properties success");
         Ok(Self {
             source_id: context.info.source_id,
-            source_type,
             worker_node_addrs: server_addrs,
+            _phantom: PhantomData,
         })
     }
 
-    async fn list_splits(&mut self) -> anyhow::Result<Vec<DebeziumCdcSplit>> {
-        match self.source_type {
-            PbSourceType::Mysql => {
-                // CDC source only supports single split
-                let split = MySqlCdcSplit {
-                    inner: CdcSplitBase::new(self.source_id, None),
-                };
-                let dbz_split = DebeziumCdcSplit {
-                    mysql_split: Some(split),
-                    pg_split: None,
-                };
-                Ok(vec![dbz_split])
-            }
-            PbSourceType::Postgres => {
-                let split = PostgresCdcSplit {
-                    inner: CdcSplitBase::new(self.source_id, None),
-                    server_addr: None,
-                };
-                let dbz_split = DebeziumCdcSplit {
-                    mysql_split: None,
-                    pg_split: Some(split),
-                };
-                Ok(vec![dbz_split])
-            }
-            PbSourceType::Citus => {
-                let splits = self
-                    .worker_node_addrs
-                    .iter()
-                    .enumerate()
-                    .map(|(id, addr)| {
-                        let split = PostgresCdcSplit {
-                            inner: CdcSplitBase::new(id as u32, None),
-                            server_addr: Some(addr.to_string()),
-                        };
-                        DebeziumCdcSplit {
-                            mysql_split: None,
-                            pg_split: Some(split),
-                        }
-                    })
-                    .collect_vec();
-                Ok(splits)
-            }
-            _ => Err(anyhow!("unexpected source type")),
-        }
+    async fn list_splits(&mut self) -> anyhow::Result<Vec<DebeziumCdcSplit<T>>> {
+        Ok(self.list_cdc_splits())
+    }
+}
+
+pub trait ListCdcSplits {
+    type CdcSourceType: CdcSourceTypeTrait;
+    fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>>;
+}
+
+impl ListCdcSplits for DebeziumSplitEnumerator<Mysql> {
+    type CdcSourceType = Mysql;
+
+    fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
+        // CDC source only supports single split
+        let split = MySqlCdcSplit {
+            inner: CdcSplitBase::new(self.source_id, None),
+        };
+        let dbz_split = DebeziumCdcSplit {
+            mysql_split: Some(split),
+            pg_split: None,
+            _phantom: PhantomData,
+        };
+        vec![dbz_split]
+    }
+}
+
+impl ListCdcSplits for DebeziumSplitEnumerator<Postgres> {
+    type CdcSourceType = Postgres;
+
+    fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
+        let split = PostgresCdcSplit {
+            inner: CdcSplitBase::new(self.source_id, None),
+            server_addr: None,
+        };
+        let dbz_split = DebeziumCdcSplit {
+            mysql_split: None,
+            pg_split: Some(split),
+            _phantom: Default::default(),
+        };
+        vec![dbz_split]
+    }
+}
+
+impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
+    type CdcSourceType = Citus;
+
+    fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
+        self.worker_node_addrs
+            .iter()
+            .enumerate()
+            .map(|(id, addr)| {
+                let split = PostgresCdcSplit {
+                    inner: CdcSplitBase::new(id as u32, None),
+                    server_addr: Some(addr.to_string()),
+                };
+                DebeziumCdcSplit {
+                    mysql_split: None,
+                    pg_split: Some(split),
+                    _phantom: Default::default(),
+                }
+            })
+            .collect_vec()
     }
 }
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index 308e6ed88206c..e7bff4b4a7ec0 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -15,34 +15,85 @@
 pub mod enumerator;
 pub mod source;
 pub mod split;
-
 use std::collections::HashMap;
+use std::marker::PhantomData;
 
-use anyhow::anyhow;
 pub use enumerator::*;
+use paste::paste;
-use risingwave_pb::connector_service::{SourceType, TableSchema};
+use risingwave_pb::connector_service::{PbSourceType, SourceType, TableSchema};
-use serde::Deserialize;
 pub use source::*;
 pub use split::*;
 
-pub const MYSQL_CDC_CONNECTOR: &str = "mysql-cdc";
-pub const POSTGRES_CDC_CONNECTOR: &str = "postgres-cdc";
-pub const CITUS_CDC_CONNECTOR: &str = "citus-cdc";
+pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
+pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
+pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
+
+pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {
+    const CDC_CONNECTOR_NAME: &'static str;
+    fn source_type() -> CdcSourceType;
+}
+
+macro_rules! impl_cdc_source_type {
+    ($({$source_type:ident, $name:expr }),*) => {
+        $(
+            paste!{
+                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
+                pub struct $source_type;
+                impl CdcSourceTypeTrait for $source_type {
+                    const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
+                    fn source_type() -> CdcSourceType {
+                        CdcSourceType::$source_type
+                    }
+                }
+
+                pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
+            }
+        )*
+
+        pub enum CdcSourceType {
+            $(
+                $source_type,
+            )*
+        }
 
-#[derive(Clone, Debug, Deserialize, Default)]
-pub struct CdcProperties {
-    /// Type of the cdc source, e.g. mysql, postgres
-    pub source_type: String,
+        impl From<PbSourceType> for CdcSourceType {
+            fn from(value: PbSourceType) -> Self {
+                match value {
+                    PbSourceType::Unspecified => unreachable!(),
+                    $(
+                        PbSourceType::$source_type => CdcSourceType::$source_type,
+                    )*
+                }
+            }
+        }
+
+        impl From<CdcSourceType> for PbSourceType {
+            fn from(this: CdcSourceType) -> PbSourceType {
+                match this {
+                    $(
+                        CdcSourceType::$source_type => PbSourceType::$source_type,
+                    )*
+                }
+            }
+        }
+    }
+}
+
+impl_cdc_source_type!({ Mysql, "mysql" }, { Postgres, "postgres" }, { Citus, "citus" });
+
+#[derive(Clone, Debug, Default)]
+pub struct CdcProperties<T: CdcSourceTypeTrait> {
     /// Properties specified in the WITH clause by user
     pub props: HashMap<String, String>,
 
     /// Schema of the source specified by users
     pub table_schema: Option<TableSchema>,
+
+    pub _phantom: PhantomData<T>,
 }
 
-impl CdcProperties {
-    pub fn get_source_type_pb(&self) -> anyhow::Result<SourceType> {
-        SourceType::from_str_name(&self.source_type.to_ascii_uppercase())
-            .ok_or_else(|| anyhow!("unknown source type: {}", self.source_type))
+impl<T: CdcSourceTypeTrait> CdcProperties<T> {
+    pub fn get_source_type_pb(&self) -> SourceType {
+        SourceType::from(T::source_type())
     }
 }
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index 200c91a8a5051..974ec8877d2f6 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -16,28 +16,26 @@ use std::str::FromStr;
 
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
-use futures::{pin_mut, StreamExt, TryStreamExt};
+use futures::pin_mut;
 use futures_async_stream::try_stream;
 use risingwave_common::util::addr::HostAddr;
 use risingwave_pb::connector_service::GetEventStreamResponse;
 
-use crate::impl_common_split_reader_logic;
 use crate::parser::ParserConfig;
 use crate::source::base::SourceMessage;
-use crate::source::cdc::CdcProperties;
+use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit};
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::{
     BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData,
     SplitReader,
 };
 
-impl_common_split_reader_logic!(CdcSplitReader, CdcProperties);
-
-pub struct CdcSplitReader {
+pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
     source_id: u64,
     start_offset: Option<String>,
     // host address of worker node for a Citus cluster
     server_addr: Option<String>,
-    conn_props: CdcProperties,
+    conn_props: CdcProperties<T>,
 
     split_id: SplitId,
     // whether the full snapshot phase is done
@@ -47,22 +45,25 @@ pub struct CdcSplitReader {
 }
 
 #[async_trait]
-impl SplitReader for CdcSplitReader {
-    type Properties = CdcProperties;
+impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T>
+where
+    DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = anyhow::Error>,
+{
+    type Properties = CdcProperties<T>;
 
     #[allow(clippy::unused_async)]
     async fn new(
-        conn_props: CdcProperties,
+        conn_props: CdcProperties<T>,
         splits: Vec<SplitImpl>,
         parser_config: ParserConfig,
         source_ctx: SourceContextRef,
         _columns: Option<Vec<Column>>,
    ) -> Result<Self> {
         assert_eq!(splits.len(), 1);
-        let split = splits.into_iter().next().unwrap();
+        let split = DebeziumCdcSplit::<T>::try_from(splits.into_iter().next().unwrap())?;
         let split_id = split.id();
-        match split {
-            SplitImpl::MySqlCdc(split) | SplitImpl::PostgresCdc(split) => Ok(Self {
+        match T::source_type() {
+            CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
                 source_id: split.split_id() as u64,
                 start_offset: split.start_offset().clone(),
                 server_addr: None,
                 conn_props,
                 split_id,
                 snapshot_done: split.snapshot_done(),
                 parser_config,
                 source_ctx,
             }),
-            SplitImpl::CitusCdc(split) => Ok(Self {
+            CdcSourceType::Citus => Ok(Self {
                 source_id: split.split_id() as u64,
                 start_offset: split.start_offset().clone(),
                 server_addr: split.server_addr().clone(),
                 conn_props,
                 split_id,
                 snapshot_done: split.snapshot_done(),
                 parser_config,
                 source_ctx,
             }),
-
-            _ => Err(anyhow!(
-                "failed to create cdc split reader: invalid splis info"
-            )),
         }
     }
 
     fn into_stream(self) -> BoxSourceWithStateStream {
-        self.into_chunk_stream()
+        let parser_config = self.parser_config.clone();
+        let source_context = self.source_ctx.clone();
+        into_chunk_stream(self, parser_config, source_context)
     }
 }
 
-impl CdcSplitReader {
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
+impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T>
+where
+    Self: SplitReader,
+{
+    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
     async fn into_data_stream(self) {
         let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| {
             anyhow!("connector node endpoint not specified or unable to connect to connector node")
@@ -122,7 +124,7 @@ impl CdcSplitReader {
         let cdc_stream = cdc_client
             .start_source_stream(
                 self.source_id,
-                self.conn_props.get_source_type_pb()?,
+                self.conn_props.get_source_type_pb(),
                 self.start_offset,
                 properties,
                 self.snapshot_done,
diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs
index e72fe97b5658f..6535585a5309e 100644
--- a/src/connector/src/source/cdc/split.rs
+++ b/src/connector/src/source/cdc/split.rs
@@ -13,11 +13,13 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::marker::PhantomData;
 
 use anyhow::anyhow;
 use risingwave_common::types::JsonbVal;
 use serde::{Deserialize, Serialize};
 
+use crate::source::cdc::CdcSourceTypeTrait;
 use crate::source::{SplitId, SplitMetaData};
 
 /// The base states of a CDC split, which will be persisted to checkpoint.
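The hunk below threads the marker type through `DebeziumCdcSplit` without touching its persisted representation: the new `PhantomData<T>` field is `#[serde(skip)]`, so it never reaches the checkpoint, and splits encoded before this refactor still decode into the typed struct. A minimal round-trip sketch of that property (not part of the patch; reduced types, with serde_json standing in for the crate's `JsonbVal`-based encoding and `Mysql` as a bare stand-in marker):

    use std::marker::PhantomData;

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Default)]
    struct Mysql;

    // Reduced stand-in for the split type changed in the hunk below.
    #[derive(Serialize, Deserialize, Debug)]
    struct DebeziumCdcSplit<T> {
        split_id: u32,
        start_offset: Option<String>,
        #[serde(skip)] // never serialized; rebuilt via Default when decoding
        _phantom: PhantomData<T>,
    }

    fn main() -> serde_json::Result<()> {
        let split = DebeziumCdcSplit::<Mysql> {
            split_id: 1001,
            start_offset: None,
            _phantom: PhantomData,
        };
        let encoded = serde_json::to_string(&split)?;
        // The marker type leaves no trace in the payload.
        assert!(!encoded.contains("_phantom"));
        let decoded: DebeziumCdcSplit<Mysql> = serde_json::from_str(&encoded)?;
        println!("{decoded:?}");
        Ok(())
    }

The same skip-the-phantom trick is why the `TryFrom<SplitImpl>` impls generated by `impl_split!` above can recover a typed split from the untyped enum with no change to stored state.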
@@ -151,12 +153,15 @@ impl PostgresCdcSplit {
 }
 
 #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
-pub struct DebeziumCdcSplit {
+pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
     pub mysql_split: Option<MySqlCdcSplit>,
     pub pg_split: Option<PostgresCdcSplit>,
+
+    #[serde(skip)]
+    pub _phantom: PhantomData<T>,
 }
 
-impl SplitMetaData for DebeziumCdcSplit {
+impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
     fn id(&self) -> SplitId {
         assert!(self.mysql_split.is_some() || self.pg_split.is_some());
         if let Some(split) = &self.mysql_split {
@@ -177,11 +182,12 @@ impl SplitMetaData for DebeziumCdcSplit {
     }
 }
 
-impl DebeziumCdcSplit {
+impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
     pub fn new(mysql_split: Option<MySqlCdcSplit>, pg_split: Option<PostgresCdcSplit>) -> Self {
         Self {
             mysql_split,
             pg_split,
+            _phantom: PhantomData,
         }
     }
diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs
new file mode 100644
index 0000000000000..02f1cbde3de38
--- /dev/null
+++ b/src/connector/src/source/common.rs
@@ -0,0 +1,76 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use futures::{Stream, StreamExt, TryStreamExt};
+use futures_async_stream::try_stream;
+use risingwave_common::error::RwError;
+
+use crate::parser::ParserConfig;
+use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState};
+
+pub(crate) trait CommonSplitReader: SplitReader + 'static {
+    fn into_data_stream(
+        self,
+    ) -> impl Stream<Item = Result<Vec<SourceMessage>, anyhow::Error>> + Send;
+}
+
+#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
+pub(crate) async fn into_chunk_stream(
+    reader: impl CommonSplitReader + Send,
+    parser_config: ParserConfig,
+    source_ctx: SourceContextRef,
+) {
+    let actor_id = source_ctx.source_info.actor_id.to_string();
+    let source_id = source_ctx.source_info.source_id.to_string();
+    let metrics = source_ctx.metrics.clone();
+
+    let data_stream = reader.into_data_stream();
+
+    let data_stream = data_stream
+        .inspect_ok(move |data_batch| {
+            let mut by_split_id = std::collections::HashMap::new();
+
+            for msg in data_batch {
+                by_split_id
+                    .entry(msg.split_id.as_ref())
+                    .or_insert_with(Vec::new)
+                    .push(msg);
+            }
+
+            for (split_id, msgs) in by_split_id {
+                metrics
+                    .partition_input_count
+                    .with_label_values(&[&actor_id, &source_id, split_id])
+                    .inc_by(msgs.len() as u64);
+
+                let sum_bytes = msgs
+                    .iter()
+                    .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
+                    .sum();
+
+                metrics
+                    .partition_input_bytes
+                    .with_label_values(&[&actor_id, &source_id, split_id])
+                    .inc_by(sum_bytes);
+            }
+        })
+        .boxed();
+
+    let parser =
+        crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
+    #[for_await]
+    for msg_batch in parser.into_stream(data_stream) {
+        yield msg_batch?;
+    }
+}
diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs
index d040a3cb63f21..30d1bfae10c4e 100644
--- a/src/connector/src/source/datagen/source/reader.rs
+++ b/src/connector/src/source/datagen/source/reader.rs
@@ -16,23 +16,20 @@ use std::collections::HashMap;
 
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
-use futures_async_stream::try_stream;
+use futures::{Stream, StreamExt, TryStreamExt};
 use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
 
 use super::generator::DatagenEventGenerator;
-use crate::impl_common_split_reader_logic;
 use crate::parser::{EncodingProperties, ParserConfig, ProtocolProperties};
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::data_gen_util::spawn_data_generation_stream;
 use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
 use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
 use crate::source::{
-    BoxSourceStream, BoxSourceWithStateStream, Column, DataType, SourceContextRef, SplitId,
+    BoxSourceWithStateStream, Column, DataType, SourceContextRef, SourceMessage, SplitId,
     SplitImpl, SplitMetaData, SplitReader,
 };
 
-impl_common_split_reader_logic!(DatagenSplitReader, DatagenProperties);
-
 pub struct DatagenSplitReader {
     generator: DatagenEventGenerator,
     assigned_split: DatagenSplit,
@@ -170,13 +167,17 @@ impl SplitReader for DatagenSplitReader {
                 )
                 .boxed()
             }
-            _ => self.into_chunk_stream(),
+            _ => {
+                let parser_config = self.parser_config.clone();
+                let source_context = self.source_ctx.clone();
+                into_chunk_stream(self, parser_config, source_context)
+            }
         }
     }
 }
 
-impl DatagenSplitReader {
-    pub(crate) fn into_data_stream(self) -> BoxSourceStream {
+impl CommonSplitReader for DatagenSplitReader {
+    fn into_data_stream(self) -> impl Stream<Item = Result<Vec<SourceMessage>, anyhow::Error>> {
         // Will buffer at most 4 event chunks.
         const BUFFER_SIZE: usize = 4;
         spawn_data_generation_stream(self.generator.into_msg_stream(), BUFFER_SIZE).boxed()
diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs
index f5d4955ec9b20..d4fa8a9ab5c98 100644
--- a/src/connector/src/source/google_pubsub/source/reader.rs
+++ b/src/connector/src/source/google_pubsub/source/reader.rs
@@ -15,7 +15,6 @@
 use anyhow::{anyhow, ensure, Context, Result};
 use async_trait::async_trait;
 use chrono::{NaiveDateTime, TimeZone, Utc};
-use futures::{StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use google_cloud_pubsub::client::Client;
 use google_cloud_pubsub::subscription::{SeekTo, Subscription};
@@ -23,8 +22,8 @@ use risingwave_common::bail;
 use tonic::Code;
 
 use super::TaggedReceivedMessage;
-use crate::impl_common_split_reader_logic;
 use crate::parser::ParserConfig;
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::google_pubsub::PubsubProperties;
 use crate::source::{
     BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl,
@@ -33,8 +32,6 @@ use crate::source::{
 
 const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024;
 
-impl_common_split_reader_logic!(PubsubSplitReader, PubsubProperties);
-
 pub struct PubsubSplitReader {
     subscription: Subscription,
     stop_offset: Option<NaiveDateTime>,
@@ -44,8 +41,8 @@ pub struct PubsubSplitReader {
     source_ctx: SourceContextRef,
 }
 
-impl PubsubSplitReader {
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
+impl CommonSplitReader for PubsubSplitReader {
+    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
     async fn into_data_stream(self) {
         loop {
             let pull_result = self
@@ -172,6 +169,8 @@ impl SplitReader for PubsubSplitReader {
     }
 
     fn into_stream(self) -> BoxSourceWithStateStream {
-        self.into_chunk_stream()
+        let parser_config = self.parser_config.clone();
+        let source_context = self.source_ctx.clone();
+        into_chunk_stream(self, parser_config, source_context)
     }
 }
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index d443429857c21..103255a882ed7 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -19,16 +19,16 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use futures_async_stream::try_stream;
 use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::error::KafkaError;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
 
-use crate::impl_common_split_reader_logic;
 use crate::parser::ParserConfig;
 use crate::source::base::SourceMessage;
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::kafka::{
     KafkaProperties, KafkaSplit, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL,
 };
@@ -37,8 +37,6 @@ use crate::source::{
     SplitReader,
 };
 
-impl_common_split_reader_logic!(KafkaSplitReader, KafkaProperties);
-
 pub struct KafkaSplitReader {
     consumer: StreamConsumer<PrivateLinkConsumerContext>,
     offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
@@ -159,7 +157,9 @@ impl SplitReader for KafkaSplitReader {
     }
 
     fn into_stream(self) -> BoxSourceWithStateStream {
-        self.into_chunk_stream()
+        let parser_config = self.parser_config.clone();
+        let source_context = self.source_ctx.clone();
+        into_chunk_stream(self, parser_config, source_context)
     }
 }
 
@@ -176,9 +176,11 @@ impl KafkaSplitReader {
         ])
         .set(offset);
     }
+}
 
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
-    pub async fn into_data_stream(self) {
+impl CommonSplitReader for KafkaSplitReader {
+    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
+    async fn into_data_stream(self) {
         if self.offsets.values().all(|(start_offset, stop_offset)| {
             match (start_offset, stop_offset) {
                 (Some(start), Some(stop)) if (*start + 1) >= *stop => true,
diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs
index 8d714f3b79334..2939908d4aef5 100644
--- a/src/connector/src/source/kinesis/source/reader.rs
+++ b/src/connector/src/source/kinesis/source/reader.rs
@@ -20,12 +20,11 @@ use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError};
 use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput};
 use aws_sdk_kinesis::types::ShardIteratorType;
 use aws_sdk_kinesis::Client as KinesisClient;
-use futures::{StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 use tokio_retry;
 
-use crate::impl_common_split_reader_logic;
 use crate::parser::ParserConfig;
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::kinesis::source::message::KinesisMessage;
 use crate::source::kinesis::split::KinesisOffset;
 use crate::source::kinesis::KinesisProperties;
@@ -34,8 +33,6 @@ use crate::source::{
     SplitMetaData, SplitReader,
 };
 
-impl_common_split_reader_logic!(KinesisSplitReader, KinesisProperties);
-
 #[derive(Debug, Clone)]
 pub struct KinesisSplitReader {
     client: KinesisClient,
@@ -108,13 +105,15 @@ impl SplitReader for KinesisSplitReader {
     }
 
     fn into_stream(self) -> BoxSourceWithStateStream {
-        self.into_chunk_stream()
+        let parser_config = self.parser_config.clone();
+        let source_context = self.source_ctx.clone();
+        into_chunk_stream(self, parser_config, source_context)
     }
 }
 
-impl KinesisSplitReader {
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
-    pub(crate) async fn into_data_stream(mut self) {
+impl CommonSplitReader for KinesisSplitReader {
+    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
+    async fn into_data_stream(mut self) {
         self.new_shard_iter().await?;
         loop {
             if self.shard_iter.is_none() {
@@ -189,7 +188,8 @@ impl KinesisSplitReader {
             }
         }
     }
-
+}
+impl KinesisSplitReader {
     async fn new_shard_iter(&mut self) -> Result<()> {
         let (starting_seq_num, iter_type) = if self.latest_offset.is_some() {
             (
@@ -269,7 +269,7 @@ impl KinesisSplitReader {
 
 #[cfg(test)]
 mod tests {
-    use futures::StreamExt;
+    use futures::{pin_mut, StreamExt};
 
     use super::*;
     use crate::common::KinesisCommon;
@@ -294,7 +294,7 @@ mod tests {
             seq_offset: None,
         };
 
-        let mut trim_horizen_reader = KinesisSplitReader::new(
+        let trim_horizen_reader = KinesisSplitReader::new(
             properties.clone(),
             vec![SplitImpl::Kinesis(KinesisSplit {
                 shard_id: "shardId-000000000001".to_string().into(),
@@ -307,9 +307,10 @@ mod tests {
         )
         .await?
         .into_data_stream();
+        pin_mut!(trim_horizen_reader);
         println!("{:?}", trim_horizen_reader.next().await.unwrap()?);
 
-        let mut offset_reader = KinesisSplitReader::new(
+        let offset_reader = KinesisSplitReader::new(
             properties.clone(),
             vec![SplitImpl::Kinesis(KinesisSplit {
                 shard_id: "shardId-000000000001".to_string().into(),
@@ -324,6 +325,7 @@ mod tests {
         )
         .await?
         .into_data_stream();
+        pin_mut!(offset_reader);
         println!("{:?}", offset_reader.next().await.unwrap()?);
 
         Ok(())
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index ac10ded25c688..b53fd0f924164 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -28,7 +28,9 @@ pub use base::*;
 pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
 pub use kafka::KAFKA_CONNECTOR;
 pub use kinesis::KINESIS_CONNECTOR;
+mod common;
 mod manager;
+
 pub use manager::SourceColumnDesc;
 
 pub use crate::source::nexmark::NEXMARK_CONNECTOR;
diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs
index a8e801541f273..9c749c27d616a 100644
--- a/src/connector/src/source/pulsar/source/reader.rs
+++ b/src/connector/src/source/pulsar/source/reader.rs
@@ -16,7 +16,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
 
 use anyhow::{anyhow, ensure, Result};
 use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use futures_async_stream::try_stream;
 use itertools::Itertools;
 use pulsar::consumer::InitialPosition;
@@ -24,8 +24,8 @@ use pulsar::message::proto::MessageIdData;
 use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor};
 use risingwave_common::try_match_expand;
 
-use crate::impl_common_split_reader_logic;
 use crate::parser::ParserConfig;
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::pulsar::split::PulsarSplit;
 use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
 use crate::source::{
@@ -33,8 +33,6 @@ use crate::source::{
     SplitMetaData, SplitReader,
 };
 
-impl_common_split_reader_logic!(PulsarSplitReader, PulsarProperties);
-
 pub struct PulsarSplitReader {
     pulsar: Pulsar<TokioExecutor>,
     consumer: Consumer<Vec<u8>, TokioExecutor>,
@@ -170,13 +168,15 @@ impl SplitReader for PulsarSplitReader {
     }
 
     fn into_stream(self) -> BoxSourceWithStateStream {
-        self.into_chunk_stream()
+        let parser_config = self.parser_config.clone();
+        let source_context = self.source_ctx.clone();
+        into_chunk_stream(self, parser_config, source_context)
     }
 }
 
-impl PulsarSplitReader {
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
-    pub(crate) async fn into_data_stream(self) {
+impl CommonSplitReader for PulsarSplitReader {
+    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
+    async fn into_data_stream(self) {
         let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
         #[for_await]
         for msgs in self.consumer.ready_chunks(max_chunk_size) {

From 074a57c884588f356e50d40858f3139b67cf53a8 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 4 Sep 2023 19:10:59 +0800
Subject: [PATCH 2/3] refactor(source): implement the common split reader as a
 generic function

---
 src/connector/src/macros.rs                   | 54 ------------
 src/connector/src/source/cdc/source/reader.rs | 14 ++--
 src/connector/src/source/common.rs            | 76 +++++++++++++++++++
 .../src/source/datagen/source/reader.rs       | 19 ++---
 .../src/source/google_pubsub/source/reader.rs | 13 ++--
 .../src/source/kafka/source/reader.rs         | 16 ++--
 .../src/source/kinesis/source/reader.rs       | 26 ++++---
 src/connector/src/source/mod.rs               |  2 +
 .../src/source/pulsar/source/reader.rs        | 16 ++--
 9 files changed, 132 insertions(+), 104 deletions(-)
 create mode 100644 src/connector/src/source/common.rs

diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index 70fb6130b8717..ea42c63aa6c54 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -180,57 +180,3 @@ macro_rules! impl_connector_properties {
         }
     }
 }
-
-#[macro_export]
-macro_rules! impl_common_split_reader_logic {
-    ($reader:ty, $props:ty) => {
-        impl $reader {
-            #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = risingwave_common::error::RwError)]
-            pub(crate) async fn into_chunk_stream(self) {
-                let parser_config = self.parser_config.clone();
-                let actor_id = self.source_ctx.source_info.actor_id.to_string();
-                let source_id = self.source_ctx.source_info.source_id.to_string();
-                let metrics = self.source_ctx.metrics.clone();
-                let source_ctx = self.source_ctx.clone();
-
-                let data_stream = self.into_data_stream();
-
-                let data_stream = data_stream
-                    .inspect_ok(move |data_batch| {
-                        let mut by_split_id = std::collections::HashMap::new();
-
-                        for msg in data_batch {
-                            by_split_id
-                                .entry(msg.split_id.as_ref())
-                                .or_insert_with(Vec::new)
-                                .push(msg);
-                        }
-
-                        for (split_id, msgs) in by_split_id {
-                            metrics
-                                .partition_input_count
-                                .with_label_values(&[&actor_id, &source_id, split_id])
-                                .inc_by(msgs.len() as u64);
-
-                            let sum_bytes = msgs
-                                .iter()
-                                .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
-                                .sum();
-
-                            metrics
-                                .partition_input_bytes
-                                .with_label_values(&[&actor_id, &source_id, &split_id])
-                                .inc_by(sum_bytes);
-                        }
-                    }).boxed();
-
-                let parser =
-                    $crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
-                #[for_await]
-                for msg_batch in parser.into_stream(data_stream) {
-                    yield msg_batch?;
-                }
-            }
-        }
-    };
-}
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index 200c91a8a5051..32e0c27ca2856 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -16,22 +16,20 @@ use std::str::FromStr;
 
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
-use futures::{pin_mut, StreamExt, TryStreamExt};
+use futures::pin_mut;
 use futures_async_stream::try_stream;
 use risingwave_common::util::addr::HostAddr;
 use risingwave_pb::connector_service::GetEventStreamResponse;
 
-use crate::impl_common_split_reader_logic;
 use crate::parser::ParserConfig;
 use crate::source::base::SourceMessage;
 use crate::source::cdc::CdcProperties;
+use crate::source::common::{into_chunk_stream, CommonSplitReader};
 use crate::source::{
     BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData,
     SplitReader,
 };
 
-impl_common_split_reader_logic!(CdcSplitReader, CdcProperties);
-
 pub struct CdcSplitReader {
     source_id: u64,
     start_offset: Option<String>,
@@ -90,12 +88,14 @@ impl SplitReader for CdcSplitReader {
     }
 
     fn into_stream(self) -> BoxSourceWithStateStream {
-        self.into_chunk_stream()
+        let parser_config = self.parser_config.clone();
+        let source_context = self.source_ctx.clone();
+        into_chunk_stream(self, parser_config, source_context)
     }
 }
 
-impl CdcSplitReader {
-    #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
+impl CommonSplitReader for CdcSplitReader {
+    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
     async fn into_data_stream(self) {
         let cdc_client = self.source_ctx.connector_client.clone().ok_or_else(|| {
             anyhow!("connector node endpoint not specified or unable to connect to connector node")
diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs
new file mode 100644
index 0000000000000..02f1cbde3de38
--- /dev/null
+++ b/src/connector/src/source/common.rs
@@ -0,0 +1,76 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
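The remainder of this new module (below) is the type-checked replacement for the deleted `impl_common_split_reader_logic!` macro: each connector now implements only a raw `into_data_stream`, while the shared metrics-and-parsing pipeline lives in the single generic `into_chunk_stream` function. One visible consequence in the Kinesis tests further down is that the data stream is no longer `boxed`, so callers stack-pin it before polling. A toy of the same shape (not from the patch; assumes `futures`, `tokio`, and `anyhow` as dependencies, `NumberReader` is invented for illustration, and the `impl Trait` return type in the trait relied on a nightly feature at the time):

    use futures::{pin_mut, Stream, StreamExt};

    struct SourceMessage {
        split_id: String,
    }

    trait CommonSplitReader {
        // Raw message batches only; no metrics, no parsing.
        fn into_data_stream(self) -> impl Stream<Item = anyhow::Result<Vec<SourceMessage>>> + Send;
    }

    struct NumberReader;

    impl CommonSplitReader for NumberReader {
        fn into_data_stream(self) -> impl Stream<Item = anyhow::Result<Vec<SourceMessage>>> + Send {
            futures::stream::iter((0..3).map(|i| {
                Ok::<_, anyhow::Error>(vec![SourceMessage { split_id: i.to_string() }])
            }))
        }
    }

    // One generic consumer instead of a per-connector macro expansion.
    async fn drain(reader: impl CommonSplitReader) -> anyhow::Result<()> {
        let stream = reader.into_data_stream();
        pin_mut!(stream); // stack-pin instead of `.boxed()`
        while let Some(batch) = stream.next().await {
            for msg in batch? {
                println!("{}", msg.split_id);
            }
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        drain(NumberReader).await
    }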
+ +use futures::{Stream, StreamExt, TryStreamExt}; +use futures_async_stream::try_stream; +use risingwave_common::error::RwError; + +use crate::parser::ParserConfig; +use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState}; + +pub(crate) trait CommonSplitReader: SplitReader + 'static { + fn into_data_stream( + self, + ) -> impl Stream, anyhow::Error>> + Send; +} + +#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] +pub(crate) async fn into_chunk_stream( + reader: impl CommonSplitReader + Send, + parser_config: ParserConfig, + source_ctx: SourceContextRef, +) { + let actor_id = source_ctx.source_info.actor_id.to_string(); + let source_id = source_ctx.source_info.source_id.to_string(); + let metrics = source_ctx.metrics.clone(); + + let data_stream = reader.into_data_stream(); + + let data_stream = data_stream + .inspect_ok(move |data_batch| { + let mut by_split_id = std::collections::HashMap::new(); + + for msg in data_batch { + by_split_id + .entry(msg.split_id.as_ref()) + .or_insert_with(Vec::new) + .push(msg); + } + + for (split_id, msgs) in by_split_id { + metrics + .partition_input_count + .with_label_values(&[&actor_id, &source_id, split_id]) + .inc_by(msgs.len() as u64); + + let sum_bytes = msgs + .iter() + .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64)) + .sum(); + + metrics + .partition_input_bytes + .with_label_values(&[&actor_id, &source_id, split_id]) + .inc_by(sum_bytes); + } + }) + .boxed(); + + let parser = + crate::parser::ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?; + #[for_await] + for msg_batch in parser.into_stream(data_stream) { + yield msg_batch?; + } +} diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index d040a3cb63f21..30d1bfae10c4e 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -16,23 +16,20 @@ use std::collections::HashMap; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use futures::{StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::{Stream, StreamExt, TryStreamExt}; use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty}; use super::generator::DatagenEventGenerator; -use crate::impl_common_split_reader_logic; use crate::parser::{EncodingProperties, ParserConfig, ProtocolProperties}; +use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::data_gen_util::spawn_data_generation_stream; use crate::source::datagen::source::SEQUENCE_FIELD_KIND; use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc}; use crate::source::{ - BoxSourceStream, BoxSourceWithStateStream, Column, DataType, SourceContextRef, SplitId, + BoxSourceWithStateStream, Column, DataType, SourceContextRef, SourceMessage, SplitId, SplitImpl, SplitMetaData, SplitReader, }; -impl_common_split_reader_logic!(DatagenSplitReader, DatagenProperties); - pub struct DatagenSplitReader { generator: DatagenEventGenerator, assigned_split: DatagenSplit, @@ -170,13 +167,17 @@ impl SplitReader for DatagenSplitReader { ) .boxed() } - _ => self.into_chunk_stream(), + _ => { + let parser_config = self.parser_config.clone(); + let source_context = self.source_ctx.clone(); + into_chunk_stream(self, parser_config, source_context) + } } } } -impl DatagenSplitReader { - pub(crate) fn into_data_stream(self) -> BoxSourceStream { +impl CommonSplitReader for DatagenSplitReader { + fn 
into_data_stream(self) -> impl Stream, anyhow::Error>> { // Will buffer at most 4 event chunks. const BUFFER_SIZE: usize = 4; spawn_data_generation_stream(self.generator.into_msg_stream(), BUFFER_SIZE).boxed() diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index f5d4955ec9b20..d4fa8a9ab5c98 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -15,7 +15,6 @@ use anyhow::{anyhow, ensure, Context, Result}; use async_trait::async_trait; use chrono::{NaiveDateTime, TimeZone, Utc}; -use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use google_cloud_pubsub::client::Client; use google_cloud_pubsub::subscription::{SeekTo, Subscription}; @@ -23,8 +22,8 @@ use risingwave_common::bail; use tonic::Code; use super::TaggedReceivedMessage; -use crate::impl_common_split_reader_logic; use crate::parser::ParserConfig; +use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::google_pubsub::PubsubProperties; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl, @@ -33,8 +32,6 @@ use crate::source::{ const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024; -impl_common_split_reader_logic!(PubsubSplitReader, PubsubProperties); - pub struct PubsubSplitReader { subscription: Subscription, stop_offset: Option, @@ -44,8 +41,8 @@ pub struct PubsubSplitReader { source_ctx: SourceContextRef, } -impl PubsubSplitReader { - #[try_stream(boxed, ok = Vec, error = anyhow::Error)] +impl CommonSplitReader for PubsubSplitReader { + #[try_stream(ok = Vec, error = anyhow::Error)] async fn into_data_stream(self) { loop { let pull_result = self @@ -172,6 +169,8 @@ impl SplitReader for PubsubSplitReader { } fn into_stream(self) -> BoxSourceWithStateStream { - self.into_chunk_stream() + let parser_config = self.parser_config.clone(); + let source_context = self.source_ctx.clone(); + into_chunk_stream(self, parser_config, source_context) } } diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index d443429857c21..103255a882ed7 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -19,16 +19,16 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use futures_async_stream::try_stream; use rdkafka::config::RDKafkaLogLevel; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::error::KafkaError; use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; -use crate::impl_common_split_reader_logic; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; +use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::kafka::{ KafkaProperties, KafkaSplit, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL, }; @@ -37,8 +37,6 @@ use crate::source::{ SplitReader, }; -impl_common_split_reader_logic!(KafkaSplitReader, KafkaProperties); - pub struct KafkaSplitReader { consumer: StreamConsumer, offsets: HashMap, Option)>, @@ -159,7 +157,9 @@ impl SplitReader for KafkaSplitReader { } fn into_stream(self) -> BoxSourceWithStateStream { - self.into_chunk_stream() + let parser_config = self.parser_config.clone(); + let source_context = self.source_ctx.clone(); + into_chunk_stream(self, 
parser_config, source_context) } } @@ -176,9 +176,11 @@ impl KafkaSplitReader { ]) .set(offset); } +} - #[try_stream(boxed, ok = Vec, error = anyhow::Error)] - pub async fn into_data_stream(self) { +impl CommonSplitReader for KafkaSplitReader { + #[try_stream(ok = Vec, error = anyhow::Error)] + async fn into_data_stream(self) { if self.offsets.values().all(|(start_offset, stop_offset)| { match (start_offset, stop_offset) { (Some(start), Some(stop)) if (*start + 1) >= *stop => true, diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 8d714f3b79334..2939908d4aef5 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -20,12 +20,11 @@ use aws_sdk_kinesis::error::{DisplayErrorContext, SdkError}; use aws_sdk_kinesis::operation::get_records::{GetRecordsError, GetRecordsOutput}; use aws_sdk_kinesis::types::ShardIteratorType; use aws_sdk_kinesis::Client as KinesisClient; -use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use tokio_retry; -use crate::impl_common_split_reader_logic; use crate::parser::ParserConfig; +use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::kinesis::source::message::KinesisMessage; use crate::source::kinesis::split::KinesisOffset; use crate::source::kinesis::KinesisProperties; @@ -34,8 +33,6 @@ use crate::source::{ SplitMetaData, SplitReader, }; -impl_common_split_reader_logic!(KinesisSplitReader, KinesisProperties); - #[derive(Debug, Clone)] pub struct KinesisSplitReader { client: KinesisClient, @@ -108,13 +105,15 @@ impl SplitReader for KinesisSplitReader { } fn into_stream(self) -> BoxSourceWithStateStream { - self.into_chunk_stream() + let parser_config = self.parser_config.clone(); + let source_context = self.source_ctx.clone(); + into_chunk_stream(self, parser_config, source_context) } } -impl KinesisSplitReader { - #[try_stream(boxed, ok = Vec, error = anyhow::Error)] - pub(crate) async fn into_data_stream(mut self) { +impl CommonSplitReader for KinesisSplitReader { + #[try_stream(ok = Vec < SourceMessage >, error = anyhow::Error)] + async fn into_data_stream(mut self) { self.new_shard_iter().await?; loop { if self.shard_iter.is_none() { @@ -189,7 +188,8 @@ impl KinesisSplitReader { } } } - +} +impl KinesisSplitReader { async fn new_shard_iter(&mut self) -> Result<()> { let (starting_seq_num, iter_type) = if self.latest_offset.is_some() { ( @@ -269,7 +269,7 @@ impl KinesisSplitReader { #[cfg(test)] mod tests { - use futures::StreamExt; + use futures::{pin_mut, StreamExt}; use super::*; use crate::common::KinesisCommon; @@ -294,7 +294,7 @@ mod tests { seq_offset: None, }; - let mut trim_horizen_reader = KinesisSplitReader::new( + let trim_horizen_reader = KinesisSplitReader::new( properties.clone(), vec![SplitImpl::Kinesis(KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), @@ -307,9 +307,10 @@ mod tests { ) .await? .into_data_stream(); + pin_mut!(trim_horizen_reader); println!("{:?}", trim_horizen_reader.next().await.unwrap()?); - let mut offset_reader = KinesisSplitReader::new( + let offset_reader = KinesisSplitReader::new( properties.clone(), vec![SplitImpl::Kinesis(KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), @@ -324,6 +325,7 @@ mod tests { ) .await? 
.into_data_stream(); + pin_mut!(offset_reader); println!("{:?}", offset_reader.next().await.unwrap()?); Ok(()) diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index ac10ded25c688..b53fd0f924164 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -28,7 +28,9 @@ pub use base::*; pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR; pub use kafka::KAFKA_CONNECTOR; pub use kinesis::KINESIS_CONNECTOR; +mod common; mod manager; + pub use manager::SourceColumnDesc; pub use crate::source::nexmark::NEXMARK_CONNECTOR; diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index a8e801541f273..9c749c27d616a 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -16,7 +16,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, ensure, Result}; use async_trait::async_trait; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use pulsar::consumer::InitialPosition; @@ -24,8 +24,8 @@ use pulsar::message::proto::MessageIdData; use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor}; use risingwave_common::try_match_expand; -use crate::impl_common_split_reader_logic; use crate::parser::ParserConfig; +use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; use crate::source::{ @@ -33,8 +33,6 @@ use crate::source::{ SplitMetaData, SplitReader, }; -impl_common_split_reader_logic!(PulsarSplitReader, PulsarProperties); - pub struct PulsarSplitReader { pulsar: Pulsar, consumer: Consumer, TokioExecutor>, @@ -170,13 +168,15 @@ impl SplitReader for PulsarSplitReader { } fn into_stream(self) -> BoxSourceWithStateStream { - self.into_chunk_stream() + let parser_config = self.parser_config.clone(); + let source_context = self.source_ctx.clone(); + into_chunk_stream(self, parser_config, source_context) } } -impl PulsarSplitReader { - #[try_stream(boxed, ok = Vec, error = anyhow::Error)] - pub(crate) async fn into_data_stream(self) { +impl CommonSplitReader for PulsarSplitReader { + #[try_stream(ok = Vec, error = anyhow::Error)] + async fn into_data_stream(self) { let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size; #[for_await] for msgs in self.consumer.ready_chunks(max_chunk_size) { From 8cdc90ebd99d4faa8938f2a658628694f02a5fd6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 6 Sep 2023 01:48:29 +0800 Subject: [PATCH 3/3] extract more common cdc logic --- src/connector/src/macros.rs | 89 ++++++++++++++++++- src/connector/src/source/base.rs | 64 ++----------- src/connector/src/source/cdc/mod.rs | 53 ++--------- .../src/executor/backfill/cdc_backfill.rs | 2 +- 4 files changed, 104 insertions(+), 104 deletions(-) diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 4291978e18117..a30ad58ce07ce 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -180,7 +180,8 @@ macro_rules! 
impl_connector_properties { pub fn extract(mut props: HashMap) -> Result { const UPSTREAM_SOURCE_KEY: &str = "connector"; let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?; - if connector.ends_with("-cdc") { + use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX; + if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) { ConnectorProperties::new_cdc_properties(&connector, props) } else { let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?; @@ -199,3 +200,89 @@ macro_rules! impl_connector_properties { } } } + +#[macro_export] +macro_rules! impl_cdc_source_type { + ($({$source_type:ident, $name:expr }),*) => { + $( + paste!{ + #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] + pub struct $source_type; + impl CdcSourceTypeTrait for $source_type { + const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc"); + fn source_type() -> CdcSourceType { + CdcSourceType::$source_type + } + } + + pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>; + } + )* + + pub enum CdcSourceType { + $( + $source_type, + )* + } + + impl From for CdcSourceType { + fn from(value: PbSourceType) -> Self { + match value { + PbSourceType::Unspecified => unreachable!(), + $( + PbSourceType::$source_type => CdcSourceType::$source_type, + )* + } + } + } + + impl From for PbSourceType { + fn from(this: CdcSourceType) -> PbSourceType { + match this { + $( + CdcSourceType::$source_type => PbSourceType::$source_type, + )* + } + } + } + + impl ConnectorProperties { + pub(crate) fn new_cdc_properties( + connector_name: &str, + properties: HashMap, + ) -> std::result::Result { + match connector_name { + $( + $source_type::CDC_CONNECTOR_NAME => paste! { + Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> { + props: properties, + ..Default::default() + }))) + }, + )* + _ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)), + } + } + + pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) { + match self { + $( + paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => { + c.table_schema = table_schema; + } + )* + _ => {} + } + } + + pub fn is_cdc_connector(&self) -> bool { + match self { + $( + paste! 
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index cb39c8dcc9b41..5a94b36a6b8e8 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -27,7 +27,6 @@ use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::TableId;
 use risingwave_common::error::{ErrorSuppressor, RwError};
 use risingwave_common::types::{JsonbVal, Scalar};
-use risingwave_pb::connector_service::PbTableSchema;
 use risingwave_pb::source::ConnectorSplit;
 use risingwave_rpc_client::ConnectorClient;
 
@@ -299,7 +298,7 @@ pub enum ConnectorProperties {
     Nexmark(Box<NexmarkProperties>),
     Datagen(Box<DatagenProperties>),
     S3(Box<S3Properties>),
-    MySqlCdc(Box<CdcProperties<Mysql>>),
+    MysqlCdc(Box<CdcProperties<Mysql>>),
     PostgresCdc(Box<CdcProperties<Postgres>>),
     CitusCdc(Box<CdcProperties<Citus>>),
     GooglePubsub(Box<PubsubProperties>),
@@ -308,51 +307,6 @@ pub enum ConnectorProperties {
 }
 
 impl ConnectorProperties {
-    fn new_cdc_properties(
-        connector_name: &str,
-        properties: HashMap<String, String>,
-    ) -> Result<Self> {
-        match connector_name {
-            MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties::<Mysql> {
-                props: properties,
-                ..Default::default()
-            }))),
-            POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties::<Postgres> {
-                props: properties,
-                ..Default::default()
-            }))),
-            CITUS_CDC_CONNECTOR => Ok(Self::CitusCdc(Box::new(CdcProperties::<Citus> {
-                props: properties,
-                ..Default::default()
-            }))),
-            _ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)),
-        }
-    }
-
-    pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
-        match self {
-            ConnectorProperties::MySqlCdc(c) => {
-                c.table_schema = table_schema;
-            }
-            ConnectorProperties::PostgresCdc(c) => {
-                c.table_schema = table_schema;
-            }
-            ConnectorProperties::CitusCdc(c) => {
-                c.table_schema = table_schema;
-            }
-            _ => {}
-        }
-    }
-
-    pub fn is_cdc_connector(&self) -> bool {
-        matches!(
-            self,
-            ConnectorProperties::MySqlCdc(_)
-                | ConnectorProperties::PostgresCdc(_)
-                | ConnectorProperties::CitusCdc(_)
-        )
-    }
-
     pub fn support_multiple_splits(&self) -> bool {
         matches!(self, ConnectorProperties::Kafka(_))
     }
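
Because the macro preserves these three signatures, call sites need no
changes. A hypothetical caller, just to show the flow (with_options and
table_schema are illustrative placeholders, not names from this series):

    // with_options: HashMap<String, String> from the WITH clause,
    // e.g. "connector" => "mysql-cdc"; table_schema: a PbTableSchema.
    let mut props = ConnectorProperties::extract(with_options)?;
    if props.is_cdc_connector() {
        // Fills CdcProperties::<_>::table_schema for the matching variant.
        props.init_cdc_properties(table_schema);
    }
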
@@ -366,7 +320,7 @@ pub enum SplitImpl {
     Nexmark(NexmarkSplit),
     Datagen(DatagenSplit),
     GooglePubsub(PubsubSplit),
-    MySqlCdc(DebeziumCdcSplit<Mysql>),
+    MysqlCdc(DebeziumCdcSplit<Mysql>),
     PostgresCdc(DebeziumCdcSplit<Postgres>),
     CitusCdc(DebeziumCdcSplit<Citus>),
     Nats(NatsSplit),
@@ -399,7 +353,7 @@ pub enum SplitReaderImpl {
     Nexmark(Box<NexmarkSplitReader>),
     Pulsar(Box<PulsarSplitReader>),
     Datagen(Box<DatagenSplitReader>),
-    MySqlCdc(Box<CdcSplitReader<Mysql>>),
+    MysqlCdc(Box<CdcSplitReader<Mysql>>),
     PostgresCdc(Box<CdcSplitReader<Postgres>>),
     CitusCdc(Box<CdcSplitReader<Citus>>),
     GooglePubsub(Box<PubsubSplitReader>),
@@ -412,7 +366,7 @@ pub enum SplitEnumeratorImpl {
     Kinesis(KinesisSplitEnumerator),
     Nexmark(NexmarkSplitEnumerator),
     Datagen(DatagenSplitEnumerator),
-    MySqlCdc(MysqlDebeziumSplitEnumerator),
+    MysqlCdc(MysqlDebeziumSplitEnumerator),
     PostgresCdc(PostgresDebeziumSplitEnumerator),
     CitusCdc(CitusDebeziumSplitEnumerator),
     GooglePubsub(PubsubSplitEnumerator),
@@ -437,7 +391,7 @@ impl_split_enumerator! {
     { Kinesis, KinesisSplitEnumerator },
     { Nexmark, NexmarkSplitEnumerator },
     { Datagen, DatagenSplitEnumerator },
-    { MySqlCdc, DebeziumSplitEnumerator<Mysql> },
+    { MysqlCdc, DebeziumSplitEnumerator<Mysql> },
     { PostgresCdc, DebeziumSplitEnumerator<Postgres> },
     { CitusCdc, DebeziumSplitEnumerator<Citus> },
     { GooglePubsub, PubsubSplitEnumerator},
@@ -452,7 +406,7 @@ impl_split! {
     { Nexmark, NEXMARK_CONNECTOR, NexmarkSplit },
     { Datagen, DATAGEN_CONNECTOR, DatagenSplit },
     { GooglePubsub, GOOGLE_PUBSUB_CONNECTOR, PubsubSplit },
-    { MySqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
+    { MysqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
     { PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit<Postgres> },
     { CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit<Citus> },
     { S3, S3_CONNECTOR, FsSplit },
@@ -466,7 +420,7 @@ impl_split_reader! {
     { Kinesis, KinesisSplitReader },
     { Nexmark, NexmarkSplitReader },
     { Datagen, DatagenSplitReader },
-    { MySqlCdc, CdcSplitReader<Mysql>},
+    { MysqlCdc, CdcSplitReader<Mysql>},
     { PostgresCdc, CdcSplitReader<Postgres>},
     { CitusCdc, CdcSplitReader<Citus> },
     { GooglePubsub, PubsubSplitReader },
@@ -565,7 +519,7 @@ mod tests {
         let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
         let mysql_split = MySqlCdcSplit::new(1001, offset_str.to_string());
         let split = DebeziumCdcSplit::new(Some(mysql_split), None);
-        let split_impl = SplitImpl::MySqlCdc(split);
+        let split_impl = SplitImpl::MysqlCdc(split);
         let encoded_split = split_impl.encode_to_bytes();
         let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
         assert_eq!(
@@ -653,7 +607,7 @@ mod tests {
         ));
 
         let conn_props = ConnectorProperties::extract(user_props_mysql).unwrap();
-        if let ConnectorProperties::MySqlCdc(c) = conn_props {
+        if let ConnectorProperties::MysqlCdc(c) = conn_props {
            assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
            assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
            assert_eq!(c.props.get("database.port").unwrap(), "3306");
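
One invariant worth spelling out before the cdc/mod.rs hunk: extract() in
macros.rs now matches on the same CDC_CONNECTOR_NAME_SUFFIX that
concat!($name, "-cdc") bakes into every CDC_CONNECTOR_NAME, so the dispatch
test and the names it must match cannot drift apart. A test-style spot check
(assumed, not part of this patch):

    // All three constants come from the impl_cdc_source_type! invocation below.
    assert!(MYSQL_CDC_CONNECTOR.ends_with(CDC_CONNECTOR_NAME_SUFFIX));    // "mysql-cdc"
    assert!(POSTGRES_CDC_CONNECTOR.ends_with(CDC_CONNECTOR_NAME_SUFFIX)); // "postgres-cdc"
    assert!(CITUS_CDC_CONNECTOR.ends_with(CDC_CONNECTOR_NAME_SUFFIX));    // "citus-cdc"
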
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index 0205ecfd9147b..86a8b16adec02 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -21,10 +21,15 @@ use std::marker::PhantomData;
 pub use enumerator::*;
 use paste::paste;
 use risingwave_common::catalog::{ColumnDesc, Field, Schema};
-use risingwave_pb::connector_service::{PbSourceType, SourceType, TableSchema};
+use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
 pub use source::*;
 pub use split::*;
 
+use crate::impl_cdc_source_type;
+use crate::source::ConnectorProperties;
+
+pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
+
 pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
 pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
 pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
@@ -34,52 +39,6 @@ pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {
     fn source_type() -> CdcSourceType;
 }
 
-macro_rules! impl_cdc_source_type {
-    ($({$source_type:ident, $name:expr }),*) => {
-        $(
-            paste!{
-                #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
-                pub struct $source_type;
-                impl CdcSourceTypeTrait for $source_type {
-                    const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
-                    fn source_type() -> CdcSourceType {
-                        CdcSourceType::$source_type
-                    }
-                }
-
-                pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
-            }
-        )*
-
-        pub enum CdcSourceType {
-            $(
-                $source_type,
-            )*
-        }
-
-        impl From<PbSourceType> for CdcSourceType {
-            fn from(value: PbSourceType) -> Self {
-                match value {
-                    PbSourceType::Unspecified => unreachable!(),
-                    $(
-                        PbSourceType::$source_type => CdcSourceType::$source_type,
-                    )*
-                }
-            }
-        }
-
-        impl From<CdcSourceType> for PbSourceType {
-            fn from(this: CdcSourceType) -> PbSourceType {
-                match this {
-                    $(
-                        CdcSourceType::$source_type => PbSourceType::$source_type,
-                    )*
-                }
-            }
-        }
-    }
-}
-
 impl_cdc_source_type!({ Mysql, "mysql" }, { Postgres, "postgres" }, { Citus, "citus" });
 
 #[derive(Clone, Debug, Default)]
diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs
index 55742348bbf03..2f522ae8eeb0c 100644
--- a/src/stream/src/executor/backfill/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc_backfill.rs
@@ -492,7 +492,7 @@ impl CdcBackfillExecutor {
             .set(key.into(), JsonbVal::from(Value::Bool(true)))
             .await?;
 
-        if let Some(SplitImpl::MySqlCdc(split)) = cdc_split.as_mut()
+        if let Some(SplitImpl::MysqlCdc(split)) = cdc_split.as_mut()
             && let Some(s) = split.mysql_split.as_mut()
         {
             let start_offset = last_binlog_offset.as_ref().map(|cdc_offset| {