From c82268b7c2e98af98e72dc50a4892f55d21e4565 Mon Sep 17 00:00:00 2001 From: Rossil <40714231+Rossil2012@users.noreply.github.com> Date: Tue, 26 Sep 2023 11:35:12 +0800 Subject: [PATCH] implement list executor and S3SourceLister --- src/connector/src/source/base.rs | 11 +- .../src/source/filesystem/file_common.rs | 18 +- src/connector/src/source/filesystem/mod.rs | 3 +- .../src/source/filesystem/s3_v2/lister.rs | 138 +++++++ .../src/source/filesystem/s3_v2/mod.rs | 15 + src/source/src/connector_source.rs | 16 +- src/stream/src/executor/mod.rs | 1 + .../src/executor/source_v2/list_executor.rs | 342 ++++++++++++++++++ src/stream/src/executor/source_v2/mod.rs | 15 + 9 files changed, 554 insertions(+), 5 deletions(-) create mode 100644 src/connector/src/source/filesystem/s3_v2/lister.rs create mode 100644 src/connector/src/source/filesystem/s3_v2/mod.rs create mode 100644 src/stream/src/executor/source_v2/list_executor.rs create mode 100644 src/stream/src/executor/source_v2/mod.rs diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index d02645385b81..5205dea54094 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -33,7 +33,7 @@ use risingwave_rpc_client::ConnectorClient; use serde::de::DeserializeOwned; use super::datagen::DatagenMeta; -use super::filesystem::FsSplit; +use super::filesystem::{FsPage, FsSplit}; use super::google_pubsub::GooglePubsubMeta; use super::kafka::KafkaMeta; use super::monitor::SourceMetrics; @@ -499,6 +499,15 @@ pub trait SplitMetaData: Sized { /// [`None`] and the created source stream will be a pending stream. pub type ConnectorState = Option>; +#[async_trait] +pub trait SourceLister: Sized { + type Split: SplitMetaData + Send; + type Properties; + + async fn new(properties: Self::Properties) -> Result; + fn paginate(self) -> BoxTryStream>; +} + #[cfg(test)] mod tests { use maplit::*; diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index d4328289b547..2d816e52ccc7 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. use anyhow::anyhow; -use risingwave_common::types::JsonbVal; +use risingwave_common::types::{JsonbVal, Timestamp}; use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; @@ -55,3 +55,19 @@ impl FsSplit { } } } + +pub struct FsPage { + pub name: String, + pub size: usize, + pub timestamp: Timestamp, +} + +impl FsPage { + pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self { + Self { + name, + size, + timestamp, + } + } +} diff --git a/src/connector/src/source/filesystem/mod.rs b/src/connector/src/source/filesystem/mod.rs index 729fb376ecc6..3d40763caf18 100644 --- a/src/connector/src/source/filesystem/mod.rs +++ b/src/connector/src/source/filesystem/mod.rs @@ -16,5 +16,6 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR}; mod file_common; pub mod nd_streaming; -pub use file_common::FsSplit; +pub use file_common::{FsPage, FsSplit}; mod s3; +pub mod s3_v2; diff --git a/src/connector/src/source/filesystem/s3_v2/lister.rs b/src/connector/src/source/filesystem/s3_v2/lister.rs new file mode 100644 index 000000000000..2b8789d3db4e --- /dev/null +++ b/src/connector/src/source/filesystem/s3_v2/lister.rs @@ -0,0 +1,138 @@ +use anyhow::{anyhow, Context, Result}; +use async_trait::async_trait; +use aws_sdk_s3::error::DisplayErrorContext; +use aws_sdk_s3::Client; +use futures_async_stream::try_stream; +use itertools::Itertools; +use risingwave_common::error::RwError; +use risingwave_common::types::Timestamp; + +use crate::aws_auth::AwsAuthProps; +use crate::aws_utils::{default_conn_config, s3_client}; +use crate::source::filesystem::file_common::{FsPage, FsSplit}; +use crate::source::filesystem::S3Properties; +use crate::source::{BoxTryStream, SourceLister}; + +/// Get the prefix from a glob +fn get_prefix(glob: &str) -> String { + let mut escaped = false; + let mut escaped_filter = false; + glob.chars() + .take_while(|c| match (c, &escaped) { + ('*', false) => false, + ('[', false) => false, + ('{', false) => false, + ('\\', false) => { + escaped = true; + true + } + (_, false) => true, + (_, true) => { + escaped = false; + true + } + }) + .filter(|c| match (c, &escaped_filter) { + (_, true) => { + escaped_filter = false; + true + } + ('\\', false) => { + escaped_filter = true; + false + } + (_, _) => true, + }) + .collect() +} + +pub struct S3SourceLister { + client: Client, + // prefix is used to reduce the number of objects to be listed + prefix: Option, + matcher: Option, + bucket_name: String, +} + +impl S3SourceLister { + #[try_stream(boxed, ok = Vec, error = RwError)] + async fn paginate_inner(self) { + loop { // start a new round + let mut next_continuation_token = None; + loop { // loop to paginate + let mut req = self + .client + .list_objects_v2() + .bucket(&self.bucket_name) + .set_prefix(self.prefix.clone()); + if let Some(continuation_token) = next_continuation_token.take() { + req = req.continuation_token(continuation_token); + } + let mut res = req + .send() + .await + .map_err(|e| anyhow!(DisplayErrorContext(e)))?; + + yield res + .contents + .take() + .unwrap_or_default() + .iter() + .filter(|obj| obj.key().is_some()) + .filter(|obj| { + self.matcher + .as_ref() + .map(|m| m.matches(obj.key().unwrap())) + .unwrap_or(true) + }) + .into_iter() + .map(|obj| { + let aws_ts = obj.last_modified().unwrap(); + FsPage::new( + obj.key().unwrap().to_owned(), + obj.size() as usize, + Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()), + ) + }) + .collect_vec(); + + if res.is_truncated() { + next_continuation_token = Some(res.next_continuation_token.unwrap()) + } else { + break; + } + } + } + } +} + +#[async_trait] +impl SourceLister for S3SourceLister { + type Properties = S3Properties; + type Split = FsSplit; + + async fn new(properties: Self::Properties) -> Result { + let config = AwsAuthProps::from(&properties); + let sdk_config = config.build_config().await?; + let s3_client = s3_client(&sdk_config, Some(default_conn_config())); + let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() { + let prefix = get_prefix(pattern); + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(prefix), Some(matcher)) + } else { + (None, None) + }; + + Ok(Self { + bucket_name: properties.bucket_name, + matcher, + prefix, + client: s3_client, + }) + } + + fn paginate(self) -> BoxTryStream> { + self.paginate_inner() + } +} diff --git a/src/connector/src/source/filesystem/s3_v2/mod.rs b/src/connector/src/source/filesystem/s3_v2/mod.rs new file mode 100644 index 000000000000..6fab862daca1 --- /dev/null +++ b/src/connector/src/source/filesystem/s3_v2/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod lister; diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 445bf0f6dbb9..90c2259ef100 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -25,9 +25,11 @@ use risingwave_common::error::{internal_error, Result}; use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; +use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister; +use risingwave_connector::source::filesystem::FsPage; use risingwave_connector::source::{ - create_split_reader, BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState, - SourceColumnDesc, SourceContext, SplitReader, + create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties, + ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SplitReader, }; #[derive(Clone, Debug)] @@ -74,6 +76,16 @@ impl ConnectorSource { .collect::>>() } + pub async fn source_lister(&self) -> Result>> { + let config = self.config.clone(); + let lister = match config { + ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?, + _ => unreachable!(), + }; + + Ok(lister.paginate()) + } + pub async fn stream_reader( &self, state: ConnectorState, diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 8fa7a5d818cc..6fcd928e2c52 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -87,6 +87,7 @@ mod sink; mod sort; mod sort_buffer; pub mod source; +pub mod source_v2; mod stateless_simple_agg; mod stream_reader; pub mod subtask; diff --git a/src/stream/src/executor/source_v2/list_executor.rs b/src/stream/src/executor/source_v2/list_executor.rs new file mode 100644 index 000000000000..9b73ee1a2015 --- /dev/null +++ b/src/stream/src/executor/source_v2/list_executor.rs @@ -0,0 +1,342 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use anyhow::anyhow; +use either::Either; +use futures::StreamExt; +use futures_async_stream::try_stream; +use risingwave_common::array::Op; +use risingwave_common::catalog::Schema; +use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_connector::source::filesystem::FsPage; +use risingwave_connector::source::{ + BoxTryStream, SourceCtrlOpts, StreamChunkWithState +}; +use risingwave_connector::ConnectorParams; +use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::UnboundedReceiver; +use crate::executor::error::StreamExecutorError; +use crate::executor::monitor::StreamingMetrics; +use crate::executor::stream_reader::StreamReaderWithPause; +use crate::executor::*; + +pub struct ListExecutor { + actor_ctx: ActorContextRef, + + identity: String, + + schema: Schema, + + pk_indices: PkIndices, + + /// Streaming source for external + stream_source_core: Option>, + + /// Metrics for monitor. + metrics: Arc, + + /// Receiver of barrier channel. + barrier_receiver: Option>, + + /// System parameter reader to read barrier interval + system_params: SystemParamsReaderRef, + + // control options for connector level + source_ctrl_opts: SourceCtrlOpts, + + // config for the connector node + connector_params: ConnectorParams, +} + +impl ListExecutor { + #[allow(clippy::too_many_arguments)] + pub fn new( + actor_ctx: ActorContextRef, + schema: Schema, + pk_indices: PkIndices, + stream_source_core: Option>, + metrics: Arc, + barrier_receiver: UnboundedReceiver, + system_params: SystemParamsReaderRef, + executor_id: u64, + source_ctrl_opts: SourceCtrlOpts, + connector_params: ConnectorParams, + ) -> Self { + Self { + actor_ctx, + identity: format!("ListExecutor {:X}", executor_id), + schema, + pk_indices, + stream_source_core, + metrics, + barrier_receiver: Some(barrier_receiver), + system_params, + source_ctrl_opts, + connector_params, + } + } + + async fn build_fs_source_lister( + &self, + source_desc: &SourceDesc, + ) -> StreamExecutorResult>> { + source_desc + .source + .source_lister() + .await + .map_err(StreamExecutorError::connector_error) + } + + async fn fetch_one_page_chunk( + &self, + paginate_stream: &mut BoxTryStream>, + ) -> StreamExecutorResult { + match paginate_stream.next().await { + Some(Ok(page)) => { + let rows = page + .into_iter() + .map(|split| { + ( + Op::Insert, + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(split.name.into_boxed_str())), + Some(ScalarImpl::Timestamp(split.timestamp)), + Some(ScalarImpl::Int64(split.size as i64)), + ]), + ) + }) + .collect::>(); + Ok(StreamChunk::from_rows( + &rows, + &[DataType::Varchar, DataType::Timestamp, DataType::Int64], + )) + }, + Some(Err(err)) => Err(StreamExecutorError::connector_error(err)), + None => unreachable!(), // paginate_stream never ends + } + } + + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(mut self) { + let mut barrier_receiver = self.barrier_receiver.take().unwrap(); + let barrier = barrier_receiver + .recv() + .instrument_await("source_recv_first_barrier") + .await + .ok_or_else(|| { + anyhow!( + "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}", + self.actor_ctx.id, + self.stream_source_core.as_ref().unwrap().source_id + ) + })?; + + let mut core = self.stream_source_core.unwrap(); + + // Build source description from the builder. + let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap(); + let source_desc = source_desc_builder + .build() + .map_err(StreamExecutorError::connector_error)?; + + // TODO: init state store epoch + + // Return the ownership of `stream_source_core` to the source executor. + self.stream_source_core = Some(core); + + // TODO: recover state + let mut paginate_stream = self.build_fs_source_lister(&source_desc).await?; + + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut stream = StreamReaderWithPause::::new( + barrier_stream, + tokio_stream::pending().boxed(), + ); + + if barrier.is_pause_on_startup() { + stream.pause_stream(); + } + + yield Message::Barrier(barrier); + + while let Some(msg) = stream.next().await { + match msg { + Err(_) => (), + Ok(msg) => match msg { + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + let mut is_pause_resume = false; + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => { + stream.pause_stream(); + is_pause_resume = true; + }, + Mutation::Resume => { + stream.resume_stream(); + is_pause_resume = true; + } + _ => (), + } + } + + if !is_pause_resume { + // TODO: persist some state here + let chunk = self.fetch_one_page_chunk(&mut paginate_stream).await?; + yield Message::Chunk(chunk); + } + + yield msg; // propagate the barrier + } + // Only barrier can be received. + _ => unreachable!(), + }, + // Right arm is always pending. + _ => unreachable!(), + }, + } + } + } +} + +impl Executor for ListExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.into_stream().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use maplit::{convert_args, hashmap}; + use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; + use risingwave_common::system_param::local_manager::LocalSystemParamsManager; + use risingwave_common::types::DataType; + use risingwave_pb::catalog::StreamSourceInfo; + use risingwave_pb::plan_common::PbRowFormatType; + use risingwave_source::connector_test_utils::create_source_desc_builder; + use risingwave_storage::memory::MemoryStateStore; + use tokio::sync::mpsc::unbounded_channel; + + use super::*; + use crate::executor::ActorContext; + + const MOCK_SOURCE_NAME: &str = "mock_source"; + + #[tokio::test] + async fn test_fs_list_executor() { + let table_id = TableId::default(); + let schema = Schema { + fields: vec![ + Field::with_name(DataType::Varchar, "filename"), + Field::with_name(DataType::Timestamp, "timestamp"), + Field::with_name(DataType::Int64, "size"), + ], + }; + let row_id_index = None; + let pk_indices = vec![0]; + let source_info = StreamSourceInfo { + row_format: PbRowFormatType::Native as i32, + ..Default::default() + }; + let (barrier_tx, barrier_rx) = unbounded_channel::(); + let column_ids = vec![0].into_iter().map(ColumnId::from).collect(); + + let properties: HashMap = convert_args!(hashmap!( + "connector" => "s3", + "s3.region_name" => "us-east-1", + "s3.endpoint_url" => "http://[::1]:9090", + "s3.bucket_name" => "test", + "s3.credentials.access" => "any", + "s3.credentials.secret" => "any", + )); + let source_desc_builder = + create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]); + let split_state_store = SourceStateTableHandler::from_table_catalog( + &default_source_internal_table(0x2333), + MemoryStateStore::new(), + ) + .await; + let core = StreamSourceCore:: { + source_id: table_id, + column_ids, + source_desc_builder: Some(source_desc_builder), + stream_source_splits: HashMap::new(), + split_state_store, + state_cache: HashMap::new(), + source_name: MOCK_SOURCE_NAME.to_string(), + }; + + let system_params_manager = LocalSystemParamsManager::for_test(); + + let executor = ListExecutor::new( + ActorContext::create(0), + schema, + pk_indices, + Some(core), + Arc::new(StreamingMetrics::unused()), + barrier_rx, + system_params_manager.get_params(), + 1, + SourceCtrlOpts::default(), + ConnectorParams::default(), + ); + let mut executor = Box::new(executor).execute(); + + let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add { + adds: HashMap::new(), + added_actors: HashSet::new(), + splits: hashmap! { + ActorId::default() => vec![], + }, + pause: false, + }); + barrier_tx.send(init_barrier).unwrap(); + + // Consume init barrier. + executor.next().await.unwrap().unwrap(); + + // Consume the second barrier. + let barrier = Barrier::new_test_barrier(2); + barrier_tx.send(barrier).unwrap(); + let msg = executor.next().await.unwrap().unwrap(); // page chunk + executor.next().await.unwrap().unwrap(); // barrier + + println!("barrier 2: {:#?}", msg); + + // Consume the third barrier. + let barrier = Barrier::new_test_barrier(3); + barrier_tx.send(barrier).unwrap(); + let msg = executor.next().await.unwrap().unwrap(); // page chunk + executor.next().await.unwrap().unwrap(); // barrier + + println!("barrier 3: {:#?}", msg); + } +} diff --git a/src/stream/src/executor/source_v2/mod.rs b/src/stream/src/executor/source_v2/mod.rs new file mode 100644 index 000000000000..a6cf4823bdd2 --- /dev/null +++ b/src/stream/src/executor/source_v2/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod list_executor;