diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index a30ad58ce07ce..f18bb18462749 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -12,39 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[macro_export] -macro_rules! impl_split_enumerator { - ($({ $variant_name:ident, $split_enumerator_name:ident} ),*) => { - impl SplitEnumeratorImpl { - - pub async fn create(properties: ConnectorProperties, context: SourceEnumeratorContextRef) -> Result { - match properties { - $( ConnectorProperties::$variant_name(props) => $split_enumerator_name::new(*props, context).await.map(Self::$variant_name), )* - other => Err(anyhow!( - "split enumerator type for config {:?} is not supported", - other - )), - } - } - - pub async fn list_splits(&mut self) -> Result> { - match self { - $( Self::$variant_name(inner) => inner - .list_splits() - .await - .map(|ss| { - ss.into_iter() - .map(SplitImpl::$variant_name) - .collect_vec() - }) - .map_err(|e| risingwave_common::error::ErrorCode::ConnectorError(e.into()).into()), - )* - } - } - } - } -} - #[macro_export] macro_rules! impl_split { ($({ $variant_name:ident, $connector_name:ident, $split:ty} ),*) => { @@ -143,36 +110,6 @@ macro_rules! impl_split { } } -#[macro_export] -macro_rules! impl_split_reader { - ($({ $variant_name:ident, $split_reader_name:ident} ),*) => { - impl SplitReaderImpl { - pub fn into_stream(self) -> BoxSourceWithStateStream { - match self { - $( Self::$variant_name(inner) => inner.into_stream(), )* } - } - - pub async fn create( - config: ConnectorProperties, - state: ConnectorState, - parser_config: ParserConfig, - source_ctx: SourceContextRef, - columns: Option>, - ) -> Result { - if state.is_none() { - return Ok(Self::Dummy(Box::new(DummySplitReader {}))); - } - let splits = state.unwrap(); - let connector = match config { - $( ConnectorProperties::$variant_name(props) => Self::$variant_name(Box::new($split_reader_name::new(*props, splits, parser_config, source_ctx, columns).await?)), )* - }; - - Ok(connector) - } - } - } -} - #[macro_export] macro_rules! impl_connector_properties { ($({ $variant_name:ident, $connector_name:ident } ),*) => { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 5a94b36a6b8e8..da11310d78d56 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -35,53 +35,51 @@ use super::filesystem::{FsSplit, S3Properties, S3_CONNECTOR}; use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; -use super::nats::enumerator::NatsSplitEnumerator; -use super::nats::source::NatsSplitReader; use super::nexmark::source::message::NexmarkMeta; use crate::parser::ParserConfig; use crate::source::cdc::{ - CdcProperties, CdcSplitReader, Citus, CitusDebeziumSplitEnumerator, DebeziumCdcSplit, - DebeziumSplitEnumerator, Mysql, MysqlDebeziumSplitEnumerator, Postgres, - PostgresDebeziumSplitEnumerator, CITUS_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, - POSTGRES_CDC_CONNECTOR, + CdcProperties, Citus, DebeziumCdcSplit, Mysql, Postgres, CITUS_CDC_CONNECTOR, + MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, }; -use crate::source::datagen::{ - DatagenProperties, DatagenSplit, DatagenSplitEnumerator, DatagenSplitReader, DATAGEN_CONNECTOR, -}; -use crate::source::dummy_connector::DummySplitReader; -use crate::source::filesystem::{S3FileReader, S3SplitEnumerator}; -use crate::source::google_pubsub::{ - PubsubProperties, PubsubSplit, PubsubSplitEnumerator, PubsubSplitReader, - GOOGLE_PUBSUB_CONNECTOR, -}; -use crate::source::kafka::enumerator::KafkaSplitEnumerator; -use crate::source::kafka::source::KafkaSplitReader; +pub(crate) use crate::source::common::CommonSplitReader; +use crate::source::datagen::{DatagenProperties, DatagenSplit, DATAGEN_CONNECTOR}; +use crate::source::google_pubsub::{PubsubProperties, PubsubSplit, GOOGLE_PUBSUB_CONNECTOR}; use crate::source::kafka::{KafkaProperties, KafkaSplit, KAFKA_CONNECTOR}; -use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator; -use crate::source::kinesis::source::reader::KinesisSplitReader; use crate::source::kinesis::split::KinesisSplit; use crate::source::kinesis::{KinesisProperties, KINESIS_CONNECTOR}; use crate::source::monitor::EnumeratorMetrics; -use crate::source::nats::split::NatsSplit; +use crate::source::nats::source::NatsSplit; use crate::source::nats::{NatsProperties, NATS_CONNECTOR}; -use crate::source::nexmark::source::reader::NexmarkSplitReader; -use crate::source::nexmark::{ - NexmarkProperties, NexmarkSplit, NexmarkSplitEnumerator, NEXMARK_CONNECTOR, -}; -use crate::source::pulsar::source::reader::PulsarSplitReader; -use crate::source::pulsar::{ - PulsarProperties, PulsarSplit, PulsarSplitEnumerator, PULSAR_CONNECTOR, -}; -use crate::{impl_connector_properties, impl_split, impl_split_enumerator, impl_split_reader}; +use crate::source::nexmark::{NexmarkProperties, NexmarkSplit, NEXMARK_CONNECTOR}; +use crate::source::pulsar::{PulsarProperties, PulsarSplit, PULSAR_CONNECTOR}; +use crate::{impl_connector_properties, impl_split}; const SPLIT_TYPE_FIELD: &str = "split_type"; const SPLIT_INFO_FIELD: &str = "split_info"; +pub trait SourceProperties: Clone { + const SOURCE_NAME: &'static str; + type Split: SplitMetaData + TryFrom + Into; + type SplitEnumerator: SplitEnumerator; + type SplitReader: SplitReader; +} + +pub async fn create_split_reader( + prop: P, + splits: Vec, + parser_config: ParserConfig, + source_ctx: SourceContextRef, + columns: Option>, +) -> Result { + let splits = splits.into_iter().map(P::Split::try_from).try_collect()?; + P::SplitReader::new(prop, splits, parser_config, source_ctx, columns).await +} + /// [`SplitEnumerator`] fetches the split metadata from the external source service. /// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate. #[async_trait] pub trait SplitEnumerator: Sized { - type Split: SplitMetaData + Send + Sync; + type Split: SplitMetaData + Send; type Properties; async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef) @@ -276,12 +274,13 @@ impl From for StreamChunkWithState { /// responsible for parsing, it is used to read messages from the outside and transform them into a /// stream of parsed [`StreamChunk`] #[async_trait] -pub trait SplitReader: Sized { +pub trait SplitReader: Sized + Send { type Properties; + type Split: SplitMetaData; async fn new( properties: Self::Properties, - state: Vec, + state: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, columns: Option>, @@ -303,7 +302,27 @@ pub enum ConnectorProperties { CitusCdc(Box>), GooglePubsub(Box), Nats(Box), - Dummy(Box<()>), +} + +#[macro_export] +macro_rules! dispatch_source_prop { + ($impl:expr, $source_prop:ident, $body:tt) => {{ + use $crate::source::base::ConnectorProperties; + + match $impl { + ConnectorProperties::Kafka($source_prop) => $body, + ConnectorProperties::Pulsar($source_prop) => $body, + ConnectorProperties::Kinesis($source_prop) => $body, + ConnectorProperties::Nexmark($source_prop) => $body, + ConnectorProperties::Datagen($source_prop) => $body, + ConnectorProperties::S3($source_prop) => $body, + ConnectorProperties::MysqlCdc($source_prop) => $body, + ConnectorProperties::PostgresCdc($source_prop) => $body, + ConnectorProperties::CitusCdc($source_prop) => $body, + ConnectorProperties::GooglePubsub($source_prop) => $body, + ConnectorProperties::Nats($source_prop) => $body, + } + }}; } impl ConnectorProperties { @@ -345,35 +364,6 @@ impl SplitImpl { } } -pub enum SplitReaderImpl { - S3(Box), - Dummy(Box), - Kinesis(Box), - Kafka(Box), - Nexmark(Box), - Pulsar(Box), - Datagen(Box), - MysqlCdc(Box>), - PostgresCdc(Box>), - CitusCdc(Box>), - GooglePubsub(Box), - Nats(Box), -} - -pub enum SplitEnumeratorImpl { - Kafka(KafkaSplitEnumerator), - Pulsar(PulsarSplitEnumerator), - Kinesis(KinesisSplitEnumerator), - Nexmark(NexmarkSplitEnumerator), - Datagen(DatagenSplitEnumerator), - MysqlCdc(MysqlDebeziumSplitEnumerator), - PostgresCdc(PostgresDebeziumSplitEnumerator), - CitusCdc(CitusDebeziumSplitEnumerator), - GooglePubsub(PubsubSplitEnumerator), - S3(S3SplitEnumerator), - Nats(NatsSplitEnumerator), -} - impl_connector_properties! { { Kafka, KAFKA_CONNECTOR }, { Pulsar, PULSAR_CONNECTOR }, @@ -385,20 +375,6 @@ impl_connector_properties! { { Nats, NATS_CONNECTOR } } -impl_split_enumerator! { - { Kafka, KafkaSplitEnumerator }, - { Pulsar, PulsarSplitEnumerator }, - { Kinesis, KinesisSplitEnumerator }, - { Nexmark, NexmarkSplitEnumerator }, - { Datagen, DatagenSplitEnumerator }, - { MysqlCdc, DebeziumSplitEnumerator }, - { PostgresCdc, DebeziumSplitEnumerator }, - { CitusCdc, DebeziumSplitEnumerator }, - { GooglePubsub, PubsubSplitEnumerator}, - { S3, S3SplitEnumerator }, - { Nats, NatsSplitEnumerator } -} - impl_split! { { Kafka, KAFKA_CONNECTOR, KafkaSplit }, { Pulsar, PULSAR_CONNECTOR, PulsarSplit }, @@ -413,21 +389,6 @@ impl_split! { { Nats, NATS_CONNECTOR, NatsSplit } } -impl_split_reader! { - { S3, S3FileReader }, - { Kafka, KafkaSplitReader }, - { Pulsar, PulsarSplitReader }, - { Kinesis, KinesisSplitReader }, - { Nexmark, NexmarkSplitReader }, - { Datagen, DatagenSplitReader }, - { MysqlCdc, CdcSplitReader}, - { PostgresCdc, CdcSplitReader}, - { CitusCdc, CdcSplitReader }, - { GooglePubsub, PubsubSplitReader }, - { Nats, NatsSplitReader }, - { Dummy, DummySplitReader } -} - pub type DataType = risingwave_common::types::DataType; #[derive(Clone, Debug)] @@ -491,7 +452,7 @@ pub trait SplitMetaData: Sized { /// [`ConnectorState`] maintains the consuming splits' info. In specific split readers, /// `ConnectorState` cannot be [`None`] and contains one(for mq split readers) or many(for fs /// split readers) [`SplitImpl`]. If no split is assigned to source executor, `ConnectorState` is -/// [`None`] and [`DummySplitReader`] is up instead of other split readers. +/// [`None`] and the created source stream will be a pending stream. pub type ConnectorState = Option>; #[cfg(test)] diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 86a8b16adec02..e7054060a6f2b 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -26,7 +26,7 @@ pub use source::*; pub use split::*; use crate::impl_cdc_source_type; -use crate::source::ConnectorProperties; +use crate::source::{ConnectorProperties, SourceProperties, SplitImpl}; pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc"; @@ -52,6 +52,18 @@ pub struct CdcProperties { pub _phantom: PhantomData, } +impl SourceProperties for CdcProperties +where + DebeziumCdcSplit: TryFrom + Into, + DebeziumSplitEnumerator: ListCdcSplits, +{ + type Split = DebeziumCdcSplit; + type SplitEnumerator = DebeziumSplitEnumerator; + type SplitReader = CdcSplitReader; + + const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME; +} + impl CdcProperties { pub fn get_source_type_pb(&self) -> SourceType { SourceType::from(T::source_type()) diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 6c76d838af128..f85367d32e5bf 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -30,10 +30,9 @@ use tokio::sync::mpsc; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit}; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData, - SplitReader, + into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + SplitId, SplitMetaData, SplitReader, }; pub struct CdcSplitReader { @@ -53,22 +52,20 @@ pub struct CdcSplitReader { const DEFAULT_CHANNEL_SIZE: usize = 16; #[async_trait] -impl SplitReader for CdcSplitReader -where - DebeziumCdcSplit: TryFrom, -{ +impl SplitReader for CdcSplitReader { type Properties = CdcProperties; + type Split = DebeziumCdcSplit; #[allow(clippy::unused_async)] async fn new( conn_props: CdcProperties, - splits: Vec, + splits: Vec>, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, ) -> Result { assert_eq!(splits.len(), 1); - let split = DebeziumCdcSplit::::try_from(splits.into_iter().next().unwrap())?; + let split = splits.into_iter().next().unwrap(); let split_id = split.id(); match T::source_type() { CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self { @@ -101,10 +98,7 @@ where } } -impl CommonSplitReader for CdcSplitReader -where - Self: SplitReader, -{ +impl CommonSplitReader for CdcSplitReader { #[try_stream(ok = Vec, error = anyhow::Error)] async fn into_data_stream(self) { // rewrite the hostname and port for the split diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs index 02f1cbde3de38..86ad60cc1b969 100644 --- a/src/connector/src/source/common.rs +++ b/src/connector/src/source/common.rs @@ -27,7 +27,7 @@ pub(crate) trait CommonSplitReader: SplitReader + 'static { #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] pub(crate) async fn into_chunk_stream( - reader: impl CommonSplitReader + Send, + reader: impl CommonSplitReader, parser_config: ParserConfig, source_ctx: SourceContextRef, ) { diff --git a/src/connector/src/source/datagen/mod.rs b/src/connector/src/source/datagen/mod.rs index c0d9717db5366..af2dd2c388e92 100644 --- a/src/connector/src/source/datagen/mod.rs +++ b/src/connector/src/source/datagen/mod.rs @@ -24,6 +24,8 @@ use serde_with::{serde_as, DisplayFromStr}; pub use source::*; pub use split::*; +use crate::source::SourceProperties; + pub const DATAGEN_CONNECTOR: &str = "datagen"; #[serde_as] @@ -55,6 +57,14 @@ pub struct DatagenProperties { fields: HashMap, } +impl SourceProperties for DatagenProperties { + type Split = DatagenSplit; + type SplitEnumerator = DatagenSplitEnumerator; + type SplitReader = DatagenSplitReader; + + const SOURCE_NAME: &'static str = DATAGEN_CONNECTOR; +} + fn default_rows_per_second() -> u64 { 10 } diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 30d1bfae10c4e..3840b0e1f5dbc 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -21,13 +21,12 @@ use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty}; use super::generator::DatagenEventGenerator; use crate::parser::{EncodingProperties, ParserConfig, ProtocolProperties}; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::data_gen_util::spawn_data_generation_stream; use crate::source::datagen::source::SEQUENCE_FIELD_KIND; use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc}; use crate::source::{ - BoxSourceWithStateStream, Column, DataType, SourceContextRef, SourceMessage, SplitId, - SplitImpl, SplitMetaData, SplitReader, + into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, DataType, + SourceContextRef, SourceMessage, SplitId, SplitMetaData, SplitReader, }; pub struct DatagenSplitReader { @@ -42,16 +41,16 @@ pub struct DatagenSplitReader { #[async_trait] impl SplitReader for DatagenSplitReader { type Properties = DatagenProperties; + type Split = DatagenSplit; #[allow(clippy::unused_async)] async fn new( properties: DatagenProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, columns: Option>, ) -> Result { - let mut assigned_split = DatagenSplit::default(); let mut events_so_far = u64::default(); tracing::debug!("Splits for datagen found! {:?}", splits); @@ -59,14 +58,12 @@ impl SplitReader for DatagenSplitReader { let split = splits.into_iter().next().unwrap(); // TODO: currently, assume there's only on split in one reader let split_id = split.id(); - if let SplitImpl::Datagen(n) = split { - if let Some(s) = n.start_offset { - // start_offset in `SplitImpl` indicates the latest successfully generated - // index, so here we use start_offset+1 - events_so_far = s + 1; - }; - assigned_split = n; - } + let assigned_split = split; + if let Some(s) = assigned_split.start_offset { + // start_offset in `SplitImpl` indicates the latest successfully generated + // index, so here we use start_offset+1 + events_so_far = s + 1; + }; let split_index = assigned_split.split_index as u64; let split_num = assigned_split.split_num as u64; @@ -180,7 +177,7 @@ impl CommonSplitReader for DatagenSplitReader { fn into_data_stream(self) -> impl Stream, anyhow::Error>> { // Will buffer at most 4 event chunks. const BUFFER_SIZE: usize = 4; - spawn_data_generation_stream(self.generator.into_msg_stream(), BUFFER_SIZE).boxed() + spawn_data_generation_stream(self.generator.into_msg_stream(), BUFFER_SIZE) } } @@ -346,11 +343,11 @@ mod tests { is_visible: true, }, ]; - let state = vec![SplitImpl::Datagen(DatagenSplit { + let state = vec![DatagenSplit { split_index: 0, split_num: 1, start_offset: None, - })]; + }]; let properties = DatagenProperties { split_num: None, rows_per_second: 10, @@ -424,11 +421,11 @@ mod tests { is_visible: true, }, ]; - let state = vec![SplitImpl::Datagen(DatagenSplit { + let state = vec![DatagenSplit { split_index: 0, split_num: 1, start_offset: None, - })]; + }]; let properties = DatagenProperties { split_num: None, rows_per_second: 10, @@ -454,11 +451,11 @@ mod tests { let v1 = stream.skip(1).next().await.unwrap()?; - let state = vec![SplitImpl::Datagen(DatagenSplit { + let state = vec![DatagenSplit { split_index: 0, split_num: 1, start_offset: Some(9), - })]; + }]; let mut stream = DatagenSplitReader::new( properties, state, diff --git a/src/connector/src/source/dummy_connector.rs b/src/connector/src/source/dummy_connector.rs deleted file mode 100644 index 3a5b8922fd29a..0000000000000 --- a/src/connector/src/source/dummy_connector.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use anyhow::Result; -use async_trait::async_trait; -use futures::StreamExt; - -use super::{SourceContextRef, SplitImpl, SplitReader}; -use crate::parser::ParserConfig; -use crate::source::{BoxSourceWithStateStream, Column}; - -/// [`DummySplitReader`] is a placeholder for source executor that is assigned no split. It will -/// wait forever when calling `next`. -#[derive(Clone, Debug)] -pub struct DummySplitReader; - -#[async_trait] -impl SplitReader for DummySplitReader { - type Properties = (); - - async fn new( - _properties: Self::Properties, - _state: Vec, - _parser_config: ParserConfig, - _source_ctx: SourceContextRef, - _columns: Option>, - ) -> Result { - Ok(Self {}) - } - - fn into_stream(self) -> BoxSourceWithStateStream { - futures::stream::pending().boxed() - } -} diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index 62f6bcd922a80..12701087309e0 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -19,6 +19,8 @@ use serde::Deserialize; pub use source::S3FileReader; use crate::aws_auth::AwsAuthProps; +use crate::source::filesystem::FsSplit; +use crate::source::SourceProperties; pub const S3_CONNECTOR: &str = "s3"; @@ -38,6 +40,14 @@ pub struct S3Properties { endpoint_url: Option, } +impl SourceProperties for S3Properties { + type Split = FsSplit; + type SplitEnumerator = S3SplitEnumerator; + type SplitReader = S3FileReader; + + const SOURCE_NAME: &'static str = S3_CONNECTOR; +} + impl From<&S3Properties> for AwsAuthProps { fn from(props: &S3Properties) -> Self { Self { diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index b20822139d181..736e4493d3f55 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -37,7 +37,7 @@ use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::nd_streaming; use crate::source::filesystem::s3::S3Properties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitImpl, + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, StreamChunkWithState, }; const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; @@ -164,10 +164,11 @@ impl S3FileReader { #[async_trait] impl SplitReader for S3FileReader { type Properties = S3Properties; + type Split = FsSplit; async fn new( props: S3Properties, - state: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, @@ -179,10 +180,6 @@ impl SplitReader for S3FileReader { let bucket_name = props.bucket_name; let s3_client = s3_client(&sdk_config, Some(default_conn_config())); - let splits = state - .into_iter() - .map(|split| split.into_fs().expect("not a fs split")) - .collect(); let s3_file_reader = S3FileReader { split_offset: HashMap::new(), bucket_name, @@ -272,8 +269,6 @@ mod tests { let splits = enumerator.list_splits().await.unwrap(); println!("splits {:?}", splits); - let splits = splits.into_iter().map(SplitImpl::S3).collect(); - let descs = vec![ SourceColumnDesc::simple("id", DataType::Int64, 1.into()), SourceColumnDesc::simple("name", DataType::Varchar, 2.into()), diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 0c93f672ccdb5..c4c2e5c716a13 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -23,6 +23,8 @@ use serde_with::{serde_as, DisplayFromStr}; pub use source::*; pub use split::*; +use crate::source::SourceProperties; + pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; #[serde_as] @@ -70,6 +72,14 @@ pub struct PubsubProperties { pub start_snapshot: Option, } +impl SourceProperties for PubsubProperties { + type Split = PubsubSplit; + type SplitEnumerator = PubsubSplitEnumerator; + type SplitReader = PubsubSplitReader; + + const SOURCE_NAME: &'static str = GOOGLE_PUBSUB_CONNECTOR; +} + impl PubsubProperties { /// `initialize_env` sets environment variables read by the `google-cloud-pubsub` crate pub(crate) fn initialize_env(&self) { diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index d4fa8a9ab5c98..18aeeecc050e4 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -23,11 +23,10 @@ use tonic::Code; use super::TaggedReceivedMessage; use crate::parser::ParserConfig; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; -use crate::source::google_pubsub::PubsubProperties; +use crate::source::google_pubsub::{PubsubProperties, PubsubSplit}; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl, - SplitMetaData, SplitReader, + into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + SourceMessage, SplitId, SplitMetaData, SplitReader, }; const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024; @@ -107,10 +106,11 @@ impl CommonSplitReader for PubsubSplitReader { #[async_trait] impl SplitReader for PubsubSplitReader { type Properties = PubsubProperties; + type Split = PubsubSplit; async fn new( properties: PubsubProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, @@ -119,12 +119,7 @@ impl SplitReader for PubsubSplitReader { splits.len() == 1, "the pubsub reader only supports a single split" ); - let split = splits - .into_iter() - .next() - .unwrap() - .into_google_pubsub() - .unwrap(); + let split = splits.into_iter().next().unwrap(); // Set environment variables consumed by `google_cloud_pubsub` properties.initialize_env(); diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 50fabadb41365..c74ae3ac6152f 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -27,6 +27,8 @@ pub use source::*; pub use split::*; use crate::common::KafkaCommon; +use crate::source::SourceProperties; + pub const KAFKA_CONNECTOR: &str = "kafka"; pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server"; pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers"; @@ -123,6 +125,14 @@ pub struct KafkaProperties { pub rdkafka_properties: RdKafkaPropertiesConsumer, } +impl SourceProperties for KafkaProperties { + type Split = KafkaSplit; + type SplitEnumerator = KafkaSplitEnumerator; + type SplitReader = KafkaSplitReader; + + const SOURCE_NAME: &'static str = KAFKA_CONNECTOR; +} + impl KafkaProperties { pub fn set_client(&self, c: &mut rdkafka::ClientConfig) { self.common.set_client(c); diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 65887bb825f92..f9f6a9472a1a5 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -28,13 +28,12 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::kafka::{ KafkaProperties, KafkaSplit, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL, }; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData, - SplitReader, + into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + SplitId, SplitMetaData, SplitReader, }; pub struct KafkaSplitReader { @@ -49,10 +48,11 @@ pub struct KafkaSplitReader { #[async_trait] impl SplitReader for KafkaSplitReader { type Properties = KafkaProperties; + type Split = KafkaSplit; async fn new( properties: KafkaProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, @@ -107,11 +107,6 @@ impl SplitReader for KafkaSplitReader { .await .map_err(|e| anyhow!("failed to create kafka consumer: {}", e))?; - let splits = splits - .into_iter() - .map(|split| split.into_kafka().unwrap()) - .collect::>(); - let mut tpl = TopicPartitionList::with_capacity(splits.len()); let mut offsets = HashMap::new(); diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index fc786f8f1b10d..6ad250fc93018 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -19,6 +19,10 @@ pub mod split; use serde::Deserialize; use crate::common::KinesisCommon; +use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator; +use crate::source::kinesis::source::reader::KinesisSplitReader; +use crate::source::kinesis::split::KinesisSplit; +use crate::source::SourceProperties; pub const KINESIS_CONNECTOR: &str = "kinesis"; @@ -36,3 +40,11 @@ pub struct KinesisProperties { #[serde(flatten)] pub common: KinesisCommon, } + +impl SourceProperties for KinesisProperties { + type Split = KinesisSplit; + type SplitEnumerator = KinesisSplitEnumerator; + type SplitReader = KinesisSplitReader; + + const SOURCE_NAME: &'static str = KINESIS_CONNECTOR; +} diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index f4aad6ad80587..fed48c5162a44 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -24,13 +24,12 @@ use futures_async_stream::try_stream; use tokio_retry; use crate::parser::ParserConfig; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::kinesis::source::message::KinesisMessage; -use crate::source::kinesis::split::KinesisOffset; +use crate::source::kinesis::split::{KinesisOffset, KinesisSplit}; use crate::source::kinesis::KinesisProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl, - SplitMetaData, SplitReader, + into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + SourceMessage, SplitId, SplitMetaData, SplitReader, }; #[derive(Debug, Clone)] @@ -51,17 +50,18 @@ pub struct KinesisSplitReader { #[async_trait] impl SplitReader for KinesisSplitReader { type Properties = KinesisProperties; + type Split = KinesisSplit; async fn new( properties: KinesisProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, ) -> Result { assert!(splits.len() == 1); - let split = splits.into_iter().next().unwrap().into_kinesis().unwrap(); + let split = splits.into_iter().next().unwrap(); let start_position = match &split.start_position { KinesisOffset::None => match &properties.scan_startup_mode { @@ -314,11 +314,11 @@ mod tests { }; let client = KinesisSplitReader::new( properties, - vec![SplitImpl::Kinesis(KinesisSplit { + vec![KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), start_position: KinesisOffset::Earliest, end_position: KinesisOffset::None, - })], + }], Default::default(), Default::default(), None, @@ -348,11 +348,11 @@ mod tests { let trim_horizen_reader = KinesisSplitReader::new( properties.clone(), - vec![SplitImpl::Kinesis(KinesisSplit { + vec![KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), start_position: KinesisOffset::Earliest, end_position: KinesisOffset::None, - })], + }], Default::default(), Default::default(), None, @@ -364,13 +364,13 @@ mod tests { let offset_reader = KinesisSplitReader::new( properties.clone(), - vec![SplitImpl::Kinesis(KinesisSplit { + vec![KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), start_position: KinesisOffset::SequenceNumber( "49629139817504901062972448413535783695568426186596941842".to_string(), ), end_position: KinesisOffset::None, - })], + }], Default::default(), Default::default(), None, diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index ed3979ba67ca2..20a9f706e60b5 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -16,7 +16,6 @@ pub mod base; pub mod cdc; pub mod data_gen_util; pub mod datagen; -pub mod dummy_connector; pub mod filesystem; pub mod google_pubsub; pub mod kafka; @@ -26,6 +25,7 @@ pub mod nats; pub mod nexmark; pub mod pulsar; pub use base::*; +pub(crate) use common::*; pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR; pub use kafka::KAFKA_CONNECTOR; pub use kinesis::KINESIS_CONNECTOR; diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 2aa9dc2de55f2..1d887e342f1f8 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -19,6 +19,10 @@ pub mod split; use serde::Deserialize; use crate::common::NatsCommon; +use crate::source::nats::enumerator::NatsSplitEnumerator; +use crate::source::nats::source::{NatsSplit, NatsSplitReader}; +use crate::source::SourceProperties; + pub const NATS_CONNECTOR: &str = "nats"; #[derive(Clone, Debug, Deserialize)] @@ -28,3 +32,11 @@ pub struct NatsProperties { } impl NatsProperties {} + +impl SourceProperties for NatsProperties { + type Split = NatsSplit; + type SplitEnumerator = NatsSplitEnumerator; + type SplitReader = NatsSplitReader; + + const SOURCE_NAME: &'static str = NATS_CONNECTOR; +} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index c0070a16c1392..d958b5a898495 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -23,7 +23,7 @@ use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::nats::split::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader, + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitReader, }; pub struct NatsSplitReader { @@ -36,20 +36,17 @@ pub struct NatsSplitReader { #[async_trait] impl SplitReader for NatsSplitReader { type Properties = NatsProperties; + type Split = NatsSplit; async fn new( properties: NatsProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); - let splits = splits - .into_iter() - .map(|split| split.into_nats().unwrap()) - .collect::>(); let consumer = properties .common .build_consumer(0, splits[0].start_sequence) diff --git a/src/connector/src/source/nexmark/mod.rs b/src/connector/src/source/nexmark/mod.rs index 679306cf96b22..e1f75ae1008e7 100644 --- a/src/connector/src/source/nexmark/mod.rs +++ b/src/connector/src/source/nexmark/mod.rs @@ -25,6 +25,9 @@ use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; pub use split::*; +use crate::source::nexmark::source::reader::NexmarkSplitReader; +use crate::source::SourceProperties; + pub const NEXMARK_CONNECTOR: &str = "nexmark"; const fn identity_i32() -> i32 { @@ -217,6 +220,14 @@ pub struct NexmarkPropertiesInner { pub threads: Option, } +impl SourceProperties for NexmarkProperties { + type Split = NexmarkSplit; + type SplitEnumerator = NexmarkSplitEnumerator; + type SplitReader = NexmarkSplitReader; + + const SOURCE_NAME: &'static str = NEXMARK_CONNECTOR; +} + fn default_event_num() -> u64 { u64::MAX } diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index 190a3f1cbb63d..a2ca20f1a1f0b 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -33,8 +33,8 @@ use crate::source::nexmark::source::combined_event::{ }; use crate::source::nexmark::{NexmarkProperties, NexmarkSplit}; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData, - SplitReader, StreamChunkWithState, + BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitMetaData, SplitReader, + StreamChunkWithState, }; #[derive(Debug)] @@ -55,11 +55,12 @@ pub struct NexmarkSplitReader { #[async_trait] impl SplitReader for NexmarkSplitReader { type Properties = NexmarkProperties; + type Split = NexmarkSplit; #[allow(clippy::unused_async)] async fn new( properties: NexmarkProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, @@ -67,7 +68,7 @@ impl SplitReader for NexmarkSplitReader { tracing::debug!("Splits for nexmark found! {:?}", splits); assert!(splits.len() == 1); // TODO: currently, assume there's only one split in one reader - let split = splits.into_iter().next().unwrap().into_nexmark().unwrap(); + let split = splits.into_iter().next().unwrap(); let split_id = split.id(); let split_index = split.split_index as u64; @@ -182,7 +183,7 @@ mod tests { use super::*; use crate::source::nexmark::{NexmarkPropertiesInner, NexmarkSplitEnumerator}; - use crate::source::{SourceEnumeratorContext, SplitEnumerator, SplitImpl}; + use crate::source::{SourceEnumeratorContext, SplitEnumerator}; #[tokio::test] async fn test_nexmark_split_reader() -> Result<()> { @@ -197,12 +198,7 @@ mod tests { let mut enumerator = NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into()) .await?; - let list_splits_resp: Vec = enumerator - .list_splits() - .await? - .into_iter() - .map(SplitImpl::Nexmark) - .collect(); + let list_splits_resp: Vec<_> = enumerator.list_splits().await?.into_iter().collect(); assert_eq!(list_splits_resp.len(), 2); diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 5d2bbfa332307..1dbcdf2e7bfb7 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -33,6 +33,8 @@ use url::Url; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::load_file_descriptor_from_s3; +use crate::source::pulsar::source::reader::PulsarSplitReader; +use crate::source::SourceProperties; pub const PULSAR_CONNECTOR: &str = "pulsar"; @@ -76,6 +78,14 @@ pub struct PulsarProperties { pub oauth: Option, } +impl SourceProperties for PulsarProperties { + type Split = PulsarSplit; + type SplitEnumerator = PulsarSplitEnumerator; + type SplitReader = PulsarSplitReader; + + const SOURCE_NAME: &'static str = PULSAR_CONNECTOR; +} + impl PulsarProperties { pub async fn build_pulsar_client(&self) -> Result> { let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor); diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 9c749c27d616a..db6ccfedd726b 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -22,15 +22,13 @@ use itertools::Itertools; use pulsar::consumer::InitialPosition; use pulsar::message::proto::MessageIdData; use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor}; -use risingwave_common::try_match_expand; use crate::parser::ParserConfig; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl, - SplitMetaData, SplitReader, + into_chunk_stream, BoxSourceWithStateStream, Column, CommonSplitReader, SourceContextRef, + SourceMessage, SplitId, SplitMetaData, SplitReader, }; pub struct PulsarSplitReader { @@ -88,16 +86,17 @@ fn parse_message_id(id: &str) -> Result { #[async_trait] impl SplitReader for PulsarSplitReader { type Properties = PulsarProperties; + type Split = PulsarSplit; async fn new( props: PulsarProperties, - splits: Vec, + splits: Vec, parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, ) -> Result { ensure!(splits.len() == 1, "only support single split"); - let split = try_match_expand!(splits.into_iter().next().unwrap(), SplitImpl::Pulsar)?; + let split = splits.into_iter().next().unwrap(); let pulsar = props.build_pulsar_client().await?; let topic = split.topic.to_string(); diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index ee40882c4cbf1..7fa512fcbb05a 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -27,8 +27,9 @@ use risingwave_common::catalog::TableDesc; use risingwave_common::error::RwError; use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; use risingwave_common::util::scan_range::ScanRange; +use risingwave_connector::source::kafka::KafkaSplitEnumerator; use risingwave_connector::source::{ - ConnectorProperties, SourceEnumeratorContext, SplitEnumeratorImpl, SplitImpl, + ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl, }; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto}; @@ -266,19 +267,17 @@ impl SourceScanInfo { unreachable!("Never call complete when SourceScanInfo is already complete") } }; - let mut enumerator = SplitEnumeratorImpl::create( - fetch_info.connector, - SourceEnumeratorContext::default().into(), - ) - .await?; - let kafka_enumerator = match enumerator { - SplitEnumeratorImpl::Kafka(ref mut kafka_enumerator) => kafka_enumerator, + let kafka_prop = match fetch_info.connector { + ConnectorProperties::Kafka(prop) => *prop, _ => { return Err(SchedulerError::Internal(anyhow!( "Unsupported to query directly from this source" ))) } }; + let mut kafka_enumerator = + KafkaSplitEnumerator::new(kafka_prop, SourceEnumeratorContext::default().into()) + .await?; let split_info = kafka_enumerator .list_splits_batch(fetch_info.timebound.0, fetch_info.timebound.1) .await? diff --git a/src/meta/src/rpc/service/cloud_service.rs b/src/meta/src/rpc/service/cloud_service.rs index 141abd9a0aab9..b1544f3076a46 100644 --- a/src/meta/src/rpc/service/cloud_service.rs +++ b/src/meta/src/rpc/service/cloud_service.rs @@ -17,9 +17,10 @@ use std::sync::LazyLock; use async_trait::async_trait; use regex::Regex; +use risingwave_connector::dispatch_source_prop; use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; use risingwave_connector::source::{ - ConnectorProperties, SourceEnumeratorContext, SplitEnumeratorImpl, + ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, }; use risingwave_pb::catalog::connection::Info::PrivateLinkService; use risingwave_pb::cloud_service::cloud_service_server::CloudService; @@ -139,36 +140,43 @@ impl CloudService for CloudServiceImpl { e.to_string(), )); }; - let enumerator = - SplitEnumeratorImpl::create(props.unwrap(), SourceEnumeratorContext::default().into()) - .await; - if let Err(e) = enumerator { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaInvalidProperties, - e.to_string(), - )); + + async fn new_enumerator( + props: P, + ) -> Result { + P::SplitEnumerator::new(props, SourceEnumeratorContext::default().into()).await } - if let Err(e) = enumerator.unwrap().list_splits().await { - let error_message = e.to_string(); - if error_message.contains("BrokerTransportFailure") { + + dispatch_source_prop!(props.unwrap(), props, { + let enumerator = new_enumerator(*props).await; + if let Err(e) = enumerator { return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaBrokerUnreachable, + ErrorType::KafkaInvalidProperties, e.to_string(), )); } - static TOPIC_NOT_FOUND: LazyLock = - LazyLock::new(|| Regex::new(r"topic .* not found").unwrap()); - if TOPIC_NOT_FOUND.is_match(error_message.as_str()) { + if let Err(e) = enumerator.unwrap().list_splits().await { + let error_message = e.to_string(); + if error_message.contains("BrokerTransportFailure") { + return Ok(new_rwc_validate_fail_response( + ErrorType::KafkaBrokerUnreachable, + e.to_string(), + )); + } + static TOPIC_NOT_FOUND: LazyLock = + LazyLock::new(|| Regex::new(r"topic .* not found").unwrap()); + if TOPIC_NOT_FOUND.is_match(error_message.as_str()) { + return Ok(new_rwc_validate_fail_response( + ErrorType::KafkaTopicNotFound, + e.to_string(), + )); + } return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaTopicNotFound, + ErrorType::KafkaOther, e.to_string(), )); } - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaOther, - e.to_string(), - )); - } + }); Ok(Response::new(RwCloudValidateSourceResponse { ok: true, error: None, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index a6b25d5fba4d7..cae431aa5b188 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -16,15 +16,17 @@ use std::borrow::BorrowMut; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet}; +use std::ops::Deref; use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_connector::dispatch_source_prop; use risingwave_connector::source::{ - ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitEnumeratorImpl, - SplitId, SplitImpl, SplitMetaData, + ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, + SplitEnumerator, SplitId, SplitImpl, SplitMetaData, }; use risingwave_pb::catalog::Source; use risingwave_pb::connector_service::PbTableSchema; @@ -61,23 +63,52 @@ struct SharedSplitMap { type SharedSplitMapRef = Arc>; -struct ConnectorSourceWorker { +struct ConnectorSourceWorker { source_id: SourceId, source_name: String, current_splits: SharedSplitMapRef, - enumerator: SplitEnumeratorImpl, + enumerator: P::SplitEnumerator, period: Duration, metrics: Arc, - connector_properties: ConnectorProperties, + connector_properties: P, connector_client: Option, fail_cnt: u32, } -impl ConnectorSourceWorker { - const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30); +fn extract_prop_from_source(source: &Source) -> MetaResult { + let mut properties = ConnectorProperties::extract(source.properties.clone())?; + if properties.is_cdc_connector() { + let pk_indices = source + .pk_column_ids + .iter() + .map(|&id| { + source + .columns + .iter() + .position(|col| col.column_desc.as_ref().unwrap().column_id == id) + .unwrap() as u32 + }) + .collect_vec(); + let table_schema = PbTableSchema { + columns: source + .columns + .iter() + .flat_map(|col| &col.column_desc) + .cloned() + .collect(), + pk_indices, + }; + properties.init_cdc_properties(table_schema); + } + Ok(properties) +} + +const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30); + +impl ConnectorSourceWorker

{ async fn refresh(&mut self) -> MetaResult<()> { - let enumerator = SplitEnumeratorImpl::create( + let enumerator = P::SplitEnumerator::new( self.connector_properties.clone(), Arc::new(SourceEnumeratorContext { metrics: self.metrics.source_enumerator_metrics.clone(), @@ -97,17 +128,13 @@ impl ConnectorSourceWorker { pub async fn create( connector_client: &Option, source: &Source, + connector_properties: P, period: Duration, splits: Arc>, metrics: Arc, ) -> MetaResult { - let mut properties = ConnectorProperties::extract(source.properties.clone())?; - if properties.is_cdc_connector() { - let table_schema = Self::extract_source_schema(source); - properties.init_cdc_properties(table_schema); - } - let enumerator = SplitEnumeratorImpl::create( - properties.clone(), + let enumerator = P::SplitEnumerator::new( + connector_properties.clone(), Arc::new(SourceEnumeratorContext { metrics: metrics.source_enumerator_metrics.clone(), info: SourceEnumeratorInfo { @@ -125,7 +152,7 @@ impl ConnectorSourceWorker { enumerator, period, metrics, - connector_properties: properties, + connector_properties, connector_client: connector_client.clone(), fail_cnt: 0, }) @@ -177,36 +204,12 @@ impl ConnectorSourceWorker { current_splits.splits.replace( splits .into_iter() - .map(|split| (split.id(), split)) + .map(|split| (split.id(), P::Split::into(split))) .collect(), ); Ok(()) } - - fn extract_source_schema(source: &Source) -> PbTableSchema { - let pk_indices = source - .pk_column_ids - .iter() - .map(|&id| { - source - .columns - .iter() - .position(|col| col.column_desc.as_ref().unwrap().column_id == id) - .unwrap() as u32 - }) - .collect_vec(); - - PbTableSchema { - columns: source - .columns - .iter() - .flat_map(|col| &col.column_desc) - .cloned() - .collect(), - pk_indices, - } - } } struct ConnectorSourceWorkerHandle { @@ -526,7 +529,7 @@ impl SourceManager { source, &mut managed_sources, metrics.clone(), - ) + )? } } @@ -712,7 +715,7 @@ impl SourceManager { source: Source, managed_sources: &mut HashMap, metrics: Arc, - ) { + ) -> MetaResult<()> { tracing::info!("spawning new watcher for source {}", source.id); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -721,32 +724,37 @@ impl SourceManager { let current_splits_ref = splits.clone(); let source_id = source.id; + let connector_properties = extract_prop_from_source(&source)?; + let handle = tokio::spawn(async move { let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut worker = loop { - ticker.tick().await; - - match ConnectorSourceWorker::create( - &connector_client, - &source, - ConnectorSourceWorker::DEFAULT_SOURCE_WORKER_TICK_INTERVAL, - splits.clone(), - metrics.clone(), - ) - .await - { - Ok(worker) => { - break worker; - } - Err(e) => { - tracing::warn!("failed to create source worker: {}", e); + dispatch_source_prop!(connector_properties, prop, { + let mut worker = loop { + ticker.tick().await; + + match ConnectorSourceWorker::create( + &connector_client, + &source, + prop.deref().clone(), + DEFAULT_SOURCE_WORKER_TICK_INTERVAL, + splits.clone(), + metrics.clone(), + ) + .await + { + Ok(worker) => { + break worker; + } + Err(e) => { + tracing::warn!("failed to create source worker: {}", e); + } } - } - }; + }; - worker.run(sync_call_rx).await + worker.run(sync_call_rx).await + }); }); managed_sources.insert( @@ -757,6 +765,7 @@ impl SourceManager { splits: current_splits_ref, }, ); + Ok(()) } async fn create_source_worker( @@ -767,38 +776,41 @@ impl SourceManager { metrics: Arc, ) -> MetaResult<()> { let current_splits_ref = Arc::new(Mutex::new(SharedSplitMap { splits: None })); - let mut worker = ConnectorSourceWorker::create( - &connector_client, - source, - ConnectorSourceWorker::DEFAULT_SOURCE_WORKER_TICK_INTERVAL, - current_splits_ref.clone(), - metrics, - ) - .await?; - - tracing::info!("spawning new watcher for source {}", source.id); - - // don't force tick in process of recovery. One source down should not lead to meta recovery - // failure. - if force_tick { - // if fail to fetch meta info, will refuse to create source - - // todo: make the timeout configurable, longer than `properties.sync.call.timeout` in - // kafka - tokio::time::timeout(Self::DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick()) - .await - .map_err(|_e| { - anyhow!( - "failed to fetch meta info for source {}, error: timeout {}", - source.id, - Self::DEFAULT_SOURCE_TICK_TIMEOUT.as_secs() - ) - })??; - } - + let connector_properties = extract_prop_from_source(source)?; let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = dispatch_source_prop!(connector_properties, prop, { + let mut worker = ConnectorSourceWorker::create( + &connector_client, + source, + *prop, + DEFAULT_SOURCE_WORKER_TICK_INTERVAL, + current_splits_ref.clone(), + metrics, + ) + .await?; + + tracing::info!("spawning new watcher for source {}", source.id); + + // don't force tick in process of recovery. One source down should not lead to meta + // recovery failure. + if force_tick { + // if fail to fetch meta info, will refuse to create source + + // todo: make the timeout configurable, longer than `properties.sync.call.timeout` + // in kafka + tokio::time::timeout(Self::DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick()) + .await + .map_err(|_e| { + anyhow!( + "failed to fetch meta info for source {}, error: timeout {}", + source.id, + Self::DEFAULT_SOURCE_TICK_TIMEOUT.as_secs() + ) + })??; + } - let handle = tokio::spawn(async move { worker.run(sync_call_rx).await }); + tokio::spawn(async move { worker.run(sync_call_rx).await }) + }); managed_sources.insert( source.id, diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index df0dc8e147a59..445bf0f6dbb90 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -16,16 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; use futures::future::try_join_all; +use futures::stream::pending; use futures::StreamExt; use itertools::Itertools; use risingwave_common::catalog::ColumnId; use risingwave_common::error::ErrorCode::ConnectorError; use risingwave_common::error::{internal_error, Result}; use risingwave_common::util::select_all; +use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::{ - BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState, SourceColumnDesc, - SourceContext, SplitReaderImpl, + create_split_reader, BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState, + SourceColumnDesc, SourceContext, SplitReader, }; #[derive(Clone, Debug)] @@ -74,10 +76,13 @@ impl ConnectorSource { pub async fn stream_reader( &self, - splits: ConnectorState, + state: ConnectorState, column_ids: Vec, source_ctx: Arc, ) -> Result { + let Some(splits) = state else { + return Ok(pending().boxed()); + }; let config = self.config.clone(); let columns = self.get_target_columns(column_ids)?; @@ -99,53 +104,46 @@ impl ConnectorSource { }, }; - let readers = if config.support_multiple_splits() { - tracing::debug!( - "spawning connector split reader for multiple splits {:?}", - splits - ); + let support_multiple_splits = config.support_multiple_splits(); - let reader = SplitReaderImpl::create( - config, - splits, - parser_config, - source_ctx, - data_gen_columns, - ) - .await?; + dispatch_source_prop!(config, prop, { + let readers = if support_multiple_splits { + tracing::debug!( + "spawning connector split reader for multiple splits {:?}", + splits + ); - vec![reader] - } else { - let to_reader_splits = match splits { - Some(vec_split_impl) => vec_split_impl - .into_iter() - .map(|split| Some(vec![split])) - .collect::>(), - None => vec![None], - }; + let reader = + create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns) + .await?; - try_join_all(to_reader_splits.into_iter().map(|state| { - tracing::debug!("spawning connector split reader for split {:?}", state); - let props = config.clone(); - let data_gen_columns = data_gen_columns.clone(); - let parser_config = parser_config.clone(); - // TODO: is this reader split across multiple threads...? Realistically, we want - // source_ctx to live in a single actor. - let source_ctx = source_ctx.clone(); - async move { - SplitReaderImpl::create( - props, - state, - parser_config, - source_ctx, - data_gen_columns, - ) - .await - } - })) - .await? - }; + vec![reader] + } else { + let to_reader_splits = splits.into_iter().map(|split| vec![split]); - Ok(select_all(readers.into_iter().map(|r| r.into_stream())).boxed()) + try_join_all(to_reader_splits.into_iter().map(|splits| { + tracing::debug!("spawning connector split reader for split {:?}", splits); + let props = prop.clone(); + let data_gen_columns = data_gen_columns.clone(); + let parser_config = parser_config.clone(); + // TODO: is this reader split across multiple threads...? Realistically, we want + // source_ctx to live in a single actor. + let source_ctx = source_ctx.clone(); + async move { + create_split_reader( + *props, + splits, + parser_config, + source_ctx, + data_gen_columns, + ) + .await + } + })) + .await? + }; + + Ok(select_all(readers.into_iter().map(|r| r.into_stream())).boxed()) + }) } } diff --git a/src/source/src/fs_connector_source.rs b/src/source/src/fs_connector_source.rs index daee19569db0f..974f0561e0f2d 100644 --- a/src/source/src/fs_connector_source.rs +++ b/src/source/src/fs_connector_source.rs @@ -15,12 +15,16 @@ use std::collections::HashMap; use std::sync::Arc; +use futures::stream::pending; +use futures::StreamExt; use risingwave_common::catalog::ColumnId; use risingwave_common::error::ErrorCode::ConnectorError; -use risingwave_common::error::{internal_error, Result, RwError}; +use risingwave_common::error::{internal_error, Result}; +use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::{ - ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitReaderImpl, + create_split_reader, BoxSourceWithStateStream, ConnectorProperties, ConnectorState, + SourceColumnDesc, SourceContext, SplitReader, }; #[derive(Clone, Debug)] @@ -77,7 +81,7 @@ impl FsConnectorSource { state: ConnectorState, column_ids: Vec, source_ctx: Arc, - ) -> Result { + ) -> Result { let config = self.config.clone(); let columns = self.get_target_columns(column_ids)?; @@ -87,8 +91,16 @@ impl FsConnectorSource { rw_columns: columns, }, }; - SplitReaderImpl::create(config, state, parser_config, source_ctx, None) - .await - .map_err(RwError::from) + let stream = match state { + None => pending().boxed(), + Some(splits) => { + dispatch_source_prop!(config, prop, { + create_split_reader(*prop, splits, parser_config, source_ctx, None) + .await? + .into_stream() + }) + } + }; + Ok(stream) } } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index aba89f817b09a..ae77adb427e23 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -112,12 +112,11 @@ impl FsSourceExecutor { None, self.actor_ctx.error_suppressor.clone(), ); - let stream_reader = source_desc + source_desc .source .stream_reader(state, column_ids, Arc::new(source_ctx)) .await - .map_err(StreamExecutorError::connector_error)?; - Ok(stream_reader.into_stream()) + .map_err(StreamExecutorError::connector_error) } async fn apply_split_change(