Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): implement list executor and S3SourceLister #12531

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_rpc_client::ConnectorClient;
use serde::de::DeserializeOwned;

use super::datagen::DatagenMeta;
use super::filesystem::FsSplit;
use super::filesystem::{FsPage, FsSplit};
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
Expand Down Expand Up @@ -499,6 +499,15 @@ pub trait SplitMetaData: Sized {
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

#[async_trait]
pub trait SourceLister: Sized {
type Split: SplitMetaData + Send;
type Properties;

async fn new(properties: Self::Properties) -> Result<Self>;
fn paginate(self) -> BoxTryStream<Vec<FsPage>>;
}

#[cfg(test)]
mod tests {
use maplit::*;
Expand Down
18 changes: 17 additions & 1 deletion src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::anyhow;
use risingwave_common::types::JsonbVal;
use risingwave_common::types::{JsonbVal, Timestamp};
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand Down Expand Up @@ -55,3 +55,19 @@ impl FsSplit {
}
}
}

pub struct FsPage {
pub name: String,
pub size: usize,
pub timestamp: Timestamp,
}

impl FsPage {
pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self {
Self {
name,
size,
timestamp,
}
}
}
3 changes: 2 additions & 1 deletion src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};

mod file_common;
pub mod nd_streaming;
pub use file_common::FsSplit;
pub use file_common::{FsPage, FsSplit};
mod s3;
pub mod s3_v2;
152 changes: 152 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/lister.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{anyhow, Context, Result};
Rossil2012 marked this conversation as resolved.
Show resolved Hide resolved
use async_trait::async_trait;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::error::RwError;
use risingwave_common::types::Timestamp;

use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::source::filesystem::file_common::{FsPage, FsSplit};
use crate::source::filesystem::S3Properties;
use crate::source::{BoxTryStream, SourceLister};

/// Get the prefix from a glob
fn get_prefix(glob: &str) -> String {
let mut escaped = false;
let mut escaped_filter = false;
glob.chars()
.take_while(|c| match (c, &escaped) {
('*', false) => false,
('[', false) => false,
('{', false) => false,
('\\', false) => {
escaped = true;
true
}
(_, false) => true,
(_, true) => {
escaped = false;
true
}
})
.filter(|c| match (c, &escaped_filter) {
(_, true) => {
escaped_filter = false;
true
}
('\\', false) => {
escaped_filter = true;
false
}
(_, _) => true,
})
.collect()
}

pub struct S3SourceLister {
client: Client,
// prefix is used to reduce the number of objects to be listed
prefix: Option<String>,
matcher: Option<glob::Pattern>,
bucket_name: String,
}

impl S3SourceLister {
#[try_stream(boxed, ok = Vec<FsPage>, error = RwError)]
async fn paginate_inner(self) {
'round: loop { // start a new round
let mut next_continuation_token = None;
'truncated: loop { // loop to paginate
let mut req = self
.client
.list_objects_v2()
.bucket(&self.bucket_name)
.set_prefix(self.prefix.clone());
if let Some(continuation_token) = next_continuation_token.take() {
req = req.continuation_token(continuation_token);
}
let mut res = req
.send()
.await
.map_err(|e| anyhow!(DisplayErrorContext(e)))?;

yield res
.contents
.take()
.unwrap_or_default()
.iter()
.filter(|obj| obj.key().is_some())
.filter(|obj| {
self.matcher
.as_ref()
.map(|m| m.matches(obj.key().unwrap()))
.unwrap_or(true)
})
.into_iter()
.map(|obj| {
let aws_ts = obj.last_modified().unwrap();
FsPage::new(
obj.key().unwrap().to_owned(),
obj.size() as usize,
Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()),
)
})
.collect_vec();

if res.is_truncated() {
next_continuation_token = Some(res.next_continuation_token.unwrap())
} else {
break 'truncated;
}
}
}
}
}

#[async_trait]
impl SourceLister for S3SourceLister {
type Properties = S3Properties;
type Split = FsSplit;

async fn new(properties: Self::Properties) -> Result<Self> {
let config = AwsAuthProps::from(&properties);
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
let (prefix, matcher) = if let Some(pattern) = properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(prefix), Some(matcher))
} else {
(None, None)
};

Ok(Self {
bucket_name: properties.bucket_name,
matcher,
prefix,
client: s3_client,
})
}

fn paginate(self) -> BoxTryStream<Vec<FsPage>> {
self.paginate_inner()
}
}
15 changes: 15 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod lister;
16 changes: 14 additions & 2 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use risingwave_common::error::{internal_error, Result};
use risingwave_common::util::select_all;
use risingwave_connector::dispatch_source_prop;
use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister;
use risingwave_connector::source::filesystem::FsPage;
use risingwave_connector::source::{
create_split_reader, BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState,
SourceColumnDesc, SourceContext, SplitReader,
create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SplitReader,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -74,6 +76,16 @@ impl ConnectorSource {
.collect::<Result<Vec<SourceColumnDesc>>>()
}

pub async fn source_lister(&self) -> Result<BoxTryStream<Vec<FsPage>>> {
let config = self.config.clone();
let lister = match config {
ConnectorProperties::S3(prop) => S3SourceLister::new(*prop).await?,
_ => unreachable!(),
};

Ok(lister.paginate())
}

pub async fn stream_reader(
&self,
state: ConnectorState,
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod sink;
mod sort;
mod sort_buffer;
pub mod source;
pub mod source_v2;
mod stateless_simple_agg;
mod stream_reader;
pub mod subtask;
Expand Down
Loading
Loading