From 2ac35d4324bcef2f106dfa1f07b901609251f152 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Dec 2023 23:21:07 +0800 Subject: [PATCH 1/4] fix --- .../src/source/filesystem/s3/source/reader.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 4d51dbc4d2b44..40f14420a314d 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -32,7 +32,7 @@ use tokio_util::io::ReaderStream; use crate::aws_utils::{default_conn_config, s3_client}; use crate::common::AwsAuthProps; -use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; +use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig}; use crate::source::base::{SplitMetaData, SplitReader}; use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::nd_streaming; @@ -209,6 +209,11 @@ impl S3FileReader { let actor_id = self.source_ctx.source_info.actor_id.to_string(); let source_id = self.source_ctx.source_info.source_id.to_string(); let source_ctx = self.source_ctx.clone(); + let use_nd_streaming = { + let _encoding_config = &self.parser_config.specific.encoding_config; + matches!(&EncodingProperties::Json, _encoding_config) + || matches!(&EncodingProperties::Csv, _encoding_config) + }; let split_id = split.id(); @@ -221,10 +226,7 @@ impl S3FileReader { let parser = ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if matches!( - parser, - ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) - ) { + let msg_stream = if use_nd_streaming { parser.into_stream(nd_streaming::split_stream(data_stream)) } else { parser.into_stream(data_stream) From bf650c414b7ee6ed321a730600b98941abb47831 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Dec 2023 23:30:55 +0800 Subject: [PATCH 2/4] fix --- src/connector/src/source/filesystem/nd_streaming.rs | 6 ++++++ .../filesystem/opendal_source/opendal_reader.rs | 6 ++---- .../src/source/filesystem/s3/source/reader.rs | 11 ++++------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/connector/src/source/filesystem/nd_streaming.rs b/src/connector/src/source/filesystem/nd_streaming.rs index 428514d2d6adf..bce96fc0976d7 100644 --- a/src/connector/src/source/filesystem/nd_streaming.rs +++ b/src/connector/src/source/filesystem/nd_streaming.rs @@ -18,8 +18,14 @@ use futures::io::Cursor; use futures::AsyncBufReadExt; use futures_async_stream::try_stream; +use crate::parser::EncodingProperties; use crate::source::{BoxSourceStream, SourceMessage}; +pub fn need_nd_streaming(encode_config: &EncodingProperties) -> bool { + matches!(encode_config, &EncodingProperties::Json(_)) + || matches!(encode_config, EncodingProperties::Csv(_)) +} + #[try_stream(boxed, ok = Vec, error = anyhow::Error)] /// This function splits a byte stream by the newline separator "(\r)\n" into a message stream. /// It can be difficult to split and compute offsets correctly when the bytes are received in diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index d2758ba9bb0ef..7efb13f409478 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -24,6 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader}; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; +use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, @@ -81,10 +82,7 @@ impl OpendalReader { let parser = ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if matches!( - parser, - ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) - ) { + let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { parser.into_stream(nd_streaming::split_stream(data_stream)) } else { parser.into_stream(data_stream) diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 40f14420a314d..6cf4387ae3e92 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -32,15 +32,17 @@ use tokio_util::io::ReaderStream; use crate::aws_utils::{default_conn_config, s3_client}; use crate::common::AwsAuthProps; -use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig}; +use crate::parser::{ByteStreamSourceParserImpl, ParserConfig}; use crate::source::base::{SplitMetaData, SplitReader}; use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::nd_streaming; +use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::s3::S3Properties; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SourceMeta, StreamChunkWithState, }; + const MAX_CHANNEL_BUFFER_SIZE: usize = 2048; const STREAM_READER_CAPACITY: usize = 4096; @@ -209,11 +211,6 @@ impl S3FileReader { let actor_id = self.source_ctx.source_info.actor_id.to_string(); let source_id = self.source_ctx.source_info.source_id.to_string(); let source_ctx = self.source_ctx.clone(); - let use_nd_streaming = { - let _encoding_config = &self.parser_config.specific.encoding_config; - matches!(&EncodingProperties::Json, _encoding_config) - || matches!(&EncodingProperties::Csv, _encoding_config) - }; let split_id = split.id(); @@ -226,7 +223,7 @@ impl S3FileReader { let parser = ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx).await?; - let msg_stream = if use_nd_streaming { + let msg_stream = if need_nd_streaming(&self.parser_config.specific.encoding_config) { parser.into_stream(nd_streaming::split_stream(data_stream)) } else { parser.into_stream(data_stream) From f2d690a9244833d0933164c7e92b1b5a69f6e17a Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Dec 2023 23:45:58 +0800 Subject: [PATCH 3/4] rerun Signed-off-by: tabVersion From 6e0bb0c67effaa8d010e34c3e794e50f83bd0ad7 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 26 Dec 2023 23:53:40 +0800 Subject: [PATCH 4/4] rerun Signed-off-by: tabVersion