From c7cdfb57a6f684440655556908747b766ea3442a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 6 Dec 2024 15:38:01 +0800 Subject: [PATCH] minor Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 30 +++++++++++++++++------------- src/connector/src/source/base.rs | 2 +- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index d2aa5502ae53b..a247678a15748 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -49,7 +49,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceMessage, SourceMeta, + SourceContextRef, SourceCtrlOpts, SourceMessage, SourceMeta, }; mod access_builder; @@ -221,29 +221,32 @@ async fn ensure_max_chunk_size(stream: BoxSourceStream, max_chunk_size: usize) { #[easy_ext::ext(SourceParserIntoStreamExt)] impl P { - /// Parse a data stream of one source split into a stream of [`StreamChunk`]. + /// Parse a stream of vectors of [`SourceMessage`] into a stream of [`StreamChunk`]. /// /// # Arguments - /// - `data_stream`: A data stream of one source split. - /// To be able to split multiple messages from mq, so it is not a pure byte stream + /// + /// - `msg_stream`: A stream of vectors of [`SourceMessage`]. /// /// # Returns /// - /// A [`ChunkSourceStream`] which is a stream of parsed messages. - pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream { + /// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks + /// are guaranteed to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless + /// there's a large transaction and `source_ctrl_opts.split_txn` is false. + pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream { let actor_id = self.source_ctx().actor_id; let source_id = self.source_ctx().source_id.table_id(); // TODO(): remove this later // Ensure chunk size is smaller than rate limit - let data_stream = Box::pin(ensure_max_chunk_size( - data_stream, + let msg_stream = Box::pin(ensure_max_chunk_size( + msg_stream, self.source_ctx().source_ctrl_opts.chunk_size, )); - // The parser stream will be long-lived. We use `instrument_with` here to create + // The stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. - into_chunk_stream_inner(self, data_stream) + let source_ctrl_opts = self.source_ctx().source_ctrl_opts; + into_chunk_stream_inner(self, msg_stream, source_ctrl_opts) .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id)) } } @@ -257,7 +260,8 @@ const MAX_TRANSACTION_SIZE: usize = 4096; #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)] async fn into_chunk_stream_inner( mut parser: P, - data_stream: BoxSourceStream, + msg_stream: BoxSourceStream, + source_ctrl_ops: SourceCtrlOpts, ) { let columns = parser.columns().to_vec(); @@ -271,7 +275,7 @@ async fn into_chunk_stream_inner( let mut direct_cdc_event_lag_latency_metrics = HashMap::new(); #[for_await] - for batch in data_stream { + for batch in msg_stream { // It's possible that the split is not active, which means the next batch may arrive // very lately, so we should prefer emitting all records in current batch before the end // of each iteration, instead of merging them with the next batch. An exception is when @@ -490,7 +494,7 @@ pub enum ByteStreamSourceParserImpl { } impl ByteStreamSourceParserImpl { - /// Converts [`SourceMessage`] stream into [`StreamChunk`] stream. + /// Converts [`SourceMessage`] vec stream into [`StreamChunk`] stream. pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin { #[auto_enum(futures03::Stream)] let stream = match self { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 71d690524b2b0..48fade3fbddd7 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -141,7 +141,7 @@ pub type SourceEnumeratorContextRef = Arc; /// The max size of a chunk yielded by source stream. pub const MAX_CHUNK_SIZE: usize = 1024; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct SourceCtrlOpts { /// The max size of a chunk yielded by source stream. pub chunk_size: usize,