Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): specify cdc generic type parameter for different cdc source #12109

Merged
merged 7 commits into from
Sep 7, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
opendal = "0.39"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.11.9", features = ["no-recursion-limit"] }
prost-reflect = "0.11.5"
Expand Down
110 changes: 108 additions & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ macro_rules! impl_split_enumerator {
.map(SplitImpl::$variant_name)
.collect_vec()
})
.map_err(|e| ErrorCode::ConnectorError(e.into()).into()),
.map_err(|e| risingwave_common::error::ErrorCode::ConnectorError(e.into()).into()),
)*
}
}
Expand All @@ -55,6 +55,25 @@ macro_rules! impl_split {
}
}
}
$(
// Fallible narrowing from the `SplitImpl` enum to the concrete split type
// carried by the `$variant_name` variant.
impl TryFrom<SplitImpl> for $split {
    type Error = anyhow::Error;

    // Returns the wrapped split when `split` is the `$variant_name` variant;
    // any other variant yields an error that names the expected type and
    // debug-prints the value actually received.
    fn try_from(split: SplitImpl) -> std::result::Result<Self, Self::Error> {
        if let SplitImpl::$variant_name(inner) = split {
            return Ok(inner);
        }
        Err(anyhow::anyhow!("expect {} but get {:?}", stringify!($split), split))
    }
}

// Infallible widening: wraps a concrete split in its `$variant_name` variant
// of `SplitImpl`.
impl From<$split> for SplitImpl {
    fn from(inner: $split) -> Self {
        Self::$variant_name(inner)
    }
}

)*

impl TryFrom<&ConnectorSplit> for SplitImpl {
type Error = anyhow::Error;
Expand Down Expand Up @@ -161,7 +180,8 @@ macro_rules! impl_connector_properties {
pub fn extract(mut props: HashMap<String, String>) -> Result<Self> {
const UPSTREAM_SOURCE_KEY: &str = "connector";
let connector = props.remove(UPSTREAM_SOURCE_KEY).ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
if connector.ends_with("cdc") {
use $crate::source::cdc::CDC_CONNECTOR_NAME_SUFFIX;
if connector.ends_with(CDC_CONNECTOR_NAME_SUFFIX) {
ConnectorProperties::new_cdc_properties(&connector, props)
} else {
let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?;
Expand All @@ -180,3 +200,89 @@ macro_rules! impl_connector_properties {
}
}
}

/// Generates the per-database CDC source types and the glue code around them.
///
/// The macro takes a comma-separated list of `{ SourceType, "name" }` entries.
/// For every entry it emits:
/// - a unit marker struct `SourceType` implementing `CdcSourceTypeTrait`, whose
///   `CDC_CONNECTOR_NAME` is `"name"` with the `-cdc` suffix appended;
/// - a type alias `SourceTypeDebeziumSplitEnumerator` for
///   `DebeziumSplitEnumerator<SourceType>`.
///
/// Once for the whole list it emits:
/// - the `CdcSourceType` enum with one variant per entry;
/// - `From` conversions between `CdcSourceType` and `PbSourceType`;
/// - the `ConnectorProperties` helpers `new_cdc_properties`,
///   `init_cdc_properties` and `is_cdc_connector`.
///
/// The expansion refers to `CdcSourceTypeTrait`, `DebeziumSplitEnumerator`,
/// `PbSourceType`, `PbTableSchema`, `ConnectorProperties`, `CdcProperties`,
/// `HashMap` and `paste!` by unqualified name, so they must be in scope where
/// the macro is invoked. `ConnectorProperties` must have a `<SourceType>Cdc`
/// variant for every entry.
#[macro_export]
macro_rules! impl_cdc_source_type {
($({$source_type:ident, $name:expr }),*) => {
$(
// Per-entry items: marker struct, trait impl and enumerator alias.
paste!{
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct $source_type;
impl CdcSourceTypeTrait for $source_type {
// Connector name is built at compile time as "<name>-cdc".
const CDC_CONNECTOR_NAME: &'static str = concat!($name, "-cdc");
fn source_type() -> CdcSourceType {
CdcSourceType::$source_type
}
}

// `[< ... >]` is paste's identifier concatenation, e.g. `FooDebeziumSplitEnumerator`.
pub type [< $source_type DebeziumSplitEnumerator >] = DebeziumSplitEnumerator<$source_type>;
}
)*

// One variant per entry, named after the marker struct.
pub enum CdcSourceType {
$(
$source_type,
)*
}

impl From<PbSourceType> for CdcSourceType {
fn from(value: PbSourceType) -> Self {
match value {
// NOTE(review): this panics if an `Unspecified` value is ever converted;
// confirm callers never pass one.
PbSourceType::Unspecified => unreachable!(),
$(
PbSourceType::$source_type => CdcSourceType::$source_type,
)*
}
}
}

impl From<CdcSourceType> for PbSourceType {
fn from(this: CdcSourceType) -> PbSourceType {
match this {
$(
CdcSourceType::$source_type => PbSourceType::$source_type,
)*
}
}
}

impl ConnectorProperties {
// Builds the `<SourceType>Cdc` variant whose `CDC_CONNECTOR_NAME` equals
// `connector_name`, storing `properties` in `props` and leaving all other
// fields at their defaults. Returns an error for an unrecognized name.
pub(crate) fn new_cdc_properties(
connector_name: &str,
properties: HashMap<String, String>,
) -> std::result::Result<Self, anyhow::Error> {
match connector_name {
$(
$source_type::CDC_CONNECTOR_NAME => paste! {
Ok(Self::[< $source_type Cdc >](Box::new(CdcProperties::<$source_type> {
props: properties,
..Default::default()
})))
},
)*
_ => Err(anyhow::anyhow!("unexpected cdc connector '{}'", connector_name,)),
}
}

// Stores `table_schema` on the properties when `self` is one of the generated
// CDC variants; does nothing for any other variant.
pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](c)} => {
c.table_schema = table_schema;
}
)*
_ => {}
}
}

// Returns `true` exactly when `self` is one of the generated `<SourceType>Cdc` variants.
pub fn is_cdc_connector(&self) -> bool {
match self {
$(
paste! {ConnectorProperties:: [< $source_type Cdc >](_)} => true,
)*
_ => false,
}
}
}
}
}
104 changes: 28 additions & 76 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorCode, ErrorSuppressor, RwError};
use risingwave_common::error::{ErrorSuppressor, RwError};
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::connector_service::PbTableSchema;
use risingwave_pb::source::ConnectorSplit;
use risingwave_rpc_client::ConnectorClient;
use serde::{Deserialize, Serialize};

use super::datagen::DatagenMeta;
use super::filesystem::{FsSplit, S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};
use super::filesystem::{FsSplit, S3Properties, S3_CONNECTOR};
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
Expand All @@ -42,13 +40,16 @@ use super::nats::source::NatsSplitReader;
use super::nexmark::source::message::NexmarkMeta;
use crate::parser::ParserConfig;
use crate::source::cdc::{
CdcProperties, CdcSplitReader, DebeziumCdcSplit, DebeziumSplitEnumerator, CITUS_CDC_CONNECTOR,
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
CdcProperties, CdcSplitReader, Citus, CitusDebeziumSplitEnumerator, DebeziumCdcSplit,
DebeziumSplitEnumerator, Mysql, MysqlDebeziumSplitEnumerator, Postgres,
PostgresDebeziumSplitEnumerator, CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR,
POSTGRES_CDC_CONNECTOR,
};
use crate::source::datagen::{
DatagenProperties, DatagenSplit, DatagenSplitEnumerator, DatagenSplitReader, DATAGEN_CONNECTOR,
};
use crate::source::dummy_connector::DummySplitReader;
use crate::source::filesystem::{S3FileReader, S3SplitEnumerator};
use crate::source::google_pubsub::{
PubsubProperties, PubsubSplit, PubsubSplitEnumerator, PubsubSplitReader,
GOOGLE_PUBSUB_CONNECTOR,
Expand Down Expand Up @@ -289,83 +290,39 @@ pub trait SplitReader: Sized {
fn into_stream(self) -> BoxSourceWithStateStream;
}

#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug)]
pub enum ConnectorProperties {
Kafka(Box<KafkaProperties>),
Pulsar(Box<PulsarProperties>),
Kinesis(Box<KinesisProperties>),
Nexmark(Box<NexmarkProperties>),
Datagen(Box<DatagenProperties>),
S3(Box<S3Properties>),
MySqlCdc(Box<CdcProperties>),
PostgresCdc(Box<CdcProperties>),
CitusCdc(Box<CdcProperties>),
MysqlCdc(Box<CdcProperties<Mysql>>),
PostgresCdc(Box<CdcProperties<Postgres>>),
CitusCdc(Box<CdcProperties<Citus>>),
GooglePubsub(Box<PubsubProperties>),
Nats(Box<NatsProperties>),
Dummy(Box<()>),
}

impl ConnectorProperties {
fn new_cdc_properties(
connector_name: &str,
properties: HashMap<String, String>,
) -> Result<Self> {
match connector_name {
MYSQL_CDC_CONNECTOR => Ok(Self::MySqlCdc(Box::new(CdcProperties {
props: properties,
source_type: "mysql".to_string(),
..Default::default()
}))),
POSTGRES_CDC_CONNECTOR => Ok(Self::PostgresCdc(Box::new(CdcProperties {
props: properties,
source_type: "postgres".to_string(),
..Default::default()
}))),
CITUS_CDC_CONNECTOR => Ok(Self::CitusCdc(Box::new(CdcProperties {
props: properties,
source_type: "citus".to_string(),
..Default::default()
}))),
_ => Err(anyhow!("unexpected cdc connector '{}'", connector_name,)),
}
}

pub fn init_cdc_properties(&mut self, table_schema: PbTableSchema) {
match self {
ConnectorProperties::MySqlCdc(c)
| ConnectorProperties::PostgresCdc(c)
| ConnectorProperties::CitusCdc(c) => {
c.table_schema = table_schema;
}
_ => {}
}
}

pub fn is_cdc_connector(&self) -> bool {
matches!(
self,
ConnectorProperties::MySqlCdc(_)
| ConnectorProperties::PostgresCdc(_)
| ConnectorProperties::CitusCdc(_)
)
}

pub fn support_multiple_splits(&self) -> bool {
matches!(self, ConnectorProperties::Kafka(_))
}
}

#[derive(Debug, Clone, Serialize, Deserialize, EnumAsInner, PartialEq, Hash)]
#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
pub enum SplitImpl {
Kafka(KafkaSplit),
Pulsar(PulsarSplit),
Kinesis(KinesisSplit),
Nexmark(NexmarkSplit),
Datagen(DatagenSplit),
GooglePubsub(PubsubSplit),
MySqlCdc(DebeziumCdcSplit),
PostgresCdc(DebeziumCdcSplit),
CitusCdc(DebeziumCdcSplit),
MysqlCdc(DebeziumCdcSplit<Mysql>),
PostgresCdc(DebeziumCdcSplit<Postgres>),
CitusCdc(DebeziumCdcSplit<Citus>),
Nats(NatsSplit),
S3(FsSplit),
}
Expand Down Expand Up @@ -396,9 +353,9 @@ pub enum SplitReaderImpl {
Nexmark(Box<NexmarkSplitReader>),
Pulsar(Box<PulsarSplitReader>),
Datagen(Box<DatagenSplitReader>),
MySqlCdc(Box<CdcSplitReader>),
PostgresCdc(Box<CdcSplitReader>),
CitusCdc(Box<CdcSplitReader>),
MysqlCdc(Box<CdcSplitReader<Mysql>>),
PostgresCdc(Box<CdcSplitReader<Postgres>>),
CitusCdc(Box<CdcSplitReader<Citus>>),
GooglePubsub(Box<PubsubSplitReader>),
Nats(Box<NatsSplitReader>),
}
Expand All @@ -409,9 +366,9 @@ pub enum SplitEnumeratorImpl {
Kinesis(KinesisSplitEnumerator),
Nexmark(NexmarkSplitEnumerator),
Datagen(DatagenSplitEnumerator),
MySqlCdc(DebeziumSplitEnumerator),
PostgresCdc(DebeziumSplitEnumerator),
CitusCdc(DebeziumSplitEnumerator),
MysqlCdc(MysqlDebeziumSplitEnumerator),
PostgresCdc(PostgresDebeziumSplitEnumerator),
CitusCdc(CitusDebeziumSplitEnumerator),
GooglePubsub(PubsubSplitEnumerator),
S3(S3SplitEnumerator),
Nats(NatsSplitEnumerator),
Expand All @@ -424,9 +381,6 @@ impl_connector_properties! {
{ Nexmark, NEXMARK_CONNECTOR },
{ Datagen, DATAGEN_CONNECTOR },
{ S3, S3_CONNECTOR },
{ MySqlCdc, MYSQL_CDC_CONNECTOR },
{ PostgresCdc, POSTGRES_CDC_CONNECTOR },
{ CitusCdc, CITUS_CDC_CONNECTOR },
{ GooglePubsub, GOOGLE_PUBSUB_CONNECTOR},
{ Nats, NATS_CONNECTOR }
}
Expand All @@ -437,7 +391,7 @@ impl_split_enumerator! {
{ Kinesis, KinesisSplitEnumerator },
{ Nexmark, NexmarkSplitEnumerator },
{ Datagen, DatagenSplitEnumerator },
{ MySqlCdc, DebeziumSplitEnumerator },
{ MysqlCdc, DebeziumSplitEnumerator },
{ PostgresCdc, DebeziumSplitEnumerator },
{ CitusCdc, DebeziumSplitEnumerator },
{ GooglePubsub, PubsubSplitEnumerator},
Expand All @@ -452,9 +406,9 @@ impl_split! {
{ Nexmark, NEXMARK_CONNECTOR, NexmarkSplit },
{ Datagen, DATAGEN_CONNECTOR, DatagenSplit },
{ GooglePubsub, GOOGLE_PUBSUB_CONNECTOR, PubsubSplit },
{ MySqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit },
{ PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit },
{ CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit },
{ MysqlCdc, MYSQL_CDC_CONNECTOR, DebeziumCdcSplit<Mysql> },
{ PostgresCdc, POSTGRES_CDC_CONNECTOR, DebeziumCdcSplit<Postgres> },
{ CitusCdc, CITUS_CDC_CONNECTOR, DebeziumCdcSplit<Citus> },
{ S3, S3_CONNECTOR, FsSplit },
{ Nats, NATS_CONNECTOR, NatsSplit }
}
Expand All @@ -466,7 +420,7 @@ impl_split_reader! {
{ Kinesis, KinesisSplitReader },
{ Nexmark, NexmarkSplitReader },
{ Datagen, DatagenSplitReader },
{ MySqlCdc, CdcSplitReader},
{ MysqlCdc, CdcSplitReader},
{ PostgresCdc, CdcSplitReader},
{ CitusCdc, CdcSplitReader },
{ GooglePubsub, PubsubSplitReader },
Expand Down Expand Up @@ -565,7 +519,7 @@ mod tests {
let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
let mysql_split = MySqlCdcSplit::new(1001, offset_str.to_string());
let split = DebeziumCdcSplit::new(Some(mysql_split), None);
let split_impl = SplitImpl::MySqlCdc(split);
let split_impl = SplitImpl::MysqlCdc(split);
let encoded_split = split_impl.encode_to_bytes();
let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
assert_eq!(
Expand Down Expand Up @@ -653,8 +607,7 @@ mod tests {
));

let conn_props = ConnectorProperties::extract(user_props_mysql).unwrap();
if let ConnectorProperties::MySqlCdc(c) = conn_props {
assert_eq!(c.source_type, "mysql");
if let ConnectorProperties::MysqlCdc(c) = conn_props {
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
assert_eq!(c.props.get("database.port").unwrap(), "3306");
Expand All @@ -668,7 +621,6 @@ mod tests {

let conn_props = ConnectorProperties::extract(user_props_postgres).unwrap();
if let ConnectorProperties::PostgresCdc(c) = conn_props {
assert_eq!(c.source_type, "postgres");
assert_eq!(c.props.get("connector_node_addr").unwrap(), "localhost");
assert_eq!(c.props.get("database.hostname").unwrap(), "127.0.0.1");
assert_eq!(c.props.get("database.port").unwrap(), "5432");
Expand Down
Loading