Skip to content

Commit

Permalink
refactor(source): specify cdc generic type parameter for different cd…
Browse files Browse the repository at this point in the history
…c source (#12109)
  • Loading branch information
wenym1 authored Sep 7, 2023
1 parent 7d940cf commit 471aa2b
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 165 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
opendal = "0.39"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.11.9", features = ["no-recursion-limit"] }
prost-reflect = "0.11.5"
Expand Down
110 changes: 108 additions & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ macro_rules! impl_split_enumerator {
.map(SplitImpl::$variant_name)
.collect_vec()
})
.map_err(|e| ErrorCode::ConnectorError(e.into()).into()),
.map_err(|e| risingwave_common::error::ErrorCode::ConnectorError(e.into()).into()),
)*
}
}
Expand All @@ -55,6 +55,25 @@ macro_rules! impl_split {
}
}
}
$(
impl TryFrom<SplitImpl> for $split {
type Error = anyhow::Error;

fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
match split {
SplitImpl::$variant_name(inner) => Ok(inner),
other => Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), other))
}
}
}

impl From<$split> for SplitImpl {
fn from(split: $split) -> SplitImpl {
SplitImpl::$variant_name(split)
}
}

)*

impl TryFrom<&ConnectorSplit> for SplitImpl {
type Error = anyhow::Error;
Expand Down Expand Up @@ -161,7 +180,8 @@ macro_rules! impl_connector_properties {
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
const UPSTREAM_SOURCE_KEY: &str = "connector";
let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
if connector.ends_with("cdc") {
use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX;
if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) {
ConnectorProperties::new_cdc_properties(&connector, props)
} else {
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
Expand All @@ -180,3 +200,89 @@ macro_rules! impl_connector_properties {
}
}
}

#[macro_export]
macro_rules! impl_cdc_source_type {
($({$source_type:ident, $name:expr }),*) => {
$(
paste!{
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct $source_type;
impl CdcSourceTypeTrait for $source_type {
const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
fn source_type() -> CdcSourceType {
CdcSourceType::$source_type
}
}

pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
}
)*

pub enum CdcSourceType {
$(
$source_type,
)*
}

impl From<PbSourceType> for CdcSourceType {
fn from(value: PbSourceType) -> Self {
match value {
PbSourceType::Unspecified => unreachable!(),
$(
PbSourceType::$source_type => CdcSourceType::$source_type,
)*
}
}
}

impl From<CdcSourceType> for PbSourceType {
fn from(this: CdcSourceType) -> PbSourceType {
match this {
$(
CdcSourceType::$source_type => PbSourceType::$source_type,
)*
}
}
}

impl ConnectorProperties {
pub(crate) fn new_cdc_properties(
connector_name: &str,
properties: HashMap<String, String>,
) -> std::result::Result<Self, anyhow::Error> {
match connector_name {
$(
$source_type::CDC_CONNECTOR_NAME => paste! {
Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> {
props: properties,
..Default::default()
})))
},
)*
_ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)),
}
}

pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => {
c.table_schema = table_schema;
}
)*
_ => {}
}
}

pub fn is_cdc_connector(&self) -> bool {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](_)} => true,
)*
_ => false,
}
}
}
}
}
104 changes: 28 additions & 76 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorCode, ErrorSuppressor, RwError};
use risingwave_common::error::{ErrorSuppressor, RwError};
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::connector_service::PbTableSchema;
use risingwave_pb::source::ConnectorSplit;
use risingwave_rpc_client::ConnectorClient;
use serde::{Deserialize, Serialize};

use super::datagen::DatagenMeta;
use super::filesystem::{FsSplit, S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};
use super::filesystem::{FsSplit, S3Properties, S3_CONNECTOR};
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
Expand All @@ -42,13 +40,16 @@ use super::nats::source::NatsSplitReader;
use super::nexmark::source::message::NexmarkMeta;
use crate::parser::ParserConfig;
use crate::source::cdc::{
CdcProperties, CdcSplitReader, DebeziumCdcSplit, DebeziumSplitEnumerator, CITUS_CDC_CONNECTOR,
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
CdcProperties, CdcSplitReader, Citus, CitusDebeziumSplitEnumerator, DebeziumCdcSplit,
DebeziumSplitEnumerator, Mysql, MysqlDebeziumSplitEnumerator, Postgres,
PostgresDebeziumSplitEnumerator, CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR,
POSTGRES_CDC_CONNECTOR,
};
use crate::source::datagen::{
DatagenProperties, DatagenSplit, DatagenSplitEnumerator, DatagenSplitReader, DATAGEN_CONNECTOR,
};
use crate::source::dummy_connector::DummySplitReader;
use crate::source::filesystem::{S3FileReader, S3SplitEnumerator};
use crate::source::google_pubsub::{
PubsubProperties, PubsubSplit, PubsubSplitEnumerator, PubsubSplitReader,
GOOGLE_PUBSUB_CONNECTOR,
Expand Down Expand Up @@ -289,83 +290,39 @@ pub trait SplitReader: Sized {
fn into_stream(self) -> BoxSourceWithStateStream;
}

#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug)]
pub enum ConnectorProperties {
Kafka(Box<KafkaProperties>),
Pulsar(Box<PulsarProperties>),
Kinesis(Box<KinesisProperties>),
Nexmark(Box<NexmarkProperties>),
Datagen(Box<DatagenProperties>),
S3(Box<S3Properties>),
MySqlCdc(Box<CdcProperties>),
PostgresCdc(Box<CdcProperties>),
CitusCdc(Box<CdcProperties>),
MysqlCdc(Box<CdcProperties<Mysql>>),
PostgresCdc(Box<CdcProperties<Postgres>>),
CitusCdc(Box<CdcProperties<Citus>>),
GooglePubsub(Box<PubsubProperties>),
Nats(Box<NatsProperties>),
Dummy(Box<()>),
}

impl ConnectorProperties {
fn new_cdc_properties(
connector_name: &str,
properties: HashMap<String, String>,
) -> Result<Self> {
match connector_name {
MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties {
props: properties,
source_type: "mysql".to_string(),
..Default::default()
}))),
POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties {
props: properties,
source_type: "postgres".to_string(),
..Default::default()
}))),
CITUS_CDC_CONNECTOR => Ok(Self::CitusCdc(Box::new(CdcProperties {
props: properties,
source_type: "citus".to_string(),
..Default::default()
}))),
_ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)),
}
}

pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
match self {
ConnectorProperties::MySqlCdc(c)
| ConnectorProperties::PostgresCdc(c)
| ConnectorProperties::CitusCdc(c) => {
c.table_schema = table_schema;
}
_ => {}
}
}

pub fn is_cdc_connector(&self) -> bool {
matches!(
self,
ConnectorProperties::MySqlCdc(_)
| ConnectorProperties::PostgresCdc(_)
| ConnectorProperties::CitusCdc(_)
)
}

pub fn support_multiple_splits(&self) -> bool {
matches!(self, ConnectorProperties::Kafka(_))
}
}

#[derive(Debug, Clone, Serialize, Deserialize, EnumAsInner, PartialEq, Hash)]
#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
pub enum SplitImpl {
Kafka(KafkaSplit),
Pulsar(PulsarSplit),
Kinesis(KinesisSplit),
Nexmark(NexmarkSplit),
Datagen(DatagenSplit),
GooglePubsub(PubsubSplit),
MySqlCdc(DebeziumCdcSplit),
PostgresCdc(DebeziumCdcSplit),
CitusCdc(DebeziumCdcSplit),
MysqlCdc(DebeziumCdcSplit<Mysql>),
PostgresCdc(DebeziumCdcSplit<Postgres>),
CitusCdc(DebeziumCdcSplit<Citus>),
Nats(NatsSplit),
S3(FsSplit),
}
Expand Down Expand Up @@ -396,9 +353,9 @@ pub enum SplitReaderImpl {
Nexmark(Box<NexmarkSplitReader>),
Pulsar(Box<PulsarSplitReader>),
Datagen(Box<DatagenSplitReader>),
MySqlCdc(Box<CdcSplitReader>),
PostgresCdc(Box<CdcSplitReader>),
CitusCdc(Box<CdcSplitReader>),
MysqlCdc(Box<CdcSplitReader<Mysql>>),
PostgresCdc(Box<CdcSplitReader<Postgres>>),
CitusCdc(Box<CdcSplitReader<Citus>>),
GooglePubsub(Box<PubsubSplitReader>),
Nats(Box<NatsSplitReader>),
}
Expand All @@ -409,9 +366,9 @@ pub enum SplitEnumeratorImpl {
Kinesis(KinesisSplitEnumerator),
Nexmark(NexmarkSplitEnumerator),
Datagen(DatagenSplitEnumerator),
MySqlCdc(DebeziumSplitEnumerator),
PostgresCdc(DebeziumSplitEnumerator),
CitusCdc(DebeziumSplitEnumerator),
MysqlCdc(MysqlDebeziumSplitEnumerator),
PostgresCdc(PostgresDebeziumSplitEnumerator),
CitusCdc(CitusDebeziumSplitEnumerator),
GooglePubsub(PubsubSplitEnumerator),
S3(S3SplitEnumerator),
Nats(NatsSplitEnumerator),
Expand All @@ -424,9 +381,6 @@ impl_connector_properties! {
{ Nexmark, NEXMARK_CONNECTOR },
{ Datagen, DATAGEN_CONNECTOR },
{ S3, S3_CONNECTOR },
{ MySqlCdc, MYSQL_CDC_CONNECTOR },
{ PostgresCdc, POSTGRES_CDC_CONNECTOR },
{ CitusCdc, CITUS_CDC_CONNECTOR },
{ GooglePubsub, GOOGLE_PUBSUB_CONNECTOR},
{ Nats, NATS_CONNECTOR }
}
Expand All @@ -437,7 +391,7 @@ impl_split_enumerator! {
{ Kinesis, KinesisSplitEnumerator },
{ Nexmark, NexmarkSplitEnumerator },
{ Datagen, DatagenSplitEnumerator },
{ MySqlCdc, DebeziumSplitEnumerator },
{ MysqlCdc, DebeziumSplitEnumerator },
{ PostgresCdc, DebeziumSplitEnumerator },
{ CitusCdc, DebeziumSplitEnumerator },
{ GooglePubsub, PubsubSplitEnumerator},
Expand All @@ -452,9 +406,9 @@ impl_split! {
{ Nexmark, NEXMARK_CONNECTOR, NexmarkSplit },
{ Datagen, DATAGEN_CONNECTOR, DatagenSplit },
{ GooglePubsub, GOOGLE_PUBSUB_CONNECTOR, PubsubSplit },
{ MySqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit },
{ PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit },
{ CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit },
{ MysqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
{ PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit<Postgres> },
{ CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit<Citus> },
{ S3, S3_CONNECTOR, FsSplit },
{ Nats, NATS_CONNECTOR, NatsSplit }
}
Expand All @@ -466,7 +420,7 @@ impl_split_reader! {
{ Kinesis, KinesisSplitReader },
{ Nexmark, NexmarkSplitReader },
{ Datagen, DatagenSplitReader },
{ MySqlCdc, CdcSplitReader},
{ MysqlCdc, CdcSplitReader},
{ PostgresCdc, CdcSplitReader},
{ CitusCdc, CdcSplitReader },
{ GooglePubsub, PubsubSplitReader },
Expand Down Expand Up @@ -565,7 +519,7 @@ mod tests {
let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
let mysql_split = MySqlCdcSplit::new(1001, offset_str.to_string());
let split = DebeziumCdcSplit::new(Some(mysql_split), None);
let split_impl = SplitImpl::MySqlCdc(split);
let split_impl = SplitImpl::MysqlCdc(split);
let encoded_split = split_impl.encode_to_bytes();
let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
assert_eq!(
Expand Down Expand Up @@ -653,8 +607,7 @@ mod tests {
));

let conn_props = ConnectorProperties::extract(user_props_mysql).unwrap();
if let ConnectorProperties::MySqlCdc(c) = conn_props {
assert_eq!(c.source_type, "mysql");
if let ConnectorProperties::MysqlCdc(c) = conn_props {
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
assert_eq!(c.props.get("database.port").unwrap(), "3306");
Expand All @@ -668,7 +621,6 @@ mod tests {

let conn_props = ConnectorProperties::extract(user_props_postgres).unwrap();
if let ConnectorProperties::PostgresCdc(c) = conn_props {
assert_eq!(c.source_type, "postgres");
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
assert_eq!(c.props.get("database.port").unwrap(), "5432");
Expand Down
Loading

0 comments on commit 471aa2b

Please sign in to comment.