diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 111ec6e2eacad..e5fc1ce0f5884 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -515,7 +515,7 @@ pub trait SourceLister: Sized { type Properties; async fn new(properties: Self::Properties) -> Result; - fn paginate(self) -> BoxTryStream>; + fn paginate(self) -> BoxTryStream; } #[cfg(test)] diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index 2d816e52ccc77..e5b29a02e1d58 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -56,13 +56,13 @@ impl FsSplit { } } -pub struct FsPage { +pub struct FsPageItem { pub name: String, pub size: usize, pub timestamp: Timestamp, } -impl FsPage { +impl FsPageItem { pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self { Self { name, @@ -71,3 +71,5 @@ impl FsPage { } } } + +pub type FsPage = Vec; diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs index fa5130406516f..8f2587384280b 100644 --- a/src/connector/src/source/filesystem/mod.rs +++ b/src/connector/src/source/filesystem/mod.rs @@ -16,7 +16,7 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR}; mod file_common; pub mod nd_streaming; -pub use file_common::{FsPage, FsSplit}; +pub use file_common::{FsPage, FsPageItem, FsSplit}; mod s3; pub mod s3_v2; pub const S3_V2_CONNECTOR: &str = "s3_v2"; diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs index ae466e72f6ea7..c4212cafdfacd 100644 --- a/src/connector/src/source/filesystem/s3_v2/lister.rs +++ b/src/connector/src/source/filesystem/s3_v2/lister.rs @@ -24,7 +24,7 @@ use risingwave_common::types::Timestamp; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::{default_conn_config, s3_client}; use crate::source::filesystem::file_common::{FsPage, FsSplit}; -use crate::source::filesystem::S3Properties; +use crate::source::filesystem::{S3Properties, FsPageItem}; use crate::source::{BoxTryStream, SourceLister}; /// Get the prefix from a glob @@ -69,9 +69,9 @@ pub struct S3SourceLister { } impl S3SourceLister { - #[try_stream(boxed, ok = Vec, error = RwError)] + #[try_stream(boxed, ok = FsPage, error = RwError)] async fn paginate_inner(self) { - 'round: loop { // start a new round + loop { // start a new round let mut next_continuation_token = None; 'truncated: loop { // loop to paginate let mut req = self @@ -102,7 +102,7 @@ impl S3SourceLister { .into_iter() .map(|obj| { let aws_ts = obj.last_modified().unwrap(); - FsPage::new( + FsPageItem::new( obj.key().unwrap().to_owned(), obj.size() as usize, Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()), @@ -146,7 +146,7 @@ impl SourceLister for S3SourceLister { }) } - fn paginate(self) -> BoxTryStream> { + fn paginate(self) -> BoxTryStream { self.paginate_inner() } } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 90c2259ef1003..a9469924add82 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -76,7 +76,7 @@ impl ConnectorSource { .collect::>>() } - pub async fn source_lister(&self) -> Result>> { + pub async fn source_lister(&self) -> Result> { let config = self.config.clone(); let lister = match config { ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?, diff --git a/src/stream/src/executor/source_v2/list_executor.rs b/src/stream/src/executor/source_v2/list_executor.rs index 9b73ee1a20157..eea7798634a1c 100644 --- a/src/stream/src/executor/source_v2/list_executor.rs +++ b/src/stream/src/executor/source_v2/list_executor.rs @@ -22,9 +22,7 @@ use risingwave_common::array::Op; use risingwave_common::catalog::Schema; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_connector::source::filesystem::FsPage; -use risingwave_connector::source::{ - BoxTryStream, SourceCtrlOpts, StreamChunkWithState -}; +use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts}; use risingwave_connector::ConnectorParams; use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; use risingwave_storage::StateStore; @@ -34,7 +32,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; -pub struct ListExecutor { +pub struct FsListExecutor { actor_ctx: ActorContextRef, identity: String, @@ -62,7 +60,7 @@ pub struct ListExecutor { connector_params: ConnectorParams, } -impl ListExecutor { +impl FsListExecutor { #[allow(clippy::too_many_arguments)] pub fn new( actor_ctx: ActorContextRef, @@ -78,7 +76,7 @@ impl ListExecutor { ) -> Self { Self { actor_ctx, - identity: format!("ListExecutor {:X}", executor_id), + identity: format!("FsListExecutor {:X}", executor_id), schema, pk_indices, stream_source_core, @@ -90,46 +88,38 @@ impl ListExecutor { } } - async fn build_fs_source_lister( + async fn build_chunked_paginate_stream( &self, source_desc: &SourceDesc, - ) -> StreamExecutorResult>> { - source_desc + ) -> StreamExecutorResult> { + let stream = source_desc .source .source_lister() .await - .map_err(StreamExecutorError::connector_error) + .map_err(StreamExecutorError::connector_error)?; + + Ok(stream.map(|item| { + item.map(Self::map_fs_page_to_chunk) + }).boxed()) } - async fn fetch_one_page_chunk( - &self, - paginate_stream: &mut BoxTryStream>, - ) -> StreamExecutorResult { - match paginate_stream.next().await { - Some(Ok(page)) => { - let rows = page - .into_iter() - .map(|split| { - ( - Op::Insert, - OwnedRow::new(vec![ - Some(ScalarImpl::Utf8(split.name.into_boxed_str())), - Some(ScalarImpl::Timestamp(split.timestamp)), - Some(ScalarImpl::Int64(split.size as i64)), - ]), - ) - }) - .collect::>(); - Ok(StreamChunk::from_rows( - &rows, - &[DataType::Varchar, DataType::Timestamp, DataType::Int64], - )) - }, - Some(Err(err)) => Err(StreamExecutorError::connector_error(err)), - None => unreachable!(), // paginate_stream never ends - } + fn map_fs_page_to_chunk(page: FsPage) -> StreamChunk { + let rows = page + .into_iter() + .map(|split| {( + Op::Insert, + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(split.name.into_boxed_str())), + Some(ScalarImpl::Timestamp(split.timestamp)), + Some(ScalarImpl::Int64(split.size as i64)), + ]), + )}) + .collect::>(); + StreamChunk::from_rows( + &rows, + &[DataType::Varchar, DataType::Timestamp, DataType::Int64], + ) } - #[try_stream(ok = Message, error = StreamExecutorError)] async fn into_stream(mut self) { @@ -154,18 +144,15 @@ impl ListExecutor { .build() .map_err(StreamExecutorError::connector_error)?; - // TODO: init state store epoch - // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); - // TODO: recover state - let mut paginate_stream = self.build_fs_source_lister(&source_desc).await?; + let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc).await?; let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); - let mut stream = StreamReaderWithPause::::new( + let mut stream = StreamReaderWithPause::::new( barrier_stream, - tokio_stream::pending().boxed(), + chunked_paginate_stream ); if barrier.is_pause_on_startup() { @@ -178,43 +165,34 @@ impl ListExecutor { match msg { Err(_) => (), Ok(msg) => match msg { + // Barrier arrives. Either::Left(msg) => match &msg { Message::Barrier(barrier) => { - let mut is_pause_resume = false; if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - Mutation::Pause => { - stream.pause_stream(); - is_pause_resume = true; - }, - Mutation::Resume => { - stream.resume_stream(); - is_pause_resume = true; - } + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), _ => (), } } - - if !is_pause_resume { - // TODO: persist some state here - let chunk = self.fetch_one_page_chunk(&mut paginate_stream).await?; - yield Message::Chunk(chunk); - } - - yield msg; // propagate the barrier + + // Propagate the barrier. + yield msg; } // Only barrier can be received. _ => unreachable!(), }, - // Right arm is always pending. - _ => unreachable!(), + // Chunked FsPage arrives. + Either::Right(chunk) => { + yield Message::Chunk(chunk); + } }, } } } } -impl Executor for ListExecutor { +impl Executor for FsListExecutor { fn execute(self: Box) -> BoxedMessageStream { self.into_stream().boxed() } @@ -296,7 +274,7 @@ mod tests { let system_params_manager = LocalSystemParamsManager::for_test(); - let executor = ListExecutor::new( + let executor = FsListExecutor::new( ActorContext::create(0), schema, pk_indices, diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 77bbcc53e69c5..ee8ab9eac02a8 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -16,7 +16,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::source::external::{ExternalTableType, SchemaTableName}; -use risingwave_connector::source::SourceCtrlOpts; +use risingwave_connector::source::{SourceCtrlOpts, S3_V2_CONNECTOR}; use risingwave_pb::stream_plan::SourceNode; use risingwave_source::source_desc::SourceDescBuilder; use risingwave_storage::panic_store::PanicStateStore; @@ -26,10 +26,12 @@ use super::*; use crate::executor::external::ExternalStorageTable; use crate::executor::source::StreamSourceCore; use crate::executor::source_executor::SourceExecutor; +use crate::executor::source_v2::list_executor::FsListExecutor; use crate::executor::state_table_handler::SourceStateTableHandler; use crate::executor::{CdcBackfillExecutor, FlowControlExecutor, FsSourceExecutor}; const FS_CONNECTORS: &[&str] = &["s3"]; +const FS_V2_CONNECTORS: &[&str] = &[S3_V2_CONNECTOR]; pub struct SourceExecutorBuilder; #[async_trait::async_trait] @@ -115,6 +117,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { .map(|c| c.to_ascii_lowercase()) .unwrap_or_default(); let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str()); + let is_fs_v2_connector = FS_V2_CONNECTORS.contains(&connector.as_str()); if is_fs_connector { FsSourceExecutor::new( @@ -129,6 +132,20 @@ impl ExecutorBuilder for SourceExecutorBuilder { source_ctrl_opts, )? .boxed() + } else if is_fs_v2_connector { + FsListExecutor::new( + params.actor_context.clone(), + schema.clone(), + params.pk_indices.clone(), + Some(stream_source_core), + params.executor_stats.clone(), + barrier_receiver, + system_params, + params.executor_id, + source_ctrl_opts.clone(), + params.env.connector_params(), + ) + .boxed() } else { let source_exec = SourceExecutor::new( params.actor_context.clone(),