diff --git a/src/connector/src/source/filesystem/nd_streaming.rs b/src/connector/src/source/filesystem/nd_streaming.rs index 428514d2d6adf..bce96fc0976d7 100644 --- a/src/connector/src/source/filesystem/nd_streaming.rs +++ b/src/connector/src/source/filesystem/nd_streaming.rs @@ -18,8 +18,14 @@ use futures::io::Cursor; use futures::AsyncBufReadExt; use futures_async_stream::try_stream; +use crate::parser::EncodingProperties; use crate::source::{BoxSourceStream, SourceMessage}; +pub fn need_nd_streaming(encode_config: &EncodingProperties) -> bool { + matches!(encode_config, &EncodingProperties::Json(_)) + || matches!(encode_config, EncodingProperties::Csv(_)) +} + #[try_stream(boxed, ok = Vec, error = anyhow::Error)] /// This function splits a byte stream by the newline separator "(\r)\n" into a message stream. /// It can be difficult to split and compute offsets correctly when the bytes are received in diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index d2758ba9bb0ef..7efb13f409478 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -24,6 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader}; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; +use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, @@ -81,10 +82,7 @@ impl OpendalReader { let parser = ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if matches!( - parser, - ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) - ) { + let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { parser.into_stream(nd_streaming::split_stream(data_stream)) } else { parser.into_stream(data_stream) diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 4d51dbc4d2b44..6cf4387ae3e92 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -36,11 +36,13 @@ use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; use crate::source::base::{SplitMetaData, SplitReader}; use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::nd_streaming; +use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::s3::S3Properties; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, StreamChunkWithState, }; + const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; const STREAM_READER_CAPACITY: usize = 4096; @@ -221,10 +223,7 @@ impl S3FileReader { let parser = ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if matches!( - parser, - ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) - ) { + let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { parser.into_stream(nd_streaming::split_stream(data_stream)) } else { parser.into_stream(data_stream)