Skip to content

Commit

Permalink
implement list executor and S3SourceLister
Browse files Browse the repository at this point in the history
  • Loading branch information
Rossil2012 committed Sep 26, 2023
1 parent 5a2ee63 commit c82268b
Show file tree
Hide file tree
Showing 9 changed files with 554 additions and 5 deletions.
11 changes: 10 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_rpc_client::ConnectorClient;
use serde::de::DeserializeOwned;

use super::datagen::DatagenMeta;
use super::filesystem::FsSplit;
use super::filesystem::{FsPage, FsSplit};
use super::google_pubsub::GooglePubsubMeta;
use super::kafka::KafkaMeta;
use super::monitor::SourceMetrics;
Expand Down Expand Up @@ -499,6 +499,15 @@ pub trait SplitMetaData: Sized {
/// [`None`] and the created source stream will be a pending stream.
pub type ConnectorState = Option<Vec<SplitImpl>>;

/// A connector-side lister that enumerates the entries of a source as a stream.
///
/// An implementation is built from its connector properties with [`SourceLister::new`]
/// and then consumed by [`SourceLister::paginate`], which hands back a stream whose
/// items are batches (`Vec`) of [`FsPage`] entries.
#[async_trait]
pub trait SourceLister: Sized {
/// Split type associated with this lister.
// NOTE(review): not referenced by either method of this trait — confirm whether it is still needed.
type Split: SplitMetaData + Send;
/// Connector properties consumed by [`SourceLister::new`] to build the lister.
type Properties;

/// Builds the lister from `properties`; returns an error if it cannot be constructed.
async fn new(properties: Self::Properties) -> Result<Self>;
/// Consumes the lister and returns a fallible stream of [`FsPage`] batches.
fn paginate(self) -> BoxTryStream<Vec<FsPage>>;
}

#[cfg(test)]
mod tests {
use maplit::*;
Expand Down
18 changes: 17 additions & 1 deletion src/connector/src/source/filesystem/file_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::anyhow;
use risingwave_common::types::JsonbVal;
use risingwave_common::types::{JsonbVal, Timestamp};
use serde::{Deserialize, Serialize};

use crate::source::{SplitId, SplitMetaData};
Expand Down Expand Up @@ -55,3 +55,19 @@ impl FsSplit {
}
}
}

/// Metadata describing a single entry produced by a source lister.
///
/// For example, the S3 lister creates one `FsPage` per listed object, filling it with
/// the object's key, its reported size and its last-modified time.
pub struct FsPage {
/// Name of the entry (the object key when produced by the S3 lister).
pub name: String,
/// Size of the entry — presumably in bytes; TODO confirm against the producing lister.
pub size: usize,
/// Timestamp attached to the entry (the last-modified time when produced by the S3 lister).
pub timestamp: Timestamp,
}

impl FsPage {
pub fn new(name: String, size: usize, timestamp: Timestamp) -> Self {
Self {
name,
size,
timestamp,
}
}
}
3 changes: 2 additions & 1 deletion src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ pub use s3::{S3FileReader, S3Properties, S3SplitEnumerator, S3_CONNECTOR};

mod file_common;
pub mod nd_streaming;
pub use file_common::FsSplit;
pub use file_common::{FsPage, FsSplit};
mod s3;
pub mod s3_v2;
138 changes: 138 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/lister.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::error::RwError;
use risingwave_common::types::Timestamp;

use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::source::filesystem::file_common::{FsPage, FsSplit};
use crate::source::filesystem::S3Properties;
use crate::source::{BoxTryStream, SourceLister};

/// Returns the literal prefix of a glob pattern.
///
/// The prefix is every character that precedes the first unescaped glob
/// metacharacter (`*`, `?`, `[` or `{`). A backslash escapes the character that
/// follows it: the backslash itself is dropped and the escaped character is kept
/// verbatim, so `a\*b` yields `a*b`. A trailing lone backslash is dropped.
///
/// The result is meant to narrow an object listing before the full pattern is
/// applied, so it must never be longer than what the pattern can literally match.
/// Stopping early (a shorter prefix) is always safe; stopping late would hide
/// matching objects. That is why `?` — which matches any single character — ends
/// the prefix as well.
///
/// NOTE(review): the `glob` crate documents bracket escapes such as `[*]` rather
/// than backslash escapes — confirm that treating `\` as an escape here agrees with
/// the matcher that is built from the same pattern.
fn get_prefix(glob: &str) -> String {
    let mut prefix = String::with_capacity(glob.len());
    let mut chars = glob.chars();
    // `while let` (not `for`) because the escape arm pulls a second char from the iterator.
    while let Some(c) = chars.next() {
        match c {
            // First unescaped metacharacter: the literal prefix ends here.
            '*' | '?' | '[' | '{' => break,
            // Escape: keep the next character literally, drop the backslash.
            '\\' => match chars.next() {
                Some(escaped) => prefix.push(escaped),
                // A dangling backslash at the end of the pattern contributes nothing.
                None => break,
            },
            _ => prefix.push(c),
        }
    }
    prefix
}

/// Lists the objects of one S3 bucket, optionally restricted by a glob pattern.
pub struct S3SourceLister {
/// S3 client used to issue the listing requests.
client: Client,
// Literal prefix derived from the match pattern; it is sent with every listing
// request to reduce the number of objects to be listed. `None` when no pattern is set.
prefix: Option<String>,
/// Compiled glob applied to each listed key; `None` means every key is accepted.
matcher: Option<glob::Pattern>,
/// Name of the bucket to list.
bucket_name: String,
}

impl S3SourceLister {
    /// Lists the bucket page by page, over and over again.
    ///
    /// Each item yielded by the stream is the content of one `list_objects_v2`
    /// response, reduced to the objects that have a key and (when a matcher is
    /// configured) whose key matches the glob. Once the last page of a listing has
    /// been yielded, a new round starts from the first page, so the stream only
    /// ends by yielding an error.
    ///
    /// # Errors
    ///
    /// The stream yields an error when
    /// - the listing request fails,
    /// - a listed object carries no last-modified time or reports a size that does
    ///   not fit in `usize` (e.g. a negative value),
    /// - a response is marked as truncated but provides no continuation token.
    ///
    /// These malformed-response cases are reported as errors rather than panics so
    /// that a bad response cannot take down the task polling the stream.
    #[try_stream(boxed, ok = Vec<FsPage>, error = RwError)]
    async fn paginate_inner(self) {
        loop {
            // Start a new round: list again from the first page.
            let mut next_continuation_token = None;
            loop {
                // Fetch one page of the listing.
                let mut req = self
                    .client
                    .list_objects_v2()
                    .bucket(&self.bucket_name)
                    .set_prefix(self.prefix.clone());
                if let Some(continuation_token) = next_continuation_token.take() {
                    req = req.continuation_token(continuation_token);
                }
                let mut res = req
                    .send()
                    .await
                    .map_err(|e| anyhow!(DisplayErrorContext(e)))?;

                let objects = res.contents.take().unwrap_or_default();
                let mut page = Vec::with_capacity(objects.len());
                for obj in &objects {
                    // Objects without a key cannot be referenced later; skip them.
                    let key = match obj.key() {
                        Some(key) => key,
                        None => continue,
                    };
                    // Without a matcher every key is accepted.
                    if let Some(matcher) = self.matcher.as_ref() {
                        if !matcher.matches(key) {
                            continue;
                        }
                    }
                    let aws_ts = obj.last_modified().ok_or_else(|| {
                        anyhow!("S3 object `{}` has no last-modified time", key)
                    })?;
                    // A plain `as usize` cast would silently wrap a negative size.
                    let size = usize::try_from(obj.size()).map_err(|_| {
                        anyhow!("S3 object `{}` reports an invalid size: {}", key, obj.size())
                    })?;
                    page.push(FsPage::new(
                        key.to_owned(),
                        size,
                        Timestamp::from_timestamp_uncheck(aws_ts.secs(), aws_ts.subsec_nanos()),
                    ));
                }
                yield page;

                if res.is_truncated() {
                    let token = res.next_continuation_token.take().ok_or_else(|| {
                        anyhow!("truncated S3 listing response carries no continuation token")
                    })?;
                    next_continuation_token = Some(token);
                } else {
                    // Last page of this round.
                    break;
                }
            }
        }
    }
}

#[async_trait]
impl SourceLister for S3SourceLister {
    type Properties = S3Properties;
    type Split = FsSplit;

    /// Builds an S3 lister from the connector properties.
    ///
    /// The AWS configuration and client are created first. If a match pattern is
    /// configured, it is compiled into a glob matcher and its literal prefix is
    /// extracted; otherwise both the matcher and the prefix stay `None`.
    ///
    /// # Errors
    ///
    /// Fails if the AWS configuration cannot be built or if the match pattern is
    /// not a valid glob.
    async fn new(properties: Self::Properties) -> Result<Self> {
        let sdk_config = AwsAuthProps::from(&properties).build_config().await?;
        let client = s3_client(&sdk_config, Some(default_conn_config()));

        let pattern = properties.match_pattern.as_ref();
        let matcher = pattern
            .map(|p| {
                glob::Pattern::new(p).with_context(|| format!("Invalid match_pattern: {}", p))
            })
            .transpose()?;
        let prefix = pattern.map(|p| get_prefix(p));

        Ok(Self {
            client,
            prefix,
            matcher,
            bucket_name: properties.bucket_name,
        })
    }

    /// Consumes the lister and returns the stream produced by `paginate_inner`.
    fn paginate(self) -> BoxTryStream<Vec<FsPage>> {
        Self::paginate_inner(self)
    }
}
15 changes: 15 additions & 0 deletions src/connector/src/source/filesystem/s3_v2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod lister;
16 changes: 14 additions & 2 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use risingwave_common::error::{internal_error, Result};
use risingwave_common::util::select_all;
use risingwave_connector::dispatch_source_prop;
use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use risingwave_connector::source::filesystem::s3_v2::lister::S3SourceLister;
use risingwave_connector::source::filesystem::FsPage;
use risingwave_connector::source::{
create_split_reader, BoxSourceWithStateStream, Column, ConnectorProperties, ConnectorState,
SourceColumnDesc, SourceContext, SplitReader,
create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, SourceColumnDesc, SourceContext, SourceLister, SplitReader,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -74,6 +76,16 @@ impl ConnectorSource {
.collect::<Result<Vec<SourceColumnDesc>>>()
}

/// Creates a lister for this source and returns its stream of [`FsPage`] batches.
///
/// # Errors
///
/// Returns an error if the S3 lister cannot be constructed.
///
/// # Panics
///
/// Panics if the connector properties are not the S3 variant; callers must only
/// invoke this on S3 sources.
pub async fn source_lister(&self) -> Result<BoxTryStream<Vec<FsPage>>> {
    match self.config.clone() {
        ConnectorProperties::S3(s3_props) => {
            let lister = S3SourceLister::new(*s3_props).await?;
            Ok(lister.paginate())
        }
        _ => unreachable!(),
    }
}

pub async fn stream_reader(
&self,
state: ConnectorState,
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod sink;
mod sort;
mod sort_buffer;
pub mod source;
pub mod source_v2;
mod stateless_simple_agg;
mod stream_reader;
pub mod subtask;
Expand Down
Loading

0 comments on commit c82268b

Please sign in to comment.