diff --git a/e2e_test/sink/append_only_sink.slt b/e2e_test/sink/append_only_sink.slt
index 5ace195ec48c..9a6aeabb88ae 100644
--- a/e2e_test/sink/append_only_sink.slt
+++ b/e2e_test/sink/append_only_sink.slt
@@ -22,7 +22,7 @@ create sink invalid_sink_type from t with (connector = 'blackhole', type = 'inva
 statement error `force_append_only` must be true or false
 create sink invalid_force_append_only from t with (connector = 'blackhole', force_append_only = 'invalid');

-statement error invalid connector type: invalid
+statement error db error: ERROR: QueryError: internal error: Sink error: config error: unsupported sink connector invalid
 create sink invalid_connector from t with (connector = 'invalid');

 statement ok
diff --git a/src/connector/src/sink/boxed.rs b/src/connector/src/sink/boxed.rs
index d966b1b6dea0..8c4dcdb4b050 100644
--- a/src/connector/src/sink/boxed.rs
+++ b/src/connector/src/sink/boxed.rs
@@ -12,28 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::fmt::{Debug, Formatter};
-use std::ops::{Deref, DerefMut};
+use std::ops::DerefMut;
 use std::sync::Arc;

 use async_trait::async_trait;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::buffer::Bitmap;
 use risingwave_pb::connector_service::SinkMetadata;
-use risingwave_rpc_client::ConnectorClient;

-use crate::sink::{Sink, SinkCommitCoordinator, SinkWriter, SinkWriterParam};
+use crate::sink::{SinkCommitCoordinator, SinkWriter};

 pub type BoxWriter<CM> = Box<dyn SinkWriter<CommitMetadata = CM> + Send + 'static>;
 pub type BoxCoordinator = Box<dyn SinkCommitCoordinator + Send + 'static>;
-pub type BoxSink =
-    Box<dyn Sink<Writer = BoxWriter<()>, Coordinator = BoxCoordinator> + Send + Sync + 'static>;
-
-impl Debug for BoxSink {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.write_str("BoxSink")
-    }
-}

 #[async_trait]
 impl<CM: Send + 'static> SinkWriter for BoxWriter<CM> {
@@ -70,24 +60,3 @@ impl SinkCommitCoordinator for BoxCoordinator {
         self.deref_mut().commit(epoch, metadata).await
     }
 }
-
-#[async_trait]
-impl Sink for BoxSink {
-    type Coordinator = BoxCoordinator;
-    type Writer = BoxWriter<()>;
-
-    async fn validate(&self) -> crate::sink::Result<()> {
-        self.deref().validate().await
-    }
-
-    async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
-        self.deref().new_writer(writer_param).await
-    }
-
-    async fn new_coordinator(
-        &self,
-        connector_client: Option<ConnectorClient>,
-    ) -> crate::sink::Result<Self::Coordinator> {
-        self.deref().new_coordinator(connector_client).await
-    }
-}
diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs
index ef4cc8b993a9..08287b789eaf 100644
--- a/src/connector/src/sink/clickhouse.rs
+++ b/src/connector/src/sink/clickhouse.rs
@@ -30,7 +30,8 @@ use serde_with::serde_as;
 use super::{DummySinkCommitCoordinator, SinkWriterParam};
 use crate::common::ClickHouseCommon;
 use crate::sink::{
-    Result, Sink, SinkError, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+    Result, Sink, SinkError, SinkParam, SinkWriter, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
+    SINK_TYPE_UPSERT,
 };

 pub const CLICKHOUSE_SINK: &str = "clickhouse";
@@ -70,21 +71,22 @@ impl ClickHouseConfig {
     }
 }

-impl ClickHouseSink {
-    pub fn new(
-        config: ClickHouseConfig,
-        schema: Schema,
-        pk_indices: Vec<usize>,
-        is_append_only: bool,
-    ) -> Result<Self> {
+impl TryFrom<SinkParam> for ClickHouseSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = ClickHouseConfig::from_hashmap(param.properties)?;
         Ok(Self {
             config,
             schema,
-            pk_indices,
-            is_append_only,
+            pk_indices: param.downstream_pk,
+            is_append_only: param.sink_type.is_append_only(),
         })
     }
+}

+impl ClickHouseSink {
     /// Check that the column names and types of risingwave and clickhouse are identical
     fn check_column_name_and_type(&self, clickhouse_columns_desc: &[SystemColumn]) -> Result<()> {
         let rw_fields_name = build_fields_name_type_from_schema(&self.schema)?;
@@ -205,11 +207,12 @@ impl ClickHouseSink {
         Ok(())
     }
 }
-#[async_trait::async_trait]
 impl Sink for ClickHouseSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = ClickHouseSinkWriter;

+    const SINK_NAME: &'static str = CLICKHOUSE_SINK;
+
     async fn validate(&self) -> Result<()> {
         // For upsert clickhouse sink, the primary key must be defined.
         if !self.is_append_only && self.pk_indices.is_empty() {
@@ -241,13 +244,13 @@ impl Sink for ClickHouseSink {
     }

     async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
-        Ok(ClickHouseSinkWriter::new(
+        ClickHouseSinkWriter::new(
             self.config.clone(),
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
         )
-        .await?)
+        .await
     }
 }

 pub struct ClickHouseSinkWriter {
diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs
index 82e6454663f4..3e226bcb37d4 100644
--- a/src/connector/src/sink/doris.rs
+++ b/src/connector/src/sink/doris.rs
@@ -29,7 +29,9 @@ use super::doris_connector::{DorisField, DorisInsert, DorisInsertClient, DORIS_D
 use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
 use crate::common::DorisCommon;
 use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
-use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
+use crate::sink::{
+    DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriter, SinkWriterParam,
+};

 pub const DORIS_SINK: &str = "doris";

 #[serde_as]
@@ -151,19 +153,20 @@ impl DorisSink {
     }
 }

-#[async_trait]
 impl Sink for DorisSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = DorisSinkWriter;

+    const SINK_NAME: &'static str = DORIS_SINK;
+
     async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
-        Ok(DorisSinkWriter::new(
+        DorisSinkWriter::new(
             self.config.clone(),
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
         )
-        .await?)
+        .await
     }

     async fn validate(&self) -> Result<()> {
@@ -195,6 +198,21 @@ pub struct DorisSinkWriter {
     row_encoder: JsonEncoder,
 }

+impl TryFrom<SinkParam> for DorisSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = DorisConfig::from_hashmap(param.properties)?;
+        DorisSink::new(
+            config,
+            schema,
+            param.downstream_pk,
+            param.sink_type.is_append_only(),
+        )
+    }
+}
+
 impl DorisSinkWriter {
     pub async fn new(
         config: DorisConfig,
diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs
index 41aded5ba26c..25c5aa776511 100644
--- a/src/connector/src/sink/iceberg.rs
+++ b/src/connector/src/sink/iceberg.rs
@@ -41,15 +41,21 @@ use super::{
 };
 use crate::deserialize_bool_from_string;
 use crate::sink::coordinate::CoordinatedSinkWriter;
-use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig};
+use crate::sink::remote::{CoordinatedRemoteSink, RemoteSinkTrait};
 use crate::sink::{Result, SinkCommitCoordinator, SinkParam};

 /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
 pub const ICEBERG_SINK: &str = "iceberg";
 pub const REMOTE_ICEBERG_SINK: &str = "iceberg_java";

-pub type RemoteIcebergSink = CoordinatedRemoteSink;
-pub type RemoteIcebergConfig = RemoteConfig;
+#[derive(Debug)]
+pub struct RemoteIceberg;
+
+impl RemoteSinkTrait for RemoteIceberg {
+    const SINK_NAME: &'static str = REMOTE_ICEBERG_SINK;
+}
+
+pub type RemoteIcebergSink = CoordinatedRemoteSink<RemoteIceberg>;

 #[derive(Debug, Clone, Deserialize)]
 #[serde(deny_unknown_fields)]
@@ -192,6 +198,15 @@ pub struct IcebergSink {
     param: SinkParam,
 }

+impl TryFrom<SinkParam> for IcebergSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let config = IcebergConfig::from_hashmap(param.properties.clone())?;
+        IcebergSink::new(config, param)
+    }
+}
+
 impl Debug for IcebergSink {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("IcebergSink")
@@ -240,11 +255,12 @@ impl IcebergSink {
     }
 }

-#[async_trait::async_trait]
 impl Sink for IcebergSink {
     type Coordinator = IcebergSinkCommitter;
     type Writer = CoordinatedSinkWriter<IcebergWriter>;

+    const SINK_NAME: &'static str = ICEBERG_SINK;
+
     async fn validate(&self) -> Result<()> {
         let _ = self.create_table().await?;
         Ok(())
@@ -261,7 +277,7 @@ impl Sink for IcebergSink {
             .map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
             table,
         };
-        Ok(CoordinatedSinkWriter::new(
+        CoordinatedSinkWriter::new(
             writer_param
                 .meta_client
                 .expect("should have meta client")
@@ -275,7 +291,7 @@ impl Sink for IcebergSink {
             })?,
             inner,
         )
-        .await?)
+        .await
     }

     async fn new_coordinator(
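Note: the conversions above (ClickHouse, Doris, Iceberg) all follow one shape, as do the remaining sinks below: the per-sink `new(...)` constructor is replaced by `TryFrom<SinkParam>`, so the dispatch machinery introduced in `mod.rs` can build any sink uniformly and fallibly. A minimal sketch of the pattern for a hypothetical sink; `MySink` and `MyConfig` are illustrative names, not part of this change:

    // Hypothetical sink, for illustration only.
    impl TryFrom<SinkParam> for MySink {
        type Error = SinkError;

        fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
            let schema = param.schema();
            // Parsing the user-supplied properties is the fallible step.
            let config = MyConfig::from_hashmap(param.properties)?;
            Ok(Self {
                config,
                schema,
                pk_indices: param.downstream_pk,
                is_append_only: param.sink_type.is_append_only(),
            })
        }
    }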
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index 85894792cd7e..0cad48bbb57f 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -281,24 +281,29 @@ pub struct KafkaSink {
     sink_from_name: String,
 }

-impl KafkaSink {
-    pub fn new(config: KafkaConfig, param: SinkParam) -> Self {
-        Self {
+impl TryFrom<SinkParam> for KafkaSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = KafkaConfig::from_hashmap(param.properties)?;
+        Ok(Self {
             config,
-            schema: param.schema(),
+            schema,
             pk_indices: param.downstream_pk,
             is_append_only: param.sink_type.is_append_only(),
             db_name: param.db_name,
             sink_from_name: param.sink_from_name,
-        }
+        })
     }
 }

-#[async_trait::async_trait]
 impl Sink for KafkaSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = SinkWriterV1Adapter<KafkaSinkWriter>;

+    const SINK_NAME: &'static str = KAFKA_SINK;
+
     async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result<Self::Writer> {
         Ok(SinkWriterV1Adapter::new(
             KafkaSinkWriter::new(
diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs
index 114070ca1e44..f16ca3905b88 100644
--- a/src/connector/src/sink/kinesis.rs
+++ b/src/connector/src/sink/kinesis.rs
@@ -47,24 +47,29 @@ pub struct KinesisSink {
     sink_from_name: String,
 }

-impl KinesisSink {
-    pub fn new(config: KinesisSinkConfig, param: SinkParam) -> Self {
-        Self {
+impl TryFrom<SinkParam> for KinesisSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = KinesisSinkConfig::from_hashmap(param.properties)?;
+        Ok(Self {
             config,
-            schema: param.schema(),
+            schema,
             pk_indices: param.downstream_pk,
             is_append_only: param.sink_type.is_append_only(),
             db_name: param.db_name,
             sink_from_name: param.sink_from_name,
-        }
+        })
     }
 }

-#[async_trait::async_trait]
 impl Sink for KinesisSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = KinesisSinkWriter;

+    const SINK_NAME: &'static str = KINESIS_SINK;
+
     async fn validate(&self) -> Result<()> {
         // For upsert Kafka sink, the primary key must be defined.
         if !self.is_append_only && self.pk_indices.is_empty() {
diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs
index 449f4f541f0e..f312453481db 100644
--- a/src/connector/src/sink/mod.rs
+++ b/src/connector/src/sink/mod.rs
@@ -27,7 +27,6 @@ pub mod nats;
 pub mod pulsar;
 pub mod redis;
 pub mod remote;
-#[cfg(any(test, madsim))]
 pub mod test_sink;
 pub mod utils;

@@ -37,7 +36,6 @@ use std::sync::Arc;
 use ::clickhouse::error::Error as ClickHouseError;
 use anyhow::anyhow;
 use async_trait::async_trait;
-use enum_as_inner::EnumAsInner;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, Field, Schema};
@@ -50,27 +48,82 @@ use thiserror::Error;
 pub use tracing;

 use self::catalog::SinkType;
-use self::clickhouse::{ClickHouseConfig, ClickHouseSink};
-use self::doris::{DorisConfig, DorisSink};
 use self::encoder::SerTo;
 use self::formatter::SinkFormatter;
-use self::iceberg::{IcebergSink, ICEBERG_SINK, REMOTE_ICEBERG_SINK};
-use self::pulsar::{PulsarConfig, PulsarSink};
-use crate::sink::boxed::BoxSink;
 use crate::sink::catalog::{SinkCatalog, SinkId};
-use crate::sink::clickhouse::CLICKHOUSE_SINK;
-use crate::sink::doris::DORIS_SINK;
-use crate::sink::iceberg::{IcebergConfig, RemoteIcebergConfig, RemoteIcebergSink};
-use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
-use crate::sink::kinesis::{KinesisSink, KinesisSinkConfig, KINESIS_SINK};
-use crate::sink::nats::{NatsConfig, NatsSink, NATS_SINK};
-use crate::sink::pulsar::PULSAR_SINK;
-use crate::sink::redis::{RedisConfig, RedisSink};
-use crate::sink::remote::{CoordinatedRemoteSink, RemoteConfig, RemoteSink};
-#[cfg(any(test, madsim))]
-use crate::sink::test_sink::{build_test_sink, TEST_SINK_NAME};
+pub use crate::sink::clickhouse::ClickHouseSink;
+pub use crate::sink::iceberg::{IcebergSink, RemoteIcebergSink};
+pub use crate::sink::kafka::KafkaSink;
+pub use crate::sink::kinesis::KinesisSink;
+pub use crate::sink::nats::NatsSink;
+pub use crate::sink::pulsar::PulsarSink;
+pub use crate::sink::redis::RedisSink;
+pub use crate::sink::remote::{CassandraSink, DeltaLakeSink, ElasticSearchSink, JdbcSink};
+pub use crate::sink::test_sink::TestSink;
 use crate::ConnectorParams;

+#[macro_export]
+macro_rules! for_all_sinks {
+    ($macro:path $(, $arg:tt)*) => {
+        $macro! {
+            {
+                { Redis, $crate::sink::RedisSink },
+                { Kafka, $crate::sink::KafkaSink },
+                { Pulsar, $crate::sink::PulsarSink },
+                { BlackHole, $crate::sink::BlackHoleSink },
+                { Kinesis, $crate::sink::KinesisSink },
+                { ClickHouse, $crate::sink::ClickHouseSink },
+                { Iceberg, $crate::sink::IcebergSink },
+                { Nats, $crate::sink::NatsSink },
+                { RemoteIceberg, $crate::sink::RemoteIcebergSink },
+                { Jdbc, $crate::sink::JdbcSink },
+                { DeltaLake, $crate::sink::DeltaLakeSink },
+                { ElasticSearch, $crate::sink::ElasticSearchSink },
+                { Cassandra, $crate::sink::CassandraSink },
+                { Doris, $crate::sink::doris::DorisSink },
+                { Test, $crate::sink::TestSink }
+            }
+            $(,$arg)*
+        }
+    };
+}
+
+#[macro_export]
+macro_rules! dispatch_sink {
+    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
+        use $crate::sink::SinkImpl;
+
+        match $impl {
+            $(
+                SinkImpl::$variant_name($sink) => $body,
+            )*
+        }
+    }};
+    ($impl:expr, $sink:ident, $body:expr) => {{
+        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
+    }};
+}
+
+#[macro_export]
+macro_rules! match_sink_name_str {
+    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {
+        match $name_str {
+            $(
+                <$sink_type>::SINK_NAME => {
+                    type $type_name = $sink_type;
+                    {
+                        $body
+                    }
+                },
+            )*
+            other => ($on_other_closure)(other),
+        }
+    };
+    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
+        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
+    }};
+}
+
 pub const DOWNSTREAM_SINK_KEY: &str = "connector";
 pub const SINK_TYPE_OPTION: &str = "type";
 pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
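Note: `for_all_sinks!` is a higher-order macro: it owns the single authoritative list of `{ Variant, Type }` pairs and feeds that list into whichever macro it is passed. Hand-expanding `dispatch_sink!(sink, sink, sink.validate().await)` therefore yields, roughly:

    // Abbreviated hand-expansion, for illustration only.
    match sink {
        SinkImpl::Redis(sink) => sink.validate().await,
        SinkImpl::Kafka(sink) => sink.validate().await,
        // ... one arm per entry in for_all_sinks! ...
        SinkImpl::Test(sink) => sink.validate().await,
    }

Adding a new sink then means adding one entry to `for_all_sinks!`; the `SinkImpl` enum, the `From` impls, and every dispatch site pick it up automatically.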
@@ -156,13 +209,14 @@ pub struct SinkWriterParam {
     pub meta_client: Option<MetaClient>,
 }

-#[async_trait]
-pub trait Sink {
+pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
+    const SINK_NAME: &'static str;
     type Writer: SinkWriter<CommitMetadata = ()>;
     type Coordinator: SinkCommitCoordinator;

     async fn validate(&self) -> Result<()>;
     async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer>;
+    #[expect(clippy::unused_async)]
     async fn new_coordinator(
         &self,
         _connector_client: Option<ConnectorClient>,
     ) -> Result<Self::Coordinator> {
@@ -320,33 +374,25 @@ impl SinkCommitCoordinator for DummySinkCommitCoordinator {
     }
 }

-#[derive(Clone, Debug, EnumAsInner)]
-pub enum SinkConfig {
-    Redis(RedisConfig),
-    Kafka(Box<KafkaConfig>),
-    Remote(RemoteConfig),
-    Kinesis(Box<KinesisSinkConfig>),
-    Iceberg(IcebergConfig),
-    RemoteIceberg(RemoteIcebergConfig),
-    Pulsar(PulsarConfig),
-    BlackHole,
-    ClickHouse(Box<ClickHouseConfig>),
-    Doris(Box<DorisConfig>),
-    Nats(NatsConfig),
-    #[cfg(any(test, madsim))]
-    Test,
-}
-
 pub const BLACKHOLE_SINK: &str = "blackhole";

 #[derive(Debug)]
 pub struct BlackHoleSink;

-#[async_trait]
+impl TryFrom<SinkParam> for BlackHoleSink {
+    type Error = SinkError;
+
+    fn try_from(_value: SinkParam) -> std::result::Result<Self, Self::Error> {
+        Ok(Self)
+    }
+}
+
 impl Sink for BlackHoleSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = Self;

+    const SINK_NAME: &'static str = BLACKHOLE_SINK;
+
     async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
         Ok(Self)
     }
@@ -371,144 +417,68 @@ impl SinkWriter for BlackHoleSink {
     }
 }

-impl SinkConfig {
-    pub fn from_hashmap(mut properties: HashMap<String, String>) -> Result<Self> {
+impl SinkImpl {
+    pub fn new(mut param: SinkParam) -> Result<Self> {
         const CONNECTOR_TYPE_KEY: &str = "connector";
         const CONNECTION_NAME_KEY: &str = "connection.name";
         const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

         // remove privatelink related properties if any
-        properties.remove(PRIVATE_LINK_TARGET_KEY);
-        properties.remove(CONNECTION_NAME_KEY);
+        param.properties.remove(PRIVATE_LINK_TARGET_KEY);
+        param.properties.remove(CONNECTION_NAME_KEY);

-        let sink_type = properties
+        let sink_type = param
+            .properties
             .get(CONNECTOR_TYPE_KEY)
             .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
-        match sink_type.to_lowercase().as_str() {
-            KAFKA_SINK => Ok(SinkConfig::Kafka(Box::new(KafkaConfig::from_hashmap(
-                properties,
-            )?))),
-            KINESIS_SINK => Ok(SinkConfig::Kinesis(Box::new(
-                KinesisSinkConfig::from_hashmap(properties)?,
-            ))),
-            CLICKHOUSE_SINK => Ok(SinkConfig::ClickHouse(Box::new(
-                ClickHouseConfig::from_hashmap(properties)?,
-            ))),
-            DORIS_SINK => Ok(SinkConfig::Doris(Box::new(DorisConfig::from_hashmap(
-                properties,
-            )?))),
-            BLACKHOLE_SINK => Ok(SinkConfig::BlackHole),
-            PULSAR_SINK => Ok(SinkConfig::Pulsar(PulsarConfig::from_hashmap(properties)?)),
-            REMOTE_ICEBERG_SINK => Ok(SinkConfig::RemoteIceberg(
-                RemoteIcebergConfig::from_hashmap(properties)?,
-            )),
-            ICEBERG_SINK => Ok(SinkConfig::Iceberg(IcebergConfig::from_hashmap(
-                properties,
-            )?)),
-            NATS_SINK => Ok(SinkConfig::Nats(NatsConfig::from_hashmap(properties)?)),
-            // Only in test or deterministic test, test sink is enabled.
-            #[cfg(any(test, madsim))]
-            TEST_SINK_NAME => Ok(SinkConfig::Test),
-            _ => Ok(SinkConfig::Remote(RemoteConfig::from_hashmap(properties)?)),
-        }
+        match_sink_name_str!(
+            sink_type.to_lowercase().as_str(),
+            SinkType,
+            Ok(SinkType::try_from(param)?.into()),
+            |other| {
+                Err(SinkError::Config(anyhow!(
+                    "unsupported sink connector {}",
+                    other
+                )))
+            }
+        )
     }
 }

 pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
-    let config = SinkConfig::from_hashmap(param.properties.clone())?;
-    SinkImpl::new(config, param)
+    SinkImpl::new(param)
 }

-#[derive(Debug)]
-pub enum SinkImpl {
-    Redis(RedisSink),
-    Kafka(KafkaSink),
-    Remote(RemoteSink),
-    Pulsar(PulsarSink),
-    BlackHole(BlackHoleSink),
-    Kinesis(KinesisSink),
-    ClickHouse(ClickHouseSink),
-    Doris(DorisSink),
-    Iceberg(IcebergSink),
-    Nats(NatsSink),
-    RemoteIceberg(RemoteIcebergSink),
-    TestSink(BoxSink),
-}
-
-impl SinkImpl {
-    pub fn get_connector(&self) -> &'static str {
-        match self {
-            SinkImpl::Kafka(_) => "kafka",
-            SinkImpl::Redis(_) => "redis",
-            SinkImpl::Remote(_) => "remote",
-            SinkImpl::Pulsar(_) => "pulsar",
-            SinkImpl::BlackHole(_) => "blackhole",
-            SinkImpl::Kinesis(_) => "kinesis",
-            SinkImpl::ClickHouse(_) => "clickhouse",
-            SinkImpl::Iceberg(_) => "iceberg",
-            SinkImpl::Nats(_) => "nats",
-            SinkImpl::RemoteIceberg(_) => "iceberg_java",
-            SinkImpl::TestSink(_) => "test",
-            SinkImpl::Doris(_) => "doris",
+macro_rules! def_sink_impl {
+    () => {
+        $crate::for_all_sinks! { def_sink_impl }
+    };
+    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
+        #[derive(Debug)]
+        pub enum SinkImpl {
+            $(
+                $variant_name($sink_type),
+            )*
         }
-    }
-}

-#[macro_export]
-macro_rules! dispatch_sink {
-    ($impl:expr, $sink:ident, $body:tt) => {{
-        use $crate::sink::SinkImpl;
-
-        match $impl {
-            SinkImpl::Redis($sink) => $body,
-            SinkImpl::Kafka($sink) => $body,
-            SinkImpl::Remote($sink) => $body,
-            SinkImpl::Pulsar($sink) => $body,
-            SinkImpl::BlackHole($sink) => $body,
-            SinkImpl::Kinesis($sink) => $body,
-            SinkImpl::ClickHouse($sink) => $body,
-            SinkImpl::Doris($sink) => $body,
-            SinkImpl::Iceberg($sink) => $body,
-            SinkImpl::Nats($sink) => $body,
-            SinkImpl::RemoteIceberg($sink) => $body,
-            SinkImpl::TestSink($sink) => $body,
-        }
-    }};
+        $(
+            impl From<$sink_type> for SinkImpl {
+                fn from(sink: $sink_type) -> SinkImpl {
+                    SinkImpl::$variant_name(sink)
+                }
+            }
+        )*
+    };
 }

+def_sink_impl!();
+
 impl SinkImpl {
-    pub fn new(cfg: SinkConfig, param: SinkParam) -> Result<Self> {
-        Ok(match cfg {
-            SinkConfig::Redis(cfg) => SinkImpl::Redis(RedisSink::new(cfg, param.schema())?),
-            SinkConfig::Kafka(cfg) => SinkImpl::Kafka(KafkaSink::new(*cfg, param)),
-            SinkConfig::Kinesis(cfg) => SinkImpl::Kinesis(KinesisSink::new(*cfg, param)),
-            SinkConfig::Remote(cfg) => SinkImpl::Remote(RemoteSink::new(cfg, param)),
-            SinkConfig::Pulsar(cfg) => SinkImpl::Pulsar(PulsarSink::new(cfg, param)),
-            SinkConfig::BlackHole => SinkImpl::BlackHole(BlackHoleSink),
-            SinkConfig::ClickHouse(cfg) => SinkImpl::ClickHouse(ClickHouseSink::new(
-                *cfg,
-                param.schema(),
-                param.downstream_pk,
-                param.sink_type.is_append_only(),
-            )?),
-            SinkConfig::Iceberg(cfg) => SinkImpl::Iceberg(IcebergSink::new(cfg, param)?),
-            SinkConfig::Nats(cfg) => SinkImpl::Nats(NatsSink::new(
-                cfg,
-                param.schema(),
-                param.sink_type.is_append_only(),
-            )),
-            SinkConfig::RemoteIceberg(cfg) => {
-                SinkImpl::RemoteIceberg(CoordinatedRemoteSink(RemoteSink::new(cfg, param)))
-            }
-            #[cfg(any(test, madsim))]
-            SinkConfig::Test => SinkImpl::TestSink(build_test_sink(param)?),
-            SinkConfig::Doris(cfg) => SinkImpl::Doris(DorisSink::new(
-                *cfg,
-                param.schema(),
-                param.downstream_pk,
-                param.sink_type.is_append_only(),
-            )?),
-        })
+    pub fn get_connector(&self) -> &'static str {
+        fn get_name<S: Sink>(_: &S) -> &'static str {
+            S::SINK_NAME
+        }
+        dispatch_sink!(self, sink, get_name(sink))
     }
 }
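Note: `match_sink_name_str!` replaces the old hand-written `SinkConfig::from_hashmap` match, and its fallthrough closure is where the new `unsupported sink connector` message asserted in the slt test above comes from. A hand-expanded sketch of its use in `SinkImpl::new`, abbreviated to two arms:

    // Abbreviated hand-expansion, for illustration only.
    match sink_type.to_lowercase().as_str() {
        <KafkaSink>::SINK_NAME => {
            type SinkType = KafkaSink;
            Ok(SinkType::try_from(param)?.into())
        }
        <ClickHouseSink>::SINK_NAME => {
            type SinkType = ClickHouseSink;
            Ok(SinkType::try_from(param)?.into())
        }
        // ... one arm per entry in for_all_sinks! ...
        other => Err(SinkError::Config(anyhow!(
            "unsupported sink connector {}",
            other
        ))),
    }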
diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs
index 57b9e572c061..1a126eb2c74d 100644
--- a/src/connector/src/sink/nats.rs
+++ b/src/connector/src/sink/nats.rs
@@ -28,7 +28,7 @@ use super::utils::chunk_to_json;
 use super::{DummySinkCommitCoordinator, SinkWriter, SinkWriterParam};
 use crate::common::NatsCommon;
 use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
-use crate::sink::{Result, Sink, SinkError, SINK_TYPE_APPEND_ONLY};
+use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY};

 pub const NATS_SINK: &str = "nats";

@@ -71,21 +71,26 @@ impl NatsConfig {
     }
 }

-impl NatsSink {
-    pub fn new(config: NatsConfig, schema: Schema, is_append_only: bool) -> Self {
-        Self {
+impl TryFrom<SinkParam> for NatsSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = NatsConfig::from_hashmap(param.properties)?;
+        Ok(Self {
             config,
             schema,
-            is_append_only,
-        }
+            is_append_only: param.sink_type.is_append_only(),
+        })
     }
 }

-#[async_trait::async_trait]
 impl Sink for NatsSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = NatsSinkWriter;

+    const SINK_NAME: &'static str = NATS_SINK;
+
     async fn validate(&self) -> Result<()> {
         if !self.is_append_only {
             return Err(SinkError::Nats(anyhow!(
@@ -105,7 +110,7 @@ impl Sink for NatsSink {
     }

     async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
-        Ok(NatsSinkWriter::new(self.config.clone(), self.schema.clone()).await?)
+        NatsSinkWriter::new(self.config.clone(), self.schema.clone()).await
     }
 }
diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs
index 8dc478f7dd82..df13c2b4c7d0 100644
--- a/src/connector/src/sink/pulsar.rs
+++ b/src/connector/src/sink/pulsar.rs
@@ -145,24 +145,29 @@ pub struct PulsarSink {
     sink_from_name: String,
 }

-impl PulsarSink {
-    pub fn new(config: PulsarConfig, param: SinkParam) -> Self {
-        Self {
+impl TryFrom<SinkParam> for PulsarSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = PulsarConfig::from_hashmap(param.properties)?;
+        Ok(Self {
             config,
-            schema: param.schema(),
+            schema,
             downstream_pk: param.downstream_pk,
             is_append_only: param.sink_type.is_append_only(),
             db_name: param.db_name,
             sink_from_name: param.sink_from_name,
-        }
+        })
     }
 }

-#[async_trait]
 impl Sink for PulsarSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = PulsarSinkWriter;

+    const SINK_NAME: &'static str = PULSAR_SINK;
+
     async fn new_writer(&self, _writer_param: SinkWriterParam) -> Result<Self::Writer> {
         PulsarSinkWriter::new(
             self.config.clone(),
diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs
index f7957335aa1e..8cd0875300d2 100644
--- a/src/connector/src/sink/redis.rs
+++ b/src/connector/src/sink/redis.rs
@@ -14,9 +14,10 @@

 use async_trait::async_trait;
 use risingwave_common::array::StreamChunk;
-use risingwave_common::catalog::Schema;

-use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};
+use crate::sink::{
+    DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam,
+};

 #[derive(Clone, Debug)]
 pub struct RedisConfig;
@@ -24,17 +25,20 @@ pub struct RedisConfig;
 #[derive(Debug)]
 pub struct RedisSink;

-impl RedisSink {
-    pub fn new(_cfg: RedisConfig, _schema: Schema) -> Result<Self> {
+impl TryFrom<SinkParam> for RedisSink {
+    type Error = SinkError;
+
+    fn try_from(_param: SinkParam) -> std::result::Result<Self, Self::Error> {
         todo!()
     }
 }

-#[async_trait]
 impl Sink for RedisSink {
     type Coordinator = DummySinkCommitCoordinator;
     type Writer = RedisSinkWriter;

+    const SINK_NAME: &'static str = "redis";
+
     async fn new_writer(&self, _writer_env: SinkWriterParam) -> Result<Self::Writer> {
         todo!()
     }
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index f6fa615e44c5..51195ac366a2 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -22,7 +22,6 @@ use itertools::Itertools;
 use jni::objects::{JByteArray, JValue, JValueOwned};
 use prost::Message;
 use risingwave_common::array::StreamChunk;
-use risingwave_common::catalog::Schema;
 use risingwave_common::error::anyhow_error;
 use risingwave_common::types::DataType;
 use risingwave_jni_core::jvm_runtime::JVM;
@@ -45,7 +44,6 @@ use tracing::{error, warn};

 use super::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode};
 use crate::sink::coordinate::CoordinatedSinkWriter;
-use crate::sink::iceberg::REMOTE_ICEBERG_SINK;
 use crate::sink::SinkError::Remote;
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkCommitCoordinator, SinkError, SinkParam,
@@ -53,68 +51,58 @@ use crate::sink::{
 };
 use crate::ConnectorParams;

-pub const VALID_REMOTE_SINKS: [&str; 5] = [
-    "jdbc",
-    REMOTE_ICEBERG_SINK,
-    "deltalake",
-    "elasticsearch",
-    "cassandra",
-];
-
-pub fn is_valid_remote_sink(connector_type: &str) -> bool {
-    VALID_REMOTE_SINKS.contains(&connector_type)
-}
-
-#[derive(Clone, Debug)]
-pub struct RemoteConfig {
-    pub connector_type: String,
-    pub properties: HashMap<String, String>,
+macro_rules! def_remote_sink {
+    () => {
+        def_remote_sink! {
+            { ElasticSearch, ElasticSearchSink, "elasticsearch" },
+            { Cassandra, CassandraSink, "cassandra" },
+            { Jdbc, JdbcSink, "jdbc" },
+            { DeltaLake, DeltaLakeSink, "deltalake" }
+        }
+    };
+    ($({ $variant_name:ident, $sink_type_name:ident, $sink_name:expr }),*) => {
+        $(
+            #[derive(Debug)]
+            pub struct $variant_name;
+            impl RemoteSinkTrait for $variant_name {
+                const SINK_NAME: &'static str = $sink_name;
+            }
+            pub type $sink_type_name = RemoteSink<$variant_name>;
+        )*
+    };
 }

-impl RemoteConfig {
-    pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
-        let connector_type = values
-            .get("connector")
-            .expect("sink type must be specified")
-            .to_string();
-
-        if !is_valid_remote_sink(connector_type.as_str()) {
-            return Err(SinkError::Config(anyhow!(
-                "invalid connector type: {connector_type}"
-            )));
-        }
+def_remote_sink!();

-        Ok(RemoteConfig {
-            connector_type,
-            properties: values,
-        })
-    }
+pub trait RemoteSinkTrait: Send + Sync + 'static {
+    const SINK_NAME: &'static str;
 }

 #[derive(Debug)]
-pub struct RemoteSink {
-    config: RemoteConfig,
+pub struct RemoteSink<R: RemoteSinkTrait> {
     param: SinkParam,
+    _phantom: PhantomData<R>,
 }

-impl RemoteSink {
-    pub fn new(config: RemoteConfig, param: SinkParam) -> Self {
-        Self { config, param }
+impl<R: RemoteSinkTrait> TryFrom<SinkParam> for RemoteSink<R> {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        Ok(Self {
+            param,
+            _phantom: PhantomData,
+        })
     }
 }

-#[async_trait]
-impl Sink for RemoteSink {
+impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
     type Coordinator = DummySinkCommitCoordinator;
-    type Writer = RemoteSinkWriter;
+    type Writer = RemoteSinkWriter<R>;
+
+    const SINK_NAME: &'static str = R::SINK_NAME;

     async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
-        Ok(RemoteSinkWriter::new(
-            self.config.clone(),
-            self.param.clone(),
-            writer_param.connector_params,
-        )
-        .await?)
+        RemoteSinkWriter::new(self.param.clone(), writer_param.connector_params).await
     }

     async fn validate(&self) -> Result<()> {
@@ -195,19 +183,28 @@ impl Sink for RemoteSink {
 }

 #[derive(Debug)]
-pub struct CoordinatedRemoteSink(pub RemoteSink);
+pub struct CoordinatedRemoteSink<R: RemoteSinkTrait>(pub RemoteSink<R>);

-#[async_trait]
-impl Sink for CoordinatedRemoteSink {
-    type Coordinator = RemoteCoordinator;
-    type Writer = CoordinatedSinkWriter;
+impl<R: RemoteSinkTrait> TryFrom<SinkParam> for CoordinatedRemoteSink<R> {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        RemoteSink::try_from(param).map(Self)
+    }
+}
+
+impl<R: RemoteSinkTrait> Sink for CoordinatedRemoteSink<R> {
+    type Coordinator = RemoteCoordinator<R>;
+    type Writer = CoordinatedSinkWriter<CoordinatedRemoteSinkWriter<R>>;
+
+    const SINK_NAME: &'static str = R::SINK_NAME;

     async fn validate(&self) -> Result<()> {
         self.0.validate().await
     }

     async fn new_writer(&self, writer_param: SinkWriterParam) -> Result<Self::Writer> {
-        Ok(CoordinatedSinkWriter::new(
+        CoordinatedSinkWriter::new(
             writer_param
                 .meta_client
                 .expect("should have meta client")
@@ -219,50 +216,40 @@ impl Sink for CoordinatedRemoteSink {
                     "sink needs coordination should not have singleton input"
                 ))
             })?,
-            CoordinatedRemoteSinkWriter::new(
-                self.0.config.clone(),
-                self.0.param.clone(),
-                writer_param.connector_params,
-            )
-            .await?,
+            CoordinatedRemoteSinkWriter::new(self.0.param.clone(), writer_param.connector_params)
+                .await?,
         )
-        .await?)
+        .await
     }

     async fn new_coordinator(
         &self,
         connector_client: Option<ConnectorClient>,
     ) -> Result<Self::Coordinator> {
-        Ok(RemoteCoordinator::new(
+        RemoteCoordinator::new(
             connector_client
                 .ok_or_else(|| Remote(anyhow_error!("no connector client specified")))?,
             self.0.param.clone(),
         )
-        .await?)
+        .await
     }
 }

-pub type RemoteSinkWriter = RemoteSinkWriterInner<()>;
-pub type CoordinatedRemoteSinkWriter = RemoteSinkWriterInner<Option<SinkMetadata>>;
+pub type RemoteSinkWriter<R> = RemoteSinkWriterInner<(), R>;
+pub type CoordinatedRemoteSinkWriter<R> = RemoteSinkWriterInner<Option<SinkMetadata>, R>;

-pub struct RemoteSinkWriterInner<SM> {
-    pub connector_type: String,
+pub struct RemoteSinkWriterInner<SM, R: RemoteSinkTrait> {
     properties: HashMap<String, String>,
     epoch: Option<u64>,
     batch_id: u64,
-    schema: Schema,
     payload_format: SinkPayloadFormat,
     stream_handle: SinkWriterStreamHandle,
     json_encoder: JsonEncoder,
-    _phantom: PhantomData<SM>,
+    _phantom: PhantomData<(SM, R)>,
 }

-impl<SM> RemoteSinkWriterInner<SM> {
-    pub async fn new(
-        config: RemoteConfig,
-        param: SinkParam,
-        connector_params: ConnectorParams,
-    ) -> Result<Self> {
+impl<SM, R: RemoteSinkTrait> RemoteSinkWriterInner<SM, R> {
+    pub async fn new(param: SinkParam, connector_params: ConnectorParams) -> Result<Self> {
         let client = connector_params.connector_client.ok_or_else(|| {
             SinkError::Remote(anyhow_error!(
                 "connector node endpoint not specified or unable to connect to connector node"
             .inspect_err(|e| {
                 error!(
                     "failed to start sink stream for connector `{}`: {:?}",
-                    &config.connector_type, e
+                    R::SINK_NAME,
+                    e
                 )
             })?;
         tracing::trace!(
             "{:?} sink stream started with properties: {:?}",
-            &config.connector_type,
-            &config.properties
+            R::SINK_NAME,
+            &param.properties
         );

+        let schema = param.schema();
+
         Ok(Self {
-            connector_type: config.connector_type,
-            properties: config.properties,
+            properties: param.properties,
             epoch: None,
             batch_id: 0,
-            schema: param.schema(),
             stream_handle,
             payload_format: connector_params.sink_payload_format,
-            json_encoder: JsonEncoder::new(param.schema(), None, TimestampHandlingMode::String),
+            json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String),
             _phantom: PhantomData,
         })
     }
@@ -300,8 +288,8 @@ impl RemoteSinkWriterInner {
     fn for_test(
         response_receiver: UnboundedReceiver<anyhow::Result<SinkWriterStreamResponse>>,
         request_sender: Sender<SinkWriterStreamRequest>,
-    ) -> RemoteSinkWriter {
-        use risingwave_common::catalog::Field;
+    ) -> RemoteSinkWriter<R> {
+        use risingwave_common::catalog::{Field, Schema};

         let properties = HashMap::from([("output.path".to_string(), "/tmp/rw".to_string())]);

         let schema = Schema::new(vec![
@@ -328,12 +316,10 @@ impl RemoteSinkWriterInner {
         );

         RemoteSinkWriter {
-            connector_type: "file".to_string(),
             properties,
             epoch: None,
             batch_id: 0,
-            json_encoder: JsonEncoder::new(schema.clone(), None, TimestampHandlingMode::String),
-            schema,
+            json_encoder: JsonEncoder::new(schema, None, TimestampHandlingMode::String),
             stream_handle,
             payload_format: SinkPayloadFormat::Json,
             _phantom: PhantomData,
@@ -347,7 +333,7 @@ trait HandleBarrierResponse {
     fn non_checkpoint_return_value() -> Self::SinkMetadata;
 }

-impl HandleBarrierResponse for RemoteSinkWriter {
+impl<R: RemoteSinkTrait> HandleBarrierResponse for RemoteSinkWriter<R> {
     type SinkMetadata = ();

     fn handle_commit_response(rsp: CommitResponse) -> Result<Self::SinkMetadata> {
@@ -360,7 +346,7 @@ impl HandleBarrierResponse for RemoteSinkWriter {
     fn non_checkpoint_return_value() -> Self::SinkMetadata {}
 }

-impl HandleBarrierResponse for CoordinatedRemoteSinkWriter {
+impl<R: RemoteSinkTrait> HandleBarrierResponse for CoordinatedRemoteSinkWriter<R> {
     type SinkMetadata = Option<SinkMetadata>;

     fn handle_commit_response(rsp: CommitResponse) -> Result<Self::SinkMetadata> {
@@ -379,7 +365,7 @@ impl HandleBarrierResponse for CoordinatedRemoteSinkWriter {
 }

 #[async_trait]
-impl<SM: Send + 'static> SinkWriter for RemoteSinkWriterInner<SM>
+impl<SM: Send + 'static, R: RemoteSinkTrait> SinkWriter for RemoteSinkWriterInner<SM, R>
 where
     Self: HandleBarrierResponse,
 {
@@ -449,21 +435,25 @@
     }
 }

-pub struct RemoteCoordinator {
+pub struct RemoteCoordinator<R: RemoteSinkTrait> {
     stream_handle: SinkCoordinatorStreamHandle,
+    _phantom: PhantomData<R>,
 }

-impl RemoteCoordinator {
+impl<R: RemoteSinkTrait> RemoteCoordinator<R> {
     pub async fn new(client: ConnectorClient, param: SinkParam) -> Result<Self> {
         let stream_handle = client
             .start_sink_coordinator_stream(param.to_proto())
             .await?;

-        Ok(RemoteCoordinator { stream_handle })
+        Ok(RemoteCoordinator {
+            stream_handle,
+            _phantom: PhantomData,
+        })
     }
 }

 #[async_trait]
-impl SinkCommitCoordinator for RemoteCoordinator {
+impl<R: RemoteSinkTrait> SinkCommitCoordinator for RemoteCoordinator<R> {
     async fn init(&mut self) -> Result<()> {
         Ok(())
     }
@@ -486,15 +476,20 @@ mod test {
     use risingwave_pb::data;
     use tokio::sync::mpsc;

-    use crate::sink::remote::RemoteSinkWriter;
+    use crate::sink::remote::{RemoteSinkTrait, RemoteSinkWriter};
     use crate::sink::SinkWriter;

+    struct TestRemote;
+    impl RemoteSinkTrait for TestRemote {
+        const SINK_NAME: &'static str = "test-remote";
+    }
+
     #[tokio::test]
     async fn test_epoch_check() {
         let (request_sender, mut request_recv) = mpsc::channel(16);
         let (_, resp_recv) = mpsc::unbounded_channel();

-        let mut sink = RemoteSinkWriter::for_test(resp_recv, request_sender);
+        let mut sink = <RemoteSinkWriter<TestRemote>>::for_test(resp_recv, request_sender);
         let chunk = StreamChunk::from_pretty(
             " i T
             + 1 Ripper
@@ -531,7 +526,7 @@ mod test {
     async fn test_remote_sink() {
         let (request_sender, mut request_receiver) = mpsc::channel(16);
         let (response_sender, response_receiver) = mpsc::unbounded_channel();
-        let mut sink = RemoteSinkWriter::for_test(response_receiver, request_sender);
+        let mut sink = <RemoteSinkWriter<TestRemote>>::for_test(response_receiver, request_sender);

         let chunk_a = StreamChunk::from_pretty(
             " i T
diff --git a/src/connector/src/sink/test_sink.rs b/src/connector/src/sink/test_sink.rs
index d288253eb7a2..96ed91d71bb5 100644
--- a/src/connector/src/sink/test_sink.rs
+++ b/src/connector/src/sink/test_sink.rs
@@ -14,23 +14,57 @@

 use std::sync::{Arc, OnceLock};

+use anyhow::anyhow;
 use parking_lot::Mutex;

-use crate::sink::boxed::BoxSink;
-use crate::sink::{SinkError, SinkParam};
+use crate::sink::boxed::{BoxCoordinator, BoxWriter};
+use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam};

-pub type BuildBoxSink =
-    Box<dyn Fn(SinkParam) -> Result<BoxSink, SinkError> + Send + Sync + 'static>;
+pub trait BuildBoxWriterTrait = FnMut(SinkParam, SinkWriterParam) -> BoxWriter<()> + Send + 'static;
+
+pub type BuildBoxWriter = Box<dyn BuildBoxWriterTrait>;

 pub const TEST_SINK_NAME: &str = "test";

+#[derive(Debug)]
+pub struct TestSink {
+    param: SinkParam,
+}
+
+impl TryFrom<SinkParam> for TestSink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> Result<Self, Self::Error> {
+        if cfg!(any(madsim, test)) {
+            Ok(TestSink { param })
+        } else {
+            Err(SinkError::Config(anyhow!("test sink only support in test")))
+        }
+    }
+}
+
+impl Sink for TestSink {
+    type Coordinator = BoxCoordinator;
+    type Writer = BoxWriter<()>;
+
+    const SINK_NAME: &'static str = "test";
+
+    async fn validate(&self) -> crate::sink::Result<()> {
+        Ok(())
+    }
+
+    async fn new_writer(&self, writer_param: SinkWriterParam) -> crate::sink::Result<Self::Writer> {
+        Ok(build_box_writer(self.param.clone(), writer_param))
+    }
+}
+
 struct TestSinkRegistry {
-    build_box_sink: Arc<Mutex<Option<BuildBoxSink>>>,
+    build_box_writer: Arc<Mutex<Option<BuildBoxWriter>>>,
 }

 impl TestSinkRegistry {
     fn new() -> Self {
         TestSinkRegistry {
-            build_box_sink: Arc::new(Mutex::new(None)),
+            build_box_writer: Arc::new(Mutex::new(None)),
         }
     }
 }
@@ -44,25 +78,23 @@ pub struct TestSinkRegistryGuard;

 impl Drop for TestSinkRegistryGuard {
     fn drop(&mut self) {
-        assert!(get_registry().build_box_sink.lock().take().is_some());
+        assert!(get_registry().build_box_writer.lock().take().is_some());
     }
 }

-pub fn registry_build_sink(
-    build_sink: impl Fn(SinkParam) -> Result<BoxSink, SinkError> + Send + Sync + 'static,
-) -> TestSinkRegistryGuard {
+pub fn registry_build_sink(build_box_writer: impl BuildBoxWriterTrait) -> TestSinkRegistryGuard {
     assert!(get_registry()
-        .build_box_sink
+        .build_box_writer
         .lock()
-        .replace(Box::new(build_sink))
+        .replace(Box::new(build_box_writer))
         .is_none());
     TestSinkRegistryGuard
 }

-pub fn build_test_sink(param: SinkParam) -> Result<BoxSink, SinkError> {
+pub fn build_box_writer(param: SinkParam, writer_param: SinkWriterParam) -> BoxWriter<()> {
     (get_registry()
-        .build_box_sink
+        .build_box_writer
         .lock()
-        .as_ref()
-        .expect("should not be empty"))(param)
+        .as_mut()
+        .expect("should not be empty"))(param, writer_param)
 }
diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs
index 4717a1ffdfe9..8544011071ec 100644
--- a/src/meta/src/stream/sink.rs
+++ b/src/meta/src/stream/sink.rs
@@ -25,5 +25,5 @@ pub async fn validate_sink(prost_sink_catalog: &PbSink) -> MetaResult<()> {

     let sink = build_sink(param)?;

-    dispatch_sink!(sink, sink, { Ok(sink.validate().await?) })
+    dispatch_sink!(sink, sink, Ok(sink.validate().await?))
 }
diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs
index a5715a8471c4..f2c6d9f3c8b9 100644
--- a/src/tests/simulation/tests/integration_tests/sink/basic.rs
+++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs
@@ -74,32 +74,6 @@ impl Drop for TestWriter {
     }
 }

-struct TestSink {
-    row_counter: Arc<AtomicUsize>,
-    parallelism_counter: Arc<AtomicUsize>,
-}
-
-#[async_trait]
-impl Sink for TestSink {
-    type Coordinator = BoxCoordinator;
-    type Writer = BoxWriter<()>;
-
-    async fn validate(&self) -> risingwave_connector::sink::Result<()> {
-        Ok(())
-    }
-
-    async fn new_writer(
-        &self,
-        _writer_param: SinkWriterParam,
-    ) -> risingwave_connector::sink::Result<Self::Writer> {
-        self.parallelism_counter.fetch_add(1, Relaxed);
-        Ok(Box::new(TestWriter {
-            parallelism_counter: self.parallelism_counter.clone(),
-            row_counter: self.row_counter.clone(),
-        }))
-    }
-}
-
 fn build_stream_chunk(row_iter: impl Iterator<Item = (i32, String)>) -> StreamChunk {
     let mut builder = DataChunkBuilder::new(vec![DataType::Int32, DataType::Varchar], 100000);
     for (id, name) in row_iter {
@@ -142,11 +116,12 @@ async fn test_sink_basic() -> Result<()> {
     let _sink_guard = registry_build_sink({
         let row_counter = row_counter.clone();
         let parallelism_counter = parallelism_counter.clone();
-        move |_param| {
-            Ok(Box::new(TestSink {
+        move |_, _| {
+            parallelism_counter.fetch_add(1, Relaxed);
+            Box::new(TestWriter {
                 row_counter: row_counter.clone(),
                 parallelism_counter: parallelism_counter.clone(),
-            }))
+            })
         }
     });

@@ -239,11 +214,12 @@ async fn test_sink_decouple_basic() -> Result<()> {
     let _sink_guard = registry_build_sink({
         let row_counter = row_counter.clone();
         let parallelism_counter = parallelism_counter.clone();
-        move |_param| {
-            Ok(Box::new(TestSink {
+        move |_, _| {
+            parallelism_counter.fetch_add(1, Relaxed);
+            Box::new(TestWriter {
                 row_counter: row_counter.clone(),
                 parallelism_counter: parallelism_counter.clone(),
-            }))
+            })
         }
     });
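Note: with the registry storing a writer factory instead of a whole boxed sink, a test now registers its sink roughly as follows; `MyTestWriter` is a hypothetical stand-in for a `SinkWriter<CommitMetadata = ()>` implementation such as the `TestWriter` used in the simulation tests above:

    // Sketch only; MyTestWriter is assumed, not part of this change.
    let _guard = registry_build_sink(move |_param, _writer_param| {
        Box::new(MyTestWriter::default()) as BoxWriter<()>
    });
    // TestSink::new_writer invokes the registered closure once per writer;
    // dropping the guard unregisters the factory.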