From c82268b7c2e98af98e72dc50a4892f55d21e4565 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Sep 2023 11:35:12 +0800 Subject: [PATCH 1/6] implement list executor and S3SourceLister --- src/connector/src/source/base.rs | 11 +- .../src/source/filesystem/file_common.rs | 18 +- src/connector/src/source/filesystem/mod.rs | 3 +- .../src/source/filesystem/s3_v2/lister.rs | 138 +++++++ .../src/source/filesystem/s3_v2/mod.rs | 15 + src/source/src/connector_source.rs | 16 +- src/stream/src/executor/mod.rs | 1 + .../src/executor/source_v2/list_executor.rs | 342 ++++++++++++++++++ src/stream/src/executor/source_v2/mod.rs | 15 + 9 files changed, 554 insertions(+), 5 deletions(-) create mode 100644 src/connector/src/source/filesystem/s3_v2/lister.rs create mode 100644 src/connector/src/source/filesystem/s3_v2/mod.rs create mode 100644 src/stream/src/executor/source_v2/list_executor.rs create mode 100644 src/stream/src/executor/source_v2/mod.rs diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index d02645385b81f..5205dea540946 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -33,7 +33,7 @@ use risingwave_rpc_client::ConnectorClient; use serde::de::DeserializeOwned; use super::datagen::DatagenMeta; -use super::filesystem::FsSplit; +use super::filesystem::{FsPage, FsSplit}; use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; @@ -499,6 +499,15 @@ pub trait SplitMetaData: Sized { /// [`None`] and the created source stream will be a pending stream. pub type ConnectorState = Option>; +#[async_trait] +pub trait SourceLister: Sized { + type Split: SplitMetaData + Send; + type Properties; + + async fn new(properties: Self::Properties) -> Result; + fn paginate(self) -> BoxTryStream>; +} + #[cfg(test)] mod tests { use maplit::*; diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index d4328289b547f..2d816e52ccc77 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
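The `SourceLister` trait added to base.rs above separates file discovery from the read path: `new` authenticates against the backing store once, and `paginate` consumes the lister into a stream of pages. A minimal driver sketch, assuming `BoxTryStream<T>` is a boxed `futures` stream of `Result<T, RwError>` and that `RwError` converts into `anyhow::Error` (the function name `drain_pages` is illustrative, not part of the patch):

    use anyhow::Result;
    use futures::StreamExt;

    async fn drain_pages<L: SourceLister>(props: L::Properties) -> Result<()> {
        let lister = L::new(props).await?;
        let mut pages = lister.paginate();
        // For S3 this stream never terminates: once one listing round is
        // exhausted, `paginate_inner` simply starts the next round.
        while let Some(page) = pages.next().await {
            for file in page? {
                println!("{} ({} bytes)", file.name, file.size);
            }
        }
        Ok(())
    }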
use anyhow::anyhow; -use risingwave_common::types::JsonbVal; +use risingwave_common::types::{JsonbVal, Timestamp}; use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; @@ -55,3 +55,19 @@ impl FsSplit { } } } + +pub struct FsPage { + pub name: String, + pub size: usize, + pub timestamp: Timestamp, +} + +impl FsPage { + pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self { + Self { + name, + size, + timestamp, + } + } +} diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs index 729fb376ecc6e..3d40763caf18b 100644 --- a/src/connector/src/source/filesystem/mod.rs +++ b/src/connector/src/source/filesystem/mod.rs @@ -16,5 +16,6 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR}; mod file_common; pub mod nd_streaming; -pub use file_common::FsSplit; +pub use file_common::{FsPage, FsSplit}; mod s3; +pub mod s3_v2; diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs new file mode 100644 index 0000000000000..2b8789d3db4ea --- /dev/null +++ b/src/connector/src/source/filesystem/s3_v2/lister.rs @@ -0,0 +1,138 @@ +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use aws_sdk_s3::error::DisplayErrorContext; +use aws_sdk_s3::Client; +use futures_async_stream::try_stream; +use itertools::Itertools; +use risingwave_common::error::RwError; +use risingwave_common::types::Timestamp; + +use crate::aws_auth::AwsAuthProps; +use crate::aws_utils::{default_conn_config, s3_client}; +use crate::source::filesystem::file_common::{FsPage, FsSplit}; +use crate::source::filesystem::S3Properties; +use crate::source::{BoxTryStream, SourceLister}; + +/// Get the prefix from a glob +fn get_prefix(glob: &str) -> String { + let mut escaped = false; + let mut escaped_filter = false; + glob.chars() + .take_while(|c| match (c, &escaped) { + ('*', false) => false, + ('[', false) => false, + ('{', false) => false, + ('\\', false) => { + escaped = true; + true + } + (_, false) => true, + (_, true) => { + escaped = false; + true + } + }) + .filter(|c| match (c, &escaped_filter) { + (_, true) => { + escaped_filter = false; + true + } + ('\\', false) => { + escaped_filter = true; + false + } + (_, _) => true, + }) + .collect() +} + +pub struct S3SourceLister { + client: Client, + // prefix is used to reduce the number of objects to be listed + prefix: Option, + matcher: Option, + bucket_name: String, +} + +impl S3SourceLister { + #[try_stream(boxed, ok = Vec, error = RwError)] + async fn paginate_inner(self) { + loop { // start a new round + let mut next_continuation_token = None; + loop { // loop to paginate + let mut req = self + .client + .list_objects_v2() + .bucket(&self.bucket_name) + .set_prefix(self.prefix.clone()); + if let Some(continuation_token) = next_continuation_token.take() { + req = req.continuation_token(continuation_token); + } + let mut res = req + .send() + .await + .map_err(|e| anyhow!(DisplayErrorContext(e)))?; + + yield res + .contents + .take() + .unwrap_or_default() + .iter() + .filter(|obj| obj.key().is_some()) + .filter(|obj| { + self.matcher + .as_ref() + .map(|m| m.matches(obj.key().unwrap())) + .unwrap_or(true) + }) + .into_iter() + .map(|obj| { + let aws_ts = obj.last_modified().unwrap(); + FsPage::new( + obj.key().unwrap().to_owned(), + obj.size() as usize, + Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()), + ) + }) + .collect_vec(); + + if res.is_truncated() { + 
next_continuation_token = Some(res.next_continuation_token.unwrap()) + } else { + break; + } + } + } + } +} + +#[async_trait] +impl SourceLister for S3SourceLister { + type Properties = S3Properties; + type Split = FsSplit; + + async fn new(properties: Self::Properties) -> Result { + let config = AwsAuthProps::from(&properties); + let sdk_config = config.build_config().await?; + let s3_client = s3_client(&sdk_config, Some(default_conn_config())); + let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() { + let prefix = get_prefix(pattern); + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(prefix), Some(matcher)) + } else { + (None, None) + }; + + Ok(Self { + bucket_name: properties.bucket_name, + matcher, + prefix, + client: s3_client, + }) + } + + fn paginate(self) -> BoxTryStream> { + self.paginate_inner() + } +} diff --git a/src/connector/src/source/filesystem/s3_v2/mod.rs b/src/connector/src/source/filesystem/s3_v2/mod.rs new file mode 100644 index 0000000000000..6fab862daca1e --- /dev/null +++ b/src/connector/src/source/filesystem/s3_v2/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
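A note on `get_prefix` above: `ListObjectsV2` has no server-side glob support, so the lister pushes the longest literal prefix of the glob down to S3 and applies the full `glob::Pattern` client-side; the tighter the prefix, the fewer keys each listing round has to page through. Its contract, pinned down as a hypothetical test (not part of this patch):

    #[test]
    fn glob_prefix_examples() {
        // Extraction stops at the first unescaped wildcard...
        assert_eq!(get_prefix("logs/2023-*/*.json"), "logs/2023-");
        // ...a glob without wildcards is its own prefix...
        assert_eq!(get_prefix("a/b/c.csv"), "a/b/c.csv");
        // ...and an escaped '*' stays as a literal, with the backslash dropped.
        assert_eq!(get_prefix(r"a/\*b/*.csv"), "a/*b/");
    }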
+ +pub mod lister; diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 445bf0f6dbb90..90c2259ef1003 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -25,9 +25,11 @@ use risingwave_common::error::{internal_error, Result}; use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; +use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister; +use risingwave_connector::source::filesystem::FsPage; use risingwave_connector::source::{ - create_split_reader, BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState, - SourceColumnDesc, SourceContext, SplitReader, + create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties, + ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SplitReader, }; #[derive(Clone, Debug)] @@ -74,6 +76,16 @@ impl ConnectorSource { .collect::>>() } + pub async fn source_lister(&self) -> Result>> { + let config = self.config.clone(); + let lister = match config { + ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?, + _ => unreachable!(), + }; + + Ok(lister.paginate()) + } + pub async fn stream_reader( &self, state: ConnectorState, diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 8fa7a5d818cc4..6fcd928e2c523 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -87,6 +87,7 @@ mod sink; mod sort; mod sort_buffer; pub mod source; +pub mod source_v2; mod stateless_simple_agg; mod stream_reader; pub mod subtask; diff --git a/src/stream/src/executor/source_v2/list_executor.rs b/src/stream/src/executor/source_v2/list_executor.rs new file mode 100644 index 0000000000000..9b73ee1a20157 --- /dev/null +++ b/src/stream/src/executor/source_v2/list_executor.rs @@ -0,0 +1,342 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
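`ConnectorSource::source_lister` above dispatches on the concrete property type and hits `unreachable!()` for every connector other than S3, which is safe only because a later patch in this series routes nothing but `s3_v2` plans to it. A hedged call-site sketch, assuming the source was built from S3 properties (`peek_first_page` is illustrative; `timestamp` is printed via `Debug`):

    use futures::StreamExt;

    async fn peek_first_page(source: &ConnectorSource) -> anyhow::Result<()> {
        let mut pages = source.source_lister().await?;
        if let Some(page) = pages.next().await {
            for file in page? {
                println!("{}: {} bytes, modified {:?}", file.name, file.size, file.timestamp);
            }
        }
        Ok(())
    }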
+ +use std::sync::Arc; + +use anyhow::anyhow; +use either::Either; +use futures::StreamExt; +use futures_async_stream::try_stream; +use risingwave_common::array::Op; +use risingwave_common::catalog::Schema; +use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_connector::source::filesystem::FsPage; +use risingwave_connector::source::{ + BoxTryStream, SourceCtrlOpts, StreamChunkWithState +}; +use risingwave_connector::ConnectorParams; +use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::UnboundedReceiver; +use crate::executor::error::StreamExecutorError; +use crate::executor::monitor::StreamingMetrics; +use crate::executor::stream_reader::StreamReaderWithPause; +use crate::executor::*; + +pub struct ListExecutor { + actor_ctx: ActorContextRef, + + identity: String, + + schema: Schema, + + pk_indices: PkIndices, + + /// Streaming source for external + stream_source_core: Option>, + + /// Metrics for monitor. + metrics: Arc, + + /// Receiver of barrier channel. + barrier_receiver: Option>, + + /// System parameter reader to read barrier interval + system_params: SystemParamsReaderRef, + + // control options for connector level + source_ctrl_opts: SourceCtrlOpts, + + // config for the connector node + connector_params: ConnectorParams, +} + +impl ListExecutor { + #[allow(clippy::too_many_arguments)] + pub fn new( + actor_ctx: ActorContextRef, + schema: Schema, + pk_indices: PkIndices, + stream_source_core: Option>, + metrics: Arc, + barrier_receiver: UnboundedReceiver, + system_params: SystemParamsReaderRef, + executor_id: u64, + source_ctrl_opts: SourceCtrlOpts, + connector_params: ConnectorParams, + ) -> Self { + Self { + actor_ctx, + identity: format!("ListExecutor {:X}", executor_id), + schema, + pk_indices, + stream_source_core, + metrics, + barrier_receiver: Some(barrier_receiver), + system_params, + source_ctrl_opts, + connector_params, + } + } + + async fn build_fs_source_lister( + &self, + source_desc: &SourceDesc, + ) -> StreamExecutorResult>> { + source_desc + .source + .source_lister() + .await + .map_err(StreamExecutorError::connector_error) + } + + async fn fetch_one_page_chunk( + &self, + paginate_stream: &mut BoxTryStream>, + ) -> StreamExecutorResult { + match paginate_stream.next().await { + Some(Ok(page)) => { + let rows = page + .into_iter() + .map(|split| { + ( + Op::Insert, + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(split.name.into_boxed_str())), + Some(ScalarImpl::Timestamp(split.timestamp)), + Some(ScalarImpl::Int64(split.size as i64)), + ]), + ) + }) + .collect::>(); + Ok(StreamChunk::from_rows( + &rows, + &[DataType::Varchar, DataType::Timestamp, DataType::Int64], + )) + }, + Some(Err(err)) => Err(StreamExecutorError::connector_error(err)), + None => unreachable!(), // paginate_stream never ends + } + } + + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(mut self) { + let mut barrier_receiver = self.barrier_receiver.take().unwrap(); + let barrier = barrier_receiver + .recv() + .instrument_await("source_recv_first_barrier") + .await + .ok_or_else(|| { + anyhow!( + "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}", + self.actor_ctx.id, + self.stream_source_core.as_ref().unwrap().source_id + ) + })?; + + let mut core = self.stream_source_core.unwrap(); + + // Build source description from the builder. 
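    // (A note on the output: `fetch_one_page_chunk` above flattens one page
    // of listed objects into a single insert-only chunk with the columns
    // (filename: Varchar, last_modified: Timestamp, size: Int64), matching
    // the schema declared in the test at the bottom of this file. In this
    // first version a page is pulled lazily, once per barrier that carries
    // no pause/resume mutation.)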
+ let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap(); + let source_desc = source_desc_builder + .build() + .map_err(StreamExecutorError::connector_error)?; + + // TODO: init state store epoch + + // Return the ownership of `stream_source_core` to the source executor. + self.stream_source_core = Some(core); + + // TODO: recover state + let mut paginate_stream = self.build_fs_source_lister(&source_desc).await?; + + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut stream = StreamReaderWithPause::::new( + barrier_stream, + tokio_stream::pending().boxed(), + ); + + if barrier.is_pause_on_startup() { + stream.pause_stream(); + } + + yield Message::Barrier(barrier); + + while let Some(msg) = stream.next().await { + match msg { + Err(_) => (), + Ok(msg) => match msg { + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + let mut is_pause_resume = false; + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => { + stream.pause_stream(); + is_pause_resume = true; + }, + Mutation::Resume => { + stream.resume_stream(); + is_pause_resume = true; + } + _ => (), + } + } + + if !is_pause_resume { + // TODO: persist some state here + let chunk = self.fetch_one_page_chunk(&mut paginate_stream).await?; + yield Message::Chunk(chunk); + } + + yield msg; // propagate the barrier + } + // Only barrier can be received. + _ => unreachable!(), + }, + // Right arm is always pending. + _ => unreachable!(), + }, + } + } + } +} + +impl Executor for ListExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.into_stream().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use maplit::{convert_args, hashmap}; + use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; + use risingwave_common::system_param::local_manager::LocalSystemParamsManager; + use risingwave_common::types::DataType; + use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::plan_common::PbRowFormatType; + use risingwave_source::connector_test_utils::create_source_desc_builder; + use risingwave_storage::memory::MemoryStateStore; + use tokio::sync::mpsc::unbounded_channel; + + use super::*; + use crate::executor::ActorContext; + + const MOCK_SOURCE_NAME: &str = "mock_source"; + + #[tokio::test] + async fn test_fs_list_executor() { + let table_id = TableId::default(); + let schema = Schema { + fields: vec![ + Field::with_name(DataType::Varchar, "filename"), + Field::with_name(DataType::Timestamp, "timestamp"), + Field::with_name(DataType::Int64, "size"), + ], + }; + let row_id_index = None; + let pk_indices = vec![0]; + let source_info = StreamSourceInfo { + row_format: PbRowFormatType::Native as i32, + ..Default::default() + }; + let (barrier_tx, barrier_rx) = unbounded_channel::(); + let column_ids = vec![0].into_iter().map(ColumnId::from).collect(); + + let properties: HashMap = convert_args!(hashmap!( + "connector" => "s3", + "s3.region_name" => "us-east-1", + "s3.endpoint_url" => "http://[::1]:9090", + "s3.bucket_name" => "test", + "s3.credentials.access" => "any", + "s3.credentials.secret" => "any", + )); + let source_desc_builder = + create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]); + let split_state_store = SourceStateTableHandler::from_table_catalog( + 
&default_source_internal_table(0x2333), + MemoryStateStore::new(), + ) + .await; + let core = StreamSourceCore:: { + source_id: table_id, + column_ids, + source_desc_builder: Some(source_desc_builder), + stream_source_splits: HashMap::new(), + split_state_store, + state_cache: HashMap::new(), + source_name: MOCK_SOURCE_NAME.to_string(), + }; + + let system_params_manager = LocalSystemParamsManager::for_test(); + + let executor = ListExecutor::new( + ActorContext::create(0), + schema, + pk_indices, + Some(core), + Arc::new(StreamingMetrics::unused()), + barrier_rx, + system_params_manager.get_params(), + 1, + SourceCtrlOpts::default(), + ConnectorParams::default(), + ); + let mut executor = Box::new(executor).execute(); + + let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { + adds: HashMap::new(), + added_actors: HashSet::new(), + splits: hashmap! { + ActorId::default() => vec![], + }, + pause: false, + }); + barrier_tx.send(init_barrier).unwrap(); + + // Consume init barrier. + executor.next().await.unwrap().unwrap(); + + // Consume the second barrier. + let barrier = Barrier::new_test_barrier(2); + barrier_tx.send(barrier).unwrap(); + let msg = executor.next().await.unwrap().unwrap(); // page chunk + executor.next().await.unwrap().unwrap(); // barrier + + println!("barrier 2: {:#?}", msg); + + // Consume the third barrier. + let barrier = Barrier::new_test_barrier(3); + barrier_tx.send(barrier).unwrap(); + let msg = executor.next().await.unwrap().unwrap(); // page chunk + executor.next().await.unwrap().unwrap(); // barrier + + println!("barrier 3: {:#?}", msg); + } +} diff --git a/src/stream/src/executor/source_v2/mod.rs b/src/stream/src/executor/source_v2/mod.rs new file mode 100644 index 0000000000000..a6cf4823bdd2f --- /dev/null +++ b/src/stream/src/executor/source_v2/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod list_executor; From 7461d2099b22e2d7127b1bc7c6433443b599e559 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Sep 2023 11:41:19 +0800 Subject: [PATCH 2/6] add license header for lister.rs --- .../src/source/filesystem/s3_v2/lister.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs index 2b8789d3db4ea..6a14cadbea9dd 100644 --- a/src/connector/src/source/filesystem/s3_v2/lister.rs +++ b/src/connector/src/source/filesystem/s3_v2/lister.rs @@ -1,3 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use aws_sdk_s3::error::DisplayErrorContext; From 2a7a6e1d21cbd2c1df6c7593e99d115e5c31cf95 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Sep 2023 17:02:33 +0800 Subject: [PATCH 3/6] add loop label Co-authored-by: Bohan Zhang --- src/connector/src/source/filesystem/s3_v2/lister.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs index 6a14cadbea9dd..ae466e72f6ea7 100644 --- a/src/connector/src/source/filesystem/s3_v2/lister.rs +++ b/src/connector/src/source/filesystem/s3_v2/lister.rs @@ -71,9 +71,9 @@ pub struct S3SourceLister { impl S3SourceLister { #[try_stream(boxed, ok = Vec, error = RwError)] async fn paginate_inner(self) { - loop { // start a new round + 'round: loop { // start a new round let mut next_continuation_token = None; - loop { // loop to paginate + 'truncated: loop { // loop to paginate let mut req = self .client .list_objects_v2() @@ -113,7 +113,7 @@ impl S3SourceLister { if res.is_truncated() { next_continuation_token = Some(res.next_continuation_token.unwrap()) } else { - break; + break 'truncated; } } } From ae20d0b209d68e2f082f08d499b290fe7411fc58 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Sep 2023 19:59:21 +0800 Subject: [PATCH 4/6] refactor list executor --- src/connector/src/source/base.rs | 2 +- .../src/source/filesystem/file_common.rs | 6 +- src/connector/src/source/filesystem/mod.rs | 2 +- .../src/source/filesystem/s3_v2/lister.rs | 10 +- src/source/src/connector_source.rs | 2 +- .../src/executor/source_v2/list_executor.rs | 108 +++++++----------- .../src/from_proto/source/trad_source.rs | 19 ++- 7 files changed, 73 insertions(+), 76 deletions(-) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 111ec6e2eacad..e5fc1ce0f5884 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -515,7 +515,7 @@ pub trait SourceLister: Sized { type Properties; async fn new(properties: Self::Properties) -> Result; - fn paginate(self) -> BoxTryStream>; + fn paginate(self) -> BoxTryStream; } #[cfg(test)] diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index 2d816e52ccc77..e5b29a02e1d58 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -56,13 +56,13 @@ impl FsSplit { } } -pub struct FsPage { +pub struct FsPageItem { pub name: String, pub size: usize, pub timestamp: Timestamp, } -impl FsPage { +impl FsPageItem { pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self { Self { name, @@ -71,3 +71,5 @@ impl FsPage { } } } + +pub type FsPage = Vec; diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs index fa5130406516f..8f2587384280b 100644 --- a/src/connector/src/source/filesystem/mod.rs +++ 
b/src/connector/src/source/filesystem/mod.rs @@ -16,7 +16,7 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR}; mod file_common; pub mod nd_streaming; -pub use file_common::{FsPage, FsSplit}; +pub use file_common::{FsPage, FsPageItem, FsSplit}; mod s3; pub mod s3_v2; pub const S3_V2_CONNECTOR: &str = "s3_v2"; diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs index ae466e72f6ea7..c4212cafdfacd 100644 --- a/src/connector/src/source/filesystem/s3_v2/lister.rs +++ b/src/connector/src/source/filesystem/s3_v2/lister.rs @@ -24,7 +24,7 @@ use risingwave_common::types::Timestamp; use crate::aws_auth::AwsAuthProps; use crate::aws_utils::{default_conn_config, s3_client}; use crate::source::filesystem::file_common::{FsPage, FsSplit}; -use crate::source::filesystem::S3Properties; +use crate::source::filesystem::{S3Properties, FsPageItem}; use crate::source::{BoxTryStream, SourceLister}; /// Get the prefix from a glob @@ -69,9 +69,9 @@ pub struct S3SourceLister { } impl S3SourceLister { - #[try_stream(boxed, ok = Vec, error = RwError)] + #[try_stream(boxed, ok = FsPage, error = RwError)] async fn paginate_inner(self) { - 'round: loop { // start a new round + loop { // start a new round let mut next_continuation_token = None; 'truncated: loop { // loop to paginate let mut req = self @@ -102,7 +102,7 @@ impl S3SourceLister { .into_iter() .map(|obj| { let aws_ts = obj.last_modified().unwrap(); - FsPage::new( + FsPageItem::new( obj.key().unwrap().to_owned(), obj.size() as usize, Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()), @@ -146,7 +146,7 @@ impl SourceLister for S3SourceLister { }) } - fn paginate(self) -> BoxTryStream> { + fn paginate(self) -> BoxTryStream { self.paginate_inner() } } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 90c2259ef1003..a9469924add82 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -76,7 +76,7 @@ impl ConnectorSource { .collect::>>() } - pub async fn source_lister(&self) -> Result>> { + pub async fn source_lister(&self) -> Result> { let config = self.config.clone(); let lister = match config { ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?, diff --git a/src/stream/src/executor/source_v2/list_executor.rs b/src/stream/src/executor/source_v2/list_executor.rs index 9b73ee1a20157..eea7798634a1c 100644 --- a/src/stream/src/executor/source_v2/list_executor.rs +++ b/src/stream/src/executor/source_v2/list_executor.rs @@ -22,9 +22,7 @@ use risingwave_common::array::Op; use risingwave_common::catalog::Schema; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_connector::source::filesystem::FsPage; -use risingwave_connector::source::{ - BoxTryStream, SourceCtrlOpts, StreamChunkWithState -}; +use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts}; use risingwave_connector::ConnectorParams; use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; use risingwave_storage::StateStore; @@ -34,7 +32,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; -pub struct ListExecutor { +pub struct FsListExecutor { actor_ctx: ActorContextRef, identity: String, @@ -62,7 +60,7 @@ pub struct ListExecutor { connector_params: ConnectorParams, } -impl ListExecutor { +impl FsListExecutor { #[allow(clippy::too_many_arguments)] pub fn new( 
actor_ctx: ActorContextRef, @@ -78,7 +76,7 @@ impl ListExecutor { ) -> Self { Self { actor_ctx, - identity: format!("ListExecutor {:X}", executor_id), + identity: format!("FsListExecutor {:X}", executor_id), schema, pk_indices, stream_source_core, @@ -90,46 +88,38 @@ impl ListExecutor { } } - async fn build_fs_source_lister( + async fn build_chunked_paginate_stream( &self, source_desc: &SourceDesc, - ) -> StreamExecutorResult>> { - source_desc + ) -> StreamExecutorResult> { + let stream = source_desc .source .source_lister() .await - .map_err(StreamExecutorError::connector_error) + .map_err(StreamExecutorError::connector_error)?; + + Ok(stream.map(|item| { + item.map(Self::map_fs_page_to_chunk) + }).boxed()) } - async fn fetch_one_page_chunk( - &self, - paginate_stream: &mut BoxTryStream>, - ) -> StreamExecutorResult { - match paginate_stream.next().await { - Some(Ok(page)) => { - let rows = page - .into_iter() - .map(|split| { - ( - Op::Insert, - OwnedRow::new(vec![ - Some(ScalarImpl::Utf8(split.name.into_boxed_str())), - Some(ScalarImpl::Timestamp(split.timestamp)), - Some(ScalarImpl::Int64(split.size as i64)), - ]), - ) - }) - .collect::>(); - Ok(StreamChunk::from_rows( - &rows, - &[DataType::Varchar, DataType::Timestamp, DataType::Int64], - )) - }, - Some(Err(err)) => Err(StreamExecutorError::connector_error(err)), - None => unreachable!(), // paginate_stream never ends - } + fn map_fs_page_to_chunk(page: FsPage) -> StreamChunk { + let rows = page + .into_iter() + .map(|split| {( + Op::Insert, + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(split.name.into_boxed_str())), + Some(ScalarImpl::Timestamp(split.timestamp)), + Some(ScalarImpl::Int64(split.size as i64)), + ]), + )}) + .collect::>(); + StreamChunk::from_rows( + &rows, + &[DataType::Varchar, DataType::Timestamp, DataType::Int64], + ) } - #[try_stream(ok = Message, error = StreamExecutorError)] async fn into_stream(mut self) { @@ -154,18 +144,15 @@ impl ListExecutor { .build() .map_err(StreamExecutorError::connector_error)?; - // TODO: init state store epoch - // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); - // TODO: recover state - let mut paginate_stream = self.build_fs_source_lister(&source_desc).await?; + let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc).await?; let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); - let mut stream = StreamReaderWithPause::::new( + let mut stream = StreamReaderWithPause::::new( barrier_stream, - tokio_stream::pending().boxed(), + chunked_paginate_stream ); if barrier.is_pause_on_startup() { @@ -178,43 +165,34 @@ impl ListExecutor { match msg { Err(_) => (), Ok(msg) => match msg { + // Barrier arrives. Either::Left(msg) => match &msg { Message::Barrier(barrier) => { - let mut is_pause_resume = false; if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - Mutation::Pause => { - stream.pause_stream(); - is_pause_resume = true; - }, - Mutation::Resume => { - stream.resume_stream(); - is_pause_resume = true; - } + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), _ => (), } } - - if !is_pause_resume { - // TODO: persist some state here - let chunk = self.fetch_one_page_chunk(&mut paginate_stream).await?; - yield Message::Chunk(chunk); - } - - yield msg; // propagate the barrier + + // Propagate the barrier. + yield msg; } // Only barrier can be received. _ => unreachable!(), }, - // Right arm is always pending. 
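The refactor above replaces that lazy per-barrier pull with a real data arm: pages are mapped into chunks by a stream combinator (`build_chunked_paginate_stream`) and merged with the barrier stream through `StreamReaderWithPause::<true, _>`, whose `true` parameter gives the left (barrier) side polling priority. The same behaviour in miniature, assuming the usual `select_with_strategy` underpinning (the stream contents here are invented):

    use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

    async fn biased_merge_demo() {
        let barriers = stream::iter(["barrier"]);
        let chunks = stream::iter(["chunk"]);
        // Always poll the barrier side first when both are ready, so a
        // pause mutation can never lose the race against in-flight pages.
        let merged = select_with_strategy(barriers, chunks, |_: &mut ()| PollNext::Left);
        assert_eq!(merged.collect::<Vec<_>>().await, ["barrier", "chunk"]);
    }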
- _ => unreachable!(), + // Chunked FsPage arrives. + Either::Right(chunk) => { + yield Message::Chunk(chunk); + } }, } } } } -impl Executor for ListExecutor { +impl Executor for FsListExecutor { fn execute(self: Box) -> BoxedMessageStream { self.into_stream().boxed() } @@ -296,7 +274,7 @@ mod tests { let system_params_manager = LocalSystemParamsManager::for_test(); - let executor = ListExecutor::new( + let executor = FsListExecutor::new( ActorContext::create(0), schema, pk_indices, diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 77bbcc53e69c5..ee8ab9eac02a8 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -16,7 +16,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::source::external::{ExternalTableType, SchemaTableName}; -use risingwave_connector::source::SourceCtrlOpts; +use risingwave_connector::source::{SourceCtrlOpts, S3_V2_CONNECTOR}; use risingwave_pb::stream_plan::SourceNode; use risingwave_source::source_desc::SourceDescBuilder; use risingwave_storage::panic_store::PanicStateStore; @@ -26,10 +26,12 @@ use super::*; use crate::executor::external::ExternalStorageTable; use crate::executor::source::StreamSourceCore; use crate::executor::source_executor::SourceExecutor; +use crate::executor::source_v2::list_executor::FsListExecutor; use crate::executor::state_table_handler::SourceStateTableHandler; use crate::executor::{CdcBackfillExecutor, FlowControlExecutor, FsSourceExecutor}; const FS_CONNECTORS: &[&str] = &["s3"]; +const FS_V2_CONNECTORS: &[&str] = &[S3_V2_CONNECTOR]; pub struct SourceExecutorBuilder; #[async_trait::async_trait] @@ -115,6 +117,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { .map(|c| c.to_ascii_lowercase()) .unwrap_or_default(); let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str()); + let is_fs_v2_connector = FS_V2_CONNECTORS.contains(&connector.as_str()); if is_fs_connector { FsSourceExecutor::new( @@ -129,6 +132,20 @@ impl ExecutorBuilder for SourceExecutorBuilder { source_ctrl_opts, )? 
.boxed() + } else if is_fs_v2_connector { + FsListExecutor::new( + params.actor_context.clone(), + schema.clone(), + params.pk_indices.clone(), + Some(stream_source_core), + params.executor_stats.clone(), + barrier_receiver, + system_params, + params.executor_id, + source_ctrl_opts.clone(), + params.env.connector_params(), + ) + .boxed() } else { let source_exec = SourceExecutor::new( params.actor_context.clone(), From f2a1d4ad6b9626e7979164c59362e2e388025728 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Thu, 28 Sep 2023 18:22:24 +0800 Subject: [PATCH 5/6] implement fetch executor --- src/connector/src/source/base.rs | 14 + .../src/source/filesystem/s3/source/reader.rs | 4 +- .../src/source/filesystem/s3_v2/mod.rs | 1 + .../src/source/filesystem/s3_v2/reader.rs | 92 +++++ src/source/src/connector_source.rs | 47 ++- .../src/executor/source_v2/fetch_executor.rs | 391 ++++++++++++++++++ src/stream/src/executor/source_v2/mod.rs | 1 + src/stream/src/from_proto/source/fs_fetch.rs | 74 +++- 8 files changed, 617 insertions(+), 7 deletions(-) create mode 100644 src/connector/src/source/filesystem/s3_v2/reader.rs create mode 100644 src/stream/src/executor/source_v2/fetch_executor.rs diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index e5fc1ce0f5884..4600a12a26424 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -518,6 +518,20 @@ pub trait SourceLister: Sized { fn paginate(self) -> BoxTryStream; } +#[async_trait] +pub trait SourceReader: Sized + Send { + type Properties; + + async fn new( + properties: Self::Properties, + parser_config: ParserConfig, + source_ctx: SourceContextRef, + columns: Option>, + ) -> Result; + + fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream; +} + #[cfg(test)] mod tests { use maplit::*; diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 736e4493d3f55..b1e368a2b409e 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -55,7 +55,7 @@ pub struct S3FileReader { impl S3FileReader { #[try_stream(boxed, ok = Vec, error = anyhow::Error)] - async fn stream_read_object( + pub async fn stream_read_object( client_for_s3: s3_client::Client, bucket_name: String, split: FsSplit, @@ -137,7 +137,7 @@ impl S3FileReader { } } - async fn get_object( + pub async fn get_object( client_for_s3: &s3_client::Client, bucket_name: &str, object_name: &str, diff --git a/src/connector/src/source/filesystem/s3_v2/mod.rs b/src/connector/src/source/filesystem/s3_v2/mod.rs index 6fab862daca1e..0bc457b97a27d 100644 --- a/src/connector/src/source/filesystem/s3_v2/mod.rs +++ b/src/connector/src/source/filesystem/s3_v2/mod.rs @@ -13,3 +13,4 @@ // limitations under the License. 
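`SourceReader` is the per-file counterpart of `SourceLister`: `new` builds the client and parser configuration once, and `build_read_stream` is then invoked once per assigned `FsSplit`, each call yielding a finite chunk stream over exactly one object. A consumer sketch, assuming the stream items are `Result<StreamChunkWithState, RwError>` (`read_one_file` is illustrative only):

    use futures::StreamExt;

    async fn read_one_file<R: SourceReader>(reader: &mut R, split: FsSplit) -> anyhow::Result<()> {
        let mut chunks = reader.build_read_stream(split);
        while let Some(msg) = chunks.next().await {
            let StreamChunkWithState { chunk, split_offset_mapping } = msg?;
            // The mapping records how far into the file this chunk reaches;
            // the fetch executor below persists it across barriers.
            println!("{} rows, offsets: {:?}", chunk.cardinality(), split_offset_mapping);
        }
        Ok(())
    }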
pub mod lister; +pub mod reader; diff --git a/src/connector/src/source/filesystem/s3_v2/reader.rs b/src/connector/src/source/filesystem/s3_v2/reader.rs new file mode 100644 index 0000000000000..378334dd64008 --- /dev/null +++ b/src/connector/src/source/filesystem/s3_v2/reader.rs @@ -0,0 +1,92 @@ +use anyhow::Result; +use async_trait::async_trait; +use aws_sdk_s3::client as s3_client; +use futures_async_stream::try_stream; +use risingwave_common::error::RwError; + +use crate::aws_auth::AwsAuthProps; +use crate::aws_utils::{s3_client, default_conn_config}; +use crate::parser::{ParserConfig, ByteStreamSourceParserImpl}; +use crate::source::{BoxSourceWithStateStream, StreamChunkWithState}; +use crate::source::base::SplitMetaData; +use crate::source::filesystem::{FsSplit, S3FileReader, nd_streaming}; +use crate::source::{SourceReader, filesystem::S3Properties, SourceContextRef, Column}; + +pub struct S3SourceReader { + bucket_name: String, + s3_client: s3_client::Client, + parser_config: ParserConfig, + source_ctx: SourceContextRef, +} + +impl S3SourceReader { + #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] + async fn build_read_stream_inner( + client_for_s3: s3_client::Client, + bucket_name: String, + source_ctx: SourceContextRef, + parser_config: ParserConfig, + split: FsSplit, + ) { + let split_id = split.id(); + let data_stream = S3FileReader::stream_read_object(client_for_s3, bucket_name, split, source_ctx.clone()); + let parser = ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone()).await?; + let msg_stream = if matches!( + parser, + ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_) + ) { + parser.into_stream(nd_streaming::split_stream(data_stream)) + } else { + parser.into_stream(data_stream) + }; + + let actor_id = source_ctx.source_info.actor_id.to_string(); + let source_id = source_ctx.source_info.source_id.to_string(); + #[for_await] + for msg in msg_stream { + let msg = msg?; + source_ctx + .metrics + .partition_input_count + .with_label_values(&[&actor_id, &source_id, &split_id]) + .inc_by(msg.chunk.cardinality() as u64); + yield msg; + } + } +} + +#[async_trait] +impl SourceReader for S3SourceReader { + type Properties = S3Properties; + + async fn new( + properties: Self::Properties, + parser_config: ParserConfig, + source_ctx: SourceContextRef, + _columns: Option>, + ) -> Result { + let config = AwsAuthProps::from(&properties); + + let sdk_config = config.build_config().await?; + + let bucket_name = properties.bucket_name; + let s3_client = s3_client(&sdk_config, Some(default_conn_config())); + + Ok(S3SourceReader { + bucket_name, + s3_client, + parser_config, + source_ctx, + }) + } + + fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream { + Self::build_read_stream_inner( + self.s3_client.clone(), + self.bucket_name.clone(), + self.source_ctx.clone(), + self.parser_config.clone(), + split + ) + } +} \ No newline at end of file diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index a9469924add82..2a7a513491673 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -26,10 +26,11 @@ use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister; -use risingwave_connector::source::filesystem::FsPage; +use 
risingwave_connector::source::filesystem::{FsPage, FsSplit}; +use risingwave_connector::source::filesystem::s3_v2::reader::S3SourceReader; use risingwave_connector::source::{ create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties, - ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SplitReader, + ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SourceReader, SplitReader, }; #[derive(Clone, Debug)] @@ -86,6 +87,48 @@ impl ConnectorSource { Ok(lister.paginate()) } + // TODO: reuse stream_reader, and using SourceContext + // to discriminate source v1/v2 + pub async fn source_reader( + &self, + column_ids: Vec, + source_ctx: Arc, + split: FsSplit, + ) -> Result { + let config = self.config.clone(); + let columns = self.get_target_columns(column_ids)?; + + let data_gen_columns = Some( + columns + .iter() + .map(|col| Column { + name: col.name.clone(), + data_type: col.data_type.clone(), + is_visible: col.is_visible(), + }) + .collect_vec(), + ); + + let parser_config = ParserConfig { + specific: self.parser_config.clone(), + common: CommonParserConfig { + rw_columns: columns, + }, + }; + + let mut reader = match config { + ConnectorProperties::S3(prop) => S3SourceReader::new( + *prop, + parser_config, + source_ctx, + data_gen_columns, + ).await?, + _ => unreachable!(), + }; + + Ok(reader.build_read_stream(split)) + } + pub async fn stream_reader( &self, state: ConnectorState, diff --git a/src/stream/src/executor/source_v2/fetch_executor.rs b/src/stream/src/executor/source_v2/fetch_executor.rs new file mode 100644 index 0000000000000..f403da53eda7d --- /dev/null +++ b/src/stream/src/executor/source_v2/fetch_executor.rs @@ -0,0 +1,391 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
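The fetch executor that follows consumes filename chunks from the upstream list executor, persists every assignment as an `FsSplit` row in its state table, and reads one file at a time, rotating to the next split once the current one is exhausted. Assignments survive recovery because splits round-trip through JSONB via `SplitMetaData`; a sketch of that round trip (the file name and size are invented):

    use risingwave_connector::source::filesystem::FsSplit;
    use risingwave_connector::source::SplitMetaData;

    fn split_round_trip() -> anyhow::Result<()> {
        let split = FsSplit::new("logs/part-0001.json".into(), 0, 1024);
        let encoded = split.encode_to_json(); // the JsonbVal stored in the state table
        let restored = FsSplit::restore_from_json(encoded)?;
        assert_eq!(restored.id(), split.id());
        Ok(())
    }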
+ +use either::Either; +use risingwave_common::row::Row; +use risingwave_common::types::{ScalarRef, ScalarRefImpl}; +use risingwave_common::util::epoch::EpochPair; +use risingwave_common::util::value_encoding::BasicSerde; +use risingwave_connector::source::{StreamChunkWithState, SplitMetaData, SplitImpl, SplitId}; +use risingwave_storage::StateStore; +use risingwave_storage::store::PrefetchOptions; +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::pin::Pin; +use std::sync::Arc; + +use futures::stream::{self, StreamExt}; +use futures_async_stream::try_stream; +use risingwave_common::catalog::{Schema, ColumnId, TableId}; +use risingwave_connector::ConnectorParams; +use risingwave_connector::source::{SourceContext, BoxSourceWithStateStream, filesystem::FsSplit, SourceCtrlOpts}; +use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; +use crate::common::table::state_table::KeyedRowStream; +use crate::executor::{StreamSourceCore, Mutation, SourceStateTableHandler}; +use crate::executor::{ + ActorContextRef, Executor, BoxedMessageStream, PkIndicesRef, PkIndices, Message, StreamExecutorError, + BoxedExecutor, expect_first_barrier, stream_reader::StreamReaderWithPause, StreamExecutorResult, +}; + +pub struct FsFetchExecutor { + actor_ctx: ActorContextRef, + + identity: String, + + schema: Schema, + + pk_indices: PkIndices, + + /// Streaming source for external + stream_source_core: Option>, + + /// Upstream list executor. + upstream: Option, + + // control options for connector level + source_ctrl_opts: SourceCtrlOpts, + + // config for the connector node + connector_params: ConnectorParams, +} + +impl FsFetchExecutor { + pub fn new( + actor_ctx: ActorContextRef, + schema: Schema, + pk_indices: PkIndices, + stream_source_core: StreamSourceCore, + executor_id: u64, + upstream: BoxedExecutor, + source_ctrl_opts: SourceCtrlOpts, + connector_params: ConnectorParams, + ) -> Self { + Self { + actor_ctx, + identity: format!("FsFetchExecutor {:X}", executor_id), + schema, + pk_indices, + stream_source_core: Some(stream_source_core), + upstream: Some(upstream), + source_ctrl_opts, + connector_params, + } + } + + async fn try_replace_with_new_reader<'a, const BIASED: bool>( + is_datastream_empty: &mut bool, + _state_store_handler: &'a SourceStateTableHandler, + state_cache: &mut HashMap, + column_ids: Vec, + source_ctx: SourceContext, + source_desc: &SourceDesc, + stream: &mut StreamReaderWithPause, + store_iter: &mut Pin>>, + ) -> StreamExecutorResult<()> { + let fs_split = + if let Some(item) = store_iter.next().await { + // Find the next assignment in state store. + let row = item?; + let split_id = match row.datum_at(0) { + Some(ScalarRefImpl::Utf8(split_id)) => split_id, + _ => unreachable!() + }; + let fs_split = match row.datum_at(1) { + Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { + SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())? + .as_fs() + .unwrap() + .to_owned() + }, + _ => unreachable!() + }; + + // Cache the assignment retrieved from state store. + state_cache.insert(split_id.into(), fs_split.clone().into()); + Some(fs_split) + } else { + // Find incompleted assignment in state cache. + state_cache.iter().find(|(_, split)| { + let fs_split = split.as_fs().unwrap(); + fs_split.offset < fs_split.size + }) + .map(|(_, split)| split.as_fs().unwrap().to_owned()) + }; + + if let Some(fs_split) = fs_split { + stream.replace_data_stream( + Self::build_stream_source_reader(column_ids, source_ctx, source_desc, fs_split).await? 
+ ); + *is_datastream_empty = false; + } else { + stream.replace_data_stream(stream::pending().boxed()); + *is_datastream_empty = true; + }; + + Ok(()) + } + + async fn take_snapshot_and_flush( + state_store_handler: &mut SourceStateTableHandler, + state_cache: &mut HashMap, + epoch: EpochPair, + ) -> StreamExecutorResult<()> { + let mut to_flush = Vec::new(); + let mut to_delete = Vec::new(); + state_cache + .iter() + .for_each(|(_, split)| { + let fs_split = split.as_fs().unwrap(); + if fs_split.offset >= fs_split.size { + // If read out, try delete in the state store + to_delete.push(split.to_owned()); + } else { + // Otherwise, flush to state store + to_flush.push(split.to_owned()); + } + }); + + state_store_handler.take_snapshot(to_flush).await?; + state_store_handler.trim_state(&to_delete).await?; + state_store_handler.state_store.commit(epoch).await?; + state_cache.clear(); + Ok(()) + } + + async fn build_stream_source_reader( + column_ids: Vec, + source_ctx: SourceContext, + source_desc: &SourceDesc, + split: FsSplit, + ) -> StreamExecutorResult { + source_desc + .source + .source_reader(column_ids, Arc::new(source_ctx), split) + .await + .map_err(StreamExecutorError::connector_error) + } + + fn build_source_ctx( + &self, + source_desc: &SourceDesc, + source_id: TableId, + ) -> SourceContext { + SourceContext::new_with_suppressor( + self.actor_ctx.id, + source_id, + self.actor_ctx.fragment_id, + source_desc.metrics.clone(), + self.source_ctrl_opts.clone(), + self.connector_params.connector_client.clone(), + self.actor_ctx.error_suppressor.clone(), + ) + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(mut self) { + let mut upstream = self + .upstream + .take() + .unwrap() + .execute(); + let barrier = expect_first_barrier(&mut upstream).await?; + + let mut core = self.stream_source_core.take().unwrap(); + let mut state_store_handler = core.split_state_store; + let mut state_cache = core.state_cache; + + // Build source description from the builder. + let source_desc_builder: SourceDescBuilder = core + .source_desc_builder + .take() + .unwrap(); + + let source_desc = source_desc_builder + .build() + .map_err(StreamExecutorError::connector_error)?; + + // Initialize state store. + state_store_handler.init_epoch(barrier.epoch); + + let mut is_datastream_empty = true; + let mut stream = StreamReaderWithPause::::new( + upstream, + stream::pending().boxed() + ); + + if barrier.is_pause_on_startup() { + stream.pause_stream(); + } + + let mut store_iter = Box::pin( + state_store_handler + .state_store + .iter_row(PrefetchOptions::new_with_exhaust_iter(false)) + .await? + ); + + // If it is a recovery startup, + // there can be file assigments in state store. + // Hence we try to build a reader first. + Self::try_replace_with_new_reader( + &mut is_datastream_empty, + &state_store_handler, + &mut state_cache, + core.column_ids.clone(), + self.build_source_ctx(&source_desc, core.source_id), + &source_desc, + &mut stream, + &mut store_iter, + ).await?; + + yield Message::Barrier(barrier); + + while let Some(msg) = stream.next().await { + match msg { + Err(_) => { + todo!() + } + Ok(msg) => { + match msg { + // This branch will be preferred. 
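    // (Barrier handling below proceeds in a fixed order: drop the state
    // store iterator, flush the split cache via `take_snapshot_and_flush`,
    // which deletes finished files (offset >= size) and persists in-flight
    // offsets for recovery, commit the state-table epoch, and only then
    // rebuild the iterator, which must not be held across a commit.)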
+ Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), + _ => (), + } + } + + drop(store_iter); + Self::take_snapshot_and_flush( + &mut state_store_handler, + &mut state_cache, + barrier.epoch, + ).await?; + + // Rebuild state store iterator. + store_iter = Box::pin( + state_store_handler + .state_store + .iter_row(PrefetchOptions::new_with_exhaust_iter(false)) + .await? + ); + + // Propagate the barrier. + yield msg; + }, + // Receiving file assignments from upstream list executor, + // store FsSplit into the cache. + Message::Chunk(chunk) => { + let file_assignment = chunk + .data_chunk() + .rows() + .map(|row| { + let filename = row.datum_at(0).unwrap().into_utf8(); + let size = row.datum_at(2).unwrap().into_int64(); + ( + Arc::::from(filename), + FsSplit::new(filename.to_owned(), 0, size as usize).into() + ) + }); + state_cache.extend(file_assignment); + + // When both of state cache and state store are empty, + // the right arm of stream is a pending stream, + // and is_datastream_empty is set to true. + // The new + if is_datastream_empty { + Self::try_replace_with_new_reader( + &mut is_datastream_empty, + &state_store_handler, + &mut state_cache, + core.column_ids.clone(), + self.build_source_ctx(&source_desc, core.source_id), + &source_desc, + &mut stream, + &mut store_iter, + ).await?; + } + }, + _ => unreachable!() + }, + // StreamChunk from FsSourceReader, and the reader reads only one file. + // If the file read out, replace with a new file reader. + Either::Right(StreamChunkWithState { + chunk, + split_offset_mapping, + }) => { + let mapping = split_offset_mapping.unwrap(); + debug_assert_eq!(mapping.len(), 1); + + // Get FsSplit in state cache. + let (split_id, offset) = mapping.iter().nth(0).unwrap(); + let mut cache_entry = match state_cache + .entry(split_id.to_owned()) { + Entry::Occupied(entry) => entry, + Entry::Vacant(_) => unreachable!() + }; + + // Update the offset in the state cache. + // If offset is equal to size, the entry + // will be deleted after the next barrier. + let offset = offset.parse().unwrap(); + let mut fs_split = cache_entry.get().to_owned().into_fs().unwrap(); + let fs_split_size = fs_split.size; + fs_split.offset = offset; + cache_entry.insert(fs_split.into()); + + // The file is read out, build a new reader. + if fs_split_size <= offset { + debug_assert_eq!(fs_split_size, offset); + Self::try_replace_with_new_reader( + &mut is_datastream_empty, + &state_store_handler, + &mut state_cache, + core.column_ids.clone(), + self.build_source_ctx(&source_desc, core.source_id), + &source_desc, + &mut stream, + &mut store_iter, + ).await?; + } + + yield Message::Chunk(chunk); + } + } + } + } + } + } +} + +impl Executor for FsFetchExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.into_stream().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} \ No newline at end of file diff --git a/src/stream/src/executor/source_v2/mod.rs b/src/stream/src/executor/source_v2/mod.rs index a6cf4823bdd2f..8c3cb0a1a538a 100644 --- a/src/stream/src/executor/source_v2/mod.rs +++ b/src/stream/src/executor/source_v2/mod.rs @@ -13,3 +13,4 @@ // limitations under the License. 
pub mod list_executor; +pub mod fetch_executor; \ No newline at end of file diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index 14b01cc1ff861..60468ec6ed912 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::catalog::{Field, Schema, TableId, ColumnId}; +use risingwave_common::types::DataType; +use risingwave_connector::source::SourceCtrlOpts; use risingwave_pb::stream_plan::StreamFsFetchNode; +use risingwave_source::source_desc::SourceDescBuilder; use risingwave_storage::StateStore; use crate::error::StreamResult; -use crate::executor::BoxedExecutor; +use crate::executor::source_v2::fetch_executor::FsFetchExecutor; +use crate::executor::{BoxedExecutor, Executor, SourceStateTableHandler, StreamSourceCore}; use crate::from_proto::ExecutorBuilder; use crate::task::{ExecutorParams, LocalStreamManagerCore}; @@ -30,8 +35,71 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { params: ExecutorParams, node: &Self::Node, store: impl StateStore, - stream: &mut LocalStreamManagerCore, + _stream: &mut LocalStreamManagerCore, ) -> StreamResult { - todo!() + let [upstream]: [_; 1] = params.input.try_into().unwrap(); + + let source = node.node_inner.as_ref().unwrap(); + + let source_id = TableId::new(source.source_id); + let source_name = source.source_name.clone(); + let source_info = source.get_info()?; + + let source_desc_builder = SourceDescBuilder::new( + source.columns.clone(), + params.env.source_metrics(), + source.row_id_index.map(|x| x as _), + source.properties.clone(), + source_info.clone(), + params.env.connector_params(), + params.env.config().developer.connector_message_buffer_size, + params.pk_indices.clone(), + ); + + let source_ctrl_opts = SourceCtrlOpts { + chunk_size: params.env.config().developer.chunk_size, + }; + + let column_ids: Vec<_> = source + .columns + .iter() + .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id)) + .collect(); + let fields = source + .columns + .iter() + .map(|prost| { + let column_desc = prost.column_desc.as_ref().unwrap(); + let data_type = DataType::from(column_desc.column_type.as_ref().unwrap()); + let name = column_desc.name.clone(); + Field::with_name(data_type, name) + }) + .collect(); + let schema = Schema::new(fields); + + let state_table_handler = SourceStateTableHandler::from_table_catalog( + source.state_table.as_ref().unwrap(), + store.clone(), + ) + .await; + let stream_source_core = StreamSourceCore::new( + source_id, + source_name, + column_ids, + source_desc_builder, + state_table_handler, + ); + + Ok(FsFetchExecutor::new( + params.actor_context, + schema, + params.pk_indices, + stream_source_core, + params.executor_id, + upstream, + source_ctrl_opts, + params.env.connector_params(), + ) + .boxed()) } } From 04d10102e8b5678e29aaddc96958e47da15d4eb7 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Thu, 28 Sep 2023 18:47:24 +0800 Subject: [PATCH 6/6] fix typo --- src/stream/src/executor/source_v2/fetch_executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/source_v2/fetch_executor.rs b/src/stream/src/executor/source_v2/fetch_executor.rs index f403da53eda7d..e4d62865bca0d 100644 --- a/src/stream/src/executor/source_v2/fetch_executor.rs +++ 
b/src/stream/src/executor/source_v2/fetch_executor.rs
@@ -236,7 +236,7 @@ impl<S: StateStore> FsFetchExecutor<S> {
         );
 
         // If it is a recovery startup,
-        // there can be file assigments in state store.
+        // there can be file assignments in state store.
         // Hence we try to build a reader first.
         Self::try_replace_with_new_reader(
             &mut is_datastream_empty,
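Taken together, `try_replace_with_new_reader` encodes the rotation policy for the data arm: prefer the next assignment from the state-store iterator (which is exactly what makes the recovery startup above work), fall back to any cached split that still has bytes left, and otherwise park the data arm on a pending stream until the list executor delivers more files. The cache-scan fallback in isolation (illustrative only; it ignores the iterator half):

    use std::collections::HashMap;

    use risingwave_connector::source::filesystem::FsSplit;
    use risingwave_connector::source::{SplitId, SplitImpl};

    fn next_unfinished(cache: &HashMap<SplitId, SplitImpl>) -> Option<FsSplit> {
        cache
            .values()
            .filter_map(|split| split.as_fs()) // only fs splits live in this cache
            .find(|fs| fs.offset < fs.size)
            .cloned()
    }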