diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 3651b3504c3f6..42f610455c134 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -150,7 +150,7 @@ impl SourceExecutor { )); let stream = self .source - .into_stream(Some(vec![self.split]), self.column_ids, source_ctx) + .to_stream(Some(vec![self.split]), self.column_ids, source_ctx) .await?; #[for_await] diff --git a/src/connector/src/source/reader/fs_reader.rs b/src/connector/src/source/reader/fs_reader.rs index 716cb3e11a543..95106ebbd2eee 100644 --- a/src/connector/src/source/reader/fs_reader.rs +++ b/src/connector/src/source/reader/fs_reader.rs @@ -77,7 +77,7 @@ impl FsSourceReader { .collect::>>() } - pub async fn into_stream( + pub async fn to_stream( &self, state: ConnectorState, column_ids: Vec, diff --git a/src/connector/src/source/reader/mod.rs b/src/connector/src/source/reader/mod.rs index 7381afe1df84c..fc6800a0f1da7 100644 --- a/src/connector/src/source/reader/mod.rs +++ b/src/connector/src/source/reader/mod.rs @@ -14,4 +14,5 @@ pub mod desc; pub mod fs_reader; +#[expect(clippy::module_inception)] pub mod reader; diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index f340087d5848d..5f9259058f869 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -102,7 +102,7 @@ impl SourceReader { } } - pub async fn into_stream( + pub async fn to_stream( &self, state: ConnectorState, column_ids: Vec, diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index a9aff438b51c9..98b6c3c87c32f 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -162,7 +162,7 @@ impl FsFetchExecutor { ) -> StreamExecutorResult { source_desc .source - .into_stream(batch, column_ids, Arc::new(source_ctx)) + .to_stream(batch, column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error) } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index ca5e2e0d8dd8f..e0129f34a1aae 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -110,7 +110,7 @@ impl FsSourceExecutor { ); source_desc .source - .into_stream(state, column_ids, Arc::new(source_ctx)) + .to_stream(state, column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error) } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 66903f7ecb3e7..661983e4bea38 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -111,7 +111,7 @@ impl SourceExecutor { ); source_desc .source - .into_stream(state, column_ids, Arc::new(source_ctx)) + .to_stream(state, column_ids, Arc::new(source_ctx)) .await .map_err(StreamExecutorError::connector_error) }