From d9890ef1f244b233e20b1c558edbda3e1b95377a Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 23 May 2024 23:31:24 +0800 Subject: [PATCH] refactor(connector): improve common split reader to chunk stream logic (#16892) Signed-off-by: xxchan --- src/connector/src/parser/avro/parser.rs | 8 ---- src/connector/src/parser/avro/util.rs | 10 ++-- src/connector/src/parser/mod.rs | 23 ++++++---- src/connector/src/parser/plain_parser.rs | 5 +- src/connector/src/parser/unified/avro.rs | 3 +- src/connector/src/source/base.rs | 2 +- src/connector/src/source/cdc/source/reader.rs | 8 ++-- src/connector/src/source/common.rs | 13 ++---- .../src/source/datagen/source/reader.rs | 8 ++-- .../opendal_source/opendal_reader.rs | 44 ++++++------------ .../src/source/filesystem/s3/source/reader.rs | 46 +++++++------------ .../src/source/google_pubsub/source/reader.rs | 8 ++-- .../src/source/kafka/source/reader.rs | 8 ++-- .../src/source/kinesis/source/reader.rs | 8 ++-- .../src/source/mqtt/source/reader.rs | 6 +-- .../src/source/nats/source/reader.rs | 6 +-- .../src/source/pulsar/source/reader.rs | 14 ++++-- 17 files changed, 94 insertions(+), 126 deletions(-) diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index c1950313b5e5c..540592ff383e8 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -159,14 +159,6 @@ impl AvroParserConfig { } } - pub fn extract_pks(&self) -> ConnectorResult<Vec<ColumnDesc>> { - avro_schema_to_column_descs( - self.key_schema - .as_deref() - .ok_or_else(|| anyhow::format_err!("key schema is required"))?, - ) - } - pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> { avro_schema_to_column_descs(self.schema.as_ref()) } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index a58ad884fd886..9f4992060decf 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -139,9 +139,7 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> { .variants() .iter() .find_or_first(|s| !matches!(s, Schema::Null)) - .ok_or_else(|| { - anyhow::format_err!("unsupported type in Avro: {:?}", union_schema) - })?; + .ok_or_else(|| anyhow::format_err!("unsupported Avro type: {:?}", union_schema))?; avro_type_mapping(nested_schema)? } @@ -151,10 +149,12 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> { { DataType::Decimal } else { - bail!("unsupported type in Avro: {:?}", schema); + bail!("unsupported Avro type: {:?}", schema); } } - _ => bail!("unsupported type in Avro: {:?}", schema), + Schema::Map(_) | Schema::Null | Schema::Fixed(_) | Schema::Uuid => { bail!("unsupported Avro type: {:?}", schema) } }; Ok(data_type) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 08e8bd3dd46ea..87b6b5c4a782c 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -544,8 +544,10 @@ pub enum ParserFormat { Plain, } -/// `ByteStreamSourceParser` is a new message parser, the parser should consume -/// the input data stream and return a stream of parsed msgs. +/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages. +/// It consumes bytes of one individual message and produces parsed records. +/// +/// It's used by [`ByteStreamSourceParserImpl::into_stream`]. `pub` is for benchmark only. pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { /// The column descriptors of the output chunk.
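+    /// (These columns are used to create the `SourceStreamChunkBuilder`s inside [`into_chunk_stream_inner`].)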
fn columns(&self) -> &[SourceColumnDesc]; @@ -627,7 +629,7 @@ impl<P: ByteStreamSourceParser> P { // The parser stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. - into_chunk_stream(self, data_stream) + into_chunk_stream_inner(self, data_stream) .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id)) } } @@ -639,7 +641,10 @@ const MAX_ROWS_FOR_TRANSACTION: usize = 4096; // TODO: when upsert is disabled, how to filter those empty payloads // Currently, an error is returned for non-upsert with an empty payload #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)] -async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream: BoxSourceStream) { +async fn into_chunk_stream_inner<P: ByteStreamSourceParser>( + mut parser: P, + data_stream: BoxSourceStream, +) { let columns = parser.columns().to_vec(); let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0); @@ -855,8 +860,10 @@ impl AccessBuilderImpl { } } +/// The entrypoint of parsing. It parses a [`SourceMessage`] stream (byte stream) into a [`StreamChunk`] stream. +/// Used by [`crate::source::into_chunk_stream`]. #[derive(Debug)] -pub enum ByteStreamSourceParserImpl { +pub(crate) enum ByteStreamSourceParserImpl { Csv(CsvParser), Json(JsonParser), Debezium(DebeziumParser), @@ -867,11 +874,9 @@ pub enum ByteStreamSourceParserImpl { CanalJson(CanalJsonParser), } -pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin; - impl ByteStreamSourceParserImpl { - /// Converts this `SourceMessage` stream into a stream of [`StreamChunk`]. - pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl { + /// Converts a [`SourceMessage`] stream into a [`StreamChunk`] stream. + pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin { #[auto_enum(futures03::Stream)] let stream = match self { Self::Csv(parser) => parser.into_stream(msg_stream), diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index c526366905938..db6fe44874d0b 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -30,6 +30,7 @@ use crate::parser::upsert_parser::get_key_column_name; use crate::parser::{BytesProperties, ParseResult, ParserFormat}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta}; +/// Parser for `FORMAT PLAIN`, i.e., an append-only source.
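+/// It emits insert-only rows; for CDC sources it may additionally emit transaction control messages (see the tests below).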
#[derive(Debug)] pub struct PlainParser { pub key_builder: Option<AccessBuilderImpl>, @@ -210,7 +211,7 @@ mod tests { let mut transactional = false; // for a non-transactional source, we expect to emit one chunk per message batch let message_stream = source_message_stream(transactional); - let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed()); + let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed()); let output: std::result::Result<Vec<StreamChunk>, _> = block_on(chunk_stream.collect::<Vec<_>>()) .into_iter() .collect(); @@ -247,7 +248,7 @@ mod tests { // for a transactional source, we expect to emit a single chunk for the whole transaction transactional = true; let message_stream = source_message_stream(transactional); - let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed()); let output: std::result::Result<Vec<StreamChunk>, _> = block_on(chunk_stream.collect::<Vec<_>>()) .into_iter() .collect(); diff --git a/src/connector/src/parser/unified/avro.rs b/src/connector/src/parser/unified/avro.rs index 257ca2926c342..837b879f92d29 100644 --- a/src/connector/src/parser/unified/avro.rs +++ b/src/connector/src/parser/unified/avro.rs @@ -478,8 +478,7 @@ mod tests { /// - string: String /// - Date (the number of days from the unix epoch, 1970-1-1 UTC) /// - Timestamp (the number of milliseconds from the unix epoch, 1970-1-1 00:00:00.000 UTC) - - pub(crate) fn from_avro_value( + fn from_avro_value( value: Value, value_schema: &Schema, shape: &DataType, diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index dc67695013958..9c77382a0143d 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -42,7 +42,6 @@ use super::nexmark::source::message::NexmarkMeta; use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR}; use crate::error::ConnectorResult as Result; use crate::parser::ParserConfig; -pub(crate) use crate::source::common::CommonSplitReader; use crate::source::filesystem::FsPageItem; use crate::source::monitor::EnumeratorMetrics; use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc}; @@ -314,6 +313,7 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result<SourceStruct> Ok(SourceStruct::new(format, encode)) } +/// Stream of [`SourceMessage`].
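+/// Each item is a batch of messages (`Vec<SourceMessage>`) read from the connector.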
pub type BoxSourceStream = BoxStream<'static, crate::error::ConnectorResult<Vec<SourceMessage>>>; pub trait ChunkSourceStream = diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index cf2b5c3d17e00..6c681adeece5d 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -33,8 +33,8 @@ use crate::parser::ParserConfig; use crate::source::base::SourceMessage; use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit}; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId, - SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData, + SplitReader, }; pub struct CdcSplitReader<T: CdcSourceTypeTrait> { @@ -190,11 +190,11 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } -impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> { +impl<T: CdcSourceTypeTrait> CdcSplitReader<T> { #[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)] async fn into_data_stream(self) { let source_type = T::source_type(); diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs index 8fcd1318401f2..abb80221e8fae 100644 --- a/src/connector/src/source/common.rs +++ b/src/connector/src/source/common.rs @@ -18,15 +18,13 @@ use risingwave_common::array::StreamChunk; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; -use crate::source::{SourceContextRef, SourceMessage, SplitReader}; - -pub(crate) trait CommonSplitReader: SplitReader + 'static { - fn into_data_stream(self) -> impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send; -} +use crate::source::{SourceContextRef, SourceMessage}; +/// Utility function to convert a [`SourceMessage`] stream (obtained from a specific connector's [`SplitReader`](super::SplitReader)) +/// into a [`StreamChunk`] stream (by invoking [`ByteStreamSourceParserImpl`](crate::parser::ByteStreamSourceParserImpl)).
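+///
+/// A connector typically wires this up in its `SplitReader::into_stream` implementation. A sketch of the pattern (as used by the readers changed below):
+///
+/// ```ignore
+/// fn into_stream(self) -> BoxChunkSourceStream {
+///     let parser_config = self.parser_config.clone();
+///     let source_context = self.source_ctx.clone();
+///     into_chunk_stream(self.into_data_stream(), parser_config, source_context)
+/// }
+/// ```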
#[try_stream(boxed, ok = StreamChunk, error = ConnectorError)] pub(crate) async fn into_chunk_stream( - reader: impl CommonSplitReader, + data_stream: impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send + 'static, parser_config: ParserConfig, source_ctx: SourceContextRef, ) { @@ -36,8 +34,7 @@ pub(crate) async fn into_chunk_stream( let source_name = source_ctx.source_name.to_string(); let metrics = source_ctx.metrics.clone(); - let data_stream = reader.into_data_stream(); - + // add metrics to the data stream let data_stream = data_stream .inspect_ok(move |data_batch| { let mut by_split_id = std::collections::HashMap::new(); diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 87f798d59f38b..b91333aa9443c 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -27,8 +27,8 @@ use crate::source::data_gen_util::spawn_data_generation_stream; use crate::source::datagen::source::SEQUENCE_FIELD_KIND; use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc}; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, DataType, SourceContextRef, - SourceMessage, SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, DataType, SourceContextRef, SourceMessage, + SplitId, SplitMetaData, SplitReader, }; pub struct DatagenSplitReader { @@ -177,13 +177,13 @@ impl SplitReader for DatagenSplitReader { _ => { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } } } -impl CommonSplitReader for DatagenSplitReader { +impl DatagenSplitReader { fn into_data_stream(self) -> impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> { // Will buffer at most 4 event chunks.
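+        // (Presumably the generation task can run at most `BUFFER_SIZE` batches ahead of the consumer, assuming `spawn_data_generation_stream` uses a bounded channel.)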
const BUFFER_SIZE: usize = 4; diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 0fb53ce8a0d1c..5594b3cfdfe23 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -25,12 +25,12 @@ use tokio_util::io::{ReaderStream, StreamReader}; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::error::ConnectorResult; -use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; +use crate::parser::ParserConfig; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; use crate::source::{ - BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, - SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, + SplitMetaData, SplitReader, }; const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; @@ -65,46 +65,30 @@ impl SplitReader for OpendalReader { } fn into_stream(self) -> BoxChunkSourceStream { - self.into_chunk_stream() + self.into_stream_inner() } } impl OpendalReader { #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)] - async fn into_chunk_stream(self) { + async fn into_stream_inner(self) { for split in self.splits { - let actor_id = self.source_ctx.actor_id.to_string(); - let fragment_id = self.source_ctx.fragment_id.to_string(); - let source_id = self.source_ctx.source_id.to_string(); - let source_name = self.source_ctx.source_name.to_string(); - let source_ctx = self.source_ctx.clone(); - - let split_id = split.id(); - let data_stream = Self::stream_read_object(self.connector.op.clone(), split, self.source_ctx.clone()); - - let parser = - ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { - parser.into_stream(nd_streaming::split_stream(data_stream)) + let data_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { + nd_streaming::split_stream(data_stream) } else { - parser.into_stream(data_stream) + data_stream }; + + let msg_stream = into_chunk_stream( + data_stream, + self.parser_config.clone(), + self.source_ctx.clone(), + ); #[for_await] for msg in msg_stream { let msg = msg?; - self.source_ctx - .metrics - .partition_input_count - .with_label_values(&[ - &actor_id, - &source_id, - &split_id, - &source_name, - &fragment_id, - ]) - .inc_by(msg.cardinality() as u64); yield msg; } } diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 129b708a61521..d456ce226e3a3 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -33,13 +33,15 @@ use tokio_util::io::ReaderStream; use crate::aws_utils::{default_conn_config, s3_client}; use crate::connector_common::AwsAuthProps; use crate::error::ConnectorResult; -use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; +use crate::parser::ParserConfig; use crate::source::base::{SplitMetaData, SplitReader}; use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::nd_streaming; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::s3::S3Properties; -use 
crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta}; +use crate::source::{ into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, }; const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; const STREAM_READER_CAPACITY: usize = 4096; @@ -204,50 +206,34 @@ impl SplitReader for S3FileReader { } fn into_stream(self) -> BoxChunkSourceStream { - self.into_chunk_stream() + self.into_stream_inner() } } impl S3FileReader { #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)] - async fn into_chunk_stream(self) { + async fn into_stream_inner(self) { for split in self.splits { - let actor_id = self.source_ctx.actor_id.to_string(); - let fragment_id = self.source_ctx.fragment_id.to_string(); - let source_id = self.source_ctx.source_id.to_string(); - let source_name = self.source_ctx.source_name.to_string(); - let source_ctx = self.source_ctx.clone(); - - let split_id = split.id(); - let data_stream = Self::stream_read_object( self.s3_client.clone(), self.bucket_name.clone(), split, self.source_ctx.clone(), ); - let parser = - ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { - parser.into_stream(nd_streaming::split_stream(data_stream)) + let data_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { + nd_streaming::split_stream(data_stream) } else { - parser.into_stream(data_stream) + data_stream }; + + let msg_stream = into_chunk_stream( data_stream, self.parser_config.clone(), self.source_ctx.clone(), ); #[for_await] for msg in msg_stream { let msg = msg?; - self.source_ctx - .metrics - .partition_input_count - .with_label_values(&[ - &actor_id, - &source_id, - &split_id, - &source_name, - &fragment_id, - ]) - .inc_by(msg.cardinality() as u64); yield msg; } } @@ -313,7 +299,7 @@ mod tests { .await .unwrap(); - let msg_stream = reader.into_chunk_stream(); + let msg_stream = reader.into_stream_inner(); #[for_await] for msg in msg_stream { println!("msg {:?}", msg); diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index c771fc8eb638c..abc3ecd7743e3 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -23,8 +23,8 @@ use crate::error::{ConnectorError, ConnectorResult as Result}; use crate::parser::ParserConfig; use crate::source::google_pubsub::{PubsubProperties, PubsubSplit}; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, - SourceMessage, SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId, + SplitMetaData, SplitReader, }; const PUBSUB_MAX_FETCH_MESSAGES: usize = 1024; @@ -37,7 +37,7 @@ pub struct PubsubSplitReader { source_ctx: SourceContextRef, } -impl CommonSplitReader for PubsubSplitReader { +impl PubsubSplitReader { #[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)] async fn into_data_stream(self) { loop { @@ -108,6 +108,6 @@ impl SplitReader for PubsubSplitReader { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } diff --git 
a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index e3c59611d1acc..c54b0ac27655a 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -34,8 +34,8 @@ use crate::source::kafka::{ KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL, }; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, SplitId, - SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData, + SplitReader, }; pub struct KafkaSplitReader { @@ -149,7 +149,7 @@ impl SplitReader for KafkaSplitReader { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } @@ -168,7 +168,7 @@ impl KafkaSplitReader { } } -impl CommonSplitReader for KafkaSplitReader { +impl KafkaSplitReader { #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)] async fn into_data_stream(self) { if self.offsets.values().all(|(start_offset, stop_offset)| { diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 4504324c5fb96..64873df7df029 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -31,8 +31,8 @@ use crate::source::kinesis::source::message::from_kinesis_record; use crate::source::kinesis::split::{KinesisOffset, KinesisSplit}; use crate::source::kinesis::KinesisProperties; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, - SourceMessage, SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId, + SplitMetaData, SplitReader, }; #[derive(Debug, Clone)] @@ -114,11 +114,11 @@ impl SplitReader for KinesisSplitReader { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } -impl CommonSplitReader for KinesisSplitReader { +impl KinesisSplitReader { #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)] async fn into_data_stream(mut self) { self.new_shard_iter().await?; diff --git a/src/connector/src/source/mqtt/source/reader.rs b/src/connector/src/source/mqtt/source/reader.rs index 50f90c816390c..82ca98ee10cf1 100644 --- a/src/connector/src/source/mqtt/source/reader.rs +++ b/src/connector/src/source/mqtt/source/reader.rs @@ -23,7 +23,7 @@ use super::message::MqttMessage; use super::MqttSplit; use crate::error::ConnectorResult as Result; use crate::parser::ParserConfig; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; +use crate::source::common::into_chunk_stream; use crate::source::mqtt::MqttProperties; use crate::source::{BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitReader}; pub struct MqttSplitReader { @@ -78,11 +78,11 @@ impl SplitReader for MqttSplitReader { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, 
source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } -impl CommonSplitReader for MqttSplitReader { +impl MqttSplitReader { #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)] async fn into_data_stream(self) { let mut eventloop = self.eventloop; diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 05954538b91cf..46a6cecd30bba 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -23,7 +23,7 @@ use super::message::NatsMessage; use super::{NatsOffset, NatsSplit}; use crate::error::ConnectorResult as Result; use crate::parser::ParserConfig; -use crate::source::common::{into_chunk_stream, CommonSplitReader}; +use crate::source::common::into_chunk_stream; use crate::source::nats::NatsProperties; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader, @@ -98,11 +98,11 @@ impl SplitReader for NatsSplitReader { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } -impl CommonSplitReader for NatsSplitReader { +impl NatsSplitReader { #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)] async fn into_data_stream(self) { let capacity = self.source_ctx.source_ctrl_opts.chunk_size; diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 967874b62c335..190a83f4ba772 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -38,8 +38,8 @@ use crate::parser::ParserConfig; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; use crate::source::{ - into_chunk_stream, BoxChunkSourceStream, Column, CommonSplitReader, SourceContextRef, - SourceMessage, SplitId, SplitMetaData, SplitReader, + into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SplitId, + SplitMetaData, SplitReader, }; pub enum PulsarSplitReader { @@ -89,7 +89,11 @@ impl SplitReader for PulsarSplitReader { Self::Broker(reader) => { let (parser_config, source_context) = (reader.parser_config.clone(), reader.source_ctx.clone()); - Box::pin(into_chunk_stream(reader, parser_config, source_context)) + Box::pin(into_chunk_stream( + reader.into_data_stream(), + parser_config, + source_context, + )) } Self::Iceberg(reader) => Box::pin(reader.into_stream()), } @@ -227,11 +231,11 @@ impl SplitReader for PulsarBrokerReader { fn into_stream(self) -> BoxChunkSourceStream { let parser_config = self.parser_config.clone(); let source_context = self.source_ctx.clone(); - into_chunk_stream(self, parser_config, source_context) + into_chunk_stream(self.into_data_stream(), parser_config, source_context) } } -impl CommonSplitReader for PulsarBrokerReader { +impl PulsarBrokerReader { #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)] async fn into_data_stream(self) { let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
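+        // (Presumably incoming Pulsar messages are batched into `Vec<SourceMessage>`s of at most `max_chunk_size` before being yielded, mirroring the other readers above.)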