diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 22723f022acce..4600a12a26424 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -33,7 +33,7 @@ use risingwave_rpc_client::ConnectorClient;
 use serde::de::DeserializeOwned;
 
 use super::datagen::DatagenMeta;
-use super::filesystem::FsSplit;
+use super::filesystem::{FsPage, FsSplit};
 use super::google_pubsub::GooglePubsubMeta;
 use super::kafka::KafkaMeta;
 use super::monitor::SourceMetrics;
@@ -509,6 +509,29 @@ pub trait SplitMetaData: Sized {
 /// [`None`] and the created source stream will be a pending stream.
 pub type ConnectorState = Option<Vec<SplitImpl>>;
 
+#[async_trait]
+pub trait SourceLister: Sized {
+    type Split: SplitMetaData + Send;
+    type Properties;
+
+    async fn new(properties: Self::Properties) -> Result<Self>;
+    fn paginate(self) -> BoxTryStream<FsPage>;
+}
+
+#[async_trait]
+pub trait SourceReader: Sized + Send {
+    type Properties;
+
+    async fn new(
+        properties: Self::Properties,
+        parser_config: ParserConfig,
+        source_ctx: SourceContextRef,
+        columns: Option<Vec<Column>>,
+    ) -> Result<Self>;
+
+    fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream;
+}
+
 #[cfg(test)]
 mod tests {
     use maplit::*;
diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs
index d4328289b547f..e5b29a02e1d58 100644
--- a/src/connector/src/source/filesystem/file_common.rs
+++ b/src/connector/src/source/filesystem/file_common.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use anyhow::anyhow;
-use risingwave_common::types::JsonbVal;
+use risingwave_common::types::{JsonbVal, Timestamp};
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -55,3 +55,21 @@ impl FsSplit {
         }
     }
 }
+
+pub struct FsPageItem {
+    pub name: String,
+    pub size: usize,
+    pub timestamp: Timestamp,
+}
+
+impl FsPageItem {
+    pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self {
+        Self {
+            name,
+            size,
+            timestamp,
+        }
+    }
+}
+
+pub type FsPage = Vec<FsPageItem>;
diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs
index fbe59a11a889f..8f2587384280b 100644
--- a/src/connector/src/source/filesystem/mod.rs
+++ b/src/connector/src/source/filesystem/mod.rs
@@ -16,7 +16,7 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};
 mod file_common;
 pub mod nd_streaming;
-pub use file_common::FsSplit;
+pub use file_common::{FsPage, FsPageItem, FsSplit};
 mod s3;
-
+pub mod s3_v2;
 pub const S3_V2_CONNECTOR: &str = "s3_v2";
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index 736e4493d3f55..b1e368a2b409e 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -55,7 +55,7 @@ pub struct S3FileReader {
 
 impl S3FileReader {
     #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
-    async fn stream_read_object(
+    pub async fn stream_read_object(
         client_for_s3: s3_client::Client,
         bucket_name: String,
         split: FsSplit,
@@ -137,7 +137,7 @@ impl S3FileReader {
         }
     }
 
-    async fn get_object(
+    pub async fn get_object(
         client_for_s3: &s3_client::Client,
         bucket_name: &str,
         object_name: &str,
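
The two traits above split the old enumerate-and-read duty into an explicit list phase and a fetch phase. A minimal sketch of how they are meant to compose — not part of this patch; `drive_source` is a hypothetical driver, and a real caller would persist splits across barriers instead of reading files inline:

    use futures::StreamExt;

    async fn drive_source<L, R>(lister: L, mut reader: R) -> anyhow::Result<()>
    where
        L: SourceLister<Split = FsSplit>,
        R: SourceReader,
    {
        let mut pages = lister.paginate();
        while let Some(page) = pages.next().await {
            // Each page is a Vec<FsPageItem>; start every discovered file at offset 0.
            for item in page? {
                let split = FsSplit::new(item.name, 0, item.size);
                let mut chunks = reader.build_read_stream(split);
                while let Some(chunk) = chunks.next().await {
                    let _chunk = chunk?; // a StreamChunkWithState, sent downstream in reality
                }
            }
        }
        Ok(())
    }
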
diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs
new file mode 100644
index 0000000000000..c4212cafdfacd
--- /dev/null
+++ b/src/connector/src/source/filesystem/s3_v2/lister.rs
@@ -0,0 +1,152 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use anyhow::{anyhow, Context, Result};
+use async_trait::async_trait;
+use aws_sdk_s3::error::DisplayErrorContext;
+use aws_sdk_s3::Client;
+use futures_async_stream::try_stream;
+use itertools::Itertools;
+use risingwave_common::error::RwError;
+use risingwave_common::types::Timestamp;
+
+use crate::aws_auth::AwsAuthProps;
+use crate::aws_utils::{default_conn_config, s3_client};
+use crate::source::filesystem::file_common::{FsPage, FsSplit};
+use crate::source::filesystem::{FsPageItem, S3Properties};
+use crate::source::{BoxTryStream, SourceLister};
+
+/// Extract the longest literal prefix of a glob pattern.
+fn get_prefix(glob: &str) -> String {
+    let mut escaped = false;
+    let mut escaped_filter = false;
+    glob.chars()
+        .take_while(|c| match (c, &escaped) {
+            ('*', false) => false,
+            ('[', false) => false,
+            ('{', false) => false,
+            ('\\', false) => {
+                escaped = true;
+                true
+            }
+            (_, false) => true,
+            (_, true) => {
+                escaped = false;
+                true
+            }
+        })
+        .filter(|c| match (c, &escaped_filter) {
+            (_, true) => {
+                escaped_filter = false;
+                true
+            }
+            ('\\', false) => {
+                escaped_filter = true;
+                false
+            }
+            (_, _) => true,
+        })
+        .collect()
+}
+
+pub struct S3SourceLister {
+    client: Client,
+    /// `prefix` is used to reduce the number of objects to be listed.
+    prefix: Option<String>,
+    matcher: Option<glob::Pattern>,
+    bucket_name: String,
+}
+
+impl S3SourceLister {
+    #[try_stream(boxed, ok = FsPage, error = RwError)]
+    async fn paginate_inner(self) {
+        // Each iteration of the outer loop starts a new listing round.
+        loop {
+            let mut next_continuation_token = None;
+            // Loop to page through a truncated listing.
+            'truncated: loop {
+                let mut req = self
+                    .client
+                    .list_objects_v2()
+                    .bucket(&self.bucket_name)
+                    .set_prefix(self.prefix.clone());
+                if let Some(continuation_token) = next_continuation_token.take() {
+                    req = req.continuation_token(continuation_token);
+                }
+                let mut res = req
+                    .send()
+                    .await
+                    .map_err(|e| anyhow!(DisplayErrorContext(e)))?;
+
+                yield res
+                    .contents
+                    .take()
+                    .unwrap_or_default()
+                    .iter()
+                    .filter(|obj| obj.key().is_some())
+                    .filter(|obj| {
+                        self.matcher
+                            .as_ref()
+                            .map(|m| m.matches(obj.key().unwrap()))
+                            .unwrap_or(true)
+                    })
+                    .map(|obj| {
+                        let aws_ts = obj.last_modified().unwrap();
+                        FsPageItem::new(
+                            obj.key().unwrap().to_owned(),
+                            obj.size() as usize,
+                            Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()),
+                        )
+                    })
+                    .collect_vec();
+
+                if res.is_truncated() {
+                    next_continuation_token = Some(res.next_continuation_token.unwrap())
+                } else {
+                    break 'truncated;
+                }
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl SourceLister for S3SourceLister {
+    type Properties = S3Properties;
+    type Split = FsSplit;
+
+    async fn new(properties: Self::Properties) -> Result<Self> {
+        let config = AwsAuthProps::from(&properties);
+        let sdk_config = config.build_config().await?;
+        let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
+        let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() {
+            let prefix = get_prefix(pattern);
+            let matcher = glob::Pattern::new(pattern)
+                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
+            (Some(prefix), Some(matcher))
+        } else {
+            (None, None)
+        };
+
+        Ok(Self {
+            bucket_name: properties.bucket_name,
+            matcher,
+            prefix,
+            client: s3_client,
+        })
+    }
+
+    fn paginate(self) -> BoxTryStream<FsPage> {
+        self.paginate_inner()
+    }
+}
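
For reference, the intended behavior of `get_prefix` on a few patterns (hedged examples, not tests from this patch): the prefix stops at the first unescaped glob metacharacter (`*`, `[`, `{`), and escape backslashes are unwrapped:

    assert_eq!(get_prefix(r"data/2023/*.json"), "data/2023/");
    assert_eq!(get_prefix(r"logs/part-[0-9]*"), "logs/part-");
    assert_eq!(get_prefix(r"a\*b/*.csv"), "a*b/"); // the escaped `*` is literal
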
diff --git a/src/connector/src/source/filesystem/s3_v2/mod.rs b/src/connector/src/source/filesystem/s3_v2/mod.rs
new file mode 100644
index 0000000000000..0bc457b97a27d
--- /dev/null
+++ b/src/connector/src/source/filesystem/s3_v2/mod.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod lister;
+pub mod reader;
diff --git a/src/connector/src/source/filesystem/s3_v2/reader.rs b/src/connector/src/source/filesystem/s3_v2/reader.rs
new file mode 100644
index 0000000000000..378334dd64008
--- /dev/null
+++ b/src/connector/src/source/filesystem/s3_v2/reader.rs
@@ -0,0 +1,92 @@
+use anyhow::Result;
+use async_trait::async_trait;
+use aws_sdk_s3::client as s3_client;
+use futures_async_stream::try_stream;
+use risingwave_common::error::RwError;
+
+use crate::aws_auth::AwsAuthProps;
+use crate::aws_utils::{default_conn_config, s3_client};
+use crate::parser::{ByteStreamSourceParserImpl, ParserConfig};
+use crate::source::base::SplitMetaData;
+use crate::source::filesystem::{nd_streaming, FsSplit, S3FileReader, S3Properties};
+use crate::source::{
+    BoxSourceWithStateStream, Column, SourceContextRef, SourceReader, StreamChunkWithState,
+};
+
+pub struct S3SourceReader {
+    bucket_name: String,
+    s3_client: s3_client::Client,
+    parser_config: ParserConfig,
+    source_ctx: SourceContextRef,
+}
+
+impl S3SourceReader {
+    #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
+    async fn build_read_stream_inner(
+        client_for_s3: s3_client::Client,
+        bucket_name: String,
+        source_ctx: SourceContextRef,
+        parser_config: ParserConfig,
+        split: FsSplit,
+    ) {
+        let split_id = split.id();
+        let data_stream =
+            S3FileReader::stream_read_object(client_for_s3, bucket_name, split, source_ctx.clone());
+        let parser =
+            ByteStreamSourceParserImpl::create(parser_config.clone(), source_ctx.clone()).await?;
+        let msg_stream = if matches!(
+            parser,
+            ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
+        ) {
+            parser.into_stream(nd_streaming::split_stream(data_stream))
+        } else {
+            parser.into_stream(data_stream)
+        };
+
+        let actor_id = source_ctx.source_info.actor_id.to_string();
+        let source_id = source_ctx.source_info.source_id.to_string();
+        #[for_await]
+        for msg in msg_stream {
+            let msg = msg?;
+            source_ctx
+                .metrics
+                .partition_input_count
+                .with_label_values(&[&actor_id, &source_id, &split_id])
+                .inc_by(msg.chunk.cardinality() as u64);
+            yield msg;
+        }
+    }
+}
+
+#[async_trait]
+impl SourceReader for S3SourceReader {
+    type Properties = S3Properties;
+
+    async fn new(
+        properties: Self::Properties,
+        parser_config: ParserConfig,
+        source_ctx: SourceContextRef,
+        _columns: Option<Vec<Column>>,
+    ) -> Result<Self> {
+        let config = AwsAuthProps::from(&properties);
+
+        let sdk_config = config.build_config().await?;
+
+        let bucket_name = properties.bucket_name;
+        let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
+
+        Ok(S3SourceReader {
+            bucket_name,
+            s3_client,
+            parser_config,
+            source_ctx,
+        })
+    }
+
+    fn build_read_stream(&mut self, split: FsSplit) -> BoxSourceWithStateStream {
+        Self::build_read_stream_inner(
+            self.s3_client.clone(),
+            self.bucket_name.clone(),
+            self.source_ctx.clone(),
+            self.parser_config.clone(),
+            split,
+        )
+    }
+}
\ No newline at end of file
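
`S3SourceReader` routes JSON and CSV through `nd_streaming::split_stream` because `stream_read_object` yields fixed-size byte batches that do not align with record boundaries. A simplified, self-contained illustration of the idea — a hypothetical helper, not the actual `split_stream` implementation:

    /// Re-chunk byte batches on newlines so each output chunk holds whole records.
    fn resplit_on_newlines(batches: &[&[u8]]) -> Vec<Vec<u8>> {
        let mut records = Vec::new();
        let mut pending = Vec::new();
        for batch in batches {
            for &byte in *batch {
                pending.push(byte);
                if byte == b'\n' {
                    // A full record is buffered; emit it and start a new one.
                    records.push(std::mem::take(&mut pending));
                }
            }
        }
        if !pending.is_empty() {
            records.push(pending); // the final record may lack a trailing newline
        }
        records
    }
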
diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs
index 445bf0f6dbb90..2a7a513491673 100644
--- a/src/source/src/connector_source.rs
+++ b/src/source/src/connector_source.rs
@@ -25,9 +25,12 @@ use risingwave_common::error::{internal_error, Result};
 use risingwave_common::util::select_all;
 use risingwave_connector::dispatch_source_prop;
 use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
+use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister;
+use risingwave_connector::source::filesystem::s3_v2::reader::S3SourceReader;
+use risingwave_connector::source::filesystem::{FsPage, FsSplit};
 use risingwave_connector::source::{
-    create_split_reader, BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState,
-    SourceColumnDesc, SourceContext, SplitReader,
+    create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
+    ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SourceReader, SplitReader,
 };
 
 #[derive(Clone, Debug)]
@@ -74,6 +77,58 @@ impl ConnectorSource {
             .collect::<Result<Vec<SourceColumnDesc>>>()
     }
 
+    pub async fn source_lister(&self) -> Result<BoxTryStream<FsPage>> {
+        let config = self.config.clone();
+        let lister = match config {
+            ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?,
+            _ => unreachable!(),
+        };
+
+        Ok(lister.paginate())
+    }
+
+    // TODO: reuse `stream_reader`, and use the `SourceContext`
+    // to discriminate between source v1 and v2.
+    pub async fn source_reader(
+        &self,
+        column_ids: Vec<ColumnId>,
+        source_ctx: Arc<SourceContext>,
+        split: FsSplit,
+    ) -> Result<BoxSourceWithStateStream> {
+        let config = self.config.clone();
+        let columns = self.get_target_columns(column_ids)?;
+
+        let data_gen_columns = Some(
+            columns
+                .iter()
+                .map(|col| Column {
+                    name: col.name.clone(),
+                    data_type: col.data_type.clone(),
+                    is_visible: col.is_visible(),
+                })
+                .collect_vec(),
+        );
+
+        let parser_config = ParserConfig {
+            specific: self.parser_config.clone(),
+            common: CommonParserConfig {
+                rw_columns: columns,
+            },
+        };
+
+        let mut reader = match config {
+            ConnectorProperties::S3(prop) => {
+                S3SourceReader::new(*prop, parser_config, source_ctx, data_gen_columns).await?
+            }
+            _ => unreachable!(),
+        };
+
+        Ok(reader.build_read_stream(split))
+    }
+
     pub async fn stream_reader(
         &self,
         state: ConnectorState,
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 8fa7a5d818cc4..6fcd928e2c523 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -87,6 +87,7 @@ mod sink;
 mod sort;
 mod sort_buffer;
 pub mod source;
+pub mod source_v2;
 mod stateless_simple_agg;
 mod stream_reader;
 pub mod subtask;
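
From the executor side, the two new `ConnectorSource` entry points above are used roughly as follows — a hedged sketch inside an async fn returning `Result`, where `connector_source`, `column_ids`, `source_ctx`, and `fs_split` are assumed to be in scope:

    use std::sync::Arc;

    // List phase: a stream of FsPage batches, one page per ListObjectsV2 response.
    let mut pages = connector_source.source_lister().await?;
    // Fetch phase: a chunk stream that reads exactly one file (split).
    let chunks = connector_source
        .source_reader(column_ids.clone(), Arc::new(source_ctx), fs_split)
        .await?;
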
diff --git a/src/stream/src/executor/source_v2/fetch_executor.rs b/src/stream/src/executor/source_v2/fetch_executor.rs
new file mode 100644
index 0000000000000..e4d62865bca0d
--- /dev/null
+++ b/src/stream/src/executor/source_v2/fetch_executor.rs
@@ -0,0 +1,391 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use either::Either;
+use futures::stream::{self, StreamExt};
+use futures_async_stream::try_stream;
+use risingwave_common::catalog::{ColumnId, Schema, TableId};
+use risingwave_common::row::Row;
+use risingwave_common::types::{ScalarRef, ScalarRefImpl};
+use risingwave_common::util::epoch::EpochPair;
+use risingwave_common::util::value_encoding::BasicSerde;
+use risingwave_connector::source::filesystem::FsSplit;
+use risingwave_connector::source::{
+    BoxSourceWithStateStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
+    StreamChunkWithState,
+};
+use risingwave_connector::ConnectorParams;
+use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder};
+use risingwave_storage::store::PrefetchOptions;
+use risingwave_storage::StateStore;
+
+use crate::common::table::state_table::KeyedRowStream;
+use crate::executor::stream_reader::StreamReaderWithPause;
+use crate::executor::{
+    expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message,
+    Mutation, PkIndices, PkIndicesRef, SourceStateTableHandler, StreamExecutorError,
+    StreamExecutorResult, StreamSourceCore,
+};
+
+pub struct FsFetchExecutor<S: StateStore> {
+    actor_ctx: ActorContextRef,
+
+    identity: String,
+
+    schema: Schema,
+
+    pk_indices: PkIndices,
+
+    /// Streaming source core for the external source.
+    stream_source_core: Option<StreamSourceCore<S>>,
+
+    /// Upstream list executor.
+    upstream: Option<BoxedExecutor>,
+
+    /// Control options at the connector level.
+    source_ctrl_opts: SourceCtrlOpts,
+
+    /// Config for the connector node.
+    connector_params: ConnectorParams,
+}
+
+impl<S: StateStore> FsFetchExecutor<S> {
+    pub fn new(
+        actor_ctx: ActorContextRef,
+        schema: Schema,
+        pk_indices: PkIndices,
+        stream_source_core: StreamSourceCore<S>,
+        executor_id: u64,
+        upstream: BoxedExecutor,
+        source_ctrl_opts: SourceCtrlOpts,
+        connector_params: ConnectorParams,
+    ) -> Self {
+        Self {
+            actor_ctx,
+            identity: format!("FsFetchExecutor {:X}", executor_id),
+            schema,
+            pk_indices,
+            stream_source_core: Some(stream_source_core),
+            upstream: Some(upstream),
+            source_ctrl_opts,
+            connector_params,
+        }
+    }
+
+    async fn try_replace_with_new_reader<'a, const BIASED: bool>(
+        is_datastream_empty: &mut bool,
+        _state_store_handler: &'a SourceStateTableHandler<S>,
+        state_cache: &mut HashMap<SplitId, SplitImpl>,
+        column_ids: Vec<ColumnId>,
+        source_ctx: SourceContext,
+        source_desc: &SourceDesc,
+        stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
+        store_iter: &mut Pin<Box<KeyedRowStream<'a, S, BasicSerde>>>,
+    ) -> StreamExecutorResult<()> {
+        let fs_split = if let Some(item) = store_iter.next().await {
+            // Find the next assignment in the state store.
+            let row = item?;
+            let split_id = match row.datum_at(0) {
+                Some(ScalarRefImpl::Utf8(split_id)) => split_id,
+                _ => unreachable!(),
+            };
+            let fs_split = match row.datum_at(1) {
+                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
+                    SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?
+                        .as_fs()
+                        .unwrap()
+                        .to_owned()
+                }
+                _ => unreachable!(),
+            };
+
+            // Cache the assignment retrieved from the state store.
+            state_cache.insert(split_id.into(), fs_split.clone().into());
+            Some(fs_split)
+        } else {
+            // Otherwise, find an incomplete assignment in the state cache.
+            state_cache
+                .iter()
+                .find(|(_, split)| {
+                    let fs_split = split.as_fs().unwrap();
+                    fs_split.offset < fs_split.size
+                })
+                .map(|(_, split)| split.as_fs().unwrap().to_owned())
+        };
+
+        if let Some(fs_split) = fs_split {
+            stream.replace_data_stream(
+                Self::build_stream_source_reader(column_ids, source_ctx, source_desc, fs_split)
+                    .await?,
+            );
+            *is_datastream_empty = false;
+        } else {
+            stream.replace_data_stream(stream::pending().boxed());
+            *is_datastream_empty = true;
+        };
+
+        Ok(())
+    }
+
+    async fn take_snapshot_and_flush(
+        state_store_handler: &mut SourceStateTableHandler<S>,
+        state_cache: &mut HashMap<SplitId, SplitImpl>,
+        epoch: EpochPair,
+    ) -> StreamExecutorResult<()> {
+        let mut to_flush = Vec::new();
+        let mut to_delete = Vec::new();
+        state_cache.iter().for_each(|(_, split)| {
+            let fs_split = split.as_fs().unwrap();
+            if fs_split.offset >= fs_split.size {
+                // The file has been fully read; delete its assignment from the state store.
+                to_delete.push(split.to_owned());
+            } else {
+                // Otherwise, flush the in-progress assignment to the state store.
+                to_flush.push(split.to_owned());
+            }
+        });
+
+        state_store_handler.take_snapshot(to_flush).await?;
+        state_store_handler.trim_state(&to_delete).await?;
+        state_store_handler.state_store.commit(epoch).await?;
+        state_cache.clear();
+        Ok(())
+    }
+
+    async fn build_stream_source_reader(
+        column_ids: Vec<ColumnId>,
+        source_ctx: SourceContext,
+        source_desc: &SourceDesc,
+        split: FsSplit,
+    ) -> StreamExecutorResult<BoxSourceWithStateStream> {
+        source_desc
+            .source
+            .source_reader(column_ids, Arc::new(source_ctx), split)
+            .await
+            .map_err(StreamExecutorError::connector_error)
+    }
+
+    fn build_source_ctx(&self, source_desc: &SourceDesc, source_id: TableId) -> SourceContext {
+        SourceContext::new_with_suppressor(
+            self.actor_ctx.id,
+            source_id,
+            self.actor_ctx.fragment_id,
+            source_desc.metrics.clone(),
+            self.source_ctrl_opts.clone(),
+            self.connector_params.connector_client.clone(),
+            self.actor_ctx.error_suppressor.clone(),
+        )
+    }
+
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn into_stream(mut self) {
+        let mut upstream = self.upstream.take().unwrap().execute();
+        let barrier = expect_first_barrier(&mut upstream).await?;
+
+        let mut core = self.stream_source_core.take().unwrap();
+        let mut state_store_handler = core.split_state_store;
+        let mut state_cache = core.state_cache;
+
+        // Build the source description from the builder.
+        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
+        let source_desc = source_desc_builder
+            .build()
+            .map_err(StreamExecutorError::connector_error)?;
+
+        // Initialize the state store.
+        state_store_handler.init_epoch(barrier.epoch);
+
+        let mut is_datastream_empty = true;
+        let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
+            upstream,
+            stream::pending().boxed(),
+        );
+
+        if barrier.is_pause_on_startup() {
+            stream.pause_stream();
+        }
+
+        let mut store_iter = Box::pin(
+            state_store_handler
+                .state_store
+                .iter_row(PrefetchOptions::new_with_exhaust_iter(false))
+                .await?,
+        );
+
+        // On a recovery startup there may be file assignments already persisted
+        // in the state store, so try to build a reader first.
+        Self::try_replace_with_new_reader(
+            &mut is_datastream_empty,
+            &state_store_handler,
+            &mut state_cache,
+            core.column_ids.clone(),
+            self.build_source_ctx(&source_desc, core.source_id),
+            &source_desc,
+            &mut stream,
+            &mut store_iter,
+        )
+        .await?;
+
+        yield Message::Barrier(barrier);
+
+        while let Some(msg) = stream.next().await {
+            match msg {
+                Err(_) => {
+                    todo!()
+                }
+                Ok(msg) => {
+                    match msg {
+                        // This branch will be preferred.
+                        Either::Left(msg) => match &msg {
+                            Message::Barrier(barrier) => {
+                                if let Some(mutation) = barrier.mutation.as_deref() {
+                                    match mutation {
+                                        Mutation::Pause => stream.pause_stream(),
+                                        Mutation::Resume => stream.resume_stream(),
+                                        _ => (),
+                                    }
+                                }
+
+                                // The iterator borrows the state store handler,
+                                // so drop it before taking the snapshot.
+                                drop(store_iter);
+                                Self::take_snapshot_and_flush(
+                                    &mut state_store_handler,
+                                    &mut state_cache,
+                                    barrier.epoch,
+                                )
+                                .await?;
+
+                                // Rebuild the state store iterator.
+                                store_iter = Box::pin(
+                                    state_store_handler
+                                        .state_store
+                                        .iter_row(PrefetchOptions::new_with_exhaust_iter(false))
+                                        .await?,
+                                );
+
+                                // Propagate the barrier.
+                                yield msg;
+                            }
+                            // On receiving file assignments from the upstream list
+                            // executor, store them as `FsSplit`s in the cache.
+                            Message::Chunk(chunk) => {
+                                let file_assignment = chunk.data_chunk().rows().map(|row| {
+                                    let filename = row.datum_at(0).unwrap().into_utf8();
+                                    let size = row.datum_at(2).unwrap().into_int64();
+                                    (
+                                        Arc::<str>::from(filename),
+                                        FsSplit::new(filename.to_owned(), 0, size as usize).into(),
+                                    )
+                                });
+                                state_cache.extend(file_assignment);
+
+                                // When both the state cache and the state store are
+                                // empty, the right arm of `stream` is a pending stream
+                                // and `is_datastream_empty` is true. The newly arrived
+                                // assignments can then be used to build a new reader.
+                                if is_datastream_empty {
+                                    Self::try_replace_with_new_reader(
+                                        &mut is_datastream_empty,
+                                        &state_store_handler,
+                                        &mut state_cache,
+                                        core.column_ids.clone(),
+                                        self.build_source_ctx(&source_desc, core.source_id),
+                                        &source_desc,
+                                        &mut stream,
+                                        &mut store_iter,
+                                    )
+                                    .await?;
+                                }
+                            }
+                            _ => unreachable!(),
+                        },
+                        // A `StreamChunk` from the file source reader, which reads only
+                        // one file. Once that file has been fully read, replace it with
+                        // a reader for a new file.
+                        Either::Right(StreamChunkWithState {
+                            chunk,
+                            split_offset_mapping,
+                        }) => {
+                            let mapping = split_offset_mapping.unwrap();
+                            debug_assert_eq!(mapping.len(), 1);
+
+                            // Get the `FsSplit` from the state cache.
+                            let (split_id, offset) = mapping.iter().next().unwrap();
+                            let mut cache_entry = match state_cache.entry(split_id.to_owned()) {
+                                Entry::Occupied(entry) => entry,
+                                Entry::Vacant(_) => unreachable!(),
+                            };
+
+                            // Update the offset in the state cache. If the offset equals
+                            // the size, the entry will be deleted after the next barrier.
+                            let offset = offset.parse().unwrap();
+                            let mut fs_split = cache_entry.get().to_owned().into_fs().unwrap();
+                            let fs_split_size = fs_split.size;
+                            fs_split.offset = offset;
+                            cache_entry.insert(fs_split.into());
+
+                            // The file has been fully read; build a new reader.
+                            if fs_split_size <= offset {
+                                debug_assert_eq!(fs_split_size, offset);
+                                Self::try_replace_with_new_reader(
+                                    &mut is_datastream_empty,
+                                    &state_store_handler,
+                                    &mut state_cache,
+                                    core.column_ids.clone(),
+                                    self.build_source_ctx(&source_desc, core.source_id),
+                                    &source_desc,
+                                    &mut stream,
+                                    &mut store_iter,
+                                )
+                                .await?;
+                            }
+
+                            yield Message::Chunk(chunk);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<S: StateStore> Executor for FsFetchExecutor<S> {
+    fn execute(self: Box<Self>) -> BoxedMessageStream {
+        self.into_stream().boxed()
+    }
+
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn pk_indices(&self) -> PkIndicesRef<'_> {
+        &self.pk_indices
+    }
+
+    fn identity(&self) -> &str {
+        self.identity.as_str()
+    }
+}
\ No newline at end of file
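
The fetch executor persists one state-table row per in-flight file: column 0 is the split id (the utf8 filename) and column 1 the `FsSplit` encoded as JSONB. A hedged round-trip sketch using the `SplitMetaData` API — it assumes `FsSplit` derives `PartialEq`, which is not visible in this hunk:

    fn roundtrip() -> anyhow::Result<()> {
        let split = FsSplit::new("dir/file.json".to_owned(), 0, 1024);
        // Encode the split the way the state table stores it (column 1).
        let encoded = SplitImpl::from(split.clone()).encode_to_json();
        // Decode it the way try_replace_with_new_reader reads it back.
        let restored = SplitImpl::restore_from_json(encoded)?.as_fs().unwrap().to_owned();
        assert_eq!(restored, split); // assumes FsSplit: PartialEq
        Ok(())
    }
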
diff --git a/src/stream/src/executor/source_v2/list_executor.rs b/src/stream/src/executor/source_v2/list_executor.rs
new file mode 100644
index 0000000000000..eea7798634a1c
--- /dev/null
+++ b/src/stream/src/executor/source_v2/list_executor.rs
@@ -0,0 +1,320 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use either::Either;
+use futures::StreamExt;
+use futures_async_stream::try_stream;
+use risingwave_common::array::Op;
+use risingwave_common::catalog::Schema;
+use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
+use risingwave_connector::source::filesystem::FsPage;
+use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts};
+use risingwave_connector::ConnectorParams;
+use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder};
+use risingwave_storage::StateStore;
+use tokio::sync::mpsc::UnboundedReceiver;
+
+use crate::executor::error::StreamExecutorError;
+use crate::executor::monitor::StreamingMetrics;
+use crate::executor::stream_reader::StreamReaderWithPause;
+use crate::executor::*;
+
+pub struct FsListExecutor<S: StateStore> {
+    actor_ctx: ActorContextRef,
+
+    identity: String,
+
+    schema: Schema,
+
+    pk_indices: PkIndices,
+
+    /// Streaming source core for the external source.
+    stream_source_core: Option<StreamSourceCore<S>>,
+
+    /// Metrics for monitoring.
+    metrics: Arc<StreamingMetrics>,
+
+    /// Receiver of the barrier channel.
+    barrier_receiver: Option<UnboundedReceiver<Barrier>>,
+
+    /// System parameter reader to read the barrier interval.
+    system_params: SystemParamsReaderRef,
+
+    /// Control options at the connector level.
+    source_ctrl_opts: SourceCtrlOpts,
+
+    /// Config for the connector node.
+    connector_params: ConnectorParams,
+}
+
+impl<S: StateStore> FsListExecutor<S> {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        actor_ctx: ActorContextRef,
+        schema: Schema,
+        pk_indices: PkIndices,
+        stream_source_core: Option<StreamSourceCore<S>>,
+        metrics: Arc<StreamingMetrics>,
+        barrier_receiver: UnboundedReceiver<Barrier>,
+        system_params: SystemParamsReaderRef,
+        executor_id: u64,
+        source_ctrl_opts: SourceCtrlOpts,
+        connector_params: ConnectorParams,
+    ) -> Self {
+        Self {
+            actor_ctx,
+            identity: format!("FsListExecutor {:X}", executor_id),
+            schema,
+            pk_indices,
+            stream_source_core,
+            metrics,
+            barrier_receiver: Some(barrier_receiver),
+            system_params,
+            source_ctrl_opts,
+            connector_params,
+        }
+    }
+
+    async fn build_chunked_paginate_stream(
+        &self,
+        source_desc: &SourceDesc,
+    ) -> StreamExecutorResult<BoxTryStream<StreamChunk>> {
+        let stream = source_desc
+            .source
+            .source_lister()
+            .await
+            .map_err(StreamExecutorError::connector_error)?;
+
+        Ok(stream
+            .map(|item| item.map(Self::map_fs_page_to_chunk))
+            .boxed())
+    }
+
+    fn map_fs_page_to_chunk(page: FsPage) -> StreamChunk {
+        let rows = page
+            .into_iter()
+            .map(|split| {
+                (
+                    Op::Insert,
+                    OwnedRow::new(vec![
+                        Some(ScalarImpl::Utf8(split.name.into_boxed_str())),
+                        Some(ScalarImpl::Timestamp(split.timestamp)),
+                        Some(ScalarImpl::Int64(split.size as i64)),
+                    ]),
+                )
+            })
+            .collect::<Vec<_>>();
+        StreamChunk::from_rows(
+            &rows,
+            &[DataType::Varchar, DataType::Timestamp, DataType::Int64],
+        )
+    }
+
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn into_stream(mut self) {
+        let mut barrier_receiver = self.barrier_receiver.take().unwrap();
+        let barrier = barrier_receiver
+            .recv()
+            .instrument_await("source_recv_first_barrier")
+            .await
+            .ok_or_else(|| {
+                anyhow!(
+                    "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
+                    self.actor_ctx.id,
+                    self.stream_source_core.as_ref().unwrap().source_id
+                )
+            })?;
+
+        let mut core = self.stream_source_core.unwrap();
+
+        // Build the source description from the builder.
+        let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
+        let source_desc = source_desc_builder
+            .build()
+            .map_err(StreamExecutorError::connector_error)?;
+
+        // Return the ownership of `stream_source_core` to the source executor.
+        self.stream_source_core = Some(core);
+
+        let chunked_paginate_stream = self.build_chunked_paginate_stream(&source_desc).await?;
+
+        let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
+        let mut stream = StreamReaderWithPause::<true, StreamChunk>::new(
+            barrier_stream,
+            chunked_paginate_stream,
+        );
+
+        if barrier.is_pause_on_startup() {
+            stream.pause_stream();
+        }
+
+        yield Message::Barrier(barrier);
+
+        while let Some(msg) = stream.next().await {
+            match msg {
+                Err(_) => (),
+                Ok(msg) => match msg {
+                    // A barrier arrives.
+                    Either::Left(msg) => match &msg {
+                        Message::Barrier(barrier) => {
+                            if let Some(mutation) = barrier.mutation.as_deref() {
+                                match mutation {
+                                    Mutation::Pause => stream.pause_stream(),
+                                    Mutation::Resume => stream.resume_stream(),
+                                    _ => (),
+                                }
+                            }
+
+                            // Propagate the barrier.
+                            yield msg;
+                        }
+                        // Only barriers can be received here.
+                        _ => unreachable!(),
+                    },
+                    // A chunked `FsPage` arrives.
+                    Either::Right(chunk) => {
+                        yield Message::Chunk(chunk);
+                    }
+                },
+            }
+        }
+    }
+}
+
+impl<S: StateStore> Executor for FsListExecutor<S> {
+    fn execute(self: Box<Self>) -> BoxedMessageStream {
+        self.into_stream().boxed()
+    }
+
+    fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    fn pk_indices(&self) -> PkIndicesRef<'_> {
+        &self.pk_indices
+    }
+
+    fn identity(&self) -> &str {
+        self.identity.as_str()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+    use maplit::{convert_args, hashmap};
+    use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
+    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
+    use risingwave_common::types::DataType;
+    use risingwave_pb::catalog::StreamSourceInfo;
+    use risingwave_pb::plan_common::PbRowFormatType;
+    use risingwave_source::connector_test_utils::create_source_desc_builder;
+    use risingwave_storage::memory::MemoryStateStore;
+    use tokio::sync::mpsc::unbounded_channel;
+
+    use super::*;
+    use crate::executor::ActorContext;
+
+    const MOCK_SOURCE_NAME: &str = "mock_source";
+
+    #[tokio::test]
+    async fn test_fs_list_executor() {
+        let table_id = TableId::default();
+        let schema = Schema {
+            fields: vec![
+                Field::with_name(DataType::Varchar, "filename"),
+                Field::with_name(DataType::Timestamp, "timestamp"),
+                Field::with_name(DataType::Int64, "size"),
+            ],
+        };
+        let row_id_index = None;
+        let pk_indices = vec![0];
+        let source_info = StreamSourceInfo {
+            row_format: PbRowFormatType::Native as i32,
+            ..Default::default()
+        };
+        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
+        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();
+
+        let properties: HashMap<String, String> = convert_args!(hashmap!(
+            "connector" => "s3",
+            "s3.region_name" => "us-east-1",
+            "s3.endpoint_url" => "http://[::1]:9090",
+            "s3.bucket_name" => "test",
+            "s3.credentials.access" => "any",
+            "s3.credentials.secret" => "any",
+        ));
+        let source_desc_builder =
+            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
+        let split_state_store = SourceStateTableHandler::from_table_catalog(
+            &default_source_internal_table(0x2333),
+            MemoryStateStore::new(),
+        )
+        .await;
+        let core = StreamSourceCore::<MemoryStateStore> {
+            source_id: table_id,
+            column_ids,
+            source_desc_builder: Some(source_desc_builder),
+            stream_source_splits: HashMap::new(),
+            split_state_store,
+            state_cache: HashMap::new(),
+            source_name: MOCK_SOURCE_NAME.to_string(),
+        };
+
+        let system_params_manager = LocalSystemParamsManager::for_test();
+
+        let executor = FsListExecutor::new(
+            ActorContext::create(0),
+            schema,
+            pk_indices,
+            Some(core),
+            Arc::new(StreamingMetrics::unused()),
+            barrier_rx,
+            system_params_manager.get_params(),
+            1,
+            SourceCtrlOpts::default(),
+            ConnectorParams::default(),
+        );
+        let mut executor = Box::new(executor).execute();
+
+        let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add {
+            adds: HashMap::new(),
+            added_actors: HashSet::new(),
+            splits: hashmap! {
+                ActorId::default() => vec![],
+            },
+            pause: false,
+        });
+        barrier_tx.send(init_barrier).unwrap();
+
+        // Consume the init barrier.
+        executor.next().await.unwrap().unwrap();
+
+        // Consume the second barrier.
+        let barrier = Barrier::new_test_barrier(2);
+        barrier_tx.send(barrier).unwrap();
+        let msg = executor.next().await.unwrap().unwrap(); // page chunk
+        executor.next().await.unwrap().unwrap(); // barrier
+
+        println!("barrier 2: {:#?}", msg);
+
+        // Consume the third barrier.
+        let barrier = Barrier::new_test_barrier(3);
+        barrier_tx.send(barrier).unwrap();
+        let msg = executor.next().await.unwrap().unwrap(); // page chunk
+        executor.next().await.unwrap().unwrap(); // barrier
+
+        println!("barrier 3: {:#?}", msg);
+    }
+}
diff --git a/src/stream/src/executor/source_v2/mod.rs b/src/stream/src/executor/source_v2/mod.rs
new file mode 100644
index 0000000000000..8c3cb0a1a538a
--- /dev/null
+++ b/src/stream/src/executor/source_v2/mod.rs
@@ -0,0 +1,16 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod fetch_executor;
+pub mod list_executor;
\ No newline at end of file
diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs
index 14b01cc1ff861..60468ec6ed912 100644
--- a/src/stream/src/from_proto/source/fs_fetch.rs
+++ b/src/stream/src/from_proto/source/fs_fetch.rs
@@ -12,11 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
+use risingwave_common::types::DataType;
+use risingwave_connector::source::SourceCtrlOpts;
 use risingwave_pb::stream_plan::StreamFsFetchNode;
+use risingwave_source::source_desc::SourceDescBuilder;
 use risingwave_storage::StateStore;
 
 use crate::error::StreamResult;
-use crate::executor::BoxedExecutor;
+use crate::executor::source_v2::fetch_executor::FsFetchExecutor;
+use crate::executor::{BoxedExecutor, Executor, SourceStateTableHandler, StreamSourceCore};
 use crate::from_proto::ExecutorBuilder;
 use crate::task::{ExecutorParams, LocalStreamManagerCore};
 
@@ -30,8 +35,71 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
         params: ExecutorParams,
         node: &Self::Node,
         store: impl StateStore,
-        stream: &mut LocalStreamManagerCore,
+        _stream: &mut LocalStreamManagerCore,
     ) -> StreamResult<BoxedExecutor> {
-        todo!()
+        let [upstream]: [_; 1] = params.input.try_into().unwrap();
+
+        let source = node.node_inner.as_ref().unwrap();
+
+        let source_id = TableId::new(source.source_id);
+        let source_name = source.source_name.clone();
+        let source_info = source.get_info()?;
+
+        let source_desc_builder = SourceDescBuilder::new(
+            source.columns.clone(),
+            params.env.source_metrics(),
+            source.row_id_index.map(|x| x as _),
+            source.properties.clone(),
+            source_info.clone(),
+            params.env.connector_params(),
+            params.env.config().developer.connector_message_buffer_size,
+            params.pk_indices.clone(),
+        );
+
+        let source_ctrl_opts = SourceCtrlOpts {
+            chunk_size: params.env.config().developer.chunk_size,
+        };
+
+        let column_ids: Vec<_> = source
+            .columns
+            .iter()
+            .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id))
+            .collect();
+        let fields = source
+            .columns
+            .iter()
+            .map(|prost| {
+                let column_desc = prost.column_desc.as_ref().unwrap();
+                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
+                let name = column_desc.name.clone();
+                Field::with_name(data_type, name)
+            })
+            .collect();
+        let schema =
+            Schema::new(fields);
+
+        let state_table_handler = SourceStateTableHandler::from_table_catalog(
+            source.state_table.as_ref().unwrap(),
+            store.clone(),
+        )
+        .await;
+        let stream_source_core = StreamSourceCore::new(
+            source_id,
+            source_name,
+            column_ids,
+            source_desc_builder,
+            state_table_handler,
+        );
+
+        Ok(FsFetchExecutor::new(
+            params.actor_context,
+            schema,
+            params.pk_indices,
+            stream_source_core,
+            params.executor_id,
+            upstream,
+            source_ctrl_opts,
+            params.env.connector_params(),
+        )
+        .boxed())
     }
 }
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 77bbcc53e69c5..ee8ab9eac02a8 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -16,7 +16,7 @@ use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_connector::source::external::{ExternalTableType, SchemaTableName};
-use risingwave_connector::source::SourceCtrlOpts;
+use risingwave_connector::source::{SourceCtrlOpts, S3_V2_CONNECTOR};
 use risingwave_pb::stream_plan::SourceNode;
 use risingwave_source::source_desc::SourceDescBuilder;
 use risingwave_storage::panic_store::PanicStateStore;
@@ -26,10 +26,12 @@ use super::*;
 use crate::executor::external::ExternalStorageTable;
 use crate::executor::source::StreamSourceCore;
 use crate::executor::source_executor::SourceExecutor;
+use crate::executor::source_v2::list_executor::FsListExecutor;
 use crate::executor::state_table_handler::SourceStateTableHandler;
 use crate::executor::{CdcBackfillExecutor, FlowControlExecutor, FsSourceExecutor};
 
 const FS_CONNECTORS: &[&str] = &["s3"];
+const FS_V2_CONNECTORS: &[&str] = &[S3_V2_CONNECTOR];
 
 pub struct SourceExecutorBuilder;
 
 #[async_trait::async_trait]
@@ -115,6 +117,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                     .map(|c| c.to_ascii_lowercase())
                     .unwrap_or_default();
                 let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str());
+                let is_fs_v2_connector = FS_V2_CONNECTORS.contains(&connector.as_str());
 
                 if is_fs_connector {
                     FsSourceExecutor::new(
@@ -129,6 +132,20 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                         source_ctrl_opts,
                     )?
                     .boxed()
+                } else if is_fs_v2_connector {
+                    FsListExecutor::new(
+                        params.actor_context.clone(),
+                        schema.clone(),
+                        params.pk_indices.clone(),
+                        Some(stream_source_core),
+                        params.executor_stats.clone(),
+                        barrier_receiver,
+                        system_params,
+                        params.executor_id,
+                        source_ctrl_opts.clone(),
+                        params.env.connector_params(),
+                    )
+                    .boxed()
                 } else {
                     let source_exec = SourceExecutor::new(
                         params.actor_context.clone(),